Skip to content

Commit 7194c0a

Browse files
committed
added async classes
1 parent f0d85ba commit 7194c0a

23 files changed

+1016
-251
lines changed

splitio/client/client.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44
from collections import namedtuple
55
import copy
66

7+
from splitio.client import input_validator
78
from splitio.engine.evaluator import Evaluator, CONTROL, EvaluationDataFactory, AsyncEvaluationDataFactory
89
from splitio.engine.splitters import Splitter
910
from splitio.models.impressions import Impression, Label, ImpressionDecorated
1011
from splitio.models.events import Event, EventWrapper, SdkEvent
1112
from splitio.models.telemetry import get_latency_bucket_index, MethodExceptionsAndLatencies
12-
from splitio.client import input_validator
13+
from splitio.optional.loaders import asyncio
1314
from splitio.util.time import get_current_epoch_time_ms, utctime_ms
1415

1516

@@ -40,7 +41,7 @@ class ClientBase(object): # pylint: disable=too-many-instance-attributes
4041
'impressions_disabled': False
4142
}
4243

43-
def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_calculator=None):
44+
def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallback_treatment_calculator=None):
4445
"""
4546
Construct a Client instance.
4647
@@ -66,6 +67,7 @@ def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_ca
6667
self._telemetry_evaluation_producer = self._factory._telemetry_evaluation_producer
6768
self._telemetry_init_producer = self._factory._telemetry_init_producer
6869
self._fallback_treatment_calculator = fallback_treatment_calculator
70+
self._events_manager = events_manager
6971

7072
@property
7173
def ready(self):
@@ -221,6 +223,23 @@ def _get_fallback_eval_results(self, eval_result, feature):
221223
def _check_impression_label(self, result):
222224
return result['impression']['label'] == None or (result['impression']['label'] != None and result['impression']['label'].find(Label.SPLIT_NOT_FOUND) == -1)
223225

226+
def _validate_sdk_event_info(self, sdk_event, callback_handle):
227+
if not self._check_sdk_event(sdk_event):
228+
return False
229+
230+
if not hasattr(callback_handle, '__call__'):
231+
_LOGGER.warning("Client Event Subscription: The callback handle passed must be of type function, ignoring event subscribing action.")
232+
return False
233+
234+
return True
235+
236+
def _check_sdk_event(self, sdk_event):
237+
if not isinstance(sdk_event, SdkEvent):
238+
_LOGGER.warning("Client Event Subscription: The event passed must be of type SdkEvent, ignoring event subscribing action.")
239+
return False
240+
241+
return True
242+
224243
class Client(ClientBase): # pylint: disable=too-many-instance-attributes
225244
"""Entry point for the split sdk."""
226245

@@ -239,8 +258,7 @@ def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallb
239258
240259
:rtype: Client
241260
"""
242-
ClientBase.__init__(self, factory, recorder, labels_enabled, fallback_treatment_calculator)
243-
self._events_manager = events_manager
261+
ClientBase.__init__(self, factory, recorder, events_manager, labels_enabled, fallback_treatment_calculator)
244262
self._context_factory = EvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments'))
245263

246264
def destroy(self):
@@ -256,17 +274,6 @@ def on(self, sdk_event, callback_handle):
256274
return
257275

258276
self._events_manager.register(sdk_event, callback_handle)
259-
260-
def _validate_sdk_event_info(self, sdk_event, callback_handle):
261-
if not isinstance(sdk_event, SdkEvent):
262-
_LOGGER.warning("Client Event Subscription: The event passed must be of type SdkEvent, ignoring event subscribing action.")
263-
return False
264-
265-
if not hasattr(callback_handle, '__call__'):
266-
_LOGGER.warning("Client Event Subscription: The callback handle passed must be of type function, ignoring event subscribing action.")
267-
return False
268-
269-
return True
270277

