Skip to content

Commit 0df687f

Browse files
authored
Merge pull request #616 from splitio/FME-12225-sdk-events-integration
updated sdk ready firing after subscription and integration tests
2 parents 02027af + 983a740 commit 0df687f

File tree

7 files changed

+254
-30
lines changed

7 files changed

+254
-30
lines changed

splitio/client/factory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import pytest
21
"""A module for Split.io Factories."""
32
import logging
43
import threading
@@ -643,7 +642,8 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl
643642
)
644643

645644
telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets)
646-
645+
internal_events_task.start()
646+
647647
if preforked_initialization:
648648
synchronizer.sync_all(max_retry_attempts=_MAX_RETRY_SYNC_ALL)
649649
synchronizer._split_synchronizers._segment_sync.shutdown()

splitio/events/events_manager.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
import threading
33
import logging
44
from collections import namedtuple
5-
import pytest
65

76
from splitio.events import EventsManagerInterface
7+
from splitio.models.events import SdkEvent
88

99
_LOGGER = logging.getLogger(__name__)
1010

@@ -25,10 +25,17 @@ def __init__(self, events_configurations, events_delivery):
2525
self._lock = threading.RLock()
2626

2727
def register(self, sdk_event, event_handler):
28-
if self._active_subscriptions.get(sdk_event) != None:
28+
if self._active_subscriptions.get(sdk_event) != None and self._get_event_handler(sdk_event) != None:
2929
return
30-
30+
3131
with self._lock:
32+
# SDK ready already fired
33+
if sdk_event == SdkEvent.SDK_READY and self._event_already_triggered(sdk_event):
34+
self._active_subscriptions[sdk_event] = ActiveSubscriptions(True, event_handler)
35+
_LOGGER.debug("EventsManager: Firing SDK_READY event for new subscription")
36+
self._fire_sdk_event(sdk_event, None)
37+
return
38+
3239
self._active_subscriptions[sdk_event] = ActiveSubscriptions(False, event_handler)
3340

3441
def unregister(self, sdk_event):
@@ -42,18 +49,27 @@ def notify_internal_event(self, sdk_internal_event, event_metadata):
4249
with self._lock:
4350
for sorted_event in self._manager_config.evaluation_order:
4451
if sorted_event in self._get_sdk_event_if_applicable(sdk_internal_event):
45-
_LOGGER.debug("EventsManager: Firing Sdk event %s", sorted_event)
4652
if self._get_event_handler(sorted_event) != None:
47-
notify_event = threading.Thread(target=self._events_delivery.deliver, args=[sorted_event, event_metadata, self._get_event_handler(sorted_event)],
48-
name='SplitSDKEventNotify', daemon=True)
49-
notify_event.start()
50-
self._set_sdk_event_triggered(sorted_event)
53+
self._fire_sdk_event(sorted_event, event_metadata)
54+
55+
# if client is not subscribed to SDK_READY
56+
if sorted_event == SdkEvent.SDK_READY and self._get_event_handler(sorted_event) == None:
57+
_LOGGER.debug("EventsManager: Registering SDK_READY event as fired")
58+
self._active_subscriptions[SdkEvent.SDK_READY] = ActiveSubscriptions(True, None)
59+
5160

5261
def destroy(self):
5362
with self._lock:
5463
self._active_subscriptions = {}
5564
self._internal_events_status = {}
5665

66+
def _fire_sdk_event(self, sdk_event, event_metadata):
67+
_LOGGER.debug("EventsManager: Firing Sdk event %s", sdk_event)
68+
notify_event = threading.Thread(target=self._events_delivery.deliver, args=[sdk_event, event_metadata, self._get_event_handler(sdk_event)],
69+
name='SplitSDKEventNotify', daemon=True)
70+
notify_event.start()
71+
self._set_sdk_event_triggered(sdk_event)
72+
5773
def _event_already_triggered(self, sdk_event):
5874
if self._active_subscriptions.get(sdk_event) != None:
5975
return self._active_subscriptions.get(sdk_event).triggered

splitio/storage/inmemmory.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,11 @@ def update(self, to_add, to_delete, new_change_number):
154154
[self._put(add_segment) for add_segment in to_add]
155155
[self._remove(delete_segment) for delete_segment in to_delete]
156156
self._set_change_number(new_change_number)
157-
self._internal_event_queue.put(
158-
SdkInternalEventNotification(
159-
SdkInternalEvent.RB_SEGMENTS_UPDATED,
160-
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))
157+
if len(to_add) > 0 or len(to_delete) > 0:
158+
self._internal_event_queue.put(
159+
SdkInternalEventNotification(
160+
SdkInternalEvent.RB_SEGMENTS_UPDATED,
161+
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))
161162

