diff --git a/README.rst b/README.rst index 7db163e9..102baeeb 100644 --- a/README.rst +++ b/README.rst @@ -34,6 +34,16 @@ To install **nisystemlink-clients**, use one of the following methods: $ python -m easy_install nisystemlink-clients +Optional Arrow (pyarrow) Support +-------------------------------- +The base install does not pull in ``pyarrow``. Install the optional extra if you +plan to use Arrow RecordBatch ingestion with ``DataFrameClient.append_table_data``:: + + $ python -m pip install "nisystemlink-clients[pyarrow]" + +Without the extra, Arrow-specific code paths will raise a ``RuntimeError`` when +attempting to append ``pyarrow.RecordBatch`` instances. + .. _usage_section: Usage diff --git a/docs/api_reference/dataframe.rst b/docs/api_reference/dataframe.rst index 27dd7ab9..ebdc9e06 100644 --- a/docs/api_reference/dataframe.rst +++ b/docs/api_reference/dataframe.rst @@ -25,3 +25,27 @@ nisystemlink.clients.dataframe .. automodule:: nisystemlink.clients.dataframe.models :members: :imported-members: + +Arrow / JSON Ingestion Notes +---------------------------- +``append_table_data`` accepts multiple data forms: + +* ``AppendTableDataRequest`` (JSON) +* ``DataFrame`` model (JSON) +* Single ``pyarrow.RecordBatch`` (Arrow IPC) +* Iterable of ``pyarrow.RecordBatch`` (Arrow IPC) +* ``None`` with ``end_of_data`` (flush only) + +Arrow support is optional and requires installing the ``pyarrow`` extra:: + + pip install "nisystemlink-clients[pyarrow]" + +If ``pyarrow`` is not installed and a RecordBatch (or iterable) is passed, a +``RuntimeError`` is raised. When Arrow is used, the batches are serialized into +an IPC stream and sent with content type +``application/vnd.apache.arrow.stream``; the ``end_of_data`` flag is sent as a +query parameter. JSON ingestion places ``endOfData`` in the request body. + +If the target SystemLink DataFrame Service does not yet support Arrow and +responds with HTTP 400, the client raises an explanatory ``ApiException`` +advising to upgrade or fall back to JSON ingestion. 
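To make the ingestion notes above concrete, here is a minimal, illustrative sketch of roughly what the Arrow code path produces at the HTTP level: the RecordBatches are serialized into one Arrow IPC stream, posted with the ``application/vnd.apache.arrow.stream`` content type, and ``endOfData`` travels as a query parameter. This is not part of the client; the endpoint URL, the ``x-ni-api-key`` header, the table ID placeholder, and the column name are assumptions for the example, and the real client streams the IPC bytes incrementally instead of buffering them.

```python
# Illustrative sketch only -- shows the wire format described in the docs above.
import io

import pyarrow as pa
import requests

# One or more batches for a hypothetical table with a single Int64 column "a".
batches = [pa.record_batch([pa.array([1, 2, 3])], names=["a"])]

# Serialize the batches into a single Arrow IPC stream.
sink = io.BytesIO()
with pa.ipc.new_stream(sink, batches[0].schema) as writer:
    for batch in batches:
        writer.write_batch(batch)

response = requests.post(
    # Assumed endpoint shape; substitute your server URL and table ID.
    "https://my-systemlink-server/nidataframe/v1/tables/<table-id>/data",
    params={"endOfData": "true"},  # end_of_data is sent as a query parameter
    headers={
        "x-ni-api-key": "<api-key>",  # assumed auth mechanism for the sketch
        "Content-Type": "application/vnd.apache.arrow.stream",
    },
    data=sink.getvalue(),
)
response.raise_for_status()
```

For JSON ingestion the same endpoint instead receives a JSON body in which ``endOfData`` is a field of the request payload rather than a query parameter.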
diff --git a/examples/dataframe/create_write_data.py b/examples/dataframe/create_write_data.py index a0ef0cd8..03f52029 100644 --- a/examples/dataframe/create_write_data.py +++ b/examples/dataframe/create_write_data.py @@ -1,6 +1,10 @@ import random from datetime import datetime +try: + import pyarrow as pa # type: ignore +except Exception: + pa = None from nisystemlink.clients.dataframe import DataFrameClient from nisystemlink.clients.dataframe.models import ( AppendTableDataRequest, @@ -25,12 +29,46 @@ ) ) -# Generate example data -frame = DataFrame( - data=[[i, random.random(), datetime.now().isoformat()] for i in range(100)] +# Append via explicit AppendTableDataRequest (JSON) +frame_request = DataFrame( + data=[[i, random.random(), datetime.now().isoformat()] for i in range(3)] ) +client.append_table_data(table_id, AppendTableDataRequest(frame=frame_request)) -# Write example data to table -client.append_table_data( - table_id, data=AppendTableDataRequest(frame=frame, endOfData=True) +# Append via DataFrame model directly (JSON) +frame_direct = DataFrame( + data=[[i + 3, random.random(), datetime.now().isoformat()] for i in range(3)] ) +client.append_table_data(table_id, frame_direct) + +if pa is not None: + # Append via single RecordBatch (Arrow) + batch_single = pa.record_batch( + [ + pa.array([6, 7, 8]), + pa.array([0.1, 0.2, 0.3]), + pa.array([datetime.now().isoformat()] * 3), + ], + names=["ix", "Float_Column", "Timestamp_Column"], + ) + client.append_table_data(table_id, batch_single) + + # Append via iterable of RecordBatches (Arrow) + batch_list = [ + pa.record_batch( + [ + pa.array([9, 10]), + pa.array([0.4, 0.5]), + pa.array([datetime.now().isoformat(), datetime.now().isoformat()]), + ], + names=["ix", "Float_Column", "Timestamp_Column"], + ) + ] + client.append_table_data(table_id, batch_list) + + # Mark end_of_data for the table + # Supply `None` and `end_of_data=True` + client.append_table_data(table_id, None, end_of_data=True) +else: + # If pyarrow not installed, flush via JSON path + client.append_table_data(table_id, None, end_of_data=True) diff --git a/nisystemlink/clients/core/_internal/_timestamp_utilities.py b/nisystemlink/clients/core/_internal/_timestamp_utilities.py index a5cd9ec1..8eb84af9 100644 --- a/nisystemlink/clients/core/_internal/_timestamp_utilities.py +++ b/nisystemlink/clients/core/_internal/_timestamp_utilities.py @@ -30,7 +30,13 @@ def datetime_to_str(cls, value: datetime.datetime) -> str: Returns: The string representation of the timestamp. """ - return datetime.datetime.utcfromtimestamp(value.timestamp()).isoformat() + "Z" + # Use timezone-aware conversion to avoid deprecated utcfromtimestamp usage and + # preserve exact UTC semantics (value assumed either naive UTC or aware). 
+ if value.tzinfo is None: + value = value.replace(tzinfo=datetime.timezone.utc) + else: + value = value.astimezone(datetime.timezone.utc) + return value.isoformat().replace("+00:00", "Z") @classmethod def str_to_datetime(cls, timestamp: str) -> datetime.datetime: diff --git a/nisystemlink/clients/core/_uplink/_methods.py b/nisystemlink/clients/core/_uplink/_methods.py index 6cd187f0..bc78d15d 100644 --- a/nisystemlink/clients/core/_uplink/_methods.py +++ b/nisystemlink/clients/core/_uplink/_methods.py @@ -5,6 +5,7 @@ from uplink import ( Body, commands, + headers, json, response_handler as uplink_response_handler, returns, @@ -26,13 +27,18 @@ def post( path: str, args: Optional[Sequence[Any]] = None, return_key: Optional[Union[str, Tuple[str, ...]]] = None, + content_type: Optional[str] = None, ) -> Callable[[F], F]: """Annotation for a POST request with a JSON request body. If args is not specified, defaults to a single argument that represents the request body. """ def decorator(func: F) -> F: - result = json(commands.post(path, args=args or (Body,))(func)) + result = commands.post(path, args=args or (Body,))(func) + if content_type: + result = headers({"Content-Type": content_type})(result) + else: + result = json(result) # type: ignore if return_key: result = returns.json(key=return_key)(result) return result # type: ignore diff --git a/nisystemlink/clients/dataframe/_data_frame_client.py b/nisystemlink/clients/dataframe/_data_frame_client.py index 8b976985..5c6ac261 100644 --- a/nisystemlink/clients/dataframe/_data_frame_client.py +++ b/nisystemlink/clients/dataframe/_data_frame_client.py @@ -1,7 +1,13 @@ """Implementation of DataFrameClient.""" -from typing import List, Optional - +from collections.abc import Iterable +from io import BytesIO +from typing import List, Optional, Union + +try: + import pyarrow as pa # type: ignore +except Exception: + pa = None from nisystemlink.clients import core from nisystemlink.clients.core._uplink._base_client import BaseClient from nisystemlink.clients.core._uplink._methods import ( @@ -250,18 +256,176 @@ def get_table_data( ... @post("tables/{id}/data", args=[Path, Body]) - def append_table_data(self, id: str, data: models.AppendTableDataRequest) -> None: + def _append_table_data_json( + self, id: str, data: models.AppendTableDataRequest + ) -> None: + """Internal uplink-implemented JSON append call.""" + ... + + @post( + "tables/{id}/data", + args=[Path, Body, Query("endOfData")], + content_type="application/vnd.apache.arrow.stream", + ) + def _append_table_data_arrow( + self, id: str, data: Iterable[bytes], end_of_data: Optional[bool] = None + ) -> None: + """Internal uplink-implemented Arrow (binary) append call.""" + ... + + def append_table_data( + self, + id: str, + data: Optional[ + Union[ + models.AppendTableDataRequest, + models.DataFrame, + "pa.RecordBatch", # type: ignore[name-defined] + Iterable["pa.RecordBatch"], # type: ignore[name-defined] + ] + ], + *, + end_of_data: Optional[bool] = None, + ) -> None: """Appends one or more rows of data to the table identified by its ID. Args: id: Unique ID of a data table. - data: The rows of data to append and any additional options. + data: The data to append. Supported forms: + * AppendTableDataRequest: Sent as-is via JSON; ``end_of_data`` must be ``None``. + * DataFrame (service model): Wrapped into an AppendTableDataRequest (``end_of_data`` + optional) and sent as JSON. + * Single pyarrow.RecordBatch: Treated the same as an iterable containing one batch + and streamed as Arrow IPC. 
``end_of_data`` (if provided) is sent as a query + parameter. + * Iterable[pyarrow.RecordBatch]: Streamed as Arrow IPC. ``end_of_data`` (if provided) + is sent as a query parameter. If the iterator yields no batches, it is treated + as ``None`` and requires ``end_of_data``. + * None: ``end_of_data`` must be provided; sends JSON containing only the + ``endOfData`` flag. + end_of_data: Whether this request is the final append for the table; when ``True``, the table stops accepting additional rows. Required when + ``data`` is ``None`` or the RecordBatch iterator is empty; must be omitted when + passing an ``AppendTableDataRequest`` (include it inside that model instead). Raises: - ApiException: if unable to communicate with the DataFrame Service - or provided an invalid argument. + ValueError: If parameter constraints are violated. + ApiException: If unable to communicate with the DataFrame Service or an + invalid argument is provided. """ - ... + if isinstance(data, models.AppendTableDataRequest): + if end_of_data is not None: + raise ValueError( + "end_of_data must not be provided separately when passing an AppendTableDataRequest." + ) + self._append_table_data_json(id, data) + return + + if isinstance(data, models.DataFrame): + if end_of_data is None: + request_model = models.AppendTableDataRequest(frame=data) + else: + request_model = models.AppendTableDataRequest( + frame=data, end_of_data=end_of_data + ) + self._append_table_data_json(id, request_model) + return + + if pa is not None and isinstance(data, pa.RecordBatch): + data = [data] + + if isinstance(data, Iterable): + iterator = iter(data) + try: + first_batch = next(iterator) + except StopIteration: + if end_of_data is None: + raise ValueError( + "end_of_data must be provided when data iterator is empty." + ) + self._append_table_data_json( + id, + models.AppendTableDataRequest(end_of_data=end_of_data), + ) + return + + if pa is None: + raise RuntimeError( + "pyarrow is not installed. Install to stream RecordBatches." + ) + + if not isinstance(first_batch, pa.RecordBatch): + raise ValueError( + "Iterable provided to data must yield pyarrow.RecordBatch objects." + ) + + def _generate_body() -> Iterable[memoryview]: + batch = first_batch + with BytesIO() as buf: + options = pa.ipc.IpcWriteOptions(compression="zstd") + writer = pa.ipc.new_stream(buf, batch.schema, options=options) + + while True: + writer.write_batch(batch) + with buf.getbuffer() as view, view[0 : buf.tell()] as slice: + yield slice + buf.seek(0) + try: + batch = next(iterator) + except StopIteration: + break + + writer.close() + with buf.getbuffer() as view, view[0 : buf.tell()] as slice: + yield slice + + try: + self._append_table_data_arrow( + id, + _generate_body(), + end_of_data, + ) + except core.ApiException as ex: + if ex.http_status_code == 400: + wrap = True + try: + write_op = getattr( + self.api_info().operations, "write_data", None + ) + if ( + write_op is not None + and getattr(write_op, "version", 0) >= 2 + ): + wrap = False + except Exception: + pass + if wrap: + raise core.ApiException( + ( + "Arrow ingestion request was rejected. The target " + "DataFrame Service doesn't support Arrow streaming. " + "Install a DataFrame Service version with Arrow support " + "or fall back to JSON ingestion." + ), + error=ex.error, + http_status_code=ex.http_status_code, + inner=ex, + ) from ex + raise + return + + if data is None: + if end_of_data is None: + raise ValueError( + "end_of_data must be provided when data is None (no rows to append)."
+ ) + self._append_table_data_json( + id, models.AppendTableDataRequest(end_of_data=end_of_data) + ) + return + + raise ValueError( + "Unsupported type for data. Expected AppendTableDataRequest, DataFrame, Iterable[RecordBatch], or None." + ) @post("tables/{id}/query-data", args=[Path, Body]) def query_table_data( diff --git a/poetry.lock b/poetry.lock index a4aeb10b..e6c4e8b5 100644 --- a/poetry.lock +++ b/poetry.lock @@ -774,6 +774,63 @@ tomli = ">=1.2.2" [package.extras] poetry-plugin = ["poetry (>=1.0,<2.0)"] +[[package]] +name = "pyarrow" +version = "21.0.0" +description = "Python library for Apache Arrow" +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"pyarrow\"" +files = [ + {file = "pyarrow-21.0.0-cp310-cp310-macosx_12_0_arm64.whl", hash = "sha256:e563271e2c5ff4d4a4cbeb2c83d5cf0d4938b891518e676025f7268c6fe5fe26"}, + {file = "pyarrow-21.0.0-cp310-cp310-macosx_12_0_x86_64.whl", hash = "sha256:fee33b0ca46f4c85443d6c450357101e47d53e6c3f008d658c27a2d020d44c79"}, + {file = "pyarrow-21.0.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:7be45519b830f7c24b21d630a31d48bcebfd5d4d7f9d3bdb49da9cdf6d764edb"}, + {file = "pyarrow-21.0.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:26bfd95f6bff443ceae63c65dc7e048670b7e98bc892210acba7e4995d3d4b51"}, + {file = "pyarrow-21.0.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:bd04ec08f7f8bd113c55868bd3fc442a9db67c27af098c5f814a3091e71cc61a"}, + {file = "pyarrow-21.0.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:9b0b14b49ac10654332a805aedfc0147fb3469cbf8ea951b3d040dab12372594"}, + {file = "pyarrow-21.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:9d9f8bcb4c3be7738add259738abdeddc363de1b80e3310e04067aa1ca596634"}, + {file = "pyarrow-21.0.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:c077f48aab61738c237802836fc3844f85409a46015635198761b0d6a688f87b"}, + {file = "pyarrow-21.0.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:689f448066781856237eca8d1975b98cace19b8dd2ab6145bf49475478bcaa10"}, + {file = "pyarrow-21.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:479ee41399fcddc46159a551705b89c05f11e8b8cb8e968f7fec64f62d91985e"}, + {file = "pyarrow-21.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:40ebfcb54a4f11bcde86bc586cbd0272bac0d516cfa539c799c2453768477569"}, + {file = "pyarrow-21.0.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8d58d8497814274d3d20214fbb24abcad2f7e351474357d552a8d53bce70c70e"}, + {file = "pyarrow-21.0.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:585e7224f21124dd57836b1530ac8f2df2afc43c861d7bf3d58a4870c42ae36c"}, + {file = "pyarrow-21.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:555ca6935b2cbca2c0e932bedd853e9bc523098c39636de9ad4693b5b1df86d6"}, + {file = "pyarrow-21.0.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:3a302f0e0963db37e0a24a70c56cf91a4faa0bca51c23812279ca2e23481fccd"}, + {file = "pyarrow-21.0.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:b6b27cf01e243871390474a211a7922bfbe3bda21e39bc9160daf0da3fe48876"}, + {file = "pyarrow-21.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:e72a8ec6b868e258a2cd2672d91f2860ad532d590ce94cdf7d5e7ec674ccf03d"}, + {file = "pyarrow-21.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:b7ae0bbdc8c6674259b25bef5d2a1d6af5d39d7200c819cf99e07f7dfef1c51e"}, + {file = "pyarrow-21.0.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:58c30a1729f82d201627c173d91bd431db88ea74dcaa3885855bc6203e433b82"}, + {file = 
"pyarrow-21.0.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:072116f65604b822a7f22945a7a6e581cfa28e3454fdcc6939d4ff6090126623"}, + {file = "pyarrow-21.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:cf56ec8b0a5c8c9d7021d6fd754e688104f9ebebf1bf4449613c9531f5346a18"}, + {file = "pyarrow-21.0.0-cp313-cp313-macosx_12_0_arm64.whl", hash = "sha256:e99310a4ebd4479bcd1964dff9e14af33746300cb014aa4a3781738ac63baf4a"}, + {file = "pyarrow-21.0.0-cp313-cp313-macosx_12_0_x86_64.whl", hash = "sha256:d2fe8e7f3ce329a71b7ddd7498b3cfac0eeb200c2789bd840234f0dc271a8efe"}, + {file = "pyarrow-21.0.0-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:f522e5709379d72fb3da7785aa489ff0bb87448a9dc5a75f45763a795a089ebd"}, + {file = "pyarrow-21.0.0-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:69cbbdf0631396e9925e048cfa5bce4e8c3d3b41562bbd70c685a8eb53a91e61"}, + {file = "pyarrow-21.0.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:731c7022587006b755d0bdb27626a1a3bb004bb56b11fb30d98b6c1b4718579d"}, + {file = "pyarrow-21.0.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:dc56bc708f2d8ac71bd1dcb927e458c93cec10b98eb4120206a4091db7b67b99"}, + {file = "pyarrow-21.0.0-cp313-cp313-win_amd64.whl", hash = "sha256:186aa00bca62139f75b7de8420f745f2af12941595bbbfa7ed3870ff63e25636"}, + {file = "pyarrow-21.0.0-cp313-cp313t-macosx_12_0_arm64.whl", hash = "sha256:a7a102574faa3f421141a64c10216e078df467ab9576684d5cd696952546e2da"}, + {file = "pyarrow-21.0.0-cp313-cp313t-macosx_12_0_x86_64.whl", hash = "sha256:1e005378c4a2c6db3ada3ad4c217b381f6c886f0a80d6a316fe586b90f77efd7"}, + {file = "pyarrow-21.0.0-cp313-cp313t-manylinux_2_28_aarch64.whl", hash = "sha256:65f8e85f79031449ec8706b74504a316805217b35b6099155dd7e227eef0d4b6"}, + {file = "pyarrow-21.0.0-cp313-cp313t-manylinux_2_28_x86_64.whl", hash = "sha256:3a81486adc665c7eb1a2bde0224cfca6ceaba344a82a971ef059678417880eb8"}, + {file = "pyarrow-21.0.0-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:fc0d2f88b81dcf3ccf9a6ae17f89183762c8a94a5bdcfa09e05cfe413acf0503"}, + {file = "pyarrow-21.0.0-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:6299449adf89df38537837487a4f8d3bd91ec94354fdd2a7d30bc11c48ef6e79"}, + {file = "pyarrow-21.0.0-cp313-cp313t-win_amd64.whl", hash = "sha256:222c39e2c70113543982c6b34f3077962b44fca38c0bd9e68bb6781534425c10"}, + {file = "pyarrow-21.0.0-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:a7f6524e3747e35f80744537c78e7302cd41deee8baa668d56d55f77d9c464b3"}, + {file = "pyarrow-21.0.0-cp39-cp39-macosx_12_0_x86_64.whl", hash = "sha256:203003786c9fd253ebcafa44b03c06983c9c8d06c3145e37f1b76a1f317aeae1"}, + {file = "pyarrow-21.0.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:3b4d97e297741796fead24867a8dabf86c87e4584ccc03167e4a811f50fdf74d"}, + {file = "pyarrow-21.0.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:898afce396b80fdda05e3086b4256f8677c671f7b1d27a6976fa011d3fd0a86e"}, + {file = "pyarrow-21.0.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:067c66ca29aaedae08218569a114e413b26e742171f526e828e1064fcdec13f4"}, + {file = "pyarrow-21.0.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:0c4e75d13eb76295a49e0ea056eb18dbd87d81450bfeb8afa19a7e5a75ae2ad7"}, + {file = "pyarrow-21.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:cdc4c17afda4dab2a9c0b79148a43a7f4e1094916b3e18d8975bfd6d6d52241f"}, + {file = "pyarrow-21.0.0.tar.gz", hash = "sha256:5051f2dccf0e283ff56335760cbc8622cf52264d67e359d5569541ac11b6d5bc"}, +] + +[package.extras] +test = ["cffi", "hypothesis", "pandas", "pytest", "pytz"] + [[package]] 
name = "pycodestyle" version = "2.9.1" @@ -1395,7 +1452,10 @@ h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] +[extras] +pyarrow = ["pyarrow"] + [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "809134922cb291c65292513ace5efb024a705e81bd0be88c69cdc66efa120071" +content-hash = "6c8dde41cc84db4467014785da6deb10f5edbe3d646c9015dbc9638bdf0ead53" diff --git a/pyproject.toml b/pyproject.toml index b99911e2..7c18dc42 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,10 @@ uplink = [ pydantic = "^2.11.3" pyyaml = "^6.0.1" pandas = "^2.1.0" +pyarrow = { version = "^21.0.0", optional = true } + +[tool.poetry.extras] +pyarrow = ["pyarrow"] [tool.poetry.group.dev.dependencies] black = ">=22.10,<25.0" diff --git a/tests/integration/dataframe/test_dataframe.py b/tests/integration/dataframe/test_dataframe.py index 25c1cd93..4596297c 100644 --- a/tests/integration/dataframe/test_dataframe.py +++ b/tests/integration/dataframe/test_dataframe.py @@ -87,6 +87,19 @@ def test_tables(create_table): @pytest.mark.enterprise @pytest.mark.integration class TestDataFrame: + def _new_single_int_table(self, create_table, column_name: str = "a") -> str: + return create_table( + CreateTableRequest( + columns=[ + Column( + name=column_name, + data_type=DataType.Int64, + column_type=ColumnType.Index, + ) + ] + ) + ) + def test__api_info__returns(self, client): response = client.api_info() @@ -615,3 +628,239 @@ def test__export_table_data__works(self, client: DataFrameClient, create_table): response.read() == b'"col1","col2","col3"\r\n1,2.5,6.5\r\n2,1.5,5.5\r\n3,2.5,7.5' ) + + def test__append_table_data__append_request_success( + self, client: DataFrameClient, create_table + ): + table_id = self._new_single_int_table(create_table) + frame = DataFrame(columns=["a"], data=[["1"], ["2"]]) + client.append_table_data( + table_id, AppendTableDataRequest(frame=frame, end_of_data=True) + ) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 2 + assert metadata.supports_append is False + + def test__append_table_data__append_request_with_end_of_data_argument_disallowed( + self, client: DataFrameClient, create_table + ): + request = AppendTableDataRequest(end_of_data=True) + with pytest.raises( + ValueError, + match="end_of_data must not be provided separately when passing an AppendTableDataRequest.", + ): + client.append_table_data( + self._new_single_int_table(create_table), request, end_of_data=True + ) + + def test__append_table_data__append_request_without_end_of_data_success( + self, client: DataFrameClient, create_table + ): + table_id = self._new_single_int_table(create_table) + frame = DataFrame(columns=["a"], data=[["7"], ["8"]]) + client.append_table_data(table_id, AppendTableDataRequest(frame=frame)) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 2 + assert metadata.supports_append is True + + def test__append_table_data__accepts_dataframe_model( + self, client: DataFrameClient, create_table + ): + table_id = self._new_single_int_table(create_table) + frame = DataFrame(columns=["a"], data=[["1"], ["2"]]) + client.append_table_data(table_id, frame, end_of_data=True) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 2 + assert metadata.supports_append is False + + def test__append_table_data__dataframe_without_end_of_data_success( + self, client: DataFrameClient, create_table + ): + table_id = self._new_single_int_table(create_table) + frame = 
DataFrame(columns=["a"], data=[["10"], ["11"]]) + client.append_table_data(table_id, frame) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 2 + assert metadata.supports_append is True + + def test__append_table_data__none_without_end_of_data_raises( + self, client: DataFrameClient, create_table + ): + table_id = create_table(basic_table_model) + with pytest.raises( + ValueError, match="end_of_data must be provided when data is None" + ): + client.append_table_data(table_id, None) + + def test__append_table_data__flush_only_with_none( + self, client: DataFrameClient, create_table + ): + table_id = self._new_single_int_table(create_table) + client.append_table_data(table_id, None, end_of_data=True) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 0 + assert metadata.supports_append is False + + def test__append_table_data__arrow_ingestion_success( + self, client: DataFrameClient, create_table + ): + pa = pytest.importorskip("pyarrow") + table_id = self._new_single_int_table(create_table) + batch = pa.record_batch([pa.array([10, 11, 12])], names=["a"]) + client.append_table_data(table_id, [batch], end_of_data=True) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 3 + assert metadata.supports_append is False + with pytest.raises(ApiException): + client.append_table_data(table_id, None, end_of_data=True) + + def test__append_table_data__single_recordbatch_success( + self, client: DataFrameClient, create_table + ): + pa = pytest.importorskip("pyarrow") + table_id = self._new_single_int_table(create_table) + batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"]) + client.append_table_data(table_id, batch, end_of_data=True) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 3 + assert metadata.supports_append is False + with pytest.raises(ApiException): + client.append_table_data(table_id, None, end_of_data=True) + + def test__append_table_data__arrow_ingestion_with_end_of_data_query_param_false( + self, client: DataFrameClient, create_table + ): + pa = pytest.importorskip("pyarrow") + table_id = self._new_single_int_table(create_table) + batch1 = pa.record_batch([pa.array([4, 5, 6])], names=["a"]) + client.append_table_data(table_id, [batch1], end_of_data=False) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 3 + assert metadata.supports_append is True + batch2 = pa.record_batch([pa.array([7, 8])], names=["a"]) + client.append_table_data(table_id, [batch2], end_of_data=True) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 5 + assert metadata.supports_append is False + + def test__append_table_data__empty_iterator_requires_end_of_data( + self, client: DataFrameClient, create_table + ): + table_id = create_table(basic_table_model) + with pytest.raises( + ValueError, + match="end_of_data must be provided when data iterator is empty.", + ): + client.append_table_data(table_id, []) + client.append_table_data(table_id, [], end_of_data=True) + metadata = client.get_table_metadata(table_id) + assert metadata.row_count == 0 + assert metadata.supports_append is False + + def test__append_table_data__arrow_iterable_with_non_recordbatch_elements_raises( + self, client: DataFrameClient, create_table + ): + pytest.importorskip("pyarrow") + table_id = create_table(basic_table_model) + with pytest.raises( + ValueError, + match="Iterable provided to data must yield pyarrow.RecordBatch objects.", + ): + 
client.append_table_data(table_id, [1, 2, 3]) + + def test__append_table_data__arrow_iterable_without_pyarrow_raises_runtime_error( + self, client: DataFrameClient, create_table, monkeypatch + ): + import nisystemlink.clients.dataframe._data_frame_client as df_module + + monkeypatch.setattr(df_module, "pa", None) + table_id = create_table(basic_table_model) + with pytest.raises( + RuntimeError, + match="pyarrow is not installed. Install to stream RecordBatches.", + ): + client.append_table_data(table_id, [object()]) + + def test__append_table_data__arrow_ingestion_400_unsupported( + self, client: DataFrameClient + ): + pa = pytest.importorskip("pyarrow") + table_id = "mock_table_id" + bad_batch = pa.record_batch([pa.array([1, 2, 3])], names=["b"]) + api_info_json = { + "operations": { + "create_tables": {"available": True, "version": 1}, + "delete_tables": {"available": True, "version": 1}, + "modify_metadata": {"available": True, "version": 1}, + "list_tables": {"available": True, "version": 1}, + "read_data": {"available": True, "version": 3}, + "write_data": {"available": True, "version": 1}, + } + } + + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + f"{client.session.base_url}", + json=api_info_json, + ) + rsps.add( + responses.POST, + f"{client.session.base_url}tables/{table_id}/data", + status=400, + ) + with pytest.raises(ApiException) as excinfo: + client.append_table_data(table_id, [bad_batch], end_of_data=True) + + assert "Arrow ingestion request was rejected" in str(excinfo.value) + + def test__append_table_data__arrow_ingestion_400_supported_passthrough( + self, client: DataFrameClient + ): + pa = pytest.importorskip("pyarrow") + table_id = "mock_table_id" + bad_batch = pa.record_batch([pa.array([1, 2, 3])], names=["b"]) + api_info_json = { + "operations": { + "create_tables": {"available": True, "version": 1}, + "delete_tables": {"available": True, "version": 1}, + "modify_metadata": {"available": True, "version": 1}, + "list_tables": {"available": True, "version": 1}, + "read_data": {"available": True, "version": 3}, + "write_data": {"available": True, "version": 2}, + } + } + + with responses.RequestsMock() as rsps: + rsps.add( + responses.GET, + f"{client.session.base_url}", + json=api_info_json, + ) + rsps.add( + responses.POST, + f"{client.session.base_url}tables/{table_id}/data", + status=400, + ) + with pytest.raises(ApiException) as excinfo: + client.append_table_data(table_id, [bad_batch], end_of_data=True) + + assert "Arrow ingestion request was rejected" not in str(excinfo.value) + + def test__append_table_data__arrow_ingestion_non_400_passthrough( + self, client: DataFrameClient + ): + pa = pytest.importorskip("pyarrow") + batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"]) + with pytest.raises(ApiException) as excinfo: + client.append_table_data( + "111111111111111111111111", [batch], end_of_data=True + ) + assert "Arrow ingestion request was rejected" not in str(excinfo.value) + + def test__append_table_data__unsupported_type_raises( + self, client: DataFrameClient, create_table + ): + table_id = create_table(basic_table_model) + with pytest.raises(ValueError, match="Unsupported type"): + client.append_table_data(table_id, 123) # type: ignore[arg-type] diff --git a/tests/tag/test_tagmanager.py b/tests/tag/test_tagmanager.py index 19632a08..56bb55be 100644 --- a/tests/tag/test_tagmanager.py +++ b/tests/tag/test_tagmanager.py @@ -12,6 +12,31 @@ from ..anyorderlist import AnyOrderList +def _wait_for_call_count(mock_obj, expected: int, 
*, timeout: float = 3.0): + """Wait until mock_obj.call_count >= expected or timeout. + + Args: + mock_obj: Mock with call_count attribute. + expected: Target call count (>=). + timeout: Max seconds to wait. + + Returns: + Elapsed seconds when condition satisfied. + + Raises: + AssertionError on timeout or premature satisfaction. + """ + start = time.monotonic() + while True: + if mock_obj.call_count >= expected: + return time.monotonic() - start + if time.monotonic() - start >= timeout: + raise AssertionError( + f"Timed out waiting for call_count >= {expected}. Final={mock_obj.call_count} timeout={timeout}s" + ) + time.sleep(0.01) + + class TestTagManager(HttpClientTestBase): def setup_method(self, method): super().setup_method(method) @@ -2350,7 +2375,11 @@ def test__create_writer_with_buffer_size__sends_when_buffer_full(self): writer.write(path, tbase.DataType.INT32, value1, timestamp=timestamp) writer.write(path, tbase.DataType.INT32, value2, timestamp=timestamp) - utctime = datetime.utcfromtimestamp(timestamp.timestamp()).isoformat() + "Z" + utctime = ( + datetime.fromtimestamp(timestamp.timestamp(), timezone.utc) + .isoformat() + .replace("+00:00", "Z") + ) self._client.all_requests.assert_called_once_with( "POST", "/nitag/v2/update-current-values", @@ -2375,7 +2404,10 @@ def test__create_writer_with_buffer_size__sends_when_buffer_full(self): def test__create_writer_with_buffer_time__sends_when_timer_elapsed(self): path = "tag" value = 1 - writer = self._uut.create_writer(max_buffer_time=timedelta(milliseconds=50)) + buffer_ms = 50 + writer = self._uut.create_writer( + max_buffer_time=timedelta(milliseconds=buffer_ms) + ) timestamp = datetime.now() self._client.all_requests.configure_mock( side_effect=self._get_mock_request([None]) @@ -2383,12 +2415,16 @@ def test__create_writer_with_buffer_time__sends_when_timer_elapsed(self): writer.write(path, tbase.DataType.INT32, value, timestamp=timestamp) self._client.all_requests.assert_not_called() - for i in range(100): - if self._client.all_requests.call_count > 0: - break - time.sleep(0.01) - utctime = datetime.utcfromtimestamp(timestamp.timestamp()).isoformat() + "Z" + time.sleep(buffer_ms / 1000 * 2.5) + if self._client.all_requests.call_count == 0: + _wait_for_call_count(self._client.all_requests, 1, timeout=5.0) + + utctime = ( + datetime.fromtimestamp(timestamp.timestamp(), timezone.utc) + .isoformat() + .replace("+00:00", "Z") + ) self._client.all_requests.assert_called_once_with( "POST", "/nitag/v2/update-current-values", @@ -2414,8 +2450,9 @@ def test__create_writer_with_buffer_size_and_timer__obeys_both_settings(self): writer1 = self._uut.create_writer( buffer_size=2, max_buffer_time=timedelta(minutes=1) ) + buffer_ms = 50 writer2 = self._uut.create_writer( - buffer_size=2, max_buffer_time=timedelta(milliseconds=50) + buffer_size=2, max_buffer_time=timedelta(milliseconds=buffer_ms) ) timestamp = datetime.now() self._client.all_requests.configure_mock( @@ -2424,8 +2461,11 @@ def test__create_writer_with_buffer_size_and_timer__obeys_both_settings(self): writer1.write(path, tbase.DataType.INT32, value1, timestamp=timestamp) writer1.write(path, tbase.DataType.INT32, value2, timestamp=timestamp) - - utctime = datetime.utcfromtimestamp(timestamp.timestamp()).isoformat() + "Z" + utctime = ( + datetime.fromtimestamp(timestamp.timestamp(), timezone.utc) + .isoformat() + .replace("+00:00", "Z") + ) self._client.all_requests.assert_called_once_with( "POST", "/nitag/v2/update-current-values", @@ -2449,10 +2489,10 @@ def 
test__create_writer_with_buffer_size_and_timer__obeys_both_settings(self): writer2.write(path, tbase.DataType.INT32, value3, timestamp=timestamp) assert 1 == self._client.all_requests.call_count # same as before - for i in range(100): - if self._client.all_requests.call_count > 1: - break - time.sleep(0.01) + + time.sleep(buffer_ms / 1000 * 2.5) + if self._client.all_requests.call_count == 1: + _wait_for_call_count(self._client.all_requests, 2, timeout=5.0) assert 2 == self._client.all_requests.call_count assert self._client.all_requests.call_args_list[1] == mock.call( @@ -2486,7 +2526,7 @@ def test__read_with_timestamp_and_aggregates__retrieves_all_data_from_server(sel path = "test" value = "success" now = datetime.now(timezone.utc) - utctime = datetime.utcfromtimestamp(now.timestamp()).isoformat() + "Z" + utctime = now.isoformat().replace("+00:00", "Z") self._client.all_requests.configure_mock( side_effect=self._get_mock_request( [ @@ -2554,7 +2594,7 @@ def test__read_with_timestamp__does_not_query_aggregates(self): path = "test" value = "success" now = datetime.now(timezone.utc) - utctime = datetime.utcfromtimestamp(now.timestamp()).isoformat() + "Z" + utctime = now.isoformat().replace("+00:00", "Z") self._client.all_requests.configure_mock( side_effect=self._get_mock_request( [ @@ -2669,7 +2709,7 @@ async def test__read_async_with_timestamp_and_aggregates__retrieves_all_data_fro path = "test" value = "success" now = datetime.now(timezone.utc) - utctime = datetime.utcfromtimestamp(now.timestamp()).isoformat() + "Z" + utctime = now.isoformat().replace("+00:00", "Z") self._client.all_requests.configure_mock( side_effect=self._get_mock_request( [ @@ -2743,7 +2783,7 @@ async def test__read_async_with_timestamp__does_not_query_aggregates(self): path = "test" value = "success" now = datetime.now(timezone.utc) - utctime = datetime.utcfromtimestamp(now.timestamp()).isoformat() + "Z" + utctime = now.isoformat().replace("+00:00", "Z") self._client.all_requests.configure_mock( side_effect=self._get_mock_request( [
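As a closing note on the timestamp changes in these tag tests: they replace the deprecated ``datetime.utcfromtimestamp`` pattern with timezone-aware formatting, as the earlier ``_timestamp_utilities`` hunk does. The standalone sketch below (not part of the diff) illustrates that the new formulation produces the same trailing-``Z`` ISO 8601 strings; the concrete datetime values are made up for illustration, and the naive case depends on the local time zone just as the old code did.

```python
# Quick check of the timezone-aware formatting used in the updated tests.
from datetime import datetime, timezone

# Aware datetimes: isoformat() ends in "+00:00", which the tests rewrite to "Z".
aware = datetime(2024, 5, 1, 12, 30, 45, 123456, tzinfo=timezone.utc)
assert aware.isoformat().replace("+00:00", "Z") == "2024-05-01T12:30:45.123456Z"

# Naive datetimes: converting through the epoch with an explicit UTC target
# mirrors the old utcfromtimestamp(value.timestamp()) behavior without the
# deprecated call. The printed value depends on the local time zone.
naive = datetime(2024, 5, 1, 12, 30, 45, 123456)
converted = datetime.fromtimestamp(naive.timestamp(), timezone.utc)
print(converted.isoformat().replace("+00:00", "Z"))
```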