Skip to content

Commit a9e89da

Browse files
author
Matias Melograno
committed
added support for properties in track method
1 parent 00eb469 commit a9e89da

File tree

17 files changed

+193
-105
lines changed

17 files changed

+193
-105
lines changed

splitio/api/events.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from splitio.api.client import HttpClientException
88

99

10-
class EventsAPI(object): #pylint: disable=too-few-public-methods
10+
class EventsAPI(object): # pylint: disable=too-few-public-methods
1111
"""Class that uses an httpClient to communicate with the events API."""
1212

1313
def __init__(self, http_client, apikey, sdk_metadata):
@@ -43,7 +43,14 @@ def _build_bulk(events):
4343
'trafficTypeName': event.traffic_type_name,
4444
'eventTypeId': event.event_type_id,
4545
'value': event.value,
46-
'timestamp': event.timestamp
46+
'timestamp': event.timestamp,
47+
'properties': event.properties,
48+
} if event.properties is not None else {
49+
'key': event.key,
50+
'trafficTypeName': event.traffic_type_name,
51+
'eventTypeId': event.event_type_id,
52+
'value': event.value,
53+
'timestamp': event.timestamp,
4754
}
4855
for event in events
4956
]

splitio/api/impressions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from splitio.api.client import HttpClientException
1010

1111

12-
class ImpressionsAPI(object): # pylint: disable=too-few-public-methods
12+
class ImpressionsAPI(object): # pylint: disable=too-few-public-methods
1313
"""Class that uses an httpClient to communicate with the impressions API."""
1414

1515
def __init__(self, client, apikey, sdk_metadata):

splitio/client/client.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from splitio.engine.evaluator import Evaluator, CONTROL
99
from splitio.engine.splitters import Splitter
1010
from splitio.models.impressions import Impression, Label
11-
from splitio.models.events import Event
11+
from splitio.models.events import Event, EventWrapper
1212
from splitio.models.telemetry import get_latency_bucket_index
1313
from splitio.client import input_validator
1414
from splitio.client.listener import ImpressionListenerException
@@ -340,7 +340,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
340340
event_type = input_validator.validate_event_type(event_type)
341341
traffic_type = input_validator.validate_traffic_type(traffic_type)
342342
value = input_validator.validate_value(value)
343-
valid, properties = input_validator.valid_properties(properties)
343+
valid, properties, size = input_validator.valid_properties(properties)
344344

345345
if key is None or event_type is None or traffic_type is None or value is False \
346346
or valid is False:
@@ -354,4 +354,7 @@ def track(self, key, traffic_type, event_type, value=None, properties=None):
354354
timestamp=int(time.time()*1000),
355355
properties=properties,
356356
)
357-
return self._events_storage.put([event])
357+
return self._events_storage.put([EventWrapper(
358+
event=event,
359+
size=size,
360+
)])

splitio/client/factory.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,9 @@ def _build_in_memory_factory(api_key, config, sdk_url=None, events_url=None): #
304304
tasks['events'].start()
305305
tasks['telemetry'].start()
306306

307+
storages['events'].set_queue_full_hook(tasks['events'].flush())
308+
storages['impressions'].set_queue_full_hook(tasks['impressions'].flush())
309+
307310
def split_ready_task():
308311
"""Wait for splits to be ready and start fetching segments."""
309312
splits_ready_flag.wait()

splitio/client/input_validator.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import math
99

1010
import six
11-
import sys
1211

1312
from splitio.api import APIException
1413
from splitio.client.key import Key
@@ -476,15 +475,16 @@ def valid_properties(properties):
476475
:param properties: dict
477476
:type properties: dict
478477
:return: tuple
479-
:rtype: (bool,dict)
478+
:rtype: (bool,dict,int)
480479
"""
480+
size = 1024 # We assume 1kb events without properties (750 bytes avg measured)
481+
481482
if properties is None:
482-
return True, None
483+
return True, None, size
483484
if not isinstance(properties, dict):
484485
_LOGGER.error('track: properties must be of type dictionary.')
485-
return False, None
486+
return False, None, 0
486487

487-
size = 1024 # We assume 1kb events without properties (750 bytes avg measured)
488488
valid_properties = None
489489

490490
for property, element in properties.items():
@@ -495,7 +495,7 @@ def valid_properties(properties):
495495
valid_properties = dict()
496496

497497
valid_properties[property] = None
498-
size += sys.getsizeof(property)
498+
size += len(property)
499499

500500
if element is None:
501501
continue
@@ -506,17 +506,18 @@ def valid_properties(properties):
506506
element = None
507507

508508
valid_properties[property] = element
509-
size += sys.getsizeof(str(element))
509+
510+
if isinstance(element, six.string_types):
511+
size += len(element)
510512

511513
if size > MAX_PROPERTIES_LENGTH_BYTES:
512514
_LOGGER.error(
513515
'The maximum size allowed for the properties is 32768 bytes. ' +
514516
'Current one is ' + str(size) + ' bytes. Event not queued'
515517
)
516-
return False, None
518+
return False, None, size
517519

518520
if isinstance(valid_properties, dict) and len(valid_properties.keys()) > 300:
519521
_LOGGER.warning('Event has more than 300 properties. Some of them will be trimmed' +
520522
' when processed')
521-
522-
return True, valid_properties
523+
return True, valid_properties, size

splitio/models/events.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,8 @@
1616
'timestamp',
1717
'properties',
1818
])
19+
20+
EventWrapper = namedtuple('EventWrapper', [
21+
'event',
22+
'size',
23+
])

splitio/storage/inmemmory.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, \
99
TelemetryStorage
1010

11+
MAX_SIZE_BYTES = 5 * 1024 * 1024
12+
1113

1214
class InMemorySplitStorage(SplitStorage):
1315
"""InMemory implementation of a split storage."""
@@ -149,7 +151,7 @@ def update(self, segment_name, to_add, to_remove, change_number=None):
149151
:type to_remove: Set
150152
"""
151153
with self._lock:
152-
if not segment_name in self._segments:
154+
if segment_name not in self._segments:
153155
self._segments[segment_name] = Segment(segment_name, to_add, change_number)
154156
return
155157

