From 7893a44db041137019ff80c6b86061ad2303ebca Mon Sep 17 00:00:00 2001 From: Carlos Gonzalez Date: Fri, 12 Dec 2025 11:13:41 -0500 Subject: [PATCH 1/3] Add health checks and fallback logic for media upload threads - Implement health checks for media upload consumer threads to determine activity status. - Add fallback logic to process media uploads synchronously if threads are unhealthy or queue is not drained within the timeout. - Update media upload consumer to track the last activity timestamp and add error handling in the processing loop. --- langfuse/_client/resource_manager.py | 48 ++++++++++++++++++- .../_task_manager/media_upload_consumer.py | 27 ++++++++++- 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index aa9c7ce89..c0a5c85b0 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -408,7 +408,53 @@ def flush(self) -> None: self._score_ingestion_queue.join() langfuse_logger.debug("Successfully flushed score ingestion queue") - self._media_upload_queue.join() + # Check if threads are alive AND healthy (recently active) + healthy_threads = [ + c for c in self._media_upload_consumers + if c.is_alive() and c.is_healthy(timeout_seconds=5.0) + ] + + if healthy_threads: + # Wait for queue to be processed, but with a timeout + langfuse_logger.debug( + f"{len(healthy_threads)} healthy consumer threads active, waiting for queue to drain" + ) + start_time = time.time() + timeout = 30 + + while not self._media_upload_queue.empty() and (time.time() - start_time) < timeout: + time.sleep(0.1) + + if self._media_upload_queue.empty(): + langfuse_logger.debug("Successfully flushed media upload queue via consumer threads") + return + else: + langfuse_logger.warning( + f"Media upload queue not empty after {timeout}s, " + f"{self._media_upload_queue.qsize()} items remaining. " + f"Processing synchronously." + ) + else: + alive_count = len([c for c in self._media_upload_consumers if c.is_alive()]) + langfuse_logger.warning( + f"Consumer threads unhealthy or dead ({alive_count} alive but unhealthy). " + f"Processing {self._media_upload_queue.qsize()} queued items synchronously." + ) + + # Synchronous fallback processing + items_processed = 0 + while not self._media_upload_queue.empty(): + try: + self._media_manager.process_next_media_upload() + items_processed += 1 + except Exception as e: + langfuse_logger.error(f"Error processing media upload synchronously: {e}") + + if items_processed > 0: + langfuse_logger.info( + f"Processed {items_processed} media uploads synchronously in flush()" + ) + langfuse_logger.debug("Successfully flushed media upload queue") def shutdown(self) -> None: diff --git a/langfuse/_task_manager/media_upload_consumer.py b/langfuse/_task_manager/media_upload_consumer.py index 182170864..d583b9640 100644 --- a/langfuse/_task_manager/media_upload_consumer.py +++ b/langfuse/_task_manager/media_upload_consumer.py @@ -1,5 +1,6 @@ import logging import threading +import time from .media_manager import MediaManager @@ -25,6 +26,8 @@ def __init__( # run() *after* we set it to False in pause... and keep running # forever. self.running = True + # Track when thread last processed something + self.last_activity = time.time() self._identifier = identifier self._media_manager = media_manager @@ -34,7 +37,18 @@ def run(self) -> None: f"Thread: Media upload consumer thread #{self._identifier} started and actively processing queue items" ) while self.running: - self._media_manager.process_next_media_upload() + try: + # Update activity timestamp before processing + self.last_activity = time.time() + self._media_manager.process_next_media_upload() + # Update after successful processing + self.last_activity = time.time() + except Exception as e: + self._log.error( + f"Thread #{self._identifier}: Unexpected error in consumer loop: {e}" + ) + # Continue running despite errors + time.sleep(0.1) def pause(self) -> None: """Pause the media upload consumer.""" @@ -42,3 +56,14 @@ def pause(self) -> None: f"Thread: Pausing media upload consumer thread #{self._identifier}" ) self.running = False + + def is_healthy(self, timeout_seconds: float = 5.0) -> bool: + """ + Check if thread is healthy and recently active. + Returns False if thread hasn't processed anything in timeout_seconds. + """ + if not self.is_alive(): + return False + + time_since_activity = time.time() - self.last_activity + return time_since_activity < timeout_seconds From 24638f7da20098b71ba2dbdef372f51dafdf0022 Mon Sep 17 00:00:00 2001 From: Carlos Gonzalez Date: Fri, 12 Dec 2025 11:53:10 -0500 Subject: [PATCH 2/3] Refine logging for consumer thread health status during fallback processing --- langfuse/_client/resource_manager.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index c0a5c85b0..661d50215 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -436,11 +436,18 @@ def flush(self) -> None: ) else: alive_count = len([c for c in self._media_upload_consumers if c.is_alive()]) - langfuse_logger.warning( - f"Consumer threads unhealthy or dead ({alive_count} alive but unhealthy). " - f"Processing {self._media_upload_queue.qsize()} queued items synchronously." - ) + total_count = len(self._media_upload_consumers) + if alive_count == 0: + langfuse_logger.warning( + f"All {total_count} queue consumer threads are dead." + f"Processing {self._media_upload_queue.qsize()} queued items synchronously." + ) + else: + langfuse_logger.warning( + f"Queue consumer threads are alive but unhealthy ({alive_count}/{total_count} alive but stalled). " + f"Processing {self._media_upload_queue.qsize()} queued items synchronously." + ) # Synchronous fallback processing items_processed = 0 while not self._media_upload_queue.empty(): From 9a3506c519a98b062910c0548e2e70b9203b5a5e Mon Sep 17 00:00:00 2001 From: Carlos Gonzalez Date: Fri, 12 Dec 2025 15:59:09 -0500 Subject: [PATCH 3/3] Add missing import of time module and make health check timeouts configurable for media consumer threads --- langfuse/_client/resource_manager.py | 10 ++++++++-- langfuse/_task_manager/media_upload_consumer.py | 6 ++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 661d50215..8f004086e 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -17,6 +17,7 @@ import atexit import os import threading +import time from queue import Full, Queue from typing import Any, Dict, List, Optional, cast @@ -408,10 +409,15 @@ def flush(self) -> None: self._score_ingestion_queue.join() langfuse_logger.debug("Successfully flushed score ingestion queue") + # Configurable health check timeout for media consumer threads (seconds) + MEDIA_CONSUMER_HEALTH_TIMEOUT_SECONDS = 5.0 + # Configurable maximum time to wait for the queue to drain (seconds) + MEDIA_FLUSH_DRAIN_TIMEOUT_SECONDS = 30.0 + # Check if threads are alive AND healthy (recently active) healthy_threads = [ c for c in self._media_upload_consumers - if c.is_alive() and c.is_healthy(timeout_seconds=5.0) + if c.is_alive() and c.is_healthy(timeout_seconds=MEDIA_CONSUMER_HEALTH_TIMEOUT_SECONDS) ] if healthy_threads: @@ -420,7 +426,7 @@ def flush(self) -> None: f"{len(healthy_threads)} healthy consumer threads active, waiting for queue to drain" ) start_time = time.time() - timeout = 30 + timeout = MEDIA_FLUSH_DRAIN_TIMEOUT_SECONDS while not self._media_upload_queue.empty() and (time.time() - start_time) < timeout: time.sleep(0.1) diff --git a/langfuse/_task_manager/media_upload_consumer.py b/langfuse/_task_manager/media_upload_consumer.py index d583b9640..d0f211906 100644 --- a/langfuse/_task_manager/media_upload_consumer.py +++ b/langfuse/_task_manager/media_upload_consumer.py @@ -10,6 +10,8 @@ class MediaUploadConsumer(threading.Thread): _identifier: int _max_retries: int _media_manager: MediaManager + # Default health check threshold in seconds + DEFAULT_HEALTH_TIMEOUT_SECONDS: float = 5.0 def __init__( self, @@ -44,7 +46,7 @@ def run(self) -> None: # Update after successful processing self.last_activity = time.time() except Exception as e: - self._log.error( + self._log.exception( f"Thread #{self._identifier}: Unexpected error in consumer loop: {e}" ) # Continue running despite errors @@ -57,7 +59,7 @@ def pause(self) -> None: ) self.running = False - def is_healthy(self, timeout_seconds: float = 5.0) -> bool: + def is_healthy(self, timeout_seconds: float = DEFAULT_HEALTH_TIMEOUT_SECONDS) -> bool: """ Check if thread is healthy and recently active. Returns False if thread hasn't processed anything in timeout_seconds.