Skip to content

Commit bd6cebd

Browse files
authored
Merge branch 'main' into dependabot/gradle/projects/control-service/projects/pipelines_control_service/com.nimbusds-nimbus-jose-jwt-9.39.1
2 parents 5a1d6a7 + 99f0161 commit bd6cebd

32 files changed

+902
-114
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Copyright 2024-2025 Broadcom
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
5+
class DataQualityException(Exception):
    """
    Raised when data fails the configured quality checks.

    Attributes:
        checked_object -- object that the quality checks are run against
        target_table -- DWH table where the target data is loaded
        source_view -- view from which the raw data is loaded
    """

    def __init__(self, checked_object, target_table, source_view):
        self.checked_object = checked_object
        self.source_view = source_view
        self.target_table = target_table
        # Multi-line, user-facing explanation: what failed, why, the
        # consequence, and what the operator should do about it.
        self.message = f"""What happened: Error occurred while performing quality checks.\n
Why it happened: Object: {checked_object} is not passing the quality checks.\n
Consequences: The source view data will not be processed to the target table - {target_table}.\n
Countermeasures: Check the source view: {source_view} what data is trying to be processed."""
        super().__init__(self.message)

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1/03-drop-backup-target.sql renamed to projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1/02-drop-backup-target.sql

File renamed without changes.

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1/02-drop-tmp-target.sql renamed to projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1/03-drop-tmp-target.sql

File renamed without changes.
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
# Copyright 2023-2024 Broadcom
2+
# SPDX-License-Identifier: Apache-2.0
3+
import logging
4+
import os
5+
6+
from vdk.api.job_input import IJobInput
7+
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
8+
from vdk.plugin.trino.trino_utils import CommonUtilities
9+
from vdk.plugin.trino.trino_utils import TrinoTemplateQueries
10+
11+
log = logging.getLogger(__name__)
12+
13+
SQL_FILES_FOLDER = (
14+
os.path.dirname(os.path.abspath(__file__)) + "/06-requisite-sql-scripts"
15+
)
16+
17+
"""
18+
This step is intended to handle quality checks, if such are provided,
19+
and stop the data from being populated into the target table if the check has negative outcome.
20+
Otherwise the data will be directly processed according to the used template type
21+
"""
22+
23+
24+
def run(job_input: IJobInput):
    """
    Move data from the tmp target table into the target table, optionally
    gated by a caller-provided data quality check.

    Expected job arguments:
        check          -- optional callable; given a fully qualified staging
                          table name, returns truthy when the data passes
        source_schema  -- schema the source view lives in
        source_view    -- view the raw data was loaded from
        target_schema  -- schema of the target table
        target_table   -- target table name
        staging_schema -- optional schema for the staging table
                          (defaults to target_schema)

    Flow:
    1. (earlier template steps have already filled tmp_target with the
       source view data)
    2. if a check is provided:
       - create a staging table shaped like the target table
       - move the tmp_target data into the staging table
       - run the check against the staging table
       - if it passes: drop the backup table, then move the staging data
         into the target table
       - otherwise raise DataQualityException
    3. if no check is provided:
       - if tmp_target has data, move it into the target table
       - otherwise leave the target unchanged and drop the empty tmp table

    Raises:
        DataQualityException -- when the quality check rejects the staged data.
    """
    job_arguments = job_input.get_arguments()

    check = job_arguments.get("check")
    source_schema = job_arguments.get("source_schema")
    source_view = job_arguments.get("source_view")
    target_schema = job_arguments.get("target_schema")
    target_table = job_arguments.get("target_table")
    trino_queries = TrinoTemplateQueries(job_input)
    drop_table_query = CommonUtilities.get_file_content(
        SQL_FILES_FOLDER, "06-drop-table.sql"
    )
    create_table_query = CommonUtilities.get_file_content(
        SQL_FILES_FOLDER, "06-create-table.sql"
    )

    backup_target_table = f"backup_{target_table}"
    tmp_target_table = f"tmp_{target_table}"

    if check:
        staging_schema = job_arguments.get("staging_schema", target_schema)
        staging_table_name = CommonUtilities.get_staging_table_name(
            target_schema, target_table
        )
        staging_table = f"{staging_schema}.{staging_table_name}"
        target_table_full_name = f"{target_schema}.{target_table}"

        # Create the staging table with the same column layout as the target.
        create_staging_table = create_table_query.format(
            table_schema=staging_schema,
            table_name=staging_table_name,
            target_schema=target_schema,
            target_table=target_table,
        )
        job_input.execute_query(create_staging_table)

        # Move the freshly loaded data from tmp_target into staging.
        trino_queries.perform_safe_move_data_to_table_step(
            from_db=target_schema,
            from_table_name=tmp_target_table,
            to_db=staging_schema,
            to_table_name=staging_table_name,
        )

        if check(staging_table):
            # NOTE(review): the result of this metadata query is unused; it
            # appears to only confirm the staging table is visible — verify
            # whether it can be removed.
            job_input.execute_query(
                f"SELECT * FROM information_schema.tables WHERE "
                f"table_schema = '{staging_schema}' AND table_name = '{staging_table_name}'"
            )

            # Drop the backup target if it already exists, so the move step
            # below can create a fresh backup.
            drop_backup_target = drop_table_query.format(
                target_schema=target_schema, target_table=backup_target_table
            )
            job_input.execute_query(drop_backup_target)

            # Move the validated data from staging into the target table.
            trino_queries.perform_safe_move_data_to_table_step(
                from_db=staging_schema,
                from_table_name=staging_table_name,
                to_db=target_schema,
                to_table_name=target_table,
            )
        else:
            raise DataQualityException(
                checked_object=staging_table,
                source_view=f"{source_schema}.{source_view}",
                target_table=target_table_full_name,
            )
    else:
        log.debug("Check if tmp target has data.")
        res = job_input.execute_query(
            f"""
            SELECT COUNT(*) FROM {target_schema}.{tmp_target_table}
            """
        )
        if res and res[0][0] > 0:
            log.debug(
                "Confirmed that tmp target has data, proceed with moving it to target."
            )
            trino_queries.perform_safe_move_data_to_table_step(
                from_db=target_schema,
                from_table_name=tmp_target_table,
                to_db=target_schema,
                to_table_name=target_table,
            )
        else:
            # BUGFIX: the source view lives in source_schema, not
            # target_schema — the previous message logged the wrong schema.
            log.info(
                f"Target table {target_schema}.{target_table} remains unchanged "
                f"because source view {source_schema}.{source_view} was empty."
            )
            trino_queries.drop_table(target_schema, tmp_target_table)

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1/06-move-data-from-tmp-to-target.py

