diff options
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r-- | cpp/src/qpid/broker/BrokerChannel.cpp | 242 |
1 files changed, 181 insertions, 61 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index c50fbd5559..6e577ab354 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -53,7 +53,6 @@ Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageS id(_id), connection(con), out(_out), - currentDeliveryTag(1), prefetchSize(0), prefetchCount(0), tagGenerator("sgen"), @@ -75,17 +74,13 @@ bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -// TODO aconway 2007-02-12: Why is connection token passed in instead -// of using the channel's parent connection? void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut, - Queue::shared_ptr queue, bool acks, - bool exclusive, ConnectionToken* const connection, - const FieldTable*) + Queue::shared_ptr queue, bool nolocal, bool acks, + bool exclusive, const FieldTable*) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, token, tagInOut, queue, connection, acks)); + std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -210,7 +205,7 @@ void Channel::checkDtxTimeout() void Channel::record(const DeliveryRecord& delivery) { unacked.push_back(delivery); - delivery.addTo(&outstanding); + delivery.addTo(outstanding); } bool Channel::checkPrefetch(Message::shared_ptr& msg) @@ -221,33 +216,61 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg) return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token, - const string& _tag, Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack - ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack), blocked(false) {} +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, + DeliveryToken::shared_ptr _token, + const string& _name, + Queue::shared_ptr _queue, + bool ack, + bool _nolocal + ) : parent(_parent), + token(_token), + name(_name), + queue(_queue), + ackExpected(ack), + nolocal(_nolocal), + blocked(false), + windowing(true), + msgCredit(0xFFFFFFFF), + byteCredit(0xFFFFFFFF) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg) { - if(!connection || connection != msg->getPublisher()){//check for no_local - if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){ + if (nolocal && &(parent->connection) == msg->getPublisher()) { + return false; + } else { + if (!checkCredit(msg) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))) { blocked = true; - }else{ + } else { blocked = false; + Mutex::ScopedLock locker(parent->deliveryLock); - uint64_t deliveryTag = parent->out.deliver(msg, token); - if(ackExpected){ - parent->record(DeliveryRecord(msg, queue, tag, deliveryTag)); + DeliveryId deliveryTag = parent->out.deliver(msg, token); + if (ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, deliveryTag)); } + } + return !blocked; + } +} - return true; +bool Channel::ConsumerImpl::checkCredit(Message::shared_ptr& msg) +{ + Mutex::ScopedLock l(lock); + if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { + return false; + } else { + if (msgCredit != 0xFFFFFFFF) { + msgCredit--; + } + if (byteCredit != 0xFFFFFFFF) { + byteCredit -= msg->getRequiredCredit(); } + return true; } - return false; } -void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) { +void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) { Mutex::ScopedLock locker(parent->deliveryLock); parent->out.redeliver(msg, token, deliveryTag); } @@ -326,55 +349,71 @@ void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { } -// Used by Basic -void Channel::ack(uint64_t deliveryTag, bool multiple) +void Channel::ackCumulative(DeliveryId id) { - if (multiple) - ack(0, deliveryTag); - else - ack(deliveryTag, deliveryTag); + ack(id, id, true); } -void Channel::ack(uint64_t firstTag, uint64_t lastTag) +void Channel::ackRange(DeliveryId first, DeliveryId last) { + ack(first, last, false); +} + +void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative) +{ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator start = cumulative ? unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); + ack_iterator end = start; + + if (cumulative || first != last) { + //need to find end (position it just after the last record in range) + end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + } else { + //just acked single element (move end past it) + ++end; + } + + for_each(start, end, boost::bind(&Channel::acknowledged, this, _1)); + if (txBuffer.get()) { - accumulatedAck.update(firstTag, lastTag); - //TODO: I think the outstanding prefetch size & count should be updated at this point... - //TODO: ...this may then necessitate dispatching to consumers + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); dtxBuffer->enlist(txAck); } - } else { - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(start, end); + } - ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); - ack_iterator j = (firstTag == 0) ? - unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); - - if(i == unacked.end()){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - }else if(i!=j){ - ack_iterator end = ++i; - for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0)); - unacked.erase(unacked.begin(), end); - - //recalculate the prefetch: - outstanding.reset(); - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); - }else{ - i->discard(); - i->subtractFrom(&outstanding); - unacked.erase(i); - } + //if the prefetch limit had previously been reached, or credit + //had expired in windowing mode there may be messages that can + //be now be delivered + for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); +} + +void Channel::acknowledged(const DeliveryRecord& delivery) +{ + delivery.subtractFrom(outstanding); + ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag()); + if (i != consumers.end()) { + i->acknowledged(delivery); + } +} - //if the prefetch limit had previously been reached, there may - //be messages that can be now be delivered - std::for_each(consumers.begin(), consumers.end(), - boost::bind(&ConsumerImpl::requestDispatch, _1)); +void Channel::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) +{ + if (windowing) { + if (msgCredit != 0xFFFFFFFF) msgCredit++; + if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); } } @@ -384,6 +423,8 @@ void Channel::recover(bool requeue) if(requeue){ outstanding.reset(); + //take copy and clear unacked as requeue may result in redelivery to this channel + //which will in turn result in additions to unacked std::list<DeliveryRecord> copy = unacked; unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); @@ -397,7 +438,7 @@ bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = out.deliver(msg, token); + DeliveryId myDeliveryTag = out.deliver(msg, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -408,7 +449,7 @@ bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, - uint64_t deliveryTag) + DeliveryId deliveryTag) { ConsumerImplMap::iterator i = consumers.find(consumerTag); if (i != consumers.end()){ @@ -426,3 +467,82 @@ void Channel::flow(bool active) std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); } } + + +Channel::ConsumerImpl& Channel::find(const std::string& destination) +{ + ConsumerImplMap::iterator i = consumers.find(destination); + if (i == consumers.end()) { + throw ChannelException(404, boost::format("Unknown destination %1%") % destination); + } else { + return *i; + } +} + +void Channel::setWindowMode(const std::string& destination) +{ + find(destination).setWindowMode(); +} + +void Channel::setCreditMode(const std::string& destination) +{ + find(destination).setCreditMode(); +} + +void Channel::addByteCredit(const std::string& destination, uint32_t value) +{ + find(destination).addByteCredit(value); +} + + +void Channel::addMessageCredit(const std::string& destination, uint32_t value) +{ + find(destination).addMessageCredit(value); +} + +void Channel::flush(const std::string& destination) +{ + ConsumerImpl& c = find(destination); + c.flush(); +} + + +void Channel::stop(const std::string& destination) +{ + find(destination).stop(); +} + +void Channel::ConsumerImpl::setWindowMode() +{ + windowing = true; +} + +void Channel::ConsumerImpl::setCreditMode() +{ + windowing = false; +} + +void Channel::ConsumerImpl::addByteCredit(uint32_t value) +{ + byteCredit += value; + requestDispatch(); +} + +void Channel::ConsumerImpl::addMessageCredit(uint32_t value) +{ + msgCredit += value; + requestDispatch(); +} + +void Channel::ConsumerImpl::flush() +{ + //TODO: need to wait until any messages that are available for + //this consumer have been delivered... i.e. some sort of flush on + //the queue... +} + +void Channel::ConsumerImpl::stop() +{ + msgCredit = 0; + byteCredit = 0; +} |