From 093478768381f43c125b6918ab440a453df1065d Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Wed, 11 Feb 2026 11:36:20 +0100
Subject: [PATCH 1/2] Update `test_snapshot_pruning_removes_outdated_records`
 to remove the expected source of flaky behavior

---
 tests/unit/_autoscaling/test_snapshotter.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py
index 9d26e018e7..e6a5dc9a8a 100644
--- a/tests/unit/_autoscaling/test_snapshotter.py
+++ b/tests/unit/_autoscaling/test_snapshotter.py
@@ -213,9 +213,6 @@ async def test_methods_raise_error_when_not_active() -> None:
     assert snapshotter.active is True


-@pytest.mark.skip(
-    reason='Flaky due to snapshot pruning boundary condition, see https://github.com/apify/crawlee-python/issues/1734'
-)
 async def test_snapshot_pruning_removes_outdated_records(
     snapshotter: Snapshotter, event_manager: LocalEventManager, default_memory_info: MemoryInfo
 ) -> None:
@@ -230,7 +227,13 @@ async def test_snapshot_pruning_removes_outdated_records(
             cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)),
             memory_info=default_memory_info,
         )
-        for delta in [5, 3, 2, 0]
+        for delta in [
+            5,
+            3,
+            # Snapshots older than 2 hours should be pruned
+            1,
+            0,
+        ]
     ]

     for event_data in events_data:
         event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data)
         await event_manager.wait_for_all_listeners_to_complete()
