summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-02-23 14:57:14 -0500
committerRandolph Tan <randolph@10gen.com>2017-03-07 17:12:01 -0500
commit43be7d92a003f075a5130b2eb38175a062d0729d (patch)
treebb751ca50470551a85fd2f36b85d21f22d7820e8 /src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
parent07c73edf683342d91ba941dd2733ff9e9e6fa6c0 (diff)
downloadmongo-43be7d92a003f075a5130b2eb38175a062d0729d.tar.gz
SERVER-27749 Integrate LogicalTimeMetadata
Attach logical time metadata to all outgoing messages and process incoming logical time metadata in mongod
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_external_state_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp30
1 files changed, 16 insertions, 14 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 36d8a84fd06..04a2fcf88f8 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/repair_database.h"
#include "mongo/db/repl/bgsync.h"
@@ -187,8 +188,9 @@ std::unique_ptr<ThreadPool> makeThreadPool() {
} // namespace
ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl(
- StorageInterface* storageInterface)
- : _storageInterface(storageInterface),
+ ServiceContext* service, StorageInterface* storageInterface)
+ : _service(service),
+ _storageInterface(storageInterface),
_initialSyncThreadPool(OldThreadPool::DoNotStartThreadsTag(), 1, "initial sync-"),
_initialSyncRunner(&_initialSyncThreadPool) {
uassert(ErrorCodes::BadValue, "A StorageInterface is required.", _storageInterface);
@@ -299,14 +301,14 @@ void ReplicationCoordinatorExternalStateImpl::startThreads(const ReplSettings& s
if (settings.isMajorityReadConcernEnabled() || enableReplSnapshotThread) {
log() << "Starting replication snapshot thread";
- _snapshotThread = SnapshotThread::start(getGlobalServiceContext());
+ _snapshotThread = SnapshotThread::start(_service);
}
log() << "Starting replication storage threads";
- getGlobalServiceContext()->getGlobalStorageEngine()->setJournalListener(this);
+ _service->getGlobalStorageEngine()->setJournalListener(this);
auto hookList = stdx::make_unique<rpc::EgressMetadataHookList>();
- // TODO SERVER-27750: add LogicalTimeMetadataHook
+ hookList->addHook(stdx::make_unique<rpc::LogicalTimeMetadataHook>(_service));
_taskExecutor = stdx::make_unique<executor::ThreadPoolTaskExecutor>(
makeThreadPool(),
executor::makeNetworkInterface("NetworkInterfaceASIO-RS", nullptr, std::move(hookList)));
@@ -365,7 +367,7 @@ Status ReplicationCoordinatorExternalStateImpl::runRepairOnLocalDB(OperationCont
try {
ScopedTransaction scopedXact(opCtx, MODE_X);
Lock::GlobalWrite globalWrite(opCtx->lockState());
- StorageEngine* engine = getGlobalServiceContext()->getGlobalStorageEngine();
+ StorageEngine* engine = _service->getGlobalStorageEngine();
if (!engine->isMmapV1()) {
return Status::OK();
@@ -395,7 +397,7 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
Helpers::putSingleton(opCtx, configCollectionName, config);
const auto msgObj = BSON("msg"
<< "initiating set");
- getGlobalServiceContext()->getOpObserver()->onOpMessage(opCtx, msgObj);
+ _service->getOpObserver()->onOpMessage(opCtx, msgObj);
wuow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "initiate oplog entry", "local.oplog.rs");
@@ -717,7 +719,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort(
}
void ReplicationCoordinatorExternalStateImpl::closeConnections() {
- getGlobalServiceContext()->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen);
+ _service->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen);
}
void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* opCtx) {
@@ -727,10 +729,10 @@ void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationCon
void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() {
if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
- Balancer::get(getGlobalServiceContext())->interruptBalancer();
+ Balancer::get(_service)->interruptBalancer();
}
- ShardingState::get(getGlobalServiceContext())->markCollectionsNotShardedAtStepdown();
+ ShardingState::get(_service)->markCollectionsNotShardedAtStepdown();
}
void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook(
@@ -829,7 +831,7 @@ void ReplicationCoordinatorExternalStateImpl::startProducerIfStopped() {
void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationContext* opCtx) {
std::vector<std::string> dbNames;
- StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
+ StorageEngine* storageEngine = _service->getGlobalStorageEngine();
storageEngine->listDatabases(&dbNames);
for (std::vector<std::string>::iterator it = dbNames.begin(); it != dbNames.end(); ++it) {
@@ -848,19 +850,19 @@ void ReplicationCoordinatorExternalStateImpl::_dropAllTempCollections(OperationC
}
void ReplicationCoordinatorExternalStateImpl::dropAllSnapshots() {
- if (auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager())
+ if (auto manager = _service->getGlobalStorageEngine()->getSnapshotManager())
manager->dropAllSnapshots();
}
void ReplicationCoordinatorExternalStateImpl::updateCommittedSnapshot(SnapshotName newCommitPoint) {
- auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager();
+ auto manager = _service->getGlobalStorageEngine()->getSnapshotManager();
invariant(manager); // This should never be called if there is no SnapshotManager.
manager->setCommittedSnapshot(newCommitPoint);
}
void ReplicationCoordinatorExternalStateImpl::createSnapshot(OperationContext* opCtx,
SnapshotName name) {
- auto manager = getGlobalServiceContext()->getGlobalStorageEngine()->getSnapshotManager();
+ auto manager = _service->getGlobalStorageEngine()->getSnapshotManager();
invariant(manager); // This should never be called if there is no SnapshotManager.
manager->createSnapshot(opCtx, name);
}