@@ -167,7 +169,7 @@ def get_change_number(self, segment_name):
167169
:rtype: int
168170
"""
169171
with self._lock:
170-
if not segment_name in self._segments:
172+
if segment_name not in self._segments:
171173
return None
172174
return self._segments[segment_name].change_number
173175

@@ -181,7 +183,7 @@ def set_change_number(self, segment_name, new_change_number):
181183
:type new_change_number: int
182184
"""
183185
with self._lock:
184-
if not segment_name in self._segments:
186+
if segment_name not in self._segments:
185187
return
186188
self._segments[segment_name].change_number = new_change_number
187189

@@ -198,7 +200,7 @@ def segment_contains(self, segment_name, key):
198200
:rtype: bool
199201
"""
200202
with self._lock:
201-
if not segment_name in self._segments:
203+
if segment_name not in self._segments:
202204
self._logger.warning(
203205
"Tried to query members for nonexistant segment %s. Returning False",
204206
segment_name
@@ -283,6 +285,7 @@ def __init__(self, eventsQueueSize):
283285
self._lock = threading.Lock()
284286
self._events = queue.Queue(maxsize=eventsQueueSize)
285287
self._queue_full_hook = None
288+
self._size = 0
286289

287290
def set_queue_full_hook(self, hook):
288291
"""
@@ -295,21 +298,27 @@ def set_queue_full_hook(self, hook):
295298

296299
def put(self, events):
297300
"""
298-
Add an avent to storage.
301+
Add an event to storage.
299302
300303
:param event: Event to be added in the storage
301304
"""
302305
try:
303306
with self._lock:
304307
for event in events:
305-
self._events.put(event, False)
308+
self._size += event.size
309+
310+
if self._size >= MAX_SIZE_BYTES:
311+
self._queue_full_hook()
312+
return False
313+
314+
self._events.put(event.event, False)
306315
return True
307316
except queue.Full:
308317
if self._queue_full_hook is not None and callable(self._queue_full_hook):
309318
self._queue_full_hook()
310319
self._logger.warning(
311320
'Events queue is full, failing to add more events. \n'
312-
'Consider increasing parameter `impressionsQueueSize` in configuration'
321+
'Consider increasing parameter `eventsQueueSize` in configuration'
313322
)
314323
return False
315324

@@ -324,6 +333,7 @@ def pop_many(self, count):
324333
while not self._events.empty() and count > 0:
325334
events.append(self._events.get(False))
326335
count -= 1
336+
self._size = 0
327337
return events
328338

329339

splitio/storage/redis.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -371,19 +371,20 @@ def put(self, events):
371371
to_store = [
372372
json.dumps({
373373
'e': {
374-
'key': event.key,
375-
'trafficTypeName': event.traffic_type_name,
376-
'eventTypeId': event.event_type_id,
377-
'value': event.value,
378-
'timestamp': event.timestamp
374+
'key': e.event.key,
375+
'trafficTypeName': e.event.traffic_type_name,
376+
'eventTypeId': e.event.event_type_id,
377+
'value': e.event.value,
378+
'timestamp': e.event.timestamp,
379+
'properties': e.event.properties,
379380
},
380381
'm': {
381382
's': self._sdk_metadata.sdk_version,
382383
'n': self._sdk_metadata.instance_name,
383384
'i': self._sdk_metadata.instance_ip,
384385
}
385386
})
386-
for event in events
387+
for e in events
387388
]
388389
try:
389390
self._redis.rpush(key, *to_store)

splitio/storage/uwsgi.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from splitio.models.events import Event
1010
from splitio.storage.adapters.uwsgi_cache import _SPLITIO_CHANGE_NUMBERS, \
1111
_SPLITIO_EVENTS_CACHE_NAMESPACE, _SPLITIO_IMPRESSIONS_CACHE_NAMESPACE, \
12-
_SPLITIO_METRICS_CACHE_NAMESPACE, _SPLITIO_MISC_NAMESPACE, UWSGILock, \
12+
_SPLITIO_METRICS_CACHE_NAMESPACE, _SPLITIO_MISC_NAMESPACE, UWSGILock, \
1313
_SPLITIO_SEGMENTS_CACHE_NAMESPACE, _SPLITIO_SPLITS_CACHE_NAMESPACE, \
1414
_SPLITIO_LOCK_CACHE_NAMESPACE
1515

