From 2479a83a2db599a13934c8e128080e967ff074d5 Mon Sep 17 00:00:00 2001 From: Abhinav Sharma Date: Fri, 10 Oct 2025 01:33:53 +0530 Subject: [PATCH] fix: ensure ingestion and media queues are always flushed --- langfuse/_client/resource_manager.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 70ed5b17c..28c24e919 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -398,11 +398,9 @@ def _stop_and_join_consumer_threads(self) -> None: def flush(self) -> None: tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) - if isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider): - return - - tracer_provider.force_flush() - langfuse_logger.debug("Successfully flushed OTEL tracer provider") + if not isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider): + tracer_provider.force_flush() + langfuse_logger.debug("Successfully flushed OTEL tracer provider") self._score_ingestion_queue.join() langfuse_logger.debug("Successfully flushed score ingestion queue") @@ -415,10 +413,8 @@ def shutdown(self) -> None: atexit.unregister(self.shutdown) tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) - if isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider): - return - - tracer_provider.force_flush() + if not isinstance(tracer_provider, otel_trace_api.ProxyTracerProvider): + tracer_provider.force_flush() self._stop_and_join_consumer_threads()