From 9ee2ef8a46a40c59fd13bf9e65ef711c697d364c Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 10 Jun 2025 15:00:35 +0200 Subject: [PATCH 1/8] fix(masking): scope mask function to resources singleton --- langfuse/_client/client.py | 3 ++- langfuse/_client/resource_manager.py | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 74e99a26f..f102e8f40 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -162,7 +162,6 @@ def __init__( ): self._host = host or os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") self._environment = environment or os.environ.get(LANGFUSE_TRACING_ENVIRONMENT) - self._mask = mask self._project_id = None sample_rate = sample_rate or float(os.environ.get(LANGFUSE_SAMPLE_RATE, 1.0)) if not 0.0 <= sample_rate <= 1.0: @@ -219,7 +218,9 @@ def __init__( httpx_client=httpx_client, media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, + mask=mask, ) + self._mask = self._resources.mask self._otel_tracer = ( self._resources.tracer diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 14cd84ec9..974bc91d5 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -18,7 +18,7 @@ import os import threading from queue import Full, Queue -from typing import Dict, Optional, cast, Any +from typing import Any, Dict, Optional, cast import httpx from opentelemetry import trace as otel_trace_api @@ -43,6 +43,7 @@ from langfuse._utils.request import LangfuseClient from langfuse.api.client import AsyncFernLangfuse, FernLangfuse from langfuse.logger import langfuse_logger +from langfuse.types import MaskFunction from ..version import __version__ as langfuse_version @@ -90,6 +91,7 @@ def __new__( httpx_client: Optional[httpx.Client] = None, media_upload_thread_count: Optional[int] = None, sample_rate: Optional[float] = None, + mask: Optional[MaskFunction] = None, ) -> "LangfuseResourceManager": if public_key in cls._instances: return cls._instances[public_key] @@ -110,6 +112,7 @@ def __new__( httpx_client=httpx_client, media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, + mask=mask, ) cls._instances[public_key] = instance @@ -130,8 +133,10 @@ def _initialize_instance( media_upload_thread_count: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, sample_rate: Optional[float] = None, + mask: Optional[MaskFunction] = None, ): self.public_key = public_key + self.mask = mask # OTEL Tracer tracer_provider = _init_tracer_provider( From 0ccae60573275148cfffff6373be664a8146d0a6 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 10 Jun 2025 16:19:33 +0200 Subject: [PATCH 2/8] fix public key as constructor arg --- langfuse/_client/client.py | 32 ++++++++++++++++++++-------- langfuse/_client/get_client.py | 11 +++++++++- langfuse/_client/resource_manager.py | 2 ++ langfuse/_client/span.py | 20 ++++++++--------- tests/test_otel.py | 2 +- 5 files changed, 46 insertions(+), 21 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index f102e8f40..118be3b5e 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -142,6 +142,9 @@ class Langfuse: ``` """ + _resources: Optional[LangfuseResourceManager] = None + _mask: Optional[MaskFunction] = None + def __init__( self, *, @@ -190,7 +193,6 @@ def __init__( langfuse_logger.warning( "Authentication error: Langfuse client initialized without public_key. Client will be disabled. " "Provide a public_key parameter or set LANGFUSE_PUBLIC_KEY environment variable. " - "See documentation: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" ) self._otel_tracer = otel_trace_api.NoOpTracer() return @@ -200,7 +202,6 @@ def __init__( langfuse_logger.warning( "Authentication error: Langfuse client initialized without secret_key. Client will be disabled. " "Provide a secret_key parameter or set LANGFUSE_SECRET_KEY environment variable. " - "See documentation: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" ) self._otel_tracer = otel_trace_api.NoOpTracer() return @@ -1326,7 +1327,9 @@ def create_score( "timestamp": _get_timestamp(), "body": new_body, } - self._resources.add_score_task(event) + + if self._resources is not None: + self._resources.add_score_task(event) except Exception as e: langfuse_logger.exception( @@ -1520,7 +1523,8 @@ def flush(self): # Continue with other work ``` """ - self._resources.flush() + if self._resources is not None: + self._resources.flush() def shutdown(self): """Shut down the Langfuse client and flush all pending data. @@ -1544,7 +1548,8 @@ def shutdown(self): langfuse.shutdown() ``` """ - self._resources.shutdown() + if self._resources is not None: + self._resources.shutdown() def get_current_trace_id(self) -> Optional[str]: """Get the trace ID of the current active span. @@ -1926,6 +1931,10 @@ def get_prompt( Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an expired prompt in the cache, in which case it logs a warning and returns the expired prompt. """ + if self._resources is None: + raise Error( + "SDK is not correctly initalized. Check the init logs for more details." + ) if version is not None and label is not None: raise ValueError("Cannot specify both version and label at the same time.") @@ -2051,7 +2060,8 @@ def fetch_prompts(): else: prompt = TextPromptClient(prompt_response) - self._resources.prompt_cache.set(cache_key, prompt, ttl_seconds) + if self._resources is not None: + self._resources.prompt_cache.set(cache_key, prompt, ttl_seconds) return prompt @@ -2150,7 +2160,8 @@ def create_prompt( ) server_prompt = self.api.prompts.create(request=request) - self._resources.prompt_cache.invalidate(name) + if self._resources is not None: + self._resources.prompt_cache.invalidate(name) return ChatPromptClient(prompt=cast(Prompt_Chat, server_prompt)) @@ -2169,7 +2180,8 @@ def create_prompt( server_prompt = self.api.prompts.create(request=request) - self._resources.prompt_cache.invalidate(name) + if self._resources is not None: + self._resources.prompt_cache.invalidate(name) return TextPromptClient(prompt=cast(Prompt_Text, server_prompt)) @@ -2200,7 +2212,9 @@ def update_prompt( version=version, new_labels=new_labels, ) - self._resources.prompt_cache.invalidate(name) + + if self._resources is not None: + self._resources.prompt_cache.invalidate(name) return updated_prompt diff --git a/langfuse/_client/get_client.py b/langfuse/_client/get_client.py index fe891e05a..b8eaa198c 100644 --- a/langfuse/_client/get_client.py +++ b/langfuse/_client/get_client.py @@ -56,7 +56,16 @@ def get_client(*, public_key: Optional[str] = None) -> Langfuse: if len(active_instances) == 1: # Only one client exists, safe to use without specifying key - return Langfuse(public_key=public_key) + instance = list(active_instances.values())[0] + + # Initialize with the credentials bound to the instance + # This is important if the original instance was instantiated + # via constructor arguments + return Langfuse( + public_key=instance.public_key, + secret_key=instance.secret_key, + host=instance.host, + ) else: # Multiple clients exist but no key specified - disable tracing diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 974bc91d5..915748071 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -136,6 +136,8 @@ def _initialize_instance( mask: Optional[MaskFunction] = None, ): self.public_key = public_key + self.secret_key = secret_key + self.host = host self.mask = mask # OTEL Tracer diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 39d5c62eb..87f4f1d63 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -410,7 +410,7 @@ def _process_media_and_apply_mask( The processed and masked data """ return self._mask_attribute( - data=self._process_media_in_attribute(data=data, span=span, field=field) + data=self._process_media_in_attribute(data=data, field=field) ) def _mask_attribute(self, *, data): @@ -441,7 +441,6 @@ def _process_media_in_attribute( self, *, data: Optional[Any] = None, - span: otel_trace_api.Span, field: Union[Literal["input"], Literal["output"], Literal["metadata"]], ): """Process any media content in the attribute data. @@ -457,16 +456,17 @@ def _process_media_in_attribute( Returns: The data with any media content processed """ - media_processed_attribute = ( - self._langfuse_client._resources._media_manager._find_and_process_media( - data=data, - field=field, - trace_id=self.trace_id, - observation_id=self.id, + if self._langfuse_client._resources is not None: + return ( + self._langfuse_client._resources._media_manager._find_and_process_media( + data=data, + field=field, + trace_id=self.trace_id, + observation_id=self.id, + ) ) - ) - return media_processed_attribute + return data class LangfuseSpan(LangfuseSpanWrapper): diff --git a/tests/test_otel.py b/tests/test_otel.py index 0206a5d94..5611f1c3f 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -2355,7 +2355,7 @@ def mask_sensitive_data(data): # Since _process_media_in_attribute makes calls to media_manager original_process = span._process_media_in_attribute - def mock_process_media(*, data, span, field): + def mock_process_media(*, data, field): # Just return the data directly without processing return data From 06757cc96e659128156b21e4fb1e305a54d30a7c Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 10 Jun 2025 17:23:51 +0200 Subject: [PATCH 3/8] refactor --- langfuse/_client/client.py | 207 +++++++++++++++++++---------------- langfuse/_client/span.py | 214 +++++++++++++++++++++---------------- 2 files changed, 235 insertions(+), 186 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 118be3b5e..823d99438 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -22,11 +22,7 @@ _agnosticcontextmanager, ) -from langfuse._client.attributes import ( - LangfuseOtelSpanAttributes, - create_generation_attributes, - create_span_attributes, -) +from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.datasets import DatasetClient, DatasetItemClient from langfuse._client.environment_variables import ( LANGFUSE_DEBUG, @@ -273,15 +269,6 @@ def start_span( span.end() ``` """ - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) - if trace_context: trace_id = trace_context.get("trace_id", None) parent_span_id = trace_context.get("parent_span_id", None) @@ -294,29 +281,33 @@ def start_span( with otel_trace_api.use_span( cast(otel_trace_api.Span, remote_parent_span) ): - otel_span = self._otel_tracer.start_span( - name=name, attributes=attributes - ) + otel_span = self._otel_tracer.start_span(name=name) otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) return LangfuseSpan( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, ) - otel_span = self._otel_tracer.start_span(name=name, attributes=attributes) + otel_span = self._otel_tracer.start_span(name=name) return LangfuseSpan( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, ) def start_as_current_span( @@ -367,15 +358,6 @@ def start_as_current_span( child_span.update(output="sub-result") ``` """ - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) - if trace_context: trace_id = trace_context.get("trace_id", None) parent_span_id = trace_context.get("parent_span_id", None) @@ -390,13 +372,15 @@ def start_as_current_span( self._create_span_with_parent_context( as_type="span", name=name, - attributes=attributes, remote_parent_span=remote_parent_span, parent=None, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, ), ) @@ -405,11 +389,13 @@ def start_as_current_span( self._start_as_current_otel_span_with_processed_media( as_type="span", name=name, - attributes=attributes, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, ), ) @@ -481,21 +467,6 @@ def start_generation( generation.end() ``` """ - attributes = create_generation_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - completion_start_time=completion_start_time, - model=model, - model_parameters=model_parameters, - usage_details=usage_details, - cost_details=cost_details, - prompt=prompt, - ) - if trace_context: trace_id = trace_context.get("trace_id", None) parent_span_id = trace_context.get("parent_span_id", None) @@ -508,9 +479,7 @@ def start_generation( with otel_trace_api.use_span( cast(otel_trace_api.Span, remote_parent_span) ): - otel_span = self._otel_tracer.start_span( - name=name, attributes=attributes - ) + otel_span = self._otel_tracer.start_span(name=name) otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) return LangfuseGeneration( @@ -519,9 +488,18 @@ def start_generation( input=input, output=output, metadata=metadata, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ) - otel_span = self._otel_tracer.start_span(name=name, attributes=attributes) + otel_span = self._otel_tracer.start_span(name=name) return LangfuseGeneration( otel_span=otel_span, @@ -529,6 +507,15 @@ def start_generation( input=input, output=output, metadata=metadata, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ) def start_as_current_generation( @@ -598,21 +585,6 @@ def start_as_current_generation( ) ``` """ - attributes = create_generation_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - completion_start_time=completion_start_time, - model=model, - model_parameters=model_parameters, - usage_details=usage_details, - cost_details=cost_details, - prompt=prompt, - ) - if trace_context: trace_id = trace_context.get("trace_id", None) parent_span_id = trace_context.get("parent_span_id", None) @@ -627,13 +599,21 @@ def start_as_current_generation( self._create_span_with_parent_context( as_type="generation", name=name, - attributes=attributes, remote_parent_span=remote_parent_span, parent=None, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ), ) @@ -642,11 +622,19 @@ def start_as_current_generation( self._start_as_current_otel_span_with_processed_media( as_type="generation", name=name, - attributes=attributes, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ), ) @@ -657,24 +645,40 @@ def _create_span_with_parent_context( name, parent, remote_parent_span, - attributes, as_type: Literal["generation", "span"], + end_on_exit: Optional[bool] = None, input: Optional[Any] = None, output: Optional[Any] = None, metadata: Optional[Any] = None, - end_on_exit: Optional[bool] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + completion_start_time: Optional[datetime] = None, + model: Optional[str] = None, + model_parameters: Optional[Dict[str, MapValue]] = None, + usage_details: Optional[Dict[str, int]] = None, + cost_details: Optional[Dict[str, float]] = None, + prompt: Optional[PromptClient] = None, ): parent_span = parent or cast(otel_trace_api.Span, remote_parent_span) with otel_trace_api.use_span(parent_span): with self._start_as_current_otel_span_with_processed_media( name=name, - attributes=attributes, as_type=as_type, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ) as langfuse_span: if remote_parent_span is not None: langfuse_span._otel_span.set_attribute( @@ -688,35 +692,54 @@ def _start_as_current_otel_span_with_processed_media( self, *, name: str, - attributes: Dict[str, str], as_type: Optional[Literal["generation", "span"]] = None, + end_on_exit: Optional[bool] = None, input: Optional[Any] = None, output: Optional[Any] = None, metadata: Optional[Any] = None, - end_on_exit: Optional[bool] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + completion_start_time: Optional[datetime] = None, + model: Optional[str] = None, + model_parameters: Optional[Dict[str, MapValue]] = None, + usage_details: Optional[Dict[str, int]] = None, + cost_details: Optional[Dict[str, float]] = None, + prompt: Optional[PromptClient] = None, ): with self._otel_tracer.start_as_current_span( name=name, - attributes=attributes, end_on_exit=end_on_exit if end_on_exit is not None else True, ) as otel_span: yield ( LangfuseSpan( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, ) if as_type == "span" else LangfuseGeneration( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ) ) @@ -988,14 +1011,6 @@ def create_event( ``` """ timestamp = time_ns() - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) if trace_context: trace_id = trace_context.get("trace_id", None) @@ -1010,30 +1025,34 @@ def create_event( cast(otel_trace_api.Span, remote_parent_span) ): otel_span = self._otel_tracer.start_span( - name=name, attributes=attributes, start_time=timestamp + name=name, start_time=timestamp ) otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) return LangfuseEvent( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, ).end(end_time=timestamp) - otel_span = self._otel_tracer.start_span( - name=name, attributes=attributes, start_time=timestamp - ) + otel_span = self._otel_tracer.start_span(name=name, start_time=timestamp) return LangfuseEvent( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, ).end(end_time=timestamp) def _create_remote_parent_span( diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 87f4f1d63..1fbc03610 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -69,6 +69,15 @@ def __init__( output: Optional[Any] = None, metadata: Optional[Any] = None, environment: Optional[str] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + completion_start_time: Optional[datetime] = None, + model: Optional[str] = None, + model_parameters: Optional[Dict[str, MapValue]] = None, + usage_details: Optional[Dict[str, int]] = None, + cost_details: Optional[Dict[str, float]] = None, + prompt: Optional[PromptClient] = None, ): """Initialize a new Langfuse span wrapper. @@ -80,6 +89,15 @@ def __init__( output: Output data from the span (any JSON-serializable object) metadata: Additional metadata to associate with the span environment: The tracing environment + version: Version identifier for the code or component + level: Importance level of the span (info, warning, error) + status_message: Optional status message for the span + completion_start_time: When the model started generating the response + model: Name/identifier of the AI model used (e.g., "gpt-4") + model_parameters: Parameters used for the model (e.g., temperature, max_tokens) + usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) + cost_details: Cost information for the model call + prompt: Associated prompt template from Langfuse prompt management """ self._otel_span = otel_span self._otel_span.set_attribute( @@ -108,12 +126,35 @@ def __init__( data=metadata, field="metadata", span=self._otel_span ) - attributes = create_span_attributes( - input=media_processed_input, - output=media_processed_output, - metadata=media_processed_metadata, - ) - attributes.pop(LangfuseOtelSpanAttributes.OBSERVATION_TYPE) + attributes = {} + + if as_type == "generation": + attributes = create_generation_attributes( + input=input, + output=output, + metadata=metadata, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, + ) + + else: + attributes = create_span_attributes( + input=media_processed_input, + output=media_processed_output, + metadata=media_processed_metadata, + version=version, + level=level, + status_message=status_message, + ) + + attributes.pop(LangfuseOtelSpanAttributes.OBSERVATION_TYPE, None) self._otel_span.set_attributes( {k: v for k, v in attributes.items() if v is not None} @@ -487,6 +528,9 @@ def __init__( output: Optional[Any] = None, metadata: Optional[Any] = None, environment: Optional[str] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, ): """Initialize a new LangfuseSpan. @@ -497,6 +541,9 @@ def __init__( output: Output data from the span (any JSON-serializable object) metadata: Additional metadata to associate with the span environment: The tracing environment + version: Version identifier for the code or component + level: Importance level of the span (info, warning, error) + status_message: Optional status message for the span """ super().__init__( otel_span=otel_span, @@ -506,6 +553,9 @@ def __init__( output=output, metadata=metadata, environment=environment, + version=version, + level=level, + status_message=status_message, ) def update( @@ -618,33 +668,19 @@ def start_span( parent_span.end() ``` """ - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) - with otel_trace_api.use_span(self._otel_span): - new_otel_span = self._langfuse_client._otel_tracer.start_span( - name=name, attributes=attributes - ) - - if new_otel_span.is_recording: - self._set_processed_span_attributes( - span=new_otel_span, - as_type="span", - input=input, - output=output, - metadata=metadata, - ) + new_otel_span = self._langfuse_client._otel_tracer.start_span(name=name) return LangfuseSpan( otel_span=new_otel_span, langfuse_client=self._langfuse_client, environment=self._environment, + input=input, + output=output, + metadata=metadata, + version=version, + level=level, + status_message=status_message, ) def start_as_current_span( @@ -692,26 +728,19 @@ def start_as_current_span( parent_span.update(output=result) ``` """ - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) - return cast( _AgnosticContextManager["LangfuseSpan"], self._langfuse_client._create_span_with_parent_context( name=name, - attributes=attributes, as_type="span", remote_parent_span=None, parent=self._otel_span, input=input, output=output, metadata=metadata, + version=version, + level=level, + status_message=status_message, ), ) @@ -789,7 +818,13 @@ def start_generation( span.end() ``` """ - attributes = create_generation_attributes( + with otel_trace_api.use_span(self._otel_span): + new_otel_span = self._langfuse_client._otel_tracer.start_span(name=name) + + return LangfuseGeneration( + otel_span=new_otel_span, + langfuse_client=self._langfuse_client, + environment=self._environment, input=input, output=output, metadata=metadata, @@ -804,26 +839,6 @@ def start_generation( prompt=prompt, ) - with otel_trace_api.use_span(self._otel_span): - new_otel_span = self._langfuse_client._otel_tracer.start_span( - name=name, attributes=attributes - ) - - if new_otel_span.is_recording: - self._set_processed_span_attributes( - span=new_otel_span, - as_type="generation", - input=input, - output=output, - metadata=metadata, - ) - - return LangfuseGeneration( - otel_span=new_otel_span, - langfuse_client=self._langfuse_client, - environment=self._environment, - ) - def start_as_current_generation( self, *, @@ -893,32 +908,25 @@ def start_as_current_generation( span.update(output={"answer": response.text, "source": "gpt-4"}) ``` """ - attributes = create_generation_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - completion_start_time=completion_start_time, - model=model, - model_parameters=model_parameters, - usage_details=usage_details, - cost_details=cost_details, - prompt=prompt, - ) - return cast( _AgnosticContextManager["LangfuseGeneration"], self._langfuse_client._create_span_with_parent_context( name=name, - attributes=attributes, as_type="generation", remote_parent_span=None, parent=self._otel_span, input=input, output=output, metadata=metadata, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ), ) @@ -953,29 +961,12 @@ def create_event( ``` """ timestamp = time_ns() - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) with otel_trace_api.use_span(self._otel_span): new_otel_span = self._langfuse_client._otel_tracer.start_span( - name=name, attributes=attributes, start_time=timestamp + name=name, start_time=timestamp ) - if new_otel_span.is_recording: - self._set_processed_span_attributes( - span=new_otel_span, - as_type="event", - input=input, - output=output, - metadata=metadata, - ) - return LangfuseEvent( otel_span=new_otel_span, langfuse_client=self._langfuse_client, @@ -983,6 +974,9 @@ def create_event( output=output, metadata=metadata, environment=self._environment, + version=version, + level=level, + status_message=status_message, ).end(end_time=timestamp) @@ -1003,6 +997,15 @@ def __init__( output: Optional[Any] = None, metadata: Optional[Any] = None, environment: Optional[str] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + completion_start_time: Optional[datetime] = None, + model: Optional[str] = None, + model_parameters: Optional[Dict[str, MapValue]] = None, + usage_details: Optional[Dict[str, int]] = None, + cost_details: Optional[Dict[str, float]] = None, + prompt: Optional[PromptClient] = None, ): """Initialize a new LangfuseGeneration span. @@ -1013,6 +1016,15 @@ def __init__( output: Output from the generation (e.g., completions) metadata: Additional metadata to associate with the generation environment: The tracing environment + version: Version identifier for the model or component + level: Importance level of the generation (info, warning, error) + status_message: Optional status message for the generation + completion_start_time: When the model started generating the response + model: Name/identifier of the AI model used (e.g., "gpt-4") + model_parameters: Parameters used for the model (e.g., temperature, max_tokens) + usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) + cost_details: Cost information for the model call + prompt: Associated prompt template from Langfuse prompt management """ super().__init__( otel_span=otel_span, @@ -1022,6 +1034,15 @@ def __init__( output=output, metadata=metadata, environment=environment, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ) def update( @@ -1134,6 +1155,9 @@ def __init__( output: Optional[Any] = None, metadata: Optional[Any] = None, environment: Optional[str] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, ): """Initialize a new LangfuseEvent span. @@ -1144,6 +1168,9 @@ def __init__( output: Output from the event metadata: Additional metadata to associate with the generation environment: The tracing environment + version: Version identifier for the model or component + level: Importance level of the generation (info, warning, error) + status_message: Optional status message for the generation """ super().__init__( otel_span=otel_span, @@ -1153,4 +1180,7 @@ def __init__( output=output, metadata=metadata, environment=environment, + version=version, + level=level, + status_message=status_message, ) From cff1f13ce2531d7423f0f2758932f20f89992699 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 10 Jun 2025 17:36:56 +0200 Subject: [PATCH 4/8] fix score sampling issue --- langfuse/_client/client.py | 10 +++++++++- langfuse/_client/resource_manager.py | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 823d99438..adc543639 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -1348,7 +1348,15 @@ def create_score( } if self._resources is not None: - self._resources.add_score_task(event) + # Force the score to be in sample if it was for a legacy trace ID, i.e. non-32 hexchar + force_sample = ( + not self._is_valid_trace_id(trace_id) if trace_id else True + ) + + self._resources.add_score_task( + event, + force_sample=force_sample, + ) except Exception as e: langfuse_logger.exception( diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 915748071..aa6a1c460 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -258,11 +258,11 @@ def _initialize_instance( def reset(cls): cls._instances.clear() - def add_score_task(self, event: dict): + def add_score_task(self, event: dict, *, force_sample: bool = False): try: # Sample scores with the same sampler that is used for tracing tracer_provider = cast(TracerProvider, otel_trace_api.get_tracer_provider()) - should_sample = ( + should_sample = force_sample or ( ( tracer_provider.sampler.should_sample( parent_context=None, From 1e0eab26d72b962c3154cba03e1059cf5cbaeb99 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 10 Jun 2025 18:04:19 +0200 Subject: [PATCH 5/8] fix --- langfuse/_client/span.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 1fbc03610..f3c5ef920 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -130,9 +130,9 @@ def __init__( if as_type == "generation": attributes = create_generation_attributes( - input=input, - output=output, - metadata=metadata, + input=media_processed_input, + output=media_processed_output, + metadata=media_processed_metadata, version=version, level=level, status_message=status_message, From a385577a418ea61446544ad14d07cd8107af442b Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 10 Jun 2025 18:43:53 +0200 Subject: [PATCH 6/8] push --- tests/test_core_sdk.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index ac8b5e529..bfb12ac95 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -1783,6 +1783,8 @@ def test_create_trace_sampling_zero(): def test_mask_function(): def mask_func(data): if isinstance(data, dict): + if "should_raise" in data: + raise return {k: "MASKED" for k in data} elif isinstance(data, str): return "MASKED" @@ -1832,19 +1834,13 @@ def mask_func(data): assert fetched_span["input"] == {"data": "MASKED"} assert fetched_span["output"] == "MASKED" - # Test with faulty mask function - def faulty_mask_func(data): - raise Exception("Masking error") - - langfuse = Langfuse(mask=faulty_mask_func) - # Create a root span with trace properties with langfuse.start_as_current_span(name="test-span") as root_span: - root_span.update_trace(name="test_trace", input={"sensitive": "data"}) + root_span.update_trace(name="test_trace", input={"should_raise": "data"}) # Get trace ID for later use trace_id = root_span.trace_id # Add output to the trace - root_span.update_trace(output={"more": "sensitive"}) + root_span.update_trace(output={"should_raise": "sensitive"}) # Ensure data is sent langfuse.flush() From 975a42404096d8d49aff781043a12d0f6a6e81a7 Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Tue, 10 Jun 2025 22:08:16 +0200 Subject: [PATCH 7/8] fix --- langfuse/_client/resource_manager.py | 3 ++- tests/test_core_sdk.py | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index aa6a1c460..33cd5e791 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -256,7 +256,8 @@ def _initialize_instance( @classmethod def reset(cls): - cls._instances.clear() + for key in cls._instances: + cls._instances.pop(key).shutdown() def add_score_task(self, event: dict, *, force_sample: bool = False): try: diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index bfb12ac95..914b2fb65 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -7,6 +7,7 @@ import pytest from langfuse import Langfuse +from langfuse._client.resource_manager import LangfuseResourceManager from langfuse._utils import _get_timestamp from tests.api_wrapper import LangfuseAPI from tests.utils import ( @@ -1781,6 +1782,8 @@ def test_create_trace_sampling_zero(): def test_mask_function(): + LangfuseResourceManager.reset() + def mask_func(data): if isinstance(data, dict): if "should_raise" in data: From e66ef81395cabbfa5e53c3db1f9436b9ccfc150a Mon Sep 17 00:00:00 2001 From: Hassieb Pakzad <68423100+hassiebp@users.noreply.github.com> Date: Wed, 11 Jun 2025 09:45:35 +0200 Subject: [PATCH 8/8] fix --- langfuse/_client/resource_manager.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 33cd5e791..5eceb3fe5 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -256,8 +256,11 @@ def _initialize_instance( @classmethod def reset(cls): - for key in cls._instances: - cls._instances.pop(key).shutdown() + with cls._lock: + for key in cls._instances: + cls._instances[key].shutdown() + + cls._instances.clear() def add_score_task(self, event: dict, *, force_sample: bool = False): try: