From 8d511ccfb940266db0a09e5acf2c78fa91e7a7d5 Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Tue, 25 Feb 2025 15:51:50 +0100
Subject: [PATCH 1/9] feat(client): add native environment support
---
langfuse/api/reference.md | 42 +++++-----
.../api/resources/commons/types/base_score.py | 5 ++
.../resources/commons/types/observation.py | 5 ++
langfuse/api/resources/commons/types/score.py | 3 +
langfuse/api/resources/commons/types/trace.py | 5 ++
langfuse/api/resources/ingestion/client.py | 76 +++++++++++--------
.../score/types/get_scores_response_data.py | 3 +
7 files changed, 83 insertions(+), 56 deletions(-)
diff --git a/langfuse/api/reference.md b/langfuse/api/reference.md
index c0dc254f4..0b6bf964a 100644
--- a/langfuse/api/reference.md
+++ b/langfuse/api/reference.md
@@ -1120,7 +1120,15 @@ client.health.health()
-
-Batched ingestion for Langfuse Tracing. If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+Batched ingestion for Langfuse Tracing.
+If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+
+Within each batch, there can be multiple events.
+Each event has a type, an id, a timestamp, metadata and a body.
+Internally, we refer to this as the "event envelope" as it tells us something about the event but not the trace.
+We use the event id within this envelope to deduplicate messages to avoid processing the same event twice, i.e. the event id should be unique per request.
+The event.body.id is the ID of the actual trace and will be used for updates and will be visible within the Langfuse App.
+I.e. if you want to update a trace, you'd use the same body id, but separate event IDs.
Notes:
@@ -1141,9 +1149,7 @@ Notes:
-
```python
-import datetime
-
-from langfuse import IngestionEvent_TraceCreate, TraceBody
+from langfuse import IngestionEvent_ScoreCreate, ScoreBody
from langfuse.client import FernLangfuse
client = FernLangfuse(
@@ -1156,29 +1162,17 @@ client = FernLangfuse(
)
client.ingestion.batch(
batch=[
- IngestionEvent_TraceCreate(
- body=TraceBody(
- id="string",
- timestamp=datetime.datetime.fromisoformat(
- "2024-01-15 09:30:00+00:00",
- ),
- name="string",
- user_id="string",
- input={"key": "value"},
- output={"key": "value"},
- session_id="string",
- release="string",
- version="string",
- metadata={"key": "value"},
- tags=["string"],
- public=True,
+ IngestionEvent_ScoreCreate(
+ id="abcdef-1234-5678-90ab",
+ timestamp="2022-01-01T00:00:00.000Z",
+ body=ScoreBody(
+ id="abcdef-1234-5678-90ab",
+ trace_id="1234-5678-90ab-cdef",
+ name="My Score",
+ value=0.9,
),
- id="string",
- timestamp="string",
- metadata={"key": "value"},
)
],
- metadata={"key": "value"},
)
```
diff --git a/langfuse/api/resources/commons/types/base_score.py b/langfuse/api/resources/commons/types/base_score.py
index 71bed6ef4..89394956b 100644
--- a/langfuse/api/resources/commons/types/base_score.py
+++ b/langfuse/api/resources/commons/types/base_score.py
@@ -33,6 +33,11 @@ class BaseScore(pydantic_v1.BaseModel):
Reference an annotation queue on a score. Populated if the score was initially created in an annotation queue.
"""
+ environment: typing.Optional[str] = pydantic_v1.Field(default=None)
+ """
+ The environment from which this score originated. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
+ """
+
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
"by_alias": True,
diff --git a/langfuse/api/resources/commons/types/observation.py b/langfuse/api/resources/commons/types/observation.py
index 93fabb754..b821476f9 100644
--- a/langfuse/api/resources/commons/types/observation.py
+++ b/langfuse/api/resources/commons/types/observation.py
@@ -125,6 +125,11 @@ class Observation(pydantic_v1.BaseModel):
The cost details of the observation. Key is the name of the cost metric, value is the cost in USD. The total key is the sum of all (non-total) cost metrics or the total value ingested.
"""
+ environment: typing.Optional[str] = pydantic_v1.Field(default=None)
+ """
+ The environment from which this observation originated. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
+ """
+
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
"by_alias": True,
diff --git a/langfuse/api/resources/commons/types/score.py b/langfuse/api/resources/commons/types/score.py
index e39221084..8eed33b78 100644
--- a/langfuse/api/resources/commons/types/score.py
+++ b/langfuse/api/resources/commons/types/score.py
@@ -28,6 +28,7 @@ class Score_Numeric(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["NUMERIC"] = pydantic_v1.Field(
alias="dataType", default="NUMERIC"
)
@@ -85,6 +86,7 @@ class Score_Categorical(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["CATEGORICAL"] = pydantic_v1.Field(
alias="dataType", default="CATEGORICAL"
)
@@ -142,6 +144,7 @@ class Score_Boolean(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["BOOLEAN"] = pydantic_v1.Field(
alias="dataType", default="BOOLEAN"
)
diff --git a/langfuse/api/resources/commons/types/trace.py b/langfuse/api/resources/commons/types/trace.py
index 9c9be5ae8..d977ed3d7 100644
--- a/langfuse/api/resources/commons/types/trace.py
+++ b/langfuse/api/resources/commons/types/trace.py
@@ -70,6 +70,11 @@ class Trace(pydantic_v1.BaseModel):
Public traces are accessible via url without login
"""
+ environment: typing.Optional[str] = pydantic_v1.Field(default=None)
+ """
+ The environment from which this trace originated. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
+ """
+
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
"by_alias": True,
diff --git a/langfuse/api/resources/ingestion/client.py b/langfuse/api/resources/ingestion/client.py
index cc58c32e4..7fd19ea06 100644
--- a/langfuse/api/resources/ingestion/client.py
+++ b/langfuse/api/resources/ingestion/client.py
@@ -31,7 +31,15 @@ def batch(
request_options: typing.Optional[RequestOptions] = None,
) -> IngestionResponse:
"""
- Batched ingestion for Langfuse Tracing. If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+ Batched ingestion for Langfuse Tracing.
+ If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+
+ Within each batch, there can be multiple events.
+ Each event has a type, an id, a timestamp, metadata and a body.
+ Internally, we refer to this as the "event envelope" as it tells us something about the event but not the trace.
+ We use the event id within this envelope to deduplicate messages to avoid processing the same event twice, i.e. the event id should be unique per request.
+ The event.body.id is the ID of the actual trace and will be used for updates and will be visible within the Langfuse App.
+ I.e. if you want to update a trace, you'd use the same body id, but separate event IDs.
Notes:
@@ -72,28 +80,26 @@ def batch(
client.ingestion.batch(
batch=[
IngestionEvent_TraceCreate(
+ id="abcdef-1234-5678-90ab",
+ timestamp="2022-01-01T00:00:00.000Z",
body=TraceBody(
- id="string",
+ id="abcdef-1234-5678-90ab",
timestamp=datetime.datetime.fromisoformat(
- "2024-01-15 09:30:00+00:00",
+ "2022-01-01 00:00:00+00:00",
),
- name="string",
- user_id="string",
- input={"key": "value"},
- output={"key": "value"},
- session_id="string",
- release="string",
- version="string",
- metadata={"key": "value"},
- tags=["string"],
+ name="My Trace",
+ user_id="1234-5678-90ab-cdef",
+ input="My input",
+ output="My output",
+ session_id="1234-5678-90ab-cdef",
+ release="1.0.0",
+ version="1.0.0",
+ metadata="My metadata",
+ tags=["tag1", "tag2"],
public=True,
),
- id="string",
- timestamp="string",
- metadata={"key": "value"},
)
],
- metadata={"key": "value"},
)
"""
_response = self._client_wrapper.httpx_client.request(
@@ -142,7 +148,15 @@ async def batch(
request_options: typing.Optional[RequestOptions] = None,
) -> IngestionResponse:
"""
- Batched ingestion for Langfuse Tracing. If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+ Batched ingestion for Langfuse Tracing.
+ If you want to use tracing via the API, such as to build your own Langfuse client implementation, this is the only API route you need to implement.
+
+ Within each batch, there can be multiple events.
+ Each event has a type, an id, a timestamp, metadata and a body.
+ Internally, we refer to this as the "event envelope" as it tells us something about the event but not the trace.
+ We use the event id within this envelope to deduplicate messages to avoid processing the same event twice, i.e. the event id should be unique per request.
+ The event.body.id is the ID of the actual trace and will be used for updates and will be visible within the Langfuse App.
+ I.e. if you want to update a trace, you'd use the same body id, but separate event IDs.
Notes:
@@ -187,28 +201,26 @@ async def main() -> None:
await client.ingestion.batch(
batch=[
IngestionEvent_TraceCreate(
+ id="abcdef-1234-5678-90ab",
+ timestamp="2022-01-01T00:00:00.000Z",
body=TraceBody(
- id="string",
+ id="abcdef-1234-5678-90ab",
timestamp=datetime.datetime.fromisoformat(
- "2024-01-15 09:30:00+00:00",
+ "2022-01-01 00:00:00+00:00",
),
- name="string",
- user_id="string",
- input={"key": "value"},
- output={"key": "value"},
- session_id="string",
- release="string",
- version="string",
- metadata={"key": "value"},
- tags=["string"],
+ name="My Trace",
+ user_id="1234-5678-90ab-cdef",
+ input="My input",
+ output="My output",
+ session_id="1234-5678-90ab-cdef",
+ release="1.0.0",
+ version="1.0.0",
+ metadata="My metadata",
+ tags=["tag1", "tag2"],
public=True,
),
- id="string",
- timestamp="string",
- metadata={"key": "value"},
)
],
- metadata={"key": "value"},
)
diff --git a/langfuse/api/resources/score/types/get_scores_response_data.py b/langfuse/api/resources/score/types/get_scores_response_data.py
index e1b317975..5642e0f80 100644
--- a/langfuse/api/resources/score/types/get_scores_response_data.py
+++ b/langfuse/api/resources/score/types/get_scores_response_data.py
@@ -30,6 +30,7 @@ class GetScoresResponseData_Numeric(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["NUMERIC"] = pydantic_v1.Field(
alias="dataType", default="NUMERIC"
)
@@ -88,6 +89,7 @@ class GetScoresResponseData_Categorical(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["CATEGORICAL"] = pydantic_v1.Field(
alias="dataType", default="CATEGORICAL"
)
@@ -146,6 +148,7 @@ class GetScoresResponseData_Boolean(pydantic_v1.BaseModel):
comment: typing.Optional[str] = None
config_id: typing.Optional[str] = pydantic_v1.Field(alias="configId", default=None)
queue_id: typing.Optional[str] = pydantic_v1.Field(alias="queueId", default=None)
+ environment: typing.Optional[str] = None
data_type: typing.Literal["BOOLEAN"] = pydantic_v1.Field(
alias="dataType", default="BOOLEAN"
)
From 5ba7f318fe87549f65158c923601e57520107fc0 Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Tue, 25 Feb 2025 17:36:04 +0100
Subject: [PATCH 2/9] add environment to client
---
langfuse/client.py | 50 +++++++++++++++++++++++++++++++--
tests/test_core_sdk.py | 63 +++++++++++++++++++++++++++++++++++++++++-
2 files changed, 110 insertions(+), 3 deletions(-)
diff --git a/langfuse/client.py b/langfuse/client.py
index 6f1ba5eef..5eff8b87b 100644
--- a/langfuse/client.py
+++ b/langfuse/client.py
@@ -1,6 +1,7 @@
import datetime as dt
import logging
import os
+import re
import time
import tracemalloc
import typing
@@ -86,6 +87,8 @@
from .version import __version__ as version
+ENVIRONMENT_PATTERN = r"^(?!langfuse-)[a-z0-9-_]+$"
+
@dataclass
class FetchTracesResponse:
@@ -185,6 +188,7 @@ def __init__(
enabled: Optional[bool] = True,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
+ environment: Optional[str] = None,
):
"""Initialize the Langfuse client.
@@ -204,6 +208,7 @@ def __init__(
enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.
sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable.
mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
+ environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
Raises:
ValueError: If public_key or secret_key are not set and not found in environment variables.
@@ -287,6 +292,16 @@ def __init__(
else os.environ.get("LANGFUSE_HOST", "https://cloud.langfuse.com")
)
+ self.environment = environment or os.environ.get("LANGFUSE_TRACING_ENVIRONMENT")
+
+ if self.environment and not bool(
+ re.match(ENVIRONMENT_PATTERN, self.environment)
+ ):
+ self.log.warning(
+ f'Invalid environment specified "{environment}" that does not match validation pattern ("{ENVIRONMENT_PATTERN}"). Setting will be ignored.'
+ )
+ self.environment = None
+
self.httpx_client = httpx_client or httpx.Client(timeout=timeout)
public_api_client = FernLangfuse(
@@ -1470,6 +1485,7 @@ def trace(
"tags": tags,
"timestamp": timestamp or _get_timestamp(),
"public": public,
+ "environment": self.environment,
}
if kwargs is not None:
new_dict.update(kwargs)
@@ -1636,6 +1652,7 @@ def score(
"data_type": data_type,
"comment": comment,
"config_id": config_id,
+ "environment": self.environment,
**kwargs,
}
@@ -1659,10 +1676,16 @@ def score(
StateType.OBSERVATION,
trace_id,
self.task_manager,
+ self.environment,
)
else:
return StatefulClient(
- self.client, new_id, StateType.TRACE, new_id, self.task_manager
+ self.client,
+ new_id,
+ StateType.TRACE,
+ new_id,
+ self.task_manager,
+ self.environment,
)
def span(
@@ -1740,6 +1763,7 @@ def span(
"version": version,
"end_time": end_time,
"trace": {"release": self.release},
+ "environment": self.environment,
**kwargs,
}
@@ -1840,6 +1864,7 @@ def event(
"parent_observation_id": parent_observation_id,
"version": version,
"trace": {"release": self.release},
+ "environment": self.environment,
**kwargs,
}
@@ -1969,6 +1994,7 @@ def generation(
"usage_details": usage_details,
"cost_details": cost_details,
"trace": {"release": self.release},
+ "environment": self.environment,
**_create_prompt_context(prompt),
**kwargs,
}
@@ -1978,6 +2004,7 @@ def generation(
"id": new_trace_id,
"release": self.release,
"name": name,
+ "environment": self.environment,
}
request = TraceBody(**trace)
@@ -2020,6 +2047,7 @@ def _generate_trace(self, trace_id: str, name: str):
"id": trace_id,
"release": self.release,
"name": name,
+ "environment": self.environment,
}
trace_body = TraceBody(**trace_dict)
@@ -2108,6 +2136,7 @@ class StatefulClient(object):
state_type (StateType): Enum indicating whether the client is an observation or a trace.
trace_id (str): Id of the trace associated with the stateful client.
task_manager (TaskManager): Manager handling asynchronous tasks for the client.
+ environment (Optional(str)): The tracing environment.
"""
log = logging.getLogger("langfuse")
@@ -2119,6 +2148,7 @@ def __init__(
state_type: StateType,
trace_id: str,
task_manager: TaskManager,
+ environment: Optional[str] = None,
):
"""Initialize the StatefulClient.
@@ -2135,6 +2165,16 @@ def __init__(
self.state_type = state_type
self.task_manager = task_manager
+ self.environment = environment or os.environ.get("LANGFUSE_TRACING_ENVIRONMENT")
+
+ if self.environment and not bool(
+ re.match(ENVIRONMENT_PATTERN, self.environment)
+ ):
+ self.log.warning(
+ f'Invalid environment specified "{environment}" that does not match validation pattern ("{ENVIRONMENT_PATTERN}"). Setting will be ignored.'
+ )
+ self.environment = None
+
def _add_state_to_event(self, body: dict):
if self.state_type == StateType.OBSERVATION:
body["parent_observation_id"] = self.id
@@ -2236,6 +2276,7 @@ def generation(
"usage": _convert_usage_input(usage) if usage is not None else None,
"usage_details": usage_details,
"cost_details": cost_details,
+ "environment": self.environment,
**_create_prompt_context(prompt),
**kwargs,
}
@@ -2328,6 +2369,7 @@ def span(
"status_message": status_message,
"version": version,
"end_time": end_time,
+ "environment": self.environment,
**kwargs,
}
@@ -2435,6 +2477,7 @@ def score(
"data_type": data_type,
"comment": comment,
"config_id": config_id,
+ "environment": self.environment,
**kwargs,
}
@@ -2463,7 +2506,8 @@ def score(
self.id,
self.state_type,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def event(
@@ -2524,6 +2568,7 @@ def event(
"level": level,
"status_message": status_message,
"version": version,
+ "environment": self.environment,
**kwargs,
}
@@ -2552,6 +2597,7 @@ def event(
StateType.OBSERVATION,
self.trace_id,
self.task_manager,
+ self.environment,
)
def get_trace_url(self):
diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py
index a09b7c1f9..f06970d4c 100644
--- a/tests/test_core_sdk.py
+++ b/tests/test_core_sdk.py
@@ -1528,5 +1528,66 @@ def test_generate_trace_id():
trace_url = langfuse.get_trace_url()
assert (
trace_url
- == f"http://localhost:3000/project/7a88fb47-b4e2-43b8-a06c-a5ce950dc53a/traces/{trace_id}"
+ == f"http://localhost:300/project/7a88fb47-b4e2-43b8-a06c-a5ce950dc53a/traces/{trace_id}"
)
+
+
+def test_environment_from_constructor():
+ # Test with valid environment
+ langfuse = Langfuse(debug=True, environment="production")
+ api_wrapper = LangfuseAPI()
+
+ trace = langfuse.trace(name="test_environment")
+ generation = trace.generation(name="test_gen")
+ score_id = create_uuid()
+ langfuse.score(id=score_id, trace_id=trace.id, name="test_score", value=1)
+
+ langfuse.flush()
+ sleep(1)
+
+ fetched_trace = api_wrapper.get_trace(trace.id)
+ assert fetched_trace["environment"] == "production"
+
+ # Check that observations have the environment
+ gen = [o for o in fetched_trace["observations"] if o["id"] == generation.id][0]
+ assert gen["environment"] == "production"
+
+ # Check that scores have the environment
+ assert fetched_trace["scores"][0]["environment"] == "production"
+
+
+def test_environment_from_env_var(monkeypatch):
+ # Test with environment variable
+ monkeypatch.setenv("LANGFUSE_TRACING_ENVIRONMENT", "staging")
+
+ langfuse = Langfuse(debug=True)
+ api_wrapper = LangfuseAPI()
+
+ trace = langfuse.trace(name="test_environment_var")
+ langfuse.flush()
+ sleep(1)
+
+ fetched_trace = api_wrapper.get_trace(trace.id)
+ assert fetched_trace["environment"] == "staging"
+
+ # Test that constructor overrides environment variable
+ langfuse = Langfuse(debug=False, environment="testing")
+ trace = langfuse.trace(name="test_environment_override")
+ langfuse.flush()
+ sleep(1)
+
+ fetched_trace = api_wrapper.get_trace(trace.id)
+ assert fetched_trace["environment"] == "testing"
+
+
+def test_invalid_environment():
+ # Test with invalid environment (too long)
+ langfuse = Langfuse(debug=True, environment="UPPERCASE")
+ api_wrapper = LangfuseAPI()
+
+ trace = langfuse.trace(name="test_invalid_environment")
+ langfuse.flush()
+ sleep(1)
+
+ fetched_trace = api_wrapper.get_trace(trace.id)
+ assert fetched_trace["environment"] is None
From 00dbfea0a2cdefc36941f43f95d4bcee431a2410 Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Tue, 25 Feb 2025 17:51:58 +0100
Subject: [PATCH 3/9] add environment support to integrations
---
langfuse/callback/langchain.py | 2 ++
langfuse/client.py | 2 +-
langfuse/decorators/langfuse_decorator.py | 4 +++-
langfuse/llama_index/_instrumentor.py | 15 +++++++++------
langfuse/openai.py | 6 +++++-
langfuse/utils/base_callback_handler.py | 10 +++++++---
langfuse/utils/langfuse_singleton.py | 2 ++
7 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/langfuse/callback/langchain.py b/langfuse/callback/langchain.py
index c697c718d..668c0fd83 100644
--- a/langfuse/callback/langchain.py
+++ b/langfuse/callback/langchain.py
@@ -87,6 +87,7 @@ def __init__(
sdk_integration: Optional[str] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
+ environment: Optional[str] = None,
) -> None:
LangfuseBaseCallbackHandler.__init__(
self,
@@ -113,6 +114,7 @@ def __init__(
sdk_integration=sdk_integration or "langchain",
sample_rate=sample_rate,
mask=mask,
+ environment=environment,
)
self.runs = {}
diff --git a/langfuse/client.py b/langfuse/client.py
index 5eff8b87b..3c144d20b 100644
--- a/langfuse/client.py
+++ b/langfuse/client.py
@@ -208,7 +208,7 @@ def __init__(
enabled: Enables or disables the Langfuse client. If disabled, all observability calls to the backend will be no-ops.
sample_rate: Sampling rate for tracing. If set to 0.2, only 20% of the data will be sent to the backend. Can be set via `LANGFUSE_SAMPLE_RATE` environment variable.
mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
- environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'.
+ environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can bet set via `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
Raises:
ValueError: If public_key or secret_key are not set and not found in environment variables.
diff --git a/langfuse/decorators/langfuse_decorator.py b/langfuse/decorators/langfuse_decorator.py
index 1116336f3..a6d5743e5 100644
--- a/langfuse/decorators/langfuse_decorator.py
+++ b/langfuse/decorators/langfuse_decorator.py
@@ -1057,6 +1057,7 @@ def configure(
httpx_client: Optional[httpx.Client] = None,
enabled: Optional[bool] = None,
mask: Optional[Callable] = None,
+ environment: Optional[str] = None,
):
"""Configure the Langfuse client.
@@ -1076,7 +1077,7 @@ def configure(
httpx_client: Pass your own httpx client for more customizability of requests.
enabled: Enables or disables the Langfuse client. Defaults to True. If disabled, no observability data will be sent to Langfuse. If data is requested while disabled, an error will be raised.
mask (Callable): Function that masks sensitive information from input and output in log messages.
-
+ environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can bet set via `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
"""
langfuse_singleton = LangfuseSingleton()
langfuse_singleton.reset()
@@ -1095,6 +1096,7 @@ def configure(
httpx_client=httpx_client,
enabled=enabled,
mask=mask,
+ environment=environment,
)
@property
diff --git a/langfuse/llama_index/_instrumentor.py b/langfuse/llama_index/_instrumentor.py
index f32e9b056..eeeff3c04 100644
--- a/langfuse/llama_index/_instrumentor.py
+++ b/langfuse/llama_index/_instrumentor.py
@@ -1,18 +1,18 @@
-import httpx
import uuid
from contextlib import contextmanager
-from typing import Optional, Dict, Any, List
from logging import getLogger
-from langfuse import Langfuse
+from typing import Any, Dict, List, Optional
+import httpx
+
+from langfuse import Langfuse
from langfuse.client import StatefulTraceClient, StateType
-from langfuse.utils.langfuse_singleton import LangfuseSingleton
from langfuse.types import MaskFunction
+from langfuse.utils.langfuse_singleton import LangfuseSingleton
from ._context import InstrumentorContext
-from ._span_handler import LlamaIndexSpanHandler
from ._event_handler import LlamaIndexEventHandler
-
+from ._span_handler import LlamaIndexSpanHandler
try:
from llama_index.core.instrumentation import get_dispatcher
@@ -63,6 +63,7 @@ class LlamaIndexInstrumentor:
enabled (Optional[bool]): Enable/disable the instrumentor
sample_rate (Optional[float]): Sample rate for logging (0.0 to 1.0)
mask (langfuse.types.MaskFunction): Masking function for 'input' and 'output' fields in events. Function must take a single keyword argument `data` and return a serializable, masked version of the data.
+ environment (optional): The tracing environment. Can be any lowercase alphanumeric string with hyphens and underscores that does not start with 'langfuse'. Can bet set via `LANGFUSE_TRACING_ENVIRONMENT` environment variable.
"""
def __init__(
@@ -81,6 +82,7 @@ def __init__(
enabled: Optional[bool] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
+ environment: Optional[str] = None,
):
self._langfuse = LangfuseSingleton().get(
public_key=public_key,
@@ -97,6 +99,7 @@ def __init__(
sample_rate=sample_rate,
mask=mask,
sdk_integration="llama-index_instrumentation",
+ environment=environment,
)
self._span_handler = LlamaIndexSpanHandler(langfuse_client=self._langfuse)
self._event_handler = LlamaIndexEventHandler(langfuse_client=self._langfuse)
diff --git a/langfuse/openai.py b/langfuse/openai.py
index 31206d77e..06405b315 100644
--- a/langfuse/openai.py
+++ b/langfuse/openai.py
@@ -172,7 +172,9 @@ def get_openai_args(self):
# If OpenAI model distillation is enabled, we need to add the metadata to the kwargs
# https://platform.openai.com/docs/guides/distillation
if self.kwargs.get("store", False):
- self.kwargs["metadata"] = {} if self.args.get("metadata", None) is None else self.args["metadata"]
+ self.kwargs["metadata"] = (
+ {} if self.args.get("metadata", None) is None else self.args["metadata"]
+ )
# OpenAI does not support non-string type values in metadata when using
# model distillation feature
@@ -771,6 +773,7 @@ def initialize(self):
enabled=openai.langfuse_enabled,
sdk_integration="openai",
sample_rate=openai.langfuse_sample_rate,
+ environment=openai.langfuse_environment,
)
return self._langfuse
@@ -815,6 +818,7 @@ def register_tracing(self):
setattr(openai, "langfuse_debug", None)
setattr(openai, "langfuse_enabled", True)
setattr(openai, "langfuse_sample_rate", None)
+ setattr(openai, "langfuse_environment", None)
setattr(openai, "langfuse_mask", None)
setattr(openai, "langfuse_auth_check", self.langfuse_auth_check)
setattr(openai, "flush_langfuse", self.flush)
diff --git a/langfuse/utils/base_callback_handler.py b/langfuse/utils/base_callback_handler.py
index 06de96d94..e2f7920aa 100644
--- a/langfuse/utils/base_callback_handler.py
+++ b/langfuse/utils/base_callback_handler.py
@@ -1,10 +1,11 @@
-from typing import Optional, Union, List, Any, Callable
-import httpx
import logging
import os
import warnings
+from typing import Any, Callable, List, Optional, Union
+
+import httpx
-from langfuse.client import Langfuse, StatefulTraceClient, StatefulSpanClient, StateType
+from langfuse.client import Langfuse, StatefulSpanClient, StatefulTraceClient, StateType
class LangfuseBaseCallbackHandler:
@@ -38,6 +39,7 @@ def __init__(
sdk_integration: str,
sample_rate: Optional[float] = None,
mask: Optional[Callable] = None,
+ environment: Optional[str] = None,
) -> None:
self.version = version
self.session_id = session_id
@@ -109,6 +111,8 @@ def __init__(
args["sample_rate"] = prio_sample_rate
if mask is not None:
args["mask"] = mask
+ if environment is not None:
+ args["environment"] = environment
args["sdk_integration"] = sdk_integration
diff --git a/langfuse/utils/langfuse_singleton.py b/langfuse/utils/langfuse_singleton.py
index 94a79327d..0bf0dbd76 100644
--- a/langfuse/utils/langfuse_singleton.py
+++ b/langfuse/utils/langfuse_singleton.py
@@ -37,6 +37,7 @@ def get(
enabled: Optional[bool] = None,
sample_rate: Optional[float] = None,
mask: Optional[MaskFunction] = None,
+ environment: Optional[str] = None,
) -> Langfuse:
if self._langfuse:
return self._langfuse
@@ -61,6 +62,7 @@ def get(
"enabled": enabled,
"sample_rate": sample_rate,
"mask": mask,
+ "environment": environment,
}
self._langfuse = Langfuse(
From 8a29beeca0d7e3a9287e29f7b30e74f2c4e4ec65 Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Tue, 25 Feb 2025 18:01:53 +0100
Subject: [PATCH 4/9] remove setting env to none
---
langfuse/client.py | 5 ++---
tests/test_core_sdk.py | 13 -------------
2 files changed, 2 insertions(+), 16 deletions(-)
diff --git a/langfuse/client.py b/langfuse/client.py
index 3c144d20b..de05439ee 100644
--- a/langfuse/client.py
+++ b/langfuse/client.py
@@ -297,10 +297,9 @@ def __init__(
if self.environment and not bool(
re.match(ENVIRONMENT_PATTERN, self.environment)
):
- self.log.warning(
- f'Invalid environment specified "{environment}" that does not match validation pattern ("{ENVIRONMENT_PATTERN}"). Setting will be ignored.'
+ self.log.error(
+ f'Invalid environment specified "{environment}" that does not match validation pattern ("{ENVIRONMENT_PATTERN}"). Events will be rejected by Langfuse servers.'
)
- self.environment = None
self.httpx_client = httpx_client or httpx.Client(timeout=timeout)
diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py
index f06970d4c..2ae6325ce 100644
--- a/tests/test_core_sdk.py
+++ b/tests/test_core_sdk.py
@@ -1578,16 +1578,3 @@ def test_environment_from_env_var(monkeypatch):
fetched_trace = api_wrapper.get_trace(trace.id)
assert fetched_trace["environment"] == "testing"
-
-
-def test_invalid_environment():
- # Test with invalid environment (too long)
- langfuse = Langfuse(debug=True, environment="UPPERCASE")
- api_wrapper = LangfuseAPI()
-
- trace = langfuse.trace(name="test_invalid_environment")
- langfuse.flush()
- sleep(1)
-
- fetched_trace = api_wrapper.get_trace(trace.id)
- assert fetched_trace["environment"] is None
From 2b5eb26d6c305f924624cc20c3bbe356286def91 Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Tue, 25 Feb 2025 18:11:22 +0100
Subject: [PATCH 5/9] Update tests/test_core_sdk.py
---
tests/test_core_sdk.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py
index 2ae6325ce..7279c66ea 100644
--- a/tests/test_core_sdk.py
+++ b/tests/test_core_sdk.py
@@ -1528,7 +1528,7 @@ def test_generate_trace_id():
trace_url = langfuse.get_trace_url()
assert (
trace_url
- == f"http://localhost:300/project/7a88fb47-b4e2-43b8-a06c-a5ce950dc53a/traces/{trace_id}"
+ == f"http://localhost:3000/project/7a88fb47-b4e2-43b8-a06c-a5ce950dc53a/traces/{trace_id}"
)
From a6361ff3ca59862577e25def99b2d61abd984795 Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Wed, 26 Feb 2025 10:24:24 +0100
Subject: [PATCH 6/9] add environment to ingestion bodies
---
langfuse/api/resources/ingestion/types/observation_body.py | 1 +
.../api/resources/ingestion/types/optional_observation_body.py | 1 +
langfuse/api/resources/ingestion/types/score_body.py | 1 +
langfuse/api/resources/ingestion/types/trace_body.py | 1 +
4 files changed, 4 insertions(+)
diff --git a/langfuse/api/resources/ingestion/types/observation_body.py b/langfuse/api/resources/ingestion/types/observation_body.py
index 254c600da..d191a1f12 100644
--- a/langfuse/api/resources/ingestion/types/observation_body.py
+++ b/langfuse/api/resources/ingestion/types/observation_body.py
@@ -41,6 +41,7 @@ class ObservationBody(pydantic_v1.BaseModel):
parent_observation_id: typing.Optional[str] = pydantic_v1.Field(
alias="parentObservationId", default=None
)
+ environment: typing.Optional[str] = None
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
diff --git a/langfuse/api/resources/ingestion/types/optional_observation_body.py b/langfuse/api/resources/ingestion/types/optional_observation_body.py
index bd7cb5808..7302d30f9 100644
--- a/langfuse/api/resources/ingestion/types/optional_observation_body.py
+++ b/langfuse/api/resources/ingestion/types/optional_observation_body.py
@@ -25,6 +25,7 @@ class OptionalObservationBody(pydantic_v1.BaseModel):
alias="parentObservationId", default=None
)
version: typing.Optional[str] = None
+ environment: typing.Optional[str] = None
def json(self, **kwargs: typing.Any) -> str:
kwargs_with_defaults: typing.Any = {
diff --git a/langfuse/api/resources/ingestion/types/score_body.py b/langfuse/api/resources/ingestion/types/score_body.py
index aad152dd8..dbe2fbbd9 100644
--- a/langfuse/api/resources/ingestion/types/score_body.py
+++ b/langfuse/api/resources/ingestion/types/score_body.py
@@ -25,6 +25,7 @@ class ScoreBody(pydantic_v1.BaseModel):
id: typing.Optional[str] = None
trace_id: str = pydantic_v1.Field(alias="traceId")
name: str
+ environment: typing.Optional[str] = None
value: CreateScoreValue = pydantic_v1.Field()
"""
The value of the score. Must be passed as string for categorical scores, and numeric for boolean and numeric scores. Boolean score values must equal either 1 or 0 (true or false)
diff --git a/langfuse/api/resources/ingestion/types/trace_body.py b/langfuse/api/resources/ingestion/types/trace_body.py
index 42cb4f197..3f5550435 100644
--- a/langfuse/api/resources/ingestion/types/trace_body.py
+++ b/langfuse/api/resources/ingestion/types/trace_body.py
@@ -21,6 +21,7 @@ class TraceBody(pydantic_v1.BaseModel):
version: typing.Optional[str] = None
metadata: typing.Optional[typing.Any] = None
tags: typing.Optional[typing.List[str]] = None
+ environment: typing.Optional[str] = None
public: typing.Optional[bool] = pydantic_v1.Field(default=None)
"""
Make trace publicly accessible via url
From 1670b32a34a258fd196a9cb7f95c2e64de40384a Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Wed, 26 Feb 2025 10:56:21 +0100
Subject: [PATCH 7/9] disallow langfuse
---
langfuse/client.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/langfuse/client.py b/langfuse/client.py
index de05439ee..7c6b358cd 100644
--- a/langfuse/client.py
+++ b/langfuse/client.py
@@ -87,7 +87,7 @@
from .version import __version__ as version
-ENVIRONMENT_PATTERN = r"^(?!langfuse-)[a-z0-9-_]+$"
+ENVIRONMENT_PATTERN = r"^(?!langfuse)[a-z0-9-_]+$"
@dataclass
From 27694a0b09449723f7a48cb0dae5f033c1bc87eb Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Wed, 26 Feb 2025 16:59:41 +0100
Subject: [PATCH 8/9] add environment to stateful clients
---
langfuse/client.py | 38 ++++++++++++++++-------
langfuse/decorators/langfuse_decorator.py | 3 ++
langfuse/llama_index/_event_handler.py | 14 +++++----
langfuse/llama_index/_instrumentor.py | 1 +
langfuse/llama_index/_span_handler.py | 24 +++++++-------
langfuse/llama_index/llama_index.py | 17 +++++-----
tests/test_core_sdk.py | 6 ++++
7 files changed, 68 insertions(+), 35 deletions(-)
diff --git a/langfuse/client.py b/langfuse/client.py
index 7c6b358cd..dc7ed9409 100644
--- a/langfuse/client.py
+++ b/langfuse/client.py
@@ -1508,7 +1508,12 @@ def trace(
self._log_memory_usage()
return StatefulTraceClient(
- self.client, new_id, StateType.TRACE, new_id, self.task_manager
+ self.client,
+ new_id,
+ StateType.TRACE,
+ new_id,
+ self.task_manager,
+ self.environment,
)
def _log_memory_usage(self):
@@ -1792,6 +1797,7 @@ def span(
StateType.OBSERVATION,
new_trace_id,
self.task_manager,
+ self.environment,
)
def event(
@@ -1892,6 +1898,7 @@ def event(
StateType.OBSERVATION,
new_trace_id,
self.task_manager,
+ self.environment,
)
def generation(
@@ -2039,6 +2046,7 @@ def generation(
StateType.OBSERVATION,
new_trace_id,
self.task_manager,
+ self.environment,
)
def _generate_trace(self, trace_id: str, name: str):
@@ -2172,7 +2180,6 @@ def __init__(
self.log.warning(
f'Invalid environment specified "{environment}" that does not match validation pattern ("{ENVIRONMENT_PATTERN}"). Setting will be ignored.'
)
- self.environment = None
def _add_state_to_event(self, body: dict):
if self.state_type == StateType.OBSERVATION:
@@ -2304,7 +2311,8 @@ def generation(
generation_id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def span(
@@ -2394,7 +2402,8 @@ def span(
span_id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
@overload
@@ -2627,9 +2636,10 @@ def __init__(
state_type: StateType,
trace_id: str,
task_manager: TaskManager,
+ environment: Optional[str] = None,
):
"""Initialize the StatefulGenerationClient."""
- super().__init__(client, id, state_type, trace_id, task_manager)
+ super().__init__(client, id, state_type, trace_id, task_manager, environment)
# WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
def update(
@@ -2741,7 +2751,8 @@ def update(
self.id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def end(
@@ -2846,9 +2857,10 @@ def __init__(
state_type: StateType,
trace_id: str,
task_manager: TaskManager,
+ environment: Optional[str] = None,
):
"""Initialize the StatefulSpanClient."""
- super().__init__(client, id, state_type, trace_id, task_manager)
+ super().__init__(client, id, state_type, trace_id, task_manager, environment)
# WHEN CHANGING THIS METHOD, UPDATE END() FUNCTION ACCORDINGLY
def update(
@@ -2932,7 +2944,8 @@ def update(
self.id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def end(
@@ -3005,7 +3018,8 @@ def end(
self.id,
StateType.OBSERVATION,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def get_langchain_handler(self, update_parent: bool = False):
@@ -3044,9 +3058,10 @@ def __init__(
state_type: StateType,
trace_id: str,
task_manager: TaskManager,
+ environment: Optional[str] = None,
):
"""Initialize the StatefulTraceClient."""
- super().__init__(client, id, state_type, trace_id, task_manager)
+ super().__init__(client, id, state_type, trace_id, task_manager, environment)
self.task_manager = task_manager
def update(
@@ -3137,7 +3152,8 @@ def update(
self.id,
StateType.TRACE,
self.trace_id,
- task_manager=self.task_manager,
+ self.task_manager,
+ self.environment,
)
def get_langchain_handler(self, update_parent: bool = False):
diff --git a/langfuse/decorators/langfuse_decorator.py b/langfuse/decorators/langfuse_decorator.py
index a6d5743e5..9145e83c8 100644
--- a/langfuse/decorators/langfuse_decorator.py
+++ b/langfuse/decorators/langfuse_decorator.py
@@ -344,6 +344,7 @@ def _prepare_call(
task_manager=self.client_instance.task_manager,
client=self.client_instance.client,
state_type=StateType.OBSERVATION,
+ environment=self.client_instance.environment,
)
self._set_root_trace_id(provided_parent_trace_id)
@@ -354,6 +355,7 @@ def _prepare_call(
task_manager=self.client_instance.task_manager,
client=self.client_instance.client,
state_type=StateType.TRACE,
+ environment=self.client_instance.environment,
)
self._set_root_trace_id(provided_parent_trace_id)
@@ -490,6 +492,7 @@ def _handle_call_result(
task_manager=self.client_instance.task_manager,
client=self.client_instance.client,
state_type=StateType.TRACE,
+ environment=self.client_instance.environment,
)
trace_client.update(**observation_params)
diff --git a/langfuse/llama_index/_event_handler.py b/langfuse/llama_index/_event_handler.py
index c299643e3..cb9015d68 100644
--- a/langfuse/llama_index/_event_handler.py
+++ b/langfuse/llama_index/_event_handler.py
@@ -1,4 +1,5 @@
-from typing import Optional, Any, Union, Mapping
+from typing import Any, Mapping, Optional, Union
+from uuid import uuid4 as create_uuid
from langfuse.client import (
Langfuse,
@@ -6,25 +7,25 @@
StateType,
)
from langfuse.utils import _get_timestamp
+
from ._context import InstrumentorContext
-from uuid import uuid4 as create_uuid
try:
from llama_index.core.base.llms.types import (
ChatResponse,
CompletionResponse,
)
+ from llama_index.core.instrumentation.event_handlers import BaseEventHandler
from llama_index.core.instrumentation.events import BaseEvent
from llama_index.core.instrumentation.events.embedding import (
- EmbeddingStartEvent,
EmbeddingEndEvent,
+ EmbeddingStartEvent,
)
- from llama_index.core.instrumentation.event_handlers import BaseEventHandler
from llama_index.core.instrumentation.events.llm import (
- LLMCompletionEndEvent,
- LLMCompletionStartEvent,
LLMChatEndEvent,
LLMChatStartEvent,
+ LLMCompletionEndEvent,
+ LLMCompletionStartEvent,
)
from llama_index.core.utilities.token_counting import TokenCounter
@@ -148,6 +149,7 @@ def _get_generation_client(self, id: str) -> StatefulGenerationClient:
trace_id=trace_id,
task_manager=self._langfuse.task_manager,
state_type=StateType.OBSERVATION,
+ environment=self._langfuse.environment,
)
diff --git a/langfuse/llama_index/_instrumentor.py b/langfuse/llama_index/_instrumentor.py
index eeeff3c04..c8bb760d5 100644
--- a/langfuse/llama_index/_instrumentor.py
+++ b/langfuse/llama_index/_instrumentor.py
@@ -288,6 +288,7 @@ def _get_trace_client(self, trace_id: str) -> StatefulTraceClient:
trace_id=trace_id,
task_manager=self._langfuse.task_manager,
state_type=StateType.TRACE,
+ environment=self._langfuse.environment,
)
@property
diff --git a/langfuse/llama_index/_span_handler.py b/langfuse/llama_index/_span_handler.py
index 14387d5aa..d3ff91a88 100644
--- a/langfuse/llama_index/_span_handler.py
+++ b/langfuse/llama_index/_span_handler.py
@@ -1,32 +1,32 @@
import inspect
-from typing import Optional, Any, Tuple, Generator, AsyncGenerator
import uuid
+from logging import getLogger
+from typing import Any, AsyncGenerator, Generator, Optional, Tuple
+
+from pydantic import BaseModel
from langfuse.client import (
Langfuse,
- StatefulSpanClient,
+ StatefulClient,
StatefulGenerationClient,
+ StatefulSpanClient,
StateType,
- StatefulClient,
)
-from logging import getLogger
-from pydantic import BaseModel
-
from ._context import InstrumentorContext
logger = getLogger(__name__)
try:
- from llama_index.core.instrumentation.span_handlers import BaseSpanHandler
- from llama_index.core.instrumentation.span import BaseSpan
- from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.core.base.base_query_engine import BaseQueryEngine
- from llama_index.core.llms import LLM, ChatResponse
+ from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.core.base.response.schema import (
- StreamingResponse,
AsyncStreamingResponse,
+ StreamingResponse,
)
+ from llama_index.core.instrumentation.span import BaseSpan
+ from llama_index.core.instrumentation.span_handlers import BaseSpanHandler
+ from llama_index.core.llms import LLM, ChatResponse
from llama_index.core.workflow import Context
except ImportError:
@@ -215,6 +215,7 @@ def _get_generation_client(self, id: str) -> StatefulGenerationClient:
trace_id=trace_id,
task_manager=self._langfuse_client.task_manager,
state_type=StateType.OBSERVATION,
+ environment=self._langfuse_client.environment,
)
def _get_span_client(self, id: str) -> StatefulSpanClient:
@@ -231,6 +232,7 @@ def _get_span_client(self, id: str) -> StatefulSpanClient:
trace_id=trace_id,
task_manager=self._langfuse_client.task_manager,
state_type=StateType.OBSERVATION,
+ environment=self._langfuse_client.environment,
)
def _parse_generation_input(
diff --git a/langfuse/llama_index/llama_index.py b/langfuse/llama_index/llama_index.py
index cb2f28992..b5658880a 100644
--- a/langfuse/llama_index/llama_index.py
+++ b/langfuse/llama_index/llama_index.py
@@ -1,32 +1,34 @@
+import logging
from collections import defaultdict
from contextvars import ContextVar
-from typing import Any, Dict, List, Optional, Union, Tuple, Callable, Generator
+from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union
from uuid import uuid4
-import logging
+
import httpx
+from pydantic import BaseModel
from langfuse.client import (
+ StatefulGenerationClient,
StatefulSpanClient,
StatefulTraceClient,
- StatefulGenerationClient,
StateType,
)
+from langfuse.types import TraceMetadata
+from langfuse.utils.base_callback_handler import LangfuseBaseCallbackHandler
from langfuse.utils.error_logging import (
auto_decorate_methods_with,
catch_and_log_errors,
)
-from langfuse.types import TraceMetadata
-from langfuse.utils.base_callback_handler import LangfuseBaseCallbackHandler
+
from .utils import CallbackEvent, ParsedLLMEndPayload
-from pydantic import BaseModel
try:
from llama_index.core.callbacks.base_handler import (
BaseCallbackHandler as LlamaIndexBaseCallbackHandler,
)
from llama_index.core.callbacks.schema import (
- CBEventType,
BASE_TRACE_EVENT,
+ CBEventType,
EventPayload,
)
from llama_index.core.utilities.token_counting import TokenCounter
@@ -165,6 +167,7 @@ def set_root(
StateType.TRACE,
root.trace_id,
root.task_manager,
+ root.environment,
)
self._task_manager = root.task_manager
diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py
index 7279c66ea..38aea9a7a 100644
--- a/tests/test_core_sdk.py
+++ b/tests/test_core_sdk.py
@@ -1538,7 +1538,13 @@ def test_environment_from_constructor():
api_wrapper = LangfuseAPI()
trace = langfuse.trace(name="test_environment")
+ sleep(0.1)
+ trace.update(name="updated_name")
+
generation = trace.generation(name="test_gen")
+ sleep(0.1)
+ generation.update(name="test_gen_1")
+
score_id = create_uuid()
langfuse.score(id=score_id, trace_id=trace.id, name="test_score", value=1)
From 8fd01cd291ae06aa9db3cf0e803fb261a997abf7 Mon Sep 17 00:00:00 2001
From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com>
Date: Wed, 26 Feb 2025 18:29:21 +0100
Subject: [PATCH 9/9] fix flaky test
---
tests/test_langchain.py | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git a/tests/test_langchain.py b/tests/test_langchain.py
index 90cdec1f2..56a56f69d 100644
--- a/tests/test_langchain.py
+++ b/tests/test_langchain.py
@@ -2339,6 +2339,8 @@ def test_cached_token_usage():
config = {"callbacks": [handler]}
chain.invoke({"test_param": "in a funny way"}, config)
+ chain.invoke({"test_param": "in a funny way"}, config)
+ sleep(1)
# invoke again to force cached token usage
chain.invoke({"test_param": "in a funny way"}, config)
@@ -2359,8 +2361,11 @@ def test_cached_token_usage():
assert generation.cost_details["input_cache_read"] > 0
assert (
- generation.cost_details["input"]
- + generation.cost_details["input_cache_read"]
- + generation.cost_details["output"]
- == generation.cost_details["total"]
+ abs(
+ generation.cost_details["input"]
+ + generation.cost_details["input_cache_read"]
+ + generation.cost_details["output"]
+ - generation.cost_details["total"]
+ )
+ < 0.0001
)