diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_external_state_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 464 |
1 files changed, 224 insertions, 240 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index a1b5c609bf8..34976b02ba5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -70,272 +70,256 @@ namespace mongo { namespace repl { namespace { - const char configCollectionName[] = "local.system.replset"; - const char configDatabaseName[] = "local"; - const char lastVoteCollectionName[] = "local.replset.election"; - const char lastVoteDatabaseName[] = "local"; - const char meCollectionName[] = "local.me"; - const char meDatabaseName[] = "local"; - const char tsFieldName[] = "ts"; +const char configCollectionName[] = "local.system.replset"; +const char configDatabaseName[] = "local"; +const char lastVoteCollectionName[] = "local.replset.election"; +const char lastVoteDatabaseName[] = "local"; +const char meCollectionName[] = "local.me"; +const char meDatabaseName[] = "local"; +const char tsFieldName[] = "ts"; } // namespace - ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl() : - _startedThreads(false) - , _nextThreadId(0) {} - ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {} +ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl() + : _startedThreads(false), _nextThreadId(0) {} +ReplicationCoordinatorExternalStateImpl::~ReplicationCoordinatorExternalStateImpl() {} - void ReplicationCoordinatorExternalStateImpl::startThreads() { - stdx::lock_guard<stdx::mutex> lk(_threadMutex); - if (_startedThreads) { - return; - } - log() << "Starting replication applier threads"; - _applierThread.reset(new stdx::thread(runSyncThread)); +void ReplicationCoordinatorExternalStateImpl::startThreads() { + stdx::lock_guard<stdx::mutex> lk(_threadMutex); + if (_startedThreads) { + return; + } + log() << "Starting replication applier threads"; + _applierThread.reset(new stdx::thread(runSyncThread)); + BackgroundSync* bgsync = BackgroundSync::get(); + _producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::producerThread, bgsync))); + _syncSourceFeedbackThread.reset( + new stdx::thread(stdx::bind(&SyncSourceFeedback::run, &_syncSourceFeedback))); + _startedThreads = true; +} + +void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) { + repl::startMasterSlave(txn); +} + +void ReplicationCoordinatorExternalStateImpl::shutdown() { + stdx::lock_guard<stdx::mutex> lk(_threadMutex); + if (_startedThreads) { + log() << "Stopping replication applier threads"; + _syncSourceFeedback.shutdown(); + _syncSourceFeedbackThread->join(); + _applierThread->join(); BackgroundSync* bgsync = BackgroundSync::get(); - _producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::producerThread, - bgsync))); - _syncSourceFeedbackThread.reset(new stdx::thread(stdx::bind(&SyncSourceFeedback::run, - &_syncSourceFeedback))); - _startedThreads = true; + bgsync->shutdown(); + _producerThread->join(); } +} - void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* txn) { - repl::startMasterSlave(txn); - } +void ReplicationCoordinatorExternalStateImpl::initiateOplog(OperationContext* txn) { + createOplog(txn); - void ReplicationCoordinatorExternalStateImpl::shutdown() { - stdx::lock_guard<stdx::mutex> lk(_threadMutex); - if (_startedThreads) { - log() << "Stopping replication applier threads"; - _syncSourceFeedback.shutdown(); - _syncSourceFeedbackThread->join(); - _applierThread->join(); - BackgroundSync* bgsync = BackgroundSync::get(); - bgsync->shutdown(); - _producerThread->join(); - } - } - - void ReplicationCoordinatorExternalStateImpl::initiateOplog(OperationContext* txn) { - createOplog(txn); + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction scopedXact(txn, MODE_X); + Lock::GlobalWrite globalWrite(txn->lockState()); - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction scopedXact(txn, MODE_X); - Lock::GlobalWrite globalWrite(txn->lockState()); - - WriteUnitOfWork wuow(txn); - getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSON("msg" << "initiating set")); - wuow.commit(); - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs"); + WriteUnitOfWork wuow(txn); + getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, + BSON("msg" + << "initiating set")); + wuow.commit(); } - - void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() { - _syncSourceFeedback.forwardSlaveProgress(); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs"); +} + +void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() { + _syncSourceFeedback.forwardSlaveProgress(); +} + +OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) { + std::string myname = getHostName(); + OID myRID; + { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lock(txn->lockState(), meDatabaseName, MODE_X); + + BSONObj me; + // local.me is an identifier for a server for getLastError w:2+ + // TODO: handle WriteConflictExceptions below + if (!Helpers::getSingleton(txn, meCollectionName, me) || !me.hasField("host") || + me["host"].String() != myname) { + myRID = OID::gen(); + + // clean out local.me + Helpers::emptyCollection(txn, meCollectionName); + + // repopulate + BSONObjBuilder b; + b.append("_id", myRID); + b.append("host", myname); + Helpers::putSingleton(txn, meCollectionName, b.done()); + } else { + myRID = me["_id"].OID(); + } } + return myRID; +} - OID ReplicationCoordinatorExternalStateImpl::ensureMe(OperationContext* txn) { - std::string myname = getHostName(); - OID myRID; - { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lock(txn->lockState(), meDatabaseName, MODE_X); - - BSONObj me; - // local.me is an identifier for a server for getLastError w:2+ - // TODO: handle WriteConflictExceptions below - if (!Helpers::getSingleton(txn, meCollectionName, me) || - !me.hasField("host") || - me["host"].String() != myname) { - - myRID = OID::gen(); - - // clean out local.me - Helpers::emptyCollection(txn, meCollectionName); - - // repopulate - BSONObjBuilder b; - b.append("_id", myRID); - b.append("host", myname); - Helpers::putSingleton(txn, meCollectionName, b.done()); - } else { - myRID = me["_id"].OID(); +StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument( + OperationContext* txn) { + try { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + BSONObj config; + if (!Helpers::getSingleton(txn, configCollectionName, config)) { + return StatusWith<BSONObj>( + ErrorCodes::NoMatchingDocument, + str::stream() << "Did not find replica set configuration document in " + << configCollectionName); } + return StatusWith<BSONObj>(config); } - return myRID; + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "load replica set config", configCollectionName); + } catch (const DBException& ex) { + return StatusWith<BSONObj>(ex.toStatus()); } +} - StatusWith<BSONObj> ReplicationCoordinatorExternalStateImpl::loadLocalConfigDocument( - OperationContext* txn) { - try { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - BSONObj config; - if (!Helpers::getSingleton(txn, configCollectionName, config)) { - return StatusWith<BSONObj>( - ErrorCodes::NoMatchingDocument, - str::stream() << "Did not find replica set configuration document in " - << configCollectionName); - } - return StatusWith<BSONObj>(config); - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, - "load replica set config", - configCollectionName); - } - catch (const DBException& ex) { - return StatusWith<BSONObj>(ex.toStatus()); +Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument(OperationContext* txn, + const BSONObj& config) { + try { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X); + Helpers::putSingleton(txn, configCollectionName, config); + return Status::OK(); } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "save replica set config", configCollectionName); + } catch (const DBException& ex) { + return ex.toStatus(); } +} - Status ReplicationCoordinatorExternalStateImpl::storeLocalConfigDocument( - OperationContext* txn, - const BSONObj& config) { - try { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbWriteLock(txn->lockState(), configDatabaseName, MODE_X); - Helpers::putSingleton(txn, configCollectionName, config); - return Status::OK(); - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, - "save replica set config", - configCollectionName); - } - catch (const DBException& ex) { - return ex.toStatus(); +StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteDocument( + OperationContext* txn) { + try { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + BSONObj lastVoteObj; + if (!Helpers::getSingleton(txn, lastVoteCollectionName, lastVoteObj)) { + return StatusWith<LastVote>(ErrorCodes::NoMatchingDocument, + str::stream() + << "Did not find replica set lastVote document in " + << lastVoteCollectionName); + } + LastVote lastVote; + lastVote.initialize(lastVoteObj); + return StatusWith<LastVote>(lastVote); } - + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + txn, "load replica set lastVote", lastVoteCollectionName); + } catch (const DBException& ex) { + return StatusWith<LastVote>(ex.toStatus()); } +} - StatusWith<LastVote> ReplicationCoordinatorExternalStateImpl::loadLocalLastVoteDocument( - OperationContext* txn) { - try { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - BSONObj lastVoteObj; - if (!Helpers::getSingleton(txn, lastVoteCollectionName, lastVoteObj)) { - return StatusWith<LastVote>( - ErrorCodes::NoMatchingDocument, - str::stream() << "Did not find replica set lastVote document in " - << lastVoteCollectionName); - } - LastVote lastVote; - lastVote.initialize(lastVoteObj); - return StatusWith<LastVote>(lastVote); - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, - "load replica set lastVote", - lastVoteCollectionName); - } - catch (const DBException& ex) { - return StatusWith<LastVote>(ex.toStatus()); +Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( + OperationContext* txn, const LastVote& lastVote) { + BSONObj lastVoteObj = lastVote.toBSON(); + try { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbWriteLock(txn->lockState(), lastVoteDatabaseName, MODE_X); + Helpers::putSingleton(txn, lastVoteCollectionName, lastVoteObj); + return Status::OK(); } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + txn, "save replica set lastVote", lastVoteCollectionName); + MONGO_UNREACHABLE; + } catch (const DBException& ex) { + return ex.toStatus(); } - - Status ReplicationCoordinatorExternalStateImpl::storeLocalLastVoteDocument( - OperationContext* txn, - const LastVote& lastVote) { - BSONObj lastVoteObj = lastVote.toBSON(); - try { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbWriteLock(txn->lockState(), lastVoteDatabaseName, MODE_X); - Helpers::putSingleton(txn, lastVoteCollectionName, lastVoteObj); - return Status::OK(); - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, - "save replica set lastVote", - lastVoteCollectionName); - MONGO_UNREACHABLE; - } - catch (const DBException& ex) { - return ex.toStatus(); +} + +void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) { + setNewTimestamp(newTime); +} + +StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) { + // TODO: handle WriteConflictExceptions below + try { + BSONObj oplogEntry; + if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) { + return StatusWith<OpTime>(ErrorCodes::NoMatchingDocument, + str::stream() << "Did not find any entries in " + << rsOplogName); } - - } - - void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp& newTime) { - setNewTimestamp(newTime); - } - - StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime( - OperationContext* txn) { - - // TODO: handle WriteConflictExceptions below - try { - BSONObj oplogEntry; - if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) { - return StatusWith<OpTime>( - ErrorCodes::NoMatchingDocument, - str::stream() << "Did not find any entries in " << rsOplogName); - } - BSONElement tsElement = oplogEntry[tsFieldName]; - if (tsElement.eoo()) { - return StatusWith<OpTime>( - ErrorCodes::NoSuchKey, - str::stream() << "Most recent entry in " << rsOplogName << " missing \"" << - tsFieldName << "\" field"); - } - if (tsElement.type() != bsonTimestamp) { - return StatusWith<OpTime>( - ErrorCodes::TypeMismatch, - str::stream() << "Expected type of \"" << tsFieldName << - "\" in most recent " << rsOplogName << - " entry to have type Timestamp, but found " << typeName(tsElement.type())); - } - return StatusWith<OpTime>(extractOpTime(oplogEntry)); + BSONElement tsElement = oplogEntry[tsFieldName]; + if (tsElement.eoo()) { + return StatusWith<OpTime>(ErrorCodes::NoSuchKey, + str::stream() << "Most recent entry in " << rsOplogName + << " missing \"" << tsFieldName << "\" field"); } - catch (const DBException& ex) { - return StatusWith<OpTime>(ex.toStatus()); + if (tsElement.type() != bsonTimestamp) { + return StatusWith<OpTime>(ErrorCodes::TypeMismatch, + str::stream() << "Expected type of \"" << tsFieldName + << "\" in most recent " << rsOplogName + << " entry to have type Timestamp, but found " + << typeName(tsElement.type())); } + return StatusWith<OpTime>(extractOpTime(oplogEntry)); + } catch (const DBException& ex) { + return StatusWith<OpTime>(ex.toStatus()); } - - bool ReplicationCoordinatorExternalStateImpl::isSelf(const HostAndPort& host) { - return repl::isSelf(host); - - } - - HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( - const OperationContext* txn) { - return HostAndPort(txn->getClient()->clientAddress(true)); - } - - void ReplicationCoordinatorExternalStateImpl::closeConnections() { - MessagingPort::closeAllSockets(executor::NetworkInterface::kMessagingPortKeepOpen); - } - - void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { - ServiceContext* environment = getGlobalServiceContext(); - environment->killAllUserOperations(txn); - } - - void ReplicationCoordinatorExternalStateImpl::clearShardingState() { - shardingState.clearCollectionMetadata(); - } - - void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { - BackgroundSync::get()->clearSyncTarget(); - } - - OperationContext* ReplicationCoordinatorExternalStateImpl::createOperationContext( - const std::string& threadName) { - Client::initThreadIfNotAlready(threadName.c_str()); - return new OperationContextImpl(); - } - - void ReplicationCoordinatorExternalStateImpl::dropAllTempCollections(OperationContext* txn) { - std::vector<std::string> dbNames; - StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); - storageEngine->listDatabases(&dbNames); - - for (std::vector<std::string>::iterator it = dbNames.begin(); it != dbNames.end(); ++it) { - // The local db is special because it isn't replicated. It is cleared at startup even on - // replica set members. - if (*it == "local") - continue; - LOG(2) << "Removing temporary collections from " << *it; - Database* db = dbHolder().get(txn, *it); - // Since we must be holding the global lock during this function, if listDatabases - // returned this dbname, we should be able to get a reference to it - it can't have - // been dropped. - invariant(db); - db->clearTmpCollections(txn); - } +} + +bool ReplicationCoordinatorExternalStateImpl::isSelf(const HostAndPort& host) { + return repl::isSelf(host); +} + +HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( + const OperationContext* txn) { + return HostAndPort(txn->getClient()->clientAddress(true)); +} + +void ReplicationCoordinatorExternalStateImpl::closeConnections() { + MessagingPort::closeAllSockets(executor::NetworkInterface::kMessagingPortKeepOpen); +} + +void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* txn) { + ServiceContext* environment = getGlobalServiceContext(); + environment->killAllUserOperations(txn); +} + +void ReplicationCoordinatorExternalStateImpl::clearShardingState() { + shardingState.clearCollectionMetadata(); +} + +void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { + BackgroundSync::get()->clearSyncTarget(); +} + +OperationContext* ReplicationCoordinatorExternalStateImpl::createOperationContext( + const std::string& threadName) { + Client::initThreadIfNotAlready(threadName.c_str()); + return new OperationContextImpl(); +} + +void ReplicationCoordinatorExternalStateImpl::dropAllTempCollections(OperationContext* txn) { + std::vector<std::string> dbNames; + StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); + storageEngine->listDatabases(&dbNames); + + for (std::vector<std::string>::iterator it = dbNames.begin(); it != dbNames.end(); ++it) { + // The local db is special because it isn't replicated. It is cleared at startup even on + // replica set members. + if (*it == "local") + continue; + LOG(2) << "Removing temporary collections from " << *it; + Database* db = dbHolder().get(txn, *it); + // Since we must be holding the global lock during this function, if listDatabases + // returned this dbname, we should be able to get a reference to it - it can't have + // been dropped. + invariant(db); + db->clearTmpCollections(txn); } +} -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |