summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2016-11-11 17:44:51 -0500
committerBenety Goh <benety@mongodb.com>2016-11-15 06:26:07 -0500
commitb476c357ac3bee4134d8fca884258460b214ebd8 (patch)
tree11bfeecfe3f3e71a9d1f1607026750e15c08d3da
parentbed166fa1fe6070669fce5e7a9934f8c05ce562a (diff)
downloadmongo-b476c357ac3bee4134d8fca884258460b214ebd8.tar.gz
SERVER-25662 DataReplicator cleanup
removed _scheduleFetch_inlock() and _fetcherPaused renamed _state to _dataReplicatorState DataReplicator::_anyActiveHandles_inlock() should shutdown last oplog entry fetcher DataReplicator::shutdown() no longer requires an operation context removed unnecessary references to DataReplicatorState::Uninitialized removed unused sync source checking from doNextActions collapse overloaded versions of _scheduleApplyBatch merged _doNextActions_inlock() and _doNextActions_Steady_inlock()
-rw-r--r--src/mongo/db/repl/data_replicator.cpp184
-rw-r--r--src/mongo/db/repl/data_replicator.h18
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp4
4 files changed, 52 insertions, 156 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index e662691389b..7eb266c2b9c 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -247,7 +247,7 @@ DataReplicator::DataReplicator(
_opts(opts),
_dataReplicatorExternalState(std::move(dataReplicatorExternalState)),
_exec(_dataReplicatorExternalState->getTaskExecutor()),
- _state(DataReplicatorState::Uninitialized),
+ _dataReplicatorState(DataReplicatorState::Uninitialized),
_storage(storage) {
uassert(ErrorCodes::BadValue, "invalid storage interface", _storage);
uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime);
@@ -264,13 +264,18 @@ DataReplicator::~DataReplicator() {
});
}
-Status DataReplicator::shutdown(OperationContext* txn) {
- return _shutdown(txn);
+Status DataReplicator::shutdown() {
+ auto status = scheduleShutdown();
+ if (status.isOK()) {
+ log() << "Waiting for shutdown of DataReplicator.";
+ waitForShutdown();
+ }
+ return status;
}
DataReplicatorState DataReplicator::getState() const {
LockGuard lk(_mutex);
- return _state;
+ return _dataReplicatorState;
}
HostAndPort DataReplicator::getSyncSource() const {
@@ -298,15 +303,11 @@ std::string DataReplicator::getDiagnosticString() const {
str::stream out;
out << "DataReplicator -"
<< " opts: " << _opts.toString() << " oplogFetcher: " << _oplogFetcher->toString()
- << " opsBuffered: " << _oplogBuffer->getSize() << " state: " << toString(_state);
- switch (_state) {
- case DataReplicatorState::InitialSync:
- out << " opsAppied: " << _initialSyncState->appliedOps
- << " status: " << _initialSyncState->status.toString();
- break;
- case DataReplicatorState::Uninitialized:
- // TODO: add more here
- break;
+ << " opsBuffered: " << _oplogBuffer->getSize()
+ << " state: " << toString(_dataReplicatorState);
+ if (_initialSyncState) {
+ out << " opsAppied: " << _initialSyncState->appliedOps
+ << " status: " << _initialSyncState->status.toString();
}
return out;
@@ -495,7 +496,12 @@ Status DataReplicator::_runInitialSyncAttempt_inlock(OperationContext* txn,
this,
stdx::placeholders::_1,
stdx::placeholders::_2));
- _scheduleFetch_inlock();
+ LOG(2) << "Starting OplogFetcher: " << _oplogFetcher->toString();
+ auto oplogFetcherStartupStatus = _oplogFetcher->startup();
+ if (!oplogFetcherStartupStatus.isOK()) {
+ return oplogFetcherStartupStatus;
+ }
+
DatabasesCloner* cloner = _initialSyncState->dbsCloner.get();
if (_scheduleDbWorkFn) {
cloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn);
@@ -580,15 +586,9 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn,
return retStatus;
}
_stats.initialSyncStart = _exec->now();
- if (_state != DataReplicatorState::Uninitialized) {
- if (_state == DataReplicatorState::InitialSync)
- return {ErrorCodes::InitialSyncActive,
- (str::stream() << "Initial sync in progress; try resync to start anew.")};
- else {
- return {
- ErrorCodes::AlreadyInitialized,
- (str::stream() << "Cannot do initial sync in " << toString(_state) << " state.")};
- }
+ if (_dataReplicatorState == DataReplicatorState::InitialSync) {
+ return {ErrorCodes::InitialSyncActive,
+ (str::stream() << "Initial sync in progress; try resync to start anew.")};
}
LOG(1) << "Creating oplogBuffer.";
@@ -640,10 +640,12 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn,
if (attemptErrorStatus.isOK()) {
invariant(_syncSource.empty());
for (int i = 0; i < numInitialSyncConnectAttempts; ++i) {
- attemptErrorStatus = _ensureGoodSyncSource_inlock();
- if (attemptErrorStatus.isOK()) {
+ auto syncSource = _chooseSyncSource_inlock();
+ if (syncSource.isOK()) {
+ _syncSource = syncSource.getValue();
break;
}
+ attemptErrorStatus = syncSource.getStatus();
LOG(1) << "Error getting sync source: '" << attemptErrorStatus.toString()
<< "', trying again in " << _opts.syncSourceRetryWait << ". Attempt "
<< i + 1 << " of " << numInitialSyncConnectAttempts.load();
@@ -730,7 +732,6 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn,
lk.lock();
}
- _fetcherPaused = false;
_applierPaused = false;
_lastFetched = _lastApplied;
@@ -881,6 +882,9 @@ bool DataReplicator::_anyActiveHandles_inlock() const {
void DataReplicator::_cancelAllHandles_inlock() {
if (_oplogFetcher)
_oplogFetcher->shutdown();
+ if (_lastOplogEntryFetcher) {
+ _lastOplogEntryFetcher->shutdown();
+ }
if (_applier)
_applier->shutdown();
// No need to call shutdown() on _shuttingdownApplier. This applier is assigned when the most
@@ -922,7 +926,7 @@ void DataReplicator::_doNextActions_inlock() {
return;
}
- if (DataReplicatorState::Uninitialized == _state) {
+ if (DataReplicatorState::Uninitialized == _dataReplicatorState) {
return;
}
@@ -946,46 +950,7 @@ void DataReplicator::_doNextActions_inlock() {
invariant(_initialSyncState->status.isOK());
_setState_inlock(DataReplicatorState::Uninitialized);
_exec->signalEvent(_initialSyncState->finishEvent);
- } else {
- // Run steady state events to fetch/apply.
- _doNextActions_Steady_inlock();
- }
-}
-
-void DataReplicator::_doNextActions_Steady_inlock() {
- // Check sync source is still good.
- if (_syncSource.empty()) {
- _syncSource =
- _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime.getTimestamp());
- }
- if (_syncSource.empty()) {
- log() << "_syncSource is empty so scheduling a retry in " << _opts.syncSourceRetryWait;
- // No sync source, reschedule check
- Date_t when = _exec->now() + _opts.syncSourceRetryWait;
- // schedule self-callback w/executor
- // to try to get a new sync source in a bit
- auto scheduleResult = _exec->scheduleWorkAt(when, [this](const CallbackArgs& cbData) {
- if (cbData.status == ErrorCodes::CallbackCanceled) {
- return;
- }
- _doNextActions();
- });
- if (!scheduleResult.isOK()) {
- severe() << "failed to schedule sync source refresh: " << scheduleResult.getStatus()
- << ". stopping data replicator";
- _setState_inlock(DataReplicatorState::Uninitialized);
- return;
- }
- } else if (!_fetcherPaused) {
- // Check if active fetch, if not start one
- if (!_oplogFetcher || !_oplogFetcher->isActive()) {
- const auto scheduleStatus = _scheduleFetch_inlock();
- if (!scheduleStatus.isOK() && scheduleStatus != ErrorCodes::ShutdownInProgress) {
- error() << "Error scheduling fetcher '" << scheduleStatus << "'.";
- _oplogFetcher.reset();
- _scheduleDoNextActions();
- }
- }
+ return;
}
// Check if no active apply and ops to apply
@@ -1100,7 +1065,7 @@ void DataReplicator::_onApplyBatchFinish(const Status& status,
_shuttingDownApplier = std::move(_applier);
if (!status.isOK()) {
- invariant(DataReplicatorState::InitialSync == _state);
+ invariant(DataReplicatorState::InitialSync == _dataReplicatorState);
error() << "Failed to apply batch due to '" << redact(status) << "'";
_initialSyncState->status = status;
_exec->signalEvent(_initialSyncState->finishEvent);
@@ -1171,11 +1136,6 @@ Status DataReplicator::_scheduleDoNextActions() {
return status.getStatus();
}
-Status DataReplicator::_scheduleApplyBatch() {
- LockGuard lk(_mutex);
- return _scheduleApplyBatch_inlock();
-}
-
Status DataReplicator::_scheduleApplyBatch_inlock() {
if (_applierPaused) {
return Status::OK();
@@ -1206,11 +1166,8 @@ Status DataReplicator::_scheduleApplyBatch_inlock() {
if (ops.empty()) {
return _scheduleDoNextActions();
}
- return _scheduleApplyBatch_inlock(ops);
-}
-Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) {
- invariant(_state == DataReplicatorState::InitialSync);
+ invariant(_dataReplicatorState == DataReplicatorState::InitialSync);
_fetchCount.store(0);
// "_syncSource" has to be copied to stdx::bind result.
HostAndPort source = _syncSource;
@@ -1246,65 +1203,21 @@ void DataReplicator::_setState(const DataReplicatorState& newState) {
}
void DataReplicator::_setState_inlock(const DataReplicatorState& newState) {
- _state = newState;
+ _dataReplicatorState = newState;
}
-Status DataReplicator::_ensureGoodSyncSource_inlock() {
- if (_syncSource.empty()) {
- _syncSource =
- _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime.getTimestamp());
- if (!_syncSource.empty()) {
- return Status::OK();
- }
-
+StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() {
+ auto syncSource =
+ _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime.getTimestamp());
+ if (syncSource.empty()) {
return Status{ErrorCodes::InvalidSyncSource,
str::stream() << "No valid sync source available. Our last fetched optime: "
<< _lastFetched.opTime.toString()};
}
- return Status::OK();
+ return syncSource;
}
-Status DataReplicator::_scheduleFetch_inlock() {
- if (!_oplogFetcher) {
- if (!_ensureGoodSyncSource_inlock().isOK()) {
- auto status = _scheduleDoNextActions();
- if (!status.isOK()) {
- return status;
- }
- }
-
- invariant(DataReplicatorState::InitialSync == _state);
-
- const auto remoteOplogNS = _opts.remoteOplogNS;
- _oplogFetcher = stdx::make_unique<OplogFetcher>(
- _exec,
- _lastFetched,
- _syncSource,
- remoteOplogNS,
- uassertStatusOK(_dataReplicatorExternalState->getCurrentConfig()),
- _opts.oplogFetcherMaxFetcherRestarts,
- _dataReplicatorExternalState.get(),
- stdx::bind(&DataReplicator::_enqueueDocuments,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- stdx::placeholders::_3),
- stdx::bind(&DataReplicator::_onOplogFetchFinish,
- this,
- stdx::placeholders::_1,
- stdx::placeholders::_2));
- }
- if (!_oplogFetcher->isActive()) {
- LOG(2) << "Starting OplogFetcher: " << _oplogFetcher->toString();
- Status status = _oplogFetcher->startup();
- if (!status.isOK()) {
- return status;
- }
- }
- return Status::OK();
-}
-
-Status DataReplicator::scheduleShutdown(OperationContext* txn) {
+Status DataReplicator::scheduleShutdown() {
auto eventStatus = _exec->makeEvent();
if (!eventStatus.isOK()) {
return eventStatus.getStatus();
@@ -1315,7 +1228,7 @@ Status DataReplicator::scheduleShutdown(OperationContext* txn) {
invariant(!_onShutdown.isValid());
_inShutdown = true;
_onShutdown = eventStatus.getValue();
- if (DataReplicatorState::InitialSync == _state && _initialSyncState &&
+ if (DataReplicatorState::InitialSync == _dataReplicatorState && _initialSyncState &&
_initialSyncState->status.isOK()) {
_initialSyncState->status = {ErrorCodes::ShutdownInProgress,
"Shutdown issued for the operation."};
@@ -1338,15 +1251,6 @@ void DataReplicator::waitForShutdown() {
_exec->waitForEvent(onShutdown);
}
-Status DataReplicator::_shutdown(OperationContext* txn) {
- auto status = scheduleShutdown(txn);
- if (status.isOK()) {
- log() << "Waiting for shutdown of DataReplicator.";
- waitForShutdown();
- }
- return status;
-}
-
void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
const OplogFetcher::DocumentsInfo& info) {
@@ -1381,8 +1285,6 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
}
void DataReplicator::_onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched) {
- _fetcherPaused = true;
-
log() << "Finished fetching oplog during initial sync: " << redact(status)
<< ". Last fetched optime and hash: " << lastFetched;
@@ -1398,7 +1300,7 @@ void DataReplicator::_onOplogFetchFinish(const Status& status, const OpTimeWithH
}
if (!status.isOK()) {
- invariant(_state == DataReplicatorState::InitialSync);
+ invariant(_dataReplicatorState == DataReplicatorState::InitialSync);
// Do not change sync source, just log.
error() << "Error fetching oplog during initial sync: " << redact(status);
invariant(_initialSyncState);
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index d88a1d0ebd1..6316db8e3c1 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -190,12 +190,12 @@ public:
virtual ~DataReplicator();
// Shuts down replication if "start" has been called, and blocks until shutdown has completed.
- Status shutdown(OperationContext* txn);
+ Status shutdown();
/**
* Cancels outstanding work and begins shutting down.
*/
- Status scheduleShutdown(OperationContext* txn);
+ Status scheduleShutdown();
/**
* Waits for data replicator to finish shutting down.
@@ -249,8 +249,9 @@ private:
void _setState(const DataReplicatorState& newState);
void _setState_inlock(const DataReplicatorState& newState);
- // Returns OK when there is a good syncSource at _syncSource.
- Status _ensureGoodSyncSource_inlock();
+ // Obtains a valid sync source from the sync source selector.
+ // Returns error if a sync source cannot be found.
+ StatusWith<HostAndPort> _chooseSyncSource_inlock();
/**
* Pushes documents from oplog fetcher to blocking queue for
@@ -262,7 +263,6 @@ private:
void _onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched);
void _doNextActions();
void _doNextActions_inlock();
- void _doNextActions_Steady_inlock();
BSONObj _getInitialSyncProgress_inlock() const;
@@ -281,17 +281,12 @@ private:
void _scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback);
Status _scheduleDoNextActions();
- Status _scheduleApplyBatch();
Status _scheduleApplyBatch_inlock();
- Status _scheduleApplyBatch_inlock(const Operations& ops);
- Status _scheduleFetch_inlock();
void _cancelAllHandles_inlock();
void _waitOnAndResetAll_inlock(UniqueLock* lk);
bool _anyActiveHandles_inlock() const;
- Status _shutdown(OperationContext* txn);
-
// Counts how many documents have been refetched from the source in the current batch.
AtomicUInt32 _fetchCount;
@@ -310,10 +305,9 @@ private:
const DataReplicatorOptions _opts; // (R)
std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; // (R)
executor::TaskExecutor* _exec; // (R)
- DataReplicatorState _state; // (MX)
+ DataReplicatorState _dataReplicatorState; // (MX)
std::unique_ptr<InitialSyncState> _initialSyncState; // (M)
StorageInterface* _storage; // (M)
- bool _fetcherPaused = false; // (X)
std::unique_ptr<OplogFetcher> _oplogFetcher; // (S)
std::unique_ptr<Fetcher> _lastOplogEntryFetcher; // (S)
bool _applierPaused = false; // (X)
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index e1d463c799e..2aaeb5a6074 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -640,7 +640,7 @@ void InitialSyncTest::tearDown() {
TEST_F(InitialSyncTest, ShutdownImmediatelyAfterStartup) {
startSync(1);
auto txn = makeOpCtx();
- ASSERT_OK(getDR().shutdown(txn.get()));
+ ASSERT_OK(getDR().shutdown());
getExecutor().shutdown();
verifySync(getNet(), ErrorCodes::ShutdownInProgress);
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 27debb69593..5a5b0cf9fce 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -563,7 +563,7 @@ void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) {
if (drCopy && drCopy->getState() == DataReplicatorState::InitialSync) {
LOG(1)
<< "ReplicationCoordinatorImpl::_stopDataReplication calling DataReplicator::shutdown.";
- const auto status = drCopy->shutdown(txn);
+ const auto status = drCopy->shutdown();
if (!status.isOK()) {
warning() << "DataReplicator shutdown failed: " << status;
}
@@ -730,7 +730,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
// joining the replication executor is blocking so it must be run outside of the mutex
if (drCopy) {
LOG(1) << "ReplicationCoordinatorImpl::shutdown calling DataReplicator::shutdown.";
- const auto status = drCopy->shutdown(txn);
+ const auto status = drCopy->shutdown();
if (!status.isOK()) {
warning() << "DataReplicator shutdown failed: " << status;
}