Merged
28 commits
915a4ce
add the new data type
not-napoleon Dec 4, 2025
fac3a49
match values in CSV test
not-napoleon Dec 4, 2025
d5c6a8b
fix Query Response Tests
not-napoleon Dec 5, 2025
1b6c3a2
ignore some tests I don't expect to be supported yet
not-napoleon Dec 5, 2025
975d727
make TDigestHolder writeable. Required for ShowExecSerializationTests
not-napoleon Dec 5, 2025
bfe28e8
transport version? I assume?
not-napoleon Dec 5, 2025
5fd101f
spotless apply
not-napoleon Dec 5, 2025
af9d854
[CI] Auto commit changes from spotless
Dec 5, 2025
7f2111d
Merge remote-tracking branch 'refs/remotes/not-napoleon/tdigest-data-…
not-napoleon Dec 5, 2025
18ea5c7
Merge branch 'main' into tdigest-data-type
not-napoleon Dec 5, 2025
4251c6b
update capability name
not-napoleon Dec 5, 2025
a0e679a
[CI] Auto commit changes from spotless
Dec 5, 2025
8864285
Merge branch 'main' into tdigest-data-type
not-napoleon Dec 8, 2025
f4bc63a
transport version? I assume?
not-napoleon Dec 8, 2025
9f25ded
Merge remote-tracking branch 'refs/remotes/not-napoleon/tdigest-data-…
not-napoleon Dec 8, 2025
f91bf9e
fix bad merge
not-napoleon Dec 8, 2025
f89d1b2
[CI] Auto commit changes from spotless
Dec 8, 2025
373a4e3
skip CCQ BWC Tests, hopefully
not-napoleon Dec 8, 2025
740882c
ignore tdigest from suggested cast tests
not-napoleon Dec 8, 2025
fed56b6
Merge remote-tracking branch 'refs/remotes/not-napoleon/tdigest-data-…
not-napoleon Dec 8, 2025
57a33b7
Merge branch 'main' into tdigest-data-type
not-napoleon Dec 8, 2025
ed91eba
[CI] Auto commit changes from spotless
Dec 8, 2025
72f3157
response to PR feedback
not-napoleon Dec 9, 2025
3125a22
Mark TDigest type as not supported for joins
not-napoleon Dec 9, 2025
577406f
Merge remote-tracking branch 'refs/remotes/not-napoleon/tdigest-data-…
not-napoleon Dec 9, 2025
82837e8
One more spot
not-napoleon Dec 9, 2025
3090313
Merge branch 'main' into tdigest-data-type
not-napoleon Dec 9, 2025
b72eb65
Merge branch 'main' into tdigest-data-type
not-napoleon Dec 9, 2025
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9236000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
inference_ccm_enablement_service,9235000
esql_serializeable_tdigest,9236000
Original file line number Diff line number Diff line change
@@ -78,9 +78,7 @@ public class TDigestFieldMapper extends FieldMapper {

public static final String CENTROIDS_NAME = "centroids";
public static final String COUNTS_NAME = "counts";
public static final String SUM_FIELD_NAME = "sum";
public static final String MIN_FIELD_NAME = "min";
public static final String MAX_FIELD_NAME = "max";

public static final String CONTENT_TYPE = "tdigest";

private static TDigestFieldMapper toType(FieldMapper in) {
Original file line number Diff line number Diff line change
@@ -373,15 +373,13 @@ public enum DataType implements Writeable {
.underConstruction(DataTypesTransportVersions.RESOLVE_FIELDS_RESPONSE_USED_TV)
),

/*
TDIGEST(
builder().esType("exponential_histogram")
builder().esType("tdigest")
.estimatedSize(16 * 160)// guess 160 buckets (OTEL default for positive values only histograms) with 16 bytes per bucket
.docValues()
.underConstruction()
),
.underConstruction(DataTypesTransportVersions.ESQL_SERIALIZEABLE_TDIGEST)

*/
),

/**
* Fields with this type are dense vectors, represented as an array of float values.
@@ -1048,5 +1046,8 @@ public static class DataTypesTransportVersions {
public static final TransportVersion RESOLVE_FIELDS_RESPONSE_USED_TV = TransportVersion.fromName(
"esql_resolve_fields_response_used"
);

private static final TransportVersion ESQL_SERIALIZEABLE_TDIGEST = TransportVersion.fromName("esql_serializeable_tdigest");

}
}
Original file line number Diff line number Diff line change
@@ -8,9 +8,20 @@
package org.elasticsearch.compute.data;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.GenericNamedWriteable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tdigest.parsing.TDigestParser;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@@ -19,8 +30,19 @@
* {@link org.elasticsearch.search.aggregations.metrics.TDigestState} in classic aggregations, which we are not using directly because
* the serialization format is pretty bad for ESQL's use case (specifically, encoding the near-constant compression and merge strategy
* data inline as opposed to in a dedicated column isn't great).
*
* This is writable to support ESQL literals of this type, even though those should not exist. Literal support, and thus a writeable
* object here, are required for ESQL testing. See for example ShowExecSerializationTest.
*/
public class TDigestHolder {
public class TDigestHolder implements GenericNamedWriteable {

private static final TransportVersion ESQL_SERIALIZEABLE_TDIGEST = TransportVersion.fromName("esql_serializeable_tdigest");

public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
GenericNamedWriteable.class,
"TDigestHolder",
TDigestHolder::new
);

private final double min;
private final double max;
@@ -37,11 +59,24 @@ public TDigestHolder(BytesRef encodedDigest, double min, double max, double sum,
this.valueCount = valueCount;
}

// TODO: Probably TDigestHolder and ParsedTDigest should be the same object
public TDigestHolder(TDigestParser.ParsedTDigest parsed) throws IOException {
this(parsed.centroids(), parsed.counts(), parsed.min(), parsed.max(), parsed.sum(), parsed.count());
}

public TDigestHolder(List<Double> centroids, List<Long> counts, double min, double max, double sum, long valueCount)
throws IOException {
this(encodeCentroidsAndCounts(centroids, counts), min, max, sum, valueCount);
}

public TDigestHolder(StreamInput in) throws IOException {
Review comment: We usually place readFrom and writeTo next to each other so they can be easily scanned and compared.

this.encodedDigest = in.readBytesRef();
this.min = in.readDouble();
this.max = in.readDouble();
this.sum = in.readDouble();
this.valueCount = in.readVLong();
}
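// Hedged illustration, not part of this PR: the reviewer's convention would pair a static readFrom
// factory directly with writeTo, so the read-side and write-side field order can be compared at a glance.
// It assumes the existing (BytesRef, min, max, sum, valueCount) constructor declared earlier in this class.
public static TDigestHolder readFrom(StreamInput in) throws IOException {
    return new TDigestHolder(in.readBytesRef(), in.readDouble(), in.readDouble(), in.readDouble(), in.readVLong());
}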

@Override
public boolean equals(Object o) {
if ((o instanceof TDigestHolder that)) {
@@ -98,4 +133,67 @@ public long getValueCount() {
return valueCount;
}

@Override
public String toString() {
// TODO: this is largely duplicated from TDigestFieldMapper's synthetic source support, and we should refactor all of that.
try (XContentBuilder builder = JsonXContent.contentBuilder()) {
builder.startObject();

if (Double.isNaN(this.getMin()) == false) {
builder.field("min", this.getMin());
}
if (Double.isNaN(this.getMax()) == false) {
builder.field("max", this.getMax());
}
if (Double.isNaN(this.getSum()) == false) {
builder.field("sum", this.getSum());
}

// TODO: Would be nice to wrap all of this in reusable objects and minimize allocations here
ByteArrayStreamInput values = new ByteArrayStreamInput();
values.reset(encodedDigest.bytes, encodedDigest.offset, encodedDigest.length);
List<Double> centroids = new ArrayList<>();
List<Long> counts = new ArrayList<>();
while (values.available() > 0) {
counts.add(values.readVLong());
centroids.add(values.readDouble());
}

// TODO: reuse the constants from the field type
builder.startArray("centroids");
for (Double centroid : centroids) {
builder.value(centroid.doubleValue());
}
builder.endArray();

builder.startArray("counts");
for (Long count : counts) {
builder.value(count.longValue());
}
builder.endArray();
builder.endObject();
return Strings.toString(builder);
} catch (IOException e) {
throw new IllegalStateException("error rendering TDigest", e);
}
}

@Override
public String getWriteableName() {
return "TDigestHolder";
}

@Override
public TransportVersion getMinimalSupportedVersion() {
return ESQL_SERIALIZEABLE_TDIGEST;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesRef(encodedDigest);
out.writeDouble(min);
out.writeDouble(max);
out.writeDouble(sum);
out.writeVLong(valueCount);
}
}
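As a hedged aside (not part of this PR's diff), the GenericNamedWriteable support above can be exercised with a simple round trip: build a TDigestHolder from explicit centroid/count lists, write it with writeTo, and read it back through the new StreamInput constructor. The class and constructors are the ones shown in the hunks above; the sample values and the BytesStreamOutput buffer are illustrative assumptions only.

import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.compute.data.TDigestHolder;

import java.io.IOException;
import java.util.List;

public class TDigestHolderRoundTripSketch {
    public static void main(String[] args) throws IOException {
        // Three centroids with their counts; min, max, sum and valueCount are derived from the same values.
        TDigestHolder original = new TDigestHolder(
            List.of(0.1, 0.2, 0.3),
            List.of(3L, 7L, 23L),
            0.1,  // min
            0.3,  // max
            8.6,  // sum = 0.1*3 + 0.2*7 + 0.3*23
            33L   // valueCount = 3 + 7 + 23
        );
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            original.writeTo(out); // encoded centroids/counts as a BytesRef, then min, max, sum, vlong count
            try (StreamInput in = out.bytes().streamInput()) {
                TDigestHolder copy = new TDigestHolder(in);
                assert original.equals(copy) : "round trip should preserve the digest";
            }
        }
    }
}

In the PR itself this path is exercised indirectly through ESQL literal serialization tests such as ShowExecSerializationTest, as the class javadoc notes.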
Original file line number Diff line number Diff line change
@@ -424,7 +424,7 @@ public static ExponentialHistogram randomExponentialHistogram() {
}

public static TDigestHolder randomTDigest() {
// TODO: This is mostly copied from TDigestFieldMapperTests; refactor it.
// TODO: This is mostly copied from TDigestFieldMapperTests and EsqlTestUtils; refactor it.
int size = between(1, 100);
// Note - we use TDigestState to build an actual t-digest for realistic values here
TDigestState digest = TDigestState.createWithoutCircuitBreaking(100);
Original file line number Diff line number Diff line change
@@ -373,13 +373,10 @@ protected boolean supportsExponentialHistograms() {
@Override
protected boolean supportsTDigestField() {
try {
return RestEsqlTestCase.hasCapabilities(
client(),
List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName())
)
return RestEsqlTestCase.hasCapabilities(client(), List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_SUPPORT_V1.capabilityName()))
&& RestEsqlTestCase.hasCapabilities(
remoteClusterClient(),
List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName())
List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_SUPPORT_V1.capabilityName())
);
} catch (IOException e) {
throw new RuntimeException(e);
Original file line number Diff line number Diff line change
@@ -64,10 +64,7 @@ protected boolean supportsExponentialHistograms() {

@Override
protected boolean supportsTDigestField() {
return RestEsqlTestCase.hasCapabilities(
client(),
List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName())
);
return RestEsqlTestCase.hasCapabilities(client(), List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_SUPPORT_V1.capabilityName()));
}

@Before
Original file line number Diff line number Diff line change
@@ -295,10 +295,7 @@ protected boolean supportsExponentialHistograms() {
}

protected boolean supportsTDigestField() {
return RestEsqlTestCase.hasCapabilities(
client(),
List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY.capabilityName())
);
return RestEsqlTestCase.hasCapabilities(client(), List.of(EsqlCapabilities.Cap.TDIGEST_FIELD_TYPE_SUPPORT_V1.capabilityName()));
}

protected void doTest() throws Throwable {
Original file line number Diff line number Diff line change
@@ -41,6 +41,7 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.data.TDigestHolder;
import org.elasticsearch.compute.lucene.DataPartitioning;
import org.elasticsearch.core.PathUtils;
import org.elasticsearch.core.SuppressForbidden;
@@ -64,8 +65,10 @@
import org.elasticsearch.logging.Logger;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import org.elasticsearch.search.aggregations.metrics.TDigestState;
import org.elasticsearch.search.crossproject.CrossProjectModeDecider;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tdigest.Centroid;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TransportVersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
@@ -184,13 +187,15 @@
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.test.ESTestCase.assertEquals;
import static org.elasticsearch.test.ESTestCase.between;
import static org.elasticsearch.test.ESTestCase.fail;
import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength;
import static org.elasticsearch.test.ESTestCase.randomArray;
import static org.elasticsearch.test.ESTestCase.randomBoolean;
import static org.elasticsearch.test.ESTestCase.randomByte;
import static org.elasticsearch.test.ESTestCase.randomDouble;
import static org.elasticsearch.test.ESTestCase.randomFloat;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.ESTestCase.randomGaussianDouble;
import static org.elasticsearch.test.ESTestCase.randomIdentifier;
import static org.elasticsearch.test.ESTestCase.randomInt;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
@@ -1068,6 +1073,7 @@ public static Literal randomLiteral(DataType type) {
case UNSUPPORTED, OBJECT, DOC_DATA_TYPE -> throw new IllegalArgumentException(
"can't make random values for [" + type.typeName() + "]"
);
case TDIGEST -> EsqlTestUtils.randomTDigest();
}, type);
}

@@ -1106,6 +1112,39 @@ public static ExponentialHistogram randomExponentialHistogram() {
return new WriteableExponentialHistogram(histo);
}

public static TDigestHolder randomTDigest() {
// TODO: This is mostly copied from TDigestFieldMapperTests and BlockTestUtils; refactor it.
int size = between(1, 100);
// Note - we use TDigestState to build an actual t-digest for realistic values here
TDigestState digest = TDigestState.createWithoutCircuitBreaking(100);
for (int i = 0; i < size; i++) {
double sample = randomGaussianDouble();
int count = randomIntBetween(1, Integer.MAX_VALUE);
digest.add(sample, count);
}
List<Double> centroids = new ArrayList<>();
List<Long> counts = new ArrayList<>();
double sum = 0.0;
long valueCount = 0L;
for (Centroid c : digest.centroids()) {
centroids.add(c.mean());
counts.add(c.count());
sum += c.mean() * c.count();
valueCount += c.count();
}
double min = digest.getMin();
double max = digest.getMax();

TDigestHolder returnValue = null;
try {
returnValue = new TDigestHolder(centroids, counts, min, max, sum, valueCount);
} catch (IOException e) {
// This is a test util, so we're just going to fail the test here
fail(e);
}
return returnValue;
}

static Version randomVersion() {
// TODO degenerate versions and stuff
return switch (between(0, 2)) {
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
Make sure we can even load tdigest data
required_capability: tdigest_field_type_basic_functionality
required_capability: tdigest_field_type_support_v1

FROM tdigest_standard_index | KEEP @timestamp,instance;

@timestamp:date | instance:keyword
2025-01-01T00:00:00Z | hand-rolled
FROM tdigest_standard_index ;

@timestamp:date | instance:keyword | responseTime:tdigest
2025-01-01T00:00:00Z | hand-rolled | "{""min"": 0.1, ""max"": 0.5, ""sum"": 16.4, ""centroids"":[0.1,0.2,0.3,0.4,0.5], ""counts"":[3,7,23,12,6]}"
;
Original file line number Diff line number Diff line change
@@ -1583,7 +1583,7 @@ public enum Cap {
*/
EXPONENTIAL_HISTOGRAM_PRE_TECH_PREVIEW_V8(EXPONENTIAL_HISTOGRAM_FEATURE_FLAG),

TDIGEST_FIELD_TYPE_BASIC_FUNCTIONALITY(T_DIGEST_ESQL_SUPPORT),
TDIGEST_FIELD_TYPE_SUPPORT_V1(T_DIGEST_ESQL_SUPPORT),

/**
* Create new block when filtering OrdinalBytesRefBlock
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import org.elasticsearch.compute.data.FloatBlock;
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.TDigestBlock;
import org.elasticsearch.exponentialhistogram.ExponentialHistogram;
import org.elasticsearch.exponentialhistogram.ExponentialHistogramXContent;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
@@ -38,6 +39,7 @@
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.ipToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.nanoTimeToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.spatialToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.tDigestBlockToString;
import static org.elasticsearch.xpack.esql.type.EsqlDataTypeConverter.versionToString;

public abstract class PositionToXContent {
@@ -172,6 +174,12 @@ protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Pa
return builder.value(aggregateMetricDoubleBlockToString((AggregateMetricDoubleBlock) block, valueIndex));
}
};
case TDIGEST -> new PositionToXContent(block) {
protected XContentBuilder valueToXContent(XContentBuilder builder, ToXContent.Params params, int valueIndex)
throws IOException {
return builder.value(tDigestBlockToString((TDigestBlock) block, valueIndex));
}
};
case EXPONENTIAL_HISTOGRAM -> new PositionToXContent(block) {

ExponentialHistogramScratch scratch = new ExponentialHistogramScratch();
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import org.elasticsearch.compute.data.IntBlock;
import org.elasticsearch.compute.data.LongBlock;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.data.TDigestBlock;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
@@ -142,6 +143,7 @@ private static Object valueAt(DataType dataType, Block block, int offset, BytesR
case GEOHEX, GEOHASH, GEOTILE -> geoGridToString(((LongBlock) block).getLong(offset), dataType);
case AGGREGATE_METRIC_DOUBLE -> aggregateMetricDoubleBlockToString((AggregateMetricDoubleBlock) block, offset);
case EXPONENTIAL_HISTOGRAM -> exponentialHistogramBlockToString((ExponentialHistogramBlock) block, offset);
case TDIGEST -> ((TDigestBlock) block).getTDigestHolder(offset);
case UNSUPPORTED -> (String) null;
case SOURCE -> {
BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
Original file line number Diff line number Diff line change
@@ -213,7 +213,7 @@ public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) {
case EXPONENTIAL_HISTOGRAM -> CoalesceExponentialHistogramEvaluator.toEvaluator(toEvaluator, children());
case NULL -> EvalOperator.CONSTANT_NULL_FACTORY;
case UNSUPPORTED, SHORT, BYTE, DATE_PERIOD, OBJECT, DOC_DATA_TYPE, SOURCE, TIME_DURATION, FLOAT, HALF_FLOAT, TSID_DATA_TYPE,
SCALED_FLOAT, AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR -> throw new UnsupportedOperationException(
SCALED_FLOAT, AGGREGATE_METRIC_DOUBLE, TDIGEST, DENSE_VECTOR -> throw new UnsupportedOperationException(
dataType() + " can’t be coalesced"
);
};
Original file line number Diff line number Diff line change
@@ -487,7 +487,7 @@ private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerConte
case BOOLEAN, NULL, BYTE, SHORT, INTEGER, LONG, DOUBLE, FLOAT, HALF_FLOAT, DATETIME, DATE_NANOS, DATE_PERIOD, TIME_DURATION,
OBJECT, SCALED_FLOAT, UNSIGNED_LONG -> TopNEncoder.DEFAULT_SORTABLE;
case GEO_POINT, CARTESIAN_POINT, GEO_SHAPE, CARTESIAN_SHAPE, COUNTER_LONG, COUNTER_INTEGER, COUNTER_DOUBLE, SOURCE,
AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM, TSID_DATA_TYPE ->
AGGREGATE_METRIC_DOUBLE, DENSE_VECTOR, GEOHASH, GEOTILE, GEOHEX, EXPONENTIAL_HISTOGRAM, TDIGEST, TSID_DATA_TYPE ->
TopNEncoder.DEFAULT_UNSORTABLE;
// unsupported fields are encoded as BytesRef, we'll use the same encoder; all values should be null at this point
case UNSUPPORTED -> TopNEncoder.UNSUPPORTED;