-
Notifications
You must be signed in to change notification settings - Fork 224
internal: showcase context propagation using baggage #1349
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,6 +20,7 @@ | |
| TYPE_CHECKING, | ||
| Any, | ||
| Dict, | ||
| Iterator, | ||
| List, | ||
| Literal, | ||
| Optional, | ||
|
|
@@ -29,7 +30,9 @@ | |
| overload, | ||
| ) | ||
|
|
||
| from contextlib import contextmanager | ||
| from opentelemetry import trace as otel_trace_api | ||
| from opentelemetry import context | ||
| from opentelemetry.util._decorator import _AgnosticContextManager | ||
|
|
||
| from langfuse.model import PromptClient | ||
|
|
@@ -42,6 +45,7 @@ | |
| create_generation_attributes, | ||
| create_span_attributes, | ||
| create_trace_attributes, | ||
| propagate_attributes, | ||
| ) | ||
| from langfuse._client.constants import ( | ||
| ObservationTypeLiteral, | ||
|
|
@@ -203,6 +207,7 @@ def end(self, *, end_time: Optional[int] = None) -> "LangfuseObservationWrapper" | |
|
|
||
| return self | ||
|
|
||
| @contextmanager | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're turning this into a context manager as we need to detach the context at the end of the execution to avoid memory leaks and issues in async/multi-thread processing. Every call like span.update_trace(name="foo")
[...]
# ongoing executionmust become with span.update_trace(name="foo"):
[...]
# ongoing executionto ensure that all child spans will be started with the attached context.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wow, this is a breaking change! |
||
| def update_trace( | ||
| self, | ||
| *, | ||
|
|
@@ -215,7 +220,7 @@ def update_trace( | |
| metadata: Optional[Any] = None, | ||
| tags: Optional[List[str]] = None, | ||
| public: Optional[bool] = None, | ||
| ) -> "LangfuseObservationWrapper": | ||
| ) -> Iterator[None]: | ||
| """Update the trace that this span belongs to. | ||
|
|
||
| This method updates trace-level attributes of the trace that this span | ||
|
|
@@ -234,7 +239,7 @@ def update_trace( | |
| public: Whether the trace should be publicly accessible | ||
| """ | ||
| if not self._otel_span.is_recording(): | ||
| return self | ||
| yield | ||
|
|
||
| media_processed_input = self._process_media_and_apply_mask( | ||
| data=input, field="input", span=self._otel_span | ||
|
|
@@ -258,9 +263,19 @@ def update_trace( | |
| public=public, | ||
| ) | ||
|
|
||
| ctx = otel_trace_api.set_span_in_context(self._otel_span) | ||
| ctx = propagate_attributes( | ||
| current_ctx=ctx, | ||
| dict_to_propagate=attributes, | ||
| ) | ||
|
|
||
| token = context.attach(ctx) | ||
| self._otel_span.set_attributes(attributes) | ||
|
|
||
| return self | ||
| try: | ||
| yield | ||
| finally: | ||
| context.detach(token) | ||
|
|
||
| @overload | ||
| def score( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,9 +15,11 @@ | |
| import os | ||
| from typing import Dict, List, Optional | ||
|
|
||
| from opentelemetry.context import Context | ||
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | ||
| from opentelemetry.sdk.trace import ReadableSpan | ||
| from opentelemetry.sdk.trace import ReadableSpan, Span | ||
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | ||
| from opentelemetry.processor.baggage import BaggageSpanProcessor, ALLOW_ALL_BAGGAGE_KEYS | ||
|
|
||
| from langfuse._client.constants import LANGFUSE_TRACER_NAME | ||
| from langfuse._client.environment_variables import ( | ||
|
|
@@ -65,6 +67,10 @@ def __init__( | |
| else [] | ||
| ) | ||
|
|
||
| # Initialize a BaggageSpanProcessor so baggage keys are attached to spans. | ||
| # Allow all baggage keys by default (same behaviour as before). | ||
| self._baggage_processor = BaggageSpanProcessor(ALLOW_ALL_BAGGAGE_KEYS) | ||
|
|
||
| env_flush_at = os.environ.get(LANGFUSE_FLUSH_AT, None) | ||
| flush_at = flush_at or int(env_flush_at) if env_flush_at is not None else None | ||
|
|
||
|
|
@@ -105,6 +111,16 @@ def __init__( | |
| else None, | ||
| ) | ||
|
|
||
| def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're injecting the baggage span processor in the The other overwrites are taken as-is from the SpanProcessor, i.e. there is no need to overwrite the on_end, etc. |
||
| # Forward to baggage processor first so baggage can be attached early. | ||
| self._baggage_processor.on_start(span, parent_context) | ||
|
|
||
| # Call parent on_start if it exists (no-op for BatchSpanProcessor but safe). | ||
| try: | ||
| super().on_start(span, parent_context) | ||
| except TypeError: | ||
| super().on_start(span) | ||
|
|
||
| def on_end(self, span: ReadableSpan) -> None: | ||
| # Only export spans that belong to the scoped project | ||
| # This is important to not send spans to wrong project in multi-project setups | ||
|
|
||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The context is immutable, i.e. every set_baggage operation produces a new, updated context with the relevant properties. We update whatever context we get and return the new one.