From 8816bdcfe2a6889c40f7130559f246ebe31e3569 Mon Sep 17 00:00:00 2001 From: Blake Oler Date: Thu, 28 Jun 2018 15:05:47 -0400 Subject: SERVER-25333 Clean up argument passing in the MigrationDestinationManager (cherry picked from commit e117102282d4d8bc0352429cf67b2d01754f4ad1) --- src/mongo/db/s/migration_destination_manager.cpp | 178 +++++++++++------------ src/mongo/db/s/migration_destination_manager.h | 45 ++---- 2 files changed, 98 insertions(+), 125 deletions(-) diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index cf6294788f1..8d93049a908 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -345,6 +345,10 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _max = max; _shardKeyPattern = shardKeyPattern; + _epoch = epoch; + + _writeConcern = writeConcern; + _chunkMarkedPending = false; _numCloned = 0; @@ -365,9 +369,7 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _sessionMigration = stdx::make_unique(fromShard, *_sessionId); - _migrateThreadHandle = stdx::thread([this, min, max, shardKeyPattern, epoch, writeConcern]() { - _migrateThread(min, max, shardKeyPattern, epoch, writeConcern); - }); + _migrateThreadHandle = stdx::thread([this]() { _migrateThread(); }); return Status::OK(); } @@ -496,11 +498,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio return Status::OK(); } -void MigrationDestinationManager::_migrateThread(BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - OID epoch, - WriteConcernOptions writeConcern) { +void MigrationDestinationManager::_migrateThread() { Client::initThread("migrateThread"); auto opCtx = Client::getCurrent()->makeOperationContext(); @@ -510,13 +508,13 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, } try { - _migrateDriver(opCtx.get(), min, max, shardKeyPattern, epoch, writeConcern); + _migrateDriver(opCtx.get()); } catch (...) { _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus())); } if (getState() != DONE && !MONGO_FAIL_POINT(failMigrationLeaveOrphans)) { - _forgetPending(opCtx.get(), _nss, epoch, ChunkRange(min, max)); + _forgetPending(opCtx.get(), ChunkRange(_min, _max)); } stdx::lock_guard lk(_mutex); @@ -525,26 +523,21 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, _isActiveCV.notify_all(); } -void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern, - const OID& epoch, - const WriteConcernOptions& writeConcern) { +void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { invariant(isActive()); invariant(_sessionId); invariant(_scopedReceiveChunk); - invariant(!min.isEmpty()); - invariant(!max.isEmpty()); + invariant(!_min.isEmpty()); + invariant(!_max.isEmpty()); auto const serviceContext = opCtx->getServiceContext(); - log() << "Starting receiving end of migration of chunk " << redact(min) << " -> " << redact(max) - << " for collection " << _nss.ns() << " from " << _fromShard << " at epoch " - << epoch.toString() << " with session id " << *_sessionId; + log() << "Starting receiving end of migration of chunk " << redact(_min) << " -> " + << redact(_max) << " for collection " << _nss.ns() << " from " << _fromShard + << " at epoch " << _epoch.toString() << " with session id " << *_sessionId; MoveTimingHelper timing( - opCtx, "to", _nss.ns(), min, max, 6 /* steps */, &_errmsg, ShardId(), ShardId()); + opCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, ShardId(), ShardId()); const auto initialState = getState(); @@ -664,22 +657,20 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, donorUUID.emplace(UUID::parse(donorOptions)); } - if (collection->uuid() != donorUUID) { - _setStateFailWarn( + uassert(ErrorCodes::InvalidUUID, str::stream() - << "Cannot receive chunk " - << redact(ChunkRange(min, max).toString()) - << " for collection " - << _nss.ns() - << " because we already have an identically named collection with UUID " - << (collection->uuid() ? collection->uuid()->toString() : "(none)") - << ", which differs from the donor's UUID " - << (donorUUID ? donorUUID->toString() : "(none)") - << ". Manually drop the collection on this shard if it contains data from a " - "previous incarnation of " - << _nss.ns()); - return; - } + << "Cannot receive chunk " + << ChunkRange(_min, _max).toString() + << " for collection " + << _nss.ns() + << " because we already have an identically named collection with UUID " + << (collection->uuid() ? collection->uuid()->toString() : "(none)") + << ", which differs from the donor's UUID " + << (donorUUID ? donorUUID->toString() : "(none)") + << ". Manually drop the collection on this shard if it contains data from " + "a previous incarnation of " + << _nss.ns(), + collection->uuid() == donorUUID); } else { // We do not have a collection by this name. Create the collection with the donor's // options. @@ -740,8 +731,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, // 2. Synchronously delete any data which might have been left orphaned in the range // being moved, and wait for completion - const ChunkRange footprint(min, max); - auto notification = _notePending(opCtx, _nss, epoch, footprint); + const ChunkRange footprint(_min, _max); + auto notification = _notePending(opCtx, footprint); // Wait for the range deletion to report back if (!notification.waitStatus(opCtx).isOK()) { _setStateFail(redact(notification.waitStatus(opCtx).reason())); @@ -749,7 +740,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } // Wait for any other, overlapping queued deletions to drain - auto status = CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint); + auto status = CollectionShardingState::waitForClean(opCtx, _nss, _epoch, footprint); if (!status.isOK()) { _setStateFail(redact(status.reason())); return; @@ -807,12 +798,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, _numCloned += batchNumCloned; _clonedBytes += batchClonedBytes; } - if (writeConcern.shouldWaitForOtherNodes()) { + if (_writeConcern.shouldWaitForOtherNodes()) { repl::ReplicationCoordinator::StatusAndDuration replStatus = repl::ReplicationCoordinator::get(opCtx)->awaitReplication( opCtx, repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - writeConcern); + _writeConcern); if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { warning() << "secondaryThrottle on, but doc insert timed out; " "continuing"; @@ -877,7 +868,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, break; } - _applyMigrateOp(opCtx, _nss, min, max, shardKeyPattern, mods, &lastOpApplied); + _applyMigrateOp(opCtx, mods, &lastOpApplied); const int maxIterations = 3600 * 50; @@ -890,7 +881,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, return; } - if (opReplicatedEnough(opCtx, lastOpApplied, writeConcern)) + if (opReplicatedEnough(opCtx, lastOpApplied, _writeConcern)) break; if (i > 100) { @@ -917,7 +908,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, log() << "Waiting for replication to catch up before entering critical section"; auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication( - opCtx, lastOpApplied, writeConcern); + opCtx, lastOpApplied, _writeConcern); uassertStatusOKWithContext(awaitReplicationResult.status, awaitReplicationResult.status.codeString()); @@ -953,8 +944,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, auto mods = res.response; - if (mods["size"].number() > 0 && - _applyMigrateOp(opCtx, _nss, min, max, shardKeyPattern, mods, &lastOpApplied)) { + if (mods["size"].number() > 0 && _applyMigrateOp(opCtx, mods, &lastOpApplied)) { continue; } @@ -967,7 +957,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, // 1) The from side has told us that it has locked writes (COMMIT_START) // 2) We've checked at least one more time for un-transmitted mods if (getState() == COMMIT_START && transferAfterCommit == true) { - if (_flushPendingWrites(opCtx, _nss.ns(), min, max, lastOpApplied, writeConcern)) { + if (_flushPendingWrites(opCtx, lastOpApplied)) { break; } } @@ -999,10 +989,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern, const BSONObj& xfer, repl::OpTime* lastOpApplied) { repl::OpTime dummy; @@ -1013,20 +999,25 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, bool didAnything = false; if (xfer["deleted"].isABSONObj()) { - Lock::DBLock dlk(opCtx, nss.db(), MODE_IX); - Helpers::RemoveSaver rs("moveChunk", nss.ns(), "removedDuring"); + boost::optional rs; + if (serverGlobalParams.moveParanoia) { + rs.emplace("moveChunk", _nss.ns(), "removedDuring"); + } BSONObjIterator i(xfer["deleted"].Obj()); // deleted documents while (i.more()) { - Lock::CollectionLock clk(opCtx->lockState(), nss.ns(), MODE_X); - OldClientContext ctx(opCtx, nss.ns()); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX); + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Collection " << _nss.ns() + << " was dropped in the middle of the migration", + autoColl.getCollection()); BSONObj id = i.next().Obj(); // do not apply delete if doc does not belong to the chunk being migrated BSONObj fullObj; - if (Helpers::findById(opCtx, ctx.db(), nss.ns(), id, fullObj)) { - if (!isInRange(fullObj, min, max, shardKeyPattern)) { + if (Helpers::findById(opCtx, autoColl.getDb(), _nss.ns(), id, fullObj)) { + if (!isInRange(fullObj, _min, _max, _shardKeyPattern)) { if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeOperation)) { MONGO_UNREACHABLE; } @@ -1035,12 +1026,12 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, } if (serverGlobalParams.moveParanoia) { - rs.goingToDelete(fullObj).transitional_ignore(); + rs->goingToDelete(fullObj).transitional_ignore(); } deleteObjects(opCtx, - ctx.db() ? ctx.db()->getCollection(opCtx, nss) : nullptr, - nss, + autoColl.getCollection(), + _nss, id, true /* justOne */, false /* god */, @@ -1054,12 +1045,16 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, if (xfer["reload"].isABSONObj()) { // modified documents (insert/update) BSONObjIterator i(xfer["reload"].Obj()); while (i.more()) { - OldClientWriteContext cx(opCtx, nss.ns()); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX); + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Collection " << _nss.ns() + << " was dropped in the middle of the migration", + autoColl.getCollection()); BSONObj updatedDoc = i.next().Obj(); // do not apply insert/update if doc does not belong to the chunk being migrated - if (!isInRange(updatedDoc, min, max, shardKeyPattern)) { + if (!isInRange(updatedDoc, _min, _max, _shardKeyPattern)) { if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeOperation)) { MONGO_UNREACHABLE; } @@ -1067,8 +1062,14 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, } BSONObj localDoc; - if (willOverrideLocalId( - opCtx, nss, min, max, shardKeyPattern, cx.db(), updatedDoc, &localDoc)) { + if (willOverrideLocalId(opCtx, + _nss, + _min, + _max, + _shardKeyPattern, + autoColl.getDb(), + updatedDoc, + &localDoc)) { const std::string errMsg = str::stream() << "cannot migrate chunk, local document " << redact(localDoc) << " has same _id as reloaded remote document " << redact(updatedDoc); @@ -1079,7 +1080,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, } // We are in write lock here, so sure we aren't killing - Helpers::upsert(opCtx, nss.ns(), updatedDoc, true); + Helpers::upsert(opCtx, _nss.ns(), updatedDoc, true); *lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); didAnything = true; @@ -1090,41 +1091,35 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, } bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx, - const std::string& ns, - BSONObj min, - BSONObj max, - const repl::OpTime& lastOpApplied, - const WriteConcernOptions& writeConcern) { - if (!opReplicatedEnough(opCtx, lastOpApplied, writeConcern)) { + const repl::OpTime& lastOpApplied) { + if (!opReplicatedEnough(opCtx, lastOpApplied, _writeConcern)) { repl::OpTime op(lastOpApplied); - OCCASIONALLY log() << "migrate commit waiting for a majority of slaves for '" << ns << "' " - << redact(min) << " -> " << redact(max) << " waiting for: " << op; + OCCASIONALLY log() << "migrate commit waiting for a majority of slaves for '" << _nss.ns() + << "' " << redact(_min) << " -> " << redact(_max) + << " waiting for: " << op; return false; } - log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << redact(min) - << " -> " << redact(max); + log() << "migrate commit succeeded flushing to secondaries for '" << _nss.ns() << "' " + << redact(_min) << " -> " << redact(_max); return true; } CollectionShardingState::CleanupNotification MigrationDestinationManager::_notePending( - OperationContext* opCtx, - NamespaceString const& nss, - OID const& epoch, - ChunkRange const& range) { + OperationContext* opCtx, ChunkRange const& range) { - AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(opCtx, nss); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X); + auto css = CollectionShardingState::get(opCtx, _nss); auto metadata = css->getMetadata(opCtx); // This can currently happen because drops aren't synchronized with in-migrations. The idea for // checking this here is that in the future we shouldn't have this problem. - if (!metadata || metadata->getCollVersion().epoch() != epoch) { + if (!metadata || metadata->getCollVersion().epoch() != _epoch) { return Status{ErrorCodes::StaleShardVersion, str::stream() << "not noting chunk " << redact(range.toString()) << " as pending because the epoch of " - << nss.ns() + << _nss.ns() << " changed"}; } @@ -1132,31 +1127,28 @@ CollectionShardingState::CleanupNotification MigrationDestinationManager::_noteP auto notification = css->beginReceive(range); if (notification.ready() && !notification.waitStatus(opCtx).isOK()) { return notification.waitStatus(opCtx).withContext( - str::stream() << "Collection " << nss.ns() << " range " << redact(range.toString()) + str::stream() << "Collection " << _nss.ns() << " range " << redact(range.toString()) << " migration aborted"); } return notification; } -void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, - const NamespaceString& nss, - OID const& epoch, - ChunkRange const& range) { +void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, ChunkRange const& range) { if (!_chunkMarkedPending) { // (no lock needed, only the migrate thread looks at this.) return; // no documents can have been moved in, so there is nothing to clean up. } UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(opCtx, nss); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X); + auto css = CollectionShardingState::get(opCtx, _nss); auto metadata = css->getMetadata(opCtx); // This can currently happen because drops aren't synchronized with in-migrations. The idea for // checking this here is that in the future we shouldn't have this problem. - if (!metadata || metadata->getCollVersion().epoch() != epoch) { + if (!metadata || metadata->getCollVersion().epoch() != _epoch) { log() << "no need to forget pending chunk " << redact(range.toString()) - << " because the epoch for " << nss.ns() << " changed"; + << " because the epoch for " << _nss.ns() << " changed"; return; } diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 3e3e3a4beb8..73d57649bff 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -144,33 +144,13 @@ private: /** * Thread which drives the migration apply process on the recipient side. */ - void _migrateThread(BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - OID epoch, - WriteConcernOptions writeConcern); - - void _migrateDriver(OperationContext* opCtx, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern, - const OID& epoch, - const WriteConcernOptions& writeConcern); - - bool _applyMigrateOp(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern, - const BSONObj& xfer, - repl::OpTime* lastOpApplied); - - bool _flushPendingWrites(OperationContext* opCtx, - const std::string& ns, - BSONObj min, - BSONObj max, - const repl::OpTime& lastOpApplied, - const WriteConcernOptions& writeConcern); + void _migrateThread(); + + void _migrateDriver(OperationContext* opCtx); + + bool _applyMigrateOp(OperationContext* opCtx, const BSONObj& xfer, repl::OpTime* lastOpApplied); + + bool _flushPendingWrites(OperationContext* opCtx, const repl::OpTime& lastOpApplied); /** * Remembers a chunk range between 'min' and 'max' as a range which will have data migrated @@ -178,16 +158,13 @@ private: * it schedules deletion of any documents in the range, so that process must be seen to be * complete before migrating any new documents in. */ - CollectionShardingState::CleanupNotification _notePending(OperationContext*, - NamespaceString const&, - OID const&, - ChunkRange const&); + CollectionShardingState::CleanupNotification _notePending(OperationContext*, ChunkRange const&); /** * Stops tracking a chunk range between 'min' and 'max' that previously was having data * migrated into it, and schedules deletion of any such documents already migrated in. */ - void _forgetPending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&); + void _forgetPending(OperationContext*, ChunkRange const&); /** * Checks whether the MigrationDestinationManager is currently handling a migration by checking @@ -217,6 +194,10 @@ private: BSONObj _max; BSONObj _shardKeyPattern; + OID _epoch; + + WriteConcernOptions _writeConcern; + // Set to true once we have accepted the chunk as pending into our metadata. Used so that on // failure we can perform the appropriate cleanup. bool _chunkMarkedPending{false}; -- cgit v1.2.1