Skip to content

Commit 67721df

Browse files
authored
feat(span-streaming): Add span batcher (#5398)
- Add a specialized `SpanBatcher` that batches and sends spans by trace ID. Hook it up to the client.
- Add a skeleton of a new `StreamedSpan` class that will eventually replace the current `Span` class.

Chipping away at #5317 to transform it into reviewable and mergeable PRs.
1 parent 97ee0b1 commit 67721df

File tree

3 files changed

+175
-1
lines changed

3 files changed

+175
-1
lines changed

sentry_sdk/_span_batcher.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
import threading
2+
from collections import defaultdict
3+
from datetime import datetime, timezone
4+
from typing import TYPE_CHECKING
5+
6+
from sentry_sdk._batcher import Batcher
7+
from sentry_sdk.consts import SPANSTATUS
8+
from sentry_sdk.envelope import Envelope, Item, PayloadRef
9+
from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr
10+
11+
if TYPE_CHECKING:
12+
from typing import Any, Callable, Optional
13+
from sentry_sdk.traces import StreamedSpan
14+
from sentry_sdk._types import SerializedAttributeValue
15+
16+
17+
class SpanBatcher(Batcher["StreamedSpan"]):
    """Buffers streamed spans and flushes them to Sentry, one envelope per trace.

    Spans from different traces cannot be emitted in the same envelope since
    the envelope contains a shared trace header. That's why spans are bucketed
    by trace_id, so each bucket can be sent in its own envelope.
    """

    # TODO[span-first]: size-based flushes
    # TODO[span-first]: adjust flush/drop defaults
    MAX_BEFORE_FLUSH = 1000  # buffered spans that trigger an early flush
    MAX_BEFORE_DROP = 5000  # hard cap: spans arriving beyond this are dropped
    FLUSH_WAIT_TIME = 5.0  # seconds between periodic background flushes

    TYPE = "span"
    CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json"

    def __init__(
        self,
        capture_func: "Callable[[Envelope], None]",
        record_lost_func: "Callable[..., None]",
    ) -> None:
        """
        :param capture_func: Called with each assembled envelope to send it.
        :param record_lost_func: Called to record client-side data loss
            (e.g. when the buffer overflows).
        """
        # trace_id -> list of buffered spans for that trace.
        self._span_buffer: "dict[str, list[StreamedSpan]]" = defaultdict(list)
        self._capture_func = capture_func
        self._record_lost_func = record_lost_func
        self._running = True
        self._lock = threading.Lock()

        # Set to request an early flush from the background flusher thread.
        self._flush_event: "threading.Event" = threading.Event()

        self._flusher: "Optional[threading.Thread]" = None
        self._flusher_pid: "Optional[int]" = None

    def get_size(self) -> int:
        """Return the total number of buffered spans across all traces.

        The caller is responsible for holding ``self._lock`` before calling.
        """
        return sum(len(buffer) for buffer in self._span_buffer.values())

    def add(self, span: "StreamedSpan") -> None:
        """Buffer a span for sending.

        Drops the span (recording the loss) if the buffer is at capacity, and
        requests an early flush once MAX_BEFORE_FLUSH spans are buffered.
        """
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            size = self.get_size()
            if size >= self.MAX_BEFORE_DROP:
                self._record_lost_func(
                    reason="queue_overflow",
                    data_category="span",
                    quantity=1,
                )
                return None

            self._span_buffer[span.trace_id].append(span)
            if size + 1 >= self.MAX_BEFORE_FLUSH:
                self._flush_event.set()

    @staticmethod
    def _to_transport_format(item: "StreamedSpan") -> "Any":
        """Serialize a span into the wire format. Placeholder for now."""
        # TODO[span-first]
        res: "dict[str, Any]" = {}
        return res

    def _flush(self) -> None:
        """Serialize buffered spans into per-trace envelopes and send them."""
        with self._lock:
            if len(self._span_buffer) == 0:
                return None

            envelopes = []
            for trace_id, spans in self._span_buffer.items():
                if spans:
                    # TODO[span-first]
                    # dsc = spans[0].dynamic_sampling_context()
                    dsc = None

                    envelope = Envelope(
                        headers={
                            "sent_at": format_timestamp(
                                datetime.now(timezone.utc)
                            ),
                            "trace": dsc,
                        }
                    )

                    envelope.add_item(
                        Item(
                            type=self.TYPE,
                            content_type=self.CONTENT_TYPE,
                            headers={
                                "item_count": len(spans),
                            },
                            payload=PayloadRef(
                                json={
                                    "items": [
                                        self._to_transport_format(span)
                                        for span in spans
                                    ]
                                }
                            ),
                        )
                    )

                    envelopes.append(envelope)

            self._span_buffer.clear()

        # Capture outside the lock so a slow transport doesn't block add().
        for envelope in envelopes:
            self._capture_func(envelope)

sentry_sdk/client.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import sentry_sdk
1212
from sentry_sdk._compat import PY37, check_uwsgi_thread_support
1313
from sentry_sdk._metrics_batcher import MetricsBatcher
14+
from sentry_sdk._span_batcher import SpanBatcher
1415
from sentry_sdk.utils import (
1516
AnnotatedValue,
1617
ContextVar,
@@ -31,6 +32,7 @@
3132
)
3233
from sentry_sdk.serializer import serialize
3334
from sentry_sdk.tracing import trace
35+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
3436
from sentry_sdk.transport import BaseHttpTransport, make_transport
3537
from sentry_sdk.consts import (
3638
SPANDATA,
@@ -188,6 +190,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None:
188190
self.monitor: "Optional[Monitor]" = None
189191
self.log_batcher: "Optional[LogBatcher]" = None
190192
self.metrics_batcher: "Optional[MetricsBatcher]" = None
193+
self.span_batcher: "Optional[SpanBatcher]" = None
191194
self.integrations: "dict[str, Integration]" = {}
192195

193196
def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any":
@@ -399,6 +402,13 @@ def _record_lost_event(
399402
record_lost_func=_record_lost_event,
400403
)
401404

405+
self.span_batcher = None
406+
if has_span_streaming_enabled(self.options):
407+
self.span_batcher = SpanBatcher(
408+
capture_func=_capture_envelope,
409+
record_lost_func=_record_lost_event,
410+
)
411+
402412
max_request_body_size = ("always", "never", "small", "medium")
403413
if self.options["max_request_body_size"] not in max_request_body_size:
404414
raise ValueError(
@@ -909,7 +919,10 @@ def capture_event(
909919
return return_value
910920

911921
def _capture_telemetry(
912-
self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope"
922+
self,
923+
telemetry: "Optional[Union[Log, Metric]]",
924+
ty: str,
925+
scope: "Scope",
913926
) -> None:
914927
# Capture attributes-based telemetry (logs, metrics, spansV2)
915928
if telemetry is None:
@@ -993,6 +1006,8 @@ def close(
9931006
self.log_batcher.kill()
9941007
if self.metrics_batcher is not None:
9951008
self.metrics_batcher.kill()
1009+
if self.span_batcher is not None:
1010+
self.span_batcher.kill()
9961011
if self.monitor:
9971012
self.monitor.kill()
9981013
self.transport.kill()
@@ -1018,6 +1033,8 @@ def flush(
10181033
self.log_batcher.flush()
10191034
if self.metrics_batcher is not None:
10201035
self.metrics_batcher.flush()
1036+
if self.span_batcher is not None:
1037+
self.span_batcher.flush()
10211038
self.transport.flush(timeout=timeout, callback=callback)
10221039

10231040
def __enter__(self) -> "_Client":

sentry_sdk/traces.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
"""
2+
The API in this file is only meant to be used in span streaming mode.
3+
4+
You can enable span streaming mode via
5+
sentry_sdk.init(_experiments={"trace_lifecycle": "stream"}).
6+
"""
7+
8+
import uuid
9+
from typing import TYPE_CHECKING
10+
11+
if TYPE_CHECKING:
12+
from typing import Optional
13+
14+
15+
class StreamedSpan:
    """
    A span holds timing information of a block of code.

    Spans can have multiple child spans thus forming a span tree.

    This is the Span First span implementation. The original transaction-based
    span implementation lives in tracing.Span.
    """

    __slots__ = ("_trace_id",)

    def __init__(
        self,
        *,
        trace_id: "Optional[str]" = None,
    ) -> None:
        # The trace id may remain unset here; it is generated lazily on the
        # first read of the trace_id property.
        self._trace_id = trace_id

    @property
    def trace_id(self) -> str:
        """Return the span's trace id, generating one on first access."""
        if self._trace_id is None or self._trace_id == "":
            self._trace_id = uuid.uuid4().hex
        return self._trace_id

0 commit comments

Comments
 (0)