Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ jobs:
- name: Run unit tests
run: poetry run python -m pytest --import-mode=append

- name: Run Linux analyzer unit tests
run: poetry --project . --directory "analyzer/linux" run python -m pytest -v

# see the mypy configuration in pyproject.toml
- name: Run mypy
run: poetry run mypy
Expand Down
5 changes: 4 additions & 1 deletion analyzer/linux/lib/api/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ def is_alive(self):
return True

def get_parent_pid(self):
return int(self.get_proc_status().get("PPid"))
try:
return int(self.get_proc_status().get("PPid"))
except (TypeError, ValueError):
return None

def get_proc_status(self):
try:
Expand Down
48 changes: 29 additions & 19 deletions analyzer/linux/lib/core/packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,25 @@ def _found_target_class(module, name):


def _guess_package_name(file_type, file_name):
if "Bourne-Again" in file_type or "bash" in file_type:
return "bash"
elif "Zip archive" in file_type:
return "zip"
elif "gzip compressed data" in file_type:
return "zip"
elif "PDF document" in file_type or file_name.endswith(".pdf"):
return "pdf"
elif "Composite Document File V2 Document" in file_type or file_name.endswith(".doc"):
return "doc"
elif "Microsoft Word" in file_type or file_name.endswith(".docx"):
return "doc"
elif "ELF" in file_type:
return "generic"
elif "Unicode text" in file_type or file_name.endswith(".js"):
return "js"
try:
if "Bourne-Again" in file_type or "bash" in file_type:
return "bash"
elif "Zip archive" in file_type:
return "zip"
elif "gzip compressed data" in file_type:
return "zip"
elif "PDF document" in file_type or file_name.endswith(".pdf"):
return "pdf"
elif "Composite Document File V2 Document" in file_type or file_name.endswith(".doc"):
return "doc"
elif "Microsoft Word" in file_type or file_name.endswith(".docx"):
return "doc"
elif "ELF" in file_type:
return "generic"
elif "Unicode text" in file_type or file_name.endswith(".js"):
return "js"
except (TypeError, AttributeError):
pass
return None


Expand Down Expand Up @@ -101,9 +104,16 @@ def __init__(self, target, **kwargs):
self.timeout = kwargs.get("timeout")
# Command-line arguments for the target.

_args = self.options.get("arguments", [])
if isinstance(_args, str):
self.args = _args.split()
def _args():
args = self.options.get("arguments")
if isinstance(args, list):
return args
if isinstance(args, str):
return args.split()
return []

self.args = _args()

# Choose an analysis method (or fallback to apicalls)
self.method = self.options.get("method", "apicalls")
# Should our target be launched as root or not
Expand Down
2 changes: 2 additions & 0 deletions analyzer/linux/pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[pytest]
pythonpath = .
97 changes: 97 additions & 0 deletions analyzer/linux/tests/lib/api/test_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
from unittest.mock import Mock, patch, mock_open, PropertyMock
from lib.api.process import Process
import base64

import pytest

proc_status = """Name: test-process
Umask: 0002
State: R (running)
Tgid: 42
Ngid: 0
Pid: 42
PPid: 24"""

@pytest.fixture
def os_path_exists(monkeypatch):
monkeypatch.setattr("os.path.exists", Mock(return_value=True))
yield

@pytest.fixture
def os_path_not_exists(monkeypatch):
monkeypatch.setattr("os.path.exists", Mock(return_value=False))
yield

@pytest.fixture
def fake_proc_status_file(monkeypatch):
monkeypatch.setattr("builtins.open", mock_open(read_data=proc_status))
yield

ARGS = {
"pid": 42
}

def test_init():
"""Initialize Process instances using both args and kwargs"""
kw_args_instance = Process(**ARGS)
assert kw_args_instance.pid == ARGS["pid"]
args_instance = Process(*ARGS.values())
assert args_instance.pid == ARGS["pid"]

@pytest.mark.usefixtures("os_path_exists")
def test_proc_alive_states():
for state in [
"State: R (running)",
"State: S (sleeping)",
"State: D (waiting)",
"State: T (stopped)",
"State: t (trace stopped)",
"State: W (paging)",
"State: W (waking)",
"State: P (parked)",
]:
state_file_content = proc_status.replace("State: R (running)", state)
with patch("builtins.open", mock_open(read_data=state_file_content)):
process = Process(**ARGS)
assert process.is_alive()

@pytest.mark.usefixtures("os_path_exists")
def test_proc_dead_states():
for state in [
"State: Z (zombie)",
]:
state_file_content = proc_status.replace("State: R (running)", state)
with patch("builtins.open", mock_open(read_data=state_file_content)):
process = Process(**ARGS)
alive = process.is_alive()
assert not alive

@pytest.mark.usefixtures("os_path_not_exists")
def test_proc_file_not_exists():
process = Process(**ARGS)
assert not process.is_alive()

@pytest.mark.usefixtures("os_path_exists")
def test_proc_file_corrupt():
corrupt_status = base64.b64encode(proc_status.encode("utf-8")).decode("utf-8")
with patch("builtins.open", mock_open(read_data=corrupt_status)):
process = Process(**ARGS)
assert not process.is_alive()

@pytest.mark.usefixtures("os_path_exists", "fake_proc_status_file")
def test_get_ppid():
process = Process(**ARGS)
assert 24 == process.get_parent_pid()

@patch("builtins.open", side_effect=FileNotFoundError)
def test_get_ppid_file_not_exists(bopen):
process = Process(**ARGS)
assert process.get_parent_pid() is None

@patch("subprocess.Popen")
def test_execute(popen):
process = Process(**ARGS)
type(popen.return_value).pid = PropertyMock(return_value=ARGS["pid"])
assert process.execute(["echo", "this is a test message"])
assert ARGS["pid"] == process.pid
assert popen.called
19 changes: 19 additions & 0 deletions analyzer/linux/tests/lib/core/test_packages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from lib.core.packages import _guess_package_name
import pytest

