Skip to content

Commit 5242dcb

Browse files
committed
Add new event handler based on asyncio
1 parent b9c2422 commit 5242dcb

File tree

4 files changed

+185
-1
lines changed

4 files changed

+185
-1
lines changed

opensips/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,5 @@
2020
""" Main package of OpenSIPS """
2121

2222
from .mi import OpenSIPSMI
23-
from .event import OpenSIPSEvent
23+
from .event import OpenSIPSEvent, AsyncOpenSIPSEvent
2424
from .version import __version__

opensips/event/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,5 @@
2121

2222
from .event import OpenSIPSEvent, OpenSIPSEventException
2323
from .handler import OpenSIPSEventHandler
24+
from .asyncevent import AsyncOpenSIPSEvent
25+
from .asynchandler import AsyncOpenSIPSEventHandler

opensips/event/asyncevent.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
#!/usr/bin/env python
2+
##
3+
## This file is part of the OpenSIPS Python Package
4+
## (see https://github.com/OpenSIPS/python-opensips).
5+
##
6+
## This program is free software: you can redistribute it and/or modify
7+
## it under the terms of the GNU General Public License as published by
8+
## the Free Software Foundation, either version 3 of the License, or
9+
## (at your option) any later version.
10+
##
11+
## This program is distributed in the hope that it will be useful,
12+
## but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
## GNU General Public License for more details.
15+
##
16+
## You should have received a copy of the GNU General Public License
17+
## along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
##
19+
20+
21+
""" Module that implements OpenSIPS Event behavior with asyncio """
22+
23+
from ..mi import OpenSIPSMIException
24+
from .json_helper import extract_json
25+
from .event import OpenSIPSEventException
26+
import asyncio
27+
28+
class AsyncOpenSIPSEvent():
29+
30+
""" Asyncio implementation of the OpenSIPS Event """
31+
32+
def __init__(self, handler, name: str, callback, expire=None):
33+
self._handler = handler
34+
self.name = name
35+
self.callback = callback
36+
self.buf = b""
37+
self.json_queue = []
38+
self.retries = 0
39+
if expire is not None:
40+
self.expire = expire
41+
self.reregister = False
42+
else:
43+
self.expire = 3600
44+
self.reregister = True
45+
46+
try:
47+
self.socket = self._handler.__new_socket__()
48+
self.socket.create()
49+
self._handler.events[self.name] = self
50+
self.resubscribe_task = asyncio.create_task(self.resubscribe())
51+
loop = asyncio.get_running_loop()
52+
loop.add_reader(self.socket.sock.fileno(), self.handle, self.callback)
53+
54+
except ValueError as e:
55+
raise OpenSIPSEventException("Invalid arguments for socket creation: {}".format(e))
56+
57+
def handle(self, callback):
58+
""" Handles the event callbacks """
59+
data = self.socket.read()
60+
if not data:
61+
return
62+
63+
self.buf += data
64+
self.json_queue, self.buf = extract_json(self.json_queue, self.buf)
65+
66+
if not self.json_queue:
67+
self.retries += 1
68+
69+
if self.retries > 10:
70+
callback(None)
71+
return
72+
73+
while self.json_queue:
74+
self.retries = 0
75+
json_obj = self.json_queue.pop(0)
76+
callback(json_obj)
77+
78+
async def resubscribe(self):
79+
""" Resubscribes for the event """
80+
try:
81+
while True:
82+
try:
83+
self._handler.__mi_subscribe__(self.name, self.socket.sock_name, self.expire)
84+
except OpenSIPSEventException as e:
85+
return
86+
except OpenSIPSMIException as e:
87+
return
88+
await asyncio.sleep(self.expire - 60)
89+
except asyncio.CancelledError:
90+
pass
91+
92+
def unsubscribe(self):
93+
""" Unsubscribes the event """
94+
try:
95+
self._handler.__mi_unsubscribe__(self.name, self.socket.sock_name)
96+
self.stop()
97+
del self._handler.events[self.name]
98+
except OpenSIPSEventException as e:
99+
raise e
100+
except OpenSIPSMIException as e:
101+
raise e
102+
103+
def stop(self):
104+
""" Stops the current event processing """
105+
loop = asyncio.get_running_loop()
106+
loop.remove_reader(self.socket.sock.fileno())
107+
self.resubscribe_task.cancel()
108+
self.socket.destroy()

opensips/event/asynchandler.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#!/usr/bin/env python
2+
##
3+
## This file is part of the OpenSIPS Python Package
4+
## (see https://github.com/OpenSIPS/python-opensips).
5+
##
6+
## This program is free software: you can redistribute it and/or modify
7+
## it under the terms of the GNU General Public License as published by
8+
## the Free Software Foundation, either version 3 of the License, or
9+
## (at your option) any later version.
10+
##
11+
## This program is distributed in the hope that it will be useful,
12+
## but WITHOUT ANY WARRANTY; without even the implied warranty of
13+
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14+
## GNU General Public License for more details.
15+
##
16+
## You should have received a copy of the GNU General Public License
17+
## along with this program. If not, see <http://www.gnu.org/licenses/>.
18+
##
19+
20+
""" Module that implements an OpenSIPS Event Handler to manage events subscriptions """
21+
22+
from ..mi import OpenSIPSMI, OpenSIPSMIException
23+
from .asyncevent import AsyncOpenSIPSEvent
24+
from .event import OpenSIPSEventException
25+
from .datagram import Datagram
26+
from .stream import Stream
27+
28+
class AsyncOpenSIPSEventHandler():
29+
30+
""" Implementation of the OpenSIPS Event Handler"""
31+
32+
def __init__(self, mi: OpenSIPSMI = None, _type: str = None, **kwargs):
33+
if mi:
34+
self.mi = mi
35+
else:
36+
self.mi = OpenSIPSMI()
37+
if _type:
38+
self._type = _type
39+
else:
40+
self._type = "datagram"
41+
self.kwargs = kwargs
42+
self.events = {str: AsyncOpenSIPSEvent}
43+
44+
def __new_socket__(self):
45+
if self._type == "datagram":
46+
return Datagram(**self.kwargs)
47+
elif self._type == "stream":
48+
return Stream(**self.kwargs)
49+
else:
50+
raise ValueError("Invalid event type")
51+
52+
def subscribe(self, event_name: str, callback, expire=None):
53+
return AsyncOpenSIPSEvent(self, event_name, callback, expire)
54+
55+
def unsubscribe(self, event_name: str):
56+
self.events[event_name].unsubscribe()
57+
58+
def __mi_subscribe__(self, event_name: str, sock_name: str, expire):
59+
try:
60+
ret_val = self.mi.execute("event_subscribe", [event_name, sock_name, expire])
61+
62+
if ret_val != "OK":
63+
raise OpenSIPSEventException("Failed to subscribe to event")
64+
except OpenSIPSMIException as e:
65+
raise e
66+
67+
def __mi_unsubscribe__(self, event_name: str, sock_name: str):
68+
try:
69+
ret_val = self.mi.execute("event_subscribe", [event_name, sock_name, 0])
70+
71+
if ret_val != "OK":
72+
raise OpenSIPSEventException("Failed to unsubscribe from event")
73+
except OpenSIPSMIException as e:
74+
raise e

0 commit comments

Comments
 (0)