From 6d61099967faf407f4f0e7fcf06232909f15bf33 Mon Sep 17 00:00:00 2001
From: "codspeed-hq[bot]" <117304815+codspeed-hq[bot]@users.noreply.github.com>
Date: Fri, 23 Jan 2026 13:52:21 +0000
Subject: [PATCH 1/4] Add CodSpeed performance benchmarking setup
- Add pytest-codspeed to dev dependencies
- Create benchmarks directory with sample benchmark files
- Add GitHub Actions workflow for CodSpeed integration
- Add CodSpeed badge and minor formatting fixes to README
---
.github/workflows/codspeed.yml | 36 +++++++
README.md | 13 ++-
benchmarks/__init__.py | 0
benchmarks/test_benchmark_event_handling.py | 74 +++++++++++++
benchmarks/test_benchmark_request_handling.py | 100 ++++++++++++++++++
benchmarks/test_benchmark_serialization.py | 83 +++++++++++++++
pyproject.toml | 1 +
7 files changed, 302 insertions(+), 5 deletions(-)
create mode 100644 .github/workflows/codspeed.yml
create mode 100644 benchmarks/__init__.py
create mode 100644 benchmarks/test_benchmark_event_handling.py
create mode 100644 benchmarks/test_benchmark_request_handling.py
create mode 100644 benchmarks/test_benchmark_serialization.py
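
The CI job can be reproduced locally with the same steps the workflow runs:

    python -m pip install --upgrade pip
    pip install -e ".[dev]"
    pytest benchmarks/ --codspeed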
diff --git a/.github/workflows/codspeed.yml b/.github/workflows/codspeed.yml
new file mode 100644
index 0000000..33418f4
--- /dev/null
+++ b/.github/workflows/codspeed.yml
@@ -0,0 +1,36 @@
+name: CodSpeed
+
+on:
+ push:
+ branches:
+ - "main"
+ - "master"
+ pull_request:
+ workflow_dispatch:
+
+permissions:
+ contents: read
+ id-token: write
+
+jobs:
+ benchmarks:
+ name: Run benchmarks
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python 3.12
+ uses: actions/setup-python@v5
+ with:
+ python-version: "3.12"
+
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+ pip install -e ".[dev]"
+
+ - name: Run benchmarks
+ uses: CodSpeedHQ/action@v4
+ with:
+ mode: simulation
+ run: pytest benchmarks/ --codspeed
diff --git a/README.md b/README.md
index 8d122b3..f8574c5 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,9 @@
+
+[![CodSpeed Badge](https://img.shields.io/endpoint?url=https://codspeed.io/badge.json)](https://codspeed.io/vadikko2/cqrs)
+
@@ -928,16 +931,16 @@ async def process_files_stream(
mediator: cqrs.StreamingRequestMediator = fastapi.Depends(streaming_mediator_factory),
) -> fastapi.responses.StreamingResponse:
async def generate_sse():
- yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\n\n"
+ yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\\n\\n"
async for result in mediator.stream(command):
sse_data = {
"type": "progress",
"data": result.to_dict(),
}
- yield f"data: {json.dumps(sse_data)}\n\n"
+ yield f"data: {json.dumps(sse_data)}\\n\\n"
- yield f"data: {json.dumps({'type': 'complete'})}\n\n"
+ yield f"data: {json.dumps({'type': 'complete'})}\\n\\n"
return fastapi.responses.StreamingResponse(
generate_sse(),
@@ -950,8 +953,8 @@ the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/fastap
## Protobuf messaging
-The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\\
-Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data –
+The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\
+Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data –
think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use
special generated source code to easily write and read your structured data to and from a variety of data streams and
using a variety of languages.
diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/benchmarks/test_benchmark_event_handling.py b/benchmarks/test_benchmark_event_handling.py
new file mode 100644
index 0000000..a313016
--- /dev/null
+++ b/benchmarks/test_benchmark_event_handling.py
@@ -0,0 +1,74 @@
+"""Benchmarks for event handling performance."""
+import asyncio
+import typing
+import di
+import pytest
+
+import cqrs
+from cqrs.events import bootstrap
+
+
+class UserJoinedEvent(cqrs.Event):
+ user_id: str
+ meeting_id: str
+
+
+class UserJoinedEventHandler(cqrs.EventHandler[UserJoinedEvent]):
+ def __init__(self):
+ self.processed_events: typing.List[UserJoinedEvent] = []
+
+ async def handle(self, event: UserJoinedEvent) -> None:
+ self.processed_events.append(event)
+
+
+def events_mapper(mapper: cqrs.EventMap) -> None:
+ mapper.bind(UserJoinedEvent, UserJoinedEventHandler)
+
+
+@pytest.fixture
+def event_mediator():
+ return bootstrap.bootstrap(
+ di_container=di.Container(),
+ events_mapper=events_mapper,
+ )
+
+
+@pytest.mark.benchmark
+def test_benchmark_event_processing(benchmark, event_mediator):
+ """Benchmark event processing performance."""
+ event = UserJoinedEvent(user_id="user_1", meeting_id="meeting_1")
+
+ async def run():
+ await event_mediator.send(event)
+
+ benchmark(lambda: asyncio.run(run()))
+
+
+@pytest.mark.benchmark
+def test_benchmark_multiple_events(benchmark, event_mediator):
+ """Benchmark processing multiple events in sequence."""
+ events = [
+ UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1")
+ for i in range(10)
+ ]
+
+ async def run():
+ for evt in events:
+ await event_mediator.send(evt)
+
+ benchmark(lambda: asyncio.run(run()))
+
+
+@pytest.mark.benchmark
+def test_benchmark_notification_event(benchmark):
+ """Benchmark notification event creation and serialization."""
+
+ def run():
+ event = cqrs.NotificationEvent[UserJoinedEvent](
+ event_name="UserJoined",
+ topic="test_topic",
+ payload=UserJoinedEvent(user_id="user_1", meeting_id="meeting_1"),
+ )
+ return event.to_dict()
+
+ benchmark(run)
diff --git a/benchmarks/test_benchmark_request_handling.py b/benchmarks/test_benchmark_request_handling.py
new file mode 100644
index 0000000..d5a11b3
--- /dev/null
+++ b/benchmarks/test_benchmark_request_handling.py
@@ -0,0 +1,100 @@
+"""Benchmarks for request handling performance."""
+import asyncio
+import typing
+from collections import defaultdict
+import di
+import pytest
+
+import cqrs
+from cqrs.requests import bootstrap
+
+STORAGE = defaultdict[str, typing.List[str]](lambda: [])
+
+
+class JoinMeetingCommand(cqrs.Request):
+ user_id: str
+ meeting_id: str
+
+
+class ReadMeetingQuery(cqrs.Request):
+ meeting_id: str
+
+
+class ReadMeetingQueryResult(cqrs.Response):
+ users: list[str]
+
+
+class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
+ @property
+ def events(self):
+ return []
+
+ async def handle(self, request: JoinMeetingCommand) -> None:
+ STORAGE[request.meeting_id].append(request.user_id)
+
+
+class ReadMeetingQueryHandler(
+ cqrs.RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult],
+):
+ @property
+ def events(self):
+ return []
+
+ async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
+ return ReadMeetingQueryResult(users=STORAGE[request.meeting_id])
+
+
+def command_mapper(mapper: cqrs.RequestMap) -> None:
+ mapper.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
+
+
+def query_mapper(mapper: cqrs.RequestMap) -> None:
+ mapper.bind(ReadMeetingQuery, ReadMeetingQueryHandler)
+
+
+@pytest.fixture
+def mediator():
+ return bootstrap.bootstrap(
+ di_container=di.Container(),
+ queries_mapper=query_mapper,
+ commands_mapper=command_mapper,
+ )
+
+
+@pytest.mark.benchmark
+def test_benchmark_command_handling(benchmark, mediator):
+ """Benchmark command handling performance."""
+ command = JoinMeetingCommand(user_id="user_1", meeting_id="meeting_1")
+
+ async def run():
+ await mediator.send(command)
+
+ benchmark(lambda: asyncio.run(run()))
+
+
+@pytest.mark.benchmark
+def test_benchmark_query_handling(benchmark, mediator):
+ """Benchmark query handling performance."""
+ # Setup: Add some data first
+ STORAGE["meeting_1"] = ["user_1", "user_2", "user_3"]
+ query = ReadMeetingQuery(meeting_id="meeting_1")
+
+ async def run():
+ return await mediator.send(query)
+
+ benchmark(lambda: asyncio.run(run()))
+
+
+@pytest.mark.benchmark
+def test_benchmark_multiple_commands(benchmark, mediator):
+ """Benchmark handling multiple commands in sequence."""
+ commands = [
+ JoinMeetingCommand(user_id=f"user_{i}", meeting_id="meeting_2")
+ for i in range(10)
+ ]
+
+ async def run():
+ for cmd in commands:
+ await mediator.send(cmd)
+
+ benchmark(lambda: asyncio.run(run()))
diff --git a/benchmarks/test_benchmark_serialization.py b/benchmarks/test_benchmark_serialization.py
new file mode 100644
index 0000000..af0fac3
--- /dev/null
+++ b/benchmarks/test_benchmark_serialization.py
@@ -0,0 +1,83 @@
+"""Benchmarks for serialization and deserialization performance."""
+import pytest
+
+import cqrs
+
+
+class SampleRequest(cqrs.Request):
+ field1: str
+ field2: int
+ field3: list[str]
+ field4: dict[str, int]
+
+
+class SampleResponse(cqrs.Response):
+ result: str
+ data: dict[str, str]
+
+
+@pytest.mark.benchmark
+def test_benchmark_request_to_dict(benchmark):
+ """Benchmark request serialization to dictionary."""
+ request = SampleRequest(
+ field1="test_value",
+ field2=42,
+ field3=["a", "b", "c"],
+ field4={"key1": 1, "key2": 2},
+ )
+
+ benchmark(lambda: request.to_dict())
+
+
+@pytest.mark.benchmark
+def test_benchmark_request_from_dict(benchmark):
+ """Benchmark request deserialization from dictionary."""
+ data = {
+ "field1": "test_value",
+ "field2": 42,
+ "field3": ["a", "b", "c"],
+ "field4": {"key1": 1, "key2": 2},
+ }
+
+ benchmark(lambda: SampleRequest.from_dict(**data))
+
+
+@pytest.mark.benchmark
+def test_benchmark_response_to_dict(benchmark):
+ """Benchmark response serialization to dictionary."""
+ response = SampleResponse(
+ result="success",
+ data={"key1": "value1", "key2": "value2"},
+ )
+
+ benchmark(lambda: response.to_dict())
+
+
+@pytest.mark.benchmark
+def test_benchmark_response_from_dict(benchmark):
+ """Benchmark response deserialization from dictionary."""
+ data = {
+ "result": "success",
+ "data": {"key1": "value1", "key2": "value2"},
+ }
+
+ benchmark(lambda: SampleResponse.from_dict(**data))
+
+
+@pytest.mark.benchmark
+def test_benchmark_complex_nested_structure(benchmark):
+ """Benchmark serialization of complex nested structures."""
+
+ class NestedRequest(cqrs.Request):
+ level1: dict[str, list[dict[str, str]]]
+ level2: list[dict[str, int]]
+
+ request = NestedRequest(
+ level1={
+ "group1": [{"name": "item1", "value": "val1"}] * 5,
+ "group2": [{"name": "item2", "value": "val2"}] * 5,
+ },
+ level2=[{"counter": i} for i in range(10)],
+ )
+
+ benchmark(lambda: request.to_dict())
diff --git a/pyproject.toml b/pyproject.toml
index a7e1d78..bd8670f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,6 +42,7 @@ dev = [
"ruff==0.6.2",
"vermin>=1.6.0",
"pytest-cov>=4.0.0",
+ "pytest-codspeed==4.2.0",
# Tests
"aio-pika==9.3.0", # from rabbit
"aiokafka==0.10.0", # from kafka
From 5097f9a1f596c99ceb9703262107c56b4894dacc Mon Sep 17 00:00:00 2001
From: Вадим Козыревский
Date: Fri, 23 Jan 2026 16:57:52 +0300
Subject: [PATCH 2/4] Change Pydantic models to dataclass models
---
benchmarks/test_benchmark_event_handling.py | 19 +++++++-------
benchmarks/test_benchmark_request_handling.py | 25 +++++++++++--------
benchmarks/test_benchmark_serialization.py | 23 +++++++++--------
3 files changed, 37 insertions(+), 30 deletions(-)
diff --git a/benchmarks/test_benchmark_event_handling.py b/benchmarks/test_benchmark_event_handling.py
index a313016..4f2004b 100644
--- a/benchmarks/test_benchmark_event_handling.py
+++ b/benchmarks/test_benchmark_event_handling.py
@@ -1,14 +1,15 @@
"""Benchmarks for event handling performance."""
+import dataclasses
import typing
+import cqrs
import di
import pytest
-
-import cqrs
from cqrs.events import bootstrap
-class UserJoinedEvent(cqrs.Event):
+@dataclasses.dataclass
+class UserJoinedEvent(cqrs.DCEvent):
user_id: str
meeting_id: str
@@ -37,10 +38,10 @@ def event_mediator():
def test_benchmark_event_processing(benchmark, event_mediator):
"""Benchmark event processing performance."""
event = UserJoinedEvent(user_id="user_1", meeting_id="meeting_1")
-
+
async def run():
await event_mediator.send(event)
-
+
benchmark(lambda: asyncio.run(run()))
@@ -51,18 +52,18 @@ def test_benchmark_multiple_events(benchmark, event_mediator):
UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1")
for i in range(10)
]
-
+
async def run():
for evt in events:
await event_mediator.send(evt)
-
+
benchmark(lambda: asyncio.run(run()))
@pytest.mark.benchmark
def test_benchmark_notification_event(benchmark):
"""Benchmark notification event creation and serialization."""
-
+
def run():
event = cqrs.NotificationEvent[UserJoinedEvent](
event_name="UserJoined",
@@ -70,5 +71,5 @@ def run():
payload=UserJoinedEvent(user_id="user_1", meeting_id="meeting_1"),
)
return event.to_dict()
-
+
benchmark(run)
diff --git a/benchmarks/test_benchmark_request_handling.py b/benchmarks/test_benchmark_request_handling.py
index d5a11b3..73af0b3 100644
--- a/benchmarks/test_benchmark_request_handling.py
+++ b/benchmarks/test_benchmark_request_handling.py
@@ -1,26 +1,29 @@
"""Benchmarks for request handling performance."""
+import dataclasses
import typing
from collections import defaultdict
+import cqrs
import di
import pytest
-
-import cqrs
from cqrs.requests import bootstrap
STORAGE = defaultdict[str, typing.List[str]](lambda: [])
-class JoinMeetingCommand(cqrs.Request):
+@dataclasses.dataclass
+class JoinMeetingCommand(cqrs.DCRequest):
user_id: str
meeting_id: str
-class ReadMeetingQuery(cqrs.Request):
+@dataclasses.dataclass
+class ReadMeetingQuery(cqrs.DCRequest):
meeting_id: str
-class ReadMeetingQueryResult(cqrs.Response):
+@dataclasses.dataclass
+class ReadMeetingQueryResult(cqrs.DCResponse):
users: list[str]
@@ -65,10 +68,10 @@ def mediator():
def test_benchmark_command_handling(benchmark, mediator):
"""Benchmark command handling performance."""
command = JoinMeetingCommand(user_id="user_1", meeting_id="meeting_1")
-
+
async def run():
await mediator.send(command)
-
+
benchmark(lambda: asyncio.run(run()))
@@ -78,10 +81,10 @@ def test_benchmark_query_handling(benchmark, mediator):
# Setup: Add some data first
STORAGE["meeting_1"] = ["user_1", "user_2", "user_3"]
query = ReadMeetingQuery(meeting_id="meeting_1")
-
+
async def run():
return await mediator.send(query)
-
+
benchmark(lambda: asyncio.run(run()))
@@ -92,9 +95,9 @@ def test_benchmark_multiple_commands(benchmark, mediator):
JoinMeetingCommand(user_id=f"user_{i}", meeting_id="meeting_2")
for i in range(10)
]
-
+
async def run():
for cmd in commands:
await mediator.send(cmd)
-
+
benchmark(lambda: asyncio.run(run()))
diff --git a/benchmarks/test_benchmark_serialization.py b/benchmarks/test_benchmark_serialization.py
index af0fac3..70569f0 100644
--- a/benchmarks/test_benchmark_serialization.py
+++ b/benchmarks/test_benchmark_serialization.py
@@ -1,17 +1,20 @@
"""Benchmarks for serialization and deserialization performance."""
-import pytest
+import dataclasses
import cqrs
+import pytest
-class SampleRequest(cqrs.Request):
+@dataclasses.dataclass
+class SampleRequest(cqrs.DCRequest):
field1: str
field2: int
field3: list[str]
field4: dict[str, int]
-class SampleResponse(cqrs.Response):
+@dataclasses.dataclass
+class SampleResponse(cqrs.DCResponse):
result: str
data: dict[str, str]
@@ -25,7 +28,7 @@ def test_benchmark_request_to_dict(benchmark):
field3=["a", "b", "c"],
field4={"key1": 1, "key2": 2},
)
-
+
benchmark(lambda: request.to_dict())
@@ -38,7 +41,7 @@ def test_benchmark_request_from_dict(benchmark):
"field3": ["a", "b", "c"],
"field4": {"key1": 1, "key2": 2},
}
-
+
benchmark(lambda: SampleRequest.from_dict(**data))
@@ -49,7 +52,7 @@ def test_benchmark_response_to_dict(benchmark):
result="success",
data={"key1": "value1", "key2": "value2"},
)
-
+
benchmark(lambda: response.to_dict())
@@ -60,18 +63,18 @@ def test_benchmark_response_from_dict(benchmark):
"result": "success",
"data": {"key1": "value1", "key2": "value2"},
}
-
+
benchmark(lambda: SampleResponse.from_dict(**data))
@pytest.mark.benchmark
def test_benchmark_complex_nested_structure(benchmark):
"""Benchmark serialization of complex nested structures."""
-
+
class NestedRequest(cqrs.Request):
level1: dict[str, list[dict[str, str]]]
level2: list[dict[str, int]]
-
+
request = NestedRequest(
level1={
"group1": [{"name": "item1", "value": "val1"}] * 5,
@@ -79,5 +82,5 @@ class NestedRequest(cqrs.Request):
},
level2=[{"counter": i} for i in range(10)],
)
-
+
benchmark(lambda: request.to_dict())
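
Note on the DC-prefixed bases used in this patch: they are the dataclass
counterparts of the library's Pydantic models, so every model now needs an
explicit @dataclasses.dataclass decorator. A minimal sketch with a plain
dataclass (the to_dict()/from_dict() helpers that cqrs.DCRequest is assumed
to provide are not shown):

    import dataclasses


    @dataclasses.dataclass
    class JoinMeetingCommand:
        # Same shape as the benchmark model, minus the cqrs.DCRequest base.
        user_id: str
        meeting_id: str


    command = JoinMeetingCommand(user_id="user_1", meeting_id="meeting_1")
    assert dataclasses.asdict(command) == {
        "user_id": "user_1",
        "meeting_id": "meeting_1",
    }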
From 9f5aecdc07983a2301f1ecbeb0e5c46529cd3206 Mon Sep 17 00:00:00 2001
From: Вадим Козыревский
Date: Fri, 23 Jan 2026 16:58:06 +0300
Subject: [PATCH 3/4] Apply formatting fixes to benchmark files
---
benchmarks/test_benchmark_event_handling.py | 4 ++--
benchmarks/test_benchmark_request_handling.py | 1 +
benchmarks/test_benchmark_serialization.py | 1 +
3 files changed, 4 insertions(+), 2 deletions(-)
diff --git a/benchmarks/test_benchmark_event_handling.py b/benchmarks/test_benchmark_event_handling.py
index 4f2004b..22df238 100644
--- a/benchmarks/test_benchmark_event_handling.py
+++ b/benchmarks/test_benchmark_event_handling.py
@@ -1,4 +1,5 @@
"""Benchmarks for event handling performance."""
+
import asyncio
import dataclasses
import typing
@@ -49,8 +50,7 @@ async def run():
def test_benchmark_multiple_events(benchmark, event_mediator):
"""Benchmark processing multiple events in sequence."""
events = [
- UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1")
- for i in range(10)
+ UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1") for i in range(10)
]
async def run():
diff --git a/benchmarks/test_benchmark_request_handling.py b/benchmarks/test_benchmark_request_handling.py
index 73af0b3..ea974a2 100644
--- a/benchmarks/test_benchmark_request_handling.py
+++ b/benchmarks/test_benchmark_request_handling.py
@@ -1,4 +1,5 @@
"""Benchmarks for request handling performance."""
+
import asyncio
import dataclasses
import typing
diff --git a/benchmarks/test_benchmark_serialization.py b/benchmarks/test_benchmark_serialization.py
index 70569f0..25418ee 100644
--- a/benchmarks/test_benchmark_serialization.py
+++ b/benchmarks/test_benchmark_serialization.py
@@ -1,4 +1,5 @@
"""Benchmarks for serialization and deserialization performance."""
+
import dataclasses
import cqrs
From 3dea1fd4d6f716bcff7eff8bfde28fee4f3aeb44 Mon Sep 17 00:00:00 2001
From: Вадим Козыревский
Date: Fri, 23 Jan 2026 16:58:35 +0300
Subject: [PATCH 4/4] Make UserJoinedEvent a frozen dataclass
---
benchmarks/test_benchmark_event_handling.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/benchmarks/test_benchmark_event_handling.py b/benchmarks/test_benchmark_event_handling.py
index 22df238..226fab8 100644
--- a/benchmarks/test_benchmark_event_handling.py
+++ b/benchmarks/test_benchmark_event_handling.py
@@ -9,7 +9,7 @@
from cqrs.events import bootstrap
-@dataclasses.dataclass
+@dataclasses.dataclass(frozen=True)
class UserJoinedEvent(cqrs.DCEvent):
user_id: str
meeting_id: str
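
Note on frozen=True: freezing makes event instances immutable and hashable,
which suits value-like event objects that may end up in sets or dict keys. A
minimal illustration with a plain frozen dataclass (cqrs.DCEvent omitted):

    import dataclasses


    @dataclasses.dataclass(frozen=True)
    class UserJoinedEvent:
        user_id: str
        meeting_id: str


    event = UserJoinedEvent(user_id="user_1", meeting_id="meeting_1")
    try:
        event.user_id = "user_2"  # frozen=True turns assignment into an error
    except dataclasses.FrozenInstanceError:
        pass

    # With eq=True (the default) and frozen=True, __hash__ is generated from
    # the fields, so equal events hash equally.
    assert hash(event) == hash(UserJoinedEvent("user_1", "meeting_1"))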