Skip to content

Commit 32da7b8

Browse files
authored
Merge pull request #432 from splitio/async-sync-telemetry
added sync.telemetry async classes with a fix in engine.telemetry
2 parents 7bb99d9 + 4f1ce9b commit 32da7b8

File tree

3 files changed

+236
-14
lines changed

3 files changed

+236
-14
lines changed

splitio/engine/telemetry.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ async def get_not_ready_usage(self):
345345
async def get_config_stats(self):
346346
"""Get config stats."""
347347
config_stats = await self._telemetry_storage.get_config_stats()
348-
config_stats.update({'t': self.pop_config_tags()})
348+
config_stats.update({'t': await self.pop_config_tags()})
349349
return config_stats
350350

351351
async def get_config_stats_to_json(self):
@@ -427,9 +427,9 @@ async def pop_formatted_stats(self):
427427
:returns: formatted stats
428428
:rtype: Dict
429429
"""
430-
exceptions = await self.pop_exceptions()['methodExceptions']
431-
latencies = await self.pop_latencies()['methodLatencies']
432-
return self._to_json(exceptions, latencies)
430+
exceptions = await self.pop_exceptions()
431+
latencies = await self.pop_latencies()
432+
return self._to_json(exceptions['methodExceptions'], latencies['methodLatencies'])
433433

434434

435435
class TelemetryRuntimeConsumerBase(object):
@@ -627,8 +627,8 @@ async def pop_formatted_stats(self):
627627
:rtype: Dict
628628
"""
629629
last_synchronization = await self.get_last_synchronization()
630-
http_errors = await self.pop_http_errors()['httpErrors']
631-
http_latencies = await self.pop_http_latencies()['httpLatencies']
630+
http_errors = await self.pop_http_errors()
631+
http_latencies = await self.pop_http_latencies()
632632

