diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py
index 0e8111ca9..acd2ea47f 100644
--- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py
+++ b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py
@@ -55,7 +55,7 @@ def __init__(self, host: str, token: str, config: BackupRestoreConfig):
         self.config = config

-        self.storage = self.get_storage(self.config)
+        self.storage = self._get_storage(self.config)
         self.org_id = self._api.get_organization_id()

         self.loader = BackupInputProcessor(self._api, self.config.api_page_size)
@@ -67,7 +67,7 @@ def create(
         host: str,
         token: str,
     ) -> "BackupManager":
-        """Creates a backup worker instance using provided host and token."""
+        """Creates a backup worker instance using the provided host and token."""
         return cls(host=host, token=token, config=config)

     @classmethod
@@ -81,7 +81,8 @@ def create_from_profile(
         content = profile_content(profile, profiles_path)
         return cls(**content, config=config)

-    def get_storage(self, conf: BackupRestoreConfig) -> BackupStorage:
+    @staticmethod
+    def _get_storage(conf: BackupRestoreConfig) -> BackupStorage:
         """Returns the storage class based on the storage type."""
         if conf.storage_type == StorageType.S3:
             return S3Storage(conf)
@@ -100,7 +101,7 @@ def get_user_data_filters(self, ws_id: str) -> dict:
         else:
             raise RuntimeError(f"{response.status_code}: {response.text}")

-    def store_user_data_filters(
+    def _store_user_data_filters(
         self,
         user_data_filters: dict,
         export_path: Path,
@@ -128,20 +129,20 @@ def store_user_data_filters(
                 "user_data_filters",
                 filter["id"] + ".yaml",
             )
-            self.write_to_yaml(udf_file_path, filter)
+            self._write_to_yaml(udf_file_path, filter)

     @staticmethod
-    def move_folder(source: Path, destination: Path) -> None:
+    def _move_folder(source: Path, destination: Path) -> None:
         """Moves the source folder to the destination."""
         shutil.move(source, destination)

     @staticmethod
-    def write_to_yaml(path: str, source: Any) -> None:
+    def _write_to_yaml(path: str, source: Any) -> None:
         """Writes the source to a YAML file."""
         with open(path, "w") as outfile:
             yaml.dump(source, outfile)

-    def get_automations_from_api(self, workspace_id: str) -> Any:
+    def _get_automations_from_api(self, workspace_id: str) -> Any:
         """Returns automations for the workspace as JSON."""
         response: requests.Response = self._api.get_automations(workspace_id)
         if response.ok:
@@ -152,10 +153,10 @@ def get_automations_from_api(self, workspace_id: str) -> Any:
                 + f"{response.status_code}: {response.text}"
             )

-    def store_automations(self, export_path: Path, workspace_id: str) -> None:
+    def _store_automations(self, export_path: Path, workspace_id: str) -> None:
         """Stores the automations in the specified export path."""
         # Get the automations from the API
-        automations: Any = self.get_automations_from_api(workspace_id)
+        automations: Any = self._get_automations_from_api(workspace_id)

         automations_folder_path: Path = Path(
             export_path,
@@ -184,8 +185,8 @@ def store_declarative_filter_views(
         # Get the filter views YAML files from the API
         self._api.store_declarative_filter_views(workspace_id, export_path)

-        # Move filter views to the subfolder containing analytics model
-        self.move_folder(
+        # Move filter views to the subfolder containing the analytics model
+        self._move_folder(
             Path(export_path, "gooddata_layouts", self.org_id, "filter_views"),
             Path(
                 export_path,
@@ -197,7 +198,7 @@ def store_declarative_filter_views(
             ),
         )

-    def get_workspace_export(
+    def _get_workspace_export(
         self,
         local_target_path: str,
         workspaces_to_export: list[str],
@@ -232,9 +233,9 @@ def get_workspace_export(
                 # be more transparent/readable and possibly safer for threading
                 self._api.store_declarative_workspace(workspace_id, export_path)
                 self.store_declarative_filter_views(export_path, workspace_id)
-                self.store_automations(export_path, workspace_id)
+                self._store_automations(export_path, workspace_id)

-                self.store_user_data_filters(
+                self._store_user_data_filters(
                     user_data_filters, export_path, workspace_id
                 )
                 self.logger.info(f"Stored export for {workspace_id}")
@@ -250,7 +251,7 @@ def get_workspace_export(
                 + "is correct and that the workspaces exist."
             )

-    def archive_gooddata_layouts_to_zip(self, folder: str) -> None:
+    def _archive_gooddata_layouts_to_zip(self, folder: str) -> None:
         """Archives the gooddata_layouts directory to a zip file."""
         try:
             target_subdir = ""
@@ -271,11 +272,12 @@ def archive_gooddata_layouts_to_zip(self, folder: str) -> None:
             self.logger.error(f"Error archiving {folder} to zip: {e}")
             raise

-    def split_to_batches(
-        self, workspaces_to_export: list[str], batch_size: int
+    @staticmethod
+    def _split_to_batches(
+        workspaces_to_export: list[str], batch_size: int
     ) -> list[BackupBatch]:
-        """Splits the list of workspaces to into batches of the specified size.
-        The batch is respresented as a list of workspace IDs.
+        """Splits the list of workspaces into batches of the specified size.
+        The batch is represented as a list of workspace IDs.

         Returns a list of batches (i.e. list of lists of IDs)
         """
         list_of_batches = []
@@ -286,7 +288,7 @@ def split_to_batches(

         return list_of_batches

-    def process_batch(
+    def _process_batch(
         self,
         batch: BackupBatch,
         stop_event: threading.Event,
@@ -298,14 +300,14 @@ def process_batch(
         The base wait time is defined by BackupSettings.RETRY_DELAY.
         """
         if stop_event.is_set():
-            # If the stop_event flag is set, return. This will terminate the thread.
+            # If the stop_event flag is set, return. This will terminate the thread
             return

         try:
             with tempfile.TemporaryDirectory() as tmpdir:
-                self.get_workspace_export(tmpdir, batch.list_of_ids)
+                self._get_workspace_export(tmpdir, batch.list_of_ids)

-                self.archive_gooddata_layouts_to_zip(
+                self._archive_gooddata_layouts_to_zip(
                     str(Path(tmpdir, self.org_id))
                 )
@@ -316,7 +318,7 @@ def process_batch(
                 return

             elif retry_count < BackupSettings.MAX_RETRIES:
-                # Retry with exponential backoff until MAX_RETRIES.
+                # Retry with exponential backoff until MAX_RETRIES
                 next_retry = retry_count + 1
                 wait_time = BackupSettings.RETRY_DELAY**next_retry
                 self.logger.info(
@@ -326,13 +328,13 @@ def process_batch(
                 )
                 time.sleep(wait_time)

-                self.process_batch(batch, stop_event, next_retry)
+                self._process_batch(batch, stop_event, next_retry)
             else:
-                # If the batch fails after MAX_RETRIES, raise the error.
+                # If the batch fails after MAX_RETRIES, raise the error
                 self.logger.error(f"Batch failed: {e.__class__.__name__}: {e}")
                 raise

-    def process_batches_in_parallel(
+    def _process_batches_in_parallel(
         self,
         batches: list[BackupBatch],
     ) -> None:
@@ -345,14 +347,14 @@ def process_batches_in_parallel(
         stop_event = threading.Event()

         with ThreadPoolExecutor(
-            max_workers=BackupSettings.MAX_WORKERS
+            max_workers=self.config.max_workers
         ) as executor:
             # Set the futures tasks.
             futures = []
             for batch in batches:
                 futures.append(
                     executor.submit(
-                        self.process_batch,
+                        self._process_batch,
                         batch,
                         stop_event,
                     )
                 )
@@ -363,10 +365,10 @@
             try:
                 future.result()
             except Exception:
-                # On failure, set the flag to True - signal running processes to stop.
+                # On failure, set the flag to True - signal running processes to stop
                 stop_event.set()

-                # Cancel unstarted threads.
+                # Cancel unstarted threads
                 for f in futures:
                     if not f.done():
                         f.cancel()
@@ -383,10 +385,10 @@ def backup_workspaces(
         workspace in storage specified in the configuration.

         Args:
-            path_to_csv (str): Path to a CSV file containing a list of workspace IDs.
+            path_to_csv (str): Path to a CSV file containing a list of workspace IDs
             workspace_ids (list[str]): List of workspace IDs
         """
-        self.backup(InputType.LIST_OF_WORKSPACES, path_to_csv, workspace_ids)
+        self._backup(InputType.LIST_OF_WORKSPACES, path_to_csv, workspace_ids)

     def backup_hierarchies(
         self, path_to_csv: str | None, workspace_ids: list[str] | None
@@ -394,16 +396,16 @@ def backup_hierarchies(
         """Runs the backup process for a list of hierarchies.

         Will take the list of workspace IDs or read the list of workspace IDs
-        from a CSV file and create backup for each those workspaces' hierarchies
+        from a CSV file and create backup for each of those workspaces' hierarchies
         in storage specified in the configuration.

         Workspace hierarchy means the workspace itself and all its direct and
         indirect children.

         Args:
-            path_to_csv (str): Path to a CSV file containing a list of workspace IDs.
+            path_to_csv (str): Path to a CSV file containing a list of workspace IDs
             workspace_ids (list[str]): List of workspace IDs
         """
-        self.backup(InputType.HIERARCHY, path_to_csv, workspace_ids)
+        self._backup(InputType.HIERARCHY, path_to_csv, workspace_ids)

     def backup_entire_organization(self) -> None:
         """Runs the backup process for the entire organization.
@@ -411,22 +413,22 @@ def backup_entire_organization(self) -> None:
         Will create backup for all workspaces in the organization in storage
         specified in the configuration.
         """
-        self.backup(InputType.ORGANIZATION)
+        self._backup(InputType.ORGANIZATION)

-    def backup(
+    def _backup(
        self,
        input_type: InputType,
        path_to_csv: str | None = None,
        workspace_ids: list[str] | None = None,
    ) -> None:
-        """Runs the backup process with selected input type."""
+        """Runs the backup process with the selected input type."""
         try:
             workspaces_to_export: list[str] = self.loader.get_ids_to_backup(
                 input_type,
                 path_to_csv,
                 workspace_ids,
             )

-            batches = self.split_to_batches(
+            batches = self._split_to_batches(
                 workspaces_to_export, self.config.batch_size
             )
@@ -434,7 +436,7 @@ def backup(
                 f"Exporting {len(workspaces_to_export)} workspaces in {len(batches)} batches."
             )

-            self.process_batches_in_parallel(batches)
+            self._process_batches_in_parallel(batches)

             self.logger.info("Backup completed")
         except Exception as e:
diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py
index 27a9ce29b..3feee2ea1 100644
--- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py
+++ b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/constants.py
@@ -1,3 +1,4 @@
+# (C) 2025 GoodData Corporation
 import datetime
 from dataclasses import dataclass

@@ -22,7 +23,7 @@ class DirNames:

 @dataclass(frozen=True)
 class ConcurrencyDefaults:
-    MAX_WORKERS = 2
+    MAX_WORKERS = 1

     DEFAULT_BATCH_SIZE = 100
diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py
index 8ec1add43..3b8209038 100644
--- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py
+++ b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py
@@ -21,10 +21,40 @@ class S3StorageConfig(BaseModel):
     backup_path: str
     bucket: str
-    profile: str = "default"
+    profile: Optional[str] = None
     aws_access_key_id: Optional[str] = None
     aws_secret_access_key: Optional[str] = None
-    aws_default_region: Optional[str] = None
+    aws_default_region: Optional[str] = "us-east-1"
+
+    @classmethod
+    def from_iam_role(cls, backup_path: str, bucket: str) -> "S3StorageConfig":
+        """Use default IAM role or environment credentials."""
+        return cls(backup_path=backup_path, bucket=bucket)
+
+    @classmethod
+    def from_aws_credentials(
+        cls,
+        backup_path: str,
+        bucket: str,
+        aws_access_key_id: str,
+        aws_secret_access_key: str,
+        aws_default_region: str,
+    ) -> "S3StorageConfig":
+        """Use explicit AWS access keys and region."""
+        return cls(
+            backup_path=backup_path,
+            bucket=bucket,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+            aws_default_region=aws_default_region,
+        )
+
+    @classmethod
+    def from_aws_profile(
+        cls, backup_path: str, bucket: str, profile: str
+    ) -> "S3StorageConfig":
+        """Use a named AWS CLI profile."""
+        return cls(backup_path=backup_path, bucket=bucket, profile=profile)


 class LocalStorageConfig(BaseModel):
@@ -53,6 +83,14 @@ class BackupRestoreConfig(BaseModel):
             description="Batch size must be greater than 0",
         ),
     ] = Field(default=BackupSettings.DEFAULT_BATCH_SIZE)
+    max_workers: Annotated[
+        int,
+        Field(
+            gt=0,
+            lt=3,
+            description="Max workers must be greater than 0 and less than 3",
+        ),
+    ] = Field(default=BackupSettings.MAX_WORKERS)

     @classmethod
     def from_yaml(cls, conf_path: str) -> "BackupRestoreConfig":
diff --git a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/storage/s3_storage.py b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/storage/s3_storage.py
index 2a30f40cf..7a136a3e3 100644
--- a/gooddata-pipelines/gooddata_pipelines/backup_and_restore/storage/s3_storage.py
+++ b/gooddata-pipelines/gooddata_pipelines/backup_and_restore/storage/s3_storage.py
@@ -22,6 +22,7 @@ def __init__(self, conf: BackupRestoreConfig):
         self._config = conf.storage
         self._session = self._create_boto_session(self._config)
+        self._client = self._session.client("s3")
         self._resource = self._session.resource("s3")
         self._bucket = self._resource.Bucket(self._config.bucket)  # type: ignore [missing library stubs]
         suffix = "/" if not self._config.backup_path.endswith("/") else ""
@@ -43,32 +44,40 @@ def _create_boto_session(self, config: S3StorageConfig) -> boto3.Session:
             )
         except Exception:
             self.logger.warning(
-                "Failed to create boto3 session with supplied credentials. Falling back to profile..."
+                "Failed to create boto3 session with supplied credentials."
+            )
+
+        if config.profile:
+            try:
+                return boto3.Session(profile_name=config.profile)
+            except Exception:
+                self.logger.warning(
+                    f"AWS profile [{config.profile}] not found."
             )

         try:
-            return boto3.Session(profile_name=config.profile)
+            return boto3.Session()
         except Exception:
-            self.logger.warning(
-                'AWS profile "[default]" not found. Trying other fallback methods...'
+            self.logger.error(
+                "Failed to create boto3 session with default IAM role or environment credentials."
+            )
+            raise RuntimeError(
+                "Unable to create AWS session. Please check your AWS credentials, profile, or IAM role configuration."
             )
-
-        return boto3.Session()

     def _verify_connection(self) -> None:
         """
         Pings the S3 bucket to verify that the connection is working.
         """
         try:
-            # TODO: install boto3 s3 stubs
-            self._resource.meta.client.head_bucket(Bucket=self._config.bucket)
+            self._client.head_bucket(Bucket=self._config.bucket)
         except Exception as e:
             raise RuntimeError(
                 f"Failed to connect to S3 bucket {self._config.bucket}: {e}"
             )

     def export(self, folder: str, org_id: str) -> None:
-        """Uploads the content of the folder to S3 as backup."""
+        """Uploads the content of the folder to S3 as a backup."""
         storage_path = f"{self._config.bucket}/{self._backup_path}"
         self.logger.info(f"Uploading {org_id} to {storage_path}")
         folder = f"{folder}/{org_id}"
@@ -77,10 +86,12 @@ def export(self, folder: str, org_id: str) -> None:
             export_path = (
                 f"{self._backup_path}{org_id}/{full_path[len(folder) + 1 :]}/"
             )
-            self._bucket.put_object(Key=export_path)
+            self._client.put_object(Bucket=self._config.bucket, Key=export_path)

             for file in files:
                 full_path = os.path.join(subdir, file)
                 with open(full_path, "rb") as data:
                     export_path = f"{self._backup_path}{org_id}/{full_path[len(folder) + 1 :]}"
-                    self._bucket.put_object(Key=export_path, Body=data)
+                    self._client.put_object(
+                        Bucket=self._config.bucket, Key=export_path, Body=data
+                    )
diff --git a/gooddata-pipelines/tests/backup_and_restore/test_backup.py b/gooddata-pipelines/tests/backup_and_restore/test_backup.py
index 81735a6ca..2aaba580d 100644
--- a/gooddata-pipelines/tests/backup_and_restore/test_backup.py
+++ b/gooddata-pipelines/tests/backup_and_restore/test_backup.py
@@ -106,13 +106,13 @@ def assert_not_called_with(target, *args, **kwargs):

 def test_get_s3_storage(backup_manager):
     """Test get_storage method with literal string as input."""
-    s3_storage = backup_manager.get_storage(S3_CONFIG)
+    s3_storage = backup_manager._get_storage(S3_CONFIG)

     assert isinstance(s3_storage, S3Storage)


 def test_get_local_storage(backup_manager):
     """Test get_storage method with literal string as input."""
-    local_storage = backup_manager.get_storage(LOCAL_CONFIG)
+    local_storage = backup_manager._get_storage(LOCAL_CONFIG)

     assert isinstance(local_storage, LocalStorage)
@@ -125,7 +125,7 @@ def test_archive_gooddata_layouts_to_zip(backup_manager):
            ),
            Path(tmpdir + "/services"),
        )
-        backup_manager.archive_gooddata_layouts_to_zip(
+        backup_manager._archive_gooddata_layouts_to_zip(
            str(Path(tmpdir, "services"))
        )
@@ -202,7 +202,7 @@ def test_store_user_data_filters(backup_manager):
         ]
     }
     user_data_filter_folderlocation = f"{TEST_DATA_SUBDIR}/test_exports/services/wsid1/20230713-132759-1_3_1_dev5/gooddata_layouts/services/workspaces/wsid1/user_data_filters"
-    backup_manager.store_user_data_filters(
+    backup_manager._store_user_data_filters(
         user_data_filters,
         Path(
             f"{TEST_DATA_SUBDIR}/test_exports/services/wsid1/20230713-132759-1_3_1_dev5",
@@ -245,7 +245,7 @@ def test_local_storage_export(backup_manager):
            ),
            org_store_location,
        )
-        local_storage = backup_manager.get_storage(LOCAL_CONFIG)
+        local_storage = backup_manager._get_storage(LOCAL_CONFIG)

         local_storage.export(
             folder=tmpdir,
@@ -301,7 +301,7 @@ def test_split_to_batches(backup_manager):
         BackupBatch(["ws5"]),
     ]

-    result = backup_manager.split_to_batches(workspaces, batch_size)
+    result = backup_manager._split_to_batches(workspaces, batch_size)

     for i, batch in enumerate(result):
         assert isinstance(batch, BackupBatch)
@@ -309,10 +309,10 @@
 @mock.patch(
-    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager.get_workspace_export"
+    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager._get_workspace_export"
 )
 @mock.patch(
-    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager.archive_gooddata_layouts_to_zip"
+    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager._archive_gooddata_layouts_to_zip"
 )
 def test_process_batch_success(
     archive_gooddata_layouts_to_zip_mock,
@@ -323,7 +323,7 @@ def test_process_batch_success(
     backup_manager.storage = mock.Mock()
     batch = BackupBatch(["ws1", "ws2"])
-    backup_manager.process_batch(
+    backup_manager._process_batch(
         batch=batch,
         stop_event=threading.Event(),
         retry_count=0,
     )
@@ -335,10 +335,10 @@
 @mock.patch(
-    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager.get_workspace_export"
+    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager._get_workspace_export"
 )
 @mock.patch(
-    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager.archive_gooddata_layouts_to_zip"
+    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager._archive_gooddata_layouts_to_zip"
 )
 def test_process_batch_retries_on_exception(
     archive_gooddata_layouts_to_zip_mock,
@@ -360,7 +360,7 @@ def fail_once(*args, **kwargs):

     get_workspace_export_mock.side_effect = fail_once

-    backup_manager.process_batch(
+    backup_manager._process_batch(
         batch=batch,
         stop_event=threading.Event(),
     )
@@ -374,10 +374,10 @@
 @mock.patch(
-    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager.get_workspace_export"
+    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager._get_workspace_export"
 )
 @mock.patch(
-    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager.archive_gooddata_layouts_to_zip"
+    "gooddata_pipelines.backup_and_restore.backup_manager.BackupManager._archive_gooddata_layouts_to_zip"
 )
 def test_process_batch_raises_after_max_retries(
     archive_gooddata_layouts_to_zip_mock,
@@ -390,7 +390,7 @@ def test_process_batch_raises_after_max_retries(
     get_workspace_export_mock.side_effect = Exception("fail")

     with pytest.raises(Exception) as exc_info:
-        backup_manager.process_batch(
+        backup_manager._process_batch(
            batch=batch,
            stop_event=threading.Event(),
            retry_count=BackupSettings.MAX_RETRIES,