summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/check_quorum_for_config_change.cpp6
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.cpp5
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.h2
-rw-r--r--src/mongo/db/repl/freshness_checker.cpp5
-rw-r--r--src/mongo/db/repl/freshness_checker.h2
-rw-r--r--src/mongo/db/repl/scatter_gather_runner.cpp8
-rw-r--r--src/mongo/db/repl/scatter_gather_runner.h12
-rw-r--r--src/mongo/db/repl/scatter_gather_test.cpp147
-rw-r--r--src/mongo/db/repl/vote_requester.cpp7
-rw-r--r--src/mongo/db/repl/vote_requester.h3
10 files changed, 140 insertions, 57 deletions
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp
index a1be65dbaf3..b3652192e40 100644
--- a/src/mongo/db/repl/check_quorum_for_config_change.cpp
+++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp
@@ -282,14 +282,14 @@ bool QuorumChecker::hasReceivedSufficientResponses() const {
Status checkQuorumGeneral(executor::TaskExecutor* executor,
const ReplSetConfig& rsConfig,
const int myIndex) {
- QuorumChecker checker(&rsConfig, myIndex);
- ScatterGatherRunner runner(&checker, executor);
+ auto checker = std::make_shared<QuorumChecker>(&rsConfig, myIndex);
+ ScatterGatherRunner runner(checker, executor);
Status status = runner.run();
if (!status.isOK()) {
return status;
}
- return checker.getFinalStatus();
+ return checker->getFinalStatus();
}
Status checkQuorumForInitiate(executor::TaskExecutor* executor,
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp
index 5cf5697086e..8025cd78982 100644
--- a/src/mongo/db/repl/elect_cmd_runner.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner.cpp
@@ -35,6 +35,7 @@
#include "mongo/base/status.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/scatter_gather_runner.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -132,8 +133,8 @@ StatusWith<executor::TaskExecutor::EventHandle> ElectCmdRunner::start(
const ReplSetConfig& currentConfig,
int selfIndex,
const std::vector<HostAndPort>& targets) {
- _algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, OID::gen()));
- _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ _algorithm = std::make_shared<Algorithm>(currentConfig, selfIndex, targets, OID::gen());
+ _runner = stdx::make_unique<ScatterGatherRunner>(_algorithm, executor);
return _runner->start();
}
diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h
index 6928a5fce08..278a77fdb8f 100644
--- a/src/mongo/db/repl/elect_cmd_runner.h
+++ b/src/mongo/db/repl/elect_cmd_runner.h
@@ -115,7 +115,7 @@ public:
}
private:
- std::unique_ptr<Algorithm> _algorithm;
+ std::shared_ptr<Algorithm> _algorithm;
std::unique_ptr<ScatterGatherRunner> _runner;
bool _isCanceled;
};
diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp
index 8fd0d2b8df8..4939d68bd0a 100644
--- a/src/mongo/db/repl/freshness_checker.cpp
+++ b/src/mongo/db/repl/freshness_checker.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/scatter_gather_runner.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
@@ -215,8 +216,8 @@ StatusWith<executor::TaskExecutor::EventHandle> FreshnessChecker::start(
int selfIndex,
const std::vector<HostAndPort>& targets) {
_originalConfigVersion = currentConfig.getConfigVersion();
- _algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets));
- _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ _algorithm = std::make_shared<Algorithm>(lastOpTimeApplied, currentConfig, selfIndex, targets);
+ _runner = stdx::make_unique<ScatterGatherRunner>(_algorithm, executor);
return _runner->start();
}
diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h
index 2db774a7baf..0a4bbf7113b 100644
--- a/src/mongo/db/repl/freshness_checker.h
+++ b/src/mongo/db/repl/freshness_checker.h
@@ -148,7 +148,7 @@ public:
long long getOriginalConfigVersion() const;
private:
- std::unique_ptr<Algorithm> _algorithm;
+ std::shared_ptr<Algorithm> _algorithm;
std::unique_ptr<ScatterGatherRunner> _runner;
long long _originalConfigVersion;
bool _isCanceled;
diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp
index f29600a2bf2..226b36f46ff 100644
--- a/src/mongo/db/repl/scatter_gather_runner.cpp
+++ b/src/mongo/db/repl/scatter_gather_runner.cpp
@@ -48,9 +48,9 @@ using EventHandle = executor::TaskExecutor::EventHandle;
using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn;
-ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm,
+ScatterGatherRunner::ScatterGatherRunner(std::shared_ptr<ScatterGatherAlgorithm> algorithm,
executor::TaskExecutor* executor)
- : _executor(executor), _impl(std::make_shared<RunnerImpl>(algorithm, executor)) {}
+ : _executor(executor), _impl(std::make_shared<RunnerImpl>(std::move(algorithm), executor)) {}
Status ScatterGatherRunner::run() {
auto finishEvh = start();
@@ -79,9 +79,9 @@ void ScatterGatherRunner::cancel() {
/**
* Scatter gather runner implementation.
*/
-ScatterGatherRunner::RunnerImpl::RunnerImpl(ScatterGatherAlgorithm* algorithm,
+ScatterGatherRunner::RunnerImpl::RunnerImpl(std::shared_ptr<ScatterGatherAlgorithm> algorithm,
executor::TaskExecutor* executor)
- : _executor(executor), _algorithm(algorithm) {}
+ : _executor(executor), _algorithm(std::move(algorithm)) {}
StatusWith<EventHandle> ScatterGatherRunner::RunnerImpl::start(
const RemoteCommandCallbackFn processResponseCB) {
diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h
index 4a58461de1e..ce12c99b1a8 100644
--- a/src/mongo/db/repl/scatter_gather_runner.h
+++ b/src/mongo/db/repl/scatter_gather_runner.h
@@ -54,9 +54,10 @@ public:
/**
* Constructs a new runner whose underlying algorithm is "algorithm".
*
- * "algorithm" and "executor" must remain in scope until the runner's destructor completes.
+ * "executor" must remain in scope until the runner's destructor completes.
+ * "algorithm" is shared between the runner and the caller.
*/
- explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm,
+ explicit ScatterGatherRunner(std::shared_ptr<ScatterGatherAlgorithm> algorithm,
executor::TaskExecutor* executor);
/**
@@ -92,7 +93,8 @@ private:
*/
class RunnerImpl {
public:
- explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, executor::TaskExecutor* executor);
+ explicit RunnerImpl(std::shared_ptr<ScatterGatherAlgorithm> algorithm,
+ executor::TaskExecutor* executor);
/**
* On success, returns an event handle that will be signaled when the runner has
@@ -121,8 +123,8 @@ private:
*/
void _signalSufficientResponsesReceived();
- executor::TaskExecutor* _executor; // Not owned here.
- ScatterGatherAlgorithm* _algorithm; // Not owned here.
+ executor::TaskExecutor* _executor; // Not owned here.
+ std::shared_ptr<ScatterGatherAlgorithm> _algorithm;
executor::TaskExecutor::EventHandle _sufficientResponsesReceived;
std::vector<executor::TaskExecutor::CallbackHandle> _callbacks;
bool _started = false;
diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp
index 3f4d8d4d5cd..cb9fae75ecb 100644
--- a/src/mongo/db/repl/scatter_gather_test.cpp
+++ b/src/mongo/db/repl/scatter_gather_test.cpp
@@ -153,13 +153,13 @@ executor::TaskExecutor::CallbackFn getOnCompletionTestFunction(bool* ran) {
// scheduled callbacks still exist will not be unsafe (ASAN builder) after the algorithm has
// completed.
TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) {
- ScatterGatherTestAlgorithm* sga = new ScatterGatherTestAlgorithm();
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, &getExecutor());
bool ranCompletion = false;
StatusWith<executor::TaskExecutor::EventHandle> status = sgr->start();
- getExecutor()
- .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
- .status_with_transitional_ignore();
+ ASSERT_OK(getExecutor()
+ .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
+ .getStatus());
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
@@ -186,7 +186,7 @@ TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) {
net->runUntil(net->now() + Seconds(2));
ASSERT_TRUE(ranCompletion);
- delete sga;
+ sga.reset();
delete sgr;
net->runReadyNetworkOperations();
@@ -194,13 +194,90 @@ TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) {
net->exitNetwork();
}
+TEST_F(ScatterGatherTest, DeleteAlgorithmBeforeItCompletes) {
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
+ ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, &getExecutor());
+ bool ranCompletion = false;
+ StatusWith<executor::TaskExecutor::EventHandle> status = sgr->start();
+ ASSERT_OK(status.getStatus());
+ ASSERT_OK(getExecutor()
+ .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
+ .getStatus());
+ ASSERT_FALSE(ranCompletion);
+
+ NetworkInterfaceMock* net = getNet();
+ net->enterNetwork();
+ NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ net->scheduleResponse(
+ noi, net->now(), (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ ASSERT_FALSE(ranCompletion);
+ // Get and process the response from the first node immediately.
+ net->runReadyNetworkOperations();
+
+ noi = net->getNextReadyRequest();
+ net->scheduleResponse(noi,
+ net->now() + Seconds(2),
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ ASSERT_FALSE(ranCompletion);
+
+ noi = net->getNextReadyRequest();
+ net->scheduleResponse(noi,
+ net->now() + Seconds(5),
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ ASSERT_FALSE(ranCompletion);
+
+ sga.reset();
+ delete sgr;
+
+ net->runUntil(net->now() + Seconds(2));
+ ASSERT_TRUE(ranCompletion);
+
+ net->exitNetwork();
+}
+
+TEST_F(ScatterGatherTest, DeleteAlgorithmAfterCancel) {
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
+ ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, &getExecutor());
+ bool ranCompletion = false;
+ StatusWith<executor::TaskExecutor::EventHandle> status = sgr->start();
+ ASSERT_OK(status.getStatus());
+ ASSERT_OK(getExecutor()
+ .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
+ .getStatus());
+ ASSERT_FALSE(ranCompletion);
+
+ NetworkInterfaceMock* net = getNet();
+ net->enterNetwork();
+ NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ net->scheduleResponse(noi,
+ net->now() + Seconds(2),
+ (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10))));
+ ASSERT_FALSE(ranCompletion);
+
+ // Cancel the runner so following responses won't change the result. All pending requests
+ // are cancelled.
+ sgr->cancel();
+ ASSERT_FALSE(net->hasReadyRequests());
+ // Run the event that gets signaled by cancellation.
+ net->runReadyNetworkOperations();
+ ASSERT_TRUE(ranCompletion);
+
+ sga.reset();
+ delete sgr;
+
+ // It's safe to advance the clock to process the scheduled response.
+ auto now = net->now();
+ ASSERT_EQ(net->runUntil(net->now() + Seconds(2)), now + Seconds(2));
+ net->exitNetwork();
+}
+
// Confirm that shutting the TaskExecutor down before calling run() will cause run()
// to return ErrorCodes::ShutdownInProgress.
TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) {
- ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, &getExecutor());
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
+ ScatterGatherRunner sgr(sga, &getExecutor());
shutdownExecutorThread();
- sga.finish();
+ sga->finish();
Status status = sgr.run();
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status);
}
@@ -208,8 +285,8 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) {
// Confirm that shutting the TaskExecutor down after calling run(), but before run()
// finishes will cause run() to return Status::OK().
TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) {
- ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, &getExecutor());
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
+ ScatterGatherRunner sgr(sga, &getExecutor());
ScatterGatherRunnerRunner sgrr(&sgr, &getExecutor());
sgrr.run();
// need to wait for the scatter-gather to be scheduled in the executor
@@ -230,12 +307,12 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) {
// Confirm that shutting the TaskExecutor down before calling start() will cause start()
// to return ErrorCodes::ShutdownInProgress and should not run onCompletion().
TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) {
- ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, &getExecutor());
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
+ ScatterGatherRunner sgr(sga, &getExecutor());
shutdownExecutorThread();
bool ranCompletion = false;
StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start();
- sga.finish();
+ sga->finish();
ASSERT_FALSE(ranCompletion);
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.getStatus());
}
@@ -243,28 +320,28 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) {
// Confirm that shutting the TaskExecutor down after calling start() will cause start()
// to return Status::OK and should not run onCompletion().
TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) {
- ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, &getExecutor());
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
+ ScatterGatherRunner sgr(sga, &getExecutor());
bool ranCompletion = false;
StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start();
- getExecutor()
- .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
- .status_with_transitional_ignore();
+ ASSERT_OK(getExecutor()
+ .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
+ .getStatus());
shutdownExecutorThread();
- sga.finish();
+ sga->finish();
ASSERT_FALSE(ranCompletion);
ASSERT_OK(status.getStatus());
}
// Confirm that responses are not processed once sufficient responses have been received.
TEST_F(ScatterGatherTest, DoNotProcessMoreThanSufficientResponses) {
- ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, &getExecutor());
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
+ ScatterGatherRunner sgr(sga, &getExecutor());
bool ranCompletion = false;
StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start();
- getExecutor()
- .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
- .status_with_transitional_ignore();
+ ASSERT_OK(getExecutor()
+ .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
+ .getStatus());
ASSERT_OK(status.getStatus());
ASSERT_FALSE(ranCompletion);
@@ -293,22 +370,22 @@ TEST_F(ScatterGatherTest, DoNotProcessMoreThanSufficientResponses) {
net->runReadyNetworkOperations();
// the third resposne should not be processed, so the count should not increment
- ASSERT_EQUALS(2, sga.getResponseCount());
+ ASSERT_EQUALS(2, sga->getResponseCount());
net->exitNetwork();
}
// Confirm that starting with sufficient responses received will immediate complete.
TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTrueImmediately) {
- ScatterGatherTestAlgorithm sga;
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
// set hasReceivedSufficientResponses to return true before the run starts
- sga.finish();
- ScatterGatherRunner sgr(&sga, &getExecutor());
+ sga->finish();
+ ScatterGatherRunner sgr(sga, &getExecutor());
bool ranCompletion = false;
StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start();
- getExecutor()
- .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
- .status_with_transitional_ignore();
+ ASSERT_OK(getExecutor()
+ .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion))
+ .getStatus());
ASSERT_OK(status.getStatus());
// Wait until callback finishes.
NetworkInterfaceMock* net = getNet();
@@ -323,8 +400,8 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
// This test ensures we do not process more responses than we've scheduled callbacks for.
TEST_F(ScatterGatherTest, NeverEnoughResponses) {
- ScatterGatherTestAlgorithm sga(5);
- ScatterGatherRunner sgr(&sga);
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>(5);
+ ScatterGatherRunner sgr(sga);
bool ranCompletion = false;
StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start(&getExecutor(),
getOnCompletionTestFunction(&ranCompletion));
@@ -365,8 +442,8 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru
// Confirm that running via run() will finish once sufficient responses have been received.
TEST_F(ScatterGatherTest, SuccessfulScatterGatherViaRun) {
- ScatterGatherTestAlgorithm sga;
- ScatterGatherRunner sgr(&sga, &getExecutor());
+ auto sga = std::make_shared<ScatterGatherTestAlgorithm>();
+ ScatterGatherRunner sgr(sga, &getExecutor());
ScatterGatherRunnerRunner sgrr(&sgr, &getExecutor());
sgrr.run();
diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp
index cf06c62bd18..67d56d004d3 100644
--- a/src/mongo/db/repl/vote_requester.cpp
+++ b/src/mongo/db/repl/vote_requester.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/repl/repl_set_request_votes_args.h"
#include "mongo/db/repl/scatter_gather_runner.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -170,9 +171,9 @@ StatusWith<executor::TaskExecutor::EventHandle> VoteRequester::start(
bool dryRun,
OpTime lastDurableOpTime,
int primaryIndex) {
- _algorithm.reset(
- new Algorithm(rsConfig, candidateIndex, term, dryRun, lastDurableOpTime, primaryIndex));
- _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ _algorithm = std::make_shared<Algorithm>(
+ rsConfig, candidateIndex, term, dryRun, lastDurableOpTime, primaryIndex);
+ _runner = stdx::make_unique<ScatterGatherRunner>(_algorithm, executor);
return _runner->start();
}
diff --git a/src/mongo/db/repl/vote_requester.h b/src/mongo/db/repl/vote_requester.h
index d3d33851379..db67e41a50b 100644
--- a/src/mongo/db/repl/vote_requester.h
+++ b/src/mongo/db/repl/vote_requester.h
@@ -39,6 +39,7 @@
#include "mongo/db/repl/scatter_gather_runner.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
namespace mongo {
@@ -124,7 +125,7 @@ public:
unordered_set<HostAndPort> getResponders() const;
private:
- std::unique_ptr<Algorithm> _algorithm;
+ std::shared_ptr<Algorithm> _algorithm;
std::unique_ptr<ScatterGatherRunner> _runner;
bool _isCanceled = false;
};