@@ -113,7 +113,7 @@ def remove(self, split_name):
113113
self._KEY_TEMPLATE.format(suffix=split_name),
114114
_SPLITIO_SPLITS_CACHE_NAMESPACE
115115
)
116-
if not result is False:
116+
if result is not False:
117117
self._logger.warning("Trying to retrieve nonexistant split %s. Ignoring.", split_name)
118118
return result
119119

@@ -149,8 +149,8 @@ def get_split_names(self):
149149
return json.loads(
150150
self._uwsgi.cache_get(self._KEY_FEATURE_LIST, _SPLITIO_MISC_NAMESPACE)
151151
)
152-
except TypeError: # Thrown by json.loads when passing none
153-
pass # Fall back to default return statement (empty list)
152+
except TypeError: # Thrown by json.loads when passing none
153+
pass # Fall back to default return statement (empty list)
154154
return []
155155

156156
def get_all_splits(self):
@@ -251,7 +251,6 @@ def put(self, segment):
251251
)
252252
self.set_change_number(segment.name, segment.change_number)
253253

254-
255254
def get_change_number(self, segment_name):
256255
"""
257256
Retrieve latest change number for a segment.
@@ -424,7 +423,7 @@ def put(self, events):
424423
current = []
425424
self._uwsgi.cache_update(
426425
self._EVENTS_KEY,
427-
json.dumps(current + [e._asdict() for e in events]),
426+
json.dumps(current + [e.event._asdict() for e in events]),
428427
0,
429428
_SPLITIO_EVENTS_CACHE_NAMESPACE
430429
)
@@ -457,8 +456,17 @@ def pop_many(self, count):
457456
event['traffic_type_name'],
458457
event['event_type_id'],
459458
event['value'],
460-
event['timestamp']
461-
) for event in current[:count]
459+
event['timestamp'],
460+
event['properties']
461+
) if 'properties' in event else
462+
Event(
463+
event['key'],
464+
event['traffic_type_name'],
465+
event['event_type_id'],
466+
event['value'],
467+
event['timestamp'],
468+
)
469+
for event in current[:count]
462470
]
463471

464472
def request_flush(self):
@@ -501,7 +509,6 @@ def __init__(self, uwsgi_entrypoint):
501509
self._uwsgi = uwsgi_entrypoint
502510
self._logger = logging.getLogger(self.__class__.__name__)
503511

504-
505512
def inc_latency(self, name, bucket):
506513
"""
507514
Add a latency.
@@ -516,7 +523,8 @@ def inc_latency(self, name, bucket):
516523
return
517524

518525
with UWSGILock(self._uwsgi, self._LATENCIES_LOCK_KEY):
519-
latencies_raw = self._uwsgi.cache_get(self._LATENCIES_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
526+
latencies_raw = self._uwsgi.cache_get(
527+
self._LATENCIES_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
520528
latencies = json.loads(latencies_raw) if latencies_raw else {}
521529
to_update = latencies.get(name, [0] * 22)
522530
to_update[bucket] += 1
@@ -536,7 +544,8 @@ def inc_counter(self, name):
536544
:type name: str
537545
"""
538546
with UWSGILock(self._uwsgi, self._COUNTERS_LOCK_KEY):
539-
counters_raw = self._uwsgi.cache_get(self._COUNTERS_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
547+
counters_raw = self._uwsgi.cache_get(
548+
self._COUNTERS_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
540549
counters = json.loads(counters_raw) if counters_raw else {}
541550
value = counters.get(name, 0)
542551
value += 1
@@ -575,7 +584,8 @@ def pop_counters(self):
575584
:rtype: list
576585
"""
577586
with UWSGILock(self._uwsgi, self._COUNTERS_LOCK_KEY):
578-
counters_raw = self._uwsgi.cache_get(self._COUNTERS_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
587+
counters_raw = self._uwsgi.cache_get(
588+
self._COUNTERS_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
579589
self._uwsgi.cache_del(self._COUNTERS_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
580590
return json.loads(counters_raw) if counters_raw else {}
581591

@@ -598,6 +608,7 @@ def pop_latencies(self):
598608
:rtype: list
599609
"""
600610
with UWSGILock(self._uwsgi, self._LATENCIES_LOCK_KEY):
601-
latencies_raw = self._uwsgi.cache_get(self._LATENCIES_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
611+
latencies_raw = self._uwsgi.cache_get(
612+
self._LATENCIES_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
602613
self._uwsgi.cache_del(self._LATENCIES_KEY, _SPLITIO_METRICS_CACHE_NAMESPACE)
603614
return json.loads(latencies_raw) if latencies_raw else {}

0 commit comments

Comments
 (0)