From 0b351efc6fc8985b27723a8b1a22cf75e542085c Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 29 Nov 2024 11:53:35 +0200 Subject: [PATCH 1/6] pyproject: Remove deprecated ruff rules These are no longer part of the ruleset Signed-off-by: Jussi Kukkonen --- pyproject.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9a6cc3e313..e7e1fc466c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -96,7 +96,6 @@ ignore = [ "TRY", # Individual rules that have been disabled - "ANN101", "ANN102", # nonsense, deprecated in ruff "D400", "D415", "D213", "D205", "D202", "D107", "D407", "D413", "D212", "D104", "D406", "D105", "D411", "D401", "D200", "D203", "ISC001", # incompatible with ruff formatter "PLR0913", "PLR2004", From 1d81a047074c99108c53a2d57e1416cbd9362840 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 29 Nov 2024 12:29:32 +0200 Subject: [PATCH 2/6] Use __future.annotations module This allows using some more nice annotations from 3.10 while still being compatible with even Python 3.8. These are all annotation changes, should not modify any functionality. Signed-off-by: Jussi Kukkonen --- examples/manual_repo/basic_repo.py | 2 + examples/manual_repo/hashed_bin_delegation.py | 7 +- .../succinct_hash_bin_delegations.py | 2 + examples/repository/_simplerepo.py | 5 +- examples/uploader/_localrepo.py | 2 + tests/generated_data/generate_md.py | 5 +- tests/repository_simulator.py | 26 ++-- tests/test_api.py | 8 +- tests/test_examples.py | 2 + tests/test_metadata_eq_.py | 4 +- tests/test_metadata_serialization.py | 50 +++--- tests/test_repository.py | 2 + tests/test_trusted_metadata_set.py | 8 +- tests/test_updater_consistent_snapshot.py | 20 ++- tests/test_updater_delegation_graphs.py | 24 +-- tests/test_updater_fetch_target.py | 2 +- tests/test_updater_key_rotations.py | 8 +- tests/test_updater_ng.py | 2 + tests/utils.py | 23 +-- tuf/api/_payload.py | 147 +++++++++--------- tuf/api/dsse.py | 6 +- tuf/api/metadata.py | 46 +++--- tuf/ngclient/_internal/requests_fetcher.py | 12 +- .../_internal/trusted_metadata_set.py | 24 +-- tuf/ngclient/updater.py | 32 ++-- tuf/repository/_repository.py | 12 +- 26 files changed, 269 insertions(+), 212 deletions(-) diff --git a/examples/manual_repo/basic_repo.py b/examples/manual_repo/basic_repo.py index e9ccc8c429..e619c190af 100644 --- a/examples/manual_repo/basic_repo.py +++ b/examples/manual_repo/basic_repo.py @@ -21,6 +21,8 @@ """ +from __future__ import annotations + import os import tempfile from datetime import datetime, timedelta, timezone diff --git a/examples/manual_repo/hashed_bin_delegation.py b/examples/manual_repo/hashed_bin_delegation.py index 420f46c8a9..0c90651fad 100644 --- a/examples/manual_repo/hashed_bin_delegation.py +++ b/examples/manual_repo/hashed_bin_delegation.py @@ -16,12 +16,14 @@ """ +from __future__ import annotations + import hashlib import os import tempfile -from collections.abc import Iterator from datetime import datetime, timedelta, timezone from pathlib import Path +from typing import TYPE_CHECKING from securesystemslib.signer import CryptoSigner, Signer @@ -34,6 +36,9 @@ ) from tuf.api.serialization.json import JSONSerializer +if TYPE_CHECKING: + from collections.abc import Iterator + def _in(days: float) -> datetime: """Adds 'days' to now and returns datetime object w/o microseconds.""" diff --git a/examples/manual_repo/succinct_hash_bin_delegations.py b/examples/manual_repo/succinct_hash_bin_delegations.py index 40a71486d6..3923a97d16 100644 --- a/examples/manual_repo/succinct_hash_bin_delegations.py +++ b/examples/manual_repo/succinct_hash_bin_delegations.py @@ -18,6 +18,8 @@ NOTE: Metadata files will be written to a 'tmp*'-directory in CWD. """ +from __future__ import annotations + import math import os import tempfile diff --git a/examples/repository/_simplerepo.py b/examples/repository/_simplerepo.py index 8b1904503a..3d19c8de83 100644 --- a/examples/repository/_simplerepo.py +++ b/examples/repository/_simplerepo.py @@ -3,12 +3,13 @@ """Simple example of using the repository library to build a repository""" +from __future__ import annotations + import copy import json import logging from collections import defaultdict from datetime import datetime, timedelta, timezone -from typing import Union from securesystemslib.signer import CryptoSigner, Key, Signer @@ -93,7 +94,7 @@ def snapshot_info(self) -> MetaFile: def _get_verification_result( self, role: str, md: Metadata - ) -> Union[VerificationResult, RootVerificationResult]: + ) -> VerificationResult | RootVerificationResult: """Verify roles metadata using the existing repository metadata""" if role == Root.type: assert isinstance(md.signed, Root) diff --git a/examples/uploader/_localrepo.py b/examples/uploader/_localrepo.py index a27658c487..edae65821b 100644 --- a/examples/uploader/_localrepo.py +++ b/examples/uploader/_localrepo.py @@ -3,6 +3,8 @@ """A Repository implementation for maintainer and developer tools""" +from __future__ import annotations + import contextlib import copy import json diff --git a/tests/generated_data/generate_md.py b/tests/generated_data/generate_md.py index 4af8aab493..6a820fa154 100644 --- a/tests/generated_data/generate_md.py +++ b/tests/generated_data/generate_md.py @@ -3,10 +3,11 @@ # Copyright New York University and the TUF contributors # SPDX-License-Identifier: MIT OR Apache-2.0 +from __future__ import annotations + import os import sys from datetime import datetime, timezone -from typing import Optional from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from securesystemslib.signer import CryptoSigner, Signer, SSlibKey @@ -80,7 +81,7 @@ def verify_generation(md: Metadata, path: str) -> None: def generate_all_files( - dump: Optional[bool] = False, verify: Optional[bool] = False + dump: bool | None = False, verify: bool | None = False ) -> None: """Generate a new repository and optionally verify it. diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index 4cd3ba56ea..637ba42a54 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -42,13 +42,14 @@ updater.refresh() """ +from __future__ import annotations + import datetime import logging import os import tempfile -from collections.abc import Iterator from dataclasses import dataclass, field -from typing import Optional +from typing import TYPE_CHECKING from urllib import parse import securesystemslib.hash as sslib_hash @@ -72,6 +73,9 @@ from tuf.api.serialization.json import JSONSerializer from tuf.ngclient.fetcher import FetcherInterface +if TYPE_CHECKING: + from collections.abc import Iterator + logger = logging.getLogger(__name__) SPEC_VER = ".".join(SPECIFICATION_VERSION) @@ -81,8 +85,8 @@ class FetchTracker: """Fetcher counter for metadata and targets.""" - metadata: list[tuple[str, Optional[int]]] = field(default_factory=list) - targets: list[tuple[str, Optional[str]]] = field(default_factory=list) + metadata: list[tuple[str, int | None]] = field(default_factory=list) + targets: list[tuple[str, str | None]] = field(default_factory=list) @dataclass @@ -116,7 +120,7 @@ def __init__(self) -> None: # Enable hash-prefixed target file names self.prefix_targets_with_hash = True - self.dump_dir: Optional[str] = None + self.dump_dir: str | None = None self.dump_version = 0 self.fetch_tracker = FetchTracker() @@ -201,7 +205,7 @@ def _fetch(self, url: str) -> Iterator[bytes]: if role == Root.type or ( self.root.consistent_snapshot and ver_and_name != Timestamp.type ): - version: Optional[int] = int(version_str) + version: int | None = int(version_str) else: # the file is not version-prefixed role = ver_and_name @@ -213,7 +217,7 @@ def _fetch(self, url: str) -> Iterator[bytes]: target_path = path[len("/targets/") :] dir_parts, sep, prefixed_filename = target_path.rpartition("/") # extract the hash prefix, if any - prefix: Optional[str] = None + prefix: str | None = None filename = prefixed_filename if self.root.consistent_snapshot and self.prefix_targets_with_hash: prefix, _, filename = prefixed_filename.partition(".") @@ -223,9 +227,7 @@ def _fetch(self, url: str) -> Iterator[bytes]: else: raise DownloadHTTPError(f"Unknown path '{path}'", 404) - def fetch_target( - self, target_path: str, target_hash: Optional[str] - ) -> bytes: + def fetch_target(self, target_path: str, target_hash: str | None) -> bytes: """Return data for 'target_path', checking 'target_hash' if it is given. If hash is None, then consistent_snapshot is not used. @@ -244,7 +246,7 @@ def fetch_target( logger.debug("fetched target %s", target_path) return repo_target.data - def fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: + def fetch_metadata(self, role: str, version: int | None = None) -> bytes: """Return signed metadata for 'role', using 'version' if it is given. If version is None, non-versioned metadata is being requested. @@ -261,7 +263,7 @@ def fetch_metadata(self, role: str, version: Optional[int] = None) -> bytes: return self.signed_roots[version - 1] # sign and serialize the requested metadata - md: Optional[Metadata] + md: Metadata | None if role == Timestamp.type: md = self.md_timestamp elif role == Snapshot.type: diff --git a/tests/test_api.py b/tests/test_api.py index 8ef614604a..5f2e7f8c98 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -2,6 +2,8 @@ # SPDX-License-Identifier: MIT OR Apache-2.0 """Unit tests for api/metadata.py""" +from __future__ import annotations + import json import logging import os @@ -12,7 +14,7 @@ from copy import copy, deepcopy from datetime import datetime, timedelta, timezone from pathlib import Path -from typing import ClassVar, Optional +from typing import ClassVar from securesystemslib import exceptions as sslib_exceptions from securesystemslib import hash as sslib_hash @@ -245,8 +247,8 @@ def from_priv_key_uri( cls, priv_key_uri: str, public_key: Key, - secrets_handler: Optional[SecretsHandler] = None, - ) -> "Signer": + secrets_handler: SecretsHandler | None = None, + ) -> Signer: pass @property diff --git a/tests/test_examples.py b/tests/test_examples.py index 7cb5f827fa..208603ff64 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -2,6 +2,8 @@ # SPDX-License-Identifier: MIT OR Apache-2.0 """Unit tests for 'examples' scripts.""" +from __future__ import annotations + import glob import os import shutil diff --git a/tests/test_metadata_eq_.py b/tests/test_metadata_eq_.py index cf51f6e4e3..428c5ed590 100644 --- a/tests/test_metadata_eq_.py +++ b/tests/test_metadata_eq_.py @@ -3,6 +3,8 @@ """Test __eq__ implementations of classes inside tuf/api/metadata.py.""" +from __future__ import annotations + import copy import os import sys @@ -63,7 +65,7 @@ def setUpClass(cls) -> None: # Keys are class names. # Values are dictionaries containing attribute names and their new values. - classes_attributes_modifications: utils.DataSet = { + classes_attributes_modifications = { "Metadata": {"signed": None, "signatures": None}, "Signed": {"version": -1, "spec_version": "0.0.0"}, "Key": {"keyid": "a", "keytype": "foo", "scheme": "b", "keyval": "b"}, diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index 2aeadf1d09..7d1099fcb9 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -37,7 +37,7 @@ class TestSerialization(unittest.TestCase): """Test serialization for all classes in 'tuf/api/metadata.py'.""" - invalid_metadata: utils.DataSet = { + invalid_metadata = { "no signatures field": b'{"signed": \ { "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}} \ @@ -55,7 +55,7 @@ def test_invalid_metadata_serialization(self, test_data: bytes) -> None: with self.assertRaises(DeserializationError): Metadata.from_bytes(test_data) - valid_metadata: utils.DataSet = { + valid_metadata = { "multiple signatures": b'{ \ "signed": \ { "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ @@ -90,7 +90,7 @@ def test_valid_metadata_serialization(self, test_case_data: bytes) -> None: self.assertEqual(test_bytes, md.to_bytes()) - invalid_signatures: utils.DataSet = { + invalid_signatures = { "missing keyid attribute in a signature": '{ "sig": "abc" }', "missing sig attribute in a signature": '{ "keyid": "id" }', } @@ -101,7 +101,7 @@ def test_invalid_signature_serialization(self, test_data: str) -> None: with self.assertRaises(KeyError): Signature.from_dict(case_dict) - valid_signatures: utils.DataSet = { + valid_signatures = { "all": '{ "keyid": "id", "sig": "b"}', "unrecognized fields": '{ "keyid": "id", "sig": "b", "foo": "bar"}', } @@ -114,7 +114,7 @@ def test_signature_serialization(self, test_case_data: str) -> None: # Snapshot instances with meta = {} are valid, but for a full valid # repository it's required that meta has at least one element inside it. - invalid_signed: utils.DataSet = { + invalid_signed = { "no _type": '{"spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no spec_version": '{"_type": "snapshot", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no version": '{"_type": "snapshot", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}', @@ -138,7 +138,7 @@ def test_invalid_signed_serialization(self, test_case_data: str) -> None: with self.assertRaises((KeyError, ValueError, TypeError)): Snapshot.from_dict(case_dict) - valid_keys: utils.DataSet = { + valid_keys = { "all": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \ "keyval": {"public": "foo"}}', "unrecognized field": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \ @@ -153,7 +153,7 @@ def test_valid_key_serialization(self, test_case_data: str) -> None: key = Key.from_dict("id", copy.copy(case_dict)) self.assertDictEqual(case_dict, key.to_dict()) - invalid_keys: utils.DataSet = { + invalid_keys = { "no keyid": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "abc"}}', "no keytype": '{"keyid": "id", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}', "no scheme": '{"keyid": "id", "keytype": "rsa", "keyval": {"public": "foo"}}', @@ -171,7 +171,7 @@ def test_invalid_key_serialization(self, test_case_data: str) -> None: keyid = case_dict.pop("keyid") Key.from_dict(keyid, case_dict) - invalid_roles: utils.DataSet = { + invalid_roles = { "no threshold": '{"keyids": ["keyid"]}', "no keyids": '{"threshold": 3}', "wrong threshold type": '{"keyids": ["keyid"], "threshold": "a"}', @@ -186,7 +186,7 @@ def test_invalid_role_serialization(self, test_case_data: str) -> None: with self.assertRaises((KeyError, TypeError, ValueError)): Role.from_dict(case_dict) - valid_roles: utils.DataSet = { + valid_roles = { "all": '{"keyids": ["keyid"], "threshold": 3}', "many keyids": '{"keyids": ["a", "b", "c", "d", "e"], "threshold": 1}', "ordered keyids": '{"keyids": ["c", "b", "a"], "threshold": 1}', @@ -200,7 +200,7 @@ def test_role_serialization(self, test_case_data: str) -> None: role = Role.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, role.to_dict()) - valid_roots: utils.DataSet = { + valid_roots = { "all": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \ "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \ "keys": { \ @@ -248,7 +248,7 @@ def test_root_serialization(self, test_case_data: str) -> None: root = Root.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, root.to_dict()) - invalid_roots: utils.DataSet = { + invalid_roots = { "invalid role name": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \ "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \ "keys": { \ @@ -293,7 +293,7 @@ def test_invalid_root_serialization(self, test_case_data: str) -> None: with self.assertRaises(ValueError): Root.from_dict(case_dict) - invalid_metafiles: utils.DataSet = { + invalid_metafiles = { "wrong length type": '{"version": 1, "length": "a", "hashes": {"sha256" : "abc"}}', "version 0": '{"version": 0, "length": 1, "hashes": {"sha256" : "abc"}}', "length below 0": '{"version": 1, "length": -1, "hashes": {"sha256" : "abc"}}', @@ -308,7 +308,7 @@ def test_invalid_metafile_serialization(self, test_case_data: str) -> None: with self.assertRaises((TypeError, ValueError, AttributeError)): MetaFile.from_dict(case_dict) - valid_metafiles: utils.DataSet = { + valid_metafiles = { "all": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1}', "no length": '{"hashes": {"sha256" : "abc"}, "version": 1 }', "length 0": '{"version": 1, "length": 0, "hashes": {"sha256" : "abc"}}', @@ -323,7 +323,7 @@ def test_metafile_serialization(self, test_case_data: str) -> None: metafile = MetaFile.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, metafile.to_dict()) - invalid_timestamps: utils.DataSet = { + invalid_timestamps = { "no metafile": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z"}', } @@ -333,7 +333,7 @@ def test_invalid_timestamp_serialization(self, test_case_data: str) -> None: with self.assertRaises((ValueError, KeyError)): Timestamp.from_dict(case_dict) - valid_timestamps: utils.DataSet = { + valid_timestamps = { "all": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}}', "legacy spec_version": '{ "_type": "timestamp", "spec_version": "1.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ @@ -348,7 +348,7 @@ def test_timestamp_serialization(self, test_case_data: str) -> None: timestamp = Timestamp.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, timestamp.to_dict()) - valid_snapshots: utils.DataSet = { + valid_snapshots = { "all": '{ "_type": "snapshot", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": { \ "file1.txt": {"hashes": {"sha256" : "abc"}, "version": 1}, \ @@ -367,7 +367,7 @@ def test_snapshot_serialization(self, test_case_data: str) -> None: snapshot = Snapshot.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, snapshot.to_dict()) - valid_delegated_roles: utils.DataSet = { + valid_delegated_roles = { # DelegatedRole inherits Role and some use cases can be found in the valid_roles. "no hash prefix attribute": '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ "terminating": false, "threshold": 1}', @@ -390,7 +390,7 @@ def test_delegated_role_serialization(self, test_case_data: str) -> None: deserialized_role = DelegatedRole.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, deserialized_role.to_dict()) - invalid_delegated_roles: utils.DataSet = { + invalid_delegated_roles = { # DelegatedRole inherits Role and some use cases can be found in the invalid_roles. "missing hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', "both hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false, \ @@ -409,7 +409,7 @@ def test_invalid_delegated_role_serialization( with self.assertRaises(ValueError): DelegatedRole.from_dict(case_dict) - valid_succinct_roles: utils.DataSet = { + valid_succinct_roles = { # SuccinctRoles inherits Role and some use cases can be found in the valid_roles. "standard succinct_roles information": '{"keyids": ["keyid"], "threshold": 1, \ "bit_length": 8, "name_prefix": "foo"}', @@ -423,7 +423,7 @@ def test_succinct_roles_serialization(self, test_case_data: str) -> None: succinct_roles = SuccinctRoles.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, succinct_roles.to_dict()) - invalid_succinct_roles: utils.DataSet = { + invalid_succinct_roles = { # SuccinctRoles inherits Role and some use cases can be found in the invalid_roles. "missing bit_length from succinct_roles": '{"keyids": ["keyid"], "threshold": 1, "name_prefix": "foo"}', "missing name_prefix from succinct_roles": '{"keyids": ["keyid"], "threshold": 1, "bit_length": 8}', @@ -439,7 +439,7 @@ def test_invalid_succinct_roles_serialization(self, test_data: str) -> None: with self.assertRaises((ValueError, KeyError, TypeError)): SuccinctRoles.from_dict(case_dict) - invalid_delegations: utils.DataSet = { + invalid_delegations = { "empty delegations": "{}", "missing keys": '{ "roles": [ \ {"keyids": ["keyid"], "name": "a", "terminating": true, "paths": ["fn1"], "threshold": 3}, \ @@ -507,7 +507,7 @@ def test_invalid_delegation_serialization( with self.assertRaises((ValueError, KeyError, AttributeError)): Delegations.from_dict(case_dict) - valid_delegations: utils.DataSet = { + valid_delegations = { "with roles": '{"keys": { \ "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \ "keyid2" : {"keytype": "ed25519", "scheme": "ed25519", "keyval": {"public": "bar"}}}, \ @@ -533,7 +533,7 @@ def test_delegation_serialization(self, test_case_data: str) -> None: delegation = Delegations.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, delegation.to_dict()) - invalid_targetfiles: utils.DataSet = { + invalid_targetfiles = { "no hashes": '{"length": 1}', "no length": '{"hashes": {"sha256": "abc"}}', # The remaining cases are the same as for invalid_hashes and @@ -548,7 +548,7 @@ def test_invalid_targetfile_serialization( with self.assertRaises(KeyError): TargetFile.from_dict(case_dict, "file1.txt") - valid_targetfiles: utils.DataSet = { + valid_targetfiles = { "all": '{"length": 12, "hashes": {"sha256" : "abc"}, \ "custom" : {"foo": "bar"} }', "no custom": '{"length": 12, "hashes": {"sha256" : "abc"}}', @@ -562,7 +562,7 @@ def test_targetfile_serialization(self, test_case_data: str) -> None: target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt") self.assertDictEqual(case_dict, target_file.to_dict()) - valid_targets: utils.DataSet = { + valid_targets = { "all attributes": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "targets": { \ "file.txt": {"length": 12, "hashes": {"sha256" : "abc"} }, \ diff --git a/tests/test_repository.py b/tests/test_repository.py index 977f381d53..f5179e52fd 100644 --- a/tests/test_repository.py +++ b/tests/test_repository.py @@ -3,6 +3,8 @@ """Tests for tuf.repository module""" +from __future__ import annotations + import copy import logging import sys diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 3dc2437c5b..076a205cc2 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -1,11 +1,13 @@ """Unit tests for 'tuf/ngclient/_internal/trusted_metadata_set.py'.""" +from __future__ import annotations + import logging import os import sys import unittest from datetime import datetime, timezone -from typing import Callable, ClassVar, Optional +from typing import Callable, ClassVar from securesystemslib.signer import Signer @@ -104,8 +106,8 @@ def setUp(self) -> None: def _update_all_besides_targets( self, - timestamp_bytes: Optional[bytes] = None, - snapshot_bytes: Optional[bytes] = None, + timestamp_bytes: bytes | None = None, + snapshot_bytes: bytes | None = None, ) -> None: """Update all metadata roles besides targets. diff --git a/tests/test_updater_consistent_snapshot.py b/tests/test_updater_consistent_snapshot.py index 998d852296..35497864f9 100644 --- a/tests/test_updater_consistent_snapshot.py +++ b/tests/test_updater_consistent_snapshot.py @@ -3,12 +3,13 @@ """Test ngclient Updater toggling consistent snapshot""" +from __future__ import annotations + import os import sys import tempfile import unittest -from collections.abc import Iterable -from typing import Any, Optional +from typing import TYPE_CHECKING, Any from tests import utils from tests.repository_simulator import RepositorySimulator @@ -21,6 +22,9 @@ ) from tuf.ngclient import Updater +if TYPE_CHECKING: + from collections.abc import Iterable + class TestConsistentSnapshot(unittest.TestCase): """Test different combinations of 'consistent_snapshot' and @@ -28,7 +32,7 @@ class TestConsistentSnapshot(unittest.TestCase): are formed for each combination""" # set dump_dir to trigger repository state dumps - dump_dir: Optional[str] = None + dump_dir: str | None = None def setUp(self) -> None: self.subtest_count = 0 @@ -98,7 +102,7 @@ def _assert_targets_files_exist(self, filenames: Iterable[str]) -> None: for filename in filenames: self.assertIn(filename, local_target_files) - top_level_roles_data: utils.DataSet = { + top_level_roles_data = { "consistent_snaphot disabled": { "consistent_snapshot": False, "calls": [ @@ -143,7 +147,7 @@ def test_top_level_roles_update( finally: self.teardown_subtest() - delegated_roles_data: utils.DataSet = { + delegated_roles_data = { "consistent_snaphot disabled": { "consistent_snapshot": False, "expected_version": None, @@ -162,7 +166,7 @@ def test_delegated_roles_update( # the correct version prefix, depending on 'consistent_snapshot' config try: consistent_snapshot: bool = test_case_data["consistent_snapshot"] - exp_version: Optional[int] = test_case_data["expected_version"] + exp_version: int | None = test_case_data["expected_version"] rolenames = ["role1", "..", "."] exp_calls = [(role, exp_version) for role in rolenames] @@ -190,7 +194,7 @@ def test_delegated_roles_update( finally: self.teardown_subtest() - targets_download_data: utils.DataSet = { + targets_download_data = { "consistent_snaphot disabled": { "consistent_snapshot": False, "prefix_targets": True, @@ -219,7 +223,7 @@ def test_download_targets(self, test_case_data: dict[str, Any]) -> None: try: consistent_snapshot: bool = test_case_data["consistent_snapshot"] prefix_targets_with_hash: bool = test_case_data["prefix_targets"] - hash_algo: Optional[str] = test_case_data["hash_algo"] + hash_algo: str | None = test_case_data["hash_algo"] targetpaths: list[str] = test_case_data["targetpaths"] self.setup_subtest(consistent_snapshot, prefix_targets_with_hash) diff --git a/tests/test_updater_delegation_graphs.py b/tests/test_updater_delegation_graphs.py index f801cbffd5..dbdd16fb79 100644 --- a/tests/test_updater_delegation_graphs.py +++ b/tests/test_updater_delegation_graphs.py @@ -4,13 +4,14 @@ """Test updating delegated targets roles and searching for target files with various delegation graphs""" +from __future__ import annotations + import os import sys import tempfile import unittest -from collections.abc import Iterable from dataclasses import astuple, dataclass, field -from typing import Optional +from typing import TYPE_CHECKING from tests import utils from tests.repository_simulator import RepositorySimulator @@ -23,6 +24,9 @@ ) from tuf.ngclient import Updater +if TYPE_CHECKING: + from collections.abc import Iterable + @dataclass class TestDelegation: @@ -31,8 +35,8 @@ class TestDelegation: keyids: list[str] = field(default_factory=list) threshold: int = 1 terminating: bool = False - paths: Optional[list[str]] = field(default_factory=lambda: ["*"]) - path_hash_prefixes: Optional[list[str]] = None + paths: list[str] | None = field(default_factory=lambda: ["*"]) + path_hash_prefixes: list[str] | None = None @dataclass @@ -63,7 +67,7 @@ class TestDelegations(unittest.TestCase): """Base class for delegation tests""" # set dump_dir to trigger repository state dumps - dump_dir: Optional[str] = None + dump_dir: str | None = None def setUp(self) -> None: self.subtest_count = 0 @@ -139,7 +143,7 @@ class TestDelegationsGraphs(TestDelegations): """Test creating delegations graphs with different complexity and successfully updating the delegated roles metadata""" - graphs: utils.DataSet = { + graphs = { "basic delegation": DelegationsTestCase( delegations=[TestDelegation("targets", "A")], visited_order=["A"], @@ -287,7 +291,7 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None: finally: self.teardown_subtest() - invalid_metadata: utils.DataSet = { + invalid_metadata = { "unsigned delegated role": DelegationsTestCase( delegations=[ TestDelegation("targets", "invalid"), @@ -360,7 +364,7 @@ def test_safely_encoded_rolenames(self) -> None: exp_calls = [(quoted[:-5], 1) for quoted in roles_to_filenames.values()] self.assertListEqual(self.sim.fetch_tracker.metadata, exp_calls) - hash_bins_graph: utils.DataSet = { + hash_bins_graph = { "delegations": DelegationsTestCase( delegations=[ TestDelegation( @@ -432,7 +436,7 @@ class SuccinctRolesTestCase: # By setting the bit_length the total number of bins is 2^bit_length. # In each test case target_path is a path to a random target we want to # fetch and expected_target_bin is the bin we are expecting to visit. - succinct_bins_graph: utils.DataSet = { + succinct_bins_graph = { "bin amount = 2, taget bin index 0": SuccinctRolesTestCase( bit_length=1, target_path="boo", @@ -544,7 +548,7 @@ def setUp(self) -> None: self._init_repo(self.delegations_tree) # fmt: off - targets: utils.DataSet = { + targets = { "no delegations": TargetTestCase("targetfile", True, []), "targetpath matches wildcard": diff --git a/tests/test_updater_fetch_target.py b/tests/test_updater_fetch_target.py index 612f8131e0..5304843fab 100644 --- a/tests/test_updater_fetch_target.py +++ b/tests/test_updater_fetch_target.py @@ -66,7 +66,7 @@ def _init_updater(self) -> Updater: self.sim, ) - targets: utils.DataSet = { + targets = { "standard case": TestTarget( path="targetpath", content=b"target content", diff --git a/tests/test_updater_key_rotations.py b/tests/test_updater_key_rotations.py index c0831dc042..b78113d67d 100644 --- a/tests/test_updater_key_rotations.py +++ b/tests/test_updater_key_rotations.py @@ -3,12 +3,14 @@ """Test ngclient Updater key rotation handling""" +from __future__ import annotations + import os import sys import tempfile import unittest from dataclasses import dataclass -from typing import ClassVar, Optional +from typing import ClassVar from securesystemslib.signer import CryptoSigner, Signer @@ -25,14 +27,14 @@ class MdVersion: keys: list[int] threshold: int sigs: list[int] - res: Optional[type[Exception]] = None + res: type[Exception] | None = None class TestUpdaterKeyRotations(unittest.TestCase): """Test ngclient root rotation handling""" # set dump_dir to trigger repository state dumps - dump_dir: Optional[str] = None + dump_dir: str | None = None temp_dir: ClassVar[tempfile.TemporaryDirectory] keys: ClassVar[list[Key]] signers: ClassVar[list[Signer]] diff --git a/tests/test_updater_ng.py b/tests/test_updater_ng.py index 6f24dfd810..f42e510b1e 100644 --- a/tests/test_updater_ng.py +++ b/tests/test_updater_ng.py @@ -3,6 +3,8 @@ """Test Updater class""" +from __future__ import annotations + import logging import os import shutil diff --git a/tests/utils.py b/tests/utils.py index 26774b6ee0..e020684d49 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -18,6 +18,8 @@ Provide common utilities for TUF tests """ +from __future__ import annotations + import argparse import errno import logging @@ -28,11 +30,13 @@ import sys import threading import time -import unittest import warnings -from collections.abc import Iterator from contextlib import contextmanager -from typing import IO, Any, Callable, Optional +from typing import IO, TYPE_CHECKING, Any, Callable + +if TYPE_CHECKING: + import unittest + from collections.abc import Iterator logger = logging.getLogger(__name__) @@ -42,15 +46,12 @@ # Used when forming URLs on the client side TEST_HOST_ADDRESS = "127.0.0.1" -# DataSet is only here so type hints can be used. -DataSet = dict[str, Any] - # Test runner decorator: Runs the test as a set of N SubTests, # (where N is number of items in dataset), feeding the actual test # function one test case at a time def run_sub_tests_with_dataset( - dataset: DataSet, + dataset: dict[str, Any], ) -> Callable[[Callable], Callable]: """Decorator starting a unittest.TestCase.subtest() for each of the cases in dataset""" @@ -103,7 +104,7 @@ def wait_for_server( succeeded = False while not succeeded and remaining_timeout > 0: try: - sock: Optional[socket.socket] = socket.socket( + sock: socket.socket | None = socket.socket( socket.AF_INET, socket.SOCK_STREAM ) assert sock is not None @@ -185,14 +186,14 @@ def __init__( server: str = os.path.join(TESTS_DIR, "simple_server.py"), timeout: int = 10, popen_cwd: str = ".", - extra_cmd_args: Optional[list[str]] = None, + extra_cmd_args: list[str] | None = None, ): self.server = server self.__logger = log # Stores popped messages from the queue. self.__logged_messages: list[str] = [] - self.__server_process: Optional[subprocess.Popen] = None - self._log_queue: Optional[queue.Queue] = None + self.__server_process: subprocess.Popen | None = None + self._log_queue: queue.Queue | None = None self.port = -1 if extra_cmd_args is None: extra_cmd_args = [] diff --git a/tuf/api/_payload.py b/tuf/api/_payload.py index 998705846b..264519a133 100644 --- a/tuf/api/_payload.py +++ b/tuf/api/_payload.py @@ -4,20 +4,20 @@ """Helper classes for low-level Metadata API.""" +from __future__ import annotations + import abc import fnmatch import io import logging -from collections.abc import Iterator from dataclasses import dataclass from datetime import datetime, timezone from typing import ( IO, + TYPE_CHECKING, Any, ClassVar, - Optional, TypeVar, - Union, ) from securesystemslib import exceptions as sslib_exceptions @@ -26,6 +26,9 @@ from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError +if TYPE_CHECKING: + from collections.abc import Iterator + _ROOT = "root" _SNAPSHOT = "snapshot" _TARGETS = "targets" @@ -97,10 +100,10 @@ def expires(self, value: datetime) -> None: # or "inner metadata") def __init__( self, - version: Optional[int], - spec_version: Optional[str], - expires: Optional[datetime], - unrecognized_fields: Optional[dict[str, Any]], + version: int | None, + spec_version: str | None, + expires: datetime | None, + unrecognized_fields: dict[str, Any] | None, ): if spec_version is None: spec_version = ".".join(SPECIFICATION_VERSION) @@ -149,7 +152,7 @@ def to_dict(self) -> dict[str, Any]: @classmethod @abc.abstractmethod - def from_dict(cls, signed_dict: dict[str, Any]) -> "Signed": + def from_dict(cls, signed_dict: dict[str, Any]) -> Signed: """Deserialization helper, creates object from json/dict representation. """ @@ -198,7 +201,7 @@ def _common_fields_to_dict(self) -> dict[str, Any]: **self.unrecognized_fields, } - def is_expired(self, reference_time: Optional[datetime] = None) -> bool: + def is_expired(self, reference_time: datetime | None = None) -> bool: """Check metadata expiration against a reference time. Args: @@ -237,7 +240,7 @@ def __init__( self, keyids: list[str], threshold: int, - unrecognized_fields: Optional[dict[str, Any]] = None, + unrecognized_fields: dict[str, Any] | None = None, ): if len(set(keyids)) != len(keyids): raise ValueError(f"Nonunique keyids: {keyids}") @@ -261,7 +264,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, role_dict: dict[str, Any]) -> "Role": + def from_dict(cls, role_dict: dict[str, Any]) -> Role: """Create ``Role`` object from its json/dict representation. Raises: @@ -481,13 +484,13 @@ class Root(Signed, _DelegatorMixin): def __init__( self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - keys: Optional[dict[str, Key]] = None, - roles: Optional[dict[str, Role]] = None, - consistent_snapshot: Optional[bool] = True, - unrecognized_fields: Optional[dict[str, Any]] = None, + version: int | None = None, + spec_version: str | None = None, + expires: datetime | None = None, + keys: dict[str, Key] | None = None, + roles: dict[str, Role] | None = None, + consistent_snapshot: bool | None = True, + unrecognized_fields: dict[str, Any] | None = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.consistent_snapshot = consistent_snapshot @@ -511,7 +514,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, signed_dict: dict[str, Any]) -> "Root": + def from_dict(cls, signed_dict: dict[str, Any]) -> Root: """Create ``Root`` object from its json/dict representation. Raises: @@ -609,7 +612,7 @@ def get_key(self, keyid: str) -> Key: def get_root_verification_result( self, - previous: Optional["Root"], + previous: Root | None, payload: bytes, signatures: dict[str, Signature], ) -> RootVerificationResult: @@ -656,7 +659,7 @@ class BaseFile: @staticmethod def _verify_hashes( - data: Union[bytes, IO[bytes]], expected_hashes: dict[str, str] + data: bytes | IO[bytes], expected_hashes: dict[str, str] ) -> None: """Verify that the hash of ``data`` matches ``expected_hashes``.""" is_bytes = isinstance(data, bytes) @@ -684,9 +687,7 @@ def _verify_hashes( ) @staticmethod - def _verify_length( - data: Union[bytes, IO[bytes]], expected_length: int - ) -> None: + def _verify_length(data: bytes | IO[bytes], expected_length: int) -> None: """Verify that the length of ``data`` matches ``expected_length``.""" if isinstance(data, bytes): observed_length = len(data) @@ -716,7 +717,7 @@ def _validate_length(length: int) -> None: @staticmethod def _get_length_and_hashes( - data: Union[bytes, IO[bytes]], hash_algorithms: Optional[list[str]] + data: bytes | IO[bytes], hash_algorithms: list[str] | None ) -> tuple[int, dict[str, str]]: """Calculate length and hashes of ``data``.""" if isinstance(data, bytes): @@ -771,9 +772,9 @@ class MetaFile(BaseFile): def __init__( self, version: int = 1, - length: Optional[int] = None, - hashes: Optional[dict[str, str]] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + length: int | None = None, + hashes: dict[str, str] | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): if version <= 0: raise ValueError(f"Metafile version must be > 0, got {version}") @@ -802,7 +803,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, meta_dict: dict[str, Any]) -> "MetaFile": + def from_dict(cls, meta_dict: dict[str, Any]) -> MetaFile: """Create ``MetaFile`` object from its json/dict representation. Raises: @@ -819,9 +820,9 @@ def from_dict(cls, meta_dict: dict[str, Any]) -> "MetaFile": def from_data( cls, version: int, - data: Union[bytes, IO[bytes]], + data: bytes | IO[bytes], hash_algorithms: list[str], - ) -> "MetaFile": + ) -> MetaFile: """Creates MetaFile object from bytes. This constructor should only be used if hashes are wanted. By default, MetaFile(ver) should be used. @@ -853,7 +854,7 @@ def to_dict(self) -> dict[str, Any]: return res_dict - def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: + def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None: """Verify that the length and hashes of ``data`` match expected values. Args: @@ -898,11 +899,11 @@ class Timestamp(Signed): def __init__( self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - snapshot_meta: Optional[MetaFile] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + version: int | None = None, + spec_version: str | None = None, + expires: datetime | None = None, + snapshot_meta: MetaFile | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.snapshot_meta = snapshot_meta or MetaFile(1) @@ -916,7 +917,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, signed_dict: dict[str, Any]) -> "Timestamp": + def from_dict(cls, signed_dict: dict[str, Any]) -> Timestamp: """Create ``Timestamp`` object from its json/dict representation. Raises: @@ -961,11 +962,11 @@ class Snapshot(Signed): def __init__( self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - meta: Optional[dict[str, MetaFile]] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + version: int | None = None, + spec_version: str | None = None, + expires: datetime | None = None, + meta: dict[str, MetaFile] | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): super().__init__(version, spec_version, expires, unrecognized_fields) self.meta = meta if meta is not None else {"targets.json": MetaFile(1)} @@ -977,7 +978,7 @@ def __eq__(self, other: object) -> bool: return super().__eq__(other) and self.meta == other.meta @classmethod - def from_dict(cls, signed_dict: dict[str, Any]) -> "Snapshot": + def from_dict(cls, signed_dict: dict[str, Any]) -> Snapshot: """Create ``Snapshot`` object from its json/dict representation. Raises: @@ -1038,9 +1039,9 @@ def __init__( keyids: list[str], threshold: int, terminating: bool, - paths: Optional[list[str]] = None, - path_hash_prefixes: Optional[list[str]] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + paths: list[str] | None = None, + path_hash_prefixes: list[str] | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): super().__init__(keyids, threshold, unrecognized_fields) self.name = name @@ -1074,7 +1075,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, role_dict: dict[str, Any]) -> "DelegatedRole": + def from_dict(cls, role_dict: dict[str, Any]) -> DelegatedRole: """Create ``DelegatedRole`` object from its json/dict representation. Raises: @@ -1200,7 +1201,7 @@ def __init__( threshold: int, bit_length: int, name_prefix: str, - unrecognized_fields: Optional[dict[str, Any]] = None, + unrecognized_fields: dict[str, Any] | None = None, ) -> None: super().__init__(keyids, threshold, unrecognized_fields) @@ -1232,7 +1233,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, role_dict: dict[str, Any]) -> "SuccinctRoles": + def from_dict(cls, role_dict: dict[str, Any]) -> SuccinctRoles: """Create ``SuccinctRoles`` object from its json/dict representation. Raises: @@ -1340,9 +1341,9 @@ class Delegations: def __init__( self, keys: dict[str, Key], - roles: Optional[dict[str, DelegatedRole]] = None, - succinct_roles: Optional[SuccinctRoles] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + roles: dict[str, DelegatedRole] | None = None, + succinct_roles: SuccinctRoles | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): self.keys = keys if sum(1 for v in [roles, succinct_roles] if v is not None) != 1: @@ -1384,7 +1385,7 @@ def __eq__(self, other: object) -> bool: return all_attributes_check @classmethod - def from_dict(cls, delegations_dict: dict[str, Any]) -> "Delegations": + def from_dict(cls, delegations_dict: dict[str, Any]) -> Delegations: """Create ``Delegations`` object from its json/dict representation. Raises: @@ -1395,7 +1396,7 @@ def from_dict(cls, delegations_dict: dict[str, Any]) -> "Delegations": for keyid, key_dict in keys.items(): keys_res[keyid] = Key.from_dict(keyid, key_dict) roles = delegations_dict.pop("roles", None) - roles_res: Optional[dict[str, DelegatedRole]] = None + roles_res: dict[str, DelegatedRole] | None = None if roles is not None: roles_res = {} @@ -1472,7 +1473,7 @@ def __init__( length: int, hashes: dict[str, str], path: str, - unrecognized_fields: Optional[dict[str, Any]] = None, + unrecognized_fields: dict[str, Any] | None = None, ): self._validate_length(length) self._validate_hashes(hashes) @@ -1505,7 +1506,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, target_dict: dict[str, Any], path: str) -> "TargetFile": + def from_dict(cls, target_dict: dict[str, Any], path: str) -> TargetFile: """Create ``TargetFile`` object from its json/dict representation. Raises: @@ -1530,8 +1531,8 @@ def from_file( cls, target_file_path: str, local_path: str, - hash_algorithms: Optional[list[str]] = None, - ) -> "TargetFile": + hash_algorithms: list[str] | None = None, + ) -> TargetFile: """Create ``TargetFile`` object from a file. Args: @@ -1553,9 +1554,9 @@ def from_file( def from_data( cls, target_file_path: str, - data: Union[bytes, IO[bytes]], - hash_algorithms: Optional[list[str]] = None, - ) -> "TargetFile": + data: bytes | IO[bytes], + hash_algorithms: list[str] | None = None, + ) -> TargetFile: """Create ``TargetFile`` object from bytes. Args: @@ -1572,7 +1573,7 @@ def from_data( length, hashes = cls._get_length_and_hashes(data, hash_algorithms) return cls(length, hashes, target_file_path) - def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: + def verify_length_and_hashes(self, data: bytes | IO[bytes]) -> None: """Verify that length and hashes of ``data`` match expected values. Args: @@ -1626,12 +1627,12 @@ class Targets(Signed, _DelegatorMixin): def __init__( self, - version: Optional[int] = None, - spec_version: Optional[str] = None, - expires: Optional[datetime] = None, - targets: Optional[dict[str, TargetFile]] = None, - delegations: Optional[Delegations] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + version: int | None = None, + spec_version: str | None = None, + expires: datetime | None = None, + targets: dict[str, TargetFile] | None = None, + delegations: Delegations | None = None, + unrecognized_fields: dict[str, Any] | None = None, ) -> None: super().__init__(version, spec_version, expires, unrecognized_fields) self.targets = targets if targets is not None else {} @@ -1648,7 +1649,7 @@ def __eq__(self, other: object) -> bool: ) @classmethod - def from_dict(cls, signed_dict: dict[str, Any]) -> "Targets": + def from_dict(cls, signed_dict: dict[str, Any]) -> Targets: """Create ``Targets`` object from its json/dict representation. Raises: @@ -1681,7 +1682,7 @@ def to_dict(self) -> dict[str, Any]: targets_dict["delegations"] = self.delegations.to_dict() return targets_dict - def add_key(self, key: Key, role: Optional[str] = None) -> None: + def add_key(self, key: Key, role: str | None = None) -> None: """Add new signing key for delegated role ``role``. If succinct_roles is used then the ``role`` argument is not required. @@ -1713,7 +1714,7 @@ def add_key(self, key: Key, role: Optional[str] = None) -> None: self.delegations.keys[key.keyid] = key - def revoke_key(self, keyid: str, role: Optional[str] = None) -> None: + def revoke_key(self, keyid: str, role: str | None = None) -> None: """Revokes key from delegated role ``role`` and updates the delegations key store. @@ -1760,7 +1761,7 @@ def get_delegated_role(self, delegated_role: str) -> Role: if self.delegations is None: raise ValueError("No delegations found") - role: Optional[Role] = None + role: Role | None = None if self.delegations.roles is not None: role = self.delegations.roles.get(delegated_role) elif self.delegations.succinct_roles is not None: diff --git a/tuf/api/dsse.py b/tuf/api/dsse.py index 7834798e14..d027d14013 100644 --- a/tuf/api/dsse.py +++ b/tuf/api/dsse.py @@ -1,5 +1,7 @@ """Low-level TUF DSSE API. (experimental!)""" +from __future__ import annotations + import json from typing import Generic, cast @@ -55,7 +57,7 @@ class SimpleEnvelope(Generic[T], BaseSimpleEnvelope): DEFAULT_PAYLOAD_TYPE = "application/vnd.tuf+json" @classmethod - def from_bytes(cls, data: bytes) -> "SimpleEnvelope[T]": + def from_bytes(cls, data: bytes) -> SimpleEnvelope[T]: """Load envelope from JSON bytes. NOTE: Unlike ``tuf.api.metadata.Metadata.from_bytes``, this method @@ -102,7 +104,7 @@ def to_bytes(self) -> bytes: return json_bytes @classmethod - def from_signed(cls, signed: T) -> "SimpleEnvelope[T]": + def from_signed(cls, signed: T) -> SimpleEnvelope[T]: """Serialize payload as JSON bytes and wrap in envelope. Args: diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index ed54230dab..76b5ce0fde 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -30,9 +30,11 @@ `examples/repository `_. """ +from __future__ import annotations + import logging import tempfile -from typing import Any, Generic, Optional, cast +from typing import TYPE_CHECKING, Any, Generic, cast from securesystemslib.signer import Signature, Signer from securesystemslib.storage import FilesystemBackend, StorageBackendInterface @@ -65,11 +67,13 @@ VerificationResult, ) from tuf.api.exceptions import UnsignedMetadataError -from tuf.api.serialization import ( - MetadataDeserializer, - MetadataSerializer, - SignedSerializer, -) + +if TYPE_CHECKING: + from tuf.api.serialization import ( + MetadataDeserializer, + MetadataSerializer, + SignedSerializer, + ) logger = logging.getLogger(__name__) @@ -121,8 +125,8 @@ class Metadata(Generic[T]): def __init__( self, signed: T, - signatures: Optional[dict[str, Signature]] = None, - unrecognized_fields: Optional[dict[str, Any]] = None, + signatures: dict[str, Signature] | None = None, + unrecognized_fields: dict[str, Any] | None = None, ): self.signed: T = signed self.signatures = signatures if signatures is not None else {} @@ -153,7 +157,7 @@ def signed_bytes(self) -> bytes: return CanonicalJSONSerializer().serialize(self.signed) @classmethod - def from_dict(cls, metadata: dict[str, Any]) -> "Metadata[T]": + def from_dict(cls, metadata: dict[str, Any]) -> Metadata[T]: """Create ``Metadata`` object from its json/dict representation. Args: @@ -205,9 +209,9 @@ def from_dict(cls, metadata: dict[str, Any]) -> "Metadata[T]": def from_file( cls, filename: str, - deserializer: Optional[MetadataDeserializer] = None, - storage_backend: Optional[StorageBackendInterface] = None, - ) -> "Metadata[T]": + deserializer: MetadataDeserializer | None = None, + storage_backend: StorageBackendInterface | None = None, + ) -> Metadata[T]: """Load TUF metadata from file storage. Args: @@ -238,8 +242,8 @@ def from_file( def from_bytes( cls, data: bytes, - deserializer: Optional[MetadataDeserializer] = None, - ) -> "Metadata[T]": + deserializer: MetadataDeserializer | None = None, + ) -> Metadata[T]: """Load TUF metadata from raw data. Args: @@ -263,9 +267,7 @@ def from_bytes( return deserializer.deserialize(data) - def to_bytes( - self, serializer: Optional[MetadataSerializer] = None - ) -> bytes: + def to_bytes(self, serializer: MetadataSerializer | None = None) -> bytes: """Return the serialized TUF file format as bytes. Note that if bytes are first deserialized into ``Metadata`` and then @@ -306,8 +308,8 @@ def to_dict(self) -> dict[str, Any]: def to_file( self, filename: str, - serializer: Optional[MetadataSerializer] = None, - storage_backend: Optional[StorageBackendInterface] = None, + serializer: MetadataSerializer | None = None, + storage_backend: StorageBackendInterface | None = None, ) -> None: """Write TUF metadata to file storage. @@ -345,7 +347,7 @@ def sign( self, signer: Signer, append: bool = False, - signed_serializer: Optional[SignedSerializer] = None, + signed_serializer: SignedSerializer | None = None, ) -> Signature: """Create signature over ``signed`` and assigns it to ``signatures``. @@ -388,8 +390,8 @@ def sign( def verify_delegate( self, delegated_role: str, - delegated_metadata: "Metadata", - signed_serializer: Optional[SignedSerializer] = None, + delegated_metadata: Metadata, + signed_serializer: SignedSerializer | None = None, ) -> None: """Verify that ``delegated_metadata`` is signed with the required threshold of keys for ``delegated_role``. diff --git a/tuf/ngclient/_internal/requests_fetcher.py b/tuf/ngclient/_internal/requests_fetcher.py index 72269aa4ea..2f89e47ab4 100644 --- a/tuf/ngclient/_internal/requests_fetcher.py +++ b/tuf/ngclient/_internal/requests_fetcher.py @@ -9,9 +9,10 @@ # sigstore-python 1.0 still uses the module from there). requests_fetcher # can be moved out of _internal once sigstore-python 1.0 is not relevant. +from __future__ import annotations + import logging -from collections.abc import Iterator -from typing import Optional +from typing import TYPE_CHECKING from urllib import parse # Imports @@ -21,6 +22,9 @@ from tuf.api import exceptions from tuf.ngclient.fetcher import FetcherInterface +if TYPE_CHECKING: + from collections.abc import Iterator + # Globals logger = logging.getLogger(__name__) @@ -39,7 +43,7 @@ def __init__( self, socket_timeout: int = 30, chunk_size: int = 400000, - app_user_agent: Optional[str] = None, + app_user_agent: str | None = None, ) -> None: # http://docs.python-requests.org/en/master/user/advanced/#session-objects: # @@ -103,7 +107,7 @@ def _fetch(self, url: str) -> Iterator[bytes]: return self._chunks(response) - def _chunks(self, response: "requests.Response") -> Iterator[bytes]: + def _chunks(self, response: requests.Response) -> Iterator[bytes]: """A generator function to be returned by fetch. This way the caller of fetch can differentiate between connection diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index a178b318b6..3678ddf3a1 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -61,13 +61,12 @@ >>> trusted_set.update_snapshot(f.read()) """ +from __future__ import annotations + import datetime import logging from collections import abc -from collections.abc import Iterator -from typing import Optional, Union, cast - -from securesystemslib.signer import Signature +from typing import TYPE_CHECKING, Union, cast from tuf.api import exceptions from tuf.api.dsse import SimpleEnvelope @@ -82,6 +81,11 @@ ) from tuf.ngclient.config import EnvelopeType +if TYPE_CHECKING: + from collections.abc import Iterator + + from securesystemslib.signer import Signature + logger = logging.getLogger(__name__) Delegator = Union[Root, Targets] @@ -270,7 +274,7 @@ def _check_final_timestamp(self) -> None: raise exceptions.ExpiredMetadataError("timestamp.json is expired") def update_snapshot( - self, data: bytes, trusted: Optional[bool] = False + self, data: bytes, trusted: bool | None = False ) -> Snapshot: """Verify and load ``data`` as new snapshot metadata. @@ -402,7 +406,7 @@ def update_delegated_targets( # does not match meta version in timestamp self._check_final_snapshot() - delegator: Optional[Delegator] = self.get(delegator_name) + delegator: Delegator | None = self.get(delegator_name) if delegator is None: raise RuntimeError("Cannot load targets before delegator") @@ -453,8 +457,8 @@ def _load_trusted_root(self, data: bytes) -> None: def _load_from_metadata( role: type[T], data: bytes, - delegator: Optional[Delegator] = None, - role_name: Optional[str] = None, + delegator: Delegator | None = None, + role_name: str | None = None, ) -> tuple[T, bytes, dict[str, Signature]]: """Load traditional metadata bytes, and extract and verify payload. @@ -480,8 +484,8 @@ def _load_from_metadata( def _load_from_simple_envelope( role: type[T], data: bytes, - delegator: Optional[Delegator] = None, - role_name: Optional[str] = None, + delegator: Delegator | None = None, + role_name: str | None = None, ) -> tuple[T, bytes, dict[str, Signature]]: """Load simple envelope bytes, and extract and verify payload. diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index f9327610c2..51bda41f26 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -37,19 +37,23 @@ `_. """ +from __future__ import annotations + import contextlib import logging import os import shutil import tempfile -from typing import Optional, cast +from typing import TYPE_CHECKING, cast from urllib import parse from tuf.api import exceptions from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set from tuf.ngclient.config import EnvelopeType, UpdaterConfig -from tuf.ngclient.fetcher import FetcherInterface + +if TYPE_CHECKING: + from tuf.ngclient.fetcher import FetcherInterface logger = logging.getLogger(__name__) @@ -80,10 +84,10 @@ def __init__( self, metadata_dir: str, metadata_base_url: str, - target_dir: Optional[str] = None, - target_base_url: Optional[str] = None, - fetcher: Optional[FetcherInterface] = None, - config: Optional[UpdaterConfig] = None, + target_dir: str | None = None, + target_base_url: str | None = None, + fetcher: FetcherInterface | None = None, + config: UpdaterConfig | None = None, ): self._dir = metadata_dir self._metadata_base_url = _ensure_trailing_slash(metadata_base_url) @@ -153,7 +157,7 @@ def _generate_target_file_path(self, targetinfo: TargetFile) -> str: filename = parse.quote(targetinfo.path, "") return os.path.join(self.target_dir, filename) - def get_targetinfo(self, target_path: str) -> Optional[TargetFile]: + def get_targetinfo(self, target_path: str) -> TargetFile | None: """Return ``TargetFile`` instance with information for ``target_path``. The return value can be used as an argument to @@ -186,8 +190,8 @@ def get_targetinfo(self, target_path: str) -> Optional[TargetFile]: def find_cached_target( self, targetinfo: TargetFile, - filepath: Optional[str] = None, - ) -> Optional[str]: + filepath: str | None = None, + ) -> str | None: """Check whether a local file is an up to date target. Args: @@ -216,8 +220,8 @@ def find_cached_target( def download_target( self, targetinfo: TargetFile, - filepath: Optional[str] = None, - target_base_url: Optional[str] = None, + filepath: str | None = None, + target_base_url: str | None = None, ) -> str: """Download the target file specified by ``targetinfo``. @@ -275,7 +279,7 @@ def download_target( return filepath def _download_metadata( - self, rolename: str, length: int, version: Optional[int] = None + self, rolename: str, length: int, version: int | None = None ) -> bytes: """Download a metadata file and return it as bytes.""" encoded_name = parse.quote(rolename, "") @@ -292,7 +296,7 @@ def _load_local_metadata(self, rolename: str) -> bytes: def _persist_metadata(self, rolename: str, data: bytes) -> None: """Write metadata to disk atomically to avoid data loss.""" - temp_file_name: Optional[str] = None + temp_file_name: str | None = None try: # encode the rolename to avoid issues with e.g. path separators encoded_name = parse.quote(rolename, "") @@ -420,7 +424,7 @@ def _load_targets(self, role: str, parent_role: str) -> Targets: def _preorder_depth_first_walk( self, target_filepath: str - ) -> Optional[TargetFile]: + ) -> TargetFile | None: """ Interrogates the tree of target delegations in order of appearance (which implicitly order trustworthiness), and returns the matching diff --git a/tuf/repository/_repository.py b/tuf/repository/_repository.py index 82a75c7c31..f21596bf09 100644 --- a/tuf/repository/_repository.py +++ b/tuf/repository/_repository.py @@ -3,12 +3,13 @@ """Repository Abstraction for metadata management""" +from __future__ import annotations + import logging from abc import ABC, abstractmethod -from collections.abc import Generator from contextlib import contextmanager, suppress from copy import deepcopy -from typing import Optional +from typing import TYPE_CHECKING from tuf.api.exceptions import UnsignedMetadataError from tuf.api.metadata import ( @@ -21,6 +22,9 @@ Timestamp, ) +if TYPE_CHECKING: + from collections.abc import Generator + logger = logging.getLogger(__name__) @@ -229,9 +233,7 @@ def do_snapshot( return update_version, removed - def do_timestamp( - self, force: bool = False - ) -> tuple[bool, Optional[MetaFile]]: + def do_timestamp(self, force: bool = False) -> tuple[bool, MetaFile | None]: """Update timestamp meta information Updates timestamp according to current snapshot state From 687d4557adbd625392ec2f9f045a6ffcd005dc49 Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 29 Nov 2024 12:31:04 +0200 Subject: [PATCH 3/6] Revert "refactor to use dict union, instead of unpacking" This reverts commit eb6d82f324fa3a38a78ec45a4bb70adac45e3cc8. The change itself was fine but since the code is otherwise compatible with python 3.8, let's revert this to be compatible for one more release. Signed-off-by: Jussi Kukkonen --- tuf/api/_payload.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tuf/api/_payload.py b/tuf/api/_payload.py index 264519a133..3149102588 100644 --- a/tuf/api/_payload.py +++ b/tuf/api/_payload.py @@ -346,17 +346,19 @@ def verified(self) -> bool: def signed(self) -> dict[str, Key]: """Dictionary of all signing keys that have signed, from both VerificationResults. - return a union of all signed. + return a union of all signed (in python<3.9 this requires + dict unpacking) """ - return self.first.signed | self.second.signed + return {**self.first.signed, **self.second.signed} @property def unsigned(self) -> dict[str, Key]: """Dictionary of all signing keys that have not signed, from both VerificationResults. - return a union of all unsigned. + return a union of all unsigned (in python<3.9 this requires + dict unpacking) """ - return self.first.unsigned | self.second.unsigned + return {**self.first.unsigned, **self.second.unsigned} class _DelegatorMixin(metaclass=abc.ABCMeta): From fca3086b5d6b7d1088c972caf7f7ff55bb5d0b4b Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 29 Nov 2024 13:19:54 +0200 Subject: [PATCH 4/6] repository: Change RuntimeError to AssertionError These are assertions that should happen in production: something is wrong in an unrecoverable way. This is not an API change since no-one should be catching these. Making these AssertionErrors makes them skippable in coverage. Signed-off-by: Jussi Kukkonen --- tuf/repository/_repository.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tuf/repository/_repository.py b/tuf/repository/_repository.py index f21596bf09..a6c5de1ea4 100644 --- a/tuf/repository/_repository.py +++ b/tuf/repository/_repository.py @@ -114,7 +114,7 @@ def edit_root(self) -> Generator[Root, None, None]: """Context manager for editing root metadata. See edit()""" with self.edit(Root.type) as root: if not isinstance(root, Root): - raise RuntimeError("Unexpected root type") + raise AssertionError("Unexpected root type") yield root @contextmanager @@ -122,7 +122,7 @@ def edit_timestamp(self) -> Generator[Timestamp, None, None]: """Context manager for editing timestamp metadata. See edit()""" with self.edit(Timestamp.type) as timestamp: if not isinstance(timestamp, Timestamp): - raise RuntimeError("Unexpected timestamp type") + raise AssertionError("Unexpected timestamp type") yield timestamp @contextmanager @@ -130,7 +130,7 @@ def edit_snapshot(self) -> Generator[Snapshot, None, None]: """Context manager for editing snapshot metadata. See edit()""" with self.edit(Snapshot.type) as snapshot: if not isinstance(snapshot, Snapshot): - raise RuntimeError("Unexpected snapshot type") + raise AssertionError("Unexpected snapshot type") yield snapshot @contextmanager @@ -140,35 +140,35 @@ def edit_targets( """Context manager for editing targets metadata. See edit()""" with self.edit(rolename) as targets: if not isinstance(targets, Targets): - raise RuntimeError(f"Unexpected targets ({rolename}) type") + raise AssertionError(f"Unexpected targets ({rolename}) type") yield targets def root(self) -> Root: """Read current root metadata""" root = self.open(Root.type).signed if not isinstance(root, Root): - raise RuntimeError("Unexpected root type") + raise AssertionError("Unexpected root type") return root def timestamp(self) -> Timestamp: """Read current timestamp metadata""" timestamp = self.open(Timestamp.type).signed if not isinstance(timestamp, Timestamp): - raise RuntimeError("Unexpected timestamp type") + raise AssertionError("Unexpected timestamp type") return timestamp def snapshot(self) -> Snapshot: """Read current snapshot metadata""" snapshot = self.open(Snapshot.type).signed if not isinstance(snapshot, Snapshot): - raise RuntimeError("Unexpected snapshot type") + raise AssertionError("Unexpected snapshot type") return snapshot def targets(self, rolename: str = Targets.type) -> Targets: """Read current targets metadata""" targets = self.open(rolename).signed if not isinstance(targets, Targets): - raise RuntimeError("Unexpected targets type") + raise AssertionError("Unexpected targets type") return targets def do_snapshot( From d89c8e673f65730a397d5d8cd68f99d104dd430f Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 29 Nov 2024 13:25:56 +0200 Subject: [PATCH 5/6] coverage config: Add some excludes This makes the results more useful Signed-off-by: Jussi Kukkonen --- pyproject.toml | 10 ++++++++++ requirements/test.txt | 2 +- tox.ini | 4 ++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e7e1fc466c..fbaf48633f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -149,3 +149,13 @@ module = [ "securesystemslib.*", ] ignore_missing_imports = "True" + +[tool.coverage.report] +exclude_also = [ + # abstract class method definition + "raise NotImplementedError", + # defensive programming: these cannot happen + "raise AssertionError", + # imports for mypy only + "if TYPE_CHECKING", +] \ No newline at end of file diff --git a/requirements/test.txt b/requirements/test.txt index 66856203ad..69df33451b 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -4,5 +4,5 @@ -r pinned.txt # coverage measurement -coverage==7.6.8 +coverage[toml]==7.6.8 freezegun==1.5.1 diff --git a/tox.ini b/tox.ini index 9d4679749f..03dd2324e8 100644 --- a/tox.ini +++ b/tox.ini @@ -17,7 +17,7 @@ changedir = tests commands = python3 --version python3 -m coverage run aggregate_tests.py - python3 -m coverage report -m --fail-under 97 + python3 -m coverage report --rcfile {toxinidir}/pyproject.toml -m --fail-under 97 deps = -r{toxinidir}/requirements/test.txt @@ -38,7 +38,7 @@ commands_pre = commands = python3 -m coverage run aggregate_tests.py - python3 -m coverage report -m + python3 -m coverage report --rcfile {toxinidir}/pyproject.toml -m [testenv:lint] changedir = {toxinidir} From 4f32a13ab0af3dd3ed672fba0c112a284578611f Mon Sep 17 00:00:00 2001 From: Jussi Kukkonen Date: Fri, 29 Nov 2024 16:31:45 +0200 Subject: [PATCH 6/6] pyproject: Don't require Python 3.9 quite yet We're still compatible with 3.8: let's not force 3.9 yet. Signed-off-by: Jussi Kukkonen --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index fbaf48633f..f4e19f4236 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ name = "tuf" description = "A secure updater framework for Python" readme = "README.md" license = { text = "MIT OR Apache-2.0" } -requires-python = ">=3.9" +requires-python = ">=3.8" authors = [ { email = "theupdateframework@googlegroups.com" }, ]