diff options
Diffstat (limited to 'src/mongo/db')
7 files changed, 174 insertions, 9 deletions
diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h index aadc2491b62..ddd46cbff6b 100644 --- a/src/mongo/db/repl/replication_consistency_markers.h +++ b/src/mongo/db/repl/replication_consistency_markers.h @@ -174,6 +174,16 @@ public: * always safe. */ virtual OpTime getAppliedThrough(OperationContext* opCtx) const = 0; + + // -------- Checkpoint Timestamp ---------- + + /** + * The checkpoint timestamp is the latest timestamp that the database can recover to. It is the + * job of a storage engine to call writeCheckpointTimestamp() with the timestamp of the + * checkpoint it is about to take; getCheckpointTimestamp() reads the stored value back. + */ + virtual void writeCheckpointTimestamp(OperationContext* opCtx, const Timestamp& timestamp) = 0; + virtual Timestamp getCheckpointTimestamp(OperationContext* opCtx) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_consistency_markers.idl b/src/mongo/db/repl/replication_consistency_markers.idl index d7a7ba6be00..4925845178d 100644 --- a/src/mongo/db/repl/replication_consistency_markers.idl +++ b/src/mongo/db/repl/replication_consistency_markers.idl @@ -68,3 +68,14 @@ structs: _id: type: string description: "Always set to 'oplogTruncateAfterPoint' to easily retrieve it." + CheckpointTimestampDocument: + description: A document that stores the latest timestamp the database can recover to. + fields: + checkpointTimestamp: + type: timestamp + description: "The checkpoint timestamp. Should be set by a storage engine + before a checkpoint is taken." + _id: + type: string + description: "Always set to 'checkpointTimestamp' to easily retrieve it." 
+ diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index a49da8acdaa..0a61559fca1 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -44,12 +44,15 @@ namespace repl { constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace; constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace; +constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultCheckpointTimestampNamespace; constexpr StringData ReplicationConsistencyMarkersImpl::kOldOplogDeleteFromPointFieldName; namespace { const BSONObj kInitialSyncFlag(BSON(MinValidDocument::kInitialSyncFlagFieldName << true)); const BSONObj kOplogTruncateAfterPointId(BSON("_id" << "oplogTruncateAfterPoint")); +const BSONObj kCheckpointTimestampId(BSON("_id" + << "checkpointTimestamp")); } // namespace ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( @@ -58,15 +61,19 @@ ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( storageInterface, NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace), NamespaceString( - ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace)) {} + ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace), + NamespaceString( + ReplicationConsistencyMarkersImpl::kDefaultCheckpointTimestampNamespace)) {} ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( StorageInterface* storageInterface, NamespaceString minValidNss, - NamespaceString oplogTruncateAfterPointNss) + NamespaceString oplogTruncateAfterPointNss, + NamespaceString checkpointTimestampNss) : _storageInterface(storageInterface), _minValidNss(minValidNss), - _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss) {} + _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss), + 
_checkpointTimestampNss(checkpointTimestampNss) {} boost::optional<MinValidDocument> ReplicationConsistencyMarkersImpl::_getMinValidDocument( OperationContext* opCtx) const { @@ -280,5 +287,69 @@ Timestamp ReplicationConsistencyMarkersImpl::getOplogTruncateAfterPoint( return out; } +void ReplicationConsistencyMarkersImpl::_upsertCheckpointTimestampDocument( + OperationContext* opCtx, const BSONObj& updateSpec) { + auto status = _storageInterface->upsertById( + opCtx, _checkpointTimestampNss, kCheckpointTimestampId["_id"], updateSpec); + + // If the collection doesn't exist, creates it and tries again. + if (status == ErrorCodes::NamespaceNotFound) { + status = _storageInterface->createCollection( + opCtx, _checkpointTimestampNss, CollectionOptions()); + fassertStatusOK(40581, status); + + status = _storageInterface->upsertById( + opCtx, _checkpointTimestampNss, kCheckpointTimestampId["_id"], updateSpec); + } + + fassertStatusOK(40582, status); +} + +void ReplicationConsistencyMarkersImpl::writeCheckpointTimestamp(OperationContext* opCtx, + const Timestamp& timestamp) { + LOG(3) << "setting checkpoint timestamp to: " << timestamp.toBSON(); + + auto timestampField = CheckpointTimestampDocument::kCheckpointTimestampFieldName; + auto spec = BSON("$set" << BSON(timestampField << timestamp)); + + // TODO: When SERVER-28602 is completed, utilize RecoveryUnit::setTimestamp so that this + // write operation itself is committed with a timestamp that is included in the checkpoint. 
+ _upsertCheckpointTimestampDocument(opCtx, spec); +} + +boost::optional<CheckpointTimestampDocument> +ReplicationConsistencyMarkersImpl::_getCheckpointTimestampDocument(OperationContext* opCtx) const { + auto doc = + _storageInterface->findById(opCtx, _checkpointTimestampNss, kCheckpointTimestampId["_id"]); + + if (!doc.isOK()) { + if (doc.getStatus() == ErrorCodes::NoSuchKey || + doc.getStatus() == ErrorCodes::NamespaceNotFound) { + return boost::none; + } else { + // Fails if there is an error other than the collection being missing or being empty. + fassertFailedWithStatus(40583, doc.getStatus()); + } + } + + auto checkpointTimestampDoc = CheckpointTimestampDocument::parse( + IDLParserErrorContext("CheckpointTimestampDocument"), doc.getValue()); + return checkpointTimestampDoc; +} + +Timestamp ReplicationConsistencyMarkersImpl::getCheckpointTimestamp(OperationContext* opCtx) { + auto doc = _getCheckpointTimestampDocument(opCtx); + if (!doc) { + LOG(3) << "Returning empty checkpoint timestamp since document did not exist"; + return {}; + } + + Timestamp out = doc->getCheckpointTimestamp(); + + LOG(3) << "returning checkpoint timestamp: " << out; + return out; +} + + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h index 9de5a545a29..d6f298f0ba1 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.h +++ b/src/mongo/db/repl/replication_consistency_markers_impl.h @@ -51,13 +51,16 @@ public: static constexpr StringData kDefaultMinValidNamespace = "local.replset.minvalid"_sd; static constexpr StringData kDefaultOplogTruncateAfterPointNamespace = "local.replset.oplogTruncateAfterPoint"_sd; + static constexpr StringData kDefaultCheckpointTimestampNamespace = + "local.replset.checkpointTimestamp"_sd; // TODO: Remove this constant and its usage in minValid initialization in 3.8. 
static constexpr StringData kOldOplogDeleteFromPointFieldName = "oplogDeleteFromPoint"_sd; explicit ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface); ReplicationConsistencyMarkersImpl(StorageInterface* storageInterface, NamespaceString minValidNss, - NamespaceString oplogTruncateAfterNss); + NamespaceString oplogTruncateAfterNss, + NamespaceString checkpointTimestampNss); void initializeMinValidDocument(OperationContext* opCtx) override; @@ -75,6 +78,9 @@ public: void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override; OpTime getAppliedThrough(OperationContext* opCtx) const override; + void writeCheckpointTimestamp(OperationContext* opCtx, const Timestamp& timestamp) override; + Timestamp getCheckpointTimestamp(OperationContext* opCtx) override; + private: /** * Reads the MinValid document from disk. @@ -98,6 +104,13 @@ private: OperationContext* opCtx) const; /** + * Reads the CheckpointTimestamp document from disk. + * Returns boost::none if not present. + */ + boost::optional<CheckpointTimestampDocument> _getCheckpointTimestampDocument( + OperationContext* opCtx) const; + + /** * Upserts the OplogTruncateAfterPoint document according to the provided update spec. * If the collection does not exist, it is created. If the document does not exist, * it is upserted. @@ -106,9 +119,20 @@ private: */ void _upsertOplogTruncateAfterPointDocument(OperationContext* opCtx, const BSONObj& updateSpec); + /** + * Upserts the CheckpointTimestamp document according to the provided update spec. + * If the collection does not exist, it is created. If the document does not exist, + * it is upserted. + * + * This fasserts on failure. 
+ */ + void _upsertCheckpointTimestampDocument(OperationContext* opCtx, const BSONObj& updateSpec); + + StorageInterface* _storageInterface; const NamespaceString _minValidNss; const NamespaceString _oplogTruncateAfterPointNss; + const NamespaceString _checkpointTimestampNss; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp index 1306811fce4..8a0ba53e6ce 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp @@ -97,6 +97,26 @@ BSONObj getOplogTruncateAfterPointDocument(OperationContext* opCtx, }); } +/** + * Returns checkpoint timestamp document. + */ +BSONObj getCheckpointTimestampDocument(OperationContext* opCtx, + const NamespaceString& checkpointTimestampNss) { + return writeConflictRetry( + opCtx, + "getCheckpointTimestampDocument", + checkpointTimestampNss.ns(), + [opCtx, checkpointTimestampNss] { + Lock::DBLock dblk(opCtx, checkpointTimestampNss.db(), MODE_IS); + Lock::CollectionLock lk(opCtx->lockState(), checkpointTimestampNss.ns(), MODE_IS); + BSONObj mv; + if (Helpers::getSingleton(opCtx, checkpointTimestampNss.ns().c_str(), mv)) { + return mv; + } + return mv; + }); +} + class ReplicationConsistencyMarkersTest : public ServiceContextMongoDTest { protected: OperationContext* getOperationContext() { @@ -147,9 +167,10 @@ bool RecoveryUnitWithDurabilityTracking::waitUntilDurable() { TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) { auto minValidNss = makeNamespace(_agent, "minValid"); auto oplogTruncateAfterPointNss = makeNamespace(_agent, "oplogTruncateAfterPoint"); + auto checkpointTimestampNss = makeNamespace(_agent, "checkpointTimestamp"); ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), minValidNss, oplogTruncateAfterPointNss); + getStorageInterface(), minValidNss, oplogTruncateAfterPointNss, 
checkpointTimestampNss); auto opCtx = getOperationContext(); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -173,9 +194,10 @@ TEST_F(ReplicationConsistencyMarkersTest, InitialSyncFlag) { TEST_F(ReplicationConsistencyMarkersTest, GetMinValidAfterSettingInitialSyncFlagWorks) { auto minValidNss = makeNamespace(_agent, "minValid"); auto oplogTruncateAfterPointNss = makeNamespace(_agent, "oplogTruncateAfterPoint"); + auto checkpointTimestampNss = makeNamespace(_agent, "checkpointTimestamp"); ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), minValidNss, oplogTruncateAfterPointNss); + getStorageInterface(), minValidNss, oplogTruncateAfterPointNss, checkpointTimestampNss); auto opCtx = getOperationContext(); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -194,9 +216,10 @@ TEST_F(ReplicationConsistencyMarkersTest, GetMinValidAfterSettingInitialSyncFlag TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { auto minValidNss = makeNamespace(_agent, "minValid"); auto oplogTruncateAfterPointNss = makeNamespace(_agent, "oplogTruncateAfterPoint"); + auto checkpointTimestampNss = makeNamespace(_agent, "checkpointTimestamp"); ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), minValidNss, oplogTruncateAfterPointNss); + getStorageInterface(), minValidNss, oplogTruncateAfterPointNss, checkpointTimestampNss); auto opCtx = getOperationContext(); consistencyMarkers.initializeMinValidDocument(opCtx); @@ -204,6 +227,7 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { ASSERT(consistencyMarkers.getMinValid(opCtx).isNull()); ASSERT(consistencyMarkers.getAppliedThrough(opCtx).isNull()); ASSERT(consistencyMarkers.getOplogTruncateAfterPoint(opCtx).isNull()); + ASSERT(consistencyMarkers.getCheckpointTimestamp(opCtx).isNull()); // Setting min valid boundaries should affect getMinValid() result. 
OpTime startOpTime({Seconds(123), 0}, 1LL); @@ -211,10 +235,12 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { consistencyMarkers.setAppliedThrough(opCtx, startOpTime); consistencyMarkers.setMinValid(opCtx, endOpTime); consistencyMarkers.setOplogTruncateAfterPoint(opCtx, endOpTime.getTimestamp()); + consistencyMarkers.writeCheckpointTimestamp(opCtx, endOpTime.getTimestamp()); ASSERT_EQ(consistencyMarkers.getAppliedThrough(opCtx), startOpTime); ASSERT_EQ(consistencyMarkers.getMinValid(opCtx), endOpTime); ASSERT_EQ(consistencyMarkers.getOplogTruncateAfterPoint(opCtx), endOpTime.getTimestamp()); + ASSERT_EQ(consistencyMarkers.getCheckpointTimestamp(opCtx), endOpTime.getTimestamp()); // setMinValid always changes minValid, but setMinValidToAtLeast only does if higher. consistencyMarkers.setMinValid(opCtx, startOpTime); // Forcibly lower it. @@ -241,6 +267,14 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { [OplogTruncateAfterPointDocument::kOplogTruncateAfterPointFieldName] .timestamp()); + // Check checkpoint timestamp document. + auto checkpointTimestampDocument = + getCheckpointTimestampDocument(opCtx, checkpointTimestampNss); + ASSERT_EQUALS( + endOpTime.getTimestamp(), + checkpointTimestampDocument[CheckpointTimestampDocument::kCheckpointTimestampFieldName] + .timestamp()); + // Recovery unit will be owned by "opCtx". 
RecoveryUnitWithDurabilityTracking* recoveryUnit = new RecoveryUnitWithDurabilityTracking(); opCtx->setRecoveryUnit(recoveryUnit, OperationContext::kNotInUnitOfWork); @@ -257,9 +291,10 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { TEST_F(ReplicationConsistencyMarkersTest, OplogTruncateAfterPointUpgrade) { auto minValidNss = makeNamespace(_agent, "minValid"); auto oplogTruncateAfterPointNss = makeNamespace(_agent, "oplogTruncateAfterPoint"); + auto checkpointTimestampNss = makeNamespace(_agent, "checkpointTimestamp"); ReplicationConsistencyMarkersImpl consistencyMarkers( - getStorageInterface(), minValidNss, oplogTruncateAfterPointNss); + getStorageInterface(), minValidNss, oplogTruncateAfterPointNss, checkpointTimestampNss); auto opCtx = getOperationContext(); Timestamp time1(Seconds(123), 0); Timestamp time2(Seconds(456), 0); @@ -284,5 +319,4 @@ TEST_F(ReplicationConsistencyMarkersTest, OplogTruncateAfterPointUpgrade) { consistencyMarkers.setOplogTruncateAfterPoint(opCtx, time2); ASSERT_EQ(consistencyMarkers.getOplogTruncateAfterPoint(opCtx), time2); } - } // namespace diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp index b2df7bc5553..ec533e1558f 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp @@ -102,5 +102,16 @@ OpTime ReplicationConsistencyMarkersMock::getAppliedThrough(OperationContext* op return _appliedThrough; } +void ReplicationConsistencyMarkersMock::writeCheckpointTimestamp(OperationContext* opCtx, + const Timestamp& timestamp) { + stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + _checkpointTimestamp = timestamp; +} + +Timestamp ReplicationConsistencyMarkersMock::getCheckpointTimestamp(OperationContext* opCtx) { + stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex); + return _checkpointTimestamp; +} + } // namespace repl } // 
namespace mongo diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h index 75d820d11bc..cf3745a529e 100644 --- a/src/mongo/db/repl/replication_consistency_markers_mock.h +++ b/src/mongo/db/repl/replication_consistency_markers_mock.h @@ -66,6 +66,9 @@ public: void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override; OpTime getAppliedThrough(OperationContext* opCtx) const override; + void writeCheckpointTimestamp(OperationContext* opCtx, const Timestamp& timestamp) override; + Timestamp getCheckpointTimestamp(OperationContext* opCtx) override; + private: mutable stdx::mutex _initialSyncFlagMutex; bool _initialSyncFlag = false; @@ -74,6 +77,7 @@ private: OpTime _appliedThrough; OpTime _minValid; Timestamp _oplogTruncateAfterPoint; + Timestamp _checkpointTimestamp; }; } // namespace repl |