summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-09-27 16:44:02 +0000
committerGordon Sim <gsim@apache.org>2006-09-27 16:44:02 +0000
commit9d718c2348708b0b27ce9fb9fcbf05c4b0a997cc (patch)
tree4bea6ff87d58d3db712cbc727b9723680b1b981a
parent72aff4cc22932bd98b542ac9dd6393e23985e312 (diff)
downloadqpid-python-9d718c2348708b0b27ce9fb9fcbf05c4b0a997cc.tar.gz
Moved ack tests to basic class, added test for requeueing on recovery.
Implemented requeuing on recovery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@450504 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/broker/inc/Message.h2
-rw-r--r--cpp/broker/src/Channel.cpp2
-rw-r--r--cpp/broker/src/Message.cpp9
-rw-r--r--python/tests/basic.py101
-rw-r--r--python/tests/broker.py44
5 files changed, 111 insertions, 47 deletions
diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h
index 37a0c9b2c8..7a239adace 100644
--- a/cpp/broker/inc/Message.h
+++ b/cpp/broker/inc/Message.h
@@ -39,6 +39,7 @@ namespace qpid {
string routingKey;
const bool mandatory;
const bool immediate;
+ bool redelivered;
qpid::framing::AMQHeaderBody::shared_ptr header;
content_list content;
@@ -61,6 +62,7 @@ namespace qpid {
void deliver(qpid::framing::OutputHandler* out, int channel,
string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize);
+ void redeliver();
friend bool route(Message::shared_ptr& msg, ExchangeRegistry* registry);
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
index b49635e026..4fb6a52b99 100644
--- a/cpp/broker/src/Channel.cpp
+++ b/cpp/broker/src/Channel.cpp
@@ -173,7 +173,6 @@ void Channel::ack(u_int64_t deliveryTag, bool multiple){
void Channel::recover(bool requeue){
if(requeue){
- //TODO: need to set redelivered flag
for_each(unacknowledged.begin(), unacknowledged.end(), Requeue());
unacknowledged.clear();
}else{
@@ -188,6 +187,7 @@ bool Channel::MatchAck::operator()(AckRecord& record) const{
}
void Channel::Requeue::operator()(AckRecord& record) const{
+ record.msg->redeliver();
record.queue->deliver(record.msg);
}
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
index 7afcd97934..8ebe40410a 100644
--- a/cpp/broker/src/Message.cpp
+++ b/cpp/broker/src/Message.cpp
@@ -32,7 +32,8 @@ Message::Message(const ConnectionToken* const _publisher,
exchange(_exchange),
routingKey(_routingKey),
mandatory(_mandatory),
- immediate(_immediate){
+ immediate(_immediate),
+ redelivered(false){
}
@@ -51,11 +52,15 @@ bool Message::isComplete(){
return header.get() && (header->getContentSize() == contentSize());
}
+void Message::redeliver(){
+ redelivered = true;
+}
+
void Message::deliver(OutputHandler* out, int channel,
string& consumerTag, u_int64_t deliveryTag,
u_int32_t framesize){
- out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, false, exchange, routingKey)));
+ out->send(new AMQFrame(channel, new BasicDeliverBody(consumerTag, deliveryTag, redelivered, exchange, routingKey)));
AMQBody::shared_ptr headerBody = static_pointer_cast<AMQBody, AMQHeaderBody>(header);
out->send(new AMQFrame(channel, headerBody));
for(content_iterator i = content.begin(); i != content.end(); i++){
diff --git a/python/tests/basic.py b/python/tests/basic.py
index 75d49ec805..9ffcd11b95 100644
--- a/python/tests/basic.py
+++ b/python/tests/basic.py
@@ -137,3 +137,104 @@ class BasicTests(TestBase):
#cancellation of non-existant consumers should be handled without error
channel.basic_cancel(consumer_tag="my-consumer")
channel.basic_cancel(consumer_tag="this-never-existed")
+
+
+ def test_basic_ack(self):
+ """
+ Test basic ack/recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-ack-queue")
+
+ reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
+ queue = self.client.queue(reply.consumer_tag)
+
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
+ channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+ channel.basic_recover(requeue=False)
+
+ msg3b = queue.get(timeout=1)
+ msg5b = queue.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ def test_basic_recover_requeue(self):
+ """
+ Test requeing on recovery
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-requeue")
+
+ subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ channel.basic_publish(routing_key="test-requeue", content=Content("One"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
+ channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+ channel.basic_cancel(consumer_tag=subscription.consumer_tag)
+ subscription2 = channel.basic_consume(queue="test-requeue")
+ queue2 = self.client.queue(subscription2.consumer_tag)
+
+ channel.basic_recover(requeue=True)
+
+ msg3b = queue2.get(timeout=1)
+ msg5b = queue2.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ self.assertTrue(msg3b.redelivered)
+ self.assertTrue(msg5b.redelivered)
+
+ try:
+ extra = queue2.get(timeout=1)
+ self.fail("Got unexpected message in second queue: " + extra.content.body)
+ except Empty: None
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in original queue: " + extra.content.body)
+ except Empty: None
+
diff --git a/python/tests/broker.py b/python/tests/broker.py
index 1c31767ef8..f7cb3bcef3 100644
--- a/python/tests/broker.py
+++ b/python/tests/broker.py
@@ -101,47 +101,3 @@ class BrokerTests(TestBase):
except Closed, e:
self.assertConnectionException(504, e.args[0])
-
- def test_acknowledgement(self):
- """
- Test basic ack/recover behaviour
- """
- channel = self.channel
- channel.queue_declare(queue="test-ack-queue")
-
- reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
- queue = self.client.queue(reply.consumer_tag)
-
- channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
- channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
-
- msg1 = queue.get(timeout=1)
- msg2 = queue.get(timeout=1)
- msg3 = queue.get(timeout=1)
- msg4 = queue.get(timeout=1)
- msg5 = queue.get(timeout=1)
-
- self.assertEqual("One", msg1.content.body)
- self.assertEqual("Two", msg2.content.body)
- self.assertEqual("Three", msg3.content.body)
- self.assertEqual("Four", msg4.content.body)
- self.assertEqual("Five", msg5.content.body)
-
- channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
- channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
-
- channel.basic_recover(requeue=False)
-
- msg3b = queue.get(timeout=1)
- msg5b = queue.get(timeout=1)
-
- self.assertEqual("Three", msg3b.content.body)
- self.assertEqual("Five", msg5b.content.body)
-
- try:
- extra = queue.get(timeout=1)
- self.fail("Got unexpected message: " + extra.content.body)
- except Empty: None