From 0bc250a923385eeeb66f1cd8a1bcdd3cb896a609 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Wed, 18 Nov 2020 17:43:30 -0500 Subject: [PATCH 1/5] WIP on adding symbols inspection --- libcflib/symbol_inspection.py | 43 +++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 libcflib/symbol_inspection.py diff --git a/libcflib/symbol_inspection.py b/libcflib/symbol_inspection.py new file mode 100644 index 0000000..31c23a6 --- /dev/null +++ b/libcflib/symbol_inspection.py @@ -0,0 +1,43 @@ +from pprint import pprint + +import jedi + +import os + +from tqdm import tqdm + + +def file_path_to_import(file_path: str): + return ( + file_path.replace("/__init__.py", "") + .replace(".py", "") + .replace("/", ".") + ) + + +def get_all_symbol_names(top_dir): + symbols_dict = {} + module_import = top_dir.split('/')[-1] + for root, dirs, files in tqdm(os.walk(top_dir)): + _files = [f for f in files if f.endswith('.py')] + for file in _files: + file_name = os.path.join(root, file) + import_name = file_path_to_import(''.join(file_name.rpartition(module_import)[1:])) + data = jedi.Script(path=file_name).complete() + symbols_from_script = {k.full_name: k.type for k in data if k.full_name and module_import+'.' in k.full_name} + + # cull statements within functions and classes, which are not importable + classes_and_functions = {k for k, v in symbols_from_script.items() if v in ['class', 'function']} + for k in list(symbols_from_script): + for cf in classes_and_functions: + if k != cf and k.startswith(cf) and k in symbols_from_script: + symbols_from_script.pop(k) + + symbols_dict[import_name] = set(symbols_from_script) + + symbols = set() + # handle star imports + for k, v in symbols_dict.items(): + symbols.update(v) + symbols.update({f"{k}.{vv.rsplit('.', 1)[-1]}" for vv in v}) + return symbols From a442166fc9f207ff67b589db254a82df68c07988 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Mon, 30 Nov 2020 18:07:05 -0500 Subject: [PATCH 2/5] add test for symbol extraction code --- libcflib/symbol_inspection.py | 34 ++++++++++++++++++++------------- tests/test_symbol_inspection.py | 10 ++++++++++ 2 files changed, 31 insertions(+), 13 deletions(-) create mode 100644 tests/test_symbol_inspection.py diff --git a/libcflib/symbol_inspection.py b/libcflib/symbol_inspection.py index 31c23a6..d87ec01 100644 --- a/libcflib/symbol_inspection.py +++ b/libcflib/symbol_inspection.py @@ -1,5 +1,3 @@ -from pprint import pprint - import jedi import os @@ -8,26 +6,36 @@ def file_path_to_import(file_path: str): - return ( - file_path.replace("/__init__.py", "") - .replace(".py", "") - .replace("/", ".") - ) + return file_path.replace("/__init__.py", "").replace(".py", "").replace("/", ".") def get_all_symbol_names(top_dir): + # Note Jedi seems to pick up things that are protected by a + # __name__ == '__main__' if statement + # this could cause some over-reporting of viable imports this + # shouldn't cause issues with an audit since we don't expect 3rd parties + # to depend on those symbols_dict = {} - module_import = top_dir.split('/')[-1] + module_import = top_dir.split("/")[-1] + # walk all the files looking for python files for root, dirs, files in tqdm(os.walk(top_dir)): - _files = [f for f in files if f.endswith('.py')] + _files = [f for f in files if f.endswith(".py")] for file in _files: file_name = os.path.join(root, file) - import_name = file_path_to_import(''.join(file_name.rpartition(module_import)[1:])) + import_name = file_path_to_import( + "".join(file_name.rpartition(module_import)[1:]) + ) data = jedi.Script(path=file_name).complete() - symbols_from_script = {k.full_name: k.type for k in data if k.full_name and module_import+'.' in k.full_name} + symbols_from_script = { + k.full_name: k.type + for k in data + if k.full_name and module_import + "." in k.full_name + } # cull statements within functions and classes, which are not importable - classes_and_functions = {k for k, v in symbols_from_script.items() if v in ['class', 'function']} + classes_and_functions = { + k for k, v in symbols_from_script.items() if v in ["class", "function"] + } for k in list(symbols_from_script): for cf in classes_and_functions: if k != cf and k.startswith(cf) and k in symbols_from_script: @@ -36,7 +44,7 @@ def get_all_symbol_names(top_dir): symbols_dict[import_name] = set(symbols_from_script) symbols = set() - # handle star imports + # handle star imports, which don't usually get added but are valid symbols for k, v in symbols_dict.items(): symbols.update(v) symbols.update({f"{k}.{vv.rsplit('.', 1)[-1]}" for vv in v}) diff --git a/tests/test_symbol_inspection.py b/tests/test_symbol_inspection.py new file mode 100644 index 0000000..9e11da2 --- /dev/null +++ b/tests/test_symbol_inspection.py @@ -0,0 +1,10 @@ +from pathlib import Path + +from libcflib.symbol_inspection import get_all_symbol_names + + +def test_get_all_symbol_names(): + top_dir = Path(__file__).parent / Path("..") / Path("libcflib") + assert "libcflib.symbol_inspection.get_all_symbol_names" in get_all_symbol_names( + str(top_dir) + ) From 760ac9c527735266658bde6fdefa0141a7562f36 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Mon, 30 Nov 2020 19:41:03 -0500 Subject: [PATCH 3/5] use reap system --- libcflib/preloader.py | 6 ++-- libcflib/symbol_inspection.py | 55 +++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/libcflib/preloader.py b/libcflib/preloader.py index 4dda176..266bdb8 100644 --- a/libcflib/preloader.py +++ b/libcflib/preloader.py @@ -122,16 +122,16 @@ def reap_package(root_path, package, dst_path, src_url, progress_callback=None): return harvested_data -def reap(path, known_bad_packages=()): +def reap(path, known_bad_packages=(), reap_function=reap_package, number_to_reap=1000): sorted_files = list(diff(path)) print(f"TOTAL OUTSTANDING ARTIFACTS: {len(sorted_files)}") - sorted_files = sorted_files[:1000] + sorted_files = sorted_files[:number_to_reap] progress = tqdm.tqdm(total=len(sorted_files)) with ThreadPoolExecutor(max_workers=20) as pool: futures = [ pool.submit( - reap_package, + reap_function, path, package, dst, diff --git a/libcflib/symbol_inspection.py b/libcflib/symbol_inspection.py index d87ec01..d0dbeff 100644 --- a/libcflib/symbol_inspection.py +++ b/libcflib/symbol_inspection.py @@ -1,9 +1,20 @@ +import glob +import io +import json +import tarfile +from tempfile import TemporaryDirectory + import jedi import os +import requests from tqdm import tqdm +from libcflib.harvester import harvest +from libcflib.preloader import reap, ReapFailure +from libcflib.tools import expand_file_and_mkdirs + def file_path_to_import(file_path: str): return file_path.replace("/__init__.py", "").replace(".py", "").replace("/", ".") @@ -49,3 +60,47 @@ def get_all_symbol_names(top_dir): symbols.update(v) symbols.update({f"{k}.{vv.rsplit('.', 1)[-1]}" for vv in v}) return symbols + + +def harvest_imports(io_like): + tf = tarfile.open(fileobj=io_like, mode="r:bz2") + with TemporaryDirectory() as f: + tf.extractall(path=f) + return list(get_all_symbol_names(os.path.join(f, 'site-packages'))) + + +def reap_imports(root_path, package, dst_path, src_url, progress_callback=None): + if progress_callback: + progress_callback() + try: + resp = requests.get(src_url, timeout=60 * 2) + filelike = io.BytesIO(resp.content) + harvested_data = harvest_imports(filelike) + with open( + expand_file_and_mkdirs(os.path.join(root_path, package, dst_path)), "w" + ) as fo: + json.dump(harvested_data, fo, indent=1, sort_keys=True) + except Exception as e: + raise ReapFailure(package, src_url, str(e)) + return harvested_data + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("root_path") + parser.add_argument( + "--known-bad-packages", + help="name of a json file containing a list of urls to be skipped", + ) + + args = parser.parse_args() + print(args) + if args.known_bad_packages: + with open(args.known_bad_packages, "r") as fo: + known_bad_packages = set(json.load(fo)) + else: + known_bad_packages = set() + + reap(args.root_path, known_bad_packages, reap_imports, number_to_reap=10) From 312cb35d21b4968bf9c7643f1f86350aacc470e3 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Sun, 20 Dec 2020 16:30:04 -0500 Subject: [PATCH 4/5] WIP use dask for multiprocessing --- libcflib/symbol_inspection.py | 228 ++++++++++++++++++++++++++++------ 1 file changed, 187 insertions(+), 41 deletions(-) diff --git a/libcflib/symbol_inspection.py b/libcflib/symbol_inspection.py index d0dbeff..66eb024 100644 --- a/libcflib/symbol_inspection.py +++ b/libcflib/symbol_inspection.py @@ -1,89 +1,236 @@ +import contextlib import glob import io import json +import os import tarfile +import traceback +from concurrent.futures._base import as_completed +from concurrent.futures.process import ProcessPoolExecutor +from concurrent.futures.thread import ThreadPoolExecutor from tempfile import TemporaryDirectory -import jedi - -import os import requests +from libcflib.tools import expand_file_and_mkdirs from tqdm import tqdm -from libcflib.harvester import harvest -from libcflib.preloader import reap, ReapFailure -from libcflib.tools import expand_file_and_mkdirs +from libcflib.preloader import ReapFailure, diff +import psutil + + +@contextlib.contextmanager +def executor(kind: str, max_workers: int, daemon=True): + """General purpose utility to get an executor with its as_completed handler + + This allows us to easily use other executors as needed. + """ + if kind == "thread": + with ThreadPoolExecutor(max_workers=max_workers) as pool_t: + yield pool_t + elif kind == "process": + with ProcessPoolExecutor(max_workers=max_workers) as pool_p: + yield pool_p + elif kind in ["dask", "dask-process", "dask-thread"]: + import dask + import distributed + from distributed.cfexecutor import ClientExecutor + + processes = kind == "dask" or kind == "dask-process" + + with dask.config.set({"distributed.worker.daemon": daemon}): + with distributed.LocalCluster( + n_workers=max_workers, + processes=processes, + ) as cluster: + with distributed.Client(cluster) as client: + yield ClientExecutor(client) + else: + raise NotImplementedError("That kind is not implemented") def file_path_to_import(file_path: str): return file_path.replace("/__init__.py", "").replace(".py", "").replace("/", ".") +class bla: + full_name='hi' + type='hi' + + def get_all_symbol_names(top_dir): + import jedi.settings + + jedi.settings.fast_parser = False + + from jedi.cache import clear_time_caches + import jedi # Note Jedi seems to pick up things that are protected by a # __name__ == '__main__' if statement # this could cause some over-reporting of viable imports this # shouldn't cause issues with an audit since we don't expect 3rd parties # to depend on those symbols_dict = {} - module_import = top_dir.split("/")[-1] + errors_dict = {} # walk all the files looking for python files - for root, dirs, files in tqdm(os.walk(top_dir)): - _files = [f for f in files if f.endswith(".py")] - for file in _files: - file_name = os.path.join(root, file) - import_name = file_path_to_import( - "".join(file_name.rpartition(module_import)[1:]) - ) - data = jedi.Script(path=file_name).complete() - symbols_from_script = { - k.full_name: k.type - for k in data - if k.full_name and module_import + "." in k.full_name - } - - # cull statements within functions and classes, which are not importable - classes_and_functions = { - k for k, v in symbols_from_script.items() if v in ["class", "function"] + glob_glob = glob.glob(f'{top_dir}/**/*.py', recursive=True) + for file_name in [k for k in glob_glob]: + # TODO: check for `__init__.py` existence or that the file is top level + folder_path = file_name.rpartition(top_dir + '/')[-1] + import_name = file_path_to_import(folder_path) + module_import = import_name.split('.')[0] + try: + data = jedi.Script(path=file_name, project=jedi.Project(''.join(top_dir))).complete() + # data = [bla] * 100 + except Exception as e: + print(import_name) + errors_dict[import_name] = { + "exception": str(e), + "traceback": str(traceback.format_exc()).split( + "\n", + ), } - for k in list(symbols_from_script): - for cf in classes_and_functions: - if k != cf and k.startswith(cf) and k in symbols_from_script: - symbols_from_script.pop(k) - - symbols_dict[import_name] = set(symbols_from_script) + data = [] + + symbols_from_script = { + k.full_name: k.type + for k in data + # Checks that the symbol has a name and comes from the pkg in question + if k.full_name and module_import + "." in k.full_name + } + + # cull statements within functions and classes, which are not importable + classes_and_functions = { + k for k, v in symbols_from_script.items() if v in ["class", "function"] + } + for k in list(symbols_from_script): + for cf in classes_and_functions: + if k != cf and k.startswith(cf) and k in symbols_from_script: + symbols_from_script.pop(k) + + symbols_dict[import_name] = set(symbols_from_script) + del data + del symbols_from_script + + # try to fix bad jedi memory leak + clear_time_caches(True) symbols = set() # handle star imports, which don't usually get added but are valid symbols for k, v in symbols_dict.items(): symbols.update(v) symbols.update({f"{k}.{vv.rsplit('.', 1)[-1]}" for vv in v}) - return symbols + del symbols_dict + return symbols, errors_dict def harvest_imports(io_like): tf = tarfile.open(fileobj=io_like, mode="r:bz2") + # TODO: push dir allocation into thread with TemporaryDirectory() as f: tf.extractall(path=f) - return list(get_all_symbol_names(os.path.join(f, 'site-packages'))) - - -def reap_imports(root_path, package, dst_path, src_url, progress_callback=None): + symbols = set() + errors = {} + found_sp = False + for root, subdirs, files in os.walk(f): + if root.lower().endswith('site-packages'): + found_sp = True + _symbols, _errors = get_all_symbol_names(root) + symbols.update(_symbols) + errors.update(_errors) + tf.close() + output = dict(errors=errors, symbols=sorted(symbols)) + if not found_sp: + return None + return output + + +def reap_imports(root_path, package, dst_path, src_url, + filelike, + progress_callback=None): if progress_callback: progress_callback() try: - resp = requests.get(src_url, timeout=60 * 2) - filelike = io.BytesIO(resp.content) harvested_data = harvest_imports(filelike) with open( expand_file_and_mkdirs(os.path.join(root_path, package, dst_path)), "w" ) as fo: json.dump(harvested_data, fo, indent=1, sort_keys=True) + del harvested_data except Exception as e: raise ReapFailure(package, src_url, str(e)) - return harvested_data - + # return harvested_data + + +def fetch_artifact(src_url): + resp = requests.get(src_url, timeout=60 * 2) + filelike = io.BytesIO(resp.content) + return filelike + + +def fetch_and_run(path, pkg, dst, src_url, progess_callback=None): + print('hi') + filelike = fetch_artifact(src_url) + print('fetched') + reap_imports(path, pkg, dst, src_url, filelike, progress_callback=progess_callback) + print('reaped') + + +def reap(path, known_bad_packages=(), reap_function=reap_imports, number_to_reap=1000, + ): + if not os.path.exists(path): + os.makedirs(path) + sorted_files = list(diff(path)) + print(f"TOTAL OUTSTANDING ARTIFACTS: {len(sorted_files)}") + sorted_files = sorted_files[:number_to_reap] + # progress = tqdm(total=len(sorted_files)) + + # with ThreadPoolExecutor(max_workers=20) as pool: + # futures = {pool.submit(fetch_artifact, src_url): (package, dst, src_url) + # for package, dst, src_url in sorted_files + # if (src_url not in known_bad_packages)} + # for f in as_completed(futures): + # try: + # initial = psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2 + # package, dst, src_url = futures.pop(f) + # reap_function(path, package, dst, src_url, f.result(), + # progress_callback=progress.update, + # ) + # del f + # print(initial, psutil.Process(os.getpid()).memory_info().rss / 1024 ** 2) + # except ReapFailure as e: + # print(f"FAILURE {e.args}") + # except Exception: + # pass + + with executor(max_workers=3, kind='dask') as pool: + futures = {pool.submit(fetch_and_run, path, package, dst, src_url, + # progress.update + ): (package, dst, src_url) + for package, dst, src_url in sorted_files + if (src_url not in known_bad_packages)} + for f in as_completed(futures): + try: + initial = psutil.virtual_memory().available / 1024 ** 2 + f.result() + print(initial, psutil.virtual_memory().available / 1024 **2) + except ReapFailure as e: + print(f"FAILURE {e.args}") + except Exception: + pass + + # for package, dst, src_url in sorted_files: + # if (src_url in known_bad_packages): + # continue + # f = fetch_artifact(src_url) + # try: + # reap_function(path, package, dst, src_url, f, + # progress_callback=progress.update, + # ) + # except ReapFailure as e: + # print(f"FAILURE {e.args}") + # except Exception: + # pass if __name__ == "__main__": import argparse @@ -102,5 +249,4 @@ def reap_imports(root_path, package, dst_path, src_url, progress_callback=None): known_bad_packages = set(json.load(fo)) else: known_bad_packages = set() - - reap(args.root_path, known_bad_packages, reap_imports, number_to_reap=10) + reap(args.root_path, known_bad_packages, reap_imports, number_to_reap=100) From 15a4beae252688656ec6b9c61a29cd6da9206975 Mon Sep 17 00:00:00 2001 From: "Christopher J. Wright" Date: Fri, 1 Jan 2021 14:09:39 -0500 Subject: [PATCH 5/5] use dask so that memory leaks are handled --- libcflib/symbol_inspection.py | 109 ++++++++++++++++------------------ 1 file changed, 52 insertions(+), 57 deletions(-) diff --git a/libcflib/symbol_inspection.py b/libcflib/symbol_inspection.py index 66eb024..6e347d9 100644 --- a/libcflib/symbol_inspection.py +++ b/libcflib/symbol_inspection.py @@ -53,18 +53,51 @@ def file_path_to_import(file_path: str): return file_path.replace("/__init__.py", "").replace(".py", "").replace("/", ".") -class bla: - full_name='hi' - type='hi' +def single_file_extraction(file_name, top_dir): + from jedi.cache import clear_time_caches + import jedi + symbols_dict = {} + errors_dict = {} + # TODO: check for `__init__.py` existence or that the file is top level + folder_path = file_name.rpartition(top_dir + '/')[-1] + import_name = file_path_to_import(folder_path) + module_import = import_name.split('.')[0] + try: + data = jedi.Script(path=file_name, project=jedi.Project(''.join(top_dir))).complete() + except Exception as e: + print(import_name, str(e)) + errors_dict[import_name] = { + "exception": str(e), + "traceback": str(traceback.format_exc()).split( + "\n", + ), + } + data = [] + + symbols_from_script = { + k.full_name: k.type + for k in data + # Checks that the symbol has a name and comes from the pkg in question + if k.full_name and module_import + "." in k.full_name + } + + # cull statements within functions and classes, which are not importable + classes_and_functions = { + k for k, v in symbols_from_script.items() if v in ["class", "function"] + } + for k in list(symbols_from_script): + for cf in classes_and_functions: + if k != cf and k.startswith(cf) and k in symbols_from_script: + symbols_from_script.pop(k) + + symbols_dict[import_name] = set(symbols_from_script) + del data + del symbols_from_script + clear_time_caches(True) + return symbols_dict, errors_dict def get_all_symbol_names(top_dir): - import jedi.settings - - jedi.settings.fast_parser = False - - from jedi.cache import clear_time_caches - import jedi # Note Jedi seems to pick up things that are protected by a # __name__ == '__main__' if statement # this could cause some over-reporting of viable imports this @@ -75,45 +108,9 @@ def get_all_symbol_names(top_dir): # walk all the files looking for python files glob_glob = glob.glob(f'{top_dir}/**/*.py', recursive=True) for file_name in [k for k in glob_glob]: - # TODO: check for `__init__.py` existence or that the file is top level - folder_path = file_name.rpartition(top_dir + '/')[-1] - import_name = file_path_to_import(folder_path) - module_import = import_name.split('.')[0] - try: - data = jedi.Script(path=file_name, project=jedi.Project(''.join(top_dir))).complete() - # data = [bla] * 100 - except Exception as e: - print(import_name) - errors_dict[import_name] = { - "exception": str(e), - "traceback": str(traceback.format_exc()).split( - "\n", - ), - } - data = [] - - symbols_from_script = { - k.full_name: k.type - for k in data - # Checks that the symbol has a name and comes from the pkg in question - if k.full_name and module_import + "." in k.full_name - } - - # cull statements within functions and classes, which are not importable - classes_and_functions = { - k for k, v in symbols_from_script.items() if v in ["class", "function"] - } - for k in list(symbols_from_script): - for cf in classes_and_functions: - if k != cf and k.startswith(cf) and k in symbols_from_script: - symbols_from_script.pop(k) - - symbols_dict[import_name] = set(symbols_from_script) - del data - del symbols_from_script - - # try to fix bad jedi memory leak - clear_time_caches(True) + sd, ed = single_file_extraction(file_name, top_dir) + symbols_dict.update(sd) + errors_dict.update(ed) symbols = set() # handle star imports, which don't usually get added but are valid symbols @@ -126,9 +123,9 @@ def get_all_symbol_names(top_dir): def harvest_imports(io_like): tf = tarfile.open(fileobj=io_like, mode="r:bz2") - # TODO: push dir allocation into thread + # TODO: push dir allocation into thread? with TemporaryDirectory() as f: - tf.extractall(path=f) + tf.extractall(path=f, members=[m for m in tf.getmembers() if m.name.endswith('.py')]) symbols = set() errors = {} found_sp = False @@ -169,11 +166,11 @@ def fetch_artifact(src_url): def fetch_and_run(path, pkg, dst, src_url, progess_callback=None): - print('hi') + print(f'starting {pkg}') filelike = fetch_artifact(src_url) - print('fetched') reap_imports(path, pkg, dst, src_url, filelike, progress_callback=progess_callback) - print('reaped') + filelike.close() + print(f'reaped {pkg}') def reap(path, known_bad_packages=(), reap_function=reap_imports, number_to_reap=1000, @@ -203,17 +200,15 @@ def reap(path, known_bad_packages=(), reap_function=reap_imports, number_to_reap # except Exception: # pass - with executor(max_workers=3, kind='dask') as pool: + with executor(max_workers=5, kind='dask') as pool: futures = {pool.submit(fetch_and_run, path, package, dst, src_url, # progress.update ): (package, dst, src_url) for package, dst, src_url in sorted_files if (src_url not in known_bad_packages)} - for f in as_completed(futures): + for f in tqdm(as_completed(futures), total=len(sorted_files)): try: - initial = psutil.virtual_memory().available / 1024 ** 2 f.result() - print(initial, psutil.virtual_memory().available / 1024 **2) except ReapFailure as e: print(f"FAILURE {e.args}") except Exception: