1+ """Events Manager."""
2+ import threading
3+ import logging
4+ from collections import namedtuple
5+ import pytest
6+
7+ from splitio .events import EventsManagerInterface
8+
9+ _LOGGER = logging .getLogger (__name__ )
10+
11+ ValidSdkEvent = namedtuple ('ValidSdkEvent' , ['sdk_event' , 'valid' ])
12+ ActiveSubscriptions = namedtuple ('ActiveSubscriptions' , ['triggered' , 'handler' ])
13+
14+ class EventsManager (EventsManagerInterface ):
15+ """Events Manager class."""
16+
17+ def __init__ (self , events_configurations , events_delivery ):
18+ """
19+ Construct Events Manager instance.
20+ """
21+ self ._active_subscriptions = {}
22+ self ._internal_events_status = {}
23+ self ._events_delivery = events_delivery
24+ self ._manager_config = events_configurations
25+ self ._lock = threading .RLock ()
26+
27+ def register (self , sdk_event , event_handler ):
28+ if self ._active_subscriptions .get (sdk_event ) != None :
29+ return
30+
31+ with self ._lock :
32+ self ._active_subscriptions [sdk_event ] = ActiveSubscriptions (False , event_handler )
33+
34+ def unregister (self , sdk_event ):
35+ if self ._active_subscriptions .get (sdk_event ) == None :
36+ return
37+
38+ with self ._lock :
39+ del self ._active_subscriptions [sdk_event ]
40+
41+ def notify_internal_event (self , sdk_internal_event , event_metadata ):
42+ with self ._lock :
43+ for sorted_event in self ._manager_config .evaluation_order :
44+ if sorted_event in self ._get_sdk_event_if_applicable (sdk_internal_event ):
45+ _LOGGER .debug ("EventsManager: Firing Sdk event %s" , sorted_event )
46+ if self ._get_event_handler (sorted_event ) != None :
47+ notify_event = threading .Thread (target = self ._events_delivery .deliver , args = [sorted_event , event_metadata , self ._get_event_handler (sorted_event )],
48+ name = 'SplitSDKEventNotify' , daemon = True )
49+ notify_event .start ()
50+ self ._set_sdk_event_triggered (sorted_event )
51+
52+ def _event_already_triggered (self , sdk_event ):
53+ if self ._active_subscriptions .get (sdk_event ) != None :
54+ return self ._active_subscriptions .get (sdk_event ).triggered
55+
56+ return False
57+
58+ def _get_internal_event_status (self , sdk_internal_event ):
59+ if self ._internal_events_status .get (sdk_internal_event ) != None :
60+ return self ._internal_events_status [sdk_internal_event ]
61+
62+ return False
63+
64+ def _update_internal_event_status (self , sdk_internal_event , status ):
65+ with self ._lock :
66+ self ._internal_events_status [sdk_internal_event ] = status
67+
68+ def _set_sdk_event_triggered (self , sdk_event ):
69+ if self ._active_subscriptions .get (sdk_event ) == None :
70+ return
71+
72+ if self ._active_subscriptions .get (sdk_event ).triggered == True :
73+ return
74+
75+ self ._active_subscriptions [sdk_event ] = self ._active_subscriptions [sdk_event ]._replace (triggered = True )
76+
77+ def _get_event_handler (self , sdk_event ):
78+ if self ._active_subscriptions .get (sdk_event ) == None :
79+ return None
80+
81+ return self ._active_subscriptions .get (sdk_event ).handler
82+
83+ def _get_sdk_event_if_applicable (self , sdk_internal_event ):
84+ final_sdk_event = ValidSdkEvent (None , False )
85+ self ._update_internal_event_status (sdk_internal_event , True )
86+
87+ events_to_fire = []
88+ require_any_sdk_event = self ._check_require_any (sdk_internal_event )
89+ if require_any_sdk_event .valid :
90+ if (not self ._set_sdk_event_triggered (require_any_sdk_event .sdk_event ) and
91+ self ._execution_limit (require_any_sdk_event .sdk_event ) == 1 ) or \
92+ self ._execution_limit (require_any_sdk_event .sdk_event ) == - 1 :
93+ final_sdk_event = final_sdk_event ._replace (sdk_event = require_any_sdk_event .sdk_event ,
94+ valid = self ._check_prerequisites (require_any_sdk_event .sdk_event ) and \
95+ self ._check_suppressed_by (require_any_sdk_event .sdk_event ))
96+
97+ if final_sdk_event .valid :
98+ events_to_fire .append (final_sdk_event .sdk_event )
99+
100+ [events_to_fire .append (sdk_event ) for sdk_event in self ._check_require_all ()]
101+
102+ return events_to_fire
103+
104+ def _check_require_all (self ):
105+ events = []
106+ for require_name , require_value in self ._manager_config .require_all .items ():
107+ final_status = True
108+ for val in require_value :
109+ final_status &= self ._get_internal_event_status (val )
110+
111+ if final_status and \
112+ self ._check_prerequisites (require_name ) and \
113+ ((not self ._event_already_triggered (require_name ) and
114+ self ._execution_limit (require_name ) == 1 ) or \
115+ self ._execution_limit (require_name ) == - 1 ) and \
116+ len (require_value ) > 0 :
117+
118+ events .append (require_name )
119+
120+ return events
121+
122+ def _check_prerequisites (self , sdk_event ):
123+ for name , value in self ._manager_config .prerequisites .items ():
124+ for val in value :
125+ if name == sdk_event and not self ._event_already_triggered (val ):
126+ return False
127+
128+ return True
129+
130+ def _check_suppressed_by (self , sdk_event ):
131+ for name , value in self ._manager_config .suppressed_by .items ():
132+ for val in value :
133+ if name == sdk_event and self ._event_already_triggered (val ):
134+ return False
135+
136+ return True
137+
138+ def _execution_limit (self , sdk_event ):
139+ limit = self ._manager_config .execution_limits .get (sdk_event )
140+ if limit == None :
141+ return - 1
142+
143+ return limit
144+
145+ def _check_require_any (self , sdk_internal_event ):
146+ valid_sdk_event = ValidSdkEvent (None , False )
147+ for name , val in self ._manager_config .require_any .items ():
148+ if sdk_internal_event in val :
149+ valid_sdk_event = valid_sdk_event ._replace (valid = True , sdk_event = name )
150+ return valid_sdk_event
151+
152+ return valid_sdk_event
0 commit comments