Skip to content

Commit f2ad152

Browse files
committed
finish tests
1 parent 7194c0a commit f2ad152

File tree

5 files changed

+351
-4
lines changed

5 files changed

+351
-4
lines changed

splitio/events/events_task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ def start(self):
133133

134134
self._running = True
135135
_LOGGER.debug('Starting SDK Event Task worker')
136-
asyncio.get_running_loop().create_task(self._run())
136+
asyncio.get_running_loop().create_task(self._run(), name="EventsTaskWorker")
137137

138138
async def stop(self, stop_flag=None):
139139
"""Stop worker."""

tests/client/test_factory.py

Lines changed: 105 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1109,4 +1109,108 @@ async def _make_factory_with_apikey(apikey, *_, **__):
11091109
await asyncio.sleep(0.5)
11101110
assert factory.destroyed
11111111
assert len(build_redis.mock_calls) == 2
1112-
1112+
1113+
@pytest.mark.asyncio
1114+
async def test_internal_ready_event_notification(self, mocker):
1115+
"""Test that a client with in-memory storage is sending internal events correctly."""
1116+
# Setup synchronizer
1117+
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, telemetry_runtime_producer, sse_url=None, client_key=None):
1118+
synchronizer = mocker.Mock(spec=SynchronizerAsync)
1119+
async def sync_all(*_):
1120+
return None
1121+
synchronizer.sync_all = sync_all
1122+
1123+
def start_periodic_fetching():
1124+
pass
1125+
synchronizer.start_periodic_fetching = start_periodic_fetching
1126+
1127+
def start_periodic_data_recording():
1128+
pass
1129+
synchronizer.start_periodic_data_recording = start_periodic_data_recording
1130+
1131+
self._ready_flag = ready_flag
1132+
self._synchronizer = synchronizer
1133+
self._streaming_enabled = False
1134+
self._telemetry_runtime_producer = telemetry_runtime_producer
1135+
1136+
mocker.patch('splitio.sync.manager.ManagerAsync.__init__', new=_split_synchronizer)
1137+
1138+
async def synchronize_config(*_):
1139+
await asyncio.sleep(2)
1140+
pass
1141+
mocker.patch('splitio.sync.telemetry.InMemoryTelemetrySubmitterAsync.synchronize_config', new=synchronize_config)
1142+
1143+
async def record_ready_time(*_):
1144+
pass
1145+
mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_ready_time', new=record_ready_time)
1146+
1147+
async def record_active_and_redundant_factories(*_):
1148+
pass
1149+
mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_active_and_redundant_factories', new=record_active_and_redundant_factories)
1150+
1151+
# Start factory and make assertions
1152+
factory = await get_factory_async('some_api_key', config={'streamingEmabled': False})
1153+
for task in asyncio.all_tasks():
1154+
if task.get_name() == "EventsTaskWorker":
1155+
task.cancel()
1156+
try:
1157+
await factory.block_until_ready(3)
1158+
except:
1159+
pass
1160+
await asyncio.sleep(.2)
1161+
event = await factory._internal_events_queue.get()
1162+
assert event.internal_event == SdkInternalEvent.SDK_READY
1163+
assert event.metadata == None
1164+
await factory.destroy()
1165+
1166+
@pytest.mark.asyncio
1167+
async def test_internal_timeout_event_notification(self, mocker):
1168+
"""Test that a client with in-memory storage is sending internal events correctly."""
1169+
# Setup synchronizer
1170+
def _split_synchronizer(self, ready_flag, some, auth_api, streaming_enabled, sdk_matadata, telemetry_runtime_producer, sse_url=None, client_key=None):
1171+
synchronizer = mocker.Mock(spec=SynchronizerAsync)
1172+
async def sync_all(*_):
1173+
return None
1174+
synchronizer.sync_all = sync_all
1175+
1176+
def start_periodic_fetching():
1177+
pass
1178+
synchronizer.start_periodic_fetching = start_periodic_fetching
1179+
1180+
def start_periodic_data_recording():
1181+
pass
1182+
synchronizer.start_periodic_data_recording = start_periodic_data_recording
1183+
1184+
self._ready_flag = ready_flag
1185+
self._synchronizer = synchronizer
1186+
self._streaming_enabled = False
1187+
self._telemetry_runtime_producer = telemetry_runtime_producer
1188+
1189+
mocker.patch('splitio.sync.manager.ManagerAsync.__init__', new=_split_synchronizer)
1190+
1191+
async def synchronize_config(*_):
1192+
await asyncio.sleep(3)
1193+
pass
1194+
mocker.patch('splitio.sync.telemetry.InMemoryTelemetrySubmitterAsync.synchronize_config', new=synchronize_config)
1195+
1196+
async def record_ready_time(*_):
1197+
pass
1198+
mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_ready_time', new=record_ready_time)
1199+
1200+
async def record_active_and_redundant_factories(*_):
1201+
pass
1202+
mocker.patch('splitio.models.telemetry.TelemetryConfigAsync.record_active_and_redundant_factories', new=record_active_and_redundant_factories)
1203+
1204+
# Start factory and make assertions
1205+
factory = await get_factory_async('some_api_key', config={'streamingEmabled': False})
1206+
for task in asyncio.all_tasks():
1207+
if task.get_name() == "EventsTaskWorker":
1208+
task.cancel()
1209+
try:
1210+
await factory.block_until_ready(1)
1211+
except:
1212+
pass
1213+
event = await factory._internal_events_queue.get()
1214+
assert event.internal_event == SdkInternalEvent.SDK_TIMED_OUT
1215+
assert event.metadata == None
1216+
await factory.destroy()

