# Source code for pygeoapi.provider.postgresql

# =================================================================
#
# Authors: Jorge Samuel Mendes de Jesus <jorge.dejesus@protonmail.com>
#          Tom Kralidis <tomkralidis@gmail.com>
#          Mary Bucknell <mbucknell@usgs.gov>
#
# Copyright (c) 2018 Jorge Samuel Mendes de Jesus
# Copyright (c) 2019 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

# Testing local docker:
# docker run --name "postgis" \
# -v postgres_data:/var/lib/postgresql -p 5432:5432 \
# -e ALLOW_IP_RANGE=0.0.0.0/0 \
# -e POSTGRES_USER=postgres \
# -e POSTGRES_PASS=postgres \
# -e POSTGRES_DBNAME=test \
# -d -t kartoza/postgis

# Import dump:
# gunzip < tests/data/hotosm_bdi_waterways.sql.gz |
#  psql -U postgres -h 127.0.0.1 -p 5432 test

import logging
import json
import psycopg2
import uuid
from psycopg2.sql import SQL, Identifier, Literal
from pygeoapi.provider.base import BaseProvider, \
    ProviderConnectionError, ProviderQueryError, ProviderItemNotFoundError, \
    ProviderItemAlreadyExistsError, ProviderSchemaError

from psycopg2.extras import RealDictCursor

# Module-level logger used by both DatabaseConnection and PostgreSQLProvider.
LOGGER = logging.getLogger(__name__)


class DatabaseConnection:
    """Database connection class to be used as 'with' statement.

    Entering the context opens a psycopg2 connection (available as
    ``.conn``); leaving it closes the connection.
    """

    def __init__(self, conn_dic, table, context="query"):
        """
        DatabaseConnection Class constructor

        :param conn_dic: dictionary with connection parameters
                         to be used by psycopg2. It is never modified.
            dbname – the database name (database is a deprecated alias)
            user – user name used to authenticate
            password – password used to authenticate
            host – database host address
             (defaults to UNIX socket if not provided)
            port – connection port number
             (defaults to 5432 if not provided)
            search_path – search path to be used (by order), normally
             data is in the public schema, [public],
             or in a specific schema ["osm", "public"].
             Note: First we should have the schema being used and then
             public
        :param table: table name containing the data. This variable is used
                      to assemble column information
        :param context: query or hits, if query then it will determine
                        table columns otherwise will not do it
        """
        self.conn_dic = conn_dic
        self.table = table
        self.context = context
        self.columns = None
        self.fields = {}  # Dict of columns. Key is col name, value is type
        self.conn = None
        self.cur = None

    @staticmethod
    def _redacted(conn_params):
        """
        Render connection parameters for logging with the password masked

        :param conn_params: dict of psycopg2 connection parameters
        :returns: `str` representation safe to write to logs
        """
        safe = dict(conn_params)
        if 'password' in safe:
            safe['password'] = '***'
        return str(safe)

    def _close(self):
        """Close the connection if one was opened."""
        if self.conn is not None:
            self.conn.close()

    def __enter__(self):
        """
        Open the connection and, in the 'query' context, read the table
        columns into ``self.columns`` and ``self.fields``

        :raises ProviderConnectionError: if psycopg2 cannot connect
        :returns: self
        """
        # Work on a copy: popping 'search_path' from the caller's dict
        # would silently rewrite the provider configuration.
        conn_params = dict(self.conn_dic)
        search_path = conn_params.pop('search_path', ['public'])
        if isinstance(search_path, str):
            # A bare string would otherwise be joined character by character
            search_path = [search_path]
        if search_path != ['public']:
            conn_params["options"] = '-c search_path={}'.format(
                ",".join(search_path))
            LOGGER.debug('Using search path: {} '.format(search_path))

        try:
            self.conn = psycopg2.connect(**conn_params)
            self.conn.set_client_encoding('utf8')
        except psycopg2.OperationalError as err:
            self._close()
            LOGGER.error("Couldn't connect to Postgis using:{}".format(
                self._redacted(conn_params)))
            raise ProviderConnectionError() from err

        self.cur = self.conn.cursor()
        if self.context == 'query':
            # Getting columns (geometry columns are excluded)
            query_cols = ("SELECT column_name, udt_name "
                          "FROM information_schema.columns "
                          "WHERE table_name = %s AND udt_name != 'geometry' "
                          "ORDER BY ordinal_position;")
            try:
                self.cur.execute(query_cols, (self.table,))
                result = self.cur.fetchall()
            except Exception:
                # __exit__ is not called when __enter__ raises, so the
                # connection must be released here.
                self._close()
                raise

            self.columns = SQL(', ').join(
                [Identifier(name) for name, _ in result])
            self.fields = dict(result)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # some logic to commit/rollback
        self._close()
