Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build/
opensips.egg-info/
__pycache__/
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ Currently, the following packages are available:
pip install .
```

or from PyPI:

```bash
pip install opensips
```

2. Import the package in your Python code:

```python
from opensips.mi import OpenSIPSMI, OpenSIPSMIException
from opensips.event import OpenSIPSEvent, OpenSIPSEventException
from opensips.event import OpenSIPSEvent, OpenSIPSEventException, OpenSIPSEventHandler
```

3. Use the methods provided by the modules:
Expand All @@ -38,19 +44,20 @@ Currently, the following packages are available:

```python
mi_connector = OpenSIPSMI('http', url='http://localhost:8888/mi')
event = OpenSIPSEvent(mi_connector, 'datagram', ip='127.0.0.1', port=50012)
hdl = OpenSIPSEventHandler(mi_connector, 'datagram', ip='127.0.0.1', port=50012)

def some_callback(message):
# do something with the message
pass


ev: OpenSIPSEvent = None
try:
event.subscribe('E_PIKE_BLOCKED', some_callback)
event = hdl.subscribe('E_PIKE_BLOCKED', some_callback)
except OpenSIPSEventException as e:
# handle the exception

try:
event.unsubscribe('E_PIKE_BLOCKED')
ev.unsubscribe('E_PIKE_BLOCKED')
except OpenSIPSEventException as e:
# handle the exception
```
Expand Down
31 changes: 24 additions & 7 deletions docs/event.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,45 @@
# OpenSIPS Python Packages - Event Interface

This package can be used to subscribe to OpenSIPS Event Interface events.
This package can be used to subscribe to OpenSIPS events.

## Supported backend protocols

The following event transport protocols are supported:
* `datagram` - uses either UDP or UNIX datagram to receive notifications for subscribed events. If using UDP, the `ip` and `port` parameters are required. If using UNIX datagram, the `socket_path` parameter is required.
* `stream` - uses TCP to communicate with the Event Interface. Requires the `ip` and `port` parameters to be set.
* `datagram` (Default) - uses either UDP or UNIX datagram to receive notifications for subscribed events. By default, the UDP protocol is used with the `ip` and `port` parameters set to `0.0.0.0` and `0` (any available port) respectively, but you can tune them to your needs.
To use the UNIX datagram, set the `socket_path` parameter.
* `stream` - uses TCP to communicate with the Event Interface. Default values for `ip` and `port` are `0.0.0.0` and `0` (any available port) respectively, but you can change them as needed.

## How to use

To instantiate the `OpenSIPSEvent` class, you need to provide a MI connector, the backend protocol and the required parameters in a key-value format. Then `subscribe` and `unsubscribe` methods can be used to manage the subscriptions.
To subscribe to events, you must instantiate an `OpenSIPSEventHandler`. This class can be used to subscribe and unsubscribe from events. It uses an `OpenSIPSMI` object to communicate with the OpenSIPS MI interface. By default, a MI connector is created with the `fifo` type, but you can set it as a parameter in the constructor. As said before, the default transport protocol is `datagram`, but you can change it by setting the `_type` parameter.

Next step is to create an `OpenSIPSEvent` object. This can be done by calling the `subscribe` method of the `OpenSIPSEventHandler` object or by creating an `OpenSIPSEvent` object directly. The `subscribe` method will return an `OpenSIPSEvent` object.

To unsubscribe from an event, you can call the `unsubscribe` method of the `OpenSIPSEvent` object or the `unsubscribe` method of the `OpenSIPSEventHandler` object.

```python
from opensips.mi import OpenSIPSMI, OpenSIPSMIException
from opensips.event import OpenSIPSEvent, OpenSIPSEventException

# simple way
hdl = OpenSIPSEventHandler()

# tuned way
mi_connector = OpenSIPSMI('http', url='http://localhost:8888/mi')
event = OpenSIPSEvent(mi_connector, 'datagram', ip='127.0.0.1', port=50012)
hdl = OpenSIPSEventHandler(mi_connector, 'datagram', ip='127.0.0.1', port=50012)

try:
event.subscribe('E_PIKE_BLOCKED', some_callback)
ev = hdl.subscribe('E_PIKE_BLOCKED', some_callback)
except OpenSIPSEventException as e:
# handle the exception

