summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2014-08-11 13:44:49 -0400
committerEric Milkie <milkie@10gen.com>2014-08-11 15:17:52 -0400
commit4064ced405a51bedcb9da9d90f0320023e23728c (patch)
tree2f928efd599a6498db0b17ea7fbcc1d0e52a0fa3
parent08873e79890885f83d0a9ae1de7ea9f4769cd83d (diff)
downloadmongo-4064ced405a51bedcb9da9d90f0320023e23728c.tar.gz
SERVER-14788 Wire initial configuration loading into startReplication.
via Andy Schwerin.
-rw-r--r--src/mongo/db/db.cpp5
-rw-r--r--src/mongo/db/repl/repl_coordinator.h2
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state.h11
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_impl.cpp32
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_impl.h3
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_mock.cpp21
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_mock.h12
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.cpp6
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.h2
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp139
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.h33
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl_test.cpp239
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.cpp2
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.h2
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.cpp2
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.h2
-rw-r--r--src/mongo/db/repl/replica_set_config_checks.cpp17
-rw-r--r--src/mongo/db/repl/replica_set_config_checks.h15
-rw-r--r--src/mongo/db/repl/replication_executor.h2
19 files changed, 398 insertions, 149 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index b5831a85733..dd2b2b2ae82 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -276,7 +276,10 @@ namespace mongo {
server->setupSockets();
logStartup();
- repl::getGlobalReplicationCoordinator()->startReplication();
+ {
+ OperationContextImpl txn;
+ repl::getGlobalReplicationCoordinator()->startReplication(&txn);
+ }
if (serverGlobalParams.isHttpInterfaceEnabled)
boost::thread web(stdx::bind(&webServerThread,
new RestAdminAccess())); // takes ownership
diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h
index ac79c69e07b..93243194aad 100644
--- a/src/mongo/db/repl/repl_coordinator.h
+++ b/src/mongo/db/repl/repl_coordinator.h
@@ -93,7 +93,7 @@ namespace repl {
* components of the replication system to start up whatever threads and do whatever
* initialization they need.
*/
- virtual void startReplication() = 0;
+ virtual void startReplication(OperationContext* txn) = 0;
/**
* Does whatever cleanup is required to stop replication, including instructing the other
diff --git a/src/mongo/db/repl/repl_coordinator_external_state.h b/src/mongo/db/repl/repl_coordinator_external_state.h
index f994e91281e..9e27723e122 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state.h
+++ b/src/mongo/db/repl/repl_coordinator_external_state.h
@@ -32,9 +32,11 @@
namespace mongo {
- struct HostAndPort;
+ class BSONObj;
class OID;
class OperationContext;
+ struct HostAndPort;
+ template <typename T> class StatusWith;
namespace repl {
@@ -82,7 +84,7 @@ namespace repl {
* exist or our hostname doesn't match what was recorded in local.me, generates a new OID
* to use as our RID, stores it in local.me, and returns it.
*/
- virtual OID ensureMe() = 0;
+ virtual OID ensureMe(OperationContext*) = 0;
/**
* Returns true if "host" is one of the network identities of this node.
@@ -90,6 +92,11 @@ namespace repl {
virtual bool isSelf(const HostAndPort& host) = 0;
/**
+ * Gets the replica set config document from local storage, or returns an error.
+ */
+ virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn) = 0;
+
+ /**
* Returns the HostAndPort of the remote client connected to us that initiated the operation
* represented by "txn".
*/
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp
index 717e9bd305b..83f13a6ddf6 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp
@@ -25,8 +25,6 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
-
-#include "mongo/platform/basic.h"
#include "mongo/platform/basic.h"
@@ -34,11 +32,11 @@
#include <string>
+#include "mongo/base/status_with.h"
#include "mongo/bson/oid.h"
#include "mongo/db/client.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
-#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/isself.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/sock.h"
@@ -65,29 +63,28 @@ namespace repl {
_syncSourceFeedback.forwardSlaveProgress();
}
- OID ReplicationCoordinatorExternalStateImpl::ensureMe() {
+ OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) {
std::string myname = getHostName();
OID myRID;
{
- OperationContextImpl txn;
- Client::WriteContext ctx(&txn, "local");
+ Client::WriteContext ctx(txn, "local");
BSONObj me;
// local.me is an identifier for a server for getLastError w:2+
- if (!Helpers::getSingleton(&txn, "local.me", me) ||
+ if (!Helpers::getSingleton(txn, "local.me", me) ||
!me.hasField("host") ||
me["host"].String() != myname) {
myRID = OID::gen();
// clean out local.me
- Helpers::emptyCollection(&txn, "local.me");
+ Helpers::emptyCollection(txn, "local.me");
// repopulate
BSONObjBuilder b;
b.append("_id", myRID);
b.append("host", myname);
- Helpers::putSingleton(&txn, "local.me", b.done());
+ Helpers::putSingleton(txn, "local.me", b.done());
} else {
myRID = me["_id"].OID();
}
@@ -96,6 +93,23 @@ namespace repl {
return myRID;
}
+ StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument(
+ OperationContext* txn) {
+ try {
+ BSONObj config;
+ Client::ReadContext ctx(txn, "local");
+ if (!Helpers::getSingleton(txn, "local.system.replset", config)) {
+ return StatusWith<BSONObj>(
+ ErrorCodes::NoMatchingDocument,
+ "Did not find replica set configuration document in local.system.replset");
+ }
+ return StatusWith<BSONObj>(config);
+ }
+ catch (const DBException& ex) {
+ return StatusWith<BSONObj>(ex.toStatus());
+ }
+ }
+
bool ReplicationCoordinatorExternalStateImpl::isSelf(const HostAndPort& host) {
return repl::isSelf(host);
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.h b/src/mongo/db/repl/repl_coordinator_external_state_impl.h
index 645ec5ac5ee..75d976ec3c9 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.h
@@ -44,8 +44,9 @@ namespace repl {
virtual void shutdown();
virtual void forwardSlaveHandshake();
virtual void forwardSlaveProgress();
- virtual OID ensureMe();
+ virtual OID ensureMe(OperationContext* txn);
virtual bool isSelf(const HostAndPort& host);
+ virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn);
virtual HostAndPort getClientHostAndPort(const OperationContext* txn);
private:
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp
index 71a411afa19..0d7001405e3 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp
@@ -30,8 +30,10 @@
#include "mongo/db/repl/repl_coordinator_external_state_mock.h"
+#include "mongo/base/status_with.h"
#include "mongo/bson/oid.h"
#include "mongo/db/client.h"
+#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/sequence_util.h"
@@ -39,7 +41,11 @@
namespace mongo {
namespace repl {
- ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock() {}
+ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock()
+ : _localRsConfigDocument(Status(ErrorCodes::NoMatchingDocument,
+ "No local config document")) {
+ }
+
ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {}
void ReplicationCoordinatorExternalStateMock::runSyncSourceFeedback() {}
@@ -47,7 +53,7 @@ namespace repl {
void ReplicationCoordinatorExternalStateMock::forwardSlaveHandshake() {}
void ReplicationCoordinatorExternalStateMock::forwardSlaveProgress() {}
- OID ReplicationCoordinatorExternalStateMock::ensureMe() {
+ OID ReplicationCoordinatorExternalStateMock::ensureMe(OperationContext*) {
return OID::gen();
}
@@ -64,5 +70,16 @@ namespace repl {
return HostAndPort();
}
+ StatusWith<BSONObj> ReplicationCoordinatorExternalStateMock::loadLocalConfigDocument(
+ OperationContext* txn) {
+ return _localRsConfigDocument;
+ }
+
+ void ReplicationCoordinatorExternalStateMock::setLocalConfigDocument(
+ const StatusWith<BSONObj>& localConfigDocument) {
+
+ _localRsConfigDocument = localConfigDocument;
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.h b/src/mongo/db/repl/repl_coordinator_external_state_mock.h
index 3d3b2c3cca4..cc168c7fdeb 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.h
@@ -31,6 +31,9 @@
#include <vector>
#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/oid.h"
+#include "mongo/db/jsobj.h"
#include "mongo/db/repl/repl_coordinator_external_state.h"
#include "mongo/util/net/hostandport.h"
@@ -46,9 +49,10 @@ namespace repl {
virtual void shutdown();
virtual void forwardSlaveHandshake();
virtual void forwardSlaveProgress();
- virtual OID ensureMe();
+ virtual OID ensureMe(OperationContext*);
virtual bool isSelf(const HostAndPort& host);
virtual HostAndPort getClientHostAndPort(const OperationContext* txn);
+ virtual StatusWith<BSONObj> loadLocalConfigDocument(OperationContext* txn);
/**
* Adds "host" to the list of hosts that this mock will match when responding to "isSelf"
@@ -56,7 +60,13 @@ namespace repl {
*/
void addSelf(const HostAndPort& host);
+ /**
+ * Sets the return value for subsequent calls to loadLocalConfigDocument().
+ */
+ void setLocalConfigDocument(const StatusWith<BSONObj>& localConfigDocument);
+
private:
+ StatusWith<BSONObj> _localRsConfigDocument;
std::vector<HostAndPort> _selfHosts;
};
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
index 52584fb1cb5..fba4093cdf5 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
@@ -50,9 +50,9 @@ namespace repl {
HybridReplicationCoordinator::~HybridReplicationCoordinator() {}
- void HybridReplicationCoordinator::startReplication() {
- _legacy.startReplication();
- _impl.startReplication();
+ void HybridReplicationCoordinator::startReplication(OperationContext* txn) {
+ _legacy.startReplication(txn);
+ _impl.startReplication(txn);
}
void HybridReplicationCoordinator::shutdown() {
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h
index cdca96eb884..2cbda81f891 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.h
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.h
@@ -49,7 +49,7 @@ namespace repl {
HybridReplicationCoordinator(const ReplSettings& settings);
virtual ~HybridReplicationCoordinator();
- virtual void startReplication();
+ virtual void startReplication(OperationContext* txn);
virtual void shutdown();
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 7d99095b0de..ffb5a7ac45a 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -35,16 +35,18 @@
#include "mongo/base/status.h"
#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/repl/check_quorum_for_config_change.h"
#include "mongo/db/repl/master_slave.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
+#include "mongo/db/repl/repl_set_seed_list.h"
#include "mongo/db/repl/repl_settings.h"
+#include "mongo/db/repl/replica_set_config_checks.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/rs.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/server_options.h"
-#include "mongo/db/server_options.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/assert_util.h"
@@ -91,6 +93,7 @@ namespace repl {
ReplicationExecutor::NetworkInterface* network,
TopologyCoordinator* topCoord) :
_settings(settings),
+ _topCoord(topCoord),
_replExecutor(network),
_externalState(externalState),
_inShutdown(false),
@@ -100,12 +103,6 @@ namespace repl {
return;
}
- _topCoord.reset(topCoord);
- _topCoord->registerConfigChangeCallback(
- stdx::bind(&ReplicationCoordinatorImpl::_onReplicaSetConfigChange,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2));
_topCoord->registerStateChangeCallback(
stdx::bind(&ReplicationCoordinatorImpl::_onSelfStateChange,
this,
@@ -114,7 +111,85 @@ namespace repl {
ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {}
- void ReplicationCoordinatorImpl::startReplication() {
+ void ReplicationCoordinatorImpl::waitForStartUp() {
+ if (_startUpFinishedHandle.isValid()) {
+ _replExecutor.wait(_startUpFinishedHandle);
+ }
+ }
+
+ void ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
+
+ StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn);
+ if (!cfg.isOK()) {
+ log() << "Did not find local replica set configuration document at startup; " <<
+ cfg.getStatus();
+ return;
+ }
+ ReplicaSetConfig localConfig;
+ Status status = localConfig.initialize(cfg.getValue());
+ if (!status.isOK()) {
+ warning() << "Locally stored replica set configuration does not parse; "
+ "waiting for rsInitiate or remote heartbeat; Got " << status << " while parsing " <<
+ cfg.getValue();
+ return;
+ }
+ if (localConfig.getReplSetName() != _settings.ourSetName()) {
+ warning() << "Local replica set configuration document reports set name of " <<
+ localConfig.getReplSetName() << ", but command line reports " <<
+ _settings.ourSetName() << "; ignoring local configuration document.";
+ return;
+ }
+
+ // Use a callback here, because finishLoadLocalConfig will block until we start listening
+ // in this thread.
+ StatusWith<ReplicationExecutor::CallbackHandle> cbh = _replExecutor.scheduleWork(
+ stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig,
+ this,
+ stdx::placeholders::_1,
+ localConfig));
+ if (cbh.isOK()) {
+ _startUpFinishedHandle = cbh.getValue();
+ }
+ }
+
+ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
+ const ReplicationExecutor::CallbackData& cbData,
+ const ReplicaSetConfig& localConfig) {
+ if (!cbData.status.isOK()) {
+ LOG(1) << "Loading local replica set configuration failed due to " << cbData.status;
+ return;
+ }
+
+ boost::unique_lock<boost::mutex> lk(_mutex);
+ // TODO(schwerin): validateConfigForStartUp calls isSelf, which might lead to
+ // network traffic. It would be nice not to hold a lock for that. Ditto in
+ // other validate calls, in other parts of the repl coordinator.
+ StatusWith<int> myIndex = validateConfigForStartUp(_externalState.get(),
+ _rsConfig,
+ localConfig);
+ if (!myIndex.isOK()) {
+ warning() << "Locally stored replica set configuration not valid for current node; "
+ "waiting for rsInitiate or remote heratbeat; Got " << myIndex.getStatus() <<
+ " while validating " << localConfig.toBSON();
+ return;
+ }
+ if (_rsConfig.isInitialized()) {
+ cancelHeartbeats();
+ }
+ _rsConfig = localConfig;
+ _thisMembersConfigIndex = myIndex.getValue();
+ OpTime lastAppliedOpTime = _getLastOpApplied_inlock();
+ lk.unlock();
+ _topCoord->updateConfig(
+ cbData,
+ localConfig,
+ myIndex.getValue(),
+ Date_t(curTimeMillis64()),
+ lastAppliedOpTime);
+ _startHeartbeats();
+ }
+
+ void ReplicationCoordinatorImpl::startReplication(OperationContext* txn) {
if (!isReplEnabled()) {
return;
}
@@ -123,13 +198,15 @@ namespace repl {
// access to _myRID, which is not mutex guarded. This is OK because startReplication()
// executes before the server starts listening for connections, and replication starts no
// threads of its own until later in this function.
- _myRID = _externalState->ensureMe();
+ _myRID = _externalState->ensureMe(txn);
_topCoordDriverThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run,
&_replExecutor)));
_syncSourceFeedbackThread.reset(new boost::thread(
stdx::bind(&ReplicationCoordinatorExternalState::runSyncSourceFeedback,
_externalState.get())));
+
+ _startLoadLocalConfig(txn);
}
void ReplicationCoordinatorImpl::shutdown() {
@@ -179,9 +256,9 @@ namespace repl {
}
void ReplicationCoordinatorImpl::_onSelfStateChange(const MemberState& newState) {
- invariant(_settings.usingReplSets());
boost::lock_guard<boost::mutex> lk(_mutex);
invariant(_settings.usingReplSets());
+ invariant(_getReplicationMode_inlock() == modeReplSet);
_currentState = newState;
if (newState.primary()) {
_electionID = OID::gen();
@@ -232,6 +309,10 @@ namespace repl {
OpTime ReplicationCoordinatorImpl::_getLastOpApplied() {
boost::lock_guard<boost::mutex> lk(_mutex);
+ return _getLastOpApplied_inlock();
+ }
+
+ OpTime ReplicationCoordinatorImpl::_getLastOpApplied_inlock() {
OperationContextNoop txn;
return _slaveInfoMap[getMyRID(&txn)].opTime;
}
@@ -273,6 +354,7 @@ namespace repl {
}
Timer timer;
+ boost::condition_variable condVar;
boost::unique_lock<boost::mutex> lk(_mutex);
const Mode replMode = _getReplicationMode_inlock();
@@ -286,7 +368,6 @@ namespace repl {
return StatusAndDuration(Status::OK(), Milliseconds(0));
}
- boost::condition_variable condVar;
// Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
WaiterInfo waitInfo(&_replicationWaiterList, &opId, &writeConcern, &condVar);
@@ -645,27 +726,27 @@ namespace repl {
return Status::OK();
}
- void ReplicationCoordinatorImpl::_onReplicaSetConfigChange(const ReplicaSetConfig& newConfig,
- int myIndex) {
- invariant(_settings.usingReplSets());
- boost::lock_guard<boost::mutex> lk(_mutex);
- _rsConfig = newConfig;
- _thisMembersConfigIndex = myIndex;
+// void ReplicationCoordinatorImpl::_onReplicaSetConfigChange(const ReplicaSetConfig& newConfig,
+// int myIndex) {
+// invariant(_settings.usingReplSets());
+// boost::lock_guard<boost::mutex> lk(_mutex);
+// _rsConfig = newConfig;
+// _thisMembersConfigIndex = myIndex;
- cancelHeartbeats();
- _startHeartbeats();
+// cancelHeartbeats();
+// _startHeartbeats();
-// TODO(SERVER-14591): instead of this, use WriteConcernOptions and store in replcoord;
-// in getLastError command, fetch the defaults via a getter in replcoord.
-// replcoord is responsible for replacing its gledefault with a new config's.
-/*
- if (getLastErrorDefault || !c.getLastErrorDefaults.isEmpty()) {
- // see comment in dbcommands.cpp for getlasterrordefault
- getLastErrorDefault = new BSONObj(c.getLastErrorDefaults);
- }
-*/
+// // TODO(SERVER-14591): instead of this, use WriteConcernOptions and store in replcoord;
+// // in getLastError command, fetch the defaults via a getter in replcoord.
+// // replcoord is responsible for replacing its gledefault with a new config's.
+// /*
+// if (getLastErrorDefault || !c.getLastErrorDefaults.isEmpty()) {
+// // see comment in dbcommands.cpp for getlasterrordefault
+// getLastErrorDefault = new BSONObj(c.getLastErrorDefaults);
+// }
+// */
- }
+// }
Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(
OperationContext* txn,
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index 0acdb2be004..0a7b4465cb7 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -63,7 +63,7 @@ namespace repl {
// ================== Members of public ReplicationCoordinator API ===================
- virtual void startReplication();
+ virtual void startReplication(OperationContext* txn);
virtual void shutdown();
@@ -190,6 +190,14 @@ namespace repl {
*/
void cancelHeartbeats();
+ // ================== Test support API ===================
+
+ /**
+ * If called after startReplication(), blocks until all asynchronous
+ * activities associated with replication start-up complete.
+ */
+ void waitForStartUp();
+
private:
// Struct that holds information about clients waiting for replication.
@@ -212,13 +220,11 @@ namespace repl {
// Called by the TopologyCoordinator whenever this node's replica set state transitions.
void _onSelfStateChange(const MemberState& newState);
- // Called by the TopologyCoordinator whenever the replica set configuration is updated
- void _onReplicaSetConfigChange(const ReplicaSetConfig& newConfig, int myIndex);
-
/*
* Returns the OpTime of the last applied operation on this node.
*/
OpTime _getLastOpApplied();
+ OpTime _getLastOpApplied_inlock();
/*
* Returns true if the given writeConcern is satisfied up to "optime".
@@ -241,7 +247,7 @@ namespace repl {
void _untrackHeartbeatHandle(const ReplicationExecutor::CallbackHandle& handle);
/**
- * Start a heartbeat for each member in the current config
+ * Starts a heartbeat for each member in the current config
*/
void _startHeartbeats();
@@ -253,6 +259,23 @@ namespace repl {
*/
Mode _getReplicationMode_inlock() const;
+ /**
+ * Starts loading the replication configuration from local storage, and if it is valid,
+ * schedules a callback to set itas the current replica set config (sets _rsConfig and
+ * _thisMembersConfigIndex).
+ */
+ void _startLoadLocalConfig(OperationContext* txn);
+
+ /**
+ * Callback that finishes the work started in _startLoadLocalConfig.
+ */
+ void _finishLoadLocalConfig(const ReplicationExecutor::CallbackData& cbData,
+ const ReplicaSetConfig& localConfig);
+
+ // Handle for the callback that marks the end of startReplication()'s asynchronous
+ // work. Used for testing, set in startReplication() and never changed.
+ ReplicationExecutor::CallbackHandle _startUpFinishedHandle;
+
// Handles to actively queued heartbeats.
// Only accessed serially in ReplicationExecutor callbacks, which makes it safe to access
// outside of _mutex.
diff --git a/src/mongo/db/repl/repl_coordinator_impl_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_test.cpp
index 9cc4ed8fafe..d4fb42f277f 100644
--- a/src/mongo/db/repl/repl_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl_test.cpp
@@ -30,6 +30,7 @@
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
+#include <memory>
#include <set>
#include <vector>
@@ -41,6 +42,7 @@
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
@@ -60,6 +62,10 @@ namespace mongo {
namespace repl {
namespace {
+ bool stringContains(const std::string &haystack, const std::string& needle) {
+ return haystack.find(needle) != std::string::npos;
+ }
+
const Seconds zeroSecs(0);
class ReplCoordTest : public mongo::unittest::Test {
@@ -80,18 +86,6 @@ namespace {
ReplicationCoordinatorImpl* getReplCoord() {return _repl.get();}
TopologyCoordinatorImpl& getTopoCoord() {return *_topo;}
- void updateConfig(BSONObj configBSON, int me) {
- ReplicaSetConfig config;
- ASSERT_OK(config.initialize(configBSON));
- updateConfig(config, me);
- }
-
- void updateConfig(ReplicaSetConfig config, int me) {
- ReplicationExecutor::CallbackHandle cbh;
- ReplicationExecutor::CallbackData cbData(NULL, cbh, Status::OK());
- getTopoCoord().updateConfig(cbData, config, me, curTimeMillis64(), OpTime());
- }
-
void init() {
invariant(!_repl);
invariant(!_callShutdown);
@@ -121,16 +115,46 @@ namespace {
init();
}
- _repl->startReplication();
+ OperationContextNoop txn;
+ _repl->startReplication(&txn);
+ _repl->waitForStartUp();
_callShutdown = true;
}
+ void start(const BSONObj& configDoc, const HostAndPort& selfHost) {
+ if (!_repl) {
+ init();
+ }
+ _externalState->setLocalConfigDocument(StatusWith<BSONObj>(configDoc));
+ _externalState->addSelf(selfHost);
+ start();
+ }
+
+ void assertStart(ReplicationCoordinator::Mode expectedMode,
+ const BSONObj& configDoc,
+ const HostAndPort& selfHost) {
+ start(configDoc, selfHost);
+ ASSERT_EQUALS(expectedMode, getReplCoord()->getReplicationMode());
+ }
+
+ void assertStartSuccess(const BSONObj& configDoc, const HostAndPort& selfHost) {
+ assertStart(ReplicationCoordinator::modeReplSet, configDoc, selfHost);
+ }
+
void shutdown() {
invariant(_callShutdown);
_repl->shutdown();
_callShutdown = false;
}
+ int64_t countLogLinesContaining(const std::string& needle) {
+ return std::count_if(getCapturedLogMessages().begin(),
+ getCapturedLogMessages().end(),
+ stdx::bind(stringContains,
+ stdx::placeholders::_1,
+ needle));
+ }
+
private:
boost::scoped_ptr<ReplicationCoordinatorImpl> _repl;
// Owned by ReplicationCoordinatorImpl
@@ -143,14 +167,54 @@ namespace {
bool _callShutdown;
};
- TEST_F(ReplCoordTest, StartupShutdown) {
+ TEST_F(ReplCoordTest, StartupWithValidLocalConfig) {
+ assertStart(
+ ReplicationCoordinator::modeReplSet,
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345"))),
+ HostAndPort("node1", 12345));
+ }
+
+ TEST_F(ReplCoordTest, StartupWithInvalidLocalConfig) {
+ startCapturingLogMessages();
+ assertStart(ReplicationCoordinator::modeNone,
+ BSON("_id" << "mySet"), HostAndPort("node1", 12345));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("configuration does not parse"));
+ }
+
+ TEST_F(ReplCoordTest, StartupWithConfigMissingSelf) {
+ startCapturingLogMessages();
+ assertStart(
+ ReplicationCoordinator::modeNone,
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345") <<
+ BSON("_id" << 2 << "host" << "node2:54321"))),
+ HostAndPort("node3", 12345));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("NodeNotFound"));
+ }
+
+ TEST_F(ReplCoordTest, StartupWithLocalConfigSetNameMismatch) {
+ init("mySet");
+ startCapturingLogMessages();
+ assertStart(ReplicationCoordinator::modeNone,
+ BSON("_id" << "notMySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345"))),
+ HostAndPort("node1", 12345));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("reports set name of notMySet,"));
+ }
+
+ TEST_F(ReplCoordTest, StartupWithNoLocalConfig) {
+ startCapturingLogMessages();
start();
- updateConfig(BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" <<
- "_id" << 0 ))),
- 0);
- shutdown();
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("Did not find local "));
+ ASSERT_EQUALS(ReplicationCoordinator::modeNone, getReplCoord()->getReplicationMode());
}
TEST_F(ReplCoordTest, AwaitReplicationNumberBaseCases) {
@@ -188,12 +252,11 @@ namespace {
}
TEST_F(ReplCoordTest, AwaitReplicationNumberOfNodesNonBlocking) {
- start();
- updateConfig(BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" <<
- "_id" << 0 ))),
- 0);
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
+ HostAndPort("node1", 12345));
OperationContextNoop txn;
OID client1 = OID::gen();
@@ -297,12 +360,11 @@ namespace {
};
TEST_F(ReplCoordTest, AwaitReplicationNumberOfNodesBlocking) {
- start();
- updateConfig(BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" <<
- "_id" << 0 ))),
- 0);
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
+ HostAndPort("node1", 12345));
OperationContextNoop txn;
ReplicationAwaiter awaiter(getReplCoord(), &txn);
@@ -347,12 +409,11 @@ namespace {
}
TEST_F(ReplCoordTest, AwaitReplicationTimeout) {
- start();
- updateConfig(BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" <<
- "_id" << 0 ))),
- 0);
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
+ HostAndPort("node1", 12345));
OperationContextNoop txn;
ReplicationAwaiter awaiter(getReplCoord(), &txn);
@@ -377,12 +438,11 @@ namespace {
}
TEST_F(ReplCoordTest, AwaitReplicationShutdown) {
- start();
- updateConfig(BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" <<
- "_id" << 0 ))),
- 0);
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
+ HostAndPort("node1", 12345));
OperationContextNoop txn;
ReplicationAwaiter awaiter(getReplCoord(), &txn);
@@ -407,6 +467,11 @@ namespace {
awaiter.reset();
}
+ TEST_F(ReplCoordTest, AwaitReplicationNamedModes) {
+ // TODO(spencer): Test awaitReplication with w:majority and tag groups
+ warning() << "Test ReplCoordTest.AwaitReplicationNamedModes needs to be written.";
+ }
+
TEST_F(ReplCoordTest, GetReplicationModeNone) {
init();
ASSERT_EQUALS(ReplicationCoordinator::modeNone, getReplCoord()->getReplicationMode());
@@ -437,35 +502,27 @@ namespace {
init(settings);
ASSERT_EQUALS(ReplicationCoordinator::modeNone,
getReplCoord()->getReplicationMode());
- start();
- updateConfig((BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" <<
- "_id" << 0 )))),
- 0);
- ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationNamedModes) {
- // TODO(spencer): Test awaitReplication with w:majority and tag groups
+ assertStart(
+ ReplicationCoordinator::modeReplSet,
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
+ HostAndPort("node1", 12345));
}
TEST_F(ReplCoordTest, TestPrepareReplSetUpdatePositionCommand) {
- init("mySet:/test1:1234,test2:1234,test3:1234");
- start();
- updateConfig(BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") <<
- BSON("_id" << 1 << "host" << "test2:1234") <<
- BSON("_id" << 2 << "host" << "test3:1234"))),
- 0);
- OID rid1 = OID::gen();
+ OperationContextNoop txn;
+ init("mySet/test1:1234,test2:1234,test3:1234");
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 1 <<
+ "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") <<
+ BSON("_id" << 1 << "host" << "test2:1234") <<
+ BSON("_id" << 2 << "host" << "test3:1234"))),
+ HostAndPort("test1", 1234));
+ OID rid1 = getReplCoord()->getMyRID(&txn);
OID rid2 = OID::gen();
OID rid3 = OID::gen();
- HandshakeArgs handshake1;
- handshake1.initialize(BSON("handshake" << rid1 <<
- "member" << 0 <<
- "config" << BSON("_id" << 0 << "host" << "test1:1234")));
HandshakeArgs handshake2;
handshake2.initialize(BSON("handshake" << rid2 <<
"member" << 1 <<
@@ -474,8 +531,6 @@ namespace {
handshake3.initialize(BSON("handshake" << rid3 <<
"member" << 2 <<
"config" << BSON("_id" << 2 << "host" << "test3:1234")));
- OperationContextNoop txn;
- ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake1));
ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake2));
ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake3));
OpTime optime1(1, 1);
@@ -511,17 +566,17 @@ namespace {
}
TEST_F(ReplCoordTest, TestHandshakes) {
- init("mySet:/test1:1234,test2:1234,test3:1234");
- start();
- updateConfig(BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") <<
- BSON("_id" << 1 << "host" << "test2:1234") <<
- BSON("_id" << 2 << "host" << "test3:1234"))),
- 1);
+ init("mySet/test1:1234,test2:1234,test3:1234");
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 1 <<
+ "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") <<
+ BSON("_id" << 1 << "host" << "test2:1234") <<
+ BSON("_id" << 2 << "host" << "test3:1234"))),
+ HostAndPort("test2", 1234));
// Test generating basic handshake with no chaining
- OperationContextNoop txn;
std::vector<BSONObj> handshakes;
+ OperationContextNoop txn;
getReplCoord()->prepareReplSetUpdatePositionCommandHandshakes(&txn, &handshakes);
ASSERT_EQUALS(1U, handshakes.size());
BSONObj handshakeCmd = handshakes[0];
@@ -580,15 +635,15 @@ namespace {
// information for replSetGetStatus from a different source than the nodes that aren't
// ourself. After this setup, we call replSetGetStatus and make sure that the fields
// returned for each member match our expectations.
- init("mySet:/test1:1234,test2:1234,test3:1234");
- start();
- updateConfig(BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test0:1234") <<
- BSON("_id" << 1 << "host" << "test1:1234") <<
- BSON("_id" << 2 << "host" << "test2:1234") <<
- BSON("_id" << 3 << "host" << "test3:1234"))),
- 3);
+ init("mySet/test1:1234,test2:1234,test3:1234");
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 1 <<
+ "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test0:1234") <<
+ BSON("_id" << 1 << "host" << "test1:1234") <<
+ BSON("_id" << 2 << "host" << "test2:1234") <<
+ BSON("_id" << 3 << "host" << "test3:1234"))),
+ HostAndPort("test3", 1234));
Date_t startupTime(curTimeMillis64());
OpTime electionTime(1, 2);
OpTime oplogProgress(3, 4);
@@ -681,8 +736,12 @@ namespace {
}
TEST_F(ReplCoordTest, TestGetElectionId) {
- init("mySet:/test1:1234,test2:1234,test3:1234");
- start();
+ init("mySet/test1:1234,test2:1234,test3:1234");
+ assertStartSuccess(
+ BSON("_id" << "mySet" <<
+ "version" << 2 <<
+ "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "test1:1234"))),
+ HostAndPort("test1", 1234));
OID electionID1 = getReplCoord()->getElectionId();
getTopoCoord()._changeMemberState(MemberState::RS_PRIMARY);
OID electionID2 = getReplCoord()->getElectionId();
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp
index e48351d0014..9e1f188ec12 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.cpp
+++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp
@@ -72,7 +72,7 @@ namespace repl {
}
LegacyReplicationCoordinator::~LegacyReplicationCoordinator() {}
- void LegacyReplicationCoordinator::startReplication() {
+ void LegacyReplicationCoordinator::startReplication(OperationContext* txn) {
// if we are going to be a replica set, we aren't doing other forms of replication.
if (!_settings.replSet.empty()) {
if (_settings.slave || _settings.master) {
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h
index 5c939a404af..90b144e8049 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.h
+++ b/src/mongo/db/repl/repl_coordinator_legacy.h
@@ -47,7 +47,7 @@ namespace repl {
LegacyReplicationCoordinator(const ReplSettings& settings);
virtual ~LegacyReplicationCoordinator();
- virtual void startReplication();
+ virtual void startReplication(OperationContext*);
virtual void shutdown();
diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp
index 5acb257b589..73bc883fd28 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.cpp
+++ b/src/mongo/db/repl/repl_coordinator_mock.cpp
@@ -40,7 +40,7 @@ namespace repl {
_settings(settings) {}
ReplicationCoordinatorMock::~ReplicationCoordinatorMock() {}
- void ReplicationCoordinatorMock::startReplication() {
+ void ReplicationCoordinatorMock::startReplication(OperationContext* txn) {
// TODO
}
diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h
index a3422eeb1a2..353365de40b 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.h
+++ b/src/mongo/db/repl/repl_coordinator_mock.h
@@ -46,7 +46,7 @@ namespace repl {
ReplicationCoordinatorMock(const ReplSettings& settings);
virtual ~ReplicationCoordinatorMock();
- virtual void startReplication();
+ virtual void startReplication(OperationContext* txn);
virtual void shutdown();
diff --git a/src/mongo/db/repl/replica_set_config_checks.cpp b/src/mongo/db/repl/replica_set_config_checks.cpp
index 5fed899fc68..263c1c61e9f 100644
--- a/src/mongo/db/repl/replica_set_config_checks.cpp
+++ b/src/mongo/db/repl/replica_set_config_checks.cpp
@@ -230,6 +230,23 @@ namespace {
}
} // namespace
+ StatusWith<int> validateConfigForStartUp(
+ ReplicationCoordinatorExternalState* externalState,
+ const ReplicaSetConfig& oldConfig,
+ const ReplicaSetConfig& newConfig) {
+ Status status = newConfig.validate();
+ if (!status.isOK()) {
+ return StatusWith<int>(status);
+ }
+ if (oldConfig.isInitialized()) {
+ status = validateOldAndNewConfigsCompatible(oldConfig, newConfig);
+ if (!status.isOK()) {
+ return StatusWith<int>(status);
+ }
+ }
+ return findSelfInConfig(externalState, newConfig);
+ }
+
StatusWith<int> validateConfigForInitiate(
ReplicationCoordinatorExternalState* externalState,
const ReplicaSetConfig& newConfig) {
diff --git a/src/mongo/db/repl/replica_set_config_checks.h b/src/mongo/db/repl/replica_set_config_checks.h
index 1675b538bdf..d7fcc29a6e5 100644
--- a/src/mongo/db/repl/replica_set_config_checks.h
+++ b/src/mongo/db/repl/replica_set_config_checks.h
@@ -37,6 +37,21 @@ namespace repl {
class ReplicaSetConfig;
/**
+ * Validates that "newConfig" is a legal configuration that the current
+ * node can accept from its local storage during startup.
+ *
+ * Returns the index of the current node's member configuration in "newConfig",
+ * on success, and an indicative error on failure.
+ *
+ * If "oldConfig" is valid, this method only succeds if "newConfig" is a legal
+ * successor configuration.
+ */
+ StatusWith<int> validateConfigForStartUp(
+ ReplicationCoordinatorExternalState* externalState,
+ const ReplicaSetConfig& oldConfig,
+ const ReplicaSetConfig& newConfig);
+
+ /**
* Validates that "newConfig" is a legal initial configuration that can be
* initiated by the current node (identified via "externalState").
*
diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h
index d743748e966..3c3c89e3afb 100644
--- a/src/mongo/db/repl/replication_executor.h
+++ b/src/mongo/db/repl/replication_executor.h
@@ -400,6 +400,8 @@ namespace repl {
public:
CallbackHandle() : _generation(0) {}
+ bool isValid() const { return _finishedEvent.isValid(); }
+
bool operator==(const CallbackHandle &other) const {
return (_finishedEvent == other._finishedEvent);
}