Merged
28 commits
6e633a2
WIP
spanglerco Aug 4, 2025
0495966
iterating
cameronwaterman Sep 8, 2025
7dcc639
iterating
cameronwaterman Sep 8, 2025
29b9299
Update _data_frame_client.py
cameronwaterman Sep 9, 2025
a9fcfc8
Error handling / tests
cameronwaterman Sep 9, 2025
8c89e58
Update test_append_table_data_polymorphic.py
cameronwaterman Sep 9, 2025
0ec3782
cleanup
cameronwaterman Sep 9, 2025
6feea37
Iterating on tests
cameronwaterman Sep 9, 2025
68a73e2
cleanup
cameronwaterman Sep 10, 2025
01d2efd
Tests talk with real DFS
cameronwaterman Sep 11, 2025
db95df7
Cleanup
cameronwaterman Sep 11, 2025
221dede
Cleanup
cameronwaterman Sep 11, 2025
5eb05c9
Fix formatting and tests for 3.10 (hopefully)
cameronwaterman Sep 11, 2025
579b9d3
Update _data_frame_client.py
cameronwaterman Sep 11, 2025
05605dd
Fix type checking for 3.10
cameronwaterman Sep 11, 2025
5f128b1
Address review feedback
cameronwaterman Sep 17, 2025
b25d53b
Address second round of review feedback
cameronwaterman Sep 22, 2025
e4d77d9
format/lint
cameronwaterman Sep 22, 2025
368ccf0
Attempt to fix 3.9 tag test issue
cameronwaterman Sep 23, 2025
bcab56d
format
cameronwaterman Sep 23, 2025
feb3d43
Revert to using timer
cameronwaterman Sep 24, 2025
435a192
0 -> 1
cameronwaterman Sep 24, 2025
e09a04e
Increase timeout
cameronwaterman Sep 24, 2025
4d8f056
test improvements
cameronwaterman Sep 24, 2025
c3230f3
iterating
cameronwaterman Sep 24, 2025
99f8af6
cleanup
cameronwaterman Sep 24, 2025
6067577
Cleanup example
cameronwaterman Sep 24, 2025
9782bde
Add test assertions for row_count and supports_append
cameronwaterman Sep 24, 2025
10 changes: 10 additions & 0 deletions README.rst
@@ -34,6 +34,16 @@ To install **nisystemlink-clients**, use one of the following methods:

$ python -m easy_install nisystemlink-clients

Optional Arrow (pyarrow) Support
--------------------------------
The base install does not pull in ``pyarrow``. Install the optional extra if you
plan to use Arrow RecordBatch ingestion with ``DataFrameClient.append_table_data``::

$ python -m pip install "nisystemlink-clients[pyarrow]"

Without the extra, Arrow-specific code paths will raise a ``RuntimeError`` when
attempting to append ``pyarrow.RecordBatch`` instances.

.. _usage_section:

Usage
24 changes: 24 additions & 0 deletions docs/api_reference/dataframe.rst
@@ -25,3 +25,27 @@ nisystemlink.clients.dataframe
.. automodule:: nisystemlink.clients.dataframe.models
:members:
:imported-members:

Arrow / JSON Ingestion Notes
----------------------------
``append_table_data`` accepts multiple data forms:

* ``AppendTableDataRequest`` (JSON)
* ``DataFrame`` model (JSON)
* Single ``pyarrow.RecordBatch`` (Arrow IPC)
* Iterable of ``pyarrow.RecordBatch`` (Arrow IPC)
* ``None`` with ``end_of_data`` (flush only)

Arrow support is optional and requires installing the ``pyarrow`` extra::

pip install "nisystemlink-clients[pyarrow]"

If ``pyarrow`` is not installed and a RecordBatch (or iterable) is passed, a
``RuntimeError`` is raised. When Arrow is used, the batches are serialized into
an IPC stream and sent with content type
``application/vnd.apache.arrow.stream``; the ``end_of_data`` flag is sent as a
query parameter. JSON ingestion places ``endOfData`` in the request body.

If the target SystemLink DataFrame Service does not yet support Arrow and
responds with HTTP 400, the client raises an explanatory ``ApiException``
advising to upgrade or fall back to JSON ingestion.
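
For reference, the following is a minimal sketch of the Arrow IPC serialization step the
client performs internally before POSTing to ``tables/{id}/data``, assuming ``pyarrow`` is
installed; the column names and values are illustrative only::

    from io import BytesIO

    import pyarrow as pa

    batch = pa.record_batch(
        [pa.array([1, 2, 3]), pa.array([0.1, 0.2, 0.3])],
        names=["ix", "Float_Column"],
    )

    buf = BytesIO()
    # zstd-compressed IPC stream, matching what append_table_data produces.
    options = pa.ipc.IpcWriteOptions(compression="zstd")
    with pa.ipc.new_stream(buf, batch.schema, options=options) as writer:
        writer.write_batch(batch)

    payload = buf.getvalue()
    # The client sends `payload` with Content-Type application/vnd.apache.arrow.stream
    # and the endOfData flag as a query parameter.
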
50 changes: 44 additions & 6 deletions examples/dataframe/create_write_data.py
@@ -1,6 +1,10 @@
import random
from datetime import datetime

try:
import pyarrow as pa # type: ignore
except Exception:
pa = None
from nisystemlink.clients.dataframe import DataFrameClient
from nisystemlink.clients.dataframe.models import (
AppendTableDataRequest,
@@ -25,12 +29,46 @@
)
)

# Generate example data
frame = DataFrame(
data=[[i, random.random(), datetime.now().isoformat()] for i in range(100)]
# Append via explicit AppendTableDataRequest (JSON)
frame_request = DataFrame(
data=[[i, random.random(), datetime.now().isoformat()] for i in range(3)]
)
client.append_table_data(table_id, AppendTableDataRequest(frame=frame_request))

# Write example data to table
client.append_table_data(
table_id, data=AppendTableDataRequest(frame=frame, endOfData=True)
# Append via DataFrame model directly (JSON)
frame_direct = DataFrame(
data=[[i + 3, random.random(), datetime.now().isoformat()] for i in range(3)]
)
client.append_table_data(table_id, frame_direct)

if pa is not None:
# Append via single RecordBatch (Arrow)
batch_single = pa.record_batch(
[
pa.array([6, 7, 8]),
pa.array([0.1, 0.2, 0.3]),
pa.array([datetime.now().isoformat()] * 3),
],
names=["ix", "Float_Column", "Timestamp_Column"],
)
client.append_table_data(table_id, batch_single)

# Append via iterable of RecordBatches (Arrow)
batch_list = [
pa.record_batch(
[
pa.array([9, 10]),
pa.array([0.4, 0.5]),
pa.array([datetime.now().isoformat(), datetime.now().isoformat()]),
],
names=["ix", "Float_Column", "Timestamp_Column"],
)
]
client.append_table_data(table_id, batch_list)

# Mark end_of_data for the table
# Supply `None` and `end_of_data=True`
client.append_table_data(table_id, None, end_of_data=True)
else:
# If pyarrow not installed, flush via JSON path
client.append_table_data(table_id, None, end_of_data=True)
@@ -30,7 +30,13 @@ def datetime_to_str(cls, value: datetime.datetime) -> str:
Returns:
The string representation of the timestamp.
"""
return datetime.datetime.utcfromtimestamp(value.timestamp()).isoformat() + "Z"
# Use timezone-aware conversion to avoid deprecated utcfromtimestamp usage and
# preserve exact UTC semantics (value assumed either naive UTC or aware).
if value.tzinfo is None:
value = value.replace(tzinfo=datetime.timezone.utc)
else:
value = value.astimezone(datetime.timezone.utc)
return value.isoformat().replace("+00:00", "Z")

@classmethod
def str_to_datetime(cls, timestamp: str) -> datetime.datetime:
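
For context, the timezone-aware conversion above treats naive values as UTC and normalizes
aware values to UTC before formatting. A standalone sketch of the same logic (the helper
name ``to_utc_z`` is chosen only for illustration)::

    import datetime

    def to_utc_z(value: datetime.datetime) -> str:
        # Naive datetimes are assumed to already be UTC; aware ones are converted.
        if value.tzinfo is None:
            value = value.replace(tzinfo=datetime.timezone.utc)
        else:
            value = value.astimezone(datetime.timezone.utc)
        return value.isoformat().replace("+00:00", "Z")

    naive = datetime.datetime(2025, 9, 24, 12, 30, 0)
    aware = datetime.datetime(
        2025, 9, 24, 7, 30, 0, tzinfo=datetime.timezone(datetime.timedelta(hours=-5))
    )
    print(to_utc_z(naive))  # 2025-09-24T12:30:00Z
    print(to_utc_z(aware))  # 2025-09-24T12:30:00Z
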
8 changes: 7 additions & 1 deletion nisystemlink/clients/core/_uplink/_methods.py
@@ -5,6 +5,7 @@
from uplink import (
Body,
commands,
headers,
json,
response_handler as uplink_response_handler,
returns,
@@ -26,13 +27,18 @@ def post(
path: str,
args: Optional[Sequence[Any]] = None,
return_key: Optional[Union[str, Tuple[str, ...]]] = None,
content_type: Optional[str] = None,
) -> Callable[[F], F]:
"""Annotation for a POST request with a JSON request body. If args is not
specified, defaults to a single argument that represents the request body.
"""

def decorator(func: F) -> F:
result = json(commands.post(path, args=args or (Body,))(func))
result = commands.post(path, args=args or (Body,))(func)
if content_type:
result = headers({"Content-Type": content_type})(result)
else:
result = json(result) # type: ignore
if return_key:
result = returns.json(key=return_key)(result)
return result # type: ignore
178 changes: 171 additions & 7 deletions nisystemlink/clients/dataframe/_data_frame_client.py
@@ -1,7 +1,13 @@
"""Implementation of DataFrameClient."""

from typing import List, Optional

from collections.abc import Iterable
from io import BytesIO
from typing import List, Optional, Union

try:
import pyarrow as pa # type: ignore
except Exception:
pa = None
from nisystemlink.clients import core
from nisystemlink.clients.core._uplink._base_client import BaseClient
from nisystemlink.clients.core._uplink._methods import (
@@ -250,18 +256,176 @@ def get_table_data(
...

@post("tables/{id}/data", args=[Path, Body])
def append_table_data(self, id: str, data: models.AppendTableDataRequest) -> None:
def _append_table_data_json(
self, id: str, data: models.AppendTableDataRequest
) -> None:
"""Internal uplink-implemented JSON append call."""
...

@post(
"tables/{id}/data",
args=[Path, Body, Query("endOfData")],
content_type="application/vnd.apache.arrow.stream",
)
def _append_table_data_arrow(
self, id: str, data: Iterable[bytes], end_of_data: Optional[bool] = None
) -> None:
"""Internal uplink-implemented Arrow (binary) append call."""
...

def append_table_data(
self,
id: str,
data: Optional[
Union[
models.AppendTableDataRequest,
models.DataFrame,
"pa.RecordBatch", # type: ignore[name-defined]
Iterable["pa.RecordBatch"], # type: ignore[name-defined]
]
],
*,
end_of_data: Optional[bool] = None,
) -> None:
"""Appends one or more rows of data to the table identified by its ID.

Args:
id: Unique ID of a data table.
data: The rows of data to append and any additional options.
data: The data to append. Supported forms:
* AppendTableDataRequest: Sent as-is via JSON; ``end_of_data`` must be ``None``.
* DataFrame (service model): Wrapped into an AppendTableDataRequest (``end_of_data``
optional) and sent as JSON.
* Single pyarrow.RecordBatch: Treated the same as an iterable containing one batch
and streamed as Arrow IPC. ``end_of_data`` (if provided) is sent as a query
parameter.
* Iterable[pyarrow.RecordBatch]: Streamed as Arrow IPC. ``end_of_data`` (if provided)
is sent as a query parameter. If the iterator yields no batches, it is treated
as ``None`` and requires ``end_of_data``.
* None: ``end_of_data`` must be provided; sends JSON containing only the
``endOfData`` flag.
end_of_data: Whether additional rows may be appended in future requests. Required when
``data`` is ``None`` or the RecordBatch iterator is empty; must be omitted when
passing an ``AppendTableDataRequest`` (include it inside that model instead).

Raises:
ApiException: if unable to communicate with the DataFrame Service
or provided an invalid argument.
ValueError: If parameter constraints are violated.
ApiException: If unable to communicate with the DataFrame Service or an
invalid argument is provided.
"""
...
if isinstance(data, models.AppendTableDataRequest):
if end_of_data is not None:
raise ValueError(
"end_of_data must not be provided separately when passing an AppendTableDataRequest."
)
self._append_table_data_json(id, data)
return

if isinstance(data, models.DataFrame):
if end_of_data is None:
request_model = models.AppendTableDataRequest(frame=data)
else:
request_model = models.AppendTableDataRequest(
frame=data, end_of_data=end_of_data
)
self._append_table_data_json(id, request_model)
return

if pa is not None and isinstance(data, pa.RecordBatch):
data = [data]

if isinstance(data, Iterable):
iterator = iter(data)
try:
first_batch = next(iterator)
except StopIteration:
if end_of_data is None:
raise ValueError(
"end_of_data must be provided when data iterator is empty."
)
self._append_table_data_json(
id,
models.AppendTableDataRequest(end_of_data=end_of_data),
)
return

if pa is None:
raise RuntimeError(
"pyarrow is not installed. Install to stream RecordBatches."
)

if not isinstance(first_batch, pa.RecordBatch):
raise ValueError(
"Iterable provided to data must yield pyarrow.RecordBatch objects."
)

def _generate_body() -> Iterable[memoryview]:
batch = first_batch
with BytesIO() as buf:
options = pa.ipc.IpcWriteOptions(compression="zstd")
writer = pa.ipc.new_stream(buf, batch.schema, options=options)

while True:
writer.write_batch(batch)
with buf.getbuffer() as view, view[0 : buf.tell()] as slice:
yield slice
buf.seek(0)
try:
batch = next(iterator)
except StopIteration:
break

writer.close()
with buf.getbuffer() as view, view[0 : buf.tell()] as slice:
yield slice

try:
self._append_table_data_arrow(
id,
_generate_body(),
end_of_data,
)
except core.ApiException as ex:
if ex.http_status_code == 400:
wrap = True
try:
write_op = getattr(
self.api_info().operations, "write_data", None
)
if (
write_op is not None
and getattr(write_op, "version", 0) >= 2
):
wrap = False
except Exception:
pass
if wrap:
raise core.ApiException(
(
"Arrow ingestion request was rejected. The target "
"DataFrame Service doesn't support Arrow streaming. "
"Install a DataFrame Service version with Arrow support "
"or fall back to JSON ingestion."
),
error=ex.error,
http_status_code=ex.http_status_code,
inner=ex,
) from ex
raise
return

if data is None:
if end_of_data is None:
raise ValueError(
"end_of_data must be provided when data is None (no rows to append)."
)
self._append_table_data_json(
id, models.AppendTableDataRequest(end_of_data=end_of_data)
)
return

raise ValueError(
"Unsupported type for data. Expected AppendTableDataRequest, DataFrame, Iterable[RecordBatch], or None."
)

@post("tables/{id}/query-data", args=[Path, Body])
def query_table_data(
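
As a usage note, the documentation above suggests falling back to JSON ingestion when the
service rejects Arrow streaming. A caller-side sketch of that pattern (not part of this
change; ``table_id`` and the column names are placeholders)::

    import pyarrow as pa

    from nisystemlink.clients.core import ApiException
    from nisystemlink.clients.dataframe import DataFrameClient
    from nisystemlink.clients.dataframe.models import AppendTableDataRequest, DataFrame

    client = DataFrameClient()
    table_id = "<your table id>"
    batch = pa.record_batch(
        [pa.array([1, 2]), pa.array([0.5, 0.7])], names=["ix", "Float_Column"]
    )

    try:
        client.append_table_data(table_id, batch, end_of_data=True)
    except ApiException as ex:
        if ex.http_status_code != 400:
            raise
        # Rebuild the same rows and retry via the JSON path.
        rows = [[row[name] for name in batch.schema.names] for row in batch.to_pylist()]
        client.append_table_data(
            table_id, AppendTableDataRequest(frame=DataFrame(data=rows), end_of_data=True)
        )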