Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/source/publishing/ogcapi-features.rst
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,18 @@ Must have PostGIS installed.
geom_field: foo_geom
count: true # Optional; Default true; Enable/disable count for improved performance.

This can be represented as a connection dictionary or as a connection string as follows:

.. code-block:: yaml

providers:
- type: feature
name: PostgreSQL
data: postgresql://postgres:postgres@127.0.0.1:3010/test
id_field: osm_id
table: hotosm_bdi_waterways
geom_field: foo_geom

A number of database connection options can be also configured in the provider in order to adjust properly the sqlalchemy engine client.
These are optional and if not specified, the default from the engine will be used. Please see also `SQLAlchemy docs <https://docs.sqlalchemy.org/en/14/core/engines.html#custom-dbapi-connect-arguments-on-connect-routines>`_.

Expand Down
45 changes: 18 additions & 27 deletions pygeoapi/process/manager/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
from typing import Any, Tuple

from sqlalchemy import insert, update, delete
from sqlalchemy.engine import make_url
from sqlalchemy.orm import Session

from pygeoapi.api import FORMAT_TYPES, F_JSON, F_JSONLD
Expand All @@ -56,7 +55,9 @@
ProcessorGenericError
)
from pygeoapi.process.manager.base import BaseManager
from pygeoapi.provider.sql import get_engine, get_table_model
from pygeoapi.provider.sql import (
get_engine, get_table_model, store_db_parameters
)
from pygeoapi.util import JobStatus


Expand All @@ -66,13 +67,15 @@
class PostgreSQLManager(BaseManager):
"""PostgreSQL Manager"""

default_port = 5432

def __init__(self, manager_def: dict):
"""
Initialize object

:param manager_def: manager definition

:returns: `pygeoapi.process.manager.postgresqs.PostgreSQLManager`
:returns: `pygeoapi.process.manager.postgresql.PostgreSQLManager`
"""

super().__init__(manager_def)
Expand All @@ -81,30 +84,18 @@ def __init__(self, manager_def: dict):
self.supports_subscribing = True
self.connection = manager_def['connection']

try:
self.db_search_path = tuple(self.connection.get('search_path',
['public']))
except Exception:
self.db_search_path = ('public',)

try:
LOGGER.debug('Connecting to database')
if isinstance(self.connection, str):
_url = make_url(self.connection)
self._engine = get_engine(
'postgresql+psycopg2',
_url.host,
_url.port,
_url.database,
_url.username,
_url.password)
else:
self._engine = get_engine('postgresql+psycopg2',
**self.connection)
except Exception as err:
msg = 'Test connecting to DB failed'
LOGGER.error(f'{msg}: {err}')
raise ProcessorGenericError(msg)
options = manager_def.get('options', {})
store_db_parameters(self, manager_def['connection'], options)
self._engine = get_engine(
'postgresql+psycopg2',
self.db_host,
self.db_port,
self.db_name,
self.db_user,
self._db_password,
self.db_conn,
**self.db_options
)

try:
LOGGER.debug('Getting table model')
Expand Down
139 changes: 91 additions & 48 deletions pygeoapi/provider/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,12 @@
#
# =================================================================

# Testing local postgis with docker:
# docker run --name "postgis" \
# -v postgres_data:/var/lib/postgresql -p 5432:5432 \
# -e ALLOW_IP_RANGE=0.0.0.0/0 \
# -e POSTGRES_USER=postgres \
# -e POSTGRES_PASS=postgres \
# -e POSTGRES_DBNAME=test \
# -d -t kartoza/postgis

# Import dump:
# gunzip < tests/data/hotosm_bdi_waterways.sql.gz |
# psql -U postgres -h 127.0.0.1 -p 5432 test

from copy import deepcopy
from datetime import datetime
from decimal import Decimal
import functools
import logging
from typing import Optional
from typing import Optional, Any

