Skip to content

Commit 1d8b448

Browse files
authored
Merge pull request #560 from splitio/rbs_sse
Updated SSE classes
2 parents 4cd84cd + db5eafc commit 1d8b448

File tree

16 files changed

+1308
-267
lines changed

16 files changed

+1308
-267
lines changed

splitio/models/rule_based_segments.py

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111
class RuleBasedSegment(object):
1212
"""RuleBasedSegment object class."""
1313

14-
def __init__(self, name, traffic_yype_Name, change_number, status, conditions, excluded):
14+
def __init__(self, name, traffic_type_name, change_number, status, conditions, excluded):
1515
"""
1616
Class constructor.
1717
1818
:param name: Segment name.
1919
:type name: str
20-
:param traffic_yype_Name: traffic type name.
21-
:type traffic_yype_Name: str
20+
:param traffic_type_name: traffic type name.
21+
:type traffic_type_name: str
2222
:param change_number: change number.
2323
:type change_number: str
2424
:param status: status.
@@ -29,7 +29,7 @@ def __init__(self, name, traffic_yype_Name, change_number, status, conditions, e
2929
:type excluded: Excluded
3030
"""
3131
self._name = name
32-
self._traffic_yype_Name = traffic_yype_Name
32+
self._traffic_type_name = traffic_type_name
3333
self._change_number = change_number
3434
self._status = status
3535
self._conditions = conditions
@@ -41,9 +41,9 @@ def name(self):
4141
return self._name
4242

4343
@property
44-
def traffic_yype_Name(self):
44+
def traffic_type_name(self):
4545
"""Return traffic type name."""
46-
return self._traffic_yype_Name
46+
return self._traffic_type_name
4747

4848
@property
4949
def change_number(self):
@@ -65,6 +65,17 @@ def excluded(self):
6565
"""Return excluded."""
6666
return self._excluded
6767

68+
def to_json(self):
69+
"""Return a JSON representation of this rule based segment."""
70+
return {
71+
'changeNumber': self.change_number,
72+
'trafficTypeName': self.traffic_type_name,
73+
'name': self.name,
74+
'status': self.status,
75+
'conditions': [c.to_json() for c in self.conditions],
76+
'excluded': self.excluded.to_json()
77+
}
78+
6879
def from_raw(raw_rule_based_segment):
6980
"""
7081
Parse a Rule based segment from a JSON portion of splitChanges.
@@ -111,3 +122,10 @@ def get_excluded_keys(self):
111122
def get_excluded_segments(self):
112123
"""Return excluded segments"""
113124
return self._segments
125+
126+
def to_json(self):
127+
"""Return a JSON representation of this object."""
128+
return {
129+
'keys': self._keys,
130+
'segments': self._segments
131+
}

splitio/models/telemetry.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ class OperationMode(Enum):
140140
class UpdateFromSSE(Enum):
141141
"""Update from sse constants"""
142142
SPLIT_UPDATE = 'sp'
143+
RBS_UPDATE = 'rbs'
143144

144145
def get_latency_bucket_index(micros):
145146
"""

splitio/push/parser.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class UpdateType(Enum):
2828
SPLIT_UPDATE = 'SPLIT_UPDATE'
2929
SPLIT_KILL = 'SPLIT_KILL'
3030
SEGMENT_UPDATE = 'SEGMENT_UPDATE'
31+
RB_SEGMENT_UPDATE = 'RB_SEGMENT_UPDATE'
3132

3233

