summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java')
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java113
1 files changed, 90 insertions, 23 deletions
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index 9146fe88ae..ab8c1e7c9c 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -22,14 +22,15 @@ package org.apache.qpid.server.store;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashMap;
import java.util.List;
+import java.nio.ByteBuffer;
/**
* Adds some extra methods to the memory message store for testing purposes.
@@ -39,6 +40,7 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
MemoryMessageStore _mms = null;
private HashMap<Long, AMQQueue> _messages = new HashMap<Long, AMQQueue>();
+ private AtomicInteger _messageCount = new AtomicInteger(0);
public TestableMemoryMessageStore(MemoryMessageStore mms)
{
@@ -47,46 +49,111 @@ public class TestableMemoryMessageStore extends MemoryMessageStore
public TestableMemoryMessageStore()
{
- _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+
+ }
+
+
+
+
+ @Override
+ public StoredMessage addMessage(StorableMessageMetaData metaData)
+ {
+ return new TestableStoredMessage(super.addMessage(metaData));
+ }
+
+ public int getMessageCount()
+ {
+ return _messageCount.get();
}
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ private class TestableTransaction implements Transaction
{
- if (_mms != null)
+ public void enqueueMessage(TransactionLogResource queue, Long messageId) throws AMQException
{
- return _mms._metaDataMap;
+ getMessages().put(messageId, (AMQQueue)queue);
}
- else
+
+ public void dequeueMessage(TransactionLogResource queue, Long messageId) throws AMQException
{
- return _metaDataMap;
+ getMessages().remove(messageId);
}
- }
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
- {
- if (_mms != null)
+ public void commitTran() throws AMQException
{
- return _mms._contentBodyMap;
}
- else
+
+ public StoreFuture commitTranAsync() throws AMQException
+ {
+ return new StoreFuture()
+ {
+ public boolean isComplete()
+ {
+ return true;
+ }
+
+ public void waitForCompletion()
+ {
+
+ }
+ };
+ }
+
+ public void abortTran() throws AMQException
{
- return _contentBodyMap;
}
- }
-
- public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
- {
- getMessages().put(messageId, queue);
}
- public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+
+ @Override
+ public Transaction newTransaction()
{
- getMessages().remove(messageId);
+ return new TestableTransaction();
}
public HashMap<Long, AMQQueue> getMessages()
{
return _messages;
}
+
+ private class TestableStoredMessage implements StoredMessage
+ {
+ private final StoredMessage _storedMessage;
+
+ public TestableStoredMessage(StoredMessage storedMessage)
+ {
+ _messageCount.incrementAndGet();
+ _storedMessage = storedMessage;
+ }
+
+ public StorableMessageMetaData getMetaData()
+ {
+ return _storedMessage.getMetaData();
+ }
+
+ public long getMessageNumber()
+ {
+ return _storedMessage.getMessageNumber();
+ }
+
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ _storedMessage.addContent(offsetInMessage, src);
+ }
+
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return _storedMessage.getContent(offsetInMessage, dst);
+ }
+
+ public StoreFuture flushToStore()
+ {
+ return _storedMessage.flushToStore();
+ }
+
+ public void remove()
+ {
+ _storedMessage.remove();
+ _messageCount.decrementAndGet();
+ }
+ }
}