程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
您现在的位置: 程式師世界 >> 編程語言 >  >> 更多編程語言 >> Python

[Python GeoServer service release and style update]

編輯:Python

For a recent project I developed a set of service-publishing APIs based on GeoServer; I am recording them here on CSDN.

Without further ado, here is the code:

1、 From the PostgreSQL (PG) JDBC connection parameters, generate the XML that the GeoServer REST API requires to create a data store

def gen_conn_xml(self, jdbc_url):
    """
    Build the ``<connectionParameters>`` XML for a PostGIS data store from a
    PostgreSQL JDBC URL.

    :param jdbc_url: URL of the form
        ``jdbc:postgresql://host:port/database?user=a&password=b``.
    :return: the XML fragment as a single string (no whitespace between
        elements).
    :raises IndexError: if ``jdbc_url`` contains no ``//``.

    The ``user`` and ``password`` query parameters default to an empty
    string when absent.  A missing port falls back to ``5432``, the
    PostgreSQL default.
    """
    # Local import keeps this snippet self-contained.
    from xml.sax.saxutils import escape

    # jdbc:postgresql://192.168.10.28:5432/rmbs?user=a&password=b
    # Split each delimiter only once, left to right, so that ':', '/', '?'
    # or '=' inside a password cannot truncate the value.
    _rest = jdbc_url.split('//', 1)[1]
    _location, _, _query = _rest.partition('?')
    _authority, _, _database = _location.partition('/')
    _host, _, _port = _authority.partition(':')

    res = {}
    for kv in _query.split('&'):
        if not kv:
            continue
        _key, _, _value = kv.partition('=')
        res[_key] = _value

    _user = res.get('user', '')
    _password = res.get('password', '')
    _data = [
        '<connectionParameters>',
        '<host>' + escape(_host) + '</host>',
        '<port>' + escape(_port or '5432') + '</port>',
        '<database>' + escape(_database) + '</database>',
        '<user>' + escape(_user) + '</user>',
        '<passwd>' + escape(_password) + '</passwd>',
        '<dbtype>postgis</dbtype>',
        '</connectionParameters>',
    ]
    return ''.join(_data)

 2、 Get all workspaces , return json data

def get_workspaces(self):
    """
    Get all workspaces, return JSON data.

    :return: the decoded JSON object when the response body is a JSON
        object; otherwise the raw body is printed and ``None`` is returned.
    """
    _curl = self.gen_url('workspaces')
    _headers = {
        'Content-Type': 'application/json',
        # Request JSON explicitly rather than relying on the server default.
        'Accept': 'application/json',
    }
    _res = self.get(_curl, _headers)
    # Tolerate leading whitespace before the opening brace.
    if _res.lstrip().startswith('{'):
        return json.loads(_res)
    print(_res)
    return None

3、  Get the specified workspace , return json data

def get_workspace(self, ws_name):
    """
    Get the specified workspace, return JSON data.

    :param ws_name: workspace name, inserted into the request path as-is.
    :return: the decoded JSON object when the response body is a JSON
        object; otherwise the raw body is printed and ``None`` is returned.
    """
    _curl = self.gen_url('workspaces/%s' % (ws_name))
    _headers = {
        'Content-Type': 'application/json',
        # Request JSON explicitly rather than relying on the server default.
        'Accept': 'application/json',
    }
    _res = self.get(_curl, _headers)
    # Tolerate leading whitespace before the opening brace.
    if _res.lstrip().startswith('{'):
        return json.loads(_res)
    print(_res)
    return None

4、  Create the specified workspace , return true/false

def create_workspace(self, ws_name):
    """
    Create the specified workspace, return True/False.

    :param ws_name: name of the workspace to create.
    :return: ``True`` when the response body equals ``ws_name`` (which this
        code treats as success); otherwise the body is printed and
        ``False`` is returned.
    """
    # Local import keeps this snippet self-contained.
    from xml.sax.saxutils import escape

    # Escape the name so that '&', '<' or '>' cannot produce malformed XML.
    _data = '<workspace><name>' + escape(ws_name) + '</name></workspace>'
    _curl = self.gen_url('workspaces')
    _headers = {
        'Content-Type': 'application/xml'
    }
    _res = self.post(_curl, _headers, _data)
    if _res == ws_name:
        return True
    print(_res)
    return False

5、 Get the data stores of a workspace by workspace name

def get_datastores(self, ws_name):
    """
    Get the data stores of a workspace, return JSON data.

    :param ws_name: workspace name, inserted into the request path as-is.
    :return: the decoded JSON object when the response body is a JSON
        object; otherwise the raw body is printed and ``None`` is returned.
    """
    _curl = self.gen_url('workspaces/%s/datastores' % (ws_name))
    _headers = {
        'Content-Type': 'application/json',
        # Request JSON explicitly rather than relying on the server default.
        'Accept': 'application/json',
    }
    _res = self.get(_curl, _headers)
    # Tolerate leading whitespace before the opening brace.
    if _res.lstrip().startswith('{'):
        return json.loads(_res)
    print(_res)
    return None

