summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorJudah Schvimer <judah@mongodb.com>2017-12-13 12:39:47 -0500
committerJudah Schvimer <judah@mongodb.com>2017-12-13 12:39:47 -0500
commit6796859387ba77a3556ed583a317681a288970e4 (patch)
tree64dfff38f225a714d83533c8952198d067792e77 /src/mongo/db/repl
parent4d53b4beb00eaa341bf2a134fbc9366d3333e830 (diff)
downloadmongo-6796859387ba77a3556ed583a317681a288970e4.tar.gz
SERVER-30926 Add timestamps to writes to minValid document
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/replication_consistency_markers.h8
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl.cpp126
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl.h4
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_mock.cpp6
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp15
-rw-r--r--src/mongo/db/repl/rollback_test_fixture.cpp2
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp7
-rw-r--r--src/mongo/db/repl/rs_rollback_no_uuid.cpp2
-rw-r--r--src/mongo/db/repl/storage_interface.h8
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp24
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h4
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp42
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h4
15 files changed, 190 insertions, 65 deletions
diff --git a/src/mongo/db/repl/replication_consistency_markers.h b/src/mongo/db/repl/replication_consistency_markers.h
index 96ea46b68c7..2bf12adb0db 100644
--- a/src/mongo/db/repl/replication_consistency_markers.h
+++ b/src/mongo/db/repl/replication_consistency_markers.h
@@ -176,12 +176,16 @@ public:
* The applied through point is a persistent record of which oplog entries we've applied.
* If we crash while applying a batch of oplog entries, this OpTime tells us where to start
* applying operations on startup.
- *
- * If null, the applied through point is the top of the oplog.
*/
virtual void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) = 0;
/**
+ * Unsets the applied through OpTime at the given 'writeTimestamp'.
+ * Once cleared, the applied through point is the top of the oplog.
+ */
+ virtual void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) = 0;
+
+ /**
* You should probably be calling ReplicationCoordinator::getLastAppliedOpTime() instead.
*
* This reads the value from storage which isn't always updated when the ReplicationCoordinator
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index 3896d3a13e2..87d030b3098 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -92,8 +92,8 @@ boost::optional<MinValidDocument> ReplicationConsistencyMarkersImpl::_getMinVali
return minValid;
}
-void ReplicationConsistencyMarkersImpl::_updateMinValidDocument(OperationContext* opCtx,
- const BSONObj& updateSpec) {
+void ReplicationConsistencyMarkersImpl::_updateMinValidDocument(
+ OperationContext* opCtx, const TimestampedBSONObj& updateSpec) {
Status status = _storageInterface->putSingleton(opCtx, _minValidNss, updateSpec);
invariantOK(status);
}
@@ -104,19 +104,26 @@ void ReplicationConsistencyMarkersImpl::initializeMinValidDocument(OperationCont
// This initializes the values of the required fields if they are not already set.
// If one of the fields is already set, the $max will prefer the existing value since it
// will always be greater than the provided ones.
- auto spec = BSON("$max" << BSON(MinValidDocument::kMinValidTimestampFieldName
- << Timestamp()
- << MinValidDocument::kMinValidTermFieldName
- << OpTime::kUninitializedTerm));
+ TimestampedBSONObj upsert;
+ upsert.obj = BSON("$max" << BSON(MinValidDocument::kMinValidTimestampFieldName
+ << Timestamp()
+ << MinValidDocument::kMinValidTermFieldName
+ << OpTime::kUninitializedTerm));
- Status status = _storageInterface->putSingleton(opCtx, _minValidNss, spec);
+ // The initialization write should go into the first checkpoint taken, so we provide no
+ // timestamp. The 'minValid' document could exist already and this could simply add fields to
+ // the 'minValid' document, but we still want the initialization write to go into the next
+ // checkpoint since a newly initialized 'minValid' document is always valid.
+ upsert.timestamp = Timestamp();
+
+ Status status = _storageInterface->putSingleton(opCtx, _minValidNss, upsert);
// If the collection doesn't exist, create it and try again.
if (status == ErrorCodes::NamespaceNotFound) {
status = _storageInterface->createCollection(opCtx, _minValidNss, CollectionOptions());
fassertStatusOK(40509, status);
- status = _storageInterface->putSingleton(opCtx, _minValidNss, spec);
+ status = _storageInterface->putSingleton(opCtx, _minValidNss, upsert);
}
fassertStatusOK(40467, status);
@@ -138,7 +145,15 @@ bool ReplicationConsistencyMarkersImpl::getInitialSyncFlag(OperationContext* opC
void ReplicationConsistencyMarkersImpl::setInitialSyncFlag(OperationContext* opCtx) {
LOG(3) << "setting initial sync flag";
- _updateMinValidDocument(opCtx, BSON("$set" << kInitialSyncFlag));
+ TimestampedBSONObj update;
+ update.obj = BSON("$set" << kInitialSyncFlag);
+
+ // We do not provide a timestamp when we set the initial sync flag. Initial sync can only
+ // occur right when we start up, and thus there cannot be any checkpoints being taken. This
+ // write should go into the next checkpoint.
+ update.timestamp = Timestamp();
+
+ _updateMinValidDocument(opCtx, update);
opCtx->recoveryUnit()->waitUntilDurable();
}
@@ -147,14 +162,23 @@ void ReplicationConsistencyMarkersImpl::clearInitialSyncFlag(OperationContext* o
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
OpTime time = replCoord->getMyLastAppliedOpTime();
- _updateMinValidDocument(opCtx,
- BSON("$unset" << kInitialSyncFlag << "$set"
- << BSON(MinValidDocument::kMinValidTimestampFieldName
- << time.getTimestamp()
- << MinValidDocument::kMinValidTermFieldName
- << time.getTerm()
- << MinValidDocument::kAppliedThroughFieldName
- << time)));
+ TimestampedBSONObj update;
+ update.obj = BSON("$unset" << kInitialSyncFlag << "$set"
+ << BSON(MinValidDocument::kMinValidTimestampFieldName
+ << time.getTimestamp()
+ << MinValidDocument::kMinValidTermFieldName
+ << time.getTerm()
+ << MinValidDocument::kAppliedThroughFieldName
+ << time));
+
+ // We clear the initial sync flag at the 'lastAppliedOpTime'. This is unnecessary, since there
+ // should not be any stable checkpoints being taken that this write could inadvertantly enter.
+ // This 'lastAppliedOpTime' will be the first stable timestamp candidate, so it will be in the
+ // first stable checkpoint taken after initial sync. This provides more clarity than providing
+ // no timestamp.
+ update.timestamp = time.getTimestamp();
+
+ _updateMinValidDocument(opCtx, update);
if (getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()) {
opCtx->recoveryUnit()->waitUntilDurable();
@@ -177,11 +201,19 @@ void ReplicationConsistencyMarkersImpl::setMinValid(OperationContext* opCtx,
const OpTime& minValid) {
LOG(3) << "setting minvalid to exactly: " << minValid.toString() << "(" << minValid.toBSON()
<< ")";
- _updateMinValidDocument(opCtx,
- BSON("$set" << BSON(MinValidDocument::kMinValidTimestampFieldName
- << minValid.getTimestamp()
- << MinValidDocument::kMinValidTermFieldName
- << minValid.getTerm())));
+ TimestampedBSONObj update;
+ update.obj = BSON("$set" << BSON(MinValidDocument::kMinValidTimestampFieldName
+ << minValid.getTimestamp()
+ << MinValidDocument::kMinValidTermFieldName
+ << minValid.getTerm()));
+
+ // This method is only used with storage engines that do not support recover to stable
+ // timestamp. As a result, their timestamps do not matter.
+ invariant(
+ !opCtx->getServiceContext()->getGlobalStorageEngine()->supportsRecoverToStableTimestamp());
+ update.timestamp = Timestamp();
+
+ _updateMinValidDocument(opCtx, update);
}
void ReplicationConsistencyMarkersImpl::setMinValidToAtLeast(OperationContext* opCtx,
@@ -208,26 +240,56 @@ void ReplicationConsistencyMarkersImpl::setMinValidToAtLeast(OperationContext* o
OR(BSON(termField << LT << minValid.getTerm()),
BSON(termField << minValid.getTerm() << tsField << LT << minValid.getTimestamp())));
}
- Status status = _storageInterface->updateSingleton(opCtx, _minValidNss, query, updateSpec);
+
+ TimestampedBSONObj update;
+ update.obj = updateSpec;
+
+ // We write to the 'minValid' document with the 'minValid' timestamp. We only take stable
+ // checkpoints when we are consistent. Thus, the next checkpoint we can take is at this
+ // 'minValid'. If we gave it a timestamp from before the batch, and we took a stable checkpoint
+ // at that timestamp, then we would consider that checkpoint inconsistent, even though it is
+ // consistent.
+ update.timestamp = minValid.getTimestamp();
+
+ Status status = _storageInterface->updateSingleton(opCtx, _minValidNss, query, update);
invariantOK(status);
}
void ReplicationConsistencyMarkersImpl::removeOldOplogDeleteFromPointField(
OperationContext* opCtx) {
- _updateMinValidDocument(
- opCtx, BSON("$unset" << BSON(MinValidDocument::kOldOplogDeleteFromPointFieldName << 1)));
+ TimestampedBSONObj update;
+ update.obj = BSON("$unset" << BSON(MinValidDocument::kOldOplogDeleteFromPointFieldName << 1));
+
+ // This is getting removed in SERVER-30556 before 3.8 is released, so the timestamp does not
+ // matter.
+ update.timestamp = Timestamp();
+ _updateMinValidDocument(opCtx, update);
}
void ReplicationConsistencyMarkersImpl::setAppliedThrough(OperationContext* opCtx,
const OpTime& optime) {
+ invariant(!optime.isNull());
LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")";
- if (optime.isNull()) {
- _updateMinValidDocument(
- opCtx, BSON("$unset" << BSON(MinValidDocument::kAppliedThroughFieldName << 1)));
- } else {
- _updateMinValidDocument(
- opCtx, BSON("$set" << BSON(MinValidDocument::kAppliedThroughFieldName << optime)));
- }
+
+ // We set the 'appliedThrough' to the provided timestamp. The 'appliedThrough' is only valid
+ // in checkpoints that contain all writes through this timestamp since it indicates the top of
+ // the oplog.
+ TimestampedBSONObj update;
+ update.timestamp = optime.getTimestamp();
+ update.obj = BSON("$set" << BSON(MinValidDocument::kAppliedThroughFieldName << optime));
+
+ _updateMinValidDocument(opCtx, update);
+}
+
+void ReplicationConsistencyMarkersImpl::clearAppliedThrough(OperationContext* opCtx,
+ const Timestamp& writeTimestamp) {
+ LOG(3) << "clearing appliedThrough at: " << writeTimestamp.toString();
+
+ TimestampedBSONObj update;
+ update.timestamp = writeTimestamp;
+ update.obj = BSON("$unset" << BSON(MinValidDocument::kAppliedThroughFieldName << 1));
+
+ _updateMinValidDocument(opCtx, update);
}
OpTime ReplicationConsistencyMarkersImpl::getAppliedThrough(OperationContext* opCtx) const {
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.h b/src/mongo/db/repl/replication_consistency_markers_impl.h
index 7bf70fa5c0b..0b36c1439a7 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.h
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.h
@@ -43,6 +43,7 @@ namespace repl {
class OpTime;
class StorageInterface;
+struct TimestampedBSONObj;
class ReplicationConsistencyMarkersImpl : public ReplicationConsistencyMarkers {
MONGO_DISALLOW_COPYING(ReplicationConsistencyMarkersImpl);
@@ -76,6 +77,7 @@ public:
void removeOldOplogDeleteFromPointField(OperationContext* opCtx) override;
void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
+ void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override;
OpTime getAppliedThrough(OperationContext* opCtx) const override;
void writeCheckpointTimestamp(OperationContext* opCtx, const Timestamp& timestamp);
@@ -94,7 +96,7 @@ private:
*
* This fasserts on failure.
*/
- void _updateMinValidDocument(OperationContext* opCtx, const BSONObj& updateSpec);
+ void _updateMinValidDocument(OperationContext* opCtx, const TimestampedBSONObj& updateSpec);
/**
* Reads the OplogTruncateAfterPoint document from disk.
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
index ae4257507c1..9abfb52c525 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp
@@ -282,7 +282,7 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) {
// Set min valid without waiting for the changes to be durable.
OpTime endOpTime2({Seconds(789), 0}, 1LL);
consistencyMarkers.setMinValid(opCtx, endOpTime2);
- consistencyMarkers.setAppliedThrough(opCtx, {});
+ consistencyMarkers.clearAppliedThrough(opCtx, {});
ASSERT_EQUALS(consistencyMarkers.getAppliedThrough(opCtx), OpTime());
ASSERT_EQUALS(consistencyMarkers.getMinValid(opCtx), endOpTime2);
ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled);
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.cpp b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
index 9a33b0ea902..7510ab2679e 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.cpp
@@ -100,6 +100,12 @@ void ReplicationConsistencyMarkersMock::setAppliedThrough(OperationContext* opCt
_appliedThrough = optime;
}
+void ReplicationConsistencyMarkersMock::clearAppliedThrough(OperationContext* opCtx,
+ const Timestamp& writeTimestamp) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ _appliedThrough = {};
+}
+
OpTime ReplicationConsistencyMarkersMock::getAppliedThrough(OperationContext* opCtx) const {
stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
return _appliedThrough;
diff --git a/src/mongo/db/repl/replication_consistency_markers_mock.h b/src/mongo/db/repl/replication_consistency_markers_mock.h
index 45497877cea..cc6a2009bc1 100644
--- a/src/mongo/db/repl/replication_consistency_markers_mock.h
+++ b/src/mongo/db/repl/replication_consistency_markers_mock.h
@@ -66,6 +66,7 @@ public:
void removeOldOplogDeleteFromPointField(OperationContext* opCtx) override;
void setAppliedThrough(OperationContext* opCtx, const OpTime& optime) override;
+ void clearAppliedThrough(OperationContext* opCtx, const Timestamp& writeTimestamp) override;
OpTime getAppliedThrough(OperationContext* opCtx) const override;
void writeCheckpointTimestamp(OperationContext* opCtx, const Timestamp& timestamp) override;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index fbd97666f09..47fe0839850 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -339,8 +339,12 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* opCtx)
loadLastOpTime(opCtx) ==
_replicationProcess->getConsistencyMarkers()->getAppliedThrough(opCtx)) {
// Clear the appliedThrough marker to indicate we are consistent with the top of the
- // oplog.
- _replicationProcess->getConsistencyMarkers()->setAppliedThrough(opCtx, {});
+ // oplog. We record this update at the 'lastAppliedOpTime'. If there are any outstanding
+ // checkpoints being taken, they should only reflect this write if they see all writes up
+ // to our 'lastAppliedOpTime'.
+ auto lastAppliedOpTime = repl::getGlobalReplicationCoordinator()->getMyLastAppliedOpTime();
+ _replicationProcess->getConsistencyMarkers()->clearAppliedThrough(
+ opCtx, lastAppliedOpTime.getTimestamp());
}
}
@@ -445,9 +449,14 @@ OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationC
// Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
// done before we add anything to our oplog.
+ // We record this update at the 'lastAppliedOpTime'. If there are any outstanding
+ // checkpoints being taken, they should only reflect this write if they see all writes up
+ // to our 'lastAppliedOpTime'.
invariant(
_replicationProcess->getConsistencyMarkers()->getOplogTruncateAfterPoint(opCtx).isNull());
- _replicationProcess->getConsistencyMarkers()->setAppliedThrough(opCtx, {});
+ auto lastAppliedOpTime = repl::getGlobalReplicationCoordinator()->getMyLastAppliedOpTime();
+ _replicationProcess->getConsistencyMarkers()->clearAppliedThrough(
+ opCtx, lastAppliedOpTime.getTimestamp());
if (isV1ElectionProtocol) {
writeConflictRetry(opCtx, "logging transition to primary to oplog", "local.oplog.rs", [&] {
diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp
index c017bb6110b..2731343178f 100644
--- a/src/mongo/db/repl/rollback_test_fixture.cpp
+++ b/src/mongo/db/repl/rollback_test_fixture.cpp
@@ -86,7 +86,7 @@ void RollbackTest::setUp() {
SessionCatalog::create(serviceContext);
_opCtx = cc().makeOperationContext();
- _replicationProcess->getConsistencyMarkers()->setAppliedThrough(_opCtx.get(), OpTime{});
+ _replicationProcess->getConsistencyMarkers()->clearAppliedThrough(_opCtx.get(), {});
_replicationProcess->getConsistencyMarkers()->setMinValid(_opCtx.get(), OpTime{});
_replicationProcess->initializeRollbackID(_opCtx.get()).transitional_ignore();
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 7d128e663ed..0a14fc74c18 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -610,7 +610,12 @@ void checkRbidAndUpdateMinValid(OperationContext* opCtx,
OpTime minValid = fassertStatusOK(40492, OpTime::parseFromOplogEntry(newMinValidDoc));
log() << "Setting minvalid to " << minValid;
- replicationProcess->getConsistencyMarkers()->setAppliedThrough(opCtx, {}); // Use top of oplog.
+
+ // This method is only used with storage engines that do not support recover to stable
+ // timestamp. As a result, the timestamp on the 'appliedThrough' update does not matter.
+ invariant(
+ !opCtx->getServiceContext()->getGlobalStorageEngine()->supportsRecoverToStableTimestamp());
+ replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, {});
replicationProcess->getConsistencyMarkers()->setMinValid(opCtx, minValid);
if (MONGO_FAIL_POINT(rollbackHangThenFailAfterWritingMinValid)) {
diff --git a/src/mongo/db/repl/rs_rollback_no_uuid.cpp b/src/mongo/db/repl/rs_rollback_no_uuid.cpp
index 765041a0638..256719f04cd 100644
--- a/src/mongo/db/repl/rs_rollback_no_uuid.cpp
+++ b/src/mongo/db/repl/rs_rollback_no_uuid.cpp
@@ -417,7 +417,7 @@ void checkRbidAndUpdateMinValid(OperationContext* opCtx,
OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValidDoc));
log() << "Setting minvalid to " << minValid;
- replicationProcess->getConsistencyMarkers()->setAppliedThrough(opCtx, {}); // Use top of oplog.
+ replicationProcess->getConsistencyMarkers()->clearAppliedThrough(opCtx, {});
replicationProcess->getConsistencyMarkers()->setMinValid(opCtx, minValid);
if (MONGO_FAIL_POINT(rollbackHangThenFailAfterWritingMinValid)) {
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index a7882041c21..26812b5ea84 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -239,20 +239,20 @@ public:
* Updates a singleton document in a collection. Upserts the document if it does not exist. If
* the document is upserted and no '_id' is provided, one will be generated.
* If the collection has more than 1 document, the update will only be performed on the first
- * one found.
+ * one found. The upsert is performed at the given timestamp.
* Returns 'NamespaceNotFound' if the collection does not exist. This does not implicitly
* create the collection so that the caller can create the collection with any collection
* options they want (ex: capped, temp, collation, etc.).
*/
virtual Status putSingleton(OperationContext* opCtx,
const NamespaceString& nss,
- const BSONObj& update) = 0;
+ const TimestampedBSONObj& update) = 0;
/**
* Updates a singleton document in a collection. Never upsert.
*
* If the collection has more than 1 document, the update will only be performed on the first
- * one found.
+ * one found. The update is performed at the given timestamp.
* Returns 'NamespaceNotFound' if the collection does not exist. This does not implicitly
* create the collection so that the caller can create the collection with any collection
* options they want (ex: capped, temp, collation, etc.).
@@ -260,7 +260,7 @@ public:
virtual Status updateSingleton(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& query,
- const BSONObj& update) = 0;
+ const TimestampedBSONObj& update) = 0;
/**
* Finds a single document in the collection referenced by the specified _id.
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 2173d59640f..6a5841bb3b1 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -752,7 +752,9 @@ StatusWith<BSONObj> makeUpsertQuery(const BSONElement& idKey) {
return query;
}
-Status _updateWithQuery(OperationContext* opCtx, const UpdateRequest& request) {
+Status _updateWithQuery(OperationContext* opCtx,
+ const UpdateRequest& request,
+ const Timestamp& ts) {
invariant(!request.isMulti()); // We only want to update one document for performance.
invariant(!request.shouldReturnAnyDocs());
invariant(PlanExecutor::NO_YIELD == request.getYieldPolicy());
@@ -778,6 +780,10 @@ Status _updateWithQuery(OperationContext* opCtx, const UpdateRequest& request) {
return collectionResult.getStatus();
}
auto collection = collectionResult.getValue();
+ WriteUnitOfWork wuow(opCtx);
+ if (!ts.isNull()) {
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(ts));
+ }
auto planExecutorResult =
mongo::getExecutorUpdate(opCtx, nullptr, collection, &parsedUpdate);
@@ -786,7 +792,9 @@ Status _updateWithQuery(OperationContext* opCtx, const UpdateRequest& request) {
}
auto planExecutor = std::move(planExecutorResult.getValue());
- return planExecutor->executePlan();
+ auto ret = planExecutor->executePlan();
+ wuow.commit();
+ return ret;
});
}
@@ -851,23 +859,23 @@ Status StorageInterfaceImpl::upsertById(OperationContext* opCtx,
Status StorageInterfaceImpl::putSingleton(OperationContext* opCtx,
const NamespaceString& nss,
- const BSONObj& update) {
+ const TimestampedBSONObj& update) {
UpdateRequest request(nss);
request.setQuery({});
- request.setUpdates(update);
+ request.setUpdates(update.obj);
request.setUpsert(true);
- return _updateWithQuery(opCtx, request);
+ return _updateWithQuery(opCtx, request, update.timestamp);
}
Status StorageInterfaceImpl::updateSingleton(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& query,
- const BSONObj& update) {
+ const TimestampedBSONObj& update) {
UpdateRequest request(nss);
request.setQuery(query);
- request.setUpdates(update);
+ request.setUpdates(update.obj);
invariant(!request.isUpsert());
- return _updateWithQuery(opCtx, request);
+ return _updateWithQuery(opCtx, request, update.timestamp);
}
Status StorageInterfaceImpl::deleteByFilter(OperationContext* opCtx,
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 710f50d9e10..9b7bf9f6690 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -113,12 +113,12 @@ public:
Status putSingleton(OperationContext* opCtx,
const NamespaceString& nss,
- const BSONObj& update) override;
+ const TimestampedBSONObj& update) override;
Status updateSingleton(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& query,
- const BSONObj& update) override;
+ const TimestampedBSONObj& update) override;
StatusWith<BSONObj> findById(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 343ee8b5720..c119935ab6f 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -1700,7 +1700,11 @@ TEST_F(StorageInterfaceImplTest, PutSingletonReturnsNamespaceNotFoundWhenDatabas
auto opCtx = getOperationContext();
StorageInterfaceImpl storage;
NamespaceString nss("nosuchdb.coll");
- auto update = BSON("$set" << BSON("_id" << 0 << "x" << 1));
+
+ TimestampedBSONObj update;
+ update.obj = BSON("$set" << BSON("_id" << 0 << "x" << 1));
+ update.timestamp = Timestamp();
+
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, storage.putSingleton(opCtx, nss, update));
}
@@ -1709,7 +1713,11 @@ TEST_F(StorageInterfaceImplTest, PutSingletonReturnsNamespaceNotFoundWhenCollect
StorageInterfaceImpl storage;
NamespaceString nss("db.coll1");
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- auto update = BSON("$set" << BSON("_id" << 0 << "x" << 1));
+
+ TimestampedBSONObj update;
+ update.obj = BSON("$set" << BSON("_id" << 0 << "x" << 1));
+ update.timestamp = Timestamp();
+
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound,
storage.putSingleton(opCtx, NamespaceString("db.coll2"), update));
}
@@ -1719,7 +1727,11 @@ TEST_F(StorageInterfaceImplTest, PutSingletonUpsertsDocumentsWhenCollectionIsEmp
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- auto update = BSON("$set" << BSON("_id" << 0 << "x" << 1));
+
+ TimestampedBSONObj update;
+ update.obj = BSON("$set" << BSON("_id" << 0 << "x" << 1));
+ update.timestamp = Timestamp();
+
ASSERT_OK(storage.putSingleton(opCtx, nss, update));
ASSERT_BSONOBJ_EQ(BSON("_id" << 0 << "x" << 1),
unittest::assertGet(storage.findSingleton(opCtx, nss)));
@@ -1733,7 +1745,11 @@ TEST_F(StorageInterfaceImplTest, PutSingletonUpdatesDocumentWhenCollectionIsNotE
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
auto doc1 = BSON("_id" << 0 << "x" << 0);
ASSERT_OK(storage.insertDocument(opCtx, nss, {doc1, Timestamp(0)}, OpTime::kUninitializedTerm));
- auto update = BSON("$set" << BSON("x" << 1));
+
+ TimestampedBSONObj update;
+ update.obj = BSON("$set" << BSON("x" << 1));
+ update.timestamp = Timestamp();
+
ASSERT_OK(storage.putSingleton(opCtx, nss, update));
ASSERT_BSONOBJ_EQ(BSON("_id" << 0 << "x" << 1),
unittest::assertGet(storage.findSingleton(opCtx, nss)));
@@ -1751,7 +1767,11 @@ TEST_F(StorageInterfaceImplTest, PutSingletonUpdatesFirstDocumentWhenCollectionI
nss,
{{doc1, Timestamp(0), OpTime::kUninitializedTerm},
{doc2, Timestamp(0), OpTime::kUninitializedTerm}}));
- auto update = BSON("$set" << BSON("x" << 2));
+
+ TimestampedBSONObj update;
+ update.obj = BSON("$set" << BSON("x" << 2));
+ update.timestamp = Timestamp();
+
ASSERT_OK(storage.putSingleton(opCtx, nss, update));
_assertDocumentsInCollectionEquals(opCtx, nss, {BSON("_id" << 0 << "x" << 2), doc2});
}
@@ -1761,7 +1781,11 @@ TEST_F(StorageInterfaceImplTest, UpdateSingletonNeverUpserts) {
StorageInterfaceImpl storage;
auto nss = makeNamespace(_agent);
ASSERT_OK(storage.createCollection(opCtx, nss, CollectionOptions()));
- auto update = BSON("$set" << BSON("_id" << 0 << "x" << 1));
+
+ TimestampedBSONObj update;
+ update.obj = BSON("$set" << BSON("_id" << 0 << "x" << 1));
+ update.timestamp = Timestamp();
+
ASSERT_OK(storage.updateSingleton(opCtx, nss, {}, update));
ASSERT_EQ(ErrorCodes::CollectionIsEmpty, storage.findSingleton(opCtx, nss));
_assertDocumentsInCollectionEquals(opCtx, nss, std::vector<mongo::BSONObj>());
@@ -1775,7 +1799,11 @@ TEST_F(StorageInterfaceImplTest, UpdateSingletonUpdatesDocumentWhenCollectionIsN
auto doc1 = BSON("_id" << 0 << "x" << 0);
ASSERT_OK(
storage.insertDocument(opCtx, nss, {doc1, Timestamp::min()}, OpTime::kUninitializedTerm));
- auto update = BSON("$set" << BSON("x" << 1));
+
+ TimestampedBSONObj update;
+ update.obj = BSON("$set" << BSON("x" << 1));
+ update.timestamp = Timestamp();
+
ASSERT_OK(storage.updateSingleton(opCtx, nss, BSON("_id" << 0), update));
ASSERT_BSONOBJ_EQ(BSON("_id" << 0 << "x" << 1),
unittest::assertGet(storage.findSingleton(opCtx, nss)));
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 2f6fea75cdd..a3c9735220d 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -218,14 +218,14 @@ public:
Status putSingleton(OperationContext* opCtx,
const NamespaceString& nss,
- const BSONObj& update) override {
+ const TimestampedBSONObj& update) override {
return Status{ErrorCodes::IllegalOperation, "putSingleton not implemented."};
}
Status updateSingleton(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& query,
- const BSONObj& update) override {
+ const TimestampedBSONObj& update) override {
return Status{ErrorCodes::IllegalOperation, "updateSingleton not implemented."};
}