diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2017-09-28 09:43:46 -0400 |
---|---|---|
committer | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2017-09-28 11:40:17 -0400 |
commit | 3e7520fdf1c8d24481fe35c728d6563ada824f35 (patch) | |
tree | 3078dc43004a38fa4ec87ac9e37c269ce9742980 /src | |
parent | b0b99866781302ba8b16de033ff2681f20483c14 (diff) | |
download | mongo-3e7520fdf1c8d24481fe35c728d6563ada824f35.tar.gz |
SERVER-31087: Adorn secondary updates with timestamps.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 119 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 81 |
2 files changed, 164 insertions, 36 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 6fa9c678fbd..a7d2dfddc20 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1000,6 +1000,33 @@ Status applyOperation_inlock(OperationContext* opCtx, str::stream() << "applyOps not supported on view: " << requestNss.ns(), collection || !db->getViewCatalog()->lookup(opCtx, requestNss.ns())); + // This code must decide what timestamp the storage engine should make the upcoming writes + // visible with. The requirements and use-cases: + // + // Requirement: A client calling the `applyOps` command must not be able to dictate timestamps + // that violate oplog ordering. Disallow this regardless of whether the timestamps chosen + // are otherwise legal. + // + // Use cases: Secondary oplog application: Use the timestamp in the operation document. These + // operations are replicated to the oplog and this is not nested in a parent + // `WriteUnitOfWork`. + // + // Non-atomic `applyOps`: The server receives an `applyOps` command with a series of + // operations that cannot be run under a single transaction. The common exemption from + // being "transactionable" is containing a command operation. These will not be under a + // parent `WriteUnitOfWork`. The timestamps on the operations will only be used if the + // writes are not being replicated, as that would violate the guiding requirement. This is + // primarily allowed for unit testing. See `dbtest/storage_timestamp_tests.cpp`. + // + // Atomic `applyOps`: The server receives an `applyOps` command with operations that can be + // run under a single transaction. In this case the caller has already opened a + // `WriteUnitOfWork` and expects all writes to become visible at the same time. The caller + // is responsible for setting the timestamp before committing. Assigning a competing + // timestamp in this codepath would break that atomicity. Sharding is a consumer of this + // use-case. + const bool assignOperationTimestamp = + !opCtx->writesAreReplicated() && !haveWrappingWriteUnitOfWork && fieldTs; + if (*opType == 'i') { if (requestNss.isSystemDotIndexes()) { BSONObj indexSpec; @@ -1108,7 +1135,7 @@ Status applyOperation_inlock(OperationContext* opCtx, // Do not use supplied timestamps if running through applyOps, as that would allow // a user to dictate what timestamps appear in the oplog. - if (!opCtx->writesAreReplicated()) { + if (assignOperationTimestamp) { if (fieldTs.ok()) { timestamp = SnapshotName(fieldTs.timestamp()); } @@ -1176,46 +1203,66 @@ Status applyOperation_inlock(OperationContext* opCtx, UpdateLifecycleImpl updateLifecycle(requestNss); request.setLifecycle(&updateLifecycle); - UpdateResult ur = update(opCtx, db, request); + SnapshotName timestamp; + if (assignOperationTimestamp) { + timestamp = SnapshotName(fieldTs.timestamp()); + } - if (ur.numMatched == 0 && ur.upserted.isEmpty()) { - if (ur.modifiers) { - if (updateCriteria.nFields() == 1) { - // was a simple { _id : ... } update criteria - string msg = str::stream() << "failed to apply update: " << redact(op); - error() << msg; - return Status(ErrorCodes::UpdateOperationFailed, msg); - } - // Need to check to see if it isn't present so we can exit early with a - // failure. Note that adds some overhead for this extra check in some cases, - // such as an updateCriteria - // of the form - // { _id:..., { x : {$size:...} } - // thus this is not ideal. - if (collection == NULL || - (indexCatalog->haveIdIndex(opCtx) && - Helpers::findById(opCtx, collection, updateCriteria).isNull()) || - // capped collections won't have an _id index - (!indexCatalog->haveIdIndex(opCtx) && - Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) { - string msg = str::stream() << "couldn't find doc: " << redact(op); - error() << msg; - return Status(ErrorCodes::UpdateOperationFailed, msg); - } + const StringData ns = fieldNs.valueStringData(); + auto status = writeConflictRetry(opCtx, "applyOps_update", ns, [&] { + WriteUnitOfWork wuow(opCtx); + if (timestamp != SnapshotName::min()) { + uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp)); + } - // Otherwise, it's present; zero objects were updated because of additional - // specifiers in the query for idempotence - } else { - // this could happen benignly on an oplog duplicate replay of an upsert - // (because we are idempotent), - // if an regular non-mod update fails the item is (presumably) missing. - if (!upsert) { - string msg = str::stream() << "update of non-mod failed: " << redact(op); - error() << msg; - return Status(ErrorCodes::UpdateOperationFailed, msg); + UpdateResult ur = update(opCtx, db, request); + + if (ur.numMatched == 0 && ur.upserted.isEmpty()) { + if (ur.modifiers) { + if (updateCriteria.nFields() == 1) { + // was a simple { _id : ... } update criteria + string msg = str::stream() << "failed to apply update: " << redact(op); + error() << msg; + return Status(ErrorCodes::UpdateOperationFailed, msg); + } + // Need to check to see if it isn't present so we can exit early with a + // failure. Note that adds some overhead for this extra check in some cases, + // such as an updateCriteria of the form + // { _id:..., { x : {$size:...} } + // thus this is not ideal. + if (collection == NULL || + (indexCatalog->haveIdIndex(opCtx) && + Helpers::findById(opCtx, collection, updateCriteria).isNull()) || + // capped collections won't have an _id index + (!indexCatalog->haveIdIndex(opCtx) && + Helpers::findOne(opCtx, collection, updateCriteria, false).isNull())) { + string msg = str::stream() << "couldn't find doc: " << redact(op); + error() << msg; + return Status(ErrorCodes::UpdateOperationFailed, msg); + } + + // Otherwise, it's present; zero objects were updated because of additional + // specifiers in the query for idempotence + } else { + // this could happen benignly on an oplog duplicate replay of an upsert + // (because we are idempotent), + // if an regular non-mod update fails the item is (presumably) missing. + if (!upsert) { + string msg = str::stream() << "update of non-mod failed: " << redact(op); + error() << msg; + return Status(ErrorCodes::UpdateOperationFailed, msg); + } } } + + wuow.commit(); + return Status::OK(); + }); + + if (!status.isOK()) { + return status; } + if (incrementOpsAppliedStats) { incrementOpsAppliedStats(); } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index e404ca16f15..ae442ec0516 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -291,12 +291,93 @@ public: } }; +class SecondaryUpdateTimes : public StorageTimestampTest { +public: + void run() { + // Only run on 'wiredTiger'. No other storage engines to-date timestamp writes. + if (mongo::storageGlobalParams.engine != "wiredTiger") { + return; + } + + // In order for applyOps to assign timestamps, we must be in non-replicated mode. + repl::UnreplicatedWritesBlock uwb(_opCtx); + + // Create a new collection. + NamespaceString nss("unittests.timestampedUpdates"); + reset(nss); + + AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); + + // Insert one document that will go through a series of updates. + const LogicalTime insertTime = _clock->reserveTicks(1); + WriteUnitOfWork wunit(_opCtx); + insertDocument( + autoColl.getCollection(), + InsertStatement(BSON("_id" << 0), SnapshotName(insertTime.asTimestamp()), 0LL)); + wunit.commit(); + ASSERT_EQ(1, itCount(autoColl.getCollection())); + + const std::vector<std::pair<BSONObj, BSONObj>> updates = { + {BSON("$set" << BSON("val" << 1)), BSON("_id" << 0 << "val" << 1)}, + {BSON("$unset" << BSON("val" << 1)), BSON("_id" << 0)}, + {BSON("$addToSet" << BSON("theSet" << 1)), + BSON("_id" << 0 << "theSet" << BSON_ARRAY(1))}, + {BSON("$addToSet" << BSON("theSet" << 2)), + BSON("_id" << 0 << "theSet" << BSON_ARRAY(1 << 2))}, + {BSON("$pull" << BSON("theSet" << 1)), BSON("_id" << 0 << "theSet" << BSON_ARRAY(2))}, + {BSON("$pull" << BSON("theSet" << 2)), BSON("_id" << 0 << "theSet" << BSONArray())}, + {BSON("$set" << BSON("theMap.val" << 1)), + BSON("_id" << 0 << "theSet" << BSONArray() << "theMap" << BSON("val" << 1))}, + {BSON("$rename" << BSON("theSet" + << "theOtherSet")), + BSON("_id" << 0 << "theMap" << BSON("val" << 1) << "theOtherSet" << BSONArray())}}; + + const LogicalTime firstUpdateTime = _clock->reserveTicks(updates.size()); + for (std::size_t idx = 0; idx < updates.size(); ++idx) { + BSONObjBuilder result; + ASSERT_OK(applyOps( + _opCtx, + nss.db().toString(), + BSON("applyOps" << BSON_ARRAY(BSON( + "ts" << firstUpdateTime.addTicks(idx).asTimestamp() << "t" << 0LL << "h" + << 0xBEEFBEEFLL + << "v" + << 2 + << "op" + << "u" + << "ns" + << nss.ns() + << "ui" + << autoColl.getCollection()->uuid().get() + << "o2" + << BSON("_id" << 0) + << "o" + << updates[idx].first))), + &result)); + } + + for (std::size_t idx = 0; idx < updates.size(); ++idx) { + // Querying at each successive ticks after `insertTime` sees the document transform in + // the series. + auto recoveryUnit = _opCtx->recoveryUnit(); + recoveryUnit->abandonSnapshot(); + ASSERT_OK(recoveryUnit->selectSnapshot( + SnapshotName(insertTime.addTicks(idx + 1).asTimestamp()))); + + auto doc = findOne(autoColl.getCollection()); + ASSERT_EQ(0, SimpleBSONObjComparator::kInstance.compare(doc, updates[idx].second)) + << "Doc: " << doc.toString() << " Expected: " << updates[idx].second.toString(); + } + } +}; + class AllStorageTimestampTests : public unittest::Suite { public: AllStorageTimestampTests() : unittest::Suite("StorageTimestampTests") {} void setupTests() { add<SecondaryInsertTimes>(); add<SecondaryArrayInsertTimes>(); + add<SecondaryUpdateTimes>(); } }; |