summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_destination_manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/migration_destination_manager.cpp')
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp139
1 files changed, 113 insertions, 26 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 4cbcf6cb260..721fab26d3d 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -29,7 +29,6 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kShardingMigration
-#include "mongo/db/s/migration_batch_fetcher.h"
#include "mongo/platform/basic.h"
#include "mongo/db/s/migration_destination_manager.h"
@@ -412,8 +411,8 @@ void MigrationDestinationManager::report(BSONObjBuilder& b,
}
BSONObjBuilder bb(b.subobjStart("counts"));
- bb.append("cloned", _getNumCloned());
- bb.append("clonedBytes", _getNumBytesCloned());
+ bb.append("cloned", _numCloned);
+ bb.append("clonedBytes", _clonedBytes);
bb.append("catchup", _numCatchup);
bb.append("steady", _numSteady);
bb.done();
@@ -447,8 +446,6 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_lsid = cloneRequest.getLsid();
_txnNumber = cloneRequest.getTxnNumber();
- _parallelFetchersSupported = cloneRequest.parallelFetchingSupported();
-
_nss = nss;
_fromShard = cloneRequest.getFromShardId();
_fromShardConnString =
@@ -465,8 +462,8 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
_chunkMarkedPending = false;
- _migrationCloningProgress = std::make_shared<MigrationCloningProgressSharedState>();
-
+ _numCloned = 0;
+ _clonedBytes = 0;
_numCatchup = 0;
_numSteady = 0;
@@ -1341,32 +1338,122 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx,
_sessionMigration->start(opCtx->getServiceContext());
+ const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);
+
_chunkMarkedPending = true; // no lock needed, only the migrate thread looks.
- {
- // Destructor of MigrationBatchFetcher is non-trivial. Therefore,
- // this scope has semantic significance.
- MigrationBatchFetcher<MigrationBatchInserter> fetcher{outerOpCtx,
- opCtx,
- _nss,
- *_sessionId,
- _writeConcern,
- _fromShard,
- range,
- *_migrationId,
- *_collectionUuid,
- _migrationCloningProgress,
- _parallelFetchersSupported};
- fetcher.fetchAndScheduleInsertion();
- }
- opCtx->checkForInterrupt();
- lastOpApplied = _migrationCloningProgress->getMaxOptime();
+ auto assertNotAborted = [&](OperationContext* opCtx) {
+ opCtx->checkForInterrupt();
+ outerOpCtx->checkForInterrupt();
+ uassert(50748, "Migration aborted while copying documents", getState() != kAbort);
+ };
+
+ auto insertBatchFn = [&](OperationContext* opCtx, BSONObj nextBatch) {
+ auto arr = nextBatch["objects"].Obj();
+ if (arr.isEmpty()) {
+ return false;
+ }
+ auto it = arr.begin();
+ while (it != arr.end()) {
+ int batchNumCloned = 0;
+ int batchClonedBytes = 0;
+ const int batchMaxCloned = migrateCloneInsertionBatchSize.load();
+
+ assertNotAborted(opCtx);
+
+ write_ops::InsertCommandRequest insertOp(_nss);
+ insertOp.getWriteCommandRequestBase().setOrdered(true);
+ insertOp.setDocuments([&] {
+ std::vector<BSONObj> toInsert;
+ while (it != arr.end() &&
+ (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
+ const auto& doc = *it;
+ BSONObj docToClone = doc.Obj();
+ toInsert.push_back(docToClone);
+ batchNumCloned++;
+ batchClonedBytes += docToClone.objsize();
+ ++it;
+ }
+ return toInsert;
+ }());
+
+ {
+ // Disable the schema validation (during document inserts and updates)
+ // and any internal validation for opCtx for performInserts()
+ DisableDocumentValidation documentValidationDisabler(
+ opCtx,
+ DocumentValidationSettings::kDisableSchemaValidation |
+ DocumentValidationSettings::kDisableInternalValidation);
+ const auto reply = write_ops_exec::performInserts(
+ opCtx, insertOp, OperationSource::kFromMigrate);
+ for (unsigned long i = 0; i < reply.results.size(); ++i) {
+ uassertStatusOKWithContext(reply.results[i],
+ str::stream() << "Insert of "
+ << insertOp.getDocuments()[i]
+ << " failed.");
+ }
+ // Revert to the original DocumentValidationSettings for opCtx
+ }
+
+ migrationutil::persistUpdatedNumOrphans(
+ opCtx, _migrationId.get(), *_collectionUuid, batchNumCloned);
+
+ {
+ stdx::lock_guard<Latch> statsLock(_mutex);
+ _numCloned += batchNumCloned;
+ ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(
+ batchNumCloned);
+ _clonedBytes += batchClonedBytes;
+ }
+ if (_writeConcern.needToWaitForOtherNodes()) {
+ runWithoutSession(outerOpCtx, [&] {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
+ opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ _writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ LOGV2_WARNING(
+ 22011,
+ "secondaryThrottle on, but doc insert timed out; continuing",
+ "migrationId"_attr = _migrationId->toBSON());
+ } else {
+ uassertStatusOK(replStatus.status);
+ }
+ });
+ }
+
+ sleepmillis(migrateCloneInsertionBatchDelayMS.load());
+ }
+ return true;
+ };
+
+ auto fetchBatchFn = [&](OperationContext* opCtx, BSONObj* nextBatch) {
+ auto commandResponse = uassertStatusOKWithContext(
+ fromShard->runCommand(opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ "admin",
+ migrateCloneRequest,
+ Shard::RetryPolicy::kNoRetry),
+ "_migrateClone failed: ");
+
+ uassertStatusOKWithContext(
+ Shard::CommandResponse::getEffectiveStatus(commandResponse),
+ "_migrateClone failed: ");
+
+ *nextBatch = commandResponse.response;
+ return nextBatch->getField("objects").Obj().isEmpty();
+ };
+
+ // If running on a replicated system, we'll need to flush the docs we cloned to the
+ // secondaries
+ lastOpApplied = fetchAndApplyBatch(opCtx, insertBatchFn, fetchBatchFn);
timing->done(4);
migrateThreadHangAtStep4.pauseWhileSet();
if (MONGO_unlikely(failMigrationOnRecipient.shouldFail())) {
- _setStateFail(str::stream() << "failing migration after cloning " << _getNumCloned()
+ _setStateFail(str::stream() << "failing migration after cloning " << _numCloned
<< " docs due to failMigrationOnRecipient failpoint");
return;
}