Skip to content

Commit a7f3994

Browse files
Niall Egansusodapop
authored andcommitted
Implement simple .connect and close for client
## What changes are proposed in this pull request? A simple implementation of the `connect` and `close` methods for the new client. There are a number of follow-on tickets: - Implement support for the SSL args (https://databricks.atlassian.net/browse/SC-77240) - Implement support for no-explicit open execution (https://databricks.atlassian.net/browse/SC-77289) - Implement support for the client explicitly setting the session id (https://databricks.atlassian.net/browse/SC-77290) But I plan to return to those after I have the basic execute & fetch functionality. Also fixing the flakiness we've seen in the driver local tests and re-enabling them. It seems as though the command is sometimes still returned in the running state (and was also sometimes timing out). ## How is this tested? - Added a new DriverLocalTest that checks we can make a connection without everything blowing up - New unit tests - Running the driver local test suites 5 times to ensure that the flakiness was fixed Author: Niall Egan <niall.egan@databricks.com> Author: Ubuntu <ubuntu@ip-10-110-20-37.us-west-2.compute.internal> #97384 is resolved by NiallEgan/connection_method_for_client.
0 parents  commit a7f3994

File tree

5 files changed

+2619
-0
lines changed

5 files changed

+2619
-0
lines changed
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import logging
2+
from typing import Dict, Tuple, List
3+
4+
import grpc
5+
from google.protobuf import message
6+
7+
import cmdexec.clients.python.sql_command_service_pb2 as command_pb2
8+
from cmdexec.clients.python.sql_command_service_pb2_grpc import SqlCommandServiceStub
9+
from cmdexec.clients.python.errors import OperationalError, InterfaceError
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
def connect(**kwargs):
15+
return Connection(**kwargs)
16+
17+
18+
class Connection:
19+
def __init__(self, **kwargs):
20+
try:
21+
self.host = kwargs["HOST"]
22+
self.port = kwargs["PORT"]
23+
except KeyError:
24+
raise InterfaceError("Please include arguments HOST and PORT in kwargs for Connection")
25+
26+
self._base_client = CmdExecBaseHttpClient(self.host, self.port, kwargs.get("metadata", []))
27+
open_session_request = command_pb2.OpenSessionRequest(
28+
configuration={},
29+
client_session_id=None,
30+
session_info_fields=None,
31+
)
32+
33+
try:
34+
resp = self._base_client.make_request(self._base_client.stub.OpenSession,
35+
open_session_request)
36+
self.session_id = resp.id
37+
logger.info("Successfully opened session " + str(self.session_id.hex()))
38+
except grpc.RpcError as error:
39+
raise OperationalError("Error during database connection", error)
40+
41+
def cursor(self):
42+
pass
43+
44+
def close(self):
45+
try:
46+
close_session_request = command_pb2.CloseSessionRequest(id=self.session_id)
47+
self._base_client.make_request(self._base_client.stub.CloseSession,
48+
close_session_request)
49+
except grpc.RpcError as error:
50+
raise OperationalError("Error during database connection close", error)
51+
52+
53+
class CmdExecBaseHttpClient:
54+
"""
55+
A thin wrapper around a gRPC channel that takes cares of headers etc.
56+
"""
57+
58+
def __init__(self, host: str, port: int, http_headers: List[Tuple[str, str]]):
59+
self.host_url = host + ":" + str(port)
60+
self.http_headers = [(k.lower(), v) for (k, v) in http_headers]
61+
self.channel = grpc.insecure_channel(self.host_url)
62+
self.stub = SqlCommandServiceStub(self.channel)
63+
64+
def make_request(self, method, request):
65+
try:
66+
response = method(request, metadata=self.http_headers)
67+
logger.info("Received message: %s", response)
68+
return response
69+
except grpc.RpcError as error:
70+
logger.error("Received error during gRPC request: %s", error)
71+
raise error

cmdexec/clients/python/errors.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
class Error(Exception):
2+
pass
3+
4+
5+
class Warning(Exception):
6+
pass
7+
8+
9+
class InterfaceError(Error):
10+
pass
11+
12+
13+
class DatabaseError(Error):
14+
pass
15+
16+
17+
class InternalError(DatabaseError):
18+
pass
19+
20+
21+
class OperationalError(DatabaseError):
22+
pass
23+
24+
25+
class ProgrammingError(DatabaseError):
26+
pass
27+
28+
29+
class IntegrityError(DatabaseError):
30+
pass
31+
32+
33+
class DataError(DatabaseError):
34+
pass
35+
36+
37+
class NotSupportedError(DatabaseError):
38+
pass

0 commit comments

Comments
 (0)