Skip to content

Commit 3396b5f

Browse files
committed
Updated SSE classes
1 parent 4cd84cd commit 3396b5f

File tree

6 files changed

+292
-81
lines changed

6 files changed

+292
-81
lines changed

splitio/models/telemetry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ class OperationMode(Enum):
140140
class UpdateFromSSE(Enum):
141141
"""Update from sse constants"""
142142
SPLIT_UPDATE = 'sp'
143+
RBS_UPDATE = 'rbs'
143144

144145
def get_latency_bucket_index(micros):
145146
"""

splitio/push/parser.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class UpdateType(Enum):
2828
SPLIT_UPDATE = 'SPLIT_UPDATE'
2929
SPLIT_KILL = 'SPLIT_KILL'
3030
SEGMENT_UPDATE = 'SEGMENT_UPDATE'
31+
RB_SEGMENT_UPDATE = 'RB_SEGMENT_UPDATE'
3132

3233

3334
class ControlType(Enum):
@@ -329,7 +330,7 @@ def __init__(self, channel, timestamp, change_number, previous_change_number, fe
329330
"""Class constructor."""
330331
BaseUpdate.__init__(self, channel, timestamp, change_number)
331332
self._previous_change_number = previous_change_number
332-
self._feature_flag_definition = feature_flag_definition
333+
self._object_definition = feature_flag_definition
333334
self._compression = compression
334335

335336
@property
@@ -352,13 +353,13 @@ def previous_change_number(self): # pylint:disable=no-self-use
352353
return self._previous_change_number
353354

354355
@property
355-
def feature_flag_definition(self): # pylint:disable=no-self-use
356+
def object_definition(self): # pylint:disable=no-self-use
356357
"""
357358
Return feature flag definition
358359
:returns: The new feature flag definition
359360
:rtype: str
360361
"""
361-
return self._feature_flag_definition
362+
return self._object_definition
362363

363364
@property
364365
def compression(self): # pylint:disable=no-self-use
@@ -451,6 +452,56 @@ def __str__(self):
451452
"""Return string representation."""
452453
return "SegmentChange - changeNumber=%d, name=%s" % (self.change_number, self.segment_name)
453454

455+
class RBSChangeUpdate(BaseUpdate):
456+
"""rbs Change notification."""
457+
458+
def __init__(self, channel, timestamp, change_number, previous_change_number, rbs_definition, compression):
459+
"""Class constructor."""
460+
BaseUpdate.__init__(self, channel, timestamp, change_number)
461+
self._previous_change_number = previous_change_number
462+
self._object_definition = rbs_definition
463+
self._compression = compression
464+
465+
@property
466+
def update_type(self): # pylint:disable=no-self-use
467+
"""
468+
Return the message type.
469+
470+
:returns: The type of this parsed Update.
471+
:rtype: UpdateType
472+
"""
473+
return UpdateType.RB_SEGMENT_UPDATE
474+
475+
@property
476+
def previous_change_number(self): # pylint:disable=no-self-use
477+
"""
478+
Return previous change number
479+
:returns: The previous change number
480+
:rtype: int
481+
"""
482+
return self._previous_change_number
483+
484+
@property
485+
def object_definition(self): # pylint:disable=no-self-use
486+
"""
487+
Return rbs definition
488+
:returns: The new rbs definition
489+
:rtype: str
490+
"""
491+
return self._object_definition
492+
493+
@property
494+
def compression(self): # pylint:disable=no-self-use
495+
"""
496+
Return previous compression type
497+
:returns: The compression type
498+
:rtype: int
499+
"""
500+
return self._compression
501+
502+
def __str__(self):
503+
"""Return string representation."""
504+
return "RBSChange - changeNumber=%d" % (self.change_number)
454505

455506
class ControlMessage(BaseMessage):
456507
"""Control notification."""
@@ -503,6 +554,9 @@ def _parse_update(channel, timestamp, data):
503554
if update_type == UpdateType.SPLIT_UPDATE and change_number is not None:
504555
return SplitChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c'))
505556

557+
if update_type == UpdateType.RB_SEGMENT_UPDATE and change_number is not None:
558+
return RBSChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c'))
559+
506560
elif update_type == UpdateType.SPLIT_KILL and change_number is not None:
507561
return SplitKillUpdate(channel, timestamp, change_number,
508562
data['splitName'], data['defaultTreatment'])

splitio/push/processor.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ def __init__(self, synchronizer, telemetry_runtime_producer):
3535
self._feature_flag_queue = Queue()
3636
self._segments_queue = Queue()
3737
self._synchronizer = synchronizer
38-
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer)
38+
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer, synchronizer.split_sync.rule_based_segment_storage)
3939
self._segments_worker = SegmentWorker(synchronizer.synchronize_segment, self._segments_queue)
4040
self._handlers = {
4141
UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update,
4242
UpdateType.SPLIT_KILL: self._handle_feature_flag_kill,
43-
UpdateType.SEGMENT_UPDATE: self._handle_segment_change
43+
UpdateType.SEGMENT_UPDATE: self._handle_segment_change,
44+
UpdateType.RB_SEGMENT_UPDATE: self._handle_feature_flag_update
4445
}
4546

4647
def _handle_feature_flag_update(self, event):
@@ -119,12 +120,13 @@ def __init__(self, synchronizer, telemetry_runtime_producer):
119120
self._feature_flag_queue = asyncio.Queue()
120121
self._segments_queue = asyncio.Queue()
121122
self._synchronizer = synchronizer
122-
self._feature_flag_worker = SplitWorkerAsync(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer)
123+
self._feature_flag_worker = SplitWorkerAsync(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer, synchronizer.split_sync.rule_based_segment_storage)
123124
self._segments_worker = SegmentWorkerAsync(synchronizer.synchronize_segment, self._segments_queue)
124125
self._handlers = {
125126
UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update,
126127
UpdateType.SPLIT_KILL: self._handle_feature_flag_kill,
127-
UpdateType.SEGMENT_UPDATE: self._handle_segment_change
128+
UpdateType.SEGMENT_UPDATE: self._handle_segment_change,
129+
UpdateType.RB_SEGMENT_UPDATE: self._handle_feature_flag_update
128130
}
129131

130132
async def _handle_feature_flag_update(self, event):

splitio/push/workers.py

Lines changed: 90 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,13 @@
99
from enum import Enum
1010

1111
from splitio.models.splits import from_raw
12+
from splitio.models.rule_based_segments import from_raw as rbs_from_raw
1213
from splitio.models.telemetry import UpdateFromSSE
1314
from splitio.push import SplitStorageException
1415
from splitio.push.parser import UpdateType
1516
from splitio.optional.loaders import asyncio
16-
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async
17+
from splitio.util.storage_helper import update_feature_flag_storage, update_feature_flag_storage_async, \
18+
update_rule_based_segment_storage, update_rule_based_segment_storage_async
1719

1820
_LOGGER = logging.getLogger(__name__)
1921

@@ -25,9 +27,9 @@ class CompressionMode(Enum):
2527
ZLIB_COMPRESSION = 2
2628

2729
_compression_handlers = {
28-
CompressionMode.NO_COMPRESSION: lambda event: base64.b64decode(event.feature_flag_definition),
29-
CompressionMode.GZIP_COMPRESSION: lambda event: gzip.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'),
30-
CompressionMode.ZLIB_COMPRESSION: lambda event: zlib.decompress(base64.b64decode(event.feature_flag_definition)).decode('utf-8'),
30+
CompressionMode.NO_COMPRESSION: lambda event: base64.b64decode(event.object_definition),
31+
CompressionMode.GZIP_COMPRESSION: lambda event: gzip.decompress(base64.b64decode(event.object_definition)).decode('utf-8'),
32+
CompressionMode.ZLIB_COMPRESSION: lambda event: zlib.decompress(base64.b64decode(event.object_definition)).decode('utf-8'),
3133
}
3234

3335
class WorkerBase(object, metaclass=abc.ABCMeta):
@@ -45,10 +47,19 @@ def start(self):
4547
def stop(self):
4648
"""Stop worker."""
4749

48-
def _get_feature_flag_definition(self, event):
49-
"""return feature flag definition in event."""
50+
def _get_object_definition(self, event):
51+
"""return feature flag or rule based segment definition in event."""
5052
cm = CompressionMode(event.compression) # will throw if the number is not defined in compression mode
5153
return _compression_handlers[cm](event)
54+
55+
def _get_referenced_rbs(self, feature_flag):
56+
referenced_rbs = set()
57+
for condition in feature_flag.conditions:
58+
for matcher in condition.matchers:
59+
raw_matcher = matcher.to_json()
60+
if raw_matcher['matcherType'] == 'IN_RULE_BASED_SEGMENT':
61+
referenced_rbs.add(raw_matcher['userDefinedSegmentMatcherData']['segmentName'])
62+
return referenced_rbs
5263

5364
class SegmentWorker(WorkerBase):
5465
"""Segment Worker for processing updates."""
@@ -173,7 +184,7 @@ class SplitWorker(WorkerBase):
173184

174185
_centinel = object()
175186

176-
def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_queue, feature_flag_storage, segment_storage, telemetry_runtime_producer):
187+
def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_queue, feature_flag_storage, segment_storage, telemetry_runtime_producer, rule_based_segment_storage):
177188
"""
178189
Class constructor.
179190
@@ -189,6 +200,8 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q
189200
:type segment_storage: splitio.storage.inmemory.InMemorySegmentStorage
190201
:param telemetry_runtime_producer: Telemetry runtime producer instance
191202
:type telemetry_runtime_producer: splitio.engine.telemetry.TelemetryRuntimeProducer
203+
:param rule_based_segment_storage: Rule based segment Storage.
204+
:type rule_based_segment_storage: splitio.storage.InMemoryRuleBasedStorage
192205
"""
193206
self._feature_flag_queue = feature_flag_queue
194207
self._handler = synchronize_feature_flag
@@ -198,6 +211,7 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q
198211
self._feature_flag_storage = feature_flag_storage
199212
self._segment_storage = segment_storage
200213
self._telemetry_runtime_producer = telemetry_runtime_producer
214+
self._rule_based_segment_storage = rule_based_segment_storage
201215

202216
def is_running(self):
203217
"""Return whether the working is running."""
@@ -206,25 +220,40 @@ def is_running(self):
206220
def _apply_iff_if_needed(self, event):
207221
if not self._check_instant_ff_update(event):
208222
return False
209-
210223
try:
211-
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
212-
segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number)
213-
for segment_name in segment_list:
214-
if self._segment_storage.get(segment_name) is None:
215-
_LOGGER.debug('Fetching new segment %s', segment_name)
216-
self._segment_handler(segment_name, event.change_number)
217-
218-
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
224+
if event.update_type == UpdateType.SPLIT_UPDATE:
225+
new_feature_flag = from_raw(json.loads(self._get_object_definition(event)))
226+
segment_list = update_feature_flag_storage(self._feature_flag_storage, [new_feature_flag], event.change_number)
227+
for segment_name in segment_list:
228+
if self._segment_storage.get(segment_name) is None:
229+
_LOGGER.debug('Fetching new segment %s', segment_name)
230+
self._segment_handler(segment_name, event.change_number)
231+
232+
referenced_rbs = self._get_referenced_rbs(new_feature_flag)
233+
if len(referenced_rbs) > 0 and not self._rule_based_segment_storage.contains(referenced_rbs):
234+
_LOGGER.debug('Fetching new rule based segment(s) %s', referenced_rbs)
235+
self._handler(None, event.change_number)
236+
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
237+
else:
238+
new_rbs = rbs_from_raw(json.loads(self._get_object_definition(event)))
239+
segment_list = update_rule_based_segment_storage(self._rule_based_segment_storage, [new_rbs], event.change_number)
240+
for segment_name in segment_list:
241+
if self._segment_storage.get(segment_name) is None:
242+
_LOGGER.debug('Fetching new segment %s', segment_name)
243+
self._segment_handler(segment_name, event.change_number)
244+
self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.RBS_UPDATE)
219245
return True
220-
246+
221247
except Exception as e:
222248
raise SplitStorageException(e)
223249

224250
def _check_instant_ff_update(self, event):
225251
if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == self._feature_flag_storage.get_change_number():
226252
return True
227253

254+
if event.update_type == UpdateType.RB_SEGMENT_UPDATE and event.compression is not None and event.previous_change_number == self._rule_based_segment_storage.get_change_number():
255+
return True
256+
228257
return False
229258

230259
def _run(self):
@@ -239,8 +268,13 @@ def _run(self):
239268
try:
240269
if self._apply_iff_if_needed(event):
241270
continue
242-
243-
sync_result = self._handler(event.change_number)
271+
till = None
272+
rbs_till = None
273+
if event.update_type == UpdateType.SPLIT_UPDATE:
274+
till = event.change_number
275+
else:
276+
rbs_till = event.change_number
277+
sync_result = self._handler(till, rbs_till)
244278
if not sync_result.success and sync_result.error_code is not None and sync_result.error_code == 414:
245279
_LOGGER.error("URI too long exception caught, sync failed")
246280

@@ -279,7 +313,7 @@ class SplitWorkerAsync(WorkerBase):
279313

280314
_centinel = object()
281315

282-
def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_queue, feature_flag_storage, segment_storage, telemetry_runtime_producer):
316+
def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_queue, feature_flag_storage, segment_storage, telemetry_runtime_producer, rule_based_segment_storage):
283317
"""
284318
Class constructor.
285319
@@ -295,6 +329,8 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q
295329
:type segment_storage: splitio.storage.inmemory.InMemorySegmentStorage
296330
:param telemetry_runtime_producer: Telemetry runtime producer instance
297331
:type telemetry_runtime_producer: splitio.engine.telemetry.TelemetryRuntimeProducer
332+
:param rule_based_segment_storage: Rule based segment Storage.
333+
:type rule_based_segment_storage: splitio.storage.InMemoryRuleBasedStorage
298334
"""
299335
self._feature_flag_queue = feature_flag_queue
300336
self._handler = synchronize_feature_flag
@@ -303,7 +339,8 @@ def __init__(self, synchronize_feature_flag, synchronize_segment, feature_flag_q
303339
self._feature_flag_storage = feature_flag_storage
304340
self._segment_storage = segment_storage
305341
self._telemetry_runtime_producer = telemetry_runtime_producer
306-
342+
self._rule_based_segment_storage = rule_based_segment_storage
343+
307344
def is_running(self):
308345
"""Return whether the working is running."""
309346
return self._running
@@ -312,23 +349,39 @@ async def _apply_iff_if_needed(self, event):
312349
if not await self._check_instant_ff_update(event):
313350
return False
314351
try:
315-
new_feature_flag = from_raw(json.loads(self._get_feature_flag_definition(event)))
316-
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number)
317-
for segment_name in segment_list:
318-
if await self._segment_storage.get(segment_name) is None:
319-
_LOGGER.debug('Fetching new segment %s', segment_name)
320-
await self._segment_handler(segment_name, event.change_number)
321-
322-
await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
352+
if event.update_type == UpdateType.SPLIT_UPDATE:
353+
new_feature_flag = from_raw(json.loads(self._get_object_definition(event)))
354+
segment_list = await update_feature_flag_storage_async(self._feature_flag_storage, [new_feature_flag], event.change_number)
355+
for segment_name in segment_list:
356+
if await self._segment_storage.get(segment_name) is None:
357+
_LOGGER.debug('Fetching new segment %s', segment_name)
358+
await self._segment_handler(segment_name, event.change_number)
359+
360+
referenced_rbs = self._get_referenced_rbs(new_feature_flag)
361+
if len(referenced_rbs) > 0 and not await self._rule_based_segment_storage.contains(referenced_rbs):
362+
await self._handler(None, event.change_number)
363+
364+
await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
365+
else:
366+
new_rbs = rbs_from_raw(json.loads(self._get_object_definition(event)))
367+
segment_list = await update_rule_based_segment_storage_async(self._rule_based_segment_storage, [new_rbs], event.change_number)
368+
for segment_name in segment_list:
369+
if await self._segment_storage.get(segment_name) is None:
370+
_LOGGER.debug('Fetching new segment %s', segment_name)
371+
await self._segment_handler(segment_name, event.change_number)
372+
await self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.RBS_UPDATE)
323373
return True
324374

325375
except Exception as e:
326376
raise SplitStorageException(e)
327377

328-
329378
async def _check_instant_ff_update(self, event):
330379
if event.update_type == UpdateType.SPLIT_UPDATE and event.compression is not None and event.previous_change_number == await self._feature_flag_storage.get_change_number():
331380
return True
381+
382+
if event.update_type == UpdateType.RB_SEGMENT_UPDATE and event.compression is not None and event.previous_change_number == await self._rule_based_segment_storage.get_change_number():
383+
return True
384+
332385
return False
333386

334387
async def _run(self):
@@ -343,7 +396,13 @@ async def _run(self):
343396
try:
344397
if await self._apply_iff_if_needed(event):
345398
continue
346-
await self._handler(event.change_number)
399+
till = None
400+
rbs_till = None
401+
if event.update_type == UpdateType.SPLIT_UPDATE:
402+
till = event.change_number
403+
else:
404+
rbs_till = event.change_number
405+
await self._handler(till, rbs_till)
347406
except SplitStorageException as e: # pylint: disable=broad-except
348407
_LOGGER.error('Exception Updating Feature Flag')
349408
_LOGGER.debug('Exception information: ', exc_info=True)

splitio/spec.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
SPEC_VERSION = '1.1'
1+
SPEC_VERSION = '1.3'

0 commit comments

Comments
 (0)