Merged
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -23,4 +23,5 @@ tests/test_bson.bson.compressed
installer/cape-config.sh
installer/kvm-config.sh

docs/book/src/_build
docs/book/src/_build
/.vs
3 changes: 3 additions & 0 deletions conf/default/web.conf.default
@@ -246,3 +246,6 @@ enabled = no

[pcap_ng]
enabled = no

[audit_framework]
enabled = no
158 changes: 158 additions & 0 deletions docs/book/src/usage/audit.rst
@@ -0,0 +1,158 @@
.. _audit_framework:

===============
Audit Framework
===============

The Audit Framework is a specialized subsystem within CAPE designed to verify the correctness and reliability of the sandbox's analysis capabilities. It allows operators to define specific test cases ("Audit Packages") that run known samples with expected behavioral outcomes ("Objectives"). This is particularly useful for validating that CAPE is correctly capturing specific behaviors (e.g., shellcode injection, network beacons) after updates or configuration changes.

Concepts
========

* **Available Test**: A test case definition stored on the disk. It consists of a payload (e.g., a malware sample) and a Python script defining the success criteria.
* **Test Session**: A collection of test runs. You can group multiple tests into a session to validate a specific aspect of the system (e.g., "Weekly Regression Test").
* **Test Run**: A single execution of an *Available Test* within a *Test Session*. It links to a standard CAPE Task ID.
* **Objective**: A specific criterion that must be met for a test to pass (e.g., "DNS request to evil.com observed", "File dropped in AppData").

Configuration
=============

To enable the Audit Framework, ensure the feature is enabled in your web configuration.

Edit ``conf/web.conf``:

.. code-block:: ini

    [audit_framework]
    enabled = yes

The framework looks for test packages in ``tests/audit_packages/`` by default.

Creating Audit Packages
=======================

Audit packages are directory-based. Each package must be a subdirectory inside ``tests/audit_packages/`` (or the configured path) containing at least two files:

1. ``payload.zip``: A zip file containing the sample to be analyzed.

   * *Note*: If the zip contains a single file, that file is treated as the payload. If it contains multiple files, the extracted directory is treated as the payload (useful for packages requiring dependencies).

2. ``test.py``: A Python script defining the test metadata, objectives, and evaluation logic.
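
A single-sample ``payload.zip`` can be assembled with Python's standard ``zipfile`` module. The sketch below uses an illustrative dummy file name; note that ``zipfile`` cannot *create* password-protected archives, so use a tool such as ``zip -P`` or ``pyzipper`` if your metadata sets ``Zip Password``:

```python
import os
import tempfile
import zipfile

# Hypothetical packaging step: file names are illustrative only.
with tempfile.TemporaryDirectory() as pkg:
    sample = os.path.join(pkg, "sample.exe")
    with open(sample, "wb") as f:
        f.write(b"MZ")  # placeholder bytes, not a real sample

    archive = os.path.join(pkg, "payload.zip")
    with zipfile.ZipFile(archive, "w") as z:
        z.write(sample, arcname="sample.exe")

    with zipfile.ZipFile(archive) as z:
        names = z.namelist()  # a single entry -> that file becomes the payload
    print(names)
```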

Directory Structure Example
---------------------------

.. code-block:: text

    tests/audit_packages/
    ├── Emotet_Network_Beacon/
    │   ├── payload.zip
    │   └── test.py
    └── AsyncRAT_Config_Extract/
        ├── payload.zip
        └── test.py

The ``test.py`` Structure
-------------------------

The Python script must define a class named ``CapeDynamicTest`` that implements the following methods:

* ``get_metadata()``: Returns a dictionary of test settings.
* ``get_objectives()``: Returns a list of objective objects.
* ``evaluate_results(task_dir)``: Analyzes the analysis results.
* ``get_results()``: Returns the final status of objectives.

Example ``test.py``:

.. code-block:: python

    import os
    import json


    class TestObjective:
        def __init__(self, name, requirement, children=None):
            self.name = name
            self.requirement = requirement
            self.children = children or []


    class CapeDynamicTest:
        def __init__(self):
            self._results = {}

        def get_metadata(self):
            """
            Define high-level test information.
            """
            return {
                "Name": "Emotet Beacon Test",
                "Description": "Verifies that CAPE detects the C2 network connection.",
                "Package": "exe",            # CAPE analysis package to use
                "Timeout": 200,              # Analysis timeout in seconds
                "Zip Password": "infected",  # Password for payload.zip (optional)
            }

        def get_objectives(self):
            """
            Define the criteria for success.
            """
            return [
                TestObjective("network_c2", "Must connect to C2 server 1.2.3.4"),
                TestObjective("dropped_payload", "Must drop the second stage loader"),
            ]

        def evaluate_results(self, task_dir):
            """
            Parse the CAPE report to verify objectives.
            task_dir: Path to the storage directory for this task (contains report.json, etc.)
            """
            report_path = os.path.join(task_dir, "reports", "report.json")

            # Default state
            self._results = {
                "network_c2": {"state": "failure", "state_reason": "IP not found"},
                "dropped_payload": {"state": "failure", "state_reason": "File not found"},
            }

            if not os.path.exists(report_path):
                return

            with open(report_path, "r") as f:
                report = json.load(f)

            # Check network hosts
            for host in report.get("network", {}).get("hosts", []):
                if host == "1.2.3.4":
                    self._results["network_c2"] = {"state": "success", "state_reason": "Connection found"}

            # Check dropped files
            if "dropped" in report:
                self._results["dropped_payload"] = {"state": "success", "state_reason": "Dropped files present"}

        def get_results(self):
            """
            Return the dictionary of results calculated in evaluate_results.
            Keys must match the objective names.
            """
            return self._results
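
Before registering a package, the evaluation logic can be dry-run against a hand-built task directory. The sketch below is a local harness (not part of CAPE itself) that fabricates a minimal ``report.json`` and applies the same checks as the example ``evaluate_results`` above:

```python
import json
import os
import tempfile

# Fabricate a minimal task directory containing only the fields
# the example test inspects; values are illustrative.
with tempfile.TemporaryDirectory() as task_dir:
    os.makedirs(os.path.join(task_dir, "reports"))
    report = {"network": {"hosts": ["1.2.3.4"]}, "dropped": [{"name": "stage2.bin"}]}
    with open(os.path.join(task_dir, "reports", "report.json"), "w") as f:
        json.dump(report, f)

    # Same checks as the example evaluate_results()
    with open(os.path.join(task_dir, "reports", "report.json")) as f:
        loaded = json.load(f)
    c2_ok = "1.2.3.4" in loaded.get("network", {}).get("hosts", [])
    dropped_ok = "dropped" in loaded
    print(c2_ok, dropped_ok)
```

If both values print ``True``, the objectives would be marked as successes for this synthetic report.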

Web Interface Usage
===================

Access the Audit interface via the sidebar menu or at ``/audit/``.

1. **Manage Tests**:

   The main dashboard lists all available tests.

   * If you have added new tests on disk, click **Reload Tests** to update the database.

2. **Create Session**:

   * Select the checkboxes next to the tests you wish to run.
   * Click **Create Session**.
   * You will be redirected to the Session view.

3. **Run Audit**:

   * In the Session view, you can see the status of each test (Unqueued, Queued, Running, Complete).
   * Click **Queue All** to submit all unqueued tests to CAPE.
   * The status updates automatically as CAPE processes the tasks.

4. **View Results**:

   * Once a test is ``Complete``, the framework automatically runs the ``evaluate_results`` logic from your ``test.py``.
   * The UI displays a **Pass** (green) or **Fail** (red) badge for each objective.
   * Expand a test row to see detailed reasons for success or failure.
1 change: 1 addition & 0 deletions docs/book/src/usage/index.rst
@@ -12,6 +12,7 @@ This chapter explains how to use CAPE.
submit
web
api
audit
dist
cluster_administration
packages
4 changes: 3 additions & 1 deletion lib/cuckoo/common/abstracts.py
@@ -40,7 +40,9 @@
from lib.cuckoo.common.path_utils import path_exists, path_mkdir
from lib.cuckoo.common.url_validate import url as url_validator
from lib.cuckoo.common.utils import create_folder, get_memdump_path, load_categories
from lib.cuckoo.core.database import Database, Machine, _Database, Task
from lib.cuckoo.core.database import Database, _Database
from lib.cuckoo.core.data.task import Task
from lib.cuckoo.core.data.machines import Machine

try:
import re2 as re
193 changes: 193 additions & 0 deletions lib/cuckoo/common/audit_utils.py
@@ -0,0 +1,193 @@
import os
import logging
import zipfile
import shutil
from pathlib import Path
from typing import Any, Dict
import importlib.util
from lib.cuckoo.core.data import task as db_task
from lib.cuckoo.core.data.audit_data import TEST_RUNNING, TEST_COMPLETE, TEST_FAILED, TEST_QUEUED

log = logging.getLogger(__name__)


def load_module(module_path):
    module_name = "test_py_module"
    spec = importlib.util.spec_from_file_location(module_name, str(module_path))
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    if not hasattr(module, "CapeDynamicTest"):
        log.warning(str(dir(module)))
        raise ValueError("Module has no CapeDynamicTest class")
    tester = module.CapeDynamicTest()

    if not hasattr(tester, "get_metadata"):
        raise ValueError(f"CapeDynamicTest from {module_path} lacks get_metadata() function")
    return tester


class TestLoader:
    def __init__(self, tests_directory):
        if not os.path.exists(tests_directory):
            raise ValueError(f"Tests directory '{tests_directory}' does not exist.")
        self.tests_root = tests_directory

    def _extract_payload(self, payload_archive, payload_output_dir, zip_password=None):
        # Verify payload ZIP integrity
        try:
            with zipfile.ZipFile(payload_archive, "r") as z:
                # If a password is provided in the test metadata, set it before reading
                if zip_password:
                    z.setpassword(zip_password.encode())
                # Test that the zip is actually readable/not corrupt
                z.testzip()
        except zipfile.BadZipFile:
            if zip_password:
                raise zipfile.BadZipFile(f"{payload_archive} is not usable with the given password")
            raise zipfile.BadZipFile(f"{payload_archive} is corrupt")

        # Delete the unwrapped payload in case a new zip has been uploaded
        if os.path.exists(payload_output_dir):
            shutil.rmtree(payload_output_dir)

        with zipfile.ZipFile(payload_archive, "r") as zip_ref:
            if zip_password:
                # extractall expects the password as bytes
                zip_ref.extractall(payload_output_dir, pwd=zip_password.encode())
            else:
                zip_ref.extractall(payload_output_dir)

        if not os.path.isdir(payload_output_dir):
            raise NotADirectoryError("Bad payload directory extracted")

        dir_contents = list(Path(payload_output_dir).iterdir())
        if not dir_contents:
            raise FileNotFoundError("Nothing in extracted payload directory")

        if len(dir_contents) == 1:
            payload_path = str(dir_contents[0])
        else:
            # If multiple items, treat the directory itself as the payload
            payload_path = payload_output_dir

        if not os.path.exists(payload_path):
            raise FileNotFoundError("Nothing extracted from payload archive or it could not be written to disk")

        return payload_path

    def validate_test_directory(self, test_path: str) -> Dict[str, Any]:
        """
        Validates a single test directory and returns the metadata from the test module.
        Raises ValueError if anything is invalid.
        """
        payload_archive = os.path.join(test_path, "payload.zip")
        module_path = os.path.join(test_path, "test.py")

        # Check for required files
        if not os.path.exists(payload_archive):
            raise ValueError(f"Missing payload.zip at {payload_archive}")
        if not os.path.exists(module_path):
            raise ValueError(f"Missing test.py at {module_path}")

        test_metadata = {"module_path": module_path}

        # Load and instantiate the python test module and fetch metadata
        try:
            tester = load_module(module_path)
            test_metadata["info"] = tester.get_metadata()
            test_metadata["objectives"] = []

            def load_objective(objective):
                return {
                    "name": objective.name,
                    "requirement": objective.requirement,
                    "children": [load_objective(child) for child in objective.children],
                }

            for objective in tester.get_objectives():
                test_metadata["objectives"].append(load_objective(objective))
        except Exception as e:
            raise ValueError(f"Failed to load test module or fetch metadata from {module_path}: {e}")

        conf = test_metadata["info"].get("Task Config", None)
        if conf and conf.get("Request Options", None) is None:
            test_metadata["info"]["Request Options"] = ""

        if "Name" not in test_metadata["info"]:
            raise ValueError(f"Metadata in {module_path} missing 'Name' field")
        if "Package" not in test_metadata["info"]:
            raise ValueError(f"Metadata in {module_path} missing 'Package' field")

        zip_password = test_metadata["info"].get("Zip Password", None)
        payload_output_dir = os.path.join(test_path, "payload")
        test_metadata["payload_path"] = self._extract_payload(payload_archive, payload_output_dir, zip_password)

        # Return prepared metadata for DB ingest
        return test_metadata

    def load_tests(self) -> Dict[str, Any]:
        """
        Walks the root directory and returns the validated and failed test configurations.
        """
        available_tests = []
        unavailable_tests = []

        if not os.path.exists(self.tests_root):
            log.error("Tests root %s does not exist.", self.tests_root)
            return {"error": f"Tests root {self.tests_root} does not exist."}

        for entry in os.scandir(self.tests_root):
            if entry.is_dir():
                try:
                    test_config = self.validate_test_directory(entry.path)
                    available_tests.append(test_config)
                    log.info("Loaded test: %s", test_config["info"]["Name"])
                except Exception as e:
                    log.exception("Skipping directory %s due to exception", entry.path)
                    unavailable_tests.append({"module_path": entry.path, "error": str(e)})

        return {"available": available_tests, "unavailable": unavailable_tests}


class TestResultValidator:
    def __init__(self, test_module_path: str, task_storage_directory: str):
        if os.path.isdir(task_storage_directory):
            self.task_directory = task_storage_directory
        else:
            raise NotADirectoryError(f"Invalid task directory: {task_storage_directory}")

        try:
            self.test_module = load_module(test_module_path)
        except Exception as e:
            raise ValueError(f"Failed to load test evaluation module {test_module_path}: {e}")

    def evaluate(self):
        self.test_module.evaluate_results(self.task_directory)
        return self.test_module.get_results()


def task_status_to_run_status(cape_task_status):
    if cape_task_status == db_task.TASK_REPORTED:
        return TEST_COMPLETE
    if cape_task_status == db_task.TASK_PENDING:
        return TEST_QUEUED
    if cape_task_status in (
        db_task.TASK_RUNNING,
        db_task.TASK_DISTRIBUTED,
        db_task.TASK_RECOVERED,
        db_task.TASK_COMPLETED,
        db_task.TASK_DISTRIBUTED_COMPLETED,
    ):
        return TEST_RUNNING
    if cape_task_status in (
        db_task.TASK_BANNED,
        db_task.TASK_FAILED_ANALYSIS,
        db_task.TASK_FAILED_PROCESSING,
        db_task.TASK_FAILED_REPORTING,
    ):
        return TEST_FAILED

    raise ValueError(f"Unknown cape task status: {cape_task_status}")