diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-10 10:06:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-10 10:06:36 +0000 |
commit | 3a0eda85d85b3185d3fec6dec6700c6ca1fe3818 (patch) | |
tree | d4671a2fbb8ad69a0cc734134b43b28152c3e5b4 | |
parent | 14654e5360b72adf1704838b3820c7d1fc860e8e (diff) | |
download | qpid-python-3a0eda85d85b3185d3fec6dec6700c6ca1fe3818.tar.gz |
Implementation and tests for basic_qos (i.e. prefetching)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@454677 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | cpp/broker/inc/Channel.h | 30 | ||||
-rw-r--r-- | cpp/broker/inc/Message.h | 5 | ||||
-rw-r--r-- | cpp/broker/src/Channel.cpp | 99 | ||||
-rw-r--r-- | cpp/broker/src/Message.cpp | 12 | ||||
-rw-r--r-- | cpp/broker/src/SessionHandlerImpl.cpp | 6 | ||||
-rw-r--r-- | cpp/broker/test/ChannelTest.cpp | 120 | ||||
-rw-r--r-- | cpp/common/framing/inc/AMQContentBody.h | 2 | ||||
-rw-r--r-- | cpp/common/framing/src/AMQContentBody.cpp | 2 | ||||
-rw-r--r-- | python/tests/basic.py | 97 |
9 files changed, 302 insertions, 71 deletions
diff --git a/cpp/broker/inc/Channel.h b/cpp/broker/inc/Channel.h index e76c8a63e9..4f4d8e2890 100644 --- a/cpp/broker/inc/Channel.h +++ b/cpp/broker/inc/Channel.h @@ -45,10 +45,12 @@ namespace qpid { Queue::shared_ptr queue; ConnectionToken* const connection; const bool ackExpected; + bool blocked; public: ConsumerImpl(Channel* parent, string& tag, Queue::shared_ptr queue, ConnectionToken* const connection, bool ack); virtual bool deliver(Message::shared_ptr& msg); void cancel(); + void requestDispatch(); }; typedef std::map<string,ConsumerImpl*>::iterator consumer_iterator; @@ -87,6 +89,14 @@ namespace qpid { void operator()(AckRecord& record) const; }; + class AddSize{ + u_int32_t size; + public: + AddSize(); + void operator()(AckRecord& record); + u_int32_t getSize(); + }; + const int id; qpid::framing::OutputHandler* out; u_int64_t deliveryTag; @@ -95,6 +105,7 @@ namespace qpid { std::map<string, ConsumerImpl*> consumers; u_int32_t prefetchSize; u_int16_t prefetchCount; + u_int32_t outstandingSize; u_int32_t framesize; Message::shared_ptr message; NameGenerator tagGenerator; @@ -103,12 +114,15 @@ namespace qpid { void deliver(Message::shared_ptr& msg, string& tag, Queue::shared_ptr& queue, bool ackExpected); void checkMessage(const std::string& text); + bool checkPrefetch(Message::shared_ptr& msg); + void cancel(consumer_iterator consumer); - template<class Operation> void processMessage(Operation route){ + template<class Operation> Operation processMessage(Operation route){ if(message->isComplete()){ route(message); message.reset(); } + return route; } @@ -119,9 +133,9 @@ namespace qpid { inline Queue::shared_ptr getDefaultQueue(){ return defaultQueue; } inline u_int32_t setPrefetchSize(u_int32_t size){ prefetchSize = size; } inline u_int16_t setPrefetchCount(u_int16_t count){ prefetchCount = count; } - bool exists(string& consumerTag); + bool exists(const string& consumerTag); void consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection = 0); - void cancel(string& tag); + void cancel(const string& tag); void begin(); void close(); void commit(); @@ -142,10 +156,10 @@ namespace qpid { * there is no content routes it using the functor passed * in. */ - template<class Operation> void handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){ + template<class Operation> Operation handleHeader(qpid::framing::AMQHeaderBody::shared_ptr header, Operation route){ checkMessage("Invalid message sequence: got header before publish."); message->setHeader(header); - processMessage(route); + return processMessage(route); } /** @@ -153,13 +167,15 @@ namespace qpid { * if this completes the message, routes it using the * functor passed in. */ - template<class Operation> void handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){ + template<class Operation> Operation handleContent(qpid::framing::AMQContentBody::shared_ptr content, Operation route){ checkMessage("Invalid message sequence: got content before publish."); message->addContent(content); - processMessage(route); + return processMessage(route); } }; + + struct InvalidAckException{}; } } diff --git a/cpp/broker/inc/Message.h b/cpp/broker/inc/Message.h index 8b3321c2dc..7b2c2bc848 100644 --- a/cpp/broker/inc/Message.h +++ b/cpp/broker/inc/Message.h @@ -47,8 +47,7 @@ namespace qpid { bool redelivered; qpid::framing::AMQHeaderBody::shared_ptr header; content_list content; - - u_int64_t contentSize(); + u_int64_t size; public: typedef std::tr1::shared_ptr<Message> shared_ptr; @@ -70,6 +69,8 @@ namespace qpid { qpid::framing::BasicHeaderProperties* getHeaderProperties(); const string& getRoutingKey() const { return routingKey; } const string& getExchange() const { return exchange; } + u_int64_t contentSize() const{ return size; } + }; } } diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index ae99f4e7fa..f887e080a6 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -28,19 +28,18 @@ using namespace qpid::concurrent; Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out), id(_id), + prefetchCount(0), + prefetchSize(0), + outstandingSize(0), framesize(_framesize), transactional(false), deliveryTag(1), tagGenerator("sgen"){} Channel::~Channel(){ - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ - std::cout << "ERROR: Channel consumer appears not to have been cancelled before channel was destroyed." << std::endl; - delete (i->second); - } } -bool Channel::exists(string& consumerTag){ +bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } @@ -57,27 +56,26 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool excl } } -void Channel::cancel(string& tag){ +void Channel::cancel(consumer_iterator i){ + ConsumerImpl* c = i->second; + consumers.erase(i); + if(c){ + c->cancel(); + delete c; + } +} + +void Channel::cancel(const string& tag){ consumer_iterator i = consumers.find(tag); if(i != consumers.end()){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } + cancel(i); } } void Channel::close(){ //cancel all consumers for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } + cancel(i); } } @@ -99,33 +97,50 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar u_int64_t myDeliveryTag = deliveryTag++; if(ackExpected){ unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); + outstandingSize += msg->contentSize(); } //send deliver method, header and content(s) msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); } +bool Channel::checkPrefetch(Message::shared_ptr& msg){ + Locker locker(deliveryLock); + bool countOk = !prefetchCount || prefetchCount > unacknowledged.size(); + bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty(); + return countOk && sizeOk; +} + Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, Queue::shared_ptr _queue, ConnectionToken* const _connection, bool ack) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack){ + ackExpected(ack), + blocked(false){ } bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ - if(connection != msg->getPublisher()){ - parent->deliver(msg, tag, queue, ackExpected); - return true; - }else{ - return false; + if(connection != msg->getPublisher()){//check for no_local + if(ackExpected && !parent->checkPrefetch(msg)){ + blocked = true; + }else{ + blocked = false; + parent->deliver(msg, tag, queue, ackExpected); + return true; + } } + return false; } void Channel::ConsumerImpl::cancel(){ if(queue) queue->cancel(this); } +void Channel::ConsumerImpl::requestDispatch(){ + if(blocked) queue->dispatch(); +} + void Channel::checkMessage(const std::string& text){ if(!message.get()){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text); @@ -140,20 +155,36 @@ void Channel::handlePublish(Message* msg){ } void Channel::ack(u_int64_t deliveryTag, bool multiple){ + Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(deliveryTag)); if(i == unacknowledged.end()){ - //error: how should this be signalled? - }else if(multiple){ + throw InvalidAckException(); + }else if(multiple){ unacknowledged.erase(unacknowledged.begin(), ++i); + //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?): + outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize(); }else{ - unacknowledged.erase(i); + outstandingSize -= i->msg->contentSize(); + unacknowledged.erase(i); + } + + //if the prefetch limit had previously been reached, there may + //be messages that can be now be delivered + for(consumer_iterator i = consumers.begin(); i != consumers.end(); i++){ + i->second->requestDispatch(); } } void Channel::recover(bool requeue){ + Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + if(requeue){ - for_each(unacknowledged.begin(), unacknowledged.end(), Requeue()); - unacknowledged.clear(); + outstandingSize = 0; + ack_iterator start(unacknowledged.begin()); + ack_iterator end(unacknowledged.end()); + for_each(start, end, Requeue()); + unacknowledged.erase(start, end); }else{ for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this)); } @@ -175,3 +206,13 @@ Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} void Channel::Redeliver::operator()(AckRecord& record) const{ record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); } + +Channel::AddSize::AddSize() : size(0){} + +void Channel::AddSize::operator()(AckRecord& record){ + size += record.msg->contentSize(); +} + +u_int32_t Channel::AddSize::getSize(){ + return size; +} diff --git a/cpp/broker/src/Message.cpp b/cpp/broker/src/Message.cpp index a4ae85e904..a44eeaab59 100644 --- a/cpp/broker/src/Message.cpp +++ b/cpp/broker/src/Message.cpp @@ -33,7 +33,8 @@ Message::Message(const ConnectionToken* const _publisher, routingKey(_routingKey), mandatory(_mandatory), immediate(_immediate), - redelivered(false){ + redelivered(false), + size(0){ } @@ -46,6 +47,7 @@ void Message::setHeader(AMQHeaderBody::shared_ptr header){ void Message::addContent(AMQContentBody::shared_ptr data){ content.push_back(data); + size += data->size(); } bool Message::isComplete(){ @@ -78,14 +80,6 @@ BasicHeaderProperties* Message::getHeaderProperties(){ return dynamic_cast<BasicHeaderProperties*>(header->getProperties()); } -u_int64_t Message::contentSize(){ - u_int64_t size(0); - for(content_iterator i = content.begin(); i != content.end(); i++){ - size += (*i)->size(); - } - return size; -} - const ConnectionToken* const Message::getPublisher(){ return publisher; } diff --git a/cpp/broker/src/SessionHandlerImpl.cpp b/cpp/broker/src/SessionHandlerImpl.cpp index 63a42a7fd6..857730f3f7 100644 --- a/cpp/broker/src/SessionHandlerImpl.cpp +++ b/cpp/broker/src/SessionHandlerImpl.cpp @@ -385,7 +385,11 @@ void SessionHandlerImpl::BasicHandlerImpl::publish(u_int16_t channel, u_int16_t void SessionHandlerImpl::BasicHandlerImpl::get(u_int16_t channel, u_int16_t ticket, string& queue, bool noAck){} void SessionHandlerImpl::BasicHandlerImpl::ack(u_int16_t channel, u_int64_t deliveryTag, bool multiple){ - parent->getChannel(channel)->ack(deliveryTag, multiple); + try{ + parent->getChannel(channel)->ack(deliveryTag, multiple); + }catch(InvalidAckException& e){ + throw ConnectionException(530, "Received ack for unrecognised delivery tag"); + } } void SessionHandlerImpl::BasicHandlerImpl::reject(u_int16_t channel, u_int64_t deliveryTag, bool requeue){} diff --git a/cpp/broker/test/ChannelTest.cpp b/cpp/broker/test/ChannelTest.cpp index 73a1f97b46..c96d17379e 100644 --- a/cpp/broker/test/ChannelTest.cpp +++ b/cpp/broker/test/ChannelTest.cpp @@ -24,23 +24,24 @@ #include <iostream> #include <memory> +using namespace std::tr1; using namespace qpid::broker; using namespace qpid::framing; using namespace qpid::concurrent; -struct MessageHolder{ +struct DummyRouter{ Message::shared_ptr last; -}; - -class DummyRouter{ - MessageHolder& holder; -public: - DummyRouter(MessageHolder& _holder) : holder(_holder){ + void operator()(Message::shared_ptr& msg){ + last = msg; } +}; - void operator()(Message::shared_ptr& msg){ - holder.last = msg; +struct DummyHandler : OutputHandler{ + std::vector<AMQFrame*> frames; + + virtual void send(AMQFrame* frame){ + frames.push_back(frame); } }; @@ -49,6 +50,8 @@ class ChannelTest : public CppUnit::TestCase { CPPUNIT_TEST_SUITE(ChannelTest); CPPUNIT_TEST(testIncoming); + CPPUNIT_TEST(testConsumerMgmt); + CPPUNIT_TEST(testDeliveryNoAck); CPPUNIT_TEST_SUITE_END(); public: @@ -64,18 +67,99 @@ class ChannelTest : public CppUnit::TestCase AMQContentBody::shared_ptr part1(new AMQContentBody(data1)); AMQContentBody::shared_ptr part2(new AMQContentBody(data2)); - MessageHolder holder; - channel.handleHeader(header, DummyRouter(holder)); - CPPUNIT_ASSERT(!holder.last); - channel.handleContent(part1, DummyRouter(holder)); - CPPUNIT_ASSERT(!holder.last); - channel.handleContent(part2, DummyRouter(holder)); - CPPUNIT_ASSERT(holder.last); - CPPUNIT_ASSERT_EQUAL(routingKey, holder.last->getRoutingKey()); + CPPUNIT_ASSERT(!channel.handleHeader(header, DummyRouter()).last); + CPPUNIT_ASSERT(!channel.handleContent(part1, DummyRouter()).last); + DummyRouter router = channel.handleContent(part2, DummyRouter()); + CPPUNIT_ASSERT(router.last); + CPPUNIT_ASSERT_EQUAL(routingKey, router.last->getRoutingKey()); + } + + void testConsumerMgmt(){ + Queue::shared_ptr queue(new Queue("my_queue")); + Channel channel(0, 0, 0); + CPPUNIT_ASSERT(!channel.exists("my_consumer")); + + ConnectionToken* owner; + string tag("my_consumer"); + channel.consume(tag, queue, false, false, owner); + string tagA; + string tagB; + channel.consume(tagA, queue, false, false, owner); + channel.consume(tagB, queue, false, false, owner); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 3, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.cancel(tagA); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 2, queue->getConsumerCount()); + CPPUNIT_ASSERT(channel.exists("my_consumer")); + CPPUNIT_ASSERT(!channel.exists(tagA)); + CPPUNIT_ASSERT(channel.exists(tagB)); + channel.close(); + CPPUNIT_ASSERT_EQUAL((u_int32_t) 0, queue->getConsumerCount()); + } + + void testDeliveryNoAck(){ + DummyHandler handler; + Channel channel(&handler, 7, 10000); + + Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + msg->setHeader(header); + AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); + msg->addContent(body); + + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner; + string tag("no_ack"); + channel.consume(tag, queue, false, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); + BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); + AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + CPPUNIT_ASSERT(deliver); + CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); + } + + void testDeliveryAndRecovery(){ + DummyHandler handler; + Channel channel(&handler, 7, 10000); + + Message::shared_ptr msg(new Message(0, "test", "my_routing_key", false, false)); + AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); + header->setContentSize(14); + msg->setHeader(header); + AMQContentBody::shared_ptr body(new AMQContentBody("abcdefghijklmn")); + msg->addContent(body); + + Queue::shared_ptr queue(new Queue("my_queue")); + ConnectionToken* owner; + string tag("ack"); + channel.consume(tag, queue, true, false, owner); + + queue->deliver(msg); + CPPUNIT_ASSERT_EQUAL((size_t) 3, handler.frames.size()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[0]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[1]->getChannel()); + CPPUNIT_ASSERT_EQUAL((u_int16_t) 7, handler.frames[2]->getChannel()); + BasicDeliverBody::shared_ptr deliver(dynamic_pointer_cast<BasicDeliverBody, AMQBody>(handler.frames[0]->getBody())); + AMQHeaderBody::shared_ptr contentHeader(dynamic_pointer_cast<AMQHeaderBody, AMQBody>(handler.frames[1]->getBody())); + AMQContentBody::shared_ptr contentBody(dynamic_pointer_cast<AMQContentBody, AMQBody>(handler.frames[2]->getBody())); + CPPUNIT_ASSERT(deliver); + CPPUNIT_ASSERT(contentHeader); + CPPUNIT_ASSERT(contentBody); + CPPUNIT_ASSERT_EQUAL(string("abcdefghijklmn"), contentBody->getData()); } }; // Make this test suite a plugin. CPPUNIT_PLUGIN_IMPLEMENT(); CPPUNIT_TEST_SUITE_REGISTRATION(ChannelTest); - diff --git a/cpp/common/framing/inc/AMQContentBody.h b/cpp/common/framing/inc/AMQContentBody.h index 1a6f2cf117..daf7d6cd44 100644 --- a/cpp/common/framing/inc/AMQContentBody.h +++ b/cpp/common/framing/inc/AMQContentBody.h @@ -33,7 +33,7 @@ public: typedef std::tr1::shared_ptr<AMQContentBody> shared_ptr; AMQContentBody(); - AMQContentBody(string& data); + AMQContentBody(const string& data); inline virtual ~AMQContentBody(){} inline u_int8_t type() const { return CONTENT_BODY; }; inline string& getData(){ return data; } diff --git a/cpp/common/framing/src/AMQContentBody.cpp b/cpp/common/framing/src/AMQContentBody.cpp index a9ee190ba8..6bc588c3ab 100644 --- a/cpp/common/framing/src/AMQContentBody.cpp +++ b/cpp/common/framing/src/AMQContentBody.cpp @@ -21,7 +21,7 @@ qpid::framing::AMQContentBody::AMQContentBody(){ } -qpid::framing::AMQContentBody::AMQContentBody(string& _data) : data(_data){ +qpid::framing::AMQContentBody::AMQContentBody(const string& _data) : data(_data){ } u_int32_t qpid::framing::AMQContentBody::size() const{ diff --git a/python/tests/basic.py b/python/tests/basic.py index 9ffcd11b95..50004614eb 100644 --- a/python/tests/basic.py +++ b/python/tests/basic.py @@ -113,7 +113,7 @@ class BasicTests(TestBase): except Closed, e: self.assertConnectionException(530, e.args[0]) - def test_basic_cancel(self): + def test_cancel(self): """ Test compliance of the basic.cancel method """ @@ -139,7 +139,7 @@ class BasicTests(TestBase): channel.basic_cancel(consumer_tag="this-never-existed") - def test_basic_ack(self): + def test_ack(self): """ Test basic ack/recover behaviour """ @@ -183,7 +183,7 @@ class BasicTests(TestBase): self.fail("Got unexpected message: " + extra.content.body) except Empty: None - def test_basic_recover_requeue(self): + def test_recover_requeue(self): """ Test requeing on recovery """ @@ -238,3 +238,94 @@ class BasicTests(TestBase): self.fail("Got unexpected message in original queue: " + extra.content.body) except Empty: None + + def test_qos_prefetch_count(self): + """ + Test that the prefetch count specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-count", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 5: + channel.basic_qos(prefetch_count=5) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered: + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + + + def test_qos_prefetch_size(self): + """ + Test that the prefetch size specified is honoured + """ + #setup: declare queue and subscribe + channel = self.channel + channel.queue_declare(queue="test-prefetch-size", exclusive=True) + subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False) + queue = self.client.queue(subscription.consumer_tag) + + #set prefetch to 50 bytes (each message is 9 or 10 bytes): + channel.basic_qos(prefetch_size=50) + + #publish 10 messages: + for i in range(1, 11): + channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i)) + + #only 5 messages should have been delivered (i.e. 45 bytes worth): + for i in range(1, 6): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 6th message in original queue: " + extra.content.body) + except Empty: None + + #ack messages and check that the next set arrive ok: + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + for i in range(6, 11): + msg = queue.get(timeout=1) + self.assertEqual("Message %d" % i, msg.content.body) + + channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) + + try: + extra = queue.get(timeout=1) + self.fail("Got unexpected 11th message in original queue: " + extra.content.body) + except Empty: None + + #make sure that a single oversized message still gets delivered + large = "abcdefghijklmnopqrstuvwxyz" + large = large + "-" + large; + channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) + msg = queue.get(timeout=1) + self.assertEqual(large, msg.content.body) |