|
8 | 8 | import logging |
9 | 9 | import datetime |
10 | 10 | import json |
11 | | -from typing import Generator |
| 11 | +from typing import Generator, Iterable |
12 | 12 | from collections import defaultdict |
13 | 13 | from defusedcsv import csv # type: ignore |
14 | 14 | from githubapi import GitHub, parse_date |
@@ -77,6 +77,33 @@ def change_state(hostname, result: dict, res: dict) -> None: |
77 | 77 | return |
78 | 78 |
|
79 | 79 |
|
| 80 | +def update_states(hostname: str, results: Iterable[dict], existing_results: dict) -> None: |
| 81 | + """Update the state of matching alerts to match the existing results.""" |
| 82 | + for result in results: |
| 83 | + repo = result["repo"] |
| 84 | + path = result["path"] |
| 85 | + start_line = result["start_line"] |
| 86 | + start_column = result["start_column"] |
| 87 | + end_line = result["end_line"] |
| 88 | + end_column = result["end_column"] |
| 89 | + |
| 90 | + start_loc = (start_line, start_column) |
| 91 | + end_loc = (end_line, end_column) |
| 92 | + |
| 93 | + LOG.debug(f"{repo}, {path}, {start_loc}, {end_loc}") |
| 94 | + |
| 95 | + try: |
| 96 | + res = existing_results[repo][path][start_loc][end_loc] |
| 97 | + LOG.warning(f"Found existing alert: {res}") |
| 98 | + except KeyError: |
| 99 | + continue |
| 100 | + |
| 101 | + if res["state"] != result["state"]: |
| 102 | + LOG.warning(f"State mismatch: {res['state']} != {result['state']}") |
| 103 | + |
| 104 | + change_state(hostname, result, res) |
| 105 | + |
| 106 | + |
80 | 107 | def add_args(parser: argparse.ArgumentParser) -> None: |
81 | 108 | """Add command-line arguments to the parser.""" |
82 | 109 | parser.add_argument( |
@@ -154,29 +181,7 @@ def main() -> None: |
154 | 181 |
|
155 | 182 | results = list_code_scanning_alerts(name, scope, hostname, state=state, since=since) |
156 | 183 |
|
157 | | - for result in results: |
158 | | - repo = result["repo"] |
159 | | - path = result["path"] |
160 | | - start_line = result["start_line"] |
161 | | - start_column = result["start_column"] |
162 | | - end_line = result["end_line"] |
163 | | - end_column = result["end_column"] |
164 | | - |
165 | | - start_loc = (start_line, start_column) |
166 | | - end_loc = (end_line, end_column) |
167 | | - |
168 | | - LOG.debug(f"{repo}, {path}, {start_loc}, {end_loc}") |
169 | | - |
170 | | - try: |
171 | | - res = existing_results[repo][path][start_loc][end_loc] |
172 | | - LOG.warning(f"Found existing alert: {res}") |
173 | | - except KeyError: |
174 | | - continue |
175 | | - |
176 | | - if res["state"] != result["state"]: |
177 | | - LOG.warning(f"State mismatch: {res['state']} != {result['state']}") |
178 | | - |
179 | | - change_state(hostname, result, res) |
| 184 | + update_states(hostname, results, existing_results) |
180 | 185 |
|
181 | 186 |
|
182 | 187 | if __name__ == "__main__": |
|
0 commit comments