From 950bd17ec9f26532ce9b32cd04c326d155ccc037 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 1 Mar 2025 09:41:39 -0800 Subject: [PATCH 1/6] reinit client --- pyiceberg/catalog/hive.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index ec832727a2..42f4a29e46 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -168,6 +168,10 @@ def _init_thrift_client(self) -> None: self._client = Client(protocol) def __enter__(self) -> Client: + # If the transport is closed, reinitialize it + if not self._transport.isOpen(): + self._init_thrift_client() + self._transport.open() if self._ugi: self._client.set_ugi(*self._ugi) From 1407338f3edd9fde1fcc6d776d2f65d55b113424 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sun, 2 Mar 2025 10:45:48 -0800 Subject: [PATCH 2/6] reinit transport --- pyiceberg/catalog/hive.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 42f4a29e46..7beb9da2d9 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -168,19 +168,26 @@ def _init_thrift_client(self) -> None: self._client = Client(protocol) def __enter__(self) -> Client: - # If the transport is closed, reinitialize it + """Ensure transport is open before returning the client.""" + if self._transport is None or not self._transport.isOpen(): + self._init_thrift_client() # Reinitialize transport if closed + if not self._transport.isOpen(): - self._init_thrift_client() + self._transport.open() - self._transport.open() if self._ugi: self._client.set_ugi(*self._ugi) + return self._client def __exit__( self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType] ) -> None: - self._transport.close() + """Close transport if it was opened.""" + if self._transport: + self._transport.close() + self._transport = None # Reset transport so a new one is created next time + self._client = None def _construct_hive_storage_descriptor( From f0b4003eea1d194dbcccc83b5d4bd4e7bd9f7050 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 11 Apr 2025 18:49:23 -0700 Subject: [PATCH 3/6] re-init transport --- pyiceberg/catalog/hive.py | 41 ++++++++++++++------------------------- 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 7beb9da2d9..3524f6e6e8 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -18,6 +18,7 @@ import logging import socket import time +from functools import cached_property from types import TracebackType from typing import ( TYPE_CHECKING, @@ -142,52 +143,40 @@ class _HiveClient: """Helper class to nicely open and close the transport.""" - _transport: TTransport - _client: Client - _ugi: Optional[List[str]] - def __init__(self, uri: str, ugi: Optional[str] = None, kerberos_auth: Optional[bool] = HIVE_KERBEROS_AUTH_DEFAULT): self._uri = uri self._kerberos_auth = kerberos_auth self._ugi = ugi.split(":") if ugi else None - self._init_thrift_client() - - def _init_thrift_client(self) -> None: + def _init_thrift_transport(self) -> TTransport: url_parts = urlparse(self._uri) - socket = TSocket.TSocket(url_parts.hostname, url_parts.port) - if not self._kerberos_auth: - self._transport = TTransport.TBufferedTransport(socket) + return TTransport.TBufferedTransport(socket) else: - self._transport = TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive") + return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive") + @cached_property + def _client(self) -> Client: + self._transport = self._init_thrift_transport() protocol = TBinaryProtocol.TBinaryProtocol(self._transport) - - self._client = Client(protocol) - - def __enter__(self) -> Client: - """Ensure transport is open before returning the client.""" - if self._transport is None or not self._transport.isOpen(): - self._init_thrift_client() # Reinitialize transport if closed - - if not self._transport.isOpen(): - self._transport.open() - + client = Client(protocol) if self._ugi: - self._client.set_ugi(*self._ugi) + client.set_ugi(*self._ugi) + return client + def __enter__(self) -> Client: + """Reinitialize transport if was closed.""" + if self._transport and not self._transport.isOpen(): + self._transport = self._init_thrift_transport() return self._client def __exit__( self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType] ) -> None: """Close transport if it was opened.""" - if self._transport: + if self._transport and self._transport.isOpen(): self._transport.close() - self._transport = None # Reset transport so a new one is created next time - self._client = None def _construct_hive_storage_descriptor( From f40e75fe9b0836c30b10d288cdcb1f4409b45645 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Fri, 11 Apr 2025 20:51:35 -0700 Subject: [PATCH 4/6] call transport.open() --- pyiceberg/catalog/hive.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 3524f6e6e8..52e07fb509 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -147,6 +147,7 @@ def __init__(self, uri: str, ugi: Optional[str] = None, kerberos_auth: Optional[ self._uri = uri self._kerberos_auth = kerberos_auth self._ugi = ugi.split(":") if ugi else None + self._transport = self._init_thrift_transport() def _init_thrift_transport(self) -> TTransport: url_parts = urlparse(self._uri) @@ -158,7 +159,6 @@ def _init_thrift_transport(self) -> TTransport: @cached_property def _client(self) -> Client: - self._transport = self._init_thrift_transport() protocol = TBinaryProtocol.TBinaryProtocol(self._transport) client = Client(protocol) if self._ugi: @@ -169,6 +169,7 @@ def __enter__(self) -> Client: """Reinitialize transport if was closed.""" if self._transport and not self._transport.isOpen(): self._transport = self._init_thrift_transport() + self._transport.open() return self._client def __exit__( From d788c8d65a2dd85a201a1782768afeca45fd7e2d Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Sat, 12 Apr 2025 18:58:50 -0700 Subject: [PATCH 5/6] fix --- pyiceberg/catalog/hive.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 52e07fb509..7ff153387c 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -166,9 +166,10 @@ def _client(self) -> Client: return client def __enter__(self) -> Client: - """Reinitialize transport if was closed.""" - if self._transport and not self._transport.isOpen(): + """Make sure the transport is initialized and open.""" + if not self._transport: self._transport = self._init_thrift_transport() + if not self._transport.isOpen(): self._transport.open() return self._client From 88e1dbf545392172f3bf850b2877b8fafddc2111 Mon Sep 17 00:00:00 2001 From: Kevin Liu Date: Tue, 15 Apr 2025 12:06:49 -0700 Subject: [PATCH 6/6] address feedback --- pyiceberg/catalog/hive.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 7ff153387c..2d411cb409 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -143,6 +143,9 @@ class _HiveClient: """Helper class to nicely open and close the transport.""" + _transport: TTransport + _ugi: Optional[List[str]] + def __init__(self, uri: str, ugi: Optional[str] = None, kerberos_auth: Optional[bool] = HIVE_KERBEROS_AUTH_DEFAULT): self._uri = uri self._kerberos_auth = kerberos_auth @@ -167,17 +170,20 @@ def _client(self) -> Client: def __enter__(self) -> Client: """Make sure the transport is initialized and open.""" - if not self._transport: - self._transport = self._init_thrift_transport() if not self._transport.isOpen(): - self._transport.open() + try: + self._transport.open() + except TTransport.TTransportException: + # reinitialize _transport + self._transport = self._init_thrift_transport() + self._transport.open() return self._client def __exit__( self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType] ) -> None: """Close transport if it was opened.""" - if self._transport and self._transport.isOpen(): + if self._transport.isOpen(): self._transport.close()