summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-10-01 15:31:00 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-10-01 15:31:00 +0000
commit95f531a2e97c7566be7fc07c259f81c9d7ff2842 (patch)
tree55dfe5aef0bbf63fc1238c9a10db8b6a7ab1b5a9
parentf07cc9da6e3f138efe5c6d1b76968a9b8eee5bf0 (diff)
downloadqpid-python-95f531a2e97c7566be7fc07c259f81c9d7ff2842.tar.gz
QPID-611 QPID-620. DurableSubscriptionTest was failing due to a race condition when using NO_ACK. This is due to the Queue Total Size being updated after the send, but after the send and NO_ACK the msg data is purged and so there is no size to retrieve. Changed all references to msg.dequeue to queue.dequeue where appropriate so we can use that single point in the future for updating the Queue Total Size.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@580993 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java14
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java31
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java3
-rw-r--r--java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java43
6 files changed, 64 insertions, 31 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
index b8c5e821f7..7088c704ed 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
@@ -60,7 +60,7 @@ public class UnacknowledgedMessage
{
if (queue != null)
{
- message.dequeue(storeContext, queue);
+ queue.dequeue(storeContext, message);
}
//if the queue is null then the message is waiting to be acked, but has been removed.
message.decrementReference(storeContext);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
index afa581f0c5..cd8c0198f3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
@@ -592,7 +592,19 @@ public class AMQMessage
_transientMessageData.addDestinationQueue(queue);
}
- public void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
+ /**
+ * NOTE: Think about why you are using this method. Normal usages would want to do
+ * AMQQueue.dequeue(StoreContext, AMQMessage)
+ * This will keep the queue statistics up-to-date.
+ * Currently this method is only called _correctly_ from AMQQueue dequeue.
+ * Ideally we would have a better way for the queue to dequeue the message.
+ * Especially since enqueue isn't the recipriocal of this method.
+ * @deprecated
+ * @param storeContext
+ * @param queue
+ * @throws AMQException
+ */
+ void dequeue(StoreContext storeContext, AMQQueue queue) throws AMQException
{
_messageHandle.dequeue(storeContext, _messageId, queue);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
index 4a336ef71c..2ea0b6d3d8 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
@@ -809,7 +809,7 @@ public class AMQQueue implements Managable, Comparable
}
}
- void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
+ public void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
{
try
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index ad02f477e0..8e72e995d0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -413,14 +413,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
AMQMessage message = _messages.poll();
- message.dequeue(storeContext, queue);
-
- message.decrementReference(storeContext);
-
if (message != null)
{
+ queue.dequeue(storeContext, message);
+
_totalMessageSize.addAndGet(-message.getSize());
- }
+
+ //If this causes ref count to hit zero then data will be purged so message.getSize() will NPE.
+ message.decrementReference(storeContext);
+
+ }
_lock.unlock();
}
@@ -485,7 +487,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
_totalMessageSize.addAndGet(-message.getSize());
// Use the reapingStoreContext as any sub(if we have one) may be in a tx.
- message.dequeue(_reapingStoreContext, _queue);
+ _queue.dequeue(_reapingStoreContext, message);
message.decrementReference(_reapingStoreContext);
@@ -511,13 +513,16 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
/**
- * This method will return true if the message is to be purged from the queue.
+ * This method will return true if the message is to be purged from the queue.
*
*
- * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+ * SIDE-EFFECT: The message will be taken by the Subscription(sub) for the current Queue(_queue)
+ *
* @param message
* @param sub
+ *
* @return
+ *
* @throws AMQException
*/
private boolean purgeMessage(AMQMessage message, Subscription sub) throws AMQException
@@ -607,6 +612,12 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
") to :" + System.identityHashCode(sub));
}
+
+ if (messageQueue == _messages)
+ {
+ _totalMessageSize.addAndGet(-message.getSize());
+ }
+
sub.send(message, _queue);
//remove sent message from our queue.
@@ -654,10 +665,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
}
}
- if ((message != null) && (messageQueue == _messages))
- {
- _totalMessageSize.addAndGet(-message.getSize());
- }
}
catch (AMQException e)
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
index 774f6e915c..1299c3a80c 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
@@ -307,12 +307,11 @@ public class SubscriptionImpl implements Subscription
}
protocolSession.getProtocolOutputConverter().writeDeliver(msg, channel.getChannelId(), deliveryTag, consumerTag);
-
+
if (!_acks)
{
msg.decrementReference(storeContext);
}
-
}
}
finally
diff --git a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
index c8d43a47a5..a9c93d7227 100644
--- a/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
+++ b/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
@@ -114,18 +114,28 @@ public class DurableSubscriptionTest extends TestCase
con.close();
}
- public void testDurability() throws AMQException, JMSException, URLSyntaxException
+ public void testDurabilityNOACK() throws AMQException, JMSException, URLSyntaxException
+ {
+ durabilityImpl(AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testDurabilityAUTOACK() throws AMQException, JMSException, URLSyntaxException
+ {
+ durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
AMQTopic topic = new AMQTopic(con, "MyTopic");
- Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session1 = con.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
- Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session sessionProd = con.createSession(false, ackMode);
MessageProducer producer = sessionProd.createProducer(topic);
- Session session2 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session2 = con.createSession(false, ackMode);
TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
con.start();
@@ -133,36 +143,41 @@ public class DurableSubscriptionTest extends TestCase
producer.send(session1.createTextMessage("A"));
Message msg;
- msg = consumer1.receive();
- assertEquals("A", ((TextMessage) msg).getText());
msg = consumer1.receive(100);
- assertEquals(null, msg);
+ assertNotNull("Message should be available", msg);
+ assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
+
+ msg = consumer1.receive(100);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
msg = consumer2.receive();
- assertEquals("A", ((TextMessage) msg).getText());
+ assertNotNull(msg);
+ assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
msg = consumer2.receive(100);
- assertEquals(null, msg);
+ assertNull("There should be no more messages for consumption on consumer2.", msg);
consumer2.close();
- Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ Session session3 = con.createSession(false, ackMode);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
producer.send(session1.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(100);
- assertEquals("B", ((TextMessage) msg).getText());
+ assertNotNull("Consumer 1 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 1 :expecting null");
msg = consumer1.receive(100);
- assertEquals(null, msg);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
_logger.info("Receive message on consumer 3 :expecting B");
msg = consumer3.receive(100);
- assertEquals("B", ((TextMessage) msg).getText());
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
msg = consumer3.receive(100);
- assertEquals(null, msg);
+ assertNull("There should be no more messages for consumption on consumer3.", msg);
con.close();
}