diff options
author | Janna Golden <janna.golden@mongodb.com> | 2019-10-08 15:31:01 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-10-08 15:31:01 +0000 |
commit | 8ac53a6a615f14c78cf007ae6a58688849b63f56 (patch) | |
tree | 858df204e7172adbceacdd982d5abea760c01e13 | |
parent | 5710a3332449c1cbe2b4c46043f4283bd19a8a43 (diff) | |
download | mongo-8ac53a6a615f14c78cf007ae6a58688849b63f56.tar.gz |
SERVER-42783 Migrations should wait for majority replication of cloned docs when there are no xfer mods
(cherry picked from commit 2fb73bcd2515cd8d566fecc5b23ee9f6970b1716)
-rw-r--r-- | jstests/libs/chunk_manipulation_util.js | 16 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 2 |
3 files changed, 26 insertions, 13 deletions
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js index ba69691ac2f..a96e830a6dc 100644 --- a/jstests/libs/chunk_manipulation_util.js +++ b/jstests/libs/chunk_manipulation_util.js @@ -19,11 +19,12 @@ load('./jstests/libs/test_background_ops.js'); // Returns a join function; call it to wait for moveChunk to complete. // -function moveChunkParallel(staticMongod, mongosURL, findCriteria, bounds, ns, toShardId) { +function moveChunkParallel( + staticMongod, mongosURL, findCriteria, bounds, ns, toShardId, expectSuccess = true) { assert((findCriteria || bounds) && !(findCriteria && bounds), 'Specify either findCriteria or bounds, but not both.'); - function runMoveChunk(mongosURL, findCriteria, bounds, ns, toShardId) { + function runMoveChunk(mongosURL, findCriteria, bounds, ns, toShardId, expectSuccess) { assert(mongosURL && ns && toShardId, 'Missing arguments.'); assert((findCriteria || bounds) && !(findCriteria && bounds), 'Specify either findCriteria or bounds, but not both.'); @@ -42,12 +43,17 @@ function moveChunkParallel(staticMongod, mongosURL, findCriteria, bounds, ns, to printjson(cmd); var result = admin.runCommand(cmd); printjson(result); - assert(result.ok); + if (expectSuccess) { + assert(result.ok); + } else { + assert.commandFailed(result); + } } // Return the join function. - return startParallelOps( - staticMongod, runMoveChunk, [mongosURL, findCriteria, bounds, ns, toShardId]); + return startParallelOps(staticMongod, + runMoveChunk, + [mongosURL, findCriteria, bounds, ns, toShardId, expectSuccess]); } // moveChunk starts at step 0 and proceeds to 1 (it has *finished* parsing diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 8d5fd474f0c..bdf67e27f83 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -369,16 +369,22 @@ Status MigrationDestinationManager::start(const NamespaceString& nss, return Status::OK(); } -void MigrationDestinationManager::cloneDocumentsFromDonor( +repl::OpTime MigrationDestinationManager::cloneDocumentsFromDonor( OperationContext* opCtx, stdx::function<void(OperationContext*, BSONObj)> insertBatchFn, stdx::function<BSONObj(OperationContext*)> fetchBatchFn) { ProducerConsumerQueue<BSONObj> batches(1); + repl::OpTime lastOpApplied; + stdx::thread inserterThread{[&] { Client::initThreadIfNotAlready("chunkInserter"); auto inserterOpCtx = Client::getCurrent()->makeOperationContext(); - auto consumerGuard = MakeGuard([&] { batches.closeConsumerEnd(); }); + auto consumerGuard = MakeGuard([&] { + batches.closeConsumerEnd(); + lastOpApplied = repl::ReplClientInfo::forClient(inserterOpCtx->getClient()).getLastOp(); + }); + try { while (true) { auto nextBatch = batches.pop(inserterOpCtx.get()); @@ -414,6 +420,8 @@ void MigrationDestinationManager::cloneDocumentsFromDonor( break; } } + + return lastOpApplied; } Status MigrationDestinationManager::abort(const MigrationSessionId& sessionId) { @@ -781,6 +789,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep2); } + repl::OpTime lastOpApplied; { // 3. Initial bulk clone setState(CLONE); @@ -864,7 +873,9 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, return res; }; - cloneDocumentsFromDonor(opCtx, insertBatchFn, fetchBatchFn); + // If running on a replicated system, we'll need to flush the docs we cloned to the + // secondaries + lastOpApplied = cloneDocumentsFromDonor(opCtx, insertBatchFn, fetchBatchFn); timing.done(3); MONGO_FAIL_POINT_PAUSE_WHILE_SET(migrateThreadHangAtStep3); @@ -876,10 +887,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } } - // If running on a replicated system, we'll need to flush the docs we cloned to the - // secondaries - repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - const BSONObj xferModsRequest = createTransferModsRequest(_nss, *_sessionId); { diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 87627586076..efe35ffb0c3 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -109,7 +109,7 @@ public: /** * Clones documents from a donor shard. */ - static void cloneDocumentsFromDonor( + static repl::OpTime cloneDocumentsFromDonor( OperationContext* opCtx, stdx::function<void(OperationContext*, BSONObj)> insertBatchFn, stdx::function<BSONObj(OperationContext*)> fetchBatchFn); |