diff options
author | Benety Goh <benety@mongodb.com> | 2016-11-11 17:44:51 -0500 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2016-11-15 06:26:07 -0500 |
commit | b476c357ac3bee4134d8fca884258460b214ebd8 (patch) | |
tree | 11bfeecfe3f3e71a9d1f1607026750e15c08d3da | |
parent | bed166fa1fe6070669fce5e7a9934f8c05ce562a (diff) | |
download | mongo-b476c357ac3bee4134d8fca884258460b214ebd8.tar.gz |
SERVER-25662 DataReplicator cleanup
removed _scheduleFetch_inlock() and _fetcherPaused
renamed _state to _dataReplicatorState
DataReplicator::_anyActiveHandles_inlock() should shutdown last oplog entry fetcher
DataReplicator::shutdown() no longer requires an operation context
removed unnecessary references to DataReplicatorState::Uninitialized
removed unused sync source checking from doNextActions
collapse overloaded versions of _scheduleApplyBatch
merged _doNextActions_inlock() and _doNextActions_Steady_inlock()
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 184 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 18 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 4 |
4 files changed, 52 insertions, 156 deletions
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index e662691389b..7eb266c2b9c 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -247,7 +247,7 @@ DataReplicator::DataReplicator( _opts(opts), _dataReplicatorExternalState(std::move(dataReplicatorExternalState)), _exec(_dataReplicatorExternalState->getTaskExecutor()), - _state(DataReplicatorState::Uninitialized), + _dataReplicatorState(DataReplicatorState::Uninitialized), _storage(storage) { uassert(ErrorCodes::BadValue, "invalid storage interface", _storage); uassert(ErrorCodes::BadValue, "invalid getMyLastOptime function", _opts.getMyLastOptime); @@ -264,13 +264,18 @@ DataReplicator::~DataReplicator() { }); } -Status DataReplicator::shutdown(OperationContext* txn) { - return _shutdown(txn); +Status DataReplicator::shutdown() { + auto status = scheduleShutdown(); + if (status.isOK()) { + log() << "Waiting for shutdown of DataReplicator."; + waitForShutdown(); + } + return status; } DataReplicatorState DataReplicator::getState() const { LockGuard lk(_mutex); - return _state; + return _dataReplicatorState; } HostAndPort DataReplicator::getSyncSource() const { @@ -298,15 +303,11 @@ std::string DataReplicator::getDiagnosticString() const { str::stream out; out << "DataReplicator -" << " opts: " << _opts.toString() << " oplogFetcher: " << _oplogFetcher->toString() - << " opsBuffered: " << _oplogBuffer->getSize() << " state: " << toString(_state); - switch (_state) { - case DataReplicatorState::InitialSync: - out << " opsAppied: " << _initialSyncState->appliedOps - << " status: " << _initialSyncState->status.toString(); - break; - case DataReplicatorState::Uninitialized: - // TODO: add more here - break; + << " opsBuffered: " << _oplogBuffer->getSize() + << " state: " << toString(_dataReplicatorState); + if (_initialSyncState) { + out << " opsAppied: " << _initialSyncState->appliedOps + << " status: " << _initialSyncState->status.toString(); } return out; @@ -495,7 +496,12 @@ Status DataReplicator::_runInitialSyncAttempt_inlock(OperationContext* txn, this, stdx::placeholders::_1, stdx::placeholders::_2)); - _scheduleFetch_inlock(); + LOG(2) << "Starting OplogFetcher: " << _oplogFetcher->toString(); + auto oplogFetcherStartupStatus = _oplogFetcher->startup(); + if (!oplogFetcherStartupStatus.isOK()) { + return oplogFetcherStartupStatus; + } + DatabasesCloner* cloner = _initialSyncState->dbsCloner.get(); if (_scheduleDbWorkFn) { cloner->setScheduleDbWorkFn_forTest(_scheduleDbWorkFn); @@ -580,15 +586,9 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn, return retStatus; } _stats.initialSyncStart = _exec->now(); - if (_state != DataReplicatorState::Uninitialized) { - if (_state == DataReplicatorState::InitialSync) - return {ErrorCodes::InitialSyncActive, - (str::stream() << "Initial sync in progress; try resync to start anew.")}; - else { - return { - ErrorCodes::AlreadyInitialized, - (str::stream() << "Cannot do initial sync in " << toString(_state) << " state.")}; - } + if (_dataReplicatorState == DataReplicatorState::InitialSync) { + return {ErrorCodes::InitialSyncActive, + (str::stream() << "Initial sync in progress; try resync to start anew.")}; } LOG(1) << "Creating oplogBuffer."; @@ -640,10 +640,12 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn, if (attemptErrorStatus.isOK()) { invariant(_syncSource.empty()); for (int i = 0; i < numInitialSyncConnectAttempts; ++i) { - attemptErrorStatus = _ensureGoodSyncSource_inlock(); - if (attemptErrorStatus.isOK()) { + auto syncSource = _chooseSyncSource_inlock(); + if (syncSource.isOK()) { + _syncSource = syncSource.getValue(); break; } + attemptErrorStatus = syncSource.getStatus(); LOG(1) << "Error getting sync source: '" << attemptErrorStatus.toString() << "', trying again in " << _opts.syncSourceRetryWait << ". Attempt " << i + 1 << " of " << numInitialSyncConnectAttempts.load(); @@ -730,7 +732,6 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn, lk.lock(); } - _fetcherPaused = false; _applierPaused = false; _lastFetched = _lastApplied; @@ -881,6 +882,9 @@ bool DataReplicator::_anyActiveHandles_inlock() const { void DataReplicator::_cancelAllHandles_inlock() { if (_oplogFetcher) _oplogFetcher->shutdown(); + if (_lastOplogEntryFetcher) { + _lastOplogEntryFetcher->shutdown(); + } if (_applier) _applier->shutdown(); // No need to call shutdown() on _shuttingdownApplier. This applier is assigned when the most @@ -922,7 +926,7 @@ void DataReplicator::_doNextActions_inlock() { return; } - if (DataReplicatorState::Uninitialized == _state) { + if (DataReplicatorState::Uninitialized == _dataReplicatorState) { return; } @@ -946,46 +950,7 @@ void DataReplicator::_doNextActions_inlock() { invariant(_initialSyncState->status.isOK()); _setState_inlock(DataReplicatorState::Uninitialized); _exec->signalEvent(_initialSyncState->finishEvent); - } else { - // Run steady state events to fetch/apply. - _doNextActions_Steady_inlock(); - } -} - -void DataReplicator::_doNextActions_Steady_inlock() { - // Check sync source is still good. - if (_syncSource.empty()) { - _syncSource = - _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime.getTimestamp()); - } - if (_syncSource.empty()) { - log() << "_syncSource is empty so scheduling a retry in " << _opts.syncSourceRetryWait; - // No sync source, reschedule check - Date_t when = _exec->now() + _opts.syncSourceRetryWait; - // schedule self-callback w/executor - // to try to get a new sync source in a bit - auto scheduleResult = _exec->scheduleWorkAt(when, [this](const CallbackArgs& cbData) { - if (cbData.status == ErrorCodes::CallbackCanceled) { - return; - } - _doNextActions(); - }); - if (!scheduleResult.isOK()) { - severe() << "failed to schedule sync source refresh: " << scheduleResult.getStatus() - << ". stopping data replicator"; - _setState_inlock(DataReplicatorState::Uninitialized); - return; - } - } else if (!_fetcherPaused) { - // Check if active fetch, if not start one - if (!_oplogFetcher || !_oplogFetcher->isActive()) { - const auto scheduleStatus = _scheduleFetch_inlock(); - if (!scheduleStatus.isOK() && scheduleStatus != ErrorCodes::ShutdownInProgress) { - error() << "Error scheduling fetcher '" << scheduleStatus << "'."; - _oplogFetcher.reset(); - _scheduleDoNextActions(); - } - } + return; } // Check if no active apply and ops to apply @@ -1100,7 +1065,7 @@ void DataReplicator::_onApplyBatchFinish(const Status& status, _shuttingDownApplier = std::move(_applier); if (!status.isOK()) { - invariant(DataReplicatorState::InitialSync == _state); + invariant(DataReplicatorState::InitialSync == _dataReplicatorState); error() << "Failed to apply batch due to '" << redact(status) << "'"; _initialSyncState->status = status; _exec->signalEvent(_initialSyncState->finishEvent); @@ -1171,11 +1136,6 @@ Status DataReplicator::_scheduleDoNextActions() { return status.getStatus(); } -Status DataReplicator::_scheduleApplyBatch() { - LockGuard lk(_mutex); - return _scheduleApplyBatch_inlock(); -} - Status DataReplicator::_scheduleApplyBatch_inlock() { if (_applierPaused) { return Status::OK(); @@ -1206,11 +1166,8 @@ Status DataReplicator::_scheduleApplyBatch_inlock() { if (ops.empty()) { return _scheduleDoNextActions(); } - return _scheduleApplyBatch_inlock(ops); -} -Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) { - invariant(_state == DataReplicatorState::InitialSync); + invariant(_dataReplicatorState == DataReplicatorState::InitialSync); _fetchCount.store(0); // "_syncSource" has to be copied to stdx::bind result. HostAndPort source = _syncSource; @@ -1246,65 +1203,21 @@ void DataReplicator::_setState(const DataReplicatorState& newState) { } void DataReplicator::_setState_inlock(const DataReplicatorState& newState) { - _state = newState; + _dataReplicatorState = newState; } -Status DataReplicator::_ensureGoodSyncSource_inlock() { - if (_syncSource.empty()) { - _syncSource = - _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime.getTimestamp()); - if (!_syncSource.empty()) { - return Status::OK(); - } - +StatusWith<HostAndPort> DataReplicator::_chooseSyncSource_inlock() { + auto syncSource = + _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime.getTimestamp()); + if (syncSource.empty()) { return Status{ErrorCodes::InvalidSyncSource, str::stream() << "No valid sync source available. Our last fetched optime: " << _lastFetched.opTime.toString()}; } - return Status::OK(); + return syncSource; } -Status DataReplicator::_scheduleFetch_inlock() { - if (!_oplogFetcher) { - if (!_ensureGoodSyncSource_inlock().isOK()) { - auto status = _scheduleDoNextActions(); - if (!status.isOK()) { - return status; - } - } - - invariant(DataReplicatorState::InitialSync == _state); - - const auto remoteOplogNS = _opts.remoteOplogNS; - _oplogFetcher = stdx::make_unique<OplogFetcher>( - _exec, - _lastFetched, - _syncSource, - remoteOplogNS, - uassertStatusOK(_dataReplicatorExternalState->getCurrentConfig()), - _opts.oplogFetcherMaxFetcherRestarts, - _dataReplicatorExternalState.get(), - stdx::bind(&DataReplicator::_enqueueDocuments, - this, - stdx::placeholders::_1, - stdx::placeholders::_2, - stdx::placeholders::_3), - stdx::bind(&DataReplicator::_onOplogFetchFinish, - this, - stdx::placeholders::_1, - stdx::placeholders::_2)); - } - if (!_oplogFetcher->isActive()) { - LOG(2) << "Starting OplogFetcher: " << _oplogFetcher->toString(); - Status status = _oplogFetcher->startup(); - if (!status.isOK()) { - return status; - } - } - return Status::OK(); -} - -Status DataReplicator::scheduleShutdown(OperationContext* txn) { +Status DataReplicator::scheduleShutdown() { auto eventStatus = _exec->makeEvent(); if (!eventStatus.isOK()) { return eventStatus.getStatus(); @@ -1315,7 +1228,7 @@ Status DataReplicator::scheduleShutdown(OperationContext* txn) { invariant(!_onShutdown.isValid()); _inShutdown = true; _onShutdown = eventStatus.getValue(); - if (DataReplicatorState::InitialSync == _state && _initialSyncState && + if (DataReplicatorState::InitialSync == _dataReplicatorState && _initialSyncState && _initialSyncState->status.isOK()) { _initialSyncState->status = {ErrorCodes::ShutdownInProgress, "Shutdown issued for the operation."}; @@ -1338,15 +1251,6 @@ void DataReplicator::waitForShutdown() { _exec->waitForEvent(onShutdown); } -Status DataReplicator::_shutdown(OperationContext* txn) { - auto status = scheduleShutdown(txn); - if (status.isOK()) { - log() << "Waiting for shutdown of DataReplicator."; - waitForShutdown(); - } - return status; -} - void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, const OplogFetcher::DocumentsInfo& info) { @@ -1381,8 +1285,6 @@ void DataReplicator::_enqueueDocuments(Fetcher::Documents::const_iterator begin, } void DataReplicator::_onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched) { - _fetcherPaused = true; - log() << "Finished fetching oplog during initial sync: " << redact(status) << ". Last fetched optime and hash: " << lastFetched; @@ -1398,7 +1300,7 @@ void DataReplicator::_onOplogFetchFinish(const Status& status, const OpTimeWithH } if (!status.isOK()) { - invariant(_state == DataReplicatorState::InitialSync); + invariant(_dataReplicatorState == DataReplicatorState::InitialSync); // Do not change sync source, just log. error() << "Error fetching oplog during initial sync: " << redact(status); invariant(_initialSyncState); diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index d88a1d0ebd1..6316db8e3c1 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -190,12 +190,12 @@ public: virtual ~DataReplicator(); // Shuts down replication if "start" has been called, and blocks until shutdown has completed. - Status shutdown(OperationContext* txn); + Status shutdown(); /** * Cancels outstanding work and begins shutting down. */ - Status scheduleShutdown(OperationContext* txn); + Status scheduleShutdown(); /** * Waits for data replicator to finish shutting down. @@ -249,8 +249,9 @@ private: void _setState(const DataReplicatorState& newState); void _setState_inlock(const DataReplicatorState& newState); - // Returns OK when there is a good syncSource at _syncSource. - Status _ensureGoodSyncSource_inlock(); + // Obtains a valid sync source from the sync source selector. + // Returns error if a sync source cannot be found. + StatusWith<HostAndPort> _chooseSyncSource_inlock(); /** * Pushes documents from oplog fetcher to blocking queue for @@ -262,7 +263,6 @@ private: void _onOplogFetchFinish(const Status& status, const OpTimeWithHash& lastFetched); void _doNextActions(); void _doNextActions_inlock(); - void _doNextActions_Steady_inlock(); BSONObj _getInitialSyncProgress_inlock() const; @@ -281,17 +281,12 @@ private: void _scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn callback); Status _scheduleDoNextActions(); - Status _scheduleApplyBatch(); Status _scheduleApplyBatch_inlock(); - Status _scheduleApplyBatch_inlock(const Operations& ops); - Status _scheduleFetch_inlock(); void _cancelAllHandles_inlock(); void _waitOnAndResetAll_inlock(UniqueLock* lk); bool _anyActiveHandles_inlock() const; - Status _shutdown(OperationContext* txn); - // Counts how many documents have been refetched from the source in the current batch. AtomicUInt32 _fetchCount; @@ -310,10 +305,9 @@ private: const DataReplicatorOptions _opts; // (R) std::unique_ptr<DataReplicatorExternalState> _dataReplicatorExternalState; // (R) executor::TaskExecutor* _exec; // (R) - DataReplicatorState _state; // (MX) + DataReplicatorState _dataReplicatorState; // (MX) std::unique_ptr<InitialSyncState> _initialSyncState; // (M) StorageInterface* _storage; // (M) - bool _fetcherPaused = false; // (X) std::unique_ptr<OplogFetcher> _oplogFetcher; // (S) std::unique_ptr<Fetcher> _lastOplogEntryFetcher; // (S) bool _applierPaused = false; // (X) diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index e1d463c799e..2aaeb5a6074 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -640,7 +640,7 @@ void InitialSyncTest::tearDown() { TEST_F(InitialSyncTest, ShutdownImmediatelyAfterStartup) { startSync(1); auto txn = makeOpCtx(); - ASSERT_OK(getDR().shutdown(txn.get())); + ASSERT_OK(getDR().shutdown()); getExecutor().shutdown(); verifySync(getNet(), ErrorCodes::ShutdownInProgress); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 27debb69593..5a5b0cf9fce 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -563,7 +563,7 @@ void ReplicationCoordinatorImpl::_stopDataReplication(OperationContext* txn) { if (drCopy && drCopy->getState() == DataReplicatorState::InitialSync) { LOG(1) << "ReplicationCoordinatorImpl::_stopDataReplication calling DataReplicator::shutdown."; - const auto status = drCopy->shutdown(txn); + const auto status = drCopy->shutdown(); if (!status.isOK()) { warning() << "DataReplicator shutdown failed: " << status; } @@ -730,7 +730,7 @@ void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) { // joining the replication executor is blocking so it must be run outside of the mutex if (drCopy) { LOG(1) << "ReplicationCoordinatorImpl::shutdown calling DataReplicator::shutdown."; - const auto status = drCopy->shutdown(txn); + const auto status = drCopy->shutdown(); if (!status.isOK()) { warning() << "DataReplicator shutdown failed: " << status; } |