Skip to content

Commit efe9c1c

Browse files
committed
resolving comments
1 parent b82987f commit efe9c1c

File tree

4 files changed

+110
-88
lines changed

4 files changed

+110
-88
lines changed

google/cloud/storage/_experimental/asyncio/async_multi_range_downloader.py

Lines changed: 15 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414

1515
from __future__ import annotations
1616
import asyncio
17+
import logging
1718
from google.api_core import exceptions
1819
from google.api_core.retry_async import AsyncRetry
19-
from google.cloud._storage_v2.types import BidiReadObjectRedirectedError
20+
from google.cloud.storage._experimental.asyncio.retry._helpers import _handle_redirect
2021
from google.rpc import status_pb2
2122

2223
from typing import List, Optional, Tuple, Any, Dict
@@ -46,6 +47,8 @@
4647
"type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
4748
)
4849

50+
logger = logging.getLogger(__name__)
51+
4952

5053
def _is_read_retryable(exc):
5154
"""Predicate to determine if a read operation should be retried."""
@@ -81,7 +84,7 @@ def _is_read_retryable(exc):
8184
if detail.type_url == _BIDI_READ_REDIRECTED_TYPE_URL:
8285
return True
8386
except Exception as e:
84-
print(f"--- Error parsing status_details_bin: {e}")
87+
logger.error(f"Error parsing status_details_bin: {e}")
8588
return False
8689
return False
8790

@@ -208,46 +211,11 @@ def __init__(
208211

209212
def _on_open_error(self, exc):
210213
"""Extracts routing token and read handle on redirect error during open."""
211-
grpc_error = None
212-
if isinstance(exc, exceptions.Aborted) and exc.errors:
213-
grpc_error = exc.errors[0]
214-
215-
if grpc_error:
216-
if isinstance(grpc_error, BidiReadObjectRedirectedError):
217-
self._routing_token = grpc_error.routing_token
218-
if grpc_error.read_handle:
219-
self.read_handle = grpc_error.read_handle
220-
return
221-
222-
if hasattr(grpc_error, "trailing_metadata"):
223-
trailers = grpc_error.trailing_metadata()
224-
if not trailers:
225-
return
226-
227-
status_details_bin = None
228-
for key, value in trailers:
229-
if key == "grpc-status-details-bin":
230-
status_details_bin = value
231-
break
232-
233-
if status_details_bin:
234-
status_proto = status_pb2.Status()
235-
try:
236-
status_proto.ParseFromString(status_details_bin)
237-
for detail in status_proto.details:
238-
if detail.type_url == _BIDI_READ_REDIRECTED_TYPE_URL:
239-
redirect_proto = (
240-
BidiReadObjectRedirectedError.deserialize(
241-
detail.value
242-
)
243-
)
244-
if redirect_proto.routing_token:
245-
self._routing_token = redirect_proto.routing_token
246-
if redirect_proto.read_handle:
247-
self.read_handle = redirect_proto.read_handle
248-
break
249-
except Exception as e:
250-
print(f"--- Error unpacking redirect in _on_open_error: {e}")
214+
routing_token, read_handle = _handle_redirect(exc)
215+
if routing_token:
216+
self._routing_token = routing_token
217+
if read_handle:
218+
self.read_handle = read_handle
251219

252220
async def open(
253221
self,
@@ -412,6 +380,9 @@ async def generator():
412380
nonlocal attempt_count
413381
attempt_count += 1
414382

383+
if attempt_count > 1:
384+
logger.info(f"Resuming download (attempt {attempt_count-1}) for {len(requests)} ranges.")
385+
415386
async with lock:
416387
current_handle = state.get("read_handle")
417388
current_token = state.get("routing_token")
@@ -426,6 +397,8 @@ async def generator():
426397
)
427398

428399
if should_reopen:
400+
if current_token:
401+
logger.info(f"Re-opening stream with routing token: {current_token}")
429402
# Close existing stream if any
430403
if self.read_obj_str and self.read_obj_str._is_stream_open:
431404
await self.read_obj_str.close()
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import logging
18+
from typing import Tuple, Optional
19+
20+
from google.api_core import exceptions
21+
from google.cloud._storage_v2.types import BidiReadObjectRedirectedError
22+
from google.rpc import status_pb2
23+
24+
_BIDI_READ_REDIRECTED_TYPE_URL = (
25+
"type.googleapis.com/google.storage.v2.BidiReadObjectRedirectedError"
26+
)
27+
28+
29+
def _handle_redirect(
30+
exc: Exception,
31+
) -> Tuple[Optional[str], Optional[bytes]]:
32+
"""
33+
Extracts routing token and read handle from a gRPC error.
34+
35+
:type exc: Exception
36+
:param exc: The exception to parse.
37+
38+
:rtype: Tuple[Optional[str], Optional[bytes]]
39+
:returns: A tuple of (routing_token, read_handle).
40+
"""
41+
routing_token = None
42+
read_handle = None
43+
44+
grpc_error = None
45+
if isinstance(exc, exceptions.Aborted) and exc.errors:
46+
grpc_error = exc.errors[0]
47+
48+
if grpc_error:
49+
if isinstance(grpc_error, BidiReadObjectRedirectedError):
50+
routing_token = grpc_error.routing_token
51+
if grpc_error.read_handle:
52+
read_handle = grpc_error.read_handle
53+
return routing_token, read_handle
54+
55+
if hasattr(grpc_error, "trailing_metadata"):
56+
trailers = grpc_error.trailing_metadata()
57+
if not trailers:
58+
return None, None
59+
60+
status_details_bin = None
61+
for key, value in trailers:
62+
if key == "grpc-status-details-bin":
63+
status_details_bin = value
64+
break
65+
66+
if status_details_bin:
67+
status_proto = status_pb2.Status()
68+
try:
69+
status_proto.ParseFromString(status_details_bin)
70+
for detail in status_proto.details:
71+
if detail.type_url == _BIDI_READ_REDIRECTED_TYPE_URL:
72+
redirect_proto = BidiReadObjectRedirectedError.deserialize(
73+
detail.value
74+
)
75+
if redirect_proto.routing_token:
76+
routing_token = redirect_proto.routing_token
77+
if redirect_proto.read_handle:
78+
read_handle = redirect_proto.read_handle
79+
break
80+
except Exception as e:
81+
logging.ERROR(f"Error unpacking redirect: {e}")
82+
83+
return routing_token, read_handle

google/cloud/storage/_experimental/asyncio/retry/bidi_stream_retry_manager.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import logging
1516
from typing import Any, AsyncIterator, Callable
1617

1718
from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
1819
_BaseResumptionStrategy,
1920
)
2021

