diff options
author | Gordon Sim <gsim@apache.org> | 2006-10-10 10:06:36 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2006-10-10 10:06:36 +0000 |
commit | 3a0eda85d85b3185d3fec6dec6700c6ca1fe3818 (patch) | |
tree | d4671a2fbb8ad69a0cc734134b43b28152c3e5b4 /cpp/broker/src/Channel.cpp | |
parent | 14654e5360b72adf1704838b3820c7d1fc860e8e (diff) | |
download | qpid-python-3a0eda85d85b3185d3fec6dec6700c6ca1fe3818.tar.gz |
Implementation and tests for basic_qos (i.e. prefetching)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@454677 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/broker/src/Channel.cpp')
-rw-r--r-- | cpp/broker/src/Channel.cpp | 99 |
1 files changed, 70 insertions, 29 deletions
diff --git a/cpp/broker/src/Channel.cpp b/cpp/broker/src/Channel.cpp index ae99f4e7fa..f887e080a6 100644 --- a/cpp/broker/src/Channel.cpp +++ b/cpp/broker/src/Channel.cpp @@ -28,19 +28,18 @@ using namespace qpid::concurrent; Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : out(_out), id(_id), + prefetchCount(0), + prefetchSize(0), + outstandingSize(0), framesize(_framesize), transactional(false), deliveryTag(1), tagGenerator("sgen"){} Channel::~Channel(){ - for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ - std::cout << "ERROR: Channel consumer appears not to have been cancelled before channel was destroyed." << std::endl; - delete (i->second); - } } -bool Channel::exists(string& consumerTag){ +bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } @@ -57,27 +56,26 @@ void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool excl } } -void Channel::cancel(string& tag){ +void Channel::cancel(consumer_iterator i){ + ConsumerImpl* c = i->second; + consumers.erase(i); + if(c){ + c->cancel(); + delete c; + } +} + +void Channel::cancel(const string& tag){ consumer_iterator i = consumers.find(tag); if(i != consumers.end()){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } + cancel(i); } } void Channel::close(){ //cancel all consumers for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ - ConsumerImpl* c = i->second; - consumers.erase(i); - if(c){ - c->cancel(); - delete c; - } + cancel(i); } } @@ -99,33 +97,50 @@ void Channel::deliver(Message::shared_ptr& msg, string& consumerTag, Queue::shar u_int64_t myDeliveryTag = deliveryTag++; if(ackExpected){ unacknowledged.push_back(AckRecord(msg, queue, consumerTag, myDeliveryTag)); + outstandingSize += msg->contentSize(); } //send deliver method, header and content(s) msg->deliver(out, id, consumerTag, myDeliveryTag, framesize); } +bool Channel::checkPrefetch(Message::shared_ptr& msg){ + Locker locker(deliveryLock); + bool countOk = !prefetchCount || prefetchCount > unacknowledged.size(); + bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstandingSize || unacknowledged.empty(); + return countOk && sizeOk; +} + Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, string& _tag, Queue::shared_ptr _queue, ConnectionToken* const _connection, bool ack) : parent(_parent), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack){ + ackExpected(ack), + blocked(false){ } bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ - if(connection != msg->getPublisher()){ - parent->deliver(msg, tag, queue, ackExpected); - return true; - }else{ - return false; + if(connection != msg->getPublisher()){//check for no_local + if(ackExpected && !parent->checkPrefetch(msg)){ + blocked = true; + }else{ + blocked = false; + parent->deliver(msg, tag, queue, ackExpected); + return true; + } } + return false; } void Channel::ConsumerImpl::cancel(){ if(queue) queue->cancel(this); } +void Channel::ConsumerImpl::requestDispatch(){ + if(blocked) queue->dispatch(); +} + void Channel::checkMessage(const std::string& text){ if(!message.get()){ THROW_QPID_ERROR(PROTOCOL_ERROR + 504, text); @@ -140,20 +155,36 @@ void Channel::handlePublish(Message* msg){ } void Channel::ack(u_int64_t deliveryTag, bool multiple){ + Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + ack_iterator i = find_if(unacknowledged.begin(), unacknowledged.end(), MatchAck(deliveryTag)); if(i == unacknowledged.end()){ - //error: how should this be signalled? - }else if(multiple){ + throw InvalidAckException(); + }else if(multiple){ unacknowledged.erase(unacknowledged.begin(), ++i); + //recompute outstandingSize (might in some cases be quicker to add up removed size and subtract from total?): + outstandingSize = for_each(unacknowledged.begin(), unacknowledged.end(), AddSize()).getSize(); }else{ - unacknowledged.erase(i); + outstandingSize -= i->msg->contentSize(); + unacknowledged.erase(i); + } + + //if the prefetch limit had previously been reached, there may + //be messages that can be now be delivered + for(consumer_iterator i = consumers.begin(); i != consumers.end(); i++){ + i->second->requestDispatch(); } } void Channel::recover(bool requeue){ + Locker locker(deliveryLock);//need to synchronize with possible concurrent delivery + if(requeue){ - for_each(unacknowledged.begin(), unacknowledged.end(), Requeue()); - unacknowledged.clear(); + outstandingSize = 0; + ack_iterator start(unacknowledged.begin()); + ack_iterator end(unacknowledged.end()); + for_each(start, end, Requeue()); + unacknowledged.erase(start, end); }else{ for_each(unacknowledged.begin(), unacknowledged.end(), Redeliver(this)); } @@ -175,3 +206,13 @@ Channel::Redeliver::Redeliver(Channel* const _channel) : channel(_channel) {} void Channel::Redeliver::operator()(AckRecord& record) const{ record.msg->deliver(channel->out, channel->id, record.consumerTag, record.deliveryTag, channel->framesize); } + +Channel::AddSize::AddSize() : size(0){} + +void Channel::AddSize::operator()(AckRecord& record){ + size += record.msg->contentSize(); +} + +u_int32_t Channel::AddSize::getSize(){ + return size; +} |