99from socketsecurity .core .classes import Comment
1010from socketsecurity .core .scm_comments import Comments
1111
12- # Declare all globals with initial None values
13- github_sha : Optional [str ] = None
14- github_api_url : Optional [str ] = None
15- github_ref_type : Optional [str ] = None
16- github_event_name : Optional [str ] = None
17- github_workspace : Optional [str ] = None
18- github_repository : Optional [str ] = None
19- github_ref_name : Optional [str ] = None
20- github_actor : Optional [str ] = None
21- default_branch : Optional [str ] = None
22- github_env : Optional [str ] = None
23- pr_number : Optional [str ] = None
24- pr_name : Optional [str ] = None
25- is_default_branch : bool = False
26- commit_message : Optional [str ] = None
27- committer : Optional [str ] = None
28- gh_api_token : Optional [str ] = None
29- github_repository_owner : Optional [str ] = None
30- event_action : Optional [str ] = None
31-
32- github_variables = [
33- "GITHUB_SHA" ,
34- "GITHUB_API_URL" ,
35- "GITHUB_REF_TYPE" ,
36- "GITHUB_EVENT_NAME" ,
37- "GITHUB_WORKSPACE" ,
38- "GITHUB_REPOSITORY" ,
39- "GITHUB_REF_NAME" ,
40- "DEFAULT_BRANCH" ,
41- "PR_NUMBER" ,
42- "PR_NAME" ,
43- "COMMIT_MESSAGE" ,
44- "GITHUB_ACTOR" ,
45- "GITHUB_ENV" ,
46- "GH_API_TOKEN" ,
47- "GITHUB_REPOSITORY_OWNER" ,
48- "EVENT_ACTION"
49- ]
5012
5113@dataclass
5214class GithubConfig :
@@ -67,6 +29,7 @@ class GithubConfig:
6729 token : str
6830 owner : str
6931 event_action : Optional [str ]
32+ headers : dict
7033
7134 @classmethod
7235 def from_env (cls ) -> 'GithubConfig' :
@@ -76,13 +39,19 @@ def from_env(cls) -> 'GithubConfig':
7639 log .error ("Unable to get Github API Token from GH_API_TOKEN" )
7740 sys .exit (2 )
7841
42+ repository = os .getenv ('GITHUB_REPOSITORY' , '' )
43+ owner = os .getenv ('GITHUB_REPOSITORY_OWNER' , '' )
44+ if '/' in repository :
45+ owner = repository .split ('/' )[0 ]
46+ repository = repository .split ('/' )[1 ]
47+
7948 return cls (
8049 sha = os .getenv ('GITHUB_SHA' , '' ),
8150 api_url = os .getenv ('GITHUB_API_URL' , '' ),
8251 ref_type = os .getenv ('GITHUB_REF_TYPE' , '' ),
8352 event_name = os .getenv ('GITHUB_EVENT_NAME' , '' ),
8453 workspace = os .getenv ('GITHUB_WORKSPACE' , '' ),
85- repository = os . getenv ( 'GITHUB_REPOSITORY' , '' ). split ( '/' )[ - 1 ] ,
54+ repository = repository ,
8655 ref_name = os .getenv ('GITHUB_REF_NAME' , '' ),
8756 default_branch = os .getenv ('DEFAULT_BRANCH' , '' ).lower () == 'true' ,
8857 pr_number = os .getenv ('PR_NUMBER' ),
@@ -91,206 +60,140 @@ def from_env(cls) -> 'GithubConfig':
9160 actor = os .getenv ('GITHUB_ACTOR' , '' ),
9261 env = os .getenv ('GITHUB_ENV' , '' ),
9362 token = token ,
94- owner = os .getenv ('GITHUB_REPOSITORY_OWNER' , '' ),
95- event_action = os .getenv ('EVENT_ACTION' )
63+ owner = owner ,
64+ event_action = os .getenv ('EVENT_ACTION' ),
65+ headers = {
66+ 'Authorization' : f"Bearer { token } " ,
67+ 'User-Agent' : 'SocketPythonScript/0.0.1' ,
68+ "accept" : "application/json"
69+ }
9670 )
9771
9872
99- for env in github_variables :
100- var_name = env .lower ()
101- globals ()[var_name ] = os .getenv (env ) or None
102- if var_name == "default_branch" :
103- if default_branch is None or default_branch .lower () == "false" :
104- is_default_branch = False
105- else :
106- is_default_branch = True
107- if var_name != "gh_api_token" :
108- value = globals ()[var_name ] = os .getenv (env ) or None
109- log .debug (f"{ env } ={ value } " )
73+ class Github :
74+ def __init__ (self , config : Optional [GithubConfig ] = None ):
75+ self .config = config or GithubConfig .from_env ()
11076
111- headers = {
112- 'Authorization' : f"Bearer { gh_api_token } " ,
113- 'User-Agent' : 'SocketPythonScript/0.0.1' ,
114- "accept" : "application/json"
115- }
77+ if not self .config .token :
78+ log .error ("Unable to get Github API Token" )
79+ sys .exit (2 )
11680
81+ def check_event_type (self ) -> str :
82+ if self .config .event_name .lower () == "push" :
83+ if not self .config .pr_number :
84+ return "main"
85+ return "diff"
86+ elif self .config .event_name .lower () == "pull_request" :
87+ if self .config .event_action and self .config .event_action .lower () in ['opened' , 'synchronize' ]:
88+ return "diff"
89+ log .info (f"Pull Request Action { self .config .event_action } is not a supported type" )
90+ sys .exit (0 )
91+ elif self .config .event_name .lower () == "issue_comment" :
92+ return "comment"
93+
94+ log .error (f"Unknown event type { self .config .event_name } " )
95+ sys .exit (0 )
96+
97+ def post_comment (self , body : str ) -> None :
98+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/{ self .config .pr_number } /comments"
99+ payload = json .dumps ({"body" : body })
100+ do_request (
101+ path = path ,
102+ payload = payload ,
103+ method = "POST" ,
104+ headers = self .config .headers ,
105+ base_url = self .config .api_url
106+ )
117107
118- class Github :
119- commit_sha : str
120- api_url : str
121- ref_type : str
122- event_name : str
123- workspace : str
124- repository : str
125- ref_name : str
126- default_branch : str
127- is_default_branch : bool
128- pr_number : int
129- pr_name : str
130- commit_message : str
131- committer : str
132- github_env : str
133- api_token : str
134- project_id : int
135- event_action : str
108+ def update_comment (self , body : str , comment_id : str ) -> None :
109+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/comments/{ comment_id } "
110+ payload = json .dumps ({"body" : body })
111+ do_request (
112+ path = path ,
113+ payload = payload ,
114+ method = "PATCH" ,
115+ headers = self .config .headers ,
116+ base_url = self .config .api_url
117+ )
136118
137- def __init__ (self ):
138- self .commit_sha = github_sha
139- self .api_url = github_api_url
140- self .ref_type = github_ref_type
141- self .event_name = github_event_name
142- self .workspace = github_workspace
143- self .repository = github_repository
144- if "/" in self .repository :
145- self .repository = self .repository .rsplit ("/" )[1 ]
146- self .branch = github_ref_name
147- self .default_branch = default_branch
148- self .is_default_branch = is_default_branch
149- self .pr_number = pr_number
150- self .pr_name = pr_name
151- self .commit_message = commit_message
152- self .committer = github_actor
153- self .github_env = github_env
154- self .api_token = gh_api_token
155- self .project_id = 0
156- self .event_action = event_action
157- if self .api_token is None :
158- print ("Unable to get Github API Token from GH_API_TOKEN" )
159- sys .exit (2 )
119+ def write_new_env (self , name : str , content : str ) -> None :
120+ with open (self .config .env , "a" ) as f :
121+ new_content = content .replace ("\n " , "\\ n" )
122+ f .write (f"{ name } ={ new_content } " )
160123
161- @staticmethod
162- def check_event_type () -> str :
163- if github_event_name .lower () == "push" :
164- if pr_number is None or pr_number == "" or pr_number == "0" :
165- event_type = "main"
166- else :
167- event_type = "diff"
168- elif github_event_name .lower () == "pull_request" :
169- if event_action is not None and event_action != "" and (
170- event_action .lower () == "opened" or event_action .lower () == 'synchronize' ):
171- event_type = "diff"
172- else :
173- log .info (f"Pull Request Action { event_action } is not a supported type" )
174- sys .exit (0 )
175- elif github_event_name .lower () == "issue_comment" :
176- event_type = "comment"
124+ def get_comments_for_pr (self ) -> dict :
125+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/{ self .config .pr_number } /comments"
126+ raw_comments = Comments .process_response (
127+ do_request (path , headers = self .config .headers , base_url = self .config .api_url )
128+ )
129+
130+ comments = {}
131+ if "error" not in raw_comments :
132+ for item in raw_comments :
133+ comment = Comment (** item )
134+ comments [comment .id ] = comment
135+ comment .body_list = comment .body .split ("\n " )
177136 else :
178- event_type = None
179- log .error (f"Unknown event type { github_event_name } " )
180- sys .exit (0 )
181- return event_type
137+ log .error (raw_comments )
138+
139+ return Comments .check_for_socket_comments (comments )
182140
183- @staticmethod
184141 def add_socket_comments (
185- security_comment : str ,
186- overview_comment : str ,
187- comments : dict ,
188- new_security_comment : bool = True ,
189- new_overview_comment : bool = True
142+ self ,
143+ security_comment : str ,
144+ overview_comment : str ,
145+ comments : dict ,
146+ new_security_comment : bool = True ,
147+ new_overview_comment : bool = True
190148 ) -> None :
191- existing_overview_comment = comments .get ("overview" )
192- existing_security_comment = comments .get ("security" )
193149 if new_overview_comment :
194150 log .debug ("New Dependency Overview comment" )
195- if existing_overview_comment is not None :
151+ if overview := comments . get ( "overview" ) :
196152 log .debug ("Previous version of Dependency Overview, updating" )
197- existing_overview_comment : Comment
198- Github .update_comment (overview_comment , str (existing_overview_comment .id ))
153+ self .update_comment (overview_comment , str (overview .id ))
199154 else :
200155 log .debug ("No previous version of Dependency Overview, posting" )
201- Github .post_comment (overview_comment )
156+ self .post_comment (overview_comment )
157+
202158 if new_security_comment :
203159 log .debug ("New Security Issue Comment" )
204- if existing_security_comment is not None :
160+ if security := comments . get ( "security" ) :
205161 log .debug ("Previous version of Security Issue comment, updating" )
206- existing_security_comment : Comment
207- Github .update_comment (security_comment , str (existing_security_comment .id ))
162+ self .update_comment (security_comment , str (security .id ))
208163 else :
209164 log .debug ("No Previous version of Security Issue comment, posting" )
210- Github .post_comment (security_comment )
211-
212- @staticmethod
213- def post_comment (body : str ) -> None :
214- repo = github_repository .rsplit ("/" , 1 )[1 ]
215- path = f"repos/{ github_repository_owner } /{ repo } /issues/{ pr_number } /comments"
216- payload = {
217- "body" : body
218- }
219- payload = json .dumps (payload )
220- do_request (path , payload = payload , method = "POST" , headers = headers , base_url = github_api_url )
221-
222- @staticmethod
223- def update_comment (body : str , comment_id : str ) -> None :
224- repo = github_repository .rsplit ("/" , 1 )[1 ]
225- path = f"repos/{ github_repository_owner } /{ repo } /issues/comments/{ comment_id } "
226- payload = {
227- "body" : body
228- }
229- payload = json .dumps (payload )
230- do_request (path , payload = payload , method = "PATCH" , headers = headers , base_url = github_api_url )
165+ self .post_comment (security_comment )
231166
232- @staticmethod
233- def write_new_env (name : str , content : str ) -> None :
234- file = open (github_env , "a" )
235- new_content = content .replace ("\n " , "\\ n" )
236- env_output = f"{ name } ={ new_content } "
237- file .write (env_output )
238-
239- @staticmethod
240- def get_comments_for_pr (repo : str , pr : str ) -> dict :
241- path = f"repos/{ github_repository_owner } /{ repo } /issues/{ pr } /comments"
242- raw_comments = Comments .process_response (do_request (path , headers = headers , base_url = github_api_url ))
243- comments = {}
244- if "error" not in raw_comments :
245- for item in raw_comments :
246- comment = Comment (** item )
247- comments [comment .id ] = comment
248- for line in comment .body .split ("\n " ):
249- comment .body_list .append (line )
250- else :
251- log .error (raw_comments )
252- socket_comments = Comments .check_for_socket_comments (comments )
253- return socket_comments
254-
255- @staticmethod
256- def remove_comment_alerts (comments : dict ):
257- security_alert = comments .get ("security" )
258- if security_alert is not None :
259- security_alert : Comment
167+ def remove_comment_alerts (self , comments : dict ) -> None :
168+ if security_alert := comments .get ("security" ):
260169 new_body = Comments .process_security_comment (security_alert , comments )
261- Github .handle_ignore_reactions (comments )
262- Github .update_comment (new_body , str (security_alert .id ))
263-
264- @staticmethod
265- def handle_ignore_reactions (comments : dict ) -> None :
266- for comment in comments ["ignore" ]:
267- comment : Comment
268- if "SocketSecurity ignore" in comment .body :
269- if not Github .comment_reaction_exists (comment .id ):
270- Github .post_reaction (comment .id )
271-
272- @staticmethod
273- def post_reaction (comment_id : int ) -> None :
274- repo = github_repository .rsplit ("/" , 1 )[1 ]
275- path = f"repos/{ github_repository_owner } /{ repo } /issues/comments/{ comment_id } /reactions"
276- payload = {
277- "content" : "+1"
278- }
279- payload = json .dumps (payload )
280- do_request (path , payload = payload , method = "POST" , headers = headers , base_url = github_api_url )
170+ self .handle_ignore_reactions (comments )
171+ self .update_comment (new_body , str (security_alert .id ))
172+
173+ def handle_ignore_reactions (self , comments : dict ) -> None :
174+ for comment in comments .get ("ignore" , []):
175+ if "SocketSecurity ignore" in comment .body and not self .comment_reaction_exists (comment .id ):
176+ self .post_reaction (comment .id )
177+
178+ def post_reaction (self , comment_id : int ) -> None :
179+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/comments/{ comment_id } /reactions"
180+ payload = json .dumps ({"content" : "+1" })
181+ do_request (
182+ path = path ,
183+ payload = payload ,
184+ method = "POST" ,
185+ headers = self .config .headers ,
186+ base_url = self .config .api_url
187+ )
281188
282- @staticmethod
283- def comment_reaction_exists (comment_id : int ) -> bool :
284- repo = github_repository .rsplit ("/" , 1 )[1 ]
285- path = f"repos/{ github_repository_owner } /{ repo } /issues/comments/{ comment_id } /reactions"
189+ def comment_reaction_exists (self , comment_id : int ) -> bool :
190+ path = f"repos/{ self .config .owner } /{ self .config .repository } /issues/comments/{ comment_id } /reactions"
286191 try :
287- response = do_request (path , headers = headers , base_url = github_api_url )
288- data = response .json ()
289- for reaction in data :
290- content = reaction .get ("content" )
291- if content is not None and content == ":thumbsup:" :
192+ response = do_request (path , headers = self .config .headers , base_url = self .config .api_url )
193+ for reaction in response .json ():
194+ if reaction .get ("content" ) == ":thumbsup:" :
292195 return True
293196 except Exception as error :
294- log .error (f"Unable to get reaction for { comment_id } for PR { pr_number } " )
197+ log .error (f"Unable to get reaction for { comment_id } for PR { self . config . pr_number } " )
295198 log .error (error )
296199 return False
0 commit comments