271278
def get_treatment(self, key, feature_flag_name, attributes=None, evaluation_options=None):
272279
"""
@@ -743,7 +750,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
743750
class ClientAsync(ClientBase): # pylint: disable=too-many-instance-attributes
744751
"""Entry point for the split sdk."""
745752

746-
def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_calculator=None):
753+
def __init__(self, factory, recorder, events_manager, labels_enabled=True, fallback_treatment_calculator=None):
747754
"""
748755
Construct a Client instance.
749756
@@ -758,7 +765,7 @@ def __init__(self, factory, recorder, labels_enabled=True, fallback_treatment_ca
758765
759766
:rtype: Client
760767
"""
761-
ClientBase.__init__(self, factory, recorder, labels_enabled, fallback_treatment_calculator)
768+
ClientBase.__init__(self, factory, recorder, events_manager, labels_enabled, fallback_treatment_calculator)
762769
self._context_factory = AsyncEvaluationDataFactory(factory._get_storage('splits'), factory._get_storage('segments'), factory._get_storage('rule_based_segments'))
763770

764771
async def destroy(self):
@@ -769,6 +776,12 @@ async def destroy(self):
769776
"""
770777
await self._factory.destroy()
771778

779+
async def on(self, sdk_event, callback_handle):
780+
if not self._validate_sdk_event_info(sdk_event, callback_handle):
781+
return
782+
783+
await self._events_manager.register(sdk_event, callback_handle)
784+
772785
async def get_treatment(self, key, feature_flag_name, attributes=None, evaluation_options=None):
773786
"""
774787
Get the treatment for a feature and key, with an optional dictionary of attributes, for async calls

splitio/client/factory.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
TelemetryStorageProducerAsync, TelemetryStorageConsumerAsync
2020
from splitio.engine.impressions.manager import Counter as ImpressionsCounter
2121
from splitio.engine.impressions.unique_keys_tracker import UniqueKeysTracker, UniqueKeysTrackerAsync
22-
from splitio.events.events_manager import EventsManager
22+
from splitio.events.events_manager import EventsManager, EventsManagerAsync
2323
from splitio.events.events_manager_config import EventsManagerConfig
24-
from splitio.events.events_task import EventsTask
24+
from splitio.events.events_task import EventsTask, EventsTaskAsync
2525
from splitio.events.events_delivery import EventsDelivery
2626
from splitio.models.fallback_config import FallbackTreatmentCalculator
2727
from splitio.models.notification import SdkInternalEventNotification
@@ -352,6 +352,8 @@ def __init__( # pylint: disable=too-many-arguments
352352
storages,
353353
labels_enabled,
354354
recorder,
355+
internal_events_queue,
356+
events_manager,
355357
sync_manager=None,
356358
telemetry_producer=None,
357359
telemetry_init_producer=None,
@@ -387,6 +389,8 @@ def __init__( # pylint: disable=too-many-arguments
387389
self._telemetry_submitter = telemetry_submitter
388390
self._ready_time = get_current_epoch_time_ms()
389391
_LOGGER.debug("Running in asyncio mode")
392+
self._internal_events_queue = internal_events_queue
393+
self._events_manager = events_manager
390394
self._manager_start_task = manager_start_task
391395
self._status = Status.NOT_INITIALIZED
392396
self._sdk_ready_flag = asyncio.Event()
@@ -409,6 +413,7 @@ async def _update_status_when_ready_async(self):
409413
_LOGGER.debug(str(e))
410414
self._status = Status.READY
411415
self._sdk_ready_flag.set()
416+
await self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_READY, None))
412417

413418
def manager(self):
414419
"""
@@ -434,6 +439,7 @@ async def block_until_ready(self, timeout=None):
434439
_LOGGER.error("Exception initializing SDK")
435440
_LOGGER.debug(str(e))
436441
await self._telemetry_init_producer.record_bur_time_out()
442+
await self._internal_events_queue.put(SdkInternalEventNotification(SdkInternalEvent.SDK_TIMED_OUT, None))
437443
raise TimeoutException('SDK Initialization: time of %d exceeded' % timeout)
438444

439445
async def destroy(self, destroyed_event=None):
@@ -481,7 +487,7 @@ def client(self):
481487
This client is only a set of references to structures hold by the factory.
482488
Creating one a fast operation and safe to be used anywhere.
483489
"""
484-
return ClientAsync(self, self._recorder, self._labels_enabled, self._fallback_treatment_calculator)
490+
return ClientAsync(self, self._recorder, self._events_manager, self._labels_enabled, self._fallback_treatment_calculator)
485491

486492
def _wrap_impression_listener(listener, metadata):
487493
"""
@@ -698,11 +704,14 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
698704
'events': EventsAPIAsync(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
699705
'telemetry': TelemetryAPIAsync(http_client, api_key, sdk_metadata, telemetry_runtime_producer),
700706
}
707+
internal_events_queue = asyncio.Queue()
708+
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
709+
internal_events_task = EventsTaskAsync(events_manager.notify_internal_event, internal_events_queue)
701710

702711
storages = {
703-
'splits': InMemorySplitStorageAsync(cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
704-
'segments': InMemorySegmentStorageAsync(),
705-
'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(),
712+
'splits': InMemorySplitStorageAsync(internal_events_queue, cfg['flagSetsFilter'] if cfg['flagSetsFilter'] is not None else []),
713+
'segments': InMemorySegmentStorageAsync(internal_events_queue),
714+
'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(internal_events_queue),
706715
'impressions': InMemoryImpressionStorageAsync(cfg['impressionsQueueSize'], telemetry_runtime_producer),
707716
'events': InMemoryEventStorageAsync(cfg['eventsQueueSize'], telemetry_runtime_producer),
708717
}
@@ -748,6 +757,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
748757
TelemetrySyncTaskAsync(synchronizers.telemetry_sync.synchronize_stats, cfg['metricsRefreshRate']),
749758
unique_keys_task,
750759
clear_filter_task,
760+
internal_events_task
751761
)
752762

753763
synchronizer = SynchronizerAsync(synchronizers, tasks)
@@ -770,11 +780,12 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=
770780
)
771781

