Skip to content

Commit 7af5dbc

Browse files
authored
feat(core): add support for observation events (#1189)
1 parent d2e7e2d commit 7af5dbc

File tree

2 files changed

+190
-17
lines changed

2 files changed

+190
-17
lines changed

langfuse/_client/client.py

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import urllib.parse
1010
from datetime import datetime
1111
from hashlib import sha256
12+
from time import time_ns
1213
from typing import Any, Dict, List, Literal, Optional, Union, cast, overload
1314

1415
import backoff
@@ -37,7 +38,11 @@
3738
LANGFUSE_TRACING_ENVIRONMENT,
3839
)
3940
from langfuse._client.resource_manager import LangfuseResourceManager
40-
from langfuse._client.span import LangfuseGeneration, LangfuseSpan
41+
from langfuse._client.span import (
42+
LangfuseEvent,
43+
LangfuseGeneration,
44+
LangfuseSpan,
45+
)
4146
from langfuse._utils import _get_timestamp
4247
from langfuse._utils.parse_error import handle_fern_exception
4348
from langfuse._utils.prompt_cache import PromptCache
@@ -945,6 +950,89 @@ def update_current_trace(
945950
public=public,
946951
)
947952

953+
def create_event(
954+
self,
955+
*,
956+
trace_context: Optional[TraceContext] = None,
957+
name: str,
958+
input: Optional[Any] = None,
959+
output: Optional[Any] = None,
960+
metadata: Optional[Any] = None,
961+
version: Optional[str] = None,
962+
level: Optional[SpanLevel] = None,
963+
status_message: Optional[str] = None,
964+
) -> LangfuseEvent:
965+
"""Create a new Langfuse observation of type 'EVENT'.
966+
967+
The created Langfuse Event observation will be the child of the current span in the context.
968+
969+
Args:
970+
trace_context: Optional context for connecting to an existing trace
971+
name: Name of the span (e.g., function or operation name)
972+
input: Input data for the operation (can be any JSON-serializable object)
973+
output: Output data from the operation (can be any JSON-serializable object)
974+
metadata: Additional metadata to associate with the span
975+
version: Version identifier for the code or component
976+
level: Importance level of the span (info, warning, error)
977+
status_message: Optional status message for the span
978+
979+
Returns:
980+
The Langfuse Event object
981+
982+
Example:
983+
```python
984+
event = langfuse.create_event(name="process-event")
985+
```
986+
"""
987+
timestamp = time_ns()
988+
attributes = create_span_attributes(
989+
input=input,
990+
output=output,
991+
metadata=metadata,
992+
version=version,
993+
level=level,
994+
status_message=status_message,
995+
)
996+
997+
if trace_context:
998+
trace_id = trace_context.get("trace_id", None)
999+
parent_span_id = trace_context.get("parent_span_id", None)
1000+
1001+
if trace_id:
1002+
remote_parent_span = self._create_remote_parent_span(
1003+
trace_id=trace_id, parent_span_id=parent_span_id
1004+
)
1005+
1006+
with otel_trace_api.use_span(
1007+
cast(otel_trace_api.Span, remote_parent_span)
1008+
):
1009+
otel_span = self._otel_tracer.start_span(
1010+
name=name, attributes=attributes, start_time=timestamp
1011+
)
1012+
otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True)
1013+
1014+
return LangfuseEvent(
1015+
otel_span=otel_span,
1016+
langfuse_client=self,
1017+
input=input,
1018+
output=output,
1019+
metadata=metadata,
1020+
environment=self._environment,
1021+
).end(end_time=timestamp)
1022+
1023+
otel_span = self._otel_tracer.start_span(
1024+
name=name, attributes=attributes, start_time=timestamp
1025+
)
1026+
1027+
return LangfuseEvent(
1028+
otel_span=otel_span,
1029+
langfuse_client=self,
1030+
input=input,
1031+
output=output,
1032+
metadata=metadata,
1033+
environment=self._environment,
1034+
).end(end_time=timestamp)
1035+
9481036
def _create_remote_parent_span(
9491037
self, *, trace_id: str, parent_span_id: Optional[str]
9501038
):

langfuse/_client/span.py

