From 15c6e1552997bc618b01faac3a2ceabbeeba7946 Mon Sep 17 00:00:00 2001 From: Kenneth Anthony Giusti Date: Mon, 20 Jun 2011 18:04:05 +0000 Subject: QPID-3079: update with review feedback git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid-3079@1137726 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/src/qpid/broker/Queue.cpp | 6 ++---- qpid/cpp/src/qpid/broker/Queue.h | 13 +++---------- qpid/cpp/src/qpid/broker/SemanticState.cpp | 23 ++++++++--------------- 3 files changed, 13 insertions(+), 29 deletions(-) diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 04d61c1add..cf538aaaa7 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -1263,14 +1263,12 @@ void Queue::DequeueCompletion::dequeueDone() assert(completionsNeeded.get() > 0); if (--completionsNeeded == 0) { assert(cb); - (*cb)(ctxt); - ctxt.reset(); + cb(); } } -void Queue::DequeueCompletion::registerCallback( callback *f, boost::intrusive_ptr& _ctxt ) +void Queue::DequeueCompletion::registerCallback( boost::function f ) { cb = f; - ctxt = _ctxt; dequeueDone(); // invoke callback if dequeue already done. } diff --git a/qpid/cpp/src/qpid/broker/Queue.h b/qpid/cpp/src/qpid/broker/Queue.h index 3bcaf0f473..86c184676f 100644 --- a/qpid/cpp/src/qpid/broker/Queue.h +++ b/qpid/cpp/src/qpid/broker/Queue.h @@ -289,21 +289,14 @@ class Queue : public boost::enable_shared_from_this, class DequeueCompletion : public RefCounted { public: - typedef void callback( boost::intrusive_ptr& ctxt ); - DequeueCompletion() - : completionsNeeded(2), // one for register call, another for done call - cb(0) {} - + : completionsNeeded(2) {}// one for register call, another for done call void dequeueDone(); - void registerCallback( callback *f, boost::intrusive_ptr& _ctxt ); + void registerCallback(boost::function f); private: mutable qpid::sys::AtomicValue completionsNeeded; - callback *cb; - boost::intrusive_ptr ctxt; - friend class Queue; - + boost::function cb; }; QPID_BROKER_EXTERN boost::intrusive_ptr dequeue(TransactionContext* ctxt, const QueuedMessage& msg); diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 96e8fe8b2d..2383978276 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -783,30 +783,24 @@ namespace { std::vector > queues; // for flush() SemanticState& state; - /** completes this command. Note: may run in *any* thread */ - void complete() + public: + AsyncMessageAcceptCmd(SemanticState& _state) + : pending(1), state(_state) {} + + /** signal a dequeue is done. Note: may be run in *any* thread */ + void dequeueDone() { Mutex::ScopedLock l(lock); assert(pending); if (--pending == 0) { framing::Invoker::Result r; // message.accept does not return result data + queues.clear(); Mutex::ScopedUnlock ul(lock); QPID_LOG(trace, "Completing async message.accept cmd=" << getId()); completed( r ); } } - public: - AsyncMessageAcceptCmd(SemanticState& _state) - : pending(1), state(_state) {} - - /** signal this dequeue done. Note: may be run in *any* thread */ - static void dequeueDone( boost::intrusive_ptr& ctxt ) - { - boost::intrusive_ptr cmd(boost::static_pointer_cast(ctxt)); - cmd->complete(); - } - /** called from session to urge pending dequeues to complete ASAP, done as a result of an execution.sync */ void flush() @@ -894,8 +888,7 @@ void SemanticState::accepted(const SequenceSet& commands) { if (async) { if (!cmd) cmd = boost::intrusive_ptr(new AsyncMessageAcceptCmd(*this)); cmd->add(i->getQueue()); - boost::intrusive_ptr rc(boost::static_pointer_cast(cmd)); - async->registerCallback(&AsyncMessageAcceptCmd::dequeueDone, rc); + async->registerCallback(boost::bind(&AsyncMessageAcceptCmd::dequeueDone, cmd)); } if (i->isRedundant()) i = unacked.erase(i); -- cgit v1.2.1