Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion discoverx/dx.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def __init__(

def _can_read_columns_table(self) -> bool:
try:
self.spark.sql(f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' LIMIT 1")
self.spark.sql(
f"SELECT * FROM {self.INFORMATION_SCHEMA}.columns WHERE table_catalog = 'system' AND table_schema = 'columns' LIMIT 1"
)
return True
except Exception as e:
self.logger.error(f"Error while reading table {self.INFORMATION_SCHEMA}.columns: {e}")
Expand Down
24 changes: 21 additions & 3 deletions discoverx/explorer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def __init__(self, from_tables, spark: SparkSession, info_fetcher: InfoFetcher)
self._sql_query_template = None
self._max_concurrency = 10
self._with_tags = False
self._having_tags = []

@staticmethod
def validate_from_components(from_tables: str):
Expand Down Expand Up @@ -70,6 +71,19 @@ def having_columns(self, *columns) -> "DataExplorer":
new_obj._having_columns.extend(columns)
return new_obj

def having_tag(self, tag_name: str, tag_value: str = None) -> "DataExplorer":
    """Will select tables tagged with the provided tag name and optionally value
    either at table, schema, or catalog level.

    Args:
        tag_name (str): Tag name
        tag_value (str, optional): Tag value. Defaults to None.

    Returns:
        DataExplorer: A deep copy of this explorer with the tag filter added.
            The explorer the method was called on is not modified.
    """
    new_obj = copy.deepcopy(self)
    # append, not extend: extend() would try to iterate over the TagInfo
    # object itself rather than add it to the list as a single filter entry.
    new_obj._having_tags.append(TagInfo(tag_name, tag_value))
    # Turn on tag retrieval so the fetched table info carries tags to filter on.
    new_obj._with_tags = True
    return new_obj

def with_concurrency(self, max_concurrency) -> "DataExplorer":
"""Sets the maximum number of concurrent queries to run"""
new_obj = copy.deepcopy(self)
Expand Down Expand Up @@ -140,7 +154,9 @@ def scan(
self._catalogs,
self._schemas,
self._tables,
self._info_fetcher.get_tables_info(self._catalogs, self._schemas, self._tables, self._having_columns),
self._info_fetcher.get_tables_info(
self._catalogs, self._schemas, self._tables, self._having_columns, self._having_tags
),
custom_rules=custom_rules,
locale=locale,
)
Expand All @@ -163,6 +179,7 @@ def map(self, f) -> list[any]:
self._tables,
self._having_columns,
self._with_tags,
self._having_tags,
)
with concurrent.futures.ThreadPoolExecutor(max_workers=self._max_concurrency) as executor:
# Submit tasks to the thread pool
Expand Down Expand Up @@ -203,9 +220,9 @@ def _get_stack_string_columns_expression(table_info: TableInfo) -> str:
@staticmethod
def _build_sql(sql_template: str, table_info: TableInfo) -> str:
if table_info.catalog and table_info.catalog != "None":
full_table_name = f"{table_info.catalog}.{table_info.schema}.{table_info.table}"
full_table_name = f"`{table_info.catalog}`.`{table_info.schema}`.`{table_info.table}`"
else:
full_table_name = f"{table_info.schema}.{table_info.table}"
full_table_name = f"`{table_info.schema}`.`{table_info.table}`"

stack_string_columns = DataExplorerActions._get_stack_string_columns_expression(table_info)

Expand Down Expand Up @@ -244,6 +261,7 @@ def _get_sql_commands(self, data_explorer: DataExplorer) -> list[tuple[str, Tabl
data_explorer._tables,
data_explorer._having_columns,
data_explorer._with_tags,
data_explorer._having_tags,
)
sql_commands = [
(
Expand Down
26 changes: 24 additions & 2 deletions discoverx/table_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def get_tables_info(
schemas: str,
tables: str,
columns: list[str] = [],
with_tags=False,
with_tags: bool = False,
having_tags: list[TagInfo] = [],
) -> list[TableInfo]:
# Filter tables by matching filter
table_list_sql = self._get_table_list_sql(catalogs, schemas, tables, columns, with_tags)
Expand All @@ -120,7 +121,28 @@ def get_tables_info(
if len(filtered_tables) == 0:
raise ValueError(f"No tables found matching filter: {catalogs}.{schemas}.{tables}")

return self._to_info_list(filtered_tables)
info_list = self._to_info_list(filtered_tables)
return [info for info in info_list if InfoFetcher._contains_all_tags(info.tags, having_tags)]

@staticmethod
def _contains_all_tags(tags_info: TagsInfo, tags: list[TagInfo]) -> bool:
    """Tell whether every tag in ``tags`` is present in ``tags_info``.

    A tag is considered present when it compares equal to one of the
    catalog-, schema- or table-level tags held by ``tags_info``.

    Args:
        tags_info: Tags attached to one table. May be falsy (e.g. None) when
            no tag information is available.
        tags: The tags that must all be present.

    Returns:
        True when ``tags`` is empty or every requested tag is found;
        False when tags are requested but ``tags_info`` is falsy or at least
        one requested tag is missing.
    """
    # Nothing requested: every table qualifies.
    if not tags:
        return True
    # Tags requested but none available for this table.
    if not tags_info:
        return False

    # Pool the tags from all three levels; a falsy level contributes nothing.
    available = [
        *(tags_info.catalog_tags or []),
        *(tags_info.schema_tags or []),
        *(tags_info.table_tags or []),
    ]

    for wanted in tags:
        if wanted not in available:
            return False
    return True

def _get_table_list_sql(
self,
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/explorer_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def test_validate_from_components():

def test_build_sql(sample_table_info):
    """_build_sql substitutes the backtick-quoted, fully qualified table name."""
    sql_template = "SELECT * FROM {full_table_name}"
    # Each identifier part is quoted with backticks by _build_sql.
    expected_sql = "SELECT * FROM `catalog1`.`schema1`.`table1`"
    assert DataExplorerActions._build_sql(sql_template, sample_table_info) == expected_sql


Expand Down
23 changes: 23 additions & 0 deletions tests/unit/table_info_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import pytest
from discoverx.explorer import InfoFetcher, TagsInfo, TagInfo


def test_validate_from_components():
    """Exercise ``InfoFetcher._contains_all_tags`` with tags in each TagsInfo slot."""
    # NOTE(review): the function name looks copied from another test module;
    # what is verified here is _contains_all_tags.
    has_all = InfoFetcher._contains_all_tags

    # One fixture per TagsInfo slot holding the tag a=v1, plus one with no tags.
    tagged_table = TagsInfo([], [TagInfo("a", "v1")], [], [])
    tagged_schema = TagsInfo([], [], [TagInfo("a", "v1")], [])
    tagged_catalog = TagsInfo([], [], [], [TagInfo("a", "v1")])
    untagged = TagsInfo([], [], [], [])

    # Same name and value: match.
    assert has_all(tagged_table, [TagInfo("a", "v1")])
    # Different value, different name, or a None value: no match.
    for absent in (TagInfo("a", "v2"), TagInfo("b", "v1"), TagInfo("a", None)):
        assert not has_all(tagged_table, [absent])
    # If no tags to check, then it should be true
    assert has_all(tagged_table, [])

    # The tag is also found when it sits in the schema or catalog slot.
    for info in (tagged_schema, tagged_catalog):
        assert has_all(info, [TagInfo("a", "v1")])

    assert has_all(untagged, [])
    assert not has_all(untagged, [TagInfo("a", "v1")])