Skip to content

Commit ccb4626

Browse files
author
Matias Melograno
committed
merged with master
2 parents 4da3a6d + 25aa964 commit ccb4626

File tree

4 files changed

+51
-25
lines changed

4 files changed

+51
-25
lines changed

CHANGES.txt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1-
8.1.3 (Oct XX, 2019)
1+
8.1.4 (Oct XX, 2019)
22
- Added logic to fetch multiple splits at once on get_treatments/get_treatments_with_config.
33
- Added flag `ipAddressesEnabled` into config to enable/disable sending machineName and machineIp when data is posted in headers.
44

5+
8.1.3 (Oct 4, 2019)
6+
- Fixed race condition related to segment fetching and SDK_READY event
7+
58
8.1.2 (Jul 19, 2019)
69
- Validated TLS support for redis connections
710
- Fixed traffic type count issue

splitio/tasks/util/workerpool.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,18 @@ def _wrapper(self, worker_number, func):
6363
while self._should_be_working[worker_number]:
6464
try:
6565
message = self._incoming.get(True, 0.5)
66-
self._incoming.task_done()
6766

6867
# For some reason message can be None in python2 implementation of queue.
6968
# This method must be both ignored and acknowledged with .task_done()
7069
# otherwise .join() will halt.
7170
if message is None:
71+
self._incoming.task_done()
7272
continue
7373

74+
# If the task is successfully executed, the ack is done AFTERWARDS,
75+
# to avoid race conditions on SDK initialization.
7476
ok = self._safe_run(func, message) #pylint: disable=invalid-name
77+
self._incoming.task_done()
7578
if not ok:
7679
self._logger.error(
7780
("Something went wrong during the execution, "

splitio/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '8.1.3-rc1'
1+
__version__ = '8.1.4-rc1'
Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Workerpool test module."""
2+
# pylint: disable=no-self-use,too-few-public-methods,missing-docstring
23
import time
34
import threading
45
from splitio.tasks.util import workerpool
@@ -10,44 +11,63 @@ class WorkerPoolTests(object):
1011
def test_normal_operation(self, mocker):
1112
"""Test normal opeation works properly."""
1213
worker_func = mocker.Mock()
13-
wp = workerpool.WorkerPool(10, worker_func)
14-
wp.start()
15-
for x in range(0, 100):
16-
wp.submit_work(str(x))
14+
wpool = workerpool.WorkerPool(10, worker_func)
15+
wpool.start()
16+
for num in range(0, 100):
17+
wpool.submit_work(str(num))
1718

1819
stop_event = threading.Event()
19-
wp.stop(stop_event)
20+
wpool.stop(stop_event)
2021
stop_event.wait(5)
2122
assert stop_event.is_set()
2223

2324
calls = worker_func.mock_calls
24-
for x in range(0, 100):
25-
assert mocker.call(str(x)) in calls
25+
for num in range(0, 100):
26+
assert mocker.call(str(num)) in calls
2627

27-
def test_failure_in_message_doesnt_breal(self, mocker):
28+
def test_fail_in_msg_doesnt_break(self):
2829
"""Test that if a message cannot be parsed it is ignored and others are processed."""
29-
class Worker:
30+
class Worker(object): #pylint: disable=
3031
def __init__(self):
31-
self._worked = set()
32+
self.worked = set()
3233

33-
def do_work(self, w):
34-
if w == '55':
34+
def do_work(self, work):
35+
if work == '55':
3536
raise Exception('something')
36-
self._worked.add(w)
37+
self.worked.add(work)
3738

3839
worker = Worker()
39-
wp = workerpool.WorkerPool(50, worker.do_work)
40-
wp.start()
41-
for x in range(0, 100):
42-
wp.submit_work(str(x))
40+
wpool = workerpool.WorkerPool(50, worker.do_work)
41+
wpool.start()
42+
for num in range(0, 100):
43+
wpool.submit_work(str(num))
4344

4445
stop_event = threading.Event()
45-
wp.stop(stop_event)
46+
wpool.stop(stop_event)
4647
stop_event.wait(5)
4748
assert stop_event.is_set()
4849

49-
for x in range(0, 100):
50-
if x != 55:
51-
assert str(x) in worker._worked
50+
for num in range(0, 100):
51+
if num != 55:
52+
assert str(num) in worker.worked
5253
else:
53-
assert str(x) not in worker._worked
54+
assert str(num) not in worker.worked
55+
56+
def test_msg_acked_after_processed(self):
57+
"""Test that events are only set after all the work in the pipeline is done."""
58+
class Worker(object):
59+
def __init__(self):
60+
self.worked = set()
61+
62+
def do_work(self, work):
63+
self.worked.add(work)
64+
time.sleep(0.02) # will wait 2 seconds in total for 100 elements
65+
66+
worker = Worker()
67+
wpool = workerpool.WorkerPool(50, worker.do_work)
68+
wpool.start()
69+
for num in range(0, 100):
70+
wpool.submit_work(str(num))
71+
72+
wpool.wait_for_completion()
73+
assert len(worker.worked) == 100

0 commit comments

Comments
 (0)