summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/AsyncCompletion.h
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/AsyncCompletion.h')
-rw-r--r--qpid/cpp/src/qpid/broker/AsyncCompletion.h63
1 files changed, 16 insertions, 47 deletions
diff --git a/qpid/cpp/src/qpid/broker/AsyncCompletion.h b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
index fef994438f..1f3d11e0ee 100644
--- a/qpid/cpp/src/qpid/broker/AsyncCompletion.h
+++ b/qpid/cpp/src/qpid/broker/AsyncCompletion.h
@@ -22,8 +22,6 @@
*
*/
-#include <boost/intrusive_ptr.hpp>
-
#include "qpid/broker/BrokerImportExport.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Mutex.h"
@@ -79,22 +77,6 @@ namespace broker {
class AsyncCompletion
{
- public:
-
- /** Supplied by the Initiator to the end() method, allows for a callback
- * when all outstanding completers are done. If the callback cannot be
- * made during the end() call, the clone() method must supply a copy of
- * this callback object that persists after end() returns. The cloned
- * callback object will be used by the last completer thread, and
- * released when the callback returns.
- */
- class Callback : public RefCounted
- {
- public:
- virtual void completed(bool) = 0;
- virtual boost::intrusive_ptr<Callback> clone() = 0;
- };
-
private:
mutable qpid::sys::AtomicValue<uint32_t> completionsNeeded;
mutable qpid::sys::Monitor callbackLock;
@@ -103,17 +85,14 @@ class AsyncCompletion
void invokeCallback(bool sync) {
qpid::sys::Mutex::ScopedLock l(callbackLock);
if (active) {
- if (callback.get()) {
- inCallback = true;
- {
- qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
- callback->completed(sync);
- }
- inCallback = false;
- callback = boost::intrusive_ptr<Callback>();
- callbackLock.notifyAll();
+ inCallback = true;
+ {
+ qpid::sys::Mutex::ScopedUnlock ul(callbackLock);
+ completed(sync);
}
+ inCallback = false;
active = false;
+ callbackLock.notifyAll();
}
}
@@ -121,17 +100,17 @@ class AsyncCompletion
/** Invoked when all completers have signalled that they have completed
* (via calls to finishCompleter()). bool == true if called via end()
*/
- boost::intrusive_ptr<Callback> callback;
+ virtual void completed(bool) = 0;
public:
AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {};
virtual ~AsyncCompletion() { cancel(); }
-
/** True when all outstanding operations have compeleted
*/
bool isDone()
{
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
return !active;
}
@@ -156,32 +135,17 @@ class AsyncCompletion
*/
void begin()
{
+ qpid::sys::Mutex::ScopedLock l(callbackLock);
++completionsNeeded;
}
/** called by initiator after all potential completers have called
* startCompleter().
*/
- void end(Callback& cb)
+ void end()
{
assert(completionsNeeded.get() > 0); // ensure begin() has been called!
- // the following only "decrements" the count if it is 1. This means
- // there are no more outstanding completers and we are done.
- if (completionsNeeded.boolCompareAndSwap(1, 0)) {
- // done! Complete immediately
- cb.completed(true);
- return;
- }
-
- // the compare-and-swap did not succeed. This means there are
- // outstanding completers pending (count > 1). Get a persistent
- // Callback object to use when the last completer is done.
- // Decrement after setting up the callback ensures that pending
- // completers cannot touch the callback until it is ready.
- callback = cb.clone();
if (--completionsNeeded == 0) {
- // note that a completer may have completed during the
- // callback setup or decrement:
invokeCallback(true);
}
}
@@ -192,9 +156,14 @@ class AsyncCompletion
virtual void cancel() {
qpid::sys::Mutex::ScopedLock l(callbackLock);
while (inCallback) callbackLock.wait();
- callback = boost::intrusive_ptr<Callback>();
active = false;
}
+
+ /** may be called by Initiator after all completers have been added but
+ * prior to calling end(). Allows initiator to determine if it _really_
+ * needs to wait for pending Completers (e.g. count > 1).
+ */
+ //uint32_t getPendingCompleters() { return completionsNeeded.get(); }
};
}} // qpid::broker::