Skip to content

Commit 7e1ce87

Browse files
authored
feat(media): utility to resolve media references with media content (#1036)
1 parent db0b4f1 commit 7e1ce87

File tree

4 files changed

+198
-2
lines changed

4 files changed

+198
-2
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ jobs:
9090
rm -rf .env
9191
9292
echo "::group::Run server"
93-
TELEMETRY_ENABLED=false LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d
93+
TELEMETRY_ENABLED=false LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT=http://localhost:9090 LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d
9494
echo "::endgroup::"
9595
9696
# Add this step to check the health of the container

langfuse/client.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -885,6 +885,20 @@ def fetch_observation(
885885
handle_fern_exception(e)
886886
raise e
887887

888+
def fetch_media(self, id: str):
889+
"""Get media content by ID.
890+
891+
Args:
892+
id: The identifier of the media content to fetch.
893+
894+
Returns:
895+
Media object
896+
897+
Raises:
898+
Exception: If the media content could not be found or if an error occurred during the request.
899+
"""
900+
return self.client.media.get(id)
901+
888902
def get_observation(
889903
self,
890904
id: str,

langfuse/media.py

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@
44
import hashlib
55
import logging
66
import os
7-
from typing import Optional, cast, Tuple
7+
import re
8+
import requests
9+
from typing import Optional, cast, Tuple, Any, TypeVar, Literal
810

911
from langfuse.api import MediaContentType
1012
from langfuse.types import ParsedMediaReference
1113

14+
T = TypeVar("T")
15+
1216

1317
class LangfuseMedia:
1418
"""A class for wrapping media objects for upload to Langfuse.
@@ -201,3 +205,116 @@ def _parse_base64_data_uri(
201205
self._log.error("Error parsing base64 data URI", exc_info=e)
202206

203207
return None, None
208+
209+
@staticmethod
210+
def resolve_media_references(
211+
*,
212+
obj: T,
213+
langfuse_client: Any,
214+
resolve_with: Literal["base64_data_uri"],
215+
max_depth: int = 10,
216+
) -> T:
217+
"""Replace media reference strings in an object with base64 data URIs.
218+
219+
This method recursively traverses an object (up to max_depth) looking for media reference strings
220+
in the format "@@@langfuseMedia:...@@@". When found, it (synchronously) fetches the actual media content using
221+
the provided Langfuse client and replaces the reference string with a base64 data URI.
222+
223+
If fetching media content fails for a reference string, a warning is logged and the reference
224+
string is left unchanged.
225+
226+
Args:
227+
obj: The object to process. Can be a primitive value, array, or nested object.
228+
If the object has a __dict__ attribute, a dict will be returned instead of the original object type.
229+
langfuse_client: Langfuse client instance used to fetch media content.
230+
resolve_with: The representation of the media content to replace the media reference string with.
231+
Currently only "base64_data_uri" is supported.
232+
max_depth: Optional. Default is 10. The maximum depth to traverse the object.
233+
234+
Returns:
235+
A deep copy of the input object with all media references replaced with base64 data URIs where possible.
236+
If the input object has a __dict__ attribute, a dict will be returned instead of the original object type.
237+
238+
Example:
239+
obj = {
240+
"image": "@@@langfuseMedia:type=image/jpeg|id=123|source=bytes@@@",
241+
"nested": {
242+
"pdf": "@@@langfuseMedia:type=application/pdf|id=456|source=bytes@@@"
243+
}
244+
}
245+
246+
result = await LangfuseMedia.resolve_media_references(obj, langfuse_client)
247+
248+
# Result:
249+
# {
250+
# "image": "...",
251+
# "nested": {
252+
# "pdf": "data:application/pdf;base64,JVBERi0xLjcK..."
253+
# }
254+
# }
255+
"""
256+
257+
def traverse(obj: Any, depth: int) -> Any:
258+
if depth > max_depth:
259+
return obj
260+
261+
# Handle string with potential media references
262+
if isinstance(obj, str):
263+
regex = r"@@@langfuseMedia:.+?@@@"
264+
reference_string_matches = re.findall(regex, obj)
265+
if len(reference_string_matches) == 0:
266+
return obj
267+
268+
result = obj
269+
reference_string_to_media_content = {}
270+
271+
for reference_string in reference_string_matches:
272+
try:
273+
parsed_media_reference = LangfuseMedia.parse_reference_string(
274+
reference_string
275+
)
276+
media_data = langfuse_client.fetch_media(
277+
parsed_media_reference["media_id"]
278+
)
279+
media_content = requests.get(media_data.url)
280+
if not media_content.ok:
281+
raise Exception("Failed to fetch media content")
282+
283+
base64_media_content = base64.b64encode(
284+
media_content.content
285+
).decode()
286+
base64_data_uri = f"data:{media_data.content_type};base64,{base64_media_content}"
287+
288+
reference_string_to_media_content[reference_string] = (
289+
base64_data_uri
290+
)
291+
except Exception as e:
292+
logging.warning(
293+
f"Error fetching media content for reference string {reference_string}: {e}"
294+
)
295+
# Do not replace the reference string if there's an error
296+
continue
297+
298+
for ref_str, media_content in reference_string_to_media_content.items():
299+
result = result.replace(ref_str, media_content)
300+
301+
return result
302+
303+
# Handle arrays
304+
if isinstance(obj, list):
305+
return [traverse(item, depth + 1) for item in obj]
306+
307+
# Handle dictionaries
308+
if isinstance(obj, dict):
309+
return {key: traverse(value, depth + 1) for key, value in obj.items()}
310+
311+
# Handle objects:
312+
if hasattr(obj, "__dict__"):
313+
return {
314+
key: traverse(value, depth + 1)
315+
for key, value in obj.__dict__.items()
316+
}
317+
318+
return obj
319+
320+
return traverse(obj, 0)

tests/test_media.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import base64
22
import pytest
33
from langfuse.media import LangfuseMedia
4+
from langfuse.client import Langfuse
5+
from uuid import uuid4
6+
import re
7+
48

59
# Test data
610
SAMPLE_JPEG_BYTES = b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x01\x00H\x00H\x00\x00"
@@ -104,3 +108,64 @@ def test_nonexistent_file():
104108
assert media._source is None
105109
assert media._content_bytes is None
106110
assert media._content_type is None
111+
112+
113+
@pytest.mark.skip(reason="Docker networking issues. Enable once LFE-3159 is fixed.")
114+
def test_replace_media_reference_string_in_object(tmp_path):
115+
# Create test audio file
116+
audio_file = "static/joke_prompt.wav"
117+
with open(audio_file, "rb") as f:
118+
mock_audio_bytes = f.read()
119+
120+
# Create Langfuse client and trace with media
121+
langfuse = Langfuse()
122+
123+
mock_trace_name = f"test-trace-with-audio-{uuid4()}"
124+
base64_audio = base64.b64encode(mock_audio_bytes).decode()
125+
126+
trace = langfuse.trace(
127+
name=mock_trace_name,
128+
metadata={
129+
"context": {
130+
"nested": LangfuseMedia(
131+
base64_data_uri=f"data:audio/wav;base64,{base64_audio}"
132+
)
133+
}
134+
},
135+
)
136+
137+
langfuse.flush()
138+
139+
# Verify media reference string format
140+
fetched_trace = langfuse.fetch_trace(trace.id).data
141+
media_ref = fetched_trace.metadata["context"]["nested"]
142+
assert re.match(
143+
r"^@@@langfuseMedia:type=audio/wav\|id=.+\|source=base64_data_uri@@@$",
144+
media_ref,
145+
)
146+
147+
# Resolve media references back to base64
148+
resolved_trace = LangfuseMedia.resolve_media_references(
149+
obj=fetched_trace, langfuse_client=langfuse, resolve_with="base64_data_uri"
150+
)
151+
152+
# Verify resolved base64 matches original
153+
expected_base64 = f"data:audio/wav;base64,{base64_audio}"
154+
assert resolved_trace["metadata"]["context"]["nested"] == expected_base64
155+
156+
# Create second trace reusing the media reference
157+
trace2 = langfuse.trace(
158+
name=f"2-{mock_trace_name}",
159+
metadata={
160+
"context": {"nested": resolved_trace["metadata"]["context"]["nested"]}
161+
},
162+
)
163+
164+
langfuse.flush()
165+
166+
# Verify second trace has same media reference
167+
fetched_trace2 = langfuse.fetch_trace(trace2.id).data
168+
assert (
169+
fetched_trace2.metadata["context"]["nested"]
170+
== fetched_trace.metadata["context"]["nested"]
171+
)

0 commit comments

Comments
 (0)