From 6d61099967faf407f4f0e7fcf06232909f15bf33 Mon Sep 17 00:00:00 2001 From: "codspeed-hq[bot]" <117304815+codspeed-hq[bot]@users.noreply.github.com> Date: Fri, 23 Jan 2026 13:52:21 +0000 Subject: [PATCH 1/4] Add CodSpeed performance benchmarking setup - Add pytest-codspeed to dev dependencies - Create benchmarks directory with sample benchmark files - Add GitHub Actions workflow for CodSpeed integration - Add CodSpeed badge to README --- .github/workflows/codspeed.yml | 36 +++++++ README.md | 13 ++- benchmarks/__init__.py | 0 benchmarks/test_benchmark_event_handling.py | 74 +++++++++++++ benchmarks/test_benchmark_request_handling.py | 100 ++++++++++++++++++ benchmarks/test_benchmark_serialization.py | 83 +++++++++++++++ pyproject.toml | 1 + 7 files changed, 302 insertions(+), 5 deletions(-) create mode 100644 .github/workflows/codspeed.yml create mode 100644 benchmarks/__init__.py create mode 100644 benchmarks/test_benchmark_event_handling.py create mode 100644 benchmarks/test_benchmark_request_handling.py create mode 100644 benchmarks/test_benchmark_serialization.py diff --git a/.github/workflows/codspeed.yml b/.github/workflows/codspeed.yml new file mode 100644 index 0000000..33418f4 --- /dev/null +++ b/.github/workflows/codspeed.yml @@ -0,0 +1,36 @@ +name: CodSpeed + +on: + push: + branches: + - "main" + - "master" + pull_request: + workflow_dispatch: + +permissions: + contents: read + id-token: write + +jobs: + benchmarks: + name: Run benchmarks + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e ".[dev]" + + - name: Run benchmarks + uses: CodSpeedHQ/action@v4 + with: + mode: simulation + run: pytest benchmarks/ --codspeed diff --git a/README.md b/README.md index 8d122b3..f8574c5 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,9 @@ Coverage + + CodSpeed + Documentation @@ -928,16 +931,16 @@ async def process_files_stream( mediator: cqrs.StreamingRequestMediator = fastapi.Depends(streaming_mediator_factory), ) -> fastapi.responses.StreamingResponse: async def generate_sse(): - yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\n\n" + yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\\n\\n" async for result in mediator.stream(command): sse_data = { "type": "progress", "data": result.to_dict(), } - yield f"data: {json.dumps(sse_data)}\n\n" + yield f"data: {json.dumps(sse_data)}\\n\\n" - yield f"data: {json.dumps({'type': 'complete'})}\n\n" + yield f"data: {json.dumps({'type': 'complete'})}\\n\\n" return fastapi.responses.StreamingResponse( generate_sse(), @@ -950,8 +953,8 @@ the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/fastap ## Protobuf messaging -The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\ -Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data – +The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\\ +Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages. diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/benchmarks/test_benchmark_event_handling.py b/benchmarks/test_benchmark_event_handling.py new file mode 100644 index 0000000..a313016 --- /dev/null +++ b/benchmarks/test_benchmark_event_handling.py @@ -0,0 +1,74 @@ +"""Benchmarks for event handling performance.""" +import typing + +import di +import pytest + +import cqrs +from cqrs.events import bootstrap + + +class UserJoinedEvent(cqrs.Event): + user_id: str + meeting_id: str + + +class UserJoinedEventHandler(cqrs.EventHandler[UserJoinedEvent]): + def __init__(self): + self.processed_events: typing.List[UserJoinedEvent] = [] + + async def handle(self, event: UserJoinedEvent) -> None: + self.processed_events.append(event) + + +def events_mapper(mapper: cqrs.EventMap) -> None: + mapper.bind(UserJoinedEvent, UserJoinedEventHandler) + + +@pytest.fixture +def event_mediator(): + return bootstrap.bootstrap( + di_container=di.Container(), + events_mapper=events_mapper, + ) + + +@pytest.mark.benchmark +def test_benchmark_event_processing(benchmark, event_mediator): + """Benchmark event processing performance.""" + event = UserJoinedEvent(user_id="user_1", meeting_id="meeting_1") + + async def run(): + await event_mediator.send(event) + + benchmark(lambda: run()) + + +@pytest.mark.benchmark +def test_benchmark_multiple_events(benchmark, event_mediator): + """Benchmark processing multiple events in sequence.""" + events = [ + UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1") + for i in range(10) + ] + + async def run(): + for evt in events: + await event_mediator.send(evt) + + benchmark(lambda: run()) + + +@pytest.mark.benchmark +def test_benchmark_notification_event(benchmark): + """Benchmark notification event creation and serialization.""" + + def run(): + event = cqrs.NotificationEvent[UserJoinedEvent]( + event_name="UserJoined", + topic="test_topic", + payload=UserJoinedEvent(user_id="user_1", meeting_id="meeting_1"), + ) + return event.to_dict() + + benchmark(run) diff --git a/benchmarks/test_benchmark_request_handling.py b/benchmarks/test_benchmark_request_handling.py new file mode 100644 index 0000000..d5a11b3 --- /dev/null +++ b/benchmarks/test_benchmark_request_handling.py @@ -0,0 +1,100 @@ +"""Benchmarks for request handling performance.""" +import typing +from collections import defaultdict + +import di +import pytest + +import cqrs +from cqrs.requests import bootstrap + +STORAGE = defaultdict[str, typing.List[str]](lambda: []) + + +class JoinMeetingCommand(cqrs.Request): + user_id: str + meeting_id: str + + +class ReadMeetingQuery(cqrs.Request): + meeting_id: str + + +class ReadMeetingQueryResult(cqrs.Response): + users: list[str] + + +class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]): + @property + def events(self): + return [] + + async def handle(self, request: JoinMeetingCommand) -> None: + STORAGE[request.meeting_id].append(request.user_id) + + +class ReadMeetingQueryHandler( + cqrs.RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult], +): + @property + def events(self): + return [] + + async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult: + return ReadMeetingQueryResult(users=STORAGE[request.meeting_id]) + + +def command_mapper(mapper: cqrs.RequestMap) -> None: + mapper.bind(JoinMeetingCommand, JoinMeetingCommandHandler) + + +def query_mapper(mapper: cqrs.RequestMap) -> None: + mapper.bind(ReadMeetingQuery, ReadMeetingQueryHandler) + + +@pytest.fixture +def mediator(): + return bootstrap.bootstrap( + di_container=di.Container(), + queries_mapper=query_mapper, + commands_mapper=command_mapper, + ) + + +@pytest.mark.benchmark +def test_benchmark_command_handling(benchmark, mediator): + """Benchmark command handling performance.""" + command = JoinMeetingCommand(user_id="user_1", meeting_id="meeting_1") + + async def run(): + await mediator.send(command) + + benchmark(lambda: run()) + + +@pytest.mark.benchmark +def test_benchmark_query_handling(benchmark, mediator): + """Benchmark query handling performance.""" + # Setup: Add some data first + STORAGE["meeting_1"] = ["user_1", "user_2", "user_3"] + query = ReadMeetingQuery(meeting_id="meeting_1") + + async def run(): + return await mediator.send(query) + + benchmark(lambda: run()) + + +@pytest.mark.benchmark +def test_benchmark_multiple_commands(benchmark, mediator): + """Benchmark handling multiple commands in sequence.""" + commands = [ + JoinMeetingCommand(user_id=f"user_{i}", meeting_id="meeting_2") + for i in range(10) + ] + + async def run(): + for cmd in commands: + await mediator.send(cmd) + + benchmark(lambda: run()) diff --git a/benchmarks/test_benchmark_serialization.py b/benchmarks/test_benchmark_serialization.py new file mode 100644 index 0000000..af0fac3 --- /dev/null +++ b/benchmarks/test_benchmark_serialization.py @@ -0,0 +1,83 @@ +"""Benchmarks for serialization and deserialization performance.""" +import pytest + +import cqrs + + +class SampleRequest(cqrs.Request): + field1: str + field2: int + field3: list[str] + field4: dict[str, int] + + +class SampleResponse(cqrs.Response): + result: str + data: dict[str, str] + + +@pytest.mark.benchmark +def test_benchmark_request_to_dict(benchmark): + """Benchmark request serialization to dictionary.""" + request = SampleRequest( + field1="test_value", + field2=42, + field3=["a", "b", "c"], + field4={"key1": 1, "key2": 2}, + ) + + benchmark(lambda: request.to_dict()) + + +@pytest.mark.benchmark +def test_benchmark_request_from_dict(benchmark): + """Benchmark request deserialization from dictionary.""" + data = { + "field1": "test_value", + "field2": 42, + "field3": ["a", "b", "c"], + "field4": {"key1": 1, "key2": 2}, + } + + benchmark(lambda: SampleRequest.from_dict(**data)) + + +@pytest.mark.benchmark +def test_benchmark_response_to_dict(benchmark): + """Benchmark response serialization to dictionary.""" + response = SampleResponse( + result="success", + data={"key1": "value1", "key2": "value2"}, + ) + + benchmark(lambda: response.to_dict()) + + +@pytest.mark.benchmark +def test_benchmark_response_from_dict(benchmark): + """Benchmark response deserialization from dictionary.""" + data = { + "result": "success", + "data": {"key1": "value1", "key2": "value2"}, + } + + benchmark(lambda: SampleResponse.from_dict(**data)) + + +@pytest.mark.benchmark +def test_benchmark_complex_nested_structure(benchmark): + """Benchmark serialization of complex nested structures.""" + + class NestedRequest(cqrs.Request): + level1: dict[str, list[dict[str, str]]] + level2: list[dict[str, int]] + + request = NestedRequest( + level1={ + "group1": [{"name": "item1", "value": "val1"}] * 5, + "group2": [{"name": "item2", "value": "val2"}] * 5, + }, + level2=[{"counter": i} for i in range(10)], + ) + + benchmark(lambda: request.to_dict()) diff --git a/pyproject.toml b/pyproject.toml index a7e1d78..bd8670f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ dev = [ "ruff==0.6.2", "vermin>=1.6.0", "pytest-cov>=4.0.0", + "pytest-codspeed==4.2.0", # Tests "aio-pika==9.3.0", # from rabbit "aiokafka==0.10.0", # from kafka From 5097f9a1f596c99ceb9703262107c56b4894dacc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D0=B4=D0=B8=D0=BC=20=D0=9A=D0=BE=D0=B7=D1=8B?= =?UTF-8?q?=D1=80=D0=B5=D0=B2=D1=81=D0=BA=D0=B8=D0=B9?= Date: Fri, 23 Jan 2026 16:57:52 +0300 Subject: [PATCH 2/4] Change Pydantic models to dc models --- benchmarks/test_benchmark_event_handling.py | 19 +++++++------- benchmarks/test_benchmark_request_handling.py | 25 +++++++++++-------- benchmarks/test_benchmark_serialization.py | 23 +++++++++-------- 3 files changed, 37 insertions(+), 30 deletions(-) diff --git a/benchmarks/test_benchmark_event_handling.py b/benchmarks/test_benchmark_event_handling.py index a313016..4f2004b 100644 --- a/benchmarks/test_benchmark_event_handling.py +++ b/benchmarks/test_benchmark_event_handling.py @@ -1,14 +1,15 @@ """Benchmarks for event handling performance.""" +import dataclasses import typing +import cqrs import di import pytest - -import cqrs from cqrs.events import bootstrap -class UserJoinedEvent(cqrs.Event): +@dataclasses.dataclass +class UserJoinedEvent(cqrs.DCEvent): user_id: str meeting_id: str @@ -37,10 +38,10 @@ def event_mediator(): def test_benchmark_event_processing(benchmark, event_mediator): """Benchmark event processing performance.""" event = UserJoinedEvent(user_id="user_1", meeting_id="meeting_1") - + async def run(): await event_mediator.send(event) - + benchmark(lambda: run()) @@ -51,18 +52,18 @@ def test_benchmark_multiple_events(benchmark, event_mediator): UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1") for i in range(10) ] - + async def run(): for evt in events: await event_mediator.send(evt) - + benchmark(lambda: run()) @pytest.mark.benchmark def test_benchmark_notification_event(benchmark): """Benchmark notification event creation and serialization.""" - + def run(): event = cqrs.NotificationEvent[UserJoinedEvent]( event_name="UserJoined", @@ -70,5 +71,5 @@ def run(): payload=UserJoinedEvent(user_id="user_1", meeting_id="meeting_1"), ) return event.to_dict() - + benchmark(run) diff --git a/benchmarks/test_benchmark_request_handling.py b/benchmarks/test_benchmark_request_handling.py index d5a11b3..73af0b3 100644 --- a/benchmarks/test_benchmark_request_handling.py +++ b/benchmarks/test_benchmark_request_handling.py @@ -1,26 +1,29 @@ """Benchmarks for request handling performance.""" +import dataclasses import typing from collections import defaultdict +import cqrs import di import pytest - -import cqrs from cqrs.requests import bootstrap STORAGE = defaultdict[str, typing.List[str]](lambda: []) -class JoinMeetingCommand(cqrs.Request): +@dataclasses.dataclass +class JoinMeetingCommand(cqrs.DCRequest): user_id: str meeting_id: str -class ReadMeetingQuery(cqrs.Request): +@dataclasses.dataclass +class ReadMeetingQuery(cqrs.DCRequest): meeting_id: str -class ReadMeetingQueryResult(cqrs.Response): +@dataclasses.dataclass +class ReadMeetingQueryResult(cqrs.DCResponse): users: list[str] @@ -65,10 +68,10 @@ def mediator(): def test_benchmark_command_handling(benchmark, mediator): """Benchmark command handling performance.""" command = JoinMeetingCommand(user_id="user_1", meeting_id="meeting_1") - + async def run(): await mediator.send(command) - + benchmark(lambda: run()) @@ -78,10 +81,10 @@ def test_benchmark_query_handling(benchmark, mediator): # Setup: Add some data first STORAGE["meeting_1"] = ["user_1", "user_2", "user_3"] query = ReadMeetingQuery(meeting_id="meeting_1") - + async def run(): return await mediator.send(query) - + benchmark(lambda: run()) @@ -92,9 +95,9 @@ def test_benchmark_multiple_commands(benchmark, mediator): JoinMeetingCommand(user_id=f"user_{i}", meeting_id="meeting_2") for i in range(10) ] - + async def run(): for cmd in commands: await mediator.send(cmd) - + benchmark(lambda: run()) diff --git a/benchmarks/test_benchmark_serialization.py b/benchmarks/test_benchmark_serialization.py index af0fac3..70569f0 100644 --- a/benchmarks/test_benchmark_serialization.py +++ b/benchmarks/test_benchmark_serialization.py @@ -1,17 +1,20 @@ """Benchmarks for serialization and deserialization performance.""" -import pytest +import dataclasses import cqrs +import pytest -class SampleRequest(cqrs.Request): +@dataclasses.dataclass +class SampleRequest(cqrs.DCRequest): field1: str field2: int field3: list[str] field4: dict[str, int] -class SampleResponse(cqrs.Response): +@dataclasses.dataclass +class SampleResponse(cqrs.DCResponse): result: str data: dict[str, str] @@ -25,7 +28,7 @@ def test_benchmark_request_to_dict(benchmark): field3=["a", "b", "c"], field4={"key1": 1, "key2": 2}, ) - + benchmark(lambda: request.to_dict()) @@ -38,7 +41,7 @@ def test_benchmark_request_from_dict(benchmark): "field3": ["a", "b", "c"], "field4": {"key1": 1, "key2": 2}, } - + benchmark(lambda: SampleRequest.from_dict(**data)) @@ -49,7 +52,7 @@ def test_benchmark_response_to_dict(benchmark): result="success", data={"key1": "value1", "key2": "value2"}, ) - + benchmark(lambda: response.to_dict()) @@ -60,18 +63,18 @@ def test_benchmark_response_from_dict(benchmark): "result": "success", "data": {"key1": "value1", "key2": "value2"}, } - + benchmark(lambda: SampleResponse.from_dict(**data)) @pytest.mark.benchmark def test_benchmark_complex_nested_structure(benchmark): """Benchmark serialization of complex nested structures.""" - + class NestedRequest(cqrs.Request): level1: dict[str, list[dict[str, str]]] level2: list[dict[str, int]] - + request = NestedRequest( level1={ "group1": [{"name": "item1", "value": "val1"}] * 5, @@ -79,5 +82,5 @@ class NestedRequest(cqrs.Request): }, level2=[{"counter": i} for i in range(10)], ) - + benchmark(lambda: request.to_dict()) From 9f5aecdc07983a2301f1ecbeb0e5c46529cd3206 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D0=B4=D0=B8=D0=BC=20=D0=9A=D0=BE=D0=B7=D1=8B?= =?UTF-8?q?=D1=80=D0=B5=D0=B2=D1=81=D0=BA=D0=B8=D0=B9?= Date: Fri, 23 Jan 2026 16:58:06 +0300 Subject: [PATCH 3/4] Change Pydantic models to dc models --- benchmarks/test_benchmark_event_handling.py | 4 ++-- benchmarks/test_benchmark_request_handling.py | 1 + benchmarks/test_benchmark_serialization.py | 1 + 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/benchmarks/test_benchmark_event_handling.py b/benchmarks/test_benchmark_event_handling.py index 4f2004b..22df238 100644 --- a/benchmarks/test_benchmark_event_handling.py +++ b/benchmarks/test_benchmark_event_handling.py @@ -1,4 +1,5 @@ """Benchmarks for event handling performance.""" + import dataclasses import typing @@ -49,8 +50,7 @@ async def run(): def test_benchmark_multiple_events(benchmark, event_mediator): """Benchmark processing multiple events in sequence.""" events = [ - UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1") - for i in range(10) + UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1") for i in range(10) ] async def run(): diff --git a/benchmarks/test_benchmark_request_handling.py b/benchmarks/test_benchmark_request_handling.py index 73af0b3..ea974a2 100644 --- a/benchmarks/test_benchmark_request_handling.py +++ b/benchmarks/test_benchmark_request_handling.py @@ -1,4 +1,5 @@ """Benchmarks for request handling performance.""" + import dataclasses import typing from collections import defaultdict diff --git a/benchmarks/test_benchmark_serialization.py b/benchmarks/test_benchmark_serialization.py index 70569f0..25418ee 100644 --- a/benchmarks/test_benchmark_serialization.py +++ b/benchmarks/test_benchmark_serialization.py @@ -1,4 +1,5 @@ """Benchmarks for serialization and deserialization performance.""" + import dataclasses import cqrs From 3dea1fd4d6f716bcff7eff8bfde28fee4f3aeb44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D0=B4=D0=B8=D0=BC=20=D0=9A=D0=BE=D0=B7=D1=8B?= =?UTF-8?q?=D1=80=D0=B5=D0=B2=D1=81=D0=BA=D0=B8=D0=B9?= Date: Fri, 23 Jan 2026 16:58:35 +0300 Subject: [PATCH 4/4] Change Pydantic models to dc models --- benchmarks/test_benchmark_event_handling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/test_benchmark_event_handling.py b/benchmarks/test_benchmark_event_handling.py index 22df238..226fab8 100644 --- a/benchmarks/test_benchmark_event_handling.py +++ b/benchmarks/test_benchmark_event_handling.py @@ -9,7 +9,7 @@ from cqrs.events import bootstrap -@dataclasses.dataclass +@dataclasses.dataclass(frozen=True) class UserJoinedEvent(cqrs.DCEvent): user_id: str meeting_id: str