66from glob import glob
77from pathlib import PurePath
88from urllib .parse import urlencode
9+ from socketdev import socketdev
10+ from typing import Tuple , Dict
911
1012import requests
1113
2325from socketsecurity .core .licenses import Licenses
2426
2527from .cli_client import CliClient
26- from .config import SocketConfig
28+ from .socket_config import SocketConfig
2729from .utils import socket_globs
30+ from socketdev .org import Orgs , Organization
2831
2932__all__ = [
3033 "Core" ,
@@ -139,12 +142,11 @@ def do_request(
139142
140143class Core :
141144
142- client : CliClient
143145 config : SocketConfig
144-
145- def __init__ (self , config : SocketConfig , client : CliClient ):
146+ sdk : socketdev
147+ def __init__ (self , config : SocketConfig , sdk : socketdev ):
146148 self .config = config
147- self .client = client
149+ self .sdk = sdk
148150 self .set_org_vars ()
149151
150152
@@ -165,73 +167,38 @@ def set_org_vars(self) -> None:
165167 # Get security policy AFTER org_id is updated
166168 self .config .security_policy = self .get_security_policy ()
167169
168- def get_org_id_slug (self ) -> tuple [str , str ]:
170+ def get_org_id_slug (self ) -> Tuple [str , str ]:
169171 """Gets the Org ID and Org Slug for the API Token"""
170- path = "organizations"
171- response = self .client .request (path )
172- data = response .json ()
173- organizations = data .get ("organizations" )
174- new_org_id = None
175- new_org_slug = None
172+ response = self .sdk .org .get ()
173+ organizations : Dict [str , Organization ] = response .get ("organizations" , {})
174+
176175 if len (organizations ) == 1 :
177- for key in organizations :
178- new_org_id = key
179- new_org_slug = organizations [key ].get ('slug' )
180- return new_org_id , new_org_slug
176+ org_id = next (iter (organizations )) # More Pythonic way to get first key
177+ return org_id , organizations [org_id ]['slug' ]
178+ return None , None
181179
182180 def get_sbom_data (self , full_scan_id : str ) -> list :
183181 """
184182 Return the list of SBOM artifacts for a full scan
185183 """
186- path = f"orgs/{ self .config .org_slug } /full-scans/{ full_scan_id } "
187- response = self .client .request (path )
188- results = []
189184
190- if response .status_code != 200 :
185+ response = self .sdk .fullscans .stream (self .config .org_slug , full_scan_id )
186+ if (response .get ("success" , False ) == False ):
191187 log .debug (f"Failed to get SBOM data for full-scan { full_scan_id } " )
192- log .debug (response .text )
188+ log .debug (response .get ( "message" , "No message" ) )
193189 return []
194- data = response .text
195- data .strip ('"' )
196- data .strip ()
197- for line in data .split ("\n " ):
198- if line != '"' and line != "" and line is not None :
199- item = json .loads (line )
200- results .append (item )
201190
202- return results
191+ response .pop ("success" , None )
192+ response .pop ("status" , None )
193+ return response
203194
204195 def get_security_policy (self ) -> dict :
205- """Get the Security policy and determine the effective Org security policy"""
206- payload = [{"organization" : self .config .org_id }]
207-
208- response = self .client .request (
209- path = "settings" ,
210- method = "POST" ,
211- payload = json .dumps (payload )
212- )
196+ """Get the Security policy"""
213197
214- data = response .json ()
215- defaults = data .get ("defaults" , {})
216- default_rules = defaults .get ("issueRules" , {})
217- entries = data .get ("entries" , [])
198+ response = self .sdk .settings .get (self .config .org_slug )
218199
219- org_rules = {}
220-
221- # Get organization-specific rules
222- for org_set in entries :
223- settings = org_set .get ("settings" )
224- if settings :
225- org_details = settings .get ("organization" , {})
226- org_rules .update (org_details .get ("issueRules" , {}))
227-
228- # Apply default rules where no org-specific rule exists
229- for default in default_rules :
230- if default not in org_rules :
231- action = default_rules [default ]["action" ]
232- org_rules [default ] = {"action" : action }
233-
234- return org_rules
200+ data = response .get ("securityPolicyRules" , {})
201+ return data
235202
236203 @staticmethod
237204 def get_manifest_files (package : Package , packages : dict ) -> str :
@@ -254,11 +221,15 @@ def get_manifest_files(package: Package, packages: dict) -> str:
254221 return manifest_files
255222
256223 def create_sbom_output (self , diff : Diff ) -> dict :
257- base_path = f"orgs/{ self .config .org_slug } /export/cdx"
258- path = f"{ base_path } /{ diff .id } "
259- result = self .client .request (path = path )
260224 try :
261- sbom = result .json ()
225+ result = self .sdk .export .cdx_bom (self .config .org_slug , diff .id )
226+ if (result .get ("success" , False ) == False ):
227+ log .error (f"Failed to get CycloneDX Output for full-scan { diff .id } " )
228+ log .error (result .get ("message" , "No message" ))
229+ return {}
230+
231+ result .pop ("success" , None )
232+ return result
262233 except Exception as error :
263234 log .error (f"Unable to get CycloneDX Output for { diff .id } " )
264235 log .error (error )
@@ -330,37 +301,18 @@ def create_full_scan(self, files: list, params: FullScanParams, workspace: str)
330301 :param workspace: str - Path of workspace
331302 :return:
332303 """
333- send_files = []
304+
334305 create_full_start = time .time ()
335306 log .debug ("Creating new full scan" )
336- for file in files :
337- if platform .system () == "Windows" :
338- file = file .replace ("\\ " , "/" )
339- if "/" in file :
340- path , name = file .rsplit ("/" , 1 )
341- else :
342- path = "."
343- name = file
344- full_path = f"{ path } /{ name } "
345- if full_path .startswith (workspace ):
346- key = full_path [len (workspace ):]
347- else :
348- key = full_path
349- key = key .lstrip ("/" )
350- key = key .lstrip ("./" )
351- payload = (
352- key ,
353- (
354- name ,
355- open (full_path , 'rb' )
356- )
357- )
358- send_files .append (payload )
359- query_params = urlencode (params .__dict__ )
360- full_uri = f"{ self .config .full_scan_path } ?{ query_params } "
361- response = self .client .request (full_uri , method = "POST" , files = send_files )
362- results = response .json ()
363- full_scan = FullScan (** results )
307+ params .org_slug = self .config .org_slug
308+ res = self .sdk .fullscans .post (files , params )
309+
310+ # If the response is a string, it's an error message
311+ if isinstance (res , str ):
312+ log .error (f"Error creating full scan: { res } " )
313+ return FullScan ()
314+
315+ full_scan = FullScan (** res )
364316 full_scan .sbom_artifacts = self .get_sbom_data (full_scan .id )
365317 create_full_end = time .time ()
366318 total_time = create_full_end - create_full_start
@@ -380,35 +332,29 @@ def get_license_details(package: Package) -> Package:
380332 def get_head_scan_for_repo (self , repo_slug : str ) -> str :
381333 """Get the head scan ID for a repository"""
382334 print (f"\n Getting head scan for repo: { repo_slug } " )
383- repo_path = f"{ self .config .repository_path } /{ repo_slug } "
384- print (f"Repository path: { repo_path } " )
385335
386- response = self .client . request ( repo_path )
336+ response = self .sdk . repos . repo ( self . config . org_slug , repo_slug )
387337 response_data = response .json ()
388338 print (f"Raw API Response: { response_data } " ) # Debug raw response
389- print (f"Response type: { type (response_data )} " ) # Debug response type
390339
391- if "repository" in response_data :
392- print (f"Repository data: { response_data ['repository' ]} " ) # Debug repository data
393- else :
394- print ("No 'repository' key in response data!" )
340+ if not response_data or "repository" not in response_data :
341+ log .error ("Failed to get repository data from API" )
342+ return ""
395343
396344 repository = Repository (** response_data ["repository" ])
397345 print (f"Created repository object: { repository .__dict__ } " ) # Debug final object
398346
399347 return repository .head_full_scan_id
400348
349+ # TODO: this is the same as get_sbom_data. AND IT CALLS GET_SBOM_DATA. huh?
401350 def get_full_scan (self , full_scan_id : str ) -> FullScan :
402351 """
403352 Get the specified full scan and return a FullScan object
404353 :param full_scan_id: str - ID of the full scan to pull
405354 :return:
406355 """
407- full_scan_url = f"{ self .config .full_scan_path } /{ full_scan_id } "
408- response = self .client .request (full_scan_url )
409- results = response .json ()
356+ results = self .get_sbom_data (full_scan_id )
410357 full_scan = FullScan (** results )
411- full_scan .sbom_artifacts = self .get_sbom_data (full_scan .id )
412358 return full_scan
413359
414360 def create_new_diff (self , path : str , params : FullScanParams , workspace : str , no_change : bool = False ) -> Diff :
0 commit comments