Skip to content

Commit d7b436d

Browse files
authored
Merge pull request #401 from splitio/async-memory-split-storage
created inmemory split storage async
2 parents 59840c9 + a943a86 commit d7b436d

File tree

2 files changed

+473
-12
lines changed

2 files changed

+473
-12
lines changed

splitio/storage/inmemmory.py

Lines changed: 277 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,150 @@
77
from splitio.models.segments import Segment
88
from splitio.models.telemetry import HTTPErrors, HTTPLatencies, MethodExceptions, MethodLatencies, LastSynchronization, StreamingEvents, TelemetryConfig, TelemetryCounters, CounterConstants
99
from splitio.storage import SplitStorage, SegmentStorage, ImpressionStorage, EventStorage, TelemetryStorage
10+
from splitio.optional.loaders import asyncio
1011

1112
MAX_SIZE_BYTES = 5 * 1024 * 1024
1213
MAX_TAGS = 10
1314

1415
_LOGGER = logging.getLogger(__name__)
1516

1617

17-
class InMemorySplitStorage(SplitStorage):
18+
class InMemorySplitStorageBase(SplitStorage):
19+
"""InMemory implementation of a split storage base."""
20+
21+
def get(self, split_name):
22+
"""
23+
Retrieve a split.
24+
25+
:param split_name: Name of the feature to fetch.
26+
:type split_name: str
27+
28+
:rtype: splitio.models.splits.Split
29+
"""
30+
pass
31+
32+
def fetch_many(self, split_names):
33+
"""
34+
Retrieve splits.
35+
36+
:param split_names: Names of the features to fetch.
37+
:type split_name: list(str)
38+
39+
:return: A dict with split objects parsed from queue.
40+
:rtype: dict(split_name, splitio.models.splits.Split)
41+
"""
42+
pass
43+
44+
def put(self, split):
45+
"""
46+
Store a split.
47+
48+
:param split: Split object.
49+
:type split: splitio.models.split.Split
50+
"""
51+
pass
52+
53+
def remove(self, split_name):
54+
"""
55+
Remove a split from storage.
56+
57+
:param split_name: Name of the feature to remove.
58+
:type split_name: str
59+
60+
:return: True if the split was found and removed. False otherwise.
61+
:rtype: bool
62+
"""
63+
pass
64+
65+
def get_change_number(self):
66+
"""
67+
Retrieve latest split change number.
68+
69+
:rtype: int
70+
"""
71+
pass
72+
73+
def set_change_number(self, new_change_number):
74+
"""
75+
Set the latest change number.
76+
77+
:param new_change_number: New change number.
78+
:type new_change_number: int
79+
"""
80+
pass
81+
82+
def get_split_names(self):
83+
"""
84+
Retrieve a list of all split names.
85+
86+
:return: List of split names.
87+
:rtype: list(str)
88+
"""
89+
pass
90+
91+
def get_all_splits(self):
92+
"""
93+
Return all the splits.
94+
95+
:return: List of all the splits.
96+
:rtype: list
97+
"""
98+
pass
99+
100+
def get_splits_count(self):
101+
"""
102+
Return splits count.
103+
104+
:rtype: int
105+
"""
106+
pass
107+
108+
def is_valid_traffic_type(self, traffic_type_name):
109+
"""
110+
Return whether the traffic type exists in at least one split in cache.
111+
112+
:param traffic_type_name: Traffic type to validate.
113+
:type traffic_type_name: str
114+
115+
:return: True if the traffic type is valid. False otherwise.
116+
:rtype: bool
117+
"""
118+
pass
119+
120+
def kill_locally(self, split_name, default_treatment, change_number):
121+
"""
122+
Local kill for split
123+
124+
:param split_name: name of the split to perform kill
125+
:type split_name: str
126+
:param default_treatment: name of the default treatment to return
127+
:type default_treatment: str
128+
:param change_number: change_number
129+
:type change_number: int
130+
"""
131+
pass
132+
133+
def _increase_traffic_type_count(self, traffic_type_name):
134+
"""
135+
Increase by one the count for a specific traffic type name.
136+
137+
:param traffic_type_name: Traffic type to increase the count.
138+
:type traffic_type_name: str
139+
"""
140+
self._traffic_types.update([traffic_type_name])
141+
142+
def _decrease_traffic_type_count(self, traffic_type_name):
143+
"""
144+
Decrease by one the count for a specific traffic type name.
145+
146+
:param traffic_type_name: Traffic type to decrease the count.
147+
:type traffic_type_name: str
148+
"""
149+
self._traffic_types.subtract([traffic_type_name])
150+
self._traffic_types += Counter()
151+
152+
153+
class InMemorySplitStorage(InMemorySplitStorageBase):
18154
"""InMemory implementation of a split storage."""
19155

20156
def __init__(self):
@@ -162,24 +298,154 @@ def kill_locally(self, split_name, default_treatment, change_number):
162298
split.local_kill(default_treatment, change_number)
163299
self.put(split)
164300

