From 28ca78a629de412f5e471ee5d9fd75fcfc78a267 Mon Sep 17 00:00:00 2001 From: p1c2u Date: Sun, 1 Feb 2026 14:12:57 +0000 Subject: [PATCH] Hot-path keyword validators fix --- openapi_spec_validator/validation/keywords.py | 228 +++++++++++------- 1 file changed, 136 insertions(+), 92 deletions(-) diff --git a/openapi_spec_validator/validation/keywords.py b/openapi_spec_validator/validation/keywords.py index c752ff4..b57d252 100644 --- a/openapi_spec_validator/validation/keywords.py +++ b/openapi_spec_validator/validation/keywords.py @@ -67,7 +67,7 @@ class SchemaValidator(KeywordValidator): def __init__(self, registry: "KeywordValidatorRegistry"): super().__init__(registry) - self.schema_ids_registry: Optional[list[int]] = [] + self.schema_ids_registry: set[int] = set() @property def default_validator(self) -> ValueValidator: @@ -77,19 +77,25 @@ def _collect_properties(self, schema: SchemaPath) -> set[str]: """Return *all* property names reachable from this schema.""" props: set[str] = set() - if "properties" in schema: - schema_props = (schema / "properties").keys() - props.update(cast(Sequence[str], schema_props)) + schema_value = schema.read_value() + if not isinstance(schema_value, dict): + return props + + schema_props = schema_value.get("properties") + if isinstance(schema_props, dict): + props.update(cast(Sequence[str], schema_props.keys())) for kw in ("allOf", "anyOf", "oneOf"): - if kw in schema: - for sub in schema / kw: - props.update(self._collect_properties(sub)) + kw_value = schema_value.get(kw) + if isinstance(kw_value, list): + kw_path = schema / kw + for idx in range(len(kw_value)): + props.update(self._collect_properties(kw_path / idx)) - if "items" in schema: + if "items" in schema_value: props.update(self._collect_properties(schema / "items")) - if "not" in schema: + if "not" in schema_value: props.update(self._collect_properties(schema / "not")) return props @@ -98,19 +104,22 @@ def __call__( self, schema: SchemaPath, require_properties: bool = True ) -> Iterator[ValidationError]: schema_value = schema.read_value() - if not hasattr(schema_value, "__getitem__"): + if not isinstance(schema_value, dict): return - assert self.schema_ids_registry is not None schema_id = id(schema_value) if schema_id in self.schema_ids_registry: return - self.schema_ids_registry.append(schema_id) + self.schema_ids_registry.add(schema_id) + + schema_dict = schema_value nested_properties = [] - if "allOf" in schema: - all_of = schema / "allOf" - for inner_schema in all_of: + all_of_value = schema_dict.get("allOf") + if isinstance(all_of_value, list): + all_of_path = schema / "allOf" + for idx in range(len(all_of_value)): + inner_schema = all_of_path / idx yield from self(inner_schema, require_properties=False) nested_properties += list( self._collect_properties(inner_schema) @@ -132,35 +141,21 @@ def __call__( require_properties=False, ) - if "not" in schema: - not_schema = schema / "not" - yield from self( - not_schema, - require_properties=False, - ) + if "not" in schema_dict: + yield from self(schema / "not", require_properties=False) - if "items" in schema: - array_schema = schema / "items" - yield from self( - array_schema, - require_properties=False, - ) + if "items" in schema_dict: + yield from self(schema / "items", require_properties=False) - if "properties" in schema: - props = schema / "properties" - for _, prop_schema in props.items(): - yield from self( - prop_schema, - require_properties=False, - ) + props_value = schema_dict.get("properties") + if isinstance(props_value, dict): + props_path = schema / "properties" + for prop_name in props_value.keys(): + yield from self(props_path / prop_name, require_properties=False) - required = ( - "required" in schema and (schema / "required").read_value() or [] - ) - properties = ( - "properties" in schema and (schema / "properties").keys() or [] - ) - if "allOf" in schema: + required = schema_dict.get("required") or [] + properties = list(props_value.keys()) if isinstance(props_value, dict) else [] + if isinstance(all_of_value, list): extra_properties = list( set(required) - set(properties) - set(nested_properties) ) @@ -172,11 +167,9 @@ def __call__( f"Required list has not defined properties: {extra_properties}" ) - if "default" in schema: - default_value = (schema / "default").read_value() - nullable_value = False - if "nullable" in schema: - nullable_value = (schema / "nullable").read_value() + if "default" in schema_dict: + default_value = schema_dict.get("default") + nullable_value = schema_dict.get("nullable", False) if default_value is not None or nullable_value is not True: yield from self.default_validator(schema, default_value) @@ -187,8 +180,12 @@ def schema_validator(self) -> SchemaValidator: return cast(SchemaValidator, self.registry["schema"]) def __call__(self, schemas: SchemaPath) -> Iterator[ValidationError]: - for _, schema in schemas.items(): - yield from self.schema_validator(schema) + schemas_value = schemas.read_value() + if not isinstance(schemas_value, dict): + return + + for schema_name in schemas_value.keys(): + yield from self.schema_validator(schemas / schema_name) class ParameterValidator(KeywordValidator): @@ -197,9 +194,12 @@ def schema_validator(self) -> SchemaValidator: return cast(SchemaValidator, self.registry["schema"]) def __call__(self, parameter: SchemaPath) -> Iterator[ValidationError]: - if "schema" in parameter: - schema = parameter / "schema" - yield from self.schema_validator(schema) + parameter_value = parameter.read_value() + if not isinstance(parameter_value, dict): + return + + if "schema" in parameter_value: + yield from self.schema_validator(parameter / "schema") class OpenAPIV2ParameterValidator(ParameterValidator): @@ -210,11 +210,14 @@ def default_validator(self) -> ValueValidator: def __call__(self, parameter: SchemaPath) -> Iterator[ValidationError]: yield from super().__call__(parameter) - if "default" in parameter: + parameter_value = parameter.read_value() + if not isinstance(parameter_value, dict): + return + + if "default" in parameter_value: # only possible in swagger 2.0 - if "default" in parameter: - default_value = (parameter / "default").read_value() - yield from self.default_validator(parameter, default_value) + default_value = parameter_value.get("default") + yield from self.default_validator(parameter, default_value) class ParametersValidator(KeywordValidator): @@ -227,10 +230,13 @@ def __call__(self, parameters: SchemaPath) -> Iterator[ValidationError]: for parameter in parameters: yield from self.parameter_validator(parameter) - key = (parameter["name"], parameter["in"]) + parameter_value = parameter.read_value() + if not isinstance(parameter_value, dict): + continue + key = (parameter_value.get("name"), parameter_value.get("in")) if key in seen: yield ParameterDuplicateError( - f"Duplicate parameter `{parameter['name']}`" + f"Duplicate parameter `{parameter_value.get('name')}`" ) seen.add(key) @@ -243,9 +249,12 @@ def schema_validator(self) -> SchemaValidator: def __call__( self, mimetype: str, media_type: SchemaPath ) -> Iterator[ValidationError]: - if "schema" in media_type: - schema = media_type / "schema" - yield from self.schema_validator(schema) + media_type_value = media_type.read_value() + if not isinstance(media_type_value, dict): + return + + if "schema" in media_type_value: + yield from self.schema_validator(media_type / "schema") class ContentValidator(KeywordValidator): @@ -254,9 +263,13 @@ def media_type_validator(self) -> MediaTypeValidator: return cast(MediaTypeValidator, self.registry["mediaType"]) def __call__(self, content: SchemaPath) -> Iterator[ValidationError]: - for mimetype, media_type in content.items(): + content_value = content.read_value() + if not isinstance(content_value, dict): + return + + for mimetype in content_value.keys(): assert isinstance(mimetype, str) - yield from self.media_type_validator(mimetype, media_type) + yield from self.media_type_validator(mimetype, content / mimetype) class ResponseValidator(KeywordValidator): @@ -275,9 +288,12 @@ def __call__( self, response_code: str, response: SchemaPath ) -> Iterator[ValidationError]: # openapi 2 - if "schema" in response: - schema = response / "schema" - yield from self.schema_validator(schema) + response_value = response.read_value() + if not isinstance(response_value, dict): + return + + if "schema" in response_value: + yield from self.schema_validator(response / "schema") class OpenAPIV3ResponseValidator(ResponseValidator): @@ -289,9 +305,12 @@ def __call__( self, response_code: str, response: SchemaPath ) -> Iterator[ValidationError]: # openapi 3 - if "content" in response: - content = response / "content" - yield from self.content_validator(content) + response_value = response.read_value() + if not isinstance(response_value, dict): + return + + if "content" in response_value: + yield from self.content_validator(response / "content") class ResponsesValidator(KeywordValidator): @@ -300,9 +319,13 @@ def response_validator(self) -> ResponseValidator: return cast(ResponseValidator, self.registry["response"]) def __call__(self, responses: SchemaPath) -> Iterator[ValidationError]: - for response_code, response in responses.items(): + responses_value = responses.read_value() + if not isinstance(responses_value, dict): + return + + for response_code in responses_value.keys(): assert isinstance(response_code, str) - yield from self.response_validator(response_code, response) + yield from self.response_validator(response_code, responses / response_code) class OperationValidator(KeywordValidator): @@ -328,8 +351,12 @@ def __call__( ) -> Iterator[ValidationError]: assert self.operation_ids_registry is not None - if "operationId" in operation: - operation_id_value = (operation / "operationId").read_value() + operation_value = operation.read_value() + if not isinstance(operation_value, dict): + return + + if "operationId" in operation_value: + operation_id_value = operation_value.get("operationId") if ( operation_id_value is not None and operation_id_value in self.operation_ids_registry @@ -340,14 +367,13 @@ def __call__( ) self.operation_ids_registry.append(operation_id_value) - if "responses" in operation: - responses = operation / "responses" - yield from self.responses_validator(responses) + if "responses" in operation_value: + yield from self.responses_validator(operation / "responses") names = [] parameters = None - if "parameters" in operation: + if "parameters" in operation_value: parameters = operation / "parameters" yield from self.parameters_validator(parameters) names += list(self._get_path_param_names(parameters)) @@ -366,8 +392,13 @@ def __call__( def _get_path_param_names(self, params: SchemaPath) -> Iterator[str]: for param in params: - if param["in"] == "path": - yield param["name"] + param_value = param.read_value() + if not isinstance(param_value, dict): + continue + if param_value.get("in") == "path": + name = param_value.get("name") + if isinstance(name, str): + yield name def _get_path_params_from_url(self, url: str) -> Iterator[str]: formatter = string.Formatter() @@ -398,18 +429,22 @@ def operation_validator(self) -> OperationValidator: def __call__( self, url: str, path_item: SchemaPath ) -> Iterator[ValidationError]: + path_item_value = path_item.read_value() + if not isinstance(path_item_value, dict): + return + parameters = None - if "parameters" in path_item: + if "parameters" in path_item_value: parameters = path_item / "parameters" yield from self.parameters_validator(parameters) - for field_name, operation in path_item.items(): + for field_name in path_item_value.keys(): assert isinstance(field_name, str) if field_name not in self.OPERATIONS: continue yield from self.operation_validator( - url, field_name, operation, parameters + url, field_name, path_item / field_name, parameters ) @@ -419,9 +454,13 @@ def path_validator(self) -> PathValidator: return cast(PathValidator, self.registry["path"]) def __call__(self, paths: SchemaPath) -> Iterator[ValidationError]: - for url, path_item in paths.items(): + paths_value = paths.read_value() + if not isinstance(paths_value, dict): + return + + for url in paths_value.keys(): assert isinstance(url, str) - yield from self.path_validator(url, path_item) + yield from self.path_validator(url, paths / url) class ComponentsValidator(KeywordValidator): @@ -430,9 +469,12 @@ def schemas_validator(self) -> SchemasValidator: return cast(SchemasValidator, self.registry["schemas"]) def __call__(self, components: SchemaPath) -> Iterator[ValidationError]: - if "schemas" in components: - schemas = components / "schemas" - yield from self.schemas_validator(schemas) + components_value = components.read_value() + if not isinstance(components_value, dict): + return + + if "schemas" in components_value: + yield from self.schemas_validator(components / "schemas") class RootValidator(KeywordValidator): @@ -445,9 +487,11 @@ def components_validator(self) -> ComponentsValidator: return cast(ComponentsValidator, self.registry["components"]) def __call__(self, spec: SchemaPath) -> Iterator[ValidationError]: - if "paths" in spec: - paths = spec / "paths" - yield from self.paths_validator(paths) - if "components" in spec: - components = spec / "components" - yield from self.components_validator(components) + spec_value = spec.read_value() + if not isinstance(spec_value, dict): + return + + if "paths" in spec_value: + yield from self.paths_validator(spec / "paths") + if "components" in spec_value: + yield from self.components_validator(spec / "components")