tests/integration/test_client_e2e.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2600,6 +2600,174 @@ def _ready_callback(self, metadata):
26002600
def _timeout_callback(self, metadata):
26012601
self.timeout_flag = True
26022602

2603+
class InMemoryEventsNotificationAsyncTests(object):
2604+
"""Inmemory storage-based events notification tests."""
2605+
2606+
ready_flag = False
2607+
timeout_flag = False
2608+
2609+
@pytest.mark.asyncio
2610+
async def test_sdk_timeout_fire(self):
2611+
"""Prepare storages with test data."""
2612+
factory2 = await get_factory_async('some_api_key')
2613+
client = factory2.client()
2614+
await client.on(SdkEvent.SDK_READY_TIMED_OUT, self._timeout_callback)
2615+
try:
2616+
await factory2.block_until_ready(1)
2617+
except Exception as e:
2618+
pass
2619+
2620+
await asyncio.sleep(1)
2621+
assert self.timeout_flag
2622+
2623+
"""Shut down the factory."""
2624+
await factory2.destroy()
2625+
2626+
@pytest.mark.asyncio
2627+
async def test_sdk_ready(self):
2628+
"""Prepare storages with test data."""
2629+
events_queue = asyncio.Queue()
2630+
split_storage = InMemorySplitStorageAsync(events_queue)
2631+
segment_storage = InMemorySegmentStorageAsync(events_queue)
2632+
rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(events_queue)
2633+
2634+
split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
2635+
with open(split_fn, 'r') as flo:
2636+
data = json.loads(flo.read())
2637+
for split in data['ff']['d']:
2638+
await split_storage.update([splits.from_raw(split)], [], 0)
2639+
2640+
for rbs in data['rbs']['d']:
2641+
await rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)
2642+
2643+
segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
2644+
with open(segment_fn, 'r') as flo:
2645+
data = json.loads(flo.read())
2646+
await segment_storage.put(segments.from_raw(data))
2647+
2648+
segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
2649+
with open(segment_fn, 'r') as flo:
2650+
data = json.loads(flo.read())
2651+
await segment_storage.put(segments.from_raw(data))
2652+
2653+
telemetry_storage = await InMemoryTelemetryStorageAsync.create()
2654+
telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage)
2655+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
2656+
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
2657+
2658+
storages = {
2659+
'splits': split_storage,
2660+
'segments': segment_storage,
2661+
'rule_based_segments': rb_segment_storage,
2662+
'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer),
2663+
'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer),
2664+
}
2665+
impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
2666+
recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
2667+
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
2668+
internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, events_queue)
2669+
2670+
# Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception.
2671+
try:
2672+
factory = SplitFactoryAsync('some_api_key',
2673+
storages,
2674+
True,
2675+
recorder,
2676+
events_queue,
2677+
events_manager,
2678+
None,
2679+
telemetry_producer=telemetry_producer,
2680+
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
2681+
fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
2682+
) # pylint:disable=attribute-defined-outside-init
2683+
internal_events_task.start()
2684+
except:
2685+
pass
2686+
2687+
client = factory.client()
2688+
await client.on(SdkEvent.SDK_READY, self._ready_callback)
2689+
await factory.block_until_ready(5)
2690+
assert self.ready_flag
2691+
2692+
"""Shut down the factory."""
2693+
await internal_events_task.stop()
2694+
await factory.destroy()
2695+
2696+
@pytest.mark.asyncio
2697+
async def test_sdk_ready_fire_later(self):
2698+
"""Prepare storages with test data."""
2699+
events_queue = asyncio.Queue()
2700+
split_storage = InMemorySplitStorageAsync(events_queue)
2701+
segment_storage = InMemorySegmentStorageAsync(events_queue)
2702+
rb_segment_storage = InMemoryRuleBasedSegmentStorageAsync(events_queue)
2703+
2704+
split_fn = os.path.join(os.path.dirname(__file__), 'files', 'splitChanges.json')
2705+
with open(split_fn, 'r') as flo:
2706+
data = json.loads(flo.read())
2707+
for split in data['ff']['d']:
2708+
await split_storage.update([splits.from_raw(split)], [], 0)
2709+
2710+
for rbs in data['rbs']['d']:
2711+
await rb_segment_storage.update([rule_based_segments.from_raw(rbs)], [], 0)
2712+
2713+
segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentEmployeesChanges.json')
2714+
with open(segment_fn, 'r') as flo:
2715+
data = json.loads(flo.read())
2716+
await segment_storage.put(segments.from_raw(data))
2717+
2718+
segment_fn = os.path.join(os.path.dirname(__file__), 'files', 'segmentHumanBeignsChanges.json')
2719+
with open(segment_fn, 'r') as flo:
2720+
data = json.loads(flo.read())
2721+
await segment_storage.put(segments.from_raw(data))
2722+
2723+
telemetry_storage = await InMemoryTelemetryStorageAsync.create()
2724+
telemetry_producer = TelemetryStorageProducerAsync(telemetry_storage)
2725+
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
2726+
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
2727+
2728+
storages = {
2729+
'splits': split_storage,
2730+
'segments': segment_storage,
2731+
'rule_based_segments': rb_segment_storage,
2732+
'impressions': InMemoryImpressionStorageAsync(5000, telemetry_runtime_producer),
2733+
'events': InMemoryEventStorageAsync(5000, telemetry_runtime_producer),
2734+
}
2735+
impmanager = ImpressionsManager(StrategyDebugMode(), StrategyNoneMode(), telemetry_runtime_producer) # no listener
2736+
recorder = StandardRecorderAsync(impmanager, storages['events'], storages['impressions'], telemetry_evaluation_producer, telemetry_runtime_producer, imp_counter=ImpressionsCounter())
2737+
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
2738+
internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, events_queue)
2739+
2740+
# Since we are passing None as SDK_Ready event, the factory will use the Redis telemetry call, using try catch to ignore the exception.
2741+
try:
2742+
factory = SplitFactoryAsync('some_api_key',
2743+
storages,
2744+
True,
2745+
recorder,
2746+
events_queue,
2747+
events_manager,
2748+
None,
2749+
telemetry_producer=telemetry_producer,
2750+
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),
2751+
fallback_treatment_calculator=FallbackTreatmentCalculator(FallbackTreatmentsConfiguration(None, {'fallback_feature': FallbackTreatment("on-local", '{"prop": "val"}')}))
2752+
) # pylint:disable=attribute-defined-outside-init
2753+
internal_events_task.start()
2754+
except:
2755+
pass
2756+
2757+
client = factory.client()
2758+
await factory.block_until_ready(5)
2759+
await client.on(SdkEvent.SDK_READY, self._ready_callback)
2760+
2761+
"""Shut down the factory."""
2762+
await internal_events_task.stop()
2763+
await factory.destroy()
2764+
2765+
async def _ready_callback(self, metadata):
2766+
self.ready_flag = True
2767+
2768+
async def _timeout_callback(self, metadata):
2769+
self.timeout_flag = True
2770+
26032771
class InMemoryIntegrationAsyncTests(object):
26042772
"""Inmemory storage-based integration tests."""
26052773

