summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/scatter_gather_runner.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/scatter_gather_runner.h')
-rw-r--r--src/mongo/db/repl/scatter_gather_runner.h97
1 files changed, 56 insertions, 41 deletions
diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h
index ad7b8d93aa5..e4e6e40f404 100644
--- a/src/mongo/db/repl/scatter_gather_runner.h
+++ b/src/mongo/db/repl/scatter_gather_runner.h
@@ -33,6 +33,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -44,7 +45,7 @@ namespace repl {
class ScatterGatherAlgorithm;
/**
- * Implementation of a scatter-gather behavior using a ReplicationExecutor.
+ * Interface of a scatter-gather behavior.
*/
class ScatterGatherRunner {
MONGO_DISALLOW_COPYING(ScatterGatherRunner);
@@ -53,14 +54,12 @@ public:
/**
* Constructs a new runner whose underlying algorithm is "algorithm".
*
- * "algorithm" must remain in scope until the runner's destructor completes.
+ * "algorithm" and "executor" must remain in scope until the runner's destructor completes.
*/
- explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm);
-
- ~ScatterGatherRunner();
+ explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor);
/**
- * Runs the scatter-gather process using "executor", and blocks until it completes.
+ * Runs the scatter-gather process and blocks until it completes.
*
* Must _not_ be run from inside the executor context.
*
@@ -70,55 +69,71 @@ public:
* is scheduled but before it completes, this method will return Status::OK(),
* just as it does when it runs successfully to completion.
*/
- Status run(ReplicationExecutor* executor);
+ Status run();
/**
- * Starts executing the scatter-gather process using "executor".
- *
* On success, returns an event handle that will be signaled when the runner has
* finished executing the scatter-gather process. After that event has been
* signaled, it is safe for the caller to examine any state on "algorithm".
*
- * This method must be called inside the executor context.
- *
- * onCompletion is an optional callback that will be executed in executor context
- * immediately prior to signaling the event handle returned here. It must never
- * throw exceptions. It may examine the state of the algorithm object.
- *
- * NOTE: If the executor starts to shut down before onCompletion executes, onCompletion may
- * never execute, even though the returned event will eventually be signaled.
+ * The returned event will eventually be signaled.
*/
- StatusWith<ReplicationExecutor::EventHandle> start(
- ReplicationExecutor* executor,
- const stdx::function<void()>& onCompletion = stdx::function<void()>());
+ StatusWith<ReplicationExecutor::EventHandle> start();
/**
- * Informs the runner to cancel further processing. The "executor" argument
- * must point to the same executor passed to "start()".
- *
- * Like start, this method must be called from within the executor context.
+ * Informs the runner to cancel further processing.
*/
- void cancel(ReplicationExecutor* executor);
+ void cancel();
private:
/**
- * Callback invoked once for every response from the network.
- */
- static void _processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData,
- ScatterGatherRunner* runner);
-
- /**
- * Method that performs all actions required when _algorithm indicates a sufficient
- * number of respones have been received.
+ * Implementation of a scatter-gather behavior using a ReplicationExecutor.
*/
- void _signalSufficientResponsesReceived(ReplicationExecutor* executor);
-
- ScatterGatherAlgorithm* _algorithm;
- stdx::function<void()> _onCompletion;
- ReplicationExecutor::EventHandle _sufficientResponsesReceived;
- std::vector<ReplicationExecutor::CallbackHandle> _callbacks;
- size_t _actualResponses;
- bool _started;
+ class RunnerImpl {
+ public:
+ explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, ReplicationExecutor* executor);
+
+ /**
+ * On success, returns an event handle that will be signaled when the runner has
+ * finished executing the scatter-gather process. After that event has been
+ * signaled, it is safe for the caller to examine any state on "algorithm".
+ *
+ * The returned event will eventually be signaled.
+ */
+ StatusWith<ReplicationExecutor::EventHandle> start(
+ const ReplicationExecutor::RemoteCommandCallbackFn cb);
+
+ /**
+ * Informs the runner to cancel further processing.
+ */
+ void cancel();
+
+ /**
+ * Callback invoked once for every response from the network.
+ */
+ void processResponse(const ReplicationExecutor::RemoteCommandCallbackArgs& cbData);
+
+ private:
+ /**
+ * Method that performs all actions required when _algorithm indicates a sufficient
+ * number of responses have been received.
+ */
+ void _signalSufficientResponsesReceived();
+
+ ReplicationExecutor* _executor; // Not owned here.
+ ScatterGatherAlgorithm* _algorithm; // Not owned here.
+ ReplicationExecutor::EventHandle _sufficientResponsesReceived;
+ std::vector<ReplicationExecutor::CallbackHandle> _callbacks;
+ size_t _actualResponses = 0;
+ bool _started = false;
+ stdx::mutex _mutex;
+ };
+
+ ReplicationExecutor* _executor; // Not owned here.
+
+ // This pointer of RunnerImpl will be shared with remote command callbacks to make sure
+ // callbacks can access the members safely.
+ std::shared_ptr<RunnerImpl> _impl;
};
} // namespace repl