@@ -241,7 +244,7 @@ async def test_snapshot_pruning_removes_outdated_records(

     # Check that only the last two snapshots remain
     assert len(cpu_snapshots) == 2
-    assert cpu_snapshots[0].created_at == now - timedelta(hours=2)
+    assert cpu_snapshots[0].created_at == now - timedelta(hours=1)
     assert cpu_snapshots[1].created_at == now


From 63e84687030660a62c9115cd135cfd1508319256 Mon Sep 17 00:00:00 2001
From: Josef Prochazka
Date: Wed, 11 Feb 2026 16:13:16 +0100
Subject: [PATCH 2/2] Suggest fixing the race condition

---
 src/crawlee/_autoscaling/snapshotter.py     | 20 +++++++----
 tests/unit/_autoscaling/test_snapshotter.py | 37 +++++++++++----------
 2 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/src/crawlee/_autoscaling/snapshotter.py b/src/crawlee/_autoscaling/snapshotter.py
index 55af9da1dd..9a558ac1d6 100644
--- a/src/crawlee/_autoscaling/snapshotter.py
+++ b/src/crawlee/_autoscaling/snapshotter.py
@@ -2,7 +2,7 @@

 from __future__ import annotations

-import bisect
+from bisect import insort
 from datetime import datetime, timedelta, timezone
 from logging import getLogger
 from typing import TYPE_CHECKING, TypeVar, cast
@@ -31,7 +31,7 @@ class SortedSnapshotList(list[T]):

     def add(self, item: T) -> None:
         """Add an item to the list maintaining sorted order by `created_at` using binary search."""
-        bisect.insort(self, item, key=lambda item: item.created_at)
+        insort(self, item, key=lambda item: item.created_at)


 @docs_group('Autoscaling')
@@ -252,11 +252,13 @@ def _get_sample(snapshots: list[Snapshot], duration: timedelta | None = None) ->
         latest_time = snapshots[-1].created_at
         return [snapshot for snapshot in snapshots if latest_time - snapshot.created_at <= duration]

-    def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
+    async def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
         """Capture a snapshot of the current CPU usage.

         This method does not perform CPU usage measurement. Instead, it just reads the data received through
         the `event_data` parameter, which is expected to be supplied by the event manager.
+        Must be `async` to ensure the event manager does not schedule it to run in its own thread, which could
+        cause race conditions in snapshot manipulation (sorting and pruning).

         Args:
             event_data: System info data from which CPU usage is read.
@@ -271,11 +273,13 @@ def _snapshot_cpu(self, event_data: EventSystemInfoData) -> None:
         self._prune_snapshots(snapshots, event_data.cpu_info.created_at)
         self._cpu_snapshots.add(snapshot)

-    def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
+    async def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
         """Capture a snapshot of the current memory usage.

         This method does not perform memory usage measurement. Instead, it just reads the data received through
         the `event_data` parameter, which is expected to be supplied by the event manager.
+        Must be `async` to ensure the event manager does not schedule it to run in its own thread, which could
+        cause race conditions in snapshot manipulation (sorting and pruning).

         Args:
             event_data: System info data from which memory usage is read.
@@ -298,13 +302,15 @@ def _snapshot_memory(self, event_data: EventSystemInfoData) -> None:
             self._memory_snapshots.add(snapshot)
             self._evaluate_memory_load(event_data.memory_info.current_size, event_data.memory_info.created_at)

-    def _snapshot_event_loop(self) -> None:
+    async def _snapshot_event_loop(self) -> None:
         """Capture a snapshot of the current event loop usage.

         This method evaluates the event loop's latency by comparing the expected time between snapshots to the
         actual time elapsed since the last snapshot. The delay in the snapshot reflects the time deviation due to
         event loop overhead - it's calculated by subtracting the expected interval between snapshots from the
         actual time elapsed since the last snapshot. If there's no previous snapshot, the delay is considered zero.
+        Must be `async` to ensure the event manager does not schedule it to run in its own thread, which could
+        cause race conditions in snapshot manipulation (sorting and pruning).
         """
         snapshot = EventLoopSnapshot(max_delay=self._max_event_loop_delay, delay=timedelta(seconds=0))
         previous_snapshot = self._event_loop_snapshots[-1] if self._event_loop_snapshots else None
@@ -317,11 +323,13 @@ def _snapshot_event_loop(self) -> None:
         self._prune_snapshots(snapshots, snapshot.created_at)
         self._event_loop_snapshots.add(snapshot)

-    def _snapshot_client(self) -> None:
+    async def _snapshot_client(self) -> None:
         """Capture a snapshot of the current API state by checking for rate limit errors (HTTP 429).

         Only errors produced by a 2nd retry of the API call are considered for snapshotting since earlier errors
         may just be caused by a random spike in the number of requests and do not necessarily signify API overloading.
+        Must be `async` to ensure the event manager does not schedule it to run in its own thread, which could
+        cause race conditions in snapshot manipulation (sorting and pruning).
""" client = service_locator.get_storage_client() diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py index e6a5dc9a8a..8b653873fa 100644 --- a/tests/unit/_autoscaling/test_snapshotter.py +++ b/tests/unit/_autoscaling/test_snapshotter.py @@ -1,9 +1,12 @@ from __future__ import annotations import asyncio +import time +from bisect import insort from datetime import datetime, timedelta, timezone from logging import getLogger -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast +from unittest import mock from unittest.mock import MagicMock import pytest @@ -222,29 +225,29 @@ async def test_snapshot_pruning_removes_outdated_records( # Create timestamps for testing now = datetime.now(timezone.utc) - events_data = [ - EventSystemInfoData( - cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)), - memory_info=default_memory_info, - ) - for delta in [ - 5, - 3, - # Snapshots older than 2 hours should be pruned - 1, - 0, + def randomly_delayed_insort(*args: Any, **kwargs: Any) -> None: + """Sort with injected delay to provoke otherwise hard to reproduce race condition.""" + time.sleep(0.05) + return insort(*args, **kwargs) + + with mock.patch('crawlee._autoscaling.snapshotter.insort', side_effect=randomly_delayed_insort): + events_data = [ + EventSystemInfoData( + cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)), + memory_info=default_memory_info, + ) + for delta in [5, 3, 2, 0] ] - ] - for event_data in events_data: - event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) - await event_manager.wait_for_all_listeners_to_complete() + for event_data in events_data: + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) + await event_manager.wait_for_all_listeners_to_complete() cpu_snapshots = cast('list[CpuSnapshot]', snapshotter.get_cpu_sample()) # Check that only the last two snapshots remain assert len(cpu_snapshots) == 2 - assert cpu_snapshots[0].created_at == now - timedelta(hours=1) + assert cpu_snapshots[0].created_at == now - timedelta(hours=2) assert cpu_snapshots[1].created_at == now