summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2008-06-13 14:56:45 +0000
committerMartin Ritchie <ritchiem@apache.org>2008-06-13 14:56:45 +0000
commita5b69f36dcf2b48f03f87b0440ae12a450b4f36c (patch)
tree21cc31f89a114297378bfd4e7ed535ecb7ddccf9
parent3632b33a92d6c7a52f1a176afc1424c514a33a72 (diff)
downloadqpid-python-a5b69f36dcf2b48f03f87b0440ae12a450b4f36c.tar.gz
QPID-1136 : Provided a fix for the leak in UnacknowledgedMessage when acking. Added a new InternalBrokerBaseCase for performing testing on the broker without using the client libraries. This allows for testing closer to AMQP.
Further investigation is required to identify why the .Net was causing the refcounting problems that required the previous change to Unacknowledged message introducing this . git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.x@667561 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java3
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java7
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java21
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java120
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java162
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java52
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java4
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java161
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java (renamed from java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java)40
10 files changed, 485 insertions, 91 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
index 5b0f3cf5eb..c1ffab531d 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.txn.TransactionalContext;
+import org.apache.qpid.server.store.StoreContext;
public interface UnacknowledgedMessageMap
{
@@ -55,7 +56,7 @@ public interface UnacknowledgedMessageMap
UnacknowledgedMessage remove(long deliveryTag);
- void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException;
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException;
Collection<UnacknowledgedMessage> cancelAllMessages();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
index 5204f13e81..0f3c690cc6 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.ack;
+import org.apache.qpid.server.store.StoreContext;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -160,7 +161,7 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
}
}
- public void drainTo(Collection<UnacknowledgedMessage> destination, long deliveryTag) throws AMQException
+ public void drainTo(long deliveryTag, StoreContext storeContext) throws AMQException
{
synchronized (_lock)
{
@@ -176,10 +177,12 @@ public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
" When deliveryTag is:" + deliveryTag + "ES:" + _map.entrySet().toString());
}
+ //Message has been ack so discard it. This will dequeue and decrement the reference.
+ unacked.getValue().discard(storeContext);
+
it.remove();
_unackedSize -= unacked.getValue().getMessage().getSize();
- destination.add(unacked.getValue());
if (unacked.getKey() == deliveryTag)
{
break;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index cac3489f4c..af877f1776 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -159,28 +159,13 @@ public class NonTransactionalContext implements TransactionalContext
throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
}
- LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
- unacknowledgedMessageMap.drainTo(acked, deliveryTag);
- for (UnacknowledgedMessage msg : acked)
- {
- if (debug)
- {
- _log.debug("Discarding message: " + msg.getMessage().getMessageId());
- }
- if(msg.getMessage().isPersistent())
- {
- beginTranIfNecessary();
- }
-
- //Message has been ack so discard it. This will dequeue and decrement the reference.
- msg.discard(_storeContext);
- }
+ unacknowledgedMessageMap.drainTo(deliveryTag, _storeContext);
}
}
else
{
UnacknowledgedMessage msg;
- msg = unacknowledgedMessageMap.remove(deliveryTag);
+ msg = unacknowledgedMessageMap.get(deliveryTag);
if (msg == null)
{
@@ -202,6 +187,8 @@ public class NonTransactionalContext implements TransactionalContext
//Message has been ack so discard it. This will dequeue and decrement the reference.
msg.discard(_storeContext);
+ unacknowledgedMessageMap.remove(deliveryTag);
+
if (debug)
{
_log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
new file mode 100644
index 0000000000..9ef4af2932
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/ack/AcknowledgeTest.java
@@ -0,0 +1,120 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.util.InternalBrokerBaseCase;
+
+import java.util.List;
+
+public class AcknowledgeTest extends InternalBrokerBaseCase
+{
+
+ public void testTransactionalSingleAck() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(1, 1, 1, false, 0);
+ }
+
+ public void testTransactionalMultiAck() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(10, 1, 5, true, 5);
+ }
+
+ public void testTransactionalAckAll() throws AMQException
+ {
+ _channel.setLocalTransactional();
+ runMessageAck(10, 1, 0, true, 0);
+ }
+
+ public void testNonTransactionalSingleAck() throws AMQException
+ {
+ runMessageAck(1, 1, 1, false, 0);
+ }
+
+ public void testNonTransactionalMultiAck() throws AMQException
+ {
+ runMessageAck(10, 1, 5, true, 5);
+ }
+
+ public void testNonTransactionalAckAll() throws AMQException
+ {
+ runMessageAck(10, 1, 0, true, 0);
+ }
+
+ protected void runMessageAck(int sendMessageCount, long firstDeliveryTag, long acknowledgeDeliveryTag, boolean acknowldegeMultiple, int remainingUnackedMessages) throws AMQException
+ {
+ //Check store is empty
+ checkStoreContents(0);
+
+ //Send required messsages to the queue
+ publishMessages(_session, _channel, sendMessageCount);
+
+ if (_channel.isTransactional())
+ {
+ _channel.commit();
+ }
+
+ //Ensure they are stored
+ checkStoreContents(sendMessageCount);
+
+ //Check that there are no unacked messages
+ assertEquals("Channel should have no unacked msgs ", 0, _channel.getUnacknowledgedMessageMap().size());
+
+ //Subscribe to the queue
+ AMQShortString subscriber = subscribe(_session, _channel, _queue);
+
+ _queue.deliverAsync();
+
+ //Wait for the messages to be delivered
+ _session.awaitDelivery(sendMessageCount);
+
+ //Check that they are all waiting to be acknoledged
+ assertEquals("Channel should have unacked msgs", sendMessageCount, _channel.getUnacknowledgedMessageMap().size());
+
+ List<InternalTestProtocolSession.DeliveryPair> messages = _session.getDelivers(_channel.getChannelId(), subscriber, sendMessageCount);
+
+ //Double check we received the right number of messages
+ assertEquals(sendMessageCount, messages.size());
+
+ //Check that the first message has the expected deliveryTag
+ assertEquals("First message does not have expected deliveryTag", firstDeliveryTag, messages.get(0).getDeliveryTag());
+
+ //Send required Acknowledgement
+ _channel.acknowledgeMessage(acknowledgeDeliveryTag, acknowldegeMultiple);
+
+ if (_channel.isTransactional())
+ {
+ _channel.commit();
+ }
+
+ // Check Remaining Acknowledgements
+ assertEquals("Channel unacked msgs count incorrect", remainingUnackedMessages, _channel.getUnacknowledgedMessageMap().size());
+
+ //Check store contents are also correct.
+ checkStoreContents(remainingUnackedMessages);
+ }
+
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
new file mode 100644
index 0000000000..03f342b5dc
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class InternalTestProtocolSession extends AMQMinaProtocolSession implements ProtocolOutputConverter
+{
+ // ChannelID(LIST) -> LinkedList<Pair>
+ final Map<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>> _channelDelivers;
+ private AtomicInteger _deliveryCount = new AtomicInteger(0);
+
+ public InternalTestProtocolSession() throws AMQException
+ {
+ super(new TestIoSession(),
+ ApplicationRegistry.getInstance().getVirtualHostRegistry(),
+ new AMQCodecFactory(true));
+
+ _channelDelivers = new HashMap<Integer, Map<AMQShortString, LinkedList<DeliveryPair>>>();
+
+ }
+
+ public ProtocolOutputConverter getProtocolOutputConverter()
+ {
+ return this;
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return (byte) 8;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return (byte) 0;
+ }
+
+ // ***
+
+ public List<DeliveryPair> getDelivers(int channelId, AMQShortString consumerTag, int count)
+ {
+ synchronized (_channelDelivers)
+ {
+ List<DeliveryPair> msgs = _channelDelivers.get(channelId).get(consumerTag).subList(0, count);
+
+ List<DeliveryPair> response = new ArrayList<DeliveryPair>(msgs);
+
+ //Remove the msgs from the receivedList.
+ msgs.clear();
+
+ return response;
+ }
+ }
+
+ // *** ProtocolOutputConverter Implementation
+ public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+ {
+ }
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+ }
+
+ public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException
+ {
+ _deliveryCount.incrementAndGet();
+
+ synchronized (_channelDelivers)
+ {
+ Map<AMQShortString, LinkedList<DeliveryPair>> consumers = _channelDelivers.get(channelId);
+
+ if (consumers == null)
+ {
+ consumers = new HashMap<AMQShortString, LinkedList<DeliveryPair>>();
+ _channelDelivers.put(channelId, consumers);
+ }
+
+ LinkedList<DeliveryPair> consumerDelivers = consumers.get(consumerTag);
+
+ if (consumerDelivers == null)
+ {
+ consumerDelivers = new LinkedList<DeliveryPair>();
+ consumers.put(consumerTag, consumerDelivers);
+ }
+
+ consumerDelivers.add(new DeliveryPair(deliveryTag, message));
+ }
+ }
+
+ public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ }
+
+ public void awaitDelivery(int msgs)
+ {
+ int start = _deliveryCount.get();
+
+ while ((start + msgs) > _deliveryCount.get())
+ {
+ try
+ {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public class DeliveryPair
+ {
+ private long _deliveryTag;
+ private AMQMessage _message;
+
+ public DeliveryPair(long deliveryTag, AMQMessage message)
+ {
+ _deliveryTag = deliveryTag;
+ _message = message;
+ }
+
+ public AMQMessage getMessage()
+ {
+ return _message;
+ }
+
+ public long getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+ }
+}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java b/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
deleted file mode 100644
index 0c0d8f471e..0000000000
--- a/java/broker/src/test/java/org/apache/qpid/server/protocol/TestMinaProtocolSession.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
-
-public class TestMinaProtocolSession extends AMQMinaProtocolSession
-{
- public TestMinaProtocolSession() throws AMQException
- {
- super(new TestIoSession(),
- ApplicationRegistry.getInstance().getVirtualHostRegistry(),
- new AMQCodecFactory(true));
- }
-
- public ProtocolOutputConverter getProtocolOutputConverter()
- {
- return ProtocolOutputConverterRegistry.getConverter(this);
- }
-
- public byte getProtocolMajorVersion()
- {
- return (byte)8;
- }
-
- public byte getProtocolMinorVersion()
- {
- return (byte)0;
- }
-}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
index 65db2a6425..4f95643b5c 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
@@ -32,7 +32,7 @@ import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
@@ -171,7 +171,7 @@ public class AMQQueueAlertTest extends TestCase
*/
public void testQueueDepthAlertWithSubscribers() throws Exception
{
- protocolSession = new TestMinaProtocolSession();
+ protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore);
protocolSession.addChannel(channel);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 9b874d63e8..409df7eeeb 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -29,7 +29,7 @@ import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -121,7 +121,7 @@ public class AMQQueueMBeanTest extends TestCase
assertTrue(_queueMBean.getActiveConsumerCount() == 0);
- TestMinaProtocolSession protocolSession = new TestMinaProtocolSession();
+ InternalTestProtocolSession protocolSession = new InternalTestProtocolSession();
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore);
protocolSession.addChannel(channel);
@@ -277,7 +277,7 @@ public class AMQQueueMBeanTest extends TestCase
_queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
- _protocolSession = new TestMinaProtocolSession();
+ _protocolSession = new InternalTestProtocolSession();
}
private void sendMessages(int messageCount, boolean persistent) throws AMQException
diff --git a/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
new file mode 100644
index 0000000000..7a4d562394
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -0,0 +1,161 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import junit.framework.TestCase;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.InternalTestProtocolSession;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+public class InternalBrokerBaseCase extends TestCase
+{
+ protected IApplicationRegistry _registry;
+ protected MessageStore _messageStore;
+ protected AMQChannel _channel;
+ protected InternalTestProtocolSession _session;
+ protected VirtualHost _virtualHost;
+ protected StoreContext _storeContext = new StoreContext();
+ protected AMQQueue _queue;
+ protected AMQShortString QUEUE_NAME;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _registry = new TestApplicationRegistry();
+ ApplicationRegistry.initialise(_registry);
+ _virtualHost = _registry.getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = _virtualHost.getMessageStore();
+
+ QUEUE_NAME = new AMQShortString("test");
+ _queue = new AMQQueue(QUEUE_NAME, false, new AMQShortString("testowner"), false, _virtualHost);
+
+ _virtualHost.getQueueRegistry().registerQueue(_queue);
+
+ Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
+
+ _queue.bind(QUEUE_NAME, null, defaultExchange);
+
+ _session = new InternalTestProtocolSession();
+
+ _channel = new AMQChannel(_session, 1, _messageStore);
+
+ _session.addChannel(_channel);
+ }
+
+ public void tearDown() throws Exception
+ {
+ ApplicationRegistry.removeAll();
+ super.tearDown();
+ }
+
+ protected void checkStoreContents(int messageCount)
+ {
+ assertEquals("Message header count incorrect in the MetaDataMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getMessageMetaDataMap().size());
+
+ //The above publish message is sufficiently small not to fit in the header so no Body is required.
+ //assertEquals("Message body count incorrect in the ContentBodyMap", messageCount, ((TestableMemoryMessageStore) _messageStore).getContentBodyMap().size());
+ }
+
+ protected AMQShortString subscribe(InternalTestProtocolSession session, AMQChannel channel, AMQQueue queue)
+ {
+ try
+ {
+ return channel.subscribeToQueue(null, queue, session, true, null, false, true);
+ }
+ catch (AMQException e)
+ {
+ fail(e.getMessage());
+ }
+ catch (ConsumerTagNotUniqueException e)
+ {
+ fail(e.getMessage());
+ }
+ //Keep the compiler happy
+ return null;
+ }
+
+ public void publishMessages(InternalTestProtocolSession session, AMQChannel channel, int messages) throws AMQException
+ {
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+ public AMQShortString getExchange()
+ {
+ return ExchangeDefaults.DEFAULT_EXCHANGE_NAME;
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("test");
+ }
+ };
+
+ for (int count = 0; count < messages; count++)
+ {
+ channel.setPublishFrame(info, session, _virtualHost.getExchangeRegistry().getExchange(info.getExchange()));
+
+ //Set the body size
+ ContentHeaderBody _headerBody = new ContentHeaderBody();
+ _headerBody.bodySize = 0;
+
+ //Set Minimum properties
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+
+ properties.setExpiration(0L);
+ properties.setTimestamp(System.currentTimeMillis());
+ //Make Message Persistent
+ properties.setDeliveryMode((byte) 2);
+
+ _headerBody.properties = properties;
+
+ channel.publishContentHeader(_headerBody, session);
+ }
+
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
index 83b4665be6..97d3b3694f 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/util/TestApplicationRegistry.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
@@ -20,28 +20,28 @@
*/
package org.apache.qpid.server.util;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.management.ManagedObjectRegistry;
+import org.apache.qpid.server.management.NoopManagedObjectRegistry;
import org.apache.qpid.server.plugins.PluginManager;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
-import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
import org.apache.qpid.server.security.access.ACLPlugin;
import org.apache.qpid.server.security.access.plugins.AllowAll;
+import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.MapConfiguration;
-import java.util.HashMap;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Properties;
public class TestApplicationRegistry extends ApplicationRegistry
@@ -63,6 +63,8 @@ public class TestApplicationRegistry extends ApplicationRegistry
private MessageStore _messageStore;
private VirtualHost _vHost;
+ private VirtualHostRegistry _virtualHostRegistry;
+
public TestApplicationRegistry()
{
super(new MapConfiguration(new HashMap()));
@@ -80,15 +82,25 @@ public class TestApplicationRegistry extends ApplicationRegistry
_authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
- IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
- _managedObjectRegistry = appRegistry.getManagedObjectRegistry();
- _vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _managedObjectRegistry = new NoopManagedObjectRegistry();
+
+ // We can't call getInstance here as we may be in the process of initialising ourselves!!!!
+ //ApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ //_managedObjectRegistry = appRegistry.getManagedObjectRegistry();
+ //_vHost = appRegistry.getVirtualHostRegistry().getVirtualHost("test");
+
+ _messageStore = new TestableMemoryMessageStore();
+
+ _virtualHostRegistry = new VirtualHostRegistry();
+
+ _vHost = new VirtualHost("test", _messageStore);
+
+ _virtualHostRegistry.registerVirtualHost(_vHost);
+
_queueRegistry = _vHost.getQueueRegistry();
_exchangeFactory = _vHost.getExchangeFactory();
_exchangeRegistry = _vHost.getExchangeRegistry();
- _messageStore = new TestableMemoryMessageStore();
-
_configuration.addProperty("heartbeat.delay", 10 * 60); // 10 minutes
}
@@ -134,7 +146,7 @@ public class TestApplicationRegistry extends ApplicationRegistry
public VirtualHostRegistry getVirtualHostRegistry()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _virtualHostRegistry;
}
public ACLPlugin getAccessManager()