Commit 6249c53

Author: gdgate

Merge pull request #441 from jaceksan/working

TRIVIAL: gooddata-dbt - enable --dry-run for pre-merge pipelines

Reviewed-by: Jan Kadlec (https://github.com/hkad98)

2 parents e65d7ca + c8a6bd4, commit 6249c53

File tree

5 files changed: +178 −39 lines changed

gooddata-dbt/gooddata_dbt/args.py

Lines changed: 2 additions & 0 deletions

@@ -140,6 +140,7 @@ def set_dbt_cloud_stats_args(parser: argparse.ArgumentParser) -> None:
 def parse_arguments(description: str) -> argparse.Namespace:
     parser = get_parser(description)
     parser.add_argument("--debug", action="store_true", default=False, help="Increase logging level to DEBUG")
+    parser.add_argument("--dry-run", action="store_true", default=False, help="Do not call GoodData APIs")
     set_gooddata_endpoint_args(parser)

     subparsers = parser.add_subparsers(help="actions")
@@ -175,6 +176,7 @@ def parse_arguments(description: str) -> argparse.Namespace:

     upload_notification = subparsers.add_parser("upload_notification")
     set_dbt_args(upload_notification)
+    set_gooddata_upper_case_args(upload_notification)
     upload_notification.set_defaults(method="upload_notification")

     deploy_analytics = subparsers.add_parser("deploy_analytics")
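
Note: --dry-run is a plain argparse boolean, so it reaches the rest of the plugin as args.dry_run. A minimal sketch of that wiring, assuming a hypothetical entry point (the main() harness below is illustrative, not part of this commit):

import logging

from gooddata_dbt.args import parse_arguments


def main() -> None:
    args = parse_arguments("gooddata-dbt plugin")
    logger = logging.getLogger(__name__)
    # argparse maps --dry-run to args.dry_run; it defaults to False,
    # so existing pipelines keep calling GoodData APIs unless they opt in.
    if args.dry_run:
        logger.info("Dry run - no GoodData API will be called")


if __name__ == "__main__":
    main()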

gooddata-dbt/gooddata_dbt/dbt/tables.py

Lines changed: 13 additions & 8 deletions

@@ -218,14 +218,19 @@ def read_dbt_models(dbt_catalog: Dict, upper_case: bool, all_model_ids: List[str
                     column.meta.gooddata.upper_case_names()
         return tables

-    def set_data_types(self, scan_pdm: CatalogDeclarativeTables) -> None:
-        for table in self.tables:
-            scan_table = self.get_scan_table(scan_pdm, table.name)
-            for column in table.columns.values():
-                # dbt does not provide data types in manifest.json
-                # get it from GoodData scan API
-                scan_column = self.get_scan_column(scan_table, column.name)
-                column.data_type = scan_column.data_type
+    def set_data_types(self, scan_pdm: CatalogDeclarativeTables, dry_run: bool = False) -> None:
+        if dry_run:
+            for table in self.tables:
+                for column in table.columns.values():
+                    column.data_type = "STRING"
+        else:
+            for table in self.tables:
+                scan_table = self.get_scan_table(scan_pdm, table.name)
+                for column in table.columns.values():
+                    # dbt does not provide data types in manifest.json
+                    # get it from GoodData scan API
+                    scan_column = self.get_scan_column(scan_table, column.name)
+                    column.data_type = scan_column.data_type

     @property
     def schema_name(self) -> str:
gooddata-dbt/gooddata_dbt/dbt_plugin.py

Lines changed: 39 additions & 30 deletions

@@ -14,6 +14,7 @@
 from gooddata_dbt.dbt.cloud import DbtConnection, DbtCredentials, DbtExecution
 from gooddata_dbt.dbt.profiles import DbtProfiles
 from gooddata_dbt.dbt.tables import DbtModelTables
+from gooddata_dbt.gooddata.api_wrapper import GoodDataApiWrapper
 from gooddata_dbt.gooddata.config import GoodDataConfig, GoodDataConfigOrganization, GoodDataConfigProduct
 from gooddata_dbt.logger import get_logger
 from gooddata_dbt.sdk_wrapper import GoodDataSdkWrapper
@@ -32,30 +33,35 @@ def layout_model_path(data_product: GoodDataConfigProduct) -> Path:

 def generate_and_put_ldm(
     logger: logging.Logger,
-    sdk: GoodDataSdk,
+    sdk_wrapper: GoodDataSdkWrapper,
     data_source_id: str,
     workspace_id: str,
     dbt_tables: DbtModelTables,
     model_ids: Optional[List[str]],
 ) -> None:
     scan_request = CatalogScanModelRequest(scan_tables=True, scan_views=True)
     logger.info(f"Scan data source {data_source_id=}")
-    scan_pdm = sdk.catalog_data_source.scan_data_source(data_source_id, scan_request, report_warnings=True).pdm
+    scan_pdm = sdk_wrapper.sdk_facade.scan_data_source(data_source_id, scan_request)
     scan_pdm.store_to_disk(Path("test"))
     # Store data types to dbt_tables class. It is used in make_declarative_datasets to inject data types to LDM.
-    dbt_tables.set_data_types(scan_pdm)
+    dbt_tables.set_data_types(scan_pdm, sdk_wrapper.sdk_facade.dry_run)
     # Construct GoodData LDM from dbt models
     declarative_datasets = dbt_tables.make_declarative_datasets(data_source_id, model_ids)
     ldm = CatalogDeclarativeModel.from_dict({"ldm": declarative_datasets}, camel_case=False)
     # Deploy logical into target workspace
-    sdk.catalog_workspace_content.put_declarative_ldm(workspace_id, ldm)
+    sdk_wrapper.sdk_facade.put_declarative_ldm(workspace_id, ldm)


-def create_workspace(logger: logging.Logger, sdk: GoodDataSdk, workspace_id: str, workspace_title: str) -> None:
+def create_workspace(
+    logger: logging.Logger,
+    sdk_wrapper: GoodDataSdkWrapper,
+    workspace_id: str,
+    workspace_title: str,
+) -> None:
     logger.info(f"Create workspace {workspace_id=} {workspace_title=}")
     # Create workspaces, if they do not exist yet, otherwise update them
     workspace = CatalogWorkspace(workspace_id=workspace_id, name=workspace_title)
-    sdk.catalog_workspace.create_or_update(workspace=workspace)
+    sdk_wrapper.sdk_facade.create_workspace(workspace=workspace)


 def deploy_ldm(
@@ -70,7 +76,7 @@ def deploy_ldm(
     dbt_profiles = DbtProfiles(args)
     data_source_id = dbt_profiles.data_source_id
     dbt_tables = DbtModelTables.from_local(args.gooddata_upper_case, all_model_ids)
-    generate_and_put_ldm(logger, sdk_wrapper.sdk, data_source_id, workspace_id, dbt_tables, model_ids)
+    generate_and_put_ldm(logger, sdk_wrapper, data_source_id, workspace_id, dbt_tables, model_ids)
     workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/modeler/#/{workspace_id}"
     logger.info(f"LDM successfully loaded, verify here: {workspace_url}")

@@ -89,12 +95,12 @@ def register_data_source(

     logger.info(f"Register data source {data_source_id=} schema={dbt_tables.schema_name}")
     data_source = dbt_target.to_gooddata(data_source_id, dbt_tables.schema_name)
-    sdk_wrapper.sdk.catalog_data_source.create_or_update_data_source(data_source)
+    sdk_wrapper.sdk_facade.create_or_update_data_source(data_source)


-def upload_notification(logger: logging.Logger, sdk: GoodDataSdk, data_source_id: str) -> None:
+def upload_notification(logger: logging.Logger, sdk_wrapper: GoodDataSdkWrapper, data_source_id: str) -> None:
     logger.info(f"Upload notification {data_source_id=}")
-    sdk.catalog_data_source.register_upload_notification(data_source_id)
+    sdk_wrapper.sdk_facade.register_upload_notification(data_source_id)


 def deploy_analytics(
@@ -110,7 +116,7 @@ def deploy_analytics(

     # Deploy analytics model into target workspace
     logger.info("Load analytics model into GoodData")
-    sdk_wrapper.sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)
+    sdk_wrapper.sdk_facade.put_declarative_analytics_model(workspace_id, adm)

     workspace_url = f"{sdk_wrapper.get_host_from_sdk()}/dashboards/#/workspace/{workspace_id}"
     logger.info(f"Analytics successfully loaded, verify here: {workspace_url}")
@@ -130,20 +136,20 @@ def store_analytics(
     )


-async def execute_insight(sdk: GoodDataSdk, workspace_id: str, insight: Insight) -> None:
-    sdk.tables.for_insight(workspace_id, insight)
+async def execute_insight(sdk_wrapper: GoodDataSdkWrapper, workspace_id: str, insight: Insight) -> None:
+    sdk_wrapper.sdk_facade.execute_insight(workspace_id, insight)


 async def test_insight(
     logger: logging.Logger,
-    sdk: GoodDataSdk,
+    sdk_wrapper: GoodDataSdkWrapper,
     workspace_id: str,
     insight: Insight,
 ) -> dict:
     logger.info(f"Executing insight {insight.id=} {insight.title=} ...")
     start = time()
     try:
-        await execute_insight(sdk, workspace_id, insight)
+        await execute_insight(sdk_wrapper, workspace_id, insight)
         duration = get_duration(start)
         logger.info(f"Test successful {insight.id=} {insight.title=} duration={duration}(ms)")
         return {"id": insight.id, "title": insight.title, "duration": duration, "status": "success"}
@@ -155,37 +161,37 @@ async def test_insight(

 async def safe_test_insight(
     logger: logging.Logger,
-    sdk: GoodDataSdk,
+    sdk_wrapper: GoodDataSdkWrapper,
     workspace_id: str,
     insight: Insight,
     semaphore: Semaphore,
 ) -> dict:
     async with semaphore:  # semaphore limits num of simultaneous executions
         return await test_insight(
             logger,
-            sdk,
+            sdk_wrapper,
             workspace_id,
             insight,
         )


 async def test_insights(
     logger: logging.Logger,
-    sdk: GoodDataSdk,
+    sdk_wrapper: GoodDataSdkWrapper,
     workspace_id: str,
     skip_tests: Optional[List[str]],
     test_insights_parallelism: int = 1,
 ) -> None:
     start = time()
     logger.info(f"Test insights {workspace_id=}")
-    insights = sdk.insights.get_insights(workspace_id)
+    insights = sdk_wrapper.sdk_facade.get_insights(workspace_id)
     semaphore = asyncio.Semaphore(test_insights_parallelism)
     tasks = []
     for insight in insights:
         if skip_tests is not None and insight.id in skip_tests:
             logger.info(f"Skip test insight={insight.title} (requested in gooddata.yaml)")
         else:
-            tasks.append(safe_test_insight(logger, sdk, workspace_id, insight, semaphore))
+            tasks.append(safe_test_insight(logger, sdk_wrapper, workspace_id, insight, semaphore))
     results = await asyncio.gather(*tasks)
     duration = get_duration(start)
     errors = [result for result in results if result["status"] == "failed"]
@@ -195,7 +201,11 @@ async def test_insights(
     logger.info(f"Test insights finished {workspace_id=} {duration=}(ms)")


-def create_localized_workspaces(data_product: GoodDataConfigProduct, sdk: GoodDataSdk, workspace_id: str) -> None:
+def create_localized_workspaces(
+    data_product: GoodDataConfigProduct,
+    sdk_facade: GoodDataApiWrapper,
+    workspace_id: str,
+) -> None:
     if data_product.localization is None:
         return
     for to in data_product.localization.to:
@@ -205,15 +215,14 @@ def create_localized_workspaces(data_product: GoodDataConfigProduct, sdk: GoodDa
             source=data_product.localization.from_language, target=to.language
         ).translate_batch
         logging.info(f"create_localized_workspaces layout_root_path={GOODDATA_LAYOUTS_DIR / data_product.id}")
-        sdk.catalog_workspace.generate_localized_workspaces(
+        sdk_facade.generate_localized_workspaces(
             workspace_id,
-            to_lang=to.language,
-            to_locale=to.locale,
-            from_lang=data_product.localization.from_language,
+            to=to,
+            data_product=data_product,
             translator_func=translator_func,
+            layout_path=GOODDATA_LAYOUTS_DIR / data_product.id,
             provision_workspace=True,
             store_layouts=False,
-            layout_root_path=GOODDATA_LAYOUTS_DIR / data_product.id,
         )


@@ -329,7 +338,7 @@ def process_organization(
     if args.method == "upload_notification":
         dbt_profiles = DbtProfiles(args)
         # Caches are invalidated only per data source, not per data product
-        upload_notification(logger, sdk_wrapper.sdk, dbt_profiles.data_source_id)
+        upload_notification(logger, sdk_wrapper, dbt_profiles.data_source_id)
     elif args.method == "register_data_sources":
         register_data_source(logger, args, gd_config.all_model_ids, sdk_wrapper)
     else:
@@ -345,21 +354,21 @@ def process_organization(
             workspace_id = f"{data_product.id}_{environment.id}"
             workspace_title = f"{data_product.name} ({environment.name})"
             if args.method == "provision_workspaces":
-                create_workspace(logger, sdk_wrapper.sdk, workspace_id, workspace_title)
+                create_workspace(logger, sdk_wrapper, workspace_id, workspace_title)
             elif args.method == "deploy_ldm":
                 deploy_ldm(
                     logger, args, gd_config.all_model_ids, sdk_wrapper, data_product.model_ids, workspace_id
                 )
                 if data_product.localization:
-                    create_localized_workspaces(data_product, sdk_wrapper.sdk, workspace_id)
+                    create_localized_workspaces(data_product, sdk_wrapper.sdk_facade, workspace_id)
             elif args.method == "store_analytics":
                 store_analytics(logger, sdk_wrapper.sdk, workspace_id, data_product)
             elif args.method == "deploy_analytics":
                 deploy_analytics(logger, sdk_wrapper, workspace_id, data_product)
             elif args.method == "test_insights":
                 parallelism = gd_config.global_properties.test_insights_parallelism or 1
                 asyncio.run(
-                    test_insights(logger, sdk_wrapper.sdk, workspace_id, data_product.skip_tests, parallelism)
+                    test_insights(logger, sdk_wrapper, workspace_id, data_product.skip_tests, parallelism)
                 )
             else:
                 raise Exception(f"Unsupported method requested in args: {args.method}")
gooddata-dbt/gooddata_dbt/gooddata/api_wrapper.py

Lines changed: 116 additions & 0 deletions

@@ -0,0 +1,116 @@
+# (C) 2023 GoodData Corporation
+import logging
+from pathlib import Path
+from typing import Any, List, Union
+
+from gooddata_dbt.gooddata.config import GoodDataConfigLocalizationTo, GoodDataConfigProduct
+
+from gooddata_sdk import (
+    CatalogDataSourcePostgres,
+    CatalogDataSourceSnowflake,
+    CatalogDataSourceVertica,
+    CatalogDeclarativeAnalytics,
+    CatalogDeclarativeColumn,
+    CatalogDeclarativeModel,
+    CatalogDeclarativeTable,
+    CatalogDeclarativeTables,
+    CatalogScanModelRequest,
+    CatalogWorkspace,
+    GoodDataSdk,
+    Insight,
+)
+
+DataSource = Union[CatalogDataSourcePostgres, CatalogDataSourceSnowflake, CatalogDataSourceVertica]
+
+
+class GoodDataApiWrapper:
+    def __init__(self, sdk: GoodDataSdk, logger: logging.Logger, dry_run: bool = False) -> None:
+        self.sdk = sdk
+        self.logger = logger
+        self.dry_run = dry_run
+
+    def get_insights(self, workspace_id: str) -> List[Insight]:
+        if self.dry_run:
+            self.logger.info("Dry run - skipping insights listing")
+            return []
+        else:
+            return self.sdk.insights.get_insights(workspace_id)
+
+    def execute_insight(self, workspace_id: str, insight: Insight) -> None:
+        if self.dry_run:
+            self.logger.info("Dry run - skipping insights execution")
+        else:
+            self.sdk.tables.for_insight(workspace_id, insight)
+
+    def scan_data_source(self, data_source_id: str, scan_request: CatalogScanModelRequest) -> CatalogDeclarativeTables:
+        if self.dry_run:
+            self.logger.info("Dry run - skipping data source scanning")
+            return CatalogDeclarativeTables(
+                tables=[
+                    CatalogDeclarativeTable(
+                        id="dry_run",
+                        type="DATA_SOURCE_TABLE",
+                        path=["table"],
+                        columns=[CatalogDeclarativeColumn(name="dry_run", data_type="STRING")],
+                    )
+                ]
+            )
+        else:
+            return self.sdk.catalog_data_source.scan_data_source(data_source_id, scan_request, report_warnings=True).pdm
+
+    def put_declarative_ldm(self, workspace_id: str, declarative_ldm: CatalogDeclarativeModel) -> None:
+        if self.dry_run:
+            self.logger.info("Dry run - skipping declarative LDM put")
+        else:
+            self.sdk.catalog_workspace_content.put_declarative_ldm(workspace_id, declarative_ldm)
+
+    def create_workspace(self, workspace: CatalogWorkspace) -> None:
+        if self.dry_run:
+            self.logger.info("Dry run - skipping workspace creation")
+        else:
+            self.sdk.catalog_workspace.create_or_update(workspace=workspace)
+
+    def create_or_update_data_source(self, data_source: DataSource) -> None:
+        if self.dry_run:
+            self.logger.info("Dry run - skipping data source creation")
+        else:
+            self.sdk.catalog_data_source.create_or_update_data_source(data_source)
+
+    def register_upload_notification(self, data_source_id: str) -> None:
+        if self.dry_run:
+            self.logger.info("Dry run - skipping upload notification registration")
+        else:
+            self.sdk.catalog_data_source.register_upload_notification(data_source_id)
+
+    def put_declarative_analytics_model(self, workspace_id: str, adm: CatalogDeclarativeAnalytics) -> None:
+        if self.dry_run:
+            self.logger.info("Dry run - skipping declarative analytics model put")
+        else:
+            self.sdk.catalog_workspace_content.put_declarative_analytics_model(workspace_id, adm)
+
+    def generate_localized_workspaces(
+        self,
+        workspace_id: str,
+        to: GoodDataConfigLocalizationTo,
+        data_product: GoodDataConfigProduct,
+        translator_func: Any,
+        layout_path: Path,
+        provision_workspace: bool = True,
+        store_layouts: bool = False,
+    ) -> None:
+        from_language = "en"
+        if data_product.localization is not None:
+            from_language = data_product.localization.from_language
+        if self.dry_run:
+            self.logger.info("Dry run - skipping localized workspaces generation")
+        else:
+            self.sdk.catalog_workspace.generate_localized_workspaces(
+                workspace_id,
+                to_lang=to.language,
+                to_locale=to.locale,
+                from_lang=from_language,
+                translator_func=translator_func,
+                layout_root_path=layout_path,
+                provision_workspace=provision_workspace,
+                store_layouts=store_layouts,
+            )
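
A short usage sketch of the new facade in dry-run mode (the host, token, and data source/workspace ids below are placeholders):

import logging

from gooddata_dbt.gooddata.api_wrapper import GoodDataApiWrapper
from gooddata_sdk import CatalogScanModelRequest, GoodDataSdk

logger = logging.getLogger(__name__)
sdk = GoodDataSdk.create("https://example.gooddata.com", "token")

facade = GoodDataApiWrapper(sdk, logger, dry_run=True)
# Write methods only log; read methods return stub data instead of calling APIs.
scan_request = CatalogScanModelRequest(scan_tables=True, scan_views=True)
pdm = facade.scan_data_source("my_data_source", scan_request)
assert pdm.tables[0].columns[0].data_type == "STRING"
assert facade.get_insights("my_workspace") == []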
