Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion langfuse/_client/get_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,37 @@
from typing import Optional
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Iterator, Optional

from langfuse._client.client import Langfuse
from langfuse._client.resource_manager import LangfuseResourceManager
from langfuse.logger import langfuse_logger

# Context variable to track the current langfuse_public_key in execution context
_current_public_key: ContextVar[Optional[str]] = ContextVar(
"langfuse_public_key", default=None
)


@contextmanager
def _set_current_public_key(public_key: Optional[str]) -> Iterator[None]:
"""Context manager to set and restore the current public key in execution context.

Args:
public_key: The public key to set in context. If None, context is not modified.

Yields:
None
"""
if public_key is None:
yield # Don't modify context if no key provided
return

token = _current_public_key.set(public_key)
try:
yield
finally:
_current_public_key.reset(token)


def get_client(*, public_key: Optional[str] = None) -> Langfuse:
"""Get or create a Langfuse client instance.
Expand Down Expand Up @@ -49,6 +77,10 @@ def get_client(*, public_key: Optional[str] = None) -> Langfuse:
with LangfuseResourceManager._lock:
active_instances = LangfuseResourceManager._instances

# If no explicit public_key provided, check execution context
if not public_key:
public_key = _current_public_key.get(None)

if not public_key:
if len(active_instances) == 0:
# No clients initialized yet, create default instance
Expand Down
224 changes: 115 additions & 109 deletions langfuse/_client/observe.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from langfuse._client.environment_variables import (
LANGFUSE_OBSERVE_DECORATOR_IO_CAPTURE_ENABLED,
)
from langfuse._client.get_client import get_client
from langfuse._client.get_client import _set_current_public_key, get_client
from langfuse._client.span import LangfuseGeneration, LangfuseSpan
from langfuse.types import TraceContext

Expand Down Expand Up @@ -231,72 +231,75 @@ async def async_wrapper(*args: Tuple[Any], **kwargs: Dict[str, Any]) -> Any:
else None
)
public_key = cast(str, kwargs.pop("langfuse_public_key", None))
langfuse_client = get_client(public_key=public_key)
context_manager: Optional[
Union[
_AgnosticContextManager[LangfuseGeneration],
_AgnosticContextManager[LangfuseSpan],
]
] = (
(
langfuse_client.start_as_current_generation(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
if as_type == "generation"
else langfuse_client.start_as_current_span(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early

# Set public key in execution context for nested decorated functions
with _set_current_public_key(public_key):
langfuse_client = get_client(public_key=public_key)
context_manager: Optional[
Union[
_AgnosticContextManager[LangfuseGeneration],
_AgnosticContextManager[LangfuseSpan],
]
] = (
(
langfuse_client.start_as_current_generation(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
if as_type == "generation"
else langfuse_client.start_as_current_span(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
)
if langfuse_client
else None
)
if langfuse_client
else None
)

if context_manager is None:
return await func(*args, **kwargs)
if context_manager is None:
return await func(*args, **kwargs)

with context_manager as langfuse_span_or_generation:
is_return_type_generator = False
with context_manager as langfuse_span_or_generation:
is_return_type_generator = False

try:
result = await func(*args, **kwargs)
try:
result = await func(*args, **kwargs)

if capture_output is True:
if inspect.isgenerator(result):
is_return_type_generator = True
if capture_output is True:
if inspect.isgenerator(result):
is_return_type_generator = True

return self._wrap_sync_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)
return self._wrap_sync_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

if inspect.isasyncgen(result):
is_return_type_generator = True
if inspect.isasyncgen(result):
is_return_type_generator = True

return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)
return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

langfuse_span_or_generation.update(output=result)
langfuse_span_or_generation.update(output=result)

return result
except Exception as e:
langfuse_span_or_generation.update(
level="ERROR", status_message=str(e)
)
return result
except Exception as e:
langfuse_span_or_generation.update(
level="ERROR", status_message=str(e)
)

raise e
finally:
if not is_return_type_generator:
langfuse_span_or_generation.end()
raise e
finally:
if not is_return_type_generator:
langfuse_span_or_generation.end()

return cast(F, async_wrapper)

Expand Down Expand Up @@ -333,72 +336,75 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
else None
)
public_key = kwargs.pop("langfuse_public_key", None)
langfuse_client = get_client(public_key=public_key)
context_manager: Optional[
Union[
_AgnosticContextManager[LangfuseGeneration],
_AgnosticContextManager[LangfuseSpan],
]
] = (
(
langfuse_client.start_as_current_generation(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
if as_type == "generation"
else langfuse_client.start_as_current_span(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early

# Set public key in execution context for nested decorated functions
with _set_current_public_key(public_key):
langfuse_client = get_client(public_key=public_key)
context_manager: Optional[
Union[
_AgnosticContextManager[LangfuseGeneration],
_AgnosticContextManager[LangfuseSpan],
]
] = (
(
langfuse_client.start_as_current_generation(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
if as_type == "generation"
else langfuse_client.start_as_current_span(
name=final_name,
trace_context=trace_context,
input=input,
end_on_exit=False, # when returning a generator, closing on exit would be to early
)
)
if langfuse_client
else None
)
if langfuse_client
else None
)

if context_manager is None:
return func(*args, **kwargs)
if context_manager is None:
return func(*args, **kwargs)

with context_manager as langfuse_span_or_generation:
is_return_type_generator = False
with context_manager as langfuse_span_or_generation:
is_return_type_generator = False

try:
result = func(*args, **kwargs)
try:
result = func(*args, **kwargs)

if capture_output is True:
if inspect.isgenerator(result):
is_return_type_generator = True
if capture_output is True:
if inspect.isgenerator(result):
is_return_type_generator = True

return self._wrap_sync_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)
return self._wrap_sync_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

if inspect.isasyncgen(result):
is_return_type_generator = True
if inspect.isasyncgen(result):
is_return_type_generator = True

return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)
return self._wrap_async_generator_result(
langfuse_span_or_generation,
result,
transform_to_string,
)

langfuse_span_or_generation.update(output=result)
langfuse_span_or_generation.update(output=result)

return result
except Exception as e:
langfuse_span_or_generation.update(
level="ERROR", status_message=str(e)
)
return result
except Exception as e:
langfuse_span_or_generation.update(
level="ERROR", status_message=str(e)
)

raise e
finally:
if not is_return_type_generator:
langfuse_span_or_generation.end()
raise e
finally:
if not is_return_type_generator:
langfuse_span_or_generation.end()

return cast(F, sync_wrapper)

Expand Down
Loading