diff --git a/analyzer/windows/analyzer.py b/analyzer/windows/analyzer.py index f6b540eece7..6d8ce888b8a 100644 --- a/analyzer/windows/analyzer.py +++ b/analyzer/windows/analyzer.py @@ -657,7 +657,10 @@ def analysis_loop(self, aux_modules): emptytime = None complete_folder = hashlib.md5(f"cape-{self.config.id}".encode()).hexdigest() - complete_analysis_pattern = os.path.join(os.environ["TMP"], complete_folder) + complete_analysis_patterns = [os.path.join(os.environ["TMP"], complete_folder)] + if "SystemRoot" in os.environ: + complete_analysis_patterns.append(os.path.join(os.environ["SystemRoot"], "Temp", complete_folder)) + while self.do_run: self.time_counter = timeit.default_timer() - time_start if self.time_counter >= int(self.config.timeout): @@ -665,11 +668,14 @@ def analysis_loop(self, aux_modules): ANALYSIS_TIMED_OUT = True break - if os.path.isdir(complete_analysis_pattern): + if any(os.path.isdir(p) for p in complete_analysis_patterns): log.info("Analysis termination requested by user") ANALYSIS_TIMED_OUT = True break + if ANALYSIS_TIMED_OUT: + break + # If the process lock is locked, it means that something is # operating on the list of monitored processes. Therefore we # cannot proceed with the checks until the lock is released. diff --git a/tests/test_analyzer_logic.py b/tests/test_analyzer_logic.py new file mode 100644 index 00000000000..49731bd88a3 --- /dev/null +++ b/tests/test_analyzer_logic.py @@ -0,0 +1,47 @@ +import pytest +import os +import hashlib +import tempfile + +# Ideally, this function would be imported from your application code +def check_completion_logic(config): + complete_folder = hashlib.md5(f"cape-{config.id}".encode()).hexdigest() + complete_analysis_patterns = [os.path.join(os.environ["TMP"], complete_folder)] + if "SystemRoot" in os.environ: + complete_analysis_patterns.append(os.path.join(os.environ["SystemRoot"], "Temp", complete_folder)) + + return any(os.path.isdir(path) for path in complete_analysis_patterns) + +class MockConfig: + id = 123 + +@pytest.fixture +def mock_env(monkeypatch): + """Pytest fixture to mock environment and create temp dirs.""" + with tempfile.TemporaryDirectory() as tmp_dir, tempfile.TemporaryDirectory() as sysroot_dir: + monkeypatch.setenv("TMP", tmp_dir) + monkeypatch.setenv("SystemRoot", sysroot_dir) + os.makedirs(os.path.join(sysroot_dir, "Temp"), exist_ok=True) + yield + +def test_completion_folder_in_tmp(mock_env): + config = MockConfig() + complete_folder = hashlib.md5(f"cape-{config.id}".encode()).hexdigest() + path = os.path.join(os.environ["TMP"], complete_folder) + os.makedirs(path) + + assert check_completion_logic(config) is True + + os.rmdir(path) + assert check_completion_logic(config) is False + +def test_completion_folder_in_systemroot(mock_env): + config = MockConfig() + complete_folder = hashlib.md5(f"cape-{config.id}".encode()).hexdigest() + path = os.path.join(os.environ["SystemRoot"], "Temp", complete_folder) + os.makedirs(path) + + assert check_completion_logic(config) is True + + os.rmdir(path) + assert check_completion_logic(config) is False