Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp | 118
1 file changed, 76 insertions(+), 42 deletions(-)
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index ec2f53aabac..a708bf8a419 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/s/move_timing_helper.h"
#include "mongo/db/s/sharding_statistics.h"
#include "mongo/db/s/start_chunk_clone_request.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/client/shard_registry.h"
@@ -701,6 +702,31 @@ void MigrationDestinationManager::_migrateThread() {
_isActiveCV.notify_all();
}
+// The maximum number of documents to insert in a single batch during migration clone.
+// secondaryThrottle and migrateCloneInsertionBatchDelayMS apply between each batch.
+// The default value of 0 means no limit to batch size; negative values are rejected
+// by the validator below. A value of 1 corresponds to 3.4.16 (and earlier) behavior.
+MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchSize, int, 0)
+ ->withValidator([](const int& newVal) {
+ if (newVal < 0) {
+ return Status(ErrorCodes::BadValue,
+ "migrateCloneInsertionBatchSize must not be negative");
+ }
+ return Status::OK();
+ });
+
+// Time in milliseconds between batches of insertions during migration clone.
+// This is in addition to any time spent waiting for replication (secondaryThrottle).
+// Defaults to 0, which means no wait.
+MONGO_EXPORT_SERVER_PARAMETER(migrateCloneInsertionBatchDelayMS, int, 0)
+ ->withValidator([](const int& newVal) {
+ if (newVal < 0) {
+ return Status(ErrorCodes::BadValue,
+ "migrateCloneInsertionBatchDelayMS must not be negative");
+ }
+ return Status::OK();
+ });
+
void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
invariant(isActive());
invariant(_sessionId);
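Both parameters above are declared with MONGO_EXPORT_SERVER_PARAMETER, which by that macro's standard behavior makes them settable at mongod startup as well as at runtime via the setParameter command (nothing in this patch changes that). As a purely illustrative example, the values being placeholders rather than recommendations, a recipient shard could be started with:

    mongod --setParameter migrateCloneInsertionBatchSize=500 \
           --setParameter migrateCloneInsertionBatchDelayMS=50

The same names can be adjusted on a running node with db.adminCommand({setParameter: 1, ...}).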
@@ -773,52 +799,60 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) {
};
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
- int batchNumCloned = 0;
- int batchClonedBytes = 0;
-
- assertNotAborted(opCtx);
-
- write_ops::Insert insertOp(_nss);
- insertOp.getWriteCommandBase().setOrdered(true);
- insertOp.setDocuments([&] {
- std::vector<BSONObj> toInsert;
- for (const auto& doc : arr) {
- BSONObj docToClone = doc.Obj();
- toInsert.push_back(docToClone);
- batchNumCloned++;
- batchClonedBytes += docToClone.objsize();
- }
- return toInsert;
- }());
+ auto it = arr.begin();
+ while (it != arr.end()) {
+ int batchNumCloned = 0;
+ int batchClonedBytes = 0;
+ const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
+
+ assertNotAborted(opCtx);
+
+ write_ops::Insert insertOp(_nss);
+ insertOp.getWriteCommandBase().setOrdered(true);
+ insertOp.setDocuments([&] {
+ std::vector<BSONObj> toInsert;
+ while (it != arr.end() &&
+ (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
+ const auto& doc = *it;
+ BSONObj docToClone = doc.Obj();
+ toInsert.push_back(docToClone);
+ batchNumCloned++;
+ batchClonedBytes += docToClone.objsize();
+ ++it;
+ }
+ return toInsert;
+ }());
- const WriteResult reply = performInserts(opCtx, insertOp, true);
+ const WriteResult reply = performInserts(opCtx, insertOp, true);
- for (unsigned long i = 0; i < reply.results.size(); ++i) {
- uassertStatusOKWithContext(reply.results[i],
- str::stream() << "Insert of "
- << redact(insertOp.getDocuments()[i])
- << " failed.");
- }
+            for (unsigned long i = 0; i < reply.results.size(); ++i) {
+                uassertStatusOKWithContext(
+                    reply.results[i],
+                    str::stream() << "Insert of " << redact(insertOp.getDocuments()[i])
+                                  << " failed.");
+            }
- {
- stdx::lock_guard<stdx::mutex> statsLock(_mutex);
- _numCloned += batchNumCloned;
- ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(
- batchNumCloned);
- _clonedBytes += batchClonedBytes;
- }
- if (_writeConcern.shouldWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- _writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- warning() << "secondaryThrottle on, but doc insert timed out; "
- "continuing";
- } else {
- uassertStatusOK(replStatus.status);
+ {
+ stdx::lock_guard<stdx::mutex> statsLock(_mutex);
+ _numCloned += batchNumCloned;
+ ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(
+ batchNumCloned);
+ _clonedBytes += batchClonedBytes;
+ }
+ if (_writeConcern.shouldWaitForOtherNodes()) {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ warning() << "secondaryThrottle on, but doc insert timed out; "
+ "continuing";
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
}
+
+ sleepmillis(migrateCloneInsertionBatchDelayMS.load());
}
};
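For readers who want the batching pattern in isolation: the rewritten insertBatchFn consumes the cloned documents exactly once, caps each insert batch at migrateCloneInsertionBatchSize documents (<= 0 meaning unlimited), and sleeps migrateCloneInsertionBatchDelayMS between batches, on top of any secondaryThrottle replication wait. Below is a minimal standalone C++ sketch of that loop structure; Doc, insertBatch, and cloneInBatches are hypothetical stand-ins for illustration, not MongoDB APIs.

    // Minimal sketch of the capped-batch-plus-delay pattern from insertBatchFn.
    // Doc, insertBatch, and cloneInBatches are illustrative names only.
    #include <chrono>
    #include <iostream>
    #include <string>
    #include <thread>
    #include <vector>

    using Doc = std::string;  // stands in for BSONObj

    // Stand-in for performInserts(): writes one batch of documents.
    void insertBatch(const std::vector<Doc>& batch) {
        std::cout << "inserted batch of " << batch.size() << " docs\n";
    }

    // Walks 'docs' exactly once, issuing insertBatch() calls of at most
    // 'batchMaxCloned' documents each. batchMaxCloned <= 0 means "no limit",
    // matching the semantics of migrateCloneInsertionBatchSize.
    void cloneInBatches(const std::vector<Doc>& docs, int batchMaxCloned, int batchDelayMs) {
        auto it = docs.begin();
        while (it != docs.end()) {
            std::vector<Doc> batch;
            int batchNumCloned = 0;
            while (it != docs.end() &&
                   (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
                batch.push_back(*it);
                ++batchNumCloned;
                ++it;
            }
            insertBatch(batch);
            // Corresponds to sleepmillis(migrateCloneInsertionBatchDelayMS.load());
            // like the patch, this also runs after the final batch (a no-op at 0).
            std::this_thread::sleep_for(std::chrono::milliseconds(batchDelayMs));
        }
    }

    int main() {
        std::vector<Doc> docs(10, "{...}");
        cloneInBatches(docs, 3, 0);  // batches of 3, 3, 3, and 1, with no delay
        return 0;
    }

As in the patch, the delay is applied after every batch, including the last; with the default value of 0 both knobs leave behavior unchanged.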