summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/initial_syncer.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/initial_syncer.cpp')
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp58
1 files changed, 28 insertions, 30 deletions
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index d3f03d1276c..8f19951e265 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -116,8 +116,8 @@ using Event = executor::TaskExecutor::EventHandle;
using Handle = executor::TaskExecutor::CallbackHandle;
using Operations = MultiApplier::Operations;
using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>;
-using UniqueLock = stdx::unique_lock<stdx::mutex>;
-using LockGuard = stdx::lock_guard<stdx::mutex>;
+using UniqueLock = stdx::unique_lock<Latch>;
+using LockGuard = stdx::lock_guard<Latch>;
// Used to reset the oldest timestamp during initial sync to a non-null timestamp.
const Timestamp kTimestampOne(0, 1);
@@ -197,7 +197,7 @@ InitialSyncer::~InitialSyncer() {
}
bool InitialSyncer::isActive() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _isActive_inlock();
}
@@ -210,7 +210,7 @@ Status InitialSyncer::startup(OperationContext* opCtx,
invariant(opCtx);
invariant(initialSyncMaxAttempts >= 1U);
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
switch (_state) {
case State::kPreStart:
_state = State::kRunning;
@@ -243,7 +243,7 @@ Status InitialSyncer::startup(OperationContext* opCtx,
}
Status InitialSyncer::shutdown() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
switch (_state) {
case State::kPreStart:
// Transition directly from PreStart to Complete if not started yet.
@@ -281,22 +281,22 @@ void InitialSyncer::_cancelRemainingWork_inlock() {
}
void InitialSyncer::join() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_stateCondition.wait(lk, [this]() { return !_isActive_inlock(); });
}
InitialSyncer::State InitialSyncer::getState_forTest() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _state;
}
Date_t InitialSyncer::getWallClockTime_forTest() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
return _lastApplied.wallTime;
}
bool InitialSyncer::_isShuttingDown() const {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
return _isShuttingDown_inlock();
}
@@ -468,7 +468,7 @@ void InitialSyncer::_startInitialSyncAttemptCallback(
// Lock guard must be declared after completion guard because completion guard destructor
// has to run outside lock.
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_oplogApplier = {};
@@ -522,7 +522,7 @@ void InitialSyncer::_chooseSyncSourceCallback(
std::uint32_t chooseSyncSourceAttempt,
std::uint32_t chooseSyncSourceMaxAttempts,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
// Cancellation should be treated the same as other errors. In this case, the most likely cause
// of a failed _chooseSyncSourceCallback() task is a cancellation triggered by
// InitialSyncer::shutdown() or the task executor shutting down.
@@ -678,7 +678,7 @@ Status InitialSyncer::_scheduleGetBeginFetchingOpTime_inlock(
void InitialSyncer::_rollbackCheckerResetCallback(
const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(),
"error while getting base rollback ID");
if (!status.isOK()) {
@@ -696,7 +696,7 @@ void InitialSyncer::_rollbackCheckerResetCallback(
void InitialSyncer::_getBeginFetchingOpTimeCallback(
const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(
result.getStatus(),
"error while getting oldest active transaction timestamp for begin fetching timestamp");
@@ -746,7 +746,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginApplyingTimestamp(
const StatusWith<Fetcher::QueryResponse>& result,
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
OpTime& beginFetchingOpTime) {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(
result.getStatus(), "error while getting last oplog entry for begin timestamp");
if (!status.isOK()) {
@@ -803,7 +803,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
std::shared_ptr<OnCompletionGuard> onCompletionGuard,
const OpTime& lastOpTime,
OpTime& beginFetchingOpTime) {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
+ stdx::unique_lock<Latch> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(
result.getStatus(), "error while getting the remote feature compatibility version");
if (!status.isOK()) {
@@ -983,7 +983,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
log() << "Finished fetching oplog during initial sync: " << redact(oplogFetcherFinishStatus)
<< ". Last fetched optime: " << _lastFetched.toString();
@@ -1030,7 +1030,7 @@ void InitialSyncer::_databasesClonerCallback(const Status& databaseClonerFinishS
}
}
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(databaseClonerFinishStatus,
"error cloning databases");
if (!status.isOK()) {
@@ -1055,7 +1055,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
OpTimeAndWallTime resultOpTimeAndWallTime = {OpTime(), Date_t()};
{
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(
result.getStatus(), "error fetching last oplog entry for stop timestamp");
if (!status.isOK()) {
@@ -1102,7 +1102,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
TimestampedBSONObj{oplogSeedDoc, resultOpTimeAndWallTime.opTime.getTimestamp()},
resultOpTimeAndWallTime.opTime.getTerm());
if (!status.isOK()) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
@@ -1111,7 +1111,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
opCtx.get(), resultOpTimeAndWallTime.opTime.getTimestamp(), orderedCommit);
}
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
_lastApplied = resultOpTimeAndWallTime;
log() << "No need to apply operations. (currently at "
<< _initialSyncState->stopTimestamp.toBSON() << ")";
@@ -1123,7 +1123,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
void InitialSyncer::_getNextApplierBatchCallback(
const executor::TaskExecutor::CallbackArgs& callbackArgs,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
auto status =
_checkForShutdownAndConvertStatus_inlock(callbackArgs, "error getting next applier batch");
if (!status.isOK()) {
@@ -1223,7 +1223,7 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus,
OpTimeAndWallTime lastApplied,
std::uint32_t numApplied,
std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
auto status =
_checkForShutdownAndConvertStatus_inlock(multiApplierStatus, "error applying batch");
@@ -1260,7 +1260,7 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus,
void InitialSyncer::_rollbackCheckerCheckForRollbackCallback(
const RollbackChecker::Result& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
auto status = _checkForShutdownAndConvertStatus_inlock(result.getStatus(),
"error while getting last rollback ID");
if (!status.isOK()) {
@@ -1311,7 +1311,7 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeAndWallTime
log() << "Initial sync attempt finishing up.";
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
log() << "Initial Sync Attempt Statistics: " << redact(_getInitialSyncProgress_inlock());
auto runTime = _initialSyncState ? _initialSyncState->timer.millis() : 0;
@@ -1384,7 +1384,7 @@ void InitialSyncer::_finishCallback(StatusWith<OpTimeAndWallTime> lastApplied) {
// before we transition the state to Complete.
decltype(_onCompletion) onCompletion;
{
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
auto opCtx = makeOpCtx();
_tearDown_inlock(opCtx.get(), lastApplied);
@@ -1414,7 +1414,7 @@ void InitialSyncer::_finishCallback(StatusWith<OpTimeAndWallTime> lastApplied) {
// before InitialSyncer::join() returns.
onCompletion = {};
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ stdx::lock_guard<Latch> lock(_mutex);
invariant(_state != State::kComplete);
_state = State::kComplete;
_stateCondition.notify_all();
@@ -1450,8 +1450,7 @@ Status InitialSyncer::_scheduleLastOplogEntryFetcher_inlock(Fetcher::CallbackFn
}
void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
- const stdx::lock_guard<stdx::mutex>& lock,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ const stdx::lock_guard<Latch>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
// We should check our current state because shutdown() could have been called before
// we re-acquired the lock.
if (_isShuttingDown_inlock()) {
@@ -1506,8 +1505,7 @@ void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(
}
void InitialSyncer::_scheduleRollbackCheckerCheckForRollback_inlock(
- const stdx::lock_guard<stdx::mutex>& lock,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
+ const stdx::lock_guard<Latch>& lock, std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
// We should check our current state because shutdown() could have been called before
// we re-acquired the lock.
if (_isShuttingDown_inlock()) {