162163
def _put(self, rule_based_segment):
163164
"""
@@ -547,10 +548,11 @@ def update(self, to_add, to_delete, new_change_number):
547548
to_notify = []
548549
[to_notify.append(feature.name) for feature in to_add]
549550
to_notify.extend(to_delete)
550-
self._internal_event_queue.put(
551-
SdkInternalEventNotification(
552-
SdkInternalEvent.FLAGS_UPDATED,
553-
EventsMetadata(SdkEventType.FLAG_UPDATE, set(to_notify))))
551+
if len(to_notify) > 0:
552+
self._internal_event_queue.put(
553+
SdkInternalEventNotification(
554+
SdkInternalEvent.FLAGS_UPDATED,
555+
EventsMetadata(SdkEventType.FLAG_UPDATE, set(to_notify))))
554556

555557
def _put(self, feature_flag):
556558
"""
@@ -999,10 +1001,11 @@ def update(self, segment_name, to_add, to_remove, change_number=None):
9991001
if change_number is not None:
10001002
self._segments[segment_name].change_number = change_number
10011003

1002-
self._internal_event_queue.put(
1003-
SdkInternalEventNotification(
1004-
SdkInternalEvent.SEGMENTS_UPDATED,
1005-
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))
1004+
if len(to_add) > 0 or len(to_remove) >0:
1005+
self._internal_event_queue.put(
1006+
SdkInternalEventNotification(
1007+
SdkInternalEvent.SEGMENTS_UPDATED,
1008+
EventsMetadata(SdkEventType.SEGMENT_UPDATE, {})))
10061009

10071010
def get_change_number(self, segment_name):
10081011
"""

splitio/sync/synchronizer.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,6 @@ def start_periodic_data_recording(self):
329329
for task in self._periodic_data_recording_tasks:
330330
task.start()
331331

332-
if self._split_tasks.internal_events_task:
333-
self._split_tasks.internal_events_task.start()
334-
335332
def stop_periodic_data_recording(self, blocking):
336333
"""
337334
Stop recorders.
@@ -883,8 +880,6 @@ def start_periodic_fetching(self):
883880
self._split_tasks.split_task.start()
884881
if self._split_tasks.segment_task is not None:
885882
self._split_tasks.segment_task.start()
886-
if self._split_tasks.internal_events_task:
887-
self._split_tasks.internal_events_task.start()
888883

889884
def stop_periodic_fetching(self):
890885
"""Stop fetchers for feature flags and segments."""

splitio/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '10.5.1'
1+
__version__ = '10.6.0'

tests/integration/test_client_e2e.py

Lines changed: 177 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from splitio.events.events_manager_config import EventsManagerConfig
2929
from splitio.events.events_task import EventsTask
3030
from splitio.models import splits, segments, rule_based_segments
31+
from splitio.models.events import SdkEvent
3132
from splitio.models.fallback_config import FallbackTreatmentsConfiguration, FallbackTreatmentCalculator
3233
from splitio.models.fallback_treatment import FallbackTreatment
3334
from splitio.recorder.recorder import StandardRecorder, PipelinedRecorder, StandardRecorderAsync, PipelinedRecorderAsync
@@ -2424,6 +2425,181 @@ def clear_cache(self):
24242425
for key in keys_to_delete:
24252426
redis_client.delete(key)
24262427

