diff --git a/.packit.yaml b/.packit.yaml index 76f4e60ad..917ba5d56 100644 --- a/.packit.yaml +++ b/.packit.yaml @@ -27,7 +27,6 @@ jobs: - job: copr_build trigger: pull_request identifier: copr_pull - manual_trigger: true targets: - fedora-all diff --git a/Makefile b/Makefile index ff9d54f52..19737ec2f 100644 --- a/Makefile +++ b/Makefile @@ -8,6 +8,7 @@ ISORT_MODULES = monkeytype_config.py setup.py bin/stratis src tests MONKEYTYPE_MODULES = stratis_cli._actions._bind \ stratis_cli._actions._constants \ + stratis_cli._actions._crypt \ stratis_cli._actions._data \ stratis_cli._actions._debug \ stratis_cli._actions._environment \ diff --git a/docs/stratis.txt b/docs/stratis.txt index de09713e9..018fad2bb 100644 --- a/docs/stratis.txt +++ b/docs/stratis.txt @@ -117,6 +117,37 @@ pool unbind <(clevis|keyring)> [--token-slot ]:: mechanism. MOVE NOTICE: The "unbind" subcommand can also be found under the "pool encryption" subcommand. The "pool unbind" subcommand that you are using now is deprecated and will be removed in stratis 3.10.0. +pool encryption on --in-place <(--uuid |--name )> [--key-desc ] [--clevis <(nbde|tang|tpm2)> [--tang-url ] [<(--thumbprint | --trust-url)>]:: + Turn encryption on for the specified pool. This operation must encrypt + every block in the pool and takes time proportional to the size of the + pool. +pool encryption off --in-place <(--uuid |--name )>:: + Turn encryption off for the specified pool. This operation must write + the plain text of every block in the pool and takes time proportional to + the size of the pool. +pool encryption reencrypt --in-place <(--uuid |--name )>:: + Reencrypt the pool with a new master key. This operation must overwrite + every block in the pool with re-encrypted data and takes time + proportional to the size of the pool. +pool encryption bind <(nbde|tang)> <(--uuid |--name )> <(--thumbprint | --trust-url)> :: + Bind the devices in the specified pool to a supplementary encryption + mechanism that uses NBDE (Network-Bound Disc Encryption). *tang* is + an alias for *nbde*. +pool encryption bind tpm2 <(--uuid |--name )>:: + Bind the devices in the specified pool to a supplementary encryption + mechanism that uses TPM 2.0 (Trusted Platform Module). +pool encryption bind keyring <(--uuid |--name )> :: + Bind the devices in the specified pool to a supplementary encryption + mechanism using a key in the kernel keyring. +pool encryption rebind clevis <(--uuid |--name )> [--token-slot ]:: + Rebind the devices in the specified pool using the Clevis configuration + with which the devices in the pool were previously bound. +pool encryption rebind keyring <(--uuid |--name )> [--token-slot ]:: + Rebind the devices in the specified pool using the specified key + description. +pool encryption unbind <(clevis|keyring)> <(--uuid |--name )> [--token-slot ]:: + Unbind the devices in the specified pool from the specified encryption + mechanism. pool set-fs-limit :: Set the limit on the number of file systems allowed per-pool. This number may only be increased from its current value. @@ -243,6 +274,20 @@ OPTIONS --token-slot :: For V2 pools only. Use the token slot number to select among different bindings that use the same encryption method. +--in-place :: + This is a mandatory option that must be set when requesting + a long-running in-place encryption operation. These operations are + a kind that must read and write every block in the pool. Hence these + operations take a time that is linear in the size of the pool. + Additionally, these operations are run in place, that is, the + pool's data blocks are directly modified while it is in use. While + the operation is taking place, automatic administrative actions, + for example, extending filesystems, can not be taken on the pool. + Furthermore, user-initiated actions, such as adding a new device to + a pool are also disabled. The pool administrator should therefore + ensure that no administrative operations will become urgently + necessary while the encryption operation is running. Consider + backing up your data before initiating this operation. SIZE SPECIFICATION FORMAT FOR INPUT @@ -338,6 +383,11 @@ Encryption Enabled: Clevis Configuration:: The Clevis configuration, if the pool is encrypted via Clevis. Only displayed if metadata version is 1 and encryption is enabled. +Encryption Enabled: Last Time Reencrypted:: + The last time the pool was re-encrypted. If the pool has never been + re-encrypted, the value is "Never". Only displayed if metadata + version is 2 and encryption is enabled. + Encryption Enabled: Free Token Slots Remaining:: The number of token slots remaining that can accommodate new bindings, followed by a list of binding descriptions, ordered by diff --git a/src/stratis_cli/_actions/__init__.py b/src/stratis_cli/_actions/__init__.py index b95c02e1c..45e079911 100644 --- a/src/stratis_cli/_actions/__init__.py +++ b/src/stratis_cli/_actions/__init__.py @@ -22,6 +22,7 @@ MANAGER_0_INTERFACE, POOL_INTERFACE, ) +from ._crypt import CryptActions from ._debug import ( BlockdevDebugActions, FilesystemDebugActions, @@ -34,3 +35,4 @@ from ._stratis import StratisActions from ._stratisd_version import check_stratisd_version from ._top import TopActions +from ._utils import get_errors diff --git a/src/stratis_cli/_actions/_crypt.py b/src/stratis_cli/_actions/_crypt.py new file mode 100644 index 000000000..97e19ea6c --- /dev/null +++ b/src/stratis_cli/_actions/_crypt.py @@ -0,0 +1,179 @@ +# Copyright 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Miscellaneous whole pool encryption actions. +""" + +# isort: STDLIB +import json +from argparse import Namespace + +from .._constants import PoolId +from .._errors import ( + StratisCliEngineError, + StratisCliIncoherenceError, + StratisCliInPlaceNotSpecified, + StratisCliNoChangeError, +) +from .._stratisd_constants import StratisdErrors +from ._connection import get_object +from ._constants import TOP_OBJECT +from ._utils import long_running_operation + + +class CryptActions: + """ + Whole pool encryption actions. + """ + + @staticmethod + @long_running_operation + def encrypt(namespace: Namespace): + """ + Encrypt a previously unencrypted pool. + """ + + if not namespace.in_place: + raise StratisCliInPlaceNotSpecified() + + # pylint: disable=import-outside-toplevel + from ._data import MOPool, ObjectManager, Pool, pools + + pool_id = PoolId.from_parser_namespace(namespace) + assert pool_id is not None + + proxy = get_object(TOP_OBJECT) + + managed_objects = ObjectManager.Methods.GetManagedObjects(proxy, {}) + + (pool_object_path, mopool) = next( + pools(props=pool_id.managed_objects_key()) + .require_unique_match(True) + .search(managed_objects) + ) + + if bool(MOPool(mopool).Encrypted()): + raise StratisCliNoChangeError("encryption on", pool_id) + + (changed, return_code, message) = Pool.Methods.EncryptPool( + get_object(pool_object_path), + { + "key_descs": ( + [] + if namespace.key_desc is None + else [((False, 0), namespace.key_desc)] + ), + "clevis_infos": ( + [] + if namespace.clevis is None + else [ + ( + (False, 0), + namespace.clevis.pin, + json.dumps(namespace.clevis.config), + ) + ] + ), + }, + ) + + if return_code != StratisdErrors.OK: + raise StratisCliEngineError(return_code, message) + + if not changed: # pragma: no cover + raise StratisCliIncoherenceError( + f"Expected to change {pool_id} encryption status to " + "encrypted, but stratisd reports that it did not change " + "the encryption status" + ) + + @staticmethod + @long_running_operation + def unencrypt(namespace: Namespace): + """ + Unencrypt a previously encrypted pool. + """ + if not namespace.in_place: + raise StratisCliInPlaceNotSpecified() + + # pylint: disable=import-outside-toplevel + from ._data import MOPool, ObjectManager, Pool, pools + + pool_id = PoolId.from_parser_namespace(namespace) + assert pool_id is not None + + proxy = get_object(TOP_OBJECT) + + managed_objects = ObjectManager.Methods.GetManagedObjects(proxy, {}) + + (pool_object_path, mopool) = next( + pools(props=pool_id.managed_objects_key()) + .require_unique_match(True) + .search(managed_objects) + ) + + if not bool(MOPool(mopool).Encrypted()): + raise StratisCliNoChangeError("encryption off", pool_id) + + (changed, return_code, message) = Pool.Methods.DecryptPool( + get_object(pool_object_path), {} + ) + + if return_code != StratisdErrors.OK: # pragma: no cover + raise StratisCliEngineError(return_code, message) + + if not changed: # pragma: no cover + raise StratisCliIncoherenceError( + f"Expected to change {pool_id} encryption status to " + "unencrypted, but stratisd reports that it did not change " + "the pool's encryption status" + ) + + @staticmethod + @long_running_operation + def reencrypt(namespace: Namespace): + """ + Reencrypt an already encrypted pool with a new key. + """ + if not namespace.in_place: + raise StratisCliInPlaceNotSpecified() + + # pylint: disable=import-outside-toplevel + from ._data import ObjectManager, Pool, pools + + pool_id = PoolId.from_parser_namespace(namespace) + assert pool_id is not None + + proxy = get_object(TOP_OBJECT) + + managed_objects = ObjectManager.Methods.GetManagedObjects(proxy, {}) + + (pool_object_path, _) = next( + pools(props=pool_id.managed_objects_key()) + .require_unique_match(True) + .search(managed_objects) + ) + + (changed, return_code, message) = Pool.Methods.ReencryptPool( + get_object(pool_object_path), {} + ) + + if return_code != StratisdErrors.OK: # pragma: no cover + raise StratisCliEngineError(return_code, message) + + if not changed: # pragma: no cover + raise StratisCliIncoherenceError( + f"Expected to reencrypt {pool_id} with a new key but stratisd " + "reports that it did not perform the operation" + ) diff --git a/src/stratis_cli/_actions/_data.py b/src/stratis_cli/_actions/_data.py index b484fb0f6..fc0395bda 100644 --- a/src/stratis_cli/_actions/_data.py +++ b/src/stratis_cli/_actions/_data.py @@ -44,7 +44,7 @@ "deferred until after the stratis_cli module has been fully loaded." ) -DBUS_TIMEOUT_SECONDS = 120 +DBUS_TIMEOUT_SECONDS = 60 # Specification for the lowest manager interface supported by the major # version of stratisd on which this version of the CLI depends. diff --git a/src/stratis_cli/_actions/_introspect.py b/src/stratis_cli/_actions/_introspect.py index b3a742d91..afd151211 100644 --- a/src/stratis_cli/_actions/_introspect.py +++ b/src/stratis_cli/_actions/_introspect.py @@ -177,12 +177,24 @@ + + + + + + + + + + + + @@ -221,6 +233,11 @@ + + + + + @@ -249,13 +266,12 @@ - - - + + diff --git a/src/stratis_cli/_actions/_list_pool.py b/src/stratis_cli/_actions/_list_pool.py index a60f26066..52ddb09b2 100644 --- a/src/stratis_cli/_actions/_list_pool.py +++ b/src/stratis_cli/_actions/_list_pool.py @@ -22,6 +22,7 @@ from typing import List, Optional, Union # isort: THIRDPARTY +from dateutil import parser as date_parser from justbytes import Range from .._alerts import ( @@ -360,6 +361,14 @@ def _print_detail_view( if encrypted: print("Encryption Enabled: Yes") + (valid, timestamp) = mopool.LastReencryptedTimestamp() + reencrypted = ( + date_parser.isoparse(timestamp).astimezone().strftime("%b %d %Y %H:%M") + if valid + else "Never" + ) + print(f" Last Time Reencrypted: {reencrypted}") + if metadata_version is MetadataVersion.V1: # pragma: no cover key_description_str = _non_existent_or_inconsistent_to_str( EncryptionInfoKeyDescription(mopool.KeyDescriptions()) diff --git a/src/stratis_cli/_actions/_utils.py b/src/stratis_cli/_actions/_utils.py index d2c01e0e5..10a34a637 100644 --- a/src/stratis_cli/_actions/_utils.py +++ b/src/stratis_cli/_actions/_utils.py @@ -22,15 +22,22 @@ import os import sys import termios +from argparse import Namespace +from collections.abc import Iterator from enum import Enum -from typing import Any, Optional, Tuple +from functools import wraps +from typing import Any, Callable, Optional, Tuple from uuid import UUID # isort: THIRDPARTY from dbus import Dictionary, Struct +from dbus.exceptions import DBusException from dbus.proxies import ProxyObject from justbytes import Range +# isort: FIRSTPARTY +from dbus_python_client_gen import DPClientInvocationError + from .._errors import ( StratisCliKeyfileNotFoundError, StratisCliPassphraseEmptyError, @@ -295,3 +302,36 @@ def free(self) -> Optional[Range]: Total - used. """ return None if self._used is None else self._total - self._used + + +def get_errors(exc: BaseException) -> Iterator[BaseException]: + """ + Generates a sequence of exceptions starting with exc and following the chain + of causes. + """ + yield exc + while exc.__cause__ is not None: + yield exc.__cause__ + exc = exc.__cause__ + + +def long_running_operation(func: Callable) -> Callable: + """ + Mark a function as a long running operation and catch and ignore NoReply + D-Bus exception. + """ + + @wraps(func) + def wrapper(namespace: Namespace): + try: + func(namespace) + except DPClientInvocationError as err: + if not any( + isinstance(e, DBusException) + and e.get_dbus_name() == "org.freedesktop.DBus.Error.NoReply" + for e in get_errors(err) + ): + raise err + print("Operation initiated", file=sys.stderr) + + return wrapper diff --git a/src/stratis_cli/_error_reporting.py b/src/stratis_cli/_error_reporting.py index b6ff48c7e..aa703fbbe 100644 --- a/src/stratis_cli/_error_reporting.py +++ b/src/stratis_cli/_error_reporting.py @@ -17,7 +17,6 @@ # isort: STDLIB import os import sys -from collections.abc import Iterator # isort: THIRDPARTY import dbus @@ -40,6 +39,7 @@ FILESYSTEM_INTERFACE, MANAGER_0_INTERFACE, POOL_INTERFACE, + get_errors, ) from ._errors import ( StratisCliActionError, @@ -86,17 +86,6 @@ def _interface_name_to_common_name(interface_name): raise StratisCliUnknownInterfaceError(interface_name) # pragma: no cover -def get_errors(exc: BaseException) -> Iterator[BaseException]: - """ - Generates a sequence of exceptions starting with exc and following the chain - of causes. - """ - yield exc - while exc.__cause__ is not None: - yield exc.__cause__ - exc = exc.__cause__ - - def _interpret_errors_0( error: dbus.exceptions.DBusException, ): diff --git a/src/stratis_cli/_errors.py b/src/stratis_cli/_errors.py index b60731efb..1e6c7fe1a 100644 --- a/src/stratis_cli/_errors.py +++ b/src/stratis_cli/_errors.py @@ -454,3 +454,19 @@ def __init__(self, msg): def __str__(self): return self.msg + + +class StratisCliInPlaceNotSpecified(StratisCliUserError): + """ + Raised if the user requested in-place encryption but did not use the + --in-place option. + """ + + def __str__(self) -> str: + return ( + "Specify the --in-place option to demonstrate that you " + "understand the special nature of the procedure that you are " + "about to initiate. Please refer to the discussion of the " + '"--in-place" option in the man pages for further ' + "information." + ) diff --git a/src/stratis_cli/_parser/_encryption.py b/src/stratis_cli/_parser/_encryption.py index a3a76d3b3..8ad275540 100644 --- a/src/stratis_cli/_parser/_encryption.py +++ b/src/stratis_cli/_parser/_encryption.py @@ -19,9 +19,11 @@ import copy from argparse import SUPPRESS, Namespace -from .._actions import BindActions, RebindActions +from .._actions import BindActions, CryptActions, RebindActions from .._constants import Clevis, EncryptionMethod from ._shared import ( + CLEVIS_AND_KERNEL, + IN_PLACE, TRUST_URL_OR_THUMBPRINT, UUID_OR_NAME, ClevisEncryptionOptions, @@ -352,6 +354,88 @@ def __init__(self, namespace: Namespace): ] ENCRYPTION_SUBCMDS = [ + ( + "on", + { + "help": "Make encrypted a previously unencrypted pool", + "args": [ + ( + "--post-parser", + { + "action": RejectAction, + "default": ClevisEncryptionOptions, + "help": SUPPRESS, + "nargs": "?", + }, + ) + ] + + IN_PLACE, + "groups": [ + ( + "Pool Identifier", + { + "description": "Choose one option to specify the pool", + "mut_ex_args": [ + (True, UUID_OR_NAME), + ], + }, + ), + ( + "Encryption", + { + "description": "Arguments controlling encryption", + "args": CLEVIS_AND_KERNEL, + }, + ), + ( + "Tang Server Verification (only if --tang-url option is set)", + { + "description": "Choose one option", + "mut_ex_args": [(False, TRUST_URL_OR_THUMBPRINT)], + }, + ), + ], + "func": CryptActions.encrypt, + }, + ), + ( + "off", + { + "help": "Make unencrypted a previously encrypted pool", + "args": IN_PLACE, + "groups": [ + ( + "Pool Identifier", + { + "description": "Choose one option to specify the pool", + "mut_ex_args": [ + (True, UUID_OR_NAME), + ], + }, + ) + ], + "func": CryptActions.unencrypt, + }, + ), + ( + "reencrypt", + { + "help": "Reencrypt an encrypted pool with a new master key", + "args": IN_PLACE, + "groups": [ + ( + "Pool Identifier", + { + "description": "Choose one option to specify the pool", + "mut_ex_args": [ + (True, UUID_OR_NAME), + ], + }, + ) + ], + "func": CryptActions.reencrypt, + }, + ), ( "bind", { diff --git a/src/stratis_cli/_parser/_pool.py b/src/stratis_cli/_parser/_pool.py index a3026a84f..485ef8afc 100644 --- a/src/stratis_cli/_parser/_pool.py +++ b/src/stratis_cli/_parser/_pool.py @@ -26,7 +26,6 @@ from .._actions import BindActions, PoolActions from .._alerts import PoolAlert from .._constants import ( - Clevis, EncryptionMethod, IntegrityOption, IntegrityTagSpec, @@ -36,6 +35,7 @@ from ._debug import POOL_DEBUG_SUBCMDS from ._encryption import BIND_SUBCMDS, ENCRYPTION_SUBCMDS, REBIND_SUBCMDS from ._shared import ( + CLEVIS_AND_KERNEL, KEYFILE_PATH_OR_STDIN, TRUST_URL_OR_THUMBPRINT, UUID_OR_NAME, @@ -114,34 +114,7 @@ def verify(self, namespace: Namespace, parser: ArgumentParser): "Encryption", { "description": "Arguments controlling creation with encryption", - "args": [ - ( - "--key-desc", - { - "help": ( - "Key description of key in kernel keyring to use " - "for encryption" - ), - }, - ), - ( - "--clevis", - { - "type": Clevis, - "help": "Specification for binding with Clevis.", - "choices": list(Clevis), - }, - ), - ( - "--tang-url", - { - "help": ( - "URL of Clevis tang server " - "(--clevis=[tang|nbde] must be set)" - ), - }, - ), - ], + "args": CLEVIS_AND_KERNEL, }, ), ( diff --git a/src/stratis_cli/_parser/_shared.py b/src/stratis_cli/_parser/_shared.py index 91fddd617..2696e8fdd 100644 --- a/src/stratis_cli/_parser/_shared.py +++ b/src/stratis_cli/_parser/_shared.py @@ -273,3 +273,41 @@ def verify(self, namespace, parser): else ClevisInfo(CLEVIS_PIN_TPM2, {}) ) ) + + +CLEVIS_AND_KERNEL = [ + ( + "--key-desc", + { + "help": ("Key description of key in kernel keyring"), + }, + ), + ( + "--clevis", + { + "type": Clevis, + "help": "Specification for binding with Clevis.", + "choices": list(Clevis), + }, + ), + ( + "--tang-url", + { + "help": "URL of Clevis tang server (--clevis=[tang|nbde] must be set)", + }, + ), +] + +IN_PLACE = [ + ( + "--in-place", + { + "action": "store_true", + "help": ( + "Perform the operation in place; requires no additional " + "devices; see man page entry for this option for more " + "information" + ), + }, + ) +] diff --git a/tests/whitebox/integration/pool/test_encryption.py b/tests/whitebox/integration/pool/test_encryption.py index 40c76c1fa..6a93db20a 100644 --- a/tests/whitebox/integration/pool/test_encryption.py +++ b/tests/whitebox/integration/pool/test_encryption.py @@ -17,7 +17,11 @@ # isort: LOCAL from stratis_cli import StratisCliErrorCodes -from stratis_cli._errors import StratisCliEngineError, StratisCliNoChangeError +from stratis_cli._errors import ( + StratisCliEngineError, + StratisCliInPlaceNotSpecified, + StratisCliNoChangeError, +) from .._keyutils import RandomKeyTmpFile from .._misc import RUNNER, TEST_RUNNER, SimTestCase, device_name_list @@ -424,3 +428,278 @@ def test_unbind_with_unused_token_slot(self): "--token-slot=32", ] self.check_error(StratisCliNoChangeError, command_line, _ERROR) + + +class OffTestCase(SimTestCase): + """ + Test turning encryption off when pool is encrypted. + """ + + _MENU = ["--propagate", "pool", "encryption", "off", "--in-place"] + _POOLNAME = "poolname" + _KEY_DESC = "keydesc" + + def setUp(self): + super().setUp() + with RandomKeyTmpFile() as fname: + command_line = [ + "--propagate", + "key", + "set", + "--keyfile-path", + fname, + self._KEY_DESC, + ] + RUNNER(command_line) + + command_line = [ + "--propagate", + "pool", + "create", + "--key-desc", + self._KEY_DESC, + self._POOLNAME, + ] + _DEVICE_STRATEGY() + RUNNER(command_line) + + def test_decrypt_with_name(self): + """ + Decrypt an encrypted pool, specifying the pool by name. + """ + command_line = self._MENU + [ + f"--name={self._POOLNAME}", + ] + RUNNER(command_line) + + +class OffTestCase2(SimTestCase): + """ + Test turning encryption off when pool is not encrypted. + """ + + _MENU = ["--propagate", "pool", "encryption", "off", "--in-place"] + _POOLNAME = "poolname" + + def setUp(self): + super().setUp() + command_line = ["pool", "create", self._POOLNAME] + _DEVICE_STRATEGY() + RUNNER(command_line) + + def test_decrypt_with_name(self): + """ + Decrypting when unencrypted should return an error. + """ + command_line = self._MENU + [ + f"--name={self._POOLNAME}", + ] + self.check_error(StratisCliNoChangeError, command_line, _ERROR) + + +class ReencryptTestCase(SimTestCase): + """ + Test re-encrypting when pool is encrypted. + """ + + _MENU = ["--propagate", "pool", "encryption", "reencrypt", "--in-place"] + _POOLNAME = "poolname" + _KEY_DESC = "keydesc" + + def setUp(self): + super().setUp() + with RandomKeyTmpFile() as fname: + command_line = [ + "--propagate", + "key", + "set", + "--keyfile-path", + fname, + self._KEY_DESC, + ] + RUNNER(command_line) + + command_line = [ + "--propagate", + "pool", + "create", + "--key-desc", + self._KEY_DESC, + self._POOLNAME, + ] + _DEVICE_STRATEGY() + RUNNER(command_line) + + def test_reencrypt_with_name(self): + """ + Re-encrypt an encrypted pool, specifying the pool by name. + """ + command_line = self._MENU + [ + f"--name={self._POOLNAME}", + ] + RUNNER(command_line) + + # Exercise detail view with last reencryption time set + command_line = ["--propagate", "pool", "list", f"--name={self._POOLNAME}"] + RUNNER(command_line) + + +class ReencryptTestCase2(SimTestCase): + """ + Test reencryption when pool is not encrypted. + """ + + _MENU = ["--propagate", "pool", "encryption", "reencrypt", "--in-place"] + _POOLNAME = "poolname" + + def setUp(self): + super().setUp() + command_line = ["pool", "create", self._POOLNAME] + _DEVICE_STRATEGY() + RUNNER(command_line) + + def test_reencrypt_with_name(self): + """ + Reencrypting when unencrypted should return an error. + """ + command_line = self._MENU + [ + f"--name={self._POOLNAME}", + ] + self.check_error(StratisCliEngineError, command_line, _ERROR) + + +class EncryptTestCase(SimTestCase): + """ + Test encrypting when pool is already encrypted. + """ + + _MENU = ["--propagate", "pool", "encryption", "on", "--in-place"] + _POOLNAME = "poolname" + _KEY_DESC = "keydesc" + + def setUp(self): + super().setUp() + with RandomKeyTmpFile() as fname: + command_line = [ + "--propagate", + "key", + "set", + "--keyfile-path", + fname, + self._KEY_DESC, + ] + RUNNER(command_line) + + command_line = [ + "--propagate", + "pool", + "create", + "--key-desc", + self._KEY_DESC, + self._POOLNAME, + ] + _DEVICE_STRATEGY() + RUNNER(command_line) + + def test_encrypt_with_name(self): + """ + Encrypting when already encrypted should return an error. + """ + command_line = self._MENU + [ + f"--name={self._POOLNAME}", + "--clevis=tpm2", + ] + self.check_error(StratisCliNoChangeError, command_line, _ERROR) + + +class EncryptTestCase2(SimTestCase): + """ + Test encrypting when pool is not already encrypted. + """ + + _MENU = ["--propagate", "pool", "encryption", "on", "--in-place"] + _POOLNAME = "poolname" + _KEY_DESC = "keydesc" + + def setUp(self): + super().setUp() + command_line = [ + "--propagate", + "pool", + "create", + self._POOLNAME, + ] + _DEVICE_STRATEGY() + RUNNER(command_line) + + def test_encrypt_with_name(self): + """ + Encrypting when not already encrypted should succeed. + """ + command_line = self._MENU + [ + f"--name={self._POOLNAME}", + "--clevis=tpm2", + ] + TEST_RUNNER(command_line) + + def test_encryption_with_no_encryption_params(self): + """ + Encrypting without any encryption method fully specified should fail. + """ + command_line = self._MENU + [ + f"--name={self._POOLNAME}", + ] + self.check_error(StratisCliEngineError, command_line, _ERROR) + + +class NoInPlaceTestCase(SimTestCase): + """ + Test encrypting when pool is not already encrypted. + """ + + _POOLNAME = "poolname" + _KEY_DESC = "keydesc" + + def setUp(self): + super().setUp() + command_line = [ + "--propagate", + "pool", + "create", + self._POOLNAME, + ] + _DEVICE_STRATEGY() + RUNNER(command_line) + + def test_on(self): + """ + In place must be specified for on. + """ + command_line = [ + "--propagate", + "pool", + "encryption", + "on", + f"--name={self._POOLNAME}", + "--clevis=tpm2", + ] + self.check_error(StratisCliInPlaceNotSpecified, command_line, _ERROR) + + def test_off(self): + """ + In place must be specified for off. + """ + command_line = [ + "--propagate", + "pool", + "encryption", + "off", + f"--name={self._POOLNAME}", + ] + self.check_error(StratisCliInPlaceNotSpecified, command_line, _ERROR) + + def test_reencrypt(self): + """ + In place must be specified for reencrypt. + """ + command_line = [ + "--propagate", + "pool", + "encryption", + "reencrypt", + f"--name={self._POOLNAME}", + ] + self.check_error(StratisCliInPlaceNotSpecified, command_line, _ERROR) diff --git a/tests/whitebox/integration/test_parser.py b/tests/whitebox/integration/test_parser.py index 3a0125a0d..2953a64c3 100644 --- a/tests/whitebox/integration/test_parser.py +++ b/tests/whitebox/integration/test_parser.py @@ -241,14 +241,18 @@ def test_print_all_help(self): class TestClevisOptions(ParserTestCase): """ - Verify that invalid clevis encryption create options are detected. + Verify that invalid clevis encryption options are detected. """ def _do_clevis_test(self, clevis_args): """ Apply clevis args to create command_line and verify parser error. """ - self._do_test(["pool", "create", "pn", "/dev/n"] + clevis_args) + for subcommand in [ + ["pool", "create", "pn", "/dev/n"], + ["pool", "encryption", "on", "--name=pn"], + ]: + self._do_test(subcommand + clevis_args) def test_create_with_clevis_1(self): """ diff --git a/tests/whitebox/unit/test_running.py b/tests/whitebox/unit/test_running.py new file mode 100644 index 000000000..2f1e74493 --- /dev/null +++ b/tests/whitebox/unit/test_running.py @@ -0,0 +1,61 @@ +# Copyright 2025 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test 'long_running_operation'. +""" + +# isort: STDLIB +import unittest + +# isort: THIRDPARTY +import dbus + +# isort: FIRSTPARTY +from dbus_python_client_gen import DPClientInvocationError + +# isort: LOCAL +from stratis_cli._actions._utils import long_running_operation + + +class LongRunningOperationTestCase(unittest.TestCase): + """ + Test long_running_operation error paths that don't show up in the sim + engine. + """ + + def test_raise_dbus_exception(self): + """ + Should succeed because it catches the distinguishing NoReply D-Bus + error. + """ + + def raises_error(_): + raise DPClientInvocationError( + "fake", "intf", None + ) from dbus.exceptions.DBusException( + name="org.freedesktop.DBus.Error.NoReply" + ) + + self.assertIsNone(long_running_operation(raises_error)(None)) + + def test_no_dbus_exception(self): + """ + Should raise an exception that was previously raised. + """ + + def raises_error(_): + raise DPClientInvocationError("fake", "intf", None) + + with self.assertRaises(DPClientInvocationError): + long_running_operation(raises_error)(None)