Diffstat (limited to 'src/mongo/db/s/migration_destination_manager.cpp')
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp | 168
1 file changed, 56 insertions(+), 112 deletions(-)
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index af0888d8992..fc5b7258138 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -345,24 +345,13 @@ Status MigrationDestinationManager::start(OperationContext* opCtx,
invariant(!_sessionId);
invariant(!_scopedReceiveChunk);
- _enableResumableRangeDeleter = !disableResumableRangeDeleter.load();
-
_state = READY;
_stateChangedCV.notify_all();
_errmsg = "";
- if (_enableResumableRangeDeleter) {
- uassert(ErrorCodes::IllegalOperation,
- str::stream() << "Did not receive migrationId in _recvChunkStart, but this node "
- "has'disableResumableRangeDeleter=false'. Does the donor shard "
- << cloneRequest.getFromShardId()
- << " have 'disableResumableRangeDeleter=true'?",
- cloneRequest.hasMigrationId());
-
- _migrationId = cloneRequest.getMigrationId();
- _lsid = cloneRequest.getLsid();
- _txnNumber = cloneRequest.getTxnNumber();
- }
+ _migrationId = cloneRequest.getMigrationId();
+ _lsid = cloneRequest.getLsid();
+ _txnNumber = cloneRequest.getTxnNumber();
_nss = nss;
_fromShard = cloneRequest.getFromShardId();
@@ -784,31 +773,21 @@ void MigrationDestinationManager::_migrateThread() {
// txnNumber on this session while this node is still executing the recipient side
//(which is important because otherwise, this node may create orphans after the
// range deletion task on this node has been processed).
- if (_enableResumableRangeDeleter) {
- opCtx->setLogicalSessionId(_lsid);
- opCtx->setTxnNumber(_txnNumber);
-
- MongoDOperationContextSession sessionTxnState(opCtx);
-
- auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(opCtx,
- *opCtx->getTxnNumber(),
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
- _migrateDriver(opCtx);
- } else {
- _migrateDriver(opCtx);
- }
+ opCtx->setLogicalSessionId(_lsid);
+ opCtx->setTxnNumber(_txnNumber);
+
+ MongoDOperationContextSession sessionTxnState(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(opCtx,
+ *opCtx->getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+ _migrateDriver(opCtx);
} catch (...) {
_setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
}
- if (!_enableResumableRangeDeleter) {
- if (getState() != DONE) {
- _forgetPending(opCtx, ChunkRange(_min, _max));
- }
- }
-
stdx::lock_guard<Latch> lk(_mutex);
_sessionId.reset();
_collUuid.reset();
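For context on the hunk above: the migration thread now always checks out the logical session received from the donor in _recvChunkStart and calls beginOrContinue() at the donor-provided txnNumber, so that a later txnNumber advance on that session can fence off straggling recipient writes (the orphan scenario the surrounding comment describes). The toy participant below is a hypothetical, self-contained sketch of that fencing idea only; it is not MongoDB's TransactionParticipant API.

// Hedged sketch: a toy session participant that rejects work at a stale txnNumber,
// which is the property beginOrContinue() provides for the checked-out session above.
#include <iostream>
#include <stdexcept>

struct ToySessionParticipant {
    long long activeTxnNumber = -1;

    void beginOrContinue(long long txnNumber) {
        if (txnNumber < activeTxnNumber)
            throw std::runtime_error("stale txnNumber for this session");
        activeTxnNumber = txnNumber;
    }
};

int main() {
    ToySessionParticipant session;
    session.beginOrContinue(5);      // recipient starts at the migration's txnNumber
    session.beginOrContinue(6);      // the txnNumber is later advanced on the same session...
    try {
        session.beginOrContinue(5);  // ...so late work at the old txnNumber is rejected
    } catch (const std::exception& ex) {
        std::cout << "fenced off: " << ex.what() << "\n";
    }
    return 0;
}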
@@ -833,7 +812,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
"fromShard"_attr = _fromShard,
"epoch"_attr = _epoch,
"sessionId"_attr = *_sessionId,
- "migrationId"_attr = _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
MoveTimingHelper timing(
outerOpCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, _toShard, _fromShard);
@@ -843,8 +822,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
if (initialState == ABORT) {
LOGV2_ERROR(22013,
"Migration abort requested before the migration started",
- "migrationId"_attr =
- _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
return;
}
@@ -861,67 +839,44 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
// 2. Ensure any data which might have been left orphaned in the range being moved has been
// deleted.
- if (_enableResumableRangeDeleter) {
- while (migrationutil::checkForConflictingDeletions(
- outerOpCtx, range, donorCollectionOptionsAndIndexes.uuid)) {
- LOGV2(22001,
- "Migration paused because the requested range {range} for {namespace} "
- "overlaps with a range already scheduled for deletion",
- "Migration paused because the requested range overlaps with a range already "
- "scheduled for deletion",
- "namespace"_attr = _nss.ns(),
- "range"_attr = redact(range.toString()),
- "migrationId"_attr = *_migrationId);
-
- auto status = CollectionShardingRuntime::waitForClean(
- outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
-
- if (!status.isOK()) {
- _setStateFail(redact(status.reason()));
- return;
- }
-
- outerOpCtx->sleepFor(Milliseconds(1000));
- }
+ while (migrationutil::checkForConflictingDeletions(
+ outerOpCtx, range, donorCollectionOptionsAndIndexes.uuid)) {
+ uassert(ErrorCodes::ResumableRangeDeleterDisabled,
+ "Failing migration because the disableResumableRangeDeleter server "
+ "parameter is set to true on the recipient shard, which contains range "
+ "deletion tasks overlapping the incoming range.",
+ !disableResumableRangeDeleter.load());
+
+ LOGV2(22001,
+ "Migration paused because the requested range {range} for {namespace} "
+ "overlaps with a range already scheduled for deletion",
+ "Migration paused because the requested range overlaps with a range already "
+ "scheduled for deletion",
+ "namespace"_attr = _nss.ns(),
+ "range"_attr = redact(range.toString()),
+ "migrationId"_attr = _migrationId.toBSON());
- RangeDeletionTask recipientDeletionTask(*_migrationId,
- _nss,
- donorCollectionOptionsAndIndexes.uuid,
- _fromShard,
- range,
- CleanWhenEnum::kNow);
- recipientDeletionTask.setPending(true);
+ auto status = CollectionShardingRuntime::waitForClean(
+ outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
- migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask);
- } else {
- // Synchronously delete any data which might have been left orphaned in the range
- // being moved, and wait for completion
-
- // Needed for _forgetPending to make sure the collection has the same UUID at the end of
- // an aborted migration as at the beginning. Must be set before calling _notePending.
- _collUuid = donorCollectionOptionsAndIndexes.uuid;
- auto cleanupCompleteFuture = _notePending(outerOpCtx, range);
- auto cleanupStatus = cleanupCompleteFuture.getNoThrow(outerOpCtx);
- // Wait for the range deletion to report back. Swallow
- // RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist error since the
- // collection could either never exist or get dropped directly from the shard after the
- // range deletion task got scheduled.
- if (!cleanupStatus.isOK() &&
- cleanupStatus !=
- ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist) {
- _setStateFail(redact(cleanupStatus.reason()));
+ if (!status.isOK()) {
+ _setStateFail(redact(status.reason()));
return;
}
- // Wait for any other, overlapping queued deletions to drain
- cleanupStatus = CollectionShardingRuntime::waitForClean(
- outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range);
- if (!cleanupStatus.isOK()) {
- _setStateFail(redact(cleanupStatus.reason()));
- return;
- }
+ outerOpCtx->sleepFor(Milliseconds(1000));
}
+ // Insert a pending range deletion task for the incoming range.
+ RangeDeletionTask recipientDeletionTask(_migrationId,
+ _nss,
+ donorCollectionOptionsAndIndexes.uuid,
+ _fromShard,
+ range,
+ CleanWhenEnum::kNow);
+ recipientDeletionTask.setPending(true);
+ migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask);
+
timing.done(1);
migrateThreadHangAtStep1.pauseWhileSet();
}
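For context on the hunk above: rather than branching on _enableResumableRangeDeleter, the recipient now always waits for any range deletion tasks that overlap the incoming range to drain (failing fast only when the recipient itself has disableResumableRangeDeleter=true), and then persists a pending RangeDeletionTask for the incoming range. A rough, self-contained sketch of that wait-then-register pattern follows; hasConflictingDeletion() and waitForClean() are hypothetical stand-ins for migrationutil::checkForConflictingDeletions() and CollectionShardingRuntime::waitForClean().

// Hedged sketch, not MongoDB code: block while overlapping deletions are queued, then
// register a pending deletion task for the incoming range.
#include <chrono>
#include <iostream>
#include <thread>

static int pendingDeletions = 2;  // pretend two overlapping tasks are still queued

static bool hasConflictingDeletion() { return pendingDeletions > 0; }
static bool waitForClean() { --pendingDeletions; return true; }  // false would mean failure

int main() {
    while (hasConflictingDeletion()) {
        std::cout << "Migration paused: incoming range overlaps a scheduled deletion\n";
        if (!waitForClean()) {
            std::cerr << "migrate failed while draining overlapping range deletions\n";
            return 1;
        }
        // Mirrors outerOpCtx->sleepFor(Milliseconds(1000)) before re-checking.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    // Once the range is clear, a pending RangeDeletionTask is persisted so the incoming
    // range is still cleaned up if this migration later aborts.
    std::cout << "Range clear; registering pending range deletion task\n";
    return 0;
}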
@@ -1015,9 +970,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
LOGV2_WARNING(22011,
"secondaryThrottle on, but doc insert timed out; continuing",
- "migrationId"_attr = _enableResumableRangeDeleter
- ? _migrationId->toBSON()
- : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
} else {
uassertStatusOK(replStatus.status);
}
@@ -1093,8 +1046,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
if (getState() == ABORT) {
LOGV2(22002,
"Migration aborted while waiting for replication at catch up stage",
- "migrationId"_attr =
- _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
return;
}
@@ -1104,8 +1056,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
if (i > 100) {
LOGV2(22003,
"secondaries having hard time keeping up with migrate",
- "migrationId"_attr =
- _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
}
sleepmillis(20);
@@ -1127,8 +1078,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
LOGV2(22004,
"Waiting for replication to catch up before entering critical section",
- "migrationId"_attr =
- _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
opCtx, lastOpApplied, _writeConcern);
@@ -1137,8 +1087,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
LOGV2(22005,
"Chunk data replicated successfully.",
- "migrationId"_attr =
- _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
}
{
@@ -1177,8 +1126,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
if (getState() == ABORT) {
LOGV2(22006,
"Migration aborted while transferring mods",
- "migrationId"_attr =
- _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
return;
}
@@ -1300,9 +1248,6 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
autoColl.getDb(),
updatedDoc,
&localDoc)) {
- const auto migrationId =
- _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj();
-
// Exception will abort migration cleanly
LOGV2_ERROR_OPTIONS(
16977,
@@ -1313,7 +1258,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx,
"reloaded remote document",
"localDoc"_attr = redact(localDoc),
"remoteDoc"_attr = redact(updatedDoc),
- "migrationId"_attr = migrationId);
+ "migrationId"_attr = _migrationId.toBSON());
}
// We are in write lock here, so sure we aren't killing
@@ -1344,8 +1289,7 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx,
"chunkMin"_attr = redact(_min),
"chunkMax"_attr = redact(_max),
"lastOpApplied"_attr = op,
- "migrationId"_attr =
- _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
}
return false;
}
@@ -1356,7 +1300,7 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx,
"namespace"_attr = _nss.ns(),
"chunkMin"_attr = redact(_min),
"chunkMax"_attr = redact(_max),
- "migrationId"_attr = _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj());
+ "migrationId"_attr = _migrationId.toBSON());
return true;
}