Skip to content

Commit 87f98f0

Browse files
authored
perf: move serialization to background threads (#994)
1 parent 05895c9 commit 87f98f0

File tree

5 files changed

+147
-58
lines changed

5 files changed

+147
-58
lines changed

langfuse/callback/langchain.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -991,14 +991,8 @@ def _log_debug_event(
991991
parent_run_id: Optional[UUID] = None,
992992
**kwargs,
993993
):
994-
kwargs_log = (
995-
", " + ", ".join([f"{key}: {value}" for key, value in kwargs.items()])
996-
if len(kwargs) > 0
997-
else ""
998-
)
999994
self.log.debug(
1000995
f"Event: {event_name}, run_id: {str(run_id)[:5]}, parent_run_id: {str(parent_run_id)[:5]}"
1001-
+ kwargs_log
1002996
)
1003997

1004998

langfuse/client.py

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1334,11 +1334,11 @@ def trace(
13341334

13351335
new_body = TraceBody(**new_dict)
13361336

1337-
self.log.debug(f"Creating trace {new_body}")
1337+
self.log.debug(f"Creating trace {_filter_io_from_event_body(new_dict)}")
13381338
event = {
13391339
"id": str(uuid.uuid4()),
13401340
"type": "trace-create",
1341-
"body": new_body.dict(exclude_none=True),
1341+
"body": new_body,
13421342
}
13431343

13441344
self.task_manager.add_task(
@@ -1503,7 +1503,7 @@ def score(
15031503
event = {
15041504
"id": str(uuid.uuid4()),
15051505
"type": "score-create",
1506-
"body": new_body.dict(exclude_none=True),
1506+
"body": new_body,
15071507
}
15081508
self.task_manager.add_task(event)
15091509

@@ -1604,17 +1604,16 @@ def span(
16041604
if trace_id is None:
16051605
self._generate_trace(new_trace_id, name or new_trace_id)
16061606

1607-
self.log.debug(f"Creating span {span_body}...")
1607+
self.log.debug(f"Creating span {_filter_io_from_event_body(span_body)}...")
16081608

16091609
span_body = CreateSpanBody(**span_body)
16101610

16111611
event = {
16121612
"id": str(uuid.uuid4()),
16131613
"type": "span-create",
1614-
"body": span_body.dict(exclude_none=True),
1614+
"body": span_body,
16151615
}
16161616

1617-
self.log.debug(f"Creating span {event}...")
16181617
self.task_manager.add_task(event)
16191618

16201619
except Exception as e:
@@ -1710,10 +1709,12 @@ def event(
17101709
event = {
17111710
"id": str(uuid.uuid4()),
17121711
"type": "event-create",
1713-
"body": request.dict(exclude_none=True),
1712+
"body": request,
17141713
}
17151714

1716-
self.log.debug(f"Creating event {event}...")
1715+
self.log.debug(
1716+
f"Creating event {_filter_io_from_event_body(event_body)} ..."
1717+
)
17171718
self.task_manager.add_task(event)
17181719

17191720
except Exception as e:
@@ -1835,23 +1836,24 @@ def generation(
18351836
event = {
18361837
"id": str(uuid.uuid4()),
18371838
"type": "trace-create",
1838-
"body": request.dict(exclude_none=True),
1839+
"body": request,
18391840
}
18401841

1841-
self.log.debug(f"Creating trace {event}...")
1842+
self.log.debug("Creating trace...")
18421843

18431844
self.task_manager.add_task(event)
18441845

1845-
self.log.debug(f"Creating generation max {generation_body} {usage}...")
1846+
self.log.debug(
1847+
f"Creating generation max {_filter_io_from_event_body(generation_body)}..."
1848+
)
18461849
request = CreateGenerationBody(**generation_body)
18471850

18481851
event = {
18491852
"id": str(uuid.uuid4()),
18501853
"type": "generation-create",
1851-
"body": request.dict(exclude_none=True),
1854+
"body": request,
18521855
}
18531856

1854-
self.log.debug(f"Creating top-level generation {event} ...")
18551857
self.task_manager.add_task(event)
18561858

18571859
except Exception as e:
@@ -1877,10 +1879,10 @@ def _generate_trace(self, trace_id: str, name: str):
18771879
event = {
18781880
"id": str(uuid.uuid4()),
18791881
"type": "trace-create",
1880-
"body": trace_body.dict(exclude_none=True),
1882+
"body": trace_body,
18811883
}
18821884

1883-
self.log.debug(f"Creating trace {event}...")
1885+
self.log.debug(f"Creating trace {_filter_io_from_event_body(trace_dict)}...")
18841886
self.task_manager.add_task(event)
18851887

18861888
def join(self):
@@ -2087,7 +2089,9 @@ def generation(
20872089
"body": new_body.dict(exclude_none=True, exclude_unset=False),
20882090
}
20892091

2090-
self.log.debug(f"Creating generation {new_body}...")
2092+
self.log.debug(
2093+
f"Creating generation {_filter_io_from_event_body(generation_body)}..."
2094+
)
20912095
self.task_manager.add_task(event)
20922096

20932097
except Exception as e:
@@ -2165,7 +2169,7 @@ def span(
21652169
**kwargs,
21662170
}
21672171

2168-
self.log.debug(f"Creating span {span_body}...")
2172+
self.log.debug(f"Creating span {_filter_io_from_event_body(span_body)}...")
21692173

21702174
new_dict = self._add_state_to_event(span_body)
21712175
new_body = self._add_default_values(new_dict)
@@ -2175,7 +2179,7 @@ def span(
21752179
event = {
21762180
"id": str(uuid.uuid4()),
21772181
"type": "span-create",
2178-
"body": event.dict(exclude_none=True),
2182+
"body": event,
21792183
}
21802184

21812185
self.task_manager.add_task(event)
@@ -2284,7 +2288,7 @@ def score(
22842288
event = {
22852289
"id": str(uuid.uuid4()),
22862290
"type": "score-create",
2287-
"body": request.dict(exclude_none=True),
2291+
"body": request,
22882292
}
22892293

22902294
self.task_manager.add_task(event)
@@ -2369,10 +2373,12 @@ def event(
23692373
event = {
23702374
"id": str(uuid.uuid4()),
23712375
"type": "event-create",
2372-
"body": request.dict(exclude_none=True),
2376+
"body": request,
23732377
}
23742378

2375-
self.log.debug(f"Creating event {event}...")
2379+
self.log.debug(
2380+
f"Creating event {_filter_io_from_event_body(event_body)}..."
2381+
)
23762382
self.task_manager.add_task(event)
23772383

23782384
except Exception as e:
@@ -2497,7 +2503,9 @@ def update(
24972503
**kwargs,
24982504
}
24992505

2500-
self.log.debug(f"Update generation {generation_body}...")
2506+
self.log.debug(
2507+
f"Update generation {_filter_io_from_event_body(generation_body)}..."
2508+
)
25012509

25022510
request = UpdateGenerationBody(**generation_body)
25032511

@@ -2507,7 +2515,9 @@ def update(
25072515
"body": request.dict(exclude_none=True, exclude_unset=False),
25082516
}
25092517

2510-
self.log.debug(f"Update generation {event}...")
2518+
self.log.debug(
2519+
f"Update generation {_filter_io_from_event_body(generation_body)}..."
2520+
)
25112521
self.task_manager.add_task(event)
25122522

25132523
except Exception as e:
@@ -2684,14 +2694,14 @@ def update(
26842694
"end_time": end_time,
26852695
**kwargs,
26862696
}
2687-
self.log.debug(f"Update span {span_body}...")
2697+
self.log.debug(f"Update span {_filter_io_from_event_body(span_body)}...")
26882698

26892699
request = UpdateSpanBody(**span_body)
26902700

26912701
event = {
26922702
"id": str(uuid.uuid4()),
26932703
"type": "span-update",
2694-
"body": request.dict(exclude_none=True),
2704+
"body": request,
26952705
}
26962706

26972707
self.task_manager.add_task(event)
@@ -2888,14 +2898,14 @@ def update(
28882898
"tags": tags,
28892899
**kwargs,
28902900
}
2891-
self.log.debug(f"Update trace {trace_body}...")
2901+
self.log.debug(f"Update trace {_filter_io_from_event_body(trace_body)}...")
28922902

28932903
request = TraceBody(**trace_body)
28942904

28952905
event = {
28962906
"id": str(uuid.uuid4()),
28972907
"type": "trace-create",
2898-
"body": request.dict(exclude_none=True),
2908+
"body": request,
28992909
}
29002910

29012911
self.task_manager.add_task(event)
@@ -3350,3 +3360,9 @@ def __init__(self, dataset: Dataset, items: typing.List[DatasetItemClient]):
33503360
self.created_at = dataset.created_at
33513361
self.updated_at = dataset.updated_at
33523362
self.items = items
3363+
3364+
3365+
def _filter_io_from_event_body(event_body: Dict[str, Any]):
3366+
return {
3367+
k: v for k, v in event_body.items() if k not in ("input", "output", "metadata")
3368+
}

langfuse/task_manager.py

Lines changed: 48 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ class Consumer(threading.Thread):
5858
_sdk_name: str
5959
_sdk_version: str
6060
_sdk_integration: str
61+
_mask: Optional[MaskFunction]
62+
_sampler: Sampler
6163

6264
def __init__(
6365
self,
@@ -71,6 +73,8 @@ def __init__(
7173
sdk_name: str,
7274
sdk_version: str,
7375
sdk_integration: str,
76+
sample_rate: float,
77+
mask: Optional[MaskFunction] = None,
7478
):
7579
"""Create a consumer thread."""
7680
threading.Thread.__init__(self)
@@ -91,6 +95,8 @@ def __init__(
9195
self._sdk_name = sdk_name
9296
self._sdk_version = sdk_version
9397
self._sdk_integration = sdk_integration
98+
self._mask = mask
99+
self._sampler = Sampler(sample_rate)
94100

95101
def _next(self):
96102
"""Return the next batch of items to upload."""
@@ -107,13 +113,37 @@ def _next(self):
107113
try:
108114
item = queue.get(block=True, timeout=self._flush_interval - elapsed)
109115

116+
# convert pydantic models to dicts
117+
if "body" in item and isinstance(item["body"], pydantic.BaseModel):
118+
item["body"] = item["body"].dict(exclude_none=True)
119+
120+
# sample event
121+
if not self._sampler.sample_event(item):
122+
queue.task_done()
123+
124+
continue
125+
126+
# truncate item if it exceeds size limit
110127
item_size = self._truncate_item_in_place(
111128
item=item,
112129
max_size=MAX_MSG_SIZE,
113130
log_message="<truncated due to size exceeding limit>",
114131
)
115132

133+
# apply mask
134+
self._apply_mask_in_place(item)
135+
136+
# check for serialization errors
137+
try:
138+
json.dumps(item, cls=EventSerializer)
139+
except Exception as e:
140+
self._log.error(f"Error serializing item, skipping: {e}")
141+
queue.task_done()
142+
143+
continue
144+
116145
items.append(item)
146+
117147
total_size += item_size
118148
if total_size >= BATCH_SIZE_LIMIT:
119149
self._log.debug("hit batch size limit (size: %d)", total_size)
@@ -190,6 +220,20 @@ def _get_item_size(self, item: typing.Any) -> int:
190220
"""Return the size of the item in bytes."""
191221
return len(json.dumps(item, cls=EventSerializer).encode())
192222

223+
def _apply_mask_in_place(self, event: dict):
224+
"""Apply the mask function to the event. This is done in place."""
225+
if not self._mask:
226+
return
227+
228+
body = event["body"] if "body" in event else {}
229+
for key in ("input", "output"):
230+
if key in body:
231+
try:
232+
body[key] = self._mask(data=body[key])
233+
except Exception as e:
234+
self._log.error(f"Mask function failed with error: {e}")
235+
body[key] = "<fully masked due to failed mask function>"
236+
193237
def run(self):
194238
"""Runs the consumer."""
195239
self._log.debug("consumer is running...")
@@ -261,7 +305,7 @@ class TaskManager(object):
261305
_sdk_name: str
262306
_sdk_version: str
263307
_sdk_integration: str
264-
_sampler: Sampler
308+
_sample_rate: float
265309
_mask: Optional[MaskFunction]
266310

267311
def __init__(
@@ -293,7 +337,7 @@ def __init__(
293337
self._sdk_version = sdk_version
294338
self._sdk_integration = sdk_integration
295339
self._enabled = enabled
296-
self._sampler = Sampler(sample_rate)
340+
self._sample_rate = sample_rate
297341
self._mask = mask
298342

299343
self.init_resources()
@@ -314,6 +358,8 @@ def init_resources(self):
314358
sdk_name=self._sdk_name,
315359
sdk_version=self._sdk_version,
316360
sdk_integration=self._sdk_integration,
361+
sample_rate=self._sample_rate,
362+
mask=self._mask,
317363
)
318364
consumer.start()
319365
self._consumers.append(consumer)
@@ -323,12 +369,6 @@ def add_task(self, event: dict):
323369
return
324370

325371
try:
326-
if not self._sampler.sample_event(event):
327-
return # event was sampled out
328-
329-
self._apply_mask_in_place(event)
330-
331-
json.dumps(event, cls=EventSerializer)
332372
event["timestamp"] = _get_timestamp()
333373

334374
self._queue.put(event, block=False)
@@ -340,20 +380,6 @@ def add_task(self, event: dict):
340380

341381
return False
342382

343-
def _apply_mask_in_place(self, event: dict):
344-
"""Apply the mask function to the event. This is done in place."""
345-
if not self._mask:
346-
return
347-
348-
body = event["body"] if "body" in event else {}
349-
for key in ("input", "output"):
350-
if key in body:
351-
try:
352-
body[key] = self._mask(data=body[key])
353-
except Exception as e:
354-
self._log.error(f"Mask function failed with error: {e}")
355-
body[key] = "<fully masked due to failed mask function>"
356-
357383
def flush(self):
358384
"""Force a flush from the internal queue to the server."""
359385
self._log.debug("flushing queue")

0 commit comments

Comments
 (0)