diff --git a/Lib/profiling/sampling/binary_reader.py b/Lib/profiling/sampling/binary_reader.py index 50c96668cc585b..13b10812d13052 100644 --- a/Lib/profiling/sampling/binary_reader.py +++ b/Lib/profiling/sampling/binary_reader.py @@ -1,5 +1,11 @@ """Thin Python wrapper around C binary reader for profiling data.""" +import _remote_debugging + +from .gecko_collector import GeckoCollector +from .stack_collector import FlamegraphCollector, CollapsedStackCollector +from .pstats_collector import PStatsCollector + class BinaryReader: """High-performance binary reader using C implementation. @@ -23,7 +29,6 @@ def __init__(self, filename): self._reader = None def __enter__(self): - import _remote_debugging self._reader = _remote_debugging.BinaryReader(self.filename) return self @@ -99,10 +104,6 @@ def convert_binary_to_format(input_file, output_file, output_format, Returns: int: Number of samples converted """ - from .gecko_collector import GeckoCollector - from .stack_collector import FlamegraphCollector, CollapsedStackCollector - from .pstats_collector import PStatsCollector - with BinaryReader(input_file) as reader: info = reader.get_info() interval = sample_interval_usec or info['sample_interval_us'] diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py index e43925ea8595f0..2976447195e3af 100644 --- a/Lib/profiling/sampling/cli.py +++ b/Lib/profiling/sampling/cli.py @@ -35,6 +35,8 @@ SORT_MODE_NSAMPLES_CUMUL, ) +from ._child_monitor import ChildProcessMonitor + try: from .live_collector import LiveStatsCollector except ImportError: @@ -93,8 +95,6 @@ class CustomFormatter( } def _setup_child_monitor(args, parent_pid): - from ._child_monitor import ChildProcessMonitor - # Build CLI args for child profilers (excluding --subprocesses to avoid recursion) child_cli_args = _build_child_profiler_args(args) @@ -1123,8 +1123,6 @@ def _handle_live_run(args): def _handle_replay(args): """Handle the 'replay' command - convert binary profile to another format.""" - import os - if not os.path.exists(args.input_file): sys.exit(f"Error: Input file not found: {args.input_file}") diff --git a/Lib/profiling/sampling/heatmap_collector.py b/Lib/profiling/sampling/heatmap_collector.py index bb810fa485be63..3793c8f3762032 100644 --- a/Lib/profiling/sampling/heatmap_collector.py +++ b/Lib/profiling/sampling/heatmap_collector.py @@ -18,6 +18,7 @@ from ._css_utils import get_combined_css from ._format_utils import fmt from .collector import normalize_location, extract_lineno +from .opcode_utils import get_opcode_info, format_opcode from .stack_collector import StackTraceCollector @@ -634,8 +635,6 @@ def _get_bytecode_data_for_line(self, filename, lineno): Returns: List of dicts with instruction info, sorted by samples descending """ - from .opcode_utils import get_opcode_info, format_opcode - key = (filename, lineno) opcode_data = self.line_opcodes.get(key, {}) @@ -1038,8 +1037,6 @@ def _render_source_with_highlights(self, line_content: str, line_num: int, Simple: collect ranges with sample counts, assign each byte position to smallest covering range, then emit spans for contiguous runs with sample data. """ - import html as html_module - content = line_content.rstrip('\n') if not content: return '' @@ -1062,7 +1059,7 @@ def _render_source_with_highlights(self, line_content: str, line_num: int, range_data[key]['opcodes'].append(opname) if not range_data: - return html_module.escape(content) + return html.escape(content) # For each byte position, find the smallest covering range byte_to_range = {} @@ -1090,7 +1087,7 @@ def _render_source_with_highlights(self, line_content: str, line_num: int, def flush_span(): nonlocal span_chars, current_range if span_chars: - text = html_module.escape(''.join(span_chars)) + text = html.escape(''.join(span_chars)) if current_range: data = range_data.get(current_range, {'samples': 0, 'opcodes': []}) samples = data['samples'] @@ -1104,7 +1101,7 @@ def flush_span(): f'data-samples="{samples}" ' f'data-max-samples="{max_range_samples}" ' f'data-pct="{pct}" ' - f'data-opcodes="{html_module.escape(opcodes)}">{text}') + f'data-opcodes="{html.escape(opcodes)}">{text}') else: result.append(text) span_chars = [] diff --git a/Lib/profiling/sampling/live_collector/collector.py b/Lib/profiling/sampling/live_collector/collector.py index b31ab060a6b934..620abe4e2382d3 100644 --- a/Lib/profiling/sampling/live_collector/collector.py +++ b/Lib/profiling/sampling/live_collector/collector.py @@ -932,8 +932,6 @@ def _show_terminal_size_warning_and_wait(self, height, width): def _handle_input(self): """Handle keyboard input (non-blocking).""" - from . import constants - self.display.set_nodelay(True) ch = self.display.get_input() diff --git a/Lib/profiling/sampling/live_collector/widgets.py b/Lib/profiling/sampling/live_collector/widgets.py index cf04f3aa3254ef..069f05ffd65bea 100644 --- a/Lib/profiling/sampling/live_collector/widgets.py +++ b/Lib/profiling/sampling/live_collector/widgets.py @@ -31,6 +31,7 @@ PROFILING_MODE_GIL, PROFILING_MODE_WALL, ) +from ..opcode_utils import get_opcode_info, format_opcode class Widget(ABC): @@ -1007,8 +1008,6 @@ def render(self, line, width, **kwargs): Returns: Next available line number """ - from ..opcode_utils import get_opcode_info, format_opcode - stats_list = kwargs.get("stats_list", []) height = kwargs.get("height", 24) selected_row = self.collector.selected_row diff --git a/Lib/profiling/sampling/pstats_collector.py b/Lib/profiling/sampling/pstats_collector.py index e0dc9ab6bb7edb..6be1d698ffaa9a 100644 --- a/Lib/profiling/sampling/pstats_collector.py +++ b/Lib/profiling/sampling/pstats_collector.py @@ -1,9 +1,10 @@ import collections import marshal +import pstats from _colorize import ANSIColors from .collector import Collector, extract_lineno -from .constants import MICROSECONDS_PER_SECOND +from .constants import MICROSECONDS_PER_SECOND, PROFILING_MODE_CPU class PstatsCollector(Collector): @@ -86,9 +87,6 @@ def create_stats(self): def print_stats(self, sort=-1, limit=None, show_summary=True, mode=None): """Print formatted statistics to stdout.""" - import pstats - from .constants import PROFILING_MODE_CPU - # Create stats object stats = pstats.SampledStats(self).strip_dirs() if not stats.stats: diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py index e73306ebf290e7..139c116a825ce2 100644 --- a/Lib/profiling/sampling/sample.py +++ b/Lib/profiling/sampling/sample.py @@ -1,5 +1,6 @@ import _remote_debugging import contextlib +import curses import os import statistics import sys @@ -459,8 +460,6 @@ def sample_live( Returns: The collector with collected samples """ - import curses - # Check if process is alive before doing any heavy initialization if not _is_process_running(pid): print(f"No samples collected - process {pid} exited before profiling could begin.", file=sys.stderr) diff --git a/Lib/profiling/sampling/stack_collector.py b/Lib/profiling/sampling/stack_collector.py index 55e643d0e9c8cb..4e213cfe41ca24 100644 --- a/Lib/profiling/sampling/stack_collector.py +++ b/Lib/profiling/sampling/stack_collector.py @@ -6,6 +6,7 @@ import linecache import os import sys +import sysconfig from ._css_utils import get_combined_css from .collector import Collector, extract_lineno @@ -244,7 +245,6 @@ def convert_children(children, min_samples): } # Calculate thread status percentages for display - import sysconfig is_free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED")) total_threads = max(1, self.thread_status_counts["total"]) thread_stats = { diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py index bcd4de7f5d7ebe..11b1ad84242fd4 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py @@ -11,6 +11,8 @@ import _remote_debugging # noqa: F401 import profiling.sampling import profiling.sampling.sample + from profiling.sampling.pstats_collector import PstatsCollector + from profiling.sampling.stack_collector import CollapsedStackCollector except ImportError: raise unittest.SkipTest( "Test only runs when _remote_debugging is available" @@ -61,7 +63,6 @@ def test_gc_frames_enabled(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.pstats_collector import PstatsCollector collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, @@ -88,7 +89,6 @@ def test_gc_frames_disabled(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.pstats_collector import PstatsCollector collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, @@ -140,7 +140,6 @@ def test_native_frames_enabled(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.stack_collector import CollapsedStackCollector collector = CollapsedStackCollector(1000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, @@ -176,7 +175,6 @@ def test_native_frames_disabled(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.pstats_collector import PstatsCollector collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_async.py b/Lib/test/test_profiling/test_sampling_profiler/test_async.py index d8ca86c996bffa..1f5685717b6273 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_async.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_async.py @@ -6,11 +6,14 @@ 3. Stack traversal: _build_linear_stacks() with BFS """ +import inspect import unittest try: import _remote_debugging # noqa: F401 from profiling.sampling.pstats_collector import PstatsCollector + from profiling.sampling.stack_collector import FlamegraphCollector + from profiling.sampling.sample import sample, sample_live, SampleProfiler except ImportError: raise unittest.SkipTest( "Test only runs when _remote_debugging is available" @@ -561,8 +564,6 @@ class TestFlamegraphCollectorAsync(unittest.TestCase): def test_flamegraph_with_async_frames(self): """Test FlamegraphCollector correctly processes async task frames.""" - from profiling.sampling.stack_collector import FlamegraphCollector - collector = FlamegraphCollector(sample_interval_usec=1000) # Build async task tree: Root -> Child @@ -607,8 +608,6 @@ def test_flamegraph_with_async_frames(self): def test_flamegraph_with_task_markers(self): """Test FlamegraphCollector includes boundary markers.""" - from profiling.sampling.stack_collector import FlamegraphCollector - collector = FlamegraphCollector(sample_interval_usec=1000) task = MockTaskInfo( @@ -643,8 +642,6 @@ def find_task_marker(node, depth=0): def test_flamegraph_multiple_async_samples(self): """Test FlamegraphCollector aggregates multiple async samples correctly.""" - from profiling.sampling.stack_collector import FlamegraphCollector - collector = FlamegraphCollector(sample_interval_usec=1000) task = MockTaskInfo( @@ -675,25 +672,16 @@ class TestAsyncAwareParameterFlow(unittest.TestCase): def test_sample_function_accepts_async_aware(self): """Test that sample() function accepts async_aware parameter.""" - from profiling.sampling.sample import sample - import inspect - sig = inspect.signature(sample) self.assertIn("async_aware", sig.parameters) def test_sample_live_function_accepts_async_aware(self): """Test that sample_live() function accepts async_aware parameter.""" - from profiling.sampling.sample import sample_live - import inspect - sig = inspect.signature(sample_live) self.assertIn("async_aware", sig.parameters) def test_sample_profiler_sample_accepts_async_aware(self): """Test that SampleProfiler.sample() accepts async_aware parameter.""" - from profiling.sampling.sample import SampleProfiler - import inspect - sig = inspect.signature(SampleProfiler.sample) self.assertIn("async_aware", sig.parameters) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_children.py b/Lib/test/test_profiling/test_sampling_profiler/test_children.py index b7dc878a238f8d..cfc6ffe7f294c9 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_children.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_children.py @@ -10,6 +10,7 @@ import threading import time import unittest +from unittest.mock import MagicMock, patch from test.support import ( SHORT_TIMEOUT, @@ -17,6 +18,21 @@ requires_remote_subprocess_debugging, ) +from profiling.sampling._child_monitor import ( + get_child_pids, + ChildProcessMonitor, + is_python_process, + _MAX_CHILD_PROFILERS, + _CLEANUP_INTERVAL_CYCLES, +) +from profiling.sampling.cli import ( + _add_sampling_options, + _validate_args, + _build_child_profiler_args, + _build_output_pattern, + _setup_child_monitor, +) + from .helpers import _cleanup_process # String to check for in stderr when profiler lacks permissions (e.g., macOS) @@ -100,8 +116,6 @@ def test_get_child_pids_from_remote_debugging(self): def test_get_child_pids_fallback(self): """Test the fallback implementation for get_child_pids.""" - from profiling.sampling._child_monitor import get_child_pids - # Test with current process result = get_child_pids(os.getpid()) self.assertIsInstance(result, list) @@ -109,8 +123,6 @@ def test_get_child_pids_fallback(self): @unittest.skipUnless(sys.platform == "linux", "Linux only") def test_discover_child_process_linux(self): """Test that we can discover child processes on Linux.""" - from profiling.sampling._child_monitor import get_child_pids - # Create a child process proc = subprocess.Popen( [sys.executable, "-c", "import time; time.sleep(10)"], @@ -139,8 +151,6 @@ def test_discover_child_process_linux(self): def test_recursive_child_discovery(self): """Test that recursive=True finds grandchildren.""" - from profiling.sampling._child_monitor import get_child_pids - # Create a child that spawns a grandchild and keeps a reference to it # so we can clean it up via the child process code = """ @@ -256,8 +266,6 @@ def wait_for_signal(): def test_nonexistent_pid_returns_empty(self): """Test that nonexistent PID returns empty list.""" - from profiling.sampling._child_monitor import get_child_pids - # Use a very high PID that's unlikely to exist result = get_child_pids(999999999) self.assertEqual(result, []) @@ -275,8 +283,6 @@ def tearDown(self): def test_monitor_creation(self): """Test that ChildProcessMonitor can be created.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=["-r", "10khz", "-d", "5"], @@ -288,8 +294,6 @@ def test_monitor_creation(self): def test_monitor_lifecycle(self): """Test monitor lifecycle via context manager.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -307,8 +311,6 @@ def test_monitor_lifecycle(self): def test_spawned_profilers_property(self): """Test that spawned_profilers returns a copy of the list.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -320,8 +322,6 @@ def test_spawned_profilers_property(self): def test_context_manager(self): """Test that ChildProcessMonitor works as a context manager.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - with ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) as monitor: @@ -344,8 +344,6 @@ def tearDown(self): def test_subprocesses_flag_parsed(self): """Test that --subprocesses flag is recognized.""" - from profiling.sampling.cli import _add_sampling_options - parser = argparse.ArgumentParser() _add_sampling_options(parser) @@ -359,8 +357,6 @@ def test_subprocesses_flag_parsed(self): def test_subprocesses_incompatible_with_live(self): """Test that --subprocesses is incompatible with --live.""" - from profiling.sampling.cli import _validate_args - # Create mock args with both subprocesses and live args = argparse.Namespace( subprocesses=True, @@ -383,8 +379,6 @@ def test_subprocesses_incompatible_with_live(self): def test_build_child_profiler_args(self): """Test building CLI args for child profilers.""" - from profiling.sampling.cli import _build_child_profiler_args - args = argparse.Namespace( sample_interval_usec=200, duration=15, @@ -441,8 +435,6 @@ def assert_flag_value_pair(flag, value): def test_build_child_profiler_args_no_gc(self): """Test building CLI args with --no-gc.""" - from profiling.sampling.cli import _build_child_profiler_args - args = argparse.Namespace( sample_interval_usec=100, duration=5, @@ -466,8 +458,6 @@ def test_build_child_profiler_args_no_gc(self): def test_build_output_pattern_with_outfile(self): """Test output pattern generation with user-specified output.""" - from profiling.sampling.cli import _build_output_pattern - # With extension args = argparse.Namespace(outfile="output.html", format="flamegraph") pattern = _build_output_pattern(args) @@ -480,8 +470,6 @@ def test_build_output_pattern_with_outfile(self): def test_build_output_pattern_default(self): """Test output pattern generation with default output.""" - from profiling.sampling.cli import _build_output_pattern - # Flamegraph format args = argparse.Namespace(outfile=None, format="flamegraph") pattern = _build_output_pattern(args) @@ -507,8 +495,6 @@ def tearDown(self): def test_setup_child_monitor(self): """Test setting up a child monitor from args.""" - from profiling.sampling.cli import _setup_child_monitor - args = argparse.Namespace( sample_interval_usec=100, duration=5, @@ -548,8 +534,6 @@ def tearDown(self): def test_is_python_process_current_process(self): """Test that current process is detected as Python.""" - from profiling.sampling._child_monitor import is_python_process - # Current process should be Python result = is_python_process(os.getpid()) self.assertTrue( @@ -559,8 +543,6 @@ def test_is_python_process_current_process(self): def test_is_python_process_python_subprocess(self): """Test that a Python subprocess is detected as Python.""" - from profiling.sampling._child_monitor import is_python_process - # Start a Python subprocess proc = subprocess.Popen( [sys.executable, "-c", "import time; time.sleep(10)"], @@ -593,8 +575,6 @@ def test_is_python_process_python_subprocess(self): @unittest.skipUnless(sys.platform == "linux", "Linux only test") def test_is_python_process_non_python_subprocess(self): """Test that a non-Python subprocess is not detected as Python.""" - from profiling.sampling._child_monitor import is_python_process - # Start a non-Python subprocess (sleep command) proc = subprocess.Popen( ["sleep", "10"], @@ -619,8 +599,6 @@ def test_is_python_process_non_python_subprocess(self): def test_is_python_process_nonexistent_pid(self): """Test that nonexistent PID returns False.""" - from profiling.sampling._child_monitor import is_python_process - # Use a very high PID that's unlikely to exist result = is_python_process(999999999) self.assertFalse( @@ -630,8 +608,6 @@ def test_is_python_process_nonexistent_pid(self): def test_is_python_process_exited_process(self): """Test handling of a process that exits quickly.""" - from profiling.sampling._child_monitor import is_python_process - # Start a process that exits immediately proc = subprocess.Popen( [sys.executable, "-c", "pass"], @@ -661,8 +637,6 @@ def tearDown(self): def test_max_profilers_constant_exists(self): """Test that _MAX_CHILD_PROFILERS constant is defined.""" - from profiling.sampling._child_monitor import _MAX_CHILD_PROFILERS - self.assertEqual( _MAX_CHILD_PROFILERS, 100, @@ -671,8 +645,6 @@ def test_max_profilers_constant_exists(self): def test_cleanup_interval_constant_exists(self): """Test that _CLEANUP_INTERVAL_CYCLES constant is defined.""" - from profiling.sampling._child_monitor import _CLEANUP_INTERVAL_CYCLES - self.assertEqual( _CLEANUP_INTERVAL_CYCLES, 10, @@ -681,12 +653,6 @@ def test_cleanup_interval_constant_exists(self): def test_monitor_respects_max_limit(self): """Test that monitor refuses to spawn more than _MAX_CHILD_PROFILERS.""" - from profiling.sampling._child_monitor import ( - ChildProcessMonitor, - _MAX_CHILD_PROFILERS, - ) - from unittest.mock import MagicMock, patch - # Create a monitor monitor = ChildProcessMonitor( pid=os.getpid(), @@ -739,8 +705,6 @@ def tearDown(self): def test_wait_for_profilers_empty_list(self): """Test that wait_for_profilers returns immediately with no profilers.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -767,8 +731,6 @@ def test_wait_for_profilers_empty_list(self): def test_wait_for_profilers_with_completed_process(self): """Test waiting for profilers that complete quickly.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -807,8 +769,6 @@ def test_wait_for_profilers_with_completed_process(self): def test_wait_for_profilers_timeout(self): """Test that wait_for_profilers respects timeout.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -847,8 +807,6 @@ def test_wait_for_profilers_timeout(self): def test_wait_for_profilers_multiple(self): """Test waiting for multiple profilers.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py index 30615a7d31d86c..13bdb4e111364c 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py @@ -2,6 +2,7 @@ import json import marshal +import opcode import os import tempfile import unittest @@ -1437,7 +1438,6 @@ class TestOpcodeFormatting(unittest.TestCase): def test_get_opcode_info_standard_opcode(self): """Test get_opcode_info for a standard opcode.""" - import opcode # LOAD_CONST is a standard opcode load_const = opcode.opmap.get('LOAD_CONST') if load_const is not None: @@ -1455,7 +1455,6 @@ def test_get_opcode_info_unknown_opcode(self): def test_format_opcode_standard(self): """Test format_opcode for a standard opcode.""" - import opcode load_const = opcode.opmap.get('LOAD_CONST') if load_const is not None: formatted = format_opcode(load_const) @@ -1463,7 +1462,6 @@ def test_format_opcode_standard(self): def test_format_opcode_specialized(self): """Test format_opcode for a specialized opcode shows base in parens.""" - import opcode if not hasattr(opcode, '_specialized_opmap'): self.skipTest("No specialized opcodes in this Python version") if not hasattr(opcode, '_specializations'): diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py index b82474858ddd4a..c6731e956391a9 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py @@ -18,6 +18,7 @@ from profiling.sampling.pstats_collector import PstatsCollector from profiling.sampling.stack_collector import CollapsedStackCollector from profiling.sampling.sample import SampleProfiler, _is_process_running + from profiling.sampling.cli import main except ImportError: raise unittest.SkipTest( "Test only runs when _remote_debugging is available" @@ -547,7 +548,6 @@ def test_sample_target_script(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.cli import main main() output = captured_output.getvalue() @@ -585,7 +585,6 @@ def test_sample_target_module(self): # Change to temp directory so subprocess can find the module contextlib.chdir(tempdir.name), ): - from profiling.sampling.cli import main main() output = captured_output.getvalue() @@ -714,8 +713,7 @@ def test_live_incompatible_with_pstats_options(self): test_args = ["profiling.sampling.cli", "run", "--live"] + args + ["test.py"] with mock.patch("sys.argv", test_args): with self.assertRaises(SystemExit) as cm: - from profiling.sampling.cli import main - main() + main() self.assertNotEqual(cm.exception.code, 0) def test_live_incompatible_with_multiple_pstats_options(self): @@ -727,8 +725,7 @@ def test_live_incompatible_with_multiple_pstats_options(self): with mock.patch("sys.argv", test_args): with self.assertRaises(SystemExit) as cm: - from profiling.sampling.cli import main - main() + main() self.assertNotEqual(cm.exception.code, 0) def test_live_incompatible_with_pstats_default_values(self): @@ -738,8 +735,7 @@ def test_live_incompatible_with_pstats_default_values(self): with mock.patch("sys.argv", test_args): with self.assertRaises(SystemExit) as cm: - from profiling.sampling.cli import main - main() + main() self.assertNotEqual(cm.exception.code, 0) # Test with --limit=15 (the default value) @@ -747,8 +743,7 @@ def test_live_incompatible_with_pstats_default_values(self): with mock.patch("sys.argv", test_args): with self.assertRaises(SystemExit) as cm: - from profiling.sampling.cli import main - main() + main() self.assertNotEqual(cm.exception.code, 0) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py index 38f1d03e4939f1..8342faffb94762 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py @@ -383,10 +383,9 @@ def test_finished_state_footer_message(self): def test_finished_state_freezes_time(self): """Test that time displays are frozen when finished.""" - import time as time_module # Set up collector with known start time - self.collector.start_time = time_module.perf_counter() - 10.0 # 10 seconds ago + self.collector.start_time = time.perf_counter() - 10.0 # 10 seconds ago # Mark as finished - this should freeze the time self.collector.mark_finished() @@ -396,7 +395,7 @@ def test_finished_state_freezes_time(self): frozen_time_display = self.collector.current_time_display # Wait a bit to ensure time would advance - time_module.sleep(0.1) + time.sleep(0.1) # Time should remain frozen self.assertEqual(self.collector.elapsed_time, frozen_elapsed) @@ -1215,7 +1214,6 @@ def test_reset_blocked_when_finished(self): def test_time_display_fix_when_finished(self): """Test that time display shows correct frozen time when finished.""" - import time as time_module # Mark as finished to freeze time self.collector.mark_finished() @@ -1228,7 +1226,7 @@ def test_time_display_fix_when_finished(self): frozen_time = self.collector.current_time_display # Wait a bit - time_module.sleep(0.1) + time.sleep(0.1) # Should still show the same frozen time (not jump to wrong time) self.assertEqual(self.collector.current_time_display, frozen_time) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py index 877237866b1e65..0b38fb4ad4bcf6 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py @@ -9,6 +9,12 @@ import profiling.sampling import profiling.sampling.sample from profiling.sampling.pstats_collector import PstatsCollector + from profiling.sampling.cli import main, _parse_mode + from profiling.sampling.constants import PROFILING_MODE_EXCEPTION + from _remote_debugging import ( + THREAD_STATUS_HAS_GIL, + THREAD_STATUS_ON_CPU, + ) except ImportError: raise unittest.SkipTest( "Test only runs when _remote_debugging is available" @@ -40,7 +46,6 @@ def test_mode_validation(self): mock.patch("sys.stderr", io.StringIO()) as mock_stderr, self.assertRaises(SystemExit) as cm, ): - from profiling.sampling.cli import main main() self.assertEqual(cm.exception.code, 2) # argparse error @@ -49,16 +54,6 @@ def test_mode_validation(self): def test_frames_filtered_with_skip_idle(self): """Test that frames are actually filtered when skip_idle=True.""" - # Import thread status flags - try: - from _remote_debugging import ( - THREAD_STATUS_HAS_GIL, - THREAD_STATUS_ON_CPU, - ) - except ImportError: - THREAD_STATUS_HAS_GIL = 1 << 0 - THREAD_STATUS_ON_CPU = 1 << 1 - # Create mock frames with different thread statuses class MockThreadInfoWithStatus: def __init__(self, thread_id, frame_info, status): @@ -240,7 +235,6 @@ class TestGilModeFiltering(unittest.TestCase): def test_gil_mode_validation(self): """Test that CLI accepts gil mode choice correctly.""" - from profiling.sampling.cli import main test_args = [ "profiling.sampling.cli", @@ -298,7 +292,6 @@ def test_gil_mode_sample_function_call(self): def test_gil_mode_cli_argument_parsing(self): """Test CLI argument parsing for GIL mode with various options.""" - from profiling.sampling.cli import main test_args = [ "profiling.sampling.cli", @@ -405,7 +398,6 @@ def test_mode_constants_are_defined(self): def test_parse_mode_function(self): """Test the _parse_mode function with all valid modes.""" - from profiling.sampling.cli import _parse_mode self.assertEqual(_parse_mode("wall"), 0) self.assertEqual(_parse_mode("cpu"), 1) self.assertEqual(_parse_mode("gil"), 2) @@ -422,7 +414,6 @@ class TestExceptionModeFiltering(unittest.TestCase): def test_exception_mode_validation(self): """Test that CLI accepts exception mode choice correctly.""" - from profiling.sampling.cli import main test_args = [ "profiling.sampling.cli", @@ -480,7 +471,6 @@ def test_exception_mode_sample_function_call(self): def test_exception_mode_cli_argument_parsing(self): """Test CLI argument parsing for exception mode with various options.""" - from profiling.sampling.cli import main test_args = [ "profiling.sampling.cli", @@ -512,7 +502,6 @@ def test_exception_mode_cli_argument_parsing(self): def test_exception_mode_constants_are_defined(self): """Test that exception mode constant is properly defined.""" - from profiling.sampling.constants import PROFILING_MODE_EXCEPTION self.assertEqual(PROFILING_MODE_EXCEPTION, 4) def test_exception_mode_integration_filtering(self): diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py index 822f559561eb0a..8d70a1d2ef8cfc 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py @@ -1,6 +1,7 @@ """Tests for sampling profiler core functionality.""" import io +import re from unittest import mock import unittest @@ -591,7 +592,6 @@ def test_print_sampled_stats_sort_by_name(self): # Extract just the function names for comparison func_names = [] - import re for line in data_lines: # Function name is between the last ( and ), accounting for ANSI color codes