tests/storage/test_inmemory_storage.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,36 @@ async def test_flag_sets_withut_config_sets(self):
708708
await storage.update([split3], [], 1)
709709
assert await storage.get_feature_flags_by_sets(['set05']) == ['split3']
710710
assert await storage.get_feature_flags_by_sets(['set04', 'set05']) == ['split3']
711+
712+
@pytest.mark.asyncio
713+
async def test_internal_event_notification(self, mocker):
714+
"""Test retrieving a list of all split names."""
715+
split1 = mocker.Mock()
716+
name1_prop = mocker.PropertyMock()
717+
name1_prop.return_value = 'split1'
718+
type(split1).name = name1_prop
719+
split2 = mocker.Mock()
720+
name2_prop = mocker.PropertyMock()
721+
name2_prop.return_value = 'split2'
722+
type(split2).name = name2_prop
723+
sets_property = mocker.PropertyMock()
724+
sets_property.return_value = ['set_1']
725+
type(split1).sets = sets_property
726+
type(split2).sets = sets_property
727+
events_queue = asyncio.Queue()
728+
storage = InMemorySplitStorageAsync(events_queue)
729+
await storage.update([split1, split2], [], -1)
730+
event = await events_queue.get()
731+
assert event.internal_event == SdkInternalEvent.FLAGS_UPDATED
732+
assert event.metadata.get_type() == SdkEventType.FLAG_UPDATE
733+
assert event.metadata.get_names() == {'split1', 'split2'}
711734

