author     Max Hirschhorn <max.hirschhorn@mongodb.com>    2019-02-11 15:43:27 -0500
committer  Max Hirschhorn <max.hirschhorn@mongodb.com>    2019-02-11 15:43:27 -0500
commit     9db1a8dffe753808bea0d8c47d9fc959eaea9ea0 (patch)
tree       26b5750c5088d745ab1fb93596d010501b0b4cbe
parent     691ab6da0c38f52f32c1028a8fa7447997ced255 (diff)
download   mongo-9db1a8dffe753808bea0d8c47d9fc959eaea9ea0.tar.gz
SERVER-39169 Add $_internalReadAtClusterTime option to find and dbHash.
The new $_internalReadAtClusterTime option replaces every case where the
dbHash command was previously run inside of a multi-statement transaction.
It can be used to read from a consistent snapshot in place of specifying an
atClusterTime read concern.
Unlike multi-statement transactions, the new $_internalReadAtClusterTime
option doesn't cause locks to be left on the server after returning a
network response. It instead restores the snapshot to read from as part
of handling the request.
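
As a quick illustration of the option (a minimal sketch based on the tests added
below — the database and collection names are illustrative, and the server must be
started with test commands enabled):

    // Sketch: read a collection as of a saved clusterTime, outside any transaction.
    // Assumes a replica-set connection started with enableTestCommands=1;
    // "test.mycoll" is a hypothetical collection.
    const session = db.getMongo().startSession({causalConsistency: false});
    const testDB = session.getDatabase("test");

    assert.commandWorked(testDB.mycoll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
    const clusterTime = session.getOperationTime();

    // Hash the collection's contents as of 'clusterTime'.
    const hashRes = assert.commandWorked(
        testDB.runCommand({dbHash: 1, $_internalReadAtClusterTime: clusterTime}));

    // The find command accepts the same option; getMores on the returned cursor
    // keep reading from the same snapshot.
    const findRes = assert.commandWorked(testDB.runCommand({
        find: "mycoll",
        sort: {_id: 1},
        $_internalReadAtClusterTime: clusterTime,
    }));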
-rw-r--r--  jstests/hooks/run_check_repl_dbhash_background.js               85
-rw-r--r--  jstests/replsets/dbhash_lock_acquisition.js                     95
-rw-r--r--  jstests/replsets/dbhash_read_at_cluster_time.js                120
-rw-r--r--  jstests/replsets/read_at_cluster_time_outside_transactions.js   99
-rw-r--r--  src/mongo/db/commands.cpp                                        8
-rw-r--r--  src/mongo/db/commands/dbhash.cpp                                99
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp                              66
-rw-r--r--  src/mongo/db/commands/getmore_cmd.cpp                           26
-rw-r--r--  src/mongo/db/query/query_request.cpp                            17
-rw-r--r--  src/mongo/db/query/query_request.h                               8
-rw-r--r--  src/mongo/db/transaction_validation.cpp                          1
-rw-r--r--  src/mongo/shell/replsettest.js                                  24
12 files changed, 528 insertions, 120 deletions
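
In the first file below, the background dbhash hook swaps its transaction machinery
for this option. Condensed, the retry loop it adopts has the following shape (a
sketch using the hook's own names — checkCollectionHashesForDB, dbName,
clusterTime — with SnapshotUnavailable now treated as the transient, retryable case):

    // Sketch of the hook's retry shape after the patch (names from the hook itself).
    let result;
    let hasTransientError;
    do {
        hasTransientError = false;
        try {
            result = checkCollectionHashesForDB(dbName, clusterTime);
        } catch (e) {
            // A catalog operation (e.g. a drop) at an opTime later than 'clusterTime'
            // invalidates the snapshot, surfacing as SnapshotUnavailable; retry.
            if (e.code === ErrorCodes.Interrupted ||
                e.code === ErrorCodes.SnapshotUnavailable) {
                hasTransientError = true;
                continue;
            }
            throw e;
        }
    } while (hasTransientError);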
diff --git a/jstests/hooks/run_check_repl_dbhash_background.js b/jstests/hooks/run_check_repl_dbhash_background.js
index 8e5b942a2eb..8cd338b2d32 100644
--- a/jstests/hooks/run_check_repl_dbhash_background.js
+++ b/jstests/hooks/run_check_repl_dbhash_background.js
@@ -3,15 +3,17 @@
  *
  * Unlike run_check_repl_dbhash.js, this version of the hook doesn't require that all operations
  * have finished replicating, nor does it require that the test has finished running. The dbHash
- * command is run inside of a transaction specifying atClusterTime in order for an identical
- * snapshot to be used by all members of the replica set.
+ * command reads at a particular clusterTime in order for an identical snapshot to be used by all
+ * members of the replica set.
  *
- * The find and getMore commands used to generate the collection diff run as part of the same
- * transaction as the dbHash command. This ensures the diagnostics for a dbhash mismatch aren't
- * subjected to changes from any operations in flight.
+ * The find and getMore commands used to generate the collection diff read at the same clusterTime
+ * as the dbHash command. While this ensures the diagnostics for a dbhash mismatch aren't subjected
+ * to changes from any operations in flight, it is possible for the collection or an index on the
+ * collection to be dropped due to no locks being held.
  *
- * If a transient transaction error occurs, then the dbhash check is retried until it succeeds, or
- * until it fails with a non-transient error.
+ * If a transient error occurs, then the dbhash check is retried until it succeeds, or until it
+ * fails with a non-transient error. The most common case of a transient error is attempting to read
+ * from a collection after a catalog operation has been performed on the collection or database.
  */

 'use strict';
@@ -26,7 +28,7 @@
     let debugInfo = [];

     // We turn off printing the JavaScript stacktrace in doassert() to avoid generating an
-    // overwhelming amount of log messages when handling transient transaction errors.
+    // overwhelming amount of log messages when handling transient errors.
     TestData = TestData || {};
     TestData.traceExceptions = false;

@@ -69,13 +71,8 @@
     const kForeverSeconds = 1e9;
     const dbNames = new Set();

-    // We enable the "WTPreserveSnapshotHistoryIndefinitely" failpoint and extend the
-    // "transactionLifetimeLimitSeconds" server parameter to ensure that
-    //
-    // (1) the same snapshot will be available to read at on the primary and secondaries, and
-    //
-    // (2) the potentally long-running transaction isn't killed while we are part-way through
-    // verifying data consistency.
+    // We enable the "WTPreserveSnapshotHistoryIndefinitely" failpoint to ensure that the same
+    // snapshot will be available to read at on the primary and secondaries.

     for (let session of sessions) {
         const db = session.getDatabase('admin');
@@ -96,19 +93,6 @@
                 mode: 'off',
             }));
         });
-
-        const res = assert.commandWorked(db.runCommand({
-            setParameter: 1,
-            transactionLifetimeLimitSeconds: kForeverSeconds,
-        }));
-
-        const transactionLifetimeLimitSecondsOriginal = res.was;
-        resetFns.push(() => {
-            assert.commandWorked(db.runCommand({
-                setParameter: 1,
-                transactionLifetimeLimitSeconds: transactionLifetimeLimitSecondsOriginal,
-            }));
-        });
     }

     for (let session of sessions) {
@@ -124,8 +108,8 @@
         });
     }

-    // Transactions cannot be run on the following databases. (The "local" database is also not
-    // replicated.)
+    // Transactions cannot be run on the following databases so we don't attempt to read at a
+    // clusterTime on them either. (The "local" database is also not replicated.)
     dbNames.delete('admin');
    dbNames.delete('config');
     dbNames.delete('local');
@@ -133,9 +117,8 @@
     const results = [];

     // The waitForSecondaries() function waits for all secondaries to have applied up to
-    // 'clusterTime' locally. This ensures that a later atClusterTime read inside a transaction
-    // doesn't stall as a result of a pending global X lock (e.g. from a dropDatabase command) on
-    // the primary preventing getMores on the oplog from receiving a response.
+    // 'clusterTime' locally. This ensures that a later $_internalReadAtClusterTime read doesn't
+    // fail as a result of the secondary's clusterTime being behind 'clusterTime'.
     const waitForSecondaries = (clusterTime, signedClusterTime) => {
         debugInfo.push({"waitForSecondaries": clusterTime, "signedClusterTime": signedClusterTime});
         for (let i = 1; i < sessions.length; ++i) {
@@ -198,9 +181,10 @@
     // isn't multi-versioned. Unlike with ReplSetTest#checkReplicatedDataHashes(), it is possible
     // for a collection catalog operation (e.g. a drop or rename) to have been applied on the
     // primary but not yet applied on the secondary.
-    const checkCollectionHashesForDB = (dbName) => {
+    const checkCollectionHashesForDB = (dbName, clusterTime) => {
         const result = [];
-        const hashes = rst.getHashesUsingSessions(sessions, dbName);
+        const hashes =
+            rst.getHashesUsingSessions(sessions, dbName, {readAtClusterTime: clusterTime});

         const hashesByUUID = hashes.map((response, i) => {
             const info = {};
@@ -281,9 +265,7 @@
             // ReplSetTest#getCollectionDiffUsingSessions() upon detecting a dbHash mismatch. It is
             // presumed to still useful to know that a bug exists even if we cannot get more
             // diagnostics for it.
-            if ((e.hasOwnProperty('errorLabels') &&
-                 e.errorLabels.includes('TransientTransactionError')) ||
-                e.code === ErrorCodes.Interrupted) {
+            if (e.code === ErrorCodes.Interrupted || e.code === ErrorCodes.SnapshotUnavailable) {
                 hasTransientError = true;
                 return true;
             }
@@ -310,23 +292,15 @@
             debugInfo.push({
                 "node": session.getClient(),
                 "session": session,
-                "startTransaction": clusterTime
+                "readAtClusterTime": clusterTime
             });
-            session.startTransaction(
-                {readConcern: {level: 'snapshot', atClusterTime: clusterTime}});
         }

         hasTransientError = false;

         try {
-            result = checkCollectionHashesForDB(dbName);
+            result = checkCollectionHashesForDB(dbName, clusterTime);
         } catch (e) {
-            // We abort each of the transactions started on the nodes if one of them returns an
-            // error while running the dbHash check.
-            for (let session of sessions) {
-                session.abortTransaction_forTesting();
-            }
-
             if (isTransientError(e)) {
                 debugInfo.push({"transientError": e});
                 continue;
@@ -335,21 +309,6 @@
             jsTestLog(debugInfo);
             throw e;
         }
-
-        // We then attempt to commit each of the transactions started on the nodes to confirm
-        // the data we read was actually majority-committed. If one of them returns an error,
-        // then we still try to commit the transactions started on subsequent nodes in order to
-        // clear their transaction state.
-        for (let session of sessions) {
-            try {
-                session.commitTransaction();
-            } catch (e) {
-                if (!isTransientError(e)) {
-                    jsTestLog(debugInfo);
-                    throw e;
-                }
-            }
-        }
     } while (hasTransientError);

     for (let mismatchInfo of result) {
diff --git a/jstests/replsets/dbhash_lock_acquisition.js b/jstests/replsets/dbhash_lock_acquisition.js
index f178e5ce4b5..cf26fb6caea 100644
--- a/jstests/replsets/dbhash_lock_acquisition.js
+++ b/jstests/replsets/dbhash_lock_acquisition.js
@@ -1,11 +1,14 @@
 /**
- * Tests that the dbHash command acquires IX mode locks on the global, database, and collection
- * resources when running inside a multi-statement transaction.
+ * Tests that the dbHash command acquires IS mode locks on the global, database, and collection
+ * resources when reading a timestamp using the $_internalReadAtClusterTime option.
+ *
  * @tags: [uses_transactions]
  */
 (function() {
     "use strict";

+    load("jstests/libs/parallelTester.js");  // for ScopedThread
+
     const rst = new ReplSetTest({nodes: 1});
     rst.startSet();
     rst.initiate();
@@ -16,31 +19,77 @@
     const session = primary.startSession({causalConsistency: false});
     const sessionDB = session.getDatabase(db.getName());

-    function assertTransactionAcquiresIXLocks(txnOptions) {
-        session.startTransaction(txnOptions);
+    // We insert a document so the dbHash command has a collection to process.
+    assert.commandWorked(sessionDB.mycoll.insert({}, {writeConcern: {w: "majority"}}));
+    const clusterTime = session.getOperationTime();
+
+    // We then start a transaction in order to be able have a catalog operation queue up behind it.
+    session.startTransaction();
+    assert.commandWorked(sessionDB.mycoll.insert({}));

-        assert.commandWorked(sessionDB.runCommand({dbHash: 1}));
-
-        const ops = db.currentOp({"lsid.id": session.getSessionId().id}).inprog;
-        assert.eq(1,
-                  ops.length,
-                  () => "Failed to find session in currentOp() output: " + tojson(db.currentOp()));
-        assert.eq(ops[0].locks, {Global: "w", Database: "w", Collection: "w"});
+    const ops = db.currentOp({"lsid.id": session.getSessionId().id}).inprog;
+    assert.eq(1,
+              ops.length,
+              () => "Failed to find session in currentOp() output: " + tojson(db.currentOp()));
+    assert.eq(ops[0].locks, {Global: "w", Database: "w", Collection: "w"});

-        session.abortTransaction_forTesting();
-    }
+    const threadCaptruncCmd = new ScopedThread(function(host) {
+        try {
+            const conn = new Mongo(host);
+            const db = conn.getDB("test");

-    // We insert a document so the dbHash command has a collection to process.
-    assert.commandWorked(db.mycoll.insert({}, {writeConcern: {w: "majority"}}));
-
-    assertTransactionAcquiresIXLocks({});
-    assertTransactionAcquiresIXLocks({readConcern: {level: "local"}});
-    assertTransactionAcquiresIXLocks({readConcern: {level: "snapshot"}});
-    assertTransactionAcquiresIXLocks({
-        readConcern: {
-            level: "snapshot",
-            atClusterTime: session.getOperationTime(),
+            // We use the captrunc command as a catalog operation that requires a MODE_X lock on the
+            // collection. This ensures we aren't having the dbHash command queue up behind it on a
+            // database-level lock. The collection isn't capped so it'll fail with an
+            // IllegalOperation error response.
+            assert.commandFailedWithCode(db.runCommand({captrunc: "mycoll", n: 1}),
+                                         ErrorCodes.IllegalOperation);
+            return {ok: 1};
+        } catch (e) {
+            return {ok: 0, error: e.toString(), stack: e.stack};
         }
-    });
+    }, db.getMongo().host);
+
+    threadCaptruncCmd.start();
+
+    assert.soon(() => {
+        const ops = db.currentOp({"command.captrunc": "mycoll", waitingForLock: true}).inprog;
+        return ops.length === 1;
+    }, () => "Failed to find create collection in currentOp() output: " + tojson(db.currentOp()));
+
+    const threadDBHash = new ScopedThread(function(host, clusterTime) {
+        try {
+            const conn = new Mongo(host);
+            const db = conn.getDB("test");
+            assert.commandWorked(db.runCommand({
+                dbHash: 1,
+                $_internalReadAtClusterTime: eval(clusterTime),
+            }));
+            return {ok: 1};
+        } catch (e) {
+            return {ok: 0, error: e.toString(), stack: e.stack};
+        }
+    }, db.getMongo().host, tojson(clusterTime));
+
+    threadDBHash.start();
+
+    assert.soon(() => {
+        const ops = db.currentOp({"command.dbHash": 1, waitingForLock: true}).inprog;
+        if (ops.length === 0) {
+            return false;
+        }
+        // The lock mode for the Global resource is reported as "w" rather than "r" because of the
+        // mode taken for the ReplicationStateTransitionLock when acquiring the global lock.
+        assert.eq(ops[0].locks, {Global: "w", Database: "r", Collection: "r"});
+        return true;
+    }, () => "Failed to find create collection in currentOp() output: " + tojson(db.currentOp()));
+
+    session.commitTransaction();
+    threadCaptruncCmd.join();
+    threadDBHash.join();
+
+    assert.commandWorked(threadCaptruncCmd.returnData());
+    assert.commandWorked(threadDBHash.returnData());

     session.endSession();
     rst.stopSet();
diff --git a/jstests/replsets/dbhash_read_at_cluster_time.js b/jstests/replsets/dbhash_read_at_cluster_time.js
new file mode 100644
index 00000000000..252b5004004
--- /dev/null
+++ b/jstests/replsets/dbhash_read_at_cluster_time.js
@@ -0,0 +1,120 @@
+/**
+ * Tests that "$_internalReadAtClusterTime" is supported by the "dbHash" command.
+ *
+ * @tags: [uses_transactions]
+ */
+(function() {
+    "use strict";
+
+    const rst = new ReplSetTest({nodes: 2});
+    rst.startSet();
+
+    const replSetConfig = rst.getReplSetConfig();
+    replSetConfig.members[1].priority = 0;
+    rst.initiate(replSetConfig);
+
+    const primary = rst.getPrimary();
+    const secondary = rst.getSecondary();
+
+    const session = primary.startSession({causalConsistency: false});
+    const db = session.getDatabase("test");
+    let txnNumber = 0;
+
+    // We prevent the replica set from advancing oldest_timestamp. This ensures that the snapshot
+    // associated with 'clusterTime' is retained for the duration of this test.
+    rst.nodes.forEach(conn => {
+        assert.commandWorked(conn.adminCommand({
+            configureFailPoint: "WTPreserveSnapshotHistoryIndefinitely",
+            mode: "alwaysOn",
+        }));
+    });
+
+    // We insert a document and save the md5sum associated with the opTime of that write.
+    assert.commandWorked(db.mycoll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+    const clusterTime = db.getSession().getOperationTime();
+
+    let res = assert.commandWorked(db.runCommand({
+        dbHash: 1,
+        $_internalReadAtClusterTime: clusterTime,
+    }));
+
+    const hash1 = {collections: res.collections, md5: res.md5};
+
+    // We insert another document to ensure the collection's contents have a different md5sum now.
+    // We use a w=majority write concern to ensure that the insert has also been applied on the
+    // secondary by the time we go to run the dbHash command later. This avoids a race where the
+    // replication subsystem could be applying the insert operation when the dbHash command is run
+    // on the secondary.
+    assert.commandWorked(db.mycoll.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+
+    // However, using $_internalReadAtClusterTime to read at the opTime of the first insert should
+    // return the same md5sum as it did originally.
+    res = assert.commandWorked(db.runCommand({
+        dbHash: 1,
+        $_internalReadAtClusterTime: clusterTime,
+    }));
+
+    const hash2 = {collections: res.collections, md5: res.md5};
+    assert.eq(hash1, hash2, "primary returned different dbhash after second insert");
+
+    {
+        const secondarySession = secondary.startSession({causalConsistency: false});
+        const secondaryDB = secondarySession.getDatabase("test");
+
+        // Using $_internalReadAtClusterTime to read at the opTime of the first insert should
+        // return the same md5sum on the secondary as it did on the primary.
+        res = assert.commandWorked(secondaryDB.runCommand({
+            dbHash: 1,
+            $_internalReadAtClusterTime: clusterTime,
+        }));
+
+        const secondaryHash = {collections: res.collections, md5: res.md5};
+        assert.eq(hash1, secondaryHash, "primary and secondary have different dbhash");
+    }
+
+    {
+        const otherSession = primary.startSession({causalConsistency: false});
+        const otherDB = otherSession.getDatabase("test");
+
+        // We perform another insert inside a separate transaction to cause a MODE_IX lock to be
+        // held on the collection.
+        otherSession.startTransaction();
+        assert.commandWorked(otherDB.mycoll.insert({_id: 3}));
+
+        // It should be possible to run the "dbHash" command with "$_internalReadAtClusterTime"
+        // concurrently.
+        res = assert.commandWorked(db.runCommand({
+            dbHash: 1,
+            $_internalReadAtClusterTime: clusterTime,
+        }));
+
+        const hash3 = {collections: res.collections, md5: res.md5};
+        assert.eq(hash1, hash3, "primary returned different dbhash after third insert");
+
+        // However, the "dbHash" command should block behind the transaction if
+        // "$_internalReadAtClusterTime" wasn't specified.
+        res = assert.commandFailedWithCode(db.runCommand({dbHash: 1, maxTimeMS: 1000}),
+                                           ErrorCodes.MaxTimeMSExpired);
+
+        otherSession.abortTransaction_forTesting();
+        otherSession.endSession();
+    }
+
+    {
+        const otherSession = primary.startSession({causalConsistency: false});
+        const otherDB = otherSession.getDatabase("test");
+
+        // We create another collection inside a separate session to modify the collection catalog
+        // at an opTime later than 'clusterTime'. This prevents further usage of the snapshot
+        // associated with 'clusterTime' for snapshot reads.
+        assert.commandWorked(otherDB.runCommand({create: "mycoll2"}));
+        assert.commandFailedWithCode(
+            db.runCommand({dbHash: 1, $_internalReadAtClusterTime: clusterTime}),
+            ErrorCodes.SnapshotUnavailable);
+
+        otherSession.endSession();
+    }
+
+    session.endSession();
+    rst.stopSet();
+})();
diff --git a/jstests/replsets/read_at_cluster_time_outside_transactions.js b/jstests/replsets/read_at_cluster_time_outside_transactions.js
new file mode 100644
index 00000000000..3df9e09976d
--- /dev/null
+++ b/jstests/replsets/read_at_cluster_time_outside_transactions.js
@@ -0,0 +1,99 @@
+/**
+ * Tests that the "find" and "dbHash" commands support reading at a Timestamp by using the
+ * $_internalReadAtClusterTime option.
+ */
+(function() {
+    "use strict";
+
+    const rst = new ReplSetTest({nodes: 1});
+    rst.startSet();
+    rst.initiate();
+
+    const primary = rst.getPrimary();
+    const db = primary.getDB("test");
+
+    const collName = "read_at_cluster_time_outside_transactions";
+    const collection = db[collName];
+
+    // We prevent the replica set from advancing oldest_timestamp. This ensures that the snapshot
+    // associated with 'clusterTime' is retained for the duration of this test.
+    rst.nodes.forEach(conn => {
+        assert.commandWorked(conn.adminCommand({
+            configureFailPoint: "WTPreserveSnapshotHistoryIndefinitely",
+            mode: "alwaysOn",
+        }));
+    });
+
+    // We insert 3 documents in order to have data to return for both the find and getMore commands
+    // when using a batch size of 2. We then save the md5sum associated with the opTime of the last
+    // insert.
+    assert.commandWorked(collection.insert({_id: 1, comment: "should be seen by find command"}));
+    assert.commandWorked(collection.insert({_id: 3, comment: "should be seen by find command"}));
+    assert.commandWorked(collection.insert({_id: 5, comment: "should be seen by getMore command"}));
+
+    const clusterTime = db.getSession().getOperationTime();
+
+    let res = assert.commandWorked(db.runCommand({dbHash: 1}));
+    const hashAfterOriginalInserts = {collections: res.collections, md5: res.md5};
+
+    // The documents with _id=1 and _id=3 should be returned by the find command.
+    let cursor = collection.find().sort({_id: 1}).batchSize(2);
+    assert.eq({_id: 1, comment: "should be seen by find command"}, cursor.next());
+    assert.eq({_id: 3, comment: "should be seen by find command"}, cursor.next());
+
+    // We then insert documents with _id=2 and _id=4. The document with _id=2 is positioned behind
+    // the _id index cursor and won't be returned by the getMore command. However, the document with
+    // _id=4 is positioned ahead and should end up being returned.
+    assert.commandWorked(
+        collection.insert({_id: 2, comment: "should not be seen by getMore command"}));
+    assert.commandWorked(
+        collection.insert({_id: 4, comment: "should be seen by non-snapshot getMore command"}));
+    assert.eq({_id: 4, comment: "should be seen by non-snapshot getMore command"}, cursor.next());
+    assert.eq({_id: 5, comment: "should be seen by getMore command"}, cursor.next());
+    assert(!cursor.hasNext());
+
+    // When using the $_internalReadAtClusterTime option with a clusterTime from after the
+    // original 3 documents were inserted, the document with _id=2 shouldn't be visible to the find
+    // command because it was inserted afterwards. The same applies to the document with _id=4 and
+    // the getMore command.
+    res = collection.runCommand("find", {
+        batchSize: 2,
+        sort: {_id: 1},
+        $_internalReadAtClusterTime: clusterTime,
+    });
+
+    const batchSize = 2;
+    cursor = new DBCommandCursor(db, res, batchSize);
+    assert.eq({_id: 1, comment: "should be seen by find command"}, cursor.next());
+    assert.eq({_id: 3, comment: "should be seen by find command"}, cursor.next());
+    assert.eq({_id: 5, comment: "should be seen by getMore command"}, cursor.next());
+    assert(!cursor.hasNext());
+
+    // Using the $_internalReadAtClusterTime option to read at the opTime of the last of the 3
+    // original inserts should return the same md5sum as it did originally.
+    res = assert.commandWorked(db.runCommand({
+        dbHash: 1,
+        $_internalReadAtClusterTime: clusterTime,
+    }));
+
+    const hashAtClusterTime = {collections: res.collections, md5: res.md5};
+    assert.eq(hashAtClusterTime, hashAfterOriginalInserts);
+
+    // Attempting to read at a clusterTime in the future should return an error.
+    const futureClusterTime = new Timestamp(clusterTime.getTime() + 1000, 1);
+
+    assert.commandFailedWithCode(collection.runCommand("find", {
+        batchSize: 2,
+        sort: {_id: 1},
+        $_internalReadAtClusterTime: futureClusterTime,
+    }),
+                                 ErrorCodes.InvalidOptions);
+
+    assert.commandFailedWithCode(db.runCommand({
+        dbHash: 1,
+        $_internalReadAtClusterTime: futureClusterTime,
+    }),
+                                 ErrorCodes.InvalidOptions);
+
+    rst.stopSet();
+})();
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index c4323d6a36f..507dbe78f82 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -129,10 +129,6 @@ const StringMap<int> txnCmdWhitelist = {{"abortTransaction", 1},
                                         {"voteAbortTransaction", 1},
                                         {"voteCommitTransaction", 1}};

-// The command names that are allowed in a multi-document transaction only when test commands are
-// enabled.
-const StringMap<int> txnCmdForTestingWhitelist = {{"dbHash", 1}};
-
 // The commands that can be run on the 'admin' database in multi-document transactions.
 const StringMap<int> txnAdminCommands = {{"abortTransaction", 1},
@@ -451,9 +447,7 @@ Status CommandHelpers::canUseTransactions(StringData dbName, StringData cmdName)
                 "http://dochub.mongodb.org/core/transaction-count for a recommended alternative."};
     }

-    if (txnCmdWhitelist.find(cmdName) == txnCmdWhitelist.cend() &&
-        !(getTestCommandsEnabled() &&
-          txnCmdForTestingWhitelist.find(cmdName) != txnCmdForTestingWhitelist.cend())) {
+    if (txnCmdWhitelist.find(cmdName) == txnCmdWhitelist.cend()) {
         return {ErrorCodes::OperationNotSupportedInTransaction,
                 str::stream() << "Cannot run '" << cmdName
                               << "' in a multi-document transaction."};
     }
diff --git a/src/mongo/db/commands/dbhash.cpp b/src/mongo/db/commands/dbhash.cpp
index 669f707111a..0ed79280f17 100644
--- a/src/mongo/db/commands/dbhash.cpp
+++ b/src/mongo/db/commands/dbhash.cpp
@@ -43,10 +43,14 @@
 #include "mongo/db/catalog/database_catalog_entry.h"
 #include "mongo/db/catalog/index_catalog.h"
 #include "mongo/db/commands.h"
+#include "mongo/db/commands/test_commands_enabled.h"
 #include "mongo/db/db_raii.h"
 #include "mongo/db/exec/working_set_common.h"
+#include "mongo/db/logical_clock.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/query/internal_plans.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/storage/storage_engine.h"
 #include "mongo/db/transaction_participant.h"
 #include "mongo/stdx/mutex.h"
 #include "mongo/util/log.h"
@@ -70,13 +74,6 @@ public:
         return ReadWriteType::kRead;
     }

-    bool supportsReadConcern(const std::string& dbName,
-                             const BSONObj& cmdObj,
-                             repl::ReadConcernLevel level) const override {
-        return level == repl::ReadConcernLevel::kLocalReadConcern ||
-            level == repl::ReadConcernLevel::kSnapshotReadConcern;
-    }
-
     AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
         return AllowedOnSecondary::kAlways;
     }
@@ -114,14 +111,80 @@ public:
                 str::stream() << "Invalid db name: " << ns,
                 NamespaceString::validDBName(ns, NamespaceString::DollarInDbNameBehavior::Allow));

+        if (auto elem = cmdObj["$_internalReadAtClusterTime"]) {
+            uassert(ErrorCodes::InvalidOptions,
+                    "The '$_internalReadAtClusterTime' option is only supported when testing"
+                    " commands are enabled",
+                    getTestCommandsEnabled());
+
+            auto* replCoord = repl::ReplicationCoordinator::get(opCtx);
+            uassert(ErrorCodes::InvalidOptions,
+                    "The '$_internalReadAtClusterTime' option is only supported when replication is"
+                    " enabled",
+                    replCoord->isReplEnabled());
+
+            auto* storageEngine = opCtx->getServiceContext()->getStorageEngine();
+            uassert(ErrorCodes::InvalidOptions,
+                    "The '$_internalReadAtClusterTime' option is only supported by storage engines"
+                    " that support document-level concurrency",
+                    storageEngine->supportsDocLocking());
+
+            uassert(ErrorCodes::TypeMismatch,
+                    "The '$_internalReadAtClusterTime' option must be a Timestamp",
+                    elem.type() == BSONType::bsonTimestamp);
+
+            auto targetClusterTime = elem.timestamp();
+
+            // We aren't holding the global lock in intent mode, so it is possible after comparing
+            // 'targetClusterTime' to 'lastAppliedOpTime' for the last applied opTime to go
+            // backwards or for the term to change due to replication rollback. This isn't an actual
+            // concern because the testing infrastructure won't use the $_internalReadAtClusterTime
+            // option in any test suite where rollback is expected to occur.
+            auto lastAppliedOpTime = replCoord->getMyLastAppliedOpTime();
+
+            uassert(ErrorCodes::InvalidOptions,
+                    str::stream() << "$_internalReadAtClusterTime value must not be greater"
+                                     " than the last applied opTime. Requested clusterTime: "
+                                  << targetClusterTime.toString()
+                                  << "; last applied opTime: "
+                                  << lastAppliedOpTime.toString(),
+                    lastAppliedOpTime.getTimestamp() >= targetClusterTime);
+
+            // We aren't holding the global lock in intent mode, so it is possible for the global
+            // storage engine to have been destructed already as a result of the server shutting
+            // down. This isn't an actual concern because the testing infrastructure won't use the
+            // $_internalReadAtClusterTime option in any test suite where clean shutdown is expected
+            // to occur concurrently with tests running.
+            auto allCommittedTime = storageEngine->getAllCommittedTimestamp();
+            invariant(!allCommittedTime.isNull());
+
+            uassert(ErrorCodes::InvalidOptions,
+                    str::stream() << "$_internalReadAtClusterTime value must not be greater"
+                                     " than the all-committed timestamp. Requested clusterTime: "
+                                  << targetClusterTime.toString()
+                                  << "; all-committed timestamp: "
+                                  << allCommittedTime.toString(),
+                    allCommittedTime >= targetClusterTime);
+
+            // The $_internalReadAtClusterTime option causes any storage-layer cursors created
+            // during plan execution to read from a consistent snapshot of data at the supplied
+            // clusterTime, even across yields.
+            opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+                                                          targetClusterTime);
+
+            // The $_internalReadAtClusterTime option also causes any storage-layer cursors created
+            // during plan execution to block on prepared transactions.
+            opCtx->recoveryUnit()->setIgnorePrepared(false);
+        }
+
         // We lock the entire database in S-mode in order to ensure that the contents will not
         // change for the snapshot.
         auto lockMode = LockMode::MODE_S;
-        auto txnParticipant = TransactionParticipant::get(opCtx);
-        if (txnParticipant && txnParticipant->inMultiDocumentTransaction()) {
-            // However, if we are inside a multi-statement transaction, then we only need to lock
-            // the database in intent mode to ensure that none of the collections get dropped.
-            lockMode = getLockModeForQuery(opCtx, boost::none);
+        if (opCtx->recoveryUnit()->getTimestampReadSource() ==
+            RecoveryUnit::ReadSource::kProvided) {
+            // However, if we are performing a read at a timestamp, then we only need to lock the
+            // database in intent mode to ensure that none of the collections get dropped.
+            lockMode = LockMode::MODE_IS;
         }
         AutoGetDb autoDb(opCtx, ns, lockMode);
         Database* db = autoDb.getDb();
@@ -220,16 +283,14 @@ private:
             return "";

         boost::optional<Lock::CollectionLock> collLock;
-        auto txnParticipant = TransactionParticipant::get(opCtx);
-        if (txnParticipant && txnParticipant->inMultiDocumentTransaction()) {
-            // When inside a multi-statement transaction, we are only holding the database lock in
+        if (opCtx->recoveryUnit()->getTimestampReadSource() ==
+            RecoveryUnit::ReadSource::kProvided) {
+            // When performing a read at a timestamp, we are only holding the database lock in
             // intent mode. We need to also acquire the collection lock in intent mode to ensure
             // reading from the consistent snapshot doesn't overlap with any catalog operations on
             // the collection.
-            invariant(
-                opCtx->lockState()->isDbLockedForMode(db->name(), getLockModeForQuery(opCtx, ns)));
-            collLock.emplace(
-                opCtx->lockState(), fullCollectionName, getLockModeForQuery(opCtx, ns));
+            invariant(opCtx->lockState()->isDbLockedForMode(db->name(), MODE_IS));
+            collLock.emplace(opCtx->lockState(), fullCollectionName, MODE_IS);

             auto minSnapshot = collection->getMinimumVisibleSnapshot();
             auto mySnapshot = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 0fde452ab39..a13d09d2fb7 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -37,6 +37,7 @@
 #include "mongo/db/clientcursor.h"
 #include "mongo/db/commands.h"
 #include "mongo/db/commands/run_aggregate.h"
+#include "mongo/db/commands/test_commands_enabled.h"
 #include "mongo/db/curop_failpoint_helpers.h"
 #include "mongo/db/cursor_manager.h"
 #include "mongo/db/db_raii.h"
@@ -53,6 +54,7 @@
 #include "mongo/db/service_context.h"
 #include "mongo/db/stats/counters.h"
 #include "mongo/db/stats/server_read_concern_metrics.h"
+#include "mongo/db/storage/storage_engine.h"
 #include "mongo/db/transaction_participant.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/util/log.h"
@@ -262,12 +264,76 @@ public:
                     !txnParticipant->inActiveOrKilledMultiDocumentTransaction() ||
                         !qr->isReadOnce());

+            uassert(ErrorCodes::InvalidOptions,
+                    "The '$_internalReadAtClusterTime' option is only supported when testing"
+                    " commands are enabled",
+                    !qr->getReadAtClusterTime() || getTestCommandsEnabled());
+
+            uassert(ErrorCodes::InvalidOptions,
+                    "The '$_internalReadAtClusterTime' option is only supported when replication is"
+                    " enabled",
+                    !qr->getReadAtClusterTime() || replCoord->isReplEnabled());
+
+            auto* storageEngine = opCtx->getServiceContext()->getStorageEngine();
+            uassert(ErrorCodes::InvalidOptions,
+                    "The '$_internalReadAtClusterTime' option is only supported by storage engines"
+                    " that support document-level concurrency",
+                    !qr->getReadAtClusterTime() || storageEngine->supportsDocLocking());
+
             // Validate term before acquiring locks, if provided.
             if (auto term = qr->getReplicationTerm()) {
                 // Note: updateTerm returns ok if term stayed the same.
                 uassertStatusOK(replCoord->updateTerm(opCtx, *term));
             }

+            // We call RecoveryUnit::setTimestampReadSource() before acquiring a lock on the
+            // collection via AutoGetCollectionForRead in order to ensure the comparison to the
+            // collection's minimum visible snapshot is accurate.
+            if (auto targetClusterTime = qr->getReadAtClusterTime()) {
+                // We aren't holding the global lock in intent mode, so it is possible after
+                // comparing 'targetClusterTime' to 'lastAppliedOpTime' for the last applied opTime
+                // to go backwards or for the term to change due to replication rollback. This isn't
+                // an actual concern because the testing infrastructure won't use the
+                // $_internalReadAtClusterTime option in any test suite where rollback is expected
+                // to occur.
+                auto lastAppliedOpTime = replCoord->getMyLastAppliedOpTime();
+
+                uassert(ErrorCodes::InvalidOptions,
+                        str::stream() << "$_internalReadAtClusterTime value must not be greater"
+                                         " than the last applied opTime. Requested clusterTime: "
+                                      << targetClusterTime->toString()
+                                      << "; last applied opTime: "
+                                      << lastAppliedOpTime.toString(),
+                        lastAppliedOpTime.getTimestamp() >= targetClusterTime);
+
+                // We aren't holding the global lock in intent mode, so it is possible for the
+                // global storage engine to have been destructed already as a result of the server
+                // shutting down. This isn't an actual concern because the testing infrastructure
+                // won't use the $_internalReadAtClusterTime option in any test suite where clean
+                // shutdown is expected to occur concurrently with tests running.
+                auto allCommittedTime = storageEngine->getAllCommittedTimestamp();
+                invariant(!allCommittedTime.isNull());
+
+                uassert(ErrorCodes::InvalidOptions,
+                        str::stream() << "$_internalReadAtClusterTime value must not be greater"
+                                         " than the all-committed timestamp. Requested"
+                                         " clusterTime: "
+                                      << targetClusterTime->toString()
+                                      << "; all-committed timestamp: "
+                                      << allCommittedTime.toString(),
+                        allCommittedTime >= targetClusterTime);
+
+                // The $_internalReadAtClusterTime option causes any storage-layer cursors created
+                // during plan execution to read from a consistent snapshot of data at the supplied
+                // clusterTime, even across yields.
+                opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided,
+                                                              targetClusterTime);
+
+                // The $_internalReadAtClusterTime option also causes any storage-layer cursors
+                // created during plan execution to block on prepared transactions.
+                opCtx->recoveryUnit()->setIgnorePrepared(false);
+            }
+
             // Acquire locks. If the query is on a view, we release our locks and convert the query
             // request into an aggregation command.
             boost::optional<AutoGetCollectionForReadCommand> ctx;
diff --git a/src/mongo/db/commands/getmore_cmd.cpp b/src/mongo/db/commands/getmore_cmd.cpp
index 350383d706f..3c6f5c5ad5b 100644
--- a/src/mongo/db/commands/getmore_cmd.cpp
+++ b/src/mongo/db/commands/getmore_cmd.cpp
@@ -307,6 +307,32 @@ public:
         auto cursorManager = CursorManager::get(opCtx);
         auto cursorPin = uassertStatusOK(cursorManager->pinCursor(opCtx, _request.cursorid));

+        {
+            // We call RecoveryUnit::setTimestampReadSource() before acquiring a lock on the
+            // collection via AutoGetCollectionForRead in order to ensure the comparison to the
+            // collection's minimum visible snapshot is accurate.
+            PlanExecutor* exec = cursorPin->getExecutor();
+            const auto* cq = exec->getCanonicalQuery();
+
+            if (auto clusterTime =
+                    (cq ? cq->getQueryRequest().getReadAtClusterTime() : boost::none)) {
+                // We don't compare 'clusterTime' to the last applied opTime or to the
+                // all-committed timestamp because the testing infrastructure won't use the
+                // $_internalReadAtClusterTime option in any test suite where rollback is
+                // expected to occur.
+
+                // The $_internalReadAtClusterTime option causes any storage-layer cursors
+                // created during plan execution to read from a consistent snapshot of data at
+                // the supplied clusterTime, even across yields.
+                opCtx->recoveryUnit()->setTimestampReadSource(
+                    RecoveryUnit::ReadSource::kProvided, clusterTime);
+
+                // The $_internalReadAtClusterTime option also causes any storage-layer cursors
+                // created during plan execution to block on prepared transactions.
+                opCtx->recoveryUnit()->setIgnorePrepared(false);
+            }
+        }
+
         if (cursorPin->lockPolicy() == ClientCursorParams::LockPolicy::kLocksInternally) {
             if (!_request.nss.isCollectionlessCursorNamespace()) {
                 const boost::optional<int> dbProfilingLevel = boost::none;
diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp
index 7f02487ff7f..42c338bb5ec 100644
--- a/src/mongo/db/query/query_request.cpp
+++ b/src/mongo/db/query/query_request.cpp
@@ -103,6 +103,7 @@ const char kTermField[] = "term";
 const char kOptionsField[] = "options";
 const char kReadOnceField[] = "readOnce";
 const char kAllowSpeculativeMajorityReadField[] = "allowSpeculativeMajorityRead";
+const char kInternalReadAtClusterTimeField[] = "$_internalReadAtClusterTime";

 // Field names for sorting options.
 const char kNaturalSortField[] = "$natural";
@@ -381,6 +382,12 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p
                 return status;
             }
             qr->_allowSpeculativeMajorityRead = el.boolean();
+        } else if (fieldName == kInternalReadAtClusterTimeField) {
+            Status status = checkFieldType(el, BSONType::bsonTimestamp);
+            if (!status.isOK()) {
+                return status;
+            }
+            qr->_internalReadAtClusterTime = el.timestamp();
         } else if (!isGenericArgument(fieldName)) {
             return Status(ErrorCodes::FailedToParse,
                           str::stream() << "Failed to parse: " << cmdObj.toString() << ". "
@@ -549,6 +556,10 @@ void QueryRequest::asFindCommandInternal(BSONObjBuilder* cmdBuilder) const {
     if (_allowSpeculativeMajorityRead) {
         cmdBuilder->append(kAllowSpeculativeMajorityReadField, true);
     }
+
+    if (_internalReadAtClusterTime) {
+        cmdBuilder->append(kInternalReadAtClusterTimeField, *_internalReadAtClusterTime);
+    }
 }

 void QueryRequest::addReturnKeyMetaProj() {
@@ -1044,6 +1055,12 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const {
                               << " not supported in aggregation."};
     }

+    if (_internalReadAtClusterTime) {
+        return {ErrorCodes::InvalidPipelineOperator,
+                str::stream() << "Option " << kInternalReadAtClusterTimeField
+                              << " not supported in aggregation."};
+    }
+
     // Now that we've successfully validated this QR, begin building the aggregation command.
     aggregationBuilder.append("aggregate", _nss.coll());
diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h
index 1d23544f7cd..8028ba452c0 100644
--- a/src/mongo/db/query/query_request.h
+++ b/src/mongo/db/query/query_request.h
@@ -394,6 +394,10 @@ public:
         return _allowSpeculativeMajorityRead;
     }

+    boost::optional<Timestamp> getReadAtClusterTime() const {
+        return _internalReadAtClusterTime;
+    }
+
     /**
      * Return options as a bit vector.
      */
@@ -522,6 +526,10 @@ private:
     bool _allowSpeculativeMajorityRead = false;

     boost::optional<long long> _replicationTerm;
+
+    // The Timestamp that RecoveryUnit::setTimestampReadSource() should be called with. The optional
+    // should only ever be engaged when testing commands are enabled.
+    boost::optional<Timestamp> _internalReadAtClusterTime;
 };

 }  // namespace mongo
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp
index b0e95e61b12..19ae87ccca6 100644
--- a/src/mongo/db/transaction_validation.cpp
+++ b/src/mongo/db/transaction_validation.cpp
@@ -50,7 +50,6 @@ const StringMap<int> sessionCheckOutList = {{"abortTransaction", 1},
                                             {"applyOps", 1},
                                             {"commitTransaction", 1},
                                             {"count", 1},
-                                            {"dbHash", 1},
                                             {"delete", 1},
                                             {"distinct", 1},
                                             {"doTxn", 1},
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index ea8766968e6..9efa8ac7f2e 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -1494,11 +1494,16 @@ var ReplSetTest = function(opts) {
     this.getHashesUsingSessions = function(sessions, dbName, {
         filterCapped: filterCapped = true,
-        filterMapReduce: filterMapReduce = true,
+        filterMapReduce: filterMapReduce = true, readAtClusterTime,
     } = {}) {
         return sessions.map(session => {
+            const commandObj = {dbHash: 1};
+            if (readAtClusterTime !== undefined) {
+                commandObj.$_internalReadAtClusterTime = readAtClusterTime;
+            }
+
             const db = session.getDatabase(dbName);
-            const res = assert.commandWorked(db.runCommand({dbHash: 1}));
+            const res = assert.commandWorked(db.runCommand(commandObj));

             // The "capped" field in the dbHash command response is new as of MongoDB 4.0.
             const cappedCollections = new Set(filterCapped ? res.capped : []);
@@ -1528,7 +1533,7 @@ var ReplSetTest = function(opts) {
     };

     this.getCollectionDiffUsingSessions = function(
-        primarySession, secondarySession, dbName, collNameOrUUID) {
+        primarySession, secondarySession, dbName, collNameOrUUID, readAtClusterTime) {
         function PeekableCursor(cursor) {
             let _stashedDoc;
@@ -1557,11 +1562,16 @@
         const primaryDB = primarySession.getDatabase(dbName);
         const secondaryDB = secondarySession.getDatabase(dbName);

-        const primaryCursor = new PeekableCursor(new DBCommandCursor(
-            primaryDB, primaryDB.runCommand({find: collNameOrUUID, sort: {_id: 1}})));
+        const commandObj = {find: collNameOrUUID, sort: {_id: 1}};
+        if (readAtClusterTime !== undefined) {
+            commandObj.$_internalReadAtClusterTime = readAtClusterTime;
+        }
+
+        const primaryCursor =
+            new PeekableCursor(new DBCommandCursor(primaryDB, primaryDB.runCommand(commandObj)));

-        const secondaryCursor = new PeekableCursor(new DBCommandCursor(
-            secondaryDB, secondaryDB.runCommand({find: collNameOrUUID, sort: {_id: 1}})));
+        const secondaryCursor = new PeekableCursor(
+            new DBCommandCursor(secondaryDB, secondaryDB.runCommand(commandObj)));

         while (primaryCursor.hasNext() && secondaryCursor.hasNext()) {
             const primaryDoc = primaryCursor.peekNext();