summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2014-09-13 22:43:51 -0400
committerAndy Schwerin <schwerin@mongodb.com>2014-09-15 19:18:21 -0400
commitd427859985fffb76278a104d46a1a913528dbc4a (patch)
tree7a4f3f24bfb1e3a59e3fff458c0be31687704bfe
parentf8a3f964eda2be32154dd8afbfad9bdf6283e82e (diff)
downloadmongo-d427859985fffb76278a104d46a1a913528dbc4a.tar.gz
SERVER-15248 Convert FreshnessChecker to use a ScatterGatherAlgorithm.
Also, use completion functions instead of scheduling work on events to handle election phase transitions in repl_coordinator_impl_elect.cpp. Leads to less scheduling, and easier-to-follow code.
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.cpp5
-rw-r--r--src/mongo/db/repl/elect_cmd_runner.h3
-rw-r--r--src/mongo/db/repl/freshness_checker.cpp156
-rw-r--r--src/mongo/db/repl/freshness_checker.h89
-rw-r--r--src/mongo/db/repl/freshness_checker_test.cpp356
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.h6
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl_elect.cpp89
7 files changed, 239 insertions, 465 deletions
diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp
index 5afbeada778..6772e50e004 100644
--- a/src/mongo/db/repl/elect_cmd_runner.cpp
+++ b/src/mongo/db/repl/elect_cmd_runner.cpp
@@ -134,12 +134,13 @@ namespace repl {
ReplicationExecutor* executor,
const ReplicaSetConfig& currentConfig,
int selfIndex,
- const std::vector<HostAndPort>& targets) {
+ const std::vector<HostAndPort>& targets,
+ const stdx::function<void ()>& onCompletion) {
const long long round(executor->nextRandomInt64(std::numeric_limits<int64_t>::max()));
_algorithm.reset(new Algorithm(currentConfig, selfIndex, targets, round));
_runner.reset(new ScatterGatherRunner(_algorithm.get()));
- return _runner->start(executor);
+ return _runner->start(executor, onCompletion);
}
int ElectCmdRunner::getReceivedVotes() const {
diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h
index 67518f70fee..bf1d062f58c 100644
--- a/src/mongo/db/repl/elect_cmd_runner.h
+++ b/src/mongo/db/repl/elect_cmd_runner.h
@@ -92,7 +92,8 @@ namespace repl {
ReplicationExecutor* executor,
const ReplicaSetConfig& currentConfig,
int selfIndex,
- const std::vector<HostAndPort>& targets);
+ const std::vector<HostAndPort>& targets,
+ const stdx::function<void ()>& onCompletion = stdx::function<void ()>());
/**
* Returns the number of received votes. Only valid to call after
diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp
index 01c85ec592b..36eed2f5535 100644
--- a/src/mongo/db/repl/freshness_checker.cpp
+++ b/src/mongo/db/repl/freshness_checker.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/repl/member_heartbeat_data.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/scatter_gather_runner.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
@@ -44,98 +45,74 @@
namespace mongo {
namespace repl {
- FreshnessChecker::FreshnessChecker() : _actualResponses(0),
- _freshest(true),
- _tied(false),
- _originalConfigVersion(0) {
+ FreshnessChecker::Algorithm::Algorithm(
+ OpTime lastOpTimeApplied,
+ const ReplicaSetConfig& rsConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets) :
+ _actualResponses(0),
+ _freshest(true),
+ _tied(false),
+ _lastOpTimeApplied(lastOpTimeApplied),
+ _rsConfig(rsConfig),
+ _selfIndex(selfIndex),
+ _targets(targets) {
}
+ FreshnessChecker::Algorithm::~Algorithm() {}
- Status FreshnessChecker::start(
- ReplicationExecutor* executor,
- const ReplicationExecutor::EventHandle& evh,
- const OpTime& lastOpTimeApplied,
- const ReplicaSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& hosts) {
+ std::vector<ReplicationExecutor::RemoteCommandRequest>
+ FreshnessChecker::Algorithm::getRequests() const {
- _lastOpTimeApplied = lastOpTimeApplied;
- _freshest = true;
- _originalConfigVersion = currentConfig.getConfigVersion();
- _sufficientResponsesReceived = evh;
+ const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex);
// gather all not-down nodes, get their fullnames(or hostandport's)
// schedule fresh command for each node
- BSONObj replSetFreshCmd = BSON("replSetFresh" << 1 <<
- "set" << currentConfig.getReplSetName() <<
- "opTime" << Date_t(lastOpTimeApplied.asDate()) <<
- "who" << currentConfig.getMemberAt(selfIndex)
- .getHostAndPort().toString() <<
- "cfgver" << currentConfig.getConfigVersion() <<
- "id" << currentConfig.getMemberAt(selfIndex).getId());
- for (std::vector<HostAndPort>::const_iterator it = hosts.begin(); it != hosts.end(); ++it) {
- const StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- executor->scheduleRemoteCommand(
- ReplicationExecutor::RemoteCommandRequest(
+ const BSONObj replSetFreshCmd =
+ BSON("replSetFresh" << 1 <<
+ "set" << _rsConfig.getReplSetName() <<
+ "opTime" << Date_t(_lastOpTimeApplied.asDate()) <<
+ "who" << selfConfig.getHostAndPort().toString() <<
+ "cfgver" << _rsConfig.getConfigVersion() <<
+ "id" << selfConfig.getId());
+
+ std::vector<ReplicationExecutor::RemoteCommandRequest> requests;
+ for (std::vector<HostAndPort>::const_iterator it = _targets.begin();
+ it != _targets.end();
+ ++it) {
+ invariant(*it != selfConfig.getHostAndPort());
+ requests.push_back(ReplicationExecutor::RemoteCommandRequest(
*it,
"admin",
replSetFreshCmd,
- Milliseconds(30*1000)), // trying to match current Socket timeout
- stdx::bind(&FreshnessChecker::_onReplSetFreshResponse,
- this,
- stdx::placeholders::_1));
- if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return cbh.getStatus();
- }
- fassert(18682, cbh.getStatus());
-
- _responseCallbacks.push_back(cbh.getValue());
- }
-
- if (_responseCallbacks.size() == 0) {
- _signalSufficientResponsesReceived(executor);
+ Milliseconds(30*1000))); // trying to match current Socket timeout
}
- return Status::OK();
+ return requests;
}
- void FreshnessChecker::_onReplSetFreshResponse(
- const ReplicationExecutor::RemoteCommandCallbackData& cbData) {
+ void FreshnessChecker::Algorithm::processResponse(
+ const ReplicationExecutor::RemoteCommandRequest& request,
+ const ResponseStatus& response) {
++_actualResponses;
- if (cbData.response.getStatus() == ErrorCodes::CallbackCanceled) {
- return;
- }
-
- if (!cbData.response.isOK()) {
+ if (!response.isOK()) {
// command failed, so nothing further to do.
- if (_actualResponses == _responseCallbacks.size()) {
- _signalSufficientResponsesReceived(cbData.executor);
- }
return;
}
- ScopeGuard sufficientResponsesReceivedCaller =
- MakeObjGuard(*this,
- &FreshnessChecker::_signalSufficientResponsesReceived,
- cbData.executor);
-
- BSONObj res = cbData.response.getValue().data;
+ const BSONObj res = response.getValue().data;
if (res["fresher"].trueValue()) {
log() << "not electing self, we are not freshest";
_freshest = false;
return;
}
-
+
if (res["opTime"].type() != mongo::Date) {
- error() << "wrong type for opTime argument in replSetFresh response: " <<
+ error() << "wrong type for opTime argument in replSetFresh response: " <<
typeName(res["opTime"].type());
_freshest = false;
- if (_actualResponses != _responseCallbacks.size()) {
- // More responses are still pending.
- sufficientResponsesReceivedCaller.Dismiss();
- }
return;
}
OpTime remoteTime(res["opTime"].date());
@@ -147,54 +124,57 @@ namespace repl {
_freshest = false;
return;
}
-
+
if (res["veto"].trueValue()) {
BSONElement msg = res["errmsg"];
if (!msg.eoo()) {
- log() << "not electing self, " << cbData.request.target.toString() <<
+ log() << "not electing self, " << request.target.toString() <<
" would veto with '" << msg << "'";
}
else {
- log() << "not electing self, " << cbData.request.target.toString() <<
+ log() << "not electing self, " << request.target.toString() <<
" would veto";
}
_freshest = false;
return;
}
-
- if (_actualResponses != _responseCallbacks.size()) {
- // More responses are still pending.
- sufficientResponsesReceivedCaller.Dismiss();
- }
}
- void FreshnessChecker::_signalSufficientResponsesReceived(ReplicationExecutor* executor) {
- if (_sufficientResponsesReceived.isValid()) {
-
- // Cancel all the command callbacks,
- // so that they do not attempt to access FreshnessChecker
- // state after this callback completes.
- std::for_each(_responseCallbacks.begin(),
- _responseCallbacks.end(),
- stdx::bind(&ReplicationExecutor::cancel,
- executor,
- stdx::placeholders::_1));
-
- executor->signalEvent(_sufficientResponsesReceived);
- _sufficientResponsesReceived = ReplicationExecutor::EventHandle();
-
+ bool FreshnessChecker::Algorithm::hasReceivedSufficientResponses() const {
+ if (!_freshest) {
+ return true;
+ }
+ if (_actualResponses == _targets.size()) {
+ return true;
}
+ return false;
}
void FreshnessChecker::getResults(bool* freshest, bool* tied) const {
- *freshest = _freshest;
- *tied = _tied;
+ *freshest = _algorithm->isFreshest();
+ *tied = _algorithm->isTiedForFreshest();
}
long long FreshnessChecker::getOriginalConfigVersion() const {
return _originalConfigVersion;
}
+ FreshnessChecker::FreshnessChecker() {}
+ FreshnessChecker::~FreshnessChecker() {}
+
+ StatusWith<ReplicationExecutor::EventHandle> FreshnessChecker::start(
+ ReplicationExecutor* executor,
+ const OpTime& lastOpTimeApplied,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets,
+ const stdx::function<void ()>& onCompletion) {
+
+ _originalConfigVersion = currentConfig.getConfigVersion();
+ _algorithm.reset(new Algorithm(lastOpTimeApplied, currentConfig, selfIndex, targets));
+ _runner.reset(new ScatterGatherRunner(_algorithm.get()));
+ return _runner->start(executor, onCompletion);
+ }
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h
index a0863f81fdf..e5f870c9b3d 100644
--- a/src/mongo/db/repl/freshness_checker.h
+++ b/src/mongo/db/repl/freshness_checker.h
@@ -31,8 +31,10 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
-#include "mongo/db/repl/replication_executor.h"
#include "mongo/bson/optime.h"
+#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/replica_set_config.h"
+#include "mongo/db/repl/scatter_gather_algorithm.h"
namespace mongo {
@@ -41,12 +43,49 @@ namespace mongo {
namespace repl {
class ReplicaSetConfig;
- class MemberHeartbeatData;
+ class ScatterGatherRunner;
class FreshnessChecker {
MONGO_DISALLOW_COPYING(FreshnessChecker);
public:
+ class Algorithm : public ScatterGatherAlgorithm {
+ public:
+ Algorithm(OpTime lastOpTimeApplied,
+ const ReplicaSetConfig& rsConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& targets);
+ virtual ~Algorithm();
+ virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const;
+ virtual void processResponse(
+ const ReplicationExecutor::RemoteCommandRequest& request,
+ const ResponseStatus& response);
+ virtual bool hasReceivedSufficientResponses() const;
+
+ bool isFreshest() const { return _freshest; }
+ bool isTiedForFreshest() const { return _tied; }
+
+ private:
+ // Number of responses received so far.
+ size_t _actualResponses;
+
+ // Does this node have the latest applied optime of all queriable nodes in the set?
+ bool _freshest;
+
+ // Does this node have the same optime as another queriable node in the set?
+ bool _tied;
+
+ // Last OpTime applied by the caller; used in the Fresh command
+ const OpTime _lastOpTimeApplied;
+
+ const ReplicaSetConfig _rsConfig;
+
+ const int _selfIndex;
+
+ const std::vector<HostAndPort> _targets;
+ };
+
FreshnessChecker();
+ virtual ~FreshnessChecker();
/**
* Begins the process of sending replSetFresh commands to all non-DOWN nodes
@@ -57,13 +96,13 @@ namespace repl {
* callbacks that it schedules.
* If this function returns Status::OK(), evh is then guaranteed to be signaled.
**/
- Status start(
+ StatusWith<ReplicationExecutor::EventHandle> start(
ReplicationExecutor* executor,
- const ReplicationExecutor::EventHandle& evh,
- const OpTime& lastOpTimeApplied,
+ const OpTime& lastOpTimeApplied,
const ReplicaSetConfig& currentConfig,
int selfIndex,
- const std::vector<HostAndPort>& hosts);
+ const std::vector<HostAndPort>& targets,
+ const stdx::function<void ()>& onCompletion = stdx::function<void ()>());
/**
* Returns whether this node is the freshest of all non-DOWN nodes in the set,
@@ -80,40 +119,10 @@ namespace repl {
long long getOriginalConfigVersion() const;
private:
- /**
- * Callback that runs after a replSetFresh command returns.
- * Adjusts _tied and _freshest flags appropriately, and
- * signals completion if we have received the last expected response.
- */
- void _onReplSetFreshResponse(const ReplicationExecutor::RemoteCommandCallbackData& cbData);
-
- /**
- * Signals _sufficientResponsesReceived event, if it hasn't been already.
- */
- void _signalSufficientResponsesReceived(ReplicationExecutor* executor);
-
- // Event used to signal completion of the FreshnessChecker's commands.
- ReplicationExecutor::EventHandle _sufficientResponsesReceived;
-
- // Vector of command callbacks scheduled by start() and
- // canceled by _onFreshnessCheckComplete().
- std::vector<ReplicationExecutor::CallbackHandle> _responseCallbacks;
-
- // Last OpTime applied by the caller; used in the Fresh command
- OpTime _lastOpTimeApplied;
-
- // Number of responses received so far.
- size_t _actualResponses;
-
- // Does this node have the latest applied optime of all queriable nodes in the set?
- bool _freshest;
-
- // Does this node have the same optime as another queriable node in the set?
- bool _tied;
-
- // The version of the config passed to start().
+ boost::scoped_ptr<Algorithm> _algorithm;
+ boost::scoped_ptr<ScatterGatherRunner> _runner;
long long _originalConfigVersion;
};
-}
-}
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp
index a16bcc6f511..2932a080a47 100644
--- a/src/mongo/db/repl/freshness_checker_test.cpp
+++ b/src/mongo/db/repl/freshness_checker_test.cpp
@@ -53,10 +53,23 @@ namespace {
class FreshnessCheckerTest : public mongo::unittest::Test {
public:
- FreshnessCheckerTest();
+ StatusWith<ReplicationExecutor::EventHandle> startTest(
+ FreshnessChecker* checker,
+ const OpTime& lastOpTimeApplied,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& hosts);
+
+ void runTest(
+ FreshnessChecker* checker,
+ const OpTime& lastOpTimeApplied,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& hosts);
+
void freshnessCheckerRunner(const ReplicationExecutor::CallbackData& data,
FreshnessChecker* checker,
- const ReplicationExecutor::EventHandle& evh,
+ StatusWith<ReplicationExecutor::EventHandle>* evh,
const OpTime& lastOpTimeApplied,
const ReplicaSetConfig& currentConfig,
int selfIndex,
@@ -73,15 +86,12 @@ namespace {
NetworkInterfaceMockWithMap* _net;
boost::scoped_ptr<ReplicationExecutor> _executor;
boost::scoped_ptr<boost::thread> _executorThread;
- Status _lastStatus;
private:
void setUp();
void tearDown();
};
- FreshnessCheckerTest::FreshnessCheckerTest() : _lastStatus(Status::OK()) {}
-
void FreshnessCheckerTest::setUp() {
_net = new NetworkInterfaceMockWithMap;
_executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */));
@@ -117,63 +127,77 @@ namespace {
// This is necessary because the run method must be scheduled in the Replication Executor
// for correct concurrency operation.
void FreshnessCheckerTest::freshnessCheckerRunner(
- const ReplicationExecutor::CallbackData& data,
- FreshnessChecker* checker,
- const ReplicationExecutor::EventHandle& evh,
- const OpTime& lastOpTimeApplied,
- const ReplicaSetConfig& currentConfig,
- int selfIndex,
- const std::vector<HostAndPort>& hosts) {
+ const ReplicationExecutor::CallbackData& data,
+ FreshnessChecker* checker,
+ StatusWith<ReplicationExecutor::EventHandle>* evh,
+ const OpTime& lastOpTimeApplied,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& hosts) {
+
invariant(data.status.isOK());
- _lastStatus = checker->start(data.executor,
- evh,
- lastOpTimeApplied,
- currentConfig,
- selfIndex,
- hosts);
+ *evh = checker->start(data.executor,
+ lastOpTimeApplied,
+ currentConfig,
+ selfIndex,
+ hosts);
+ }
+
+ StatusWith<ReplicationExecutor::EventHandle> FreshnessCheckerTest::startTest(
+ FreshnessChecker* checker,
+ const OpTime& lastOpTimeApplied,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& hosts) {
+
+ StatusWith<ReplicationExecutor::EventHandle> evh(ErrorCodes::InternalError, "Not set");
+ StatusWith<ReplicationExecutor::CallbackHandle> cbh =
+ _executor->scheduleWork(
+ stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
+ this,
+ stdx::placeholders::_1,
+ checker,
+ &evh,
+ lastOpTimeApplied,
+ currentConfig,
+ selfIndex,
+ hosts));
+ ASSERT_OK(cbh.getStatus());
+ _executor->wait(cbh.getValue());
+ ASSERT_OK(evh.getStatus());
+ return evh;
+ }
+
+ void FreshnessCheckerTest::runTest(
+ FreshnessChecker* checker,
+ const OpTime& lastOpTimeApplied,
+ const ReplicaSetConfig& currentConfig,
+ int selfIndex,
+ const std::vector<HostAndPort>& hosts) {
+
+ _executor->waitForEvent(startTest(checker,
+ lastOpTimeApplied,
+ currentConfig,
+ selfIndex,
+ hosts).getValue());
}
TEST_F(FreshnessCheckerTest, OneNode) {
- // Only one node in the config. We are freshest and not tied.
+ // Only one node in the config. We must be freshest and not tied.
ReplicaSetConfig config = assertMakeRSConfig(
BSON("_id" << "rs0" <<
"version" << 1 <<
"members" << BSON_ARRAY(
BSON("_id" << 1 << "host" << "h1"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
-
- Date_t now(0);
- std::vector<HostAndPort> hosts;
- hosts.push_back(config.getMemberAt(0).getHostAndPort());
FreshnessChecker checker;
-
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(0,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(0, 0), config, 0, std::vector<HostAndPort>());
bool weAreFreshest(false);
bool tied(false);
checker.getResults(&weAreFreshest, &tied);
ASSERT_TRUE(weAreFreshest);
ASSERT_FALSE(tied);
-
}
TEST_F(FreshnessCheckerTest, TwoNodes) {
@@ -184,15 +208,9 @@ namespace {
"members" << BSON_ARRAY(
BSON("_id" << 1 << "host" << "h0") <<
BSON("_id" << 2 << "host" << "h1"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- hosts.push_back(config.getMemberAt(0).getHostAndPort());
hosts.push_back(config.getMemberAt(1).getHostAndPort());
-
const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0);
_net->addResponse(RemoteCommandRequest(HostAndPort("h1"),
@@ -206,30 +224,13 @@ namespace {
"opTime" << Date_t(OpTime(0,0).asDate()))));
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(0,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(0, 0), config, 0, hosts);
bool weAreFreshest(false);
bool tied(false);
checker.getResults(&weAreFreshest, &tied);
ASSERT_TRUE(weAreFreshest);
ASSERT_TRUE(tied);
-
}
@@ -241,13 +242,8 @@ namespace {
"members" << BSON_ARRAY(
BSON("_id" << 1 << "host" << "h0") <<
BSON("_id" << 2 << "host" << "h1"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- hosts.push_back(config.getMemberAt(0).getHostAndPort());
hosts.push_back(config.getMemberAt(1).getHostAndPort());
const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0);
@@ -263,22 +259,12 @@ namespace {
true /* isBlocked */);
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(0,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
-
- _executor->wait(cbh.getValue());
- ASSERT_OK(_lastStatus);
-
+ StatusWith<ReplicationExecutor::EventHandle> evh = startTest(
+ &checker,
+ OpTime(0, 0),
+ config,
+ 0,
+ hosts);
_executor->shutdown();
_net->unblockAll();
_executor->waitForEvent(evh.getValue());
@@ -289,7 +275,7 @@ namespace {
// This seems less than ideal, but if we are shutting down, the next phase of election
// cannot proceed anyway.
ASSERT_TRUE(weAreFreshest);
- ASSERT_FALSE(tied);
+ ASSERT_FALSE(tied);
}
TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshest) {
@@ -301,13 +287,8 @@ namespace {
"members" << BSON_ARRAY(
BSON("_id" << 1 << "host" << "h0") <<
BSON("_id" << 2 << "host" << "h1"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- hosts.push_back(config.getMemberAt(0).getHostAndPort());
hosts.push_back(config.getMemberAt(1).getHostAndPort());
const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);
@@ -324,23 +305,7 @@ namespace {
"opTime" << Date_t(OpTime(0,0).asDate()))));
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(10,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(10, 0), config, 0, hosts);
stopCapturingLogMessages();
bool weAreFreshest(true);
@@ -360,13 +325,8 @@ namespace {
"members" << BSON_ARRAY(
BSON("_id" << 1 << "host" << "h0") <<
BSON("_id" << 2 << "host" << "h1"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- hosts.push_back(config.getMemberAt(0).getHostAndPort());
hosts.push_back(config.getMemberAt(1).getHostAndPort());
const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0);
@@ -383,23 +343,7 @@ namespace {
"opTime" << Date_t(OpTime(10,0).asDate()))));
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(0,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(0, 0), config, 0, hosts);
stopCapturingLogMessages();
bool weAreFreshest(true);
@@ -419,13 +363,8 @@ namespace {
"members" << BSON_ARRAY(
BSON("_id" << 1 << "host" << "h0") <<
BSON("_id" << 2 << "host" << "h1"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- hosts.push_back(config.getMemberAt(0).getHostAndPort());
hosts.push_back(config.getMemberAt(1).getHostAndPort());
const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);
@@ -441,23 +380,7 @@ namespace {
"opTime" << 3)));
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(10,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(10, 0), config, 0, hosts);
stopCapturingLogMessages();
bool weAreFreshest(true);
@@ -478,13 +401,8 @@ namespace {
"members" << BSON_ARRAY(
BSON("_id" << 1 << "host" << "h0") <<
BSON("_id" << 2 << "host" << "h1"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- hosts.push_back(config.getMemberAt(0).getHostAndPort());
hosts.push_back(config.getMemberAt(1).getHostAndPort());
const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);
@@ -502,23 +420,7 @@ namespace {
"opTime" << Date_t(OpTime(0,0).asDate()))));
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(10,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(10, 0), config, 0, hosts);
stopCapturingLogMessages();
bool weAreFreshest(true);
@@ -542,13 +444,9 @@ namespace {
BSON("_id" << 3 << "host" << "h2") <<
BSON("_id" << 4 << "host" << "h3") <<
BSON("_id" << 5 << "host" << "h4"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- for (ReplicaSetConfig::MemberIterator mem = config.membersBegin();
+ for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin();
mem != config.membersEnd();
++mem) {
hosts.push_back(mem->getHostAndPort());
@@ -595,23 +493,7 @@ namespace {
"opTime" << Date_t(OpTime(0,0).asDate()))));
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(10,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(10, 0), config, 0, hosts);
stopCapturingLogMessages();
bool weAreFreshest(true);
@@ -730,13 +612,9 @@ namespace {
BSON("_id" << 3 << "host" << "h2") <<
BSON("_id" << 4 << "host" << "h3") <<
BSON("_id" << 5 << "host" << "h4"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- for (ReplicaSetConfig::MemberIterator mem = config.membersBegin();
+ for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin();
mem != config.membersEnd();
++mem) {
hosts.push_back(mem->getHostAndPort());
@@ -782,23 +660,7 @@ namespace {
"opTime" << Date_t(OpTime(0,0).asDate()))));
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(10,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(10, 0), config, 0, hosts);
stopCapturingLogMessages();
bool weAreFreshest(true);
@@ -822,13 +684,9 @@ namespace {
BSON("_id" << 3 << "host" << "h2") <<
BSON("_id" << 4 << "host" << "h3") <<
BSON("_id" << 5 << "host" << "h4"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- for (ReplicaSetConfig::MemberIterator mem = config.membersBegin();
+ for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin();
mem != config.membersEnd();
++mem) {
hosts.push_back(mem->getHostAndPort());
@@ -876,23 +734,7 @@ namespace {
"opTime" << Date_t(OpTime(0,0).asDate()))));
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(10,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(10, 0), config, 0, hosts);
stopCapturingLogMessages();
bool weAreFreshest(true);
@@ -1013,13 +855,9 @@ namespace {
BSON("_id" << 3 << "host" << "h2") <<
BSON("_id" << 4 << "host" << "h3") <<
BSON("_id" << 5 << "host" << "h4"))));
-
- StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
- ASSERT_OK(evh.getStatus());
- Date_t now(0);
std::vector<HostAndPort> hosts;
- for (ReplicaSetConfig::MemberIterator mem = config.membersBegin();
+ for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin();
mem != config.membersEnd();
++mem) {
hosts.push_back(mem->getHostAndPort());
@@ -1047,23 +885,7 @@ namespace {
"opTime" << Date_t(OpTime(0,0).asDate()))));
FreshnessChecker checker;
- StatusWith<ReplicationExecutor::CallbackHandle> cbh =
- _executor->scheduleWork(
- stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
- this,
- stdx::placeholders::_1,
- &checker,
- evh.getValue(),
- OpTime(10,0),
- config,
- 0,
- hosts));
- ASSERT_OK(cbh.getStatus());
- _executor->wait(cbh.getValue());
-
- ASSERT_OK(_lastStatus);
-
- _executor->waitForEvent(evh.getValue());
+ runTest(&checker, OpTime(0, 0), config, 0, hosts);
stopCapturingLogMessages();
bool weAreFreshest(false);
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index c50a426aa73..a46180fd33a 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -478,16 +478,14 @@ namespace repl {
* decides whether to continue election proceedings.
* finishEvh is an event that is signaled when election is complete.
**/
- void _onFreshnessCheckComplete(const ReplicationExecutor::CallbackData& cbData,
- const ReplicationExecutor::EventHandle& finishEvh);
+ void _onFreshnessCheckComplete(const ReplicationExecutor::EventHandle& finishEvh);
/**
* Callback called when the ElectCmdRunner has completed; checks the results and
* decides whether to complete the election and change state to primary.
* finishEvh is an event that is signaled when election is complete.
**/
- void _onElectCmdRunnerComplete(const ReplicationExecutor::CallbackData& cbData,
- const ReplicationExecutor::EventHandle& finishEvh);
+ void _onElectCmdRunnerComplete(const ReplicationExecutor::EventHandle& finishEvh);
/**
* Chooses a new sync source. Must be scheduled as a callback.
diff --git a/src/mongo/db/repl/repl_coordinator_impl_elect.cpp b/src/mongo/db/repl/repl_coordinator_impl_elect.cpp
index 9317dc8b289..e24e5ae6003 100644
--- a/src/mongo/db/repl/repl_coordinator_impl_elect.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl_elect.cpp
@@ -79,67 +79,45 @@ namespace repl {
if (_freshnessChecker) {
// If an attempt to elect self is currently in progress, don't interrupt it.
return;
- // Note that the old code, in addition to prohibiting multiple in-flight election
+ // Note that the old code, in addition to prohibiting multiple in-flight election
// attempts, used to omit processing *any* incoming knowledge about
// primaries in the cluster while an election was occurring. This seemed like
// overkill, so it has been removed.
}
- // Make an event for our internal use to help synchronize the next phase of election.
- StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = cbData.executor->makeEvent();
- if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(18681, nextPhaseEvh.getStatus());
-
_freshnessChecker.reset(new FreshnessChecker);
- StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback =
- cbData.executor->onEvent(
- nextPhaseEvh.getValue(),
+ StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _freshnessChecker->start(
+ cbData.executor,
+ lastOpTimeApplied,
+ _rsConfig,
+ _thisMembersConfigIndex,
+ _topCoord->getMaybeUpHostAndPorts(),
stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete,
- this,
- stdx::placeholders::_1,
+ this,
finishEvh));
- if (finishCheckCallback.getStatus() == ErrorCodes::ShutdownInProgress) {
+ if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
- fassert(18670, finishCheckCallback.getStatus());
-
- Status status = _freshnessChecker->start(cbData.executor,
- nextPhaseEvh.getValue(),
- lastOpTimeApplied,
- _rsConfig,
- _thisMembersConfigIndex,
- _topCoord->getMaybeUpHostAndPorts());
- if (status == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(18688, status);
-
+ fassert(18681, nextPhaseEvh.getStatus());
finishEvhGuard.Dismiss();
}
void ReplicationCoordinatorImpl::_onFreshnessCheckComplete(
- const ReplicationExecutor::CallbackData& cbData,
const ReplicationExecutor::EventHandle& finishEvh) {
// Signal finish event upon early exit.
- ScopeGuard finishEvhGuard(MakeGuard(&ReplicationExecutor::signalEvent,
- cbData.executor,
+ ScopeGuard finishEvhGuard(MakeGuard(&ReplicationExecutor::signalEvent,
+ &_replExecutor,
finishEvh));
// Make sure to reset our state on all error exit paths
- ScopeGuard freshnessCheckerDeleter =
- MakeObjGuard(_freshnessChecker,
- &boost::scoped_ptr<FreshnessChecker>::reset,
+ ScopeGuard freshnessCheckerDeleter =
+ MakeObjGuard(_freshnessChecker,
+ &boost::scoped_ptr<FreshnessChecker>::reset,
static_cast<FreshnessChecker*>(NULL));
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
-
- Date_t now(cbData.executor->now());
+ Date_t now(_replExecutor.now());
bool weAreFreshest;
bool tied;
_freshnessChecker->getResults(&weAreFreshest, &tied);
@@ -171,48 +149,33 @@ namespace repl {
_electCmdRunner.reset(new ElectCmdRunner);
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _electCmdRunner->start(
- cbData.executor,
+ &_replExecutor,
_rsConfig,
_thisMembersConfigIndex,
- _topCoord->getMaybeUpHostAndPorts());
- if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
- return;
- }
- fassert(18685, nextPhaseEvh.getStatus());
-
- StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback =
- cbData.executor->onEvent(
- nextPhaseEvh.getValue(),
+ _topCoord->getMaybeUpHostAndPorts(),
stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete,
this,
- stdx::placeholders::_1,
finishEvh));
- if (finishCheckCallback.getStatus() == ErrorCodes::ShutdownInProgress) {
+ if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
- fassert(18671, finishCheckCallback.getStatus());
-
+ fassert(18685, nextPhaseEvh.getStatus());
freshnessCheckerDeleter.Dismiss();
finishEvhGuard.Dismiss();
}
void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete(
- const ReplicationExecutor::CallbackData& cbData,
const ReplicationExecutor::EventHandle& finishEvh) {
// Signal finish event and cleanup, upon function exit in all cases.
- ON_BLOCK_EXIT(&ReplicationExecutor::signalEvent, cbData.executor, finishEvh);
- ON_BLOCK_EXIT_OBJ(_freshnessChecker,
- &boost::scoped_ptr<FreshnessChecker>::reset,
+ ON_BLOCK_EXIT(&ReplicationExecutor::signalEvent, &_replExecutor, finishEvh);
+ ON_BLOCK_EXIT_OBJ(_freshnessChecker,
+ &boost::scoped_ptr<FreshnessChecker>::reset,
static_cast<FreshnessChecker*>(NULL));
- ON_BLOCK_EXIT_OBJ(_electCmdRunner,
- &boost::scoped_ptr<ElectCmdRunner>::reset,
+ ON_BLOCK_EXIT_OBJ(_electCmdRunner,
+ &boost::scoped_ptr<ElectCmdRunner>::reset,
static_cast<ElectCmdRunner*>(NULL));
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
-
int receivedVotes = _electCmdRunner->getReceivedVotes();
if (receivedVotes < _rsConfig.getMajorityVoteCount()) {
@@ -225,7 +188,7 @@ namespace repl {
log() << "replSet config version changed during our election, ignoring result";
return;
}
-
+
log() << "replSet election succeeded, assuming primary role";
//