From c1e6543335fd3ad6b079071f2b9e1eaa4940b97e Mon Sep 17 00:00:00 2001 From: Anna Benke Date: Fri, 5 Sep 2025 16:00:10 +0200 Subject: [PATCH 1/2] feat(gooddata-pipelines): Process workspace backup sequentially --- .../backup_and_restore/backup_manager.py | 57 +++---------------- .../backup_and_restore/constants.py | 9 +-- .../backup_and_restore/models/storage.py | 8 --- 3 files changed, 11 insertions(+), 63 deletions(-) diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py index 005a3d35d..7e75874d0 100644 --- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py +++ b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py @@ -4,10 +4,8 @@ import os import shutil import tempfile -import threading import time import traceback -from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from pathlib import Path from typing import Any, Type @@ -291,7 +289,6 @@ def _split_to_batches( def _process_batch( self, batch: BackupBatch, - stop_event: threading.Event, retry_count: int = 0, ) -> None: """Processes a single batch of workspaces for backup. @@ -299,10 +296,6 @@ def _process_batch( and retry with exponential backoff up to BackupSettings.MAX_RETRIES. The base wait time is defined by BackupSettings.RETRY_DELAY. """ - if stop_event.is_set(): - # If the stop_event flag is set, return. 
This will terminate the thread - return - try: with tempfile.TemporaryDirectory() as tmpdir: self._get_workspace_export(tmpdir, batch.list_of_ids) @@ -314,10 +307,7 @@ def _process_batch( self.storage.export(tmpdir, self.org_id) except Exception as e: - if stop_event.is_set(): - return - - elif retry_count < BackupSettings.MAX_RETRIES: + if retry_count < BackupSettings.MAX_RETRIES: # Retry with exponential backoff until MAX_RETRIES next_retry = retry_count + 1 wait_time = BackupSettings.RETRY_DELAY**next_retry @@ -328,52 +318,23 @@ def _process_batch( ) time.sleep(wait_time) - self._process_batch(batch, stop_event, next_retry) + self._process_batch(batch, next_retry) else: # If the batch fails after MAX_RETRIES, raise the error self.logger.error(f"Batch failed: {e.__class__.__name__}: {e}") raise - def _process_batches_in_parallel( + def _process_batches( self, batches: list[BackupBatch], ) -> None: """ - Processes batches in parallel using concurrent.futures. Will stop the processing - if any one of the batches fails. + Processes batches sequentially to avoid overloading the API. + If any batch fails, the processing will stop. """ - - # Create a threading flag to control the threads that have already been started - stop_event = threading.Event() - - with ThreadPoolExecutor( - max_workers=self.config.max_workers - ) as executor: - # Set the futures tasks. 
- futures = [] - for batch in batches: - futures.append( - executor.submit( - self._process_batch, - batch, - stop_event, - ) - ) - - # Process futures as they complete - for future in as_completed(futures): - try: - future.result() - except Exception: - # On failure, set the flag to True - signal running processes to stop - stop_event.set() - - # Cancel unstarted threads - for f in futures: - if not f.done(): - f.cancel() - - raise + for i, batch in enumerate(batches, 1): + self.logger.info(f"Processing batch {i}/{len(batches)}...") + self._process_batch(batch) def backup_workspaces( self, @@ -440,7 +401,7 @@ def _backup( f"Exporting {len(workspaces_to_export)} workspaces in {len(batches)} batches." ) - self._process_batches_in_parallel(batches) + self._process_batches(batches) self.logger.info("Backup completed") except Exception as e: diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py index 3feee2ea1..72397bdd3 100644 --- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py +++ b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py @@ -21,19 +21,14 @@ class DirNames: UDF = "user_data_filters" -@dataclass(frozen=True) -class ConcurrencyDefaults: - MAX_WORKERS = 1 - DEFAULT_BATCH_SIZE = 100 - - @dataclass(frozen=True) class ApiDefaults: DEFAULT_PAGE_SIZE = 100 + DEFAULT_BATCH_SIZE = 100 @dataclass(frozen=True) -class BackupSettings(ConcurrencyDefaults, ApiDefaults): +class BackupSettings(ApiDefaults): MAX_RETRIES = 3 RETRY_DELAY = 5 # seconds TIMESTAMP_SDK_FOLDER = ( diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py index 3b8209038..9c9eb7c6f 100644 --- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py +++ b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py @@ 
-83,14 +83,6 @@ class BackupRestoreConfig(BaseModel): description="Batch size must be greater than 0", ), ] = Field(default=BackupSettings.DEFAULT_BATCH_SIZE) - max_workers: Annotated[ - int, - Field( - gt=0, - lt=3, - description="Max workers must be greater than 0 and less than 3", - ), - ] = Field(default=BackupSettings.MAX_WORKERS) @classmethod def from_yaml(cls, conf_path: str) -> "BackupRestoreConfig": From 1d466f52d3a2f5d86549f015d8ebc5fa4735e3ad Mon Sep 17 00:00:00 2001 From: Anna Benke Date: Mon, 8 Sep 2025 14:47:14 +0200 Subject: [PATCH 2/2] feat(gooddata-pipelines): Add rate limiter and use it on workspace backup --- .../backup_and_restore/backup_manager.py | 41 ++-- .../backup_and_restore/constants.py | 1 + .../backup_and_restore/models/storage.py | 7 + .../gooddata_pipelines/utils/__init__.py | 9 + .../gooddata_pipelines/utils/rate_limiter.py | 64 +++++++ .../tests/backup_and_restore/test_backup.py | 4 - .../tests/utils/test_rate_limiter.py | 176 ++++++++++++++++++ 7 files changed, 284 insertions(+), 18 deletions(-) create mode 100644 gooddata-pipelines/gooddata_pipelines/utils/__init__.py create mode 100644 gooddata-pipelines/gooddata_pipelines/utils/rate_limiter.py create mode 100644 gooddata-pipelines/tests/utils/test_rate_limiter.py diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py index 7e75874d0..3a042b9f0 100644 --- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py +++ b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py @@ -37,6 +37,7 @@ S3Storage, ) from gooddata_pipelines.logger import LogObserver +from gooddata_pipelines.utils.rate_limiter import RateLimiter @dataclass @@ -58,6 +59,10 @@ def __init__(self, host: str, token: str, config: BackupRestoreConfig): self.loader = BackupInputProcessor(self._api, self.config.api_page_size) + self._api_rate_limiter = RateLimiter( + 
calls_per_second=self.config.api_calls_per_second, + ) + @classmethod def create( cls: Type["BackupManager"], @@ -93,11 +98,12 @@ def _get_storage(conf: BackupRestoreConfig) -> BackupStorage: def get_user_data_filters(self, ws_id: str) -> dict: """Returns the user data filters for the specified workspace.""" - response: requests.Response = self._api.get_user_data_filters(ws_id) - if response.ok: - return response.json() - else: - raise RuntimeError(f"{response.status_code}: {response.text}") + with self._api_rate_limiter: + response: requests.Response = self._api.get_user_data_filters(ws_id) + if response.ok: + return response.json() + else: + raise RuntimeError(f"{response.status_code}: {response.text}") def _store_user_data_filters( self, @@ -142,14 +148,17 @@ def _write_to_yaml(path: str, source: Any) -> None: def _get_automations_from_api(self, workspace_id: str) -> Any: """Returns automations for the workspace as JSON.""" - response: requests.Response = self._api.get_automations(workspace_id) - if response.ok: - return response.json() - else: - raise RuntimeError( - f"Failed to get automations for {workspace_id}. " - + f"{response.status_code}: {response.text}" + with self._api_rate_limiter: + response: requests.Response = self._api.get_automations( + workspace_id ) + if response.ok: + return response.json() + else: + raise RuntimeError( + f"Failed to get automations for {workspace_id}. 
" + + f"{response.status_code}: {response.text}" + ) def _store_automations(self, export_path: Path, workspace_id: str) -> None: """Stores the automations in the specified export path.""" @@ -181,7 +190,8 @@ def store_declarative_filter_views( ) -> None: """Stores the filter views in the specified export path.""" # Get the filter views YAML files from the API - self._api.store_declarative_filter_views(workspace_id, export_path) + with self._api_rate_limiter: + self._api.store_declarative_filter_views(workspace_id, export_path) # Move filter views to the subfolder containing the analytics model self._move_folder( @@ -229,7 +239,10 @@ def _get_workspace_export( # the SDK. That way we could save and package all the declarations # directly instead of reorganizing the folder structures. That should # be more transparent/readable and possibly safer for threading - self._api.store_declarative_workspace(workspace_id, export_path) + with self._api_rate_limiter: + self._api.store_declarative_workspace( + workspace_id, export_path + ) self.store_declarative_filter_views(export_path, workspace_id) self._store_automations(export_path, workspace_id) diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py index 72397bdd3..5ce180b89 100644 --- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py +++ b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py @@ -25,6 +25,7 @@ class DirNames: class ApiDefaults: DEFAULT_PAGE_SIZE = 100 DEFAULT_BATCH_SIZE = 100 + DEFAULT_API_CALLS_PER_SECOND = 1.0 @dataclass(frozen=True) diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py index 9c9eb7c6f..83c2d6056 100644 --- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py +++ 
b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py @@ -83,6 +83,13 @@ class BackupRestoreConfig(BaseModel): description="Batch size must be greater than 0", ), ] = Field(default=BackupSettings.DEFAULT_BATCH_SIZE) + api_calls_per_second: Annotated[ + float, + Field( + gt=0, + description="Maximum API calls per second (rate limiting)", + ), + ] = Field(default=BackupSettings.DEFAULT_API_CALLS_PER_SECOND) @classmethod def from_yaml(cls, conf_path: str) -> "BackupRestoreConfig": diff --git a/gooddata-pipelines/gooddata_pipelines/utils/__init__.py b/gooddata-pipelines/gooddata_pipelines/utils/__init__.py new file mode 100644 index 000000000..744fb2d4a --- /dev/null +++ b/gooddata-pipelines/gooddata_pipelines/utils/__init__.py @@ -0,0 +1,9 @@ +# (C) 2025 GoodData Corporation + +""" +Utility modules for gooddata-pipelines package. +""" + +from .rate_limiter import RateLimiter + +__all__ = ["RateLimiter"] diff --git a/gooddata-pipelines/gooddata_pipelines/utils/rate_limiter.py b/gooddata-pipelines/gooddata_pipelines/utils/rate_limiter.py new file mode 100644 index 000000000..7c6598aef --- /dev/null +++ b/gooddata-pipelines/gooddata_pipelines/utils/rate_limiter.py @@ -0,0 +1,64 @@ +# (C) 2025 GoodData Corporation + +import time +import threading +import functools +from typing import Callable, Any, Literal + + +class RateLimiter: + """ + Rate limiter usable as a decorator and as a context manager. + - Shared instance decorator: limiter = RateLimiter(); @limiter + - Per-function decorator: @RateLimiter(calls_per_second=2) + - Context manager: with RateLimiter(2): ... 
+ """ + + def __init__(self, calls_per_second: float = 1.0) -> None: + if calls_per_second <= 0: + raise ValueError("calls_per_second must be greater than 0") + + self.calls_per_second = calls_per_second + self.min_interval = 1.0 / calls_per_second + + self._lock = threading.Lock() + self._last_call_time = 0.0 + + def wait_if_needed(self) -> float: + """Sleep if needed to maintain the rate limit, return actual sleep time.""" + with self._lock: + now = time.monotonic() + since_last = now - self._last_call_time + + if since_last < self.min_interval: + sleep_time = self.min_interval - since_last + time.sleep(sleep_time) + self._last_call_time = time.monotonic() + return sleep_time + else: + self._last_call_time = now + return 0.0 + + # Decorator support + def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + self.wait_if_needed() + return func(*args, **kwargs) + + return wrapper + + # Context manager support + def __enter__(self) -> "RateLimiter": + self.wait_if_needed() + return self + + def __exit__( + self, exc_type: Any, exc_val: Any, exc_tb: Any + ) -> Literal[False]: + return False + + def reset(self) -> None: + """Reset the limiter (useful in tests).""" + with self._lock: + self._last_call_time = 0.0 diff --git a/gooddata-pipelines/tests/backup_and_restore/test_backup.py b/gooddata-pipelines/tests/backup_and_restore/test_backup.py index 2aaba580d..4e425550b 100644 --- a/gooddata-pipelines/tests/backup_and_restore/test_backup.py +++ b/gooddata-pipelines/tests/backup_and_restore/test_backup.py @@ -3,7 +3,6 @@ import os import shutil import tempfile -import threading from pathlib import Path from unittest import mock @@ -325,7 +324,6 @@ def test_process_batch_success( backup_manager._process_batch( batch=batch, - stop_event=threading.Event(), retry_count=0, ) @@ -362,7 +360,6 @@ def fail_once(*args, **kwargs): backup_manager._process_batch( batch=batch, - 
stop_event=threading.Event(), ) assert get_workspace_export_mock.call_count == 2 @@ -392,7 +389,6 @@ def test_process_batch_raises_after_max_retries( with pytest.raises(Exception) as exc_info: backup_manager._process_batch( batch=batch, - stop_event=threading.Event(), retry_count=BackupSettings.MAX_RETRIES, ) assert str(exc_info.value) == "fail" diff --git a/gooddata-pipelines/tests/utils/test_rate_limiter.py b/gooddata-pipelines/tests/utils/test_rate_limiter.py new file mode 100644 index 000000000..d5dbc897e --- /dev/null +++ b/gooddata-pipelines/tests/utils/test_rate_limiter.py @@ -0,0 +1,176 @@ +# (C) 2025 GoodData Corporation + +import time +import pytest +from gooddata_pipelines.utils.rate_limiter import RateLimiter + + +# --------------------------- +# Core wait + reset behavior +# --------------------------- + + +def test_rate_limiter_no_wait_needed(): + limiter = RateLimiter(calls_per_second=1000.0) # Very fast limit + waited = limiter.wait_if_needed() + assert waited == pytest.approx(0.0, abs=0.001) + + +def test_rate_limiter_enforces_delay(): + limiter = RateLimiter(calls_per_second=2.0) + limiter.wait_if_needed() + start = time.time() + waited = limiter.wait_if_needed() + duration = time.time() - start + + assert waited >= 0.49 + assert duration < 0.65 + + +def test_rate_limiter_respects_reset(): + limiter = RateLimiter(calls_per_second=1.0) + limiter.wait_if_needed() + limiter.reset() + waited = limiter.wait_if_needed() + assert waited == pytest.approx(0.0, abs=0.001) + + +def test_rate_limiter_min_interval_property(): + limiter = RateLimiter(calls_per_second=4.0) + assert limiter.min_interval == pytest.approx(0.25, abs=1e-9) + + +# ----------------------------------------- +# Decorator: shared instance (@limiter) +# ----------------------------------------- + + +def test_rate_limiter_as_decorator_enforces_delay_shared_instance(): + limiter = RateLimiter(calls_per_second=2.0) + ts = [] + + @limiter + def func(): + ts.append(time.time()) + + func() + 
func() + + assert len(ts) == 2 + assert ts[1] - ts[0] >= 0.49 + + +def test_rate_limiter_decorator_shared_state_across_functions(): + limiter = RateLimiter(calls_per_second=2.0) + ts = [] + + @limiter + def func_a(): + ts.append(time.time()) + + @limiter + def func_b(): + ts.append(time.time()) + + func_a() + func_b() # should be throttled by the *same* limiter + assert len(ts) == 2 + assert ts[1] - ts[0] >= 0.49 + + +def test_multiple_limiters_independent_state_shared_instance_mode(): + limiter_a = RateLimiter(calls_per_second=2.0) + limiter_b = RateLimiter(calls_per_second=2.0) + + ts_a = [] + ts_b = [] + + @limiter_a + def func_a(): + ts_a.append(time.time()) + + @limiter_b + def func_b(): + ts_b.append(time.time()) + + func_a() + func_b() + + # They should be ~simultaneous since they use different instances + assert abs(ts_a[0] - ts_b[0]) < 0.05 + + +# ------------------------------------------------------- +# Decorator: per-function instance (@RateLimiter(...)) +# ------------------------------------------------------- + + +def test_per_function_decorator_enforces_delay_per_function(): + # Each function decorated this way gets its *own* limiter instance. 
+ ts = [] + + @RateLimiter(calls_per_second=2.0) # 0.5s + def func(): + ts.append(time.time()) + + func() + func() + assert len(ts) == 2 + assert ts[1] - ts[0] >= 0.49 + + +def test_per_function_decorator_independent_state_between_functions(): + ts_a = [] + ts_b = [] + + @RateLimiter(calls_per_second=2.0) + def func_a(): + ts_a.append(time.time()) + + @RateLimiter(calls_per_second=2.0) + def func_b(): + ts_b.append(time.time()) + + func_a() + func_b() # independent limiter, so no enforced delay between A and B + assert abs(ts_a[0] - ts_b[0]) < 0.05 + + +# ----------------------------- +# Context manager usage +# ----------------------------- + + +def test_context_manager_waits_on_enter(): + limiter = RateLimiter(calls_per_second=2.0) + with limiter: + t1 = time.time() + with limiter: + t2 = time.time() + + # The second 'with' should be at least 0.5s after the first 'with' enter + assert t2 - t1 >= 0.49 + + +def test_context_manager_multiple_uses_same_instance(): + limiter = RateLimiter(calls_per_second=3.0) + times = [] + + for _ in range(3): + with limiter: + times.append(time.time()) + + intervals = [b - a for a, b in zip(times, times[1:])] + for iv in intervals: + assert iv >= 0.30 # a bit of slack for timing variance + + +def test_context_manager_propagates_exceptions(): + limiter = RateLimiter(calls_per_second=10.0) + + class Boom(Exception): + pass + + with pytest.raises(Boom): + with limiter: + raise Boom("fail")