From 2f2b091d0a6cad734a460054c611eab949aa6695 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Salgado Date: Sun, 28 Dec 2025 17:22:07 +0000 Subject: [PATCH] Move local imports to module level in sampling profiler The sampling profiler code had numerous imports placed inside functions rather than at module level. While deferred imports can reduce startup time for rarely-used code paths, these imports were in functions called during normal profiler operation, adding repeated import overhead. Moving imports to module level follows Python best practices for code that runs frequently. This makes import dependencies explicit at file scope and eliminates per-call import lookup costs. The test files also had redundant local imports that duplicated module-level imports. --- Lib/profiling/sampling/binary_reader.py | 11 +-- Lib/profiling/sampling/cli.py | 6 +- Lib/profiling/sampling/heatmap_collector.py | 11 +-- .../sampling/live_collector/collector.py | 2 - .../sampling/live_collector/widgets.py | 3 +- Lib/profiling/sampling/pstats_collector.py | 6 +- Lib/profiling/sampling/sample.py | 3 +- Lib/profiling/sampling/stack_collector.py | 2 +- .../test_sampling_profiler/test_advanced.py | 6 +- .../test_sampling_profiler/test_async.py | 18 +---- .../test_sampling_profiler/test_children.py | 74 ++++--------------- .../test_sampling_profiler/test_collectors.py | 4 +- .../test_integration.py | 15 ++-- .../test_live_collector_interaction.py | 8 +- .../test_sampling_profiler/test_modes.py | 23 ++---- .../test_sampling_profiler/test_profiler.py | 2 +- 16 files changed, 54 insertions(+), 140 deletions(-) diff --git a/Lib/profiling/sampling/binary_reader.py b/Lib/profiling/sampling/binary_reader.py index 50c96668cc585b..13b10812d13052 100644 --- a/Lib/profiling/sampling/binary_reader.py +++ b/Lib/profiling/sampling/binary_reader.py @@ -1,5 +1,11 @@ """Thin Python wrapper around C binary reader for profiling data.""" +import _remote_debugging + +from .gecko_collector import GeckoCollector +from .stack_collector import FlamegraphCollector, CollapsedStackCollector +from .pstats_collector import PStatsCollector + class BinaryReader: """High-performance binary reader using C implementation. @@ -23,7 +29,6 @@ def __init__(self, filename): self._reader = None def __enter__(self): - import _remote_debugging self._reader = _remote_debugging.BinaryReader(self.filename) return self @@ -99,10 +104,6 @@ def convert_binary_to_format(input_file, output_file, output_format, Returns: int: Number of samples converted """ - from .gecko_collector import GeckoCollector - from .stack_collector import FlamegraphCollector, CollapsedStackCollector - from .pstats_collector import PStatsCollector - with BinaryReader(input_file) as reader: info = reader.get_info() interval = sample_interval_usec or info['sample_interval_us'] diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py index e43925ea8595f0..2976447195e3af 100644 --- a/Lib/profiling/sampling/cli.py +++ b/Lib/profiling/sampling/cli.py @@ -35,6 +35,8 @@ SORT_MODE_NSAMPLES_CUMUL, ) +from ._child_monitor import ChildProcessMonitor + try: from .live_collector import LiveStatsCollector except ImportError: @@ -93,8 +95,6 @@ class CustomFormatter( } def _setup_child_monitor(args, parent_pid): - from ._child_monitor import ChildProcessMonitor - # Build CLI args for child profilers (excluding --subprocesses to avoid recursion) child_cli_args = _build_child_profiler_args(args) @@ -1123,8 +1123,6 @@ def _handle_live_run(args): def _handle_replay(args): """Handle the 'replay' command - convert binary profile to another format.""" - import os - if not os.path.exists(args.input_file): sys.exit(f"Error: Input file not found: {args.input_file}") diff --git a/Lib/profiling/sampling/heatmap_collector.py b/Lib/profiling/sampling/heatmap_collector.py index bb810fa485be63..3793c8f3762032 100644 --- a/Lib/profiling/sampling/heatmap_collector.py +++ b/Lib/profiling/sampling/heatmap_collector.py @@ -18,6 +18,7 @@ from ._css_utils import get_combined_css from ._format_utils import fmt from .collector import normalize_location, extract_lineno +from .opcode_utils import get_opcode_info, format_opcode from .stack_collector import StackTraceCollector @@ -634,8 +635,6 @@ def _get_bytecode_data_for_line(self, filename, lineno): Returns: List of dicts with instruction info, sorted by samples descending """ - from .opcode_utils import get_opcode_info, format_opcode - key = (filename, lineno) opcode_data = self.line_opcodes.get(key, {}) @@ -1038,8 +1037,6 @@ def _render_source_with_highlights(self, line_content: str, line_num: int, Simple: collect ranges with sample counts, assign each byte position to smallest covering range, then emit spans for contiguous runs with sample data. """ - import html as html_module - content = line_content.rstrip('\n') if not content: return '' @@ -1062,7 +1059,7 @@ def _render_source_with_highlights(self, line_content: str, line_num: int, range_data[key]['opcodes'].append(opname) if not range_data: - return html_module.escape(content) + return html.escape(content) # For each byte position, find the smallest covering range byte_to_range = {} @@ -1090,7 +1087,7 @@ def _render_source_with_highlights(self, line_content: str, line_num: int, def flush_span(): nonlocal span_chars, current_range if span_chars: - text = html_module.escape(''.join(span_chars)) + text = html.escape(''.join(span_chars)) if current_range: data = range_data.get(current_range, {'samples': 0, 'opcodes': []}) samples = data['samples'] @@ -1104,7 +1101,7 @@ def flush_span(): f'data-samples="{samples}" ' f'data-max-samples="{max_range_samples}" ' f'data-pct="{pct}" ' - f'data-opcodes="{html_module.escape(opcodes)}">{text}') + f'data-opcodes="{html.escape(opcodes)}">{text}') else: result.append(text) span_chars = [] diff --git a/Lib/profiling/sampling/live_collector/collector.py b/Lib/profiling/sampling/live_collector/collector.py index b31ab060a6b934..620abe4e2382d3 100644 --- a/Lib/profiling/sampling/live_collector/collector.py +++ b/Lib/profiling/sampling/live_collector/collector.py @@ -932,8 +932,6 @@ def _show_terminal_size_warning_and_wait(self, height, width): def _handle_input(self): """Handle keyboard input (non-blocking).""" - from . import constants - self.display.set_nodelay(True) ch = self.display.get_input() diff --git a/Lib/profiling/sampling/live_collector/widgets.py b/Lib/profiling/sampling/live_collector/widgets.py index cf04f3aa3254ef..069f05ffd65bea 100644 --- a/Lib/profiling/sampling/live_collector/widgets.py +++ b/Lib/profiling/sampling/live_collector/widgets.py @@ -31,6 +31,7 @@ PROFILING_MODE_GIL, PROFILING_MODE_WALL, ) +from ..opcode_utils import get_opcode_info, format_opcode class Widget(ABC): @@ -1007,8 +1008,6 @@ def render(self, line, width, **kwargs): Returns: Next available line number """ - from ..opcode_utils import get_opcode_info, format_opcode - stats_list = kwargs.get("stats_list", []) height = kwargs.get("height", 24) selected_row = self.collector.selected_row diff --git a/Lib/profiling/sampling/pstats_collector.py b/Lib/profiling/sampling/pstats_collector.py index e0dc9ab6bb7edb..6be1d698ffaa9a 100644 --- a/Lib/profiling/sampling/pstats_collector.py +++ b/Lib/profiling/sampling/pstats_collector.py @@ -1,9 +1,10 @@ import collections import marshal +import pstats from _colorize import ANSIColors from .collector import Collector, extract_lineno -from .constants import MICROSECONDS_PER_SECOND +from .constants import MICROSECONDS_PER_SECOND, PROFILING_MODE_CPU class PstatsCollector(Collector): @@ -86,9 +87,6 @@ def create_stats(self): def print_stats(self, sort=-1, limit=None, show_summary=True, mode=None): """Print formatted statistics to stdout.""" - import pstats - from .constants import PROFILING_MODE_CPU - # Create stats object stats = pstats.SampledStats(self).strip_dirs() if not stats.stats: diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py index e73306ebf290e7..139c116a825ce2 100644 --- a/Lib/profiling/sampling/sample.py +++ b/Lib/profiling/sampling/sample.py @@ -1,5 +1,6 @@ import _remote_debugging import contextlib +import curses import os import statistics import sys @@ -459,8 +460,6 @@ def sample_live( Returns: The collector with collected samples """ - import curses - # Check if process is alive before doing any heavy initialization if not _is_process_running(pid): print(f"No samples collected - process {pid} exited before profiling could begin.", file=sys.stderr) diff --git a/Lib/profiling/sampling/stack_collector.py b/Lib/profiling/sampling/stack_collector.py index 55e643d0e9c8cb..4e213cfe41ca24 100644 --- a/Lib/profiling/sampling/stack_collector.py +++ b/Lib/profiling/sampling/stack_collector.py @@ -6,6 +6,7 @@ import linecache import os import sys +import sysconfig from ._css_utils import get_combined_css from .collector import Collector, extract_lineno @@ -244,7 +245,6 @@ def convert_children(children, min_samples): } # Calculate thread status percentages for display - import sysconfig is_free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED")) total_threads = max(1, self.thread_status_counts["total"]) thread_stats = { diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py index bcd4de7f5d7ebe..11b1ad84242fd4 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py @@ -11,6 +11,8 @@ import _remote_debugging # noqa: F401 import profiling.sampling import profiling.sampling.sample + from profiling.sampling.pstats_collector import PstatsCollector + from profiling.sampling.stack_collector import CollapsedStackCollector except ImportError: raise unittest.SkipTest( "Test only runs when _remote_debugging is available" @@ -61,7 +63,6 @@ def test_gc_frames_enabled(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.pstats_collector import PstatsCollector collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, @@ -88,7 +89,6 @@ def test_gc_frames_disabled(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.pstats_collector import PstatsCollector collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, @@ -140,7 +140,6 @@ def test_native_frames_enabled(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.stack_collector import CollapsedStackCollector collector = CollapsedStackCollector(1000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, @@ -176,7 +175,6 @@ def test_native_frames_disabled(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.pstats_collector import PstatsCollector collector = PstatsCollector(sample_interval_usec=5000, skip_idle=False) profiling.sampling.sample.sample( subproc.process.pid, diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_async.py b/Lib/test/test_profiling/test_sampling_profiler/test_async.py index d8ca86c996bffa..1f5685717b6273 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_async.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_async.py @@ -6,11 +6,14 @@ 3. Stack traversal: _build_linear_stacks() with BFS """ +import inspect import unittest try: import _remote_debugging # noqa: F401 from profiling.sampling.pstats_collector import PstatsCollector + from profiling.sampling.stack_collector import FlamegraphCollector + from profiling.sampling.sample import sample, sample_live, SampleProfiler except ImportError: raise unittest.SkipTest( "Test only runs when _remote_debugging is available" @@ -561,8 +564,6 @@ class TestFlamegraphCollectorAsync(unittest.TestCase): def test_flamegraph_with_async_frames(self): """Test FlamegraphCollector correctly processes async task frames.""" - from profiling.sampling.stack_collector import FlamegraphCollector - collector = FlamegraphCollector(sample_interval_usec=1000) # Build async task tree: Root -> Child @@ -607,8 +608,6 @@ def test_flamegraph_with_async_frames(self): def test_flamegraph_with_task_markers(self): """Test FlamegraphCollector includes boundary markers.""" - from profiling.sampling.stack_collector import FlamegraphCollector - collector = FlamegraphCollector(sample_interval_usec=1000) task = MockTaskInfo( @@ -643,8 +642,6 @@ def find_task_marker(node, depth=0): def test_flamegraph_multiple_async_samples(self): """Test FlamegraphCollector aggregates multiple async samples correctly.""" - from profiling.sampling.stack_collector import FlamegraphCollector - collector = FlamegraphCollector(sample_interval_usec=1000) task = MockTaskInfo( @@ -675,25 +672,16 @@ class TestAsyncAwareParameterFlow(unittest.TestCase): def test_sample_function_accepts_async_aware(self): """Test that sample() function accepts async_aware parameter.""" - from profiling.sampling.sample import sample - import inspect - sig = inspect.signature(sample) self.assertIn("async_aware", sig.parameters) def test_sample_live_function_accepts_async_aware(self): """Test that sample_live() function accepts async_aware parameter.""" - from profiling.sampling.sample import sample_live - import inspect - sig = inspect.signature(sample_live) self.assertIn("async_aware", sig.parameters) def test_sample_profiler_sample_accepts_async_aware(self): """Test that SampleProfiler.sample() accepts async_aware parameter.""" - from profiling.sampling.sample import SampleProfiler - import inspect - sig = inspect.signature(SampleProfiler.sample) self.assertIn("async_aware", sig.parameters) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_children.py b/Lib/test/test_profiling/test_sampling_profiler/test_children.py index b7dc878a238f8d..cfc6ffe7f294c9 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_children.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_children.py @@ -10,6 +10,7 @@ import threading import time import unittest +from unittest.mock import MagicMock, patch from test.support import ( SHORT_TIMEOUT, @@ -17,6 +18,21 @@ requires_remote_subprocess_debugging, ) +from profiling.sampling._child_monitor import ( + get_child_pids, + ChildProcessMonitor, + is_python_process, + _MAX_CHILD_PROFILERS, + _CLEANUP_INTERVAL_CYCLES, +) +from profiling.sampling.cli import ( + _add_sampling_options, + _validate_args, + _build_child_profiler_args, + _build_output_pattern, + _setup_child_monitor, +) + from .helpers import _cleanup_process # String to check for in stderr when profiler lacks permissions (e.g., macOS) @@ -100,8 +116,6 @@ def test_get_child_pids_from_remote_debugging(self): def test_get_child_pids_fallback(self): """Test the fallback implementation for get_child_pids.""" - from profiling.sampling._child_monitor import get_child_pids - # Test with current process result = get_child_pids(os.getpid()) self.assertIsInstance(result, list) @@ -109,8 +123,6 @@ def test_get_child_pids_fallback(self): @unittest.skipUnless(sys.platform == "linux", "Linux only") def test_discover_child_process_linux(self): """Test that we can discover child processes on Linux.""" - from profiling.sampling._child_monitor import get_child_pids - # Create a child process proc = subprocess.Popen( [sys.executable, "-c", "import time; time.sleep(10)"], @@ -139,8 +151,6 @@ def test_discover_child_process_linux(self): def test_recursive_child_discovery(self): """Test that recursive=True finds grandchildren.""" - from profiling.sampling._child_monitor import get_child_pids - # Create a child that spawns a grandchild and keeps a reference to it # so we can clean it up via the child process code = """ @@ -256,8 +266,6 @@ def wait_for_signal(): def test_nonexistent_pid_returns_empty(self): """Test that nonexistent PID returns empty list.""" - from profiling.sampling._child_monitor import get_child_pids - # Use a very high PID that's unlikely to exist result = get_child_pids(999999999) self.assertEqual(result, []) @@ -275,8 +283,6 @@ def tearDown(self): def test_monitor_creation(self): """Test that ChildProcessMonitor can be created.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=["-r", "10khz", "-d", "5"], @@ -288,8 +294,6 @@ def test_monitor_creation(self): def test_monitor_lifecycle(self): """Test monitor lifecycle via context manager.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -307,8 +311,6 @@ def test_monitor_lifecycle(self): def test_spawned_profilers_property(self): """Test that spawned_profilers returns a copy of the list.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -320,8 +322,6 @@ def test_spawned_profilers_property(self): def test_context_manager(self): """Test that ChildProcessMonitor works as a context manager.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - with ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) as monitor: @@ -344,8 +344,6 @@ def tearDown(self): def test_subprocesses_flag_parsed(self): """Test that --subprocesses flag is recognized.""" - from profiling.sampling.cli import _add_sampling_options - parser = argparse.ArgumentParser() _add_sampling_options(parser) @@ -359,8 +357,6 @@ def test_subprocesses_flag_parsed(self): def test_subprocesses_incompatible_with_live(self): """Test that --subprocesses is incompatible with --live.""" - from profiling.sampling.cli import _validate_args - # Create mock args with both subprocesses and live args = argparse.Namespace( subprocesses=True, @@ -383,8 +379,6 @@ def test_subprocesses_incompatible_with_live(self): def test_build_child_profiler_args(self): """Test building CLI args for child profilers.""" - from profiling.sampling.cli import _build_child_profiler_args - args = argparse.Namespace( sample_interval_usec=200, duration=15, @@ -441,8 +435,6 @@ def assert_flag_value_pair(flag, value): def test_build_child_profiler_args_no_gc(self): """Test building CLI args with --no-gc.""" - from profiling.sampling.cli import _build_child_profiler_args - args = argparse.Namespace( sample_interval_usec=100, duration=5, @@ -466,8 +458,6 @@ def test_build_child_profiler_args_no_gc(self): def test_build_output_pattern_with_outfile(self): """Test output pattern generation with user-specified output.""" - from profiling.sampling.cli import _build_output_pattern - # With extension args = argparse.Namespace(outfile="output.html", format="flamegraph") pattern = _build_output_pattern(args) @@ -480,8 +470,6 @@ def test_build_output_pattern_with_outfile(self): def test_build_output_pattern_default(self): """Test output pattern generation with default output.""" - from profiling.sampling.cli import _build_output_pattern - # Flamegraph format args = argparse.Namespace(outfile=None, format="flamegraph") pattern = _build_output_pattern(args) @@ -507,8 +495,6 @@ def tearDown(self): def test_setup_child_monitor(self): """Test setting up a child monitor from args.""" - from profiling.sampling.cli import _setup_child_monitor - args = argparse.Namespace( sample_interval_usec=100, duration=5, @@ -548,8 +534,6 @@ def tearDown(self): def test_is_python_process_current_process(self): """Test that current process is detected as Python.""" - from profiling.sampling._child_monitor import is_python_process - # Current process should be Python result = is_python_process(os.getpid()) self.assertTrue( @@ -559,8 +543,6 @@ def test_is_python_process_current_process(self): def test_is_python_process_python_subprocess(self): """Test that a Python subprocess is detected as Python.""" - from profiling.sampling._child_monitor import is_python_process - # Start a Python subprocess proc = subprocess.Popen( [sys.executable, "-c", "import time; time.sleep(10)"], @@ -593,8 +575,6 @@ def test_is_python_process_python_subprocess(self): @unittest.skipUnless(sys.platform == "linux", "Linux only test") def test_is_python_process_non_python_subprocess(self): """Test that a non-Python subprocess is not detected as Python.""" - from profiling.sampling._child_monitor import is_python_process - # Start a non-Python subprocess (sleep command) proc = subprocess.Popen( ["sleep", "10"], @@ -619,8 +599,6 @@ def test_is_python_process_non_python_subprocess(self): def test_is_python_process_nonexistent_pid(self): """Test that nonexistent PID returns False.""" - from profiling.sampling._child_monitor import is_python_process - # Use a very high PID that's unlikely to exist result = is_python_process(999999999) self.assertFalse( @@ -630,8 +608,6 @@ def test_is_python_process_nonexistent_pid(self): def test_is_python_process_exited_process(self): """Test handling of a process that exits quickly.""" - from profiling.sampling._child_monitor import is_python_process - # Start a process that exits immediately proc = subprocess.Popen( [sys.executable, "-c", "pass"], @@ -661,8 +637,6 @@ def tearDown(self): def test_max_profilers_constant_exists(self): """Test that _MAX_CHILD_PROFILERS constant is defined.""" - from profiling.sampling._child_monitor import _MAX_CHILD_PROFILERS - self.assertEqual( _MAX_CHILD_PROFILERS, 100, @@ -671,8 +645,6 @@ def test_max_profilers_constant_exists(self): def test_cleanup_interval_constant_exists(self): """Test that _CLEANUP_INTERVAL_CYCLES constant is defined.""" - from profiling.sampling._child_monitor import _CLEANUP_INTERVAL_CYCLES - self.assertEqual( _CLEANUP_INTERVAL_CYCLES, 10, @@ -681,12 +653,6 @@ def test_cleanup_interval_constant_exists(self): def test_monitor_respects_max_limit(self): """Test that monitor refuses to spawn more than _MAX_CHILD_PROFILERS.""" - from profiling.sampling._child_monitor import ( - ChildProcessMonitor, - _MAX_CHILD_PROFILERS, - ) - from unittest.mock import MagicMock, patch - # Create a monitor monitor = ChildProcessMonitor( pid=os.getpid(), @@ -739,8 +705,6 @@ def tearDown(self): def test_wait_for_profilers_empty_list(self): """Test that wait_for_profilers returns immediately with no profilers.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -767,8 +731,6 @@ def test_wait_for_profilers_empty_list(self): def test_wait_for_profilers_with_completed_process(self): """Test waiting for profilers that complete quickly.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -807,8 +769,6 @@ def test_wait_for_profilers_with_completed_process(self): def test_wait_for_profilers_timeout(self): """Test that wait_for_profilers respects timeout.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) @@ -847,8 +807,6 @@ def test_wait_for_profilers_timeout(self): def test_wait_for_profilers_multiple(self): """Test waiting for multiple profilers.""" - from profiling.sampling._child_monitor import ChildProcessMonitor - monitor = ChildProcessMonitor( pid=os.getpid(), cli_args=[], output_pattern=None ) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py index 30615a7d31d86c..13bdb4e111364c 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py @@ -2,6 +2,7 @@ import json import marshal +import opcode import os import tempfile import unittest @@ -1437,7 +1438,6 @@ class TestOpcodeFormatting(unittest.TestCase): def test_get_opcode_info_standard_opcode(self): """Test get_opcode_info for a standard opcode.""" - import opcode # LOAD_CONST is a standard opcode load_const = opcode.opmap.get('LOAD_CONST') if load_const is not None: @@ -1455,7 +1455,6 @@ def test_get_opcode_info_unknown_opcode(self): def test_format_opcode_standard(self): """Test format_opcode for a standard opcode.""" - import opcode load_const = opcode.opmap.get('LOAD_CONST') if load_const is not None: formatted = format_opcode(load_const) @@ -1463,7 +1462,6 @@ def test_format_opcode_standard(self): def test_format_opcode_specialized(self): """Test format_opcode for a specialized opcode shows base in parens.""" - import opcode if not hasattr(opcode, '_specialized_opmap'): self.skipTest("No specialized opcodes in this Python version") if not hasattr(opcode, '_specializations'): diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py index b82474858ddd4a..c6731e956391a9 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py @@ -18,6 +18,7 @@ from profiling.sampling.pstats_collector import PstatsCollector from profiling.sampling.stack_collector import CollapsedStackCollector from profiling.sampling.sample import SampleProfiler, _is_process_running + from profiling.sampling.cli import main except ImportError: raise unittest.SkipTest( "Test only runs when _remote_debugging is available" @@ -547,7 +548,6 @@ def test_sample_target_script(self): io.StringIO() as captured_output, mock.patch("sys.stdout", captured_output), ): - from profiling.sampling.cli import main main() output = captured_output.getvalue() @@ -585,7 +585,6 @@ def test_sample_target_module(self): # Change to temp directory so subprocess can find the module contextlib.chdir(tempdir.name), ): - from profiling.sampling.cli import main main() output = captured_output.getvalue() @@ -714,8 +713,7 @@ def test_live_incompatible_with_pstats_options(self): test_args = ["profiling.sampling.cli", "run", "--live"] + args + ["test.py"] with mock.patch("sys.argv", test_args): with self.assertRaises(SystemExit) as cm: - from profiling.sampling.cli import main - main() + main() self.assertNotEqual(cm.exception.code, 0) def test_live_incompatible_with_multiple_pstats_options(self): @@ -727,8 +725,7 @@ def test_live_incompatible_with_multiple_pstats_options(self): with mock.patch("sys.argv", test_args): with self.assertRaises(SystemExit) as cm: - from profiling.sampling.cli import main - main() + main() self.assertNotEqual(cm.exception.code, 0) def test_live_incompatible_with_pstats_default_values(self): @@ -738,8 +735,7 @@ def test_live_incompatible_with_pstats_default_values(self): with mock.patch("sys.argv", test_args): with self.assertRaises(SystemExit) as cm: - from profiling.sampling.cli import main - main() + main() self.assertNotEqual(cm.exception.code, 0) # Test with --limit=15 (the default value) @@ -747,8 +743,7 @@ def test_live_incompatible_with_pstats_default_values(self): with mock.patch("sys.argv", test_args): with self.assertRaises(SystemExit) as cm: - from profiling.sampling.cli import main - main() + main() self.assertNotEqual(cm.exception.code, 0) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py index 38f1d03e4939f1..8342faffb94762 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py @@ -383,10 +383,9 @@ def test_finished_state_footer_message(self): def test_finished_state_freezes_time(self): """Test that time displays are frozen when finished.""" - import time as time_module # Set up collector with known start time - self.collector.start_time = time_module.perf_counter() - 10.0 # 10 seconds ago + self.collector.start_time = time.perf_counter() - 10.0 # 10 seconds ago # Mark as finished - this should freeze the time self.collector.mark_finished() @@ -396,7 +395,7 @@ def test_finished_state_freezes_time(self): frozen_time_display = self.collector.current_time_display # Wait a bit to ensure time would advance - time_module.sleep(0.1) + time.sleep(0.1) # Time should remain frozen self.assertEqual(self.collector.elapsed_time, frozen_elapsed) @@ -1215,7 +1214,6 @@ def test_reset_blocked_when_finished(self): def test_time_display_fix_when_finished(self): """Test that time display shows correct frozen time when finished.""" - import time as time_module # Mark as finished to freeze time self.collector.mark_finished() @@ -1228,7 +1226,7 @@ def test_time_display_fix_when_finished(self): frozen_time = self.collector.current_time_display # Wait a bit - time_module.sleep(0.1) + time.sleep(0.1) # Should still show the same frozen time (not jump to wrong time) self.assertEqual(self.collector.current_time_display, frozen_time) diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py index 877237866b1e65..0b38fb4ad4bcf6 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py @@ -9,6 +9,12 @@ import profiling.sampling import profiling.sampling.sample from profiling.sampling.pstats_collector import PstatsCollector + from profiling.sampling.cli import main, _parse_mode + from profiling.sampling.constants import PROFILING_MODE_EXCEPTION + from _remote_debugging import ( + THREAD_STATUS_HAS_GIL, + THREAD_STATUS_ON_CPU, + ) except ImportError: raise unittest.SkipTest( "Test only runs when _remote_debugging is available" @@ -40,7 +46,6 @@ def test_mode_validation(self): mock.patch("sys.stderr", io.StringIO()) as mock_stderr, self.assertRaises(SystemExit) as cm, ): - from profiling.sampling.cli import main main() self.assertEqual(cm.exception.code, 2) # argparse error @@ -49,16 +54,6 @@ def test_mode_validation(self): def test_frames_filtered_with_skip_idle(self): """Test that frames are actually filtered when skip_idle=True.""" - # Import thread status flags - try: - from _remote_debugging import ( - THREAD_STATUS_HAS_GIL, - THREAD_STATUS_ON_CPU, - ) - except ImportError: - THREAD_STATUS_HAS_GIL = 1 << 0 - THREAD_STATUS_ON_CPU = 1 << 1 - # Create mock frames with different thread statuses class MockThreadInfoWithStatus: def __init__(self, thread_id, frame_info, status): @@ -240,7 +235,6 @@ class TestGilModeFiltering(unittest.TestCase): def test_gil_mode_validation(self): """Test that CLI accepts gil mode choice correctly.""" - from profiling.sampling.cli import main test_args = [ "profiling.sampling.cli", @@ -298,7 +292,6 @@ def test_gil_mode_sample_function_call(self): def test_gil_mode_cli_argument_parsing(self): """Test CLI argument parsing for GIL mode with various options.""" - from profiling.sampling.cli import main test_args = [ "profiling.sampling.cli", @@ -405,7 +398,6 @@ def test_mode_constants_are_defined(self): def test_parse_mode_function(self): """Test the _parse_mode function with all valid modes.""" - from profiling.sampling.cli import _parse_mode self.assertEqual(_parse_mode("wall"), 0) self.assertEqual(_parse_mode("cpu"), 1) self.assertEqual(_parse_mode("gil"), 2) @@ -422,7 +414,6 @@ class TestExceptionModeFiltering(unittest.TestCase): def test_exception_mode_validation(self): """Test that CLI accepts exception mode choice correctly.""" - from profiling.sampling.cli import main test_args = [ "profiling.sampling.cli", @@ -480,7 +471,6 @@ def test_exception_mode_sample_function_call(self): def test_exception_mode_cli_argument_parsing(self): """Test CLI argument parsing for exception mode with various options.""" - from profiling.sampling.cli import main test_args = [ "profiling.sampling.cli", @@ -512,7 +502,6 @@ def test_exception_mode_cli_argument_parsing(self): def test_exception_mode_constants_are_defined(self): """Test that exception mode constant is properly defined.""" - from profiling.sampling.constants import PROFILING_MODE_EXCEPTION self.assertEqual(PROFILING_MODE_EXCEPTION, 4) def test_exception_mode_integration_filtering(self): diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py index 822f559561eb0a..8d70a1d2ef8cfc 100644 --- a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py +++ b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py @@ -1,6 +1,7 @@ """Tests for sampling profiler core functionality.""" import io +import re from unittest import mock import unittest @@ -591,7 +592,6 @@ def test_print_sampled_stats_sort_by_name(self): # Extract just the function names for comparison func_names = [] - import re for line in data_lines: # Function name is between the last ( and ), accounting for ANSI color codes