diff options
author | Eric Milkie <milkie@10gen.com> | 2014-08-11 13:44:49 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2014-08-11 15:17:52 -0400 |
commit | 4064ced405a51bedcb9da9d90f0320023e23728c (patch) | |
tree | 2f928efd599a6498db0b17ea7fbcc1d0e52a0fa3 | |
parent | 08873e79890885f83d0a9ae1de7ea9f4769cd83d (diff) | |
download | mongo-4064ced405a51bedcb9da9d90f0320023e23728c.tar.gz |
SERVER-14788 Wire initial configuration loading into startReplication.
via Andy Schwerin.
19 files changed, 398 insertions, 149 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index b5831a85733..dd2b2b2ae82 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -276,7 +276,10 @@ namespace mongo { server->setupSockets(); logStartup(); - repl::getGlobalReplicationCoordinator()->startReplication(); + { + OperationContextImpl txn; + repl::getGlobalReplicationCoordinator()->startReplication(&txn); + } if (serverGlobalParams.isHttpInterfaceEnabled) boost::thread web(stdx::bind(&webServerThread, new RestAdminAccess())); // takes ownership diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h index ac79c69e07b..93243194aad 100644 --- a/src/mongo/db/repl/repl_coordinator.h +++ b/src/mongo/db/repl/repl_coordinator.h @@ -93,7 +93,7 @@ namespace repl { * components of the replication system to start up whatever threads and do whatever * initialization they need. */ - virtual void startReplication() = 0; + virtual void startReplication(OperationContext* txn) = 0; /** * Does whatever cleanup is required to stop replication, including instructing the other diff --git a/src/mongo/db/repl/repl_coordinator_external_state.h b/src/mongo/db/repl/repl_coordinator_external_state.h index f994e91281e..9e27723e122 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state.h +++ b/src/mongo/db/repl/repl_coordinator_external_state.h @@ -32,9 +32,11 @@ namespace mongo { - struct HostAndPort; + class BSONObj; class OID; class OperationContext; + struct HostAndPort; + template <typename T> class StatusWith; namespace repl { @@ -82,7 +84,7 @@ namespace repl { * exist or our hostname doesn't match what was recorded in local.me, generates a new OID * to use as our RID, stores it in local.me, and returns it. */ - virtual OID ensureMe() = 0; + virtual OID ensureMe(OperationContext*) = 0; /** * Returns true if "host" is one of the network identities of this node. 
@@ -90,6 +92,11 @@ namespace repl { virtual bool isSelf(const HostAndPort& host) = 0; /** + * Gets the replica set config document from local storage, or returns an error. + */ + virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn) = 0; + + /** * Returns the HostAndPort of the remote client connected to us that initiated the operation * represented by "txn". */ diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp index 717e9bd305b..83f13a6ddf6 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp @@ -25,8 +25,6 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ - -#include "mongo/platform/basic.h" #include "mongo/platform/basic.h" @@ -34,11 +32,11 @@ #include <string> +#include "mongo/base/status_with.h" #include "mongo/bson/oid.h" #include "mongo/db/client.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" -#include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/isself.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/net/sock.h" @@ -65,29 +63,28 @@ namespace repl { _syncSourceFeedback.forwardSlaveProgress(); } - OID ReplicationCoordinatorExternalStateImpl::ensureMe() { + OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) { std::string myname = getHostName(); OID myRID; { - OperationContextImpl txn; - Client::WriteContext ctx(&txn, "local"); + Client::WriteContext ctx(txn, "local"); BSONObj me; // local.me is an identifier for a server for getLastError w:2+ - if (!Helpers::getSingleton(&txn, "local.me", me) || + if (!Helpers::getSingleton(txn, "local.me", me) || !me.hasField("host") || me["host"].String() != myname) { myRID = OID::gen(); // clean out local.me - Helpers::emptyCollection(&txn, "local.me"); + Helpers::emptyCollection(txn, "local.me"); // repopulate 
BSONObjBuilder b; b.append("_id", myRID); b.append("host", myname); - Helpers::putSingleton(&txn, "local.me", b.done()); + Helpers::putSingleton(txn, "local.me", b.done()); } else { myRID = me["_id"].OID(); } @@ -96,6 +93,23 @@ namespace repl { return myRID; } + StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument( + OperationContext* txn) { + try { + BSONObj config; + Client::ReadContext ctx(txn, "local"); + if (!Helpers::getSingleton(txn, "local.system.replset", config)) { + return StatusWith<BSONObj>( + ErrorCodes::NoMatchingDocument, + "Did not find replica set configuration document in local.system.replset"); + } + return StatusWith<BSONObj>(config); + } + catch (const DBException& ex) { + return StatusWith<BSONObj>(ex.toStatus()); + } + } + bool ReplicationCoordinatorExternalStateImpl::isSelf(const HostAndPort& host) { return repl::isSelf(host); diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.h b/src/mongo/db/repl/repl_coordinator_external_state_impl.h index 645ec5ac5ee..75d976ec3c9 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.h @@ -44,8 +44,9 @@ namespace repl { virtual void shutdown(); virtual void forwardSlaveHandshake(); virtual void forwardSlaveProgress(); - virtual OID ensureMe(); + virtual OID ensureMe(OperationContext* txn); virtual bool isSelf(const HostAndPort& host); + virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn); virtual HostAndPort getClientHostAndPort(const OperationContext* txn); private: diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp index 71a411afa19..0d7001405e3 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp @@ -30,8 +30,10 @@ #include "mongo/db/repl/repl_coordinator_external_state_mock.h" +#include 
"mongo/base/status_with.h" #include "mongo/bson/oid.h" #include "mongo/db/client.h" +#include "mongo/db/jsobj.h" #include "mongo/db/operation_context_impl.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/sequence_util.h" @@ -39,7 +41,11 @@ namespace mongo { namespace repl { - ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock() {} + ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock() + : _localRsConfigDocument(Status(ErrorCodes::NoMatchingDocument, + "No local config document")) { + } + ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {} void ReplicationCoordinatorExternalStateMock::runSyncSourceFeedback() {} @@ -47,7 +53,7 @@ namespace repl { void ReplicationCoordinatorExternalStateMock::forwardSlaveHandshake() {} void ReplicationCoordinatorExternalStateMock::forwardSlaveProgress() {} - OID ReplicationCoordinatorExternalStateMock::ensureMe() { + OID ReplicationCoordinatorExternalStateMock::ensureMe(OperationContext*) { return OID::gen(); } @@ -64,5 +70,16 @@ namespace repl { return HostAndPort(); } + StatusWith<BSONObj> ReplicationCoordinatorExternalStateMock::loadLocalConfigDocument( + OperationContext* txn) { + return _localRsConfigDocument; + } + + void ReplicationCoordinatorExternalStateMock::setLocalConfigDocument( + const StatusWith<BSONObj>& localConfigDocument) { + + _localRsConfigDocument = localConfigDocument; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.h b/src/mongo/db/repl/repl_coordinator_external_state_mock.h index 3d3b2c3cca4..cc168c7fdeb 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.h @@ -31,6 +31,9 @@ #include <vector> #include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/oid.h" +#include "mongo/db/jsobj.h" #include 
"mongo/db/repl/repl_coordinator_external_state.h" #include "mongo/util/net/hostandport.h" @@ -46,9 +49,10 @@ namespace repl { virtual void shutdown(); virtual void forwardSlaveHandshake(); virtual void forwardSlaveProgress(); - virtual OID ensureMe(); + virtual OID ensureMe(OperationContext*); virtual bool isSelf(const HostAndPort& host); virtual HostAndPort getClientHostAndPort(const OperationContext* txn); + virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn); /** * Adds "host" to the list of hosts that this mock will match when responding to "isSelf" @@ -56,7 +60,13 @@ namespace repl { */ void addSelf(const HostAndPort& host); + /** + * Sets the return value for subsequent calls to loadLocalConfigDocument(). + */ + void setLocalConfigDocument(const StatusWith<BSONObj>& localConfigDocument); + private: + StatusWith<BSONObj> _localRsConfigDocument; std::vector<HostAndPort> _selfHosts; }; diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp index 52584fb1cb5..fba4093cdf5 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp +++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp @@ -50,9 +50,9 @@ namespace repl { HybridReplicationCoordinator::~HybridReplicationCoordinator() {} - void HybridReplicationCoordinator::startReplication() { - _legacy.startReplication(); - _impl.startReplication(); + void HybridReplicationCoordinator::startReplication(OperationContext* txn) { + _legacy.startReplication(txn); + _impl.startReplication(txn); } void HybridReplicationCoordinator::shutdown() { diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h index cdca96eb884..2cbda81f891 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.h +++ b/src/mongo/db/repl/repl_coordinator_hybrid.h @@ -49,7 +49,7 @@ namespace repl { HybridReplicationCoordinator(const ReplSettings& settings); virtual ~HybridReplicationCoordinator(); - virtual void startReplication(); + 
virtual void startReplication(OperationContext* txn); virtual void shutdown(); diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 7d99095b0de..ffb5a7ac45a 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -35,16 +35,18 @@ #include "mongo/base/status.h" #include "mongo/db/operation_context_noop.h" +#include "mongo/db/repl/check_quorum_for_config_change.h" #include "mongo/db/repl/master_slave.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" +#include "mongo/db/repl/repl_set_seed_list.h" #include "mongo/db/repl/repl_settings.h" +#include "mongo/db/repl/replica_set_config_checks.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/rs.h" #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/db/server_options.h" -#include "mongo/db/server_options.h" #include "mongo/db/write_concern_options.h" #include "mongo/stdx/functional.h" #include "mongo/util/assert_util.h" @@ -91,6 +93,7 @@ namespace repl { ReplicationExecutor::NetworkInterface* network, TopologyCoordinator* topCoord) : _settings(settings), + _topCoord(topCoord), _replExecutor(network), _externalState(externalState), _inShutdown(false), @@ -100,12 +103,6 @@ namespace repl { return; } - _topCoord.reset(topCoord); - _topCoord->registerConfigChangeCallback( - stdx::bind(&ReplicationCoordinatorImpl::_onReplicaSetConfigChange, - this, - stdx::placeholders::_1, - stdx::placeholders::_2)); _topCoord->registerStateChangeCallback( stdx::bind(&ReplicationCoordinatorImpl::_onSelfStateChange, this, @@ -114,7 +111,85 @@ namespace repl { ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {} - void ReplicationCoordinatorImpl::startReplication() { + void ReplicationCoordinatorImpl::waitForStartUp() { + if (_startUpFinishedHandle.isValid()) { + 
_replExecutor.wait(_startUpFinishedHandle); + } + } + + void ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { + + StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn); + if (!cfg.isOK()) { + log() << "Did not find local replica set configuration document at startup; " << + cfg.getStatus(); + return; + } + ReplicaSetConfig localConfig; + Status status = localConfig.initialize(cfg.getValue()); + if (!status.isOK()) { + warning() << "Locally stored replica set configuration does not parse; " + "waiting for rsInitiate or remote heartbeat; Got " << status << " while parsing " << + cfg.getValue(); + return; + } + if (localConfig.getReplSetName() != _settings.ourSetName()) { + warning() << "Local replica set configuration document reports set name of " << + localConfig.getReplSetName() << ", but command line reports " << + _settings.ourSetName() << "; ignoring local configuration document."; + return; + } + + // Use a callback here, because finishLoadLocalConfig will block until we start listening + // in this thread. + StatusWith<ReplicationExecutor::CallbackHandle> cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig, + this, + stdx::placeholders::_1, + localConfig)); + if (cbh.isOK()) { + _startUpFinishedHandle = cbh.getValue(); + } + } + + void ReplicationCoordinatorImpl::_finishLoadLocalConfig( + const ReplicationExecutor::CallbackData& cbData, + const ReplicaSetConfig& localConfig) { + if (!cbData.status.isOK()) { + LOG(1) << "Loading local replica set configuration failed due to " << cbData.status; + return; + } + + boost::unique_lock<boost::mutex> lk(_mutex); + // TODO(schwerin): validateConfigForStartUp calls isSelf, which might lead to + // network traffic. It would be nice not to hold a lock for that. Ditto in + // other validate calls, in other parts of the repl coordinator. 
+ StatusWith<int> myIndex = validateConfigForStartUp(_externalState.get(), + _rsConfig, + localConfig); + if (!myIndex.isOK()) { + warning() << "Locally stored replica set configuration not valid for current node; " + "waiting for rsInitiate or remote heratbeat; Got " << myIndex.getStatus() << + " while validating " << localConfig.toBSON(); + return; + } + if (_rsConfig.isInitialized()) { + cancelHeartbeats(); + } + _rsConfig = localConfig; + _thisMembersConfigIndex = myIndex.getValue(); + OpTime lastAppliedOpTime = _getLastOpApplied_inlock(); + lk.unlock(); + _topCoord->updateConfig( + cbData, + localConfig, + myIndex.getValue(), + Date_t(curTimeMillis64()), + lastAppliedOpTime); + _startHeartbeats(); + } + + void ReplicationCoordinatorImpl::startReplication(OperationContext* txn) { if (!isReplEnabled()) { return; } @@ -123,13 +198,15 @@ namespace repl { // access to _myRID, which is not mutex guarded. This is OK because startReplication() // executes before the server starts listening for connections, and replication starts no // threads of its own until later in this function. 
- _myRID = _externalState->ensureMe(); + _myRID = _externalState->ensureMe(txn); _topCoordDriverThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, &_replExecutor))); _syncSourceFeedbackThread.reset(new boost::thread( stdx::bind(&ReplicationCoordinatorExternalState::runSyncSourceFeedback, _externalState.get()))); + + _startLoadLocalConfig(txn); } void ReplicationCoordinatorImpl::shutdown() { @@ -179,9 +256,9 @@ namespace repl { } void ReplicationCoordinatorImpl::_onSelfStateChange(const MemberState& newState) { - invariant(_settings.usingReplSets()); boost::lock_guard<boost::mutex> lk(_mutex); invariant(_settings.usingReplSets()); + invariant(_getReplicationMode_inlock() == modeReplSet); _currentState = newState; if (newState.primary()) { _electionID = OID::gen(); @@ -232,6 +309,10 @@ namespace repl { OpTime ReplicationCoordinatorImpl::_getLastOpApplied() { boost::lock_guard<boost::mutex> lk(_mutex); + return _getLastOpApplied_inlock(); + } + + OpTime ReplicationCoordinatorImpl::_getLastOpApplied_inlock() { OperationContextNoop txn; return _slaveInfoMap[getMyRID(&txn)].opTime; } @@ -273,6 +354,7 @@ namespace repl { } Timer timer; + boost::condition_variable condVar; boost::unique_lock<boost::mutex> lk(_mutex); const Mode replMode = _getReplicationMode_inlock(); @@ -286,7 +368,6 @@ namespace repl { return StatusAndDuration(Status::OK(), Milliseconds(0)); } - boost::condition_variable condVar; // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList WaiterInfo waitInfo(&_replicationWaiterList, &opId, &writeConcern, &condVar); @@ -645,27 +726,27 @@ namespace repl { return Status::OK(); } - void ReplicationCoordinatorImpl::_onReplicaSetConfigChange(const ReplicaSetConfig& newConfig, - int myIndex) { - invariant(_settings.usingReplSets()); - boost::lock_guard<boost::mutex> lk(_mutex); - _rsConfig = newConfig; - _thisMembersConfigIndex = myIndex; +// void ReplicationCoordinatorImpl::_onReplicaSetConfigChange(const 
ReplicaSetConfig& newConfig, +// int myIndex) { +// invariant(_settings.usingReplSets()); +// boost::lock_guard<boost::mutex> lk(_mutex); +// _rsConfig = newConfig; +// _thisMembersConfigIndex = myIndex; - cancelHeartbeats(); - _startHeartbeats(); +// cancelHeartbeats(); +// _startHeartbeats(); -// TODO(SERVER-14591): instead of this, use WriteConcernOptions and store in replcoord; -// in getLastError command, fetch the defaults via a getter in replcoord. -// replcoord is responsible for replacing its gledefault with a new config's. -/* - if (getLastErrorDefault || !c.getLastErrorDefaults.isEmpty()) { - // see comment in dbcommands.cpp for getlasterrordefault - getLastErrorDefault = new BSONObj(c.getLastErrorDefaults); - } -*/ +// // TODO(SERVER-14591): instead of this, use WriteConcernOptions and store in replcoord; +// // in getLastError command, fetch the defaults via a getter in replcoord. +// // replcoord is responsible for replacing its gledefault with a new config's. +// /* +// if (getLastErrorDefault || !c.getLastErrorDefaults.isEmpty()) { +// // see comment in dbcommands.cpp for getlasterrordefault +// getLastErrorDefault = new BSONObj(c.getLastErrorDefaults); +// } +// */ - } +// } Status ReplicationCoordinatorImpl::processReplSetUpdatePosition( OperationContext* txn, diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index 0acdb2be004..0a7b4465cb7 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -63,7 +63,7 @@ namespace repl { // ================== Members of public ReplicationCoordinator API =================== - virtual void startReplication(); + virtual void startReplication(OperationContext* txn); virtual void shutdown(); @@ -190,6 +190,14 @@ namespace repl { */ void cancelHeartbeats(); + // ================== Test support API =================== + + /** + * If called after startReplication(), blocks until all asynchronous + * activities 
associated with replication start-up complete. + */ + void waitForStartUp(); + private: // Struct that holds information about clients waiting for replication. @@ -212,13 +220,11 @@ namespace repl { // Called by the TopologyCoordinator whenever this node's replica set state transitions. void _onSelfStateChange(const MemberState& newState); - // Called by the TopologyCoordinator whenever the replica set configuration is updated - void _onReplicaSetConfigChange(const ReplicaSetConfig& newConfig, int myIndex); - /* * Returns the OpTime of the last applied operation on this node. */ OpTime _getLastOpApplied(); + OpTime _getLastOpApplied_inlock(); /* * Returns true if the given writeConcern is satisfied up to "optime". @@ -241,7 +247,7 @@ namespace repl { void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle); /** - * Start a heartbeat for each member in the current config + * Starts a heartbeat for each member in the current config */ void _startHeartbeats(); @@ -253,6 +259,23 @@ namespace repl { */ Mode _getReplicationMode_inlock() const; + /** + * Starts loading the replication configuration from local storage, and if it is valid, + * schedules a callback to set itas the current replica set config (sets _rsConfig and + * _thisMembersConfigIndex). + */ + void _startLoadLocalConfig(OperationContext* txn); + + /** + * Callback that finishes the work started in _startLoadLocalConfig. + */ + void _finishLoadLocalConfig(const ReplicationExecutor::CallbackData& cbData, + const ReplicaSetConfig& localConfig); + + // Handle for the callback that marks the end of startReplication()'s asynchronous + // work. Used for testing, set in startReplication() and never changed. + ReplicationExecutor::CallbackHandle _startUpFinishedHandle; + // Handles to actively queued heartbeats. // Only accessed serially in ReplicationExecutor callbacks, which makes it safe to access // outside of _mutex. 
diff --git a/src/mongo/db/repl/repl_coordinator_impl_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_test.cpp index 9cc4ed8fafe..d4fb42f277f 100644 --- a/src/mongo/db/repl/repl_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl_test.cpp @@ -30,6 +30,7 @@ #include <boost/scoped_ptr.hpp> #include <boost/thread.hpp> +#include <memory> #include <set> #include <vector> @@ -41,6 +42,7 @@ #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/write_concern_options.h" +#include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -60,6 +62,10 @@ namespace mongo { namespace repl { namespace { + bool stringContains(const std::string &haystack, const std::string& needle) { + return haystack.find(needle) != std::string::npos; + } + const Seconds zeroSecs(0); class ReplCoordTest : public mongo::unittest::Test { @@ -80,18 +86,6 @@ namespace { ReplicationCoordinatorImpl* getReplCoord() {return _repl.get();} TopologyCoordinatorImpl& getTopoCoord() {return *_topo;} - void updateConfig(BSONObj configBSON, int me) { - ReplicaSetConfig config; - ASSERT_OK(config.initialize(configBSON)); - updateConfig(config, me); - } - - void updateConfig(ReplicaSetConfig config, int me) { - ReplicationExecutor::CallbackHandle cbh; - ReplicationExecutor::CallbackData cbData(NULL, cbh, Status::OK()); - getTopoCoord().updateConfig(cbData, config, me, curTimeMillis64(), OpTime()); - } - void init() { invariant(!_repl); invariant(!_callShutdown); @@ -121,16 +115,46 @@ namespace { init(); } - _repl->startReplication(); + OperationContextNoop txn; + _repl->startReplication(&txn); + _repl->waitForStartUp(); _callShutdown = true; } + void start(const BSONObj& configDoc, const HostAndPort& selfHost) { + if (!_repl) { + init(); + } + _externalState->setLocalConfigDocument(StatusWith<BSONObj>(configDoc)); + _externalState->addSelf(selfHost); + start(); + } + + void 
assertStart(ReplicationCoordinator::Mode expectedMode, + const BSONObj& configDoc, + const HostAndPort& selfHost) { + start(configDoc, selfHost); + ASSERT_EQUALS(expectedMode, getReplCoord()->getReplicationMode()); + } + + void assertStartSuccess(const BSONObj& configDoc, const HostAndPort& selfHost) { + assertStart(ReplicationCoordinator::modeReplSet, configDoc, selfHost); + } + void shutdown() { invariant(_callShutdown); _repl->shutdown(); _callShutdown = false; } + int64_t countLogLinesContaining(const std::string& needle) { + return std::count_if(getCapturedLogMessages().begin(), + getCapturedLogMessages().end(), + stdx::bind(stringContains, + stdx::placeholders::_1, + needle)); + } + private: boost::scoped_ptr<ReplicationCoordinatorImpl> _repl; // Owned by ReplicationCoordinatorImpl @@ -143,14 +167,54 @@ namespace { bool _callShutdown; }; - TEST_F(ReplCoordTest, StartupShutdown) { + TEST_F(ReplCoordTest, StartupWithValidLocalConfig) { + assertStart( + ReplicationCoordinator::modeReplSet, + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345"))), + HostAndPort("node1", 12345)); + } + + TEST_F(ReplCoordTest, StartupWithInvalidLocalConfig) { + startCapturingLogMessages(); + assertStart(ReplicationCoordinator::modeNone, + BSON("_id" << "mySet"), HostAndPort("node1", 12345)); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("configuration does not parse")); + } + + TEST_F(ReplCoordTest, StartupWithConfigMissingSelf) { + startCapturingLogMessages(); + assertStart( + ReplicationCoordinator::modeNone, + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345") << + BSON("_id" << 2 << "host" << "node2:54321"))), + HostAndPort("node3", 12345)); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("NodeNotFound")); + } + + TEST_F(ReplCoordTest, StartupWithLocalConfigSetNameMismatch) { + init("mySet"); + 
startCapturingLogMessages(); + assertStart(ReplicationCoordinator::modeNone, + BSON("_id" << "notMySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345"))), + HostAndPort("node1", 12345)); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("reports set name of notMySet,")); + } + + TEST_F(ReplCoordTest, StartupWithNoLocalConfig) { + startCapturingLogMessages(); start(); - updateConfig(BSON("_id" << "mySet" << - "version" << 2 << - "members" << BSON_ARRAY(BSON("host" << "node1:12345" << - "_id" << 0 ))), - 0); - shutdown(); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("Did not find local ")); + ASSERT_EQUALS(ReplicationCoordinator::modeNone, getReplCoord()->getReplicationMode()); } TEST_F(ReplCoordTest, AwaitReplicationNumberBaseCases) { @@ -188,12 +252,11 @@ namespace { } TEST_F(ReplCoordTest, AwaitReplicationNumberOfNodesNonBlocking) { - start(); - updateConfig(BSON("_id" << "mySet" << - "version" << 2 << - "members" << BSON_ARRAY(BSON("host" << "node1:12345" << - "_id" << 0 ))), - 0); + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))), + HostAndPort("node1", 12345)); OperationContextNoop txn; OID client1 = OID::gen(); @@ -297,12 +360,11 @@ namespace { }; TEST_F(ReplCoordTest, AwaitReplicationNumberOfNodesBlocking) { - start(); - updateConfig(BSON("_id" << "mySet" << - "version" << 2 << - "members" << BSON_ARRAY(BSON("host" << "node1:12345" << - "_id" << 0 ))), - 0); + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))), + HostAndPort("node1", 12345)); OperationContextNoop txn; ReplicationAwaiter awaiter(getReplCoord(), &txn); @@ -347,12 +409,11 @@ namespace { } TEST_F(ReplCoordTest, AwaitReplicationTimeout) { - start(); - updateConfig(BSON("_id" << "mySet" << - "version" << 2 << - "members" << 
BSON_ARRAY(BSON("host" << "node1:12345" << - "_id" << 0 ))), - 0); + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))), + HostAndPort("node1", 12345)); OperationContextNoop txn; ReplicationAwaiter awaiter(getReplCoord(), &txn); @@ -377,12 +438,11 @@ namespace { } TEST_F(ReplCoordTest, AwaitReplicationShutdown) { - start(); - updateConfig(BSON("_id" << "mySet" << - "version" << 2 << - "members" << BSON_ARRAY(BSON("host" << "node1:12345" << - "_id" << 0 ))), - 0); + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))), + HostAndPort("node1", 12345)); OperationContextNoop txn; ReplicationAwaiter awaiter(getReplCoord(), &txn); @@ -407,6 +467,11 @@ namespace { awaiter.reset(); } + TEST_F(ReplCoordTest, AwaitReplicationNamedModes) { + // TODO(spencer): Test awaitReplication with w:majority and tag groups + warning() << "Test ReplCoordTest.AwaitReplicationNamedModes needs to be written."; + } + TEST_F(ReplCoordTest, GetReplicationModeNone) { init(); ASSERT_EQUALS(ReplicationCoordinator::modeNone, getReplCoord()->getReplicationMode()); @@ -437,35 +502,27 @@ namespace { init(settings); ASSERT_EQUALS(ReplicationCoordinator::modeNone, getReplCoord()->getReplicationMode()); - start(); - updateConfig((BSON("_id" << "mySet" << - "version" << 2 << - "members" << BSON_ARRAY(BSON("host" << "node1:12345" << - "_id" << 0 )))), - 0); - ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode()); - } - - TEST_F(ReplCoordTest, AwaitReplicationNamedModes) { - // TODO(spencer): Test awaitReplication with w:majority and tag groups + assertStart( + ReplicationCoordinator::modeReplSet, + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))), + HostAndPort("node1", 12345)); } TEST_F(ReplCoordTest, 
TestPrepareReplSetUpdatePositionCommand) { - init("mySet:/test1:1234,test2:1234,test3:1234"); - start(); - updateConfig(BSON("_id" << "mySet" << - "version" << 1 << - "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") << - BSON("_id" << 1 << "host" << "test2:1234") << - BSON("_id" << 2 << "host" << "test3:1234"))), - 0); - OID rid1 = OID::gen(); + OperationContextNoop txn; + init("mySet/test1:1234,test2:1234,test3:1234"); + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 1 << + "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") << + BSON("_id" << 1 << "host" << "test2:1234") << + BSON("_id" << 2 << "host" << "test3:1234"))), + HostAndPort("test1", 1234)); + OID rid1 = getReplCoord()->getMyRID(&txn); OID rid2 = OID::gen(); OID rid3 = OID::gen(); - HandshakeArgs handshake1; - handshake1.initialize(BSON("handshake" << rid1 << - "member" << 0 << - "config" << BSON("_id" << 0 << "host" << "test1:1234"))); HandshakeArgs handshake2; handshake2.initialize(BSON("handshake" << rid2 << "member" << 1 << @@ -474,8 +531,6 @@ namespace { handshake3.initialize(BSON("handshake" << rid3 << "member" << 2 << "config" << BSON("_id" << 2 << "host" << "test3:1234"))); - OperationContextNoop txn; - ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1)); ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2)); ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake3)); OpTime optime1(1, 1); @@ -511,17 +566,17 @@ namespace { } TEST_F(ReplCoordTest, TestHandshakes) { - init("mySet:/test1:1234,test2:1234,test3:1234"); - start(); - updateConfig(BSON("_id" << "mySet" << - "version" << 1 << - "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") << - BSON("_id" << 1 << "host" << "test2:1234") << - BSON("_id" << 2 << "host" << "test3:1234"))), - 1); + init("mySet/test1:1234,test2:1234,test3:1234"); + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 1 << + "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << 
"test1:1234") << + BSON("_id" << 1 << "host" << "test2:1234") << + BSON("_id" << 2 << "host" << "test3:1234"))), + HostAndPort("test2", 1234)); // Test generating basic handshake with no chaining - OperationContextNoop txn; std::vector<BSONObj> handshakes; + OperationContextNoop txn; getReplCoord()->prepareReplSetUpdatePositionCommandHandshakes(&txn, &handshakes); ASSERT_EQUALS(1U, handshakes.size()); BSONObj handshakeCmd = handshakes[0]; @@ -580,15 +635,15 @@ namespace { // information for replSetGetStatus from a different source than the nodes that aren't // ourself. After this setup, we call replSetGetStatus and make sure that the fields // returned for each member match our expectations. - init("mySet:/test1:1234,test2:1234,test3:1234"); - start(); - updateConfig(BSON("_id" << "mySet" << - "version" << 1 << - "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test0:1234") << - BSON("_id" << 1 << "host" << "test1:1234") << - BSON("_id" << 2 << "host" << "test2:1234") << - BSON("_id" << 3 << "host" << "test3:1234"))), - 3); + init("mySet/test1:1234,test2:1234,test3:1234"); + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 1 << + "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test0:1234") << + BSON("_id" << 1 << "host" << "test1:1234") << + BSON("_id" << 2 << "host" << "test2:1234") << + BSON("_id" << 3 << "host" << "test3:1234"))), + HostAndPort("test3", 1234)); Date_t startupTime(curTimeMillis64()); OpTime electionTime(1, 2); OpTime oplogProgress(3, 4); @@ -681,8 +736,12 @@ namespace { } TEST_F(ReplCoordTest, TestGetElectionId) { - init("mySet:/test1:1234,test2:1234,test3:1234"); - start(); + init("mySet/test1:1234,test2:1234,test3:1234"); + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 2 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "test1:1234"))), + HostAndPort("test1", 1234)); OID electionID1 = getReplCoord()->getElectionId(); getTopoCoord()._changeMemberState(MemberState::RS_PRIMARY); OID electionID2 = 
getReplCoord()->getElectionId(); diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp index e48351d0014..9e1f188ec12 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.cpp +++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp @@ -72,7 +72,7 @@ namespace repl { } LegacyReplicationCoordinator::~LegacyReplicationCoordinator() {} - void LegacyReplicationCoordinator::startReplication() { + void LegacyReplicationCoordinator::startReplication(OperationContext* txn) { // if we are going to be a replica set, we aren't doing other forms of replication. if (!_settings.replSet.empty()) { if (_settings.slave || _settings.master) { diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h index 5c939a404af..90b144e8049 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.h +++ b/src/mongo/db/repl/repl_coordinator_legacy.h @@ -47,7 +47,7 @@ namespace repl { LegacyReplicationCoordinator(const ReplSettings& settings); virtual ~LegacyReplicationCoordinator(); - virtual void startReplication(); + virtual void startReplication(OperationContext*); virtual void shutdown(); diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp index 5acb257b589..73bc883fd28 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.cpp +++ b/src/mongo/db/repl/repl_coordinator_mock.cpp @@ -40,7 +40,7 @@ namespace repl { _settings(settings) {} ReplicationCoordinatorMock::~ReplicationCoordinatorMock() {} - void ReplicationCoordinatorMock::startReplication() { + void ReplicationCoordinatorMock::startReplication(OperationContext* txn) { // TODO } diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h index a3422eeb1a2..353365de40b 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.h +++ b/src/mongo/db/repl/repl_coordinator_mock.h @@ -46,7 +46,7 @@ namespace repl { ReplicationCoordinatorMock(const ReplSettings& settings); 
virtual ~ReplicationCoordinatorMock(); - virtual void startReplication(); + virtual void startReplication(OperationContext* txn); virtual void shutdown(); diff --git a/src/mongo/db/repl/replica_set_config_checks.cpp b/src/mongo/db/repl/replica_set_config_checks.cpp index 5fed899fc68..263c1c61e9f 100644 --- a/src/mongo/db/repl/replica_set_config_checks.cpp +++ b/src/mongo/db/repl/replica_set_config_checks.cpp @@ -230,6 +230,23 @@ namespace { } } // namespace + StatusWith<int> validateConfigForStartUp( + ReplicationCoordinatorExternalState* externalState, + const ReplicaSetConfig& oldConfig, + const ReplicaSetConfig& newConfig) { + Status status = newConfig.validate(); + if (!status.isOK()) { + return StatusWith<int>(status); + } + if (oldConfig.isInitialized()) { + status = validateOldAndNewConfigsCompatible(oldConfig, newConfig); + if (!status.isOK()) { + return StatusWith<int>(status); + } + } + return findSelfInConfig(externalState, newConfig); + } + StatusWith<int> validateConfigForInitiate( ReplicationCoordinatorExternalState* externalState, const ReplicaSetConfig& newConfig) { diff --git a/src/mongo/db/repl/replica_set_config_checks.h b/src/mongo/db/repl/replica_set_config_checks.h index 1675b538bdf..d7fcc29a6e5 100644 --- a/src/mongo/db/repl/replica_set_config_checks.h +++ b/src/mongo/db/repl/replica_set_config_checks.h @@ -37,6 +37,21 @@ namespace repl { class ReplicaSetConfig; /** + * Validates that "newConfig" is a legal configuration that the current + * node can accept from its local storage during startup. + * + * Returns the index of the current node's member configuration in "newConfig", + * on success, and an indicative error on failure. + * + * If "oldConfig" is valid, this method only succeeds if "newConfig" is a legal + * successor configuration. 
+ */ + StatusWith<int> validateConfigForStartUp( + ReplicationCoordinatorExternalState* externalState, + const ReplicaSetConfig& oldConfig, + const ReplicaSetConfig& newConfig); + + /** * Validates that "newConfig" is a legal initial configuration that can be * initiated by the current node (identified via "externalState"). * diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index d743748e966..3c3c89e3afb 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -400,6 +400,8 @@ namespace repl { public: CallbackHandle() : _generation(0) {} + bool isValid() const { return _finishedEvent.isValid(); } + bool operator==(const CallbackHandle &other) const { return (_finishedEvent == other._finishedEvent); } |