Commit ba0d8ab

feat(gooddata-pipelines): Add rate limiter and use it on workspace backup
1 parent c1e6543 commit ba0d8ab

File tree

6 files changed: +138 -18 lines changed


gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py

Lines changed: 28 additions & 14 deletions
@@ -37,6 +37,7 @@
     S3Storage,
 )
 from gooddata_pipelines.logger import LogObserver
+from gooddata_pipelines.utils.rate_limiter import RateLimiter


 @dataclass
@@ -58,6 +59,11 @@ def __init__(self, host: str, token: str, config: BackupRestoreConfig):

         self.loader = BackupInputProcessor(self._api, self.config.api_page_size)

+        self._api_rate_limiter = RateLimiter(
+            calls_per_second=self.config.api_calls_per_second,
+            name="GoodData_API_RateLimiter",
+        )
+
     @classmethod
     def create(
         cls: Type["BackupManager"],
@@ -93,11 +99,12 @@ def _get_storage(conf: BackupRestoreConfig) -> BackupStorage:

     def get_user_data_filters(self, ws_id: str) -> dict:
         """Returns the user data filters for the specified workspace."""
-        response: requests.Response = self._api.get_user_data_filters(ws_id)
-        if response.ok:
-            return response.json()
-        else:
-            raise RuntimeError(f"{response.status_code}: {response.text}")
+        with self._api_rate_limiter:
+            response: requests.Response = self._api.get_user_data_filters(ws_id)
+            if response.ok:
+                return response.json()
+            else:
+                raise RuntimeError(f"{response.status_code}: {response.text}")

     def _store_user_data_filters(
         self,
@@ -142,14 +149,17 @@ def _write_to_yaml(path: str, source: Any) -> None:

     def _get_automations_from_api(self, workspace_id: str) -> Any:
         """Returns automations for the workspace as JSON."""
-        response: requests.Response = self._api.get_automations(workspace_id)
-        if response.ok:
-            return response.json()
-        else:
-            raise RuntimeError(
-                f"Failed to get automations for {workspace_id}. "
-                + f"{response.status_code}: {response.text}"
+        with self._api_rate_limiter:
+            response: requests.Response = self._api.get_automations(
+                workspace_id
             )
+            if response.ok:
+                return response.json()
+            else:
+                raise RuntimeError(
+                    f"Failed to get automations for {workspace_id}. "
+                    + f"{response.status_code}: {response.text}"
+                )

     def _store_automations(self, export_path: Path, workspace_id: str) -> None:
         """Stores the automations in the specified export path."""
@@ -181,7 +191,8 @@ def store_declarative_filter_views(
     ) -> None:
         """Stores the filter views in the specified export path."""
         # Get the filter views YAML files from the API
-        self._api.store_declarative_filter_views(workspace_id, export_path)
+        with self._api_rate_limiter:
+            self._api.store_declarative_filter_views(workspace_id, export_path)

         # Move filter views to the subfolder containing the analytics model
         self._move_folder(
@@ -229,7 +240,10 @@ def _get_workspace_export(
         # the SDK. That way we could save and package all the declarations
         # directly instead of reorganizing the folder structures. That should
         # be more transparent/readable and possibly safer for threading
-        self._api.store_declarative_workspace(workspace_id, export_path)
+        with self._api_rate_limiter:
+            self._api.store_declarative_workspace(
+                workspace_id, export_path
+            )
         self.store_declarative_filter_views(export_path, workspace_id)
         self._store_automations(export_path, workspace_id)
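The pattern above routes every GoodData API call through the limiter's context manager: __enter__ calls wait_if_needed(), so each call blocks until the configured minimum interval has elapsed. A minimal standalone sketch of the same pattern (the fetch callable and the 2-calls/sec limit are illustrative, not part of this commit):

from gooddata_pipelines.utils.rate_limiter import RateLimiter

# Shared limiter: at most 2 calls per second across everything that uses it.
limiter = RateLimiter(calls_per_second=2.0, name="Example_RateLimiter")

def rate_limited_call(fetch):
    # __enter__ blocks until at least 0.5 s has passed since the previous call.
    with limiter:
        return fetch()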

gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ class DirNames:
 class ApiDefaults:
     DEFAULT_PAGE_SIZE = 100
     DEFAULT_BATCH_SIZE = 100
+    DEFAULT_API_CALLS_PER_SECOND = 1.0


 @dataclass(frozen=True)

gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py

Lines changed: 7 additions & 0 deletions
@@ -83,6 +83,13 @@ class BackupRestoreConfig(BaseModel):
             description="Batch size must be greater than 0",
         ),
     ] = Field(default=BackupSettings.DEFAULT_BATCH_SIZE)
+    api_calls_per_second: Annotated[
+        float,
+        Field(
+            gt=0,
+            description="Maximum API calls per second (rate limiting)",
+        ),
+    ] = Field(default=BackupSettings.DEFAULT_API_CALLS_PER_SECOND)

     @classmethod
     def from_yaml(cls, conf_path: str) -> "BackupRestoreConfig":
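Because the field is declared with gt=0, Pydantic rejects non-positive rates when the config is loaded. A standalone sketch of the same validation pattern, assuming Pydantic v2 (RateConfig is a made-up stand-in; BackupRestoreConfig's other fields are not shown in this diff):

from typing import Annotated

from pydantic import BaseModel, Field, ValidationError

class RateConfig(BaseModel):
    # Mirrors the api_calls_per_second declaration added above.
    api_calls_per_second: Annotated[float, Field(gt=0)] = 1.0

print(RateConfig().api_calls_per_second)                          # 1.0 (the default)
print(RateConfig(api_calls_per_second=0.5).api_calls_per_second)  # 0.5, i.e. one call per 2 s

try:
    RateConfig(api_calls_per_second=0)  # violates gt=0
except ValidationError as exc:
    print(exc.errors()[0]["type"])  # "greater_than" in Pydantic v2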
gooddata-pipelines/gooddata_pipelines/utils/__init__.py

Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
+# (C) 2025 GoodData Corporation
+
+"""
+Utility modules for gooddata-pipelines package.
+"""
+
+from .rate_limiter import RateLimiter
+
+__all__ = ["RateLimiter"]
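With this re-export, callers can import the limiter from the utils package root; the backup manager above uses the full module path. Both forms resolve to the same class:

from gooddata_pipelines.utils import RateLimiter               # via the re-export above
from gooddata_pipelines.utils.rate_limiter import RateLimiter  # direct module path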
gooddata-pipelines/gooddata_pipelines/utils/rate_limiter.py

Lines changed: 93 additions & 0 deletions

@@ -0,0 +1,93 @@
+# (C) 2025 GoodData Corporation
+
+import time
+import threading
+from typing import Any, Optional
+
+
+class RateLimiter:
+    """
+    A rate limiter that ensures a minimum interval between operations.
+
+    This class can be used to limit the rate of API calls, file operations, or any
+    other operations that need to be throttled to avoid overwhelming external systems.
+    """
+
+    def __init__(
+        self,
+        calls_per_second: float = 1.0,
+        name: Optional[str] = None,
+        logger: Optional[Any] = None,
+    ) -> None:
+        """
+        Initialize the rate limiter.
+
+        Args:
+            calls_per_second: Maximum number of calls allowed per second
+            name: Optional name for logging purposes
+            logger: Optional logger instance for rate limiting messages
+        """
+        if calls_per_second <= 0:
+            raise ValueError("calls_per_second must be greater than 0")
+
+        self.calls_per_second = calls_per_second
+        self.min_interval = 1.0 / calls_per_second
+        self.name = name or "RateLimiter"
+        self.logger = logger
+
+        # Thread-safe tracking of last call time
+        self._lock = threading.Lock()
+        self._last_call_time = 0.0
+
+    def wait_if_needed(self) -> float:
+        """
+        Wait if necessary to maintain the rate limit.
+
+        Returns:
+            The actual time waited (0 if no wait was needed)
+        """
+        with self._lock:
+            current_time = time.time()
+            time_since_last_call = current_time - self._last_call_time
+
+            if time_since_last_call < self.min_interval:
+                sleep_time = self.min_interval - time_since_last_call
+
+                if self.logger:
+                    self.logger.info(
+                        f"{self.name}: Rate limiting active, waiting {sleep_time:.2f} seconds "
+                        f"(limit: {self.calls_per_second} calls/sec)"
+                    )
+
+                time.sleep(sleep_time)
+                self._last_call_time = time.time()
+                return sleep_time
+            else:
+                self._last_call_time = current_time
+                return 0.0
+
+    def __enter__(self) -> "RateLimiter":
+        self.wait_if_needed()
+        return self
+
+    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
+        pass
+
+    def reset(self) -> None:
+        """Reset the rate limiter state (useful for testing)."""
+        with self._lock:
+            self._last_call_time = 0.0
+
+    @property
+    def time_until_next_call(self) -> float:
+        """
+        Get the time (in seconds) until the next call can be made without waiting.
+        """
+        with self._lock:
+            current_time = time.time()
+            time_since_last_call = current_time - self._last_call_time
+
+            if time_since_last_call < self.min_interval:
+                return self.min_interval - time_since_last_call
+            else:
+                return 0.0
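A note on behavior: _last_call_time starts at 0.0, so the first call never waits; each later call is delayed until min_interval has elapsed, and the single lock makes the limiter safe to share across threads. A quick pacing demonstration (standalone, not part of the commit):

import time

from gooddata_pipelines.utils.rate_limiter import RateLimiter

limiter = RateLimiter(calls_per_second=2.0)  # min_interval = 0.5 s

start = time.time()
waits = [limiter.wait_if_needed() for _ in range(4)]
elapsed = time.time() - start

print([round(w, 1) for w in waits])  # ~[0.0, 0.5, 0.5, 0.5]: only the first call is free
print(round(elapsed, 1))             # ~1.5 s total for 4 calls at 2 calls/sec

limiter.reset()                      # clears _last_call_time, e.g. between tests
print(limiter.time_until_next_call)  # 0.0 after a reset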

gooddata-pipelines/tests/backup_and_restore/test_backup.py

Lines changed: 0 additions & 4 deletions
@@ -3,7 +3,6 @@
 import os
 import shutil
 import tempfile
-import threading
 from pathlib import Path
 from unittest import mock

@@ -325,7 +324,6 @@ def test_process_batch_success(

     backup_manager._process_batch(
         batch=batch,
-        stop_event=threading.Event(),
         retry_count=0,
     )

@@ -362,7 +360,6 def fail_once(*args, **kwargs):

     backup_manager._process_batch(
         batch=batch,
-        stop_event=threading.Event(),
     )

     assert get_workspace_export_mock.call_count == 2
@@ -392,7 +389,6 @@ def test_process_batch_raises_after_max_retries(
     with pytest.raises(Exception) as exc_info:
         backup_manager._process_batch(
             batch=batch,
-            stop_event=threading.Event(),
             retry_count=BackupSettings.MAX_RETRIES,
         )
     assert str(exc_info.value) == "fail"
