diff --git a/.gitignore b/.gitignore index 5a25724..dbf3afb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ build/ opensips.egg-info/ +__pycache__/ \ No newline at end of file diff --git a/README.md b/README.md index 0f24e49..4a0b5d5 100644 --- a/README.md +++ b/README.md @@ -18,11 +18,17 @@ Currently, the following packages are available: pip install . ``` + or from PyPI: + + ```bash + pip install opensips + ``` + 2. Import the package in your Python code: ```python from opensips.mi import OpenSIPSMI, OpenSIPSMIException - from opensips.event import OpenSIPSEvent, OpenSIPSEventException + from opensips.event import OpenSIPSEvent, OpenSIPSEventException, OpenSIPSEventHandler ``` 3. Use the methods provided by the modules: @@ -38,19 +44,20 @@ Currently, the following packages are available: ```python mi_connector = OpenSIPSMI('http', url='http://localhost:8888/mi') - event = OpenSIPSEvent(mi_connector, 'datagram', ip='127.0.0.1', port=50012) + hdl = OpenSIPSEventHandler(mi_connector, 'datagram', ip='127.0.0.1', port=50012) def some_callback(message): # do something with the message pass - + + ev: OpenSIPSEvent = None try: - event.subscribe('E_PIKE_BLOCKED', some_callback) + event = hdl.subscribe('E_PIKE_BLOCKED', some_callback) except OpenSIPSEventException as e: # handle the exception try: - event.unsubscribe('E_PIKE_BLOCKED') + ev.unsubscribe('E_PIKE_BLOCKED') except OpenSIPSEventException as e: # handle the exception ``` diff --git a/docs/event.md b/docs/event.md index 3ad9a85..8c03e33 100644 --- a/docs/event.md +++ b/docs/event.md @@ -1,28 +1,45 @@ # OpenSIPS Python Packages - Event Interface -This package can be used to subscribe to OpenSIPS Event Interface events. +This package can be used to subscribe to OpenSIPS events. ## Supported backend protocols The following event transport protocols are supported: -* `datagram` - uses either UDP or UNIX datagram to receive notifications for subscribed events. If using UDP, the `ip` and `port` parameters are required. If using UNIX datagram, the `socket_path` parameter is required. -* `stream` - uses TCP to communicate with the Event Interface. Requires the `ip` and `port` parameters to be set. +* `datagram` (Default) - uses either UDP or UNIX datagram to receive notifications for subscribed events. By default, the UDP protocol is used with the `ip` and `port` parameters set to `0.0.0.0` and `0` (any available port) respectively, but you can tune them to your needs. +To use the UNIX datagram, set the `socket_path` parameter. +* `stream` - uses TCP to communicate with the Event Interface. Default values for `ip` and `port` are `0.0.0.0` and `0` (any available port) respectively, but you can change them as needed. ## How to use -To instantiate the `OpenSIPSEvent` class, you need to provide a MI connector, the backend protocol and the required parameters in a key-value format. Then `subscribe` and `unsubscribe` methods can be used to manage the subscriptions. +To subscribe to events, you must instantiate an `OpenSIPSEventHandler`. This class can be used to subscribe and unsubscribe from events. It uses an `OpenSIPSMI` object to communicate with the OpenSIPS MI interface. By default, a MI connector is created with the `fifo` type, but you can set it as a parameter in the constructor. As said before, the default transport protocol is `datagram`, but you can change it by setting the `_type` parameter. + +Next step is to create an `OpenSIPSEvent` object. This can be done by calling the `subscribe` method of the `OpenSIPSEventHandler` object or by creating an `OpenSIPSEvent` object directly. The `subscribe` method will return an `OpenSIPSEvent` object. + +To unsubscribe from an event, you can call the `unsubscribe` method of the `OpenSIPSEvent` object or the `unsubscribe` method of the `OpenSIPSEventHandler` object. ```python +from opensips.mi import OpenSIPSMI, OpenSIPSMIException +from opensips.event import OpenSIPSEvent, OpenSIPSEventException + +# simple way +hdl = OpenSIPSEventHandler() + +# tuned way mi_connector = OpenSIPSMI('http', url='http://localhost:8888/mi') -event = OpenSIPSEvent(mi_connector, 'datagram', ip='127.0.0.1', port=50012) +hdl = OpenSIPSEventHandler(mi_connector, 'datagram', ip='127.0.0.1', port=50012) try: - event.subscribe('E_PIKE_BLOCKED', some_callback) + ev = hdl.subscribe('E_PIKE_BLOCKED', some_callback) except OpenSIPSEventException as e: # handle the exception +# or create an OpenSIPSEvent object directly +ev = OpenSIPSEvent(hdl, 'E_PIKE_BLOCKED', some_callback) + try: - event.unsubscribe('E_PIKE_BLOCKED') + ev.unsubscribe() + # or + hdl.unsubscribe('E_PIKE_BLOCKED') except OpenSIPSEventException as e: # handle the exception ``` diff --git a/opensips/event/__init__.py b/opensips/event/__init__.py index 9006484..682b9dd 100644 --- a/opensips/event/__init__.py +++ b/opensips/event/__init__.py @@ -19,4 +19,5 @@ """ Event package of OpenSIPS """ -from .subscriber import OpenSIPSEvent, OpenSIPSEventException +from .event import OpenSIPSEvent, OpenSIPSEventException +from .handler import OpenSIPSEventHandler diff --git a/opensips/event/__main__.py b/opensips/event/__main__.py index 15d3a31..8d02305 100644 --- a/opensips/event/__main__.py +++ b/opensips/event/__main__.py @@ -25,7 +25,7 @@ import signal import argparse from opensips.mi import OpenSIPSMI -from opensips.event import OpenSIPSEvent, OpenSIPSEventException +from opensips.event import OpenSIPSEventHandler, OpenSIPSEventException parser = argparse.ArgumentParser() @@ -100,10 +100,10 @@ def main(): elif args.type == 'datagram': mi = OpenSIPSMI('datagram', datagram_ip=args.ip, datagram_port=args.port) else: - print(f'Unknownt type: {args.type}') + print(f'Unknown type: {args.type}') sys.exit(1) - ev = OpenSIPSEvent(mi, args.transport, ip=args.listen_ip, port=args.listen_port) + hdl = OpenSIPSEventHandler(mi, args.transport, ip=args.listen_ip, port=args.listen_port) def event_handler(message): """ Event handler callback """ @@ -113,9 +113,11 @@ def event_handler(message): except json.JSONDecodeError as e: print(f"Failed to decode JSON: {e}") + ev = None + def timer(*_): """ Timer to notify when the event expires """ - ev.unsubscribe(args.event) + ev.unsubscribe() sys.exit(0) # successful if args.expire: @@ -126,7 +128,7 @@ def timer(*_): signal.signal(signal.SIGTERM, timer) try: - ev.subscribe(args.event, event_handler, expire=args.expire) + ev = hdl.subscribe(args.event, event_handler, args.expire) except OpenSIPSEventException as e: print(e) sys.exit(1) diff --git a/opensips/event/datagram.py b/opensips/event/datagram.py index 751cbb3..e38264a 100644 --- a/opensips/event/datagram.py +++ b/opensips/event/datagram.py @@ -33,11 +33,9 @@ def __init__(self, **kwargs): if "unix_path" in kwargs: self.sock_name = kwargs["unix_path"] - elif "ip" in kwargs and "port" in kwargs: - self.ip = kwargs["ip"] - self.port = int(kwargs["port"]) else: - raise ValueError("ip and port or unix_path is required for Datagram connector") + self.ip = kwargs.get("ip", "0.0.0.0") + self.port = int(kwargs.get("port", 0)) def create(self): if self.ip is not None: diff --git a/opensips/event/subscriber.py b/opensips/event/event.py similarity index 57% rename from opensips/event/subscriber.py rename to opensips/event/event.py index 9d90594..af7b6dc 100644 --- a/opensips/event/subscriber.py +++ b/opensips/event/event.py @@ -21,9 +21,7 @@ """ Module that implements OpenSIPS Event behavior """ from threading import Thread, Event -from ..mi import OpenSIPSMI, OpenSIPSMIException -from .datagram import Datagram -from .stream import Stream +from ..mi import OpenSIPSMIException class OpenSIPSEventException(Exception): """ Exceptions generated by OpenSIPS Events """ @@ -32,19 +30,26 @@ class OpenSIPSEvent(): """ Implementation of the OpenSIPS Event """ - def __init__(self, mi: OpenSIPSMI, _type: str, **kwargs): - self.mi = mi - self.kwargs = kwargs + def __init__(self, handler, name: str, callback, expire=None): + self._handler = handler + self.name = name + self.callback = callback self.thread = None self.thread_stop = Event() self.thread_stop.clear() - if _type == "datagram": - self.socket = Datagram(**kwargs) - elif _type == "stream": - self.socket = Stream(**kwargs) - else: - raise ValueError("Invalid event type") + try: + self.socket = self._handler.__new_socket__() + self._handler.__mi_subscribe__(self.name, self.socket.create(), expire) + self._handler.events[self.name] = self + self.thread = Thread(target=self.handle, args=(callback,)) + self.thread.start() + except OpenSIPSEventException as e: + raise e + except OpenSIPSMIException as e: + raise e + except ValueError as e: + raise OpenSIPSEventException("Invalid arguments for socket creation: {}".format(e)) def handle(self, callback): """ Handles the event callbacks """ @@ -53,36 +58,14 @@ def handle(self, callback): if data: callback(data) - def subscribe(self, event: str, callback, expire=None): - """ Subscribes for an event """ + def unsubscribe(self): + """ Unsubscribes the event """ try: - sock_name = self.socket.create() - if expire is None: - ret_val = self.mi.execute("event_subscribe", [event, sock_name]) - else: - ret_val = self.mi.execute("event_subscribe", [event, sock_name, expire]) - - if ret_val != "OK": - raise OpenSIPSEventException("Failed to subscribe to event") - - self.thread = Thread(target=self.handle, args=(callback,)) - self.thread.start() - - except OpenSIPSMIException as e: - raise e - except Exception as e: - raise OpenSIPSEventException(f"Failed to subscribe to event: {e}") from e - - def unsubscribe(self, event: str): - """ Unsubscribes for an event """ - try: - ret_val = self.mi.execute("event_subscribe", [event, self.socket.sock_name, 0]) - - if ret_val != "OK": - raise OpenSIPSEventException("Failed to unsubscribe from event") - + self._handler.__mi_unsubscribe__(self.name, self.socket.sock_name) self.stop() - + del self._handler.events[self.name] + except OpenSIPSEventException as e: + raise e except OpenSIPSMIException as e: raise e diff --git a/opensips/event/handler.py b/opensips/event/handler.py new file mode 100644 index 0000000..12ff69e --- /dev/null +++ b/opensips/event/handler.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +## +## This file is part of the OpenSIPS Python Package +## (see https://github.com/OpenSIPS/python-opensips). +## +## This program is free software: you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation, either version 3 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program. If not, see . +## + +""" Module that implements an OpenSIPS Event Handler to manage events subscriptions """ + +from ..mi import OpenSIPSMI, OpenSIPSMIException +from .event import OpenSIPSEvent, OpenSIPSEventException +from .datagram import Datagram +from .stream import Stream + +class OpenSIPSEventHandler(): + + """ Implementation of the OpenSIPS Event Handler""" + + def __init__(self, mi: OpenSIPSMI = None, _type: str = None, **kwargs): + if mi: + self.mi = mi + else: + self.mi = OpenSIPSMI() + if _type: + self._type = _type + else: + self._type = "datagram" + self.kwargs = kwargs + self.events = {str: OpenSIPSEvent} + + def __new_socket__(self): + if self._type == "datagram": + return Datagram(**self.kwargs) + elif self._type == "stream": + return Stream(**self.kwargs) + else: + raise ValueError("Invalid event type") + + def subscribe(self, event_name: str, callback, expire=None): + return OpenSIPSEvent(self, event_name, callback, expire) + + def unsubscribe(self, event_name: str): + self.events[event_name].unsubscribe() + + def __mi_subscribe__(self, event_name: str, sock_name: str, expire=None): + try: + if expire is None: + ret_val = self.mi.execute("event_subscribe", [event_name, sock_name]) + else: + ret_val = self.mi.execute("event_subscribe", [event_name, sock_name, expire]) + + if ret_val != "OK": + raise OpenSIPSEventException("Failed to subscribe to event") + except OpenSIPSMIException as e: + raise e + + def __mi_unsubscribe__(self, event_name: str, sock_name: str): + try: + ret_val = self.mi.execute("event_subscribe", [event_name, sock_name, 0]) + + if ret_val != "OK": + raise OpenSIPSEventException("Failed to unsubscribe from event") + except OpenSIPSMIException as e: + raise e \ No newline at end of file diff --git a/opensips/event/stream.py b/opensips/event/stream.py index 57c938f..06b7115 100644 --- a/opensips/event/stream.py +++ b/opensips/event/stream.py @@ -27,13 +27,8 @@ class Stream(GenericSocket): """ TCP/Stream implementation of a socket """ def __init__(self, **kwargs): - if "ip" not in kwargs: - raise ValueError("ip is required for Stream connector") - if "port" not in kwargs: - raise ValueError("port is required for Stream connector") - - self.ip = kwargs["ip"] - self.port = int(kwargs["port"]) + self.ip = kwargs.get("ip", "0.0.0.0") + self.port = int(kwargs.get("port", 0)) self.sock = None self.sock_name = None