Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,25 +158,33 @@ def get_all_workspaces(self) -> list[str]:
return all_workspaces

def get_ids_to_backup(
self, input_type: InputType, path_to_csv: str | None = None
self,
input_type: InputType,
path_to_csv: str | None = None,
workspace_ids: list[str] | None = None,
) -> list[str]:
"""Returns the list of workspace IDs to back up based on the input type."""

if input_type in (InputType.LIST_OF_WORKSPACES, InputType.HIERARCHY):
if path_to_csv is None:
if (path_to_csv is None) == (workspace_ids is None):
raise ValueError(
f"Path to CSV is required for this input type: {input_type.value}"
f"Path to CSV and list of workspace IDs must be specified exclusively for this input type: {input_type.value}"
)

# If we're backing up based on the list, simply read it from the CSV
list_of_parents = []
if path_to_csv is not None:
list_of_parents = self.csv_reader.read_backup_csv(path_to_csv)
if workspace_ids is not None:
list_of_parents = workspace_ids

if input_type == InputType.LIST_OF_WORKSPACES:
return self.csv_reader.read_backup_csv(path_to_csv)
return list_of_parents
else:
# For hierarchy backup, we read the CSV and treat it as a list of
# parent workspace IDs. Then we retrieve the children of each parent,
# including their children, and so on. The parent workspaces are
# also included in the backup.
list_of_parents = self.csv_reader.read_backup_csv(path_to_csv)
list_of_children: list[str] = []

for parent in list_of_parents:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,29 +373,37 @@ def process_batches_in_parallel(

raise

def backup_workspaces(self, path_to_csv: str) -> None:
def backup_workspaces(
self, path_to_csv: str | None, workspace_ids: list[str] | None
) -> None:
"""Runs the backup process for a list of workspace IDs.

Will read the list of workspace IDs from a CSV file and create backup for
each workspace in storage specified in the configuration.
Will take the list of workspace IDs or read the list of
workspace IDs from a CSV file and create backup for each
workspace in storage specified in the configuration.

Args:
path_to_csv (str): Path to a CSV file containing a list of workspace IDs.
workspace_ids (list[str]): List of workspace IDs
"""
self.backup(InputType.LIST_OF_WORKSPACES, path_to_csv)
self.backup(InputType.LIST_OF_WORKSPACES, path_to_csv, workspace_ids)

def backup_hierarchies(self, path_to_csv: str) -> None:
def backup_hierarchies(
self, path_to_csv: str | None, workspace_ids: list[str] | None
) -> None:
"""Runs the backup process for a list of hierarchies.

Will read the list of workspace IDs from a CSV file and create backup for
each those workspaces' hierarchies in storage specified in the configuration.
Will take the list of workspace IDs or read the list of workspace IDs
from a CSV file and create backup for each those workspaces' hierarchies
in storage specified in the configuration.
Workspace hierarchy means the workspace itself and all its direct and
indirect children.

Args:
path_to_csv (str): Path to a CSV file containing a list of workspace IDs.
workspace_ids (list[str]): List of workspace IDs
"""
self.backup(InputType.HIERARCHY, path_to_csv)
self.backup(InputType.HIERARCHY, path_to_csv, workspace_ids)

def backup_entire_organization(self) -> None:
"""Runs the backup process for the entire organization.
Expand All @@ -406,12 +414,17 @@ def backup_entire_organization(self) -> None:
self.backup(InputType.ORGANIZATION)

def backup(
self, input_type: InputType, path_to_csv: str | None = None
self,
input_type: InputType,
path_to_csv: str | None = None,
workspace_ids: list[str] | None = None,
) -> None:
"""Runs the backup process with selected input type."""
try:
workspaces_to_export: list[str] = self.loader.get_ids_to_backup(
input_type, path_to_csv
input_type,
path_to_csv,
workspace_ids,
)
batches = self.split_to_batches(
workspaces_to_export, self.config.batch_size
Expand Down
Loading