Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
# the repo. Unless a later match takes precedence, these
# users will be requested for review when someone opens a
# pull request.
* @deeksha-db @samikshya-db @jprakash-db @yunbodeng-db @jackyhu-db @benc-db
* @deeksha-db @samikshya-db @jprakash-db @jackyhu-db @madhav-db @gopalldb @jayantsing-db @vikrantpuppala @shivam2680
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Release History

# 4.0.3 (2025-04-22)

- Fix: Removed `packaging` dependency in favour of default libraries, for `urllib3` version checks (databricks/databricks-sql-python#547 by @jprakash-db)
# 4.0.2 (2025-04-01)

- Fix: Relaxed the pin for `python-dateutil` to be `^2.8.0` (databricks/databricks-sql-python#538 by @jprakash-db)
Expand Down
2 changes: 1 addition & 1 deletion examples/query_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
result = cursor.fetchall()

for row in result:
print(row)
print(row)
20 changes: 20 additions & 0 deletions examples/test_telemetry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import os
import databricks.sql as sql

# Example: open a connection with telemetry enabled and emit one event.
#
# Context managers are used so the connection and cursor are closed even if
# the query raises; the original closed them manually and would leak both on
# an exception.
with sql.connect(
    server_hostname=os.environ["DATABRICKS_SERVER_HOSTNAME"],
    http_path=os.environ["DATABRICKS_HTTP_PATH"],
    access_token=os.environ["DATABRICKS_TOKEN"],
    enable_telemetry=True,  # Enable telemetry
    telemetry_batch_size=1,  # Set batch size to 1
) as conn:
    with conn.cursor() as cursor:
        # Execute a simple query to generate telemetry
        cursor.execute(
            "SELECT * FROM main.eng_lumberjack.staging_frontend_log_sql_driver_log limit 1"
        )
        cursor.fetchall()
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "databricks-sql-connector"
version = "4.0.2"
version = "4.0.3"
description = "Databricks SQL Connector for Python"
authors = ["Databricks <databricks-sql-connector-maintainers@databricks.com>"]
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion src/databricks/sql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def __repr__(self):
DATE = DBAPITypeObject("date")
ROWID = DBAPITypeObject()

__version__ = "4.0.2"
__version__ = "4.0.3"
USER_AGENT_NAME = "PyDatabricksSqlConnector"

# These two functions are pyhive legacy
Expand Down
68 changes: 63 additions & 5 deletions src/databricks/sql/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import time
from typing import Dict, Tuple, List, Optional, Any, Union, Sequence

import pandas

try:
Expand Down Expand Up @@ -214,6 +213,12 @@ def read(self) -> Optional[OAuthToken]:
# use_cloud_fetch
# Enable use of cloud fetch to extract large query results in parallel via cloud storage

logger.debug(
"Connection.__init__(server_hostname=%s, http_path=%s)",
server_hostname,
http_path,
)

if access_token:
access_token_kv = {"access_token": access_token}
kwargs = {**kwargs, **access_token_kv}
Expand All @@ -228,6 +233,13 @@ def read(self) -> Optional[OAuthToken]:
server_hostname, **kwargs
)

self.server_telemetry_enabled = True
self.client_telemetry_enabled = kwargs.get("enable_telemetry", False)
self.telemetry_enabled = (
self.client_telemetry_enabled and self.server_telemetry_enabled
)
telemetry_batch_size = kwargs.get("telemetry_batch_size", 200)

user_agent_entry = kwargs.get("user_agent_entry")
if user_agent_entry is None:
user_agent_entry = kwargs.get("_user_agent_entry")
Expand Down Expand Up @@ -315,7 +327,13 @@ def __enter__(self) -> "Connection":
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()
try:
self.close()
except BaseException as e:
logger.warning(f"Exception during connection close in __exit__: {e}")
if exc_type is None:
raise
return False

def __del__(self):
if self.open:
Expand Down Expand Up @@ -413,6 +431,9 @@ def _close(self, close_cursors=True) -> None:

self.open = False

if hasattr(self, "telemetry_client"):
self.telemetry_client.close()

def commit(self):
"""No-op because Databricks does not support transactions"""
pass
Expand Down Expand Up @@ -456,7 +477,14 @@ def __enter__(self) -> "Cursor":
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()
try:
logger.debug("Cursor context manager exiting, calling close()")
self.close()
except BaseException as e:
logger.warning(f"Exception during cursor close in __exit__: {e}")
if exc_type is None:
raise
return False

def __iter__(self):
if self.active_result_set:
Expand Down Expand Up @@ -787,6 +815,9 @@ def execute(

:returns self
"""
logger.debug(
"Cursor.execute(operation=%s, parameters=%s)", operation, parameters
)

param_approach = self._determine_parameter_approach(parameters)
if param_approach == ParameterApproach.NONE:
Expand Down Expand Up @@ -1163,7 +1194,21 @@ def cancel(self) -> None:
def close(self) -> None:
"""Close cursor"""
self.open = False
self.active_op_handle = None

# Close active operation handle if it exists
if self.active_op_handle:
try:
self.thrift_backend.close_command(self.active_op_handle)
except RequestError as e:
if isinstance(e.args[1], CursorAlreadyClosedError):
logger.info("Operation was canceled by a prior request")
else:
logging.warning(f"Error closing operation handle: {e}")
except Exception as e:
logging.warning(f"Error closing operation handle: {e}")
finally:
self.active_op_handle = None

if self.active_result_set:
self._close_and_clear_active_result_set()

Expand Down Expand Up @@ -1415,9 +1460,22 @@ def fetchall_arrow(self) -> "pyarrow.Table":
while not self.has_been_closed_server_side and self.has_more_rows:
self._fill_results_buffer()
partial_results = self.results.remaining_rows()
results = pyarrow.concat_tables([results, partial_results])
if isinstance(results, ColumnTable) and isinstance(
partial_results, ColumnTable
):
results = self.merge_columnar(results, partial_results)
else:
results = pyarrow.concat_tables([results, partial_results])
self._next_row_index += partial_results.num_rows

# If PyArrow is installed and we have a ColumnTable result, convert it to PyArrow Table
# Valid only for metadata commands result set
if isinstance(results, ColumnTable) and pyarrow:
data = {
name: col
for name, col in zip(results.column_names, results.column_table)
}
return pyarrow.Table.from_pydict(data)
return results

def fetchall_columnar(self):
Expand Down
41 changes: 41 additions & 0 deletions src/databricks/sql/telemetry/DriverConnectionParameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.HostDetails import HostDetails
from databricks.sql.telemetry.enums.AuthMech import AuthMech
from databricks.sql.telemetry.enums.AuthFlow import AuthFlow
from databricks.sql.telemetry.enums.DatabricksClientType import DatabricksClientType


@dataclass
class DriverConnectionParameters:
    """Connection-level parameters recorded for telemetry.

    Part of a TelemetryEvent; ``host_details`` nests a :class:`HostDetails`
    record and is flattened by ``asdict`` during serialization.
    """

    http_path: str
    driver_mode: DatabricksClientType
    host_details: HostDetails
    auth_mech: AuthMech
    auth_flow: AuthFlow
    auth_scope: str
    discovery_url: str
    allowed_volume_ingestion_paths: str
    azure_tenant_id: str
    socket_timeout: int

    def to_json(self) -> str:
        """Serialize this record (including nested dataclasses) to a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)


# Part of TelemetryEvent
# DriverConnectionParameters connectionParams = new DriverConnectionParameters(
# httpPath = " /sql/1.0/endpoints/1234567890abcdef",
# driverMode = "THRIFT",
# hostDetails = new HostDetails(
# hostUrl = "https://my-workspace.cloud.databricks.com",
# port = 443
# ),
# authMech = "OAUTH",
# authFlow = "AZURE_MANAGED_IDENTITIES",
# authScope = "sql",
# discoveryUrl = "https://example-url",
# allowedVolumeIngestionPaths = "[]",
# azureTenantId = "1234567890abcdef",
# socketTimeout = 10000
# )
23 changes: 23 additions & 0 deletions src/databricks/sql/telemetry/DriverErrorInfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import json
from dataclasses import dataclass, asdict


@dataclass
class DriverErrorInfo:
    """Error name plus formatted stack trace, attached to telemetry error logs."""

    error_name: str
    stack_trace: str

    def to_json(self) -> str:
        """Serialize this record to a JSON string."""
        payload = {
            "error_name": self.error_name,
            "stack_trace": self.stack_trace,
        }
        return json.dumps(payload)


# Required for ErrorLogs
# DriverErrorInfo errorInfo = new DriverErrorInfo(
# errorName="CONNECTION_ERROR",
# stackTrace="Connection failure while using the Databricks SQL Python connector. Failed to connect to server: https://my-workspace.cloud.databricks.com\n" +
# "databricks.sql.exc.OperationalError: Connection refused: connect\n" +
# "at databricks.sql.thrift_backend.ThriftBackend.make_request(ThriftBackend.py:329)\n" +
# "at databricks.sql.thrift_backend.ThriftBackend.attempt_request(ThriftBackend.py:366)\n" +
# "at databricks.sql.thrift_backend.ThriftBackend.open_session(ThriftBackend.py:575)\n" +
# "at databricks.sql.client.Connection.__init__(client.py:69)\n" +
# "at databricks.sql.client.connect(connection.py:123)")
37 changes: 37 additions & 0 deletions src/databricks/sql/telemetry/DriverSystemConfiguration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql import __version__


@dataclass
class DriverSystemConfiguration:
    """Host-system and driver runtime details embedded in a TelemetryEvent."""

    driver_version: str
    os_name: str
    os_version: str
    os_arch: str
    runtime_name: str
    runtime_version: str
    runtime_vendor: str
    client_app_name: str
    locale_name: str
    driver_name: str
    char_set_encoding: str

    def to_json(self) -> str:
        """Serialize this record to a JSON string."""
        record = asdict(self)
        return json.dumps(record)


# Part of TelemetryEvent
# DriverSystemConfiguration systemConfig = new DriverSystemConfiguration(
# driver_version = "2.9.3",
# os_name = "Darwin",
# os_version = "24.4.0",
# os_arch = "arm64",
# runtime_name = "CPython",
# runtime_version = "3.13.3",
# runtime_vendor = "cpython",
# client_app_name = "databricks-sql-python",
# locale_name = "en_US",
# driver_name = "databricks-sql-python",
# char_set_encoding = "UTF-8"
# )
21 changes: 21 additions & 0 deletions src/databricks/sql/telemetry/DriverVolumeOperation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.enums.DriverVolumeOperationType import (
DriverVolumeOperationType,
)


@dataclass
class DriverVolumeOperation:
    """A volume operation (type plus target path) recorded in a TelemetryEvent."""

    volume_operation_type: DriverVolumeOperationType
    volume_path: str

    def to_json(self) -> str:
        """Serialize this record to a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)


# Part of TelemetryEvent
# DriverVolumeOperation volumeOperation = new DriverVolumeOperation(
# volumeOperationType = "LIST",
# volumePath = "/path/to/volume"
# )
20 changes: 20 additions & 0 deletions src/databricks/sql/telemetry/FrontendLogContext.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.TelemetryClientContext import TelemetryClientContext


@dataclass
class FrontendLogContext:
    """Client context wrapper used inside a TelemetryFrontendLog record."""

    client_context: TelemetryClientContext

    def to_json(self) -> str:
        """Serialize this record (including the nested context) to a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)