3334
class ControlType(Enum):
@@ -329,7 +330,7 @@ def __init__(self, channel, timestamp, change_number, previous_change_number, fe
329330
"""Class constructor."""
330331
BaseUpdate.__init__(self, channel, timestamp, change_number)
331332
self._previous_change_number = previous_change_number
332-
self._feature_flag_definition = feature_flag_definition
333+
self._object_definition = feature_flag_definition
333334
self._compression = compression
334335

335336
@property
@@ -352,13 +353,13 @@ def previous_change_number(self): # pylint:disable=no-self-use
352353
return self._previous_change_number
353354

354355
@property
355-
def feature_flag_definition(self): # pylint:disable=no-self-use
356+
def object_definition(self): # pylint:disable=no-self-use
356357
"""
357358
Return feature flag definition
358359
:returns: The new feature flag definition
359360
:rtype: str
360361
"""
361-
return self._feature_flag_definition
362+
return self._object_definition
362363

363364
@property
364365
def compression(self): # pylint:disable=no-self-use
@@ -451,6 +452,56 @@ def __str__(self):
451452
"""Return string representation."""
452453
return "SegmentChange - changeNumber=%d, name=%s" % (self.change_number, self.segment_name)
453454

455+
class RBSChangeUpdate(BaseUpdate):
456+
"""rbs Change notification."""
457+
458+
def __init__(self, channel, timestamp, change_number, previous_change_number, rbs_definition, compression):
459+
"""Class constructor."""
460+
BaseUpdate.__init__(self, channel, timestamp, change_number)
461+
self._previous_change_number = previous_change_number
462+
self._object_definition = rbs_definition
463+
self._compression = compression
464+
465+
@property
466+
def update_type(self): # pylint:disable=no-self-use
467+
"""
468+
Return the message type.
469+
470+
:returns: The type of this parsed Update.
471+
:rtype: UpdateType
472+
"""
473+
return UpdateType.RB_SEGMENT_UPDATE
474+
475+
@property
476+
def previous_change_number(self): # pylint:disable=no-self-use
477+
"""
478+
Return previous change number
479+
:returns: The previous change number
480+
:rtype: int
481+
"""
482+
return self._previous_change_number
483+
484+
@property
485+
def object_definition(self): # pylint:disable=no-self-use
486+
"""
487+
Return rbs definition
488+
:returns: The new rbs definition
489+
:rtype: str
490+
"""
491+
return self._object_definition
492+
493+
@property
494+
def compression(self): # pylint:disable=no-self-use
495+
"""
496+
Return previous compression type
497+
:returns: The compression type
498+
:rtype: int
499+
"""
500+
return self._compression
501+
502+
def __str__(self):
503+
"""Return string representation."""
504+
return "RBSChange - changeNumber=%d" % (self.change_number)
454505

455506
class ControlMessage(BaseMessage):
456507
"""Control notification."""
@@ -503,6 +554,9 @@ def _parse_update(channel, timestamp, data):
503554
if update_type == UpdateType.SPLIT_UPDATE and change_number is not None:
504555
return SplitChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c'))
505556

557+
if update_type == UpdateType.RB_SEGMENT_UPDATE and change_number is not None:
558+
return RBSChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c'))
559+
506560
elif update_type == UpdateType.SPLIT_KILL and change_number is not None:
507561
return SplitKillUpdate(channel, timestamp, change_number,
508562
data['splitName'], data['defaultTreatment'])

splitio/push/processor.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@ def __init__(self, synchronizer, telemetry_runtime_producer):
3535
self._feature_flag_queue = Queue()
3636
self._segments_queue = Queue()
3737
self._synchronizer = synchronizer
38-
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer)
38+
self._feature_flag_worker = SplitWorker(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer, synchronizer.split_sync.rule_based_segment_storage)
3939
self._segments_worker = SegmentWorker(synchronizer.synchronize_segment, self._segments_queue)
4040
self._handlers = {
4141
UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update,
4242
UpdateType.SPLIT_KILL: self._handle_feature_flag_kill,
43-
UpdateType.SEGMENT_UPDATE: self._handle_segment_change
43+
UpdateType.SEGMENT_UPDATE: self._handle_segment_change,
44+
UpdateType.RB_SEGMENT_UPDATE: self._handle_feature_flag_update
4445
}
4546

4647
def _handle_feature_flag_update(self, event):
@@ -119,12 +120,13 @@ def __init__(self, synchronizer, telemetry_runtime_producer):
119120
self._feature_flag_queue = asyncio.Queue()
120121
self._segments_queue = asyncio.Queue()
121122
self._synchronizer = synchronizer
122-
self._feature_flag_worker = SplitWorkerAsync(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer)
123+
self._feature_flag_worker = SplitWorkerAsync(synchronizer.synchronize_splits, synchronizer.synchronize_segment, self._feature_flag_queue, synchronizer.split_sync.feature_flag_storage, synchronizer.segment_storage, telemetry_runtime_producer, synchronizer.split_sync.rule_based_segment_storage)
123124
self._segments_worker = SegmentWorkerAsync(synchronizer.synchronize_segment, self._segments_queue)
124125
self._handlers = {
125126
UpdateType.SPLIT_UPDATE: self._handle_feature_flag_update,
126127
UpdateType.SPLIT_KILL: self._handle_feature_flag_kill,
127-
UpdateType.SEGMENT_UPDATE: self._handle_segment_change
128+
UpdateType.SEGMENT_UPDATE: self._handle_segment_change,
129+
UpdateType.RB_SEGMENT_UPDATE: self._handle_feature_flag_update
128130
}
129131

130132
async def _handle_feature_flag_update(self, event):

0 commit comments

Comments
 (0)