165-
def _increase_traffic_type_count(self, traffic_type_name):
301+
302+
class InMemorySplitStorageAsync(InMemorySplitStorageBase):
303+
"""InMemory implementation of a split async storage."""
304+
305+
def __init__(self):
306+
"""Constructor."""
307+
self._lock = asyncio.Lock()
308+
self._splits = {}
309+
self._change_number = -1
310+
self._traffic_types = Counter()
311+
312+
async def get(self, split_name):
166313
"""
167-
Increase by one the count for a specific traffic type name.
314+
Retrieve a split.
168315
169-
:param traffic_type_name: Traffic type to increase the count.
170-
:type traffic_type_name: str
316+
:param split_name: Name of the feature to fetch.
317+
:type split_name: str
318+
319+
:rtype: splitio.models.splits.Split
171320
"""
172-
self._traffic_types.update([traffic_type_name])
321+
async with self._lock:
322+
return self._splits.get(split_name)
173323

174-
def _decrease_traffic_type_count(self, traffic_type_name):
324+
async def fetch_many(self, split_names):
175325
"""
176-
Decrease by one the count for a specific traffic type name.
326+
Retrieve splits.
177327
178-
:param traffic_type_name: Traffic type to decrease the count.
328+
:param split_names: Names of the features to fetch.
329+
:type split_name: list(str)
330+
331+
:return: A dict with split objects parsed from queue.
332+
:rtype: dict(split_name, splitio.models.splits.Split)
333+
"""
334+
return {split_name: await self.get(split_name) for split_name in split_names}
335+
336+
async def put(self, split):
337+
"""
338+
Store a split.
339+
340+
:param split: Split object.
341+
:type split: splitio.models.split.Split
342+
"""
343+
async with self._lock:
344+
if split.name in self._splits:
345+
self._decrease_traffic_type_count(self._splits[split.name].traffic_type_name)
346+
self._splits[split.name] = split
347+
self._increase_traffic_type_count(split.traffic_type_name)
348+
349+
async def remove(self, split_name):
350+
"""
351+
Remove a split from storage.
352+
353+
:param split_name: Name of the feature to remove.
354+
:type split_name: str
355+
356+
:return: True if the split was found and removed. False otherwise.
357+
:rtype: bool
358+
"""
359+
async with self._lock:
360+
split = self._splits.get(split_name)
361+
if not split:
362+
_LOGGER.warning("Tried to delete nonexistant split %s. Skipping", split_name)
363+
return False
364+
365+
self._splits.pop(split_name)
366+
self._decrease_traffic_type_count(split.traffic_type_name)
367+
return True
368+
369+
async def get_change_number(self):
370+
"""
371+
Retrieve latest split change number.
372+
373+
:rtype: int
374+
"""
375+
async with self._lock:
376+
return self._change_number
377+
378+
async def set_change_number(self, new_change_number):
379+
"""
380+
Set the latest change number.
381+
382+
:param new_change_number: New change number.
383+
:type new_change_number: int
384+
"""
385+
async with self._lock:
386+
self._change_number = new_change_number
387+
388+
async def get_split_names(self):
389+
"""
390+
Retrieve a list of all split names.
391+
392+
:return: List of split names.
393+
:rtype: list(str)
394+
"""
395+
async with self._lock:
396+
return list(self._splits.keys())
397+
398+
async def get_all_splits(self):
399+
"""
400+
Return all the splits.
401+
402+
:return: List of all the splits.
403+
:rtype: list
404+
"""
405+
async with self._lock:
406+
return list(self._splits.values())
407+
408+
async def get_splits_count(self):
409+
"""
410+
Return splits count.
411+
412+
:rtype: int
413+
"""
414+
async with self._lock:
415+
return len(self._splits)
416+
417+
async def is_valid_traffic_type(self, traffic_type_name):
418+
"""
419+
Return whether the traffic type exists in at least one split in cache.
420+
421+
:param traffic_type_name: Traffic type to validate.
179422
:type traffic_type_name: str
423+
424+
:return: True if the traffic type is valid. False otherwise.
425+
:rtype: bool
180426
"""
181-
self._traffic_types.subtract([traffic_type_name])
182-
self._traffic_types += Counter()
427+
async with self._lock:
428+
return traffic_type_name in self._traffic_types
429+
430+
async def kill_locally(self, split_name, default_treatment, change_number):
431+
"""
432+
Local kill for split
433+
434+
:param split_name: name of the split to perform kill
435+
:type split_name: str
436+
:param default_treatment: name of the default treatment to return
437+
:type default_treatment: str
438+
:param change_number: change_number
439+
:type change_number: int
440+
"""
441+
if await self.get_change_number() > change_number:
442+
return
443+
async with self._lock:
444+
split = self._splits.get(split_name)
445+
if not split:
446+
return
447+
split.local_kill(default_treatment, change_number)
448+
await self.put(split)
183449

184450

185451
class InMemorySegmentStorage(SegmentStorage):

0 commit comments

Comments
 (0)