Skip to content

Commit b9c2422

Browse files
committed
Add resubscribe mechanism.
1 parent e470e27 commit b9c2422

File tree

3 files changed

+32
-7
lines changed

3 files changed

+32
-7
lines changed

docs/event.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ If `callback` function is called with `None` as a parameter, it means that there
4848

4949
## Subscribing
5050

51-
By default, the subscription will be permanent. If you want to set a timeout, you can use the `expires` parameter. The value should be an integer representing the number of seconds the subscription will be active.
51+
By default, the subscription will be permanent with a resubscribing interval of 1 hour. If you want to set a timeout, you can use the `expires` parameter. The value should be an integer representing the number of seconds the subscription will be active.
5252

5353
## How it works
5454

opensips/event/event.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from threading import Thread, Event
2424
from ..mi import OpenSIPSMIException
2525
from .json_helper import extract_json
26+
import time
2627

2728
class OpenSIPSEventException(Exception):
2829
""" Exceptions generated by OpenSIPS Events """
@@ -41,10 +42,17 @@ def __init__(self, handler, name: str, callback, expire=None):
4142
self.buf = b""
4243
self.json_queue = []
4344
self.retries = 0
45+
if expire is not None:
46+
self.expire = expire
47+
self.reregister = False
48+
else:
49+
self.expire = 3600
50+
self.reregister = True
4451

4552
try:
4653
self.socket = self._handler.__new_socket__()
47-
self._handler.__mi_subscribe__(self.name, self.socket.create(), expire)
54+
self._handler.__mi_subscribe__(self.name, self.socket.create(), self.expire)
55+
self.last_subscription = time.time()
4856
self._handler.events[self.name] = self
4957
self.thread = Thread(target=self.handle, args=(callback,))
5058
self.thread.start()
@@ -58,6 +66,16 @@ def __init__(self, handler, name: str, callback, expire=None):
5866
def handle(self, callback):
5967
""" Handles the event callbacks """
6068
while not self.thread_stop.is_set():
69+
if self.reregister and time.time() - self.last_subscription > self.expire - 60:
70+
try:
71+
self.resubscribe()
72+
except Exception as e:
73+
callback(None)
74+
break
75+
elif not self.reregister and time.time() - self.last_subscription > self.expire:
76+
callback(None)
77+
break
78+
6179
data = self.socket.read()
6280
if not data:
6381
continue
@@ -77,6 +95,16 @@ def handle(self, callback):
7795
json_obj = self.json_queue.pop(0)
7896
callback(json_obj)
7997

98+
def resubscribe(self):
99+
""" Resubscribes for the event """
100+
try:
101+
self._handler.__mi_subscribe__(self.name, self.socket.sock_name, self.expire)
102+
self.last_subscription = time.time()
103+
except OpenSIPSEventException as e:
104+
raise e
105+
except OpenSIPSMIException as e:
106+
raise e
107+
80108
def unsubscribe(self):
81109
""" Unsubscribes the event """
82110
try:

opensips/event/handler.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,9 @@ def subscribe(self, event_name: str, callback, expire=None):
5454
def unsubscribe(self, event_name: str):
5555
self.events[event_name].unsubscribe()
5656

57-
def __mi_subscribe__(self, event_name: str, sock_name: str, expire=None):
57+
def __mi_subscribe__(self, event_name: str, sock_name: str, expire):
5858
try:
59-
if expire is None:
60-
ret_val = self.mi.execute("event_subscribe", [event_name, sock_name])
61-
else:
62-
ret_val = self.mi.execute("event_subscribe", [event_name, sock_name, expire])
59+
ret_val = self.mi.execute("event_subscribe", [event_name, sock_name, expire])
6360

6461
if ret_val != "OK":
6562
raise OpenSIPSEventException("Failed to subscribe to event")

0 commit comments

Comments
 (0)