Skip to content

Commit 1143838

Browse files
committed
preliminary connection closure func
1 parent fd81c5a commit 1143838

File tree

4 files changed

+28
-11
lines changed

4 files changed

+28
-11
lines changed

src/databricks/sql/auth/thrift_http_client.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ def startRetryTimer(self):
105105
self.retry_policy and self.retry_policy.start_retry_timer()
106106

107107
def open(self):
108-
109108
# self.__pool replaces the self.__http used by the original THttpClient
110109
_pool_kwargs = {"maxsize": self.max_connections}
111110

@@ -140,19 +139,21 @@ def open(self):
140139
else:
141140
self.__pool = pool_class(self.host, self.port, **_pool_kwargs)
142141

143-
def close(self):
142+
def release_connection(self):
144143
self.__resp and self.__resp.drain_conn()
145144
self.__resp and self.__resp.release_conn()
146145
self.__resp = None
147146

147+
def close(self):
148+
self.__pool.close()
149+
148150
def read(self, sz):
149151
return self.__resp.read(sz)
150152

151153
def isOpen(self):
152154
return self.__resp is not None
153155

154156
def flush(self):
155-
156157
# Pull data out of buffer that will be sent in this request
157158
data = self.__wbuf.getvalue()
158159
self.__wbuf = BytesIO()

src/databricks/sql/backend/sea/backend.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ def open_session(
273273

274274
return SessionId.from_sea_session_id(session_id)
275275

276-
def close_session(self, session_id: SessionId) -> None:
276+
def _close_session(self, session_id: SessionId) -> None:
277277
"""
278278
Closes an existing session with the Databricks SQL service.
279279
@@ -285,8 +285,6 @@ def close_session(self, session_id: SessionId) -> None:
285285
OperationalError: If there's an error closing the session
286286
"""
287287

288-
logger.debug("SeaDatabricksClient.close_session(session_id=%s)", session_id)
289-
290288
if session_id.backend_type != BackendType.SEA:
291289
raise ValueError("Not a valid SEA session ID")
292290
sea_session_id = session_id.to_sea_session_id()
@@ -302,6 +300,23 @@ def close_session(self, session_id: SessionId) -> None:
302300
data=request_data.to_dict(),
303301
)
304302

303+
def close_session(self, session_id: SessionId) -> None:
304+
"""
305+
Closes the session and the underlying HTTP client.
306+
307+
Args:
308+
session_id: The session identifier returned by open_session()
309+
310+
Raises:
311+
ValueError: If the session ID is invalid
312+
OperationalError: If there's an error closing the session
313+
"""
314+
315+
logger.debug("SeaDatabricksClient.close_session(session_id=%s)", session_id)
316+
317+
self._close_session(session_id)
318+
self._http_client.close()
319+
305320
def _extract_description_from_manifest(
306321
self, manifest: ResultManifest
307322
) -> List[Tuple]:

src/databricks/sql/backend/sea/utils/http_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ def _open(self):
197197
def close(self):
198198
"""Close the connection pool."""
199199
if self._pool:
200-
self._pool.clear()
200+
self._pool.close()
201201

202202
def using_proxy(self) -> bool:
203203
"""Check if proxy is being used."""

src/databricks/sql/backend/thrift_backend.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ def __init__(
232232
try:
233233
self._transport.open()
234234
except:
235-
self._transport.close()
235+
self._transport.release_connection()
236236
raise
237237

238238
self._request_lock = threading.RLock()
@@ -478,7 +478,7 @@ def attempt_request(attempt):
478478
)
479479
finally:
480480
# Calling `close()` here releases the active HTTP connection back to the pool
481-
self._transport.close()
481+
self._transport.release_connection()
482482

483483
return RequestErrorInfo(
484484
error=error,
@@ -607,7 +607,7 @@ def open_session(self, session_configuration, catalog, schema) -> SessionId:
607607
self._session_id_hex = session_id.hex_guid
608608
return session_id
609609
except:
610-
self._transport.close()
610+
self._transport.release_connection()
611611
raise
612612

613613
def close_session(self, session_id: SessionId) -> None:
@@ -619,7 +619,8 @@ def close_session(self, session_id: SessionId) -> None:
619619
try:
620620
self.make_request(self._client.CloseSession, req)
621621
finally:
622-
self._transport.close()
622+
self._transport.release_connection()
623+
self._transport.close()
623624

624625
def _check_command_not_in_error_or_closed_state(
625626
self, op_handle, get_operations_resp

0 commit comments

Comments
 (0)