summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp3
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.h2
-rw-r--r--cpp/src/qpid/broker/MessageHandlerImpl.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp4
-rw-r--r--cpp/src/qpid/broker/SemanticState.h2
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.cpp7
-rw-r--r--cpp/src/qpid/broker/SessionAdapter.h1
7 files changed, 12 insertions, 9 deletions
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index c5bf325afc..4406eccc44 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -111,9 +111,10 @@ void DeliveryRecord::requeue() const
}
}
-void DeliveryRecord::release()
+void DeliveryRecord::release(bool setRedelivered)
{
if (acquired && !ended) {
+ if (setRedelivered) msg.payload->redeliver();
queue->requeue(msg);
acquired = false;
setEnded();
diff --git a/cpp/src/qpid/broker/DeliveryRecord.h b/cpp/src/qpid/broker/DeliveryRecord.h
index eed7b81748..7d08a4b1f0 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.h
+++ b/cpp/src/qpid/broker/DeliveryRecord.h
@@ -67,7 +67,7 @@ class DeliveryRecord{
void dequeue(TransactionContext* ctxt = 0) const;
void requeue() const;
- void release();
+ void release(bool setRedelivered);
void reject();
void cancel(const std::string& tag);
void redeliver(SemanticState* const);
diff --git a/cpp/src/qpid/broker/MessageHandlerImpl.cpp b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
index c26824a8e3..64c0282963 100644
--- a/cpp/src/qpid/broker/MessageHandlerImpl.cpp
+++ b/cpp/src/qpid/broker/MessageHandlerImpl.cpp
@@ -39,7 +39,7 @@ using namespace framing;
MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerImpl(s),
- releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+ releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2))
{}
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 6f5577de5a..2251901340 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -625,14 +625,14 @@ void SemanticState::acquire(DeliveryId first, DeliveryId last, DeliveryIds& acqu
for_each(range.start, range.end, AcquireFunctor(acquired));
}
-void SemanticState::release(DeliveryId first, DeliveryId last)
+void SemanticState::release(DeliveryId first, DeliveryId last, bool setRedelivered)
{
AckRange range = findRange(first, last);
//release results in the message being added to the head so want
//to release in reverse order to keep the original transfer order
DeliveryRecords::reverse_iterator start(range.end);
DeliveryRecords::reverse_iterator end(range.start);
- for_each(start, end, mem_fun_ref(&DeliveryRecord::release));
+ for_each(start, end, bind2nd(mem_fun_ref(&DeliveryRecord::release), setRedelivered));
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index 4bcd0dddb3..20a0239db0 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -178,7 +178,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void flow(bool active);
DeliveryId redeliver(QueuedMessage& msg, DeliveryToken::shared_ptr token);
void acquire(DeliveryId first, DeliveryId last, DeliveryIds& acquired);
- void release(DeliveryId first, DeliveryId last);
+ void release(DeliveryId first, DeliveryId last, bool setRedelivered);
void reject(DeliveryId first, DeliveryId last);
void handle(boost::intrusive_ptr<Message> msg);
bool doOutput() { return outputTasks.doOutput(); }
diff --git a/cpp/src/qpid/broker/SessionAdapter.cpp b/cpp/src/qpid/broker/SessionAdapter.cpp
index d30a1dc696..8093c7c174 100644
--- a/cpp/src/qpid/broker/SessionAdapter.cpp
+++ b/cpp/src/qpid/broker/SessionAdapter.cpp
@@ -298,7 +298,8 @@ void SessionAdapter::QueueHandlerImpl::delete_(const string& queue, bool ifUnuse
SessionAdapter::MessageHandlerImpl::MessageHandlerImpl(SemanticState& s) :
HandlerHelper(s),
- releaseOp(boost::bind(&SemanticState::release, &state, _1, _2)),
+ releaseRedeliveredOp(boost::bind(&SemanticState::release, &state, _1, _2, true)),
+ releaseOp(boost::bind(&SemanticState::release, &state, _1, _2, false)),
rejectOp(boost::bind(&SemanticState::reject, &state, _1, _2)),
acceptOp(boost::bind(&SemanticState::accepted, &state, _1, _2))
{}
@@ -314,9 +315,9 @@ void SessionAdapter::MessageHandlerImpl::transfer(const string& /*destination*/,
//not yet used (content containing assemblies treated differently at present
}
- void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool /*setRedelivered*/)
+void SessionAdapter::MessageHandlerImpl::release(const SequenceSet& transfers, bool setRedelivered)
{
- transfers.for_each(releaseOp);
+ transfers.for_each(setRedelivered ? releaseRedeliveredOp : releaseOp);
}
void
diff --git a/cpp/src/qpid/broker/SessionAdapter.h b/cpp/src/qpid/broker/SessionAdapter.h
index fc182e0bb6..a77f1b5d77 100644
--- a/cpp/src/qpid/broker/SessionAdapter.h
+++ b/cpp/src/qpid/broker/SessionAdapter.h
@@ -149,6 +149,7 @@ class Queue;
public HandlerHelper
{
typedef boost::function<void(DeliveryId, DeliveryId)> RangedOperation;
+ RangedOperation releaseRedeliveredOp;
RangedOperation releaseOp;
RangedOperation rejectOp;
RangedOperation acceptOp;