summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2006-10-10 10:06:36 +0000
committerGordon Sim <gsim@apache.org>2006-10-10 10:06:36 +0000
commit3a0eda85d85b3185d3fec6dec6700c6ca1fe3818 (patch)
treed4671a2fbb8ad69a0cc734134b43b28152c3e5b4
parent14654e5360b72adf1704838b3820c7d1fc860e8e (diff)
downloadqpid-python-3a0eda85d85b3185d3fec6dec6700c6ca1fe3818.tar.gz
Implementation and tests for basic_qos (i.e. prefetching)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@454677 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/broker/inc/Channel.h30
-rw-r--r--cpp/broker/inc/Message.h5
-rw-r--r--cpp/broker/src/Channel.cpp99
-rw-r--r--cpp/broker/src/Message.cpp12
-rw-r--r--cpp/broker/src/SessionHandlerImpl.cpp6
-rw-r--r--cpp/broker/test/ChannelTest.cpp120
-rw-r--r--cpp/common/framing/inc/AMQContentBody.h2
-rw-r--r--cpp/common/framing/src/AMQContentBody.cpp2
-rw-r--r--python/tests/basic.py97
9 files changed, 302 insertions, 71 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h
index e76c8a63e9..4f4d8e2890 100644
--- a/cpp/broker/inc/Channel.h
+++ b/cpp/broker/inc/Channel.h
@@ -45,10 +45,12 @@ namespace qpid {
Queue::shared_ptr queue;
ConnectionToken* const connection;
const bool ackExpected;
+ bool blocked;
public:
ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack);
virtual bool deliver(Message::shared_ptr& msg);
void cancel();
+ void requestDispatch();
};
typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator;
@@ -87,6 +89,14 @@ namespace qpid {
void operator()(AckRecord& record) const;
};
+ class AddSize{
+ u_int32_t size;
+ public:
+ AddSize();
+ void operator()(AckRecord& record);
+ u_int32_t getSize();
+ };
+
const int id;
qpid::framing::OutputHandler* out;
u_int64_t deliveryTag;
@@ -95,6 +105,7 @@ namespace qpid {
std::map<string, ConsumerImpl*> consumers;
u_int32_t prefetchSize;
u_int16_t prefetchCount;
+ u_int32_t outstandingSize;
u_int32_t framesize;
Message::shared_ptr message;
NameGenerator tagGenerator;
@@ -103,12 +114,15 @@ namespace qpid {
void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected);
void checkMessage(const std::string& text);
+ bool checkPrefetch(Message::shared_ptr& msg);
+ void cancel(consumer_iterator consumer);
- template<class Operation> void processMessage(Operation route){
+ template<class Operation> Operation processMessage(Operation route){
if(message->isComplete()){
route(message);
message.reset();
}
+ return route;
}
@@ -119,9 +133,9 @@ namespace qpid {
inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; }
inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; }
inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; }
- bool exists(string& consumerTag);
+ bool exists(const string& consumerTag);
void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0);
- void cancel(string& tag);
+ void cancel(const string& tag);
void begin();
void close();
void commit();
@@ -142,10 +156,10 @@ namespace qpid {
* there is no content routes it using the functor passed
* in.
*/
- template<class Operation> void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
+ template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){
checkMessage("Invalid message sequence: got header before publish.");
message->setHeader(header);
- processMessage(route);
+ return processMessage(route);
}
/**
@@ -153,13 +167,15 @@ namespace qpid {
* if this completes the message, routes it using the
* functor passed in.
*/
- template<class Operation> void handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
+ template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){
checkMessage("Invalid message sequence: got content before publish.");
message->addContent(content);
- processMessage(route);
+ return processMessage(route);
}
};
+
+ struct InvalidAckException{};
}
}
diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h
index 8b3321c2dc..7b2c2bc848 100644
--- a/cpp/broker/inc/Message.h
+++ b/cpp/broker/inc/Message.h
@@ -47,8 +47,7 @@ namespace qpid {
bool redelivered;
qpid::framing::AMQHeaderBody::shared_ptr header;
content_list content;
-
- u_int64_t contentSize();
+ u_int64_t size;
public:
typedef std::tr1::shared_ptr<Message> shared_ptr;
@@ -70,6 +69,8 @@ namespace qpid {
qpid::framing::BasicHeaderProperties* getHeaderProperties();
const string& getRoutingKey() const { return routingKey; }
const string& getExchange() const { return exchange; }
+ u_int64_t contentSize() const{ return size; }
+
};
}
}
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp
index ae99f4e7fa..f887e080a6 100644
--- a/cpp/broker/src/Channel.cpp
+++ b/cpp/broker/src/Channel.cpp
@@ -28,19 +28,18 @@ using namespace qpid::concurrent;
Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out),
id(_id),
+ prefetchCount(0),
+ prefetchSize(0),
+ outstandingSize(0),
framesize(_framesize),
transactional(false),
deliveryTag(1),
tagGenerator("sgen"){}
Channel::~Channel(){
- for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
- std::cout << "ERROR: Channel consumer appears not to have been cancelled before channel was destroyed." << std::endl;
- delete (i->second);
- }
}
-bool Channel::exists(string& consumerTag){
+bool Channel::exists(const string& consumerTag){
return consumers.find(consumerTag) != consumers.end();
}
@@ -57,27 +56,26 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool excl
}
}
-void Channel::cancel(string& tag){
+void Channel::cancel(consumer_iterator i){
+ ConsumerImpl* c = i->second;
+ consumers.erase(i);
+ if(c){
+ c->cancel();
+ delete c;
+ }
+}
+
+void Channel::cancel(const string& tag){
consumer_iterator i = consumers.find(tag);
if(i != consumers.end()){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
+ cancel(i);
}
}
void Channel::close(){
//cancel all consumers
for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){
- ConsumerImpl* c = i->second;
- consumers.erase(i);
- if(c){
- c->cancel();
- delete c;
- }
+ cancel(i);
}
}
@@ -99,33 +97,50 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar
u_int64_t myDeliveryTag = deliveryTag++;
if(ackExpected){
unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag));
+ outstandingSize += msg->contentSize();
}
//send deliver method, header and content(s)
msg->deliver(out, id, consumerTag, myDeliveryTag, framesize);
}
+bool Channel::checkPrefetch(Message::shared_ptr& msg){
+ Locker locker(deliveryLock);
+ bool countOk = !prefetchCount || prefetchCount > unacknowledged.size();
+ bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty();
+ return countOk && sizeOk;
+}
+
Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag,
Queue::shared_ptr _queue,
ConnectionToken* const _connection, bool ack) : parent(_parent),
tag(_tag),
queue(_queue),
connection(_connection),
- ackExpected(ack){
+ ackExpected(ack),
+ blocked(false){
}
bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
- if(connection != msg->getPublisher()){
- parent->deliver(msg, tag, queue, ackExpected);
- return true;
- }else{
- return false;
+ if(connection != msg->getPublisher()){//check for no_local
+ if(ackExpected && !parent->checkPrefetch(msg)){
+ blocked = true;
+ }else{
+ blocked = false;
+ parent->deliver(msg, tag, queue, ackExpected);
+ return true;
+ }
}
+ return false;
}
void Channel::ConsumerImpl::cancel(){
if(queue) queue->cancel(this);
}
+void Channel::ConsumerImpl::requestDispatch(){
+ if(blocked) queue->dispatch();
+}
+
void Channel::checkMessage(const std::string& text){
if(!message.get()){
THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text);
@@ -140,20 +155,36 @@ void Channel::handlePublish(Message* msg){
}
void Channel::ack(u_int64_t deliveryTag, bool multiple){
+ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(deliveryTag));
if(i == unacknowledged.end()){
- //error: how should this be signalled?
- }else if(multiple){
+ throw InvalidAckException();
+ }else if(multiple){
unacknowledged.erase(unacknowledged.begin(), ++i);
+ //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?):
+ outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize();
}else{
- unacknowledged.erase(i);
+ outstandingSize -= i->msg->contentSize();
+ unacknowledged.erase(i);
+ }
+
+ //if the prefetch limit had previously been reached, there may
+ //be messages that can be now be delivered
+ for(consumer_iterator i = consumers.begin(); i != consumers.end(); i++){
+ i->second->requestDispatch();
}
}
void Channel::recover(bool requeue){
+ Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery
+
if(requeue){
- for_each(unacknowledged.begin(), unacknowledged.end(), Requeue());
- unacknowledged.clear();
+ outstandingSize = 0;
+ ack_iterator start(unacknowledged.begin());
+ ack_iterator end(unacknowledged.end());
+ for_each(start, end, Requeue());
+ unacknowledged.erase(start, end);
}else{
for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this));
}
@@ -175,3 +206,13 @@ Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {}
void Channel::Redeliver::operator()(AckRecord& record) const{
record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize);
}
+
+Channel::AddSize::AddSize() : size(0){}
+
+void Channel::AddSize::operator()(AckRecord& record){
+ size += record.msg->contentSize();
+}
+
+u_int32_t Channel::AddSize::getSize(){
+ return size;
+}
diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp
index a4ae85e904..a44eeaab59 100644
--- a/cpp/broker/src/Message.cpp
+++ b/cpp/broker/src/Message.cpp
@@ -33,7 +33,8 @@ Message::Message(const ConnectionToken* const _publisher,
routingKey(_routingKey),
mandatory(_mandatory),
immediate(_immediate),
- redelivered(false){
+ redelivered(false),
+ size(0){
}
@@ -46,6 +47,7 @@ void Message::setHeader(AMQHeaderBody::shared_ptr header){
void Message::addContent(AMQContentBody::shared_ptr data){
content.push_back(data);
+ size += data->size();
}
bool Message::isComplete(){
@@ -78,14 +80,6 @@ BasicHeaderProperties* Message::getHeaderProperties(){
return dynamic_cast<BasicHeaderProperties*>(header->getProperties());
}
-u_int64_t Message::contentSize(){
- u_int64_t size(0);
- for(content_iterator i = content.begin(); i != content.end(); i++){
- size += (*i)->size();
- }
- return size;
-}
-
const ConnectionToken* const Message::getPublisher(){
return publisher;
}
diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp
index 63a42a7fd6..857730f3f7 100644
--- a/cpp/broker/src/SessionHandlerImpl.cpp
+++ b/cpp/broker/src/SessionHandlerImpl.cpp
@@ -385,7 +385,11 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t
void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){}
void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){
- parent->getChannel(channel)->ack(deliveryTag, multiple);
+ try{
+ parent->getChannel(channel)->ack(deliveryTag, multiple);
+ }catch(InvalidAckException& e){
+ throw ConnectionException(530, "Received ack for unrecognised delivery tag");
+ }
}
void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue){}
diff --git a/cpp/broker/test/ChannelTest.cpp b/cpp/broker/test/ChannelTest.cpp
index 73a1f97b46..c96d17379e 100644
--- a/cpp/broker/test/ChannelTest.cpp
+++ b/cpp/broker/test/ChannelTest.cpp
@@ -24,23 +24,24 @@
#include <iostream>
#include <memory>
+using namespace std::tr1;
using namespace qpid::broker;
using namespace qpid::framing;
using namespace qpid::concurrent;
-struct MessageHolder{
+struct DummyRouter{
Message::shared_ptr last;
-};
-
-class DummyRouter{
- MessageHolder& holder;
-public:
- DummyRouter(MessageHolder& _holder) : holder(_holder){
+ void operator()(Message::shared_ptr& msg){
+ last = msg;
}
+};
- void operator()(Message::shared_ptr& msg){
- holder.last = msg;
+struct DummyHandler : OutputHandler{
+ std::vector<AMQFrame*> frames;
+
+ virtual void send(AMQFrame* frame){
+ frames.push_back(frame);
}
};
@@ -49,6 +50,8 @@ class ChannelTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ChannelTest);
CPPUNIT_TEST(testIncoming);
+ CPPUNIT_TEST(testConsumerMgmt);
+ CPPUNIT_TEST(testDeliveryNoAck);
CPPUNIT_TEST_SUITE_END();
public:
@@ -64,18 +67,99 @@ class ChannelTest : public CppUnit::TestCase
AMQContentBody::shared_ptr part1(new AMQContentBody(data1));
AMQContentBody::shared_ptr part2(new AMQContentBody(data2));
- MessageHolder holder;
- channel.handleHeader(header, DummyRouter(holder));
- CPPUNIT_ASSERT(!holder.last);
- channel.handleContent(part1, DummyRouter(holder));
- CPPUNIT_ASSERT(!holder.last);
- channel.handleContent(part2, DummyRouter(holder));
- CPPUNIT_ASSERT(holder.last);
- CPPUNIT_ASSERT_EQUAL(routingKey, holder.last->getRoutingKey());
+ CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last);
+ CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last);
+ DummyRouter router = channel.handleContent(part2, DummyRouter());
+ CPPUNIT_ASSERT(router.last);
+ CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey());
+ }
+
+ void testConsumerMgmt(){
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ Channel channel(0, 0, 0);
+ CPPUNIT_ASSERT(!channel.exists("my_consumer"));
+
+ ConnectionToken* owner;
+ string tag("my_consumer");
+ channel.consume(tag, queue, false, false, owner);
+ string tagA;
+ string tagB;
+ channel.consume(tagA, queue, false, false, owner);
+ channel.consume(tagB, queue, false, false, owner);
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount());
+ CPPUNIT_ASSERT(channel.exists("my_consumer"));
+ CPPUNIT_ASSERT(channel.exists(tagA));
+ CPPUNIT_ASSERT(channel.exists(tagB));
+ channel.cancel(tagA);
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount());
+ CPPUNIT_ASSERT(channel.exists("my_consumer"));
+ CPPUNIT_ASSERT(!channel.exists(tagA));
+ CPPUNIT_ASSERT(channel.exists(tagB));
+ channel.close();
+ CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount());
+ }
+
+ void testDeliveryNoAck(){
+ DummyHandler handler;
+ Channel channel(&handler, 7, 10000);
+
+ Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(14);
+ msg->setHeader(header);
+ AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
+ msg->addContent(body);
+
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ ConnectionToken* owner;
+ string tag("no_ack");
+ channel.consume(tag, queue, false, false, owner);
+
+ queue->deliver(msg);
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
+ BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
+ AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+ CPPUNIT_ASSERT(deliver);
+ CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
+ }
+
+ void testDeliveryAndRecovery(){
+ DummyHandler handler;
+ Channel channel(&handler, 7, 10000);
+
+ Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false));
+ AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
+ header->setContentSize(14);
+ msg->setHeader(header);
+ AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn"));
+ msg->addContent(body);
+
+ Queue::shared_ptr queue(new Queue("my_queue"));
+ ConnectionToken* owner;
+ string tag("ack");
+ channel.consume(tag, queue, true, false, owner);
+
+ queue->deliver(msg);
+ CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel());
+ CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel());
+ BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody()));
+ AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody()));
+ AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody()));
+ CPPUNIT_ASSERT(deliver);
+ CPPUNIT_ASSERT(contentHeader);
+ CPPUNIT_ASSERT(contentBody);
+ CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData());
}
};
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest);
-
diff --git a/cpp/common/framing/inc/AMQContentBody.h b/cpp/common/framing/inc/AMQContentBody.h
index 1a6f2cf117..daf7d6cd44 100644
--- a/cpp/common/framing/inc/AMQContentBody.h
+++ b/cpp/common/framing/inc/AMQContentBody.h
@@ -33,7 +33,7 @@ public:
typedef std::tr1::shared_ptr<AMQContentBody> shared_ptr;
AMQContentBody();
- AMQContentBody(string& data);
+ AMQContentBody(const string& data);
inline virtual ~AMQContentBody(){}
inline u_int8_t type() const { return CONTENT_BODY; };
inline string& getData(){ return data; }
diff --git a/cpp/common/framing/src/AMQContentBody.cpp b/cpp/common/framing/src/AMQContentBody.cpp
index a9ee190ba8..6bc588c3ab 100644
--- a/cpp/common/framing/src/AMQContentBody.cpp
+++ b/cpp/common/framing/src/AMQContentBody.cpp
@@ -21,7 +21,7 @@
qpid::framing::AMQContentBody::AMQContentBody(){
}
-qpid::framing::AMQContentBody::AMQContentBody(string& _data) : data(_data){
+qpid::framing::AMQContentBody::AMQContentBody(const string& _data) : data(_data){
}
u_int32_t qpid::framing::AMQContentBody::size() const{
diff --git a/python/tests/basic.py b/python/tests/basic.py
index 9ffcd11b95..50004614eb 100644
--- a/python/tests/basic.py
+++ b/python/tests/basic.py
@@ -113,7 +113,7 @@ class BasicTests(TestBase):
except Closed, e:
self.assertConnectionException(530, e.args[0])
- def test_basic_cancel(self):
+ def test_cancel(self):
"""
Test compliance of the basic.cancel method
"""
@@ -139,7 +139,7 @@ class BasicTests(TestBase):
channel.basic_cancel(consumer_tag="this-never-existed")
- def test_basic_ack(self):
+ def test_ack(self):
"""
Test basic ack/recover behaviour
"""
@@ -183,7 +183,7 @@ class BasicTests(TestBase):
self.fail("Got unexpected message: " + extra.content.body)
except Empty: None
- def test_basic_recover_requeue(self):
+ def test_recover_requeue(self):
"""
Test requeing on recovery
"""
@@ -238,3 +238,94 @@ class BasicTests(TestBase):
self.fail("Got unexpected message in original queue: " + extra.content.body)
except Empty: None
+
+ def test_qos_prefetch_count(self):
+ """
+ Test that the prefetch count specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-count", exclusive=True)
+ subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ #set prefetch to 5:
+ channel.basic_qos(prefetch_count=5)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
+
+ #only 5 messages should have been delivered:
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+
+ def test_qos_prefetch_size(self):
+ """
+ Test that the prefetch size specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-size", exclusive=True)
+ subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+ channel.basic_qos(prefetch_size=50)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
+
+ #only 5 messages should have been delivered (i.e. 45 bytes worth):
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #make sure that a single oversized message still gets delivered
+ large = "abcdefghijklmnopqrstuvwxyz"
+ large = large + "-" + large;
+ channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
+ msg = queue.get(timeout=1)
+ self.assertEqual(large, msg.content.body)