From 6267affc9c8c0311fd1ba19907efe26368f4d663 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Mon, 20 Oct 2025 19:47:51 +0000 Subject: [PATCH 01/15] Fix incorrect dynamic reply queue routing --- .../contrib/wire/amqp/producer.py | 35 ++++- src/asyncapi_python/contrib/wire/in_memory.py | 30 +++- .../kernel/endpoint/rpc_server.py | 48 +++--- src/asyncapi_python/kernel/wire/typing.py | 16 +- .../kernel/endpoint/test_batch_processing.py | 15 +- tests/kernel/endpoint/test_rpc_endpoints.py | 140 ++++++++++++++++-- 6 files changed, 233 insertions(+), 51 deletions(-) diff --git a/src/asyncapi_python/contrib/wire/amqp/producer.py b/src/asyncapi_python/contrib/wire/amqp/producer.py index 69bd29c..921e06c 100644 --- a/src/asyncapi_python/contrib/wire/amqp/producer.py +++ b/src/asyncapi_python/contrib/wire/amqp/producer.py @@ -3,10 +3,11 @@ from typing import Any try: - from aio_pika import Message as AmqpMessage, ExchangeType # type: ignore[import-not-found] + from aio_pika import ExchangeType + from aio_pika import Message as AmqpMessage # type: ignore[import-not-found] from aio_pika.abc import ( # type: ignore[import-not-found] - AbstractConnection, AbstractChannel, + AbstractConnection, AbstractExchange, ) except ImportError as e: @@ -100,11 +101,33 @@ async def stop(self) -> None: self._started = False - async def send_batch(self, messages: list[AmqpWireMessage]) -> None: - """Send a batch of messages using the configured exchange""" + async def send_batch( + self, messages: list[AmqpWireMessage], *, address_override: str | None = None + ) -> None: + """Send a batch of messages using the configured exchange + + Args: + messages: Messages to send + address_override: Optional dynamic routing key/queue to override static config. + If provided, overrides self._routing_key for this send operation. + If None, uses static routing_key from configuration/bindings. + """ if not self._started or not self._channel or not self._target_exchange: raise RuntimeError("Producer not started") + # Determine effective routing key: override takes precedence over static config + effective_routing_key = ( + address_override if address_override is not None else self._routing_key + ) + + # Validate we have a destination + # Note: empty string is valid for default exchange with default queue + if effective_routing_key is None: + raise ValueError( + f"Cannot send: no routing destination specified. " + f"address_override={address_override}, routing_key={self._routing_key}" + ) + for message in messages: amqp_message = AmqpMessage( body=message.payload, @@ -113,8 +136,8 @@ async def send_batch(self, messages: list[AmqpWireMessage]) -> None: reply_to=message.reply_to, ) - # Publish to the configured target exchange (not always default) + # Publish to the configured target exchange with dynamic or static routing key await self._target_exchange.publish( amqp_message, - routing_key=self._routing_key, + routing_key=effective_routing_key, ) diff --git a/src/asyncapi_python/contrib/wire/in_memory.py b/src/asyncapi_python/contrib/wire/in_memory.py index 90ce040..083ca33 100644 --- a/src/asyncapi_python/contrib/wire/in_memory.py +++ b/src/asyncapi_python/contrib/wire/in_memory.py @@ -4,10 +4,11 @@ from collections import defaultdict, deque from dataclasses import dataclass, field from typing import Any, AsyncGenerator + from typing_extensions import Unpack from asyncapi_python.kernel.wire import AbstractWireFactory, EndpointParams -from asyncapi_python.kernel.wire.typing import Producer, Consumer +from asyncapi_python.kernel.wire.typing import Consumer, Producer @dataclass @@ -142,13 +143,34 @@ async def stop(self) -> None: """Stop the producer""" self._started = False - async def send_batch(self, messages: list[InMemoryMessage]) -> None: - """Send a batch of messages to the channel""" + async def send_batch( + self, messages: list[InMemoryMessage], *, address_override: str | None = None + ) -> None: + """Send a batch of messages to the channel + + Args: + messages: Messages to send + address_override: Optional dynamic channel name to override static config. + If provided, overrides self._channel_name for this send operation. + If None, uses static channel_name from configuration. + """ if not self._started: raise RuntimeError("Producer not started") + # Determine effective channel: override takes precedence over static config + effective_channel = ( + address_override if address_override is not None else self._channel_name + ) + + # Validate we have a destination + if not effective_channel: + raise ValueError( + f"Cannot send: no channel specified. " + f"address_override={address_override}, channel_name={self._channel_name}" + ) + for message in messages: - await _bus.publish(self._channel_name, message) + await _bus.publish(effective_channel, message) class InMemoryConsumer(Consumer[InMemoryIncomingMessage]): diff --git a/src/asyncapi_python/kernel/endpoint/rpc_server.py b/src/asyncapi_python/kernel/endpoint/rpc_server.py index 63ab0ba..de396ea 100644 --- a/src/asyncapi_python/kernel/endpoint/rpc_server.py +++ b/src/asyncapi_python/kernel/endpoint/rpc_server.py @@ -1,19 +1,21 @@ import asyncio -from typing import Callable, Generic, overload, Union +from typing import Callable, Generic, Union, overload + from typing_extensions import Unpack -from .abc import AbstractEndpoint, Receive, HandlerParams -from .message import WireMessage +from asyncapi_python.kernel.wire import Consumer, Producer + +from ..exceptions import Reject from ..typing import ( - T_Input, - T_Output, - Handler, - BatchHandler, BatchConfig, + BatchHandler, + Handler, IncomingMessage, + T_Input, + T_Output, ) -from ..exceptions import Reject -from asyncapi_python.kernel.wire import Consumer, Producer +from .abc import AbstractEndpoint, HandlerParams, Receive +from .message import WireMessage class RpcServer( @@ -284,8 +286,8 @@ async def _consume_requests(self) -> None: _reply_to=None, # No further reply expected ) - # Send reply - await self._send_reply(reply_message) + # Send reply to client's reply_to address (or static config if None) + await self._send_reply(reply_message, wire_message.reply_to) # Acknowledge successful processing await wire_message.ack() @@ -343,8 +345,8 @@ async def process_batch(): _reply_to=None, # No further reply expected ) - # Send reply - await self._send_reply(reply_message) + # Send reply to client's reply_to address (or static config if None) + await self._send_reply(reply_message, wire_message.reply_to) # Acknowledge successful processing await wire_message.ack() @@ -437,10 +439,22 @@ async def process_batch(): for _, wire_message in batch: await wire_message.nack() - async def _send_reply(self, reply_message: WireMessage) -> None: - """Send reply message""" + async def _send_reply( + self, reply_message: WireMessage, reply_to_address: str | None = None + ) -> None: + """Send reply message + + Args: + reply_message: The reply message to send + reply_to_address: Optional dynamic reply address (from request's reply_to field). + If None, uses producer's static configuration from bindings. + """ if not self._reply_producer: return - # Send the reply - await self._reply_producer.send_batch([reply_message]) + # Send reply with optional address override + # - If reply_to_address is provided: send to that specific queue (direct RPC reply) + # - If None: use producer's static routing from AsyncAPI spec (topic-based reply) + await self._reply_producer.send_batch( + [reply_message], address_override=reply_to_address + ) diff --git a/src/asyncapi_python/kernel/wire/typing.py b/src/asyncapi_python/kernel/wire/typing.py index 9faaff8..8bf97dd 100644 --- a/src/asyncapi_python/kernel/wire/typing.py +++ b/src/asyncapi_python/kernel/wire/typing.py @@ -1,6 +1,6 @@ from typing import AsyncGenerator, Generic, Protocol -from ..typing import T_Send, T_Recv +from ..typing import T_Recv, T_Send class EndpointLifecycle(Protocol): @@ -12,15 +12,17 @@ async def stop(self) -> None: class Producer(EndpointLifecycle, Protocol, Generic[T_Send]): - async def send_batch(self, messages: list[T_Send]) -> None: + async def send_batch( + self, + messages: list[T_Send], + *, + address_override: str | None = None, + ) -> None: """Sends batch of messages to channel""" + ... class Consumer(EndpointLifecycle, Protocol, Generic[T_Recv]): def recv(self) -> AsyncGenerator[T_Recv, None]: """Starts streaming incoming messages""" - # This is a protocol method - implementation must provide async generator - # Using NotImplemented because protocols cannot have implementations - raise NotImplementedError( - "Protocol method must be implemented by concrete class" - ) + ... diff --git a/tests/kernel/endpoint/test_batch_processing.py b/tests/kernel/endpoint/test_batch_processing.py index c980455..e714a89 100644 --- a/tests/kernel/endpoint/test_batch_processing.py +++ b/tests/kernel/endpoint/test_batch_processing.py @@ -1,16 +1,17 @@ """Unit tests for batch processing in subscriber and RPC server endpoints.""" import asyncio -import pytest -from unittest.mock import Mock, AsyncMock from typing import AsyncGenerator +from unittest.mock import AsyncMock, Mock + +import pytest -from asyncapi_python.kernel.endpoint import Subscriber, RpcServer -from asyncapi_python.kernel.document import Operation, Channel, Message -from asyncapi_python.kernel.wire import AbstractWireFactory from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.kernel.typing import BatchConfig +from asyncapi_python.kernel.document import Channel, Message, Operation +from asyncapi_python.kernel.endpoint import RpcServer, Subscriber from asyncapi_python.kernel.exceptions import Reject +from asyncapi_python.kernel.typing import BatchConfig +from asyncapi_python.kernel.wire import AbstractWireFactory class MockIncomingMessage: @@ -435,7 +436,7 @@ async def batch_handler(requests: list[dict]) -> list[dict]: assert message.is_acked # Reply producer should have been called for each request - # (send_batch called once per reply in our implementation) + # Each reply uses send_batch with address_override assert reply_producer.send_batch.call_count == 3 diff --git a/tests/kernel/endpoint/test_rpc_endpoints.py b/tests/kernel/endpoint/test_rpc_endpoints.py index d64fd98..7cfd8db 100644 --- a/tests/kernel/endpoint/test_rpc_endpoints.py +++ b/tests/kernel/endpoint/test_rpc_endpoints.py @@ -1,21 +1,22 @@ """Integration tests for RPC client and server endpoints""" import asyncio -import pytest +import json from typing import AsyncGenerator +import pytest + +from asyncapi_python.kernel.codec import Codec, CodecFactory +from asyncapi_python.kernel.document import Channel, Message, Operation, OperationReply +from asyncapi_python.kernel.endpoint.exceptions import TimeoutError, UninitializedError +from asyncapi_python.kernel.endpoint.message import WireMessage +from asyncapi_python.kernel.endpoint.publisher import Publisher from asyncapi_python.kernel.endpoint.rpc_client import RpcClient from asyncapi_python.kernel.endpoint.rpc_reply_handler import global_reply_handler from asyncapi_python.kernel.endpoint.rpc_server import RpcServer -from asyncapi_python.kernel.endpoint.publisher import Publisher from asyncapi_python.kernel.endpoint.subscriber import Subscriber -from asyncapi_python.kernel.endpoint.message import WireMessage -from asyncapi_python.kernel.endpoint.exceptions import TimeoutError, UninitializedError -from asyncapi_python.kernel.document import Operation, Channel, Message, OperationReply -from asyncapi_python.kernel.wire import AbstractWireFactory, Producer, Consumer -from asyncapi_python.kernel.codec import CodecFactory, Codec from asyncapi_python.kernel.typing import IncomingMessage -import json +from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer, Producer @pytest.fixture @@ -261,8 +262,17 @@ async def stop(self) -> None: def set_factory(self, factory: "RealisticWireFactory") -> None: self._factory = factory - async def send_batch(self, messages: list[WireMessage]) -> None: - """Send messages by routing them to the appropriate consumers""" + async def send_batch( + self, messages: list[WireMessage], *, address_override: str | None = None + ) -> None: + """Send messages by routing them to the appropriate consumers + + Args: + messages: Messages to send + address_override: Optional dynamic address override (for compatibility with protocol). + In test environment, routing is based on is_reply flag, + so this parameter is accepted but not used. + """ if not self._started or not self._factory: return @@ -310,6 +320,26 @@ async def send_batch(self, messages: list[WireMessage]) -> None: # Fallback for immediate processing await self._factory._handle_server_message(server_message) + async def send_to_queue(self, queue_name: str, messages: list[WireMessage]) -> None: + """Send messages directly to a specific queue (for RPC replies) + + This mimics the AMQP producer's send_to_queue method for testing. + In the test environment, we route directly to the reply consumer. + """ + if not self._started or not self._factory: + return + + # Route messages to the reply consumer + if self._factory._reply_consumer: + for message in messages: + reply_message = RealisticWireMessage( + message.payload, + message.headers, + message.correlation_id, + message.reply_to, + ) + await self._factory._reply_consumer.add_message(reply_message) + class RealisticWireFactory(AbstractWireFactory): """Wire factory that creates realistic consumers and producers for testing""" @@ -808,6 +838,96 @@ async def handle_event(event: RequestMessage, msg_list=subscriber_messages): await wire_factory.cleanup() +@pytest.mark.asyncio(loop_scope="function") +async def test_multi_service_rpc(mock_operation, cleanup_rpc_client): + """Test RPC communication between different services (different reply queues) + + This test verifies that the server sends replies to the client's specified + reply queue (from reply_to field), not to its own reply queue. + """ + wire_factory = RealisticWireFactory() + codec_factory = SimpleCodecFactory() + + # Create client with operation + client = RpcClient( + operation=mock_operation, + wire_factory=wire_factory, + codec_factory=codec_factory, + ) + + # Create server operation + server_operation = Operation( + action="receive", + channel=mock_operation.channel, + messages=mock_operation.messages, + reply=mock_operation.reply, + title=None, + summary=None, + description=None, + tags=[], + external_docs=None, + traits=[], + bindings=None, + key="test-key", + security=None, + ) + + server = RpcServer( + operation=server_operation, + wire_factory=wire_factory, + codec_factory=codec_factory, + ) + + # Track which reply queue was actually used + actual_reply_queue = None + original_send_to_queue = None + + if hasattr(wire_factory._reply_producer, "send_to_queue"): + original_send_to_queue = wire_factory._reply_producer.send_to_queue + + async def tracked_send_to_queue(queue_name: str, messages): + nonlocal actual_reply_queue + actual_reply_queue = queue_name + await original_send_to_queue(queue_name, messages) + + wire_factory._reply_producer.send_to_queue = tracked_send_to_queue + + # Register server handler + @server + async def handle_request(request: RequestMessage) -> ResponseMessage: + return ResponseMessage(f"Handled: {request.data}") + + # Set up wire factory for automatic replies + wire_factory.set_server_handler(handle_request) + + # Start both endpoints + await client.start() + await server.start() + + # Get the client's reply queue name before making the request + expected_reply_queue = global_reply_handler.reply_queue_name + + # Make RPC call + request = RequestMessage("Test Multi-Service") + response = await client(request) + + # Verify response is correct + assert isinstance(response, ResponseMessage) + assert response.result == "Handled: Test Multi-Service" + + # Verify reply was sent to the client's reply queue (if tracking is available) + if actual_reply_queue is not None: + assert actual_reply_queue == expected_reply_queue, ( + f"Reply was sent to wrong queue: {actual_reply_queue}, " + f"expected: {expected_reply_queue}" + ) + + # Cleanup + await client.stop() + await server.stop() + await wire_factory.cleanup() + + @pytest.mark.asyncio(loop_scope="function") async def test_enhanced_rpc_scenario(cleanup_rpc_client): """Enhanced RPC scenario with detailed request-response validation""" From 5deb29d07c78835b03e26938fa1fe785aee608e9 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Mon, 20 Oct 2025 19:48:06 +0000 Subject: [PATCH 02/15] Update lockfile --- uv.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uv.lock b/uv.lock index 3625fab..7527af8 100644 --- a/uv.lock +++ b/uv.lock @@ -64,7 +64,7 @@ wheels = [ [[package]] name = "asyncapi-python" -version = "0.3.0rc1" +version = "0.3.0rc2" source = { editable = "." } dependencies = [ { name = "pydantic" }, From 4310d8c90dd568ee0aba0e7a914da45020dbae73 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Mon, 20 Oct 2025 20:02:47 +0000 Subject: [PATCH 03/15] Fix pyright --- src/asyncapi_python/contrib/wire/amqp/producer.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/asyncapi_python/contrib/wire/amqp/producer.py b/src/asyncapi_python/contrib/wire/amqp/producer.py index 921e06c..acc06e4 100644 --- a/src/asyncapi_python/contrib/wire/amqp/producer.py +++ b/src/asyncapi_python/contrib/wire/amqp/producer.py @@ -120,13 +120,8 @@ async def send_batch( address_override if address_override is not None else self._routing_key ) - # Validate we have a destination - # Note: empty string is valid for default exchange with default queue - if effective_routing_key is None: - raise ValueError( - f"Cannot send: no routing destination specified. " - f"address_override={address_override}, routing_key={self._routing_key}" - ) + # Note: empty string is valid for default exchange routing + # All valid routing configurations should result in a non-None string at this point for message in messages: amqp_message = AmqpMessage( From 66dcce74dfaab5fe40f2eec14eddc94ba3cd975a Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Mon, 20 Oct 2025 20:27:04 +0000 Subject: [PATCH 04/15] Update resolver logic --- src/asyncapi_python/contrib/wire/amqp/producer.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/asyncapi_python/contrib/wire/amqp/producer.py b/src/asyncapi_python/contrib/wire/amqp/producer.py index acc06e4..82da03c 100644 --- a/src/asyncapi_python/contrib/wire/amqp/producer.py +++ b/src/asyncapi_python/contrib/wire/amqp/producer.py @@ -120,8 +120,18 @@ async def send_batch( address_override if address_override is not None else self._routing_key ) - # Note: empty string is valid for default exchange routing - # All valid routing configurations should result in a non-None string at this point + # Validate we have a destination + # Fail ONLY if both are truly missing: + # - address_override is None (not provided by caller) + # - AND self._routing_key is "" (no static config was derived from channel/bindings/operation) + # Note: empty string IS valid when explicitly configured (fanout exchanges, default exchange) + if address_override is None and not self._routing_key: + raise ValueError( + f"Cannot send: no routing destination available. " + f"RPC replies require reply_to from the request, or the channel must " + f"have address/bindings/operation-name to derive destination. " + f"(address_override={address_override}, routing_key={self._routing_key!r})" + ) for message in messages: amqp_message = AmqpMessage( From b7cf40f6d9278b7d37eaab8bc27a5fd90b5636fa Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Tue, 21 Oct 2025 07:56:45 +0000 Subject: [PATCH 05/15] Move service_name to Application from wire --- src/asyncapi_python/contrib/codec/__init__.py | 1 - src/asyncapi_python/contrib/codec/json.py | 2 +- src/asyncapi_python/contrib/codec/registry.py | 6 ++- .../contrib/wire/amqp/consumer.py | 4 +- .../contrib/wire/amqp/factory.py | 21 +++++--- .../contrib/wire/amqp/resolver.py | 42 +++++++++++----- .../contrib/wire/amqp/utils.py | 1 + src/asyncapi_python/kernel/application.py | 16 +++++-- src/asyncapi_python/kernel/codec.py | 1 + .../kernel/document/__init__.py | 14 +++--- .../kernel/document/bindings.py | 2 +- .../kernel/document/channel.py | 5 +- .../kernel/document/message.py | 4 +- .../kernel/document/operation.py | 5 +- .../kernel/endpoint/__init__.py | 4 +- src/asyncapi_python/kernel/endpoint/abc.py | 14 ++++-- .../kernel/endpoint/publisher.py | 6 ++- .../kernel/endpoint/rpc_client.py | 17 ++++--- .../kernel/endpoint/rpc_reply_handler.py | 48 +++++++++++++------ .../kernel/endpoint/subscriber.py | 14 ++---- src/asyncapi_python/kernel/typing.py | 5 +- src/asyncapi_python/kernel/wire/__init__.py | 11 +++-- src/asyncapi_python_codegen/__init__.py | 6 +-- .../generators/main.py | 6 +-- .../generators/messages.py | 5 +- .../generators/parameters.py | 1 + .../generators/routers.py | 3 +- .../parser/__init__.py | 2 +- src/asyncapi_python_codegen/parser/context.py | 1 + .../parser/document_loader.py | 6 ++- .../parser/extractors.py | 21 ++++---- .../parser/references.py | 6 ++- src/asyncapi_python_codegen/parser/types.py | 2 +- src/asyncapi_python_pants/register.py | 4 +- src/asyncapi_python_pants/rules.py | 32 +++++++------ src/asyncapi_python_pants/targets.py | 16 +++---- tests/codegen/test_parser.py | 7 +-- tests/conftest.py | 1 + tests/integration/scenarios/__init__.py | 6 +-- .../integration/scenarios/batch_processing.py | 8 ++-- tests/integration/scenarios/error_handling.py | 10 ++-- tests/integration/scenarios/fan_in_logging.py | 15 +++--- .../scenarios/fan_out_broadcasting.py | 15 +++--- .../scenarios/malformed_messages.py | 12 +++-- .../scenarios/many_to_many_microservices.py | 14 +++--- .../scenarios/producer_consumer.py | 8 ++-- tests/integration/scenarios/reply_channel.py | 7 +-- .../integration/test_wire_codec_scenarios.py | 21 ++++---- .../endpoint/test_exception_handling.py | 11 +++-- .../endpoint/test_handler_enforcement.py | 9 ++-- tests/kernel/endpoint/test_rpc_endpoints.py | 14 +++++- 51 files changed, 312 insertions(+), 200 deletions(-) diff --git a/src/asyncapi_python/contrib/codec/__init__.py b/src/asyncapi_python/contrib/codec/__init__.py index 77c48ce..45ee851 100644 --- a/src/asyncapi_python/contrib/codec/__init__.py +++ b/src/asyncapi_python/contrib/codec/__init__.py @@ -2,5 +2,4 @@ from .registry import CodecRegistry - __all__ = ["CodecRegistry"] diff --git a/src/asyncapi_python/contrib/codec/json.py b/src/asyncapi_python/contrib/codec/json.py index b0e5c38..9872660 100644 --- a/src/asyncapi_python/contrib/codec/json.py +++ b/src/asyncapi_python/contrib/codec/json.py @@ -1,6 +1,6 @@ import json -from typing import Type, ClassVar from types import ModuleType +from typing import ClassVar, Type from pydantic import BaseModel, ValidationError diff --git a/src/asyncapi_python/contrib/codec/registry.py b/src/asyncapi_python/contrib/codec/registry.py index d118023..b7c9a75 100644 --- a/src/asyncapi_python/contrib/codec/registry.py +++ b/src/asyncapi_python/contrib/codec/registry.py @@ -1,7 +1,9 @@ -from typing import ClassVar, Any from types import ModuleType -from asyncapi_python.kernel.codec import CodecFactory, Codec +from typing import Any, ClassVar + +from asyncapi_python.kernel.codec import Codec, CodecFactory from asyncapi_python.kernel.document.message import Message + from .json import JsonCodecFactory diff --git a/src/asyncapi_python/contrib/wire/amqp/consumer.py b/src/asyncapi_python/contrib/wire/amqp/consumer.py index ed4ed32..1530644 100644 --- a/src/asyncapi_python/contrib/wire/amqp/consumer.py +++ b/src/asyncapi_python/contrib/wire/amqp/consumer.py @@ -6,10 +6,10 @@ try: from aio_pika import ExchangeType # type: ignore[import-not-found] from aio_pika.abc import ( # type: ignore[import-not-found] - AbstractConnection, AbstractChannel, - AbstractQueue, + AbstractConnection, AbstractExchange, + AbstractQueue, ) except ImportError as e: raise ImportError( diff --git a/src/asyncapi_python/contrib/wire/amqp/factory.py b/src/asyncapi_python/contrib/wire/amqp/factory.py index e513e02..e4ce610 100644 --- a/src/asyncapi_python/contrib/wire/amqp/factory.py +++ b/src/asyncapi_python/contrib/wire/amqp/factory.py @@ -1,7 +1,8 @@ """AMQP wire factory implementation""" import secrets -from typing import Optional, Callable, Any, cast +from typing import Any, Callable, Optional, cast + from typing_extensions import Unpack try: @@ -13,11 +14,11 @@ ) from e from asyncapi_python.kernel.wire import AbstractWireFactory, EndpointParams -from asyncapi_python.kernel.wire.typing import Producer, Consumer +from asyncapi_python.kernel.wire.typing import Consumer, Producer -from .message import AmqpWireMessage, AmqpIncomingMessage -from .producer import AmqpProducer from .consumer import AmqpConsumer +from .message import AmqpIncomingMessage, AmqpWireMessage +from .producer import AmqpProducer from .resolver import resolve_amqp_config @@ -135,8 +136,12 @@ async def create_consumer( # Generate operation name from available information operation_name = self._generate_operation_name(kwargs) + # Use provided app_id if available, otherwise use instance app_id + # This allows application-level control over queue naming + app_id = kwargs.get("app_id", self._app_id) + # Resolve AMQP configuration using pattern matching - config = resolve_amqp_config(kwargs, operation_name, self._app_id) + config = resolve_amqp_config(kwargs, operation_name, app_id) connection = await self._get_connection() @@ -154,8 +159,12 @@ async def create_producer( # Generate operation name from available information operation_name = self._generate_operation_name(kwargs) + # Use provided app_id if available, otherwise use instance app_id + # This allows application-level control over queue naming + app_id = kwargs.get("app_id", self._app_id) + # Resolve AMQP configuration using pattern matching - config = resolve_amqp_config(kwargs, operation_name, self._app_id) + config = resolve_amqp_config(kwargs, operation_name, app_id) connection = await self._get_connection() diff --git a/src/asyncapi_python/contrib/wire/amqp/resolver.py b/src/asyncapi_python/contrib/wire/amqp/resolver.py index 2ee82a5..a4e5c0c 100644 --- a/src/asyncapi_python/contrib/wire/amqp/resolver.py +++ b/src/asyncapi_python/contrib/wire/amqp/resolver.py @@ -1,12 +1,13 @@ """Binding resolution with comprehensive pattern matching""" from typing import Any -from asyncapi_python.kernel.wire import EndpointParams -from asyncapi_python.kernel.document.channel import Channel + from asyncapi_python.kernel.document.bindings import AmqpChannelBinding +from asyncapi_python.kernel.document.channel import Channel +from asyncapi_python.kernel.wire import EndpointParams -from .config import AmqpConfig, AmqpBindingType -from .utils import validate_parameters_strict, substitute_parameters +from .config import AmqpBindingType, AmqpConfig +from .utils import substitute_parameters, validate_parameters_strict def resolve_amqp_config( @@ -57,17 +58,32 @@ def resolve_amqp_config( }, ) - # Reply channel with explicit address - shared channel with filtering + # Reply channel with explicit address - check if direct queue or topic exchange case (True, _, address, _) if address: resolved_address = substitute_parameters(address, param_values) - return AmqpConfig( - queue_name=f"reply-{app_id}", # App-specific reply queue - exchange_name=resolved_address, # Shared exchange for replies - exchange_type="topic", # Enable pattern matching for filtering - routing_key=app_id, # Filter messages by app_id - binding_type=AmqpBindingType.REPLY, - queue_properties={"durable": True, "exclusive": False}, - ) + # If address starts with "reply-", treat it as a direct queue name (RPC pattern) + if resolved_address.startswith("reply-"): + return AmqpConfig( + queue_name=resolved_address, # Use address as queue name + exchange_name="", # Default exchange for direct routing + routing_key=resolved_address, # Route directly to queue + binding_type=AmqpBindingType.REPLY, + queue_properties={ + "durable": False, + "exclusive": True, + "auto_delete": True, + }, + ) + else: + # Topic-based reply pattern - shared exchange with filtering + return AmqpConfig( + queue_name=f"reply-{app_id}", # App-specific reply queue + exchange_name=resolved_address, # Shared exchange for replies + exchange_type="topic", # Enable pattern matching for filtering + routing_key=app_id, # Filter messages by app_id + binding_type=AmqpBindingType.REPLY, + queue_properties={"durable": True, "exclusive": False}, + ) # Reply channel with binding - defer to binding resolution case (True, binding, _, _) if binding and binding.type == "queue": diff --git a/src/asyncapi_python/contrib/wire/amqp/utils.py b/src/asyncapi_python/contrib/wire/amqp/utils.py index cf43415..b28bfbd 100644 --- a/src/asyncapi_python/contrib/wire/amqp/utils.py +++ b/src/asyncapi_python/contrib/wire/amqp/utils.py @@ -3,6 +3,7 @@ # TODO: This thing should be general wire utils, not tied to specific wire import re + from asyncapi_python.kernel.document.channel import Channel diff --git a/src/asyncapi_python/kernel/application.py b/src/asyncapi_python/kernel/application.py index 4a87a66..99edf1f 100644 --- a/src/asyncapi_python/kernel/application.py +++ b/src/asyncapi_python/kernel/application.py @@ -1,34 +1,44 @@ import asyncio -from typing import TypedDict, Any -from typing_extensions import Unpack, Required, NotRequired +from typing import Any, TypedDict + +from typing_extensions import NotRequired, Required, Unpack from asyncapi_python.kernel.document.operation import Operation from asyncapi_python.kernel.wire import AbstractWireFactory + +from .codec import CodecFactory from .endpoint import AbstractEndpoint, EndpointFactory from .endpoint.abc import EndpointParams -from .codec import CodecFactory class BaseApplication: class Inputs(TypedDict): wire_factory: Required[AbstractWireFactory[Any, Any]] codec_factory: Required[CodecFactory[Any, Any]] + service_name: NotRequired[str] endpoint_params: NotRequired[EndpointParams] def __init__(self, **kwargs: Unpack[Inputs]) -> None: self.__endpoints: set[AbstractEndpoint] = set() self.__wire_factory: AbstractWireFactory[Any, Any] = kwargs["wire_factory"] self.__codec_factory: CodecFactory[Any, Any] = kwargs["codec_factory"] + self.__service_name: str = kwargs.get("service_name", "app") self.__endpoint_params: EndpointParams = kwargs.get("endpoint_params", {}) self._stop_event: asyncio.Event | None = None self._monitor_task: asyncio.Task[None] | None = None self._exception_future: asyncio.Future[Exception] | None = None + @property + def service_name(self) -> str: + """Get the service name for this application""" + return self.__service_name + def _register_endpoint(self, op: Operation) -> AbstractEndpoint: endpoint = EndpointFactory.create( operation=op, wire_factory=self.__wire_factory, codec_factory=self.__codec_factory, + service_name=self.__service_name, endpoint_params=self.__endpoint_params, ) self.__endpoints.add(endpoint) diff --git a/src/asyncapi_python/kernel/codec.py b/src/asyncapi_python/kernel/codec.py index 7a51e83..ed66fab 100644 --- a/src/asyncapi_python/kernel/codec.py +++ b/src/asyncapi_python/kernel/codec.py @@ -3,6 +3,7 @@ from typing import Generic, Protocol from asyncapi_python.kernel.document.message import Message + from .typing import T_DecodedPayload, T_EncodedPayload diff --git a/src/asyncapi_python/kernel/document/__init__.py b/src/asyncapi_python/kernel/document/__init__.py index 9e56430..1166df2 100644 --- a/src/asyncapi_python/kernel/document/__init__.py +++ b/src/asyncapi_python/kernel/document/__init__.py @@ -1,3 +1,10 @@ +from .bindings import ( + AmqpChannelBinding, + AmqpExchange, + AmqpExchangeType, + AmqpOperationBinding, + AmqpQueue, +) from .channel import AddressParameter, Channel, ChannelBindings from .common import ExternalDocs, Server, Tag from .message import ( @@ -15,13 +22,6 @@ OperationTrait, SecurityScheme, ) -from .bindings import ( - AmqpChannelBinding, - AmqpOperationBinding, - AmqpExchange, - AmqpQueue, - AmqpExchangeType, -) __all__ = [ # channel diff --git a/src/asyncapi_python/kernel/document/bindings.py b/src/asyncapi_python/kernel/document/bindings.py index 246b6b7..ba5abbf 100644 --- a/src/asyncapi_python/kernel/document/bindings.py +++ b/src/asyncapi_python/kernel/document/bindings.py @@ -3,8 +3,8 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import Any, Dict, Literal, Optional from enum import Enum +from typing import Any, Dict, Literal, Optional class AmqpExchangeType(str, Enum): diff --git a/src/asyncapi_python/kernel/document/channel.py b/src/asyncapi_python/kernel/document/channel.py index 373bad7..a9b2432 100644 --- a/src/asyncapi_python/kernel/document/channel.py +++ b/src/asyncapi_python/kernel/document/channel.py @@ -1,8 +1,9 @@ from dataclasses import dataclass from typing import Any -from .message import Message -from .common import * + from .bindings import AmqpChannelBinding +from .common import * +from .message import Message __all__ = ["AddressParameter", "ChannelBindings", "Channel"] diff --git a/src/asyncapi_python/kernel/document/message.py b/src/asyncapi_python/kernel/document/message.py index e7a0d09..097fcf5 100644 --- a/src/asyncapi_python/kernel/document/message.py +++ b/src/asyncapi_python/kernel/document/message.py @@ -1,8 +1,10 @@ from __future__ import annotations + from dataclasses import dataclass from typing import Any -from .common import * + from .bindings import AmqpMessageBinding +from .common import * __all__ = [ "CorrelationId", diff --git a/src/asyncapi_python/kernel/document/operation.py b/src/asyncapi_python/kernel/document/operation.py index b0c0930..b6cd32c 100644 --- a/src/asyncapi_python/kernel/document/operation.py +++ b/src/asyncapi_python/kernel/document/operation.py @@ -1,9 +1,10 @@ from dataclasses import dataclass from typing import Any, Literal -from .common import * + +from .bindings import AmqpOperationBinding from .channel import Channel +from .common import * from .message import Message -from .bindings import AmqpOperationBinding __all__ = [ "SecurityScheme", diff --git a/src/asyncapi_python/kernel/endpoint/__init__.py b/src/asyncapi_python/kernel/endpoint/__init__.py index c8b20ec..b709cde 100644 --- a/src/asyncapi_python/kernel/endpoint/__init__.py +++ b/src/asyncapi_python/kernel/endpoint/__init__.py @@ -1,10 +1,12 @@ from typing import ClassVar, Literal + from typing_extensions import Unpack + from .abc import AbstractEndpoint from .publisher import Publisher -from .subscriber import Subscriber from .rpc_client import RpcClient from .rpc_server import RpcServer +from .subscriber import Subscriber __all__ = [ "AbstractEndpoint", diff --git a/src/asyncapi_python/kernel/endpoint/abc.py b/src/asyncapi_python/kernel/endpoint/abc.py index 5eb2281..855181c 100644 --- a/src/asyncapi_python/kernel/endpoint/abc.py +++ b/src/asyncapi_python/kernel/endpoint/abc.py @@ -1,11 +1,13 @@ from abc import ABC, abstractmethod -from typing import Any, Callable, Generic, TypedDict, overload, Union -from typing_extensions import Unpack, Required, NotRequired +from typing import Any, Callable, Generic, TypedDict, Union, overload + +from typing_extensions import NotRequired, Required, Unpack -from ..typing import Handler, T_Input, T_Output, BatchConfig -from asyncapi_python.kernel.wire import AbstractWireFactory -from asyncapi_python.kernel.document import Operation from asyncapi_python.kernel.codec import Codec, CodecFactory +from asyncapi_python.kernel.document import Operation +from asyncapi_python.kernel.wire import AbstractWireFactory + +from ..typing import BatchConfig, Handler, T_Input, T_Output class EndpointParams(TypedDict): @@ -29,6 +31,7 @@ class Inputs(TypedDict): operation: Required[Operation] wire_factory: Required[AbstractWireFactory[Any, Any]] codec_factory: Required[CodecFactory[Any, Any]] + service_name: NotRequired[str] # Service name for app_id generation endpoint_params: NotRequired[EndpointParams] # Optional endpoint configuration class StartParams(TypedDict): @@ -40,6 +43,7 @@ class StartParams(TypedDict): def __init__(self, **kwargs: Unpack[Inputs]): self._operation = kwargs["operation"] self._wire = kwargs["wire_factory"] + self._service_name = kwargs.get("service_name", "app") codec_factory = kwargs["codec_factory"] # Endpoint sets its own defaults - empty dict if not provided self._endpoint_params = kwargs.get("endpoint_params", {}) diff --git a/src/asyncapi_python/kernel/endpoint/publisher.py b/src/asyncapi_python/kernel/endpoint/publisher.py index 34e03f4..1ebb265 100644 --- a/src/asyncapi_python/kernel/endpoint/publisher.py +++ b/src/asyncapi_python/kernel/endpoint/publisher.py @@ -1,11 +1,13 @@ from typing import Generic + from typing_extensions import Unpack +from asyncapi_python.kernel.wire import Producer + +from ..typing import T_Input from .abc import AbstractEndpoint, Send from .exceptions import UninitializedError from .message import WireMessage -from ..typing import T_Input -from asyncapi_python.kernel.wire import Producer class Publisher(AbstractEndpoint, Send[T_Input, None], Generic[T_Input]): diff --git a/src/asyncapi_python/kernel/endpoint/rpc_client.py b/src/asyncapi_python/kernel/endpoint/rpc_client.py index 73b67aa..76c13d1 100644 --- a/src/asyncapi_python/kernel/endpoint/rpc_client.py +++ b/src/asyncapi_python/kernel/endpoint/rpc_client.py @@ -1,15 +1,15 @@ import asyncio from typing import Generic -from typing_extensions import Unpack from uuid import uuid4 -from .abc import AbstractEndpoint, Send -from .exceptions import UninitializedError, TimeoutError -from .message import WireMessage -from ..typing import T_Input, T_Output, IncomingMessage -from asyncapi_python.kernel.wire import Producer +from typing_extensions import Unpack +from asyncapi_python.kernel.wire import Producer +from ..typing import IncomingMessage, T_Input, T_Output +from .abc import AbstractEndpoint, Send +from .exceptions import TimeoutError, UninitializedError +from .message import WireMessage from .rpc_reply_handler import global_reply_handler @@ -44,7 +44,9 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None: global_reply_handler.increment_instance_count() # Ensure global reply handling is set up (only happens once) - await global_reply_handler.ensure_reply_handler(self._wire, self._operation) + await global_reply_handler.ensure_reply_handler( + self._wire, self._operation, self._service_name + ) # Create instance-specific producer for sending requests self._producer = await self._wire.create_producer( @@ -52,6 +54,7 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None: parameters={}, op_bindings=self._operation.bindings, is_reply=False, + app_id=self._service_name, ) # Start producer diff --git a/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py b/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py index 2d324c2..7d0eca8 100644 --- a/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py +++ b/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py @@ -2,11 +2,12 @@ import asyncio import secrets - -from ..typing import IncomingMessage from typing import Any -from asyncapi_python.kernel.wire import Consumer, AbstractWireFactory + from asyncapi_python.kernel.document import Channel, Operation +from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer + +from ..typing import IncomingMessage class GlobalRpcReplyHandler: @@ -25,37 +26,56 @@ def __init__(self) -> None: self._instance_count: int = 0 async def ensure_reply_handler( - self, wire_factory: AbstractWireFactory[Any, Any], operation: Operation + self, + wire_factory: AbstractWireFactory[Any, Any], + operation: Operation, + service_name: str = "app", ) -> None: - """Ensure reply consumer and task are running""" + """Ensure reply consumer and task are running + + Args: + wire_factory: Wire factory for creating consumer + operation: Operation definition + service_name: Service name for generating consistent app_id + """ if self._reply_consumer is None: - # Create reply consumer (only once for all instances) - reply_channel = self._get_or_create_reply_channel(operation) + # Generate app_id with service name + random hex (same format as AmqpWire) + random_hex = secrets.token_hex(4) # 4 bytes = 8 hex chars + app_id = f"{service_name}-{random_hex}" + + # Use app_id as the reply queue name + self._reply_queue_name = f"reply-{app_id}" + # Create reply channel with the generated queue name as address + reply_channel = self._get_or_create_reply_channel( + operation, self._reply_queue_name + ) + + # Create reply consumer with the channel (wire factory will use the address) self._reply_consumer = await wire_factory.create_consumer( channel=reply_channel, parameters={}, op_bindings=None, is_reply=True, + app_id=app_id, ) - # Generate unique reply queue name for all clients - self._reply_queue_name = f"reply-{secrets.token_hex(8)}" - # Start the consumer await self._reply_consumer.start() # Start background task self._consume_task = asyncio.create_task(self._consume_all_replies()) - def _get_or_create_reply_channel(self, operation: Operation) -> Channel: - """Get reply channel from operation or create default one""" + def _get_or_create_reply_channel( + self, operation: Operation, queue_name: str + ) -> Channel: + """Get reply channel from operation or create default one with specified queue name""" if operation.reply and operation.reply.channel: return operation.reply.channel else: - # Create a default reply channel for global use + # Create a default reply channel with the generated queue name as address return Channel( - address=None, # Use default/null address for global reply queue + address=queue_name, # Use the generated queue name as address title="Global RPC Reply Queue", summary=None, description=None, diff --git a/src/asyncapi_python/kernel/endpoint/subscriber.py b/src/asyncapi_python/kernel/endpoint/subscriber.py index 919eae0..d37ecad 100644 --- a/src/asyncapi_python/kernel/endpoint/subscriber.py +++ b/src/asyncapi_python/kernel/endpoint/subscriber.py @@ -1,18 +1,14 @@ import asyncio from typing import Any, Callable, Generic, overload + from typing_extensions import Unpack -from .abc import AbstractEndpoint, Receive, HandlerParams -from ..typing import ( - T_Input, - Handler, - BatchConsumer, - BatchConfig, - IncomingMessage, -) -from ..exceptions import Reject from asyncapi_python.kernel.wire import Consumer +from ..exceptions import Reject +from ..typing import BatchConfig, BatchConsumer, Handler, IncomingMessage, T_Input +from .abc import AbstractEndpoint, HandlerParams, Receive + class Subscriber(AbstractEndpoint, Receive[T_Input, None], Generic[T_Input]): """Subscriber endpoint for receiving messages without sending replies""" diff --git a/src/asyncapi_python/kernel/typing.py b/src/asyncapi_python/kernel/typing.py index 4074cca..5e87090 100644 --- a/src/asyncapi_python/kernel/typing.py +++ b/src/asyncapi_python/kernel/typing.py @@ -4,9 +4,10 @@ between application data, encoded data, and wire messages. """ -from typing import Any, Protocol, TypeVar, TypedDict -from typing_extensions import TypeAlias, Required from types import CodeType +from typing import Any, Protocol, TypedDict, TypeVar + +from typing_extensions import Required, TypeAlias # Base protocols for type bounds diff --git a/src/asyncapi_python/kernel/wire/__init__.py b/src/asyncapi_python/kernel/wire/__init__.py index c45bfaf..ed6d61f 100644 --- a/src/asyncapi_python/kernel/wire/__init__.py +++ b/src/asyncapi_python/kernel/wire/__init__.py @@ -1,9 +1,11 @@ -from .typing import Producer, Consumer -from ..typing import T_Recv, T_Send +from abc import ABC, abstractmethod from typing import Generic, TypedDict -from typing_extensions import Unpack -from abc import abstractmethod, ABC + +from typing_extensions import NotRequired, Unpack + from ..document import Channel, OperationBindings +from ..typing import T_Recv, T_Send +from .typing import Consumer, Producer class EndpointParams(TypedDict): @@ -11,6 +13,7 @@ class EndpointParams(TypedDict): parameters: dict[str, str] op_bindings: OperationBindings | None is_reply: bool + app_id: NotRequired[str] # Optional app_id for queue naming class AbstractWireFactory(ABC, Generic[T_Send, T_Recv]): diff --git a/src/asyncapi_python_codegen/__init__.py b/src/asyncapi_python_codegen/__init__.py index 6f5d704..ab953ec 100644 --- a/src/asyncapi_python_codegen/__init__.py +++ b/src/asyncapi_python_codegen/__init__.py @@ -1,10 +1,10 @@ """AsyncAPI Python Code Generator.""" +from importlib.metadata import version + +from .cli import app from .generators import CodeGenerator from .parser import extract_all_operations, load_document_info -from .cli import app - -from importlib.metadata import version try: __version__ = version("asyncapi-python") diff --git a/src/asyncapi_python_codegen/generators/main.py b/src/asyncapi_python_codegen/generators/main.py index 7112a47..861d3d0 100644 --- a/src/asyncapi_python_codegen/generators/main.py +++ b/src/asyncapi_python_codegen/generators/main.py @@ -2,13 +2,13 @@ from pathlib import Path -# Type annotations removed - this module deals with dynamic YAML/JSON parsing - from ..parser import extract_all_operations, load_document_info from .messages import MessageGenerator +from .parameters import ParameterGenerator from .routers import RouterGenerator from .templates import TemplateRenderer -from .parameters import ParameterGenerator + +# Type annotations removed - this module deals with dynamic YAML/JSON parsing class CodeGenerator: diff --git a/src/asyncapi_python_codegen/generators/messages.py b/src/asyncapi_python_codegen/generators/messages.py index 373cfc3..282f298 100644 --- a/src/asyncapi_python_codegen/generators/messages.py +++ b/src/asyncapi_python_codegen/generators/messages.py @@ -3,13 +3,14 @@ import json import re import tempfile -import yaml from pathlib import Path from typing import Any -from asyncapi_python.kernel.document import Operation +import yaml from datamodel_code_generator.__main__ import main as datamodel_codegen +from asyncapi_python.kernel.document import Operation + class MessageGenerator: """Generates Pydantic message models using datamodel-code-generator.""" diff --git a/src/asyncapi_python_codegen/generators/parameters.py b/src/asyncapi_python_codegen/generators/parameters.py index 4033aa5..0379584 100644 --- a/src/asyncapi_python_codegen/generators/parameters.py +++ b/src/asyncapi_python_codegen/generators/parameters.py @@ -4,6 +4,7 @@ import tempfile from pathlib import Path from typing import Any + from datamodel_code_generator.__main__ import main as datamodel_codegen diff --git a/src/asyncapi_python_codegen/generators/routers.py b/src/asyncapi_python_codegen/generators/routers.py index 4a9f994..f2e3447 100644 --- a/src/asyncapi_python_codegen/generators/routers.py +++ b/src/asyncapi_python_codegen/generators/routers.py @@ -1,7 +1,8 @@ """Router generation with nested path support.""" -from typing import Any from dataclasses import dataclass +from typing import Any + from asyncapi_python.kernel.document import Channel, Operation from asyncapi_python.utils import snake_case diff --git a/src/asyncapi_python_codegen/parser/__init__.py b/src/asyncapi_python_codegen/parser/__init__.py index 4c04108..6f441cc 100644 --- a/src/asyncapi_python_codegen/parser/__init__.py +++ b/src/asyncapi_python_codegen/parser/__init__.py @@ -1,6 +1,6 @@ """AsyncAPI dataclass-based parser using kernel.document types.""" -from .types import YamlDocument from .document_loader import extract_all_operations, load_document_info +from .types import YamlDocument __all__ = ["YamlDocument", "extract_all_operations", "load_document_info"] diff --git a/src/asyncapi_python_codegen/parser/context.py b/src/asyncapi_python_codegen/parser/context.py index 5a0fc9c..37f9a3c 100644 --- a/src/asyncapi_python_codegen/parser/context.py +++ b/src/asyncapi_python_codegen/parser/context.py @@ -4,6 +4,7 @@ from contextlib import contextmanager from pathlib import Path from typing import Generator, Optional + from .types import ParseContext # Thread-local storage for context stack diff --git a/src/asyncapi_python_codegen/parser/document_loader.py b/src/asyncapi_python_codegen/parser/document_loader.py index ab5059b..7192237 100644 --- a/src/asyncapi_python_codegen/parser/document_loader.py +++ b/src/asyncapi_python_codegen/parser/document_loader.py @@ -1,10 +1,12 @@ """Main document loader and operations extractor.""" from pathlib import Path + from asyncapi_python.kernel.document import Operation -from .references import load_yaml_file -from .extractors import extract_operation + from .context import parsing_context +from .extractors import extract_operation +from .references import load_yaml_file def extract_all_operations(yaml_path: Path) -> dict[str, Operation]: diff --git a/src/asyncapi_python_codegen/parser/extractors.py b/src/asyncapi_python_codegen/parser/extractors.py index f530cee..b81230f 100644 --- a/src/asyncapi_python_codegen/parser/extractors.py +++ b/src/asyncapi_python_codegen/parser/extractors.py @@ -2,25 +2,26 @@ # Type imports for extraction functions from asyncapi_python.kernel.document import ( + AddressParameter, Channel, ChannelBindings, - AddressParameter, + CorrelationId, + ExternalDocs, + Message, + MessageBindings, + MessageExample, + MessageTrait, Operation, - OperationReply, OperationBindings, + OperationReply, OperationTrait, SecurityScheme, - Message, - MessageBindings, - MessageTrait, - MessageExample, - CorrelationId, - Tag, - ExternalDocs, Server, + Tag, ) -from .types import YamlDocument + from .references import maybe_ref +from .types import YamlDocument @maybe_ref diff --git a/src/asyncapi_python_codegen/parser/references.py b/src/asyncapi_python_codegen/parser/references.py index b622209..dc9d328 100644 --- a/src/asyncapi_python_codegen/parser/references.py +++ b/src/asyncapi_python_codegen/parser/references.py @@ -1,11 +1,13 @@ """Reference resolution decorator and utilities.""" -import yaml from functools import wraps from pathlib import Path from typing import Any, Callable, TypeVar + +import yaml + +from .context import get_current_context, pop_context, push_context from .types import YamlDocument, navigate_json_pointer -from .context import get_current_context, push_context, pop_context T = TypeVar("T") diff --git a/src/asyncapi_python_codegen/parser/types.py b/src/asyncapi_python_codegen/parser/types.py index 1886b3a..5ef8415 100644 --- a/src/asyncapi_python_codegen/parser/types.py +++ b/src/asyncapi_python_codegen/parser/types.py @@ -1,7 +1,7 @@ """Type aliases and basic types for AsyncAPI parsing.""" -from typing import Any from pathlib import Path +from typing import Any # Type alias for raw YAML document data YamlDocument = dict[str, Any] diff --git a/src/asyncapi_python_pants/register.py b/src/asyncapi_python_pants/register.py index 3cea2e8..943285f 100644 --- a/src/asyncapi_python_pants/register.py +++ b/src/asyncapi_python_pants/register.py @@ -1,7 +1,7 @@ -from pants.engine.rules import collect_rules -from pants.engine.unions import UnionRule from pants.backend.python.util_rules import pex from pants.core.goals.resolves import ExportableTool +from pants.engine.rules import collect_rules +from pants.engine.unions import UnionRule from .rules import * from .targets import * diff --git a/src/asyncapi_python_pants/rules.py b/src/asyncapi_python_pants/rules.py index 7db50cf..305ab4e 100644 --- a/src/asyncapi_python_pants/rules.py +++ b/src/asyncapi_python_pants/rules.py @@ -1,31 +1,33 @@ from importlib.metadata import version + +from pants.backend.python.target_types import ConsoleScript +from pants.backend.python.util_rules.interpreter_constraints import ( + InterpreterConstraints, +) +from pants.backend.python.util_rules.pex import ( + Pex, + PexProcess, + PexRequest, + PexRequirements, +) +from pants.core.util_rules.source_files import SourceFilesRequest +from pants.core.util_rules.stripped_source_files import StrippedSourceFiles from pants.engine.internals.native_engine import ( + AddPrefix, Digest, MergeDigests, RemovePrefix, - AddPrefix, Snapshot, ) -from pants.core.util_rules.stripped_source_files import StrippedSourceFiles -from pants.core.util_rules.source_files import SourceFilesRequest +from pants.engine.process import ProcessResult +from pants.engine.rules import Get, MultiGet, rule from pants.engine.target import ( GeneratedSources, TransitiveTargets, TransitiveTargetsRequest, ) -from pants.engine.rules import rule, Get, MultiGet -from pants.engine.process import ProcessResult from pants.source.source_root import SourceRoot, SourceRootRequest -from pants.backend.python.target_types import ConsoleScript -from pants.backend.python.util_rules.interpreter_constraints import ( - InterpreterConstraints, -) -from pants.backend.python.util_rules.pex import ( - Pex, - PexProcess, - PexRequest, - PexRequirements, -) + from .targets import * diff --git a/src/asyncapi_python_pants/targets.py b/src/asyncapi_python_pants/targets.py index 5bdb87a..1e1e903 100644 --- a/src/asyncapi_python_pants/targets.py +++ b/src/asyncapi_python_pants/targets.py @@ -1,16 +1,16 @@ -from pants.engine.target import GenerateSourcesRequest +from pants.backend.python.target_types import ( + InterpreterConstraintsField, + PythonResolveField, + PythonSourceField, +) from pants.engine.target import ( COMMON_TARGET_FIELDS, - Dependencies, - Target, AsyncFieldMixin, + Dependencies, + GenerateSourcesRequest, MultipleSourcesField, StringField, -) -from pants.backend.python.target_types import PythonSourceField -from pants.backend.python.target_types import ( - InterpreterConstraintsField, - PythonResolveField, + Target, ) diff --git a/tests/codegen/test_parser.py b/tests/codegen/test_parser.py index 6621dff..6651253 100644 --- a/tests/codegen/test_parser.py +++ b/tests/codegen/test_parser.py @@ -1,13 +1,14 @@ """Unit tests for AsyncAPI dataclass parser.""" -import pytest from pathlib import Path + +import pytest + +from asyncapi_python.kernel.document import Channel, Message, Operation from src.asyncapi_python_codegen.parser import ( extract_all_operations, load_document_info, ) -from asyncapi_python.kernel.document import Operation, Channel, Message - # Test basic parser functionality diff --git a/tests/conftest.py b/tests/conftest.py index b409b42..14eb405 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,6 +16,7 @@ import asyncio from os import environ from typing import Generator + import pytest from asyncapi_python.contrib.wire.in_memory import reset_bus diff --git a/tests/integration/scenarios/__init__.py b/tests/integration/scenarios/__init__.py index b256772..358012e 100644 --- a/tests/integration/scenarios/__init__.py +++ b/tests/integration/scenarios/__init__.py @@ -1,12 +1,12 @@ """Test scenarios for wire+codec combinations""" -from .producer_consumer import producer_consumer_roundtrip -from .reply_channel import reply_channel_creation from .error_handling import error_handling -from .malformed_messages import malformed_message_handling from .fan_in_logging import fan_in_logging from .fan_out_broadcasting import fan_out_broadcasting +from .malformed_messages import malformed_message_handling from .many_to_many_microservices import many_to_many_microservices +from .producer_consumer import producer_consumer_roundtrip +from .reply_channel import reply_channel_creation __all__ = [ "producer_consumer_roundtrip", diff --git a/tests/integration/scenarios/batch_processing.py b/tests/integration/scenarios/batch_processing.py index 5fc952c..cab7afc 100644 --- a/tests/integration/scenarios/batch_processing.py +++ b/tests/integration/scenarios/batch_processing.py @@ -1,12 +1,14 @@ """Batch processing integration test scenario""" import asyncio -from asyncapi_python.kernel.wire import AbstractWireFactory + +from asyncapi_python.kernel.application import BaseApplication from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.channel import Channel +from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.operation import Operation -from asyncapi_python.kernel.application import BaseApplication +from asyncapi_python.kernel.wire import AbstractWireFactory + from ..test_app.messages.json import UserCreated, UserUpdated diff --git a/tests/integration/scenarios/error_handling.py b/tests/integration/scenarios/error_handling.py index ee56041..870c4a8 100644 --- a/tests/integration/scenarios/error_handling.py +++ b/tests/integration/scenarios/error_handling.py @@ -1,16 +1,18 @@ """Error handling scenario""" import asyncio + import pytest -from asyncapi_python.kernel.wire import AbstractWireFactory + +from asyncapi_python.kernel.application import BaseApplication from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.channel import Channel +from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.operation import Operation -from asyncapi_python.kernel.application import BaseApplication +from asyncapi_python.kernel.wire import AbstractWireFactory # Import test models -from ..test_app.messages.json import TestUser, UserCreated, UserUpdated, TestEvent +from ..test_app.messages.json import TestEvent, TestUser, UserCreated, UserUpdated class UserManagementApp(BaseApplication): diff --git a/tests/integration/scenarios/fan_in_logging.py b/tests/integration/scenarios/fan_in_logging.py index 82bea6c..e6336a5 100644 --- a/tests/integration/scenarios/fan_in_logging.py +++ b/tests/integration/scenarios/fan_in_logging.py @@ -3,17 +3,17 @@ import asyncio import uuid from uuid import uuid4 -from asyncapi_python.kernel.wire import AbstractWireFactory + +from asyncapi_python.kernel.application import BaseApplication from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.channel import Channel +from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.operation import Operation -from asyncapi_python.kernel.application import BaseApplication +from asyncapi_python.kernel.wire import AbstractWireFactory # Import test models from ..test_app.messages.json import LogEvent - # Generate unique channel ID for this scenario to avoid collisions SCENARIO_CHANNEL_ID = str(uuid4())[:8] @@ -27,8 +27,11 @@ def __init__( wire_factory: AbstractWireFactory, codec_factory: CodecFactory, ): - self.service_name = service_name - super().__init__(wire_factory=wire_factory, codec_factory=codec_factory) + super().__init__( + wire_factory=wire_factory, + codec_factory=codec_factory, + service_name=service_name, + ) self._setup_endpoints() def _setup_endpoints(self): diff --git a/tests/integration/scenarios/fan_out_broadcasting.py b/tests/integration/scenarios/fan_out_broadcasting.py index bd6c15a..16404ee 100644 --- a/tests/integration/scenarios/fan_out_broadcasting.py +++ b/tests/integration/scenarios/fan_out_broadcasting.py @@ -2,17 +2,17 @@ import asyncio from uuid import uuid4 -from asyncapi_python.kernel.wire import AbstractWireFactory + +from asyncapi_python.kernel.application import BaseApplication from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.channel import Channel +from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.operation import Operation -from asyncapi_python.kernel.application import BaseApplication +from asyncapi_python.kernel.wire import AbstractWireFactory # Import test models from ..test_app.messages.json import UserAction - # Generate unique channel ID for this scenario to avoid collisions SCENARIO_CHANNEL_ID = str(uuid4())[:8] @@ -108,8 +108,11 @@ def __init__( wire_factory: AbstractWireFactory, codec_factory: CodecFactory, ): - self.service_name = service_name - super().__init__(wire_factory=wire_factory, codec_factory=codec_factory) + super().__init__( + wire_factory=wire_factory, + codec_factory=codec_factory, + service_name=service_name, + ) self._setup_endpoints() def _setup_endpoints(self): diff --git a/tests/integration/scenarios/malformed_messages.py b/tests/integration/scenarios/malformed_messages.py index 775dbf3..75bca3c 100644 --- a/tests/integration/scenarios/malformed_messages.py +++ b/tests/integration/scenarios/malformed_messages.py @@ -1,16 +1,18 @@ """Malformed message handling scenario""" -import pytest import json -from asyncapi_python.kernel.wire import AbstractWireFactory + +import pytest + +from asyncapi_python.kernel.application import BaseApplication from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.channel import Channel +from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.operation import Operation -from asyncapi_python.kernel.application import BaseApplication +from asyncapi_python.kernel.wire import AbstractWireFactory # Import test models -from ..test_app.messages.json import TestUser, UserCreated, UserUpdated, TestEvent +from ..test_app.messages.json import TestEvent, TestUser, UserCreated, UserUpdated class UserManagementApp(BaseApplication): diff --git a/tests/integration/scenarios/many_to_many_microservices.py b/tests/integration/scenarios/many_to_many_microservices.py index aaefc57..a77c4a7 100644 --- a/tests/integration/scenarios/many_to_many_microservices.py +++ b/tests/integration/scenarios/many_to_many_microservices.py @@ -2,23 +2,23 @@ import asyncio from uuid import uuid4 -from asyncapi_python.kernel.wire import AbstractWireFactory + +from asyncapi_python.kernel.application import BaseApplication from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.channel import Channel +from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.operation import Operation -from asyncapi_python.kernel.application import BaseApplication +from asyncapi_python.kernel.wire import AbstractWireFactory # Import test models from ..test_app.messages.json import ( - UserCreated, - OrderPlaced, - PaymentProcessed, InventoryUpdated, + OrderPlaced, OrderShipped, + PaymentProcessed, + UserCreated, ) - # Generate unique channel ID for this scenario to avoid collisions SCENARIO_CHANNEL_ID = str(uuid4())[:8] diff --git a/tests/integration/scenarios/producer_consumer.py b/tests/integration/scenarios/producer_consumer.py index 667d86e..b007d6f 100644 --- a/tests/integration/scenarios/producer_consumer.py +++ b/tests/integration/scenarios/producer_consumer.py @@ -1,12 +1,14 @@ """Producer->Consumer roundtrip scenario""" import asyncio -from asyncapi_python.kernel.wire import AbstractWireFactory + +from asyncapi_python.kernel.application import BaseApplication from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.channel import Channel +from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.operation import Operation -from asyncapi_python.kernel.application import BaseApplication +from asyncapi_python.kernel.wire import AbstractWireFactory + from ..test_app.messages.json import UserCreated, UserUpdated diff --git a/tests/integration/scenarios/reply_channel.py b/tests/integration/scenarios/reply_channel.py index 118f73b..f47cc64 100644 --- a/tests/integration/scenarios/reply_channel.py +++ b/tests/integration/scenarios/reply_channel.py @@ -1,12 +1,13 @@ """Reply channel creation scenario""" import asyncio -from asyncapi_python.kernel.wire import AbstractWireFactory + +from asyncapi_python.kernel.application import BaseApplication from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.channel import Channel +from asyncapi_python.kernel.document.message import Message from asyncapi_python.kernel.document.operation import Operation -from asyncapi_python.kernel.application import BaseApplication +from asyncapi_python.kernel.wire import AbstractWireFactory # Import test models from ..test_app.messages.json import TestEvent diff --git a/tests/integration/test_wire_codec_scenarios.py b/tests/integration/test_wire_codec_scenarios.py index c59f5a0..140c65f 100644 --- a/tests/integration/test_wire_codec_scenarios.py +++ b/tests/integration/test_wire_codec_scenarios.py @@ -2,28 +2,27 @@ import os from typing import Awaitable, Callable + import pytest -from asyncapi_python.kernel.wire import AbstractWireFactory -from asyncapi_python.kernel.codec import CodecFactory -from asyncapi_python.contrib.wire.in_memory import InMemoryWire -from asyncapi_python.contrib.wire.amqp import AmqpWire from asyncapi_python.contrib.codec.json import JsonCodecFactory +from asyncapi_python.contrib.wire.amqp import AmqpWire +from asyncapi_python.contrib.wire.in_memory import InMemoryWire +from asyncapi_python.kernel.codec import CodecFactory +from asyncapi_python.kernel.wire import AbstractWireFactory +# Import test app module +from . import test_app from .scenarios import ( - producer_consumer_roundtrip, - reply_channel_creation, error_handling, - malformed_message_handling, fan_in_logging, fan_out_broadcasting, + malformed_message_handling, many_to_many_microservices, + producer_consumer_roundtrip, + reply_channel_creation, ) -# Import test app module -from . import test_app - - # Wire implementations IN_MEMORY_WIRE = InMemoryWire() AMQP_WIRE = AmqpWire( diff --git a/tests/kernel/endpoint/test_exception_handling.py b/tests/kernel/endpoint/test_exception_handling.py index f805c4b..5c863c2 100644 --- a/tests/kernel/endpoint/test_exception_handling.py +++ b/tests/kernel/endpoint/test_exception_handling.py @@ -1,15 +1,16 @@ """Unit tests for exception handling in subscriber and RPC server endpoints.""" import asyncio -import pytest -from unittest.mock import Mock, AsyncMock from typing import AsyncGenerator +from unittest.mock import AsyncMock, Mock + +import pytest -from asyncapi_python.kernel.endpoint import Subscriber, RpcServer -from asyncapi_python.kernel.document import Operation, Channel, Message -from asyncapi_python.kernel.wire import AbstractWireFactory from asyncapi_python.kernel.codec import CodecFactory +from asyncapi_python.kernel.document import Channel, Message, Operation +from asyncapi_python.kernel.endpoint import RpcServer, Subscriber from asyncapi_python.kernel.exceptions import Reject +from asyncapi_python.kernel.wire import AbstractWireFactory class MockIncomingMessage: diff --git a/tests/kernel/endpoint/test_handler_enforcement.py b/tests/kernel/endpoint/test_handler_enforcement.py index 335d993..9e9f374 100644 --- a/tests/kernel/endpoint/test_handler_enforcement.py +++ b/tests/kernel/endpoint/test_handler_enforcement.py @@ -1,13 +1,14 @@ """Unit tests for handler enforcement and location tracking in receiving endpoints.""" import asyncio +from unittest.mock import AsyncMock, MagicMock, Mock + import pytest -from unittest.mock import Mock, AsyncMock, MagicMock -from asyncapi_python.kernel.endpoint import Subscriber, RpcServer -from asyncapi_python.kernel.document import Operation, Channel -from asyncapi_python.kernel.wire import AbstractWireFactory from asyncapi_python.kernel.codec import CodecFactory +from asyncapi_python.kernel.document import Channel, Operation +from asyncapi_python.kernel.endpoint import RpcServer, Subscriber +from asyncapi_python.kernel.wire import AbstractWireFactory @pytest.fixture diff --git a/tests/kernel/endpoint/test_rpc_endpoints.py b/tests/kernel/endpoint/test_rpc_endpoints.py index 7cfd8db..f0ab445 100644 --- a/tests/kernel/endpoint/test_rpc_endpoints.py +++ b/tests/kernel/endpoint/test_rpc_endpoints.py @@ -425,7 +425,12 @@ async def cleanup(self) -> None: await self._reply_producer.stop() async def create_consumer( - self, channel, parameters, op_bindings, is_reply: bool + self, + channel, + parameters, + op_bindings, + is_reply: bool, + app_id: str | None = None, ) -> Consumer: consumer = RealisticConsumer(is_reply=is_reply) consumer.set_factory(self) @@ -442,7 +447,12 @@ async def create_consumer( return consumer async def create_producer( - self, channel, parameters, op_bindings, is_reply: bool + self, + channel, + parameters, + op_bindings, + is_reply: bool, + app_id: str | None = None, ) -> Producer: producer = RealisticProducer(is_reply=is_reply) producer.set_factory(self) From b3f4ffa215c9cfba386f1ea24db9564a42db0cf3 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Tue, 21 Oct 2025 08:29:30 +0000 Subject: [PATCH 06/15] Drop service name param from amqp wire --- src/asyncapi_python/contrib/wire/amqp/factory.py | 7 +++---- tests/integration/test_wire_codec_scenarios.py | 1 - 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/src/asyncapi_python/contrib/wire/amqp/factory.py b/src/asyncapi_python/contrib/wire/amqp/factory.py index e4ce610..d6e51e7 100644 --- a/src/asyncapi_python/contrib/wire/amqp/factory.py +++ b/src/asyncapi_python/contrib/wire/amqp/factory.py @@ -32,7 +32,6 @@ class AmqpWire(AbstractWireFactory[AmqpWireMessage, AmqpIncomingMessage]): def __init__( self, connection_url: str, - service_name: str = "app", robust: bool = False, reconnect_interval: float = 1.0, max_reconnect_interval: float = 60.0, @@ -46,7 +45,6 @@ def __init__( Args: connection_url: AMQP connection URL - service_name: Service name prefix for app_id robust: Enable robust connection with auto-reconnect (default: False) reconnect_interval: Initial reconnect interval in seconds (for robust mode) max_reconnect_interval: Maximum reconnect interval in seconds (for robust mode) @@ -56,9 +54,10 @@ def __init__( on_connection_lost: Callback when connection is lost (for non-robust mode) """ self._connection_url = connection_url - # Generate app_id with service name plus 8 random hex characters + # Generate fallback app_id with random hex characters + # Note: For RPC, app_id should be provided via EndpointParams from application level random_hex = secrets.token_hex(4) # 4 bytes = 8 hex chars - self._app_id = f"{service_name}-{random_hex}" + self._app_id = f"wire-{random_hex}" self._connection: AbstractConnection | None = None self._robust = robust self._reconnect_interval = reconnect_interval diff --git a/tests/integration/test_wire_codec_scenarios.py b/tests/integration/test_wire_codec_scenarios.py index 140c65f..f18bdb7 100644 --- a/tests/integration/test_wire_codec_scenarios.py +++ b/tests/integration/test_wire_codec_scenarios.py @@ -29,7 +29,6 @@ connection_url=os.environ.get( "PYTEST_AMQP_URI", "amqp://guest:guest@localhost:5672/" ), - service_name="test-integration", ) # Codec implementations From 8bd0770a47cbebba4d0ea0492608a92149097e9d Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Tue, 21 Oct 2025 10:18:20 +0000 Subject: [PATCH 07/15] Move service name to endpoint params --- src/asyncapi_python/kernel/application.py | 8 -------- src/asyncapi_python/kernel/endpoint/abc.py | 9 +++------ src/asyncapi_python/kernel/endpoint/rpc_client.py | 7 +++++-- src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py | 9 ++++++--- tests/integration/scenarios/fan_in_logging.py | 5 ++++- tests/integration/scenarios/fan_out_broadcasting.py | 5 ++++- 6 files changed, 22 insertions(+), 21 deletions(-) diff --git a/src/asyncapi_python/kernel/application.py b/src/asyncapi_python/kernel/application.py index 99edf1f..7ad473f 100644 --- a/src/asyncapi_python/kernel/application.py +++ b/src/asyncapi_python/kernel/application.py @@ -15,30 +15,22 @@ class BaseApplication: class Inputs(TypedDict): wire_factory: Required[AbstractWireFactory[Any, Any]] codec_factory: Required[CodecFactory[Any, Any]] - service_name: NotRequired[str] endpoint_params: NotRequired[EndpointParams] def __init__(self, **kwargs: Unpack[Inputs]) -> None: self.__endpoints: set[AbstractEndpoint] = set() self.__wire_factory: AbstractWireFactory[Any, Any] = kwargs["wire_factory"] self.__codec_factory: CodecFactory[Any, Any] = kwargs["codec_factory"] - self.__service_name: str = kwargs.get("service_name", "app") self.__endpoint_params: EndpointParams = kwargs.get("endpoint_params", {}) self._stop_event: asyncio.Event | None = None self._monitor_task: asyncio.Task[None] | None = None self._exception_future: asyncio.Future[Exception] | None = None - @property - def service_name(self) -> str: - """Get the service name for this application""" - return self.__service_name - def _register_endpoint(self, op: Operation) -> AbstractEndpoint: endpoint = EndpointFactory.create( operation=op, wire_factory=self.__wire_factory, codec_factory=self.__codec_factory, - service_name=self.__service_name, endpoint_params=self.__endpoint_params, ) self.__endpoints.add(endpoint) diff --git a/src/asyncapi_python/kernel/endpoint/abc.py b/src/asyncapi_python/kernel/endpoint/abc.py index 855181c..fce37da 100644 --- a/src/asyncapi_python/kernel/endpoint/abc.py +++ b/src/asyncapi_python/kernel/endpoint/abc.py @@ -10,12 +10,11 @@ from ..typing import BatchConfig, Handler, T_Input, T_Output -class EndpointParams(TypedDict): +class EndpointParams(TypedDict, total=False): """Optional parameters for endpoint configuration""" - disable_handler_validation: NotRequired[ - bool - ] # Opt-out of handler enforcement for testing + service_name: str # Service name for generating app_id + disable_handler_validation: bool # Opt-out of handler enforcement for testing class HandlerParams(TypedDict): @@ -31,7 +30,6 @@ class Inputs(TypedDict): operation: Required[Operation] wire_factory: Required[AbstractWireFactory[Any, Any]] codec_factory: Required[CodecFactory[Any, Any]] - service_name: NotRequired[str] # Service name for app_id generation endpoint_params: NotRequired[EndpointParams] # Optional endpoint configuration class StartParams(TypedDict): @@ -43,7 +41,6 @@ class StartParams(TypedDict): def __init__(self, **kwargs: Unpack[Inputs]): self._operation = kwargs["operation"] self._wire = kwargs["wire_factory"] - self._service_name = kwargs.get("service_name", "app") codec_factory = kwargs["codec_factory"] # Endpoint sets its own defaults - empty dict if not provided self._endpoint_params = kwargs.get("endpoint_params", {}) diff --git a/src/asyncapi_python/kernel/endpoint/rpc_client.py b/src/asyncapi_python/kernel/endpoint/rpc_client.py index 76c13d1..49f302c 100644 --- a/src/asyncapi_python/kernel/endpoint/rpc_client.py +++ b/src/asyncapi_python/kernel/endpoint/rpc_client.py @@ -45,16 +45,19 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None: # Ensure global reply handling is set up (only happens once) await global_reply_handler.ensure_reply_handler( - self._wire, self._operation, self._service_name + self._wire, self._operation, self._endpoint_params ) + # Extract service_name from endpoint_params for app_id + service_name = self._endpoint_params.get("service_name", "app") + # Create instance-specific producer for sending requests self._producer = await self._wire.create_producer( channel=self._operation.channel, parameters={}, op_bindings=self._operation.bindings, is_reply=False, - app_id=self._service_name, + app_id=service_name, ) # Start producer diff --git a/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py b/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py index 7d0eca8..5067a90 100644 --- a/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py +++ b/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py @@ -5,7 +5,7 @@ from typing import Any from asyncapi_python.kernel.document import Channel, Operation -from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer +from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer, EndpointParams from ..typing import IncomingMessage @@ -29,16 +29,19 @@ async def ensure_reply_handler( self, wire_factory: AbstractWireFactory[Any, Any], operation: Operation, - service_name: str = "app", + endpoint_params: EndpointParams, ) -> None: """Ensure reply consumer and task are running Args: wire_factory: Wire factory for creating consumer operation: Operation definition - service_name: Service name for generating consistent app_id + endpoint_params: Endpoint parameters including service_name """ if self._reply_consumer is None: + # Extract service_name from endpoint_params + service_name = endpoint_params.get("service_name", "app") + # Generate app_id with service name + random hex (same format as AmqpWire) random_hex = secrets.token_hex(4) # 4 bytes = 8 hex chars app_id = f"{service_name}-{random_hex}" diff --git a/tests/integration/scenarios/fan_in_logging.py b/tests/integration/scenarios/fan_in_logging.py index e6336a5..1079c52 100644 --- a/tests/integration/scenarios/fan_in_logging.py +++ b/tests/integration/scenarios/fan_in_logging.py @@ -27,11 +27,14 @@ def __init__( wire_factory: AbstractWireFactory, codec_factory: CodecFactory, ): + # Pass service_name via endpoint_params + endpoint_params = {"service_name": service_name} super().__init__( wire_factory=wire_factory, codec_factory=codec_factory, - service_name=service_name, + endpoint_params=endpoint_params, ) + self.service_name = service_name # Store for use in _setup_endpoints self._setup_endpoints() def _setup_endpoints(self): diff --git a/tests/integration/scenarios/fan_out_broadcasting.py b/tests/integration/scenarios/fan_out_broadcasting.py index 16404ee..d42389d 100644 --- a/tests/integration/scenarios/fan_out_broadcasting.py +++ b/tests/integration/scenarios/fan_out_broadcasting.py @@ -108,11 +108,14 @@ def __init__( wire_factory: AbstractWireFactory, codec_factory: CodecFactory, ): + # Pass service_name via endpoint_params + endpoint_params = {"service_name": service_name} super().__init__( wire_factory=wire_factory, codec_factory=codec_factory, - service_name=service_name, + endpoint_params=endpoint_params, ) + self.service_name = service_name # Store for use in _setup_endpoints self._setup_endpoints() def _setup_endpoints(self): From 5ee05ec9b811918703c5d6f58470ee8c5c914fd7 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Tue, 21 Oct 2025 11:33:14 +0000 Subject: [PATCH 08/15] Add default timeout --- src/asyncapi_python/kernel/endpoint/abc.py | 3 ++ .../kernel/endpoint/rpc_client.py | 30 ++++++++++++++----- .../kernel/endpoint/rpc_reply_handler.py | 3 +- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/asyncapi_python/kernel/endpoint/abc.py b/src/asyncapi_python/kernel/endpoint/abc.py index fce37da..60d5581 100644 --- a/src/asyncapi_python/kernel/endpoint/abc.py +++ b/src/asyncapi_python/kernel/endpoint/abc.py @@ -14,6 +14,9 @@ class EndpointParams(TypedDict, total=False): """Optional parameters for endpoint configuration""" service_name: str # Service name for generating app_id + default_rpc_timeout: ( + float | None + ) # Default timeout in seconds for RPC client requests (default: 180.0), or None to disable disable_handler_validation: bool # Opt-out of handler enforcement for testing diff --git a/src/asyncapi_python/kernel/endpoint/rpc_client.py b/src/asyncapi_python/kernel/endpoint/rpc_client.py index 49f302c..a87fb5b 100644 --- a/src/asyncapi_python/kernel/endpoint/rpc_client.py +++ b/src/asyncapi_python/kernel/endpoint/rpc_client.py @@ -2,7 +2,7 @@ from typing import Generic from uuid import uuid4 -from typing_extensions import Unpack +from typing_extensions import NotRequired, Unpack from asyncapi_python.kernel.wire import Producer @@ -21,6 +21,13 @@ class RpcClient(AbstractEndpoint, Send[T_Input, T_Output], Generic[T_Input, T_Ou a single reply consumer and background task for efficiency. """ + class RouterInputs(Send.RouterInputs): + """Router inputs for RPC client, extending Send.RouterInputs with timeout""" + + timeout: NotRequired[ + float | None + ] # Timeout in seconds for this RPC request, or None to disable timeout + def __init__(self, **kwargs: Unpack[AbstractEndpoint.Inputs]): super().__init__(**kwargs) # Instance-specific state @@ -76,18 +83,17 @@ async def stop(self) -> None: if remaining_count == 0: await global_reply_handler.cleanup_if_last_instance() - async def __call__( - self, - payload: T_Input, - /, - timeout: float = 30.0, - **kwargs: Unpack[Send.RouterInputs], + async def __call__( # type: ignore[override] + self, payload: T_Input, /, **kwargs: Unpack[RouterInputs] ) -> T_Output: """Send an RPC request and wait for response using global reply handling Args: payload: The request payload to send - timeout: Maximum time to wait for response (default 30 seconds) + **kwargs: Router inputs including optional timeout: + - Not provided: uses default_rpc_timeout from endpoint_params (default: 180.0) + - float: uses the specified timeout in seconds + - None: disables timeout (waits indefinitely) Returns: The response payload @@ -99,6 +105,14 @@ async def __call__( if not self._producer: raise UninitializedError() + # Determine timeout: use provided value, or fall back to endpoint_params default + if "timeout" in kwargs: + # Explicitly provided (could be float or None) + timeout = kwargs["timeout"] + else: + # Not provided, use default from endpoint_params + timeout = self._endpoint_params.get("default_rpc_timeout", 180.0) + # Generate correlation ID for this request correlation_id: str = str(uuid4()) diff --git a/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py b/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py index 5067a90..2eb7613 100644 --- a/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py +++ b/src/asyncapi_python/kernel/endpoint/rpc_reply_handler.py @@ -5,9 +5,10 @@ from typing import Any from asyncapi_python.kernel.document import Channel, Operation -from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer, EndpointParams +from asyncapi_python.kernel.wire import AbstractWireFactory, Consumer from ..typing import IncomingMessage +from .abc import EndpointParams class GlobalRpcReplyHandler: From 28c9034420d7fb49ba83f64b22ff7651ce77d7c2 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Tue, 21 Oct 2025 12:22:31 +0000 Subject: [PATCH 09/15] Update version --- pyproject.toml | 2 +- uv.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4cc7d3c..03b6c03 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "asyncapi-python" -version = "0.3.0rc2" +version = "0.3.0rc3" license = { text = "Apache-2.0" } description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications." authors = [{ name = "Yaroslav Petrov", email = "yaroslav.v.petrov@gmail.com" }] diff --git a/uv.lock b/uv.lock index 7527af8..d4630f9 100644 --- a/uv.lock +++ b/uv.lock @@ -64,7 +64,7 @@ wheels = [ [[package]] name = "asyncapi-python" -version = "0.3.0rc2" +version = "0.3.0rc3" source = { editable = "." } dependencies = [ { name = "pydantic" }, From 3b8bd412091f289f7562bf79d0b720518ebc6bec Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Tue, 21 Oct 2025 13:14:07 +0000 Subject: [PATCH 10/15] Add endpoint_params to application --- pyproject.toml | 2 +- .../templates/application.py.j2 | 48 +++++++++++++------ uv.lock | 2 +- 3 files changed, 35 insertions(+), 17 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 03b6c03..5cd8a49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "asyncapi-python" -version = "0.3.0rc3" +version = "0.3.0rc4" license = { text = "Apache-2.0" } description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications." authors = [{ name = "Yaroslav Petrov", email = "yaroslav.v.petrov@gmail.com" }] diff --git a/src/asyncapi_python_codegen/templates/application.py.j2 b/src/asyncapi_python_codegen/templates/application.py.j2 index bc9ef38..9b95886 100644 --- a/src/asyncapi_python_codegen/templates/application.py.j2 +++ b/src/asyncapi_python_codegen/templates/application.py.j2 @@ -1,4 +1,5 @@ """Generated AsyncAPI application.""" + from __future__ import annotations from typing import Any @@ -7,6 +8,7 @@ from asyncapi_python.kernel.wire import AbstractWireFactory from asyncapi_python.kernel.codec import CodecFactory from asyncapi_python.contrib.codec.registry import CodecRegistry from asyncapi_python.kernel.endpoint import AbstractEndpoint +from asyncapi_python.kernel.endpoint.abc import EndpointParams from .router import ProducerRouter, ConsumerRouter import sys @@ -14,45 +16,61 @@ import sys class Application(BaseApplication): """{{ app_title }} - {{ app_description }} - + AsyncAPI Version: {{ asyncapi_version }} Application Version: {{ app_version }} """ - - def __init__(self, wire_factory: AbstractWireFactory[Any, Any]): + + def __init__( + self, + wire_factory: AbstractWireFactory[Any, Any], + *, + endpoint_params: EndpointParams | None = None, + ): """Initialize the AsyncAPI application. - + Args: wire_factory: Wire protocol factory for message transport + endpoint_params: Optional endpoint configuration (service_name, default_rpc_timeout, etc.) """ # Use CodecRegistry with current module for message serialization - current_module = sys.modules[self.__module__.rsplit('.', 1)[0]] + current_module = sys.modules[self.__module__.rsplit(".", 1)[0]] codec_factory = CodecRegistry(current_module) - - super().__init__(wire_factory=wire_factory, codec_factory=codec_factory) - + + # Pass endpoint_params to BaseApplication if provided + if endpoint_params is not None: + super().__init__( + wire_factory=wire_factory, + codec_factory=codec_factory, + endpoint_params=endpoint_params, + ) + else: + super().__init__(wire_factory=wire_factory, codec_factory=codec_factory) + # Initialize semantic routers with factories self.producer = ProducerRouter(wire_factory, codec_factory) self.consumer = ConsumerRouter(wire_factory, codec_factory) - + # Register all endpoints from routers self._register_router_endpoints(self.producer) self._register_router_endpoints(self.consumer) - + def _register_router_endpoints(self, router: object) -> None: """Recursively register all endpoints from router tree. - + Args: router: Router object to scan for endpoints """ if isinstance(router, AbstractEndpoint): # This router is an endpoint - register it directly self._add_endpoint(router) - elif hasattr(router, '__dict__'): + elif hasattr(router, "__dict__"): # This router aggregates others - recurse through attributes for attr_name in dir(router): - if not attr_name.startswith('_'): + if not attr_name.startswith("_"): attr = getattr(router, attr_name, None) # Check if it's a router-like object (has __dict__ or is an endpoint) - if attr is not None and (isinstance(attr, AbstractEndpoint) or hasattr(attr, '__dict__')): - self._register_router_endpoints(attr) \ No newline at end of file + if attr is not None and ( + isinstance(attr, AbstractEndpoint) or hasattr(attr, "__dict__") + ): + self._register_router_endpoints(attr) diff --git a/uv.lock b/uv.lock index d4630f9..de4e48d 100644 --- a/uv.lock +++ b/uv.lock @@ -64,7 +64,7 @@ wheels = [ [[package]] name = "asyncapi-python" -version = "0.3.0rc3" +version = "0.3.0rc4" source = { editable = "." } dependencies = [ { name = "pydantic" }, From 64dbadf3fa41b2ce13f08f217334d831c2237e47 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Wed, 22 Oct 2025 11:06:59 +0000 Subject: [PATCH 11/15] Add test for recursive spec --- .../codegen/specs/deep_recursion/level1.yaml | 24 ++++++++++++ .../codegen/specs/deep_recursion/level2.yaml | 22 +++++++++++ .../codegen/specs/deep_recursion/level3.yaml | 28 ++++++++++++++ .../codegen/specs/deep_recursion/level4.yaml | 21 +++++++++++ tests/codegen/test_parser.py | 37 +++++++++++++++++++ 5 files changed, 132 insertions(+) create mode 100644 tests/codegen/specs/deep_recursion/level1.yaml create mode 100644 tests/codegen/specs/deep_recursion/level2.yaml create mode 100644 tests/codegen/specs/deep_recursion/level3.yaml create mode 100644 tests/codegen/specs/deep_recursion/level4.yaml diff --git a/tests/codegen/specs/deep_recursion/level1.yaml b/tests/codegen/specs/deep_recursion/level1.yaml new file mode 100644 index 0000000..15fc844 --- /dev/null +++ b/tests/codegen/specs/deep_recursion/level1.yaml @@ -0,0 +1,24 @@ +# Level 1: Main entry point +asyncapi: "3.0.0" +info: + title: Deep Recursion Test - Level 1 + version: 1.0.0 + description: Main file that starts the 4-level reference chain + +operations: + process.data: + action: send + channel: + $ref: "level2.yaml#/channels/data_channel" + +components: + schemas: + Level1Schema: + type: object + properties: + level: + type: integer + const: 1 + message: + type: string + const: "from_level_1" diff --git a/tests/codegen/specs/deep_recursion/level2.yaml b/tests/codegen/specs/deep_recursion/level2.yaml new file mode 100644 index 0000000..b5a0415 --- /dev/null +++ b/tests/codegen/specs/deep_recursion/level2.yaml @@ -0,0 +1,22 @@ +# Level 2: References Level 3 +channels: + data_channel: + address: data.queue + title: Data Channel from Level 2 + messages: + data_message: + $ref: "level3.yaml#/components/messages/DataMessage" + +components: + schemas: + Level2Schema: + type: object + properties: + level: + type: integer + const: 2 + message: + type: string + const: "from_level_2" + level1_ref: + $ref: "level1.yaml#/components/schemas/Level1Schema" diff --git a/tests/codegen/specs/deep_recursion/level3.yaml b/tests/codegen/specs/deep_recursion/level3.yaml new file mode 100644 index 0000000..49135fb --- /dev/null +++ b/tests/codegen/specs/deep_recursion/level3.yaml @@ -0,0 +1,28 @@ +# Level 3: References Level 4 +components: + messages: + DataMessage: + title: Data Message from Level 3 + payload: + type: object + properties: + id: + type: string + level3_data: + type: string + const: "from_level_3" + deep_schema: + $ref: "level4.yaml#/components/schemas/Level4Schema" + + schemas: + Level3Schema: + type: object + properties: + level: + type: integer + const: 3 + message: + type: string + const: "from_level_3" + level2_ref: + $ref: "level2.yaml#/components/schemas/Level2Schema" diff --git a/tests/codegen/specs/deep_recursion/level4.yaml b/tests/codegen/specs/deep_recursion/level4.yaml new file mode 100644 index 0000000..7c79fb4 --- /dev/null +++ b/tests/codegen/specs/deep_recursion/level4.yaml @@ -0,0 +1,21 @@ +# Level 4: Deepest level - no more references +components: + schemas: + Level4Schema: + type: object + properties: + level: + type: integer + const: 4 + message: + type: string + const: "from_level_4_deepest" + metadata: + type: object + properties: + depth: + type: integer + const: 4 + status: + type: string + enum: ["deep", "deeper", "deepest"] diff --git a/tests/codegen/test_parser.py b/tests/codegen/test_parser.py index 6651253..2473e80 100644 --- a/tests/codegen/test_parser.py +++ b/tests/codegen/test_parser.py @@ -264,3 +264,40 @@ def test_invalid_yaml_structure(): extract_all_operations(invalid_yaml) finally: invalid_yaml.unlink(missing_ok=True) + + +def test_four_level_deep_recursion(): + """Test 4-level deep file reference chain: Level1->Level2->Level3->Level4.""" + spec_path = Path("tests/codegen/specs/deep_recursion/level1.yaml") + operations = extract_all_operations(spec_path) + + assert len(operations) == 1 + + # Verify Level 1 -> Level 2 reference + process_data = operations["process.data"] + assert process_data.channel.address == "data.queue" + assert process_data.channel.title == "Data Channel from Level 2" + + # Verify Level 2 -> Level 3 reference (message) + data_message = process_data.channel.messages["data_message"] + assert data_message.title == "Data Message from Level 3" + assert isinstance(data_message.payload, dict) + + # Verify Level 3 -> Level 4 reference (deep schema in payload) + payload = data_message.payload + assert "id" in payload["properties"] + assert "level3_data" in payload["properties"] + assert payload["properties"]["level3_data"]["const"] == "from_level_3" + assert "deep_schema" in payload["properties"] + + # Verify Level 4 schema was resolved (deepest level) + deep_schema_ref = payload["properties"]["deep_schema"] + # After resolution, the $ref should point to unified schema + # Or if already resolved, check the schema structure + if "$ref" in deep_schema_ref: + # Reference should be normalized + assert "#/$defs/" in deep_schema_ref["$ref"] or "level4.yaml" in deep_schema_ref["$ref"] + else: + # If already resolved inline, verify structure + assert deep_schema_ref["properties"]["level"]["const"] == 4 + assert deep_schema_ref["properties"]["message"]["const"] == "from_level_4_deepest" From aecf226c10a99e8246147931a6ec17bbd09d70f0 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Wed, 22 Oct 2025 11:18:40 +0000 Subject: [PATCH 12/15] Fix recursion capability for schema parsing --- .../generators/messages.py | 131 ++++++++++++++---- tests/codegen/test_parser.py | 53 ++++--- 2 files changed, 128 insertions(+), 56 deletions(-) diff --git a/src/asyncapi_python_codegen/generators/messages.py b/src/asyncapi_python_codegen/generators/messages.py index 282f298..0e48f8c 100644 --- a/src/asyncapi_python_codegen/generators/messages.py +++ b/src/asyncapi_python_codegen/generators/messages.py @@ -10,6 +10,7 @@ from datamodel_code_generator.__main__ import main as datamodel_codegen from asyncapi_python.kernel.document import Operation +from asyncapi_python_codegen.parser.types import ParseContext, navigate_json_pointer class MessageGenerator: @@ -67,35 +68,100 @@ def _collect_message_schemas( return schemas # type: ignore[return-value] def _load_component_schemas(self, spec_path: Path) -> dict[str, Any]: - """Load component schemas from the AsyncAPI specification file.""" - try: - with spec_path.open("r") as f: - spec = yaml.safe_load(f) + """Load component schemas from the AsyncAPI specification file and all referenced files.""" + all_schemas: dict[str, Any] = {} + visited_files: set[Path] = set() - components = spec.get("components", {}) - schemas = components.get("schemas", {}) - messages = components.get("messages", {}) + def load_schemas_from_file(file_path: Path) -> None: + """Recursively load schemas from a file and its references.""" + abs_path = file_path.absolute() - # Combine schemas and message payloads - all_schemas = {} + # Avoid infinite loops + if abs_path in visited_files: + return + visited_files.add(abs_path) - # Add component schemas directly - for schema_name, schema_def in schemas.items(): - all_schemas[schema_name] = schema_def + try: + with abs_path.open("r") as f: + spec = yaml.safe_load(f) - # Add message payloads from components (only if not already present from schemas) - for msg_name, msg_def in messages.items(): - if isinstance(msg_def, dict) and "payload" in msg_def: - schema_name = self._to_pascal_case(msg_name) - # Only add if we don't already have this schema from the schemas section + components = spec.get("components", {}) + schemas = components.get("schemas", {}) + messages = components.get("messages", {}) + + # Add component schemas directly + for schema_name, schema_def in schemas.items(): if schema_name not in all_schemas: - all_schemas[schema_name] = msg_def["payload"] + # Check if this schema is itself a reference + if isinstance(schema_def, dict) and "$ref" in schema_def: + ref = schema_def["$ref"] + # Resolve the reference using ParseContext utilities + try: + context = ParseContext(abs_path) + target_context = context.resolve_reference(ref) + + # Load and navigate to the referenced schema + with target_context.filepath.open("r") as ref_file: + ref_spec = yaml.safe_load(ref_file) + + if target_context.json_pointer: + resolved_schema = navigate_json_pointer(ref_spec, target_context.json_pointer) + else: + resolved_schema = ref_spec + + all_schemas[schema_name] = resolved_schema + except Exception as e: + print(f"Warning: Could not resolve reference {ref} in {abs_path}: {e}") + all_schemas[schema_name] = schema_def + else: + all_schemas[schema_name] = schema_def + + # Add message payloads from components + for msg_name, msg_def in messages.items(): + if isinstance(msg_def, dict) and "payload" in msg_def: + schema_name = self._to_pascal_case(msg_name) + if schema_name not in all_schemas: + all_schemas[schema_name] = msg_def["payload"] + + # Find and process all external file references + self._find_and_process_refs(spec, abs_path.parent, load_schemas_from_file) + + except Exception as e: + print(f"Warning: Could not load component schemas from {abs_path}: {e}") + + # Start loading from the main spec file + load_schemas_from_file(spec_path) + + return all_schemas # type: ignore[return-value] + + def _find_and_process_refs( + self, data: Any, base_dir: Path, process_file: Any + ) -> None: + """Recursively find all $ref entries pointing to external files.""" + if isinstance(data, dict): + # Check if this is a reference + if "$ref" in data: + ref = data["$ref"] + if isinstance(ref, str) and not ref.startswith("#"): + # External reference - extract file path + if "#" in ref: + file_part = ref.split("#")[0] + else: + file_part = ref + + if file_part: + # Resolve relative path + ref_path = (base_dir / file_part).resolve() + process_file(ref_path) - return all_schemas # type: ignore[return-value] + # Recurse into all dict values + for value in data.values(): + self._find_and_process_refs(value, base_dir, process_file) - except Exception as e: - print(f"Warning: Could not load component schemas from {spec_path}: {e}") - return {} + elif isinstance(data, list): + # Recurse into all list items + for item in data: + self._find_and_process_refs(item, base_dir, process_file) def _resolve_references(self, schemas: dict[str, Any]) -> dict[str, Any]: """Recursively resolve $ref references to use #/$defs/... instead of #/components/schemas/...""" @@ -105,17 +171,24 @@ def resolve_in_object(obj: Any) -> Any: resolved_obj: dict[str, Any] = {} for key, value in obj.items(): # type: ignore[misc] if key == "$ref" and isinstance(value, str): - # Transform references from #/components/schemas/... to #/$defs/... - if value.startswith("#/components/schemas/"): - schema_name = value.split("/")[-1] + # Extract schema name from the reference + schema_name = value.split("/")[-1] + + # Transform all component references to #/$defs/... + if "#/components/schemas/" in value: + # Internal or external schema reference resolved_obj[key] = f"#/$defs/{schema_name}" - elif value.startswith("#/components/messages/"): + elif "#/components/messages/" in value: # Handle message references - convert message name to PascalCase - msg_name = value.split("/")[-1] - schema_name = self._to_pascal_case(msg_name) + schema_name = self._to_pascal_case(schema_name) resolved_obj[key] = f"#/$defs/{schema_name}" - else: + elif value.startswith("#"): + # Other internal references, keep as-is resolved_obj[key] = value + else: + # External file reference (e.g., "./commons2.yaml#/components/schemas/Foo") + # Extract just the schema name and point to #/$defs + resolved_obj[key] = f"#/$defs/{schema_name}" else: resolved_obj[key] = resolve_in_object(value) return resolved_obj diff --git a/tests/codegen/test_parser.py b/tests/codegen/test_parser.py index 2473e80..3381e49 100644 --- a/tests/codegen/test_parser.py +++ b/tests/codegen/test_parser.py @@ -267,37 +267,36 @@ def test_invalid_yaml_structure(): def test_four_level_deep_recursion(): - """Test 4-level deep file reference chain: Level1->Level2->Level3->Level4.""" + """Test 4-level deep file reference chain: Level1->Level2->Level3->Level4. + + This test verifies that the MessageGenerator recursively collects component schemas + from all referenced files, not just the main spec file. + """ + from src.asyncapi_python_codegen.generators.messages import MessageGenerator + spec_path = Path("tests/codegen/specs/deep_recursion/level1.yaml") - operations = extract_all_operations(spec_path) + # Test that MessageGenerator collects schemas from all 4 levels + generator = MessageGenerator() + schemas = generator._load_component_schemas(spec_path) + + # Without recursive file loading, we would only get Level1Schema + # With recursive loading, we should get schemas from all 4 files + assert "Level1Schema" in schemas, "Level1Schema from main file not found" + assert "Level2Schema" in schemas, "Level2Schema from level2.yaml not found (recursive loading failed)" + assert "Level3Schema" in schemas, "Level3Schema from level3.yaml not found (recursive loading failed)" + assert "Level4Schema" in schemas, "Level4Schema from level4.yaml not found (recursive loading failed)" + assert "DataMessage" in schemas, "DataMessage from level3.yaml not found" + + # Verify the deepest level schema has correct structure + level4_schema = schemas["Level4Schema"] + assert level4_schema["properties"]["level"]["const"] == 4 + assert level4_schema["properties"]["message"]["const"] == "from_level_4_deepest" + + # Also verify operations can be extracted (tests parser, not generator) + operations = extract_all_operations(spec_path) assert len(operations) == 1 - # Verify Level 1 -> Level 2 reference process_data = operations["process.data"] assert process_data.channel.address == "data.queue" assert process_data.channel.title == "Data Channel from Level 2" - - # Verify Level 2 -> Level 3 reference (message) - data_message = process_data.channel.messages["data_message"] - assert data_message.title == "Data Message from Level 3" - assert isinstance(data_message.payload, dict) - - # Verify Level 3 -> Level 4 reference (deep schema in payload) - payload = data_message.payload - assert "id" in payload["properties"] - assert "level3_data" in payload["properties"] - assert payload["properties"]["level3_data"]["const"] == "from_level_3" - assert "deep_schema" in payload["properties"] - - # Verify Level 4 schema was resolved (deepest level) - deep_schema_ref = payload["properties"]["deep_schema"] - # After resolution, the $ref should point to unified schema - # Or if already resolved, check the schema structure - if "$ref" in deep_schema_ref: - # Reference should be normalized - assert "#/$defs/" in deep_schema_ref["$ref"] or "level4.yaml" in deep_schema_ref["$ref"] - else: - # If already resolved inline, verify structure - assert deep_schema_ref["properties"]["level"]["const"] == 4 - assert deep_schema_ref["properties"]["message"]["const"] == "from_level_4_deepest" From 25b616704aeaf3fcd9210dcff590a3c19e7d422f Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Wed, 22 Oct 2025 11:20:57 +0000 Subject: [PATCH 13/15] Increment rc version --- pyproject.toml | 2 +- uv.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5cd8a49..45aad4c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "asyncapi-python" -version = "0.3.0rc4" +version = "0.3.0rc5" license = { text = "Apache-2.0" } description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications." authors = [{ name = "Yaroslav Petrov", email = "yaroslav.v.petrov@gmail.com" }] diff --git a/uv.lock b/uv.lock index de4e48d..bc468fa 100644 --- a/uv.lock +++ b/uv.lock @@ -64,7 +64,7 @@ wheels = [ [[package]] name = "asyncapi-python" -version = "0.3.0rc4" +version = "0.3.0rc5" source = { editable = "." } dependencies = [ { name = "pydantic" }, From e1328b6343342010937e57a026de0906850a851f Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Wed, 22 Oct 2025 11:28:11 +0000 Subject: [PATCH 14/15] Run lints --- .../generators/messages.py | 62 +++++++++++-------- tests/codegen/test_parser.py | 12 +++- 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/src/asyncapi_python_codegen/generators/messages.py b/src/asyncapi_python_codegen/generators/messages.py index 0e48f8c..efd684a 100644 --- a/src/asyncapi_python_codegen/generators/messages.py +++ b/src/asyncapi_python_codegen/generators/messages.py @@ -94,25 +94,32 @@ def load_schemas_from_file(file_path: Path) -> None: if schema_name not in all_schemas: # Check if this schema is itself a reference if isinstance(schema_def, dict) and "$ref" in schema_def: - ref = schema_def["$ref"] + ref_value: Any = schema_def["$ref"] # type: ignore[misc] # Resolve the reference using ParseContext utilities - try: - context = ParseContext(abs_path) - target_context = context.resolve_reference(ref) - - # Load and navigate to the referenced schema - with target_context.filepath.open("r") as ref_file: - ref_spec = yaml.safe_load(ref_file) - - if target_context.json_pointer: - resolved_schema = navigate_json_pointer(ref_spec, target_context.json_pointer) - else: - resolved_schema = ref_spec - - all_schemas[schema_name] = resolved_schema - except Exception as e: - print(f"Warning: Could not resolve reference {ref} in {abs_path}: {e}") - all_schemas[schema_name] = schema_def + if isinstance(ref_value, str): + try: + context = ParseContext(abs_path) + target_context = context.resolve_reference( + ref_value + ) + + # Load and navigate to the referenced schema + with target_context.filepath.open("r") as ref_file: + ref_spec = yaml.safe_load(ref_file) + + if target_context.json_pointer: + resolved_schema = navigate_json_pointer( + ref_spec, target_context.json_pointer + ) + else: + resolved_schema = ref_spec + + all_schemas[schema_name] = resolved_schema + except Exception as e: + print( + f"Warning: Could not resolve reference {ref_value} in {abs_path}: {e}" + ) + all_schemas[schema_name] = schema_def else: all_schemas[schema_name] = schema_def @@ -124,7 +131,9 @@ def load_schemas_from_file(file_path: Path) -> None: all_schemas[schema_name] = msg_def["payload"] # Find and process all external file references - self._find_and_process_refs(spec, abs_path.parent, load_schemas_from_file) + self._find_and_process_refs( + spec, abs_path.parent, load_schemas_from_file + ) except Exception as e: print(f"Warning: Could not load component schemas from {abs_path}: {e}") @@ -141,13 +150,14 @@ def _find_and_process_refs( if isinstance(data, dict): # Check if this is a reference if "$ref" in data: - ref = data["$ref"] - if isinstance(ref, str) and not ref.startswith("#"): + ref_value: Any = data["$ref"] # type: ignore[misc] + if isinstance(ref_value, str) and not ref_value.startswith("#"): # External reference - extract file path - if "#" in ref: - file_part = ref.split("#")[0] + file_part: str + if "#" in ref_value: + file_part = ref_value.split("#")[0] else: - file_part = ref + file_part = ref_value if file_part: # Resolve relative path @@ -155,12 +165,12 @@ def _find_and_process_refs( process_file(ref_path) # Recurse into all dict values - for value in data.values(): + for value in data.values(): # type: ignore[misc] self._find_and_process_refs(value, base_dir, process_file) elif isinstance(data, list): # Recurse into all list items - for item in data: + for item in data: # type: ignore[misc] self._find_and_process_refs(item, base_dir, process_file) def _resolve_references(self, schemas: dict[str, Any]) -> dict[str, Any]: diff --git a/tests/codegen/test_parser.py b/tests/codegen/test_parser.py index 3381e49..c9165e9 100644 --- a/tests/codegen/test_parser.py +++ b/tests/codegen/test_parser.py @@ -283,9 +283,15 @@ def test_four_level_deep_recursion(): # Without recursive file loading, we would only get Level1Schema # With recursive loading, we should get schemas from all 4 files assert "Level1Schema" in schemas, "Level1Schema from main file not found" - assert "Level2Schema" in schemas, "Level2Schema from level2.yaml not found (recursive loading failed)" - assert "Level3Schema" in schemas, "Level3Schema from level3.yaml not found (recursive loading failed)" - assert "Level4Schema" in schemas, "Level4Schema from level4.yaml not found (recursive loading failed)" + assert ( + "Level2Schema" in schemas + ), "Level2Schema from level2.yaml not found (recursive loading failed)" + assert ( + "Level3Schema" in schemas + ), "Level3Schema from level3.yaml not found (recursive loading failed)" + assert ( + "Level4Schema" in schemas + ), "Level4Schema from level4.yaml not found (recursive loading failed)" assert "DataMessage" in schemas, "DataMessage from level3.yaml not found" # Verify the deepest level schema has correct structure From eb849945226f1b67ed115b9a14f96033d3ac805e Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Wed, 22 Oct 2025 12:36:36 +0000 Subject: [PATCH 15/15] Add __init__ for generated messages module --- src/asyncapi_python_codegen/generators/main.py | 7 +++++++ src/asyncapi_python_codegen/templates/messages_init.py.j2 | 1 + 2 files changed, 8 insertions(+) create mode 100644 src/asyncapi_python_codegen/templates/messages_init.py.j2 diff --git a/src/asyncapi_python_codegen/generators/main.py b/src/asyncapi_python_codegen/generators/main.py index 861d3d0..3e1c580 100644 --- a/src/asyncapi_python_codegen/generators/main.py +++ b/src/asyncapi_python_codegen/generators/main.py @@ -105,6 +105,13 @@ def generate(self, spec_path: Path, output_dir: Path, force: bool = False) -> No "application.py.j2", output_dir / "application.py", context ) + # Generate messages/__init__.py for module structure + messages_dir = output_dir / "messages" + messages_dir.mkdir(parents=True, exist_ok=True) + self.template_renderer.render_file( + "messages_init.py.j2", messages_dir / "__init__.py", context + ) + # Generate messages/json/__init__.py using datamodel-code-generator messages_json_dir = output_dir / "messages" / "json" messages_json_dir.mkdir(parents=True, exist_ok=True) diff --git a/src/asyncapi_python_codegen/templates/messages_init.py.j2 b/src/asyncapi_python_codegen/templates/messages_init.py.j2 new file mode 100644 index 0000000..9376955 --- /dev/null +++ b/src/asyncapi_python_codegen/templates/messages_init.py.j2 @@ -0,0 +1 @@ +"""Message models for {{ app_title }}."""