diff options
author | Randolph Tan <randolph@10gen.com> | 2017-02-23 14:57:14 -0500 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-03-07 17:12:01 -0500 |
commit | 43be7d92a003f075a5130b2eb38175a062d0729d (patch) | |
tree | bb751ca50470551a85fd2f36b85d21f22d7820e8 /src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | |
parent | 07c73edf683342d91ba941dd2733ff9e9e6fa6c0 (diff) | |
download | mongo-43be7d92a003f075a5130b2eb38175a062d0729d.tar.gz |
SERVER-27749 Integrate LogicalTimeMetadata
Attach logical time metadata to all outgoing messages and process incoming logical time metadata in mongod
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_external_state_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 30 |
1 files changed, 16 insertions, 14 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 36d8a84fd06..04a2fcf88f8 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -46,6 +46,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" +#include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/op_observer.h" #include "mongo/db/repair_database.h" #include "mongo/db/repl/bgsync.h" @@ -187,8 +188,9 @@ std::unique_ptr<ThreadPool> makeThreadPool() { } // namespace ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl( - StorageInterface* storageInterface) - : _storageInterface(storageInterface), + ServiceContext* service, StorageInterface* storageInterface) + : _service(service), + _storageInterface(storageInterface), _initialSyncThreadPool(OldThreadPool::DoNotStartThreadsTag(), 1, "initial sync-"), _initialSyncRunner(&_initialSyncThreadPool) { uassert(ErrorCodes::BadValue, "A StorageInterface is required.", _storageInterface); @@ -299,14 +301,14 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s if (settings.isMajorityReadConcernEnabled() || enableReplSnapshotThread) { log() << "Starting replication snapshot thread"; - _snapshotThread = SnapshotThread::start(getGlobalServiceContext()); + _snapshotThread = SnapshotThread::start(_service); } log() << "Starting replication storage threads"; - getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this); + _service->getGlobalStorageEngine()->setJournalListener(this); auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>(); - // TODO SERVER-27750: add LogicalTimeMetadataHook + hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(_service)); _taskExecutor = stdx::make_unique<executor::ThreadPoolTaskExecutor>( makeThreadPool(), executor::makeNetworkInterface("NetworkInterfaceASIO-RS", nullptr, std::move(hookList))); @@ -365,7 +367,7 @@ Status ReplicationCoordinatorExternalStateImpl::runRepairOnLocalDB(OperationCont try { ScopedTransaction scopedXact(opCtx, MODE_X); Lock::GlobalWrite globalWrite(opCtx->lockState()); - StorageEngine* engine = getGlobalServiceContext()->getGlobalStorageEngine(); + StorageEngine* engine = _service->getGlobalStorageEngine(); if (!engine->isMmapV1()) { return Status::OK(); @@ -395,7 +397,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati Helpers::putSingleton(opCtx, configCollectionName, config); const auto msgObj = BSON("msg" << "initiating set"); - getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, msgObj); + _service->getOpObserver()->onOpMessage(opCtx, msgObj); wuow.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "initiate oplog entry", "local.oplog.rs"); @@ -717,7 +719,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( } void ReplicationCoordinatorExternalStateImpl::closeConnections() { - getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen); + _service->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* opCtx) { @@ -727,10 +729,10 @@ void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationCon void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) { - Balancer::get(getGlobalServiceContext())->interruptBalancer(); + Balancer::get(_service)->interruptBalancer(); } - ShardingState::get(getGlobalServiceContext())->markCollectionsNotShardedAtStepdown(); + ShardingState::get(_service)->markCollectionsNotShardedAtStepdown(); } void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook( @@ -829,7 +831,7 @@ void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() { void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationContext* opCtx) { std::vector<std::string> dbNames; - StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); + StorageEngine* storageEngine = _service->getGlobalStorageEngine(); storageEngine->listDatabases(&dbNames); for (std::vector<std::string>::iterator it = dbNames.begin(); it != dbNames.end(); ++it) { @@ -848,19 +850,19 @@ void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationC } void ReplicationCoordinatorExternalStateImpl::dropAllSnapshots() { - if (auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager()) + if (auto manager = _service->getGlobalStorageEngine()->getSnapshotManager()) manager->dropAllSnapshots(); } void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotName newCommitPoint) { - auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager(); + auto manager = _service->getGlobalStorageEngine()->getSnapshotManager(); invariant(manager); // This should never be called if there is no SnapshotManager. manager->setCommittedSnapshot(newCommitPoint); } void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* opCtx, SnapshotName name) { - auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager(); + auto manager = _service->getGlobalStorageEngine()->getSnapshotManager(); invariant(manager); // This should never be called if there is no SnapshotManager. manager->createSnapshot(opCtx, name); } |