diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/AsyncCompletion.h')
-rw-r--r-- | qpid/cpp/src/qpid/broker/AsyncCompletion.h | 63 |
1 files changed, 16 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h index fef994438f..1f3d11e0ee 100644 --- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h +++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h @@ -22,8 +22,6 @@ * */ -#include <boost/intrusive_ptr.hpp> - #include "qpid/broker/BrokerImportExport.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" @@ -79,22 +77,6 @@ namespace broker { class AsyncCompletion { - public: - - /** Supplied by the Initiator to the end() method, allows for a callback - * when all outstanding completers are done. If the callback cannot be - * made during the end() call, the clone() method must supply a copy of - * this callback object that persists after end() returns. The cloned - * callback object will be used by the last completer thread, and - * released when the callback returns. - */ - class Callback : public RefCounted - { - public: - virtual void completed(bool) = 0; - virtual boost::intrusive_ptr<Callback> clone() = 0; - }; - private: mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded; mutable qpid::sys::Monitor callbackLock; @@ -103,17 +85,14 @@ class AsyncCompletion void invokeCallback(bool sync) { qpid::sys::Mutex::ScopedLock l(callbackLock); if (active) { - if (callback.get()) { - inCallback = true; - { - qpid::sys::Mutex::ScopedUnlock ul(callbackLock); - callback->completed(sync); - } - inCallback = false; - callback = boost::intrusive_ptr<Callback>(); - callbackLock.notifyAll(); + inCallback = true; + { + qpid::sys::Mutex::ScopedUnlock ul(callbackLock); + completed(sync); } + inCallback = false; active = false; + callbackLock.notifyAll(); } } @@ -121,17 +100,17 @@ class AsyncCompletion /** Invoked when all completers have signalled that they have completed * (via calls to finishCompleter()). bool == true if called via end() */ - boost::intrusive_ptr<Callback> callback; + virtual void completed(bool) = 0; public: AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {}; virtual ~AsyncCompletion() { cancel(); } - /** True when all outstanding operations have compeleted */ bool isDone() { + qpid::sys::Mutex::ScopedLock l(callbackLock); return !active; } @@ -156,32 +135,17 @@ class AsyncCompletion */ void begin() { + qpid::sys::Mutex::ScopedLock l(callbackLock); ++completionsNeeded; } /** called by initiator after all potential completers have called * startCompleter(). */ - void end(Callback& cb) + void end() { assert(completionsNeeded.get() > 0); // ensure begin() has been called! - // the following only "decrements" the count if it is 1. This means - // there are no more outstanding completers and we are done. - if (completionsNeeded.boolCompareAndSwap(1, 0)) { - // done! Complete immediately - cb.completed(true); - return; - } - - // the compare-and-swap did not succeed. This means there are - // outstanding completers pending (count > 1). Get a persistent - // Callback object to use when the last completer is done. - // Decrement after setting up the callback ensures that pending - // completers cannot touch the callback until it is ready. - callback = cb.clone(); if (--completionsNeeded == 0) { - // note that a completer may have completed during the - // callback setup or decrement: invokeCallback(true); } } @@ -192,9 +156,14 @@ class AsyncCompletion virtual void cancel() { qpid::sys::Mutex::ScopedLock l(callbackLock); while (inCallback) callbackLock.wait(); - callback = boost::intrusive_ptr<Callback>(); active = false; } + + /** may be called by Initiator after all completers have been added but + * prior to calling end(). Allows initiator to determine if it _really_ + * needs to wait for pending Completers (e.g. count > 1). + */ + //uint32_t getPendingCompleters() { return completionsNeeded.get(); } }; }} // qpid::broker:: |