diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d1fbdd3..f683b249 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## 0.10.0 [unreleased] +### Features +1. [#197](https://github.com/InfluxCommunity/influxdb3-java/pull/197): Respect iox::column_type::field metadata when mapping query results into values + ## 0.9.0 [2024-08-12] ### Features diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index f43178b8..c11948de 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -23,6 +23,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.math.BigInteger; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -41,6 +42,7 @@ import io.netty.handler.codec.http.HttpMethod; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.util.Text; import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBClient; @@ -183,16 +185,52 @@ public Stream query(@Nonnull final String query, return queryData(query, parameters, options) .flatMap(vector -> { List fieldVectors = vector.getFieldVectors(); - return IntStream - .range(0, vector.getRowCount()) - .mapToObj(rowNumber -> { - - ArrayList row = new ArrayList<>(); - for (FieldVector fieldVector : fieldVectors) { - row.add(fieldVector.getObject(rowNumber)); - } - return row.toArray(); - }); + return IntStream.range(0, vector.getRowCount()) + .mapToObj(rowNumber -> { + ArrayList row = new ArrayList<>(); + for (int i = 0; i < fieldVectors.size(); i++) { + var schema = vector.getSchema().getFields().get(i); + var metaType = schema.getMetadata().get("iox::column::type"); + String valueType = metaType != null ? metaType.split("::")[2] : null; + + if ("field".equals(valueType)) { + switch (metaType) { + case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": + var intValue = (Long) fieldVectors.get(i) + .getObject(rowNumber); + row.add(intValue); + break; + case "iox::column_type::field::float": + var doubleValue = (Double) fieldVectors.get(i) + .getObject(rowNumber); + row.add(doubleValue); + break; + case "iox::column_type::field::string": + var textValue = (Text) fieldVectors.get(i) + .getObject(rowNumber); + row.add(textValue.toString()); + break; + case "iox::column_type::field::boolean": + var boolValue = (Boolean) fieldVectors.get(i) + .getObject(rowNumber); + row.add(boolValue); + break; + default: + } + } else if ("timestamp".equals(valueType) + || Objects.equals(schema.getName(), "time")) { + var timestamp = fieldVectors.get(i).getObject(rowNumber); + BigInteger time = NanosecondConverter.getTimestampNano(timestamp, schema); + row.add(time); + } else { + Object value = fieldVectors.get(i).getObject(rowNumber); + row.add(value); + } + } + + return row.toArray(); + }); }); } diff --git a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java index f47c9227..dd1ee196 100644 --- a/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/NanosecondConverter.java @@ -24,11 +24,18 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; + import com.influxdb.v3.client.write.WritePrecision; import static java.util.function.Function.identity; @@ -111,4 +118,49 @@ public static BigInteger convert(final Instant instant, final WritePrecision pre return FROM_NANOS.get(precision).apply(nanos); } + + public static BigInteger getTimestampNano(@Nonnull final Object value, @Nonnull final Field schema) { + BigInteger result = null; + + if (value instanceof Long) { + if (schema.getFieldType().getType() instanceof ArrowType.Timestamp) { + ArrowType.Timestamp type = (ArrowType.Timestamp) schema.getFieldType().getType(); + TimeUnit timeUnit; + switch (type.getUnit()) { + case SECOND: + timeUnit = TimeUnit.SECONDS; + break; + case MILLISECOND: + timeUnit = TimeUnit.MILLISECONDS; + break; + case MICROSECOND: + timeUnit = TimeUnit.MICROSECONDS; + break; + default: + case NANOSECOND: + timeUnit = TimeUnit.NANOSECONDS; + break; + } + long nanoseconds = TimeUnit.NANOSECONDS.convert((Long) value, timeUnit); + Instant instant = Instant.ofEpochSecond(0, nanoseconds); + result = convertInstantToNano(instant, WritePrecision.NS); + } else { + Instant instant = Instant.ofEpochMilli((Long) value); + result = convertInstantToNano(instant, WritePrecision.NS); + } + } else if (value instanceof LocalDateTime) { + Instant instant = ((LocalDateTime) value).toInstant(ZoneOffset.UTC); + result = convertInstantToNano(instant, WritePrecision.NS); + } + return result; + } + + private static BigInteger convertInstantToNano(final Instant instant, final WritePrecision precision) { + var writePrecision = WritePrecision.NS; + if (precision != null) { + writePrecision = precision; + } + BigInteger convertedTime = NanosecondConverter.convert(instant, writePrecision); + return NanosecondConverter.convertToNanos(convertedTime, writePrecision); + } } diff --git a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java index d9587052..42e3e259 100644 --- a/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java +++ b/src/main/java/com/influxdb/v3/client/internal/VectorSchemaRootConverter.java @@ -95,7 +95,7 @@ PointValues toPointValues(final int rowNumber, String valueType = parts[2]; if ("field".equals(valueType)) { - p.setField(name, value); + setFieldWithMetaType(p, name, value, metaType); } else if ("tag".equals(valueType) && value instanceof String) { p.setTag(name, (String) value); } else if ("timestamp".equals(valueType)) { @@ -105,6 +105,32 @@ PointValues toPointValues(final int rowNumber, return p; } + private void setFieldWithMetaType(final PointValues p, + final String name, + final Object value, + final String metaType) { + if (value == null) { + return; + } + + switch (metaType) { + case "iox::column_type::field::integer": + case "iox::column_type::field::uinteger": + p.setIntegerField(name, (Long) value); + break; + case "iox::column_type::field::float": + p.setFloatField(name, (Double) value); + break; + case "iox::column_type::field::string": + p.setStringField(name, (String) value); + break; + case "iox::column_type::field::boolean": + p.setBooleanField(name, (Boolean) value); + break; + default: + } + } + private void setTimestamp(@Nonnull final Object value, @Nonnull final Field schema, @Nonnull final PointValues pointValues) { diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java index 2c9fdde0..d88758ec 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java @@ -21,11 +21,19 @@ */ package com.influxdb.v3.client; +import java.math.BigInteger; +import java.time.Instant; import java.util.Map; import java.util.Properties; +import java.util.UUID; +import java.util.stream.Stream; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; + +import com.influxdb.v3.client.write.WriteOptions; +import com.influxdb.v3.client.write.WritePrecision; class InfluxDBClientTest { @@ -116,4 +124,53 @@ public void unsupportedQueryParams() throws Exception { + "class com.influxdb.v3.client.internal.InfluxDBClientImpl"); } } + + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*") + @EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*") + @Test + public void testQuery() throws Exception { + try (InfluxDBClient client = InfluxDBClient.getInstance( + System.getenv("TESTING_INFLUXDB_URL"), + System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(), + System.getenv("TESTING_INFLUXDB_DATABASE"), + null)) { + String uuid = UUID.randomUUID().toString(); + long timestamp = Instant.now().getEpochSecond(); + String record = String.format( + "host10,tag=empty " + + "name=\"intel\"," + + "mem_total=2048," + + "disk_free=100i," + + "temperature=100.86," + + "isActive=true," + + "testId=\"%s\" %d", + uuid, + timestamp + ); + client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null)); + + Map parameters = Map.of("testId", uuid); + String sql = "Select * from host10 where \"testId\"=$testId"; + try (Stream stream = client.query(sql, parameters)) { + stream.findFirst() + .ifPresent(objects -> { + Assertions.assertThat(objects[0].getClass()).isEqualTo(Long.class); + Assertions.assertThat(objects[0]).isEqualTo(100L); + + Assertions.assertThat(objects[1].getClass()).isEqualTo(Boolean.class); + Assertions.assertThat(objects[1]).isEqualTo(true); + + Assertions.assertThat(objects[2].getClass()).isEqualTo(Double.class); + Assertions.assertThat(objects[2]).isEqualTo(2048.0); + + Assertions.assertThat(objects[3].getClass()).isEqualTo(String.class); + Assertions.assertThat(objects[3]).isEqualTo("intel"); + + Assertions.assertThat(objects[7].getClass()).isEqualTo(BigInteger.class); + Assertions.assertThat(objects[7]).isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000)); + }); + } + } + } } diff --git a/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java new file mode 100644 index 00000000..31ea7f1f --- /dev/null +++ b/src/test/java/com/influxdb/v3/client/internal/NanosecondConverterTest.java @@ -0,0 +1,91 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client.internal; + +import java.math.BigInteger; + +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class NanosecondConverterTest { + + @Test + void testGetTimestampNanosecond() { + BigInteger timestampNanoSecond = null; + + // Second + FieldType timeTypeSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.SECOND, "UTC"), + null); + Field timeFieldSecond = new Field("time", timeTypeSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000_000_000)), timestampNanoSecond + ); + + // MilliSecond + FieldType timeTypeMilliSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.MILLISECOND, "UTC"), + null); + Field timeFieldMilliSecond = new Field("time", timeTypeMilliSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldMilliSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000_000)), timestampNanoSecond + ); + + // MicroSecond + FieldType timeTypeMicroSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"), + null); + Field timeFieldMicroSecond = new Field("time", timeTypeMicroSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldMicroSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000)), timestampNanoSecond + ); + + // Nano Second + FieldType timeTypeNanoSecond = new FieldType(true, + new ArrowType.Timestamp(TimeUnit.NANOSECOND, "UTC"), + null); + Field timeFieldNanoSecond = new Field("time", timeTypeNanoSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, timeFieldNanoSecond); + Assertions.assertEquals(BigInteger.valueOf(123_456L), timestampNanoSecond); + + // For ArrowType.Time type + FieldType timeMilliSecond = new FieldType(true, + new ArrowType.Time(TimeUnit.MILLISECOND, 32), + null); + Field fieldMilliSecond = new Field("time", timeMilliSecond, null); + timestampNanoSecond = NanosecondConverter.getTimestampNano(123_456L, fieldMilliSecond); + Assertions.assertEquals( + BigInteger.valueOf(123_456L) + .multiply(BigInteger.valueOf(1_000_000)), timestampNanoSecond + ); + } +} diff --git a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java index 7915b2bc..f0b77c9b 100644 --- a/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/VectorSchemaRootConverterTest.java @@ -25,16 +25,20 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.annotation.Nonnull; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.BaseFixedWidthVector; import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.TimeMilliVector; import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; @@ -125,6 +129,41 @@ void timestampAsArrowInt() { } } + @Test + public void testConverterWithMetaType() { + try (VectorSchemaRoot root = generateVectorSchemaRoot()) { + PointValues pointValues = VectorSchemaRootConverter.INSTANCE.toPointValues(0, root, root.getFieldVectors()); + + String measurement = pointValues.getMeasurement(); + Assertions.assertThat(measurement).isEqualTo("host"); + Assertions.assertThat(measurement.getClass()).isEqualTo(String.class); + + BigInteger expected = BigInteger.valueOf(123_456L * 1_000_000); + Assertions.assertThat((BigInteger) pointValues.getTimestamp()).isEqualByComparingTo(expected); + + Long memTotal = (Long) pointValues.getField("mem_total"); + Assertions.assertThat(memTotal).isEqualTo(2048); + Assertions.assertThat(memTotal.getClass()).isEqualTo(Long.class); + + Long diskFree = (Long) pointValues.getField("disk_free"); + Assertions.assertThat(diskFree).isEqualTo(1_000_000); + Assertions.assertThat(diskFree.getClass()).isEqualTo(Long.class); + + Double temperature = (Double) pointValues.getField("temperature"); + Assertions.assertThat(temperature).isEqualTo(100.8766f); + Assertions.assertThat(temperature.getClass()).isEqualTo(Double.class); + + String name = (String) pointValues.getField("name"); + Assertions.assertThat(name).isEqualTo("intel"); + Assertions.assertThat(name.getClass()).isEqualTo(String.class); + + Boolean isActive = (Boolean) pointValues.getField("isActive"); + Assertions.assertThat(isActive).isEqualTo(true); + Assertions.assertThat(isActive.getClass()).isEqualTo(Boolean.class); + } + } + + @Test void timestampWithoutMetadataAndFieldWithoutMetadata() { FieldType timeType = new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null); @@ -237,4 +276,104 @@ private VectorSchemaRoot initializeVectorSchemaRoot(@Nonnull final Field... fiel return root; } + + public VectorSchemaRoot generateVectorSchemaRoot() { + Field measurementField = generateStringField("measurement"); + Field timeField = generateTimeField(); + Field memTotalField = generateIntField("mem_total"); + Field diskFreeField = generateUnsignedIntField("disk_free"); + Field temperatureField = generateFloatField("temperature"); + Field nameField = generateStringField("name"); + Field isActiveField = generateBoolField("isActive"); + List fields = List.of(measurementField, + timeField, + memTotalField, + diskFreeField, + temperatureField, + nameField, + isActiveField); + + VectorSchemaRoot root = initializeVectorSchemaRoot(fields.toArray(new Field[0])); + VarCharVector measurement = (VarCharVector) root.getVector("measurement"); + measurement.allocateNew(); + measurement.set(0, "host".getBytes()); + + TimeMilliVector timeVector = (TimeMilliVector) root.getVector("time"); + timeVector.allocateNew(); + timeVector.setSafe(0, 123_456); + + BigIntVector intVector = (BigIntVector) root.getVector("mem_total"); + intVector.allocateNew(); + intVector.set(0, 2048); + + BigIntVector unsignedIntVector = (BigIntVector) root.getVector("disk_free"); + unsignedIntVector.allocateNew(); + unsignedIntVector.set(0, 1_000_000); + + Float8Vector floatVector = (Float8Vector) root.getVector("temperature"); + floatVector.allocateNew(); + floatVector.set(0, 100.8766f); + + VarCharVector stringVector = (VarCharVector) root.getVector("name"); + stringVector.allocateNew(); + stringVector.setSafe(0, "intel".getBytes()); + + BitVector boolVector = (BitVector) root.getVector("isActive"); + boolVector.allocateNew(); + boolVector.setSafe(0, 1); + + return root; + } + + private Field generateIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::integer"); + FieldType intType = new FieldType(true, + new ArrowType.Int(64, true), + null, + metadata); + return new Field(fieldName, intType, null); + } + + private Field generateUnsignedIntField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::uinteger"); + FieldType intType = new FieldType(true, + new ArrowType.Int(64, true), + null, + metadata); + return new Field(fieldName, intType, null); + } + + private Field generateFloatField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::float"); + FieldType floatType = new FieldType(true, + new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE), + null, + metadata); + return new Field(fieldName, floatType, null); + } + + private Field generateStringField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::string"); + FieldType stringType = new FieldType(true, new ArrowType.Utf8(), null, metadata); + return new Field(fieldName, stringType, null); + } + + private Field generateBoolField(final String fieldName) { + Map metadata = new HashMap<>(); + metadata.put("iox::column::type", "iox::column_type::field::boolean"); + FieldType boolType = new FieldType(true, new ArrowType.Bool(), null, metadata); + return new Field(fieldName, boolType, null); + } + + private Field generateTimeField() { + FieldType timeType = new FieldType(true, + new ArrowType.Time(TimeUnit.MILLISECOND, 32), + null); + return new Field("time", timeType, null); + } + }