From b67a0146db1150a9efe2a6f0a544075948b5702f Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Sat, 31 May 2025 16:18:03 +0900 Subject: [PATCH] feat: add command model and tests for msfconsole and meterpreter commands; include permissions configuration --- .claude/settings.local.json | 8 + .../src/wish_models/command_result/command.py | 245 ++++++++++++++- .../tests/command_result/test_command.py | 289 ++++++++++++++++++ 3 files changed, 540 insertions(+), 2 deletions(-) create mode 100644 .claude/settings.local.json create mode 100644 wish-models/tests/command_result/test_command.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..849c897 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,8 @@ +{ + "permissions": { + "allow": [ + "Bash(make lint)" + ], + "deny": [] + } +} \ No newline at end of file diff --git a/wish-models/src/wish_models/command_result/command.py b/wish-models/src/wish_models/command_result/command.py index ebc6f08..90695f4 100644 --- a/wish-models/src/wish_models/command_result/command.py +++ b/wish-models/src/wish_models/command_result/command.py @@ -2,7 +2,7 @@ import json from enum import Enum -from typing import Any, Dict +from typing import Any, Dict, List from pydantic import BaseModel @@ -12,7 +12,8 @@ class CommandType(Enum): BASH = "bash" MSFCONSOLE = "msfconsole" - # Future extensions + MSFCONSOLE_RESOURCE = "msfconsole_resource" + METERPRETER = "meterpreter" PYTHON = "python" POWERSHELL = "powershell" @@ -119,6 +120,246 @@ def create_msfconsole_command( tool_parameters=tool_parameters ) + @classmethod + def create_msfconsole_resource_command( + cls, + commands: List[str], + resource_file: str = "/tmp/msf_exploit.rc", + **kwargs + ) -> "Command": + """Create an msfconsole resource file command. + + Args: + commands: List of MSF commands to execute + resource_file: Path to resource file + **kwargs: Additional tool parameters + + Returns: + Command configured for msfconsole resource execution. + """ + # Create resource file content + resource_content = '\n'.join(commands) + bash_cmd = f"cat << 'EOF' > {resource_file}\n{resource_content}\nEOF\nmsfconsole -q -r {resource_file}" + + tool_parameters = { + "resource_file": resource_file, + "commands": commands, + "timeout": 600, + **kwargs + } + + return cls( + command=bash_cmd, + tool_type=CommandType.MSFCONSOLE_RESOURCE, + tool_parameters=tool_parameters + ) + + @classmethod + def create_meterpreter_command( + cls, + command: str, + session_id: str = "", + **kwargs + ) -> "Command": + """Create a meterpreter session command. + + Args: + command: The meterpreter command to execute + session_id: Meterpreter session identifier + **kwargs: Additional tool parameters + + Returns: + Command configured for meterpreter execution. + """ + tool_parameters = { + "timeout": 300, + **kwargs + } + if session_id: + tool_parameters["session_id"] = session_id + + return cls( + command=command, + tool_type=CommandType.METERPRETER, + tool_parameters=tool_parameters + ) + + @classmethod + def create_python_command( + cls, + script_path: str, + arguments: List[str] = None, + **kwargs + ) -> "Command": + """Create a python command. + + Args: + script_path: Path to Python script + arguments: Script arguments + **kwargs: Additional tool parameters + + Returns: + Command configured for python execution. + """ + if arguments is None: + arguments = [] + + args_str = ' '.join(f'"{arg}"' for arg in arguments) + command = f"python3 {script_path} {args_str}".strip() + + tool_parameters = { + "script_path": script_path, + "arguments": arguments, + "timeout": 300, + **kwargs + } + + return cls( + command=command, + tool_type=CommandType.PYTHON, + tool_parameters=tool_parameters + ) + + @classmethod + def create_powershell_command( + cls, + command: str, + execution_policy: str = "Bypass", + **kwargs + ) -> "Command": + """Create a PowerShell command. + + Args: + command: The PowerShell command/script to execute + execution_policy: PowerShell execution policy + **kwargs: Additional tool parameters + + Returns: + Command configured for PowerShell execution. + """ + tool_parameters = { + "execution_policy": execution_policy, + "timeout": 300, + **kwargs + } + + return cls( + command=command, + tool_type=CommandType.POWERSHELL, + tool_parameters=tool_parameters + ) + + def validate_tool_parameters(self) -> List[str]: + """Validate tool parameters for the specified tool type. + + Returns: + List of validation error messages. + """ + errors = [] + + if self.tool_type == CommandType.MSFCONSOLE: + # MSFコマンドでexploit/を使う場合はRHOSTSが必要 + if 'use exploit/' in self.command: + if not self.tool_parameters.get('rhosts'): + errors.append("RHOSTS parameter required for exploit modules") + + # Reverse payloadにはLHOSTが必要 + if any(payload in self.command.lower() for payload in ['reverse', 'bind']): + if not self.tool_parameters.get('lhost'): + errors.append("LHOST parameter required for reverse/bind payloads") + + elif self.tool_type == CommandType.MSFCONSOLE_RESOURCE: + if not self.tool_parameters.get('commands'): + errors.append("Commands list required for resource file execution") + + elif self.tool_type == CommandType.PYTHON: + if not self.tool_parameters.get('script_path'): + errors.append("Script path required for Python execution") + + return errors + + @property + def is_valid(self) -> bool: + """Check if command is valid for its tool type.""" + return len(self.validate_tool_parameters()) == 0 + + def get_execution_context(self) -> Dict[str, Any]: + """Get execution context information for the command. + + Returns: + Dictionary containing execution context details. + """ + context = { + "tool_type": self.tool_type.value, + "requires_privileges": self._requires_privileges(), + "estimated_duration": self._estimate_duration(), + "risk_level": self._assess_risk_level(), + "dependencies": self._get_dependencies() + } + return context + + def _requires_privileges(self) -> bool: + """Check if command requires elevated privileges.""" + privileged_commands = ['sudo', 'su', 'chmod', 'chown', 'mount', 'systemctl'] + if self.tool_type == CommandType.BASH: + return any(cmd in self.command.lower() for cmd in privileged_commands) + elif self.tool_type == CommandType.MSFCONSOLE: + return 'exploit' in self.command.lower() + return False + + def _estimate_duration(self) -> int: + """Estimate command execution duration in seconds.""" + if self.tool_type == CommandType.BASH: + if 'nmap' in self.command: + return 180 # nmap scans take time + elif any(tool in self.command for tool in ['hydra', 'john', 'hashcat']): + return 1800 # Password cracking tools + elif self.tool_type == CommandType.MSFCONSOLE: + return 60 # MSF commands typically quick + elif self.tool_type == CommandType.MSFCONSOLE_RESOURCE: + return 120 # Resource files may take longer + + return self.tool_parameters.get('timeout', 300) + + def _assess_risk_level(self) -> str: + """Assess risk level of command execution.""" + high_risk_patterns = ['rm -rf', 'dd if=', 'mkfs', 'fdisk', 'exploit', 'payload'] + medium_risk_patterns = ['chmod', 'chown', 'sudo', 'su', 'mount'] + + command_lower = self.command.lower() + + if any(pattern in command_lower for pattern in high_risk_patterns): + return "HIGH" + elif any(pattern in command_lower for pattern in medium_risk_patterns): + return "MEDIUM" + else: + return "LOW" + + def _get_dependencies(self) -> List[str]: + """Get list of command dependencies.""" + deps = [] + + if self.tool_type in [CommandType.MSFCONSOLE, CommandType.MSFCONSOLE_RESOURCE]: + deps.append("metasploit-framework") + elif self.tool_type == CommandType.METERPRETER: + deps.append("metasploit-framework") + elif self.tool_type == CommandType.PYTHON: + deps.append("python3") + elif self.tool_type == CommandType.POWERSHELL: + deps.append("powershell") + + # Tool-specific dependencies from command content + if 'nmap' in self.command: + deps.append("nmap") + if 'hydra' in self.command: + deps.append("hydra") + if 'john' in self.command: + deps.append("john") + if 'hashcat' in self.command: + deps.append("hashcat") + + return list(set(deps)) # Remove duplicates + def parse_command_from_string(input_str: str) -> Command: """Parse command input from string (JSON or plain command). diff --git a/wish-models/tests/command_result/test_command.py b/wish-models/tests/command_result/test_command.py new file mode 100644 index 0000000..7971da6 --- /dev/null +++ b/wish-models/tests/command_result/test_command.py @@ -0,0 +1,289 @@ +"""Tests for Command model.""" + + +from wish_models.command_result.command import Command, CommandType, parse_command_from_string + + +class TestCommand: + """Test Command class functionality.""" + + def test_command_creation(self): + """Test basic Command creation.""" + cmd = Command( + command="echo hello", + tool_type=CommandType.BASH, + tool_parameters={"timeout": 30} + ) + + assert cmd.command == "echo hello" + assert cmd.tool_type == CommandType.BASH + assert cmd.tool_parameters["timeout"] == 30 + + def test_create_bash_command(self): + """Test bash command factory method.""" + cmd = Command.create_bash_command( + "nmap -sS 10.10.10.40", + timeout=600, + category="network" + ) + + assert cmd.command == "nmap -sS 10.10.10.40" + assert cmd.tool_type == CommandType.BASH + assert cmd.tool_parameters["timeout"] == 600 + assert cmd.tool_parameters["category"] == "network" + + def test_create_msfconsole_command(self): + """Test msfconsole command factory method.""" + cmd = Command.create_msfconsole_command( + "use exploit/windows/smb/ms17_010_eternalblue", + module="exploit/windows/smb/ms17_010_eternalblue", + rhosts="10.10.10.40", + lhost="10.10.14.1" + ) + + assert cmd.command == "use exploit/windows/smb/ms17_010_eternalblue" + assert cmd.tool_type == CommandType.MSFCONSOLE + assert cmd.tool_parameters["module"] == "exploit/windows/smb/ms17_010_eternalblue" + assert cmd.tool_parameters["rhosts"] == "10.10.10.40" + assert cmd.tool_parameters["lhost"] == "10.10.14.1" + + def test_create_msfconsole_resource_command(self): + """Test msfconsole resource command factory method.""" + commands = [ + "use exploit/windows/smb/ms17_010_eternalblue", + "set RHOSTS 10.10.10.40", + "set LHOST 10.10.14.1", + "exploit" + ] + + cmd = Command.create_msfconsole_resource_command( + commands=commands, + resource_file="/tmp/test.rc" + ) + + assert cmd.tool_type == CommandType.MSFCONSOLE_RESOURCE + assert cmd.tool_parameters["commands"] == commands + assert cmd.tool_parameters["resource_file"] == "/tmp/test.rc" + assert "cat << 'EOF' > /tmp/test.rc" in cmd.command + assert "msfconsole -q -r /tmp/test.rc" in cmd.command + + def test_create_meterpreter_command(self): + """Test meterpreter command factory method.""" + cmd = Command.create_meterpreter_command( + "getuid", + session_id="1" + ) + + assert cmd.command == "getuid" + assert cmd.tool_type == CommandType.METERPRETER + assert cmd.tool_parameters["session_id"] == "1" + + def test_create_python_command(self): + """Test python command factory method.""" + cmd = Command.create_python_command( + script_path="exploit.py", + arguments=["--target", "10.10.10.40", "--port", "80"] + ) + + assert cmd.tool_type == CommandType.PYTHON + assert cmd.tool_parameters["script_path"] == "exploit.py" + assert cmd.tool_parameters["arguments"] == ["--target", "10.10.10.40", "--port", "80"] + assert 'python3 exploit.py "--target" "10.10.10.40" "--port" "80"' == cmd.command + + def test_create_powershell_command(self): + """Test PowerShell command factory method.""" + cmd = Command.create_powershell_command( + "Get-Process", + execution_policy="RemoteSigned" + ) + + assert cmd.command == "Get-Process" + assert cmd.tool_type == CommandType.POWERSHELL + assert cmd.tool_parameters["execution_policy"] == "RemoteSigned" + + def test_validate_tool_parameters_msfconsole_exploit(self): + """Test validation for MSF exploit commands.""" + # Command with exploit but missing RHOSTS + cmd = Command( + command="use exploit/windows/smb/ms17_010_eternalblue", + tool_type=CommandType.MSFCONSOLE, + tool_parameters={} + ) + + errors = cmd.validate_tool_parameters() + assert len(errors) == 1 + assert "RHOSTS parameter required" in errors[0] + + def test_validate_tool_parameters_msfconsole_reverse_payload(self): + """Test validation for reverse payload commands.""" + cmd = Command( + command="set payload windows/meterpreter/reverse_tcp", + tool_type=CommandType.MSFCONSOLE, + tool_parameters={} + ) + + errors = cmd.validate_tool_parameters() + assert len(errors) == 1 + assert "LHOST parameter required" in errors[0] + + def test_validate_tool_parameters_msfconsole_resource(self): + """Test validation for MSF resource commands.""" + cmd = Command( + command="msfconsole -q -r /tmp/exploit.rc", + tool_type=CommandType.MSFCONSOLE_RESOURCE, + tool_parameters={} + ) + + errors = cmd.validate_tool_parameters() + assert len(errors) == 1 + assert "Commands list required" in errors[0] + + def test_validate_tool_parameters_python(self): + """Test validation for Python commands.""" + cmd = Command( + command="python3 exploit.py", + tool_type=CommandType.PYTHON, + tool_parameters={} + ) + + errors = cmd.validate_tool_parameters() + assert len(errors) == 1 + assert "Script path required" in errors[0] + + def test_is_valid_property(self): + """Test is_valid property.""" + # Valid bash command + cmd = Command.create_bash_command("ls -la") + assert cmd.is_valid + + # Invalid MSF command (missing RHOSTS) + cmd = Command( + command="use exploit/windows/smb/ms17_010_eternalblue", + tool_type=CommandType.MSFCONSOLE, + tool_parameters={} + ) + assert not cmd.is_valid + + def test_get_execution_context(self): + """Test execution context information.""" + cmd = Command.create_bash_command("nmap -sS 10.10.10.40") + context = cmd.get_execution_context() + + assert context["tool_type"] == "bash" + assert "requires_privileges" in context + assert "estimated_duration" in context + assert "risk_level" in context + assert "dependencies" in context + assert "nmap" in context["dependencies"] + + def test_requires_privileges(self): + """Test privilege requirement detection.""" + # Privileged command + cmd = Command.create_bash_command("sudo systemctl restart nginx") + assert cmd._requires_privileges() + + # Non-privileged command + cmd = Command.create_bash_command("ls -la") + assert not cmd._requires_privileges() + + # MSF exploit command (privileged) + cmd = Command.create_msfconsole_command("exploit") + assert cmd._requires_privileges() + + def test_estimate_duration(self): + """Test duration estimation.""" + # nmap command + cmd = Command.create_bash_command("nmap -sS 10.10.10.40") + assert cmd._estimate_duration() == 180 + + # hydra command + cmd = Command.create_bash_command("hydra -l admin -P passwords.txt ssh://10.10.10.40") + assert cmd._estimate_duration() == 1800 + + # MSF command + cmd = Command.create_msfconsole_command("exploit") + assert cmd._estimate_duration() == 60 + + def test_assess_risk_level(self): + """Test risk level assessment.""" + # High risk command + cmd = Command.create_bash_command("rm -rf /tmp/test") + assert cmd._assess_risk_level() == "HIGH" + + # Medium risk command + cmd = Command.create_bash_command("sudo chmod 777 /etc/passwd") + assert cmd._assess_risk_level() == "MEDIUM" + + # Low risk command + cmd = Command.create_bash_command("ls -la") + assert cmd._assess_risk_level() == "LOW" + + def test_get_dependencies(self): + """Test dependency detection.""" + # nmap command + cmd = Command.create_bash_command("nmap -sS 10.10.10.40") + deps = cmd._get_dependencies() + assert "nmap" in deps + + # MSF command + cmd = Command.create_msfconsole_command("exploit") + deps = cmd._get_dependencies() + assert "metasploit-framework" in deps + + # Python command + cmd = Command.create_python_command("exploit.py") + deps = cmd._get_dependencies() + assert "python3" in deps + + def test_to_json_from_json(self): + """Test JSON serialization/deserialization.""" + cmd = Command.create_bash_command( + "nmap -sS 10.10.10.40", + timeout=600, + category="network" + ) + + json_str = cmd.to_json() + cmd2 = Command.from_json(json_str) + + assert cmd.command == cmd2.command + assert cmd.tool_type == cmd2.tool_type + assert cmd.tool_parameters == cmd2.tool_parameters + + def test_parse_command_from_string_plain(self): + """Test parsing plain command string.""" + cmd = parse_command_from_string("ls -la") + + assert cmd.command == "ls -la" + assert cmd.tool_type == CommandType.BASH + + def test_parse_command_from_string_json(self): + """Test parsing JSON command string.""" + json_str = '{"command": "nmap -sS 10.10.10.40", "tool_type": "bash", "tool_parameters": {"timeout": 600}}' + cmd = parse_command_from_string(json_str) + + assert cmd.command == "nmap -sS 10.10.10.40" + assert cmd.tool_type == CommandType.BASH + assert cmd.tool_parameters["timeout"] == 600 + + +class TestCommandType: + """Test CommandType enum.""" + + def test_command_type_values(self): + """Test all CommandType enum values.""" + assert CommandType.BASH.value == "bash" + assert CommandType.MSFCONSOLE.value == "msfconsole" + assert CommandType.MSFCONSOLE_RESOURCE.value == "msfconsole_resource" + assert CommandType.METERPRETER.value == "meterpreter" + assert CommandType.PYTHON.value == "python" + assert CommandType.POWERSHELL.value == "powershell" + + def test_command_type_membership(self): + """Test CommandType enum membership.""" + assert CommandType.BASH in CommandType + assert CommandType.MSFCONSOLE in CommandType + assert CommandType.MSFCONSOLE_RESOURCE in CommandType + assert CommandType.METERPRETER in CommandType + assert CommandType.PYTHON in CommandType + assert CommandType.POWERSHELL in CommandType