diff options
6 files changed, 222 insertions, 57 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index faa5d4a5c5..4a336ef71c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -581,7 +581,7 @@ public class AMQQueue implements Managable, Comparable /** Removes the AMQMessage from the top of the queue. */ public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException { - _deliveryMgr.removeAMessageFromTop(storeContext); + _deliveryMgr.removeAMessageFromTop(storeContext, this); } /** removes all the messages from the queue. */ diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index ea077d659f..d9629a20b5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -333,7 +333,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (!acks) { - msg.decrementReference(channel.getStoreContext()); + msg.decrementReference(channel.getStoreContext()); } } finally @@ -407,11 +407,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager * * @throws AMQException */ - public void removeAMessageFromTop(StoreContext storeContext) throws AMQException + public void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException { _lock.lock(); AMQMessage message = _messages.poll(); + + message.dequeue(storeContext, queue); + + message.decrementReference(storeContext); + if (message != null) { _totalMessageSize.addAndGet(-message.getSize()); @@ -434,6 +439,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _messages.poll(); _queue.dequeue(storeContext, msg); + + msg.decrementReference(_reapingStoreContext); + msg = getNextMessage(); count++; } @@ -479,6 +487,8 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // Use the reapingStoreContext as any sub(if we have one) may be in a tx. message.dequeue(_reapingStoreContext, _queue); + message.decrementReference(_reapingStoreContext); + if (_log.isInfoEnabled()) { _log.info(debugIdentity() + " Doing clean up of the main _message queue."); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index 10ba48552c..153106d919 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -72,7 +72,7 @@ interface DeliveryManager */ void deliver(StoreContext storeContext, AMQShortString name, AMQMessage msg, boolean deliverFirst) throws FailedDequeueException, AMQException; - void removeAMessageFromTop(StoreContext storeContext) throws AMQException; + void removeAMessageFromTop(StoreContext storeContext, AMQQueue queue) throws AMQException; long clearAllMessages(StoreContext storeContext) throws AMQException; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index 4c02f266ed..3caf6ad73d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -24,10 +24,13 @@ import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.RequiredDeliveryException; import org.apache.qpid.server.protocol.TestMinaProtocolSession; +import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.registry.IApplicationRegistry; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -36,6 +39,8 @@ import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.store.MemoryMessageStore; +import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.mina.common.ByteBuffer; import javax.management.JMException; import java.util.LinkedList; @@ -49,18 +54,16 @@ public class AMQQueueMBeanTest extends TestCase private static long MESSAGE_SIZE = 1000; private AMQQueue _queue; private AMQQueueMBean _queueMBean; - private MessageStore _messageStore = new MemoryMessageStore(); + private MessageStore _messageStore; private StoreContext _storeContext = new StoreContext(); - private TransactionalContext _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, - null, - new LinkedList<RequiredDeliveryException>(), - new HashSet<Long>()); + private TransactionalContext _transactionalContext; private VirtualHost _virtualHost; + private AMQProtocolSession _protocolSession; - public void testMessageCount() throws Exception + public void testMessageCountTransient() throws Exception { int messageCount = 10; - sendMessages(messageCount); + sendMessages(messageCount, false); assertTrue(_queueMBean.getMessageCount() == messageCount); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; @@ -73,6 +76,43 @@ public class AMQQueueMBeanTest extends TestCase _queueMBean.clearQueue(); assertTrue(_queueMBean.getMessageCount() == 0); assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + //Ensure that the data has been removed from the Store + verifyBrokerState(); + } + + public void testMessageCountPersistent() throws Exception + { + int messageCount = 10; + sendMessages(messageCount, true); + assertEquals("", messageCount, _queueMBean.getMessageCount().intValue()); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + long queueDepth = (messageCount * MESSAGE_SIZE) >> 10; + assertTrue(_queueMBean.getQueueDepth() == queueDepth); + + _queueMBean.deleteMessageFromTop(); + assertTrue(_queueMBean.getMessageCount() == (messageCount - 1)); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + _queueMBean.clearQueue(); + assertTrue(_queueMBean.getMessageCount() == 0); + assertTrue(_queueMBean.getReceivedMessageCount() == messageCount); + + //Ensure that the data has been removed from the Store + verifyBrokerState(); + } + + // todo: collect to a general testing class -duplicated from Systest/MessageReturntest + private void verifyBrokerState() + { + + TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) _virtualHost.getMessageStore()); + + // Unlike MessageReturnTest there is no need for a delay as there this thread does the clean up. + assertNotNull("ContentBodyMap should not be null", store.getContentBodyMap()); + assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size()); + assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap()); + assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size()); } public void testConsumerCount() throws AMQException @@ -86,26 +126,26 @@ public class AMQQueueMBeanTest extends TestCase AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); protocolSession.addChannel(channel); - _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null,false,false); + _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false); assertTrue(_queueMBean.getActiveConsumerCount() == 1); SubscriptionSet _subscribers = (SubscriptionSet) mgr; SubscriptionFactory subscriptionFactory = new SubscriptionImpl.Factory(); - Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), - protocolSession, - new AMQShortString("S1"), - false, - null, - true, - _queue); - - Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), - protocolSession, - new AMQShortString("S2"), - false, - null, - true, - _queue); + Subscription s1 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S1"), + false, + null, + true, + _queue); + + Subscription s2 = subscriptionFactory.createSubscription(channel.getChannelId(), + protocolSession, + new AMQShortString("S2"), + false, + null, + true, + _queue); _subscribers.addSubscriber(s1); _subscribers.addSubscriber(s2); assertTrue(_queueMBean.getActiveConsumerCount() == 3); @@ -165,7 +205,7 @@ public class AMQQueueMBeanTest extends TestCase } - AMQMessage msg = message(false); + AMQMessage msg = message(false, false); long id = msg.getMessageId(); _queue.clearQueue(_storeContext); @@ -184,7 +224,7 @@ public class AMQQueueMBeanTest extends TestCase } } - private AMQMessage message(final boolean immediate) throws AMQException + private AMQMessage message(final boolean immediate, boolean persistent) throws AMQException { MessagePublishInfo publish = new MessagePublishInfo() { @@ -209,9 +249,11 @@ public class AMQQueueMBeanTest extends TestCase return null; } }; - + ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); contentHeaderBody.bodySize = MESSAGE_SIZE; // in bytes + contentHeaderBody.properties = new BasicContentHeaderProperties(); + ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) (persistent ? 2 : 1)); return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody); } @@ -221,22 +263,38 @@ public class AMQQueueMBeanTest extends TestCase super.setUp(); IApplicationRegistry applicationRegistry = ApplicationRegistry.getInstance(); _virtualHost = applicationRegistry.getVirtualHostRegistry().getVirtualHost("test"); + _messageStore = _virtualHost.getMessageStore(); + + _transactionalContext = new NonTransactionalContext(_messageStore, _storeContext, + null, + new LinkedList<RequiredDeliveryException>(), + new HashSet<Long>()); + _queue = new AMQQueue(new AMQShortString("testQueue"), false, new AMQShortString("AMQueueMBeanTest"), false, _virtualHost); _queueMBean = new AMQQueueMBean(_queue); + + _protocolSession = new TestMinaProtocolSession(); } - private void sendMessages(int messageCount) throws AMQException + private void sendMessages(int messageCount, boolean persistent) throws AMQException { - AMQMessage[] messages = new AMQMessage[messageCount]; - for (int i = 0; i < messages.length; i++) - { - messages[i] = message(false); - messages[i].enqueue(_queue); - messages[i].routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); - } for (int i = 0; i < messageCount; i++) { - _queue.process(_storeContext, messages[i], false); + AMQMessage currentMessage = message(false, persistent); + currentMessage.enqueue(_queue); + + // route header + currentMessage.routingComplete(_messageStore, _storeContext, new MessageHandleFactory()); + + // Add the body so we have somthing to test later + currentMessage.addContentBodyFrame(_storeContext, + _protocolSession.getRegistry() + .getProtocolVersionMethodConverter() + .convertToContentChunk( + new ContentBody(ByteBuffer.allocate((int) MESSAGE_SIZE), + MESSAGE_SIZE))); + + } } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java new file mode 100644 index 0000000000..48d808142c --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.queue.MessageMetaData; +import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.abstraction.ContentChunk; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.List; + +/** + * Adds some extra methods to the memory message store for testing purposes. + */ +public class TestableMemoryMessageStore extends MemoryMessageStore +{ + + MemoryMessageStore _mms = null; + + public TestableMemoryMessageStore(MemoryMessageStore mms) + { + _mms = mms; + } + + public TestableMemoryMessageStore() + { + _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(); + _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>(); + } + + public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap() + { + if (_mms != null) + { + return _mms._metaDataMap; + } + else + { + return _metaDataMap; + } + } + + public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap() + { + if (_mms != null) + { + return _mms._contentBodyMap; + } + else + { + return _contentBodyMap; + } + } +} diff --git a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java index 76ea2a63d4..45bf73bd9d 100644 --- a/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java +++ b/java/systests/src/main/java/org/apache/qpid/server/queue/MessageReturnTest.java @@ -78,6 +78,8 @@ public class MessageReturnTest extends TestCase implements ExceptionListener private CountDownLatch _returns = new CountDownLatch(1); private int _receivedCount = 0; + private int _initialContentBodyMapSize; + private int _initilaMessageMetaDataMapSize; protected void setUp() throws Exception { @@ -94,13 +96,19 @@ public class MessageReturnTest extends TestCase implements ExceptionListener env.put("queue.badQueue", QUEUE); _context = factory.getInitialContext(env); + + getBrokerInitialState(); } protected void tearDown() throws Exception { - _producerConnection.close(); super.tearDown(); + if (_producerConnection != null) + { + _producerConnection.close(); + } + if (BROKER.startsWith("vm://")) { TransportConnection.killAllVMBrokers(); @@ -130,7 +138,7 @@ public class MessageReturnTest extends TestCase implements ExceptionListener _producerConnection.close(); //Verify we get all the messages. - verifyAllMessagesRecevied(); + verifyAllMessagesRecevied(); //Verify Broker state verifyBrokerState(); } @@ -153,6 +161,34 @@ public class MessageReturnTest extends TestCase implements ExceptionListener _producer = _producerSession.createProducer((Queue) _context.lookup("badQueue")); } + // todo: collect to a general testing class - duplicated in AMQQueueMBeanTest + private void getBrokerInitialState() + { + IApplicationRegistry registry = ApplicationRegistry.getInstance(); + + VirtualHost testVhost = registry.getVirtualHostRegistry().getVirtualHost(VHOST); + + assertNotNull("Unable to get test Vhost", testVhost.getMessageStore()); + + TestableMemoryMessageStore store = new TestableMemoryMessageStore((MemoryMessageStore) testVhost.getMessageStore()); + + _initialContentBodyMapSize = store.getContentBodyMap() == null ? 0 : store.getContentBodyMap().size(); + _initilaMessageMetaDataMapSize = store.getMessageMetaDataMap() == null ? 0 : store.getMessageMetaDataMap().size(); + + if (_initialContentBodyMapSize != 0) + { + _logger.warn("Store is dirty: ContentBodyMap has Size:" + _initialContentBodyMapSize); + System.out.println("Store is dirty: ContentBodyMap has Size:" + _initialContentBodyMapSize); + } + + if (_initilaMessageMetaDataMapSize != 0) + { + _logger.warn("Store is dirty: MessageMetaDataMap has Size:" + _initilaMessageMetaDataMapSize); + System.out.println("Store is dirty: MessageMetaDataMap has Size:" + _initilaMessageMetaDataMapSize); + } + + } + private void verifyBrokerState() { IApplicationRegistry registry = ApplicationRegistry.getInstance(); @@ -169,7 +205,7 @@ public class MessageReturnTest extends TestCase implements ExceptionListener // If the CBM has content it may be due to the broker not yet purging. // Closing the producer connection before testing should give the store time to clean up. // Perform a quick sleep just in case - if (store.getContentBodyMap().size() != 0) + while (store.getContentBodyMap().size() > _initialContentBodyMapSize) { try { @@ -179,21 +215,9 @@ public class MessageReturnTest extends TestCase implements ExceptionListener { } } - assertEquals("Expected the store to have no content:" + store.getContentBodyMap(), 0, store.getContentBodyMap().size()); + assertTrue("Expected the store content size not reached at test start it was :" + _initialContentBodyMapSize + " Now it is :" + store.getContentBodyMap().size(), _initialContentBodyMapSize >= store.getContentBodyMap().size()); assertNotNull("MessageMetaDataMap should not be null", store.getMessageMetaDataMap()); - - if (store.getMessageMetaDataMap().size() != 0) - { - try - { - Thread.sleep(500); - } - catch (InterruptedException e) - { - } - } - - assertEquals("Expected the store to have no metadata:" + store.getMessageMetaDataMap(), 0, store.getMessageMetaDataMap().size()); + assertTrue("Expected the store MessageMetaData size not reached at test start it was :" + _initilaMessageMetaDataMapSize + " Now it is :" + store.getMessageMetaDataMap().size(), _initialContentBodyMapSize >= store.getMessageMetaDataMap().size()); } private void verifyAllMessagesRecevied() @@ -221,7 +245,7 @@ public class MessageReturnTest extends TestCase implements ExceptionListener /** * We can't verify messageOrder here as the return threads are not synchronized so we have no way of - * guarranting the order. + * guarranting the order. */ private void verifyMessageOrder() { |