-rw-r--r--  jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js |  16
-rw-r--r--  jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js   |  75
-rw-r--r--  jstests/noPassthrough/read_concern_snapshot_yielding.js               |  28
-rw-r--r--  jstests/noPassthrough/snapshot_reads.js                               |  56
-rw-r--r--  src/mongo/db/session.cpp                                              |  83
5 files changed, 186 insertions(+), 72 deletions(-)
diff --git a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js
index 1c377720ca8..a94fb0f4737 100644
--- a/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js
+++ b/jstests/noPassthrough/readConcern_atClusterTime_snapshot_selection.js
@@ -129,25 +129,13 @@
}));
assert.eq(res.cursor.firstBatch.length, 2, printjson(res));
- // A read at a timestamp that is no longer held by the storage engine fails.
- // TODO SERVER-31767: Once mongod supports a snapshot window, performing a majority write will
- // not be sufficient to make a previous majority commit point stale.
- assert.commandWorked(
- primaryDB.runCommand({insert: collName, documents: [{}], writeConcern: {w: "majority"}}));
- assert.commandFailedWithCode(primaryDB.runCommand({
- find: collName,
- readConcern: {level: "snapshot", atClusterTime: clusterTimeAfter},
- txnNumber: NumberLong(primaryTxnNumber++)
- }),
- ErrorCodes.SnapshotTooOld);
-
- // A read at a timestamp that is older than our collection catalog min time fails.
+ // A read at a timestamp that is too old fails.
assert.commandFailedWithCode(primaryDB.runCommand({
find: collName,
readConcern: {level: "snapshot", atClusterTime: Timestamp(1, 1)},
txnNumber: NumberLong(primaryTxnNumber++)
}),
- ErrorCodes.SnapshotUnavailable);
+ ErrorCodes.SnapshotTooOld);
rst.stopSet();
}());
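
For context, the clusterTimeAfter referenced in the removed hunk is the kind of value captured
from an earlier command reply. A minimal shell sketch of capturing and reusing a cluster time,
assuming (as in this test) that primaryDB is the session database on the primary and
primaryTxnNumber is the session's transaction counter:

    // Capture the operation time of a majority write, then read back at exactly that time.
    const writeRes = assert.commandWorked(primaryDB.runCommand(
        {insert: collName, documents: [{x: 1}], writeConcern: {w: "majority"}}));
    const clusterTimeAfter = writeRes.operationTime;  // Timestamp assigned to the write.
    assert.commandWorked(primaryDB.runCommand({
        find: collName,
        readConcern: {level: "snapshot", atClusterTime: clusterTimeAfter},
        txnNumber: NumberLong(primaryTxnNumber++)
    }));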
diff --git a/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js b/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js
new file mode 100644
index 00000000000..1cc7f5078e4
--- /dev/null
+++ b/jstests/noPassthrough/read_concern_snapshot_catalog_invalidation.js
@@ -0,0 +1,75 @@
+// Tests that snapshot reads return an error when accessing a collection whose metadata is invalid
+// for the snapshot's point in time.
+// @tags: [requires_replication]
+(function() {
+ "use strict";
+
+ const kDbName = "test";
+ const kCollName = "coll";
+
+ const rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+
+ const testDB = rst.getPrimary().getDB(kDbName);
+ if (!testDB.serverStatus().storageEngine.supportsSnapshotReadConcern) {
+ rst.stopSet();
+ return;
+ }
+ const adminDB = testDB.getSiblingDB("admin");
+ const coll = testDB.getCollection(kCollName);
+ coll.drop();
+
+ function waitForOp(curOpFilter) {
+ assert.soon(
+ function() {
+ const res = adminDB.aggregate([{$currentOp: {}}, {$match: curOpFilter}]).toArray();
+ if (res.length === 1) {
+ return true;
+ }
+ return false;
+ },
+ function() {
+ return "Failed to find operation in $currentOp output: " +
+ tojson(adminDB.aggregate([{$currentOp: {}}]).toArray());
+ });
+ }
+
+ assert.commandWorked(coll.insert({x: 1}, {writeConcern: {w: "majority"}}));
+
+ // Start a snapshot find that hangs after establishing a storage engine transaction.
+ assert.commandWorked(testDB.adminCommand(
+ {configureFailPoint: "hangAfterPreallocateSnapshot", mode: "alwaysOn"}));
+
+ TestData.sessionId = assert.commandWorked(testDB.adminCommand({startSession: 1})).id;
+ const awaitCommand = startParallelShell(function() {
+ const res = db.runCommand({
+ find: "coll",
+ readConcern: {level: "snapshot"},
+ lsid: TestData.sessionId,
+ txnNumber: NumberLong(0)
+ });
+ assert.commandFailedWithCode(res, ErrorCodes.SnapshotUnavailable);
+ }, rst.ports[0]);
+
+ waitForOp({"command.find": kCollName, "command.readConcern.level": "snapshot"});
+
+ // Create an index on the collection the find was executed against. This will move the
+ // collection's minimum visible timestamp to a point later than the point-in-time referenced
+ // by the find snapshot.
+ assert.commandWorked(testDB.runCommand({
+ createIndexes: kCollName,
+ indexes: [{key: {x: 1}, name: "x_1"}],
+ writeConcern: {w: "majority"}
+ }));
+
+ // Disable the hang and check for parallel shell success. Success indicates that the find
+ // command failed due to collection metadata invalidation.
+ assert.commandWorked(
+ testDB.adminCommand({configureFailPoint: "hangAfterPreallocateSnapshot", mode: "off"}));
+
+ awaitCommand();
+
+ assert.commandWorked(testDB.adminCommand({endSessions: [TestData.sessionId]}));
+ rst.stopSet();
+})();
diff --git a/jstests/noPassthrough/read_concern_snapshot_yielding.js b/jstests/noPassthrough/read_concern_snapshot_yielding.js
index baeeed5a7bb..777e8ca663c 100644
--- a/jstests/noPassthrough/read_concern_snapshot_yielding.js
+++ b/jstests/noPassthrough/read_concern_snapshot_yielding.js
@@ -250,6 +250,34 @@
assert.eq(res.cursor.firstBatch.length, TestData.numDocs, tojson(res));
}, {"command.pipeline": [{$match: {x: 1}}]}, {"command.pipeline": [{$match: {x: 1}}]});
+ // Test getMore with an initial find batchSize of 0. The interrupt behavior of a getMore
+ // is not expected to depend on the batchSize of the originating command.
+ testCommand(function() {
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "off"}));
+ const initialFindBatchSize = 0;
+ const cursorId = assert
+ .commandWorked(db.runCommand({
+ find: "coll",
+ filter: {x: 1},
+ batchSize: initialFindBatchSize,
+ readConcern: {level: "snapshot"},
+ lsid: TestData.sessionId,
+ txnNumber: NumberLong(TestData.txnNumber)
+ }))
+ .cursor.id;
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "alwaysOn"}));
+ const res = assert.commandWorked(db.runCommand({
+ getMore: NumberLong(cursorId),
+ collection: "coll",
+ lsid: TestData.sessionId,
+ txnNumber: NumberLong(TestData.txnNumber)
+ }));
+ assert.eq(
+ res.cursor.nextBatch.length, TestData.numDocs - initialFindBatchSize, tojson(res));
+ }, {"originatingCommand.filter": {x: 1}}, {op: "getmore"});
+
// Test update.
// TODO SERVER-33412: Perform writes under autocommit:false transaction.
// TODO SERVER-33548: We cannot provide a 'profilerFilter' because profiling is turned off for
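
The batch arithmetic asserted by the new getMore case holds independently of the interrupt
failpoint: a batchSize:0 find pins the snapshot while returning nothing, and the getMore then
returns every matching document from that snapshot. A standalone sketch of that invariant,
assuming the TestData values and session wiring used by this test:

    const findRes = assert.commandWorked(db.runCommand({
        find: "coll",
        filter: {x: 1},
        batchSize: 0,  // First batch is empty; the snapshot is still established here.
        readConcern: {level: "snapshot"},
        lsid: TestData.sessionId,
        txnNumber: NumberLong(TestData.txnNumber)
    }));
    assert.eq(0, findRes.cursor.firstBatch.length);
    const getMoreRes = assert.commandWorked(db.runCommand({
        getMore: findRes.cursor.id,
        collection: "coll",
        lsid: TestData.sessionId,
        txnNumber: NumberLong(TestData.txnNumber)
    }));
    // Everything visible at the snapshot arrives in the first getMore batch.
    assert.eq(TestData.numDocs, getMoreRes.cursor.nextBatch.length);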
diff --git a/jstests/noPassthrough/snapshot_reads.js b/jstests/noPassthrough/snapshot_reads.js
index 65a37fdc4db..5f6f7774d98 100644
--- a/jstests/noPassthrough/snapshot_reads.js
+++ b/jstests/noPassthrough/snapshot_reads.js
@@ -49,14 +49,14 @@
cursorCmd.readConcern = {level: "snapshot"};
cursorCmd.txnNumber = NumberLong(txnNumber);
- // Establish a snapshot cursor, fetching the first 5 documents.
+ // Establish a snapshot batchSize:0 cursor.
let res = assert.commandWorked(sessionDb.runCommand(cursorCmd));
- assert(res.hasOwnProperty("cursor"));
- assert(res.cursor.hasOwnProperty("firstBatch"));
- assert.eq(5, res.cursor.firstBatch.length);
+ assert(res.hasOwnProperty("cursor"), tojson(res));
+ assert(res.cursor.hasOwnProperty("firstBatch"), tojson(res));
+ assert.eq(0, res.cursor.firstBatch.length, tojson(res));
- assert(res.cursor.hasOwnProperty("id"));
+ assert(res.cursor.hasOwnProperty("id"), tojson(res));
const cursorId = res.cursor.id;
assert.neq(cursorId, 0);
@@ -64,35 +64,37 @@
// performed outside of the session.
assert.writeOK(primaryDB.coll.insert({_id: 10}, {writeConcern: {w: "majority"}}));
- // Fetch the 6th document. This confirms that the transaction stash is preserved across
- // multiple getMore invocations.
+ // Fetch the first 5 documents.
res = assert.commandWorked(sessionDb.runCommand({
getMore: cursorId,
collection: collName,
- batchSize: 1,
+ batchSize: 5,
txnNumber: NumberLong(txnNumber)
}));
- assert(res.hasOwnProperty("cursor"));
- assert(res.cursor.hasOwnProperty("id"));
- assert.neq(0, res.cursor.id);
-
- // Exhaust the cursor, retrieving the remainder of the result set.
+ assert(res.hasOwnProperty("cursor"), tojson(res));
+ assert(res.cursor.hasOwnProperty("id"), tojson(res));
+ assert.neq(0, res.cursor.id, tojson(res));
+ assert(res.cursor.hasOwnProperty("nextBatch"), tojson(res));
+ assert.eq(5, res.cursor.nextBatch.length, tojson(res));
+
+ // Exhaust the cursor, retrieving the remainder of the result set. Performing a second
+ // getMore tests snapshot isolation across multiple getMore invocations.
res = assert.commandWorked(sessionDb.runCommand({
getMore: cursorId,
collection: collName,
- batchSize: 10,
+ batchSize: 20,
txnNumber: NumberLong(txnNumber++)
}));
// The cursor has been exhausted.
- assert(res.hasOwnProperty("cursor"));
- assert(res.cursor.hasOwnProperty("id"));
- assert.eq(0, res.cursor.id);
+ assert(res.hasOwnProperty("cursor"), tojson(res));
+ assert(res.cursor.hasOwnProperty("id"), tojson(res));
+ assert.eq(0, res.cursor.id, tojson(res));
- // Only the remaining 4 of the initial 10 documents are returned. The 11th document is not
+ // Only the remaining 5 of the initial 10 documents are returned. The 11th document is not
// part of the result set.
- assert(res.cursor.hasOwnProperty("nextBatch"));
- assert.eq(4, res.cursor.nextBatch.length);
+ assert(res.cursor.hasOwnProperty("nextBatch"), tojson(res));
+ assert.eq(5, res.cursor.nextBatch.length, tojson(res));
if (readFromSecondary) {
rst.awaitLastOpCommitted();
@@ -108,13 +110,13 @@
}));
// The cursor has been exhausted.
- assert(res.hasOwnProperty("cursor"));
- assert(res.cursor.hasOwnProperty("id"));
- assert.eq(0, res.cursor.id);
+ assert(res.hasOwnProperty("cursor"), tojson(res));
+ assert(res.cursor.hasOwnProperty("id"), tojson(res));
+ assert.eq(0, res.cursor.id, tojson(res));
// All 11 documents are returned.
- assert(res.cursor.hasOwnProperty("firstBatch"));
- assert.eq(11, res.cursor.firstBatch.length);
+ assert(res.cursor.hasOwnProperty("firstBatch"), tojson(res));
+ assert.eq(11, res.cursor.firstBatch.length, tojson(res));
// Reject snapshot reads without txnNumber.
assert.commandFailed(sessionDb.runCommand(
@@ -133,14 +135,14 @@
}
// Test snapshot reads using find.
- let findCmd = {find: collName, sort: {_id: 1}, batchSize: 5};
+ let findCmd = {find: collName, sort: {_id: 1}, batchSize: 0};
runTest({useCausalConsistency: false, readFromSecondary: false, establishCursorCmd: findCmd});
runTest({useCausalConsistency: true, readFromSecondary: false, establishCursorCmd: findCmd});
runTest({useCausalConsistency: false, readFromSecondary: true, establishCursorCmd: findCmd});
runTest({useCausalConsistency: true, readFromSecondary: true, establishCursorCmd: findCmd});
// Test snapshot reads using aggregate.
- let aggCmd = {aggregate: collName, pipeline: [{$sort: {_id: 1}}], cursor: {batchSize: 5}};
+ let aggCmd = {aggregate: collName, pipeline: [{$sort: {_id: 1}}], cursor: {batchSize: 0}};
runTest({useCausalConsistency: false, readFromSecondary: false, establishCursorCmd: aggCmd});
runTest({useCausalConsistency: true, readFromSecondary: false, establishCursorCmd: aggCmd});
runTest({useCausalConsistency: false, readFromSecondary: true, establishCursorCmd: aggCmd});
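
The renumbered batches above all verify the same isolation property: a write committed after the
cursor's snapshot is invisible to every getMore on that cursor, while a fresh read sees it. A
condensed sketch, with sessionDb, primaryDB, and the 10-document setup assumed as in the test:

    // 10 documents exist when the batchSize:0 find establishes the snapshot.
    let res = assert.commandWorked(sessionDb.runCommand({
        find: collName, sort: {_id: 1}, batchSize: 0,
        readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber)
    }));
    const cursorId = res.cursor.id;
    // An 11th document committed outside the session after the snapshot...
    assert.writeOK(primaryDB.coll.insert({_id: 10}, {writeConcern: {w: "majority"}}));
    // ...never appears through the snapshot cursor, however the batches are split.
    res = assert.commandWorked(sessionDb.runCommand({
        getMore: cursorId, collection: collName, batchSize: 20,
        txnNumber: NumberLong(txnNumber)
    }));
    assert.eq(10, res.cursor.nextBatch.length);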
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index b20552fd234..db36244520e 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -271,6 +271,9 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate(
// will be allowed to commit.
MONGO_FP_DECLARE(onPrimaryTransactionalWrite);
+// Failpoint that pauses an operation just after it allocates a point-in-time storage engine
+// transaction.
+MONGO_FP_DECLARE(hangAfterPreallocateSnapshot);
} // namespace
const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
@@ -605,41 +608,59 @@ void Session::unstashTransactionResources(OperationContext* opCtx) {
return;
}
- // We must lock the Client to change the Locker on the OperationContext and the Session mutex to
- // access Session state. We must lock the Client before the Session mutex, since the Client
- // effectively owns the Session. That is, a user might lock the Client to ensure it doesn't go
- // away, and then lock the Session owned by that client.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- if (opCtx->getTxnNumber() < _activeTxnNumber) {
- // The session is checked out, so _activeTxnNumber cannot advance due to a user operation.
- // However, when a chunk is migrated, session and transaction information is copied from the
- // donor shard to the recipient. This occurs outside of the check-out mechanism and can lead
- // to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort
- // the current transaction. Note that it would indicate a user bug to have a newer
- // transaction on one shard while an older transaction is still active on another shard.
- _releaseStashedTransactionResources(lg);
- uasserted(ErrorCodes::TransactionAborted,
- str::stream() << "Transaction aborted. Active txnNumber is now "
- << _activeTxnNumber);
- return;
- }
+ bool snapshotPreallocated = false;
+ {
+ // We must lock the Client to change the Locker on the OperationContext and the Session
+ // mutex to access Session state. We must lock the Client before the Session mutex, since
+ // the Client effectively owns the Session. That is, a user might lock the Client to ensure
+ // it doesn't go away, and then lock the Session owned by that client.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ if (opCtx->getTxnNumber() < _activeTxnNumber) {
+ // The session is checked out, so _activeTxnNumber cannot advance due to a user
+ // operation.
+ // However, when a chunk is migrated, session and transaction information is copied from
+ // the donor shard to the recipient. This occurs outside of the check-out mechanism and
+ // can lead to a higher _activeTxnNumber during the lifetime of a checkout. If that
+ // occurs, we abort the current transaction. Note that it would indicate a user bug to
+ // have a newer transaction on one shard while an older transaction is still active on
+ // another shard.
+ _releaseStashedTransactionResources(lg);
+ uasserted(ErrorCodes::TransactionAborted,
+ str::stream() << "Transaction aborted. Active txnNumber is now "
+ << _activeTxnNumber);
+ return;
+ }
- if (_txnResourceStash) {
- invariant(_txnState != MultiDocumentTransactionState::kNone);
- _txnResourceStash->release(opCtx);
- _txnResourceStash = boost::none;
- } else {
- auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern ||
- _txnState == MultiDocumentTransactionState::kInProgress) {
- opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
- if (_txnState != MultiDocumentTransactionState::kInProgress) {
- invariant(_txnState == MultiDocumentTransactionState::kNone);
- _txnState = MultiDocumentTransactionState::kInSnapshotRead;
+ if (_txnResourceStash) {
+ invariant(_txnState != MultiDocumentTransactionState::kNone);
+ _txnResourceStash->release(opCtx);
+ _txnResourceStash = boost::none;
+ } else {
+ auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern ||
+ _txnState == MultiDocumentTransactionState::kInProgress) {
+ opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
+
+ // Storage engine transactions may be started lazily. Explicitly starting one here
+ // ensures that a point-in-time snapshot is established during the first operation
+ // of a transaction.
+ opCtx->recoveryUnit()->preallocateSnapshot();
+ snapshotPreallocated = true;
+
+ if (_txnState != MultiDocumentTransactionState::kInProgress) {
+ invariant(_txnState == MultiDocumentTransactionState::kNone);
+ _txnState = MultiDocumentTransactionState::kInSnapshotRead;
+ }
}
}
}
+
+ if (snapshotPreallocated) {
+ // The Client lock must not be held when executing this failpoint as it will block currentOp
+ // execution.
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterPreallocateSnapshot);
+ }
}
void Session::abortIfSnapshotRead(TxnNumber txnNumber) {
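
The comment on the final hunk is observable from the shell: because the Client lock is released
before the failpoint pause, an operation hanging at hangAfterPreallocateSnapshot still shows up
in $currentOp instead of blocking it. A minimal sketch, assuming a primary connection conn and a
populated test.coll:

    const admin = conn.getDB("admin");
    assert.commandWorked(admin.runCommand(
        {configureFailPoint: "hangAfterPreallocateSnapshot", mode: "alwaysOn"}));
    const awaitFind = startParallelShell(function() {
        const sessionDb = db.getMongo().startSession().getDatabase("test");
        sessionDb.runCommand(
            {find: "coll", readConcern: {level: "snapshot"}, txnNumber: NumberLong(0)});
    }, conn.port);
    // $currentOp completes while the find is paused inside unstashTransactionResources().
    assert.soon(() => admin.aggregate([{$currentOp: {}}, {$match: {"command.find": "coll"}}])
                          .itcount() === 1);
    assert.commandWorked(admin.runCommand(
        {configureFailPoint: "hangAfterPreallocateSnapshot", mode: "off"}));
    awaitFind();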