Skip to content

Commit 6e53541

Browse files
fix read api invocation
1 parent b9b8692 commit 6e53541

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

bigframes/core/bq_data.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def _iter_stream(
103103
stop_event: threading.Event,
104104
):
105105
reader = storage_read_client.read_rows(stream_name)
106-
for page in reader.rows():
106+
for page in reader.rows().pages:
107107
try:
108108
result_queue.put(page.to_arrow(), timeout=_WORKER_TIME_INCREMENT)
109109
except queue.Full:
@@ -113,7 +113,8 @@ def _iter_stream(
113113

114114

115115
def _iter_streams(
116-
streams, storage_read_client: bigquery_storage_v1.BigQueryReadClient
116+
streams: Sequence[bq_storage_types.ReadStream],
117+
storage_read_client: bigquery_storage_v1.BigQueryReadClient,
117118
) -> Iterator[pa.RecordBatch]:
118119
stop_event = threading.Event()
119120
result_queue: queue.Queue = queue.Queue(
@@ -125,7 +126,11 @@ def _iter_streams(
125126
for stream in streams:
126127
in_progress.append(
127128
pool.submit(
128-
_iter_stream, stream, storage_read_client, result_queue, stop_event
129+
_iter_stream,
130+
stream.name,
131+
storage_read_client,
132+
result_queue,
133+
stop_event,
129134
)
130135
)
131136

@@ -138,7 +143,7 @@ def _iter_streams(
138143
if future.done():
139144
try:
140145
future.result()
141-
finally:
146+
except Exception:
142147
stop_event.set()
143148
raise
144149
else:

0 commit comments

Comments
 (0)