6、 Create a new data store

def create_datastore(self, ws_name, ds_name, jdbc_url):
    """
    Create a new data store in a workspace.

    :param ws_name: workspace that will own the data store.
    :param ds_name: name of the data store to create.
    :param jdbc_url: PostgreSQL JDBC URL, converted to connection
        parameters by ``self.gen_conn_xml``.
    :return: ``True`` when the response body equals ``ds_name`` (which this
        code treats as success); otherwise the body is printed and
        ``False`` is returned.
    """
    # Local import keeps this snippet self-contained.
    from xml.sax.saxutils import escape

    _data = ''.join([
        '<dataStore>',
        # Escape the name so that '&', '<' or '>' cannot break the XML.
        '<name>' + escape(ds_name) + '</name>',
        self.gen_conn_xml(jdbc_url),
        '</dataStore>'])
    _curl = self.gen_url('workspaces/%s/datastores' % (ws_name))
    _headers = {
        'Content-Type': 'application/xml'
    }
    _res = self.post(_curl, _headers, _data)
    if _res == ds_name:
        return True
    print(_res)
    return False

 7、 Get the layers of a workspace

def get_layers(self, ws_name):
    """
    Get the layers of a workspace, return JSON data.

    :param ws_name: workspace name, inserted into the request path as-is.
    :return: the decoded JSON object when the response body is a JSON
        object; otherwise the raw body is printed and ``None`` is returned.
    """
    _curl = self.gen_url('workspaces/%s/layers' % (ws_name))
    _headers = {
        'Content-Type': 'application/json',
        # Request JSON explicitly rather than relying on the server default.
        'Accept': 'application/json',
    }
    _res = self.get(_curl, _headers)
    # Tolerate leading whitespace before the opening brace.
    if _res.lstrip().startswith('{'):
        return json.loads(_res)
    print(_res)
    return None

8、 Get a layer by workspace name and layer name

def get_layer(self, ws_name, layerName):
    """
    Get one layer of a workspace, return JSON data.

    :param ws_name: workspace name, inserted into the request path as-is.
    :param layerName: layer name, inserted into the request path as-is.
    :return: the decoded JSON object when the response body is a JSON
        object, otherwise ``None``.  A non-JSON body is printed unless it
        contains ``'No such layer'``, which is the expected "not found"
        reply and is kept quiet.
    """
    _curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name, layerName))
    _headers = {
        'Content-Type': 'application/json',
        # Request JSON explicitly rather than relying on the server default.
        'Accept': 'application/json',
    }
    _res = self.get(_curl, _headers)
    # Tolerate leading whitespace before the opening brace.
    if _res.lstrip().startswith('{'):
        return json.loads(_res)
    if 'No such layer' not in _res:
        print(_res)
    return None

 9、 Get published services

def get_featureType(self, ws_name, ds_name, featureTypeName):
    """
    Get a published feature type from a data store, return JSON data.

    :param ws_name: workspace name.
    :param ds_name: data store name.
    :param featureTypeName: feature type name.
    :return: the decoded JSON object when the response body is a JSON
        object, otherwise ``None``.  A non-JSON body is printed unless it
        contains ``'No such feature type'``, which is the expected
        "not found" reply and is kept quiet.
    """
    _curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes/%s' % (ws_name, ds_name, featureTypeName))
    _headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
    }
    _res = self.get(_curl, _headers)
    # Tolerate leading whitespace before the opening brace.
    if _res.lstrip().startswith('{'):
        return json.loads(_res)
    if 'No such feature type' not in _res:
        print(_res)
    return None

10、 Publish layer services

def publish_layer(self, ws_name, ds_name, layerName):
    """
    Publish a table of a data store as a feature type (layer service).

    :param ws_name: workspace name.
    :param ds_name: data store name.
    :param layerName: used both as the published name and as the native
        (table) name.
    :return: ``True`` when the response body is empty (which this code
        treats as success); otherwise the body is printed and ``False`` is
        returned.
    """
    # Local import keeps this snippet self-contained.
    from xml.sax.saxutils import escape

    # Escape once; the same value fills both elements.
    _name = escape(layerName)
    _data = ''.join([
        '<featureType>',
        '<name>' + _name + '</name>',
        '<nativeName>' + _name + '</nativeName>',
        '</featureType>'])
    _curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes' % (ws_name, ds_name))
    _headers = {
        'Content-Type': 'application/xml'
    }
    _res = self.post(_curl, _headers, _data)
    if _res == '':
        return True
    print(_res)
    return False

 11、 Update service style

