summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@tart.local>2016-06-09 14:20:40 -0400
committerScott Hernandez <scotthernandez@tart.local>2016-06-17 11:53:01 -0400
commitc59f5ade57e41b6a50f40999ea14883da691e951 (patch)
treeb9bb033b488fd95a94263c3470834224c6240b06
parent8beb09ffc1e0fd5173bf255d1710a37806535da3 (diff)
downloadmongo-c59f5ade57e41b6a50f40999ea14883da691e951.tar.gz
SERVER-23750: run DataReplicator::initialSync on ReplCoodExt initial sync thread
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp11
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp9
6 files changed, 31 insertions, 2 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index c5795596df0..8307ac7b80b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -99,6 +99,8 @@ public:
*/
virtual void startSteadyStateReplication() = 0;
+ virtual void runOnInitialSyncThread(stdx::function<void(OperationContext* txn)> run) = 0;
+
/**
* Starts the Master/Slave threads and sets up logOp
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 7f0772fa54e..e101dc120e7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -149,6 +149,17 @@ void ReplicationCoordinatorExternalStateImpl::startInitialSync(OnInitialSyncFini
}});
}
+void ReplicationCoordinatorExternalStateImpl::runOnInitialSyncThread(
+ stdx::function<void(OperationContext* txn)> run) {
+ _initialSyncThread.reset(new stdx::thread{[run, this]() {
+ Client::initThreadIfNotAlready("initial sync");
+ auto txn = cc().makeOperationContext();
+ invariant(txn);
+ invariant(txn->getClient());
+ run(txn.get());
+ }});
+}
+
void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication() {
if (!_producerThread) {
log() << "Starting replication fetcher thread";
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 222e71983bc..8ab9d47f31e 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -58,6 +58,8 @@ public:
virtual void startThreads(const ReplSettings& settings) override;
virtual void startInitialSync(OnInitialSyncFinishedFn finished) override;
virtual void startSteadyStateReplication() override;
+ virtual void runOnInitialSyncThread(stdx::function<void(OperationContext* txn)> run) override;
+
virtual bool isInitialSyncFlagSet(OperationContext* txn) override;
virtual void startMasterSlave(OperationContext* txn);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 218bbad70e6..f1f2770677e 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -26,6 +26,8 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
#include "mongo/platform/basic.h"
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
@@ -37,6 +39,7 @@
#include "mongo/db/repl/oplog_buffer_blocking_queue.h"
#include "mongo/db/storage/snapshot_name.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/log.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/sequence_util.h"
@@ -58,6 +61,11 @@ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock
ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {}
+void ReplicationCoordinatorExternalStateMock::runOnInitialSyncThread(
+ stdx::function<void(OperationContext* txn)> run) {
+ log() << "not running initial sync during test.";
+}
+
void ReplicationCoordinatorExternalStateMock::startThreads(const ReplSettings& settings) {
_threadsStarted = true;
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 1bfb156ef9b..87583512504 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -59,6 +59,7 @@ public:
virtual void startThreads(const ReplSettings& settings) override;
virtual void startInitialSync(OnInitialSyncFinishedFn finished) override;
virtual void startSteadyStateReplication() override;
+ virtual void runOnInitialSyncThread(stdx::function<void(OperationContext* txn)> run) override;
virtual bool isInitialSyncFlagSet(OperationContext* txn) override;
virtual void startMasterSlave(OperationContext*);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a49a68ae0b7..0e498d2c77e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -499,8 +499,13 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* txn) {
// Do initial sync.
if (_externalState->shouldUseDataReplicatorInitialSync()) {
- // TODO: make this async with callback.
- _dr.initialSync(txn);
+ _externalState->runOnInitialSyncThread([this](OperationContext* txn) {
+ const auto status = _dr.initialSync(txn);
+ fassertStatusOK(40088, status);
+ _setMyLastAppliedOpTime_inlock({status.getValue(), -1}, false);
+ _externalState->startSteadyStateReplication();
+
+ });
} else {
_externalState->startInitialSync([this]() {
stdx::lock_guard<stdx::mutex> lk(_mutex);