-
Notifications
You must be signed in to change notification settings - Fork 414
Fix thrift client connection for Kerberos Hive Client #1747
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| import logging | ||
| import socket | ||
| import time | ||
| from functools import cached_property | ||
| from types import TracebackType | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
|
|
@@ -143,40 +144,47 @@ class _HiveClient: | |
| """Helper class to nicely open and close the transport.""" | ||
|
|
||
| _transport: TTransport | ||
kevinjqliu marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| _client: Client | ||
| _ugi: Optional[List[str]] | ||
|
|
||
| def __init__(self, uri: str, ugi: Optional[str] = None, kerberos_auth: Optional[bool] = HIVE_KERBEROS_AUTH_DEFAULT): | ||
| self._uri = uri | ||
| self._kerberos_auth = kerberos_auth | ||
| self._ugi = ugi.split(":") if ugi else None | ||
| self._transport = self._init_thrift_transport() | ||
|
|
||
| self._init_thrift_client() | ||
|
|
||
| def _init_thrift_client(self) -> None: | ||
| def _init_thrift_transport(self) -> TTransport: | ||
| url_parts = urlparse(self._uri) | ||
|
|
||
| socket = TSocket.TSocket(url_parts.hostname, url_parts.port) | ||
|
|
||
| if not self._kerberos_auth: | ||
| self._transport = TTransport.TBufferedTransport(socket) | ||
| return TTransport.TBufferedTransport(socket) | ||
| else: | ||
| self._transport = TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive") | ||
| return TTransport.TSaslClientTransport(socket, host=url_parts.hostname, service="hive") | ||
|
|
||
| @cached_property | ||
| def _client(self) -> Client: | ||
| protocol = TBinaryProtocol.TBinaryProtocol(self._transport) | ||
|
|
||
| self._client = Client(protocol) | ||
| client = Client(protocol) | ||
| if self._ugi: | ||
| client.set_ugi(*self._ugi) | ||
| return client | ||
|
|
||
| def __enter__(self) -> Client: | ||
| self._transport.open() | ||
| if self._ugi: | ||
| self._client.set_ugi(*self._ugi) | ||
| """Make sure the transport is initialized and open.""" | ||
| if not self._transport.isOpen(): | ||
| try: | ||
| self._transport.open() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems that we're still going to try re-opening the transport once it has been closed and since the exception raised in that case would be a |
||
| except TTransport.TTransportException: | ||
| # reinitialize _transport | ||
| self._transport = self._init_thrift_transport() | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're re-initializing the transport but since |
||
| self._transport.open() | ||
| return self._client | ||
|
|
||
| def __exit__( | ||
| self, exctype: Optional[Type[BaseException]], excinst: Optional[BaseException], exctb: Optional[TracebackType] | ||
| ) -> None: | ||
| self._transport.close() | ||
| """Close transport if it was opened.""" | ||
| if self._transport.isOpen(): | ||
| self._transport.close() | ||
|
|
||
|
|
||
| def _construct_hive_storage_descriptor( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.