path: root/src
diff options
authorDaniel Gottlieb <>2017-09-28 09:43:46 -0400
committerDaniel Gottlieb <>2017-09-28 11:40:17 -0400
commit3e7520fdf1c8d24481fe35c728d6563ada824f35 (patch)
tree3078dc43004a38fa4ec87ac9e37c269ce9742980 /src
parentb0b99866781302ba8b16de033ff2681f20483c14 (diff)
SERVER-31087: Adorn secondary updates with timestamps.
Diffstat (limited to 'src')
2 files changed, 164 insertions, 36 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 6fa9c678fbd..a7d2dfddc20 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1000,6 +1000,33 @@ Status applyOperation_inlock(OperationContext* opCtx,
str::stream() << "applyOps not supported on view: " << requestNss.ns(),
collection || !db->getViewCatalog()->lookup(opCtx, requestNss.ns()));
+ // This code must decide what timestamp the storage engine should make the upcoming writes
+ // visible with. The requirements and use-cases:
+ //
+ // Requirement: A client calling the `applyOps` command must not be able to dictate timestamps
+ // that violate oplog ordering. Disallow this regardless of whether the timestamps chosen
+ // are otherwise legal.
+ //
+ // Use cases: Secondary oplog application: Use the timestamp in the operation document. These
+ // operations are replicated to the oplog and this is not nested in a parent
+ // `WriteUnitOfWork`.
+ //
+ // Non-atomic `applyOps`: The server receives an `applyOps` command with a series of
+ // operations that cannot be run under a single transaction. The common exemption from
+ // being "transactionable" is containing a command operation. These will not be under a
+ // parent `WriteUnitOfWork`. The timestamps on the operations will only be used if the
+ // writes are not being replicated, as that would violate the guiding requirement. This is
+ // primarily allowed for unit testing. See `dbtest/storage_timestamp_tests.cpp`.
+ //
+ // Atomic `applyOps`: The server receives an `applyOps` command with operations that can be
+ // run under a single transaction. In this case the caller has already opened a
+ // `WriteUnitOfWork` and expects all writes to become visible at the same time. The caller
+ // is responsible for setting the timestamp before committing. Assigning a competing
+ // timestamp in this codepath would break that atomicity. Sharding is a consumer of this
+ // use-case.
+ const bool assignOperationTimestamp =
+ !opCtx->writesAreReplicated() && !haveWrappingWriteUnitOfWork && fieldTs;
if (*opType == 'i') {
if (requestNss.isSystemDotIndexes()) {
BSONObj indexSpec;
@@ -1108,7 +1135,7 @@ Status applyOperation_inlock(OperationContext* opCtx,
// Do not use supplied timestamps if running through applyOps, as that would allow
// a user to dictate what timestamps appear in the oplog.
- if (!opCtx->writesAreReplicated()) {
+ if (assignOperationTimestamp) {
if (fieldTs.ok()) {
timestamp = SnapshotName(fieldTs.timestamp());
@@ -1176,46 +1203,66 @@ Status applyOperation_inlock(OperationContext* opCtx,
UpdateLifecycleImpl updateLifecycle(requestNss);
- UpdateResult ur = update(opCtx, db, request);
+ SnapshotName timestamp;
+ if (assignOperationTimestamp) {
+ timestamp = SnapshotName(fieldTs.timestamp());
+ }
- if (ur.numMatched == 0 && ur.upserted.isEmpty()) {
- if (ur.modifiers) {
- if (updateCriteria.nFields() == 1) {
- // was a simple { _id : ... } update criteria
- string msg = str::stream() << "failed to apply update: " << redact(op);
- error() << msg;
- return Status(ErrorCodes::UpdateOperationFailed, msg);
- }
- // Need to check to see if it isn't present so we can exit early with a
- // failure. Note that adds some overhead for this extra check in some cases,
- // such as an updateCriteria
- // of the form
- // { _id:..., { x : {$size:...} }
- // thus this is not ideal.
- if (collection == NULL ||
- (indexCatalog->haveIdIndex(opCtx) &&
- Helpers::findById(opCtx, collection, updateCriteria).isNull()) ||
- // capped collections won't have an _id index
- (!indexCatalog->haveIdIndex(opCtx) &&
- Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) {
- string msg = str::stream() << "couldn't find doc: " << redact(op);
- error() << msg;
- return Status(ErrorCodes::UpdateOperationFailed, msg);
- }
+ const StringData ns = fieldNs.valueStringData();
+ auto status = writeConflictRetry(opCtx, "applyOps_update", ns, [&] {
+ WriteUnitOfWork wuow(opCtx);
+ if (timestamp != SnapshotName::min()) {
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp));
+ }
- // Otherwise, it's present; zero objects were updated because of additional
- // specifiers in the query for idempotence
- } else {
- // this could happen benignly on an oplog duplicate replay of an upsert
- // (because we are idempotent),
- // if an regular non-mod update fails the item is (presumably) missing.
- if (!upsert) {
- string msg = str::stream() << "update of non-mod failed: " << redact(op);
- error() << msg;
- return Status(ErrorCodes::UpdateOperationFailed, msg);
+ UpdateResult ur = update(opCtx, db, request);
+ if (ur.numMatched == 0 && ur.upserted.isEmpty()) {
+ if (ur.modifiers) {
+ if (updateCriteria.nFields() == 1) {
+ // was a simple { _id : ... } update criteria
+ string msg = str::stream() << "failed to apply update: " << redact(op);
+ error() << msg;
+ return Status(ErrorCodes::UpdateOperationFailed, msg);
+ }
+ // Need to check to see if it isn't present so we can exit early with a
+ // failure. Note that adds some overhead for this extra check in some cases,
+ // such as an updateCriteria of the form
+ // { _id:..., { x : {$size:...} }
+ // thus this is not ideal.
+ if (collection == NULL ||
+ (indexCatalog->haveIdIndex(opCtx) &&
+ Helpers::findById(opCtx, collection, updateCriteria).isNull()) ||
+ // capped collections won't have an _id index
+ (!indexCatalog->haveIdIndex(opCtx) &&
+ Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) {
+ string msg = str::stream() << "couldn't find doc: " << redact(op);
+ error() << msg;
+ return Status(ErrorCodes::UpdateOperationFailed, msg);
+ }
+ // Otherwise, it's present; zero objects were updated because of additional
+ // specifiers in the query for idempotence
+ } else {
+ // this could happen benignly on an oplog duplicate replay of an upsert
+ // (because we are idempotent),
+ // if an regular non-mod update fails the item is (presumably) missing.
+ if (!upsert) {
+ string msg = str::stream() << "update of non-mod failed: " << redact(op);
+ error() << msg;
+ return Status(ErrorCodes::UpdateOperationFailed, msg);
+ }
+ wuow.commit();
+ return Status::OK();
+ });
+ if (!status.isOK()) {
+ return status;
if (incrementOpsAppliedStats) {
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index e404ca16f15..ae442ec0516 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -291,12 +291,93 @@ public:
+class SecondaryUpdateTimes : public StorageTimestampTest {
+ void run() {
+ // Only run on 'wiredTiger'. No other storage engines to-date timestamp writes.
+ if (mongo::storageGlobalParams.engine != "wiredTiger") {
+ return;
+ }
+ // In order for applyOps to assign timestamps, we must be in non-replicated mode.
+ repl::UnreplicatedWritesBlock uwb(_opCtx);
+ // Create a new collection.
+ NamespaceString nss("unittests.timestampedUpdates");
+ reset(nss);
+ AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX);
+ // Insert one document that will go through a series of updates.
+ const LogicalTime insertTime = _clock->reserveTicks(1);
+ WriteUnitOfWork wunit(_opCtx);
+ insertDocument(
+ autoColl.getCollection(),
+ InsertStatement(BSON("_id" << 0), SnapshotName(insertTime.asTimestamp()), 0LL));
+ wunit.commit();
+ ASSERT_EQ(1, itCount(autoColl.getCollection()));
+ const std::vector<std::pair<BSONObj, BSONObj>> updates = {
+ {BSON("$set" << BSON("val" << 1)), BSON("_id" << 0 << "val" << 1)},
+ {BSON("$unset" << BSON("val" << 1)), BSON("_id" << 0)},
+ {BSON("$addToSet" << BSON("theSet" << 1)),
+ BSON("_id" << 0 << "theSet" << BSON_ARRAY(1))},
+ {BSON("$addToSet" << BSON("theSet" << 2)),
+ BSON("_id" << 0 << "theSet" << BSON_ARRAY(1 << 2))},
+ {BSON("$pull" << BSON("theSet" << 1)), BSON("_id" << 0 << "theSet" << BSON_ARRAY(2))},
+ {BSON("$pull" << BSON("theSet" << 2)), BSON("_id" << 0 << "theSet" << BSONArray())},
+ {BSON("$set" << BSON("theMap.val" << 1)),
+ BSON("_id" << 0 << "theSet" << BSONArray() << "theMap" << BSON("val" << 1))},
+ {BSON("$rename" << BSON("theSet"
+ << "theOtherSet")),
+ BSON("_id" << 0 << "theMap" << BSON("val" << 1) << "theOtherSet" << BSONArray())}};
+ const LogicalTime firstUpdateTime = _clock->reserveTicks(updates.size());
+ for (std::size_t idx = 0; idx < updates.size(); ++idx) {
+ BSONObjBuilder result;
+ ASSERT_OK(applyOps(
+ _opCtx,
+ nss.db().toString(),
+ BSON("applyOps" << BSON_ARRAY(BSON(
+ "ts" << firstUpdateTime.addTicks(idx).asTimestamp() << "t" << 0LL << "h"
+ << "v"
+ << 2
+ << "op"
+ << "u"
+ << "ns"
+ << nss.ns()
+ << "ui"
+ << autoColl.getCollection()->uuid().get()
+ << "o2"
+ << BSON("_id" << 0)
+ << "o"
+ << updates[idx].first))),
+ &result));
+ }
+ for (std::size_t idx = 0; idx < updates.size(); ++idx) {
+ // Querying at each successive ticks after `insertTime` sees the document transform in
+ // the series.
+ auto recoveryUnit = _opCtx->recoveryUnit();
+ recoveryUnit->abandonSnapshot();
+ ASSERT_OK(recoveryUnit->selectSnapshot(
+ SnapshotName(insertTime.addTicks(idx + 1).asTimestamp())));
+ auto doc = findOne(autoColl.getCollection());
+ ASSERT_EQ(0,, updates[idx].second))
+ << "Doc: " << doc.toString() << " Expected: " << updates[idx].second.toString();
+ }
+ }
class AllStorageTimestampTests : public unittest::Suite {
AllStorageTimestampTests() : unittest::Suite("StorageTimestampTests") {}
void setupTests() {
+ add<SecondaryUpdateTimes>();