def update_layer_style(self, ws_name, layerName, style_name):
    """
    Set the default style of a layer.

    The request also lists ``polygon`` and ``style_name`` as the layer's
    available styles.

    :param ws_name: workspace name.
    :param layerName: layer to update.
    :param style_name: name of the style to set as default.
    :return: ``True`` when the response body is empty (which this code
        treats as success); otherwise the body is printed and ``False`` is
        returned.
    """
    # Local import keeps this snippet self-contained.
    from xml.sax.saxutils import escape

    # Escape once; the same value fills both elements.
    _style = escape(style_name)
    _data = ''.join([
        '<layer>',
        '<defaultStyle>',
        '<name>' + _style + '</name>',
        '</defaultStyle>',
        '<styles>',
        '<style><name>polygon</name></style>',
        '<style><name>' + _style + '</name></style>',
        '</styles>',
        '</layer>'])
    _curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name, layerName))
    _headers = {
        'Content-Type': 'application/xml'
    }
    _res = self.put(_curl, _headers, _data)
    if _res == '':
        return True
    print(_res)
    return False

 12、 Delete layer

def delete_layer(self, ws_name, layerName):
    """
    Delete a layer from a workspace.

    :param ws_name: workspace name.
    :param layerName: layer to delete.
    :return: ``True`` when the response body is empty (treated as success,
        as ``delete_featureType`` does), the decoded JSON object when the
        body is a JSON object, otherwise ``None`` after printing the body.
    """
    # ``recurse`` is sent in the query string, as ``delete_style`` does with
    # ``params=``.  Passing it as the request body (the previous behaviour)
    # does not put it in the URL at all.
    _curl = self.gen_url('workspaces/%s/layers/%s?recurse=true' % (ws_name, layerName))
    _headers = {
        'Content-Type': 'application/json'
    }
    _res = self.delete(_curl, _headers, None)
    if _res == '':
        return True
    if _res.lstrip().startswith('{'):
        return json.loads(_res)
    print(_res)
    return None

 13、 Delete service

def delete_featureType(self, ws_name, ds_name, featureTypeName):
    """
    Unpublish: delete a feature type from a data store.

    :param ws_name: workspace name.
    :param ds_name: data store name.
    :param featureTypeName: feature type to delete.
    :return: ``True`` when the response body is empty (treated as
        success); otherwise the body is printed and ``None`` is returned.
    """
    # ``recurse`` is sent in the query string, as ``delete_style`` does with
    # ``params=``.  Passing it as the request body (the previous behaviour)
    # does not put it in the URL at all.
    _curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes/%s?recurse=true' % (ws_name, ds_name, featureTypeName))
    _headers = {
        'Content-Type': 'application/json'
    }
    _res = self.delete(_curl, _headers, None)
    if len(_res) == 0:
        return True
    print('delete_featureType->', _res)
    return None

14、 Get style by name

def get_style(self, style_name, workspace=None):
    """
    Check whether a style exists.

    Despite the name, this returns a boolean and not the style itself.

    :param style_name: name of the style to look up.
    :param workspace: when given, look the style up inside that workspace
        instead of among the global styles.
    :return: ``True`` when the server answers with HTTP 200; ``False`` for
        any other status or when the request itself fails.
    """
    if workspace is not None:
        url = "{}/workspaces/{}/styles/{}.json".format(self.rest_root, workspace, style_name)
    else:
        url = "{}/styles/{}.json".format(self.rest_root, style_name)
    headers = {
        "Content-type": "application/json",
        "Accept": "*/*",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
    }
    try:
        r = requests.get(url, auth=self.auth, headers=headers)
    except requests.RequestException:
        # A failed request is reported as "style not found".
        return False
    return r.status_code == 200

 15、 Get all styles

def get_styles(self, workspace=None):
    """
    Get all styles, either global or those of one workspace.

    :param workspace: when given, list the styles of that workspace.
    :return: the decoded JSON response.  On a request failure or an
        undecodable body, a string of the form ``"get_styles error: ..."``
        is returned instead; callers must check the type of the result.
    """
    if workspace is not None:
        url = "{}/workspaces/{}/styles.json".format(self.rest_root, workspace)
    else:
        url = "{}/styles.json".format(self.rest_root)
    try:
        r = requests.get(url, auth=self.auth)
        return r.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a response body that is not valid JSON.
        return "get_styles error: {}".format(e)

16、 Add a new style

