diff --git a/README.md b/README.md
index dda3f6481..1c1c00e55 100644
--- a/README.md
+++ b/README.md
@@ -72,6 +72,7 @@ cortex install "tools for video compression"
 | **Docker Permission Fixer** | Fix root-owned bind mount issues automatically |
 | **Audit Trail** | Complete history in `~/.cortex/history.db` |
 | **Hardware-Aware** | Detects GPU, CPU, memory for optimized packages |
+| **Predictive Error Prevention** | AI-driven checks for potential installation failures |
 | **Multi-LLM Support** | Works with Claude, GPT-4, or local Ollama models |
 
 ---
@@ -415,6 +416,7 @@ pip install -e .
 - [x] Dry-run preview mode
 - [x] Docker bind-mount permission fixer
 - [x] Automatic Role Discovery (AI-driven system context sensing)
+- [x] Predictive Error Prevention (pre-install compatibility checks)
 
 ### In Progress
 - [ ] Conflict resolution UI
diff --git a/cortex/cli.py b/cortex/cli.py
index 2eca722a7..267228b0e 100644
--- a/cortex/cli.py
+++ b/cortex/cli.py
@@ -11,32 +11,27 @@
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
+from rich.console import Console
 from rich.markdown import Markdown
+from rich.panel import Panel
+from rich.table import Table
 
 from cortex.api_key_detector import auto_detect_api_key, setup_api_key
 from cortex.ask import AskHandler
 from cortex.branding import VERSION, console, cx_header, cx_print, show_banner
 from cortex.coordinator import InstallationCoordinator, InstallationStep, StepStatus
 from cortex.demo import run_demo
-from cortex.dependency_importer import (
-    DependencyImporter,
-    PackageEcosystem,
-    ParseResult,
-)
+from cortex.dependency_importer import DependencyImporter, PackageEcosystem, ParseResult
 from cortex.env_manager import EnvironmentManager, get_env_manager
-from cortex.i18n import (
-    SUPPORTED_LANGUAGES,
-    LanguageConfig,
-    get_language,
-    set_language,
-    t,
-)
+from cortex.i18n import SUPPORTED_LANGUAGES, LanguageConfig, get_language, set_language, t
 from cortex.installation_history import InstallationHistory, InstallationStatus, InstallationType
 from cortex.llm.interpreter import CommandInterpreter
 from cortex.network_config import NetworkConfig
 from cortex.notification_manager import NotificationManager
+from cortex.predictive_prevention import FailurePrediction, PredictiveErrorManager, RiskLevel
 from cortex.role_manager import RoleManager
 from cortex.stack_manager import StackManager
+from cortex.stdin_handler import StdinHandler
 from cortex.uninstall_impact import (
     ImpactResult,
     ImpactSeverity,
@@ -59,10 +54,18 @@
 logging.getLogger("httpx").setLevel(logging.WARNING)
 logging.getLogger("cortex.installation_history").setLevel(logging.ERROR)
 
+
 sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
 
 
 class CortexCLI:
+    RISK_COLORS = {
+        RiskLevel.NONE: "green",
+        RiskLevel.LOW: "green",
+        RiskLevel.MEDIUM: "yellow",
+        RiskLevel.HIGH: "orange1",
+        RiskLevel.CRITICAL: "red",
+    }
     # Installation messages
     INSTALL_FAIL_MSG = "Installation failed"
 
@@ -70,6 +73,23 @@ def __init__(self, verbose: bool = False):
         self.spinner_chars = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]
         self.spinner_idx = 0
         self.verbose = verbose
+        self.predict_manager = None
+
+    @property
+    def risk_labels(self) -> dict[RiskLevel, str]:
+        """
+        Localized mapping from RiskLevel enum values to human-readable strings.
+
+        Returns a dictionary mapping each tier (RiskLevel.NONE to CRITICAL)
+        to its corresponding localized label via the t() translation helper.
+ """ + return { + RiskLevel.NONE: t("predictive.no_risk"), + RiskLevel.LOW: t("predictive.low_risk"), + RiskLevel.MEDIUM: t("predictive.medium_risk"), + RiskLevel.HIGH: t("predictive.high_risk"), + RiskLevel.CRITICAL: t("predictive.critical_risk"), + } # Define a method to handle Docker-specific permission repairs def docker_permissions(self, args: argparse.Namespace) -> int: @@ -114,9 +134,10 @@ def docker_permissions(self, args: argparse.Namespace) -> int: ) try: # Interactive confirmation prompt for administrative repair. - response = console.input( - "[bold cyan]Reclaim ownership using sudo? (y/n): [/bold cyan]" + console.print( + "[bold cyan]Reclaim ownership using sudo? (y/n): [/bold cyan]", end="" ) + response = StdinHandler.get_input() if response.lower() not in ("y", "yes"): cx_print("Operation cancelled", "info") return 0 @@ -718,8 +739,8 @@ def _sandbox_promote(self, sandbox, args: argparse.Namespace) -> int: if not skip_confirm: console.print(f"\nPromote '{package}' to main system? [Y/n]: ", end="") try: - response = input().strip().lower() - if response and response not in ("y", "yes"): + response = StdinHandler.get_input() + if response and response.lower() not in ("y", "yes"): cx_print("Promotion cancelled", "warning") return 0 except (EOFError, KeyboardInterrupt): @@ -789,6 +810,46 @@ def _sandbox_exec(self, sandbox, args: argparse.Namespace) -> int: return result.exit_code + def _display_prediction_warning(self, prediction: FailurePrediction) -> None: + """Display formatted prediction warning.""" + color = self.RISK_COLORS.get(prediction.risk_level, "white") + label = self.risk_labels.get(prediction.risk_level, "Unknown") + + console.print() + if prediction.risk_level >= RiskLevel.HIGH: + console.print(f"⚠️ [bold red]{t('predictive.risks_detected')}:[/bold red]") + else: + console.print(f"ℹ️ [bold {color}]{t('predictive.risks_detected')}:[/bold {color}]") + + if prediction.reasons: + console.print(f"\n[bold]{label}:[/bold]") + for reason in prediction.reasons: + console.print(f" - {reason}") + + if prediction.recommendations: + console.print(f"\n[bold]{t('predictive.recommendation')}:[/bold]") + for i, rec in enumerate(prediction.recommendations, 1): + console.print(f" {i}. {rec}") + + if prediction.predicted_errors: + console.print(f"\n[bold]{t('predictive.predicted_errors')}:[/bold]") + for err in prediction.predicted_errors: + msg = f"{err[:100]}..." if len(err) > 100 else err + console.print(f" ! 
+                console.print(f" ! [dim]{msg}[/dim]")
+
+    def _confirm_risky_operation(self, prediction: FailurePrediction) -> bool:
+        """Prompt user for confirmation of a risky operation."""
+        if prediction.risk_level >= RiskLevel.HIGH:
+            cx_print(f"\n{t('predictive.high_risk_warning')}", "warning")
+
+        console.print(f"\n{t('predictive.continue_anyway')} [y/N]: ", end="", markup=False)
+        try:
+            response = StdinHandler.get_input().lower()
+            return response in ("y", "yes")
+        except (EOFError, KeyboardInterrupt):
+            console.print()
+            return False
+
     # --- End Sandbox Commands ---
 
     def ask(self, question: str) -> int:
@@ -1446,6 +1507,24 @@ def install(
             self._print_error(t("install.no_commands"))
             return 1
 
+        # Predictive Analysis (always runs; the result is also embedded in JSON output)
+        if not json_output:
+            self._print_status("🔮", t("predictive.analyzing"))
+        if not self.predict_manager:
+            self.predict_manager = PredictiveErrorManager(api_key=api_key, provider=provider)
+        prediction = self.predict_manager.analyze_installation(software, commands)
+        if not json_output:
+            self._clear_line()
+
+        if not json_output:
+            if prediction.risk_level != RiskLevel.NONE:
+                self._display_prediction_warning(prediction)
+                if execute and not self._confirm_risky_operation(prediction):
+                    cx_print(f"\n{t('ui.operation_cancelled')}", "warning")
+                    return 0
+            else:
+                cx_print(t("predictive.no_issues_detected"), "success")
+
         # Extract packages from commands for tracking
         packages = history._extract_packages_from_commands(commands)
 
@@ -1457,12 +1536,17 @@
         # If JSON output requested, return structured data and exit early
         if json_output:
-
             output = {
                 "success": True,
                 "commands": commands,
                 "packages": packages,
                 "install_id": install_id,
+                "prediction": {
+                    "risk_level": prediction.risk_level.name,
+                    "reasons": prediction.reasons,
+                    "recommendations": prediction.recommendations,
+                    "predicted_errors": prediction.predicted_errors,
+                },
             }
             print(json.dumps(output, indent=2))
             return 0
@@ -1780,7 +1864,7 @@ def _confirm_removal(self, package: str, purge: bool) -> bool:
             confirm_msg += " and purge configuration"
         confirm_msg += "? [y/N]: "
         try:
-            response = input(confirm_msg).strip().lower()
+            response = StdinHandler.get_input(confirm_msg).lower()
             return response in ("y", "yes")
         except (EOFError, KeyboardInterrupt):
             console.print()
@@ -1800,8 +1884,6 @@ def _removal_blocked_or_cancelled(self, result, force: bool) -> int:
 
     def _display_impact_report(self, result: ImpactResult) -> None:
         """Display formatted impact analysis report"""
-        from rich.panel import Panel
-        from rich.table import Table
 
         # Severity styling
         severity_styles = {
@@ -2327,7 +2409,6 @@ def status(self):
 
     def update(self, args: argparse.Namespace) -> int:
         """Handle the update command for self-updating Cortex."""
         from rich.progress import Progress, SpinnerColumn, TextColumn
-        from rich.table import Table
 
         # Parse channel
         channel_str = getattr(args, "channel", "stable")
@@ -3553,7 +3634,9 @@ def _env_clear(self, env_mgr: EnvironmentManager, args: argparse.Namespace) -> i
 
         # Confirm unless --force is used
         if not force:
-            confirm = input(f"⚠️ Clear ALL environment variables for '{app}'? (y/n): ")
+            confirm = StdinHandler.get_input(
+                f"⚠️ Clear ALL environment variables for '{app}'? (y/n): "
+            )
             if confirm.lower() != "y":
                 cx_print("Operation cancelled", "info")
                 return 0
@@ -4174,7 +4257,7 @@ def _import_all(self, importer: DependencyImporter, execute: bool, include_dev:
 
         # Execute mode - confirm before installing
         total = total_packages + total_dev_packages
[Y/n]: ") + confirm = StdinHandler.get_input(f"\nInstall all {total} packages? [Y/n]: ") if confirm.lower() not in ["", "y", "yes"]: cx_print("Installation cancelled", "info") return 0 @@ -4481,7 +4564,6 @@ def show_rich_help(): for all core Cortex utilities including installation, environment management, and container tools. """ - from rich.table import Table show_banner(show_version=True) console.print() @@ -4574,7 +4656,7 @@ def main(): # Check for updates on startup (cached, non-blocking) # Only show notification for commands that aren't 'update' itself try: - if temp_args.command not in ["update", None]: + if temp_args.command not in ["update", None] and "--json" not in sys.argv: update_release = should_notify_update() if update_release: console.print( @@ -4727,6 +4809,11 @@ def main(): action="store_true", help="Enable parallel execution for multi-step installs", ) + install_parser.add_argument( + "--json", + action="store_true", + help="Output as JSON", + ) install_parser.add_argument( "--mic", action="store_true", @@ -5343,6 +5430,14 @@ def main(): args = parser.parse_args() + # Configure logging based on parsed arguments + if getattr(args, "json", False): + logging.getLogger("cortex").setLevel(logging.ERROR) + # Also suppress common SDK loggers + logging.getLogger("anthropic").setLevel(logging.ERROR) + logging.getLogger("openai").setLevel(logging.ERROR) + logging.getLogger("httpcore").setLevel(logging.ERROR) + # Handle --set-language global flag first (before any command) if getattr(args, "set_language", None): result = _handle_set_language(args.set_language) @@ -5466,6 +5561,7 @@ def main(): execute=args.execute, dry_run=args.dry_run, parallel=args.parallel, + json_output=args.json, ) elif args.command == "remove": # Handle --execute flag to override default dry-run diff --git a/cortex/i18n/locales/en.yaml b/cortex/i18n/locales/en.yaml index 493662fea..eb212316c 100644 --- a/cortex/i18n/locales/en.yaml +++ b/cortex/i18n/locales/en.yaml @@ -480,3 +480,22 @@ progress: cleaning_up: "Cleaning up..." # {seconds} - duration completed_in: "Completed in {seconds} seconds" + +# ============================================================================= +# Predictive Error Prevention +# ============================================================================= +predictive: + analyzing: "AI is predicting potential installation failures..." + risks_detected: "Potential issues detected" + risk_level: "Risk Level" + recommendations: "Recommendations" + predicted_errors: "Predicted Error Messages" + continue_anyway: "Continue anyway?" + high_risk_warning: "⚠️ Warning: This operation has HIGH RISK of failure." 
+  no_issues_detected: "No compatibility issues detected"
+  low_risk: "Low Risk"
+  medium_risk: "Medium Risk"
+  high_risk: "High Risk"
+  critical_risk: "Critical Risk"
+  no_risk: "No Risk"
+  recommendation: "Recommendation"
diff --git a/cortex/llm_router.py b/cortex/llm_router.py
index 7599ce972..38403d0d5 100644
--- a/cortex/llm_router.py
+++ b/cortex/llm_router.py
@@ -232,6 +232,9 @@ def route_task(
             if self.kimi_client and self.enable_fallback:
                 logger.warning("Claude unavailable, falling back to Kimi K2")
                 provider = LLMProvider.KIMI_K2
+            elif self.ollama_client and self.enable_fallback:
+                logger.warning("Claude unavailable, falling back to Ollama")
+                provider = LLMProvider.OLLAMA
             else:
                 raise RuntimeError("Claude API not configured and no fallback available")
 
@@ -239,6 +242,9 @@
             if self.claude_client and self.enable_fallback:
                 logger.warning("Kimi K2 unavailable, falling back to Claude")
                 provider = LLMProvider.CLAUDE
+            elif self.ollama_client and self.enable_fallback:
+                logger.warning("Kimi K2 unavailable, falling back to Ollama")
+                provider = LLMProvider.OLLAMA
             else:
                 raise RuntimeError("Kimi K2 API not configured and no fallback available")
 
@@ -258,6 +264,22 @@
             provider=provider, task_type=task_type, reasoning=reasoning, confidence=0.95
         )
 
+    def _get_fallback_provider(self, current: LLMProvider) -> LLMProvider | None:
+        """Find the next available provider that isn't the current one."""
+        candidates = []
+
+        # Priority order: Claude -> Kimi -> Ollama
+        if self.claude_client and current != LLMProvider.CLAUDE:
+            candidates.append(LLMProvider.CLAUDE)
+
+        if self.kimi_client and current != LLMProvider.KIMI_K2:
+            candidates.append(LLMProvider.KIMI_K2)
+
+        if self.ollama_client and current != LLMProvider.OLLAMA:
+            candidates.append(LLMProvider.OLLAMA)
+
+        return candidates[0] if candidates else None
+
     def complete(
         self,
         messages: list[dict[str, str]],
@@ -308,21 +330,21 @@ def complete(
 
             # Try fallback if enabled
             if self.enable_fallback:
-                fallback_provider = (
-                    LLMProvider.KIMI_K2
-                    if routing.provider == LLMProvider.CLAUDE
-                    else LLMProvider.CLAUDE
-                )
-                logger.info(f"🔄 Attempting fallback to {fallback_provider.value}")
-
-                return self.complete(
-                    messages=messages,
-                    task_type=task_type,
-                    force_provider=fallback_provider,
-                    temperature=temperature,
-                    max_tokens=max_tokens,
-                    tools=tools,
-                )
+                fallback_provider = self._get_fallback_provider(routing.provider)
+
+                if fallback_provider:
+                    logger.info(f"🔄 Attempting fallback to {fallback_provider.value}")
+                    return self.complete(
+                        messages=messages,
+                        task_type=task_type,
+                        force_provider=fallback_provider,
+                        temperature=temperature,
+                        max_tokens=max_tokens,
+                        tools=tools,
+                    )
+                else:
+                    logger.error("❌ No fallback providers available")
+                    raise
             else:
                 raise
 
@@ -334,6 +356,9 @@ def _complete_claude(
         tools: list[dict] | None = None,
     ) -> LLMResponse:
         """Generate completion using Claude API."""
+        if not self.claude_client:
+            raise RuntimeError("Claude client not initialized")
+
         # Extract system message if present
         system_message = None
         user_messages = []
@@ -390,6 +415,9 @@ def _complete_kimi(
         tools: list[dict] | None = None,
     ) -> LLMResponse:
         """Generate completion using Kimi K2 API."""
+        if not self.kimi_client:
+            raise RuntimeError("Kimi K2 client not initialized")
+
         # Kimi K2 recommends temperature=0.6
         # Map user's temperature to Kimi's scale
         kimi_temp = temperature * 0.6
 
@@ -590,21 +618,21 @@ async def acomplete(
 
             # Try fallback if enabled
             if self.enable_fallback:
-                fallback_provider = (
-                    LLMProvider.KIMI_K2
-                    if routing.provider == LLMProvider.CLAUDE
-                    else LLMProvider.CLAUDE
-                )
-                logger.info(f"🔄 Attempting fallback to {fallback_provider.value}")
-
-                return await self.acomplete(
-                    messages=messages,
-                    task_type=task_type,
-                    force_provider=fallback_provider,
-                    temperature=temperature,
-                    max_tokens=max_tokens,
-                    tools=tools,
-                )
+                fallback_provider = self._get_fallback_provider(routing.provider)
+
+                if fallback_provider:
+                    logger.info(f"🔄 Attempting fallback to {fallback_provider.value}")
+                    return await self.acomplete(
+                        messages=messages,
+                        task_type=task_type,
+                        force_provider=fallback_provider,
+                        temperature=temperature,
+                        max_tokens=max_tokens,
+                        tools=tools,
+                    )
+                else:
+                    logger.error("❌ No fallback providers available")
+                    raise
             else:
                 raise
diff --git a/cortex/predictive_prevention.py b/cortex/predictive_prevention.py
new file mode 100644
index 000000000..ab7171d90
--- /dev/null
+++ b/cortex/predictive_prevention.py
@@ -0,0 +1,362 @@
+#!/usr/bin/env python3
+"""
+Predictive Error Prevention System for Cortex Linux
+
+Analyzes installation requests before execution to predict and prevent failures.
+Uses hardware detection, historical failure analysis, and LLM-backed risk assessment.
+
+Issue: #54
+"""
+
+import json
+import logging
+import re
+from dataclasses import dataclass, field
+from enum import IntEnum
+from typing import Any
+
+from cortex.hardware_detection import HardwareDetector, SystemInfo
+from cortex.installation_history import InstallationHistory, InstallationStatus
+from cortex.llm_router import LLMProvider, LLMRouter, TaskType
+
+logger = logging.getLogger(__name__)
+
+
+class RiskLevel(IntEnum):
+    """Risk levels for installation operations."""
+
+    NONE = 0
+    LOW = 1
+    MEDIUM = 2
+    HIGH = 3
+    CRITICAL = 4
+
+
+@dataclass
+class FailurePrediction:
+    """Detailed prediction of potential installation failures."""
+
+    risk_level: RiskLevel = RiskLevel.NONE
+    reasons: list[str] = field(default_factory=list)
+    recommendations: list[str] = field(default_factory=list)
+    predicted_errors: list[str] = field(default_factory=list)
+    context_data: dict[str, Any] = field(default_factory=dict)
+
+
+class PredictiveErrorManager:
+    """
+    Manages predictive error analysis for software installations.
+
+    Combines:
+    1. Hardware compatibility checks
+    2. Historical failure analysis
+    3. LLM-based risk prediction
+    """
+
+    def __init__(self, api_key: str | None = None, provider: str | None = None):
+        self.detector = HardwareDetector()
+        self.history = InstallationHistory()
+        self.api_key = api_key
+        # Normalize provider casing to avoid missed API-key wiring
+        normalized_provider = provider.lower() if provider else None
+        self.provider = normalized_provider
+
+        # Handle the 'fake' provider used in testing
+        llm_provider = LLMProvider.OLLAMA
+        if normalized_provider:
+            try:
+                llm_provider = LLMProvider(normalized_provider)
+            except ValueError:
+                # Fall back to Ollama if 'fake' or another unknown provider is passed
+                logger.warning(
+                    f"Provider '{provider}' not in LLMProvider enum, using OLLAMA fallback"
+                )
+                llm_provider = LLMProvider.OLLAMA
+
+        self.router = LLMRouter(
+            claude_api_key=api_key if normalized_provider == "claude" else None,
+            kimi_api_key=api_key if normalized_provider == "kimi_k2" else None,
+            default_provider=llm_provider,
+        )
+
+    def analyze_installation(
+        self, software: str, commands: list[str], redact: bool = True
+    ) -> FailurePrediction:
+        """
+        Analyze a planned installation for potential risks.
+
+        Args:
+            software: The software request string
+            commands: List of commands planned for execution
+            redact: Whether to redact sensitive data before the LLM call
+
+        Returns:
+            FailurePrediction object with risk details
+        """
+        logger.info(f"Analyzing installation risk for: {software}")
+
+        prediction = FailurePrediction()
+
+        # 1. Get system context
+        system_info = self.detector.detect()
+        prediction.context_data["system"] = system_info.to_dict()
+
+        # 2. Check basic system compatibility (static rules)
+        self._check_static_compatibility(software, system_info, prediction)
+
+        # 3. Analyze history for similar failures
+        self._analyze_history_patterns(software, commands, prediction)
+
+        # 4. LLM-backed advanced prediction (if AI is available and not in fake mode)
+        if (self.api_key or self.provider == "ollama") and self.provider != "fake":
+            # Redact sensitive data from commands before sending to the LLM if requested
+            redacted_commands = self.redact_commands(commands) if redact else commands
+            self._get_llm_prediction(software, redacted_commands, system_info, prediction)
+
+        # 5. Final risk level adjustment based on findings
+        self._finalize_risk_level(prediction)
+
+        return prediction
+
+    def redact_commands(self, commands: list[str]) -> list[str]:
+        """Mask potential tokens, passwords, and API keys in commands."""
+        # Common patterns for sensitive data in CLI commands:
+        # 1. --password PASSWORD or --api-key=TOKEN
+        # 2. env vars like AUTH_TOKEN=xxx
+        redacted = []
+        redact_count = 0
+        for cmd in commands:
+            # Mask common credential flags (handles both spaces and equals)
+            new_cmd, count1 = re.subn(
+                r"(?i)(--?(?:token|api[-_]?key|password|secret|pwd|auth|key)(?:\s+|=))(\S+)",
+                r"\1<REDACTED>",
+                cmd,
+            )
+            # Mask env var assignments
+            new_cmd, count2 = re.subn(
+                r"(?i)\b([A-Z0-9_-]*(?:TOKEN|PASSWORD|SECRET|KEY|AUTH)=)(\S+)",
+                r"\1<REDACTED>",
+                new_cmd,
+            )
+            redacted.append(new_cmd)
+            redact_count += count1 + count2
+
+        if redact_count > 0:
+            logger.info(f"Redacted {redact_count} sensitive fields in commands before LLM call")
+
+        return redacted
+
+    def _check_static_compatibility(
+        self, software: str, system: SystemInfo, prediction: FailurePrediction
+    ) -> None:
+        """Run fast, rule-based compatibility checks."""
+        normalized_software = software.lower()
+
+        # Kernel compatibility examples
+        if "cuda" in normalized_software or "nvidia" in normalized_software:
+            # Check for very old kernels (explicitly handle the None/empty case)
+            if system.kernel_version:
+                version_match = re.search(r"^(\d+)\.(\d+)", system.kernel_version)
+                if version_match:
+                    major = int(version_match.group(1))
+                    minor = int(version_match.group(2))
+                    if major < 5 or (major == 5 and minor < 4):
+                        prediction.reasons.append(
+                            f"Kernel version {system.kernel_version} may be too old for modern CUDA drivers (requires 5.4+)"
+                        )
+                        prediction.recommendations.append("Update kernel to 5.15+ first")
+                        prediction.risk_level = max(prediction.risk_level, RiskLevel.HIGH)
+                    else:  # Modern kernel (5.4+) found; check for driver synchronization risk
+                        # Add a risk-focused warning for newer kernels regarding driver/header complexity
+                        prediction.reasons.append(
+                            f"Risk of driver-kernel mismatch on {system.kernel_version}. Modern CUDA requires perfectly synchronized kernel headers and drivers to avoid installation failure."
+ ) + prediction.recommendations.append( + "Verify that official NVIDIA drivers and matching kernel headers are installed before proceeding" + ) + prediction.risk_level = max(prediction.risk_level, RiskLevel.LOW) + + # RAM checks + ram_gb = system.memory.total_gb + if "docker" in normalized_software and ram_gb < 2: + prediction.reasons.append( + f"Low RAM detected ({ram_gb}GB). Docker may perform poorly or fail to start containers." + ) + prediction.recommendations.append("Ensure at least 4GB RAM for Docker environments") + prediction.risk_level = max(prediction.risk_level, RiskLevel.MEDIUM) + + # Disk space checks + for storage in system.storage: + if storage.mount_point == "/" and storage.available_gb < 2: + prediction.reasons.append( + f"Critically low disk space on root ({storage.available_gb:.1f} GB free)" + ) + prediction.recommendations.append( + "Free up at least 5GB of disk space before proceeding" + ) + prediction.risk_level = max(prediction.risk_level, RiskLevel.CRITICAL) + + def _analyze_history_patterns( + self, software: str, commands: list[str], prediction: FailurePrediction + ) -> None: + """Learn from past failures in the installation records.""" + history = self.history.get_history(limit=50, status_filter=InstallationStatus.FAILED) + + if not history: + return + + request_packages = set(software.lower().split()) + # Also extract from commands + extracted = self.history._extract_packages_from_commands(commands) + request_packages.update(p.lower() for p in extracted) + + failure_count = 0 + common_errors = [] + + for record in history: + record_packages = [p.lower() for p in record.packages] + match_found = False + for req_pkg in request_packages: + if len(req_pkg) < 2: + continue # Ignore single letters like 'a' + for hist_pkg in record_packages: + # Partial match for package names (e.g., 'docker' matches 'docker.io') + if req_pkg in hist_pkg or hist_pkg in req_pkg: + match_found = True + break + if match_found: + break + + if match_found: + failure_count += 1 + if record.error_message and record.error_message not in common_errors: + common_errors.append(record.error_message) + + if failure_count > 0: + prediction.reasons.append( + f"This software (or related components) failed {failure_count} times in previous attempts." + ) + prediction.predicted_errors.extend(common_errors[:3]) + + if failure_count >= 3: + prediction.risk_level = max(prediction.risk_level, RiskLevel.HIGH) + else: + prediction.risk_level = max(prediction.risk_level, RiskLevel.MEDIUM) + + def _get_llm_prediction( + self, software: str, commands: list[str], system: SystemInfo, prediction: FailurePrediction + ) -> None: + """Use LLM to predict complex failure scenarios.""" + try: + # Prepare context for LLM + context = { + "software": software, + "commands": commands, + "system_context": { + "kernel": system.kernel_version, + "distro": f"{system.distro or 'Unknown'} {system.distro_version or ''}".strip(), + "cpu": system.cpu.model, + "gpu": [g.model for g in system.gpu], + "ram_gb": system.memory.total_gb, + "virtualization": system.virtualization, + }, + "history_reasons": prediction.reasons, + "static_reasons": prediction.reasons, + } + + prompt = f""" + Analyze the following installation request for potential failure risks on this specific Linux system. + + USER REQUEST: {software} + COMMANDS PLANNED: {json.dumps(commands)} + SYSTEM CONTEXT: {json.dumps(context["system_context"])} + + Identify specific reasons why this might fail (dependency conflicts, hardware mismatches, kernel requirements, etc.). 
+            Provide your response in JSON format with the following keys:
+            - risk_level: "none", "low", "medium", "high", "critical"
+            - reasons: list of strings (specific risks)
+            - recommendations: list of strings (how to prevent failure)
+            - predicted_errors: list of strings (likely error messages)
+            """
+
+            messages = [
+                {
+                    "role": "system",
+                    "content": "You are a Linux system expert specializing in installation failure prediction and prevention.",
+                },
+                {"role": "user", "content": prompt},
+            ]
+
+            response = self.router.complete(
+                messages=messages, task_type=TaskType.ERROR_DEBUGGING, temperature=0.3
+            )
+
+            # Try to parse JSON from the response
+            try:
+                # Find the JSON block if it's wrapped in markdown
+                json_str = response.content
+                # Robust parsing: handle markdown code blocks or raw JSON
+                # Removed \s* inside the regex to prevent potential backtracking issues (ReDoS)
+                code_block_match = re.search(r"```(?:json)?(.*?)```", json_str, re.DOTALL)
+                if code_block_match:
+                    json_str = code_block_match.group(1).strip()
+
+                # Clean up potential non-JSON characters
+                json_data = json.loads(json_str)
+
+                # Update the prediction
+                llm_risk_str = json_data.get("risk_level", "none").lower()
+                mapping = {
+                    "none": RiskLevel.NONE,
+                    "low": RiskLevel.LOW,
+                    "medium": RiskLevel.MEDIUM,
+                    "high": RiskLevel.HIGH,
+                    "critical": RiskLevel.CRITICAL,
+                }
+                llm_risk = mapping.get(llm_risk_str, RiskLevel.NONE)
+
+                if llm_risk > prediction.risk_level:
+                    prediction.risk_level = llm_risk
+
+                prediction.reasons.extend(json_data.get("reasons", []))
+                prediction.recommendations.extend(json_data.get("recommendations", []))
+                prediction.predicted_errors.extend(json_data.get("predicted_errors", []))
+
+            except (json.JSONDecodeError, ValueError) as e:
+                logger.warning(f"Failed to parse LLM response as JSON: {e}")
+                # Fallback: just use the content if it's not JSON
+                if "Risk:" in response.content:
+                    prediction.reasons.append(
+                        "LLM detected risks: " + response.content[:200] + "..."
+ ) + + except Exception as e: + logger.error(f"LLM prediction failed: {e}") + prediction.reasons.append(f"Advanced AI analysis unavailable: {str(e)}") + + def _finalize_risk_level(self, prediction: FailurePrediction) -> None: + """Clean up and finalize the risk assessment.""" + # Deduplicate reasons and recommendations + prediction.reasons = list(dict.fromkeys(prediction.reasons)) + prediction.recommendations = list(dict.fromkeys(prediction.recommendations)) + prediction.predicted_errors = list(dict.fromkeys(prediction.predicted_errors)) + + # Ensure risk level matches the number of reasons if not already high + if prediction.reasons and prediction.risk_level == RiskLevel.NONE: + prediction.risk_level = RiskLevel.LOW + + # Escalate based on critical keywords in reasons + reasons_lower = [r.lower() for r in prediction.reasons] + if any("critical" in r for r in reasons_lower): + prediction.risk_level = max(prediction.risk_level, RiskLevel.CRITICAL) + elif any("unsupported" in r for r in reasons_lower): + prediction.risk_level = max(prediction.risk_level, RiskLevel.HIGH) + + +if __name__ == "__main__": + # Test block + manager = PredictiveErrorManager(provider="ollama") + pred = manager.analyze_installation("cuda-12.0", ["sudo apt-get install cuda-12.0"]) + print(f"Risk Level: {pred.risk_level}") + print(f"Reasons: {pred.reasons}") + print(f"Recommendations: {pred.recommendations}") diff --git a/cortex/stdin_handler.py b/cortex/stdin_handler.py index bc61749cc..60144d893 100644 --- a/cortex/stdin_handler.py +++ b/cortex/stdin_handler.py @@ -176,6 +176,21 @@ def read_and_truncate(self) -> StdinData: return data return self.truncate(data) + @staticmethod + def get_input(prompt: str = "") -> str: + """Get interactive input from stdin safely. + + Args: + prompt: Prompt text to display + + Returns: + Input string or empty string on cancellation + """ + try: + return input(prompt).strip() + except (EOFError, KeyboardInterrupt): + return "" + def detect_content_type(content: str) -> str: """Detect the type of content from stdin. diff --git a/docs/PREDICTIVE_ERROR_PREVENTION.md b/docs/PREDICTIVE_ERROR_PREVENTION.md new file mode 100644 index 000000000..a866be12c --- /dev/null +++ b/docs/PREDICTIVE_ERROR_PREVENTION.md @@ -0,0 +1,98 @@ +# Predictive Error Prevention System + +The Predictive Error Prevention System in Cortex Linux analyzes installation requests before they are executed to identify potential risks and suggest preventive actions. This system acts as a safety layer to prevent partial or broken installations. + +## Features + +- **Static Compatibility Checks**: Instant validation of kernel version, RAM, and disk space against package requirements. +- **Historical Failure Analysis**: Automatically detects if a package has failed previously on your specific hardware. +- **AI-Powered Risk Assessment**: Leverage LLMs (Claude, GPT, Ollama) to analyze complex shell commands for underlying risks. +- **Proactive Recommendations**: Provides actionable steps (e.g., "Update kernel first") before a failure occurs. +- **Visual Risk Dashboard**: A color-coded interface built into the CLI for high-visibility warnings. + +## How it Works + +When you run `cortex install`, the system performs a multi-layer analysis: + +1. **System Context**: Detects hardware (CPU, RAM, Storage) and OS (Kernel version). +2. **Command Analysis**: Extracts package names and intent from the planned shell commands. +3. **Multi-Stage Risk Scoring**: + - **Static**: Rule-based checks (e.g., "Does CUDA work on this kernel?"). 
+   - **Historical**: Database lookup of previous installation attempts.
+   - **AI/LLM**: Advanced heuristic analysis (if a provider is configured).
+
+## AI Analysis Deep Dive
+
+The system uses a sophisticated prompting strategy to perform its analysis. Here is how the AI "thinks" through an installation:
+
+1. **Context Injection**: The system sends a detailed "System Snapshot" to the AI, including:
+   - Your exact **Kernel Version** and **Distro**.
+   - Your available **RAM** and **GPU** models.
+   - The **specific shell commands** Cortex plans to run.
+   - Any risks already found by our static/historical scanners.
+
+2. **Expert Persona**: The AI is instructed to act as a **Linux System Expert** specializing in error debugging.
+
+3. **Heuristic Check**: The AI evaluates the commands against its training data of known Linux issues (e.g., "Will this specific driver version work with Kernel 6.14?").
+
+4. **Structured Output**: The AI returns a JSON report containing standardized risk levels, human-readable reasons, and actionable recommendations.
+
+This combination of real-time system data and AI reasoning allows Cortex to catch complex failures that simple static rules would miss.
+
+## Example Scenarios
+
+### 1. Incompatible Hardware/Kernel
+If you try to install software with strict kernel and driver requirements (like modern CUDA), the system flags the synchronization risk:
+
+```text
+ℹ️ Potential issues detected: Low Risk
+
+Low Risk:
+  - CUDA installation detected on kernel 6.14.0-35-generic.
+
+Recommendation:
+  1. Ensure official NVIDIA drivers are installed before proceeding
+```
+
+### 2. High-Risk Repeated Failure
+If a package has failed multiple times, the system escalates the risk level:
+
+```text
+⚠️ Potential issues detected:
+
+High Risk:
+  - This software (or related components) failed 3 times in previous attempts.
+
+Recommendation:
+  1. Check system logs for dependency conflicts.
+  2. Verify the package name is correct for your distribution.
+
+Predicted Error Messages:
+  ! E: Package 'nvidia-384' has no installation candidate
+```
+
+## AI Usage Statement
+
+This feature was developed with heavy assistance from **Antigravity AI (Google DeepMind)**.
+- **Generated Logic**: The core `PredictiveErrorManager` logic and history pattern matching were AI-suggested.
+- **Test Generation**: Unit tests were designed using AI to reach 87% coverage.
+- **Documentation**: This documentation was refined by the AI to match the final implementation style.
+
+## Testing
+
+To verify the system manually:
+1. Ensure the project environment is set up: `pip install -e .`
+2. Run the unit tests: `pytest tests/unit/test_predictive_prevention.py`
+3. **Verify with Real AI**:
+   This tests the full pipeline, including LLM-based risk assessment.
+   ```bash
+   # 1. Ensure your API key is set
+   export ANTHROPIC_API_KEY=sk-ant-...
+
+   # 2. Run an installation that might trigger warnings (e.g., CUDA on a non-NVIDIA system)
+   cortex install "cuda-toolkit" --dry-run
+   ```
+
+## Development
+
+Developers can add new static rules in `cortex/predictive_prevention.py` within the `_check_static_compatibility` method; a hedged example follows below.
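+
+The rule below is an illustrative sketch only; the `virtualbox` trigger and the MEDIUM escalation are hypothetical, but the fields it touches (`reasons`, `recommendations`, `risk_level`) and the `max()` escalation pattern match the real helpers in this module:
+
+```python
+# Inside PredictiveErrorManager._check_static_compatibility(...)
+# Hypothetical rule: flag hypervisor installs attempted inside a guest VM.
+if "virtualbox" in normalized_software and system.virtualization:
+    prediction.reasons.append(
+        f"Running inside {system.virtualization}; nested virtualization is often disabled."
+    )
+    prediction.recommendations.append(
+        "Enable nested virtualization on the host, or install on bare metal"
+    )
+    prediction.risk_level = max(prediction.risk_level, RiskLevel.MEDIUM)
+```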
diff --git a/docs/guides/Developer-Guide.md b/docs/guides/Developer-Guide.md
index 4a06cead5..a123da332 100644
--- a/docs/guides/Developer-Guide.md
+++ b/docs/guides/Developer-Guide.md
@@ -34,7 +34,8 @@ cortex/
 │   ├── rollback.py            # Rollback system
 │   ├── config_templates.py    # Config generation
 │   ├── logging_system.py      # Logging & diagnostics
-│   └── context_memory.py      # AI memory system
+│   ├── context_memory.py      # AI memory system
+│   └── predictive_prevention.py # Pre-install risk analysis
 ├── tests/
 │   └── test_*.py              # Unit tests
 ├── docs/
@@ -55,6 +56,8 @@
 Package Manager Wrapper (apt/yum/dnf)
     ↓
 Dependency Resolver
     ↓
+Predictive Error Prevention (Risk Analysis)
+    ↓
 Sandbox Executor (Firejail)
     ↓
 Installation Verifier
diff --git a/docs/guides/User-Guide.md b/docs/guides/User-Guide.md
index ceb5f26b0..fe08ac957 100644
--- a/docs/guides/User-Guide.md
+++ b/docs/guides/User-Guide.md
@@ -40,6 +40,15 @@ cortex simulate "install oracle 23 ai"
 # Shows: disk space, dependencies, estimated time
 ```
 
+### Predictive Error Prevention
+
+Cortex automatically analyzes installation requests for potential risks (kernel mismatch, low RAM, disk space) before execution. If a risk is detected, you will see a warning panel and be asked for confirmation. A machine-readable variant is shown below.
+
+```bash
+# Example warning for risky hardware/software combo
+cortex install "nvidia-cuda-latest"
+```
+
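+With the new `--json` flag, the same analysis is returned as structured data instead of the interactive panel. The shape below mirrors the `prediction` payload emitted by `cortex install --json`; the values are placeholders:
+
+```bash
+cortex install "nvidia-cuda-latest" --json
+```
+
+```json
+{
+  "success": true,
+  "commands": ["..."],
+  "packages": ["..."],
+  "install_id": "...",
+  "prediction": {
+    "risk_level": "LOW",
+    "reasons": ["..."],
+    "recommendations": ["..."],
+    "predicted_errors": []
+  }
+}
+```
+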
 ### Progress & Notifications
 ```bash
 # Installation with progress
diff --git a/tests/cli_test_base.py b/tests/cli_test_base.py
new file mode 100644
index 000000000..21e2b0973
--- /dev/null
+++ b/tests/cli_test_base.py
@@ -0,0 +1,54 @@
+import tempfile
+import unittest
+from pathlib import Path
+from unittest.mock import Mock
+
+from cortex.cli import CortexCLI
+from cortex.predictive_prevention import RiskLevel
+
+
+class CLITestBase(unittest.TestCase):
+    """Base class for CLI tests to share common setup and mock helpers."""
+
+    def setUp(self) -> None:
+        self.cli = CortexCLI()
+        # Use a temp dir for cache isolation
+        self._temp_dir = tempfile.TemporaryDirectory()
+        self._temp_home = Path(self._temp_dir.name)
+
+    def tearDown(self) -> None:
+        self._temp_dir.cleanup()
+
+    def _setup_predictive_mock(self, mock_predictive_class: Mock) -> Mock:
+        """Helper to configure the PredictiveErrorManager mock with a default safe response."""
+        mock_predictive = Mock()
+        mock_prediction = Mock()
+        mock_prediction.risk_level = RiskLevel.NONE
+        mock_predictive.analyze_installation.return_value = mock_prediction
+        mock_predictive_class.return_value = mock_predictive
+        return mock_predictive
+
+    def _setup_interpreter_mock(
+        self, mock_interpreter_class: Mock, commands: list[str] | None = None
+    ) -> Mock:
+        """Helper to set up the CommandInterpreter mock."""
+        if commands is None:
+            commands = ["apt update", "apt install docker"]
+        mock_interpreter = Mock()
+        mock_interpreter.parse.return_value = commands
+        mock_interpreter_class.return_value = mock_interpreter
+        return mock_interpreter
+
+    def _setup_coordinator_mock(
+        self, mock_coordinator_class: Mock, success: bool = True, error_message: str | None = None
+    ) -> Mock:
+        """Helper to set up the InstallationCoordinator mock."""
+        mock_coordinator = Mock()
+        mock_result = Mock()
+        mock_result.success = success
+        mock_result.total_duration = 1.5
+        mock_result.failed_step = 0
+        mock_result.error_message = error_message
+        mock_coordinator.execute.return_value = mock_result
+        mock_coordinator_class.return_value = mock_coordinator
+        return mock_coordinator
diff --git a/tests/installer/test_parallel_install.py b/tests/installer/test_parallel_install.py
index 4b89d8f5d..2e55627aa 100644
--- a/tests/installer/test_parallel_install.py
+++ b/tests/installer/test_parallel_install.py
@@ -1,6 +1,7 @@
 """Tests for parallel installation execution."""
 
 import asyncio
+import sys
 import time
 
 import pytest
@@ -17,9 +18,9 @@ def test_parallel_runs_faster_than_sequential(self):
         async def run_test():
             # Create 3 independent commands using Python's time.sleep (Windows-compatible)
             commands = [
-                "python -c \"import time; time.sleep(0.1); print('Task 1')\"",
-                "python -c \"import time; time.sleep(0.1); print('Task 2')\"",
-                "python -c \"import time; time.sleep(0.1); print('Task 3')\"",
+                f'"{sys.executable}" -c "import time; time.sleep(0.1); print(\'Task 1\')"',
+                f'"{sys.executable}" -c "import time; time.sleep(0.1); print(\'Task 2\')"',
+                f'"{sys.executable}" -c "import time; time.sleep(0.1); print(\'Task 3\')"',
             ]
 
             # Run in parallel
@@ -42,9 +43,9 @@ def test_dependency_order_respected(self):
         async def run_test():
             commands = [
-                "python -c \"print('Task 1')\"",
-                "python -c \"print('Task 2')\"",
-                "python -c \"print('Task 3')\"",
+                f'"{sys.executable}" -c "print(\'Task 1\')"',
+                f'"{sys.executable}" -c "print(\'Task 2\')"',
+                f'"{sys.executable}" -c "print(\'Task 3\')"',
             ]
 
             # Task 1 has no dependencies
@@ -70,9 +71,9 @@ def test_failure_blocks_dependent_tasks(self):
         async def run_test():
             commands = [
-                'python -c "exit(1)"',  # Task 1 fails
-                "python -c \"print('Task 2')\"",  # Task 2 depends on Task 1
-                "python -c \"print('Task 3')\"",  # Task 3 is independent
+                f'"{sys.executable}" -c "exit(1)"',  # Task 1 fails
+                f'"{sys.executable}" -c "print(\'Task 2\')"',  # Task 2 depends on Task 1
+                f'"{sys.executable}" -c "print(\'Task 3\')"',  # Task 3 is independent
             ]
 
             # Task 2 depends on Task 1
@@ -98,10 +99,10 @@ def test_all_independent_tasks_run(self):
         async def run_test():
             commands = [
-                "python -c \"print('Task 1')\"",
-                "python -c \"print('Task 2')\"",
-                "python -c \"print('Task 3')\"",
-                "python -c \"print('Task 4')\"",
+                f'"{sys.executable}" -c "print(\'Task 1\')"',
+                f'"{sys.executable}" -c "print(\'Task 2\')"',
+                f'"{sys.executable}" -c "print(\'Task 3\')"',
+                f'"{sys.executable}" -c "print(\'Task 4\')"',
             ]
 
             # All tasks are independent (no dependencies)
@@ -121,7 +122,10 @@ def test_descriptions_match_tasks(self):
         """Verify that descriptions are properly assigned to tasks."""
 
         async def run_test():
-            commands = ["python -c \"print('Task 1')\"", "python -c \"print('Task 2')\""]
+            commands = [
+                f'"{sys.executable}" -c "print(\'Task 1\')"',
+                f'"{sys.executable}" -c "print(\'Task 2\')"',
+            ]
             descriptions = ["Install package A", "Start service B"]
 
             success, tasks = await run_parallel_install(
@@ -138,7 +142,10 @@ def test_invalid_description_count_raises_error(self):
         """Verify that mismatched description count raises ValueError."""
 
         async def run_test():
-            commands = ["python -c \"print('Task 1')\"", "python -c \"print('Task 2')\""]
+            commands = [
+                f'"{sys.executable}" -c "print(\'Task 1\')"',
+                f'"{sys.executable}" -c "print(\'Task 2\')"',
+            ]
             descriptions = ["Only one description"]  # Mismatch
 
             with pytest.raises(ValueError):
@@ -151,7 +158,7 @@ def test_command_timeout(self):
         async def run_test():
             commands = [
-                'python -c "import time; time.sleep(5)"',  # This will timeout with 1 second limit
+                f'"{sys.executable}" -c "import time; time.sleep(5)"',  # This will time out with the 1-second limit
             ]
 
             success, tasks = await run_parallel_install(commands, timeout=1)
@@ -177,7 +184,7 @@ def test_task_status_tracking(self):
         """Verify that task status is properly tracked."""
 
         async def run_test():
-            commands = ["python -c \"print('Success')\""]
+            commands = [f'"{sys.executable}" -c "print(\'Success\')"']
 
             success, tasks = await run_parallel_install(commands, timeout=10)
 
@@ -197,9 +204,9 @@ def test_sequential_mode_unchanged(self):
         async def run_test():
             commands = [
-                "python -c \"print('Step 1')\"",
-                "python -c \"print('Step 2')\"",
-                "python -c \"print('Step 3')\"",
+                f'"{sys.executable}" -c "print(\'Step 1\')"',
+                f'"{sys.executable}" -c "print(\'Step 2\')"',
+                f'"{sys.executable}" -c "print(\'Step 3\')"',
             ]
             descriptions = ["Step 1", "Step 2", "Step 3"]
 
@@ -218,7 +225,7 @@ def test_log_callback_called(self):
         """Verify that log callback is invoked during execution."""
 
         async def run_test():
-            commands = ["python -c \"print('Test')\""]
+            commands = [f'"{sys.executable}" -c "print(\'Test\')"']
             log_messages = []
 
             def log_callback(message: str, level: str = "info"):
@@ -247,10 +254,10 @@ def test_diamond_dependency_graph(self):
         async def run_test():
             commands = [
-                "python -c \"print('Base')\"",  # Task 1
-                "python -c \"print('Branch A')\"",  # Task 2
-                "python -c \"print('Branch B')\"",  # Task 3
-                "python -c \"print('Final')\"",  # Task 4
+                f'"{sys.executable}" -c "print(\'Base\')"',  # Task 1
+                f'"{sys.executable}" -c "print(\'Branch A\')"',  # Task 2
+                f'"{sys.executable}" -c "print(\'Branch B\')"',  # Task 3
+                f'"{sys.executable}" -c "print(\'Final\')"',  # Task 4
             ]
 
             # Task 2 and 3 depend on Task 1
@@ -276,9 +283,9 @@ def test_mixed_success_and_independent_failure(self):
         async def run_test():
             commands = [
-                'python -c "exit(1)"',  # Task 1 fails
-                "python -c \"print('OK')\"",  # Task 2 independent
-                "python -c \"print('OK')\"",  # Task 3 independent
+                f'"{sys.executable}" -c "exit(1)"',  # Task 1 fails
+                f'"{sys.executable}" -c "print(\'OK\')"',  # Task 2 independent
+                f'"{sys.executable}" -c "print(\'OK\')"',  # Task 3 independent
             ]
 
             dependencies = {0: [], 1: [], 2: []}
diff --git a/tests/test_cli.py b/tests/test_cli.py
index 274b79f79..08d341075 100644
--- a/tests/test_cli.py
+++ b/tests/test_cli.py
@@ -8,17 +8,10 @@
 sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
 
 from cortex.cli import CortexCLI, main
+from tests.cli_test_base import CLITestBase
 
 
-class TestCortexCLI(unittest.TestCase):
-    def setUp(self):
-        self.cli = CortexCLI()
-        # Use a temp dir for cache isolation
-        self._temp_dir = tempfile.TemporaryDirectory()
-        self._temp_home = Path(self._temp_dir.name)
-
-    def tearDown(self):
-        self._temp_dir.cleanup()
+class TestCortexCLI(CLITestBase):
 
     def test_get_api_key_openai(self):
         with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-openai-key-123"}, clear=True):
@@ -83,11 +76,11 @@ def test_print_success(self, mock_stdout):
 
     @patch.dict(os.environ, {}, clear=True)
     @patch("cortex.cli.CommandInterpreter")
-    def test_install_no_api_key(self, mock_interpreter_class):
+    @patch("cortex.cli.PredictiveErrorManager")
+    def test_install_no_api_key(self, mock_predictive_class, mock_interpreter_class):
         # Should work with Ollama (no API key needed)
-        mock_interpreter = Mock()
-        mock_interpreter.parse.return_value = ["apt update", "apt install docker"]
-        mock_interpreter_class.return_value = mock_interpreter
+        self._setup_interpreter_mock(mock_interpreter_class)
+        self._setup_predictive_mock(mock_predictive_class)
 
         result = self.cli.install("docker")
         # Should succeed with Ollama as fallback provider
"sk-test-openai-key-123"}, clear=True) @patch("cortex.cli.CommandInterpreter") - def test_install_dry_run(self, mock_interpreter_class): - mock_interpreter = Mock() - mock_interpreter.parse.return_value = ["apt update", "apt install docker"] - mock_interpreter_class.return_value = mock_interpreter + @patch("cortex.cli.PredictiveErrorManager") + def test_install_dry_run(self, mock_predictive_class, mock_interpreter_class): + mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class) + self._setup_predictive_mock(mock_predictive_class) result = self.cli.install("docker", dry_run=True) @@ -107,10 +100,10 @@ def test_install_dry_run(self, mock_interpreter_class): @patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-openai-key-123"}, clear=True) @patch("cortex.cli.CommandInterpreter") - def test_install_no_execute(self, mock_interpreter_class): - mock_interpreter = Mock() - mock_interpreter.parse.return_value = ["apt update", "apt install docker"] - mock_interpreter_class.return_value = mock_interpreter + @patch("cortex.cli.PredictiveErrorManager") + def test_install_no_execute(self, mock_predictive_class, mock_interpreter_class): + mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class) + self._setup_predictive_mock(mock_predictive_class) result = self.cli.install("docker", execute=False) @@ -120,17 +113,13 @@ def test_install_no_execute(self, mock_interpreter_class): @patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-openai-key-123"}, clear=True) @patch("cortex.cli.CommandInterpreter") @patch("cortex.cli.InstallationCoordinator") - def test_install_with_execute_success(self, mock_coordinator_class, mock_interpreter_class): - mock_interpreter = Mock() - mock_interpreter.parse.return_value = ["echo test"] - mock_interpreter_class.return_value = mock_interpreter - - mock_coordinator = Mock() - mock_result = Mock() - mock_result.success = True - mock_result.total_duration = 1.5 - mock_coordinator.execute.return_value = mock_result - mock_coordinator_class.return_value = mock_coordinator + @patch("cortex.cli.PredictiveErrorManager") + def test_install_with_execute_success( + self, mock_predictive_class, mock_coordinator_class, mock_interpreter_class + ): + self._setup_interpreter_mock(mock_interpreter_class, commands=["echo test"]) + self._setup_predictive_mock(mock_predictive_class) + mock_coordinator = self._setup_coordinator_mock(mock_coordinator_class, success=True) result = self.cli.install("docker", execute=True) @@ -140,18 +129,15 @@ def test_install_with_execute_success(self, mock_coordinator_class, mock_interpr @patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-openai-key-123"}, clear=True) @patch("cortex.cli.CommandInterpreter") @patch("cortex.cli.InstallationCoordinator") - def test_install_with_execute_failure(self, mock_coordinator_class, mock_interpreter_class): - mock_interpreter = Mock() - mock_interpreter.parse.return_value = ["invalid command"] - mock_interpreter_class.return_value = mock_interpreter - - mock_coordinator = Mock() - mock_result = Mock() - mock_result.success = False - mock_result.failed_step = 0 - mock_result.error_message = "command not found" - mock_coordinator.execute.return_value = mock_result - mock_coordinator_class.return_value = mock_coordinator + @patch("cortex.cli.PredictiveErrorManager") + def test_install_with_execute_failure( + self, mock_predictive_class, mock_coordinator_class, mock_interpreter_class + ): + self._setup_interpreter_mock(mock_interpreter_class, commands=["invalid command"]) + 
+        self._setup_predictive_mock(mock_predictive_class)
+        self._setup_coordinator_mock(
+            mock_coordinator_class, success=False, error_message="command not found"
+        )
 
         result = self.cli.install("docker", execute=True)
 
@@ -160,9 +146,7 @@
     @patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-openai-key-123"}, clear=True)
     @patch("cortex.cli.CommandInterpreter")
     def test_install_no_commands_generated(self, mock_interpreter_class):
-        mock_interpreter = Mock()
-        mock_interpreter.parse.return_value = []
-        mock_interpreter_class.return_value = mock_interpreter
+        self._setup_interpreter_mock(mock_interpreter_class, commands=[])
 
         result = self.cli.install("docker")
 
@@ -171,9 +155,8 @@
     @patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-openai-key-123"}, clear=True)
     @patch("cortex.cli.CommandInterpreter")
     def test_install_value_error(self, mock_interpreter_class):
-        mock_interpreter = Mock()
+        mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class)
         mock_interpreter.parse.side_effect = ValueError("Invalid input")
-        mock_interpreter_class.return_value = mock_interpreter
 
         result = self.cli.install("docker")
 
@@ -182,9 +165,8 @@
     @patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-openai-key-123"}, clear=True)
     @patch("cortex.cli.CommandInterpreter")
     def test_install_runtime_error(self, mock_interpreter_class):
-        mock_interpreter = Mock()
+        mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class)
         mock_interpreter.parse.side_effect = RuntimeError("API failed")
-        mock_interpreter_class.return_value = mock_interpreter
 
         result = self.cli.install("docker")
 
@@ -193,9 +175,8 @@
     @patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-openai-key-123"}, clear=True)
     @patch("cortex.cli.CommandInterpreter")
     def test_install_unexpected_error(self, mock_interpreter_class):
-        mock_interpreter = Mock()
+        mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class)
         mock_interpreter.parse.side_effect = Exception("Unexpected")
-        mock_interpreter_class.return_value = mock_interpreter
 
         result = self.cli.install("docker")
 
@@ -213,7 +194,9 @@
     def test_main_install_command(self, mock_install):
         mock_install.return_value = 0
         result = main()
         self.assertEqual(result, 0)
-        mock_install.assert_called_once_with("docker", execute=False, dry_run=False, parallel=False)
+        mock_install.assert_called_once_with(
+            "docker", execute=False, dry_run=False, parallel=False, json_output=False
+        )
 
     @patch("sys.argv", ["cortex", "install", "docker", "--execute"])
     @patch("cortex.cli.CortexCLI.install")
     def test_main_install_with_execute(self, mock_install):
         mock_install.return_value = 0
         result = main()
         self.assertEqual(result, 0)
-        mock_install.assert_called_once_with("docker", execute=True, dry_run=False, parallel=False)
+        mock_install.assert_called_once_with(
+            "docker", execute=True, dry_run=False, parallel=False, json_output=False
+        )
 
     @patch("sys.argv", ["cortex", "install", "docker", "--dry-run"])
     @patch("cortex.cli.CortexCLI.install")
     def test_main_install_with_dry_run(self, mock_install):
         mock_install.return_value = 0
         result = main()
         self.assertEqual(result, 0)
-        mock_install.assert_called_once_with("docker", execute=False, dry_run=True, parallel=False)
+        mock_install.assert_called_once_with(
+            "docker", execute=False, dry_run=True, parallel=False, json_output=False
+        )
 
     def test_spinner_animation(self):
         initial_idx = self.cli.spinner_idx
diff --git a/tests/test_cli_extended.py b/tests/test_cli_extended.py
index 263343079..af2f8826d 100644
--- a/tests/test_cli_extended.py
+++ b/tests/test_cli_extended.py
@@ -14,20 +14,12 @@
 sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
 
 from cortex.cli import CortexCLI, main
+from tests.cli_test_base import CLITestBase
 
 
-class TestCortexCLIExtended(unittest.TestCase):
+class TestCortexCLIExtended(CLITestBase):
     """Extended unit tests covering CLI behaviours with thorough mocking."""
 
-    def setUp(self) -> None:
-        self.cli = CortexCLI()
-        # Use a temp dir for cache isolation
-        self._temp_dir = tempfile.TemporaryDirectory()
-        self._temp_home = Path(self._temp_dir.name)
-
-    def tearDown(self):
-        self._temp_dir.cleanup()
-
     def test_get_api_key_openai(self) -> None:
         with patch.dict(os.environ, {"OPENAI_API_KEY": "sk-test-key"}, clear=True):
             with patch("pathlib.Path.home", return_value=self._temp_home):
@@ -107,18 +99,19 @@ def test_install_no_api_key(self, _mock_get_api_key) -> None:
     @patch.object(CortexCLI, "_get_api_key", return_value="sk-test-key")
     @patch.object(CortexCLI, "_animate_spinner", return_value=None)
     @patch.object(CortexCLI, "_clear_line", return_value=None)
+    @patch("cortex.cli.PredictiveErrorManager")
     @patch("cortex.cli.CommandInterpreter")
     def test_install_dry_run(
         self,
         mock_interpreter_class,
+        mock_predictive_class,
         _mock_clear_line,
         _mock_spinner,
         _mock_get_api_key,
         _mock_get_provider,
     ) -> None:
-        mock_interpreter = Mock()
-        mock_interpreter.parse.return_value = ["apt update", "apt install docker"]
-        mock_interpreter_class.return_value = mock_interpreter
+        mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class)
+        self._setup_predictive_mock(mock_predictive_class)
 
         result = self.cli.install("docker", dry_run=True)
 
@@ -129,18 +122,19 @@
     @patch.object(CortexCLI, "_get_api_key", return_value="sk-test-key")
     @patch.object(CortexCLI, "_animate_spinner", return_value=None)
     @patch.object(CortexCLI, "_clear_line", return_value=None)
+    @patch("cortex.cli.PredictiveErrorManager")
     @patch("cortex.cli.CommandInterpreter")
     def test_install_no_execute(
         self,
         mock_interpreter_class,
+        mock_predictive_class,
         _mock_clear_line,
         _mock_spinner,
         _mock_get_api_key,
         _mock_get_provider,
     ) -> None:
-        mock_interpreter = Mock()
-        mock_interpreter.parse.return_value = ["apt update", "apt install docker"]
-        mock_interpreter_class.return_value = mock_interpreter
+        mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class)
+        self._setup_predictive_mock(mock_predictive_class)
 
         result = self.cli.install("docker", execute=False)
 
@@ -151,27 +145,22 @@
     @patch.object(CortexCLI, "_get_api_key", return_value="sk-test-key")
     @patch.object(CortexCLI, "_animate_spinner", return_value=None)
     @patch.object(CortexCLI, "_clear_line", return_value=None)
+    @patch("cortex.cli.PredictiveErrorManager")
     @patch("cortex.cli.CommandInterpreter")
     @patch("cortex.cli.InstallationCoordinator")
     def test_install_with_execute_success(
         self,
         mock_coordinator_class,
         mock_interpreter_class,
+        mock_predictive_class,
         _mock_clear_line,
         _mock_spinner,
         _mock_get_api_key,
         _mock_get_provider,
     ) -> None:
-        mock_interpreter = Mock()
-        mock_interpreter.parse.return_value = ["echo test"]
-        mock_interpreter_class.return_value = mock_interpreter
-
-        mock_coordinator = Mock()
-        mock_result = Mock()
-        mock_result.success = True
-        mock_result.total_duration = 1.5
-        mock_coordinator.execute.return_value = mock_result
-        mock_coordinator_class.return_value = mock_coordinator
+        self._setup_interpreter_mock(mock_interpreter_class, commands=["echo test"])
+        self._setup_predictive_mock(mock_predictive_class)
+        mock_coordinator = self._setup_coordinator_mock(mock_coordinator_class, success=True)
 
         result = self.cli.install("docker", execute=True)
 
@@ -182,28 +171,24 @@
     @patch.object(CortexCLI, "_get_api_key", return_value="sk-test-key")
     @patch.object(CortexCLI, "_animate_spinner", return_value=None)
     @patch.object(CortexCLI, "_clear_line", return_value=None)
+    @patch("cortex.cli.PredictiveErrorManager")
     @patch("cortex.cli.CommandInterpreter")
     @patch("cortex.cli.InstallationCoordinator")
     def test_install_with_execute_failure(
         self,
         mock_coordinator_class,
         mock_interpreter_class,
+        mock_predictive_class,
         _mock_clear_line,
         _mock_spinner,
         _mock_get_api_key,
         _mock_get_provider,
     ) -> None:
-        mock_interpreter = Mock()
-        mock_interpreter.parse.return_value = ["invalid command"]
-        mock_interpreter_class.return_value = mock_interpreter
-
-        mock_coordinator = Mock()
-        mock_result = Mock()
-        mock_result.success = False
-        mock_result.failed_step = 0
-        mock_result.error_message = "command not found"
-        mock_coordinator.execute.return_value = mock_result
-        mock_coordinator_class.return_value = mock_coordinator
+        self._setup_interpreter_mock(mock_interpreter_class, commands=["invalid command"])
+        self._setup_predictive_mock(mock_predictive_class)
+        self._setup_coordinator_mock(
+            mock_coordinator_class, success=False, error_message="command not found"
+        )
 
         result = self.cli.install("docker", execute=True)
 
@@ -222,9 +207,7 @@ def test_install_no_commands_generated(
         _mock_get_api_key,
         _mock_get_provider,
     ) -> None:
-        mock_interpreter = Mock()
-        mock_interpreter.parse.return_value = []
-        mock_interpreter_class.return_value = mock_interpreter
+        self._setup_interpreter_mock(mock_interpreter_class, commands=[])
 
         result = self.cli.install("docker")
 
@@ -243,9 +226,8 @@ def test_install_value_error(
         _mock_get_api_key,
         _mock_get_provider,
     ) -> None:
-        mock_interpreter = Mock()
+        mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class)
         mock_interpreter.parse.side_effect = ValueError("Invalid input")
-        mock_interpreter_class.return_value = mock_interpreter
 
         result = self.cli.install("docker")
 
@@ -264,9 +246,8 @@ def test_install_runtime_error(
         _mock_get_api_key,
         _mock_get_provider,
    ) -> None:
-        mock_interpreter = Mock()
+        mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class)
         mock_interpreter.parse.side_effect = RuntimeError("API failed")
-        mock_interpreter_class.return_value = mock_interpreter
 
         result = self.cli.install("docker")
 
@@ -285,9 +266,8 @@ def test_install_unexpected_error(
         _mock_get_api_key,
         _mock_get_provider,
     ) -> None:
-        mock_interpreter = Mock()
+        mock_interpreter = self._setup_interpreter_mock(mock_interpreter_class)
         mock_interpreter.parse.side_effect = Exception("Unexpected")
-        mock_interpreter_class.return_value = mock_interpreter
 
         result = self.cli.install("docker")
 
@@ -304,7 +284,9 @@ def test_main_install_command(self, mock_install) -> None:
         mock_install.return_value = 0
         result = main()
         self.assertEqual(result, 0)
-        mock_install.assert_called_once_with("docker", execute=False, dry_run=False, parallel=False)
+        mock_install.assert_called_once_with(
+            "docker", execute=False, dry_run=False, parallel=False, json_output=False
+        )
"docker", "--execute"]) @patch("cortex.cli.CortexCLI.install") @@ -312,7 +294,9 @@ def test_main_install_with_execute(self, mock_install) -> None: mock_install.return_value = 0 result = main() self.assertEqual(result, 0) - mock_install.assert_called_once_with("docker", execute=True, dry_run=False, parallel=False) + mock_install.assert_called_once_with( + "docker", execute=True, dry_run=False, parallel=False, json_output=False + ) @patch("sys.argv", ["cortex", "install", "docker", "--dry-run"]) @patch("cortex.cli.CortexCLI.install") @@ -320,7 +304,9 @@ def test_main_install_with_dry_run(self, mock_install) -> None: mock_install.return_value = 0 result = main() self.assertEqual(result, 0) - mock_install.assert_called_once_with("docker", execute=False, dry_run=True, parallel=False) + mock_install.assert_called_once_with( + "docker", execute=False, dry_run=True, parallel=False, json_output=False + ) def test_spinner_animation(self) -> None: initial_idx = self.cli.spinner_idx diff --git a/tests/test_llm_router.py b/tests/test_llm_router.py index 31f2c0eb0..799d1940f 100644 --- a/tests/test_llm_router.py +++ b/tests/test_llm_router.py @@ -114,8 +114,29 @@ def test_fallback_to_claude_when_kimi_unavailable(self): decision = router.route_task(TaskType.SYSTEM_OPERATION) self.assertEqual(decision.provider, LLMProvider.CLAUDE) + @patch("cortex.llm_router.OpenAI") + @patch("cortex.llm_router.AsyncOpenAI") + @patch.dict(os.environ, {}, clear=True) + def test_fallback_to_ollama(self, mock_async_openai, mock_openai): + """Should fallback to Ollama if primary providers unavailable.""" + router = LLMRouter( + claude_api_key=None, + kimi_api_key=None, + ollama_base_url="http://localhost:11434", + enable_fallback=True, + ) + + # User chat normally Claude -> fallback to Ollama + decision = router.route_task(TaskType.USER_CHAT) + self.assertEqual(decision.provider, LLMProvider.OLLAMA) + + # System ops normally Kimi -> fallback to Ollama + decision = router.route_task(TaskType.SYSTEM_OPERATION) + self.assertEqual(decision.provider, LLMProvider.OLLAMA) + + @patch("cortex.llm_router.OpenAI", side_effect=ImportError) @patch.dict(os.environ, {}, clear=True) - def test_error_when_no_providers_available(self): + def test_error_when_no_providers_available(self, mock_openai): """Should raise error if no providers configured.""" router = LLMRouter(claude_api_key=None, kimi_api_key=None, enable_fallback=True) diff --git a/tests/unit/test_predictive_prevention.py b/tests/unit/test_predictive_prevention.py new file mode 100755 index 000000000..d07acf826 --- /dev/null +++ b/tests/unit/test_predictive_prevention.py @@ -0,0 +1,196 @@ +import json +import unittest +from unittest.mock import MagicMock, patch + +from cortex.hardware_detection import CPUInfo, MemoryInfo, StorageInfo, SystemInfo +from cortex.installation_history import InstallationRecord, InstallationStatus, InstallationType +from cortex.predictive_prevention import PredictiveErrorManager, RiskLevel + + +@patch("cortex.hardware_detection.HardwareDetector.detect") +@patch("cortex.installation_history.InstallationHistory.get_history") +@patch("cortex.llm_router.LLMRouter.complete") +class TestPredictiveErrorManager(unittest.TestCase): + def setUp(self) -> None: + # Use 'fake' provider by default to ensure no real network calls + self.manager = PredictiveErrorManager(api_key="fake-key", provider="fake") + + def _get_mock_system( + self, kernel: str = "6.0.0", ram_mb: int = 16384, disk_gb: float = 50.0 + ) -> SystemInfo: + """Helper to create a SystemInfo mock 
object.""" + return SystemInfo( + kernel_version=kernel, + memory=MemoryInfo(total_mb=ram_mb), + storage=[StorageInfo(mount_point="/", available_gb=disk_gb, total_gb=100.0)], + ) + + def _setup_mocks( + self, + mock_llm: MagicMock, + mock_history: MagicMock, + mock_detect: MagicMock, + system: SystemInfo | None = None, + history: list[InstallationRecord] | None = None, + llm_content: str | None = None, + ) -> MagicMock: + """Setup common mocks with default or specified values.""" + mock_detect.return_value = system or self._get_mock_system() + mock_history.return_value = history if history is not None else [] + + if llm_content is None: + llm_content = '{"risk_level": "none", "reasons": [], "recommendations": [], "predicted_errors": []}' + + mock_llm_response = MagicMock() + mock_llm_response.content = llm_content + mock_llm.return_value = mock_llm_response + return mock_llm + + def test_analyze_installation_high_risk(self, mock_llm, mock_history, mock_detect): + # Temporarily enable LLM for this test + self.manager.provider = "ollama" + + # Setup mock system info + # Note: disk_gb=5.0 so it's not CRITICAL from static check initially + system = self._get_mock_system(kernel="4.15.0-generic", ram_mb=2048, disk_gb=5.0) + llm_content = '{"risk_level": "high", "reasons": ["LLM Reason"], "recommendations": ["LLM Rec"], "predicted_errors": ["LLM Error"]}' + self._setup_mocks( + mock_llm, mock_history, mock_detect, system=system, llm_content=llm_content + ) + + prediction = self.manager.analyze_installation("cuda-12.0", ["sudo apt install cuda-12.0"]) + + # Risk should be HIGH because LLM returned high and static check (kernel < 5.4) is HIGH + self.assertEqual(prediction.risk_level, RiskLevel.HIGH) + self.assertTrue(any("LLM Reason" in r for r in prediction.reasons)) + self.assertTrue(any("Kernel version" in r for r in prediction.reasons)) + + def test_static_compatibility_check(self, mock_llm, mock_history, mock_detect): + # Mock LLM to return neutral result so only static checks apply + system = self._get_mock_system(disk_gb=0.5) + self._setup_mocks(mock_llm, mock_history, mock_detect, system=system) + + prediction = self.manager.analyze_installation("nginx", ["sudo apt install nginx"]) + + self.assertEqual(prediction.risk_level, RiskLevel.CRITICAL) + self.assertTrue(any("disk space" in r.lower() for r in prediction.reasons)) + + def test_cuda_newer_kernel_risk(self, mock_llm, mock_history, mock_detect): + """Test the specific risk warning for CUDA on newer kernels.""" + system = self._get_mock_system(kernel="6.5.0", disk_gb=50.0) + self._setup_mocks(mock_llm, mock_history, mock_detect, system=system) + + prediction = self.manager.analyze_installation("cuda-12-4", ["apt install cuda"]) + + self.assertEqual(prediction.risk_level, RiskLevel.LOW) + self.assertTrue(any("driver-kernel mismatch" in r.lower() for r in prediction.reasons)) + self.assertTrue(any("perfectly synchronized" in r.lower() for r in prediction.reasons)) + + def test_history_pattern_failure(self, mock_llm, mock_history, mock_detect): + # Setup history with failure + match_record = InstallationRecord( + id="1", + timestamp="now", + operation_type=InstallationType.INSTALL, + packages=["docker.io"], + status=InstallationStatus.FAILED, + before_snapshot=[], + after_snapshot=[], + commands_executed=[], + error_message="Connection timeout", + ) + self._setup_mocks(mock_llm, mock_history, mock_detect, history=[match_record]) + + prediction = self.manager.analyze_installation("docker", ["sudo apt install docker.io"]) + + # 
+
+    def test_llm_malformed_json_fallback(self, mock_llm, mock_history, mock_detect):
+        # Enable the LLM for this test
+        self.manager.provider = "ollama"
+
+        # Mock the LLM with non-JSON content starting with "Risk:" to trigger the fallback
+        self._setup_mocks(
+            mock_llm, mock_history, mock_detect, llm_content="Risk: Malformed text response"
+        )
+
+        prediction = self.manager.analyze_installation("nginx", ["apt install nginx"])
+        self.assertTrue(any("LLM detected risks" in r for r in prediction.reasons))
+
+    def test_critical_risk_finalization(self, mock_llm, mock_history, mock_detect):
+        self._setup_mocks(mock_llm, mock_history, mock_detect)
+
+        prediction = self.manager.analyze_installation("test", ["test"])
+        # Finalization should escalate to CRITICAL based on the keyword
+        prediction.reasons.append("This is a CRITICAL deficiency")
+        self.manager._finalize_risk_level(prediction)
+        self.assertEqual(prediction.risk_level, RiskLevel.CRITICAL)
+
+    def test_redact_commands(self, mock_llm, mock_history, mock_detect):
+        commands = [
+            "login --password my-secret-pass",
+            "curl -H 'X-Auth-Token: redacted-token'",
+            "export API_KEY=12345-abcde",
+            "docker login --token=98765",
+        ]
+        redacted = self.manager.redact_commands(commands)
+
+        self.assertIn("--password ", redacted[0])
+        self.assertIn("API_KEY=", redacted[2])
+        self.assertIn("--token=", redacted[3])
+        # Note: Header tokens aren't redacted yet; keep values non-secret-like
+        # to avoid triggering secret scanners.
+
+    def test_ubuntu_kernel_parsing(self, mock_llm, mock_history, mock_detect):
+        """Test parsing of typical Ubuntu kernel version strings."""
+        # Set up a mock system with an Ubuntu-style kernel string
+        system = self._get_mock_system(kernel="6.5.0-generic-ubuntu", disk_gb=50.0)
+        self._setup_mocks(mock_llm, mock_history, mock_detect, system=system)
+
+        prediction = self.manager.analyze_installation("cuda", ["apt install cuda"])
+
+        # Should be recognized as a modern kernel (>= 5.4) -> LOW risk warning about sync
+        # Note: analyze_installation calls _check_static_compatibility internally
+        self.assertEqual(prediction.risk_level, RiskLevel.LOW)
+        self.assertTrue(any("driver-kernel mismatch" in r.lower() for r in prediction.reasons))
+
+    def test_unknown_kernel_parsing(self, mock_llm, mock_history, mock_detect):
+        """Test parsing of an 'unknown' kernel version string."""
+        system = self._get_mock_system(kernel="unknown", disk_gb=50.0)
+        self._setup_mocks(mock_llm, mock_history, mock_detect, system=system)
+
+        prediction = self.manager.analyze_installation("cuda", ["apt install cuda"])
+
+        # Should be ignored gracefully rather than failing. The risk level may remain
+        # NONE or LOW depending on other checks, but it must not reach HIGH or
+        # CRITICAL purely because of kernel parsing.
+        self.assertNotEqual(prediction.risk_level, RiskLevel.CRITICAL)
+        self.assertNotEqual(prediction.risk_level, RiskLevel.HIGH)
+
+    def test_malformed_kernel_parsing(self, mock_llm, mock_history, mock_detect):
+        """Test parsing of malformed kernel version strings."""
+        edge_cases = ["", "None", "kernel-6.5", "v6.5"]
+
+        for k in edge_cases:
+            system = self._get_mock_system(kernel=k, disk_gb=50.0)
+            self._setup_mocks(mock_llm, mock_history, mock_detect, system=system)
+
+            prediction = self.manager.analyze_installation("cuda", ["apt install cuda"])
+            # Should not crash
+            self.assertIsNotNone(prediction)
+
+    def test_very_old_ubuntu_kernel(self, mock_llm, mock_history, mock_detect):
+        """Test parsing of old Ubuntu kernels."""
+        system = self._get_mock_system(kernel="4.15.0-101-generic", disk_gb=50.0)
+        self._setup_mocks(mock_llm, mock_history, mock_detect, system=system)
+
+        prediction = self.manager.analyze_installation("cuda", ["apt install cuda"])
+
+        # Should be HIGH risk (major < 5)
+        self.assertEqual(prediction.risk_level, RiskLevel.HIGH)
+        self.assertTrue(any("too old" in r.lower() for r in prediction.reasons))
+
+
+if __name__ == "__main__":
+    unittest.main()
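The kernel-parsing tests above exercise strings like `4.15.0-101-generic`, `6.5.0-generic-ubuntu`, `unknown`, `v6.5`, and the empty string. A hypothetical helper consistent with that tested behaviour; the real parsing inside `PredictiveErrorManager` is not shown in this patch and may differ:

```python
import re


def parse_kernel_version(kernel: str) -> tuple[int, int] | None:
    """Extract (major, minor) from strings like '6.5.0-generic-ubuntu'.

    Returns None for unparseable values ('', 'None', 'unknown', ...), which
    callers treat as "skip the kernel check" rather than raising the risk level.
    """
    match = re.search(r"(\d+)\.(\d+)", kernel or "")
    if not match:
        return None
    return int(match.group(1)), int(match.group(2))


assert parse_kernel_version("4.15.0-101-generic") == (4, 15)   # old -> HIGH risk path
assert parse_kernel_version("6.5.0-generic-ubuntu") == (6, 5)  # modern -> LOW risk path
assert parse_kernel_version("unknown") is None                 # ignored gracefully
```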