Skip to content

Commit bfe8f11

Browse files
committed
feat(gooddata-pipelines): Add rate limiter and use it on workspace backup
1 parent c1e6543 commit bfe8f11

File tree

7 files changed

+284
-18
lines changed

7 files changed

+284
-18
lines changed

gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
S3Storage,
3838
)
3939
from gooddata_pipelines.logger import LogObserver
40+
from gooddata_pipelines.utils.rate_limiter import RateLimiter
4041

4142

4243
@dataclass
@@ -58,6 +59,10 @@ def __init__(self, host: str, token: str, config: BackupRestoreConfig):
5859

5960
self.loader = BackupInputProcessor(self._api, self.config.api_page_size)
6061

62+
self._api_rate_limiter = RateLimiter(
63+
calls_per_second=self.config.api_calls_per_second,
64+
)
65+
6166
@classmethod
6267
def create(
6368
cls: Type["BackupManager"],
@@ -93,11 +98,12 @@ def _get_storage(conf: BackupRestoreConfig) -> BackupStorage:
9398

9499
def get_user_data_filters(self, ws_id: str) -> dict:
95100
"""Returns the user data filters for the specified workspace."""
96-
response: requests.Response = self._api.get_user_data_filters(ws_id)
97-
if response.ok:
98-
return response.json()
99-
else:
100-
raise RuntimeError(f"{response.status_code}: {response.text}")
101+
with self._api_rate_limiter:
102+
response: requests.Response = self._api.get_user_data_filters(ws_id)
103+
if response.ok:
104+
return response.json()
105+
else:
106+
raise RuntimeError(f"{response.status_code}: {response.text}")
101107

102108
def _store_user_data_filters(
103109
self,
@@ -142,14 +148,17 @@ def _write_to_yaml(path: str, source: Any) -> None:
142148

143149
def _get_automations_from_api(self, workspace_id: str) -> Any:
144150
"""Returns automations for the workspace as JSON."""
145-
response: requests.Response = self._api.get_automations(workspace_id)
146-
if response.ok:
147-
return response.json()
148-
else:
149-
raise RuntimeError(
150-
f"Failed to get automations for {workspace_id}. "
151-
+ f"{response.status_code}: {response.text}"
151+
with self._api_rate_limiter:
152+
response: requests.Response = self._api.get_automations(
153+
workspace_id
152154
)
155+
if response.ok:
156+
return response.json()
157+
else:
158+
raise RuntimeError(
159+
f"Failed to get automations for {workspace_id}. "
160+
+ f"{response.status_code}: {response.text}"
161+
)
153162

154163
def _store_automations(self, export_path: Path, workspace_id: str) -> None:
155164
"""Stores the automations in the specified export path."""
@@ -181,7 +190,8 @@ def store_declarative_filter_views(
181190
) -> None:
182191
"""Stores the filter views in the specified export path."""
183192
# Get the filter views YAML files from the API
184-
self._api.store_declarative_filter_views(workspace_id, export_path)
193+
with self._api_rate_limiter:
194+
self._api.store_declarative_filter_views(workspace_id, export_path)
185195

186196
# Move filter views to the subfolder containing the analytics model
187197
self._move_folder(
@@ -229,7 +239,10 @@ def _get_workspace_export(
229239
# the SDK. That way we could save and package all the declarations
230240
# directly instead of reorganizing the folder structures. That should
231241
# be more transparent/readable and possibly safer for threading
232-
self._api.store_declarative_workspace(workspace_id, export_path)
242+
with self._api_rate_limiter:
243+
self._api.store_declarative_workspace(
244+
workspace_id, export_path
245+
)
233246
self.store_declarative_filter_views(export_path, workspace_id)
234247
self._store_automations(export_path, workspace_id)
235248

gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class DirNames:
2525
class ApiDefaults:
2626
DEFAULT_PAGE_SIZE = 100
2727
DEFAULT_BATCH_SIZE = 100
28+
DEFAULT_API_CALLS_PER_SECOND = 1.0
2829

2930

3031
@dataclass(frozen=True)

gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ class BackupRestoreConfig(BaseModel):
8383
description="Batch size must be greater than 0",
8484
),
8585
] = Field(default=BackupSettings.DEFAULT_BATCH_SIZE)
86+
api_calls_per_second: Annotated[
87+
float,
88+
Field(
89+
gt=0,
90+
description="Maximum API calls per second (rate limiting)",
91+
),
92+
] = Field(default=BackupSettings.DEFAULT_API_CALLS_PER_SECOND)
8693

8794
@classmethod
8895
def from_yaml(cls, conf_path: str) -> "BackupRestoreConfig":
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# (C) 2025 GoodData Corporation

"""Shared utility helpers for the gooddata-pipelines package."""

from .rate_limiter import RateLimiter

# Explicit public API of the utils subpackage.
__all__ = ["RateLimiter"]
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# (C) 2025 GoodData Corporation

import functools
import threading
import time
from typing import Any, Callable, Literal


class RateLimiter:
    """Throttle calls to at most ``calls_per_second`` per second.

    Three usage patterns are supported:
        * shared-instance decorator: ``limiter = RateLimiter(); @limiter``
        * per-function decorator:    ``@RateLimiter(calls_per_second=2)``
        * context manager:           ``with RateLimiter(2): ...``
    """

    def __init__(self, calls_per_second: float = 1.0) -> None:
        if calls_per_second <= 0:
            raise ValueError("calls_per_second must be greater than 0")

        # Public configuration.
        self.calls_per_second = calls_per_second
        self.min_interval = 1.0 / calls_per_second

        # Mutable state is guarded by the lock so concurrent callers
        # queue up one interval apart.
        self._lock = threading.Lock()
        self._last_call_time = 0.0

    def wait_if_needed(self) -> float:
        """Block until the rate limit allows another call.

        Returns the number of seconds slept (0.0 when no wait was needed).
        """
        with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_call_time

            if elapsed >= self.min_interval:
                # Enough time has passed; record the call and proceed.
                self._last_call_time = now
                return 0.0

            # Too soon: sleep off the remainder of the interval. The lock
            # stays held during the sleep, serializing concurrent callers.
            pause = self.min_interval - elapsed
            time.sleep(pause)
            self._last_call_time = time.monotonic()
            return pause

    # Decorator support
    def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
        """Wrap ``func`` so each invocation honors the rate limit."""

        @functools.wraps(func)
        def throttled(*args: Any, **kwargs: Any) -> Any:
            self.wait_if_needed()
            return func(*args, **kwargs)

        return throttled

    # Context manager support
    def __enter__(self) -> "RateLimiter":
        """Wait (if necessary) on entry, then hand back the limiter."""
        self.wait_if_needed()
        return self

    def __exit__(
        self, exc_type: Any, exc_val: Any, exc_tb: Any
    ) -> Literal[False]:
        # Returning False never suppresses exceptions from the block.
        return False

    def reset(self) -> None:
        """Forget the last call time (useful in tests)."""
        with self._lock:
            self._last_call_time = 0.0

gooddata-pipelines/tests/backup_and_restore/test_backup.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import os
44
import shutil
55
import tempfile
6-
import threading
76
from pathlib import Path
87
from unittest import mock
98

@@ -325,7 +324,6 @@ def test_process_batch_success(
325324

326325
backup_manager._process_batch(
327326
batch=batch,
328-
stop_event=threading.Event(),
329327
retry_count=0,
330328
)
331329

@@ -362,7 +360,6 @@ def fail_once(*args, **kwargs):
362360

363361
backup_manager._process_batch(
364362
batch=batch,
365-
stop_event=threading.Event(),
366363
)
367364

368365
assert get_workspace_export_mock.call_count == 2
@@ -392,7 +389,6 @@ def test_process_batch_raises_after_max_retries(
392389
with pytest.raises(Exception) as exc_info:
393390
backup_manager._process_batch(
394391
batch=batch,
395-
stop_event=threading.Event(),
396392
retry_count=BackupSettings.MAX_RETRIES,
397393
)
398394
assert str(exc_info.value) == "fail"
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
# (C) 2025 GoodData Corporation
2+
3+
import time
4+
import pytest
5+
from gooddata_pipelines.utils.rate_limiter import RateLimiter
6+
7+
8+
# ---------------------------
# Core wait + reset behavior
# ---------------------------


def test_rate_limiter_no_wait_needed():
    """A limiter with a very high rate should never sleep."""
    limiter = RateLimiter(calls_per_second=1000.0)  # Very fast limit
    waited = limiter.wait_if_needed()
    assert waited == pytest.approx(0.0, abs=0.001)


def test_rate_limiter_enforces_delay():
    """A second call inside the interval sleeps for the remainder (~0.5s)."""
    limiter = RateLimiter(calls_per_second=2.0)
    limiter.wait_if_needed()
    # Measure with a monotonic clock: time.time() is wall-clock time and can
    # jump (NTP adjustment), making timing assertions flaky. The limiter
    # itself is built on time.monotonic().
    start = time.monotonic()
    waited = limiter.wait_if_needed()
    duration = time.monotonic() - start

    assert waited >= 0.49
    assert duration < 0.65


def test_rate_limiter_respects_reset():
    """reset() clears the last-call timestamp, so the next call is free."""
    limiter = RateLimiter(calls_per_second=1.0)
    limiter.wait_if_needed()
    limiter.reset()
    waited = limiter.wait_if_needed()
    assert waited == pytest.approx(0.0, abs=0.001)


def test_rate_limiter_min_interval_property():
    """min_interval is the reciprocal of calls_per_second."""
    limiter = RateLimiter(calls_per_second=4.0)
    assert limiter.min_interval == pytest.approx(0.25, abs=1e-9)
41+
42+
43+
# -----------------------------------------
# Decorator: shared instance (@limiter)
# -----------------------------------------


def test_rate_limiter_as_decorator_enforces_delay_shared_instance():
    """Two calls through the same decorated function are ~0.5s apart."""
    limiter = RateLimiter(calls_per_second=2.0)
    ts = []

    @limiter
    def func():
        # time.monotonic() is immune to wall-clock adjustments, unlike
        # time.time(), so interval assertions cannot be skewed by NTP.
        ts.append(time.monotonic())

    func()
    func()

    assert len(ts) == 2
    assert ts[1] - ts[0] >= 0.49


def test_rate_limiter_decorator_shared_state_across_functions():
    """Functions decorated with the same instance share one rate budget."""
    limiter = RateLimiter(calls_per_second=2.0)
    ts = []

    @limiter
    def func_a():
        ts.append(time.monotonic())

    @limiter
    def func_b():
        ts.append(time.monotonic())

    func_a()
    func_b()  # should be throttled by the *same* limiter
    assert len(ts) == 2
    assert ts[1] - ts[0] >= 0.49


def test_multiple_limiters_independent_state_shared_instance_mode():
    """Distinct limiter instances do not throttle each other."""
    limiter_a = RateLimiter(calls_per_second=2.0)
    limiter_b = RateLimiter(calls_per_second=2.0)

    ts_a = []
    ts_b = []

    @limiter_a
    def func_a():
        ts_a.append(time.monotonic())

    @limiter_b
    def func_b():
        ts_b.append(time.monotonic())

    func_a()
    func_b()

    # They should be ~simultaneous since they use different instances
    assert abs(ts_a[0] - ts_b[0]) < 0.05
101+
102+
103+
# -------------------------------------------------------
# Decorator: per-function instance (@RateLimiter(...))
# -------------------------------------------------------


def test_per_function_decorator_enforces_delay_per_function():
    """Decorating with a fresh instance throttles calls to that function."""
    # Each function decorated this way gets its *own* limiter instance.
    ts = []

    @RateLimiter(calls_per_second=2.0)  # 0.5s
    def func():
        # Monotonic clock: immune to wall-clock (NTP) jumps that would
        # make interval assertions flaky.
        ts.append(time.monotonic())

    func()
    func()
    assert len(ts) == 2
    assert ts[1] - ts[0] >= 0.49


def test_per_function_decorator_independent_state_between_functions():
    """Separate per-function limiters impose no cross-function delay."""
    ts_a = []
    ts_b = []

    @RateLimiter(calls_per_second=2.0)
    def func_a():
        ts_a.append(time.monotonic())

    @RateLimiter(calls_per_second=2.0)
    def func_b():
        ts_b.append(time.monotonic())

    func_a()
    func_b()  # independent limiter, so no enforced delay between A and B
    assert abs(ts_a[0] - ts_b[0]) < 0.05
137+
138+
139+
# -----------------------------
# Context manager usage
# -----------------------------


def test_context_manager_waits_on_enter():
    """Entering the context a second time waits out the interval."""
    limiter = RateLimiter(calls_per_second=2.0)
    with limiter:
        # Monotonic timestamps: wall-clock time.time() can jump under NTP
        # adjustment and break the interval assertion below.
        t1 = time.monotonic()
    with limiter:
        t2 = time.monotonic()

    # The second 'with' should be at least 0.5s after the first 'with' enter
    assert t2 - t1 >= 0.49


def test_context_manager_multiple_uses_same_instance():
    """Repeated entries on one instance are spaced by ~min_interval."""
    limiter = RateLimiter(calls_per_second=3.0)
    times = []

    for _ in range(3):
        with limiter:
            times.append(time.monotonic())

    intervals = [b - a for a, b in zip(times, times[1:])]
    for iv in intervals:
        assert iv >= 0.30  # a bit of slack for timing variance


def test_context_manager_propagates_exceptions():
    """__exit__ returns False, so exceptions inside the block propagate."""
    limiter = RateLimiter(calls_per_second=10.0)

    class Boom(Exception):
        pass

    with pytest.raises(Boom):
        with limiter:
            raise Boom("fail")

0 commit comments

Comments
 (0)