diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py
index aa9c7ce89..8f004086e 100644
--- a/langfuse/_client/resource_manager.py
+++ b/langfuse/_client/resource_manager.py
@@ -17,6 +17,7 @@
 import atexit
 import os
 import threading
+import time
 from queue import Full, Queue
 from typing import Any, Dict, List, Optional, cast
 
@@ -408,7 +409,65 @@ def flush(self) -> None:
         self._score_ingestion_queue.join()
         langfuse_logger.debug("Successfully flushed score ingestion queue")
 
-        self._media_upload_queue.join()
+        # A consumer counts as healthy only if its loop iterated within this many seconds
+        MEDIA_CONSUMER_HEALTH_TIMEOUT_SECONDS = 5.0
+        # Upper bound on how long flush() waits for the consumers to drain the queue (seconds)
+        MEDIA_FLUSH_DRAIN_TIMEOUT_SECONDS = 30.0
+
+        # Only rely on consumers that are alive AND recently active
+        healthy_threads = [
+            c for c in self._media_upload_consumers
+            if c.is_alive() and c.is_healthy(timeout_seconds=MEDIA_CONSUMER_HEALTH_TIMEOUT_SECONDS)
+        ]
+
+        if healthy_threads:
+            # Bounded stand-in for Queue.join(): wait on unfinished_tasks so in-flight items count too
+            langfuse_logger.debug(
+                f"{len(healthy_threads)} healthy consumer threads active, waiting for queue to drain"
+            )
+            start_time = time.monotonic()
+            timeout = MEDIA_FLUSH_DRAIN_TIMEOUT_SECONDS
+
+            while self._media_upload_queue.unfinished_tasks and (time.monotonic() - start_time) < timeout:
+                time.sleep(0.1)
+
+            if not self._media_upload_queue.unfinished_tasks:
+                langfuse_logger.debug("Successfully flushed media upload queue via consumer threads")
+                return
+            else:
+                langfuse_logger.warning(
+                    f"Media upload queue not drained after {timeout}s, "
+                    f"{self._media_upload_queue.qsize()} items still queued. "
+                    f"Processing synchronously."
+                )
+        else:
+            alive_count = sum(1 for c in self._media_upload_consumers if c.is_alive())
+            total_count = len(self._media_upload_consumers)
+
+            if alive_count == 0:
+                langfuse_logger.warning(
+                    f"All {total_count} queue consumer threads are dead. "
+                    f"Processing {self._media_upload_queue.qsize()} queued items synchronously."
+                )
+            else:
+                langfuse_logger.warning(
+                    f"Queue consumer threads are alive but unhealthy ({alive_count}/{total_count} alive but stalled). "
+                    f"Processing {self._media_upload_queue.qsize()} queued items synchronously."
+                )
+        # Synchronous fallback: drain whatever is still queued on the calling thread
+        items_processed = 0
+        while not self._media_upload_queue.empty():
+            try:
+                self._media_manager.process_next_media_upload()
+                items_processed += 1
+            except Exception as e:
+                langfuse_logger.error(f"Error processing media upload synchronously: {e}")
+
+        if items_processed > 0:
+            langfuse_logger.info(
+                f"Processed {items_processed} media uploads synchronously in flush()"
+            )
+
         langfuse_logger.debug("Successfully flushed media upload queue")
 
     def shutdown(self) -> None:
diff --git a/langfuse/_task_manager/media_upload_consumer.py b/langfuse/_task_manager/media_upload_consumer.py
index 182170864..d0f211906 100644
--- a/langfuse/_task_manager/media_upload_consumer.py
+++ b/langfuse/_task_manager/media_upload_consumer.py
@@ -1,5 +1,6 @@
 import logging
 import threading
+import time
 
 from .media_manager import MediaManager
 
@@ -9,6 +10,8 @@ class MediaUploadConsumer(threading.Thread):
     _identifier: int
     _max_retries: int
     _media_manager: MediaManager
+    # Default health check threshold in seconds
+    DEFAULT_HEALTH_TIMEOUT_SECONDS: float = 5.0
 
     def __init__(
         self,
@@ -25,6 +28,8 @@ def __init__(
         # run() *after* we set it to False in pause... and keep running
         # forever.
         self.running = True
+        # time.monotonic() reading of the last loop iteration; compared in is_healthy()
+        self.last_activity = time.monotonic()
         self._identifier = identifier
         self._media_manager = media_manager
 
@@ -34,7 +39,18 @@ def run(self) -> None:
             f"Thread: Media upload consumer thread #{self._identifier} started and actively processing queue items"
         )
         while self.running:
-            self._media_manager.process_next_media_upload()
+            try:
+                # Stamp activity before processing so a slow item is measured from its start
+                self.last_activity = time.monotonic()
+                self._media_manager.process_next_media_upload()
+                # Stamp again once the call returned without raising
+                self.last_activity = time.monotonic()
+            except Exception as e:
+                self._log.exception(
+                    f"Thread #{self._identifier}: Unexpected error in consumer loop: {e}"
+                )
+                # Keep the thread alive; brief pause avoids a tight loop on repeated errors
+                time.sleep(0.1)
 
     def pause(self) -> None:
         """Pause the media upload consumer."""
@@ -42,3 +58,14 @@ def pause(self) -> None:
             f"Thread: Pausing media upload consumer thread #{self._identifier}"
         )
         self.running = False
+
+    def is_healthy(self, timeout_seconds: float = DEFAULT_HEALTH_TIMEOUT_SECONDS) -> bool:
+        """
+        Return True if the thread is alive and its loop ran within timeout_seconds.
+        last_activity is a time.monotonic() reading, so clock changes cannot skew it.
+        """
+        if not self.is_alive():
+            return False
+
+        time_since_activity = time.monotonic() - self.last_activity
+        return time_since_activity < timeout_seconds