From b7184b42794770d88fb359e078f5dc2d5146e233 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon" Date: Wed, 7 Jan 2026 09:42:14 -0500 Subject: [PATCH] AMQ-9824 - Cleanup code in KahaDB classes This cleanups the MessageDatabase and KahaDBStore classes. This commit includes the following: * Fixes the scope of several methods and types. For example, there were many cases where protected methods were referencing types that were package scope. * Simplified the code by replacing anonymous methods with lambdas * removed unused methods and parameters * removed unnecessary casts * cleaned up the use of generics where types could be inferred * Replaced the ReentrantReadWriteLock that was used for indexLock with ReentrantLock becuase only the write lock was ever being used (the page file doesn't support concurrent reads right now). This should provide a small performance/memory improvement and simplifies the code a bit. * removed unnecessary null initializations * cleaned up logging to remove string concatenation and instead use parameters * removed method overrides that are the same as the parent or just call the super method * removed unused checked exceptions from method's throws * marked inner classes as static when possible --- .../activemq/store/kahadb/KahaDBStore.java | 827 ++++++++---------- .../store/kahadb/MessageDatabase.java | 711 ++++++--------- ...JournalCorruptionEofIndexRecoveryTest.java | 37 +- .../KahaDBStoreOpenWireVersionTest.java | 23 +- .../org/apache/activemq/bugs/AMQ2982Test.java | 4 +- .../org/apache/activemq/bugs/AMQ2983Test.java | 4 +- .../kahadb/KahaDBMessageStoreSizeTest.java | 4 +- 7 files changed, 683 insertions(+), 927 deletions(-) diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index a543e2bf964..fb3eb7a2542 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -76,17 +76,14 @@ import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; -import org.apache.activemq.store.kahadb.data.KahaLocation; import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand; import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand; import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand; import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand; import org.apache.activemq.store.kahadb.disk.journal.Location; import org.apache.activemq.store.kahadb.disk.page.Transaction; -import org.apache.activemq.store.kahadb.disk.page.Transaction.CallableClosure; import org.apache.activemq.store.kahadb.disk.util.SequenceSet; import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl; -import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; @@ -109,8 +106,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, protected ExecutorService queueExecutor; protected ExecutorService topicExecutor; - protected final List> asyncQueueMaps = new LinkedList<>(); - protected final List> asyncTopicMaps = new LinkedList<>(); + final List> asyncQueueMaps = new LinkedList<>(); + final List> asyncTopicMaps = new LinkedList<>(); final WireFormat wireFormat = new OpenWireFormat(); private SystemUsage usageManager; private LinkedBlockingQueue asyncQueueJobQueue; @@ -233,26 +230,22 @@ public void doStart() throws Exception { this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs()); this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs()); - this.asyncQueueJobQueue = new LinkedBlockingQueue(getMaxAsyncJobs()); - this.asyncTopicJobQueue = new LinkedBlockingQueue(getMaxAsyncJobs()); - this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, - asyncQueueJobQueue, new ThreadFactory() { - @Override - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); - thread.setDaemon(true); - return thread; - } - }); - this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, - asyncTopicJobQueue, new ThreadFactory() { - @Override - public Thread newThread(Runnable runnable) { - Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); - thread.setDaemon(true); - return thread; - } - }); + this.asyncQueueJobQueue = new LinkedBlockingQueue<>(getMaxAsyncJobs()); + this.asyncTopicJobQueue = new LinkedBlockingQueue<>(getMaxAsyncJobs()); + this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, + TimeUnit.MILLISECONDS, + asyncQueueJobQueue, runnable -> { + Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch"); + thread.setDaemon(true); + return thread; + }); + this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, + TimeUnit.MILLISECONDS, + asyncTopicJobQueue, runnable -> { + Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); + thread.setDaemon(true); + return thread; + }); } @Override @@ -305,21 +298,18 @@ public void doStop(ServiceStopper stopper) throws Exception { } private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Location execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(destination, tx); - Long sequence = sd.messageIdIndex.get(tx, key); - if (sequence == null) { - return null; - } - return sd.orderIndex.get(tx, sequence).location; + return pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(destination, tx); + Long sequence = sd.messageIdIndex.get(tx, key); + if (sequence == null) { + return null; } + return sd.orderIndex.get(tx, sequence).location; }); } - protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { - StoreQueueTask task = null; + StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { + StoreQueueTask task; synchronized (store.asyncTaskMap) { task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); } @@ -327,20 +317,20 @@ protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) } // with asyncTaskMap locked - protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { + void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); this.queueExecutor.execute(task); } - protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { - StoreTopicTask task = null; + StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { + StoreTopicTask task; synchronized (store.asyncTaskMap) { task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); } return task; } - protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { + void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { synchronized (store.asyncTaskMap) { store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); } @@ -409,12 +399,12 @@ private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestina } public class KahaDBMessageStore extends AbstractMessageStore { - protected final Map asyncTaskMap = new HashMap(); + final Map asyncTaskMap = new HashMap<>(); protected KahaDestination dest; private final int maxAsyncJobs; private final Semaphore localDestinationSemaphore; - protected final HashMap> ackedAndPreparedMap = new HashMap>(); - protected final HashMap> rolledBackAcksMap = new HashMap>(); + protected final HashMap> ackedAndPreparedMap = new HashMap<>(); + protected final HashMap> rolledBackAcksMap = new HashMap<>(); double doneTasks, canceledTasks = 0; @@ -425,13 +415,7 @@ public KahaDBMessageStore(ActiveMQDestination destination) { this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); } - @Override - public ActiveMQDestination getDestination() { - return destination; - } - - - private final String recoveredTxStateMapKey(ActiveMQDestination destination, MessageAck ack) { + private String recoveredTxStateMapKey(ActiveMQDestination destination, MessageAck ack) { return destination.isQueue() ? destination.getPhysicalName() : ack.getConsumerId().getConnectionId(); } @@ -439,25 +423,22 @@ private final String recoveredTxStateMapKey(ActiveMQDestination destination, Mes // till then they are skipped by the store. // 'at most once' XA guarantee public void trackRecoveredAcks(ArrayList acks) { - indexLock.writeLock().lock(); + indexLock.lock(); try { for (MessageAck ack : acks) { final String key = recoveredTxStateMapKey(destination, ack); - Set ackedAndPrepared = ackedAndPreparedMap.get(key); - if (ackedAndPrepared == null) { - ackedAndPrepared = new LinkedHashSet(); - ackedAndPreparedMap.put(key, ackedAndPrepared); - } + Set ackedAndPrepared = ackedAndPreparedMap.computeIfAbsent(key, + k -> new LinkedHashSet<>()); ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); } } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } public void forgetRecoveredAcks(ArrayList acks, boolean rollback) throws IOException { if (acks != null) { - indexLock.writeLock().lock(); + indexLock.lock(); try { for (MessageAck ack : acks) { final String id = ack.getLastMessageId().toProducerKey(); @@ -470,11 +451,8 @@ public void forgetRecoveredAcks(ArrayList acks, boolean rollback) th } } if (rollback) { - Set rolledBackAcks = rolledBackAcksMap.get(key); - if (rolledBackAcks == null) { - rolledBackAcks = new LinkedHashSet(); - rolledBackAcksMap.put(key, rolledBackAcks); - } + Set rolledBackAcks = rolledBackAcksMap.computeIfAbsent(key, + k -> new LinkedHashSet<>()); rolledBackAcks.add(id); pageFile.tx().execute(tx -> { incrementAndAddSizeToStoreStat(tx, dest, 0); @@ -482,7 +460,7 @@ public void forgetRecoveredAcks(ArrayList acks, boolean rollback) th } } } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -513,7 +491,7 @@ public ListenableFuture asyncAddQueueMessage(final ConnectionContext con public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { if (isConcurrentStoreAndDispatchQueues()) { AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); - StoreQueueTask task = null; + StoreQueueTask task; synchronized (asyncTaskMap) { task = (StoreQueueTask) asyncTaskMap.get(key); } @@ -528,11 +506,11 @@ public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws } removeMessage(context, ack); } else { - indexLock.writeLock().lock(); + indexLock.lock(); try { metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } synchronized (asyncTaskMap) { asyncTaskMap.remove(key); @@ -558,7 +536,7 @@ public void addMessage(final ConnectionContext context, final Message message) t command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { // sync add? (for async, future present from getFutureOrSequenceLong) - Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); + final Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); @Override public void sequenceAssignedWithIndexLocked(final long sequence) { @@ -566,12 +544,8 @@ public void sequenceAssignedWithIndexLocked(final long sequence) { if (indexListener != null) { if (possibleFuture == null) { trackPendingAdd(dest, sequence); - indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { - @Override - public void run() { - trackPendingAddComplete(dest, sequence); - } - })); + indexListener.onAdd(new IndexListener.MessageContext(context, message, + () -> trackPendingAddComplete(dest, sequence))); } } } @@ -604,7 +578,8 @@ public void run() { @Override public void updateMessage(Message message) throws IOException { if (LOG.isTraceEnabled()) { - LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); + LOG.trace("updating: {} with deliveryCount: {}", message.getMessageId(), + message.getRedeliveryCounter()); } KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); KahaAddMessageCommand command = new KahaAddMessageCommand(); @@ -645,11 +620,11 @@ public Message getMessage(MessageId identity) throws IOException { // operations... but for now we must // externally synchronize... Location location; - indexLock.writeLock().lock(); + indexLock.lock(); try { location = findMessageLocation(key, dest); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } if (location == null) { return null; @@ -660,80 +635,71 @@ public Message getMessage(MessageId identity) throws IOException { @Override public boolean isEmpty() throws IOException { - indexLock.writeLock().lock(); + indexLock.lock(); try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Boolean execute(Transaction tx) throws IOException { - // Iterate through all index entries to get a count of - // messages in the destination. - StoredDestination sd = getStoredDestination(dest, tx); - return sd.locationIndex.isEmpty(tx); - } + return pageFile.tx().execute(tx -> { + // Iterate through all index entries to get a count of + // messages in the destination. + StoredDestination sd = getStoredDestination(dest, tx); + return sd.locationIndex.isEmpty(tx); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @Override public void recover(final MessageRecoveryListener listener) throws Exception { // recovery may involve expiry which will modify - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener); - sd.orderIndex.resetCursorPosition(); - for (Iterator> iterator = - sd.orderIndex.iterator(tx, new MessageOrderCursor()); listener.hasSpace() && - iterator.hasNext(); ) { - Entry entry = iterator.next(); - Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } - Message msg = loadMessage(entry.getValue().location); - listener.recoverMessage(msg); + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener); + sd.orderIndex.resetCursorPosition(); + for (Iterator> iterator = + sd.orderIndex.iterator(tx, new MessageOrderCursor()); listener.hasSpace() && + iterator.hasNext(); ) { + Entry entry = iterator.next(); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; } + Message msg = loadMessage(entry.getValue().location); + listener.recoverMessage(msg); } }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @Override public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - Entry entry = null; - int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener); - Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); - for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { - entry = iterator.next(); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } - Message msg = loadMessage(entry.getValue().location); - msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); - listener.recoverMessage(msg); - counter++; - if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { - break; - } + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + Entry entry; + int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { + entry = iterator.next(); + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; + } + Message msg = loadMessage(entry.getValue().location); + msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); + listener.recoverMessage(msg); + counter++; + if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { + break; } - sd.orderIndex.stoppedIterating(); } + sd.orderIndex.stoppedIterating(); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -747,86 +713,84 @@ public void recoverMessages(final MessageRecoveryContext messageRecoveryContext) throw new IllegalArgumentException("Invalid messageRecoveryContext specified"); } - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - - /** - * [AMQ-9773] - * - * The order index sequence key value increases for every message since the beginning of the - * creation of the index, so the first message available won't be at sequence key value:0 in - * the index when there were older messages acknowledged. - * - * The MessageOrderCursor _position_ value is relative to the index, so there is a disconnect - * between queue _position_ and index sequence value over time. - * - * The MessageRecoveryContext determines the recovery start position based off the provided - * offset, or the position of the requested startMessageId. If a startMessageId is specified, - * but not found in the index, then the value of 0 is used as a fallback. - * - * The MessageRecoveryContext determines the recovery end position based off of the provided - * endMessageId (if provided), or the maximum recovered message count, or if the - * MessageRecoveryListener signals that no more messages should be recovered - * (ie memory is full). - */ - Long startSequenceOffset = null; - Long endSequenceOffset = null; - - if(messageRecoveryContext.getStartMessageId() != null && !messageRecoveryContext.getStartMessageId().isBlank()) { - startSequenceOffset = Optional.ofNullable(sd.messageIdIndex.get(tx, messageRecoveryContext.getStartMessageId())).orElse(0L); - } else { - startSequenceOffset = Optional.ofNullable(messageRecoveryContext.getOffset()).orElse(0L); - } + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + + /** + * [AMQ-9773] + * + * The order index sequence key value increases for every message since the beginning of the + * creation of the index, so the first message available won't be at sequence key value:0 in + * the index when there were older messages acknowledged. + * + * The MessageOrderCursor _position_ value is relative to the index, so there is a disconnect + * between queue _position_ and index sequence value over time. + * + * The MessageRecoveryContext determines the recovery start position based off the provided + * offset, or the position of the requested startMessageId. If a startMessageId is specified, + * but not found in the index, then the value of 0 is used as a fallback. + * + * The MessageRecoveryContext determines the recovery end position based off of the provided + * endMessageId (if provided), or the maximum recovered message count, or if the + * MessageRecoveryListener signals that no more messages should be recovered + * (ie memory is full). + */ + Long startSequenceOffset; + Long endSequenceOffset = null; + + if(messageRecoveryContext.getStartMessageId() != null && !messageRecoveryContext.getStartMessageId().isBlank()) { + startSequenceOffset = Optional.ofNullable(sd.messageIdIndex.get(tx, messageRecoveryContext.getStartMessageId())).orElse(0L); + } else { + startSequenceOffset = Optional.ofNullable(messageRecoveryContext.getOffset()).orElse(0L); + } - if(messageRecoveryContext.getEndMessageId() != null && !messageRecoveryContext.getEndMessageId().isBlank()) { - endSequenceOffset = Optional.ofNullable(sd.messageIdIndex.get(tx, messageRecoveryContext.getEndMessageId())) - .orElse(startSequenceOffset + Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned())); - messageRecoveryContext.setEndSequenceId(endSequenceOffset); - } + if(messageRecoveryContext.getEndMessageId() != null && !messageRecoveryContext.getEndMessageId().isBlank()) { + endSequenceOffset = Optional.ofNullable(sd.messageIdIndex.get(tx, messageRecoveryContext.getEndMessageId())) + .orElse(startSequenceOffset + (long) messageRecoveryContext.getMaxMessageCountReturned()); + messageRecoveryContext.setEndSequenceId(endSequenceOffset); + } - if(endSequenceOffset != null && - endSequenceOffset < startSequenceOffset) { - LOG.warn("Invalid offset parameters start:{} end:{}", startSequenceOffset, endSequenceOffset); - throw new IllegalStateException("Invalid offset parameters start:" + startSequenceOffset + " end:" + endSequenceOffset); - } + if(endSequenceOffset != null && + endSequenceOffset < startSequenceOffset) { + LOG.warn("Invalid offset parameters start:{} end:{}", startSequenceOffset, endSequenceOffset); + throw new IllegalStateException("Invalid offset parameters start:" + startSequenceOffset + " end:" + endSequenceOffset); + } - Entry entry = null; - recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, messageRecoveryContext.getMaxMessageCountReturned(), messageRecoveryContext); - Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); - Iterator> iterator = (messageRecoveryContext.isUseDedicatedCursor() ? sd.orderIndex.iterator(tx, new MessageOrderCursor(startSequenceOffset)) : sd.orderIndex.iterator(tx)); + Entry entry; + recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, messageRecoveryContext.getMaxMessageCountReturned(), messageRecoveryContext); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + Iterator> iterator = (messageRecoveryContext.isUseDedicatedCursor() ? sd.orderIndex.iterator(tx, + new MessageOrderCursor(startSequenceOffset)) : sd.orderIndex.iterator(tx)); - while (iterator.hasNext()) { - entry = iterator.next(); + while (iterator.hasNext()) { + entry = iterator.next(); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; + } - Message msg = loadMessage(entry.getValue().location); - msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); + Message msg = loadMessage(entry.getValue().location); + msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); - messageRecoveryContext.recoverMessage(msg); - if (!messageRecoveryContext.canRecoveryNextMessage(entry.getKey())) { - break; - } + messageRecoveryContext.recoverMessage(msg); + if (!messageRecoveryContext.canRecoveryNextMessage(entry.getKey())) { + break; } + } - // [AMQ-9773] The sd.orderIndex uses the destination's cursor - if(!messageRecoveryContext.isUseDedicatedCursor()) { - sd.orderIndex.stoppedIterating(); - } + // [AMQ-9773] The sd.orderIndex uses the destination's cursor + if(!messageRecoveryContext.isUseDedicatedCursor()) { + sd.orderIndex.stoppedIterating(); } }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } - protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { + int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { int counter = 0; String id; @@ -862,64 +826,46 @@ protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestina @Override public void resetBatching() { if (pageFile.isLoaded()) { - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getExistingStoredDestination(dest, tx); - if (sd != null) { - sd.orderIndex.resetCursorPosition();} - } + pageFile.tx().execute(tx -> { + StoredDestination sd = getExistingStoredDestination(dest, tx); + if (sd != null) { + sd.orderIndex.resetCursorPosition();} }); } catch (Exception e) { LOG.error("Failed to reset batching",e); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @Override public void setBatch(final MessageId identity) throws IOException { - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - Long location = (Long) identity.getFutureOrSequenceLong(); - Long pending = sd.orderIndex.minPendingAdd(); - if (pending != null) { - location = Math.min(location, pending-1); - } - sd.orderIndex.setBatch(tx, location); + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + Long location = (Long) identity.getFutureOrSequenceLong(); + Long pending = sd.orderIndex.minPendingAdd(); + if (pending != null) { + location = Math.min(location, pending-1); } + sd.orderIndex.setBatch(location); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } - @Override - public void setMemoryUsage(MemoryUsage memoryUsage) { - } - @Override - public void start() throws Exception { - super.start(); - } - @Override - public void stop() throws Exception { - super.stop(); - } - protected void lockAsyncJobQueue() { try { if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); } } catch (Exception e) { - LOG.error("Failed to lock async jobs for " + this.destination, e); + LOG.error("Failed to lock async jobs for {}", this.destination, e); } } @@ -931,7 +877,7 @@ protected void acquireLocalAsyncLock() { try { this.localDestinationSemaphore.acquire(); } catch (InterruptedException e) { - LOG.error("Failed to aquire async lock for " + this.destination, e); + LOG.error("Failed to aquire async lock for {}", this.destination, e); } } @@ -949,25 +895,22 @@ protected void recoverMessageStoreStatistics() throws IOException { try { MessageStoreStatistics recoveredStatistics; lockAsyncJobQueue(); - indexLock.writeLock().lock(); + indexLock.lock(); try { - recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public MessageStoreStatistics execute(Transaction tx) throws IOException { - MessageStoreStatistics statistics = getStoredMessageStoreStatistics(dest, tx); - - // Iterate through all index entries to get the size of each message - if (statistics == null) { - StoredDestination sd = getStoredDestination(dest, tx); - statistics = new MessageStoreStatistics(); - for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext(); ) { - int locationSize = iterator.next().getKey().getSize(); - statistics.getMessageCount().increment(); - statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); - } + recoveredStatistics = pageFile.tx().execute(tx -> { + MessageStoreStatistics statistics = getStoredMessageStoreStatistics(dest, tx); + + // Iterate through all index entries to get the size of each message + if (statistics == null) { + StoredDestination sd = getStoredDestination(dest, tx); + statistics = new MessageStoreStatistics(); + for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext(); ) { + int locationSize = iterator.next().getKey().getSize(); + statistics.getMessageCount().increment(); + statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); } - return statistics; } + return statistics; }); Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); if (ackedAndPrepared != null) { @@ -976,7 +919,7 @@ public MessageStoreStatistics execute(Transaction tx) throws IOException { getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } finally { unlockAsyncJobQueue(); @@ -1025,10 +968,10 @@ public ListenableFuture asyncAddTopicMessage(final ConnectionContext con @Override public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { - String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); + String subscriptionKey = subscriptionKey(clientId, subscriptionName); if (isConcurrentStoreAndDispatchTopics()) { AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); - StoreTopicTask task = null; + StoreTopicTask task; synchronized (asyncTaskMap) { task = (StoreTopicTask) asyncTaskMap.get(key); } @@ -1042,14 +985,14 @@ public void acknowledge(ConnectionContext context, String clientId, String subsc } } } else { - doAcknowledge(context, subscriptionKey, messageId, ack); + doAcknowledge(subscriptionKey, messageId, ack); } } else { - doAcknowledge(context, subscriptionKey, messageId, ack); + doAcknowledge(subscriptionKey, messageId, ack); } } - protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) + protected void doAcknowledge(String subscriptionKey, MessageId messageId, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); command.setDestination(dest); @@ -1071,11 +1014,11 @@ public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroacti .getSubscriptionName()); KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); - command.setSubscriptionKey(subscriptionKey.toString()); + command.setSubscriptionKey(subscriptionKey); command.setRetroactive(retroactive); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isEnableJournalDiskSyncs() && true, null, null); + store(command, isEnableJournalDiskSyncs(), null, null); this.subscriptionCount.incrementAndGet(); } @@ -1083,33 +1026,30 @@ public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroacti public void deleteSubscription(String clientId, String subscriptionName) throws IOException { KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); - command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); - store(command, isEnableJournalDiskSyncs() && true, null, null); + command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); + store(command, isEnableJournalDiskSyncs(), null, null); this.subscriptionCount.decrementAndGet(); } @Override public SubscriptionInfo[] getAllSubscriptions() throws IOException { - final ArrayList subscriptions = new ArrayList(); - indexLock.writeLock().lock(); + final ArrayList subscriptions = new ArrayList<>(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - for (Iterator> iterator = sd.subscriptions.iterator(tx); iterator - .hasNext();) { - Entry entry = iterator.next(); - SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry - .getValue().getSubscriptionInfo().newInput())); - subscriptions.add(info); + pageFile.tx().execute((Transaction.Closure) tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + for (Iterator> iterator = sd.subscriptions.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry + .getValue().getSubscriptionInfo().newInput())); + subscriptions.add(info); - } } }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; @@ -1120,22 +1060,19 @@ public void execute(Transaction tx) throws IOException { @Override public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - indexLock.writeLock().lock(); + indexLock.lock(); try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public SubscriptionInfo execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); - if (command == null) { - return null; - } - return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command - .getSubscriptionInfo().newInput())); + return pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); + if (command == null) { + return null; } + return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command + .getSubscriptionInfo().newInput())); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -1147,23 +1084,20 @@ public int getMessageCount(String clientId, String subscriptionName) throws IOEx return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount(); } else { - indexLock.writeLock().lock(); + indexLock.lock(); try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Integer execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - if (cursorPos == null) { - // The subscription might not exist. - return 0; - } - - return (int) getStoredMessageCount(tx, sd, subscriptionKey); + return pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + if (cursorPos == null) { + // The subscription might not exist. + return 0; } + + return (int) getStoredMessageCount(tx, sd, subscriptionKey); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1175,23 +1109,20 @@ public long getMessageSize(String clientId, String subscriptionName) throws IOEx if (isEnableSubscriptionStatistics()) { return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize(); } else { - indexLock.writeLock().lock(); + indexLock.lock(); try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Long execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - if (cursorPos == null) { - // The subscription might not exist. - return 0l; - } - - return getStoredMessageSize(tx, sd, subscriptionKey); + return pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + if (cursorPos == null) { + // The subscription might not exist. + return 0L; } + + return getStoredMessageSize(tx, sd, subscriptionKey); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1199,39 +1130,35 @@ public Long execute(Transaction tx) throws IOException { protected void recoverMessageStoreSubMetrics() throws IOException { if (isEnableSubscriptionStatistics()) { final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); - List subscriptionKeys = new ArrayList<>(); - for (Iterator> iterator = sd.subscriptions - .iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - - final String subscriptionKey = entry.getKey(); - final LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - if (cursorPos != null) { - //add the subscriptions to a list for recovering pending sizes below - subscriptionKeys.add(subscriptionKey); - //recover just the count here as that is fast - statistics.getMessageCount(subscriptionKey) - .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); - } - } + List subscriptionKeys = new ArrayList<>(); + for (Iterator> iterator = sd.subscriptions + .iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); - //Recover the message sizes for each subscription by iterating only 1 time over the order index - //to speed up recovery - final Map subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys); - subPendingMessageSizes.forEach((k,v) -> { - statistics.getMessageSize(k).addSize(v.get() > 0 ? v.get() : 0); - }); + final String subscriptionKey = entry.getKey(); + final LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + if (cursorPos != null) { + //add the subscriptions to a list for recovering pending sizes below + subscriptionKeys.add(subscriptionKey); + //recover just the count here as that is fast + statistics.getMessageCount(subscriptionKey) + .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); + } } + + //Recover the message sizes for each subscription by iterating only 1 time over the order index + //to speed up recovery + final Map subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys); + subPendingMessageSizes.forEach((k,v) -> + statistics.getMessageSize(k).addSize(v.get() > 0 ? v.get() : 0)); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1242,44 +1169,41 @@ public void recoverSubscription(String clientId, String subscriptionName, final final String subscriptionKey = subscriptionKey(clientId, subscriptionName); @SuppressWarnings("unused") final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey); - //If we have ackPositions tracked then compare the first one as individual acknowledge mode - //may have bumped lastAck even though there are earlier messages to still consume - if (subAckPositions != null && !subAckPositions.isEmpty() - && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) { - //we have messages to ack before lastAckedSequence - sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1); - } else { - subAckPositions = null; - sd.orderIndex.setBatch(tx, cursorPos); + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey); + //If we have ackPositions tracked then compare the first one as individual acknowledge mode + //may have bumped lastAck even though there are earlier messages to still consume + if (subAckPositions != null && !subAckPositions.isEmpty() + && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) { + //we have messages to ack before lastAckedSequence + sd.orderIndex.setBatch(subAckPositions.getHead().getFirst() - 1); + } else { + subAckPositions = null; + sd.orderIndex.setBatch(cursorPos); + } + recoverRolledBackAcks(subscriptionKey, sd, tx, Integer.MAX_VALUE, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; } - recoverRolledBackAcks(subscriptionKey, sd, tx, Integer.MAX_VALUE, listener); - Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); - for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator - .hasNext();) { - Entry entry = iterator.next(); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } - //If subAckPositions is set then verify the sequence set contains the message still - //and if it doesn't skip it - if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) { - continue; - } - listener.recoverMessage(loadMessage(entry.getValue().location)); + //If subAckPositions is set then verify the sequence set contains the message still + //and if it doesn't skip it + if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) { + continue; } - sd.orderIndex.resetCursorPosition(); + listener.recoverMessage(loadMessage(entry.getValue().location)); } + sd.orderIndex.resetCursorPosition(); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -1289,73 +1213,70 @@ public void recoverNextMessages(String clientId, String subscriptionName, final final String subscriptionKey = subscriptionKey(clientId, subscriptionName); @SuppressWarnings("unused") final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - sd.orderIndex.resetCursorPosition(); - MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); - SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);; - if (moc == null) { - LastAck pos = getLastAck(tx, sd, subscriptionKey); - if (pos == null) { - // sub deleted - return; - } - //If we have ackPositions tracked then compare the first one as individual acknowledge mode - //may have bumped lastAck even though there are earlier messages to still consume - if (subAckPositions != null && !subAckPositions.isEmpty() - && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) { - //we have messages to ack before lastAckedSequence - sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1); - } else { - subAckPositions = null; - sd.orderIndex.setBatch(tx, pos); - } - moc = sd.orderIndex.cursor; + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + sd.orderIndex.resetCursorPosition(); + MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); + SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey); + if (moc == null) { + LastAck pos = getLastAck(tx, sd, subscriptionKey); + if (pos == null) { + // sub deleted + return; + } + //If we have ackPositions tracked then compare the first one as individual acknowledge mode + //may have bumped lastAck even though there are earlier messages to still consume + if (subAckPositions != null && !subAckPositions.isEmpty() + && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) { + //we have messages to ack before lastAckedSequence + sd.orderIndex.setBatch(subAckPositions.getHead().getFirst() - 1); } else { - sd.orderIndex.cursor.sync(moc); + subAckPositions = null; + sd.orderIndex.setBatch(pos); } + moc = sd.orderIndex.cursor; + } else { + sd.orderIndex.cursor.sync(moc); + } - Entry entry = null; - int counter = recoverRolledBackAcks(subscriptionKey, sd, tx, maxReturned, listener); - Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); - for (Iterator> iterator = sd.orderIndex.iterator(tx, moc); iterator - .hasNext();) { - entry = iterator.next(); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } - //If subAckPositions is set then verify the sequence set contains the message still - //and if it doesn't skip it - if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) { - continue; - } - if (listener.recoverMessage(loadMessage(entry.getValue().location))) { - counter++; - } - if (counter >= maxReturned || listener.hasSpace() == false) { - break; - } + Entry entry = null; + int counter = recoverRolledBackAcks(subscriptionKey, sd, tx, maxReturned, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); + for (Iterator> iterator = sd.orderIndex.iterator(tx, moc); iterator + .hasNext();) { + entry = iterator.next(); + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; } - sd.orderIndex.stoppedIterating(); - if (entry != null) { - MessageOrderCursor copy = sd.orderIndex.cursor.copy(); - sd.subscriptionCursors.put(subscriptionKey, copy); + //If subAckPositions is set then verify the sequence set contains the message still + //and if it doesn't skip it + if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) { + continue; + } + if (listener.recoverMessage(loadMessage(entry.getValue().location))) { + counter++; + } + if (counter >= maxReturned || !listener.hasSpace()) { + break; } } + sd.orderIndex.stoppedIterating(); + if (entry != null) { + MessageOrderCursor copy = sd.orderIndex.cursor.copy(); + sd.subscriptionCursors.put(subscriptionKey, copy); + } }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @Override public Map> recoverExpired(Set subscriptions, int maxBrowse, MessageRecoveryListener listener) throws Exception { - indexLock.writeLock().lock(); + indexLock.lock(); try { return pageFile.tx().execute(tx -> { StoredDestination sd = getStoredDestination(dest, tx); @@ -1406,7 +1327,7 @@ public Map> recoverExpired(Set su return expired; }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -1414,17 +1335,14 @@ public Map> recoverExpired(Set su public void resetBatching(String clientId, String subscriptionName) { try { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - sd.subscriptionCursors.remove(subscriptionKey); - } + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + sd.subscriptionCursors.remove(subscriptionKey); }); }finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } catch (IOException e) { throw new RuntimeException(e); @@ -1501,22 +1419,19 @@ public void deleteAllMessages() throws IOException { @Override public Set getDestinations() { try { - final HashSet rc = new HashSet(); - indexLock.writeLock().lock(); + final HashSet rc = new HashSet<>(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - for (Iterator> iterator = metadata.destinations.iterator(tx); iterator - .hasNext();) { - Entry entry = iterator.next(); - //Removing isEmpty topic check - see AMQ-5875 - rc.add(convert(entry.getKey())); - } + pageFile.tx().execute(tx -> { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + //Removing isEmpty topic check - see AMQ-5875 + rc.add(convert(entry.getKey())); } }); }finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } return rc; } catch (IOException e) { @@ -1525,17 +1440,17 @@ public void execute(Transaction tx) throws IOException { } @Override - public long getLastMessageBrokerSequenceId() throws IOException { + public long getLastMessageBrokerSequenceId() { return 0; } @Override public long getLastProducerSequenceId(ProducerId id) { - indexLock.writeLock().lock(); + indexLock.lock(); try { return metadata.producerSequenceIdTracker.getLastSeqId(id); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -1578,7 +1493,7 @@ public void checkpoint(boolean sync) throws IOException { Message loadMessage(Location location) throws IOException { try { JournalCommand command = load(location); - KahaAddMessageCommand addMessage = null; + KahaAddMessageCommand addMessage; switch (command.type()) { case KAHA_UPDATE_MESSAGE_COMMAND: addMessage = ((KahaUpdateMessageCommand) command).getMessage(); @@ -1592,8 +1507,7 @@ Message loadMessage(Location location) throws IOException { if (!addMessage.hasMessage()) { throw new IOException("Could not load journal record, null message content at location: " + location); } - Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); - return msg; + return (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); } catch (Throwable t) { IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t); LOG.error("Failed to load message at: {}", location , ioe); @@ -1606,13 +1520,6 @@ Message loadMessage(Location location) throws IOException { // Internal conversion methods. // ///////////////////////////////////////////////////////////////// - KahaLocation convert(Location location) { - KahaLocation rc = new KahaLocation(); - rc.setLogId(location.getDataFileId()); - rc.setOffset(location.getOffset()); - return rc; - } - KahaDestination convert(ActiveMQDestination dest) { KahaDestination rc = new KahaDestination(); rc.setName(dest.getPhysicalName()); @@ -1644,10 +1551,6 @@ ActiveMQDestination convert(String dest) { return convert(type, name); } - private ActiveMQDestination convert(KahaDestination commandDestination) { - return convert(commandDestination.getType().getNumber(), commandDestination.getName()); - } - private ActiveMQDestination convert(int type, String name) { switch (KahaDestination.DestinationType.valueOf(type)) { case QUEUE: @@ -1701,18 +1604,18 @@ public String toString() { } public interface StoreTask { - public boolean cancel(); + boolean cancel(); - public void aquireLocks(); + void aquireLocks(); - public void releaseLocks(); + void releaseLocks(); } class StoreQueueTask implements Runnable, StoreTask { protected final Message message; protected final ConnectionContext context; protected final KahaDBMessageStore store; - protected final InnerFutureTask future; + final InnerFutureTask future; protected final AtomicBoolean done = new AtomicBoolean(); protected final AtomicBoolean locked = new AtomicBoolean(); @@ -1781,7 +1684,7 @@ protected Message getMessage() { return this.message; } - private class InnerFutureTask extends FutureTask implements ListenableFuture { + private static class InnerFutureTask extends FutureTask implements ListenableFuture { private final AtomicReference listenerRef = new AtomicReference<>(); @@ -1826,7 +1729,7 @@ private void fireListener() { class StoreTopicTask extends StoreQueueTask { private final int subscriptionCount; - private final List subscriptionKeys = new ArrayList(1); + private final List subscriptionKeys = new ArrayList<>(1); private final KahaDBTopicMessageStore topicStore; public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, int subscriptionCount) { @@ -1880,7 +1783,7 @@ public void run() { // apply any acks we have synchronized (this.subscriptionKeys) { for (String key : this.subscriptionKeys) { - this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); + this.topicStore.doAcknowledge(key, this.message.getMessageId(), null); } } @@ -1898,7 +1801,7 @@ public void run() { } } - public class StoreTaskExecutor extends ThreadPoolExecutor { + public static class StoreTaskExecutor extends ThreadPoolExecutor { public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue queue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); @@ -1915,7 +1818,7 @@ protected void afterExecute(Runnable runnable, Throwable throwable) { } @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + public JobSchedulerStore createJobSchedulerStore() throws UnsupportedOperationException { return new JobSchedulerStoreImpl(); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 85ff18fc807..c21f9825601 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -53,19 +53,16 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.TransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; @@ -140,7 +137,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1; - protected class Metadata { + class Metadata { protected Page page; protected int state; protected BTreeIndex destinations; @@ -191,7 +188,7 @@ public void read(DataInput is) throws IOException { openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION; } - LOG.info("KahaDB is version " + version); + LOG.info("KahaDB is version {}", version); } public void write(DataOutput os) throws IOException { @@ -246,15 +243,15 @@ public void writePayload(Metadata object, DataOutput dataOut) throws IOException public enum PurgeRecoveredXATransactionStrategy { NEVER, COMMIT, - ROLLBACK; + ROLLBACK } protected PageFile pageFile; protected Journal journal; - protected Metadata metadata = new Metadata(); + Metadata metadata = new Metadata(); protected final PersistenceAdapterStatistics persistenceAdapterStatistics = new PersistenceAdapterStatistics(); - protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); + MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); protected boolean failIfDatabaseIsLocked; @@ -324,50 +321,44 @@ public void allowIOResumption() { } private void loadPageFile() throws IOException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { final PageFile pageFile = getPageFile(); pageFile.load(); - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - if (pageFile.getPageCount() == 0) { - // First time this is created.. Initialize the metadata - Page page = tx.allocate(); - assert page.getPageId() == 0; - page.set(metadata); - metadata.page = page; - metadata.state = CLOSED_STATE; - metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId()); - - tx.store(metadata.page, metadataMarshaller, true); - } else { - Page page = tx.load(0, metadataMarshaller); - metadata = page.get(); - metadata.page = page; - } - metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); - metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); - metadata.destinations.load(tx); + pageFile.tx().execute(tx -> { + if (pageFile.getPageCount() == 0) { + // First time this is created.. Initialize the metadata + Page page = tx.allocate(); + assert page.getPageId() == 0; + page.set(metadata); + metadata.page = page; + metadata.state = CLOSED_STATE; + metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId()); + + tx.store(metadata.page, metadataMarshaller, true); + } else { + Page page = tx.load(0, metadataMarshaller); + metadata = page.get(); + metadata.page = page; } + metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); + metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); + metadata.destinations.load(tx); }); // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. // Perhaps we should just keep an index of file storedDestinations.clear(); - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - for (Iterator> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); - storedDestinations.put(entry.getKey(), sd); - - if (checkForCorruptJournalFiles) { - // sanity check the index also - if (!entry.getValue().locationIndex.isEmpty(tx)) { - if (entry.getValue().orderIndex.nextMessageId <= 0) { - throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); - } + pageFile.tx().execute(tx -> { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); + storedDestinations.put(entry.getKey(), sd); + + if (checkForCorruptJournalFiles) { + // sanity check the index also + if (!entry.getValue().locationIndex.isEmpty(tx)) { + if (entry.getValue().orderIndex.nextMessageId <= 0) { + throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); } } } @@ -375,28 +366,25 @@ public void execute(Transaction tx) throws IOException { }); pageFile.flush(); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } private void startCheckpoint() { if (checkpointInterval == 0 && cleanupInterval == 0) { - LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart"); + LOG.info("periodic checkpoint/cleanup disabled, will occur on clean {}restart", + getCleanupOnStop() ? "shutdown/" : ""); return; } synchronized (schedulerLock) { if (scheduler == null || scheduler.isShutdown()) { - scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread schedulerThread = new Thread(r); - @Override - public Thread newThread(Runnable r) { - Thread schedulerThread = new Thread(r); - - schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); - schedulerThread.setDaemon(true); + schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); + schedulerThread.setDaemon(true); - return schedulerThread; - } + return schedulerThread; }); // Short intervals for check-point and cleanups @@ -463,7 +451,8 @@ public void open() throws IOException { try { loadPageFile(); } catch (Throwable t) { - LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t); + LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:{}", + String.valueOf(t)); if (LOG.isDebugEnabled()) { LOG.debug("Index load failure", t); } @@ -489,7 +478,7 @@ public void open() throws IOException { } public void load() throws IOException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { IOHelper.mkdirs(directory); if (deleteAllMessages) { @@ -506,7 +495,7 @@ public void load() throws IOException { open(); store(new KahaTraceCommand().setMessage("LOADED " + new Date())); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } @@ -536,23 +525,19 @@ public void close() throws IOException, InterruptedException { } public void unload() throws IOException, InterruptedException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { if( pageFile != null && pageFile.isLoaded() ) { metadata.state = CLOSED_STATE; metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; if (metadata.page != null) { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - tx.store(metadata.page, metadataMarshaller, true); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + tx.store(metadata.page, metadataMarshaller, true)); } } } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } close(); } @@ -596,11 +581,11 @@ class TranInfo { TransactionId id; Location location; - class opCount { + static class OpCount { int add; int remove; } - HashMap destinationOpCount = new HashMap<>(); + HashMap destinationOpCount = new HashMap<>(); @SuppressWarnings("rawtypes") public void track(Operation operation) { @@ -614,14 +599,10 @@ public void track(Operation operation) { destination = add.getCommand().getDestination(); isAdd = true; } else { - RemoveOperation removeOpperation = (RemoveOperation) operation; - destination = removeOpperation.getCommand().getDestination(); - } - opCount opCount = destinationOpCount.get(destination); - if (opCount == null) { - opCount = new opCount(); - destinationOpCount.put(destination, opCount); + RemoveOperation removeOperation = (RemoveOperation) operation; + destination = removeOperation.getCommand().getDestination(); } + OpCount opCount = destinationOpCount.computeIfAbsent(destination, k -> new OpCount()); if (isAdd) { opCount.add++; } else { @@ -631,12 +612,12 @@ public void track(Operation operation) { @Override public String toString() { - StringBuffer buffer = new StringBuffer(); - buffer.append(location).append(";").append(id).append(";\n"); - for (Entry op : destinationOpCount.entrySet()) { - buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); + StringBuilder builder = new StringBuilder(); + builder.append(location).append(";").append(id).append(";\n"); + for (Entry op : destinationOpCount.entrySet()) { + builder.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); } - return buffer.toString(); + return builder.toString(); } } @@ -696,7 +677,7 @@ public String getPreparedTransaction(TransactionId transactionId) { * @throws IllegalStateException */ private void recover() throws IllegalStateException, IOException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { long start = System.currentTimeMillis(); @@ -707,7 +688,7 @@ private void recover() throws IllegalStateException, IOException { if (recoveryPosition != null) { int redoCounter = 0; int dataFileRotationTracker = recoveryPosition.getDataFileId(); - LOG.info("Recovering from the journal @" + recoveryPosition); + LOG.info("Recovering from the journal @{}", recoveryPosition); while (recoveryPosition != null) { try { JournalCommand message = load(recoveryPosition); @@ -716,7 +697,7 @@ private void recover() throws IllegalStateException, IOException { redoCounter++; } catch (IOException failedRecovery) { if (isIgnoreMissingJournalfiles()) { - LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); + LOG.debug("Failed to recover data at position:{}", recoveryPosition, failedRecovery); // track this dud location journal.corruptRecoveryLocation(recoveryPosition); } else { @@ -730,29 +711,23 @@ private void recover() throws IllegalStateException, IOException { journal.cleanup(); } if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { - LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); + LOG.info("@{}, {} entries recovered ..", recoveryPosition, redoCounter); } } if (LOG.isInfoEnabled()) { long end = System.currentTimeMillis(); - LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Recovery replayed {} operations from the journal in {} seconds.", redoCounter, (end - start) / 1000.0f); } } // We may have to undo some index updates. - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - recoverIndex(tx); - } - }); + pageFile.tx().execute(this::recoverIndex); // rollback any recovered inflight local transactions, and discard any inflight XA transactions. Set toRollback = new HashSet<>(); Set toDiscard = new HashSet<>(); synchronized (inflightTransactions) { - for (Iterator it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { - TransactionId id = it.next(); + for (TransactionId id : inflightTransactions.keySet()) { if (id.isLocalTransaction()) { toRollback.add(id); } else { @@ -761,20 +736,20 @@ public void execute(Transaction tx) throws IOException { } for (TransactionId tx: toRollback) { if (LOG.isDebugEnabled()) { - LOG.debug("rolling back recovered indoubt local transaction " + tx); + LOG.debug("rolling back recovered indoubt local transaction {}", tx); } store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); } for (TransactionId tx: toDiscard) { if (LOG.isDebugEnabled()) { - LOG.debug("discarding recovered in-flight XA transaction " + tx); + LOG.debug("discarding recovered in-flight XA transaction {}", tx); } inflightTransactions.remove(tx); } } synchronized (preparedTransactions) { - Set txIds = new LinkedHashSet(preparedTransactions.keySet()); + Set txIds = new LinkedHashSet<>(preparedTransactions.keySet()); for (TransactionId txId : txIds) { switch (purgeRecoveredXATransactionStrategy){ case NEVER: @@ -793,7 +768,7 @@ public void execute(Transaction tx) throws IOException { } } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } @@ -802,23 +777,6 @@ private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { return TransactionIdConversion.convertToLocal(tx); } - private Location minimum(Location x, - Location y) { - Location min = null; - if (x != null) { - min = x; - if (y != null) { - int compare = y.compareTo(x); - if (compare < 0) { - min = y; - } - } - } else { - min = y; - } - return min; - } - private boolean recoverProducerAudit() throws IOException { boolean requiresReplay = true; if (metadata.producerSequenceIdTrackerLocation != null) { @@ -870,7 +828,7 @@ protected void recoverIndex(Transaction tx) throws IOException { final ArrayList matches = new ArrayList<>(); // Find all the Locations that are >= than the last Append Location. - sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor(lastAppendLocation) { + sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<>(lastAppendLocation) { @Override protected void matched(Location key, Long value) { matches.add(value); @@ -895,7 +853,8 @@ protected void matched(Location key, Long value) { // these the end user should do sync writes to the journal. if (LOG.isInfoEnabled()) { long end = System.currentTimeMillis(); - LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, + (end - start) / 1000.0f); } } @@ -908,14 +867,14 @@ protected void matched(Location key, Long value) { final SequenceSet ss = new SequenceSet(); for (StoredDestination sd : storedDestinations.values()) { // Use a visitor to cut down the number of pages that we load - sd.locationIndex.visit(tx, new BTreeVisitor() { - int last=-1; + sd.locationIndex.visit(tx, new BTreeVisitor<>() { + int last = -1; @Override public boolean isInterestedInKeysBetween(Location first, Location second) { - if( first==null ) { + if (first == null) { return !ss.contains(0, second.getDataFileId()); - } else if( second==null ) { + } else if (second == null) { return true; } else { return !ss.contains(first.getDataFileId(), second.getDataFileId()); @@ -926,7 +885,7 @@ public boolean isInterestedInKeysBetween(Location first, Location second) { public void visit(List keys, List values) { for (Location l : keys) { int fileId = l.getDataFileId(); - if( last != fileId ) { + if (last != fileId) { ss.add(fileId); last = fileId; } @@ -942,15 +901,13 @@ public void visit(List keys, List values) { for (Entry> entry : metadata.ackMessageFileMap.entrySet()) { missingJournalFiles.add(entry.getKey()); - for (Integer i : entry.getValue()) { - missingJournalFiles.add(i); - } + missingJournalFiles.addAll(entry.getValue()); } missingJournalFiles.removeAll(journal.getFileMap().keySet()); if (!missingJournalFiles.isEmpty()) { - LOG.warn("Some journal files are missing: " + missingJournalFiles); + LOG.warn("Some journal files are missing: {}", missingJournalFiles); } ArrayList> knownCorruption = new ArrayList<>(); @@ -980,7 +937,7 @@ public void visit(List keys, List values) { for (Entry sdEntry : storedDestinations.entrySet()) { final StoredDestination sd = sdEntry.getValue(); final LinkedHashMap matches = new LinkedHashMap<>(); - sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor(missingPredicates) { + sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<>(missingPredicates) { @Override protected void matched(Location key, Long value) { matches.put(value, key); @@ -998,13 +955,14 @@ protected void matched(Location key, Long value) { MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); sd.locationIndex.remove(tx, keys.location); sd.messageIdIndex.remove(tx, keys.messageId); - LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); + LOG.info("[{}] dropped: {} at corrupt location: {}", sdEntry.getKey(), + keys.messageId, keys.location); undoCounter++; decrementAndSubSizeToStoreStat(tx, sdEntry.getKey(), sdEntry.getValue(), keys.location.getSize()); // TODO: do we need to modify the ack positions for the pub sub case? } } else { - LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches); + LOG.error("[{}] references corrupt locations: {}", sdEntry.getKey(), matches); throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected."); } } @@ -1013,12 +971,12 @@ protected void matched(Location key, Long value) { if (!ignoreMissingJournalfiles) { if (!knownCorruption.isEmpty()) { - LOG.error("Detected corrupt journal files. " + knownCorruption); + LOG.error("Detected corrupt journal files. {}", knownCorruption); throw new IOException("Detected corrupt journal files. " + knownCorruption); } if (!missingJournalFiles.isEmpty()) { - LOG.error("Detected missing journal files. " + missingJournalFiles); + LOG.error("Detected missing journal files. {}", missingJournalFiles); throw new IOException("Detected missing journal files. " + missingJournalFiles); } } @@ -1028,40 +986,12 @@ protected void matched(Location key, Long value) { // should do sync writes to the journal. if (LOG.isInfoEnabled()) { long end = System.currentTimeMillis(); - LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Detected missing/corrupt journal files. Dropped {} messages from the index in {} seconds.", + undoCounter, (end - start) / 1000.0f); } } } - private Location nextRecoveryPosition; - private Location lastRecoveryPosition; - - public void incrementalRecover() throws IOException { - this.indexLock.writeLock().lock(); - try { - if( nextRecoveryPosition == null ) { - if( lastRecoveryPosition==null ) { - nextRecoveryPosition = getRecoveryPosition(); - } else { - nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); - } - } - while (nextRecoveryPosition != null) { - lastRecoveryPosition = nextRecoveryPosition; - metadata.lastUpdate = lastRecoveryPosition; - JournalCommand message = load(lastRecoveryPosition); - process(message, lastRecoveryPosition, (IndexAware) null); - nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); - } - } finally { - this.indexLock.writeLock().unlock(); - } - } - - public Location getLastUpdatePosition() throws IOException { - return metadata.lastUpdate; - } - private Location getRecoveryPosition() throws IOException { if (!this.forceRecoverIndex) { @@ -1093,20 +1023,20 @@ private Location getNextInitializedLocation(Location location) throws IOExceptio protected void checkpointCleanup(final boolean cleanup) throws IOException { long start; - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { start = System.currentTimeMillis(); if( !opened.get() ) { return; } } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } checkpointUpdate(cleanup); long totalTimeMillis = System.currentTimeMillis() - start; if (LOG_SLOW_ACCESS_TIME > 0 && totalTimeMillis > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { - LOG.info("Slow KahaDB access: cleanup took " + totalTimeMillis); + LOG.info("Slow KahaDB access: cleanup took {}", totalTimeMillis); } persistenceAdapterStatistics.addSlowCleanupTime(totalTimeMillis); } @@ -1131,7 +1061,7 @@ public Location store(JournalCommand data, Runnable onJournalStoreComplete) t return store(data, false, null, null, onJournalStoreComplete); } - public Location store(JournalCommand data, boolean sync, IndexAware before,Runnable after) throws IOException { + Location store(JournalCommand data, boolean sync, IndexAware before,Runnable after) throws IOException { return store(data, sync, before, after, null); } @@ -1141,7 +1071,7 @@ public Location store(JournalCommand data, boolean sync, IndexAware before,Ru * the JournalMessage is used to update the index just like it would be done * during a recovery process. */ - public Location store(JournalCommand data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { + Location store(JournalCommand data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { try { ByteSequence sequence = toByteSequence(data); Location location; @@ -1162,7 +1092,8 @@ public Location store(JournalCommand data, boolean sync, IndexAware before, R long totalTimeMillis = end - start; if (LOG_SLOW_ACCESS_TIME > 0 && totalTimeMillis > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { - LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); + LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms", + start2 - start, end - start2); } persistenceAdapterStatistics.addSlowWriteTime(totalTimeMillis); } @@ -1179,7 +1110,7 @@ public Location store(JournalCommand data, boolean sync, IndexAware before, R return location; } catch (IOException ioe) { - LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe); + LOG.error("KahaDB failed to store to Journal, command of type: {}", data.type(), ioe); brokerService.handleIOException(ioe); throw ioe; } @@ -1198,7 +1129,7 @@ public JournalCommand load(Location location) throws IOException { long totalTimeMillis = System.currentTimeMillis() - start; if( LOG_SLOW_ACCESS_TIME>0 && totalTimeMillis > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { - LOG.info("Slow KahaDB access: Journal read took: "+ totalTimeMillis +" ms"); + LOG.info("Slow KahaDB access: Journal read took: {} ms", totalTimeMillis); } persistenceAdapterStatistics.addSlowReadTime(totalTimeMillis); } @@ -1234,7 +1165,7 @@ void process(JournalCommand data, final Location location, final Location inD // just recover producer audit data.visit(new Visitor() { @Override - public void visit(KahaAddMessageCommand command) throws IOException { + public void visit(KahaAddMessageCommand command) { metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); } }); @@ -1247,12 +1178,8 @@ private void initMessageStore(JournalCommand data) throws IOException { public void visit(KahaAddMessageCommand command) throws IOException { final KahaDestination destination = command.getDestination(); if (!storedDestinations.containsKey(key(destination))) { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - getStoredDestination(destination, tx); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + getStoredDestination(destination, tx)); } } }); @@ -1277,7 +1204,7 @@ public void visit(KahaRemoveMessageCommand command) throws IOException { } @Override - public void visit(KahaPrepareCommand command) throws IOException { + public void visit(KahaPrepareCommand command) { process(command, location); } @@ -1293,7 +1220,7 @@ public void visit(KahaRollbackCommand command) throws IOException { @Override public void visit(KahaRemoveDestinationCommand command) throws IOException { - process(command, location); + process(command); } @Override @@ -1302,12 +1229,12 @@ public void visit(KahaSubscriptionCommand command) throws IOException { } @Override - public void visit(KahaProducerAuditCommand command) throws IOException { + public void visit(KahaProducerAuditCommand command) { processLocation(location); } @Override - public void visit(KahaAckMessageFileMapCommand command) throws IOException { + public void visit(KahaAckMessageFileMapCommand command) { processLocation(location); } @@ -1329,40 +1256,33 @@ public void visit(KahaRewrittenDataFileCommand command) throws IOException { } @SuppressWarnings("rawtypes") - protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { + void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { if (command.hasTransactionInfo()) { List inflightTx = getInflightTx(command.getTransactionInfo()); inflightTx.add(new AddOperation(command, location, runWithIndexLock)); } else { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - long assignedIndex = updateIndex(tx, command, location); - if (runWithIndexLock != null) { - runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); - } + pageFile.tx().execute(tx -> { + long assignedIndex = updateIndex(tx, command, location); + if (runWithIndexLock != null) { + runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); } }); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } } protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - updateIndex(tx, command, location); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + updateIndex(tx, command, location)); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } @@ -1372,59 +1292,47 @@ protected void process(final KahaRemoveMessageCommand command, final Location lo List inflightTx = getInflightTx(command.getTransactionInfo()); inflightTx.add(new RemoveOperation(command, location)); } else { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - updateIndex(tx, command, location); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + updateIndex(tx, command, location)); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } } - protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { - this.indexLock.writeLock().lock(); + protected void process(final KahaRemoveDestinationCommand command) throws IOException { + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - updateIndex(tx, command, location); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + updateIndex(tx, command)); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - updateIndex(tx, command, location); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + updateIndex(tx, command, location)); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } protected void processLocation(final Location location) { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { metadata.lastUpdate = location; } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } @SuppressWarnings("rawtypes") - protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { + void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); List inflightTx; synchronized (inflightTransactions) { @@ -1444,27 +1352,24 @@ protected void process(KahaCommitCommand command, final Location location, final } final List messagingTx = inflightTx; - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - for (Operation op : messagingTx) { - op.execute(tx); - recordAckMessageReferenceLocation(location, op.getLocation()); - } + pageFile.tx().execute(tx -> { + for (Operation op : messagingTx) { + op.execute(tx); + recordAckMessageReferenceLocation(location, op.getLocation()); } }); metadata.lastUpdate = location; } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @SuppressWarnings("rawtypes") protected void process(KahaPrepareCommand command, Location location) { TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); - List tx = null; + List tx; synchronized (inflightTransactions) { tx = inflightTransactions.remove(key); if (tx != null) { @@ -1472,13 +1377,13 @@ protected void process(KahaPrepareCommand command, Location location) { } } if (tx != null && !tx.isEmpty()) { - indexLock.writeLock().lock(); + indexLock.lock(); try { for (Operation op : tx) { recordAckMessageReferenceLocation(location, op.getLocation()); } } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1486,7 +1391,7 @@ protected void process(KahaPrepareCommand command, Location location) { @SuppressWarnings("rawtypes") protected void process(KahaRollbackCommand command, Location location) throws IOException { TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); - List updates = null; + List updates; synchronized (inflightTransactions) { updates = inflightTransactions.remove(key); if (updates == null) { @@ -1494,13 +1399,13 @@ protected void process(KahaRollbackCommand command, Location location) throws I } } if (key.isXATransaction() && updates != null && !updates.isEmpty()) { - indexLock.writeLock().lock(); + indexLock.lock(); try { for (Operation op : updates) { recordAckMessageReferenceLocation(location, op.getLocation()); } } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1523,8 +1428,7 @@ protected void process(KahaRewrittenDataFileCommand command, Location location) // These methods do the actual index updates. // ///////////////////////////////////////////////////////////////// - protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); - private final HashSet journalFilesBeingReplicated = new HashSet<>(); + protected final ReentrantLock indexLock = new ReentrantLock(); long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { StoredDestination sd = getExistingStoredDestination(command.getDestination(), tx); @@ -1649,10 +1553,11 @@ void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackL recordAckMessageReferenceLocation(ackLocation, keys.location); metadata.lastUpdate = ackLocation; } else if (LOG.isDebugEnabled()) { - LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); + LOG.debug("message not found in order index: {} for: {}", sequenceId, + command.getMessageId()); } } else if (LOG.isDebugEnabled()) { - LOG.debug("message not found in sequence id index: " + command.getMessageId()); + LOG.debug("message not found in sequence id index: {}", command.getMessageId()); } } else { // In the topic case we need remove the message once it's been acked @@ -1676,7 +1581,8 @@ void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackL removeAckLocation(command, tx, sd, subscriptionKey, sequence); metadata.lastUpdate = ackLocation; } else if (LOG.isDebugEnabled()) { - LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); + LOG.debug("on ack, no message sequence exists for id: {} and sub: {}", + command.getMessageId(), command.getSubscriptionKey()); } } @@ -1698,7 +1604,7 @@ private void recordAckMessageReferenceLocation(Location ackLocation, Location me } } - void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { + void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); sd.orderIndex.remove(tx); @@ -1747,7 +1653,7 @@ void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location locat Location existing = sd.subLocations.get(tx, subscriptionKey); if (existing != null && existing.compareTo(location) == 0) { // replay on recovery, ignore - LOG.trace("ignoring journal replay of replay of sub from: " + location); + LOG.trace("ignoring journal replay of replay of sub from: {}", location); return; } @@ -1777,7 +1683,7 @@ void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location locat // remove the stored destination KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); removeDestinationCommand.setDestination(command.getDestination()); - updateIndex(tx, removeDestinationCommand, null); + updateIndex(tx, removeDestinationCommand); clearStoreStats(command.getDestination()); } } @@ -1786,19 +1692,15 @@ void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location locat private void checkpointUpdate(final boolean cleanup) throws IOException { checkpointLock.writeLock().lock(); try { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - Set filesToGc = pageFile.tx().execute(new Transaction.CallableClosure, IOException>() { - @Override - public Set execute(Transaction tx) throws IOException { - return checkpointUpdate(tx, cleanup); - } - }); + Set filesToGc = pageFile.tx().execute((Transaction.CallableClosure, IOException>) + tx -> checkpointUpdate(tx, cleanup)); pageFile.flush(); // after the index update such that partial removal does not leave dangling references in the index. journal.removeDataFiles(filesToGc); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } finally { @@ -1834,17 +1736,13 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio gcCandidateSet.addAll(completeFileSet); if (LOG.isTraceEnabled()) { - LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); + LOG.trace("Last update: {}, full gc candidates set: {}", lastUpdate, gcCandidateSet); } if (lastUpdate != null) { // we won't delete past the last update, ackCompaction journal can be a candidate in error - gcCandidateSet.removeAll(new TreeSet(gcCandidateSet.tailSet(lastUpdate.getDataFileId()))); - } - - // Don't GC files under replication - if( journalFilesBeingReplicated!=null ) { - gcCandidateSet.removeAll(journalFilesBeingReplicated); + gcCandidateSet.removeAll( + new TreeSet<>(gcCandidateSet.tailSet(lastUpdate.getDataFileId()))); } if (metadata.producerSequenceIdTrackerLocation != null) { @@ -1853,12 +1751,12 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio // rewrite so we don't prevent gc metadata.producerSequenceIdTracker.setModified(true); if (LOG.isTraceEnabled()) { - LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation); + LOG.trace("rewriting producerSequenceIdTracker:{}", metadata.producerSequenceIdTrackerLocation); } } gcCandidateSet.remove(dataFileId); if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet); + LOG.trace("gc candidates after producerSequenceIdTrackerLocation:{}, {}", metadata.producerSequenceIdTrackerLocation, gcCandidateSet); } } @@ -1866,7 +1764,7 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); gcCandidateSet.remove(dataFileId); if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet); + LOG.trace("gc candidates after ackMessageFileMapLocation:{}, {}", metadata.ackMessageFileMapLocation, gcCandidateSet); } } @@ -1877,7 +1775,7 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio } } if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); + LOG.trace("gc candidates after in progress tx range:{}, {}", Arrays.asList(inProgressTxRange), gcCandidateSet); } // Go through all the destinations to see if any of them can remove GC candidates. @@ -1887,28 +1785,32 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio } // Use a visitor to cut down the number of pages that we load - entry.getValue().locationIndex.visit(tx, new BTreeVisitor() { - int last=-1; + entry.getValue().locationIndex.visit(tx, new BTreeVisitor<>() { + int last = -1; + @Override public boolean isInterestedInKeysBetween(Location first, Location second) { - if( first==null ) { - SortedSet subset = gcCandidateSet.headSet(second.getDataFileId()+1); - if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { + if (first == null) { + SortedSet subset = gcCandidateSet.headSet( + second.getDataFileId() + 1); + if (!subset.isEmpty() && subset.last() == second.getDataFileId()) { subset.remove(second.getDataFileId()); } return !subset.isEmpty(); - } else if( second==null ) { - SortedSet subset = gcCandidateSet.tailSet(first.getDataFileId()); - if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { + } else if (second == null) { + SortedSet subset = gcCandidateSet.tailSet( + first.getDataFileId()); + if (!subset.isEmpty() && subset.first() == first.getDataFileId()) { subset.remove(first.getDataFileId()); } return !subset.isEmpty(); } else { - SortedSet subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); - if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { + SortedSet subset = gcCandidateSet.subSet(first.getDataFileId(), + second.getDataFileId() + 1); + if (!subset.isEmpty() && subset.first() == first.getDataFileId()) { subset.remove(first.getDataFileId()); } - if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { + if (!subset.isEmpty() && subset.last() == second.getDataFileId()) { subset.remove(second.getDataFileId()); } return !subset.isEmpty(); @@ -1919,7 +1821,7 @@ public boolean isInterestedInKeysBetween(Location first, Location second) { public void visit(List keys, List values) { for (Location l : keys) { int fileId = l.getDataFileId(); - if( last != fileId ) { + if (last != fileId) { gcCandidateSet.remove(fileId); last = fileId; } @@ -1974,14 +1876,14 @@ public void visit(List keys, List values) { } if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); + LOG.trace("gc candidates after dest:{}, {}", entry.getKey(), gcCandidateSet); } } // check we are not deleting file with ack for in-use journal files if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates: " + gcCandidateSet); - LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap); + LOG.trace("gc candidates: {}", gcCandidateSet); + LOG.trace("ackMessageFileMap: {}", metadata.ackMessageFileMap); } boolean ackMessageFileMapMod = false; @@ -2002,8 +1904,8 @@ public void visit(List keys, List values) { metadata.ackMessageFileMapDirtyFlag.lazySet(true); } else { if (LOG.isTraceEnabled()) { - LOG.trace("not removing data file: " + candidate - + " as contained ack(s) refer to referenced file: " + referencedFileIds); + LOG.trace("not removing data file: {} as contained ack(s) refer to referenced file: {}", + candidate, referencedFileIds); } } } @@ -2088,7 +1990,7 @@ public void run() { checkpointLock.readLock().lock(); try { // Lock index to capture the ackMessageFileMap data - indexLock.writeLock().lock(); + indexLock.lock(); try { // Map keys might not be sorted, find the earliest log file to forward acks // from and move only those, future cycles can chip away at more as needed. @@ -2111,7 +2013,7 @@ public void run() { journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } try { @@ -2175,7 +2077,7 @@ private void forwardAllAcks(Integer journalToRead, Set journalLogsRefer Map> updatedAckLocations = new HashMap<>(); - try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { + try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile)) { KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); compactionMarker.setSourceDataFileId(journalToRead); compactionMarker.setRewriteType(forwardsFile.getTypeCode()); @@ -2207,14 +2109,13 @@ private void forwardAllAcks(Integer journalToRead, Set journalLogsRefer LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations); // Lock index while we update the ackMessageFileMap. - indexLock.writeLock().lock(); + indexLock.lock(); try { // Update the ack map with the new locations of the acks for (Entry> entry : updatedAckLocations.entrySet()) { Set referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); if (referenceFileIds == null) { - referenceFileIds = new HashSet<>(); - referenceFileIds.addAll(entry.getValue()); + referenceFileIds = new HashSet<>(entry.getValue()); metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); metadata.ackMessageFileMapDirtyFlag.lazySet(true); } else { @@ -2227,7 +2128,7 @@ private void forwardAllAcks(Integer journalToRead, Set journalLogsRefer metadata.ackMessageFileMap.remove(journalToRead); metadata.ackMessageFileMapDirtyFlag.lazySet(true); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); @@ -2257,11 +2158,7 @@ private Location getNextLocationForAckForward(final Location nextLocation, final return location; } - final Runnable nullCompletionCallback = new Runnable() { - @Override - public void run() { - } - }; + final Runnable nullCompletionCallback = () -> {}; private Location checkpointProducerAudit() throws IOException { if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { @@ -2314,15 +2211,11 @@ private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscript return location; } - public HashSet getJournalFilesBeingReplicated() { - return journalFilesBeingReplicated; - } - // ///////////////////////////////////////////////////////////////// // StoredDestination related implementation methods. // ///////////////////////////////////////////////////////////////// - protected final HashMap storedDestinations = new HashMap<>(); + final HashMap storedDestinations = new HashMap<>(); static class MessageKeys { final String messageId; @@ -2339,7 +2232,7 @@ public String toString() { } } - protected class MessageKeysMarshaller extends VariableMarshaller { + class MessageKeysMarshaller extends VariableMarshaller { final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller(); @Override @@ -2383,7 +2276,7 @@ public String toString() { } } - protected class LastAckMarshaller implements Marshaller { + class LastAckMarshaller implements Marshaller { @Override public void writePayload(LastAck object, DataOutput dataOut) throws IOException { @@ -2417,20 +2310,18 @@ public boolean isDeepCopySupported() { } } - class StoredMessageStoreStatistics { - private PageFile pageFile; + static class StoredMessageStoreStatistics { private Page page; - private long pageId; - private AtomicBoolean loaded = new AtomicBoolean(); - private MessageStoreStatisticsMarshaller messageStoreStatisticsMarshaller = new MessageStoreStatisticsMarshaller(); + private final long pageId; + private final AtomicBoolean loaded = new AtomicBoolean(); + private final MessageStoreStatisticsMarshaller messageStoreStatisticsMarshaller = new MessageStoreStatisticsMarshaller(); - StoredMessageStoreStatistics(PageFile pageFile, long pageId) { + StoredMessageStoreStatistics(long pageId) { this.pageId = pageId; - this.pageFile = pageFile; } - StoredMessageStoreStatistics(PageFile pageFile, Page page) { - this(pageFile, page.getPageId()); + StoredMessageStoreStatistics(Page page) { + this(page.getPageId()); } public long getPageId() { @@ -2464,6 +2355,7 @@ synchronized void put(Transaction tx, MessageStoreStatistics storeStatistics) th tx.store(page, messageStoreStatisticsMarshaller, true); } } + class StoredDestination { MessageOrderIndex orderIndex = new MessageOrderIndex(); @@ -2496,7 +2388,7 @@ public String toString() { } } - protected class MessageStoreStatisticsMarshaller extends VariableMarshaller { + protected static class MessageStoreStatisticsMarshaller extends VariableMarshaller { @Override public void writePayload(final MessageStoreStatistics object, final DataOutput dataOut) throws IOException { @@ -2528,7 +2420,7 @@ public MessageStoreStatistics readPayload(final DataInput dataIn) throws IOExcep } } - protected class StoredDestinationMarshaller extends VariableMarshaller { + class StoredDestinationMarshaller extends VariableMarshaller { final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); @@ -2546,48 +2438,45 @@ public StoredDestination readPayload(final DataInput dataIn) throws IOException value.ackPositions = new ListIndex<>(pageFile, dataIn.readLong()); } else { // upgrade - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - LinkedHashMap temp = new LinkedHashMap<>(); - - if (metadata.version >= 3) { - // migrate - BTreeIndex> oldAckPositions = - new BTreeIndex<>(pageFile, dataIn.readLong()); - oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); - oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); - oldAckPositions.load(tx); - - - // Do the initial build of the data in memory before writing into the store - // based Ack Positions List to avoid a lot of disk thrashing. - Iterator>> iterator = oldAckPositions.iterator(tx); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - - for(String subKey : entry.getValue()) { - SequenceSet pendingAcks = temp.get(subKey); - if (pendingAcks == null) { - pendingAcks = new SequenceSet(); - temp.put(subKey, pendingAcks); - } - - pendingAcks.add(entry.getKey()); + pageFile.tx().execute(tx -> { + LinkedHashMap temp = new LinkedHashMap<>(); + + if (metadata.version >= 3) { + // migrate + BTreeIndex> oldAckPositions = + new BTreeIndex<>(pageFile, dataIn.readLong()); + oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); + oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); + oldAckPositions.load(tx); + + + // Do the initial build of the data in memory before writing into the store + // based Ack Positions List to avoid a lot of disk thrashing. + Iterator>> iterator = oldAckPositions.iterator(tx); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + + for(String subKey : entry.getValue()) { + SequenceSet pendingAcks = temp.get(subKey); + if (pendingAcks == null) { + pendingAcks = new SequenceSet(); + temp.put(subKey, pendingAcks); } + + pendingAcks.add(entry.getKey()); } } - // Now move the pending messages to ack data into the store backed - // structure. - value.ackPositions = new ListIndex<>(pageFile, tx.allocate()); - value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); - value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); - value.ackPositions.load(tx); - for(String subscriptionKey : temp.keySet()) { - value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); - } - } + // Now move the pending messages to ack data into the store backed + // structure. + value.ackPositions = new ListIndex<>(pageFile, tx.allocate()); + value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); + value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); + value.ackPositions.load(tx); + for(String subscriptionKey : temp.keySet()) { + value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); + } + }); } @@ -2595,14 +2484,11 @@ public void execute(Transaction tx) throws IOException { value.subLocations = new ListIndex<>(pageFile, dataIn.readLong()); } else { // upgrade - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - value.subLocations = new ListIndex<>(pageFile, tx.allocate()); - value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); - value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); - value.subLocations.load(tx); - } + pageFile.tx().execute(tx -> { + value.subLocations = new ListIndex<>(pageFile, tx.allocate()); + value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); + value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); + value.subLocations.load(tx); }); } } @@ -2612,27 +2498,24 @@ public void execute(Transaction tx) throws IOException { value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); } else { // upgrade - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); - value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); - value.orderIndex.lowPriorityIndex.load(tx); - - value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); - value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); - value.orderIndex.highPriorityIndex.load(tx); - } + pageFile.tx().execute((Transaction.Closure) tx -> { + value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); + value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); + value.orderIndex.lowPriorityIndex.load(tx); + + value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); + value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); + value.orderIndex.highPriorityIndex.load(tx); }); } if (metadata.version >= 7) { - value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, dataIn.readLong()); + value.messageStoreStatistics = new StoredMessageStoreStatistics(dataIn.readLong()); } else { pageFile.tx().execute(tx -> { - value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate()); + value.messageStoreStatistics = new StoredMessageStoreStatistics(tx.allocate()); value.messageStoreStatistics.load(tx); }); } @@ -2676,7 +2559,7 @@ public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) thr } } - protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { + StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { String key = key(destination); StoredDestination rc = storedDestinations.get(key); if (rc == null) { @@ -2690,12 +2573,12 @@ protected StoredDestination getStoredDestination(KahaDestination destination, Tr return rc; } - protected MessageStoreStatistics getStoredMessageStoreStatistics(KahaDestination destination, Transaction tx) throws IOException { + MessageStoreStatistics getStoredMessageStoreStatistics(KahaDestination destination, Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(destination, tx); return sd != null && sd.messageStoreStatistics != null ? sd.messageStoreStatistics.get(tx) : null; } - protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { + StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { String key = key(destination); StoredDestination rc = storedDestinations.get(key); if (rc == null && metadata.destinations.containsKey(tx, key)) { @@ -2728,7 +2611,7 @@ private StoredDestination loadStoredDestination(Transaction tx, String key, bool rc.subLocations = new ListIndex<>(pageFile, tx.allocate()); } - rc.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate()); + rc.messageStoreStatistics = new StoredMessageStoreStatistics(tx.allocate()); metadata.destinations.put(tx, key, rc); } @@ -2792,7 +2675,8 @@ private StoredDestination loadStoredDestination(Transaction tx, String key, bool for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { Entry entry = iterator.next(); for (Iterator> orderIterator = - rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { + rc.orderIndex.iterator(tx, + new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { Long sequence = orderIterator.next().getKey(); addAckLocation(tx, rc, sequence, entry.getKey()); } @@ -2866,7 +2750,7 @@ protected void incrementAndAddSizeToStoreStat(Transaction tx, KahaDestination ka incrementAndAddSizeToStoreStat(tx, key(kahaDestination), sd, size); } - protected void incrementAndAddSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { + void incrementAndAddSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); if (storeStats != null) { incrementAndAddSizeToStoreStat(size, storeStats); @@ -2894,7 +2778,7 @@ protected void decrementAndSubSizeToStoreStat(Transaction tx, KahaDestination ka decrementAndSubSizeToStoreStat(tx, key(kahaDestination), sd,size); } - protected void decrementAndSubSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { + void decrementAndSubSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); if (storeStats != null) { decrementAndSubSizeToStoreStat(size, storeStats); @@ -2989,25 +2873,6 @@ protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) { return subStats; } - /** - * Determine whether this Destination matches the DestinationType - * - * @param destination - * @param type - * @return - */ - protected boolean matchType(Destination destination, - KahaDestination.DestinationType type) { - if (destination instanceof Topic - && type.equals(KahaDestination.DestinationType.TOPIC)) { - return true; - } else if (destination instanceof Queue - && type.equals(KahaDestination.DestinationType.QUEUE)) { - return true; - } - return false; - } - class LocationSizeMarshaller implements Marshaller { public LocationSizeMarshaller() { @@ -3191,20 +3056,19 @@ private void removeAckLocation(KahaRemoveMessageCommand command, } } - public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { return sd.subscriptionAcks.get(tx, subscriptionKey); } - protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { if (sd.ackPositions != null) { - final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); - return messageSequences; + return sd.ackPositions.get(tx, subscriptionKey); } return null; } - protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { if (sd.ackPositions != null) { SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); if (messageSequences != null) { @@ -3227,7 +3091,7 @@ protected long getStoredMessageCount(Transaction tx, StoredDestination sd, Strin * @return * @throws IOException */ - protected Map getStoredMessageSize(Transaction tx, StoredDestination sd, List subscriptionKeys) throws IOException { + Map getStoredMessageSize(Transaction tx, StoredDestination sd, List subscriptionKeys) throws IOException { final Map subPendingMessageSizes = new HashMap<>(); final Map messageSequencesMap = new HashMap<>(); @@ -3272,7 +3136,7 @@ protected Map getStoredMessageSize(Transaction tx, StoredDes return subPendingMessageSizes; } - protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { long locationSize = 0; if (sd.ackPositions != null) { @@ -3313,18 +3177,15 @@ protected String key(KahaDestination destination) { @SuppressWarnings("rawtypes") private final LinkedHashMap> inflightTransactions = new LinkedHashMap<>(); @SuppressWarnings("rawtypes") - protected final LinkedHashMap> preparedTransactions = new LinkedHashMap<>(); + final LinkedHashMap> preparedTransactions = new LinkedHashMap<>(); @SuppressWarnings("rawtypes") private List getInflightTx(KahaTransactionInfo info) { TransactionId key = TransactionIdConversion.convert(info); List tx; synchronized (inflightTransactions) { - tx = inflightTransactions.get(key); - if (tx == null) { - tx = Collections.synchronizedList(new ArrayList()); - inflightTransactions.put(key, tx); - } + tx = inflightTransactions.computeIfAbsent(key, + k -> Collections.synchronizedList(new ArrayList<>())); } return tx; } @@ -3334,7 +3195,7 @@ private TransactionId key(KahaTransactionInfo transactionInfo) { return TransactionIdConversion.convert(transactionInfo); } - abstract class Operation > { + abstract static class Operation > { final T command; final Location location; @@ -3577,7 +3438,7 @@ public Journal getJournal() throws IOException { return journal; } - protected Metadata getMetadata() { + Metadata getMetadata() { return metadata; } @@ -3723,7 +3584,7 @@ public PersistenceAdapterStatistics getPersistenceAdapterStatistics() { // Internal conversion methods. // ///////////////////////////////////////////////////////////////// - class MessageOrderCursor{ + static class MessageOrderCursor{ long defaultCursorPosition; long lowPriorityCursorPosition; long highPriorityCursorPosition; @@ -3880,9 +3741,9 @@ void resetCursorPosition() { lastLowKey = null; } - void setBatch(Transaction tx, Long sequence) throws IOException { + void setBatch(Long sequence) { if (sequence != null) { - Long nextPosition = sequence + 1; + long nextPosition = sequence + 1; lastDefaultKey = sequence; cursor.defaultCursorPosition = nextPosition; lastHighKey = sequence; @@ -3892,8 +3753,8 @@ void setBatch(Transaction tx, Long sequence) throws IOException { } } - void setBatch(Transaction tx, LastAck last) throws IOException { - setBatch(tx, last.lastAckedSequence); + void setBatch(LastAck last) { + setBatch(last.lastAckedSequence); if (cursor.defaultCursorPosition == 0 && cursor.highPriorityCursorPosition == 0 && cursor.lowPriorityCursorPosition == 0) { @@ -4145,9 +4006,7 @@ public HashSet readPayload(DataInput dataIn) throws IOException { try { return (HashSet) oin.readObject(); } catch (ClassNotFoundException cfe) { - IOException ioe = new IOException("Failed to read HashSet: " + cfe); - ioe.initCause(cfe); - throw ioe; + throw new IOException("Failed to read HashSet: " + cfe, cfe); } } } @@ -4161,7 +4020,7 @@ public void setIndexDirectory(File indexDirectory) { } interface IndexAware { - public void sequenceAssignedWithIndexLocked(long index); + void sequenceAssignedWithIndexLocked(long index); } public String getPreallocationScope() { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index a1a98780e20..a9b4c52258d 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -442,31 +442,28 @@ private void corruptBatchEndEof(int id) throws Exception{ private void corruptOrderIndex(final int num, final int size) throws Exception { //This is because of AMQ-6097, now that the MessageOrderIndex stores the size in the Location, //we need to corrupt that value as well - final KahaDBStore kahaDbStore = (KahaDBStore) ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore(); - kahaDbStore.indexLock.writeLock().lock(); + final KahaDBStore kahaDbStore = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore(); + kahaDbStore.indexLock.lock(); try { - kahaDbStore.pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert( - (ActiveMQQueue)destination), tx); - int i = 1; - for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - if (i == num) { - //change the size value to the wrong size - sd.orderIndex.get(tx, entry.getKey()); - MessageKeys messageKeys = entry.getValue(); - messageKeys.location.setSize(size); - sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), entry.getKey(), messageKeys); - break; - } - i++; + kahaDbStore.pageFile.tx().execute(tx -> { + StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert( + (ActiveMQQueue)destination), tx); + int i = 1; + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + if (i == num) { + //change the size value to the wrong size + sd.orderIndex.get(tx, entry.getKey()); + MessageKeys messageKeys = entry.getValue(); + messageKeys.location.setSize(size); + sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), entry.getKey(), messageKeys); + break; } + i++; } }); } finally { - kahaDbStore.indexLock.writeLock().unlock(); + kahaDbStore.indexLock.unlock(); } } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java index ca2f27d1235..8f471c9a07f 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java @@ -164,22 +164,19 @@ private void corruptIndex() throws IOException { //blow up the index try { - store.indexLock.writeLock().lock(); - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - for (Iterator> iterator = metadata.destinations.iterator(tx); iterator - .hasNext();) { - Entry entry = iterator.next(); - entry.getValue().orderIndex.nextMessageId = -100; - entry.getValue().orderIndex.defaultPriorityIndex.clear(tx); - entry.getValue().orderIndex.lowPriorityIndex.clear(tx); - entry.getValue().orderIndex.highPriorityIndex.clear(tx); - } + store.indexLock.lock(); + pageFile.tx().execute(tx -> { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + entry.getValue().orderIndex.nextMessageId = -100; + entry.getValue().orderIndex.defaultPriorityIndex.clear(tx); + entry.getValue().orderIndex.lowPriorityIndex.clear(tx); + entry.getValue().orderIndex.highPriorityIndex.clear(tx); } }); } finally { - store.indexLock.writeLock().unlock(); + store.indexLock.unlock(); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java index 6c6a3f92fc9..a6a470c9eeb 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java @@ -62,11 +62,11 @@ public void forceCleanup() throws IOException { public int getFileMapSize() throws IOException { // ensure save memory publishing, use the right lock - indexLock.readLock().lock(); + indexLock.lock(); try { return getJournal().getFileMap().size(); } finally { - indexLock.readLock().unlock(); + indexLock.unlock(); } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java index 404ee07fdd7..db3c875f1b9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java @@ -65,11 +65,11 @@ public void forceCleanup() throws IOException { public int getFileMapSize() throws IOException { // ensure save memory publishing, use the right lock - indexLock.readLock().lock(); + indexLock.lock(); try { return getJournal().getFileMap().size(); } finally { - indexLock.readLock().unlock(); + indexLock.unlock(); } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java index 45138561538..4560eed4a82 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java @@ -61,7 +61,7 @@ public void testLocationIndexMatchesOrderIndex() throws Exception { //Iterate over the order index and add up the size of the messages to compare //to the location index - kahaDbStore.indexLock.readLock().lock(); + kahaDbStore.indexLock.lock(); try { long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure() { @Override @@ -79,7 +79,7 @@ public Long execute(Transaction tx) throws IOException { assertEquals("Order index size values don't match message size", size, messageStore.getMessageSize()); } finally { - kahaDbStore.indexLock.readLock().unlock(); + kahaDbStore.indexLock.unlock(); } }