diff --git a/enrich_code_scanning_alerts.py b/enrich_code_scanning_alerts.py
index 036da90..38f7735 100755
--- a/enrich_code_scanning_alerts.py
+++ b/enrich_code_scanning_alerts.py
@@ -129,6 +129,8 @@ def enrich_alerts(alerts: list, metadata: dict) -> None:
 
 def format_header(key: str) -> str:
     """Format the heading depending on its value."""
+    output = ""
+
     if key not in ["cwe", "language"]:
         output = PUNCTUATION_RE.sub(" ", key).title()
     elif key == "cwe":
@@ -668,6 +670,8 @@ def main() -> None:
     enrich_alerts(alerts, metadata)
     fixup_alerts(alerts)
 
+    fields = []
+
     if args.format in ["html", "pdf"]:
         fields = (
             [
diff --git a/estimate_push_protection_rate.py b/estimate_push_protection_rate.py
new file mode 100755
index 0000000..e37b8f3
--- /dev/null
+++ b/estimate_push_protection_rate.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python3
+
+"""Estimate how many existing secret detections would have been blocked by push protection, given a list of detections and a list of which patterns have push protection now."""
+
+import argparse
+import json
+from datetime import datetime, timezone
+
+
+def add_args(parser: argparse.ArgumentParser) -> None:
+    """Add command line arguments to the parser."""
+    parser.add_argument(
+        "secrets_file",
+        type=str,
+        help="Path to the file containing the list of secrets",
+    )
+    parser.add_argument(
+        "patterns_file",
+        type=str,
+        help="Path to the file containing the list of patterns with push protection",
+    )
+
+
+def main() -> None:
+    """Command line entry point.
+
+    Read the push-protected pattern names (one per line) and a JSON list of
+    secret alerts, then print the share of alerts whose `secret_type` is
+    push-protected and the average number of such alerts per day since the
+    earliest `first_commit_date` among them.
+    """
+    parser = argparse.ArgumentParser(
+        description="Estimate push protection rate for secrets"
+    )
+    add_args(parser)
+    args = parser.parse_args()
+
+    with open(args.patterns_file, "r") as f:
+        patterns: set = {line.strip() for line in f if line.strip()}
+
+    with open(args.secrets_file, "r") as f:
+        secrets = json.load(f)
+
+    total_secrets = len(secrets)
+    protected_secrets = [secret for secret in secrets if secret.get("secret_type") in patterns]
+
+    print(f"Total secrets: {total_secrets}")
+    print(f"Protected secrets: {len(protected_secrets)}")
+
+    if total_secrets > 0:
+        protection_rate = (len(protected_secrets) / total_secrets) * 100
+        print(f"Estimated push protection rate: {protection_rate:.2f}%")
+    else:
+        print("No secrets found to evaluate.")
+
+    # now evaluate how often we'd expect to block pushes, using the `first_commit_date` field
+    # that's in ISO format with a Z suffix
+    now = datetime.now(timezone.utc)
+
+    # alerts without a `first_commit_date` cannot be placed in time, so skip them
+    blocked_commit_dates = [
+        datetime.fromisoformat(secret["first_commit_date"].replace("Z", "+00:00"))
+        for secret in protected_secrets
+        if secret.get("first_commit_date")
+    ]
+
+    # min() raises ValueError on an empty sequence, so stop before computing a rate
+    if not blocked_commit_dates:
+        print("No protected secrets with a first commit date; cannot estimate a blocking rate.")
+        return
+
+    # find the oldest blocked commit
+    earliest_blocked_commit_date = min(blocked_commit_dates)
+
+    blocking_timespan = now - earliest_blocked_commit_date
+    rate = len(protected_secrets) / blocking_timespan.days if blocking_timespan.days > 0 else len(protected_secrets)
+
+    print(f"Estimated secrets blocked per day since {earliest_blocked_commit_date.date()}: {rate:.2f}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/githubapi.py b/githubapi.py
index 7de4811..2022aa2 100644
--- a/githubapi.py
+++ b/githubapi.py
@@ -26,6 +26,20 @@
 ISO_NO_TZ_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$")
 VALID_NAME_RE = re.compile(r"^[A-Za-z0-9_.-]{1,39}$")
 
+GENERIC_SECRET_TYPES = ",".join(
+    [
+        "http_basic_authentication_header",
+        "http_bearer_authentication_header",
+        "mongodb_connection_string",
+        "mysql_connection_string",
+        "openssh_private_key",
+        "pgp_private_key",
+        "postgres_connection_string",
+        "rsa_private_key",
+        "password",  # Copilot powered secret detection
+    ]
+)
+
 
 class RateLimited(Exception):
     """Rate limited exception."""
@@ -54,7 +68,7 @@ def __init__(self, token: str | None = None, hostname="github.com") -> None:
         self.hostname = hostname
 
     @classmethod
-    def check_name(self, name: str, scope: str) -> bool:
+    def check_name(cls, name: str, scope: str) -> bool:
         """Check the name is valid."""
         # check repo slug has format or org/Enterprise name is valid
         if scope == "repo":
@@ -112,7 +126,7 @@ def query(
         if paging is None:
             try:
                 result = self._do(url, method, data=data)
-                yield result
+                yield result.json()
             except Exception as e:
                 LOG.error("Error: %s", e)
                 # show traceback without raising the exception
@@ -161,6 +175,8 @@ def construct_api_url(
 
         path = api_path + scope_path + endpoint
 
+        query_params = {}
+
         if paging is None:
             query_params = {}
         elif paging == "cursor":
@@ -314,7 +330,7 @@ def paginate(
                     break
 
             if progress:
-                pbar.update(1)
+                pbar.update(1)  # type: ignore
 
             LOG.debug(data)
 
@@ -413,9 +429,14 @@ def list_secret_scanning_alerts(
         since: datetime.datetime | None = None,
         scope: str = "org",
         bypassed: bool = False,
+        generic: bool = False,
     ) -> Generator[dict, None, None]:
         """List secret scanning alerts for a GitHub repository, organization or Enterprise."""
         query = {"state": state} if state is not None else {}
+
+        if generic:
+            query["secret_type"] = GENERIC_SECRET_TYPES
+
         alerts = self.query(
             scope,
             name,
diff --git a/list_code_scanning_alerts.py b/list_code_scanning_alerts.py
index e911471..7fd3c4f 100755
--- a/list_code_scanning_alerts.py
+++ b/list_code_scanning_alerts.py
@@ -4,7 +4,6 @@
 
 import sys
 import argparse
-import re
 import logging
 import datetime
 import json
diff --git a/list_secret_scanning_alerts.py b/list_secret_scanning_alerts.py
index 6a49ba5..e33c8e2 100755
--- a/list_secret_scanning_alerts.py
+++ b/list_secret_scanning_alerts.py
@@ -4,7 +4,6 @@
 
 import sys
 import argparse
-import re
 import logging
 import datetime
 import json
@@ -17,47 +16,71 @@
 
 
 def make_result(
-    alert: dict, scope: str, name: str, include_secret: bool = True
+    g: GitHub, alert: dict, scope: str, name: str, include_secret: bool = True, include_locations: bool = False, include_commit: bool = False
 ) -> dict:
-    """Make an alert result from the raw data."""
-    result = {
-        "created_at": alert["created_at"],
-        "push_protection_bypassed_by": (
-            alert["push_protection_bypassed_by"]["login"]
-            if alert["push_protection_bypassed_by"] is not None
-            else None
-        ),
-        "push_protection_bypassed_at": alert["push_protection_bypassed_at"],
-        "repo": alert["repository"]["full_name"] if scope != "repo" and "repository" in alert else name,
-        "url": alert["html_url"],
-        "state": alert["state"],
-        "resolution": alert["resolution"],
-        "resolved_at": alert["resolved_at"],
-        "resolved_by": (
-            alert["resolved_by"]["login"] if alert["resolved_by"] is not None else None
-        ),
-        "resolution_comment": alert["resolution_comment"],
-        "validity": alert["validity"],
-        "secret_type": alert["secret_type"],
-        "multi_repo": alert.get("multi_repo"),
-        "publicly_leaked": alert.get("publicly_leaked"),
-        "push_protection_bypass_request_reviewer": (
-            alert["push_protection_bypass_request_reviewer"]["login"]
-            if alert.get("push_protection_bypass_request_reviewer") is not None
-            else None
-        ),
-        "push_protection_bypass_request_reviewer_comment": alert.get(
-            "push_protection_bypass_request_reviewer_comment"
-        ),
-        "push_protection_bypass_request_comment": alert.get(
-            "push_protection_bypass_request_comment"
-        )
-    }
+    """Make a "flat" alert result from the raw alert data."""
+    try:
+        result = {
+            "created_at": alert["created_at"],
+            "push_protection_bypassed_by": (
+                alert["push_protection_bypassed_by"]["login"]
+                if alert["push_protection_bypassed_by"] is not None
+                else None
+            ),
+            "push_protection_bypassed_at": alert["push_protection_bypassed_at"],
+            "repo": alert["repository"]["full_name"] if scope != "repo" and "repository" in alert else name,
+            "url": alert["html_url"],
+            "state": alert["state"],
+            "resolution": alert["resolution"],
+            "resolved_at": alert["resolved_at"],
+            "resolved_by": (
+                alert["resolved_by"]["login"] if alert["resolved_by"] is not None else None
+            ),
+            "resolution_comment": alert["resolution_comment"],
+            "validity": alert["validity"],
+            "secret_type": alert["secret_type"],
+            "multi_repo": alert.get("multi_repo"),
+            "publicly_leaked": alert.get("publicly_leaked"),
+            "push_protection_bypass_request_reviewer": (
+                alert["push_protection_bypass_request_reviewer"]["login"]
+                if alert.get("push_protection_bypass_request_reviewer") is not None
+                else None
+            ),
+            "push_protection_bypass_request_reviewer_comment": alert.get(
+                "push_protection_bypass_request_reviewer_comment"
+            ),
+            "push_protection_bypass_request_comment": alert.get(
+                "push_protection_bypass_request_comment"
+            ),
+        }
+
+        first_location = alert.get("first_location_detected")
+        if first_location is not None:
+            result["first_location"] = f"{first_location['path']}:{first_location['start_line']}:{first_location['start_column']}@{first_location.get('commit_sha', '')}"
+
+        if include_commit:
+            # use decorated alert info, if it's there
+            if alert.get("commit") is not None:
+                commit_info = alert["commit"]
+                result["first_commit_date"] = commit_info["committer"]["date"]
+                result["first_commit_author"] = f"{commit_info['author']['name']} <{commit_info['author']['email']}>"
 
-    if include_secret:
-        result["secret"] = alert["secret"]
+        if include_locations:
+            # use decorated alert info, if it's there
+            locations = alert.get("locations")
+            if locations:
+                result["locations"] = ";".join([f"{loc['details']['path']}:{loc['details']['start_line']}:{loc['details']['start_column']}@{loc['details']['commit_sha']}" for loc in locations if loc.get("type") == "commit"])
 
-    return result
+        if include_secret:
+            result["secret"] = alert["secret"]
+
+        return result
+    except KeyboardInterrupt:
+        LOG.info("Stopped by user")
+        sys.exit(1)
+    except Exception as e:
+        LOG.error(f"Error processing alert: {e}")
+        return {}
 
 
 def to_list(result: dict) -> list[str|None]:
@@ -75,6 +98,10 @@ def to_list(result: dict) -> list[str|None]:
         result["validity"],
         result["secret_type"],
         (result["secret"] if "secret" in result else None),
+        (result["first_location"] if "first_location" in result else None),
+        result["first_commit_date"] if "first_commit_date" in result else None,
+        result["first_commit_author"] if "first_commit_author" in result else None,
+        result["locations"] if "locations" in result else None,
     ]
 
 
@@ -99,11 +126,53 @@ def output_csv(results: list[dict], quote_all: bool) -> None:
             "validity",
             "secret_type",
             "secret",
+            "first_location",
+            "first_commit_date",
+            "first_commit_author",
+            "locations",
         ]
     )
 
     for result in results:
-        writer.writerow(to_list(result))
+        try:
+            writer.writerow(to_list(result))
+        except KeyboardInterrupt:
+            LOG.info("Stopped by user")
+            return
+
+def decorate_alerts(g: GitHub, alerts: Generator[dict[str, Any], None, None], include_locations: bool = False, include_commit: bool = False) -> Generator[dict, None, None]:
+    """Decorate alerts with additional information, for both the raw and make_result outputs.
+
+    Resolve locations and commit information, if that was asked for.
+    """
+    for alert in alerts:
+        first_location: Any | None = alert.get("first_location_detected", None)
+
+        if include_locations:
+            if "has_more_locations" in alert and not alert["has_more_locations"]:
+                pass
+            else:
+                location_data = g._get(alert["locations_url"]).json()
+                if first_location is None and location_data and location_data[0]['type'] == 'commit':
+                    first_location = location_data[0]['details']
+                alert["locations"] = location_data
+
+        if first_location is not None and "first_location_detected" not in alert:
+            alert["first_location_detected"] = first_location
+
+        if include_commit:
+            if first_location is None:
+                # we *have* to get the location info, despite not having --include-locations set
+                location_data = g._get(alert["locations_url"]).json()
+                if location_data and location_data[0]['type'] == 'commit':
+                    first_location = location_data[0]['details']
+            if first_location is not None:
+                commit_url = first_location.get("commit_url")
+                if commit_url:
+                    commit_info = g._get(commit_url).json()
+                    alert["commit"] = commit_info
+
+        yield alert
 
 
 def list_secret_scanning_alerts(
@@ -113,18 +182,30 @@ def list_secret_scanning_alerts(
     state: str | None = None,
     since: datetime.datetime | None = None,
     include_secret: bool = False,
+    include_locations: bool = False,
+    include_commit: bool = False,
     bypassed: bool = False,
     raw: bool = False,
+    generic: bool = False,
 ) -> Generator[dict, None, None]:
+    """List secret scanning alerts for a repo/org/Enterprise using the GitHub API.
+
+    Decorate the alerts with additional information, if requested.
+
+    Output either the raw alert data, or flattened results.
+    """
     g = GitHub(hostname=hostname)
     alerts = g.list_secret_scanning_alerts(
-        name, state=state, since=since, scope=scope, bypassed=bypassed
+        name, state=state, since=since, scope=scope, bypassed=bypassed, generic=generic
     )
+
+    alerts = decorate_alerts(g, alerts, include_locations=include_locations, include_commit=include_commit)
+
     if raw:
         return alerts
     else:
         results = (
-            make_result(alert, scope, name, include_secret=include_secret)
+            make_result(g, alert, scope, name, include_secret=include_secret, include_locations=include_locations, include_commit=include_commit)
             for alert in alerts
         )
         return results
@@ -143,6 +224,12 @@ def add_args(parser: argparse.ArgumentParser) -> None:
         required=False,
         help="Scope of the query",
     )
+    parser.add_argument(
+        "--generic",
+        "-g",
+        action="store_true",
+        help="Include generic secret types (not just high-confidence ones)",
+    )
     parser.add_argument(
         "--bypassed",
         "-b",
@@ -163,6 +250,18 @@ def add_args(parser: argparse.ArgumentParser) -> None:
         action="store_true",
         help="Do not include the secret in the output",
     )
+    parser.add_argument(
+        "--include-locations",
+        "-l",
+        action="store_true",
+        help="Include locations in the output",
+    )
+    parser.add_argument(
+        "--include-commit",
+        "-c",
+        action="store_true",
+        help="Include commit date and committer in the output",
+    )
     parser.add_argument(
         "--since",
         "-S",
@@ -211,7 +310,10 @@ def main() -> None:
     state = args.state
     hostname = args.hostname
     include_secret = not args.no_include_secret
+    include_locations = args.include_locations
+    include_commit = args.include_commit
     bypassed = args.bypassed
+    generic = args.generic
 
     if not GitHub.check_name(name, scope):
         raise ValueError("Invalid name: %s for %s", name, scope)
@@ -223,10 +325,15 @@ def main() -> None:
         state=state,
         since=since,
         include_secret=include_secret,
+        include_locations=include_locations,
+        include_commit=include_commit,
         bypassed=bypassed,
         raw=args.raw,
+        generic=generic,
     )
 
+    LOG.debug(results)
+
     if args.json:
         print(json.dumps(list(results), indent=2))
     else: