summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp5
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp27
-rw-r--r--src/mongo/db/repl/sync_tail.cpp18
-rw-r--r--src/mongo/db/service_entry_point_common.cpp4
8 files changed, 54 insertions, 18 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index c3155495e59..dda4a955219 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -176,6 +176,11 @@ public:
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) = 0;
/**
+ * Gets the global opTime timestamp, i.e. the latest cluster time.
+ */
+ virtual Timestamp getGlobalTimestamp(ServiceContext* service) = 0;
+
+ /**
* Checks if the oplog exists.
*/
virtual bool oplogExists(OperationContext* opCtx) = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index ec3a987a8a1..4bc3b8c8409 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/free_mon/free_mon_mongod.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/kill_sessions_local.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/op_observer.h"
@@ -636,6 +637,10 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext*
setNewTimestamp(ctx, newTime);
}
+Timestamp ReplicationCoordinatorExternalStateImpl::getGlobalTimestamp(ServiceContext* service) {
+ return LogicalClock::get(service)->getClusterTime().asTimestamp();
+}
+
bool ReplicationCoordinatorExternalStateImpl::oplogExists(OperationContext* opCtx) {
AutoGetCollection oplog(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS);
return oplog.getCollection() != nullptr;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 81aa94a27c0..f55d7df0d11 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -88,6 +88,7 @@ public:
virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx);
virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
+ virtual Timestamp getGlobalTimestamp(ServiceContext* service);
bool oplogExists(OperationContext* opCtx) final;
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 15644138a16..5e6beb2a657 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -155,7 +155,15 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument(
}
void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service,
- const Timestamp& newTime) {}
+ const Timestamp& newTime) {
+ if (newTime > _globalTimestamp) {
+ _globalTimestamp = newTime;
+ }
+}
+
+Timestamp ReplicationCoordinatorExternalStateMock::getGlobalTimestamp(ServiceContext* service) {
+ return _globalTimestamp;
+}
bool ReplicationCoordinatorExternalStateMock::oplogExists(OperationContext* opCtx) {
return true;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index fdce3427a3f..1143c7e332d 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -78,6 +78,7 @@ public:
virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx);
virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
+ virtual Timestamp getGlobalTimestamp(ServiceContext* service);
bool oplogExists(OperationContext* opCtx) override;
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
virtual void closeConnections();
@@ -197,6 +198,7 @@ private:
bool _areSnapshotsEnabled = true;
OpTime _firstOpTimeOfMyTerm;
double _electionTimeoutOffsetLimitFraction = 0.15;
+ Timestamp _globalTimestamp;
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index b7949947cdd..077dfddc34a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -331,8 +331,8 @@ InitialSyncerOptions createInitialSyncerOptions(
options.getMyLastOptime = [replCoord]() { return replCoord->getMyLastAppliedOpTime(); };
options.setMyLastOptime = [replCoord, externalState](
const OpTime& opTime, ReplicationCoordinator::DataConsistency consistency) {
+ // Note that setting the last applied opTime forward also advances the global timestamp.
replCoord->setMyLastAppliedOpTimeForward(opTime, consistency);
- externalState->setGlobalTimestamp(replCoord->getServiceContext(), opTime.getTimestamp());
};
options.resetOptimes = [replCoord]() { replCoord->resetMyLastOpTimes(); };
options.syncSourceSelector = replCoord;
@@ -632,6 +632,10 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
(lastOpTime >= minValid) ? DataConsistency::Consistent : DataConsistency::Inconsistent;
}
+ // Update the global timestamp before setting the last applied opTime forward so the last
+ // applied optime is never greater than the latest cluster time in the logical clock.
+ _externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp());
+
stdx::unique_lock<stdx::mutex> lock(_mutex);
invariant(_rsConfigState == kConfigStartingUp);
const PostMemberStateUpdateAction action =
@@ -647,7 +651,6 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
lock.unlock();
}
- _externalState->setGlobalTimestamp(getServiceContext(), lastOpTime.getTimestamp());
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
// Step down is impossible, so we don't need to wait for the returned event.
@@ -1075,6 +1078,10 @@ void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
void ReplicationCoordinatorImpl::setMyLastAppliedOpTimeForward(const OpTime& opTime,
DataConsistency consistency) {
+ // Update the global timestamp before setting the last applied opTime forward so the last
+ // applied optime is never greater than the latest cluster time in the logical clock.
+ _externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp());
+
stdx::unique_lock<stdx::mutex> lock(_mutex);
auto myLastAppliedOpTime = _getMyLastAppliedOpTime_inlock();
if (opTime > myLastAppliedOpTime) {
@@ -1108,6 +1115,10 @@ void ReplicationCoordinatorImpl::setMyLastDurableOpTimeForward(const OpTime& opT
}
void ReplicationCoordinatorImpl::setMyLastAppliedOpTime(const OpTime& opTime) {
+ // Update the global timestamp before setting the last applied opTime forward so the last
+ // applied optime is never greater than the latest cluster time in the logical clock.
+ _externalState->setGlobalTimestamp(getServiceContext(), opTime.getTimestamp());
+
stdx::unique_lock<stdx::mutex> lock(_mutex);
// The optime passed to this function is required to represent a consistent database state.
_setMyLastAppliedOpTime(lock, opTime, false, DataConsistency::Consistent);
@@ -1155,6 +1166,11 @@ void ReplicationCoordinatorImpl::_setMyLastAppliedOpTime(WithLock lk,
const OpTime& opTime,
bool isRollbackAllowed,
DataConsistency consistency) {
+ // The last applied opTime should never advance beyond the global timestamp (i.e. the latest
+ // cluster time). Not enforced if the logical clock is disabled, e.g. for arbiters.
+ dassert(!LogicalClock::get(getServiceContext())->isEnabled() ||
+ _externalState->getGlobalTimestamp(getServiceContext()) >= opTime.getTimestamp());
+
_topCoord->setMyLastAppliedOpTime(opTime, _replExecutor->now(), isRollbackAllowed);
// If we are using applied times to calculate the commit level, update it now.
if (!_rsConfig.getWriteConcernMajorityShouldJournal()) {
@@ -3088,14 +3104,15 @@ void ReplicationCoordinatorImpl::resetLastOpTimesFromOplog(OperationContext* opC
lastOpTime = lastOpTimeStatus.getValue();
}
+ // Update the global timestamp before setting last applied opTime forward so the last applied
+ // optime is never greater than the latest in-memory cluster time.
+ _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTime.getTimestamp());
+
stdx::unique_lock<stdx::mutex> lock(_mutex);
bool isRollbackAllowed = true;
_setMyLastAppliedOpTime(lock, lastOpTime, isRollbackAllowed, consistency);
_setMyLastDurableOpTime(lock, lastOpTime, isRollbackAllowed);
_reportUpstream_inlock(std::move(lock));
- // Unlocked below.
-
- _externalState->setGlobalTimestamp(opCtx->getServiceContext(), lastOpTime.getTimestamp());
}
bool ReplicationCoordinatorImpl::shouldChangeSyncSource(
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 45468807acc..ef39d869278 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -904,16 +904,13 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer,
minValid = lastOpTimeInBatch;
}
- // Update various things that care about our last applied optime. Tests rely on 2 happening
- // before 3 even though it isn't strictly necessary. The order of 1 doesn't matter.
+ // Update various things that care about our last applied optime. Tests rely on 1 happening
+ // before 2 even though it isn't strictly necessary.
- // 1. Update the global timestamp.
- setNewTimestamp(opCtx.getServiceContext(), lastOpTimeInBatch.getTimestamp());
-
- // 2. Persist our "applied through" optime to disk.
+ // 1. Persist our "applied through" optime to disk.
_consistencyMarkers->setAppliedThrough(&opCtx, lastOpTimeInBatch);
- // 3. Ensure that the last applied op time hasn't changed since the start of this batch.
+ // 2. Ensure that the last applied op time hasn't changed since the start of this batch.
const auto lastAppliedOpTimeAtEndOfBatch = replCoord->getMyLastAppliedOpTime();
invariant(lastAppliedOpTimeAtStartOfBatch == lastAppliedOpTimeAtEndOfBatch,
str::stream() << "the last known applied OpTime has changed from "
@@ -922,13 +919,14 @@ void SyncTail::_oplogApplication(OplogBuffer* oplogBuffer,
<< lastAppliedOpTimeAtEndOfBatch.toString()
<< " in the middle of batch application");
- // 4. Update oplog visibility by notifying the storage engine of the new oplog entries.
+ // 3. Update oplog visibility by notifying the storage engine of the new oplog entries.
const bool orderedCommit = true;
_storageInterface->oplogDiskLocRegister(
&opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit);
- // 5. Finalize this batch. We are at a consistent optime if our current optime is >= the
- // current 'minValid' optime.
+ // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the
+ // current 'minValid' optime. Note that recording the lastOpTime in the finalizer includes
+ // advancing the global timestamp to at least its timestamp.
auto consistency = (lastOpTimeInBatch >= minValid)
? ReplicationCoordinator::DataConsistency::Consistent
: ReplicationCoordinator::DataConsistency::Inconsistent;
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 5f90209074b..95b3961bfa5 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -327,7 +327,7 @@ void appendClusterAndOperationTime(OperationContext* opCtx,
auto signedTime = SignedLogicalTime(
LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0);
- // TODO SERVER-35663: invariant that signedTime.getTime() >= operationTime.
+ dassert(signedTime.getTime() >= operationTime);
rpc::LogicalTimeMetadata(signedTime).writeToMetadata(metadataBob);
operationTime.appendAsOperationTime(commandBodyFieldsBob);
@@ -349,7 +349,7 @@ void appendClusterAndOperationTime(OperationContext* opCtx,
return;
}
- // TODO SERVER-35663: invariant that signedTime.getTime() >= operationTime.
+ dassert(signedTime.getTime() >= operationTime);
rpc::LogicalTimeMetadata(signedTime).writeToMetadata(metadataBob);
operationTime.appendAsOperationTime(commandBodyFieldsBob);
}