diff --git a/.github/workflows/codspeed.yml b/.github/workflows/codspeed.yml
new file mode 100644
index 0000000..33418f4
--- /dev/null
+++ b/.github/workflows/codspeed.yml
@@ -0,0 +1,39 @@
+name: CodSpeed
+
+on:
+  push:
+    branches:
+      - "main"
+      - "master"
+  pull_request:
+  workflow_dispatch:
+
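+# id-token: write lets the CodSpeed action authenticate its upload via GitHub OIDC.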
+permissions:
+  contents: read
+  id-token: write
+
+jobs:
+  benchmarks:
+    name: Run benchmarks
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python 3.12
+        uses: actions/setup-python@v5
+        with:
+          python-version: "3.12"
+
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -e ".[dev]"
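+      # the ".[dev]" extras include pytest-codspeed (pinned in pyproject.toml below)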
+
+      - name: Run benchmarks
+        uses: CodSpeedHQ/action@v4
+        with:
+          mode: simulation
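+          # "simulation" measures on CodSpeed's simulated-CPU engine for low-noise, deterministic results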
+          run: pytest benchmarks/ --codspeed
diff --git a/README.md b/README.md
index 8d122b3..f8574c5 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,9 @@
+
+
+
@@ -928,16 +931,16 @@ async def process_files_stream(
     mediator: cqrs.StreamingRequestMediator = fastapi.Depends(streaming_mediator_factory),
 ) -> fastapi.responses.StreamingResponse:
     async def generate_sse():
-        yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\\n\\n"
+        yield f"data: {json.dumps({'type': 'start', 'message': 'Processing...'})}\n\n"

         async for result in mediator.stream(command):
             sse_data = {
                 "type": "progress",
                 "data": result.to_dict(),
             }
-            yield f"data: {json.dumps(sse_data)}\\n\\n"
+            yield f"data: {json.dumps(sse_data)}\n\n"

-        yield f"data: {json.dumps({'type': 'complete'})}\\n\\n"
+        yield f"data: {json.dumps({'type': 'complete'})}\n\n"

     return fastapi.responses.StreamingResponse(
         generate_sse(),
@@ -950,8 +953,8 @@ the [documentation](https://github.com/vadikko2/cqrs/blob/master/examples/fastap
 ## Protobuf messaging

-The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\\
-Protocol buffers are Google’s language-neutral, platform-neutral, extensible mechanism for serializing structured data –
+The `python-cqrs` package supports integration with [protobuf](https://developers.google.com/protocol-buffers/).\
+Protocol buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data –
 think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use
 special generated source code to easily write and read your structured data to and from a variety of data streams and
 using a variety of languages.

diff --git a/benchmarks/__init__.py b/benchmarks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/benchmarks/test_benchmark_event_handling.py b/benchmarks/test_benchmark_event_handling.py
new file mode 100644
index 0000000..226fab8
--- /dev/null
+++ b/benchmarks/test_benchmark_event_handling.py
@@ -0,0 +1,80 @@
+"""Benchmarks for event handling performance."""
+
+import asyncio
+import dataclasses
+import typing
+
+import cqrs
+import di
+import pytest
+from cqrs.events import bootstrap
+
+
+@dataclasses.dataclass(frozen=True)
+class UserJoinedEvent(cqrs.DCEvent):
+    user_id: str
+    meeting_id: str
+
+
+class UserJoinedEventHandler(cqrs.EventHandler[UserJoinedEvent]):
+    def __init__(self):
+        self.processed_events: typing.List[UserJoinedEvent] = []
+
+    async def handle(self, event: UserJoinedEvent) -> None:
+        self.processed_events.append(event)
+
+
+def events_mapper(mapper: cqrs.EventMap) -> None:
+    mapper.bind(UserJoinedEvent, UserJoinedEventHandler)
+
+
+@pytest.fixture
+def event_mediator():
+    return bootstrap.bootstrap(
+        di_container=di.Container(),
+        events_mapper=events_mapper,
+    )
+
+
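+# pytest-codspeed drives a synchronous callable, so each async mediator call
+# below is executed to completion with asyncio.run().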
+@pytest.mark.benchmark
+def test_benchmark_event_processing(benchmark, event_mediator):
+    """Benchmark event processing performance."""
+    event = UserJoinedEvent(user_id="user_1", meeting_id="meeting_1")
+
+    async def run():
+        await event_mediator.send(event)
+
+    benchmark(lambda: asyncio.run(run()))
+
+
+@pytest.mark.benchmark
+def test_benchmark_multiple_events(benchmark, event_mediator):
+    """Benchmark processing multiple events in sequence."""
+    events = [
+        UserJoinedEvent(user_id=f"user_{i}", meeting_id="meeting_1") for i in range(10)
+    ]
+
+    async def run():
+        for evt in events:
+            await event_mediator.send(evt)
+
+    benchmark(lambda: asyncio.run(run()))
+
+
+@pytest.mark.benchmark
+def test_benchmark_notification_event(benchmark):
+    """Benchmark notification event creation and serialization."""
+
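+    # NotificationEvent attaches an event name and a broker topic to the payload;
+    # to_dict() is the representation that would be published to a broker.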
+    def run():
+        event = cqrs.NotificationEvent[UserJoinedEvent](
+            event_name="UserJoined",
+            topic="test_topic",
+            payload=UserJoinedEvent(user_id="user_1", meeting_id="meeting_1"),
+        )
+        return event.to_dict()
+
+    benchmark(run)
diff --git a/benchmarks/test_benchmark_request_handling.py b/benchmarks/test_benchmark_request_handling.py
new file mode 100644
index 0000000..ea974a2
--- /dev/null
+++ b/benchmarks/test_benchmark_request_handling.py
@@ -0,0 +1,108 @@
+"""Benchmarks for request handling performance."""
+
+import asyncio
+import dataclasses
+import typing
+from collections import defaultdict
+
+import cqrs
+import di
+import pytest
+from cqrs.requests import bootstrap
+
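+# Shared in-memory store that the handlers below write to and read from.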
+STORAGE = defaultdict[str, typing.List[str]](lambda: [])
+
+
+@dataclasses.dataclass
+class JoinMeetingCommand(cqrs.DCRequest):
+    user_id: str
+    meeting_id: str
+
+
+@dataclasses.dataclass
+class ReadMeetingQuery(cqrs.DCRequest):
+    meeting_id: str
+
+
+@dataclasses.dataclass
+class ReadMeetingQueryResult(cqrs.DCResponse):
+    users: list[str]
+
+
+class JoinMeetingCommandHandler(cqrs.RequestHandler[JoinMeetingCommand, None]):
+    @property
+    def events(self):
+        return []
+
+    async def handle(self, request: JoinMeetingCommand) -> None:
+        STORAGE[request.meeting_id].append(request.user_id)
+
+
+class ReadMeetingQueryHandler(
+    cqrs.RequestHandler[ReadMeetingQuery, ReadMeetingQueryResult],
+):
+    @property
+    def events(self):
+        return []
+
+    async def handle(self, request: ReadMeetingQuery) -> ReadMeetingQueryResult:
+        return ReadMeetingQueryResult(users=STORAGE[request.meeting_id])
+
+
+def command_mapper(mapper: cqrs.RequestMap) -> None:
+    mapper.bind(JoinMeetingCommand, JoinMeetingCommandHandler)
+
+
+def query_mapper(mapper: cqrs.RequestMap) -> None:
+    mapper.bind(ReadMeetingQuery, ReadMeetingQueryHandler)
+
+
+@pytest.fixture
+def mediator():
+    return bootstrap.bootstrap(
+        di_container=di.Container(),
+        queries_mapper=query_mapper,
+        commands_mapper=command_mapper,
+    )
+
+
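+# pytest-codspeed drives a synchronous callable, so the async mediator calls
+# below are executed to completion with asyncio.run().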
+@pytest.mark.benchmark
+def test_benchmark_command_handling(benchmark, mediator):
+    """Benchmark command handling performance."""
+    command = JoinMeetingCommand(user_id="user_1", meeting_id="meeting_1")
+
+    async def run():
+        await mediator.send(command)
+
+    benchmark(lambda: asyncio.run(run()))
+
+
+@pytest.mark.benchmark
+def test_benchmark_query_handling(benchmark, mediator):
+    """Benchmark query handling performance."""
+    # Setup: add some data first
+    STORAGE["meeting_1"] = ["user_1", "user_2", "user_3"]
+    query = ReadMeetingQuery(meeting_id="meeting_1")
+
+    async def run():
+        return await mediator.send(query)
+
+    benchmark(lambda: asyncio.run(run()))
+
+
+@pytest.mark.benchmark
+def test_benchmark_multiple_commands(benchmark, mediator):
+    """Benchmark handling multiple commands in sequence."""
+    commands = [
+        JoinMeetingCommand(user_id=f"user_{i}", meeting_id="meeting_2")
+        for i in range(10)
+    ]
+
+    async def run():
+        for cmd in commands:
+            await mediator.send(cmd)
+
+    benchmark(lambda: asyncio.run(run()))
diff --git a/benchmarks/test_benchmark_serialization.py b/benchmarks/test_benchmark_serialization.py
new file mode 100644
index 0000000..25418ee
--- /dev/null
+++ b/benchmarks/test_benchmark_serialization.py
@@ -0,0 +1,89 @@
+"""Benchmarks for serialization and deserialization performance."""
+
+import dataclasses
+
+import cqrs
+import pytest
+
+
+@dataclasses.dataclass
+class SampleRequest(cqrs.DCRequest):
+    field1: str
+    field2: int
+    field3: list[str]
+    field4: dict[str, int]
+
+
+@dataclasses.dataclass
+class SampleResponse(cqrs.DCResponse):
+    result: str
+    data: dict[str, str]
+
+
+@pytest.mark.benchmark
+def test_benchmark_request_to_dict(benchmark):
+    """Benchmark request serialization to dictionary."""
+    request = SampleRequest(
+        field1="test_value",
+        field2=42,
+        field3=["a", "b", "c"],
+        field4={"key1": 1, "key2": 2},
+    )
+
+    benchmark(lambda: request.to_dict())
+
+
+@pytest.mark.benchmark
+def test_benchmark_request_from_dict(benchmark):
+    """Benchmark request deserialization from dictionary."""
+    data = {
+        "field1": "test_value",
+        "field2": 42,
+        "field3": ["a", "b", "c"],
+        "field4": {"key1": 1, "key2": 2},
+    }
+
+    benchmark(lambda: SampleRequest.from_dict(**data))
+
+
+@pytest.mark.benchmark
+def test_benchmark_response_to_dict(benchmark):
+    """Benchmark response serialization to dictionary."""
+    response = SampleResponse(
+        result="success",
+        data={"key1": "value1", "key2": "value2"},
+    )
+
+    benchmark(lambda: response.to_dict())
+
+
+@pytest.mark.benchmark
+def test_benchmark_response_from_dict(benchmark):
+    """Benchmark response deserialization from dictionary."""
+    data = {
+        "result": "success",
+        "data": {"key1": "value1", "key2": "value2"},
+    }
+
+    benchmark(lambda: SampleResponse.from_dict(**data))
+
+
+@pytest.mark.benchmark
+def test_benchmark_complex_nested_structure(benchmark):
+    """Benchmark serialization of complex nested structures."""
+
+    class NestedRequest(cqrs.Request):
+        level1: dict[str, list[dict[str, str]]]
+        level2: list[dict[str, int]]
+
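+    # Unlike the dataclass-based requests above, NestedRequest subclasses
+    # cqrs.Request, so this measures model-based serialization of nested data.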
+    request = NestedRequest(
+        level1={
+            "group1": [{"name": "item1", "value": "val1"}] * 5,
+            "group2": [{"name": "item2", "value": "val2"}] * 5,
+        },
+        level2=[{"counter": i} for i in range(10)],
+    )
+
+    benchmark(lambda: request.to_dict())
diff --git a/pyproject.toml b/pyproject.toml
index a7e1d78..bd8670f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,6 +42,7 @@ dev = [
"ruff==0.6.2",
"vermin>=1.6.0",
"pytest-cov>=4.0.0",
+ "pytest-codspeed==4.2.0",
# Tests
"aio-pika==9.3.0", # from rabbit
"aiokafka==0.10.0", # from kafka