diff --git a/.github/workflows/production.yml b/.github/workflows/production.yml
index c50aefd5af..0ea61ea349 100644
--- a/.github/workflows/production.yml
+++ b/.github/workflows/production.yml
@@ -150,7 +150,8 @@ jobs:
         run: |
           srun ${SRUN_COMMON} bash -e -s <<'EOF'
           source /venv/bin/activate
-          pytest --print -x -m "benchmarks" ./tests
+          pytest --mem-monitoring-filepath "/mnt/data/artifacts/mem_test_${SLURM_JOB_NAME}.csv" \
+            --print -x -m "benchmarks" ./tests
           cat speed_test*.txt > "/mnt/data/artifacts/speed_test_${SLURM_JOB_NAME}.txt"
           EOF

@@ -168,3 +169,8 @@ jobs:
         with:
           name: speed-test-${{ matrix.GS_ENABLE_NDARRAY }}
          path: "/mnt/data/artifacts/speed_test_${{ env.SLURM_JOB_NAME }}.txt"
+      - name: Upload benchmark mem stats as artifact
+        uses: actions/upload-artifact@v4
+        with:
+          name: mem-test-${{ matrix.GS_ENABLE_NDARRAY }}
+          path: "/mnt/data/artifacts/mem_test_${{ env.SLURM_JOB_NAME }}.csv"
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 6e3c821047..8ce0113fb5 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -75,6 +75,7 @@ dev = [
     "pytest-print",
     # - 16.0 is causing pytest-xdist to crash in case of failure or skipped tests
     "pytest-rerunfailures!=16.0",
+    "setproctitle",  # allows renaming the test processes on the cluster
     "syrupy",
     "huggingface_hub[hf_xet]",
     "wandb",
diff --git a/tests/conftest.py b/tests/conftest.py
index a87ec77738..77e8303cb6 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -5,11 +5,13 @@
 import os
 import re
 import subprocess
+from argparse import SUPPRESS
 import sys
 from enum import Enum
 from io import BytesIO
 from pathlib import Path

+import setproctitle
 import psutil
 import pyglet
 import pytest
@@ -56,6 +58,15 @@
 IMG_NUM_ERR_THR = 0.001


+def is_mem_monitoring_supported():
+    try:
+        assert sys.platform.startswith("linux")
+        subprocess.check_output(["nvidia-smi"], stderr=subprocess.STDOUT, timeout=2)
+        return True, None
+    except Exception as exc:  # platform or nvidia-smi unavailable
+        return False, exc
+
+
 def pytest_make_parametrize_id(config, val, argname):
     if isinstance(val, Enum):
         return val.name
@@ -64,7 +75,7 @@
         return f"{val}"

-@pytest.hookimpl
+@pytest.hookimpl(tryfirst=True)
 def pytest_cmdline_main(config: pytest.Config) -> None:
     # Make sure that no unsupported markers have been specified in CLI
     declared_markers = set(name for spec in config.getini("markers") if (name := spec.split(":")[0]) != "forked")
     try:
@@ -73,6 +84,22 @@
     except NameError as e:
         raise pytest.UsageError(f"Unknown marker in CLI expression: '{e.name}'")

+    # Only launch memory monitor from the main process, not from xdist workers
+    mem_filepath = config.getoption("--mem-monitoring-filepath")
+    if mem_filepath and not os.environ.get("PYTEST_XDIST_WORKER"):
+        supported, reason = is_mem_monitoring_supported()
+        if not supported:
+            raise pytest.UsageError(f"--mem-monitoring-filepath is not supported on this platform: {reason}")
+        subprocess.Popen(
+            [
+                sys.executable,
+                "tests/monitor_test_mem.py",
+                "--die-with-parent",
+                "--out-csv-filepath",
+                mem_filepath,
+            ]
+        )
+
     # Make sure that benchmarks are running on GPU and the number of workers if valid
     expr = Expression.compile(config.option.markexpr)
     is_benchmarks = expr.evaluate(MarkMatcher.from_markers((pytest.mark.benchmarks,)))
@@ -328,7 +355,12 @@ def pytest_collection_modifyitems(config, items):
     items[:] = [item for bucket in sorted(buckets, key=len) for item in bucket]


+@pytest.hookimpl(tryfirst=True)
 def pytest_runtest_setup(item):
+    # Include test name in process title
+    test_name = item.nodeid.replace(" ", "")
+    setproctitle.setproctitle(f"pytest: {test_name}")
+
     # Match CUDA device with EGL device.
     # Note that this must be done here instead of 'pytest_cmdline_main', otherwise it will segfault when using
     # 'pytest-forked', because EGL instances are not allowed to cross thread boundaries.
@@ -348,6 +380,13 @@ def pytest_addoption(parser):
     )
     parser.addoption("--vis", action="store_true", default=False, help="Enable interactive viewer.")
     parser.addoption("--dev", action="store_true", default=False, help="Enable genesis debug mode.")
+    supported, _reason = is_mem_monitoring_supported()
+    help_text = (
+        "Run memory monitoring, and store results to mem_monitoring_filepath. CUDA on linux ONLY."
+        if supported
+        else SUPPRESS
+    )
+    parser.addoption("--mem-monitoring-filepath", type=str, help=help_text)


 @pytest.fixture(scope="session")
diff --git a/tests/monitor_test_mem.py b/tests/monitor_test_mem.py
new file mode 100644
index 0000000000..f5ddda07ab
--- /dev/null
+++ b/tests/monitor_test_mem.py
@@ -0,0 +1,110 @@
+from collections import defaultdict
+import csv
+import subprocess
+import time
+import os
+import argparse
+import psutil
+
+
+def grep(contents: list[str], target):
+    return [l for l in contents if target in l]
+
+
+def get_cuda_usage() -> dict[int, int]:
+    output = subprocess.check_output(["nvidia-smi"]).decode("utf-8")
+    section = 0
+    subsec = 0
+    res = {}
+    for line in output.split("\n"):
+        if line.startswith("|============"):
+            section += 1
+            subsec = 0
+            continue
+        if line.startswith("+-------"):
+            subsec += 1
+            continue
+        if section == 2 and subsec == 0:
+            if "No running processes" in line:
+                continue
+            split_line = line.split()
+            pid = int(split_line[4])
+            mem = int(split_line[-2].split("MiB")[0])
+            res[pid] = mem
+    return res
+
+
+def get_test_name_by_pid() -> dict[int, str]:
+    test_by_psid = {}
+    for proc in psutil.process_iter(["pid", "cmdline"]):
+        try:
+            cmdline = proc.info["cmdline"]
+            if cmdline is None:
+                continue
+            # Join cmdline to get full command string
+            cmd_str = " ".join(cmdline)
+            if "pytest: tests" in cmd_str:
+                # Find the test name after "::"
+                if "::" in cmd_str:
+                    test_name = cmd_str.partition("::")[2]
+                    if test_name.strip() != "":
+                        test_by_psid[proc.info["pid"]] = test_name
+        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
+            # Process may have terminated or we don't have permission
+            pass
+    return test_by_psid
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--out-csv-filepath", type=str, required=True)
+    parser.add_argument("--die-with-parent", action="store_true")
+    args = parser.parse_args()
+
+    max_mem_by_test = defaultdict(int)
+
+    f = open(args.out_csv_filepath, "w")
+    dict_writer = csv.DictWriter(f, fieldnames=["test", "max_mem_mb"])
+    dict_writer.writeheader()
+    old_mem_by_test = {}
+    num_results_written = 0
+    disp = False
+    while not args.die_with_parent or os.getppid() != 1:
+        mem_by_pid = get_cuda_usage()
+        test_by_psid = get_test_name_by_pid()
+        num_tests = len(test_by_psid)
+        _mem_by_test = {}
+        for psid, test in test_by_psid.items():
+            if psid not in mem_by_pid:
+                continue
+            if test.strip() == "":
+                continue
+            _mem = mem_by_pid[psid]
+            _mem_by_test[test] = _mem
+        for test, _mem in _mem_by_test.items():
+            max_mem_by_test[test] = max(_mem, max_mem_by_test[test])
+        for _test, _mem in old_mem_by_test.items():
+            if _test not in _mem_by_test:
+                dict_writer.writerow({"test": _test, "max_mem_mb": max_mem_by_test[_test]})
+                f.flush()
+                num_results_written += 1
+        spinny = "x" if disp else "+"
+        print(
+            num_tests,
+            "tests running, of which",
+            len(_mem_by_test),
+            "on gpu. Num results written: ",
+            num_results_written,
+            "[updating]",
+            spinny,
+            end="\r",
+            flush=True,
+        )
+        old_mem_by_test = _mem_by_test
+        disp = not disp
+        time.sleep(2.0)
+    print("Test monitor exiting")
+
+
+if __name__ == "__main__":
+    main()
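
For reference (a sketch, not part of the diff): the monitor writes one row per finished test to the CSV given via --out-csv-filepath / --mem-monitoring-filepath, with columns "test" and "max_mem_mb" (values parsed from nvidia-smi, in MiB). A minimal way to inspect the resulting artifact locally might look like the following; the path /tmp/mem_test.csv and the top-10 sorting are illustrative only:

    import csv

    # Load the rows written by monitor_test_mem.py and sort by recorded peak GPU memory
    with open("/tmp/mem_test.csv", newline="") as f:
        rows = sorted(csv.DictReader(f), key=lambda r: int(r["max_mem_mb"]), reverse=True)
    for row in rows[:10]:
        # Print the ten tests with the highest recorded GPU memory usage
        print(f'{row["max_mem_mb"]:>8} MiB  {row["test"]}')

Locally, the same option used in the workflow can be passed directly, e.g. pytest --mem-monitoring-filepath /tmp/mem_test.csv -x -m "benchmarks" ./tests (Linux with nvidia-smi available only).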