author     Kevin Pulo <kevin.pulo@mongodb.com>  2018-11-14 05:52:59 +0000
committer  Kevin Pulo <kevin.pulo@mongodb.com>  2018-11-21 05:56:30 +0000
commit     9cc66d04356cf7b542df77ebe9dbe94445308240 (patch)
tree       344babab7d73b99912655a43557f1c08c1a3723f /src/mongo
parent     3aa006e505e3f65a9272f637b816e17eaf0dadb3 (diff)
download   mongo-9cc66d04356cf7b542df77ebe9dbe94445308240.tar.gz
SERVER-37080 tunable migration clone insertion batches
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp  118
1 file changed, 76 insertions, 42 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index ec2f53aabac..a708bf8a419 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -56,6 +56,7 @@
 #include "mongo/db/s/move_timing_helper.h"
 #include "mongo/db/s/sharding_statistics.h"
 #include "mongo/db/s/start_chunk_clone_request.h"
+#include "mongo/db/server_parameters.h"
 #include "mongo/db/service_context.h"
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/client/shard_registry.h"
@@ -701,6 +702,31 @@ void MigrationDestinationManager::_migrateThread() {
     _isActiveCV.notify_all();
 }
 
+// The maximum number of documents to insert in a single batch during migration clone.
+// secondaryThrottle and migrateCloneInsertionBatchDelayMS apply between each batch.
+// 0 or negative values (the default) means no limit to batch size.
+// 1 corresponds to 3.4.16 (and earlier) behavior.
+MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchSize, int, 0)
+    ->withValidator([](const int& newVal) {
+        if (newVal < 0) {
+            return Status(ErrorCodes::BadValue,
+                          "migrateCloneInsertionBatchSize must not be negative");
+        }
+        return Status::OK();
+    });
+
+// Time in milliseconds between batches of insertions during migration clone.
+// This is in addition to any time spent waiting for replication (secondaryThrottle).
+// Defaults to 0, which means no wait.
+MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchDelayMS, int, 0)
+    ->withValidator([](const int& newVal) {
+        if (newVal < 0) {
+            return Status(ErrorCodes::BadValue,
+                          "migrateCloneInsertionBatchDelayMS must not be negative");
+        }
+        return Status::OK();
+    });
+
 void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
     invariant(isActive());
     invariant(_sessionId);
@@ -773,52 +799,60 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
         };
 
         auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
-            int batchNumCloned = 0;
-            int batchClonedBytes = 0;
-
-            assertNotAborted(opCtx);
-
-            write_ops::Insert insertOp(_nss);
-            insertOp.getWriteCommandBase().setOrdered(true);
-            insertOp.setDocuments([&] {
-                std::vector<BSONObj> toInsert;
-                for (const auto& doc : arr) {
-                    BSONObj docToClone = doc.Obj();
-                    toInsert.push_back(docToClone);
-                    batchNumCloned++;
-                    batchClonedBytes += docToClone.objsize();
-                }
-                return toInsert;
-            }());
+            auto it = arr.begin();
+            while (it != arr.end()) {
+                int batchNumCloned = 0;
+                int batchClonedBytes = 0;
+                const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
+
+                assertNotAborted(opCtx);
+
+                write_ops::Insert insertOp(_nss);
+                insertOp.getWriteCommandBase().setOrdered(true);
+                insertOp.setDocuments([&] {
+                    std::vector<BSONObj> toInsert;
+                    while (it != arr.end() &&
+                           (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
+                        const auto& doc = *it;
+                        BSONObj docToClone = doc.Obj();
+                        toInsert.push_back(docToClone);
+                        batchNumCloned++;
+                        batchClonedBytes += docToClone.objsize();
+                        ++it;
+                    }
+                    return toInsert;
+                }());
 
-            const WriteResult reply = performInserts(opCtx, insertOp, true);
+                const WriteResult reply = performInserts(opCtx, insertOp, true);
 
-            for (unsigned long i = 0; i < reply.results.size(); ++i) {
-                uassertStatusOKWithContext(reply.results[i],
-                                           str::stream() << "Insert of "
-                                                         << redact(insertOp.getDocuments()[i])
-                                                         << " failed.");
-            }
+                for (unsigned long i = 0; i < reply.results.size(); ++i) {
+                    uassertStatusOKWithContext(
+                        reply.results[i],
+                        str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed.");
+                }
 
-            {
-                stdx::lock_guard<stdx::mutex> statsLock(_mutex);
-                _numCloned += batchNumCloned;
-                ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(
-                    batchNumCloned);
-                _clonedBytes += batchClonedBytes;
-            }
-            if (_writeConcern.shouldWaitForOtherNodes()) {
-                repl::ReplicationCoordinator::StatusAndDuration replStatus =
-                    repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
-                        opCtx,
-                        repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
-                        _writeConcern);
-                if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
-                    warning() << "secondaryThrottle on, but doc insert timed out; "
-                                 "continuing";
-                } else {
-                    uassertStatusOK(replStatus.status);
+                {
+                    stdx::lock_guard<stdx::mutex> statsLock(_mutex);
+                    _numCloned += batchNumCloned;
+                    ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(
+                        batchNumCloned);
+                    _clonedBytes += batchClonedBytes;
+                }
+                if (_writeConcern.shouldWaitForOtherNodes()) {
+                    repl::ReplicationCoordinator::StatusAndDuration replStatus =
+                        repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+                            opCtx,
+                            repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+                            _writeConcern);
+                    if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+                        warning() << "secondaryThrottle on, but doc insert timed out; "
+                                     "continuing";
+                    } else {
+                        uassertStatusOK(replStatus.status);
+                    }
                 }
+
+                sleepmillis(migrateCloneInsertionBatchDelayMS.load());
             }
         };
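
Taken together, the patch changes the shape of `insertBatchFn`: instead of handing the entire cloned document array to a single `performInserts()` call, it walks the array with one iterator and carves off at most `migrateCloneInsertionBatchSize` documents per insert, then sleeps `migrateCloneInsertionBatchDelayMS` milliseconds after each batch (on top of any secondaryThrottle replication wait). The sketch below shows that batching pattern in isolation; it is a minimal standalone illustration, with `std::vector<std::string>` standing in for the BSON array and an `insertBatch()` stub in place of `performInserts()`, so none of these helper names are MongoDB APIs.

```cpp
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

// Stub standing in for performInserts(): "inserts" one bounded batch.
void insertBatch(const std::vector<std::string>& batch) {
    std::cout << "inserted batch of " << batch.size() << " docs\n";
}

// Walk the documents with a single iterator, carving off at most batchSize
// documents per insert. batchSize <= 0 means "no limit", mirroring the
// patch's (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned) check;
// delayMs is slept after every batch, like sleepmillis() in the patch.
void insertInBatches(const std::vector<std::string>& docs, int batchSize, int delayMs) {
    auto it = docs.begin();
    while (it != docs.end()) {
        std::vector<std::string> batch;
        int numCloned = 0;
        while (it != docs.end() && (batchSize <= 0 || numCloned < batchSize)) {
            batch.push_back(*it);
            ++numCloned;
            ++it;
        }
        insertBatch(batch);
        std::this_thread::sleep_for(std::chrono::milliseconds(delayMs));
    }
}

int main() {
    std::vector<std::string> docs(10, "{...}");
    insertInBatches(docs, 3, 0);  // batches of 3, 3, 3, then 1; no delay
}
```

With both parameters left at their default of 0, the outer loop runs once and inserts everything in a single batch, so behavior should match the old code apart from a no-op `sleepmillis(0)`. Because the parameters are declared with `MONGO_EXPORT_SERVER_PARAMETER`, they can be set at startup (for example `mongod --setParameter migrateCloneInsertionBatchSize=500`) or adjusted at runtime through the `setParameter` command; each batch re-reads the values via `load()`, so runtime changes take effect even mid-migration.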