Skip to content

Commit e98e68c

Browse files
committed
Resolving duplicate alerts - matching a closed alert status from a legacy detection to a new alert with a matching secret
1 parent 49776e9 commit e98e68c

File tree

2 files changed

+273
-0
lines changed

2 files changed

+273
-0
lines changed
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
#!/usr/bin/env python3
2+
3+
"""Resolve duplicate secret scanning alerts for a GitHub repository, organization or Enterprise."""
4+
5+
import sys
6+
import argparse
7+
import re
8+
import logging
9+
import datetime
10+
import json
11+
from typing import Generator, List, Tuple, Iterable
12+
from collections import defaultdict
13+
from defusedcsv import csv # type: ignore
14+
from githubapi import GitHub, parse_date
15+
from list_secret_scanning_alerts import list_secret_scanning_alerts
16+
17+
18+
LOG = logging.getLogger(__name__)
19+
20+
# Hardcoded list of matching secrets
21+
MATCHING_SECRETS = [
22+
("google_cloud_private_key_id", "google_cloud_service_account_credentials"),
23+
]
24+
25+
def index_results_by_secret(results: Iterable[dict]) -> dict:
26+
"""Index results by secret and type for easy lookup."""
27+
28+
indexed_results: dict = {}
29+
30+
for result in results:
31+
repo = result["repo"]
32+
secret_type = result["secret_type"]
33+
secret = result["secret"]
34+
35+
# parse out just the private_key_id for matching on google_cloud_service_account_credentials
36+
if secret_type == "google_cloud_service_account_credentials":
37+
secret = json.loads(secret)["private_key_id"]
38+
39+
indexed_results[repo] = (
40+
{} if repo not in indexed_results else indexed_results[repo]
41+
)
42+
indexed_results[repo][secret_type] = (
43+
{} if secret_type not in indexed_results[repo] else indexed_results[repo][secret_type]
44+
)
45+
indexed_results[repo][secret_type][secret] = result
46+
47+
return indexed_results
48+
49+
50+
def change_state(hostname, old_result: dict, new_result: dict) -> None:
51+
"""Change the state of the alert to match the existing result using the GitHub API to update the alert."""
52+
g = GitHub(hostname=hostname)
53+
54+
repo_name = new_result["repo"]
55+
56+
if old_result["repo"] != repo_name:
57+
LOG.error(f"Repo mismatch: {old_result['repo']} != {repo_name}")
58+
return
59+
60+
state_update = {
61+
"state": old_result["state"],
62+
"resolution": old_result["resolution"],
63+
"resolution_comment": old_result["resolution_comment"],
64+
}
65+
66+
alert_number = new_result["url"].split("/")[-1]
67+
68+
LOG.debug(f"Changing state of alert {repo_name}/{alert_number} to {state_update}")
69+
70+
g.query_once(
71+
"repo",
72+
repo_name,
73+
f"/secret-scanning/alerts/{alert_number}",
74+
data=state_update,
75+
method="PATCH",
76+
)
77+
78+
return
79+
80+
81+
def resolve_duplicates(
82+
indexed_results: dict, matching_secrets_lookup: dict, hostname: str
83+
) -> None:
84+
"""Resolve duplicates by matching on a new secret type and updating the state of the alert to match the existing result."""
85+
for repo, repo_results in indexed_results.items():
86+
LOG.debug(repo_results)
87+
88+
for old_secret_type, new_secret_type in matching_secrets_lookup.items():
89+
try:
90+
old_results = repo_results[old_secret_type]
91+
except KeyError:
92+
LOG.debug(f"No results found for secret type: {old_secret_type}")
93+
continue
94+
95+
for secret, old_result in old_results.items():
96+
try:
97+
new_result = repo_results[new_secret_type][secret]
98+
LOG.debug(f"Found matching alert: {new_result}")
99+
except KeyError:
100+
continue
101+
102+
if new_result["state"] != old_result["state"]:
103+
LOG.info(f"State mismatch, updating state: {new_result['state']} != {old_result['state']}")
104+
105+
if old_result["state"] != "pattern_edited":
106+
change_state(hostname, old_result, new_result)
107+
108+
109+
def add_args(parser: argparse.ArgumentParser) -> None:
110+
"""Add command-line arguments to the parser."""
111+
parser.add_argument(
112+
"name", type=str, help="Name of the repo/org/Enterprise to query"
113+
)
114+
parser.add_argument(
115+
"--scope",
116+
type=str,
117+
default="org",
118+
choices=["ent", "org", "repo"],
119+
required=False,
120+
help="Scope of the query",
121+
)
122+
parser.add_argument(
123+
"--state",
124+
"-s",
125+
type=str,
126+
choices=["open", "resolved"],
127+
required=False,
128+
help="State of the alerts to query",
129+
)
130+
parser.add_argument(
131+
"--since",
132+
"-S",
133+
type=str,
134+
required=False,
135+
help="Only show alerts created after this date/time - ISO 8601 format, e.g. 2024-10-08 or 2024-10-08T12:00; or Nd format, e.g. 7d for 7 days ago",
136+
)
137+
parser.add_argument(
138+
"--hostname",
139+
type=str,
140+
default="github.com",
141+
required=False,
142+
help="GitHub Enterprise hostname (defaults to github.com)",
143+
)
144+
parser.add_argument(
145+
"--debug", "-d", action="store_true", help="Enable debug logging"
146+
)
147+
parser.add_argument(
148+
"--add-matching-secret",
149+
"-a",
150+
action="append",
151+
nargs=2,
152+
metavar=("OLD_TYPE", "NEW_TYPE"),
153+
help="Add a new pair of matched secret types",
154+
)
155+
156+
157+
def main() -> None:
158+
"""CLI entrypoint."""
159+
parser = argparse.ArgumentParser(description=__doc__)
160+
add_args(parser)
161+
args = parser.parse_args()
162+
163+
logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
164+
165+
since = parse_date(args.since)
166+
167+
LOG.debug("Since: %s (%s) [%s]", since, args.since, type(since))
168+
169+
scope = "repo" if "/" in args.name and args.scope != "repo" else args.scope
170+
name = args.name
171+
state = args.state
172+
hostname = args.hostname
173+
174+
if not GitHub.check_name(args.name, scope):
175+
raise ValueError("Invalid name: %s for %s", args.name, scope)
176+
177+
# Update matching secrets with CLI arguments
178+
matching_secrets = MATCHING_SECRETS.copy()
179+
if args.add_matching_secret:
180+
matching_secrets.extend(args.add_matching_secret)
181+
182+
# now make lookup
183+
matching_secrets_lookup = {k: v for k, v in matching_secrets}
184+
185+
# find secret scanning alerts
186+
results = list_secret_scanning_alerts(name, scope, hostname, state=state, since=since)
187+
188+
# index results by secret and type for easy lookup
189+
indexed_results = index_results_by_secret(results)
190+
191+
LOG.debug(indexed_results)
192+
193+
resolve_duplicates(indexed_results, matching_secrets_lookup, hostname)
194+
195+
196+
if __name__ == "__main__":
197+
main()
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import pytest
2+
from unittest.mock import patch, MagicMock, call
3+
from resolve_duplicate_secret_scanning_alerts import main
4+
5+
import argparse
6+
import logging
7+
8+
9+
# logging.getLogger("resolve_duplicate_secret_scanning_alerts").setLevel(logging.DEBUG)
10+
11+
12+
@pytest.fixture
13+
def mock_args():
14+
with patch('argparse.ArgumentParser.parse_args') as mock_parse_args:
15+
mock_parse_args.return_value = argparse.Namespace(
16+
name="test_org",
17+
scope="org",
18+
state="open",
19+
since="2024-10-08",
20+
hostname="github.com",
21+
debug=True,
22+
add_matching_secret=[("old_type", "new_type")]
23+
)
24+
yield mock_parse_args
25+
26+
@pytest.fixture
27+
def mock_github():
28+
with patch('resolve_duplicate_secret_scanning_alerts.GitHub') as mock_github:
29+
yield mock_github
30+
31+
@pytest.fixture
32+
def mock_list_secret_scanning_alerts():
33+
with patch('resolve_duplicate_secret_scanning_alerts.list_secret_scanning_alerts') as mock_list:
34+
mock_list.return_value = [
35+
{"repo": "test_org/test_repo", "secret_type": "old_type", "secret": "secret1", "state": "resolved", "url": "https://github.com/test_org/test_repo/1", "resolution": "false_positive", "resolution_comment": "Foo" },
36+
{"repo": "test_org/test_repo", "secret_type": "new_type", "secret": "secret1", "state": "open", "url": "https://github.com/test_org/test_repo/2", "resolution": None, "resolution_comment": None },
37+
{"repo": "test_org/test_repo", "secret_type": "google_cloud_private_key_id", "secret": "1234567", "state": "resolved", "url": "https://github.com/test_org/test_repo/3", "resolution": "false_positive", "resolution_comment": "Foo" },
38+
{"repo": "test_org/test_repo", "secret_type": "google_cloud_service_account_credentials", "secret": '{"private_key_id": "1234567"}', "state": "open", "url": "https://github.com/test_org/test_repo/4", "resolution": None, "resolution_comment": None },
39+
40+
]
41+
yield mock_list
42+
43+
def test_main(mock_args, mock_github, mock_list_secret_scanning_alerts):
44+
mock_github_instance = mock_github.return_value
45+
mock_github_instance.query_once = MagicMock()
46+
47+
main()
48+
49+
mock_github_instance.query_once.assert_has_calls(
50+
[
51+
call(
52+
"repo",
53+
"test_org/test_repo",
54+
"/secret-scanning/alerts/2",
55+
data={
56+
"state": "resolved",
57+
"resolution": "false_positive",
58+
"resolution_comment": "Foo"
59+
},
60+
method="PATCH"
61+
),
62+
call(
63+
"repo",
64+
"test_org/test_repo",
65+
"/secret-scanning/alerts/4",
66+
data={
67+
"state": "resolved",
68+
"resolution": "false_positive",
69+
"resolution_comment": "Foo"
70+
},
71+
method="PATCH"
72+
),
73+
],
74+
any_order=True
75+
)
76+
assert mock_github_instance.query_once.call_count == 2

0 commit comments

Comments
 (0)