Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 19 additions & 20 deletions langfuse/_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

import logging
import os
import warnings
import re
import urllib.parse
import warnings
from datetime import datetime
from hashlib import sha256
from time import time_ns
Expand All @@ -17,8 +17,8 @@
List,
Literal,
Optional,
Union,
Type,
Union,
cast,
overload,
)
Expand All @@ -36,6 +36,13 @@
from packaging.version import Version

from langfuse._client.attributes import LangfuseOtelSpanAttributes
from langfuse._client.constants import (
ObservationTypeGenerationLike,
ObservationTypeLiteral,
ObservationTypeLiteralNoEvent,
ObservationTypeSpanLike,
get_observation_types_list,
)
from langfuse._client.datasets import DatasetClient, DatasetItemClient
from langfuse._client.environment_variables import (
LANGFUSE_DEBUG,
Expand All @@ -47,25 +54,18 @@
LANGFUSE_TRACING_ENABLED,
LANGFUSE_TRACING_ENVIRONMENT,
)
from langfuse._client.constants import (
ObservationTypeLiteral,
ObservationTypeLiteralNoEvent,
ObservationTypeGenerationLike,
ObservationTypeSpanLike,
get_observation_types_list,
)
from langfuse._client.resource_manager import LangfuseResourceManager
from langfuse._client.span import (
LangfuseEvent,
LangfuseGeneration,
LangfuseSpan,
LangfuseAgent,
LangfuseTool,
LangfuseChain,
LangfuseRetriever,
LangfuseEvaluator,
LangfuseEmbedding,
LangfuseEvaluator,
LangfuseEvent,
LangfuseGeneration,
LangfuseGuardrail,
LangfuseRetriever,
LangfuseSpan,
LangfuseTool,
)
from langfuse._utils import _get_timestamp
from langfuse._utils.parse_error import handle_fern_exception
Expand Down Expand Up @@ -2996,11 +2996,10 @@ def _url_encode(self, url: str, *, is_url_param: Optional[bool] = False) -> str:
# we need add safe="" to force escaping of slashes
# This is necessary for prompts in prompt folders
return urllib.parse.quote(url, safe="")

def clear_prompt_cache(self):
"""
Clear the entire prompt cache, removing all cached prompts.


def clear_prompt_cache(self) -> None:
"""Clear the entire prompt cache, removing all cached prompts.

This method is useful when you want to force a complete refresh of all
cached prompts, for example after major updates or when you need to
ensure the latest versions are fetched from the server.
Expand Down
10 changes: 6 additions & 4 deletions langfuse/_utils/prompt_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@

import atexit
import logging
import os
from datetime import datetime
from queue import Empty, Queue
from threading import Thread
from typing import Callable, Dict, List, Optional, Set
import os

from langfuse.model import PromptClient
from langfuse._client.environment_variables import (
LANGFUSE_PROMPT_CACHE_DEFAULT_TTL_SECONDS
LANGFUSE_PROMPT_CACHE_DEFAULT_TTL_SECONDS,
)
from langfuse.model import PromptClient

# Default prompt-cache TTL in seconds. Evaluated once, at import time: the
# value of the environment variable named by
# LANGFUSE_PROMPT_CACHE_DEFAULT_TTL_SECONDS is parsed as an int, falling back
# to 60 when the variable is unset.
DEFAULT_PROMPT_CACHE_TTL_SECONDS = int(
    os.getenv(LANGFUSE_PROMPT_CACHE_DEFAULT_TTL_SECONDS, 60)
)

DEFAULT_PROMPT_CACHE_REFRESH_WORKERS = 1

Expand Down
10 changes: 9 additions & 1 deletion langfuse/langchain/CallbackHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,9 +939,17 @@ def __join_tags_and_metadata(
def _convert_message_to_dict(self, message: BaseMessage) -> Dict[str, Any]:
# assistant message
if isinstance(message, HumanMessage):
message_dict = {"role": "user", "content": message.content}
message_dict: Dict[str, Any] = {"role": "user", "content": message.content}
elif isinstance(message, AIMessage):
message_dict = {"role": "assistant", "content": message.content}

if (
hasattr(message, "tool_calls")
and message.tool_calls is not None
and len(message.tool_calls) > 0
):
message_dict["tool_calls"] = message.tool_calls

elif isinstance(message, SystemMessage):
message_dict = {"role": "system", "content": message.content}
elif isinstance(message, ToolMessage):
Expand Down
233 changes: 1 addition & 232 deletions tests/test_prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
from langfuse.model import ChatPromptClient, TextPromptClient
from tests.utils import create_uuid, get_api

import os
from langfuse._client.environment_variables import LANGFUSE_PROMPT_CACHE_DEFAULT_TTL_SECONDS

def test_create_prompt():
langfuse = Langfuse()
Expand Down Expand Up @@ -681,25 +679,11 @@ def test_prompt_end_to_end():

@pytest.fixture
def langfuse():
    """Provide a Langfuse client whose ``api`` attribute is a ``Mock``.

    Tests using this fixture stub server responses through
    ``langfuse.api.<resource>.<method>.return_value``.

    Returns:
        The Langfuse client with its ``api`` attribute replaced by a ``Mock``.
    """
    # Build the client exactly once; constructing a second client only to
    # overwrite the first leaves a discarded instance behind.
    # NOTE(review): Langfuse() is called without explicit credentials, so it
    # presumably reads them from the environment - confirm for the test setup.
    langfuse_instance = Langfuse()
    langfuse_instance.api = Mock()

    return langfuse_instance

@pytest.fixture
def langfuse_with_override_default_cache():
    """Provide a mocked-API Langfuse client with an overridden default cache TTL.

    The client is built with fixed test credentials and
    ``default_cache_ttl_seconds=OVERRIDE_DEFAULT_PROMPT_CACHE_TTL_SECONDS``;
    its ``api`` attribute is then replaced by a ``Mock``.
    """
    client = Langfuse(
        public_key="test-public-key",
        secret_key="test-secret-key",
        host="https://mock-host.com",
        default_cache_ttl_seconds=OVERRIDE_DEFAULT_PROMPT_CACHE_TTL_SECONDS,
    )
    # Swap the API layer for a Mock so tests can stub responses.
    client.api = Mock()
    return client

# Fetching a new prompt when nothing in cache
def test_get_fresh_prompt(langfuse):
Expand Down Expand Up @@ -1426,218 +1410,3 @@ def test_update_prompt():
expected_labels = sorted(["latest", "doe", "production", "john"])
assert sorted(fetched_prompt.labels) == expected_labels
assert sorted(updated_prompt.labels) == expected_labels


def test_environment_variable_override_prompt_cache_ttl():
    """Test that LANGFUSE_PROMPT_CACHE_DEFAULT_TTL_SECONDS environment variable overrides default TTL.

    The variable is set to "120" for the duration of the test only, and the
    cache clock (``PromptCacheItem.get_epoch_seconds``) is mocked so expiry
    can be stepped through deterministically at t=0, 60, 120 and 121.

    NOTE(review): whether the t=60 / t=120 checks actually distinguish a 120 s
    TTL from a 60 s default depends on how the cache compares the current time
    with the expiry (strict vs. inclusive) and on when the environment
    variable is read - confirm against the prompt cache implementation.
    """
    import os
    from unittest.mock import patch

    # patch.dict restores os.environ on exit even when an assertion fails, so
    # the override can never leak into other tests.
    with patch.dict(os.environ, {LANGFUSE_PROMPT_CACHE_DEFAULT_TTL_SECONDS: "120"}):
        # Create a new Langfuse instance to pick up the environment variable
        langfuse = Langfuse(
            public_key="test-public-key",
            secret_key="test-secret-key",
            host="https://mock-host.com",
        )
        langfuse.api = Mock()

        prompt_name = "test_env_override_ttl"
        prompt = Prompt_Text(
            name=prompt_name,
            version=1,
            prompt="Test prompt with env override",
            type="text",
            labels=[],
            config={},
            tags=[],
        )
        prompt_client = TextPromptClient(prompt)

        mock_server_call = langfuse.api.prompts.get
        mock_server_call.return_value = prompt

        # Mock time to control cache expiration
        with patch.object(PromptCacheItem, "get_epoch_seconds") as mock_time:
            mock_time.return_value = 0

            # First call - should cache the prompt
            result1 = langfuse.get_prompt(prompt_name)
            assert mock_server_call.call_count == 1
            assert result1 == prompt_client

            # Check that prompt is cached
            prompt_cache = langfuse._resources.prompt_cache
            cached_item = prompt_cache.get(
                prompt_cache.generate_cache_key(
                    prompt_name, version=None, label=None
                )
            )
            assert cached_item is not None
            assert cached_item.value == prompt_client

            # Set time to 60 seconds (before new TTL of 120 seconds)
            mock_time.return_value = 60

            # Second call - should still use cache
            result2 = langfuse.get_prompt(prompt_name)
            assert mock_server_call.call_count == 1  # No new server call
            assert result2 == prompt_client

            # Set time to 120 seconds (at TTL expiration)
            mock_time.return_value = 120

            # Third call - should still use cache (stale cache behavior)
            result3 = langfuse.get_prompt(prompt_name)
            assert result3 == prompt_client

            # Wait for background refresh to complete; bounded so a stuck
            # worker fails the test instead of hanging the whole suite.
            for _ in range(100):
                if prompt_cache._task_manager.active_tasks() == 0:
                    break
                sleep(0.1)
            else:
                raise AssertionError(
                    "background prompt refresh did not finish within 10 seconds"
                )

            # Should have made a new server call for refresh
            assert mock_server_call.call_count == 2

            # Set time to 121 seconds (after TTL expiration)
            mock_time.return_value = 121

            # Fourth call - should use refreshed cache
            result4 = langfuse.get_prompt(prompt_name)
            assert result4 == prompt_client


@patch.object(PromptCacheItem, "get_epoch_seconds")
def test_default_ttl_when_environment_variable_not_set(mock_time):
    """Test that default 60-second TTL is used when LANGFUSE_PROMPT_CACHE_DEFAULT_TTL_SECONDS is not set.

    Args:
        mock_time: Mock replacing ``PromptCacheItem.get_epoch_seconds``
            (injected by the ``patch.object`` decorator); its return value is
            stepped to move the cache clock.
    """
    # Set initial time to 0
    mock_time.return_value = 0

    # patch.dict snapshots os.environ and restores it on exit, so removing the
    # variable here cannot permanently alter the environment of later tests.
    with patch.dict(os.environ):
        # Ensure environment variable is not set
        os.environ.pop(LANGFUSE_PROMPT_CACHE_DEFAULT_TTL_SECONDS, None)

        # Create a new Langfuse instance to pick up the default TTL
        langfuse = Langfuse(
            public_key="test-public-key",
            secret_key="test-secret-key",
            host="https://mock-host.com",
        )
        langfuse.api = Mock()

        prompt_name = "test_default_ttl"
        prompt = Prompt_Text(
            name=prompt_name,
            version=1,
            prompt="Test prompt with default TTL",
            type="text",
            labels=[],
            config={},
            tags=[],
        )
        prompt_client = TextPromptClient(prompt)

        mock_server_call = langfuse.api.prompts.get
        mock_server_call.return_value = prompt

        # First call - should cache the prompt
        result1 = langfuse.get_prompt(prompt_name)
        assert mock_server_call.call_count == 1
        assert result1 == prompt_client

        # Check that prompt is cached
        prompt_cache = langfuse._resources.prompt_cache
        cached_item = prompt_cache.get(
            prompt_cache.generate_cache_key(prompt_name, version=None, label=None)
        )
        assert cached_item is not None
        assert cached_item.value == prompt_client

        # Set time to just before default TTL expiration
        mock_time.return_value = DEFAULT_PROMPT_CACHE_TTL_SECONDS - 1

        # Second call - should still use cache
        result2 = langfuse.get_prompt(prompt_name)
        assert mock_server_call.call_count == 1  # No new server call
        assert result2 == prompt_client

        # Set time to just after default TTL expiration to trigger cache expiry
        # Use the actual DEFAULT_PROMPT_CACHE_TTL_SECONDS value that was imported
        mock_time.return_value = DEFAULT_PROMPT_CACHE_TTL_SECONDS + 1

        # Third call - should still use cache (stale cache behavior)
        result3 = langfuse.get_prompt(prompt_name)
        assert result3 == prompt_client

        # Wait for background refresh to complete; bounded so a stuck worker
        # fails the test instead of hanging the whole suite.
        for _ in range(100):
            if prompt_cache._task_manager.active_tasks() == 0:
                break
            sleep(0.1)
        else:
            raise AssertionError(
                "background prompt refresh did not finish within 10 seconds"
            )

        # Should have made a new server call for refresh
        assert mock_server_call.call_count == 2

        # Set time to just after default TTL expiration
        mock_time.return_value = DEFAULT_PROMPT_CACHE_TTL_SECONDS + 1

        # Fourth call - should use refreshed cache
        result4 = langfuse.get_prompt(prompt_name)
        assert result4 == prompt_client


def test_clear_prompt_cache(langfuse):
    """Verify that clear_prompt_cache() empties the prompt cache.

    A prompt is created and fetched through the mocked API so that it lands in
    the cache; after ``clear_prompt_cache()`` the same cache key must no longer
    resolve, while the client objects already handed out remain usable.
    """
    prompt_name = create_uuid()

    # Single stub object served by both the create and the get endpoints.
    stub_prompt = Prompt_Text(
        name=prompt_name,
        version=1,
        prompt="test prompt",
        type="text",
        labels=["production"],
        config={},
        tags=[],
    )
    langfuse.api.prompts.create.return_value = stub_prompt
    langfuse.api.prompts.get.return_value = stub_prompt

    # Create the prompt, then fetch it so it gets cached.
    created = langfuse.create_prompt(
        name=prompt_name,
        prompt="test prompt",
        labels=["production"],
    )
    fetched = langfuse.get_prompt(prompt_name)
    assert fetched.name == prompt_name

    # The entry must be present before the cache is cleared.
    key = f"{prompt_name}-label:production"
    entry_before = langfuse._resources.prompt_cache.get(key)
    assert entry_before is not None, "Prompt should be in cache before clearing"

    # Drop everything from the cache.
    langfuse.clear_prompt_cache()

    # The entry must be gone afterwards.
    entry_after = langfuse._resources.prompt_cache.get(key)
    assert entry_after is None, "Prompt should be removed from cache after clearing"

    # Previously returned clients are unaffected by the clear.
    assert created.name == prompt_name
    assert fetched.name == prompt_name