Lines changed: 101 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
and scoring integration specific to Langfuse's observability platform.
1414
"""
1515

16-
from abc import ABC, abstractmethod
1716
from datetime import datetime
17+
from time import time_ns
1818
from typing import (
1919
TYPE_CHECKING,
2020
Any,
@@ -45,7 +45,7 @@
4545
from langfuse.types import MapValue, ScoreDataType, SpanLevel
4646

4747

48-
class LangfuseSpanWrapper(ABC):
48+
class LangfuseSpanWrapper:
4949
"""Abstract base class for all Langfuse span types.
5050
5151
This class provides common functionality for all Langfuse span types, including
@@ -64,7 +64,7 @@ def __init__(
6464
*,
6565
otel_span: otel_trace_api.Span,
6666
langfuse_client: "Langfuse",
67-
as_type: Literal["span", "generation"],
67+
as_type: Literal["span", "generation", "event"],
6868
input: Optional[Any] = None,
6969
output: Optional[Any] = None,
7070
metadata: Optional[Any] = None,
@@ -133,18 +133,6 @@ def end(self, *, end_time: Optional[int] = None):
133133

134134
return self
135135

136-
@abstractmethod
137-
def update(self, **kwargs) -> Union["LangfuseSpan", "LangfuseGeneration"]:
138-
"""Update the span with new information.
139-
140-
Abstract method that must be implemented by subclasses to update
141-
the span with new information during its lifecycle.
142-
143-
Args:
144-
**kwargs: Subclass-specific update parameters
145-
"""
146-
pass
147-
148136
def update_trace(
149137
self,
150138
*,
@@ -352,7 +340,7 @@ def _set_processed_span_attributes(
352340
self,
353341
*,
354342
span: otel_trace_api.Span,
355-
as_type: Optional[Literal["span", "generation"]] = None,
343+
as_type: Optional[Literal["span", "generation", "event"]] = None,
356344
input: Optional[Any] = None,
357345
output: Optional[Any] = None,
358346
metadata: Optional[Any] = None,
@@ -934,6 +922,69 @@ def start_as_current_generation(
934922
),
935923
)
936924

925+
def create_event(
926+
self,
927+
*,
928+
name: str,
929+
input: Optional[Any] = None,
930+
output: Optional[Any] = None,
931+
metadata: Optional[Any] = None,
932+
version: Optional[str] = None,
933+
level: Optional[SpanLevel] = None,
934+
status_message: Optional[str] = None,
935+
) -> "LangfuseEvent":
936+
"""Create a new Langfuse observation of type 'EVENT'.
937+
938+
Args:
939+
name: Name of the span (e.g., function or operation name)
940+
input: Input data for the operation (can be any JSON-serializable object)
941+
output: Output data from the operation (can be any JSON-serializable object)
942+
metadata: Additional metadata to associate with the span
943+
version: Version identifier for the code or component
944+
level: Importance level of the span (info, warning, error)
945+
status_message: Optional status message for the span
946+
947+
Returns:
948+
The LangfuseEvent object
949+
950+
Example:
951+
```python
952+
event = langfuse.create_event(name="process-event")
953+
```
954+
"""
955+
timestamp = time_ns()
956+
attributes = create_span_attributes(
957+
input=input,
958+
output=output,
959+
metadata=metadata,
960+
version=version,
961+
level=level,
962+
status_message=status_message,
963+
)
964+
965+
with otel_trace_api.use_span(self._otel_span):
966+
new_otel_span = self._langfuse_client._otel_tracer.start_span(
967+
name=name, attributes=attributes, start_time=timestamp
968+
)
969+
970+
if new_otel_span.is_recording:
971+
self._set_processed_span_attributes(
972+
span=new_otel_span,
973+
as_type="event",
974+
input=input,
975+
output=output,
976+
metadata=metadata,
977+
)
978+
979+
return LangfuseEvent(
980+
otel_span=new_otel_span,
981+
langfuse_client=self._langfuse_client,
982+
input=input,
983+
output=output,
984+
metadata=metadata,
985+
environment=self._environment,
986+
).end(end_time=timestamp)
987+
937988

938989
class LangfuseGeneration(LangfuseSpanWrapper):
939990
"""Specialized span implementation for AI model generations in Langfuse.
@@ -1069,3 +1120,37 @@ def update(
10691120
self._otel_span.set_attributes(attributes=attributes)
10701121

10711122
return self
1123+
1124+
1125+
class LangfuseEvent(LangfuseSpanWrapper):
1126+
"""Specialized span implementation for Langfuse Events."""
1127+
1128+
def __init__(
1129+
self,
1130+
*,
1131+
otel_span: otel_trace_api.Span,
1132+
langfuse_client: "Langfuse",
1133+
input: Optional[Any] = None,
1134+
output: Optional[Any] = None,
1135+
metadata: Optional[Any] = None,
1136+
environment: Optional[str] = None,
1137+
):
1138+
"""Initialize a new LangfuseEvent span.
1139+
1140+
Args:
1141+
otel_span: The OpenTelemetry span to wrap
1142+
langfuse_client: Reference to the parent Langfuse client
1143+
input: Input data for the generation (e.g., prompts)
1144+
output: Output from the generation (e.g., completions)
1145+
metadata: Additional metadata to associate with the generation
1146+
environment: The tracing environment
1147+
"""
1148+
super().__init__(
1149+
otel_span=otel_span,
1150+
as_type="event",
1151+
langfuse_client=langfuse_client,
1152+
input=input,
1153+
output=output,
1154+
metadata=metadata,
1155+
environment=environment,
1156+
)

0 commit comments

Comments
 (0)