diff --git a/langfuse/_client/environment_variables.py b/langfuse/_client/environment_variables.py index e78604f9b..d9df459a0 100644 --- a/langfuse/_client/environment_variables.py +++ b/langfuse/_client/environment_variables.py @@ -110,3 +110,12 @@ **Default value**: ``True`` """ + +LANGFUSE_MEDIA_UPLOAD_ENABLED = "LANGFUSE_MEDIA_UPLOAD_ENABLED" +""" +.. envvar: LANGFUSE_MEDIA_UPLOAD_ENABLED + +Controls whether media detection and upload is attempted by the SDK. + +**Default value**: ``True`` +""" diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index bb28bdf00..1c07de57c 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -29,6 +29,7 @@ from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.constants import LANGFUSE_TRACER_NAME from langfuse._client.environment_variables import ( + LANGFUSE_MEDIA_UPLOAD_ENABLED, LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, LANGFUSE_RELEASE, LANGFUSE_TRACING_ENVIRONMENT, @@ -190,6 +191,10 @@ def _initialize_instance( ) # Media + self._media_upload_enabled = os.environ.get( + LANGFUSE_MEDIA_UPLOAD_ENABLED, "True" + ).lower() not in ("false", "0") + self._media_upload_queue = Queue(100_000) self._media_manager = MediaManager( api_client=self.api, @@ -202,13 +207,14 @@ def _initialize_instance( int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1 ) - for i in range(media_upload_thread_count): - media_upload_consumer = MediaUploadConsumer( - identifier=i, - media_manager=self._media_manager, - ) - media_upload_consumer.start() - self._media_upload_consumers.append(media_upload_consumer) + if self._media_upload_enabled: + for i in range(media_upload_thread_count): + media_upload_consumer = MediaUploadConsumer( + identifier=i, + media_manager=self._media_manager, + ) + media_upload_consumer.start() + self._media_upload_consumers.append(media_upload_consumer) # Prompt cache self.prompt_cache = PromptCache() diff --git a/langfuse/_task_manager/media_manager.py b/langfuse/_task_manager/media_manager.py index de9406027..43a50f8c6 100644 --- a/langfuse/_task_manager/media_manager.py +++ b/langfuse/_task_manager/media_manager.py @@ -1,4 +1,5 @@ import logging +import os import time from queue import Empty, Full, Queue from typing import Any, Callable, Optional, TypeVar, cast @@ -7,6 +8,7 @@ import requests from typing_extensions import ParamSpec +from langfuse._client.environment_variables import LANGFUSE_MEDIA_UPLOAD_ENABLED from langfuse._utils import _get_timestamp from langfuse.api import GetMediaUploadUrlRequest, PatchMediaBody from langfuse.api.client import FernLangfuse @@ -33,6 +35,9 @@ def __init__( self._api_client = api_client self._queue = media_upload_queue self._max_retries = max_retries + self._enabled = os.environ.get( + LANGFUSE_MEDIA_UPLOAD_ENABLED, "True" + ).lower() not in ("false", "0") def process_next_media_upload(self): try: @@ -60,6 +65,9 @@ def _find_and_process_media( observation_id: Optional[str], field: str, ): + if not self._enabled: + return data + seen = set() max_levels = 10