Commit 24d45d0

fix: updates timeout/retry code to respect hanging server (#2408)
**Description**

This PR fixes a crash when handling `_InactiveRpcError` during retry logic and ensures proper `timeout` propagation in `RowIterator.to_dataframe`.

**Fixes**

- **Retry logic crash**: Addressed an issue in `google/cloud/bigquery/retry.py` where `_should_retry` would raise a `TypeError` when inspecting unstructured gRPC errors (such as `_InactiveRpcError`). The fix adds robust error inspection so the predicate falls back gracefully when `exc.errors` is not subscriptable.
- **Timeout propagation**: Added the missing `timeout` parameter to `RowIterator.to_dataframe` in `google/cloud/bigquery/table.py`. This ensures that the user-specified `timeout` is correctly passed down to the underlying `to_arrow` call, preventing the client from hanging indefinitely when the Storage API is unresponsive.

**Changes**

- Modified `google/cloud/bigquery/retry.py`: updated `_should_retry` to handle `TypeError` and `KeyError` when accessing `exc.errors`.
- Modified `google/cloud/bigquery/table.py`: updated the `RowIterator.to_dataframe` signature and implementation to accept and pass the `timeout` parameter.

The first half of this work was completed in PR #2354.
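As an illustration of the intended caller-facing behavior (the query, client setup, and 30-second value below are placeholders; the `timeout` keyword on `RowIterator.to_dataframe` is the parameter this PR adds):

```python
import concurrent.futures

from google.cloud import bigquery

client = bigquery.Client()  # assumes default credentials and project
rows = client.query("SELECT 1 AS x").result()  # result() returns a RowIterator

try:
    # With this change, a hung or unresponsive BigQuery Storage read should
    # surface as a timeout after ~30 seconds instead of blocking forever.
    df = rows.to_dataframe(timeout=30.0)
except concurrent.futures.TimeoutError:
    print("Result download did not finish within 30 seconds")
```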
1 parent 7b8ceea commit 24d45d0

File tree

9 files changed: +214 −17 lines

google/cloud/bigquery/_pandas_helpers.py

Lines changed: 40 additions & 9 deletions
@@ -33,6 +33,7 @@

 from google.cloud.bigquery import _pyarrow_helpers
 from google.cloud.bigquery import _versions_helpers
+from google.cloud.bigquery import retry as bq_retry
 from google.cloud.bigquery import schema


@@ -740,7 +741,7 @@ def _row_iterator_page_to_arrow(page, column_names, arrow_types):
     return pyarrow.RecordBatch.from_arrays(arrays, names=column_names)


-def download_arrow_row_iterator(pages, bq_schema):
+def download_arrow_row_iterator(pages, bq_schema, timeout=None):
     """Use HTTP JSON RowIterator to construct an iterable of RecordBatches.

     Args:
@@ -751,6 +752,10 @@ def download_arrow_row_iterator(pages, bq_schema):
             Mapping[str, Any] \
         ]]):
             A description of the fields in result pages.
+        timeout (Optional[float]):
+            The number of seconds to wait for the underlying download to complete.
+            If ``None``, wait indefinitely.
+
     Yields:
         :class:`pyarrow.RecordBatch`
             The next page of records as a ``pyarrow`` record batch.
@@ -759,8 +764,16 @@ def download_arrow_row_iterator(pages, bq_schema):
     column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema]
     arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema]

-    for page in pages:
-        yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
+    if timeout is None:
+        for page in pages:
+            yield _row_iterator_page_to_arrow(page, column_names, arrow_types)
+    else:
+        start_time = time.monotonic()
+        for page in pages:
+            if time.monotonic() - start_time > timeout:
+                raise concurrent.futures.TimeoutError()
+
+            yield _row_iterator_page_to_arrow(page, column_names, arrow_types)


 def _row_iterator_page_to_dataframe(page, column_names, dtypes):
@@ -778,7 +791,7 @@ def _row_iterator_page_to_dataframe(page, column_names, dtypes):
     return pandas.DataFrame(columns, columns=column_names)


-def download_dataframe_row_iterator(pages, bq_schema, dtypes):
+def download_dataframe_row_iterator(pages, bq_schema, dtypes, timeout=None):
     """Use HTTP JSON RowIterator to construct a DataFrame.

     Args:
@@ -792,14 +805,27 @@ def download_dataframe_row_iterator(pages, bq_schema, dtypes):
         dtypes(Mapping[str, numpy.dtype]):
             The types of columns in result data to hint construction of the
             resulting DataFrame. Not all column types have to be specified.
+        timeout (Optional[float]):
+            The number of seconds to wait for the underlying download to complete.
+            If ``None``, wait indefinitely.
+
     Yields:
         :class:`pandas.DataFrame`
             The next page of records as a ``pandas.DataFrame`` record batch.
     """
     bq_schema = schema._to_schema_fields(bq_schema)
     column_names = [field.name for field in bq_schema]
-    for page in pages:
-        yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
+
+    if timeout is None:
+        for page in pages:
+            yield _row_iterator_page_to_dataframe(page, column_names, dtypes)
+    else:
+        start_time = time.monotonic()
+        for page in pages:
+            if time.monotonic() - start_time > timeout:
+                raise concurrent.futures.TimeoutError()
+
+            yield _row_iterator_page_to_dataframe(page, column_names, dtypes)


 def _bqstorage_page_to_arrow(page):
@@ -928,6 +954,7 @@ def _download_table_bqstorage(
     if "@" in table.table_id:
         raise ValueError("Reading from a specific snapshot is not currently supported.")

+    start_time = time.monotonic()
     requested_streams = determine_requested_streams(preserve_order, max_stream_count)

     requested_session = bigquery_storage.types.stream.ReadSession(
@@ -944,10 +971,16 @@ def _download_table_bqstorage(
             ArrowSerializationOptions.CompressionCodec(1)
         )

+    retry_policy = (
+        bq_retry.DEFAULT_RETRY.with_deadline(timeout) if timeout is not None else None
+    )
+
     session = bqstorage_client.create_read_session(
         parent="projects/{}".format(project_id),
         read_session=requested_session,
         max_stream_count=requested_streams,
+        retry=retry_policy,
+        timeout=timeout,
     )

     _LOGGER.debug(
@@ -983,8 +1016,6 @@ def _download_table_bqstorage(
     # Manually manage the pool to control shutdown behavior on timeout.
     pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
     wait_on_shutdown = True
-    start_time = time.time()
-
     try:
         # Manually submit jobs and wait for download to complete rather
         # than using pool.map because pool.map continues running in the
@@ -1006,7 +1037,7 @@ def _download_table_bqstorage(
         while not_done:
             # Check for timeout
             if timeout is not None:
-                elapsed = time.time() - start_time
+                elapsed = time.monotonic() - start_time
                if elapsed > timeout:
                    wait_on_shutdown = False
                    raise concurrent.futures.TimeoutError(
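The page-loop deadline added above can be read in isolation as the sketch below; `fetch_pages_with_deadline` is a hypothetical stand-alone helper written for this description, not a function added by the PR, and it uses `time.monotonic()` for the same reason the diff moves away from `time.time()`: wall-clock adjustments should not silently lengthen or shorten the deadline.

```python
import concurrent.futures
import time


def fetch_pages_with_deadline(pages, timeout=None):
    """Hypothetical helper mirroring the pattern above: stop pulling result
    pages once the overall deadline has elapsed."""
    start_time = time.monotonic()
    for page in pages:
        if timeout is not None and time.monotonic() - start_time > timeout:
            raise concurrent.futures.TimeoutError()
        yield page
```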

google/cloud/bigquery/dbapi/cursor.py

Lines changed: 2 additions & 0 deletions
@@ -323,6 +323,8 @@ def _bqstorage_fetch(self, bqstorage_client):
             read_session=requested_session,
             # a single stream only, as DB API is not well-suited for multithreading
             max_stream_count=1,
+            retry=None,
+            timeout=None,
         )

         if not read_session.streams:

google/cloud/bigquery/retry.py

Lines changed: 11 additions & 5 deletions
@@ -12,12 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+import logging
+
 from google.api_core import exceptions
 from google.api_core import retry
 import google.api_core.future.polling
 from google.auth import exceptions as auth_exceptions  # type: ignore
 import requests.exceptions

+_LOGGER = logging.getLogger(__name__)

 _RETRYABLE_REASONS = frozenset(
     ["rateLimitExceeded", "backendError", "internalError", "badGateway"]
@@ -61,14 +64,17 @@
 def _should_retry(exc):
     """Predicate for determining when to retry.

-    We retry if and only if the 'reason' is 'backendError'
-    or 'rateLimitExceeded'.
+    We retry if and only if the 'reason' is in _RETRYABLE_REASONS or is
+    in _UNSTRUCTURED_RETRYABLE_TYPES.
     """
-    if not hasattr(exc, "errors") or len(exc.errors) == 0:
-        # Check for unstructured error returns, e.g. from GFE
+    try:
+        reason = exc.errors[0]["reason"]
+    except (AttributeError, IndexError, TypeError, KeyError):
+        # Fallback for when errors attribute is missing, empty, or not a dict
+        # or doesn't contain "reason" (e.g. gRPC exceptions).
+        _LOGGER.debug("Inspecting unstructured error for retry: %r", exc)
         return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES)

-    reason = exc.errors[0]["reason"]
     return reason in _RETRYABLE_REASONS

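For context, a rough sketch (not part of this PR's test suite) of how the updated predicate is expected to behave, assuming the default `_UNSTRUCTURED_RETRYABLE_TYPES` tuple and the `errors` keyword accepted by `google.api_core.exceptions` classes:

```python
from google.api_core import exceptions
from google.cloud.bigquery import retry as bq_retry

# Structured API error: the first error's "reason" is inspected, as before.
structured = exceptions.BadGateway("upstream error", errors=[{"reason": "badGateway"}])
assert bq_retry._should_retry(structured) is True


# Unstructured error with no subscriptable ``errors`` (e.g. a raw gRPC failure
# such as _InactiveRpcError): the predicate now falls through to the
# isinstance() check instead of raising TypeError.
class FakeRpcError(Exception):
    pass


assert bq_retry._should_retry(FakeRpcError("socket closed")) is False
```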

google/cloud/bigquery/table.py

Lines changed: 5 additions & 1 deletion
@@ -2152,7 +2152,10 @@ def to_arrow_iterable(
             timeout=timeout,
         )
         tabledata_list_download = functools.partial(
-            _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
+            _pandas_helpers.download_arrow_row_iterator,
+            iter(self.pages),
+            self.schema,
+            timeout=timeout,
         )
         return self._to_page_iterable(
             bqstorage_download,
@@ -2366,6 +2369,7 @@ def to_dataframe_iterable(
             iter(self.pages),
             self.schema,
             dtypes,
+            timeout=timeout,
         )
         return self._to_page_iterable(
             bqstorage_download,

tests/unit/job/test_query_pandas.py

Lines changed: 6 additions & 0 deletions
@@ -179,6 +179,8 @@ def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg):
         parent="projects/test-project",
         read_session=expected_session,
         max_stream_count=1,  # Use a single stream to preserve row order.
+        retry=None,
+        timeout=None,
     )


@@ -593,6 +595,8 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg):
         parent="projects/bqstorage-billing-project",
         read_session=expected_session,
         max_stream_count=0,  # Use default number of streams for best performance.
+        retry=None,
+        timeout=None,
     )
     bqstorage_client.read_rows.assert_called_once_with(stream_id)

@@ -644,6 +648,8 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression():
         parent="projects/bqstorage-billing-project",
         read_session=expected_session,
         max_stream_count=0,
+        retry=None,
+        timeout=None,
     )


tests/unit/test__pandas_helpers.py

Lines changed: 131 additions & 0 deletions
@@ -2252,3 +2252,134 @@ def fast_download_stream(
     results = list(result_gen)

     assert results == ["result_page"]
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
+@pytest.mark.parametrize(
+    "sleep_time, timeout, should_timeout",
+    [
+        (0.1, 0.05, True),  # Timeout case
+        (0, 10.0, False),  # Success case
+    ],
+)
+def test_download_arrow_row_iterator_with_timeout(
+    module_under_test, sleep_time, timeout, should_timeout
+):
+    bq_schema = [schema.SchemaField("name", "STRING")]
+
+    # Mock page with to_arrow method
+    mock_page = mock.Mock()
+    mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays(
+        [pyarrow.array(["foo"])],
+        names=["name"],
+    )
+    mock_page.__iter__ = lambda self: iter(["row1"])
+    mock_page._columns = [["foo"]]
+
+    def pages_gen():
+        # First page yields quickly
+        yield mock_page
+        if sleep_time > 0:
+            time.sleep(sleep_time)
+        yield mock_page
+
+    iterator = module_under_test.download_arrow_row_iterator(
+        pages_gen(), bq_schema, timeout=timeout
+    )
+
+    # First item should always succeed
+    next(iterator)
+
+    if should_timeout:
+        with pytest.raises(concurrent.futures.TimeoutError):
+            next(iterator)
+    else:
+        # Should succeed and complete
+        results = list(iterator)
+        assert len(results) == 1  # 1 remaining item
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`")
+@pytest.mark.parametrize(
+    "sleep_time, timeout, should_timeout",
+    [
+        (0.1, 0.05, True),  # Timeout case
+        (0, 10.0, False),  # Success case
+    ],
+)
+def test_download_dataframe_row_iterator_with_timeout(
+    module_under_test, sleep_time, timeout, should_timeout
+):
+    bq_schema = [schema.SchemaField("name", "STRING")]
+    dtypes = {}
+
+    # Mock page
+    mock_page = mock.Mock()
+    # Mock iterator for _row_iterator_page_to_dataframe checking next(iter(page))
+    mock_page.__iter__ = lambda self: iter(["row1"])
+    mock_page._columns = [["foo"]]
+
+    def pages_gen():
+        yield mock_page
+        if sleep_time > 0:
+            time.sleep(sleep_time)
+        yield mock_page
+
+    iterator = module_under_test.download_dataframe_row_iterator(
+        pages_gen(), bq_schema, dtypes, timeout=timeout
+    )
+
+    next(iterator)
+
+    if should_timeout:
+        with pytest.raises(concurrent.futures.TimeoutError):
+            next(iterator)
+    else:
+        results = list(iterator)
+        assert len(results) == 1
+
+
+@pytest.mark.skipif(
+    bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`"
+)
+def test_download_arrow_bqstorage_passes_timeout_to_create_read_session(
+    module_under_test,
+):
+    # Mock dependencies
+    project_id = "test-project"
+    table = mock.Mock()
+    table.table_id = "test_table"
+    table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test"
+
+    bqstorage_client = mock.create_autospec(
+        bigquery_storage.BigQueryReadClient, instance=True
+    )
+    # Mock create_read_session to return a session with no streams so the function returns early
+    # (Checking start of loop logic vs empty streams return)
+    session = mock.Mock()
+    # If streams is empty, _download_table_bqstorage returns early, which is fine for this test
+    session.streams = []
+    bqstorage_client.create_read_session.return_value = session
+
+    # Call the function
+    timeout = 123.456
+    # download_arrow_bqstorage yields frames, so we need to iterate to trigger execution
+    list(
+        module_under_test.download_arrow_bqstorage(
+            project_id, table, bqstorage_client, timeout=timeout
+        )
+    )
+
+    # Verify timeout and retry were passed
+    bqstorage_client.create_read_session.assert_called_once()
+    _, kwargs = bqstorage_client.create_read_session.call_args
+    assert "timeout" in kwargs
+    assert kwargs["timeout"] == timeout
+
+    assert "retry" in kwargs
+    retry_policy = kwargs["retry"]
+    assert retry_policy is not None
+    # Check if deadline is set correctly in the retry policy
+    assert retry_policy._deadline == timeout

tests/unit/test_client_retry.py

Lines changed: 6 additions & 1 deletion
@@ -23,6 +23,11 @@

 PROJECT = "test-project"

+# A deadline > 1.0s is required because the default retry (google.api_core.retry.Retry)
+# has an initial delay of 1.0s. If the deadline is <= 1.0s, the first retry attempt
+# (scheduled for now + 1.0s) will be rejected immediately as exceeding the deadline.
+_RETRY_DEADLINE = 10.0
+

 def _make_credentials():
     import google.auth.credentials
@@ -83,7 +88,7 @@ def test_call_api_applying_custom_retry_on_timeout(global_time_lock):
         "api_request",
         side_effect=[TimeoutError, "result"],
     )
-    retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
+    retry = DEFAULT_RETRY.with_deadline(_RETRY_DEADLINE).with_predicate(
         lambda exc: isinstance(exc, TimeoutError)
     )

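A quick sketch of the reasoning behind the new constant (the 1.0-second initial delay is the documented default for `google.api_core.retry.Retry`, as noted in the comment above):

```python
from google.cloud.bigquery.retry import DEFAULT_RETRY

# The first retry attempt is scheduled roughly `initial` (1.0s) after the
# failure, so a 1-second deadline leaves no room for even one retry, while a
# 10-second deadline (the new _RETRY_DEADLINE) comfortably allows it.
too_tight = DEFAULT_RETRY.with_deadline(1.0)
workable = DEFAULT_RETRY.with_deadline(10.0)
```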

tests/unit/test_dbapi_cursor.py

Lines changed: 5 additions & 1 deletion
@@ -480,7 +480,11 @@ def fake_ensure_bqstorage_client(bqstorage_client=None, **kwargs):
         data_format=bigquery_storage.DataFormat.ARROW,
     )
     mock_bqstorage_client.create_read_session.assert_called_once_with(
-        parent="projects/P", read_session=expected_session, max_stream_count=1
+        parent="projects/P",
+        read_session=expected_session,
+        max_stream_count=1,
+        retry=None,
+        timeout=None,
     )

     # Check the data returned.
