summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-20 16:42:29 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-20 16:42:29 +0000
commit2bf9fcba59a576861dae424ee5ddfe8fae0a1af9 (patch)
treed9d354f25cf6a6401d08d227baaf1a9124e03f17
parent02291ffa8f75a05c118f8031263cec14d55bf894 (diff)
downloadqpid-python-2bf9fcba59a576861dae424ee5ddfe8fae0a1af9.tar.gz
QPID-5907 : [Java Broker] Add ability for broker to startup while persistent queues are still being recovered
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1612118 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java101
-rw-r--r--java/broker-codegen/src/main/java/org/apache/qpid/server/model/ManagedObjectFactoryConstructor.java2
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java85
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java159
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java35
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java4
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java16
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java1
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java21
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java427
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java335
-rw-r--r--java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java416
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java195
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java4
-rw-r--r--java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java (renamed from java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java)38
17 files changed, 1473 insertions, 374 deletions
diff --git a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
index 4072c483b2..cf187fe1e9 100644
--- a/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
+++ b/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
@@ -18,10 +18,9 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import static org.apache.qpid.server.store.berkeleydb.BDBUtils.*;
+import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
import static org.apache.qpid.server.store.berkeleydb.BDBUtils.abortTransactionSafely;
import static org.apache.qpid.server.store.berkeleydb.BDBUtils.closeCursorSafely;
-import static org.apache.qpid.server.store.berkeleydb.BDBUtils.DEFAULT_DATABASE_CONFIG;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
@@ -122,7 +121,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData)
{
- long newMessageId = getNextMessageSequenceNumber();
+ long newMessageId = getNextMessageId();
if (metaData.isPersistent())
{
@@ -134,7 +133,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
- private long getNextMessageSequenceNumber()
+ public long getNextMessageId()
{
long newMessageId;
try
@@ -182,6 +181,73 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
@Override
+ public StoredMessage<?> getMessage(final long messageId)
+ {
+ checkMessageStoreOpen();
+ return getMessageInternal(messageId, getEnvironmentFacade());
+ }
+
+ @Override
+ public void visitMessageInstances(final TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Cursor cursor = null;
+ List<QueueEntryKey> entries = new ArrayList<QueueEntryKey>();
+ try
+ {
+ cursor = getDeliveryDb().openCursor(null, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ keyBinding.objectToEntry(new QueueEntryKey(queue.getId(),0l), key);
+
+ if(cursor.getSearchKeyRange(key,value,LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ QueueEntryKey entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
+ while (cursor.getNext(key, value, LockMode.DEFAULT) == OperationStatus.SUCCESS)
+ {
+ entry = keyBinding.entryToObject(key);
+ if(entry.getQueueId().equals(queue.getId()))
+ {
+ entries.add(entry);
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
+ catch (DatabaseException e)
+ {
+ throw getEnvironmentFacade().handleDatabaseException("Cannot visit message instances", e);
+ }
+ finally
+ {
+ closeCursorSafely(cursor, getEnvironmentFacade());
+ }
+
+ for(QueueEntryKey entry : entries)
+ {
+ UUID queueId = entry.getQueueId();
+ long messageId = entry.getMessageId();
+ if (!handler.handle(queueId, messageId))
+ {
+ break;
+ }
+ }
+
+ }
+
+
+
+ @Override
public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -542,6 +608,33 @@ public abstract class AbstractBDBMessageStore implements MessageStore
}
}
+
+ private StoredBDBMessage<?> getMessageInternal(long messageId, EnvironmentFacade environmentFacade)
+ {
+ try
+ {
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
+ LongBinding.longToEntry(messageId, key);
+ if(getMessageMetaDataDb().get(null, key, value, LockMode.READ_COMMITTED) == OperationStatus.SUCCESS)
+ {
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
+ StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, true);
+ return message;
+ }
+ else
+ {
+ return null;
+ }
+
+ }
+ catch (DatabaseException e)
+ {
+ throw environmentFacade.handleDatabaseException("Cannot visit messages", e);
+ }
+ }
+
/**
* Stores a chunk of message data.
*
diff --git a/java/broker-codegen/src/main/java/org/apache/qpid/server/model/ManagedObjectFactoryConstructor.java b/java/broker-codegen/src/main/java/org/apache/qpid/server/model/ManagedObjectFactoryConstructor.java
index 0e5022a66f..21193b3a37 100644
--- a/java/broker-codegen/src/main/java/org/apache/qpid/server/model/ManagedObjectFactoryConstructor.java
+++ b/java/broker-codegen/src/main/java/org/apache/qpid/server/model/ManagedObjectFactoryConstructor.java
@@ -25,7 +25,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-@Retention(RetentionPolicy.SOURCE)
+@Retention(RetentionPolicy.CLASS)
@Target(ElementType.CONSTRUCTOR)
public @interface ManagedObjectFactoryConstructor
{
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index d25833b9f7..03bba43d57 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.protocol.CapacityChecker;
@@ -107,4 +108,7 @@ public interface AMQQueue<X extends AMQQueue<X>>
void setNotificationListener(QueueNotificationListener listener);
+ void completeRecovery();
+
+ void recover(ServerMessage<?> message);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 2bfef7f4b8..cb3b5effd0 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -30,6 +30,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -225,6 +226,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
private int _maximumDistinctGroups;
private State _state = State.UNINITIALIZED;
+ private final AtomicBoolean _recovering = new AtomicBoolean(true);
+ private final ConcurrentLinkedQueue<EnqueueRequest> _postRecoveryQueue = new ConcurrentLinkedQueue<>();
protected AbstractQueue(Map<String, Object> attributes, VirtualHostImpl virtualHost)
{
@@ -246,6 +249,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
_virtualHost.getDurableConfigurationStore().create(asObjectRecord());
}
+
+ _recovering.set(false);
}
@Override
@@ -842,15 +847,69 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
// ------ Enqueue / Dequeue
- public void enqueue(ServerMessage message, Action<? super MessageInstance> action)
+ public final void enqueue(ServerMessage message, Action<? super MessageInstance> action)
{
incrementQueueCount();
incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
+ if(_recovering.get())
+ {
+ EnqueueRequest request = new EnqueueRequest(message, action);
+ _postRecoveryQueue.add(request);
+
+ // deal with the case the recovering status changed just as we added to the post recovery queue
+ if(!_recovering.get() && _postRecoveryQueue.remove(request))
+ {
+ doEnqueue(message, action);
+ }
+ }
+ else
+ {
+ doEnqueue(message, action);
+ }
+
+ }
+
+ public final void recover(ServerMessage message)
+ {
+ incrementQueueCount();
+ incrementQueueSize(message);
+
+ _totalMessagesReceived.incrementAndGet();
+
+ doEnqueue(message, null);
+ }
+
+
+ @Override
+ public final void completeRecovery()
+ {
+ if(_recovering.get())
+ {
+ enqueueFromPostRecoveryQueue();
+
+ _recovering.set(false);
+
+ // deal with any enqueues that occurred just as we cleared the queue
+ enqueueFromPostRecoveryQueue();
+ }
+ }
+ private void enqueueFromPostRecoveryQueue()
+ {
+ while(!_postRecoveryQueue.isEmpty())
+ {
+ EnqueueRequest request = _postRecoveryQueue.poll();
+ MessageReference<?> messageReference = request.getMessage();
+ doEnqueue(messageReference.getMessage(), request.getAction());
+ messageReference.release();
+ }
+ }
+ protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action)
+ {
final QueueConsumer<?> exclusiveSub = _exclusiveSubscriber;
final QueueEntry entry = getEntries().add(message);
@@ -939,7 +998,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
{
action.performAction(entry);
}
-
}
private void deliverToConsumer(final QueueConsumer<?> sub, final QueueEntry entry)
@@ -2735,4 +2793,27 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
}
+
+ private static class EnqueueRequest
+ {
+ private final MessageReference<?> _message;
+ private final Action<? super MessageInstance> _action;
+
+ public EnqueueRequest(final ServerMessage message,
+ final Action<? super MessageInstance> action)
+ {
+ _message = message.newReference();
+ _action = action;
+ }
+
+ public MessageReference<?> getMessage()
+ {
+ return _message;
+ }
+
+ public Action<? super MessageInstance> getAction()
+ {
+ return _action;
+ }
+ }
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
index 1d61e6fc22..4b6c7a3fe7 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
@@ -53,12 +53,12 @@ public class SortedQueueImpl extends OutOfOrderQueue<SortedQueueImpl> implements
}
@Override
- public void enqueue(final ServerMessage message,
+ protected void doEnqueue(final ServerMessage message,
final Action<? super MessageInstance> action)
{
synchronized (_sortedQueueLock)
{
- super.enqueue(message, action);
+ super.doEnqueue(message, action);
}
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index 974b3ba9ff..24866e4e2e 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -25,7 +25,6 @@ import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -78,6 +77,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
+ private static final String SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? ORDER BY queue_id, message_id";
+
private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
+ "( message_id, content ) values (?, ?)";
private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
@@ -90,6 +91,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
"SELECT meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String DELETE_FROM_META_DATA = "DELETE FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String SELECT_ALL_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME;
+ private static final String SELECT_ONE_FROM_META_DATA = "SELECT message_id, meta_data FROM " + META_DATA_TABLE_NAME + " WHERE message_id = ?";
private static final String INSERT_INTO_XIDS =
"INSERT INTO "+ XID_TABLE_NAME +" ( format, global_id, branch_id ) values (?, ?, ?)";
@@ -114,19 +116,56 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
protected void setMaximumMessageId()
{
- visitMessages(new MessageHandler()
+
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ setMaxMessageId(conn, "SELECT max(message_id) FROM " + MESSAGE_CONTENT_TABLE_NAME, 1);
+ setMaxMessageId(conn, "SELECT max(message_id) FROM " + META_DATA_TABLE_NAME, 1);
+ setMaxMessageId(conn, "SELECT queue_id, max(message_id) FROM " + QUEUE_ENTRY_TABLE_NAME + " GROUP BY queue_id " , 2);
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException(e);
+ }
+
+ }
+
+ private void setMaxMessageId(final Connection conn, final String query, int col) throws SQLException
+ {
+ PreparedStatement statement =
+ conn.prepareStatement(query);
+ try
{
- @Override
- public boolean handle(StoredMessage<?> storedMessage)
+ ResultSet rs = statement.executeQuery();
+ try
{
- long id = storedMessage.getMessageNumber();
- if (_messageId.get() < id)
+ while(rs.next())
{
- _messageId.set(id);
+ long maxMessageId = rs.getLong(col);
+ if(_messageId.get() < maxMessageId)
+ {
+ _messageId.set(maxMessageId);
+ }
}
- return true;
+
}
- });
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ statement.close();
+ }
}
protected void upgrade(ConfiguredObject<?> parent) throws StoreException
@@ -410,14 +449,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
if(metaData.isPersistent())
{
- return new StoredJDBCMessage(_messageId.incrementAndGet(), metaData);
+ return new StoredJDBCMessage(getNextMessageId(), metaData);
}
else
{
- return new StoredMemoryMessage(_messageId.incrementAndGet(), metaData);
+ return new StoredMemoryMessage(getNextMessageId(), metaData);
}
}
+ @Override
+ public long getNextMessageId()
+ {
+ return _messageId.incrementAndGet();
+ }
+
private void removeMessage(long messageId)
{
try
@@ -1353,6 +1398,51 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
+ public StoredMessage<?> getMessage(long messageId) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Connection conn = null;
+ StoredJDBCMessage message;
+ try
+ {
+ conn = newAutoCommitConnection();
+ try (PreparedStatement stmt = conn.prepareStatement(SELECT_ONE_FROM_META_DATA))
+ {
+ stmt.setLong(1, messageId);
+ try (ResultSet rs = stmt.executeQuery())
+ {
+ if (rs.next())
+ {
+ byte[] dataAsBytes = getBlobAsBytes(rs, 2);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
+ buf.position(1);
+ buf = buf.slice();
+ MessageMetaDataType<?> type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
+ message = new StoredJDBCMessage(messageId, metaData, true);
+
+ }
+ else
+ {
+ message = null;
+ }
+ }
+ }
+ return message;
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting messages", e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
+ }
+
+
+ @Override
public void visitMessages(MessageHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -1404,6 +1494,53 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
@Override
+ public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException
+ {
+ checkMessageStoreOpen();
+
+ Connection conn = null;
+ try
+ {
+ conn = newAutoCommitConnection();
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_QUEUE_ENTRY_FOR_QUEUE);
+ try
+ {
+ stmt.setString(1, queue.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while(rs.next())
+ {
+ String id = rs.getString(1);
+ long messageId = rs.getLong(2);
+ if (!handler.handle(UUID.fromString(id), messageId))
+ {
+ break;
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ catch(SQLException e)
+ {
+ throw new StoreException("Error encountered when visiting message instances", e);
+ }
+ finally
+ {
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
+
+ }
+
+ @Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
checkMessageStoreOpen();
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index 3df1ffb6bc..f4551aae05 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -194,7 +194,7 @@ public class MemoryMessageStore implements MessageStore
@Override
public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
{
- long id = _messageId.getAndIncrement();
+ long id = getNextMessageId();
if(metaData.isPersistent())
{
@@ -224,6 +224,12 @@ public class MemoryMessageStore implements MessageStore
}
@Override
+ public long getNextMessageId()
+ {
+ return _messageId.getAndIncrement();
+ }
+
+ @Override
public boolean isPersistent()
{
return false;
@@ -275,6 +281,12 @@ public class MemoryMessageStore implements MessageStore
}
@Override
+ public StoredMessage<?> getMessage(final long messageId)
+ {
+ return _messages.get(messageId);
+ }
+
+ @Override
public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
{
synchronized (_transactionLock)
@@ -294,6 +306,27 @@ public class MemoryMessageStore implements MessageStore
}
@Override
+ public void visitMessageInstances(TransactionLogResource queue, final MessageInstanceHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ Set<Long> ids = _messageInstances.get(queue.getId());
+ if(ids != null)
+ {
+ for (long id : ids)
+ {
+ if (!handler.handle(queue.getId(), id))
+ {
+ return;
+ }
+
+ }
+ }
+ }
+ }
+
+
+ @Override
public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
{
synchronized (_transactionLock)
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
index 642fcbdc54..a4eaf48353 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStore.java
@@ -32,6 +32,8 @@ import org.apache.qpid.server.store.handler.MessageInstanceHandler;
*/
public interface MessageStore
{
+ long getNextMessageId();
+
String getStoreLocation();
void addEventListener(EventListener eventListener, Event... events);
@@ -56,10 +58,12 @@ public interface MessageStore
void visitMessages(MessageHandler handler) throws StoreException;
void visitMessageInstances(MessageInstanceHandler handler) throws StoreException;
+ void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException;
void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException;
<T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
+ StoredMessage<?> getMessage(long messageId);
/**
* Is this store capable of persisting the data
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 8848dcdd94..15bf13bcab 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -127,6 +127,11 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
}
@Override
+ public void visitMessageInstances(TransactionLogResource queue, MessageInstanceHandler handler) throws StoreException
+ {
+ }
+
+ @Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
}
@@ -136,4 +141,15 @@ public abstract class NullMessageStore implements MessageStore, DurableConfigura
{
}
+ @Override
+ public long getNextMessageId()
+ {
+ return 0;
+ }
+
+ @Override
+ public StoredMessage<?> getMessage(final long messageId)
+ {
+ return null;
+ }
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
index aa689cc7e0..535ad77ea4 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
@@ -68,7 +68,6 @@ public class DtxBranch
ROLLBACK_ONLY
}
-
public DtxBranch(Xid xid, MessageStore store, VirtualHostImpl vhost)
{
_xid = xid;
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 1765aebcca..1cd14dc025 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -88,6 +88,8 @@ import org.apache.qpid.server.util.MapValueConverter;
public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> extends AbstractConfiguredObject<X>
implements VirtualHostImpl<X, AMQQueue<?>, ExchangeImpl<?>>, IConnectionRegistry.RegistryChangeListener, EventListener
{
+ private static final String USE_ASYNC_RECOVERY = "use_async_message_store_recovery";
+
public static final String DEFAULT_DLQ_NAME_SUFFIX = "_DLQ";
public static final String DLQ_ROUTING_KEY = "dlq";
public static final String CREATE_DLQ_ON_CREATION = "x-qpid-dlq-enabled"; // TODO - this value should change
@@ -149,10 +151,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@ManagedAttributeField
private int _housekeepingThreadCount;
+
+ private boolean _useAsyncRecoverer;
+
private MessageDestination _defaultDestination;
private MessageStore _messageStore;
+
public AbstractVirtualHost(final Map<String, Object> attributes, VirtualHostNode<?> virtualHostNode)
{
super(parentsMap(virtualHostNode), attributes);
@@ -1332,7 +1338,20 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
createDefaultExchanges();
}
- new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
+ MessageStoreRecoverer messageStoreRecoverer;
+
+
+
+ if(getContextValue(Boolean.class, USE_ASYNC_RECOVERY))
+ {
+ messageStoreRecoverer = new AsynchronousMessageStoreRecoverer();
+ }
+ else
+ {
+ messageStoreRecoverer = new SynchronousMessageStoreRecoverer();
+ }
+ messageStoreRecoverer.recover(this);
+
State finalState = State.ERRORED;
try
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
new file mode 100644
index 0000000000..9742404225
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AsynchronousMessageStoreRecoverer.java
@@ -0,0 +1,427 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.util.Functions;
+
+public class AsynchronousMessageStoreRecoverer implements MessageStoreRecoverer
+{
+ private static final Logger _logger = Logger.getLogger(AsynchronousMessageStoreRecoverer.class);
+
+ @Override
+ public void recover(final VirtualHostImpl virtualHost)
+ {
+ AsynchronousRecoverer asynchronousRecoverer = new AsynchronousRecoverer(virtualHost);
+
+ asynchronousRecoverer.recover();
+ }
+
+ private static class AsynchronousRecoverer
+ {
+ private final VirtualHostImpl<?, ?, ?> _virtualHost;
+ private final EventLogger _eventLogger;
+ private final MessageStore _store;
+ private final MessageStoreLogSubject _logSubject;
+ private final long _maxMessageId;
+ private final Set<AMQQueue<?>> _recoveringQueues = new CopyOnWriteArraySet<>();
+ private final AtomicBoolean _recoveryComplete = new AtomicBoolean();
+ private final Map<Long, MessageReference<? extends ServerMessage<?>>> _recoveredMessages = new HashMap<>();
+
+
+ private AsynchronousRecoverer(final VirtualHostImpl<?, ?, ?> virtualHost)
+ {
+ _virtualHost = virtualHost;
+ _eventLogger = virtualHost.getEventLogger();
+ _store = virtualHost.getMessageStore();
+ _logSubject = new MessageStoreLogSubject(virtualHost.getName(), _store.getClass().getSimpleName());
+
+ _maxMessageId = _store.getNextMessageId();
+ _recoveringQueues.addAll(_virtualHost.getQueues());
+
+ }
+
+ public void recover()
+ {
+ getStore().visitDistributedTransactions(new DistributedTransactionVisitor());
+
+ for(AMQQueue<?> queue : _recoveringQueues)
+ {
+ Thread queueThread = new Thread(new QueueRecoveringTask(queue), "Queue Recoverer : " + queue.getName() + " (vh: " + getVirtualHost().getName() + ")");
+ queueThread.start();
+ }
+ }
+
+ public VirtualHostImpl<?, ?, ?> getVirtualHost()
+ {
+ return _virtualHost;
+ }
+
+ public EventLogger getEventLogger()
+ {
+ return _eventLogger;
+ }
+
+ public MessageStore getStore()
+ {
+ return _store;
+ }
+
+ public MessageStoreLogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ private boolean isRecovering(AMQQueue<?> queue)
+ {
+ return _recoveringQueues.contains(queue);
+ }
+
+ private void recoverQueue(AMQQueue<?> queue)
+ {
+ MessageInstanceVisitor handler = new MessageInstanceVisitor(queue);
+ _store.visitMessageInstances(queue, handler);
+
+ getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERED(handler.getRecoveredCount(), queue.getName()));
+ getEventLogger().message(getLogSubject(), TransactionLogMessages.RECOVERY_COMPLETE(queue.getName(), true));
+ queue.completeRecovery();
+
+ _recoveringQueues.remove(queue);
+ if (_recoveringQueues.isEmpty() && _recoveryComplete.compareAndSet(false, true))
+ {
+ completeRecovery();
+ }
+ }
+
+ private synchronized void completeRecovery()
+ {
+ // at this point nothing should be writing to the map of recovered messages
+ for (MessageReference<? extends ServerMessage<?>> entry : _recoveredMessages.values())
+ {
+ entry.release();
+ }
+ _recoveredMessages.clear();
+ }
+
+ private synchronized ServerMessage<?> getRecoveredMessage(final long messageId)
+ {
+ MessageReference<? extends ServerMessage<?>> ref = _recoveredMessages.get(messageId);
+ if (ref == null)
+ {
+ StoredMessage<?> message = _store.getMessage(messageId);
+ if(message != null)
+ {
+ StorableMessageMetaData metaData = message.getMetaData();
+
+ @SuppressWarnings("rawtypes")
+ MessageMetaDataType type = metaData.getType();
+
+ @SuppressWarnings("unchecked")
+ ServerMessage<?> serverMessage = type.createMessage(message);
+
+ ref = serverMessage.newReference();
+ _recoveredMessages.put(messageId, ref);
+ }
+ }
+ return ref == null ? null : ref.getMessage();
+ }
+
+
+ private class DistributedTransactionVisitor implements DistributedTransactionHandler
+ {
+
+
+
+ @Override
+ public boolean handle(long format,
+ byte[] globalId,
+ byte[] branchId,
+ Transaction.Record[] enqueues,
+ Transaction.Record[] dequeues)
+ {
+ Xid id = new Xid(format, globalId, branchId);
+ DtxRegistry dtxRegistry = getVirtualHost().getDtxRegistry();
+ DtxBranch branch = dtxRegistry.getBranch(id);
+ if (branch == null)
+ {
+ branch = new DtxBranch(id, getStore(), getVirtualHost());
+ dtxRegistry.registerBranch(branch);
+ }
+ for (Transaction.Record record : enqueues)
+ {
+ final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId());
+ if (queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final ServerMessage<?> message = getRecoveredMessage(messageId);
+
+ if (message != null)
+ {
+ final MessageReference<?> ref = message.newReference();
+
+ branch.enqueue(queue, message);
+
+ branch.addPostTransactionAction(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ queue.enqueue(message, null);
+ ref.release();
+ }
+
+ public void onRollback()
+ {
+ ref.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ getEventLogger().message(getLogSubject(),
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(
+ messageId)));
+ }
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ getEventLogger().message(getLogSubject(),
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource()
+ .getId()
+ .toString()));
+
+ }
+ }
+ for (Transaction.Record record : dequeues)
+ {
+
+ final AMQQueue<?> queue = getVirtualHost().getQueue(record.getResource().getId());
+
+ if (queue != null)
+ {
+ // For DTX to work correctly the queues which have uncommitted branches with dequeues
+ // must be synchronously recovered
+
+ if (isRecovering(queue))
+ {
+ recoverQueue(queue);
+ }
+
+ final long messageId = record.getMessage().getMessageNumber();
+ final ServerMessage<?> message = getRecoveredMessage(messageId);
+
+ if (message != null)
+ {
+ final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
+
+ entry.acquire();
+
+ branch.dequeue(queue, message);
+
+ branch.addPostTransactionAction(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ getEventLogger().message(getLogSubject(),
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(
+ messageId)));
+
+ }
+
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ getEventLogger().message(getLogSubject(),
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource()
+ .getId()
+ .toString()));
+ }
+
+ }
+
+
+ branch.setState(DtxBranch.State.PREPARED);
+ branch.prePrepareTransaction();
+
+ return true;
+ }
+
+ private StringBuilder xidAsString(Xid id)
+ {
+ return new StringBuilder("(")
+ .append(id.getFormat())
+ .append(',')
+ .append(Functions.str(id.getGlobalId()))
+ .append(',')
+ .append(Functions.str(id.getBranchId()))
+ .append(')');
+ }
+
+
+ }
+
+ private class QueueRecoveringTask implements Runnable
+ {
+ private final AMQQueue<?> _queue;
+
+ public QueueRecoveringTask(final AMQQueue<?> queue)
+ {
+ _queue = queue;
+ }
+
+ @Override
+ public void run()
+ {
+ recoverQueue(_queue);
+ }
+ }
+
+ private class MessageInstanceVisitor implements MessageInstanceHandler
+ {
+ private final AMQQueue<?> _queue;
+ long _recoveredCount;
+
+ private MessageInstanceVisitor(AMQQueue<?> queue)
+ {
+ _queue = queue;
+ }
+
+ @Override
+ public boolean handle(final UUID queueId, long messageId)
+ {
+ String queueName = _queue.getName();
+
+ if(messageId < _maxMessageId)
+ {
+ ServerMessage<?> message = getRecoveredMessage(messageId);
+
+ if (message != null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
+ }
+
+ _queue.recover(message);
+ _recoveredCount++;
+ }
+ else
+ {
+ _logger.warn("Message id "
+ + messageId
+ + " referenced in log as enqueued in queue "
+ + queueName
+ + " is unknown, entry will be discarded");
+ Transaction txn = getStore().newTransaction();
+ txn.dequeueMessage(_queue, new DummyMessage(messageId));
+ txn.commitTranAsync();
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+
+ }
+
+ public long getRecoveredCount()
+ {
+ return _recoveredCount;
+ }
+ }
+ }
+
+ private static class DummyMessage implements EnqueueableMessage
+ {
+
+ private final long _messageId;
+
+ public DummyMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+ }
+
+
+}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
index 662a26bb2e..9f4c7dd319 100644
--- a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/MessageStoreRecoverer.java
@@ -20,338 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
-import org.apache.qpid.server.logging.messages.TransactionLogMessages;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.message.MessageReference;
-import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StorableMessageMetaData;
-import org.apache.qpid.server.store.StoredMessage;
-import org.apache.qpid.server.store.Transaction;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.Transaction.Record;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-import org.apache.qpid.server.txn.DtxBranch;
-import org.apache.qpid.server.txn.DtxRegistry;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.transport.Xid;
-import org.apache.qpid.transport.util.Functions;
-
-public class MessageStoreRecoverer
+public interface MessageStoreRecoverer
{
- private static final Logger _logger = Logger.getLogger(MessageStoreRecoverer.class);
-
- private final VirtualHostImpl _virtualHost;
-
- private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private final Map<Long, ServerMessage<?>> _recoveredMessages = new HashMap<Long, ServerMessage<?>>();
- private final Map<Long, StoredMessage<?>> _unusedMessages = new HashMap<Long, StoredMessage<?>>();
- private final EventLogger _eventLogger;
-
- private final MessageStoreLogSubject _logSubject;
- private final MessageStore _store;
-
-
- public MessageStoreRecoverer(VirtualHostImpl virtualHost, MessageStoreLogSubject logSubject)
- {
- super();
- _virtualHost = virtualHost;
- _eventLogger = virtualHost.getEventLogger();
- _logSubject = logSubject;
- _store = virtualHost.getMessageStore();
- }
-
-
- public void recover()
- {
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_START());
- _store.visitMessages(messageVisitor);
-
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
- _store.visitMessageInstances(messageAndMessageInstanceRecoverer);
-
- for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
- {
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
- }
-
- _store.visitDistributedTransactions(distributedTransactionRecoverer);
-
-
-
- for(StoredMessage<?> m : _unusedMessages.values())
- {
- _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
- m.remove();
- }
- _eventLogger.message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
-
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERED(_recoveredMessages.size() - _unusedMessages.size()));
- _eventLogger.message(_logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
-
-
- }
-
- MessageHandler messageVisitor = new MessageHandler()
- {
-
- @Override
- public boolean handle(StoredMessage<?> message)
- {
- StorableMessageMetaData metaData = message.getMetaData();
-
- @SuppressWarnings("rawtypes")
- MessageMetaDataType type = metaData.getType();
-
- @SuppressWarnings("unchecked")
- ServerMessage<?> serverMessage = type.createMessage(message);
-
- _recoveredMessages.put(message.getMessageNumber(), serverMessage);
- _unusedMessages.put(message.getMessageNumber(), message);
- return true;
- }
-
- };
-
- MessageInstanceHandler messageAndMessageInstanceRecoverer = new MessageInstanceHandler()
- {
- @Override
- public boolean handle(final UUID queueId, long messageId)
- {
- AMQQueue<?> queue = _virtualHost.getQueue(queueId);
- if(queue != null)
- {
- String queueName = queue.getName();
- ServerMessage<?> message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
- }
-
- Integer count = _queueRecoveries.get(queueName);
- if (count == null)
- {
- count = 0;
- }
-
- queue.enqueue(message,null);
-
- _queueRecoveries.put(queueName, ++count);
- }
- else
- {
- _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
- Transaction txn = _store.newTransaction();
- txn.dequeueMessage(queue, new DummyMessage(messageId));
- txn.commitTranAsync();
- }
- }
- else
- {
- _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
- Transaction txn = _store.newTransaction();
- TransactionLogResource mockQueue =
- new TransactionLogResource()
- {
- @Override
- public String getName()
- {
- return "<<UNKNOWN>>";
- }
-
- @Override
- public UUID getId()
- {
- return queueId;
- }
-
- @Override
- public boolean isDurable()
- {
- return false;
- }
- };
- txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
- txn.commitTranAsync();
- }
- return true;
- }
- };
-
- private DistributedTransactionHandler distributedTransactionRecoverer = new DistributedTransactionHandler()
- {
-
- @Override
- public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- {
- Xid id = new Xid(format, globalId, branchId);
- DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
- DtxBranch branch = dtxRegistry.getBranch(id);
- if(branch == null)
- {
- branch = new DtxBranch(id, _store, _virtualHost);
- dtxRegistry.registerBranch(branch);
- }
- for(Transaction.Record record : enqueues)
- {
- final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
- if(queue != null)
- {
- final long messageId = record.getMessage().getMessageNumber();
- final ServerMessage<?> message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
- final MessageReference<?> ref = message.newReference();
-
- branch.enqueue(queue,message);
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- queue.enqueue(message, null);
- ref.release();
- }
-
- public void onRollback()
- {
- ref.release();
- }
- });
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
- Long.toString(messageId)));
- }
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource().getId().toString()));
-
- }
- }
- for(Transaction.Record record : dequeues)
- {
- final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
- if(queue != null)
- {
- final long messageId = record.getMessage().getMessageNumber();
- final ServerMessage<?> message = _recoveredMessages.get(messageId);
- _unusedMessages.remove(messageId);
-
- if(message != null)
- {
- final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
-
- entry.acquire();
-
- branch.dequeue(queue, message);
-
- branch.addPostTransactionAction(new ServerTransaction.Action()
- {
-
- public void postCommit()
- {
- entry.delete();
- }
-
- public void onRollback()
- {
- entry.release();
- }
- });
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
- Long.toString(messageId)));
-
- }
-
- }
- else
- {
- StringBuilder xidString = xidAsString(id);
- _eventLogger.message(_logSubject,
- TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getResource().getId().toString()));
- }
-
- }
-
- branch.setState(DtxBranch.State.PREPARED);
- branch.prePrepareTransaction();
- return true;
- }
-
- private StringBuilder xidAsString(Xid id)
- {
- return new StringBuilder("(")
- .append(id.getFormat())
- .append(',')
- .append(Functions.str(id.getGlobalId()))
- .append(',')
- .append(Functions.str(id.getBranchId()))
- .append(')');
- }
-
-
- };
-
-
- private static class DummyMessage implements EnqueueableMessage
- {
-
- private final long _messageId;
-
- public DummyMessage(long messageId)
- {
- _messageId = messageId;
- }
-
- public long getMessageNumber()
- {
- return _messageId;
- }
-
- public boolean isPersistent()
- {
- return true;
- }
-
- public StoredMessage getStoredMessage()
- {
- return null;
- }
- }
+ void recover(VirtualHostImpl virtualHost);
}
diff --git a/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
new file mode 100644
index 0000000000..9fc71e23d7
--- /dev/null
+++ b/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecoverer.java
@@ -0,0 +1,416 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
+import org.apache.qpid.server.logging.messages.TransactionLogMessages;
+import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.util.Functions;
+
+public class SynchronousMessageStoreRecoverer implements MessageStoreRecoverer
+{
+ private static final Logger _logger = Logger.getLogger(SynchronousMessageStoreRecoverer.class);
+
+ @Override
+ public void recover(VirtualHostImpl virtualHost)
+ {
+ EventLogger eventLogger = virtualHost.getEventLogger();
+ MessageStore store = virtualHost.getMessageStore();
+ MessageStoreLogSubject logSubject = new MessageStoreLogSubject(virtualHost.getName(), store.getClass().getSimpleName());
+
+ Map<String, Integer> queueRecoveries = new TreeMap<>();
+ Map<Long, ServerMessage<?>> recoveredMessages = new HashMap<>();
+ Map<Long, StoredMessage<?>> unusedMessages = new HashMap<>();
+
+
+ eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_START());
+
+ store.visitMessages(new MessageVisitor(recoveredMessages, unusedMessages));
+
+ eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_START(null, false));
+ store.visitMessageInstances(new MessageInstanceVisitor(virtualHost, store, queueRecoveries,
+ recoveredMessages, unusedMessages));
+ for(Map.Entry<String,Integer> entry : queueRecoveries.entrySet())
+ {
+ eventLogger.message(logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
+ eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
+ virtualHost.getQueue(entry.getKey()).completeRecovery();
+ }
+
+ Collection<AMQQueue> allQueues = virtualHost.getQueues();
+
+ for(AMQQueue q : allQueues)
+ {
+ if(!queueRecoveries.containsKey(q.getName()))
+ {
+ q.completeRecovery();
+ }
+ }
+
+ store.visitDistributedTransactions(new DistributedTransactionVisitor(virtualHost, store, eventLogger,
+ logSubject, recoveredMessages, unusedMessages));
+
+
+
+ for(StoredMessage<?> m : unusedMessages.values())
+ {
+ _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
+ m.remove();
+ }
+ eventLogger.message(logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+
+ eventLogger.message(logSubject,
+ MessageStoreMessages.RECOVERED(recoveredMessages.size() - unusedMessages.size()));
+ eventLogger.message(logSubject, MessageStoreMessages.RECOVERY_COMPLETE());
+
+
+ }
+
+ private static class MessageVisitor implements MessageHandler
+ {
+
+ private final Map<Long, ServerMessage<?>> _recoveredMessages;
+ private final Map<Long, StoredMessage<?>> _unusedMessages;
+
+ public MessageVisitor(final Map<Long, ServerMessage<?>> recoveredMessages,
+ final Map<Long, StoredMessage<?>> unusedMessages)
+ {
+ _recoveredMessages = recoveredMessages;
+ _unusedMessages = unusedMessages;
+ }
+
+ @Override
+ public boolean handle(StoredMessage<?> message)
+ {
+ StorableMessageMetaData metaData = message.getMetaData();
+
+ @SuppressWarnings("rawtypes")
+ MessageMetaDataType type = metaData.getType();
+
+ @SuppressWarnings("unchecked")
+ ServerMessage<?> serverMessage = type.createMessage(message);
+
+ _recoveredMessages.put(message.getMessageNumber(), serverMessage);
+ _unusedMessages.put(message.getMessageNumber(), message);
+ return true;
+ }
+
+ }
+
+ private static class MessageInstanceVisitor implements MessageInstanceHandler
+ {
+ private final VirtualHostImpl _virtualHost;
+ private final MessageStore _store;
+
+ private final Map<String, Integer> _queueRecoveries;
+ private final Map<Long, ServerMessage<?>> _recoveredMessages;
+ private final Map<Long, StoredMessage<?>> _unusedMessages;
+
+ private MessageInstanceVisitor(final VirtualHostImpl virtualHost,
+ final MessageStore store,
+ final Map<String, Integer> queueRecoveries,
+ final Map<Long, ServerMessage<?>> recoveredMessages,
+ final Map<Long, StoredMessage<?>> unusedMessages)
+ {
+ _virtualHost = virtualHost;
+ _store = store;
+ _queueRecoveries = queueRecoveries;
+ _recoveredMessages = recoveredMessages;
+ _unusedMessages = unusedMessages;
+ }
+
+ @Override
+ public boolean handle(final UUID queueId, long messageId)
+ {
+ AMQQueue<?> queue = _virtualHost.getQueue(queueId);
+ if(queue != null)
+ {
+ String queueName = queue.getName();
+ ServerMessage<?> message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
+ }
+
+ Integer count = _queueRecoveries.get(queueName);
+ if (count == null)
+ {
+ count = 0;
+ }
+
+ queue.recover(message);
+
+ _queueRecoveries.put(queueName, ++count);
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
+ Transaction txn = _store.newTransaction();
+ txn.dequeueMessage(queue, new DummyMessage(messageId));
+ txn.commitTranAsync();
+ }
+ }
+ else
+ {
+ _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
+ Transaction txn = _store.newTransaction();
+ TransactionLogResource mockQueue =
+ new TransactionLogResource()
+ {
+ @Override
+ public String getName()
+ {
+ return "<<UNKNOWN>>";
+ }
+
+ @Override
+ public UUID getId()
+ {
+ return queueId;
+ }
+
+ @Override
+ public boolean isDurable()
+ {
+ return false;
+ }
+ };
+ txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
+ txn.commitTranAsync();
+ }
+ return true;
+ }
+ }
+
+ private static class DistributedTransactionVisitor implements DistributedTransactionHandler
+ {
+
+ private final VirtualHostImpl _virtualHost;
+ private final MessageStore _store;
+ private final EventLogger _eventLogger;
+ private final MessageStoreLogSubject _logSubject;
+
+ private final Map<Long, ServerMessage<?>> _recoveredMessages;
+ private final Map<Long, StoredMessage<?>> _unusedMessages;
+
+ private DistributedTransactionVisitor(final VirtualHostImpl virtualHost,
+ final MessageStore store,
+ final EventLogger eventLogger,
+ final MessageStoreLogSubject logSubject,
+ final Map<Long, ServerMessage<?>> recoveredMessages,
+ final Map<Long, StoredMessage<?>> unusedMessages)
+ {
+ _virtualHost = virtualHost;
+ _store = store;
+ _eventLogger = eventLogger;
+ _logSubject = logSubject;
+ _recoveredMessages = recoveredMessages;
+ _unusedMessages = unusedMessages;
+ }
+
+ @Override
+ public boolean handle(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ Xid id = new Xid(format, globalId, branchId);
+ DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
+ DtxBranch branch = dtxRegistry.getBranch(id);
+ if(branch == null)
+ {
+ branch = new DtxBranch(id, _store, _virtualHost);
+ dtxRegistry.registerBranch(branch);
+ }
+ for(Transaction.Record record : enqueues)
+ {
+ final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final ServerMessage<?> message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ final MessageReference<?> ref = message.newReference();
+
+ branch.enqueue(queue,message);
+
+ branch.addPostTransactionAction(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ queue.enqueue(message, null);
+ ref.release();
+ }
+
+ public void onRollback()
+ {
+ ref.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(messageId)));
+ }
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource().getId().toString()));
+
+ }
+ }
+ for(Transaction.Record record : dequeues)
+ {
+ final AMQQueue<?> queue = _virtualHost.getQueue(record.getResource().getId());
+ if(queue != null)
+ {
+ final long messageId = record.getMessage().getMessageNumber();
+ final ServerMessage<?> message = _recoveredMessages.get(messageId);
+ _unusedMessages.remove(messageId);
+
+ if(message != null)
+ {
+ final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
+
+ entry.acquire();
+
+ branch.dequeue(queue, message);
+
+ branch.addPostTransactionAction(new ServerTransaction.Action()
+ {
+
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ public void onRollback()
+ {
+ entry.release();
+ }
+ });
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+ Long.toString(messageId)));
+
+ }
+
+ }
+ else
+ {
+ StringBuilder xidString = xidAsString(id);
+ _eventLogger.message(_logSubject,
+ TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+ record.getResource().getId().toString()));
+ }
+
+ }
+
+ branch.setState(DtxBranch.State.PREPARED);
+ branch.prePrepareTransaction();
+ return true;
+ }
+
+ private StringBuilder xidAsString(Xid id)
+ {
+ return new StringBuilder("(")
+ .append(id.getFormat())
+ .append(',')
+ .append(Functions.str(id.getGlobalId()))
+ .append(',')
+ .append(Functions.str(id.getBranchId()))
+ .append(')');
+ }
+
+
+ }
+
+
+ private static class DummyMessage implements EnqueueableMessage
+ {
+
+ private final long _messageId;
+
+ public DummyMessage(long messageId)
+ {
+ _messageId = messageId;
+ }
+
+ public long getMessageNumber()
+ {
+ return _messageId;
+ }
+
+ public boolean isPersistent()
+ {
+ return true;
+ }
+
+ public StoredMessage getStoredMessage()
+ {
+ return null;
+ }
+ }
+
+}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
new file mode 100644
index 0000000000..3e15ada10c
--- /dev/null
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.BrokerTestHelper;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class QueueMessageRecoveryTest extends QpidTestCase
+{
+ VirtualHostImpl<?, ?, ?> _vhost;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _vhost = BrokerTestHelper.createVirtualHost("host");
+ }
+
+ public void testSimpleRecovery() throws Exception
+ {
+ // Simple deterministic test to prove that interleaved recovery and new publishing onto a queue is correctly
+ // handled
+ final List<ServerMessage<?>> messageList = new ArrayList<>();
+ TestQueue queue = new TestQueue(Collections.singletonMap(Queue.NAME, (Object)"test"), _vhost, messageList);
+
+ queue.open();
+
+ queue.recover(createMockMessage(0));
+ queue.enqueue(createMockMessage(4), null);
+ queue.enqueue(createMockMessage(5), null);
+ queue.recover(createMockMessage(1));
+ queue.recover(createMockMessage(2));
+ queue.enqueue(createMockMessage(6), null);
+ queue.recover(createMockMessage(3));
+
+ assertEquals(4, messageList.size());
+ for(int i = 0; i < 4; i++)
+ {
+ assertEquals((long)i, messageList.get(i).getMessageNumber());
+ }
+
+ queue.completeRecovery();
+
+ queue.enqueue(createMockMessage(7), null);
+
+ assertEquals(8, messageList.size());
+
+ for(int i = 0; i < 8; i++)
+ {
+ assertEquals((long)i, messageList.get(i).getMessageNumber());
+ }
+
+ }
+
+
+ public void testMultiThreadedRecovery() throws Exception
+ {
+ // Non deterministic test with separate publishing and recovery threads
+
+ performMultiThreadedRecovery(5000);
+ }
+
+ public void testRepeatedMultiThreadedRecovery() throws Exception
+ {
+ // Repeated non deterministic test with separate publishing and recovery threads(but with fewer messages being
+ // published/recovered
+
+ for(int i = 0; i < 50; i++)
+ {
+ performMultiThreadedRecovery(10);
+ }
+ }
+
+ private void performMultiThreadedRecovery(final int size) throws Exception
+ {
+
+ final List<ServerMessage<?>> messageList = new ArrayList<>();
+ final TestQueue queue = new TestQueue(Collections.singletonMap(Queue.NAME, (Object) "test"), _vhost, messageList);
+
+ queue.open();
+
+
+ Thread recoveryThread = new Thread(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ for(int i = 0; i < size; i++)
+ {
+ queue.recover(createMockMessage(i));
+ }
+ queue.completeRecovery();
+ }
+ }, "recovery thread");
+
+ Thread publishingThread = new Thread(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ for(int i = 0; i < size; i++)
+ {
+ queue.enqueue(createMockMessage(size + i), null);
+ }
+ }
+ }, "publishing thread");
+
+ recoveryThread.start();
+ publishingThread.start();
+
+ recoveryThread.join(10000);
+ publishingThread.join(10000);
+
+ assertEquals(size*2, messageList.size());
+
+ for(int i = 0; i < (size*2); i++)
+ {
+ assertEquals((long)i, messageList.get(i).getMessageNumber());
+ }
+ }
+
+
+ private ServerMessage createMockMessage(final long i)
+ {
+ ServerMessage msg = mock(ServerMessage.class);
+ when(msg.getMessageNumber()).thenReturn(i);
+ MessageReference ref = mock(MessageReference.class);
+ when(ref.getMessage()).thenReturn(msg);
+ when(msg.newReference()).thenReturn(ref);
+ return msg;
+ }
+
+ private class TestQueue extends AbstractQueue<TestQueue>
+ {
+
+ private final List<ServerMessage<?>> _messageList;
+
+ protected TestQueue(final Map<String, Object> attributes,
+ final VirtualHostImpl virtualHost,
+ final List<ServerMessage<?>> messageList)
+ {
+ super(attributes, virtualHost);
+ _messageList = messageList;
+ }
+
+ @Override
+ QueueEntryList getEntries()
+ {
+ return null;
+ }
+
+ @Override
+ protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action)
+ {
+ synchronized(_messageList)
+ {
+ _messageList.add(message);
+ }
+ }
+ }
+}
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index 71f0bed3d9..d328e21a94 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -142,7 +142,7 @@ public class StandardQueueTest extends AbstractQueueTestBase
{
// create a queue where each even entry is considered a dequeued
AbstractQueue queue = new DequeuedQueue(getVirtualHost());
- queue.open();
+ queue.create();
// create a consumer
MockConsumer consumer = new MockConsumer();
@@ -188,7 +188,7 @@ public class StandardQueueTest extends AbstractQueueTestBase
// do nothing
}
};
- testQueue.open();
+ testQueue.create();
// put messages
List<StandardQueueEntry> entries =
diff --git a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
index 2a5e0ff763..685fea207b 100644
--- a/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MessageStoreRecovererTest.java
+++ b/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/SynchronousMessageStoreRecovererTest.java
@@ -34,9 +34,9 @@ import static org.mockito.Mockito.when;
import java.util.UUID;
import junit.framework.TestCase;
+import org.mockito.ArgumentMatcher;
import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
@@ -58,9 +58,8 @@ import org.apache.qpid.server.txn.DtxBranch;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.Xid;
-import org.mockito.ArgumentMatcher;
-public class MessageStoreRecovererTest extends TestCase
+public class SynchronousMessageStoreRecovererTest extends TestCase
{
private VirtualHostImpl _virtualHost;
@@ -99,11 +98,12 @@ public class MessageStoreRecovererTest extends TestCase
when(_virtualHost.getMessageStore()).thenReturn(store);
- MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
- recoverer.recover();
+ SynchronousMessageStoreRecoverer
+ recoverer = new SynchronousMessageStoreRecoverer();
+ recoverer.recover(_virtualHost);
ServerMessage<?> message = storedMessage.getMetaData().getType().createMessage(storedMessage);
- verify(queue, times(1)).enqueue(eq(message), (Action<? super MessageInstance>)isNull());
+ verify(queue, times(1)).recover(eq(message));
}
@SuppressWarnings("unchecked")
@@ -137,8 +137,9 @@ public class MessageStoreRecovererTest extends TestCase
when(_virtualHost.getMessageStore()).thenReturn(store);
- MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
- recoverer.recover();
+ SynchronousMessageStoreRecoverer
+ recoverer = new SynchronousMessageStoreRecoverer();
+ recoverer.recover(_virtualHost);
verify(queue, never()).enqueue(any(ServerMessage.class), any(Action.class));
verify(transaction).dequeueMessage(same(queue), argThat(new MessageNumberMatcher(messageId)));
@@ -175,8 +176,9 @@ public class MessageStoreRecovererTest extends TestCase
when(_virtualHost.getMessageStore()).thenReturn(store);
- MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
- recoverer.recover();
+ SynchronousMessageStoreRecoverer
+ recoverer = new SynchronousMessageStoreRecoverer();
+ recoverer.recover(_virtualHost);
verify(transaction).dequeueMessage(argThat(new QueueIdMatcher(queueId)), argThat(new MessageNumberMatcher(messageId)));
verify(transaction, times(1)).commitTranAsync();
@@ -205,8 +207,9 @@ public class MessageStoreRecovererTest extends TestCase
when(_virtualHost.getMessageStore()).thenReturn(store);
- MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
- recoverer.recover();
+ SynchronousMessageStoreRecoverer
+ recoverer = new SynchronousMessageStoreRecoverer();
+ recoverer.recover(_virtualHost);
verify(storedMessage, times(1)).remove();
}
@@ -262,8 +265,9 @@ public class MessageStoreRecovererTest extends TestCase
when(_virtualHost.getMessageStore()).thenReturn(store);
when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry);
- MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
- recoverer.recover();
+ SynchronousMessageStoreRecoverer
+ recoverer = new SynchronousMessageStoreRecoverer();
+ recoverer.recover(_virtualHost);
DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId));
assertNotNull("Expected dtx branch to be created", branch);
@@ -330,8 +334,9 @@ public class MessageStoreRecovererTest extends TestCase
when(_virtualHost.getMessageStore()).thenReturn(store);
when(_virtualHost.getDtxRegistry()).thenReturn(dtxRegistry);
- MessageStoreRecoverer recoverer = new MessageStoreRecoverer(_virtualHost, mock(MessageStoreLogSubject.class));
- recoverer.recover();
+ SynchronousMessageStoreRecoverer
+ recoverer = new SynchronousMessageStoreRecoverer();
+ recoverer.recover(_virtualHost);
DtxBranch branch = dtxRegistry.getBranch(new Xid(format, globalId, branchId));
assertNotNull("Expected dtx branch to be created", branch);
@@ -377,6 +382,7 @@ public class MessageStoreRecovererTest extends TestCase
when(queue.getId()).thenReturn(queueId);
when(queue.getName()).thenReturn("test-queue");
when(_virtualHost.getQueue(queueId)).thenReturn(queue);
+ when(_virtualHost.getQueue("test-queue")).thenReturn(queue);
return queue;
}