diff --git a/ai/ai_simulations.py b/ai/ai_simulations.py index eec3740..c2cdacb 100644 --- a/ai/ai_simulations.py +++ b/ai/ai_simulations.py @@ -1,4 +1,3 @@ - import random class OffensiveSimulation: @@ -12,8 +11,11 @@ def __init__(self): ] def simulate_attack(self): - scenario = random.choice(self.scenarios) - print(f"[SIMULATION] Executing simulated attack: {scenario}") + try: + scenario = random.choice(self.scenarios) + print(f"[SIMULATION] Executing simulated attack: {scenario}") + except Exception as e: + print(f"Error during simulation: {e}") if __name__ == "__main__": simulation = OffensiveSimulation() diff --git a/ansi-styles/index.js b/ansi-styles/index.js index 5d82581..9566b1c 100644 --- a/ansi-styles/index.js +++ b/ansi-styles/index.js @@ -161,3 +161,28 @@ Object.defineProperty(module, 'exports', { enumerable: true, get: assembleStyles }); + +// Unit tests for ANSI escape code functions +const assert = require('assert'); + +const styles = assembleStyles(); + +// Test color functions +assert.strictEqual(styles.color.red.open, '\u001B[31m'); +assert.strictEqual(styles.color.red.close, '\u001B[39m'); +assert.strictEqual(styles.color.green.open, '\u001B[32m'); +assert.strictEqual(styles.color.green.close, '\u001B[39m'); + +// Test background color functions +assert.strictEqual(styles.bgColor.bgRed.open, '\u001B[41m'); +assert.strictEqual(styles.bgColor.bgRed.close, '\u001B[49m'); +assert.strictEqual(styles.bgColor.bgGreen.open, '\u001B[42m'); +assert.strictEqual(styles.bgColor.bgGreen.close, '\u001B[49m'); + +// Test modifier functions +assert.strictEqual(styles.modifier.bold.open, '\u001B[1m'); +assert.strictEqual(styles.modifier.bold.close, '\u001B[22m'); +assert.strictEqual(styles.modifier.italic.open, '\u001B[3m'); +assert.strictEqual(styles.modifier.italic.close, '\u001B[23m'); + +console.log('All tests passed!'); diff --git a/app.py b/app.py index c685a38..a0b137f 100644 --- a/app.py +++ b/app.py @@ -255,9 +255,12 @@ async def process_inputs(class_names: List[str], image_url: str): # Add real-time threat data analysis using the ThreatIntelligence module async def analyze_threat_data(): - threat_data = await advanced_threat_intelligence.get_threat_intelligence() - analyzed_data = advanced_threat_intelligence.process_data(threat_data) - return analyzed_data + try: + threat_data = await advanced_threat_intelligence.get_threat_intelligence() + analyzed_data = advanced_threat_intelligence.process_data(threat_data) + return analyzed_data + except Exception as e: + logging.error(f"Error analyzing threat data: {e}") # Update the RealTimeThreatIntelligence initialization to include the ThreatIntelligence module threat_intelligence_module = RealTimeThreatIntelligence(api_key="YOUR_API_KEY") @@ -265,10 +268,13 @@ async def analyze_threat_data(): # Add real-time threat data monitoring using the ThreatIntelligence module async def monitor_threat_data(): - threat_data = await advanced_threat_intelligence.get_threat_intelligence() - for threat in threat_data: - if threat["severity"] > 0.8: - monitoring.trigger_alert(threat) + try: + threat_data = await advanced_threat_intelligence.get_threat_intelligence() + for threat in threat_data: + if threat["severity"] > 0.8: + monitoring.trigger_alert(threat) + except Exception as e: + logging.error(f"Error monitoring threat data: {e}") # Integrate the AutomatedIncidentResponse module with RealTimeMonitoring monitoring.automated_incident_response = automated_incident_response diff --git a/app_security/app_vulnerability_scanner.py b/app_security/app_vulnerability_scanner.py index 11e68c7..ae2ad93 100644 --- a/app_security/app_vulnerability_scanner.py +++ b/app_security/app_vulnerability_scanner.py @@ -9,44 +9,47 @@ def scan_application(app_url): print(f"Scanning application for vulnerabilities: {app_url}") - session = SessionLocal() try: - response = requests.get(app_url) - response.raise_for_status() - vulnerabilities = {"vulnerabilities_found": 2, "critical_issues": ["SQL Injection", "XSS"]} - - # Save scan results to the database - scan_result = DocumentAnalysis( - source=app_url, - title="Vulnerability Scan", - links=str(vulnerabilities["critical_issues"]), - error=None - ) - session.add(scan_result) - session.commit() - return vulnerabilities - except requests.exceptions.HTTPError as http_err: - print(f"HTTP error occurred: {http_err}") - scan_result = DocumentAnalysis( - source=app_url, - title="Vulnerability Scan", - links=None, - error=str(http_err) - ) - session.add(scan_result) - session.commit() - except Exception as err: - print(f"Other error occurred: {err}") - scan_result = DocumentAnalysis( - source=app_url, - title="Vulnerability Scan", - links=None, - error=str(err) - ) - session.add(scan_result) - session.commit() - finally: - session.close() + session = SessionLocal() + try: + response = requests.get(app_url) + response.raise_for_status() + vulnerabilities = {"vulnerabilities_found": 2, "critical_issues": ["SQL Injection", "XSS"]} + + # Save scan results to the database + scan_result = DocumentAnalysis( + source=app_url, + title="Vulnerability Scan", + links=str(vulnerabilities["critical_issues"]), + error=None + ) + session.add(scan_result) + session.commit() + return vulnerabilities + except requests.exceptions.HTTPError as http_err: + print(f"HTTP error occurred: {http_err}") + scan_result = DocumentAnalysis( + source=app_url, + title="Vulnerability Scan", + links=None, + error=str(http_err) + ) + session.add(scan_result) + session.commit() + except Exception as err: + print(f"Other error occurred: {err}") + scan_result = DocumentAnalysis( + source=app_url, + title="Vulnerability Scan", + links=None, + error=str(err) + ) + session.add(scan_result) + session.commit() + finally: + session.close() + except Exception as db_err: + print(f"Database connection error: {db_err}") return {"vulnerabilities_found": 0, "critical_issues": []} if __name__ == "__main__": diff --git a/archive/archive_analyzer.py b/archive/archive_analyzer.py index abc1304..fcebe0a 100644 --- a/archive/archive_analyzer.py +++ b/archive/archive_analyzer.py @@ -3,6 +3,9 @@ from archive.archive_parser import parse_sources def analyze_sources(): - sources = parse_sources() - # Perform analysis on sources - print(sources) \ No newline at end of file + try: + sources = parse_sources() + # Perform analysis on sources + print(sources) + except Exception as e: + print(f"Error during source analysis: {e}") diff --git a/archive/archive_parser.py b/archive/archive_parser.py index addd537..780a82d 100644 --- a/archive/archive_parser.py +++ b/archive/archive_parser.py @@ -2,7 +2,11 @@ import sys def parse_sources(): - # Parse sources from archive - sources = [] - # ... - return sources \ No newline at end of file + try: + # Parse sources from archive + sources = [] + # ... + return sources + except Exception as e: + print(f"Error during source parsing: {e}") + return [] diff --git a/atp/atp_integration.py b/atp/atp_integration.py index b17f759..30f7651 100644 --- a/atp/atp_integration.py +++ b/atp/atp_integration.py @@ -1,7 +1,10 @@ - def atp_threat_mitigation(threat_id): - print(f"Mitigating threat: {threat_id}") - return {"threat_id": threat_id, "status": "Mitigated"} + try: + print(f"Mitigating threat: {threat_id}") + return {"threat_id": threat_id, "status": "Mitigated"} + except Exception as e: + print(f"Error during threat mitigation: {e}") + return {"threat_id": threat_id, "status": "Error"} if __name__ == "__main__": result = atp_threat_mitigation("THREAT-12345") diff --git a/backend/ai_chat.py b/backend/ai_chat.py index 5236e1c..29f67b3 100644 --- a/backend/ai_chat.py +++ b/backend/ai_chat.py @@ -1,4 +1,3 @@ - import openai import requests @@ -9,21 +8,33 @@ def __init__(self, openai_key, huggingface_key, anthropic_key): self.anthropic_key = anthropic_key def openai_chat(self, prompt): - openai.api_key = self.openai_key - response = openai.Completion.create(engine="text-davinci-003", prompt=prompt, max_tokens=100) - return response.choices[0].text.strip() + try: + openai.api_key = self.openai_key + response = openai.Completion.create(engine="text-davinci-003", prompt=prompt, max_tokens=100) + return response.choices[0].text.strip() + except Exception as e: + print(f"Error during OpenAI chat: {e}") + return "" def huggingface_chat(self, prompt): - url = "https://api-inference.huggingface.co/models/facebook/blenderbot-400M-distill" - headers = {"Authorization": f"Bearer {self.huggingface_key}"} - response = requests.post(url, json={"inputs": prompt}, headers=headers) - return response.json().get("generated_text", "") + try: + url = "https://api-inference.huggingface.co/models/facebook/blenderbot-400M-distill" + headers = {"Authorization": f"Bearer {self.huggingface_key}"} + response = requests.post(url, json={"inputs": prompt}, headers=headers) + return response.json().get("generated_text", "") + except Exception as e: + print(f"Error during HuggingFace chat: {e}") + return "" def anthropic_chat(self, prompt): - url = "https://api.anthropic.com/v1/completion" - headers = {"Authorization": f"Bearer {self.anthropic_key}"} - response = requests.post(url, json={"prompt": prompt, "model": "claude-v1"}) - return response.json().get("output", "") + try: + url = "https://api.anthropic.com/v1/completion" + headers = {"Authorization": f"Bearer {self.anthropic_key}"} + response = requests.post(url, json={"prompt": prompt, "model": "claude-v1"}) + return response.json().get("output", "") + except Exception as e: + print(f"Error during Anthropic chat: {e}") + return "" if __name__ == "__main__": chat = MultiAIChat("openai_key", "huggingface_key", "anthropic_key") diff --git a/backend/code_parser.py b/backend/code_parser.py index 0693fbc..4556e20 100644 --- a/backend/code_parser.py +++ b/backend/code_parser.py @@ -9,6 +9,8 @@ class CodeParser: def __init__(self, code): + if not code.strip(): + raise ValueError("Input code cannot be empty") self.tree = ast.parse(code) def find_functions(self): diff --git a/c2_dashboard.py b/c2_dashboard.py index 952187c..23663ff 100644 --- a/c2_dashboard.py +++ b/c2_dashboard.py @@ -112,5 +112,8 @@ def save_dashboard_to_db(self, source, title, links, error): if __name__ == "__main__": dashboard = C2Dashboard() - dashboard.save_dashboard_to_db("c2_dashboard.py", "C2 Dashboard", "[]", None) - print("Dashboard saved to database.") + try: + dashboard.save_dashboard_to_db("c2_dashboard.py", "C2 Dashboard", "[]", None) + print("Dashboard saved to database.") + except Exception as e: + print(f"Error during dashboard operation: {e}") diff --git a/chatbot/app.py b/chatbot/app.py index f5f6a5e..9ee764a 100644 --- a/chatbot/app.py +++ b/chatbot/app.py @@ -40,15 +40,23 @@ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def scan_network(): - # Placeholder function for scanning network - devices = ["Device1", "Device2", "Device3"] - return devices + try: + # Placeholder function for scanning network + devices = ["Device1", "Device2", "Device3"] + return devices + except Exception as e: + print(f"Error during network scanning: {e}") + return [] def deploy_exploit(target): - # Placeholder function for deploying exploit - if target in ["Device1", "Device2", "Device3"]: - return "Exploit deployed successfully!" - return "Exploit deployment failed." + try: + # Placeholder function for deploying exploit + if target in ["Device1", "Device2", "Device3"]: + return "Exploit deployed successfully!" + return "Exploit deployment failed." + except Exception as e: + print(f"Error during exploit deployment: {e}") + return "Exploit deployment failed." def save_scan_results_to_db(source, title, links, error): session = SessionLocal() diff --git a/chatbot/chatbot.py b/chatbot/chatbot.py index f548751..0997d9f 100644 --- a/chatbot/chatbot.py +++ b/chatbot/chatbot.py @@ -56,48 +56,56 @@ def get_response(user_input): def handle_vulnerability_scanning(): """Handle network scanning and vulnerability assessment.""" - devices = scan_network() - vulnerabilities = assess_vulnerabilities(devices) - - # Save scan results to the database - session = SessionLocal() try: - scan_result = DocumentAnalysis( - source="network_scan", - title="Network Scan Results", - links=str(vulnerabilities), - error=None - ) - session.add(scan_result) - session.commit() + devices = scan_network() + vulnerabilities = assess_vulnerabilities(devices) + + # Save scan results to the database + session = SessionLocal() + try: + scan_result = DocumentAnalysis( + source="network_scan", + title="Network Scan Results", + links=str(vulnerabilities), + error=None + ) + session.add(scan_result) + session.commit() + except Exception as e: + print(f"Error saving scan results to database: {e}") + finally: + session.close() + + return vulnerabilities except Exception as e: - print(f"Error saving scan results to database: {e}") - finally: - session.close() - - return vulnerabilities + print(f"Error during vulnerability scanning: {e}") + return [] def handle_exploit_deployment(target): """Handle the deployment of exploits.""" - result = deploy_exploit(target) - - # Save exploit deployment results to the database - session = SessionLocal() try: - exploit_result = DocumentAnalysis( - source="exploit_deployment", - title="Exploit Deployment Results", - links=target, - error=None if result else "Exploit deployment failed" - ) - session.add(exploit_result) - session.commit() + result = deploy_exploit(target) + + # Save exploit deployment results to the database + session = SessionLocal() + try: + exploit_result = DocumentAnalysis( + source="exploit_deployment", + title="Exploit Deployment Results", + links=target, + error=None if result else "Exploit deployment failed" + ) + session.add(exploit_result) + session.commit() + except Exception as e: + print(f"Error saving exploit deployment results to database: {e}") + finally: + session.close() + + return "Exploit deployed successfully!" if result else "Exploit deployment failed." except Exception as e: - print(f"Error saving exploit deployment results to database: {e}") - finally: - session.close() - - return "Exploit deployed successfully!" if result else "Exploit deployment failed." + print(f"Error during exploit deployment: {e}") + return "Exploit deployment failed." def chat(): """Main chat function to interact with users."""