diff options
27 files changed, 1010 insertions, 239 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index e3580521c2..a47f5dfcbb 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -280,6 +280,7 @@ nobase_include_HEADERS = \ qpid/broker/Deliverable.h \ qpid/broker/DeliverableMessage.h \ qpid/broker/DeliveryAdapter.h \ + qpid/broker/DeliveryId.h \ qpid/broker/DeliveryToken.h \ qpid/broker/DirectExchange.h \ qpid/broker/DtxAck.h \ diff --git a/cpp/src/qpid/broker/AccumulatedAck.cpp b/cpp/src/qpid/broker/AccumulatedAck.cpp index ff471b0287..b5d52ac3be 100644 --- a/cpp/src/qpid/broker/AccumulatedAck.cpp +++ b/cpp/src/qpid/broker/AccumulatedAck.cpp @@ -21,37 +21,125 @@ #include "AccumulatedAck.h" #include <assert.h> +#include <iostream> -using std::less_equal; -using std::bind2nd; +using std::list; +using std::max; +using std::min; using namespace qpid::broker; -void AccumulatedAck::update(uint64_t firstTag, uint64_t lastTag){ - assert(firstTag<=lastTag); - if (firstTag <= range + 1) { - if (lastTag > range) range = lastTag; +void AccumulatedAck::update(DeliveryId first, DeliveryId last){ + assert(first <= last); + if (last < mark) return; + + + Range r(first, last); + bool handled = false; + list<Range>::iterator merged = ranges.end(); + if (r.mergeable(mark)) { + mark = r.end; + merged = ranges.begin(); + handled = true; + } else { + for (list<Range>::iterator i = ranges.begin(); i != ranges.end() && !handled; i++) { + if (i->merge(r)) { + merged = i; + handled = true; + } else if (r.start < i->start) { + ranges.insert(i, r); + handled = true; + } + } + } + if (!handled) { + ranges.push_back(r); } else { - for (uint64_t tag = firstTag; tag<=lastTag; tag++) - individual.push_back(tag); + while (!ranges.empty() && ranges.front().end <= mark) { + ranges.pop_front(); + } + //new range is incorporated, but may be possible to consolidate + if (merged == ranges.begin()) { + //consolidate mark + while (merged != ranges.end() && merged->mergeable(mark)) { + mark = merged->end; + merged = ranges.erase(merged); + } + } + if (merged != ranges.end()) { + //consolidate ranges + list<Range>::iterator i = merged; + list<Range>::iterator j = i++; + while (i != ranges.end() && j->merge(*i)) { + j = i++; + } + } } } -void AccumulatedAck::consolidate(){ - individual.sort(); - //remove any individual tags that are covered by range - individual.remove_if(bind2nd(less_equal<uint64_t>(), range)); - //update range if possible (using <= allows for duplicates from overlapping ranges) - while (individual.front() <= range + 1) { - range = individual.front(); - individual.pop_front(); +void AccumulatedAck::consolidate(){} + +void AccumulatedAck::clear(){ + mark = 0;//not sure that this is valid when wraparound is a possibility + ranges.clear(); +} + +bool AccumulatedAck::covers(DeliveryId tag) const{ + if (tag <= mark) return true; + for (list<Range>::const_iterator i = ranges.begin(); i != ranges.end(); i++) { + if (i->contains(tag)) return true; } + return false; } -void AccumulatedAck::clear(){ - range = 0; - individual.clear(); +bool Range::contains(DeliveryId i) const +{ + return i >= start && i <= end; } -bool AccumulatedAck::covers(uint64_t tag) const{ - return tag <= range || find(individual.begin(), individual.end(), tag) != individual.end(); +bool Range::intersect(const Range& r) const +{ + return r.contains(start) || r.contains(end) || contains(r.start) || contains(r.end); } + +bool Range::merge(const Range& r) +{ + if (intersect(r) || mergeable(r.end) || r.mergeable(end)) { + start = min(start, r.start); + end = max(end, r.end); + return true; + } else { + return false; + } +} + +bool Range::mergeable(const DeliveryId& s) const +{ + if (contains(s) || start - s == 1) { + return true; + } else { + return false; + } +} + +Range::Range(DeliveryId s, DeliveryId e) : start(s), end(e) {} + + +namespace qpid{ +namespace broker{ + std::ostream& operator<<(std::ostream& out, const Range& r) + { + out << "[" << r.start.getValue() << "-" << r.end.getValue() << "]"; + return out; + } + + std::ostream& operator<<(std::ostream& out, const AccumulatedAck& a) + { + out << "{mark: " << a.mark.getValue() << ", ranges: ("; + for (list<Range>::const_iterator i = a.ranges.begin(); i != a.ranges.end(); i++) { + if (i != a.ranges.begin()) out << ", "; + out << *i; + } + out << ")]"; + return out; + } +}} diff --git a/cpp/src/qpid/broker/AccumulatedAck.h b/cpp/src/qpid/broker/AccumulatedAck.h index c4a6e3b79b..be01c5e02c 100644 --- a/cpp/src/qpid/broker/AccumulatedAck.h +++ b/cpp/src/qpid/broker/AccumulatedAck.h @@ -24,9 +24,23 @@ #include <algorithm> #include <functional> #include <list> +#include <ostream> +#include "DeliveryId.h" namespace qpid { namespace broker { + + struct Range + { + DeliveryId start; + DeliveryId end; + + Range(DeliveryId s, DeliveryId e); + bool contains(DeliveryId i) const; + bool intersect(const Range& r) const; + bool merge(const Range& r); + bool mergeable(const DeliveryId& r) const; + }; /** * Keeps an accumulated record of acked messages (by delivery * tag). @@ -37,19 +51,21 @@ namespace qpid { * If not zero, then everything up to this value has been * acked. */ - uint64_t range; + DeliveryId mark; /** * List of individually acked messages that are not * included in the range marked by 'range'. */ - std::list<uint64_t> individual; + std::list<Range> ranges; - AccumulatedAck(uint64_t r) : range(r) {} - void update(uint64_t firstTag, uint64_t lastTag); + AccumulatedAck(DeliveryId r) : mark(r) {} + void update(DeliveryId firstTag, DeliveryId lastTag); void consolidate(); void clear(); - bool covers(uint64_t tag) const; + bool covers(DeliveryId tag) const; }; + std::ostream& operator<<(std::ostream&, const Range&); + std::ostream& operator<<(std::ostream&, const AccumulatedAck&); } } diff --git a/cpp/src/qpid/broker/BrokerAdapter.cpp b/cpp/src/qpid/broker/BrokerAdapter.cpp index 8edf448bc4..26c2b30ac6 100644 --- a/cpp/src/qpid/broker/BrokerAdapter.cpp +++ b/cpp/src/qpid/broker/BrokerAdapter.cpp @@ -325,7 +325,7 @@ void BrokerAdapter::BasicHandlerImpl::consume(uint16_t /*ticket*/, //also version specific behaviour now) if (newTag.empty()) newTag = tagGenerator.generate(); DeliveryToken::shared_ptr token(BasicMessage::createConsumeToken(newTag)); - channel.consume(token, newTag, queue, !noAck, exclusive, noLocal ? &connection : 0, &fields); + channel.consume(token, newTag, queue, noLocal, !noAck, exclusive, &fields); if(!nowait) client.consumeOk(newTag); @@ -365,7 +365,11 @@ void BrokerAdapter::BasicHandlerImpl::get(uint16_t /*ticket*/, const string& que } void BrokerAdapter::BasicHandlerImpl::ack(uint64_t deliveryTag, bool multiple){ - channel.ack(deliveryTag, multiple); + if (multiple) { + channel.ackCumulative(deliveryTag); + } else { + channel.ackRange(deliveryTag, deliveryTag); + } } void BrokerAdapter::BasicHandlerImpl::reject(uint64_t /*deliveryTag*/, bool /*requeue*/){} diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp index c50fbd5559..6e577ab354 100644 --- a/cpp/src/qpid/broker/BrokerChannel.cpp +++ b/cpp/src/qpid/broker/BrokerChannel.cpp @@ -53,7 +53,6 @@ Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageS id(_id), connection(con), out(_out), - currentDeliveryTag(1), prefetchSize(0), prefetchCount(0), tagGenerator("sgen"), @@ -75,17 +74,13 @@ bool Channel::exists(const string& consumerTag){ return consumers.find(consumerTag) != consumers.end(); } -// TODO aconway 2007-02-12: Why is connection token passed in instead -// of using the channel's parent connection? void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut, - Queue::shared_ptr queue, bool acks, - bool exclusive, ConnectionToken* const connection, - const FieldTable*) + Queue::shared_ptr queue, bool nolocal, bool acks, + bool exclusive, const FieldTable*) { if(tagInOut.empty()) tagInOut = tagGenerator.generate(); - std::auto_ptr<ConsumerImpl> c( - new ConsumerImpl(this, token, tagInOut, queue, connection, acks)); + std::auto_ptr<ConsumerImpl> c(new ConsumerImpl(this, token, tagInOut, queue, acks, nolocal)); queue->consume(c.get(), exclusive);//may throw exception consumers.insert(tagInOut, c.release()); } @@ -210,7 +205,7 @@ void Channel::checkDtxTimeout() void Channel::record(const DeliveryRecord& delivery) { unacked.push_back(delivery); - delivery.addTo(&outstanding); + delivery.addTo(outstanding); } bool Channel::checkPrefetch(Message::shared_ptr& msg) @@ -221,33 +216,61 @@ bool Channel::checkPrefetch(Message::shared_ptr& msg) return countOk && sizeOk; } -Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token, - const string& _tag, Queue::shared_ptr _queue, - ConnectionToken* const _connection, bool ack - ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection), - ackExpected(ack), blocked(false) {} +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, + DeliveryToken::shared_ptr _token, + const string& _name, + Queue::shared_ptr _queue, + bool ack, + bool _nolocal + ) : parent(_parent), + token(_token), + name(_name), + queue(_queue), + ackExpected(ack), + nolocal(_nolocal), + blocked(false), + windowing(true), + msgCredit(0xFFFFFFFF), + byteCredit(0xFFFFFFFF) {} bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg) { - if(!connection || connection != msg->getPublisher()){//check for no_local - if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){ + if (nolocal && &(parent->connection) == msg->getPublisher()) { + return false; + } else { + if (!checkCredit(msg) || !parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))) { blocked = true; - }else{ + } else { blocked = false; + Mutex::ScopedLock locker(parent->deliveryLock); - uint64_t deliveryTag = parent->out.deliver(msg, token); - if(ackExpected){ - parent->record(DeliveryRecord(msg, queue, tag, deliveryTag)); + DeliveryId deliveryTag = parent->out.deliver(msg, token); + if (ackExpected) { + parent->record(DeliveryRecord(msg, queue, name, deliveryTag)); } + } + return !blocked; + } +} - return true; +bool Channel::ConsumerImpl::checkCredit(Message::shared_ptr& msg) +{ + Mutex::ScopedLock l(lock); + if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) { + return false; + } else { + if (msgCredit != 0xFFFFFFFF) { + msgCredit--; + } + if (byteCredit != 0xFFFFFFFF) { + byteCredit -= msg->getRequiredCredit(); } + return true; } - return false; } -void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) { +void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag) { Mutex::ScopedLock locker(parent->deliveryLock); parent->out.redeliver(msg, token, deliveryTag); } @@ -326,55 +349,71 @@ void Channel::route(Message::shared_ptr msg, Deliverable& strategy) { } -// Used by Basic -void Channel::ack(uint64_t deliveryTag, bool multiple) +void Channel::ackCumulative(DeliveryId id) { - if (multiple) - ack(0, deliveryTag); - else - ack(deliveryTag, deliveryTag); + ack(id, id, true); } -void Channel::ack(uint64_t firstTag, uint64_t lastTag) +void Channel::ackRange(DeliveryId first, DeliveryId last) { + ack(first, last, false); +} + +void Channel::ack(DeliveryId first, DeliveryId last, bool cumulative) +{ + Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + + ack_iterator start = cumulative ? unacked.begin() : + find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matchOrAfter), first)); + ack_iterator end = start; + + if (cumulative || first != last) { + //need to find end (position it just after the last record in range) + end = find_if(start, unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::after), last)); + } else { + //just acked single element (move end past it) + ++end; + } + + for_each(start, end, boost::bind(&Channel::acknowledged, this, _1)); + if (txBuffer.get()) { - accumulatedAck.update(firstTag, lastTag); - //TODO: I think the outstanding prefetch size & count should be updated at this point... - //TODO: ...this may then necessitate dispatching to consumers + //in transactional mode, don't dequeue or remove, just + //maintain set of acknowledged messages: + accumulatedAck.update(cumulative ? accumulatedAck.mark : first, last); + if (dtxBuffer.get()) { + //if enlisted in a dtx, remove the relevant slice from + //unacked and record it against that transaction TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); accumulatedAck.clear(); dtxBuffer->enlist(txAck); } - } else { - Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::dequeue), 0)); + unacked.erase(start, end); + } - ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag)); - ack_iterator j = (firstTag == 0) ? - unacked.begin() : - find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), firstTag)); - - if(i == unacked.end()){ - throw ConnectionException(530, "Received ack for unrecognised delivery tag"); - }else if(i!=j){ - ack_iterator end = ++i; - for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0)); - unacked.erase(unacked.begin(), end); - - //recalculate the prefetch: - outstanding.reset(); - for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); - }else{ - i->discard(); - i->subtractFrom(&outstanding); - unacked.erase(i); - } + //if the prefetch limit had previously been reached, or credit + //had expired in windowing mode there may be messages that can + //be now be delivered + for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); +} + +void Channel::acknowledged(const DeliveryRecord& delivery) +{ + delivery.subtractFrom(outstanding); + ConsumerImplMap::iterator i = consumers.find(delivery.getConsumerTag()); + if (i != consumers.end()) { + i->acknowledged(delivery); + } +} - //if the prefetch limit had previously been reached, there may - //be messages that can be now be delivered - std::for_each(consumers.begin(), consumers.end(), - boost::bind(&ConsumerImpl::requestDispatch, _1)); +void Channel::ConsumerImpl::acknowledged(const DeliveryRecord& delivery) +{ + if (windowing) { + if (msgCredit != 0xFFFFFFFF) msgCredit++; + if (byteCredit != 0xFFFFFFFF) delivery.updateByteCredit(byteCredit); } } @@ -384,6 +423,8 @@ void Channel::recover(bool requeue) if(requeue){ outstanding.reset(); + //take copy and clear unacked as requeue may result in redelivery to this channel + //which will in turn result in additions to unacked std::list<DeliveryRecord> copy = unacked; unacked.clear(); for_each(copy.rbegin(), copy.rend(), mem_fun_ref(&DeliveryRecord::requeue)); @@ -397,7 +438,7 @@ bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool Message::shared_ptr msg = queue->dequeue(); if(msg){ Mutex::ScopedLock locker(deliveryLock); - uint64_t myDeliveryTag = out.deliver(msg, token); + DeliveryId myDeliveryTag = out.deliver(msg, token); if(ackExpected){ unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); } @@ -408,7 +449,7 @@ bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool } void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, - uint64_t deliveryTag) + DeliveryId deliveryTag) { ConsumerImplMap::iterator i = consumers.find(consumerTag); if (i != consumers.end()){ @@ -426,3 +467,82 @@ void Channel::flow(bool active) std::for_each(consumers.begin(), consumers.end(), boost::bind(&ConsumerImpl::requestDispatch, _1)); } } + + +Channel::ConsumerImpl& Channel::find(const std::string& destination) +{ + ConsumerImplMap::iterator i = consumers.find(destination); + if (i == consumers.end()) { + throw ChannelException(404, boost::format("Unknown destination %1%") % destination); + } else { + return *i; + } +} + +void Channel::setWindowMode(const std::string& destination) +{ + find(destination).setWindowMode(); +} + +void Channel::setCreditMode(const std::string& destination) +{ + find(destination).setCreditMode(); +} + +void Channel::addByteCredit(const std::string& destination, uint32_t value) +{ + find(destination).addByteCredit(value); +} + + +void Channel::addMessageCredit(const std::string& destination, uint32_t value) +{ + find(destination).addMessageCredit(value); +} + +void Channel::flush(const std::string& destination) +{ + ConsumerImpl& c = find(destination); + c.flush(); +} + + +void Channel::stop(const std::string& destination) +{ + find(destination).stop(); +} + +void Channel::ConsumerImpl::setWindowMode() +{ + windowing = true; +} + +void Channel::ConsumerImpl::setCreditMode() +{ + windowing = false; +} + +void Channel::ConsumerImpl::addByteCredit(uint32_t value) +{ + byteCredit += value; + requestDispatch(); +} + +void Channel::ConsumerImpl::addMessageCredit(uint32_t value) +{ + msgCredit += value; + requestDispatch(); +} + +void Channel::ConsumerImpl::flush() +{ + //TODO: need to wait until any messages that are available for + //this consumer have been delivered... i.e. some sort of flush on + //the queue... +} + +void Channel::ConsumerImpl::stop() +{ + msgCredit = 0; + byteCredit = 0; +} diff --git a/cpp/src/qpid/broker/BrokerChannel.h b/cpp/src/qpid/broker/BrokerChannel.h index e9672c96d7..1f4f6f35e7 100644 --- a/cpp/src/qpid/broker/BrokerChannel.h +++ b/cpp/src/qpid/broker/BrokerChannel.h @@ -64,23 +64,35 @@ class Channel : public CompletionHandler { class ConsumerImpl : public Consumer { - Channel* parent; - DeliveryToken::shared_ptr token; - const string tag; - Queue::shared_ptr queue; - ConnectionToken* const connection; + sys::Mutex lock; + Channel* const parent; + const DeliveryToken::shared_ptr token; + const string name; + const Queue::shared_ptr queue; const bool ackExpected; + const bool nolocal; bool blocked; + bool windowing; + uint32_t msgCredit; + uint32_t byteCredit; public: ConsumerImpl(Channel* parent, DeliveryToken::shared_ptr token, - const string& tag, Queue::shared_ptr queue, - ConnectionToken* const connection, bool ack); + const string& name, Queue::shared_ptr queue, bool ack, bool nolocal); ~ConsumerImpl(); bool deliver(Message::shared_ptr& msg); - void redeliver(Message::shared_ptr& msg, uint64_t deliveryTag); + void redeliver(Message::shared_ptr& msg, DeliveryId deliveryTag); void cancel(); void requestDispatch(); + + void setWindowMode(); + void setCreditMode(); + void addByteCredit(uint32_t value); + void addMessageCredit(uint32_t value); + void flush(); + void stop(); + bool checkCredit(Message::shared_ptr& msg); + void acknowledged(const DeliveryRecord&); }; typedef boost::ptr_map<string,ConsumerImpl> ConsumerImplMap; @@ -88,7 +100,6 @@ class Channel : public CompletionHandler framing::ChannelId id; Connection& connection; DeliveryAdapter& out; - uint64_t currentDeliveryTag; Queue::shared_ptr defaultQueue; ConsumerImplMap consumers; uint32_t prefetchSize; @@ -110,6 +121,9 @@ class Channel : public CompletionHandler void record(const DeliveryRecord& delivery); bool checkPrefetch(Message::shared_ptr& msg); void checkDtxTimeout(); + ConsumerImpl& find(const std::string& destination); + void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative); + void acknowledged(const DeliveryRecord&); public: Channel(Connection& parent, DeliveryAdapter& out, framing::ChannelId id, MessageStore* const store = 0); @@ -129,10 +143,17 @@ class Channel : public CompletionHandler /** *@param tagInOut - if empty it is updated with the generated token. */ - void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, bool acks, - bool exclusive, ConnectionToken* const connection = 0, - const framing::FieldTable* = 0); + void consume(DeliveryToken::shared_ptr token, string& tagInOut, Queue::shared_ptr queue, + bool nolocal, bool acks, bool exclusive, const framing::FieldTable* = 0); void cancel(const string& tag); + + void setWindowMode(const std::string& destination); + void setCreditMode(const std::string& destination); + void addByteCredit(const std::string& destination, uint32_t value); + void addMessageCredit(const std::string& destination, uint32_t value); + void flush(const std::string& destination); + void stop(const std::string& destination); + bool get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected); void close(); void startTx(); @@ -143,11 +164,11 @@ class Channel : public CompletionHandler void endDtx(const std::string& xid, bool fail); void suspendDtx(const std::string& xid); void resumeDtx(const std::string& xid); - void ack(uint64_t deliveryTag, bool multiple); - void ack(uint64_t deliveryTag, uint64_t endTag); + void ackCumulative(DeliveryId deliveryTag); + void ackRange(DeliveryId deliveryTag, DeliveryId endTag); void recover(bool requeue); void flow(bool active); - void deliver(Message::shared_ptr& msg, const string& consumerTag, uint64_t deliveryTag); + void deliver(Message::shared_ptr& msg, const string& consumerTag, DeliveryId deliveryTag); void handlePublish(Message* msg); void handleHeader(boost::shared_ptr<framing::AMQHeaderBody>); void handleContent(boost::shared_ptr<framing::AMQContentBody>); diff --git a/cpp/src/qpid/broker/BrokerMessage.cpp b/cpp/src/qpid/broker/BrokerMessage.cpp index bf0e37e8e3..244bee4a92 100644 --- a/cpp/src/qpid/broker/BrokerMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessage.cpp @@ -110,45 +110,43 @@ DeliveryToken::shared_ptr BasicMessage::createConsumeToken(const string& consume } void BasicMessage::deliver(ChannelAdapter& channel, - const string& consumerTag, uint64_t deliveryTag, + const string& consumerTag, DeliveryId id, uint32_t framesize) { channel.send(make_shared_ptr( new BasicDeliverBody( - channel.getVersion(), consumerTag, deliveryTag, + channel.getVersion(), consumerTag, id.getValue(), getRedelivered(), getExchange(), getRoutingKey()))); sendContent(channel, framesize); } void BasicMessage::sendGetOk(ChannelAdapter& channel, uint32_t messageCount, - uint64_t /*responseTo*/, - uint64_t deliveryTag, + DeliveryId id, uint32_t framesize) { channel.send(make_shared_ptr( new BasicGetOkBody( channel.getVersion(), - //responseTo, - deliveryTag, getRedelivered(), getExchange(), + id.getValue(), getRedelivered(), getExchange(), getRoutingKey(), messageCount))); sendContent(channel, framesize); } -void BasicMessage::deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize) +void BasicMessage::deliver(framing::ChannelAdapter& channel, DeliveryId id, DeliveryToken::shared_ptr token, uint32_t framesize) { BasicConsumeToken::shared_ptr consume = dynamic_pointer_cast<BasicConsumeToken>(token); if (consume) { - deliver(channel, consume->consumer, deliveryTag, framesize); + deliver(channel, consume->consumer, id, framesize); } else { BasicGetToken::shared_ptr get = dynamic_pointer_cast<BasicGetToken>(token); if (get) { - uint64_t request(1/*actual value doesn't affect anything at present*/); - sendGetOk(channel, get->queue->getMessageCount(), request, deliveryTag, framesize); + sendGetOk(channel, get->queue->getMessageCount(), id.getValue(), framesize); } else { //TODO: //either need to be able to convert to a message transfer or //throw error of some kind to allow this to be handled higher up + throw Exception("Conversion to BasicMessage not defined!"); } } } @@ -292,3 +290,9 @@ void BasicMessage::setContent(std::auto_ptr<Content>& _content) Mutex::ScopedLock locker(contentLock); content = _content; } + + +uint32_t BasicMessage::getRequiredCredit() const +{ + return header->size() + contentSize(); +} diff --git a/cpp/src/qpid/broker/BrokerMessage.h b/cpp/src/qpid/broker/BrokerMessage.h index e6483b4733..3f22f07aec 100644 --- a/cpp/src/qpid/broker/BrokerMessage.h +++ b/cpp/src/qpid/broker/BrokerMessage.h @@ -73,17 +73,16 @@ class BasicMessage : public Message { static DeliveryToken::shared_ptr createGetToken(boost::shared_ptr<Queue> queue); static DeliveryToken::shared_ptr createConsumeToken(const string& consumer); - void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); + void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); void deliver(framing::ChannelAdapter&, const string& consumerTag, - uint64_t deliveryTag, + DeliveryId deliveryTag, uint32_t framesize); void sendGetOk(framing::ChannelAdapter& channel, uint32_t messageCount, - uint64_t responseTo, - uint64_t deliveryTag, + DeliveryId deliveryTag, uint32_t framesize); framing::BasicHeaderProperties* getHeaderProperties(); @@ -132,6 +131,11 @@ class BasicMessage : public Message { * it uses). */ void setContent(std::auto_ptr<Content>& content); + + /** + * Returns the byte credits required to transfer this message. + */ + uint32_t getRequiredCredit() const; }; } diff --git a/cpp/src/qpid/broker/BrokerMessageBase.h b/cpp/src/qpid/broker/BrokerMessageBase.h index d9269fa94f..aeb34880eb 100644 --- a/cpp/src/qpid/broker/BrokerMessageBase.h +++ b/cpp/src/qpid/broker/BrokerMessageBase.h @@ -25,6 +25,7 @@ #include <string> #include <boost/shared_ptr.hpp> #include "Content.h" +#include "DeliveryId.h" #include "DeliveryToken.h" #include "PersistableMessage.h" #include "qpid/framing/amqp_types.h" @@ -92,7 +93,7 @@ class Message : public PersistableMessage{ void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } void redeliver() { redelivered = true; } - virtual void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag/*only needed for basic class*/, + virtual void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag/*only needed for basic class*/, DeliveryToken::shared_ptr token, uint32_t framesize) = 0; virtual bool isComplete() = 0; @@ -105,6 +106,8 @@ class Message : public PersistableMessage{ return publisher; } + virtual uint32_t getRequiredCredit() const = 0; + virtual void encode(framing::Buffer& buffer) const = 0; virtual void encodeHeader(framing::Buffer& buffer) const = 0; diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.cpp b/cpp/src/qpid/broker/BrokerMessageMessage.cpp index 8e8eaf23f0..9ad27093bb 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.cpp +++ b/cpp/src/qpid/broker/BrokerMessageMessage.cpp @@ -188,7 +188,7 @@ void MessageMessage::transferMessage( } -void MessageMessage::deliver(ChannelAdapter& channel, uint64_t, DeliveryToken::shared_ptr token, uint32_t framesize) +void MessageMessage::deliver(ChannelAdapter& channel, DeliveryId, DeliveryToken::shared_ptr token, uint32_t framesize) { transferMessage(channel, shared_polymorphic_cast<MessageDeliveryToken>(token)->destination, framesize); } @@ -320,6 +320,14 @@ MessageMessage::ReferencePtr MessageMessage::getReference() const { return reference; } +uint32_t MessageMessage::getRequiredCredit() const +{ + //TODO: change when encoding changes. Should be the payload of any + //header & body frames. + return transfer->size(); +} + + DeliveryToken::shared_ptr MessageMessage::getToken(const std::string& destination) { return DeliveryToken::shared_ptr(new MessageDeliveryToken(destination)); diff --git a/cpp/src/qpid/broker/BrokerMessageMessage.h b/cpp/src/qpid/broker/BrokerMessageMessage.h index 612f457ae4..4efd22c9fe 100644 --- a/cpp/src/qpid/broker/BrokerMessageMessage.h +++ b/cpp/src/qpid/broker/BrokerMessageMessage.h @@ -53,7 +53,7 @@ class MessageMessage: public Message{ TransferPtr getTransfer() const { return transfer; } ReferencePtr getReference() const ; - void deliver(framing::ChannelAdapter& channel, uint64_t deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); + void deliver(framing::ChannelAdapter& channel, DeliveryId deliveryTag, DeliveryToken::shared_ptr token, uint32_t framesize); void deliver(framing::ChannelAdapter&, const std::string& destination, uint32_t framesize); bool isComplete(); @@ -71,6 +71,7 @@ class MessageMessage: public Message{ uint64_t expectedContentSize(); void decodeHeader(framing::Buffer& buffer); void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); + uint32_t getRequiredCredit() const; static DeliveryToken::shared_ptr getToken(const std::string& destination); diff --git a/cpp/src/qpid/broker/DeliveryAdapter.h b/cpp/src/qpid/broker/DeliveryAdapter.h index 971f4095cf..d59c4769d7 100644 --- a/cpp/src/qpid/broker/DeliveryAdapter.h +++ b/cpp/src/qpid/broker/DeliveryAdapter.h @@ -22,13 +22,13 @@ #define _DeliveryAdapter_ #include "BrokerMessageBase.h" +#include "DeliveryId.h" #include "DeliveryToken.h" #include "qpid/framing/amqp_types.h" namespace qpid { namespace broker { - typedef framing::RequestId DeliveryId; /** * The intention behind this interface is to separate the generic * handling of some form of message delivery to clients that is diff --git a/cpp/src/qpid/broker/DeliveryId.h b/cpp/src/qpid/broker/DeliveryId.h new file mode 100644 index 0000000000..6cec05ed2c --- /dev/null +++ b/cpp/src/qpid/broker/DeliveryId.h @@ -0,0 +1,34 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _DeliveryId_ +#define _DeliveryId_ + +#include "qpid/framing/SequenceNumber.h" + +namespace qpid { +namespace broker { + + typedef framing::SequenceNumber DeliveryId; + +}} + + +#endif diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 7e16adafda..43f85b9b6e 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -27,7 +27,7 @@ using std::string; DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue, const string _consumerTag, - const uint64_t _deliveryTag) : msg(_msg), + const DeliveryId _deliveryTag) : msg(_msg), queue(_queue), consumerTag(_consumerTag), deliveryTag(_deliveryTag), @@ -35,21 +35,29 @@ DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, Queue::shared_ptr _queue, - const uint64_t _deliveryTag) : msg(_msg), + const DeliveryId _deliveryTag) : msg(_msg), queue(_queue), consumerTag(""), deliveryTag(_deliveryTag), pull(true){} -void DeliveryRecord::discard(TransactionContext* ctxt) const{ +void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ queue->dequeue(ctxt, msg); } -bool DeliveryRecord::matches(uint64_t tag) const{ +bool DeliveryRecord::matches(DeliveryId tag) const{ return deliveryTag == tag; } +bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{ + return matches(tag) || after(tag); +} + +bool DeliveryRecord::after(DeliveryId tag) const{ + return deliveryTag > tag; +} + bool DeliveryRecord::coveredBy(const AccumulatedAck* const range) const{ return range->covers(deliveryTag); } @@ -68,20 +76,38 @@ void DeliveryRecord::requeue() const{ queue->requeue(msg); } -void DeliveryRecord::addTo(Prefetch* const prefetch) const{ +void DeliveryRecord::updateByteCredit(uint32_t& credit) const +{ + credit += msg->getRequiredCredit(); +} + + +void DeliveryRecord::addTo(Prefetch& prefetch) const{ if(!pull){ //ignore 'pulled' messages (i.e. those that were sent in //response to get) when calculating prefetch - prefetch->size += msg->contentSize(); - prefetch->count++; + prefetch.size += msg->contentSize(); + prefetch.count++; } } -void DeliveryRecord::subtractFrom(Prefetch* const prefetch) const{ +void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{ if(!pull){ //ignore 'pulled' messages (i.e. those that were sent in //response to get) when calculating prefetch - prefetch->size -= msg->contentSize(); - prefetch->count--; + prefetch.size -= msg->contentSize(); + prefetch.count--; } } + +namespace qpid { +namespace broker { + +std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) { + out << "{" << "id=" << r.deliveryTag.getValue(); + out << ", consumer=" << r.consumerTag; + out << ", queue=" << r.queue->getName() << "}"; + return out; +} + +}} diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index 9f73f940ff..745a246c78 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -23,10 +23,12 @@ #include <algorithm> #include <list> +#include <ostream> #include "AccumulatedAck.h" #include "BrokerMessage.h" #include "Prefetch.h" #include "BrokerQueue.h" +#include "DeliveryId.h" namespace qpid { namespace broker { @@ -39,20 +41,27 @@ namespace qpid { mutable Message::shared_ptr msg; mutable Queue::shared_ptr queue; const std::string consumerTag; - const uint64_t deliveryTag; + const DeliveryId deliveryTag; bool pull; public: - DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const uint64_t deliveryTag); - DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const uint64_t deliveryTag); + DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const DeliveryId deliveryTag); + DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const DeliveryId deliveryTag); - void discard(TransactionContext* ctxt = 0) const; - bool matches(uint64_t tag) const; + void dequeue(TransactionContext* ctxt = 0) const; + bool matches(DeliveryId tag) const; + bool matchOrAfter(DeliveryId tag) const; + bool after(DeliveryId tag) const; bool coveredBy(const AccumulatedAck* const range) const; void requeue() const; void redeliver(Channel* const) const; - void addTo(Prefetch* const prefetch) const; - void subtractFrom(Prefetch* const prefetch) const; + void updateByteCredit(uint32_t& credit) const; + void addTo(Prefetch&) const; + void subtractFrom(Prefetch&) const; + const std::string& getConsumerTag() const { return consumerTag; } + bool isPull() const { return pull; } + + friend std::ostream& operator<<(std::ostream&, const DeliveryRecord&); }; typedef std::list<DeliveryRecord>::iterator ack_iterator; diff --git a/cpp/src/qpid/broker/DtxAck.cpp b/cpp/src/qpid/broker/DtxAck.cpp index a879abd9ab..badf3564e7 100644 --- a/cpp/src/qpid/broker/DtxAck.cpp +++ b/cpp/src/qpid/broker/DtxAck.cpp @@ -26,7 +26,7 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -DtxAck::DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked) +DtxAck::DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked) { remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()), not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked))); @@ -38,7 +38,7 @@ bool DtxAck::prepare(TransactionContext* ctxt) throw() try{ //record dequeue in the store for (ack_iterator i = pending.begin(); i != pending.end(); i++) { - i->discard(ctxt); + i->dequeue(ctxt); } return true; }catch(...){ diff --git a/cpp/src/qpid/broker/DtxAck.h b/cpp/src/qpid/broker/DtxAck.h index 9da9d2078a..84afd00c9c 100644 --- a/cpp/src/qpid/broker/DtxAck.h +++ b/cpp/src/qpid/broker/DtxAck.h @@ -34,7 +34,7 @@ namespace qpid { std::list<DeliveryRecord> pending; public: - DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); + DtxAck(const AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index da57439e21..c728a800ab 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -27,6 +27,8 @@ #include "qpid/framing/MessageTransferBody.h" #include "BrokerAdapter.h" +#include <boost/format.hpp> + namespace qpid { namespace broker { @@ -96,7 +98,7 @@ MessageHandlerImpl::consume(uint16_t /*ticket*/, if(!destination.empty() && channel.exists(destination)) throw ConnectionException(530, "Consumer tags must be unique"); string tag = destination; - channel.consume(MessageMessage::getToken(destination), tag, queue, !noAck, exclusive, noLocal ? &connection : 0, &filter); + channel.consume(MessageMessage::getToken(destination), tag, queue, noLocal, !noAck, exclusive, &filter); // Dispatch messages as there is now a consumer. queue->requestDispatch(); } @@ -130,7 +132,7 @@ MessageHandlerImpl::empty() void MessageHandlerImpl::ok() { - channel.ack(adapter.getFirstAckRequest(), adapter.getLastAckRequest()); + throw ConnectionException(540, "Message.Ok no longer supported"); } void @@ -171,4 +173,45 @@ MessageHandlerImpl::transfer(const framing::MethodContext& context) } + +void MessageHandlerImpl::flow(const std::string& destination, u_int8_t unit, u_int32_t value) +{ + + if (unit == 0) { + //message + channel.addMessageCredit(destination, value); + } else if (unit == 1) { + //bytes + channel.addByteCredit(destination, value); + } else { + //unknown + throw ConnectionException(502, boost::format("Invalid value for unit %1%") % unit); + } + +} + +void MessageHandlerImpl::flowMode(const std::string& destination, u_int8_t mode) +{ + if (mode == 0) { + //credit + channel.setCreditMode(destination); + } else if (mode == 1) { + //window + channel.setWindowMode(destination); + } else{ + throw ConnectionException(502, boost::format("Invalid value for mode %1%") % mode); + } +} + +void MessageHandlerImpl::flush(const std::string& destination) +{ + channel.flush(destination); +} + +void MessageHandlerImpl::stop(const std::string& destination) +{ + channel.stop(destination); +} + + }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.h b/cpp/src/qpid/broker/MessageHandlerImpl.h index 86c54023b5..b7e91795ec 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.h +++ b/cpp/src/qpid/broker/MessageHandlerImpl.h @@ -83,6 +83,15 @@ class MessageHandlerImpl : const std::string& identifier ); void transfer(const framing::MethodContext& context); + + void flow(const std::string& destination, u_int8_t unit, u_int32_t value); + + void flowMode(const std::string& destination, u_int8_t mode); + + void flush(const std::string& destination); + + void stop(const std::string& destination); + private: ReferenceRegistry references; }; diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp index 27f484cfcb..08f91bf69a 100644 --- a/cpp/src/qpid/broker/SemanticHandler.cpp +++ b/cpp/src/qpid/broker/SemanticHandler.cpp @@ -63,20 +63,24 @@ void SemanticHandler::handle(framing::AMQFrame& frame) void SemanticHandler::handleMethodInContext(boost::shared_ptr<qpid::framing::AMQMethodBody> method, const qpid::framing::MethodContext& context) { - if (!method->invoke(this)) { - //else do the usual: - handleL4(method, context); - //(if the frameset is complete) we can move the execution-mark - //forward - - //temporary hack until channel management is moved to its own handler: - if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { - ++(incoming.hwm); + try { + if (!method->invoke(this)) { + //else do the usual: + handleL4(method, context); + //(if the frameset is complete) we can move the execution-mark + //forward + + //temporary hack until channel management is moved to its own handler: + if (method->amqpClassId() != ChannelOpenBody::CLASS_ID) { + ++(incoming.hwm); + } + + //note: need to be more sophisticated than this if we execute + //commands that arrive within an active message frameset (that + //can't happen until 0-10 framing is implemented) } - - //note: need to be more sophisticated than this if we execute - //commands that arrive within an active message frameset (that - //can't happen until 0-10 framing is implemented) + }catch(const std::exception& e){ + connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } } @@ -87,15 +91,14 @@ void SemanticHandler::complete(uint32_t cumulative, SequenceNumberSet range) if (outgoing.lwm < mark) { outgoing.lwm = mark; //ack messages: - channel.ack(mark.getValue(), true); + channel.ackCumulative(mark.getValue()); //std::cout << "[" << this << "] acknowledged: " << mark << std::endl; } if (range.size() % 2) { //must be even number throw ConnectionException(530, "Received odd number of elements in ranged mark"); } else { - //TODO: need to keep a record of the full range previously acked for (SequenceNumberSet::iterator i = range.begin(); i != range.end(); i++) { - channel.ack((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); + channel.ackRange((uint64_t) i->getValue(), (uint64_t) (++i)->getValue()); } } } @@ -121,22 +124,16 @@ void SemanticHandler::handleL4(boost::shared_ptr<qpid::framing::AMQMethodBody> m throw ConnectionException(504, out.str()); } } else { - adapter->setResponseTo(context.getRequestId()); method->invoke(*adapter, context); - adapter->setResponseTo(0); } - }catch(ChannelException& e){ - adapter->setResponseTo(0); + }catch(const ChannelException& e){ adapter->getProxy().getChannel().close( e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); connection.closeChannel(getId()); - }catch(ConnectionException& e){ + }catch(const ConnectionException& e){ connection.close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - }catch(std::exception& e){ - connection.close(541/*internal error*/, e.what(), method->amqpClassId(), method->amqpMethodId()); } - } bool SemanticHandler::isOpen() const diff --git a/cpp/src/qpid/broker/TxAck.cpp b/cpp/src/qpid/broker/TxAck.cpp index b02e669f02..958dbcbec0 100644 --- a/cpp/src/qpid/broker/TxAck.cpp +++ b/cpp/src/qpid/broker/TxAck.cpp @@ -36,7 +36,7 @@ bool TxAck::prepare(TransactionContext* ctxt) throw(){ //dequeue all acked messages from their queues for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { if (i->coveredBy(&acked)) { - i->discard(ctxt); + i->dequeue(ctxt); } } return true; diff --git a/cpp/src/qpid/framing/SequenceNumber.cpp b/cpp/src/qpid/framing/SequenceNumber.cpp index ea1a34b1cf..24867130a2 100644 --- a/cpp/src/qpid/framing/SequenceNumber.cpp +++ b/cpp/src/qpid/framing/SequenceNumber.cpp @@ -51,6 +51,11 @@ const SequenceNumber SequenceNumber::operator++(int) return old; } +SequenceNumber SequenceNumber::operator+(uint32_t i) const +{ + return SequenceNumber(value + i); +} + bool SequenceNumber::operator<(const SequenceNumber& other) const { return (value - other.value) < 0; @@ -61,6 +66,16 @@ bool SequenceNumber::operator>(const SequenceNumber& other) const return other < *this; } +bool SequenceNumber::operator<=(const SequenceNumber& other) const +{ + return *this == other || *this < other; +} + +bool SequenceNumber::operator>=(const SequenceNumber& other) const +{ + return *this == other || *this > other; +} + namespace qpid { namespace framing { diff --git a/cpp/src/qpid/framing/SequenceNumber.h b/cpp/src/qpid/framing/SequenceNumber.h index 3e0dfea2af..9b8f0659b2 100644 --- a/cpp/src/qpid/framing/SequenceNumber.h +++ b/cpp/src/qpid/framing/SequenceNumber.h @@ -39,10 +39,13 @@ class SequenceNumber SequenceNumber& operator++();//prefix ++ const SequenceNumber operator++(int);//postfix ++ + SequenceNumber operator+(uint32_t) const; bool operator==(const SequenceNumber& other) const; bool operator!=(const SequenceNumber& other) const; bool operator<(const SequenceNumber& other) const; bool operator>(const SequenceNumber& other) const; + bool operator<=(const SequenceNumber& other) const; + bool operator>=(const SequenceNumber& other) const; uint32_t getValue() const { return (uint32_t) value; } friend int32_t operator-(const SequenceNumber& a, const SequenceNumber& b); diff --git a/cpp/src/tests/AccumulatedAckTest.cpp b/cpp/src/tests/AccumulatedAckTest.cpp index edd4b0d807..601af532fa 100644 --- a/cpp/src/tests/AccumulatedAckTest.cpp +++ b/cpp/src/tests/AccumulatedAckTest.cpp @@ -29,77 +29,182 @@ using namespace qpid::broker; class AccumulatedAckTest : public CppUnit::TestCase { - CPPUNIT_TEST_SUITE(AccumulatedAckTest); - CPPUNIT_TEST(testGeneral); - CPPUNIT_TEST(testCovers); - CPPUNIT_TEST(testUpdateAndConsolidate); - CPPUNIT_TEST_SUITE_END(); - - public: - void testGeneral() - { - AccumulatedAck ack(0); - ack.clear(); - ack.update(3,3); - ack.update(7,7); - ack.update(9,9); - ack.update(1,2); - ack.update(4,5); - ack.update(6,6); - - for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i)); - CPPUNIT_ASSERT(ack.covers(9)); - - CPPUNIT_ASSERT(!ack.covers(8)); - CPPUNIT_ASSERT(!ack.covers(10)); - - ack.consolidate(); - - for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(ack.covers(i)); - CPPUNIT_ASSERT(ack.covers(9)); - - CPPUNIT_ASSERT(!ack.covers(8)); - CPPUNIT_ASSERT(!ack.covers(10)); - } + CPPUNIT_TEST_SUITE(AccumulatedAckTest); + CPPUNIT_TEST(testGeneral); + CPPUNIT_TEST(testCovers); + CPPUNIT_TEST(testCase1); + CPPUNIT_TEST(testCase2); + CPPUNIT_TEST(testCase3); + CPPUNIT_TEST(testCase4); + CPPUNIT_TEST(testConsolidation1); + CPPUNIT_TEST(testConsolidation2); + CPPUNIT_TEST(testConsolidation3); + CPPUNIT_TEST_SUITE_END(); + +public: + bool covers(const AccumulatedAck& ack, int i) + { + return ack.covers(DeliveryId(i)); + } + + void update(AccumulatedAck& ack, int start, int end) + { + ack.update(DeliveryId(start), DeliveryId(end)); + } + + void testGeneral() + { + AccumulatedAck ack(0); + ack.clear(); + update(ack, 3,3); + update(ack, 7,7); + update(ack, 9,9); + update(ack, 1,2); + update(ack, 4,5); + update(ack, 6,6); + + for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(covers(ack, i)); + CPPUNIT_ASSERT(covers(ack, 9)); + + CPPUNIT_ASSERT(!covers(ack, 8)); + CPPUNIT_ASSERT(!covers(ack, 10)); + + ack.consolidate(); + + for(int i = 1; i <= 7; i++) CPPUNIT_ASSERT(covers(ack, i)); + CPPUNIT_ASSERT(covers(ack, 9)); + + CPPUNIT_ASSERT(!covers(ack, 8)); + CPPUNIT_ASSERT(!covers(ack, 10)); + } + + void testCovers() + { + AccumulatedAck ack(5); + update(ack, 7, 7); + update(ack, 9, 9); + + CPPUNIT_ASSERT(covers(ack, 1)); + CPPUNIT_ASSERT(covers(ack, 2)); + CPPUNIT_ASSERT(covers(ack, 3)); + CPPUNIT_ASSERT(covers(ack, 4)); + CPPUNIT_ASSERT(covers(ack, 5)); + CPPUNIT_ASSERT(covers(ack, 7)); + CPPUNIT_ASSERT(covers(ack, 9)); + + CPPUNIT_ASSERT(!covers(ack, 6)); + CPPUNIT_ASSERT(!covers(ack, 8)); + CPPUNIT_ASSERT(!covers(ack, 10)); + } + + void testCase1() + { + AccumulatedAck ack(3); + update(ack, 1,2); + for(int i = 1; i <= 3; i++) CPPUNIT_ASSERT(covers(ack, i)); + CPPUNIT_ASSERT(!covers(ack, 4)); + } + + void testCase2() + { + AccumulatedAck ack(3); + update(ack, 3,6); + for(int i = 1; i <= 6; i++) CPPUNIT_ASSERT(covers(ack, i)); + CPPUNIT_ASSERT(!covers(ack, 7)); + } - void testCovers() - { - AccumulatedAck ack(5); - ack.individual.push_back(7); - ack.individual.push_back(9); - - CPPUNIT_ASSERT(ack.covers(1)); - CPPUNIT_ASSERT(ack.covers(2)); - CPPUNIT_ASSERT(ack.covers(3)); - CPPUNIT_ASSERT(ack.covers(4)); - CPPUNIT_ASSERT(ack.covers(5)); - CPPUNIT_ASSERT(ack.covers(7)); - CPPUNIT_ASSERT(ack.covers(9)); - - CPPUNIT_ASSERT(!ack.covers(6)); - CPPUNIT_ASSERT(!ack.covers(8)); - CPPUNIT_ASSERT(!ack.covers(10)); + void testCase3() + { + AccumulatedAck ack(3); + update(ack, 4,6); + for(int i = 1; i <= 6; i++) { + CPPUNIT_ASSERT(covers(ack, i)); } + CPPUNIT_ASSERT(!covers(ack, 7)); + } - void testUpdateAndConsolidate() - { - AccumulatedAck ack(0); - ack.update(1, 1); - ack.update(3, 3); - ack.update(10, 10); - ack.update(8, 8); - ack.update(6, 6); - ack.update(3, 3); - ack.update(2, 2); - ack.update(0, 5); - ack.consolidate(); - CPPUNIT_ASSERT_EQUAL((uint64_t) 6, ack.range); - CPPUNIT_ASSERT_EQUAL((size_t) 2, ack.individual.size()); - list<uint64_t>::iterator i = ack.individual.begin(); - CPPUNIT_ASSERT_EQUAL((uint64_t) 8, *i); - i++; - CPPUNIT_ASSERT_EQUAL((uint64_t) 10, *i); + void testCase4() + { + AccumulatedAck ack(3); + update(ack, 5,6); + for(int i = 1; i <= 6; i++) { + if (i == 4) CPPUNIT_ASSERT(!covers(ack, i)); + else CPPUNIT_ASSERT(covers(ack, i)); } + CPPUNIT_ASSERT(!covers(ack, 7)); + } + + void testConsolidation1() + { + AccumulatedAck ack(3); + update(ack, 7,7); + CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size()); + + update(ack, 8,9); + CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size()); + + update(ack, 1,2); + CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size()); + + update(ack, 4,5); + CPPUNIT_ASSERT_EQUAL((uint32_t) 5, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size()); + + update(ack, 6,6); + CPPUNIT_ASSERT_EQUAL((uint32_t) 9, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size()); + + for(int i = 1; i <= 9; i++) CPPUNIT_ASSERT(covers(ack, i)); + CPPUNIT_ASSERT(!covers(ack, 10)); + } + + void testConsolidation2() + { + AccumulatedAck ack(0); + update(ack, 10,12); + CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size()); + + update(ack, 7,9); + CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size()); + CPPUNIT_ASSERT_EQUAL((uint32_t) 7, ack.ranges.front().start.getValue()); + CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.ranges.front().end.getValue()); + + update(ack, 5,7); + CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size()); + CPPUNIT_ASSERT_EQUAL((uint32_t) 5, ack.ranges.front().start.getValue()); + CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.ranges.front().end.getValue()); + + update(ack, 3,4); + CPPUNIT_ASSERT_EQUAL((uint32_t) 0, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 1, ack.ranges.size()); + CPPUNIT_ASSERT_EQUAL((uint32_t) 3, ack.ranges.front().start.getValue()); + CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.ranges.front().end.getValue()); + + update(ack, 1,2); + CPPUNIT_ASSERT_EQUAL((uint32_t) 12, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size()); + + for(int i = 1; i <= 12; i++) CPPUNIT_ASSERT(covers(ack, i)); + CPPUNIT_ASSERT(!covers(ack, 13)); + } + + void testConsolidation3() + { + AccumulatedAck ack(0); + update(ack, 10,12); + update(ack, 6,7); + update(ack, 3,4); + update(ack, 1,15); + CPPUNIT_ASSERT_EQUAL((uint32_t) 15, ack.mark.getValue()); + CPPUNIT_ASSERT_EQUAL((size_t) 0, ack.ranges.size()); + } + }; // Make this test suite a plugin. diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp index df9fa89501..a5d9eb69a5 100644 --- a/cpp/src/tests/TxAckTest.cpp +++ b/cpp/src/tests/TxAckTest.cpp @@ -77,9 +77,9 @@ public: } //assume msgs 1-5, 7 and 9 are all acked (i.e. 6, 8 & 10 are not) - acked.range = 5; - acked.individual.push_back(7); - acked.individual.push_back(9); + acked.mark = 5; + acked.update(7, 7); + acked.update(9, 9); } void testPrepare() diff --git a/python/tests_0-10/message.py b/python/tests_0-10/message.py index 74e2b6416f..b882cd5438 100644 --- a/python/tests_0-10/message.py +++ b/python/tests_0-10/message.py @@ -645,7 +645,145 @@ class MessageTests(TestBase): self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz") self.assertEmpty(self.client.queue("consumer")) + + def test_credit_flow_messages(self): + """ + Test basic credit based flow control with unit = message + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + #create consumer (for now that defaults to infinite credit) + channel.message_consume(queue = "q", destination = "c") + channel.message_flow_mode(mode = 0, destination = "c") + #set credit to zero (can remove this once move to proper default for subscribe method) + channel.message_stop(destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(routing_key = "q", body = "Message %d" % i) + #set message credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 0, value = 5, destination = "c") + #set infinite byte credit + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(6, 11): + channel.message_flow(unit = 0, value = 1, destination = "c") + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + def test_credit_flow_bytes(self): + """ + Test basic credit based flow control with unit = bytes + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + #create consumer (for now that defaults to infinite credit) + channel.message_consume(queue = "q", destination = "c") + channel.message_flow_mode(mode = 0, destination = "c") + #set credit to zero (can remove this once move to proper default for subscribe method) + channel.message_stop(destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(routing_key = "q", body = "abcdefgh") + + #each message is currently interpreted as requiring 75 bytes of credit + #set byte credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 1, value = 75*5, destination = "c") + #set infinite message credit + channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + #increase credit again and check more are received + for i in range(6, 11): + channel.message_flow(unit = 1, value = 75, destination = "c") + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + + def test_window_flow_messages(self): + """ + Test basic window based flow control with unit = message + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + #create consumer (for now that defaults to infinite credit) + channel.message_consume(queue = "q", destination = "c") + channel.message_flow_mode(mode = 1, destination = "c") + #set credit to zero (can remove this once move to proper default for subscribe method) + channel.message_stop(destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(routing_key = "q", body = "Message %d" % i) + + #set message credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 0, value = 5, destination = "c") + #set infinite byte credit + channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + for i in range(1, 6): + msg = q.get(timeout = 1) + self.assertDataEquals(channel, msg, "Message %d" % i) + self.assertEmpty(q) + + #acknowledge messages and check more are received + msg.complete(cumulative=True) + for i in range(6, 11): + self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i) + self.assertEmpty(q) + + + def test_window_flow_bytes(self): + """ + Test basic window based flow control with unit = bytes + """ + #declare an exclusive queue + channel = self.channel + channel.queue_declare(queue = "q", exclusive=True) + #create consumer (for now that defaults to infinite credit) + channel.message_consume(queue = "q", destination = "c") + channel.message_flow_mode(mode = 1, destination = "c") + #set credit to zero (can remove this once move to proper default for subscribe method) + channel.message_stop(destination = "c") + #send batch of messages to queue + for i in range(1, 11): + channel.message_transfer(routing_key = "q", body = "abcdefgh") + + #each message is currently interpreted as requiring 75 bytes of credit + #set byte credit to finite amount (less than enough for all messages) + channel.message_flow(unit = 1, value = 75*5, destination = "c") + #set infinite message credit + channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c") + #check that expected number were received + q = self.client.queue("c") + msgs = [] + for i in range(1, 6): + msg = q.get(timeout = 1) + msgs.append(msg) + self.assertDataEquals(channel, msg, "abcdefgh") + self.assertEmpty(q) + + #ack each message individually and check more are received + for i in range(6, 11): + msg = msgs.pop() + msg.complete(cumulative=False) + self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh") + self.assertEmpty(q) + + def assertDataEquals(self, channel, msg, expected): if isinstance(msg.body, ReferenceId): diff --git a/specs/amqp-transitional.0-10.xml b/specs/amqp-transitional.0-10.xml index 53912d0c2e..42a784e163 100644 --- a/specs/amqp-transitional.0-10.xml +++ b/specs/amqp-transitional.0-10.xml @@ -6900,6 +6900,128 @@ </doc> </field> </method> + <!-- - Method: message.flow-mode - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> + + <method name="flow-mode" index="120" label="set the flow control mode"> + <doc> + Sets the mode of flow control used for a given destination. + + With credit based flow control, the sender of messages continually maintains its current + credit balance with the recipient. The credit balance consists of two values, a message + count, and a byte count. Whenever message data is sent, both counts must be decremented. + If either value reaches zero, the flow of message data must stop. Additional credit is + received via the message.flow method. + + The sender MUST NOT send partial framesets. This means that if there is not enough byte + credit available to send a complete message, the sender must either wait or use chunked + transfer to send the first part of the message data in a complete frameset. + + Window based flow control is identical to credit based flow control, however message + acknowledgment implicitly grants a single unit of message credit, and the size of the + message in byte credits for each acknowledged message. + </doc> + + <rule name="byte-accounting"> + <doc> + The byte count is decremented by the payload size of each transmitted frame with + segment type header or body appearing within a message.transfer command. Note that + the payload size is the frame size less the frame header size (frame-size - 12). + </doc> + </rule> + + <rule name="mode-switching"> + <doc> + Mode switching may only occur if both outstanding credit balances are zero. There are + three ways for a recipient of messages to be sure that the sender 's credit balance is + zero: + + 1) The recipient may send a message.stop command to the sender. When the recipient + receives confirmation of completion for the message.stop command, it knows that the + sender's credit is zero. + + 2) The recipient may perform the same steps described in (1) with the message.flush + command substituted for the message.stop command. + + 3) Immediately after receiving a message.consume, the credit for that destination + defaults to zero. + </doc> + </rule> + + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="destination" domain="destination" /> + <field name="mode" domain="octet"> + <doc> + One of: + - credit (0): choose credit based flow control + - window (1): choose window based flow control + </doc> + </field> + </method> + + <!-- - Method: message.flow - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> + + <method name="flow" index="130" label="control message flow"> + <doc> + This method controls the flow of message data to a given destination. It is used by the + recipient of messages to dynamically match the incoming rate of message flow to its + processing or forwarding capacity. Upon receipt of this method, the sender must add "value" + number of the specified unit to the available credit balance for the specified destination. + A value of (0xFFFFFFFF) indicates an infinite amount of credit. This disables any limit for + the given unit until the credit balance is zeroed with message.stop or message.flush. + </doc> + + <!-- throws no-such-destination --> + + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="destination" domain="destination"/> + <field name="unit" domain="octet"> + <doc> + Specifies the unit of credit balance. + + One of: + - message (0) + - byte (1) + </doc> + </field> + <field name="value" domain="long"> + <doc> + A value of (0xFFFFFFFF) indicates an infinite amount of credit. + </doc> + </field> + </method> + + <!-- - Method: message.flush - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> + + <method name="flush" index="140"> + <doc> + Forces the sender to exhaust his credit supply. The sender's credit will always be zero when + this method completes. The message does not complete until all the message transfers occur. + </doc> + + <chassis name="server" implement="MUST" /> + + <field name="destination" domain="destination" /> + </method> + + <!-- - Method: message.stop - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - --> + + <method name="stop" index="150"> + <doc> + On receipt of this method, a producer of messages MUST set his credit to zero for the given + destination. This obeys the generic semantics of command completion, i.e. when confirmation + is issued credit MUST be zero and no further messages will be sent until such a time as + further credit is received. + </doc> + + <chassis name="server" implement="MUST" /> + <chassis name="client" implement="MUST" /> + + <field name="destination" domain="destination" /> + </method> </class> |