diff options
author | Martin Ritchie <ritchiem@apache.org> | 2007-09-25 12:51:09 +0000 |
---|---|---|
committer | Martin Ritchie <ritchiem@apache.org> | 2007-09-25 12:51:09 +0000 |
commit | f782284d8cc4d3c3cb969c06dd1bfbbb6ec2d160 (patch) | |
tree | a439bc58c599eeb090d3c8aafaa77c40a8057701 | |
parent | 24a6914e1f823e9f38d7b721e29c369720750c6b (diff) | |
download | qpid-python-f782284d8cc4d3c3cb969c06dd1bfbbb6ec2d160.tar.gz |
QPID-610 : Fix for Get NO_ACK leak. The Java Client doesn't use get so augmented the python test_get to send persistent messages and used debugger to verify that messages were correctly removed. Verified that prior to this commit they would remain in the store. We need a management exchange to fully validate this with a python tests.
NOTE: The setting of "delivery mode" property on M2.1 is not the same as on trunk where _ is use such as "delivery_mode".
There is also no error that you have sent an incorrect property.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@579229 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java | 5 | ||||
-rw-r--r-- | python/tests/basic.py | 51 |
2 files changed, 49 insertions, 7 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 026761a618..ea077d659f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -330,6 +330,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager deliveryTag, _queue.getMessageCount()); _totalMessageSize.addAndGet(-msg.getSize()); } + + if (!acks) + { + msg.decrementReference(channel.getStoreContext()); + } } finally { diff --git a/python/tests/basic.py b/python/tests/basic.py index 9f26ee3728..bbbfa8ebf9 100644 --- a/python/tests/basic.py +++ b/python/tests/basic.py @@ -339,9 +339,11 @@ class BasicTests(TestBase): channel = self.channel channel.queue_declare(queue="test-get", exclusive=True) - #publish some messages (no_ack=True) + #publish some messages (no_ack=True) with persistent messaging for i in range(1, 11): - channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + msg=Content("Message %d" % i) + msg["delivery mode"] = 2 + channel.basic_publish(routing_key="test-get",content=msg ) #use basic_get to read back the messages, and check that we get an empty at the end for i in range(1, 11): @@ -354,18 +356,53 @@ class BasicTests(TestBase): self.assertEqual(reply.method.klass.name, "basic") self.assertEqual(reply.method.name, "get-empty") - #repeat for no_ack=False + + #publish some messages (no_ack=True) transient messaging for i in range(11, 21): channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + #use basic_get to read back the messages, and check that we get an empty at the end for i in range(11, 21): + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-ok") + self.assertEqual("Message %d" % i, reply.content.body) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-empty") + + #repeat for no_ack=False + + #publish some messages (no_ack=False) with persistent messaging + for i in range(21, 31): + msg=Content("Message %d" % i) + msg["delivery mode"] = 2 + channel.basic_publish(routing_key="test-get",content=msg ) + + #use basic_get to read back the messages, and check that we get an empty at the end + for i in range(21, 31): + reply = channel.basic_get(no_ack=False) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-ok") + self.assertEqual("Message %d" % i, reply.content.body) + + reply = channel.basic_get(no_ack=True) + self.assertEqual(reply.method.klass.name, "basic") + self.assertEqual(reply.method.name, "get-empty") + + #public some messages (no_ack=False) with transient messaging + for i in range(31, 41): + channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i)) + + for i in range(31, 41): reply = channel.basic_get(no_ack=False) self.assertEqual(reply.method.klass.name, "basic") self.assertEqual(reply.method.name, "get-ok") self.assertEqual("Message %d" % i, reply.content.body) - if(i == 13): + if(i == 33): channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True) - if(i in [15, 17, 19]): + if(i in [35, 37, 39]): channel.basic_ack(delivery_tag=reply.delivery_tag) reply = channel.basic_get(no_ack=True) @@ -375,8 +412,8 @@ class BasicTests(TestBase): #recover(requeue=True) channel.basic_recover(requeue=True) - #get the unacked messages again (14, 16, 18, 20) - for i in [14, 16, 18, 20]: + #get the unacked messages again (34, 36, 38, 40) + for i in [34, 36, 38, 40]: reply = channel.basic_get(no_ack=False) self.assertEqual(reply.method.klass.name, "basic") self.assertEqual(reply.method.name, "get-ok") |