diff options
6 files changed, 64 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index b8c5e821f7..7088c704ed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -60,7 +60,7 @@ public class UnacknowledgedMessage { if (queue != null) { - message.dequeue(storeContext, queue); + queue.dequeue(storeContext, message); } //if the queue is null then the message is waiting to be acked, but has been removed. message.decrementReference(storeContext); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index afa581f0c5..cd8c0198f3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -592,7 +592,19 @@ public class AMQMessage _transientMessageData.addDestinationQueue(queue); } - public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException + /** + * NOTE: Think about why you are using this method. Normal usages would want to do + * AMQQueue.dequeue(StoreContext, AMQMessage) + * This will keep the queue statistics up-to-date. + * Currently this method is only called _correctly_ from AMQQueue dequeue. + * Ideally we would have a better way for the queue to dequeue the message. + * Especially since enqueue isn't the recipriocal of this method. + * @deprecated + * @param storeContext + * @param queue + * @throws AMQException + */ + void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException { _messageHandle.dequeue(storeContext, _messageId, queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 4a336ef71c..2ea0b6d3d8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -809,7 +809,7 @@ public class AMQQueue implements Managable, Comparable } } - void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException + public void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException { try { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index ad02f477e0..8e72e995d0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -413,14 +413,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager AMQMessage message = _messages.poll(); - message.dequeue(storeContext, queue); - - message.decrementReference(storeContext); - if (message != null) { + queue.dequeue(storeContext, message); + _totalMessageSize.addAndGet(-message.getSize()); - } + + //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE. + message.decrementReference(storeContext); + + } _lock.unlock(); } @@ -485,7 +487,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager _totalMessageSize.addAndGet(-message.getSize()); // Use the reapingStoreContext as any sub(if we have one) may be in a tx. - message.dequeue(_reapingStoreContext, _queue); + _queue.dequeue(_reapingStoreContext, message); message.decrementReference(_reapingStoreContext); @@ -511,13 +513,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } /** - * This method will return true if the message is to be purged from the queue. + * This method will return true if the message is to be purged from the queue. * * - * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue) + * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue) + * * @param message * @param sub + * * @return + * * @throws AMQException */ private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException @@ -607,6 +612,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager ") to :" + System.identityHashCode(sub)); } + + if (messageQueue == _messages) + { + _totalMessageSize.addAndGet(-message.getSize()); + } + sub.send(message, _queue); //remove sent message from our queue. @@ -654,10 +665,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } - if ((message != null) && (messageQueue == _messages)) - { - _totalMessageSize.addAndGet(-message.getSize()); - } } catch (AMQException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 774f6e915c..1299c3a80c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -307,12 +307,11 @@ public class SubscriptionImpl implements Subscription } protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag); - + if (!_acks) { msg.decrementReference(storeContext); } - } } finally diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java index c8d43a47a5..a9c93d7227 100644 --- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java +++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java @@ -114,18 +114,28 @@ public class DurableSubscriptionTest extends TestCase con.close(); } - public void testDurability() throws AMQException, JMSException, URLSyntaxException + public void testDurabilityNOACK() throws AMQException, JMSException, URLSyntaxException + { + durabilityImpl(AMQSession.NO_ACKNOWLEDGE); + } + + public void testDurabilityAUTOACK() throws AMQException, JMSException, URLSyntaxException + { + durabilityImpl(Session.AUTO_ACKNOWLEDGE); + } + + private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException { AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test"); AMQTopic topic = new AMQTopic(con, "MyTopic"); - Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + Session session1 = con.createSession(false, ackMode); MessageConsumer consumer1 = session1.createConsumer(topic); - Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + Session sessionProd = con.createSession(false, ackMode); MessageProducer producer = sessionProd.createProducer(topic); - Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + Session session2 = con.createSession(false, ackMode); TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription"); con.start(); @@ -133,36 +143,41 @@ public class DurableSubscriptionTest extends TestCase producer.send(session1.createTextMessage("A")); Message msg; - msg = consumer1.receive(); - assertEquals("A", ((TextMessage) msg).getText()); msg = consumer1.receive(100); - assertEquals(null, msg); + assertNotNull("Message should be available", msg); + assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText()); + + msg = consumer1.receive(100); + assertNull("There should be no more messages for consumption on consumer1.", msg); msg = consumer2.receive(); - assertEquals("A", ((TextMessage) msg).getText()); + assertNotNull(msg); + assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText()); msg = consumer2.receive(100); - assertEquals(null, msg); + assertNull("There should be no more messages for consumption on consumer2.", msg); consumer2.close(); - Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE); + Session session3 = con.createSession(false, ackMode); MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription"); producer.send(session1.createTextMessage("B")); _logger.info("Receive message on consumer 1 :expecting B"); msg = consumer1.receive(100); - assertEquals("B", ((TextMessage) msg).getText()); + assertNotNull("Consumer 1 should get message 'B'.", msg); + assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 1 :expecting null"); msg = consumer1.receive(100); - assertEquals(null, msg); + assertNull("There should be no more messages for consumption on consumer1.", msg); _logger.info("Receive message on consumer 3 :expecting B"); msg = consumer3.receive(100); - assertEquals("B", ((TextMessage) msg).getText()); + assertNotNull("Consumer 3 should get message 'B'.", msg); + assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText()); _logger.info("Receive message on consumer 3 :expecting null"); msg = consumer3.receive(100); - assertEquals(null, msg); + assertNull("There should be no more messages for consumption on consumer3.", msg); con.close(); } |