diff --git a/CHANGELOG.md b/CHANGELOG.md
index d5743e07..eaaf9556 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,10 +5,11 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
-## [Unreleased]
+## [6.0.1] - 2025-12-25
 ### Fixed
 - Changed extra's name back to `aio`
+- Fixed handling of datetime columns in old pandas versions.
 - Fixed encoding error in `ingest_from_dataframe` when using csv data format.
 
 ## [6.0.0] - 2025-11-26
diff --git a/azure-kusto-data/azure/kusto/data/helpers.py b/azure-kusto-data/azure/kusto/data/helpers.py
index 7df38df3..d09613aa 100644
--- a/azure-kusto-data/azure/kusto/data/helpers.py
+++ b/azure-kusto-data/azure/kusto/data/helpers.py
@@ -1,17 +1,17 @@
 import json
 from functools import lru_cache
 from pathlib import Path
-from typing import TYPE_CHECKING, Union, Callable, Dict, Optional
+from typing import TYPE_CHECKING, Any, Union, Callable, Optional
 
 if TYPE_CHECKING:
     import pandas as pd
     from azure.kusto.data._models import KustoResultTable, KustoStreamingResultTable
 
 # Alias for dataframe_from_result_table converter type
-Converter = Dict[str, Union[str, Callable[[str, "pd.DataFrame"], "pd.Series"]]]
+Converter = dict[str, Union[str, Callable[[str, "pd.DataFrame"], "pd.Series"]]]
 
 
-def load_bundled_json(file_name: str) -> Dict:
+def load_bundled_json(file_name: str) -> dict[Any, Any]:
     filename = Path(__file__).absolute().parent.joinpath(file_name)
     with filename.open("r", encoding="utf-8") as data:
         return json.load(data)
@@ -113,23 +113,24 @@ def parse_float(frame, col):
     import numpy as np
     import pandas as pd
 
-    frame[col] = frame[col].replace("NaN", np.nan).replace("Infinity", np.inf).replace("-Infinity", -np.inf)
+    frame[col] = frame[col].infer_objects(copy=False).replace({"NaN": np.nan, "Infinity": np.inf, "-Infinity": -np.inf})
     frame[col] = pd.to_numeric(frame[col], errors="coerce").astype(pd.Float64Dtype())  # pyright: ignore[reportCallIssue,reportArgumentType]
+    return frame[col]
 
 
-def parse_datetime(frame, col):
+def parse_datetime(frame, col, force_version: Optional[str] = None) -> "pd.Series":
     # Pandas before version 2 doesn't support the "format" arg
     import pandas as pd
 
     args = {}
-    if pd.__version__.startswith("2."):
+    if (force_version or pd.__version__).startswith("2."):
         args = {"format": "ISO8601", "utc": True}
     else:
         # if frame contains ".", replace "Z" with ".000Z"
-        # == False is not a mistake - that's the pandas way to do it
-        contains_dot = frame[col].str.contains(".")
-        frame.loc[not contains_dot, col] = frame.loc[not contains_dot, col].str.replace("Z", ".000Z")
+        # Using bitwise NOT (~) on the boolean Series is the idiomatic pandas way to negate the mask
+        contains_dot = frame[col].str.contains("\\.")
+        frame.loc[~contains_dot, col] = frame.loc[~contains_dot, col].str.replace("Z", ".000Z")
     frame[col] = pd.to_datetime(frame[col], errors="coerce", **args)
     return frame[col]
diff --git a/azure-kusto-data/tests/test_helpers.py b/azure-kusto-data/tests/test_helpers.py
index 4bfbee90..b9cf4bae 100644
--- a/azure-kusto-data/tests/test_helpers.py
+++ b/azure-kusto-data/tests/test_helpers.py
@@ -7,7 +7,7 @@
 import pytest
 
 from azure.kusto.data._models import KustoResultTable
-from azure.kusto.data.helpers import dataframe_from_result_table
+from azure.kusto.data.helpers import dataframe_from_result_table, parse_datetime
 from azure.kusto.data.response import KustoResponseDataSetV2
 import pandas
 import numpy
@@ -128,3 +128,31 @@ def test_pandas_mixed_date():
     assert df["Date"][0] == pandas.Timestamp(year=2023, month=12, day=12, hour=1, minute=59, second=59, microsecond=352000, tzinfo=datetime.timezone.utc)
     assert df["Date"][1] == pandas.Timestamp(year=2023, month=12, day=12, hour=1, minute=54, second=44, tzinfo=datetime.timezone.utc)
+
+
+def test_datetime_parsing():
+    """Test parse_datetime function with different pandas versions and datetime formats"""
+
+    # Test with pandas v2 behavior (force version 2)
+    df_v2 = pandas.DataFrame(
+        {
+            "mixed": ["2023-12-12T01:59:59.352Z", "2023-12-12T01:54:44Z"],
+        }
+    )
+
+    # Force pandas v2 behavior
+    result_v2 = parse_datetime(df_v2, "mixed", force_version="2.0.0")
+    assert str(result_v2[0]) == "2023-12-12 01:59:59.352000+00:00"
+    assert str(result_v2[1]) == "2023-12-12 01:54:44+00:00"
+
+    # Test with pandas v1 behavior (force version 1)
+    df_v1 = pandas.DataFrame(
+        {
+            "mixed": ["2023-12-12T01:59:59.352Z", "2023-12-12T01:54:44Z"],
+        }
+    )
+
+    # Force pandas v1 behavior - it should add .000 to dates without milliseconds
+    result_v1 = parse_datetime(df_v1, "mixed", force_version="1.5.3")
+    assert str(result_v1[0]) == "2023-12-12 01:59:59.352000+00:00"
+    assert str(result_v1[1]) == "2023-12-12 01:54:44+00:00"
diff --git a/azure-kusto-ingest/tests/test_e2e_ingest.py b/azure-kusto-ingest/tests/test_e2e_ingest.py
index 6c84276d..72fcd469 100644
--- a/azure-kusto-ingest/tests/test_e2e_ingest.py
+++ b/azure-kusto-ingest/tests/test_e2e_ingest.py
@@ -201,377 +201,453 @@ def setup_class(cls):
         cls.json_file_path = os.path.join(cls.input_folder_path, "dataset.json")
         cls.zipped_json_file_path = os.path.join(cls.input_folder_path, "dataset.jsonz.gz")
 
-        cls.current_count = 0
-
-        cls.client.execute(
-            cls.test_db,
-            f".create table {cls.test_table} (rownumber: int, rowguid: string, xdouble: real, xfloat: real, xbool: bool, xint16: int, xint32: int, xint64: long, xuint8: long, xuint16: long, xuint32: long, xuint64: long, xdate: datetime, xsmalltext: string, xtext: string, xnumberAsText: string, xtime: timespan, xtextWithNulls: string, xdynamicWithNulls: dynamic)",
-        )
-        cls.client.execute(cls.test_db, f".create table {cls.test_table} ingestion json mapping 'JsonMapping' {cls.table_json_mapping_reference()}")
-
-        cls.client.execute(cls.test_db, f".alter table {cls.test_table} policy streamingingestion enable ")
-
-        # Clear the cache to guarantee that subsequent streaming ingestion requests incorporate database and table schema changes
-        # See https://docs.microsoft.com/azure/data-explorer/kusto/management/data-ingestion/clear-schema-cache-command
-        cls.client.execute(cls.test_db, ".clear database cache streamingingestion schema")
-
     @classmethod
     def teardown_class(cls):
-        cls.client.execute(cls.test_db, ".drop table {} ifexists".format(cls.test_table))
         cls.client.close()
         cls.ingest_client.close()
         cls.streaming_ingest_client.close()
         cls.managed_streaming_ingest_client.close()
 
+    @classmethod
+    async def init_table(cls, test_name: str) -> str:
+        table_name = f"{cls.test_table}_{test_name}_{str(int(time.time()))}_{random.randint(1, 100000)}"
+        async with await cls.get_async_client() as async_client:
+            await async_client.execute(
+                cls.test_db,
+                f".create table {table_name} (rownumber: int, rowguid: string, xdouble: real, xfloat: real, xbool: bool, xint16: int, xint32: int, xint64: long, xuint8: long, xuint16: long, xuint32: long, xuint64: long, xdate: datetime, xsmalltext: string, xtext: string, xnumberAsText: string, xtime: 
timespan, xtextWithNulls: string, xdynamicWithNulls: dynamic)", + ) + await async_client.execute(cls.test_db, f".create table {table_name} ingestion json mapping 'JsonMapping' {cls.table_json_mapping_reference()}") + + await async_client.execute(cls.test_db, f".alter table {table_name} policy streamingingestion enable ") + + # Clear the cache to guarantee that subsequent streaming ingestion requests incorporate database and table schema changes + # See https://docs.microsoft.com/azure/data-explorer/kusto/management/data-ingestion/clear-schema-cache-command + await async_client.execute(cls.test_db, ".clear database cache streamingingestion schema") + return table_name + + @classmethod + async def drop_table(cls, table_name: str): + async with await cls.get_async_client() as async_client: + await async_client.execute(cls.test_db, f".drop table {table_name} ifexists") + @classmethod async def get_async_client(cls) -> AsyncKustoClient: return AsyncKustoClient(cls.engine_kcsb_from_env(is_async=True)) # assertions @classmethod - async def assert_rows_added(cls, expected: int, timeout=60): + async def assert_rows_added(cls, table_name: str, expected: int, timeout: int = 120): actual = 0 while timeout > 0: time.sleep(1) timeout -= 1 try: - command = "{} | count".format(cls.test_table) - response = cls.client.execute(cls.test_db, command) - async_client = await cls.get_async_client() - response_from_async = await async_client.execute(cls.test_db, command) + async with await cls.get_async_client() as async_client: + command = "{} | count".format(table_name) + response = cls.client.execute(cls.test_db, command) + response_from_async = await async_client.execute(cls.test_db, command) except KustoServiceError: continue if response is not None: row = response.primary_results[0][0] row_async = response_from_async.primary_results[0][0] - actual = int(row["Count"]) - cls.current_count + actual = int(row["Count"]) + actual_async = int(row_async["Count"]) # this is done to allow for data to arrive properly - if actual >= expected: - assert row_async == row, "Mismatch answers between async('{0}') and sync('{1}') clients".format(row_async, row) + if actual >= expected and actual_async >= expected: break - - cls.current_count += actual assert actual == expected, "Row count expected = {0}, while actual row count = {1}".format(expected, actual) @pytest.mark.asyncio async def test_csv_ingest_existing_table(self, is_managed_streaming): - csv_ingest_props = IngestionProperties( - self.test_db, - self.test_table, - data_format=DataFormat.CSV, - column_mappings=self.get_test_table_csv_mappings(), - report_level=ReportLevel.FailuresAndSuccesses, - flush_immediately=True, - ) + table = await self.init_table("csv_ingest_existing_table") + try: + csv_ingest_props = IngestionProperties( + self.test_db, + table, + data_format=DataFormat.CSV, + column_mappings=self.get_test_table_csv_mappings(), + report_level=ReportLevel.FailuresAndSuccesses, + flush_immediately=True, + ) - client = self.streaming_ingest_client if is_managed_streaming else self.ingest_client + client = self.streaming_ingest_client if is_managed_streaming else self.ingest_client - for f in [self.csv_file_path, self.zipped_csv_file_path]: - client.ingest_from_file(f, csv_ingest_props) + for f in [self.csv_file_path, self.zipped_csv_file_path]: + client.ingest_from_file(f, csv_ingest_props) - await self.assert_rows_added(20) + await self.assert_rows_added(table, 20) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def 
test_csv_ingest_ignore_first_record(self, is_managed_streaming): - csv_ingest_props = IngestionProperties( - self.test_db, - self.test_table, - data_format=DataFormat.CSV, - column_mappings=self.get_test_table_csv_mappings(), - report_level=ReportLevel.FailuresAndSuccesses, - flush_immediately=True, - ignore_first_record=True, - ) + table = await self.init_table("csv_ingest_ignore_first_record" + ("_managed" if is_managed_streaming else "")) + try: + csv_ingest_props = IngestionProperties( + self.test_db, + table, + data_format=DataFormat.CSV, + column_mappings=self.get_test_table_csv_mappings(), + report_level=ReportLevel.FailuresAndSuccesses, + flush_immediately=True, + ignore_first_record=True, + ) - for f in [self.csv_file_path, self.zipped_csv_file_path]: - self.ingest_client.ingest_from_file(f, csv_ingest_props) + client = self.streaming_ingest_client if is_managed_streaming else self.ingest_client - await self.assert_rows_added(18) + for f in [self.csv_file_path, self.zipped_csv_file_path]: + client.ingest_from_file(f, csv_ingest_props) + + if is_managed_streaming: + await self.assert_rows_added(table, 20) + else: + await self.assert_rows_added(table, 18) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_json_ingest_existing_table(self): - json_ingestion_props = IngestionProperties( - self.test_db, - self.test_table, - flush_immediately=True, - data_format=DataFormat.JSON, - column_mappings=self.table_json_mappings(), - report_level=ReportLevel.FailuresAndSuccesses, - ) + table = await self.init_table("json_ingest_existing_table") + try: + json_ingestion_props = IngestionProperties( + self.test_db, + table, + flush_immediately=True, + data_format=DataFormat.JSON, + column_mappings=self.table_json_mappings(), + report_level=ReportLevel.FailuresAndSuccesses, + ) - for f in [self.json_file_path, self.zipped_json_file_path]: - self.ingest_client.ingest_from_file(f, json_ingestion_props) + for f in [self.json_file_path, self.zipped_json_file_path]: + self.ingest_client.ingest_from_file(f, json_ingestion_props) - await self.assert_rows_added(4) + await self.assert_rows_added(table, 4) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_json_ingest_existing_table_no_mapping(self): - json_ingestion_props = IngestionProperties( - self.test_db, - self.test_table, - flush_immediately=True, - data_format=DataFormat.JSON, - report_level=ReportLevel.FailuresAndSuccesses, - ) + table = await self.init_table("json_ingest_existing_table_no_mapping") + try: + json_ingestion_props = IngestionProperties( + self.test_db, + table, + flush_immediately=True, + data_format=DataFormat.JSON, + report_level=ReportLevel.FailuresAndSuccesses, + ) - for f in [self.json_file_path, self.zipped_json_file_path]: - self.ingest_client.ingest_from_file(f, json_ingestion_props) + for f in [self.json_file_path, self.zipped_json_file_path]: + self.ingest_client.ingest_from_file(f, json_ingestion_props) - await self.assert_rows_added(4) + await self.assert_rows_added(table, 4) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_ingest_complicated_props(self): - validation_policy = ValidationPolicy( - validation_options=ValidationOptions.ValidateCsvInputConstantColumns, validation_implications=ValidationImplications.Fail - ) - json_ingestion_props = IngestionProperties( - self.test_db, - self.test_table, - data_format=DataFormat.JSON, - column_mappings=self.table_json_mappings(), - additional_tags=["a", "b"], - ingest_if_not_exists=["aaaa", 
"bbbb"], - ingest_by_tags=["ingestByTag"], - drop_by_tags=["drop", "drop-by"], - flush_immediately=False, - report_level=ReportLevel.FailuresAndSuccesses, - report_method=ReportMethod.Queue, - validation_policy=validation_policy, - ) + table = await self.init_table("ingest_complicated_props") + try: + validation_policy = ValidationPolicy( + validation_options=ValidationOptions.ValidateCsvInputConstantColumns, + validation_implications=ValidationImplications.Fail, + ) + json_ingestion_props = IngestionProperties( + self.test_db, + table, + data_format=DataFormat.JSON, + column_mappings=self.table_json_mappings(), + additional_tags=["a", "b"], + ingest_if_not_exists=["aaaa", "bbbb"], + ingest_by_tags=["ingestByTag"], + drop_by_tags=["drop", "drop-by"], + flush_immediately=False, + report_level=ReportLevel.FailuresAndSuccesses, + report_method=ReportMethod.Queue, + validation_policy=validation_policy, + ) - file_paths = [self.json_file_path, self.zipped_json_file_path] - fds = [FileDescriptor(fp, 0, uuid.uuid4()) for fp in file_paths] + file_paths = [self.json_file_path, self.zipped_json_file_path] + fds = [FileDescriptor(fp, 0, uuid.uuid4()) for fp in file_paths] - for fd in fds: - self.ingest_client.ingest_from_file(fd, json_ingestion_props) + for fd in fds: + self.ingest_client.ingest_from_file(fd, json_ingestion_props) - await self.assert_rows_added(4) + await self.assert_rows_added(table, 4) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_ingest_from_stream(self, is_managed_streaming): - validation_policy = ValidationPolicy( - validation_options=ValidationOptions.ValidateCsvInputConstantColumns, validation_implications=ValidationImplications.Fail - ) - json_ingestion_props = IngestionProperties( - self.test_db, - self.test_table, - data_format=DataFormat.JSON, - column_mappings=self.table_json_mappings(), - additional_tags=["a", "b"], - ingest_if_not_exists=["aaaa", "bbbb"], - ingest_by_tags=["ingestByTag"], - drop_by_tags=["drop", "drop-by"], - flush_immediately=False, - report_level=ReportLevel.FailuresAndSuccesses, - report_method=ReportMethod.Queue, - validation_policy=validation_policy, - ) - text = io.StringIO(pathlib.Path(self.json_file_path).read_text()) - zipped = io.BytesIO(pathlib.Path(self.zipped_json_file_path).read_bytes()) + table = await self.init_table("ingest_from_stream" + ("_managed" if is_managed_streaming else "")) + try: + validation_policy = ValidationPolicy( + validation_options=ValidationOptions.ValidateCsvInputConstantColumns, + validation_implications=ValidationImplications.Fail, + ) + json_ingestion_props = IngestionProperties( + self.test_db, + table, + data_format=DataFormat.JSON, + column_mappings=self.table_json_mappings(), + additional_tags=["a", "b"], + ingest_if_not_exists=["aaaa", "bbbb"], + ingest_by_tags=["ingestByTag"], + drop_by_tags=["drop", "drop-by"], + flush_immediately=False, + report_level=ReportLevel.FailuresAndSuccesses, + report_method=ReportMethod.Queue, + validation_policy=validation_policy, + ) + text = io.StringIO(pathlib.Path(self.json_file_path).read_text()) + zipped = io.BytesIO(pathlib.Path(self.zipped_json_file_path).read_bytes()) - client = self.managed_streaming_ingest_client if is_managed_streaming else self.ingest_client + client = self.managed_streaming_ingest_client if is_managed_streaming else self.ingest_client - client.ingest_from_stream(text, json_ingestion_props) - client.ingest_from_stream(StreamDescriptor(zipped, is_compressed=True), json_ingestion_props) + 
client.ingest_from_stream(text, json_ingestion_props) + client.ingest_from_stream(StreamDescriptor(zipped, is_compressed=True), json_ingestion_props) - await self.assert_rows_added(4) + await self.assert_rows_added(table, 4) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_json_ingestion_ingest_by_tag(self): - json_ingestion_props = IngestionProperties( - self.test_db, - self.test_table, - data_format=DataFormat.JSON, - column_mappings=self.table_json_mappings(), - ingest_if_not_exists=["ingestByTag"], - report_level=ReportLevel.FailuresAndSuccesses, - drop_by_tags=["drop", "drop-by"], - flush_immediately=True, - ) + table = await self.init_table("json_ingestion_ingest_by_tag") + try: + json_ingestion_props = IngestionProperties( + self.test_db, + table, + data_format=DataFormat.JSON, + column_mappings=self.table_json_mappings(), + ingest_if_not_exists=["ingestByTag"], + report_level=ReportLevel.FailuresAndSuccesses, + drop_by_tags=["drop", "drop-by"], + flush_immediately=True, + ) - for f in [self.json_file_path, self.zipped_json_file_path]: - self.ingest_client.ingest_from_file(f, json_ingestion_props) + for f in [self.json_file_path, self.zipped_json_file_path]: + self.ingest_client.ingest_from_file(f, json_ingestion_props) - await self.assert_rows_added(0) + await self.assert_rows_added(table, 0) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_tsv_ingestion_csv_mapping(self): - tsv_ingestion_props = IngestionProperties( - self.test_db, - self.test_table, - flush_immediately=True, - data_format=DataFormat.TSV, - column_mappings=self.get_test_table_csv_mappings(), - report_level=ReportLevel.FailuresAndSuccesses, - ) + table = await self.init_table("tsv_ingestion_csv_mapping") + try: + tsv_ingestion_props = IngestionProperties( + self.test_db, + table, + flush_immediately=True, + data_format=DataFormat.TSV, + column_mappings=self.get_test_table_csv_mappings(), + report_level=ReportLevel.FailuresAndSuccesses, + ) - self.ingest_client.ingest_from_file(self.tsv_file_path, tsv_ingestion_props) + self.ingest_client.ingest_from_file(self.tsv_file_path, tsv_ingestion_props) - await self.assert_rows_added(10) + await self.assert_rows_added(table, 10) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_ingest_blob(self): if not self.test_blob: pytest.skip("Provide blob SAS uri with 'dataset.csv'") - csv_ingest_props = IngestionProperties( - self.test_db, - self.test_table, - data_format=DataFormat.CSV, - column_mappings=self.get_test_table_csv_mappings(), - report_level=ReportLevel.FailuresAndSuccesses, - flush_immediately=True, - ) + table = await self.init_table("ingest_blob") + try: + csv_ingest_props = IngestionProperties( + self.test_db, + table, + data_format=DataFormat.CSV, + column_mappings=self.get_test_table_csv_mappings(), + report_level=ReportLevel.FailuresAndSuccesses, + flush_immediately=True, + ) - blob_len = 1578 - self.ingest_client.ingest_from_blob(BlobDescriptor(self.test_blob, blob_len), csv_ingest_props) + blob_len = 1578 + self.ingest_client.ingest_from_blob(BlobDescriptor(self.test_blob, blob_len), csv_ingest_props) - await self.assert_rows_added(10) + await self.assert_rows_added(table, 10) - # Don't provide size hint - self.ingest_client.ingest_from_blob(BlobDescriptor(self.test_blob, size=None), csv_ingest_props) + # Don't provide size hint + self.ingest_client.ingest_from_blob(BlobDescriptor(self.test_blob, size=None), csv_ingest_props) - await self.assert_rows_added(10) + await 
self.assert_rows_added(table, 10) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_streaming_ingest_from_opened_file(self, is_managed_streaming): - ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, data_format=DataFormat.CSV) + table = await self.init_table("streaming_ingest_from_opened_file" + ("_managed" if is_managed_streaming else "")) + try: + ingestion_properties = IngestionProperties(database=self.test_db, table=table, data_format=DataFormat.CSV) - client = self.managed_streaming_ingest_client if is_managed_streaming else self.streaming_ingest_client - with open(self.csv_file_path, "r") as stream: - client.ingest_from_stream(stream, ingestion_properties=ingestion_properties) + client = self.managed_streaming_ingest_client if is_managed_streaming else self.streaming_ingest_client + with open(self.csv_file_path, "r") as stream: + client.ingest_from_stream(stream, ingestion_properties=ingestion_properties) - await self.assert_rows_added(10, timeout=120) + await self.assert_rows_added(table, 10, timeout=120) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_streaming_ingest_from_csv_file(self): - ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, flush_immediately=True, data_format=DataFormat.CSV) + table = await self.init_table("streaming_ingest_from_csv_file") + try: + ingestion_properties = IngestionProperties(database=self.test_db, table=table, flush_immediately=True, data_format=DataFormat.CSV) - for f in [self.csv_file_path, self.zipped_csv_file_path]: - self.streaming_ingest_client.ingest_from_file(f, ingestion_properties=ingestion_properties) + for f in [self.csv_file_path, self.zipped_csv_file_path]: + self.streaming_ingest_client.ingest_from_file(f, ingestion_properties=ingestion_properties) - await self.assert_rows_added(20, timeout=120) + await self.assert_rows_added(table, 20, timeout=120) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_streaming_ingest_from_json_file(self): - ingestion_properties = IngestionProperties( - database=self.test_db, - table=self.test_table, - flush_immediately=True, - data_format=DataFormat.JSON, - ingestion_mapping_reference="JsonMapping", - ingestion_mapping_kind=IngestionMappingKind.JSON, - ) + table = await self.init_table("streaming_ingest_from_json_file") + try: + ingestion_properties = IngestionProperties( + database=self.test_db, + table=table, + flush_immediately=True, + data_format=DataFormat.JSON, + ingestion_mapping_reference="JsonMapping", + ingestion_mapping_kind=IngestionMappingKind.JSON, + ) - for f in [self.json_file_path, self.zipped_json_file_path]: - self.streaming_ingest_client.ingest_from_file(f, ingestion_properties=ingestion_properties) + for f in [self.json_file_path, self.zipped_json_file_path]: + self.streaming_ingest_client.ingest_from_file(f, ingestion_properties=ingestion_properties) - await self.assert_rows_added(4, timeout=120) + await self.assert_rows_added(table, 4, timeout=120) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_streaming_ingest_from_csv_io_streams(self): - ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, data_format=DataFormat.CSV) - byte_sequence = b'0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,"Zero",0,00:00:00,,null' - bytes_stream = io.BytesIO(byte_sequence) - 
self.streaming_ingest_client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties) + table = await self.init_table("streaming_ingest_from_csv_io_streams") + try: + ingestion_properties = IngestionProperties(database=self.test_db, table=table, data_format=DataFormat.CSV) + byte_sequence = b'0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,"Zero",0,00:00:00,,null' + bytes_stream = io.BytesIO(byte_sequence) + self.streaming_ingest_client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties) - str_sequence = '0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,"Zero",0,00:00:00,,null' - str_stream = io.StringIO(str_sequence) - self.streaming_ingest_client.ingest_from_stream(str_stream, ingestion_properties=ingestion_properties) + str_sequence = '0,00000000-0000-0000-0001-020304050607,0,0,0,0,0,0,0,0,0,0,2014-01-01T01:01:01.0000000Z,Zero,"Zero",0,00:00:00,,null' + str_stream = io.StringIO(str_sequence) + self.streaming_ingest_client.ingest_from_stream(str_stream, ingestion_properties=ingestion_properties) - await self.assert_rows_added(2, timeout=120) + await self.assert_rows_added(table, 2, timeout=120) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_streaming_ingest_from_json_io_streams(self): - ingestion_properties = IngestionProperties( - database=self.test_db, - table=self.test_table, - data_format=DataFormat.JSON, - flush_immediately=True, - ingestion_mapping_reference="JsonMapping", - ingestion_mapping_kind=IngestionMappingKind.JSON, - ) + table = await self.init_table("streaming_ingest_from_json_io_streams") + try: + ingestion_properties = IngestionProperties( + database=self.test_db, + table=table, + data_format=DataFormat.JSON, + flush_immediately=True, + ingestion_mapping_reference="JsonMapping", + ingestion_mapping_kind=IngestionMappingKind.JSON, + ) - byte_sequence = b'{"rownumber": 0, "rowguid": "00000000-0000-0000-0001-020304050607", "xdouble": 0.0, "xfloat": 0.0, "xbool": 0, "xint16": 0, "xint32": 0, "xint64": 0, "xunit8": 0, "xuint16": 0, "xunit32": 0, "xunit64": 0, "xdate": "2014-01-01T01:01:01Z", "xsmalltext": "Zero", "xtext": "Zero", "xnumberAsText": "0", "xtime": "00:00:00", "xtextWithNulls": null, "xdynamicWithNulls": ""}' - bytes_stream = io.BytesIO(byte_sequence) - self.streaming_ingest_client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties) + byte_sequence = b'{"rownumber": 0, "rowguid": "00000000-0000-0000-0001-020304050607", "xdouble": 0.0, "xfloat": 0.0, "xbool": 0, "xint16": 0, "xint32": 0, "xint64": 0, "xunit8": 0, "xuint16": 0, "xunit32": 0, "xunit64": 0, "xdate": "2014-01-01T01:01:01Z", "xsmalltext": "Zero", "xtext": "Zero", "xnumberAsText": "0", "xtime": "00:00:00", "xtextWithNulls": null, "xdynamicWithNulls": ""}' + bytes_stream = io.BytesIO(byte_sequence) + self.streaming_ingest_client.ingest_from_stream(bytes_stream, ingestion_properties=ingestion_properties) - str_sequence = '{"rownumber": 0, "rowguid": "00000000-0000-0000-0001-020304050607", "xdouble": 0.0, "xfloat": 0.0, "xbool": 0, "xint16": 0, "xint32": 0, "xint64": 0, "xunit8": 0, "xuint16": 0, "xunit32": 0, "xunit64": 0, "xdate": "2014-01-01T01:01:01Z", "xsmalltext": "Zero", "xtext": "Zero", "xnumberAsText": "0", "xtime": "00:00:00", "xtextWithNulls": null, "xdynamicWithNulls": ""}' - str_stream = io.StringIO(str_sequence) - self.streaming_ingest_client.ingest_from_stream(str_stream, 
ingestion_properties=ingestion_properties) + str_sequence = '{"rownumber": 0, "rowguid": "00000000-0000-0000-0001-020304050607", "xdouble": 0.0, "xfloat": 0.0, "xbool": 0, "xint16": 0, "xint32": 0, "xint64": 0, "xunit8": 0, "xuint16": 0, "xunit32": 0, "xunit64": 0, "xdate": "2014-01-01T01:01:01Z", "xsmalltext": "Zero", "xtext": "Zero", "xnumberAsText": "0", "xtime": "00:00:00", "xtextWithNulls": null, "xdynamicWithNulls": ""}' + str_stream = io.StringIO(str_sequence) + self.streaming_ingest_client.ingest_from_stream(str_stream, ingestion_properties=ingestion_properties) - await self.assert_rows_added(2, timeout=120) + await self.assert_rows_added(table, 2, timeout=120) + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_streaming_ingest_from_dataframe(self): - from pandas import DataFrame - - fields = [ - "rownumber", - "rowguid", - "xdouble", - "xfloat", - "xbool", - "xint16", - "xint32", - "xint64", - "xunit8", - "xuint16", - "xunit32", - "xunit64", - "xdate", - "xsmalltext", - "xtext", - "xnumberAsText", - "xtime", - "xtextWithNulls", - "xdynamicWithNulls", - ] - - guid = uuid.uuid4() - - dynamic_value = ["me@dummy.com", "you@dummy.com", "them@dummy.com"] - rows = [[0, str(guid), 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, dynamic_value]] - df = DataFrame(data=rows, columns=fields) - ingestion_properties = IngestionProperties(database=self.test_db, table=self.test_table, flush_immediately=True) - self.ingest_client.ingest_from_dataframe(df, ingestion_properties) - - await self.assert_rows_added(1, timeout=120) - - a = self.client.execute(self.test_db, f"{self.test_table} | where rowguid == '{guid}'") - assert a.primary_results[0].rows[0]["xdynamicWithNulls"] == dynamic_value + table = await self.init_table("streaming_ingest_from_dataframe") + try: + from pandas import DataFrame + + fields = [ + "rownumber", + "rowguid", + "xdouble", + "xfloat", + "xbool", + "xint16", + "xint32", + "xint64", + "xunit8", + "xuint16", + "xunit32", + "xunit64", + "xdate", + "xsmalltext", + "xtext", + "xnumberAsText", + "xtime", + "xtextWithNulls", + "xdynamicWithNulls", + ] + + guid = uuid.uuid4() + + dynamic_value = ["me@dummy.com", "you@dummy.com", "them@dummy.com"] + rows = [[0, str(guid), 0.0, 0.0, 0, 0, 0, 0, 0, 0, 0, 0, "2014-01-01T01:01:01Z", "Zero", "Zero", "0", "00:00:00", None, dynamic_value]] + df = DataFrame(data=rows, columns=fields) + ingestion_properties = IngestionProperties(database=self.test_db, table=table, flush_immediately=True) + self.ingest_client.ingest_from_dataframe(df, ingestion_properties) + + await self.assert_rows_added(table, 1, timeout=120) + + a = self.client.execute(self.test_db, f"{table} | where rowguid == '{guid}'") + assert a.primary_results[0].rows[0]["xdynamicWithNulls"] == dynamic_value + finally: + await self.drop_table(table) @pytest.mark.asyncio async def test_streaming_ingest_from_blob(self, is_managed_streaming): - ingestion_properties = IngestionProperties( - database=self.test_db, - table=self.test_table, - data_format=DataFormat.JSON, - ingestion_mapping_reference="JsonMapping", - ingestion_mapping_kind=IngestionMappingKind.JSON, - ) - export_containers_list = self.ingest_client._resource_manager._kusto_client.execute("NetDefaultDB", ".show export containers") - containers = [_ResourceUri(s["StorageRoot"]) for s in export_containers_list.primary_results[0]] - - for c in containers: - 
self.ingest_client._resource_manager._ranked_storage_account_set.add_storage_account(c.storage_account_name) - - with FileDescriptor(self.json_file_path).open(False) as stream: - blob_descriptor = self.ingest_client.upload_blob( - containers, - FileDescriptor(self.json_file_path), - ingestion_properties.database, - ingestion_properties.table, - stream, - None, - 10 * 60, - 3, + table = await self.init_table("streaming_ingest_from_blob" + ("_managed" if is_managed_streaming else "")) + try: + ingestion_properties = IngestionProperties( + database=self.test_db, + table=table, + data_format=DataFormat.JSON, + ingestion_mapping_reference="JsonMapping", + ingestion_mapping_kind=IngestionMappingKind.JSON, ) - if is_managed_streaming: - self.managed_streaming_ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties) - else: - self.streaming_ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties) - - await self.assert_rows_added(2, timeout=120) + export_containers_list = self.ingest_client._resource_manager._kusto_client.execute("NetDefaultDB", ".show export containers") + containers = [_ResourceUri(s["StorageRoot"]) for s in export_containers_list.primary_results[0]] + + for c in containers: + self.ingest_client._resource_manager._ranked_storage_account_set.add_storage_account(c.storage_account_name) + + with FileDescriptor(self.json_file_path).open(False) as stream: + blob_descriptor = self.ingest_client.upload_blob( + containers, + FileDescriptor(self.json_file_path), + ingestion_properties.database, + ingestion_properties.table, + stream, + None, + 10 * 60, + 3, + ) + if is_managed_streaming: + self.managed_streaming_ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties) + else: + self.streaming_ingest_client.ingest_from_blob(blob_descriptor, ingestion_properties) + + await self.assert_rows_added(table, 2, timeout=120) + finally: + await self.drop_table(table)
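
Reviewer note: the helpers.py change above gates the `to_datetime` arguments on the pandas major version and pads a ".000" fractional part for pandas 1.x, which cannot parse mixed-precision ISO strings in one pass. Below is a minimal standalone sketch of that same logic, assuming only pandas is installed; the function name `normalize_kusto_datetimes` and the column name `ts` are illustrative, not part of the SDK.

```python
# Sketch only (not SDK code): version-gated datetime parsing in the spirit of parse_datetime.
from typing import Optional

import pandas as pd


def normalize_kusto_datetimes(frame: pd.DataFrame, col: str, force_version: Optional[str] = None) -> pd.Series:
    # force_version mirrors the testing hook added in the patch; None means "use the installed pandas".
    version = force_version or pd.__version__
    args = {}
    if version.startswith("2."):
        # pandas 2.x handles mixed precision when told the strings are ISO 8601.
        args = {"format": "ISO8601", "utc": True}
    else:
        # pandas 1.x needs uniform precision, so pad values that have no fractional seconds.
        # `~mask` negates the boolean Series element-wise.
        has_fraction = frame[col].str.contains("\\.")
        frame.loc[~has_fraction, col] = frame.loc[~has_fraction, col].str.replace("Z", ".000Z")
    frame[col] = pd.to_datetime(frame[col], errors="coerce", **args)
    return frame[col]


df = pd.DataFrame({"ts": ["2023-12-12T01:59:59.352Z", "2023-12-12T01:54:44Z"]})
print(normalize_kusto_datetimes(df, "ts"))
```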
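
Reviewer note: the e2e refactor replaces the single shared table with a uniquely named table per test, created by `init_table` and always dropped in a `finally` block. A minimal, Kusto-free sketch of that lifecycle pattern, where `execute` stands in for `KustoClient.execute` and all names are illustrative:

```python
# Sketch only (not the test suite): per-test table lifecycle with guaranteed cleanup.
import random
import time
from typing import Callable


def unique_table_name(base: str, test_name: str) -> str:
    # Same naming scheme as init_table in the diff: base + test name + timestamp + random suffix.
    return f"{base}_{test_name}_{int(time.time())}_{random.randint(1, 100000)}"


def run_with_temp_table(
    execute: Callable[[str, str], None], database: str, base_table: str, test_name: str, body: Callable[[str], None]
) -> None:
    table = unique_table_name(base_table, test_name)
    execute(database, f".create table {table} (rownumber: int, xtext: string)")
    try:
        body(table)  # ingest into `table` and assert on row counts here
    finally:
        # Drop the table even if the test body raised, so parallel tests never collide.
        execute(database, f".drop table {table} ifexists")


# Usage with a dummy executor that just prints the commands it would send:
run_with_temp_table(lambda db, cmd: print(db, cmd), "e2e", "python_test", "csv_ingest", lambda t: None)
```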