
Commit e907728

Merge pull request #1138 from gooddata/aben/parallel-gone
feat(gooddata-pipelines): Process workspace backup sequentially
2 parents be7d8d5 + 1d466f5 commit e907728

File tree

7 files changed: +292, -78 lines changed


gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py

Lines changed: 36 additions & 62 deletions
@@ -4,10 +4,8 @@
 import os
 import shutil
 import tempfile
-import threading
 import time
 import traceback
-from concurrent.futures import ThreadPoolExecutor, as_completed
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any, Type
@@ -39,6 +37,7 @@
     S3Storage,
 )
 from gooddata_pipelines.logger import LogObserver
+from gooddata_pipelines.utils.rate_limiter import RateLimiter


 @dataclass
@@ -60,6 +59,10 @@ def __init__(self, host: str, token: str, config: BackupRestoreConfig):

         self.loader = BackupInputProcessor(self._api, self.config.api_page_size)

+        self._api_rate_limiter = RateLimiter(
+            calls_per_second=self.config.api_calls_per_second,
+        )
+
     @classmethod
     def create(
         cls: Type["BackupManager"],
@@ -95,11 +98,12 @@ def _get_storage(conf: BackupRestoreConfig) -> BackupStorage:

     def get_user_data_filters(self, ws_id: str) -> dict:
         """Returns the user data filters for the specified workspace."""
-        response: requests.Response = self._api.get_user_data_filters(ws_id)
-        if response.ok:
-            return response.json()
-        else:
-            raise RuntimeError(f"{response.status_code}: {response.text}")
+        with self._api_rate_limiter:
+            response: requests.Response = self._api.get_user_data_filters(ws_id)
+            if response.ok:
+                return response.json()
+            else:
+                raise RuntimeError(f"{response.status_code}: {response.text}")

     def _store_user_data_filters(
         self,
@@ -144,14 +148,17 @@ def _write_to_yaml(path: str, source: Any) -> None:

     def _get_automations_from_api(self, workspace_id: str) -> Any:
         """Returns automations for the workspace as JSON."""
-        response: requests.Response = self._api.get_automations(workspace_id)
-        if response.ok:
-            return response.json()
-        else:
-            raise RuntimeError(
-                f"Failed to get automations for {workspace_id}. "
-                + f"{response.status_code}: {response.text}"
+        with self._api_rate_limiter:
+            response: requests.Response = self._api.get_automations(
+                workspace_id
             )
+            if response.ok:
+                return response.json()
+            else:
+                raise RuntimeError(
+                    f"Failed to get automations for {workspace_id}. "
+                    + f"{response.status_code}: {response.text}"
+                )

     def _store_automations(self, export_path: Path, workspace_id: str) -> None:
         """Stores the automations in the specified export path."""
@@ -183,7 +190,8 @@ def store_declarative_filter_views(
     ) -> None:
         """Stores the filter views in the specified export path."""
         # Get the filter views YAML files from the API
-        self._api.store_declarative_filter_views(workspace_id, export_path)
+        with self._api_rate_limiter:
+            self._api.store_declarative_filter_views(workspace_id, export_path)

         # Move filter views to the subfolder containing the analytics model
         self._move_folder(
@@ -231,7 +239,10 @@ def _get_workspace_export(
         # the SDK. That way we could save and package all the declarations
         # directly instead of reorganizing the folder structures. That should
         # be more transparent/readable and possibly safer for threading
-        self._api.store_declarative_workspace(workspace_id, export_path)
+        with self._api_rate_limiter:
+            self._api.store_declarative_workspace(
+                workspace_id, export_path
+            )
         self.store_declarative_filter_views(export_path, workspace_id)
         self._store_automations(export_path, workspace_id)

@@ -291,18 +302,13 @@ def _split_to_batches(
     def _process_batch(
         self,
         batch: BackupBatch,
-        stop_event: threading.Event,
         retry_count: int = 0,
     ) -> None:
         """Processes a single batch of workspaces for backup.
         If the batch processing fails, the function will wait
         and retry with exponential backoff up to BackupSettings.MAX_RETRIES.
         The base wait time is defined by BackupSettings.RETRY_DELAY.
         """
-        if stop_event.is_set():
-            # If the stop_event flag is set, return. This will terminate the thread
-            return
-
         try:
             with tempfile.TemporaryDirectory() as tmpdir:
                 self._get_workspace_export(tmpdir, batch.list_of_ids)
@@ -314,10 +320,7 @@ def _process_batch(
                 self.storage.export(tmpdir, self.org_id)

         except Exception as e:
-            if stop_event.is_set():
-                return
-
-            elif retry_count < BackupSettings.MAX_RETRIES:
+            if retry_count < BackupSettings.MAX_RETRIES:
                 # Retry with exponential backoff until MAX_RETRIES
                 next_retry = retry_count + 1
                 wait_time = BackupSettings.RETRY_DELAY**next_retry
@@ -328,52 +331,23 @@ def _process_batch(
                 )

                 time.sleep(wait_time)
-                self._process_batch(batch, stop_event, next_retry)
+                self._process_batch(batch, next_retry)
             else:
                 # If the batch fails after MAX_RETRIES, raise the error
                 self.logger.error(f"Batch failed: {e.__class__.__name__}: {e}")
                 raise

-    def _process_batches_in_parallel(
+    def _process_batches(
         self,
         batches: list[BackupBatch],
     ) -> None:
         """
-        Processes batches in parallel using concurrent.futures. Will stop the processing
-        if any one of the batches fails.
+        Processes batches sequentially to avoid overloading the API.
+        If any batch fails, the processing will stop.
         """
-
-        # Create a threading flag to control the threads that have already been started
-        stop_event = threading.Event()
-
-        with ThreadPoolExecutor(
-            max_workers=self.config.max_workers
-        ) as executor:
-            # Set the futures tasks.
-            futures = []
-            for batch in batches:
-                futures.append(
-                    executor.submit(
-                        self._process_batch,
-                        batch,
-                        stop_event,
-                    )
-                )
-
-            # Process futures as they complete
-            for future in as_completed(futures):
-                try:
-                    future.result()
-                except Exception:
-                    # On failure, set the flag to True - signal running processes to stop
-                    stop_event.set()
-
-                    # Cancel unstarted threads
-                    for f in futures:
-                        if not f.done():
-                            f.cancel()
-
-                    raise
+        for i, batch in enumerate(batches, 1):
+            self.logger.info(f"Processing batch {i}/{len(batches)}...")
+            self._process_batch(batch)

     def backup_workspaces(
         self,
@@ -440,7 +414,7 @@ def _backup(
                 f"Exporting {len(workspaces_to_export)} workspaces in {len(batches)} batches."
             )

-            self._process_batches_in_parallel(batches)
+            self._process_batches(batches)

             self.logger.info("Backup completed")
         except Exception as e:
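
A note on the retry path above: with the defaults from constants.py (RETRY_DELAY = 5, MAX_RETRIES = 3), wait_time = BackupSettings.RETRY_DELAY**next_retry gives an exponential backoff of 5, 25, and 125 seconds before the batch error is re-raised. A minimal sketch of that schedule (the constants are copied from the diff; the loop itself is only illustrative):

# Illustrative only: reproduces the backoff schedule used by _process_batch.
RETRY_DELAY = 5  # seconds, BackupSettings.RETRY_DELAY
MAX_RETRIES = 3  # BackupSettings.MAX_RETRIES

for next_retry in range(1, MAX_RETRIES + 1):
    wait_time = RETRY_DELAY**next_retry
    print(f"retry {next_retry}: wait {wait_time} s")
# retry 1: wait 5 s
# retry 2: wait 25 s
# retry 3: wait 125 s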

gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py

Lines changed: 3 additions & 7 deletions
@@ -21,19 +21,15 @@ class DirNames:
     UDF = "user_data_filters"


-@dataclass(frozen=True)
-class ConcurrencyDefaults:
-    MAX_WORKERS = 1
-    DEFAULT_BATCH_SIZE = 100
-
-
 @dataclass(frozen=True)
 class ApiDefaults:
     DEFAULT_PAGE_SIZE = 100
+    DEFAULT_BATCH_SIZE = 100
+    DEFAULT_API_CALLS_PER_SECOND = 1.0


 @dataclass(frozen=True)
-class BackupSettings(ConcurrencyDefaults, ApiDefaults):
+class BackupSettings(ApiDefaults):
     MAX_RETRIES = 3
     RETRY_DELAY = 5  # seconds
     TIMESTAMP_SDK_FOLDER = (
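
Because BackupSettings now inherits from ApiDefaults alone, references such as BackupSettings.DEFAULT_BATCH_SIZE in models/storage.py keep working even though the constant moved from ConcurrencyDefaults into ApiDefaults. A minimal sketch of the resulting lookup (class bodies trimmed to the constants shown in this hunk):

from dataclasses import dataclass


@dataclass(frozen=True)
class ApiDefaults:
    DEFAULT_PAGE_SIZE = 100
    DEFAULT_BATCH_SIZE = 100
    DEFAULT_API_CALLS_PER_SECOND = 1.0


@dataclass(frozen=True)
class BackupSettings(ApiDefaults):
    MAX_RETRIES = 3
    RETRY_DELAY = 5  # seconds


print(BackupSettings.DEFAULT_BATCH_SIZE)            # 100, inherited from ApiDefaults
print(BackupSettings.DEFAULT_API_CALLS_PER_SECOND)  # 1.0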

gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py

Lines changed: 4 additions & 5 deletions
@@ -83,14 +83,13 @@ class BackupRestoreConfig(BaseModel):
             description="Batch size must be greater than 0",
         ),
     ] = Field(default=BackupSettings.DEFAULT_BATCH_SIZE)
-    max_workers: Annotated[
-        int,
+    api_calls_per_second: Annotated[
+        float,
         Field(
             gt=0,
-            lt=3,
-            description="Max workers must be greater than 0 and less than 3",
+            description="Maximum API calls per second (rate limiting)",
         ),
-    ] = Field(default=BackupSettings.MAX_WORKERS)
+    ] = Field(default=BackupSettings.DEFAULT_API_CALLS_PER_SECOND)

     @classmethod
     def from_yaml(cls, conf_path: str) -> "BackupRestoreConfig":
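
The max_workers knob is replaced by api_calls_per_second, validated via pydantic's Annotated/Field(gt=0) so non-positive rates are rejected when the config is loaded, and defaulting to BackupSettings.DEFAULT_API_CALLS_PER_SECOND (1.0). A stand-alone sketch of that validation behaviour (the RateLimitDemo model is illustrative; the real BackupRestoreConfig has additional fields not shown in this hunk):

# Illustrative stand-alone model mirroring only the new field.
from typing import Annotated

from pydantic import BaseModel, Field, ValidationError


class RateLimitDemo(BaseModel):
    api_calls_per_second: Annotated[
        float,
        Field(gt=0, description="Maximum API calls per second (rate limiting)"),
    ] = Field(default=1.0)


print(RateLimitDemo().api_calls_per_second)                           # 1.0 (default)
print(RateLimitDemo(api_calls_per_second=2.5).api_calls_per_second)   # 2.5

try:
    RateLimitDemo(api_calls_per_second=0)
except ValidationError as exc:
    print(f"rejected with {len(exc.errors())} validation error(s)")   # gt=0 fails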
gooddata-pipelines/gooddata_pipelines/utils/__init__.py

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
+# (C) 2025 GoodData Corporation
+
+"""
+Utility modules for gooddata-pipelines package.
+"""
+
+from .rate_limiter import RateLimiter
+
+__all__ = ["RateLimiter"]
gooddata-pipelines/gooddata_pipelines/utils/rate_limiter.py

Lines changed: 64 additions & 0 deletions
@@ -0,0 +1,64 @@
+# (C) 2025 GoodData Corporation
+
+import time
+import threading
+import functools
+from typing import Callable, Any, Literal
+
+
+class RateLimiter:
+    """
+    Rate limiter usable as a decorator and as a context manager.
+    - Shared instance decorator: limiter = RateLimiter(); @limiter
+    - Per-function decorator: @RateLimiter(calls_per_second=2)
+    - Context manager: with RateLimiter(2): ...
+    """
+
+    def __init__(self, calls_per_second: float = 1.0) -> None:
+        if calls_per_second <= 0:
+            raise ValueError("calls_per_second must be greater than 0")
+
+        self.calls_per_second = calls_per_second
+        self.min_interval = 1.0 / calls_per_second
+
+        self._lock = threading.Lock()
+        self._last_call_time = 0.0
+
+    def wait_if_needed(self) -> float:
+        """Sleep if needed to maintain the rate limit, return actual sleep time."""
+        with self._lock:
+            now = time.monotonic()
+            since_last = now - self._last_call_time
+
+            if since_last < self.min_interval:
+                sleep_time = self.min_interval - since_last
+                time.sleep(sleep_time)
+                self._last_call_time = time.monotonic()
+                return sleep_time
+            else:
+                self._last_call_time = now
+                return 0.0
+
+    # Decorator support
+    def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
+        @functools.wraps(func)
+        def wrapper(*args: Any, **kwargs: Any) -> Any:
+            self.wait_if_needed()
+            return func(*args, **kwargs)
+
+        return wrapper
+
+    # Context manager support
+    def __enter__(self) -> "RateLimiter":
+        self.wait_if_needed()
+        return self
+
+    def __exit__(
+        self, exc_type: Any, exc_val: Any, exc_tb: Any
+    ) -> Literal[False]:
+        return False
+
+    def reset(self) -> None:
+        """Reset the limiter (useful in tests)."""
+        with self._lock:
+            self._last_call_time = 0.0
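
For reference, a single RateLimiter instance can be shared between the decorator and context-manager forms; with calls_per_second=2 every acquisition is spaced at least 0.5 s apart, so the four calls below take roughly 1.5 s or more in total. A small usage sketch (the import path assumes the new module lives under gooddata_pipelines/utils/, as the imports in this commit suggest):

import time

from gooddata_pipelines.utils import RateLimiter

limiter = RateLimiter(calls_per_second=2)  # min_interval = 0.5 s


@limiter  # decorator form: all calls to ping() go through the shared limiter
def ping(i: int) -> None:
    print(f"call {i} at {time.monotonic():.2f}")


start = time.monotonic()
for i in range(3):
    ping(i)

with limiter:  # context-manager form, sharing the same pacing state
    print(f"manual call at {time.monotonic():.2f}")

print(f"elapsed: {time.monotonic() - start:.2f} s")  # roughly >= 1.5 s for 4 calls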

gooddata-pipelines/tests/backup_and_restore/test_backup.py

Lines changed: 0 additions & 4 deletions
@@ -3,7 +3,6 @@
 import os
 import shutil
 import tempfile
-import threading
 from pathlib import Path
 from unittest import mock

@@ -325,7 +324,6 @@ def test_process_batch_success(

     backup_manager._process_batch(
         batch=batch,
-        stop_event=threading.Event(),
         retry_count=0,
     )

@@ -362,7 +360,6 @@ def fail_once(*args, **kwargs):

     backup_manager._process_batch(
         batch=batch,
-        stop_event=threading.Event(),
     )

     assert get_workspace_export_mock.call_count == 2
@@ -392,7 +389,6 @@ def test_process_batch_raises_after_max_retries(
     with pytest.raises(Exception) as exc_info:
         backup_manager._process_batch(
             batch=batch,
-            stop_event=threading.Event(),
             retry_count=BackupSettings.MAX_RETRIES,
         )
     assert str(exc_info.value) == "fail"
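
The hunks shown here only drop the stop_event argument from the existing _process_batch tests. A hypothetical sketch of a direct RateLimiter test, not part of the hunks above, could look like this (import path and test names are assumptions):

import time

import pytest

from gooddata_pipelines.utils import RateLimiter


def test_rate_limiter_rejects_non_positive_rate() -> None:
    with pytest.raises(ValueError):
        RateLimiter(calls_per_second=0)


def test_rate_limiter_delays_second_call() -> None:
    limiter = RateLimiter(calls_per_second=50)  # min_interval = 0.02 s

    with limiter:  # first acquisition does not sleep
        pass

    start = time.monotonic()
    with limiter:  # second acquisition waits roughly min_interval
        pass

    assert time.monotonic() - start >= limiter.min_interval * 0.5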
