Skip to content

Commit 325b68e

Browse files
introduce http client (temp) and sea test file
Signed-off-by: varun-edachali-dbx <varun.edachali@databricks.com>
1 parent 3c78ed7 commit 325b68e

File tree

2 files changed

+240
-0
lines changed

2 files changed

+240
-0
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import os
2+
import sys
3+
import logging
4+
from databricks.sql.client import Connection
5+
6+
logging.basicConfig(level=logging.DEBUG)
7+
logger = logging.getLogger(__name__)
8+
9+
def test_sea_session():
10+
"""
11+
Test opening and closing a SEA session using the connector.
12+
13+
This function connects to a Databricks SQL endpoint using the SEA backend,
14+
opens a session, and then closes it.
15+
16+
Required environment variables:
17+
- DATABRICKS_SERVER_HOSTNAME: Databricks server hostname
18+
- DATABRICKS_HTTP_PATH: HTTP path for the SQL endpoint
19+
- DATABRICKS_TOKEN: Personal access token for authentication
20+
"""
21+
22+
server_hostname = os.environ.get("DATABRICKS_SERVER_HOSTNAME")
23+
http_path = os.environ.get("DATABRICKS_HTTP_PATH")
24+
access_token = os.environ.get("DATABRICKS_TOKEN")
25+
catalog = os.environ.get("DATABRICKS_CATALOG")
26+
27+
if not all([server_hostname, http_path, access_token]):
28+
logger.error("Missing required environment variables.")
29+
logger.error("Please set DATABRICKS_SERVER_HOSTNAME, DATABRICKS_HTTP_PATH, and DATABRICKS_TOKEN.")
30+
sys.exit(1)
31+
32+
logger.info(f"Connecting to {server_hostname}")
33+
logger.info(f"HTTP Path: {http_path}")
34+
if catalog:
35+
logger.info(f"Using catalog: {catalog}")
36+
37+
try:
38+
logger.info("Creating connection with SEA backend...")
39+
connection = Connection(
40+
server_hostname=server_hostname,
41+
http_path=http_path,
42+
access_token=access_token,
43+
catalog=catalog,
44+
schema="default",
45+
use_sea=True,
46+
user_agent_entry="SEA-Test-Client" # add custom user agent
47+
)
48+
49+
logger.info(f"Successfully opened SEA session with ID: {connection.get_session_id_hex()}")
50+
logger.info(f"backend type: {type(connection.session.backend)}")
51+
52+
# Close the connection
53+
logger.info("Closing the SEA session...")
54+
connection.close()
55+
logger.info("Successfully closed SEA session")
56+
57+
except Exception as e:
58+
logger.error(f"Error testing SEA session: {str(e)}")
59+
import traceback
60+
logger.error(traceback.format_exc())
61+
sys.exit(1)
62+
63+
logger.info("SEA session test completed successfully")
64+
65+
if __name__ == "__main__":
66+
test_sea_session()
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import json
2+
import logging
3+
import requests
4+
from typing import Dict, Any, Optional, Union, List
5+
from urllib.parse import urljoin
6+
7+
from databricks.sql.auth.authenticators import AuthProvider
8+
from databricks.sql.types import SSLOptions
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class CustomHttpClient:
14+
"""
15+
HTTP client for Statement Execution API (SEA).
16+
17+
This client handles the HTTP communication with the SEA endpoints,
18+
including authentication, request formatting, and response parsing.
19+
"""
20+
21+
def __init__(
22+
self,
23+
server_hostname: str,
24+
port: int,
25+
http_path: str,
26+
http_headers: List[tuple],
27+
auth_provider: AuthProvider,
28+
ssl_options: SSLOptions,
29+
**kwargs,
30+
):
31+
"""
32+
Initialize the SEA HTTP client.
33+
34+
Args:
35+
server_hostname: Hostname of the Databricks server
36+
port: Port number for the connection
37+
http_path: HTTP path for the connection
38+
http_headers: List of HTTP headers to include in requests
39+
auth_provider: Authentication provider
40+
ssl_options: SSL configuration options
41+
**kwargs: Additional keyword arguments
42+
"""
43+
44+
self.server_hostname = server_hostname
45+
self.port = port
46+
self.http_path = http_path
47+
self.auth_provider = auth_provider
48+
self.ssl_options = ssl_options
49+
50+
self.base_url = f"https://{server_hostname}:{port}"
51+
52+
self.headers = dict(http_headers)
53+
self.headers.update({"Content-Type": "application/json"})
54+
55+
self.max_retries = kwargs.get("_retry_stop_after_attempts_count", 30)
56+
57+
# Create a session for connection pooling
58+
self.session = requests.Session()
59+
60+
# Configure SSL verification
61+
if ssl_options.tls_verify:
62+
self.session.verify = ssl_options.tls_trusted_ca_file or True
63+
else:
64+
self.session.verify = False
65+
66+
# Configure client certificates if provided
67+
if ssl_options.tls_client_cert_file:
68+
client_cert = ssl_options.tls_client_cert_file
69+
client_key = ssl_options.tls_client_cert_key_file
70+
client_key_password = ssl_options.tls_client_cert_key_password
71+
72+
if client_key:
73+
self.session.cert = (client_cert, client_key)
74+
else:
75+
self.session.cert = client_cert
76+
77+
if client_key_password:
78+
# Note: requests doesn't directly support key passwords
79+
# This would require more complex handling with libraries like pyOpenSSL
80+
logger.warning(
81+
"Client key password provided but not supported by requests library"
82+
)
83+
84+
def _get_auth_headers(self) -> Dict[str, str]:
85+
"""Get authentication headers from the auth provider."""
86+
headers: Dict[str, str] = {}
87+
self.auth_provider.add_headers(headers)
88+
return headers
89+
90+
def _make_request(
91+
self, method: str, path: str, data: Optional[Dict[str, Any]] = None
92+
) -> Dict[str, Any]:
93+
"""
94+
Make an HTTP request to the SEA endpoint.
95+
96+
Args:
97+
method: HTTP method (GET, POST, DELETE)
98+
path: API endpoint path
99+
data: Request payload data
100+
101+
Returns:
102+
Dict[str, Any]: Response data parsed from JSON
103+
104+
Raises:
105+
RequestError: If the request fails
106+
"""
107+
108+
url = urljoin(self.base_url, path)
109+
headers = {**self.headers, **self._get_auth_headers()}
110+
111+
logger.debug(f"making {method} request to {url}")
112+
113+
try:
114+
if method.upper() == "GET":
115+
response = self.session.get(url, headers=headers, params=data)
116+
elif method.upper() == "POST":
117+
response = self.session.post(url, headers=headers, json=data)
118+
elif method.upper() == "DELETE":
119+
# For DELETE requests, use params for data (query parameters)
120+
response = self.session.delete(url, headers=headers, params=data)
121+
else:
122+
raise ValueError(f"Unsupported HTTP method: {method}")
123+
124+
# Check for HTTP errors
125+
response.raise_for_status()
126+
127+
# Log response details
128+
logger.debug(f"Response status: {response.status_code}")
129+
130+
# Parse JSON response
131+
if response.content:
132+
result = response.json()
133+
# Log response content (but limit it for large responses)
134+
content_str = json.dumps(result)
135+
if len(content_str) > 1000:
136+
logger.debug(
137+
f"Response content (truncated): {content_str[:1000]}..."
138+
)
139+
else:
140+
logger.debug(f"Response content: {content_str}")
141+
return result
142+
return {}
143+
144+
except requests.exceptions.RequestException as e:
145+
# Handle request errors
146+
error_message = f"SEA HTTP request failed: {str(e)}"
147+
logger.error(error_message)
148+
149+
# Extract error details from response if available
150+
if hasattr(e, "response") and e.response is not None:
151+
try:
152+
error_details = e.response.json()
153+
error_message = (
154+
f"{error_message}: {error_details.get('message', '')}"
155+
)
156+
logger.error(
157+
f"Response status: {e.response.status_code}, Error details: {error_details}"
158+
)
159+
except (ValueError, KeyError):
160+
# If we can't parse the JSON, just log the raw content
161+
content_str = (
162+
e.response.content.decode("utf-8", errors="replace")
163+
if isinstance(e.response.content, bytes)
164+
else str(e.response.content)
165+
)
166+
logger.error(
167+
f"Response status: {e.response.status_code}, Raw content: {content_str}"
168+
)
169+
pass
170+
171+
# Re-raise as a RequestError
172+
from databricks.sql.exc import RequestError
173+
174+
raise RequestError(error_message, e)

0 commit comments

Comments
 (0)