From b904f9091cae76581561a8f5ed585ec9ac134e2c Mon Sep 17 00:00:00 2001
From: Enrique Vallespi Gil
Date: Thu, 30 Oct 2025 11:23:54 +0100
Subject: [PATCH] Move crawl_n_mask to regexp based logic

We're moving crawl_n_mask to regexp-based logic, which is able to mask any type of file (json, yaml, txt, log). The regexps search for exact words to avoid false positives (we don't want to mask SecretName when searching for the Secret keyword); this is why PROTECT_KEYS has grown, and it has a positive impact on performance. The new version can also mask multiple secrets in the same log line, while leaving Ansible headers such as TASK and PLAY untouched.

Added multiprocessing support for parallel file masking.
Added file masking integration tests using temporary files.

AI-Assisted: This change was developed with assistance from Claude (Anthropic's AI assistant) for code refactoring, test development, and PEP 8 compliance.

Signed-off-by: Enrique Vallespi Gil
---
 plugins/modules/crawl_n_mask.py         | 448 +++++++++++++++++-------
 roles/artifacts/tasks/main.yml          |   1 +
 tests/unit/modules/test_crawl_n_mask.py | 240 +++++++++++--
 3 files changed, 541 insertions(+), 148 deletions(-)

diff --git a/plugins/modules/crawl_n_mask.py b/plugins/modules/crawl_n_mask.py
index 8470b9e5d8..4d9cd66b47 100755
--- a/plugins/modules/crawl_n_mask.py
+++ b/plugins/modules/crawl_n_mask.py
@@ -13,13 +13,12 @@ --- module: crawl_n_mask -short_description: This module mask secrets in yaml files/dirs +short_description: This module masks secrets in yaml/json/log files/dirs version_added: "1.0.0" description: - - This module crawls over a directory (default) and find yaml files which may have secrets in it, and proceeds with masking it. - - If you pass a yaml file, it will directly check and mask secret in it. + - This module crawls over a directory (default), finds yaml/json/log files which may contain secrets, and masks them. - If you pass a directory, it will crawl the directory and find eligible files to mask.
options: @@ -43,7 +42,7 @@ """ EXAMPLES = r""" -- name: Mask secrets in all yaml files within /home/zuul/logs +- name: Mask secrets in all yaml/json/log files within /home/zuul/logs crawl_n_mask: path: /home/zuul/logs isdir: True @@ -51,6 +50,10 @@ - name: Mask my_secrets.yaml crawl_n_mask: path: /home/zuul/logs/my_secrets.yaml + +- name: Mask application.log + crawl_n_mask: + path: /var/log/application.log """ RETURN = r""" @@ -62,8 +65,9 @@ """ import os -import re import pathlib +import re +from multiprocessing import Pool, cpu_count from ansible.module_utils.basic import AnsibleModule @@ -86,85 +90,211 @@ # python3 plugins/modules/crawl_n_mask.py ./args.json ################ -# files which are yaml but do not end with .yaml or .yml -ALLOWED_YAML_FILES = [ - "Standalone", -] # dirs which we do not want to scan EXCLUDED_DIRS = [ "openstack-k8s-operators-openstack-must-gather", "tmp", "venv", + ".git", ".github", ] -# file extensions which we do not want to process +# Used to skip Ansible task headers from txt/log masked files +ANSIBLE_SKIP_PATTERNS = [ + "TASK [", + "TASK: ", + "PLAY [", +] +# File extensions which we do not want to process EXCLUDED_FILE_EXT = [ ".py", ".html", ".DS_Store", - ".tar.gz", + ".tar.*", + ".rpm", ".zip", ".j2", + ".subunit", + ".tmp", ] # keys in files whose values need to be masked PROTECT_KEYS = [ - "literals", - "PASSWORD", - "Password", - "password", + "_client_cert_passphrase", + "_client_key_passphrase", + "_local_rsync_password", "_pwd", "_PWD", - "Token", - "Secret", - "secret", - "SECRET", + "_secret_content", + "abotrabbitmq", + "accessSecret", + "adcCredentialSecret", + "admin_password", + "adminPassword", + "AdminPassword", + "ADMIN_PASSWORD", + "adminPasswordSecretKeyRef", + "alt_password", + "AodhDatabasePassword", + "AodhPassword", + "api_secret", + "auth_encryption_key", + "authCertSecret", "Authkey", "authkey", - "private_key", - "privatekey", - "Passphrase", - "passphrase", - "PASSPHRASE", - "encryption_key", - "ENCRYPTION_KEY", - "HeatAuthEncryptionKey", - "oc_login_command", - "METADATA_SHARED_SECRET", - "KEYSTONE_FEDERATION_CLIENT_SECRET", - "rabbit", - "database_connection", - "slave_connection", - "sql_connection", + "aws_secret_access_key", + "BARBICAN_SIMPLE_CRYPTO_ENCRYPTION_KEY", + "BarbicanDatabasePassword", + "BarbicanPassword", + "BarbicanSimpleCryptoKEK", + "BarbicanSimpleCryptoKek", + "bearerToken", + "bind_password", + "bindPassword", + "bootstrapPassword", + "bootstrapToken", + "ca_secret", + "caSecret", + "CeilometerPassword", + "CephClientKey", + "CephClusterFSID", + "CephRgwKey", + "chap_password", "cifmw_openshift_login_password", "cifmw_openshift_login_token", - "BarbicanSimpleCryptoKEK", - "OctaviaHeartbeatKey", - "server-ca-passphrase", - "KeystoneFernetKeys", - "KeystoneFernetKey", - "KeystoneCredential", + "cifmw_openshift_password", + "CinderDatabasePassword", + "CinderPassword", + "client_secret", + "clientSecret", + "clientsecret", + "ClientKey", + "cloud_admin_user_password", + "database_connection", + "databasePassword", + "DatabasePassword", + "db-password", + "DB_ROOT_PASSWORD", + "DbRootPassword", + "defaultAdminPassword", + "DesignateDatabasePassword", + "DesignatePassword", "DesignateRndcKey", - "CephRgwKey", - "CephClusterFSID", - "CephClientKey", - "BarbicanSimpleCryptoKek", - "HashSuffix", - "RabbitCookie", + "docker-password", + "EMAIL_HOST_PASSWORD", + "ENCRYPTION_KEY", + "encryption_key", "erlang_cookie", - "ClientKey", - "swift_store_key", - "secret_key", - "heartbeat_key", "fernet_keys", - "sshkey", 
+ "fromConnectionSecretKey", + "git-password", + "GlanceDatabasePassword", + "GlancePassword", + "HashSuffix", + "HEAT_AUTH_ENCRYPTION_KEY", + "HeatAuthEncryptionKey", + "HeatDatabasePassword", + "HeatPassword", + "heartbeat_key", + "http_basic_password", + "idp_password", + "idp_test_user_password", + "iibpassword", + "ilo_password", + "image_alt_ssh_password", + "image_password", + "image_server_password", + "image_ssh_password", + "infoblox_password", + "ipmi_password", + "IronicDatabasePassword", + "IronicInspectorDatabasePassword", + "IronicInspectorPassword", + "IronicPassword", + "key-password", + "key-store-password", + "key_password", + "keyPassword", + "keystoreKeyPassword", + "keystoreKeypassword", + "keystorePassword", + "keyStorePassword", + "KeystoneCredential", + "KeystoneDatabasePassword", + "KEYSTONE_FEDERATION_CLIENT_SECRET", + "KeystoneFernetKey", + "KeystoneFernetKeys", "keytab_base64", -] -# connection keys which may be part of the value itself -CONNECTION_KEYS = [ + "LibvirtPassword", + "licenseSecret", + "literals", + "managementPassword", + "ManilaDatabasePassword", + "ManilaPassword", + "MARIADB_PASSWORD", + "master-password", + "masterPassword", + "MASTER_PASSWORD", + "metadata_proxy_shared_secret", + "MetadataSecret", + "METADATA_SHARED_SECRET", + "MONGODB_BACKUP_PASSWORD", + "MONGODB_CLUSTER_ADMIN_PASSWORD", + "MONGODB_CLUSTER_MONITOR_PASSWORD", + "MONGODB_DATABASE_ADMIN_PASSWORD", + "MONGODB_USER_ADMIN_PASSWORD", + "mqpassword", + "mysql_root_password", + "mysql_zabbix_password", + "netapp_password", + "NeutronDatabasePassword", + "NeutronPassword", + "nexusInitialPassword", + "NodeRootPassword", + "NovaAPIDatabasePassword", + "NovaCell0DatabasePassword", + "NovaCell1DatabasePassword", + "NovaPassword", + "oc_login_command", + "OctaviaDatabasePassword", + "OctaviaHeartbeatKey", + "OctaviaPassword", + "Passphrase", + "passphrase", + "PASSPHRASE", + "Password", + "password", + "PASSWORD", + "pgpassword", + "pgreplpassword", + "pg_restic_password", + "pgRewindPassword", + "PlacementDatabasePassword", + "PlacementPassword", + "postgresPassword", + "postgresqlPassword", + "private_key", + "privatekey", + "proxy_password", "rabbit", - "database_connection", + "rabbitmqPassword", + "RabbitCookie", + "redfish_password", + "redis_password", + "remote_image_user_password", + "scimAdminPassword", + "Secret", + "secret", + "SECRET", + "secret_key", + "server-ca-passphrase", + "ServicePassword", "slave_connection", + "SPRING_DATASOURCE_PASSWORD", "sql_connection", + "ssh-privatekey", + "sshkey", + "stack_domain_admin_password", + "staticPasswords", + "X-Auth-Token", ] # Masking string MASK_STR = "**********" @@ -172,6 +302,48 @@ # regex of excluded file extensions excluded_file_ext_regex = r"(^.*(%s).*)" % "|".join(EXCLUDED_FILE_EXT) +QUICK_KEYWORDS = frozenset([key.lower() for key in PROTECT_KEYS]) + +# Pre-compiled regex patterns for log file masking. 
+LOG_PATTERNS = { + # Matches: 'password': 'value' OR \n'password': 'value' + # Groups: (1=prefix, 2=", 3=key, 4=", 5=space-before, 6=sep, 7=space-after, 8=", 9=value, 10=") + "python_dict_quoted": re.compile( + r"((?:\s|\\n)*)(['\"])(" + + "|".join(PROTECT_KEYS) + + r")(['\"])(\s*)([:=])(\s*)(['\"])([^'\"]+)(['\"])", + re.IGNORECASE, + ), + # Matches: 'password': 123456789 OR \n'password': 123456789 + # Groups: (1=prefix, 2=", 3=key, 4=", 5=space-before, 6=sep, 7=space-after, 8=value) + "python_dict_numeric": re.compile( + r"((?:\s|\\n)*)(['\"])(" + + "|".join(PROTECT_KEYS) + + r")(['\"])(\s*)(:)(\s*)(\d{6,})", + re.IGNORECASE, + ), + # Matches: password: value OR netapp_password=secret OR \npassword: value + # Groups: (1=prefix, 2=key, 3=space-before, 4=sep, 5=space-after, + # 6=open-quote(optional), 7=value, 8=close-quote(optional)) + "plain_key_value": re.compile( + r"((?:\s|\\n)*)\b(" + + "|".join(PROTECT_KEYS) + + r')\b(\s*)([:=])(\s*)(["\']?)([^\s\\"\']+)(["\']?)', + re.IGNORECASE, + ), + # Matches: SHA256 tokens (OpenShift style) + "sha256_token": re.compile(r"sha256~[A-Za-z0-9_-]+"), + # Matches: Bearer + "bearer": re.compile(r"Bearer\s+[a-zA-Z0-9_-]{20,}", re.IGNORECASE), + # Matches: ://user:pass@host + "connection_string": re.compile( + r"://([a-zA-Z0-9_-]+):([a-zA-Z0-9_@!#$%^&*]+)@([a-zA-Z0-9.-]+)" + ), +} + +# Available CPU-1, Max 8 +NUM_WORKERS = min(cpu_count() - 1, 8) + def handle_walk_errors(e): raise e @@ -182,8 +354,9 @@ def crawl(module, path) -> bool: Crawler function which will crawl through the log directory and find eligible files for masking. """ - changed = False + files_to_process = [] base_path = os.path.normpath(path) + results = [] for root, _, files in os.walk(base_path, onerror=handle_walk_errors): # Get relative path from our base path rel_path = os.path.relpath(root, base_path) @@ -191,100 +364,111 @@ def crawl(module, path) -> bool: # Check if any parent directory (not the root) is excluded if any(part in EXCLUDED_DIRS for part in rel_path.split(os.sep)): continue - for f in files: if not re.search(excluded_file_ext_regex, f): - if mask(module, os.path.join(root, f)): - # even if one file is masked, the final result will be True - changed = True - return changed + files_to_process.append(os.path.join(root, f)) + try: + with Pool(processes=NUM_WORKERS) as pool: + results = pool.map(mask_file, files_to_process) + except Exception as e: + module.fail_json(msg=f"Failed to mask files: {str(e)}") + + return any(results) def _get_masked_string(value): + # Not process empty strings + if len(value.strip("'\"")) == 0: + return value if len(value) <= 4: return value[:2] + MASK_STR return value[:2] + MASK_STR + value[-2:] -def partial_mask(value): +def mask_log_line(line: str) -> str: """ - Check length of the string. If it is too long, take 2 chars - from beginning, then add mask string and add 2 chars from the - end. - If value is short, take just 2 chars and add mask string + Masks several secrets occurrence in a single line. + Works good with big file with long lines and sparse secrets. 
+ + Returns masked line with secrets replaced by MASK_STR """ - if not value.strip(): - return - - if "'" in value: - parsed_value = value.split("'") - if len(parsed_value) > 2 and parsed_value[1] != "": - prefix = parsed_value[0] - value = _get_masked_string(parsed_value[1]) - suffix = parsed_value[2] - return f"{prefix}'{value}'{suffix}" - else: - match = re.match(r"^(\s*)(.*?)(\n?)$", value) - if match: - parts = list(match.groups()) - prefix = parts[0] - value = _get_masked_string(parts[1]) - suffix = parts[2] - return f"{prefix}'{value}'{suffix}" + line_lower = line.lower() + has_keyword = any(kw in line_lower for kw in QUICK_KEYWORDS) + + if not has_keyword: + return line -def mask(module, path: str) -> bool: + # Pattern 1: 'password': 'value' + # Groups: (1=prefix, 2=", 3=key, 4=", 5=space-before, 6=sep, 7=space-after, 8=", 9=value, 10=") + line = LOG_PATTERNS["python_dict_quoted"].sub( + lambda m: f"{m.group(1)}{m.group(2)}{m.group(3)}{m.group(4)}{m.group(5)}{m.group(6)}{m.group(7)}{m.group(8)}{_get_masked_string(m.group(9))}{m.group(10)}", + line, + ) + + # Pattern 2: 'password': 123456789 + # Groups: (1=prefix, 2=", 3=key, 4=", 5=space-before, 6=sep, 7=space-after, 8=value) + line = LOG_PATTERNS["python_dict_numeric"].sub( + lambda m: f"{m.group(1)}{m.group(2)}{m.group(3)}{m.group(4)}{m.group(5)}{m.group(6)}{m.group(7)}{_get_masked_string(m.group(8))}", + line, + ) + + # Pattern 3: password: value OR password = value + # Groups: (1=prefix, 2=key, 3=space-before, 4=sep, 5=space-after, + # 6=open-quote(optional), 7=value, 8=close-quote(optional)) + line = LOG_PATTERNS["plain_key_value"].sub( + lambda m: f"{m.group(1)}{m.group(2)}{m.group(3)}{m.group(4)}{m.group(5)}{m.group(6)}{_get_masked_string(m.group(7))}{m.group(8)}", + line, + ) + # SHA256 tokens + # sha256~abc123... -> sha256~********** + line = LOG_PATTERNS["sha256_token"].sub(f"sha256~{MASK_STR}", line) + + # Bearer tokens + # Bearer abc123... -> Bearer ********** + line = LOG_PATTERNS["bearer"].sub(f"Bearer {MASK_STR}", line) + + # Connection_string tokens + # mysql://user:pas123@localhost:3306/db -> mysql://*****:*******@:3306/db" + line = LOG_PATTERNS["connection_string"].sub(f"://{MASK_STR}:{MASK_STR}@", line) + + return line + + +def should_skip_ansible_line(line: str) -> bool: """ - Function responsible to begin masking on a provided - log file. It checks for file type, and calls - respective masking methods for that file. + Identifies if the line is in an Ansible header for Tasks or Plays. + + Returns True for lines that should not be masked. """ - changed = False - if ( - path.endswith((tuple(["yaml", "yml"]))) - or os.path.basename(path).split(".")[0] in ALLOWED_YAML_FILES - ): - extension = "yaml" - changed = mask_file(module, path, extension) - return changed + line_upper = line.upper() + return any(pattern.upper() in line_upper for pattern in ANSIBLE_SKIP_PATTERNS) -def mask_yaml(infile, outfile, changed) -> bool: +def mask_log_file_lines(infile, outfile, changed) -> bool: """ - Read the file, search for colon (':'), take value and - mask sensitive data + Mask log file lines with skip logic. 
+ """ for line in infile: - # Skip lines without colon - if ":" not in line: + # Skip Ansible task headers + if should_skip_ansible_line(line): outfile.write(line) continue - key, sep, value = line.partition(":") - masked_value = value - for word in PROTECT_KEYS: - if key.strip() == word: - masked = partial_mask(value) - if not masked: - continue - masked_value = masked_value.replace(value, masked) - changed = True - - outfile.write(f"{key}{sep}{masked_value}") - return changed - + masked_line = mask_log_line(line) + if masked_line != line: + changed = True + outfile.write(masked_line) -def replace_file(temp_path, file_path, changed): - if changed: - temp_path.replace(file_path) - else: - temp_path.unlink(missing_ok=True) + return changed -def mask_file(module, path, extension) -> bool: +def mask_file(path) -> bool: """ Create temporary file, replace sensitive string with masked, then replace the tmp file with original. + Unlink temp file when failure. """ changed = False @@ -293,12 +477,23 @@ def mask_file(module, path, extension) -> bool: try: with file_path.open("r", encoding="utf-8") as infile: with temp_path.open("w", encoding="utf-8") as outfile: - if extension == "yaml": - changed = mask_yaml(infile, outfile, changed) - replace_file(temp_path, file_path, changed) - return changed + changed = mask_log_file_lines(infile, outfile, changed) + replace_file(temp_path, file_path, changed) + return changed + except FileNotFoundError: + print(f"Warning: File not found (possibly broken symlink): {file_path}") + return False except Exception as e: print(f"An unexpected error occurred on masking file {file_path}: {e}") + temp_path.unlink(missing_ok=True) + return False + + +def replace_file(temp_path, file_path, changed): + if changed: + temp_path.replace(file_path) + else: + temp_path.unlink(missing_ok=True) def run_module(): @@ -344,7 +539,10 @@ def run_module(): changed = crawl(module, path) if not isdir and not re.search(excluded_file_ext_regex, path): - changed = mask(module, path) + try: + changed = mask_file(path) + except Exception as e: + module.fail_json(e) result.update(changed=changed) # in the event of a successful module execution, you will want to diff --git a/roles/artifacts/tasks/main.yml b/roles/artifacts/tasks/main.yml index 36e10f79a6..682058fcf4 100644 --- a/roles/artifacts/tasks/main.yml +++ b/roles/artifacts/tasks/main.yml @@ -92,6 +92,7 @@ - name: Mask secrets in yaml log files when: cifmw_artifacts_mask_logs |bool + become: true ignore_errors: true # noqa: ignore-errors timeout: 3600 cifmw.general.crawl_n_mask: diff --git a/tests/unit/modules/test_crawl_n_mask.py b/tests/unit/modules/test_crawl_n_mask.py index 519bdd2792..8330ae4d27 100644 --- a/tests/unit/modules/test_crawl_n_mask.py +++ b/tests/unit/modules/test_crawl_n_mask.py @@ -1,5 +1,9 @@ +import os +import tempfile +from unittest.mock import MagicMock, patch + import pytest -from unittest.mock import patch, MagicMock + from plugins.modules import crawl_n_mask as cnm @@ -14,10 +18,13 @@ class TestCrawlNMask: ) def test_crawl_true(self, test_dir, expected_files): with patch("os.walk") as mock_walk, patch( - "plugins.modules.crawl_n_mask.mask" - ) as mock_mask: + "plugins.modules.crawl_n_mask.Pool" + ) as mock_pool: mock_walk.return_value = expected_files - mock_mask.return_value = True + # Mock the Pool context manager and map method + mock_pool_instance = MagicMock() + mock_pool.return_value.__enter__.return_value = mock_pool_instance + mock_pool_instance.map.return_value = [True] # At least one file changed 
module = MagicMock() changed = cnm.crawl(module, test_dir) assert changed @@ -33,33 +40,220 @@ def test_crawl_true(self, test_dir, expected_files): ) def test_crawl_false(self, test_dir, expected_files): with patch("os.walk") as mock_walk, patch( - "plugins.modules.crawl_n_mask.mask" - ) as mock_mask: + "plugins.modules.crawl_n_mask.Pool" + ) as mock_pool: mock_walk.return_value = expected_files - mock_mask.return_value = False + # Mock the Pool context manager and map method + mock_pool_instance = MagicMock() + mock_pool.return_value.__enter__.return_value = mock_pool_instance + mock_pool_instance.map.return_value = [False] # No files changed module = MagicMock() changed = cnm.crawl(module, test_dir) assert not changed - def test_partial_mask_scenario_1(self): - example_value = " 'test1234'\n" - expected_value = " 'te**********34'\n" - test_value = cnm.partial_mask(example_value) + def test_get_masked_string_scenario_1(self): + example_value = "test1234" + expected_value = "te**********34" + test_value = cnm._get_masked_string(example_value) assert expected_value == test_value - def test_partial_mask_scenario_2(self): - example_value = " osp_ci_framework_keytab\n" - expected_value = " 'os**********ab'\n" - test_value = cnm.partial_mask(example_value) + def test_get_masked_string_scenario_2(self): + example_value = "examplea_keytab" + expected_value = "ex**********ab" + test_value = cnm._get_masked_string(example_value) assert expected_value == test_value - def test_partial_mask_scenario_3(self): - example_value = " ''\n" - test_value = cnm.partial_mask(example_value) - assert test_value is None + def test_get_masked_string_scenario_3(self): + example_value = "" + expected_value = "" + test_value = cnm._get_masked_string(example_value) + assert expected_value == test_value - def test_partial_mask_scenario_4(self): - example_value = "tet" - expected_value = "'te**********'" - test_value = cnm.partial_mask(example_value) + def test_get_masked_string_scenario_4(self): + example_value = "test" + expected_value = "te**********" + test_value = cnm._get_masked_string(example_value) assert expected_value == test_value + + @pytest.mark.parametrize( + "input_line, expected_output", + [ + # Test python dict quoted pattern + ( + "'admin_password': 'SuperSecret123'", + "'admin_password': 'Su**********23'", + ), + ( + '"AdminPassword": "MyP@ssw0rd"', + '"AdminPassword": "My**********rd"', + ), + # Test numeric passwords + ( + "'db-password': 123456789", + "'db-password': 12**********89", + ), + # Test empty value + ( + "password: ''", + "password: ''", + ), + # Test plain key-value pattern + ( + "admin_password: secret123", + "admin_password: se**********23", + ), + ( + 'password: "abc123"', + 'password: "ab**********23"', + ), + ( + "password: 'abc123'", + "password: 'ab**********23'", + ), + ( + "mysql_root_password=MyPassword", + "mysql_root_password=My**********rd", + ), + # Test SHA256 tokens - treated as plain key:value + ( + "X-Auth-Token sha256~abc123def456ghi789", + "X-Auth-Token sha256~**********", + ), + # Test Bearer tokens - value gets masked + ( + "bearerToken: eyJhbGciOiJIU2d12ansnR5cCI6IkpXVCJ9", + "bearerToken: ey**********J9", + ), + # Test connection strings - credentials in URL get masked + ( + "admin_password in mysql://user:password123@localhost:3306/db", + "admin_password in mysql://**********:**********@:3306/db", + ), + # Test line without secrets - should remain unchanged + ( + "This is a normal log line without secrets", + "This is a normal log line without secrets", + ), + # 
Test multiple secrets in one line + ( + "'admin_password': 'secret1' and 'db-password'= 'secret2'", + "'admin_password': 'se**********t1' and 'db-password'= 'se**********t2'", + ), + # Test additional keys from PROTECT_KEYS + ( + "redis_password: myRedisSecret", + "redis_password: my**********et", + ), + ( + "clientSecret:oauth2secret", + "clientSecret:oa**********et", + ), + ( + "postgresPassword :dbP@ssw0rd!", + "postgresPassword :db**********d!", + ), + ( + "'BARBICAN_SIMPLE_CRYPTO_ENCRYPTION_KEY' : 'sE12312341==48943y21'", + "'BARBICAN_SIMPLE_CRYPTO_ENCRYPTION_KEY' : 'sE**********21'", + ), + ], + ) + def test_mask_log_line(self, input_line, expected_output): + result = cnm.mask_log_line(input_line) + assert result == expected_output + + def test_should_skip_ansible_line(self): + assert cnm.should_skip_ansible_line("TASK [Install packages]") == True + assert cnm.should_skip_ansible_line("TASK: Configure database") == True + assert cnm.should_skip_ansible_line("PLAY [Deploy application]") == True + assert cnm.should_skip_ansible_line("Some regular log line") == False + + def test_mask_file_with_real_file(self): + # Create a temporary file with secrets + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".yaml") as f: + f.write("admin_password: SuperSecret123\n") + f.write( + "db_connection: admin_password in mysql://user:pass123@localhost/db\n" + ) + f.write("normal_config: some_value\n") + temp_path = f.name + + try: + # Mask the file + changed = cnm.mask_file(temp_path) + + # Verify it was changed + assert changed == True + + # Read the masked file + with open(temp_path, "r") as f: + content = f.read() + + # Verify secrets are masked + assert "SuperSecret123" not in content + assert "pass123" not in content + assert "**********" in content + + # Verify normal content is preserved + assert "normal_config: some_value" in content + + finally: + # Clean up + if os.path.exists(temp_path): + os.unlink(temp_path) + + def test_mask_file_no_changes(self): + # Create a temporary file without secrets + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".yaml") as f: + f.write("normal_config: some_value\n") + f.write("another_config: another_value\n") + temp_path = f.name + + try: + # Mask the file + changed = cnm.mask_file(temp_path) + + # Verify it was not changed + assert changed == False + + # Read the file + with open(temp_path, "r") as f: + content = f.read() + + # Verify content is unchanged + assert "normal_config: some_value" in content + assert "another_config: another_value" in content + + finally: + # Clean up + if os.path.exists(temp_path): + os.unlink(temp_path) + + def test_mask_file_preserves_ansible_task_headers(self): + # Create a temporary file with Ansible task headers + with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".log") as f: + f.write("TASK [Setup admin_password variable]\n") + f.write("admin_password: SuperSecret123\n") + f.write("PLAY [Configure mysql_root_password]\n") + temp_path = f.name + + try: + # Mask the file + changed = cnm.mask_file(temp_path) + + # Read the masked file + with open(temp_path, "r") as f: + content = f.read() + + # Verify task headers are preserved exactly + assert "TASK [Setup admin_password variable]" in content + assert "PLAY [Configure mysql_root_password]" in content + + # Verify the actual secret is masked + assert "SuperSecret123" not in content + assert "**********" in content + + finally: + # Clean up + if os.path.exists(temp_path): + os.unlink(temp_path)
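
For readers skimming the patch, here is a minimal standalone sketch of the exact-word idea described in the commit message: word-boundary regexes built from the protected key list, plus the cheap lowercase keyword pre-filter, so a key such as SecretName is not masked just because Secret is a protected keyword. The key list, pattern and mask_line helper below are illustrative stand-ins, not the module's actual PROTECT_KEYS, LOG_PATTERNS or mask_log_line.

```python
# Illustrative sketch only: a tiny stand-in for PROTECT_KEYS and the
# plain key/value pattern, showing why \b word boundaries matter.
import re

KEYS = ["Secret", "password"]
MASK_STR = "**********"
QUICK_KEYWORDS = frozenset(k.lower() for k in KEYS)

# "Secret" only matches as a whole word, so "SecretName" is left alone.
PLAIN_KEY_VALUE = re.compile(
    r"\b(" + "|".join(KEYS) + r")\b(\s*)([:=])(\s*)(\S+)",
    re.IGNORECASE,
)


def mask_line(line: str) -> str:
    # Cheap substring pre-filter before running the regex.
    if not any(kw in line.lower() for kw in QUICK_KEYWORDS):
        return line
    # Keep the key and separator, replace only the value.
    return PLAIN_KEY_VALUE.sub(
        lambda m: f"{m.group(1)}{m.group(2)}{m.group(3)}{m.group(4)}{MASK_STR}",
        line,
    )


print(mask_line("password: hunter2"))        # -> password: **********
print(mask_line("SecretName: keystone-ca"))  # unchanged, no whole-word match
```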
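
The crawl() change fans file masking out over a multiprocessing Pool. Below is a self-contained sketch of that pattern; mask_one_file and the max(1, ...) clamp are assumptions of this sketch, not code from the patch (the patch computes NUM_WORKERS = min(cpu_count() - 1, 8)).

```python
# Standalone sketch of the parallel fan-out used by crawl(); mask_one_file
# is a placeholder for the module's mask_file().
import os
from multiprocessing import Pool, cpu_count

# Assumption of this sketch: clamp to at least one worker.
NUM_WORKERS = max(1, min(cpu_count() - 1, 8))


def mask_one_file(path: str) -> bool:
    # The real module rewrites the file and returns True when it masked
    # at least one line; this stand-in just reports "nothing changed".
    return False


def crawl(base_path: str) -> bool:
    files_to_process = []
    for root, _, files in os.walk(os.path.normpath(base_path)):
        files_to_process.extend(os.path.join(root, f) for f in files)
    # One result per file; the overall "changed" flag is True if any file changed.
    with Pool(processes=NUM_WORKERS) as pool:
        results = pool.map(mask_one_file, files_to_process)
    return any(results)


if __name__ == "__main__":
    print(crawl("."))
```

Pool(processes=0) raises ValueError, so clamping the worker count to at least 1, as in this sketch, is worth keeping in mind for single-CPU runners.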
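
mask_file() writes masked output to a temporary file and only swaps it over the original when something actually changed, removing the temp copy otherwise. A hedged sketch of that write-then-replace flow follows; the mask_line stand-in and the ".tmp_" sibling name are assumptions of this sketch.

```python
# Sketch of the write-to-temp-then-replace flow used by mask_file().
import pathlib


def mask_line(line: str) -> str:
    return line.replace("hunter2", "**********")  # illustrative only


def mask_file(path: str) -> bool:
    file_path = pathlib.Path(path)
    temp_path = file_path.with_name(f".tmp_{file_path.name}")
    changed = False
    try:
        with file_path.open("r", encoding="utf-8") as infile, \
                temp_path.open("w", encoding="utf-8") as outfile:
            for line in infile:
                masked = mask_line(line)
                if masked != line:
                    changed = True
                outfile.write(masked)
        if changed:
            temp_path.replace(file_path)  # swap the masked copy in
        else:
            temp_path.unlink(missing_ok=True)  # nothing changed, drop the copy
        return changed
    except OSError:
        # Never leave a half-written temp file behind.
        temp_path.unlink(missing_ok=True)
        return False
```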