diff options
author | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2017-09-28 09:54:06 -0400 |
---|---|---|
committer | Daniel Gottlieb <daniel.gottlieb@mongodb.com> | 2017-09-28 11:40:18 -0400 |
commit | 3fa551ff7a93a2849a52146aff634ba215e75925 (patch) | |
tree | e9b6ca95b6ccd1ed542dd6b92d949a599adf8fd0 /src | |
parent | 2b0c3108aeda392e59f356542f6ea98c4a83670f (diff) | |
download | mongo-3fa551ff7a93a2849a52146aff634ba215e75925.tar.gz |
SERVER-31260: Adorn secondary upserts with timestamps.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 74 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 21 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 312 |
4 files changed, 326 insertions, 90 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 311d5df1ef6..70ed753ed14 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1020,12 +1020,14 @@ Status applyOperation_inlock(OperationContext* opCtx, // // Atomic `applyOps`: The server receives an `applyOps` command with operations that can be // run under a single transaction. In this case the caller has already opened a - // `WriteUnitOfWork` and expects all writes to become visible at the same time. The caller - // is responsible for setting the timestamp before committing. Assigning a competing - // timestamp in this codepath would break that atomicity. Sharding is a consumer of this - // use-case. - const bool assignOperationTimestamp = - !opCtx->writesAreReplicated() && !haveWrappingWriteUnitOfWork && fieldTs; + // `WriteUnitOfWork` and expects all writes to become visible at the same time. Moreover, + // the individual operations will not contain a `ts` field. The caller is responsible for + // setting the timestamp before committing. Assigning a competing timestamp in this + // codepath would break that atomicity. Sharding is a consumer of this use-case. + const bool assignOperationTimestamp = !opCtx->writesAreReplicated() && + !haveWrappingWriteUnitOfWork && fieldTs && + getGlobalReplicationCoordinator()->getReplicationMode() != + ReplicationCoordinator::modeMasterSlave; if (*opType == 'i') { if (requestNss.isSystemDotIndexes()) { @@ -1123,14 +1125,23 @@ Status applyOperation_inlock(OperationContext* opCtx, // 3. If not, do upsert (and commit) // 4. If both !Ok, return status - // We cannot rely on a DuplicateKey error if we'repart of a larger transaction, because - // that would require the transaction to abort. So instead, use upsert in that case. + // We cannot rely on a DuplicateKey error if we're part of a larger transaction, + // because that would require the transaction to abort. So instead, use upsert in that + // case. bool needToDoUpsert = haveWrappingWriteUnitOfWork; - if (!needToDoUpsert) { - SnapshotName timestamp; - long long term = OpTime::kUninitializedTerm; + SnapshotName timestamp; + long long term = OpTime::kUninitializedTerm; + if (assignOperationTimestamp) { + if (fieldTs) { + timestamp = SnapshotName(fieldTs.timestamp()); + } + if (fieldT) { + term = fieldT.Long(); + } + } + if (!needToDoUpsert) { WriteUnitOfWork wuow(opCtx); // Do not use supplied timestamps if running through applyOps, as that would allow @@ -1145,8 +1156,9 @@ Status applyOperation_inlock(OperationContext* opCtx, } OpDebug* const nullOpDebug = nullptr; - auto status = collection->insertDocument( + Status status = collection->insertDocument( opCtx, InsertStatement(o, timestamp, term), nullOpDebug, true); + if (status.isOK()) { wuow.commit(); } else if (status == ErrorCodes::DuplicateKey) { @@ -1155,6 +1167,7 @@ Status applyOperation_inlock(OperationContext* opCtx, return status; } } + // Now see if we need to do an upsert. if (needToDoUpsert) { // Do update on DuplicateKey errors. @@ -1164,20 +1177,31 @@ Status applyOperation_inlock(OperationContext* opCtx, b.append(o.getField("_id")); UpdateRequest request(requestNss); - request.setQuery(b.done()); request.setUpdates(o); request.setUpsert(); request.setFromOplogApplication(true); + UpdateLifecycleImpl updateLifecycle(requestNss); request.setLifecycle(&updateLifecycle); - UpdateResult res = update(opCtx, db, request); - if (res.numMatched == 0 && res.upserted.isEmpty()) { - error() << "No document was updated even though we got a DuplicateKey " - "error when inserting"; - fassertFailedNoTrace(28750); - } + const StringData ns = fieldNs.valueStringData(); + writeConflictRetry(opCtx, "applyOps_upsert", ns, [&] { + WriteUnitOfWork wuow(opCtx); + // If this is an atomic applyOps (i.e: `haveWrappingWriteUnitOfWork` is true), + // do not timestamp the write. + if (assignOperationTimestamp && timestamp != SnapshotName::min()) { + uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(timestamp)); + } + + UpdateResult res = update(opCtx, db, request); + if (res.numMatched == 0 && res.upserted.isEmpty()) { + error() << "No document was updated even though we got a DuplicateKey " + "error when inserting"; + fassertFailedNoTrace(28750); + } + wuow.commit(); + }); } if (incrementOpsAppliedStats) { @@ -1195,11 +1219,11 @@ Status applyOperation_inlock(OperationContext* opCtx, updateCriteria.hasField("_id")); UpdateRequest request(requestNss); - request.setQuery(updateCriteria); request.setUpdates(o); request.setUpsert(upsert); request.setFromOplogApplication(true); + UpdateLifecycleImpl updateLifecycle(requestNss); request.setLifecycle(&updateLifecycle); @@ -1216,7 +1240,6 @@ Status applyOperation_inlock(OperationContext* opCtx, } UpdateResult ur = update(opCtx, db, request); - if (ur.numMatched == 0 && ur.upserted.isEmpty()) { if (ur.modifiers) { if (updateCriteria.nFields() == 1) { @@ -1225,6 +1248,7 @@ Status applyOperation_inlock(OperationContext* opCtx, error() << msg; return Status(ErrorCodes::UpdateOperationFailed, msg); } + // Need to check to see if it isn't present so we can exit early with a // failure. Note that adds some overhead for this extra check in some cases, // such as an updateCriteria of the form @@ -1277,6 +1301,7 @@ Status applyOperation_inlock(OperationContext* opCtx, if (assignOperationTimestamp) { timestamp = SnapshotName(fieldTs.timestamp()); } + const StringData ns = fieldNs.valueStringData(); writeConflictRetry(opCtx, "applyOps_delete", ns, [&] { WriteUnitOfWork wuow(opCtx); @@ -1288,11 +1313,12 @@ Status applyOperation_inlock(OperationContext* opCtx, deleteObjects(opCtx, collection, requestNss, o, /*justOne*/ valueB); } else verify(opType[1] == 'b'); // "db" advertisement - if (incrementOpsAppliedStats) { - incrementOpsAppliedStats(); - } wuow.commit(); }); + + if (incrementOpsAppliedStats) { + incrementOpsAppliedStats(); + } } else if (*opType == 'n') { // no op if (incrementOpsAppliedStats) { diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 3d14db9c24a..8cfd9b902d0 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -327,14 +327,11 @@ Status SyncTail::syncApply(OperationContext* opCtx, bool isNoOp = opType[0] == 'n'; if (isNoOp || (opType[0] == 'i' && nss.isSystemDotIndexes())) { - auto opStr = isNoOp ? "syncApply_noop" : "syncApply_indexBuild"; if (isNoOp && nss.db() == "") return Status::OK(); - return writeConflictRetry(opCtx, opStr, nss.ns(), [&] { - Lock::DBLock dbLock(opCtx, nss.db(), MODE_X); - OldClientContext ctx(opCtx, nss.ns()); - return applyOp(ctx.db()); - }); + Lock::DBLock dbLock(opCtx, nss.db(), MODE_X); + OldClientContext ctx(opCtx, nss.ns()); + return applyOp(ctx.db()); } if (isCrudOpType(opType)) { diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 22c9c66ecc4..343fe15b1cb 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -211,27 +211,6 @@ TEST_F(SyncTailTest, SyncApplyNoOp) { ASSERT_TRUE(applyOpCalled); } -TEST_F(SyncTailTest, SyncApplyNoOpApplyOpThrowsException) { - const BSONObj op = BSON("op" - << "n" - << "ns" - << "test.t"); - int applyOpCalled = 0; - SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* opCtx, - Database* db, - const BSONObj& theOperation, - bool inSteadyStateReplication, - stdx::function<void()>) { - applyOpCalled++; - if (applyOpCalled < 5) { - throw WriteConflictException(); - } - return Status::OK(); - }; - ASSERT_OK(SyncTail::syncApply(_opCtx.get(), op, false, applyOp, failedApplyCommand, _incOps)); - ASSERT_EQUALS(5, applyOpCalled); -} - TEST_F(SyncTailTest, SyncApplyInsertDocumentDatabaseMissing) { ASSERT_THROWS_CODE(_testSyncApplyInsertDocument(ErrorCodes::OK), AssertionException, diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index a3153c00b8e..9f234730e44 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -52,6 +52,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/storage/kv/kv_storage_engine.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/stacktrace.h" namespace mongo { @@ -144,9 +145,45 @@ public: BSONObj findOne(Collection* coll) { auto optRecord = coll->getRecordStore()->getCursor(_opCtx)->next(); - ASSERT(optRecord != boost::none); + if (optRecord == boost::none) { + // Print a stack trace to help disambiguate which `findOne` failed. + printStackTrace(); + FAIL("Did not find any documents."); + } return optRecord.get().data.toBson(); } + + StatusWith<BSONObj> doAtomicApplyOps(const std::string& dbName, + const std::list<BSONObj>& applyOpsList) { + BSONObjBuilder result; + Status status = applyOps(_opCtx, dbName, BSON("applyOps" << applyOpsList), &result); + if (!status.isOK()) { + return status; + } + + return {result.obj()}; + } + + // Creates a dummy command operation to persuade `applyOps` to be non-atomic. + StatusWith<BSONObj> doNonAtomicApplyOps(const std::string& dbName, + const std::list<BSONObj>& applyOpsList, + Timestamp dummyTs) { + BSONArrayBuilder builder; + builder.append(applyOpsList); + builder << BSON("ts" << dummyTs << "t" << 1LL << "h" << 1 << "op" + << "c" + << "ns" + << "test.$cmd" + << "o" + << BSON("applyOps" << BSONArrayBuilder().obj())); + BSONObjBuilder result; + Status status = applyOps(_opCtx, dbName, BSON("applyOps" << builder.arr()), &result); + if (!status.isOK()) { + return status; + } + + return {result.obj()}; + } }; class SecondaryInsertTimes : public StorageTimestampTest { @@ -326,24 +363,23 @@ public: // Delete all documents one at a time. const LogicalTime startDeleteTime = _clock->reserveTicks(docsToInsert); for (std::int32_t num = 0; num < docsToInsert; ++num) { - BSONObjBuilder result; - ASSERT_OK(applyOps( - _opCtx, - nss.db().toString(), - BSON("applyOps" << BSON_ARRAY(BSON( - "ts" << startDeleteTime.addTicks(num).asTimestamp() << "t" << 0LL << "h" - << 0xBEEFBEEFLL - << "v" - << 2 - << "op" - << "d" - << "ns" - << nss.ns() - << "ui" - << autoColl.getCollection()->uuid().get() - << "o" - << BSON("_id" << num)))), - &result)); + ASSERT_OK( + doNonAtomicApplyOps( + nss.db().toString(), + {BSON("ts" << startDeleteTime.addTicks(num).asTimestamp() << "t" << 0LL << "h" + << 0xBEEFBEEFLL + << "v" + << 2 + << "op" + << "d" + << "ns" + << nss.ns() + << "ui" + << autoColl.getCollection()->uuid().get() + << "o" + << BSON("_id" << num))}, + startDeleteTime.addTicks(num).asTimestamp()) + .getStatus()); } for (std::int32_t num = 0; num <= docsToInsert; ++num) { @@ -384,6 +420,9 @@ public: wunit.commit(); ASSERT_EQ(1, itCount(autoColl.getCollection())); + // Each pair in the vector represents the update to perform at the next tick of the + // clock. `pair.first` is the update to perform and `pair.second` is the full value of the + // document after the transformation. const std::vector<std::pair<BSONObj, BSONObj>> updates = { {BSON("$set" << BSON("val" << 1)), BSON("_id" << 0 << "val" << 1)}, {BSON("$unset" << BSON("val" << 1)), BSON("_id" << 0)}, @@ -401,26 +440,25 @@ public: const LogicalTime firstUpdateTime = _clock->reserveTicks(updates.size()); for (std::size_t idx = 0; idx < updates.size(); ++idx) { - BSONObjBuilder result; - ASSERT_OK(applyOps( - _opCtx, - nss.db().toString(), - BSON("applyOps" << BSON_ARRAY(BSON( - "ts" << firstUpdateTime.addTicks(idx).asTimestamp() << "t" << 0LL << "h" - << 0xBEEFBEEFLL - << "v" - << 2 - << "op" - << "u" - << "ns" - << nss.ns() - << "ui" - << autoColl.getCollection()->uuid().get() - << "o2" - << BSON("_id" << 0) - << "o" - << updates[idx].first))), - &result)); + ASSERT_OK( + doNonAtomicApplyOps( + nss.db().toString(), + {BSON("ts" << firstUpdateTime.addTicks(idx).asTimestamp() << "t" << 0LL << "h" + << 0xBEEFBEEFLL + << "v" + << 2 + << "op" + << "u" + << "ns" + << nss.ns() + << "ui" + << autoColl.getCollection()->uuid().get() + << "o2" + << BSON("_id" << 0) + << "o" + << updates[idx].first)}, + firstUpdateTime.addTicks(idx).asTimestamp()) + .getStatus()); } for (std::size_t idx = 0; idx < updates.size(); ++idx) { @@ -438,6 +476,199 @@ public: } }; +class SecondaryInsertToUpsert : public StorageTimestampTest { +public: + void run() { + // Only run on 'wiredTiger'. No other storage engines to-date timestamp writes. + if (mongo::storageGlobalParams.engine != "wiredTiger") { + return; + } + + // In order for applyOps to assign timestamps, we must be in non-replicated mode. + repl::UnreplicatedWritesBlock uwb(_opCtx); + + // Create a new collection. + NamespaceString nss("unittests.insertToUpsert"); + reset(nss); + + AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); + + const LogicalTime insertTime = _clock->reserveTicks(2); + + // This applyOps runs into an insert of `{_id: 0, field: 0}` followed by a second insert + // on the same collection with `{_id: 0}`. It's expected for this second insert to be + // turned into an upsert. The goal document does not contain `field: 0`. + BSONObjBuilder resultBuilder; + auto swResult = doNonAtomicApplyOps( + nss.db().toString(), + {BSON("ts" << insertTime.asTimestamp() << "t" << 1LL << "h" << 0xBEEFBEEFLL << "v" << 2 + << "op" + << "i" + << "ns" + << nss.ns() + << "ui" + << autoColl.getCollection()->uuid().get() + << "o" + << BSON("_id" << 0 << "field" << 0)), + BSON("ts" << insertTime.addTicks(1).asTimestamp() << "t" << 1LL << "h" << 0xBEEFBEEFLL + << "v" + << 2 + << "op" + << "i" + << "ns" + << nss.ns() + << "ui" + << autoColl.getCollection()->uuid().get() + << "o" + << BSON("_id" << 0))}, + insertTime.addTicks(1).asTimestamp()); + ASSERT_OK(swResult); + + BSONObj& result = swResult.getValue(); + ASSERT_EQ(3, result.getIntField("applied")); + ASSERT(result["results"].Array()[0].Bool()); + ASSERT(result["results"].Array()[1].Bool()); + ASSERT(result["results"].Array()[2].Bool()); + + // Reading at `insertTime` should show the original document, `{_id: 0, field: 0}`. + auto recoveryUnit = _opCtx->recoveryUnit(); + recoveryUnit->abandonSnapshot(); + ASSERT_OK(recoveryUnit->selectSnapshot(SnapshotName(insertTime.asTimestamp()))); + auto doc = findOne(autoColl.getCollection()); + ASSERT_EQ(0, + SimpleBSONObjComparator::kInstance.compare(doc, BSON("_id" << 0 << "field" << 0))) + << "Doc: " << doc.toString() << " Expected: {_id: 0, field: 0}"; + + // Reading at `insertTime + 1` should show the second insert that got converted to an + // upsert, `{_id: 0}`. + recoveryUnit->abandonSnapshot(); + ASSERT_OK(recoveryUnit->selectSnapshot(SnapshotName(insertTime.addTicks(1).asTimestamp()))); + doc = findOne(autoColl.getCollection()); + ASSERT_EQ(0, SimpleBSONObjComparator::kInstance.compare(doc, BSON("_id" << 0))) + << "Doc: " << doc.toString() << " Expected: {_id: 0}"; + } +}; + +class SecondaryAtomicApplyOps : public StorageTimestampTest { +public: + void run() { + // Only run on 'wiredTiger'. No other storage engines to-date timestamp writes. + if (mongo::storageGlobalParams.engine != "wiredTiger") { + return; + } + + // Create a new collection. + NamespaceString nss("unittests.insertToUpsert"); + reset(nss); + + AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); + + // Reserve a timestamp before the inserts should happen. + const LogicalTime preInsertTimestamp = _clock->reserveTicks(1); + auto swResult = doAtomicApplyOps(nss.db().toString(), + {BSON("v" << 2 << "op" + << "i" + << "ns" + << nss.ns() + << "ui" + << autoColl.getCollection()->uuid().get() + << "o" + << BSON("_id" << 0)), + BSON("v" << 2 << "op" + << "i" + << "ns" + << nss.ns() + << "ui" + << autoColl.getCollection()->uuid().get() + << "o" + << BSON("_id" << 1))}); + ASSERT_OK(swResult); + + ASSERT_EQ(2, swResult.getValue().getIntField("applied")); + ASSERT(swResult.getValue()["results"].Array()[0].Bool()); + ASSERT(swResult.getValue()["results"].Array()[1].Bool()); + + // Reading at `preInsertTimestamp` should not find anything. + auto recoveryUnit = _opCtx->recoveryUnit(); + recoveryUnit->abandonSnapshot(); + ASSERT_OK(recoveryUnit->selectSnapshot(SnapshotName(preInsertTimestamp.asTimestamp()))); + ASSERT_EQ(0, itCount(autoColl.getCollection())) + << "Should not observe a write at `preInsertTimestamp`. TS: " + << preInsertTimestamp.asTimestamp(); + + // Reading at `preInsertTimestamp + 1` should observe both inserts. + recoveryUnit = _opCtx->recoveryUnit(); + recoveryUnit->abandonSnapshot(); + ASSERT_OK(recoveryUnit->selectSnapshot( + SnapshotName(preInsertTimestamp.addTicks(1).asTimestamp()))); + ASSERT_EQ(2, itCount(autoColl.getCollection())) + << "Should observe both writes at `preInsertTimestamp + 1`. TS: " + << preInsertTimestamp.addTicks(1).asTimestamp(); + } +}; + + +// This should have the same result as `SecondaryInsertToUpsert` except it gets there a different +// way. Doing an atomic `applyOps` should result in a WriteConflictException because the same +// transaction is trying to write modify the same document twice. The `applyOps` command should +// catch that failure and retry in non-atomic mode, preserving the timestamps supplied by the +// user. +class SecondaryAtomicApplyOpsWCEToNonAtomic : public StorageTimestampTest { +public: + void run() { + // Only run on 'wiredTiger'. No other storage engines to-date timestamp writes. + if (mongo::storageGlobalParams.engine != "wiredTiger") { + return; + } + + // Create a new collectiont. + NamespaceString nss("unitteTsts.insertToUpsert"); + reset(nss); + + AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_X, LockMode::MODE_IX); + + const LogicalTime preInsertTimestamp = _clock->reserveTicks(1); + auto swResult = doAtomicApplyOps(nss.db().toString(), + {BSON("v" << 2 << "op" + << "i" + << "ns" + << nss.ns() + << "ui" + << autoColl.getCollection()->uuid().get() + << "o" + << BSON("_id" << 0 << "field" << 0)), + BSON("v" << 2 << "op" + << "i" + << "ns" + << nss.ns() + << "ui" + << autoColl.getCollection()->uuid().get() + << "o" + << BSON("_id" << 0))}); + ASSERT_OK(swResult); + + ASSERT_EQ(2, swResult.getValue().getIntField("applied")); + ASSERT(swResult.getValue()["results"].Array()[0].Bool()); + ASSERT(swResult.getValue()["results"].Array()[1].Bool()); + + // Reading at `insertTime` should not see any documents. + auto recoveryUnit = _opCtx->recoveryUnit(); + recoveryUnit->abandonSnapshot(); + ASSERT_OK(recoveryUnit->selectSnapshot(SnapshotName(preInsertTimestamp.asTimestamp()))); + ASSERT_EQ(0, itCount(autoColl.getCollection())) + << "Should not find any documents at `preInsertTimestamp`. TS: " + << preInsertTimestamp.asTimestamp(); + + // Reading at `preInsertTimestamp + 1` should show the final state of the document. + recoveryUnit->abandonSnapshot(); + ASSERT_OK(recoveryUnit->selectSnapshot( + SnapshotName(preInsertTimestamp.addTicks(1).asTimestamp()))); + auto doc = findOne(autoColl.getCollection()); + ASSERT_EQ(0, SimpleBSONObjComparator::kInstance.compare(doc, BSON("_id" << 0))) + << "Doc: " << doc.toString() << " Expected: {_id: 0}"; + } +}; + class AllStorageTimestampTests : public unittest::Suite { public: AllStorageTimestampTests() : unittest::Suite("StorageTimestampTests") {} @@ -446,6 +677,9 @@ public: add<SecondaryArrayInsertTimes>(); add<SecondaryDeleteTimes>(); add<SecondaryUpdateTimes>(); + add<SecondaryInsertToUpsert>(); + add<SecondaryAtomicApplyOps>(); + add<SecondaryAtomicApplyOpsWCEToNonAtomic>(); } }; |