From 4c72a6f7b0c1f7cc78e02f301db718e22ac1928e Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Thu, 22 May 2025 14:12:57 +0200 Subject: [PATCH] feat(core): add support for observation events --- langfuse/_client/client.py | 90 +++++++++++++++++++++++++++- langfuse/_client/span.py | 117 ++++++++++++++++++++++++++++++++----- 2 files changed, 190 insertions(+), 17 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index b2c82a30a..ddd9cd883 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -9,6 +9,7 @@ import urllib.parse from datetime import datetime from hashlib import sha256 +from time import time_ns from typing import Any, Dict, List, Literal, Optional, Union, cast, overload import backoff @@ -37,7 +38,11 @@ LANGFUSE_TRACING_ENVIRONMENT, ) from langfuse._client.resource_manager import LangfuseResourceManager -from langfuse._client.span import LangfuseGeneration, LangfuseSpan +from langfuse._client.span import ( + LangfuseEvent, + LangfuseGeneration, + LangfuseSpan, +) from langfuse._utils import _get_timestamp from langfuse._utils.parse_error import handle_fern_exception from langfuse._utils.prompt_cache import PromptCache @@ -945,6 +950,89 @@ def update_current_trace( public=public, ) + def create_event( + self, + *, + trace_context: Optional[TraceContext] = None, + name: str, + input: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Any] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + ) -> LangfuseEvent: + """Create a new Langfuse observation of type 'EVENT'. + + The created Langfuse Event observation will be the child of the current span in the context. + + Args: + trace_context: Optional context for connecting to an existing trace + name: Name of the span (e.g., function or operation name) + input: Input data for the operation (can be any JSON-serializable object) + output: Output data from the operation (can be any JSON-serializable object) + metadata: Additional metadata to associate with the span + version: Version identifier for the code or component + level: Importance level of the span (info, warning, error) + status_message: Optional status message for the span + + Returns: + The Langfuse Event object + + Example: + ```python + event = langfuse.create_event(name="process-event") + ``` + """ + timestamp = time_ns() + attributes = create_span_attributes( + input=input, + output=output, + metadata=metadata, + version=version, + level=level, + status_message=status_message, + ) + + if trace_context: + trace_id = trace_context.get("trace_id", None) + parent_span_id = trace_context.get("parent_span_id", None) + + if trace_id: + remote_parent_span = self._create_remote_parent_span( + trace_id=trace_id, parent_span_id=parent_span_id + ) + + with otel_trace_api.use_span( + cast(otel_trace_api.Span, remote_parent_span) + ): + otel_span = self._otel_tracer.start_span( + name=name, attributes=attributes, start_time=timestamp + ) + otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) + + return LangfuseEvent( + otel_span=otel_span, + langfuse_client=self, + input=input, + output=output, + metadata=metadata, + environment=self._environment, + ).end(end_time=timestamp) + + otel_span = self._otel_tracer.start_span( + name=name, attributes=attributes, start_time=timestamp + ) + + return LangfuseEvent( + otel_span=otel_span, + langfuse_client=self, + input=input, + output=output, + metadata=metadata, + environment=self._environment, + ).end(end_time=timestamp) + def _create_remote_parent_span( self, *, trace_id: str, parent_span_id: Optional[str] ): diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 99327d763..7f65358f8 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -13,8 +13,8 @@ and scoring integration specific to Langfuse's observability platform. """ -from abc import ABC, abstractmethod from datetime import datetime +from time import time_ns from typing import ( TYPE_CHECKING, Any, @@ -45,7 +45,7 @@ from langfuse.types import MapValue, ScoreDataType, SpanLevel -class LangfuseSpanWrapper(ABC): +class LangfuseSpanWrapper: """Abstract base class for all Langfuse span types. This class provides common functionality for all Langfuse span types, including @@ -64,7 +64,7 @@ def __init__( *, otel_span: otel_trace_api.Span, langfuse_client: "Langfuse", - as_type: Literal["span", "generation"], + as_type: Literal["span", "generation", "event"], input: Optional[Any] = None, output: Optional[Any] = None, metadata: Optional[Any] = None, @@ -133,18 +133,6 @@ def end(self, *, end_time: Optional[int] = None): return self - @abstractmethod - def update(self, **kwargs) -> Union["LangfuseSpan", "LangfuseGeneration"]: - """Update the span with new information. - - Abstract method that must be implemented by subclasses to update - the span with new information during its lifecycle. - - Args: - **kwargs: Subclass-specific update parameters - """ - pass - def update_trace( self, *, @@ -352,7 +340,7 @@ def _set_processed_span_attributes( self, *, span: otel_trace_api.Span, - as_type: Optional[Literal["span", "generation"]] = None, + as_type: Optional[Literal["span", "generation", "event"]] = None, input: Optional[Any] = None, output: Optional[Any] = None, metadata: Optional[Any] = None, @@ -934,6 +922,69 @@ def start_as_current_generation( ), ) + def create_event( + self, + *, + name: str, + input: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Any] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + ) -> "LangfuseEvent": + """Create a new Langfuse observation of type 'EVENT'. + + Args: + name: Name of the span (e.g., function or operation name) + input: Input data for the operation (can be any JSON-serializable object) + output: Output data from the operation (can be any JSON-serializable object) + metadata: Additional metadata to associate with the span + version: Version identifier for the code or component + level: Importance level of the span (info, warning, error) + status_message: Optional status message for the span + + Returns: + The LangfuseEvent object + + Example: + ```python + event = langfuse.create_event(name="process-event") + ``` + """ + timestamp = time_ns() + attributes = create_span_attributes( + input=input, + output=output, + metadata=metadata, + version=version, + level=level, + status_message=status_message, + ) + + with otel_trace_api.use_span(self._otel_span): + new_otel_span = self._langfuse_client._otel_tracer.start_span( + name=name, attributes=attributes, start_time=timestamp + ) + + if new_otel_span.is_recording: + self._set_processed_span_attributes( + span=new_otel_span, + as_type="event", + input=input, + output=output, + metadata=metadata, + ) + + return LangfuseEvent( + otel_span=new_otel_span, + langfuse_client=self._langfuse_client, + input=input, + output=output, + metadata=metadata, + environment=self._environment, + ).end(end_time=timestamp) + class LangfuseGeneration(LangfuseSpanWrapper): """Specialized span implementation for AI model generations in Langfuse. @@ -1069,3 +1120,37 @@ def update( self._otel_span.set_attributes(attributes=attributes) return self + + +class LangfuseEvent(LangfuseSpanWrapper): + """Specialized span implementation for Langfuse Events.""" + + def __init__( + self, + *, + otel_span: otel_trace_api.Span, + langfuse_client: "Langfuse", + input: Optional[Any] = None, + output: Optional[Any] = None, + metadata: Optional[Any] = None, + environment: Optional[str] = None, + ): + """Initialize a new LangfuseEvent span. + + Args: + otel_span: The OpenTelemetry span to wrap + langfuse_client: Reference to the parent Langfuse client + input: Input data for the generation (e.g., prompts) + output: Output from the generation (e.g., completions) + metadata: Additional metadata to associate with the generation + environment: The tracing environment + """ + super().__init__( + otel_span=otel_span, + as_type="event", + langfuse_client=langfuse_client, + input=input, + output=output, + metadata=metadata, + environment=environment, + )