author     Janna Golden <janna.golden@mongodb.com>    2019-10-07 19:59:49 +0000
committer  evergreen <evergreen@mongodb.com>          2019-10-07 19:59:49 +0000
commit     19007f085afb46b0c56329d2230667f789b3f13c
tree       21df5c931bd582263a0a3fef8e61dfed30244041
parent     6e2a0a863d59213c61128690350ec010cd7a759e
download   mongo-19007f085afb46b0c56329d2230667f789b3f13c.tar.gz
SERVER-42783 Migrations should wait for majority replication of cloned docs when there are no xfer mods
(cherry picked from commit 2fb73bcd2515cd8d566fecc5b23ee9f6970b1716)
-rw-r--r--  jstests/libs/chunk_manipulation_util.js                  16
-rw-r--r--  jstests/sharding/migration_waits_for_majority_commit.js  64
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp         20
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h             2
4 files changed, 89 insertions, 13 deletions
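The gist of the change, per the commit message and the diff below: the recipient's migrate thread used to read the last applied optime from its own client after cloning, but the cloned documents are written by a separate "chunkInserter" thread, so when there are no transfer mods the migrate thread's last-op is stale and the majority-commit wait is vacuous. The patch makes cloneDocumentsFromDonor() capture the inserter thread's last applied optime inside the consumer guard and return it, and _migrateDriver() waits on that value instead. Below is a minimal standalone C++ sketch of the same pattern, not MongoDB code: Batch, applyBatch, waitForMajorityCommit, and the simulated commit point are illustrative stand-ins, shown only to illustrate why the optime must be captured on the inserter thread rather than read from the caller's thread.

// Minimal standalone sketch (assumed names, not MongoDB internals).
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

using Batch = std::vector<int>;

std::atomic<uint64_t> majorityCommitPoint{0};  // stand-in for the replication majority commit point

// Pretend each document in the batch gets the next "optime"; return the newest one applied.
uint64_t applyBatch(const Batch& batch, uint64_t lastApplied) {
    return lastApplied + batch.size();
}

// Stand-in for blocking until the majority commit point reaches opTime.
void waitForMajorityCommit(uint64_t opTime) {
    while (majorityCommitPoint.load() < opTime) {
        std::this_thread::yield();
    }
}

// Analogue of cloneDocumentsFromDonor(): returns the optime of the last document the
// inserter thread applied, even though the calling thread itself wrote nothing.
uint64_t cloneDocuments(const std::vector<Batch>& batches) {
    uint64_t lastOpApplied = 0;
    std::thread inserter([&] {
        uint64_t threadLastOp = 0;
        for (const auto& batch : batches) {
            threadLastOp = applyBatch(batch, threadLastOp);
        }
        // Equivalent to reading ReplClientInfo::getLastOp() on the inserter's own
        // client inside the consumer guard, rather than on the caller's client.
        lastOpApplied = threadLastOp;
    });
    inserter.join();
    return lastOpApplied;
}

int main() {
    const std::vector<Batch> batches{{1, 2, 3}, {4, 5}};

    const uint64_t lastOpApplied = cloneDocuments(batches);

    // Simulate the secondaries replicating the cloned docs, then block on exactly
    // that optime before treating the clone phase as durable.
    std::thread replication([&] { majorityCommitPoint.store(lastOpApplied); });
    waitForMajorityCommit(lastOpApplied);
    replication.join();

    std::cout << "cloned documents majority committed at op " << lastOpApplied << "\n";
}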
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js
index 00ec3687b45..7509deba755 100644
--- a/jstests/libs/chunk_manipulation_util.js
+++ b/jstests/libs/chunk_manipulation_util.js
@@ -19,11 +19,12 @@ load('./jstests/libs/test_background_ops.js');
// Returns a join function; call it to wait for moveChunk to complete.
//
-function moveChunkParallel(staticMongod, mongosURL, findCriteria, bounds, ns, toShardId) {
+function moveChunkParallel(
+ staticMongod, mongosURL, findCriteria, bounds, ns, toShardId, expectSuccess = true) {
assert((findCriteria || bounds) && !(findCriteria && bounds),
'Specify either findCriteria or bounds, but not both.');
- function runMoveChunk(mongosURL, findCriteria, bounds, ns, toShardId) {
+ function runMoveChunk(mongosURL, findCriteria, bounds, ns, toShardId, expectSuccess) {
assert(mongosURL && ns && toShardId, 'Missing arguments.');
assert((findCriteria || bounds) && !(findCriteria && bounds),
'Specify either findCriteria or bounds, but not both.');
@@ -42,12 +43,17 @@ function moveChunkParallel(staticMongod, mongosURL, findCriteria, bounds, ns, to
printjson(cmd);
var result = admin.runCommand(cmd);
printjson(result);
- assert(result.ok);
+ if (expectSuccess) {
+ assert(result.ok);
+ } else {
+ assert.commandFailed(result);
+ }
}
// Return the join function.
- return startParallelOps(
- staticMongod, runMoveChunk, [mongosURL, findCriteria, bounds, ns, toShardId]);
+ return startParallelOps(staticMongod,
+ runMoveChunk,
+ [mongosURL, findCriteria, bounds, ns, toShardId, expectSuccess]);
}
// moveChunk starts at step 0 and proceeds to 1 (it has *finished* parsing
diff --git a/jstests/sharding/migration_waits_for_majority_commit.js b/jstests/sharding/migration_waits_for_majority_commit.js
new file mode 100644
index 00000000000..6b808b80081
--- /dev/null
+++ b/jstests/sharding/migration_waits_for_majority_commit.js
@@ -0,0 +1,64 @@
+/**
+ * This test is meant to test that a migration will correctly wait for the majority commit point
+ * when there are no transfer mod writes (SERVER-42783).
+ * @tags: [requires_find_command]
+ */
+
+(function() {
+"use strict";
+
+load('./jstests/libs/chunk_manipulation_util.js');
+
+// Set up a sharded cluster with two shards, two chunks, and one document in one of the chunks.
+const st = new ShardingTest({shards: 2, rs: {nodes: 2}, config: 1});
+const testDB = st.s.getDB("test");
+
+assert.commandWorked(testDB.foo.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+st.ensurePrimaryShard("test", st.shard0.shardName);
+assert.commandWorked(st.s.adminCommand({enableSharding: "test"}));
+assert.commandWorked(st.s.adminCommand({shardCollection: "test.foo", key: {_id: 1}}));
+assert.commandWorked(st.s.adminCommand({split: "test.foo", middle: {_id: 0}}));
+
+// The document is in the majority committed snapshot.
+assert.eq(1, testDB.foo.find().readConcern("majority").itcount());
+
+// Advance a migration to the beginning of the cloning phase.
+pauseMigrateAtStep(st.rs1.getPrimary(), 2);
+
+// For startParallelOps to write its state
+let staticMongod = MongoRunner.runMongod({});
+
+let awaitMigration = moveChunkParallel(staticMongod,
+ st.s.host,
+ {_id: 1},
+ null,
+ "test.foo",
+ st.shard1.shardName,
+ false /* expectSuccess */);
+
+// Wait for the migration to reach the failpoint and allow any writes to become majority committed
+// before pausing replication.
+waitForMigrateStep(st.rs1.getPrimary(), 2);
+st.rs1.awaitLastOpCommitted();
+
+// Disable replication on the recipient shard's secondary node, so the recipient shard's majority
+// commit point cannot advance.
+const destinationSec = st.rs1.getSecondary();
+assert.commandWorked(
+ destinationSec.adminCommand({configureFailPoint: "rsSyncApplyStop", mode: "alwaysOn"}),
+ "failed to enable fail point on secondary");
+
+// Allow the migration to begin cloning.
+unpauseMigrateAtStep(st.rs1.getPrimary(), 2);
+
+// The migration should fail to commit without being able to advance the majority commit point.
+awaitMigration();
+
+assert.commandWorked(
+    destinationSec.adminCommand({configureFailPoint: "rsSyncApplyStop", mode: "off"}),
+    "failed to disable fail point on secondary");
+
+st.stop();
+MongoRunner.stopMongod(staticMongod);
+})();
\ No newline at end of file
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index a88be055ad6..059dfe85723 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -375,7 +375,7 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
return Status::OK();
}
-void MigrationDestinationManager::cloneDocumentsFromDonor(
+repl::OpTime MigrationDestinationManager::cloneDocumentsFromDonor(
OperationContext* opCtx,
stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
stdx::function<BSONObj(OperationContext*)> fetchBatchFn) {
@@ -384,11 +384,16 @@ void MigrationDestinationManager::cloneDocumentsFromDonor(
options.maxQueueDepth = 1;
SingleProducerSingleConsumerQueue<BSONObj> batches(options);
+ repl::OpTime lastOpApplied;
stdx::thread inserterThread{[&] {
ThreadClient tc("chunkInserter", opCtx->getServiceContext());
auto inserterOpCtx = Client::getCurrent()->makeOperationContext();
- auto consumerGuard = makeGuard([&] { batches.closeConsumerEnd(); });
+ auto consumerGuard = makeGuard([&] {
+ batches.closeConsumerEnd();
+ lastOpApplied = repl::ReplClientInfo::forClient(inserterOpCtx->getClient()).getLastOp();
+ });
+
try {
while (true) {
auto nextBatch = batches.pop(inserterOpCtx.get());
@@ -424,6 +429,8 @@ void MigrationDestinationManager::cloneDocumentsFromDonor(
break;
}
}
+
+ return lastOpApplied;
}
Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) {
@@ -765,6 +772,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep2);
}
+ repl::OpTime lastOpApplied;
{
// 3. Initial bulk clone
setState(CLONE);
@@ -853,7 +861,9 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
return res.response;
};
- cloneDocumentsFromDonor(opCtx, insertBatchFn, fetchBatchFn);
+ // If running on a replicated system, we'll need to flush the docs we cloned to the
+ // secondaries
+ lastOpApplied = cloneDocumentsFromDonor(opCtx, insertBatchFn, fetchBatchFn);
timing.done(3);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep3);
@@ -865,10 +875,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
}
}
- // If running on a replicated system, we'll need to flush the docs we cloned to the
- // secondaries
- repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
-
const BSONObj xferModsRequest = createTransferModsRequest(_nss, *_sessionId);
{
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index dcda2e37bee..afdc5c2f125 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -111,7 +111,7 @@ public:
/**
* Clones documents from a donor shard.
*/
- static void cloneDocumentsFromDonor(
+ static repl::OpTime cloneDocumentsFromDonor(
OperationContext* opCtx,
stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
stdx::function<BSONObj(OperationContext*)> fetchBatchFn);