diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-08-23 22:17:57 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-08-28 18:55:16 -0400 |
commit | c246ae62641c3559c38830f6f5f4981e0acffa0c (patch) | |
tree | 02606ad94456d5a63fb8f5294305c042450ad82d /src/mongo | |
parent | 2203abc793e888e1c7b281b8bf213e0fd0446795 (diff) | |
download | mongo-c246ae62641c3559c38830f6f5f4981e0acffa0c.tar.gz |
SERVER-29386 VoteRequestor and FreshnessChecker need safe destruction
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/check_quorum_for_config_change.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/elect_cmd_runner.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/elect_cmd_runner.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/freshness_checker.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/freshness_checker.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/scatter_gather_runner.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/scatter_gather_runner.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/scatter_gather_test.cpp | 147 | ||||
-rw-r--r-- | src/mongo/db/repl/vote_requester.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/vote_requester.h | 3 |
10 files changed, 140 insertions, 57 deletions
diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index a1be65dbaf3..b3652192e40 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -282,14 +282,14 @@ bool QuorumChecker::hasReceivedSufficientResponses() const { Status checkQuorumGeneral(executor::TaskExecutor* executor, const ReplSetConfig& rsConfig, const int myIndex) { - QuorumChecker checker(&rsConfig, myIndex); - ScatterGatherRunner runner(&checker, executor); + auto checker = std::make_shared<QuorumChecker>(&rsConfig, myIndex); + ScatterGatherRunner runner(checker, executor); Status status = runner.run(); if (!status.isOK()) { return status; } - return checker.getFinalStatus(); + return checker->getFinalStatus(); } Status checkQuorumForInitiate(executor::TaskExecutor* executor, diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp index 5cf5697086e..8025cd78982 100644 --- a/src/mongo/db/repl/elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -35,6 +35,7 @@ #include "mongo/base/status.h" #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/scatter_gather_runner.h" +#include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { @@ -132,8 +133,8 @@ StatusWith<executor::TaskExecutor::EventHandle> ElectCmdRunner::start( const ReplSetConfig& currentConfig, int selfIndex, const std::vector<HostAndPort>& targets) { - _algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, OID::gen())); - _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor)); + _algorithm = std::make_shared<Algorithm>(currentConfig, selfIndex, targets, OID::gen()); + _runner = stdx::make_unique<ScatterGatherRunner>(_algorithm, executor); return _runner->start(); } diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h index 6928a5fce08..278a77fdb8f 100644 --- a/src/mongo/db/repl/elect_cmd_runner.h +++ b/src/mongo/db/repl/elect_cmd_runner.h @@ -115,7 +115,7 @@ public: } private: - std::unique_ptr<Algorithm> _algorithm; + std::shared_ptr<Algorithm> _algorithm; std::unique_ptr<ScatterGatherRunner> _runner; bool _isCanceled; }; diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp index 8fd0d2b8df8..4939d68bd0a 100644 --- a/src/mongo/db/repl/freshness_checker.cpp +++ b/src/mongo/db/repl/freshness_checker.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/repl_set_config.h" #include "mongo/db/repl/scatter_gather_runner.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" @@ -215,8 +216,8 @@ StatusWith<executor::TaskExecutor::EventHandle> FreshnessChecker::start( int selfIndex, const std::vector<HostAndPort>& targets) { _originalConfigVersion = currentConfig.getConfigVersion(); - _algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets)); - _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor)); + _algorithm = std::make_shared<Algorithm>(lastOpTimeApplied, currentConfig, selfIndex, targets); + _runner = stdx::make_unique<ScatterGatherRunner>(_algorithm, executor); return _runner->start(); } diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h index 2db774a7baf..0a4bbf7113b 100644 --- a/src/mongo/db/repl/freshness_checker.h +++ b/src/mongo/db/repl/freshness_checker.h @@ -148,7 +148,7 @@ public: long long getOriginalConfigVersion() const; private: - std::unique_ptr<Algorithm> _algorithm; + std::shared_ptr<Algorithm> _algorithm; std::unique_ptr<ScatterGatherRunner> _runner; long long _originalConfigVersion; bool _isCanceled; diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp index f29600a2bf2..226b36f46ff 100644 --- a/src/mongo/db/repl/scatter_gather_runner.cpp +++ b/src/mongo/db/repl/scatter_gather_runner.cpp @@ -48,9 +48,9 @@ using EventHandle = executor::TaskExecutor::EventHandle; using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; using RemoteCommandCallbackFn = executor::TaskExecutor::RemoteCommandCallbackFn; -ScatterGatherRunner::ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, +ScatterGatherRunner::ScatterGatherRunner(std::shared_ptr<ScatterGatherAlgorithm> algorithm, executor::TaskExecutor* executor) - : _executor(executor), _impl(std::make_shared<RunnerImpl>(algorithm, executor)) {} + : _executor(executor), _impl(std::make_shared<RunnerImpl>(std::move(algorithm), executor)) {} Status ScatterGatherRunner::run() { auto finishEvh = start(); @@ -79,9 +79,9 @@ void ScatterGatherRunner::cancel() { /** * Scatter gather runner implementation. */ -ScatterGatherRunner::RunnerImpl::RunnerImpl(ScatterGatherAlgorithm* algorithm, +ScatterGatherRunner::RunnerImpl::RunnerImpl(std::shared_ptr<ScatterGatherAlgorithm> algorithm, executor::TaskExecutor* executor) - : _executor(executor), _algorithm(algorithm) {} + : _executor(executor), _algorithm(std::move(algorithm)) {} StatusWith<EventHandle> ScatterGatherRunner::RunnerImpl::start( const RemoteCommandCallbackFn processResponseCB) { diff --git a/src/mongo/db/repl/scatter_gather_runner.h b/src/mongo/db/repl/scatter_gather_runner.h index 4a58461de1e..ce12c99b1a8 100644 --- a/src/mongo/db/repl/scatter_gather_runner.h +++ b/src/mongo/db/repl/scatter_gather_runner.h @@ -54,9 +54,10 @@ public: /** * Constructs a new runner whose underlying algorithm is "algorithm". * - * "algorithm" and "executor" must remain in scope until the runner's destructor completes. + * "executor" must remain in scope until the runner's destructor completes. + * "algorithm" is shared between the runner and the caller. */ - explicit ScatterGatherRunner(ScatterGatherAlgorithm* algorithm, + explicit ScatterGatherRunner(std::shared_ptr<ScatterGatherAlgorithm> algorithm, executor::TaskExecutor* executor); /** @@ -92,7 +93,8 @@ private: */ class RunnerImpl { public: - explicit RunnerImpl(ScatterGatherAlgorithm* algorithm, executor::TaskExecutor* executor); + explicit RunnerImpl(std::shared_ptr<ScatterGatherAlgorithm> algorithm, + executor::TaskExecutor* executor); /** * On success, returns an event handle that will be signaled when the runner has @@ -121,8 +123,8 @@ private: */ void _signalSufficientResponsesReceived(); - executor::TaskExecutor* _executor; // Not owned here. - ScatterGatherAlgorithm* _algorithm; // Not owned here. + executor::TaskExecutor* _executor; // Not owned here. + std::shared_ptr<ScatterGatherAlgorithm> _algorithm; executor::TaskExecutor::EventHandle _sufficientResponsesReceived; std::vector<executor::TaskExecutor::CallbackHandle> _callbacks; bool _started = false; diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp index 3f4d8d4d5cd..cb9fae75ecb 100644 --- a/src/mongo/db/repl/scatter_gather_test.cpp +++ b/src/mongo/db/repl/scatter_gather_test.cpp @@ -153,13 +153,13 @@ executor::TaskExecutor::CallbackFn getOnCompletionTestFunction(bool* ran) { // scheduled callbacks still exist will not be unsafe (ASAN builder) after the algorithm has // completed. TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) { - ScatterGatherTestAlgorithm* sga = new ScatterGatherTestAlgorithm(); + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, &getExecutor()); bool ranCompletion = false; StatusWith<executor::TaskExecutor::EventHandle> status = sgr->start(); - getExecutor() - .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) - .status_with_transitional_ignore(); + ASSERT_OK(getExecutor() + .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) + .getStatus()); ASSERT_OK(status.getStatus()); ASSERT_FALSE(ranCompletion); @@ -186,7 +186,7 @@ TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) { net->runUntil(net->now() + Seconds(2)); ASSERT_TRUE(ranCompletion); - delete sga; + sga.reset(); delete sgr; net->runReadyNetworkOperations(); @@ -194,13 +194,90 @@ TEST_F(ScatterGatherTest, DeleteAlgorithmAfterItHasCompleted) { net->exitNetwork(); } +TEST_F(ScatterGatherTest, DeleteAlgorithmBeforeItCompletes) { + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); + ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, &getExecutor()); + bool ranCompletion = false; + StatusWith<executor::TaskExecutor::EventHandle> status = sgr->start(); + ASSERT_OK(status.getStatus()); + ASSERT_OK(getExecutor() + .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) + .getStatus()); + ASSERT_FALSE(ranCompletion); + + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + net->scheduleResponse( + noi, net->now(), (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + ASSERT_FALSE(ranCompletion); + // Get and process the response from the first node immediately. + net->runReadyNetworkOperations(); + + noi = net->getNextReadyRequest(); + net->scheduleResponse(noi, + net->now() + Seconds(2), + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + ASSERT_FALSE(ranCompletion); + + noi = net->getNextReadyRequest(); + net->scheduleResponse(noi, + net->now() + Seconds(5), + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + ASSERT_FALSE(ranCompletion); + + sga.reset(); + delete sgr; + + net->runUntil(net->now() + Seconds(2)); + ASSERT_TRUE(ranCompletion); + + net->exitNetwork(); +} + +TEST_F(ScatterGatherTest, DeleteAlgorithmAfterCancel) { + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); + ScatterGatherRunner* sgr = new ScatterGatherRunner(sga, &getExecutor()); + bool ranCompletion = false; + StatusWith<executor::TaskExecutor::EventHandle> status = sgr->start(); + ASSERT_OK(status.getStatus()); + ASSERT_OK(getExecutor() + .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) + .getStatus()); + ASSERT_FALSE(ranCompletion); + + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + net->scheduleResponse(noi, + net->now() + Seconds(2), + (RemoteCommandResponse(BSON("ok" << 1), BSONObj(), Milliseconds(10)))); + ASSERT_FALSE(ranCompletion); + + // Cancel the runner so following responses won't change the result. All pending requests + // are cancelled. + sgr->cancel(); + ASSERT_FALSE(net->hasReadyRequests()); + // Run the event that gets signaled by cancellation. + net->runReadyNetworkOperations(); + ASSERT_TRUE(ranCompletion); + + sga.reset(); + delete sgr; + + // It's safe to advance the clock to process the scheduled response. + auto now = net->now(); + ASSERT_EQ(net->runUntil(net->now() + Seconds(2)), now + Seconds(2)); + net->exitNetwork(); +} + // Confirm that shutting the TaskExecutor down before calling run() will cause run() // to return ErrorCodes::ShutdownInProgress. TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) { - ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, &getExecutor()); + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); + ScatterGatherRunner sgr(sga, &getExecutor()); shutdownExecutorThread(); - sga.finish(); + sga->finish(); Status status = sgr.run(); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status); } @@ -208,8 +285,8 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeRun) { // Confirm that shutting the TaskExecutor down after calling run(), but before run() // finishes will cause run() to return Status::OK(). TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) { - ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, &getExecutor()); + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); + ScatterGatherRunner sgr(sga, &getExecutor()); ScatterGatherRunnerRunner sgrr(&sgr, &getExecutor()); sgrr.run(); // need to wait for the scatter-gather to be scheduled in the executor @@ -230,12 +307,12 @@ TEST_F(ScatterGatherTest, ShutdownExecutorAfterRun) { // Confirm that shutting the TaskExecutor down before calling start() will cause start() // to return ErrorCodes::ShutdownInProgress and should not run onCompletion(). TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) { - ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, &getExecutor()); + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); + ScatterGatherRunner sgr(sga, &getExecutor()); shutdownExecutorThread(); bool ranCompletion = false; StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start(); - sga.finish(); + sga->finish(); ASSERT_FALSE(ranCompletion); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, status.getStatus()); } @@ -243,28 +320,28 @@ TEST_F(ScatterGatherTest, ShutdownExecutorBeforeStart) { // Confirm that shutting the TaskExecutor down after calling start() will cause start() // to return Status::OK and should not run onCompletion(). TEST_F(ScatterGatherTest, ShutdownExecutorAfterStart) { - ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, &getExecutor()); + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); + ScatterGatherRunner sgr(sga, &getExecutor()); bool ranCompletion = false; StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start(); - getExecutor() - .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) - .status_with_transitional_ignore(); + ASSERT_OK(getExecutor() + .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) + .getStatus()); shutdownExecutorThread(); - sga.finish(); + sga->finish(); ASSERT_FALSE(ranCompletion); ASSERT_OK(status.getStatus()); } // Confirm that responses are not processed once sufficient responses have been received. TEST_F(ScatterGatherTest, DoNotProcessMoreThanSufficientResponses) { - ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, &getExecutor()); + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); + ScatterGatherRunner sgr(sga, &getExecutor()); bool ranCompletion = false; StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start(); - getExecutor() - .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) - .status_with_transitional_ignore(); + ASSERT_OK(getExecutor() + .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) + .getStatus()); ASSERT_OK(status.getStatus()); ASSERT_FALSE(ranCompletion); @@ -293,22 +370,22 @@ TEST_F(ScatterGatherTest, DoNotProcessMoreThanSufficientResponses) { net->runReadyNetworkOperations(); // the third resposne should not be processed, so the count should not increment - ASSERT_EQUALS(2, sga.getResponseCount()); + ASSERT_EQUALS(2, sga->getResponseCount()); net->exitNetwork(); } // Confirm that starting with sufficient responses received will immediate complete. TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTrueImmediately) { - ScatterGatherTestAlgorithm sga; + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); // set hasReceivedSufficientResponses to return true before the run starts - sga.finish(); - ScatterGatherRunner sgr(&sga, &getExecutor()); + sga->finish(); + ScatterGatherRunner sgr(sga, &getExecutor()); bool ranCompletion = false; StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start(); - getExecutor() - .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) - .status_with_transitional_ignore(); + ASSERT_OK(getExecutor() + .onEvent(status.getValue(), getOnCompletionTestFunction(&ranCompletion)) + .getStatus()); ASSERT_OK(status.getStatus()); // Wait until callback finishes. NetworkInterfaceMock* net = getNet(); @@ -323,8 +400,8 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru // This test ensures we do not process more responses than we've scheduled callbacks for. TEST_F(ScatterGatherTest, NeverEnoughResponses) { - ScatterGatherTestAlgorithm sga(5); - ScatterGatherRunner sgr(&sga); + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(5); + ScatterGatherRunner sgr(sga); bool ranCompletion = false; StatusWith<executor::TaskExecutor::EventHandle> status = sgr.start(&getExecutor(), getOnCompletionTestFunction(&ranCompletion)); @@ -365,8 +442,8 @@ TEST_F(ScatterGatherTest, DoNotCreateCallbacksIfHasSufficientResponsesReturnsTru // Confirm that running via run() will finish once sufficient responses have been received. TEST_F(ScatterGatherTest, SuccessfulScatterGatherViaRun) { - ScatterGatherTestAlgorithm sga; - ScatterGatherRunner sgr(&sga, &getExecutor()); + auto sga = std::make_shared<ScatterGatherTestAlgorithm>(); + ScatterGatherRunner sgr(sga, &getExecutor()); ScatterGatherRunnerRunner sgrr(&sgr, &getExecutor()); sgrr.run(); diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp index cf06c62bd18..67d56d004d3 100644 --- a/src/mongo/db/repl/vote_requester.cpp +++ b/src/mongo/db/repl/vote_requester.cpp @@ -36,6 +36,7 @@ #include "mongo/db/repl/repl_set_request_votes_args.h" #include "mongo/db/repl/scatter_gather_runner.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { @@ -170,9 +171,9 @@ StatusWith<executor::TaskExecutor::EventHandle> VoteRequester::start( bool dryRun, OpTime lastDurableOpTime, int primaryIndex) { - _algorithm.reset( - new Algorithm(rsConfig, candidateIndex, term, dryRun, lastDurableOpTime, primaryIndex)); - _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor)); + _algorithm = std::make_shared<Algorithm>( + rsConfig, candidateIndex, term, dryRun, lastDurableOpTime, primaryIndex); + _runner = stdx::make_unique<ScatterGatherRunner>(_algorithm, executor); return _runner->start(); } diff --git a/src/mongo/db/repl/vote_requester.h b/src/mongo/db/repl/vote_requester.h index d3d33851379..db67e41a50b 100644 --- a/src/mongo/db/repl/vote_requester.h +++ b/src/mongo/db/repl/vote_requester.h @@ -39,6 +39,7 @@ #include "mongo/db/repl/scatter_gather_runner.h" #include "mongo/platform/unordered_set.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" namespace mongo { @@ -124,7 +125,7 @@ public: unordered_set<HostAndPort> getResponders() const; private: - std::unique_ptr<Algorithm> _algorithm; + std::shared_ptr<Algorithm> _algorithm; std::unique_ptr<ScatterGatherRunner> _runner; bool _isCanceled = false; }; |