
Commit fd9648a

feat(gooddata-pipelines): Add AWS auth via IAM role
1 parent 82a1c2d commit fd9648a

4 files changed: +105 -65 lines changed


gooddata-pipelines/gooddata_pipelines/backup_and_restore/backup_manager.py

Lines changed: 43 additions & 41 deletions
@@ -55,7 +55,7 @@ def __init__(self, host: str, token: str, config: BackupRestoreConfig):
 
         self.config = config
 
-        self.storage = self.get_storage(self.config)
+        self.storage = self._get_storage(self.config)
         self.org_id = self._api.get_organization_id()
 
         self.loader = BackupInputProcessor(self._api, self.config.api_page_size)
@@ -67,7 +67,7 @@ def create(
         host: str,
         token: str,
     ) -> "BackupManager":
-        """Creates a backup worker instance using provided host and token."""
+        """Creates a backup worker instance using the provided host and token."""
        return cls(host=host, token=token, config=config)
 
     @classmethod
@@ -81,7 +81,8 @@ def create_from_profile(
         content = profile_content(profile, profiles_path)
         return cls(**content, config=config)
 
-    def get_storage(self, conf: BackupRestoreConfig) -> BackupStorage:
+    @staticmethod
+    def _get_storage(conf: BackupRestoreConfig) -> BackupStorage:
         """Returns the storage class based on the storage type."""
         if conf.storage_type == StorageType.S3:
             return S3Storage(conf)
@@ -100,7 +101,7 @@ def get_user_data_filters(self, ws_id: str) -> dict:
         else:
             raise RuntimeError(f"{response.status_code}: {response.text}")
 
-    def store_user_data_filters(
+    def _store_user_data_filters(
         self,
         user_data_filters: dict,
         export_path: Path,
@@ -128,20 +129,20 @@ def store_user_data_filters(
                 "user_data_filters",
                 filter["id"] + ".yaml",
             )
-            self.write_to_yaml(udf_file_path, filter)
+            self._write_to_yaml(udf_file_path, filter)
 
     @staticmethod
-    def move_folder(source: Path, destination: Path) -> None:
+    def _move_folder(source: Path, destination: Path) -> None:
         """Moves the source folder to the destination."""
         shutil.move(source, destination)
 
     @staticmethod
-    def write_to_yaml(path: str, source: Any) -> None:
+    def _write_to_yaml(path: str, source: Any) -> None:
         """Writes the source to a YAML file."""
         with open(path, "w") as outfile:
             yaml.dump(source, outfile)
 
-    def get_automations_from_api(self, workspace_id: str) -> Any:
+    def _get_automations_from_api(self, workspace_id: str) -> Any:
         """Returns automations for the workspace as JSON."""
         response: requests.Response = self._api.get_automations(workspace_id)
         if response.ok:
@@ -152,10 +153,10 @@ def get_automations_from_api(self, workspace_id: str) -> Any:
                 + f"{response.status_code}: {response.text}"
             )
 
-    def store_automations(self, export_path: Path, workspace_id: str) -> None:
+    def _store_automations(self, export_path: Path, workspace_id: str) -> None:
         """Stores the automations in the specified export path."""
         # Get the automations from the API
-        automations: Any = self.get_automations_from_api(workspace_id)
+        automations: Any = self._get_automations_from_api(workspace_id)
 
         automations_folder_path: Path = Path(
             export_path,
@@ -184,8 +185,8 @@ def store_declarative_filter_views(
         # Get the filter views YAML files from the API
         self._api.store_declarative_filter_views(workspace_id, export_path)
 
-        # Move filter views to the subfolder containing analytics model
-        self.move_folder(
+        # Move filter views to the subfolder containing the analytics model
+        self._move_folder(
             Path(export_path, "gooddata_layouts", self.org_id, "filter_views"),
             Path(
                 export_path,
@@ -197,7 +198,7 @@ def store_declarative_filter_views(
             ),
         )
 
-    def get_workspace_export(
+    def _get_workspace_export(
         self,
         local_target_path: str,
         workspaces_to_export: list[str],
@@ -232,9 +233,9 @@ def get_workspace_export(
            # be more transparent/readable and possibly safer for threading
            self._api.store_declarative_workspace(workspace_id, export_path)
            self.store_declarative_filter_views(export_path, workspace_id)
-           self.store_automations(export_path, workspace_id)
+           self._store_automations(export_path, workspace_id)
 
-           self.store_user_data_filters(
+           self._store_user_data_filters(
                user_data_filters, export_path, workspace_id
            )
            self.logger.info(f"Stored export for {workspace_id}")
@@ -250,7 +251,7 @@ def get_workspace_export(
                + "is correct and that the workspaces exist."
            )
 
-    def archive_gooddata_layouts_to_zip(self, folder: str) -> None:
+    def _archive_gooddata_layouts_to_zip(self, folder: str) -> None:
         """Archives the gooddata_layouts directory to a zip file."""
         try:
             target_subdir = ""
@@ -271,11 +272,12 @@ def archive_gooddata_layouts_to_zip(self, folder: str) -> None:
             self.logger.error(f"Error archiving {folder} to zip: {e}")
             raise
 
-    def split_to_batches(
-        self, workspaces_to_export: list[str], batch_size: int
+    @staticmethod
+    def _split_to_batches(
+        workspaces_to_export: list[str], batch_size: int
     ) -> list[BackupBatch]:
-        """Splits the list of workspaces to into batches of the specified size.
-        The batch is respresented as a list of workspace IDs.
+        """Splits the list of workspaces into batches of the specified size.
+        The batch is represented as a list of workspace IDs.
         Returns a list of batches (i.e. list of lists of IDs)
         """
         list_of_batches = []
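The batching described in the docstring above amounts to plain list slicing. A minimal standalone sketch, assuming BackupBatch is a simple wrapper whose list_of_ids field holds the workspace IDs (that field name appears later in this diff); the dataclass here is illustrative, not the project's actual model:

    from dataclasses import dataclass

    @dataclass
    class BackupBatch:
        list_of_ids: list[str]

    def split_to_batches(workspace_ids: list[str], batch_size: int) -> list[BackupBatch]:
        # Consecutive slices of at most batch_size IDs; the last batch may be shorter.
        return [
            BackupBatch(list_of_ids=workspace_ids[i : i + batch_size])
            for i in range(0, len(workspace_ids), batch_size)
        ]

    # split_to_batches(["ws1", "ws2", "ws3"], batch_size=2)
    # -> [BackupBatch(["ws1", "ws2"]), BackupBatch(["ws3"])]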
@@ -286,7 +288,7 @@ def split_to_batches(
 
         return list_of_batches
 
-    def process_batch(
+    def _process_batch(
         self,
         batch: BackupBatch,
         stop_event: threading.Event,
@@ -298,14 +300,14 @@ def process_batch(
         The base wait time is defined by BackupSettings.RETRY_DELAY.
         """
         if stop_event.is_set():
-            # If the stop_event flag is set, return. This will terminate the thread.
+            # If the stop_event flag is set, return. This will terminate the thread
             return
 
         try:
             with tempfile.TemporaryDirectory() as tmpdir:
-                self.get_workspace_export(tmpdir, batch.list_of_ids)
+                self._get_workspace_export(tmpdir, batch.list_of_ids)
 
-                self.archive_gooddata_layouts_to_zip(
+                self._archive_gooddata_layouts_to_zip(
                     str(Path(tmpdir, self.org_id))
                 )
 
@@ -316,7 +318,7 @@ def process_batch(
                 return
 
            elif retry_count < BackupSettings.MAX_RETRIES:
-               # Retry with exponential backoff until MAX_RETRIES.
+               # Retry with exponential backoff until MAX_RETRIES
                next_retry = retry_count + 1
                wait_time = BackupSettings.RETRY_DELAY**next_retry
                self.logger.info(
@@ -326,13 +328,13 @@ def process_batch(
                )
 
                time.sleep(wait_time)
-               self.process_batch(batch, stop_event, next_retry)
+               self._process_batch(batch, stop_event, next_retry)
            else:
-               # If the batch fails after MAX_RETRIES, raise the error.
+               # If the batch fails after MAX_RETRIES, raise the error
                self.logger.error(f"Batch failed: {e.__class__.__name__}: {e}")
                raise
 
-    def process_batches_in_parallel(
+    def _process_batches_in_parallel(
         self,
         batches: list[BackupBatch],
     ) -> None:
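The retry logic above recurses with an exponentially growing sleep. A stripped-down sketch of the same pattern; the constant values are hypothetical stand-ins for BackupSettings.RETRY_DELAY and BackupSettings.MAX_RETRIES:

    import time
    from typing import Callable

    RETRY_DELAY = 5  # hypothetical base delay in seconds
    MAX_RETRIES = 3  # hypothetical retry cap

    def run_with_backoff(task: Callable[[], None], retry_count: int = 0) -> None:
        try:
            task()
        except Exception:
            if retry_count < MAX_RETRIES:
                next_retry = retry_count + 1
                # Waits 5 s, then 25 s, then 125 s before the final attempt.
                time.sleep(RETRY_DELAY**next_retry)
                run_with_backoff(task, next_retry)
            else:
                raise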
@@ -352,7 +354,7 @@ def process_batches_in_parallel(
             for batch in batches:
                 futures.append(
                     executor.submit(
-                        self.process_batch,
+                        self._process_batch,
                         batch,
                         stop_event,
                     )
@@ -363,10 +365,10 @@ def process_batches_in_parallel(
             try:
                 future.result()
             except Exception:
-                # On failure, set the flag to True - signal running processes to stop.
+                # On failure, set the flag to True - signal running processes to stop
                 stop_event.set()
 
-                # Cancel unstarted threads.
+                # Cancel unstarted threads
                 for f in futures:
                     if not f.done():
                         f.cancel()
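In isolation, the cancellation pattern above works like this: the first failed future sets a shared Event, workers check it before starting any work, and futures still waiting in the queue are cancelled outright. A minimal sketch; the worker body and the max_workers value are placeholders, not taken from the project:

    import threading
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def process(batch: list[str], stop_event: threading.Event) -> None:
        if stop_event.is_set():
            return  # another batch already failed; skip the work
        print(f"exporting {len(batch)} workspaces")  # placeholder for the real export

    def run_all(batches: list[list[str]]) -> None:
        stop_event = threading.Event()
        with ThreadPoolExecutor(max_workers=4) as executor:
            futures = [executor.submit(process, b, stop_event) for b in batches]
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception:
                    stop_event.set()  # tell running workers to bail out early
                    for f in futures:
                        if not f.done():
                            f.cancel()  # drop batches that never started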
@@ -383,58 +385,58 @@ def backup_workspaces(
         workspace in storage specified in the configuration.
 
         Args:
-            path_to_csv (str): Path to a CSV file containing a list of workspace IDs.
+            path_to_csv (str): Path to a CSV file containing a list of workspace IDs
             workspace_ids (list[str]): List of workspace IDs
         """
-        self.backup(InputType.LIST_OF_WORKSPACES, path_to_csv, workspace_ids)
+        self._backup(InputType.LIST_OF_WORKSPACES, path_to_csv, workspace_ids)
 
     def backup_hierarchies(
         self, path_to_csv: str | None, workspace_ids: list[str] | None
     ) -> None:
         """Runs the backup process for a list of hierarchies.
 
         Will take the list of workspace IDs or read the list of workspace IDs
-        from a CSV file and create backup for each those workspaces' hierarchies
+        from a CSV file and create backup for each of those workspaces' hierarchies
         in storage specified in the configuration.
         Workspace hierarchy means the workspace itself and all its direct and
         indirect children.
 
         Args:
-            path_to_csv (str): Path to a CSV file containing a list of workspace IDs.
+            path_to_csv (str): Path to a CSV file containing a list of workspace IDs
             workspace_ids (list[str]): List of workspace IDs
         """
-        self.backup(InputType.HIERARCHY, path_to_csv, workspace_ids)
+        self._backup(InputType.HIERARCHY, path_to_csv, workspace_ids)
 
     def backup_entire_organization(self) -> None:
         """Runs the backup process for the entire organization.
 
         Will create backup for all workspaces in the organization in storage
         specified in the configuration.
         """
-        self.backup(InputType.ORGANIZATION)
+        self._backup(InputType.ORGANIZATION)
 
-    def backup(
+    def _backup(
         self,
         input_type: InputType,
         path_to_csv: str | None = None,
         workspace_ids: list[str] | None = None,
     ) -> None:
-        """Runs the backup process with selected input type."""
+        """Runs the backup process with the selected input type."""
         try:
             workspaces_to_export: list[str] = self.loader.get_ids_to_backup(
                 input_type,
                 path_to_csv,
                 workspace_ids,
             )
-            batches = self.split_to_batches(
+            batches = self._split_to_batches(
                 workspaces_to_export, self.config.batch_size
             )
 
             self.logger.info(
                 f"Exporting {len(workspaces_to_export)} workspaces in {len(batches)} batches."
             )
 
-            self.process_batches_in_parallel(batches)
+            self._process_batches_in_parallel(batches)
 
             self.logger.info("Backup completed")
         except Exception as e:
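The renamed helpers are now internal, while the create/create_from_profile constructors and the backup_* entry points stay public, so existing callers keep working. A hedged usage sketch: the import path is inferred from the file location, the host/token/ID values are placeholders, and the BackupRestoreConfig construction is left out because its schema is not part of this diff:

    from gooddata_pipelines.backup_and_restore.backup_manager import BackupManager

    # config: a BackupRestoreConfig (storage settings, batch size, ...) built elsewhere
    manager = BackupManager.create(
        config=config, host="https://example.gooddata.com", token="***"
    )

    manager.backup_workspaces(path_to_csv=None, workspace_ids=["ws_sales", "ws_finance"])
    manager.backup_hierarchies(path_to_csv="workspaces.csv", workspace_ids=None)
    manager.backup_entire_organization()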

gooddata-pipelines/gooddata_pipelines/backup_and_restore/models/storage.py

Lines changed: 32 additions & 2 deletions
@@ -21,10 +21,40 @@ class S3StorageConfig(BaseModel):
 
     backup_path: str
     bucket: str
-    profile: str = "default"
+    profile: Optional[str] = None
     aws_access_key_id: Optional[str] = None
     aws_secret_access_key: Optional[str] = None
-    aws_default_region: Optional[str] = None
+    aws_default_region: Optional[str] = "us-east-1"
+
+    @classmethod
+    def from_iam_role(cls, backup_path: str, bucket: str) -> "S3StorageConfig":
+        """Use default IAM role or environment credentials."""
+        return cls(backup_path=backup_path, bucket=bucket)
+
+    @classmethod
+    def from_aws_credentials(
+        cls,
+        backup_path: str,
+        bucket: str,
+        aws_access_key_id: str,
+        aws_secret_access_key: str,
+        aws_default_region: str,
+    ) -> "S3StorageConfig":
+        """Use explicit AWS access keys and region."""
+        return cls(
+            backup_path=backup_path,
+            bucket=bucket,
+            aws_access_key_id=aws_access_key_id,
+            aws_secret_access_key=aws_secret_access_key,
+            aws_default_region=aws_default_region,
+        )
+
+    @classmethod
+    def from_aws_profile(
+        cls, backup_path: str, bucket: str, profile: str
+    ) -> "S3StorageConfig":
+        """Use a named AWS CLI profile."""
+        return cls(backup_path=backup_path, bucket=bucket, profile=profile)
 
 
 class LocalStorageConfig(BaseModel):
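The three classmethods map directly onto the three authentication modes this commit supports. A short usage sketch; the bucket, path, key, and profile values are placeholders, and the import path is inferred from the file location:

    from gooddata_pipelines.backup_and_restore.models.storage import S3StorageConfig

    # New in this commit: rely on the ambient IAM role or AWS_* environment variables.
    iam_config = S3StorageConfig.from_iam_role(backup_path="backups/", bucket="gd-backups")

    # Explicit access keys and region.
    key_config = S3StorageConfig.from_aws_credentials(
        backup_path="backups/",
        bucket="gd-backups",
        aws_access_key_id="AKIA...",  # placeholder
        aws_secret_access_key="***",  # placeholder
        aws_default_region="eu-west-1",
    )

    # A named profile from ~/.aws/credentials.
    profile_config = S3StorageConfig.from_aws_profile(
        backup_path="backups/", bucket="gd-backups", profile="gooddata-backups"
    )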

gooddata-pipelines/gooddata_pipelines/backup_and_restore/storage/s3_storage.py

Lines changed: 18 additions & 10 deletions
@@ -22,6 +22,7 @@ def __init__(self, conf: BackupRestoreConfig):
 
         self._config = conf.storage
         self._session = self._create_boto_session(self._config)
+        self._client = self._session.client("s3")
         self._resource = self._session.resource("s3")
         self._bucket = self._resource.Bucket(self._config.bucket)  # type: ignore [missing library stubs]
         suffix = "/" if not self._config.backup_path.endswith("/") else ""
@@ -43,32 +44,37 @@ def _create_boto_session(self, config: S3StorageConfig) -> boto3.Session:
             )
         except Exception:
             self.logger.warning(
-                "Failed to create boto3 session with supplied credentials. Falling back to profile..."
+                "Failed to create boto3 session with supplied credentials."
+            )
+
+        if config.profile:
+            try:
+                return boto3.Session(profile_name=config.profile)
+            except Exception:
+                self.logger.warning(
+                    f"AWS profile [{config.profile}] not found."
                 )
 
         try:
-            return boto3.Session(profile_name=config.profile)
+            return boto3.Session()
         except Exception:
             self.logger.warning(
-                'AWS profile "[default]" not found. Trying other fallback methods...'
+                "Failed to create boto3 session with default IAM role or environment credentials."
             )
 
-        return boto3.Session()
-
     def _verify_connection(self) -> None:
         """
         Pings the S3 bucket to verify that the connection is working.
         """
         try:
-            # TODO: install boto3 s3 stubs
-            self._resource.meta.client.head_bucket(Bucket=self._config.bucket)
+            self._client.head_bucket(Bucket=self._config.bucket)
         except Exception as e:
             raise RuntimeError(
                 f"Failed to connect to S3 bucket {self._config.bucket}: {e}"
             )
 
     def export(self, folder: str, org_id: str) -> None:
-        """Uploads the content of the folder to S3 as backup."""
+        """Uploads the content of the folder to S3 as a backup."""
         storage_path = f"{self._config.bucket}/{self._backup_path}"
         self.logger.info(f"Uploading {org_id} to {storage_path}")
         folder = f"{folder}/{org_id}"
7783
export_path = (
7884
f"{self._backup_path}{org_id}/{full_path[len(folder) + 1 :]}/"
7985
)
80-
self._bucket.put_object(Key=export_path)
86+
self._client.put_object(Bucket=self._config.bucket, Key=export_path)
8187

8288
for file in files:
8389
full_path = os.path.join(subdir, file)
8490
with open(full_path, "rb") as data:
8591
export_path = f"{self._backup_path}{org_id}/{full_path[len(folder) + 1 :]}"
86-
self._bucket.put_object(Key=export_path, Body=data)
92+
self._client.put_object(
93+
Bucket=self._config.bucket, Key=export_path, Body=data
94+
)
