author      Kevin Pulo <kevin.pulo@mongodb.com>    2018-11-21 22:55:50 +0000
committer   Kevin Pulo <kevin.pulo@mongodb.com>    2018-11-22 03:25:35 +0000
commit      28b8f0c479cf7a08e84c0c3c95b750cef2c04efd (patch)
tree        bcc444e08eb127ca5a20bd7191a077d2b40e06bd
parent      46dfed87ba88a2c62573d63043c6ceb64d1bde57 (diff)
download    mongo-28b8f0c479cf7a08e84c0c3c95b750cef2c04efd.tar.gz
SERVER-37080 tunable migration clone insertion batches
(cherry picked from commit 9cc66d04356cf7b542df77ebe9dbe94445308240)
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp  110
1 file changed, 73 insertions(+), 37 deletions(-)
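
In short, the change splits the migration clone's bulk inserts into batches of at most migrateCloneInsertionBatchSize documents and sleeps migrateCloneInsertionBatchDelayMS after each batch, on top of any secondaryThrottle replication wait. Below is a minimal standalone C++ sketch of that batching pattern, not the MongoDB code itself: Doc, insertBatch() and waitForReplication() are illustrative stand-ins for the real document type, performInserts() and awaitReplication().

#include <chrono>
#include <thread>
#include <vector>

struct Doc {};                                   // placeholder for a cloned document
void insertBatch(const std::vector<Doc>&) {}     // stands in for performInserts()
void waitForReplication() {}                     // stands in for the secondaryThrottle wait

// Insert `docs` in batches of at most `batchMaxCloned` documents (<= 0 means no limit),
// sleeping `batchDelayMs` milliseconds after each batch, mirroring the loop added to
// insertBatchFn in this commit.
void cloneInBatches(const std::vector<Doc>& docs, int batchMaxCloned, int batchDelayMs) {
    auto it = docs.begin();
    while (it != docs.end()) {
        std::vector<Doc> batch;
        int batchNumCloned = 0;
        while (it != docs.end() && (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
            batch.push_back(*it);
            ++batchNumCloned;
            ++it;
        }
        insertBatch(batch);     // one ordered insert per batch
        waitForReplication();   // optional write-concern wait between batches
        std::this_thread::sleep_for(std::chrono::milliseconds(batchDelayMs));
    }
}
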
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index d91073c674a..f3a15afb1ad 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -55,6 +55,7 @@
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/move_timing_helper.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard_registry.h"
@@ -523,6 +524,31 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
_isActiveCV.notify_all();
}
+// The maximum number of documents to insert in a single batch during the migration clone phase.
+// secondaryThrottle and migrateCloneInsertionBatchDelayMS apply between batches.
+// 0 or a negative value (the default is 0) means no limit on the batch size.
+// A value of 1 corresponds to the 3.4.16 (and earlier) behavior.
+MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchSize, int, 0)
+ ->withValidator([](const int& newVal) {
+ if (newVal < 0) {
+ return Status(ErrorCodes::BadValue,
+ "migrateCloneInsertionBatchSize must not be negative");
+ }
+ return Status::OK();
+ });
+
+// Time in milliseconds to wait between batches of insertions during the migration clone phase.
+// This is in addition to any time spent waiting for replication (secondaryThrottle).
+// Defaults to 0, which means no wait.
+MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchDelayMS, int, 0)
+ ->withValidator([](const int& newVal) {
+ if (newVal < 0) {
+ return Status(ErrorCodes::BadValue,
+ "migrateCloneInsertionBatchDelayMS must not be negative");
+ }
+ return Status::OK();
+ });
+
void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
const BSONObj& min,
const BSONObj& max,
@@ -771,47 +797,57 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
};
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
- int batchNumCloned = 0;
- int batchClonedBytes = 0;
-
- assertNotAborted(opCtx);
-
- write_ops::Insert insertOp(_nss);
- insertOp.getWriteCommandBase().setOrdered(true);
- insertOp.setDocuments([&] {
- std::vector<BSONObj> toInsert;
- for (const auto& doc : arr) {
- BSONObj docToClone = doc.Obj();
- toInsert.push_back(docToClone);
- batchNumCloned++;
- batchClonedBytes += docToClone.objsize();
- }
- return toInsert;
- }());
+ auto it = arr.begin();
+ while (it != arr.end()) {
+ int batchNumCloned = 0;
+ int batchClonedBytes = 0;
+ const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
+
+ assertNotAborted(opCtx);
+
+ write_ops::Insert insertOp(_nss);
+ insertOp.getWriteCommandBase().setOrdered(true);
+ insertOp.setDocuments([&] {
+ std::vector<BSONObj> toInsert;
+ while (it != arr.end() &&
+ (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
+ const auto& doc = *it;
+
+ BSONObj docToClone = doc.Obj();
+ toInsert.push_back(docToClone);
+ batchNumCloned++;
+ batchClonedBytes += docToClone.objsize();
+ ++it;
+ }
+ return toInsert;
+ }());
- const WriteResult reply = performInserts(opCtx, insertOp, true);
+ const WriteResult reply = performInserts(opCtx, insertOp, true);
- for (unsigned long i = 0; i < reply.results.size(); ++i) {
- uassertStatusOK(reply.results[i]);
- }
+ for (unsigned long i = 0; i < reply.results.size(); ++i) {
+ uassertStatusOK(reply.results[i]);
+ }
- {
- stdx::lock_guard<stdx::mutex> statsLock(_mutex);
- _numCloned += batchNumCloned;
- _clonedBytes += batchClonedBytes;
- }
- if (writeConcern.shouldWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- warning() << "secondaryThrottle on, but doc insert timed out; "
- "continuing";
- } else {
- uassertStatusOK(replStatus.status);
+ {
+ stdx::lock_guard<stdx::mutex> statsLock(_mutex);
+ _numCloned += batchNumCloned;
+ _clonedBytes += batchClonedBytes;
+ }
+ if (writeConcern.shouldWaitForOtherNodes()) {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ warning() << "secondaryThrottle on, but doc insert timed out; "
+ "continuing";
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
}
+
+ sleepmillis(migrateCloneInsertionBatchDelayMS.load());
}
};
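
A rough way to reason about these tunables: the inter-batch delay alone caps the clone's insert rate at batchSize * 1000 / delayMs documents per second, before insert time and any secondaryThrottle wait are counted. A small C++ check of that arithmetic with hypothetical settings (500 documents, 100 ms; these are not the defaults):

#include <iostream>

int main() {
    const int batchSize = 500;     // hypothetical migrateCloneInsertionBatchSize
    const int batchDelayMs = 100;  // hypothetical migrateCloneInsertionBatchDelayMS
    // Upper bound imposed by the delay alone; real throughput is lower once insert
    // time and any secondaryThrottle replication wait are added.
    const double maxDocsPerSec = batchSize * (1000.0 / batchDelayMs);
    std::cout << "<= " << maxDocsPerSec << " documents/second\n";  // prints 5000
    return 0;
}

Since both values are exported as server parameters, they should (assuming the usual MONGO_EXPORT_SERVER_PARAMETER defaults) be adjustable at mongod startup via --setParameter or at runtime via the setParameter command.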