@@ -4,10 +4,8 @@
import os
import shutil
import tempfile
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Type
@@ -39,6 +37,7 @@
S3Storage,
)
from gooddata_pipelines.logger import LogObserver
from gooddata_pipelines.utils.rate_limiter import RateLimiter


@dataclass
@@ -60,6 +59,10 @@ def __init__(self, host: str, token: str, config: BackupRestoreConfig):

self.loader = BackupInputProcessor(self._api, self.config.api_page_size)

self._api_rate_limiter = RateLimiter(
calls_per_second=self.config.api_calls_per_second,
)

@classmethod
def create(
cls: Type["BackupManager"],
@@ -95,11 +98,12 @@ def _get_storage(conf: BackupRestoreConfig) -> BackupStorage:

def get_user_data_filters(self, ws_id: str) -> dict:
"""Returns the user data filters for the specified workspace."""
response: requests.Response = self._api.get_user_data_filters(ws_id)
if response.ok:
return response.json()
else:
raise RuntimeError(f"{response.status_code}: {response.text}")
with self._api_rate_limiter:
response: requests.Response = self._api.get_user_data_filters(ws_id)
if response.ok:
return response.json()
else:
raise RuntimeError(f"{response.status_code}: {response.text}")

def _store_user_data_filters(
self,
@@ -144,14 +148,17 @@ def _write_to_yaml(path: str, source: Any) -> None:

def _get_automations_from_api(self, workspace_id: str) -> Any:
"""Returns automations for the workspace as JSON."""
response: requests.Response = self._api.get_automations(workspace_id)
if response.ok:
return response.json()
else:
raise RuntimeError(
f"Failed to get automations for {workspace_id}. "
+ f"{response.status_code}: {response.text}"
with self._api_rate_limiter:
response: requests.Response = self._api.get_automations(
workspace_id
)
if response.ok:
return response.json()
else:
raise RuntimeError(
f"Failed to get automations for {workspace_id}. "
+ f"{response.status_code}: {response.text}"
)

def _store_automations(self, export_path: Path, workspace_id: str) -> None:
"""Stores the automations in the specified export path."""
@@ -183,7 +190,8 @@ def store_declarative_filter_views(
) -> None:
"""Stores the filter views in the specified export path."""
# Get the filter views YAML files from the API
self._api.store_declarative_filter_views(workspace_id, export_path)
with self._api_rate_limiter:
self._api.store_declarative_filter_views(workspace_id, export_path)

# Move filter views to the subfolder containing the analytics model
self._move_folder(
@@ -231,7 +239,10 @@ def _get_workspace_export(
# the SDK. That way we could save and package all the declarations
# directly instead of reorganizing the folder structures. That should
# be more transparent/readable and possibly safer for threading
self._api.store_declarative_workspace(workspace_id, export_path)
with self._api_rate_limiter:
self._api.store_declarative_workspace(
workspace_id, export_path
)
self.store_declarative_filter_views(export_path, workspace_id)
self._store_automations(export_path, workspace_id)

@@ -291,18 +302,13 @@ def _split_to_batches(
def _process_batch(
self,
batch: BackupBatch,
stop_event: threading.Event,
retry_count: int = 0,
) -> None:
"""Processes a single batch of workspaces for backup.
If the batch processing fails, the function will wait
and retry with exponential backoff up to BackupSettings.MAX_RETRIES.
The base wait time is defined by BackupSettings.RETRY_DELAY.
"""
if stop_event.is_set():
# If the stop_event flag is set, return. This will terminate the thread
return

try:
with tempfile.TemporaryDirectory() as tmpdir:
self._get_workspace_export(tmpdir, batch.list_of_ids)
@@ -314,10 +320,7 @@ def _process_batch(
self.storage.export(tmpdir, self.org_id)

except Exception as e:
if stop_event.is_set():
return

elif retry_count < BackupSettings.MAX_RETRIES:
if retry_count < BackupSettings.MAX_RETRIES:
# Retry with exponential backoff until MAX_RETRIES
next_retry = retry_count + 1
wait_time = BackupSettings.RETRY_DELAY**next_retry
@@ -328,52 +331,23 @@
)

time.sleep(wait_time)
self._process_batch(batch, stop_event, next_retry)
self._process_batch(batch, next_retry)
else:
# If the batch fails after MAX_RETRIES, raise the error
self.logger.error(f"Batch failed: {e.__class__.__name__}: {e}")
raise

def _process_batches_in_parallel(
def _process_batches(
self,
batches: list[BackupBatch],
) -> None:
"""
Processes batches in parallel using concurrent.futures. Will stop the processing
if any one of the batches fails.
Processes batches sequentially to avoid overloading the API.
If any batch fails, the processing will stop.
"""

# Create a threading flag to control the threads that have already been started
stop_event = threading.Event()

with ThreadPoolExecutor(
max_workers=self.config.max_workers
) as executor:
# Set the futures tasks.
futures = []
for batch in batches:
futures.append(
executor.submit(
self._process_batch,
batch,
stop_event,
)
)

# Process futures as they complete
for future in as_completed(futures):
try:
future.result()
except Exception:
# On failure, set the flag to True - signal running processes to stop
stop_event.set()

# Cancel unstarted threads
for f in futures:
if not f.done():
f.cancel()

raise
for i, batch in enumerate(batches, 1):
self.logger.info(f"Processing batch {i}/{len(batches)}...")
self._process_batch(batch)

def backup_workspaces(
self,
@@ -440,7 +414,7 @@ def _backup(
f"Exporting {len(workspaces_to_export)} workspaces in {len(batches)} batches."
)

self._process_batches_in_parallel(batches)
self._process_batches(batches)

self.logger.info("Backup completed")
except Exception as e:
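For context on the retry path that survives this change: `_process_batch` still backs off exponentially between attempts, only without the thread stop flag. With the constants defined later in this PR (`RETRY_DELAY = 5`, `MAX_RETRIES = 3`), the wait schedule works out as in this small illustrative snippet (standalone, not part of the diff):

```python
# Illustrative only: the backoff schedule implied by _process_batch,
# using RETRY_DELAY = 5 and MAX_RETRIES = 3 from this PR's BackupSettings.
RETRY_DELAY = 5
MAX_RETRIES = 3

for next_retry in range(1, MAX_RETRIES + 1):
    wait_time = RETRY_DELAY**next_retry
    print(f"attempt {next_retry}: sleep {wait_time}s before retrying")
# attempt 1: 5s, attempt 2: 25s, attempt 3: 125s; a further failure is
# re-raised, which stops the now-sequential batch run.
```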
@@ -21,19 +21,15 @@ class DirNames:
UDF = "user_data_filters"


@dataclass(frozen=True)
class ConcurrencyDefaults:
MAX_WORKERS = 1
DEFAULT_BATCH_SIZE = 100


@dataclass(frozen=True)
class ApiDefaults:
DEFAULT_PAGE_SIZE = 100
DEFAULT_BATCH_SIZE = 100
DEFAULT_API_CALLS_PER_SECOND = 1.0


@dataclass(frozen=True)
class BackupSettings(ConcurrencyDefaults, ApiDefaults):
class BackupSettings(ApiDefaults):
MAX_RETRIES = 3
RETRY_DELAY = 5 # seconds
TIMESTAMP_SDK_FOLDER = (
@@ -83,14 +83,13 @@ class BackupRestoreConfig(BaseModel):
description="Batch size must be greater than 0",
),
] = Field(default=BackupSettings.DEFAULT_BATCH_SIZE)
max_workers: Annotated[
int,
api_calls_per_second: Annotated[
float,
Field(
gt=0,
lt=3,
description="Max workers must be greater than 0 and less than 3",
description="Maximum API calls per second (rate limiting)",
),
] = Field(default=BackupSettings.MAX_WORKERS)
] = Field(default=BackupSettings.DEFAULT_API_CALLS_PER_SECOND)

@classmethod
def from_yaml(cls, conf_path: str) -> "BackupRestoreConfig":
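As a quick sanity check of the new default: at `api_calls_per_second = 1.0` the limiter enforces at least one second between rate-limited calls, so a backup of N workspaces spends at least roughly N seconds on pacing alone (each workspace export makes one or more rate-limited calls). A tiny illustrative computation, standalone and not part of the diff; the workspace count is hypothetical:

```python
# Illustrative pacing math for the default rate limit introduced in this PR.
calls_per_second = 1.0                  # DEFAULT_API_CALLS_PER_SECOND
min_interval = 1.0 / calls_per_second   # seconds enforced between rate-limited calls
workspaces = 250                        # hypothetical backup size
print(f"lower bound on pacing: {workspaces * min_interval:.0f}s")  # 250s
```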
9 changes: 9 additions & 0 deletions gooddata-pipelines/gooddata_pipelines/utils/__init__.py
@@ -0,0 +1,9 @@
# (C) 2025 GoodData Corporation

"""
Utility modules for gooddata-pipelines package.
"""

from .rate_limiter import RateLimiter

__all__ = ["RateLimiter"]
64 changes: 64 additions & 0 deletions gooddata-pipelines/gooddata_pipelines/utils/rate_limiter.py
@@ -0,0 +1,64 @@
# (C) 2025 GoodData Corporation

import time
import threading
import functools
from typing import Callable, Any, Literal


class RateLimiter:
Contributor: I think it would be great if this had some tests.
"""
Rate limiter usable as a decorator and as a context manager.
- Shared instance decorator: limiter = RateLimiter(); @limiter
- Per-function decorator: @RateLimiter(calls_per_second=2)
- Context manager: with RateLimiter(2): ...
"""

def __init__(self, calls_per_second: float = 1.0) -> None:
if calls_per_second <= 0:
raise ValueError("calls_per_second must be greater than 0")

self.calls_per_second = calls_per_second
self.min_interval = 1.0 / calls_per_second

self._lock = threading.Lock()
self._last_call_time = 0.0

def wait_if_needed(self) -> float:
Contributor: Why does this method have a return value? If I see it correctly, it is not stored or read anywhere...

"""Sleep if needed to maintain the rate limit, return actual sleep time."""
with self._lock:
now = time.monotonic()
since_last = now - self._last_call_time

if since_last < self.min_interval:
sleep_time = self.min_interval - since_last
time.sleep(sleep_time)
self._last_call_time = time.monotonic()
return sleep_time
else:
self._last_call_time = now
return 0.0

# Decorator support
def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
self.wait_if_needed()
return func(*args, **kwargs)

return wrapper

# Context manager support
def __enter__(self) -> "RateLimiter":
self.wait_if_needed()
Contributor: I think it would be great if we could use the rate limiter as a decorator instead of a context manager... My assumption is that it could be more versatile (for handling the methods from gooddata_sdk) and less verbose (we could wrap functions/methods with @rate_limiter instead of the with ... blocks), and the rate limiter would not have to be part of the instanced object (like the BackupManager). WDYT?

return self

def __exit__(
self, exc_type: Any, exc_val: Any, exc_tb: Any
) -> Literal[False]:
return False

def reset(self) -> None:
"""Reset the limiter (useful in tests)."""
with self._lock:
self._last_call_time = 0.0
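To make the intended usage concrete, and to sketch the decorator form suggested in the review, here is a minimal standalone example. Only `RateLimiter` comes from this PR; `fetch_workspace` and its body are hypothetical placeholders, not real SDK calls:

```python
# Minimal usage sketch for the RateLimiter added in this PR.
import time

from gooddata_pipelines.utils.rate_limiter import RateLimiter

# Context-manager form, as wired into BackupManager in this PR:
limiter = RateLimiter(calls_per_second=2.0)  # at most ~2 calls per second
with limiter:
    pass  # an API call would go here

# Decorator form (shared instance), along the lines of the review suggestion:
@limiter
def fetch_workspace(workspace_id: str) -> str:
    return f"fetched {workspace_id}"  # placeholder for a real API call

start = time.monotonic()
for ws in ["ws1", "ws2", "ws3"]:
    fetch_workspace(ws)
# With min_interval = 0.5s, the three calls above take roughly 1.5 seconds,
# since the earlier `with limiter:` block already counted as a call.
print(f"elapsed: {time.monotonic() - start:.2f}s")
```

The decorator form keeps call sites free of `with` blocks, at the cost of deciding where the shared limiter instance lives.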
4 changes: 0 additions & 4 deletions gooddata-pipelines/tests/backup_and_restore/test_backup.py
@@ -3,7 +3,6 @@
import os
import shutil
import tempfile
import threading
from pathlib import Path
from unittest import mock

@@ -325,7 +324,6 @@ def test_process_batch_success(

backup_manager._process_batch(
batch=batch,
stop_event=threading.Event(),
retry_count=0,
)

@@ -362,7 +360,6 @@ def fail_once(*args, **kwargs):

backup_manager._process_batch(
batch=batch,
stop_event=threading.Event(),
)

assert get_workspace_export_mock.call_count == 2
@@ -392,7 +389,6 @@ def test_process_batch_raises_after_max_retries(
with pytest.raises(Exception) as exc_info:
backup_manager._process_batch(
batch=batch,
stop_event=threading.Event(),
retry_count=BackupSettings.MAX_RETRIES,
)
assert str(exc_info.value) == "fail"
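Picking up the reviewer's note that the limiter itself has no tests: a minimal pytest sketch could look like the following. It is illustrative only (not part of this PR) and also exercises the return value of `wait_if_needed` that the second comment asks about:

```python
# Hypothetical pytest sketch for RateLimiter; not part of this PR.
import time

import pytest

from gooddata_pipelines.utils.rate_limiter import RateLimiter


def test_rejects_non_positive_rate():
    with pytest.raises(ValueError):
        RateLimiter(calls_per_second=0)


def test_first_call_does_not_sleep():
    limiter = RateLimiter(calls_per_second=10.0)
    limiter.reset()
    # time.monotonic() is far past 0.0 in practice, so no throttling on the first call
    assert limiter.wait_if_needed() == 0.0


def test_second_call_is_throttled():
    limiter = RateLimiter(calls_per_second=5.0)  # min_interval = 0.2s
    limiter.wait_if_needed()
    start = time.monotonic()
    slept = limiter.wait_if_needed()
    elapsed = time.monotonic() - start
    assert slept > 0.0
    assert elapsed >= 0.15  # allow some scheduling slack below the 0.2s interval
```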