diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index 74e99a26f..adc543639 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -22,11 +22,7 @@ _agnosticcontextmanager, ) -from langfuse._client.attributes import ( - LangfuseOtelSpanAttributes, - create_generation_attributes, - create_span_attributes, -) +from langfuse._client.attributes import LangfuseOtelSpanAttributes from langfuse._client.datasets import DatasetClient, DatasetItemClient from langfuse._client.environment_variables import ( LANGFUSE_DEBUG, @@ -142,6 +138,9 @@ class Langfuse: ``` """ + _resources: Optional[LangfuseResourceManager] = None + _mask: Optional[MaskFunction] = None + def __init__( self, *, @@ -162,7 +161,6 @@ def __init__( ): self._host = host or os.environ.get(LANGFUSE_HOST, "https://cloud.langfuse.com") self._environment = environment or os.environ.get(LANGFUSE_TRACING_ENVIRONMENT) - self._mask = mask self._project_id = None sample_rate = sample_rate or float(os.environ.get(LANGFUSE_SAMPLE_RATE, 1.0)) if not 0.0 <= sample_rate <= 1.0: @@ -191,7 +189,6 @@ def __init__( langfuse_logger.warning( "Authentication error: Langfuse client initialized without public_key. Client will be disabled. " "Provide a public_key parameter or set LANGFUSE_PUBLIC_KEY environment variable. " - "See documentation: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" ) self._otel_tracer = otel_trace_api.NoOpTracer() return @@ -201,7 +198,6 @@ def __init__( langfuse_logger.warning( "Authentication error: Langfuse client initialized without secret_key. Client will be disabled. " "Provide a secret_key parameter or set LANGFUSE_SECRET_KEY environment variable. 
" - "See documentation: https://langfuse.com/docs/sdk/python/low-level-sdk#initialize-client" ) self._otel_tracer = otel_trace_api.NoOpTracer() return @@ -219,7 +215,9 @@ def __init__( httpx_client=httpx_client, media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, + mask=mask, ) + self._mask = self._resources.mask self._otel_tracer = ( self._resources.tracer @@ -271,15 +269,6 @@ def start_span( span.end() ``` """ - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) - if trace_context: trace_id = trace_context.get("trace_id", None) parent_span_id = trace_context.get("parent_span_id", None) @@ -292,29 +281,33 @@ def start_span( with otel_trace_api.use_span( cast(otel_trace_api.Span, remote_parent_span) ): - otel_span = self._otel_tracer.start_span( - name=name, attributes=attributes - ) + otel_span = self._otel_tracer.start_span(name=name) otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) return LangfuseSpan( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, ) - otel_span = self._otel_tracer.start_span(name=name, attributes=attributes) + otel_span = self._otel_tracer.start_span(name=name) return LangfuseSpan( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, ) def start_as_current_span( @@ -365,15 +358,6 @@ def start_as_current_span( child_span.update(output="sub-result") ``` """ - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) - if trace_context: trace_id = 
trace_context.get("trace_id", None) parent_span_id = trace_context.get("parent_span_id", None) @@ -388,13 +372,15 @@ def start_as_current_span( self._create_span_with_parent_context( as_type="span", name=name, - attributes=attributes, remote_parent_span=remote_parent_span, parent=None, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, ), ) @@ -403,11 +389,13 @@ def start_as_current_span( self._start_as_current_otel_span_with_processed_media( as_type="span", name=name, - attributes=attributes, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, ), ) @@ -479,21 +467,6 @@ def start_generation( generation.end() ``` """ - attributes = create_generation_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - completion_start_time=completion_start_time, - model=model, - model_parameters=model_parameters, - usage_details=usage_details, - cost_details=cost_details, - prompt=prompt, - ) - if trace_context: trace_id = trace_context.get("trace_id", None) parent_span_id = trace_context.get("parent_span_id", None) @@ -506,9 +479,7 @@ def start_generation( with otel_trace_api.use_span( cast(otel_trace_api.Span, remote_parent_span) ): - otel_span = self._otel_tracer.start_span( - name=name, attributes=attributes - ) + otel_span = self._otel_tracer.start_span(name=name) otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) return LangfuseGeneration( @@ -517,9 +488,18 @@ def start_generation( input=input, output=output, metadata=metadata, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + 
prompt=prompt, ) - otel_span = self._otel_tracer.start_span(name=name, attributes=attributes) + otel_span = self._otel_tracer.start_span(name=name) return LangfuseGeneration( otel_span=otel_span, @@ -527,6 +507,15 @@ def start_generation( input=input, output=output, metadata=metadata, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ) def start_as_current_generation( @@ -596,21 +585,6 @@ def start_as_current_generation( ) ``` """ - attributes = create_generation_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - completion_start_time=completion_start_time, - model=model, - model_parameters=model_parameters, - usage_details=usage_details, - cost_details=cost_details, - prompt=prompt, - ) - if trace_context: trace_id = trace_context.get("trace_id", None) parent_span_id = trace_context.get("parent_span_id", None) @@ -625,13 +599,21 @@ def start_as_current_generation( self._create_span_with_parent_context( as_type="generation", name=name, - attributes=attributes, remote_parent_span=remote_parent_span, parent=None, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ), ) @@ -640,11 +622,19 @@ def start_as_current_generation( self._start_as_current_otel_span_with_processed_media( as_type="generation", name=name, - attributes=attributes, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, + 
completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ), ) @@ -655,24 +645,40 @@ def _create_span_with_parent_context( name, parent, remote_parent_span, - attributes, as_type: Literal["generation", "span"], + end_on_exit: Optional[bool] = None, input: Optional[Any] = None, output: Optional[Any] = None, metadata: Optional[Any] = None, - end_on_exit: Optional[bool] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + completion_start_time: Optional[datetime] = None, + model: Optional[str] = None, + model_parameters: Optional[Dict[str, MapValue]] = None, + usage_details: Optional[Dict[str, int]] = None, + cost_details: Optional[Dict[str, float]] = None, + prompt: Optional[PromptClient] = None, ): parent_span = parent or cast(otel_trace_api.Span, remote_parent_span) with otel_trace_api.use_span(parent_span): with self._start_as_current_otel_span_with_processed_media( name=name, - attributes=attributes, as_type=as_type, + end_on_exit=end_on_exit, input=input, output=output, metadata=metadata, - end_on_exit=end_on_exit, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ) as langfuse_span: if remote_parent_span is not None: langfuse_span._otel_span.set_attribute( @@ -686,35 +692,54 @@ def _start_as_current_otel_span_with_processed_media( self, *, name: str, - attributes: Dict[str, str], as_type: Optional[Literal["generation", "span"]] = None, + end_on_exit: Optional[bool] = None, input: Optional[Any] = None, output: Optional[Any] = None, metadata: Optional[Any] = None, - end_on_exit: Optional[bool] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: 
Optional[str] = None, + completion_start_time: Optional[datetime] = None, + model: Optional[str] = None, + model_parameters: Optional[Dict[str, MapValue]] = None, + usage_details: Optional[Dict[str, int]] = None, + cost_details: Optional[Dict[str, float]] = None, + prompt: Optional[PromptClient] = None, ): with self._otel_tracer.start_as_current_span( name=name, - attributes=attributes, end_on_exit=end_on_exit if end_on_exit is not None else True, ) as otel_span: yield ( LangfuseSpan( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, ) if as_type == "span" else LangfuseGeneration( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ) ) @@ -986,14 +1011,6 @@ def create_event( ``` """ timestamp = time_ns() - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) if trace_context: trace_id = trace_context.get("trace_id", None) @@ -1008,30 +1025,34 @@ def create_event( cast(otel_trace_api.Span, remote_parent_span) ): otel_span = self._otel_tracer.start_span( - name=name, attributes=attributes, start_time=timestamp + name=name, start_time=timestamp ) otel_span.set_attribute(LangfuseOtelSpanAttributes.AS_ROOT, True) return LangfuseEvent( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, 
).end(end_time=timestamp) - otel_span = self._otel_tracer.start_span( - name=name, attributes=attributes, start_time=timestamp - ) + otel_span = self._otel_tracer.start_span(name=name, start_time=timestamp) return LangfuseEvent( otel_span=otel_span, langfuse_client=self, + environment=self._environment, input=input, output=output, metadata=metadata, - environment=self._environment, + version=version, + level=level, + status_message=status_message, ).end(end_time=timestamp) def _create_remote_parent_span( @@ -1325,7 +1346,17 @@ def create_score( "timestamp": _get_timestamp(), "body": new_body, } - self._resources.add_score_task(event) + + if self._resources is not None: + # Force the score to be in sample if it was for a legacy trace ID, i.e. non-32 hexchar + force_sample = ( + not self._is_valid_trace_id(trace_id) if trace_id else True + ) + + self._resources.add_score_task( + event, + force_sample=force_sample, + ) except Exception as e: langfuse_logger.exception( @@ -1519,7 +1550,8 @@ def flush(self): # Continue with other work ``` """ - self._resources.flush() + if self._resources is not None: + self._resources.flush() def shutdown(self): """Shut down the Langfuse client and flush all pending data. @@ -1543,7 +1575,8 @@ def shutdown(self): langfuse.shutdown() ``` """ - self._resources.shutdown() + if self._resources is not None: + self._resources.shutdown() def get_current_trace_id(self) -> Optional[str]: """Get the trace ID of the current active span. @@ -1925,6 +1958,10 @@ def get_prompt( Exception: Propagates any exceptions raised during the fetching of a new prompt, unless there is an expired prompt in the cache, in which case it logs a warning and returns the expired prompt. """ + if self._resources is None: + raise Error( + "SDK is not correctly initialized. Check the init logs for more details."
+ ) if version is not None and label is not None: raise ValueError("Cannot specify both version and label at the same time.") @@ -2050,7 +2087,8 @@ def fetch_prompts(): else: prompt = TextPromptClient(prompt_response) - self._resources.prompt_cache.set(cache_key, prompt, ttl_seconds) + if self._resources is not None: + self._resources.prompt_cache.set(cache_key, prompt, ttl_seconds) return prompt @@ -2149,7 +2187,8 @@ def create_prompt( ) server_prompt = self.api.prompts.create(request=request) - self._resources.prompt_cache.invalidate(name) + if self._resources is not None: + self._resources.prompt_cache.invalidate(name) return ChatPromptClient(prompt=cast(Prompt_Chat, server_prompt)) @@ -2168,7 +2207,8 @@ def create_prompt( server_prompt = self.api.prompts.create(request=request) - self._resources.prompt_cache.invalidate(name) + if self._resources is not None: + self._resources.prompt_cache.invalidate(name) return TextPromptClient(prompt=cast(Prompt_Text, server_prompt)) @@ -2199,7 +2239,9 @@ def update_prompt( version=version, new_labels=new_labels, ) - self._resources.prompt_cache.invalidate(name) + + if self._resources is not None: + self._resources.prompt_cache.invalidate(name) return updated_prompt diff --git a/langfuse/_client/get_client.py b/langfuse/_client/get_client.py index fe891e05a..b8eaa198c 100644 --- a/langfuse/_client/get_client.py +++ b/langfuse/_client/get_client.py @@ -56,7 +56,16 @@ def get_client(*, public_key: Optional[str] = None) -> Langfuse: if len(active_instances) == 1: # Only one client exists, safe to use without specifying key - return Langfuse(public_key=public_key) + instance = list(active_instances.values())[0] + + # Initialize with the credentials bound to the instance + # This is important if the original instance was instantiated + # via constructor arguments + return Langfuse( + public_key=instance.public_key, + secret_key=instance.secret_key, + host=instance.host, + ) else: # Multiple clients exist but no key specified - 
disable tracing diff --git a/langfuse/_client/resource_manager.py b/langfuse/_client/resource_manager.py index 14cd84ec9..5eceb3fe5 100644 --- a/langfuse/_client/resource_manager.py +++ b/langfuse/_client/resource_manager.py @@ -18,7 +18,7 @@ import os import threading from queue import Full, Queue -from typing import Dict, Optional, cast, Any +from typing import Any, Dict, Optional, cast import httpx from opentelemetry import trace as otel_trace_api @@ -43,6 +43,7 @@ from langfuse._utils.request import LangfuseClient from langfuse.api.client import AsyncFernLangfuse, FernLangfuse from langfuse.logger import langfuse_logger +from langfuse.types import MaskFunction from ..version import __version__ as langfuse_version @@ -90,6 +91,7 @@ def __new__( httpx_client: Optional[httpx.Client] = None, media_upload_thread_count: Optional[int] = None, sample_rate: Optional[float] = None, + mask: Optional[MaskFunction] = None, ) -> "LangfuseResourceManager": if public_key in cls._instances: return cls._instances[public_key] @@ -110,6 +112,7 @@ def __new__( httpx_client=httpx_client, media_upload_thread_count=media_upload_thread_count, sample_rate=sample_rate, + mask=mask, ) cls._instances[public_key] = instance @@ -130,8 +133,12 @@ def _initialize_instance( media_upload_thread_count: Optional[int] = None, httpx_client: Optional[httpx.Client] = None, sample_rate: Optional[float] = None, + mask: Optional[MaskFunction] = None, ): self.public_key = public_key + self.secret_key = secret_key + self.host = host + self.mask = mask # OTEL Tracer tracer_provider = _init_tracer_provider( @@ -249,13 +256,17 @@ def _initialize_instance( @classmethod def reset(cls): - cls._instances.clear() + with cls._lock: + for key in cls._instances: + cls._instances[key].shutdown() + + cls._instances.clear() - def add_score_task(self, event: dict): + def add_score_task(self, event: dict, *, force_sample: bool = False): try: # Sample scores with the same sampler that is used for tracing tracer_provider = 
cast(TracerProvider, otel_trace_api.get_tracer_provider()) - should_sample = ( + should_sample = force_sample or ( ( tracer_provider.sampler.should_sample( parent_context=None, diff --git a/langfuse/_client/span.py b/langfuse/_client/span.py index 39d5c62eb..f3c5ef920 100644 --- a/langfuse/_client/span.py +++ b/langfuse/_client/span.py @@ -69,6 +69,15 @@ def __init__( output: Optional[Any] = None, metadata: Optional[Any] = None, environment: Optional[str] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + completion_start_time: Optional[datetime] = None, + model: Optional[str] = None, + model_parameters: Optional[Dict[str, MapValue]] = None, + usage_details: Optional[Dict[str, int]] = None, + cost_details: Optional[Dict[str, float]] = None, + prompt: Optional[PromptClient] = None, ): """Initialize a new Langfuse span wrapper. @@ -80,6 +89,15 @@ def __init__( output: Output data from the span (any JSON-serializable object) metadata: Additional metadata to associate with the span environment: The tracing environment + version: Version identifier for the code or component + level: Importance level of the span (info, warning, error) + status_message: Optional status message for the span + completion_start_time: When the model started generating the response + model: Name/identifier of the AI model used (e.g., "gpt-4") + model_parameters: Parameters used for the model (e.g., temperature, max_tokens) + usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) + cost_details: Cost information for the model call + prompt: Associated prompt template from Langfuse prompt management """ self._otel_span = otel_span self._otel_span.set_attribute( @@ -108,12 +126,35 @@ def __init__( data=metadata, field="metadata", span=self._otel_span ) - attributes = create_span_attributes( - input=media_processed_input, - output=media_processed_output, - metadata=media_processed_metadata, - ) - 
attributes.pop(LangfuseOtelSpanAttributes.OBSERVATION_TYPE) + attributes = {} + + if as_type == "generation": + attributes = create_generation_attributes( + input=media_processed_input, + output=media_processed_output, + metadata=media_processed_metadata, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, + ) + + else: + attributes = create_span_attributes( + input=media_processed_input, + output=media_processed_output, + metadata=media_processed_metadata, + version=version, + level=level, + status_message=status_message, + ) + + attributes.pop(LangfuseOtelSpanAttributes.OBSERVATION_TYPE, None) self._otel_span.set_attributes( {k: v for k, v in attributes.items() if v is not None} @@ -410,7 +451,7 @@ def _process_media_and_apply_mask( The processed and masked data """ return self._mask_attribute( - data=self._process_media_in_attribute(data=data, span=span, field=field) + data=self._process_media_in_attribute(data=data, field=field) ) def _mask_attribute(self, *, data): @@ -441,7 +482,6 @@ def _process_media_in_attribute( self, *, data: Optional[Any] = None, - span: otel_trace_api.Span, field: Union[Literal["input"], Literal["output"], Literal["metadata"]], ): """Process any media content in the attribute data. 
@@ -457,16 +497,17 @@ def _process_media_in_attribute( Returns: The data with any media content processed """ - media_processed_attribute = ( - self._langfuse_client._resources._media_manager._find_and_process_media( - data=data, - field=field, - trace_id=self.trace_id, - observation_id=self.id, + if self._langfuse_client._resources is not None: + return ( + self._langfuse_client._resources._media_manager._find_and_process_media( + data=data, + field=field, + trace_id=self.trace_id, + observation_id=self.id, + ) ) - ) - return media_processed_attribute + return data class LangfuseSpan(LangfuseSpanWrapper): @@ -487,6 +528,9 @@ def __init__( output: Optional[Any] = None, metadata: Optional[Any] = None, environment: Optional[str] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, ): """Initialize a new LangfuseSpan. @@ -497,6 +541,9 @@ def __init__( output: Output data from the span (any JSON-serializable object) metadata: Additional metadata to associate with the span environment: The tracing environment + version: Version identifier for the code or component + level: Importance level of the span (info, warning, error) + status_message: Optional status message for the span """ super().__init__( otel_span=otel_span, @@ -506,6 +553,9 @@ def __init__( output=output, metadata=metadata, environment=environment, + version=version, + level=level, + status_message=status_message, ) def update( @@ -618,33 +668,19 @@ def start_span( parent_span.end() ``` """ - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) - with otel_trace_api.use_span(self._otel_span): - new_otel_span = self._langfuse_client._otel_tracer.start_span( - name=name, attributes=attributes - ) - - if new_otel_span.is_recording: - self._set_processed_span_attributes( - span=new_otel_span, - as_type="span", - input=input, - output=output, 
- metadata=metadata, - ) + new_otel_span = self._langfuse_client._otel_tracer.start_span(name=name) return LangfuseSpan( otel_span=new_otel_span, langfuse_client=self._langfuse_client, environment=self._environment, + input=input, + output=output, + metadata=metadata, + version=version, + level=level, + status_message=status_message, ) def start_as_current_span( @@ -692,26 +728,19 @@ def start_as_current_span( parent_span.update(output=result) ``` """ - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) - return cast( _AgnosticContextManager["LangfuseSpan"], self._langfuse_client._create_span_with_parent_context( name=name, - attributes=attributes, as_type="span", remote_parent_span=None, parent=self._otel_span, input=input, output=output, metadata=metadata, + version=version, + level=level, + status_message=status_message, ), ) @@ -789,7 +818,13 @@ def start_generation( span.end() ``` """ - attributes = create_generation_attributes( + with otel_trace_api.use_span(self._otel_span): + new_otel_span = self._langfuse_client._otel_tracer.start_span(name=name) + + return LangfuseGeneration( + otel_span=new_otel_span, + langfuse_client=self._langfuse_client, + environment=self._environment, input=input, output=output, metadata=metadata, @@ -804,26 +839,6 @@ def start_generation( prompt=prompt, ) - with otel_trace_api.use_span(self._otel_span): - new_otel_span = self._langfuse_client._otel_tracer.start_span( - name=name, attributes=attributes - ) - - if new_otel_span.is_recording: - self._set_processed_span_attributes( - span=new_otel_span, - as_type="generation", - input=input, - output=output, - metadata=metadata, - ) - - return LangfuseGeneration( - otel_span=new_otel_span, - langfuse_client=self._langfuse_client, - environment=self._environment, - ) - def start_as_current_generation( self, *, @@ -893,32 +908,25 @@ def start_as_current_generation( 
span.update(output={"answer": response.text, "source": "gpt-4"}) ``` """ - attributes = create_generation_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - completion_start_time=completion_start_time, - model=model, - model_parameters=model_parameters, - usage_details=usage_details, - cost_details=cost_details, - prompt=prompt, - ) - return cast( _AgnosticContextManager["LangfuseGeneration"], self._langfuse_client._create_span_with_parent_context( name=name, - attributes=attributes, as_type="generation", remote_parent_span=None, parent=self._otel_span, input=input, output=output, metadata=metadata, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ), ) @@ -953,29 +961,12 @@ def create_event( ``` """ timestamp = time_ns() - attributes = create_span_attributes( - input=input, - output=output, - metadata=metadata, - version=version, - level=level, - status_message=status_message, - ) with otel_trace_api.use_span(self._otel_span): new_otel_span = self._langfuse_client._otel_tracer.start_span( - name=name, attributes=attributes, start_time=timestamp + name=name, start_time=timestamp ) - if new_otel_span.is_recording: - self._set_processed_span_attributes( - span=new_otel_span, - as_type="event", - input=input, - output=output, - metadata=metadata, - ) - return LangfuseEvent( otel_span=new_otel_span, langfuse_client=self._langfuse_client, @@ -983,6 +974,9 @@ def create_event( output=output, metadata=metadata, environment=self._environment, + version=version, + level=level, + status_message=status_message, ).end(end_time=timestamp) @@ -1003,6 +997,15 @@ def __init__( output: Optional[Any] = None, metadata: Optional[Any] = None, environment: Optional[str] = None, + version: Optional[str] = None, + 
level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, + completion_start_time: Optional[datetime] = None, + model: Optional[str] = None, + model_parameters: Optional[Dict[str, MapValue]] = None, + usage_details: Optional[Dict[str, int]] = None, + cost_details: Optional[Dict[str, float]] = None, + prompt: Optional[PromptClient] = None, ): """Initialize a new LangfuseGeneration span. @@ -1013,6 +1016,15 @@ def __init__( output: Output from the generation (e.g., completions) metadata: Additional metadata to associate with the generation environment: The tracing environment + version: Version identifier for the model or component + level: Importance level of the generation (info, warning, error) + status_message: Optional status message for the generation + completion_start_time: When the model started generating the response + model: Name/identifier of the AI model used (e.g., "gpt-4") + model_parameters: Parameters used for the model (e.g., temperature, max_tokens) + usage_details: Token usage information (e.g., prompt_tokens, completion_tokens) + cost_details: Cost information for the model call + prompt: Associated prompt template from Langfuse prompt management """ super().__init__( otel_span=otel_span, @@ -1022,6 +1034,15 @@ def __init__( output=output, metadata=metadata, environment=environment, + version=version, + level=level, + status_message=status_message, + completion_start_time=completion_start_time, + model=model, + model_parameters=model_parameters, + usage_details=usage_details, + cost_details=cost_details, + prompt=prompt, ) def update( @@ -1134,6 +1155,9 @@ def __init__( output: Optional[Any] = None, metadata: Optional[Any] = None, environment: Optional[str] = None, + version: Optional[str] = None, + level: Optional[SpanLevel] = None, + status_message: Optional[str] = None, ): """Initialize a new LangfuseEvent span. 
@@ -1144,6 +1168,9 @@ def __init__( output: Output from the event metadata: Additional metadata to associate with the generation environment: The tracing environment + version: Version identifier for the model or component + level: Importance level of the generation (info, warning, error) + status_message: Optional status message for the generation """ super().__init__( otel_span=otel_span, @@ -1153,4 +1180,7 @@ def __init__( output=output, metadata=metadata, environment=environment, + version=version, + level=level, + status_message=status_message, ) diff --git a/tests/test_core_sdk.py b/tests/test_core_sdk.py index ac8b5e529..914b2fb65 100644 --- a/tests/test_core_sdk.py +++ b/tests/test_core_sdk.py @@ -7,6 +7,7 @@ import pytest from langfuse import Langfuse +from langfuse._client.resource_manager import LangfuseResourceManager from langfuse._utils import _get_timestamp from tests.api_wrapper import LangfuseAPI from tests.utils import ( @@ -1781,8 +1782,12 @@ def test_create_trace_sampling_zero(): def test_mask_function(): + LangfuseResourceManager.reset() + def mask_func(data): if isinstance(data, dict): + if "should_raise" in data: + raise return {k: "MASKED" for k in data} elif isinstance(data, str): return "MASKED" @@ -1832,19 +1837,13 @@ def mask_func(data): assert fetched_span["input"] == {"data": "MASKED"} assert fetched_span["output"] == "MASKED" - # Test with faulty mask function - def faulty_mask_func(data): - raise Exception("Masking error") - - langfuse = Langfuse(mask=faulty_mask_func) - # Create a root span with trace properties with langfuse.start_as_current_span(name="test-span") as root_span: - root_span.update_trace(name="test_trace", input={"sensitive": "data"}) + root_span.update_trace(name="test_trace", input={"should_raise": "data"}) # Get trace ID for later use trace_id = root_span.trace_id # Add output to the trace - root_span.update_trace(output={"more": "sensitive"}) + root_span.update_trace(output={"should_raise": "sensitive"}) # Ensure 
data is sent langfuse.flush() diff --git a/tests/test_otel.py b/tests/test_otel.py index 0206a5d94..5611f1c3f 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -2355,7 +2355,7 @@ def mask_sensitive_data(data): # Since _process_media_in_attribute makes calls to media_manager original_process = span._process_media_in_attribute - def mock_process_media(*, data, span, field): + def mock_process_media(*, data, field): # Just return the data directly without processing return data