diff --git a/.github/workflows/python-ci.yml b/.github/workflows/python-ci.yml
index 772d198e28..8b4f16dd0f 100644
--- a/.github/workflows/python-ci.yml
+++ b/.github/workflows/python-ci.yml
@@ -58,6 +58,8 @@ jobs:
           python-version: ${{ matrix.python }}
           cache: poetry
           cache-dependency-path: ./poetry.lock
+      - name: Install system dependencies
+        run: sudo apt-get update && sudo apt-get install -y libkrb5-dev # for kerberos
       - name: Install
         run: make install-dependencies
       - name: Linters
diff --git a/.github/workflows/python-integration.yml b/.github/workflows/python-integration.yml
index 8b0a8a97f0..9b4efc4111 100644
--- a/.github/workflows/python-integration.yml
+++ b/.github/workflows/python-integration.yml
@@ -50,6 +50,8 @@ jobs:
       - uses: actions/checkout@v4
        with:
          fetch-depth: 2
+      - name: Install system dependencies
+        run: sudo apt-get update && sudo apt-get install -y libkrb5-dev # for kerberos
       - name: Install
         run: make install
       - name: Run integration tests
diff --git a/poetry.lock b/poetry.lock
index 4ac55de26a..e1aff52534 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -58,7 +58,7 @@ description = "Happy Eyeballs for asyncio"
 optional = true
 python-versions = ">=3.9"
 groups = ["main"]
-markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\""
+markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\""
 files = [
     {file = "aiohappyeyeballs-2.6.1-py3-none-any.whl", hash = "sha256:f349ba8f4b75cb25c99c5c2d84e997e485204d2902a9597802b0371f09331fb8"},
     {file = "aiohappyeyeballs-2.6.1.tar.gz", hash = "sha256:c3f9d0113123803ccadfdf3f0faa505bc78e6a72d1cc4806cbd719826e943558"},
@@ -71,7 +71,7 @@ description = "Async http client/server framework (asyncio)"
 optional = true
 python-versions = ">=3.9"
 groups = ["main"]
-markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\""
+markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\""
 files = [
     {file = "aiohttp-3.11.14-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:e2bc827c01f75803de77b134afdbf74fa74b62970eafdf190f3244931d7a5c0d"},
     {file = "aiohttp-3.11.14-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e365034c5cf6cf74f57420b57682ea79e19eb29033399dd3f40de4d0171998fa"},
@@ -196,7 +196,7 @@ description = "aiosignal: a list of registered asynchronous callbacks"
 optional = true
 python-versions = ">=3.9"
 groups = ["main"]
-markers = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and (extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\" or extra == \"ray\")"
+markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")"
 files = [
     {file = "aiosignal-1.3.2-py2.py3-none-any.whl", hash = "sha256:45cde58e409a301715980c2b01d0c28bdde3770d8290b5eb2173759d9acb31a5"},
     {file = "aiosignal-1.3.2.tar.gz", hash = "sha256:a8c255c66fafb1e499c9351d0bf32ff2d8a0321595ebac3b93713656d2436f54"},
@@ -248,7 +248,7 @@ description = "Timeout context manager for asyncio programs"
 optional = true
 python-versions = ">=3.8"
 groups = ["main"]
-markers = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and python_version <= \"3.10\""
+markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and python_version <= \"3.10\""
 files = [
     {file = "async_timeout-5.0.1-py3-none-any.whl", hash = "sha256:39e3809566ff85354557ec2398b55e096c8364bacac9405a7a1fa429e77fe76c"},
     {file = "async_timeout-5.0.1.tar.gz", hash = "sha256:d9321a7a3d5a6a5e187e824d2fa0793ce379a202935782d555d6e9d2735677d3"},
@@ -265,7 +265,7 @@ files = [
     {file = "attrs-25.3.0-py3-none-any.whl", hash = "sha256:427318ce031701fea540783410126f03899a97ffc6f61596ad581ac2e40e3bc3"},
     {file = "attrs-25.3.0.tar.gz", hash = "sha256:75d7cefc7fb576747b2c81b4442d4d4a1ce0900973527c011d1030fd3bf4af1b"},
 ]
-markers = {main = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and (extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\" or extra == \"ray\")"}
+markers = {main = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")"}
 
 [package.extras]
 benchmark = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"]
@@ -467,7 +467,7 @@ files = [
     {file = "boto3-1.37.1-py3-none-any.whl", hash = "sha256:4320441f904435a1b85e6ecb81793192e522c737cc9ed6566014e29f0a11cb22"},
     {file = "boto3-1.37.1.tar.gz", hash = "sha256:96d18f7feb0c1fcb95f8837b74b6c8880e1b4e35ce5f8a8f8cb243a090c278ed"},
 ]
-markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\""}
+markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\""}
 
 [package.dependencies]
 botocore = ">=1.37.1,<1.38.0"
@@ -488,7 +488,7 @@ files = [
     {file = "botocore-1.37.1-py3-none-any.whl", hash = "sha256:c1db1bfc5d8c6b3b6d1ca6794f605294b4264e82a7e727b88e0fef9c2b9fbb9c"},
     {file = "botocore-1.37.1.tar.gz", hash = "sha256:b194db8fb2a0ffba53568c364ae26166e7eec0445496b2ac86a6e142f3dd982f"},
 ]
-markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\" or extra == \"s3fs\""}
+markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\" or extra == \"s3fs\""}
 
 [package.dependencies]
 jmespath = ">=0.7.1,<2.0.0"
@@ -1441,7 +1441,7 @@ description = "A list-like structure which implements collections.abc.MutableSeq
 optional = true
 python-versions = ">=3.8"
 groups = ["main"]
-markers = "(extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\") and (extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\" or extra == \"ray\")"
+markers = "(extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\") and (extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"ray\")"
 files = [
     {file = "frozenlist-1.5.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:5b6a66c18b5b9dd261ca98dffcb826a525334b2f29e7caa54e182255c5f6a65a"},
     {file = "frozenlist-1.5.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d1b3eb7b05ea246510b43a7e53ed1653e55c2121019a97e60cad7efb881a97bb"},
@@ -2157,7 +2157,7 @@ files = [
     {file = "jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980"},
     {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"},
 ]
-markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\" or extra == \"s3fs\""}
+markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\" or extra == \"s3fs\""}
 
 [[package]]
 name = "joserfc"
@@ -2277,6 +2277,21 @@ markers = {main = "extra == \"ray\""}
 [package.dependencies]
 referencing = ">=0.31.0"
 
+[[package]]
+name = "kerberos"
+version = "1.3.1"
+description = "Kerberos high-level interface"
+optional = true
+python-versions = "*"
+groups = ["main"]
+markers = "extra == \"hive-kerberos\""
+files = [
+    {file = "kerberos-1.3.1-cp27-cp27m-macosx_11_1_x86_64.whl", hash = "sha256:98a695c072efef535cb2b5f98e474d00671588859a94ec96c2c1508a113ff3aa"},
+    {file = "kerberos-1.3.1-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:2e03c6a9d201d4aab5f899bfb8150de15335955bfce8ca43bfe9a41d7aae54dc"},
+    {file = "kerberos-1.3.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:2002b3b1541fc51e2c081ee7048f55e5d9ca63dd09f0d7b951c263920db3a0bb"},
+    {file = "kerberos-1.3.1.tar.gz", hash = "sha256:cdd046142a4e0060f96a00eb13d82a5d9ebc0f2d7934393ed559bac773460a2c"},
+]
+
 [[package]]
 name = "lazy-object-proxy"
 version = "1.10.0"
@@ -2960,7 +2975,7 @@ description = "multidict implementation"
 optional = true
 python-versions = ">=3.9"
 groups = ["main"]
-markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\""
+markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\""
 files = [
     {file = "multidict-6.2.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:b9f6392d98c0bd70676ae41474e2eecf4c7150cb419237a41f8f96043fcb81d1"},
     {file = "multidict-6.2.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:3501621d5e86f1a88521ea65d5cad0a0834c77b26f193747615b7c911e5422d2"},
@@ -3554,7 +3569,7 @@ description = "Accelerated property cache"
 optional = true
 python-versions = ">=3.9"
 groups = ["main"]
-markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\""
+markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\""
 files = [
     {file = "propcache-0.3.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:f27785888d2fdd918bc36de8b8739f2d6c791399552333721b58193f68ea3e98"},
     {file = "propcache-0.3.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4e89cde74154c7b5957f87a355bb9c8ec929c167b59c83d90654ea36aeb6180"},
@@ -3868,7 +3883,7 @@ files = [
     {file = "pyarrow-19.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:8464c9fbe6d94a7fe1599e7e8965f350fd233532868232ab2596a71586c5a429"},
     {file = "pyarrow-19.0.1.tar.gz", hash = "sha256:3bf266b485df66a400f282ac0b6d1b500b9d2ae73314a153dbe97d6d5cc8a99e"},
 ]
-markers = {main = "extra == \"daft\" or extra == \"duckdb\" or extra == \"pandas\" or extra == \"pyarrow\" or extra == \"ray\""}
+markers = {main = "extra == \"pyarrow\" or extra == \"pandas\" or extra == \"duckdb\" or extra == \"ray\" or extra == \"daft\""}
 
 [package.extras]
 test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"]
@@ -4948,7 +4963,7 @@ files = [
     {file = "s3transfer-0.11.3-py3-none-any.whl", hash = "sha256:ca855bdeb885174b5ffa95b9913622459d4ad8e331fc98eb01e6d5eb6a30655d"},
     {file = "s3transfer-0.11.3.tar.gz", hash = "sha256:edae4977e3a122445660c7c114bba949f9d191bae3b34a096f18a1c8c354527a"},
 ]
-markers = {main = "extra == \"dynamodb\" or extra == \"glue\" or extra == \"rest-sigv4\""}
+markers = {main = "extra == \"glue\" or extra == \"dynamodb\" or extra == \"rest-sigv4\""}
 
 [package.dependencies]
 botocore = ">=1.36.0,<2.0a.0"
@@ -5684,7 +5699,7 @@ description = "Yet another URL library"
 optional = true
 python-versions = ">=3.9"
 groups = ["main"]
-markers = "extra == \"adlfs\" or extra == \"gcsfs\" or extra == \"s3fs\""
+markers = "extra == \"s3fs\" or extra == \"adlfs\" or extra == \"gcsfs\""
 files = [
     {file = "yarl-1.18.3-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:7df647e8edd71f000a5208fe6ff8c382a1de8edfbccdbbfe649d263de07d8c34"},
     {file = "yarl-1.18.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:c69697d3adff5aa4f874b19c0e4ed65180ceed6318ec856ebc423aa5850d84f7"},
@@ -5918,7 +5933,7 @@ dynamodb = ["boto3"]
 gcsfs = ["gcsfs"]
 glue = ["boto3", "mypy-boto3-glue"]
 hive = ["thrift"]
-hive-kerberos = ["thrift", "thrift-sasl"]
+hive-kerberos = ["kerberos", "thrift", "thrift-sasl"]
 pandas = ["pandas", "pyarrow"]
 polars = ["polars"]
 pyarrow = ["pyarrow"]
@@ -5934,4 +5949,4 @@ zstandard = ["zstandard"]
 [metadata]
 lock-version = "2.1"
 python-versions = "^3.9.2, !=3.9.7"
-content-hash = "1f312d7fc9e6eb6b41a2b8035b74b0bdef6578f6bd01bddc836271c25083f7cd"
+content-hash = "399953abc8989b7801af4507a6d0ac7660936d2db5f3661ce4503272ef803873"
diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py
index 2d411cb409..75a63e0ae7 100644
--- a/pyiceberg/catalog/hive.py
+++ b/pyiceberg/catalog/hive.py
@@ -18,7 +18,6 @@
 import logging
 import socket
 import time
-from functools import cached_property
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
@@ -160,7 +159,6 @@ def _init_thrift_transport(self) -> TTransport:
         else:
             return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive")
 
-    @cached_property
     def _client(self) -> Client:
         protocol = TBinaryProtocol.TBinaryProtocol(self._transport)
         client = Client(protocol)
@@ -173,11 +171,11 @@ def __enter__(self) -> Client:
         if not self._transport.isOpen():
             try:
                 self._transport.open()
-            except TTransport.TTransportException:
+            except (TypeError, TTransport.TTransportException):
                 # reinitialize _transport
                 self._transport = self._init_thrift_transport()
                 self._transport.open()
-        return self._client
+        return self._client()  # recreate the client
 
     def __exit__(
         self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType]
diff --git a/pyproject.toml b/pyproject.toml
index f73d39d56d..ca729fda9d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -83,6 +83,7 @@ cachetools = "^5.5.0"
 pyiceberg-core = { version = "^0.4.0", optional = true }
 polars = { version = "^1.21.0", optional = true }
 thrift-sasl = { version = ">=0.4.3", optional = true }
+kerberos = {version = "^1.3.1", optional = true}
 
 [tool.poetry.group.dev.dependencies]
 pytest = "7.4.4"
@@ -295,7 +296,7 @@ daft = ["getdaft"]
 polars = ["polars"]
 snappy = ["python-snappy"]
 hive = ["thrift"]
-hive-kerberos = ["thrift", "thrift_sasl"]
+hive-kerberos = ["thrift", "thrift_sasl", "kerberos"]
 s3fs = ["s3fs"]
 glue = ["boto3", "mypy-boto3-glue"]
 adlfs = ["adlfs"]
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py
index 07cd79d4c7..99d1c67cb4 100644
--- a/tests/catalog/test_hive.py
+++ b/tests/catalog/test_hive.py
@@ -15,12 +15,18 @@
 #  specific language governing permissions and limitations
 #  under the License.
 # pylint: disable=protected-access,redefined-outer-name
+import base64
 import copy
+import struct
+import threading
 import uuid
+from collections.abc import Generator
 from copy import deepcopy
+from typing import Optional
 from unittest.mock import MagicMock, call, patch
 
 import pytest
+import thrift.transport.TSocket
 from hive_metastore.ttypes import (
     AlreadyExistsException,
     FieldSchema,
@@ -38,11 +44,13 @@
 from pyiceberg.catalog import PropertiesUpdateSummary
 from pyiceberg.catalog.hive import (
+    HIVE_KERBEROS_AUTH,
     LOCK_CHECK_MAX_WAIT_TIME,
     LOCK_CHECK_MIN_WAIT_TIME,
     LOCK_CHECK_RETRIES,
     HiveCatalog,
     _construct_hive_storage_descriptor,
+    _HiveClient,
 )
 from pyiceberg.exceptions import (
     NamespaceAlreadyExistsError,
@@ -183,6 +191,59 @@ def hive_database(tmp_path_factory: pytest.TempPathFactory) -> HiveDatabase:
     )
 
 
+class SaslServer(threading.Thread):
+    def __init__(self, socket: thrift.transport.TSocket.TServerSocket, response: bytes) -> None:
+        super().__init__()
+        self.daemon = True
+        self._socket = socket
+        self._response = response
+        self._port = None
+        self._port_bound = threading.Event()
+
+    def run(self) -> None:
+        self._socket.listen()
+
+        try:
+            address = self._socket.handle.getsockname()
+            # AF_INET addresses are 2-tuples (host, port) and AF_INET6 are
+            # 4-tuples (host, port, ...), i.e. port is always at index 1.
+            _host, self._port, *_ = address
+        finally:
+            self._port_bound.set()
+
+        # Accept connections and respond to each connection with the same message.
+        # The responsibility for closing the connection is on the client.
+        while True:
+            try:
+                client = self._socket.accept()
+                if client:
+                    client.write(self._response)
+                    client.flush()
+            except Exception:
+                pass
+
+    @property
+    def port(self) -> Optional[int]:
+        self._port_bound.wait()
+        return self._port
+
+    def close(self) -> None:
+        self._socket.close()
+
+
+@pytest.fixture(scope="session")
+def kerberized_hive_metastore_fake_url() -> Generator[str, None, None]:
+    server = SaslServer(
+        # Port 0 means pick any available port.
+        socket=thrift.transport.TSocket.TServerSocket(port=0),
+        # Always return a message with status 5 (COMPLETE).
+        response=struct.pack(">BI", 5, 0),
+    )
+    server.start()
+    yield f"thrift://localhost:{server.port}"
+    server.close()
+
+
 def test_no_uri_supplied() -> None:
     with pytest.raises(KeyError):
         HiveCatalog("production")
@@ -1239,3 +1300,45 @@ def test_create_hive_client_failure() -> None:
     with pytest.raises(Exception, match="Connection failed"):
         HiveCatalog._create_hive_client(properties)
     assert mock_hive_client.call_count == 2
+
+
+def test_create_hive_client_with_kerberos(
+    kerberized_hive_metastore_fake_url: str,
+) -> None:
+    properties = {
+        "uri": kerberized_hive_metastore_fake_url,
+        "ugi": "user",
+        HIVE_KERBEROS_AUTH: "true",
+    }
+    client = HiveCatalog._create_hive_client(properties)
+    assert client is not None
+
+
+def test_create_hive_client_with_kerberos_using_context_manager(
+    kerberized_hive_metastore_fake_url: str,
+) -> None:
+    client = _HiveClient(
+        uri=kerberized_hive_metastore_fake_url,
+        kerberos_auth=True,
+    )
+    with (
+        patch(
+            "puresasl.mechanisms.kerberos.authGSSClientStep",
+            return_value=None,
+        ),
+        patch(
+            "puresasl.mechanisms.kerberos.authGSSClientResponse",
+            return_value=base64.b64encode(b"Some Response"),
+        ),
+        patch(
+            "puresasl.mechanisms.GSSAPIMechanism.complete",
+            return_value=True,
+        ),
+    ):
+        with client as open_client:
+            assert open_client._iprot.trans.isOpen()
+
+        # Use the context manager a second time to see if
+        # closing and re-opening work as expected.
+        with client as open_client:
+            assert open_client._iprot.trans.isOpen()
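
For context, a minimal usage sketch (not part of the diff) of what the new `hive-kerberos` extra enables. The metastore URI and `ugi` value below are placeholders, and a working Kerberos environment is assumed: a valid ticket from `kinit` plus the `libkrb5-dev` system package that the CI steps above install.

```python
# Hypothetical example -- host, port, and user are placeholders.
# Install with: pip install "pyiceberg[hive-kerberos]"
from pyiceberg.catalog.hive import HIVE_KERBEROS_AUTH, HiveCatalog

catalog = HiveCatalog(
    "prod",
    **{
        "uri": "thrift://hive-metastore.example.com:9083",
        "ugi": "user",
        HIVE_KERBEROS_AUTH: "true",  # switches _HiveClient to the SASL transport
    },
)
print(catalog.list_namespaces())
```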
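The fake metastore in the new test fixture only has to satisfy the SASL negotiation, so it always replies with the single frame built by `struct.pack(">BI", 5, 0)`. A rough sketch of that framing, assuming the thrift-sasl convention of a 1-byte status code followed by a 4-byte big-endian payload length (status 5 is COMPLETE, and the payload here is empty):

```python
import struct

# One negotiation frame: status byte + big-endian payload length (no payload).
frame = struct.pack(">BI", 5, 0)
status, length = struct.unpack(">BI", frame)
assert (status, length) == (5, 0)  # 5 == COMPLETE, zero-length payload
print(frame.hex())  # "0500000000"
```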