diff options
Diffstat (limited to 'src/mongo')
8 files changed, 54 insertions, 18 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index c3155495e59..dda4a955219 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -176,6 +176,11 @@ public: virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) = 0; /** + * Gets the global opTime timestamp, i.e. the latest cluster time. + */ + virtual Timestamp getGlobalTimestamp(ServiceContext* service) = 0; + + /** * Checks if the oplog exists. */ virtual bool oplogExists(OperationContext* opCtx) = 0; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index ec3a987a8a1..4bc3b8c8409 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -53,6 +53,7 @@ #include "mongo/db/free_mon/free_mon_mongod.h" #include "mongo/db/jsobj.h" #include "mongo/db/kill_sessions_local.h" +#include "mongo/db/logical_clock.h" #include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/op_observer.h" @@ -636,6 +637,10 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext* setNewTimestamp(ctx, newTime); } +Timestamp ReplicationCoordinatorExternalStateImpl::getGlobalTimestamp(ServiceContext* service) { + return LogicalClock::get(service)->getClusterTime().asTimestamp(); +} + bool ReplicationCoordinatorExternalStateImpl::oplogExists(OperationContext* opCtx) { AutoGetCollection oplog(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS); return oplog.getCollection() != nullptr; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 81aa94a27c0..f55d7df0d11 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -88,6 +88,7 @@ public: virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx); virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote); virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime); + virtual Timestamp getGlobalTimestamp(ServiceContext* service); bool oplogExists(OperationContext* opCtx) final; virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx); virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 15644138a16..5e6beb2a657 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -155,7 +155,15 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument( } void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service, - const Timestamp& newTime) {} + const Timestamp& newTime) { + if (newTime > _globalTimestamp) { + _globalTimestamp = newTime; + } +} + +Timestamp ReplicationCoordinatorExternalStateMock::getGlobalTimestamp(ServiceContext* service) { + return _globalTimestamp; +} bool ReplicationCoordinatorExternalStateMock::oplogExists(OperationContext* opCtx) { return true; diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index fdce3427a3f..1143c7e332d 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -78,6 +78,7 @@ public: virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx); virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote); virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime); + virtual Timestamp getGlobalTimestamp(ServiceContext* service); bool oplogExists(OperationContext* opCtx) override; virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx); virtual void closeConnections(); @@ -197,6 +198,7 @@ private: bool _areSnapshotsEnabled = true; OpTime _firstOpTimeOfMyTerm; double _electionTimeoutOffsetLimitFraction = 0.15; + Timestamp _globalTimestamp; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index b7949947cdd..077dfddc34a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -331,8 +331,8 @@ InitialSyncerOptions createInitialSyncerOptions( options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); }; options.setMyLastOptime = [replCoord, externalState]( const OpTime& opTime, ReplicationCoordinator::DataConsistency consistency) { + // Note that setting the last applied opTime forward also advances the global timestamp. replCoord->setMyLastAppliedOpTimeForward(opTime, consistency); - externalState->setGlobalTimestamp(replCoord->getServiceContext(), opTime.getTimestamp()); }; options.resetOptimes = [replCoord]() { replCoord->resetMyLastOpTimes(); }; options.syncSourceSelector = replCoord; @@ -632,6 +632,10 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( (lastOpTime >= minValid) ? DataConsistency::Consistent : DataConsistency::Inconsistent; } + // Update the global timestamp before setting the last applied opTime forward so the last + // applied optime is never greater than the latest cluster time in the logical clock. + _externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp()); + stdx::unique_lock<stdx::mutex> lock(_mutex); invariant(_rsConfigState == kConfigStartingUp); const PostMemberStateUpdateAction action = @@ -647,7 +651,6 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( lock.unlock(); } - _externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp()); { stdx::lock_guard<stdx::mutex> lk(_mutex); // Step down is impossible, so we don't need to wait for the returned event. @@ -1075,6 +1078,10 @@ void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) { void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime, DataConsistency consistency) { + // Update the global timestamp before setting the last applied opTime forward so the last + // applied optime is never greater than the latest cluster time in the logical clock. + _externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp()); + stdx::unique_lock<stdx::mutex> lock(_mutex); auto myLastAppliedOpTime = _getMyLastAppliedOpTime_inlock(); if (opTime > myLastAppliedOpTime) { @@ -1108,6 +1115,10 @@ void ReplicationCoordinatorImpl::setMyLastDurableOpTimeForward(const OpTime& opT } void ReplicationCoordinatorImpl::setMyLastAppliedOpTime(const OpTime& opTime) { + // Update the global timestamp before setting the last applied opTime forward so the last + // applied optime is never greater than the latest cluster time in the logical clock. + _externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp()); + stdx::unique_lock<stdx::mutex> lock(_mutex); // The optime passed to this function is required to represent a consistent database state. _setMyLastAppliedOpTime(lock, opTime, false, DataConsistency::Consistent); @@ -1155,6 +1166,11 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime(WithLock lk, const OpTime& opTime, bool isRollbackAllowed, DataConsistency consistency) { + // The last applied opTime should never advance beyond the global timestamp (i.e. the latest + // cluster time). Not enforced if the logical clock is disabled, e.g. for arbiters. + dassert(!LogicalClock::get(getServiceContext())->isEnabled() || + _externalState->getGlobalTimestamp(getServiceContext()) >= opTime.getTimestamp()); + _topCoord->setMyLastAppliedOpTime(opTime, _replExecutor->now(), isRollbackAllowed); // If we are using applied times to calculate the commit level, update it now. if (!_rsConfig.getWriteConcernMajorityShouldJournal()) { @@ -3088,14 +3104,15 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC lastOpTime = lastOpTimeStatus.getValue(); } + // Update the global timestamp before setting last applied opTime forward so the last applied + // optime is never greater than the latest in-memory cluster time. + _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTime.getTimestamp()); + stdx::unique_lock<stdx::mutex> lock(_mutex); bool isRollbackAllowed = true; _setMyLastAppliedOpTime(lock, lastOpTime, isRollbackAllowed, consistency); _setMyLastDurableOpTime(lock, lastOpTime, isRollbackAllowed); _reportUpstream_inlock(std::move(lock)); - // Unlocked below. - - _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTime.getTimestamp()); } bool ReplicationCoordinatorImpl::shouldChangeSyncSource( diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 45468807acc..ef39d869278 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -904,16 +904,13 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer, minValid = lastOpTimeInBatch; } - // Update various things that care about our last applied optime. Tests rely on 2 happening - // before 3 even though it isn't strictly necessary. The order of 1 doesn't matter. + // Update various things that care about our last applied optime. Tests rely on 1 happening + // before 2 even though it isn't strictly necessary. - // 1. Update the global timestamp. - setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp()); - - // 2. Persist our "applied through" optime to disk. + // 1. Persist our "applied through" optime to disk. _consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch); - // 3. Ensure that the last applied op time hasn't changed since the start of this batch. + // 2. Ensure that the last applied op time hasn't changed since the start of this batch. const auto lastAppliedOpTimeAtEndOfBatch = replCoord->getMyLastAppliedOpTime(); invariant(lastAppliedOpTimeAtStartOfBatch == lastAppliedOpTimeAtEndOfBatch, str::stream() << "the last known applied OpTime has changed from " @@ -922,13 +919,14 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer, << lastAppliedOpTimeAtEndOfBatch.toString() << " in the middle of batch application"); - // 4. Update oplog visibility by notifying the storage engine of the new oplog entries. + // 3. Update oplog visibility by notifying the storage engine of the new oplog entries. const bool orderedCommit = true; _storageInterface->oplogDiskLocRegister( &opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit); - // 5. Finalize this batch. We are at a consistent optime if our current optime is >= the - // current 'minValid' optime. + // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the + // current 'minValid' optime. Note that recording the lastOpTime in the finalizer includes + // advancing the global timestamp to at least its timestamp. auto consistency = (lastOpTimeInBatch >= minValid) ? ReplicationCoordinator::DataConsistency::Consistent : ReplicationCoordinator::DataConsistency::Inconsistent; diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 5f90209074b..95b3961bfa5 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -327,7 +327,7 @@ void appendClusterAndOperationTime(OperationContext* opCtx, auto signedTime = SignedLogicalTime( LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); - // TODO SERVER-35663: invariant that signedTime.getTime() >= operationTime. + dassert(signedTime.getTime() >= operationTime); rpc::LogicalTimeMetadata(signedTime).writeToMetadata(metadataBob); operationTime.appendAsOperationTime(commandBodyFieldsBob); @@ -349,7 +349,7 @@ void appendClusterAndOperationTime(OperationContext* opCtx, return; } - // TODO SERVER-35663: invariant that signedTime.getTime() >= operationTime. + dassert(signedTime.getTime() >= operationTime); rpc::LogicalTimeMetadata(signedTime).writeToMetadata(metadataBob); operationTime.appendAsOperationTime(commandBodyFieldsBob); } |