diff --git a/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java b/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java index d4ff33a37..1c47f867c 100644 --- a/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java +++ b/prometheus-metrics-core/src/main/java/io/prometheus/metrics/core/metrics/Buffer.java @@ -18,7 +18,15 @@ class Buffer { private static final long bufferActiveBit = 1L << 63; - private final AtomicLong observationCount = new AtomicLong(0); + // Tracking observation counts requires an AtomicLong for coordination between recording and + // collecting. AtomicLong does much worse under contention than the LongAdder instances used + // elsewhere to hold aggregated state. To improve, we stripe the AtomicLong into N instances, + // where N is the number of available processors. Each record operation chooses the appropriate + // instance to use based on the modulo of its thread id and N. This is a more naive / simple + // implementation compared to the striping used under the hood in java.util.concurrent classes + // like LongAdder - contention and hot spots can still occur if recording thread ids happen to + // resolve to the same index. Further improvement is possible. + private final AtomicLong[] stripedObservationCounts; private double[] observationBuffer = new double[0]; private int bufferPos = 0; private boolean reset = false; @@ -27,8 +35,17 @@ class Buffer { ReentrantLock runLock = new ReentrantLock(); Condition bufferFilled = appendLock.newCondition(); + Buffer() { + stripedObservationCounts = new AtomicLong[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < stripedObservationCounts.length; i++) { + stripedObservationCounts[i] = new AtomicLong(0); + } + } + boolean append(double value) { - long count = observationCount.incrementAndGet(); + int index = Math.abs((int) Thread.currentThread().getId()) % stripedObservationCounts.length; + AtomicLong observationCountForThread = stripedObservationCounts[index]; + long count = observationCountForThread.incrementAndGet(); if ((count & bufferActiveBit) == 0) { return false; // sign bit not set -> buffer not active. } else { @@ -69,7 +86,10 @@ T run( runLock.lock(); try { // Signal that the buffer is active. - Long expectedCount = observationCount.getAndAdd(bufferActiveBit); + long expectedCount = 0L; + for (AtomicLong observationCount : stripedObservationCounts) { + expectedCount += observationCount.getAndAdd(bufferActiveBit); + } while (!complete.apply(expectedCount)) { // Wait until all in-flight threads have added their observations to the histogram / @@ -81,14 +101,18 @@ T run( result = createResult.get(); // Signal that the buffer is inactive. - int expectedBufferSize; + long expectedBufferSize = 0; if (reset) { - expectedBufferSize = - (int) ((observationCount.getAndSet(0) & ~bufferActiveBit) - expectedCount); + for (AtomicLong observationCount : stripedObservationCounts) { + expectedBufferSize += observationCount.getAndSet(0) & ~bufferActiveBit; + } reset = false; } else { - expectedBufferSize = (int) (observationCount.addAndGet(bufferActiveBit) - expectedCount); + for (AtomicLong observationCount : stripedObservationCounts) { + expectedBufferSize += observationCount.addAndGet(bufferActiveBit); + } } + expectedBufferSize -= expectedCount; appendLock.lock(); try {