diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index a543e2bf96..fb3eb7a254 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -76,17 +76,14 @@
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
-import org.apache.activemq.store.kahadb.data.KahaLocation;
 import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
 import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
 import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
 import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
-import org.apache.activemq.store.kahadb.disk.page.Transaction.CallableClosure;
 import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
 import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
-import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
@@ -109,8 +106,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
     protected ExecutorService queueExecutor;
     protected ExecutorService topicExecutor;
-    protected final List> asyncQueueMaps = new LinkedList<>();
-    protected final List> asyncTopicMaps = new LinkedList<>();
+    final List> asyncQueueMaps = new LinkedList<>();
+    final List> asyncTopicMaps = new LinkedList<>();
     final WireFormat wireFormat = new OpenWireFormat();
     private SystemUsage usageManager;
     private LinkedBlockingQueue asyncQueueJobQueue;
@@ -233,26 +230,22 @@ public void doStart() throws Exception {
         this.globalQueueSemaphore = new Semaphore(getMaxAsyncJobs());
         this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
-        this.asyncQueueJobQueue = new LinkedBlockingQueue(getMaxAsyncJobs());
-        this.asyncTopicJobQueue = new LinkedBlockingQueue(getMaxAsyncJobs());
-        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
-            asyncQueueJobQueue, new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable runnable) {
-                    Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
-                    thread.setDaemon(true);
-                    return thread;
-                }
-            });
-        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
-            asyncTopicJobQueue, new ThreadFactory() {
-                @Override
-                public Thread newThread(Runnable runnable) {
-                    Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
-                    thread.setDaemon(true);
-                    return thread;
-                }
-            });
+        this.asyncQueueJobQueue = new LinkedBlockingQueue<>(getMaxAsyncJobs());
+        this.asyncTopicJobQueue = new LinkedBlockingQueue<>(getMaxAsyncJobs());
+        this.queueExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L,
+            TimeUnit.MILLISECONDS,
+            asyncQueueJobQueue, runnable -> {
+                Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
+                thread.setDaemon(true);
+                return thread;
+            });
+        this.topicExecutor = new StoreTaskExecutor(1, asyncExecutorMaxThreads, 0L,
+
TimeUnit.MILLISECONDS, + asyncTopicJobQueue, runnable -> { + Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch"); + thread.setDaemon(true); + return thread; + }); } @Override @@ -305,21 +298,18 @@ public void doStop(ServiceStopper stopper) throws Exception { } private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Location execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(destination, tx); - Long sequence = sd.messageIdIndex.get(tx, key); - if (sequence == null) { - return null; - } - return sd.orderIndex.get(tx, sequence).location; + return pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(destination, tx); + Long sequence = sd.messageIdIndex.get(tx, key); + if (sequence == null) { + return null; } + return sd.orderIndex.get(tx, sequence).location; }); } - protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { - StoreQueueTask task = null; + StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) { + StoreQueueTask task; synchronized (store.asyncTaskMap) { task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); } @@ -327,20 +317,20 @@ protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) } // with asyncTaskMap locked - protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { + void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException { store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); this.queueExecutor.execute(task); } - protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { - StoreTopicTask task = null; + StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) { + StoreTopicTask task; synchronized (store.asyncTaskMap) { task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination())); } return task; } - protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { + void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException { synchronized (store.asyncTaskMap) { store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task); } @@ -409,12 +399,12 @@ private KahaDBMessageStore findMatchingStore(ActiveMQDestination activeMQDestina } public class KahaDBMessageStore extends AbstractMessageStore { - protected final Map asyncTaskMap = new HashMap(); + final Map asyncTaskMap = new HashMap<>(); protected KahaDestination dest; private final int maxAsyncJobs; private final Semaphore localDestinationSemaphore; - protected final HashMap> ackedAndPreparedMap = new HashMap>(); - protected final HashMap> rolledBackAcksMap = new HashMap>(); + protected final HashMap> ackedAndPreparedMap = new HashMap<>(); + protected final HashMap> rolledBackAcksMap = new HashMap<>(); double doneTasks, canceledTasks = 0; @@ -425,13 +415,7 @@ public KahaDBMessageStore(ActiveMQDestination destination) { this.localDestinationSemaphore = new Semaphore(this.maxAsyncJobs); } - @Override - public ActiveMQDestination getDestination() { - return destination; - } - - - private final String recoveredTxStateMapKey(ActiveMQDestination destination, MessageAck ack) { + private String 
recoveredTxStateMapKey(ActiveMQDestination destination, MessageAck ack) { return destination.isQueue() ? destination.getPhysicalName() : ack.getConsumerId().getConnectionId(); } @@ -439,25 +423,22 @@ private final String recoveredTxStateMapKey(ActiveMQDestination destination, Mes // till then they are skipped by the store. // 'at most once' XA guarantee public void trackRecoveredAcks(ArrayList acks) { - indexLock.writeLock().lock(); + indexLock.lock(); try { for (MessageAck ack : acks) { final String key = recoveredTxStateMapKey(destination, ack); - Set ackedAndPrepared = ackedAndPreparedMap.get(key); - if (ackedAndPrepared == null) { - ackedAndPrepared = new LinkedHashSet(); - ackedAndPreparedMap.put(key, ackedAndPrepared); - } + Set ackedAndPrepared = ackedAndPreparedMap.computeIfAbsent(key, + k -> new LinkedHashSet<>()); ackedAndPrepared.add(ack.getLastMessageId().toProducerKey()); } } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } public void forgetRecoveredAcks(ArrayList acks, boolean rollback) throws IOException { if (acks != null) { - indexLock.writeLock().lock(); + indexLock.lock(); try { for (MessageAck ack : acks) { final String id = ack.getLastMessageId().toProducerKey(); @@ -470,11 +451,8 @@ public void forgetRecoveredAcks(ArrayList acks, boolean rollback) th } } if (rollback) { - Set rolledBackAcks = rolledBackAcksMap.get(key); - if (rolledBackAcks == null) { - rolledBackAcks = new LinkedHashSet(); - rolledBackAcksMap.put(key, rolledBackAcks); - } + Set rolledBackAcks = rolledBackAcksMap.computeIfAbsent(key, + k -> new LinkedHashSet<>()); rolledBackAcks.add(id); pageFile.tx().execute(tx -> { incrementAndAddSizeToStoreStat(tx, dest, 0); @@ -482,7 +460,7 @@ public void forgetRecoveredAcks(ArrayList acks, boolean rollback) th } } } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -513,7 +491,7 @@ public ListenableFuture asyncAddQueueMessage(final ConnectionContext con public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { if (isConcurrentStoreAndDispatchQueues()) { AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination()); - StoreQueueTask task = null; + StoreQueueTask task; synchronized (asyncTaskMap) { task = (StoreQueueTask) asyncTaskMap.get(key); } @@ -528,11 +506,11 @@ public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws } removeMessage(context, ack); } else { - indexLock.writeLock().lock(); + indexLock.lock(); try { metadata.producerSequenceIdTracker.isDuplicate(ack.getLastMessageId()); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } synchronized (asyncTaskMap) { asyncTaskMap.remove(key); @@ -558,7 +536,7 @@ public void addMessage(final ConnectionContext context, final Message message) t command.setMessage(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); store(command, isEnableJournalDiskSyncs() && message.isResponseRequired(), new IndexAware() { // sync add? 
(for async, future present from getFutureOrSequenceLong) - Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); + final Object possibleFuture = message.getMessageId().getFutureOrSequenceLong(); @Override public void sequenceAssignedWithIndexLocked(final long sequence) { @@ -566,12 +544,8 @@ public void sequenceAssignedWithIndexLocked(final long sequence) { if (indexListener != null) { if (possibleFuture == null) { trackPendingAdd(dest, sequence); - indexListener.onAdd(new IndexListener.MessageContext(context, message, new Runnable() { - @Override - public void run() { - trackPendingAddComplete(dest, sequence); - } - })); + indexListener.onAdd(new IndexListener.MessageContext(context, message, + () -> trackPendingAddComplete(dest, sequence))); } } } @@ -604,7 +578,8 @@ public void run() { @Override public void updateMessage(Message message) throws IOException { if (LOG.isTraceEnabled()) { - LOG.trace("updating: " + message.getMessageId() + " with deliveryCount: " + message.getRedeliveryCounter()); + LOG.trace("updating: {} with deliveryCount: {}", message.getMessageId(), + message.getRedeliveryCounter()); } KahaUpdateMessageCommand updateMessageCommand = new KahaUpdateMessageCommand(); KahaAddMessageCommand command = new KahaAddMessageCommand(); @@ -645,11 +620,11 @@ public Message getMessage(MessageId identity) throws IOException { // operations... but for now we must // externally synchronize... Location location; - indexLock.writeLock().lock(); + indexLock.lock(); try { location = findMessageLocation(key, dest); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } if (location == null) { return null; @@ -660,80 +635,71 @@ public Message getMessage(MessageId identity) throws IOException { @Override public boolean isEmpty() throws IOException { - indexLock.writeLock().lock(); + indexLock.lock(); try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Boolean execute(Transaction tx) throws IOException { - // Iterate through all index entries to get a count of - // messages in the destination. - StoredDestination sd = getStoredDestination(dest, tx); - return sd.locationIndex.isEmpty(tx); - } + return pageFile.tx().execute(tx -> { + // Iterate through all index entries to get a count of + // messages in the destination. 
+ StoredDestination sd = getStoredDestination(dest, tx); + return sd.locationIndex.isEmpty(tx); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @Override public void recover(final MessageRecoveryListener listener) throws Exception { // recovery may involve expiry which will modify - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener); - sd.orderIndex.resetCursorPosition(); - for (Iterator> iterator = - sd.orderIndex.iterator(tx, new MessageOrderCursor()); listener.hasSpace() && - iterator.hasNext(); ) { - Entry entry = iterator.next(); - Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } - Message msg = loadMessage(entry.getValue().location); - listener.recoverMessage(msg); + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, Integer.MAX_VALUE, listener); + sd.orderIndex.resetCursorPosition(); + for (Iterator> iterator = + sd.orderIndex.iterator(tx, new MessageOrderCursor()); listener.hasSpace() && + iterator.hasNext(); ) { + Entry entry = iterator.next(); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; } + Message msg = loadMessage(entry.getValue().location); + listener.recoverMessage(msg); } }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @Override public void recoverNextMessages(final int maxReturned, final MessageRecoveryListener listener) throws Exception { - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - Entry entry = null; - int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener); - Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); - for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { - entry = iterator.next(); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } - Message msg = loadMessage(entry.getValue().location); - msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); - listener.recoverMessage(msg); - counter++; - if (counter >= maxReturned || !listener.canRecoveryNextMessage()) { - break; - } + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + Entry entry; + int counter = recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, maxReturned, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext(); ) { + entry = iterator.next(); + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; + } + Message msg = loadMessage(entry.getValue().location); + msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); + listener.recoverMessage(msg); + counter++; + if 
(counter >= maxReturned || !listener.canRecoveryNextMessage()) { + break; } - sd.orderIndex.stoppedIterating(); } + sd.orderIndex.stoppedIterating(); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -747,86 +713,84 @@ public void recoverMessages(final MessageRecoveryContext messageRecoveryContext) throw new IllegalArgumentException("Invalid messageRecoveryContext specified"); } - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - - /** - * [AMQ-9773] - * - * The order index sequence key value increases for every message since the beginning of the - * creation of the index, so the first message available won't be at sequence key value:0 in - * the index when there were older messages acknowledged. - * - * The MessageOrderCursor _position_ value is relative to the index, so there is a disconnect - * between queue _position_ and index sequence value over time. - * - * The MessageRecoveryContext determines the recovery start position based off the provided - * offset, or the position of the requested startMessageId. If a startMessageId is specified, - * but not found in the index, then the value of 0 is used as a fallback. - * - * The MessageRecoveryContext determines the recovery end position based off of the provided - * endMessageId (if provided), or the maximum recovered message count, or if the - * MessageRecoveryListener signals that no more messages should be recovered - * (ie memory is full). - */ - Long startSequenceOffset = null; - Long endSequenceOffset = null; - - if(messageRecoveryContext.getStartMessageId() != null && !messageRecoveryContext.getStartMessageId().isBlank()) { - startSequenceOffset = Optional.ofNullable(sd.messageIdIndex.get(tx, messageRecoveryContext.getStartMessageId())).orElse(0L); - } else { - startSequenceOffset = Optional.ofNullable(messageRecoveryContext.getOffset()).orElse(0L); - } + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + + /** + * [AMQ-9773] + * + * The order index sequence key value increases for every message since the beginning of the + * creation of the index, so the first message available won't be at sequence key value:0 in + * the index when there were older messages acknowledged. + * + * The MessageOrderCursor _position_ value is relative to the index, so there is a disconnect + * between queue _position_ and index sequence value over time. + * + * The MessageRecoveryContext determines the recovery start position based off the provided + * offset, or the position of the requested startMessageId. If a startMessageId is specified, + * but not found in the index, then the value of 0 is used as a fallback. + * + * The MessageRecoveryContext determines the recovery end position based off of the provided + * endMessageId (if provided), or the maximum recovered message count, or if the + * MessageRecoveryListener signals that no more messages should be recovered + * (ie memory is full). 
+ */ + Long startSequenceOffset; + Long endSequenceOffset = null; + + if(messageRecoveryContext.getStartMessageId() != null && !messageRecoveryContext.getStartMessageId().isBlank()) { + startSequenceOffset = Optional.ofNullable(sd.messageIdIndex.get(tx, messageRecoveryContext.getStartMessageId())).orElse(0L); + } else { + startSequenceOffset = Optional.ofNullable(messageRecoveryContext.getOffset()).orElse(0L); + } - if(messageRecoveryContext.getEndMessageId() != null && !messageRecoveryContext.getEndMessageId().isBlank()) { - endSequenceOffset = Optional.ofNullable(sd.messageIdIndex.get(tx, messageRecoveryContext.getEndMessageId())) - .orElse(startSequenceOffset + Long.valueOf(messageRecoveryContext.getMaxMessageCountReturned())); - messageRecoveryContext.setEndSequenceId(endSequenceOffset); - } + if(messageRecoveryContext.getEndMessageId() != null && !messageRecoveryContext.getEndMessageId().isBlank()) { + endSequenceOffset = Optional.ofNullable(sd.messageIdIndex.get(tx, messageRecoveryContext.getEndMessageId())) + .orElse(startSequenceOffset + (long) messageRecoveryContext.getMaxMessageCountReturned()); + messageRecoveryContext.setEndSequenceId(endSequenceOffset); + } - if(endSequenceOffset != null && - endSequenceOffset < startSequenceOffset) { - LOG.warn("Invalid offset parameters start:{} end:{}", startSequenceOffset, endSequenceOffset); - throw new IllegalStateException("Invalid offset parameters start:" + startSequenceOffset + " end:" + endSequenceOffset); - } + if(endSequenceOffset != null && + endSequenceOffset < startSequenceOffset) { + LOG.warn("Invalid offset parameters start:{} end:{}", startSequenceOffset, endSequenceOffset); + throw new IllegalStateException("Invalid offset parameters start:" + startSequenceOffset + " end:" + endSequenceOffset); + } - Entry entry = null; - recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, messageRecoveryContext.getMaxMessageCountReturned(), messageRecoveryContext); - Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); - Iterator> iterator = (messageRecoveryContext.isUseDedicatedCursor() ? sd.orderIndex.iterator(tx, new MessageOrderCursor(startSequenceOffset)) : sd.orderIndex.iterator(tx)); + Entry entry; + recoverRolledBackAcks(destination.getPhysicalName(), sd, tx, messageRecoveryContext.getMaxMessageCountReturned(), messageRecoveryContext); + Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); + Iterator> iterator = (messageRecoveryContext.isUseDedicatedCursor() ? 
sd.orderIndex.iterator(tx, + new MessageOrderCursor(startSequenceOffset)) : sd.orderIndex.iterator(tx)); - while (iterator.hasNext()) { - entry = iterator.next(); + while (iterator.hasNext()) { + entry = iterator.next(); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; + } - Message msg = loadMessage(entry.getValue().location); - msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); + Message msg = loadMessage(entry.getValue().location); + msg.getMessageId().setFutureOrSequenceLong(entry.getKey()); - messageRecoveryContext.recoverMessage(msg); - if (!messageRecoveryContext.canRecoveryNextMessage(entry.getKey())) { - break; - } + messageRecoveryContext.recoverMessage(msg); + if (!messageRecoveryContext.canRecoveryNextMessage(entry.getKey())) { + break; } + } - // [AMQ-9773] The sd.orderIndex uses the destination's cursor - if(!messageRecoveryContext.isUseDedicatedCursor()) { - sd.orderIndex.stoppedIterating(); - } + // [AMQ-9773] The sd.orderIndex uses the destination's cursor + if(!messageRecoveryContext.isUseDedicatedCursor()) { + sd.orderIndex.stoppedIterating(); } }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } - protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { + int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestination sd, Transaction tx, int maxReturned, MessageRecoveryListener listener) throws Exception { int counter = 0; String id; @@ -862,64 +826,46 @@ protected int recoverRolledBackAcks(String recoveredTxStateMapKey, StoredDestina @Override public void resetBatching() { if (pageFile.isLoaded()) { - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getExistingStoredDestination(dest, tx); - if (sd != null) { - sd.orderIndex.resetCursorPosition();} - } + pageFile.tx().execute(tx -> { + StoredDestination sd = getExistingStoredDestination(dest, tx); + if (sd != null) { + sd.orderIndex.resetCursorPosition();} }); } catch (Exception e) { LOG.error("Failed to reset batching",e); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @Override public void setBatch(final MessageId identity) throws IOException { - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - Long location = (Long) identity.getFutureOrSequenceLong(); - Long pending = sd.orderIndex.minPendingAdd(); - if (pending != null) { - location = Math.min(location, pending-1); - } - sd.orderIndex.setBatch(tx, location); + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + Long location = (Long) identity.getFutureOrSequenceLong(); + Long pending = sd.orderIndex.minPendingAdd(); + if (pending != null) { + location = Math.min(location, pending-1); } + sd.orderIndex.setBatch(location); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } - @Override - public void setMemoryUsage(MemoryUsage memoryUsage) { - } - @Override - public void start() throws Exception { - super.start(); - } - @Override 
- public void stop() throws Exception { - super.stop(); - } - protected void lockAsyncJobQueue() { try { if (!this.localDestinationSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS)) { throw new TimeoutException(this +" timeout waiting for localDestSem:" + this.localDestinationSemaphore); } } catch (Exception e) { - LOG.error("Failed to lock async jobs for " + this.destination, e); + LOG.error("Failed to lock async jobs for {}", this.destination, e); } } @@ -931,7 +877,7 @@ protected void acquireLocalAsyncLock() { try { this.localDestinationSemaphore.acquire(); } catch (InterruptedException e) { - LOG.error("Failed to aquire async lock for " + this.destination, e); + LOG.error("Failed to aquire async lock for {}", this.destination, e); } } @@ -949,25 +895,22 @@ protected void recoverMessageStoreStatistics() throws IOException { try { MessageStoreStatistics recoveredStatistics; lockAsyncJobQueue(); - indexLock.writeLock().lock(); + indexLock.lock(); try { - recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public MessageStoreStatistics execute(Transaction tx) throws IOException { - MessageStoreStatistics statistics = getStoredMessageStoreStatistics(dest, tx); - - // Iterate through all index entries to get the size of each message - if (statistics == null) { - StoredDestination sd = getStoredDestination(dest, tx); - statistics = new MessageStoreStatistics(); - for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext(); ) { - int locationSize = iterator.next().getKey().getSize(); - statistics.getMessageCount().increment(); - statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); - } + recoveredStatistics = pageFile.tx().execute(tx -> { + MessageStoreStatistics statistics = getStoredMessageStoreStatistics(dest, tx); + + // Iterate through all index entries to get the size of each message + if (statistics == null) { + StoredDestination sd = getStoredDestination(dest, tx); + statistics = new MessageStoreStatistics(); + for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext(); ) { + int locationSize = iterator.next().getKey().getSize(); + statistics.getMessageCount().increment(); + statistics.getMessageSize().addSize(locationSize > 0 ? 
locationSize : 0); } - return statistics; } + return statistics; }); Set ackedAndPrepared = ackedAndPreparedMap.get(destination.getPhysicalName()); if (ackedAndPrepared != null) { @@ -976,7 +919,7 @@ public MessageStoreStatistics execute(Transaction tx) throws IOException { getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount()); getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize()); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } finally { unlockAsyncJobQueue(); @@ -1025,10 +968,10 @@ public ListenableFuture asyncAddTopicMessage(final ConnectionContext con @Override public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { - String subscriptionKey = subscriptionKey(clientId, subscriptionName).toString(); + String subscriptionKey = subscriptionKey(clientId, subscriptionName); if (isConcurrentStoreAndDispatchTopics()) { AsyncJobKey key = new AsyncJobKey(messageId, getDestination()); - StoreTopicTask task = null; + StoreTopicTask task; synchronized (asyncTaskMap) { task = (StoreTopicTask) asyncTaskMap.get(key); } @@ -1042,14 +985,14 @@ public void acknowledge(ConnectionContext context, String clientId, String subsc } } } else { - doAcknowledge(context, subscriptionKey, messageId, ack); + doAcknowledge(subscriptionKey, messageId, ack); } } else { - doAcknowledge(context, subscriptionKey, messageId, ack); + doAcknowledge(subscriptionKey, messageId, ack); } } - protected void doAcknowledge(ConnectionContext context, String subscriptionKey, MessageId messageId, MessageAck ack) + protected void doAcknowledge(String subscriptionKey, MessageId messageId, MessageAck ack) throws IOException { KahaRemoveMessageCommand command = new KahaRemoveMessageCommand(); command.setDestination(dest); @@ -1071,11 +1014,11 @@ public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroacti .getSubscriptionName()); KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); - command.setSubscriptionKey(subscriptionKey.toString()); + command.setSubscriptionKey(subscriptionKey); command.setRetroactive(retroactive); org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(subscriptionInfo); command.setSubscriptionInfo(new Buffer(packet.getData(), packet.getOffset(), packet.getLength())); - store(command, isEnableJournalDiskSyncs() && true, null, null); + store(command, isEnableJournalDiskSyncs(), null, null); this.subscriptionCount.incrementAndGet(); } @@ -1083,33 +1026,30 @@ public void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroacti public void deleteSubscription(String clientId, String subscriptionName) throws IOException { KahaSubscriptionCommand command = new KahaSubscriptionCommand(); command.setDestination(dest); - command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName).toString()); - store(command, isEnableJournalDiskSyncs() && true, null, null); + command.setSubscriptionKey(subscriptionKey(clientId, subscriptionName)); + store(command, isEnableJournalDiskSyncs(), null, null); this.subscriptionCount.decrementAndGet(); } @Override public SubscriptionInfo[] getAllSubscriptions() throws IOException { - final ArrayList subscriptions = new ArrayList(); - indexLock.writeLock().lock(); + final ArrayList subscriptions = new ArrayList<>(); + indexLock.lock(); try { - pageFile.tx().execute(new 
Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - for (Iterator> iterator = sd.subscriptions.iterator(tx); iterator - .hasNext();) { - Entry entry = iterator.next(); - SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry - .getValue().getSubscriptionInfo().newInput())); - subscriptions.add(info); + pageFile.tx().execute((Transaction.Closure) tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + for (Iterator> iterator = sd.subscriptions.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + SubscriptionInfo info = (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(entry + .getValue().getSubscriptionInfo().newInput())); + subscriptions.add(info); - } } }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } SubscriptionInfo[] rc = new SubscriptionInfo[subscriptions.size()]; @@ -1120,22 +1060,19 @@ public void execute(Transaction tx) throws IOException { @Override public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - indexLock.writeLock().lock(); + indexLock.lock(); try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public SubscriptionInfo execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); - if (command == null) { - return null; - } - return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command - .getSubscriptionInfo().newInput())); + return pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + KahaSubscriptionCommand command = sd.subscriptions.get(tx, subscriptionKey); + if (command == null) { + return null; } + return (SubscriptionInfo) wireFormat.unmarshal(new DataInputStream(command + .getSubscriptionInfo().newInput())); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -1147,23 +1084,20 @@ public int getMessageCount(String clientId, String subscriptionName) throws IOEx return (int)this.messageStoreSubStats.getMessageCount(subscriptionKey).getCount(); } else { - indexLock.writeLock().lock(); + indexLock.lock(); try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Integer execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - if (cursorPos == null) { - // The subscription might not exist. - return 0; - } - - return (int) getStoredMessageCount(tx, sd, subscriptionKey); + return pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + if (cursorPos == null) { + // The subscription might not exist. 
+ return 0; } + + return (int) getStoredMessageCount(tx, sd, subscriptionKey); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1175,23 +1109,20 @@ public long getMessageSize(String clientId, String subscriptionName) throws IOEx if (isEnableSubscriptionStatistics()) { return this.messageStoreSubStats.getMessageSize(subscriptionKey).getTotalSize(); } else { - indexLock.writeLock().lock(); + indexLock.lock(); try { - return pageFile.tx().execute(new Transaction.CallableClosure() { - @Override - public Long execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - if (cursorPos == null) { - // The subscription might not exist. - return 0l; - } - - return getStoredMessageSize(tx, sd, subscriptionKey); + return pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + if (cursorPos == null) { + // The subscription might not exist. + return 0L; } + + return getStoredMessageSize(tx, sd, subscriptionKey); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1199,39 +1130,35 @@ public Long execute(Transaction tx) throws IOException { protected void recoverMessageStoreSubMetrics() throws IOException { if (isEnableSubscriptionStatistics()) { final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics(); - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); - List subscriptionKeys = new ArrayList<>(); - for (Iterator> iterator = sd.subscriptions - .iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - - final String subscriptionKey = entry.getKey(); - final LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - if (cursorPos != null) { - //add the subscriptions to a list for recovering pending sizes below - subscriptionKeys.add(subscriptionKey); - //recover just the count here as that is fast - statistics.getMessageCount(subscriptionKey) - .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); - } - } + List subscriptionKeys = new ArrayList<>(); + for (Iterator> iterator = sd.subscriptions + .iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); - //Recover the message sizes for each subscription by iterating only 1 time over the order index - //to speed up recovery - final Map subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys); - subPendingMessageSizes.forEach((k,v) -> { - statistics.getMessageSize(k).addSize(v.get() > 0 ? 
v.get() : 0); - }); + final String subscriptionKey = entry.getKey(); + final LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + if (cursorPos != null) { + //add the subscriptions to a list for recovering pending sizes below + subscriptionKeys.add(subscriptionKey); + //recover just the count here as that is fast + statistics.getMessageCount(subscriptionKey) + .setCount(getStoredMessageCount(tx, sd, subscriptionKey)); + } } + + //Recover the message sizes for each subscription by iterating only 1 time over the order index + //to speed up recovery + final Map subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys); + subPendingMessageSizes.forEach((k,v) -> + statistics.getMessageSize(k).addSize(v.get() > 0 ? v.get() : 0)); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1242,44 +1169,41 @@ public void recoverSubscription(String clientId, String subscriptionName, final final String subscriptionKey = subscriptionKey(clientId, subscriptionName); @SuppressWarnings("unused") final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); - SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey); - //If we have ackPositions tracked then compare the first one as individual acknowledge mode - //may have bumped lastAck even though there are earlier messages to still consume - if (subAckPositions != null && !subAckPositions.isEmpty() - && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) { - //we have messages to ack before lastAckedSequence - sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1); - } else { - subAckPositions = null; - sd.orderIndex.setBatch(tx, cursorPos); + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + LastAck cursorPos = getLastAck(tx, sd, subscriptionKey); + SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey); + //If we have ackPositions tracked then compare the first one as individual acknowledge mode + //may have bumped lastAck even though there are earlier messages to still consume + if (subAckPositions != null && !subAckPositions.isEmpty() + && subAckPositions.getHead().getFirst() < cursorPos.lastAckedSequence) { + //we have messages to ack before lastAckedSequence + sd.orderIndex.setBatch(subAckPositions.getHead().getFirst() - 1); + } else { + subAckPositions = null; + sd.orderIndex.setBatch(cursorPos); + } + recoverRolledBackAcks(subscriptionKey, sd, tx, Integer.MAX_VALUE, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; } - recoverRolledBackAcks(subscriptionKey, sd, tx, Integer.MAX_VALUE, listener); - Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); - for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator - .hasNext();) { - Entry entry = iterator.next(); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } - //If subAckPositions is set then verify the sequence set contains the message still 
- //and if it doesn't skip it - if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) { - continue; - } - listener.recoverMessage(loadMessage(entry.getValue().location)); + //If subAckPositions is set then verify the sequence set contains the message still + //and if it doesn't skip it + if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) { + continue; } - sd.orderIndex.resetCursorPosition(); + listener.recoverMessage(loadMessage(entry.getValue().location)); } + sd.orderIndex.resetCursorPosition(); }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -1289,73 +1213,70 @@ public void recoverNextMessages(String clientId, String subscriptionName, final final String subscriptionKey = subscriptionKey(clientId, subscriptionName); @SuppressWarnings("unused") final SubscriptionInfo info = lookupSubscription(clientId, subscriptionName); - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws Exception { - StoredDestination sd = getStoredDestination(dest, tx); - sd.orderIndex.resetCursorPosition(); - MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); - SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey);; - if (moc == null) { - LastAck pos = getLastAck(tx, sd, subscriptionKey); - if (pos == null) { - // sub deleted - return; - } - //If we have ackPositions tracked then compare the first one as individual acknowledge mode - //may have bumped lastAck even though there are earlier messages to still consume - if (subAckPositions != null && !subAckPositions.isEmpty() - && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) { - //we have messages to ack before lastAckedSequence - sd.orderIndex.setBatch(tx, subAckPositions.getHead().getFirst() - 1); - } else { - subAckPositions = null; - sd.orderIndex.setBatch(tx, pos); - } - moc = sd.orderIndex.cursor; + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + sd.orderIndex.resetCursorPosition(); + MessageOrderCursor moc = sd.subscriptionCursors.get(subscriptionKey); + SequenceSet subAckPositions = getSequenceSet(tx, sd, subscriptionKey); + if (moc == null) { + LastAck pos = getLastAck(tx, sd, subscriptionKey); + if (pos == null) { + // sub deleted + return; + } + //If we have ackPositions tracked then compare the first one as individual acknowledge mode + //may have bumped lastAck even though there are earlier messages to still consume + if (subAckPositions != null && !subAckPositions.isEmpty() + && subAckPositions.getHead().getFirst() < pos.lastAckedSequence) { + //we have messages to ack before lastAckedSequence + sd.orderIndex.setBatch(subAckPositions.getHead().getFirst() - 1); } else { - sd.orderIndex.cursor.sync(moc); + subAckPositions = null; + sd.orderIndex.setBatch(pos); } + moc = sd.orderIndex.cursor; + } else { + sd.orderIndex.cursor.sync(moc); + } - Entry entry = null; - int counter = recoverRolledBackAcks(subscriptionKey, sd, tx, maxReturned, listener); - Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); - for (Iterator> iterator = sd.orderIndex.iterator(tx, moc); iterator - .hasNext();) { - entry = iterator.next(); - if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { - continue; - } - //If subAckPositions is set then verify the sequence set contains the message still - //and if it doesn't skip it - if (subAckPositions != null && 
!subAckPositions.contains(entry.getKey())) { - continue; - } - if (listener.recoverMessage(loadMessage(entry.getValue().location))) { - counter++; - } - if (counter >= maxReturned || listener.hasSpace() == false) { - break; - } + Entry entry = null; + int counter = recoverRolledBackAcks(subscriptionKey, sd, tx, maxReturned, listener); + Set ackedAndPrepared = ackedAndPreparedMap.get(subscriptionKey); + for (Iterator> iterator = sd.orderIndex.iterator(tx, moc); iterator + .hasNext();) { + entry = iterator.next(); + if (ackedAndPrepared != null && ackedAndPrepared.contains(entry.getValue().messageId)) { + continue; } - sd.orderIndex.stoppedIterating(); - if (entry != null) { - MessageOrderCursor copy = sd.orderIndex.cursor.copy(); - sd.subscriptionCursors.put(subscriptionKey, copy); + //If subAckPositions is set then verify the sequence set contains the message still + //and if it doesn't skip it + if (subAckPositions != null && !subAckPositions.contains(entry.getKey())) { + continue; + } + if (listener.recoverMessage(loadMessage(entry.getValue().location))) { + counter++; + } + if (counter >= maxReturned || !listener.hasSpace()) { + break; } } + sd.orderIndex.stoppedIterating(); + if (entry != null) { + MessageOrderCursor copy = sd.orderIndex.cursor.copy(); + sd.subscriptionCursors.put(subscriptionKey, copy); + } }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @Override public Map> recoverExpired(Set subscriptions, int maxBrowse, MessageRecoveryListener listener) throws Exception { - indexLock.writeLock().lock(); + indexLock.lock(); try { return pageFile.tx().execute(tx -> { StoredDestination sd = getStoredDestination(dest, tx); @@ -1406,7 +1327,7 @@ public Map> recoverExpired(Set su return expired; }); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -1414,17 +1335,14 @@ public Map> recoverExpired(Set su public void resetBatching(String clientId, String subscriptionName) { try { final String subscriptionKey = subscriptionKey(clientId, subscriptionName); - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = getStoredDestination(dest, tx); - sd.subscriptionCursors.remove(subscriptionKey); - } + pageFile.tx().execute(tx -> { + StoredDestination sd = getStoredDestination(dest, tx); + sd.subscriptionCursors.remove(subscriptionKey); }); }finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } catch (IOException e) { throw new RuntimeException(e); @@ -1501,22 +1419,19 @@ public void deleteAllMessages() throws IOException { @Override public Set getDestinations() { try { - final HashSet rc = new HashSet(); - indexLock.writeLock().lock(); + final HashSet rc = new HashSet<>(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - for (Iterator> iterator = metadata.destinations.iterator(tx); iterator - .hasNext();) { - Entry entry = iterator.next(); - //Removing isEmpty topic check - see AMQ-5875 - rc.add(convert(entry.getKey())); - } + pageFile.tx().execute(tx -> { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + //Removing isEmpty topic check - see AMQ-5875 + rc.add(convert(entry.getKey())); } }); }finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } return rc; } catch (IOException e) { @@ -1525,17 
+1440,17 @@ public void execute(Transaction tx) throws IOException { } @Override - public long getLastMessageBrokerSequenceId() throws IOException { + public long getLastMessageBrokerSequenceId() { return 0; } @Override public long getLastProducerSequenceId(ProducerId id) { - indexLock.writeLock().lock(); + indexLock.lock(); try { return metadata.producerSequenceIdTracker.getLastSeqId(id); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @@ -1578,7 +1493,7 @@ public void checkpoint(boolean sync) throws IOException { Message loadMessage(Location location) throws IOException { try { JournalCommand command = load(location); - KahaAddMessageCommand addMessage = null; + KahaAddMessageCommand addMessage; switch (command.type()) { case KAHA_UPDATE_MESSAGE_COMMAND: addMessage = ((KahaUpdateMessageCommand) command).getMessage(); @@ -1592,8 +1507,7 @@ Message loadMessage(Location location) throws IOException { if (!addMessage.hasMessage()) { throw new IOException("Could not load journal record, null message content at location: " + location); } - Message msg = (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); - return msg; + return (Message) wireFormat.unmarshal(new DataInputStream(addMessage.getMessage().newInput())); } catch (Throwable t) { IOException ioe = IOExceptionSupport.create("Unexpected error on journal read at: " + location , t); LOG.error("Failed to load message at: {}", location , ioe); @@ -1606,13 +1520,6 @@ Message loadMessage(Location location) throws IOException { // Internal conversion methods. // ///////////////////////////////////////////////////////////////// - KahaLocation convert(Location location) { - KahaLocation rc = new KahaLocation(); - rc.setLogId(location.getDataFileId()); - rc.setOffset(location.getOffset()); - return rc; - } - KahaDestination convert(ActiveMQDestination dest) { KahaDestination rc = new KahaDestination(); rc.setName(dest.getPhysicalName()); @@ -1644,10 +1551,6 @@ ActiveMQDestination convert(String dest) { return convert(type, name); } - private ActiveMQDestination convert(KahaDestination commandDestination) { - return convert(commandDestination.getType().getNumber(), commandDestination.getName()); - } - private ActiveMQDestination convert(int type, String name) { switch (KahaDestination.DestinationType.valueOf(type)) { case QUEUE: @@ -1701,18 +1604,18 @@ public String toString() { } public interface StoreTask { - public boolean cancel(); + boolean cancel(); - public void aquireLocks(); + void aquireLocks(); - public void releaseLocks(); + void releaseLocks(); } class StoreQueueTask implements Runnable, StoreTask { protected final Message message; protected final ConnectionContext context; protected final KahaDBMessageStore store; - protected final InnerFutureTask future; + final InnerFutureTask future; protected final AtomicBoolean done = new AtomicBoolean(); protected final AtomicBoolean locked = new AtomicBoolean(); @@ -1781,7 +1684,7 @@ protected Message getMessage() { return this.message; } - private class InnerFutureTask extends FutureTask implements ListenableFuture { + private static class InnerFutureTask extends FutureTask implements ListenableFuture { private final AtomicReference listenerRef = new AtomicReference<>(); @@ -1826,7 +1729,7 @@ private void fireListener() { class StoreTopicTask extends StoreQueueTask { private final int subscriptionCount; - private final List subscriptionKeys = new ArrayList(1); + private final List subscriptionKeys = new ArrayList<>(1); private final 
KahaDBTopicMessageStore topicStore; public StoreTopicTask(KahaDBTopicMessageStore store, ConnectionContext context, Message message, int subscriptionCount) { @@ -1880,7 +1783,7 @@ public void run() { // apply any acks we have synchronized (this.subscriptionKeys) { for (String key : this.subscriptionKeys) { - this.topicStore.doAcknowledge(context, key, this.message.getMessageId(), null); + this.topicStore.doAcknowledge(key, this.message.getMessageId(), null); } } @@ -1898,7 +1801,7 @@ public void run() { } } - public class StoreTaskExecutor extends ThreadPoolExecutor { + public static class StoreTaskExecutor extends ThreadPoolExecutor { public StoreTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit timeUnit, BlockingQueue queue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, queue, threadFactory); @@ -1915,7 +1818,7 @@ protected void afterExecute(Runnable runnable, Throwable throwable) { } @Override - public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + public JobSchedulerStore createJobSchedulerStore() throws UnsupportedOperationException { return new JobSchedulerStoreImpl(); } diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 85ff18fc80..c21f982560 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -53,19 +53,16 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; -import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.TransactionId; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.protobuf.Buffer; @@ -140,7 +137,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1; - protected class Metadata { + class Metadata { protected Page page; protected int state; protected BTreeIndex destinations; @@ -191,7 +188,7 @@ public void read(DataInput is) throws IOException { openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION; } - LOG.info("KahaDB is version " + version); + LOG.info("KahaDB is version {}", version); } public void write(DataOutput os) throws IOException { @@ -246,15 +243,15 @@ public void writePayload(Metadata object, DataOutput dataOut) throws IOException public enum PurgeRecoveredXATransactionStrategy { NEVER, COMMIT, - ROLLBACK; + ROLLBACK } protected PageFile pageFile; protected Journal journal; - protected Metadata metadata = new Metadata(); + Metadata metadata = new Metadata(); protected final PersistenceAdapterStatistics 
persistenceAdapterStatistics = new PersistenceAdapterStatistics(); - protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); + MetadataMarshaller metadataMarshaller = new MetadataMarshaller(); protected boolean failIfDatabaseIsLocked; @@ -324,50 +321,44 @@ public void allowIOResumption() { } private void loadPageFile() throws IOException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { final PageFile pageFile = getPageFile(); pageFile.load(); - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - if (pageFile.getPageCount() == 0) { - // First time this is created.. Initialize the metadata - Page page = tx.allocate(); - assert page.getPageId() == 0; - page.set(metadata); - metadata.page = page; - metadata.state = CLOSED_STATE; - metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId()); - - tx.store(metadata.page, metadataMarshaller, true); - } else { - Page page = tx.load(0, metadataMarshaller); - metadata = page.get(); - metadata.page = page; - } - metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); - metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); - metadata.destinations.load(tx); + pageFile.tx().execute(tx -> { + if (pageFile.getPageCount() == 0) { + // First time this is created.. Initialize the metadata + Page page = tx.allocate(); + assert page.getPageId() == 0; + page.set(metadata); + metadata.page = page; + metadata.state = CLOSED_STATE; + metadata.destinations = new BTreeIndex<>(pageFile, tx.allocate().getPageId()); + + tx.store(metadata.page, metadataMarshaller, true); + } else { + Page page = tx.load(0, metadataMarshaller); + metadata = page.get(); + metadata.page = page; } + metadata.destinations.setKeyMarshaller(StringMarshaller.INSTANCE); + metadata.destinations.setValueMarshaller(new StoredDestinationMarshaller()); + metadata.destinations.load(tx); }); // Load up all the destinations since we need to scan all the indexes to figure out which journal files can be deleted. 
// Perhaps we should just keep an index of file storedDestinations.clear(); - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - for (Iterator> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); - storedDestinations.put(entry.getKey(), sd); - - if (checkForCorruptJournalFiles) { - // sanity check the index also - if (!entry.getValue().locationIndex.isEmpty(tx)) { - if (entry.getValue().orderIndex.nextMessageId <= 0) { - throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); - } + pageFile.tx().execute(tx -> { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + StoredDestination sd = loadStoredDestination(tx, entry.getKey(), entry.getValue().subscriptions!=null); + storedDestinations.put(entry.getKey(), sd); + + if (checkForCorruptJournalFiles) { + // sanity check the index also + if (!entry.getValue().locationIndex.isEmpty(tx)) { + if (entry.getValue().orderIndex.nextMessageId <= 0) { + throw new IOException("Detected uninitialized orderIndex nextMessageId with pending messages for " + entry.getKey()); } } } @@ -375,28 +366,25 @@ public void execute(Transaction tx) throws IOException { }); pageFile.flush(); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } private void startCheckpoint() { if (checkpointInterval == 0 && cleanupInterval == 0) { - LOG.info("periodic checkpoint/cleanup disabled, will occur on clean " + (getCleanupOnStop() ? "shutdown/" : "") + "restart"); + LOG.info("periodic checkpoint/cleanup disabled, will occur on clean {}restart", + getCleanupOnStop() ? "shutdown/" : ""); return; } synchronized (schedulerLock) { if (scheduler == null || scheduler.isShutdown()) { - scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread schedulerThread = new Thread(r); - @Override - public Thread newThread(Runnable r) { - Thread schedulerThread = new Thread(r); - - schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); - schedulerThread.setDaemon(true); + schedulerThread.setName("ActiveMQ Journal Checkpoint Worker"); + schedulerThread.setDaemon(true); - return schedulerThread; - } + return schedulerThread; }); // Short intervals for check-point and cleanups @@ -463,7 +451,8 @@ public void open() throws IOException { try { loadPageFile(); } catch (Throwable t) { - LOG.warn("Index corrupted. Recovering the index through journal replay. Cause:" + t); + LOG.warn("Index corrupted. Recovering the index through journal replay. 
Cause:{}", + String.valueOf(t)); if (LOG.isDebugEnabled()) { LOG.debug("Index load failure", t); } @@ -489,7 +478,7 @@ public void open() throws IOException { } public void load() throws IOException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { IOHelper.mkdirs(directory); if (deleteAllMessages) { @@ -506,7 +495,7 @@ public void load() throws IOException { open(); store(new KahaTraceCommand().setMessage("LOADED " + new Date())); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } @@ -536,23 +525,19 @@ public void close() throws IOException, InterruptedException { } public void unload() throws IOException, InterruptedException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { if( pageFile != null && pageFile.isLoaded() ) { metadata.state = CLOSED_STATE; metadata.firstInProgressTransactionLocation = getInProgressTxLocationRange()[0]; if (metadata.page != null) { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - tx.store(metadata.page, metadataMarshaller, true); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + tx.store(metadata.page, metadataMarshaller, true)); } } } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } close(); } @@ -596,11 +581,11 @@ class TranInfo { TransactionId id; Location location; - class opCount { + static class OpCount { int add; int remove; } - HashMap destinationOpCount = new HashMap<>(); + HashMap destinationOpCount = new HashMap<>(); @SuppressWarnings("rawtypes") public void track(Operation operation) { @@ -614,14 +599,10 @@ public void track(Operation operation) { destination = add.getCommand().getDestination(); isAdd = true; } else { - RemoveOperation removeOpperation = (RemoveOperation) operation; - destination = removeOpperation.getCommand().getDestination(); - } - opCount opCount = destinationOpCount.get(destination); - if (opCount == null) { - opCount = new opCount(); - destinationOpCount.put(destination, opCount); + RemoveOperation removeOperation = (RemoveOperation) operation; + destination = removeOperation.getCommand().getDestination(); } + OpCount opCount = destinationOpCount.computeIfAbsent(destination, k -> new OpCount()); if (isAdd) { opCount.add++; } else { @@ -631,12 +612,12 @@ public void track(Operation operation) { @Override public String toString() { - StringBuffer buffer = new StringBuffer(); - buffer.append(location).append(";").append(id).append(";\n"); - for (Entry op : destinationOpCount.entrySet()) { - buffer.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); + StringBuilder builder = new StringBuilder(); + builder.append(location).append(";").append(id).append(";\n"); + for (Entry op : destinationOpCount.entrySet()) { + builder.append(op.getKey()).append('+').append(op.getValue().add).append(',').append('-').append(op.getValue().remove).append(';'); } - return buffer.toString(); + return builder.toString(); } } @@ -696,7 +677,7 @@ public String getPreparedTransaction(TransactionId transactionId) { * @throws IllegalStateException */ private void recover() throws IllegalStateException, IOException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { long start = System.currentTimeMillis(); @@ -707,7 +688,7 @@ private void recover() throws IllegalStateException, IOException { if (recoveryPosition != null) { int redoCounter = 0; int dataFileRotationTracker = 
recoveryPosition.getDataFileId(); - LOG.info("Recovering from the journal @" + recoveryPosition); + LOG.info("Recovering from the journal @{}", recoveryPosition); while (recoveryPosition != null) { try { JournalCommand message = load(recoveryPosition); @@ -716,7 +697,7 @@ private void recover() throws IllegalStateException, IOException { redoCounter++; } catch (IOException failedRecovery) { if (isIgnoreMissingJournalfiles()) { - LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); + LOG.debug("Failed to recover data at position:{}", recoveryPosition, failedRecovery); // track this dud location journal.corruptRecoveryLocation(recoveryPosition); } else { @@ -730,29 +711,23 @@ private void recover() throws IllegalStateException, IOException { journal.cleanup(); } if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) { - LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered .."); + LOG.info("@{}, {} entries recovered ..", recoveryPosition, redoCounter); } } if (LOG.isInfoEnabled()) { long end = System.currentTimeMillis(); - LOG.info("Recovery replayed " + redoCounter + " operations from the journal in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Recovery replayed {} operations from the journal in {} seconds.", redoCounter, (end - start) / 1000.0f); } } // We may have to undo some index updates. - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - recoverIndex(tx); - } - }); + pageFile.tx().execute(this::recoverIndex); // rollback any recovered inflight local transactions, and discard any inflight XA transactions. Set toRollback = new HashSet<>(); Set toDiscard = new HashSet<>(); synchronized (inflightTransactions) { - for (Iterator it = inflightTransactions.keySet().iterator(); it.hasNext(); ) { - TransactionId id = it.next(); + for (TransactionId id : inflightTransactions.keySet()) { if (id.isLocalTransaction()) { toRollback.add(id); } else { @@ -761,20 +736,20 @@ public void execute(Transaction tx) throws IOException { } for (TransactionId tx: toRollback) { if (LOG.isDebugEnabled()) { - LOG.debug("rolling back recovered indoubt local transaction " + tx); + LOG.debug("rolling back recovered indoubt local transaction {}", tx); } store(new KahaRollbackCommand().setTransactionInfo(TransactionIdConversion.convertToLocal(tx)), false, null, null); } for (TransactionId tx: toDiscard) { if (LOG.isDebugEnabled()) { - LOG.debug("discarding recovered in-flight XA transaction " + tx); + LOG.debug("discarding recovered in-flight XA transaction {}", tx); } inflightTransactions.remove(tx); } } synchronized (preparedTransactions) { - Set txIds = new LinkedHashSet(preparedTransactions.keySet()); + Set txIds = new LinkedHashSet<>(preparedTransactions.keySet()); for (TransactionId txId : txIds) { switch (purgeRecoveredXATransactionStrategy){ case NEVER: @@ -793,7 +768,7 @@ public void execute(Transaction tx) throws IOException { } } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } @@ -802,23 +777,6 @@ private KahaTransactionInfo createLocalTransactionInfo(TransactionId tx) { return TransactionIdConversion.convertToLocal(tx); } - private Location minimum(Location x, - Location y) { - Location min = null; - if (x != null) { - min = x; - if (y != null) { - int compare = y.compareTo(x); - if (compare < 0) { - min = y; - } - } - } else { - min = y; - } - return min; - } - private boolean recoverProducerAudit() throws IOException { boolean 
requiresReplay = true; if (metadata.producerSequenceIdTrackerLocation != null) { @@ -870,7 +828,7 @@ protected void recoverIndex(Transaction tx) throws IOException { final ArrayList matches = new ArrayList<>(); // Find all the Locations that are >= than the last Append Location. - sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor(lastAppendLocation) { + sd.locationIndex.visit(tx, new BTreeVisitor.GTEVisitor<>(lastAppendLocation) { @Override protected void matched(Location key, Long value) { matches.add(value); @@ -895,7 +853,8 @@ protected void matched(Location key, Long value) { // these the end user should do sync writes to the journal. if (LOG.isInfoEnabled()) { long end = System.currentTimeMillis(); - LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Rolled back {} messages from the index in {} seconds.", undoCounter, + (end - start) / 1000.0f); } } @@ -908,14 +867,14 @@ protected void matched(Location key, Long value) { final SequenceSet ss = new SequenceSet(); for (StoredDestination sd : storedDestinations.values()) { // Use a visitor to cut down the number of pages that we load - sd.locationIndex.visit(tx, new BTreeVisitor() { - int last=-1; + sd.locationIndex.visit(tx, new BTreeVisitor<>() { + int last = -1; @Override public boolean isInterestedInKeysBetween(Location first, Location second) { - if( first==null ) { + if (first == null) { return !ss.contains(0, second.getDataFileId()); - } else if( second==null ) { + } else if (second == null) { return true; } else { return !ss.contains(first.getDataFileId(), second.getDataFileId()); @@ -926,7 +885,7 @@ public boolean isInterestedInKeysBetween(Location first, Location second) { public void visit(List keys, List values) { for (Location l : keys) { int fileId = l.getDataFileId(); - if( last != fileId ) { + if (last != fileId) { ss.add(fileId); last = fileId; } @@ -942,15 +901,13 @@ public void visit(List keys, List values) { for (Entry> entry : metadata.ackMessageFileMap.entrySet()) { missingJournalFiles.add(entry.getKey()); - for (Integer i : entry.getValue()) { - missingJournalFiles.add(i); - } + missingJournalFiles.addAll(entry.getValue()); } missingJournalFiles.removeAll(journal.getFileMap().keySet()); if (!missingJournalFiles.isEmpty()) { - LOG.warn("Some journal files are missing: " + missingJournalFiles); + LOG.warn("Some journal files are missing: {}", missingJournalFiles); } ArrayList> knownCorruption = new ArrayList<>(); @@ -980,7 +937,7 @@ public void visit(List keys, List values) { for (Entry sdEntry : storedDestinations.entrySet()) { final StoredDestination sd = sdEntry.getValue(); final LinkedHashMap matches = new LinkedHashMap<>(); - sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor(missingPredicates) { + sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<>(missingPredicates) { @Override protected void matched(Location key, Long value) { matches.put(value, key); @@ -998,13 +955,14 @@ protected void matched(Location key, Long value) { MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); sd.locationIndex.remove(tx, keys.location); sd.messageIdIndex.remove(tx, keys.messageId); - LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); + LOG.info("[{}] dropped: {} at corrupt location: {}", sdEntry.getKey(), + keys.messageId, keys.location); undoCounter++; decrementAndSubSizeToStoreStat(tx, sdEntry.getKey(), sdEntry.getValue(), keys.location.getSize()); // TODO: do we need to modify 
the ack positions for the pub sub case? } } else { - LOG.error("[" + sdEntry.getKey() + "] references corrupt locations: " + matches); + LOG.error("[{}] references corrupt locations: {}", sdEntry.getKey(), matches); throw new IOException("Detected missing/corrupt journal files referenced by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected."); } } @@ -1013,12 +971,12 @@ protected void matched(Location key, Long value) { if (!ignoreMissingJournalfiles) { if (!knownCorruption.isEmpty()) { - LOG.error("Detected corrupt journal files. " + knownCorruption); + LOG.error("Detected corrupt journal files. {}", knownCorruption); throw new IOException("Detected corrupt journal files. " + knownCorruption); } if (!missingJournalFiles.isEmpty()) { - LOG.error("Detected missing journal files. " + missingJournalFiles); + LOG.error("Detected missing journal files. {}", missingJournalFiles); throw new IOException("Detected missing journal files. " + missingJournalFiles); } } @@ -1028,40 +986,12 @@ protected void matched(Location key, Long value) { // should do sync writes to the journal. if (LOG.isInfoEnabled()) { long end = System.currentTimeMillis(); - LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Detected missing/corrupt journal files. Dropped {} messages from the index in {} seconds.", + undoCounter, (end - start) / 1000.0f); } } } - private Location nextRecoveryPosition; - private Location lastRecoveryPosition; - - public void incrementalRecover() throws IOException { - this.indexLock.writeLock().lock(); - try { - if( nextRecoveryPosition == null ) { - if( lastRecoveryPosition==null ) { - nextRecoveryPosition = getRecoveryPosition(); - } else { - nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); - } - } - while (nextRecoveryPosition != null) { - lastRecoveryPosition = nextRecoveryPosition; - metadata.lastUpdate = lastRecoveryPosition; - JournalCommand message = load(lastRecoveryPosition); - process(message, lastRecoveryPosition, (IndexAware) null); - nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition); - } - } finally { - this.indexLock.writeLock().unlock(); - } - } - - public Location getLastUpdatePosition() throws IOException { - return metadata.lastUpdate; - } - private Location getRecoveryPosition() throws IOException { if (!this.forceRecoverIndex) { @@ -1093,20 +1023,20 @@ private Location getNextInitializedLocation(Location location) throws IOExceptio protected void checkpointCleanup(final boolean cleanup) throws IOException { long start; - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { start = System.currentTimeMillis(); if( !opened.get() ) { return; } } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } checkpointUpdate(cleanup); long totalTimeMillis = System.currentTimeMillis() - start; if (LOG_SLOW_ACCESS_TIME > 0 && totalTimeMillis > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { - LOG.info("Slow KahaDB access: cleanup took " + totalTimeMillis); + LOG.info("Slow KahaDB access: cleanup took {}", totalTimeMillis); } persistenceAdapterStatistics.addSlowCleanupTime(totalTimeMillis); } @@ -1131,7 +1061,7 @@ public Location store(JournalCommand data, Runnable onJournalStoreComplete) t return store(data, false, null, null, onJournalStoreComplete); } - public Location store(JournalCommand data, boolean sync, IndexAware before,Runnable after) throws IOException { + Location 
store(JournalCommand data, boolean sync, IndexAware before,Runnable after) throws IOException { return store(data, sync, before, after, null); } @@ -1141,7 +1071,7 @@ public Location store(JournalCommand data, boolean sync, IndexAware before,Ru * the JournalMessage is used to update the index just like it would be done * during a recovery process. */ - public Location store(JournalCommand data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { + Location store(JournalCommand data, boolean sync, IndexAware before, Runnable after, Runnable onJournalStoreComplete) throws IOException { try { ByteSequence sequence = toByteSequence(data); Location location; @@ -1162,7 +1092,8 @@ public Location store(JournalCommand data, boolean sync, IndexAware before, R long totalTimeMillis = end - start; if (LOG_SLOW_ACCESS_TIME > 0 && totalTimeMillis > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { - LOG.info("Slow KahaDB access: Journal append took: "+(start2-start)+" ms, Index update took "+(end-start2)+" ms"); + LOG.info("Slow KahaDB access: Journal append took: {} ms, Index update took {} ms", + start2 - start, end - start2); } persistenceAdapterStatistics.addSlowWriteTime(totalTimeMillis); } @@ -1179,7 +1110,7 @@ public Location store(JournalCommand data, boolean sync, IndexAware before, R return location; } catch (IOException ioe) { - LOG.error("KahaDB failed to store to Journal, command of type: " + data.type(), ioe); + LOG.error("KahaDB failed to store to Journal, command of type: {}", data.type(), ioe); brokerService.handleIOException(ioe); throw ioe; } @@ -1198,7 +1129,7 @@ public JournalCommand load(Location location) throws IOException { long totalTimeMillis = System.currentTimeMillis() - start; if( LOG_SLOW_ACCESS_TIME>0 && totalTimeMillis > LOG_SLOW_ACCESS_TIME) { if (LOG.isInfoEnabled()) { - LOG.info("Slow KahaDB access: Journal read took: "+ totalTimeMillis +" ms"); + LOG.info("Slow KahaDB access: Journal read took: {} ms", totalTimeMillis); } persistenceAdapterStatistics.addSlowReadTime(totalTimeMillis); } @@ -1234,7 +1165,7 @@ void process(JournalCommand data, final Location location, final Location inD // just recover producer audit data.visit(new Visitor() { @Override - public void visit(KahaAddMessageCommand command) throws IOException { + public void visit(KahaAddMessageCommand command) { metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId()); } }); @@ -1247,12 +1178,8 @@ private void initMessageStore(JournalCommand data) throws IOException { public void visit(KahaAddMessageCommand command) throws IOException { final KahaDestination destination = command.getDestination(); if (!storedDestinations.containsKey(key(destination))) { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - getStoredDestination(destination, tx); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + getStoredDestination(destination, tx)); } } }); @@ -1277,7 +1204,7 @@ public void visit(KahaRemoveMessageCommand command) throws IOException { } @Override - public void visit(KahaPrepareCommand command) throws IOException { + public void visit(KahaPrepareCommand command) { process(command, location); } @@ -1293,7 +1220,7 @@ public void visit(KahaRollbackCommand command) throws IOException { @Override public void visit(KahaRemoveDestinationCommand command) throws IOException { - process(command, location); + process(command); } @Override @@ -1302,12 +1229,12 @@ public 
void visit(KahaSubscriptionCommand command) throws IOException { } @Override - public void visit(KahaProducerAuditCommand command) throws IOException { + public void visit(KahaProducerAuditCommand command) { processLocation(location); } @Override - public void visit(KahaAckMessageFileMapCommand command) throws IOException { + public void visit(KahaAckMessageFileMapCommand command) { processLocation(location); } @@ -1329,40 +1256,33 @@ public void visit(KahaRewrittenDataFileCommand command) throws IOException { } @SuppressWarnings("rawtypes") - protected void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { + void process(final KahaAddMessageCommand command, final Location location, final IndexAware runWithIndexLock) throws IOException { if (command.hasTransactionInfo()) { List inflightTx = getInflightTx(command.getTransactionInfo()); inflightTx.add(new AddOperation(command, location, runWithIndexLock)); } else { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - long assignedIndex = updateIndex(tx, command, location); - if (runWithIndexLock != null) { - runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); - } + pageFile.tx().execute(tx -> { + long assignedIndex = updateIndex(tx, command, location); + if (runWithIndexLock != null) { + runWithIndexLock.sequenceAssignedWithIndexLocked(assignedIndex); } }); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } } protected void process(final KahaUpdateMessageCommand command, final Location location) throws IOException { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - updateIndex(tx, command, location); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + updateIndex(tx, command, location)); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } @@ -1372,59 +1292,47 @@ protected void process(final KahaRemoveMessageCommand command, final Location lo List inflightTx = getInflightTx(command.getTransactionInfo()); inflightTx.add(new RemoveOperation(command, location)); } else { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - updateIndex(tx, command, location); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + updateIndex(tx, command, location)); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } } - protected void process(final KahaRemoveDestinationCommand command, final Location location) throws IOException { - this.indexLock.writeLock().lock(); + protected void process(final KahaRemoveDestinationCommand command) throws IOException { + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - updateIndex(tx, command, location); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + updateIndex(tx, command)); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } protected void process(final KahaSubscriptionCommand command, final Location location) throws IOException { - 
this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - updateIndex(tx, command, location); - } - }); + pageFile.tx().execute((Transaction.Closure) tx -> + updateIndex(tx, command, location)); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } protected void processLocation(final Location location) { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { metadata.lastUpdate = location; } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } @SuppressWarnings("rawtypes") - protected void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { + void process(KahaCommitCommand command, final Location location, final IndexAware before) throws IOException { TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); List inflightTx; synchronized (inflightTransactions) { @@ -1444,27 +1352,24 @@ protected void process(KahaCommitCommand command, final Location location, final } final List messagingTx = inflightTx; - indexLock.writeLock().lock(); + indexLock.lock(); try { - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - for (Operation op : messagingTx) { - op.execute(tx); - recordAckMessageReferenceLocation(location, op.getLocation()); - } + pageFile.tx().execute(tx -> { + for (Operation op : messagingTx) { + op.execute(tx); + recordAckMessageReferenceLocation(location, op.getLocation()); } }); metadata.lastUpdate = location; } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } @SuppressWarnings("rawtypes") protected void process(KahaPrepareCommand command, Location location) { TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); - List tx = null; + List tx; synchronized (inflightTransactions) { tx = inflightTransactions.remove(key); if (tx != null) { @@ -1472,13 +1377,13 @@ protected void process(KahaPrepareCommand command, Location location) { } } if (tx != null && !tx.isEmpty()) { - indexLock.writeLock().lock(); + indexLock.lock(); try { for (Operation op : tx) { recordAckMessageReferenceLocation(location, op.getLocation()); } } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1486,7 +1391,7 @@ protected void process(KahaPrepareCommand command, Location location) { @SuppressWarnings("rawtypes") protected void process(KahaRollbackCommand command, Location location) throws IOException { TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo()); - List updates = null; + List updates; synchronized (inflightTransactions) { updates = inflightTransactions.remove(key); if (updates == null) { @@ -1494,13 +1399,13 @@ protected void process(KahaRollbackCommand command, Location location) throws I } } if (key.isXATransaction() && updates != null && !updates.isEmpty()) { - indexLock.writeLock().lock(); + indexLock.lock(); try { for (Operation op : updates) { recordAckMessageReferenceLocation(location, op.getLocation()); } } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } } } @@ -1523,8 +1428,7 @@ protected void process(KahaRewrittenDataFileCommand command, Location location) // These methods do the actual index updates. 
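The process(...) hunks above repeatedly swap indexLock.writeLock().lock() for indexLock.lock() while keeping the same try/finally shape. As a reminder of the idiom in isolation (guarded state below is illustrative only), a minimal sketch with a plain ReentrantLock:

    import java.util.concurrent.locks.ReentrantLock;

    public class LockGuardSketch {

        private final ReentrantLock indexLock = new ReentrantLock();
        private long lastUpdate;

        // Acquire before the guarded work, release in finally so an exception
        // in the critical section cannot leave the lock held.
        void recordUpdate(long position) {
            indexLock.lock();
            try {
                lastUpdate = position;
            } finally {
                indexLock.unlock();
            }
        }

        public static void main(String[] args) {
            new LockGuardSketch().recordUpdate(42L);
        }
    }
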
// ///////////////////////////////////////////////////////////////// - protected final ReentrantReadWriteLock indexLock = new ReentrantReadWriteLock(); - private final HashSet journalFilesBeingReplicated = new HashSet<>(); + protected final ReentrantLock indexLock = new ReentrantLock(); long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException { StoredDestination sd = getExistingStoredDestination(command.getDestination(), tx); @@ -1649,10 +1553,11 @@ void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackL recordAckMessageReferenceLocation(ackLocation, keys.location); metadata.lastUpdate = ackLocation; } else if (LOG.isDebugEnabled()) { - LOG.debug("message not found in order index: " + sequenceId + " for: " + command.getMessageId()); + LOG.debug("message not found in order index: {} for: {}", sequenceId, + command.getMessageId()); } } else if (LOG.isDebugEnabled()) { - LOG.debug("message not found in sequence id index: " + command.getMessageId()); + LOG.debug("message not found in sequence id index: {}", command.getMessageId()); } } else { // In the topic case we need remove the message once it's been acked @@ -1676,7 +1581,8 @@ void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackL removeAckLocation(command, tx, sd, subscriptionKey, sequence); metadata.lastUpdate = ackLocation; } else if (LOG.isDebugEnabled()) { - LOG.debug("on ack, no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey()); + LOG.debug("on ack, no message sequence exists for id: {} and sub: {}", + command.getMessageId(), command.getSubscriptionKey()); } } @@ -1698,7 +1604,7 @@ private void recordAckMessageReferenceLocation(Location ackLocation, Location me } } - void updateIndex(Transaction tx, KahaRemoveDestinationCommand command, Location location) throws IOException { + void updateIndex(Transaction tx, KahaRemoveDestinationCommand command) throws IOException { StoredDestination sd = getStoredDestination(command.getDestination(), tx); sd.orderIndex.remove(tx); @@ -1747,7 +1653,7 @@ void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location locat Location existing = sd.subLocations.get(tx, subscriptionKey); if (existing != null && existing.compareTo(location) == 0) { // replay on recovery, ignore - LOG.trace("ignoring journal replay of replay of sub from: " + location); + LOG.trace("ignoring journal replay of replay of sub from: {}", location); return; } @@ -1777,7 +1683,7 @@ void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location locat // remove the stored destination KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand(); removeDestinationCommand.setDestination(command.getDestination()); - updateIndex(tx, removeDestinationCommand, null); + updateIndex(tx, removeDestinationCommand); clearStoreStats(command.getDestination()); } } @@ -1786,19 +1692,15 @@ void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location locat private void checkpointUpdate(final boolean cleanup) throws IOException { checkpointLock.writeLock().lock(); try { - this.indexLock.writeLock().lock(); + this.indexLock.lock(); try { - Set filesToGc = pageFile.tx().execute(new Transaction.CallableClosure, IOException>() { - @Override - public Set execute(Transaction tx) throws IOException { - return checkpointUpdate(tx, cleanup); - } - }); + Set filesToGc = pageFile.tx().execute((Transaction.CallableClosure, IOException>) + tx -> 
checkpointUpdate(tx, cleanup)); pageFile.flush(); // after the index update such that partial removal does not leave dangling references in the index. journal.removeDataFiles(filesToGc); } finally { - this.indexLock.writeLock().unlock(); + this.indexLock.unlock(); } } finally { @@ -1834,17 +1736,13 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio gcCandidateSet.addAll(completeFileSet); if (LOG.isTraceEnabled()) { - LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); + LOG.trace("Last update: {}, full gc candidates set: {}", lastUpdate, gcCandidateSet); } if (lastUpdate != null) { // we won't delete past the last update, ackCompaction journal can be a candidate in error - gcCandidateSet.removeAll(new TreeSet(gcCandidateSet.tailSet(lastUpdate.getDataFileId()))); - } - - // Don't GC files under replication - if( journalFilesBeingReplicated!=null ) { - gcCandidateSet.removeAll(journalFilesBeingReplicated); + gcCandidateSet.removeAll( + new TreeSet<>(gcCandidateSet.tailSet(lastUpdate.getDataFileId()))); } if (metadata.producerSequenceIdTrackerLocation != null) { @@ -1853,12 +1751,12 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio // rewrite so we don't prevent gc metadata.producerSequenceIdTracker.setModified(true); if (LOG.isTraceEnabled()) { - LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation); + LOG.trace("rewriting producerSequenceIdTracker:{}", metadata.producerSequenceIdTrackerLocation); } } gcCandidateSet.remove(dataFileId); if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet); + LOG.trace("gc candidates after producerSequenceIdTrackerLocation:{}, {}", metadata.producerSequenceIdTrackerLocation, gcCandidateSet); } } @@ -1866,7 +1764,7 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId(); gcCandidateSet.remove(dataFileId); if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet); + LOG.trace("gc candidates after ackMessageFileMapLocation:{}, {}", metadata.ackMessageFileMapLocation, gcCandidateSet); } } @@ -1877,7 +1775,7 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio } } if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet); + LOG.trace("gc candidates after in progress tx range:{}, {}", Arrays.asList(inProgressTxRange), gcCandidateSet); } // Go through all the destinations to see if any of them can remove GC candidates. 
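One detail in the gc-candidate hunk above: gcCandidateSet.tailSet(...) returns a live view backed by the original set, and it is copied into a fresh TreeSet before removeAll, presumably so the removal does not iterate a view of the very set being mutated. A small sketch of that idiom with made-up contents:

    import java.util.SortedSet;
    import java.util.TreeSet;

    public class TailSetCopySketch {
        public static void main(String[] args) {
            SortedSet<Integer> gcCandidates = new TreeSet<>();
            for (int i = 1; i <= 10; i++) {
                gcCandidates.add(i);
            }

            int lastUpdateFileId = 7;

            // tailSet(7) is a live view of [7..10]; copying it first means
            // removeAll works against a snapshot rather than a view of the
            // set it is removing from.
            gcCandidates.removeAll(new TreeSet<>(gcCandidates.tailSet(lastUpdateFileId)));

            System.out.println(gcCandidates); // [1, 2, 3, 4, 5, 6]
        }
    }
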
@@ -1887,28 +1785,32 @@ Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOExceptio } // Use a visitor to cut down the number of pages that we load - entry.getValue().locationIndex.visit(tx, new BTreeVisitor() { - int last=-1; + entry.getValue().locationIndex.visit(tx, new BTreeVisitor<>() { + int last = -1; + @Override public boolean isInterestedInKeysBetween(Location first, Location second) { - if( first==null ) { - SortedSet subset = gcCandidateSet.headSet(second.getDataFileId()+1); - if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { + if (first == null) { + SortedSet subset = gcCandidateSet.headSet( + second.getDataFileId() + 1); + if (!subset.isEmpty() && subset.last() == second.getDataFileId()) { subset.remove(second.getDataFileId()); } return !subset.isEmpty(); - } else if( second==null ) { - SortedSet subset = gcCandidateSet.tailSet(first.getDataFileId()); - if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { + } else if (second == null) { + SortedSet subset = gcCandidateSet.tailSet( + first.getDataFileId()); + if (!subset.isEmpty() && subset.first() == first.getDataFileId()) { subset.remove(first.getDataFileId()); } return !subset.isEmpty(); } else { - SortedSet subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1); - if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) { + SortedSet subset = gcCandidateSet.subSet(first.getDataFileId(), + second.getDataFileId() + 1); + if (!subset.isEmpty() && subset.first() == first.getDataFileId()) { subset.remove(first.getDataFileId()); } - if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) { + if (!subset.isEmpty() && subset.last() == second.getDataFileId()) { subset.remove(second.getDataFileId()); } return !subset.isEmpty(); @@ -1919,7 +1821,7 @@ public boolean isInterestedInKeysBetween(Location first, Location second) { public void visit(List keys, List values) { for (Location l : keys) { int fileId = l.getDataFileId(); - if( last != fileId ) { + if (last != fileId) { gcCandidateSet.remove(fileId); last = fileId; } @@ -1974,14 +1876,14 @@ public void visit(List keys, List values) { } if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet); + LOG.trace("gc candidates after dest:{}, {}", entry.getKey(), gcCandidateSet); } } // check we are not deleting file with ack for in-use journal files if (LOG.isTraceEnabled()) { - LOG.trace("gc candidates: " + gcCandidateSet); - LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap); + LOG.trace("gc candidates: {}", gcCandidateSet); + LOG.trace("ackMessageFileMap: {}", metadata.ackMessageFileMap); } boolean ackMessageFileMapMod = false; @@ -2002,8 +1904,8 @@ public void visit(List keys, List values) { metadata.ackMessageFileMapDirtyFlag.lazySet(true); } else { if (LOG.isTraceEnabled()) { - LOG.trace("not removing data file: " + candidate - + " as contained ack(s) refer to referenced file: " + referencedFileIds); + LOG.trace("not removing data file: {} as contained ack(s) refer to referenced file: {}", + candidate, referencedFileIds); } } } @@ -2088,7 +1990,7 @@ public void run() { checkpointLock.readLock().lock(); try { // Lock index to capture the ackMessageFileMap data - indexLock.writeLock().lock(); + indexLock.lock(); try { // Map keys might not be sorted, find the earliest log file to forward acks // from and move only those, future cycles can chip away at more as needed. 
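The BTreeVisitor above trims the gc candidate set with headSet/tailSet/subSet windows, and the "+ 1" on the upper bound matters because headSet and subSet exclude their upper argument. A short standalone sketch of those bounds (values are illustrative):

    import java.util.SortedSet;
    import java.util.TreeSet;

    public class RangeViewSketch {
        public static void main(String[] args) {
            SortedSet<Integer> fileIds = new TreeSet<>();
            fileIds.add(3);
            fileIds.add(5);
            fileIds.add(8);

            // headSet's bound is exclusive, hence the "+ 1" when the second
            // location's data file id should fall inside the window.
            System.out.println(fileIds.headSet(8));     // [3, 5]
            System.out.println(fileIds.headSet(8 + 1)); // [3, 5, 8]

            // subSet(from, to): from inclusive, to exclusive.
            System.out.println(fileIds.subSet(3, 8 + 1)); // [3, 5, 8]
        }
    }
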
@@ -2111,7 +2013,7 @@ public void run() { journalLogsReferenced.addAll(metadata.ackMessageFileMap.get(journalToAdvance)); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } try { @@ -2175,7 +2077,7 @@ private void forwardAllAcks(Integer journalToRead, Set journalLogsRefer Map> updatedAckLocations = new HashMap<>(); - try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile);) { + try (TargetedDataFileAppender appender = new TargetedDataFileAppender(journal, forwardsFile)) { KahaRewrittenDataFileCommand compactionMarker = new KahaRewrittenDataFileCommand(); compactionMarker.setSourceDataFileId(journalToRead); compactionMarker.setRewriteType(forwardsFile.getTypeCode()); @@ -2207,14 +2109,13 @@ private void forwardAllAcks(Integer journalToRead, Set journalLogsRefer LOG.trace("ACKS forwarded, updates for ack locations: {}", updatedAckLocations); // Lock index while we update the ackMessageFileMap. - indexLock.writeLock().lock(); + indexLock.lock(); try { // Update the ack map with the new locations of the acks for (Entry> entry : updatedAckLocations.entrySet()) { Set referenceFileIds = metadata.ackMessageFileMap.get(entry.getKey()); if (referenceFileIds == null) { - referenceFileIds = new HashSet<>(); - referenceFileIds.addAll(entry.getValue()); + referenceFileIds = new HashSet<>(entry.getValue()); metadata.ackMessageFileMap.put(entry.getKey(), referenceFileIds); metadata.ackMessageFileMapDirtyFlag.lazySet(true); } else { @@ -2227,7 +2128,7 @@ private void forwardAllAcks(Integer journalToRead, Set journalLogsRefer metadata.ackMessageFileMap.remove(journalToRead); metadata.ackMessageFileMapDirtyFlag.lazySet(true); } finally { - indexLock.writeLock().unlock(); + indexLock.unlock(); } LOG.trace("ACK File Map following updates: {}", metadata.ackMessageFileMap); @@ -2257,11 +2158,7 @@ private Location getNextLocationForAckForward(final Location nextLocation, final return location; } - final Runnable nullCompletionCallback = new Runnable() { - @Override - public void run() { - } - }; + final Runnable nullCompletionCallback = () -> {}; private Location checkpointProducerAudit() throws IOException { if (metadata.producerSequenceIdTracker == null || metadata.producerSequenceIdTracker.modified()) { @@ -2314,15 +2211,11 @@ private Location checkpointSubscriptionCommand(KahaSubscriptionCommand subscript return location; } - public HashSet getJournalFilesBeingReplicated() { - return journalFilesBeingReplicated; - } - // ///////////////////////////////////////////////////////////////// // StoredDestination related implementation methods. 
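Two small idioms from the hunks above, shown in isolation: replacing the anonymous no-op Runnable with () -> {}, and building a HashSet from an existing collection via the copy constructor instead of new plus addAll. Names below are illustrative only:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class SmallIdiomsSketch {

        // No-op completion callback, equivalent to an empty anonymous Runnable.
        static final Runnable NULL_COMPLETION_CALLBACK = () -> {};

        public static void main(String[] args) {
            NULL_COMPLETION_CALLBACK.run(); // does nothing, by design

            List<Integer> ackFileIds = Arrays.asList(3, 5, 8);

            // Copy constructor replaces: new HashSet<>() followed by addAll().
            Set<Integer> referenceFileIds = new HashSet<>(ackFileIds);
            System.out.println(referenceFileIds.containsAll(ackFileIds)); // true
        }
    }
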
// ///////////////////////////////////////////////////////////////// - protected final HashMap storedDestinations = new HashMap<>(); + final HashMap storedDestinations = new HashMap<>(); static class MessageKeys { final String messageId; @@ -2339,7 +2232,7 @@ public String toString() { } } - protected class MessageKeysMarshaller extends VariableMarshaller { + class MessageKeysMarshaller extends VariableMarshaller { final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller(); @Override @@ -2383,7 +2276,7 @@ public String toString() { } } - protected class LastAckMarshaller implements Marshaller { + class LastAckMarshaller implements Marshaller { @Override public void writePayload(LastAck object, DataOutput dataOut) throws IOException { @@ -2417,20 +2310,18 @@ public boolean isDeepCopySupported() { } } - class StoredMessageStoreStatistics { - private PageFile pageFile; + static class StoredMessageStoreStatistics { private Page page; - private long pageId; - private AtomicBoolean loaded = new AtomicBoolean(); - private MessageStoreStatisticsMarshaller messageStoreStatisticsMarshaller = new MessageStoreStatisticsMarshaller(); + private final long pageId; + private final AtomicBoolean loaded = new AtomicBoolean(); + private final MessageStoreStatisticsMarshaller messageStoreStatisticsMarshaller = new MessageStoreStatisticsMarshaller(); - StoredMessageStoreStatistics(PageFile pageFile, long pageId) { + StoredMessageStoreStatistics(long pageId) { this.pageId = pageId; - this.pageFile = pageFile; } - StoredMessageStoreStatistics(PageFile pageFile, Page page) { - this(pageFile, page.getPageId()); + StoredMessageStoreStatistics(Page page) { + this(page.getPageId()); } public long getPageId() { @@ -2464,6 +2355,7 @@ synchronized void put(Transaction tx, MessageStoreStatistics storeStatistics) th tx.store(page, messageStoreStatisticsMarshaller, true); } } + class StoredDestination { MessageOrderIndex orderIndex = new MessageOrderIndex(); @@ -2496,7 +2388,7 @@ public String toString() { } } - protected class MessageStoreStatisticsMarshaller extends VariableMarshaller { + protected static class MessageStoreStatisticsMarshaller extends VariableMarshaller { @Override public void writePayload(final MessageStoreStatistics object, final DataOutput dataOut) throws IOException { @@ -2528,7 +2420,7 @@ public MessageStoreStatistics readPayload(final DataInput dataIn) throws IOExcep } } - protected class StoredDestinationMarshaller extends VariableMarshaller { + class StoredDestinationMarshaller extends VariableMarshaller { final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); @@ -2546,48 +2438,45 @@ public StoredDestination readPayload(final DataInput dataIn) throws IOException value.ackPositions = new ListIndex<>(pageFile, dataIn.readLong()); } else { // upgrade - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - LinkedHashMap temp = new LinkedHashMap<>(); - - if (metadata.version >= 3) { - // migrate - BTreeIndex> oldAckPositions = - new BTreeIndex<>(pageFile, dataIn.readLong()); - oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); - oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); - oldAckPositions.load(tx); - - - // Do the initial build of the data in memory before writing into the store - // based Ack Positions List to avoid a lot of disk thrashing. 
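The StoredMessageStoreStatistics change above follows the usual rule for nested classes: a nested class that never touches its enclosing instance can be declared static, so it no longer carries a hidden reference to the outer object (and, here, no longer needs the PageFile constructor argument). A generic sketch, unrelated to the store types:

    public class NestedClassSketch {

        private final String name = "outer";

        // Inner (non-static) class: each instance holds an implicit reference
        // to the NestedClassSketch that created it.
        class InnerCounter {
            int count;
            String owner() { return name; } // can reach outer state
        }

        // Static nested class: no hidden outer reference, can be created
        // without an enclosing instance.
        static class StandaloneCounter {
            int count;
        }

        public static void main(String[] args) {
            NestedClassSketch outer = new NestedClassSketch();
            InnerCounter inner = outer.new InnerCounter();
            System.out.println(inner.owner()); // outer

            StandaloneCounter counter = new StandaloneCounter();
            counter.count++;
            System.out.println(counter.count); // 1
        }
    }
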
- Iterator>> iterator = oldAckPositions.iterator(tx); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - - for(String subKey : entry.getValue()) { - SequenceSet pendingAcks = temp.get(subKey); - if (pendingAcks == null) { - pendingAcks = new SequenceSet(); - temp.put(subKey, pendingAcks); - } - - pendingAcks.add(entry.getKey()); + pageFile.tx().execute(tx -> { + LinkedHashMap temp = new LinkedHashMap<>(); + + if (metadata.version >= 3) { + // migrate + BTreeIndex> oldAckPositions = + new BTreeIndex<>(pageFile, dataIn.readLong()); + oldAckPositions.setKeyMarshaller(LongMarshaller.INSTANCE); + oldAckPositions.setValueMarshaller(HashSetStringMarshaller.INSTANCE); + oldAckPositions.load(tx); + + + // Do the initial build of the data in memory before writing into the store + // based Ack Positions List to avoid a lot of disk thrashing. + Iterator>> iterator = oldAckPositions.iterator(tx); + while (iterator.hasNext()) { + Entry> entry = iterator.next(); + + for(String subKey : entry.getValue()) { + SequenceSet pendingAcks = temp.get(subKey); + if (pendingAcks == null) { + pendingAcks = new SequenceSet(); + temp.put(subKey, pendingAcks); } + + pendingAcks.add(entry.getKey()); } } - // Now move the pending messages to ack data into the store backed - // structure. - value.ackPositions = new ListIndex<>(pageFile, tx.allocate()); - value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); - value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); - value.ackPositions.load(tx); - for(String subscriptionKey : temp.keySet()) { - value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); - } - } + // Now move the pending messages to ack data into the store backed + // structure. + value.ackPositions = new ListIndex<>(pageFile, tx.allocate()); + value.ackPositions.setKeyMarshaller(StringMarshaller.INSTANCE); + value.ackPositions.setValueMarshaller(SequenceSet.Marshaller.INSTANCE); + value.ackPositions.load(tx); + for(String subscriptionKey : temp.keySet()) { + value.ackPositions.put(tx, subscriptionKey, temp.get(subscriptionKey)); + } + }); } @@ -2595,14 +2484,11 @@ public void execute(Transaction tx) throws IOException { value.subLocations = new ListIndex<>(pageFile, dataIn.readLong()); } else { // upgrade - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - value.subLocations = new ListIndex<>(pageFile, tx.allocate()); - value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); - value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); - value.subLocations.load(tx); - } + pageFile.tx().execute(tx -> { + value.subLocations = new ListIndex<>(pageFile, tx.allocate()); + value.subLocations.setKeyMarshaller(StringMarshaller.INSTANCE); + value.subLocations.setValueMarshaller(LocationMarshaller.INSTANCE); + value.subLocations.load(tx); }); } } @@ -2612,27 +2498,24 @@ public void execute(Transaction tx) throws IOException { value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); } else { // upgrade - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); - value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); - value.orderIndex.lowPriorityIndex.load(tx); - - value.orderIndex.highPriorityIndex = new 
BTreeIndex<>(pageFile, tx.allocate()); - value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); - value.orderIndex.highPriorityIndex.load(tx); - } + pageFile.tx().execute((Transaction.Closure) tx -> { + value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); + value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); + value.orderIndex.lowPriorityIndex.load(tx); + + value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, tx.allocate()); + value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); + value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller); + value.orderIndex.highPriorityIndex.load(tx); }); } if (metadata.version >= 7) { - value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, dataIn.readLong()); + value.messageStoreStatistics = new StoredMessageStoreStatistics(dataIn.readLong()); } else { pageFile.tx().execute(tx -> { - value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate()); + value.messageStoreStatistics = new StoredMessageStoreStatistics(tx.allocate()); value.messageStoreStatistics.load(tx); }); } @@ -2676,7 +2559,7 @@ public void writePayload(KahaSubscriptionCommand object, DataOutput dataOut) thr } } - protected StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { + StoredDestination getStoredDestination(KahaDestination destination, Transaction tx) throws IOException { String key = key(destination); StoredDestination rc = storedDestinations.get(key); if (rc == null) { @@ -2690,12 +2573,12 @@ protected StoredDestination getStoredDestination(KahaDestination destination, Tr return rc; } - protected MessageStoreStatistics getStoredMessageStoreStatistics(KahaDestination destination, Transaction tx) throws IOException { + MessageStoreStatistics getStoredMessageStoreStatistics(KahaDestination destination, Transaction tx) throws IOException { StoredDestination sd = getStoredDestination(destination, tx); return sd != null && sd.messageStoreStatistics != null ? 
sd.messageStoreStatistics.get(tx) : null; } - protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { + StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { String key = key(destination); StoredDestination rc = storedDestinations.get(key); if (rc == null && metadata.destinations.containsKey(tx, key)) { @@ -2728,7 +2611,7 @@ private StoredDestination loadStoredDestination(Transaction tx, String key, bool rc.subLocations = new ListIndex<>(pageFile, tx.allocate()); } - rc.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate()); + rc.messageStoreStatistics = new StoredMessageStoreStatistics(tx.allocate()); metadata.destinations.put(tx, key, rc); } @@ -2792,7 +2675,8 @@ private StoredDestination loadStoredDestination(Transaction tx, String key, bool for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { Entry entry = iterator.next(); for (Iterator> orderIterator = - rc.orderIndex.iterator(tx, new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { + rc.orderIndex.iterator(tx, + new MessageOrderCursor(entry.getValue().lastAckedSequence)); orderIterator.hasNext(); ) { Long sequence = orderIterator.next().getKey(); addAckLocation(tx, rc, sequence, entry.getKey()); } @@ -2866,7 +2750,7 @@ protected void incrementAndAddSizeToStoreStat(Transaction tx, KahaDestination ka incrementAndAddSizeToStoreStat(tx, key(kahaDestination), sd, size); } - protected void incrementAndAddSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { + void incrementAndAddSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); if (storeStats != null) { incrementAndAddSizeToStoreStat(size, storeStats); @@ -2894,7 +2778,7 @@ protected void decrementAndSubSizeToStoreStat(Transaction tx, KahaDestination ka decrementAndSubSizeToStoreStat(tx, key(kahaDestination), sd,size); } - protected void decrementAndSubSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { + void decrementAndSubSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); if (storeStats != null) { decrementAndSubSizeToStoreStat(size, storeStats); @@ -2989,25 +2873,6 @@ protected MessageStoreSubscriptionStatistics getSubStats(String kahaDestKey) { return subStats; } - /** - * Determine whether this Destination matches the DestinationType - * - * @param destination - * @param type - * @return - */ - protected boolean matchType(Destination destination, - KahaDestination.DestinationType type) { - if (destination instanceof Topic - && type.equals(KahaDestination.DestinationType.TOPIC)) { - return true; - } else if (destination instanceof Queue - && type.equals(KahaDestination.DestinationType.QUEUE)) { - return true; - } - return false; - } - class LocationSizeMarshaller implements Marshaller { public LocationSizeMarshaller() { @@ -3191,20 +3056,19 @@ private void removeAckLocation(KahaRemoveMessageCommand command, } } - public LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + LastAck getLastAck(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { return 
sd.subscriptionAcks.get(tx, subscriptionKey); } - protected SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + SequenceSet getSequenceSet(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { if (sd.ackPositions != null) { - final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); - return messageSequences; + return sd.ackPositions.get(tx, subscriptionKey); } return null; } - protected long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + long getStoredMessageCount(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { if (sd.ackPositions != null) { SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); if (messageSequences != null) { @@ -3227,7 +3091,7 @@ protected long getStoredMessageCount(Transaction tx, StoredDestination sd, Strin * @return * @throws IOException */ - protected Map getStoredMessageSize(Transaction tx, StoredDestination sd, List subscriptionKeys) throws IOException { + Map getStoredMessageSize(Transaction tx, StoredDestination sd, List subscriptionKeys) throws IOException { final Map subPendingMessageSizes = new HashMap<>(); final Map messageSequencesMap = new HashMap<>(); @@ -3272,7 +3136,7 @@ protected Map getStoredMessageSize(Transaction tx, StoredDes return subPendingMessageSizes; } - protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { long locationSize = 0; if (sd.ackPositions != null) { @@ -3313,18 +3177,15 @@ protected String key(KahaDestination destination) { @SuppressWarnings("rawtypes") private final LinkedHashMap> inflightTransactions = new LinkedHashMap<>(); @SuppressWarnings("rawtypes") - protected final LinkedHashMap> preparedTransactions = new LinkedHashMap<>(); + final LinkedHashMap> preparedTransactions = new LinkedHashMap<>(); @SuppressWarnings("rawtypes") private List getInflightTx(KahaTransactionInfo info) { TransactionId key = TransactionIdConversion.convert(info); List tx; synchronized (inflightTransactions) { - tx = inflightTransactions.get(key); - if (tx == null) { - tx = Collections.synchronizedList(new ArrayList()); - inflightTransactions.put(key, tx); - } + tx = inflightTransactions.computeIfAbsent(key, + k -> Collections.synchronizedList(new ArrayList<>())); } return tx; } @@ -3334,7 +3195,7 @@ private TransactionId key(KahaTransactionInfo transactionInfo) { return TransactionIdConversion.convert(transactionInfo); } - abstract class Operation > { + abstract static class Operation > { final T command; final Location location; @@ -3577,7 +3438,7 @@ public Journal getJournal() throws IOException { return journal; } - protected Metadata getMetadata() { + Metadata getMetadata() { return metadata; } @@ -3723,7 +3584,7 @@ public PersistenceAdapterStatistics getPersistenceAdapterStatistics() { // Internal conversion methods. 
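The computeIfAbsent rewrite in getInflightTx above (the same change applied earlier to destinationOpCount) collapses the get/null-check/put sequence into one call; the mapping function runs only when the key is absent. A standalone sketch with made-up keys and values:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ComputeIfAbsentSketch {
        public static void main(String[] args) {
            Map<String, List<String>> inflight = new HashMap<>();

            // Before: get(), check for null, create, put().
            List<String> ops = inflight.get("tx-1");
            if (ops == null) {
                ops = Collections.synchronizedList(new ArrayList<>());
                inflight.put("tx-1", ops);
            }
            ops.add("add-message");

            // After: one call; the lambda runs only when "tx-2" has no entry yet.
            inflight.computeIfAbsent("tx-2",
                    k -> Collections.synchronizedList(new ArrayList<>())).add("remove-message");

            System.out.println(inflight.keySet()); // [tx-1, tx-2] (order may vary)
        }
    }
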
// ///////////////////////////////////////////////////////////////// - class MessageOrderCursor{ + static class MessageOrderCursor{ long defaultCursorPosition; long lowPriorityCursorPosition; long highPriorityCursorPosition; @@ -3880,9 +3741,9 @@ void resetCursorPosition() { lastLowKey = null; } - void setBatch(Transaction tx, Long sequence) throws IOException { + void setBatch(Long sequence) { if (sequence != null) { - Long nextPosition = sequence + 1; + long nextPosition = sequence + 1; lastDefaultKey = sequence; cursor.defaultCursorPosition = nextPosition; lastHighKey = sequence; @@ -3892,8 +3753,8 @@ void setBatch(Transaction tx, Long sequence) throws IOException { } } - void setBatch(Transaction tx, LastAck last) throws IOException { - setBatch(tx, last.lastAckedSequence); + void setBatch(LastAck last) { + setBatch(last.lastAckedSequence); if (cursor.defaultCursorPosition == 0 && cursor.highPriorityCursorPosition == 0 && cursor.lowPriorityCursorPosition == 0) { @@ -4145,9 +4006,7 @@ public HashSet readPayload(DataInput dataIn) throws IOException { try { return (HashSet) oin.readObject(); } catch (ClassNotFoundException cfe) { - IOException ioe = new IOException("Failed to read HashSet: " + cfe); - ioe.initCause(cfe); - throw ioe; + throw new IOException("Failed to read HashSet: " + cfe, cfe); } } } @@ -4161,7 +4020,7 @@ public void setIndexDirectory(File indexDirectory) { } interface IndexAware { - public void sequenceAssignedWithIndexLocked(long index); + void sequenceAssignedWithIndexLocked(long index); } public String getPreallocationScope() { diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index a1a98780e2..a9b4c52258 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -442,31 +442,28 @@ private void corruptBatchEndEof(int id) throws Exception{ private void corruptOrderIndex(final int num, final int size) throws Exception { //This is because of AMQ-6097, now that the MessageOrderIndex stores the size in the Location, //we need to corrupt that value as well - final KahaDBStore kahaDbStore = (KahaDBStore) ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore(); - kahaDbStore.indexLock.writeLock().lock(); + final KahaDBStore kahaDbStore = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore(); + kahaDbStore.indexLock.lock(); try { - kahaDbStore.pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert( - (ActiveMQQueue)destination), tx); - int i = 1; - for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { - Entry entry = iterator.next(); - if (i == num) { - //change the size value to the wrong size - sd.orderIndex.get(tx, entry.getKey()); - MessageKeys messageKeys = entry.getValue(); - messageKeys.location.setSize(size); - sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), entry.getKey(), messageKeys); - break; - } - i++; + kahaDbStore.pageFile.tx().execute(tx -> { + StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert( + (ActiveMQQueue)destination), tx); + int i = 1; + for (Iterator> 
iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { + Entry entry = iterator.next(); + if (i == num) { + //change the size value to the wrong size + sd.orderIndex.get(tx, entry.getKey()); + MessageKeys messageKeys = entry.getValue(); + messageKeys.location.setSize(size); + sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), entry.getKey(), messageKeys); + break; } + i++; } }); } finally { - kahaDbStore.indexLock.writeLock().unlock(); + kahaDbStore.indexLock.unlock(); } } diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java index ca2f27d123..8f471c9a07 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java @@ -164,22 +164,19 @@ private void corruptIndex() throws IOException { //blow up the index try { - store.indexLock.writeLock().lock(); - pageFile.tx().execute(new Transaction.Closure() { - @Override - public void execute(Transaction tx) throws IOException { - for (Iterator> iterator = metadata.destinations.iterator(tx); iterator - .hasNext();) { - Entry entry = iterator.next(); - entry.getValue().orderIndex.nextMessageId = -100; - entry.getValue().orderIndex.defaultPriorityIndex.clear(tx); - entry.getValue().orderIndex.lowPriorityIndex.clear(tx); - entry.getValue().orderIndex.highPriorityIndex.clear(tx); - } + store.indexLock.lock(); + pageFile.tx().execute(tx -> { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + entry.getValue().orderIndex.nextMessageId = -100; + entry.getValue().orderIndex.defaultPriorityIndex.clear(tx); + entry.getValue().orderIndex.lowPriorityIndex.clear(tx); + entry.getValue().orderIndex.highPriorityIndex.clear(tx); } }); } finally { - store.indexLock.writeLock().unlock(); + store.indexLock.unlock(); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java index 6c6a3f92fc..a6a470c9ee 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java @@ -62,11 +62,11 @@ public void forceCleanup() throws IOException { public int getFileMapSize() throws IOException { // ensure save memory publishing, use the right lock - indexLock.readLock().lock(); + indexLock.lock(); try { return getJournal().getFileMap().size(); } finally { - indexLock.readLock().unlock(); + indexLock.unlock(); } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java index 404ee07fdd..db3c875f1b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java @@ -65,11 +65,11 @@ public void forceCleanup() throws IOException { public int getFileMapSize() throws IOException { // ensure save memory publishing, use the right lock - indexLock.readLock().lock(); + indexLock.lock(); try { return getJournal().getFileMap().size(); } finally { - indexLock.readLock().unlock(); + indexLock.unlock(); } } } diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java index 4513856153..4560eed4a8 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java @@ -61,7 +61,7 @@ public void testLocationIndexMatchesOrderIndex() throws Exception { //Iterate over the order index and add up the size of the messages to compare //to the location index - kahaDbStore.indexLock.readLock().lock(); + kahaDbStore.indexLock.lock(); try { long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure() { @Override @@ -79,7 +79,7 @@ public Long execute(Transaction tx) throws IOException { assertEquals("Order index size values don't match message size", size, messageStore.getMessageSize()); } finally { - kahaDbStore.indexLock.readLock().unlock(); + kahaDbStore.indexLock.unlock(); } }
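The test changes above, together with the field change from ReentrantReadWriteLock to ReentrantLock, mean former read-lock call sites such as getFileMapSize() now take the single exclusive lock, so readers serialize where they previously could overlap. A minimal sketch of the two APIs side by side, with arbitrary guarded state:

    import java.util.concurrent.locks.ReentrantLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockFlavorsSketch {

        private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
        private final ReentrantLock exclusiveLock = new ReentrantLock();
        private int fileMapSize;

        // Read-write lock: many readers may hold the read lock at once.
        int readWithSharedLock() {
            rwLock.readLock().lock();
            try {
                return fileMapSize;
            } finally {
                rwLock.readLock().unlock();
            }
        }

        // Plain ReentrantLock: every caller, reader or writer, is exclusive.
        int readWithExclusiveLock() {
            exclusiveLock.lock();
            try {
                return fileMapSize;
            } finally {
                exclusiveLock.unlock();
            }
        }

        public static void main(String[] args) {
            LockFlavorsSketch sketch = new LockFlavorsSketch();
            System.out.println(sketch.readWithSharedLock());
            System.out.println(sketch.readWithExclusiveLock());
        }
    }
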