summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Ritchie <ritchiem@apache.org>2007-09-25 12:51:09 +0000
committerMartin Ritchie <ritchiem@apache.org>2007-09-25 12:51:09 +0000
commitf782284d8cc4d3c3cb969c06dd1bfbbb6ec2d160 (patch)
treea439bc58c599eeb090d3c8aafaa77c40a8057701
parent24a6914e1f823e9f38d7b721e29c369720750c6b (diff)
downloadqpid-python-f782284d8cc4d3c3cb969c06dd1bfbbb6ec2d160.tar.gz
QPID-610 : Fix for Get NO_ACK leak. The Java Client doesn't use get so augmented the python test_get to send persistent messages and used debugger to verify that messages were correctly removed. Verified that prior to this commit they would remain in the store. We need a management exchange to fully validate this with a python tests.
NOTE: The setting of "delivery mode" property on M2.1 is not the same as on trunk where _ is use such as "delivery_mode". There is also no error that you have sent an incorrect property. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@579229 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java5
-rw-r--r--python/tests/basic.py51
2 files changed, 49 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
index 026761a618..ea077d659f 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
@@ -330,6 +330,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager
deliveryTag, _queue.getMessageCount());
_totalMessageSize.addAndGet(-msg.getSize());
}
+
+ if (!acks)
+ {
+ msg.decrementReference(channel.getStoreContext());
+ }
}
finally
{
diff --git a/python/tests/basic.py b/python/tests/basic.py
index 9f26ee3728..bbbfa8ebf9 100644
--- a/python/tests/basic.py
+++ b/python/tests/basic.py
@@ -339,9 +339,11 @@ class BasicTests(TestBase):
channel = self.channel
channel.queue_declare(queue="test-get", exclusive=True)
- #publish some messages (no_ack=True)
+ #publish some messages (no_ack=True) with persistent messaging
for i in range(1, 11):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+ msg=Content("Message %d" % i)
+ msg["delivery mode"] = 2
+ channel.basic_publish(routing_key="test-get",content=msg )
#use basic_get to read back the messages, and check that we get an empty at the end
for i in range(1, 11):
@@ -354,18 +356,53 @@ class BasicTests(TestBase):
self.assertEqual(reply.method.klass.name, "basic")
self.assertEqual(reply.method.name, "get-empty")
- #repeat for no_ack=False
+
+ #publish some messages (no_ack=True) transient messaging
for i in range(11, 21):
channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+ #use basic_get to read back the messages, and check that we get an empty at the end
for i in range(11, 21):
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
+
+ #repeat for no_ack=False
+
+ #publish some messages (no_ack=False) with persistent messaging
+ for i in range(21, 31):
+ msg=Content("Message %d" % i)
+ msg["delivery mode"] = 2
+ channel.basic_publish(routing_key="test-get",content=msg )
+
+ #use basic_get to read back the messages, and check that we get an empty at the end
+ for i in range(21, 31):
+ reply = channel.basic_get(no_ack=False)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
+
+ #public some messages (no_ack=False) with transient messaging
+ for i in range(31, 41):
+ channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+ for i in range(31, 41):
reply = channel.basic_get(no_ack=False)
self.assertEqual(reply.method.klass.name, "basic")
self.assertEqual(reply.method.name, "get-ok")
self.assertEqual("Message %d" % i, reply.content.body)
- if(i == 13):
+ if(i == 33):
channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
- if(i in [15, 17, 19]):
+ if(i in [35, 37, 39]):
channel.basic_ack(delivery_tag=reply.delivery_tag)
reply = channel.basic_get(no_ack=True)
@@ -375,8 +412,8 @@ class BasicTests(TestBase):
#recover(requeue=True)
channel.basic_recover(requeue=True)
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
+ #get the unacked messages again (34, 36, 38, 40)
+ for i in [34, 36, 38, 40]:
reply = channel.basic_get(no_ack=False)
self.assertEqual(reply.method.klass.name, "basic")
self.assertEqual(reply.method.name, "get-ok")