Skip to content

Commit 235e5e5

Browse files
read parallel consumer fixes
1 parent 4acf103 commit 235e5e5

File tree

1 file changed

+31
-29
lines changed

1 file changed

+31
-29
lines changed

bigframes/core/bq_data.py

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,14 @@ def _iter_stream(
110110
):
111111
reader = storage_read_client.read_rows(stream_name)
112112
for page in reader.rows().pages:
113-
try:
114-
result_queue.put(page.to_arrow(), timeout=_WORKER_TIME_INCREMENT)
115-
except queue.Full:
116-
continue
117-
if stop_event.is_set():
118-
return
113+
while True: # Alternate between put attempt and checking stop event
114+
try:
115+
result_queue.put(page.to_arrow(), timeout=_WORKER_TIME_INCREMENT)
116+
break
117+
except queue.Full:
118+
if stop_event.is_set():
119+
return
120+
continue
119121

120122

121123
def _iter_streams(
@@ -129,32 +131,32 @@ def _iter_streams(
129131

130132
in_progress: list[concurrent.futures.Future] = []
131133
with concurrent.futures.ThreadPoolExecutor(max_workers=len(streams)) as pool:
132-
for stream in streams:
133-
in_progress.append(
134-
pool.submit(
135-
_iter_stream,
136-
stream.name,
137-
storage_read_client,
138-
result_queue,
139-
stop_event,
134+
try:
135+
for stream in streams:
136+
in_progress.append(
137+
pool.submit(
138+
_iter_stream,
139+
stream.name,
140+
storage_read_client,
141+
result_queue,
142+
stop_event,
143+
)
140144
)
141-
)
142145

143-
while in_progress:
144-
try:
145-
yield result_queue.get(timeout=0.1)
146-
except queue.Empty:
147-
new_in_progress = []
148-
for future in in_progress:
149-
if future.done():
150-
try:
146+
while in_progress:
147+
try:
148+
yield result_queue.get(timeout=0.1)
149+
except queue.Empty:
150+
new_in_progress = []
151+
for future in in_progress:
152+
if future.done():
153+
# Call to raise any exceptions
151154
future.result()
152-
except Exception:
153-
stop_event.set()
154-
raise
155-
else:
156-
new_in_progress.append(future)
157-
in_progress = new_in_progress
155+
else:
156+
new_in_progress.append(future)
157+
in_progress = new_in_progress
158+
finally:
159+
stop_event.set()
158160

159161

160162
@dataclasses.dataclass

0 commit comments

Comments
 (0)