Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 0.12.0 [unreleased]

### Bug Fixes

1. [#121](https://github.com/InfluxCommunity/influxdb3-python/pull/121): Fix use of arguments `verify_ssl` and `ssl_ca_cert` in `QueryApi`.

## 0.11.0 [2025-02-27]

### Bug Fixes
Expand Down
104 changes: 104 additions & 0 deletions Examples/basic_ssl_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
import time

import pyarrow

from config import Config
from influxdb_client_3 import InfluxDBClient3

bad_cert = """-----BEGIN CERTIFICATE-----
MIIFDTCCAvWgAwIBAgIUYzpfisy9xLrhiZd+D9vOdzC3+iswDQYJKoZIhvcNAQEL
BQAwFjEUMBIGA1UEAwwLdGVzdGhvc3QuaW8wHhcNMjUwMjI4MTM1NTMyWhcNMzUw
MjI2MTM1NTMyWjAWMRQwEgYDVQQDDAt0ZXN0aG9zdC5pbzCCAiIwDQYJKoZIhvcN
AQEBBQADggIPADCCAgoCggIBAN1lwqXYP8UMvjb56SpUEj2OpoEDRfLeWrEiHkOl
xoymvJGaXZNEpDXo2TTdysCoYWEjz9IY6GlqSo2Yssf5BZkQwMOw7MdyRwCigzrh
OAKbyCfsvEgfNFrXEdSDpaxW++5SToeErudYXc+sBfnI1NB4W3GBGqqIvx8fqaB3
1EU9ql2sKKxI0oYIQD/If9rQEyLFKeWdD8iT6YST1Vugkvd34NPmaqV5+pjdSb4z
a8olavwUoslqFUeILqIq+WZZbOlgCcJYKcBAmELRnsxGaABRtMwMZx+0D+oKo4Kl
QQtOcER+RHkBHyYFghZIBnzudfbP9NadknOz3AilJbJolXfXJqeQhRD8Ob49kkhe
OwjAppHnaZGWjYZMLIfnwwXBwkS7bSwF16Wot83cpL46Xvg6xcl12An4JaoF798Q
cXyYrWCgvbqjVR7694gxqLGzk138AKTDSbER1h1rfqCqkk7soE0oWCs7jiCk2XvD
49qVfHtd50KYJ4/yP1XL0PmLL0Hw1kvOxLVkFENc1zkoYXJRt2Ec6j9dajmGlsFn
0bLLap6UIlIGQFuvcLf4bvsIi9FICy2jBjaIdM4UAWbReG+52+180HEleAwi5bAN
HY61WVXc4X+N0E2y8HWc1QaRioU7R4XZ5HXKs7OTWkKFZUU2JDFHAKdiiAU78qLU
7GApAgMBAAGjUzBRMB0GA1UdDgQWBBT2vPFo0mzh9ls4xJUiAgSK+B5LpTAfBgNV
HSMEGDAWgBT2vPFo0mzh9ls4xJUiAgSK+B5LpTAPBgNVHRMBAf8EBTADAQH/MA0G
CSqGSIb3DQEBCwUAA4ICAQC4TJNPx476qhiMi8anISv9lo9cnLju+qNhcz7wupBH
3Go6bVQ7TCbSt2QpAyY64mdnRqHsXeGvZXCnabOpeKRDeAPBtRjc6yNKuXybqFtn
W3PZEs/OYc659TUA+MoBzSXYStN9yiiYXyVFqVn+Rw6kM9tKh0GgAU7f5P+8IGuR
gXJbCjkbdJO7JUiVGEEmkjUHyqFxMHaZ8V6uazs52qIFyt7OYQTeV9HdoW8D9vAt
GfzYwzRDzbsZeIJqqDzLe7NOyxEyqZHCbtNpGcOyaLOl7ZBS52WsqaUZtL+9PjqD
2TWj4WUFkOWQpTvWKHqM6//Buv4GjnTBShQKm+h+rxcGkdRMF6/sKwxPbr39P3RJ
TMfJA3u5UuowT44VaA2jkQzqIbxH9+3EA+0qPbqPJchOSr0pHSncqvR9FYcr7ayN
b6UDFnjeliyEqqksUO0arbvaO9FfB0kH8lU1NOKaQNO++Xj69GZMC6s721cNdad0
qqcdtyXWeOBBchguYDrSUIgLnUTHEwwzOmcNQ36hO5eX282BJy3ZLT3JU6MJopjz
vkbDDAxSrpZMcaoAWSrxgJAETeYiO4YbfORIzPkwdUkEIr6XY02Pi7MdkDGQ5hiB
TavA8+oXRa4b9BR3bCWcg8S/t4uOTTLkeTcQbONPh5A5IRySLCU+CwqB+/+VlO8X
Aw==
-----END CERTIFICATE-----"""


def write_cert(cert, file_name):
f = open(file_name, "w")
f.write(cert)
f.close()


def remove_cert(file_name):
os.remove(file_name)


def print_results(results: list):
print("%-6s%-6s%-6s%-24s" % ("id", "speed", "ticks", "time"))
for result in results:
print("%-6s%-6.2f%-6i%-24s" % (result['id'], result['speed'], result['ticks'], result['time']))


def main() -> None:
print("Main")
temp_cert_file = "temp_cert.pem"
conf = Config()

write_and_query_with_explicit_sys_cert(conf)

write_cert(bad_cert, temp_cert_file)
query_with_verify_ssl_off(conf, temp_cert_file)
remove_cert(temp_cert_file)


def write_and_query_with_explicit_sys_cert(conf):
print("\nwrite and query with typical linux system cert\n")
with InfluxDBClient3(token=conf.token,
host=conf.host,
org=conf.org,
database=conf.database,
ssl_ca_cert="/etc/ssl/certs/ca-certificates.crt",
verify_ssl=True) as _client:
now = time.time_ns()
lp = f"escooter,id=zx80 speed=3.14,ticks=42i {now - (10 * 1_000_000_000)}"
_client.write(lp)

query = "SELECT * FROM \"escooter\" ORDER BY time DESC"
reader: pyarrow.Table = _client.query(query, mode="")
print_results(reader.to_pylist())


def query_with_verify_ssl_off(conf, cert):
print("\nquerying with verify_ssl off\n")

# Note that the passed root cert above is bad
# Switch verify_ssl to True to throw SSL_ERROR_SSL
with InfluxDBClient3(token=conf.token,
host=conf.host,
org=conf.org,
database=conf.database,
ssl_ca_cert=cert,
verify_ssl=False) as _client:

query = "SELECT * FROM \"escooter\" ORDER BY time DESC"
reader: pyarrow.Table = _client.query(query, mode="")
print_results(reader.to_pylist())


if __name__ == "__main__":
main()
13 changes: 11 additions & 2 deletions influxdb_client_3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pyarrow as pa
import importlib.util

from influxdb_client_3.query.query_api import QueryApi as _QueryApi
from influxdb_client_3.query.query_api import QueryApi as _QueryApi, QueryApiOptionsBuilder
from influxdb_client_3.read_file import UploadFile
from influxdb_client_3.write_client import InfluxDBClient as _InfluxDBClient, WriteOptions, Point
from influxdb_client_3.write_client.client.exceptions import InfluxDBError
Expand Down Expand Up @@ -165,9 +165,18 @@ def __init__(
connection_string = f"grpc+tls://{hostname}:{port}"
else:
connection_string = f"grpc+tcp://{hostname}:{port}"

q_opts_builder = QueryApiOptionsBuilder()
kw_keys = kwargs.keys()
if kw_keys.__contains__('ssl_ca_cert'):
q_opts_builder.root_certs(kwargs.get('ssl_ca_cert', None))
if kw_keys.__contains__('verify_ssl'):
q_opts_builder.tls_verify(kwargs.get('verify_ssl', True))
if kw_keys.__contains__('proxy'):
q_opts_builder.proxy(kwargs.get('proxy', None))
self._query_api = _QueryApi(connection_string=connection_string, token=token,
flight_client_options=flight_client_options,
proxy=kwargs.get("proxy", None))
proxy=kwargs.get("proxy", None), options=q_opts_builder.build())

def write(self, record=None, database=None, **kwargs):
"""
Expand Down
96 changes: 95 additions & 1 deletion influxdb_client_3/query/query_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,96 @@
from influxdb_client_3.version import USER_AGENT


class QueryApiOptions(object):
"""
Structure for encapsulating options for the QueryApi

Attributes
----------
tls_root_certs (bytes): contents of an SSL root certificate or chain read as bytes
tls_verify (bool): whether to verify SSL certificates or not
proxy (str): URL to a proxy server
flight_client_options (dict): base set of flight client options passed to internal pyarrow.flight.FlightClient
"""
tls_root_certs: bytes = None
tls_verify: bool = None
proxy: str = None
flight_client_options: dict = None

def __init__(self, root_certs_path, verify, proxy, flight_client_options):
"""
Initialize a set of QueryApiOptions

:param root_certs_path: path to a certificate .pem file.
:param verify: whether to verify SSL certificates or not.
:param proxy: URL of a proxy server, if required.
:param flight_client_options: set of flight_client_options
to be passed to internal pyarrow.flight.FlightClient.
"""
if root_certs_path:
self.tls_root_certs = self._read_certs(root_certs_path)
self.tls_verify = verify
self.proxy = proxy
self.flight_client_options = flight_client_options

def _read_certs(self, path):
with open(path, "rb") as certs_file:
return certs_file.read()


class QueryApiOptionsBuilder(object):
"""
Helper class to make adding QueryApiOptions more dynamic.

Example:

.. code-block:: python

options = QueryApiOptionsBuilder()\
.proxy("http://internal.tunnel.proxy:8080") \
.root_certs("/home/fred/.etc/ssl/alt_certs.pem") \
.tls_verify(True) \
.build()

client = QueryApi(connection, token, None, None, options)
"""
_root_certs_path = None
_tls_verify = True
_proxy = None
_flight_client_options = None

def root_certs(self, path):
self._root_certs_path = path
return self

def tls_verify(self, verify):
self._tls_verify = verify
return self

def proxy(self, proxy):
self._proxy = proxy
return self

def flight_client_options(self, flight_client_options):
self._flight_client_options = flight_client_options
return self

def build(self):
"""Build a QueryApiOptions object with previously set values"""
return QueryApiOptions(
root_certs_path=self._root_certs_path,
verify=self._tls_verify,
proxy=self._proxy,
flight_client_options=self._flight_client_options
)


class QueryApi(object):
"""
Implementation for '/api/v2/query' endpoint.

Example:

.. code-block:: python

from influxdb_client import InfluxDBClient
Expand All @@ -26,7 +111,7 @@ def __init__(self,
connection_string,
token,
flight_client_options,
proxy=None) -> None:
proxy=None, options=None) -> None:
"""
Initialize defaults.

Expand All @@ -37,6 +122,15 @@ def __init__(self,
self._token = token
self._flight_client_options = flight_client_options or {}
self._proxy = proxy
if options:
if options.flight_client_options:
self._flight_client_options = options.flight_client_options
if options.tls_root_certs:
self._flight_client_options["tls_root_certs"] = options.tls_root_certs
if options.proxy:
self._proxy = options.proxy
if options.tls_verify is not None:
self._flight_client_options["disable_server_verification"] = not options.tls_verify
self._flight_client_options["generic_options"] = [
("grpc.secondary_user_agent", USER_AGENT)
]
Expand Down
73 changes: 69 additions & 4 deletions tests/test_influxdb_client_3_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import pyarrow
import pytest
from pyarrow._flight import FlightError

from influxdb_client_3 import InfluxDBClient3, InfluxDBError, write_client_options, WriteOptions

Expand Down Expand Up @@ -122,7 +123,6 @@ def error(conf, data, exception: InfluxDBError):
database=self.database,
token=self.token,
write_client_options=wc_opts) as w_client:

for i in range(0, data_set_size):
w_client.write(f'{measurement},location=harfa val={i}i {now - (i * 1_000_000_000)}')

Expand All @@ -134,7 +134,6 @@ def error(conf, data, exception: InfluxDBError):
database=self.database,
token=self.token,
write_client_options=wc_opts) as r_client:

query = f"SELECT * FROM \"{measurement}\" WHERE time >= now() - interval '3 minute'"
reader: pyarrow.Table = r_client.query(query)
list_results = reader.to_pylist()
Expand Down Expand Up @@ -165,7 +164,6 @@ def test_batch_write_closed(self):
token=self.token,
write_client_options=wc_opts,
debug=True) as w_client:

for i in range(0, data_size):
w_client.write(f'{measurement},location=harfa val={i}i {now - (i * 1_000_000_000)}')

Expand All @@ -177,10 +175,77 @@ def test_batch_write_closed(self):
database=self.database,
token=self.token,
write_client_options=wc_opts) as r_client:

logging.info("PREPARING QUERY")

query = f"SELECT * FROM \"{measurement}\" WHERE time >= now() - interval '3 hours'"
reader: pyarrow.Table = r_client.query(query, mode="")
list_results = reader.to_pylist()
self.assertEqual(data_size, len(list_results))

test_cert = """-----BEGIN CERTIFICATE-----
MIIDUzCCAjugAwIBAgIUZB55ULutbc9gy6xLp1BkTQU7siowDQYJKoZIhvcNAQEL
BQAwNjE0MDIGA1UEAwwraW5mbHV4ZGIzLWNsdXN0ZXJlZC1zd2FuLmJyYW1ib3Jh
LnpvbmEtYi5ldTAeFw0yNTAyMTgxNTIyMTJaFw0yNjAyMTgxNTIyMTJaMDYxNDAy
BgNVBAMMK2luZmx1eGRiMy1jbHVzdGVyZWQtc3dhbi5icmFtYm9yYS56b25hLWIu
ZXUwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCugeNrx0ZfyyP8H4e0
zDSkKWnEXlVdjMi+ZSHhMbjvvqMkUQGLc/W59AEmMJ0Uiljka9d+F7jdu+oqDq9p
4kGPhO3Oh7zIG0IGbncj8AwIXMGDNkNyL8s7C1+LoYotlSWDpWwkEKXUeAzdqS63
CSJFqSJM2dss8qe9BpM6zHWJAKS1I30QT3SXQFEsF5m2F62dXCEEI6pO7jlik8/w
aI47dTM20QyimVzea48SC/ELO/T4AjbmMeBGlTyCm39KOElOKRTJvB4KESEWaL3r
EvPZbTh+72PUyrjxiDa56+RmtDPo7EN3uxuRVFX/HWiNnFk7orQLKZg5Kr8wE46R
KmVvAgMBAAGjWTBXMDYGA1UdEQQvMC2CK2luZmx1eGRiMy1jbHVzdGVyZWQtc3dh
bi5icmFtYm9yYS56b25hLWIuZXUwHQYDVR0OBBYEFH8et6JCzGD7Ny84aNRtq5Nj
hvS/MA0GCSqGSIb3DQEBCwUAA4IBAQCuDwARea/Xr3+hmte9A0H+XB8wMPAJ64e8
QA0qi0oy0gGdLfQHhsBWWmKSYLv7HygTNzb+7uFOTtq1UPLt18F+POPeLIj74QZV
z89Pbo1TwUMzQ2pgbu0yRvraXIpqXGrPm5GWYp5mopX0rBWKdimbmEMkhZA0sVeH
IdKIRUY6EyIVG+Z/nbuVqUlgnIWOMp0yg4RRC91zHy3Xvykf3Vai25H/jQpa6cbU
//MIodzUIqT8Tja5cHXE51bLdUkO1rtNKdM7TUdjzkZ+bAOpqKl+c0FlYZI+F7Ly
+MdCcNgKFc8o8jGiyP6uyAJeg+tSICpFDw00LyuKmU62c7VKuyo7
-----END CERTIFICATE-----"""

def create_test_cert(self, cert_file):
f = open(cert_file, "w")
f.write(self.test_cert)
f.close()

def remove_test_cert(self, cert_file):
os.remove(cert_file)

def test_queries_w_bad_cert(self):
cert_file = "test_cert.pem"
self.create_test_cert(cert_file)
with InfluxDBClient3(host=self.host,
database=self.database,
token=self.token,
verify_ssl=True,
ssl_ca_cert=cert_file,
debug=True) as client:
try:
query = "SELECT table_name FROM information_schema.tables"
client.query(query, mode="")
assert False, "query should throw SSL_ERROR"
except FlightError as fe:
assert str(fe).__contains__('SSL_ERROR_SSL')
finally:
self.remove_test_cert(cert_file)

def test_verify_ssl_false(self):
cert_file = "test_cert.pem"
self.create_test_cert(cert_file)
measurement = f'test{random_hex(6)}'

with InfluxDBClient3(host=self.host,
database=self.database,
token=self.token,
verify_ssl=False,
ssl_ca_cert=cert_file,
debug=True) as client:
try:
now = time.time_ns()
client.write(f'{measurement},location=harfa val=42i {now - 1_000_000_000}')
query = f"SELECT * FROM \"{measurement}\""
reader: pyarrow.Table = client.query(query, mode="")
list_results = reader.to_pylist()
assert len(list_results) > 0
finally:
self.remove_test_cert(cert_file)
Loading
Loading