From 0bd4186e347e069179e2848a0cc565d0e827de41 Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Thu, 20 Nov 2025 11:08:20 -0500 Subject: [PATCH 1/2] Added new scan executor metric --- .../core/metrics/MetricsProducer.java | 1 + .../metrics/TabletServerScanMetrics.java | 26 +++++++++++++++++++ .../accumulo/tserver/scan/NextBatchTask.java | 15 +++++++++++ 3 files changed, 42 insertions(+) diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index 7ce7a023df6..c79a29d7d9b 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -725,6 +725,7 @@ public interface MetricsProducer { String METRICS_SCAN_SCANNED_ENTRIES = METRICS_SCAN_PREFIX + "query.scanned.entries"; String METRICS_SCAN_ZOMBIE_THREADS = METRICS_SCAN_PREFIX + "zombie.threads"; String METRICS_SCAN_TABLET_METADATA_CACHE = METRICS_SCAN_PREFIX + "tablet.metadata.cache"; + String METRICS_SCAN_EXECUTOR_EXCEPTIONS = METRICS_SCAN_PREFIX + "executor.exceptions"; String METRICS_TSERVER_PREFIX = "accumulo.tserver."; String METRICS_TSERVER_ENTRIES = METRICS_TSERVER_PREFIX + "entries"; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index 36fbd42de11..36a05a96645 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver.metrics; import java.time.Duration; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.function.IntSupplier; @@ -28,6 +29,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.FunctionCounter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; @@ -47,6 +49,9 @@ public class TabletServerScanMetrics implements MetricsProducer { private final LongAdder queryResultCount = new LongAdder(); private final LongAdder queryResultBytes = new LongAdder(); private final LongAdder scannedCount = new LongAdder(); + private final ConcurrentHashMap executorExceptionCounts = + new ConcurrentHashMap<>(); + private volatile MeterRegistry registry = null; public void incrementLookupCount(long amount) { this.lookupCount.add(amount); @@ -124,8 +129,28 @@ public TabletServerScanMetrics(IntSupplier openFileSupplier) { openFiles = openFileSupplier; } + public void incrementExecutorExceptions(String executorName) { + executorExceptionCounts.computeIfAbsent(executorName, k -> { + AtomicLong counter = new AtomicLong(0); + // Register the counter if the registry is already available + if (registry != null) { + registerExecutorExceptionCounter(executorName, counter); + } + return counter; + }).incrementAndGet(); + } + + private void registerExecutorExceptionCounter(String executorName, AtomicLong counter) { + FunctionCounter.builder(METRICS_SCAN_EXECUTOR_EXCEPTIONS, counter, AtomicLong::get) + .tags("executor", executorName) + .description( + "Number of exceptions thrown from the iterator stack during scan execution, tagged by executor name") + .register(registry); + } + @Override public void registerMetrics(MeterRegistry registry) { + this.registry = registry; Gauge.builder(METRICS_SCAN_OPEN_FILES, openFiles::getAsInt) .description("Number of files open for scans").register(registry); scans = Timer.builder(METRICS_SCAN_TIMES).description("Scans").register(registry); @@ -156,6 +181,7 @@ public void registerMetrics(MeterRegistry registry) { Gauge.builder(METRICS_SCAN_ZOMBIE_THREADS, this, TabletServerScanMetrics::getZombieThreadsCount) .description("Number of scan threads that have no associated client session") .register(registry); + executorExceptionCounts.forEach(this::registerExecutorExceptionCounter); } } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java index 96251bd59b0..f02f48453f8 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java @@ -23,6 +23,7 @@ import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; +import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher; import org.apache.accumulo.server.fs.TooManyFilesException; import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.session.SingleScanSession; @@ -48,6 +49,18 @@ public NextBatchTask(TabletHostingServer server, long scanID, AtomicBoolean inte } } + private void recordException(SingleScanSession scanSession) { + if (scanSession != null && server.getScanMetrics() != null) { + String executorName = getExecutorName(scanSession); + server.getScanMetrics().incrementExecutorExceptions(executorName); + } + } + + private String getExecutorName(SingleScanSession scanSession) { + String executorName = scanSession.getExecutionHints().get("scan_type"); + return executorName != null ? executorName : SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME; + } + @Override public void run() { @@ -93,10 +106,12 @@ public void run() { addResult(iie); } } catch (TooManyFilesException | SampleNotPresentException e) { + recordException(scanSession); addResult(e); } catch (IOException | RuntimeException e) { log.warn("exception while scanning tablet {} for {}", scanSession.extent, scanSession.client, e); + recordException(scanSession); addResult(e); } finally { transitionFromRunning(); From a1488bb0af9c8c5ad885177ee873f6ba75238fb6 Mon Sep 17 00:00:00 2001 From: arbaazkhan1 Date: Thu, 11 Dec 2025 11:19:57 -0500 Subject: [PATCH 2/2] added new ScanExecutorExceptionsIT --- .../core/metrics/MetricsProducer.java | 9 +- .../metrics/TabletServerScanMetrics.java | 2 +- .../accumulo/tserver/scan/LookupTask.java | 13 ++ .../accumulo/tserver/scan/NextBatchTask.java | 4 +- .../accumulo/test/metrics/MetricsIT.java | 3 +- .../metrics/ScanExecutorExceptionsIT.java | 175 ++++++++++++++++++ 6 files changed, 200 insertions(+), 6 deletions(-) create mode 100644 test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java index c79a29d7d9b..cb380fd993d 100644 --- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java +++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java @@ -418,6 +418,13 @@ * Cache * scan server tablet cache metrics * + * + * N/A + * N/A + * {@value #METRICS_SCAN_EXCEPTIONS} + * Counter + * Count the number of exceptions that occur within scan executors + * * * * scan @@ -725,7 +732,7 @@ public interface MetricsProducer { String METRICS_SCAN_SCANNED_ENTRIES = METRICS_SCAN_PREFIX + "query.scanned.entries"; String METRICS_SCAN_ZOMBIE_THREADS = METRICS_SCAN_PREFIX + "zombie.threads"; String METRICS_SCAN_TABLET_METADATA_CACHE = METRICS_SCAN_PREFIX + "tablet.metadata.cache"; - String METRICS_SCAN_EXECUTOR_EXCEPTIONS = METRICS_SCAN_PREFIX + "executor.exceptions"; + String METRICS_SCAN_EXCEPTIONS = METRICS_SCAN_PREFIX + "exceptions"; String METRICS_TSERVER_PREFIX = "accumulo.tserver."; String METRICS_TSERVER_ENTRIES = METRICS_TSERVER_PREFIX + "entries"; diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java index 36a05a96645..c45480afe9d 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/metrics/TabletServerScanMetrics.java @@ -141,7 +141,7 @@ public void incrementExecutorExceptions(String executorName) { } private void registerExecutorExceptionCounter(String executorName, AtomicLong counter) { - FunctionCounter.builder(METRICS_SCAN_EXECUTOR_EXCEPTIONS, counter, AtomicLong::get) + FunctionCounter.builder(METRICS_SCAN_EXCEPTIONS, counter, AtomicLong::get) .tags("executor", executorName) .description( "Number of exceptions thrown from the iterator stack during scan execution, tagged by executor name") diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java index 8eb15ea6bb4..750661182fe 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/LookupTask.java @@ -59,6 +59,17 @@ public LookupTask(TabletHostingServer server, long scanID) { this.scanID = scanID; } + private void recordException(MultiScanSession scanSession) { + if (scanSession != null && server.getScanMetrics() != null) { + String executorName = getExecutorName(scanSession); + server.getScanMetrics().incrementExecutorExceptions(executorName); + } + } + + private String getExecutorName(MultiScanSession scanSession) { + return scanSession.scanParams.getScanDispatch().getExecutorName(); + } + @Override public void run() { MultiScanSession session = (MultiScanSession) server.getSession(scanID); @@ -179,9 +190,11 @@ public void run() { addResult(iie); } } catch (SampleNotPresentException e) { + recordException(session); addResult(e); } catch (Exception e) { log.warn("exception while doing multi-scan ", e); + recordException(session); addResult(e); } finally { transitionFromRunning(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java index f02f48453f8..5707d714da0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/scan/NextBatchTask.java @@ -23,7 +23,6 @@ import org.apache.accumulo.core.client.SampleNotPresentException; import org.apache.accumulo.core.iteratorsImpl.system.IterationInterruptedException; -import org.apache.accumulo.core.spi.scan.SimpleScanDispatcher; import org.apache.accumulo.server.fs.TooManyFilesException; import org.apache.accumulo.tserver.TabletHostingServer; import org.apache.accumulo.tserver.session.SingleScanSession; @@ -57,8 +56,7 @@ private void recordException(SingleScanSession scanSession) { } private String getExecutorName(SingleScanSession scanSession) { - String executorName = scanSession.getExecutionHints().get("scan_type"); - return executorName != null ? executorName : SimpleScanDispatcher.DEFAULT_SCAN_EXECUTOR_NAME; + return scanSession.scanParams.getScanDispatch().getExecutorName(); } @Override diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java index 026f392a010..b82640d998d 100644 --- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java @@ -120,7 +120,8 @@ public void confirmMetricsPublished() throws Exception { METRICS_SCAN_RESERVATION_TOTAL_TIMER, METRICS_SCAN_RESERVATION_WRITEOUT_TIMER, METRICS_SCAN_TABLET_METADATA_CACHE, - METRICS_SERVER_IDLE); + METRICS_SERVER_IDLE, + METRICS_SCAN_EXCEPTIONS); // @formatter:on Map expectedMetricNames = this.getMetricFields(); diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java b/test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java new file mode 100644 index 00000000000..f681982baf8 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/metrics/ScanExecutorExceptionsIT.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.metrics; + +import static org.apache.accumulo.core.metrics.MetricsProducer.METRICS_SCAN_EXCEPTIONS; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Duration; +import java.util.EnumSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.functional.BadIterator; +import org.apache.accumulo.test.functional.ConfigurableMacBase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ScanExecutorExceptionsIT extends ConfigurableMacBase { + + private static final Logger log = LoggerFactory.getLogger(ScanExecutorExceptionsIT.class); + private static TestStatsDSink sink; + private static final AtomicLong exceptionCount = new AtomicLong(0); + + @Override + protected Duration defaultTimeout() { + return Duration.ofMinutes(5); + } + + @BeforeAll + public static void setup() throws Exception { + sink = new TestStatsDSink(); + } + + @AfterAll + public static void teardown() throws Exception { + if (sink != null) { + sink.close(); + } + } + + @Override + protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) { + cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true"); + cfg.setProperty("general.custom.metrics. opts. logging.step", "5s"); + String clazzList = LoggingMeterRegistryFactory.class.getName() + "," + + TestStatsDRegistryFactory.class.getName(); + cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList); + Map sysProps = Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1", + TestStatsDRegistryFactory.SERVER_PORT, Integer.toString(sink.getPort())); + cfg.setSystemProperties(sysProps); + } + + @Test + public void testScanExecutorExceptions() throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + + String tableName = getUniqueNames(1)[0]; + log.info("Creating table: {}", tableName); + + // Create table with BadIterator configured for scan scope + NewTableConfiguration ntc = new NewTableConfiguration(); + IteratorSetting badIterSetting = new IteratorSetting(50, "bad", BadIterator.class); + ntc.attachIterator(badIterSetting, EnumSet.of(IteratorUtil.IteratorScope.scan)); + client.tableOperations().create(tableName, ntc); + + try (BatchWriter writer = client.createBatchWriter(tableName)) { + for (int i = 0; i < 10; i++) { + Mutation m = new Mutation(new Text("row" + i)); + m.put("cf", "cq", "value" + i); + writer.addMutation(m); + } + } + + client.tableOperations().flush(tableName, null, null, true); + + log.info("Performing scans to trigger exceptions.. ."); + + for (int i = 0; i < 3; i++) { + performScanWithException(client, tableName); + } + + for (int i = 0; i < 3; i++) { + performBatchScanWithException(client, tableName); + } + + log.info("Waiting for metrics to be published..."); + Thread.sleep(15_000); + log.info("Collecting metrics from sink..."); + List statsDMetrics = sink.getLines(); + log.info("Received {} metric lines from sink", statsDMetrics.size()); + exceptionCount.set(0); + + int metricsFound = 0; + for (String line : statsDMetrics) { + if (line.startsWith(METRICS_SCAN_EXCEPTIONS)) { + metricsFound++; + TestStatsDSink.Metric metric = TestStatsDSink.parseStatsDMetric(line); + log.info("Found scan exception metric: {}", metric); + String executor = metric.getTags().get("executor"); + if (executor != null) { + long val = Long.parseLong(metric.getValue()); + exceptionCount.accumulateAndGet(val, Math::max); + log.info("Recorded exception count for executor '{}': {}", executor, val); + } + } + } + + log.info("Found {} scan exception metrics total", metricsFound); + long finalCount = exceptionCount.get(); + log.info("Final exception count: {}", finalCount); + + assertTrue(finalCount > 0, "Should have tracked exceptions, but count was: " + finalCount); + } + } + + private void performScanWithException(AccumuloClient client, String table) { + try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) { + // Set a timeout to avoid hanging forever + scanner.setTimeout(5, java.util.concurrent.TimeUnit.SECONDS); + scanner.forEach((k, v) -> { + // This should never be reached because the iterator throws an exception + }); + } catch (Exception e) { + log.debug("Expected exception from regular scan on table {}: {}", table, e.getMessage()); + } + } + + private void performBatchScanWithException(AccumuloClient client, String table) { + try (BatchScanner batchScanner = client.createBatchScanner(table, Authorizations.EMPTY, 2)) { + batchScanner.setTimeout(5, java.util.concurrent.TimeUnit.SECONDS); + batchScanner.setRanges(List.of(new Range("row0", "row5"), new Range("row6", "row9"))); + batchScanner.forEach((k, v) -> { + // This should never be reached because the iterator throws an exception + }); + } catch (Exception e) { + log.debug("Expected exception from batch scan on table {}: {}", table, e.getMessage()); + } + } +}