summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_external_state_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp464
1 files changed, 224 insertions, 240 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index a1b5c609bf8..34976b02ba5 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -70,272 +70,256 @@ namespace mongo {
namespace repl {
namespace {
- const char configCollectionName[] = "local.system.replset";
- const char configDatabaseName[] = "local";
- const char lastVoteCollectionName[] = "local.replset.election";
- const char lastVoteDatabaseName[] = "local";
- const char meCollectionName[] = "local.me";
- const char meDatabaseName[] = "local";
- const char tsFieldName[] = "ts";
+const char configCollectionName[] = "local.system.replset";
+const char configDatabaseName[] = "local";
+const char lastVoteCollectionName[] = "local.replset.election";
+const char lastVoteDatabaseName[] = "local";
+const char meCollectionName[] = "local.me";
+const char meDatabaseName[] = "local";
+const char tsFieldName[] = "ts";
} // namespace
- ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl() :
- _startedThreads(false)
- , _nextThreadId(0) {}
- ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {}
+ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl()
+ : _startedThreads(false), _nextThreadId(0) {}
+ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {}
- void ReplicationCoordinatorExternalStateImpl::startThreads() {
- stdx::lock_guard<stdx::mutex> lk(_threadMutex);
- if (_startedThreads) {
- return;
- }
- log() << "Starting replication applier threads";
- _applierThread.reset(new stdx::thread(runSyncThread));
+void ReplicationCoordinatorExternalStateImpl::startThreads() {
+ stdx::lock_guard<stdx::mutex> lk(_threadMutex);
+ if (_startedThreads) {
+ return;
+ }
+ log() << "Starting replication applier threads";
+ _applierThread.reset(new stdx::thread(runSyncThread));
+ BackgroundSync* bgsync = BackgroundSync::get();
+ _producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync)));
+ _syncSourceFeedbackThread.reset(
+ new stdx::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback)));
+ _startedThreads = true;
+}
+
+void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) {
+ repl::startMasterSlave(txn);
+}
+
+void ReplicationCoordinatorExternalStateImpl::shutdown() {
+ stdx::lock_guard<stdx::mutex> lk(_threadMutex);
+ if (_startedThreads) {
+ log() << "Stopping replication applier threads";
+ _syncSourceFeedback.shutdown();
+ _syncSourceFeedbackThread->join();
+ _applierThread->join();
BackgroundSync* bgsync = BackgroundSync::get();
- _producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::producerThread,
- bgsync)));
- _syncSourceFeedbackThread.reset(new stdx::thread(stdx::bind(&SyncSourceFeedback::run,
- &_syncSourceFeedback)));
- _startedThreads = true;
+ bgsync->shutdown();
+ _producerThread->join();
}
+}
- void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) {
- repl::startMasterSlave(txn);
- }
+void ReplicationCoordinatorExternalStateImpl::initiateOplog(OperationContext* txn) {
+ createOplog(txn);
- void ReplicationCoordinatorExternalStateImpl::shutdown() {
- stdx::lock_guard<stdx::mutex> lk(_threadMutex);
- if (_startedThreads) {
- log() << "Stopping replication applier threads";
- _syncSourceFeedback.shutdown();
- _syncSourceFeedbackThread->join();
- _applierThread->join();
- BackgroundSync* bgsync = BackgroundSync::get();
- bgsync->shutdown();
- _producerThread->join();
- }
- }
-
- void ReplicationCoordinatorExternalStateImpl::initiateOplog(OperationContext* txn) {
- createOplog(txn);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction scopedXact(txn, MODE_X);
+ Lock::GlobalWrite globalWrite(txn->lockState());
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction scopedXact(txn, MODE_X);
- Lock::GlobalWrite globalWrite(txn->lockState());
-
- WriteUnitOfWork wuow(txn);
- getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSON("msg" << "initiating set"));
- wuow.commit();
- } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs");
+ WriteUnitOfWork wuow(txn);
+ getGlobalServiceContext()->getOpObserver()->onOpMessage(txn,
+ BSON("msg"
+ << "initiating set"));
+ wuow.commit();
}
-
- void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() {
- _syncSourceFeedback.forwardSlaveProgress();
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs");
+}
+
+void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() {
+ _syncSourceFeedback.forwardSlaveProgress();
+}
+
+OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) {
+ std::string myname = getHostName();
+ OID myRID;
+ {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lock(txn->lockState(), meDatabaseName, MODE_X);
+
+ BSONObj me;
+ // local.me is an identifier for a server for getLastError w:2+
+ // TODO: handle WriteConflictExceptions below
+ if (!Helpers::getSingleton(txn, meCollectionName, me) || !me.hasField("host") ||
+ me["host"].String() != myname) {
+ myRID = OID::gen();
+
+ // clean out local.me
+ Helpers::emptyCollection(txn, meCollectionName);
+
+ // repopulate
+ BSONObjBuilder b;
+ b.append("_id", myRID);
+ b.append("host", myname);
+ Helpers::putSingleton(txn, meCollectionName, b.done());
+ } else {
+ myRID = me["_id"].OID();
+ }
}
+ return myRID;
+}
- OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) {
- std::string myname = getHostName();
- OID myRID;
- {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lock(txn->lockState(), meDatabaseName, MODE_X);
-
- BSONObj me;
- // local.me is an identifier for a server for getLastError w:2+
- // TODO: handle WriteConflictExceptions below
- if (!Helpers::getSingleton(txn, meCollectionName, me) ||
- !me.hasField("host") ||
- me["host"].String() != myname) {
-
- myRID = OID::gen();
-
- // clean out local.me
- Helpers::emptyCollection(txn, meCollectionName);
-
- // repopulate
- BSONObjBuilder b;
- b.append("_id", myRID);
- b.append("host", myname);
- Helpers::putSingleton(txn, meCollectionName, b.done());
- } else {
- myRID = me["_id"].OID();
+StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument(
+ OperationContext* txn) {
+ try {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ BSONObj config;
+ if (!Helpers::getSingleton(txn, configCollectionName, config)) {
+ return StatusWith<BSONObj>(
+ ErrorCodes::NoMatchingDocument,
+ str::stream() << "Did not find replica set configuration document in "
+ << configCollectionName);
}
+ return StatusWith<BSONObj>(config);
}
- return myRID;
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "load replica set config", configCollectionName);
+ } catch (const DBException& ex) {
+ return StatusWith<BSONObj>(ex.toStatus());
}
+}
- StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument(
- OperationContext* txn) {
- try {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- BSONObj config;
- if (!Helpers::getSingleton(txn, configCollectionName, config)) {
- return StatusWith<BSONObj>(
- ErrorCodes::NoMatchingDocument,
- str::stream() << "Did not find replica set configuration document in "
- << configCollectionName);
- }
- return StatusWith<BSONObj>(config);
- } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn,
- "load replica set config",
- configCollectionName);
- }
- catch (const DBException& ex) {
- return StatusWith<BSONObj>(ex.toStatus());
+Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument(OperationContext* txn,
+ const BSONObj& config) {
+ try {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X);
+ Helpers::putSingleton(txn, configCollectionName, config);
+ return Status::OK();
}
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "save replica set config", configCollectionName);
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
+}
- Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument(
- OperationContext* txn,
- const BSONObj& config) {
- try {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X);
- Helpers::putSingleton(txn, configCollectionName, config);
- return Status::OK();
- } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn,
- "save replica set config",
- configCollectionName);
- }
- catch (const DBException& ex) {
- return ex.toStatus();
+StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteDocument(
+ OperationContext* txn) {
+ try {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ BSONObj lastVoteObj;
+ if (!Helpers::getSingleton(txn, lastVoteCollectionName, lastVoteObj)) {
+ return StatusWith<LastVote>(ErrorCodes::NoMatchingDocument,
+ str::stream()
+ << "Did not find replica set lastVote document in "
+ << lastVoteCollectionName);
+ }
+ LastVote lastVote;
+ lastVote.initialize(lastVoteObj);
+ return StatusWith<LastVote>(lastVote);
}
-
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ txn, "load replica set lastVote", lastVoteCollectionName);
+ } catch (const DBException& ex) {
+ return StatusWith<LastVote>(ex.toStatus());
}
+}
- StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteDocument(
- OperationContext* txn) {
- try {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- BSONObj lastVoteObj;
- if (!Helpers::getSingleton(txn, lastVoteCollectionName, lastVoteObj)) {
- return StatusWith<LastVote>(
- ErrorCodes::NoMatchingDocument,
- str::stream() << "Did not find replica set lastVote document in "
- << lastVoteCollectionName);
- }
- LastVote lastVote;
- lastVote.initialize(lastVoteObj);
- return StatusWith<LastVote>(lastVote);
- } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn,
- "load replica set lastVote",
- lastVoteCollectionName);
- }
- catch (const DBException& ex) {
- return StatusWith<LastVote>(ex.toStatus());
+Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
+ OperationContext* txn, const LastVote& lastVote) {
+ BSONObj lastVoteObj = lastVote.toBSON();
+ try {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbWriteLock(txn->lockState(), lastVoteDatabaseName, MODE_X);
+ Helpers::putSingleton(txn, lastVoteCollectionName, lastVoteObj);
+ return Status::OK();
}
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ txn, "save replica set lastVote", lastVoteCollectionName);
+ MONGO_UNREACHABLE;
+ } catch (const DBException& ex) {
+ return ex.toStatus();
}
-
- Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument(
- OperationContext* txn,
- const LastVote& lastVote) {
- BSONObj lastVoteObj = lastVote.toBSON();
- try {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbWriteLock(txn->lockState(), lastVoteDatabaseName, MODE_X);
- Helpers::putSingleton(txn, lastVoteCollectionName, lastVoteObj);
- return Status::OK();
- } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn,
- "save replica set lastVote",
- lastVoteCollectionName);
- MONGO_UNREACHABLE;
- }
- catch (const DBException& ex) {
- return ex.toStatus();
+}
+
+void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) {
+ setNewTimestamp(newTime);
+}
+
+StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) {
+ // TODO: handle WriteConflictExceptions below
+ try {
+ BSONObj oplogEntry;
+ if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) {
+ return StatusWith<OpTime>(ErrorCodes::NoMatchingDocument,
+ str::stream() << "Did not find any entries in "
+ << rsOplogName);
}
-
- }
-
- void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) {
- setNewTimestamp(newTime);
- }
-
- StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
- OperationContext* txn) {
-
- // TODO: handle WriteConflictExceptions below
- try {
- BSONObj oplogEntry;
- if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) {
- return StatusWith<OpTime>(
- ErrorCodes::NoMatchingDocument,
- str::stream() << "Did not find any entries in " << rsOplogName);
- }
- BSONElement tsElement = oplogEntry[tsFieldName];
- if (tsElement.eoo()) {
- return StatusWith<OpTime>(
- ErrorCodes::NoSuchKey,
- str::stream() << "Most recent entry in " << rsOplogName << " missing \"" <<
- tsFieldName << "\" field");
- }
- if (tsElement.type() != bsonTimestamp) {
- return StatusWith<OpTime>(
- ErrorCodes::TypeMismatch,
- str::stream() << "Expected type of \"" << tsFieldName <<
- "\" in most recent " << rsOplogName <<
- " entry to have type Timestamp, but found " << typeName(tsElement.type()));
- }
- return StatusWith<OpTime>(extractOpTime(oplogEntry));
+ BSONElement tsElement = oplogEntry[tsFieldName];
+ if (tsElement.eoo()) {
+ return StatusWith<OpTime>(ErrorCodes::NoSuchKey,
+ str::stream() << "Most recent entry in " << rsOplogName
+ << " missing \"" << tsFieldName << "\" field");
}
- catch (const DBException& ex) {
- return StatusWith<OpTime>(ex.toStatus());
+ if (tsElement.type() != bsonTimestamp) {
+ return StatusWith<OpTime>(ErrorCodes::TypeMismatch,
+ str::stream() << "Expected type of \"" << tsFieldName
+ << "\" in most recent " << rsOplogName
+ << " entry to have type Timestamp, but found "
+ << typeName(tsElement.type()));
}
+ return StatusWith<OpTime>(extractOpTime(oplogEntry));
+ } catch (const DBException& ex) {
+ return StatusWith<OpTime>(ex.toStatus());
}
-
- bool ReplicationCoordinatorExternalStateImpl::isSelf(const HostAndPort& host) {
- return repl::isSelf(host);
-
- }
-
- HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort(
- const OperationContext* txn) {
- return HostAndPort(txn->getClient()->clientAddress(true));
- }
-
- void ReplicationCoordinatorExternalStateImpl::closeConnections() {
- MessagingPort::closeAllSockets(executor::NetworkInterface::kMessagingPortKeepOpen);
- }
-
- void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) {
- ServiceContext* environment = getGlobalServiceContext();
- environment->killAllUserOperations(txn);
- }
-
- void ReplicationCoordinatorExternalStateImpl::clearShardingState() {
- shardingState.clearCollectionMetadata();
- }
-
- void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() {
- BackgroundSync::get()->clearSyncTarget();
- }
-
- OperationContext* ReplicationCoordinatorExternalStateImpl::createOperationContext(
- const std::string& threadName) {
- Client::initThreadIfNotAlready(threadName.c_str());
- return new OperationContextImpl();
- }
-
- void ReplicationCoordinatorExternalStateImpl::dropAllTempCollections(OperationContext* txn) {
- std::vector<std::string> dbNames;
- StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
- storageEngine->listDatabases(&dbNames);
-
- for (std::vector<std::string>::iterator it = dbNames.begin(); it != dbNames.end(); ++it) {
- // The local db is special because it isn't replicated. It is cleared at startup even on
- // replica set members.
- if (*it == "local")
- continue;
- LOG(2) << "Removing temporary collections from " << *it;
- Database* db = dbHolder().get(txn, *it);
- // Since we must be holding the global lock during this function, if listDatabases
- // returned this dbname, we should be able to get a reference to it - it can't have
- // been dropped.
- invariant(db);
- db->clearTmpCollections(txn);
- }
+}
+
+bool ReplicationCoordinatorExternalStateImpl::isSelf(const HostAndPort& host) {
+ return repl::isSelf(host);
+}
+
+HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort(
+ const OperationContext* txn) {
+ return HostAndPort(txn->getClient()->clientAddress(true));
+}
+
+void ReplicationCoordinatorExternalStateImpl::closeConnections() {
+ MessagingPort::closeAllSockets(executor::NetworkInterface::kMessagingPortKeepOpen);
+}
+
+void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) {
+ ServiceContext* environment = getGlobalServiceContext();
+ environment->killAllUserOperations(txn);
+}
+
+void ReplicationCoordinatorExternalStateImpl::clearShardingState() {
+ shardingState.clearCollectionMetadata();
+}
+
+void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() {
+ BackgroundSync::get()->clearSyncTarget();
+}
+
+OperationContext* ReplicationCoordinatorExternalStateImpl::createOperationContext(
+ const std::string& threadName) {
+ Client::initThreadIfNotAlready(threadName.c_str());
+ return new OperationContextImpl();
+}
+
+void ReplicationCoordinatorExternalStateImpl::dropAllTempCollections(OperationContext* txn) {
+ std::vector<std::string> dbNames;
+ StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
+ storageEngine->listDatabases(&dbNames);
+
+ for (std::vector<std::string>::iterator it = dbNames.begin(); it != dbNames.end(); ++it) {
+ // The local db is special because it isn't replicated. It is cleared at startup even on
+ // replica set members.
+ if (*it == "local")
+ continue;
+ LOG(2) << "Removing temporary collections from " << *it;
+ Database* db = dbHolder().get(txn, *it);
+ // Since we must be holding the global lock during this function, if listDatabases
+ // returned this dbname, we should be able to get a reference to it - it can't have
+ // been dropped.
+ invariant(db);
+ db->clearTmpCollections(txn);
}
+}
-} // namespace repl
-} // namespace mongo
+} // namespace repl
+} // namespace mongo