
Commit e409727

test(ci): make test suites v3 ready (#1063)
1 parent b4d3ab9 commit e409727

10 files changed: +341 -341 lines changed

.github/workflows/ci.yml

Lines changed: 8 additions & 2 deletions

@@ -90,7 +90,13 @@ jobs:
          rm -rf .env

          echo "::group::Run server"
-         TELEMETRY_ENABLED=false LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT=http://localhost:9090 LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED=true LANGFUSE_READ_FROM_POSTGRES_ONLY=true LANGFUSE_READ_FROM_CLICKHOUSE_ONLY=false LANGFUSE_RETURN_FROM_CLICKHOUSE=false docker compose up -d
+
+         TELEMETRY_ENABLED=false \
+         LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT=http://localhost:9090 \
+         LANGFUSE_INGESTION_QUEUE_DELAY_MS=10 \
+         LANGFUSE_INGESTION_CLICKHOUSE_WRITE_INTERVAL_MS=10 \
+         docker compose up -d
+
          echo "::endgroup::"

          # Add this step to check the health of the container

@@ -149,7 +155,7 @@ jobs:
      - name: Run the automated tests
        run: |
          python --version
-         poetry run pytest -s -v --log-cli-level=INFO
+         poetry run pytest -n auto -s -v --log-cli-level=INFO

  all-tests-passed:
    # This allows us to have a branch protection rule for tests and deploys with matrix

langfuse/callback/langchain.py

Lines changed: 31 additions & 20 deletions

@@ -1,9 +1,9 @@
- from collections import defaultdict
- import httpx
  import logging
  import typing
  import warnings
+ from collections import defaultdict

+ import httpx
  import pydantic

  try: # Test that langchain is installed before proceeding

@@ -15,35 +15,36 @@
  )
  from typing import Any, Dict, List, Optional, Sequence, Union, cast
  from uuid import UUID, uuid4
+
  from langfuse.api.resources.ingestion.types.sdk_log_body import SdkLogBody
  from langfuse.client import (
+     StatefulGenerationClient,
      StatefulSpanClient,
      StatefulTraceClient,
-     StatefulGenerationClient,
  )
  from langfuse.extract_model import _extract_model_name
+ from langfuse.types import MaskFunction
  from langfuse.utils import _get_timestamp
  from langfuse.utils.base_callback_handler import LangfuseBaseCallbackHandler
- from langfuse.types import MaskFunction

  try:
      from langchain.callbacks.base import (
          BaseCallbackHandler as LangchainBaseCallbackHandler,
      )
      from langchain.schema.agent import AgentAction, AgentFinish
      from langchain.schema.document import Document
-     from langchain_core.outputs import (
-         ChatGeneration,
-         LLMResult,
-     )
      from langchain_core.messages import (
          AIMessage,
          BaseMessage,
          ChatMessage,
+         FunctionMessage,
          HumanMessage,
          SystemMessage,
          ToolMessage,
-         FunctionMessage,
+     )
+     from langchain_core.outputs import (
+         ChatGeneration,
+         LLMResult,
      )
  except ImportError:
      raise ModuleNotFoundError(

@@ -149,7 +150,9 @@ def on_llm_new_token(

          self.updated_completion_start_time_memo.add(run_id)

-     def get_langchain_run_name(self, serialized: Optional[Dict[str, Any]], **kwargs: Any) -> str:
+     def get_langchain_run_name(
+         self, serialized: Optional[Dict[str, Any]], **kwargs: Any
+     ) -> str:
          """Retrieve the name of a serialized LangChain runnable.

          The prioritization for the determination of the run name is as follows:

@@ -1055,16 +1058,24 @@ def _parse_usage_model(usage: typing.Union[pydantic.BaseModel, dict]):
      ]

      usage_model = usage.copy()  # Copy all existing key-value pairs
-     for model_key, langfuse_key in conversion_list:
-         if model_key in usage_model:
-             captured_count = usage_model.pop(model_key)
-             final_count = (
-                 sum(captured_count)
-                 if isinstance(captured_count, list)
-                 else captured_count
-             )  # For Bedrock, the token count is a list when streamed
-
-             usage_model[langfuse_key] = final_count  # Translate key and keep the value
+
+     # Skip OpenAI usage types as they are handled server side
+     if not all(
+         openai_key in usage_model
+         for openai_key in ["prompt_tokens", "completion_tokens", "total_tokens"]
+     ):
+         for model_key, langfuse_key in conversion_list:
+             if model_key in usage_model:
+                 captured_count = usage_model.pop(model_key)
+                 final_count = (
+                     sum(captured_count)
+                     if isinstance(captured_count, list)
+                     else captured_count
+                 )  # For Bedrock, the token count is a list when streamed
+
+                 usage_model[langfuse_key] = (
+                     final_count  # Translate key and keep the value
+                 )

      if isinstance(usage_model, dict):
          if "input_token_details" in usage_model:
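The reworked hunk above only remaps vendor-specific usage keys when the payload is not already in OpenAI's prompt_tokens/completion_tokens/total_tokens shape, since, per the inline comment, those are handled server side; list-valued counts (Bedrock streaming) are still summed before the key is translated. A minimal standalone sketch of that branching, using a hypothetical two-entry conversion list rather than the full one defined earlier in the file:

    def translate_usage(usage_model, conversion_list):
        """Mirror of the branching added above (illustrative, not the library code)."""
        usage_model = usage_model.copy()

        # OpenAI-style usage is left untouched; the Langfuse server handles it.
        if not all(
            openai_key in usage_model
            for openai_key in ["prompt_tokens", "completion_tokens", "total_tokens"]
        ):
            for model_key, langfuse_key in conversion_list:
                if model_key in usage_model:
                    captured_count = usage_model.pop(model_key)
                    # For Bedrock, streamed responses report token counts as lists.
                    final_count = (
                        sum(captured_count)
                        if isinstance(captured_count, list)
                        else captured_count
                    )
                    usage_model[langfuse_key] = final_count
        return usage_model


    # Hypothetical key mapping; the real conversion_list is not shown in this diff.
    conversion = [("inputTokenCount", "input"), ("outputTokenCount", "output")]

    # OpenAI-style payload: returned unchanged.
    print(translate_usage(
        {"prompt_tokens": 5, "completion_tokens": 7, "total_tokens": 12}, conversion
    ))
    # -> {'prompt_tokens': 5, 'completion_tokens': 7, 'total_tokens': 12}

    # Bedrock-style streamed payload: list counts are summed and keys remapped.
    print(translate_usage({"inputTokenCount": [3, 4], "outputTokenCount": 9}, conversion))
    # -> {'input': 7, 'output': 9}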

tests/api_wrapper.py

Lines changed: 5 additions & 0 deletions

@@ -1,4 +1,5 @@
  import os
+ from time import sleep

  import httpx

@@ -11,23 +12,27 @@ def __init__(self, username=None, password=None, base_url=None):
      self.BASE_URL = base_url if base_url else os.environ["LANGFUSE_HOST"]

  def get_observation(self, observation_id):
+     sleep(1)
      url = f"{self.BASE_URL}/api/public/observations/{observation_id}"
      response = httpx.get(url, auth=self.auth)
      return response.json()

  def get_scores(self, page=None, limit=None, user_id=None, name=None):
+     sleep(1)
      params = {"page": page, "limit": limit, "userId": user_id, "name": name}
      url = f"{self.BASE_URL}/api/public/scores"
      response = httpx.get(url, params=params, auth=self.auth)
      return response.json()

  def get_traces(self, page=None, limit=None, user_id=None, name=None):
+     sleep(1)
      params = {"page": page, "limit": limit, "userId": user_id, "name": name}
      url = f"{self.BASE_URL}/api/public/traces"
      response = httpx.get(url, params=params, auth=self.auth)
      return response.json()

  def get_trace(self, trace_id):
+     sleep(1)
      url = f"{self.BASE_URL}/api/public/traces/{trace_id}"
      response = httpx.get(url, auth=self.auth)
      return response.json()
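The new sleep(1) before every read gives the server's asynchronous ingestion, which the CI variables above appear to tune down to 10 ms batching, a moment to persist data before a test asserts on it. A usage sketch of the wrapper in a test, assuming the class in tests/api_wrapper.py is named LangfuseAPI and that LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_HOST are set in the environment; neither the class name nor the key variables are visible in this hunk, so treat them as illustrative:

    import os

    from langfuse import Langfuse  # v2 Python SDK client used by this test suite

    from tests.api_wrapper import LangfuseAPI  # class name assumed; not shown in the hunk

    # Write a trace through the SDK and force delivery to the server.
    langfuse = Langfuse()  # reads LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY / LANGFUSE_HOST
    trace = langfuse.trace(name="api-wrapper-demo")
    langfuse.flush()

    # Read it back through the public API; get_trace() now sleeps 1 s internally,
    # giving the asynchronous ingestion worker time to persist the event.
    api = LangfuseAPI(
        username=os.environ["LANGFUSE_PUBLIC_KEY"],
        password=os.environ["LANGFUSE_SECRET_KEY"],
    )
    fetched = api.get_trace(trace.id)
    assert fetched["id"] == trace.id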
