Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 89 additions & 1 deletion langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import urllib.parse
from datetime import datetime
from hashlib import sha256
from time import time_ns
from typing import Any, Dict, List, Literal, Optional, Union, cast, overload

import backoff
Expand Down Expand Up @@ -37,7 +38,11 @@
LANGFUSE_TRACING_ENVIRONMENT,
)
from langfuse._client.resource_manager import LangfuseResourceManager
from langfuse._client.span import LangfuseGeneration, LangfuseSpan
from langfuse._client.span import (
LangfuseEvent,
LangfuseGeneration,
LangfuseSpan,
)
from langfuse._utils import _get_timestamp
from langfuse._utils.parse_error import handle_fern_exception
from langfuse._utils.prompt_cache import PromptCache
Expand Down Expand Up @@ -945,6 +950,89 @@ def update_current_trace(
public=public,
)

def create_event(
self,
*,
trace_context: Optional[TraceContext] = None,
name: str,
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
version: Optional[str] = None,
level: Optional[SpanLevel] = None,
status_message: Optional[str] = None,
) -> LangfuseEvent:
"""Create a new Langfuse observation of type 'EVENT'.

The created Langfuse Event observation will be the child of the current span in the context.

Args:
trace_context: Optional context for connecting to an existing trace
name: Name of the span (e.g., function or operation name)
input: Input data for the operation (can be any JSON-serializable object)
output: Output data from the operation (can be any JSON-serializable object)
metadata: Additional metadata to associate with the span
version: Version identifier for the code or component
level: Importance level of the span (info, warning, error)
status_message: Optional status message for the span

Returns:
The Langfuse Event object

Example:
```python
event = langfuse.create_event(name="process-event")
```
"""
timestamp = time_ns()
attributes = create_span_attributes(
input=input,
output=output,
metadata=metadata,
version=version,
level=level,
status_message=status_message,
)

if trace_context:
trace_id = trace_context.get("trace_id", None)
parent_span_id = trace_context.get("parent_span_id", None)

if trace_id:
remote_parent_span = self._create_remote_parent_span(
trace_id=trace_id, parent_span_id=parent_span_id
)

with otel_trace_api.use_span(
cast(otel_trace_api.Span, remote_parent_span)
):
otel_span = self._otel_tracer.start_span(
name=name, attributes=attributes, start_time=timestamp
)
otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True)

return LangfuseEvent(
otel_span=otel_span,
langfuse_client=self,
input=input,
output=output,
metadata=metadata,
environment=self._environment,
).end(end_time=timestamp)

otel_span = self._otel_tracer.start_span(
name=name, attributes=attributes, start_time=timestamp
)

return LangfuseEvent(
otel_span=otel_span,
langfuse_client=self,
input=input,
output=output,
metadata=metadata,
environment=self._environment,
).end(end_time=timestamp)

def _create_remote_parent_span(
self, *, trace_id: str, parent_span_id: Optional[str]
):
Expand Down
117 changes: 101 additions & 16 deletions langfuse/_client/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
and scoring integration specific to Langfuse's observability platform.
"""

from abc import ABC, abstractmethod
from datetime import datetime
from time import time_ns
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -45,7 +45,7 @@
from langfuse.types import MapValue, ScoreDataType, SpanLevel


class LangfuseSpanWrapper(ABC):
class LangfuseSpanWrapper:
"""Abstract base class for all Langfuse span types.

This class provides common functionality for all Langfuse span types, including
Expand All @@ -64,7 +64,7 @@ def __init__(
*,
otel_span: otel_trace_api.Span,
langfuse_client: "Langfuse",
as_type: Literal["span", "generation"],
as_type: Literal["span", "generation", "event"],
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
Expand Down Expand Up @@ -133,18 +133,6 @@ def end(self, *, end_time: Optional[int] = None):

return self

@abstractmethod
def update(self, **kwargs) -> Union["LangfuseSpan", "LangfuseGeneration"]:
"""Update the span with new information.

Abstract method that must be implemented by subclasses to update
the span with new information during its lifecycle.

Args:
**kwargs: Subclass-specific update parameters
"""
pass

def update_trace(
self,
*,
Expand Down Expand Up @@ -352,7 +340,7 @@ def _set_processed_span_attributes(
self,
*,
span: otel_trace_api.Span,
as_type: Optional[Literal["span", "generation"]] = None,
as_type: Optional[Literal["span", "generation", "event"]] = None,
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
Expand Down Expand Up @@ -934,6 +922,69 @@ def start_as_current_generation(
),
)

def create_event(
self,
*,
name: str,
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
version: Optional[str] = None,
level: Optional[SpanLevel] = None,
status_message: Optional[str] = None,
) -> "LangfuseEvent":
"""Create a new Langfuse observation of type 'EVENT'.

Args:
name: Name of the span (e.g., function or operation name)
input: Input data for the operation (can be any JSON-serializable object)
output: Output data from the operation (can be any JSON-serializable object)
metadata: Additional metadata to associate with the span
version: Version identifier for the code or component
level: Importance level of the span (info, warning, error)
status_message: Optional status message for the span

Returns:
The LangfuseEvent object

Example:
```python
event = langfuse.create_event(name="process-event")
```
"""
timestamp = time_ns()
attributes = create_span_attributes(
input=input,
output=output,
metadata=metadata,
version=version,
level=level,
status_message=status_message,
)

with otel_trace_api.use_span(self._otel_span):
new_otel_span = self._langfuse_client._otel_tracer.start_span(
name=name, attributes=attributes, start_time=timestamp
)

if new_otel_span.is_recording:
self._set_processed_span_attributes(
span=new_otel_span,
as_type="event",
input=input,
output=output,
metadata=metadata,
)

return LangfuseEvent(
otel_span=new_otel_span,
langfuse_client=self._langfuse_client,
input=input,
output=output,
metadata=metadata,
environment=self._environment,
).end(end_time=timestamp)


class LangfuseGeneration(LangfuseSpanWrapper):
"""Specialized span implementation for AI model generations in Langfuse.
Expand Down Expand Up @@ -1069,3 +1120,37 @@ def update(
self._otel_span.set_attributes(attributes=attributes)

return self


class LangfuseEvent(LangfuseSpanWrapper):
"""Specialized span implementation for Langfuse Events."""

def __init__(
self,
*,
otel_span: otel_trace_api.Span,
langfuse_client: "Langfuse",
input: Optional[Any] = None,
output: Optional[Any] = None,
metadata: Optional[Any] = None,
environment: Optional[str] = None,
):
"""Initialize a new LangfuseEvent span.

Args:
otel_span: The OpenTelemetry span to wrap
langfuse_client: Reference to the parent Langfuse client
input: Input data for the generation (e.g., prompts)
output: Output from the generation (e.g., completions)
metadata: Additional metadata to associate with the generation
environment: The tracing environment
"""
super().__init__(
otel_span=otel_span,
as_type="event",
langfuse_client=langfuse_client,
input=input,
output=output,
metadata=metadata,
environment=environment,
)
Loading