#ifndef _AsyncCompletion_ #define _AsyncCompletion_ /* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include #include "qpid/broker/BrokerImportExport.h" #include "qpid/sys/AtomicValue.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Monitor.h" namespace qpid { namespace broker { /** * Class to implement asynchronous notification of completion. * * Use-case: An "initiator" needs to wait for a set of "completers" to * finish a unit of work before an action can occur. This object * tracks the progress of the set of completers, and allows the action * to occur once all completers have signalled that they are done. * * The initiator and completers may be running in separate threads. * * The initiating thread is the thread that initiates the action, * i.e. the connection read thread. * * A completing thread is any thread that contributes to completion, * e.g. a store thread that does an async write. * There may be zero or more completers. * * When the work is complete, a callback is invoked. The callback * may be invoked in the Initiator thread, or one of the Completer * threads. The callback is passed a flag indicating whether or not * the callback is running under the context of the Initiator thread. * * Use model: * 1) Initiator thread invokes begin() * 2) After begin() has been invoked, zero or more Completers invoke * startCompleter(). Completers may be running in the same or * different thread as the Initiator, as long as they guarantee that * startCompleter() is invoked at least once before the Initiator invokes end(). * 3) Completers may invoke finishCompleter() at any time, even after the * initiator has invoked end(). finishCompleter() may be called from any * thread. * 4) startCompleter()/finishCompleter() calls "nest": for each call to * startCompleter(), a corresponding call to finishCompleter() must be made. * Once the last finishCompleter() is called, the Completer must no longer * reference the completion object. * 5) The Initiator invokes end() at the point where it has finished * dispatching work to the Completers, and is prepared for the callback * handler to be invoked. Note: if there are no outstanding Completers * pending when the Initiator invokes end(), the callback will be invoked * directly, and the sync parameter will be set true. This indicates to the * Initiator that the callback is executing in the context of the end() call, * and the Initiator is free to optimize the handling of the completion, * assuming no need for synchronization with Completer threads. */ class AsyncCompletion { public: /** Supplied by the Initiator to the end() method, allows for a callback * when all outstanding completers are done. If the callback cannot be * made during the end() call, the clone() method must supply a copy of * this callback object that persists after end() returns. The cloned * callback object will be used by the last completer thread, and * released when the callback returns. */ class Callback : public RefCounted { public: virtual void completed(bool) = 0; virtual boost::intrusive_ptr clone() = 0; }; private: mutable qpid::sys::AtomicValue completionsNeeded; mutable qpid::sys::Monitor callbackLock; bool inCallback, active; void invokeCallback(bool sync) { qpid::sys::Mutex::ScopedLock l(callbackLock); if (active) { if (callback.get()) { inCallback = true; { qpid::sys::Mutex::ScopedUnlock ul(callbackLock); callback->completed(sync); } inCallback = false; callback = boost::intrusive_ptr(); callbackLock.notifyAll(); } active = false; } } protected: /** Invoked when all completers have signalled that they have completed * (via calls to finishCompleter()). bool == true if called via end() */ boost::intrusive_ptr callback; public: AsyncCompletion() : completionsNeeded(0), inCallback(false), active(true) {}; virtual ~AsyncCompletion() { cancel(); } /** True when all outstanding operations have compeleted */ bool isDone() { return !active; } /** Called to signal the start of an asynchronous operation. The operation * is considered pending until finishCompleter() is called. * E.g. called when initiating an async store operation. */ void startCompleter() { ++completionsNeeded; } /** Called by completer to signal that it has finished the operation started * when startCompleter() was invoked. * e.g. called when async write complete. */ void finishCompleter() { if (--completionsNeeded == 0) { invokeCallback(false); } } /** called by initiator before any calls to startCompleter can be done. */ void begin() { ++completionsNeeded; } /** called by initiator after all potential completers have called * startCompleter(). */ void end(Callback& cb) { assert(completionsNeeded.get() > 0); // ensure begin() has been called! // the following only "decrements" the count if it is 1. This means // there are no more outstanding completers and we are done. if (completionsNeeded.boolCompareAndSwap(1, 0)) { // done! Complete immediately cb.completed(true); return; } // the compare-and-swap did not succeed. This means there are // outstanding completers pending (count > 1). Get a persistent // Callback object to use when the last completer is done. // Decrement after setting up the callback ensures that pending // completers cannot touch the callback until it is ready. callback = cb.clone(); if (--completionsNeeded == 0) { // note that a completer may have completed during the // callback setup or decrement: invokeCallback(true); } } /** may be called by Initiator to cancel the callback. Will wait for * callback to complete if in progress. */ virtual void cancel() { qpid::sys::Mutex::ScopedLock l(callbackLock); while (inCallback) callbackLock.wait(); callback = boost::intrusive_ptr(); active = false; } }; }} // qpid::broker:: #endif /*!_AsyncCompletion_*/