diff options
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_external_state_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 138 |
1 files changed, 122 insertions, 16 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 75b08a4fd76..67abc29502e 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -42,6 +42,7 @@ #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/op_observer.h" @@ -70,6 +71,7 @@ #include "mongo/util/net/hostandport.h" #include "mongo/util/net/message_port.h" #include "mongo/util/net/sock.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace repl { @@ -116,7 +118,7 @@ void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext* repl::startMasterSlave(txn); } -void ReplicationCoordinatorExternalStateImpl::shutdown() { +void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) { stdx::lock_guard<stdx::mutex> lk(_threadMutex); if (_startedThreads) { log() << "Stopping replication applier threads"; @@ -129,6 +131,13 @@ void ReplicationCoordinatorExternalStateImpl::shutdown() { if (_snapshotThread) _snapshotThread->shutdown(); + + if (getOplogDeleteFromPoint(txn).isNull() && + loadLastOpTime(txn) == getAppliedThrough(txn)) { + // Clear the appliedThrough marker to indicate we are consistent with the top of the + // oplog. + setAppliedThrough(txn, {}); + } } } @@ -169,24 +178,45 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati wuow.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs"); + + // This initializes the minvalid document with a null "ts" because older versions (<=3.2) + // get angry if the minValid document is present but doesn't have a "ts" field. 
+ // Consider removing this once we no longer need to support downgrading to 3.2. + setMinValidToAtLeast(txn, {}); } catch (const DBException& ex) { return ex.toStatus(); } return Status::OK(); } -void ReplicationCoordinatorExternalStateImpl::logTransitionToPrimaryToOplog(OperationContext* txn) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction scopedXact(txn, MODE_X); +OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* txn, + bool isV1ElectionProtocol) { + invariant(txn->lockState()->isW()); + + // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be + // done before we add anything to our oplog. + invariant(getOplogDeleteFromPoint(txn).isNull()); + setAppliedThrough(txn, {}); + + if (isV1ElectionProtocol) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction scopedXact(txn, MODE_X); - WriteUnitOfWork wuow(txn); - txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(txn, - BSON("msg" - << "new primary")); - wuow.commit(); + WriteUnitOfWork wuow(txn); + txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage( + txn, + BSON("msg" + << "new primary")); + wuow.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + txn, "logging transition to primary to oplog", "local.oplog.rs"); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "logging transition to primary to oplog", "local.oplog.rs"); + const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(txn)); + + dropAllTempCollections(txn); + + return opTimeToReturn; } void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() { @@ -301,18 +331,94 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp } void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* txn) { - auto mv = getMinValid(txn); + if (getInitialSyncFlag(txn)) { + return; // Initial Sync will take over so no cleanup is needed. 
+ } - if (!mv.start.isNull()) { - // If we are in the middle of a batch, and recoveringm then we need to truncate the oplog. - LOG(2) << "Recovering from a failed apply batch, start:" << mv.start.toBSON(); - truncateOplogTo(txn, mv.start.getTimestamp()); + const auto deleteFromPoint = getOplogDeleteFromPoint(txn); + const auto appliedThrough = getAppliedThrough(txn); + + const bool needToDeleteEndOfOplog = !deleteFromPoint.isNull() && + // This version should never have a non-null deleteFromPoint with a null appliedThrough. + // This scenario means that we downgraded after unclean shutdown, then the downgraded node + // deleted the ragged end of our oplog, then did a clean shutdown. + !appliedThrough.isNull() && + // Similarly we should never have an appliedThrough higher than the deleteFromPoint. This + // means that the downgraded node deleted our ragged end then applied ahead of our + // deleteFromPoint and then had an unclean shutdown before upgrading. We are ok with + // applying these ops because older versions wrote to the oplog from a single thread so we + // know they are in order. + !(appliedThrough.getTimestamp() >= deleteFromPoint); + if (needToDeleteEndOfOplog) { + log() << "Removing unapplied entries starting at: " << deleteFromPoint; + truncateOplogTo(txn, deleteFromPoint); + } + setOplogDeleteFromPoint(txn, {}); // clear the deleteFromPoint + + if (appliedThrough.isNull()) { + // No follow-up work to do. + return; + } + + // Check if we have any unapplied ops in our oplog. It is important that this is done after + // deleting the ragged end of the oplog. + const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(txn)); + if (appliedThrough == topOfOplog) { + return; // We've applied all the valid oplog we have. + } else if (appliedThrough > topOfOplog) { + severe() << "Applied op " << appliedThrough << " not found. 
Top of oplog is " << topOfOplog + << '.'; + fassertFailedNoTrace(40313); + } + + log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to " + << topOfOplog << " (inclusive)."; + + DBDirectClient db(txn); + auto cursor = db.query(rsOplogName, + QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())), + /*batchSize*/ 0, + /*skip*/ 0, + /*projection*/ nullptr, + QueryOption_OplogReplay); + + // Check that the first document matches our appliedThrough point then skip it since it's + // already been applied. + if (!cursor->more()) { + // This should really be impossible because we check above that the top of the oplog is + // strictly > appliedThrough. If this fails it represents a serious bug in either the + // storage engine or query's implementation of OplogReplay. + severe() << "Couldn't find any entries in the oplog >= " << appliedThrough + << " which should be impossible."; + fassertFailedNoTrace(40293); + } + auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())); + if (firstOpTimeFound != appliedThrough) { + severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is " + << firstOpTimeFound; + fassertFailedNoTrace(40292); + } + + // Apply remaining ops one at a time, but don't log them because they are already logged. + const bool wereWritesReplicated = txn->writesAreReplicated(); + ON_BLOCK_EXIT([&] { txn->setReplicatedWrites(wereWritesReplicated); }); + txn->setReplicatedWrites(false); + + while (cursor->more()) { + auto entry = cursor->nextSafe(); + fassertStatusOK(40294, SyncTail::syncApply(txn, entry, true)); + setAppliedThrough(txn, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); } } StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) { // TODO: handle WriteConflictExceptions below try { + // If we are doing an initial sync do not read from the oplog. 
+ if (getInitialSyncFlag(txn)) { + return {ErrorCodes::InitialSyncFailure, "In the middle of an initial sync."}; + } + BSONObj oplogEntry; if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) { return StatusWith<OpTime>(ErrorCodes::NoMatchingDocument, |