summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/auth/lib/commands_lib.js120
-rw-r--r--jstests/core/bypass_doc_validation.js14
-rw-r--r--jstests/core/collation.js32
-rw-r--r--jstests/core/do_txn_basic.js189
-rw-r--r--jstests/core/do_txn_oneshot.js80
-rw-r--r--jstests/core/do_txn_oplog_entry.js (renamed from jstests/core/do_txn_atomicity.js)34
-rw-r--r--jstests/core/json_schema/misc_validation.js11
-rw-r--r--jstests/core/views/views_all_commands.js13
-rw-r--r--src/mongo/db/op_observer_impl.cpp92
-rw-r--r--src/mongo/db/repl/do_txn.cpp74
-rw-r--r--src/mongo/db/repl/do_txn_test.cpp99
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp4
-rw-r--r--src/mongo/db/repl/oplog_entry.h2
-rw-r--r--src/mongo/db/repl/sync_tail.cpp6
-rw-r--r--src/mongo/db/service_entry_point_common.cpp6
-rw-r--r--src/mongo/db/session.cpp54
-rw-r--r--src/mongo/db/session.h43
-rw-r--r--src/mongo/db/session_test.cpp34
18 files changed, 673 insertions, 234 deletions
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index 588fd06a6fb..d185de225e6 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -57,25 +57,28 @@ to be authorized to run the command.
4) skipSharded
-Add "skipSharded: true" if you want to run the test only on a standalone.
+Add "skipSharded: true" if you want to run the test only in a non-sharded configuration.
-5) skipStandalone
+5) skipUnlessSharded
-Add "skipStandalone: true" if you want to run the test only in sharded
+Add "skipUnlessSharded: true" if you want to run the test only in sharded
configuration.
-6) setup
+6) skipUnlessReplicaSet
+Add "skipUnlessReplicaSet: true" if you want to run the test only when replica sets are in use.
+
+7) setup
The setup function, if present, is called before testing whether a
particular role authorizes a command for a particular database.
-7) teardown
+8) teardown
The teardown function, if present, is called immediately after
testint whether a particular role authorizes a command for a
particular database.
-8) privileges
+9) privileges
An array of privileges used when testing user-defined roles. The test case tests that a user with
the specified privileges is authorized to run the command, and that having only a subset of the
@@ -178,6 +181,8 @@ var roles_all = {
};
load("jstests/libs/uuid_util.js");
+// For isReplSet
+load("jstests/libs/fixture_helpers.js");
var authCommandsLib = {
@@ -198,7 +203,7 @@ var authCommandsLib = {
{
testname: "addShard",
command: {addShard: "x"},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -221,7 +226,7 @@ var authCommandsLib = {
u: {_id: {ns: "test.x", min: 1}, ns: "test.x"}
}]
},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [{
runOnDb: "config",
roles: roles_clusterManager,
@@ -2264,7 +2269,7 @@ var authCommandsLib = {
{
testname: "balancerStart",
command: {balancerStart: 1},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -2285,7 +2290,7 @@ var authCommandsLib = {
{
testname: "balancerStop",
command: {balancerStop: 1},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -2306,7 +2311,7 @@ var authCommandsLib = {
{
testname: "balancerStatus",
command: {balancerStatus: 1},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -2917,9 +2922,12 @@ var authCommandsLib = {
"ns": firstDbName + ".x",
"o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5}
}],
- preCondition: [{ns: firstDbName + ".x", q: {x: 5}, res: []}]
+ preCondition: [{ns: firstDbName + ".x", q: {x: 5}, res: []}],
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
db.getSisterDB(firstDbName).x.save({});
},
@@ -2943,9 +2951,12 @@ var authCommandsLib = {
"op": "i",
"ns": firstDbName + ".x",
"o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5}
- }]
+ }],
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
db.getSisterDB(firstDbName).x.save({});
},
@@ -2971,10 +2982,13 @@ var authCommandsLib = {
"ns": state.collName,
"ui": state.uuid,
"o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5}
- }]
+ }],
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
};
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
var sibling = db.getSisterDB(firstDbName);
sibling.runCommand({create: "x"});
@@ -3008,10 +3022,13 @@ var authCommandsLib = {
// Given a nonexistent UUID. The command should fail.
"ui": UUID("71f1d1d7-68ca-493e-a7e9-f03c94e2e960"),
"o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5}
- }]
+ }],
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
};
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
var sibling = db.getSisterDB(firstDbName);
sibling.runCommand({create: "x"});
@@ -3047,10 +3064,13 @@ var authCommandsLib = {
"ns": state.collName,
"ui": state.uuid,
"o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5}
- }]
+ }],
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
};
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
var sibling = db.getSisterDB(firstDbName);
sibling.runCommand({create: "x"});
@@ -3084,10 +3104,13 @@ var authCommandsLib = {
firstDbName + ".y", // Specify wrong name but correct uuid. Should work.
"ui": state.x_uuid, // The insert should on x
"o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5}
- }]
+ }],
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
};
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
db.getSisterDB(firstDbName).x.drop();
db.getSisterDB(firstDbName).y.drop();
@@ -3123,10 +3146,13 @@ var authCommandsLib = {
firstDbName + ".y", // Specify wrong name but correct uuid. Should work.
"ui": state.x_uuid, // The insert should on x
"o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5}
- }]
+ }],
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
};
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
db.getSisterDB(firstDbName).x.drop();
db.getSisterDB(firstDbName).y.drop();
@@ -3161,9 +3187,12 @@ var authCommandsLib = {
"ns": firstDbName + ".x",
"o2": {"_id": 1},
"o": {"_id": 1, "data": 8}
- }]
+ }],
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
db.getSisterDB(firstDbName).x.save({_id: 1, data: 1});
},
@@ -3189,9 +3218,11 @@ var authCommandsLib = {
"o2": {"_id": 1},
"o": {"_id": 1, "data": 8}
}],
- alwaysUpsert: false
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
db.getSisterDB(firstDbName).x.save({_id: 1, data: 1});
},
@@ -3219,10 +3250,12 @@ var authCommandsLib = {
"o2": {"_id": 1},
"o": {"_id": 1, "data": 8}
}],
- alwaysUpsert: false
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
};
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
var sibling = db.getSisterDB(firstDbName);
sibling.x.save({_id: 1, data: 1});
@@ -3257,10 +3290,12 @@ var authCommandsLib = {
"o2": {"_id": 1},
"o": {"_id": 1, "data": 8}
}],
- alwaysUpsert: false
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
};
},
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
var sibling = db.getSisterDB(firstDbName);
sibling.x.save({_id: 1, data: 1});
@@ -3285,8 +3320,13 @@ var authCommandsLib = {
},
{
testname: "doTxn_delete",
- command: {doTxn: [{"op": "d", "ns": firstDbName + ".x", "o": {"_id": 1}}]},
+ command: {
+ doTxn: [{"op": "d", "ns": firstDbName + ".x", "o": {"_id": 1}}],
+ txnNumber: NumberLong(0),
+ lsid: {id: UUID()}
+ },
skipSharded: true,
+ skipUnlessReplicaSet: true,
setup: function(db) {
db.getSisterDB(firstDbName).x.save({_id: 1, data: 1});
},
@@ -3403,7 +3443,7 @@ var authCommandsLib = {
{
testname: "enableSharding",
command: {enableSharding: "x"},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -3742,7 +3782,7 @@ var authCommandsLib = {
{
testname: "flushRouterConfig",
command: {flushRouterConfig: 1},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -4269,7 +4309,7 @@ var authCommandsLib = {
{
testname: "killOp", // sharded version
command: {killOp: 1, op: "shard1:123"},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -4437,7 +4477,7 @@ var authCommandsLib = {
{
testname: "listShards",
command: {listShards: 1},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -4540,7 +4580,7 @@ var authCommandsLib = {
{
testname: "s_mergeChunks",
command: {mergeChunks: "test.x", bounds: [{i: 0}, {i: 5}]},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -4570,7 +4610,7 @@ var authCommandsLib = {
{
testname: "s_moveChunk",
command: {moveChunk: "test.x"},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -4600,7 +4640,7 @@ var authCommandsLib = {
{
testname: "movePrimary",
command: {movePrimary: "x"},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -4615,7 +4655,7 @@ var authCommandsLib = {
{
testname: "netstat",
command: {netstat: "x"},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -4903,7 +4943,7 @@ var authCommandsLib = {
{
testname: "removeShard",
command: {removeShard: "x"},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -5239,7 +5279,7 @@ var authCommandsLib = {
{
testname: "shardCollection",
command: {shardCollection: "test.x"},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -5274,7 +5314,7 @@ var authCommandsLib = {
{
testname: "split",
command: {split: "test.x"},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -5531,7 +5571,7 @@ var authCommandsLib = {
{
testname: "addShardToZone",
command: {addShardToZone: shard0name, zone: 'z'},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -5551,7 +5591,7 @@ var authCommandsLib = {
{
testname: "removeShardFromZone",
command: {removeShardFromZone: shard0name, zone: 'z'},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -5574,7 +5614,7 @@ var authCommandsLib = {
{
testname: "updateZoneKeyRange",
command: {updateZoneKeyRange: 'test.foo', min: {x: 1}, max: {x: 5}, zone: 'z'},
- skipStandalone: true,
+ skipUnlessSharded: true,
testcases: [
{
runOnDb: adminDbName,
@@ -5665,7 +5705,11 @@ var authCommandsLib = {
return [];
}
// others shouldn't run in a standalone environment
- if (t.skipStandalone && !this.isMongos(conn)) {
+ if (t.skipUnlessSharded && !this.isMongos(conn)) {
+ return [];
+ }
+ // some tests require replica sets to be enabled.
+ if (t.skipUnlessReplicaSet && !FixtureHelpers.isReplSet(conn.getDB("admin"))) {
return [];
}
diff --git a/jstests/core/bypass_doc_validation.js b/jstests/core/bypass_doc_validation.js
index 107c4f2aba4..bed22676dc4 100644
--- a/jstests/core/bypass_doc_validation.js
+++ b/jstests/core/bypass_doc_validation.js
@@ -17,6 +17,8 @@
// For isMMAPv1.
load("jstests/concurrency/fsm_workload_helpers/server_types.js");
+ // For isReplSet
+ load("jstests/libs/fixture_helpers.js");
function assertFailsValidation(res) {
if (res instanceof WriteResult || res instanceof BulkWriteResult) {
@@ -53,12 +55,16 @@
assert.eq(1, coll.count({_id: 9}));
}
- // Test doTxn with a simple insert if not on mongos and not on MMAPv1.
- if (!isMongos && !isMMAPv1(db)) {
+ // Test doTxn with a simple insert if a replica set, not on mongos and not on MMAPv1.
+ if (FixtureHelpers.isReplSet(db) && !isMongos && !isMMAPv1(db)) {
+ const session = db.getMongo().startSession();
+ const sessionDb = session.getDatabase(myDb.getName());
const op = [{op: 'i', ns: coll.getFullName(), o: {_id: 10}}];
- assertFailsValidation(myDb.runCommand({doTxn: op, bypassDocumentValidation: false}));
+ assertFailsValidation(sessionDb.runCommand(
+ {doTxn: op, bypassDocumentValidation: false, txnNumber: NumberLong("0")}));
assert.eq(0, coll.count({_id: 10}));
- assert.commandWorked(myDb.runCommand({doTxn: op, bypassDocumentValidation: true}));
+ assert.commandWorked(sessionDb.runCommand(
+ {doTxn: op, bypassDocumentValidation: true, txnNumber: NumberLong("1")}));
assert.eq(1, coll.count({_id: 10}));
}
diff --git a/jstests/core/collation.js b/jstests/core/collation.js
index 3a089dde4ee..2b3996ccc96 100644
--- a/jstests/core/collation.js
+++ b/jstests/core/collation.js
@@ -11,6 +11,8 @@
load("jstests/libs/get_index_helpers.js");
// For isMMAPv1.
load("jstests/concurrency/fsm_workload_helpers/server_types.js");
+ // For isReplSet
+ load("jstests/libs/fixture_helpers.js");
var coll = db.collation;
coll.drop();
@@ -1973,39 +1975,47 @@
}
// doTxn
- if (!isMongos && !isMMAPv1(db)) {
+ if (FixtureHelpers.isReplSet(db) && !isMongos && !isMMAPv1(db)) {
+ const session = db.getMongo().startSession();
+ const sessionDb = session.getDatabase(db.getName());
coll.drop();
assert.commandWorked(
db.createCollection("collation", {collation: {locale: "en_US", strength: 2}}));
assert.writeOK(coll.insert({_id: "foo", x: 5, str: "bar"}));
// preCondition.q respects collection default collation.
- assert.commandFailed(db.runCommand({
+ assert.commandFailed(sessionDb.runCommand({
doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "foo"}, o: {$set: {x: 6}}}],
- preCondition: [{ns: coll.getFullName(), q: {_id: "not foo"}, res: {str: "bar"}}]
+ preCondition: [{ns: coll.getFullName(), q: {_id: "not foo"}, res: {str: "bar"}}],
+ txnNumber: NumberLong("0")
}));
assert.eq(5, coll.findOne({_id: "foo"}).x);
- assert.commandWorked(db.runCommand({
+ assert.commandWorked(sessionDb.runCommand({
doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "foo"}, o: {$set: {x: 6}}}],
- preCondition: [{ns: coll.getFullName(), q: {_id: "FOO"}, res: {str: "bar"}}]
+ preCondition: [{ns: coll.getFullName(), q: {_id: "FOO"}, res: {str: "bar"}}],
+ txnNumber: NumberLong("1")
}));
assert.eq(6, coll.findOne({_id: "foo"}).x);
// preCondition.res respects collection default collation.
- assert.commandFailed(db.runCommand({
+ assert.commandFailed(sessionDb.runCommand({
doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "foo"}, o: {$set: {x: 7}}}],
- preCondition: [{ns: coll.getFullName(), q: {_id: "foo"}, res: {str: "not bar"}}]
+ preCondition: [{ns: coll.getFullName(), q: {_id: "foo"}, res: {str: "not bar"}}],
+ txnNumber: NumberLong("2")
}));
assert.eq(6, coll.findOne({_id: "foo"}).x);
- assert.commandWorked(db.runCommand({
+ assert.commandWorked(sessionDb.runCommand({
doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "foo"}, o: {$set: {x: 7}}}],
- preCondition: [{ns: coll.getFullName(), q: {_id: "foo"}, res: {str: "BAR"}}]
+ preCondition: [{ns: coll.getFullName(), q: {_id: "foo"}, res: {str: "BAR"}}],
+ txnNumber: NumberLong("3")
}));
assert.eq(7, coll.findOne({_id: "foo"}).x);
// <operation>.o2 respects collection default collation.
- assert.commandWorked(db.runCommand(
- {doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "FOO"}, o: {$set: {x: 8}}}]}));
+ assert.commandWorked(sessionDb.runCommand({
+ doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "FOO"}, o: {$set: {x: 8}}}],
+ txnNumber: NumberLong("4")
+ }));
assert.eq(8, coll.findOne({_id: "foo"}).x);
}
diff --git a/jstests/core/do_txn_basic.js b/jstests/core/do_txn_basic.js
index 8d12ec377d0..199b4948c8e 100644
--- a/jstests/core/do_txn_basic.js
+++ b/jstests/core/do_txn_basic.js
@@ -6,6 +6,8 @@
load("jstests/libs/get_index_helpers.js");
// For isMMAPv1.
load("jstests/concurrency/fsm_workload_helpers/server_types.js");
+ // For isReplSet
+ load("jstests/libs/fixture_helpers.js");
const t = db.do_txn1;
@@ -18,6 +20,14 @@
jsTestLog("Skipping test as the storage engine does not support doTxn.");
return;
}
+ if (!FixtureHelpers.isReplSet(db)) {
+ jsTestLog("Skipping test as doTxn requires a replSet and replication is not enabled.");
+ return;
+ }
+
+ var session = db.getMongo().startSession();
+ db = session.getDatabase("test");
+ var txnNumber = 0;
t.drop();
@@ -26,56 +36,71 @@
//
// Empty array of operations.
- assert.commandFailedWithCode(db.adminCommand({doTxn: []}),
+ assert.commandFailedWithCode(db.adminCommand({doTxn: [], txnNumber: NumberLong(txnNumber++)}),
ErrorCodes.InvalidOptions,
'doTxn should fail on empty array of operations');
// Non-array type for operations.
- assert.commandFailedWithCode(db.adminCommand({doTxn: "not an array"}),
- ErrorCodes.TypeMismatch,
- 'doTxn should fail on non-array type for operations');
+ assert.commandFailedWithCode(
+ db.adminCommand({doTxn: "not an array", txnNumber: NumberLong(txnNumber++)}),
+ ErrorCodes.TypeMismatch,
+ 'doTxn should fail on non-array type for operations');
// Missing 'op' field in an operation.
- assert.commandFailedWithCode(db.adminCommand({doTxn: [{ns: t.getFullName(), o: {_id: 0}}]}),
- ErrorCodes.FailedToParse,
- 'doTxn should fail on operation without "op" field');
-
- // Non-string 'op' field in an operation.
assert.commandFailedWithCode(
- db.adminCommand({doTxn: [{op: 12345, ns: t.getFullName(), o: {_id: 0}}]}),
+ db.adminCommand(
+ {doTxn: [{ns: t.getFullName(), o: {_id: 0}}], txnNumber: NumberLong(txnNumber++)}),
ErrorCodes.FailedToParse,
- 'doTxn should fail on operation with non-string "op" field');
+ 'doTxn should fail on operation without "op" field');
+
+ // Non-string 'op' field in an operation.
+ assert.commandFailedWithCode(db.adminCommand({
+ doTxn: [{op: 12345, ns: t.getFullName(), o: {_id: 0}}],
+ txnNumber: NumberLong(txnNumber++)
+ }),
+ ErrorCodes.FailedToParse,
+ 'doTxn should fail on operation with non-string "op" field');
// Empty 'op' field value in an operation.
- assert.commandFailedWithCode(
- db.adminCommand({doTxn: [{op: '', ns: t.getFullName(), o: {_id: 0}}]}),
- ErrorCodes.FailedToParse,
- 'doTxn should fail on operation with empty "op" field value');
+ assert.commandFailedWithCode(db.adminCommand({
+ doTxn: [{op: '', ns: t.getFullName(), o: {_id: 0}}],
+ txnNumber: NumberLong(txnNumber++)
+ }),
+ ErrorCodes.FailedToParse,
+ 'doTxn should fail on operation with empty "op" field value');
// Missing 'ns' field in an operation.
- assert.commandFailedWithCode(db.adminCommand({doTxn: [{op: 'u', o: {_id: 0}}]}),
- ErrorCodes.FailedToParse,
- 'doTxn should fail on operation without "ns" field');
+ assert.commandFailedWithCode(
+ db.adminCommand({doTxn: [{op: 'u', o: {_id: 0}}], txnNumber: NumberLong(txnNumber++)}),
+ ErrorCodes.FailedToParse,
+ 'doTxn should fail on operation without "ns" field');
// Missing 'o' field in an operation.
- assert.commandFailedWithCode(db.adminCommand({doTxn: [{op: 'u', ns: t.getFullName()}]}),
- ErrorCodes.FailedToParse,
- 'doTxn should fail on operation without "o" field');
+ assert.commandFailedWithCode(
+ db.adminCommand(
+ {doTxn: [{op: 'u', ns: t.getFullName()}], txnNumber: NumberLong(txnNumber++)}),
+ ErrorCodes.FailedToParse,
+ 'doTxn should fail on operation without "o" field');
// Non-string 'ns' field in an operation.
- assert.commandFailedWithCode(db.adminCommand({doTxn: [{op: 'u', ns: 12345, o: {_id: 0}}]}),
- ErrorCodes.FailedToParse,
- 'doTxn should fail on operation with non-string "ns" field');
+ assert.commandFailedWithCode(
+ db.adminCommand(
+ {doTxn: [{op: 'u', ns: 12345, o: {_id: 0}}], txnNumber: NumberLong(txnNumber++)}),
+ ErrorCodes.FailedToParse,
+ 'doTxn should fail on operation with non-string "ns" field');
// Missing dbname in 'ns' field.
assert.commandFailedWithCode(
- db.adminCommand({doTxn: [{op: 'd', ns: t.getName(), o: {_id: 1}}]}),
+ db.adminCommand(
+ {doTxn: [{op: 'd', ns: t.getName(), o: {_id: 1}}], txnNumber: NumberLong(txnNumber++)}),
ErrorCodes.InvalidNamespace,
'doTxn should fail with a missing dbname in the "ns" field value');
// Empty 'ns' field value.
- assert.commandFailed(db.adminCommand({doTxn: [{op: 'u', ns: '', o: {_id: 0}}]}),
- 'doTxn should fail with empty "ns" field value');
+ assert.commandFailed(
+ db.adminCommand(
+ {doTxn: [{op: 'u', ns: '', o: {_id: 0}}], txnNumber: NumberLong(txnNumber++)}),
+ 'doTxn should fail with empty "ns" field value');
// Valid 'ns' field value in unknown operation type 'x'.
assert.commandFailedWithCode(
@@ -84,17 +109,69 @@
'doTxn should fail on unknown operation type "x" with valid "ns" value');
// Illegal operation type 'n' (no-op).
- assert.commandFailedWithCode(
- db.adminCommand({doTxn: [{op: 'n', ns: t.getFullName(), o: {_id: 0}}]}),
- ErrorCodes.InvalidOptions,
- 'doTxn should fail on "no op" operations.');
+ assert.commandFailedWithCode(db.adminCommand({
+ doTxn: [{op: 'n', ns: t.getFullName(), o: {_id: 0}}],
+ txnNumber: NumberLong(txnNumber++)
+ }),
+ ErrorCodes.InvalidOptions,
+ 'doTxn should fail on "no op" operations.');
// Illegal operation type 'c' (command).
+ assert.commandFailedWithCode(db.adminCommand({
+ doTxn: [{op: 'c', ns: t.getCollection('$cmd').getFullName(), o: {applyOps: []}}],
+ txnNumber: NumberLong(txnNumber++)
+ }),
+ ErrorCodes.InvalidOptions,
+ 'doTxn should fail on commands.');
+
+ // No transaction number in an otherwise valid operation.
assert.commandFailedWithCode(
- db.adminCommand(
- {doTxn: [{op: 'c', ns: t.getCollection('$cmd').getFullName(), o: {applyOps: []}}]}),
+ db.adminCommand({doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}]}),
ErrorCodes.InvalidOptions,
- 'doTxn should fail on commands.');
+ 'doTxn should fail when no transaction number is given.');
+
+ // Session IDs and transaction numbers on sub-ops are not allowed
+ var lsid = {id: UUID()};
+ res = assert.commandFailedWithCode(
+ db.runCommand({
+ doTxn: [{
+ op: "i",
+ ns: t.getFullName(),
+ o: {_id: 7, x: 24},
+ lsid: lsid,
+ txnNumber: NumberLong(1),
+ }],
+ txnNumber: NumberLong(txnNumber++)
+ }),
+ ErrorCodes.FailedToParse,
+ 'doTxn should fail when inner transaction contains session id.');
+
+ res = assert.commandFailedWithCode(
+ db.runCommand({
+ doTxn: [{
+ op: "u",
+ ns: t.getFullName(),
+ o2: {_id: 7},
+ o: {$set: {x: 25}},
+ txnNumber: NumberLong(1),
+ }],
+ txnNumber: NumberLong(txnNumber++)
+ }),
+ ErrorCodes.FailedToParse,
+ 'doTxn should fail when inner transaction contains transaction number.');
+
+ res = assert.commandFailedWithCode(
+ db.runCommand({
+ doTxn: [{
+ op: "d",
+ ns: t.getFullName(),
+ o: {_id: 7},
+ stmtId: 0,
+ }],
+ txnNumber: NumberLong(txnNumber++)
+ }),
+ ErrorCodes.FailedToParse,
+ 'doTxn should fail when inner transaction contains statement id.');
// Malformed operation with unexpected field 'x'.
assert.commandFailedWithCode(
@@ -115,7 +192,7 @@
const t2 = db.getSiblingDB('do_txn1_no_such_db').getCollection('t');
[t, t2].forEach(coll => {
const op = {op: optype, ns: coll.getFullName(), o: o, o2: o2};
- const cmd = {doTxn: [op]};
+ const cmd = {doTxn: [op], txnNumber: NumberLong(txnNumber++)};
jsTestLog('Testing doTxn on non-existent namespace: ' + tojson(cmd));
if (expectedErrorCode === ErrorCodes.OK) {
assert.commandWorked(db.adminCommand(cmd));
@@ -132,13 +209,17 @@
testCrudOperationOnNonExistentNamespace('u', {x: 0}, {_id: 0}, ErrorCodes.NamespaceNotFound);
assert.commandWorked(db.createCollection(t.getName()));
- var a = assert.commandWorked(
- db.adminCommand({doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}]}));
+ var a = assert.commandWorked(db.adminCommand({
+ doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}],
+ txnNumber: NumberLong(txnNumber++)
+ }));
assert.eq(1, t.find().count(), "Valid insert failed");
assert.eq(true, a.results[0], "Bad result value for valid insert");
- a = assert.commandWorked(
- db.adminCommand({doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}]}));
+ a = assert.commandWorked(db.adminCommand({
+ doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}],
+ txnNumber: NumberLong(txnNumber++)
+ }));
assert.eq(1, t.find().count(), "Duplicate insert failed");
assert.eq(true, a.results[0], "Bad result value for duplicate insert");
@@ -146,14 +227,17 @@
assert.eq(o, t.findOne(), "Mismatching document inserted.");
// 'o' field is an empty array.
- assert.commandFailed(db.adminCommand({doTxn: [{op: 'i', ns: t.getFullName(), o: []}]}),
- 'doTxn should fail on insert of object with empty array element');
+ assert.commandFailed(
+ db.adminCommand(
+ {doTxn: [{op: 'i', ns: t.getFullName(), o: []}], txnNumber: NumberLong(txnNumber++)}),
+ 'doTxn should fail on insert of object with empty array element');
var res = assert.commandWorked(db.runCommand({
doTxn: [
{op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 18}}},
{op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 19}}}
- ]
+ ],
+ txnNumber: NumberLong(txnNumber++)
}));
o.x++;
@@ -170,7 +254,8 @@
{op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 20}}},
{op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 21}}}
],
- preCondition: [{ns: t.getFullName(), q: {_id: 5}, res: {x: 19}}]
+ preCondition: [{ns: t.getFullName(), q: {_id: 5}, res: {x: 19}}],
+ txnNumber: NumberLong(txnNumber++)
}));
o.x++;
@@ -187,7 +272,8 @@
{op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 22}}},
{op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 23}}}
],
- preCondition: [{ns: "foo.otherName", q: {_id: 5}, res: {x: 21}}]
+ preCondition: [{ns: "foo.otherName", q: {_id: 5}, res: {x: 21}}],
+ txnNumber: NumberLong(txnNumber++)
}));
assert.eq(o, t.findOne(), "preCondition didn't match, but ops were still applied");
@@ -198,7 +284,8 @@
{op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 22}}},
{op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 23}}}
],
- preCondition: [{ns: t.getFullName(), q: {_id: 5}, res: {x: 19}}]
+ preCondition: [{ns: t.getFullName(), q: {_id: 5}, res: {x: 19}}],
+ txnNumber: NumberLong(txnNumber++)
}));
assert.eq(o, t.findOne(), "preCondition didn't match, but ops were still applied");
@@ -207,7 +294,8 @@
doTxn: [
{op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 22}}},
{op: "u", ns: t.getFullName(), o2: {_id: 6}, o: {$set: {x: 23}}}
- ]
+ ],
+ txnNumber: NumberLong(txnNumber++)
}));
assert.eq(false, res.results[0], "Op required upsert, which should be disallowed.");
@@ -219,7 +307,8 @@
doTxn: [
{"op": "i", "ns": t.getFullName(), "o": {_id: 6}},
{"op": "u", "ns": t.getFullName(), "o2": {_id: 6}, "o": {$set: {z: 1, a: 2}}}
- ]
+ ],
+ txnNumber: NumberLong(txnNumber++)
}));
assert.eq(t.findOne({_id: 6}), {_id: 6, a: 2, z: 1}); // Note: 'a' and 'z' have been sorted.
@@ -233,7 +322,8 @@
"o2": {_id: 7},
"o": {$v: NumberLong(0), $set: {z: 1, a: 2}}
}
- ]
+ ],
+ txnNumber: NumberLong(txnNumber++),
}));
assert.eq(res.code, 40682);
@@ -248,7 +338,8 @@
"o2": {_id: 8},
"o": {$v: NumberLong(1), $set: {z: 1, a: 2}}
}
- ]
+ ],
+ txnNumber: NumberLong(txnNumber++),
}));
assert.eq(t.findOne({_id: 8}), {_id: 8, a: 2, z: 1}); // Note: 'a' and 'z' have been sorted.
})();
diff --git a/jstests/core/do_txn_oneshot.js b/jstests/core/do_txn_oneshot.js
new file mode 100644
index 00000000000..89bcb65b815
--- /dev/null
+++ b/jstests/core/do_txn_oneshot.js
@@ -0,0 +1,80 @@
+// @tags: [requires_non_retryable_commands]
+
+// Tests that doTxn produces correct oplog entries.
+(function() {
+ 'use strict';
+ // For isMMAPv1.
+ load("jstests/concurrency/fsm_workload_helpers/server_types.js");
+ // For isReplSet
+ load("jstests/libs/fixture_helpers.js");
+ load('jstests/libs/uuid_util.js');
+
+ if (isMMAPv1(db)) {
+ jsTestLog("Skipping test as the storage engine does not support doTxn.");
+ return;
+ }
+ if (!FixtureHelpers.isReplSet(db)) {
+ jsTestLog("Skipping test as doTxn requires a replSet and replication is not enabled.");
+ return;
+ }
+
+ var oplog = db.getSiblingDB('local').oplog.rs;
+ var session = db.getMongo().startSession();
+ var sessionDb = session.getDatabase("test");
+ var t = db.doTxn;
+ t.drop();
+ db.createCollection(t.getName());
+ const tUuid = getUUIDFromListCollections(db, t.getName());
+
+ //
+ // Test insert ops. Insert ops are unmodified except the UUID field is added.
+ //
+ const insertOps = [
+ {op: 'i', ns: t.getFullName(), o: {_id: 100, x: 1, y: 1}},
+ {op: 'i', ns: t.getFullName(), o: {_id: 101, x: 2, y: 1}},
+ ];
+ assert.commandWorked(sessionDb.runCommand({doTxn: insertOps, txnNumber: NumberLong("1")}));
+ let topOfOplog = oplog.find().sort({$natural: -1}).limit(1).next();
+ assert.eq(topOfOplog.txnNumber, NumberLong("1"));
+ assert.docEq(topOfOplog.o.applyOps, insertOps.map(x => Object.assign(x, {ui: tUuid})));
+
+ //
+ // Test update ops. For updates, the "$v" UpdateSemantics field is added and non-idempotent
+ // operations are made idempotent.
+ //
+ const updateOps = [
+ {op: 'u', ns: t.getFullName(), o: {$inc: {x: 10}}, o2: {_id: 100}},
+ {op: 'u', ns: t.getFullName(), o: {$inc: {x: 10}}, o2: {_id: 101}}
+ ];
+ const expectedUpdateOps = [
+ {op: 'u', ns: t.getFullName(), o: {$v: 1, $set: {x: 11}}, o2: {_id: 100}, ui: tUuid},
+ {op: 'u', ns: t.getFullName(), o: {$v: 1, $set: {x: 12}}, o2: {_id: 101}, ui: tUuid}
+ ];
+ assert.commandWorked(sessionDb.runCommand({doTxn: updateOps, txnNumber: NumberLong("2")}));
+ topOfOplog = oplog.find().sort({$natural: -1}).limit(1).next();
+ assert.eq(topOfOplog.txnNumber, NumberLong("2"));
+ assert.docEq(topOfOplog.o.applyOps, expectedUpdateOps);
+
+ //
+ // Test delete ops. Delete ops are unmodified except the UUID field is added.
+ //
+ const deleteOps = [
+ {op: 'd', ns: t.getFullName(), o: {_id: 100}},
+ {op: 'd', ns: t.getFullName(), o: {_id: 101}}
+ ];
+ assert.commandWorked(sessionDb.runCommand({doTxn: deleteOps, txnNumber: NumberLong("3")}));
+ topOfOplog = oplog.find().sort({$natural: -1}).limit(1).next();
+ assert.eq(topOfOplog.txnNumber, NumberLong("3"));
+ assert.docEq(topOfOplog.o.applyOps, deleteOps.map(x => Object.assign(x, {ui: tUuid})));
+
+ //
+ // Make sure the transaction table is not affected by one-shot transactions.
+ //
+ assert.eq(0,
+ db.getSiblingDB('config')
+ .transactions.find({"_id.id": session.getSessionId().id})
+ .toArray(),
+ "No transactions should be written to the transaction table.");
+
+ session.endSession();
+})();
diff --git a/jstests/core/do_txn_atomicity.js b/jstests/core/do_txn_oplog_entry.js
index a1c54f5dd7e..23be1c4d819 100644
--- a/jstests/core/do_txn_atomicity.js
+++ b/jstests/core/do_txn_oplog_entry.js
@@ -6,47 +6,60 @@
// For isMMAPv1.
load("jstests/concurrency/fsm_workload_helpers/server_types.js");
+ // For isReplSet
+ load("jstests/libs/fixture_helpers.js");
if (isMMAPv1(db)) {
jsTestLog("Skipping test as the storage engine does not support doTxn.");
return;
}
+ if (!FixtureHelpers.isReplSet(db)) {
+ jsTestLog("Skipping test as doTxn requires a replSet and replication is not enabled.");
+ return;
+ }
+
+ var session = db.getMongo().startSession();
+ var sessionDb = session.getDatabase("test");
+ var txnNumber = 0;
var t = db.doTxn;
t.drop();
assert.writeOK(t.insert({_id: 1}));
// Operations including commands are not allowed and should be rejected completely.
- assert.commandFailedWithCode(db.adminCommand({
+ assert.commandFailedWithCode(sessionDb.adminCommand({
doTxn: [
{op: 'i', ns: t.getFullName(), o: {_id: ObjectId(), x: 1}},
{op: 'c', ns: "invalid", o: {create: "t"}},
- ]
+ ],
+ txnNumber: NumberLong(txnNumber++)
}),
ErrorCodes.InvalidOptions);
assert.eq(t.count({x: 1}), 0);
// Operations only including CRUD commands should be atomic, so the next insert will fail.
var tooLong = Array(2000).join("hello");
- assert.commandFailedWithCode(db.adminCommand({
+ assert.commandFailedWithCode(sessionDb.adminCommand({
doTxn: [
{op: 'i', ns: t.getFullName(), o: {_id: ObjectId(), x: 1}},
{op: 'i', ns: t.getFullName(), o: {_id: tooLong, x: 1}},
- ]
+ ],
+ txnNumber: NumberLong(txnNumber++)
}),
ErrorCodes.KeyTooLong);
assert.eq(t.count({x: 1}), 0);
// Operations on non-existent databases cannot be atomic.
var newDBName = "do_txn_atomicity";
- var newDB = db.getSiblingDB(newDBName);
+ var newDB = sessionDb.getSiblingDB(newDBName);
assert.commandWorked(newDB.dropDatabase());
// Updates on a non-existent database no longer implicitly create collections and will fail with
// a NamespaceNotFound error.
- assert.commandFailedWithCode(
- newDB.runCommand(
- {doTxn: [{op: "u", ns: newDBName + ".foo", o: {_id: 5, x: 17}, o2: {_id: 5, x: 16}}]}),
- ErrorCodes.NamespaceNotFound);
+ assert.commandFailedWithCode(newDB.runCommand({
+ doTxn: [{op: "u", ns: newDBName + ".foo", o: {_id: 5, x: 17}, o2: {_id: 5, x: 16}}],
+ txnNumber: NumberLong(txnNumber++)
+ }),
+ ErrorCodes.NamespaceNotFound);
var sawTooManyLocksError = false;
@@ -66,7 +79,8 @@
multiOps.push({op: 'i', ns: newDBName + "." + multiName, o: {_id: 0, x: [0, 1]}});
}
- let res = [cappedOps, multiOps].map((doTxn) => newDB.runCommand({doTxn}));
+ let res = [cappedOps, multiOps].map(
+ (doTxn) => newDB.runCommand({doTxn: doTxn, txnNumber: NumberLong(txnNumber++)}));
sawTooManyLocksError |= res.some((res) => res.code === ErrorCodes.TooManyLocks);
// Transactions involving just two collections should succeed.
if (n <= 2)
diff --git a/jstests/core/json_schema/misc_validation.js b/jstests/core/json_schema/misc_validation.js
index 23ce4ef72c1..29504be9d69 100644
--- a/jstests/core/json_schema/misc_validation.js
+++ b/jstests/core/json_schema/misc_validation.js
@@ -25,6 +25,8 @@
// For isMMAPv1.
load("jstests/concurrency/fsm_workload_helpers/server_types.js");
+ // For isReplSet
+ load("jstests/libs/fixture_helpers.js");
const testName = "json_schema_misc_validation";
const testDB = db.getSiblingDB(testName);
@@ -319,9 +321,11 @@
coll.drop();
assert.writeOK(coll.insert({_id: 1, a: true}));
- if (!isMMAPv1(db)) {
+ if (FixtureHelpers.isReplSet(db) && !isMongos && !isMMAPv1(db)) {
// Test $jsonSchema in the precondition checking for doTxn.
- res = testDB.adminCommand({
+ const session = db.getMongo().startSession();
+ const sessionDb = session.getDatabase(testDB.getName());
+ res = sessionDb.adminCommand({
doTxn: [
{op: "u", ns: coll.getFullName(), o2: {_id: 1}, o: {$set: {a: false}}},
],
@@ -329,7 +333,8 @@
ns: coll.getFullName(),
q: {$jsonSchema: {properties: {a: {type: "boolean"}}}},
res: {a: true}
- }]
+ }],
+ txnNumber: NumberLong("0")
});
assert.commandWorked(res);
assert.eq(1, res.applied);
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index 696e72fff80..e5f11d2c1b5 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -190,10 +190,17 @@
delete: {command: {delete: "view", deletes: [{q: {x: 1}, limit: 1}]}, expectFailure: true},
distinct: {command: {distinct: "view", key: "_id"}},
doTxn: {
- command: {doTxn: [{op: "i", o: {_id: 1}, ns: "test.view"}]},
+ command: {
+ doTxn: [{op: "i", o: {_id: 1}, ns: "test.view"}],
+ txnNumber: NumberLong("0"),
+ lsid: {id: UUID()}
+ },
expectFailure: true,
- expectedErrorCode:
- [ErrorCodes.CommandNotSupportedOnView, ErrorCodes.CommandNotSupported],
+ expectedErrorCode: [
+ ErrorCodes.CommandNotSupportedOnView,
+ ErrorCodes.CommandNotSupported,
+ ErrorCodes.IllegalOperation
+ ],
skipSharded: true,
},
driverOIDTest: {skip: isUnrelated},
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 4a1977ebf7d..eea416f7db9 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -55,6 +55,7 @@
#include "mongo/util/fail_point_service.h"
namespace mongo {
+using repl::OplogEntry;
namespace {
MONGO_FP_DECLARE(failCollectionUpdates);
@@ -254,6 +255,31 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
return opTimes;
}
+/**
+ * Write oplog entry for applyOps/atomic transaction operations.
+ */
+OpTimeBundle replLogApplyOps(OperationContext* opCtx,
+ const NamespaceString& cmdNss,
+ const BSONObj& applyOpCmd,
+ const OperationSessionInfo& sessionInfo,
+ StmtId stmtId,
+ const repl::OplogLink& oplogLink) {
+ OpTimeBundle times;
+ times.wallClockTime = getWallClockTimeForOpLog(opCtx);
+ times.writeOpTime = repl::logOp(opCtx,
+ "c",
+ cmdNss,
+ {},
+ applyOpCmd,
+ nullptr,
+ false,
+ times.wallClockTime,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+ return times;
+}
+
} // namespace
void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
@@ -313,6 +339,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator end,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
+ if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
+ for (auto iter = begin; iter != end; iter++) {
+ auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc);
+ session->addTransactionOperation(opCtx, operation);
+ }
+ return;
+ }
const auto lastWriteDate = getWallClockTimeForOpLog(opCtx);
@@ -377,6 +410,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
}
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
+ if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
+ auto operation =
+ OplogEntry::makeUpdateOperation(args.nss, args.uuid, args.update, args.criteria);
+ session->addTransactionOperation(opCtx, operation);
+ return;
+ }
const auto opTime = replLogUpdate(opCtx, session, args);
AuthorizationManager::get(opCtx->getServiceContext())
@@ -427,10 +466,15 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
StmtId stmtId,
bool fromMigrate,
const boost::optional<BSONObj>& deletedDoc) {
+ Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
auto& deleteState = getDeleteState(opCtx);
invariant(!deleteState.documentKey.isEmpty());
-
- Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
+ if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) {
+ auto operation = OplogEntry::makeDeleteOperation(
+ nss, uuid, deletedDoc ? deletedDoc.get() : deleteState.documentKey);
+ session->addTransactionOperation(opCtx, operation);
+ return;
+ }
const auto opTime = replLogDelete(opCtx, nss, uuid, session, stmtId, fromMigrate, deletedDoc);
AuthorizationManager::get(opCtx->getServiceContext())
@@ -739,17 +783,7 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& applyOpCmd) {
const NamespaceString cmdNss{dbName, "$cmd"};
- repl::logOp(opCtx,
- "c",
- cmdNss,
- {},
- applyOpCmd,
- nullptr,
- false,
- getWallClockTimeForOpLog(opCtx),
- {},
- kUninitializedStmtId,
- {});
+ replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {});
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr);
@@ -782,6 +816,38 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx,
void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) {
invariant(opCtx->getTxnNumber());
+ Session* const session = OperationContextSession::get(opCtx);
+ invariant(session);
+ invariant(session->inMultiDocumentTransaction());
+ auto stmts = session->endTransactionAndRetrieveOperations();
+
+ // It is possible that the transaction resulted in no changes. In that case, we should
+ // not write an empty applyOps entry.
+ if (stmts.empty())
+ return;
+
+ BSONObjBuilder applyOpsBuilder;
+ BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
+ for (auto& stmt : stmts) {
+ opsArray.append(stmt.toBSON());
+ }
+ opsArray.done();
+ const auto dbName = stmts[0].getNamespace().db().toString();
+ const NamespaceString cmdNss{dbName, "$cmd"};
+
+ OperationSessionInfo sessionInfo;
+ repl::OplogLink oplogLink;
+ sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
+ sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
+ StmtId stmtId(0);
+ oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
+ // Until we support multiple oplog entries per transaction, prevOpTime should always be null.
+ invariant(oplogLink.prevOpTime.isNull());
+
+ auto applyOpCmd = applyOpsBuilder.done();
+ auto times = replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink);
+
+ onWriteOpCompleted(opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime);
}
void OpObserverImpl::onTransactionAbort(OperationContext* opCtx) {
diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp
index 6506700a85f..777f8ccabcc 100644
--- a/src/mongo/db/repl/do_txn.cpp
+++ b/src/mongo/db/repl/do_txn.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/query/collation/collation_spec.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -102,8 +103,7 @@ Status _doTxn(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& doTxnCmd,
BSONObjBuilder* result,
- int* numApplied,
- BSONArrayBuilder* opsBuilder) {
+ int* numApplied) {
BSONObj ops = doTxnCmd.firstElement().Obj();
// apply
*numApplied = 0;
@@ -174,22 +174,6 @@ Status _doTxn(OperationContext* opCtx,
if (!status.isOK())
return status;
- // Append completed op, including UUID if available, to 'opsBuilder'.
- if (opsBuilder) {
- if (opObj.hasField("ui") || nss.isSystemDotIndexes() ||
- !(collection && collection->uuid())) {
- // No changes needed to operation document.
- opsBuilder->append(opObj);
- } else {
- // Operation document has no "ui" field and collection has a UUID.
- auto uuid = collection->uuid();
- BSONObjBuilder opBuilder;
- opBuilder.appendElements(opObj);
- uuid->appendToBuilder(&opBuilder, "ui");
- opsBuilder->append(opBuilder.obj());
- }
- }
-
ab.append(status.isOK());
if (!status.isOK()) {
log() << "doTxn error applying: " << status;
@@ -293,10 +277,16 @@ Status doTxn(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& doTxnCmd,
BSONObjBuilder* result) {
+ auto txnNumber = opCtx->getTxnNumber();
+ uassert(ErrorCodes::InvalidOptions, "doTxn can only be run with a transaction ID.", txnNumber);
+ auto* session = OperationContextSession::get(opCtx);
+ uassert(ErrorCodes::InvalidOptions, "doTxn must be run within a session", session);
+ invariant(!session->getAutocommit());
uassert(
ErrorCodes::InvalidOptions, "doTxn supports only CRUD opts.", _areOpsCrudOnly(doTxnCmd));
auto hasPrecondition = _hasPrecondition(doTxnCmd);
+
// Acquire global lock in IX mode so that the replication state check will remain valid.
Lock::GlobalLock globalLock(opCtx, MODE_IX, Date_t::max());
@@ -313,11 +303,6 @@ Status doTxn(OperationContext* opCtx,
try {
writeConflictRetry(opCtx, "doTxn", dbName, [&] {
BSONObjBuilder intermediateResult;
- std::unique_ptr<BSONArrayBuilder> opsBuilder;
- if (opCtx->writesAreReplicated() &&
- repl::ReplicationCoordinator::modeMasterSlave != replCoord->getReplicationMode()) {
- opsBuilder = stdx::make_unique<BSONArrayBuilder>();
- }
// The write unit of work guarantees snapshot isolation for precondition check and the
// write.
WriteUnitOfWork wunit(opCtx);
@@ -329,45 +314,10 @@ Status doTxn(OperationContext* opCtx,
}
numApplied = 0;
- {
- // Suppress replication for atomic operations until end of doTxn.
- repl::UnreplicatedWritesBlock uwb(opCtx);
- uassertStatusOK(_doTxn(
- opCtx, dbName, doTxnCmd, &intermediateResult, &numApplied, opsBuilder.get()));
- }
- // Generate oplog entry for all atomic ops collectively.
- if (opCtx->writesAreReplicated()) {
- // We want this applied atomically on slaves so we rewrite the oplog entry without
- // the pre-condition for speed.
-
- BSONObjBuilder cmdBuilder;
-
- auto opsFieldName = doTxnCmd.firstElement().fieldNameStringData();
- for (auto elem : doTxnCmd) {
- auto name = elem.fieldNameStringData();
- if (name == opsFieldName) {
- // This should be written as applyOps, not doTxn.
- invariant(opsFieldName == "doTxn"_sd);
- if (opsBuilder) {
- cmdBuilder.append("applyOps"_sd, opsBuilder->arr());
- } else {
- cmdBuilder.appendAs(elem, "applyOps"_sd);
- }
- continue;
- }
- if (name == DoTxn::kPreconditionFieldName)
- continue;
- if (name == bypassDocumentValidationCommandOption())
- continue;
- cmdBuilder.append(elem);
- }
-
- const BSONObj cmdRewritten = cmdBuilder.done();
-
- auto opObserver = getGlobalServiceContext()->getOpObserver();
- invariant(opObserver);
- opObserver->onApplyOps(opCtx, dbName, cmdRewritten);
- }
+ uassertStatusOK(_doTxn(opCtx, dbName, doTxnCmd, &intermediateResult, &numApplied));
+ auto opObserver = getGlobalServiceContext()->getOpObserver();
+ invariant(opObserver);
+ opObserver->onTransactionCommit(opCtx);
wunit.commit();
result->appendElements(intermediateResult.obj());
});
diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp
index b70c21d1635..812e45a1ca4 100644
--- a/src/mongo/db/repl/do_txn_test.cpp
+++ b/src/mongo/db/repl/do_txn_test.cpp
@@ -30,12 +30,16 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/client.h"
+#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_noop.h"
+#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/do_txn.h"
+#include "mongo/db/repl/oplog_interface_local.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/logger/logger.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/memory.h"
@@ -45,27 +49,24 @@ namespace repl {
namespace {
/**
- * Mock OpObserver that tracks doTxn events. doTxn internally applies its arguments using applyOps.
+ * Mock OpObserver that tracks doTxn commit events.
*/
class OpObserverMock : public OpObserverNoop {
public:
/**
- * Called by doTxn() when ops are applied atomically.
+ * Called by doTxn() when ops are ready to commit.
*/
- void onApplyOps(OperationContext* opCtx,
- const std::string& dbName,
- const BSONObj& doTxnCmd) override;
+ void onTransactionCommit(OperationContext* opCtx) override;
- // If not empty, holds the command object passed to last invocation of onApplyOps().
+    // If not empty, holds the command object written out by the OpObserverImpl onTransactionCommit.
BSONObj onApplyOpsCmdObj;
};
-void OpObserverMock::onApplyOps(OperationContext* opCtx,
- const std::string& dbName,
- const BSONObj& doTxnCmd) {
- ASSERT_FALSE(doTxnCmd.isEmpty());
- // Get owned copy because 'doTxnCmd' may be a temporary BSONObj created by doTxn().
- onApplyOpsCmdObj = doTxnCmd.getOwned();
+void OpObserverMock::onTransactionCommit(OperationContext* opCtx) {
+ OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns());
+ auto oplogIter = oplogInterface.makeIterator();
+ auto opEntry = unittest::assertGet(oplogIter->next());
+ onApplyOpsCmdObj = opEntry.first.getObjectField("o").getOwned();
}
/**
@@ -77,8 +78,14 @@ private:
void tearDown() override;
protected:
+ OperationContext* opCtx() {
+ return _opCtx.get();
+ }
+
OpObserverMock* _opObserver = nullptr;
std::unique_ptr<StorageInterface> _storage;
+ ServiceContext::UniqueOperationContext _opCtx;
+ boost::optional<OperationContextSession> _ocs;
};
void DoTxnTest::setUp() {
@@ -86,28 +93,45 @@ void DoTxnTest::setUp() {
ServiceContextMongoDTest::setUp();
auto service = getServiceContext();
- auto opCtx = cc().makeOperationContext();
+ _opCtx = cc().makeOperationContext();
// Set up ReplicationCoordinator and create oplog.
ReplicationCoordinator::set(service, stdx::make_unique<ReplicationCoordinatorMock>(service));
setOplogCollectionName(service);
- createOplog(opCtx.get());
+ createOplog(_opCtx.get());
// Ensure that we are primary.
- auto replCoord = ReplicationCoordinator::get(opCtx.get());
+ auto replCoord = ReplicationCoordinator::get(_opCtx.get());
ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY));
- // Use OpObserverMock to track notifications for doTxn().
+ // Set up session catalog
+ SessionCatalog::reset_forTest(service);
+ SessionCatalog::create(service);
+ SessionCatalog::get(service)->onStepUp(_opCtx.get());
+
+ // Need the OpObserverImpl in the registry in order for doTxn to work.
+ OpObserverRegistry* opObserverRegistry =
+ dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
+ opObserverRegistry->addObserver(stdx::make_unique<OpObserverImpl>());
+
+ // Use OpObserverMock to track applyOps calls generated by doTxn().
auto opObserver = stdx::make_unique<OpObserverMock>();
_opObserver = opObserver.get();
- service->setOpObserver(std::move(opObserver));
+ opObserverRegistry->addObserver(std::move(opObserver));
// This test uses StorageInterface to create collections and inspect documents inside
// collections.
_storage = stdx::make_unique<StorageInterfaceImpl>();
+
+ // Set up the transaction and session.
+ _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ _opCtx->setTxnNumber(0); // TxnNumber can always be 0 because we have a new session.
+ _ocs.emplace(_opCtx.get(), true /* checkOutSession */, false /* autocommit */);
}
void DoTxnTest::tearDown() {
+ _ocs = boost::none;
+ _opCtx = nullptr;
_storage = {};
_opObserver = nullptr;
@@ -134,15 +158,6 @@ Status getStatusFromDoTxnResult(const BSONObj& result) {
return getStatusFromCommandResult(newResult);
}
-TEST_F(DoTxnTest, AtomicDoTxnWithNoOpsReturnsSuccess) {
- auto opCtx = cc().makeOperationContext();
- BSONObjBuilder resultBuilder;
- auto cmdObj = BSON("doTxn" << BSONArray());
- auto expectedCmdObj = BSON("applyOps" << BSONArray());
- ASSERT_OK(doTxn(opCtx.get(), "test", cmdObj, &resultBuilder));
- ASSERT_BSONOBJ_EQ(expectedCmdObj, _opObserver->onApplyOpsCmdObj);
-}
-
BSONObj makeInsertOperation(const NamespaceString& nss,
const OptionalCollectionUUID& uuid,
const BSONObj& documentToInsert) {
@@ -183,37 +198,39 @@ BSONObj makeApplyOpsWithInsertOperation(const NamespaceString& nss,
}
TEST_F(DoTxnTest, AtomicDoTxnInsertIntoNonexistentCollectionReturnsNamespaceNotFoundInResult) {
- auto opCtx = cc().makeOperationContext();
NamespaceString nss("test.t");
auto documentToInsert = BSON("_id" << 0);
auto cmdObj = makeDoTxnWithInsertOperation(nss, boost::none, documentToInsert);
BSONObjBuilder resultBuilder;
- ASSERT_EQUALS(ErrorCodes::UnknownError, doTxn(opCtx.get(), "test", cmdObj, &resultBuilder));
+ ASSERT_EQUALS(ErrorCodes::UnknownError, doTxn(opCtx(), "test", cmdObj, &resultBuilder));
auto result = resultBuilder.obj();
auto status = getStatusFromDoTxnResult(result);
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
}
TEST_F(DoTxnTest, AtomicDoTxnInsertWithUuidIntoCollectionWithUuid) {
- auto opCtx = cc().makeOperationContext();
NamespaceString nss("test.t");
auto uuid = UUID::gen();
CollectionOptions collectionOptions;
collectionOptions.uuid = uuid;
- ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions));
+ ASSERT_OK(_storage->createCollection(opCtx(), nss, collectionOptions));
auto documentToInsert = BSON("_id" << 0);
auto cmdObj = makeDoTxnWithInsertOperation(nss, uuid, documentToInsert);
auto expectedCmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert);
BSONObjBuilder resultBuilder;
- ASSERT_OK(doTxn(opCtx.get(), "test", cmdObj, &resultBuilder));
- ASSERT_BSONOBJ_EQ(expectedCmdObj, _opObserver->onApplyOpsCmdObj);
+ ASSERT_OK(doTxn(opCtx(), "test", cmdObj, &resultBuilder));
+ ASSERT_EQ(expectedCmdObj.woCompare(_opObserver->onApplyOpsCmdObj,
+ BSONObj(),
+ BSONObj::ComparisonRules::kIgnoreFieldOrder |
+ BSONObj::ComparisonRules::kConsiderFieldName),
+ 0)
+ << "expected: " << expectedCmdObj << " got: " << _opObserver->onApplyOpsCmdObj;
}
TEST_F(DoTxnTest, AtomicDoTxnInsertWithUuidIntoCollectionWithOtherUuid) {
- auto opCtx = cc().makeOperationContext();
NamespaceString nss("test.t");
auto doTxnUuid = UUID::gen();
@@ -222,38 +239,42 @@ TEST_F(DoTxnTest, AtomicDoTxnInsertWithUuidIntoCollectionWithOtherUuid) {
CollectionOptions collectionOptions;
collectionOptions.uuid = UUID::gen();
ASSERT_NOT_EQUALS(doTxnUuid, collectionOptions.uuid);
- ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions));
+ ASSERT_OK(_storage->createCollection(opCtx(), nss, collectionOptions));
// The doTxn returns a NamespaceNotFound error because of the failed UUID lookup
// even though a collection exists with the same namespace as the insert operation.
auto documentToInsert = BSON("_id" << 0);
auto cmdObj = makeDoTxnWithInsertOperation(nss, doTxnUuid, documentToInsert);
BSONObjBuilder resultBuilder;
- ASSERT_EQUALS(ErrorCodes::UnknownError, doTxn(opCtx.get(), "test", cmdObj, &resultBuilder));
+ ASSERT_EQUALS(ErrorCodes::UnknownError, doTxn(opCtx(), "test", cmdObj, &resultBuilder));
auto result = resultBuilder.obj();
auto status = getStatusFromDoTxnResult(result);
ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status);
}
TEST_F(DoTxnTest, AtomicDoTxnInsertWithoutUuidIntoCollectionWithUuid) {
- auto opCtx = cc().makeOperationContext();
NamespaceString nss("test.t");
auto uuid = UUID::gen();
CollectionOptions collectionOptions;
collectionOptions.uuid = uuid;
- ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions));
+ ASSERT_OK(_storage->createCollection(opCtx(), nss, collectionOptions));
auto documentToInsert = BSON("_id" << 0);
auto cmdObj = makeDoTxnWithInsertOperation(nss, boost::none, documentToInsert);
BSONObjBuilder resultBuilder;
- ASSERT_OK(doTxn(opCtx.get(), "test", cmdObj, &resultBuilder));
+ ASSERT_OK(doTxn(opCtx(), "test", cmdObj, &resultBuilder));
// Insert operation provided by caller did not contain collection uuid but doTxn() should add
// the uuid to the oplog entry.
auto expectedCmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert);
- ASSERT_BSONOBJ_EQ(expectedCmdObj, _opObserver->onApplyOpsCmdObj);
+ ASSERT_EQ(expectedCmdObj.woCompare(_opObserver->onApplyOpsCmdObj,
+ BSONObj(),
+ BSONObj::ComparisonRules::kIgnoreFieldOrder |
+ BSONObj::ComparisonRules::kConsiderFieldName),
+ 0)
+ << "expected: " << expectedCmdObj << " got: " << _opObserver->onApplyOpsCmdObj;
}
} // namespace
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index c0f2e6a00c6..cef46afc15c 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -289,5 +289,9 @@ std::ostream& operator<<(std::ostream& s, const OplogEntry& o) {
return s << o.toString();
}
+std::ostream& operator<<(std::ostream& s, const ReplOperation& o) {
+ return s << o.toBSON().toString();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 1b3b15d1bae..e6eb921d985 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -155,5 +155,7 @@ inline bool operator==(const OplogEntry& lhs, const OplogEntry& rhs) {
return SimpleBSONObjComparator::kInstance.evaluate(lhs.raw == rhs.raw);
}
+std::ostream& operator<<(std::ostream& s, const ReplOperation& o);
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index e0a4ff28390..155779071d0 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -689,7 +689,11 @@ SessionRecordMap getLatestSessionRecords(const MultiApplier::Operations& ops) {
for (auto&& op : ops) {
const auto& sessionInfo = op.getOperationSessionInfo();
- if (sessionInfo.getTxnNumber()) {
+ // Do not write session table entries for applyOps, as multi-document transactions
+ // and retryable writes do not work together.
+        // TODO(SERVER-33501): Make multi-document transactions work with retryable writes.
+ if (sessionInfo.getTxnNumber() &&
+ (!op.isCommand() || op.getCommandType() != OplogEntry::CommandType::kApplyOps)) {
const auto& lsid = *sessionInfo.getSessionId();
SessionTxnRecord record;
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index a23026032a8..bf482e3a8b8 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -106,6 +106,7 @@ const StringMap<int> sessionCheckoutWhitelist = {{"aggregate", 1},
{"count", 1},
{"delete", 1},
{"distinct", 1},
+ {"doTxn", 1},
{"eval", 1},
{"$eval", 1},
{"explain", 1},
@@ -500,6 +501,11 @@ void execCommandDatabase(OperationContext* opCtx,
boost::optional<bool> autocommitVal = boost::none;
if (sessionOptions && sessionOptions->getAutocommit()) {
autocommitVal = *sessionOptions->getAutocommit();
+ } else if (sessionOptions && command->getName() == "doTxn") {
+ // Autocommit is overridden specifically for doTxn to get the oplog entry generation
+ // behavior used for multi-document transactions.
+ // The doTxn command still logically behaves as a commit.
+ autocommitVal = false;
}
OperationContextSession sessionTxnState(opCtx, shouldCheckoutSession, autocommitVal);
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 1310b261932..1134299df81 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -281,6 +281,10 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
invariant(opCtx->lockState()->inAWriteUnitOfWork());
stdx::unique_lock<stdx::mutex> ul(_mutex);
+ // Multi-document transactions currently do not write to the transaction table.
+ // TODO(SERVER-32323): Update transaction table appropriately when a transaction commits.
+ if (!_autocommit)
+ return;
// Sanity check that we don't double-execute statements
for (const auto stmtId : stmtIdsWritten) {
@@ -440,6 +444,9 @@ void Session::_beginOrContinueTxn(WithLock wl,
_setActiveTxn(wl, txnNumber);
_autocommit = (autocommit != boost::none) ? *autocommit : true; // autocommit defaults to true
_isSnapshotTxn = false;
+ _txnState = _autocommit ? MultiDocumentTransactionState::kNone
+ : MultiDocumentTransactionState::kInProgress;
+ invariant(_transactionOperations.empty());
}
void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const {
@@ -450,6 +457,16 @@ void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const {
<< _activeTxnNumber
<< " has already started.",
txnNumber >= _activeTxnNumber);
+ // TODO(SERVER-33432): Auto-abort an old transaction when a new one starts instead of asserting.
+ uassert(40691,
+ str::stream() << "Cannot start transaction " << txnNumber << " on session "
+ << getSessionId()
+ << " because a multi-document transaction "
+ << _activeTxnNumber
+ << " is in progress.",
+ txnNumber == _activeTxnNumber ||
+ (_transactionOperations.empty() &&
+ _txnState != MultiDocumentTransactionState::kCommitting));
}
void Session::stashTransactionResources(OperationContext* opCtx) {
@@ -547,6 +564,43 @@ void Session::_setActiveTxn(WithLock, TxnNumber txnNumber) {
_hasIncompleteHistory = false;
}
+void Session::addTransactionOperation(OperationContext* opCtx,
+ const repl::ReplOperation& operation) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_txnState == MultiDocumentTransactionState::kInProgress);
+ invariant(!_autocommit && _activeTxnNumber != kUninitializedTxnNumber);
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+ if (_transactionOperations.empty()) {
+ auto txnNumberCompleting = _activeTxnNumber;
+ opCtx->recoveryUnit()->onRollback([this, txnNumberCompleting] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_activeTxnNumber == txnNumberCompleting);
+ invariant(_txnState != MultiDocumentTransactionState::kCommitted);
+ _transactionOperations.clear();
+ _txnState = MultiDocumentTransactionState::kAborted;
+ });
+ opCtx->recoveryUnit()->onCommit([this, txnNumberCompleting] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(_activeTxnNumber == txnNumberCompleting);
+ invariant(_txnState == MultiDocumentTransactionState::kCommitting ||
+ _txnState == MultiDocumentTransactionState::kCommitted);
+ _txnState = MultiDocumentTransactionState::kCommitted;
+ });
+ }
+ _transactionOperations.push_back(operation);
+}
+
+std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ invariant(!_autocommit);
+ invariant(_txnState == MultiDocumentTransactionState::kInProgress);
+ // If _transactionOperations is empty, we will not see a commit because the write unit
+ // of work is empty.
+ _txnState = _transactionOperations.empty() ? MultiDocumentTransactionState::kCommitted
+ : MultiDocumentTransactionState::kCommitting;
+ return std::move(_transactionOperations);
+}
+
void Session::_checkValid(WithLock) const {
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Session " << getSessionId()
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index cdff0db7b07..5cf9bec1467 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -104,7 +104,6 @@ public:
*/
void beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber);
-
/**
* Called after a write under the specified transaction completes while the node is a primary
* and specifies the statement ids which were written. Must be called while the caller is still
@@ -209,6 +208,33 @@ public:
return _autocommit;
}
+ /**
+ * Returns whether we are in a multi-document transaction, which means we have an active
+ * transaction which has autoCommit:false and has not been committed or aborted.
+ */
+ bool inMultiDocumentTransaction() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _txnState == MultiDocumentTransactionState::kInProgress;
+ };
+
+ /**
+ * Adds a stored operation to the list of stored operations for the current multi-document
+ * (non-autocommit) transaction. It is illegal to add operations when no multi-document
+ * transaction is in progress.
+ */
+ void addTransactionOperation(OperationContext* opCtx, const repl::ReplOperation& operation);
+
+ /**
+     * Returns and clears the stored operations for a multi-document (non-autocommit) transaction,
+ * and marks the transaction as closed. It is illegal to attempt to add operations to the
+ * transaction after this is called.
+ */
+ std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations();
+
+ const std::vector<repl::ReplOperation>& transactionOperationsForTest() {
+ return _transactionOperations;
+ }
+
private:
void _beginOrContinueTxn(WithLock, TxnNumber txnNumber, boost::optional<bool> autocommit);
@@ -277,6 +303,21 @@ private:
// this allows transaction state to be maintained across network operations.
std::unique_ptr<RecoveryUnit> _stashedRecoveryUnit;
+ // Indicates the state of the current multi-document transaction, if any.
+ // If the transaction is in any state but kInProgress, no more operations can
+ // be collected.
+ enum class MultiDocumentTransactionState {
+ kNone,
+ kInProgress,
+ kCommitting,
+ kCommitted,
+ kAborted
+ } _txnState;
+
+ // Holds oplog data for operations which have been applied in the current multi-document
+ // transaction. Not used for retryable writes.
+ std::vector<repl::ReplOperation> _transactionOperations;
+
// For the active txn, tracks which statement ids have been committed and at which oplog
// opTime. Used for fast retryability check and retrieving the previous write's data without
// having to scan through the oplog.
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index fcdff8507f3..a39233ef28a 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -608,5 +608,39 @@ TEST_F(SessionTest, CheckAutocommitOnlyAllowedAtBeginningOfTxn) {
ErrorCodes::IllegalOperation);
}
+TEST_F(SessionTest, SameTransactionPreservesStoredStatements) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 22;
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+ WriteUnitOfWork wuow(opCtx());
+ auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
+ session.addTransactionOperation(opCtx(), operation);
+ ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON());
+
+ // Re-opening the same transaction should have no effect.
+ session.beginOrContinueTxn(opCtx(), txnNum, boost::none);
+ ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON());
+}
+
+TEST_F(SessionTest, RollbackClearsStoredStatements) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 24;
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+ {
+ WriteUnitOfWork wuow(opCtx());
+ auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
+ session.addTransactionOperation(opCtx(), operation);
+ ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON());
+ // Since the WriteUnitOfWork was not committed, it will implicitly roll back.
+ }
+ ASSERT_TRUE(session.transactionOperationsForTest().empty());
+}
+
} // anonymous
} // namespace mongo