summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2018-02-20 16:51:50 -0500
committerTess Avitabile <tess.avitabile@mongodb.com>2018-02-22 17:08:22 -0500
commitadfa37c74e716826cde8b7bb1ebbe9e6c99d77d4 (patch)
tree9c207a1729a573b2558dce04eb4656c1af07f790
parentc0480b762f7b155ca0a06921acf99fc31859ea85 (diff)
downloadmongo-adfa37c74e716826cde8b7bb1ebbe9e6c99d77d4.tar.gz
SERVER-33372 Support readConcern snapshot for updates
-rw-r--r--jstests/noPassthrough/readConcern_snapshot.js20
-rw-r--r--jstests/noPassthrough/read_concern_snapshot_yielding.js302
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp19
-rw-r--r--src/mongo/db/query/plan_yield_policy.cpp6
-rw-r--r--src/mongo/db/service_entry_point_common.cpp12
-rw-r--r--src/mongo/db/session.cpp19
-rw-r--r--src/mongo/db/session.h1
-rw-r--r--src/mongo/db/session_test.cpp38
8 files changed, 226 insertions, 191 deletions
diff --git a/jstests/noPassthrough/readConcern_snapshot.js b/jstests/noPassthrough/readConcern_snapshot.js
index a13bcc1ff2c..526a5c17b37 100644
--- a/jstests/noPassthrough/readConcern_snapshot.js
+++ b/jstests/noPassthrough/readConcern_snapshot.js
@@ -183,15 +183,27 @@
}),
ErrorCodes.InvalidOptions);
- // readConcern 'snapshot' is not supported by update.
- // TODO SERVER-33354: Add snapshot support for update.
+ // readConcern 'snapshot' is supported by update.
+ assert.commandWorked(sessionDb.coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
+ printjson(assert.commandWorked(sessionDb.runCommand({
+ update: collName,
+ updates: [{q: {_id: 0}, u: {$inc: {a: 1}}}],
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(txnNumber++)
+ })));
+ assert.eq({_id: 0, a: 1}, sessionDb.coll.findOne({_id: 0}));
+
+ // readConcern 'snapshot' is not yet supported by multi-statement updates.
+ assert.commandWorked(sessionDb.coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
assert.commandFailedWithCode(sessionDb.runCommand({
update: collName,
- updates: [{q: {}, u: {$set: {a: 1}}}],
+ updates: [{q: {_id: 0}, u: {$inc: {a: 1}}}, {q: {_id: 1}, u: {$inc: {a: 1}}}],
readConcern: {level: "snapshot"},
txnNumber: NumberLong(txnNumber++)
}),
- ErrorCodes.InvalidOptions);
+ ErrorCodes.WriteConflict);
+ assert.eq({_id: 0, a: 1}, sessionDb.coll.findOne({_id: 0}));
+ assert.eq({_id: 1}, sessionDb.coll.findOne({_id: 1}));
// readConcern 'snapshot' is not supported by delete.
// TODO SERVER-33354: Add snapshot support for delete.
diff --git a/jstests/noPassthrough/read_concern_snapshot_yielding.js b/jstests/noPassthrough/read_concern_snapshot_yielding.js
index bbc3d8243fa..de3ed4c39bb 100644
--- a/jstests/noPassthrough/read_concern_snapshot_yielding.js
+++ b/jstests/noPassthrough/read_concern_snapshot_yielding.js
@@ -1,5 +1,6 @@
-// Test that the read concern level 'snapshot' exhibits the correct yielding behavior for find and
-// getMore. That is, snapshot reads check for interrupt but do not yield locks.
+// Test that the read concern level 'snapshot' exhibits the correct yielding behavior. That is,
+// operations performed at read concern level snapshot check for interrupt but do not yield locks or
+// storage engine resources.
(function() {
"use strict";
@@ -14,18 +15,21 @@
const db = rst.getPrimary().getDB(dbName);
const adminDB = db.getSiblingDB("admin");
const coll = db.coll;
-
- TestData.sessionId = assert.commandWorked(adminDB.runCommand({startSession: 1})).id;
- TestData.txnNumber = 0;
-
- assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryExecYieldIterations: 1}));
- db.setProfilingLevel(2);
+ TestData.numDocs = 4;
if (!db.serverStatus().storageEngine.supportsSnapshotReadConcern) {
rst.stopSet();
return;
}
+ TestData.sessionId = assert.commandWorked(adminDB.runCommand({startSession: 1})).id;
+ TestData.txnNumber = 0;
+
+ // Set 'internalQueryExecYieldIterations' to 2 to ensure that commands yield on the second try
+ // (i.e. after they have established a snapshot but before they have returned any documents).
+ assert.commandWorked(db.adminCommand({setParameter: 1, internalQueryExecYieldIterations: 2}));
+ db.setProfilingLevel(2);
+
function waitForOpId(curOpFilter) {
let opId;
assert.soon(
@@ -62,152 +66,182 @@
assert.eq(true, res[0].killPending, tojson(res));
}
- function awaitFindFn() {
- assert.commandWorked(db.runCommand({
+ function populateCollection() {
+ db.coll.drop();
+ for (let i = 0; i < TestData.numDocs; i++) {
+ assert.commandWorked(db.coll.insert({_id: i, x: 1}, {writeConcern: {w: "majority"}}));
+ }
+ }
+
+ function testCommand(awaitCommandFn, curOpFilter, profilerFilter, testWriteConflict) {
+ //
+ // Test that the command can be killed.
+ //
+
+ TestData.txnNumber++;
+ populateCollection();
+
+ // Start a command that hangs before checking for interrupt.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "alwaysOn"}));
+ let awaitCommand = startParallelShell(awaitCommandFn, rst.ports[0]);
+
+ // Kill the command, and check that it is set to killPending.
+ let opId = waitForOpId(curOpFilter);
+ assert.commandWorked(db.killOp(opId));
+ assertKillPending(opId);
+
+ // Remove the hang, and check that the command is killed.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "off"}));
+ let exitCode = awaitCommand({checkExitSuccess: false});
+ assert.neq(0, exitCode, "Expected shell to exit with failure due to operation kill");
+
+ //
+ // Test that the command does not yield locks.
+ //
+
+ TestData.txnNumber++;
+ populateCollection();
+
+ // Start a command that hangs before checking for interrupt.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "alwaysOn"}));
+ awaitCommand = startParallelShell(awaitCommandFn, rst.ports[0]);
+ waitForOpId(curOpFilter);
+
+ // Start a drop. This should block behind the command, since the command does not yield
+ // locks.
+ let awaitDrop = startParallelShell(function() {
+ db.getSiblingDB("test").coll.drop();
+ }, rst.ports[0]);
+
+ // Remove the hang. The command should complete successfully.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "off"}));
+ awaitCommand();
+
+ // Now the drop can complete.
+ awaitDrop();
+
+ // Confirm that the command did not yield.
+ if (profilerFilter) {
+ let profilerEntry = getLatestProfilerEntry(db, profilerFilter);
+ assert(profilerEntry.hasOwnProperty("numYield"), tojson(profilerEntry));
+ assert.eq(0, profilerEntry.numYield, tojson(profilerEntry));
+ }
+
+ //
+ // Test that the command does not read data that is inserted during its execution.
+ // 'awaitCommandFn' should fail if it reads a document {_id: <numDocs>, x: 1, new: 1}.
+ //
+
+ TestData.txnNumber++;
+ populateCollection();
+
+ // Start a command that hangs before checking for interrupt.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "alwaysOn"}));
+ awaitCommand = startParallelShell(awaitCommandFn, rst.ports[0]);
+ waitForOpId(curOpFilter);
+
+ // Insert data that should not be read by the command.
+ assert.commandWorked(
+ db.coll.insert({_id: TestData.numDocs, x: 1, new: 1}, {writeConcern: {w: "majority"}}));
+
+ // Remove the hang. The command should complete successfully.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "off"}));
+ awaitCommand();
+
+ //
+ // Test that the command fails if a write conflict occurs. 'awaitCommandFn' should write to
+ // {_id: <numDocs>, x: 1, new: 1}.
+ //
+
+ if (testWriteConflict) {
+ TestData.txnNumber++;
+ populateCollection();
+
+ // Insert the document that the command will write to.
+ assert.commandWorked(db.coll.insert({_id: TestData.numDocs, x: 1, new: 1},
+ {writeConcern: {w: "majority"}}));
+
+ // Start a command that hangs before checking for interrupt.
+ assert.commandWorked(db.adminCommand({
+ configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang",
+ mode: "alwaysOn"
+ }));
+ awaitCommand = startParallelShell(awaitCommandFn, rst.ports[0]);
+ waitForOpId(curOpFilter);
+
+ // Update the document that the command will write to.
+ assert.commandWorked(db.coll.update({_id: TestData.numDocs},
+ {$set: {conflict: true}},
+ {writeConcern: {w: "majority"}}));
+
+ // Remove the hang. The command should fail.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "off"}));
+ exitCode = awaitCommand({checkExitSuccess: false});
+ assert.neq(0, exitCode, "Expected shell to exit with failure due to WriteConflict");
+ }
+ }
+
+ // Test find.
+ testCommand(function() {
+ const res = assert.commandWorked(db.runCommand({
find: "coll",
filter: {x: 1},
readConcern: {level: "snapshot"},
lsid: TestData.sessionId,
txnNumber: NumberLong(TestData.txnNumber)
}));
- }
-
- function awaitGetMoreFn() {
+ assert.eq(res.cursor.firstBatch.length, TestData.numDocs, tojson(res));
+ }, {"command.filter": {x: 1}}, {op: "query"});
+
+ // Test getMore.
+ testCommand(function() {
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "off"}));
+ const initialFindBatchSize = 2;
const cursorId = assert
.commandWorked(db.runCommand({
find: "coll",
filter: {x: 1},
- batchSize: 2,
+ batchSize: initialFindBatchSize,
readConcern: {level: "snapshot"},
lsid: TestData.sessionId,
txnNumber: NumberLong(TestData.txnNumber)
}))
.cursor.id;
- assert.commandWorked(
- db.adminCommand({configureFailPoint: "setCheckForInterruptHang", mode: "alwaysOn"}));
- assert.commandWorked(db.runCommand({
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "setInterruptOnlyPlansCheckForInterruptHang", mode: "alwaysOn"}));
+ const res = assert.commandWorked(db.runCommand({
getMore: NumberLong(cursorId),
collection: "coll",
+ batchSize: TestData.numDocs,
lsid: TestData.sessionId,
txnNumber: NumberLong(TestData.txnNumber)
}));
- }
-
- for (let i = 0; i < 4; i++) {
- assert.commandWorked(db.coll.insert({_id: i, x: 1}, {writeConcern: {w: "majority"}}));
- }
-
- // Perform an initial find prior to setting the 'setCheckForInterruptHang' failpoint. As the
- // first read on a session, this will setup the session. Included in setup is a read on the
- // config.transactions collection. If the session is setup while the failpoint is active
- // we will block on the config.transaction read and not the user operation we are testing.
- assert.commandWorked(db.runCommand({
- find: "coll",
- filter: {x: 1},
- lsid: TestData.sessionId,
- txnNumber: NumberLong(TestData.txnNumber)
- }));
-
- //
- // Snapshot finds can be killed.
- //
- TestData.txnNumber++;
-
- // Start a find command that hangs before checking for interrupt.
- assert.commandWorked(
- db.adminCommand({configureFailPoint: "setCheckForInterruptHang", mode: "alwaysOn"}));
- let awaitFind = startParallelShell(awaitFindFn, rst.ports[0]);
-
- // Kill the command, and check that it is set to killPending.
- let opId = waitForOpId({"command.filter": {x: 1}});
- assert.commandWorked(db.killOp(opId));
- assertKillPending(opId);
-
- // Remove the hang, and check that the command is killed.
- assert.commandWorked(
- db.adminCommand({configureFailPoint: "setCheckForInterruptHang", mode: "off"}));
- let exitCode = awaitFind({checkExitSuccess: false});
- assert.neq(0, exitCode, "Expected shell to exit with failure due to operation kill");
-
- //
- // Snapshot getMores can be killed.
- //
- TestData.txnNumber++;
-
- // Start a getMore command that hangs before checking for interrupt.
- let awaitGetMore = startParallelShell(awaitGetMoreFn, rst.ports[0]);
-
- // Kill the command, and check that it is set to killPending.
- opId = waitForOpId({"originatingCommand.filter": {x: 1}});
- assert.commandWorked(db.killOp(opId));
- assertKillPending(opId);
-
- // Remove the hang, and check that the command is killed.
- assert.commandWorked(
- db.adminCommand({configureFailPoint: "setCheckForInterruptHang", mode: "off"}));
- exitCode = awaitGetMore({checkExitSuccess: false});
- assert.neq(0, exitCode, "Expected shell to exit with failure due to operation kill");
-
- //
- // Snapshot finds do not yield locks.
- //
- TestData.txnNumber++;
-
- // Start a find command that hangs before checking for interrupt.
- assert.commandWorked(
- db.adminCommand({configureFailPoint: "setCheckForInterruptHang", mode: "alwaysOn"}));
- awaitFind = startParallelShell(awaitFindFn, rst.ports[0]);
- waitForOpId({"command.filter": {x: 1}});
-
- // Start a drop. This should block behind the find, since the find does not yield locks.
- let awaitDrop = startParallelShell(function() {
- db.getSiblingDB("test").coll.drop();
- }, rst.ports[0]);
-
- // Remove the hang. The find should complete successfully.
- assert.commandWorked(
- db.adminCommand({configureFailPoint: "setCheckForInterruptHang", mode: "off"}));
- awaitFind();
-
- // Now the drop can complete.
- awaitDrop();
-
- // Confirm that the find did not yield.
- let profilerEntry = getLatestProfilerEntry(db, {op: "query"});
- assert(profilerEntry.hasOwnProperty("numYield"), tojson(profilerEntry));
- assert.eq(0, profilerEntry.numYield, tojson(profilerEntry));
-
- // Restore the collection.
- for (let i = 0; i < 4; i++) {
- assert.commandWorked(db.coll.insert({_id: i, x: 1}, {writeConcern: {w: "majority"}}));
- }
-
- //
- // Snapshot getMores do not yield locks.
- //
- TestData.txnNumber++;
-
- // Start a getMore command that hangs before checking for interrupt.
- awaitGetMore = startParallelShell(awaitGetMoreFn, rst.ports[0]);
- waitForOpId({"originatingCommand.filter": {x: 1}});
-
- // Start a drop. This should block behing the getMore, since the getMore does not yield locks.
- awaitDrop = startParallelShell(function() {
- db.getSiblingDB("test").coll.drop();
- }, rst.ports[0]);
-
- // Remove the hang. The getMore should complete successfully.
- assert.commandWorked(
- db.adminCommand({configureFailPoint: "setCheckForInterruptHang", mode: "off"}));
- awaitGetMore();
-
- // Now the drop can complete.
- awaitDrop();
-
- // Confirm that the getMore did not yield.
- profilerEntry = getLatestProfilerEntry(db, {op: "getmore"});
- assert(profilerEntry.hasOwnProperty("numYield"), tojson(profilerEntry));
- assert.eq(0, profilerEntry.numYield, tojson(profilerEntry));
+ assert.eq(
+ res.cursor.nextBatch.length, TestData.numDocs - initialFindBatchSize, tojson(res));
+ }, {"originatingCommand.filter": {x: 1}}, {op: "getmore"});
+
+ // Test update.
+ // We cannot profide a 'profilerFilter' because profiling is turned off for write commands in
+ // transactions.
+ testCommand(function() {
+ const res = assert.commandWorked(db.runCommand({
+ update: "coll",
+ updates: [{q: {new: 1}, u: {$set: {updated: true}}}],
+ readConcern: {level: "snapshot"},
+ lsid: TestData.sessionId,
+ txnNumber: NumberLong(TestData.txnNumber)
+ }));
+ assert.eq(res.n, 0, tojson(res));
+ assert.eq(res.nModified, 0, tojson(res));
+ }, {op: "update"}, null, true);
rst.stopSet();
}());
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 16f995f030b..ba28fca2ccb 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -126,7 +126,8 @@ void finishCurOp(OperationContext* opCtx, CurOp* curOp) {
log() << curOp->debug().report(opCtx->getClient(), *curOp, lockerInfo.stats);
}
- if (curOp->shouldDBProfile(shouldSample)) {
+ // Do not profile individual statements in a write command if we are in a transaction.
+ if (curOp->shouldDBProfile(shouldSample) && !opCtx->getWriteUnitOfWork()) {
profile(opCtx, CurOp::get(opCtx)->getNetworkOp());
}
} catch (const DBException& ex) {
@@ -213,6 +214,11 @@ bool handleError(OperationContext* opCtx,
throw; // These have always failed the whole batch.
}
+ if (opCtx->getWriteUnitOfWork()) {
+ // If we are in a transaction, we must fail the whole batch.
+ throw;
+ }
+
if (auto staleInfo = ex.extraInfo<StaleConfigInfo>()) {
if (!opCtx->getClient()->isInDirectClient()) {
// We already have the StaleConfig exception, so just swallow any errors due to refresh
@@ -554,7 +560,12 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
request.setArrayFilters(write_ops::arrayFiltersOf(op));
request.setMulti(op.getMulti());
request.setUpsert(op.getUpsert());
- request.setYieldPolicy(PlanExecutor::YIELD_AUTO); // ParsedUpdate overrides this for $isolated.
+
+ auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ request.setYieldPolicy(
+ readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
+ ? PlanExecutor::INTERRUPT_ONLY
+ : PlanExecutor::YIELD_AUTO); // ParsedUpdate overrides this for $isolated.
ParsedUpdate parsedUpdate(opCtx, &request);
uassertStatusOK(parsedUpdate.parseRequest());
@@ -623,7 +634,9 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
}
WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& wholeOp) {
- invariant(!opCtx->lockState()->inAWriteUnitOfWork()); // Does own retries.
+ // Update performs its own retries, so we should not be in a WriteUnitOfWork unless we are in a
+ // transaction.
+ invariant(opCtx->getWriteUnitOfWork() || !opCtx->lockState()->inAWriteUnitOfWork());
uassertStatusOK(userAllowedWriteNS(wholeOp.getNamespace()));
DisableDocumentValidationIfTrue docValidationDisabler(
diff --git a/src/mongo/db/query/plan_yield_policy.cpp b/src/mongo/db/query/plan_yield_policy.cpp
index 4c3a96282d4..9f1c322de8c 100644
--- a/src/mongo/db/query/plan_yield_policy.cpp
+++ b/src/mongo/db/query/plan_yield_policy.cpp
@@ -43,7 +43,7 @@
namespace mongo {
namespace {
-MONGO_FP_DECLARE(setCheckForInterruptHang);
+MONGO_FP_DECLARE(setInterruptOnlyPlansCheckForInterruptHang);
} // namespace
PlanYieldPolicy::PlanYieldPolicy(PlanExecutor* exec, PlanExecutor::YieldPolicy policy)
@@ -101,7 +101,7 @@ Status PlanYieldPolicy::yieldOrInterrupt(stdx::function<void()> beforeYieldingFn
ON_BLOCK_EXIT([this]() { resetTimer(); });
OperationContext* opCtx = _planYielding->getOpCtx();
invariant(opCtx);
- MONGO_FAIL_POINT_PAUSE_WHILE_SET(setCheckForInterruptHang);
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(setInterruptOnlyPlansCheckForInterruptHang);
return opCtx->checkForInterruptNoAssert();
}
@@ -131,8 +131,6 @@ Status PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn,
// that it's time to yield. Whether or not we will actually yield, we need to check
// if this operation has been interrupted.
if (_policy == PlanExecutor::YIELD_AUTO) {
- MONGO_FAIL_POINT_PAUSE_WHILE_SET(setCheckForInterruptHang);
-
auto interruptStatus = opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
return interruptStatus;
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index d5d55eda46a..09bf4b29962 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -119,7 +119,7 @@ const StringMap<int> sessionCheckoutWhitelist = {{"delete", 1},
// The command names for which readConcern level snapshot is allowed. The getMore command is
// implicitly allowed to operate on a cursor which was opened under readConcern level snapshot.
const StringMap<int> readConcernSnapshotWhitelist = {
- {"find", 1}, {"count", 1}, {"geoSearch", 1}, {"parallelCollectionScan", 1}};
+ {"find", 1}, {"count", 1}, {"geoSearch", 1}, {"parallelCollectionScan", 1}, {"update", 1}};
void generateLegacyQueryErrorResponse(const AssertionException* exception,
const QueryMessage& queryMessage,
@@ -685,7 +685,15 @@ void execCommandDatabase(OperationContext* opCtx,
runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime, behaviors);
if (retval) {
- sessionTxnState.stashTransactionResources();
+ if (opCtx->getWriteUnitOfWork()) {
+ if (!opCtx->hasStashedCursor()) {
+ // If we are in an autocommit=true transaction and have no stashed cursor,
+ // commit the transaction.
+ opCtx->getWriteUnitOfWork()->commit();
+ } else {
+ sessionTxnState.stashTransactionResources();
+ }
+ }
} else {
command->incrementCommandsFailed();
}
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 863ddb4227f..60ea0762b99 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -460,14 +460,15 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
return;
}
- if (!opCtx->hasStashedCursor()) {
- if (opCtx->getWriteUnitOfWork()) {
- opCtx->setWriteUnitOfWork(nullptr);
- }
- return;
- }
+ invariant(opCtx->hasStashedCursor());
if (*opCtx->getTxnNumber() != _activeTxnNumber) {
+ // The session is checked out, so _activeTxnNumber cannot advance due to a user operation.
+ // However, when a chunk is migrated, session and transaction information is copied from the
+ // donor shard to the recipient. This occurs outside of the check-out mechanism and can lead
+ // to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort
+ // the current transaction. Note that it would indicate a user bug to have a newer
+ // transaction on one shard while an older transaction is still active on another shard.
uasserted(ErrorCodes::TransactionAborted,
str::stream() << "Transaction aborted. Active txnNumber is now "
<< _activeTxnNumber);
@@ -487,6 +488,12 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
void Session::unstashTransactionResources(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
if (opCtx->getTxnNumber() < _activeTxnNumber) {
+ // The session is checked out, so _activeTxnNumber cannot advance due to a user operation.
+ // However, when a chunk is migrated, session and transaction information is copied from the
+ // donor shard to the recipient. This occurs outside of the check-out mechanism and can lead
+ // to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort
+ // the current transaction. Note that it would indicate a user bug to have a newer
+ // transaction on one shard while an older transaction is still active on another shard.
_releaseStashedTransactionResources(lg, opCtx);
uasserted(ErrorCodes::TransactionAborted,
str::stream() << "Transaction aborted. Active txnNumber is now "
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index de0d9505c68..cdff0db7b07 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -239,6 +239,7 @@ private:
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteTs);
+ // Releases stashed locks and WiredTiger transaction. This implicitly aborts the transaction.
void _releaseStashedTransactionResources(WithLock, OperationContext* opCtx);
const LogicalSessionId _sessionId;
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index ed536c4f70d..095ac134cc4 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -580,44 +580,6 @@ TEST_F(SessionTest, StashAndUnstashResources) {
ASSERT(opCtx()->getWriteUnitOfWork());
}
-TEST_F(SessionTest, StashNotRequired) {
- const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
- opCtx()->setLogicalSessionId(sessionId);
- opCtx()->setTxnNumber(txnNum);
-
- Locker* originalLocker = opCtx()->lockState();
- RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
- ASSERT(originalLocker);
- ASSERT(originalRecoveryUnit);
-
- Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
-
- session.beginOrContinueTxn(opCtx(), txnNum, boost::none);
-
- repl::ReadConcernArgs readConcernArgs;
- ASSERT_OK(readConcernArgs.initialize(BSON("find"
- << "test"
- << repl::ReadConcernArgs::kReadConcernFieldName
- << BSON(repl::ReadConcernArgs::kLevelFieldName
- << "snapshot"))));
- repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
-
- // Perform initial unstash which sets up a WriteUnitOfWork.
- session.unstashTransactionResources(opCtx());
- ASSERT(opCtx()->getWriteUnitOfWork());
- ASSERT_EQ(originalLocker, opCtx()->lockState());
- ASSERT_EQ(originalRecoveryUnit, opCtx()->recoveryUnit());
-
- // Attempt stash. As no stashing is required the original Locker and RecoveryUnit remain in
- // place.
- session.stashTransactionResources(opCtx());
- ASSERT_EQUALS(originalLocker, opCtx()->lockState());
- ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
- ASSERT(!opCtx()->getWriteUnitOfWork());
-}
-
TEST_F(SessionTest, CheckAutocommitOnlyAllowedAtBeginningOfTxn) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);