@pytest.mark.parametrize("file_type, file_name, expected_package_name", [
("", "", None),
("Bourne-Again", None, "bash"),
("Zip archive", None, "zip"),
("gzip compressed data", None, "zip"),
("PDF document", "test.pdf", "pdf"),
("Composite Document File V2 Document", "test.docx", "doc"),
("Microsoft Word", "test.docx", "doc"),
("ELF", None, "generic"),
("Unicode text", "malware.js", "js")
])
def test__guess_package_name(file_type, file_name, expected_package_name):
assert _guess_package_name(file_type, "") == expected_package_name, f"Expected {expected_package_name} for {file_type}, {file_name}"
if file_name:
assert _guess_package_name("", file_name) == expected_package_name, f"Expected {expected_package_name} for {file_type}, {file_name}"
assert _guess_package_name(file_type, file_name) == expected_package_name, f"Expected {expected_package_name} for {file_type}, {file_name}"
53 changes: 53 additions & 0 deletions analyzer/linux/tests/lib/core/test_startup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from unittest.mock import patch, Mock
import logging
from logging import StreamHandler
import pytest

from lib.common.results import NetlogHandler
import lib.core.startup
import lib.core.config

import lib

@pytest.fixture
def patch_netloghandler(monkeypatch):
monkeypatch.setattr(NetlogHandler, "__init__", Mock(return_value=None))
monkeypatch.setattr(NetlogHandler, "connect", Mock())
yield

@patch('os.makedirs')
@patch('os.path.exists')
def test_create_folders_path_not_exists(os_path_exists, os_mkdirs):
"""Test initial folder creation with paths that do not exist"""
# Fake path not existing
os_path_exists.return_value = False
lib.core.startup.create_folders()
assert os_path_exists.called
# Ensure there is an attempt to create a folder
assert os_mkdirs.called

@patch('os.makedirs')
@patch('os.path.exists')
def test_create_folders_path_exists(os_path_exists, os_mkdirs):
"""Test initial folder creation with paths that already exist"""
# Fake path not existing
os_path_exists.return_value = True
lib.core.startup.create_folders()
assert os_path_exists.called
# Ensure there are no attempts to create a folder
assert not os_mkdirs.called

@pytest.mark.usefixtures("patch_netloghandler")
@patch("logging.Logger.addHandler")
def test_init_logging(addhandler):
"""Ensure init_logging adds the right log handlers"""
lib.core.startup.init_logging()
handlers = []
# Get a list of all the types of handlers that are being added
for name, args, kwargs in addhandler.mock_calls:
handlers = [*handlers, *[type(arg) for arg in args]]
# Ensure there is a StreamHandler and a NetlogHandler
assert StreamHandler in handlers
assert NetlogHandler in handlers
# Ensure log level is set to DEBUG
assert lib.core.startup.log.level == logging.DEBUG
35 changes: 35 additions & 0 deletions analyzer/linux/tests/modules/packages/test_package.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import sys
import unittest
import pytest

import lib

from lib.core.packages import Package

@pytest.fixture
def patch_netlogfile(monkeypatch):
class MockNetlogFile:
def init(self, *args):
return
def close(self):
return
monkeypatch.setattr(lib.core.packages, "NetlogFile", MockNetlogFile)
monkeypatch.setattr(lib.core.packages, "append_buffer_to_host", lambda *args: None)
yield

class TestPackage(unittest.TestCase):

@pytest.mark.usefixtures("patch_netlogfile")
def test_package_init_args(self):
pkg = Package(sys.executable, options={})
self.assertEqual(pkg.args, [])

@pytest.mark.usefixtures("patch_netlogfile")
def test_package_init_args_list(self):
pkg = Package(sys.executable, options={"arguments": ["foo", "bar"]})
self.assertEqual(pkg.args, ["foo", "bar"])

@pytest.mark.usefixtures("patch_netlogfile")
def test_package_init_args_str(self):
pkg = Package(sys.executable, options={"arguments": "foo bar"})
self.assertEqual(pkg.args, ["foo", "bar"])
34 changes: 34 additions & 0 deletions analyzer/linux/tests/test_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import sys
import unittest

import pytest

if sys.platform == "linux":
import analyzer
from analyzer import PROCESS_LIST, SEEN_LIST

@pytest.mark.skipif(sys.platform != "linux", reason="Requires Linux")
class TestAnalyzer(unittest.TestCase):

def test_add_pids(self):
"""Test add_pids with a variety of valid types"""
# Check that both sets are empty
self.assertEqual(PROCESS_LIST, set())
self.assertEqual(SEEN_LIST, set())

pids = [123, 456, 789]
# Add a list of PIDs
analyzer.add_pids([str(pids[0]), pids[1]])
# Add a set of PIDs
analyzer.add_pids(set([pids[0], pids[2]]))
# Add a tuple of PIDs
analyzer.add_pids((pids[1], pids[2]))

self.assertEqual(PROCESS_LIST, set(pids))
self.assertEqual(SEEN_LIST, set(pids))


def test_add_pids_invalid_var(self):
"""Test add_pids with an invalid type"""
with self.assertRaises(TypeError):
analyzer.add_pids(analyzer.add_pids)
3 changes: 3 additions & 0 deletions conf/default/cuckoo.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ critical = 60
# shutting down a vm. Default is 300 seconds.
vm_state = 300

# Will kill VM if: Task time = timeout + critical + stuck_seconds
stuck_seconds = 100

[tmpfs]
# only if you using volatility to speedup IO
# mkdir -p /mnt/tmpfs
Expand Down
Loading