diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/SemanticState.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 23 |
1 files changed, 8 insertions, 15 deletions
diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 96e8fe8b2d..2383978276 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -783,30 +783,24 @@ namespace { std::vector<boost::shared_ptr<Queue> > queues; // for flush() SemanticState& state; - /** completes this command. Note: may run in *any* thread */ - void complete() + public: + AsyncMessageAcceptCmd(SemanticState& _state) + : pending(1), state(_state) {} + + /** signal a dequeue is done. Note: may be run in *any* thread */ + void dequeueDone() { Mutex::ScopedLock l(lock); assert(pending); if (--pending == 0) { framing::Invoker::Result r; // message.accept does not return result data + queues.clear(); Mutex::ScopedUnlock ul(lock); QPID_LOG(trace, "Completing async message.accept cmd=" << getId()); completed( r ); } } - public: - AsyncMessageAcceptCmd(SemanticState& _state) - : pending(1), state(_state) {} - - /** signal this dequeue done. Note: may be run in *any* thread */ - static void dequeueDone( boost::intrusive_ptr<RefCounted>& ctxt ) - { - boost::intrusive_ptr<AsyncMessageAcceptCmd> cmd(boost::static_pointer_cast<AsyncMessageAcceptCmd>(ctxt)); - cmd->complete(); - } - /** called from session to urge pending dequeues to complete ASAP, done as a result of an execution.sync */ void flush() @@ -894,8 +888,7 @@ void SemanticState::accepted(const SequenceSet& commands) { if (async) { if (!cmd) cmd = boost::intrusive_ptr<AsyncMessageAcceptCmd>(new AsyncMessageAcceptCmd(*this)); cmd->add(i->getQueue()); - boost::intrusive_ptr<qpid::RefCounted> rc(boost::static_pointer_cast<RefCounted>(cmd)); - async->registerCallback(&AsyncMessageAcceptCmd::dequeueDone, rc); + async->registerCallback(boost::bind(&AsyncMessageAcceptCmd::dequeueDone, cmd)); } if (i->isRedundant()) i = unacked.erase(i); |