633633
return {
634634
'iQ': await self.get_impressions_stats(CounterConstants.IMPRESSIONS_QUEUED),
@@ -638,8 +638,8 @@ async def pop_formatted_stats(self):
638638
'eD': await self.get_events_stats(CounterConstants.EVENTS_DROPPED),
639639
'lS': self._last_synchronization_to_json(last_synchronization),
640640
't': await self.pop_tags(),
641-
'hE': self._http_errors_to_json(http_errors),
642-
'hL': self._http_latencies_to_json(http_latencies),
641+
'hE': self._http_errors_to_json(http_errors['httpErrors']),
642+
'hL': self._http_latencies_to_json(http_latencies['httpLatencies']),
643643
'aR': await self.pop_auth_rejections(),
644644
'tR': await self.pop_token_refreshes(),
645645
'sE': self._streaming_events_to_json(await self.pop_streaming_events()),

splitio/sync/telemetry.py

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,23 @@ def synchronize_stats(self):
1919
"""synchronize runtime stats class."""
2020
self._telemetry_submitter.synchronize_stats()
2121

22+
23+
class TelemetrySynchronizerAsync(object):
24+
"""Telemetry synchronizer class."""
25+
26+
def __init__(self, telemetry_submitter):
27+
"""Initialize Telemetry sync class."""
28+
self._telemetry_submitter = telemetry_submitter
29+
30+
async def synchronize_config(self):
31+
"""synchronize initial config data class."""
32+
await self._telemetry_submitter.synchronize_config()
33+
34+
async def synchronize_stats(self):
35+
"""synchronize runtime stats class."""
36+
await self._telemetry_submitter.synchronize_stats()
37+
38+
2239
class TelemetrySubmitter(object, metaclass=abc.ABCMeta):
2340
"""Telemetry sumbitter interface."""
2441

@@ -30,7 +47,8 @@ def synchronize_config(self):
3047
def synchronize_stats(self):
3148
"""synchronize runtime stats class."""
3249

33-
class InMemoryTelemetrySubmitter(object):
50+
51+
class InMemoryTelemetrySubmitter(TelemetrySubmitter):
3452
"""Telemetry sumbitter class."""
3553

3654
def __init__(self, telemetry_consumer, feature_flag_storage, segment_storage, telemetry_api):
@@ -66,6 +84,43 @@ def _build_stats(self):
6684
merged_dict.update(self._telemetry_evaluation_consumer.pop_formatted_stats())
6785
return merged_dict
6886

87+
88+
class InMemoryTelemetrySubmitterAsync(TelemetrySubmitter):
89+
"""Telemetry sumbitter async class."""
90+
91+
def __init__(self, telemetry_consumer, feature_flag_storage, segment_storage, telemetry_api):
92+
"""Initialize all producer classes."""
93+
self._telemetry_init_consumer = telemetry_consumer.get_telemetry_init_consumer()
94+
self._telemetry_evaluation_consumer = telemetry_consumer.get_telemetry_evaluation_consumer()
95+
self._telemetry_runtime_consumer = telemetry_consumer.get_telemetry_runtime_consumer()
96+
self._telemetry_api = telemetry_api
97+
self._feature_flag_storage = feature_flag_storage
98+
self._segment_storage = segment_storage
99+
100+
async def synchronize_config(self):
101+
"""synchronize initial config data classe."""
102+
await self._telemetry_api.record_init(await self._telemetry_init_consumer.get_config_stats())
103+
104+
async def synchronize_stats(self):
105+
"""synchronize runtime stats class."""
106+
await self._telemetry_api.record_stats(await self._build_stats())
107+
108+
async def _build_stats(self):
109+
"""
110+
Format stats to Dict.
111+
112+
:returns: formatted stats
113+
:rtype: Dict
114+
"""
115+
merged_dict = {
116+
'spC': await self._feature_flag_storage.get_splits_count(),
117+
'seC': await self._segment_storage.get_segments_count(),
118+
'skC': await self._segment_storage.get_segments_keys_count()
119+
}
120+
merged_dict.update(await self._telemetry_runtime_consumer.pop_formatted_stats())
121+
merged_dict.update(await self._telemetry_evaluation_consumer.pop_formatted_stats())
122+
return merged_dict
123+
69124
class RedisTelemetrySubmitter(object):
70125
"""Telemetry sumbitter class."""
71126

@@ -82,6 +137,21 @@ def synchronize_stats(self):
82137
pass
83138

84139

140+
class RedisTelemetrySubmitterAsync(object):
141+
"""Telemetry sumbitter class."""
142+
143+
def __init__(self, telemetry_storage):
144+
"""Initialize all producer classes."""
145+
self._telemetry_storage = telemetry_storage
146+
147+
async def synchronize_config(self):
148+
"""synchronize initial config data classe."""
149+
await self._telemetry_storage.push_config_stats()
150+
151+
async def synchronize_stats(self):
152+
"""No implementation."""
153+
pass
154+
85155
class LocalhostTelemetrySubmitter(object):
86156
"""Telemetry sumbitter class."""
87157

@@ -92,3 +162,14 @@ def synchronize_config(self):
92162
def synchronize_stats(self):
93163
"""No implementation."""
94164
pass
165+
166+
class LocalhostTelemetrySubmitterAsync(object):
167+
"""Telemetry sumbitter class."""
168+
169+
async def synchronize_config(self):
170+
"""No implementation."""
171+
pass
172+
173+
async def synchronize_stats(self):
174+
"""No implementation."""
175+
pass

tests/sync/test_telemetry.py

Lines changed: 146 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
"""Telemetry Worker tests."""
22
import unittest.mock as mock
3-
import json
4-
from splitio.sync.telemetry import TelemetrySynchronizer, InMemoryTelemetrySubmitter
5-
from splitio.engine.telemetry import TelemetryEvaluationConsumer, TelemetryInitConsumer, TelemetryRuntimeConsumer, TelemetryStorageConsumer
6-
from splitio.storage.inmemmory import InMemoryTelemetryStorage, InMemorySegmentStorage, InMemorySplitStorage
3+
import pytest
4+
5+
from splitio.sync.telemetry import TelemetrySynchronizer, TelemetrySynchronizerAsync, InMemoryTelemetrySubmitter, InMemoryTelemetrySubmitterAsync
6+
from splitio.engine.telemetry import TelemetryStorageConsumer, TelemetryStorageConsumerAsync
7+
from splitio.storage.inmemmory import InMemoryTelemetryStorage, InMemoryTelemetryStorageAsync, InMemorySegmentStorage, InMemorySegmentStorageAsync, InMemorySplitStorage, InMemorySplitStorageAsync
78
from splitio.models.splits import Split, Status
89
from splitio.models.segments import Segment
9-
from splitio.models.telemetry import StreamingEvents
10+
from splitio.models.telemetry import StreamingEvents, StreamingEventsAsync
1011
from splitio.api.telemetry import TelemetryAPI
1112

1213
class TelemetrySynchronizerTests(object):
@@ -24,6 +25,31 @@ def test_synchronize_stats(self, mocker):
2425
telemetry_synchronizer.synchronize_stats()
2526
assert(mocker.called)
2627

28+
29+
class TelemetrySynchronizerAsyncTests(object):
30+
"""Telemetry synchronizer async test cases."""
31+
32+
@pytest.mark.asyncio
33+
async def test_synchronize_config(self, mocker):
34+
telemetry_synchronizer = TelemetrySynchronizerAsync(InMemoryTelemetrySubmitterAsync(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()))
35+
self.called = False
36+
async def synchronize_config():
37+
self.called = True
38+
telemetry_synchronizer.synchronize_config = synchronize_config
39+
await telemetry_synchronizer.synchronize_config()
40+
assert(self.called)
41+
42+
@pytest.mark.asyncio
43+
async def test_synchronize_stats(self, mocker):
44+
telemetry_synchronizer = TelemetrySynchronizer(InMemoryTelemetrySubmitter(mocker.Mock(), mocker.Mock(), mocker.Mock(), mocker.Mock()))
45+
self.called = False
46+
async def synchronize_stats():
47+
self.called = True
48+
telemetry_synchronizer.synchronize_stats = synchronize_stats
49+
await telemetry_synchronizer.synchronize_stats()
50+
assert(self.called)
51+
52+
2753
class TelemetrySubmitterTests(object):
2854
"""Telemetry submitter test cases."""
2955

@@ -136,3 +162,118 @@ def record_stats(*args, **kwargs):
136162
"skC": 0,
137163
"t": ['tag1']
138164
})
165+
166+
167+
class TelemetrySubmitterAsyncTests(object):
168+
"""Telemetry submitter async test cases."""
169+
170+
@pytest.mark.asyncio
171+
async def test_synchronize_telemetry(self, mocker):
172+
api = mocker.Mock(spec=TelemetryAPI)
173+
telemetry_storage = await InMemoryTelemetryStorageAsync.create()
174+
telemetry_consumer = TelemetryStorageConsumerAsync(telemetry_storage)
175+
split_storage = InMemorySplitStorageAsync()
176+
await split_storage.put(Split('split1', 1234, 1, False, 'user', Status.ACTIVE, 123))
177+
segment_storage = InMemorySegmentStorageAsync()
178+
await segment_storage.put(Segment('segment1', [], 123))
179+
telemetry_submitter = InMemoryTelemetrySubmitterAsync(telemetry_consumer, split_storage, segment_storage, api)
180+
181+
telemetry_storage._counters._impressions_queued = 100
182+
telemetry_storage._counters._impressions_deduped = 30
183+
telemetry_storage._counters._impressions_dropped = 0
184+
telemetry_storage._counters._events_queued = 20
185+
telemetry_storage._counters._events_dropped = 10
186+
telemetry_storage._counters._auth_rejections = 1
187+
telemetry_storage._counters._token_refreshes = 3
188+
telemetry_storage._counters._session_length = 3
189+
190+
telemetry_storage._method_exceptions._treatment = 10
191+
telemetry_storage._method_exceptions._treatments = 1
192+
telemetry_storage._method_exceptions._treatment_with_config = 5
193+
telemetry_storage._method_exceptions._treatments_with_config = 1
194+
telemetry_storage._method_exceptions._track = 3
195+
196+
telemetry_storage._last_synchronization._split = 5
197+
telemetry_storage._last_synchronization._segment = 3
198+
telemetry_storage._last_synchronization._impression = 10
199+
telemetry_storage._last_synchronization._impression_count = 0
200+
telemetry_storage._last_synchronization._event = 4
201+
telemetry_storage._last_synchronization._telemetry = 0
202+
telemetry_storage._last_synchronization._token = 3
203+
204+
telemetry_storage._http_sync_errors._split = {'500': 3, '501': 2}
205+
telemetry_storage._http_sync_errors._segment = {'401': 1}
206+
telemetry_storage._http_sync_errors._impression = {'500': 1}
207+
telemetry_storage._http_sync_errors._impression_count = {'401': 5}
208+
telemetry_storage._http_sync_errors._event = {'404': 10}
209+
telemetry_storage._http_sync_errors._telemetry = {'501': 3}
210+
telemetry_storage._http_sync_errors._token = {'505': 11}
211+
212+
telemetry_storage._streaming_events = await StreamingEventsAsync.create()
213+
telemetry_storage._tags = ['tag1']
214+
215+
telemetry_storage._method_latencies._treatment = [1] + [0] * 22
216+
telemetry_storage._method_latencies._treatments = [0] * 23
217+
telemetry_storage._method_latencies._treatment_with_config = [0] * 23
218+
telemetry_storage._method_latencies._treatments_with_config = [0] * 23
219+
telemetry_storage._method_latencies._track = [0] * 23
220+
221+
telemetry_storage._http_latencies._split = [1] + [0] * 22
222+
telemetry_storage._http_latencies._segment = [0] * 23
223+
telemetry_storage._http_latencies._impression = [0] * 23
224+
telemetry_storage._http_latencies._impression_count = [0] * 23
225+
telemetry_storage._http_latencies._event = [0] * 23
226+
telemetry_storage._http_latencies._telemetry = [0] * 23
227+
telemetry_storage._http_latencies._token = [0] * 23
228+
229+
await telemetry_storage.record_config({'operationMode': 'inmemory',
230+
'storageType': None,
231+
'streamingEnabled': True,
232+
'impressionsQueueSize': 100,
233+
'eventsQueueSize': 200,
234+
'impressionsMode': 'DEBUG',
235+
'impressionListener': None,
236+
'featuresRefreshRate': 30,
237+
'segmentsRefreshRate': 30,
238+
'impressionsRefreshRate': 60,
239+
'eventsPushRate': 60,
240+
'metricsRefreshRate': 10,
241+
'activeFactoryCount': 1,
242+
'notReady': 0,
243+
'timeUntilReady': 1
244+
}, {}
245+
)
246+
self.formatted_config = ""
247+
async def record_init(*args, **kwargs):
248+
self.formatted_config = args[0]
249+
api.record_init = record_init
250+
251+
await telemetry_submitter.synchronize_config()
252+
assert(self.formatted_config == await telemetry_submitter._telemetry_init_consumer.get_config_stats())
253+
254+
async def record_stats(*args, **kwargs):
255+
self.formatted_stats = args[0]
256+
api.record_stats = record_stats
257+
258+
await telemetry_submitter.synchronize_stats()
259+
assert(self.formatted_stats == {
260+
"iQ": 100,
261+
"iDe": 30,
262+
"iDr": 0,
263+
"eQ": 20,
264+
"eD": 10,
265+
"lS": {"sp": 5, "se": 3, "im": 10, "ic": 0, "ev": 4, "te": 0, "to": 3},
266+
"t": ["tag1"],
267+
"hE": {"sp": {"500": 3, "501": 2}, "se": {"401": 1}, "im": {"500": 1}, "ic": {"401": 5}, "ev": {"404": 10}, "te": {"501": 3}, "to": {"505": 11}},
268+
"hL": {"sp": [1] + [0] * 22, "se": [0] * 23, "im": [0] * 23, "ic": [0] * 23, "ev": [0] * 23, "te": [0] * 23, "to": [0] * 23},
269+
"aR": 1,
270+
"tR": 3,
271+
"sE": [],
272+
"sL": 3,
273+
"mE": {"t": 10, "ts": 1, "tc": 5, "tcs": 1, "tr": 3},
274+
"mL": {"t": [1] + [0] * 22, "ts": [0] * 23, "tc": [0] * 23, "tcs": [0] * 23, "tr": [0] * 23},
275+
"spC": 1,
276+
"seC": 1,
277+
"skC": 0,
278+
"t": ['tag1']
279+
})

0 commit comments

Comments
 (0)