diff options
Diffstat (limited to 'src/mongo/db/repl/scatter_gather_runner.h')
-rw-r--r-- | src/mongo/db/repl/scatter_gather_runner.h | 97 |
1 files changed, 56 insertions, 41 deletions
diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h index ad7b8d93aa5..e4e6e40f404 100644 --- a/src/mongo/db/repl/scatter_gather_runner.h +++ b/src/mongo/db/repl/scatter_gather_runner.h @@ -33,6 +33,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/mutex.h" namespace mongo { @@ -44,7 +45,7 @@ namespace repl { class ScatterGatherAlgorithm; /** - * Implementation of a scatter-gather behavior using a ReplicationExecutor. + * Interface of a scatter-gather behavior. */ class ScatterGatherRunner { MONGO_DISALLOW_COPYING(ScatterGatherRunner); @@ -53,14 +54,12 @@ public: /** * Constructs a new runner whose underlying algorithm is "algorithm". * - * "algorithm" must remain in scope until the runner's destructor completes. + * "algorithm" and "executor" must remain in scope until the runner's destructor completes. */ - explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm); - - ~ScatterGatherRunner(); + explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor); /** - * Runs the scatter-gather process using "executor", and blocks until it completes. + * Runs the scatter-gather process and blocks until it completes. * * Must _not_ be run from inside the executor context. * @@ -70,55 +69,71 @@ public: * is scheduled but before it completes, this method will return Status::OK(), * just as it does when it runs successfully to completion. */ - Status run(ReplicationExecutor* executor); + Status run(); /** - * Starts executing the scatter-gather process using "executor". - * * On success, returns an event handle that will be signaled when the runner has * finished executing the scatter-gather process. After that event has been * signaled, it is safe for the caller to examine any state on "algorithm". * - * This method must be called inside the executor context. - * - * onCompletion is an optional callback that will be executed in executor context - * immediately prior to signaling the event handle returned here. It must never - * throw exceptions. It may examine the state of the algorithm object. - * - * NOTE: If the executor starts to shut down before onCompletion executes, onCompletion may - * never execute, even though the returned event will eventually be signaled. + * The returned event will eventually be signaled. */ - StatusWith<ReplicationExecutor::EventHandle> start( - ReplicationExecutor* executor, - const stdx::function<void()>& onCompletion = stdx::function<void()>()); + StatusWith<ReplicationExecutor::EventHandle> start(); /** - * Informs the runner to cancel further processing. The "executor" argument - * must point to the same executor passed to "start()". - * - * Like start, this method must be called from within the executor context. + * Informs the runner to cancel further processing. */ - void cancel(ReplicationExecutor* executor); + void cancel(); private: /** - * Callback invoked once for every response from the network. - */ - static void _processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, - ScatterGatherRunner* runner); - - /** - * Method that performs all actions required when _algorithm indicates a sufficient - * number of respones have been received. + * Implementation of a scatter-gather behavior using a ReplicationExecutor. */ - void _signalSufficientResponsesReceived(ReplicationExecutor* executor); - - ScatterGatherAlgorithm* _algorithm; - stdx::function<void()> _onCompletion; - ReplicationExecutor::EventHandle _sufficientResponsesReceived; - std::vector<ReplicationExecutor::CallbackHandle> _callbacks; - size_t _actualResponses; - bool _started; + class RunnerImpl { + public: + explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor); + + /** + * On success, returns an event handle that will be signaled when the runner has + * finished executing the scatter-gather process. After that event has been + * signaled, it is safe for the caller to examine any state on "algorithm". + * + * The returned event will eventually be signaled. + */ + StatusWith<ReplicationExecutor::EventHandle> start( + const ReplicationExecutor::RemoteCommandCallbackFn cb); + + /** + * Informs the runner to cancel further processing. + */ + void cancel(); + + /** + * Callback invoked once for every response from the network. + */ + void processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData); + + private: + /** + * Method that performs all actions required when _algorithm indicates a sufficient + * number of responses have been received. + */ + void _signalSufficientResponsesReceived(); + + ReplicationExecutor* _executor; // Not owned here. + ScatterGatherAlgorithm* _algorithm; // Not owned here. + ReplicationExecutor::EventHandle _sufficientResponsesReceived; + std::vector<ReplicationExecutor::CallbackHandle> _callbacks; + size_t _actualResponses = 0; + bool _started = false; + stdx::mutex _mutex; + }; + + ReplicationExecutor* _executor; // Not owned here. + + // This pointer of RunnerImpl will be shared with remote command callbacks to make sure + // callbacks can access the members safely. + std::shared_ptr<RunnerImpl> _impl; }; } // namespace repl |