diff options
Diffstat (limited to 'cpp/src/qpid/broker/DeliveryRecord.cpp')
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 152 |
1 files changed, 64 insertions, 88 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index 530dca99a4..22ec5e86a0 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -18,91 +18,72 @@ * under the License. * */ -#include "DeliveryRecord.h" -#include "DeliverableMessage.h" -#include "SemanticState.h" -#include "Exchange.h" +#include "qpid/broker/DeliveryRecord.h" +#include "qpid/broker/DeliverableMessage.h" +#include "qpid/broker/SemanticState.h" +#include "qpid/broker/Exchange.h" #include "qpid/log/Statement.h" +#include "qpid/framing/FrameHandler.h" +#include "qpid/framing/MessageTransferBody.h" +using namespace qpid; using namespace qpid::broker; using std::string; DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, - Queue::shared_ptr _queue, - const std::string _tag, - DeliveryToken::shared_ptr _token, - const DeliveryId _id, - bool _acquired, bool accepted) : msg(_msg), - queue(_queue), - tag(_tag), - token(_token), - id(_id), - acquired(_acquired), - pull(false), - cancelled(false), - credit(msg.payload ? msg.payload->getRequiredCredit() : 0), - size(msg.payload ? msg.payload->contentSize() : 0), - completed(false), - ended(accepted) -{ - if (accepted) setEnded(); -} - -DeliveryRecord::DeliveryRecord(const QueuedMessage& _msg, - Queue::shared_ptr _queue, - const DeliveryId _id) : msg(_msg), - queue(_queue), - id(_id), - acquired(true), - pull(true), - cancelled(false), - credit(msg.payload ? msg.payload->getRequiredCredit() : 0), - size(msg.payload ? msg.payload->contentSize() : 0), - completed(false), - ended(false) + const Queue::shared_ptr& _queue, + const std::string& _tag, + bool _acquired, + bool accepted, + bool _windowing, + uint32_t _credit) : msg(_msg), + queue(_queue), + tag(_tag), + acquired(_acquired), + acceptExpected(!accepted), + cancelled(false), + completed(false), + ended(accepted && acquired), + windowing(_windowing), + credit(msg.payload ? msg.payload->getRequiredCredit() : _credit) {} -void DeliveryRecord::setEnded() +bool DeliveryRecord::setEnded() { ended = true; //reset msg pointer, don't need to hold on to it anymore msg.payload = boost::intrusive_ptr<Message>(); - QPID_LOG(debug, "DeliveryRecord::setEnded() id=" << id); -} - -bool DeliveryRecord::matches(DeliveryId tag) const{ - return id == tag; -} - -bool DeliveryRecord::matchOrAfter(DeliveryId tag) const{ - return matches(tag) || after(tag); -} - -bool DeliveryRecord::after(DeliveryId tag) const{ - return id > tag; -} - -bool DeliveryRecord::coveredBy(const framing::SequenceSet* const range) const{ - return range->contains(id); + return isRedundant(); } void DeliveryRecord::redeliver(SemanticState* const session) { if (!ended) { - if(pull || cancelled){ - //if message was originally sent as response to get, we must requeue it - - //or if subscription was cancelled, requeue it (waiting for + if(cancelled){ + //if subscription was cancelled, requeue it (waiting for //final confirmation for AMQP WG on this case) - requeue(); }else{ msg.payload->redeliver();//mark as redelivered - id = session->redeliver(msg, token); + session->deliver(*this, false); } } } +void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, uint16_t framesize) +{ + id = deliveryId; + if (msg.payload->getRedelivered()){ + msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true); + } + + framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), tag, acceptExpected ? 0 : 1, acquired ? 0 : 1))); + method.setEof(false); + h.handle(method); + msg.payload->sendHeader(h, framesize); + msg.payload->sendContent(*queue, h, framesize); +} + void DeliveryRecord::requeue() const { if (acquired && !ended) { @@ -123,25 +104,29 @@ void DeliveryRecord::release(bool setRedelivered) } } -void DeliveryRecord::complete() -{ +void DeliveryRecord::complete() { completed = true; } -void DeliveryRecord::accept(TransactionContext* ctxt) { +bool DeliveryRecord::accept(TransactionContext* ctxt) { if (acquired && !ended) { - queue->dequeue(ctxt, msg.payload); + queue->dequeue(ctxt, msg); setEnded(); QPID_LOG(debug, "Accepted " << id); } + return isRedundant(); } void DeliveryRecord::dequeue(TransactionContext* ctxt) const{ if (acquired && !ended) { - queue->dequeue(ctxt, msg.payload); + queue->dequeue(ctxt, msg); } } +void DeliveryRecord::committed() const{ + queue->dequeueCommitted(msg); +} + void DeliveryRecord::reject() { Exchange::shared_ptr alternate = queue->getAlternateExchange(); @@ -161,29 +146,14 @@ uint32_t DeliveryRecord::getCredit() const return credit; } - -void DeliveryRecord::addTo(Prefetch& prefetch) const{ - if(!pull){ - //ignore 'pulled' messages (i.e. those that were sent in - //response to get) when calculating prefetch - prefetch.size += size; - prefetch.count++; - } -} - -void DeliveryRecord::subtractFrom(Prefetch& prefetch) const{ - if(!pull){ - //ignore 'pulled' messages (i.e. those that were sent in - //response to get) when calculating prefetch - prefetch.size -= size; - prefetch.count--; - } -} - void DeliveryRecord::acquire(DeliveryIds& results) { if (queue->acquire(msg)) { acquired = true; results.push_back(id); + if (!acceptExpected) { + if (ended) { QPID_LOG(error, "Can't dequeue ended message"); } + else { queue->dequeue(0, msg); setEnded(); } + } } else { QPID_LOG(info, "Message already acquired " << id.getValue()); } @@ -195,6 +165,16 @@ void DeliveryRecord::cancel(const std::string& cancelledTag) cancelled = true; } +AckRange DeliveryRecord::findRange(DeliveryRecords& records, DeliveryId first, DeliveryId last) +{ + DeliveryRecords::iterator start = lower_bound(records.begin(), records.end(), first); + // Find end - position it just after the last record in range + DeliveryRecords::iterator end = lower_bound(records.begin(), records.end(), last); + if (end != records.end() && end->getId() == last) ++end; + return AckRange(start, end); +} + + namespace qpid { namespace broker { @@ -206,9 +186,5 @@ std::ostream& operator<<(std::ostream& out, const DeliveryRecord& r) return out; } -bool operator<(const DeliveryRecord& a, const DeliveryRecord& b) -{ - return a.id < b.id; -} }} |