diff options
-rw-r--r-- | jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js | 16 | ||||
-rw-r--r-- | jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js | 75 | ||||
-rw-r--r-- | jstests/noPassthrough/read_concern_snapshot_yielding.js | 28 | ||||
-rw-r--r-- | jstests/noPassthrough/snapshot_reads.js | 56 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 83 |
5 files changed, 186 insertions, 72 deletions
diff --git a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js index 1c377720ca8..a94fb0f4737 100644 --- a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js +++ b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js @@ -129,25 +129,13 @@ })); assert.eq(res.cursor.firstBatch.length, 2, printjson(res)); - // A read at a timestamp that is no longer held by the storage engine fails. - // TODO SERVER-31767: Once mongod supports a snapshot window, performing a majority write will - // not be sufficient to make a previous majority commit point stale. - assert.commandWorked( - primaryDB.runCommand({insert: collName, documents: [{}], writeConcern: {w: "majority"}})); - assert.commandFailedWithCode(primaryDB.runCommand({ - find: collName, - readConcern: {level: "snapshot", atClusterTime: clusterTimeAfter}, - txnNumber: NumberLong(primaryTxnNumber++) - }), - ErrorCodes.SnapshotTooOld); - - // A read at a timestamp that is older than our collection catalog min time fails. + // A read at a time that is too old fails. assert.commandFailedWithCode(primaryDB.runCommand({ find: collName, readConcern: {level: "snapshot", atClusterTime: Timestamp(1, 1)}, txnNumber: NumberLong(primaryTxnNumber++) }), - ErrorCodes.SnapshotUnavailable); + ErrorCodes.SnapshotTooOld); rst.stopSet(); }()); diff --git a/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js b/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js new file mode 100644 index 00000000000..1cc7f5078e4 --- /dev/null +++ b/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js @@ -0,0 +1,75 @@ +// Tests that snapshot reads return an error when accessing a collection whose metadata is invalid +// for the snapshot's point in time. +// @tags: [requires_replication] +(function() { + "use strict"; + + const kDbName = "test"; + const kCollName = "coll"; + + const rst = new ReplSetTest({nodes: 1}); + rst.startSet(); + rst.initiate(); + + const testDB = rst.getPrimary().getDB(kDbName); + if (!testDB.serverStatus().storageEngine.supportsSnapshotReadConcern) { + rst.stopSet(); + return; + } + const adminDB = testDB.getSiblingDB("admin"); + const coll = testDB.getCollection(kCollName); + coll.drop(); + + function waitForOp(curOpFilter) { + assert.soon( + function() { + const res = adminDB.aggregate([{$currentOp: {}}, {$match: curOpFilter}]).toArray(); + if (res.length === 1) { + return true; + } + return false; + }, + function() { + return "Failed to find operation in $currentOp output: " + + tojson(adminDB.aggregate([{$currentOp: {}}]).toArray()); + }); + } + + assert.commandWorked(coll.insert({x: 1}, {writeConcern: {w: "majority"}})); + + // Start a snapshot find that hangs after establishing a storage engine transaction. + assert.commandWorked(testDB.adminCommand( + {configureFailPoint: "hangAfterPreallocateSnapshot", mode: "alwaysOn"})); + + TestData.sessionId = assert.commandWorked(testDB.adminCommand({startSession: 1})).id; + const awaitCommand = startParallelShell(function() { + const res = db.runCommand({ + find: "coll", + readConcern: {level: "snapshot"}, + lsid: TestData.sessionId, + txnNumber: NumberLong(0) + }); + assert.commandFailedWithCode(res, ErrorCodes.SnapshotUnavailable); + }, rst.ports[0]); + + waitForOp({"command.find": kCollName, "command.readConcern.level": "snapshot"}); + + // Create an index on the collection the find was executed against. This will move the + // collection's minimum visible timestamp to a point later than the point-in-time referenced + // by the find snapshot. + assert.commandWorked(testDB.runCommand({ + createIndexes: kCollName, + indexes: [{key: {x: 1}, name: "x_1"}], + writeConcern: {w: "majority"} + })); + + // Disable the hang and check for parallel shell success. Success indicates that the find + // command failed due to collection metadata invalidation. + assert.commandWorked( + testDB.adminCommand({configureFailPoint: "hangAfterPreallocateSnapshot", mode: "off"})); + + awaitCommand(); + + assert.commandWorked(testDB.adminCommand({endSessions: [TestData.sessionId]})); + rst.stopSet(); +})(); diff --git a/jstests/noPassthrough/read_concern_snapshot_yielding.js b/jstests/noPassthrough/read_concern_snapshot_yielding.js index baeeed5a7bb..777e8ca663c 100644 --- a/jstests/noPassthrough/read_concern_snapshot_yielding.js +++ b/jstests/noPassthrough/read_concern_snapshot_yielding.js @@ -250,6 +250,34 @@ assert.eq(res.cursor.firstBatch.length, TestData.numDocs, tojson(res)); }, {"command.pipeline": [{$match: {x: 1}}]}, {"command.pipeline": [{$match: {x: 1}}]}); + // Test getMore with an initial find batchSize of 0. Interrupt behavior of a getMore is not + // expected to change with a change of batchSize in the originating command. + testCommand(function() { + assert.commandWorked(db.adminCommand( + {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "off"})); + const initialFindBatchSize = 0; + const cursorId = assert + .commandWorked(db.runCommand({ + find: "coll", + filter: {x: 1}, + batchSize: initialFindBatchSize, + readConcern: {level: "snapshot"}, + lsid: TestData.sessionId, + txnNumber: NumberLong(TestData.txnNumber) + })) + .cursor.id; + assert.commandWorked(db.adminCommand( + {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "alwaysOn"})); + const res = assert.commandWorked(db.runCommand({ + getMore: NumberLong(cursorId), + collection: "coll", + lsid: TestData.sessionId, + txnNumber: NumberLong(TestData.txnNumber) + })); + assert.eq( + res.cursor.nextBatch.length, TestData.numDocs - initialFindBatchSize, tojson(res)); + }, {"originatingCommand.filter": {x: 1}}, {op: "getmore"}); + // Test update. // TODO SERVER-33412: Perform writes under autocommit:false transaction. // TODO SERVER-33548: We cannot provide a 'profilerFilter' because profiling is turned off for diff --git a/jstests/noPassthrough/snapshot_reads.js b/jstests/noPassthrough/snapshot_reads.js index 65a37fdc4db..5f6f7774d98 100644 --- a/jstests/noPassthrough/snapshot_reads.js +++ b/jstests/noPassthrough/snapshot_reads.js @@ -49,14 +49,14 @@ cursorCmd.readConcern = {level: "snapshot"}; cursorCmd.txnNumber = NumberLong(txnNumber); - // Establish a snapshot cursor, fetching the first 5 documents. + // Establish a snapshot batchSize:0 cursor. let res = assert.commandWorked(sessionDb.runCommand(cursorCmd)); - assert(res.hasOwnProperty("cursor")); - assert(res.cursor.hasOwnProperty("firstBatch")); - assert.eq(5, res.cursor.firstBatch.length); + assert(res.hasOwnProperty("cursor"), tojson(res)); + assert(res.cursor.hasOwnProperty("firstBatch"), tojson(res)); + assert.eq(0, res.cursor.firstBatch.length, tojson(res)); - assert(res.cursor.hasOwnProperty("id")); + assert(res.cursor.hasOwnProperty("id"), tojson(res)); const cursorId = res.cursor.id; assert.neq(cursorId, 0); @@ -64,35 +64,37 @@ // performed outside of the session. assert.writeOK(primaryDB.coll.insert({_id: 10}, {writeConcern: {w: "majority"}})); - // Fetch the 6th document. This confirms that the transaction stash is preserved across - // multiple getMore invocations. + // Fetch the first 5 documents. res = assert.commandWorked(sessionDb.runCommand({ getMore: cursorId, collection: collName, - batchSize: 1, + batchSize: 5, txnNumber: NumberLong(txnNumber) })); - assert(res.hasOwnProperty("cursor")); - assert(res.cursor.hasOwnProperty("id")); - assert.neq(0, res.cursor.id); - - // Exhaust the cursor, retrieving the remainder of the result set. + assert(res.hasOwnProperty("cursor"), tojson(res)); + assert(res.cursor.hasOwnProperty("id"), tojson(res)); + assert.neq(0, res.cursor.id, tojson(res)); + assert(res.cursor.hasOwnProperty("nextBatch"), tojson(res)); + assert.eq(5, res.cursor.nextBatch.length, tojson(res)); + + // Exhaust the cursor, retrieving the remainder of the result set. Performing a second + // getMore tests snapshot isolation across multiple getMore invocations. res = assert.commandWorked(sessionDb.runCommand({ getMore: cursorId, collection: collName, - batchSize: 10, + batchSize: 20, txnNumber: NumberLong(txnNumber++) })); // The cursor has been exhausted. - assert(res.hasOwnProperty("cursor")); - assert(res.cursor.hasOwnProperty("id")); - assert.eq(0, res.cursor.id); + assert(res.hasOwnProperty("cursor"), tojson(res)); + assert(res.cursor.hasOwnProperty("id"), tojson(res)); + assert.eq(0, res.cursor.id, tojson(res)); - // Only the remaining 4 of the initial 10 documents are returned. The 11th document is not + // Only the remaining 5 of the initial 10 documents are returned. The 11th document is not // part of the result set. - assert(res.cursor.hasOwnProperty("nextBatch")); - assert.eq(4, res.cursor.nextBatch.length); + assert(res.cursor.hasOwnProperty("nextBatch"), tojson(res)); + assert.eq(5, res.cursor.nextBatch.length, tojson(res)); if (readFromSecondary) { rst.awaitLastOpCommitted(); @@ -108,13 +110,13 @@ })); // The cursor has been exhausted. - assert(res.hasOwnProperty("cursor")); - assert(res.cursor.hasOwnProperty("id")); - assert.eq(0, res.cursor.id); + assert(res.hasOwnProperty("cursor"), tojson(res)); + assert(res.cursor.hasOwnProperty("id"), tojson(res)); + assert.eq(0, res.cursor.id, tojson(res)); // All 11 documents are returned. - assert(res.cursor.hasOwnProperty("firstBatch")); - assert.eq(11, res.cursor.firstBatch.length); + assert(res.cursor.hasOwnProperty("firstBatch"), tojson(res)); + assert.eq(11, res.cursor.firstBatch.length, tojson(res)); // Reject snapshot reads without txnNumber. assert.commandFailed(sessionDb.runCommand( @@ -133,14 +135,14 @@ } // Test snapshot reads using find. - let findCmd = {find: collName, sort: {_id: 1}, batchSize: 5}; + let findCmd = {find: collName, sort: {_id: 1}, batchSize: 0}; runTest({useCausalConsistency: false, readFromSecondary: false, establishCursorCmd: findCmd}); runTest({useCausalConsistency: true, readFromSecondary: false, establishCursorCmd: findCmd}); runTest({useCausalConsistency: false, readFromSecondary: true, establishCursorCmd: findCmd}); runTest({useCausalConsistency: true, readFromSecondary: true, establishCursorCmd: findCmd}); // Test snapshot reads using aggregate. - let aggCmd = {aggregate: collName, pipeline: [{$sort: {_id: 1}}], cursor: {batchSize: 5}}; + let aggCmd = {aggregate: collName, pipeline: [{$sort: {_id: 1}}], cursor: {batchSize: 0}}; runTest({useCausalConsistency: false, readFromSecondary: false, establishCursorCmd: aggCmd}); runTest({useCausalConsistency: true, readFromSecondary: false, establishCursorCmd: aggCmd}); runTest({useCausalConsistency: false, readFromSecondary: true, establishCursorCmd: aggCmd}); diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index b20552fd234..db36244520e 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -271,6 +271,9 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate( // will be allowed to commit. MONGO_FP_DECLARE(onPrimaryTransactionalWrite); +// Failpoint which will pause an operation just after allocating a point-in-time storage engine +// transaction. +MONGO_FP_DECLARE(hangAfterPreallocateSnapshot); } // namespace const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1)); @@ -605,41 +608,59 @@ void Session::unstashTransactionResources(OperationContext* opCtx) { return; } - // We must lock the Client to change the Locker on the OperationContext and the Session mutex to - // access Session state. We must lock the Client before the Session mutex, since the Client - // effectively owns the Session. That is, a user might lock the Client to ensure it doesn't go - // away, and then lock the Session owned by that client. - stdx::lock_guard<Client> lk(*opCtx->getClient()); - stdx::lock_guard<stdx::mutex> lg(_mutex); - if (opCtx->getTxnNumber() < _activeTxnNumber) { - // The session is checked out, so _activeTxnNumber cannot advance due to a user operation. - // However, when a chunk is migrated, session and transaction information is copied from the - // donor shard to the recipient. This occurs outside of the check-out mechanism and can lead - // to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort - // the current transaction. Note that it would indicate a user bug to have a newer - // transaction on one shard while an older transaction is still active on another shard. - _releaseStashedTransactionResources(lg); - uasserted(ErrorCodes::TransactionAborted, - str::stream() << "Transaction aborted. Active txnNumber is now " - << _activeTxnNumber); - return; - } + bool snapshotPreallocated = false; + { + // We must lock the Client to change the Locker on the OperationContext and the Session + // mutex to access Session state. We must lock the Client before the Session mutex, since + // the Client effectively owns the Session. That is, a user might lock the Client to ensure + // it doesn't go away, and then lock the Session owned by that client. + stdx::lock_guard<Client> lk(*opCtx->getClient()); + stdx::lock_guard<stdx::mutex> lg(_mutex); + if (opCtx->getTxnNumber() < _activeTxnNumber) { + // The session is checked out, so _activeTxnNumber cannot advance due to a user + // operation. + // However, when a chunk is migrated, session and transaction information is copied from + // the donor shard to the recipient. This occurs outside of the check-out mechanism and + // can lead to a higher _activeTxnNumber during the lifetime of a checkout. If that + // occurs, we abort the current transaction. Note that it would indicate a user bug to + // have a newer transaction on one shard while an older transaction is still active on + // another shard. + _releaseStashedTransactionResources(lg); + uasserted(ErrorCodes::TransactionAborted, + str::stream() << "Transaction aborted. Active txnNumber is now " + << _activeTxnNumber); + return; + } - if (_txnResourceStash) { - invariant(_txnState != MultiDocumentTransactionState::kNone); - _txnResourceStash->release(opCtx); - _txnResourceStash = boost::none; - } else { - auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); - if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern || - _txnState == MultiDocumentTransactionState::kInProgress) { - opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx)); - if (_txnState != MultiDocumentTransactionState::kInProgress) { - invariant(_txnState == MultiDocumentTransactionState::kNone); - _txnState = MultiDocumentTransactionState::kInSnapshotRead; + if (_txnResourceStash) { + invariant(_txnState != MultiDocumentTransactionState::kNone); + _txnResourceStash->release(opCtx); + _txnResourceStash = boost::none; + } else { + auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); + if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern || + _txnState == MultiDocumentTransactionState::kInProgress) { + opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx)); + + // Storage engine transactions may be started in a lazy manner. By explicitly + // starting here we ensure that a point-in-time snapshot is established during the + // first operation of a transaction. + opCtx->recoveryUnit()->preallocateSnapshot(); + snapshotPreallocated = true; + + if (_txnState != MultiDocumentTransactionState::kInProgress) { + invariant(_txnState == MultiDocumentTransactionState::kNone); + _txnState = MultiDocumentTransactionState::kInSnapshotRead; + } } } } + + if (snapshotPreallocated) { + // The Client lock must not be held when executing this failpoint as it will block currentOp + // execution. + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterPreallocateSnapshot); + } } void Session::abortIfSnapshotRead(TxnNumber txnNumber) { |