author     Randolph Tan <randolph@10gen.com>  2017-09-28 16:33:27 -0400
committer  Randolph Tan <randolph@10gen.com>  2017-09-28 16:51:00 -0400
commit     f88f6f43b7ae2af0286437da8f00c0079ed99145 (patch)
tree       ec22edf29d746877baa697f80bc10d0b40ebb616
parent     0309fa8091bdf7d6663a02fefd5d61ae0965e7b1 (diff)
download   mongo-f88f6f43b7ae2af0286437da8f00c0079ed99145.tar.gz
SERVER-30894/SERVER-31290 Implement command for transferring session information during migration
This reverts commit f24fbb0011c6ded9101f08574e7cd07e63690a9b.
This reverts commit d293f6857bcb36b26ca8fa03d90299714fe060de.
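For context, a retryable write carries a logical session id (lsid) and a transaction number, and retrying it must return the original result even if the chunk it targeted has since migrated to another shard. A minimal shell sketch of the pattern the new tests below exercise (the collection name and txnNumber are illustrative):

    var lsid = {id: UUID()};
    var cmd = {
        insert: 'user',
        documents: [{x: 10}],
        ordered: false,
        lsid: lsid,                // logical session the write belongs to
        txnNumber: NumberLong(1),  // identifies the statement for retries
    };
    var result = assert.commandWorked(db.runCommand(cmd));
    // After a moveChunk, the identical command must be recognized as a retry and
    // return the original result instead of inserting a second document.
    var retryResult = assert.commandWorked(db.runCommand(cmd));
    assert.eq(result.n, retryResult.n);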
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 5
-rw-r--r--  jstests/auth/lib/commands_lib.js | 13
-rw-r--r--  jstests/core/views/views_all_commands.js | 1
-rw-r--r--  jstests/sharding/move_chunk_find_and_modify_with_write_retryability.js | 119
-rw-r--r--  jstests/sharding/move_chunk_insert_with_write_retryability.js | 34
-rw-r--r--  jstests/sharding/move_chunk_remove_with_write_retryability.js | 41
-rw-r--r--  jstests/sharding/move_chunk_update_with_write_retryability.js | 44
-rw-r--r--  jstests/sharding/move_chunk_with_session_helper.js | 49
-rw-r--r--  jstests/sharding/write_transactions_during_migration.js | 84
-rw-r--r--  src/mongo/base/error_codes.err | 1
-rw-r--r--  src/mongo/db/op_observer_impl.cpp | 95
-rw-r--r--  src/mongo/db/ops/write_ops_retryability.cpp | 10
-rw-r--r--  src/mongo/db/ops/write_ops_retryability_test.cpp | 17
-rw-r--r--  src/mongo/db/repl/oplog.cpp | 4
-rw-r--r--  src/mongo/db/repl/oplog.h | 3
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp | 19
-rw-r--r--  src/mongo/db/s/collection_sharding_state.h | 12
-rw-r--r--  src/mongo/db/s/collection_sharding_state_test.cpp | 10
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source.h | 15
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 90
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 19
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp | 52
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp | 16
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp | 23
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h | 3
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination.cpp | 120
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination.h | 13
-rw-r--r--  src/mongo/db/s/session_catalog_migration_destination_test.cpp | 79
-rw-r--r--  src/mongo/db/s/session_catalog_migration_source.cpp | 9
-rw-r--r--  src/mongo/db/s/session_catalog_migration_source.h | 1
-rw-r--r--  src/mongo/db/s/session_catalog_migration_source_test.cpp | 2
31 files changed, 862 insertions(+), 141 deletions(-)
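For reference, each new jstest above can be run on its own through resmoke; assuming the standard repository layout, an invocation looks like:

    python buildscripts/resmoke.py --suites=sharding jstests/sharding/write_transactions_during_migration.js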
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 26eae7c72bc..58274c26f85 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -49,6 +49,11 @@ selector:
- jstests/sharding/uuid_propagated_to_recipient_shard_on_recvChunkStart.js
- jstests/sharding/uuid_propagated_to_shards_on_setFCV_3_6.js
- jstests/sharding/close_cursor_on_chunk_migration_to_new_shards.js
+ - jstests/sharding/move_chunk_insert_with_write_retryability.js
+ - jstests/sharding/move_chunk_find_and_modify_with_write_retryability.js
+ - jstests/sharding/move_chunk_remove_with_write_retryability.js
+ - jstests/sharding/move_chunk_update_with_write_retryability.js
+ - jstests/sharding/write_transactions_during_migration.js
# New feature in v3.6 mongo shell.
- jstests/sharding/causal_consistency_shell_support.js
- jstests/sharding/keys_rotation_interval_sec.js
diff --git a/jstests/auth/lib/commands_lib.js b/jstests/auth/lib/commands_lib.js
index 6717e17e64d..7cde9677e96 100644
--- a/jstests/auth/lib/commands_lib.js
+++ b/jstests/auth/lib/commands_lib.js
@@ -4454,6 +4454,19 @@ var authCommandsLib = {
command: {refreshSessionsInternal: []},
testcases: [{runOnDb: adminDbName, roles: {__system: 1}}],
},
+ {
+ testname: "_getNextSessionMods",
+ command: {_getNextSessionMods: "a-b"},
+ skipSharded: true,
+ testcases: [
+ {
+ runOnDb: adminDbName,
+ roles: {__system: 1},
+ privileges: [{resource: {cluster: true}, actions: ["internal"]}],
+ expectFail: true
+ },
+ ]
+ },
],
/************* SHARED TEST LOGIC ****************/
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index b9c2474bf69..02d68beabda 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -76,6 +76,7 @@
_configsvrShardCollection: {skip: isAnInternalCommand},
_configsvrSetFeatureCompatibilityVersion: {skip: isAnInternalCommand},
_configsvrUpdateZoneKeyRange: {skip: isAnInternalCommand},
+ _getNextSessionMods: {skip: isAnInternalCommand},
_getUserCacheGeneration: {skip: isAnInternalCommand},
_hashBSONElement: {skip: isAnInternalCommand},
_isSelf: {skip: isAnInternalCommand},
diff --git a/jstests/sharding/move_chunk_find_and_modify_with_write_retryability.js b/jstests/sharding/move_chunk_find_and_modify_with_write_retryability.js
new file mode 100644
index 00000000000..c06a4cfdc9c
--- /dev/null
+++ b/jstests/sharding/move_chunk_find_and_modify_with_write_retryability.js
@@ -0,0 +1,119 @@
+load("jstests/sharding/move_chunk_with_session_helper.js");
+
+(function() {
+
+ "use strict";
+
+ var checkFindAndModifyResult = function(expected, toCheck) {
+ assert.eq(expected.ok, toCheck.ok);
+ assert.eq(expected.value, toCheck.value);
+
+ // TODO: SERVER-30532: after adding upserted, just compare the entire lastErrorObject
+ var expectedLE = expected.lastErrorObject;
+ var toCheckLE = toCheck.lastErrorObject;
+
+ assert.neq(null, toCheckLE);
+        assert.eq(expectedLE.updatedExisting, toCheckLE.updatedExisting);
+        assert.eq(expectedLE.n, toCheckLE.n);
+ };
+
+ var lsid = UUID();
+ var tests = [
+ {
+ coll: 'findAndMod-upsert',
+ cmd: {
+ findAndModify: 'findAndMod-upsert',
+ query: {x: 60},
+ update: {$inc: {y: 1}},
+ new: true,
+ upsert: true,
+ lsid: {id: lsid},
+ txnNumber: NumberLong(37),
+ },
+ setup: function(coll) {},
+ checkRetryResult: function(result, retryResult) {
+ checkFindAndModifyResult(result, retryResult);
+ },
+ checkDocuments: function(coll) {
+ assert.eq(1, coll.findOne({x: 60}).y);
+ },
+ },
+ {
+ coll: 'findAndMod-update-preImage',
+ cmd: {
+ findAndModify: 'findAndMod-update-preImage',
+ query: {x: 60},
+ update: {$inc: {y: 1}},
+ new: false,
+ upsert: false,
+ lsid: {id: lsid},
+ txnNumber: NumberLong(38),
+ },
+ setup: function(coll) {
+ coll.insert({x: 60});
+ },
+ checkRetryResult: function(result, retryResult) {
+ checkFindAndModifyResult(result, retryResult);
+ },
+ checkDocuments: function(coll) {
+ assert.eq(1, coll.findOne({x: 60}).y);
+ },
+ },
+ {
+ coll: 'findAndMod-update-postImage',
+ cmd: {
+ findAndModify: 'findAndMod-update-postImage',
+ query: {x: 60},
+ update: {$inc: {y: 1}},
+ new: true,
+ upsert: false,
+ lsid: {id: lsid},
+ txnNumber: NumberLong(39),
+ },
+ setup: function(coll) {
+ coll.insert({x: 60});
+ },
+ checkRetryResult: function(result, retryResult) {
+ checkFindAndModifyResult(result, retryResult);
+ },
+ checkDocuments: function(coll) {
+ assert.eq(1, coll.findOne({x: 60}).y);
+ },
+ },
+ {
+ coll: 'findAndMod-delete',
+ cmd: {
+ findAndModify: 'findAndMod-delete',
+ query: {x: 10},
+ remove: true,
+ lsid: {id: lsid},
+ txnNumber: NumberLong(40),
+ },
+ setup: function(coll) {
+ var bulk = coll.initializeUnorderedBulkOp();
+ for (let i = 0; i < 10; i++) {
+ bulk.insert({x: 10});
+ }
+ assert.writeOK(bulk.execute());
+
+ },
+ checkRetryResult: function(result, retryResult) {
+ checkFindAndModifyResult(result, retryResult);
+ },
+ checkDocuments: function(coll) {
+ assert.eq(9, coll.find({x: 10}).itcount());
+ },
+ },
+ ];
+
+ var st = new ShardingTest({shards: {rs0: {nodes: 2}, rs1: {nodes: 2}}});
+ assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
+ st.ensurePrimaryShard('test', st.shard0.shardName);
+
+ tests.forEach(function(test) {
+ testMoveChunkWithSession(
+ st, test.coll, test.cmd, test.setup, test.checkRetryResult, test.checkDocuments);
+ });
+
+ st.stop();
+})();
diff --git a/jstests/sharding/move_chunk_insert_with_write_retryability.js b/jstests/sharding/move_chunk_insert_with_write_retryability.js
new file mode 100644
index 00000000000..bdbdef47000
--- /dev/null
+++ b/jstests/sharding/move_chunk_insert_with_write_retryability.js
@@ -0,0 +1,34 @@
+load("jstests/sharding/move_chunk_with_session_helper.js");
+
+(function() {
+
+ "use strict";
+
+ var st = new ShardingTest({shards: {rs0: {nodes: 2}, rs1: {nodes: 2}}});
+ assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
+ st.ensurePrimaryShard('test', st.shard0.shardName);
+
+ var coll = 'insert';
+ var cmd = {
+ insert: coll,
+ documents: [{x: 10}, {x: 30}],
+ ordered: false,
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(34),
+ };
+ var setup = function() {};
+ var checkRetryResult = function(result, retryResult) {
+ assert.eq(result.ok, retryResult.ok);
+ assert.eq(result.n, retryResult.n);
+ assert.eq(result.writeErrors, retryResult.writeErrors);
+ assert.eq(result.writeConcernErrors, retryResult.writeConcernErrors);
+ };
+ var checkDocuments = function(coll) {
+ assert.eq(1, coll.find({x: 10}).itcount());
+ assert.eq(1, coll.find({x: 30}).itcount());
+ };
+
+ testMoveChunkWithSession(st, coll, cmd, setup, checkRetryResult, checkDocuments);
+
+ st.stop();
+})();
diff --git a/jstests/sharding/move_chunk_remove_with_write_retryability.js b/jstests/sharding/move_chunk_remove_with_write_retryability.js
new file mode 100644
index 00000000000..64d1f5d2dee
--- /dev/null
+++ b/jstests/sharding/move_chunk_remove_with_write_retryability.js
@@ -0,0 +1,41 @@
+load("jstests/sharding/move_chunk_with_session_helper.js");
+
+(function() {
+
+ "use strict";
+
+ var st = new ShardingTest({shards: {rs0: {nodes: 2}, rs1: {nodes: 2}}});
+ assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
+ st.ensurePrimaryShard('test', st.shard0.shardName);
+
+ var coll = 'delete';
+ var cmd = {
+ delete: coll,
+ deletes: [{q: {x: 10}, limit: 1}, {q: {x: 20}, limit: 1}],
+ ordered: false,
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(36),
+ };
+ var setup = function(coll) {
+ var bulk = coll.initializeUnorderedBulkOp();
+ for (let i = 0; i < 10; i++) {
+ bulk.insert({x: 10});
+ bulk.insert({x: 20});
+ }
+ assert.writeOK(bulk.execute());
+ };
+ var checkRetryResult = function(result, retryResult) {
+ assert.eq(result.ok, retryResult.ok);
+ assert.eq(result.n, retryResult.n);
+ assert.eq(result.writeErrors, retryResult.writeErrors);
+ assert.eq(result.writeConcernErrors, retryResult.writeConcernErrors);
+ };
+ var checkDocuments = function(coll) {
+ assert.eq(9, coll.find({x: 10}).itcount());
+ assert.eq(9, coll.find({x: 20}).itcount());
+ };
+
+ testMoveChunkWithSession(st, coll, cmd, setup, checkRetryResult, checkDocuments);
+
+ st.stop();
+})();
diff --git a/jstests/sharding/move_chunk_update_with_write_retryability.js b/jstests/sharding/move_chunk_update_with_write_retryability.js
new file mode 100644
index 00000000000..1f23db58782
--- /dev/null
+++ b/jstests/sharding/move_chunk_update_with_write_retryability.js
@@ -0,0 +1,44 @@
+load("jstests/sharding/move_chunk_with_session_helper.js");
+
+(function() {
+
+ "use strict";
+
+ var st = new ShardingTest({shards: {rs0: {nodes: 2}, rs1: {nodes: 2}}});
+ assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
+ st.ensurePrimaryShard('test', st.shard0.shardName);
+
+ var coll = 'update';
+ var cmd = {
+ update: 'update',
+ updates: [
+ {q: {x: 10}, u: {$inc: {a: 1}}}, // in place
+ {q: {x: 20}, u: {$inc: {b: 1}}, upsert: true},
+ {q: {x: 30}, u: {x: 30, z: 1}} // replacement
+ ],
+ ordered: false,
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(35),
+ };
+ var setup = function(coll) {
+ coll.insert({x: 10});
+ coll.insert({x: 30});
+ };
+ var checkRetryResult = function(result, retryResult) {
+ assert.eq(result.ok, retryResult.ok);
+ assert.eq(result.n, retryResult.n);
+ assert.eq(result.nModified, retryResult.nModified);
+ assert.eq(result.upserted, retryResult.upserted);
+ assert.eq(result.writeErrors, retryResult.writeErrors);
+ assert.eq(result.writeConcernErrors, retryResult.writeConcernErrors);
+ };
+ var checkDocuments = function(coll) {
+ assert.eq(1, coll.findOne({x: 10}).a);
+ assert.eq(1, coll.findOne({x: 20}).b);
+ assert.eq(1, coll.findOne({x: 30}).z);
+ };
+
+ testMoveChunkWithSession(st, coll, cmd, setup, checkRetryResult, checkDocuments);
+
+ st.stop();
+})();
diff --git a/jstests/sharding/move_chunk_with_session_helper.js b/jstests/sharding/move_chunk_with_session_helper.js
new file mode 100644
index 00000000000..a320af844ca
--- /dev/null
+++ b/jstests/sharding/move_chunk_with_session_helper.js
@@ -0,0 +1,49 @@
+load("jstests/replsets/rslib.js");
+
+/**
+ * High level test scenario:
+ * 1. Shard collection.
+ * 2. Perform writes.
+ * 3. Migrate only chunk to other shard.
+ * 4. Retry writes.
+ * 5. Step down primary and wait for new primary.
+ * 6. Retry writes.
+ * 7. Migrate only chunk back to original shard.
+ * 8. Retry writes.
+ */
+var testMoveChunkWithSession = function(
+ st, collName, cmdObj, setupFunc, checkRetryResultFunc, checkDocumentsFunc) {
+ var ns = 'test.' + collName;
+ var testDB = st.s.getDB('test');
+ var coll = testDB.getCollection(collName);
+
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
+
+ setupFunc(coll);
+ var result = assert.commandWorked(testDB.runCommand(cmdObj));
+
+ assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {x: 0}, to: st.shard1.shardName}));
+
+ checkRetryResultFunc(result, assert.commandWorked(testDB.runCommand(cmdObj)));
+ checkDocumentsFunc(coll);
+
+ try {
+ st.rs1.getPrimary().adminCommand({replSetStepDown: 60, secondaryCatchUpPeriodSecs: 30});
+ } catch (excep) {
+ print('Expected exception due to step down: ' + tojson(excep));
+ }
+
+ st.rs1.awaitNodesAgreeOnPrimary();
+ awaitRSClientHosts(st.s, {host: st.rs1.getPrimary().host}, {ok: true, ismaster: true});
+
+ checkRetryResultFunc(result, assert.commandWorked(testDB.runCommand(cmdObj)));
+ checkDocumentsFunc(coll);
+
+ // Make sure that the other shard knows about the latest primary.
+ awaitRSClientHosts(
+ st.rs0.getPrimary(), {host: st.rs1.getPrimary().host}, {ok: true, ismaster: true});
+ assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {x: 0}, to: st.shard0.shardName}));
+
+ checkRetryResultFunc(result, assert.commandWorked(testDB.runCommand(cmdObj)));
+ checkDocumentsFunc(coll);
+};
diff --git a/jstests/sharding/write_transactions_during_migration.js b/jstests/sharding/write_transactions_during_migration.js
new file mode 100644
index 00000000000..7c52628e05e
--- /dev/null
+++ b/jstests/sharding/write_transactions_during_migration.js
@@ -0,0 +1,84 @@
+/**
+ * Tests that session information is properly transferred to the destination shard while
+ * new writes are being sent to the source shard.
+ */
+
+load('./jstests/libs/chunk_manipulation_util.js');
+
+/**
+ * Test outline:
+ * 1. Pause migration.
+ * 2. Perform writes and allow them to be captured via the OpObserver.
+ * 3. Unpause migration.
+ * 4. Retry writes and confirm that writes are not duplicated.
+ */
+(function() {
+
+ "use strict";
+
+ var staticMongod = MongoRunner.runMongod({}); // For startParallelOps.
+
+ var st = new ShardingTest({shards: {rs0: {nodes: 1}, rs1: {nodes: 1}}});
+ st.adminCommand({enableSharding: 'test'});
+ st.ensurePrimaryShard('test', st.shard0.shardName);
+ st.adminCommand({shardCollection: 'test.user', key: {x: 1}});
+
+ pauseMoveChunkAtStep(st.shard0, moveChunkStepNames.reachedSteadyState);
+ var joinMoveChunk =
+ moveChunkParallel(staticMongod, st.s.host, {x: 0}, null, 'test.user', st.shard1.shardName);
+
+ var insertCmd = {
+ insert: 'user',
+ documents: [{x: 10}, {x: 30}],
+ ordered: false,
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(34),
+ };
+
+ var testDB = st.getDB('test');
+ var insertResult = assert.commandWorked(testDB.runCommand(insertCmd));
+
+ var findAndModCmd = {
+ findAndModify: 'user',
+ query: {x: 30},
+ update: {$inc: {y: 1}},
+ new: true,
+ upsert: true,
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(37),
+ };
+
+ var findAndModifyResult = assert.commandWorked(testDB.runCommand(findAndModCmd));
+
+ unpauseMoveChunkAtStep(st.shard0, moveChunkStepNames.reachedSteadyState);
+ joinMoveChunk();
+
+ var insertRetryResult = assert.commandWorked(testDB.runCommand(insertCmd));
+
+ assert.eq(insertResult.ok, insertRetryResult.ok);
+ assert.eq(insertResult.n, insertRetryResult.n);
+ assert.eq(insertResult.writeErrors, insertRetryResult.writeErrors);
+ assert.eq(insertResult.writeConcernErrors, insertRetryResult.writeConcernErrors);
+
+ assert.eq(1, testDB.user.find({x: 10}).itcount());
+ assert.eq(1, testDB.user.find({x: 30}).itcount());
+
+ var findAndModifyRetryResult = assert.commandWorked(testDB.runCommand(findAndModCmd));
+
+ assert.eq(findAndModifyResult.ok, findAndModifyRetryResult.ok);
+ assert.eq(findAndModifyResult.value, findAndModifyRetryResult.value);
+
+ // TODO: SERVER-30532: after adding upserted, just compare the entire lastErrorObject
+ var expectedLE = findAndModifyResult.lastErrorObject;
+ var toCheckLE = findAndModifyRetryResult.lastErrorObject;
+
+ assert.neq(null, toCheckLE);
+    assert.eq(expectedLE.updatedExisting, toCheckLE.updatedExisting);
+    assert.eq(expectedLE.n, toCheckLE.n);
+
+ assert.eq(1, testDB.user.findOne({x: 30}).y);
+
+ st.stop();
+
+ MongoRunner.stopMongod(staticMongod);
+})();
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 80a17b9ccf1..24825fb09ba 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -225,6 +225,7 @@ error_code("JSONSchemaNotAllowed", 224)
error_code("TransactionTooOld", 225)
error_code("AtomicityFailure", 226)
error_code("CannotImplicitlyCreateCollection", 227);
+error_code("SessionTransferIncomplete", 228)
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 9a6bd8e5be7..b466b70c3bb 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -115,10 +115,15 @@ BSONObj makeCollModCmdObj(const BSONObj& collModCmd,
return cmdObjBuilder.obj();
}
+struct OpTimeBundle {
+ repl::OpTime writeOpTime;
+ repl::OpTime prePostImageOpTime;
+};
+
/**
* Write oplog entry(ies) for the update operation.
*/
-repl::OpTime replLogUpdate(OperationContext* opCtx,
+OpTimeBundle replLogUpdate(OperationContext* opCtx,
Session* session,
const OplogUpdateEntryArgs& args) {
BSONObj storeObj;
@@ -138,6 +143,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
+ OpTimeBundle opTimes;
+
if (!storeObj.isEmpty() && opCtx->getTxnNumber()) {
auto noteUpdateOpTime = repl::logOp(opCtx,
"n",
@@ -150,6 +157,8 @@ repl::OpTime replLogUpdate(OperationContext* opCtx,
args.stmtId,
{});
+ opTimes.prePostImageOpTime = noteUpdateOpTime;
+
if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PreImage) {
oplogLink.preImageTs = noteUpdateOpTime.getTimestamp();
} else if (args.storeDocOption == OplogUpdateEntryArgs::StoreDocOption::PostImage) {
@@ -157,22 +166,24 @@ repl::OpTime replLogUpdate(OperationContext* opCtx,
}
}
- return repl::logOp(opCtx,
- "u",
- args.nss,
- args.uuid,
- args.update,
- &args.criteria,
- args.fromMigrate,
- sessionInfo,
- args.stmtId,
- oplogLink);
+ opTimes.writeOpTime = repl::logOp(opCtx,
+ "u",
+ args.nss,
+ args.uuid,
+ args.update,
+ &args.criteria,
+ args.fromMigrate,
+ sessionInfo,
+ args.stmtId,
+ oplogLink);
+
+ return opTimes;
}
/**
* Write oplog entry(ies) for the delete operation.
*/
-repl::OpTime replLogDelete(OperationContext* opCtx,
+OpTimeBundle replLogDelete(OperationContext* opCtx,
const NamespaceString& nss,
OptionalCollectionUUID uuid,
Session* session,
@@ -189,22 +200,26 @@ repl::OpTime replLogDelete(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
+ OpTimeBundle opTimes;
+
if (deletedDoc && opCtx->getTxnNumber()) {
auto noteOplog = repl::logOp(
opCtx, "n", nss, uuid, deletedDoc.get(), nullptr, false, sessionInfo, stmtId, {});
+ opTimes.prePostImageOpTime = noteOplog;
oplogLink.preImageTs = noteOplog.getTimestamp();
}
- return repl::logOp(opCtx,
- "d",
- nss,
- uuid,
- deleteState.documentKey,
- nullptr,
- fromMigrate,
- sessionInfo,
- stmtId,
- oplogLink);
+ opTimes.writeOpTime = repl::logOp(opCtx,
+ "d",
+ nss,
+ uuid,
+ deleteState.documentKey,
+ nullptr,
+ fromMigrate,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+ return opTimes;
}
} // namespace
@@ -253,7 +268,7 @@ void OpObserverImpl::onCreateIndex(OperationContext* opCtx,
auto css = CollectionShardingState::get(opCtx, systemIndexes);
if (!fromMigrate) {
- css->onInsertOp(opCtx, indexDoc);
+ css->onInsertOp(opCtx, indexDoc, {});
}
}
@@ -264,15 +279,20 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator end,
bool fromMigrate) {
Session* const session = opCtx->getTxnNumber() ? OperationContextSession::get(opCtx) : nullptr;
- const auto lastOpTime = repl::logInsertOps(opCtx, nss, uuid, session, begin, end, fromMigrate);
+
+ const size_t count = end - begin;
+ auto timestamps = stdx::make_unique<Timestamp[]>(count);
+ const auto lastOpTime =
+ repl::logInsertOps(opCtx, nss, uuid, session, begin, end, timestamps.get(), fromMigrate);
auto css = CollectionShardingState::get(opCtx, nss.ns());
- for (auto it = begin; it != end; it++) {
+ size_t index = 0;
+ for (auto it = begin; it != end; it++, index++) {
AuthorizationManager::get(opCtx->getServiceContext())
->logOp(opCtx, "i", nss, it->doc, nullptr);
if (!fromMigrate) {
- css->onInsertOp(opCtx, it->doc);
+ css->onInsertOp(opCtx, it->doc, timestamps[index]);
}
}
@@ -312,7 +332,12 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
auto css = CollectionShardingState::get(opCtx, args.nss);
if (!args.fromMigrate) {
- css->onUpdateOp(opCtx, args.criteria, args.update, args.updatedDoc);
+ css->onUpdateOp(opCtx,
+ args.criteria,
+ args.update,
+ args.updatedDoc,
+ opTime.writeOpTime.getTimestamp(),
+ opTime.prePostImageOpTime.getTimestamp());
}
if (args.nss.coll() == "system.js") {
@@ -322,11 +347,13 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
} else if (args.nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updatedDoc);
} else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
- !opTime.isNull()) {
+ !opTime.writeOpTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updatedDoc);
}
- onWriteOpCompleted(opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime);
+
+ onWriteOpCompleted(
+ opCtx, args.nss, session, std::vector<StmtId>{args.stmtId}, opTime.writeOpTime);
}
auto OpObserverImpl::aboutToDelete(OperationContext* opCtx,
@@ -356,7 +383,10 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
auto css = CollectionShardingState::get(opCtx, nss.ns());
if (!fromMigrate) {
- css->onDeleteOp(opCtx, deleteState);
+ css->onDeleteOp(opCtx,
+ deleteState,
+ opTime.writeOpTime.getTimestamp(),
+ opTime.prePostImageOpTime.getTimestamp());
}
if (nss.coll() == "system.js") {
@@ -365,11 +395,12 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
DurableViewCatalog::onExternalChange(opCtx, nss);
} else if (nss.ns() == FeatureCompatibilityVersion::kCollection) {
FeatureCompatibilityVersion::onDelete(opCtx, deleteState.documentKey);
- } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.isNull()) {
+ } else if (nss == NamespaceString::kSessionTransactionsTableNamespace &&
+ !opTime.writeOpTime.isNull()) {
SessionCatalog::get(opCtx)->invalidateSessions(opCtx, deleteState.documentKey);
}
- onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime);
+ onWriteOpCompleted(opCtx, nss, session, std::vector<StmtId>{stmtId}, opTime.writeOpTime);
}
void OpObserverImpl::onInternalOpMessage(OperationContext* opCtx,
diff --git a/src/mongo/db/ops/write_ops_retryability.cpp b/src/mongo/db/ops/write_ops_retryability.cpp
index 13d3a1cae56..3033daa68f9 100644
--- a/src/mongo/db/ops/write_ops_retryability.cpp
+++ b/src/mongo/db/ops/write_ops_retryability.cpp
@@ -90,16 +90,6 @@ void validateFindAndModifyRetryability(const FindAndModifyRequest& request,
<< ", oplog: "
<< redact(oplogEntry.toBSON()),
opType == repl::OpTypeEnum::kUpdate);
- uassert(
- 40610,
- str::stream() << "findAndModify retry request: " << redact(request.toBSON())
- << " is not compatible with previous write in the transaction of type: "
- << OpType_serializer(oplogEntry.getOpType())
- << ", oplogTs: "
- << ts.toString()
- << ", oplog: "
- << redact(oplogEntry.toBSON()),
- !request.isUpsert());
if (request.shouldReturnNew()) {
uassert(40611,
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index f3415ae2427..a50c51eba58 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -273,23 +273,6 @@ TEST_F(FindAndModifyRetryability, NestedUpsert) {
ASSERT_BSONOBJ_EQ(BSON("x" << 1), result.getValue());
}
-TEST_F(FindAndModifyRetryability, ErrorIfRequestIsUpsertButOplogIsUpdate) {
- auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
- request.setUpsert(true);
-
- Timestamp imageTs(120, 3);
- repl::OplogEntry noteOplog(
- repl::OpTime(imageTs, 1), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 1 << "z" << 1));
-
- insertOplogEntry(noteOplog);
-
- repl::OplogEntry oplog(
- repl::OpTime(), 0, repl::OpTypeEnum::kUpdate, kNs, BSON("x" << 1), BSON("y" << 1));
- oplog.setPreImageTs(imageTs);
-
- ASSERT_THROWS(parseOplogEntryForFindAndModify(opCtx(), request, oplog), AssertionException);
-}
-
TEST_F(FindAndModifyRetryability, AttemptingToRetryUpsertWithUpdateWithoutUpsertErrors) {
auto request = FindAndModifyRequest::makeUpdate(kNs, BSONObj(), BSONObj());
request.setUpsert(false);
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 70ed753ed14..b80dd5e0301 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -452,6 +452,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
Session* session,
std::vector<InsertStatement>::const_iterator begin,
std::vector<InsertStatement>::const_iterator end,
+ Timestamp timestamps[],
bool fromMigrate) {
invariant(begin != end);
@@ -480,7 +481,6 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
oplogLink.prevTs = session->getLastWriteOpTimeTs(*opCtx->getTxnNumber());
}
- auto timestamps = stdx::make_unique<Timestamp[]>(count);
OpTime lastOpTime;
for (size_t i = 0; i < count; i++) {
// Make a mutable copy.
@@ -512,7 +512,7 @@ repl::OpTime logInsertOps(OperationContext* opCtx,
basePtrs[i] = &writers[i];
}
invariant(!lastOpTime.isNull());
- _logOpsInner(opCtx, nss, basePtrs.get(), timestamps.get(), count, oplog, lastOpTime);
+ _logOpsInner(opCtx, nss, basePtrs.get(), timestamps, count, oplog, lastOpTime);
wuow.commit();
return lastOpTime;
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index a5eff8ac1aa..34a96b23b05 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -103,6 +103,8 @@ extern int OPLOG_VERSION;
/**
* Log insert(s) to the local oplog.
* Returns the OpTime of the last insert.
+ * The timestamps parameter is also an output: it is filled in with the individual timestamp
+ * assigned to each insert once the oplog entries have been created.
*/
OpTime logInsertOps(OperationContext* opCtx,
const NamespaceString& nss,
@@ -110,6 +112,7 @@ OpTime logInsertOps(OperationContext* opCtx,
Session* session,
std::vector<InsertStatement>::const_iterator begin,
std::vector<InsertStatement>::const_iterator end,
+ Timestamp timestamps[],
bool fromMigrate);
/**
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 9e6477c1d4b..575c442ac74 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -267,7 +267,9 @@ boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj co
return _metadataManager->getNextOrphanRange(from);
}
-void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) {
+void CollectionShardingState::onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -291,14 +293,16 @@ void CollectionShardingState::onInsertOp(OperationContext* opCtx, const BSONObj&
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc);
+ _sourceMgr->getCloner()->onInsertOp(opCtx, insertedDoc, oplogTs);
}
}
void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
- const BSONObj& updatedDoc) {
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -315,7 +319,7 @@ void CollectionShardingState::onUpdateOp(OperationContext* opCtx,
checkShardVersionOrThrow(opCtx);
if (_sourceMgr) {
- _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc);
+ _sourceMgr->getCloner()->onUpdateOp(opCtx, updatedDoc, oplogTs, prePostImageTs);
}
}
@@ -324,7 +328,10 @@ auto CollectionShardingState::makeDeleteState(BSONObj const& doc) -> DeleteState
_sourceMgr && _sourceMgr->getCloner()->isDocumentInMigratingChunk(doc)};
}
-void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState) {
+void CollectionShardingState::onDeleteOp(OperationContext* opCtx,
+ const DeleteState& deleteState,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
@@ -365,7 +372,7 @@ void CollectionShardingState::onDeleteOp(OperationContext* opCtx, const DeleteSt
checkShardVersionOrThrow(opCtx);
if (_sourceMgr && deleteState.isMigrating) {
- _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey);
+ _sourceMgr->getCloner()->onDeleteOp(opCtx, deleteState.documentKey, oplogTs, preImageTs);
}
}
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 5aa44336f70..6f9722dceb9 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -49,6 +49,7 @@ struct ChunkVersion;
class CollectionMetadata;
class MigrationSourceManager;
class OperationContext;
+class Timestamp;
/**
* Contains all sharding-related runtime state for a given collection. One such object is assigned
@@ -223,12 +224,17 @@ public:
*
* The global exclusive lock is expected to be held by the caller of any of these functions.
*/
- void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc);
+ void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc, const Timestamp& oplogTs);
void onUpdateOp(OperationContext* opCtx,
const BSONObj& query,
const BSONObj& update,
- const BSONObj& updatedDoc);
- void onDeleteOp(OperationContext* opCtx, const DeleteState& deleteState);
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs);
+ void onDeleteOp(OperationContext* opCtx,
+ const DeleteState& deleteState,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs);
void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName);
private:
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 844727e72c4..9ac9f4a9702 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -78,7 +78,7 @@ TEST_F(CollShardingStateTest, GlobalInitGetsCalledAfterWriteCommits) {
shardIdentity.setClusterId(OID::gen());
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
ASSERT_EQ(0, getInitCallCount());
@@ -103,7 +103,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetCalledIfWriteAborts) {
{
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
ASSERT_EQ(0, getInitCallCount());
}
@@ -125,7 +125,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfNSIsNotForShardIdentit
shardIdentity.setClusterId(OID::gen());
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON());
+ collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {});
ASSERT_EQ(0, getInitCallCount());
@@ -144,7 +144,7 @@ TEST_F(CollShardingStateTest, OnInsertOpThrowWithIncompleteShardIdentityDocument
ShardIdentityType shardIdentity;
shardIdentity.setShardName("a");
- ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON()),
+ ASSERT_THROWS(collShardingState.onInsertOp(operationContext(), shardIdentity.toBSON(), {}),
AssertionException);
}
@@ -156,7 +156,7 @@ TEST_F(CollShardingStateTest, GlobalInitDoesntGetsCalledIfShardIdentityDocWasNot
NamespaceString::kServerConfigurationNamespace);
WriteUnitOfWork wuow(operationContext());
- collShardingState.onInsertOp(operationContext(), BSON("_id" << 1));
+ collShardingState.onInsertOp(operationContext(), BSON("_id" << 1), {});
ASSERT_EQ(0, getInitCallCount());
diff --git a/src/mongo/db/s/migration_chunk_cloner_source.h b/src/mongo/db/s/migration_chunk_cloner_source.h
index 750bcdcda80..9d9a1407f1d 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source.h
@@ -36,6 +36,7 @@ namespace mongo {
class BSONObj;
class OperationContext;
class Status;
+class Timestamp;
/**
* This class is responsible for producing chunk documents to be moved from donor to a recipient
@@ -118,7 +119,9 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) = 0;
+ virtual void onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) = 0;
/**
* Notifies this cloner that an update happened to the collection, which it owns. It is up to
@@ -127,7 +130,10 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) = 0;
+ virtual void onUpdateOp(OperationContext* opCtx,
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) = 0;
/**
* Notifies this cloner that a delete happened to the collection, which it owns. It is up to the
@@ -136,7 +142,10 @@ public:
*
* NOTE: Must be called with at least IX lock held on the collection.
*/
- virtual void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) = 0;
+ virtual void onDeleteOp(OperationContext* opCtx,
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) = 0;
protected:
MigrationChunkClonerSource();
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 70a76e5e473..5d0db5dea0b 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -135,8 +135,14 @@ public:
*/
LogOpForShardingHandler(MigrationChunkClonerSourceLegacy* cloner,
const BSONObj& idObj,
- const char op)
- : _cloner(cloner), _idObj(idObj.getOwned()), _op(op) {}
+ const char op,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs)
+ : _cloner(cloner),
+ _idObj(idObj.getOwned()),
+ _op(op),
+ _oplogTs(oplogTs),
+ _prePostImageTs(prePostImageTs) {}
void commit() override {
switch (_op) {
@@ -156,6 +162,14 @@ public:
default:
MONGO_UNREACHABLE;
}
+
+ if (!_prePostImageTs.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteTS(_prePostImageTs);
+ }
+
+ if (!_oplogTs.isNull()) {
+ _cloner->_sessionCatalogSource.notifyNewWriteTS(_oplogTs);
+ }
}
void rollback() override {}
@@ -164,6 +178,8 @@ private:
MigrationChunkClonerSourceLegacy* const _cloner;
const BSONObj _idObj;
const char _op;
+ const Timestamp _oplogTs;
+ const Timestamp _prePostImageTs;
};
MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequest request,
@@ -175,7 +191,8 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ
_sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(),
_args.getToShardId().toString())),
_donorConnStr(std::move(donorConnStr)),
- _recipientHost(std::move(recipientHost)) {}
+ _recipientHost(std::move(recipientHost)),
+ _sessionCatalogSource(_args.getNss()) {}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(_state == kDone);
@@ -192,6 +209,9 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
return storeCurrentLocsStatus;
}
+    // Prime the session migration source if there are oplog entries to migrate.
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+
// Tell the recipient shard to start cloning
BSONObjBuilder cmdBuilder;
StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
@@ -314,6 +334,12 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
_callRecipient(createRequestWithSessionId(kRecvChunkCommit, _args.getNss(), _sessionId));
if (responseStatus.isOK()) {
_cleanup(opCtx);
+
+ if (_sessionCatalogSource.hasMoreOplog()) {
+ return {ErrorCodes::SessionTransferIncomplete,
+ "destination shard finished committing but there are still some session "
+ "metadata that needs to be transferred"};
+ }
return Status::OK();
}
@@ -344,7 +370,8 @@ bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj&
}
void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
- const BSONObj& insertedDoc) {
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = insertedDoc["_id"];
@@ -358,11 +385,19 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'i'));
+ if (opCtx->getTxnNumber()) {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'i', oplogTs, {}));
+ } else {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'i', {}, {}));
+ }
}
void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
- const BSONObj& updatedDoc) {
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = updatedDoc["_id"];
@@ -376,11 +411,19 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'u'));
+ if (opCtx->getTxnNumber()) {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'u', oplogTs, prePostImageTs));
+ } else {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'u', {}, {}));
+ }
}
void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
- const BSONObj& deletedDocId) {
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss().ns(), MODE_IX));
BSONElement idElement = deletedDocId["_id"];
@@ -390,7 +433,13 @@ void MigrationChunkClonerSourceLegacy::onDeleteOp(OperationContext* opCtx,
return;
}
- opCtx->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idElement.wrap(), 'd'));
+ if (opCtx->getTxnNumber()) {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'd', oplogTs, preImageTs));
+ } else {
+ opCtx->recoveryUnit()->registerChange(
+ new LogOpForShardingHandler(this, idElement.wrap(), 'd', {}, {}));
+ }
}
uint64_t MigrationChunkClonerSourceLegacy::getCloneBatchBufferAllocationSize() {
@@ -679,4 +728,27 @@ void MigrationChunkClonerSourceLegacy::_xfer(OperationContext* opCtx,
arr.done();
}
+void MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(OperationContext* opCtx,
+ BSONArrayBuilder* arrBuilder) {
+ while (_sessionCatalogSource.hasMoreOplog()) {
+ auto oplogDoc = _sessionCatalogSource.getLastFetchedOplog();
+
+ if (oplogDoc.isEmpty()) {
+            // The last fetched entry turned out to be empty; check whether there are more.
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+ continue;
+ }
+
+ // Use the builder size instead of accumulating the document sizes directly so that we
+ // take into consideration the overhead of BSONArray indices.
+ if (arrBuilder->arrSize() &&
+ (arrBuilder->len() + oplogDoc.objsize() + 1024) > BSONObjMaxUserSize) {
+ break;
+ }
+
+ arrBuilder->append(oplogDoc);
+ _sessionCatalogSource.fetchNextOplog(opCtx);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index c072718475a..72d299b58e4 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -37,6 +37,7 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/s/migration_chunk_cloner_source.h"
#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/s/session_catalog_migration_source.h"
#include "mongo/s/move_chunk_request.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/memory.h"
@@ -72,11 +73,19 @@ public:
bool isDocumentInMigratingChunk(const BSONObj& doc) override;
- void onInsertOp(OperationContext* opCtx, const BSONObj& insertedDoc) override;
+ void onInsertOp(OperationContext* opCtx,
+ const BSONObj& insertedDoc,
+ const Timestamp& oplogTs) override;
- void onUpdateOp(OperationContext* opCtx, const BSONObj& updatedDoc) override;
+ void onUpdateOp(OperationContext* opCtx,
+ const BSONObj& updatedDoc,
+ const Timestamp& oplogTs,
+ const Timestamp& prePostImageTs) override;
- void onDeleteOp(OperationContext* opCtx, const BSONObj& deletedDocId) override;
+ void onDeleteOp(OperationContext* opCtx,
+ const BSONObj& deletedDocId,
+ const Timestamp& oplogTs,
+ const Timestamp& preImageTs) override;
// Legacy cloner specific functionality
@@ -121,6 +130,8 @@ public:
*/
Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder);
+ void nextSessionMigrationBatch(OperationContext* opCtx, BSONArrayBuilder* arrBuilder);
+
private:
friend class DeleteNotificationStage;
friend class LogOpForShardingHandler;
@@ -183,6 +194,8 @@ private:
// during the cloning stage
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec;
+ SessionCatalogMigrationSource _sessionCatalogSource;
+
// Protects the entries below
stdx::mutex _mutex;
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
index 9e3e83774ee..d42c6bdaa99 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_commands.cpp
@@ -216,5 +216,57 @@ public:
} transferModsCommand;
+/**
+ * Command for extracting the oplog entries that need to be migrated for the given migration
+ * session id.
+ * Note: this command is not stateless. Calling it has the side effect of gradually depleting
+ * the buffer that contains the oplog entries to be transferred.
+ */
+class MigrateSessionCommand : public BasicCommand {
+public:
+ MigrateSessionCommand() : BasicCommand("_getNextSessionMods") {}
+
+ void help(std::stringstream& h) const {
+ h << "internal";
+ }
+
+ virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return false;
+ }
+
+ virtual bool slaveOk() const {
+ return false;
+ }
+
+ virtual bool adminOnly() const {
+ return true;
+ }
+
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+
+ bool run(OperationContext* opCtx,
+ const std::string&,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) {
+ const MigrationSessionId migrationSessionId(
+ uassertStatusOK(MigrationSessionId::extractFromBSON(cmdObj)));
+
+ BSONArrayBuilder arrBuilder;
+
+ AutoGetActiveCloner autoCloner(opCtx, migrationSessionId);
+ autoCloner.getCloner()->nextSessionMigrationBatch(opCtx, &arrBuilder);
+
+ result.appendArray("oplog", arrBuilder.arr());
+ return true;
+ }
+
+} migrateSessionCommand;
+
} // namespace
} // namespace mongo
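To illustrate the command added above: while a migration is active, the recipient shard repeatedly polls the donor for the next batch of session oplog entries. A hedged shell sketch of an invocation (the field values are illustrative; the request is parsed via MigrationSessionId::extractFromBSON and served by the active cloner):

    var res = donorAdminDB.runCommand({
        _getNextSessionMods: 1,
        sessionId: '<migration session id>',  // must match the active migration
    });
    // A successful reply carries the next batch and drains the donor-side buffer:
    // {oplog: [<session oplog entry>, ...], ok: 1}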
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 193c237db5c..a15f6487a33 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -246,14 +246,14 @@ TEST_F(MigrationChunkClonerSourceLegacyTest, CorrectDocumentsFetched) {
WriteUnitOfWork wuow(operationContext());
- cloner.onInsertOp(operationContext(), createCollectionDocument(90));
- cloner.onInsertOp(operationContext(), createCollectionDocument(150));
- cloner.onInsertOp(operationContext(), createCollectionDocument(151));
- cloner.onInsertOp(operationContext(), createCollectionDocument(210));
-
- cloner.onDeleteOp(operationContext(), createCollectionDocument(80));
- cloner.onDeleteOp(operationContext(), createCollectionDocument(199));
- cloner.onDeleteOp(operationContext(), createCollectionDocument(220));
+ cloner.onInsertOp(operationContext(), createCollectionDocument(90), {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(150), {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(151), {});
+ cloner.onInsertOp(operationContext(), createCollectionDocument(210), {});
+
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(80), {}, {});
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(199), {}, {});
+ cloner.onDeleteOp(operationContext(), createCollectionDocument(220), {}, {});
wuow.commit();
}
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index ca2c736a6b1..86688ba08ca 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -57,6 +57,8 @@
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
#include "mongo/util/concurrency/notification.h"
@@ -231,6 +233,8 @@ void MigrationDestinationManager::setStateFail(std::string msg) {
_errmsg = std::move(msg);
_state = FAIL;
}
+
+ _sessionMigration->forceFail(msg);
}
void MigrationDestinationManager::setStateFailWarn(std::string msg) {
@@ -240,6 +244,8 @@ void MigrationDestinationManager::setStateFailWarn(std::string msg) {
_errmsg = std::move(msg);
_state = FAIL;
}
+
+ _sessionMigration->forceFail(msg);
}
bool MigrationDestinationManager::isActive() const {
@@ -334,6 +340,9 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
_migrateThreadHandle.join();
}
+ _sessionMigration =
+ stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId);
+
_migrateThreadHandle =
stdx::thread([this, min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern]() {
_migrateThread(min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
@@ -397,6 +406,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
<< _sessionId->toString()};
}
+ _sessionMigration->finish();
_state = COMMIT_START;
auto const deadline = Date_t::now() + Seconds(30);
@@ -411,6 +421,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
if (_state != DONE) {
return {ErrorCodes::CommandFailed, "startCommit failed, final data failed to transfer"};
}
+
return Status::OK();
}
@@ -672,6 +683,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
// 3. Initial bulk clone
setState(CLONE);
+ _sessionMigration->start(opCtx->getServiceContext());
+
const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
@@ -829,13 +842,15 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
break;
}
- sleepsecs(1);
+ opCtx->sleepFor(Seconds(1));
}
if (t.minutes() >= 600) {
setStateFail("Cannot go to critical section because secondaries cannot keep up");
return;
}
+
+ _sessionMigration->waitUntilReadyToCommit(opCtx);
}
{
@@ -895,6 +910,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep5);
}
+ _sessionMigration->join();
+ if (_sessionMigration->getState() == SessionCatalogMigrationDestination::State::ErrorOccurred) {
+ setStateFail(_sessionMigration->getErrMsg());
+ return;
+ }
+
setState(DONE);
timing.done(6);
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index 610f70196c1..49c6f1e5089 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -39,6 +39,7 @@
#include "mongo/db/s/active_migrations_registry.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_session_id.h"
+#include "mongo/db/s/session_catalog_migration_destination.h"
#include "mongo/s/shard_id.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
@@ -211,6 +212,8 @@ private:
State _state{READY};
std::string _errmsg;
+
+ std::unique_ptr<SessionCatalogMigrationDestination> _sessionMigration;
};
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index c9344031555..b1b373f17c5 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -201,7 +201,6 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx,
*/
ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const BSONObj& oplogBSON,
- // const Timestamp& prePostImageTs,
const ProcessOplogResult& lastResult) {
ProcessOplogResult result;
auto oplogEntry = parseOplog(oplogBSON);
@@ -240,50 +239,62 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const auto& sessionInfo = oplogEntry.getOperationSessionInfo();
result.sessionId = sessionInfo.getSessionId().value();
result.txnNum = sessionInfo.getTxnNumber().value();
+ const auto stmtId = *oplogEntry.getStatementId();
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
scopedSession->beginTxn(opCtx, result.txnNum);
+ if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
+ return lastResult;
+ }
+
BSONObj object(result.isPrePostImage
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
oplogLink.prevTs = scopedSession->getLastWriteOpTimeTs(result.txnNum);
- writeConflictRetry(opCtx,
- "SessionOplogMigration",
- NamespaceString::kSessionTransactionsTableNamespace.ns(),
- [&] {
- Lock::GlobalLock globalLock(opCtx, MODE_IX, UINT_MAX);
- WriteUnitOfWork wunit(opCtx);
-
- result.oplogTime = repl::logOp(opCtx,
- "n",
- oplogEntry.getNamespace(),
- oplogEntry.getUuid(),
- object,
- &object2,
- true,
- sessionInfo,
- *oplogEntry.getStatementId(),
- oplogLink);
-
- auto oplogTs = result.oplogTime.getTimestamp();
- uassert(40633,
- str::stream()
- << "Failed to create new oplog entry for oplog with opTime: "
- << oplogEntry.getOpTime().toString()
- << ": "
- << redact(oplogBSON),
- !oplogTs.isNull());
-
- if (!result.isPrePostImage) {
- scopedSession->onWriteOpCompletedOnPrimary(
- opCtx, result.txnNum, {*oplogEntry.getStatementId()}, oplogTs);
- }
-
- wunit.commit();
- });
+ writeConflictRetry(
+ opCtx,
+ "SessionOplogMigration",
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ [&] {
+            // Need to take the global lock here so repl::logOp will not unlock it and trigger
+            // the invariant that disallows unlocking the global lock while inside a WUOW.
+            // Grab a DBLock here instead of a plain GlobalLock to make sure the MMAPv1 flush
+            // lock will be locked/unlocked correctly. Take the transaction table db lock to
+ // ensure the same lock ordering with normal replicated updates to the table.
+ Lock::DBLock lk(
+ opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX);
+ WriteUnitOfWork wunit(opCtx);
+
+ result.oplogTime = repl::logOp(opCtx,
+ "n",
+ oplogEntry.getNamespace(),
+ oplogEntry.getUuid(),
+ object,
+ &object2,
+ true,
+ sessionInfo,
+ stmtId,
+ oplogLink);
+
+ auto oplogTs = result.oplogTime.getTimestamp();
+ uassert(40633,
+ str::stream() << "Failed to create new oplog entry for oplog with opTime: "
+ << oplogEntry.getOpTime().toString()
+ << ": "
+ << redact(oplogBSON),
+ !oplogTs.isNull());
+
+            // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because
+            // the next oplog entry will contain the real operation.
+ if (!result.isPrePostImage) {
+ scopedSession->onWriteOpCompletedOnPrimary(opCtx, result.txnNum, {stmtId}, oplogTs);
+ }
+
+ wunit.commit();
+ });
return result;
}
@@ -307,6 +318,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_state == State::NotStarted);
_state = State::Migrating;
+ _isStateChanged.notify_all();
}
_thread = stdx::thread(stdx::bind(
@@ -316,6 +328,7 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) {
void SessionCatalogMigrationDestination::finish() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::Committing;
+ _isStateChanged.notify_all();
}
void SessionCatalogMigrationDestination::join() {
@@ -343,8 +356,9 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
auto uniqueCtx = cc().makeOperationContext();
auto opCtx = uniqueCtx.get();
- // Timestamp prePostImageTs;
+    bool oplogDrainedAfterCommitting = false;
ProcessOplogResult lastResult;
+ repl::OpTime lastOpTimeWaited;
while (true) {
{
@@ -363,7 +377,16 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_state == State::Committing) {
- break;
+                // The migration is considered done only when it gets an empty result from
+                // the source shard while it is in the Committing state. This ensures that it
+                // doesn't miss any new oplog entries created in the window between draining
+                // the buffer from the source shard and receiving the commit command.
+                if (oplogDrainedAfterCommitting) {
+                    break;
+                }
+
+                oplogDrainedAfterCommitting = true;
}
}
@@ -381,8 +404,17 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
// Note: only transition to "ready to commit" if state is not error/force stop.
if (_state == State::Migrating) {
_state = State::ReadyToCommit;
+ _isStateChanged.notify_all();
}
}
+
+ if (lastOpTimeWaited == lastResult.oplogTime) {
+                // We got an empty result at least twice in a row from the source shard, so
+                // back off a little to avoid hammering the shard.
+ opCtx->sleepFor(Milliseconds(200));
+ }
+
+ lastOpTimeWaited = lastResult.oplogTime;
}
while (oplogIter.more()) {
@@ -411,6 +443,7 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::Done;
+ _isStateChanged.notify_all();
}
}
@@ -423,6 +456,8 @@ void SessionCatalogMigrationDestination::_errorOccurred(StringData errMsg) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_state = State::ErrorOccurred;
_errMsg = errMsg.toString();
+
+ _isStateChanged.notify_all();
}
SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::getState() {
@@ -430,4 +465,15 @@ SessionCatalogMigrationDestination::State SessionCatalogMigrationDestination::ge
return _state;
}
+void SessionCatalogMigrationDestination::forceFail(const std::string& errMsg) {
+ _errorOccurred(errMsg);
+}
+
+void SessionCatalogMigrationDestination::waitUntilReadyToCommit(OperationContext* opCtx) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ while (_state == State::Migrating) {
+ opCtx->waitForConditionOrInterrupt(_isStateChanged, lk);
+ }
+}
+
} // namespace mongo
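
Every state transition in this file now ends with _isStateChanged.notify_all(), and waitUntilReadyToCommit is the matching consumer. This is the classic predicate-guarded condition wait; a standalone equivalent, using std::condition_variable in place of the stdx wrapper and the interruptible waitForConditionOrInterrupt:

    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    enum class State { NotStarted, Migrating, ReadyToCommit, Done, ErrorOccurred };

    std::mutex m;
    std::condition_variable isStateChanged;
    State state = State::Migrating;

    void setState(State next) {
        std::lock_guard<std::mutex> lk(m);
        state = next;
        isStateChanged.notify_all();  // every transition wakes all waiters, as in the diff
    }

    void waitUntilReadyToCommit() {
        std::unique_lock<std::mutex> lk(m);
        // Loop on the predicate: any state other than Migrating releases the
        // waiter, whether that is ReadyToCommit, Done, or ErrorOccurred.
        while (state == State::Migrating)
            isStateChanged.wait(lk);
    }

    int main() {
        std::thread waiter([] {
            waitUntilReadyToCommit();
            std::cout << "past Migrating\n";
        });
        setState(State::ReadyToCommit);
        waiter.join();
    }

Looping on the predicate rather than waiting once also absorbs spurious wakeups, which is why the diff's version keeps the while loop around the wait.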
diff --git a/src/mongo/db/s/session_catalog_migration_destination.h b/src/mongo/db/s/session_catalog_migration_destination.h
index 2e740f2f6bb..a67bd8012f6 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.h
+++ b/src/mongo/db/s/session_catalog_migration_destination.h
@@ -37,6 +37,7 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/s/migration_session_id.h"
#include "mongo/s/shard_id.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -88,6 +89,17 @@ public:
void join();
/**
+     * Forces this into an error state, which also stops the session transfer thread.
+ */
+    void forceFail(const std::string& errMsg);
+
+ /**
+     * Blocks until the state is no longer Migrating. In other words, returns once the
+     * state becomes ReadyToCommit/Done/ErrorOccurred, etc.
+ */
+ void waitUntilReadyToCommit(OperationContext* opCtx);
+
+ /**
* Returns the current state.
*/
State getState();
@@ -109,6 +121,7 @@ private:
// Protects _state and _errMsg.
stdx::mutex _mutex;
+ stdx::condition_variable _isStateChanged;
State _state = State::NotStarted;
std::string _errMsg; // valid only if _state == ErrorOccurred.
};
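
The new hooks suggest how a driver on the destination shard would sequence a session transfer around the migration critical section. That driver is outside this diff, so the sketch below is illustrative only: it uses nothing beyond the members declared in this header, and the service/opCtx parameters are assumed to be supplied by the caller:

    // Illustrative call sequence only; the real driver is not shown in this diff.
    void driveSessionMigration(SessionCatalogMigrationDestination& sessionMigration,
                               ServiceContext* service,
                               OperationContext* opCtx) {
        sessionMigration.start(service);

        // Block until the bulk of the session history has drained; on return
        // the state is ReadyToCommit, Done, or ErrorOccurred.
        sessionMigration.waitUntilReadyToCommit(opCtx);

        // ... enter the migration critical section ...

        sessionMigration.finish();  // flip to Committing; the thread drains and exits
        sessionMigration.join();
    }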
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index a1a76dfbd1d..71c63721c6f 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -253,6 +253,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyWhenNothingToTr
sessionMigration.start(getServiceContext());
sessionMigration.finish();
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -285,6 +287,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -336,6 +340,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -380,6 +386,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
returnOplog({oplog1, oplog2});
returnOplog({oplog3});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -433,6 +441,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
oplog3.setStatementId(5);
returnOplog({oplog1, oplog2, oplog3});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -506,6 +516,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
oplog2.setStatementId(45);
returnOplog({oplog1, oplog2});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -556,6 +568,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
updateOplog.setPreImageTs(Timestamp(100, 2));
returnOplog({preImageOplog, updateOplog});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -645,6 +659,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
updateOplog.setPostImageTs(Timestamp(100, 2));
returnOplog({postImageOplog, updateOplog});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -735,6 +751,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
returnOplog({preImageOplog});
returnOplog({updateOplog});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -829,6 +847,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
oplog2.setStatementId(45);
returnOplog({oplog1, oplog2});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -883,6 +903,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
oplog2.setStatementId(45);
returnOplog({oplog2});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -1048,6 +1070,8 @@ TEST_F(SessionCatalogMigrationDestinationTest,
oplog2.setStatementId(45);
returnOplog({oplog2});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
returnOplog({});
sessionMigration.join();
@@ -1303,6 +1327,61 @@ TEST_F(SessionCatalogMigrationDestinationTest,
ASSERT_FALSE(sessionMigration.getErrMsg().empty());
}
+TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatements) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ auto opCtx = operationContext();
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(19);
+
+ insertDocWithSessionInfo(sessionInfo, kNs, BSON("_id" << 46), 30);
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(OpTime(Timestamp(70, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog2.setOperationSessionInfo(sessionInfo);
+ oplog2.setStatementId(30);
+
+ OplogEntry oplog3(OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 80));
+ oplog3.setOperationSessionInfo(sessionInfo);
+ oplog3.setStatementId(45);
+
+ returnOplog({oplog1, oplog2, oplog3});
+    // The migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto session = getSessionWithTxn(opCtx, sessionId, 19);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTimeTs(19));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ auto firstInsertOplog = historyIter.next(opCtx);
+
+ ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert);
+ ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject());
+ ASSERT_TRUE(firstInsertOplog.getStatementId());
+ ASSERT_EQ(30, *firstInsertOplog.getStatementId());
+}
+
} // namespace
} // namespace mongo
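
The new ShouldIgnoreAlreadyExecutedStatements test pins down the dedup rule: statement id 30 was already executed locally under txnNumber 19, so the migrated entry carrying the same statement id is dropped while ids 23 and 45 are applied, and the history iterator then walks the surviving writes newest first. A toy model of that filter, with a std::set standing in for the session's executed-statement lookup:

    #include <iostream>
    #include <set>
    #include <vector>

    int main() {
        // Statement ids already executed locally under the same txnNumber.
        std::set<int> executed = {30};

        // Statement ids arriving with the migrated session oplog.
        std::vector<int> incoming = {23, 30, 45};

        for (int stmtId : incoming) {
            if (!executed.insert(stmtId).second) {
                std::cout << "skip already-executed stmtId " << stmtId << "\n";
                continue;
            }
            std::cout << "apply stmtId " << stmtId << "\n";
        }
    }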
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index e79656ba94b..829d862b370 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -76,7 +76,6 @@ BSONObj SessionCatalogMigrationSource::getLastFetchedOplog() {
{
stdx::lock_guard<stdx::mutex> _lk(_newOplogMutex);
- invariant(!_lastFetchedNewWriteOplog.isEmpty());
return _lastFetchedNewWriteOplog;
}
}
@@ -101,11 +100,13 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
return false;
}
- _lastFetchedOplog = nextOplog.toBSON().getOwned();
-
+ auto nextOplogBSON = nextOplog.toBSON().getOwned();
auto doc = fetchPrePostImageOplog(opCtx, nextOplog);
if (!doc.isEmpty()) {
- _lastFetchedOplogBuffer.push_back(doc);
+ _lastFetchedOplogBuffer.push_back(nextOplogBSON);
+ _lastFetchedOplog = doc;
+ } else {
+ _lastFetchedOplog = nextOplogBSON;
}
return true;
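
The swap above is the actual fix in _handleWriteHistory: when an entry carries a pre/post image, the image document must be surfaced first and the real entry buffered behind it, not the reverse, so the destination can link the pair in order. A toy model of the corrected hand-off, with strings standing in for the oplog BSON objects:

    #include <deque>
    #include <iostream>
    #include <string>

    int main() {
        std::deque<std::string> buffer;  // stand-in for _lastFetchedOplogBuffer
        std::string lastFetched;         // stand-in for _lastFetchedOplog

        std::string nextOplog = "update{x: 1}";
        std::string imageDoc = "preImage{x: 0}";  // empty when no image exists

        if (!imageDoc.empty()) {
            buffer.push_back(nextOplog);  // the real entry waits in the buffer...
            lastFetched = imageDoc;       // ...and the image is surfaced first
        } else {
            lastFetched = nextOplog;
        }

        std::cout << lastFetched << "\n";     // preImage{x: 0}
        std::cout << buffer.front() << "\n";  // update{x: 1}
    }

This ordering is what the reshuffled expectedSequence in the source test below asserts.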
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 42a3e71e372..5e44531d31c 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -68,6 +68,7 @@ public:
/**
* Returns the oplog document that was last fetched by the fetchNextOplog call.
+     * Returns an empty object if there are no oplog entries to fetch.
*/
BSONObj getLastFetchedOplog();
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index ad4942292ea..ff43cce5d0b 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -216,7 +216,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
SessionCatalogMigrationSource migrationSource(kNs);
ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- auto expectedSequece = {entry4.toBSON(), entry3.toBSON(), entry2.toBSON(), entry1.toBSON()};
+ auto expectedSequece = {entry3.toBSON(), entry4.toBSON(), entry1.toBSON(), entry2.toBSON()};
for (auto oplogDoc : expectedSequece) {
ASSERT_TRUE(migrationSource.hasMoreOplog());