summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2009-04-03 17:54:44 +0000
committerMartin Ritchie <ritchiem@apache.org>2009-04-03 17:54:44 +0000
commit590feae56049782150cc54b46158b77b94c53cdf (patch)
treee81e4f01cabe32894eecfcf8e24ed670e2aea520
parent9c80608ac4e50745f82c4ff75f325ff87bc6ceb9 (diff)
downloadqpid-python-590feae56049782150cc54b46158b77b94c53cdf.tar.gz
QPID-1764 : Updated all tests to use the TestTransactionLog interface and split testing code into subclasses. TestableTransactionLog will now correctly wrap a TransactionLog for testing. To enable testing of the BaseTransactionLog a TestableBaseTransactionLog was needed to only return values that are actually stored in the BaseTL the TestableTransactionLog actually stores single enqueues so that they can be queried by the test.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@761741 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java24
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java50
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java44
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java29
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java16
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java26
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java11
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java178
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java129
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java121
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java10
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java5
16 files changed, 397 insertions, 261 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
index d57b81c362..f5819716cb 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -51,7 +51,7 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class MemoryMessageStore implements TransactionLog, RoutingTable
{
- private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+ protected static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
@@ -154,13 +154,7 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
public void enqueueMessage(StoreContext context, final ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
- for (AMQQueue q : queues)
- {
- if (q.isDurable())
- {
- enqueueMessage(context,q,messageId);
- }
- }
+ // Not required to do anything
}
public void enqueueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
@@ -232,25 +226,13 @@ public class MemoryMessageStore implements TransactionLog, RoutingTable
_metaDataMap.put(messageId, messageMetaData);
}
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
- {
- checkNotClosed();
- return _metaDataMap.get(messageId);
- }
-
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
- {
- checkNotClosed();
- List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
- return bodyList.get(index);
- }
public boolean isPersistent()
{
return false;
}
- private void checkNotClosed() throws MessageStoreClosedException
+ protected void checkNotClosed() throws MessageStoreClosedException
{
if (_closed.get())
{
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
index 52b8b0ad19..e034143596 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
@@ -103,7 +103,7 @@ public class TxAckTest extends TestCase
private final List<Long> _unacked;
private StoreContext _storeContext = new StoreContext();
private AMQQueue _queue;
- private TransactionLog _transactionLog = new TestableMemoryMessageStore();
+ private TransactionLog _transactionLog = new TestableMemoryMessageStore().configure();
private static final int MESSAGE_SIZE=100;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 89dbc4f959..6ae2324e5f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -32,6 +32,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactory;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -43,14 +44,13 @@ import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
import java.util.ArrayList;
import java.util.LinkedList;
+import java.util.List;
/**
* Test class to test AMQQueueMBean attribtues and operations
@@ -70,7 +70,7 @@ public class AMQQueueMBeanTest extends TestCase
public void testMessageCountTransient() throws Exception
{
int messageCount = 10;
- sendMessages(messageCount, false);
+ List<AMQMessage> messages = sendMessages(messageCount, false);
assertTrue(_queueMBean.getMessageCount() == messageCount);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
long queueDepth = (messageCount * MESSAGE_SIZE);
@@ -85,13 +85,13 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
//Ensure that the data has been removed from the Store
- verifyBrokerState();
+ verifyBrokerState(messages);
}
public void testMessageCountPersistent() throws Exception
{
int messageCount = 10;
- sendMessages(messageCount, true);
+ List<AMQMessage> messages = sendMessages(messageCount, true);
assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
long queueDepth = (messageCount * MESSAGE_SIZE);
@@ -106,20 +106,38 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
//Ensure that the data has been removed from the Store
- verifyBrokerState();
+ verifyBrokerState(messages);
}
// todo: collect to a general testing class -duplicated from Systest/MessageReturntest
- private void verifyBrokerState()
+ private void verifyBrokerState(List<AMQMessage> messages)
{
- TestableMemoryMessageStore store = new TestableMemoryMessageStore(_virtualHost.getTransactionLog());
+ TestableTransactionLog store = new TestableTransactionLog(_virtualHost.getTransactionLog());
- // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
- assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
- assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
- assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
- assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+ // We can only now check messageData and ConentBodyChunks by MessageID.
+ for (AMQMessage message : messages)
+ {
+ // Check we have no message metadata for the messages we sent
+ try
+ {
+ assertNull(store.getMessageMetaData(new StoreContext(), message.getMessageId()));
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ try
+ {
+ assertNull(store.getContentBodyChunk(new StoreContext(), message.getMessageId(),0));
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+
+ }
}
public void testConsumerCount() throws AMQException
@@ -297,8 +315,9 @@ public class AMQQueueMBeanTest extends TestCase
ApplicationRegistry.remove(1);
}
- private void sendMessages(int messageCount, boolean persistent) throws AMQException
+ private List<AMQMessage> sendMessages(int messageCount, boolean persistent) throws AMQException
{
+ List<AMQMessage> messages = new LinkedList<AMQMessage>();
for (int i = 0; i < messageCount; i++)
{
IncomingMessage currentMessage = message(false, persistent);
@@ -316,9 +335,10 @@ public class AMQQueueMBeanTest extends TestCase
.convertToContentChunk(
new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
MESSAGE_SIZE)));
- currentMessage.deliverToQueues();
+ messages.add(currentMessage.deliverToQueues());
}
+ return messages;
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
index 3280516b56..58073e52b6 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
@@ -30,6 +30,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
@@ -38,8 +39,6 @@ import org.apache.qpid.server.flow.Pre0_10CreditManager;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
@@ -47,6 +46,7 @@ import org.apache.qpid.server.util.NullApplicationRegistry;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Set;
+import java.util.List;
/**
* Tests that acknowledgements are handled correctly.
@@ -59,7 +59,7 @@ public class AckTest extends TestCase
private MockProtocolSession _protocolSession;
- private TestableMemoryMessageStore _messageStore;
+ private TestableTransactionLog _transactionLog;
private StoreContext _storeContext = new StoreContext();
@@ -75,9 +75,9 @@ public class AckTest extends TestCase
ApplicationRegistry.initialise(new NullApplicationRegistry(), 1);
VirtualHost vhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
- _messageStore = new TestableMemoryMessageStore(vhost.getTransactionLog());
- _protocolSession = new MockProtocolSession(_messageStore);
- _channel = new AMQChannel(_protocolSession,5, _messageStore /*dont need exchange registry*/);
+ _transactionLog = new TestableTransactionLog(vhost.getTransactionLog());
+ _protocolSession = new MockProtocolSession(_transactionLog);
+ _channel = new AMQChannel(_protocolSession,5, _transactionLog /*dont need exchange registry*/);
_protocolSession.addChannel(_channel);
@@ -95,13 +95,13 @@ public class AckTest extends TestCase
publishMessages(count, false);
}
- private void publishMessages(int count, boolean persistent) throws AMQException
+ private List<AMQMessage> publishMessages(int count, boolean persistent) throws AMQException
{
- TransactionalContext txnContext = new NonTransactionalContext(_messageStore, _storeContext, null,
+ TransactionalContext txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null,
new LinkedList<RequiredDeliveryException>()
);
_queue.registerSubscription(_subscription,false);
- MessageFactory factory = MessageFactory.getInstance();
+ List<AMQMessage> sentMessages = new LinkedList<AMQMessage>();
for (int i = 1; i <= count; i++)
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
@@ -109,7 +109,7 @@ public class AckTest extends TestCase
MessagePublishInfo publishBody = new MessagePublishInfoImpl(new AMQShortString("someExchange"), false,
false, new AMQShortString("rk"));
- IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _messageStore);
+ IncomingMessage msg = new IncomingMessage(publishBody, txnContext,_protocolSession, _transactionLog);
//IncomingMessage msg2 = null;
if (persistent)
{
@@ -130,14 +130,16 @@ public class AckTest extends TestCase
ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
qs.add(_queue);
msg.enqueue(qs);
- msg.routingComplete(_messageStore);
+ msg.routingComplete(_transactionLog);
if(msg.allContentReceived())
{
- msg.deliverToQueues();
+ sentMessages.add(msg.deliverToQueues());
}
// we manually send the message to the subscription
//_subscription.send(new QueueEntry(_queue,msg), _queue);
}
+
+ return sentMessages;
}
/**
@@ -148,11 +150,16 @@ public class AckTest extends TestCase
{
_subscription = SubscriptionFactoryImpl.INSTANCE.createSubscription(5, _protocolSession, DEFAULT_CONSUMER_TAG, true, null, false, new LimitlessCreditManager());
final int msgCount = 10;
- publishMessages(msgCount, true);
+ List<AMQMessage> sentMessages = publishMessages(msgCount, true);
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
+ for (AMQMessage message : sentMessages)
+ {
+ List<AMQQueue> enqueuedQueues = _transactionLog.getMessageReferenceMap(message.getMessageId());
+ assertNotNull("Expected message to be enqueued",enqueuedQueues);
+ assertEquals("Message is not enqueued on expected number of queues.",1, enqueuedQueues.size());
+ }
Set<Long> deliveryTagSet = map.getDeliveryTags();
int i = 1;
@@ -165,7 +172,6 @@ public class AckTest extends TestCase
}
assertTrue(map.size() == msgCount);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == msgCount);
}
/**
@@ -180,8 +186,8 @@ public class AckTest extends TestCase
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue(_messageStore.getMessageMetaDataMap().size() == 0);
- assertTrue(_messageStore.getContentBodyMap().size() == 0);
+ assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize());
+// assertTrue(_messageStore.getContentBodyMap().size() == 0);to be
}
@@ -197,8 +203,8 @@ public class AckTest extends TestCase
UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap();
assertTrue(map.size() == 0);
- assertTrue("Size:" + _messageStore.getMessageMetaDataMap().size(), _messageStore.getMessageMetaDataMap().size() == 0);
- assertTrue(_messageStore.getContentBodyMap().size() == 0);
+ assertEquals("There was more MetaData objects than expected", 0, _transactionLog.getMessageMetaDataSize());
+// assertTrue(_messageStore.getContentBodyMap().size() == 0);
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
index d007913a4f..4b4c404229 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/PersistentMessageTest.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -31,10 +32,10 @@ import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.commons.configuration.PropertiesConfiguration;
import java.util.ArrayList;
import java.util.LinkedList;
@@ -42,7 +43,7 @@ import java.util.List;
public class PersistentMessageTest extends TransientMessageTest
{
- private TestableMemoryMessageStore _messageStore;
+ private TestableTransactionLog _transactionLog;
protected SimpleAMQQueue _queue;
protected AMQShortString _q1name = new AMQShortString("q1name");
@@ -54,22 +55,22 @@ public class PersistentMessageTest extends TransientMessageTest
public void setUp() throws Exception
{
- _messageStore = new TestableMemoryMessageStore();
+ _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure());
_storeContext = new StoreContext();
VirtualHost vhost = new VirtualHost(new VirtualHostConfiguration(PersistentMessageTest.class.getName(),
new PropertiesConfiguration()),
- _messageStore);
+ _transactionLog);
_queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, vhost, null);
// Create IncomingMessage and nondurable queue
- _messageDeliveryContext = new NonTransactionalContext(_messageStore, new StoreContext(), null, _returnMessages);
+ _messageDeliveryContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, _returnMessages);
}
@Override
protected AMQMessage newMessage()
{
- return MessageFactory.getInstance().createMessage(_messageStore, true);
+ return MessageFactory.getInstance().createMessage(_transactionLog, true);
}
@Override
@@ -82,7 +83,7 @@ public class PersistentMessageTest extends TransientMessageTest
/**
* Tests the returning of a single persistent message to a queue. An immediate message is sent to the queue and
* checked that it bounced. The transactionlog and returnMessasges are then checked to ensure they have the right
- * contents. TransactionLog = Empty, returnMessages 1 item.
+ * contents. TransactionLog = Empty, returnMessages 1 item.
*
* @throws Exception
*/
@@ -98,17 +99,16 @@ public class PersistentMessageTest extends TransientMessageTest
// equivalent to amqChannel.routeMessage()
msg.enqueue(qs);
- msg.routingComplete(_messageStore);
+ msg.routingComplete(_transactionLog);
// equivalent to amqChannel.deliverCurrentMessageIfComplete
msg.deliverToQueues();
// Check that data has been stored to disk
long messageId = msg.getMessageId();
- checkMessageMetaDataExists(messageId);
// Check that it was not enqueued
- List<AMQQueue> queueList = _messageStore.getMessageReferenceMap(messageId);
+ List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
assertTrue("TransactionLog contains a queue reference for this messageID:" + messageId, queueList == null || queueList.isEmpty());
checkMessageMetaDataRemoved(messageId);
@@ -118,7 +118,7 @@ public class PersistentMessageTest extends TransientMessageTest
protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
{
IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
- new MockProtocolSession(_messageStore), _messageStore);
+ new MockProtocolSession(_transactionLog), _transactionLog);
// equivalent to amqChannel.publishContenHeader
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
@@ -138,7 +138,8 @@ public class PersistentMessageTest extends TransientMessageTest
{
try
{
- _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+ assertNotNull("Message MetaData does not exist for message:" + messageId,
+ _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
}
catch (AMQException amqe)
{
@@ -151,8 +152,8 @@ public class PersistentMessageTest extends TransientMessageTest
try
{
assertNull("Message MetaData still exists for message:" + messageId,
- _messageStore.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
- List ids = _messageStore.getMessageReferenceMap(messageId);
+ _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId));
+ List ids = _transactionLog.getMessageReferenceMap(messageId);
assertTrue("Message still has values in the reference map:" + messageId, ids == null || ids.isEmpty());
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index d5e873ebc0..4e7bad06ae 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -35,12 +35,13 @@ import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.TestTransactionLog;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
import java.util.ArrayList;
import java.util.List;
@@ -50,7 +51,7 @@ public class SimpleAMQQueueTest extends TestCase
protected SimpleAMQQueue _queue;
protected VirtualHost _virtualHost;
- protected TestableMemoryMessageStore _transactionLog = new TestableMemoryMessageStore();
+ protected TestableTransactionLog _transactionLog;
protected AMQShortString _qname = new AMQShortString("qname");
protected AMQShortString _owner = new AMQShortString("owner");
protected AMQShortString _routingKey = new AMQShortString("routing key");
@@ -68,6 +69,7 @@ public class SimpleAMQQueueTest extends TestCase
//Create Application Registry for test
ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
+ _transactionLog = new TestableTransactionLog(new MemoryMessageStore().configure());
PropertiesConfiguration env = new PropertiesConfiguration();
_virtualHost = new VirtualHost(new VirtualHostConfiguration(getClass().getSimpleName(), env), _transactionLog);
applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
@@ -340,7 +342,9 @@ public class SimpleAMQQueueTest extends TestCase
// Check that it is enqueued
List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
- assertNotNull(data);
+ assertNotNull("Message has no enqueued information.", data);
+ assertTrue("Message is not enqueued on correct queue.", data.contains(_queue));
+ assertEquals("Message not enqueued on the right queues.", 1, data.size());
// Dequeue message
ContentHeaderBody header = new ContentHeaderBody();
@@ -355,7 +359,7 @@ public class SimpleAMQQueueTest extends TestCase
// Check that it is dequeued
data = _transactionLog.getMessageReferenceMap(messageId);
- assertTrue(data == null || data.isEmpty());
+ assertNull("Message still has enqueue data.", data);
}
public void testMessagesFlowToDisk() throws AMQException, InterruptedException
@@ -509,7 +513,9 @@ public class SimpleAMQQueueTest extends TestCase
//Check message was correctly enqueued
List<AMQQueue> data = _transactionLog.getMessageReferenceMap(messageId);
- assertNotNull(data);
+ assertNotNull("Message has no enqueued information.", data);
+ assertTrue("Message is not enqueued on correct queue.", data.contains(_queue));
+ assertEquals("Message not enqueued on the right queues.", 1, data.size());
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
index abcd9855d9..3a4746eb2c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
@@ -39,7 +39,7 @@ import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.MockProtocolSession;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.registry.ApplicationRegistry;
public class ACLManagerTest extends TestCase
@@ -66,7 +66,7 @@ public class ACLManagerTest extends TestCase
_pluginManager = new MockPluginManager("");
_authzManager = new ACLManager(_conf, _pluginManager);
- _session = new MockProtocolSession(new TestableMemoryMessageStore());
+ _session = new MockProtocolSession(new MemoryMessageStore().configure());
}
public void tearDown() throws Exception
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
index ff1fb8c97d..251f6d45f7 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/plugins/network/FirewallPluginTest.java
@@ -37,7 +37,7 @@ import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.protocol.TestIoSession;
import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.store.TestableMemoryMessageStore;
+
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
@@ -81,14 +81,12 @@ public class FirewallPluginTest extends TestCase
}
}
- private TestableMemoryMessageStore _store;
private VirtualHost _virtualHost;
private AMQMinaProtocolSession _session;
@Override
public void setUp() throws Exception
{
- _store = new TestableMemoryMessageStore();
PropertiesConfiguration env = new PropertiesConfiguration();
_virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
TestIoSession iosession = new TestIoSession();
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
index 5a4c435e59..4c03a57cc8 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
@@ -26,28 +26,24 @@ import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.MessageFactory;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
-/**
- * Tests that reference counting works correctly with AMQMessage and the message store
- */
+/** Tests that reference counting works correctly with AMQMessage and the message store */
public class TestReferenceCounting extends TestCase
{
- private TestableMemoryMessageStore _store;
+ private TestableTransactionLog _store;
private StoreContext _storeContext = new StoreContext();
-
protected void setUp() throws Exception
{
super.setUp();
- _store = new TestableMemoryMessageStore();
+ _store = new TestableTransactionLog(new TestableMemoryMessageStore().configure());
}
- /**
- * Check that when the reference count is decremented the message removes itself from the store
- */
+ /** Check that when the reference count is decremented the message removes itself from the store */
public void testMessageGetsRemoved() throws AMQException
{
ContentHeaderBody chb = createPersistentContentHeader();
@@ -57,14 +53,15 @@ public class TestReferenceCounting extends TestCase
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
- assertEquals(1, _store.getMessageMetaDataMap().size());
+ assertNotNull("Message Metadata did not exist for new message",
+ _store.getMessageMetaData(new StoreContext(), message.getMessageId()));
}
private ContentHeaderBody createPersistentContentHeader()
{
ContentHeaderBody chb = new ContentHeaderBody();
BasicContentHeaderProperties bchp = new BasicContentHeaderProperties();
- bchp.setDeliveryMode((byte)2);
+ bchp.setDeliveryMode((byte) 2);
chb.properties = bchp;
return chb;
}
@@ -77,8 +74,9 @@ public class TestReferenceCounting extends TestCase
final ContentHeaderBody chb = createPersistentContentHeader();
AMQMessage message = (MessageFactory.getInstance()).createMessage(_store, true);
message.setPublishAndContentHeaderBody(_storeContext, info, chb);
-
- assertEquals(1, _store.getMessageMetaDataMap().size());
+
+ assertNotNull("Message Metadata did not exist for new message",
+ _store.getMessageMetaData(new StoreContext(), message.getMessageId()));
}
public static junit.framework.Test suite()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
index 38d139e94c..5d0fdfb727 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestTransactionLog.java
@@ -21,12 +21,23 @@
package org.apache.qpid.server.store;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.BaseTransactionLog;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.AMQException;
import java.util.Map;
import java.util.List;
public interface TestTransactionLog extends TransactionLog
{
+ public void setBaseTransactionLog(BaseTransactionLog base);
+
public List<AMQQueue> getMessageReferenceMap(Long messageID);
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException;
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException;
+ public long getMessageMetaDataSize();
+ public TransactionLog getDelegate();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
index fa5cdc1aa5..2099181a76 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -20,192 +20,80 @@
*/
package org.apache.qpid.server.store;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.transactionlog.BaseTransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
/** Adds some extra methods to the memory message store for testing purposes. */
-public class TestableMemoryMessageStore implements TestTransactionLog, TransactionLog, RoutingTable
+public class TestableMemoryMessageStore extends MemoryMessageStore implements TestTransactionLog
{
- private TransactionLog _transactionLog;
- private RoutingTable _routingTable;
- private MemoryMessageStore _mms;
+ private TestableTransactionLog _base;
- public TestableMemoryMessageStore(TransactionLog log)
+ public void setBaseTransactionLog(BaseTransactionLog base)
{
- _transactionLog = log;
- if (log instanceof BaseTransactionLog)
+ if (!(base instanceof TestableTransactionLog))
{
- TransactionLog delegate = ((BaseTransactionLog) log).getDelegate();
- if (delegate instanceof RoutingTable)
- {
- _routingTable = (RoutingTable) delegate;
- }
- else
- {
- throw new RuntimeException("Specified BaseTransactionLog does not delegate to a RoutingTable:" + log);
- }
-
- if (delegate instanceof MemoryMessageStore)
- {
- _mms = (MemoryMessageStore) delegate;
- }
-
- }
- else
- {
- throw new RuntimeException("Specified BaseTransactionLog is not testable:" + log);
+ throw new RuntimeException("base must be a TestableTransactionLog for correct operation in a TestMemoryMessageStore");
}
+ _base = (TestableTransactionLog) base;
}
- public TestableMemoryMessageStore(MemoryMessageStore mms)
- {
- _routingTable = mms;
- _transactionLog = mms.configure();
- }
-
- public TestableMemoryMessageStore()
- {
- _mms = new MemoryMessageStore();
- _transactionLog = _mms.configure();
- _routingTable = _mms;
- }
-
- public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
- {
- return ((MemoryMessageStore) _routingTable)._metaDataMap;
- }
-
- public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
- {
- return ((MemoryMessageStore) _routingTable)._contentBodyMap;
- }
-
- public List<AMQQueue> getMessageReferenceMap(Long messageId)
- {
-// return _mms._messageEnqueueMap.get(messageId);
-// ((BaseTransactionLog)_transactionLog).
- return new ArrayList<AMQQueue>();
- }
-
- public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ @Override
+ public TransactionLog configure()
{
- _transactionLog = (TransactionLog) _transactionLog.configure(virtualHost, base, config);
- return _transactionLog;
- }
+ BaseTransactionLog base = (BaseTransactionLog) super.configure();
- public void close() throws Exception
- {
- _transactionLog.close();
- _routingTable.close();
- }
+ _base = new TestableTransactionLog(base.getDelegate());
- public void createExchange(Exchange exchange) throws AMQException
- {
- _routingTable.createExchange(exchange);
+ return _base;
}
- public void removeExchange(Exchange exchange) throws AMQException
+ @Override
+ public TransactionLog configure(String base, VirtualHostConfiguration config)
{
- _routingTable.removeExchange(exchange);
- }
-
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
- {
- _routingTable.bindQueue(exchange, routingKey, queue, args);
- }
-
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
- {
- _routingTable.unbindQueue(exchange, routingKey, queue, args);
- }
-
- public void createQueue(AMQQueue queue) throws AMQException
- {
- _routingTable.createQueue(queue);
- }
-
- public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
- {
- _routingTable.createQueue(queue, arguments);
- }
-
- public void removeQueue(AMQQueue queue) throws AMQException
- {
- _routingTable.removeQueue(queue);
- }
-
- public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
- {
- _transactionLog.enqueueMessage(context, queues, messageId);
- }
-
- public void dequeueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
- {
- _transactionLog.dequeueMessage(context, queue, messageId);
- }
-
- public void removeMessage(StoreContext context, Long messageId) throws AMQException
- {
- _transactionLog.removeMessage(context, messageId);
- }
-
- public void beginTran(StoreContext context) throws AMQException
- {
- _transactionLog.beginTran(context);
- }
+ //Only initialise when called with current 'store' configs i.e. don't reinit when used as a 'RoutingTable'
+ if (base.equals("store"))
+ {
+ super.configure();
- public void commitTran(StoreContext context) throws AMQException
- {
- _transactionLog.commitTran(context);
- }
+ _base = new TestableTransactionLog(this);
- public void abortTran(StoreContext context) throws AMQException
- {
- _transactionLog.abortTran(context);
- }
+ return _base;
+ }
- public boolean inTran(StoreContext context)
- {
- return _transactionLog.inTran(context);
+ return super.configure();
}
- public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
+ public List<AMQQueue> getMessageReferenceMap(Long messageId)
{
- _transactionLog.storeContentBodyChunk(context, messageId, index, contentBody, lastContentBody);
+ return _base.getMessageReferenceMap(messageId);
}
- public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId)
{
- _transactionLog.storeMessageMetaData(context, messageId, messageMetaData);
+ return _metaDataMap.get(messageId);
}
- public boolean isPersistent()
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index)
{
- return _transactionLog.isPersistent();
+ List<ContentChunk> bodyList = _contentBodyMap.get(messageId);
+ return bodyList.get(index);
}
- public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ public long getMessageMetaDataSize()
{
- return _mms.getMessageMetaData(context, messageId);
+ return _metaDataMap.size();
}
- public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ public TransactionLog getDelegate()
{
- return _mms.getContentBodyChunk(context, messageId, index);
+ return _base;
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
index 0a2a1c2327..a0c38ff0ad 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/BaseTransactionLogTest.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.MockContentChunk;
import org.apache.qpid.server.queue.MockPersistentAMQMessage;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
@@ -47,14 +48,14 @@ public class BaseTransactionLogTest extends TestCase implements TransactionLog
final private Map<Long, ArrayList<ContentChunk>> _storeChunks = new HashMap<Long, ArrayList<ContentChunk>>();
final private Map<Long, MessageMetaData> _storeMetaData = new HashMap<Long, MessageMetaData>();
- TestableTransactionLog _transactionLog;
+ TestTransactionLog _transactionLog;
private ArrayList<AMQQueue> _queues;
private MockPersistentAMQMessage _message;
public void setUp() throws Exception
{
super.setUp();
- _transactionLog = new TestableTransactionLog(this);
+ _transactionLog = new TestableBaseTransactionLog(this);
}
public void testSingleEnqueueNoTransactional() throws AMQException
diff --git a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java
new file mode 100644
index 0000000000..92bc44da0b
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableBaseTransactionLog.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.transactionlog;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import java.util.List;
+
+public class TestableBaseTransactionLog extends BaseTransactionLog implements TestTransactionLog
+{
+
+ public TestableBaseTransactionLog()
+ {
+ super(null);
+ }
+
+ public TestableBaseTransactionLog(TransactionLog delegate)
+ {
+ super(delegate);
+ if (delegate instanceof BaseTransactionLog)
+ {
+ _delegate = ((BaseTransactionLog) delegate).getDelegate();
+ }
+
+ }
+
+ @Override
+ public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
+ {
+ if (_delegate != null)
+ {
+ TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config);
+
+ // Unwrap any BaseTransactionLog
+ if (configuredLog instanceof BaseTransactionLog)
+ {
+ _delegate = ((BaseTransactionLog) configuredLog).getDelegate();
+ }
+ }
+ else
+ {
+ String delegateClass = config.getStoreConfiguration().getString("delegate");
+ Class clazz = Class.forName(delegateClass);
+ Object o = clazz.newInstance();
+
+ if (!(o instanceof TransactionLog))
+ {
+ throw new ClassCastException("TransactionLog class must implement " + TransactionLog.class + ". Class " + clazz +
+ " does not.");
+ }
+ _delegate = (TransactionLog) o;
+
+ // If a TransactionLog uses the BaseTransactionLog then it will return this object.
+ _delegate.configure(virtualHost, base, config);
+ }
+ return this;
+ }
+
+ public void setBaseTransactionLog(BaseTransactionLog base)
+ {
+ throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs");
+ }
+
+ public List<AMQQueue> getMessageReferenceMap(Long messageID)
+ {
+ return _idToQueues.get(messageID);
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ {
+ if (_delegate instanceof TestTransactionLog)
+ {
+ return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ {
+ if (_delegate instanceof TestTransactionLog)
+ {
+ return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public long getMessageMetaDataSize()
+ {
+ if (_delegate instanceof TestTransactionLog)
+ {
+ return ((TestTransactionLog) _delegate).getMessageMetaDataSize();
+ }
+ else
+ {
+ return 0;
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
index b0c47052b2..38e17c3a07 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/transactionlog/TestableTransactionLog.java
@@ -20,52 +20,95 @@
*/
package org.apache.qpid.server.transactionlog;
-import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.store.TestTransactionLog;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.LinkedList;
+import java.util.Map;
public class TestableTransactionLog extends BaseTransactionLog implements TestTransactionLog
{
-
- List<Long> _singleEnqueues = new LinkedList<Long>();
+ protected Map<Long, List<AMQQueue>> _singleEnqueuedIDstoQueue = new HashMap<Long, List<AMQQueue>>();
public TestableTransactionLog()
{
super(null);
}
- public TestableTransactionLog(BaseTransactionLog delegate)
+ public TestableTransactionLog(TransactionLog delegate)
{
- super(delegate.getDelegate());
+ super(delegate);
+ if (delegate instanceof BaseTransactionLog)
+ {
+ _delegate = ((BaseTransactionLog) delegate).getDelegate();
+ }
+
}
- public TestableTransactionLog(TransactionLog delegate)
+ /**
+ * Override the BaseTranasactionLog to record the single enqueues of a message so we can perform references counting
+ *
+ * @param context The transactional context for the operation.
+ * @param queues
+ * @param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException
+ *
+ * @throws AMQException
+ */
+ @Override
+ public void enqueueMessage(StoreContext context, ArrayList<AMQQueue> queues, Long messageId) throws AMQException
{
- super(delegate);
+ if (queues.size() == 1)
+ {
+ _singleEnqueuedIDstoQueue.put(messageId, queues);
+ }
+
+ super.enqueueMessage(context, queues, messageId);
}
+ /**
+ * Override the BaseTranasactionLog to record the single enqueues of a message so we can perform references counting
+ *
+ * @param context The transactional context for the operation.
+ * @param queue
+ * @param messageId The message to enqueue. @throws AMQException If the operation fails for any reason. @throws org.apache.qpid.AMQException
+ *
+ * @throws AMQException
+ */
+ @Override
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
+ {
+ if (_singleEnqueuedIDstoQueue.containsKey(messageId))
+ {
+ _singleEnqueuedIDstoQueue.remove(messageId);
+ }
+
+ super.dequeueMessage(context, queue, messageId);
+ }
@Override
public Object configure(VirtualHost virtualHost, String base, VirtualHostConfiguration config) throws Exception
{
if (_delegate != null)
{
- TransactionLog configuredLog = (TransactionLog)_delegate.configure(virtualHost, base, config);
+ TransactionLog configuredLog = (TransactionLog) _delegate.configure(virtualHost, base, config);
// Unwrap any BaseTransactionLog
if (configuredLog instanceof BaseTransactionLog)
{
- _delegate = ((BaseTransactionLog)configuredLog).getDelegate();
+ _delegate = ((BaseTransactionLog) configuredLog).getDelegate();
}
}
else
{
- String delegateClass = config.getStoreConfiguration().getString("delegate");
+ String delegateClass = config.getStoreConfiguration().getString("delegate");
Class clazz = Class.forName(delegateClass);
Object o = clazz.newInstance();
@@ -77,13 +120,61 @@ public class TestableTransactionLog extends BaseTransactionLog implements TestTr
_delegate = (TransactionLog) o;
// If a TransactionLog uses the BaseTransactionLog then it will return this object.
- _delegate.configure(virtualHost, base, config);
+ _delegate.configure(virtualHost, base, config);
}
return this;
}
+ public void setBaseTransactionLog(BaseTransactionLog base)
+ {
+ throw new RuntimeException("TestableTransactionLog is unable to swap BaseTransactionLogs");
+ }
+
public List<AMQQueue> getMessageReferenceMap(Long messageID)
{
- return _idToQueues.get(messageID);
+ List<AMQQueue> result = _idToQueues.get(messageID);
+
+ if (result == null)
+ {
+ result = _singleEnqueuedIDstoQueue.get(messageID);
+ }
+
+ return result;
+ }
+
+ public MessageMetaData getMessageMetaData(StoreContext context, Long messageId) throws AMQException
+ {
+ if (_delegate instanceof TestTransactionLog)
+ {
+ return ((TestTransactionLog) _delegate).getMessageMetaData(context, messageId);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public ContentChunk getContentBodyChunk(StoreContext context, Long messageId, int index) throws AMQException
+ {
+ if (_delegate instanceof TestTransactionLog)
+ {
+ return ((TestTransactionLog) _delegate).getContentBodyChunk(context, messageId, index);
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public long getMessageMetaDataSize()
+ {
+ if (_delegate instanceof TestTransactionLog)
+ {
+ return ((TestTransactionLog) _delegate).getMessageMetaDataSize();
+ }
+ else
+ {
+ return 0;
+ }
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index cdc7eabf04..dbd05b9598 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -34,8 +34,10 @@ import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.TestTransactionLog;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -62,7 +64,11 @@ public class InternalBrokerBaseCase extends TestCase
{
super.setUp();
PropertiesConfiguration configuration = new PropertiesConfiguration();
- configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableMemoryMessageStore.class.getName());
+ // This configuration is not used as TestApplicationRegistry just creates a single vhost 'test' with
+ // TransactionLog TestableTransactionLog(TestMemoryMessageStore)
+ configuration.setProperty("virtualhosts.virtualhost.test.store.class", TestableTransactionLog.class.getName());
+ configuration.setProperty("virtualhosts.virtualhost.test.store.delegate", TestableMemoryMessageStore.class.getName());
+
_registry = new TestApplicationRegistry(new ServerConfiguration(configuration));
ApplicationRegistry.initialise(_registry);
_virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
@@ -96,7 +102,7 @@ public class InternalBrokerBaseCase extends TestCase
protected void checkStoreContents(int messageCount)
{
- assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _transactionLog).getMessageMetaDataMap().size());
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestTransactionLog) _transactionLog).getMessageMetaDataSize());
//The above publish message is sufficiently small not to fit in the header so no Body is required.
//assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 22bd3b5aab..8c2508b8f4 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -21,7 +21,6 @@
package org.apache.qpid.server.util;
import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
@@ -38,9 +37,9 @@ import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.transactionlog.TestableTransactionLog;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Properties;
import java.util.Arrays;
@@ -83,7 +82,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
_managedObjectRegistry = new NoopManagedObjectRegistry();
- _transactionLog = new TestableMemoryMessageStore();
+ _transactionLog = new TestableTransactionLog(new TestableMemoryMessageStore().configure());
_virtualHostRegistry = new VirtualHostRegistry();