diff options
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 79 |
1 files changed, 40 insertions, 39 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index a0f646f6078..186903d0924 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -140,20 +140,20 @@ BackgroundSync::BackgroundSync( bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get()); } -void BackgroundSync::startup(OperationContext* txn) { - _oplogBuffer->startup(txn); +void BackgroundSync::startup(OperationContext* opCtx) { + _oplogBuffer->startup(opCtx); invariant(!_producerThread); _producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::_run, this))); } -void BackgroundSync::shutdown(OperationContext* txn) { +void BackgroundSync::shutdown(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lock(_mutex); // Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but // ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is // waiting for an operation to be past the slaveDelay point. - clearBuffer(txn); + clearBuffer(opCtx); _state = ProducerState::Stopped; if (_syncSourceResolver) { @@ -167,9 +167,9 @@ void BackgroundSync::shutdown(OperationContext* txn) { _inShutdown = true; } -void BackgroundSync::join(OperationContext* txn) { +void BackgroundSync::join(OperationContext* opCtx) { _producerThread->join(); - _oplogBuffer->shutdown(txn); + _oplogBuffer->shutdown(opCtx); } bool BackgroundSync::inShutdown() const { @@ -225,15 +225,15 @@ void BackgroundSync::_runProducer() { } // we want to start when we're no longer primary // start() also loads _lastOpTimeFetched, which we know is set from the "if" - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); if (getState() == ProducerState::Starting) { - start(txn.get()); + start(opCtx.get()); } - _produce(txn.get()); + _produce(opCtx.get()); } -void BackgroundSync::_produce(OperationContext* txn) { +void BackgroundSync::_produce(OperationContext* opCtx) { if (MONGO_FAIL_POINT(stopReplProducer)) { // This log output is used in js tests so please leave it. log() << "bgsync - stopReplProducer fail point " @@ -271,7 +271,7 @@ void BackgroundSync::_produce(OperationContext* txn) { HostAndPort source; SyncSourceResolverResponse syncSourceResp; { - const OpTime minValidSaved = StorageInterface::get(txn)->getMinValid(txn); + const OpTime minValidSaved = StorageInterface::get(opCtx)->getMinValid(opCtx); stdx::lock_guard<stdx::mutex> lock(_mutex); const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime(); @@ -358,8 +358,9 @@ void BackgroundSync::_produce(OperationContext* txn) { // Set the applied point if unset. This is most likely the first time we've established a sync // source since stepping down or otherwise clearing the applied point. We need to set this here, // before the OplogWriter gets a chance to append to the oplog. - if (StorageInterface::get(txn)->getAppliedThrough(txn).isNull()) { - StorageInterface::get(txn)->setAppliedThrough(txn, _replCoord->getMyLastAppliedOpTime()); + if (StorageInterface::get(opCtx)->getAppliedThrough(opCtx).isNull()) { + StorageInterface::get(opCtx)->setAppliedThrough(opCtx, + _replCoord->getMyLastAppliedOpTime()); } // "lastFetched" not used. Already set in _enqueueDocuments. @@ -472,7 +473,7 @@ void BackgroundSync::_produce(OperationContext* txn) { } } - _rollback(txn, source, syncSourceResp.rbid, getConnection); + _rollback(opCtx, source, syncSourceResp.rbid, getConnection); // Reset the producer to clear the sync source and the last optime fetched. stop(true); startProducerIfStopped(); @@ -540,10 +541,10 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi return Status::OK(); // Nothing to do. } - auto txn = cc().makeOperationContext(); + auto opCtx = cc().makeOperationContext(); // Wait for enough space. - _oplogBuffer->waitForSpace(txn.get(), info.toApplyDocumentBytes); + _oplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes); { // Don't add more to the buffer if we are in shutdown. Continue holding the lock until we @@ -560,7 +561,7 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi } // Buffer docs for later application. - _oplogBuffer->pushAllNonBlocking(txn.get(), begin, end); + _oplogBuffer->pushAllNonBlocking(opCtx.get(), begin, end); // Update last fetched info. _lastFetchedHash = info.lastDocument.value; @@ -585,8 +586,8 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi return Status::OK(); } -bool BackgroundSync::peek(OperationContext* txn, BSONObj* op) { - return _oplogBuffer->peek(txn, op); +bool BackgroundSync::peek(OperationContext* opCtx, BSONObj* op) { + return _oplogBuffer->peek(opCtx, op); } void BackgroundSync::waitForMore() { @@ -594,11 +595,11 @@ void BackgroundSync::waitForMore() { _oplogBuffer->waitForData(Seconds(1)); } -void BackgroundSync::consume(OperationContext* txn) { +void BackgroundSync::consume(OperationContext* opCtx) { // this is just to get the op off the queue, it's been peeked at // and queued for application already BSONObj op; - if (_oplogBuffer->tryPop(txn, &op)) { + if (_oplogBuffer->tryPop(opCtx, &op)) { bufferCountGauge.decrement(1); bufferSizeGauge.decrement(getSize(op)); } else { @@ -609,7 +610,7 @@ void BackgroundSync::consume(OperationContext* txn) { } } -void BackgroundSync::_rollback(OperationContext* txn, +void BackgroundSync::_rollback(OperationContext* opCtx, const HostAndPort& source, boost::optional<int> requiredRBID, stdx::function<DBClientBase*()> getConnection) { @@ -635,7 +636,7 @@ void BackgroundSync::_rollback(OperationContext* txn, // then. { log() << "rollback 0"; - Lock::GlobalWrite globalWrite(txn->lockState()); + Lock::GlobalWrite globalWrite(opCtx->lockState()); if (!_replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) { log() << "Cannot transition from " << _replCoord->getMemberState().toString() << " to " << MemberState(MemberState::RS_ROLLBACK).toString(); @@ -644,8 +645,8 @@ void BackgroundSync::_rollback(OperationContext* txn, } try { - auto status = syncRollback(txn, - OplogInterfaceLocal(txn, rsOplogName), + auto status = syncRollback(opCtx, + OplogInterfaceLocal(opCtx, rsOplogName), RollbackSourceImpl(getConnection, source, rsOplogName), requiredRBID, _replCoord); @@ -668,7 +669,7 @@ void BackgroundSync::_rollback(OperationContext* txn, warning() << "rollback cannot complete at this time (retrying later): " << redact(ex) << " appliedThrough=" << _replCoord->getMyLastAppliedOpTime() - << " minvalid=" << StorageInterface::get(txn)->getMinValid(txn); + << " minvalid=" << StorageInterface::get(opCtx)->getMinValid(opCtx); // Sleep a bit to allow upstream node to coalesce, if that was the cause of the failure. If // we failed in a way that will keep failing, but wasn't flagged as a fatal failure, this @@ -684,12 +685,12 @@ void BackgroundSync::_rollback(OperationContext* txn, // so that if we wind up shutting down uncleanly in response to something we rolled back // we know that we won't wind up right back in the same situation when we start back up // because the rollback wasn't durable. - txn->recoveryUnit()->waitUntilDurable(); + opCtx->recoveryUnit()->waitUntilDurable(); // If we detected that we rolled back the shardIdentity document as part of this rollback // then we must shut down to clear the in-memory ShardingState associated with the // shardIdentity document. - if (ShardIdentityRollbackNotifier::get(txn)->didRollbackHappen()) { + if (ShardIdentityRollbackNotifier::get(opCtx)->didRollbackHappen()) { severe() << "shardIdentity document rollback detected. Shutting down to clear " "in-memory sharding state. Restarting this process should safely return it " "to a healthy state"; @@ -734,10 +735,10 @@ void BackgroundSync::stop(bool resetLastFetchedOptime) { } } -void BackgroundSync::start(OperationContext* txn) { +void BackgroundSync::start(OperationContext* opCtx) { OpTimeWithHash lastAppliedOpTimeWithHash; do { - lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(txn); + lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(opCtx); stdx::lock_guard<stdx::mutex> lk(_mutex); // Double check the state after acquiring the mutex. if (_state != ProducerState::Starting) { @@ -762,28 +763,28 @@ void BackgroundSync::start(OperationContext* txn) { LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash; } -void BackgroundSync::clearBuffer(OperationContext* txn) { - _oplogBuffer->clear(txn); +void BackgroundSync::clearBuffer(OperationContext* opCtx) { + _oplogBuffer->clear(opCtx); const auto count = bufferCountGauge.get(); bufferCountGauge.decrement(count); const auto size = bufferSizeGauge.get(); bufferSizeGauge.decrement(size); } -OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* txn) { +OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* opCtx) { BSONObj oplogEntry; try { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); - bool success = Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry); + ScopedTransaction transaction(opCtx, MODE_IX); + Lock::DBLock lk(opCtx->lockState(), "local", MODE_X); + bool success = Helpers::getLast(opCtx, rsOplogName.c_str(), oplogEntry); if (!success) { // This can happen when we are to do an initial sync. lastHash will be set // after the initial sync is complete. return OpTimeWithHash(0); } } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsOplogName); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "readLastAppliedHash", rsOplogName); } catch (const DBException& ex) { severe() << "Problem reading " << rsOplogName << ": " << redact(ex); fassertFailed(18904); @@ -817,8 +818,8 @@ bool BackgroundSync::shouldStopFetching() const { return false; } -void BackgroundSync::pushTestOpToBuffer(OperationContext* txn, const BSONObj& op) { - _oplogBuffer->push(txn, op); +void BackgroundSync::pushTestOpToBuffer(OperationContext* opCtx, const BSONObj& op) { + _oplogBuffer->push(opCtx, op); bufferCountGauge.increment(); bufferSizeGauge.increment(op.objsize()); } |