Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions langfuse/_task_manager/ingestion_consumer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import json
import logging
import os
import threading
import time

from queue import Empty, Queue
from typing import Any, List, Optional

Expand All @@ -21,8 +21,8 @@

from .media_manager import MediaManager

MAX_EVENT_SIZE_BYTES = 1_000_000
MAX_BATCH_SIZE_BYTES = 2_500_000
MAX_EVENT_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_EVENT_SIZE_BYTES", 1_000_000))
MAX_BATCH_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_BATCH_SIZE_BYTES", 2_500_000))
Comment on lines +24 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: No error handling for invalid integer values in environment variables. Could crash on startup if LANGFUSE_MAX_EVENT_SIZE_BYTES or LANGFUSE_MAX_BATCH_SIZE_BYTES contain non-integer values.

Suggested change
MAX_EVENT_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_EVENT_SIZE_BYTES", 1_000_000))
MAX_BATCH_SIZE_BYTES = int(os.environ.get("LANGFUSE_MAX_BATCH_SIZE_BYTES", 2_500_000))
def safe_int_env(key: str, default: int) -> int:
try:
return int(os.environ.get(key, default))
except ValueError:
logging.warning(f"Invalid integer value for {key}, using default {default}")
return default
MAX_EVENT_SIZE_BYTES = safe_int_env("LANGFUSE_MAX_EVENT_SIZE_BYTES", 1_000_000)
MAX_BATCH_SIZE_BYTES = safe_int_env("LANGFUSE_MAX_BATCH_SIZE_BYTES", 2_500_000)



class IngestionMetadata(pydantic.BaseModel):
Expand Down
4 changes: 3 additions & 1 deletion tests/test_langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -2184,7 +2184,9 @@ def _generate_random_dict(n: int, key_length: int = 8) -> Dict[str, Any]:
overhead = duration_with_langfuse - duration_without_langfuse
print(f"Langfuse overhead: {overhead}ms")

assert overhead < 50, f"Langfuse tracing overhead of {overhead}ms exceeds threshold"
assert (
overhead < 100
), f"Langfuse tracing overhead of {overhead}ms exceeds threshold"

handler.flush()

Expand Down