author     Andy Schwerin <schwerin@mongodb.com>    2014-09-13 22:43:51 -0400
committer  Andy Schwerin <schwerin@mongodb.com>    2014-09-15 19:18:21 -0400
commit     d427859985fffb76278a104d46a1a913528dbc4a (patch)
tree       7a4f3f24bfb1e3a59e3fff458c0be31687704bfe
parent     f8a3f964eda2be32154dd8afbfad9bdf6283e82e (diff)
download   mongo-d427859985fffb76278a104d46a1a913528dbc4a.tar.gz
SERVER-15248 Convert FreshnessChecker to use a ScatterGatherAlgorithm.
Also, use completion functions instead of scheduling work on events to handle
election phase transitions in repl_coordinator_impl_elect.cpp. This leads to
less scheduling and easier-to-follow code.
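The ScatterGatherAlgorithm pattern adopted here splits the work in two: the algorithm object describes which remote commands to send (getRequests), folds each reply into its state (processResponse), and says when it has heard enough (hasReceivedSufficientResponses), while a ScatterGatherRunner owns all ReplicationExecutor scheduling. The sketch below is a minimal, self-contained illustration of that split; the three virtual method names match the interface used in the diff, but the request/response types, the simplified freshness logic, and the driver loop in main() are illustrative stand-ins, not the real mongo classes.

```cpp
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Illustrative stand-ins for ReplicationExecutor::RemoteCommandRequest and the
// response type used by the real interface.
struct RemoteCommandRequest {
    std::string target;
    std::string command;
};

struct ResponseStatus {
    bool ok;
    bool remoteIsFresher;
};

// Shape of the ScatterGatherAlgorithm interface: pure bookkeeping, no scheduling.
class ScatterGatherAlgorithm {
public:
    virtual ~ScatterGatherAlgorithm() {}
    virtual std::vector<RemoteCommandRequest> getRequests() const = 0;
    virtual void processResponse(const RemoteCommandRequest& request,
                                 const ResponseStatus& response) = 0;
    virtual bool hasReceivedSufficientResponses() const = 0;
};

// Simplified analogue of FreshnessChecker::Algorithm: one replSetFresh command
// per target; stop waiting once every reply is in or some node proves fresher.
class FreshnessAlgorithm : public ScatterGatherAlgorithm {
public:
    explicit FreshnessAlgorithm(const std::vector<std::string>& targets)
        : _targets(targets), _responses(0), _freshest(true) {}

    virtual std::vector<RemoteCommandRequest> getRequests() const {
        std::vector<RemoteCommandRequest> requests;
        for (std::size_t i = 0; i < _targets.size(); ++i) {
            RemoteCommandRequest req = {_targets[i], "replSetFresh"};
            requests.push_back(req);
        }
        return requests;
    }

    virtual void processResponse(const RemoteCommandRequest&,
                                 const ResponseStatus& response) {
        ++_responses;
        if (response.ok && response.remoteIsFresher) {
            _freshest = false;  // a fresher node exists; no need to wait further
        }
    }

    virtual bool hasReceivedSufficientResponses() const {
        return !_freshest || _responses == _targets.size();
    }

    bool isFreshest() const { return _freshest; }

private:
    std::vector<std::string> _targets;
    std::size_t _responses;
    bool _freshest;
};

// A trivial driver standing in for ScatterGatherRunner: issue each request,
// feed in a canned reply, and stop as soon as the algorithm has heard enough.
int main() {
    std::vector<std::string> targets;
    targets.push_back("h1:27017");
    targets.push_back("h2:27017");
    FreshnessAlgorithm algo(targets);

    std::vector<RemoteCommandRequest> requests = algo.getRequests();
    for (std::size_t i = 0; i < requests.size(); ++i) {
        ResponseStatus canned = {true, i == 0};  // pretend h1 reports it is fresher
        algo.processResponse(requests[i], canned);
        if (algo.hasReceivedSufficientResponses()) {
            break;  // the real runner stops waiting for the remaining replies here
        }
    }
    std::cout << "freshest: " << (algo.isFreshest() ? "yes" : "no") << std::endl;
    return 0;
}
```

In the patch itself the driver role belongs to ScatterGatherRunner::start(executor, onCompletion), as seen in the elect_cmd_runner.cpp and freshness_checker.cpp hunks below.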
-rw-r--r--  src/mongo/db/repl/elect_cmd_runner.cpp             |   5
-rw-r--r--  src/mongo/db/repl/elect_cmd_runner.h               |   3
-rw-r--r--  src/mongo/db/repl/freshness_checker.cpp            | 156
-rw-r--r--  src/mongo/db/repl/freshness_checker.h              |  89
-rw-r--r--  src/mongo/db/repl/freshness_checker_test.cpp       | 356
-rw-r--r--  src/mongo/db/repl/repl_coordinator_impl.h          |   6
-rw-r--r--  src/mongo/db/repl/repl_coordinator_impl_elect.cpp  |  89
7 files changed, 239 insertions, 465 deletions
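The commit message's second point, completion functions instead of scheduling work on events, is visible below in the new start() signatures, which take an optional onCompletion function and return a StatusWith<ReplicationExecutor::EventHandle>, and in repl_coordinator_impl_elect.cpp, where _onFreshnessCheckComplete and _onElectCmdRunnerComplete are now passed directly as completion functions rather than registered via makeEvent()/onEvent(). The toy example below sketches the control-flow difference; ToyExecutor and everything in it are hypothetical stand-ins for the real ReplicationExecutor API, kept only detailed enough to show why the completion style needs one less scheduling hop.

```cpp
#include <functional>
#include <iostream>
#include <map>
#include <queue>

// Hypothetical, minimal executor -- just enough to contrast the two styles.
// The real ReplicationExecutor runs callbacks and event waiters on its own
// thread; here everything runs inline from run().
class ToyExecutor {
public:
    typedef std::function<void()> Work;

    ToyExecutor() : _nextEvent(0) {}

    void schedule(const Work& work) { _work.push(work); }

    int makeEvent() { return _nextEvent++; }

    void onEvent(int event, const Work& work) { _waiters[event].push(work); }

    void signalEvent(int event) {
        std::queue<Work>& waiters = _waiters[event];
        while (!waiters.empty()) {
            schedule(waiters.front());
            waiters.pop();
        }
    }

    void run() {
        while (!_work.empty()) {
            Work next = _work.front();
            _work.pop();
            next();
        }
    }

private:
    std::queue<Work> _work;
    std::map<int, std::queue<Work> > _waiters;
    int _nextEvent;
};

// Event style (before this commit): the caller makes an event, registers the
// continuation with onEvent(), and the checker signals the event when done --
// the continuation reaches the executor through an extra scheduling hop.
void startCheckEventStyle(ToyExecutor* exec, int doneEvent) {
    exec->schedule([exec, doneEvent]() {
        std::cout << "freshness responses processed (event style)\n";
        exec->signalEvent(doneEvent);
    });
}

// Completion-function style (after this commit): the caller hands the
// continuation to start() directly; it runs as soon as the work finishes,
// and an event is still returned so callers and tests can wait on it.
int startCheckCompletionStyle(ToyExecutor* exec, const ToyExecutor::Work& onCompletion) {
    int doneEvent = exec->makeEvent();
    exec->schedule([exec, doneEvent, onCompletion]() {
        std::cout << "freshness responses processed (completion style)\n";
        onCompletion();                // next election phase starts right here
        exec->signalEvent(doneEvent);  // waiters (e.g. a test) are still woken
    });
    return doneEvent;
}

int main() {
    ToyExecutor exec;

    int doneEvent = exec.makeEvent();
    exec.onEvent(doneEvent, []() { std::cout << "next phase (event style)\n"; });
    startCheckEventStyle(&exec, doneEvent);

    startCheckCompletionStyle(
        &exec, []() { std::cout << "next phase (completion style)\n"; });

    exec.run();
    return 0;
}
```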
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp index 5afbeada778..6772e50e004 100644 --- a/src/mongo/db/repl/elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -134,12 +134,13 @@ namespace repl { ReplicationExecutor* executor, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<HostAndPort>& targets) { + const std::vector<HostAndPort>& targets, + const stdx::function<void ()>& onCompletion) { const long long round(executor->nextRandomInt64(std::numeric_limits<int64_t>::max())); _algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, round)); _runner.reset(new ScatterGatherRunner(_algorithm.get())); - return _runner->start(executor); + return _runner->start(executor, onCompletion); } int ElectCmdRunner::getReceivedVotes() const { diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h index 67518f70fee..bf1d062f58c 100644 --- a/src/mongo/db/repl/elect_cmd_runner.h +++ b/src/mongo/db/repl/elect_cmd_runner.h @@ -92,7 +92,8 @@ namespace repl { ReplicationExecutor* executor, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<HostAndPort>& targets); + const std::vector<HostAndPort>& targets, + const stdx::function<void ()>& onCompletion = stdx::function<void ()>()); /** * Returns the number of received votes. Only valid to call after diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp index 01c85ec592b..36eed2f5535 100644 --- a/src/mongo/db/repl/freshness_checker.cpp +++ b/src/mongo/db/repl/freshness_checker.cpp @@ -37,6 +37,7 @@ #include "mongo/db/repl/member_heartbeat_data.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/scatter_gather_runner.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" @@ -44,98 +45,74 @@ namespace mongo { namespace repl { - FreshnessChecker::FreshnessChecker() : _actualResponses(0), - _freshest(true), - _tied(false), - _originalConfigVersion(0) { + FreshnessChecker::Algorithm::Algorithm( + OpTime lastOpTimeApplied, + const ReplicaSetConfig& rsConfig, + int selfIndex, + const std::vector<HostAndPort>& targets) : + _actualResponses(0), + _freshest(true), + _tied(false), + _lastOpTimeApplied(lastOpTimeApplied), + _rsConfig(rsConfig), + _selfIndex(selfIndex), + _targets(targets) { } + FreshnessChecker::Algorithm::~Algorithm() {} - Status FreshnessChecker::start( - ReplicationExecutor* executor, - const ReplicationExecutor::EventHandle& evh, - const OpTime& lastOpTimeApplied, - const ReplicaSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& hosts) { + std::vector<ReplicationExecutor::RemoteCommandRequest> + FreshnessChecker::Algorithm::getRequests() const { - _lastOpTimeApplied = lastOpTimeApplied; - _freshest = true; - _originalConfigVersion = currentConfig.getConfigVersion(); - _sufficientResponsesReceived = evh; + const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex); // gather all not-down nodes, get their fullnames(or hostandport's) // schedule fresh command for each node - BSONObj replSetFreshCmd = BSON("replSetFresh" << 1 << - "set" << currentConfig.getReplSetName() << - "opTime" << Date_t(lastOpTimeApplied.asDate()) << - "who" << currentConfig.getMemberAt(selfIndex) - .getHostAndPort().toString() << - "cfgver" << currentConfig.getConfigVersion() << - "id" << currentConfig.getMemberAt(selfIndex).getId()); - for 
(std::vector<HostAndPort>::const_iterator it = hosts.begin(); it != hosts.end(); ++it) { - const StatusWith<ReplicationExecutor::CallbackHandle> cbh = - executor->scheduleRemoteCommand( - ReplicationExecutor::RemoteCommandRequest( + const BSONObj replSetFreshCmd = + BSON("replSetFresh" << 1 << + "set" << _rsConfig.getReplSetName() << + "opTime" << Date_t(_lastOpTimeApplied.asDate()) << + "who" << selfConfig.getHostAndPort().toString() << + "cfgver" << _rsConfig.getConfigVersion() << + "id" << selfConfig.getId()); + + std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); + it != _targets.end(); + ++it) { + invariant(*it != selfConfig.getHostAndPort()); + requests.push_back(ReplicationExecutor::RemoteCommandRequest( *it, "admin", replSetFreshCmd, - Milliseconds(30*1000)), // trying to match current Socket timeout - stdx::bind(&FreshnessChecker::_onReplSetFreshResponse, - this, - stdx::placeholders::_1)); - if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { - return cbh.getStatus(); - } - fassert(18682, cbh.getStatus()); - - _responseCallbacks.push_back(cbh.getValue()); - } - - if (_responseCallbacks.size() == 0) { - _signalSufficientResponsesReceived(executor); + Milliseconds(30*1000))); // trying to match current Socket timeout } - return Status::OK(); + return requests; } - void FreshnessChecker::_onReplSetFreshResponse( - const ReplicationExecutor::RemoteCommandCallbackData& cbData) { + void FreshnessChecker::Algorithm::processResponse( + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response) { ++_actualResponses; - if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) { - return; - } - - if (!cbData.response.isOK()) { + if (!response.isOK()) { // command failed, so nothing further to do. - if (_actualResponses == _responseCallbacks.size()) { - _signalSufficientResponsesReceived(cbData.executor); - } return; } - ScopeGuard sufficientResponsesReceivedCaller = - MakeObjGuard(*this, - &FreshnessChecker::_signalSufficientResponsesReceived, - cbData.executor); - - BSONObj res = cbData.response.getValue().data; + const BSONObj res = response.getValue().data; if (res["fresher"].trueValue()) { log() << "not electing self, we are not freshest"; _freshest = false; return; } - + if (res["opTime"].type() != mongo::Date) { - error() << "wrong type for opTime argument in replSetFresh response: " << + error() << "wrong type for opTime argument in replSetFresh response: " << typeName(res["opTime"].type()); _freshest = false; - if (_actualResponses != _responseCallbacks.size()) { - // More responses are still pending. - sufficientResponsesReceivedCaller.Dismiss(); - } return; } OpTime remoteTime(res["opTime"].date()); @@ -147,54 +124,57 @@ namespace repl { _freshest = false; return; } - + if (res["veto"].trueValue()) { BSONElement msg = res["errmsg"]; if (!msg.eoo()) { - log() << "not electing self, " << cbData.request.target.toString() << + log() << "not electing self, " << request.target.toString() << " would veto with '" << msg << "'"; } else { - log() << "not electing self, " << cbData.request.target.toString() << + log() << "not electing self, " << request.target.toString() << " would veto"; } _freshest = false; return; } - - if (_actualResponses != _responseCallbacks.size()) { - // More responses are still pending. 
- sufficientResponsesReceivedCaller.Dismiss(); - } } - void FreshnessChecker::_signalSufficientResponsesReceived(ReplicationExecutor* executor) { - if (_sufficientResponsesReceived.isValid()) { - - // Cancel all the command callbacks, - // so that they do not attempt to access FreshnessChecker - // state after this callback completes. - std::for_each(_responseCallbacks.begin(), - _responseCallbacks.end(), - stdx::bind(&ReplicationExecutor::cancel, - executor, - stdx::placeholders::_1)); - - executor->signalEvent(_sufficientResponsesReceived); - _sufficientResponsesReceived = ReplicationExecutor::EventHandle(); - + bool FreshnessChecker::Algorithm::hasReceivedSufficientResponses() const { + if (!_freshest) { + return true; + } + if (_actualResponses == _targets.size()) { + return true; } + return false; } void FreshnessChecker::getResults(bool* freshest, bool* tied) const { - *freshest = _freshest; - *tied = _tied; + *freshest = _algorithm->isFreshest(); + *tied = _algorithm->isTiedForFreshest(); } long long FreshnessChecker::getOriginalConfigVersion() const { return _originalConfigVersion; } + FreshnessChecker::FreshnessChecker() {} + FreshnessChecker::~FreshnessChecker() {} + + StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start( + ReplicationExecutor* executor, + const OpTime& lastOpTimeApplied, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& targets, + const stdx::function<void ()>& onCompletion) { + + _originalConfigVersion = currentConfig.getConfigVersion(); + _algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets)); + _runner.reset(new ScatterGatherRunner(_algorithm.get())); + return _runner->start(executor, onCompletion); + } } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h index a0863f81fdf..e5f870c9b3d 100644 --- a/src/mongo/db/repl/freshness_checker.h +++ b/src/mongo/db/repl/freshness_checker.h @@ -31,8 +31,10 @@ #include <vector> #include "mongo/base/disallow_copying.h" -#include "mongo/db/repl/replication_executor.h" #include "mongo/bson/optime.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/replica_set_config.h" +#include "mongo/db/repl/scatter_gather_algorithm.h" namespace mongo { @@ -41,12 +43,49 @@ namespace mongo { namespace repl { class ReplicaSetConfig; - class MemberHeartbeatData; + class ScatterGatherRunner; class FreshnessChecker { MONGO_DISALLOW_COPYING(FreshnessChecker); public: + class Algorithm : public ScatterGatherAlgorithm { + public: + Algorithm(OpTime lastOpTimeApplied, + const ReplicaSetConfig& rsConfig, + int selfIndex, + const std::vector<HostAndPort>& targets); + virtual ~Algorithm(); + virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const; + virtual void processResponse( + const ReplicationExecutor::RemoteCommandRequest& request, + const ResponseStatus& response); + virtual bool hasReceivedSufficientResponses() const; + + bool isFreshest() const { return _freshest; } + bool isTiedForFreshest() const { return _tied; } + + private: + // Number of responses received so far. + size_t _actualResponses; + + // Does this node have the latest applied optime of all queriable nodes in the set? + bool _freshest; + + // Does this node have the same optime as another queriable node in the set? 
+ bool _tied; + + // Last OpTime applied by the caller; used in the Fresh command + const OpTime _lastOpTimeApplied; + + const ReplicaSetConfig _rsConfig; + + const int _selfIndex; + + const std::vector<HostAndPort> _targets; + }; + FreshnessChecker(); + virtual ~FreshnessChecker(); /** * Begins the process of sending replSetFresh commands to all non-DOWN nodes @@ -57,13 +96,13 @@ namespace repl { * callbacks that it schedules. * If this function returns Status::OK(), evh is then guaranteed to be signaled. **/ - Status start( + StatusWith<ReplicationExecutor::EventHandle> start( ReplicationExecutor* executor, - const ReplicationExecutor::EventHandle& evh, - const OpTime& lastOpTimeApplied, + const OpTime& lastOpTimeApplied, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<HostAndPort>& hosts); + const std::vector<HostAndPort>& targets, + const stdx::function<void ()>& onCompletion = stdx::function<void ()>()); /** * Returns whether this node is the freshest of all non-DOWN nodes in the set, @@ -80,40 +119,10 @@ namespace repl { long long getOriginalConfigVersion() const; private: - /** - * Callback that runs after a replSetFresh command returns. - * Adjusts _tied and _freshest flags appropriately, and - * signals completion if we have received the last expected response. - */ - void _onReplSetFreshResponse(const ReplicationExecutor::RemoteCommandCallbackData& cbData); - - /** - * Signals _sufficientResponsesReceived event, if it hasn't been already. - */ - void _signalSufficientResponsesReceived(ReplicationExecutor* executor); - - // Event used to signal completion of the FreshnessChecker's commands. - ReplicationExecutor::EventHandle _sufficientResponsesReceived; - - // Vector of command callbacks scheduled by start() and - // canceled by _onFreshnessCheckComplete(). - std::vector<ReplicationExecutor::CallbackHandle> _responseCallbacks; - - // Last OpTime applied by the caller; used in the Fresh command - OpTime _lastOpTimeApplied; - - // Number of responses received so far. - size_t _actualResponses; - - // Does this node have the latest applied optime of all queriable nodes in the set? - bool _freshest; - - // Does this node have the same optime as another queriable node in the set? - bool _tied; - - // The version of the config passed to start(). 
+ boost::scoped_ptr<Algorithm> _algorithm; + boost::scoped_ptr<ScatterGatherRunner> _runner; long long _originalConfigVersion; }; -} -} +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp index a16bcc6f511..2932a080a47 100644 --- a/src/mongo/db/repl/freshness_checker_test.cpp +++ b/src/mongo/db/repl/freshness_checker_test.cpp @@ -53,10 +53,23 @@ namespace { class FreshnessCheckerTest : public mongo::unittest::Test { public: - FreshnessCheckerTest(); + StatusWith<ReplicationExecutor::EventHandle> startTest( + FreshnessChecker* checker, + const OpTime& lastOpTimeApplied, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& hosts); + + void runTest( + FreshnessChecker* checker, + const OpTime& lastOpTimeApplied, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& hosts); + void freshnessCheckerRunner(const ReplicationExecutor::CallbackData& data, FreshnessChecker* checker, - const ReplicationExecutor::EventHandle& evh, + StatusWith<ReplicationExecutor::EventHandle>* evh, const OpTime& lastOpTimeApplied, const ReplicaSetConfig& currentConfig, int selfIndex, @@ -73,15 +86,12 @@ namespace { NetworkInterfaceMockWithMap* _net; boost::scoped_ptr<ReplicationExecutor> _executor; boost::scoped_ptr<boost::thread> _executorThread; - Status _lastStatus; private: void setUp(); void tearDown(); }; - FreshnessCheckerTest::FreshnessCheckerTest() : _lastStatus(Status::OK()) {} - void FreshnessCheckerTest::setUp() { _net = new NetworkInterfaceMockWithMap; _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */)); @@ -117,63 +127,77 @@ namespace { // This is necessary because the run method must be scheduled in the Replication Executor // for correct concurrency operation. 
void FreshnessCheckerTest::freshnessCheckerRunner( - const ReplicationExecutor::CallbackData& data, - FreshnessChecker* checker, - const ReplicationExecutor::EventHandle& evh, - const OpTime& lastOpTimeApplied, - const ReplicaSetConfig& currentConfig, - int selfIndex, - const std::vector<HostAndPort>& hosts) { + const ReplicationExecutor::CallbackData& data, + FreshnessChecker* checker, + StatusWith<ReplicationExecutor::EventHandle>* evh, + const OpTime& lastOpTimeApplied, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& hosts) { + invariant(data.status.isOK()); - _lastStatus = checker->start(data.executor, - evh, - lastOpTimeApplied, - currentConfig, - selfIndex, - hosts); + *evh = checker->start(data.executor, + lastOpTimeApplied, + currentConfig, + selfIndex, + hosts); + } + + StatusWith<ReplicationExecutor::EventHandle> FreshnessCheckerTest::startTest( + FreshnessChecker* checker, + const OpTime& lastOpTimeApplied, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& hosts) { + + StatusWith<ReplicationExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set"); + StatusWith<ReplicationExecutor::CallbackHandle> cbh = + _executor->scheduleWork( + stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, + this, + stdx::placeholders::_1, + checker, + &evh, + lastOpTimeApplied, + currentConfig, + selfIndex, + hosts)); + ASSERT_OK(cbh.getStatus()); + _executor->wait(cbh.getValue()); + ASSERT_OK(evh.getStatus()); + return evh; + } + + void FreshnessCheckerTest::runTest( + FreshnessChecker* checker, + const OpTime& lastOpTimeApplied, + const ReplicaSetConfig& currentConfig, + int selfIndex, + const std::vector<HostAndPort>& hosts) { + + _executor->waitForEvent(startTest(checker, + lastOpTimeApplied, + currentConfig, + selfIndex, + hosts).getValue()); } TEST_F(FreshnessCheckerTest, OneNode) { - // Only one node in the config. We are freshest and not tied. + // Only one node in the config. We must be freshest and not tied. 
ReplicaSetConfig config = assertMakeRSConfig( BSON("_id" << "rs0" << "version" << 1 << "members" << BSON_ARRAY( BSON("_id" << 1 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - - Date_t now(0); - std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); FreshnessChecker checker; - - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(0,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(0, 0), config, 0, std::vector<HostAndPort>()); bool weAreFreshest(false); bool tied(false); checker.getResults(&weAreFreshest, &tied); ASSERT_TRUE(weAreFreshest); ASSERT_FALSE(tied); - } TEST_F(FreshnessCheckerTest, TwoNodes) { @@ -184,15 +208,9 @@ namespace { "members" << BSON_ARRAY( BSON("_id" << 1 << "host" << "h0") << BSON("_id" << 2 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); hosts.push_back(config.getMemberAt(1).getHostAndPort()); - const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0); _net->addResponse(RemoteCommandRequest(HostAndPort("h1"), @@ -206,30 +224,13 @@ namespace { "opTime" << Date_t(OpTime(0,0).asDate())))); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(0,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(0, 0), config, 0, hosts); bool weAreFreshest(false); bool tied(false); checker.getResults(&weAreFreshest, &tied); ASSERT_TRUE(weAreFreshest); ASSERT_TRUE(tied); - } @@ -241,13 +242,8 @@ namespace { "members" << BSON_ARRAY( BSON("_id" << 1 << "host" << "h0") << BSON("_id" << 2 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0); @@ -263,22 +259,12 @@ namespace { true /* isBlocked */); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(0,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - - _executor->wait(cbh.getValue()); - ASSERT_OK(_lastStatus); - + StatusWith<ReplicationExecutor::EventHandle> evh = startTest( + &checker, + OpTime(0, 0), + config, + 0, + hosts); _executor->shutdown(); _net->unblockAll(); _executor->waitForEvent(evh.getValue()); @@ -289,7 +275,7 @@ namespace { // This seems less than ideal, but if we are shutting down, the next phase of election // cannot proceed anyway. 
ASSERT_TRUE(weAreFreshest); - ASSERT_FALSE(tied); + ASSERT_FALSE(tied); } TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshest) { @@ -301,13 +287,8 @@ namespace { "members" << BSON_ARRAY( BSON("_id" << 1 << "host" << "h0") << BSON("_id" << 2 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0); @@ -324,23 +305,7 @@ namespace { "opTime" << Date_t(OpTime(0,0).asDate())))); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(10,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(10, 0), config, 0, hosts); stopCapturingLogMessages(); bool weAreFreshest(true); @@ -360,13 +325,8 @@ namespace { "members" << BSON_ARRAY( BSON("_id" << 1 << "host" << "h0") << BSON("_id" << 2 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0); @@ -383,23 +343,7 @@ namespace { "opTime" << Date_t(OpTime(10,0).asDate())))); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(0,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(0, 0), config, 0, hosts); stopCapturingLogMessages(); bool weAreFreshest(true); @@ -419,13 +363,8 @@ namespace { "members" << BSON_ARRAY( BSON("_id" << 1 << "host" << "h0") << BSON("_id" << 2 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0); @@ -441,23 +380,7 @@ namespace { "opTime" << 3))); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(10,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(10, 0), config, 0, hosts); stopCapturingLogMessages(); bool weAreFreshest(true); @@ -478,13 +401,8 @@ namespace { "members" << BSON_ARRAY( BSON("_id" << 1 << "host" << "h0") << BSON("_id" << 2 << "host" << "h1")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = 
_executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - hosts.push_back(config.getMemberAt(0).getHostAndPort()); hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0); @@ -502,23 +420,7 @@ namespace { "opTime" << Date_t(OpTime(0,0).asDate())))); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(10,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(10, 0), config, 0, hosts); stopCapturingLogMessages(); bool weAreFreshest(true); @@ -542,13 +444,9 @@ namespace { BSON("_id" << 3 << "host" << "h2") << BSON("_id" << 4 << "host" << "h3") << BSON("_id" << 5 << "host" << "h4")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - for (ReplicaSetConfig::MemberIterator mem = config.membersBegin(); + for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin(); mem != config.membersEnd(); ++mem) { hosts.push_back(mem->getHostAndPort()); @@ -595,23 +493,7 @@ namespace { "opTime" << Date_t(OpTime(0,0).asDate())))); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(10,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(10, 0), config, 0, hosts); stopCapturingLogMessages(); bool weAreFreshest(true); @@ -730,13 +612,9 @@ namespace { BSON("_id" << 3 << "host" << "h2") << BSON("_id" << 4 << "host" << "h3") << BSON("_id" << 5 << "host" << "h4")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - for (ReplicaSetConfig::MemberIterator mem = config.membersBegin(); + for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin(); mem != config.membersEnd(); ++mem) { hosts.push_back(mem->getHostAndPort()); @@ -782,23 +660,7 @@ namespace { "opTime" << Date_t(OpTime(0,0).asDate())))); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(10,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(10, 0), config, 0, hosts); stopCapturingLogMessages(); bool weAreFreshest(true); @@ -822,13 +684,9 @@ namespace { BSON("_id" << 3 << "host" << "h2") << BSON("_id" << 4 << "host" << "h3") << BSON("_id" << 5 << "host" << "h4")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - for (ReplicaSetConfig::MemberIterator mem = config.membersBegin(); + for (ReplicaSetConfig::MemberIterator mem = 
++config.membersBegin(); mem != config.membersEnd(); ++mem) { hosts.push_back(mem->getHostAndPort()); @@ -876,23 +734,7 @@ namespace { "opTime" << Date_t(OpTime(0,0).asDate())))); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(10,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(10, 0), config, 0, hosts); stopCapturingLogMessages(); bool weAreFreshest(true); @@ -1013,13 +855,9 @@ namespace { BSON("_id" << 3 << "host" << "h2") << BSON("_id" << 4 << "host" << "h3") << BSON("_id" << 5 << "host" << "h4")))); - - StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent(); - ASSERT_OK(evh.getStatus()); - Date_t now(0); std::vector<HostAndPort> hosts; - for (ReplicaSetConfig::MemberIterator mem = config.membersBegin(); + for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin(); mem != config.membersEnd(); ++mem) { hosts.push_back(mem->getHostAndPort()); @@ -1047,23 +885,7 @@ namespace { "opTime" << Date_t(OpTime(0,0).asDate())))); FreshnessChecker checker; - StatusWith<ReplicationExecutor::CallbackHandle> cbh = - _executor->scheduleWork( - stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner, - this, - stdx::placeholders::_1, - &checker, - evh.getValue(), - OpTime(10,0), - config, - 0, - hosts)); - ASSERT_OK(cbh.getStatus()); - _executor->wait(cbh.getValue()); - - ASSERT_OK(_lastStatus); - - _executor->waitForEvent(evh.getValue()); + runTest(&checker, OpTime(0, 0), config, 0, hosts); stopCapturingLogMessages(); bool weAreFreshest(false); diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index c50a426aa73..a46180fd33a 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -478,16 +478,14 @@ namespace repl { * decides whether to continue election proceedings. * finishEvh is an event that is signaled when election is complete. **/ - void _onFreshnessCheckComplete(const ReplicationExecutor::CallbackData& cbData, - const ReplicationExecutor::EventHandle& finishEvh); + void _onFreshnessCheckComplete(const ReplicationExecutor::EventHandle& finishEvh); /** * Callback called when the ElectCmdRunner has completed; checks the results and * decides whether to complete the election and change state to primary. * finishEvh is an event that is signaled when election is complete. **/ - void _onElectCmdRunnerComplete(const ReplicationExecutor::CallbackData& cbData, - const ReplicationExecutor::EventHandle& finishEvh); + void _onElectCmdRunnerComplete(const ReplicationExecutor::EventHandle& finishEvh); /** * Chooses a new sync source. Must be scheduled as a callback. diff --git a/src/mongo/db/repl/repl_coordinator_impl_elect.cpp b/src/mongo/db/repl/repl_coordinator_impl_elect.cpp index 9317dc8b289..e24e5ae6003 100644 --- a/src/mongo/db/repl/repl_coordinator_impl_elect.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl_elect.cpp @@ -79,67 +79,45 @@ namespace repl { if (_freshnessChecker) { // If an attempt to elect self is currently in progress, don't interrupt it. 
return; - // Note that the old code, in addition to prohibiting multiple in-flight election + // Note that the old code, in addition to prohibiting multiple in-flight election // attempts, used to omit processing *any* incoming knowledge about // primaries in the cluster while an election was occurring. This seemed like // overkill, so it has been removed. } - // Make an event for our internal use to help synchronize the next phase of election. - StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = cbData.executor->makeEvent(); - if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(18681, nextPhaseEvh.getStatus()); - _freshnessChecker.reset(new FreshnessChecker); - StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback = - cbData.executor->onEvent( - nextPhaseEvh.getValue(), + StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start( + cbData.executor, + lastOpTimeApplied, + _rsConfig, + _thisMembersConfigIndex, + _topCoord->getMaybeUpHostAndPorts(), stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, - this, - stdx::placeholders::_1, + this, finishEvh)); - if (finishCheckCallback.getStatus() == ErrorCodes::ShutdownInProgress) { + if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } - fassert(18670, finishCheckCallback.getStatus()); - - Status status = _freshnessChecker->start(cbData.executor, - nextPhaseEvh.getValue(), - lastOpTimeApplied, - _rsConfig, - _thisMembersConfigIndex, - _topCoord->getMaybeUpHostAndPorts()); - if (status == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(18688, status); - + fassert(18681, nextPhaseEvh.getStatus()); finishEvhGuard.Dismiss(); } void ReplicationCoordinatorImpl::_onFreshnessCheckComplete( - const ReplicationExecutor::CallbackData& cbData, const ReplicationExecutor::EventHandle& finishEvh) { // Signal finish event upon early exit. 
- ScopeGuard finishEvhGuard(MakeGuard(&ReplicationExecutor::signalEvent, - cbData.executor, + ScopeGuard finishEvhGuard(MakeGuard(&ReplicationExecutor::signalEvent, + &_replExecutor, finishEvh)); // Make sure to reset our state on all error exit paths - ScopeGuard freshnessCheckerDeleter = - MakeObjGuard(_freshnessChecker, - &boost::scoped_ptr<FreshnessChecker>::reset, + ScopeGuard freshnessCheckerDeleter = + MakeObjGuard(_freshnessChecker, + &boost::scoped_ptr<FreshnessChecker>::reset, static_cast<FreshnessChecker*>(NULL)); - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - - Date_t now(cbData.executor->now()); + Date_t now(_replExecutor.now()); bool weAreFreshest; bool tied; _freshnessChecker->getResults(&weAreFreshest, &tied); @@ -171,48 +149,33 @@ namespace repl { _electCmdRunner.reset(new ElectCmdRunner); StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start( - cbData.executor, + &_replExecutor, _rsConfig, _thisMembersConfigIndex, - _topCoord->getMaybeUpHostAndPorts()); - if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { - return; - } - fassert(18685, nextPhaseEvh.getStatus()); - - StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback = - cbData.executor->onEvent( - nextPhaseEvh.getValue(), + _topCoord->getMaybeUpHostAndPorts(), stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this, - stdx::placeholders::_1, finishEvh)); - if (finishCheckCallback.getStatus() == ErrorCodes::ShutdownInProgress) { + if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } - fassert(18671, finishCheckCallback.getStatus()); - + fassert(18685, nextPhaseEvh.getStatus()); freshnessCheckerDeleter.Dismiss(); finishEvhGuard.Dismiss(); } void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete( - const ReplicationExecutor::CallbackData& cbData, const ReplicationExecutor::EventHandle& finishEvh) { // Signal finish event and cleanup, upon function exit in all cases. - ON_BLOCK_EXIT(&ReplicationExecutor::signalEvent, cbData.executor, finishEvh); - ON_BLOCK_EXIT_OBJ(_freshnessChecker, - &boost::scoped_ptr<FreshnessChecker>::reset, + ON_BLOCK_EXIT(&ReplicationExecutor::signalEvent, &_replExecutor, finishEvh); + ON_BLOCK_EXIT_OBJ(_freshnessChecker, + &boost::scoped_ptr<FreshnessChecker>::reset, static_cast<FreshnessChecker*>(NULL)); - ON_BLOCK_EXIT_OBJ(_electCmdRunner, - &boost::scoped_ptr<ElectCmdRunner>::reset, + ON_BLOCK_EXIT_OBJ(_electCmdRunner, + &boost::scoped_ptr<ElectCmdRunner>::reset, static_cast<ElectCmdRunner*>(NULL)); - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - int receivedVotes = _electCmdRunner->getReceivedVotes(); if (receivedVotes < _rsConfig.getMajorityVoteCount()) { @@ -225,7 +188,7 @@ namespace repl { log() << "replSet config version changed during our election, ignoring result"; return; } - + log() << "replSet election succeeded, assuming primary role"; // |