735+
await storage.kill_locally('split1', 'default_treatment', 3)
736+
event = await events_queue.get()
737+
assert event.internal_event == SdkInternalEvent.FLAG_KILLED_NOTIFICATION
738+
assert event.metadata.get_type() == SdkEventType.FLAG_UPDATE
739+
assert event.metadata.get_names() == {'split1'}
740+
712741
class InMemorySegmentStorageTests(object):
713742
"""In memory segment storage tests."""
714743

@@ -855,6 +884,23 @@ async def test_segment_update(self):
855884
assert not await storage.segment_contains('some_segment', 'key3')
856885
assert await storage.get_change_number('some_segment') == 456
857886

887+
@pytest.mark.asyncio
888+
async def test_internal_event_notification(self):
889+
"""Test updating a segment."""
890+
events_queue = asyncio.Queue()
891+
storage = InMemorySegmentStorageAsync(events_queue)
892+
segment = Segment('some_segment', ['key1', 'key2', 'key3'], 123)
893+
await storage.put(segment)
894+
event = await events_queue.get()
895+
assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED
896+
assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE
897+
assert len(event.metadata.get_names()) == 0
898+
899+
await storage.update('some_segment', ['key4', 'key5'], ['key2', 'key3'], 456)
900+
event = await events_queue.get()
901+
assert event.internal_event == SdkInternalEvent.SEGMENTS_UPDATED
902+
assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE
903+
assert len(event.metadata.get_names()) == 0
858904

859905
class InMemoryImpressionsStorageTests(object):
860906
"""InMemory impressions storage test cases."""
@@ -2027,3 +2073,32 @@ async def test_contains(self):
20272073
assert await storage.contains(["segment1"])
20282074
assert await storage.contains(["segment1", "segment3"])
20292075
assert not await storage.contains(["segment5"])
2076+
2077+
@pytest.mark.asyncio
2078+
async def test_internal_event_notification(self, mocker):
2079+
"""Test storing and retrieving splits works."""
2080+
events_queue = asyncio.Queue()
2081+
rbs_storage = InMemoryRuleBasedSegmentStorageAsync(events_queue)
2082+
2083+
segment1 = mocker.Mock(spec=RuleBasedSegment)
2084+
name_property = mocker.PropertyMock()
2085+
name_property.return_value = 'some_segment'
2086+
type(segment1).name = name_property
2087+
2088+
segment2 = mocker.Mock()
2089+
name2_prop = mocker.PropertyMock()
2090+
name2_prop.return_value = 'segment2'
2091+
type(segment2).name = name2_prop
2092+
2093+
await rbs_storage.update([segment1, segment2], [], -1)
2094+
event = await events_queue.get()
2095+
assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED
2096+
assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE
2097+
assert len(event.metadata.get_names()) == 0
2098+
2099+
await rbs_storage.update([], ['some_segment'], -1)
2100+
event = await events_queue.get()
2101+
assert event.internal_event == SdkInternalEvent.RB_SEGMENTS_UPDATED
2102+
assert event.metadata.get_type() == SdkEventType.SEGMENT_UPDATE
2103+
assert len(event.metadata.get_names()) == 0
2104+

0 commit comments

Comments
 (0)