summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java43
6 files changed, 64 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index b8c5e821f7..7088c704ed 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -60,7 +60,7 @@ public class UnacknowledgedMessage
{
if (queue != null)
{
- message.dequeue(storeContext, queue);
+ queue.dequeue(storeContext, message);
}
//if the queue is null then the message is waiting to be acked, but has been removed.
message.decrementReference(storeContext);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index afa581f0c5..cd8c0198f3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -592,7 +592,19 @@ public class AMQMessage
_transientMessageData.addDestinationQueue(queue);
}
- public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
+ /**
+ * NOTE: Think about why you are using this method. Normal usages would want to do
+ * AMQQueue.dequeue(StoreContext, AMQMessage)
+ * This will keep the queue statistics up-to-date.
+ * Currently this method is only called _correctly_ from AMQQueue dequeue.
+ * Ideally we would have a better way for the queue to dequeue the message.
+ * Especially since enqueue isn't the recipriocal of this method.
+ * @deprecated
+ * @param storeContext
+ * @param queue
+ * @throws AMQException
+ */
+ void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
{
_messageHandle.dequeue(storeContext, _messageId, queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 4a336ef71c..2ea0b6d3d8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -809,7 +809,7 @@ public class AMQQueue implements Managable, Comparable
}
}
- void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
+ public void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
{
try
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index ad02f477e0..8e72e995d0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -413,14 +413,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = _messages.poll();
- message.dequeue(storeContext, queue);
-
- message.decrementReference(storeContext);
-
if (message != null)
{
+ queue.dequeue(storeContext, message);
+
_totalMessageSize.addAndGet(-message.getSize());
- }
+
+ //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
+ message.decrementReference(storeContext);
+
+ }
_lock.unlock();
}
@@ -485,7 +487,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_totalMessageSize.addAndGet(-message.getSize());
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- message.dequeue(_reapingStoreContext, _queue);
+ _queue.dequeue(_reapingStoreContext, message);
message.decrementReference(_reapingStoreContext);
@@ -511,13 +513,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * This method will return true if the message is to be purged from the queue.
+ * This method will return true if the message is to be purged from the queue.
*
*
- * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+ * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+ *
* @param message
* @param sub
+ *
* @return
+ *
* @throws AMQException
*/
private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
@@ -607,6 +612,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
") to :" + System.identityHashCode(sub));
}
+
+ if (messageQueue == _messages)
+ {
+ _totalMessageSize.addAndGet(-message.getSize());
+ }
+
sub.send(message, _queue);
//remove sent message from our queue.
@@ -654,10 +665,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- if ((message != null) && (messageQueue == _messages))
- {
- _totalMessageSize.addAndGet(-message.getSize());
- }
}
catch (AMQException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 774f6e915c..1299c3a80c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -307,12 +307,11 @@ public class SubscriptionImpl implements Subscription
}
protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
-
+
if (!_acks)
{
msg.decrementReference(storeContext);
}
-
}
}
finally
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index c8d43a47a5..a9c93d7227 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -114,18 +114,28 @@ public class DurableSubscriptionTest extends TestCase
con.close();
}
- public void testDurability() throws AMQException, JMSException, URLSyntaxException
+ public void testDurabilityNOACK() throws AMQException, JMSException, URLSyntaxException
+ {
+ durabilityImpl(AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testDurabilityAUTOACK() throws AMQException, JMSException, URLSyntaxException
+ {
+ durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
AMQTopic topic = new AMQTopic(con, "MyTopic");
- Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session1 = con.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
- Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session sessionProd = con.createSession(false, ackMode);
MessageProducer producer = sessionProd.createProducer(topic);
- Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session2 = con.createSession(false, ackMode);
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
con.start();
@@ -133,36 +143,41 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("A"));
Message msg;
- msg = consumer1.receive();
- assertEquals("A", ((TextMessage) msg).getText());
msg = consumer1.receive(100);
- assertEquals(null, msg);
+ assertNotNull("Message should be available", msg);
+ assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
+
+ msg = consumer1.receive(100);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
msg = consumer2.receive();
- assertEquals("A", ((TextMessage) msg).getText());
+ assertNotNull(msg);
+ assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
msg = consumer2.receive(100);
- assertEquals(null, msg);
+ assertNull("There should be no more messages for consumption on consumer2.", msg);
consumer2.close();
- Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session3 = con.createSession(false, ackMode);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(100);
- assertEquals("B", ((TextMessage) msg).getText());
+ assertNotNull("Consumer 1 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(100);
- assertEquals(null, msg);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
_logger.info("Receive message on consumer 3 :expecting B");
msg = consumer3.receive(100);
- assertEquals("B", ((TextMessage) msg).getText());
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
msg = consumer3.receive(100);
- assertEquals(null, msg);
+ assertNull("There should be no more messages for consumption on consumer3.", msg);
con.close();
}