summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/oplog.cpp16
-rw-r--r--src/mongo/db/repl/oplog.h4
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state.h9
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_impl.cpp31
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_impl.h6
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_mock.cpp1
-rw-r--r--src/mongo/db/repl/repl_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp9
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp4
-rw-r--r--src/mongo/db/repl/replset_commands.cpp4
10 files changed, 52 insertions, 33 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index d8e44410074..9842d675f16 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -145,17 +145,6 @@ namespace repl {
return std::pair<OpTime,long long>(ts, hashNew);
}
- static void _logOpUninitialized(OperationContext* txn,
- const char *opstr,
- const char *ns,
- const char *logNS,
- const BSONObj& obj,
- BSONObj *o2,
- bool *bb,
- bool fromMigrate ) {
- uassert(13288, "replSet error write op to db before replSet initialized", str::startsWith(ns, "local.") || *opstr == 'n');
- }
-
/** write an op to the oplog that is already built.
todo : make _logOpRS() call this so we don't repeat ourself?
*/
@@ -404,10 +393,7 @@ namespace repl {
const BSONObj& obj,
BSONObj *o2,
bool *bb,
- bool fromMigrate ) = _logOpUninitialized;
- void newReplUp() {
- _logOp = _logOpRS;
- }
+ bool fromMigrate ) = _logOpRS;
void oldRepl() { _logOp = _logOpOld; }
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index a8ee1b1780c..5f963591ca3 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -39,10 +39,8 @@ namespace mongo {
namespace repl {
- // These functions redefine the function for logOp(),
- // for either master/slave or replica sets.
+ // Redefines the function for logOp() to master/slave.
void oldRepl(); // master-slave
- void newReplUp();// replica set after startup
// Create a new capped collection for the oplog if it doesn't yet exist.
// This will be either local.oplog.rs (replica sets) or local.oplog.$main (master/slave)
diff --git a/src/mongo/db/repl/repl_coordinator_external_state.h b/src/mongo/db/repl/repl_coordinator_external_state.h
index ee96eeba65c..23fc30e3b88 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state.h
+++ b/src/mongo/db/repl/repl_coordinator_external_state.h
@@ -59,7 +59,9 @@ namespace repl {
virtual ~ReplicationCoordinatorExternalState();
/**
- * Starts the background sync, producer, and sync source feedback threads, and sets up logOp
+ * Starts the background sync, producer, and sync source feedback threads
+ *
+ * NOTE: Only starts threads if they are not already started,
*/
virtual void startThreads() = 0;
@@ -75,6 +77,11 @@ namespace repl {
virtual void shutdown() = 0;
/**
+ * Creates the oplog and writes the first entry.
+ */
+ virtual void initiateOplog(OperationContext* txn) = 0;
+
+ /**
* Simple wrapper around SyncSourceFeedback::forwardSlaveHandshake. Signals to the
* SyncSourceFeedback thread that it needs to wake up and send a replication handshake
* upstream.
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp
index d9aef7887db..48bdf194987 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.cpp
@@ -72,17 +72,23 @@ namespace {
} // namespace
ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl() :
- _nextThreadId(0) {}
+ _startedThreads(false)
+ , _nextThreadId(0) {}
ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {}
void ReplicationCoordinatorExternalStateImpl::startThreads() {
+ boost::lock_guard<boost::mutex> lk(_threadMutex);
+ if (_startedThreads) {
+ return;
+ }
+ log() << "Starting replication applier threads";
_applierThread.reset(new boost::thread(runSyncThread));
BackgroundSync* bgsync = BackgroundSync::get();
_producerThread.reset(new boost::thread(stdx::bind(&BackgroundSync::producerThread,
bgsync)));
_syncSourceFeedbackThread.reset(new boost::thread(stdx::bind(&SyncSourceFeedback::run,
&_syncSourceFeedback)));
- newReplUp();
+ _startedThreads = true;
}
void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) {
@@ -90,12 +96,21 @@ namespace {
}
void ReplicationCoordinatorExternalStateImpl::shutdown() {
- _syncSourceFeedback.shutdown();
- _syncSourceFeedbackThread->join();
- _applierThread->join();
- BackgroundSync* bgsync = BackgroundSync::get();
- bgsync->shutdown();
- _producerThread->join();
+ boost::lock_guard<boost::mutex> lk(_threadMutex);
+ if (_startedThreads) {
+ log() << "Stopping replication applier threads";
+ _syncSourceFeedback.shutdown();
+ _syncSourceFeedbackThread->join();
+ _applierThread->join();
+ BackgroundSync* bgsync = BackgroundSync::get();
+ bgsync->shutdown();
+ _producerThread->join();
+ }
+ }
+
+ void ReplicationCoordinatorExternalStateImpl::initiateOplog(OperationContext* txn) {
+ createOplog(txn);
+ logOpInitiate(txn, BSON("msg" << "initiating set"));
}
void ReplicationCoordinatorExternalStateImpl::forwardSlaveHandshake() {
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_impl.h b/src/mongo/db/repl/repl_coordinator_external_state_impl.h
index d336b30f37b..0ab836dfc28 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_external_state_impl.h
@@ -48,6 +48,7 @@ namespace repl {
virtual void startThreads();
virtual void startMasterSlave(OperationContext* txn);
virtual void shutdown();
+ virtual void initiateOplog(OperationContext* txn);
virtual void forwardSlaveHandshake();
virtual void forwardSlaveProgress();
virtual OID ensureMe(OperationContext* txn);
@@ -65,6 +66,11 @@ namespace repl {
std::string getNextOpContextThreadName();
private:
+ // Guards starting threads and setting _startedThreads
+ boost::mutex _threadMutex;
+
+ // True when the threads have been started
+ bool _startedThreads;
// The SyncSourceFeedback class is responsible for sending replSetUpdatePosition commands
// for forwarding replication progress information upstream when there is chained
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp
index 3b555ccd0f9..c5bfaf66d94 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.cpp
@@ -55,6 +55,7 @@ namespace repl {
void ReplicationCoordinatorExternalStateMock::startThreads() {}
void ReplicationCoordinatorExternalStateMock::startMasterSlave(OperationContext*) {}
+ void ReplicationCoordinatorExternalStateMock::initiateOplog(OperationContext* txn) {}
void ReplicationCoordinatorExternalStateMock::shutdown() {}
void ReplicationCoordinatorExternalStateMock::forwardSlaveHandshake() {}
void ReplicationCoordinatorExternalStateMock::forwardSlaveProgress() {}
diff --git a/src/mongo/db/repl/repl_coordinator_external_state_mock.h b/src/mongo/db/repl/repl_coordinator_external_state_mock.h
index 5c1511d16bc..568ab190de7 100644
--- a/src/mongo/db/repl/repl_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/repl_coordinator_external_state_mock.h
@@ -51,6 +51,7 @@ namespace repl {
virtual void startThreads();
virtual void startMasterSlave(OperationContext*);
virtual void shutdown();
+ virtual void initiateOplog(OperationContext* txn);
virtual void forwardSlaveHandshake();
virtual void forwardSlaveProgress();
virtual OID ensureMe(OperationContext*);
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 6e4e5d6fec1..ad835cc531d 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -268,6 +268,7 @@ namespace {
lk.unlock();
}
_performPostMemberStateUpdateAction(action);
+ _externalState->startThreads();
}
void ReplicationCoordinatorImpl::startReplication(OperationContext* txn) {
@@ -297,8 +298,6 @@ namespace {
_topCoordDriverThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run,
&_replExecutor)));
- _externalState->startThreads();
-
bool doneLoadingConfig = _startLoadLocalConfig(txn);
if (doneLoadingConfig) {
// If we're not done loading the config, then the config state will be set by
@@ -1875,6 +1874,12 @@ namespace {
configStateGuard.Dismiss();
fassert(18654, cbh.getStatus());
_replExecutor.wait(cbh.getValue());
+
+ if (status.isOK()) {
+ // Create the oplog with the first entry, and start repl threads.
+ _externalState->initiateOplog(txn);
+ _externalState->startThreads();
+ }
return status;
}
diff --git a/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp
index c2b6fe0a22d..4ec8f4060ba 100644
--- a/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl_heartbeat.cpp
@@ -424,6 +424,10 @@ namespace {
}
return;
}
+
+ lk.unlock();
+
+ _externalState->startThreads();
}
const stdx::function<void (const ReplicationExecutor::CallbackData&)> reconfigFinishFn(
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index b4725c347bc..52a8bc819f5 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -279,10 +279,6 @@ namespace {
Status status = getGlobalReplicationCoordinator()->processReplSetInitiate(txn,
configObj,
&result);
- if (status.isOK()) {
- createOplog(txn);
- logOpInitiate(txn, BSON("msg" << "initiating set"));
- }
return appendCommandStatus(result, status);
}
} cmdReplSetInitiate;