author      Mathias Stearn <mathias@10gen.com>    2016-08-15 19:06:30 -0400
committer   Mathias Stearn <mathias@10gen.com>    2016-08-23 10:49:55 -0400
commit      34c6c691a038eac1ac3ee16e1eedc54aab964774
tree        7a0776eba2e12ade082628f1654aee3a753feb57 /src/mongo/db
parent      3bcafe4fe23c9521fe028a176fffabdc79d434e9
download    mongo-34c6c691a038eac1ac3ee16e1eedc54aab964774.tar.gz
SERVER-7200 Write oplog entries on secondaries before applying
Diffstat (limited to 'src/mongo/db')
26 files changed, 475 insertions, 386 deletions
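Before the per-file diffs, a brief orientation: the patch replaces the old two-field `BatchBoundaries {start, end}` minvalid document with three independent markers in `local.replset.minvalid`: a `begin` field (the "applied through" optime, where null now means "consistent with the top of the oplog"), an `oplogDeleteFromPoint` timestamp (everything in the oplog at or after it is an unapplied "ragged end" to be truncated on startup), and the `ts`/`t` pair holding minValid itself. The standalone sketch below (stub types and a hypothetical `decideRecovery` helper, not MongoDB source) illustrates the startup decision that the new `cleanUpLastApplyBatch` derives from these markers:

```cpp
// Illustrative only: stub types standing in for mongo::Timestamp and the
// local.replset.minvalid document manipulated by StorageInterfaceImpl.
#include <cstdint>
#include <iostream>
#include <optional>

struct Timestamp {
    uint64_t secs = 0;
    uint64_t inc = 0;
    bool isNull() const { return secs == 0 && inc == 0; }
    bool operator<(const Timestamp& other) const {
        return secs != other.secs ? secs < other.secs : inc < other.inc;
    }
};

struct MinValidDoc {
    bool doingInitialSync = false;            // kInitialSyncFlagFieldName
    std::optional<Timestamp> appliedThrough;  // kBeginFieldName; empty => top of oplog
    Timestamp oplogDeleteFromPoint;           // kOplogDeleteFromPointFieldName
    Timestamp minValid;                       // the "ts"/"t" fields
};

enum class RecoveryAction { kRestartInitialSync, kTruncateThenReplay, kReplayToTop, kNothing };

// Hypothetical helper mirroring (in simplified form) the decision order in
// cleanUpLastApplyBatch: initial sync trumps everything; then truncate the
// ragged oplog end if the delete-from point is set and ahead of appliedThrough;
// then replay anything between appliedThrough and the remaining top of the oplog.
RecoveryAction decideRecovery(const MinValidDoc& doc) {
    if (doc.doingInitialSync)
        return RecoveryAction::kRestartInitialSync;
    if (!doc.appliedThrough)
        return RecoveryAction::kNothing;  // null "begin": consistent at the oplog top
    const bool ragged = !doc.oplogDeleteFromPoint.isNull() &&
        *doc.appliedThrough < doc.oplogDeleteFromPoint;  // see needToDeleteEndOfOplog below
    return ragged ? RecoveryAction::kTruncateThenReplay : RecoveryAction::kReplayToTop;
}

int main() {
    // A node that crashed mid-batch: it had applied through (100, 1) while the
    // oplog writer had already persisted entries from (100, 5) onward.
    MinValidDoc crashed{false, Timestamp{100, 1}, Timestamp{100, 5}, Timestamp{100, 9}};
    std::cout << (decideRecovery(crashed) == RecoveryAction::kTruncateThenReplay) << '\n';
}
```

The invariant this buys, and the point of the commit title, is that secondaries now persist fetched oplog entries (advancing `oplogDeleteFromPoint`) before applying them, so an unclean shutdown leaves at worst a truncatable ragged end plus a replayable gap between `begin` and the oplog top, never a half-recorded batch.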
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 175cf7c17d0..36b6b4cc603 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1019,6 +1019,7 @@ env.Library( 'roll_back_local_operations.cpp', ], LIBDEPS=[ + 'optime', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/util/foundation', ], diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index db134069821..d31775f0001 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -302,12 +302,9 @@ void BackgroundSync::_produce(OperationContext* txn) { log() << "Our newest OpTime : " << lastOpTimeFetched; log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen; log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"; - - StorageInterface::get(txn)->setMinValid( - txn, {lastOpTimeFetched, syncSourceResp.earliestOpTimeSeen}); auto status = _replCoord->setMaintenanceMode(true); if (!status.isOK()) { - warning() << "Failed to transition into maintenance mode."; + warning() << "Failed to transition into maintenance mode: " << status; } bool worked = _replCoord->setFollowerMode(MemberState::RS_RECOVERING); if (!worked) { @@ -342,6 +339,13 @@ void BackgroundSync::_produce(OperationContext* txn) { } } + // Set the applied point if unset. This is most likely the first time we've established a sync + // source since stepping down or otherwise clearing the applied point. We need to set this here, + // before the OplogWriter gets a chance to append to the oplog. + if (StorageInterface::get(txn)->getAppliedThrough(txn).isNull()) { + StorageInterface::get(txn)->setAppliedThrough(txn, _replCoord->getMyLastAppliedOpTime()); + } + // "lastFetched" not used. Already set in _enqueueDocuments. Status fetcherReturnStatus = Status::OK(); DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState( @@ -442,13 +446,13 @@ void BackgroundSync::_produce(OperationContext* txn) { } // check that we are at minvalid, otherwise we cannot roll back as we may be in an // inconsistent state - BatchBoundaries boundaries = StorageInterface::get(txn)->getMinValid(txn); - if (!boundaries.start.isNull() || boundaries.end > lastApplied) { + const auto minValid = StorageInterface::get(txn)->getMinValid(txn); + if (lastApplied < minValid) { fassertNoTrace(18750, Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << "need to rollback, but in inconsistent state. 
" << "minvalid: " - << boundaries.end.toString() + << minValid.toString() << " > our last optime: " << lastApplied.toString())); } diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 3a101edf49c..edf62d53261 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -806,7 +806,6 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn, _lastFetched = _lastApplied; _storage->clearInitialSyncFlag(txn); - _storage->setMinValid(txn, _lastApplied.opTime, DurableRequirement::Strong); _opts.setMyLastOptime(_lastApplied.opTime); log() << "initial sync done; took " << _stats.initialSyncEnd - _stats.initialSyncStart << " milliseconds."; diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 5fda4574d05..2e610523d99 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -288,7 +288,7 @@ OplogDocWriter _logOpWriter(OperationContext* txn, } } // end anon namespace -// Truncates the oplog to but excluding the "truncateTimestamp" entry. +// Truncates the oplog after and including the "truncateTimestamp" entry. void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { const NamespaceString oplogNss(rsOplogName); ScopedTransaction transaction(txn, MODE_IX); @@ -302,42 +302,41 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { } // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp. - bool foundSomethingToTruncate = false; - RecordId lastRecordId; - BSONObj lastOplogEntry; + RecordId oldestIDToDelete; // Non-null if there is something to delete. auto oplogRs = oplogCollection->getRecordStore(); - auto oplogReverseCursor = oplogRs->getCursor(txn, false); - bool first = true; + auto oplogReverseCursor = oplogRs->getCursor(txn, /*forward=*/false); + size_t count = 0; while (auto next = oplogReverseCursor->next()) { - lastOplogEntry = next->data.releaseToBson(); - lastRecordId = next->id; + const BSONObj entry = next->data.releaseToBson(); + const RecordId id = next->id; + count++; - const auto tsElem = lastOplogEntry["ts"]; - - if (first) { + const auto tsElem = entry["ts"]; + if (count == 1) { if (tsElem.eoo()) - LOG(2) << "Oplog tail entry: " << lastOplogEntry; + LOG(2) << "Oplog tail entry: " << entry; else LOG(2) << "Oplog tail entry ts field: " << tsElem; - first = false; } - if (tsElem.timestamp() == truncateTimestamp) { - break; - } else if (tsElem.timestamp() < truncateTimestamp) { - fassertFailedWithStatusNoTrace(34411, - Status(ErrorCodes::OplogOutOfOrder, - str::stream() << "Can't find " - << truncateTimestamp.toString() - << " to truncate from!")); + if (tsElem.timestamp() < truncateTimestamp) { + // If count == 1, that means that we have nothing to delete because everything in the + // oplog is < truncateTimestamp. 
+ if (count != 1) { + invariant(!oldestIDToDelete.isNull()); + oplogCollection->temp_cappedTruncateAfter( + txn, oldestIDToDelete, /*inclusive=*/true); + } + return; } - foundSomethingToTruncate = true; + oldestIDToDelete = id; } - if (foundSomethingToTruncate) { - oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false); - } + severe() << "Reached end of oplog looking for oplog entry before " + << truncateTimestamp.toStringPretty() + << " but couldn't find any after looking through " << count << " entries."; + fassertFailedNoTrace(40296); } /* we write to local.oplog.rs: diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp index f9a08da370f..6ab73a44b01 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/oplogreader.cpp @@ -154,10 +154,9 @@ void OplogReader::connectToSyncSource(OperationContext* txn, log() << "our last optime : " << lastOpTimeFetched; log() << "oldest available is " << oldestOpTimeSeen; log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"; - StorageInterface::get(txn)->setMinValid(txn, {lastOpTimeFetched, oldestOpTimeSeen}); auto status = replCoord->setMaintenanceMode(true); if (!status.isOK()) { - warning() << "Failed to transition into maintenance mode."; + warning() << "Failed to transition into maintenance mode: " << status; } bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING); if (!worked) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 857bac4abd7..f64214b462c 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -136,9 +136,16 @@ public: virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config) = 0; /** - * Writes a message about our transition to primary to the oplog. + * Called as part of the process of transitioning to primary. See the call site in + * ReplicationCoordinatorImpl for details about when and how it is called. + * + * Among other things, this writes a message about our transition to primary to the oplog if + * isV1 and returns the optime of that message. If !isV1, returns the optime of the last op + * in the oplog. + * + * Throws on errors. */ - virtual void logTransitionToPrimaryToOplog(OperationContext* txn) = 0; + virtual OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) = 0; /** * Simple wrapper around SyncSourceFeedback::forwardSlaveProgress. Signals to the @@ -225,13 +232,6 @@ public: virtual void shardingOnStepDownHook() = 0; /** - * Called when the instance transitions to primary. Calls all drain mode hooks. - * - * Throws on errors. - */ - virtual void drainModeHook(OperationContext* txn) = 0; - - /** * Notifies the bgsync and syncSourceFeedback threads to choose a new sync source.
*/ virtual void signalApplierToChooseNewSyncSource() = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index c5d36369d07..ad89b160e80 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -43,6 +43,7 @@ #include "mongo/db/commands/feature_compatibility_version.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/op_observer.h" @@ -87,6 +88,7 @@ #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/net/listen.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace repl { @@ -286,6 +288,13 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) { _snapshotThread->shutdown(); } + if (_storageInterface->getOplogDeleteFromPoint(txn).isNull() && + loadLastOpTime(txn) == _storageInterface->getAppliedThrough(txn)) { + // Clear the appliedThrough marker to indicate we are consistent with the top of the + // oplog. + _storageInterface->setAppliedThrough(txn, {}); + } + log() << "Stopping replication storage threads"; _taskExecutor->shutdown(); _taskExecutor->join(); @@ -318,24 +327,46 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati wuow.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs"); + + // This initializes the minvalid document with a null "ts" because older versions (<=3.2) + // get angry if the minValid document is present but doesn't have a "ts" field. + // Consider removing this once we no longer need to support downgrading to 3.2. + _storageInterface->setMinValidToAtLeast(txn, {}); } catch (const DBException& ex) { return ex.toStatus(); } return Status::OK(); } -void ReplicationCoordinatorExternalStateImpl::logTransitionToPrimaryToOplog(OperationContext* txn) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction scopedXact(txn, MODE_X); +OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* txn, + bool isV1ElectionProtocol) { + + // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be + // done before we add anything to our oplog. 
+ invariant(_storageInterface->getOplogDeleteFromPoint(txn).isNull()); + _storageInterface->setAppliedThrough(txn, {}); - WriteUnitOfWork wuow(txn); - txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(txn, - BSON("msg" - << "new primary")); - wuow.commit(); + if (isV1ElectionProtocol) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction scopedXact(txn, MODE_X); + + WriteUnitOfWork wuow(txn); + txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage( + txn, + BSON("msg" + << "new primary")); + wuow.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + txn, "logging transition to primary to oplog", "local.oplog.rs"); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "logging transition to primary to oplog", "local.oplog.rs"); + const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(txn)); + + FeatureCompatibilityVersion::setIfCleanStartup(txn); + shardingOnTransitionToPrimaryHook(txn); + dropAllTempCollections(txn); + + return opTimeToReturn; } void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() { @@ -453,12 +484,77 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon if (_storageInterface->getInitialSyncFlag(txn)) { return; // Initial Sync will take over so no cleanup is needed. } - auto mv = _storageInterface->getMinValid(txn); - if (!mv.start.isNull()) { - // If we are in the middle of a batch, and recoveringm then we need to truncate the oplog. - LOG(2) << "Recovering from a failed apply batch, start:" << mv.start.toBSON(); - truncateOplogTo(txn, mv.start.getTimestamp()); + const auto deleteFromPoint = _storageInterface->getOplogDeleteFromPoint(txn); + const auto appliedThrough = _storageInterface->getAppliedThrough(txn); + + const bool needToDeleteEndOfOplog = !deleteFromPoint.isNull() && + // This version should never have a non-null deleteFromPoint with a null appliedThrough. + // This scenario means that we downgraded after unclean shutdown, then the downgraded node + // deleted the ragged end of our oplog, then did a clean shutdown. + !appliedThrough.isNull() && + // Similarly we should never have an appliedThrough higher than the deleteFromPoint. This + // means that the downgraded node deleted our ragged end then applied ahead of our + // deleteFromPoint and then had an unclean shutdown before upgrading. We are ok with + // applying these ops because older versions wrote to the oplog from a single thread so we + // know they are in order. + !(appliedThrough.getTimestamp() >= deleteFromPoint); + if (needToDeleteEndOfOplog) { + log() << "Removing unapplied entries starting at: " << deleteFromPoint; + truncateOplogTo(txn, deleteFromPoint); + } + _storageInterface->setOplogDeleteFromPoint(txn, {}); // clear the deleteFromPoint + + if (appliedThrough.isNull()) { + // No follow-up work to do. + return; + } + + // Check if we have any unapplied ops in our oplog. It is important that this is done after + // deleting the ragged end of the oplog. + const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(txn)); + if (appliedThrough == topOfOplog) { + return; // We've applied all the valid oplog we have.
+ } + + log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to " + << topOfOplog << " (inclusive)."; + + DBDirectClient db(txn); + auto cursor = db.query(rsOplogName, + QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())), + /*batchSize*/ 0, + /*skip*/ 0, + /*projection*/ nullptr, + QueryOption_OplogReplay); + + // Check that the first document matches our appliedThrough point then skip it since it's + // already been applied. + if (!cursor->more()) { + // This should really be impossible because we check above that the top of the oplog is + // strictly > appliedThrough. If this fails it represents a serious bug in either the + // storage engine or query's implementation of OplogReplay. + severe() << "Couldn't find any entries in the oplog >= " << appliedThrough + << " which should be impossible."; + fassertFailedNoTrace(40293); + } + auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())); + if (firstOpTimeFound != appliedThrough) { + severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is " + << firstOpTimeFound; + fassertFailedNoTrace(40292); + } + + // Apply remaining ops one at a time, but don't log them because they are already logged. + const bool wereWritesReplicated = txn->writesAreReplicated(); + ON_BLOCK_EXIT([&] { txn->setReplicatedWrites(wereWritesReplicated); }); + txn->setReplicatedWrites(false); + + while (cursor->more()) { + auto entry = cursor->nextSafe(); + fassertStatusOK(40294, SyncTail::syncApply(txn, entry, true)); + _storageInterface->setAppliedThrough( + txn, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry))); } } @@ -524,13 +620,8 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() { ShardingState::get(getGlobalServiceContext())->clearCollectionMetadata(); } -void ReplicationCoordinatorExternalStateImpl::drainModeHook(OperationContext* txn) { - FeatureCompatibilityVersion::setIfCleanStartup(txn); - shardingOnDrainingStateHook(txn); - dropAllTempCollections(txn); -} - -void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(OperationContext* txn) { +void ReplicationCoordinatorExternalStateImpl::shardingOnTransitionToPrimaryHook( + OperationContext* txn) { auto status = ShardingStateRecovery::recover(txn); if (ErrorCodes::isShutdownError(status.code())) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 7e339031d17..0dec8b25560 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -74,7 +74,7 @@ public: virtual executor::TaskExecutor* getTaskExecutor() const override; virtual OldThreadPool* getDbWorkThreadPool() const override; virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config); - virtual void logTransitionToPrimaryToOplog(OperationContext* txn); + virtual OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol); virtual void forwardSlaveProgress(); virtual OID ensureMe(OperationContext* txn); virtual bool isSelf(const HostAndPort& host, ServiceContext* ctx); @@ -89,7 +89,6 @@ public: virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* txn); virtual void shardingOnStepDownHook(); - virtual void drainModeHook(OperationContext* txn); virtual void signalApplierToChooseNewSyncSource(); virtual void signalApplierToCancelFetcher(); void
dropAllSnapshots() final; @@ -129,7 +128,7 @@ private: * * Throws on errors. */ - void shardingOnDrainingStateHook(OperationContext* txn); + void shardingOnTransitionToPrimaryHook(OperationContext* txn); /** * Drops all temporary collections on all databases except "local". diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 4b104c697fa..80d362cd13c 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -226,8 +226,6 @@ void ReplicationCoordinatorExternalStateMock::killAllUserOperations(OperationCon void ReplicationCoordinatorExternalStateMock::shardingOnStepDownHook() {} -void ReplicationCoordinatorExternalStateMock::drainModeHook(OperationContext* txn) {} - void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {} void ReplicationCoordinatorExternalStateMock::signalApplierToCancelFetcher() { @@ -288,8 +286,12 @@ void ReplicationCoordinatorExternalStateMock::setIsReadCommittedEnabled(bool val _isReadCommittedSupported = val; } -void ReplicationCoordinatorExternalStateMock::logTransitionToPrimaryToOplog(OperationContext* txn) { - _lastOpTime = OpTime(Timestamp(1, 0), 1); +OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* txn, + bool isV1ElectionProtocol) { + if (isV1ElectionProtocol) { + _lastOpTime = OpTime(Timestamp(1, 0), 1); + } + return fassertStatusOK(40297, _lastOpTime); } } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 5df6cd7733f..c8ac8263b0b 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -69,7 +69,7 @@ public: virtual executor::TaskExecutor* getTaskExecutor() const override; virtual OldThreadPool* getDbWorkThreadPool() const override; virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config); - virtual void logTransitionToPrimaryToOplog(OperationContext* txn); + virtual OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol); virtual void forwardSlaveProgress(); virtual OID ensureMe(OperationContext*); virtual bool isSelf(const HostAndPort& host, ServiceContext* ctx); @@ -84,7 +84,6 @@ public: virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* txn); virtual void shardingOnStepDownHook(); - virtual void drainModeHook(OperationContext* txn); virtual void signalApplierToChooseNewSyncSource(); virtual void signalApplierToCancelFetcher(); virtual void dropAllSnapshots(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index a25fce29a13..00f3635072c 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -448,7 +448,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { fassertFailedNoTrace(28545); } - // Returns the last optime from the oplog, possibly truncating first if we need to recover. + // Read the last op from the oplog after cleaning up any partially applied batches. 
_externalState->cleanUpLastApplyBatch(txn); auto lastOpTimeStatus = _externalState->loadLastOpTime(txn); @@ -856,7 +856,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { // _isWaitingForDrainToComplete, set the flag allowing non-local database writes and // drop the mutex. At this point, no writes can occur from other threads, due to the // global exclusive lock. - // 4.) Drop all temp collections. + // 4.) Drop all temp collections, and log the drops to the oplog. // 5.) Log transition to primary in the oplog and set that OpTime as the floor for what we will // consider to be committed. // 6.) Drop the global exclusive lock. @@ -868,40 +868,42 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) { // external writes will be processed. This is important so that a new temp collection isn't // introduced on the new primary before we drop all the temp collections. + // When we go to drop all temp collections, we must replicate the drops. + invariant(txn->writesAreReplicated()); + stdx::unique_lock<stdx::mutex> lk(_mutex); if (!_isWaitingForDrainToComplete) { return; } - lk.unlock(); + lk.unlock(); ScopedTransaction transaction(txn, MODE_X); + // Block step downs even after we unlock lk. Lock::GlobalWrite globalWriteLock(txn->lockState()); - lk.lock(); + if (!_isWaitingForDrainToComplete) { return; } + invariant(!_isCatchingUp); _isWaitingForDrainToComplete = false; - _canAcceptNonLocalWrites = true; _drainFinishedCond.notify_all(); - lk.unlock(); - _externalState->drainModeHook(txn); - - // This is done for compatibility with PV0 replicas wrt how "n" ops are processed. - if (isV1ElectionProtocol()) { - _externalState->logTransitionToPrimaryToOplog(txn); + if (!_getMemberState_inlock().primary()) { + // We must have decided not to transition to primary while waiting for the applier to drain. + // Skip the rest of this function since it should only be done when really transitioning. + return; } - StatusWith<OpTime> lastOpTime = _externalState->loadLastOpTime(txn); - fassertStatusOK(28665, lastOpTime.getStatus()); - _setFirstOpTimeOfMyTerm(lastOpTime.getValue()); + _canAcceptNonLocalWrites = true; + lk.unlock(); + _setFirstOpTimeOfMyTerm(_externalState->onTransitionToPrimary(txn, isV1ElectionProtocol())); lk.lock(); + // Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged - // our election in logTransitionToPrimaryToOplog(), above. + // our election in onTransitionToPrimary(), above. 
_updateLastCommittedOpTime_inlock(); - lk.unlock(); log() << "transition to primary complete; database writes are now permitted" << rsLog; } diff --git a/src/mongo/db/repl/roll_back_local_operations.cpp b/src/mongo/db/repl/roll_back_local_operations.cpp index 2117458e9f4..92312d0d21c 100644 --- a/src/mongo/db/repl/roll_back_local_operations.cpp +++ b/src/mongo/db/repl/roll_back_local_operations.cpp @@ -41,6 +41,10 @@ namespace repl { namespace { +OpTime getOpTime(const OplogInterface::Iterator::Value& oplogValue) { + return fassertStatusOK(40298, OpTime::parseFromOplogEntry(oplogValue.first)); +} + Timestamp getTimestamp(const BSONObj& operation) { return operation["ts"].timestamp(); } @@ -116,7 +120,7 @@ StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations _scanned++; if (getHash(_localOplogValue) == getHash(operation)) { return StatusWith<RollbackCommonPoint>( - std::make_pair(getTimestamp(_localOplogValue), _localOplogValue.second)); + std::make_pair(getOpTime(_localOplogValue), _localOplogValue.second)); } auto status = _rollbackOperation(_localOplogValue.first); if (!status.isOK()) { @@ -139,14 +143,11 @@ StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations "Need to process additional remote operations."); } - if (getTimestamp(_localOplogValue) < getTimestamp(operation)) { - _scanned++; - return StatusWith<RollbackCommonPoint>(ErrorCodes::NoSuchKey, - "Unable to determine common point. " - "Need to process additional remote operations."); - } - - return RollbackCommonPoint(Timestamp(Seconds(1), 0), RecordId()); + invariant(getTimestamp(_localOplogValue) < getTimestamp(operation)); + _scanned++; + return StatusWith<RollbackCommonPoint>(ErrorCodes::NoSuchKey, + "Unable to determine common point. " + "Need to process additional remote operations."); } StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations( diff --git a/src/mongo/db/repl/roll_back_local_operations.h b/src/mongo/db/repl/roll_back_local_operations.h index 20eb923083d..87a940ce57b 100644 --- a/src/mongo/db/repl/roll_back_local_operations.h +++ b/src/mongo/db/repl/roll_back_local_operations.h @@ -34,6 +34,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/record_id.h" #include "mongo/db/repl/oplog_interface.h" +#include "mongo/db/repl/optime.h" #include "mongo/stdx/functional.h" namespace mongo { @@ -49,7 +50,7 @@ public: */ using RollbackOperationFn = stdx::function<Status(const BSONObj&)>; - using RollbackCommonPoint = std::pair<Timestamp, RecordId>; + using RollbackCommonPoint = std::pair<OpTime, RecordId>; /** * Initializes rollback processor with a valid local oplog. 
diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp index 03abb00dfb2..b4a0200f145 100644 --- a/src/mongo/db/repl/roll_back_local_operations_test.cpp +++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp @@ -107,7 +107,7 @@ TEST(RollBackLocalOperationsTest, RollbackMultipleLocalOperations) { RollBackLocalOperations finder(localOplog, rollbackOperation); auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_FALSE(i == localOperations.cend()); ASSERT_BSONOBJ_EQ(commonOperation.first, i->first); @@ -165,7 +165,7 @@ TEST(RollBackLocalOperationsTest, SkipRemoteOperations) { } auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_FALSE(i == localOperations.cend()); ASSERT_BSONOBJ_EQ(commonOperation.first, i->first); @@ -198,7 +198,7 @@ TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashess) { } auto result = finder.onRemoteOperation(commonOperation.first); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_FALSE(i == localOperations.cend()); ASSERT_BSONOBJ_EQ(commonOperation.first, i->first); @@ -271,7 +271,7 @@ TEST(SyncRollBackLocalOperationsTest, RollbackTwoOperations) { return Status::OK(); }); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_FALSE(i == localOperations.cend()); ASSERT_BSONOBJ_EQ(commonOperation.first, i->first); @@ -290,7 +290,7 @@ TEST(SyncRollBackLocalOperationsTest, SkipOneRemoteOperation) { return Status::OK(); }); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); } @@ -308,7 +308,7 @@ TEST(SyncRollBackLocalOperationsTest, SameTimestampDifferentHashes) { return Status::OK(); }); ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first); ASSERT_EQUALS(commonOperation.second, result.getValue().second); ASSERT_TRUE(called); } diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 88c7183b1da..990b5fcf554 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -82,8 +82,9 @@ using std::string; void truncateAndResetOplog(OperationContext* txn, ReplicationCoordinator* replCoord, BackgroundSync* bgsync) { - // Clear minvalid - 
StorageInterface::get(txn)->setMinValid(txn, OpTime(), DurableRequirement::None); + + // Add field to minvalid document to tell us to restart initial sync if we crash + StorageInterface::get(txn)->setInitialSyncFlag(txn); AutoGetDb autoDb(txn, "local", MODE_X); massert(28585, "no local database found", autoDb.getDb()); @@ -300,9 +301,6 @@ Status _initialSync(BackgroundSync* bgsync) { return Status(ErrorCodes::InitialSyncFailure, msg); } - // Add field to minvalid document to tell us to restart initial sync if we crash - StorageInterface::get(&txn)->setInitialSyncFlag(&txn); - log() << "initial sync drop all databases"; dropAllDatabasesExceptLocal(&txn); @@ -427,16 +425,7 @@ Status _initialSync(BackgroundSync* bgsync) { log() << "initial sync finishing up"; - { - ScopedTransaction scopedXact(&txn, MODE_IX); - AutoGetDb autodb(&txn, "local", MODE_X); - OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastAppliedOpTime()); - log() << "set minValid=" << lastOpTimeWritten; - - // Initial sync is now complete. Flag this by setting minValid to the last thing we synced. - StorageInterface::get(&txn)->setMinValid(&txn, lastOpTimeWritten, DurableRequirement::None); - } - + // Initial sync is now complete. // Clear the initial sync flag -- cannot be done under a db lock, or recursive. StorageInterface::get(&txn)->clearInitialSyncFlag(&txn); diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 0663c13e4dd..c03251f499d 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -159,7 +159,7 @@ struct FixUpInfo { set<string> collectionsToResyncData; set<string> collectionsToResyncMetadata; - Timestamp commonPoint; + OpTime commonPoint; RecordId commonPointOurDiskloc; int rbid; // remote server's current rollback sequence # @@ -391,9 +391,12 @@ void syncFixUp(OperationContext* txn, // we have items we are writing that aren't from a point-in-time. thus best not to come // online until we get to that point in freshness. + // TODO this is still wrong because we don't record that we are in rollback, and we can't really + // recover. OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValid)); log() << "minvalid=" << minValid; - StorageInterface::get(txn)->setMinValid(txn, {OpTime{}, minValid}); + StorageInterface::get(txn)->setAppliedThrough(txn, {}); // Use top of oplog. + StorageInterface::get(txn)->setMinValid(txn, minValid); // any full collection resyncs required? 
if (!fixUpInfo.collectionsToResyncData.empty() || @@ -498,8 +501,8 @@ void syncFixUp(OperationContext* txn, } else { OpTime minValid = fassertStatusOK(28775, OpTime::parseFromOplogEntry(newMinValid)); log() << "minvalid=" << minValid; - const OpTime start{fixUpInfo.commonPoint, OpTime::kUninitializedTerm}; - StorageInterface::get(txn)->setMinValid(txn, {start, minValid}); + StorageInterface::get(txn)->setMinValid(txn, minValid); + StorageInterface::get(txn)->setAppliedThrough(txn, fixUpInfo.commonPoint); } } catch (const DBException& e) { err = "can't get/set minvalid: "; @@ -769,7 +772,7 @@ void syncFixUp(OperationContext* txn, log() << "rollback 6"; // clean up oplog - LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toStringPretty(); + LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toString(); { const NamespaceString oplogNss(rsOplogName); ScopedTransaction transaction(txn, MODE_IX); diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 2c38252d442..7bbe51cde11 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -147,7 +147,8 @@ void RSRollbackTest::setUp() { StorageInterface::set(serviceContext, stdx::make_unique<StorageInterfaceMock>()); setOplogCollectionName(); - repl::StorageInterface::get(_txn.get())->setMinValid(_txn.get(), {OpTime{}, OpTime{}}); + repl::StorageInterface::get(_txn.get())->setAppliedThrough(_txn.get(), OpTime{}); + repl::StorageInterface::get(_txn.get())->setMinValid(_txn.get(), OpTime{}); } void RSRollbackTest::tearDown() { @@ -161,8 +162,9 @@ void noSleep(Seconds seconds) {} TEST_F(RSRollbackTest, InconsistentMinValid) { repl::StorageInterface::get(_txn.get()) - ->setMinValid(_txn.get(), - {OpTime(Timestamp(Seconds(0), 0), 0), OpTime(Timestamp(Seconds(1), 0), 0)}); + ->setAppliedThrough(_txn.get(), OpTime(Timestamp(Seconds(0), 0), 0)); + repl::StorageInterface::get(_txn.get()) + ->setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0)); auto status = syncRollback(_txn.get(), OplogInterfaceMock(kEmptyMockOperations), RollbackSourceMock(std::unique_ptr<OplogInterface>( diff --git a/src/mongo/db/repl/storage_interface.cpp b/src/mongo/db/repl/storage_interface.cpp index 557b44d079a..b9a1c25df45 100644 --- a/src/mongo/db/repl/storage_interface.cpp +++ b/src/mongo/db/repl/storage_interface.cpp @@ -43,21 +43,6 @@ const auto getStorageInterface = ServiceContext::declareDecoration<std::unique_ptr<StorageInterface>>(); } -bool BatchBoundaries::operator==(const BatchBoundaries& rhs) const { - if (&rhs == this) { - return true; - } - return start == rhs.start && end == rhs.end; -} - -std::string BatchBoundaries::toString() const { - return str::stream() << "[start=" << start.toString() << ", end=" << end.toString() << "]"; -} - -std::ostream& operator<<(std::ostream& stream, const BatchBoundaries& boundaries) { - return stream << boundaries.toString(); -} - StorageInterface* StorageInterface::get(ServiceContext* service) { return getStorageInterface(service).get(); } diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index 25873b366ea..22c7ca94e04 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -48,21 +48,6 @@ class OperationContext; namespace repl { -struct BatchBoundaries { - BatchBoundaries(const OpTime s, const OpTime e) : start(s), end(e) {} - bool operator==(const BatchBoundaries& rhs) const; - std::string toString() const; - OpTime start; - 
OpTime end; -}; - -std::ostream& operator<<(std::ostream& stream, const BatchBoundaries& boundaries); - -enum class DurableRequirement { - None, // Does not require any durability of the write. - Strong, // Requires journal or checkpoint write. -}; - /** * Storage interface used by the replication system to interact with storage. * This interface provides seperation of concerns and a place for mocking out test @@ -137,29 +122,40 @@ public: virtual void clearInitialSyncFlag(OperationContext* txn) = 0; /** - * Returns the bounds of the current apply batch, if active. If start is null/missing, and - * end is equal to the last oplog entry then we are in a consistent state and ready for reads. + * The minValid value is the earliest (minimum) Timestamp that must be applied in order to + * consider the dataset consistent. */ - virtual BatchBoundaries getMinValid(OperationContext* txn) const = 0; + virtual void setMinValid(OperationContext* txn, const OpTime& minValid) = 0; + virtual OpTime getMinValid(OperationContext* txn) const = 0; /** - * The minValid value is the earliest (minimum) Timestamp that must be applied in order to - * consider the dataset consistent. - * - * This is called when a batch finishes. - * - * Wait for durable writes (which will block on journaling/checkpointing) when specified. - * + * Sets minValid only if it is not already higher than endOpTime. + * Warning, this compares the term and timestamp independently. Do not use if the current + * minValid could be from the other fork of a rollback. + */ + virtual void setMinValidToAtLeast(OperationContext* txn, const OpTime& endOpTime) = 0; + + /** + * On startup all oplog entries with a value >= the oplog delete from point should be deleted. + * If null, no documents should be deleted. */ - virtual void setMinValid(OperationContext* txn, - const OpTime& endOpTime, - const DurableRequirement durReq) = 0; + virtual void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp) = 0; + virtual Timestamp getOplogDeleteFromPoint(OperationContext* txn) = 0; /** - * The bounds indicate an apply is active and we are not in a consistent state to allow reads - * or transition from a non-visible state to primary/secondary. + * The applied through point is a persistent record of where we've applied through. If null, the + * applied through point is the top of the oplog. */ - virtual void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) = 0; + virtual void setAppliedThrough(OperationContext* txn, const OpTime& optime) = 0; + + /** + * You should probably be calling ReplicationCoordinator::getLastAppliedOpTime() instead. + * + * This reads the value from storage which isn't always updated when the ReplicationCoordinator + * is. + */ + virtual OpTime getAppliedThrough(OperationContext* txn) = 0; + // Collection creation and population for initial sync. 
/** diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 27d15935055..ab7ff8623b6 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -75,6 +75,7 @@ namespace repl { const char StorageInterfaceImpl::kDefaultMinValidNamespace[] = "local.replset.minvalid"; const char StorageInterfaceImpl::kInitialSyncFlagFieldName[] = "doingInitialSync"; const char StorageInterfaceImpl::kBeginFieldName[] = "begin"; +const char StorageInterfaceImpl::kOplogDeleteFromPointFieldName[] = "oplogDeleteFromPoint"; namespace { using UniqueLock = stdx::unique_lock<stdx::mutex>; @@ -100,135 +101,139 @@ NamespaceString StorageInterfaceImpl::getMinValidNss() const { return _minValidNss; } -bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* txn) const { +BSONObj StorageInterfaceImpl::getMinValidDocument(OperationContext* txn) const { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(txn, MODE_IS); Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_IS); Lock::CollectionLock lk(txn->lockState(), _minValidNss.ns(), MODE_IS); - BSONObj mv; - bool found = Helpers::getSingleton(txn, _minValidNss.ns().c_str(), mv); - - if (found) { - const auto flag = mv[kInitialSyncFlagFieldName].trueValue(); - LOG(3) << "return initial flag value of " << flag; - return flag; - } - LOG(3) << "return initial flag value of false"; - return false; + BSONObj doc; + bool found = Helpers::getSingleton(txn, _minValidNss.ns().c_str(), doc); + invariant(found || doc.isEmpty()); + return doc; } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "StorageInterfaceImpl::getInitialSyncFlag", _minValidNss.ns()); + txn, "StorageInterfaceImpl::getMinValidDocument", _minValidNss.ns()); MONGO_UNREACHABLE; } -void StorageInterfaceImpl::setInitialSyncFlag(OperationContext* txn) { +void StorageInterfaceImpl::updateMinValidDocument(OperationContext* txn, + const BSONObj& updateSpec) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { ScopedTransaction transaction(txn, MODE_IX); + // For now this needs to be MODE_X because it sometimes creates the collection. 
Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_X); - Helpers::putSingleton(txn, _minValidNss.ns().c_str(), BSON("$set" << kInitialSyncFlag)); + Helpers::putSingleton(txn, _minValidNss.ns().c_str(), updateSpec); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "StorageInterfaceImpl::setInitialSyncFlag", _minValidNss.ns()); + txn, "StorageInterfaceImpl::updateMinValidDocument", _minValidNss.ns()); +} - txn->recoveryUnit()->waitUntilDurable(); +bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* txn) const { + const BSONObj doc = getMinValidDocument(txn); + const auto flag = doc[kInitialSyncFlagFieldName].trueValue(); + LOG(3) << "returning initial sync flag value of " << flag; + return flag; +} + +void StorageInterfaceImpl::setInitialSyncFlag(OperationContext* txn) { LOG(3) << "setting initial sync flag"; + updateMinValidDocument(txn, BSON("$set" << kInitialSyncFlag)); + txn->recoveryUnit()->waitUntilDurable(); } void StorageInterfaceImpl::clearInitialSyncFlag(OperationContext* txn) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - // TODO: Investigate correctness of taking MODE_IX for DB/Collection locks - Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_X); - Helpers::putSingleton(txn, _minValidNss.ns().c_str(), BSON("$unset" << kInitialSyncFlag)); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "StorageInterfaceImpl::clearInitialSyncFlag", _minValidNss.ns()); + LOG(3) << "clearing initial sync flag"; auto replCoord = repl::ReplicationCoordinator::get(txn); + OpTime time = replCoord->getMyLastAppliedOpTime(); + updateMinValidDocument( + txn, + BSON("$unset" << kInitialSyncFlag << "$set" + << BSON("ts" << time.getTimestamp() << "t" << time.getTerm() + << kBeginFieldName + << time.toBSON()))); + if (getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()) { - OpTime time = replCoord->getMyLastAppliedOpTime(); txn->recoveryUnit()->waitUntilDurable(); replCoord->setMyLastDurableOpTime(time); } - LOG(3) << "clearing initial sync flag"; } -BatchBoundaries StorageInterfaceImpl::getMinValid(OperationContext* txn) const { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IS); - Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_IS); - Lock::CollectionLock lk(txn->lockState(), _minValidNss.ns(), MODE_IS); - BSONObj mv; - bool found = Helpers::getSingleton(txn, _minValidNss.ns().c_str(), mv); - if (found) { - auto status = OpTime::parseFromOplogEntry(mv.getObjectField(kBeginFieldName)); - OpTime start(status.isOK() ? status.getValue() : OpTime{}); - const auto opTimeStatus = OpTime::parseFromOplogEntry(mv); - // If any of the keys (fields) are missing from the minvalid document, we return - // empty. - if (opTimeStatus == ErrorCodes::NoSuchKey) { - return BatchBoundaries{{}, {}}; - } - - if (!opTimeStatus.isOK()) { - error() << "Error parsing minvalid entry: " << mv - << ", with status:" << opTimeStatus.getStatus(); - } - OpTime end(fassertStatusOK(40052, opTimeStatus)); - LOG(3) << "returning minvalid: " << start.toString() << "(" << start.toBSON() << ") -> " - << end.toString() << "(" << end.toBSON() << ")"; +OpTime StorageInterfaceImpl::getMinValid(OperationContext* txn) const { + const BSONObj doc = getMinValidDocument(txn); + const auto opTimeStatus = OpTime::parseFromOplogEntry(doc); + // If any of the keys (fields) are missing from the minvalid document, we return + // a null OpTime. 
+ if (opTimeStatus == ErrorCodes::NoSuchKey) { + return {}; + } - return BatchBoundaries(start, end); - } - LOG(3) << "returning empty minvalid"; - return BatchBoundaries{{}, {}}; + if (!opTimeStatus.isOK()) { + severe() << "Error parsing minvalid entry: " << doc + << ", with status:" << opTimeStatus.getStatus(); + fassertFailedNoTrace(40052); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "StorageInterfaceImpl::getMinValid", _minValidNss.ns()); + + OpTime minValid = opTimeStatus.getValue(); + LOG(3) << "returning minvalid: " << minValid.toString() << "(" << minValid.toBSON() << ")"; + + return minValid; } -void StorageInterfaceImpl::setMinValid(OperationContext* txn, - const OpTime& endOpTime, - const DurableRequirement durReq) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_X); - Helpers::putSingleton( - txn, - _minValidNss.ns().c_str(), - BSON("$set" << BSON("ts" << endOpTime.getTimestamp() << "t" << endOpTime.getTerm()) - << "$unset" - << BSON(kBeginFieldName << 1))); +void StorageInterfaceImpl::setMinValid(OperationContext* txn, const OpTime& minValid) { + LOG(3) << "setting minvalid to exactly: " << minValid.toString() << "(" << minValid.toBSON() + << ")"; + updateMinValidDocument( + txn, BSON("$set" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm()))); +} + +void StorageInterfaceImpl::setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) { + LOG(3) << "setting minvalid to at least: " << minValid.toString() << "(" << minValid.toBSON() + << ")"; + updateMinValidDocument( + txn, BSON("$max" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm()))); +} + +void StorageInterfaceImpl::setOplogDeleteFromPoint(OperationContext* txn, + const Timestamp& timestamp) { + LOG(3) << "setting oplog delete from point to: " << timestamp.toStringPretty(); + updateMinValidDocument(txn, BSON("$set" << BSON(kOplogDeleteFromPointFieldName << timestamp))); +} + +Timestamp StorageInterfaceImpl::getOplogDeleteFromPoint(OperationContext* txn) { + const BSONObj doc = getMinValidDocument(txn); + Timestamp out = {}; + if (auto field = doc[kOplogDeleteFromPointFieldName]) { + out = field.timestamp(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "StorageInterfaceImpl::setMinValid", _minValidNss.ns()); - if (durReq == DurableRequirement::Strong) { - txn->recoveryUnit()->waitUntilDurable(); + LOG(3) << "returning oplog delete from point: " << out; + return out; +} + +void StorageInterfaceImpl::setAppliedThrough(OperationContext* txn, const OpTime& optime) { + LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")"; + if (optime.isNull()) { + updateMinValidDocument(txn, BSON("$unset" << BSON(kBeginFieldName << 1))); + } else { + updateMinValidDocument(txn, BSON("$set" << BSON(kBeginFieldName << optime.toBSON()))); } - LOG(3) << "setting minvalid: " << endOpTime.toString() << "(" << endOpTime.toBSON() << ")"; } -void StorageInterfaceImpl::setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) { - const OpTime& start(boundaries.start); - const OpTime& end(boundaries.end); - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_X); - Helpers::putSingleton(txn, - _minValidNss.ns().c_str(), - BSON("$set" << BSON("ts" << end.getTimestamp() << "t" << end.getTerm() - << kBeginFieldName - << start.toBSON()))); +OpTime 
StorageInterfaceImpl::getAppliedThrough(OperationContext* txn) { + const BSONObj doc = getMinValidDocument(txn); + const auto opTimeStatus = OpTime::parseFromOplogEntry(doc.getObjectField(kBeginFieldName)); + if (!opTimeStatus.isOK()) { + // Return null OpTime on any parse failure, including if "begin" is missing. + return {}; } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - txn, "StorageInterfaceImpl::setMinValid", _minValidNss.ns()); - // NOTE: No need to ensure durability here since starting a batch isn't a problem unless - // writes happen after, in which case this marker (minvalid) will be written already. - LOG(3) << "setting minvalid: " << boundaries.start.toString() << "(" - << boundaries.start.toBSON() << ") -> " << boundaries.end.toString() << "(" - << boundaries.end.toBSON() << ")"; + + OpTime appliedThrough = opTimeStatus.getValue(); + LOG(3) << "returning appliedThrough: " << appliedThrough.toString() << "(" + << appliedThrough.toBSON() << ")"; + + return appliedThrough; } StatusWith<std::unique_ptr<CollectionBulkLoader>> diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 0226c4705aa..fc3e67fcae5 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -48,6 +48,7 @@ public: static const char kDefaultMinValidNamespace[]; static const char kInitialSyncFlagFieldName[]; static const char kBeginFieldName[]; + static const char kOplogDeleteFromPointFieldName[]; StorageInterfaceImpl(); explicit StorageInterfaceImpl(const NamespaceString& minValidNss); @@ -67,13 +68,13 @@ public: void clearInitialSyncFlag(OperationContext* txn) override; - BatchBoundaries getMinValid(OperationContext* txn) const override; - - void setMinValid(OperationContext* ctx, - const OpTime& endOpTime, - const DurableRequirement durReq) override; - - void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries) override; + OpTime getMinValid(OperationContext* txn) const override; + void setMinValid(OperationContext* txn, const OpTime& minValid) override; + void setMinValidToAtLeast(OperationContext* txn, const OpTime& endOpTime) override; + void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp) override; + Timestamp getOplogDeleteFromPoint(OperationContext* txn) override; + void setAppliedThrough(OperationContext* txn, const OpTime& optime) override; + OpTime getAppliedThrough(OperationContext* txn) override; /** * Allocates a new TaskRunner for use by the passed in collection. @@ -115,6 +116,10 @@ public: Status isAdminDbValid(OperationContext* txn) override; private: + // Returns empty document if not present. 
+ BSONObj getMinValidDocument(OperationContext* txn) const; + void updateMinValidDocument(OperationContext* txn, const BSONObj& updateSpec); + const NamespaceString _minValidNss; }; diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index 9a1f35682fa..1349331dcfc 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -274,9 +274,9 @@ TEST_F(StorageInterfaceImplTest, GetMinValidAfterSettingInitialSyncFlagWorks) { storageInterface.setInitialSyncFlag(txn.get()); ASSERT_TRUE(storageInterface.getInitialSyncFlag(txn.get())); - auto minValid = storageInterface.getMinValid(txn.get()); - ASSERT_TRUE(minValid.start.isNull()); - ASSERT_TRUE(minValid.end.isNull()); + ASSERT(storageInterface.getMinValid(txn.get()).isNull()); + ASSERT(storageInterface.getAppliedThrough(txn.get()).isNull()); + ASSERT(storageInterface.getOplogDeleteFromPoint(txn.get()).isNull()); } TEST_F(StorageInterfaceImplTest, MinValid) { @@ -285,18 +285,30 @@ TEST_F(StorageInterfaceImplTest, MinValid) { StorageInterfaceImpl storageInterface(nss); auto txn = getClient()->makeOperationContext(); - // MinValid boundaries should be {null optime, null optime} after initializing a new storage - // engine. - auto minValid = storageInterface.getMinValid(txn.get()); - ASSERT_TRUE(minValid.start.isNull()); - ASSERT_TRUE(minValid.end.isNull()); + // MinValid boundaries should all be null after initializing a new storage engine. + ASSERT(storageInterface.getMinValid(txn.get()).isNull()); + ASSERT(storageInterface.getAppliedThrough(txn.get()).isNull()); + ASSERT(storageInterface.getOplogDeleteFromPoint(txn.get()).isNull()); // Setting min valid boundaries should affect getMinValid() result. OpTime startOpTime({Seconds(123), 0}, 1LL); OpTime endOpTime({Seconds(456), 0}, 1LL); - storageInterface.setMinValid(txn.get(), {startOpTime, endOpTime}); - minValid = storageInterface.getMinValid(txn.get()); - ASSERT_EQUALS(BatchBoundaries(startOpTime, endOpTime), minValid); + storageInterface.setAppliedThrough(txn.get(), startOpTime); + storageInterface.setMinValid(txn.get(), endOpTime); + storageInterface.setOplogDeleteFromPoint(txn.get(), endOpTime.getTimestamp()); + + ASSERT_EQ(storageInterface.getAppliedThrough(txn.get()), startOpTime); + ASSERT_EQ(storageInterface.getMinValid(txn.get()), endOpTime); + ASSERT_EQ(storageInterface.getOplogDeleteFromPoint(txn.get()), endOpTime.getTimestamp()); + + + // setMinValid always changes minValid, but setMinValidToAtLeast only does if higher. + storageInterface.setMinValid(txn.get(), startOpTime); // Forcibly lower it. + ASSERT_EQ(storageInterface.getMinValid(txn.get()), startOpTime); + storageInterface.setMinValidToAtLeast(txn.get(), endOpTime); // Higher than current (sets it). + ASSERT_EQ(storageInterface.getMinValid(txn.get()), endOpTime); + storageInterface.setMinValidToAtLeast(txn.get(), startOpTime); // Lower than current (no-op). + ASSERT_EQ(storageInterface.getMinValid(txn.get()), endOpTime); // Check min valid document using storage engine interface. 
auto minValidDocument = getMinValidDocument(txn.get(), nss); @@ -306,6 +318,9 @@ TEST_F(StorageInterfaceImplTest, MinValid) { unittest::assertGet(OpTime::parseFromOplogEntry( minValidDocument[StorageInterfaceImpl::kBeginFieldName].Obj()))); ASSERT_EQUALS(endOpTime, unittest::assertGet(OpTime::parseFromOplogEntry(minValidDocument))); + ASSERT_EQUALS( + endOpTime.getTimestamp(), + minValidDocument[StorageInterfaceImpl::kOplogDeleteFromPointFieldName].timestamp()); // Recovery unit will be owned by "txn". RecoveryUnitWithDurabilityTracking* recoveryUnit = new RecoveryUnitWithDurabilityTracking(); @@ -313,19 +328,11 @@ TEST_F(StorageInterfaceImplTest, MinValid) { // Set min valid without waiting for the changes to be durable. OpTime endOpTime2({Seconds(789), 0}, 1LL); - storageInterface.setMinValid(txn.get(), endOpTime2, DurableRequirement::None); - minValid = storageInterface.getMinValid(txn.get()); - ASSERT_TRUE(minValid.start.isNull()); - ASSERT_EQUALS(endOpTime2, minValid.end); + storageInterface.setMinValid(txn.get(), endOpTime2); + storageInterface.setAppliedThrough(txn.get(), {}); + ASSERT_EQUALS(storageInterface.getAppliedThrough(txn.get()), OpTime()); + ASSERT_EQUALS(storageInterface.getMinValid(txn.get()), endOpTime2); ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled); - - // Set min valid and wait for the changes to be durable. - OpTime endOpTime3({Seconds(999), 0}, 1LL); - storageInterface.setMinValid(txn.get(), endOpTime3, DurableRequirement::Strong); - minValid = storageInterface.getMinValid(txn.get()); - ASSERT_TRUE(minValid.start.isNull()); - ASSERT_EQUALS(endOpTime3, minValid.end); - ASSERT_TRUE(recoveryUnit->waitUntilDurableCalled); } TEST_F(StorageInterfaceImplTest, SnapshotSupported) { diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp index 96b40deba0d..7104824ebb0 100644 --- a/src/mongo/db/repl/storage_interface_mock.cpp +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -55,21 +55,40 @@ void StorageInterfaceMock::clearInitialSyncFlag(OperationContext* txn) { _initialSyncFlag = false; } -BatchBoundaries StorageInterfaceMock::getMinValid(OperationContext* txn) const { +OpTime StorageInterfaceMock::getMinValid(OperationContext* txn) const { stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); - return _minValidBoundaries; + return _minValid; } -void StorageInterfaceMock::setMinValid(OperationContext* txn, - const OpTime& endOpTime, - const DurableRequirement durReq) { +void StorageInterfaceMock::setMinValid(OperationContext* txn, const OpTime& minValid) { stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); - _minValidBoundaries = {OpTime(), endOpTime}; + _minValid = minValid; } -void StorageInterfaceMock::setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) { +void StorageInterfaceMock::setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) { stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); - _minValidBoundaries = boundaries; + _minValid = std::max(_minValid, minValid); +} + +void StorageInterfaceMock::setOplogDeleteFromPoint(OperationContext* txn, + const Timestamp& timestamp) { + stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + _oplogDeleteFromPoint = timestamp; +} + +Timestamp StorageInterfaceMock::getOplogDeleteFromPoint(OperationContext* txn) { + stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + return _oplogDeleteFromPoint; +} + +void StorageInterfaceMock::setAppliedThrough(OperationContext* txn, const OpTime& optime) { + 
diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp
index 96b40deba0d..7104824ebb0 100644
--- a/src/mongo/db/repl/storage_interface_mock.cpp
+++ b/src/mongo/db/repl/storage_interface_mock.cpp
@@ -55,21 +55,40 @@ void StorageInterfaceMock::clearInitialSyncFlag(OperationContext* txn) {
     _initialSyncFlag = false;
 }

-BatchBoundaries StorageInterfaceMock::getMinValid(OperationContext* txn) const {
+OpTime StorageInterfaceMock::getMinValid(OperationContext* txn) const {
     stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
-    return _minValidBoundaries;
+    return _minValid;
 }

-void StorageInterfaceMock::setMinValid(OperationContext* txn,
-                                       const OpTime& endOpTime,
-                                       const DurableRequirement durReq) {
+void StorageInterfaceMock::setMinValid(OperationContext* txn, const OpTime& minValid) {
     stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
-    _minValidBoundaries = {OpTime(), endOpTime};
+    _minValid = minValid;
 }

-void StorageInterfaceMock::setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) {
+void StorageInterfaceMock::setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) {
     stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
-    _minValidBoundaries = boundaries;
+    _minValid = std::max(_minValid, minValid);
+}
+
+void StorageInterfaceMock::setOplogDeleteFromPoint(OperationContext* txn,
+                                                   const Timestamp& timestamp) {
+    stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+    _oplogDeleteFromPoint = timestamp;
+}
+
+Timestamp StorageInterfaceMock::getOplogDeleteFromPoint(OperationContext* txn) {
+    stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+    return _oplogDeleteFromPoint;
+}
+
+void StorageInterfaceMock::setAppliedThrough(OperationContext* txn, const OpTime& optime) {
+    stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+    _appliedThrough = optime;
+}
+
+OpTime StorageInterfaceMock::getAppliedThrough(OperationContext* txn) {
+    stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+    return _appliedThrough;
 }

 Status CollectionBulkLoaderMock::init(OperationContext* txn,
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 480cd2b7ecd..163dd76c396 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -125,11 +125,13 @@ public:
     void setInitialSyncFlag(OperationContext* txn) override;
     void clearInitialSyncFlag(OperationContext* txn) override;

-    BatchBoundaries getMinValid(OperationContext* txn) const override;
-    void setMinValid(OperationContext* txn,
-                     const OpTime& endOpTime,
-                     const DurableRequirement durReq) override;
-    void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) override;
+    OpTime getMinValid(OperationContext* txn) const override;
+    void setMinValid(OperationContext* txn, const OpTime& minValid) override;
+    void setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) override;
+    void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp) override;
+    Timestamp getOplogDeleteFromPoint(OperationContext* txn) override;
+    void setAppliedThrough(OperationContext* txn, const OpTime& optime) override;
+    OpTime getAppliedThrough(OperationContext* txn) override;

     StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading(
         const NamespaceString& nss,
@@ -239,7 +241,9 @@ private:
     bool _initialSyncFlag = false;

     mutable stdx::mutex _minValidBoundariesMutex;
-    BatchBoundaries _minValidBoundaries = {OpTime(), OpTime()};
+    OpTime _appliedThrough;
+    OpTime _minValid;
+    Timestamp _oplogDeleteFromPoint;
 };

 }  // namespace repl
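Worth noting in the mock: setMinValidToAtLeast only ever moves _minValid forward, while setMinValid may move it in either direction, and all three new fields share the one mutex. A small usage sketch, under the assumption (true of this mock, but not of real storage implementations) that the OperationContext is ignored and may be null:

    #include "mongo/db/repl/optime.h"
    #include "mongo/db/repl/storage_interface_mock.h"
    #include "mongo/util/assert_util.h"

    void exerciseStorageInterfaceMock() {
        using namespace mongo;
        using namespace mongo::repl;

        StorageInterfaceMock storage;
        const OpTime t10(Timestamp(10, 0), 1);
        const OpTime t20(Timestamp(20, 0), 1);

        storage.setMinValid(nullptr, t20);
        storage.setMinValidToAtLeast(nullptr, t10);  // No-op: never moves backwards.
        invariant(storage.getMinValid(nullptr) == t20);

        storage.setMinValid(nullptr, t10);  // Plain setMinValid may move backwards.
        invariant(storage.getMinValid(nullptr) == t10);
    }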
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index d0017600bef..9785eb94c6b 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -174,10 +174,14 @@ public:
 protected:
     void _recordApplied(const OpTime& newOpTime) {
+        // We have to use setMyLastAppliedOpTimeForward since this thread races with
+        // ReplicationExternalStateImpl::onTransitionToPrimary.
         _replCoord->setMyLastAppliedOpTimeForward(newOpTime);
     }

     void _recordDurable(const OpTime& newOpTime) {
+        // We have to use setMyLastDurableOpTimeForward since this thread races with
+        // ReplicationExternalStateImpl::onTransitionToPrimary.
         _replCoord->setMyLastDurableOpTimeForward(newOpTime);
     }

@@ -225,8 +229,6 @@ ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() {
 }

 void ApplyBatchFinalizerForJournal::record(const OpTime& newOpTime) {
-    // We have to use setMyLastAppliedOpTimeForward since this thread races with
-    // logTransitionToPrimaryToOplog.
     _recordApplied(newOpTime);

     stdx::unique_lock<stdx::mutex> lock(_mutex);
@@ -256,8 +258,6 @@ void ApplyBatchFinalizerForJournal::_run() {
         auto txn = cc().makeOperationContext();
         txn->recoveryUnit()->waitUntilDurable();

-        // We have to use setMyLastDurableOpTimeForward since this thread races with
-        // logTransitionToPrimaryToOplog.
         _recordDurable(latestOpTime);
     }
 }
@@ -635,10 +635,7 @@ OpTime SyncTail::multiApply(OperationContext* txn, MultiApplier::Operations ops)
 }

 namespace {
-void tryToGoLiveAsASecondary(OperationContext* txn,
-                             ReplicationCoordinator* replCoord,
-                             const BatchBoundaries& minValidBoundaries,
-                             const OpTime& lastWriteOpTime) {
+void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* replCoord) {
     if (replCoord->isInPrimaryOrSecondaryState()) {
         return;
     }
@@ -659,19 +656,8 @@ void tryToGoLiveAsASecondary(OperationContext* txn,
         return;
     }

-    // If an apply batch is active then we cannot transition.
-    if (!minValidBoundaries.start.isNull()) {
-        LOG(1) << "Can't go live (tryToGoLiveAsASecondary) as there is an active apply batch.";
-        return;
-    }
-
-    // Must have applied/written to minvalid, so return if not.
-    // -- If 'lastWriteOpTime' is null/uninitialized then we can't transition.
-    // -- If 'lastWriteOpTime' is less than the end of the last batch then we can't transition.
-    if (lastWriteOpTime.isNull() || minValidBoundaries.end > lastWriteOpTime) {
-        log() << "Can't go live (tryToGoLiveAsASecondary) as last written optime ("
-              << lastWriteOpTime
-              << ") is null or greater than minvalid: " << minValidBoundaries.end;
+    // We can't go to SECONDARY until we reach minvalid.
+    if (replCoord->getMyLastAppliedOpTime() < StorageInterface::get(txn)->getMinValid(txn)) {
         return;
     }
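With the batch boundaries gone, the go-live test collapses to a single comparison: a node is consistent enough for SECONDARY exactly when its last applied optime has reached minValid. While a batch is mid-flight, minValid has already been raised past lastApplied, so the old "active apply batch" check becomes redundant. A restatement of just that predicate (a sketch; as the context above shows, the real function first checks member state and maintenance mode):

    #include "mongo/db/repl/replication_coordinator.h"
    #include "mongo/db/repl/storage_interface.h"

    // Sketch of the simplified consistency predicate: stays false while a
    // batch is in flight, true once the node has applied through minValid.
    bool reachedMinValid(mongo::OperationContext* txn,
                         mongo::repl::ReplicationCoordinator* replCoord) {
        auto* storage = mongo::repl::StorageInterface::get(txn);
        return !(replCoord->getMyLastAppliedOpTime() < storage->getMinValid(txn));
    }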
@@ -786,12 +772,8 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord,
                              ? new ApplyBatchFinalizerForJournal(replCoord)
                              : new ApplyBatchFinalizer(replCoord)};

-    auto minValidBoundaries = StorageInterface::get(&txn)->getMinValid(&txn);
-    OpTime originalEndOpTime(minValidBoundaries.end);
-    OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()};
     while (!shouldShutdown()) {
-
-        tryToGoLiveAsASecondary(&txn, replCoord, minValidBoundaries, lastWriteOpTime);
+        tryToGoLiveAsASecondary(&txn, replCoord);

         // Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
         // ready in time, we'll loop again so we can do the above checks periodically.
@@ -799,77 +781,50 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord,
         if (ops.empty())
             continue;  // Try again.

-        const BSONObj lastOp = ops.back().raw;
-
-        if (lastOp.isEmpty()) {
+        if (ops.front().raw.isEmpty()) {
             // This means that the network thread has coalesced and we have processed all of its
             // data.
             invariant(ops.getCount() == 1);
             if (replCoord->isWaitingForApplierToDrain()) {
                 replCoord->signalDrainComplete(&txn);
             }
-
-            // Reset some values when triggered in case it was from a rollback.
-            minValidBoundaries = StorageInterface::get(&txn)->getMinValid(&txn);
-            lastWriteOpTime = replCoord->getMyLastAppliedOpTime();
-            originalEndOpTime = minValidBoundaries.end;
-
             continue;  // This wasn't a real op. Don't try to apply it.
         }

-        const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp));
-        if (lastWriteOpTime >= lastOpTime) {
-            // Error for the oplog to go back in time.
+        // Extract some info from ops that we'll need after releasing the batch below.
+        const size_t opsInBatch = ops.getCount();
+        const auto firstOpTimeInBatch =
+            fassertStatusOK(40299, OpTime::parseFromOplogEntry(ops.front().raw));
+        const auto lastOpTimeInBatch =
+            fassertStatusOK(28773, OpTime::parseFromOplogEntry(ops.back().raw));
+
+        // Make sure the oplog doesn't go back in time or repeat an entry.
+        if (firstOpTimeInBatch <= replCoord->getMyLastAppliedOpTime()) {
             fassert(34361,
                     Status(ErrorCodes::OplogOutOfOrder,
                            str::stream() << "Attempted to apply an oplog entry ("
-                                         << lastOpTime.toString()
-                                         << ") which is not greater than our lastWrittenOptime ("
-                                         << lastWriteOpTime.toString()
+                                         << firstOpTimeInBatch.toString()
+                                         << ") which is not greater than our last applied OpTime ("
+                                         << replCoord->getMyLastAppliedOpTime().toString()
                                          << ")."));
         }

-        // Set minValid to the last OpTime that needs to be applied, in this batch or from the
-        // (last) failed batch, whichever is larger.
-        // This will cause this node to go into RECOVERING state
-        // if we should crash and restart before updating finishing.
-        const auto& start = lastWriteOpTime;
-
-        // Take the max of the first endOptime (if we recovered) and the end of our batch.
-
-        // Setting end to the max of originalEndOpTime and lastOpTime (the end of the batch)
-        // ensures that we keep pushing out the point where we can become consistent
-        // and allow reads. If we recover and end up doing smaller batches we must pass the
-        // originalEndOpTime before we are good.
-        //
-        // For example:
-        // batch apply, 20-40, end = 40
-        // batch failure,
-        // restart
-        // batch apply, 20-25, end = max(25, 40) = 40
-        // batch apply, 25-45, end = 45
-        const OpTime end(std::max(originalEndOpTime, lastOpTime));
-
-        // This write will not journal/checkpoint.
-        StorageInterface::get(&txn)->setMinValid(&txn, {start, end});
-
-        const size_t opsInBatch = ops.getCount();
-        lastWriteOpTime = multiApply(&txn, ops.releaseBatch());
-        if (lastWriteOpTime.isNull()) {
+        const bool fail = multiApply(&txn, ops.releaseBatch()).isNull();
+        if (fail) {
             // fassert if oplog application failed for any reasons other than shutdown.
-            error() << "Failed to apply " << opsInBatch << " operations - batch start:" << start
-                    << " end:" << end;
+            error() << "Failed to apply " << opsInBatch
+                    << " operations - batch start:" << firstOpTimeInBatch
+                    << " end:" << lastOpTimeInBatch;
             fassert(34360, inShutdownStrict());
             // Return without setting minvalid in the case of shutdown.
             return;
         }

-        setNewTimestamp(lastWriteOpTime.getTimestamp());
-        StorageInterface::get(&txn)->setMinValid(&txn, end, DurableRequirement::None);
-        minValidBoundaries.start = {};
-        minValidBoundaries.end = end;
-        finalizer->record(lastWriteOpTime);
+        // Update various things that care about our last applied optime.
+        setNewTimestamp(lastOpTimeInBatch.getTimestamp());
+        StorageInterface::get(&txn)->setAppliedThrough(&txn, lastOpTimeInBatch);
+        finalizer->record(lastOpTimeInBatch);
     }
 }
@@ -1275,6 +1230,8 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
         prefetchOps(ops, workerPool);
     }

+    auto storage = StorageInterface::get(txn);
+
     LOG(2) << "replication batch size is " << ops.size();
     // We must grab this because we're going to grab write locks later.
     // We hold this mutex the entire time we're writing; it doesn't matter
@@ -1299,6 +1256,7 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
     std::vector<MultiApplier::OperationPtrs> writerVectors;
     ON_BLOCK_EXIT([&] { workerPool->join(); });

+    storage->setOplogDeleteFromPoint(txn, ops.front().ts.timestamp());
     const bool multiThreadedOplogWrites = scheduleWritesToOplog(txn, workerPool, ops);
     if (multiThreadedOplogWrites) {
         // Use all threads for oplog application.
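Setting the delete-from point to the batch's first timestamp before any oplog writes are scheduled gives crash recovery an unambiguous rule: a non-null marker means the node may have died partway through writing a batch, so everything at or after that timestamp must be truncated before replication resumes. A sketch of that startup rule, assuming recovery runs it before any new writes and that truncateOplogTo(txn, ts) removes entries at and after ts (the actual recovery code is not part of this file):

    #include "mongo/db/repl/oplog.h"
    #include "mongo/db/repl/storage_interface.h"

    // Sketch: how a non-null oplogDeleteFromPoint would be consumed at
    // startup. Truncate the possibly half-written batch, then clear the
    // marker so the node can start writing again.
    void recoverFromInterruptedBatchWrite(mongo::OperationContext* txn) {
        auto* storage = mongo::repl::StorageInterface::get(txn);
        const mongo::Timestamp deletePoint = storage->getOplogDeleteFromPoint(txn);
        if (!deletePoint.isNull()) {
            mongo::repl::truncateOplogTo(txn, deletePoint);
            storage->setOplogDeleteFromPoint(txn, mongo::Timestamp());
        }
    }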
@@ -1309,6 +1267,21 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
     }

     fillWriterVectors(txn, &ops, &writerVectors);
+
+    workerPool->join();
+
+    // We must check this before altering the MinValid document because setting inShutdown may
+    // have caused the oplog writers to abort early. This code (and its duplicate below) will go
+    // away once SERVER-25071 is done and clean shutdown drains the apply queue.
+    if (inShutdownStrict()) {
+        log() << "Cannot apply operations due to shutdown in progress";
+        return {ErrorCodes::InterruptedAtShutdown,
+                "Cannot apply operations due to shutdown in progress"};
+    }
+
+    storage->setOplogDeleteFromPoint(txn, Timestamp());
+    storage->setMinValidToAtLeast(txn, ops.back().getOpTime());
+
     applyOps(&writerVectors, workerPool, applyOperation);
 }

diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index d4976be5116..c7f63999266 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -123,6 +123,10 @@ public:
         bool empty() const {
             return _batch.empty();
         }
+        const OplogEntry& front() const {
+            invariant(!_batch.empty());
+            return _batch.front();
+        }
         const OplogEntry& back() const {
             invariant(!_batch.empty());
             return _batch.back();
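Read together, the multiApply hunks establish a strict write-ahead order for every batch, and the new OpQueue::front() accessor exists so oplogApplication can capture the batch's first optime before releaseBatch() hands the operations away. A condensed sketch of the ordering, using only calls visible in this diff (worker-pool plumbing and error paths elided; the batch endpoints are taken as parameters here):

    #include "mongo/db/repl/optime.h"
    #include "mongo/db/repl/storage_interface.h"

    // Condensed order of operations for one secondary batch after this change.
    void batchOrderSketch(mongo::OperationContext* txn,
                          const mongo::Timestamp& firstTsInBatch,
                          const mongo::repl::OpTime& lastOpTimeInBatch) {
        auto* storage = mongo::repl::StorageInterface::get(txn);

        // 1. Mark where recovery must truncate if we crash while writing.
        storage->setOplogDeleteFromPoint(txn, firstTsInBatch);

        // 2. Write every entry in the batch to the local oplog
        //    (scheduleWritesToOplog + workerPool->join() in the real code).

        // 3. All entries are now in the oplog: clear the truncate marker and
        //    raise minValid, so a crash from here on replays the batch's tail
        //    instead of truncating it.
        storage->setOplogDeleteFromPoint(txn, mongo::Timestamp());
        storage->setMinValidToAtLeast(txn, lastOpTimeInBatch);

        // 4. Only now apply the operations to the data collections (applyOps
        //    in the real code); appliedThrough is advanced afterwards by
        //    oplogApplication once the batch succeeds.
    }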