diff options
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java')
-rw-r--r-- | qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java | 113 |
1 files changed, 90 insertions, 23 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java index 9146fe88ae..ab8c1e7c9c 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -22,14 +22,15 @@ package org.apache.qpid.server.store; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.queue.MessageMetaData; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.framing.abstraction.ContentChunk; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; import java.util.HashMap; import java.util.List; +import java.nio.ByteBuffer; /** * Adds some extra methods to the memory message store for testing purposes. @@ -39,6 +40,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore MemoryMessageStore _mms = null; private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>(); + private AtomicInteger _messageCount = new AtomicInteger(0); public TestableMemoryMessageStore(MemoryMessageStore mms) { @@ -47,46 +49,111 @@ public class TestableMemoryMessageStore extends MemoryMessageStore public TestableMemoryMessageStore() { - _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); - _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); + + } + + + + + @Override + public StoredMessage addMessage(StorableMessageMetaData metaData) + { + return new TestableStoredMessage(super.addMessage(metaData)); + } + + public int getMessageCount() + { + return _messageCount.get(); } - public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() + private class TestableTransaction implements Transaction { - if (_mms != null) + public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException { - return _mms._metaDataMap; + getMessages().put(messageId, (AMQQueue)queue); } - else + + public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException { - return _metaDataMap; + getMessages().remove(messageId); } - } - public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() - { - if (_mms != null) + public void commitTran() throws AMQException { - return _mms._contentBodyMap; } - else + + public StoreFuture commitTranAsync() throws AMQException + { + return new StoreFuture() + { + public boolean isComplete() + { + return true; + } + + public void waitForCompletion() + { + + } + }; + } + + public void abortTran() throws AMQException { - return _contentBodyMap; } - } - - public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException - { - getMessages().put(messageId, queue); } - public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException + + @Override + public Transaction newTransaction() { - getMessages().remove(messageId); + return new TestableTransaction(); } public HashMap<Long, AMQQueue> getMessages() { return _messages; } + + private class TestableStoredMessage implements StoredMessage + { + private final StoredMessage _storedMessage; + + public TestableStoredMessage(StoredMessage storedMessage) + { + _messageCount.incrementAndGet(); + _storedMessage = storedMessage; + } + + public StorableMessageMetaData getMetaData() + { + return _storedMessage.getMetaData(); + } + + public long getMessageNumber() + { + return _storedMessage.getMessageNumber(); + } + + public void addContent(int offsetInMessage, ByteBuffer src) + { + _storedMessage.addContent(offsetInMessage, src); + } + + public int getContent(int offsetInMessage, ByteBuffer dst) + { + return _storedMessage.getContent(offsetInMessage, dst); + } + + public StoreFuture flushToStore() + { + return _storedMessage.flushToStore(); + } + + public void remove() + { + _storedMessage.remove(); + _messageCount.decrementAndGet(); + } + } } |