diff --git a/keepercli-package/src/keepercli/commands/enterprise_user.py b/keepercli-package/src/keepercli/commands/enterprise_user.py index b1408ff..67f08ce 100644 --- a/keepercli-package/src/keepercli/commands/enterprise_user.py +++ b/keepercli-package/src/keepercli/commands/enterprise_user.py @@ -875,7 +875,7 @@ def execute(self, context: KeeperParams, **kwargs) -> None: return if kwargs.get('approve') or kwargs.get('deny'): - self._process_approval_denial(context, enterprise_data, matching_devices, kwargs) + self._process_approval_denial(context, matching_devices, kwargs) else: self._display_report(enterprise_data, matching_devices, kwargs) @@ -1001,14 +1001,14 @@ def _get_trusted_ip_map(self, context: KeeperParams, emails: List[str]) -> Dict[ return ip_map - def _process_approval_denial(self, context: KeeperParams, enterprise_data, + def _process_approval_denial(self, context: KeeperParams, matching_devices: Dict[str, DeviceApprovalRequest], kwargs: Dict[str, Any]) -> None: """Process device approval or denial requests.""" approve_rq = enterprise_pb2.ApproveUserDevicesRequest() data_keys = {} if kwargs.get('approve'): - data_keys = self._collect_user_data_keys(context, enterprise_data, matching_devices) + data_keys = self._collect_user_data_keys(context, matching_devices) device_requests = self._build_device_requests(matching_devices, data_keys, kwargs) if not device_requests: @@ -1019,7 +1019,7 @@ def _process_approval_denial(self, context: KeeperParams, enterprise_data, response_type=enterprise_pb2.ApproveUserDevicesResponse) context.enterprise_loader.load() - def _collect_user_data_keys(self, context: KeeperParams, enterprise_data, + def _collect_user_data_keys(self, context: KeeperParams, matching_devices: Dict[str, DeviceApprovalRequest]) -> Dict[int, bytes]: """Collect user data keys using ECC and RSA methods.""" data_keys: Dict[int, bytes] = {} @@ -1029,23 +1029,23 @@ def _collect_user_data_keys(self, context: KeeperParams, enterprise_data, ecc_user_ids = user_ids.copy() ecc_user_ids.difference_update(data_keys.keys()) if ecc_user_ids: - ecc_keys = self._get_ecc_data_keys(context, enterprise_data, ecc_user_ids) + ecc_keys = self._get_ecc_data_keys(context, ecc_user_ids) data_keys.update(ecc_keys) # Try RSA method for remaining users (Account Transfer) rsa_user_ids = user_ids.copy() rsa_user_ids.difference_update(data_keys.keys()) if rsa_user_ids and not context.auth.auth_context.forbid_rsa: - rsa_keys = self._get_rsa_data_keys(context, enterprise_data, rsa_user_ids) + rsa_keys = self._get_rsa_data_keys(context, rsa_user_ids) data_keys.update(rsa_keys) return data_keys - def _get_ecc_data_keys(self, context: KeeperParams, enterprise_data, user_ids: Set[int]) -> Dict[int, bytes]: + def _get_ecc_data_keys(self, context: KeeperParams, user_ids: Set[int]) -> Dict[int, bytes]: """Get user data keys using ECC encryption.""" data_keys: Dict[int, bytes] = {} curve = ec.SECP256R1() - ecc_private_key = self._get_ecc_private_key(enterprise_data, curve) + ecc_private_key = context.enterprise_data.enterprise_info.ec_private_key if not ecc_private_key: return data_keys @@ -1054,43 +1054,26 @@ def _get_ecc_data_keys(self, context: KeeperParams, enterprise_data, user_ids: S data_key_rq.enterpriseUserId.extend(user_ids) data_key_rs = context.auth.execute_auth_rest( GET_ENTERPRISE_USER_DATA_KEY_ENDPOINT, data_key_rq, - response_type=APIRequest_pb2.EnterpriseUserIdDataKeyPair) + response_type=enterprise_pb2.EnterpriseUserDataKeys) - enc_data_key = data_key_rs.encryptedDataKey - if enc_data_key: - try: - ephemeral_public_key = ec.EllipticCurvePublicKey.from_encoded_point( - curve, enc_data_key[:ECC_PUBLIC_KEY_LENGTH]) - shared_key = ecc_private_key.exchange(ec.ECDH(), ephemeral_public_key) - digest = hashes.Hash(hashes.SHA256(), backend=default_backend()) - digest.update(shared_key) - enc_key = digest.finalize() - data_key = utils.crypto.decrypt_aes_v2(enc_data_key[ECC_PUBLIC_KEY_LENGTH:], enc_key) - data_keys[data_key_rs.enterpriseUserId] = data_key - except Exception as e: - logger.debug(e) + for key in data_key_rs.keys: + enc_data_key = key.userEncryptedDataKey + if enc_data_key: + try: + ephemeral_public_key = ec.EllipticCurvePublicKey.from_encoded_point( + curve, enc_data_key[:ECC_PUBLIC_KEY_LENGTH]) + shared_key = ecc_private_key.exchange(ec.ECDH(), ephemeral_public_key) + digest = hashes.Hash(hashes.SHA256(), backend=default_backend()) + digest.update(shared_key) + enc_key = digest.finalize() + data_key = utils.crypto.decrypt_aes_v2(enc_data_key[ECC_PUBLIC_KEY_LENGTH:], enc_key) + data_keys[key.enterpriseUserId] = data_key + except Exception as e: + logger.debug(e) return data_keys - def _get_ecc_private_key(self, enterprise_data, curve) -> Optional[Any]: - """Extract and decrypt the ECC private key from enterprise data.""" - if not enterprise_data.enterprise_info.keys: - return None - if not enterprise_data.enterprise_info.keys.ecc_encrypted_private_key: - return None - - try: - keys = enterprise_data.get_enterprise_data_keys() - ecc_private_key_data = utils.base64_url_decode(keys.ecc_encrypted_private_key) - ecc_private_key_data = utils.crypto.decrypt_aes_v2( - ecc_private_key_data, enterprise_data.enterprise_info.tree_key) - private_value = int.from_bytes(ecc_private_key_data, byteorder='big', signed=False) - return ec.derive_private_key(private_value, curve, default_backend()) - except Exception as e: - logger.debug(e) - return None - - def _get_rsa_data_keys(self, context: KeeperParams, enterprise_data, user_ids: Set[int]) -> Dict[int, bytes]: + def _get_rsa_data_keys(self, context: KeeperParams, user_ids: Set[int]) -> Dict[int, bytes]: """Get user data keys from Account Transfer using RSA encryption.""" data_keys: Dict[int, bytes] = {} data_key_rq = APIRequest_pb2.UserDataKeyRequest() @@ -1101,7 +1084,7 @@ def _get_rsa_data_keys(self, context: KeeperParams, enterprise_data, user_ids: S if data_key_rs.noEncryptedDataKey: user_ids_without_key = set(data_key_rs.noEncryptedDataKey) - usernames = [x.username for x in enterprise_data.users.get_all_entities() + usernames = [x.username for x in context.enterprise_data.users.get_all_entities() if x.enterprise_user_id in user_ids_without_key] if usernames: logger.info('User(s) \"%s\" have no accepted account transfers or did not share encryption key', @@ -1109,7 +1092,7 @@ def _get_rsa_data_keys(self, context: KeeperParams, enterprise_data, user_ids: S if data_key_rs.accessDenied: denied_user_ids = set(data_key_rs.accessDenied) - usernames = [x.username for x in enterprise_data.users.get_all_entities() + usernames = [x.username for x in context.enterprise_data.users.get_all_entities() if x.enterprise_user_id in denied_user_ids] if usernames: logger.info('You cannot manage these user(s): %s', ', '.join(usernames)) @@ -1117,7 +1100,7 @@ def _get_rsa_data_keys(self, context: KeeperParams, enterprise_data, user_ids: S if data_key_rs.userDataKeys: for dk in data_key_rs.userDataKeys: try: - role_key = utils.crypto.decrypt_aes_v2(dk.roleKey, enterprise_data.enterprise_info.tree_key) + role_key = utils.crypto.decrypt_aes_v2(dk.roleKey, context.enterprise_data.enterprise_info.tree_key) encrypted_private_key = utils.base64_url_decode(dk.privateKey) decrypted_private_key = utils.crypto.decrypt_aes_v1(encrypted_private_key, role_key) private_key = utils.crypto.load_rsa_private_key(decrypted_private_key) diff --git a/keepercli-package/src/keepercli/commands/two_fa.py b/keepercli-package/src/keepercli/commands/two_fa.py new file mode 100644 index 0000000..258bf40 --- /dev/null +++ b/keepercli-package/src/keepercli/commands/two_fa.py @@ -0,0 +1,593 @@ +import argparse +import datetime +import json + +from . import base +from ..params import KeeperParams +from ..helpers import report_utils +from .. import api, prompt_utils + +from keepersdk import utils +from keepersdk.proto import APIRequest_pb2 +from keepersdk.authentication import two_fa_utils +from keepersdk.errors import KeeperApiError + +logger = api.get_logger() + +# Constants +NEVER_EXPIRE_TIMESTAMP = 3_000_000_000_000 +MILLISECONDS_TO_SECONDS = 1000 + +METHOD_CHOICES = ['totp', 'key', 'sms', 'duo', 'backup'] + +# TFA Restriction keys +ALL_TFA_RESTRICTIONS = { + 'require_security_key_pin', + 'restrict_two_factor_channel_text', + 'restrict_two_factor_channel_google', + 'restrict_two_factor_channel_duo', + 'restrict_two_factor_channel_security_key', + 'restrict_two_factor_channel_rsa', + 'restrict_two_factor_channel_dna' +} + +# Duo capabilities +DUO_CAPABILITIES = ('mobile_otp', 'sms', 'voice') +DUO_CAPABILITY_SMS = 'sms' +DUO_CAPABILITY_VOICE = 'voice' +DUO_CAPABILITY_MOBILE_OTP = 'mobile_otp' +DUO_CAPABILITY_DISPLAY_NAMES = { + 'sms': 'Send a Text Message', + 'voice': 'Make a Voice Call', + 'mobile_otp': 'OTP Code on Mobile' +} + +# Method to Channel Type mapping +METHOD_TO_CHANNEL_TYPE = { + 'totp': APIRequest_pb2.TWO_FA_CT_TOTP, + 'sms': APIRequest_pb2.TWO_FA_CT_SMS, + 'key': APIRequest_pb2.TWO_FA_CT_WEBAUTHN, + 'duo': APIRequest_pb2.TWO_FA_CT_DUO, + 'backup': APIRequest_pb2.TWO_FA_CT_BACKUP +} + +# Method to restriction key mapping +METHOD_TO_RESTRICTION = { + 'totp': 'restrict_two_factor_channel_google', + 'sms': 'restrict_two_factor_channel_text', + 'key': 'restrict_two_factor_channel_security_key' +} + +# Channel Type to Value Type mapping for validation +CHANNEL_TYPE_TO_VALUE_TYPE = { + APIRequest_pb2.TWO_FA_CT_TOTP: APIRequest_pb2.TWO_FA_CODE_TOTP, + APIRequest_pb2.TWO_FA_CT_SMS: APIRequest_pb2.TWO_FA_CODE_SMS, + APIRequest_pb2.TWO_FA_CT_WEBAUTHN: APIRequest_pb2.TWO_FA_RESP_WEBAUTHN, + APIRequest_pb2.TWO_FA_CT_DUO: APIRequest_pb2.TWO_FA_CODE_DUO +} + +# Error messages +ERROR_VAULT_NOT_INITIALIZED = "Vault is not initialized. Login to initialize the vault." +ERROR_METHOD_DISABLED_TOTP = 'Authenticator App (TOTP) 2FA method is disabled by the Administrator' +ERROR_METHOD_DISABLED_SMS = 'Text Message (SMS) 2FA method is disabled by the Administrator' +ERROR_METHOD_DISABLED_KEY = 'Security Key 2FA method is disabled by the Administrator' +ERROR_METHOD_NOT_SUPPORTED = '2FA method "{}" is not supported' +ERROR_NAME_REQUIRED = '"name" argument is required' +ERROR_CHANNEL_NOT_FOUND = '2FA channel "{}" not found' + +# Messages +MSG_NO_2FA_METHODS = 'No 2FA methods are found' +MSG_2FA_EXPIRES = '2FA authentication expires: %s\n' +MSG_2FA_METHOD_ADDED = '2FA method is added' +MSG_2FA_CHANNEL_DELETED = '2FA channel is deleted' +MSG_ENTER_PHONE = '\nEnter your phone number for text messages: ' +MSG_DUO_ENROLL_URL = "Enroll URL" +MSG_DUO_DEVICE_PHONE = 'Device Phone Number: {}' +MSG_DUO_SELECTION_PROMPT = 'We\'ll send you a text message or call with a passcode to your device:' +MSG_DUO_CANCEL = ' q. Cancel' +MSG_DUO_SELECTION = 'Selection: ' +MSG_DUO_ACTION_NOT_SUPPORTED = 'Action "{}" is not supported.' +MSG_VERIFICATION_CODE = 'Verification Code: ' +MSG_INVALID_2FA_CODE = 'Invalid 2FA code: (%s): %s ' +MSG_QR_CODE_NOT_INSTALLED = 'QR Code library is not installed.\npip install pyqrcode' +MSG_DELETE_CHANNEL_PROMPT = 'Do you want to delete 2FA channel "{}"?' + +REPORT_HEADERS = ['method', 'channel_uid', 'name', + 'created', 'phone_number'] + +BACKUP_CODES_TITLE = 'Backup Codes' +TOTP_URL_TEMPLATE = 'otpauth://totp/Keeper:{}?secret={}' +QR_CODE_COLORS = ('black', 'white') +CANCEL_CHOICES = ('q', 'Q') +YES_CHOICES = ('y', 'Y') +DEFAULT_CONFIRMATION = 'n' + +class TwoFaCommand(base.GroupCommand): + + def __init__(self): + super().__init__('Two-Factor Authentication') + self.register_command(ListTwoFaCommand(), 'list') + self.register_command(AddTwoFaCommand(), 'add') + self.register_command(DeleteTwoFaCommand(), 'delete') + self.default_verb = 'list' + + @staticmethod + def two_factor_channel_to_desc(channel): + """Convert channel type to human-readable description.""" + channel_descriptions = { + APIRequest_pb2.TWO_FA_CT_TOTP: 'TOTP', + APIRequest_pb2.TWO_FA_CT_SMS: 'SMS', + APIRequest_pb2.TWO_FA_CT_DUO: 'DUO', + APIRequest_pb2.TWO_FA_CT_RSA: 'RSA SecurID', + APIRequest_pb2.TWO_FA_CT_U2F: 'U2F', + APIRequest_pb2.TWO_FA_CT_WEBAUTHN: 'Security Key', + APIRequest_pb2.TWO_FA_CT_DNA: 'Keeper DNA (Watch)', + APIRequest_pb2.TWO_FA_CT_BACKUP: 'Backup Codes' + } + return channel_descriptions.get(channel, 'Unknown') + +class ListTwoFaCommand(base.ArgparseCommand): + + def __init__(self): + parser = argparse.ArgumentParser( + prog='2fa list', + description='List all two-factor authentication methods', + parents=[base.report_output_parser] + ) + super().__init__(parser) + + def execute(self, context: KeeperParams, **kwargs): + if not context.vault: + raise ValueError(ERROR_VAULT_NOT_INITIALIZED) + + vault = context.vault + response = two_fa_utils.get_two_fa_list(vault) + + if not response or response.expireOn <= 0: + logger.info(MSG_NO_2FA_METHODS) + return + + expire_at = self._format_expiry_time(response.expireOn) + if expire_at: + logger.info(MSG_2FA_EXPIRES, expire_at) + + table = self._build_channel_table(response.channels) + fmt = kwargs.get('format') + header = self._get_report_headers(fmt) + + return report_utils.dump_report_data( + table, header, fmt=fmt, filename=kwargs.get('output'), row_number=True + ) + + @staticmethod + def _format_expiry_time(expire_on): + """Format expiry timestamp to human-readable string.""" + if expire_on > NEVER_EXPIRE_TIMESTAMP: + return 'Never' + dt = datetime.datetime.fromtimestamp(expire_on // MILLISECONDS_TO_SECONDS) + return dt.isoformat() + + @staticmethod + def _build_channel_table(channels): + """Build table rows from channel data.""" + table = [] + for channel in channels: + created_on = datetime.datetime.fromtimestamp( + channel.createdOn // MILLISECONDS_TO_SECONDS + ) + row = [ + TwoFaCommand.two_factor_channel_to_desc(channel.channelType), + utils.base64_url_encode(channel.channel_uid), + channel.channelName, + created_on, + channel.phoneNumber + ] + table.append(row) + return table + + @staticmethod + def _get_report_headers(format_type): + """Get report headers, formatted if not JSON.""" + if format_type == 'json': + return REPORT_HEADERS + return [report_utils.field_to_title(x) for x in REPORT_HEADERS] + +class AddTwoFaCommand(base.ArgparseCommand): + + def __init__(self): + parser = argparse.ArgumentParser(prog='2fa add', description='Add 2FA method') + AddTwoFaCommand.add_arguments_to_parser(parser) + super().__init__(parser) + + @staticmethod + def add_arguments_to_parser(parser: argparse.ArgumentParser): + parser.add_argument( + '--method', '-m', + dest='method', + action='store', + required=True, + choices=METHOD_CHOICES, + help='2FA auth method' + ) + parser.add_argument('--name', dest='name', action='store', help='2FA auth name') + parser.add_argument( + '--key-pin', + dest='key_pin', + action='store_true', + help='force using Security Key PIN' + ) + + def execute(self, context: KeeperParams, **kwargs): + """Execute the add 2FA method command.""" + if not context.vault: + raise ValueError(ERROR_VAULT_NOT_INITIALIZED) + + vault = context.vault + method = kwargs.get('method') + tfa_restrictions = self._get_tfa_restrictions(vault) + + channel_type = self._get_channel_type(method, tfa_restrictions) + channel_uid = utils.base64_url_decode(utils.generate_uid()) + channel_name = kwargs.get('name') or '' + + phone_number, duo_push_type = self._handle_channel_setup( + vault, channel_type, context + ) + if self._is_setup_cancelled(channel_type, phone_number, duo_push_type): + return + + response = two_fa_utils.add_two_fa_method( + vault=vault, + channel_type=channel_type, + channel_uid=channel_uid, + channel_name=channel_name, + phone_number=phone_number or '', + duo_push_type=duo_push_type or APIRequest_pb2.TWO_FA_PUSH_NONE + ) + + if self._handle_channel_specific_setup( + vault, channel_type, response, channel_uid, context, kwargs + ): + return + + value_type = self._get_value_type_for_channel(channel_type) + self._validate_2fa_code(vault, channel_uid, value_type) + + @staticmethod + def _get_tfa_restrictions(vault): + """Get TFA restrictions from vault enforcements.""" + tfa_restrictions = set() + enforcements = vault.keeper_auth.auth_context.enforcements + if not enforcements or 'booleans' not in enforcements: + return tfa_restrictions + + booleans = enforcements['booleans'] + for item in booleans: + key = (item.get('key') or '').lower() + if key in ALL_TFA_RESTRICTIONS: + tfa_restrictions.add(key) + + return tfa_restrictions + + @staticmethod + def _get_channel_type(method, tfa_restrictions): + """Get channel type for method and validate restrictions.""" + if method not in METHOD_TO_CHANNEL_TYPE: + raise base.CommandError(ERROR_METHOD_NOT_SUPPORTED.format(method)) + + restriction_key = METHOD_TO_RESTRICTION.get(method) + if restriction_key and restriction_key in tfa_restrictions: + error_messages = { + 'totp': ERROR_METHOD_DISABLED_TOTP, + 'sms': ERROR_METHOD_DISABLED_SMS, + 'key': ERROR_METHOD_DISABLED_KEY + } + raise base.CommandError(error_messages.get(method, '')) + + return METHOD_TO_CHANNEL_TYPE[method] + + def _handle_channel_setup(self, vault, channel_type, context): + """Handle channel-specific setup (SMS phone, Duo selection).""" + phone_number = None + duo_push_type = None + + if channel_type == APIRequest_pb2.TWO_FA_CT_SMS: + phone_number = self._handle_sms_setup() + elif channel_type == APIRequest_pb2.TWO_FA_CT_DUO: + duo_push_type = self._handle_duo_setup(vault) + + return phone_number, duo_push_type + + @staticmethod + def _is_setup_cancelled(channel_type, phone_number, duo_push_type): + """Check if setup was cancelled by user.""" + if channel_type == APIRequest_pb2.TWO_FA_CT_SMS: + return phone_number is None + if channel_type == APIRequest_pb2.TWO_FA_CT_DUO: + return duo_push_type is None + return False + + @staticmethod + def _handle_sms_setup(): + """Handle SMS phone number input.""" + try: + phone_number = input(MSG_ENTER_PHONE) + return phone_number if phone_number else None + except KeyboardInterrupt: + return None + + def _handle_duo_setup(self, vault): + """Handle Duo setup and return push type.""" + duo_response = vault.keeper_auth.execute_auth_rest( + rest_endpoint='authentication/2fa_duo_status', + request=None, + response_type=APIRequest_pb2.TwoFactorDuoStatus + ) + + if duo_response.enroll_url: + logger.warning(duo_response.message) + logger.warning(MSG_DUO_ENROLL_URL) + logger.info(duo_response.enroll_url) + return None + + capabilities = [ + cap for cap in duo_response.capabilities + if cap in DUO_CAPABILITIES + ] + + if not capabilities: + return None + + logger.info(MSG_DUO_DEVICE_PHONE.format(duo_response.phoneNumber)) + logger.info(MSG_DUO_SELECTION_PROMPT) + + for idx, capability in enumerate(capabilities, 1): + display_name = DUO_CAPABILITY_DISPLAY_NAMES.get( + capability, capability + ) + logger.info(f' {idx}. {display_name}') + + logger.info(MSG_DUO_CANCEL) + + return self._get_duo_push_type_selection(capabilities) + + @staticmethod + def _get_duo_push_type_selection(capabilities): + """Get Duo push type from user selection.""" + while True: + try: + answer = input(MSG_DUO_SELECTION) + if answer in CANCEL_CHOICES: + return None + + if answer and answer.isnumeric(): + code = int(answer) + if 0 < code <= len(capabilities): + selected_capability = capabilities[code - 1] + return { + DUO_CAPABILITY_SMS: APIRequest_pb2.TWO_FA_PUSH_DUO_TEXT, + DUO_CAPABILITY_VOICE: APIRequest_pb2.TWO_FA_PUSH_DUO_CALL, + DUO_CAPABILITY_MOBILE_OTP: APIRequest_pb2.TWO_FA_PUSH_NONE + }.get(selected_capability, APIRequest_pb2.TWO_FA_PUSH_NONE) + + logger.info(MSG_DUO_ACTION_NOT_SUPPORTED.format(answer)) + except (KeyboardInterrupt, ValueError): + return None + + def _handle_channel_specific_setup(self, vault, channel_type, response, + channel_uid, context, kwargs): + """Handle channel-specific post-setup (backup codes, webauthn, totp).""" + if channel_type == APIRequest_pb2.TWO_FA_CT_BACKUP: + self._handle_backup_codes(response) + return True + + if channel_type == APIRequest_pb2.TWO_FA_CT_WEBAUTHN: + self._handle_webauthn_setup(vault, response, channel_uid, kwargs) + return True + + if channel_type == APIRequest_pb2.TWO_FA_CT_TOTP: + self._handle_totp_setup(response, context) + + return False + + @staticmethod + def _handle_backup_codes(response): + """Display backup codes.""" + codes = list(response.backupKeys) + table = [] + for idx in range(0, len(codes), 2): + table.append(codes[idx:idx + 2]) + report_utils.dump_report_data( + table, ('', ''), title=BACKUP_CODES_TITLE, no_header=True + ) + + @staticmethod + def _handle_webauthn_setup(vault, response, channel_uid, kwargs): + """Handle WebAuthn security key setup.""" + try: + from ..login import FidoCliInteraction + from keepersdk.authentication.yubikey import yubikey_register + + request = json.loads(response.challenge) + force_pin = kwargs.get('key_pin') is True + fido_response = yubikey_register( + request, force_pin, user_interaction=FidoCliInteraction() + ) + + if not fido_response: + return + + attestation = { + 'id': fido_response.id, + 'rawId': utils.base64_url_encode(fido_response.raw_id), + 'response': { + 'attestationObject': utils.base64_url_encode( + fido_response.response.attestation_object + ), + 'clientDataJSON': fido_response.response.client_data.b64 + }, + 'type': 'public-key', + 'clientExtensionResults': ( + dict(fido_response.client_extension_results) + if fido_response.client_extension_results else {} + ) + } + + two_fa_utils.validate_two_fa_method( + vault=vault, + channel_uid=channel_uid, + value_type=APIRequest_pb2.TWO_FA_RESP_WEBAUTHN, + value=json.dumps(attestation), + expire_in=APIRequest_pb2.TWO_FA_EXP_IMMEDIATELY + ) + logger.info(MSG_2FA_METHOD_ADDED) + except ImportError as e: + logger.warning(e) + display_fido2_warning() + except Exception as e: + logger.warning(e) + + @staticmethod + def _handle_totp_setup(response, context): + """Handle TOTP setup and display QR code.""" + url = TOTP_URL_TEMPLATE.format( + context.auth.auth_context.username, + response.challenge + ) + logger.info(f'TOTP URL:\n{url}') + + try: + import pyqrcode + qr_code = pyqrcode.create(url) + logger.info(qr_code.terminal(*QR_CODE_COLORS)) + except ModuleNotFoundError: + logger.error(MSG_QR_CODE_NOT_INSTALLED) + + @staticmethod + def _get_value_type_for_channel(channel_type): + """Get value type for channel type validation.""" + return CHANNEL_TYPE_TO_VALUE_TYPE.get( + channel_type, + APIRequest_pb2.TWO_FA_CODE_TOTP + ) + + @staticmethod + def _validate_2fa_code(vault, channel_uid, value_type): + """Validate 2FA code with user input.""" + while True: + try: + answer = input(MSG_VERIFICATION_CODE) + if not answer: + continue + + try: + two_fa_utils.validate_two_fa_method( + vault=vault, + channel_uid=channel_uid, + value_type=value_type, + value=answer, + expire_in=APIRequest_pb2.TWO_FA_EXP_IMMEDIATELY + ) + logger.info(MSG_2FA_METHOD_ADDED) + return + except KeeperApiError as kae: + logger.warning( + MSG_INVALID_2FA_CODE, + kae.result_code, + kae.message + ) + except KeyboardInterrupt: + return + + +warned_on_fido_package = False +install_fido_package_warning = """ + You can use Security Key with KeeperSDK: + Upgrade your Python interpreter to 3.10 or newer + and make sure fido2 package is 2.0.0 or newer +""" + + +def display_fido2_warning(): + global warned_on_fido_package + + if not warned_on_fido_package: + logger.warning(install_fido_package_warning) + warned_on_fido_package = True + + +class DeleteTwoFaCommand(base.ArgparseCommand): + + def __init__(self): + parser = argparse.ArgumentParser( + prog='2fa delete', + description='Delete a two-factor authentication method' + ) + DeleteTwoFaCommand.add_arguments_to_parser(parser) + super().__init__(parser) + + @staticmethod + def add_arguments_to_parser(parser: argparse.ArgumentParser): + parser.add_argument( + '--force', + dest='force', + action='store_true', + help='do not prompt for confirmation' + ) + parser.add_argument('name', help='2FA method UID or name') + + def execute(self, context: KeeperParams, **kwargs): + if not context.vault: + raise ValueError(ERROR_VAULT_NOT_INITIALIZED) + + vault = context.vault + name = kwargs.get('name') + + if not name: + raise base.CommandError(ERROR_NAME_REQUIRED) + + response = two_fa_utils.get_two_fa_list(vault) + if not response: + logger.info(MSG_NO_2FA_METHODS) + return + + channel = self._find_channel_by_name(response.channels, name) + if not channel: + raise base.CommandError(ERROR_CHANNEL_NOT_FOUND.format(name)) + + if not self._confirm_deletion(channel, kwargs.get('force')): + return + + two_fa_utils.delete_two_fa_method(vault, channel.channel_uid) + logger.info(MSG_2FA_CHANNEL_DELETED) + + @staticmethod + def _find_channel_by_name(channels, name): + """Find channel by UID or name (case-insensitive).""" + channel = next( + (ch for ch in channels if utils.base64_url_encode(ch.channel_uid) == name), + None + ) + + if channel: + return channel + + name_lower = name.casefold() + return next( + (ch for ch in channels if ch.channelName.casefold() == name_lower), + None + ) + + @staticmethod + def _confirm_deletion(channel, force): + """Confirm deletion with user if not forced.""" + if force: + return True + + channel_name = channel.channelName or utils.base64_url_encode(channel.channel_uid) + answer = prompt_utils.user_choice( + MSG_DELETE_CHANNEL_PROMPT.format(channel_name), + 'yn', + DEFAULT_CONFIRMATION + ) + return answer in YES_CHOICES diff --git a/keepercli-package/src/keepercli/register_commands.py b/keepercli-package/src/keepercli/register_commands.py index 1e3facd..3e60932 100644 --- a/keepercli-package/src/keepercli/register_commands.py +++ b/keepercli-package/src/keepercli/register_commands.py @@ -14,7 +14,7 @@ def register_commands(commands: base.CliCommands, scopes: Optional[base.CommandS if not scopes or bool(scopes & base.CommandScope.Account): from .commands import account_commands from .biometric import BiometricCommand - from .commands import account_commands + from .commands import account_commands, two_fa commands.register_command('server', base.GetterSetterCommand('server', 'Sets or displays current Keeper region'), base.CommandScope.Account) @@ -24,6 +24,7 @@ def register_commands(commands: base.CliCommands, scopes: Optional[base.CommandS commands.register_command('this-device', account_commands.ThisDeviceCommand(), base.CommandScope.Account) commands.register_command('whoami', account_commands.WhoamiCommand(), base.CommandScope.Account) commands.register_command('reset-password', account_commands.ResetPasswordCommand(), base.CommandScope.Account) + commands.register_command('2fa', two_fa.TwoFaCommand(), base.CommandScope.Account) if not scopes or bool(scopes & base.CommandScope.Vault): diff --git a/keepersdk-package/src/keepersdk/authentication/two_fa_utils.py b/keepersdk-package/src/keepersdk/authentication/two_fa_utils.py new file mode 100644 index 0000000..fb89b94 --- /dev/null +++ b/keepersdk-package/src/keepersdk/authentication/two_fa_utils.py @@ -0,0 +1,76 @@ +from ..proto import APIRequest_pb2 +from ..vault import vault_online + +# REST endpoint constants +REST_ENDPOINT_2FA_LIST = 'authentication/2fa_list' +REST_ENDPOINT_2FA_ADD = 'authentication/2fa_add' +REST_ENDPOINT_2FA_ADD_VALIDATE = 'authentication/2fa_add_validate' +REST_ENDPOINT_2FA_DELETE = 'authentication/2fa_delete' + +DEFAULT_DUO_PUSH_TYPE = APIRequest_pb2.TWO_FA_PUSH_NONE +DEFAULT_EXPIRE_IN = APIRequest_pb2.TWO_FA_EXP_IMMEDIATELY + + +def get_two_fa_list(vault: vault_online.VaultOnline) -> APIRequest_pb2.TwoFactorListResponse: + """Retrieve the list of two-factor authentication methods.""" + return vault.keeper_auth.execute_auth_rest( + rest_endpoint=REST_ENDPOINT_2FA_LIST, + request=None, + response_type=APIRequest_pb2.TwoFactorListResponse + ) + + +def add_two_fa_method( + vault: vault_online.VaultOnline, + channel_type: APIRequest_pb2.TwoFactorChannelType, + channel_uid: bytes, + channel_name: str = '', + phone_number: str = '', + duo_push_type: APIRequest_pb2.TwoFactorPushType = DEFAULT_DUO_PUSH_TYPE +) -> APIRequest_pb2.TwoFactorAddResponse: + """Add a new two-factor authentication method.""" + request = APIRequest_pb2.TwoFactorAddRequest() + request.channelType = channel_type + request.channel_uid = channel_uid + request.channelName = channel_name + request.phoneNumber = phone_number + request.duoPushType = duo_push_type + + return vault.keeper_auth.execute_auth_rest( + rest_endpoint=REST_ENDPOINT_2FA_ADD, + request=request, + response_type=APIRequest_pb2.TwoFactorAddResponse + ) + + +def validate_two_fa_method( + vault: vault_online.VaultOnline, + channel_uid: bytes, + value_type: APIRequest_pb2.TwoFactorValueType, + value: str, + expire_in: APIRequest_pb2.TwoFactorExpiration = DEFAULT_EXPIRE_IN +) -> None: + """Validate a two-factor authentication method.""" + request = APIRequest_pb2.TwoFactorValidateRequest() + request.channel_uid = channel_uid + request.valueType = value_type + request.value = value + request.expireIn = expire_in + + vault.keeper_auth.execute_auth_rest( + rest_endpoint=REST_ENDPOINT_2FA_ADD_VALIDATE, + request=request, + response_type=None + ) + + +def delete_two_fa_method(vault: vault_online.VaultOnline, channel_uid: bytes) -> None: + """Delete a two-factor authentication method.""" + request = APIRequest_pb2.TwoFactorDeleteRequest() + request.channel_uid = channel_uid + + vault.keeper_auth.execute_auth_rest( + rest_endpoint=REST_ENDPOINT_2FA_DELETE, + request=request, + response_type=None + ) \ No newline at end of file diff --git a/keepersdk-package/src/keepersdk/authentication/yubikey.py b/keepersdk-package/src/keepersdk/authentication/yubikey.py index 4ff6e7b..2b650c8 100644 --- a/keepersdk-package/src/keepersdk/authentication/yubikey.py +++ b/keepersdk-package/src/keepersdk/authentication/yubikey.py @@ -1,5 +1,7 @@ import abc +import getpass import json +import logging import os import threading from typing import Optional, Any, Dict @@ -7,8 +9,16 @@ from fido2.client import ClientError, DefaultClientDataCollector, UserInteraction, WebAuthnClient from fido2.ctap import CtapError from fido2.hid import CtapHidDevice -from fido2.webauthn import PublicKeyCredentialRequestOptions, UserVerificationRequirement, AuthenticationResponse +from fido2.webauthn import PublicKeyCredentialRequestOptions, UserVerificationRequirement, AuthenticationResponse, PublicKeyCredentialCreationOptions +from fido2.ctap2 import Ctap2, ClientPin from .. import utils +from prompt_toolkit import PromptSession + + +prompt_session = None +if os.isatty(0) and os.isatty(1): + prompt_session = PromptSession(multiline=False, complete_while_typing=False) + class IKeeperUserInteraction(abc.ABC): @abc.abstractmethod @@ -111,3 +121,87 @@ def yubikey_authenticate(request: Dict[str, Any], user_interaction: UserInteract } return json.dumps(signature) return None + + +def yubikey_register(request, force_pin=False, user_interaction: Optional[UserInteraction] = None): + logger = utils.get_logger() + + rq = request.copy() + user_id = rq['user']['id'] + if isinstance(user_id, str): + rq['user']['id'] = utils.base64_url_decode(user_id) + challenge = rq['challenge'] + if isinstance(challenge, str): + rq['challenge'] = utils.base64_url_decode(challenge) + + if force_pin: + uv = rq['authenticatorSelection']['userVerification'] + if uv != UserVerificationRequirement.REQUIRED: + rq['authenticatorSelection']['userVerification'] = UserVerificationRequirement.REQUIRED + + options = PublicKeyCredentialCreationOptions.from_dict(rq) + origin = options.extensions.get('appidExclude') or options.rp.id + + client = None + data_collector = DefaultClientDataCollector(origin, verify=verify_rp_id_none) + if os.name == 'nt': + from fido2.client.windows import WindowsClient + client = WindowsClient(client_data_collector=data_collector) + else: + dev = next(CtapHidDevice.list_devices(), None) + if not dev: + logger.warning("No Security Key detected") + return None + + from fido2.client import Fido2Client + fido_client = Fido2Client(dev, client_data_collector=data_collector, user_interaction=user_interaction) + uv_configured = any(fido_client.info.options.get(k) for k in ("uv", "clientPin", "bioEnroll")) + uv = options.authenticator_selection.user_verification + if uv == UserVerificationRequirement.REQUIRED: + if not uv_configured: + print('\nSecret Key PIN is required') + answer = input('Do you want to setup PIN code for your Secret Key? (y/n): ') + if answer not in ('y', 'Y'): + return None + prompt1 = ' PIN Code: ' + prompt2 = ' PIN Code Again: ' + if prompt_session: + pin1 = prompt_session.prompt(prompt1, is_password=True) + else: + pin1 = getpass.getpass(prompt1) + if not pin1: + raise Exception('PIN is required') + if prompt_session: + pin2 = prompt_session.prompt(prompt2, is_password=True) + else: + pin2 = getpass.getpass(prompt2) + if not pin2: + raise Exception('PIN is required') + if pin1 != pin2: + raise Exception('PINs do not match') + client_pin = ClientPin(Ctap2(dev)) + client_pin.set_pin(pin1) + elif uv == UserVerificationRequirement.PREFERRED: + if not uv_configured: + rq['authenticatorSelection']['userVerification'] = UserVerificationRequirement.DISCOURAGED + options = PublicKeyCredentialCreationOptions.from_dict(rq) + client = fido_client + + evt = threading.Event() + try: + try: + return client.make_credential(options, event=evt) + except ClientError as err: + if isinstance(err.cause, CtapError): + if err.cause.code == CtapError.ERR.PIN_INVALID: + raise Exception('PIN is invalid') + elif err.cause.code == CtapError.ERR.PIN_AUTH_BLOCKED: + raise Exception('PIN is blocked') + elif isinstance(err.cause, str): + if err.code == ClientError.ERR.CONFIGURATION_UNSUPPORTED: + raise Exception('Security key user verification (PIN or Biometric) is not configured') + raise err + except KeyboardInterrupt: + pass + finally: + evt.set()