Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions langfuse/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from langfuse.api.resources.utils.resources.pagination.types.meta_response import (
MetaResponse,
)
from langfuse.api.resources.media import GetMediaResponse
from langfuse.model import (
ChatMessageDict,
ChatPromptClient,
Expand All @@ -74,6 +75,7 @@
from langfuse.environment import get_common_release_envs
from langfuse.logging import clean_logger
from langfuse.model import Dataset, MapValue, Observation, TraceWithFullDetails
from langfuse.media import LangfuseMedia
from langfuse.request import LangfuseClient
from langfuse.types import MaskFunction, ScoreDataType, SpanLevel
from langfuse.utils import _convert_usage_input, _create_prompt_context, _get_timestamp
Expand Down Expand Up @@ -111,6 +113,13 @@ class FetchObservationResponse:
data: Observation


@dataclass
class FetchMediaResponse:
"""Response object for fetch_media method."""

data: GetMediaResponse


@dataclass
class FetchSessionsResponse:
"""Response object for fetch_sessions method."""
Expand Down Expand Up @@ -885,19 +894,78 @@ def fetch_observation(
handle_fern_exception(e)
raise e

def fetch_media(self, id: str):
def fetch_media(self, id: str) -> FetchMediaResponse:
"""Get media content by ID.

Args:
id: The identifier of the media content to fetch.

Returns:
Media object
FetchMediaResponse: The media data of the given id on `data`.

Raises:
Exception: If the media content could not be found or if an error occurred during the request.
Exception: If the media content with the given id could not be found within the authenticated project or if an error occurred during the request.
"""
try:
return FetchMediaResponse(data=self.client.media.get(id))
except Exception as e:
handle_fern_exception(e)
raise e

def resolve_media_references(
self,
*,
obj: Any,
resolve_with: Literal["base64_data_uri"],
max_depth: int = 10,
content_fetch_timeout_seconds: int = 10,
):
"""Replace media reference strings in an object with base64 data URIs.

This method recursively traverses an object (up to max_depth) looking for media reference strings
in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using
the provided Langfuse client and replaces the reference string with a base64 data URI.

If fetching media content fails for a reference string, a warning is logged and the reference
string is left unchanged.

Args:
obj: The object to process. Can be a primitive value, array, or nested object.
If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
resolve_with: The representation of the media content to replace the media reference string with.
Currently only "base64_data_uri" is supported.
max_depth: int: The maximum depth to traverse the object. Default is 10.
content_fetch_timeout_seconds: int: The timeout in seconds for fetching media content. Default is 10.

Returns:
A deep copy of the input object with all media references replaced with base64 data URIs where possible.
If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.

Example:
obj = {
"image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
"nested": {
"pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
}
}

result = await LangfuseMedia.resolve_media_references(obj, langfuse_client)

# Result:
# {
# "image": "data:image/jpeg;base64,/9j/4AAQSkZJRg...",
# "nested": {
# "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
# }
# }
"""
return self.client.media.get(id)
return LangfuseMedia.resolve_media_references(
langfuse_client=self,
obj=obj,
resolve_with=resolve_with,
max_depth=max_depth,
content_fetch_timeout_seconds=content_fetch_timeout_seconds,
)

def get_observation(
self,
Expand Down
9 changes: 6 additions & 3 deletions langfuse/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def resolve_media_references(
langfuse_client: Any,
resolve_with: Literal["base64_data_uri"],
max_depth: int = 10,
content_fetch_timeout_seconds: int = 10,
) -> T:
"""Replace media reference strings in an object with base64 data URIs.

Expand Down Expand Up @@ -258,7 +259,7 @@ def traverse(obj: Any, depth: int) -> Any:
if depth > max_depth:
return obj

# Handle string with potential media references
# Handle string
if isinstance(obj, str):
regex = r"@@@langfuseMedia:.+?@@@"
reference_string_matches = re.findall(regex, obj)
Expand All @@ -275,8 +276,10 @@ def traverse(obj: Any, depth: int) -> Any:
)
media_data = langfuse_client.fetch_media(
parsed_media_reference["media_id"]
).data
media_content = requests.get(
media_data.url, timeout=content_fetch_timeout_seconds
)
Comment on lines +280 to 282
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: timeout only applies to the GET request, but not to the fetch_media call which could also hang. Consider wrapping both operations in a timeout

media_content = requests.get(media_data.url)
if not media_content.ok:
raise Exception("Failed to fetch media content")

Expand All @@ -289,7 +292,7 @@ def traverse(obj: Any, depth: int) -> Any:
base64_data_uri
)
except Exception as e:
logging.warning(
LangfuseMedia._log.warning(
f"Error fetching media content for reference string {reference_string}: {e}"
)
# Do not replace the reference string if there's an error
Expand Down
4 changes: 2 additions & 2 deletions tests/test_media.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ def test_replace_media_reference_string_in_object(tmp_path):
)

# Resolve media references back to base64
resolved_trace = LangfuseMedia.resolve_media_references(
obj=fetched_trace, langfuse_client=langfuse, resolve_with="base64_data_uri"
resolved_trace = langfuse.resolve_media_references(
obj=fetched_trace, resolve_with="base64_data_uri"
)

# Verify resolved base64 matches original
Expand Down
Loading