def add_new_style(self, path, name=None, workspace=None, sld_version="1.1.0"):
    """
    Register a style and upload its SLD file.

    First POSTs the style entry, then PUTs the SLD file content to it.
    The PUT is attempted even if the POST did not succeed.

    :param path: path of the SLD file on the local disk.
    :param name: style name; defaults to the base name of ``path``.  In
        both cases everything from the first ``.`` onwards is dropped.
    :param workspace: workspace to create the style in; ``None`` creates a
        global style.
    :param sld_version: ``"1.1.0"`` or ``"1.1"`` selects the SE content
        type; any other value selects the SLD content type.
    :return: ``(ok, status_code)`` when the upload succeeds,
        ``(ok, status_code, content)`` of the upload response when it
        fails, or ``(False, message)`` when an exception is raised.
    """
    if name is None:
        name = os.path.basename(path)
    name = name.split(".")[0]

    headers = {"content-type": "text/xml"}
    if workspace is None:
        url = "{}/styles".format(self.rest_root)
    else:
        url = "{}/workspaces/{}/styles".format(self.rest_root, workspace)

    if sld_version in ("1.1.0", "1.1"):
        sld_content_type = "application/vnd.ogc.se+xml"
    else:
        sld_content_type = "application/vnd.ogc.sld+xml"
    header_sld = {"content-type": sld_content_type}

    style_xml = "<style><name>{0}</name><filename>{0}.sld</filename></style>".format(name)
    try:
        requests.post(url, data=style_xml, auth=self.auth, headers=headers)
        with open(path, "rb") as sld_file:
            r_sld = requests.put(url + "/" + name, data=sld_file.read(), auth=self.auth, headers=header_sld)
        if r_sld.status_code not in [200, 201]:
            # Report the failed upload itself; returning the POST response
            # here could signal ok=True although the SLD was not stored.
            return r_sld.ok, r_sld.status_code, r_sld.content
        return r_sld.ok, r_sld.status_code
    except Exception as e:
        return False, str(e)

 17、 Delete style by name

def delete_style(self, style_name, workspace=None):
    """
    Delete a style by name.

    :param style_name: name of the style to delete.
    :param workspace: workspace owning the style; ``None`` targets a
        global style.
    :return: ``True`` when the server answers with HTTP 200; ``False`` for
        any other status or when the request itself fails.
    """
    payload = {"recurse": "true"}
    if workspace is None:
        url = "{}/styles/{}".format(self.rest_root, style_name)
    else:
        url = "{}/workspaces/{}/styles/{}".format(self.rest_root, workspace, style_name)
    try:
        r = requests.delete(url, auth=self.auth, params=payload)
    except requests.RequestException:
        return False
    # The original used ``raise False`` here, which is a TypeError that only
    # produced False because a blanket ``except`` caught it.
    return r.status_code == 200

  Complete code:

  Usage example:

# -*-coding:utf-8-*-
import sys,os
_root_path = os.path.abspath(os.path.join(os.path.dirname(__file__),".."))
print(_root_path)
sys.path.append(_root_path)
import time
import json
import requests
# Define global configuration information
from Cfg import Cfg
cfg = Cfg()
class GeoServerUtil():
    """
    Small client for the GeoServer REST API built on ``requests``.

    Covers workspaces, PostGIS data stores, feature types / layers and
    styles.  Most methods print the raw response body when it is not what
    they expect and signal failure through their return value.
    """

    def __init__(self, root=None, auth=None) -> None:
        """
        :param root: REST root URL; defaults to
            ``cfg.geoserver_root + '/geoserver/rest'``.
        :param auth: value passed as ``auth=`` to ``requests``; defaults to
            ``cfg.geoserver_auth``.
        """
        self.rest_root = root if root is not None else cfg.geoserver_root + '/geoserver/rest'
        self.auth = auth if auth is not None else cfg.geoserver_auth  # ('admin','geoserver')

    def gen_url(self, _api=''):
        """Return the full URL of the REST endpoint ``_api``."""
        return '%s/%s' % (self.rest_root, _api)

    @staticmethod
    def _esc(value):
        """Escape '&', '<' and '>' so ``value`` can be embedded in XML text."""
        from xml.sax.saxutils import escape
        return escape(value)

    @staticmethod
    def _json_or_none(_res, quiet_marker=None):
        """
        Decode ``_res`` when it is a JSON object, otherwise return ``None``.

        A non-JSON body is printed unless it contains ``quiet_marker``.
        """
        # Tolerate leading whitespace before the opening brace.
        if _res.lstrip().startswith('{'):
            return json.loads(_res)
        if quiet_marker is None or quiet_marker not in _res:
            print(_res)
        return None

    def gen_conn_xml(self, jdbc_url):
        """
        Build the ``<connectionParameters>`` XML for a PostGIS data store
        from a PostgreSQL JDBC URL.

        :param jdbc_url: URL of the form
            ``jdbc:postgresql://host:port/database?user=a&password=b``.
        :return: the XML fragment as a single string.
        :raises IndexError: if ``jdbc_url`` contains no ``//``.

        ``user`` and ``password`` default to an empty string when absent;
        a missing port falls back to ``5432``, the PostgreSQL default.
        """
        # jdbc:postgresql://192.168.10.28:5432/rmbs?user=a&password=b
        # Split each delimiter only once, left to right, so that ':', '/',
        # '?' or '=' inside a password cannot truncate the value.
        _rest = jdbc_url.split('//', 1)[1]
        _location, _, _query = _rest.partition('?')
        _authority, _, _database = _location.partition('/')
        _host, _, _port = _authority.partition(':')

        res = {}
        for kv in _query.split('&'):
            if not kv:
                continue
            _key, _, _value = kv.partition('=')
            res[_key] = _value

        _data = [
            '<connectionParameters>',
            '<host>' + self._esc(_host) + '</host>',
            '<port>' + self._esc(_port or '5432') + '</port>',
            '<database>' + self._esc(_database) + '</database>',
            '<user>' + self._esc(res.get('user', '')) + '</user>',
            '<passwd>' + self._esc(res.get('password', '')) + '</passwd>',
            '<dbtype>postgis</dbtype>',
            '</connectionParameters>',
        ]
        return ''.join(_data)

    def get(self, _url, _headers):
        """Send a GET request and return the response body as text."""
        print(_url)
        _res = requests.get(_url, auth=self.auth, headers=_headers)
        return _res.text

    def post(self, _url, _headers, _data):
        """Send a POST request with body ``_data``; return the response text."""
        print(_url)
        print(_data)
        _res = requests.post(_url, auth=self.auth, headers=_headers, data=_data)
        return _res.text

    def put(self, _url, _headers, _data):
        """Send a PUT request with body ``_data``; return the response text."""
        print(_url)
        print(_data)
        _res = requests.put(_url, auth=self.auth, headers=_headers, data=_data)
        return _res.text

    def delete(self, _url, _headers, _data):
        """Send a DELETE request with body ``_data``; return the response text."""
        print(_url)
        print(_data)
        _res = requests.delete(_url, auth=self.auth, headers=_headers, data=_data)
        return _res.text

    # Headers for read requests: ask for JSON explicitly instead of relying
    # on the server default.
    _JSON_HEADERS = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }

    def get_workspaces(self):
        """
        Get all workspaces, return JSON data (``None`` on a non-JSON reply).
        """
        _curl = self.gen_url('workspaces')
        return self._json_or_none(self.get(_curl, dict(self._JSON_HEADERS)))

    def get_workspace(self, ws_name):
        """
        Get the specified workspace, return JSON data (``None`` on a
        non-JSON reply).
        """
        _curl = self.gen_url('workspaces/%s' % (ws_name))
        return self._json_or_none(self.get(_curl, dict(self._JSON_HEADERS)))

    def create_workspace(self, ws_name):
        """
        Create the specified workspace, return True/False.

        Success is detected by the response body being equal to ``ws_name``.
        """
        _data = '<workspace><name>' + self._esc(ws_name) + '</name></workspace>'
        _curl = self.gen_url('workspaces')
        _headers = {
            'Content-Type': 'application/xml'
        }
        _res = self.post(_curl, _headers, _data)
        if _res == ws_name:
            return True
        print(_res)
        return False

    def get_datastores(self, ws_name):
        """
        Get the data stores of a workspace, return JSON data (``None`` on a
        non-JSON reply).
        """
        _curl = self.gen_url('workspaces/%s/datastores' % (ws_name))
        return self._json_or_none(self.get(_curl, dict(self._JSON_HEADERS)))

    def get_store(self, ws_name, ds_name):
        """
        Get one data store of a workspace, return JSON data (``None`` on a
        non-JSON reply).
        """
        _curl = self.gen_url('workspaces/%s/datastores/%s' % (ws_name, ds_name))
        return self._json_or_none(self.get(_curl, dict(self._JSON_HEADERS)))

    def create_datastore(self, ws_name, ds_name, jdbc_url):
        """
        Create a PostGIS data store in a workspace, return True/False.

        Success is detected by the response body being equal to ``ds_name``.
        """
        _data = ''.join([
            '<dataStore>',
            '<name>' + self._esc(ds_name) + '</name>',
            self.gen_conn_xml(jdbc_url),
            '</dataStore>'])
        _curl = self.gen_url('workspaces/%s/datastores' % (ws_name))
        _headers = {
            'Content-Type': 'application/xml'
        }
        _res = self.post(_curl, _headers, _data)
        if _res == ds_name:
            return True
        print(_res)
        return False

    def get_layers(self, ws_name):
        """
        Get the layers of a workspace, return JSON data (``None`` on a
        non-JSON reply).
        """
        _curl = self.gen_url('workspaces/%s/layers' % (ws_name))
        return self._json_or_none(self.get(_curl, dict(self._JSON_HEADERS)))

    def get_layer(self, ws_name, layerName):
        """
        Get one layer of a workspace, return JSON data or ``None``.

        A reply containing ``'No such layer'`` is not printed.
        """
        _curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name, layerName))
        _res = self.get(_curl, dict(self._JSON_HEADERS))
        return self._json_or_none(_res, 'No such layer')

    def get_featureType(self, ws_name, ds_name, featureTypeName):
        """
        Get a published feature type, return JSON data or ``None``.

        A reply containing ``'No such feature type'`` is not printed.
        """
        _curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes/%s' % (ws_name, ds_name, featureTypeName))
        _res = self.get(_curl, dict(self._JSON_HEADERS))
        return self._json_or_none(_res, 'No such feature type')

    def publish_layer(self, ws_name, ds_name, layerName):
        """
        Publish a table of a data store as a feature type.

        ``layerName`` is used both as the published and the native name.
        Returns ``True`` when the response body is empty, else ``False``.
        """
        _name = self._esc(layerName)
        _data = ''.join([
            '<featureType>',
            '<name>' + _name + '</name>',
            '<nativeName>' + _name + '</nativeName>',
            '</featureType>'])
        _curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes' % (ws_name, ds_name))
        _headers = {
            'Content-Type': 'application/xml'
        }
        _res = self.post(_curl, _headers, _data)
        if _res == '':
            return True
        print(_res)
        return False

    def update_layer_style(self, ws_name, layerName, style_name):
        """
        Set the default style of a layer; ``polygon`` and ``style_name``
        are listed as the layer's available styles.

        Returns ``True`` when the response body is empty, else ``False``.
        """
        _style = self._esc(style_name)
        _data = ''.join([
            '<layer>',
            '<defaultStyle>',
            '<name>' + _style + '</name>',
            '</defaultStyle>',
            '<styles>',
            '<style><name>polygon</name></style>',
            '<style><name>' + _style + '</name></style>',
            '</styles>',
            '</layer>'])
        _curl = self.gen_url('workspaces/%s/layers/%s' % (ws_name, layerName))
        _headers = {
            'Content-Type': 'application/xml'
        }
        _res = self.put(_curl, _headers, _data)
        if _res == '':
            return True
        print(_res)
        return False

    def delete_layer(self, ws_name, layerName):
        """
        Delete a layer from a workspace.

        Returns ``True`` when the response body is empty (treated as
        success), the decoded JSON object when the body is a JSON object,
        otherwise ``None`` after printing the body.
        """
        # ``recurse`` goes in the query string, as ``delete_style`` does
        # with ``params=``; sending it as the request body does not put it
        # in the URL at all.
        _curl = self.gen_url('workspaces/%s/layers/%s?recurse=true' % (ws_name, layerName))
        _headers = {
            'Content-Type': 'application/json'
        }
        _res = self.delete(_curl, _headers, None)
        if _res == '':
            return True
        return self._json_or_none(_res)

    def delete_featureType(self, ws_name, ds_name, featureTypeName):
        """
        Unpublish: delete a feature type from a data store.

        Returns ``True`` when the response body is empty; otherwise the
        body is printed and ``None`` is returned.
        """
        # See delete_layer for why ``recurse`` is part of the URL.
        _curl = self.gen_url('workspaces/%s/datastores/%s/featuretypes/%s?recurse=true' % (ws_name, ds_name, featureTypeName))
        _headers = {
            'Content-Type': 'application/json'
        }
        _res = self.delete(_curl, _headers, None)
        if len(_res) == 0:
            return True
        print('delete_featureType->', _res)
        return None

    # Get style by name
    def get_style(self, style_name, workspace=None):
        """
        Check whether a style exists (returns a boolean, not the style).

        Returns ``True`` on HTTP 200, ``False`` for any other status or
        when the request itself fails.
        """
        if workspace is not None:
            url = "{}/workspaces/{}/styles/{}.json".format(self.rest_root, workspace, style_name)
        else:
            url = "{}/styles/{}.json".format(self.rest_root, style_name)
        headers = {
            "Content-type": "application/json",
            "Accept": "*/*",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36"
        }
        try:
            r = requests.get(url, auth=self.auth, headers=headers)
        except requests.RequestException:
            return False
        return r.status_code == 200

    def get_styles(self, workspace=None):
        """
        Get all styles, global or of one workspace.

        Returns the decoded JSON response, or a string of the form
        ``"get_styles error: ..."`` when the request or decoding fails.
        """
        if workspace is not None:
            url = "{}/workspaces/{}/styles.json".format(self.rest_root, workspace)
        else:
            url = "{}/styles.json".format(self.rest_root)
        try:
            r = requests.get(url, auth=self.auth)
            return r.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON.
            return "get_styles error: {}".format(e)

    # Add a new style
    def add_new_style(self, path, name=None, workspace=None, sld_version="1.1.0"):
        """
        Register a style (POST) and upload its SLD file (PUT).

        :param path: path of the SLD file on the local disk.
        :param name: style name; defaults to the base name of ``path``.
            Everything from the first ``.`` onwards is dropped.
        :param workspace: target workspace; ``None`` creates a global style.
        :param sld_version: ``"1.1.0"`` / ``"1.1"`` selects the SE content
            type, anything else the SLD content type.
        :return: ``(ok, status_code)`` when the upload succeeds,
            ``(ok, status_code, content)`` of the upload response when it
            fails, or ``(False, message)`` when an exception is raised.
        """
        if name is None:
            name = os.path.basename(path)
        name = name.split(".")[0]

        headers = {"content-type": "text/xml"}
        if workspace is None:
            url = "{}/styles".format(self.rest_root)
        else:
            url = "{}/workspaces/{}/styles".format(self.rest_root, workspace)

        if sld_version in ("1.1.0", "1.1"):
            sld_content_type = "application/vnd.ogc.se+xml"
        else:
            sld_content_type = "application/vnd.ogc.sld+xml"
        header_sld = {"content-type": sld_content_type}

        style_xml = "<style><name>{0}</name><filename>{0}.sld</filename></style>".format(name)
        try:
            requests.post(url, data=style_xml, auth=self.auth, headers=headers)
            with open(path, "rb") as sld_file:
                r_sld = requests.put(url + "/" + name, data=sld_file.read(), auth=self.auth, headers=header_sld)
            if r_sld.status_code not in [200, 201]:
                # Report the failed upload itself; the POST response could
                # signal ok=True although the SLD was not stored.
                return r_sld.ok, r_sld.status_code, r_sld.content
            return r_sld.ok, r_sld.status_code
        except Exception as e:
            return False, str(e)

    # Delete style by name
    def delete_style(self, style_name, workspace=None):
        """
        Delete a style by name (global, or of ``workspace`` when given).

        Returns ``True`` on HTTP 200, ``False`` for any other status or
        when the request itself fails.
        """
        payload = {"recurse": "true"}
        if workspace is None:
            url = "{}/styles/{}".format(self.rest_root, style_name)
        else:
            url = "{}/workspaces/{}/styles/{}".format(self.rest_root, workspace, style_name)
        try:
            r = requests.delete(url, auth=self.auth, params=payload)
        except requests.RequestException:
            return False
        return r.status_code == 200
