summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java128
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java73
-rw-r--r--java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java60
6 files changed, 222 insertions, 57 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index faa5d4a5c5..4a336ef71c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -581,7 +581,7 @@ public class AMQQueue implements Managable, Comparable
/** Removes the AMQMessage from the top of the queue. */
public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
- _deliveryMgr.removeAMessageFromTop(storeContext);
+ _deliveryMgr.removeAMessageFromTop(storeContext, this);
}
/** removes all the messages from the queue. */
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index ea077d659f..d9629a20b5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -333,7 +333,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
if (!acks)
{
- msg.decrementReference(channel.getStoreContext());
+ msg.decrementReference(channel.getStoreContext());
}
}
finally
@@ -407,11 +407,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
*
* @throws AMQException
*/
- public void removeAMessageFromTop(StoreContext storeContext) throws AMQException
+ public void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException
{
_lock.lock();
AMQMessage message = _messages.poll();
+
+ message.dequeue(storeContext, queue);
+
+ message.decrementReference(storeContext);
+
if (message != null)
{
_totalMessageSize.addAndGet(-message.getSize());
@@ -434,6 +439,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_messages.poll();
_queue.dequeue(storeContext, msg);
+
+ msg.decrementReference(_reapingStoreContext);
+
msg = getNextMessage();
count++;
}
@@ -479,6 +487,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
message.dequeue(_reapingStoreContext, _queue);
+ message.decrementReference(_reapingStoreContext);
+
if (_log.isInfoEnabled())
{
_log.info(debugIdentity() + " Doing clean up of the main _message queue.");
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
index 10ba48552c..153106d919 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
@@ -72,7 +72,7 @@ interface DeliveryManager
*/
void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException;
- void removeAMessageFromTop(StoreContext storeContext) throws AMQException;
+ void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException;
long clearAllMessages(StoreContext storeContext) throws AMQException;
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
index 4c02f266ed..3caf6ad73d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
@@ -24,10 +24,13 @@ import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.protocol.TestMinaProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -36,6 +39,8 @@ import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.mina.common.ByteBuffer;
import javax.management.JMException;
import java.util.LinkedList;
@@ -49,18 +54,16 @@ public class AMQQueueMBeanTest extends TestCase
private static long MESSAGE_SIZE = 1000;
private AMQQueue _queue;
private AMQQueueMBean _queueMBean;
- private MessageStore _messageStore = new MemoryMessageStore();
+ private MessageStore _messageStore;
private StoreContext _storeContext = new StoreContext();
- private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
- null,
- new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ private TransactionalContext _transactionalContext;
private VirtualHost _virtualHost;
+ private AMQProtocolSession _protocolSession;
- public void testMessageCount() throws Exception
+ public void testMessageCountTransient() throws Exception
{
int messageCount = 10;
- sendMessages(messageCount);
+ sendMessages(messageCount, false);
assertTrue(_queueMBean.getMessageCount() == messageCount);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
@@ -73,6 +76,43 @@ public class AMQQueueMBeanTest extends TestCase
_queueMBean.clearQueue();
assertTrue(_queueMBean.getMessageCount() == 0);
assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ public void testMessageCountPersistent() throws Exception
+ {
+ int messageCount = 10;
+ sendMessages(messageCount, true);
+ assertEquals("", messageCount, _queueMBean.getMessageCount().intValue());
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+ long queueDepth = (messageCount * MESSAGE_SIZE) >> 10;
+ assertTrue(_queueMBean.getQueueDepth() == queueDepth);
+
+ _queueMBean.deleteMessageFromTop();
+ assertTrue(_queueMBean.getMessageCount() == (messageCount - 1));
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ _queueMBean.clearQueue();
+ assertTrue(_queueMBean.getMessageCount() == 0);
+ assertTrue(_queueMBean.getReceivedMessageCount() == messageCount);
+
+ //Ensure that the data has been removed from the Store
+ verifyBrokerState();
+ }
+
+ // todo: collect to a general testing class -duplicated from Systest/MessageReturntest
+ private void verifyBrokerState()
+ {
+
+ TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore());
+
+ // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up.
+ assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap());
+ assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+ assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
+ assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
}
public void testConsumerCount() throws AMQException
@@ -86,26 +126,26 @@ public class AMQQueueMBeanTest extends TestCase
AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null);
protocolSession.addChannel(channel);
- _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false);
+ _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false);
assertTrue(_queueMBean.getActiveConsumerCount() == 1);
SubscriptionSet _subscribers = (SubscriptionSet) mgr;
SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory();
- Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
- protocolSession,
- new AMQShortString("S1"),
- false,
- null,
- true,
- _queue);
-
- Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
- protocolSession,
- new AMQShortString("S2"),
- false,
- null,
- true,
- _queue);
+ Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(),
+ protocolSession,
+ new AMQShortString("S1"),
+ false,
+ null,
+ true,
+ _queue);
+
+ Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(),
+ protocolSession,
+ new AMQShortString("S2"),
+ false,
+ null,
+ true,
+ _queue);
_subscribers.addSubscriber(s1);
_subscribers.addSubscriber(s2);
assertTrue(_queueMBean.getActiveConsumerCount() == 3);
@@ -165,7 +205,7 @@ public class AMQQueueMBeanTest extends TestCase
}
- AMQMessage msg = message(false);
+ AMQMessage msg = message(false, false);
long id = msg.getMessageId();
_queue.clearQueue(_storeContext);
@@ -184,7 +224,7 @@ public class AMQQueueMBeanTest extends TestCase
}
}
- private AMQMessage message(final boolean immediate) throws AMQException
+ private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException
{
MessagePublishInfo publish = new MessagePublishInfo()
{
@@ -209,9 +249,11 @@ public class AMQQueueMBeanTest extends TestCase
return null;
}
};
-
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1));
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
}
@@ -221,22 +263,38 @@ public class AMQQueueMBeanTest extends TestCase
super.setUp();
IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance();
_virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test");
+ _messageStore = _virtualHost.getMessageStore();
+
+ _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext,
+ null,
+ new LinkedList<RequiredDeliveryException>(),
+ new HashSet<Long>());
+
_queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost);
_queueMBean = new AMQQueueMBean(_queue);
+
+ _protocolSession = new TestMinaProtocolSession();
}
- private void sendMessages(int messageCount) throws AMQException
+ private void sendMessages(int messageCount, boolean persistent) throws AMQException
{
- AMQMessage[] messages = new AMQMessage[messageCount];
- for (int i = 0; i < messages.length; i++)
- {
- messages[i] = message(false);
- messages[i].enqueue(_queue);
- messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
- }
for (int i = 0; i < messageCount; i++)
{
- _queue.process(_storeContext, messages[i], false);
+ AMQMessage currentMessage = message(false, persistent);
+ currentMessage.enqueue(_queue);
+
+ // route header
+ currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory());
+
+ // Add the body so we have somthing to test later
+ currentMessage.addContentBodyFrame(_storeContext,
+ _protocolSession.getRegistry()
+ .getProtocolVersionMethodConverter()
+ .convertToContentChunk(
+ new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE),
+ MESSAGE_SIZE)));
+
+
}
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
new file mode 100644
index 0000000000..48d808142c
--- /dev/null
+++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.List;
+
+/**
+ * Adds some extra methods to the memory message store for testing purposes.
+ */
+public class TestableMemoryMessageStore extends MemoryMessageStore
+{
+
+ MemoryMessageStore _mms = null;
+
+ public TestableMemoryMessageStore(MemoryMessageStore mms)
+ {
+ _mms = mms;
+ }
+
+ public TestableMemoryMessageStore()
+ {
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
+ }
+
+ public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
+ {
+ if (_mms != null)
+ {
+ return _mms._metaDataMap;
+ }
+ else
+ {
+ return _metaDataMap;
+ }
+ }
+
+ public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
+ {
+ if (_mms != null)
+ {
+ return _mms._contentBodyMap;
+ }
+ else
+ {
+ return _contentBodyMap;
+ }
+ }
+}
diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
index 76ea2a63d4..45bf73bd9d 100644
--- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
+++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java
@@ -78,6 +78,8 @@ public class MessageReturnTest extends TestCase implements ExceptionListener
private CountDownLatch _returns = new CountDownLatch(1);
private int _receivedCount = 0;
+ private int _initialContentBodyMapSize;
+ private int _initilaMessageMetaDataMapSize;
protected void setUp() throws Exception
{
@@ -94,13 +96,19 @@ public class MessageReturnTest extends TestCase implements ExceptionListener
env.put("queue.badQueue", QUEUE);
_context = factory.getInitialContext(env);
+
+ getBrokerInitialState();
}
protected void tearDown() throws Exception
{
- _producerConnection.close();
super.tearDown();
+ if (_producerConnection != null)
+ {
+ _producerConnection.close();
+ }
+
if (BROKER.startsWith("vm://"))
{
TransportConnection.killAllVMBrokers();
@@ -130,7 +138,7 @@ public class MessageReturnTest extends TestCase implements ExceptionListener
_producerConnection.close();
//Verify we get all the messages.
- verifyAllMessagesRecevied();
+ verifyAllMessagesRecevied();
//Verify Broker state
verifyBrokerState();
}
@@ -153,6 +161,34 @@ public class MessageReturnTest extends TestCase implements ExceptionListener
_producer = _producerSession.createProducer((Queue) _context.lookup("badQueue"));
}
+ // todo: collect to a general testing class - duplicated in AMQQueueMBeanTest
+ private void getBrokerInitialState()
+ {
+ IApplicationRegistry registry = ApplicationRegistry.getInstance();
+
+ VirtualHost testVhost = registry.getVirtualHostRegistry().getVirtualHost(VHOST);
+
+ assertNotNull("Unable to get test Vhost", testVhost.getMessageStore());
+
+ TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) testVhost.getMessageStore());
+
+ _initialContentBodyMapSize = store.getContentBodyMap() == null ? 0 : store.getContentBodyMap().size();
+ _initilaMessageMetaDataMapSize = store.getMessageMetaDataMap() == null ? 0 : store.getMessageMetaDataMap().size();
+
+ if (_initialContentBodyMapSize != 0)
+ {
+ _logger.warn("Store is dirty: ContentBodyMap has Size:" + _initialContentBodyMapSize);
+ System.out.println("Store is dirty: ContentBodyMap has Size:" + _initialContentBodyMapSize);
+ }
+
+ if (_initilaMessageMetaDataMapSize != 0)
+ {
+ _logger.warn("Store is dirty: MessageMetaDataMap has Size:" + _initilaMessageMetaDataMapSize);
+ System.out.println("Store is dirty: MessageMetaDataMap has Size:" + _initilaMessageMetaDataMapSize);
+ }
+
+ }
+
private void verifyBrokerState()
{
IApplicationRegistry registry = ApplicationRegistry.getInstance();
@@ -169,7 +205,7 @@ public class MessageReturnTest extends TestCase implements ExceptionListener
// If the CBM has content it may be due to the broker not yet purging.
// Closing the producer connection before testing should give the store time to clean up.
// Perform a quick sleep just in case
- if (store.getContentBodyMap().size() != 0)
+ while (store.getContentBodyMap().size() > _initialContentBodyMapSize)
{
try
{
@@ -179,21 +215,9 @@ public class MessageReturnTest extends TestCase implements ExceptionListener
{
}
}
- assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size());
+ assertTrue("Expected the store content size not reached at test start it was :" + _initialContentBodyMapSize + " Now it is :" + store.getContentBodyMap().size(), _initialContentBodyMapSize >= store.getContentBodyMap().size());
assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap());
-
- if (store.getMessageMetaDataMap().size() != 0)
- {
- try
- {
- Thread.sleep(500);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size());
+ assertTrue("Expected the store MessageMetaData size not reached at test start it was :" + _initilaMessageMetaDataMapSize + " Now it is :" + store.getMessageMetaDataMap().size(), _initialContentBodyMapSize >= store.getMessageMetaDataMap().size());
}
private void verifyAllMessagesRecevied()
@@ -221,7 +245,7 @@ public class MessageReturnTest extends TestCase implements ExceptionListener
/**
* We can't verify messageOrder here as the return threads are not synchronized so we have no way of
- * guarranting the order.
+ * guarranting the order.
*/
private void verifyMessageOrder()
{