diff options
author | Kevin Pulo <kevin.pulo@mongodb.com> | 2018-11-14 05:52:59 +0000 |
---|---|---|
committer | Kevin Pulo <kevin.pulo@mongodb.com> | 2018-11-22 03:23:58 +0000 |
commit | b2ea16518d575bf50c25f4b9fcba07591f15d70f (patch) | |
tree | c913024b51de35524786219dac5fc85214e22949 | |
parent | 75ff74fc15444191fe56d69d3c7d7b1adc0a40cc (diff) | |
download | mongo-b2ea16518d575bf50c25f4b9fcba07591f15d70f.tar.gz |
SERVER-37080 tunable migration clone insertion batches
(cherry picked from commit 9cc66d04356cf7b542df77ebe9dbe94445308240)
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 114 |
1 file changed, 74 insertions, 40 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 3e5b5581626..ac2a1a8ae41 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -54,6 +54,7 @@ #include "mongo/db/s/migration_util.h" #include "mongo/db/s/move_timing_helper.h" #include "mongo/db/s/start_chunk_clone_request.h" +#include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard_registry.h" @@ -692,6 +693,31 @@ void MigrationDestinationManager::_migrateThread() { _isActiveCV.notify_all(); } +// The maximum number of documents to insert in a single batch during migration clone. +// secondaryThrottle and migrateCloneInsertionBatchDelayMS apply between each batch. +// 0 or negative values (the default) means no limit to batch size. +// 1 corresponds to 3.4.16 (and earlier) behavior. +MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchSize, int, 0) + ->withValidator([](const int& newVal) { + if (newVal < 0) { + return Status(ErrorCodes::BadValue, + "migrateCloneInsertionBatchSize must not be negative"); + } + return Status::OK(); + }); + +// Time in milliseconds between batches of insertions during migration clone. +// This is in addition to any time spent waiting for replication (secondaryThrottle). +// Defaults to 0, which means no wait. 
+MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchDelayMS, int, 0) + ->withValidator([](const int& newVal) { + if (newVal < 0) { + return Status(ErrorCodes::BadValue, + "migrateCloneInsertionBatchDelayMS must not be negative"); + } + return Status::OK(); + }); + void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { invariant(isActive()); invariant(_sessionId); @@ -764,50 +790,58 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { }; auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) { - int batchNumCloned = 0; - int batchClonedBytes = 0; - - assertNotAborted(opCtx); - - write_ops::Insert insertOp(_nss); - insertOp.getWriteCommandBase().setOrdered(true); - insertOp.setDocuments([&] { - std::vector<BSONObj> toInsert; - for (const auto& doc : arr) { - BSONObj docToClone = doc.Obj(); - toInsert.push_back(docToClone); - batchNumCloned++; - batchClonedBytes += docToClone.objsize(); - } - return toInsert; - }()); + auto it = arr.begin(); + while (it != arr.end()) { + int batchNumCloned = 0; + int batchClonedBytes = 0; + const int batchMaxCloned = migrateCloneInsertionBatchSize.load(); + + assertNotAborted(opCtx); + + write_ops::Insert insertOp(_nss); + insertOp.getWriteCommandBase().setOrdered(true); + insertOp.setDocuments([&] { + std::vector<BSONObj> toInsert; + while (it != arr.end() && + (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) { + const auto& doc = *it; + BSONObj docToClone = doc.Obj(); + toInsert.push_back(docToClone); + batchNumCloned++; + batchClonedBytes += docToClone.objsize(); + ++it; + } + return toInsert; + }()); - const WriteResult reply = performInserts(opCtx, insertOp, true); + const WriteResult reply = performInserts(opCtx, insertOp, true); - for (unsigned long i = 0; i < reply.results.size(); ++i) { - uassertStatusOKWithContext(reply.results[i], - str::stream() << "Insert of " - << redact(insertOp.getDocuments()[i]) - << " failed."); - } + for (unsigned long i = 0; i < 
reply.results.size(); ++i) { + uassertStatusOKWithContext( + reply.results[i], + str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed."); + } - { - stdx::lock_guard<stdx::mutex> statsLock(_mutex); - _numCloned += batchNumCloned; - _clonedBytes += batchClonedBytes; - } - if (_writeConcern.shouldWaitForOtherNodes()) { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::ReplicationCoordinator::get(opCtx)->awaitReplication( - opCtx, - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - _writeConcern); - if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { - warning() << "secondaryThrottle on, but doc insert timed out; " - "continuing"; - } else { - uassertStatusOK(replStatus.status); + { + stdx::lock_guard<stdx::mutex> statsLock(_mutex); + _numCloned += batchNumCloned; + _clonedBytes += batchClonedBytes; + } + if (_writeConcern.shouldWaitForOtherNodes()) { + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( + opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + _writeConcern); + if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { + warning() << "secondaryThrottle on, but doc insert timed out; " + "continuing"; + } else { + uassertStatusOK(replStatus.status); + } } + + sleepmillis(migrateCloneInsertionBatchDelayMS.load()); } }; |