diff --git a/src/asyncapi_python/contrib/wire/amqp/config.py b/src/asyncapi_python/contrib/wire/amqp/config.py index e1ab25b..3a62f78 100644 --- a/src/asyncapi_python/contrib/wire/amqp/config.py +++ b/src/asyncapi_python/contrib/wire/amqp/config.py @@ -25,6 +25,7 @@ class AmqpConfig: binding_type: AmqpBindingType = AmqpBindingType.QUEUE queue_properties: dict[str, Any] = field(default_factory=lambda: {}) binding_arguments: dict[str, Any] = field(default_factory=lambda: {}) + arguments: dict[str, Any] = field(default_factory=lambda: {}) def to_producer_args(self) -> dict[str, Any]: """Convert to AmqpProducer constructor arguments""" @@ -34,6 +35,7 @@ def to_producer_args(self) -> dict[str, Any]: "exchange_type": self.exchange_type, "routing_key": self.routing_key, "queue_properties": self.queue_properties, + "arguments": self.arguments, } def to_consumer_args(self) -> dict[str, Any]: @@ -46,4 +48,5 @@ def to_consumer_args(self) -> dict[str, Any]: "binding_type": self.binding_type, "queue_properties": self.queue_properties, "binding_arguments": self.binding_arguments, + "arguments": self.arguments, } diff --git a/src/asyncapi_python/contrib/wire/amqp/consumer.py b/src/asyncapi_python/contrib/wire/amqp/consumer.py index 1530644..3569473 100644 --- a/src/asyncapi_python/contrib/wire/amqp/consumer.py +++ b/src/asyncapi_python/contrib/wire/amqp/consumer.py @@ -36,6 +36,7 @@ def __init__( binding_type: AmqpBindingType = AmqpBindingType.QUEUE, queue_properties: dict[str, Any] | None = None, binding_arguments: dict[str, Any] | None = None, + arguments: dict[str, Any] | None = None, ): self._connection = connection self._queue_name = queue_name @@ -45,6 +46,7 @@ def __init__( self._binding_type = binding_type self._queue_properties = queue_properties or {} self._binding_arguments = binding_arguments or {} + self._arguments = arguments or {} self._channel: AbstractChannel | None = None self._queue: AbstractQueue | None = None self._exchange: AbstractExchange | None = None @@ -67,6 +69,7 @@ async def start(self) -> None: durable=self._queue_properties.get("durable", True), exclusive=self._queue_properties.get("exclusive", False), auto_delete=self._queue_properties.get("auto_delete", False), + arguments=self._arguments, ) # Simple queue binding pattern (default exchange) @@ -76,6 +79,7 @@ async def start(self) -> None: durable=self._queue_properties.get("durable", True), exclusive=self._queue_properties.get("exclusive", False), auto_delete=self._queue_properties.get("auto_delete", False), + arguments=self._arguments, ) # Routing key binding pattern (pub/sub with named exchange) @@ -87,24 +91,28 @@ async def start(self) -> None: name=self._exchange_name, type=ExchangeType.DIRECT, durable=True, + arguments=self._arguments, ) case "topic": self._exchange = await self._channel.declare_exchange( name=self._exchange_name, type=ExchangeType.TOPIC, durable=True, + arguments=self._arguments, ) case "fanout": self._exchange = await self._channel.declare_exchange( name=self._exchange_name, type=ExchangeType.FANOUT, durable=True, + arguments=self._arguments, ) case "headers": self._exchange = await self._channel.declare_exchange( name=self._exchange_name, type=ExchangeType.HEADERS, durable=True, + arguments=self._arguments, ) case unknown_type: raise ValueError(f"Unsupported exchange type: {unknown_type}") @@ -115,6 +123,7 @@ async def start(self) -> None: durable=self._queue_properties.get("durable", False), exclusive=self._queue_properties.get("exclusive", True), auto_delete=self._queue_properties.get("auto_delete", True), + arguments=self._arguments, ) # Bind queue to exchange with routing key @@ -129,24 +138,28 @@ async def start(self) -> None: name=self._exchange_name, type=ExchangeType.FANOUT, durable=True, + arguments=self._arguments, ) case "headers": self._exchange = await self._channel.declare_exchange( name=self._exchange_name, type=ExchangeType.HEADERS, durable=True, + arguments=self._arguments, ) case "topic": self._exchange = await self._channel.declare_exchange( name=self._exchange_name, type=ExchangeType.TOPIC, durable=True, + arguments=self._arguments, ) case "direct": self._exchange = await self._channel.declare_exchange( name=self._exchange_name, type=ExchangeType.DIRECT, durable=True, + arguments=self._arguments, ) case unknown_type: raise ValueError(f"Unsupported exchange type: {unknown_type}") @@ -157,6 +170,7 @@ async def start(self) -> None: durable=self._queue_properties.get("durable", False), exclusive=self._queue_properties.get("exclusive", True), auto_delete=self._queue_properties.get("auto_delete", True), + arguments=self._arguments, ) # Bind queue to exchange with binding arguments (for headers exchange) diff --git a/src/asyncapi_python/contrib/wire/amqp/producer.py b/src/asyncapi_python/contrib/wire/amqp/producer.py index 82da03c..e12fa9e 100644 --- a/src/asyncapi_python/contrib/wire/amqp/producer.py +++ b/src/asyncapi_python/contrib/wire/amqp/producer.py @@ -31,6 +31,7 @@ def __init__( exchange_type: str = "direct", routing_key: str = "", queue_properties: dict[str, Any] | None = None, + arguments: dict[str, Any] | None = None, ): self._connection = connection self._queue_name = queue_name @@ -38,6 +39,7 @@ def __init__( self._exchange_type = exchange_type self._routing_key = routing_key self._queue_properties = queue_properties or {} + self._arguments = arguments or {} self._channel: AbstractChannel | None = None self._target_exchange: AbstractExchange | None = None self._started = False @@ -61,27 +63,40 @@ async def start(self) -> None: durable=self._queue_properties.get("durable", True), exclusive=self._queue_properties.get("exclusive", False), auto_delete=self._queue_properties.get("auto_delete", False), + arguments=self._arguments, ) # Named exchange patterns case (exchange_name, "direct"): self._target_exchange = await self._channel.declare_exchange( - name=exchange_name, type=ExchangeType.DIRECT, durable=True + name=exchange_name, + type=ExchangeType.DIRECT, + durable=True, + arguments=self._arguments, ) case (exchange_name, "topic"): self._target_exchange = await self._channel.declare_exchange( - name=exchange_name, type=ExchangeType.TOPIC, durable=True + name=exchange_name, + type=ExchangeType.TOPIC, + durable=True, + arguments=self._arguments, ) case (exchange_name, "fanout"): self._target_exchange = await self._channel.declare_exchange( - name=exchange_name, type=ExchangeType.FANOUT, durable=True + name=exchange_name, + type=ExchangeType.FANOUT, + durable=True, + arguments=self._arguments, ) case (exchange_name, "headers"): self._target_exchange = await self._channel.declare_exchange( - name=exchange_name, type=ExchangeType.HEADERS, durable=True + name=exchange_name, + type=ExchangeType.HEADERS, + durable=True, + arguments=self._arguments, ) case (exchange_name, unknown_type): diff --git a/src/asyncapi_python/contrib/wire/amqp/resolver.py b/src/asyncapi_python/contrib/wire/amqp/resolver.py index 9f58255..c9d54ae 100644 --- a/src/asyncapi_python/contrib/wire/amqp/resolver.py +++ b/src/asyncapi_python/contrib/wire/amqp/resolver.py @@ -116,6 +116,7 @@ def resolve_amqp_config( "exclusive": True, "auto_delete": True, }, + arguments={}, ) # Reply channel with explicit address - check if direct queue or topic exchange @@ -133,6 +134,7 @@ def resolve_amqp_config( "exclusive": True, "auto_delete": True, }, + arguments={}, ) else: # Topic-based reply pattern - shared exchange with filtering @@ -143,6 +145,7 @@ def resolve_amqp_config( routing_key=app_id, # Filter messages by app_id binding_type=AmqpBindingType.REPLY, queue_properties={"durable": True, "exclusive": False}, + arguments={}, ) # Reply channel with binding - defer to binding resolution @@ -192,6 +195,7 @@ def resolve_amqp_config( routing_key=resolved_address, binding_type=AmqpBindingType.QUEUE, queue_properties={"durable": True, "exclusive": False}, + arguments={}, ) # Operation name pattern (fallback) @@ -204,6 +208,7 @@ def resolve_amqp_config( routing_key=op_name, binding_type=AmqpBindingType.QUEUE, queue_properties={"durable": True, "exclusive": False}, + arguments={}, ) # No match - reject creation @@ -245,6 +250,7 @@ def resolve_queue_binding( # Extract queue properties queue_config = getattr(binding, "queue", None) queue_properties = {"durable": True, "exclusive": False} # Defaults + arguments: dict[str, Any] = {} if queue_config: if hasattr(queue_config, "durable"): queue_properties["durable"] = queue_config.durable @@ -252,6 +258,8 @@ def resolve_queue_binding( queue_properties["exclusive"] = queue_config.exclusive if hasattr(queue_config, "auto_delete"): queue_properties["auto_delete"] = queue_config.auto_delete + if hasattr(queue_config, "arguments") and queue_config.arguments: + arguments = queue_config.arguments return AmqpConfig( queue_name=queue_name, @@ -259,6 +267,7 @@ def resolve_queue_binding( routing_key=queue_name, # For default exchange, routing_key = queue_name binding_type=AmqpBindingType.QUEUE, queue_properties=queue_properties, + arguments=arguments, ) @@ -303,6 +312,15 @@ def resolve_routing_key_binding( if exchange_config and hasattr(exchange_config, "type"): exchange_type = exchange_config.type + # Extract exchange arguments + arguments: dict[str, Any] = {} + if ( + exchange_config + and hasattr(exchange_config, "arguments") + and exchange_config.arguments + ): + arguments = exchange_config.arguments + # Determine routing key - this is where wildcards are allowed match (getattr(binding, "routingKey", None), channel.address, operation_name): case (routing_key, _, _) if routing_key: @@ -327,6 +345,7 @@ def resolve_routing_key_binding( routing_key=resolved_routing_key, binding_type=AmqpBindingType.ROUTING_KEY, queue_properties={"durable": False, "exclusive": True, "auto_delete": True}, + arguments=arguments, ) @@ -366,6 +385,15 @@ def resolve_exchange_binding( if exchange_config and hasattr(exchange_config, "type"): exchange_type = exchange_config.type + # Extract exchange arguments + arguments: dict[str, Any] = {} + if ( + exchange_config + and hasattr(exchange_config, "arguments") + and exchange_config.arguments + ): + arguments = exchange_config.arguments + # Extract binding arguments for headers exchange from dataclass binding_args: dict[str, Any] = {} # Note: bindingKeys is not part of AmqpChannelBinding spec @@ -379,4 +407,5 @@ def resolve_exchange_binding( binding_type=AmqpBindingType.EXCHANGE, queue_properties={"durable": False, "exclusive": True, "auto_delete": True}, binding_arguments=binding_args, + arguments=arguments, ) diff --git a/src/asyncapi_python/kernel/document/bindings.py b/src/asyncapi_python/kernel/document/bindings.py index ba5abbf..97b6f92 100644 --- a/src/asyncapi_python/kernel/document/bindings.py +++ b/src/asyncapi_python/kernel/document/bindings.py @@ -26,13 +26,14 @@ class AmqpExchange: durable: Optional[bool] = None auto_delete: Optional[bool] = None vhost: Optional[str] = None + arguments: Optional[Dict[str, Any]] = None def __repr__(self) -> str: """Custom repr to handle enum properly for code generation.""" from asyncapi_python.kernel.document.bindings import AmqpExchangeType _ = AmqpExchangeType # Explicitly reference the import - return f"spec.AmqpExchange(name={self.name!r}, type=spec.AmqpExchangeType.{self.type.name}, durable={self.durable!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r})" + return f"spec.AmqpExchange(name={self.name!r}, type=spec.AmqpExchangeType.{self.type.name}, durable={self.durable!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r}, arguments={self.arguments!r})" @dataclass @@ -44,10 +45,11 @@ class AmqpQueue: exclusive: Optional[bool] = None auto_delete: Optional[bool] = None vhost: Optional[str] = None + arguments: Optional[Dict[str, Any]] = None def __repr__(self) -> str: """Custom repr for code generation.""" - return f"spec.AmqpQueue(name={self.name!r}, durable={self.durable!r}, exclusive={self.exclusive!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r})" + return f"spec.AmqpQueue(name={self.name!r}, durable={self.durable!r}, exclusive={self.exclusive!r}, auto_delete={self.auto_delete!r}, vhost={self.vhost!r}, arguments={self.arguments!r})" @dataclass @@ -159,6 +161,7 @@ def create_amqp_binding_from_dict(binding_dict: Dict[str, Any]) -> AmqpChannelBi exclusive=queue_config.get("exclusive"), auto_delete=queue_config.get("auto_delete"), vhost=queue_config.get("vhost"), + arguments=queue_config.get("arguments"), ) elif binding_type == "routingKey" and "exchange" in binding_dict: exchange_config = binding_dict["exchange"] @@ -176,6 +179,7 @@ def create_amqp_binding_from_dict(binding_dict: Dict[str, Any]) -> AmqpChannelBi durable=exchange_config.get("durable"), auto_delete=exchange_config.get("auto_delete"), vhost=exchange_config.get("vhost"), + arguments=exchange_config.get("arguments"), ) return binding