183 changes: 142 additions & 41 deletions langfuse/_client/observe.py
@@ -1,4 +1,5 @@
 import asyncio
+import contextvars
 import inspect
 import logging
 import os
@@ -10,6 +11,7 @@
     Dict,
     Generator,
     Iterable,
+    List,
     Optional,
     Tuple,
     TypeVar,
@@ -21,25 +23,24 @@
 from opentelemetry.util._decorator import _AgnosticContextManager
 from typing_extensions import ParamSpec

-from langfuse._client.environment_variables import (
-    LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED,
-)
-
 from langfuse._client.constants import (
     ObservationTypeLiteralNoEvent,
     get_observation_types_list,
 )
+from langfuse._client.environment_variables import (
+    LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED,
+)
 from langfuse._client.get_client import _set_current_public_key, get_client
 from langfuse._client.span import (
-    LangfuseGeneration,
-    LangfuseSpan,
     LangfuseAgent,
-    LangfuseTool,
     LangfuseChain,
-    LangfuseRetriever,
-    LangfuseEvaluator,
     LangfuseEmbedding,
+    LangfuseEvaluator,
+    LangfuseGeneration,
     LangfuseGuardrail,
+    LangfuseRetriever,
+    LangfuseSpan,
+    LangfuseTool,
 )
 from langfuse.types import TraceContext

@@ -468,29 +469,54 @@ def _wrap_sync_generator_result(
         generator: Generator,
         transform_to_string: Optional[Callable[[Iterable], str]] = None,
     ) -> Any:
-        items = []
+        preserved_context = contextvars.copy_context()

-        try:
-            for item in generator:
-                items.append(item)
+        return _ContextPreservedSyncGeneratorWrapper(
+            generator,
+            preserved_context,
+            langfuse_span_or_generation,
+            transform_to_string,
+        )

-                yield item
+    def _wrap_async_generator_result(
+        self,
+        langfuse_span_or_generation: Union[
+            LangfuseSpan,
+            LangfuseGeneration,
+            LangfuseAgent,
+            LangfuseTool,
+            LangfuseChain,
+            LangfuseRetriever,
+            LangfuseEvaluator,
+            LangfuseEmbedding,
+            LangfuseGuardrail,
+        ],
+        generator: AsyncGenerator,
+        transform_to_string: Optional[Callable[[Iterable], str]] = None,
+    ) -> Any:
+        preserved_context = contextvars.copy_context()
+
+        return _ContextPreservedAsyncGeneratorWrapper(
+            generator,
+            preserved_context,
+            langfuse_span_or_generation,
+            transform_to_string,
+        )

-        finally:
-            output: Any = items

-            if transform_to_string is not None:
-                output = transform_to_string(items)
+_decorator = LangfuseDecorator()

-            elif all(isinstance(item, str) for item in items):
-                output = "".join(items)
+observe = _decorator.observe
+

-            langfuse_span_or_generation.update(output=output)
-            langfuse_span_or_generation.end()
+class _ContextPreservedSyncGeneratorWrapper:
+    """Sync generator wrapper that ensures each iteration runs in preserved context."""

-    async def _wrap_async_generator_result(
+    def __init__(
         self,
-        langfuse_span_or_generation: Union[
+        generator: Generator,
+        context: contextvars.Context,
+        span: Union[
             LangfuseSpan,
             LangfuseGeneration,
             LangfuseAgent,
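The refactor in the hunk above hinges on `contextvars.copy_context()`: the decorator snapshots whatever context is active when the decorated generator function is called (including Langfuse's active observation) and hands that snapshot to a wrapper object, so the caller can consume the stream later without the original context needing to still be current. A minimal, Langfuse-independent sketch of the same technique (the `current_span` variable and `ContextPreservedIterator` class are illustrative stand-ins, not SDK code):

```python
import contextvars
from typing import Any, Generator

current_span: contextvars.ContextVar[str] = contextvars.ContextVar("current_span", default="none")


class ContextPreservedIterator:
    """Replays each next() call inside the context captured at wrap time."""

    def __init__(self, gen: Generator) -> None:
        self._gen = gen
        self._ctx = contextvars.copy_context()  # snapshot of the caller's context

    def __iter__(self) -> "ContextPreservedIterator":
        return self

    def __next__(self) -> Any:
        # Context.run executes next(gen) with the snapshot active, so the
        # generator body still sees the span that was current at wrap time.
        return self._ctx.run(next, self._gen)


def generate() -> Generator[str, None, None]:
    for i in range(2):
        yield f"{current_span.get()}:{i}"


token = current_span.set("span-1")             # e.g. the decorator opened a span
wrapped = ContextPreservedIterator(generate())
current_span.reset(token)                      # the span's scope has already been exited

print(list(wrapped))     # ['span-1:0', 'span-1:1'], context preserved
print(list(generate()))  # ['none:0', 'none:1'], plain iteration loses it
```

The wrapper classes in the next hunk follow the same shape, with the extra step of updating and ending the observation once the generator is exhausted or fails.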
@@ -501,30 +527,105 @@ async def _wrap_async_generator_result(
             LangfuseEmbedding,
             LangfuseGuardrail,
         ],
-        generator: AsyncGenerator,
-        transform_to_string: Optional[Callable[[Iterable], str]] = None,
-    ) -> AsyncGenerator:
-        items = []
+        transform_fn: Optional[Callable[[Iterable], str]],
+    ) -> None:
+        self.generator = generator
+        self.context = context
+        self.items: List[Any] = []
+        self.span = span
+        self.transform_fn = transform_fn
+
+    def __iter__(self) -> "_ContextPreservedSyncGeneratorWrapper":
+        return self

+    def __next__(self) -> Any:
         try:
-            async for item in generator:
-                items.append(item)
+            # Run the generator's __next__ in the preserved context
+            item = self.context.run(next, self.generator)
+            self.items.append(item)

-                yield item
+            return item

-        finally:
-            output: Any = items
+        except StopIteration:
+            # Handle output and span cleanup when generator is exhausted
+            output: Any = self.items

-            if transform_to_string is not None:
-                output = transform_to_string(items)
+            if self.transform_fn is not None:
+                output = self.transform_fn(self.items)

-            elif all(isinstance(item, str) for item in items):
-                output = "".join(items)
+            elif all(isinstance(item, str) for item in self.items):
+                output = "".join(self.items)

-            langfuse_span_or_generation.update(output=output)
-            langfuse_span_or_generation.end()
+            self.span.update(output=output).end()
+
+            raise  # Re-raise StopIteration
+
+        except Exception as e:
+            self.span.update(level="ERROR", status_message=str(e)).end()
+
+            raise


-_decorator = LangfuseDecorator()
+class _ContextPreservedAsyncGeneratorWrapper:
+    """Async generator wrapper that ensures each iteration runs in preserved context."""

-observe = _decorator.observe
+    def __init__(
+        self,
+        generator: AsyncGenerator,
+        context: contextvars.Context,
+        span: Union[
+            LangfuseSpan,
+            LangfuseGeneration,
+            LangfuseAgent,
+            LangfuseTool,
+            LangfuseChain,
+            LangfuseRetriever,
+            LangfuseEvaluator,
+            LangfuseEmbedding,
+            LangfuseGuardrail,
+        ],
+        transform_fn: Optional[Callable[[Iterable], str]],
+    ) -> None:
+        self.generator = generator
+        self.context = context
+        self.items: List[Any] = []
+        self.span = span
+        self.transform_fn = transform_fn
+
+    def __aiter__(self) -> "_ContextPreservedAsyncGeneratorWrapper":
+        return self
+
+    async def __anext__(self) -> Any:
+        try:
+            # Run the generator's __anext__ in the preserved context
+            try:
+                # Python 3.11+: asyncio.create_task accepts a context parameter
+                item = await asyncio.create_task(
+                    self.generator.__anext__(),  # type: ignore
+                    context=self.context,
+                )  # type: ignore
+            except TypeError:
+                # Older Python: context parameter not supported, run in ambient context
+                item = await self.generator.__anext__()
+
+            self.items.append(item)
+
+            return item
+
+        except StopAsyncIteration:
+            # Handle output and span cleanup when generator is exhausted
+            output: Any = self.items
+
+            if self.transform_fn is not None:
+                output = self.transform_fn(self.items)
+
+            elif all(isinstance(item, str) for item in self.items):
+                output = "".join(self.items)
+
+            self.span.update(output=output).end()
+
+            raise  # Re-raise StopAsyncIteration
+        except Exception as e:
+            self.span.update(level="ERROR", status_message=str(e)).end()
+
+            raise
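For the async wrapper there is no direct `Context.run` equivalent for awaiting a coroutine, which is presumably why `__anext__` schedules the step as a task with an explicit `context=` argument (supported by `asyncio.create_task` since Python 3.11) and falls back to a plain await when that keyword raises `TypeError`. A standalone sketch of that pattern, with an illustrative `current_span` variable and `numbers` generator that are not part of the SDK:

```python
import asyncio
import contextvars
from typing import Any, AsyncGenerator

current_span: contextvars.ContextVar[str] = contextvars.ContextVar("current_span", default="none")


class ContextPreservedAsyncIterator:
    """Runs each __anext__ step inside the context captured at wrap time."""

    def __init__(self, agen: AsyncGenerator) -> None:
        self._agen = agen
        self._ctx = contextvars.copy_context()

    def __aiter__(self) -> "ContextPreservedAsyncIterator":
        return self

    async def __anext__(self) -> Any:
        try:
            # Python 3.11+: the task runs this step inside the preserved context.
            return await asyncio.create_task(self._agen.__anext__(), context=self._ctx)
        except TypeError:
            # Older Python: no context= keyword; the step runs in the ambient context.
            return await self._agen.__anext__()


async def numbers() -> AsyncGenerator[str, None]:
    for i in range(2):
        yield f"{current_span.get()}:{i}"


async def main() -> None:
    token = current_span.set("span-1")
    wrapped = ContextPreservedAsyncIterator(numbers())
    current_span.reset(token)

    print([item async for item in wrapped])  # ['span-1:0', 'span-1:1'] on Python 3.11+


asyncio.run(main())
```

The fallback trades context propagation for compatibility: on interpreters without the `context=` parameter the chunks are still streamed and collected, but each step executes in whatever context the consumer happens to be in.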
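From the decorator user's perspective, the practical effect is that a generator decorated with `observe` keeps its observation open until the stream is actually consumed, records string chunks joined into a single output, and no longer depends on the Langfuse context still being active at iteration time. A hedged usage sketch, assuming `observe` is re-exported at the package root as in recent SDK releases:

```python
from langfuse import observe  # assumed public re-export of _decorator.observe


@observe()
def stream_answer():
    # Chunks are yielded lazily; the observation created by @observe is
    # updated and ended only once this generator is exhausted.
    for chunk in ("Hello", ", ", "world"):
        yield chunk


# Iteration may happen later and elsewhere; each next() runs in the context
# captured at call time, and the recorded output is the joined string.
print("".join(stream_answer()))  # -> "Hello, world"
```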