Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
- `write_dataframe()`: New method for writing pandas and polars DataFrames with explicit parameters (`measurement`, `timestamp_column`, `tags`, `timestamp_timezone`).
- `query_dataframe()`: New method for querying data directly to a pandas or polars DataFrame via the `frame_type` parameter.
- Updated README with clear examples for DataFrame operations.
1. [#179](https://github.com/InfluxCommunity/influxdb3-python/pull/179): Add option to disable gRPC response
compression for Flight queries:
- `disable_grpc_compression` parameter in `InfluxDBClient3` constructor
- `INFLUX_DISABLE_GRPC_COMPRESSION` environment variable support in `from_env()`

### Bug Fixes

Expand Down
29 changes: 26 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,32 @@ print(table.to_pandas().to_markdown())
```

### gRPC compression
The Python client supports gRPC response compression.
If the server chooses to compress query responses (e.g., with gzip), the client
will automatically decompress them — no extra configuration is required.

#### Request compression

Request compression is not supported by InfluxDB 3 — the client sends uncompressed requests.

#### Response compression

Response compression is enabled by default. The client sends the `grpc-accept-encoding: identity, deflate, gzip`
header, and the server returns gzip-compressed responses (if supported). The client automatically
decompresses them — no configuration required.

To **disable response compression**:

```python
# Via constructor parameter
client = InfluxDBClient3(
host="your-host",
token="your-token",
database="your-database",
disable_grpc_compression=True
)

# Or via environment variable
# INFLUX_DISABLE_GRPC_COMPRESSION=true
client = InfluxDBClient3.from_env()
```

## Windows Users
Currently, Windows users require an extra installation when querying via Flight natively. This is due to the fact gRPC cannot locate Windows root certificates. To work around this please follow these steps:
Expand Down
13 changes: 13 additions & 0 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
INFLUX_WRITE_NO_SYNC = "INFLUX_WRITE_NO_SYNC"
INFLUX_WRITE_TIMEOUT = "INFLUX_WRITE_TIMEOUT"
INFLUX_QUERY_TIMEOUT = "INFLUX_QUERY_TIMEOUT"
INFLUX_DISABLE_GRPC_COMPRESSION = "INFLUX_DISABLE_GRPC_COMPRESSION"


def write_client_options(**kwargs):
Expand Down Expand Up @@ -190,6 +191,7 @@ def __init__(
flight_client_options=None,
write_port_overwrite=None,
query_port_overwrite=None,
disable_grpc_compression=False,
**kwargs):
"""
Initialize an InfluxDB client.
Expand All @@ -206,6 +208,8 @@ def __init__(
:type write_client_options: dict[str, any]
:param flight_client_options: dictionary for providing additional arguments for the FlightClient.
:type flight_client_options: dict[str, any]
:param disable_grpc_compression: Disable gRPC compression for Flight query responses. Default is False.
:type disable_grpc_compression: bool
:key auth_scheme: token authentication scheme. Set to "Bearer" for Edge.
:key bool verify_ssl: Set this to false to skip verifying SSL certificate when calling API from https server.
:key str ssl_ca_cert: Set this to customize the certificate file to verify the peer.
Expand Down Expand Up @@ -291,6 +295,8 @@ def __init__(
connection_string = f"grpc+tcp://{hostname}:{port}"

q_opts_builder = QueryApiOptionsBuilder()
if disable_grpc_compression:
q_opts_builder.disable_grpc_compression(True)
if kw_keys.__contains__('ssl_ca_cert'):
q_opts_builder.root_certs(kwargs.get('ssl_ca_cert', None))
if kw_keys.__contains__('verify_ssl'):
Expand Down Expand Up @@ -361,13 +367,20 @@ def from_env(cls, **kwargs: Any) -> 'InfluxDBClient3':
if os.getenv(INFLUX_AUTH_SCHEME) is not None:
kwargs['auth_scheme'] = os.getenv(INFLUX_AUTH_SCHEME)

disable_grpc_compression = os.getenv(INFLUX_DISABLE_GRPC_COMPRESSION)
if disable_grpc_compression is not None:
disable_grpc_compression = disable_grpc_compression.strip().lower() in ['true', '1', 't', 'y', 'yes']
else:
disable_grpc_compression = False

org = os.getenv(INFLUX_ORG, "default")
return InfluxDBClient3(
host=required_vars[INFLUX_HOST],
token=required_vars[INFLUX_TOKEN],
database=required_vars[INFLUX_DATABASE],
write_client_options=write_client_option,
org=org,
disable_grpc_compression=disable_grpc_compression,
**kwargs
)

Expand Down
21 changes: 20 additions & 1 deletion influxdb_client_3/query/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@ class QueryApiOptions(object):
proxy (str): URL to a proxy server
flight_client_options (dict): base set of flight client options passed to internal pyarrow.flight.FlightClient
timeout(float): timeout in seconds to wait for a response
disable_grpc_compression (bool): disable gRPC compression for query responses
"""
_DEFAULT_TIMEOUT = 300.0
tls_root_certs: bytes = None
tls_verify: bool = None
proxy: str = None
flight_client_options: dict = None
timeout: float = None
disable_grpc_compression: bool = False

def __init__(self, root_certs_path: str,
verify: bool,
proxy: str,
flight_client_options: dict,
timeout: float = _DEFAULT_TIMEOUT):
timeout: float = _DEFAULT_TIMEOUT,
disable_grpc_compression: bool = False):
"""
Initialize a set of QueryApiOptions

Expand All @@ -41,13 +44,15 @@ def __init__(self, root_certs_path: str,
:param flight_client_options: set of flight_client_options
to be passed to internal pyarrow.flight.FlightClient.
:param timeout: timeout in seconds to wait for a response.
:param disable_grpc_compression: disable gRPC compression for query responses.
"""
if root_certs_path:
self.tls_root_certs = self._read_certs(root_certs_path)
self.tls_verify = verify
self.proxy = proxy
self.flight_client_options = flight_client_options
self.timeout = timeout
self.disable_grpc_compression = disable_grpc_compression

def _read_certs(self, path: str) -> bytes:
with open(path, "rb") as certs_file:
Expand Down Expand Up @@ -75,6 +80,7 @@ class QueryApiOptionsBuilder(object):
_proxy: str = None
_flight_client_options: dict = None
_timeout: float = None
_disable_grpc_compression: bool = False

def root_certs(self, path: str):
self._root_certs_path = path
Expand All @@ -96,6 +102,11 @@ def timeout(self, timeout: float):
self._timeout = timeout
return self

def disable_grpc_compression(self, disable: bool):
"""Disable gRPC compression for query responses."""
self._disable_grpc_compression = disable
return self

def build(self) -> QueryApiOptions:
"""Build a QueryApiOptions object with previously set values"""
return QueryApiOptions(
Expand All @@ -104,6 +115,7 @@ def build(self) -> QueryApiOptions:
proxy=self._proxy,
flight_client_options=self._flight_client_options,
timeout=self._timeout,
disable_grpc_compression=self._disable_grpc_compression,
)


Expand Down Expand Up @@ -162,6 +174,13 @@ def __init__(self,
self._flight_client_options["disable_server_verification"] = not options.tls_verify
if options.timeout is not None:
self._default_timeout = options.timeout
if options.disable_grpc_compression:
# Disable gRPC response compression by only enabling identity algorithm
# Bitset: bit 0 = identity, bit 1 = deflate, bit 2 = gzip
# Setting to 1 (0b001) enables only identity (no compression)
self._flight_client_options["generic_options"].append(
("grpc.compression_enabled_algorithms_bitset", 1)
)
if self._proxy:
self._flight_client_options["generic_options"].append(("grpc.http_proxy", self._proxy))
self._flight_client = FlightClient(connection_string, **self._flight_client_options)
Expand Down
8 changes: 7 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ def get_version():
'pandas': ['pandas'],
'polars': ['polars'],
'dataframe': ['pandas', 'polars'],
'test': ['pytest', 'pytest-cov', 'pytest-httpserver']
'test': [
'pytest',
'pytest-cov',
'pytest-httpserver',
'h2>=4.0.0,<5.0.0',
'cryptography>=3.4.0',
]
},
install_requires=requires,
python_requires='>=3.8',
Expand Down
68 changes: 68 additions & 0 deletions tests/test_influxdb_client_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,74 @@ def test_parse_invalid_write_timeout_range(self):
with self.assertRaisesRegex(ValueError, ".*Must be non-negative.*"):
InfluxDBClient3.from_env()

def assertGrpcCompressionDisabled(self, client, disabled):
"""Assert whether gRPC compression is disabled for the client."""
self.assertIsInstance(client, InfluxDBClient3)
generic_options = dict(client._query_api._flight_client_options['generic_options'])
if disabled:
self.assertEqual(generic_options.get('grpc.compression_enabled_algorithms_bitset'), 1)
else:
self.assertIsNone(generic_options.get('grpc.compression_enabled_algorithms_bitset'))

@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'true'})
def test_from_env_disable_grpc_compression_true(self):
client = InfluxDBClient3.from_env()
self.assertGrpcCompressionDisabled(client, True)

@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'TrUe'})
def test_from_env_disable_grpc_compression_true_mixed_case(self):
client = InfluxDBClient3.from_env()
self.assertGrpcCompressionDisabled(client, True)

@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': '1'})
def test_from_env_disable_grpc_compression_one(self):
client = InfluxDBClient3.from_env()
self.assertGrpcCompressionDisabled(client, True)

@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'false'})
def test_from_env_disable_grpc_compression_false(self):
client = InfluxDBClient3.from_env()
self.assertGrpcCompressionDisabled(client, False)

@patch.dict('os.environ', {'INFLUX_HOST': 'localhost', 'INFLUX_TOKEN': 'test_token',
'INFLUX_DATABASE': 'test_db', 'INFLUX_DISABLE_GRPC_COMPRESSION': 'anything-else'})
def test_from_env_disable_grpc_compression_anything_else_is_false(self):
client = InfluxDBClient3.from_env()
self.assertGrpcCompressionDisabled(client, False)

def test_disable_grpc_compression_parameter_true(self):
client = InfluxDBClient3(
host="localhost",
org="my_org",
database="my_db",
token="my_token",
disable_grpc_compression=True
)
self.assertGrpcCompressionDisabled(client, True)

def test_disable_grpc_compression_parameter_false(self):
client = InfluxDBClient3(
host="localhost",
org="my_org",
database="my_db",
token="my_token",
disable_grpc_compression=False
)
self.assertGrpcCompressionDisabled(client, False)

def test_disable_grpc_compression_default_is_false(self):
client = InfluxDBClient3(
host="localhost",
org="my_org",
database="my_db",
token="my_token",
)
self.assertGrpcCompressionDisabled(client, False)

def test_query_with_arrow_error(self):
f = ErrorFlightServer()
with InfluxDBClient3(f"http://localhost:{f.port}", "my_org", "my_db", "my_token") as c:
Expand Down
Loading
Loading