diff --git a/server/src/main/resources/transport/definitions/referable/esql_serializeable_tdigest.csv b/server/src/main/resources/transport/definitions/referable/esql_serializeable_tdigest.csv new file mode 100644 index 0000000000000..0a1946da601e8 --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/esql_serializeable_tdigest.csv @@ -0,0 +1 @@ +9237000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index bbc3e4edba1e8..709f84dfc8f50 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -plan_profile_version,9236000 +esql_serializeable_tdigest,9237000 diff --git a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java index b481fd8d9aab5..92cbffa4e656a 100644 --- a/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java +++ b/x-pack/plugin/analytics/src/main/java/org/elasticsearch/xpack/analytics/mapper/TDigestFieldMapper.java @@ -78,9 +78,7 @@ public class TDigestFieldMapper extends FieldMapper { public static final String CENTROIDS_NAME = "centroids"; public static final String COUNTS_NAME = "counts"; - public static final String SUM_FIELD_NAME = "sum"; - public static final String MIN_FIELD_NAME = "min"; - public static final String MAX_FIELD_NAME = "max"; + public static final String CONTENT_TYPE = "tdigest"; private static TDigestFieldMapper toType(FieldMapper in) { diff --git a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java index b2b8b678b8333..9bef9b1c6db7d 100644 --- a/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java +++ b/x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/DataType.java @@ -373,15 +373,13 @@ public enum DataType implements Writeable { .underConstruction(DataTypesTransportVersions.TEXT_SIMILARITY_RANK_DOC_EXPLAIN_CHUNKS_VERSION) ), - /* TDIGEST( - builder().esType("exponential_histogram") + builder().esType("tdigest") .estimatedSize(16 * 160)// guess 160 buckets (OTEL default for positive values only histograms) with 16 bytes per bucket .docValues() - .underConstruction() - ), + .underConstruction(DataTypesTransportVersions.ESQL_SERIALIZEABLE_TDIGEST) - */ + ), /** * Fields with this type are dense vectors, represented as an array of float values. 
@@ -1049,5 +1047,8 @@ public static class DataTypesTransportVersions { public static final TransportVersion TEXT_SIMILARITY_RANK_DOC_EXPLAIN_CHUNKS_VERSION = TransportVersion.fromName( "text_similarity_rank_docs_explain_chunks" ); + + private static final TransportVersion ESQL_SERIALIZEABLE_TDIGEST = TransportVersion.fromName("esql_serializeable_tdigest"); + } } diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java index 0ce6b6fcffa6a..a4b50bc5bb5db 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/data/TDigestHolder.java @@ -8,9 +8,20 @@ package org.elasticsearch.compute.data; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.ByteArrayStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.GenericNamedWriteable; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tdigest.parsing.TDigestParser; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.json.JsonXContent; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -19,8 +30,19 @@ * {@link org.elasticsearch.search.aggregations.metrics.TDigestState} in classic aggregations, which we are not using directly because * the serialization format is pretty bad for ESQL's use case (specifically, encoding the near-constant compression and merge strategy * data inline as opposed to in a dedicated column isn't great). + * + * This is writable to support ESQL literals of this type, even though those should not exist. Literal support, and thus a writeable + * object here, are required for ESQL testing. See for example ShowExecSerializationTest. 
*/ -public class TDigestHolder { +public class TDigestHolder implements GenericNamedWriteable { + + private static final TransportVersion ESQL_SERIALIZEABLE_TDIGEST = TransportVersion.fromName("esql_serializeable_tdigest"); + + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + GenericNamedWriteable.class, + "TDigestHolder", + TDigestHolder::new + ); private final double min; private final double max; @@ -37,11 +59,33 @@ public TDigestHolder(BytesRef encodedDigest, double min, double max, double sum, this.valueCount = valueCount; } + // TODO: Probably TDigestHolder and ParsedTDigest should be the same object + public TDigestHolder(TDigestParser.ParsedTDigest parsed) throws IOException { + this(parsed.centroids(), parsed.counts(), parsed.min(), parsed.max(), parsed.sum(), parsed.count()); + } + public TDigestHolder(List centroids, List counts, double min, double max, double sum, long valueCount) throws IOException { this(encodeCentroidsAndCounts(centroids, counts), min, max, sum, valueCount); } + public TDigestHolder(StreamInput in) throws IOException { + this.encodedDigest = in.readBytesRef(); + this.min = in.readDouble(); + this.max = in.readDouble(); + this.sum = in.readDouble(); + this.valueCount = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBytesRef(encodedDigest); + out.writeDouble(min); + out.writeDouble(max); + out.writeDouble(sum); + out.writeVLong(valueCount); + } + @Override public boolean equals(Object o) { if ((o instanceof TDigestHolder that)) { @@ -98,4 +142,58 @@ public long getValueCount() { return valueCount; } + @Override + public String toString() { + // TODO: this is largely duplicated from TDigestFieldMapper's synthetic source support, and we should refactor all of that. 
+ try (XContentBuilder builder = JsonXContent.contentBuilder()) { + builder.startObject(); + + if (Double.isNaN(this.getMin()) == false) { + builder.field("min", this.getMin()); + } + if (Double.isNaN(this.getMax()) == false) { + builder.field("max", this.getMax()); + } + if (Double.isNaN(this.getSum()) == false) { + builder.field("sum", this.getSum()); + } + + // TODO: Would be nice to wrap all of this in reusable objects and minimize allocations here + ByteArrayStreamInput values = new ByteArrayStreamInput(); + values.reset(encodedDigest.bytes, encodedDigest.offset, encodedDigest.length); + List centroids = new ArrayList<>(); + List counts = new ArrayList<>(); + while (values.available() > 0) { + counts.add(values.readVLong()); + centroids.add(values.readDouble()); + } + + // TODO: reuse the constants from the field type + builder.startArray("centroids"); + for (Double centroid : centroids) { + builder.value(centroid.doubleValue()); + } + builder.endArray(); + + builder.startArray("counts"); + for (Long count : counts) { + builder.value(count.longValue()); + } + builder.endArray(); + builder.endObject(); + return Strings.toString(builder); + } catch (IOException e) { + throw new IllegalStateException("error rendering TDigest", e); + } + } + + @Override + public String getWriteableName() { + return "TDigestHolder"; + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return ESQL_SERIALIZEABLE_TDIGEST; + } } diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java index a1a1688dd4e64..5822f4a2ed481 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/BlockTestUtils.java @@ -424,7 +424,7 @@ public static ExponentialHistogram randomExponentialHistogram() { } public static TDigestHolder randomTDigest() { - // TODO: This is mostly copied from TDigestFieldMapperTests; refactor it. + // TODO: This is mostly copied from TDigestFieldMapperTests and EsqlTestUtils; refactor it. 
int size = between(1, 100); // Note - we use TDigestState to build an actual t-digest for realistic values here TDigestState digest = TDigestState.createWithoutCircuitBreaking(100); diff --git a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java index 2bb23d01c7118..4a01001d92e2f 100644 --- a/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java +++ b/x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java @@ -373,13 +373,10 @@ protected boolean supportsExponentialHistograms() { @Override protected boolean supportsTDigestField() { try { - return RestEsqlTestCase.hasCapabilities( - client(), - List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName()) - ) + return RestEsqlTestCase.hasCapabilities(client(), List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_SUPPORT_V1.capabilityName())) && RestEsqlTestCase.hasCapabilities( remoteClusterClient(), - List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName()) + List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_SUPPORT_V1.capabilityName()) ); } catch (IOException e) { throw new RuntimeException(e); diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java index f72d244b25e08..ed718dde490b3 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java @@ -64,10 +64,7 @@ protected boolean supportsExponentialHistograms() { @Override protected boolean supportsTDigestField() { - return RestEsqlTestCase.hasCapabilities( - client(), - List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName()) - ); + return RestEsqlTestCase.hasCapabilities(client(), List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_SUPPORT_V1.capabilityName())); } @Before diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 209fc0202fc46..311e5343e0727 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -791,6 +791,7 @@ public void testSuggestedCast() throws IOException { shouldBeSupported.remove(DataType.TSID_DATA_TYPE); shouldBeSupported.remove(DataType.DENSE_VECTOR); shouldBeSupported.remove(DataType.EXPONENTIAL_HISTOGRAM); // TODO(b/133393): add support when blockloader is implemented + shouldBeSupported.remove(DataType.TDIGEST); if (EsqlCapabilities.Cap.AGGREGATE_METRIC_DOUBLE_V0.isEnabled() == false) { shouldBeSupported.remove(DataType.AGGREGATE_METRIC_DOUBLE); } diff --git 
a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java index 82994256edf0d..8fd9b5491d07c 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/AllSupportedFieldsTestCase.java @@ -909,7 +909,7 @@ private static boolean supportedInIndex(DataType t) { DATE_PERIOD, TIME_DURATION, GEOTILE, GEOHASH, GEOHEX, // TODO(b/133393): Once we remove the feature-flag of the exp-histo field type (!= ES|QL type), // replace this with a capability check - EXPONENTIAL_HISTOGRAM, + EXPONENTIAL_HISTOGRAM, TDIGEST, // TODO fix geo CARTESIAN_POINT, CARTESIAN_SHAPE -> false; default -> true; diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java index f4d86ce04b7f7..825a38a782ebc 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java @@ -295,10 +295,7 @@ protected boolean supportsExponentialHistograms() { } protected boolean supportsTDigestField() { - return RestEsqlTestCase.hasCapabilities( - client(), - List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName()) - ); + return RestEsqlTestCase.hasCapabilities(client(), List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_SUPPORT_V1.capabilityName())); } protected void doTest() throws Throwable { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java index 3f28402587ef3..b6e793b10ce07 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java @@ -42,6 +42,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.compute.lucene.DataPartitioning; import org.elasticsearch.core.PathUtils; import org.elasticsearch.core.SuppressForbidden; @@ -65,8 +66,10 @@ import org.elasticsearch.logging.Logger; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils; +import org.elasticsearch.search.aggregations.metrics.TDigestState; import org.elasticsearch.search.crossproject.CrossProjectModeDecider; import org.elasticsearch.tasks.TaskCancelledException; +import org.elasticsearch.tdigest.Centroid; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TransportVersionUtils; import org.elasticsearch.threadpool.ThreadPool; @@ -186,6 +189,7 @@ import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.test.ESTestCase.assertEquals; import static org.elasticsearch.test.ESTestCase.between; +import static org.elasticsearch.test.ESTestCase.fail; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; import static 
org.elasticsearch.test.ESTestCase.randomArray; import static org.elasticsearch.test.ESTestCase.randomBoolean; @@ -193,6 +197,7 @@ import static org.elasticsearch.test.ESTestCase.randomDouble; import static org.elasticsearch.test.ESTestCase.randomFloat; import static org.elasticsearch.test.ESTestCase.randomFrom; +import static org.elasticsearch.test.ESTestCase.randomGaussianDouble; import static org.elasticsearch.test.ESTestCase.randomIdentifier; import static org.elasticsearch.test.ESTestCase.randomInt; import static org.elasticsearch.test.ESTestCase.randomIntBetween; @@ -1076,6 +1081,7 @@ public static Literal randomLiteral(DataType type) { case UNSUPPORTED, OBJECT, DOC_DATA_TYPE -> throw new IllegalArgumentException( "can't make random values for [" + type.typeName() + "]" ); + case TDIGEST -> EsqlTestUtils.randomTDigest(); }, type); } @@ -1114,6 +1120,39 @@ public static ExponentialHistogram randomExponentialHistogram() { return new WriteableExponentialHistogram(histo); } + public static TDigestHolder randomTDigest() { + // TODO: This is mostly copied from TDigestFieldMapperTests and BlockTestUtils; refactor it. + int size = between(1, 100); + // Note - we use TDigestState to build an actual t-digest for realistic values here + TDigestState digest = TDigestState.createWithoutCircuitBreaking(100); + for (int i = 0; i < size; i++) { + double sample = randomGaussianDouble(); + int count = randomIntBetween(1, Integer.MAX_VALUE); + digest.add(sample, count); + } + List centroids = new ArrayList<>(); + List counts = new ArrayList<>(); + double sum = 0.0; + long valueCount = 0L; + for (Centroid c : digest.centroids()) { + centroids.add(c.mean()); + counts.add(c.count()); + sum += c.mean() * c.count(); + valueCount += c.count(); + } + double min = digest.getMin(); + double max = digest.getMax(); + + TDigestHolder returnValue = null; + try { + returnValue = new TDigestHolder(centroids, counts, min, max, sum, valueCount); + } catch (IOException e) { + // This is a test util, so we're just going to fail the test here + fail(e); + } + return returnValue; + } + static Version randomVersion() { // TODO degenerate versions and stuff return switch (between(0, 2)) { diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec index 1ca152a4d76aa..858014c0c6f29 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/tdigest.csv-spec @@ -1,9 +1,8 @@ Make sure we can even load tdigest data -required_capability: tdigest_field_type_basic_functionality +required_capability: tdigest_field_type_support_v1 -FROM tdigest_standard_index | KEEP @timestamp,instance; - -@timestamp:date | instance:keyword -2025-01-01T00:00:00Z | hand-rolled +FROM tdigest_standard_index ; +@timestamp:date | instance:keyword | responseTime:tdigest +2025-01-01T00:00:00Z | hand-rolled | "{""min"": 0.1, ""max"": 0.5, ""sum"": 16.4, ""centroids"":[0.1,0.2,0.3,0.4,0.5], ""counts"":[3,7,23,12,6]}" ; diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinTypesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinTypesIT.java index 4f899e12eecfa..d97e17b529629 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinTypesIT.java +++ 
b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupJoinTypesIT.java @@ -75,6 +75,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.NULL; import static org.elasticsearch.xpack.esql.core.type.DataType.SCALED_FLOAT; import static org.elasticsearch.xpack.esql.core.type.DataType.SHORT; +import static org.elasticsearch.xpack.esql.core.type.DataType.TDIGEST; import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT; import static org.elasticsearch.xpack.esql.core.type.DataType.TSID_DATA_TYPE; import static org.elasticsearch.xpack.esql.core.type.DataType.UNDER_CONSTRUCTION; @@ -249,6 +250,7 @@ public LookupJoinTypesIT(BinaryComparisonOperation operation) { || type == AGGREGATE_METRIC_DOUBLE // need special handling for loads at the moment || type == DENSE_VECTOR // need special handling for loads at the moment || type == EXPONENTIAL_HISTOGRAM + || type == TDIGEST || type == GEOHASH || type == GEOTILE || type == GEOHEX diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 704eefcce9b4a..e3b413066da88 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -1588,7 +1588,7 @@ public enum Cap { */ EXPONENTIAL_HISTOGRAM_PRE_TECH_PREVIEW_V8(EXPONENTIAL_HISTOGRAM_FEATURE_FLAG), - TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY(T_DIGEST_ESQL_SUPPORT), + TDIGEST_FIELD_TYPE_SUPPORT_V1(T_DIGEST_ESQL_SUPPORT), /** * Create new block when filtering OrdinalBytesRefBlock diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java index 92145463165b8..89c6f30521e88 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/PositionToXContent.java @@ -20,6 +20,7 @@ import org.elasticsearch.compute.data.FloatBlock; import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; +import org.elasticsearch.compute.data.TDigestBlock; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; @@ -38,6 +39,7 @@ import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.ipToString; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.nanoTimeToString; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.spatialToString; +import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.tDigestBlockToString; import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.versionToString; public abstract class PositionToXContent { @@ -172,6 +174,12 @@ protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Pa return builder.value(aggregateMetricDoubleBlockToString((AggregateMetricDoubleBlock) block, valueIndex)); } }; + case TDIGEST -> new PositionToXContent(block) { + protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex) + throws IOException { + return builder.value(tDigestBlockToString((TDigestBlock) block, 
valueIndex)); + } + }; case EXPONENTIAL_HISTOGRAM -> new PositionToXContent(block) { ExponentialHistogramScratch scratch = new ExponentialHistogramScratch(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java index c1d652362e1e3..42927b264bff2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java @@ -21,6 +21,7 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.TDigestBlock; import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper; import org.elasticsearch.xcontent.XContentParser; import org.elasticsearch.xcontent.XContentParserConfiguration; @@ -142,6 +143,7 @@ private static Object valueAt(DataType dataType, Block block, int offset, BytesR case GEOHEX, GEOHASH, GEOTILE -> geoGridToString(((LongBlock) block).getLong(offset), dataType); case AGGREGATE_METRIC_DOUBLE -> aggregateMetricDoubleBlockToString((AggregateMetricDoubleBlock) block, offset); case EXPONENTIAL_HISTOGRAM -> exponentialHistogramBlockToString((ExponentialHistogramBlock) block, offset); + case TDIGEST -> ((TDigestBlock) block).getTDigestHolder(offset); case UNSUPPORTED -> (String) null; case SOURCE -> { BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java index 8dfa560c64e20..fa85e6c202672 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/Coalesce.java @@ -213,7 +213,7 @@ public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { case EXPONENTIAL_HISTOGRAM -> CoalesceExponentialHistogramEvaluator.toEvaluator(toEvaluator, children()); case NULL -> EvalOperator.CONSTANT_NULL_FACTORY; case UNSUPPORTED, SHORT, BYTE, DATE_PERIOD, OBJECT, DOC_DATA_TYPE, SOURCE, TIME_DURATION, FLOAT, HALF_FLOAT, TSID_DATA_TYPE, - SCALED_FLOAT, AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR -> throw new UnsupportedOperationException( + SCALED_FLOAT, AGGREGATE_METRIC_DOUBLE, TDIGEST, DENSE_VECTOR -> throw new UnsupportedOperationException( dataType() + " can’t be coalesced" ); }; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java index 5440c6fb829c6..35ee8b65781d0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/join/Join.java @@ -59,6 +59,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.NULL; import static org.elasticsearch.xpack.esql.core.type.DataType.OBJECT; import static org.elasticsearch.xpack.esql.core.type.DataType.SOURCE; +import static org.elasticsearch.xpack.esql.core.type.DataType.TDIGEST; import static org.elasticsearch.xpack.esql.core.type.DataType.TEXT; import static 
org.elasticsearch.xpack.esql.core.type.DataType.TIME_DURATION; import static org.elasticsearch.xpack.esql.core.type.DataType.TSID_DATA_TYPE; @@ -96,6 +97,7 @@ public class Join extends BinaryPlan implements PostAnalysisVerificationAware, S TSID_DATA_TYPE, AGGREGATE_METRIC_DOUBLE, EXPONENTIAL_HISTOGRAM, + TDIGEST, DENSE_VECTOR }; private final JoinConfig config; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index 6ac268a92f11d..7537ae8a73c26 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -487,7 +487,7 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION, OBJECT, SCALED_FLOAT, UNSIGNED_LONG -> TopNEncoder.DEFAULT_SORTABLE; case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE, - AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM, TSID_DATA_TYPE -> + AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM, TDIGEST, TSID_DATA_TYPE -> TopNEncoder.DEFAULT_UNSORTABLE; // unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point case UNSUPPORTED -> TopNEncoder.UNSUPPORTED; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 60bd97d166193..6491a24cbae68 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -373,6 +373,7 @@ public static ElementType toElementType(DataType dataType, MappedFieldType.Field case GEO_SHAPE, CARTESIAN_SHAPE -> fieldExtractPreference == EXTRACT_SPATIAL_BOUNDS ? 
ElementType.INT : ElementType.BYTES_REF; case AGGREGATE_METRIC_DOUBLE -> ElementType.AGGREGATE_METRIC_DOUBLE; case EXPONENTIAL_HISTOGRAM -> ElementType.EXPONENTIAL_HISTOGRAM; + case TDIGEST -> ElementType.TDIGEST; case DENSE_VECTOR -> ElementType.FLOAT; case SHORT, BYTE, DATE_PERIOD, TIME_DURATION, OBJECT, FLOAT, HALF_FLOAT, SCALED_FLOAT -> throw EsqlIllegalArgumentException .illegalDataType(dataType); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java index a10094dd93997..f1cdf55cb1856 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/type/EsqlDataTypeConverter.java @@ -23,6 +23,8 @@ import org.elasticsearch.compute.data.ExponentialHistogramBlock; import org.elasticsearch.compute.data.ExponentialHistogramScratch; import org.elasticsearch.compute.data.IntBlock; +import org.elasticsearch.compute.data.TDigestBlock; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.core.Booleans; import org.elasticsearch.exponentialhistogram.ExponentialHistogram; import org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent; @@ -803,6 +805,15 @@ public static String exponentialHistogramBlockToString(ExponentialHistogramBlock return exponentialHistogramToString(histo); } + public static String tDigestBlockToString(TDigestBlock tDigestBlock, int index) { + TDigestHolder digest = tDigestBlock.getTDigestHolder(index); + return tDigestToString(digest); + } + + public static String tDigestToString(TDigestHolder digest) { + return digest.toString(); + } + public static String aggregateMetricDoubleLiteralToString(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral aggMetric) { try (XContentBuilder builder = JsonXContent.contentBuilder()) { builder.startObject(); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java index 237dc568fc9f3..5d318a46743b4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.ExistsQueryBuilder; import org.elasticsearch.index.query.MatchAllQueryBuilder; @@ -125,6 +126,7 @@ public static NamedWriteableRegistry writableRegistry() { AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral::new ) ); + entries.add(TDigestHolder.ENTRY); entries.add(WriteableExponentialHistogram.ENTRY); return new NamedWriteableRegistry(entries); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index ef98b9b422025..f25febce08f74 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ 
b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -32,6 +32,8 @@ import org.elasticsearch.compute.data.IntBlock; import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.data.TDigestBlockBuilder; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.compute.operator.AbstractPageMappingOperator; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.DriverSleeps; @@ -53,6 +55,7 @@ import org.elasticsearch.index.mapper.BlockLoader; import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils; +import org.elasticsearch.tdigest.parsing.TDigestParser; import org.elasticsearch.test.AbstractChunkedSerializingTestCase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.RemoteClusterAware; @@ -63,6 +66,7 @@ import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; import org.elasticsearch.xcontent.XContentParser; +import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.esql.EsqlTestUtils; @@ -311,6 +315,7 @@ private Page randomPage(List columns) { ); expBuilder.append(histo); } + case TDIGEST -> ((TDigestBlockBuilder) builder).append(EsqlTestUtils.randomTDigest()); // default -> throw new UnsupportedOperationException("unsupported data type [" + c + "]"); } return builder.build(); @@ -1311,6 +1316,31 @@ static Page valuesToPage(BlockFactory blockFactory, List columns expHistoBuilder.append(parsed); } } + case TDIGEST -> { + TDigestBlockBuilder tDigestBlockBuilder = (TDigestBlockBuilder) builder; + String json = Types.forciblyCast(value); + try (XContentParser parser = JsonXContent.jsonXContent.createParser(XContentParserConfiguration.EMPTY, json)) { + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new IllegalArgumentException("Expected START_OBJECT but found: " + parser.currentToken()); + } + parser.nextToken(); + TDigestHolder parsed = new TDigestHolder( + TDigestParser.parse( + "serialized_block", + parser, + (a, b) -> new UnsupportedOperationException("failed parsing tdigest"), + (x, y, z) -> new UnsupportedOperationException("failed parsing tdigest") + ) + ); + if (parsed == null) { + tDigestBlockBuilder.appendNull(); + } else { + tDigestBlockBuilder.append(parsed); + } + } catch (UnsupportedOperationException | IOException e) { + fail("Unable to parse TDigestBlockBuilder: " + e.getMessage()); + } + } } } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/CaseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/CaseTests.java index 4059ad709d0f5..479c3e20de1e8 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/CaseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/conditional/CaseTests.java @@ -66,7 +66,7 @@ public class CaseTests extends AbstractScalarFunctionTestCase { if (Build.current().isSnapshot()) { t.addAll( DataType.UNDER_CONSTRUCTION.stream() - .filter(type -> type != DataType.AGGREGATE_METRIC_DOUBLE && type != DataType.DENSE_VECTOR) + .filter(type -> type != 
DataType.AGGREGATE_METRIC_DOUBLE && type != DataType.DENSE_VECTOR && type != DataType.TDIGEST) .toList() ); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/IsNullTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/IsNullTests.java index bfa541b7e3ff6..4a6c4fad419ae 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/IsNullTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/nulls/IsNullTests.java @@ -42,6 +42,9 @@ public static Iterable parameters() { if (type.supportedVersion().supportedLocally() == false) { continue; } + if (type == DataType.TDIGEST) { + continue; + } if (type != DataType.NULL) { suppliers.add( new TestCaseSupplier( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java index e71b4defdc403..95d0fbf3d727d 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plan/physical/AbstractPhysicalPlanSerializationTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.compute.data.AggregateMetricDoubleBlockBuilder; +import org.elasticsearch.compute.data.TDigestHolder; import org.elasticsearch.search.SearchModule; import org.elasticsearch.xpack.esql.WriteableExponentialHistogram; import org.elasticsearch.xpack.esql.core.tree.Node; @@ -58,6 +59,7 @@ protected final NamedWriteableRegistry getNamedWriteableRegistry() { entries.add(AggregateMetricDoubleBlockBuilder.AggregateMetricDoubleLiteral.ENTRY); entries.add(LookupJoinExec.ENTRY); entries.add(WriteableExponentialHistogram.ENTRY); + entries.add(TDigestHolder.ENTRY); return new NamedWriteableRegistry(entries); }