summary refs log tree commit diff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- src/mongo/db/repl/bgsync.cpp 79
1 files changed, 40 insertions, 39 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index a0f646f6078..186903d0924 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -140,20 +140,20 @@ BackgroundSync::BackgroundSync(
bufferMaxSizeGauge.increment(_oplogBuffer->getMaxSize() - bufferMaxSizeGauge.get());
}
-void BackgroundSync::startup(OperationContext* txn) {
- _oplogBuffer->startup(txn);
+void BackgroundSync::startup(OperationContext* opCtx) {
+ _oplogBuffer->startup(opCtx);
invariant(!_producerThread);
_producerThread.reset(new stdx::thread(stdx::bind(&BackgroundSync::_run, this)));
}
-void BackgroundSync::shutdown(OperationContext* txn) {
+void BackgroundSync::shutdown(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
// Clear the buffer. This unblocks the OplogFetcher if it is blocked with a full queue, but
// ensures that it won't add anything. It will also unblock the OpApplier pipeline if it is
// waiting for an operation to be past the slaveDelay point.
- clearBuffer(txn);
+ clearBuffer(opCtx);
_state = ProducerState::Stopped;
if (_syncSourceResolver) {
@@ -167,9 +167,9 @@ void BackgroundSync::shutdown(OperationContext* txn) {
_inShutdown = true;
}
-void BackgroundSync::join(OperationContext* txn) {
+void BackgroundSync::join(OperationContext* opCtx) {
_producerThread->join();
- _oplogBuffer->shutdown(txn);
+ _oplogBuffer->shutdown(opCtx);
}
bool BackgroundSync::inShutdown() const {
@@ -225,15 +225,15 @@ void BackgroundSync::_runProducer() {
}
// we want to start when we're no longer primary
// start() also loads _lastOpTimeFetched, which we know is set from the "if"
- auto txn = cc().makeOperationContext();
+ auto opCtx = cc().makeOperationContext();
if (getState() == ProducerState::Starting) {
- start(txn.get());
+ start(opCtx.get());
}
- _produce(txn.get());
+ _produce(opCtx.get());
}
-void BackgroundSync::_produce(OperationContext* txn) {
+void BackgroundSync::_produce(OperationContext* opCtx) {
if (MONGO_FAIL_POINT(stopReplProducer)) {
// This log output is used in js tests so please leave it.
log() << "bgsync - stopReplProducer fail point "
@@ -271,7 +271,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
HostAndPort source;
SyncSourceResolverResponse syncSourceResp;
{
- const OpTime minValidSaved = StorageInterface::get(txn)->getMinValid(txn);
+ const OpTime minValidSaved = StorageInterface::get(opCtx)->getMinValid(opCtx);
stdx::lock_guard<stdx::mutex> lock(_mutex);
const auto requiredOpTime = (minValidSaved > _lastOpTimeFetched) ? minValidSaved : OpTime();
@@ -358,8 +358,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
// Set the applied point if unset. This is most likely the first time we've established a sync
// source since stepping down or otherwise clearing the applied point. We need to set this here,
// before the OplogWriter gets a chance to append to the oplog.
- if (StorageInterface::get(txn)->getAppliedThrough(txn).isNull()) {
- StorageInterface::get(txn)->setAppliedThrough(txn, _replCoord->getMyLastAppliedOpTime());
+ if (StorageInterface::get(opCtx)->getAppliedThrough(opCtx).isNull()) {
+ StorageInterface::get(opCtx)->setAppliedThrough(opCtx,
+ _replCoord->getMyLastAppliedOpTime());
}
// "lastFetched" not used. Already set in _enqueueDocuments.
@@ -472,7 +473,7 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
- _rollback(txn, source, syncSourceResp.rbid, getConnection);
+ _rollback(opCtx, source, syncSourceResp.rbid, getConnection);
// Reset the producer to clear the sync source and the last optime fetched.
stop(true);
startProducerIfStopped();
@@ -540,10 +541,10 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi
return Status::OK(); // Nothing to do.
}
- auto txn = cc().makeOperationContext();
+ auto opCtx = cc().makeOperationContext();
// Wait for enough space.
- _oplogBuffer->waitForSpace(txn.get(), info.toApplyDocumentBytes);
+ _oplogBuffer->waitForSpace(opCtx.get(), info.toApplyDocumentBytes);
{
// Don't add more to the buffer if we are in shutdown. Continue holding the lock until we
@@ -560,7 +561,7 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi
}
// Buffer docs for later application.
- _oplogBuffer->pushAllNonBlocking(txn.get(), begin, end);
+ _oplogBuffer->pushAllNonBlocking(opCtx.get(), begin, end);
// Update last fetched info.
_lastFetchedHash = info.lastDocument.value;
@@ -585,8 +586,8 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi
return Status::OK();
}
-bool BackgroundSync::peek(OperationContext* txn, BSONObj* op) {
- return _oplogBuffer->peek(txn, op);
+bool BackgroundSync::peek(OperationContext* opCtx, BSONObj* op) {
+ return _oplogBuffer->peek(opCtx, op);
}
void BackgroundSync::waitForMore() {
@@ -594,11 +595,11 @@ void BackgroundSync::waitForMore() {
_oplogBuffer->waitForData(Seconds(1));
}
-void BackgroundSync::consume(OperationContext* txn) {
+void BackgroundSync::consume(OperationContext* opCtx) {
// this is just to get the op off the queue, it's been peeked at
// and queued for application already
BSONObj op;
- if (_oplogBuffer->tryPop(txn, &op)) {
+ if (_oplogBuffer->tryPop(opCtx, &op)) {
bufferCountGauge.decrement(1);
bufferSizeGauge.decrement(getSize(op));
} else {
@@ -609,7 +610,7 @@ void BackgroundSync::consume(OperationContext* txn) {
}
}
-void BackgroundSync::_rollback(OperationContext* txn,
+void BackgroundSync::_rollback(OperationContext* opCtx,
const HostAndPort& source,
boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection) {
@@ -635,7 +636,7 @@ void BackgroundSync::_rollback(OperationContext* txn,
// then.
{
log() << "rollback 0";
- Lock::GlobalWrite globalWrite(txn->lockState());
+ Lock::GlobalWrite globalWrite(opCtx->lockState());
if (!_replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) {
log() << "Cannot transition from " << _replCoord->getMemberState().toString() << " to "
<< MemberState(MemberState::RS_ROLLBACK).toString();
@@ -644,8 +645,8 @@ void BackgroundSync::_rollback(OperationContext* txn,
}
try {
- auto status = syncRollback(txn,
- OplogInterfaceLocal(txn, rsOplogName),
+ auto status = syncRollback(opCtx,
+ OplogInterfaceLocal(opCtx, rsOplogName),
RollbackSourceImpl(getConnection, source, rsOplogName),
requiredRBID,
_replCoord);
@@ -668,7 +669,7 @@ void BackgroundSync::_rollback(OperationContext* txn,
warning() << "rollback cannot complete at this time (retrying later): " << redact(ex)
<< " appliedThrough=" << _replCoord->getMyLastAppliedOpTime()
- << " minvalid=" << StorageInterface::get(txn)->getMinValid(txn);
+ << " minvalid=" << StorageInterface::get(opCtx)->getMinValid(opCtx);
// Sleep a bit to allow upstream node to coalesce, if that was the cause of the failure. If
// we failed in a way that will keep failing, but wasn't flagged as a fatal failure, this
@@ -684,12 +685,12 @@ void BackgroundSync::_rollback(OperationContext* txn,
// so that if we wind up shutting down uncleanly in response to something we rolled back
// we know that we won't wind up right back in the same situation when we start back up
// because the rollback wasn't durable.
- txn->recoveryUnit()->waitUntilDurable();
+ opCtx->recoveryUnit()->waitUntilDurable();
// If we detected that we rolled back the shardIdentity document as part of this rollback
// then we must shut down to clear the in-memory ShardingState associated with the
// shardIdentity document.
- if (ShardIdentityRollbackNotifier::get(txn)->didRollbackHappen()) {
+ if (ShardIdentityRollbackNotifier::get(opCtx)->didRollbackHappen()) {
severe() << "shardIdentity document rollback detected. Shutting down to clear "
"in-memory sharding state. Restarting this process should safely return it "
"to a healthy state";
@@ -734,10 +735,10 @@ void BackgroundSync::stop(bool resetLastFetchedOptime) {
}
}
-void BackgroundSync::start(OperationContext* txn) {
+void BackgroundSync::start(OperationContext* opCtx) {
OpTimeWithHash lastAppliedOpTimeWithHash;
do {
- lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(txn);
+ lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(opCtx);
stdx::lock_guard<stdx::mutex> lk(_mutex);
// Double check the state after acquiring the mutex.
if (_state != ProducerState::Starting) {
@@ -762,28 +763,28 @@ void BackgroundSync::start(OperationContext* txn) {
LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
}
-void BackgroundSync::clearBuffer(OperationContext* txn) {
- _oplogBuffer->clear(txn);
+void BackgroundSync::clearBuffer(OperationContext* opCtx) {
+ _oplogBuffer->clear(opCtx);
const auto count = bufferCountGauge.get();
bufferCountGauge.decrement(count);
const auto size = bufferSizeGauge.get();
bufferSizeGauge.decrement(size);
}
-OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* txn) {
+OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* opCtx) {
BSONObj oplogEntry;
try {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), "local", MODE_X);
- bool success = Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry);
+ ScopedTransaction transaction(opCtx, MODE_IX);
+ Lock::DBLock lk(opCtx->lockState(), "local", MODE_X);
+ bool success = Helpers::getLast(opCtx, rsOplogName.c_str(), oplogEntry);
if (!success) {
// This can happen when we are to do an initial sync. lastHash will be set
// after the initial sync is complete.
return OpTimeWithHash(0);
}
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsOplogName);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "readLastAppliedHash", rsOplogName);
} catch (const DBException& ex) {
severe() << "Problem reading " << rsOplogName << ": " << redact(ex);
fassertFailed(18904);
@@ -817,8 +818,8 @@ bool BackgroundSync::shouldStopFetching() const {
return false;
}
-void BackgroundSync::pushTestOpToBuffer(OperationContext* txn, const BSONObj& op) {
- _oplogBuffer->push(txn, op);
+void BackgroundSync::pushTestOpToBuffer(OperationContext* opCtx, const BSONObj& op) {
+ _oplogBuffer->push(opCtx, op);
bufferCountGauge.increment();
bufferSizeGauge.increment(op.objsize());
}