diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 6fe6783..91cafc0 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -1,6 +1,8 @@ """Collect and write time series data to InfluxDB Cloud or InfluxDB OSS.""" # coding: utf-8 +# TODO Remove after this program no longer supports Python 3.8.* +from __future__ import annotations import logging import os import warnings @@ -420,14 +422,15 @@ def _create_batching_pipeline(self) -> tuple[Subject[Any], rx.abc.DisposableBase # Create batch (concatenation line protocols by \n) ops.map(lambda group: group.pipe( # type: ignore ops.to_iterable(), - ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))), # type: ignore + ops.map(lambda xs: _BatchItem(key=group.key, data=_body_reduce(xs), size=len(xs))))), + # type: ignore ops.merge_all())), # Write data into InfluxDB (possibility to retry if its fail) ops.filter(lambda batch: batch.size > 0), ops.map(mapper=lambda batch: self._to_response(data=batch, delay=self._jitter_delay())), ops.merge_all()) \ .subscribe(self._on_next, self._on_error, self._on_complete) - + return subject, disposable def flush(self): @@ -453,7 +456,7 @@ def close(self): """Flush data and dispose a batching buffer.""" if self._subject is None: return # Already closed - + self._subject.on_completed() self._subject.dispose() self._subject = None