From c77d6733da5cc6b8090d8afe5ee3f955ce026324 Mon Sep 17 00:00:00 2001 From: ibilley7 Date: Wed, 23 Jul 2025 16:06:58 -0400 Subject: [PATCH 1/2] Convert properties in FIXED_PROPERTIES to be non-fixed Changed these properties to be non-fixed (made no-fixed by creating a thread pool in the run method of Manager that "refresh" these properties on an interval): * `MANAGER_TABLET_REFRESH_MINTHREADS` * `MANAGER_TABLET_REFRESH_MAXTHREADS` Removed these non-fixed properties that were labeled as fixed: * `MANAGER_MINTHREADS` * `SSERV_MINTHREADS` * 'TSERV_MINTHREADS' * `COMPACTOR_MINTHREADS` All of the other properties in FIXED_PROPERTIES besides these need to be evaluated similarly. Partial work for #5696 --- .../apache/accumulo/core/conf/Property.java | 16 ++++---- .../org/apache/accumulo/manager/Manager.java | 39 +++++++++++++++++++ 2 files changed, 46 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index b63b6a0d9f3..5163074dc0d 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1611,16 +1611,15 @@ public static boolean isValidTablePropertyKey(String key) { // MANAGER options MANAGER_THREADCHECK, MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, MANAGER_METADATA_SUSPENDABLE, MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, - MANAGER_CLIENTPORT, MANAGER_MINTHREADS, MANAGER_MINTHREADS_TIMEOUT, - MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, - MANAGER_TABLET_REFRESH_MINTHREADS, MANAGER_TABLET_REFRESH_MAXTHREADS, - MANAGER_TABLET_MERGEABILITY_INTERVAL, MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX, + MANAGER_CLIENTPORT, MANAGER_MINTHREADS_TIMEOUT, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, MANAGER_TABLET_MERGEABILITY_INTERVAL, + MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX, // SSERV options SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT, SSERV_THREADCHECK, SSERV_CLIENTPORT, SSERV_PORTSEARCH, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE, SSERV_DEFAULT_BLOCKSIZE, SSERV_SCAN_REFERENCE_EXPIRATION_TIME, - SSERV_CACHED_TABLET_METADATA_EXPIRATION, SSERV_MINTHREADS, SSERV_MINTHREADS_TIMEOUT, + SSERV_CACHED_TABLET_METADATA_EXPIRATION, SSERV_MINTHREADS_TIMEOUT, SSERV_WAL_SORT_MAX_CONCURRENT, SSERV_GROUP_NAME, // TSERV options @@ -1631,8 +1630,8 @@ public static boolean isValidTablePropertyKey(String key) { TSERV_LOG_BUSY_TABLETS_COUNT, TSERV_LOG_BUSY_TABLETS_INTERVAL, TSERV_WAL_SORT_MAX_CONCURRENT, TSERV_SLOW_FILEPERMIT_MILLIS, TSERV_WAL_BLOCKSIZE, TSERV_CLIENTPORT, TSERV_PORTSEARCH, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, TSERV_DEFAULT_BLOCKSIZE, - TSERV_MINTHREADS, TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM, - TSERV_SCAN_MAX_OPENFILES, TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME, + TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM, TSERV_SCAN_MAX_OPENFILES, + TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME, // GC options GC_CANDIDATE_BATCH_SIZE, GC_CYCLE_START, GC_PORT, @@ -1643,8 +1642,7 @@ public static boolean isValidTablePropertyKey(String key) { // COMPACTOR options COMPACTOR_CANCEL_CHECK_INTERVAL, COMPACTOR_CLIENTPORT, COMPACTOR_THREADCHECK, - COMPACTOR_PORTSEARCH, COMPACTOR_MINTHREADS, COMPACTOR_MINTHREADS_TIMEOUT, - COMPACTOR_GROUP_NAME, + COMPACTOR_PORTSEARCH, COMPACTOR_MINTHREADS_TIMEOUT, COMPACTOR_GROUP_NAME, // COMPACTION_COORDINATOR options COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index a972b4e13b5..9f00ae7a6f7 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -50,6 +50,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -262,6 +263,7 @@ void setTserverStatus(LiveTServersSnapshot snapshot, private final long timeToCacheRecoveryWalExistence; private ExecutorService tableInformationStatusPool = null; private ThreadPoolExecutor tabletRefreshThreadPool; + private ScheduledExecutorService refreshCheckerScheduler; private final TabletStateStore rootTabletStore; private final TabletStateStore metadataTabletStore; @@ -905,6 +907,40 @@ private void checkForHeldServer(SortedMap ts return info; } + private class RefreshThreads implements Runnable { + @Override + public void run() { + try { + int newMinThreads = getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS); + int newMaxThreads = getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS); + + int currentMinThreads = tabletRefreshThreadPool.getPoolSize(); + int currentMaxThreads = tabletRefreshThreadPool.getMaximumPoolSize(); + + if (newMinThreads != currentMinThreads) { + System.out.println("Updating MANAGER_TABLET_REFRESH_MINTHREADS from " + currentMinThreads + + " to " + newMinThreads); + tabletRefreshThreadPool.setCorePoolSize(newMinThreads); + } + if (newMaxThreads != currentMaxThreads) { + System.out.println("Updating MANAGER_TABLET_REFRESH_MAXTHREADS from " + currentMaxThreads + + " to " + newMaxThreads); + tabletRefreshThreadPool.setMaximumPoolSize(newMaxThreads); + } + } catch (Exception e) { + System.err + .println("Error checking or updating thread pool configuration: " + e.getMessage()); + } + } + } + + public void startRefreshChecker() { + refreshCheckerScheduler = ThreadPools.getServerThreadPools().createScheduledExecutorService(1, + "ManagerTabletRefreshThreadsExecutor"); + ThreadPools.watchNonCriticalScheduledTask( + refreshCheckerScheduler.scheduleAtFixedRate(new RefreshThreads(), 5, 5, MINUTES)); + } + @Override public void run() { final ServerContext context = getContext(); @@ -968,6 +1004,8 @@ public void run() { .numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS)) .build(); + startRefreshChecker(); + Thread statusThread = Threads.createCriticalThread("Status Thread", new StatusThread()); statusThread.start(); @@ -1235,6 +1273,7 @@ boolean canSuspendTablets() { tableInformationStatusPool.shutdownNow(); tabletRefreshThreadPool.shutdownNow(); + refreshCheckerScheduler.shutdown(); compactionCoordinator.shutdown(); From 1db76c70611a2d8bff045b2c0a6c35a5c95e10eb Mon Sep 17 00:00:00 2001 From: ibilley7 Date: Fri, 22 Aug 2025 09:55:13 -0400 Subject: [PATCH 2/2] Changed code to model off of `resizePool()` method in ThreadPools. Added `resizeCorePool()` method. --- .../core/util/threads/ThreadPools.java | 37 +++++++++++++++++++ .../org/apache/accumulo/manager/Manager.java | 24 ++---------- 2 files changed, 41 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 6eb22ff58bc..7c21119848a 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -219,6 +219,43 @@ public static void ensureRunning(ScheduledFuture future, String message) { } } + /** + * Resize ThreadPoolExecutor based on current value of minThreads + * + * @param pool the ThreadPoolExecutor to modify + * @param minThreads supplier of minThreads value + * @param poolName name of the thread pool + */ + public static void resizeCorePool(final ThreadPoolExecutor pool, final IntSupplier minThreads, + String poolName) { + int count = pool.getPoolSize(); + int newCount = minThreads.getAsInt(); + if (count == newCount) { + return; + } + LOG.info("Changing min threads for {} from {} to {}", poolName, count, newCount); + if (newCount < count) { + pool.setCorePoolSize(newCount); + } else { + if (newCount > pool.getMaximumPoolSize()) { + pool.setMaximumPoolSize(newCount); + } + pool.setCorePoolSize(newCount); + } + } + + /** + * Resize ThreadPoolExecutor based on current value of Property p + * + * @param pool the ThreadPoolExecutor to modify + * @param conf the AccumuloConfiguration + * @param p the property to base the size from + */ + public static void resizeCorePool(final ThreadPoolExecutor pool, final AccumuloConfiguration conf, + final Property p) { + resizeCorePool(pool, () -> conf.getCount(p), p.getKey()); + } + /** * Resize ThreadPoolExecutor based on current value of maxThreads * diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 9f00ae7a6f7..2759de71951 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -910,27 +910,11 @@ private void checkForHeldServer(SortedMap ts private class RefreshThreads implements Runnable { @Override public void run() { - try { - int newMinThreads = getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS); - int newMaxThreads = getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS); - - int currentMinThreads = tabletRefreshThreadPool.getPoolSize(); - int currentMaxThreads = tabletRefreshThreadPool.getMaximumPoolSize(); + ThreadPools.resizeCorePool(tabletRefreshThreadPool, getConfiguration(), + Property.MANAGER_TABLET_REFRESH_MINTHREADS); + ThreadPools.resizePool(tabletRefreshThreadPool, getConfiguration(), + Property.MANAGER_TABLET_REFRESH_MAXTHREADS); - if (newMinThreads != currentMinThreads) { - System.out.println("Updating MANAGER_TABLET_REFRESH_MINTHREADS from " + currentMinThreads - + " to " + newMinThreads); - tabletRefreshThreadPool.setCorePoolSize(newMinThreads); - } - if (newMaxThreads != currentMaxThreads) { - System.out.println("Updating MANAGER_TABLET_REFRESH_MAXTHREADS from " + currentMaxThreads - + " to " + newMaxThreads); - tabletRefreshThreadPool.setMaximumPoolSize(newMaxThreads); - } - } catch (Exception e) { - System.err - .println("Error checking or updating thread pool configuration: " + e.getMessage()); - } } }