summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKevin Pulo <kevin.pulo@mongodb.com>2018-11-14 05:52:59 +0000
committerKevin Pulo <kevin.pulo@mongodb.com>2018-11-22 03:23:58 +0000
commitb2ea16518d575bf50c25f4b9fcba07591f15d70f (patch)
treec913024b51de35524786219dac5fc85214e22949
parent75ff74fc15444191fe56d69d3c7d7b1adc0a40cc (diff)
downloadmongo-b2ea16518d575bf50c25f4b9fcba07591f15d70f.tar.gz
SERVER-37080 tunable migration clone insertion batches
(cherry picked from commit 9cc66d04356cf7b542df77ebe9dbe94445308240)
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp114
1 file changed, 74 insertions, 40 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 3e5b5581626..ac2a1a8ae41 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -54,6 +54,7 @@
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/move_timing_helper.h"
#include "mongo/db/s/start_chunk_clone_request.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard_registry.h"
@@ -692,6 +693,31 @@ void MigrationDestinationManager::_migrateThread() {
_isActiveCV.notify_all();
}
+// The maximum number of documents to insert in a single batch during migration clone.
+// secondaryThrottle and migrateCloneInsertionBatchDelayMS apply between each batch.
+// 0 (the default) means no limit to batch size; negative values are rejected by the validator.
+// 1 corresponds to 3.4.16 (and earlier) behavior.
+MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchSize, int, 0)
+ ->withValidator([](const int& newVal) {
+ if (newVal < 0) {
+ return Status(ErrorCodes::BadValue,
+ "migrateCloneInsertionBatchSize must not be negative");
+ }
+ return Status::OK();
+ });
+
+// Time in milliseconds between batches of insertions during migration clone.
+// This is in addition to any time spent waiting for replication (secondaryThrottle).
+// Defaults to 0, which means no wait.
+MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchDelayMS, int, 0)
+ ->withValidator([](const int& newVal) {
+ if (newVal < 0) {
+ return Status(ErrorCodes::BadValue,
+ "migrateCloneInsertionBatchDelayMS must not be negative");
+ }
+ return Status::OK();
+ });
+
void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
invariant(isActive());
invariant(_sessionId);
@@ -764,50 +790,58 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
};
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
- int batchNumCloned = 0;
- int batchClonedBytes = 0;
-
- assertNotAborted(opCtx);
-
- write_ops::Insert insertOp(_nss);
- insertOp.getWriteCommandBase().setOrdered(true);
- insertOp.setDocuments([&] {
- std::vector<BSONObj> toInsert;
- for (const auto& doc : arr) {
- BSONObj docToClone = doc.Obj();
- toInsert.push_back(docToClone);
- batchNumCloned++;
- batchClonedBytes += docToClone.objsize();
- }
- return toInsert;
- }());
+ auto it = arr.begin();
+ while (it != arr.end()) {
+ int batchNumCloned = 0;
+ int batchClonedBytes = 0;
+ const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
+
+ assertNotAborted(opCtx);
+
+ write_ops::Insert insertOp(_nss);
+ insertOp.getWriteCommandBase().setOrdered(true);
+ insertOp.setDocuments([&] {
+ std::vector<BSONObj> toInsert;
+ while (it != arr.end() &&
+ (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
+ const auto& doc = *it;
+ BSONObj docToClone = doc.Obj();
+ toInsert.push_back(docToClone);
+ batchNumCloned++;
+ batchClonedBytes += docToClone.objsize();
+ ++it;
+ }
+ return toInsert;
+ }());
- const WriteResult reply = performInserts(opCtx, insertOp, true);
+ const WriteResult reply = performInserts(opCtx, insertOp, true);
- for (unsigned long i = 0; i < reply.results.size(); ++i) {
- uassertStatusOKWithContext(reply.results[i],
- str::stream() << "Insert of "
- << redact(insertOp.getDocuments()[i])
- << " failed.");
- }
+ for (unsigned long i = 0; i < reply.results.size(); ++i) {
+ uassertStatusOKWithContext(
+ reply.results[i],
+ str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed.");
+ }
- {
- stdx::lock_guard<stdx::mutex> statsLock(_mutex);
- _numCloned += batchNumCloned;
- _clonedBytes += batchClonedBytes;
- }
- if (_writeConcern.shouldWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- _writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- warning() << "secondaryThrottle on, but doc insert timed out; "
- "continuing";
- } else {
- uassertStatusOK(replStatus.status);
+ {
+ stdx::lock_guard<stdx::mutex> statsLock(_mutex);
+ _numCloned += batchNumCloned;
+ _clonedBytes += batchClonedBytes;
+ }
+ if (_writeConcern.shouldWaitForOtherNodes()) {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ warning() << "secondaryThrottle on, but doc insert timed out; "
+ "continuing";
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
}
+
+ sleepmillis(migrateCloneInsertionBatchDelayMS.load());
}
};