diff --git a/docs/conf.py b/docs/conf.py index f3e3e1f29..aca8b30e5 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -63,7 +63,7 @@ templates_path = ['_templates'] # The suffix of source filenames. -source_suffix = '.rst' +source_suffix = {'.rst': 'restructuredtext'} locale_dirs = ['locale/'] # path is example but recommended. diff --git a/docs/pubsub.rst b/docs/pubsub.rst index 4e12a40b1..dafbeb8d2 100644 --- a/docs/pubsub.rst +++ b/docs/pubsub.rst @@ -27,10 +27,30 @@ Example directive: broker: type: mqtt url: mqtt://localhost:1883 + channel: messages/a/data # optional + +HTTP +---- + +Example directive: + +.. code-block:: yaml + + pubsub: + broker: + type: http + url: https://ntfy.sh + channel: messages/a/data # optional .. note:: - For MQTT endpoints requiring authentication, encode the ``url`` value as follows: ``mqtt://username:password@localhost:1883`` + For any Pub/Sub endpoints requiring authentication, encode the ``url`` value as follows: + + * ``mqtt://username:password@localhost:1883`` + * ``https://username:password@localhost`` + +.. note:: + If no ``channel`` is defined, the relevant OGC API endpoint is used. .. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html diff --git a/pycsw/broker/__init__.py b/pycsw/broker/__init__.py index f7e2dc7a2..c0efd2ff7 100644 --- a/pycsw/broker/__init__.py +++ b/pycsw/broker/__init__.py @@ -48,5 +48,6 @@ def load_client(def_: dict) -> BasePubSubClient: CLIENTS = { - 'mqtt': 'pycsw.broker.mqtt.MQTTPubSubClient' + 'mqtt': 'pycsw.broker.mqtt.MQTTPubSubClient', + 'http': 'pycsw.broker.http.HTTPPubSubClient' } diff --git a/pycsw/broker/base.py b/pycsw/broker/base.py index 763eb878a..35ec0a7be 100644 --- a/pycsw/broker/base.py +++ b/pycsw/broker/base.py @@ -50,6 +50,7 @@ def __init__(self, publisher_def: dict): self.type = None self.client_id = f'pycsw-pubsub-{random.randint(0, 1000)}' + self.channel = publisher_def.get('channel') self.show_link = publisher_def.get('show_link', True) self.broker = publisher_def['url'] diff --git a/pycsw/broker/http.py b/pycsw/broker/http.py new file mode 100644 index 000000000..9524f8a86 --- /dev/null +++ b/pycsw/broker/http.py @@ -0,0 +1,100 @@ +# ================================================================= +# +# Authors: Tom Kralidis +# Angelos Tzotsos +# +# Copyright (c) 2025 Tom Kralidis +# Copyright (c) 2025 Angelos Tzotsos +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, +# copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the +# Software is furnished to do so, subject to the following +# conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# +# ================================================================= + +import logging + +import requests + +from pycsw.broker.base import BasePubSubClient + +LOGGER = logging.getLogger(__name__) + + +class HTTPPubSubClient(BasePubSubClient): + """HTTP client""" + + def __init__(self, broker_url): + """ + Initialize object + + :param publisher_def: provider definition + + :returns: pycsw.pubsub.http.HTTPPubSubClient + """ + + super().__init__(broker_url) + self.type = 'http' + self.auth = None + + msg = f'Initializing to broker {self.broker_safe_url} with id {self.client_id}' # noqa + LOGGER.debug(msg) + + if None not in [self.broker_url.username, self.broker_url.password]: + LOGGER.debug('Setting credentials') + self.auth = ( + self.broker_url.username, + self.broker_url.password + ) + + def connect(self) -> None: + """ + Connect to an HTTP broker + + :returns: None + """ + + LOGGER.debug('No connection to HTTP') + pass + + def pub(self, channel: str, message: str, qos: int = 1) -> bool: + """ + Publish a message to a broker/channel + + :param channel: `str` of topic + :param message: `str` of message + + :returns: `bool` of publish result + """ + + LOGGER.debug(f'Publishing to broker {self.broker_safe_url}') + LOGGER.debug(f'Channel: {channel}') + LOGGER.debug(f'Message: {message}') + + url = f'{self.broker}/{channel}' + + try: + response = requests.post(url, auth=self.auth, json=message) + response.raise_for_status() + except Exception as err: + LOGGER.debug(f'Message publishing failed: {err}') + + def __repr__(self): + return f' {self.broker_safe_url}' diff --git a/pycsw/ogc/pubsub/__init__.py b/pycsw/ogc/pubsub/__init__.py index 7ef7778cd..f3de6243d 100644 --- a/pycsw/ogc/pubsub/__init__.py +++ b/pycsw/ogc/pubsub/__init__.py @@ -48,7 +48,7 @@ def publish_message(pubsub_client, action: str, collection: str = None, :returns: `bool` of whether message publishing was successful """ - channel = f'collections/{collection}' + channel = pubsub_client.channel or f'collections/{collection}' type_ = f'org.ogc.api.collection.item.{action}' if action in ['create', 'update']: