diff options
-rw-r--r-- | jstests/auth/lib/commands_lib.js | 120 | ||||
-rw-r--r-- | jstests/core/bypass_doc_validation.js | 14 | ||||
-rw-r--r-- | jstests/core/collation.js | 32 | ||||
-rw-r--r-- | jstests/core/do_txn_basic.js | 189 | ||||
-rw-r--r-- | jstests/core/do_txn_oneshot.js | 80 | ||||
-rw-r--r-- | jstests/core/do_txn_oplog_entry.js (renamed from jstests/core/do_txn_atomicity.js) | 34 | ||||
-rw-r--r-- | jstests/core/json_schema/misc_validation.js | 11 | ||||
-rw-r--r-- | jstests/core/views/views_all_commands.js | 13 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 92 | ||||
-rw-r--r-- | src/mongo/db/repl/do_txn.cpp | 74 | ||||
-rw-r--r-- | src/mongo/db/repl/do_txn_test.cpp | 99 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 54 | ||||
-rw-r--r-- | src/mongo/db/session.h | 43 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 34 |
18 files changed, 673 insertions, 234 deletions
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js index 588fd06a6fb..d185de225e6 100644 --- a/jstests/auth/lib/commands_lib.js +++ b/jstests/auth/lib/commands_lib.js @@ -57,25 +57,28 @@ to be authorized to run the command. 4) skipSharded -Add "skipSharded: true" if you want to run the test only on a standalone. +Add "skipSharded: true" if you want to run the test only ony in a non-sharded configuration. -5) skipStandalone +5) skipUnlessSharded -Add "skipStandalone: true" if you want to run the test only in sharded +Add "skipUnlessSharded: true" if you want to run the test only in sharded configuration. -6) setup +6) skipUnlessReplicaSet +Add "skipUnlessReplicaSet: true" if you want to run the test only when replica sets are in use. + +7) setup The setup function, if present, is called before testing whether a particular role authorizes a command for a particular database. -7) teardown +8) teardown The teardown function, if present, is called immediately after testint whether a particular role authorizes a command for a particular database. -8) privileges +9) privileges An array of privileges used when testing user-defined roles. The test case tests that a user with the specified privileges is authorized to run the command, and that having only a subset of the @@ -178,6 +181,8 @@ var roles_all = { }; load("jstests/libs/uuid_util.js"); +// For isReplSet +load("jstests/libs/fixture_helpers.js"); var authCommandsLib = { @@ -198,7 +203,7 @@ var authCommandsLib = { { testname: "addShard", command: {addShard: "x"}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -221,7 +226,7 @@ var authCommandsLib = { u: {_id: {ns: "test.x", min: 1}, ns: "test.x"} }] }, - skipStandalone: true, + skipUnlessSharded: true, testcases: [{ runOnDb: "config", roles: roles_clusterManager, @@ -2264,7 +2269,7 @@ var authCommandsLib = { { testname: "balancerStart", command: {balancerStart: 1}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -2285,7 +2290,7 @@ var authCommandsLib = { { testname: "balancerStop", command: {balancerStop: 1}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -2306,7 +2311,7 @@ var authCommandsLib = { { testname: "balancerStatus", command: {balancerStatus: 1}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -2917,9 +2922,12 @@ var authCommandsLib = { "ns": firstDbName + ".x", "o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5} }], - preCondition: [{ns: firstDbName + ".x", q: {x: 5}, res: []}] + preCondition: [{ns: firstDbName + ".x", q: {x: 5}, res: []}], + txnNumber: NumberLong(0), + lsid: {id: UUID()} }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { db.getSisterDB(firstDbName).x.save({}); }, @@ -2943,9 +2951,12 @@ var authCommandsLib = { "op": "i", "ns": firstDbName + ".x", "o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5} - }] + }], + txnNumber: NumberLong(0), + lsid: {id: UUID()} }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { db.getSisterDB(firstDbName).x.save({}); }, @@ -2971,10 +2982,13 @@ var authCommandsLib = { "ns": state.collName, "ui": state.uuid, "o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5} - }] + }], + txnNumber: NumberLong(0), + lsid: {id: UUID()} }; }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { var sibling = db.getSisterDB(firstDbName); sibling.runCommand({create: "x"}); @@ -3008,10 +3022,13 @@ var authCommandsLib = { // Given a nonexistent UUID. The command should fail. "ui": UUID("71f1d1d7-68ca-493e-a7e9-f03c94e2e960"), "o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5} - }] + }], + txnNumber: NumberLong(0), + lsid: {id: UUID()} }; }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { var sibling = db.getSisterDB(firstDbName); sibling.runCommand({create: "x"}); @@ -3047,10 +3064,13 @@ var authCommandsLib = { "ns": state.collName, "ui": state.uuid, "o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5} - }] + }], + txnNumber: NumberLong(0), + lsid: {id: UUID()} }; }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { var sibling = db.getSisterDB(firstDbName); sibling.runCommand({create: "x"}); @@ -3084,10 +3104,13 @@ var authCommandsLib = { firstDbName + ".y", // Specify wrong name but correct uuid. Should work. "ui": state.x_uuid, // The insert should on x "o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5} - }] + }], + txnNumber: NumberLong(0), + lsid: {id: UUID()} }; }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { db.getSisterDB(firstDbName).x.drop(); db.getSisterDB(firstDbName).y.drop(); @@ -3123,10 +3146,13 @@ var authCommandsLib = { firstDbName + ".y", // Specify wrong name but correct uuid. Should work. "ui": state.x_uuid, // The insert should on x "o": {"_id": ObjectId("57dc3d7da4fce4358afa85b8"), "data": 5} - }] + }], + txnNumber: NumberLong(0), + lsid: {id: UUID()} }; }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { db.getSisterDB(firstDbName).x.drop(); db.getSisterDB(firstDbName).y.drop(); @@ -3161,9 +3187,12 @@ var authCommandsLib = { "ns": firstDbName + ".x", "o2": {"_id": 1}, "o": {"_id": 1, "data": 8} - }] + }], + txnNumber: NumberLong(0), + lsid: {id: UUID()} }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { db.getSisterDB(firstDbName).x.save({_id: 1, data: 1}); }, @@ -3189,9 +3218,11 @@ var authCommandsLib = { "o2": {"_id": 1}, "o": {"_id": 1, "data": 8} }], - alwaysUpsert: false + txnNumber: NumberLong(0), + lsid: {id: UUID()} }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { db.getSisterDB(firstDbName).x.save({_id: 1, data: 1}); }, @@ -3219,10 +3250,12 @@ var authCommandsLib = { "o2": {"_id": 1}, "o": {"_id": 1, "data": 8} }], - alwaysUpsert: false + txnNumber: NumberLong(0), + lsid: {id: UUID()} }; }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { var sibling = db.getSisterDB(firstDbName); sibling.x.save({_id: 1, data: 1}); @@ -3257,10 +3290,12 @@ var authCommandsLib = { "o2": {"_id": 1}, "o": {"_id": 1, "data": 8} }], - alwaysUpsert: false + txnNumber: NumberLong(0), + lsid: {id: UUID()} }; }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { var sibling = db.getSisterDB(firstDbName); sibling.x.save({_id: 1, data: 1}); @@ -3285,8 +3320,13 @@ var authCommandsLib = { }, { testname: "doTxn_delete", - command: {doTxn: [{"op": "d", "ns": firstDbName + ".x", "o": {"_id": 1}}]}, + command: { + doTxn: [{"op": "d", "ns": firstDbName + ".x", "o": {"_id": 1}}], + txnNumber: NumberLong(0), + lsid: {id: UUID()} + }, skipSharded: true, + skipUnlessReplicaSet: true, setup: function(db) { db.getSisterDB(firstDbName).x.save({_id: 1, data: 1}); }, @@ -3403,7 +3443,7 @@ var authCommandsLib = { { testname: "enableSharding", command: {enableSharding: "x"}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -3742,7 +3782,7 @@ var authCommandsLib = { { testname: "flushRouterConfig", command: {flushRouterConfig: 1}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -4269,7 +4309,7 @@ var authCommandsLib = { { testname: "killOp", // sharded version command: {killOp: 1, op: "shard1:123"}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -4437,7 +4477,7 @@ var authCommandsLib = { { testname: "listShards", command: {listShards: 1}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -4540,7 +4580,7 @@ var authCommandsLib = { { testname: "s_mergeChunks", command: {mergeChunks: "test.x", bounds: [{i: 0}, {i: 5}]}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -4570,7 +4610,7 @@ var authCommandsLib = { { testname: "s_moveChunk", command: {moveChunk: "test.x"}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -4600,7 +4640,7 @@ var authCommandsLib = { { testname: "movePrimary", command: {movePrimary: "x"}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -4615,7 +4655,7 @@ var authCommandsLib = { { testname: "netstat", command: {netstat: "x"}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -4903,7 +4943,7 @@ var authCommandsLib = { { testname: "removeShard", command: {removeShard: "x"}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -5239,7 +5279,7 @@ var authCommandsLib = { { testname: "shardCollection", command: {shardCollection: "test.x"}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -5274,7 +5314,7 @@ var authCommandsLib = { { testname: "split", command: {split: "test.x"}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -5531,7 +5571,7 @@ var authCommandsLib = { { testname: "addShardToZone", command: {addShardToZone: shard0name, zone: 'z'}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -5551,7 +5591,7 @@ var authCommandsLib = { { testname: "removeShardFromZone", command: {removeShardFromZone: shard0name, zone: 'z'}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -5574,7 +5614,7 @@ var authCommandsLib = { { testname: "updateZoneKeyRange", command: {updateZoneKeyRange: 'test.foo', min: {x: 1}, max: {x: 5}, zone: 'z'}, - skipStandalone: true, + skipUnlessSharded: true, testcases: [ { runOnDb: adminDbName, @@ -5665,7 +5705,11 @@ var authCommandsLib = { return []; } // others shouldn't run in a standalone environment - if (t.skipStandalone && !this.isMongos(conn)) { + if (t.skipUnlessSharded && !this.isMongos(conn)) { + return []; + } + // some tests require replica sets to be enabled. + if (t.skipUnlessReplicaSet && !FixtureHelpers.isReplSet(conn.getDB("admin"))) { return []; } diff --git a/jstests/core/bypass_doc_validation.js b/jstests/core/bypass_doc_validation.js index 107c4f2aba4..bed22676dc4 100644 --- a/jstests/core/bypass_doc_validation.js +++ b/jstests/core/bypass_doc_validation.js @@ -17,6 +17,8 @@ // For isMMAPv1. load("jstests/concurrency/fsm_workload_helpers/server_types.js"); + // For isReplSet + load("jstests/libs/fixture_helpers.js"); function assertFailsValidation(res) { if (res instanceof WriteResult || res instanceof BulkWriteResult) { @@ -53,12 +55,16 @@ assert.eq(1, coll.count({_id: 9})); } - // Test doTxn with a simple insert if not on mongos and not on MMAPv1. - if (!isMongos && !isMMAPv1(db)) { + // Test doTxn with a simple insert if a replica set, not on mongos and not on MMAPv1. + if (FixtureHelpers.isReplSet(db) && !isMongos && !isMMAPv1(db)) { + const session = db.getMongo().startSession(); + const sessionDb = session.getDatabase(myDb.getName()); const op = [{op: 'i', ns: coll.getFullName(), o: {_id: 10}}]; - assertFailsValidation(myDb.runCommand({doTxn: op, bypassDocumentValidation: false})); + assertFailsValidation(sessionDb.runCommand( + {doTxn: op, bypassDocumentValidation: false, txnNumber: NumberLong("0")})); assert.eq(0, coll.count({_id: 10})); - assert.commandWorked(myDb.runCommand({doTxn: op, bypassDocumentValidation: true})); + assert.commandWorked(sessionDb.runCommand( + {doTxn: op, bypassDocumentValidation: true, txnNumber: NumberLong("1")})); assert.eq(1, coll.count({_id: 10})); } diff --git a/jstests/core/collation.js b/jstests/core/collation.js index 3a089dde4ee..2b3996ccc96 100644 --- a/jstests/core/collation.js +++ b/jstests/core/collation.js @@ -11,6 +11,8 @@ load("jstests/libs/get_index_helpers.js"); // For isMMAPv1. load("jstests/concurrency/fsm_workload_helpers/server_types.js"); + // For isReplSet + load("jstests/libs/fixture_helpers.js"); var coll = db.collation; coll.drop(); @@ -1973,39 +1975,47 @@ } // doTxn - if (!isMongos && !isMMAPv1(db)) { + if (FixtureHelpers.isReplSet(db) && !isMongos && !isMMAPv1(db)) { + const session = db.getMongo().startSession(); + const sessionDb = session.getDatabase(db.getName()); coll.drop(); assert.commandWorked( db.createCollection("collation", {collation: {locale: "en_US", strength: 2}})); assert.writeOK(coll.insert({_id: "foo", x: 5, str: "bar"})); // preCondition.q respects collection default collation. - assert.commandFailed(db.runCommand({ + assert.commandFailed(sessionDb.runCommand({ doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "foo"}, o: {$set: {x: 6}}}], - preCondition: [{ns: coll.getFullName(), q: {_id: "not foo"}, res: {str: "bar"}}] + preCondition: [{ns: coll.getFullName(), q: {_id: "not foo"}, res: {str: "bar"}}], + txnNumber: NumberLong("0") })); assert.eq(5, coll.findOne({_id: "foo"}).x); - assert.commandWorked(db.runCommand({ + assert.commandWorked(sessionDb.runCommand({ doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "foo"}, o: {$set: {x: 6}}}], - preCondition: [{ns: coll.getFullName(), q: {_id: "FOO"}, res: {str: "bar"}}] + preCondition: [{ns: coll.getFullName(), q: {_id: "FOO"}, res: {str: "bar"}}], + txnNumber: NumberLong("1") })); assert.eq(6, coll.findOne({_id: "foo"}).x); // preCondition.res respects collection default collation. - assert.commandFailed(db.runCommand({ + assert.commandFailed(sessionDb.runCommand({ doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "foo"}, o: {$set: {x: 7}}}], - preCondition: [{ns: coll.getFullName(), q: {_id: "foo"}, res: {str: "not bar"}}] + preCondition: [{ns: coll.getFullName(), q: {_id: "foo"}, res: {str: "not bar"}}], + txnNumber: NumberLong("2") })); assert.eq(6, coll.findOne({_id: "foo"}).x); - assert.commandWorked(db.runCommand({ + assert.commandWorked(sessionDb.runCommand({ doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "foo"}, o: {$set: {x: 7}}}], - preCondition: [{ns: coll.getFullName(), q: {_id: "foo"}, res: {str: "BAR"}}] + preCondition: [{ns: coll.getFullName(), q: {_id: "foo"}, res: {str: "BAR"}}], + txnNumber: NumberLong("3") })); assert.eq(7, coll.findOne({_id: "foo"}).x); // <operation>.o2 respects collection default collation. - assert.commandWorked(db.runCommand( - {doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "FOO"}, o: {$set: {x: 8}}}]})); + assert.commandWorked(sessionDb.runCommand({ + doTxn: [{op: "u", ns: coll.getFullName(), o2: {_id: "FOO"}, o: {$set: {x: 8}}}], + txnNumber: NumberLong("4") + })); assert.eq(8, coll.findOne({_id: "foo"}).x); } diff --git a/jstests/core/do_txn_basic.js b/jstests/core/do_txn_basic.js index 8d12ec377d0..199b4948c8e 100644 --- a/jstests/core/do_txn_basic.js +++ b/jstests/core/do_txn_basic.js @@ -6,6 +6,8 @@ load("jstests/libs/get_index_helpers.js"); // For isMMAPv1. load("jstests/concurrency/fsm_workload_helpers/server_types.js"); + // For isReplSet + load("jstests/libs/fixture_helpers.js"); const t = db.do_txn1; @@ -18,6 +20,14 @@ jsTestLog("Skipping test as the storage engine does not support doTxn."); return; } + if (!FixtureHelpers.isReplSet(db)) { + jsTestLog("Skipping test as doTxn requires a replSet and replication is not enabled."); + return; + } + + var session = db.getMongo().startSession(); + db = session.getDatabase("test"); + var txnNumber = 0; t.drop(); @@ -26,56 +36,71 @@ // // Empty array of operations. - assert.commandFailedWithCode(db.adminCommand({doTxn: []}), + assert.commandFailedWithCode(db.adminCommand({doTxn: [], txnNumber: NumberLong(txnNumber++)}), ErrorCodes.InvalidOptions, 'doTxn should fail on empty array of operations'); // Non-array type for operations. - assert.commandFailedWithCode(db.adminCommand({doTxn: "not an array"}), - ErrorCodes.TypeMismatch, - 'doTxn should fail on non-array type for operations'); + assert.commandFailedWithCode( + db.adminCommand({doTxn: "not an array", txnNumber: NumberLong(txnNumber++)}), + ErrorCodes.TypeMismatch, + 'doTxn should fail on non-array type for operations'); // Missing 'op' field in an operation. - assert.commandFailedWithCode(db.adminCommand({doTxn: [{ns: t.getFullName(), o: {_id: 0}}]}), - ErrorCodes.FailedToParse, - 'doTxn should fail on operation without "op" field'); - - // Non-string 'op' field in an operation. assert.commandFailedWithCode( - db.adminCommand({doTxn: [{op: 12345, ns: t.getFullName(), o: {_id: 0}}]}), + db.adminCommand( + {doTxn: [{ns: t.getFullName(), o: {_id: 0}}], txnNumber: NumberLong(txnNumber++)}), ErrorCodes.FailedToParse, - 'doTxn should fail on operation with non-string "op" field'); + 'doTxn should fail on operation without "op" field'); + + // Non-string 'op' field in an operation. + assert.commandFailedWithCode(db.adminCommand({ + doTxn: [{op: 12345, ns: t.getFullName(), o: {_id: 0}}], + txnNumber: NumberLong(txnNumber++) + }), + ErrorCodes.FailedToParse, + 'doTxn should fail on operation with non-string "op" field'); // Empty 'op' field value in an operation. - assert.commandFailedWithCode( - db.adminCommand({doTxn: [{op: '', ns: t.getFullName(), o: {_id: 0}}]}), - ErrorCodes.FailedToParse, - 'doTxn should fail on operation with empty "op" field value'); + assert.commandFailedWithCode(db.adminCommand({ + doTxn: [{op: '', ns: t.getFullName(), o: {_id: 0}}], + txnNumber: NumberLong(txnNumber++) + }), + ErrorCodes.FailedToParse, + 'doTxn should fail on operation with empty "op" field value'); // Missing 'ns' field in an operation. - assert.commandFailedWithCode(db.adminCommand({doTxn: [{op: 'u', o: {_id: 0}}]}), - ErrorCodes.FailedToParse, - 'doTxn should fail on operation without "ns" field'); + assert.commandFailedWithCode( + db.adminCommand({doTxn: [{op: 'u', o: {_id: 0}}], txnNumber: NumberLong(txnNumber++)}), + ErrorCodes.FailedToParse, + 'doTxn should fail on operation without "ns" field'); // Missing 'o' field in an operation. - assert.commandFailedWithCode(db.adminCommand({doTxn: [{op: 'u', ns: t.getFullName()}]}), - ErrorCodes.FailedToParse, - 'doTxn should fail on operation without "o" field'); + assert.commandFailedWithCode( + db.adminCommand( + {doTxn: [{op: 'u', ns: t.getFullName()}], txnNumber: NumberLong(txnNumber++)}), + ErrorCodes.FailedToParse, + 'doTxn should fail on operation without "o" field'); // Non-string 'ns' field in an operation. - assert.commandFailedWithCode(db.adminCommand({doTxn: [{op: 'u', ns: 12345, o: {_id: 0}}]}), - ErrorCodes.FailedToParse, - 'doTxn should fail on operation with non-string "ns" field'); + assert.commandFailedWithCode( + db.adminCommand( + {doTxn: [{op: 'u', ns: 12345, o: {_id: 0}}], txnNumber: NumberLong(txnNumber++)}), + ErrorCodes.FailedToParse, + 'doTxn should fail on operation with non-string "ns" field'); // Missing dbname in 'ns' field. assert.commandFailedWithCode( - db.adminCommand({doTxn: [{op: 'd', ns: t.getName(), o: {_id: 1}}]}), + db.adminCommand( + {doTxn: [{op: 'd', ns: t.getName(), o: {_id: 1}}], txnNumber: NumberLong(txnNumber++)}), ErrorCodes.InvalidNamespace, 'doTxn should fail with a missing dbname in the "ns" field value'); // Empty 'ns' field value. - assert.commandFailed(db.adminCommand({doTxn: [{op: 'u', ns: '', o: {_id: 0}}]}), - 'doTxn should fail with empty "ns" field value'); + assert.commandFailed( + db.adminCommand( + {doTxn: [{op: 'u', ns: '', o: {_id: 0}}], txnNumber: NumberLong(txnNumber++)}), + 'doTxn should fail with empty "ns" field value'); // Valid 'ns' field value in unknown operation type 'x'. assert.commandFailedWithCode( @@ -84,17 +109,69 @@ 'doTxn should fail on unknown operation type "x" with valid "ns" value'); // Illegal operation type 'n' (no-op). - assert.commandFailedWithCode( - db.adminCommand({doTxn: [{op: 'n', ns: t.getFullName(), o: {_id: 0}}]}), - ErrorCodes.InvalidOptions, - 'doTxn should fail on "no op" operations.'); + assert.commandFailedWithCode(db.adminCommand({ + doTxn: [{op: 'n', ns: t.getFullName(), o: {_id: 0}}], + txnNumber: NumberLong(txnNumber++) + }), + ErrorCodes.InvalidOptions, + 'doTxn should fail on "no op" operations.'); // Illegal operation type 'c' (command). + assert.commandFailedWithCode(db.adminCommand({ + doTxn: [{op: 'c', ns: t.getCollection('$cmd').getFullName(), o: {applyOps: []}}], + txnNumber: NumberLong(txnNumber++) + }), + ErrorCodes.InvalidOptions, + 'doTxn should fail on commands.'); + + // No transaction number in an otherwise valid operation. assert.commandFailedWithCode( - db.adminCommand( - {doTxn: [{op: 'c', ns: t.getCollection('$cmd').getFullName(), o: {applyOps: []}}]}), + db.adminCommand({doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}]}), ErrorCodes.InvalidOptions, - 'doTxn should fail on commands.'); + 'doTxn should fail when no transaction number is given.'); + + // Session IDs and transaction numbers on sub-ops are not allowed + var lsid = {id: UUID()}; + res = assert.commandFailedWithCode( + db.runCommand({ + doTxn: [{ + op: "i", + ns: t.getFullName(), + o: {_id: 7, x: 24}, + lsid: lsid, + txnNumber: NumberLong(1), + }], + txnNumber: NumberLong(txnNumber++) + }), + ErrorCodes.FailedToParse, + 'doTxn should fail when inner transaction contains session id.'); + + res = assert.commandFailedWithCode( + db.runCommand({ + doTxn: [{ + op: "u", + ns: t.getFullName(), + o2: {_id: 7}, + o: {$set: {x: 25}}, + txnNumber: NumberLong(1), + }], + txnNumber: NumberLong(txnNumber++) + }), + ErrorCodes.FailedToParse, + 'doTxn should fail when inner transaction contains transaction number.'); + + res = assert.commandFailedWithCode( + db.runCommand({ + doTxn: [{ + op: "d", + ns: t.getFullName(), + o: {_id: 7}, + stmtId: 0, + }], + txnNumber: NumberLong(txnNumber++) + }), + ErrorCodes.FailedToParse, + 'doTxn should fail when inner transaction contains statement id.'); // Malformed operation with unexpected field 'x'. assert.commandFailedWithCode( @@ -115,7 +192,7 @@ const t2 = db.getSiblingDB('do_txn1_no_such_db').getCollection('t'); [t, t2].forEach(coll => { const op = {op: optype, ns: coll.getFullName(), o: o, o2: o2}; - const cmd = {doTxn: [op]}; + const cmd = {doTxn: [op], txnNumber: NumberLong(txnNumber++)}; jsTestLog('Testing doTxn on non-existent namespace: ' + tojson(cmd)); if (expectedErrorCode === ErrorCodes.OK) { assert.commandWorked(db.adminCommand(cmd)); @@ -132,13 +209,17 @@ testCrudOperationOnNonExistentNamespace('u', {x: 0}, {_id: 0}, ErrorCodes.NamespaceNotFound); assert.commandWorked(db.createCollection(t.getName())); - var a = assert.commandWorked( - db.adminCommand({doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}]})); + var a = assert.commandWorked(db.adminCommand({ + doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}], + txnNumber: NumberLong(txnNumber++) + })); assert.eq(1, t.find().count(), "Valid insert failed"); assert.eq(true, a.results[0], "Bad result value for valid insert"); - a = assert.commandWorked( - db.adminCommand({doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}]})); + a = assert.commandWorked(db.adminCommand({ + doTxn: [{"op": "i", "ns": t.getFullName(), "o": {_id: 5, x: 17}}], + txnNumber: NumberLong(txnNumber++) + })); assert.eq(1, t.find().count(), "Duplicate insert failed"); assert.eq(true, a.results[0], "Bad result value for duplicate insert"); @@ -146,14 +227,17 @@ assert.eq(o, t.findOne(), "Mismatching document inserted."); // 'o' field is an empty array. - assert.commandFailed(db.adminCommand({doTxn: [{op: 'i', ns: t.getFullName(), o: []}]}), - 'doTxn should fail on insert of object with empty array element'); + assert.commandFailed( + db.adminCommand( + {doTxn: [{op: 'i', ns: t.getFullName(), o: []}], txnNumber: NumberLong(txnNumber++)}), + 'doTxn should fail on insert of object with empty array element'); var res = assert.commandWorked(db.runCommand({ doTxn: [ {op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 18}}}, {op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 19}}} - ] + ], + txnNumber: NumberLong(txnNumber++) })); o.x++; @@ -170,7 +254,8 @@ {op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 20}}}, {op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 21}}} ], - preCondition: [{ns: t.getFullName(), q: {_id: 5}, res: {x: 19}}] + preCondition: [{ns: t.getFullName(), q: {_id: 5}, res: {x: 19}}], + txnNumber: NumberLong(txnNumber++) })); o.x++; @@ -187,7 +272,8 @@ {op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 22}}}, {op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 23}}} ], - preCondition: [{ns: "foo.otherName", q: {_id: 5}, res: {x: 21}}] + preCondition: [{ns: "foo.otherName", q: {_id: 5}, res: {x: 21}}], + txnNumber: NumberLong(txnNumber++) })); assert.eq(o, t.findOne(), "preCondition didn't match, but ops were still applied"); @@ -198,7 +284,8 @@ {op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 22}}}, {op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 23}}} ], - preCondition: [{ns: t.getFullName(), q: {_id: 5}, res: {x: 19}}] + preCondition: [{ns: t.getFullName(), q: {_id: 5}, res: {x: 19}}], + txnNumber: NumberLong(txnNumber++) })); assert.eq(o, t.findOne(), "preCondition didn't match, but ops were still applied"); @@ -207,7 +294,8 @@ doTxn: [ {op: "u", ns: t.getFullName(), o2: {_id: 5}, o: {$set: {x: 22}}}, {op: "u", ns: t.getFullName(), o2: {_id: 6}, o: {$set: {x: 23}}} - ] + ], + txnNumber: NumberLong(txnNumber++) })); assert.eq(false, res.results[0], "Op required upsert, which should be disallowed."); @@ -219,7 +307,8 @@ doTxn: [ {"op": "i", "ns": t.getFullName(), "o": {_id: 6}}, {"op": "u", "ns": t.getFullName(), "o2": {_id: 6}, "o": {$set: {z: 1, a: 2}}} - ] + ], + txnNumber: NumberLong(txnNumber++) })); assert.eq(t.findOne({_id: 6}), {_id: 6, a: 2, z: 1}); // Note: 'a' and 'z' have been sorted. @@ -233,7 +322,8 @@ "o2": {_id: 7}, "o": {$v: NumberLong(0), $set: {z: 1, a: 2}} } - ] + ], + txnNumber: NumberLong(txnNumber++), })); assert.eq(res.code, 40682); @@ -248,7 +338,8 @@ "o2": {_id: 8}, "o": {$v: NumberLong(1), $set: {z: 1, a: 2}} } - ] + ], + txnNumber: NumberLong(txnNumber++), })); assert.eq(t.findOne({_id: 8}), {_id: 8, a: 2, z: 1}); // Note: 'a' and 'z' have been sorted. })(); diff --git a/jstests/core/do_txn_oneshot.js b/jstests/core/do_txn_oneshot.js new file mode 100644 index 00000000000..89bcb65b815 --- /dev/null +++ b/jstests/core/do_txn_oneshot.js @@ -0,0 +1,80 @@ +// @tags: [requires_non_retryable_commands] + +// Tests that doTxn produces correct oplog entries. +(function() { + 'use strict'; + // For isMMAPv1. + load("jstests/concurrency/fsm_workload_helpers/server_types.js"); + // For isReplSet + load("jstests/libs/fixture_helpers.js"); + load('jstests/libs/uuid_util.js'); + + if (isMMAPv1(db)) { + jsTestLog("Skipping test as the storage engine does not support doTxn."); + return; + } + if (!FixtureHelpers.isReplSet(db)) { + jsTestLog("Skipping test as doTxn requires a replSet and replication is not enabled."); + return; + } + + var oplog = db.getSiblingDB('local').oplog.rs; + var session = db.getMongo().startSession(); + var sessionDb = session.getDatabase("test"); + var t = db.doTxn; + t.drop(); + db.createCollection(t.getName()); + const tUuid = getUUIDFromListCollections(db, t.getName()); + + // + // Test insert ops. Insert ops are unmodified except the UUID field is added. + // + const insertOps = [ + {op: 'i', ns: t.getFullName(), o: {_id: 100, x: 1, y: 1}}, + {op: 'i', ns: t.getFullName(), o: {_id: 101, x: 2, y: 1}}, + ]; + assert.commandWorked(sessionDb.runCommand({doTxn: insertOps, txnNumber: NumberLong("1")})); + let topOfOplog = oplog.find().sort({$natural: -1}).limit(1).next(); + assert.eq(topOfOplog.txnNumber, NumberLong("1")); + assert.docEq(topOfOplog.o.applyOps, insertOps.map(x => Object.assign(x, {ui: tUuid}))); + + // + // Test update ops. For updates, the "$v" UpdateSemantics field is added and non-idempotent + // operations are made idempotent. + // + const updateOps = [ + {op: 'u', ns: t.getFullName(), o: {$inc: {x: 10}}, o2: {_id: 100}}, + {op: 'u', ns: t.getFullName(), o: {$inc: {x: 10}}, o2: {_id: 101}} + ]; + const expectedUpdateOps = [ + {op: 'u', ns: t.getFullName(), o: {$v: 1, $set: {x: 11}}, o2: {_id: 100}, ui: tUuid}, + {op: 'u', ns: t.getFullName(), o: {$v: 1, $set: {x: 12}}, o2: {_id: 101}, ui: tUuid} + ]; + assert.commandWorked(sessionDb.runCommand({doTxn: updateOps, txnNumber: NumberLong("2")})); + topOfOplog = oplog.find().sort({$natural: -1}).limit(1).next(); + assert.eq(topOfOplog.txnNumber, NumberLong("2")); + assert.docEq(topOfOplog.o.applyOps, expectedUpdateOps); + + // + // Test delete ops. Delete ops are unmodified except the UUID field is added. + // + const deleteOps = [ + {op: 'd', ns: t.getFullName(), o: {_id: 100}}, + {op: 'd', ns: t.getFullName(), o: {_id: 101}} + ]; + assert.commandWorked(sessionDb.runCommand({doTxn: deleteOps, txnNumber: NumberLong("3")})); + topOfOplog = oplog.find().sort({$natural: -1}).limit(1).next(); + assert.eq(topOfOplog.txnNumber, NumberLong("3")); + assert.docEq(topOfOplog.o.applyOps, deleteOps.map(x => Object.assign(x, {ui: tUuid}))); + + // + // Make sure the transaction table is not affected by one-shot transactions. + // + assert.eq(0, + db.getSiblingDB('config') + .transactions.find({"_id.id": session.getSessionId().id}) + .toArray(), + "No transactions should be written to the transaction table."); + + session.endSession(); +})(); diff --git a/jstests/core/do_txn_atomicity.js b/jstests/core/do_txn_oplog_entry.js index a1c54f5dd7e..23be1c4d819 100644 --- a/jstests/core/do_txn_atomicity.js +++ b/jstests/core/do_txn_oplog_entry.js @@ -6,47 +6,60 @@ // For isMMAPv1. load("jstests/concurrency/fsm_workload_helpers/server_types.js"); + // For isReplSet + load("jstests/libs/fixture_helpers.js"); if (isMMAPv1(db)) { jsTestLog("Skipping test as the storage engine does not support doTxn."); return; } + if (!FixtureHelpers.isReplSet(db)) { + jsTestLog("Skipping test as doTxn requires a replSet and replication is not enabled."); + return; + } + + var session = db.getMongo().startSession(); + var sessionDb = session.getDatabase("test"); + var txnNumber = 0; var t = db.doTxn; t.drop(); assert.writeOK(t.insert({_id: 1})); // Operations including commands are not allowed and should be rejected completely. - assert.commandFailedWithCode(db.adminCommand({ + assert.commandFailedWithCode(sessionDb.adminCommand({ doTxn: [ {op: 'i', ns: t.getFullName(), o: {_id: ObjectId(), x: 1}}, {op: 'c', ns: "invalid", o: {create: "t"}}, - ] + ], + txnNumber: NumberLong(txnNumber++) }), ErrorCodes.InvalidOptions); assert.eq(t.count({x: 1}), 0); // Operations only including CRUD commands should be atomic, so the next insert will fail. var tooLong = Array(2000).join("hello"); - assert.commandFailedWithCode(db.adminCommand({ + assert.commandFailedWithCode(sessionDb.adminCommand({ doTxn: [ {op: 'i', ns: t.getFullName(), o: {_id: ObjectId(), x: 1}}, {op: 'i', ns: t.getFullName(), o: {_id: tooLong, x: 1}}, - ] + ], + txnNumber: NumberLong(txnNumber++) }), ErrorCodes.KeyTooLong); assert.eq(t.count({x: 1}), 0); // Operations on non-existent databases cannot be atomic. var newDBName = "do_txn_atomicity"; - var newDB = db.getSiblingDB(newDBName); + var newDB = sessionDb.getSiblingDB(newDBName); assert.commandWorked(newDB.dropDatabase()); // Updates on a non-existent database no longer implicitly create collections and will fail with // a NamespaceNotFound error. - assert.commandFailedWithCode( - newDB.runCommand( - {doTxn: [{op: "u", ns: newDBName + ".foo", o: {_id: 5, x: 17}, o2: {_id: 5, x: 16}}]}), - ErrorCodes.NamespaceNotFound); + assert.commandFailedWithCode(newDB.runCommand({ + doTxn: [{op: "u", ns: newDBName + ".foo", o: {_id: 5, x: 17}, o2: {_id: 5, x: 16}}], + txnNumber: NumberLong(txnNumber++) + }), + ErrorCodes.NamespaceNotFound); var sawTooManyLocksError = false; @@ -66,7 +79,8 @@ multiOps.push({op: 'i', ns: newDBName + "." + multiName, o: {_id: 0, x: [0, 1]}}); } - let res = [cappedOps, multiOps].map((doTxn) => newDB.runCommand({doTxn})); + let res = [cappedOps, multiOps].map( + (doTxn) => newDB.runCommand({doTxn: doTxn, txnNumber: NumberLong(txnNumber++)})); sawTooManyLocksError |= res.some((res) => res.code === ErrorCodes.TooManyLocks); // Transactions involving just two collections should succeed. if (n <= 2) diff --git a/jstests/core/json_schema/misc_validation.js b/jstests/core/json_schema/misc_validation.js index 23ce4ef72c1..29504be9d69 100644 --- a/jstests/core/json_schema/misc_validation.js +++ b/jstests/core/json_schema/misc_validation.js @@ -25,6 +25,8 @@ // For isMMAPv1. load("jstests/concurrency/fsm_workload_helpers/server_types.js"); + // For isReplSet + load("jstests/libs/fixture_helpers.js"); const testName = "json_schema_misc_validation"; const testDB = db.getSiblingDB(testName); @@ -319,9 +321,11 @@ coll.drop(); assert.writeOK(coll.insert({_id: 1, a: true})); - if (!isMMAPv1(db)) { + if (FixtureHelpers.isReplSet(db) && !isMongos && !isMMAPv1(db)) { // Test $jsonSchema in the precondition checking for doTxn. - res = testDB.adminCommand({ + const session = db.getMongo().startSession(); + const sessionDb = session.getDatabase(testDB.getName()); + res = sessionDb.adminCommand({ doTxn: [ {op: "u", ns: coll.getFullName(), o2: {_id: 1}, o: {$set: {a: false}}}, ], @@ -329,7 +333,8 @@ ns: coll.getFullName(), q: {$jsonSchema: {properties: {a: {type: "boolean"}}}}, res: {a: true} - }] + }], + txnNumber: NumberLong("0") }); assert.commandWorked(res); assert.eq(1, res.applied); diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index 696e72fff80..e5f11d2c1b5 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -190,10 +190,17 @@ delete: {command: {delete: "view", deletes: [{q: {x: 1}, limit: 1}]}, expectFailure: true}, distinct: {command: {distinct: "view", key: "_id"}}, doTxn: { - command: {doTxn: [{op: "i", o: {_id: 1}, ns: "test.view"}]}, + command: { + doTxn: [{op: "i", o: {_id: 1}, ns: "test.view"}], + txnNumber: NumberLong("0"), + lsid: {id: UUID()} + }, expectFailure: true, - expectedErrorCode: - [ErrorCodes.CommandNotSupportedOnView, ErrorCodes.CommandNotSupported], + expectedErrorCode: [ + ErrorCodes.CommandNotSupportedOnView, + ErrorCodes.CommandNotSupported, + ErrorCodes.IllegalOperation + ], skipSharded: true, }, driverOIDTest: {skip: isUnrelated}, diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 4a1977ebf7d..eea416f7db9 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -55,6 +55,7 @@ #include "mongo/util/fail_point_service.h" namespace mongo { +using repl::OplogEntry; namespace { MONGO_FP_DECLARE(failCollectionUpdates); @@ -254,6 +255,31 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, return opTimes; } +/** + * Write oplog entry for applyOps/atomic transaction operations. + */ +OpTimeBundle replLogApplyOps(OperationContext* opCtx, + const NamespaceString& cmdNss, + const BSONObj& applyOpCmd, + const OperationSessionInfo& sessionInfo, + StmtId stmtId, + const repl::OplogLink& oplogLink) { + OpTimeBundle times; + times.wallClockTime = getWallClockTimeForOpLog(opCtx); + times.writeOpTime = repl::logOp(opCtx, + "c", + cmdNss, + {}, + applyOpCmd, + nullptr, + false, + times.wallClockTime, + sessionInfo, + stmtId, + oplogLink); + return times; +} + } // namespace void OpObserverImpl::onCreateIndex(OperationContext* opCtx, @@ -313,6 +339,13 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator end, bool fromMigrate) { Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; + if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) { + for (auto iter = begin; iter != end; iter++) { + auto operation = OplogEntry::makeInsertOperation(nss, uuid, iter->doc); + session->addTransactionOperation(opCtx, operation); + } + return; + } const auto lastWriteDate = getWallClockTimeForOpLog(opCtx); @@ -377,6 +410,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; + if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) { + auto operation = + OplogEntry::makeUpdateOperation(args.nss, args.uuid, args.update, args.criteria); + session->addTransactionOperation(opCtx, operation); + return; + } const auto opTime = replLogUpdate(opCtx, session, args); AuthorizationManager::get(opCtx->getServiceContext()) @@ -427,10 +466,15 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, StmtId stmtId, bool fromMigrate, const boost::optional<BSONObj>& deletedDoc) { + Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; auto& deleteState = getDeleteState(opCtx); invariant(!deleteState.documentKey.isEmpty()); - - Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr; + if (session && opCtx->writesAreReplicated() && session->inMultiDocumentTransaction()) { + auto operation = OplogEntry::makeDeleteOperation( + nss, uuid, deletedDoc ? deletedDoc.get() : deleteState.documentKey); + session->addTransactionOperation(opCtx, operation); + return; + } const auto opTime = replLogDelete(opCtx, nss, uuid, session, stmtId, fromMigrate, deletedDoc); AuthorizationManager::get(opCtx->getServiceContext()) @@ -739,17 +783,7 @@ void OpObserverImpl::onApplyOps(OperationContext* opCtx, const std::string& dbName, const BSONObj& applyOpCmd) { const NamespaceString cmdNss{dbName, "$cmd"}; - repl::logOp(opCtx, - "c", - cmdNss, - {}, - applyOpCmd, - nullptr, - false, - getWallClockTimeForOpLog(opCtx), - {}, - kUninitializedStmtId, - {}); + replLogApplyOps(opCtx, cmdNss, applyOpCmd, {}, kUninitializedStmtId, {}); AuthorizationManager::get(opCtx->getServiceContext()) ->logOp(opCtx, "c", cmdNss, applyOpCmd, nullptr); @@ -782,6 +816,38 @@ void OpObserverImpl::onEmptyCapped(OperationContext* opCtx, void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) { invariant(opCtx->getTxnNumber()); + Session* const session = OperationContextSession::get(opCtx); + invariant(session); + invariant(session->inMultiDocumentTransaction()); + auto stmts = session->endTransactionAndRetrieveOperations(); + + // It is possible that the transaction resulted in no changes. In that case, we should + // not write an empty applyOps entry. + if (stmts.empty()) + return; + + BSONObjBuilder applyOpsBuilder; + BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); + for (auto& stmt : stmts) { + opsArray.append(stmt.toBSON()); + } + opsArray.done(); + const auto dbName = stmts[0].getNamespace().db().toString(); + const NamespaceString cmdNss{dbName, "$cmd"}; + + OperationSessionInfo sessionInfo; + repl::OplogLink oplogLink; + sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); + sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); + StmtId stmtId(0); + oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber()); + // Until we support multiple oplog entries per transaction, prevOpTime should always be null. + invariant(oplogLink.prevOpTime.isNull()); + + auto applyOpCmd = applyOpsBuilder.done(); + auto times = replLogApplyOps(opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink); + + onWriteOpCompleted(opCtx, cmdNss, session, {stmtId}, times.writeOpTime, times.wallClockTime); } void OpObserverImpl::onTransactionAbort(OperationContext* opCtx) { diff --git a/src/mongo/db/repl/do_txn.cpp b/src/mongo/db/repl/do_txn.cpp index 6506700a85f..777f8ccabcc 100644 --- a/src/mongo/db/repl/do_txn.cpp +++ b/src/mongo/db/repl/do_txn.cpp @@ -50,6 +50,7 @@ #include "mongo/db/query/collation/collation_spec.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/service_context.h" +#include "mongo/db/session_catalog.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -102,8 +103,7 @@ Status _doTxn(OperationContext* opCtx, const std::string& dbName, const BSONObj& doTxnCmd, BSONObjBuilder* result, - int* numApplied, - BSONArrayBuilder* opsBuilder) { + int* numApplied) { BSONObj ops = doTxnCmd.firstElement().Obj(); // apply *numApplied = 0; @@ -174,22 +174,6 @@ Status _doTxn(OperationContext* opCtx, if (!status.isOK()) return status; - // Append completed op, including UUID if available, to 'opsBuilder'. - if (opsBuilder) { - if (opObj.hasField("ui") || nss.isSystemDotIndexes() || - !(collection && collection->uuid())) { - // No changes needed to operation document. - opsBuilder->append(opObj); - } else { - // Operation document has no "ui" field and collection has a UUID. - auto uuid = collection->uuid(); - BSONObjBuilder opBuilder; - opBuilder.appendElements(opObj); - uuid->appendToBuilder(&opBuilder, "ui"); - opsBuilder->append(opBuilder.obj()); - } - } - ab.append(status.isOK()); if (!status.isOK()) { log() << "doTxn error applying: " << status; @@ -293,10 +277,16 @@ Status doTxn(OperationContext* opCtx, const std::string& dbName, const BSONObj& doTxnCmd, BSONObjBuilder* result) { + auto txnNumber = opCtx->getTxnNumber(); + uassert(ErrorCodes::InvalidOptions, "doTxn can only be run with a transaction ID.", txnNumber); + auto* session = OperationContextSession::get(opCtx); + uassert(ErrorCodes::InvalidOptions, "doTxn must be run within a session", session); + invariant(!session->getAutocommit()); uassert( ErrorCodes::InvalidOptions, "doTxn supports only CRUD opts.", _areOpsCrudOnly(doTxnCmd)); auto hasPrecondition = _hasPrecondition(doTxnCmd); + // Acquire global lock in IX mode so that the replication state check will remain valid. Lock::GlobalLock globalLock(opCtx, MODE_IX, Date_t::max()); @@ -313,11 +303,6 @@ Status doTxn(OperationContext* opCtx, try { writeConflictRetry(opCtx, "doTxn", dbName, [&] { BSONObjBuilder intermediateResult; - std::unique_ptr<BSONArrayBuilder> opsBuilder; - if (opCtx->writesAreReplicated() && - repl::ReplicationCoordinator::modeMasterSlave != replCoord->getReplicationMode()) { - opsBuilder = stdx::make_unique<BSONArrayBuilder>(); - } // The write unit of work guarantees snapshot isolation for precondition check and the // write. WriteUnitOfWork wunit(opCtx); @@ -329,45 +314,10 @@ Status doTxn(OperationContext* opCtx, } numApplied = 0; - { - // Suppress replication for atomic operations until end of doTxn. - repl::UnreplicatedWritesBlock uwb(opCtx); - uassertStatusOK(_doTxn( - opCtx, dbName, doTxnCmd, &intermediateResult, &numApplied, opsBuilder.get())); - } - // Generate oplog entry for all atomic ops collectively. - if (opCtx->writesAreReplicated()) { - // We want this applied atomically on slaves so we rewrite the oplog entry without - // the pre-condition for speed. - - BSONObjBuilder cmdBuilder; - - auto opsFieldName = doTxnCmd.firstElement().fieldNameStringData(); - for (auto elem : doTxnCmd) { - auto name = elem.fieldNameStringData(); - if (name == opsFieldName) { - // This should be written as applyOps, not doTxn. - invariant(opsFieldName == "doTxn"_sd); - if (opsBuilder) { - cmdBuilder.append("applyOps"_sd, opsBuilder->arr()); - } else { - cmdBuilder.appendAs(elem, "applyOps"_sd); - } - continue; - } - if (name == DoTxn::kPreconditionFieldName) - continue; - if (name == bypassDocumentValidationCommandOption()) - continue; - cmdBuilder.append(elem); - } - - const BSONObj cmdRewritten = cmdBuilder.done(); - - auto opObserver = getGlobalServiceContext()->getOpObserver(); - invariant(opObserver); - opObserver->onApplyOps(opCtx, dbName, cmdRewritten); - } + uassertStatusOK(_doTxn(opCtx, dbName, doTxnCmd, &intermediateResult, &numApplied)); + auto opObserver = getGlobalServiceContext()->getOpObserver(); + invariant(opObserver); + opObserver->onTransactionCommit(opCtx); wunit.commit(); result->appendElements(intermediateResult.obj()); }); diff --git a/src/mongo/db/repl/do_txn_test.cpp b/src/mongo/db/repl/do_txn_test.cpp index b70c21d1635..812e45a1ca4 100644 --- a/src/mongo/db/repl/do_txn_test.cpp +++ b/src/mongo/db/repl/do_txn_test.cpp @@ -30,12 +30,16 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" +#include "mongo/db/op_observer_impl.h" #include "mongo/db/op_observer_noop.h" +#include "mongo/db/op_observer_registry.h" #include "mongo/db/repl/do_txn.h" +#include "mongo/db/repl/oplog_interface_local.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/session_catalog.h" #include "mongo/logger/logger.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/stdx/memory.h" @@ -45,27 +49,24 @@ namespace repl { namespace { /** - * Mock OpObserver that tracks doTxn events. doTxn internally applies its arguments using applyOps. + * Mock OpObserver that tracks doTxn commit events. */ class OpObserverMock : public OpObserverNoop { public: /** - * Called by doTxn() when ops are applied atomically. + * Called by doTxn() when ops are ready to commit. */ - void onApplyOps(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& doTxnCmd) override; + void onTransactionCommit(OperationContext* opCtx) override; - // If not empty, holds the command object passed to last invocation of onApplyOps(). + // If not empty, holds the command object written out by the ObObserverImpl onTransactionCommit. BSONObj onApplyOpsCmdObj; }; -void OpObserverMock::onApplyOps(OperationContext* opCtx, - const std::string& dbName, - const BSONObj& doTxnCmd) { - ASSERT_FALSE(doTxnCmd.isEmpty()); - // Get owned copy because 'doTxnCmd' may be a temporary BSONObj created by doTxn(). - onApplyOpsCmdObj = doTxnCmd.getOwned(); +void OpObserverMock::onTransactionCommit(OperationContext* opCtx) { + OplogInterfaceLocal oplogInterface(opCtx, NamespaceString::kRsOplogNamespace.ns()); + auto oplogIter = oplogInterface.makeIterator(); + auto opEntry = unittest::assertGet(oplogIter->next()); + onApplyOpsCmdObj = opEntry.first.getObjectField("o").getOwned(); } /** @@ -77,8 +78,14 @@ private: void tearDown() override; protected: + OperationContext* opCtx() { + return _opCtx.get(); + } + OpObserverMock* _opObserver = nullptr; std::unique_ptr<StorageInterface> _storage; + ServiceContext::UniqueOperationContext _opCtx; + boost::optional<OperationContextSession> _ocs; }; void DoTxnTest::setUp() { @@ -86,28 +93,45 @@ void DoTxnTest::setUp() { ServiceContextMongoDTest::setUp(); auto service = getServiceContext(); - auto opCtx = cc().makeOperationContext(); + _opCtx = cc().makeOperationContext(); // Set up ReplicationCoordinator and create oplog. ReplicationCoordinator::set(service, stdx::make_unique<ReplicationCoordinatorMock>(service)); setOplogCollectionName(service); - createOplog(opCtx.get()); + createOplog(_opCtx.get()); // Ensure that we are primary. - auto replCoord = ReplicationCoordinator::get(opCtx.get()); + auto replCoord = ReplicationCoordinator::get(_opCtx.get()); ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_PRIMARY)); - // Use OpObserverMock to track notifications for doTxn(). + // Set up session catalog + SessionCatalog::reset_forTest(service); + SessionCatalog::create(service); + SessionCatalog::get(service)->onStepUp(_opCtx.get()); + + // Need the OpObserverImpl in the registry in order for doTxn to work. + OpObserverRegistry* opObserverRegistry = + dynamic_cast<OpObserverRegistry*>(service->getOpObserver()); + opObserverRegistry->addObserver(stdx::make_unique<OpObserverImpl>()); + + // Use OpObserverMock to track applyOps calls generated by doTxn(). auto opObserver = stdx::make_unique<OpObserverMock>(); _opObserver = opObserver.get(); - service->setOpObserver(std::move(opObserver)); + opObserverRegistry->addObserver(std::move(opObserver)); // This test uses StorageInterface to create collections and inspect documents inside // collections. _storage = stdx::make_unique<StorageInterfaceImpl>(); + + // Set up the transaction and session. + _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + _opCtx->setTxnNumber(0); // TxnNumber can always be 0 because we have a new session. + _ocs.emplace(_opCtx.get(), true /* checkOutSession */, false /* autocommit */); } void DoTxnTest::tearDown() { + _ocs = boost::none; + _opCtx = nullptr; _storage = {}; _opObserver = nullptr; @@ -134,15 +158,6 @@ Status getStatusFromDoTxnResult(const BSONObj& result) { return getStatusFromCommandResult(newResult); } -TEST_F(DoTxnTest, AtomicDoTxnWithNoOpsReturnsSuccess) { - auto opCtx = cc().makeOperationContext(); - BSONObjBuilder resultBuilder; - auto cmdObj = BSON("doTxn" << BSONArray()); - auto expectedCmdObj = BSON("applyOps" << BSONArray()); - ASSERT_OK(doTxn(opCtx.get(), "test", cmdObj, &resultBuilder)); - ASSERT_BSONOBJ_EQ(expectedCmdObj, _opObserver->onApplyOpsCmdObj); -} - BSONObj makeInsertOperation(const NamespaceString& nss, const OptionalCollectionUUID& uuid, const BSONObj& documentToInsert) { @@ -183,37 +198,39 @@ BSONObj makeApplyOpsWithInsertOperation(const NamespaceString& nss, } TEST_F(DoTxnTest, AtomicDoTxnInsertIntoNonexistentCollectionReturnsNamespaceNotFoundInResult) { - auto opCtx = cc().makeOperationContext(); NamespaceString nss("test.t"); auto documentToInsert = BSON("_id" << 0); auto cmdObj = makeDoTxnWithInsertOperation(nss, boost::none, documentToInsert); BSONObjBuilder resultBuilder; - ASSERT_EQUALS(ErrorCodes::UnknownError, doTxn(opCtx.get(), "test", cmdObj, &resultBuilder)); + ASSERT_EQUALS(ErrorCodes::UnknownError, doTxn(opCtx(), "test", cmdObj, &resultBuilder)); auto result = resultBuilder.obj(); auto status = getStatusFromDoTxnResult(result); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); } TEST_F(DoTxnTest, AtomicDoTxnInsertWithUuidIntoCollectionWithUuid) { - auto opCtx = cc().makeOperationContext(); NamespaceString nss("test.t"); auto uuid = UUID::gen(); CollectionOptions collectionOptions; collectionOptions.uuid = uuid; - ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions)); + ASSERT_OK(_storage->createCollection(opCtx(), nss, collectionOptions)); auto documentToInsert = BSON("_id" << 0); auto cmdObj = makeDoTxnWithInsertOperation(nss, uuid, documentToInsert); auto expectedCmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert); BSONObjBuilder resultBuilder; - ASSERT_OK(doTxn(opCtx.get(), "test", cmdObj, &resultBuilder)); - ASSERT_BSONOBJ_EQ(expectedCmdObj, _opObserver->onApplyOpsCmdObj); + ASSERT_OK(doTxn(opCtx(), "test", cmdObj, &resultBuilder)); + ASSERT_EQ(expectedCmdObj.woCompare(_opObserver->onApplyOpsCmdObj, + BSONObj(), + BSONObj::ComparisonRules::kIgnoreFieldOrder | + BSONObj::ComparisonRules::kConsiderFieldName), + 0) + << "expected: " << expectedCmdObj << " got: " << _opObserver->onApplyOpsCmdObj; } TEST_F(DoTxnTest, AtomicDoTxnInsertWithUuidIntoCollectionWithOtherUuid) { - auto opCtx = cc().makeOperationContext(); NamespaceString nss("test.t"); auto doTxnUuid = UUID::gen(); @@ -222,38 +239,42 @@ TEST_F(DoTxnTest, AtomicDoTxnInsertWithUuidIntoCollectionWithOtherUuid) { CollectionOptions collectionOptions; collectionOptions.uuid = UUID::gen(); ASSERT_NOT_EQUALS(doTxnUuid, collectionOptions.uuid); - ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions)); + ASSERT_OK(_storage->createCollection(opCtx(), nss, collectionOptions)); // The doTxn returns a NamespaceNotFound error because of the failed UUID lookup // even though a collection exists with the same namespace as the insert operation. auto documentToInsert = BSON("_id" << 0); auto cmdObj = makeDoTxnWithInsertOperation(nss, doTxnUuid, documentToInsert); BSONObjBuilder resultBuilder; - ASSERT_EQUALS(ErrorCodes::UnknownError, doTxn(opCtx.get(), "test", cmdObj, &resultBuilder)); + ASSERT_EQUALS(ErrorCodes::UnknownError, doTxn(opCtx(), "test", cmdObj, &resultBuilder)); auto result = resultBuilder.obj(); auto status = getStatusFromDoTxnResult(result); ASSERT_EQUALS(ErrorCodes::NamespaceNotFound, status); } TEST_F(DoTxnTest, AtomicDoTxnInsertWithoutUuidIntoCollectionWithUuid) { - auto opCtx = cc().makeOperationContext(); NamespaceString nss("test.t"); auto uuid = UUID::gen(); CollectionOptions collectionOptions; collectionOptions.uuid = uuid; - ASSERT_OK(_storage->createCollection(opCtx.get(), nss, collectionOptions)); + ASSERT_OK(_storage->createCollection(opCtx(), nss, collectionOptions)); auto documentToInsert = BSON("_id" << 0); auto cmdObj = makeDoTxnWithInsertOperation(nss, boost::none, documentToInsert); BSONObjBuilder resultBuilder; - ASSERT_OK(doTxn(opCtx.get(), "test", cmdObj, &resultBuilder)); + ASSERT_OK(doTxn(opCtx(), "test", cmdObj, &resultBuilder)); // Insert operation provided by caller did not contain collection uuid but doTxn() should add // the uuid to the oplog entry. auto expectedCmdObj = makeApplyOpsWithInsertOperation(nss, uuid, documentToInsert); - ASSERT_BSONOBJ_EQ(expectedCmdObj, _opObserver->onApplyOpsCmdObj); + ASSERT_EQ(expectedCmdObj.woCompare(_opObserver->onApplyOpsCmdObj, + BSONObj(), + BSONObj::ComparisonRules::kIgnoreFieldOrder | + BSONObj::ComparisonRules::kConsiderFieldName), + 0) + << "expected: " << expectedCmdObj << " got: " << _opObserver->onApplyOpsCmdObj; } } // namespace diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index c0f2e6a00c6..cef46afc15c 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -289,5 +289,9 @@ std::ostream& operator<<(std::ostream& s, const OplogEntry& o) { return s << o.toString(); } +std::ostream& operator<<(std::ostream& s, const ReplOperation& o) { + return s << o.toBSON().toString(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 1b3b15d1bae..e6eb921d985 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -155,5 +155,7 @@ inline bool operator==(const OplogEntry& lhs, const OplogEntry& rhs) { return SimpleBSONObjComparator::kInstance.evaluate(lhs.raw == rhs.raw); } +std::ostream& operator<<(std::ostream& s, const ReplOperation& o); + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index e0a4ff28390..155779071d0 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -689,7 +689,11 @@ SessionRecordMap getLatestSessionRecords(const MultiApplier::Operations& ops) { for (auto&& op : ops) { const auto& sessionInfo = op.getOperationSessionInfo(); - if (sessionInfo.getTxnNumber()) { + // Do not write session table entries for applyOps, as multi-document transactions + // and retryable writes do not work together. + // TODO(SERVER-33501): Make multi-docunment transactions work with retryable writes. + if (sessionInfo.getTxnNumber() && + (!op.isCommand() || op.getCommandType() != OplogEntry::CommandType::kApplyOps)) { const auto& lsid = *sessionInfo.getSessionId(); SessionTxnRecord record; diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index a23026032a8..bf482e3a8b8 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -106,6 +106,7 @@ const StringMap<int> sessionCheckoutWhitelist = {{"aggregate", 1}, {"count", 1}, {"delete", 1}, {"distinct", 1}, + {"doTxn", 1}, {"eval", 1}, {"$eval", 1}, {"explain", 1}, @@ -500,6 +501,11 @@ void execCommandDatabase(OperationContext* opCtx, boost::optional<bool> autocommitVal = boost::none; if (sessionOptions && sessionOptions->getAutocommit()) { autocommitVal = *sessionOptions->getAutocommit(); + } else if (sessionOptions && command->getName() == "doTxn") { + // Autocommit is overridden specifically for doTxn to get the oplog entry generation + // behavior used for multi-document transactions. + // The doTxn command still logically behaves as a commit. + autocommitVal = false; } OperationContextSession sessionTxnState(opCtx, shouldCheckoutSession, autocommitVal); diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 1310b261932..1134299df81 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -281,6 +281,10 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, invariant(opCtx->lockState()->inAWriteUnitOfWork()); stdx::unique_lock<stdx::mutex> ul(_mutex); + // Multi-document transactions currently do not write to the transaction table. + // TODO(SERVER-32323): Update transaction table appropriately when a transaction commits. + if (!_autocommit) + return; // Sanity check that we don't double-execute statements for (const auto stmtId : stmtIdsWritten) { @@ -440,6 +444,9 @@ void Session::_beginOrContinueTxn(WithLock wl, _setActiveTxn(wl, txnNumber); _autocommit = (autocommit != boost::none) ? *autocommit : true; // autocommit defaults to true _isSnapshotTxn = false; + _txnState = _autocommit ? MultiDocumentTransactionState::kNone + : MultiDocumentTransactionState::kInProgress; + invariant(_transactionOperations.empty()); } void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const { @@ -450,6 +457,16 @@ void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const { << _activeTxnNumber << " has already started.", txnNumber >= _activeTxnNumber); + // TODO(SERVER-33432): Auto-abort an old transaction when a new one starts instead of asserting. + uassert(40691, + str::stream() << "Cannot start transaction " << txnNumber << " on session " + << getSessionId() + << " because a multi-document transaction " + << _activeTxnNumber + << " is in progress.", + txnNumber == _activeTxnNumber || + (_transactionOperations.empty() && + _txnState != MultiDocumentTransactionState::kCommitting)); } void Session::stashTransactionResources(OperationContext* opCtx) { @@ -547,6 +564,43 @@ void Session::_setActiveTxn(WithLock, TxnNumber txnNumber) { _hasIncompleteHistory = false; } +void Session::addTransactionOperation(OperationContext* opCtx, + const repl::ReplOperation& operation) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_txnState == MultiDocumentTransactionState::kInProgress); + invariant(!_autocommit && _activeTxnNumber != kUninitializedTxnNumber); + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + if (_transactionOperations.empty()) { + auto txnNumberCompleting = _activeTxnNumber; + opCtx->recoveryUnit()->onRollback([this, txnNumberCompleting] { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_activeTxnNumber == txnNumberCompleting); + invariant(_txnState != MultiDocumentTransactionState::kCommitted); + _transactionOperations.clear(); + _txnState = MultiDocumentTransactionState::kAborted; + }); + opCtx->recoveryUnit()->onCommit([this, txnNumberCompleting] { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_activeTxnNumber == txnNumberCompleting); + invariant(_txnState == MultiDocumentTransactionState::kCommitting || + _txnState == MultiDocumentTransactionState::kCommitted); + _txnState = MultiDocumentTransactionState::kCommitted; + }); + } + _transactionOperations.push_back(operation); +} + +std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(!_autocommit); + invariant(_txnState == MultiDocumentTransactionState::kInProgress); + // If _transactionOperations is empty, we will not see a commit because the write unit + // of work is empty. + _txnState = _transactionOperations.empty() ? MultiDocumentTransactionState::kCommitted + : MultiDocumentTransactionState::kCommitting; + return std::move(_transactionOperations); +} + void Session::_checkValid(WithLock) const { uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Session " << getSessionId() diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index cdff0db7b07..5cf9bec1467 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -104,7 +104,6 @@ public: */ void beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber); - /** * Called after a write under the specified transaction completes while the node is a primary * and specifies the statement ids which were written. Must be called while the caller is still @@ -209,6 +208,33 @@ public: return _autocommit; } + /** + * Returns whether we are in a multi-document transaction, which means we have an active + * transaction which has autoCommit:false and has not been committed or aborted. + */ + bool inMultiDocumentTransaction() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _txnState == MultiDocumentTransactionState::kInProgress; + }; + + /** + * Adds a stored operation to the list of stored operations for the current multi-document + * (non-autocommit) transaction. It is illegal to add operations when no multi-document + * transaction is in progress. + */ + void addTransactionOperation(OperationContext* opCtx, const repl::ReplOperation& operation); + + /** + * Returns and clears the stored operations for an multi-document (non-autocommit) transaction, + * and marks the transaction as closed. It is illegal to attempt to add operations to the + * transaction after this is called. + */ + std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations(); + + const std::vector<repl::ReplOperation>& transactionOperationsForTest() { + return _transactionOperations; + } + private: void _beginOrContinueTxn(WithLock, TxnNumber txnNumber, boost::optional<bool> autocommit); @@ -277,6 +303,21 @@ private: // this allows transaction state to be maintained across network operations. std::unique_ptr<RecoveryUnit> _stashedRecoveryUnit; + // Indicates the state of the current multi-document transaction, if any. + // If the transaction is in any state but kInProgress, no more operations can + // be collected. + enum class MultiDocumentTransactionState { + kNone, + kInProgress, + kCommitting, + kCommitted, + kAborted + } _txnState; + + // Holds oplog data for operations which have been applied in the current multi-document + // transaction. Not used for retryable writes. + std::vector<repl::ReplOperation> _transactionOperations; + // For the active txn, tracks which statement ids have been committed and at which oplog // opTime. Used for fast retryability check and retrieving the previous write's data without // having to scan through the oplog. diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index fcdff8507f3..a39233ef28a 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -608,5 +608,39 @@ TEST_F(SessionTest, CheckAutocommitOnlyAllowedAtBeginningOfTxn) { ErrorCodes::IllegalOperation); } +TEST_F(SessionTest, SameTransactionPreservesStoredStatements) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 22; + session.beginOrContinueTxn(opCtx(), txnNum, false); + WriteUnitOfWork wuow(opCtx()); + auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); + session.addTransactionOperation(opCtx(), operation); + ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON()); + + // Re-opening the same transaction should have no effect. + session.beginOrContinueTxn(opCtx(), txnNum, boost::none); + ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON()); +} + +TEST_F(SessionTest, RollbackClearsStoredStatements) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 24; + session.beginOrContinueTxn(opCtx(), txnNum, false); + { + WriteUnitOfWork wuow(opCtx()); + auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); + session.addTransactionOperation(opCtx(), operation); + ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON()); + // Since the WriteUnitOfWork was not committed, it will implicitly roll back. + } + ASSERT_TRUE(session.transactionOperationsForTest().empty()); +} + } // anonymous } // namespace mongo |