Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions langfuse/_client/environment_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,12 @@

**Default value**: ``True``
"""

LANGFUSE_MEDIA_UPLOAD_ENABLED = "LANGFUSE_MEDIA_UPLOAD_ENABLED"
"""
.. envvar: LANGFUSE_MEDIA_UPLOAD_ENABLED

Controls whether media detection and upload is attempted by the SDK.

**Default value**: ``True``
"""
20 changes: 13 additions & 7 deletions langfuse/_client/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.constants import LANGFUSE_TRACER_NAME
from langfuse._client.environment_variables import (
LANGFUSE_MEDIA_UPLOAD_ENABLED,
LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT,
LANGFUSE_RELEASE,
LANGFUSE_TRACING_ENVIRONMENT,
Expand Down Expand Up @@ -190,6 +191,10 @@ def _initialize_instance(
)

# Media
self._media_upload_enabled = os.environ.get(
LANGFUSE_MEDIA_UPLOAD_ENABLED, "True"
).lower() not in ("false", "0")

self._media_upload_queue = Queue(100_000)
self._media_manager = MediaManager(
api_client=self.api,
Expand All @@ -202,13 +207,14 @@ def _initialize_instance(
int(os.getenv(LANGFUSE_MEDIA_UPLOAD_THREAD_COUNT, 1)), 1
)

for i in range(media_upload_thread_count):
media_upload_consumer = MediaUploadConsumer(
identifier=i,
media_manager=self._media_manager,
)
media_upload_consumer.start()
self._media_upload_consumers.append(media_upload_consumer)
if self._media_upload_enabled:
for i in range(media_upload_thread_count):
media_upload_consumer = MediaUploadConsumer(
identifier=i,
media_manager=self._media_manager,
)
media_upload_consumer.start()
self._media_upload_consumers.append(media_upload_consumer)

# Prompt cache
self.prompt_cache = PromptCache()
Expand Down
8 changes: 8 additions & 0 deletions langfuse/_task_manager/media_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
import time
from queue import Empty, Full, Queue
from typing import Any, Callable, Optional, TypeVar, cast
Expand All @@ -7,6 +8,7 @@
import requests
from typing_extensions import ParamSpec

from langfuse._client.environment_variables import LANGFUSE_MEDIA_UPLOAD_ENABLED
from langfuse._utils import _get_timestamp
from langfuse.api import GetMediaUploadUrlRequest, PatchMediaBody
from langfuse.api.client import FernLangfuse
Expand All @@ -33,6 +35,9 @@ def __init__(
self._api_client = api_client
self._queue = media_upload_queue
self._max_retries = max_retries
self._enabled = os.environ.get(
LANGFUSE_MEDIA_UPLOAD_ENABLED, "True"
).lower() not in ("false", "0")

def process_next_media_upload(self):
try:
Expand Down Expand Up @@ -60,6 +65,9 @@ def _find_and_process_media(
observation_id: Optional[str],
field: str,
):
if not self._enabled:
return data

seen = set()
max_levels = 10

Expand Down