Skip to content

Commit dd009df

Browse files
docstrings, logging, pydoc
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 9fd57d8 commit dd009df

File tree

1 file changed

+70
-1
lines changed

1 file changed

+70
-1
lines changed

src/databricks/sql/backend/sea/queue.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,34 @@ def close(self):
123123

124124

125125
class LinkFetcher:
126+
"""
127+
Background helper that incrementally retrieves *external links* for a
128+
result set produced by the SEA backend and feeds them to a
129+
:class:`databricks.sql.cloudfetch.download_manager.ResultFileDownloadManager`.
130+
131+
The SEA backend splits large result sets into *chunks*. Each chunk is
132+
stored remotely (e.g., in object storage) and exposed via a signed URL
133+
encapsulated by an :class:`ExternalLink`. Only the first batch of links is
134+
returned with the initial query response. The remaining links must be
135+
pulled on demand using the *next-chunk* token embedded in each
136+
:attr:`ExternalLink.next_chunk_index`.
137+
138+
LinkFetcher takes care of this choreography so callers (primarily
139+
``SeaCloudFetchQueue``) can simply ask for the link of a specific
140+
``chunk_index`` and block until it becomes available.
141+
142+
Key responsibilities:
143+
144+
• Maintain an in-memory mapping from ``chunk_index`` → ``ExternalLink``.
145+
• Launch a background worker thread that continuously requests the next
146+
batch of links from the backend until all chunks have been discovered or
147+
an unrecoverable error occurs.
148+
• Bridge SEA link objects to the Thrift representation expected by the
149+
existing download manager.
150+
• Provide a synchronous API (`get_chunk_link`) that blocks until the desired
151+
link is present in the cache.
152+
"""
153+
126154
def __init__(
127155
self,
128156
download_manager: ResultFileDownloadManager,
@@ -144,12 +172,28 @@ def __init__(
144172
self._add_links(initial_links)
145173
self.total_chunk_count = total_chunk_count
146174

175+
# DEBUG: capture initial state for observability
176+
logger.debug(
177+
"LinkFetcher[%s]: initialized with %d initial link(s); expecting %d total chunk(s)",
178+
statement_id,
179+
len(initial_links),
180+
total_chunk_count,
181+
)
182+
147183
def _add_links(self, links: List[ExternalLink]):
184+
"""Cache *links* locally and enqueue them with the download manager."""
185+
logger.debug(
186+
"LinkFetcher[%s]: caching %d link(s) – chunks %s",
187+
self._statement_id,
188+
len(links),
189+
", ".join(str(l.chunk_index) for l in links) if links else "<none>",
190+
)
148191
for link in links:
149192
self.chunk_index_to_link[link.chunk_index] = link
150193
self.download_manager.add_link(LinkFetcher._convert_to_thrift_link(link))
151194

152195
def _get_next_chunk_index(self) -> Optional[int]:
196+
"""Return the next *chunk_index* that should be requested from the backend, or ``None`` if we have them all."""
153197
with self._link_data_update:
154198
max_chunk_index = max(self.chunk_index_to_link.keys(), default=None)
155199
if max_chunk_index is None:
@@ -158,6 +202,10 @@ def _get_next_chunk_index(self) -> Optional[int]:
158202
return max_link.next_chunk_index
159203

160204
def _trigger_next_batch_download(self) -> bool:
205+
"""Fetch the next batch of links from the backend and return *True* on success."""
206+
logger.debug(
207+
"LinkFetcher[%s]: requesting next batch of links", self._statement_id
208+
)
161209
next_chunk_index = self._get_next_chunk_index()
162210
if next_chunk_index is None:
163211
return False
@@ -176,9 +224,20 @@ def _trigger_next_batch_download(self) -> bool:
176224
self._link_data_update.notify_all()
177225
return False
178226

227+
logger.debug(
228+
"LinkFetcher[%s]: received %d new link(s)",
229+
self._statement_id,
230+
len(links),
231+
)
179232
return True
180233

181234
def get_chunk_link(self, chunk_index: int) -> ExternalLink:
235+
"""Return (blocking) the :class:`ExternalLink` associated with *chunk_index*."""
236+
logger.debug(
237+
"LinkFetcher[%s]: waiting for link of chunk %d",
238+
self._statement_id,
239+
chunk_index,
240+
)
182241
if chunk_index >= self.total_chunk_count:
183242
raise ValueError(
184243
f"Chunk index {chunk_index} is out of range for total chunk count {self.total_chunk_count}"
@@ -213,19 +272,29 @@ def _convert_to_thrift_link(link: ExternalLink) -> TSparkArrowResultLink:
213272
)
214273

215274
def _worker_loop(self):
275+
"""Entry point for the background thread."""
276+
logger.debug("LinkFetcher[%s]: worker thread started", self._statement_id)
216277
while not self._shutdown_event.is_set():
217278
links_downloaded = self._trigger_next_batch_download()
218279
if not links_downloaded:
219280
self._shutdown_event.set()
281+
logger.debug("LinkFetcher[%s]: worker thread exiting", self._statement_id)
220282
self._link_data_update.notify_all()
221283

222284
def start(self):
223-
self._worker_thread = threading.Thread(target=self._worker_loop)
285+
"""Spawn the worker thread."""
286+
logger.debug("LinkFetcher[%s]: starting worker thread", self._statement_id)
287+
self._worker_thread = threading.Thread(
288+
target=self._worker_loop, name=f"LinkFetcher-{self._statement_id}"
289+
)
224290
self._worker_thread.start()
225291

226292
def stop(self):
293+
"""Signal the worker thread to stop and wait for its termination."""
294+
logger.debug("LinkFetcher[%s]: stopping worker thread", self._statement_id)
227295
self._shutdown_event.set()
228296
self._worker_thread.join()
297+
logger.debug("LinkFetcher[%s]: worker thread stopped", self._statement_id)
229298

230299

231300
class SeaCloudFetchQueue(CloudFetchQueue):

0 commit comments

Comments
 (0)