from geoalchemy2 import Geometry # noqa - this isn't used explicitly but is needed to process Geometry columns
from geoalchemy2.functions import ST_MakeEnvelope, ST_Intersects
Expand All @@ -73,7 +60,7 @@
desc,
delete
)
from sqlalchemy.engine import URL
from sqlalchemy.engine import URL, Engine
from sqlalchemy.exc import (
ConstraintColumnNotFoundError,
InvalidRequestError,
Expand All @@ -82,6 +69,7 @@
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session, load_only
from sqlalchemy.sql.expression import and_
from sqlalchemy.schema import Table

from pygeoapi.crs import get_transform_from_spec, get_srid
from pygeoapi.provider.base import (
Expand Down Expand Up @@ -135,22 +123,22 @@ def __init__(
LOGGER.debug(f'Configured Storage CRS: {self.storage_crs}')

# Read table information from database
options = provider_def.get('options', {})
self._store_db_parameters(provider_def['data'], options)
options = provider_def.get('options', {}) | extra_conn_args
store_db_parameters(self, provider_def['data'], options)
self._engine = get_engine(
driver_name,
self.db_host,
self.db_port,
self.db_name,
self.db_user,
self._db_password,
**self.db_options | extra_conn_args
self.db_conn,
**self.db_options
)
self.table_model = get_table_model(
self.table, self.id_field, self.db_search_path, self._engine
)

LOGGER.debug(f'DB connection: {repr(self._engine.url)}')
self.get_fields()

def query(
Expand Down Expand Up @@ -426,22 +414,6 @@ def delete(self, identifier):

return result.rowcount > 0

def _store_db_parameters(self, parameters, options):
self.db_user = parameters.get('user')
self.db_host = parameters.get('host')
self.db_port = parameters.get('port', self.default_port)
self.db_name = parameters.get('dbname')
# db_search_path gets converted to a tuple here in order to ensure it
# is hashable - which allows us to use functools.cache() when
# reflecting the table definition from the DB
self.db_search_path = tuple(parameters.get('search_path', ['public']))
self._db_password = parameters.get('password')
self.db_options = {
k: v
for k, v in options.items()
if not isinstance(v, dict)
}

def _sqlalchemy_to_feature(self, item, crs_transform_out=None,
select_properties=[]):
"""
Expand Down Expand Up @@ -602,6 +574,48 @@ def _select_properties_clause(self, select_properties, skip_geometry):
return selected_properties_clause


def store_db_parameters(
self: GenericSQLProvider | Any,
connection_data: str | dict[str],
options: dict[str, str]
) -> None:
"""
Store database connection parameters

:self: instance of provider or manager class
:param connection_data: connection string or dict of connection params
:param options: additional connection options

:returns: None
"""
if isinstance(connection_data, str):
self.db_conn = connection_data
connection_data = {}
else:
self.db_conn = None
# OR
self.db_user = connection_data.get('user')
self.db_host = connection_data.get('host')
self.db_port = connection_data.get('port', self.default_port)
self.db_name = (
connection_data.get('dbname') or connection_data.get('database')
)
self.db_query = connection_data.get('query')
self._db_password = connection_data.get('password')
# db_search_path gets converted to a tuple here in order to ensure it
# is hashable - which allows us to use functools.cache() when
# reflecting the table definition from the DB
self.db_search_path = tuple(
connection_data.get('search_path') or
options.pop('search_path', ['public'])
)
self.db_options = {
k: v
for k, v in options.items()
if not isinstance(v, dict)
}


@functools.cache
def get_engine(
driver_name: str,
Expand All @@ -610,20 +624,38 @@ def get_engine(
database: str,
user: str,
password: str,
conn_str: Optional[str] = None,
**connect_args
):
"""Create SQL Alchemy engine."""
conn_str = URL.create(
drivername=driver_name,
username=user,
password=password,
host=host,
port=int(port),
database=database
)
) -> Engine:
"""
Get SQL Alchemy engine.

:param driver_name: database driver name
:param host: database host
:param port: database port
:param database: database name
:param user: database user
:param password: database password
:param conn_str: optional connection URL
:param connect_args: custom connection arguments to pass to create_engine()

:returns: SQL Alchemy engine
"""
if conn_str is None:
conn_str = URL.create(
drivername=driver_name,
username=user,
password=password,
host=host,
port=int(port),
database=database
)

engine = create_engine(
conn_str, connect_args=connect_args, pool_pre_ping=True
)

LOGGER.debug(f'Created engine for {repr(engine.url)}.')
return engine


Expand All @@ -632,14 +664,25 @@ def get_table_model(
table_name: str,
id_field: str,
db_search_path: tuple[str],
engine
):
"""Reflect table."""
engine: Engine
) -> Table:
"""
Reflect table using SQLAlchemy Automap.

:param table_name: name of table to reflect
:param id_field: name of primary key field
:param db_search_path: tuple of database schemas to search for the table
:param engine: SQLAlchemy engine to use for reflection

:returns: SQLAlchemy model of the reflected table
"""
LOGGER.debug('Reflecting table definition from database')
metadata = MetaData()

# Look for table in the first schema in the search path
schema = db_search_path[0]
try:
LOGGER.debug(f'Looking for table {table_name} in schema {schema}')
metadata.reflect(
bind=engine, schema=schema, only=[table_name], views=True
)
Expand Down
Loading