diff options
Diffstat (limited to 'src/mongo/db/s/migration_destination_manager.cpp')
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 168 |
1 files changed, 56 insertions, 112 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index af0888d8992..fc5b7258138 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -345,24 +345,13 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, invariant(!_sessionId); invariant(!_scopedReceiveChunk); - _enableResumableRangeDeleter = !disableResumableRangeDeleter.load(); - _state = READY; _stateChangedCV.notify_all(); _errmsg = ""; - if (_enableResumableRangeDeleter) { - uassert(ErrorCodes::IllegalOperation, - str::stream() << "Did not receive migrationId in _recvChunkStart, but this node " - "has'disableResumableRangeDeleter=false'. Does the donor shard " - << cloneRequest.getFromShardId() - << " have 'disableResumableRangeDeleter=true'?", - cloneRequest.hasMigrationId()); - - _migrationId = cloneRequest.getMigrationId(); - _lsid = cloneRequest.getLsid(); - _txnNumber = cloneRequest.getTxnNumber(); - } + _migrationId = cloneRequest.getMigrationId(); + _lsid = cloneRequest.getLsid(); + _txnNumber = cloneRequest.getTxnNumber(); _nss = nss; _fromShard = cloneRequest.getFromShardId(); @@ -784,31 +773,21 @@ void MigrationDestinationManager::_migrateThread() { // txnNumber on this session while this node is still executing the recipient side //(which is important because otherwise, this node may create orphans after the // range deletion task on this node has been processed). - if (_enableResumableRangeDeleter) { - opCtx->setLogicalSessionId(_lsid); - opCtx->setTxnNumber(_txnNumber); - - MongoDOperationContextSession sessionTxnState(opCtx); - - auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - *opCtx->getTxnNumber(), - boost::none /* autocommit */, - boost::none /* startTransaction */); - _migrateDriver(opCtx); - } else { - _migrateDriver(opCtx); - } + opCtx->setLogicalSessionId(_lsid); + opCtx->setTxnNumber(_txnNumber); + + MongoDOperationContextSession sessionTxnState(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue(opCtx, + *opCtx->getTxnNumber(), + boost::none /* autocommit */, + boost::none /* startTransaction */); + _migrateDriver(opCtx); } catch (...) { _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus())); } - if (!_enableResumableRangeDeleter) { - if (getState() != DONE) { - _forgetPending(opCtx, ChunkRange(_min, _max)); - } - } - stdx::lock_guard<Latch> lk(_mutex); _sessionId.reset(); _collUuid.reset(); @@ -833,7 +812,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { "fromShard"_attr = _fromShard, "epoch"_attr = _epoch, "sessionId"_attr = *_sessionId, - "migrationId"_attr = _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); MoveTimingHelper timing( outerOpCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, _toShard, _fromShard); @@ -843,8 +822,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { if (initialState == ABORT) { LOGV2_ERROR(22013, "Migration abort requested before the migration started", - "migrationId"_attr = - _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); return; } @@ -861,67 +839,44 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { // 2. Ensure any data which might have been left orphaned in the range being moved has been // deleted. - if (_enableResumableRangeDeleter) { - while (migrationutil::checkForConflictingDeletions( - outerOpCtx, range, donorCollectionOptionsAndIndexes.uuid)) { - LOGV2(22001, - "Migration paused because the requested range {range} for {namespace} " - "overlaps with a range already scheduled for deletion", - "Migration paused because the requested range overlaps with a range already " - "scheduled for deletion", - "namespace"_attr = _nss.ns(), - "range"_attr = redact(range.toString()), - "migrationId"_attr = *_migrationId); - - auto status = CollectionShardingRuntime::waitForClean( - outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range); - - if (!status.isOK()) { - _setStateFail(redact(status.reason())); - return; - } - - outerOpCtx->sleepFor(Milliseconds(1000)); - } + while (migrationutil::checkForConflictingDeletions( + outerOpCtx, range, donorCollectionOptionsAndIndexes.uuid)) { + uassert(ErrorCodes::ResumableRangeDeleterDisabled, + "Failing migration because the disableResumableRangeDeleter server " + "parameter is set to true on the recipient shard, which contains range " + "deletion tasks overlapping the incoming range.", + !disableResumableRangeDeleter.load()); + + LOGV2(22001, + "Migration paused because the requested range {range} for {namespace} " + "overlaps with a range already scheduled for deletion", + "Migration paused because the requested range overlaps with a range already " + "scheduled for deletion", + "namespace"_attr = _nss.ns(), + "range"_attr = redact(range.toString()), + "migrationId"_attr = _migrationId.toBSON()); - RangeDeletionTask recipientDeletionTask(*_migrationId, - _nss, - donorCollectionOptionsAndIndexes.uuid, - _fromShard, - range, - CleanWhenEnum::kNow); - recipientDeletionTask.setPending(true); + auto status = CollectionShardingRuntime::waitForClean( + outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range); - migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask); - } else { - // Synchronously delete any data which might have been left orphaned in the range - // being moved, and wait for completion - - // Needed for _forgetPending to make sure the collection has the same UUID at the end of - // an aborted migration as at the beginning. Must be set before calling _notePending. - _collUuid = donorCollectionOptionsAndIndexes.uuid; - auto cleanupCompleteFuture = _notePending(outerOpCtx, range); - auto cleanupStatus = cleanupCompleteFuture.getNoThrow(outerOpCtx); - // Wait for the range deletion to report back. Swallow - // RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist error since the - // collection could either never exist or get dropped directly from the shard after the - // range deletion task got scheduled. - if (!cleanupStatus.isOK() && - cleanupStatus != - ErrorCodes::RangeDeletionAbandonedBecauseCollectionWithUUIDDoesNotExist) { - _setStateFail(redact(cleanupStatus.reason())); + if (!status.isOK()) { + _setStateFail(redact(status.reason())); return; } - // Wait for any other, overlapping queued deletions to drain - cleanupStatus = CollectionShardingRuntime::waitForClean( - outerOpCtx, _nss, donorCollectionOptionsAndIndexes.uuid, range); - if (!cleanupStatus.isOK()) { - _setStateFail(redact(cleanupStatus.reason())); - return; - } + outerOpCtx->sleepFor(Milliseconds(1000)); } + // Insert a pending range deletion task for the incoming range. + RangeDeletionTask recipientDeletionTask(_migrationId, + _nss, + donorCollectionOptionsAndIndexes.uuid, + _fromShard, + range, + CleanWhenEnum::kNow); + recipientDeletionTask.setPending(true); + migrationutil::persistRangeDeletionTaskLocally(outerOpCtx, recipientDeletionTask); + timing.done(1); migrateThreadHangAtStep1.pauseWhileSet(); } @@ -1015,9 +970,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { LOGV2_WARNING(22011, "secondaryThrottle on, but doc insert timed out; continuing", - "migrationId"_attr = _enableResumableRangeDeleter - ? _migrationId->toBSON() - : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); } else { uassertStatusOK(replStatus.status); } @@ -1093,8 +1046,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { if (getState() == ABORT) { LOGV2(22002, "Migration aborted while waiting for replication at catch up stage", - "migrationId"_attr = - _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); return; } @@ -1104,8 +1056,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { if (i > 100) { LOGV2(22003, "secondaries having hard time keeping up with migrate", - "migrationId"_attr = - _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); } sleepmillis(20); @@ -1127,8 +1078,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { LOGV2(22004, "Waiting for replication to catch up before entering critical section", - "migrationId"_attr = - _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication( opCtx, lastOpApplied, _writeConcern); @@ -1137,8 +1087,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { LOGV2(22005, "Chunk data replicated successfully.", - "migrationId"_attr = - _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); } { @@ -1177,8 +1126,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) { if (getState() == ABORT) { LOGV2(22006, "Migration aborted while transferring mods", - "migrationId"_attr = - _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); return; } @@ -1300,9 +1248,6 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, autoColl.getDb(), updatedDoc, &localDoc)) { - const auto migrationId = - _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj(); - // Exception will abort migration cleanly LOGV2_ERROR_OPTIONS( 16977, @@ -1313,7 +1258,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, "reloaded remote document", "localDoc"_attr = redact(localDoc), "remoteDoc"_attr = redact(updatedDoc), - "migrationId"_attr = migrationId); + "migrationId"_attr = _migrationId.toBSON()); } // We are in write lock here, so sure we aren't killing @@ -1344,8 +1289,7 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx, "chunkMin"_attr = redact(_min), "chunkMax"_attr = redact(_max), "lastOpApplied"_attr = op, - "migrationId"_attr = - _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); } return false; } @@ -1356,7 +1300,7 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx, "namespace"_attr = _nss.ns(), "chunkMin"_attr = redact(_min), "chunkMax"_attr = redact(_max), - "migrationId"_attr = _enableResumableRangeDeleter ? _migrationId->toBSON() : BSONObj()); + "migrationId"_attr = _migrationId.toBSON()); return true; } |