diff options
author | Kevin Pulo <kevin.pulo@mongodb.com> | 2018-11-21 22:55:50 +0000 |
---|---|---|
committer | Kevin Pulo <kevin.pulo@mongodb.com> | 2018-11-22 03:25:35 +0000 |
commit | 28b8f0c479cf7a08e84c0c3c95b750cef2c04efd (patch) | |
tree | bcc444e08eb127ca5a20bd7191a077d2b40e06bd | |
parent | 46dfed87ba88a2c62573d63043c6ceb64d1bde57 (diff) | |
download | mongo-28b8f0c479cf7a08e84c0c3c95b750cef2c04efd.tar.gz |
SERVER-37080 tunable migration clone insertion batches
(cherry picked from commit 9cc66d04356cf7b542df77ebe9dbe94445308240)
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 110 |
1 files changed, 73 insertions, 37 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index d91073c674a..f3a15afb1ad 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -55,6 +55,7 @@ #include "mongo/db/s/migration_util.h" #include "mongo/db/s/move_timing_helper.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/client/shard_registry.h" @@ -523,6 +524,31 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, _isActiveCV.notify_all(); } +// The maximum number of documents to insert in a single batch during migration clone. +// secondaryThrottle and migrateCloneInsertionBatchDelayMS apply between each batch. +// 0 or negative values (the default) means no limit to batch size. +// 1 corresponds to 3.4.16 (and earlier) behavior. +MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchSize, int, 0) + ->withValidator([](const int& newVal) { + if (newVal < 0) { + return Status(ErrorCodes::BadValue, + "migrateCloneInsertionBatchSize must not be negative"); + } + return Status::OK(); + }); + +// Time in milliseconds between batches of insertions during migration clone. +// This is in addition to any time spent waiting for replication (secondaryThrottle). +// Defaults to 0, which means no wait. +MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchDelayMS, int, 0) + ->withValidator([](const int& newVal) { + if (newVal < 0) { + return Status(ErrorCodes::BadValue, + "migrateCloneInsertionBatchDelayMS must not be negative"); + } + return Status::OK(); + }); + void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, const BSONObj& min, const BSONObj& max, @@ -771,47 +797,57 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, }; auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) { - int batchNumCloned = 0; - int batchClonedBytes = 0; - - assertNotAborted(opCtx); - - write_ops::Insert insertOp(_nss); - insertOp.getWriteCommandBase().setOrdered(true); - insertOp.setDocuments([&] { - std::vector<BSONObj> toInsert; - for (const auto& doc : arr) { - BSONObj docToClone = doc.Obj(); - toInsert.push_back(docToClone); - batchNumCloned++; - batchClonedBytes += docToClone.objsize(); - } - return toInsert; - }()); + auto it = arr.begin(); + while (it != arr.end()) { + int batchNumCloned = 0; + int batchClonedBytes = 0; + const int batchMaxCloned = migrateCloneInsertionBatchSize.load(); + + assertNotAborted(opCtx); + + write_ops::Insert insertOp(_nss); + insertOp.getWriteCommandBase().setOrdered(true); + insertOp.setDocuments([&] { + std::vector<BSONObj> toInsert; + while (it != arr.end() && + (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) { + const auto& doc = *it; + + BSONObj docToClone = doc.Obj(); + toInsert.push_back(docToClone); + batchNumCloned++; + batchClonedBytes += docToClone.objsize(); + ++it; + } + return toInsert; + }()); - const WriteResult reply = performInserts(opCtx, insertOp, true); + const WriteResult reply = performInserts(opCtx, insertOp, true); - for (unsigned long i = 0; i < reply.results.size(); ++i) { - uassertStatusOK(reply.results[i]); - } + for (unsigned long i = 0; i < reply.results.size(); ++i) { + uassertStatusOK(reply.results[i]); + } - { - stdx::lock_guard<stdx::mutex> statsLock(_mutex); - _numCloned += batchNumCloned; - _clonedBytes += batchClonedBytes; - } - if (writeConcern.shouldWaitForOtherNodes()) { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::ReplicationCoordinator::get(opCtx)->awaitReplication( - opCtx, - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - writeConcern); - if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { - warning() << "secondaryThrottle on, but doc insert timed out; " - "continuing"; - } else { - uassertStatusOK(replStatus.status); + { + stdx::lock_guard<stdx::mutex> statsLock(_mutex); + _numCloned += batchNumCloned; + _clonedBytes += batchClonedBytes; + } + if (writeConcern.shouldWaitForOtherNodes()) { + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::ReplicationCoordinator::get(opCtx)->awaitReplication( + opCtx, + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), + writeConcern); + if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { + warning() << "secondaryThrottle on, but doc insert timed out; " + "continuing"; + } else { + uassertStatusOK(replStatus.status); + } } + + sleepmillis(migrateCloneInsertionBatchDelayMS.load()); } }; |