class PostgreSQLProvider(BaseProvider):
    """Generic provider for Postgresql based on psycopg2
    using sync approach and server side
    cursor (using support class DatabaseConnection)
    """

    def __init__(self, provider_def):
        """
        PostgreSQLProvider Class constructor

        :param provider_def: provider definitions from yml pygeoapi-config.
                             data,id_field, name set in parent class
                             data contains the connection information
                             for class DatabaseConnection

        :returns: pygeoapi.provider.postgresql.PostgreSQLProvider
        """
        BaseProvider.__init__(self, provider_def)

        self.table = provider_def['table']
        self.id_field = provider_def['id_field']
        self.conn_dic = provider_def['data']
        self.geom = provider_def.get('geom_field', 'geom')

        LOGGER.debug('Setting Postgresql properties:')
        # Mask the password so credentials never end up in the logs
        LOGGER.debug('Connection String:{}'.format(
            ",".join("{}={}".format(key,
                                    '***' if key == 'password' else value)
                     for key, value in self.conn_dic.items())))
        LOGGER.debug('Name:{}'.format(self.name))
        LOGGER.debug('ID_field:{}'.format(self.id_field))
        LOGGER.debug('Table:{}'.format(self.table))

        LOGGER.debug('Get available fields/properties')

        self.get_fields()

    def get_fields(self):
        """
        Get fields from PostgreSQL table (columns are field)

        The result is cached in ``self.fields`` after the first call.
        A column named ``natural`` is exposed under the quoted key
        ``'"natural"'``.

        :returns: dict of fields (column name -> PostgreSQL udt type name)
        """
        if not self.fields:
            with DatabaseConnection(self.conn_dic, self.table) as db:
                self.fields = db.fields
                if 'natural' in self.fields:
                    self.fields['"natural"'] = self.fields.pop('natural')
        return self.fields

    @staticmethod
    def __unquote(name):
        """
        Strip surrounding double quotes from a field name

        get_fields() may expose a column under a quoted key (see
        ``natural``); psycopg2 ``Identifier`` performs its own quoting and
        therefore needs the raw column name.

        :param name: field name, possibly wrapped in double quotes
        :returns: raw column name
        """
        if len(name) > 1 and name.startswith('"') and name.endswith('"'):
            return name[1:-1]
        return name

    def __column_names(self):
        """Raw (unquoted) names of the non-geometry table columns."""
        return [self.__unquote(name) for name in self.get_fields()]

    def __execute_write(self, sql_query):
        """
        Execute a data-modifying statement in its own connection and commit

        :param sql_query: psycopg2.sql.Composed statement
        :raises ProviderQueryError: if the statement fails
        """
        with DatabaseConnection(self.conn_dic, self.table) as db:
            cursor = db.conn.cursor(cursor_factory=RealDictCursor)
            try:
                cursor.execute(sql_query)
            except Exception as err:
                LOGGER.error('Error executing sql_query: {}: {}'.format(
                    sql_query.as_string(cursor), err))
                raise ProviderQueryError() from err
            db.conn.commit()

    def __get_where_clauses(self, properties=None, bbox=None):
        """
        Generates WHERE conditions to be implemented in query.
        Private method mainly associated with query method

        :param properties: list of tuples (name, value)
        :param bbox: bounding box [minx,miny,maxx,maxy]

        :returns: psycopg2.sql.Composed or psycopg2.sql.SQL
        """
        where_conditions = []

        for name, value in properties or []:
            where_conditions.append(SQL('{} = {}').format(
                Identifier(self.__unquote(name)), Literal(value)))

        if bbox:
            where_conditions.append(
                SQL('{} && ST_MakeEnvelope({})').format(
                    Identifier(self.geom),
                    SQL(', ').join([Literal(coord) for coord in bbox])))

        if where_conditions:
            return SQL(' WHERE {}').format(
                SQL(' AND ').join(where_conditions))
        return SQL('')

    def query(self, startindex=0, limit=10, resulttype='results',
              bbox=None, datetime=None, properties=None, sortby=None):
        """
        Query Postgis for all the content.
        e,g: http://localhost:5000/collections/hotosm_bdi_waterways/items?
        limit=1&resulttype=results

        :param startindex: starting record to return (default 0)
        :param limit: number of records to return (default 10)
        :param resulttype: return results or hit limit (default results)
        :param bbox: bounding box [minx,miny,maxx,maxy]
        :param datetime: temporal (datestamp or extent); currently ignored
        :param properties: list of tuples (name, value)
        :param sortby: list of dicts (property, order); currently ignored

        :raises ProviderQueryError: if the SQL query fails

        :returns: GeoJSON FeaturesCollection
        """
        LOGGER.debug('Querying PostGIS')

        if resulttype == 'hits':
            with DatabaseConnection(self.conn_dic, self.table,
                                    context="hits") as db:
                cursor = db.conn.cursor(cursor_factory=RealDictCursor)

                where_clause = self.__get_where_clauses(
                    properties=properties, bbox=bbox)
                sql_query = SQL("SELECT COUNT(*) as hits from {} {}").format(
                    Identifier(self.table), where_clause)
                try:
                    cursor.execute(sql_query)
                except Exception as err:
                    LOGGER.error('Error executing sql_query: {}: {}'.format(
                        sql_query.as_string(cursor), err))
                    raise ProviderQueryError() from err

                hits = cursor.fetchone()["hits"]

            return self.__response_feature_hits(hits)

        end_index = startindex + limit

        with DatabaseConnection(self.conn_dic, self.table) as db:
            cursor = db.conn.cursor(cursor_factory=RealDictCursor)

            where_clause = self.__get_where_clauses(
                properties=properties, bbox=bbox)
            sql_query = SQL("DECLARE \"geo_cursor\" CURSOR FOR "
                            "SELECT DISTINCT {},ST_AsGeoJSON({}) "
                            "FROM {}{}").format(
                db.columns, Identifier(self.geom),
                Identifier(self.table), where_clause)

            LOGGER.debug('SQL Query: {}'.format(sql_query.as_string(cursor)))
            LOGGER.debug('Start Index: {}'.format(startindex))
            LOGGER.debug('End Index: {}'.format(end_index))
            try:
                cursor.execute(sql_query)
                if startindex > 0:
                    # MOVE skips rows server side; FETCH would transfer
                    # them to the client only for them to be discarded.
                    cursor.execute(
                        SQL('MOVE FORWARD {} FROM geo_cursor').format(
                            Literal(int(startindex))))
                cursor.execute(
                    SQL('FETCH FORWARD {} FROM geo_cursor').format(
                        Literal(int(limit))))
            except Exception as err:
                LOGGER.error('Error executing sql_query: {}'.format(
                    sql_query.as_string(cursor)))
                LOGGER.error(err)
                raise ProviderQueryError() from err

            row_data = cursor.fetchall()

            return {
                'type': 'FeatureCollection',
                'features': [self.__response_feature(rd) for rd in row_data]
            }

    def get_previous(self, cursor, identifier):
        """
        Query previous ID given current ID

        :param cursor: open cursor producing dict-like rows
        :param identifier: feature id

        :returns: feature id (``identifier`` itself if there is none)
        """
        sql = 'SELECT {} AS id FROM {} WHERE {}<%s ORDER BY {} DESC LIMIT 1'
        cursor.execute(SQL(sql).format(
            Identifier(self.id_field),
            Identifier(self.table),
            Identifier(self.id_field),
            Identifier(self.id_field),
        ), (identifier,))
        item = cursor.fetchall()
        return item[0]['id'] if item else identifier

    def get_next(self, cursor, identifier):
        """
        Query next ID given current ID

        :param cursor: open cursor producing dict-like rows
        :param identifier: feature id

        :returns: feature id (``identifier`` itself if there is none)
        """
        sql = 'SELECT {} AS id FROM {} WHERE {}>%s ORDER BY {} LIMIT 1'
        cursor.execute(SQL(sql).format(
            Identifier(self.id_field),
            Identifier(self.table),
            Identifier(self.id_field),
            Identifier(self.id_field),
        ), (identifier,))
        item = cursor.fetchall()
        return item[0]['id'] if item else identifier

    def get(self, identifier):
        """
        Query the provider for a specific
        feature id e.g: /collections/hotosm_bdi_waterways/items/13990765

        :param identifier: feature id

        :raises ProviderQueryError: if the SQL query fails
        :raises ProviderItemNotFoundError: if no row has that id

        :returns: GeoJSON Feature with extra 'prev'/'next' ids
        """
        LOGGER.debug('Get item from Postgis')
        with DatabaseConnection(self.conn_dic, self.table) as db:
            cursor = db.conn.cursor(cursor_factory=RealDictCursor)

            sql_query = SQL("SELECT {},ST_AsGeoJSON({}) "
                            "from {} WHERE {}=%s").format(
                db.columns, Identifier(self.geom),
                Identifier(self.table), Identifier(self.id_field))

            LOGGER.debug('SQL Query: {}'.format(sql_query.as_string(db.conn)))
            LOGGER.debug('Identifier: {}'.format(identifier))
            try:
                cursor.execute(sql_query, (identifier, ))
            except Exception as err:
                LOGGER.error('Error executing sql_query: {}'.format(
                    sql_query.as_string(cursor)))
                LOGGER.error(err)
                raise ProviderQueryError() from err

            results = cursor.fetchall()
            row_data = results[0] if results else None

            feature = self.__response_feature(row_data)
            if feature:
                feature['prev'] = self.get_previous(cursor, identifier)
                feature['next'] = self.get_next(cursor, identifier)
                return feature

        err = 'item {} not found'.format(identifier)
        LOGGER.error(err)
        raise ProviderItemNotFoundError(err)

    def __response_feature(self, row_data):
        """
        Assembles GeoJSON output from DB query

        :param row_data: DB row result

        :returns: `dict` of GeoJSON Feature, or None for an empty row
        """
        if not row_data:
            return None

        rd = dict(row_data)  # dict() is also needed for PyPy
        feature = {'type': 'Feature'}
        feature["geometry"] = json.loads(rd.pop('st_asgeojson'))
        feature['properties'] = rd
        feature['id'] = feature['properties'].get(self.id_field)
        return feature

    def __response_feature_hits(self, hits):
        """Assembles GeoJSON/Feature number
        e.g: http://localhost:5000/collections/
        hotosm_bdi_waterways/items?resulttype=hits

        :returns: GeoJSON FeaturesCollection
        """
        feature_collection = {"features": [],
                              "type": "FeatureCollection"}
        feature_collection['numberMatched'] = hits
        return feature_collection

    def get_unique_id(self):
        """
        Generate an identifier for a new feature

        Integer id columns get one more than the current maximum (0 for an
        empty table), computed in a single query; any other id type gets a
        random UUID4 string.

        :raises ProviderQueryError: if the SQL query fails

        :returns: new feature id
        """
        id_type = self.get_fields()[self.id_field]
        if 'int' not in id_type:
            return str(uuid.uuid4())

        with DatabaseConnection(self.conn_dic, self.table,
                                context="hits") as db:
            cursor = db.conn.cursor()
            sql_query = SQL('SELECT COALESCE(MAX({}) + 1, 0) FROM {}').format(
                Identifier(self.id_field), Identifier(self.table))
            try:
                cursor.execute(sql_query)
            except Exception as err:
                LOGGER.error('Error executing sql_query: {}: {}'.format(
                    sql_query.as_string(cursor), err))
                raise ProviderQueryError() from err
            return cursor.fetchone()[0]

    def create(self, new_feature):
        """
        Insert a new feature to provider

        Values are passed as SQL literals (never spliced as raw text) and
        columns the feature does not supply are stored as NULL.

        :param new_feature: GeoJSON feature

        :raises ProviderItemAlreadyExistsError: if the feature id is taken
        :raises ProviderSchemaError: if a property is not a table column
        :raises ProviderQueryError: if the INSERT fails

        :returns: feature id
        """
        LOGGER.debug('Inserting item into Postgis')

        properties = {self.__unquote(key): value for key, value
                      in new_feature['properties'].items()}
        geom = json.dumps(new_feature['geometry'])

        # 'is None' rather than truthiness so an id of 0 is honoured
        nfid = new_feature.get('id')
        if nfid is None:
            nfid = properties.get(self.id_field)

        if nfid is not None:
            try:
                self.get(nfid)
            except ProviderItemNotFoundError:
                pass
            else:
                err = 'provider item {} already exists'.format(nfid)
                LOGGER.error(err)
                raise ProviderItemAlreadyExistsError(err)
        else:
            nfid = self.get_unique_id()

        columns = self.__column_names()
        unknown = set(properties).difference(columns)
        if unknown:
            err = 'properties {} not prescent in provider schema'\
                .format(unknown)
            LOGGER.error(err)
            raise ProviderSchemaError(err)

        # Persist the id that is returned to the caller
        properties[self.id_field] = nfid
        names = [name for name in columns if name in properties or
                 name not in properties]  # every column, in schema order
        values = [Literal(properties.get(name)) for name in names]

        sql_query = SQL(
            'INSERT INTO {} ({}, {}) VALUES ({}, ST_GeomFromGeoJSON({}))'
        ).format(
            Identifier(self.table),
            SQL(', ').join([Identifier(name) for name in names]),
            Identifier(self.geom),
            SQL(', ').join(values),
            Literal(geom))

        self.__execute_write(sql_query)
        return nfid

    def replace(self, identifier, feature):
        """
        Replaces an existing feature in provider

        Columns the feature does not supply are set to NULL. The caller's
        feature dict is not modified.

        :param identifier: feature id
        :param feature: GeoJSON feature

        :raises ProviderItemNotFoundError: if ``identifier`` does not exist
        :raises ProviderSchemaError: on an id change or unknown property
        :raises ProviderQueryError: if the UPDATE fails
        """
        LOGGER.debug('Replacing an item in Postgis')

        # raise error if identifier is invalid
        self.get(identifier)

        properties = {self.__unquote(key): value for key, value
                      in feature['properties'].items()}
        geom = json.dumps(feature['geometry'])

        nfid = feature.get('id')
        if nfid is None:
            nfid = properties.get(self.id_field)

        columns = self.__column_names()

        # Path identifiers arrive as strings; compare textually so that an
        # integer id in the body matching the path is not seen as a change.
        if nfid is not None and str(nfid) != str(identifier):
            err = 'cant change key'
            LOGGER.error(err)
            raise ProviderSchemaError(err)

        unknown = set(properties).difference(columns)
        if unknown:
            err = 'properties {} not prescent in provider schema'\
                .format(unknown)
            LOGGER.error(err)
            raise ProviderSchemaError(err)

        assignments = [
            SQL('{} = {}').format(Identifier(name),
                                  Literal(properties.get(name)))
            for name in columns if name != self.id_field]
        assignments.append(SQL('{} = ST_GeomFromGeoJSON({})').format(
            Identifier(self.geom), Literal(geom)))

        sql_query = SQL('UPDATE {} SET {} WHERE {} = {}').format(
            Identifier(self.table), SQL(', ').join(assignments),
            Identifier(self.id_field), Literal(identifier))

        self.__execute_write(sql_query)

    def update(self, identifier, updates):
        """
        Updates an existing feature to provider

        :param identifier: feature id
        :param updates: updates dict; ``updates['modify']`` is a list of
                        ``{'name': ..., 'value': ...}`` items

        :raises ProviderItemNotFoundError: if ``identifier`` does not exist
        :raises ProviderSchemaError: on an id change or unknown property
        :raises ProviderQueryError: if the UPDATE fails

        :returns: GeoJSON feature
        """
        LOGGER.debug('Updating an item in Postgis')

        # raise error if identifier is invalid
        self.get(identifier)

        mods = updates['modify']
        keys = [self.__unquote(item['name']) for item in mods]
        vals = [item['value'] for item in mods]

        columns = self.__column_names()
        new_cols = set(keys)

        if self.id_field in new_cols:
            err = 'cant change key'
            LOGGER.error(err)
            raise ProviderSchemaError(err)

        unknown = new_cols.difference(columns)
        if unknown:
            err = 'properties {} not prescent in provider schema'\
                .format(unknown)
            LOGGER.error(err)
            raise ProviderSchemaError(err)

        # An empty modification list would otherwise yield 'SET  WHERE'
        if keys:
            assignments = [
                SQL('{} = {}').format(Identifier(name), Literal(value))
                for name, value in zip(keys, vals)]
            sql_query = SQL('UPDATE {} SET {} WHERE {} = {}').format(
                Identifier(self.table), SQL(', ').join(assignments),
                Identifier(self.id_field), Literal(identifier))
            self.__execute_write(sql_query)

        return self.get(identifier)

    def delete(self, identifier):
        """
        Delete a feature from provider

        :param identifier: feature id

        :raises ProviderItemNotFoundError: if ``identifier`` does not exist
        :raises ProviderQueryError: if the DELETE fails
        """
        LOGGER.debug('Deleting item from Postgis')

        # raise error if identifier is invalid
        self.get(identifier)

        with DatabaseConnection(self.conn_dic, self.table) as db:
            cursor = db.conn.cursor(cursor_factory=RealDictCursor)
            # Identifier() quotes table/column names safely
            sql_query = SQL("DELETE FROM {} WHERE {}=%s").format(
                Identifier(self.table), Identifier(self.id_field))

            LOGGER.debug('SQL Query: {}'.format(sql_query.as_string(db.conn)))
            LOGGER.debug('Identifier: {}'.format(identifier))
            try:
                cursor.execute(sql_query, (identifier, ))
                db.conn.commit()
            except Exception as err:
                LOGGER.error('Error executing sql_query: {}'.format(
                    sql_query.as_string(cursor)))
                LOGGER.error(err)
                raise ProviderQueryError() from err