From 4ab229e815c8789df86ccb3b1d0602663778e78c Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Mon, 18 Nov 2024 16:32:51 +0700 Subject: [PATCH 01/28] wip --- .../internal/VectorSchemaRootConverter.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index d9587052..3374fd4a 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -95,7 +95,7 @@ PointValues toPointValues(final int rowNumber, String valueType = parts[2]; if ("field".equals(valueType)) { - p.setField(name, value); + setFieldWithMetaType(p, name, value, metaType); } else if ("tag".equals(valueType) && value instanceof String) { p.setTag(name, (String) value); } else if ("timestamp".equals(valueType)) { @@ -105,6 +105,20 @@ PointValues toPointValues(final int rowNumber, return p; } + private void setFieldWithMetaType(PointValues p, String name, Object value, String metaType) { + switch (metaType) { + case "iox::column_type::field::integer": + p.setIntegerField(name, (Long) value); + break; + case "iox::column_type::field::float": + p.setFloatField(name, (Float) value); + break; + case "iox::column_type::field::string": + p.setStringField(name, (String) value); + break; + } + } + private void setTimestamp(@Nonnull final Object value, @Nonnull final Field schema, @Nonnull final PointValues pointValues) { From cfb12d4dccf113922bd7aa785c39ec634cf5a2bf Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 19 Nov 2024 08:27:40 +0700 Subject: [PATCH 02/28] wip --- .../v3/client/internal/VectorSchemaRootConverter.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 3374fd4a..2652c23d 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -108,6 +108,7 @@ PointValues toPointValues(final int rowNumber, private void setFieldWithMetaType(PointValues p, String name, Object value, String metaType) { switch (metaType) { case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": p.setIntegerField(name, (Long) value); break; case "iox::column_type::field::float": @@ -116,6 +117,9 @@ private void setFieldWithMetaType(PointValues p, String name, Object value, Stri case "iox::column_type::field::string": p.setStringField(name, (String) value); break; + case "iox::column_type::field::boolean": + p.setBooleanField(name, (Boolean) value); + break; } } From 3ed4603d46c6bbd9424b45022fa4324e53cfd8a1 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 19 Nov 2024 10:46:51 +0700 Subject: [PATCH 03/28] wip --- .../influxdb/v3/client/internal/VectorSchemaRootConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 2652c23d..1850e656 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -112,7 +112,7 @@ private void setFieldWithMetaType(PointValues p, String name, Object value, Stri p.setIntegerField(name, (Long) value); break; case "iox::column_type::field::float": - p.setFloatField(name, (Float) value); + p.setFloatField(name, (Double) value); break; case "iox::column_type::field::string": p.setStringField(name, (String) value); From 051a47ec18294ef751b6a863fda2a096ff00ab9d Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 19 Nov 2024 10:57:20 +0700 Subject: [PATCH 04/28] wip --- .../influxdb/v3/client/internal/VectorSchemaRootConverter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 1850e656..d3c93a76 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -105,7 +105,7 @@ PointValues toPointValues(final int rowNumber, return p; } - private void setFieldWithMetaType(PointValues p, String name, Object value, String metaType) { + private void setFieldWithMetaType(final PointValues p, final String name, final Object value, final String metaType) { switch (metaType) { case "iox::column_type::field::integer": case "iox::column_type::field::uinteger": @@ -120,6 +120,7 @@ private void setFieldWithMetaType(PointValues p, String name, Object value, Stri case "iox::column_type::field::boolean": p.setBooleanField(name, (Boolean) value); break; + default: } } From 43d9387777fcff23aee3d2e1e53591b9a8bf61ff Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 19 Nov 2024 15:42:01 +0700 Subject: [PATCH 05/28] wip --- .../client/internal/InfluxDBClientImpl.java | 50 +++++++++++++++---- .../client/internal/NanosecondConverter.java | 42 ++++++++++++++++ 2 files changed, 82 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index f43178b8..d4366800 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.math.BigInteger; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -183,16 +184,45 @@ public Stream query(@Nonnull final String query, return queryData(query, parameters, options) .flatMap(vector -> { List fieldVectors = vector.getFieldVectors(); - return IntStream - .range(0, vector.getRowCount()) - .mapToObj(rowNumber -> { - - ArrayList row = new ArrayList<>(); - for (FieldVector fieldVector : fieldVectors) { - row.add(fieldVector.getObject(rowNumber)); - } - return row.toArray(); - }); + return IntStream.range(0, vector.getRowCount()) + .mapToObj(rowNumber -> { + ArrayList row = new ArrayList<>(); + for (int i = 0; i < fieldVectors.size(); i++) { + var schema = vector.getSchema().getFields().get(i); + var metaType = schema.getMetadata().get("iox::column::type"); + String valueType = metaType.split("::")[2]; + if ("field".equals(valueType)) { + switch (metaType) { + case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": + var intValue = (Long) fieldVectors.get(i).getObject(rowNumber); + row.add(intValue); + break; + case "iox::column_type::field::float": + var doubleValue = (Double) fieldVectors.get(i).getObject(rowNumber); + row.add(doubleValue); + break; + case "iox::column_type::field::string": + var stringValue = (String) fieldVectors.get(i).getObject(rowNumber); + row.add(stringValue); + break; + case "iox::column_type::field::boolean": + var boolValue = (Boolean) fieldVectors.get(i).getObject(rowNumber); + row.add(boolValue); + break; + default: + } + } else if ("timestamp".equals(valueType)) { + var timestamp = fieldVectors.get(i).getObject(rowNumber); + BigInteger time = NanosecondConverter.getTimestamp(timestamp, schema); + row.add(time); + } else { + row.add(fieldVectors.get(i).getObject(rowNumber)); + } + } + + return row.toArray(); + }); }); } diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index f47c9227..8e48e1fd 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -24,12 +24,18 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import com.influxdb.v3.client.write.WritePrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; import static java.util.function.Function.identity; @@ -111,4 +117,40 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre return FROM_NANOS.get(precision).apply(nanos); } + + public static BigInteger getTimestamp(@Nonnull final Object value, @Nonnull final Field schema) { + BigInteger result = null; + + if (value instanceof Long) { + if (schema.getFieldType().getType() instanceof ArrowType.Timestamp) { + ArrowType.Timestamp type = (ArrowType.Timestamp) schema.getFieldType().getType(); + TimeUnit timeUnit; + switch (type.getUnit()) { + case SECOND: + timeUnit = TimeUnit.SECONDS; + break; + case MILLISECOND: + timeUnit = TimeUnit.MILLISECONDS; + break; + case MICROSECOND: + timeUnit = TimeUnit.MICROSECONDS; + break; + default: + case NANOSECOND: + timeUnit = TimeUnit.NANOSECONDS; + break; + } + long nanoseconds = TimeUnit.NANOSECONDS.convert((Long) value, timeUnit); + BigInteger convertedTime = NanosecondConverter.convert(Instant.ofEpochSecond(0, nanoseconds), WritePrecision.NS); + result = NanosecondConverter.convertToNanos(convertedTime, WritePrecision.NS); + } else { + BigInteger convertedTime = NanosecondConverter.convert(Instant.ofEpochMilli((Long) value), WritePrecision.NS); + result = NanosecondConverter.convertToNanos(convertedTime, WritePrecision.NS); + } + } else if (value instanceof LocalDateTime) { + BigInteger convertedTime = NanosecondConverter.convert(((LocalDateTime) value).toInstant(ZoneOffset.UTC), WritePrecision.NS); + result = NanosecondConverter.convertToNanos(convertedTime, WritePrecision.NS); + } + return result; + } } From 4ec77b3c3e2e7b2eaa61cbd718c7a1efd4301ca7 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 19 Nov 2024 15:45:27 +0700 Subject: [PATCH 06/28] wip --- .../v3/client/internal/VectorSchemaRootConverter.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index d3c93a76..d1d26ac0 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -105,7 +105,10 @@ PointValues toPointValues(final int rowNumber, return p; } - private void setFieldWithMetaType(final PointValues p, final String name, final Object value, final String metaType) { + private void setFieldWithMetaType(final PointValues p, + final String name, + final Object value, + final String metaType) { switch (metaType) { case "iox::column_type::field::integer": case "iox::column_type::field::uinteger": From c6438ab3e11d178d1be0e60558a6249f3c491eaf Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 19 Nov 2024 16:06:44 +0700 Subject: [PATCH 07/28] wip --- .../client/internal/NanosecondConverter.java | 33 ++++++++++++------- 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index 8e48e1fd..c4cd2f4f 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -21,6 +21,12 @@ */ package com.influxdb.v3.client.internal; +import com.influxdb.v3.client.write.WritePrecision; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.math.BigDecimal; import java.math.BigInteger; import java.time.Instant; @@ -30,12 +36,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import com.influxdb.v3.client.write.WritePrecision; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; import static java.util.function.Function.identity; @@ -141,16 +141,25 @@ public static BigInteger getTimestamp(@Nonnull final Object value, @Nonnull fina break; } long nanoseconds = TimeUnit.NANOSECONDS.convert((Long) value, timeUnit); - BigInteger convertedTime = NanosecondConverter.convert(Instant.ofEpochSecond(0, nanoseconds), WritePrecision.NS); - result = NanosecondConverter.convertToNanos(convertedTime, WritePrecision.NS); + Instant instant = Instant.ofEpochSecond(0, nanoseconds); + result = convertInstantToNano(instant, WritePrecision.NS); } else { - BigInteger convertedTime = NanosecondConverter.convert(Instant.ofEpochMilli((Long) value), WritePrecision.NS); - result = NanosecondConverter.convertToNanos(convertedTime, WritePrecision.NS); + Instant instant = Instant.ofEpochMilli((Long) value); + result = convertInstantToNano(instant, WritePrecision.NS); } } else if (value instanceof LocalDateTime) { - BigInteger convertedTime = NanosecondConverter.convert(((LocalDateTime) value).toInstant(ZoneOffset.UTC), WritePrecision.NS); - result = NanosecondConverter.convertToNanos(convertedTime, WritePrecision.NS); + Instant instant = ((LocalDateTime) value).toInstant(ZoneOffset.UTC); + result = convertInstantToNano(instant, WritePrecision.NS); } return result; } + + private static BigInteger convertInstantToNano(final Instant instant, final WritePrecision precision) { + var writePrecision = WritePrecision.NS; + if (precision != null) { + writePrecision = precision; + } + BigInteger convertedTime = NanosecondConverter.convert(instant, writePrecision); + return NanosecondConverter.convertToNanos(convertedTime, writePrecision); + } } From 77c886756616a8cf7df08e1ea2c567a10e7b8576 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 19 Nov 2024 16:57:44 +0700 Subject: [PATCH 08/28] wip --- .../influxdb/v3/client/internal/NanosecondConverter.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index c4cd2f4f..69f69448 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -21,10 +21,6 @@ */ package com.influxdb.v3.client.internal; -import com.influxdb.v3.client.write.WritePrecision; -import org.apache.arrow.vector.types.pojo.ArrowType; -import org.apache.arrow.vector.types.pojo.Field; - import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.math.BigDecimal; @@ -36,8 +32,10 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Function; - +import com.influxdb.v3.client.write.WritePrecision; import static java.util.function.Function.identity; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; /** * Nanosecond converter. From b514ff71fdf1aee4ec320634be33268051727118 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 19 Nov 2024 17:05:08 +0700 Subject: [PATCH 09/28] wip --- .../v3/client/internal/NanosecondConverter.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index 69f69448..a71fc881 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -21,8 +21,6 @@ */ package com.influxdb.v3.client.internal; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.math.BigDecimal; import java.math.BigInteger; import java.time.Instant; @@ -32,11 +30,16 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import com.influxdb.v3.client.write.WritePrecision; -import static java.util.function.Function.identity; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; +import com.influxdb.v3.client.write.WritePrecision; + +import static java.util.function.Function.identity; + /** * Nanosecond converter. *

From ac2d19b7261f3253796aad88026719e56ec04c0a Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Tue, 19 Nov 2024 17:10:14 +0700 Subject: [PATCH 10/28] wip --- .../v3/client/internal/InfluxDBClientImpl.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index d4366800..2abe27e7 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -195,19 +195,23 @@ public Stream query(@Nonnull final String query, switch (metaType) { case "iox::column_type::field::integer": case "iox::column_type::field::uinteger": - var intValue = (Long) fieldVectors.get(i).getObject(rowNumber); + var intValue = (Long) fieldVectors.get(i) + .getObject(rowNumber); row.add(intValue); break; case "iox::column_type::field::float": - var doubleValue = (Double) fieldVectors.get(i).getObject(rowNumber); + var doubleValue = (Double) fieldVectors.get(i) + .getObject(rowNumber); row.add(doubleValue); break; case "iox::column_type::field::string": - var stringValue = (String) fieldVectors.get(i).getObject(rowNumber); + var stringValue = (String) fieldVectors.get(i) + .getObject(rowNumber); row.add(stringValue); break; case "iox::column_type::field::boolean": - var boolValue = (Boolean) fieldVectors.get(i).getObject(rowNumber); + var boolValue = (Boolean) fieldVectors.get(i) + .getObject(rowNumber); row.add(boolValue); break; default: From 9916f3d7ad9a777eaa906027dd382484ac53c6e2 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 20 Nov 2024 10:19:33 +0700 Subject: [PATCH 11/28] wip --- .../com/influxdb/v3/client/internal/InfluxDBClientImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 2abe27e7..fec7fa68 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -190,7 +190,8 @@ public Stream query(@Nonnull final String query, for (int i = 0; i < fieldVectors.size(); i++) { var schema = vector.getSchema().getFields().get(i); var metaType = schema.getMetadata().get("iox::column::type"); - String valueType = metaType.split("::")[2]; + String valueType = metaType != null ? metaType.split("::")[2] : null; + if ("field".equals(valueType)) { switch (metaType) { case "iox::column_type::field::integer": From b40517b9aeb153c43307cab5e847041aa0f602a5 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 20 Nov 2024 10:37:51 +0700 Subject: [PATCH 12/28] wip --- .../influxdb/v3/client/internal/VectorSchemaRootConverter.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index d1d26ac0..8ed59aaf 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -109,6 +109,8 @@ private void setFieldWithMetaType(final PointValues p, final String name, final Object value, final String metaType) { + if (value == null) return; + switch (metaType) { case "iox::column_type::field::integer": case "iox::column_type::field::uinteger": From 7d5ef318c90f0cda0a3682f4eaaa41a4e0758a67 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 20 Nov 2024 10:40:49 +0700 Subject: [PATCH 13/28] wip --- .../v3/client/internal/VectorSchemaRootConverter.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index 8ed59aaf..42e3e259 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -109,7 +109,9 @@ private void setFieldWithMetaType(final PointValues p, final String name, final Object value, final String metaType) { - if (value == null) return; + if (value == null) { + return; + } switch (metaType) { case "iox::column_type::field::integer": From 47c2aee1a7c68b1caab9393927d4934f56447553 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 20 Nov 2024 15:49:04 +0700 Subject: [PATCH 14/28] wip --- .../VectorSchemaRootConverterTest.java | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) diff --git a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java index 7915b2bc..5f3d6139 100644 --- a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java @@ -25,16 +25,20 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.annotation.Nonnull; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BaseFixedWidthVector; import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.TimeMilliVector; import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; @@ -125,6 +129,41 @@ void timestampAsArrowInt() { } } + @Test + public void testConverterWithMetaType() { + try (VectorSchemaRoot root = generateVectorSchemaRoot()) { + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + + String measurement = pointValues.getMeasurement(); + Assertions.assertThat(measurement).isEqualTo("host"); + Assertions.assertThat(measurement.getClass()).isEqualTo(String.class); + + BigInteger expected = BigInteger.valueOf(123_456L * 1_000_000); + Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); + + Long memTotal = (Long) pointValues.getField("mem_total"); + Assertions.assertThat(memTotal).isEqualTo(2048); + Assertions.assertThat(memTotal.getClass()).isEqualTo(Long.class); + + Long diskFree = (Long) pointValues.getField("disk_free"); + Assertions.assertThat(diskFree).isEqualTo(1_000_000); + Assertions.assertThat(diskFree.getClass()).isEqualTo(Long.class); + + Double temperature = (Double) pointValues.getField("temperature"); + Assertions.assertThat(temperature).isEqualTo(100.8766f); + Assertions.assertThat(temperature.getClass()).isEqualTo(Double.class); + + String name = (String) pointValues.getField("name"); + Assertions.assertThat(name).isEqualTo("intel"); + Assertions.assertThat(name.getClass()).isEqualTo(String.class); + + Boolean isActive = (Boolean) pointValues.getField("isActive"); + Assertions.assertThat(isActive).isEqualTo(true); + Assertions.assertThat(isActive.getClass()).isEqualTo(Boolean.class); + } + } + + @Test void timestampWithoutMetadataAndFieldWithoutMetadata() { FieldType timeType = new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null); @@ -237,4 +276,87 @@ private VectorSchemaRoot initializeVectorSchemaRoot(@Nonnull final Field... fiel return root; } + + public VectorSchemaRoot generateVectorSchemaRoot() { + Field measurementField = generateStringField("measurement"); + Field timeField = generateTimeField(); + Field memTotalField = generateIntField("mem_total"); + Field diskFreeField = generateUnsignedIntField("disk_free"); + Field temperatureField = generateFloatField("temperature"); + Field nameField = generateStringField("name"); + Field isActiveField = generateBoolField("isActive"); + List fields = List.of(measurementField, timeField, memTotalField, diskFreeField, temperatureField, nameField, isActiveField); + + VectorSchemaRoot root = initializeVectorSchemaRoot(fields.toArray(new Field[0])); + VarCharVector measurement = (VarCharVector) root.getVector("measurement"); + measurement.allocateNew(); + measurement.set(0, "host".getBytes()); + + TimeMilliVector timeVector = (TimeMilliVector) root.getVector("time"); + timeVector.allocateNew(); + timeVector.setSafe(0, 123_456); + + BigIntVector intVector = (BigIntVector) root.getVector("mem_total"); + intVector.allocateNew(); + intVector.set(0, 2048); + + BigIntVector unsignedIntVector = (BigIntVector) root.getVector("disk_free"); + unsignedIntVector.allocateNew(); + unsignedIntVector.set(0, 1_000_000); + + Float8Vector floatVector = (Float8Vector) root.getVector("temperature"); + floatVector.allocateNew(); + floatVector.set(0, 100.8766f); + + VarCharVector stringVector = (VarCharVector) root.getVector("name"); + stringVector.allocateNew(); + stringVector.setSafe(0, "intel".getBytes()); + + BitVector boolVector = (BitVector) root.getVector("isActive"); + boolVector.allocateNew(); + boolVector.setSafe(0, 1); + + return root; + } + + private Field generateIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::integer"); + FieldType intType = new FieldType(true, new ArrowType.Int(64, true), null, metadata); + return new Field(fieldName, intType, null); + } + + private Field generateUnsignedIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::uinteger"); + FieldType intType = new FieldType(true, new ArrowType.Int(64, true), null, metadata); + return new Field(fieldName, intType, null); + } + + private Field generateFloatField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::float"); + FieldType floatType = new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null, metadata); + return new Field(fieldName, floatType, null); + } + + private Field generateStringField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::string"); + FieldType stringType = new FieldType(true, new ArrowType.Utf8(), null, metadata); + return new Field(fieldName, stringType, null); + } + + private Field generateBoolField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::boolean"); + FieldType boolType = new FieldType(true, new ArrowType.Bool(), null, metadata); + return new Field(fieldName, boolType, null); + } + + private Field generateTimeField() { + FieldType timeType = new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null); + return new Field("time", timeType, null); + } + } From eb33725ba66438704db9a568f5e7714c1df707d1 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Wed, 20 Nov 2024 15:54:47 +0700 Subject: [PATCH 15/28] wip --- .../VectorSchemaRootConverterTest.java | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java index 5f3d6139..f0b77c9b 100644 --- a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java @@ -285,7 +285,13 @@ public VectorSchemaRoot generateVectorSchemaRoot() { Field temperatureField = generateFloatField("temperature"); Field nameField = generateStringField("name"); Field isActiveField = generateBoolField("isActive"); - List fields = List.of(measurementField, timeField, memTotalField, diskFreeField, temperatureField, nameField, isActiveField); + List fields = List.of(measurementField, + timeField, + memTotalField, + diskFreeField, + temperatureField, + nameField, + isActiveField); VectorSchemaRoot root = initializeVectorSchemaRoot(fields.toArray(new Field[0])); VarCharVector measurement = (VarCharVector) root.getVector("measurement"); @@ -322,21 +328,30 @@ public VectorSchemaRoot generateVectorSchemaRoot() { private Field generateIntField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::integer"); - FieldType intType = new FieldType(true, new ArrowType.Int(64, true), null, metadata); + FieldType intType = new FieldType(true, + new ArrowType.Int(64, true), + null, + metadata); return new Field(fieldName, intType, null); } private Field generateUnsignedIntField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::uinteger"); - FieldType intType = new FieldType(true, new ArrowType.Int(64, true), null, metadata); + FieldType intType = new FieldType(true, + new ArrowType.Int(64, true), + null, + metadata); return new Field(fieldName, intType, null); } private Field generateFloatField(final String fieldName) { Map metadata = new HashMap<>(); metadata.put("iox::column::type", "iox::column_type::field::float"); - FieldType floatType = new FieldType(true, new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), null, metadata); + FieldType floatType = new FieldType(true, + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), + null, + metadata); return new Field(fieldName, floatType, null); } @@ -355,7 +370,9 @@ private Field generateBoolField(final String fieldName) { } private Field generateTimeField() { - FieldType timeType = new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null); + FieldType timeType = new FieldType(true, + new ArrowType.Time(TimeUnit.MILLISECOND, 32), + null); return new Field("time", timeType, null); } From 536e75c123d85c4c4e17af0e77977929c0ecb932 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 14:49:18 +0700 Subject: [PATCH 16/28] wip --- .../client/internal/InfluxDBClientImpl.java | 11 +++-- .../v3/client/InfluxDBClientTest.java | 48 +++++++++++++++++++ 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index fec7fa68..da317e4b 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -42,6 +42,7 @@ import io.netty.handler.codec.http.HttpMethod; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.util.Text; import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBClient; @@ -206,9 +207,9 @@ public Stream query(@Nonnull final String query, row.add(doubleValue); break; case "iox::column_type::field::string": - var stringValue = (String) fieldVectors.get(i) - .getObject(rowNumber); - row.add(stringValue); + var textValue = (Text) fieldVectors.get(i) + .getObject(rowNumber); + row.add(textValue.toString()); break; case "iox::column_type::field::boolean": var boolValue = (Boolean) fieldVectors.get(i) @@ -222,7 +223,9 @@ public Stream query(@Nonnull final String query, BigInteger time = NanosecondConverter.getTimestamp(timestamp, schema); row.add(time); } else { - row.add(fieldVectors.get(i).getObject(rowNumber)); + Text textValue = (Text) fieldVectors.get(i) + .getObject(rowNumber); + row.add(textValue.toString()); } } diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index 2c9fdde0..c9dc61f2 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -21,11 +21,19 @@ */ package com.influxdb.v3.client; +import java.math.BigInteger; +import java.time.Instant; import java.util.Map; import java.util.Properties; +import java.util.UUID; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; + +import com.influxdb.v3.client.write.WriteOptions; +import com.influxdb.v3.client.write.WritePrecision; class InfluxDBClientTest { @@ -116,4 +124,44 @@ public void unsupportedQueryParams() throws Exception { + "class com.influxdb.v3.client.internal.InfluxDBClientImpl"); } } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testQuery() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + String uuid = UUID.randomUUID().toString(); + long timestamp = Instant.now().getEpochSecond(); + String record = String.format( + "host9,tag=empty name=\"intel\",mem_total=2048,disk_free=100,temperature=100.86,isActive=true,testId=\"%s\" %d", + uuid, + timestamp + ); + client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null)); + + Map parameters = Map.of("testId", uuid); + String sql = "Select * from host9 where \"testId\"=$testId"; + try (Stream stream = client.query(sql, parameters)) { + stream.findFirst() + .ifPresent(objects -> { + Assertions.assertThat(objects[0].getClass()).isEqualTo(Double.class); + Assertions.assertThat(objects[0]).isEqualTo(100.0); + + Assertions.assertThat(objects[1].getClass()).isEqualTo(Boolean.class); + Assertions.assertThat(objects[1]).isEqualTo(true); + + Assertions.assertThat(objects[3].getClass()).isEqualTo(String.class); + Assertions.assertThat(objects[3]).isEqualTo("intel"); + + Assertions.assertThat(objects[7].getClass()).isEqualTo(BigInteger.class); + Assertions.assertThat(objects[7]).isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000)); + }); + } + } + } } From 32c2130289fe1b152e5c86c2f48c19b86bb2b067 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 14:52:46 +0700 Subject: [PATCH 17/28] wip --- .../java/com/influxdb/v3/client/InfluxDBClientTest.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index c9dc61f2..0a92eea9 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -138,7 +138,13 @@ public void testQuery() throws Exception { String uuid = UUID.randomUUID().toString(); long timestamp = Instant.now().getEpochSecond(); String record = String.format( - "host9,tag=empty name=\"intel\",mem_total=2048,disk_free=100,temperature=100.86,isActive=true,testId=\"%s\" %d", + "host9,tag=empty " + + "name=\"intel\"," + + "mem_total=2048," + + "disk_free=100," + + "temperature=100.86," + + "isActive=true," + + "testId=\"%s\" %d", uuid, timestamp ); From 63d34ea4501ea776c1cc70d883206ede93c3ca82 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 15:02:50 +0700 Subject: [PATCH 18/28] wip --- .../com/influxdb/v3/client/InfluxDBClientTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index 0a92eea9..7922d109 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -138,13 +138,13 @@ public void testQuery() throws Exception { String uuid = UUID.randomUUID().toString(); long timestamp = Instant.now().getEpochSecond(); String record = String.format( - "host9,tag=empty " + - "name=\"intel\"," + - "mem_total=2048," + - "disk_free=100," + - "temperature=100.86," + - "isActive=true," + - "testId=\"%s\" %d", + "host9,tag=empty " + +"name=\"intel\"," + +"mem_total=2048," + +"disk_free=100," + +"temperature=100.86," + +"isActive=true," + +"testId=\"%s\" %d", uuid, timestamp ); From 63e4dd457205d26a7d866e5da31a485ef41741b0 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 15:06:21 +0700 Subject: [PATCH 19/28] wip --- .../com/influxdb/v3/client/InfluxDBClientTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index 7922d109..6d7ccc81 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -139,12 +139,12 @@ public void testQuery() throws Exception { long timestamp = Instant.now().getEpochSecond(); String record = String.format( "host9,tag=empty " - +"name=\"intel\"," - +"mem_total=2048," - +"disk_free=100," - +"temperature=100.86," - +"isActive=true," - +"testId=\"%s\" %d", + + "name=\"intel\"," + + "mem_total=2048," + + "disk_free=100," + + "temperature=100.86," + + "isActive=true," + + "testId=\"%s\" %d", uuid, timestamp ); From 52d294bbeb48fb7be92d99b5d407717cdff72198 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 15:25:42 +0700 Subject: [PATCH 20/28] wip --- .../influxdb/v3/client/internal/InfluxDBClientImpl.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index da317e4b..96172f3f 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -218,14 +218,13 @@ public Stream query(@Nonnull final String query, break; default: } - } else if ("timestamp".equals(valueType)) { + } else if ("timestamp".equals(valueType) || Objects.equals(schema.getName(), "time")) { var timestamp = fieldVectors.get(i).getObject(rowNumber); BigInteger time = NanosecondConverter.getTimestamp(timestamp, schema); row.add(time); } else { - Text textValue = (Text) fieldVectors.get(i) - .getObject(rowNumber); - row.add(textValue.toString()); + Object value = fieldVectors.get(i).getObject(rowNumber); + row.add(value); } } From 3a85deaca2e12b482b6cb132c0af730d34929b7f Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 15:27:56 +0700 Subject: [PATCH 21/28] wip --- .../com/influxdb/v3/client/internal/InfluxDBClientImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 96172f3f..8546a23a 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -218,7 +218,8 @@ public Stream query(@Nonnull final String query, break; default: } - } else if ("timestamp".equals(valueType) || Objects.equals(schema.getName(), "time")) { + } else if ("timestamp".equals(valueType) + || Objects.equals(schema.getName(), "time")) { var timestamp = fieldVectors.get(i).getObject(rowNumber); BigInteger time = NanosecondConverter.getTimestamp(timestamp, schema); row.add(time); From 465187ab5db8672f07a50ae0f6330533a88c4825 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 15:49:19 +0700 Subject: [PATCH 22/28] wip --- .../com/influxdb/v3/client/InfluxDBClientTest.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index 6d7ccc81..d88758ec 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -138,10 +138,10 @@ public void testQuery() throws Exception { String uuid = UUID.randomUUID().toString(); long timestamp = Instant.now().getEpochSecond(); String record = String.format( - "host9,tag=empty " + "host10,tag=empty " + "name=\"intel\"," + "mem_total=2048," - + "disk_free=100," + + "disk_free=100i," + "temperature=100.86," + "isActive=true," + "testId=\"%s\" %d", @@ -151,16 +151,19 @@ public void testQuery() throws Exception { client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null)); Map parameters = Map.of("testId", uuid); - String sql = "Select * from host9 where \"testId\"=$testId"; + String sql = "Select * from host10 where \"testId\"=$testId"; try (Stream stream = client.query(sql, parameters)) { stream.findFirst() .ifPresent(objects -> { - Assertions.assertThat(objects[0].getClass()).isEqualTo(Double.class); - Assertions.assertThat(objects[0]).isEqualTo(100.0); + Assertions.assertThat(objects[0].getClass()).isEqualTo(Long.class); + Assertions.assertThat(objects[0]).isEqualTo(100L); Assertions.assertThat(objects[1].getClass()).isEqualTo(Boolean.class); Assertions.assertThat(objects[1]).isEqualTo(true); + Assertions.assertThat(objects[2].getClass()).isEqualTo(Double.class); + Assertions.assertThat(objects[2]).isEqualTo(2048.0); + Assertions.assertThat(objects[3].getClass()).isEqualTo(String.class); Assertions.assertThat(objects[3]).isEqualTo("intel"); From e6f38ccf6269112775e34b28d54b041b94beb241 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 16:25:50 +0700 Subject: [PATCH 23/28] wip --- .../client/internal/InfluxDBClientImpl.java | 2 +- .../client/internal/NanosecondConverter.java | 2 +- .../internal/NanosecondConverterTest.java | 60 +++++++++++++++++++ 3 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 8546a23a..93621656 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -221,7 +221,7 @@ public Stream query(@Nonnull final String query, } else if ("timestamp".equals(valueType) || Objects.equals(schema.getName(), "time")) { var timestamp = fieldVectors.get(i).getObject(rowNumber); - BigInteger time = NanosecondConverter.getTimestamp(timestamp, schema); + BigInteger time = NanosecondConverter.getTimestampNanoSecond(timestamp, schema); row.add(time); } else { Object value = fieldVectors.get(i).getObject(rowNumber); diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index a71fc881..a8beb066 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -119,7 +119,7 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre return FROM_NANOS.get(precision).apply(nanos); } - public static BigInteger getTimestamp(@Nonnull final Object value, @Nonnull final Field schema) { + public static BigInteger getTimestampNanoSecond(@Nonnull final Object value, @Nonnull final Field schema) { BigInteger result = null; if (value instanceof Long) { diff --git a/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java new file mode 100644 index 00000000..d4c4519b --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java @@ -0,0 +1,60 @@ +package com.influxdb.v3.client.internal; + +import java.math.BigInteger; + +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class NanosecondConverterTest { + + @Test + void testGetTimestampNanosecond() { + BigInteger timestampNanoSecond = null; + + // Second + FieldType timeTypeSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"), + null); + Field timeFieldSecond = new Field("time", timeTypeSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNanoSecond(123_456L, timeFieldSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000_000_000)), timestampNanoSecond + ); + + // MilliSecond + FieldType timeTypeMilliSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), + null); + Field timeFieldMilliSecond = new Field("time", timeTypeMilliSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNanoSecond(123_456L, timeFieldMilliSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000_000)), timestampNanoSecond + ); + + // MicroSecond + FieldType timeTypeMicroSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), + null); + Field timeFieldMicroSecond = new Field("time", timeTypeMicroSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNanoSecond(123_456L, timeFieldMicroSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000)), timestampNanoSecond + ); + + // Nano Second + FieldType timeTypeNanoSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), + null); + Field timeFieldNanoSecond = new Field("time", timeTypeNanoSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNanoSecond(123_456L, timeFieldNanoSecond); + Assertions.assertEquals(BigInteger.valueOf(123_456L), timestampNanoSecond + ); + } +} From 9e96ec3223d7eef154db63b3cb83a986bdeee8ef Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 16:53:18 +0700 Subject: [PATCH 24/28] wip --- .../influxdb/v3/client/internal/InfluxDBClientImpl.java | 2 +- .../influxdb/v3/client/internal/NanosecondConverter.java | 2 +- .../v3/client/internal/NanosecondConverterTest.java | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 93621656..441b4449 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -221,7 +221,7 @@ public Stream query(@Nonnull final String query, } else if ("timestamp".equals(valueType) || Objects.equals(schema.getName(), "time")) { var timestamp = fieldVectors.get(i).getObject(rowNumber); - BigInteger time = NanosecondConverter.getTimestampNanoSecond(timestamp, schema); + BigInteger time = NanosecondConverter.getTimestampNano(timestamp, schema); row.add(time); } else { Object value = fieldVectors.get(i).getObject(rowNumber); diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index a8beb066..dd1ee196 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -119,7 +119,7 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre return FROM_NANOS.get(precision).apply(nanos); } - public static BigInteger getTimestampNanoSecond(@Nonnull final Object value, @Nonnull final Field schema) { + public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull final Field schema) { BigInteger result = null; if (value instanceof Long) { diff --git a/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java index d4c4519b..a61438c0 100644 --- a/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java @@ -20,7 +20,7 @@ void testGetTimestampNanosecond() { new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"), null); Field timeFieldSecond = new Field("time", timeTypeSecond, null); - timestampNanoSecond = NanosecondConverter.getTimestampNanoSecond(123_456L, timeFieldSecond); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldSecond); Assertions.assertEquals( BigInteger.valueOf(123_456L) .multiply(BigInteger.valueOf(1_000_000_000)), timestampNanoSecond @@ -31,7 +31,7 @@ void testGetTimestampNanosecond() { new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), null); Field timeFieldMilliSecond = new Field("time", timeTypeMilliSecond, null); - timestampNanoSecond = NanosecondConverter.getTimestampNanoSecond(123_456L, timeFieldMilliSecond); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldMilliSecond); Assertions.assertEquals( BigInteger.valueOf(123_456L) .multiply(BigInteger.valueOf(1_000_000)), timestampNanoSecond @@ -42,7 +42,7 @@ void testGetTimestampNanosecond() { new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), null); Field timeFieldMicroSecond = new Field("time", timeTypeMicroSecond, null); - timestampNanoSecond = NanosecondConverter.getTimestampNanoSecond(123_456L, timeFieldMicroSecond); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldMicroSecond); Assertions.assertEquals( BigInteger.valueOf(123_456L) .multiply(BigInteger.valueOf(1_000)), timestampNanoSecond @@ -53,7 +53,7 @@ void testGetTimestampNanosecond() { new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), null); Field timeFieldNanoSecond = new Field("time", timeTypeNanoSecond, null); - timestampNanoSecond = NanosecondConverter.getTimestampNanoSecond(123_456L, timeFieldNanoSecond); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldNanoSecond); Assertions.assertEquals(BigInteger.valueOf(123_456L), timestampNanoSecond ); } From 6a6c4d31bf7d6b8a861ef81cb0878acf766472c7 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 16:59:02 +0700 Subject: [PATCH 25/28] wip --- .../client/internal/InfluxDBClientImpl.java | 88 +++++++++---------- 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 441b4449..c11948de 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -186,50 +186,50 @@ public Stream query(@Nonnull final String query, .flatMap(vector -> { List fieldVectors = vector.getFieldVectors(); return IntStream.range(0, vector.getRowCount()) - .mapToObj(rowNumber -> { - ArrayList row = new ArrayList<>(); - for (int i = 0; i < fieldVectors.size(); i++) { - var schema = vector.getSchema().getFields().get(i); - var metaType = schema.getMetadata().get("iox::column::type"); - String valueType = metaType != null ? metaType.split("::")[2] : null; - - if ("field".equals(valueType)) { - switch (metaType) { - case "iox::column_type::field::integer": - case "iox::column_type::field::uinteger": - var intValue = (Long) fieldVectors.get(i) - .getObject(rowNumber); - row.add(intValue); - break; - case "iox::column_type::field::float": - var doubleValue = (Double) fieldVectors.get(i) - .getObject(rowNumber); - row.add(doubleValue); - break; - case "iox::column_type::field::string": - var textValue = (Text) fieldVectors.get(i) - .getObject(rowNumber); - row.add(textValue.toString()); - break; - case "iox::column_type::field::boolean": - var boolValue = (Boolean) fieldVectors.get(i) - .getObject(rowNumber); - row.add(boolValue); - break; - default: - } - } else if ("timestamp".equals(valueType) - || Objects.equals(schema.getName(), "time")) { - var timestamp = fieldVectors.get(i).getObject(rowNumber); - BigInteger time = NanosecondConverter.getTimestampNano(timestamp, schema); - row.add(time); - } else { - Object value = fieldVectors.get(i).getObject(rowNumber); - row.add(value); - } - } - - return row.toArray(); + .mapToObj(rowNumber -> { + ArrayList row = new ArrayList<>(); + for (int i = 0; i < fieldVectors.size(); i++) { + var schema = vector.getSchema().getFields().get(i); + var metaType = schema.getMetadata().get("iox::column::type"); + String valueType = metaType != null ? metaType.split("::")[2] : null; + + if ("field".equals(valueType)) { + switch (metaType) { + case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": + var intValue = (Long) fieldVectors.get(i) + .getObject(rowNumber); + row.add(intValue); + break; + case "iox::column_type::field::float": + var doubleValue = (Double) fieldVectors.get(i) + .getObject(rowNumber); + row.add(doubleValue); + break; + case "iox::column_type::field::string": + var textValue = (Text) fieldVectors.get(i) + .getObject(rowNumber); + row.add(textValue.toString()); + break; + case "iox::column_type::field::boolean": + var boolValue = (Boolean) fieldVectors.get(i) + .getObject(rowNumber); + row.add(boolValue); + break; + default: + } + } else if ("timestamp".equals(valueType) + || Objects.equals(schema.getName(), "time")) { + var timestamp = fieldVectors.get(i).getObject(rowNumber); + BigInteger time = NanosecondConverter.getTimestampNano(timestamp, schema); + row.add(time); + } else { + Object value = fieldVectors.get(i).getObject(rowNumber); + row.add(value); + } + } + + return row.toArray(); }); }); } From 282cc21c46816234b12cb674d9c518bfe74d0f72 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 17:01:53 +0700 Subject: [PATCH 26/28] wip --- .../internal/NanosecondConverterTest.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java index a61438c0..82b0ef33 100644 --- a/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java @@ -1,3 +1,24 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ package com.influxdb.v3.client.internal; import java.math.BigInteger; From a77303dc0347ab91791e13b31d602abac86dd3e8 Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 17:09:51 +0700 Subject: [PATCH 27/28] wip --- .../v3/client/internal/NanosecondConverterTest.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java index 82b0ef33..31ea7f1f 100644 --- a/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java @@ -75,7 +75,17 @@ void testGetTimestampNanosecond() { null); Field timeFieldNanoSecond = new Field("time", timeTypeNanoSecond, null); timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldNanoSecond); - Assertions.assertEquals(BigInteger.valueOf(123_456L), timestampNanoSecond + Assertions.assertEquals(BigInteger.valueOf(123_456L), timestampNanoSecond); + + // For ArrowType.Time type + FieldType timeMilliSecond = new FieldType(true, + new ArrowType.Time(TimeUnit.MILLISECOND, 32), + null); + Field fieldMilliSecond = new Field("time", timeMilliSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, fieldMilliSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000_000)), timestampNanoSecond ); } } From 03fd834d181aa2b238dbee5fb0de0cd520fcfb5e Mon Sep 17 00:00:00 2001 From: NguyenHoangSon96 Date: Thu, 21 Nov 2024 17:26:06 +0700 Subject: [PATCH 28/28] wip --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d1fbdd3..f683b249 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.10.0 [unreleased] +### Features +1. [#197](https://github.com/InfluxCommunity/influxdb3-java/pull/197): Respect iox::column_type::field metadata when mapping query results into values + ## 0.9.0 [2024-08-12] ### Features