11import time
22import functools
3- from typing import Optional
3+ from typing import Optional , Dict , Any
44import logging
55from databricks .sql .telemetry .telemetry_client import TelemetryClientFactory
66from databricks .sql .telemetry .models .event import (
1111logger = logging .getLogger (__name__ )
1212
1313
14- class TelemetryExtractor :
14+ def _extract_cursor_data ( cursor ) -> Dict [ str , Any ] :
1515 """
16- Base class for extracting telemetry information from various object types .
16+ Extract telemetry data directly from a Cursor object .
1717
18- This class serves as a proxy that delegates attribute access to the wrapped object
19- while providing a common interface for extracting telemetry-related data.
20- """
21-
22- def __init__ (self , obj ):
23- self ._obj = obj
24-
25- def __getattr__ (self , name ):
26- return getattr (self ._obj , name )
27-
28- def get_session_id_hex (self ):
29- pass
30-
31- def get_statement_id (self ):
32- pass
33-
34- def get_is_compressed (self ):
35- pass
36-
37- def get_execution_result_format (self ):
38- pass
39-
40- def get_retry_count (self ):
41- pass
42-
43- def get_chunk_id (self ):
44- pass
18+ OPTIMIZATION: Uses direct attribute access instead of wrapper objects.
19+ This eliminates object creation overhead and method call indirection.
4520
21+ Args:
22+ cursor: The Cursor object to extract data from
4623
47- class CursorExtractor (TelemetryExtractor ):
24+ Returns:
25+ Dict with telemetry data (values may be None if extraction fails)
4826 """
49- Telemetry extractor specialized for Cursor objects.
50-
51- Extracts telemetry information from database cursor objects, including
52- statement IDs, session information, compression settings, and result formats.
27+ data = {}
28+
29+ # Extract statement_id (query_id) - direct attribute access
30+ try :
31+ data ['statement_id' ] = cursor .query_id
32+ except (AttributeError , Exception ):
33+ data ['statement_id' ] = None
34+
35+ # Extract session_id_hex - direct method call
36+ try :
37+ data ['session_id_hex' ] = cursor .connection .get_session_id_hex ()
38+ except (AttributeError , Exception ):
39+ data ['session_id_hex' ] = None
40+
41+ # Extract is_compressed - direct attribute access
42+ try :
43+ data ['is_compressed' ] = cursor .connection .lz4_compression
44+ except (AttributeError , Exception ):
45+ data ['is_compressed' ] = False
46+
47+ # Extract execution_result_format - inline logic
48+ try :
49+ if cursor .active_result_set is None :
50+ data ['execution_result' ] = ExecutionResultFormat .FORMAT_UNSPECIFIED
51+ else :
52+ from databricks .sql .utils import ColumnQueue , CloudFetchQueue , ArrowQueue
53+
54+ results = cursor .active_result_set .results
55+ if isinstance (results , ColumnQueue ):
56+ data ['execution_result' ] = ExecutionResultFormat .COLUMNAR_INLINE
57+ elif isinstance (results , CloudFetchQueue ):
58+ data ['execution_result' ] = ExecutionResultFormat .EXTERNAL_LINKS
59+ elif isinstance (results , ArrowQueue ):
60+ data ['execution_result' ] = ExecutionResultFormat .INLINE_ARROW
61+ else :
62+ data ['execution_result' ] = ExecutionResultFormat .FORMAT_UNSPECIFIED
63+ except (AttributeError , Exception ):
64+ data ['execution_result' ] = ExecutionResultFormat .FORMAT_UNSPECIFIED
65+
66+ # Extract retry_count - direct attribute access
67+ try :
68+ if hasattr (cursor .backend , "retry_policy" ) and cursor .backend .retry_policy :
69+ data ['retry_count' ] = len (cursor .backend .retry_policy .history )
70+ else :
71+ data ['retry_count' ] = 0
72+ except (AttributeError , Exception ):
73+ data ['retry_count' ] = 0
74+
75+ # chunk_id is always None for Cursor
76+ data ['chunk_id' ] = None
77+
78+ return data
79+
80+
81+ def _extract_result_set_handler_data (handler ) -> Dict [str , Any ]:
5382 """
83+ Extract telemetry data directly from a ResultSetDownloadHandler object.
5484
55- def get_statement_id (self ) -> Optional [str ]:
56- return self .query_id
57-
58- def get_session_id_hex (self ) -> Optional [str ]:
59- return self .connection .get_session_id_hex ()
60-
61- def get_is_compressed (self ) -> bool :
62- return self .connection .lz4_compression
63-
64- def get_execution_result_format (self ) -> ExecutionResultFormat :
65- if self .active_result_set is None :
66- return ExecutionResultFormat .FORMAT_UNSPECIFIED
67-
68- from databricks .sql .utils import ColumnQueue , CloudFetchQueue , ArrowQueue
69-
70- if isinstance (self .active_result_set .results , ColumnQueue ):
71- return ExecutionResultFormat .COLUMNAR_INLINE
72- elif isinstance (self .active_result_set .results , CloudFetchQueue ):
73- return ExecutionResultFormat .EXTERNAL_LINKS
74- elif isinstance (self .active_result_set .results , ArrowQueue ):
75- return ExecutionResultFormat .INLINE_ARROW
76- return ExecutionResultFormat .FORMAT_UNSPECIFIED
77-
78- def get_retry_count (self ) -> int :
79- if hasattr (self .backend , "retry_policy" ) and self .backend .retry_policy :
80- return len (self .backend .retry_policy .history )
81- return 0
82-
83- def get_chunk_id (self ):
84- return None
85+ OPTIMIZATION: Uses direct attribute access instead of wrapper objects.
8586
87+ Args:
88+ handler: The ResultSetDownloadHandler object to extract data from
8689
87- class ResultSetDownloadHandlerExtractor (TelemetryExtractor ):
88- """
89- Telemetry extractor specialized for ResultSetDownloadHandler objects.
90+ Returns:
91+ Dict with telemetry data (values may be None if extraction fails)
9092 """
93+ data = {}
9194
92- def get_session_id_hex (self ) -> Optional [str ]:
93- return self ._obj .session_id_hex
95+ # Extract session_id_hex - direct attribute access
96+ try :
97+ data ['session_id_hex' ] = handler .session_id_hex
98+ except (AttributeError , Exception ):
99+ data ['session_id_hex' ] = None
94100
95- def get_statement_id (self ) -> Optional [str ]:
96- return self ._obj .statement_id
101+ # Extract statement_id - direct attribute access
102+ try :
103+ data ['statement_id' ] = handler .statement_id
104+ except (AttributeError , Exception ):
105+ data ['statement_id' ] = None
97106
98- def get_is_compressed (self ) -> bool :
99- return self ._obj .settings .is_lz4_compressed
107+ # Extract is_compressed - direct attribute access
108+ try :
109+ data ['is_compressed' ] = handler .settings .is_lz4_compressed
110+ except (AttributeError , Exception ):
111+ data ['is_compressed' ] = False
100112
101- def get_execution_result_format ( self ) -> ExecutionResultFormat :
102- return ExecutionResultFormat .EXTERNAL_LINKS
113+ # execution_result is always EXTERNAL_LINKS for result set handlers
114+ data [ 'execution_result' ] = ExecutionResultFormat .EXTERNAL_LINKS
103115
104- def get_retry_count (self ) -> Optional [int ]:
105- # standard requests and urllib3 libraries don't expose retry count
106- return None
116+ # retry_count is not available for result set handlers
117+ data ['retry_count' ] = None
118+
119+ # Extract chunk_id - direct attribute access
120+ try :
121+ data ['chunk_id' ] = handler .chunk_id
122+ except (AttributeError , Exception ):
123+ data ['chunk_id' ] = None
107124
108- def get_chunk_id (self ) -> Optional [int ]:
109- return self ._obj .chunk_id
125+ return data
110126
111127
112- def get_extractor (obj ):
128+ def _extract_telemetry_data (obj ) -> Optional [ Dict [ str , Any ]] :
113129 """
114- Factory function to create the appropriate telemetry extractor for an object .
130+ Extract telemetry data from an object based on its type .
115131
116- Determines the object type and returns the corresponding specialized extractor
117- that can extract telemetry information from that object type .
132+ OPTIMIZATION: Returns a simple dict instead of creating wrapper objects.
133+ This dict will be used to create the SqlExecutionEvent in the background thread .
118134
119135 Args:
120- obj: The object to create an extractor for. Can be a Cursor,
121- ResultSetDownloadHandler, or any other object.
136+ obj: The object to extract data from (Cursor, ResultSetDownloadHandler, etc.)
122137
123138 Returns:
124- TelemetryExtractor: A specialized extractor instance:
125- - CursorExtractor for Cursor objects
126- - ResultSetDownloadHandlerExtractor for ResultSetDownloadHandler objects
127- - None for all other objects
139+ Dict with telemetry data, or None if object type is not supported
128140 """
129- if obj .__class__ .__name__ == "Cursor" :
130- return CursorExtractor (obj )
131- elif obj .__class__ .__name__ == "ResultSetDownloadHandler" :
132- return ResultSetDownloadHandlerExtractor (obj )
141+ obj_type = obj .__class__ .__name__
142+
143+ if obj_type == "Cursor" :
144+ return _extract_cursor_data (obj )
145+ elif obj_type == "ResultSetDownloadHandler" :
146+ return _extract_result_set_handler_data (obj )
133147 else :
134- logger .debug ("No extractor found for %s" , obj . __class__ . __name__ )
148+ logger .debug ("No telemetry extraction available for %s" , obj_type )
135149 return None
136150
137151
@@ -143,11 +157,10 @@ def log_latency(statement_type: StatementType = StatementType.NONE):
143157 data about the operation, including latency, statement information, and
144158 execution context.
145159
146- The decorator automatically:
147- - Measures execution time using high-precision performance counters
148- - Extracts telemetry information from the method's object (self)
149- - Creates a SqlExecutionEvent with execution details
150- - Sends the telemetry data asynchronously via TelemetryClient
160+ OPTIMIZATIONS APPLIED:
161+ - Uses time.monotonic() instead of time.perf_counter() for faster timing
162+ - Direct attribute access instead of wrapper extractor objects
163+ - Dict-based data collection to minimize object creation overhead
151164
152165 Args:
153166 statement_type (StatementType): The type of SQL statement being executed.
@@ -162,46 +175,41 @@ def execute(self, query):
162175 function: A decorator that wraps methods to add latency logging.
163176
164177 Note:
165- The wrapped method's object (self) must be compatible with the
166- telemetry extractor system (e.g., Cursor or ResultSet objects) .
178+ The wrapped method's object (self) must be a Cursor or
179+ ResultSetDownloadHandler for telemetry data extraction .
167180 """
168181
169182 def decorator (func ):
170183 @functools .wraps (func )
171184 def wrapper (self , * args , ** kwargs ):
172- start_time = time .perf_counter ()
185+ # Use monotonic clock for faster timing, sufficient for telemetry
186+ start_time = time .monotonic ()
173187 result = None
174188 try :
175189 result = func (self , * args , ** kwargs )
176190 return result
177191 finally :
178-
179- def _safe_call (func_to_call ):
180- """Calls a function and returns a default value on any exception."""
181- try :
182- return func_to_call ()
183- except Exception :
184- return None
185-
186- end_time = time .perf_counter ()
192+ # Calculate duration once
193+ end_time = time .monotonic ()
187194 duration_ms = int ((end_time - start_time ) * 1000 )
188195
189- extractor = get_extractor (self )
196+ # Extract telemetry data directly without creating extractor objects
197+ telemetry_data = _extract_telemetry_data (self )
190198
191- if extractor is not None :
192- session_id_hex = _safe_call ( extractor . get_session_id_hex )
193- statement_id = _safe_call ( extractor . get_statement_id )
199+ if telemetry_data is not None :
200+ session_id_hex = telemetry_data . get ( 'session_id_hex' )
201+ statement_id = telemetry_data . get ( 'statement_id' )
194202
203+ # Create event from extracted data
195204 sql_exec_event = SqlExecutionEvent (
196205 statement_type = statement_type ,
197- is_compressed = _safe_call (extractor .get_is_compressed ),
198- execution_result = _safe_call (
199- extractor .get_execution_result_format
200- ),
201- retry_count = _safe_call (extractor .get_retry_count ),
202- chunk_id = _safe_call (extractor .get_chunk_id ),
206+ is_compressed = telemetry_data .get ('is_compressed' ),
207+ execution_result = telemetry_data .get ('execution_result' ),
208+ retry_count = telemetry_data .get ('retry_count' ),
209+ chunk_id = telemetry_data .get ('chunk_id' ),
203210 )
204211
212+ # Send telemetry asynchronously
205213 telemetry_client = TelemetryClientFactory .get_telemetry_client (
206214 session_id_hex
207215 )
0 commit comments