diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index 7ce7a023df6..cb380fd993d 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -418,6 +418,13 @@
*
Cache |
* scan server tablet cache metrics |
*
+ *
+ * | N/A |
+ * N/A |
+ * {@value #METRICS_SCAN_EXCEPTIONS} |
+ * Counter |
+ * Count the number of exceptions that occur within scan executors |
+ *
*
*
* | scan |
@@ -725,6 +732,7 @@ public interface MetricsProducer {
String METRICS_SCAN_SCANNED_ENTRIES = METRICS_SCAN_PREFIX + "query.scanned.entries";
String METRICS_SCAN_ZOMBIE_THREADS = METRICS_SCAN_PREFIX + "zombie.threads";
String METRICS_SCAN_TABLET_METADATA_CACHE = METRICS_SCAN_PREFIX + "tablet.metadata.cache";
+ String METRICS_SCAN_EXCEPTIONS = METRICS_SCAN_PREFIX + "exceptions";
String METRICS_TSERVER_PREFIX = "accumulo.tserver.";
String METRICS_TSERVER_ENTRIES = METRICS_TSERVER_PREFIX + "entries";
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
index 36fbd42de11..c45480afe9d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.tserver.metrics;
import java.time.Duration;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.IntSupplier;
@@ -28,6 +29,7 @@
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
+import io.micrometer.core.instrument.FunctionCounter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
@@ -47,6 +49,9 @@ public class TabletServerScanMetrics implements MetricsProducer {
private final LongAdder queryResultCount = new LongAdder();
private final LongAdder queryResultBytes = new LongAdder();
private final LongAdder scannedCount = new LongAdder();
+ private final ConcurrentHashMap executorExceptionCounts =
+ new ConcurrentHashMap<>();
+ private volatile MeterRegistry registry = null;
public void incrementLookupCount(long amount) {
this.lookupCount.add(amount);
@@ -124,8 +129,28 @@ public TabletServerScanMetrics(IntSupplier openFileSupplier) {
openFiles = openFileSupplier;
}
+ public void incrementExecutorExceptions(String executorName) {
+ executorExceptionCounts.computeIfAbsent(executorName, k -> {
+ AtomicLong counter = new AtomicLong(0);
+ // Register the counter if the registry is already available
+ if (registry != null) {
+ registerExecutorExceptionCounter(executorName, counter);
+ }
+ return counter;
+ }).incrementAndGet();
+ }
+
+ private void registerExecutorExceptionCounter(String executorName, AtomicLong counter) {
+ FunctionCounter.builder(METRICS_SCAN_EXCEPTIONS, counter, AtomicLong::get)
+ .tags("executor", executorName)
+ .description(
+ "Number of exceptions thrown from the iterator stack during scan execution, tagged by executor name")
+ .register(registry);
+ }
+
@Override
public void registerMetrics(MeterRegistry registry) {
+ this.registry = registry;
Gauge.builder(METRICS_SCAN_OPEN_FILES, openFiles::getAsInt)
.description("Number of files open for scans").register(registry);
scans = Timer.builder(METRICS_SCAN_TIMES).description("Scans").register(registry);
@@ -156,6 +181,7 @@ public void registerMetrics(MeterRegistry registry) {
Gauge.builder(METRICS_SCAN_ZOMBIE_THREADS, this, TabletServerScanMetrics::getZombieThreadsCount)
.description("Number of scan threads that have no associated client session")
.register(registry);
+ executorExceptionCounts.forEach(this::registerExecutorExceptionCounter);
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
index 8eb15ea6bb4..750661182fe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java
@@ -59,6 +59,17 @@ public LookupTask(TabletHostingServer server, long scanID) {
this.scanID = scanID;
}
+ private void recordException(MultiScanSession scanSession) {
+ if (scanSession != null && server.getScanMetrics() != null) {
+ String executorName = getExecutorName(scanSession);
+ server.getScanMetrics().incrementExecutorExceptions(executorName);
+ }
+ }
+
+ private String getExecutorName(MultiScanSession scanSession) {
+ return scanSession.scanParams.getScanDispatch().getExecutorName();
+ }
+
@Override
public void run() {
MultiScanSession session = (MultiScanSession) server.getSession(scanID);
@@ -179,9 +190,11 @@ public void run() {
addResult(iie);
}
} catch (SampleNotPresentException e) {
+ recordException(session);
addResult(e);
} catch (Exception e) {
log.warn("exception while doing multi-scan ", e);
+ recordException(session);
addResult(e);
} finally {
transitionFromRunning();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
index 96251bd59b0..5707d714da0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java
@@ -48,6 +48,17 @@ public NextBatchTask(TabletHostingServer server, long scanID, AtomicBoolean inte
}
}
+ private void recordException(SingleScanSession scanSession) {
+ if (scanSession != null && server.getScanMetrics() != null) {
+ String executorName = getExecutorName(scanSession);
+ server.getScanMetrics().incrementExecutorExceptions(executorName);
+ }
+ }
+
+ private String getExecutorName(SingleScanSession scanSession) {
+ return scanSession.scanParams.getScanDispatch().getExecutorName();
+ }
+
@Override
public void run() {
@@ -93,10 +104,12 @@ public void run() {
addResult(iie);
}
} catch (TooManyFilesException | SampleNotPresentException e) {
+ recordException(scanSession);
addResult(e);
} catch (IOException | RuntimeException e) {
log.warn("exception while scanning tablet {} for {}", scanSession.extent, scanSession.client,
e);
+ recordException(scanSession);
addResult(e);
} finally {
transitionFromRunning();
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index 026f392a010..b82640d998d 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -120,7 +120,8 @@ public void confirmMetricsPublished() throws Exception {
METRICS_SCAN_RESERVATION_TOTAL_TIMER,
METRICS_SCAN_RESERVATION_WRITEOUT_TIMER,
METRICS_SCAN_TABLET_METADATA_CACHE,
- METRICS_SERVER_IDLE);
+ METRICS_SERVER_IDLE,
+ METRICS_SCAN_EXCEPTIONS);
// @formatter:on
Map expectedMetricNames = this.getMetricFields();
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java
new file mode 100644
index 00000000000..f681982baf8
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.metrics;
+
+import static org.apache.accumulo.core.metrics.MetricsProducer.METRICS_SCAN_EXCEPTIONS;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Duration;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.IteratorUtil;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.BadIterator;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScanExecutorExceptionsIT extends ConfigurableMacBase {
+
+ private static final Logger log = LoggerFactory.getLogger(ScanExecutorExceptionsIT.class);
+ private static TestStatsDSink sink;
+ private static final AtomicLong exceptionCount = new AtomicLong(0);
+
+ @Override
+ protected Duration defaultTimeout() {
+ return Duration.ofMinutes(5);
+ }
+
+ @BeforeAll
+ public static void setup() throws Exception {
+ sink = new TestStatsDSink();
+ }
+
+ @AfterAll
+ public static void teardown() throws Exception {
+ if (sink != null) {
+ sink.close();
+ }
+ }
+
+ @Override
+ protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+ cfg.setProperty("general.custom.metrics. opts. logging.step", "5s");
+ String clazzList = LoggingMeterRegistryFactory.class.getName() + ","
+ + TestStatsDRegistryFactory.class.getName();
+ cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList);
+ Map sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+ TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort()));
+ cfg.setSystemProperties(sysProps);
+ }
+
+ @Test
+ public void testScanExecutorExceptions() throws Exception {
+ try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
+
+ String tableName = getUniqueNames(1)[0];
+ log.info("Creating table: {}", tableName);
+
+ // Create table with BadIterator configured for scan scope
+ NewTableConfiguration ntc = new NewTableConfiguration();
+ IteratorSetting badIterSetting = new IteratorSetting(50, "bad", BadIterator.class);
+ ntc.attachIterator(badIterSetting, EnumSet.of(IteratorUtil.IteratorScope.scan));
+ client.tableOperations().create(tableName, ntc);
+
+ try (BatchWriter writer = client.createBatchWriter(tableName)) {
+ for (int i = 0; i < 10; i++) {
+ Mutation m = new Mutation(new Text("row" + i));
+ m.put("cf", "cq", "value" + i);
+ writer.addMutation(m);
+ }
+ }
+
+ client.tableOperations().flush(tableName, null, null, true);
+
+ log.info("Performing scans to trigger exceptions.. .");
+
+ for (int i = 0; i < 3; i++) {
+ performScanWithException(client, tableName);
+ }
+
+ for (int i = 0; i < 3; i++) {
+ performBatchScanWithException(client, tableName);
+ }
+
+ log.info("Waiting for metrics to be published...");
+ Thread.sleep(15_000);
+ log.info("Collecting metrics from sink...");
+ List statsDMetrics = sink.getLines();
+ log.info("Received {} metric lines from sink", statsDMetrics.size());
+ exceptionCount.set(0);
+
+ int metricsFound = 0;
+ for (String line : statsDMetrics) {
+ if (line.startsWith(METRICS_SCAN_EXCEPTIONS)) {
+ metricsFound++;
+ TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(line);
+ log.info("Found scan exception metric: {}", metric);
+ String executor = metric.getTags().get("executor");
+ if (executor != null) {
+ long val = Long.parseLong(metric.getValue());
+ exceptionCount.accumulateAndGet(val, Math::max);
+ log.info("Recorded exception count for executor '{}': {}", executor, val);
+ }
+ }
+ }
+
+ log.info("Found {} scan exception metrics total", metricsFound);
+ long finalCount = exceptionCount.get();
+ log.info("Final exception count: {}", finalCount);
+
+ assertTrue(finalCount > 0, "Should have tracked exceptions, but count was: " + finalCount);
+ }
+ }
+
+ private void performScanWithException(AccumuloClient client, String table) {
+ try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
+ // Set a timeout to avoid hanging forever
+ scanner.setTimeout(5, java.util.concurrent.TimeUnit.SECONDS);
+ scanner.forEach((k, v) -> {
+ // This should never be reached because the iterator throws an exception
+ });
+ } catch (Exception e) {
+ log.debug("Expected exception from regular scan on table {}: {}", table, e.getMessage());
+ }
+ }
+
+ private void performBatchScanWithException(AccumuloClient client, String table) {
+ try (BatchScanner batchScanner = client.createBatchScanner(table, Authorizations.EMPTY, 2)) {
+ batchScanner.setTimeout(5, java.util.concurrent.TimeUnit.SECONDS);
+ batchScanner.setRanges(List.of(new Range("row0", "row5"), new Range("row6", "row9")));
+ batchScanner.forEach((k, v) -> {
+ // This should never be reached because the iterator throws an exception
+ });
+ } catch (Exception e) {
+ log.debug("Expected exception from batch scan on table {}: {}", table, e.getMessage());
+ }
+ }
+}