22+
logger = logging.getLogger(__name__)
2123

2224
class _BidiStreamRetryManager:
2325
"""Manages the generic retry loop for a bidi streaming operation."""
@@ -55,6 +57,7 @@ async def attempt():
5557
return
5658
except Exception as e:
5759
if retry_policy._predicate(e):
60+
logger.info(f"Bidi stream operation failed: {e}. Attempting state recovery and retry.")
5861
await self._strategy.recover_state_on_failure(e, state)
5962
raise e
6063

google/cloud/storage/_experimental/asyncio/retry/reads_resumption_strategy.py

Lines changed: 9 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
# limitations under the License.
1414

1515
from typing import Any, Dict, List, IO
16+
import logging
1617

17-
from google.api_core import exceptions
18-
from google.rpc import status_pb2
1918
from google_crc32c import Checksum
2019
from google.cloud import _storage_v2 as storage_v2
2120
from google.cloud.storage.exceptions import DataCorruption
21+
from google.cloud.storage._experimental.asyncio.retry._helpers import (
22+
_handle_redirect,
23+
)
2224
from google.cloud.storage._experimental.asyncio.retry.base_strategy import (
2325
_BaseResumptionStrategy,
2426
)
@@ -142,47 +144,8 @@ def update_state_from_response(
142144

143145
async def recover_state_on_failure(self, error: Exception, state: Any) -> None:
144146
"""Handles BidiReadObjectRedirectedError for reads."""
145-
# This would parse the gRPC error details, extract the routing_token,
146-
# and store it on the shared state object.
147-
grpc_error = None
148-
if isinstance(error, exceptions.Aborted) and error.errors:
149-
grpc_error = error.errors[0]
150-
151-
if grpc_error:
152-
if isinstance(grpc_error, BidiReadObjectRedirectedError):
153-
if grpc_error.routing_token:
154-
state["routing_token"] = grpc_error.routing_token
155-
if grpc_error.read_handle:
156-
state["read_handle"] = grpc_error.read_handle
157-
return
158-
159-
if hasattr(grpc_error, "trailing_metadata"):
160-
trailers = grpc_error.trailing_metadata()
161-
if not trailers:
162-
return
163-
status_details_bin = None
164-
for key, value in trailers:
165-
if key == "grpc-status-details-bin":
166-
status_details_bin = value
167-
break
168-
169-
if status_details_bin:
170-
status_proto = status_pb2.Status()
171-
try:
172-
status_proto.ParseFromString(status_details_bin)
173-
for detail in status_proto.details:
174-
if detail.type_url == _BIDI_READ_REDIRECTED_TYPE_URL:
175-
redirect_proto = (
176-
BidiReadObjectRedirectedError.deserialize(
177-
detail.value
178-
)
179-
)
180-
if redirect_proto.routing_token:
181-
state[
182-
"routing_token"
183-
] = redirect_proto.routing_token
184-
if redirect_proto.read_handle:
185-
state["read_handle"] = redirect_proto.read_handle
186-
break
187-
except Exception as e:
188-
print(f"--- Error unpacking redirect in _on_open_error: {e}")
147+
routing_token, read_handle = _handle_redirect(error)
148+
if routing_token:
149+
state["routing_token"] = routing_token
150+
if read_handle:
151+
state["read_handle"] = read_handle

0 commit comments

Comments
 (0)