summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/SemanticState.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/SemanticState.cpp52
1 files changed, 28 insertions, 24 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp
index 0f670ded83..3530f3ef6f 100644
--- a/qpid/cpp/src/qpid/broker/SemanticState.cpp
+++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp
@@ -824,13 +824,14 @@ namespace {
pending.erase(i);
if (ready && pending.empty()) {
- framing::Invoker::Result r;
+ framing::Invoker::Result r; // message.accept does not return result data
Mutex::ScopedUnlock ul(lock);
completed( r );
}
}
- /** allow the Message.Accept to complete */
+ /** allow the Message.Accept to complete - do this only after all
+ * deliveryIds have been added() */
void enable()
{
Mutex::ScopedLock l(lock);
@@ -854,14 +855,14 @@ namespace {
boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
public:
DequeueDone( const DeliveryId & _id,
- boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
+ const boost::intrusive_ptr<AsyncMessageAcceptCmd>& _cmd )
: id(_id), cmd(_cmd) {}
void operator()() { cmd->complete( id ); }
};
/** factory to create the above callback - passed to queue's dequeue
- method, only called if dequeue is async! */
+ method, only used if dequeue is asynchronous! */
boost::shared_ptr<Queue::DequeueDoneCallback> factory( SemanticState *state,
const DeliveryId& id,
boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd )
@@ -873,10 +874,18 @@ namespace {
boost::shared_ptr<DequeueDone> x( new DequeueDone(id, cmd ) );
return x;
}
+
+ /** predicate to process unacked delivery records */
+ bool acceptDelivery( SemanticState *state,
+ boost::intrusive_ptr<AsyncMessageAcceptCmd>& cmd,
+ DeliveryRecord dr )
+ {
+ Queue::DequeueDoneCallbackFactory f = boost::bind(factory, state, dr.getId(), cmd);
+ return dr.accept((TransactionContext*) 0, &f);
+ }
}
void SemanticState::accepted(const SequenceSet& commands) {
- QPID_LOG(error, "SemanticState::accepted (" << commands << ")");
assertClusterSafe();
if (txBuffer.get()) {
//in transactional mode, don't dequeue or remove, just
@@ -900,26 +909,21 @@ void SemanticState::accepted(const SequenceSet& commands) {
unacked.erase(removed, unacked.end());
}
} else {
- /** @todo KAG - the following code removes the command from unacked
- even if the dequeue has not completed. note that the command will
- still not complete until all dequeues complete. I'm doing this to
- avoid having to lock the unacked list, which would be necessary if
- we remove when the dequeue completes. Is this ok? */
+ /** @todo KAG - the following code removes the message from unacked
+ list even if the dequeue has not yet completed. Note that the
+ entire command will still not complete until all dequeues
+ complete. I'm doing this to avoid having to lock the unacked list,
+ which would be necessary if we remove when the dequeue
+ completes. Is this ok? */
boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd;
- DeliveryRecords::iterator i;
- DeliveryRecords undone;
- for (i = unacked.begin(); i < unacked.end(); ++i) {
- if (i->coveredBy(&commands)) {
- Queue::DequeueDoneCallbackFactory f = boost::bind(factory, this, i->getId(), cmd);
- if (i->accept((TransactionContext*) 0, &f) == false) {
- undone.push_back(*i);
- }
- }
- }
- if (undone.empty())
- unacked.clear();
- else
- unacked.swap(undone);
+ DeliveryRecords::iterator removed =
+ remove_if(unacked.begin(), unacked.end(),
+ isInSequenceSetAnd(commands,
+ bind(acceptDelivery,
+ this,
+ cmd,
+ _1)));
+ unacked.erase(removed, unacked.end());
if (cmd) {
boost::intrusive_ptr<SessionContext::AsyncCommandContext> pcmd(boost::static_pointer_cast<SessionContext::AsyncCommandContext>(cmd));