Source code for pygeoapi.provider.csv_

# =================================================================
#
# Authors: Tom Kralidis <tomkralidis@gmail.com>
#
# Copyright (c) 2018 Tom Kralidis
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
#
# =================================================================

from collections import OrderedDict
import csv
import itertools
import logging
import uuid
import copy

from pygeoapi.provider.base import (BaseProvider, ProviderQueryError,
                                    ProviderItemNotFoundError,
                                    ProviderSchemaError,
                                    ProviderItemAlreadyExistsError)

LOGGER = logging.getLogger(__name__)


[docs]class CSVProvider(BaseProvider): """CSV provider""" def __init__(self, provider_def): """ Initialize object :param provider_def: provider definition :returns: pygeoapi.providers.csv_.CSVProvider """ BaseProvider.__init__(self, provider_def) self.geometry_x = provider_def['geometry']['x_field'] self.geometry_y = provider_def['geometry']['y_field'] self.fields = self.get_fields() def stc(self, s): if s == 'string': return str if s == 'integer': return int if s == 'number': return float def assign_type(self, k, v): if v != '': v = self.stc(self.fields[k])(v) return v def count(self): with open(self.data) as ff: totalrows = 0 data_ = csv.DictReader(ff) for row in data_: totalrows += 1 return totalrows
[docs] def get_fields(self): """ Get provider field information (names, types) :returns: dict of fields """ with open(self.data) as ff: LOGGER.debug('Serializing DictReader') data_ = csv.DictReader(ff) fields = {} samp_data = {} for row in data_: for key in row: if key not in samp_data and row[key] != '': samp_data[key] = row[key] for key in samp_data: try: value = samp_data[key] samp_data[key] = float(value) if samp_data[key]-int(samp_data[key]) == 0: samp_data[key] = int(value) except ValueError: samp_data[key] = str(value) fields[key] = type(samp_data[key]).__name__ fields[self.geometry_x] = float.__name__ fields[self.geometry_y] = float.__name__ # convert python types to openapidoc types for field in fields: if fields[field] == 'int': fields[field] = 'integer' elif fields[field] == 'float': fields[field] = 'number' elif fields[field] == 'str': fields[field] = 'string' return fields
[docs] def _load(self, startindex=0, limit=10, resulttype='results', identifier=None, bbox=[], datetime=None, properties=[]): """ Load CSV data :param startindex: starting record to return (default 0) :param limit: number of records to return (default 10) :param resulttype: return results or hit limit (default results) :param properties: list of tuples (name, value) :returns: dict of GeoJSON FeatureCollection """ found = False result = None feature_collection = { 'type': 'FeatureCollection', 'features': [] } with open(self.data) as ff: LOGGER.debug('Serializing DictReader') data_ = csv.DictReader(ff) if resulttype == 'hits': LOGGER.debug('Returning hits only') feature_collection['numberMatched'] = len(list(data_)) return feature_collection LOGGER.debug('Slicing CSV rows') for row in itertools.islice(data_, startindex, startindex+limit): for key in row: if row[key] != '': row[key] = self.assign_type(key, row[key]) feature = {'type': 'Feature'} feature[self.id_field] = row.pop(self.id_field) feature['geometry'] = { 'type': 'Point', 'coordinates': [ float(row.pop(self.geometry_x)), float(row.pop(self.geometry_y)) ] } if self.properties: feature['properties'] = OrderedDict() for p in self.properties: try: feature['properties'][p] = row[p] except KeyError as err: LOGGER.error(err) raise ProviderQueryError() else: feature['properties'] = dict() for prop in row: feature['properties'][prop] = row[prop] if identifier is not None and \ feature[self.id_field] == \ self.assign_type(self.id_field, identifier): found = True result = feature feature_collection['features'].append(feature) feature_collection['numberMatched'] = \ len(feature_collection['features']) if identifier is not None and not found: return None elif identifier is not None and found: return result feature_collection['numberReturned'] = len( feature_collection['features']) return feature_collection
def get_unused_int_id(self): feats = self._load(limit=self.count())['features'] ids = set(map(int, set([feat[self.id_field] for feat in feats]))) id = 0 while True: if id not in ids: return id id = id + 1
[docs] def query(self, startindex=0, limit=10, resulttype='results', bbox=[], datetime=None, properties=[], sortby=[]): """ CSV query :param startindex: starting record to return (default 0) :param limit: number of records to return (default 10) :param resulttype: return results or hit limit (default results) :param bbox: bounding box [minx,miny,maxx,maxy] :param datetime: temporal (datestamp or extent) :param properties: list of tuples (name, value) :param sortby: list of dicts (property, order) :returns: dict of GeoJSON FeatureCollection """ return self._load(startindex, limit, resulttype)
[docs] def get(self, identifier): """ query CSV id :param identifier: feature id :returns: dict of single GeoJSON feature """ item = self._load(identifier=identifier) if item: for prop in item['properties']: if item['properties'][prop] == '': item['properties'][prop] = None return item else: err = 'item {} not found'.format(identifier) LOGGER.error(err) raise ProviderItemNotFoundError(err)
[docs] def create(self, new_feature): """ create a new feature item :param new_feature: new GeoJSON feature dictionary :returns: feature id """ id_field = self.id_field with open(self.data) as csv_file: curr = [{k: self.assign_type(k, v) for k, v in row.items()} for row in csv.DictReader(csv_file, skipinitialspace=True)] curr_cols = set(curr[0].keys()) - {id_field} new_cols = set(new_feature['properties'].keys()).\ union({'lat', 'long'}) - {id_field} # copy properties of the given data feature = copy.deepcopy(new_feature['properties']) # copy latitude and longitude of given data feature['lat'] = new_feature['geometry']['coordinates'][1] feature['long'] = new_feature['geometry']['coordinates'][0] # initialize id field feature[id_field] = None # set id if present in request if id_field in new_feature: feature[id_field] = new_feature[id_field] elif id_field in new_feature['properties']: feature[id_field] = new_feature['properties'][id_field] # check if id already exist if feature[id_field] is not None: for f in curr: if f[id_field] == self.\ assign_type(id_field, feature[id_field]): err = 'provider item {} already exists'\ .format(feature[id_field]) LOGGER.error(err) raise ProviderItemAlreadyExistsError(err) else: # set a randomly generated id samp_feat = self.query()['features'][0] try: int(samp_feat[id_field]) feature[id_field] = self.get_unused_int_id() except ValueError: feature[id_field] = str(uuid.uuid4()) # if given data has extra properties not in schema if bool(new_cols - curr_cols): err = 'properties {} not prescent in provider schema'\ .format(new_cols - curr_cols) LOGGER.error(err) raise ProviderSchemaError(err) # append given data to provider data items curr.append(feature) # writing new set of data items to csv file try: with open(self.data, 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=curr[0].keys()) writer.writeheader() for data in curr: writer.writerow(data) except IOError: LOGGER.error("I/O error") # return id of the new data item added return feature[id_field]
[docs] def replace(self, identifier, new_feature): """ replace an existing feature item with new_feature item :param identifier: feature id :param new_feature: new GeoJSON feature dictionary """ id_field = self.id_field with open(self.data) as csv_file: curr = [{k: self.assign_type(k, v) for k, v in row.items()} for row in csv.DictReader(csv_file, skipinitialspace=True)] curr_cols = set(curr[0].keys()) - {id_field} new_cols = set(new_feature['properties'].keys())\ .union({'lat', 'long'}) - {id_field} index = -1 for i, f in enumerate(curr): if f[id_field] == self.assign_type(id_field, identifier): index = i # if given identifier is not prescent in the data provider if index == -1: err = 'item {} not found'.format(identifier) LOGGER.error(err) raise ProviderItemNotFoundError(err) # if given data item has extra properties not in schema if bool(new_cols - curr_cols): err = 'properties {} not prescent in provider schema'\ .format(new_cols - curr_cols) LOGGER.error(err) raise ProviderSchemaError(err) # copy properties of the given data feature = new_feature['properties'] for k in feature: feature[k] = feature[k] # copy latitude and longitude of given data feature['lat'] = new_feature['geometry']['coordinates'][1] feature['long'] = new_feature['geometry']['coordinates'][0] # set id of given data feature[id_field] = identifier # set empty for remaining attributes for attrib in curr_cols - new_cols: feature[attrib] = '' # replace the data items curr[index] = feature # clean up empty attributes remove_set = set() for attrib in curr[0].keys(): empt = True for feature in curr: if feature[attrib] != '': empt = False break if empt: remove_set.add(attrib) for attrib in remove_set: for feature in curr: feature.pop(attrib) new_fields = set(curr[0].keys())-remove_set # writing new set of data items to csv file try: with open(self.data, 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=new_fields) writer.writeheader() for data in curr: writer.writerow(data) except IOError: LOGGER.error("I/O error") self.fields = self.get_fields()
[docs] def update(self, identifier, updates): """ update an existing feature item :param identifier: feature id :param updates: updates dictionary :returns: feature item """ id_field = self.id_field with open(self.data) as csv_file: curr = [{k: self.assign_type(k, v) for k, v in row.items()} for row in csv.DictReader(csv_file, skipinitialspace=True)] found_feature = False for index, feature in enumerate(curr): if feature[id_field] == self.assign_type(id_field, identifier): found_feature = True break if not found_feature: err = 'item {} not found'.format(identifier) LOGGER.error(err) raise ProviderItemNotFoundError(err) else: add_set = set() if 'add' in updates: # add the attribute if its not in the feature for name_val_pair in updates['add']: name = name_val_pair['name'] value = name_val_pair['value'] add_set.add(name) if name not in feature: feature[name] = value else: err = 'property {} already exists'.format(name) LOGGER.error(err) raise ProviderSchemaError(err) if 'modify' in updates: # modify an attribute only if its already prescent for name_val_pair in updates['modify']: name = name_val_pair['name'] value = name_val_pair['value'] if name in feature and name not in {id_field, 'lat', 'long'}: feature[name] = value else: err = 'property {} already exists'.format(name) LOGGER.error(err) raise ProviderSchemaError(err) if 'remove' in updates: # delete an attribute only if its prescent for name in updates['remove']: if name in feature: feature[name] = '' else: err = 'property {} already exists for given provider \ item'.format(name) LOGGER.error(err) raise ProviderSchemaError(err) curr[index] = feature # clean up empty attributes remove_set = set() for attrib in curr[0].keys(): empt = True for feature in curr: if feature[attrib] != '': empt = False break if empt: remove_set.add(attrib) for attrib in remove_set: for feature in curr: feature.pop(attrib) # writing new set of data items to csv file new_fields = set(curr[0].keys()).union(add_set)-remove_set try: with open(self.data, 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=new_fields) writer.writeheader() for data in curr: writer.writerow(data) except IOError: LOGGER.error("I/O error") self.fields = self.get_fields() return self.get(identifier)
[docs] def delete(self, identifier): """ deletes an existing feature item :param identifier: feature id """ id_field = self.id_field with open(self.data) as csv_file: curr = [{k: self.assign_type(k, v) for k, v in row.items()} for row in csv.DictReader(csv_file, skipinitialspace=True)] found_feature = False for index, feature in enumerate(curr): if feature[id_field] == self.assign_type(id_field, identifier): found_feature = True break if not found_feature: err = 'item {} not found'.format(identifier) LOGGER.error(err) raise ProviderItemNotFoundError(err) else: curr.pop(index) # clean up empty attributes remove_set = set() for attrib in curr[0].keys(): empt = True for feature in curr: if feature[attrib] != '': empt = False break if empt: remove_set.add(attrib) for attrib in remove_set: for feature in curr: feature.pop(attrib) new_fields = set(curr[0].keys())-remove_set try: with open(self.data, 'w') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=new_fields) writer.writeheader() for data in curr: writer.writerow(data) except IOError: LOGGER.error("I/O error") self.fields = self.get_fields()
def __repr__(self): return '<CSVProvider> {}'.format(self.data)