Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import atexit
import os
import threading
import time
from queue import Full, Queue
from typing import Any, Dict, List, Optional, cast

Expand Down Expand Up @@ -408,7 +409,65 @@ def flush(self) -> None:
self._score_ingestion_queue.join()
langfuse_logger.debug("Successfully flushed score ingestion queue")

self._media_upload_queue.join()
# Configurable health check timeout for media consumer threads (seconds)
MEDIA_CONSUMER_HEALTH_TIMEOUT_SECONDS = 5.0
# Configurable maximum time to wait for the queue to drain (seconds)
MEDIA_FLUSH_DRAIN_TIMEOUT_SECONDS = 30.0

# Check if threads are alive AND healthy (recently active)
healthy_threads = [
c for c in self._media_upload_consumers
if c.is_alive() and c.is_healthy(timeout_seconds=MEDIA_CONSUMER_HEALTH_TIMEOUT_SECONDS)
]

if healthy_threads:
# Wait for queue to be processed, but with a timeout
langfuse_logger.debug(
f"{len(healthy_threads)} healthy consumer threads active, waiting for queue to drain"
)
start_time = time.time()
timeout = MEDIA_FLUSH_DRAIN_TIMEOUT_SECONDS

while not self._media_upload_queue.empty() and (time.time() - start_time) < timeout:
time.sleep(0.1)

if self._media_upload_queue.empty():
langfuse_logger.debug("Successfully flushed media upload queue via consumer threads")
return
else:
langfuse_logger.warning(
f"Media upload queue not empty after {timeout}s, "
f"{self._media_upload_queue.qsize()} items remaining. "
f"Processing synchronously."
)
else:
alive_count = len([c for c in self._media_upload_consumers if c.is_alive()])
total_count = len(self._media_upload_consumers)

if alive_count == 0:
langfuse_logger.warning(
f"All {total_count} queue consumer threads are dead."
f"Processing {self._media_upload_queue.qsize()} queued items synchronously."
)
else:
langfuse_logger.warning(
f"Queue consumer threads are alive but unhealthy ({alive_count}/{total_count} alive but stalled). "
f"Processing {self._media_upload_queue.qsize()} queued items synchronously."
)
# Synchronous fallback processing
items_processed = 0
while not self._media_upload_queue.empty():
try:
self._media_manager.process_next_media_upload()
items_processed += 1
except Exception as e:
langfuse_logger.error(f"Error processing media upload synchronously: {e}")

if items_processed > 0:
langfuse_logger.info(
f"Processed {items_processed} media uploads synchronously in flush()"
)

langfuse_logger.debug("Successfully flushed media upload queue")

def shutdown(self) -> None:
Expand Down
29 changes: 28 additions & 1 deletion langfuse/_task_manager/media_upload_consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import threading
import time

from .media_manager import MediaManager

Expand All @@ -9,6 +10,8 @@ class MediaUploadConsumer(threading.Thread):
_identifier: int
_max_retries: int
_media_manager: MediaManager
# Default health check threshold in seconds
DEFAULT_HEALTH_TIMEOUT_SECONDS: float = 5.0

def __init__(
self,
Expand All @@ -25,6 +28,8 @@ def __init__(
# run() *after* we set it to False in pause... and keep running
# forever.
self.running = True
# Track when thread last processed something
self.last_activity = time.time()
self._identifier = identifier
self._media_manager = media_manager

Expand All @@ -34,11 +39,33 @@ def run(self) -> None:
f"Thread: Media upload consumer thread #{self._identifier} started and actively processing queue items"
)
while self.running:
self._media_manager.process_next_media_upload()
try:
# Update activity timestamp before processing
self.last_activity = time.time()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updating last_activity both before and after processing may misrepresent actual work; consider updating only after successful processing.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an upload takes long, I think we'd rather have a signature of when the task starts so that it serves the purpose of knowing the thread is_healthy

self._media_manager.process_next_media_upload()
# Update after successful processing
self.last_activity = time.time()
except Exception as e:
self._log.exception(
f"Thread #{self._identifier}: Unexpected error in consumer loop: {e}"
)
# Continue running despite errors
time.sleep(0.1)

def pause(self) -> None:
"""Pause the media upload consumer."""
self._log.debug(
f"Thread: Pausing media upload consumer thread #{self._identifier}"
)
self.running = False

def is_healthy(self, timeout_seconds: float = DEFAULT_HEALTH_TIMEOUT_SECONDS) -> bool:
"""
Check if thread is healthy and recently active.
Returns False if thread hasn't processed anything in timeout_seconds.
"""
if not self.is_alive():
return False

time_since_activity = time.time() - self.last_activity
return time_since_activity < timeout_seconds