diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.cpp | 3 | ||||
-rw-r--r-- | cpp/src/qpid/broker/DeliveryRecord.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/MessageHandlerImpl.cpp | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.cpp | 4 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SemanticState.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.cpp | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/SessionAdapter.h | 1 |
7 files changed, 12 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp index c5bf325afc..4406eccc44 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.cpp +++ b/cpp/src/qpid/broker/DeliveryRecord.cpp @@ -111,9 +111,10 @@ void DeliveryRecord::requeue() const } } -void DeliveryRecord::release() +void DeliveryRecord::release(bool setRedelivered) { if (acquired && !ended) { + if (setRedelivered) msg.payload->redeliver(); queue->requeue(msg); acquired = false; setEnded(); diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h index eed7b81748..7d08a4b1f0 100644 --- a/cpp/src/qpid/broker/DeliveryRecord.h +++ b/cpp/src/qpid/broker/DeliveryRecord.h @@ -67,7 +67,7 @@ class DeliveryRecord{ void dequeue(TransactionContext* ctxt = 0) const; void requeue() const; - void release(); + void release(bool setRedelivered); void reject(); void cancel(const std::string& tag); void redeliver(SemanticState* const); diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp index c26824a8e3..64c0282963 100644 --- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp +++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp @@ -39,7 +39,7 @@ using namespace framing; MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerImpl(s), - releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)), + releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)), rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)) {} diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp index 6f5577de5a..2251901340 100644 --- a/cpp/src/qpid/broker/SemanticState.cpp +++ b/cpp/src/qpid/broker/SemanticState.cpp @@ -625,14 +625,14 @@ void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acqu for_each(range.start, range.end, AcquireFunctor(acquired)); } -void SemanticState::release(DeliveryId first, DeliveryId last) +void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedelivered) { AckRange range = findRange(first, last); //release results in the message being added to the head so want //to release in reverse order to keep the original transfer order DeliveryRecords::reverse_iterator start(range.end); DeliveryRecords::reverse_iterator end(range.start); - for_each(start, end, mem_fun_ref(&DeliveryRecord::release)); + for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::release), setRedelivered)); } void SemanticState::reject(DeliveryId first, DeliveryId last) diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h index 4bcd0dddb3..20a0239db0 100644 --- a/cpp/src/qpid/broker/SemanticState.h +++ b/cpp/src/qpid/broker/SemanticState.h @@ -178,7 +178,7 @@ class SemanticState : public framing::FrameHandler::Chains, void flow(bool active); DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token); void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired); - void release(DeliveryId first, DeliveryId last); + void release(DeliveryId first, DeliveryId last, bool setRedelivered); void reject(DeliveryId first, DeliveryId last); void handle(boost::intrusive_ptr<Message> msg); bool doOutput() { return outputTasks.doOutput(); } diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp index d30a1dc696..8093c7c174 100644 --- a/cpp/src/qpid/broker/SessionAdapter.cpp +++ b/cpp/src/qpid/broker/SessionAdapter.cpp @@ -298,7 +298,8 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) : HandlerHelper(s), - releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)), + releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)), + releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)), rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)), acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2)) {} @@ -314,9 +315,9 @@ void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/, //not yet used (content containing assemblies treated differently at present } - void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool /*setRedelivered*/) +void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool setRedelivered) { - transfers.for_each(releaseOp); + transfers.for_each(setRedelivered ? releaseRedeliveredOp : releaseOp); } void diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h index fc182e0bb6..a77f1b5d77 100644 --- a/cpp/src/qpid/broker/SessionAdapter.h +++ b/cpp/src/qpid/broker/SessionAdapter.h @@ -149,6 +149,7 @@ class Queue; public HandlerHelper { typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation; + RangedOperation releaseRedeliveredOp; RangedOperation releaseOp; RangedOperation rejectOp; RangedOperation acceptOp; |