From 58c8f37206568698b96c8d98f228d796cb12741e Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Sat, 15 Nov 2025 11:35:01 +0000 Subject: [PATCH 1/4] Fix edge case --- .../validation/core/__init__.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/asyncapi_python_codegen/validation/core/__init__.py b/src/asyncapi_python_codegen/validation/core/__init__.py index 9834641..3b99eb1 100644 --- a/src/asyncapi_python_codegen/validation/core/__init__.py +++ b/src/asyncapi_python_codegen/validation/core/__init__.py @@ -251,7 +251,12 @@ def location_must_be_payload(ctx: ValidationContext) -> list[ValidationIssue]: @rule("core") def location_path_exists_in_schema(ctx: ValidationContext) -> list[ValidationIssue]: - """Validate location path exists in message payload schemas.""" + """Validate location path exists in ALL message payload schemas. + + Parameters with location fields must reference paths that exist in every + message in the channel, not just some of them. This prevents runtime errors + when processing messages that lack the required field. + """ issues = [] for channel_key, channel_def in ctx.get_channels().items(): @@ -273,22 +278,23 @@ def location_path_exists_in_schema(ctx: ValidationContext) -> list[ValidationIss path = location.replace("$message.payload#/", "") parts = [p for p in path.split("/") if p] - # Check if path exists in ANY message schema - path_found = False - for msg_def in messages.values(): + # Check if path exists in ALL message schemas + missing_in_messages = [] + for msg_name, msg_def in messages.items(): if not isinstance(msg_def, dict): continue - if _path_exists_in_schema(msg_def.get("payload"), parts): - path_found = True - break + if not _path_exists_in_schema(msg_def.get("payload"), parts): + missing_in_messages.append(msg_name) - if not path_found and messages: + if missing_in_messages: issues.append( ValidationIssue( severity=Severity.ERROR, - message=f"Parameter '{param_name}' location path '{path}' not found in message schemas", + message=f"Parameter '{param_name}' location path '{path}' not found in all message schemas. " + f"Missing in: {', '.join(missing_in_messages)}", path=f"$.channels.{channel_key}.parameters.{param_name}.location", rule="location-path-exists-in-schema", + suggestion=f"Add '{path}' field to all message payloads in this channel", ) ) From 577fbd1fa1e4458e6843ad10153479411663c2e4 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Sat, 15 Nov 2025 11:35:06 +0000 Subject: [PATCH 2/4] Add tests --- tests/codegen/test_validation.py | 171 +++++++++++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/tests/codegen/test_validation.py b/tests/codegen/test_validation.py index 034032a..97bcf83 100644 --- a/tests/codegen/test_validation.py +++ b/tests/codegen/test_validation.py @@ -230,6 +230,177 @@ def test_parameter_with_location_warns_not_implemented(tmp_path: Path): assert "myOp" in operations +def test_location_path_must_exist_in_all_messages(tmp_path: Path): + """Test that parameter location path must exist in ALL messages, not just some.""" + spec_file = tmp_path / "location_missing_in_some.yaml" + spec_file.write_text( + """ +asyncapi: 3.0.0 +channels: + alerts: + address: alerts.{location} + parameters: + location: + location: $message.payload#/location + bindings: + amqp: + is: routingKey + exchange: + name: alerts_exchange + type: topic + messages: + alert1: + payload: + type: object + properties: + location: + type: string + message: + type: string + alert2: + payload: + type: object + properties: + message: + type: string +operations: + sendAlert: + action: send + channel: + $ref: '#/channels/alerts' +""" + ) + + with pytest.raises(ValidationError) as exc_info: + extract_all_operations(spec_file) + + # Should fail because 'location' field is missing in alert2 + assert any( + "not found in all message schemas" in error.message + and "alert2" in error.message + for error in exc_info.value.errors + ) + + +def test_location_path_exists_in_all_messages_passes(tmp_path: Path): + """Test that validation passes when location exists in all messages.""" + spec_file = tmp_path / "location_in_all.yaml" + spec_file.write_text( + """ +asyncapi: 3.0.0 +channels: + alerts: + address: alerts.{location} + parameters: + location: + location: $message.payload#/location + bindings: + amqp: + is: routingKey + exchange: + name: alerts_exchange + type: topic + messages: + alert1: + payload: + type: object + properties: + location: + type: string + message: + type: string + alert2: + payload: + type: object + properties: + location: + type: string + severity: + type: string +operations: + sendAlert: + action: send + channel: + $ref: '#/channels/alerts' +""" + ) + + # Should succeed - location exists in both messages + operations = extract_all_operations(spec_file, fail_on_error=True) + assert "sendAlert" in operations + + +def test_location_path_with_single_message(tmp_path: Path): + """Test that validation works correctly with single message.""" + spec_file = tmp_path / "location_single_message.yaml" + spec_file.write_text( + """ +asyncapi: 3.0.0 +channels: + users: + address: users.{userId} + parameters: + userId: + location: $message.payload#/userId + bindings: + amqp: + is: queue + messages: + userEvent: + payload: + type: object + properties: + userId: + type: string + name: + type: string +operations: + publishUser: + action: send + channel: + $ref: '#/channels/users' +""" + ) + + # Should succeed - location exists in the single message + operations = extract_all_operations(spec_file, fail_on_error=True) + assert "publishUser" in operations + + +def test_location_path_with_no_messages(tmp_path: Path): + """Test that validation skips channels with no messages.""" + spec_file = tmp_path / "location_no_messages.yaml" + spec_file.write_text( + """ +asyncapi: 3.0.0 +channels: + emptyChannel: + address: empty.{param} + parameters: + param: + location: $message.payload#/param + bindings: + amqp: + is: queue +operations: + emptyOp: + action: send + channel: + $ref: '#/channels/emptyChannel' + messages: + - payload: + type: object + properties: + param: + type: string +""" + ) + + # Should succeed - validation skips channels with no messages + operations = extract_all_operations(spec_file, fail_on_error=True) + assert "emptyOp" in operations + + def test_undefined_placeholders_in_address(tmp_path: Path): """Test that undefined placeholders in address raise error.""" spec_file = tmp_path / "undefined_params.yaml" From 9f730ffe60dc420a8fd1f10dbe4e2be6abd8e89b Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Sat, 15 Nov 2025 14:13:34 +0000 Subject: [PATCH 3/4] Move parameter count validation from amqp to receiver plane --- pyproject.toml | 2 +- .../contrib/wire/amqp/resolver.py | 19 +-- .../kernel/endpoint/rpc_server.py | 6 + .../kernel/endpoint/subscriber.py | 7 + tests/core/wire/test_amqp_resolver.py | 154 ------------------ uv.lock | 2 +- 6 files changed, 18 insertions(+), 172 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 21bf464..7e03b11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "asyncapi-python" -version = "0.3.0rc6" +version = "0.3.0rc7" license = { text = "Apache-2.0" } description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications." authors = [{ name = "Yaroslav Petrov", email = "yaroslav.v.petrov@gmail.com" }] diff --git a/src/asyncapi_python/contrib/wire/amqp/resolver.py b/src/asyncapi_python/contrib/wire/amqp/resolver.py index 4054c11..9f58255 100644 --- a/src/asyncapi_python/contrib/wire/amqp/resolver.py +++ b/src/asyncapi_python/contrib/wire/amqp/resolver.py @@ -6,10 +6,7 @@ from asyncapi_python.kernel.document.bindings import AmqpChannelBinding from asyncapi_python.kernel.document.channel import Channel from asyncapi_python.kernel.wire import EndpointParams -from asyncapi_python.kernel.wire.utils import ( - substitute_parameters, - validate_parameters_strict, -) +from asyncapi_python.kernel.wire.utils import substitute_parameters from .config import AmqpBindingType, AmqpConfig @@ -186,8 +183,7 @@ def resolve_amqp_config( # Channel address pattern (with parameter substitution) case (False, None, address, _) if address: - # Strict validation for implicit queue binding - validate_parameters_strict(channel, param_values) + # Validate no wildcards for implicit queue binding _validate_no_wildcards_in_queue(param_values) resolved_address = substitute_parameters(address, param_values) return AmqpConfig( @@ -200,8 +196,7 @@ def resolve_amqp_config( # Operation name pattern (fallback) case (False, None, None, op_name) if op_name: - # Strict validation for implicit queue binding - validate_parameters_strict(channel, param_values) + # Validate no wildcards for implicit queue binding _validate_no_wildcards_in_queue(param_values) return AmqpConfig( queue_name=op_name, @@ -228,13 +223,9 @@ def resolve_queue_binding( """Resolve AMQP queue binding configuration Queue bindings require: - - All channel parameters must be provided (strict validation) - No wildcards allowed in parameter values """ - # Strict validation: all parameters required, exact match - validate_parameters_strict(channel, param_values) - # Validate no wildcards in queue binding parameters _validate_no_wildcards_in_queue(param_values) @@ -280,14 +271,10 @@ def resolve_routing_key_binding( """Resolve AMQP routing key binding configuration for pub/sub patterns For routing key bindings: - - All channel-defined parameters must be provided (strict validation) - Parameter values can explicitly contain wildcards ('*' or '#') - Wildcards are allowed for topic exchange pattern matching """ - # Strict validation: all parameters required, exact match - validate_parameters_strict(channel, param_values) - # Determine exchange name and type # For exchange name, we need concrete values (no wildcards) # If param_values has placeholders, use them; otherwise use literal exchange name diff --git a/src/asyncapi_python/kernel/endpoint/rpc_server.py b/src/asyncapi_python/kernel/endpoint/rpc_server.py index d304113..000351a 100644 --- a/src/asyncapi_python/kernel/endpoint/rpc_server.py +++ b/src/asyncapi_python/kernel/endpoint/rpc_server.py @@ -4,6 +4,7 @@ from typing_extensions import Unpack from asyncapi_python.kernel.wire import Consumer, Producer +from asyncapi_python.kernel.wire.utils import validate_parameters_strict from ..exceptions import Reject from ..typing import ( @@ -63,6 +64,11 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None: if not self._reply_codecs: raise RuntimeError("RPC server operation has no reply messages defined") + # Validate subscription parameters before creating consumer + validate_parameters_strict( + self._operation.channel, self._subscription_parameters + ) + # Create consumer for receiving requests self._consumer = await self._wire.create_consumer( channel=self._operation.channel, diff --git a/src/asyncapi_python/kernel/endpoint/subscriber.py b/src/asyncapi_python/kernel/endpoint/subscriber.py index b153feb..955c56c 100644 --- a/src/asyncapi_python/kernel/endpoint/subscriber.py +++ b/src/asyncapi_python/kernel/endpoint/subscriber.py @@ -4,6 +4,7 @@ from typing_extensions import Unpack from asyncapi_python.kernel.wire import Consumer +from asyncapi_python.kernel.wire.utils import validate_parameters_strict from ..exceptions import Reject from ..typing import BatchConfig, BatchConsumer, Handler, IncomingMessage, T_Input @@ -46,6 +47,11 @@ async def start(self, **params: Unpack[AbstractEndpoint.StartParams]) -> None: f"Use @{self._operation.key} decorator to register a handler function." ) + # Validate subscription parameters before creating consumer + validate_parameters_strict( + self._operation.channel, self._subscription_parameters + ) + # Create consumer from wire factory self._consumer = await self._wire.create_consumer( channel=self._operation.channel, @@ -331,3 +337,4 @@ async def process_batch(): # If processing remaining batch fails, just nack all and continue for _, wire_message in batch: await wire_message.nack() + await wire_message.nack() diff --git a/tests/core/wire/test_amqp_resolver.py b/tests/core/wire/test_amqp_resolver.py index 3e8d067..a7070be 100644 --- a/tests/core/wire/test_amqp_resolver.py +++ b/tests/core/wire/test_amqp_resolver.py @@ -196,157 +196,3 @@ def test_resolve_channel_address_without_binding_accepts_concrete_params(): # Verify the address was substituted with concrete value assert config.queue_name == "task.high" - - -def test_resolve_amqp_config_rejects_missing_parameters(): - """Resolver should validate all required parameters are provided.""" - from asyncapi_python.kernel.document.channel import AddressParameter - - # Create channel with 2 required parameters - channel = create_test_channel( - address="weather.{location}.{severity}", - binding=AmqpChannelBinding(type="queue"), - ) - # Add parameter definitions to channel - channel = Channel( - key=channel.key, - address=channel.address, - title=channel.title, - summary=channel.summary, - description=channel.description, - servers=channel.servers, - messages=channel.messages, - parameters={ - "location": AddressParameter( - key="location", - description="Geographic location", - location=None, - ), - "severity": AddressParameter( - key="severity", - description="Alert severity", - location=None, - ), - }, - tags=channel.tags, - external_docs=channel.external_docs, - bindings=channel.bindings, - ) - - params: EndpointParams = { - "channel": channel, - "parameters": {"location": "NYC"}, # Missing severity - "op_bindings": None, - "is_reply": False, - } - - with pytest.raises(ValueError) as exc_info: - resolve_amqp_config(params, "test_op", "test_app") - - error_msg = str(exc_info.value) - assert "Missing required parameters" in error_msg - assert "severity" in error_msg - - -def test_resolve_amqp_config_rejects_extra_parameters(): - """Resolver should reject extra parameters not defined in channel.""" - from asyncapi_python.kernel.document.channel import AddressParameter - - # Create channel with 1 required parameter - channel = create_test_channel( - address="weather.{location}", - binding=AmqpChannelBinding(type="queue"), - ) - # Add parameter definition to channel - channel = Channel( - key=channel.key, - address=channel.address, - title=channel.title, - summary=channel.summary, - description=channel.description, - servers=channel.servers, - messages=channel.messages, - parameters={ - "location": AddressParameter( - key="location", - description="Geographic location", - location=None, - ), - }, - tags=channel.tags, - external_docs=channel.external_docs, - bindings=channel.bindings, - ) - - params: EndpointParams = { - "channel": channel, - "parameters": { - "location": "NYC", - "severity": "high", # Extra parameter - }, - "op_bindings": None, - "is_reply": False, - } - - with pytest.raises(ValueError) as exc_info: - resolve_amqp_config(params, "test_op", "test_app") - - error_msg = str(exc_info.value) - assert "Unexpected parameters" in error_msg - assert "severity" in error_msg - - -def test_resolve_amqp_config_routing_key_rejects_missing_parameters(): - """Routing key bindings should also validate all parameters are provided.""" - from asyncapi_python.kernel.document.channel import AddressParameter - - # Create channel with routingKey binding - channel = create_test_channel( - address="weather.{location}.{severity}", - binding=AmqpChannelBinding( - type="routingKey", - exchange=AmqpExchange( - name="weather_exchange", - type=AmqpExchangeType.TOPIC, - ), - ), - ) - # Add parameter definitions - channel = Channel( - key=channel.key, - address=channel.address, - title=channel.title, - summary=channel.summary, - description=channel.description, - servers=channel.servers, - messages=channel.messages, - parameters={ - "location": AddressParameter( - key="location", - description="Geographic location", - location=None, - ), - "severity": AddressParameter( - key="severity", - description="Alert severity", - location=None, - ), - }, - tags=channel.tags, - external_docs=channel.external_docs, - bindings=channel.bindings, - ) - - params: EndpointParams = { - "channel": channel, - "parameters": {"location": "NYC"}, # Missing severity - "op_bindings": None, - "is_reply": False, - } - - with pytest.raises(ValueError) as exc_info: - resolve_amqp_config(params, "test_op", "test_app") - - error_msg = str(exc_info.value) - assert "Missing required parameters" in error_msg - assert "severity" in error_msg diff --git a/uv.lock b/uv.lock index 8f420ba..0ee18d7 100644 --- a/uv.lock +++ b/uv.lock @@ -64,7 +64,7 @@ wheels = [ [[package]] name = "asyncapi-python" -version = "0.3.0rc6" +version = "0.3.0rc7" source = { editable = "." } dependencies = [ { name = "pydantic" }, From 4223a1b074224be6f52d0fbf76cac2eda17c61b4 Mon Sep 17 00:00:00 2001 From: Yaroslav Petrov Date: Sat, 15 Nov 2025 14:15:15 +0000 Subject: [PATCH 4/4] Increment version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 7e03b11..88a99fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "asyncapi-python" -version = "0.3.0rc7" +version = "0.3.0rc8" license = { text = "Apache-2.0" } description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications." authors = [{ name = "Yaroslav Petrov", email = "yaroslav.v.petrov@gmail.com" }]