class Pg2GeoServer_publishmap():
    """Publish PostGIS tables and GeoTIFF files as GeoServer services."""

    # Upper bound for the delete-and-recheck loop in ``publish``; with the
    # one-second pause per round this gives up after roughly 30 seconds
    # instead of looping forever.
    _MAX_DELETE_ATTEMPTS = 30

    def get_pg_cfg(self, jdbc_url):
        """
        Parse a PostgreSQL JDBC URL into a dict.

        :param jdbc_url: URL of the form
            ``jdbc:postgresql://host:port/database?user=a&password=b``.
        :return: dict holding every query parameter plus ``host``, ``port``
            and ``database`` (these three override query parameters of the
            same name).  A missing port falls back to ``'5432'``, the
            PostgreSQL default.
        :raises IndexError: if ``jdbc_url`` contains no ``//``.
        """
        # jdbc:postgresql://192.168.10.28:5432/rmbs?user=a&password=b
        # Split each delimiter only once, left to right, so that ':', '/',
        # '?' or '=' inside a password cannot truncate the value.
        _rest = jdbc_url.split('//', 1)[1]
        _location, _, _query = _rest.partition('?')
        _authority, _, _database = _location.partition('/')
        _host, _, _port = _authority.partition(':')

        res = {}
        for kv in _query.split('&'):
            if not kv:
                continue
            _key, _, _value = kv.partition('=')
            res[_key] = _value
        res['host'] = _host
        res['port'] = _port or '5432'
        res['database'] = _database
        return res

    def publish(self, rid, ws_name, ds_name, layer_name, layer_name2, style_name, group_name):
        """
        Publish a PostGIS table as a GeoServer layer, replacing any
        existing feature type of the same name.

        :param rid: identifier passed through to ``cfg.WriteLog``.
        :param ws_name: workspace; created when it does not exist.
        :param ds_name: data store; created when it does not exist.
        :param layer_name: ``'<jdbc_url>|<table>'``; the JDBC URL selects
            the data source and the table is the default layer to publish.
        :param layer_name2: when non-empty, the table to publish instead of
            the one in ``layer_name``.  A ``schema.`` prefix is dropped.
        :param style_name: style to set as default; skipped when empty.
        :param group_name: not used by this method.
        :return: ``True`` when publishing (and the style update, if
            requested) succeeded, otherwise ``False``.
        :raises IndexError: if ``layer_name`` contains no ``|``.
        """
        print(' Service publishing in progress ...')
        cfg.WriteLog(rid, " Service publishing in progress ...%s" % (''))
        _parts = layer_name.split('|')
        url_1 = _parts[0]
        dbtable_1 = _parts[1]
        if layer_name2 is not None and len(layer_name2) > 0:
            dbtable_1 = layer_name2
        # If the layer has a schema, remove the schema name.
        if '.' in dbtable_1:
            dbtable_1 = dbtable_1.split('.')[1]
        print(url_1, dbtable_1)

        cat = GeoServerUtil()
        if cat.get_workspace(ws_name) is None:
            print(' Create workspace ...')
            cfg.WriteLog(rid, " Create workspace ...%s" % (ws_name))
            cat.create_workspace(ws_name)
        if cat.get_store(ws_name, ds_name) is None:
            print(' Create a data store ...')
            cfg.WriteLog(rid, " Create a data store ...%s" % (ds_name))
            cat.create_datastore(ws_name, ds_name, url_1)

        # Remove an already published feature type first, re-checking after
        # each attempt.  The loop is bounded so that a delete which keeps
        # failing cannot hang the caller.
        layer = cat.get_featureType(ws_name, ds_name, dbtable_1)
        attempts = 0
        while layer is not None:
            if attempts >= self._MAX_DELETE_ATTEMPTS:
                print(' Failed to delete existing layer service ...')
                cfg.WriteLog(rid, " Failed to delete existing layer service ...%s" % (dbtable_1))
                return False
            attempts += 1
            print(' Layer service already exists , Ready to delete ...')
            cfg.WriteLog(rid, " Layer service already exists , Ready to delete ...%s" % (dbtable_1))
            k = cat.delete_featureType(ws_name, ds_name, dbtable_1)
            print('k->', k)
            k = cat.delete_layer(ws_name, dbtable_1)
            print('k->', k)
            time.sleep(1)
            layer = cat.get_featureType(ws_name, ds_name, dbtable_1)
            print("===", layer)

        print(' Publishing services ...')
        cfg.WriteLog(rid, " Publishing services ...%s" % (dbtable_1))
        b = cat.publish_layer(ws_name, ds_name, dbtable_1)
        # Only touch the style of a layer that was actually published, so a
        # failed publish is not masked by the style-update result.
        if b and style_name:
            print(" Update service style ...")
            cfg.WriteLog(rid, " Update service style ...%s" % (style_name))
            b = cat.update_layer_style(ws_name, dbtable_1, style_name)
        print(" Service release completed !%s" % (''))
        cfg.WriteLog(rid, " Service release completed !%s" % (''))
        return b

    def autoPush_tif(self, geoserverWorkeName, fileName):
        """
        Publish a GeoTIFF file as a coverage store.

        :param geoserverWorkeName: workspace name; created when missing.
        :param fileName: path or ``file:`` URL of the raster; the store is
            named after the file's base name up to its first ``.``.
        :return: ``None``; the store name and the result are only printed.
        """
        geoserver = GeoServerUtil()
        # NOTE(review): ``Catalog`` is not imported anywhere in the visible
        # file, so this line raises NameError as written.  It is presumably
        # the gsconfig catalog class; confirm and add the import.
        geocat = Catalog(geoserver.rest_root, username=geoserver.auth[0], password=geoserver.auth[-1])
        namespace = geoserverWorkeName
        # Grid data to publish
        train_path = fileName
        ws = geocat.get_workspace(geoserverWorkeName)
        if ws is None and geoserverWorkeName:
            geocat.create_workspace(geoserverWorkeName, 'http://{0}'.format(geoserverWorkeName))
        store_name = train_path.replace('\\', '/').split('/')[-1].split('.')[0]
        geostore = geocat.create_coveragestore3(store_name, train_path, namespace)
        print(store_name)
        print(geostore)
        print(1)
if __name__ == '__main__':
    # Manual smoke test: publish two sample GeoTIFF files.
    print('Begin...')
    b = Pg2GeoServer_publishmap().autoPush_tif('tifserver', r'file:D:\Data\idata-gisserver\SGis-Srv\tmp_dir\8407dc28-86b7-4adb-24ac-a434c03423af\sample_downFiles\JUU2JTk1JUIwJUU2JThEJUFFJUU2JTk2JTg3JUU0JUJCJUI2\c2FtcGxl\img_05.tif')
    b = Pg2GeoServer_publishmap().autoPush_tif('ws_sgis1234','D:/data/tif/sfdem.tif')
    # k = GeoServerUtil().get_featureType('sde','rmbs','pmml_35272_110_35273_0')
    # print(k)
    print('Ok!')
    # NOTE(review): autoPush_tif has no return statement, so ``b`` is None
    # here and this branch never runs; it only becomes meaningful once
    # autoPush_tif returns a success flag.
    if b is False:
        # sys.exit is always available; the bare ``exit`` helper is
        # installed by the ``site`` module and is meant for interactive use.
        sys.exit(1)


  1. 上一篇文章:
  2. 下一篇文章:
Copyright © 程式師世界 All Rights Reserved