File tree Expand file tree Collapse file tree 2 files changed +18
-5
lines changed
Expand file tree Collapse file tree 2 files changed +18
-5
lines changed Original file line number Diff line number Diff line change @@ -39,8 +39,4 @@ def _uniques_formatter(self, uniques):
3939
4040 return {
4141 'keys' : [{'f' : feature , 'ks' : list (keys )} for feature , keys in uniques .items ()]
42- }
43- # for key in uniques:
44- # formatted_uniques["keys"].append(json.loads('{"f":"' + key +'", "ks":' + json.dumps(list(uniques[key])) + '}'))
45-
46- # return formatted_uniques
42+ }
Original file line number Diff line number Diff line change 22from splitio .engine .sender_adapters .in_memory_sender_adapter import InMemorySenderAdapter
33
44_UNIQUE_KEYS_MAX_BULK_SIZE = 5000
5+ import threading
6+ import logging
7+ from splitio .engine .filters .bloom_filter import BloomFilter
8+ from splitio .engine .sender_adapters .in_memory_sender_adapter import InMemorySenderAdapter
9+
10+ _LOGGER = logging .getLogger (__name__ )
511
612class UniqueKeysSynchronizer (object ):
713 """Unique Keys Synchronizer class."""
@@ -31,6 +37,17 @@ def SendAll(self):
3137 self ._impressions_sender_adapter .record_unique_keys (bulk )
3238
3339 def _split_cache_to_bulks (self , cache ):
40+ cache_size = self ._uniqe_keys_tracker ._get_dict_size ()
41+ if cache_size <= self ._max_bulk_size :
42+ self ._uniqe_keys_tracker ._impressions_sender_adapter .record_unique_keys (self ._uniqe_keys_tracker ._cache )
43+ else :
44+ for bulk in self ._split_cache_to_bulks ():
45+ self ._uniqe_keys_tracker ._impressions_sender_adapter .record_unique_keys (bulk )
46+
47+ with self ._lock :
48+ self ._uniqe_keys_tracker ._cache = {}
49+
50+ def _split_cache_to_bulks (self ):
3451 """
3552 Split the current unique keys dictionary into seperate dictionaries,
3653 each with the size of max_bulk_size. Overflow the last feature set() to new unique keys dictionary.
You can’t perform that action at this time.
0 commit comments