
Commit c1e6543

feat(gooddata-pipelines): Process workspace backup sequentially
1 parent be7d8d5 commit c1e6543

File tree

3 files changed: +11 −63 lines changed

gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py

Lines changed: 9 additions & 48 deletions
@@ -4,10 +4,8 @@
 import os
 import shutil
 import tempfile
-import threading
 import time
 import traceback
-from concurrent.futures import ThreadPoolExecutor, as_completed
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any, Type
@@ -291,18 +289,13 @@ def _split_to_batches(
     def _process_batch(
         self,
         batch: BackupBatch,
-        stop_event: threading.Event,
         retry_count: int = 0,
     ) -> None:
         """Processes a single batch of workspaces for backup.
         If the batch processing fails, the function will wait
         and retry with exponential backoff up to BackupSettings.MAX_RETRIES.
         The base wait time is defined by BackupSettings.RETRY_DELAY.
         """
-        if stop_event.is_set():
-            # If the stop_event flag is set, return. This will terminate the thread
-            return
-
         try:
             with tempfile.TemporaryDirectory() as tmpdir:
                 self._get_workspace_export(tmpdir, batch.list_of_ids)
@@ -314,10 +307,7 @@ def _process_batch(
                 self.storage.export(tmpdir, self.org_id)
 
         except Exception as e:
-            if stop_event.is_set():
-                return
-
-            elif retry_count < BackupSettings.MAX_RETRIES:
+            if retry_count < BackupSettings.MAX_RETRIES:
                 # Retry with exponential backoff until MAX_RETRIES
                 next_retry = retry_count + 1
                 wait_time = BackupSettings.RETRY_DELAY**next_retry
@@ -328,52 +318,23 @@ def _process_batch(
                 )
 
                 time.sleep(wait_time)
-                self._process_batch(batch, stop_event, next_retry)
+                self._process_batch(batch, next_retry)
             else:
                 # If the batch fails after MAX_RETRIES, raise the error
                 self.logger.error(f"Batch failed: {e.__class__.__name__}: {e}")
                 raise
 
-    def _process_batches_in_parallel(
+    def _process_batches(
         self,
         batches: list[BackupBatch],
     ) -> None:
         """
-        Processes batches in parallel using concurrent.futures. Will stop the processing
-        if any one of the batches fails.
+        Processes batches sequentially to avoid overloading the API.
+        If any batch fails, the processing will stop.
         """
-
-        # Create a threading flag to control the threads that have already been started
-        stop_event = threading.Event()
-
-        with ThreadPoolExecutor(
-            max_workers=self.config.max_workers
-        ) as executor:
-            # Set the futures tasks.
-            futures = []
-            for batch in batches:
-                futures.append(
-                    executor.submit(
-                        self._process_batch,
-                        batch,
-                        stop_event,
-                    )
-                )
-
-            # Process futures as they complete
-            for future in as_completed(futures):
-                try:
-                    future.result()
-                except Exception:
-                    # On failure, set the flag to True - signal running processes to stop
-                    stop_event.set()
-
-                    # Cancel unstarted threads
-                    for f in futures:
-                        if not f.done():
-                            f.cancel()
-
-                    raise
+        for i, batch in enumerate(batches, 1):
+            self.logger.info(f"Processing batch {i}/{len(batches)}...")
+            self._process_batch(batch)
 
     def backup_workspaces(
         self,
@@ -440,7 +401,7 @@ def _backup(
                 f"Exporting {len(workspaces_to_export)} workspaces in {len(batches)} batches."
             )
 
-            self._process_batches_in_parallel(batches)
+            self._process_batches(batches)
 
             self.logger.info("Backup completed")
         except Exception as e:
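
For reference, the surviving retry path waits RETRY_DELAY ** next_retry seconds between attempts, so with the defaults from constants.py the delays grow as 5 s, 25 s, 125 s. A minimal standalone sketch of that schedule (process_one and process_with_backoff are hypothetical stand-ins, not names from this repo):

import time

MAX_RETRIES = 3  # BackupSettings.MAX_RETRIES in constants.py
RETRY_DELAY = 5  # seconds, BackupSettings.RETRY_DELAY in constants.py

def process_with_backoff(batch, process_one, retry_count=0):
    """Retry process_one(batch) with exponential backoff, mirroring _process_batch."""
    try:
        process_one(batch)
    except Exception:
        if retry_count < MAX_RETRIES:
            next_retry = retry_count + 1
            time.sleep(RETRY_DELAY**next_retry)  # sleeps 5, 25, then 125 seconds
            process_with_backoff(batch, process_one, next_retry)
        else:
            raise  # give up after MAX_RETRIES attempts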

gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py

Lines changed: 2 additions & 7 deletions
@@ -21,19 +21,14 @@ class DirNames:
     UDF = "user_data_filters"
 
 
-@dataclass(frozen=True)
-class ConcurrencyDefaults:
-    MAX_WORKERS = 1
-    DEFAULT_BATCH_SIZE = 100
-
-
 @dataclass(frozen=True)
 class ApiDefaults:
     DEFAULT_PAGE_SIZE = 100
+    DEFAULT_BATCH_SIZE = 100
 
 
 @dataclass(frozen=True)
-class BackupSettings(ConcurrencyDefaults, ApiDefaults):
+class BackupSettings(ApiDefaults):
     MAX_RETRIES = 3
     RETRY_DELAY = 5  # seconds
     TIMESTAMP_SDK_FOLDER = (
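
Because DEFAULT_BATCH_SIZE moves into ApiDefaults, BackupSettings still exposes it through inheritance, which is why models/storage.py can keep referencing BackupSettings.DEFAULT_BATCH_SIZE unchanged. A minimal sketch of the resulting hierarchy (fields outside this diff elided):

from dataclasses import dataclass

@dataclass(frozen=True)
class ApiDefaults:
    DEFAULT_PAGE_SIZE = 100
    DEFAULT_BATCH_SIZE = 100  # moved here from the removed ConcurrencyDefaults

@dataclass(frozen=True)
class BackupSettings(ApiDefaults):
    MAX_RETRIES = 3
    RETRY_DELAY = 5  # seconds

assert BackupSettings.DEFAULT_BATCH_SIZE == 100  # inherited from ApiDefaults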

gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py

Lines changed: 0 additions & 8 deletions
@@ -83,14 +83,6 @@ class BackupRestoreConfig(BaseModel):
             description="Batch size must be greater than 0",
         ),
     ] = Field(default=BackupSettings.DEFAULT_BATCH_SIZE)
-    max_workers: Annotated[
-        int,
-        Field(
-            gt=0,
-            lt=3,
-            description="Max workers must be greater than 0 and less than 3",
-        ),
-    ] = Field(default=BackupSettings.MAX_WORKERS)
 
     @classmethod
     def from_yaml(cls, conf_path: str) -> "BackupRestoreConfig":
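
With max_workers removed, batch_size is the only remaining tuning knob on BackupRestoreConfig, and it keeps its gt=0 validation. A standalone sketch of that validation pattern (ConfigSketch is a hypothetical minimal model, not the real BackupRestoreConfig):

from typing import Annotated

from pydantic import BaseModel, Field, ValidationError

class ConfigSketch(BaseModel):
    batch_size: Annotated[
        int,
        Field(gt=0, description="Batch size must be greater than 0"),
    ] = 100

ConfigSketch(batch_size=50)  # accepted
try:
    ConfigSketch(batch_size=0)  # rejected by the gt=0 constraint
except ValidationError as err:
    print(err)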