772782
await telemetry_init_producer.record_config(cfg, extra_cfg, total_flag_sets, invalid_flag_sets)
783+
internal_events_task.start()
773784

774785
manager_start_task = asyncio.get_running_loop().create_task(manager.start())
775786

776787
return SplitFactoryAsync(api_key, storages, cfg['labelsEnabled'],
777-
recorder, manager,
788+
recorder, internal_events_queue, events_manager, manager,
778789
telemetry_producer, telemetry_init_producer,
779790
telemetry_submitter, manager_start_task=manager_start_task,
780791
api_client=http_client, fallback_treatment_calculator=FallbackTreatmentCalculator(cfg['fallbackTreatments']))
@@ -933,12 +944,16 @@ async def _build_redis_factory_async(api_key, cfg):
933944
manager = RedisManagerAsync(synchronizer)
934945
await telemetry_init_producer.record_config(cfg, {}, 0, 0)
935946
manager.start()
947+
internal_events_queue = asyncio.Queue()
948+
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
936949

937950
split_factory = SplitFactoryAsync(
938951
api_key,
939952
storages,
940953
cfg['labelsEnabled'],
941954
recorder,
955+
internal_events_queue,
956+
events_manager,
942957
manager,
943958
telemetry_producer=telemetry_producer,
944959
telemetry_init_producer=telemetry_init_producer,
@@ -1101,12 +1116,16 @@ async def _build_pluggable_factory_async(api_key, cfg):
11011116
manager = RedisManagerAsync(synchronizer)
11021117
manager.start()
11031118
await telemetry_init_producer.record_config(cfg, {}, 0, 0)
1119+
internal_events_queue = asyncio.Queue()
1120+
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
11041121

11051122
split_factory = SplitFactoryAsync(
11061123
api_key,
11071124
storages,
11081125
cfg['labelsEnabled'],
11091126
recorder,
1127+
internal_events_queue,
1128+
events_manager,
11101129
manager,
11111130
telemetry_producer=telemetry_producer,
11121131
telemetry_init_producer=telemetry_init_producer,
@@ -1205,10 +1224,12 @@ async def _build_localhost_factory_async(cfg):
12051224
telemetry_runtime_producer = telemetry_producer.get_telemetry_runtime_producer()
12061225
telemetry_evaluation_producer = telemetry_producer.get_telemetry_evaluation_producer()
12071226

1227+
internal_events_queue = asyncio.Queue()
1228+
events_manager = EventsManagerAsync(EventsManagerConfig(), EventsDelivery())
12081229
storages = {
1209-
'splits': InMemorySplitStorageAsync(),
1210-
'segments': InMemorySegmentStorageAsync(), # not used, just to avoid possible future errors.
1211-
'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(),
1230+
'splits': InMemorySplitStorageAsync(internal_events_queue),
1231+
'segments': InMemorySegmentStorageAsync(internal_events_queue), # not used, just to avoid possible future errors.
1232+
'rule_based_segments': InMemoryRuleBasedSegmentStorageAsync(internal_events_queue),
12121233
'impressions': LocalhostImpressionsStorageAsync(),
12131234
'events': LocalhostEventsStorageAsync(),
12141235
}
@@ -1257,11 +1278,14 @@ async def _build_localhost_factory_async(cfg):
12571278
telemetry_evaluation_producer,
12581279
telemetry_runtime_producer
12591280
)
1281+
12601282
return SplitFactoryAsync(
12611283
'localhost',
12621284
storages,
12631285
False,
12641286
recorder,
1287+
internal_events_queue,
1288+
events_manager,
12651289
manager,
12661290
telemetry_producer=telemetry_producer,
12671291
telemetry_init_producer=telemetry_producer.get_telemetry_init_producer(),

splitio/events/events_delivery.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,10 @@ def deliver(self, sdk_event, event_metadata, event_handler):
1919
except Exception as ex:
2020
_LOGGER.error("Exception when calling handler for Sdk Event %s", sdk_event)
2121
_LOGGER.error(ex)
22+
23+
async def deliver_async(self, sdk_event, event_metadata, event_handler):
24+
try:
25+
await event_handler(event_metadata)
26+
except Exception as ex:
27+
_LOGGER.error("Exception when calling handler for Sdk Event %s", sdk_event)
28+
_LOGGER.error(ex)

0 commit comments

Comments
 (0)