diff options
author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-20 16:42:29 +0000 |
---|---|---|
committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-20 16:42:29 +0000 |
commit | 2bf9fcba59a576861dae424ee5ddfe8fae0a1af9 (patch) | |
tree | d9d354f25cf6a6401d08d227baaf1a9124e03f17 | |
parent | 02291ffa8f75a05c118f8031263cec14d55bf894 (diff) | |
download | qpid-python-2bf9fcba59a576861dae424ee5ddfe8fae0a1af9.tar.gz |
QPID-5907 : [Java Broker] Add ability for broker to startup while persistent queues are still being recovered
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1612118 13f79535-47bb-0310-9956-ffa450edef68
17 files changed, 1473 insertions, 374 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index 4072c483b2..cf187fe1e9 100644 --- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -18,10 +18,9 @@ */ package org.apache.qpid.server.store.berkeleydb; -import static org.apache.qpid.server.store.berkeleydb.BDBUtils.*; +import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG; import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely; import static org.apache.qpid.server.store.berkeleydb.BDBUtils.closeCursorSafely; -import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; @@ -122,7 +121,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData) { - long newMessageId = getNextMessageSequenceNumber(); + long newMessageId = getNextMessageId(); if (metaData.isPersistent()) { @@ -134,7 +133,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - private long getNextMessageSequenceNumber() + public long getNextMessageId() { long newMessageId; try @@ -182,6 +181,73 @@ public abstract class AbstractBDBMessageStore implements MessageStore } @Override + public StoredMessage<?> getMessage(final long messageId) + { + checkMessageStoreOpen(); + return getMessageInternal(messageId, getEnvironmentFacade()); + } + + @Override + public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Cursor cursor = null; + List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>(); + try + { + cursor = getDeliveryDb().openCursor(null, null); + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + + QueueEntryBinding keyBinding = QueueEntryBinding.getInstance(); + keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key); + + if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + QueueEntryKey entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS) + { + entry = keyBinding.entryToObject(key); + if(entry.getQueueId().equals(queue.getId())) + { + entries.add(entry); + } + else + { + break; + } + } + } + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e); + } + finally + { + closeCursorSafely(cursor, getEnvironmentFacade()); + } + + for(QueueEntryKey entry : entries) + { + UUID queueId = entry.getQueueId(); + long messageId = entry.getMessageId(); + if (!handler.handle(queueId, messageId)) + { + break; + } + } + + } + + + + @Override public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -542,6 +608,33 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } + + private StoredBDBMessage<?> getMessageInternal(long messageId, EnvironmentFacade environmentFacade) + { + try + { + DatabaseEntry key = new DatabaseEntry(); + DatabaseEntry value = new DatabaseEntry(); + MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance(); + LongBinding.longToEntry(messageId, key); + if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS) + { + StorableMessageMetaData metaData = valueBinding.entryToObject(value); + StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true); + return message; + } + else + { + return null; + } + + } + catch (DatabaseException e) + { + throw environmentFacade.handleDatabaseException("Cannot visit messages", e); + } + } + /** * Stores a chunk of message data. * diff --git a/java/broker-codegen/src/main/java/org/apache/qpid/server/model/ManagedObjectFactoryConstructor.java b/java/broker-codegen/src/main/java/org/apache/qpid/server/model/ManagedObjectFactoryConstructor.java index 0e5022a66f..21193b3a37 100644 --- a/java/broker-codegen/src/main/java/org/apache/qpid/server/model/ManagedObjectFactoryConstructor.java +++ b/java/broker-codegen/src/main/java/org/apache/qpid/server/model/ManagedObjectFactoryConstructor.java @@ -25,7 +25,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -@Retention(RetentionPolicy.SOURCE) +@Retention(RetentionPolicy.CLASS) @Target(ElementType.CONSTRUCTOR) public @interface ManagedObjectFactoryConstructor { diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index d25833b9f7..03bba43d57 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.protocol.CapacityChecker; @@ -107,4 +108,7 @@ public interface AMQQueue<X extends AMQQueue<X>> void setNotificationListener(QueueNotificationListener listener); + void completeRecovery(); + + void recover(ServerMessage<?> message); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 2bfef7f4b8..cb3b5effd0 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -30,6 +30,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -225,6 +226,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> private int _maximumDistinctGroups; private State _state = State.UNINITIALIZED; + private final AtomicBoolean _recovering = new AtomicBoolean(true); + private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>(); protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost) { @@ -246,6 +249,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { _virtualHost.getDurableConfigurationStore().create(asObjectRecord()); } + + _recovering.set(false); } @Override @@ -842,15 +847,69 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> // ------ Enqueue / Dequeue - public void enqueue(ServerMessage message, Action<? super MessageInstance> action) + public final void enqueue(ServerMessage message, Action<? super MessageInstance> action) { incrementQueueCount(); incrementQueueSize(message); _totalMessagesReceived.incrementAndGet(); + if(_recovering.get()) + { + EnqueueRequest request = new EnqueueRequest(message, action); + _postRecoveryQueue.add(request); + + // deal with the case the recovering status changed just as we added to the post recovery queue + if(!_recovering.get() && _postRecoveryQueue.remove(request)) + { + doEnqueue(message, action); + } + } + else + { + doEnqueue(message, action); + } + + } + + public final void recover(ServerMessage message) + { + incrementQueueCount(); + incrementQueueSize(message); + + _totalMessagesReceived.incrementAndGet(); + + doEnqueue(message, null); + } + + + @Override + public final void completeRecovery() + { + if(_recovering.get()) + { + enqueueFromPostRecoveryQueue(); + + _recovering.set(false); + + // deal with any enqueues that occurred just as we cleared the queue + enqueueFromPostRecoveryQueue(); + } + } + private void enqueueFromPostRecoveryQueue() + { + while(!_postRecoveryQueue.isEmpty()) + { + EnqueueRequest request = _postRecoveryQueue.poll(); + MessageReference<?> messageReference = request.getMessage(); + doEnqueue(messageReference.getMessage(), request.getAction()); + messageReference.release(); + } + } + protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action) + { final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber; final QueueEntry entry = getEntries().add(message); @@ -939,7 +998,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> { action.performAction(entry); } - } private void deliverToConsumer(final QueueConsumer<?> sub, final QueueEntry entry) @@ -2735,4 +2793,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>> } } + + private static class EnqueueRequest + { + private final MessageReference<?> _message; + private final Action<? super MessageInstance> _action; + + public EnqueueRequest(final ServerMessage message, + final Action<? super MessageInstance> action) + { + _message = message.newReference(); + _action = action; + } + + public MessageReference<?> getMessage() + { + return _message; + } + + public Action<? super MessageInstance> getAction() + { + return _action; + } + } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java index 1d61e6fc22..4b6c7a3fe7 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java @@ -53,12 +53,12 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements } @Override - public void enqueue(final ServerMessage message, + protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action) { synchronized (_sortedQueueLock) { - super.enqueue(message, action); + super.doEnqueue(message, action); } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index 974b3ba9ff..24866e4e2e 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -25,7 +25,6 @@ import java.io.IOException; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -78,6 +77,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)"; private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?"; private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id"; + private static final String SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? ORDER BY queue_id, message_id"; + private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, content ) values (?, ?)"; private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME @@ -90,6 +91,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore "SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME; + private static final String SELECT_ONE_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?"; private static final String INSERT_INTO_XIDS = "INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)"; @@ -114,19 +116,56 @@ public abstract class AbstractJDBCMessageStore implements MessageStore protected void setMaximumMessageId() { - visitMessages(new MessageHandler() + + try + { + Connection conn = newAutoCommitConnection(); + try + { + setMaxMessageId(conn, "SELECT max(message_id) FROM " + MESSAGE_CONTENT_TABLE_NAME, 1); + setMaxMessageId(conn, "SELECT max(message_id) FROM " + META_DATA_TABLE_NAME, 1); + setMaxMessageId(conn, "SELECT queue_id, max(message_id) FROM " + QUEUE_ENTRY_TABLE_NAME + " GROUP BY queue_id " , 2); + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new StoreException(e); + } + + } + + private void setMaxMessageId(final Connection conn, final String query, int col) throws SQLException + { + PreparedStatement statement = + conn.prepareStatement(query); + try { - @Override - public boolean handle(StoredMessage<?> storedMessage) + ResultSet rs = statement.executeQuery(); + try { - long id = storedMessage.getMessageNumber(); - if (_messageId.get() < id) + while(rs.next()) { - _messageId.set(id); + long maxMessageId = rs.getLong(col); + if(_messageId.get() < maxMessageId) + { + _messageId.set(maxMessageId); + } } - return true; + } - }); + finally + { + rs.close(); + } + } + finally + { + statement.close(); + } } protected void upgrade(ConfiguredObject<?> parent) throws StoreException @@ -410,14 +449,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore if(metaData.isPersistent()) { - return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData); + return new StoredJDBCMessage(getNextMessageId(), metaData); } else { - return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData); + return new StoredMemoryMessage(getNextMessageId(), metaData); } } + @Override + public long getNextMessageId() + { + return _messageId.incrementAndGet(); + } + private void removeMessage(long messageId) { try @@ -1353,6 +1398,51 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override + public StoredMessage<?> getMessage(long messageId) throws StoreException + { + checkMessageStoreOpen(); + + Connection conn = null; + StoredJDBCMessage message; + try + { + conn = newAutoCommitConnection(); + try (PreparedStatement stmt = conn.prepareStatement(SELECT_ONE_FROM_META_DATA)) + { + stmt.setLong(1, messageId); + try (ResultSet rs = stmt.executeQuery()) + { + if (rs.next()) + { + byte[] dataAsBytes = getBlobAsBytes(rs, 2); + ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); + buf.position(1); + buf = buf.slice(); + MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); + StorableMessageMetaData metaData = type.createMetaData(buf); + message = new StoredJDBCMessage(messageId, metaData, true); + + } + else + { + message = null; + } + } + } + return message; + } + catch (SQLException e) + { + throw new StoreException("Error encountered when visiting messages", e); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); + } + } + + + @Override public void visitMessages(MessageHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -1404,6 +1494,53 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } @Override + public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException + { + checkMessageStoreOpen(); + + Connection conn = null; + try + { + conn = newAutoCommitConnection(); + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE); + try + { + stmt.setString(1, queue.getId().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + while(rs.next()) + { + String id = rs.getString(1); + long messageId = rs.getLong(2); + if (!handler.handle(UUID.fromString(id), messageId)) + { + break; + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + catch(SQLException e) + { + throw new StoreException("Error encountered when visiting message instances", e); + } + finally + { + JdbcUtils.closeConnection(conn, getLogger()); + } + + } + + @Override public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { checkMessageStoreOpen(); diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java index 3df1ffb6bc..f4551aae05 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -194,7 +194,7 @@ public class MemoryMessageStore implements MessageStore @Override public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) { - long id = _messageId.getAndIncrement(); + long id = getNextMessageId(); if(metaData.isPersistent()) { @@ -224,6 +224,12 @@ public class MemoryMessageStore implements MessageStore } @Override + public long getNextMessageId() + { + return _messageId.getAndIncrement(); + } + + @Override public boolean isPersistent() { return false; @@ -275,6 +281,12 @@ public class MemoryMessageStore implements MessageStore } @Override + public StoredMessage<?> getMessage(final long messageId) + { + return _messages.get(messageId); + } + + @Override public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException { synchronized (_transactionLock) @@ -294,6 +306,27 @@ public class MemoryMessageStore implements MessageStore } @Override + public void visitMessageInstances(TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException + { + synchronized (_transactionLock) + { + Set<Long> ids = _messageInstances.get(queue.getId()); + if(ids != null) + { + for (long id : ids) + { + if (!handler.handle(queue.getId(), id)) + { + return; + } + + } + } + } + } + + + @Override public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException { synchronized (_transactionLock) diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java index 642fcbdc54..a4eaf48353 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java @@ -32,6 +32,8 @@ import org.apache.qpid.server.store.handler.MessageInstanceHandler; */ public interface MessageStore { + long getNextMessageId(); + String getStoreLocation(); void addEventListener(EventListener eventListener, Event... events); @@ -56,10 +58,12 @@ public interface MessageStore void visitMessages(MessageHandler handler) throws StoreException; void visitMessageInstances(MessageInstanceHandler handler) throws StoreException; + void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException; void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException; <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData); + StoredMessage<?> getMessage(long messageId); /** * Is this store capable of persisting the data diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 8848dcdd94..15bf13bcab 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -127,6 +127,11 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura } @Override + public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException + { + } + + @Override public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { } @@ -136,4 +141,15 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura { } + @Override + public long getNextMessageId() + { + return 0; + } + + @Override + public StoredMessage<?> getMessage(final long messageId) + { + return null; + } } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java index aa689cc7e0..535ad77ea4 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java @@ -68,7 +68,6 @@ public class DtxBranch ROLLBACK_ONLY } - public DtxBranch(Xid xid, MessageStore store, VirtualHostImpl vhost) { _xid = xid; diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index 1765aebcca..1cd14dc025 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -88,6 +88,8 @@ import org.apache.qpid.server.util.MapValueConverter; public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X> implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener { + private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery"; + public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ"; public static final String DLQ_ROUTING_KEY = "dlq"; public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change @@ -149,10 +151,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte @ManagedAttributeField private int _housekeepingThreadCount; + + private boolean _useAsyncRecoverer; + private MessageDestination _defaultDestination; private MessageStore _messageStore; + public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode) { super(parentsMap(virtualHostNode), attributes); @@ -1332,7 +1338,20 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte createDefaultExchanges(); } - new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover(); + MessageStoreRecoverer messageStoreRecoverer; + + + + if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY)) + { + messageStoreRecoverer = new AsynchronousMessageStoreRecoverer(); + } + else + { + messageStoreRecoverer = new SynchronousMessageStoreRecoverer(); + } + messageStoreRecoverer.recover(this); + State finalState = State.ERRORED; try diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java new file mode 100644 index 0000000000..9742404225 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java @@ -0,0 +1,427 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.txn.DtxBranch; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.Xid; +import org.apache.qpid.transport.util.Functions; + +public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer +{ + private static final Logger _logger = Logger.getLogger(AsynchronousMessageStoreRecoverer.class); + + @Override + public void recover(final VirtualHostImpl virtualHost) + { + AsynchronousRecoverer asynchronousRecoverer = new AsynchronousRecoverer(virtualHost); + + asynchronousRecoverer.recover(); + } + + private static class AsynchronousRecoverer + { + private final VirtualHostImpl<?, ?, ?> _virtualHost; + private final EventLogger _eventLogger; + private final MessageStore _store; + private final MessageStoreLogSubject _logSubject; + private final long _maxMessageId; + private final Set<AMQQueue<?>> _recoveringQueues = new CopyOnWriteArraySet<>(); + private final AtomicBoolean _recoveryComplete = new AtomicBoolean(); + private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>(); + + + private AsynchronousRecoverer(final VirtualHostImpl<?, ?, ?> virtualHost) + { + _virtualHost = virtualHost; + _eventLogger = virtualHost.getEventLogger(); + _store = virtualHost.getMessageStore(); + _logSubject = new MessageStoreLogSubject(virtualHost.getName(), _store.getClass().getSimpleName()); + + _maxMessageId = _store.getNextMessageId(); + _recoveringQueues.addAll(_virtualHost.getQueues()); + + } + + public void recover() + { + getStore().visitDistributedTransactions(new DistributedTransactionVisitor()); + + for(AMQQueue<?> queue : _recoveringQueues) + { + Thread queueThread = new Thread(new QueueRecoveringTask(queue), "Queue Recoverer : " + queue.getName() + " (vh: " + getVirtualHost().getName() + ")"); + queueThread.start(); + } + } + + public VirtualHostImpl<?, ?, ?> getVirtualHost() + { + return _virtualHost; + } + + public EventLogger getEventLogger() + { + return _eventLogger; + } + + public MessageStore getStore() + { + return _store; + } + + public MessageStoreLogSubject getLogSubject() + { + return _logSubject; + } + + private boolean isRecovering(AMQQueue<?> queue) + { + return _recoveringQueues.contains(queue); + } + + private void recoverQueue(AMQQueue<?> queue) + { + MessageInstanceVisitor handler = new MessageInstanceVisitor(queue); + _store.visitMessageInstances(queue, handler); + + getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERED(handler.getRecoveredCount(), queue.getName())); + getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true)); + queue.completeRecovery(); + + _recoveringQueues.remove(queue); + if (_recoveringQueues.isEmpty() && _recoveryComplete.compareAndSet(false, true)) + { + completeRecovery(); + } + } + + private synchronized void completeRecovery() + { + // at this point nothing should be writing to the map of recovered messages + for (MessageReference<? extends ServerMessage<?>> entry : _recoveredMessages.values()) + { + entry.release(); + } + _recoveredMessages.clear(); + } + + private synchronized ServerMessage<?> getRecoveredMessage(final long messageId) + { + MessageReference<? extends ServerMessage<?>> ref = _recoveredMessages.get(messageId); + if (ref == null) + { + StoredMessage<?> message = _store.getMessage(messageId); + if(message != null) + { + StorableMessageMetaData metaData = message.getMetaData(); + + @SuppressWarnings("rawtypes") + MessageMetaDataType type = metaData.getType(); + + @SuppressWarnings("unchecked") + ServerMessage<?> serverMessage = type.createMessage(message); + + ref = serverMessage.newReference(); + _recoveredMessages.put(messageId, ref); + } + } + return ref == null ? null : ref.getMessage(); + } + + + private class DistributedTransactionVisitor implements DistributedTransactionHandler + { + + + + @Override + public boolean handle(long format, + byte[] globalId, + byte[] branchId, + Transaction.Record[] enqueues, + Transaction.Record[] dequeues) + { + Xid id = new Xid(format, globalId, branchId); + DtxRegistry dtxRegistry = getVirtualHost().getDtxRegistry(); + DtxBranch branch = dtxRegistry.getBranch(id); + if (branch == null) + { + branch = new DtxBranch(id, getStore(), getVirtualHost()); + dtxRegistry.registerBranch(branch); + } + for (Transaction.Record record : enqueues) + { + final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId()); + if (queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final ServerMessage<?> message = getRecoveredMessage(messageId); + + if (message != null) + { + final MessageReference<?> ref = message.newReference(); + + branch.enqueue(queue, message); + + branch.addPostTransactionAction(new ServerTransaction.Action() + { + + public void postCommit() + { + queue.enqueue(message, null); + ref.release(); + } + + public void onRollback() + { + ref.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + getEventLogger().message(getLogSubject(), + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString( + messageId))); + } + } + else + { + StringBuilder xidString = xidAsString(id); + getEventLogger().message(getLogSubject(), + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource() + .getId() + .toString())); + + } + } + for (Transaction.Record record : dequeues) + { + + final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId()); + + if (queue != null) + { + // For DTX to work correctly the queues which have uncommitted branches with dequeues + // must be synchronously recovered + + if (isRecovering(queue)) + { + recoverQueue(queue); + } + + final long messageId = record.getMessage().getMessageNumber(); + final ServerMessage<?> message = getRecoveredMessage(messageId); + + if (message != null) + { + final QueueEntry entry = queue.getMessageOnTheQueue(messageId); + + entry.acquire(); + + branch.dequeue(queue, message); + + branch.addPostTransactionAction(new ServerTransaction.Action() + { + + public void postCommit() + { + entry.delete(); + } + + public void onRollback() + { + entry.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + getEventLogger().message(getLogSubject(), + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString( + messageId))); + + } + + } + else + { + StringBuilder xidString = xidAsString(id); + getEventLogger().message(getLogSubject(), + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource() + .getId() + .toString())); + } + + } + + + branch.setState(DtxBranch.State.PREPARED); + branch.prePrepareTransaction(); + + return true; + } + + private StringBuilder xidAsString(Xid id) + { + return new StringBuilder("(") + .append(id.getFormat()) + .append(',') + .append(Functions.str(id.getGlobalId())) + .append(',') + .append(Functions.str(id.getBranchId())) + .append(')'); + } + + + } + + private class QueueRecoveringTask implements Runnable + { + private final AMQQueue<?> _queue; + + public QueueRecoveringTask(final AMQQueue<?> queue) + { + _queue = queue; + } + + @Override + public void run() + { + recoverQueue(_queue); + } + } + + private class MessageInstanceVisitor implements MessageInstanceHandler + { + private final AMQQueue<?> _queue; + long _recoveredCount; + + private MessageInstanceVisitor(AMQQueue<?> queue) + { + _queue = queue; + } + + @Override + public boolean handle(final UUID queueId, long messageId) + { + String queueName = _queue.getName(); + + if(messageId < _maxMessageId) + { + ServerMessage<?> message = getRecoveredMessage(messageId); + + if (message != null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); + } + + _queue.recover(message); + _recoveredCount++; + } + else + { + _logger.warn("Message id " + + messageId + + " referenced in log as enqueued in queue " + + queueName + + " is unknown, entry will be discarded"); + Transaction txn = getStore().newTransaction(); + txn.dequeueMessage(_queue, new DummyMessage(messageId)); + txn.commitTranAsync(); + } + return true; + } + else + { + return false; + } + + } + + public long getRecoveredCount() + { + return _recoveredCount; + } + } + } + + private static class DummyMessage implements EnqueueableMessage + { + + private final long _messageId; + + public DummyMessage(long messageId) + { + _messageId = messageId; + } + + public long getMessageNumber() + { + return _messageId; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage getStoredMessage() + { + return null; + } + } + + +} diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java index 662a26bb2e..9f4c7dd319 100644 --- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java @@ -20,338 +20,7 @@ */ package org.apache.qpid.server.virtualhost; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; -import java.util.UUID; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.logging.messages.MessageStoreMessages; -import org.apache.qpid.server.logging.messages.TransactionLogMessages; -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.message.MessageReference; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.store.MessageStore; -import org.apache.qpid.server.store.StorableMessageMetaData; -import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.store.Transaction; -import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.store.Transaction.Record; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; -import org.apache.qpid.server.txn.DtxBranch; -import org.apache.qpid.server.txn.DtxRegistry; -import org.apache.qpid.server.txn.ServerTransaction; -import org.apache.qpid.transport.Xid; -import org.apache.qpid.transport.util.Functions; - -public class MessageStoreRecoverer +public interface MessageStoreRecoverer { - private static final Logger _logger = Logger.getLogger(MessageStoreRecoverer.class); - - private final VirtualHostImpl _virtualHost; - - private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>(); - private final Map<Long, ServerMessage<?>> _recoveredMessages = new HashMap<Long, ServerMessage<?>>(); - private final Map<Long, StoredMessage<?>> _unusedMessages = new HashMap<Long, StoredMessage<?>>(); - private final EventLogger _eventLogger; - - private final MessageStoreLogSubject _logSubject; - private final MessageStore _store; - - - public MessageStoreRecoverer(VirtualHostImpl virtualHost, MessageStoreLogSubject logSubject) - { - super(); - _virtualHost = virtualHost; - _eventLogger = virtualHost.getEventLogger(); - _logSubject = logSubject; - _store = virtualHost.getMessageStore(); - } - - - public void recover() - { - _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START()); - _store.visitMessages(messageVisitor); - - _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false)); - _store.visitMessageInstances(messageAndMessageInstanceRecoverer); - - for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet()) - { - _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); - _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); - } - - _store.visitDistributedTransactions(distributedTransactionRecoverer); - - - - for(StoredMessage<?> m : _unusedMessages.values()) - { - _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); - m.remove(); - } - _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); - - _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size())); - _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); - - - } - - MessageHandler messageVisitor = new MessageHandler() - { - - @Override - public boolean handle(StoredMessage<?> message) - { - StorableMessageMetaData metaData = message.getMetaData(); - - @SuppressWarnings("rawtypes") - MessageMetaDataType type = metaData.getType(); - - @SuppressWarnings("unchecked") - ServerMessage<?> serverMessage = type.createMessage(message); - - _recoveredMessages.put(message.getMessageNumber(), serverMessage); - _unusedMessages.put(message.getMessageNumber(), message); - return true; - } - - }; - - MessageInstanceHandler messageAndMessageInstanceRecoverer = new MessageInstanceHandler() - { - @Override - public boolean handle(final UUID queueId, long messageId) - { - AMQQueue<?> queue = _virtualHost.getQueue(queueId); - if(queue != null) - { - String queueName = queue.getName(); - ServerMessage<?> message = _recoveredMessages.get(messageId); - _unusedMessages.remove(messageId); - - if(message != null) - { - if (_logger.isDebugEnabled()) - { - _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); - } - - Integer count = _queueRecoveries.get(queueName); - if (count == null) - { - count = 0; - } - - queue.enqueue(message,null); - - _queueRecoveries.put(queueName, ++count); - } - else - { - _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); - Transaction txn = _store.newTransaction(); - txn.dequeueMessage(queue, new DummyMessage(messageId)); - txn.commitTranAsync(); - } - } - else - { - _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded"); - Transaction txn = _store.newTransaction(); - TransactionLogResource mockQueue = - new TransactionLogResource() - { - @Override - public String getName() - { - return "<<UNKNOWN>>"; - } - - @Override - public UUID getId() - { - return queueId; - } - - @Override - public boolean isDurable() - { - return false; - } - }; - txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); - txn.commitTranAsync(); - } - return true; - } - }; - - private DistributedTransactionHandler distributedTransactionRecoverer = new DistributedTransactionHandler() - { - - @Override - public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - { - Xid id = new Xid(format, globalId, branchId); - DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); - DtxBranch branch = dtxRegistry.getBranch(id); - if(branch == null) - { - branch = new DtxBranch(id, _store, _virtualHost); - dtxRegistry.registerBranch(branch); - } - for(Transaction.Record record : enqueues) - { - final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId()); - if(queue != null) - { - final long messageId = record.getMessage().getMessageNumber(); - final ServerMessage<?> message = _recoveredMessages.get(messageId); - _unusedMessages.remove(messageId); - - if(message != null) - { - final MessageReference<?> ref = message.newReference(); - - branch.enqueue(queue,message); - - branch.addPostTransactionAction(new ServerTransaction.Action() - { - - public void postCommit() - { - queue.enqueue(message, null); - ref.release(); - } - - public void onRollback() - { - ref.release(); - } - }); - } - else - { - StringBuilder xidString = xidAsString(id); - _eventLogger.message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), - Long.toString(messageId))); - } - } - else - { - StringBuilder xidString = xidAsString(id); - _eventLogger.message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getResource().getId().toString())); - - } - } - for(Transaction.Record record : dequeues) - { - final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId()); - if(queue != null) - { - final long messageId = record.getMessage().getMessageNumber(); - final ServerMessage<?> message = _recoveredMessages.get(messageId); - _unusedMessages.remove(messageId); - - if(message != null) - { - final QueueEntry entry = queue.getMessageOnTheQueue(messageId); - - entry.acquire(); - - branch.dequeue(queue, message); - - branch.addPostTransactionAction(new ServerTransaction.Action() - { - - public void postCommit() - { - entry.delete(); - } - - public void onRollback() - { - entry.release(); - } - }); - } - else - { - StringBuilder xidString = xidAsString(id); - _eventLogger.message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), - Long.toString(messageId))); - - } - - } - else - { - StringBuilder xidString = xidAsString(id); - _eventLogger.message(_logSubject, - TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getResource().getId().toString())); - } - - } - - branch.setState(DtxBranch.State.PREPARED); - branch.prePrepareTransaction(); - return true; - } - - private StringBuilder xidAsString(Xid id) - { - return new StringBuilder("(") - .append(id.getFormat()) - .append(',') - .append(Functions.str(id.getGlobalId())) - .append(',') - .append(Functions.str(id.getBranchId())) - .append(')'); - } - - - }; - - - private static class DummyMessage implements EnqueueableMessage - { - - private final long _messageId; - - public DummyMessage(long messageId) - { - _messageId = messageId; - } - - public long getMessageNumber() - { - return _messageId; - } - - public boolean isPersistent() - { - return true; - } - - public StoredMessage getStoredMessage() - { - return null; - } - } + void recover(VirtualHostImpl virtualHost); } diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java new file mode 100644 index 0000000000..9fc71e23d7 --- /dev/null +++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java @@ -0,0 +1,416 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.log4j.Logger; + +import org.apache.qpid.server.logging.EventLogger; +import org.apache.qpid.server.logging.messages.MessageStoreMessages; +import org.apache.qpid.server.logging.messages.TransactionLogMessages; +import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.QueueEntry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.store.Transaction; +import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.TransactionLogResource; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; +import org.apache.qpid.server.txn.DtxBranch; +import org.apache.qpid.server.txn.DtxRegistry; +import org.apache.qpid.server.txn.ServerTransaction; +import org.apache.qpid.transport.Xid; +import org.apache.qpid.transport.util.Functions; + +public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer +{ + private static final Logger _logger = Logger.getLogger(SynchronousMessageStoreRecoverer.class); + + @Override + public void recover(VirtualHostImpl virtualHost) + { + EventLogger eventLogger = virtualHost.getEventLogger(); + MessageStore store = virtualHost.getMessageStore(); + MessageStoreLogSubject logSubject = new MessageStoreLogSubject(virtualHost.getName(), store.getClass().getSimpleName()); + + Map<String, Integer> queueRecoveries = new TreeMap<>(); + Map<Long, ServerMessage<?>> recoveredMessages = new HashMap<>(); + Map<Long, StoredMessage<?>> unusedMessages = new HashMap<>(); + + + eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_START()); + + store.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages)); + + eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_START(null, false)); + store.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries, + recoveredMessages, unusedMessages)); + for(Map.Entry<String,Integer> entry : queueRecoveries.entrySet()) + { + eventLogger.message(logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey())); + eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true)); + virtualHost.getQueue(entry.getKey()).completeRecovery(); + } + + Collection<AMQQueue> allQueues = virtualHost.getQueues(); + + for(AMQQueue q : allQueues) + { + if(!queueRecoveries.containsKey(q.getName())) + { + q.completeRecovery(); + } + } + + store.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, store, eventLogger, + logSubject, recoveredMessages, unusedMessages)); + + + + for(StoredMessage<?> m : unusedMessages.values()) + { + _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing...."); + m.remove(); + } + eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); + + eventLogger.message(logSubject, + MessageStoreMessages.RECOVERED(recoveredMessages.size() - unusedMessages.size())); + eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_COMPLETE()); + + + } + + private static class MessageVisitor implements MessageHandler + { + + private final Map<Long, ServerMessage<?>> _recoveredMessages; + private final Map<Long, StoredMessage<?>> _unusedMessages; + + public MessageVisitor(final Map<Long, ServerMessage<?>> recoveredMessages, + final Map<Long, StoredMessage<?>> unusedMessages) + { + _recoveredMessages = recoveredMessages; + _unusedMessages = unusedMessages; + } + + @Override + public boolean handle(StoredMessage<?> message) + { + StorableMessageMetaData metaData = message.getMetaData(); + + @SuppressWarnings("rawtypes") + MessageMetaDataType type = metaData.getType(); + + @SuppressWarnings("unchecked") + ServerMessage<?> serverMessage = type.createMessage(message); + + _recoveredMessages.put(message.getMessageNumber(), serverMessage); + _unusedMessages.put(message.getMessageNumber(), message); + return true; + } + + } + + private static class MessageInstanceVisitor implements MessageInstanceHandler + { + private final VirtualHostImpl _virtualHost; + private final MessageStore _store; + + private final Map<String, Integer> _queueRecoveries; + private final Map<Long, ServerMessage<?>> _recoveredMessages; + private final Map<Long, StoredMessage<?>> _unusedMessages; + + private MessageInstanceVisitor(final VirtualHostImpl virtualHost, + final MessageStore store, + final Map<String, Integer> queueRecoveries, + final Map<Long, ServerMessage<?>> recoveredMessages, + final Map<Long, StoredMessage<?>> unusedMessages) + { + _virtualHost = virtualHost; + _store = store; + _queueRecoveries = queueRecoveries; + _recoveredMessages = recoveredMessages; + _unusedMessages = unusedMessages; + } + + @Override + public boolean handle(final UUID queueId, long messageId) + { + AMQQueue<?> queue = _virtualHost.getQueue(queueId); + if(queue != null) + { + String queueName = queue.getName(); + ServerMessage<?> message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + if (_logger.isDebugEnabled()) + { + _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); + } + + Integer count = _queueRecoveries.get(queueName); + if (count == null) + { + count = 0; + } + + queue.recover(message); + + _queueRecoveries.put(queueName, ++count); + } + else + { + _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); + Transaction txn = _store.newTransaction(); + txn.dequeueMessage(queue, new DummyMessage(messageId)); + txn.commitTranAsync(); + } + } + else + { + _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded"); + Transaction txn = _store.newTransaction(); + TransactionLogResource mockQueue = + new TransactionLogResource() + { + @Override + public String getName() + { + return "<<UNKNOWN>>"; + } + + @Override + public UUID getId() + { + return queueId; + } + + @Override + public boolean isDurable() + { + return false; + } + }; + txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); + txn.commitTranAsync(); + } + return true; + } + } + + private static class DistributedTransactionVisitor implements DistributedTransactionHandler + { + + private final VirtualHostImpl _virtualHost; + private final MessageStore _store; + private final EventLogger _eventLogger; + private final MessageStoreLogSubject _logSubject; + + private final Map<Long, ServerMessage<?>> _recoveredMessages; + private final Map<Long, StoredMessage<?>> _unusedMessages; + + private DistributedTransactionVisitor(final VirtualHostImpl virtualHost, + final MessageStore store, + final EventLogger eventLogger, + final MessageStoreLogSubject logSubject, + final Map<Long, ServerMessage<?>> recoveredMessages, + final Map<Long, StoredMessage<?>> unusedMessages) + { + _virtualHost = virtualHost; + _store = store; + _eventLogger = eventLogger; + _logSubject = logSubject; + _recoveredMessages = recoveredMessages; + _unusedMessages = unusedMessages; + } + + @Override + public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + Xid id = new Xid(format, globalId, branchId); + DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry(); + DtxBranch branch = dtxRegistry.getBranch(id); + if(branch == null) + { + branch = new DtxBranch(id, _store, _virtualHost); + dtxRegistry.registerBranch(branch); + } + for(Transaction.Record record : enqueues) + { + final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final ServerMessage<?> message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + final MessageReference<?> ref = message.newReference(); + + branch.enqueue(queue,message); + + branch.addPostTransactionAction(new ServerTransaction.Action() + { + + public void postCommit() + { + queue.enqueue(message, null); + ref.release(); + } + + public void onRollback() + { + ref.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString(messageId))); + } + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource().getId().toString())); + + } + } + for(Transaction.Record record : dequeues) + { + final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId()); + if(queue != null) + { + final long messageId = record.getMessage().getMessageNumber(); + final ServerMessage<?> message = _recoveredMessages.get(messageId); + _unusedMessages.remove(messageId); + + if(message != null) + { + final QueueEntry entry = queue.getMessageOnTheQueue(messageId); + + entry.acquire(); + + branch.dequeue(queue, message); + + branch.addPostTransactionAction(new ServerTransaction.Action() + { + + public void postCommit() + { + entry.delete(); + } + + public void onRollback() + { + entry.release(); + } + }); + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(), + Long.toString(messageId))); + + } + + } + else + { + StringBuilder xidString = xidAsString(id); + _eventLogger.message(_logSubject, + TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), + record.getResource().getId().toString())); + } + + } + + branch.setState(DtxBranch.State.PREPARED); + branch.prePrepareTransaction(); + return true; + } + + private StringBuilder xidAsString(Xid id) + { + return new StringBuilder("(") + .append(id.getFormat()) + .append(',') + .append(Functions.str(id.getGlobalId())) + .append(',') + .append(Functions.str(id.getBranchId())) + .append(')'); + } + + + } + + + private static class DummyMessage implements EnqueueableMessage + { + + private final long _messageId; + + public DummyMessage(long messageId) + { + _messageId = messageId; + } + + public long getMessageNumber() + { + return _messageId; + } + + public boolean isPersistent() + { + return true; + } + + public StoredMessage getStoredMessage() + { + return null; + } + } + +} diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java new file mode 100644 index 0000000000..3e15ada10c --- /dev/null +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java @@ -0,0 +1,195 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageReference; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.util.Action; +import org.apache.qpid.server.util.BrokerTestHelper; +import org.apache.qpid.server.virtualhost.VirtualHostImpl; +import org.apache.qpid.test.utils.QpidTestCase; + +public class QueueMessageRecoveryTest extends QpidTestCase +{ + VirtualHostImpl<?, ?, ?> _vhost; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _vhost = BrokerTestHelper.createVirtualHost("host"); + } + + public void testSimpleRecovery() throws Exception + { + // Simple deterministic test to prove that interleaved recovery and new publishing onto a queue is correctly + // handled + final List<ServerMessage<?>> messageList = new ArrayList<>(); + TestQueue queue = new TestQueue(Collections.singletonMap(Queue.NAME, (Object)"test"), _vhost, messageList); + + queue.open(); + + queue.recover(createMockMessage(0)); + queue.enqueue(createMockMessage(4), null); + queue.enqueue(createMockMessage(5), null); + queue.recover(createMockMessage(1)); + queue.recover(createMockMessage(2)); + queue.enqueue(createMockMessage(6), null); + queue.recover(createMockMessage(3)); + + assertEquals(4, messageList.size()); + for(int i = 0; i < 4; i++) + { + assertEquals((long)i, messageList.get(i).getMessageNumber()); + } + + queue.completeRecovery(); + + queue.enqueue(createMockMessage(7), null); + + assertEquals(8, messageList.size()); + + for(int i = 0; i < 8; i++) + { + assertEquals((long)i, messageList.get(i).getMessageNumber()); + } + + } + + + public void testMultiThreadedRecovery() throws Exception + { + // Non deterministic test with separate publishing and recovery threads + + performMultiThreadedRecovery(5000); + } + + public void testRepeatedMultiThreadedRecovery() throws Exception + { + // Repeated non deterministic test with separate publishing and recovery threads(but with fewer messages being + // published/recovered + + for(int i = 0; i < 50; i++) + { + performMultiThreadedRecovery(10); + } + } + + private void performMultiThreadedRecovery(final int size) throws Exception + { + + final List<ServerMessage<?>> messageList = new ArrayList<>(); + final TestQueue queue = new TestQueue(Collections.singletonMap(Queue.NAME, (Object) "test"), _vhost, messageList); + + queue.open(); + + + Thread recoveryThread = new Thread(new Runnable() + { + + @Override + public void run() + { + for(int i = 0; i < size; i++) + { + queue.recover(createMockMessage(i)); + } + queue.completeRecovery(); + } + }, "recovery thread"); + + Thread publishingThread = new Thread(new Runnable() + { + + @Override + public void run() + { + for(int i = 0; i < size; i++) + { + queue.enqueue(createMockMessage(size + i), null); + } + } + }, "publishing thread"); + + recoveryThread.start(); + publishingThread.start(); + + recoveryThread.join(10000); + publishingThread.join(10000); + + assertEquals(size*2, messageList.size()); + + for(int i = 0; i < (size*2); i++) + { + assertEquals((long)i, messageList.get(i).getMessageNumber()); + } + } + + + private ServerMessage createMockMessage(final long i) + { + ServerMessage msg = mock(ServerMessage.class); + when(msg.getMessageNumber()).thenReturn(i); + MessageReference ref = mock(MessageReference.class); + when(ref.getMessage()).thenReturn(msg); + when(msg.newReference()).thenReturn(ref); + return msg; + } + + private class TestQueue extends AbstractQueue<TestQueue> + { + + private final List<ServerMessage<?>> _messageList; + + protected TestQueue(final Map<String, Object> attributes, + final VirtualHostImpl virtualHost, + final List<ServerMessage<?>> messageList) + { + super(attributes, virtualHost); + _messageList = messageList; + } + + @Override + QueueEntryList getEntries() + { + return null; + } + + @Override + protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action) + { + synchronized(_messageList) + { + _messageList.add(message); + } + } + } +} diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 71f0bed3d9..d328e21a94 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -142,7 +142,7 @@ public class StandardQueueTest extends AbstractQueueTestBase { // create a queue where each even entry is considered a dequeued AbstractQueue queue = new DequeuedQueue(getVirtualHost()); - queue.open(); + queue.create(); // create a consumer MockConsumer consumer = new MockConsumer(); @@ -188,7 +188,7 @@ public class StandardQueueTest extends AbstractQueueTestBase // do nothing } }; - testQueue.open(); + testQueue.create(); // put messages List<StandardQueueEntry> entries = diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java index 2a5e0ff763..685fea207b 100644 --- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java +++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java @@ -34,9 +34,9 @@ import static org.mockito.Mockito.when; import java.util.UUID; import junit.framework.TestCase; +import org.mockito.ArgumentMatcher; import org.apache.qpid.server.logging.EventLogger; -import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject; import org.apache.qpid.server.message.EnqueueableMessage; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -58,9 +58,8 @@ import org.apache.qpid.server.txn.DtxBranch; import org.apache.qpid.server.txn.DtxRegistry; import org.apache.qpid.server.util.Action; import org.apache.qpid.transport.Xid; -import org.mockito.ArgumentMatcher; -public class MessageStoreRecovererTest extends TestCase +public class SynchronousMessageStoreRecovererTest extends TestCase { private VirtualHostImpl _virtualHost; @@ -99,11 +98,12 @@ public class MessageStoreRecovererTest extends TestCase when(_virtualHost.getMessageStore()).thenReturn(store); - MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); - recoverer.recover(); + SynchronousMessageStoreRecoverer + recoverer = new SynchronousMessageStoreRecoverer(); + recoverer.recover(_virtualHost); ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage); - verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull()); + verify(queue, times(1)).recover(eq(message)); } @SuppressWarnings("unchecked") @@ -137,8 +137,9 @@ public class MessageStoreRecovererTest extends TestCase when(_virtualHost.getMessageStore()).thenReturn(store); - MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); - recoverer.recover(); + SynchronousMessageStoreRecoverer + recoverer = new SynchronousMessageStoreRecoverer(); + recoverer.recover(_virtualHost); verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class)); verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId))); @@ -175,8 +176,9 @@ public class MessageStoreRecovererTest extends TestCase when(_virtualHost.getMessageStore()).thenReturn(store); - MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); - recoverer.recover(); + SynchronousMessageStoreRecoverer + recoverer = new SynchronousMessageStoreRecoverer(); + recoverer.recover(_virtualHost); verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId))); verify(transaction, times(1)).commitTranAsync(); @@ -205,8 +207,9 @@ public class MessageStoreRecovererTest extends TestCase when(_virtualHost.getMessageStore()).thenReturn(store); - MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); - recoverer.recover(); + SynchronousMessageStoreRecoverer + recoverer = new SynchronousMessageStoreRecoverer(); + recoverer.recover(_virtualHost); verify(storedMessage, times(1)).remove(); } @@ -262,8 +265,9 @@ public class MessageStoreRecovererTest extends TestCase when(_virtualHost.getMessageStore()).thenReturn(store); when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry); - MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); - recoverer.recover(); + SynchronousMessageStoreRecoverer + recoverer = new SynchronousMessageStoreRecoverer(); + recoverer.recover(_virtualHost); DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId)); assertNotNull("Expected dtx branch to be created", branch); @@ -330,8 +334,9 @@ public class MessageStoreRecovererTest extends TestCase when(_virtualHost.getMessageStore()).thenReturn(store); when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry); - MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class)); - recoverer.recover(); + SynchronousMessageStoreRecoverer + recoverer = new SynchronousMessageStoreRecoverer(); + recoverer.recover(_virtualHost); DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId)); assertNotNull("Expected dtx branch to be created", branch); @@ -377,6 +382,7 @@ public class MessageStoreRecovererTest extends TestCase when(queue.getId()).thenReturn(queueId); when(queue.getName()).thenReturn("test-queue"); when(_virtualHost.getQueue(queueId)).thenReturn(queue); + when(_virtualHost.getQueue("test-queue")).thenReturn(queue); return queue; } |