diff options
Diffstat (limited to 'src/mongo/db/repl/initial_syncer.cpp')
-rw-r--r-- | src/mongo/db/repl/initial_syncer.cpp | 58 |
1 files changed, 28 insertions, 30 deletions
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index d3f03d1276c..8f19951e265 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -116,8 +116,8 @@ using Event = executor::TaskExecutor::EventHandle; using Handle = executor::TaskExecutor::CallbackHandle; using Operations = MultiApplier::Operations; using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>; -using UniqueLock = stdx::unique_lock<stdx::mutex>; -using LockGuard = stdx::lock_guard<stdx::mutex>; +using UniqueLock = stdx::unique_lock<Latch>; +using LockGuard = stdx::lock_guard<Latch>; // Used to reset the oldest timestamp during initial sync to a non-null timestamp. const Timestamp kTimestampOne(0, 1); @@ -197,7 +197,7 @@ InitialSyncer::~InitialSyncer() { } bool InitialSyncer::isActive() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _isActive_inlock(); } @@ -210,7 +210,7 @@ Status InitialSyncer::startup(OperationContext* opCtx, invariant(opCtx); invariant(initialSyncMaxAttempts >= 1U); - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); switch (_state) { case State::kPreStart: _state = State::kRunning; @@ -243,7 +243,7 @@ Status InitialSyncer::startup(OperationContext* opCtx, } Status InitialSyncer::shutdown() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); switch (_state) { case State::kPreStart: // Transition directly from PreStart to Complete if not started yet. @@ -281,22 +281,22 @@ void InitialSyncer::_cancelRemainingWork_inlock() { } void InitialSyncer::join() { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _stateCondition.wait(lk, [this]() { return !_isActive_inlock(); }); } InitialSyncer::State InitialSyncer::getState_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _state; } Date_t InitialSyncer::getWallClockTime_forTest() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); return _lastApplied.wallTime; } bool InitialSyncer::_isShuttingDown() const { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); return _isShuttingDown_inlock(); } @@ -468,7 +468,7 @@ void InitialSyncer::_startInitialSyncAttemptCallback( // Lock guard must be declared after completion guard because completion guard destructor // has to run outside lock. - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _oplogApplier = {}; @@ -522,7 +522,7 @@ void InitialSyncer::_chooseSyncSourceCallback( std::uint32_t chooseSyncSourceAttempt, std::uint32_t chooseSyncSourceMaxAttempts, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); // Cancellation should be treated the same as other errors. In this case, the most likely cause // of a failed _chooseSyncSourceCallback() task is a cancellation triggered by // InitialSyncer::shutdown() or the task executor shutting down. @@ -678,7 +678,7 @@ Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock( void InitialSyncer::_rollbackCheckerResetCallback( const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(), "error while getting base rollback ID"); if (!status.isOK()) { @@ -696,7 +696,7 @@ void InitialSyncer::_rollbackCheckerResetCallback( void InitialSyncer::_getBeginFetchingOpTimeCallback( const StatusWith<Fetcher::QueryResponse>& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error while getting oldest active transaction timestamp for begin fetching timestamp"); @@ -746,7 +746,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp( const StatusWith<Fetcher::QueryResponse>& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard, OpTime& beginFetchingOpTime) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error while getting last oplog entry for begin timestamp"); if (!status.isOK()) { @@ -803,7 +803,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> std::shared_ptr<OnCompletionGuard> onCompletionGuard, const OpTime& lastOpTime, OpTime& beginFetchingOpTime) { - stdx::unique_lock<stdx::mutex> lock(_mutex); + stdx::unique_lock<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error while getting the remote feature compatibility version"); if (!status.isOK()) { @@ -983,7 +983,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); log() << "Finished fetching oplog during initial sync: " << redact(oplogFetcherFinishStatus) << ". Last fetched optime: " << _lastFetched.toString(); @@ -1030,7 +1030,7 @@ void InitialSyncer::_databasesClonerCallback(const Status& databaseClonerFinishS } } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(databaseClonerFinishStatus, "error cloning databases"); if (!status.isOK()) { @@ -1055,7 +1055,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( std::shared_ptr<OnCompletionGuard> onCompletionGuard) { OpTimeAndWallTime resultOpTimeAndWallTime = {OpTime(), Date_t()}; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error fetching last oplog entry for stop timestamp"); if (!status.isOK()) { @@ -1102,7 +1102,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( TimestampedBSONObj{oplogSeedDoc, resultOpTimeAndWallTime.opTime.getTimestamp()}, resultOpTimeAndWallTime.opTime.getTerm()); if (!status.isOK()) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; } @@ -1111,7 +1111,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( opCtx.get(), resultOpTimeAndWallTime.opTime.getTimestamp(), orderedCommit); } - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); _lastApplied = resultOpTimeAndWallTime; log() << "No need to apply operations. (currently at " << _initialSyncState->stopTimestamp.toBSON() << ")"; @@ -1123,7 +1123,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( void InitialSyncer::_getNextApplierBatchCallback( const executor::TaskExecutor::CallbackArgs& callbackArgs, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(callbackArgs, "error getting next applier batch"); if (!status.isOK()) { @@ -1223,7 +1223,7 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus, OpTimeAndWallTime lastApplied, std::uint32_t numApplied, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(multiApplierStatus, "error applying batch"); @@ -1260,7 +1260,7 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus, void InitialSyncer::_rollbackCheckerCheckForRollbackCallback( const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(), "error while getting last rollback ID"); if (!status.isOK()) { @@ -1311,7 +1311,7 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallTime log() << "Initial sync attempt finishing up."; - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); log() << "Initial Sync Attempt Statistics: " << redact(_getInitialSyncProgress_inlock()); auto runTime = _initialSyncState ? _initialSyncState->timer.millis() : 0; @@ -1384,7 +1384,7 @@ void InitialSyncer::_finishCallback(StatusWith<OpTimeAndWallTime> lastApplied) { // before we transition the state to Complete. decltype(_onCompletion) onCompletion; { - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); auto opCtx = makeOpCtx(); _tearDown_inlock(opCtx.get(), lastApplied); @@ -1414,7 +1414,7 @@ void InitialSyncer::_finishCallback(StatusWith<OpTimeAndWallTime> lastApplied) { // before InitialSyncer::join() returns. onCompletion = {}; - stdx::lock_guard<stdx::mutex> lock(_mutex); + stdx::lock_guard<Latch> lock(_mutex); invariant(_state != State::kComplete); _state = State::kComplete; _stateCondition.notify_all(); @@ -1450,8 +1450,7 @@ Status InitialSyncer::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn } void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock( - const stdx::lock_guard<stdx::mutex>& lock, - std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + const stdx::lock_guard<Latch>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { // We should check our current state because shutdown() could have been called before // we re-acquired the lock. if (_isShuttingDown_inlock()) { @@ -1506,8 +1505,7 @@ void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock( } void InitialSyncer::_scheduleRollbackCheckerCheckForRollback_inlock( - const stdx::lock_guard<stdx::mutex>& lock, - std::shared_ptr<OnCompletionGuard> onCompletionGuard) { + const stdx::lock_guard<Latch>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { // We should check our current state because shutdown() could have been called before // we re-acquired the lock. if (_isShuttingDown_inlock()) { |