2428+
class InMemoryEventsNotificationTests(object):
2429+
"""Inmemory storage-based events notification tests."""
2430+
2431+
ready_flag = False
2432+
timeout_flag = False
2433+
2434+
def test_sdk_timeout_fire(self):
2435+
"""Prepare storages with test data."""
2436+
factory2 = get_factory('some_api_key')
2437+
client = factory2.client()
2438+
client.on(SdkEvent.SDK_READY_TIMED_OUT, self._timeout_callback)
2439+
try:
2440+
factory2.block_until_ready(1)
2441+
except Exception as e:
2442+
print(e)
2443+
pass
2444+
2445+
time.sleep(1)
2446+
assert self.timeout_flag
2447+
2448+
"""Shut down the factory."""
2449+
event = threading.Event()
2450+
factory2.destroy(event)
2451+
event.wait()
2452+
2453+
def test_sdk_ready(self):
2454+
"""Prepare storages with test data."""
2455+
events_queue = queue.Queue()
2456+
split_storage = InMemorySplitStorage(events_queue)
2457+
segment_storage = InMemorySegmentStorage(events_queue)
2458+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
2459+
2460+
split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
2461+
with open(split_fn, 'r') as flo:
2462+
data = json.loads(flo.read())
2463+
for split in data['ff']['d']:
2464+
split_storage.update([splits.from_raw(split)], [], 0)
2465+
2466+
for rbs in data['rbs']['d']:
2467+
rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)
2468+
2469+
segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
2470+
with open(segment_fn, 'r') as flo:
2471+
data = json.loads(flo.read())
2472+
segment_storage.put(segments.from_raw(data))
2473+
2474+
segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
2475+
with open(segment_fn, 'r') as flo:
2476+
data = json.loads(flo.read())
2477+
segment_storage.put(segments.from_raw(data))
2478+
2479+
telemetry_storage = InMemoryTelemetryStorage()
2480+
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
2481+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
2482+
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
2483+
2484+
storages = {
2485+
'splits': split_storage,
2486+
'segments': segment_storage,
2487+
'rule_based_segments': rb_segment_storage,
2488+
'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer),
2489+
'events': InMemoryEventStorage(5000, telemetry_runtime_producer),
2490+
}
2491+
impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
2492+
recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
2493+
events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())
2494+
internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue)
2495+
2496+
# Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception.
2497+
try:
2498+
factory = SplitFactory('some_api_key',
2499+
storages,
2500+
True,
2501+
recorder,
2502+
events_queue,
2503+
events_manager,
2504+
None,
2505+
telemetry_producer=telemetry_producer,
2506+
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
2507+
fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
2508+
) # pylint:disable=attribute-defined-outside-init
2509+
internal_events_task.start()
2510+
except:
2511+
pass
2512+
2513+
client = factory.client()
2514+
client.on(SdkEvent.SDK_READY, self._ready_callback)
2515+
factory.block_until_ready(5)
2516+
assert self.ready_flag
2517+
2518+
"""Shut down the factory."""
2519+
event = threading.Event()
2520+
factory.destroy(event)
2521+
event.wait()
2522+
2523+
def test_sdk_ready_fire_later(self):
2524+
"""Prepare storages with test data."""
2525+
events_queue = queue.Queue()
2526+
split_storage = InMemorySplitStorage(events_queue)
2527+
segment_storage = InMemorySegmentStorage(events_queue)
2528+
rb_segment_storage = InMemoryRuleBasedSegmentStorage(events_queue)
2529+
2530+
split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
2531+
with open(split_fn, 'r') as flo:
2532+
data = json.loads(flo.read())
2533+
for split in data['ff']['d']:
2534+
split_storage.update([splits.from_raw(split)], [], 0)
2535+
2536+
for rbs in data['rbs']['d']:
2537+
rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)
2538+
2539+
segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
2540+
with open(segment_fn, 'r') as flo:
2541+
data = json.loads(flo.read())
2542+
segment_storage.put(segments.from_raw(data))
2543+
2544+
segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
2545+
with open(segment_fn, 'r') as flo:
2546+
data = json.loads(flo.read())
2547+
segment_storage.put(segments.from_raw(data))
2548+
2549+
telemetry_storage = InMemoryTelemetryStorage()
2550+
telemetry_producer = TelemetryStorageProducer(telemetry_storage)
2551+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
2552+
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
2553+
2554+
storages = {
2555+
'splits': split_storage,
2556+
'segments': segment_storage,
2557+
'rule_based_segments': rb_segment_storage,
2558+
'impressions': InMemoryImpressionStorage(5000, telemetry_runtime_producer),
2559+
'events': InMemoryEventStorage(5000, telemetry_runtime_producer),
2560+
}
2561+
impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
2562+
recorder = StandardRecorder(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
2563+
events_manager = EventsManager(EventsManagerConfig(), EventsDelivery())
2564+
internal_events_task = EventsTask(events_manager.notify_internal_event, events_queue)
2565+
2566+
# Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception.
2567+
try:
2568+
factory = SplitFactory('some_api_key',
2569+
storages,
2570+
True,
2571+
recorder,
2572+
events_queue,
2573+
events_manager,
2574+
None,
2575+
telemetry_producer=telemetry_producer,
2576+
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
2577+
fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
2578+
) # pylint:disable=attribute-defined-outside-init
2579+
internal_events_task.start()
2580+
except:
2581+
pass
2582+
2583+
client = factory.client()
2584+
factory.block_until_ready(5)
2585+
2586+
assert client.get_treatment('user1', 'sample_feature', evaluation_options=EvaluationOptions({"prop": "value"})) == 'on'
2587+
2588+
self.ready_flag = False
2589+
client.on(SdkEvent.SDK_READY, self._ready_callback)
2590+
assert self.ready_flag
2591+
2592+
"""Shut down the factory."""
2593+
event = threading.Event()
2594+
factory.destroy(event)
2595+
event.wait()
2596+
2597+
def _ready_callback(self, metadata):
2598+
self.ready_flag = True
2599+
2600+
def _timeout_callback(self, metadata):
2601+
self.timeout_flag = True
2602+
24272603
class InMemoryIntegrationAsyncTests(object):
24282604
"""Inmemory storage-based integration tests."""
24292605

@@ -4984,4 +5160,4 @@ async def _manager_methods_async(factory, skip_rbs=False):
49845160
return
49855161

49865162
assert len(await manager.split_names()) == 9
4987-
assert len(await manager.splits()) == 9
5163+
assert len(await manager.splits()) == 9

0 commit comments

Comments
 (0)