Skip to content

Commit 9ac4bc7

Browse files
committed
changed flush timer to background thread approach
Signed-off-by: Sai Shree Pradhan <saishree.pradhan@databricks.com>
1 parent ef14caa commit 9ac4bc7

File tree

1 file changed

+27
-27
lines changed

1 file changed

+27
-27
lines changed

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -176,41 +176,41 @@ def __init__(
176176
self._driver_connection_params = None
177177
self._host_url = host_url
178178
self._executor = executor
179-
self._flush_timer = None
180179

181-
# Start the periodic flush timer
182-
self._start_flush_timer()
180+
# Background thread for periodic flushing
181+
self._flush_stop_event = threading.Event()
182+
self._flush_thread = None
183183

184-
def _start_flush_timer(self):
185-
"""Start the periodic flush timer"""
184+
# Start the periodic flush thread
185+
self._start_flush_thread()
186186

187-
self._flush_timer = threading.Timer(
188-
self._flush_interval_seconds, self._periodic_flush
189-
)
190-
self._flush_timer.daemon = True # Don't prevent program exit
191-
self._flush_timer.start()
187+
def _start_flush_thread(self):
188+
"""Start the background thread for periodic flushing"""
189+
self._flush_thread = threading.Thread(target=self._flush_worker, daemon=True)
190+
self._flush_thread.start()
192191
logger.debug(
193-
"Started flush timer for connection %s (interval: %d seconds)",
192+
"Started flush thread for connection %s (interval: %d seconds)",
194193
self._session_id_hex,
195194
self._flush_interval_seconds,
196195
)
197196

198-
def _periodic_flush(self):
199-
"""Periodic flush callback - flushes events and reschedules the timer"""
200-
201-
logger.debug(
202-
"Performing periodic flush for connection %s", self._session_id_hex
203-
)
204-
self._flush()
205-
# Reschedule the next flush
206-
self._start_flush_timer()
197+
def _flush_worker(self):
198+
"""Background worker thread for periodic flushing"""
199+
while not self._flush_stop_event.wait(self._flush_interval_seconds):
200+
logger.debug(
201+
"Performing periodic flush for connection %s", self._session_id_hex
202+
)
203+
self._flush()
207204

208-
def _stop_flush_timer(self):
209-
"""Stop the periodic flush timer"""
210-
if self._flush_timer is not None:
211-
self._flush_timer.cancel()
212-
self._flush_timer = None
213-
logger.debug("Stopped flush timer for connection %s", self._session_id_hex)
205+
def _stop_flush_thread(self):
206+
"""Stop the background flush thread"""
207+
if self._flush_thread is not None:
208+
self._flush_stop_event.set()
209+
self._flush_thread.join(
210+
timeout=1.0
211+
) # Wait up to 1 second for graceful shutdown
212+
self._flush_thread = None
213+
logger.debug("Stopped flush thread for connection %s", self._session_id_hex)
214214

215215
def _export_event(self, event):
216216
"""Add an event to the batch queue and flush if batch is full"""
@@ -346,7 +346,7 @@ def export_failure_log(self, error_name, error_message):
346346
def close(self):
347347
"""Flush remaining events and stop timer before closing"""
348348
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
349-
self._stop_flush_timer()
349+
self._stop_flush_thread()
350350
self._flush()
351351

352352

0 commit comments

Comments
 (0)