# used in TelemetryFrontendLog
# FrontendLogContext frontendLogContext = new FrontendLogContext(
# clientContext = new TelemetryClientContext(
# timestampMillis = 1716489600000,
# userAgent = "databricks-sql-python-test"
# )
# )
11 changes: 11 additions & 0 deletions src/databricks/sql/telemetry/FrontendLogEntry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.TelemetryEvent import TelemetryEvent


@dataclass
class FrontendLogEntry:
    """Single frontend-log entry wrapping one driver TelemetryEvent."""

    sql_driver_log: TelemetryEvent

    def to_json(self) -> str:
        """Serialize this record (including the nested event) to a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)
18 changes: 18 additions & 0 deletions src/databricks/sql/telemetry/HostDetails.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import json
from dataclasses import dataclass, asdict


@dataclass
class HostDetails:
    """Workspace host URL and port, nested inside DriverConnectionParameters."""

    host_url: str
    port: int

    def to_json(self) -> str:
        """Serialize this record to a JSON string."""
        payload = {"host_url": self.host_url, "port": self.port}
        return json.dumps(payload)


# Part of DriverConnectionParameters
# HostDetails hostDetails = new HostDetails(
# hostUrl = "https://my-workspace.cloud.databricks.com",
# port = 443
# )
24 changes: 24 additions & 0 deletions src/databricks/sql/telemetry/SqlExecutionEvent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import json
from dataclasses import dataclass, asdict
from databricks.sql.telemetry.enums.StatementType import StatementType
from databricks.sql.telemetry.enums.ExecutionResultFormat import ExecutionResultFormat


@dataclass
class SqlExecutionEvent:
    """Details of one statement execution, embedded in a TelemetryEvent."""

    statement_type: StatementType
    is_compressed: bool
    execution_result: ExecutionResultFormat
    retry_count: int

    def to_json(self) -> str:
        """Serialize this record to a JSON string."""
        payload = asdict(self)
        return json.dumps(payload)


# Part of TelemetryEvent
# SqlExecutionEvent sqlExecutionEvent = new SqlExecutionEvent(
# statementType = "QUERY",
# isCompressed = true,
# executionResult = "INLINE_ARROW",
# retryCount = 0
# )
Loading