Skip to content

Commit 2ebc4e2

Browse files
committed
fix(dramatiq): cleanup isolated scope and transaction when message is skipped
The Dramatiq middleware was missing the [`after_skip_message` hook](https://dramatiq.io/reference.html#dramatiq.Middleware.after_skip_message), which is triggered in place of [`after_process_message`](https://dramatiq.io/reference.html#dramatiq.Middleware.after_skip_message) when [`SkipMessage`](https://dramatiq.io/reference.html#dramatiq.middleware.SkipMessage) is raised. Thus, the isolated scope and transaction that are opened in [`before_process_message`](https://dramatiq.io/reference.html#dramatiq.Middleware.before_process_message) were never closed; leading to a memory leak particularly visible in a context where lot of messages are skipped. The fix is simply to assign `after_skip_message` to `after_process_message`, so the teardown logic is triggered. This pattern is used in the [Dramatiq code base](https://github.com/Bogdanp/dramatiq/blob/143aaa228521606806a87daefff2d21f88607e70/dramatiq/middleware/time_limit.py#L97).
1 parent e335ad8 commit 2ebc4e2

File tree

2 files changed

+30
-2
lines changed

2 files changed

+30
-2
lines changed

sentry_sdk/integrations/dramatiq.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,8 @@ def after_process_message(
188188
transaction.__exit__(type(exception), exception, None)
189189
scope_manager.__exit__(type(exception), exception, None)
190190

191+
after_skip_message = after_process_message
192+
191193

192194
def _make_message_event_processor(
193195
message: "Message[R]", integration: "DramatiqIntegration"

tests/integrations/dramatiq/test_dramatiq.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
1-
import pytest
21
import uuid
32

43
import dramatiq
4+
import pytest
55
from dramatiq.brokers.stub import StubBroker
6+
from dramatiq.middleware import Middleware, SkipMessage
67

78
import sentry_sdk
8-
from sentry_sdk.tracing import TransactionSource
99
from sentry_sdk import start_transaction
1010
from sentry_sdk.consts import SPANSTATUS
1111
from sentry_sdk.integrations.dramatiq import DramatiqIntegration
1212
from sentry_sdk.integrations.logging import ignore_logger
13+
from sentry_sdk.tracing import Transaction, TransactionSource
1314

1415
ignore_logger("dramatiq.worker.WorkerThread")
1516

@@ -386,3 +387,28 @@ def dummy_actor():
386387
worker.join()
387388

388389
assert events == []
390+
391+
392+
@pytest.mark.parametrize("broker", [1.0], indirect=True)
393+
def test_that_skip_message_cleans_up_scope_and_transaction(
394+
broker, worker, capture_events
395+
):
396+
transactions: list[Transaction] = []
397+
398+
class SkipMessageMiddleware(Middleware):
399+
def before_process_message(self, broker, message):
400+
transactions.append(sentry_sdk.get_current_scope().transaction)
401+
raise SkipMessage()
402+
403+
broker.add_middleware(SkipMessageMiddleware())
404+
405+
@dramatiq.actor(max_retries=0)
406+
def skipped_actor(): ...
407+
408+
skipped_actor.send()
409+
410+
broker.join(skipped_actor.queue_name)
411+
worker.join()
412+
413+
(transaction,) = transactions
414+
assert transaction.timestamp is not None

0 commit comments

Comments
 (0)