# or create an OpenSIPSEvent object directly
ev = OpenSIPSEvent(hdl, 'E_PIKE_BLOCKED', some_callback)

try:
event.unsubscribe('E_PIKE_BLOCKED')
ev.unsubscribe()
# or
hdl.unsubscribe('E_PIKE_BLOCKED')
except OpenSIPSEventException as e:
# handle the exception
```
Expand Down
3 changes: 2 additions & 1 deletion opensips/event/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@

""" Event package of OpenSIPS """

from .subscriber import OpenSIPSEvent, OpenSIPSEventException
from .event import OpenSIPSEvent, OpenSIPSEventException
from .handler import OpenSIPSEventHandler
12 changes: 7 additions & 5 deletions opensips/event/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import signal
import argparse
from opensips.mi import OpenSIPSMI
from opensips.event import OpenSIPSEvent, OpenSIPSEventException
from opensips.event import OpenSIPSEventHandler, OpenSIPSEventException

parser = argparse.ArgumentParser()

Expand Down Expand Up @@ -100,10 +100,10 @@ def main():
elif args.type == 'datagram':
mi = OpenSIPSMI('datagram', datagram_ip=args.ip, datagram_port=args.port)
else:
print(f'Unknownt type: {args.type}')
print(f'Unknown type: {args.type}')
sys.exit(1)

ev = OpenSIPSEvent(mi, args.transport, ip=args.listen_ip, port=args.listen_port)
hdl = OpenSIPSEventHandler(mi, args.transport, ip=args.listen_ip, port=args.listen_port)

def event_handler(message):
""" Event handler callback """
Expand All @@ -113,9 +113,11 @@ def event_handler(message):
except json.JSONDecodeError as e:
print(f"Failed to decode JSON: {e}")

ev = None

def timer(*_):
""" Timer to notify when the event expires """
ev.unsubscribe(args.event)
ev.unsubscribe()
sys.exit(0) # successful

if args.expire:
Expand All @@ -126,7 +128,7 @@ def timer(*_):
signal.signal(signal.SIGTERM, timer)

try:
ev.subscribe(args.event, event_handler, expire=args.expire)
ev = hdl.subscribe(args.event, event_handler, args.expire)
except OpenSIPSEventException as e:
print(e)
sys.exit(1)
Expand Down
6 changes: 2 additions & 4 deletions opensips/event/datagram.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ def __init__(self, **kwargs):

if "unix_path" in kwargs:
self.sock_name = kwargs["unix_path"]
elif "ip" in kwargs and "port" in kwargs:
self.ip = kwargs["ip"]
self.port = int(kwargs["port"])
else:
raise ValueError("ip and port or unix_path is required for Datagram connector")
self.ip = kwargs.get("ip", "0.0.0.0")
self.port = int(kwargs.get("port", 0))

def create(self):
if self.ip is not None:
Expand Down
63 changes: 23 additions & 40 deletions opensips/event/subscriber.py → opensips/event/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
""" Module that implements OpenSIPS Event behavior """

from threading import Thread, Event
from ..mi import OpenSIPSMI, OpenSIPSMIException
from .datagram import Datagram
from .stream import Stream
from ..mi import OpenSIPSMIException

class OpenSIPSEventException(Exception):
""" Exceptions generated by OpenSIPS Events """
Expand All @@ -32,19 +30,26 @@ class OpenSIPSEvent():

""" Implementation of the OpenSIPS Event """

def __init__(self, mi: OpenSIPSMI, _type: str, **kwargs):
self.mi = mi
self.kwargs = kwargs
def __init__(self, handler, name: str, callback, expire=None):
self._handler = handler
self.name = name
self.callback = callback
self.thread = None
self.thread_stop = Event()
self.thread_stop.clear()

if _type == "datagram":
self.socket = Datagram(**kwargs)
elif _type == "stream":
self.socket = Stream(**kwargs)
else:
raise ValueError("Invalid event type")
try:
self.socket = self._handler.__new_socket__()
self._handler.__mi_subscribe__(self.name, self.socket.create(), expire)
self._handler.events[self.name] = self
self.thread = Thread(target=self.handle, args=(callback,))
self.thread.start()
except OpenSIPSEventException as e:
raise e
except OpenSIPSMIException as e:
raise e
except ValueError as e:
raise OpenSIPSEventException("Invalid arguments for socket creation: {}".format(e))

def handle(self, callback):
""" Handles the event callbacks """
Expand All @@ -53,36 +58,14 @@ def handle(self, callback):
if data:
callback(data)

def subscribe(self, event: str, callback, expire=None):
""" Subscribes for an event """
def unsubscribe(self):
""" Unsubscribes the event """
try:
sock_name = self.socket.create()
if expire is None:
ret_val = self.mi.execute("event_subscribe", [event, sock_name])
else:
ret_val = self.mi.execute("event_subscribe", [event, sock_name, expire])

if ret_val != "OK":
raise OpenSIPSEventException("Failed to subscribe to event")

self.thread = Thread(target=self.handle, args=(callback,))
self.thread.start()

except OpenSIPSMIException as e:
raise e
except Exception as e:
raise OpenSIPSEventException(f"Failed to subscribe to event: {e}") from e

def unsubscribe(self, event: str):
""" Unsubscribes for an event """
try:
ret_val = self.mi.execute("event_subscribe", [event, self.socket.sock_name, 0])

if ret_val != "OK":
raise OpenSIPSEventException("Failed to unsubscribe from event")

self._handler.__mi_unsubscribe__(self.name, self.socket.sock_name)
self.stop()

del self._handler.events[self.name]
except OpenSIPSEventException as e:
raise e
except OpenSIPSMIException as e:
raise e

Expand Down
76 changes: 76 additions & 0 deletions opensips/event/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#!/usr/bin/env python
##
## This file is part of the OpenSIPS Python Package
## (see https://github.com/OpenSIPS/python-opensips).
##
## This program is free software: you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation, either version 3 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program. If not, see <http://www.gnu.org/licenses/>.
##

""" Module that implements an OpenSIPS Event Handler to manage events subscriptions """

from ..mi import OpenSIPSMI, OpenSIPSMIException
from .event import OpenSIPSEvent, OpenSIPSEventException
from .datagram import Datagram
from .stream import Stream

class OpenSIPSEventHandler():

""" Implementation of the OpenSIPS Event Handler"""

def __init__(self, mi: OpenSIPSMI = None, _type: str = None, **kwargs):
if mi:
self.mi = mi
else:
self.mi = OpenSIPSMI()
if _type:
self._type = _type
else:
self._type = "datagram"
self.kwargs = kwargs
self.events = {str: OpenSIPSEvent}

def __new_socket__(self):
if self._type == "datagram":
return Datagram(**self.kwargs)
elif self._type == "stream":
return Stream(**self.kwargs)
else:
raise ValueError("Invalid event type")

def subscribe(self, event_name: str, callback, expire=None):
return OpenSIPSEvent(self, event_name, callback, expire)

def unsubscribe(self, event_name: str):
self.events[event_name].unsubscribe()

def __mi_subscribe__(self, event_name: str, sock_name: str, expire=None):
try:
if expire is None:
ret_val = self.mi.execute("event_subscribe", [event_name, sock_name])
else:
ret_val = self.mi.execute("event_subscribe", [event_name, sock_name, expire])

if ret_val != "OK":
raise OpenSIPSEventException("Failed to subscribe to event")
except OpenSIPSMIException as e:
raise e

def __mi_unsubscribe__(self, event_name: str, sock_name: str):
try:
ret_val = self.mi.execute("event_subscribe", [event_name, sock_name, 0])

if ret_val != "OK":
raise OpenSIPSEventException("Failed to unsubscribe from event")
except OpenSIPSMIException as e:
raise e
9 changes: 2 additions & 7 deletions opensips/event/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,8 @@ class Stream(GenericSocket):
""" TCP/Stream implementation of a socket """

def __init__(self, **kwargs):
if "ip" not in kwargs:
raise ValueError("ip is required for Stream connector")
if "port" not in kwargs:
raise ValueError("port is required for Stream connector")

self.ip = kwargs["ip"]
self.port = int(kwargs["port"])
self.ip = kwargs.get("ip", "0.0.0.0")
self.port = int(kwargs.get("port", 0))
self.sock = None
self.sock_name = None

Expand Down