Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ def __enter__(self) -> Client:
try:
self._transport.open()
except (TypeError, TTransport.TTransportException):
# Close the old transport before reinitializing to prevent resource leaks
try:
self._transport.close()
except Exception:
pass
# reinitialize _transport
self._transport = self._init_thrift_transport()
self._transport.open()
Expand Down
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,6 @@ markers = [
# Turns a warning into an error
filterwarnings = [
"error",
"ignore:A plugin raised an exception during an old-style hookwrapper teardown.",
"ignore:unclosed <socket.socket",
# Remove this in a future release of PySpark.
"ignore:distutils Version classes are deprecated. Use packaging.version instead.",
# Remove this in a future release of PySpark. https://github.com/apache/iceberg-python/issues/1349
Expand Down
9 changes: 9 additions & 0 deletions tests/catalog/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ def __init__(self, socket: thrift.transport.TSocket.TServerSocket, response: byt
self._response = response
self._port = None
self._port_bound = threading.Event()
self._clients: list[thrift.transport.TSocket.TSocket] = [] # Track accepted client connections

def run(self) -> None:
self._socket.listen()
Expand All @@ -222,6 +223,7 @@ def run(self) -> None:
try:
client = self._socket.accept()
if client:
self._clients.append(client) # Track the client
client.write(self._response)
client.flush()
except Exception:
Expand All @@ -233,6 +235,12 @@ def port(self) -> Optional[int]:
return self._port

def close(self) -> None:
# Close all client connections first
for client in self._clients:
try:
client.close()
except Exception:
pass
self._socket.close()


Expand Down Expand Up @@ -1392,6 +1400,7 @@ def test_create_hive_client_with_kerberos_using_context_manager(
with client as open_client:
assert open_client._iprot.trans.isOpen()

assert not open_client._iprot.trans.isOpen()
# Use the context manager a second time to see if
# closing and re-opening work as expected.
with client as open_client:
Expand Down