diff options
Diffstat (limited to 'src/mongo/db/repl/replication_consistency_markers_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_consistency_markers_impl.cpp | 77 |
1 files changed, 74 insertions, 3 deletions
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index a49da8acdaa..0a61559fca1 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -44,12 +44,15 @@ namespace repl { constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace; constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace; +constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultCheckpointTimestampNamespace; constexpr StringData ReplicationConsistencyMarkersImpl::kOldOplogDeleteFromPointFieldName; namespace { const BSONObj kInitialSyncFlag(BSON(MinValidDocument::kInitialSyncFlagFieldName << true)); const BSONObj kOplogTruncateAfterPointId(BSON("_id" << "oplogTruncateAfterPoint")); +const BSONObj kCheckpointTimestampId(BSON("_id" + << "checkpointTimestamp")); } // namespace ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( @@ -58,15 +61,19 @@ ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( storageInterface, NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace), NamespaceString( - ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace)) {} + ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace), + NamespaceString( + ReplicationConsistencyMarkersImpl::kDefaultCheckpointTimestampNamespace)) {} ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl( StorageInterface* storageInterface, NamespaceString minValidNss, - NamespaceString oplogTruncateAfterPointNss) + NamespaceString oplogTruncateAfterPointNss, + NamespaceString checkpointTimestampNss) : _storageInterface(storageInterface), _minValidNss(minValidNss), - _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss) {} + _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss), + _checkpointTimestampNss(checkpointTimestampNss) {} boost::optional<MinValidDocument> ReplicationConsistencyMarkersImpl::_getMinValidDocument( OperationContext* opCtx) const { @@ -280,5 +287,69 @@ Timestamp ReplicationConsistencyMarkersImpl::getOplogTruncateAfterPoint( return out; } +void ReplicationConsistencyMarkersImpl::_upsertCheckpointTimestampDocument( + OperationContext* opCtx, const BSONObj& updateSpec) { + auto status = _storageInterface->upsertById( + opCtx, _checkpointTimestampNss, kCheckpointTimestampId["_id"], updateSpec); + + // If the collection doesn't exist, creates it and tries again. + if (status == ErrorCodes::NamespaceNotFound) { + status = _storageInterface->createCollection( + opCtx, _checkpointTimestampNss, CollectionOptions()); + fassertStatusOK(40581, status); + + status = _storageInterface->upsertById( + opCtx, _checkpointTimestampNss, kCheckpointTimestampId["_id"], updateSpec); + } + + fassertStatusOK(40582, status); +} + +void ReplicationConsistencyMarkersImpl::writeCheckpointTimestamp(OperationContext* opCtx, + const Timestamp& timestamp) { + LOG(3) << "setting checkpoint timestamp to: " << timestamp.toBSON(); + + auto timestampField = CheckpointTimestampDocument::kCheckpointTimestampFieldName; + auto spec = BSON("$set" << BSON(timestampField << timestamp)); + + // TODO: When SERVER-28602 is completed, utilize RecoveryUnit::setTimestamp so that this + // write operation itself is committed with a timestamp that is included in the checkpoint. + _upsertCheckpointTimestampDocument(opCtx, spec); +} + +boost::optional<CheckpointTimestampDocument> +ReplicationConsistencyMarkersImpl::_getCheckpointTimestampDocument(OperationContext* opCtx) const { + auto doc = + _storageInterface->findById(opCtx, _checkpointTimestampNss, kCheckpointTimestampId["_id"]); + + if (!doc.isOK()) { + if (doc.getStatus() == ErrorCodes::NoSuchKey || + doc.getStatus() == ErrorCodes::NamespaceNotFound) { + return boost::none; + } else { + // Fails if there is an error other than the collection being missing or being empty. + fassertFailedWithStatus(40583, doc.getStatus()); + } + } + + auto checkpointTimestampDoc = CheckpointTimestampDocument::parse( + IDLParserErrorContext("CheckpointTimestampDocument"), doc.getValue()); + return checkpointTimestampDoc; +} + +Timestamp ReplicationConsistencyMarkersImpl::getCheckpointTimestamp(OperationContext* opCtx) { + auto doc = _getCheckpointTimestampDocument(opCtx); + if (!doc) { + LOG(3) << "Returning empty checkpoint timestamp since document did not exist"; + return {}; + } + + Timestamp out = doc->getCheckpointTimestamp(); + + LOG(3) << "returning checkpoint timestamp: " << out; + return out; +} + + } // namespace repl } // namespace mongo |