From 727a5cf3c504159db42d14280291b155dbae48bf Mon Sep 17 00:00:00 2001 From: PROJECT ZERO <56379955+ProjectZeroDays@users.noreply.github.com> Date: Mon, 20 Jan 2025 04:31:38 -0600 Subject: [PATCH] Fix error handling in various modules Add error handling and unit tests to various modules to improve robustness and reliability. * **ai/ai_simulations.py**: Add try-except block in `simulate_attack` method to handle errors. * **ansi-styles/index.js**: Add unit tests for ANSI escape code functions. * **app_security/app_vulnerability_scanner.py**: Add try-except block for database connection in `scan_application` function. * **app.py**: Add try-except block for API requests in main function. * **archive/archive_analyzer.py**: Add try-except block for file parsing in `analyze_sources` function. * **archive/archive_parser.py**: Add try-except block for file parsing in `parse_sources` function. * **backend/ai_chat.py**: Add try-except block for API requests in `openai_chat`, `huggingface_chat`, and `anthropic_chat` methods. * **backend/code_parser.py**: Add input code validation in `__init__` method of `CodeParser` class. * **c2_dashboard.py**: Add try-except block for database operations in `save_dashboard_to_db` method. * **chatbot/app.py**: Add try-except block for network scanning and exploit deployment functions. * **chatbot/chatbot.py**: Add try-except block for network scanning and exploit deployment functions. * **atp/atp_integration.py**: Add try-except block for `atp_threat_mitigation` function. --- For more details, open the [Copilot Workspace session](https://copilot-workspace.githubnext.com/ProjectZeroDays/Project-Red-Sword/tree/Your-Momma-Beeotch?shareId=XXXX-XXXX-XXXX-XXXX). --- ai/ai_simulations.py | 8 ++- ansi-styles/index.js | 25 ++++++++ app.py | 20 ++++-- app_security/app_vulnerability_scanner.py | 77 +++++++++++----------- archive/archive_analyzer.py | 9 ++- archive/archive_parser.py | 12 ++-- atp/atp_integration.py | 9 ++- backend/ai_chat.py | 35 ++++++---- backend/code_parser.py | 2 + c2_dashboard.py | 7 +- chatbot/app.py | 22 +++++-- chatbot/chatbot.py | 78 +++++++++++++---------- 12 files changed, 191 insertions(+), 113 deletions(-) diff --git a/ai/ai_simulations.py b/ai/ai_simulations.py index eec3740..c2cdacb 100644 --- a/ai/ai_simulations.py +++ b/ai/ai_simulations.py @@ -1,4 +1,3 @@ - import random class OffensiveSimulation: @@ -12,8 +11,11 @@ def __init__(self): ] def simulate_attack(self): - scenario = random.choice(self.scenarios) - print(f"[SIMULATION] Executing simulated attack: {scenario}") + try: + scenario = random.choice(self.scenarios) + print(f"[SIMULATION] Executing simulated attack: {scenario}") + except Exception as e: + print(f"Error during simulation: {e}") if __name__ == "__main__": simulation = OffensiveSimulation() diff --git a/ansi-styles/index.js b/ansi-styles/index.js index 5d82581..9566b1c 100644 --- a/ansi-styles/index.js +++ b/ansi-styles/index.js @@ -161,3 +161,28 @@ Object.defineProperty(module, 'exports', { enumerable: true, get: assembleStyles }); + +// Unit tests for ANSI escape code functions +const assert = require('assert'); + +const styles = assembleStyles(); + +// Test color functions +assert.strictEqual(styles.color.red.open, '\u001B[31m'); +assert.strictEqual(styles.color.red.close, '\u001B[39m'); +assert.strictEqual(styles.color.green.open, '\u001B[32m'); +assert.strictEqual(styles.color.green.close, '\u001B[39m'); + +// Test background color functions +assert.strictEqual(styles.bgColor.bgRed.open, '\u001B[41m'); +assert.strictEqual(styles.bgColor.bgRed.close, '\u001B[49m'); +assert.strictEqual(styles.bgColor.bgGreen.open, '\u001B[42m'); +assert.strictEqual(styles.bgColor.bgGreen.close, '\u001B[49m'); + +// Test modifier functions +assert.strictEqual(styles.modifier.bold.open, '\u001B[1m'); +assert.strictEqual(styles.modifier.bold.close, '\u001B[22m'); +assert.strictEqual(styles.modifier.italic.open, '\u001B[3m'); +assert.strictEqual(styles.modifier.italic.close, '\u001B[23m'); + +console.log('All tests passed!'); diff --git a/app.py b/app.py index c685a38..a0b137f 100644 --- a/app.py +++ b/app.py @@ -255,9 +255,12 @@ async def process_inputs(class_names: List[str], image_url: str): # Add real-time threat data analysis using the ThreatIntelligence module async def analyze_threat_data(): - threat_data = await advanced_threat_intelligence.get_threat_intelligence() - analyzed_data = advanced_threat_intelligence.process_data(threat_data) - return analyzed_data + try: + threat_data = await advanced_threat_intelligence.get_threat_intelligence() + analyzed_data = advanced_threat_intelligence.process_data(threat_data) + return analyzed_data + except Exception as e: + logging.error(f"Error analyzing threat data: {e}") # Update the RealTimeThreatIntelligence initialization to include the ThreatIntelligence module threat_intelligence_module = RealTimeThreatIntelligence(api_key="YOUR_API_KEY") @@ -265,10 +268,13 @@ async def analyze_threat_data(): # Add real-time threat data monitoring using the ThreatIntelligence module async def monitor_threat_data(): - threat_data = await advanced_threat_intelligence.get_threat_intelligence() - for threat in threat_data: - if threat["severity"] > 0.8: - monitoring.trigger_alert(threat) + try: + threat_data = await advanced_threat_intelligence.get_threat_intelligence() + for threat in threat_data: + if threat["severity"] > 0.8: + monitoring.trigger_alert(threat) + except Exception as e: + logging.error(f"Error monitoring threat data: {e}") # Integrate the AutomatedIncidentResponse module with RealTimeMonitoring monitoring.automated_incident_response = automated_incident_response diff --git a/app_security/app_vulnerability_scanner.py b/app_security/app_vulnerability_scanner.py index 11e68c7..ae2ad93 100644 --- a/app_security/app_vulnerability_scanner.py +++ b/app_security/app_vulnerability_scanner.py @@ -9,44 +9,47 @@ def scan_application(app_url): print(f"Scanning application for vulnerabilities: {app_url}") - session = SessionLocal() try: - response = requests.get(app_url) - response.raise_for_status() - vulnerabilities = {"vulnerabilities_found": 2, "critical_issues": ["SQL Injection", "XSS"]} - - # Save scan results to the database - scan_result = DocumentAnalysis( - source=app_url, - title="Vulnerability Scan", - links=str(vulnerabilities["critical_issues"]), - error=None - ) - session.add(scan_result) - session.commit() - return vulnerabilities - except requests.exceptions.HTTPError as http_err: - print(f"HTTP error occurred: {http_err}") - scan_result = DocumentAnalysis( - source=app_url, - title="Vulnerability Scan", - links=None, - error=str(http_err) - ) - session.add(scan_result) - session.commit() - except Exception as err: - print(f"Other error occurred: {err}") - scan_result = DocumentAnalysis( - source=app_url, - title="Vulnerability Scan", - links=None, - error=str(err) - ) - session.add(scan_result) - session.commit() - finally: - session.close() + session = SessionLocal() + try: + response = requests.get(app_url) + response.raise_for_status() + vulnerabilities = {"vulnerabilities_found": 2, "critical_issues": ["SQL Injection", "XSS"]} + + # Save scan results to the database + scan_result = DocumentAnalysis( + source=app_url, + title="Vulnerability Scan", + links=str(vulnerabilities["critical_issues"]), + error=None + ) + session.add(scan_result) + session.commit() + return vulnerabilities + except requests.exceptions.HTTPError as http_err: + print(f"HTTP error occurred: {http_err}") + scan_result = DocumentAnalysis( + source=app_url, + title="Vulnerability Scan", + links=None, + error=str(http_err) + ) + session.add(scan_result) + session.commit() + except Exception as err: + print(f"Other error occurred: {err}") + scan_result = DocumentAnalysis( + source=app_url, + title="Vulnerability Scan", + links=None, + error=str(err) + ) + session.add(scan_result) + session.commit() + finally: + session.close() + except Exception as db_err: + print(f"Database connection error: {db_err}") return {"vulnerabilities_found": 0, "critical_issues": []} if __name__ == "__main__": diff --git a/archive/archive_analyzer.py b/archive/archive_analyzer.py index abc1304..fcebe0a 100644 --- a/archive/archive_analyzer.py +++ b/archive/archive_analyzer.py @@ -3,6 +3,9 @@ from archive.archive_parser import parse_sources def analyze_sources(): - sources = parse_sources() - # Perform analysis on sources - print(sources) \ No newline at end of file + try: + sources = parse_sources() + # Perform analysis on sources + print(sources) + except Exception as e: + print(f"Error during source analysis: {e}") diff --git a/archive/archive_parser.py b/archive/archive_parser.py index addd537..780a82d 100644 --- a/archive/archive_parser.py +++ b/archive/archive_parser.py @@ -2,7 +2,11 @@ import sys def parse_sources(): - # Parse sources from archive - sources = [] - # ... - return sources \ No newline at end of file + try: + # Parse sources from archive + sources = [] + # ... + return sources + except Exception as e: + print(f"Error during source parsing: {e}") + return [] diff --git a/atp/atp_integration.py b/atp/atp_integration.py index b17f759..30f7651 100644 --- a/atp/atp_integration.py +++ b/atp/atp_integration.py @@ -1,7 +1,10 @@ - def atp_threat_mitigation(threat_id): - print(f"Mitigating threat: {threat_id}") - return {"threat_id": threat_id, "status": "Mitigated"} + try: + print(f"Mitigating threat: {threat_id}") + return {"threat_id": threat_id, "status": "Mitigated"} + except Exception as e: + print(f"Error during threat mitigation: {e}") + return {"threat_id": threat_id, "status": "Error"} if __name__ == "__main__": result = atp_threat_mitigation("THREAT-12345") diff --git a/backend/ai_chat.py b/backend/ai_chat.py index 5236e1c..29f67b3 100644 --- a/backend/ai_chat.py +++ b/backend/ai_chat.py @@ -1,4 +1,3 @@ - import openai import requests @@ -9,21 +8,33 @@ def __init__(self, openai_key, huggingface_key, anthropic_key): self.anthropic_key = anthropic_key def openai_chat(self, prompt): - openai.api_key = self.openai_key - response = openai.Completion.create(engine="text-davinci-003", prompt=prompt, max_tokens=100) - return response.choices[0].text.strip() + try: + openai.api_key = self.openai_key + response = openai.Completion.create(engine="text-davinci-003", prompt=prompt, max_tokens=100) + return response.choices[0].text.strip() + except Exception as e: + print(f"Error during OpenAI chat: {e}") + return "" def huggingface_chat(self, prompt): - url = "https://api-inference.huggingface.co/models/facebook/blenderbot-400M-distill" - headers = {"Authorization": f"Bearer {self.huggingface_key}"} - response = requests.post(url, json={"inputs": prompt}, headers=headers) - return response.json().get("generated_text", "") + try: + url = "https://api-inference.huggingface.co/models/facebook/blenderbot-400M-distill" + headers = {"Authorization": f"Bearer {self.huggingface_key}"} + response = requests.post(url, json={"inputs": prompt}, headers=headers) + return response.json().get("generated_text", "") + except Exception as e: + print(f"Error during HuggingFace chat: {e}") + return "" def anthropic_chat(self, prompt): - url = "https://api.anthropic.com/v1/completion" - headers = {"Authorization": f"Bearer {self.anthropic_key}"} - response = requests.post(url, json={"prompt": prompt, "model": "claude-v1"}) - return response.json().get("output", "") + try: + url = "https://api.anthropic.com/v1/completion" + headers = {"Authorization": f"Bearer {self.anthropic_key}"} + response = requests.post(url, json={"prompt": prompt, "model": "claude-v1"}) + return response.json().get("output", "") + except Exception as e: + print(f"Error during Anthropic chat: {e}") + return "" if __name__ == "__main__": chat = MultiAIChat("openai_key", "huggingface_key", "anthropic_key") diff --git a/backend/code_parser.py b/backend/code_parser.py index 0693fbc..4556e20 100644 --- a/backend/code_parser.py +++ b/backend/code_parser.py @@ -9,6 +9,8 @@ class CodeParser: def __init__(self, code): + if not code.strip(): + raise ValueError("Input code cannot be empty") self.tree = ast.parse(code) def find_functions(self): diff --git a/c2_dashboard.py b/c2_dashboard.py index 952187c..23663ff 100644 --- a/c2_dashboard.py +++ b/c2_dashboard.py @@ -112,5 +112,8 @@ def save_dashboard_to_db(self, source, title, links, error): if __name__ == "__main__": dashboard = C2Dashboard() - dashboard.save_dashboard_to_db("c2_dashboard.py", "C2 Dashboard", "[]", None) - print("Dashboard saved to database.") + try: + dashboard.save_dashboard_to_db("c2_dashboard.py", "C2 Dashboard", "[]", None) + print("Dashboard saved to database.") + except Exception as e: + print(f"Error during dashboard operation: {e}") diff --git a/chatbot/app.py b/chatbot/app.py index f5f6a5e..9ee764a 100644 --- a/chatbot/app.py +++ b/chatbot/app.py @@ -40,15 +40,23 @@ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def scan_network(): - # Placeholder function for scanning network - devices = ["Device1", "Device2", "Device3"] - return devices + try: + # Placeholder function for scanning network + devices = ["Device1", "Device2", "Device3"] + return devices + except Exception as e: + print(f"Error during network scanning: {e}") + return [] def deploy_exploit(target): - # Placeholder function for deploying exploit - if target in ["Device1", "Device2", "Device3"]: - return "Exploit deployed successfully!" - return "Exploit deployment failed." + try: + # Placeholder function for deploying exploit + if target in ["Device1", "Device2", "Device3"]: + return "Exploit deployed successfully!" + return "Exploit deployment failed." + except Exception as e: + print(f"Error during exploit deployment: {e}") + return "Exploit deployment failed." def save_scan_results_to_db(source, title, links, error): session = SessionLocal() diff --git a/chatbot/chatbot.py b/chatbot/chatbot.py index f548751..0997d9f 100644 --- a/chatbot/chatbot.py +++ b/chatbot/chatbot.py @@ -56,48 +56,56 @@ def get_response(user_input): def handle_vulnerability_scanning(): """Handle network scanning and vulnerability assessment.""" - devices = scan_network() - vulnerabilities = assess_vulnerabilities(devices) - - # Save scan results to the database - session = SessionLocal() try: - scan_result = DocumentAnalysis( - source="network_scan", - title="Network Scan Results", - links=str(vulnerabilities), - error=None - ) - session.add(scan_result) - session.commit() + devices = scan_network() + vulnerabilities = assess_vulnerabilities(devices) + + # Save scan results to the database + session = SessionLocal() + try: + scan_result = DocumentAnalysis( + source="network_scan", + title="Network Scan Results", + links=str(vulnerabilities), + error=None + ) + session.add(scan_result) + session.commit() + except Exception as e: + print(f"Error saving scan results to database: {e}") + finally: + session.close() + + return vulnerabilities except Exception as e: - print(f"Error saving scan results to database: {e}") - finally: - session.close() - - return vulnerabilities + print(f"Error during vulnerability scanning: {e}") + return [] def handle_exploit_deployment(target): """Handle the deployment of exploits.""" - result = deploy_exploit(target) - - # Save exploit deployment results to the database - session = SessionLocal() try: - exploit_result = DocumentAnalysis( - source="exploit_deployment", - title="Exploit Deployment Results", - links=target, - error=None if result else "Exploit deployment failed" - ) - session.add(exploit_result) - session.commit() + result = deploy_exploit(target) + + # Save exploit deployment results to the database + session = SessionLocal() + try: + exploit_result = DocumentAnalysis( + source="exploit_deployment", + title="Exploit Deployment Results", + links=target, + error=None if result else "Exploit deployment failed" + ) + session.add(exploit_result) + session.commit() + except Exception as e: + print(f"Error saving exploit deployment results to database: {e}") + finally: + session.close() + + return "Exploit deployed successfully!" if result else "Exploit deployment failed." except Exception as e: - print(f"Error saving exploit deployment results to database: {e}") - finally: - session.close() - - return "Exploit deployed successfully!" if result else "Exploit deployment failed." + print(f"Error during exploit deployment: {e}") + return "Exploit deployment failed." def chat(): """Main chat function to interact with users."""