@@ -126,9 +126,9 @@ class LinkFetcher:
126126 def __init__ (
127127 self ,
128128 download_manager : ResultFileDownloadManager ,
129- backend : " SeaDatabricksClient" ,
129+ backend : SeaDatabricksClient ,
130130 statement_id : str ,
131- initial_links : List [" ExternalLink" ],
131+ initial_links : List [ExternalLink ],
132132 total_chunk_count : int ,
133133 ):
134134 self .download_manager = download_manager
@@ -139,12 +139,12 @@ def __init__(
139139
140140 self ._link_data_update = threading .Condition ()
141141 self ._error : Optional [Exception ] = None
142- self .chunk_index_to_link : Dict [int , " ExternalLink" ] = {}
142+ self .chunk_index_to_link : Dict [int , ExternalLink ] = {}
143143
144144 self ._add_links (initial_links )
145145 self .total_chunk_count = total_chunk_count
146146
147- def _add_links (self , links : List [" ExternalLink" ]):
147+ def _add_links (self , links : List [ExternalLink ]):
148148 for link in links :
149149 self .chunk_index_to_link [link .chunk_index ] = link
150150 self .download_manager .add_link (LinkFetcher ._convert_to_thrift_link (link ))
@@ -178,7 +178,7 @@ def _trigger_next_batch_download(self) -> bool:
178178
179179 return True
180180
181- def get_chunk_link (self , chunk_index : int ) -> Optional [" ExternalLink" ]:
181+ def get_chunk_link (self , chunk_index : int ) -> Optional [ExternalLink ]:
182182 if chunk_index >= self .total_chunk_count :
183183 return None
184184
@@ -197,7 +197,7 @@ def get_chunk_link(self, chunk_index: int) -> Optional["ExternalLink"]:
197197 return self .chunk_index_to_link .get (chunk_index , None )
198198
199199 @staticmethod
200- def _convert_to_thrift_link (link : " ExternalLink" ) -> TSparkArrowResultLink :
200+ def _convert_to_thrift_link (link : ExternalLink ) -> TSparkArrowResultLink :
201201 """Convert SEA external links to Thrift format for compatibility with existing download manager."""
202202 # Parse the ISO format expiration time
203203 expiry_time = int (dateutil .parser .parse (link .expiration ).timestamp ())
@@ -306,10 +306,6 @@ def __init__(
306306
307307 def _create_next_table (self ) -> Union ["pyarrow.Table" , None ]:
308308 """Create next table by retrieving the logical next downloaded file."""
309- if not self .download_manager :
310- logger .debug ("SeaCloudFetchQueue: No download manager, returning" )
311- return None
312-
313309 chunk_link = self .link_fetcher .get_chunk_link (self .current_chunk_index )
314310 if not chunk_link :
315311 return None
@@ -320,3 +316,7 @@ def _create_next_table(self) -> Union["pyarrow.Table", None]:
320316 self .current_chunk_index += 1
321317
322318 return arrow_table
319+
320+ def close (self ):
321+ super ().close ()
322+ self .link_fetcher .stop ()
0 commit comments