Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 27 additions & 44 deletions keepercli-package/src/keepercli/commands/enterprise_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ def execute(self, context: KeeperParams, **kwargs) -> None:
return

if kwargs.get('approve') or kwargs.get('deny'):
self._process_approval_denial(context, enterprise_data, matching_devices, kwargs)
self._process_approval_denial(context, matching_devices, kwargs)
else:
self._display_report(enterprise_data, matching_devices, kwargs)

Expand Down Expand Up @@ -1001,14 +1001,14 @@ def _get_trusted_ip_map(self, context: KeeperParams, emails: List[str]) -> Dict[

return ip_map

def _process_approval_denial(self, context: KeeperParams, enterprise_data,
def _process_approval_denial(self, context: KeeperParams,
matching_devices: Dict[str, DeviceApprovalRequest], kwargs: Dict[str, Any]) -> None:
"""Process device approval or denial requests."""
approve_rq = enterprise_pb2.ApproveUserDevicesRequest()
data_keys = {}

if kwargs.get('approve'):
data_keys = self._collect_user_data_keys(context, enterprise_data, matching_devices)
data_keys = self._collect_user_data_keys(context, matching_devices)

device_requests = self._build_device_requests(matching_devices, data_keys, kwargs)
if not device_requests:
Expand All @@ -1019,7 +1019,7 @@ def _process_approval_denial(self, context: KeeperParams, enterprise_data,
response_type=enterprise_pb2.ApproveUserDevicesResponse)
context.enterprise_loader.load()

def _collect_user_data_keys(self, context: KeeperParams, enterprise_data,
def _collect_user_data_keys(self, context: KeeperParams,
matching_devices: Dict[str, DeviceApprovalRequest]) -> Dict[int, bytes]:
"""Collect user data keys using ECC and RSA methods."""
data_keys: Dict[int, bytes] = {}
Expand All @@ -1029,23 +1029,23 @@ def _collect_user_data_keys(self, context: KeeperParams, enterprise_data,
ecc_user_ids = user_ids.copy()
ecc_user_ids.difference_update(data_keys.keys())
if ecc_user_ids:
ecc_keys = self._get_ecc_data_keys(context, enterprise_data, ecc_user_ids)
ecc_keys = self._get_ecc_data_keys(context, ecc_user_ids)
data_keys.update(ecc_keys)

# Try RSA method for remaining users (Account Transfer)
rsa_user_ids = user_ids.copy()
rsa_user_ids.difference_update(data_keys.keys())
if rsa_user_ids and not context.auth.auth_context.forbid_rsa:
rsa_keys = self._get_rsa_data_keys(context, enterprise_data, rsa_user_ids)
rsa_keys = self._get_rsa_data_keys(context, rsa_user_ids)
data_keys.update(rsa_keys)

return data_keys

def _get_ecc_data_keys(self, context: KeeperParams, enterprise_data, user_ids: Set[int]) -> Dict[int, bytes]:
def _get_ecc_data_keys(self, context: KeeperParams, user_ids: Set[int]) -> Dict[int, bytes]:
"""Get user data keys using ECC encryption."""
data_keys: Dict[int, bytes] = {}
curve = ec.SECP256R1()
ecc_private_key = self._get_ecc_private_key(enterprise_data, curve)
ecc_private_key = context.enterprise_data.enterprise_info.ec_private_key

if not ecc_private_key:
return data_keys
Expand All @@ -1054,43 +1054,26 @@ def _get_ecc_data_keys(self, context: KeeperParams, enterprise_data, user_ids: S
data_key_rq.enterpriseUserId.extend(user_ids)
data_key_rs = context.auth.execute_auth_rest(
GET_ENTERPRISE_USER_DATA_KEY_ENDPOINT, data_key_rq,
response_type=APIRequest_pb2.EnterpriseUserIdDataKeyPair)
response_type=enterprise_pb2.EnterpriseUserDataKeys)

enc_data_key = data_key_rs.encryptedDataKey
if enc_data_key:
try:
ephemeral_public_key = ec.EllipticCurvePublicKey.from_encoded_point(
curve, enc_data_key[:ECC_PUBLIC_KEY_LENGTH])
shared_key = ecc_private_key.exchange(ec.ECDH(), ephemeral_public_key)
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(shared_key)
enc_key = digest.finalize()
data_key = utils.crypto.decrypt_aes_v2(enc_data_key[ECC_PUBLIC_KEY_LENGTH:], enc_key)
data_keys[data_key_rs.enterpriseUserId] = data_key
except Exception as e:
logger.debug(e)
for key in data_key_rs.keys:
enc_data_key = key.userEncryptedDataKey
if enc_data_key:
try:
ephemeral_public_key = ec.EllipticCurvePublicKey.from_encoded_point(
curve, enc_data_key[:ECC_PUBLIC_KEY_LENGTH])
shared_key = ecc_private_key.exchange(ec.ECDH(), ephemeral_public_key)
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
digest.update(shared_key)
enc_key = digest.finalize()
data_key = utils.crypto.decrypt_aes_v2(enc_data_key[ECC_PUBLIC_KEY_LENGTH:], enc_key)
data_keys[key.enterpriseUserId] = data_key
except Exception as e:
logger.debug(e)

return data_keys

def _get_ecc_private_key(self, enterprise_data, curve) -> Optional[Any]:
"""Extract and decrypt the ECC private key from enterprise data."""
if not enterprise_data.enterprise_info.keys:
return None
if not enterprise_data.enterprise_info.keys.ecc_encrypted_private_key:
return None

try:
keys = enterprise_data.get_enterprise_data_keys()
ecc_private_key_data = utils.base64_url_decode(keys.ecc_encrypted_private_key)
ecc_private_key_data = utils.crypto.decrypt_aes_v2(
ecc_private_key_data, enterprise_data.enterprise_info.tree_key)
private_value = int.from_bytes(ecc_private_key_data, byteorder='big', signed=False)
return ec.derive_private_key(private_value, curve, default_backend())
except Exception as e:
logger.debug(e)
return None

def _get_rsa_data_keys(self, context: KeeperParams, enterprise_data, user_ids: Set[int]) -> Dict[int, bytes]:
def _get_rsa_data_keys(self, context: KeeperParams, user_ids: Set[int]) -> Dict[int, bytes]:
"""Get user data keys from Account Transfer using RSA encryption."""
data_keys: Dict[int, bytes] = {}
data_key_rq = APIRequest_pb2.UserDataKeyRequest()
Expand All @@ -1101,23 +1084,23 @@ def _get_rsa_data_keys(self, context: KeeperParams, enterprise_data, user_ids: S

if data_key_rs.noEncryptedDataKey:
user_ids_without_key = set(data_key_rs.noEncryptedDataKey)
usernames = [x.username for x in enterprise_data.users.get_all_entities()
usernames = [x.username for x in context.enterprise_data.users.get_all_entities()
if x.enterprise_user_id in user_ids_without_key]
if usernames:
logger.info('User(s) \"%s\" have no accepted account transfers or did not share encryption key',
', '.join(usernames))

if data_key_rs.accessDenied:
denied_user_ids = set(data_key_rs.accessDenied)
usernames = [x.username for x in enterprise_data.users.get_all_entities()
usernames = [x.username for x in context.enterprise_data.users.get_all_entities()
if x.enterprise_user_id in denied_user_ids]
if usernames:
logger.info('You cannot manage these user(s): %s', ', '.join(usernames))

if data_key_rs.userDataKeys:
for dk in data_key_rs.userDataKeys:
try:
role_key = utils.crypto.decrypt_aes_v2(dk.roleKey, enterprise_data.enterprise_info.tree_key)
role_key = utils.crypto.decrypt_aes_v2(dk.roleKey, context.enterprise_data.enterprise_info.tree_key)
encrypted_private_key = utils.base64_url_decode(dk.privateKey)
decrypted_private_key = utils.crypto.decrypt_aes_v1(encrypted_private_key, role_key)
private_key = utils.crypto.load_rsa_private_key(decrypted_private_key)
Expand Down
Loading