Skip to content

Commit 25aa964

Browse files
authored
Merge pull request #153 from splitio/bugfix/race_condition_on_init
fix race condition
2 parents 16d5e7a + d583db1 commit 25aa964

File tree

4 files changed

+50
-24
lines changed

4 files changed

+50
-24
lines changed

CHANGES.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
8.1.3 (Oct 4, 2019)
2+
- Fixed race condition related to segment fetching and SDK_READY event
3+
14
8.1.2 (Jul 19, 2019)
25
- Validated TLS support for redis connections
36
- Fixed traffic type count issue

splitio/tasks/util/workerpool.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,18 @@ def _wrapper(self, worker_number, func):
6363
while self._should_be_working[worker_number]:
6464
try:
6565
message = self._incoming.get(True, 0.5)
66-
self._incoming.task_done()
6766

6867
# For some reason message can be None in python2 implementation of queue.
6968
# This method must be both ignored and acknowledged with .task_done()
7069
# otherwise .join() will halt.
7170
if message is None:
71+
self._incoming.task_done()
7272
continue
7373

74+
# If the task is successfully executed, the ack is done AFTERWARDS,
75+
# to avoid race conditions on SDK initialization.
7476
ok = self._safe_run(func, message) #pylint: disable=invalid-name
77+
self._incoming.task_done()
7578
if not ok:
7679
self._logger.error(
7780
("Something went wrong during the execution, "

splitio/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '8.1.2'
1+
__version__ = '8.1.3'
Lines changed: 42 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
"""Workerpool test module."""
2+
# pylint: disable=no-self-use,too-few-public-methods,missing-docstring
23
import time
34
import threading
45
from splitio.tasks.util import workerpool
@@ -10,44 +11,63 @@ class WorkerPoolTests(object):
1011
def test_normal_operation(self, mocker):
1112
"""Test normal opeation works properly."""
1213
worker_func = mocker.Mock()
13-
wp = workerpool.WorkerPool(10, worker_func)
14-
wp.start()
15-
for x in range(0, 100):
16-
wp.submit_work(str(x))
14+
wpool = workerpool.WorkerPool(10, worker_func)
15+
wpool.start()
16+
for num in range(0, 100):
17+
wpool.submit_work(str(num))
1718

1819
stop_event = threading.Event()
19-
wp.stop(stop_event)
20+
wpool.stop(stop_event)
2021
stop_event.wait(5)
2122
assert stop_event.is_set()
2223

2324
calls = worker_func.mock_calls
24-
for x in range(0, 100):
25-
assert mocker.call(str(x)) in calls
25+
for num in range(0, 100):
26+
assert mocker.call(str(num)) in calls
2627

27-
def test_failure_in_message_doesnt_breal(self, mocker):
28+
def test_fail_in_msg_doesnt_break(self):
2829
"""Test that if a message cannot be parsed it is ignored and others are processed."""
29-
class Worker:
30+
class Worker(object): #pylint: disable=
3031
def __init__(self):
31-
self._worked = set()
32+
self.worked = set()
3233

33-
def do_work(self, w):
34-
if w == '55':
34+
def do_work(self, work):
35+
if work == '55':
3536
raise Exception('something')
36-
self._worked.add(w)
37+
self.worked.add(work)
3738

3839
worker = Worker()
39-
wp = workerpool.WorkerPool(50, worker.do_work)
40-
wp.start()
41-
for x in range(0, 100):
42-
wp.submit_work(str(x))
40+
wpool = workerpool.WorkerPool(50, worker.do_work)
41+
wpool.start()
42+
for num in range(0, 100):
43+
wpool.submit_work(str(num))
4344

4445
stop_event = threading.Event()
45-
wp.stop(stop_event)
46+
wpool.stop(stop_event)
4647
stop_event.wait(5)
4748
assert stop_event.is_set()
4849

49-
for x in range(0, 100):
50-
if x != 55:
51-
assert str(x) in worker._worked
50+
for num in range(0, 100):
51+
if num != 55:
52+
assert str(num) in worker.worked
5253
else:
53-
assert str(x) not in worker._worked
54+
assert str(num) not in worker.worked
55+
56+
def test_msg_acked_after_processed(self):
57+
"""Test that events are only set after all the work in the pipeline is done."""
58+
class Worker(object):
59+
def __init__(self):
60+
self.worked = set()
61+
62+
def do_work(self, work):
63+
self.worked.add(work)
64+
time.sleep(0.02) # will wait 2 seconds in total for 100 elements
65+
66+
worker = Worker()
67+
wpool = workerpool.WorkerPool(50, worker.do_work)
68+
wpool.start()
69+
for num in range(0, 100):
70+
wpool.submit_work(str(num))
71+
72+
wpool.wait_for_completion()
73+
assert len(worker.worked) == 100

0 commit comments

Comments
 (0)