diff --git a/langfuse/api/reference.md b/langfuse/api/reference.md
index c0dc254f4..0b6bf964a 100644
--- a/langfuse/api/reference.md
+++ b/langfuse/api/reference.md
@@ -1120,7 +1120,15 @@ client.health.health()
-
-Batched ingestion for Langfuse Tracing. If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+Batched ingestion for Langfuse Tracing.
+If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+
+Within each batch, there can be multiple events.
+Each event has a type, an id, a timestamp, metadata, and a body.
+Internally, we refer to this as the "event envelope", as it tells us something about the event but not the trace.
+We use the event id within this envelope to deduplicate messages and avoid processing the same event twice, i.e. the event id should be unique per request.
+The event.body.id is the ID of the actual trace; it is used for updates and is visible within the Langfuse App.
+For example, to update a trace, you would use the same body id, but separate event ids.
Notes:
@@ -1141,9 +1149,7 @@ Notes:
-
```python
-import datetime
-
-from langfuse import IngestionEvent_TraceCreate, TraceBody
+from langfuse import IngestionEvent_ScoreCreate, ScoreBody
from langfuse.client import FernLangfuse
client = FernLangfuse(
@@ -1156,29 +1162,17 @@ client = FernLangfuse(
)
client.ingestion.batch(
batch=[
- IngestionEvent_TraceCreate(
- body=TraceBody(
- id="string",
- timestamp=datetime.datetime.fromisoformat(
- "2024-01-15 09:30:00+00:00",
- ),
- name="string",
- user_id="string",
- input={"key": "value"},
- output={"key": "value"},
- session_id="string",
- release="string",
- version="string",
- metadata={"key": "value"},
- tags=["string"],
- public=True,
+ IngestionEvent_ScoreCreate(
+ id="abcdef-1234-5678-90ab",
+ timestamp="2022-01-01T00:00:00.000Z",
+ body=ScoreBody(
+ id="abcdef-1234-5678-90ab",
+ trace_id="1234-5678-90ab-cdef",
+ name="My Score",
+ value=0.9,
),
- id="string",
- timestamp="string",
- metadata={"key": "value"},
)
],
- metadata={"key": "value"},
)
```
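
The docstring above distinguishes the envelope id from the body id. The following is a minimal sketch of that distinction for trace upserts; the ids are illustrative, the credentials are placeholders, and the client is constructed the same way as in the examples above:

```python
from langfuse import IngestionEvent_TraceCreate, TraceBody
from langfuse.client import FernLangfuse

client = FernLangfuse(
    x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME",
    x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION",
    x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY",
    username="YOUR_USERNAME",
    password="YOUR_PASSWORD",
    base_url="https://cloud.langfuse.com",
)

trace_id = "1234-5678-90ab-cdef"  # body id: identifies the trace itself

client.ingestion.batch(
    batch=[
        # Creates the trace.
        IngestionEvent_TraceCreate(
            id="event-id-1",  # envelope id, used only for deduplication
            timestamp="2022-01-01T00:00:00.000Z",
            body=TraceBody(id=trace_id, name="My Trace"),
        ),
        # Updates the same trace: same body id, fresh envelope id.
        IngestionEvent_TraceCreate(
            id="event-id-2",
            timestamp="2022-01-01T00:00:01.000Z",
            body=TraceBody(id=trace_id, name="My Trace", public=True),
        ),
    ],
)
```
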
diff --git a/langfuse/api/resources/commons/types/base_score.py b/langfuse/api/resources/commons/types/base_score.py
index 71bed6ef4..89394956b 100644
--- a/langfuse/api/resources/commons/types/base_score.py
+++ b/langfuse/api/resources/commons/types/base_score.py
@@ -33,6 +33,11 @@ class BaseScore(pydantic_v1.BaseModel):
Reference an annotation queue on a score. Populated if the score was initially created in an annotation queue.
"""
+ environment: typing.Optional[str] = pydantic_v1.Field(default=None)
+ """
+ The environment from which this score originated. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
+ """
+
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
"by_alias": True,
diff --git a/langfuse/api/resources/commons/types/observation.py b/langfuse/api/resources/commons/types/observation.py
index 93fabb754..b821476f9 100644
--- a/langfuse/api/resources/commons/types/observation.py
+++ b/langfuse/api/resources/commons/types/observation.py
@@ -125,6 +125,11 @@ class Observation(pydantic_v1.BaseModel):
The cost details of the observation. Key is the name of the cost metric, value is the cost in USD. The total key is the sum of all (non-total) cost metrics or the total value ingested.
"""
+ environment: typing.Optional[str] = pydantic_v1.Field(default=None)
+ """
+ The environment from which this observation originated. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
+ """
+
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
"by_alias": True,
diff --git a/langfuse/api/resources/commons/types/score.py b/langfuse/api/resources/commons/types/score.py
index e39221084..8eed33b78 100644
--- a/langfuse/api/resources/commons/types/score.py
+++ b/langfuse/api/resources/commons/types/score.py
@@ -28,6 +28,7 @@ class Score_Numeric(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["NUMERIC"] = pydantic_v1.Field(
alias="dataType", default="NUMERIC"
)
@@ -85,6 +86,7 @@ class Score_Categorical(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["CATEGORICAL"] = pydantic_v1.Field(
alias="dataType", default="CATEGORICAL"
)
@@ -142,6 +144,7 @@ class Score_Boolean(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["BOOLEAN"] = pydantic_v1.Field(
alias="dataType", default="BOOLEAN"
)
diff --git a/langfuse/api/resources/commons/types/trace.py b/langfuse/api/resources/commons/types/trace.py
index 9c9be5ae8..d977ed3d7 100644
--- a/langfuse/api/resources/commons/types/trace.py
+++ b/langfuse/api/resources/commons/types/trace.py
@@ -70,6 +70,11 @@ class Trace(pydantic_v1.BaseModel):
Public traces are accessible via url without login
"""
+ environment: typing.Optional[str] = pydantic_v1.Field(default=None)
+ """
+ The environment from which this trace originated. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
+ """
+
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
"by_alias": True,
diff --git a/langfuse/api/resources/ingestion/client.py b/langfuse/api/resources/ingestion/client.py
index cc58c32e4..7fd19ea06 100644
--- a/langfuse/api/resources/ingestion/client.py
+++ b/langfuse/api/resources/ingestion/client.py
@@ -31,7 +31,15 @@ def batch(
request_options: typing.Optional[RequestOptions] = None,
) -> IngestionResponse:
"""
- Batched ingestion for Langfuse Tracing. If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+ Batched ingestion for Langfuse Tracing.
+ If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+
+ Within each batch, there can be multiple events.
+    Each event has a type, an id, a timestamp, metadata, and a body.
+    Internally, we refer to this as the "event envelope", as it tells us something about the event but not the trace.
+    We use the event id within this envelope to deduplicate messages and avoid processing the same event twice, i.e. the event id should be unique per request.
+    The event.body.id is the ID of the actual trace; it is used for updates and is visible within the Langfuse App.
+    For example, to update a trace, you would use the same body id, but separate event ids.
Notes:
@@ -72,28 +80,26 @@ def batch(
client.ingestion.batch(
batch=[
IngestionEvent_TraceCreate(
+ id="abcdef-1234-5678-90ab",
+ timestamp="2022-01-01T00:00:00.000Z",
body=TraceBody(
- id="string",
+ id="abcdef-1234-5678-90ab",
timestamp=datetime.datetime.fromisoformat(
- "2024-01-15 09:30:00+00:00",
+ "2022-01-01 00:00:00+00:00",
),
- name="string",
- user_id="string",
- input={"key": "value"},
- output={"key": "value"},
- session_id="string",
- release="string",
- version="string",
- metadata={"key": "value"},
- tags=["string"],
+ name="My Trace",
+ user_id="1234-5678-90ab-cdef",
+ input="My input",
+ output="My output",
+ session_id="1234-5678-90ab-cdef",
+ release="1.0.0",
+ version="1.0.0",
+ metadata="My metadata",
+ tags=["tag1", "tag2"],
public=True,
),
- id="string",
- timestamp="string",
- metadata={"key": "value"},
)
],
- metadata={"key": "value"},
)
"""
_response = self._client_wrapper.httpx_client.request(
@@ -142,7 +148,15 @@ async def batch(
request_options: typing.Optional[RequestOptions] = None,
) -> IngestionResponse:
"""
- Batched ingestion for Langfuse Tracing. If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+ Batched ingestion for Langfuse Tracing.
+ If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+
+ Within each batch, there can be multiple events.
+    Each event has a type, an id, a timestamp, metadata, and a body.
+    Internally, we refer to this as the "event envelope", as it tells us something about the event but not the trace.
+    We use the event id within this envelope to deduplicate messages and avoid processing the same event twice, i.e. the event id should be unique per request.
+    The event.body.id is the ID of the actual trace; it is used for updates and is visible within the Langfuse App.
+    For example, to update a trace, you would use the same body id, but separate event ids.
Notes:
@@ -187,28 +201,26 @@ async def main() -> None:
await client.ingestion.batch(
batch=[
IngestionEvent_TraceCreate(
+ id="abcdef-1234-5678-90ab",
+ timestamp="2022-01-01T00:00:00.000Z",
body=TraceBody(
- id="string",
+ id="abcdef-1234-5678-90ab",
timestamp=datetime.datetime.fromisoformat(
- "2024-01-15 09:30:00+00:00",
+ "2022-01-01 00:00:00+00:00",
),
- name="string",
- user_id="string",
- input={"key": "value"},
- output={"key": "value"},
- session_id="string",
- release="string",
- version="string",
- metadata={"key": "value"},
- tags=["string"],
+ name="My Trace",
+ user_id="1234-5678-90ab-cdef",
+ input="My input",
+ output="My output",
+ session_id="1234-5678-90ab-cdef",
+ release="1.0.0",
+ version="1.0.0",
+ metadata="My metadata",
+ tags=["tag1", "tag2"],
public=True,
),
- id="string",
- timestamp="string",
- metadata={"key": "value"},
)
],
- metadata={"key": "value"},
)
diff --git a/langfuse/api/resources/ingestion/types/observation_body.py b/langfuse/api/resources/ingestion/types/observation_body.py
index 254c600da..d191a1f12 100644
--- a/langfuse/api/resources/ingestion/types/observation_body.py
+++ b/langfuse/api/resources/ingestion/types/observation_body.py
@@ -41,6 +41,7 @@ class ObservationBody(pydantic_v1.BaseModel):
parent_observation_id: typing.Optional[str] = pydantic_v1.Field(
alias="parentObservationId", default=None
)
+ environment: typing.Optional[str] = None
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
diff --git a/langfuse/api/resources/ingestion/types/optional_observation_body.py b/langfuse/api/resources/ingestion/types/optional_observation_body.py
index bd7cb5808..7302d30f9 100644
--- a/langfuse/api/resources/ingestion/types/optional_observation_body.py
+++ b/langfuse/api/resources/ingestion/types/optional_observation_body.py
@@ -25,6 +25,7 @@ class OptionalObservationBody(pydantic_v1.BaseModel):
alias="parentObservationId", default=None
)
version: typing.Optional[str] = None
+ environment: typing.Optional[str] = None
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
diff --git a/langfuse/api/resources/ingestion/types/score_body.py b/langfuse/api/resources/ingestion/types/score_body.py
index aad152dd8..dbe2fbbd9 100644
--- a/langfuse/api/resources/ingestion/types/score_body.py
+++ b/langfuse/api/resources/ingestion/types/score_body.py
@@ -25,6 +25,7 @@ class ScoreBody(pydantic_v1.BaseModel):
id: typing.Optional[str] = None
trace_id: str = pydantic_v1.Field(alias="traceId")
name: str
+ environment: typing.Optional[str] = None
value: CreateScoreValue = pydantic_v1.Field()
"""
The value of the score. Must be passed as string for categorical scores, and numeric for boolean and numeric scores. Boolean score values must equal either 1 or 0 (true or false)
diff --git a/langfuse/api/resources/ingestion/types/trace_body.py b/langfuse/api/resources/ingestion/types/trace_body.py
index 42cb4f197..3f5550435 100644
--- a/langfuse/api/resources/ingestion/types/trace_body.py
+++ b/langfuse/api/resources/ingestion/types/trace_body.py
@@ -21,6 +21,7 @@ class TraceBody(pydantic_v1.BaseModel):
version: typing.Optional[str] = None
metadata: typing.Optional[typing.Any] = None
tags: typing.Optional[typing.List[str]] = None
+ environment: typing.Optional[str] = None
public: typing.Optional[bool] = pydantic_v1.Field(default=None)
"""
Make trace publicly accessible via url
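
With the ingestion body types above extended, the environment can also be set explicitly per event when calling the batch endpoint directly. A hedged sketch mirroring the reference example, with placeholder ids and credentials:

```python
from langfuse import IngestionEvent_ScoreCreate, ScoreBody
from langfuse.client import FernLangfuse

client = FernLangfuse(
    x_langfuse_sdk_name="YOUR_X_LANGFUSE_SDK_NAME",
    x_langfuse_sdk_version="YOUR_X_LANGFUSE_SDK_VERSION",
    x_langfuse_public_key="YOUR_X_LANGFUSE_PUBLIC_KEY",
    username="YOUR_USERNAME",
    password="YOUR_PASSWORD",
    base_url="https://cloud.langfuse.com",
)

client.ingestion.batch(
    batch=[
        IngestionEvent_ScoreCreate(
            id="abcdef-1234-5678-90ab",
            timestamp="2022-01-01T00:00:00.000Z",
            body=ScoreBody(
                id="abcdef-1234-5678-90ab",
                trace_id="1234-5678-90ab-cdef",
                name="My Score",
                value=0.9,
                environment="production",  # new optional field
            ),
        )
    ],
)
```
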
diff --git a/langfuse/api/resources/score/types/get_scores_response_data.py b/langfuse/api/resources/score/types/get_scores_response_data.py
index e1b317975..5642e0f80 100644
--- a/langfuse/api/resources/score/types/get_scores_response_data.py
+++ b/langfuse/api/resources/score/types/get_scores_response_data.py
@@ -30,6 +30,7 @@ class GetScoresResponseData_Numeric(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["NUMERIC"] = pydantic_v1.Field(
alias="dataType", default="NUMERIC"
)
@@ -88,6 +89,7 @@ class GetScoresResponseData_Categorical(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["CATEGORICAL"] = pydantic_v1.Field(
alias="dataType", default="CATEGORICAL"
)
@@ -146,6 +148,7 @@ class GetScoresResponseData_Boolean(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["BOOLEAN"] = pydantic_v1.Field(
alias="dataType", default="BOOLEAN"
)
diff --git a/langfuse/callback/langchain.py b/langfuse/callback/langchain.py
index c697c718d..668c0fd83 100644
--- a/langfuse/callback/langchain.py
+++ b/langfuse/callback/langchain.py
@@ -87,6 +87,7 @@ def __init__(
sdk_integration: Optional[str] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
+ environment: Optional[str] = None,
) -> None:
LangfuseBaseCallbackHandler.__init__(
self,
@@ -113,6 +114,7 @@ def __init__(
sdk_integration=sdk_integration or "langchain",
sample_rate=sample_rate,
mask=mask,
+ environment=environment,
)
self.runs = {}
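
A brief, hedged usage sketch for the new handler argument; chain construction is elided and credentials are read from the standard environment variables:

```python
from langfuse.callback import CallbackHandler

# The handler forwards `environment` to the underlying Langfuse client,
# so every trace produced through this handler is tagged with it.
handler = CallbackHandler(environment="production")

# Typical LangChain usage (chain definition omitted):
# chain.invoke({"input": "..."}, config={"callbacks": [handler]})
```
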
diff --git a/langfuse/client.py b/langfuse/client.py
index 6f1ba5eef..dc7ed9409 100644
--- a/langfuse/client.py
+++ b/langfuse/client.py
@@ -1,6 +1,7 @@
import datetime as dt
import logging
import os
+import re
import time
import tracemalloc
import typing
@@ -86,6 +87,8 @@
from .version import __version__ as version
+ENVIRONMENT_PATTERN = r"^(?!langfuse)[a-z0-9-_]+$"
+
@dataclass
class FetchTracesResponse:
@@ -185,6 +188,7 @@ def __init__(
enabled: Optional[bool] = True,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
+ environment: Optional[str] = None,
):
"""Initialize the Langfuse client.
@@ -204,6 +208,7 @@ def __init__(
enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.
sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable.
mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
+            environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can be set via the `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
Raises:
ValueError: If public_key or secret_key are not set and not found in environment variables.
@@ -287,6 +292,15 @@ def __init__(
else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
)
+ self.environment = environment or os.environ.get("LANGFUSE_TRACING_ENVIRONMENT")
+
+ if self.environment and not bool(
+ re.match(ENVIRONMENT_PATTERN, self.environment)
+ ):
+ self.log.error(
+                f'Invalid environment specified "{self.environment}" that does not match validation pattern ("{ENVIRONMENT_PATTERN}"). Events will be rejected by Langfuse servers.'
+ )
+
self.httpx_client = httpx_client or httpx.Client(timeout=timeout)
public_api_client = FernLangfuse(
@@ -1470,6 +1484,7 @@ def trace(
"tags": tags,
"timestamp": timestamp or _get_timestamp(),
"public": public,
+ "environment": self.environment,
}
if kwargs is not None:
new_dict.update(kwargs)
@@ -1493,7 +1508,12 @@ def trace(
self._log_memory_usage()
return StatefulTraceClient(
- self.client, new_id, StateType.TRACE, new_id, self.task_manager
+ self.client,
+ new_id,
+ StateType.TRACE,
+ new_id,
+ self.task_manager,
+ self.environment,
)
def _log_memory_usage(self):
@@ -1636,6 +1656,7 @@ def score(
"data_type": data_type,
"comment": comment,
"config_id": config_id,
+ "environment": self.environment,
**kwargs,
}
@@ -1659,10 +1680,16 @@ def score(
StateType.OBSERVATION,
trace_id,
self.task_manager,
+ self.environment,
)
else:
return StatefulClient(
- self.client, new_id, StateType.TRACE, new_id, self.task_manager
+ self.client,
+ new_id,
+ StateType.TRACE,
+ new_id,
+ self.task_manager,
+ self.environment,
)
def span(
@@ -1740,6 +1767,7 @@ def span(
"version": version,
"end_time": end_time,
"trace": {"release": self.release},
+ "environment": self.environment,
**kwargs,
}
@@ -1769,6 +1797,7 @@ def span(
StateType.OBSERVATION,
new_trace_id,
self.task_manager,
+ self.environment,
)
def event(
@@ -1840,6 +1869,7 @@ def event(
"parent_observation_id": parent_observation_id,
"version": version,
"trace": {"release": self.release},
+ "environment": self.environment,
**kwargs,
}
@@ -1868,6 +1898,7 @@ def event(
StateType.OBSERVATION,
new_trace_id,
self.task_manager,
+ self.environment,
)
def generation(
@@ -1969,6 +2000,7 @@ def generation(
"usage_details": usage_details,
"cost_details": cost_details,
"trace": {"release": self.release},
+ "environment": self.environment,
**_create_prompt_context(prompt),
**kwargs,
}
@@ -1978,6 +2010,7 @@ def generation(
"id": new_trace_id,
"release": self.release,
"name": name,
+ "environment": self.environment,
}
request = TraceBody(**trace)
@@ -2013,6 +2046,7 @@ def generation(
StateType.OBSERVATION,
new_trace_id,
self.task_manager,
+ self.environment,
)
def _generate_trace(self, trace_id: str, name: str):
@@ -2020,6 +2054,7 @@ def _generate_trace(self, trace_id: str, name: str):
"id": trace_id,
"release": self.release,
"name": name,
+ "environment": self.environment,
}
trace_body = TraceBody(**trace_dict)
@@ -2108,6 +2143,7 @@ class StatefulClient(object):
state_type (StateType): Enum indicating whether the client is an observation or a trace.
trace_id (str): Id of the trace associated with the stateful client.
task_manager (TaskManager): Manager handling asynchronous tasks for the client.
+        environment (Optional[str]): The tracing environment.
"""
log = logging.getLogger("langfuse")
@@ -2119,6 +2155,7 @@ def __init__(
state_type: StateType,
trace_id: str,
task_manager: TaskManager,
+ environment: Optional[str] = None,
):
"""Initialize the StatefulClient.
@@ -2135,6 +2172,15 @@ def __init__(
self.state_type = state_type
self.task_manager = task_manager
+ self.environment = environment or os.environ.get("LANGFUSE_TRACING_ENVIRONMENT")
+
+ if self.environment and not bool(
+ re.match(ENVIRONMENT_PATTERN, self.environment)
+ ):
+ self.log.warning(
+                f'Invalid environment specified "{self.environment}" that does not match validation pattern ("{ENVIRONMENT_PATTERN}"). Setting will be ignored.'
+ )
+
def _add_state_to_event(self, body: dict):
if self.state_type == StateType.OBSERVATION:
body["parent_observation_id"] = self.id
@@ -2236,6 +2282,7 @@ def generation(
"usage": _convert_usage_input(usage) if usage is not None else None,
"usage_details": usage_details,
"cost_details": cost_details,
+ "environment": self.environment,
**_create_prompt_context(prompt),
**kwargs,
}
@@ -2264,7 +2311,8 @@ def generation(
generation_id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def span(
@@ -2328,6 +2376,7 @@ def span(
"status_message": status_message,
"version": version,
"end_time": end_time,
+ "environment": self.environment,
**kwargs,
}
@@ -2353,7 +2402,8 @@ def span(
span_id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
@overload
@@ -2435,6 +2485,7 @@ def score(
"data_type": data_type,
"comment": comment,
"config_id": config_id,
+ "environment": self.environment,
**kwargs,
}
@@ -2463,7 +2514,8 @@ def score(
self.id,
self.state_type,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def event(
@@ -2524,6 +2576,7 @@ def event(
"level": level,
"status_message": status_message,
"version": version,
+ "environment": self.environment,
**kwargs,
}
@@ -2552,6 +2605,7 @@ def event(
StateType.OBSERVATION,
self.trace_id,
self.task_manager,
+ self.environment,
)
def get_trace_url(self):
@@ -2582,9 +2636,10 @@ def __init__(
state_type: StateType,
trace_id: str,
task_manager: TaskManager,
+ environment: Optional[str] = None,
):
"""Initialize the StatefulGenerationClient."""
- super().__init__(client, id, state_type, trace_id, task_manager)
+ super().__init__(client, id, state_type, trace_id, task_manager, environment)
# WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
def update(
@@ -2696,7 +2751,8 @@ def update(
self.id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def end(
@@ -2801,9 +2857,10 @@ def __init__(
state_type: StateType,
trace_id: str,
task_manager: TaskManager,
+ environment: Optional[str] = None,
):
"""Initialize the StatefulSpanClient."""
- super().__init__(client, id, state_type, trace_id, task_manager)
+ super().__init__(client, id, state_type, trace_id, task_manager, environment)
# WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
def update(
@@ -2887,7 +2944,8 @@ def update(
self.id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def end(
@@ -2960,7 +3018,8 @@ def end(
self.id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def get_langchain_handler(self, update_parent: bool = False):
@@ -2999,9 +3058,10 @@ def __init__(
state_type: StateType,
trace_id: str,
task_manager: TaskManager,
+ environment: Optional[str] = None,
):
"""Initialize the StatefulTraceClient."""
- super().__init__(client, id, state_type, trace_id, task_manager)
+ super().__init__(client, id, state_type, trace_id, task_manager, environment)
self.task_manager = task_manager
def update(
@@ -3092,7 +3152,8 @@ def update(
self.id,
StateType.TRACE,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def get_langchain_handler(self, update_parent: bool = False):
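
To make the validation rule concrete, the following standalone sketch checks a few illustrative names against `ENVIRONMENT_PATTERN` as defined above:

```python
import re

ENVIRONMENT_PATTERN = r"^(?!langfuse)[a-z0-9-_]+$"

for candidate in ["production", "staging-eu_1", "Production", "langfuse-internal"]:
    ok = bool(re.match(ENVIRONMENT_PATTERN, candidate))
    print(f"{candidate!r}: {'valid' if ok else 'rejected'}")

# 'production': valid
# 'staging-eu_1': valid
# 'Production': rejected (uppercase letters are not allowed)
# 'langfuse-internal': rejected (must not start with 'langfuse')
```
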
diff --git a/langfuse/decorators/langfuse_decorator.py b/langfuse/decorators/langfuse_decorator.py
index 1116336f3..9145e83c8 100644
--- a/langfuse/decorators/langfuse_decorator.py
+++ b/langfuse/decorators/langfuse_decorator.py
@@ -344,6 +344,7 @@ def _prepare_call(
task_manager=self.client_instance.task_manager,
client=self.client_instance.client,
state_type=StateType.OBSERVATION,
+ environment=self.client_instance.environment,
)
self._set_root_trace_id(provided_parent_trace_id)
@@ -354,6 +355,7 @@ def _prepare_call(
task_manager=self.client_instance.task_manager,
client=self.client_instance.client,
state_type=StateType.TRACE,
+ environment=self.client_instance.environment,
)
self._set_root_trace_id(provided_parent_trace_id)
@@ -490,6 +492,7 @@ def _handle_call_result(
task_manager=self.client_instance.task_manager,
client=self.client_instance.client,
state_type=StateType.TRACE,
+ environment=self.client_instance.environment,
)
trace_client.update(**observation_params)
@@ -1057,6 +1060,7 @@ def configure(
httpx_client: Optional[httpx.Client] = None,
enabled: Optional[bool] = None,
mask: Optional[Callable] = None,
+ environment: Optional[str] = None,
):
"""Configure the Langfuse client.
@@ -1076,7 +1080,7 @@ def configure(
httpx_client: Pass your own httpx client for more customizability of requests.
enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
mask (Callable): Function that masks sensitive information from input and output in log messages.
-
+            environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can be set via the `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
"""
langfuse_singleton = LangfuseSingleton()
langfuse_singleton.reset()
@@ -1095,6 +1099,7 @@ def configure(
httpx_client=httpx_client,
enabled=enabled,
mask=mask,
+ environment=environment,
)
@property
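
For the decorator API, the new parameter can be passed through `langfuse_context.configure` before the first decorated function runs. A hedged sketch; the decorated function is a stand-in:

```python
from langfuse.decorators import langfuse_context, observe

# Must be called before the first @observe-decorated function executes.
langfuse_context.configure(environment="staging")

@observe()
def handle_request(question: str) -> str:
    return f"echo: {question}"

handle_request("hello")
langfuse_context.flush()
```
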
diff --git a/langfuse/llama_index/_event_handler.py b/langfuse/llama_index/_event_handler.py
index c299643e3..cb9015d68 100644
--- a/langfuse/llama_index/_event_handler.py
+++ b/langfuse/llama_index/_event_handler.py
@@ -1,4 +1,5 @@
-from typing import Optional, Any, Union, Mapping
+from typing import Any, Mapping, Optional, Union
+from uuid import uuid4 as create_uuid
from langfuse.client import (
Langfuse,
@@ -6,25 +7,25 @@
StateType,
)
from langfuse.utils import _get_timestamp
+
from ._context import InstrumentorContext
-from uuid import uuid4 as create_uuid
try:
from llama_index.core.base.llms.types import (
ChatResponse,
CompletionResponse,
)
+ from llama_index.core.instrumentation.event_handlers import BaseEventHandler
from llama_index.core.instrumentation.events import BaseEvent
from llama_index.core.instrumentation.events.embedding import (
- EmbeddingStartEvent,
EmbeddingEndEvent,
+ EmbeddingStartEvent,
)
- from llama_index.core.instrumentation.event_handlers import BaseEventHandler
from llama_index.core.instrumentation.events.llm import (
- LLMCompletionEndEvent,
- LLMCompletionStartEvent,
LLMChatEndEvent,
LLMChatStartEvent,
+ LLMCompletionEndEvent,
+ LLMCompletionStartEvent,
)
from llama_index.core.utilities.token_counting import TokenCounter
@@ -148,6 +149,7 @@ def _get_generation_client(self, id: str) -> StatefulGenerationClient:
trace_id=trace_id,
task_manager=self._langfuse.task_manager,
state_type=StateType.OBSERVATION,
+ environment=self._langfuse.environment,
)
diff --git a/langfuse/llama_index/_instrumentor.py b/langfuse/llama_index/_instrumentor.py
index f32e9b056..c8bb760d5 100644
--- a/langfuse/llama_index/_instrumentor.py
+++ b/langfuse/llama_index/_instrumentor.py
@@ -1,18 +1,18 @@
-import httpx
import uuid
from contextlib import contextmanager
-from typing import Optional, Dict, Any, List
from logging import getLogger
-from langfuse import Langfuse
+from typing import Any, Dict, List, Optional
+import httpx
+
+from langfuse import Langfuse
from langfuse.client import StatefulTraceClient, StateType
-from langfuse.utils.langfuse_singleton import LangfuseSingleton
from langfuse.types import MaskFunction
+from langfuse.utils.langfuse_singleton import LangfuseSingleton
from ._context import InstrumentorContext
-from ._span_handler import LlamaIndexSpanHandler
from ._event_handler import LlamaIndexEventHandler
-
+from ._span_handler import LlamaIndexSpanHandler
try:
from llama_index.core.instrumentation import get_dispatcher
@@ -63,6 +63,7 @@ class LlamaIndexInstrumentor:
enabled (Optional[bool]): Enable/disable the instrumentor
sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
+        environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can be set via the `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
"""
def __init__(
@@ -81,6 +82,7 @@ def __init__(
enabled: Optional[bool] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
+ environment: Optional[str] = None,
):
self._langfuse = LangfuseSingleton().get(
public_key=public_key,
@@ -97,6 +99,7 @@ def __init__(
sample_rate=sample_rate,
mask=mask,
sdk_integration="llama-index_instrumentation",
+ environment=environment,
)
self._span_handler = LlamaIndexSpanHandler(langfuse_client=self._langfuse)
self._event_handler = LlamaIndexEventHandler(langfuse_client=self._langfuse)
@@ -285,6 +288,7 @@ def _get_trace_client(self, trace_id: str) -> StatefulTraceClient:
trace_id=trace_id,
task_manager=self._langfuse.task_manager,
state_type=StateType.TRACE,
+ environment=self._langfuse.environment,
)
@property
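
A short, hedged usage sketch for the instrumentor parameter; the actual LlamaIndex index and query setup is omitted:

```python
from langfuse.llama_index import LlamaIndexInstrumentor

# Traces emitted while instrumentation is active carry this environment.
instrumentor = LlamaIndexInstrumentor(environment="production")
instrumentor.start()

# ... run LlamaIndex queries here ...

instrumentor.flush()
instrumentor.stop()
```
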
diff --git a/langfuse/llama_index/_span_handler.py b/langfuse/llama_index/_span_handler.py
index 14387d5aa..d3ff91a88 100644
--- a/langfuse/llama_index/_span_handler.py
+++ b/langfuse/llama_index/_span_handler.py
@@ -1,32 +1,32 @@
import inspect
-from typing import Optional, Any, Tuple, Generator, AsyncGenerator
import uuid
+from logging import getLogger
+from typing import Any, AsyncGenerator, Generator, Optional, Tuple
+
+from pydantic import BaseModel
from langfuse.client import (
Langfuse,
- StatefulSpanClient,
+ StatefulClient,
StatefulGenerationClient,
+ StatefulSpanClient,
StateType,
- StatefulClient,
)
-from logging import getLogger
-from pydantic import BaseModel
-
from ._context import InstrumentorContext
logger = getLogger(__name__)
try:
- from llama_index.core.instrumentation.span_handlers import BaseSpanHandler
- from llama_index.core.instrumentation.span import BaseSpan
- from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.core.base.base_query_engine import BaseQueryEngine
- from llama_index.core.llms import LLM, ChatResponse
+ from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.core.base.response.schema import (
- StreamingResponse,
AsyncStreamingResponse,
+ StreamingResponse,
)
+ from llama_index.core.instrumentation.span import BaseSpan
+ from llama_index.core.instrumentation.span_handlers import BaseSpanHandler
+ from llama_index.core.llms import LLM, ChatResponse
from llama_index.core.workflow import Context
except ImportError:
@@ -215,6 +215,7 @@ def _get_generation_client(self, id: str) -> StatefulGenerationClient:
trace_id=trace_id,
task_manager=self._langfuse_client.task_manager,
state_type=StateType.OBSERVATION,
+ environment=self._langfuse_client.environment,
)
def _get_span_client(self, id: str) -> StatefulSpanClient:
@@ -231,6 +232,7 @@ def _get_span_client(self, id: str) -> StatefulSpanClient:
trace_id=trace_id,
task_manager=self._langfuse_client.task_manager,
state_type=StateType.OBSERVATION,
+ environment=self._langfuse_client.environment,
)
def _parse_generation_input(
diff --git a/langfuse/llama_index/llama_index.py b/langfuse/llama_index/llama_index.py
index cb2f28992..b5658880a 100644
--- a/langfuse/llama_index/llama_index.py
+++ b/langfuse/llama_index/llama_index.py
@@ -1,32 +1,34 @@
+import logging
from collections import defaultdict
from contextvars import ContextVar
-from typing import Any, Dict, List, Optional, Union, Tuple, Callable, Generator
+from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union
from uuid import uuid4
-import logging
+
import httpx
+from pydantic import BaseModel
from langfuse.client import (
+ StatefulGenerationClient,
StatefulSpanClient,
StatefulTraceClient,
- StatefulGenerationClient,
StateType,
)
+from langfuse.types import TraceMetadata
+from langfuse.utils.base_callback_handler import LangfuseBaseCallbackHandler
from langfuse.utils.error_logging import (
auto_decorate_methods_with,
catch_and_log_errors,
)
-from langfuse.types import TraceMetadata
-from langfuse.utils.base_callback_handler import LangfuseBaseCallbackHandler
+
from .utils import CallbackEvent, ParsedLLMEndPayload
-from pydantic import BaseModel
try:
from llama_index.core.callbacks.base_handler import (
BaseCallbackHandler as LlamaIndexBaseCallbackHandler,
)
from llama_index.core.callbacks.schema import (
- CBEventType,
BASE_TRACE_EVENT,
+ CBEventType,
EventPayload,
)
from llama_index.core.utilities.token_counting import TokenCounter
@@ -165,6 +167,7 @@ def set_root(
StateType.TRACE,
root.trace_id,
root.task_manager,
+ root.environment,
)
self._task_manager = root.task_manager
diff --git a/langfuse/openai.py b/langfuse/openai.py
index 59e7e6c2a..661e2e918 100644
--- a/langfuse/openai.py
+++ b/langfuse/openai.py
@@ -773,6 +773,7 @@ def initialize(self):
enabled=openai.langfuse_enabled,
sdk_integration="openai",
sample_rate=openai.langfuse_sample_rate,
+ environment=openai.langfuse_environment,
mask=openai.langfuse_mask,
)
@@ -818,6 +819,7 @@ def register_tracing(self):
setattr(openai, "langfuse_debug", None)
setattr(openai, "langfuse_enabled", True)
setattr(openai, "langfuse_sample_rate", None)
+ setattr(openai, "langfuse_environment", None)
setattr(openai, "langfuse_mask", None)
setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
setattr(openai, "flush_langfuse", self.flush)
diff --git a/langfuse/utils/base_callback_handler.py b/langfuse/utils/base_callback_handler.py
index 06de96d94..e2f7920aa 100644
--- a/langfuse/utils/base_callback_handler.py
+++ b/langfuse/utils/base_callback_handler.py
@@ -1,10 +1,11 @@
-from typing import Optional, Union, List, Any, Callable
-import httpx
import logging
import os
import warnings
+from typing import Any, Callable, List, Optional, Union
+
+import httpx
-from langfuse.client import Langfuse, StatefulTraceClient, StatefulSpanClient, StateType
+from langfuse.client import Langfuse, StatefulSpanClient, StatefulTraceClient, StateType
class LangfuseBaseCallbackHandler:
@@ -38,6 +39,7 @@ def __init__(
sdk_integration: str,
sample_rate: Optional[float] = None,
mask: Optional[Callable] = None,
+ environment: Optional[str] = None,
) -> None:
self.version = version
self.session_id = session_id
@@ -109,6 +111,8 @@ def __init__(
args["sample_rate"] = prio_sample_rate
if mask is not None:
args["mask"] = mask
+ if environment is not None:
+ args["environment"] = environment
args["sdk_integration"] = sdk_integration
diff --git a/langfuse/utils/langfuse_singleton.py b/langfuse/utils/langfuse_singleton.py
index 94a79327d..0bf0dbd76 100644
--- a/langfuse/utils/langfuse_singleton.py
+++ b/langfuse/utils/langfuse_singleton.py
@@ -37,6 +37,7 @@ def get(
enabled: Optional[bool] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
+ environment: Optional[str] = None,
) -> Langfuse:
if self._langfuse:
return self._langfuse
@@ -61,6 +62,7 @@ def get(
"enabled": enabled,
"sample_rate": sample_rate,
"mask": mask,
+ "environment": environment,
}
self._langfuse = Langfuse(
diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py
index a09b7c1f9..38aea9a7a 100644
--- a/tests/test_core_sdk.py
+++ b/tests/test_core_sdk.py
@@ -1530,3 +1530,57 @@ def test_generate_trace_id():
trace_url
== f"http://localhost:3000/project/7a88fb47-b4e2-43b8-a06c-a5ce950dc53a/traces/{trace_id}"
)
+
+
+def test_environment_from_constructor():
+ # Test with valid environment
+ langfuse = Langfuse(debug=True, environment="production")
+ api_wrapper = LangfuseAPI()
+
+ trace = langfuse.trace(name="test_environment")
+ sleep(0.1)
+ trace.update(name="updated_name")
+
+ generation = trace.generation(name="test_gen")
+ sleep(0.1)
+ generation.update(name="test_gen_1")
+
+ score_id = create_uuid()
+ langfuse.score(id=score_id, trace_id=trace.id, name="test_score", value=1)
+
+ langfuse.flush()
+ sleep(1)
+
+ fetched_trace = api_wrapper.get_trace(trace.id)
+ assert fetched_trace["environment"] == "production"
+
+ # Check that observations have the environment
+ gen = [o for o in fetched_trace["observations"] if o["id"] == generation.id][0]
+ assert gen["environment"] == "production"
+
+ # Check that scores have the environment
+ assert fetched_trace["scores"][0]["environment"] == "production"
+
+
+def test_environment_from_env_var(monkeypatch):
+ # Test with environment variable
+ monkeypatch.setenv("LANGFUSE_TRACING_ENVIRONMENT", "staging")
+
+ langfuse = Langfuse(debug=True)
+ api_wrapper = LangfuseAPI()
+
+ trace = langfuse.trace(name="test_environment_var")
+ langfuse.flush()
+ sleep(1)
+
+ fetched_trace = api_wrapper.get_trace(trace.id)
+ assert fetched_trace["environment"] == "staging"
+
+ # Test that constructor overrides environment variable
+ langfuse = Langfuse(debug=False, environment="testing")
+ trace = langfuse.trace(name="test_environment_override")
+ langfuse.flush()
+ sleep(1)
+
+ fetched_trace = api_wrapper.get_trace(trace.id)
+ assert fetched_trace["environment"] == "testing"
diff --git a/tests/test_langchain.py b/tests/test_langchain.py
index 90cdec1f2..56a56f69d 100644
--- a/tests/test_langchain.py
+++ b/tests/test_langchain.py
@@ -2339,6 +2339,8 @@ def test_cached_token_usage():
config = {"callbacks": [handler]}
chain.invoke({"test_param": "in a funny way"}, config)
+ chain.invoke({"test_param": "in a funny way"}, config)
+ sleep(1)
# invoke again to force cached token usage
chain.invoke({"test_param": "in a funny way"}, config)
@@ -2359,8 +2361,11 @@ def test_cached_token_usage():
assert generation.cost_details["input_cache_read"] > 0
assert (
- generation.cost_details["input"]
- + generation.cost_details["input_cache_read"]
- + generation.cost_details["output"]
- == generation.cost_details["total"]
+ abs(
+ generation.cost_details["input"]
+ + generation.cost_details["input_cache_read"]
+ + generation.cost_details["output"]
+ - generation.cost_details["total"]
+ )
+ < 0.0001
)