diff options
author | Gordon Sim <gsim@apache.org> | 2006-09-27 16:44:02 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-09-27 16:44:02 +0000 |
commit | 9d718c2348708b0b27ce9fb9fcbf05c4b0a997cc (patch) | |
tree | 4bea6ff87d58d3db712cbc727b9723680b1b981a | |
parent | 72aff4cc22932bd98b542ac9dd6393e23985e312 (diff) | |
download | qpid-python-9d718c2348708b0b27ce9fb9fcbf05c4b0a997cc.tar.gz |
Moved ack tests to basic class, added test for requeueing on recovery.
Implemented requeuing on recovery.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450504 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/broker/inc/Message.h | 2 | ||||
-rw-r--r-- | cpp/broker/src/Channel.cpp | 2 | ||||
-rw-r--r-- | cpp/broker/src/Message.cpp | 9 | ||||
-rw-r--r-- | python/tests/basic.py | 101 | ||||
-rw-r--r-- | python/tests/broker.py | 44 |
5 files changed, 111 insertions, 47 deletions
diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h index 37a0c9b2c8..7a239adace 100644 --- a/cpp/broker/inc/Message.h +++ b/cpp/broker/inc/Message.h @@ -39,6 +39,7 @@ namespace qpid { string routingKey; const bool mandatory; const bool immediate; + bool redelivered; qpid::framing::AMQHeaderBody::shared_ptr header; content_list content; @@ -61,6 +62,7 @@ namespace qpid { void deliver(qpid::framing::OutputHandler* out, int channel, string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize); + void redeliver(); friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry); diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index b49635e026..4fb6a52b99 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -173,7 +173,6 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){ void Channel::recover(bool requeue){ if(requeue){ - //TODO: need to set redelivered flag for_each(unacknowledged.begin(), unacknowledged.end(), Requeue()); unacknowledged.clear(); }else{ @@ -188,6 +187,7 @@ bool Channel::MatchAck::operator()(AckRecord& record) const{ } void Channel::Requeue::operator()(AckRecord& record) const{ + record.msg->redeliver(); record.queue->deliver(record.msg); } diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp index 7afcd97934..8ebe40410a 100644 --- a/cpp/broker/src/Message.cpp +++ b/cpp/broker/src/Message.cpp @@ -32,7 +32,8 @@ Message::Message(const ConnectionToken* const _publisher, exchange(_exchange), routingKey(_routingKey), mandatory(_mandatory), - immediate(_immediate){ + immediate(_immediate), + redelivered(false){ } @@ -51,11 +52,15 @@ bool Message::isComplete(){ return header.get() && (header->getContentSize() == contentSize()); } +void Message::redeliver(){ + redelivered = true; +} + void Message::deliver(OutputHandler* out, int channel, string& consumerTag, u_int64_t deliveryTag, u_int32_t framesize){ - out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, false, exchange, routingKey))); + out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey))); AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header); out->send(new AMQFrame(channel, headerBody)); for(content_iterator i = content.begin(); i != content.end(); i++){ diff --git a/python/tests/basic.py b/python/tests/basic.py index 75d49ec805..9ffcd11b95 100644 --- a/python/tests/basic.py +++ b/python/tests/basic.py @@ -137,3 +137,104 @@ class BasicTests(TestBase): #cancellation of non-existant consumers should be handled without error channel.basic_cancel(consumer_tag="my-consumer") channel.basic_cancel(consumer_tag="this-never-existed") + + + def test_basic_ack(self): + """ + Test basic ack/recover behaviour + """ + channel = self.channel + channel.queue_declare(queue="test-ack-queue") + + reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) + queue = self.client.queue(reply.consumer_tag) + + channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) + channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two + channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four + + channel.basic_recover(requeue=False) + + msg3b = queue.get(timeout=1) + msg5b = queue.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message: " + extra.content.body) + except Empty: None + + def test_basic_recover_requeue(self): + """ + Test requeing on recovery + """ + channel = self.channel + channel.queue_declare(queue="test-requeue") + + subscription = channel.basic_consume(queue="test-requeue", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + channel.basic_publish(routing_key="test-requeue", content=Content("One")) + channel.basic_publish(routing_key="test-requeue", content=Content("Two")) + channel.basic_publish(routing_key="test-requeue", content=Content("Three")) + channel.basic_publish(routing_key="test-requeue", content=Content("Four")) + channel.basic_publish(routing_key="test-requeue", content=Content("Five")) + + msg1 = queue.get(timeout=1) + msg2 = queue.get(timeout=1) + msg3 = queue.get(timeout=1) + msg4 = queue.get(timeout=1) + msg5 = queue.get(timeout=1) + + self.assertEqual("One", msg1.content.body) + self.assertEqual("Two", msg2.content.body) + self.assertEqual("Three", msg3.content.body) + self.assertEqual("Four", msg4.content.body) + self.assertEqual("Five", msg5.content.body) + + channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two + channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four + + channel.basic_cancel(consumer_tag=subscription.consumer_tag) + subscription2 = channel.basic_consume(queue="test-requeue") + queue2 = self.client.queue(subscription2.consumer_tag) + + channel.basic_recover(requeue=True) + + msg3b = queue2.get(timeout=1) + msg5b = queue2.get(timeout=1) + + self.assertEqual("Three", msg3b.content.body) + self.assertEqual("Five", msg5b.content.body) + + self.assertTrue(msg3b.redelivered) + self.assertTrue(msg5b.redelivered) + + try: + extra = queue2.get(timeout=1) + self.fail("Got unexpected message in second queue: " + extra.content.body) + except Empty: None + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected message in original queue: " + extra.content.body) + except Empty: None + diff --git a/python/tests/broker.py b/python/tests/broker.py index 1c31767ef8..f7cb3bcef3 100644 --- a/python/tests/broker.py +++ b/python/tests/broker.py @@ -101,47 +101,3 @@ class BrokerTests(TestBase): except Closed, e: self.assertConnectionException(504, e.args[0]) - - def test_acknowledgement(self): - """ - Test basic ack/recover behaviour - """ - channel = self.channel - channel.queue_declare(queue="test-ack-queue") - - reply = channel.basic_consume(queue="test-ack-queue", no_ack=False) - queue = self.client.queue(reply.consumer_tag) - - channel.basic_publish(routing_key="test-ack-queue", content=Content("One")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Two")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Three")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) - channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) - - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) - - self.assertEqual("One", msg1.content.body) - self.assertEqual("Two", msg2.content.body) - self.assertEqual("Three", msg3.content.body) - self.assertEqual("Four", msg4.content.body) - self.assertEqual("Five", msg5.content.body) - - channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two - channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four - - channel.basic_recover(requeue=False) - - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) - - self.assertEqual("Three", msg3b.content.body) - self.assertEqual("Five", msg5b.content.body) - - try: - extra = queue.get(timeout=1) - self.fail("Got unexpected message: " + extra.content.body) - except Empty: None |