diff options
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_external_state.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_external_state_impl.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_external_state_impl.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_external_state_mock.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_external_state_mock.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_commands.cpp | 4 |
10 files changed, 52 insertions, 33 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index d8e44410074..9842d675f16 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -145,17 +145,6 @@ namespace repl { return std::pair<OpTime,long long>(ts, hashNew); } - static void _logOpUninitialized(OperationContext* txn, - const char *opstr, - const char *ns, - const char *logNS, - const BSONObj& obj, - BSONObj *o2, - bool *bb, - bool fromMigrate ) { - uassert(13288, "replSet error write op to db before replSet initialized", str::startsWith(ns, "local.") || *opstr == 'n'); - } - /** write an op to the oplog that is already built. todo : make _logOpRS() call this so we don't repeat ourself? */ @@ -404,10 +393,7 @@ namespace repl { const BSONObj& obj, BSONObj *o2, bool *bb, - bool fromMigrate ) = _logOpUninitialized; - void newReplUp() { - _logOp = _logOpRS; - } + bool fromMigrate ) = _logOpRS; void oldRepl() { _logOp = _logOpOld; } diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index a8ee1b1780c..5f963591ca3 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -39,10 +39,8 @@ namespace mongo { namespace repl { - // These functions redefine the function for logOp(), - // for either master/slave or replica sets. + // Redefines the function for logOp() to master/slave. void oldRepl(); // master-slave - void newReplUp();// replica set after startup // Create a new capped collection for the oplog if it doesn't yet exist. // This will be either local.oplog.rs (replica sets) or local.oplog.$main (master/slave) diff --git a/src/mongo/db/repl/repl_coordinator_external_state.h b/src/mongo/db/repl/repl_coordinator_external_state.h index ee96eeba65c..23fc30e3b88 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state.h +++ b/src/mongo/db/repl/repl_coordinator_external_state.h @@ -59,7 +59,9 @@ namespace repl { virtual ~ReplicationCoordinatorExternalState(); /** - * Starts the background sync, producer, and sync source feedback threads, and sets up logOp + * Starts the background sync, producer, and sync source feedback threads + * + * NOTE: Only starts threads if they are not already started, */ virtual void startThreads() = 0; @@ -75,6 +77,11 @@ namespace repl { virtual void shutdown() = 0; /** + * Creates the oplog and writes the first entry. + */ + virtual void initiateOplog(OperationContext* txn) = 0; + + /** * Simple wrapper around SyncSourceFeedback::forwardSlaveHandshake. Signals to the * SyncSourceFeedback thread that it needs to wake up and send a replication handshake * upstream. diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp index d9aef7887db..48bdf194987 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp @@ -72,17 +72,23 @@ namespace { } // namespace ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl() : - _nextThreadId(0) {} + _startedThreads(false) + , _nextThreadId(0) {} ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {} void ReplicationCoordinatorExternalStateImpl::startThreads() { + boost::lock_guard<boost::mutex> lk(_threadMutex); + if (_startedThreads) { + return; + } + log() << "Starting replication applier threads"; _applierThread.reset(new boost::thread(runSyncThread)); BackgroundSync* bgsync = BackgroundSync::get(); _producerThread.reset(new boost::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); _syncSourceFeedbackThread.reset(new boost::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback))); - newReplUp(); + _startedThreads = true; } void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) { @@ -90,12 +96,21 @@ namespace { } void ReplicationCoordinatorExternalStateImpl::shutdown() { - _syncSourceFeedback.shutdown(); - _syncSourceFeedbackThread->join(); - _applierThread->join(); - BackgroundSync* bgsync = BackgroundSync::get(); - bgsync->shutdown(); - _producerThread->join(); + boost::lock_guard<boost::mutex> lk(_threadMutex); + if (_startedThreads) { + log() << "Stopping replication applier threads"; + _syncSourceFeedback.shutdown(); + _syncSourceFeedbackThread->join(); + _applierThread->join(); + BackgroundSync* bgsync = BackgroundSync::get(); + bgsync->shutdown(); + _producerThread->join(); + } + } + + void ReplicationCoordinatorExternalStateImpl::initiateOplog(OperationContext* txn) { + createOplog(txn); + logOpInitiate(txn, BSON("msg" << "initiating set")); } void ReplicationCoordinatorExternalStateImpl::forwardSlaveHandshake() { diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.h b/src/mongo/db/repl/repl_coordinator_external_state_impl.h index d336b30f37b..0ab836dfc28 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.h @@ -48,6 +48,7 @@ namespace repl { virtual void startThreads(); virtual void startMasterSlave(OperationContext* txn); virtual void shutdown(); + virtual void initiateOplog(OperationContext* txn); virtual void forwardSlaveHandshake(); virtual void forwardSlaveProgress(); virtual OID ensureMe(OperationContext* txn); @@ -65,6 +66,11 @@ namespace repl { std::string getNextOpContextThreadName(); private: + // Guards starting threads and setting _startedThreads + boost::mutex _threadMutex; + + // True when the threads have been started + bool _startedThreads; // The SyncSourceFeedback class is responsible for sending replSetUpdatePosition commands // for forwarding replication progress information upstream when there is chained diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp index 3b555ccd0f9..c5bfaf66d94 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp @@ -55,6 +55,7 @@ namespace repl { void ReplicationCoordinatorExternalStateMock::startThreads() {} void ReplicationCoordinatorExternalStateMock::startMasterSlave(OperationContext*) {} + void ReplicationCoordinatorExternalStateMock::initiateOplog(OperationContext* txn) {} void ReplicationCoordinatorExternalStateMock::shutdown() {} void ReplicationCoordinatorExternalStateMock::forwardSlaveHandshake() {} void ReplicationCoordinatorExternalStateMock::forwardSlaveProgress() {} diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.h b/src/mongo/db/repl/repl_coordinator_external_state_mock.h index 5c1511d16bc..568ab190de7 100644 --- a/src/mongo/db/repl/repl_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.h @@ -51,6 +51,7 @@ namespace repl { virtual void startThreads(); virtual void startMasterSlave(OperationContext*); virtual void shutdown(); + virtual void initiateOplog(OperationContext* txn); virtual void forwardSlaveHandshake(); virtual void forwardSlaveProgress(); virtual OID ensureMe(OperationContext*); diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 6e4e5d6fec1..ad835cc531d 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -268,6 +268,7 @@ namespace { lk.unlock(); } _performPostMemberStateUpdateAction(action); + _externalState->startThreads(); } void ReplicationCoordinatorImpl::startReplication(OperationContext* txn) { @@ -297,8 +298,6 @@ namespace { _topCoordDriverThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run, &_replExecutor))); - _externalState->startThreads(); - bool doneLoadingConfig = _startLoadLocalConfig(txn); if (doneLoadingConfig) { // If we're not done loading the config, then the config state will be set by @@ -1875,6 +1874,12 @@ namespace { configStateGuard.Dismiss(); fassert(18654, cbh.getStatus()); _replExecutor.wait(cbh.getValue()); + + if (status.isOK()) { + // Create the oplog with the first entry, and start repl threads. + _externalState->initiateOplog(txn); + _externalState->startThreads(); + } return status; } diff --git a/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp index c2b6fe0a22d..4ec8f4060ba 100644 --- a/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp @@ -424,6 +424,10 @@ namespace { } return; } + + lk.unlock(); + + _externalState->startThreads(); } const stdx::function<void (const ReplicationExecutor::CallbackData&)> reconfigFinishFn( diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index b4725c347bc..52a8bc819f5 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -279,10 +279,6 @@ namespace { Status status = getGlobalReplicationCoordinator()->processReplSetInitiate(txn, configObj, &result); - if (status.isOK()) { - createOplog(txn); - logOpInitiate(txn, BSON("msg" << "initiating set")); - } return appendCommandStatus(result, status); } } cmdReplSetInitiate; |