@@ -55,7 +55,7 @@ def __init__(self, host: str, token: str, config: BackupRestoreConfig):

self.config = config

self.storage = self.get_storage(self.config)
self.storage = self._get_storage(self.config)
self.org_id = self._api.get_organization_id()

self.loader = BackupInputProcessor(self._api, self.config.api_page_size)
@@ -67,7 +67,7 @@ def create(
host: str,
token: str,
) -> "BackupManager":
"""Creates a backup worker instance using provided host and token."""
"""Creates a backup worker instance using the provided host and token."""
return cls(host=host, token=token, config=config)

@classmethod
@@ -81,7 +81,8 @@ def create_from_profile(
content = profile_content(profile, profiles_path)
return cls(**content, config=config)

def get_storage(self, conf: BackupRestoreConfig) -> BackupStorage:
@staticmethod
def _get_storage(conf: BackupRestoreConfig) -> BackupStorage:
"""Returns the storage class based on the storage type."""
if conf.storage_type == StorageType.S3:
return S3Storage(conf)
@@ -100,7 +101,7 @@ def get_user_data_filters(self, ws_id: str) -> dict:
else:
raise RuntimeError(f"{response.status_code}: {response.text}")

def store_user_data_filters(
def _store_user_data_filters(
self,
user_data_filters: dict,
export_path: Path,
@@ -128,20 +129,20 @@ def store_user_data_filters(
"user_data_filters",
filter["id"] + ".yaml",
)
self.write_to_yaml(udf_file_path, filter)
self._write_to_yaml(udf_file_path, filter)

@staticmethod
def move_folder(source: Path, destination: Path) -> None:
def _move_folder(source: Path, destination: Path) -> None:
"""Moves the source folder to the destination."""
shutil.move(source, destination)

@staticmethod
def write_to_yaml(path: str, source: Any) -> None:
def _write_to_yaml(path: str, source: Any) -> None:
"""Writes the source to a YAML file."""
with open(path, "w") as outfile:
yaml.dump(source, outfile)

def get_automations_from_api(self, workspace_id: str) -> Any:
def _get_automations_from_api(self, workspace_id: str) -> Any:
"""Returns automations for the workspace as JSON."""
response: requests.Response = self._api.get_automations(workspace_id)
if response.ok:
@@ -152,10 +153,10 @@ def get_automations_from_api(self, workspace_id: str) -> Any:
+ f"{response.status_code}: {response.text}"
)

def store_automations(self, export_path: Path, workspace_id: str) -> None:
def _store_automations(self, export_path: Path, workspace_id: str) -> None:
"""Stores the automations in the specified export path."""
# Get the automations from the API
automations: Any = self.get_automations_from_api(workspace_id)
automations: Any = self._get_automations_from_api(workspace_id)

automations_folder_path: Path = Path(
export_path,
@@ -184,8 +185,8 @@ def store_declarative_filter_views(
# Get the filter views YAML files from the API
self._api.store_declarative_filter_views(workspace_id, export_path)

# Move filter views to the subfolder containing analytics model
self.move_folder(
# Move filter views to the subfolder containing the analytics model
self._move_folder(
Path(export_path, "gooddata_layouts", self.org_id, "filter_views"),
Path(
export_path,
@@ -197,7 +198,7 @@
),
)

def get_workspace_export(
def _get_workspace_export(
self,
local_target_path: str,
workspaces_to_export: list[str],
@@ -232,9 +233,9 @@ def get_workspace_export(
# be more transparent/readable and possibly safer for threading
self._api.store_declarative_workspace(workspace_id, export_path)
self.store_declarative_filter_views(export_path, workspace_id)
self.store_automations(export_path, workspace_id)
self._store_automations(export_path, workspace_id)

self.store_user_data_filters(
self._store_user_data_filters(
user_data_filters, export_path, workspace_id
)
self.logger.info(f"Stored export for {workspace_id}")
@@ -250,7 +251,7 @@ def get_workspace_export(
+ "is correct and that the workspaces exist."
)

def archive_gooddata_layouts_to_zip(self, folder: str) -> None:
def _archive_gooddata_layouts_to_zip(self, folder: str) -> None:
"""Archives the gooddata_layouts directory to a zip file."""
try:
target_subdir = ""
@@ -271,11 +272,12 @@ def archive_gooddata_layouts_to_zip(self, folder: str) -> None:
self.logger.error(f"Error archiving {folder} to zip: {e}")
raise

def split_to_batches(
self, workspaces_to_export: list[str], batch_size: int
@staticmethod
def _split_to_batches(
workspaces_to_export: list[str], batch_size: int
) -> list[BackupBatch]:
"""Splits the list of workspaces to into batches of the specified size.
The batch is respresented as a list of workspace IDs.
"""Splits the list of workspaces into batches of the specified size.
The batch is represented as a list of workspace IDs.
Returns a list of batches (i.e. list of lists of IDs)
"""
list_of_batches = []
@@ -286,7 +288,7 @@ def split_to_batches(

return list_of_batches

def process_batch(
def _process_batch(
self,
batch: BackupBatch,
stop_event: threading.Event,
@@ -298,14 +300,14 @@
The base wait time is defined by BackupSettings.RETRY_DELAY.
"""
if stop_event.is_set():
# If the stop_event flag is set, return. This will terminate the thread.
# If the stop_event flag is set, return. This will terminate the thread
return

try:
with tempfile.TemporaryDirectory() as tmpdir:
self.get_workspace_export(tmpdir, batch.list_of_ids)
self._get_workspace_export(tmpdir, batch.list_of_ids)

self.archive_gooddata_layouts_to_zip(
self._archive_gooddata_layouts_to_zip(
str(Path(tmpdir, self.org_id))
)

@@ -316,7 +318,7 @@
return

elif retry_count < BackupSettings.MAX_RETRIES:
# Retry with exponential backoff until MAX_RETRIES.
# Retry with exponential backoff until MAX_RETRIES
next_retry = retry_count + 1
wait_time = BackupSettings.RETRY_DELAY**next_retry
self.logger.info(
@@ -326,13 +328,13 @@
)

time.sleep(wait_time)
self.process_batch(batch, stop_event, next_retry)
self._process_batch(batch, stop_event, next_retry)
else:
# If the batch fails after MAX_RETRIES, raise the error.
# If the batch fails after MAX_RETRIES, raise the error
self.logger.error(f"Batch failed: {e.__class__.__name__}: {e}")
raise

def process_batches_in_parallel(
def _process_batches_in_parallel(
self,
batches: list[BackupBatch],
) -> None:
@@ -345,14 +347,14 @@ def process_batches_in_parallel(
stop_event = threading.Event()

with ThreadPoolExecutor(
max_workers=BackupSettings.MAX_WORKERS
max_workers=self.config.max_workers
) as executor:
# Set the futures tasks.
futures = []
for batch in batches:
futures.append(
executor.submit(
self.process_batch,
self._process_batch,
batch,
stop_event,
)
@@ -363,10 +365,10 @@
try:
future.result()
except Exception:
# On failure, set the flag to True - signal running processes to stop.
# On failure, set the flag to True - signal running processes to stop
stop_event.set()

# Cancel unstarted threads.
# Cancel unstarted threads
for f in futures:
if not f.done():
f.cancel()
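The two methods above combine per-batch retries with exponential backoff (the delay is RETRY_DELAY raised to the retry count) and a shared threading.Event that lets the first hard failure stop the remaining batches. Below is a minimal, self-contained sketch of that pattern for illustration only; it is not the project's code, and RETRY_DELAY, MAX_RETRIES, and the do_export helper are assumed placeholders.

# Illustrative sketch only: mirrors the retry-with-backoff and shared stop-flag
# pattern shown in the diff. RETRY_DELAY, MAX_RETRIES, and do_export are
# placeholders, not the project's real settings or API.
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

RETRY_DELAY = 5   # assumed base delay in seconds
MAX_RETRIES = 2   # assumed retry cap


def do_export(batch: list[str]) -> None:
    """Stand-in for the real per-batch export; raise here to simulate a failure."""
    print(f"exporting {len(batch)} workspaces")


def process_batch(batch: list[str], stop_event: threading.Event, retry_count: int = 0) -> None:
    if stop_event.is_set():
        return  # another batch already failed; skip the remaining work
    try:
        do_export(batch)
    except Exception:
        if retry_count < MAX_RETRIES:
            time.sleep(RETRY_DELAY ** (retry_count + 1))  # 5 s, then 25 s, ...
            process_batch(batch, stop_event, retry_count + 1)
        else:
            raise


def process_batches(batches: list[list[str]], max_workers: int = 1) -> None:
    stop_event = threading.Event()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_batch, b, stop_event) for b in batches]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception:
                stop_event.set()  # signal batches that are still running to stop
                for f in futures:
                    if not f.done():
                        f.cancel()  # drop batches that have not started yet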
@@ -383,58 +385,58 @@ def backup_workspaces(
workspace in storage specified in the configuration.

Args:
path_to_csv (str): Path to a CSV file containing a list of workspace IDs.
path_to_csv (str): Path to a CSV file containing a list of workspace IDs
workspace_ids (list[str]): List of workspace IDs
"""
self.backup(InputType.LIST_OF_WORKSPACES, path_to_csv, workspace_ids)
self._backup(InputType.LIST_OF_WORKSPACES, path_to_csv, workspace_ids)

def backup_hierarchies(
self, path_to_csv: str | None, workspace_ids: list[str] | None
) -> None:
"""Runs the backup process for a list of hierarchies.

Will take the list of workspace IDs or read the list of workspace IDs
from a CSV file and create backup for each those workspaces' hierarchies
from a CSV file and create backup for each of those workspaces' hierarchies
in storage specified in the configuration.
Workspace hierarchy means the workspace itself and all its direct and
indirect children.

Args:
path_to_csv (str): Path to a CSV file containing a list of workspace IDs.
path_to_csv (str): Path to a CSV file containing a list of workspace IDs
workspace_ids (list[str]): List of workspace IDs
"""
self.backup(InputType.HIERARCHY, path_to_csv, workspace_ids)
self._backup(InputType.HIERARCHY, path_to_csv, workspace_ids)

def backup_entire_organization(self) -> None:
"""Runs the backup process for the entire organization.

Will create backup for all workspaces in the organization in storage
specified in the configuration.
"""
self.backup(InputType.ORGANIZATION)
self._backup(InputType.ORGANIZATION)

def backup(
def _backup(
self,
input_type: InputType,
path_to_csv: str | None = None,
workspace_ids: list[str] | None = None,
) -> None:
"""Runs the backup process with selected input type."""
"""Runs the backup process with the selected input type."""
try:
workspaces_to_export: list[str] = self.loader.get_ids_to_backup(
input_type,
path_to_csv,
workspace_ids,
)
batches = self.split_to_batches(
batches = self._split_to_batches(
workspaces_to_export, self.config.batch_size
)

self.logger.info(
f"Exporting {len(workspaces_to_export)} workspaces in {len(batches)} batches."
)

self.process_batches_in_parallel(batches)
self._process_batches_in_parallel(batches)

self.logger.info("Backup completed")
except Exception as e:
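For context, a hypothetical end-to-end usage sketch of the public entry points touched in this file (backup_workspaces, backup_hierarchies, backup_entire_organization). The create() signature is partially collapsed in the diff, so the keyword arguments, host URL, token, and workspace IDs below are assumptions; import statements are omitted because the module paths are not shown.

# Hypothetical usage of the public API under review; placeholder values throughout.
config = BackupRestoreConfig.from_yaml("backup_config.yaml")
manager = BackupManager.create(config=config, host="https://example.gooddata.com", token="<api-token>")

# Back up an explicit list of workspaces...
manager.backup_workspaces(path_to_csv=None, workspace_ids=["ws_sales", "ws_finance"])
# ...or whole hierarchies (each workspace plus its direct and indirect children)...
manager.backup_hierarchies(path_to_csv="workspaces.csv", workspace_ids=None)
# ...or every workspace in the organization.
manager.backup_entire_organization()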
@@ -1,3 +1,4 @@
# (C) 2025 GoodData Corporation
import datetime
from dataclasses import dataclass

@@ -22,7 +23,7 @@ class DirNames:

@dataclass(frozen=True)
class ConcurrencyDefaults:
MAX_WORKERS = 2
MAX_WORKERS = 1
DEFAULT_BATCH_SIZE = 100


@@ -21,10 +21,40 @@ class S3StorageConfig(BaseModel):

backup_path: str
bucket: str
profile: str = "default"
profile: Optional[str] = None
aws_access_key_id: Optional[str] = None
aws_secret_access_key: Optional[str] = None
aws_default_region: Optional[str] = None
aws_default_region: Optional[str] = "us-east-1"

@classmethod
def from_iam_role(cls, backup_path: str, bucket: str) -> "S3StorageConfig":
"""Use default IAM role or environment credentials."""
return cls(backup_path=backup_path, bucket=bucket)

@classmethod
def from_aws_credentials(
cls,
backup_path: str,
bucket: str,
aws_access_key_id: str,
aws_secret_access_key: str,
aws_default_region: str,
) -> "S3StorageConfig":
"""Use explicit AWS access keys and region."""
return cls(
backup_path=backup_path,
bucket=bucket,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
aws_default_region=aws_default_region,
)

@classmethod
def from_aws_profile(
cls, backup_path: str, bucket: str, profile: str
) -> "S3StorageConfig":
"""Use a named AWS CLI profile."""
return cls(backup_path=backup_path, bucket=bucket, profile=profile)
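A brief usage sketch of the three constructors added above; only the signatures come from the diff, and the bucket, path, profile, and credential values are placeholders.

# Placeholder values throughout; only the constructor signatures come from the diff.
iam_cfg = S3StorageConfig.from_iam_role(backup_path="backups/", bucket="org-backups")

key_cfg = S3StorageConfig.from_aws_credentials(
    backup_path="backups/",
    bucket="org-backups",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="...",
    aws_default_region="eu-west-1",
)

profile_cfg = S3StorageConfig.from_aws_profile(
    backup_path="backups/", bucket="org-backups", profile="backup-admin"
)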


class LocalStorageConfig(BaseModel):
@@ -53,6 +83,14 @@ class BackupRestoreConfig(BaseModel):
description="Batch size must be greater than 0",
),
] = Field(default=BackupSettings.DEFAULT_BATCH_SIZE)
max_workers: Annotated[
Review comment (Contributor):
I see that the BackupManager class uses BackupSettings.MAX_WORKERS to limit the threads, not the value from BackupRestoreConfig... can you please change that in backup_manager.py?
Currently it has LN347: max_workers=BackupSettings.MAX_WORKERS which should be something like max_workers=config.max_workers

int,
Field(
gt=0,
lt=3,
description="Max workers must be greater than 0 and less than 3",
),
] = Field(default=BackupSettings.MAX_WORKERS)

@classmethod
def from_yaml(cls, conf_path: str) -> "BackupRestoreConfig":
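The new max_workers field relies on pydantic's constrained-field validation (gt=0, lt=3), so only 1 or 2 workers are accepted. The following standalone sketch shows that behavior in isolation; it is not the project's BackupRestoreConfig.

# Standalone demonstration of the gt=0, lt=3 constraint; an isolated pydantic model,
# not the project's BackupRestoreConfig.
from typing import Annotated

from pydantic import BaseModel, Field, ValidationError


class MaxWorkersDemo(BaseModel):
    max_workers: Annotated[int, Field(gt=0, lt=3)] = 1


print(MaxWorkersDemo(max_workers=2).max_workers)  # accepted: prints 2
try:
    MaxWorkersDemo(max_workers=3)  # rejected: violates the lt=3 bound
except ValidationError as exc:
    print(exc.errors()[0]["msg"])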