summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_consistency_markers_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_consistency_markers_impl.cpp')
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl.cpp77
1 files changed, 74 insertions, 3 deletions
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index a49da8acdaa..0a61559fca1 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -44,12 +44,15 @@ namespace repl {
constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace;
constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace;
+constexpr StringData ReplicationConsistencyMarkersImpl::kDefaultCheckpointTimestampNamespace;
constexpr StringData ReplicationConsistencyMarkersImpl::kOldOplogDeleteFromPointFieldName;
namespace {
const BSONObj kInitialSyncFlag(BSON(MinValidDocument::kInitialSyncFlagFieldName << true));
const BSONObj kOplogTruncateAfterPointId(BSON("_id"
<< "oplogTruncateAfterPoint"));
+const BSONObj kCheckpointTimestampId(BSON("_id"
+ << "checkpointTimestamp"));
} // namespace
ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl(
@@ -58,15 +61,19 @@ ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl(
storageInterface,
NamespaceString(ReplicationConsistencyMarkersImpl::kDefaultMinValidNamespace),
NamespaceString(
- ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace)) {}
+ ReplicationConsistencyMarkersImpl::kDefaultOplogTruncateAfterPointNamespace),
+ NamespaceString(
+ ReplicationConsistencyMarkersImpl::kDefaultCheckpointTimestampNamespace)) {}
ReplicationConsistencyMarkersImpl::ReplicationConsistencyMarkersImpl(
StorageInterface* storageInterface,
NamespaceString minValidNss,
- NamespaceString oplogTruncateAfterPointNss)
+ NamespaceString oplogTruncateAfterPointNss,
+ NamespaceString checkpointTimestampNss)
: _storageInterface(storageInterface),
_minValidNss(minValidNss),
- _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss) {}
+ _oplogTruncateAfterPointNss(oplogTruncateAfterPointNss),
+ _checkpointTimestampNss(checkpointTimestampNss) {}
boost::optional<MinValidDocument> ReplicationConsistencyMarkersImpl::_getMinValidDocument(
OperationContext* opCtx) const {
@@ -280,5 +287,69 @@ Timestamp ReplicationConsistencyMarkersImpl::getOplogTruncateAfterPoint(
return out;
}
+void ReplicationConsistencyMarkersImpl::_upsertCheckpointTimestampDocument(
+ OperationContext* opCtx, const BSONObj& updateSpec) {
+ auto status = _storageInterface->upsertById(
+ opCtx, _checkpointTimestampNss, kCheckpointTimestampId["_id"], updateSpec);
+
+ // If the collection doesn't exist, creates it and tries again.
+ if (status == ErrorCodes::NamespaceNotFound) {
+ status = _storageInterface->createCollection(
+ opCtx, _checkpointTimestampNss, CollectionOptions());
+ fassertStatusOK(40581, status);
+
+ status = _storageInterface->upsertById(
+ opCtx, _checkpointTimestampNss, kCheckpointTimestampId["_id"], updateSpec);
+ }
+
+ fassertStatusOK(40582, status);
+}
+
+void ReplicationConsistencyMarkersImpl::writeCheckpointTimestamp(OperationContext* opCtx,
+ const Timestamp& timestamp) {
+ LOG(3) << "setting checkpoint timestamp to: " << timestamp.toBSON();
+
+ auto timestampField = CheckpointTimestampDocument::kCheckpointTimestampFieldName;
+ auto spec = BSON("$set" << BSON(timestampField << timestamp));
+
+ // TODO: When SERVER-28602 is completed, utilize RecoveryUnit::setTimestamp so that this
+ // write operation itself is committed with a timestamp that is included in the checkpoint.
+ _upsertCheckpointTimestampDocument(opCtx, spec);
+}
+
+boost::optional<CheckpointTimestampDocument>
+ReplicationConsistencyMarkersImpl::_getCheckpointTimestampDocument(OperationContext* opCtx) const {
+ auto doc =
+ _storageInterface->findById(opCtx, _checkpointTimestampNss, kCheckpointTimestampId["_id"]);
+
+ if (!doc.isOK()) {
+ if (doc.getStatus() == ErrorCodes::NoSuchKey ||
+ doc.getStatus() == ErrorCodes::NamespaceNotFound) {
+ return boost::none;
+ } else {
+ // Fails if there is an error other than the collection being missing or being empty.
+ fassertFailedWithStatus(40583, doc.getStatus());
+ }
+ }
+
+ auto checkpointTimestampDoc = CheckpointTimestampDocument::parse(
+ IDLParserErrorContext("CheckpointTimestampDocument"), doc.getValue());
+ return checkpointTimestampDoc;
+}
+
+Timestamp ReplicationConsistencyMarkersImpl::getCheckpointTimestamp(OperationContext* opCtx) {
+ auto doc = _getCheckpointTimestampDocument(opCtx);
+ if (!doc) {
+ LOG(3) << "Returning empty checkpoint timestamp since document did not exist";
+ return {};
+ }
+
+ Timestamp out = doc->getCheckpointTimestamp();
+
+ LOG(3) << "returning checkpoint timestamp: " << out;
+ return out;
+}
+
+
} // namespace repl
} // namespace mongo