Lines changed: 0 additions & 58 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- Create an empty table whose column layout mirrors the target table
-- (LIKE copies the column definitions, not the data).
-- {placeholders} are filled in by the Python template step via str.format.
CREATE TABLE "{table_schema}"."{table_name}"(
    LIKE "{target_schema}"."{target_table}"
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- Drop the given table if present (used to clear backup/tmp tables).
-- {placeholders} are filled in by the Python template step via str.format.
DROP TABLE IF EXISTS "{target_schema}"."{target_table}"

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ In summary, it overwrites the target table with the source data.
1616
- target_table - database table of type 'Slowly Changing Dimension Type 1', where target data is loaded
1717
- source_schema - database schema, where source raw data is loaded from
1818
- source_view - database view, where source raw data is loaded from
19+
- check - (Optional) Callback function responsible for checking the quality of the data. Takes a table name as a parameter, which will be used for data validation
- staging_schema - (Optional) Schema where the checks will be executed. If not provided, target_schema will be used by default
1921

2022
### Prerequisites:
2123

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Copyright 2023-2024 Broadcom
2+
# SPDX-License-Identifier: Apache-2.0
3+
import logging
4+
5+
from vdk.api.job_input import IJobInput
6+
from vdk.plugin.trino.trino_utils import TrinoTemplateQueries
7+
8+
log = logging.getLogger(__name__)
9+
10+
11+
def run(job_input: IJobInput):
    """
    Restore a potentially missing target table from its backup.

    The template's final step moves tmp_target_table contents into the
    target table; if the job dies in the middle of that write, the target
    table may be gone while its backup still exists. When the job is
    retried, this first step recovers the target from the backup (assuming
    the cause of the earlier failure is no longer present).
    """
    job_args = job_input.get_arguments()
    schema = job_args.get("target_schema")
    table = job_args.get("target_table")

    # Delegate the existence check + restore-from-backup to the shared helper.
    trino_queries = TrinoTemplateQueries(job_input)
    trino_queries.ensure_target_exists_step(db=schema, target_name=table)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- Schema-compatibility probe: each branch returns zero rows (LIMIT 0),
-- but the UNION ALL fails unless the source view and target table have
-- union-compatible column counts and types.
(SELECT * FROM "{source_schema}"."{source_view}" LIMIT 0)
UNION ALL
(SELECT * FROM "{target_schema}"."{target_table}" LIMIT 0)

0 commit comments

Comments
 (0)