diff options
author | Blake Oler <blake.oler@mongodb.com> | 2018-06-28 15:05:47 -0400 |
---|---|---|
committer | Blake Oler <blake.oler@mongodb.com> | 2018-07-02 11:17:00 -0400 |
commit | e117102282d4d8bc0352429cf67b2d01754f4ad1 (patch) | |
tree | eb340ccc51bac2677670dc76e80c7422c1f5b443 /src | |
parent | 2f730baf61adae417517c149271576207ce3a210 (diff) | |
download | mongo-e117102282d4d8bc0352429cf67b2d01754f4ad1.tar.gz |
SERVER-25333 Clean up argument passing in the MigrationDestinationManager
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.cpp | 139 | ||||
-rw-r--r-- | src/mongo/db/s/migration_destination_manager.h | 45 |
2 files changed, 72 insertions, 112 deletions
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 56addc132a9..674213d8d81 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -344,6 +344,10 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _max = max; _shardKeyPattern = shardKeyPattern; + _epoch = epoch; + + _writeConcern = writeConcern; + _chunkMarkedPending = false; _numCloned = 0; @@ -364,9 +368,7 @@ Status MigrationDestinationManager::start(OperationContext* opCtx, _sessionMigration = stdx::make_unique<SessionCatalogMigrationDestination>(fromShard, *_sessionId); - _migrateThreadHandle = stdx::thread([this, min, max, shardKeyPattern, epoch, writeConcern]() { - _migrateThread(min, max, shardKeyPattern, epoch, writeConcern); - }); + _migrateThreadHandle = stdx::thread([this]() { _migrateThread(); }); return Status::OK(); } @@ -495,11 +497,7 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio return Status::OK(); } -void MigrationDestinationManager::_migrateThread(BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - OID epoch, - WriteConcernOptions writeConcern) { +void MigrationDestinationManager::_migrateThread() { Client::initThread("migrateThread"); auto opCtx = Client::getCurrent()->makeOperationContext(); @@ -509,13 +507,13 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, } try { - _migrateDriver(opCtx.get(), min, max, shardKeyPattern, epoch, writeConcern); + _migrateDriver(opCtx.get()); } catch (...) { _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus())); } if (getState() != DONE && !MONGO_FAIL_POINT(failMigrationLeaveOrphans)) { - _forgetPending(opCtx.get(), _nss, epoch, ChunkRange(min, max)); + _forgetPending(opCtx.get(), ChunkRange(_min, _max)); } stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -524,26 +522,21 @@ void MigrationDestinationManager::_migrateThread(BSONObj min, _isActiveCV.notify_all(); } -void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern, - const OID& epoch, - const WriteConcernOptions& writeConcern) { +void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx) { invariant(isActive()); invariant(_sessionId); invariant(_scopedReceiveChunk); - invariant(!min.isEmpty()); - invariant(!max.isEmpty()); + invariant(!_min.isEmpty()); + invariant(!_max.isEmpty()); auto const serviceContext = opCtx->getServiceContext(); - log() << "Starting receiving end of migration of chunk " << redact(min) << " -> " << redact(max) - << " for collection " << _nss.ns() << " from " << _fromShard << " at epoch " - << epoch.toString() << " with session id " << *_sessionId; + log() << "Starting receiving end of migration of chunk " << redact(_min) << " -> " + << redact(_max) << " for collection " << _nss.ns() << " from " << _fromShard + << " at epoch " << _epoch.toString() << " with session id " << *_sessionId; MoveTimingHelper timing( - opCtx, "to", _nss.ns(), min, max, 6 /* steps */, &_errmsg, ShardId(), ShardId()); + opCtx, "to", _nss.ns(), _min, _max, 6 /* steps */, &_errmsg, ShardId(), ShardId()); const auto initialState = getState(); @@ -664,7 +657,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, uassert(ErrorCodes::InvalidUUID, str::stream() << "Cannot receive chunk " - << ChunkRange(min, max).toString() + << ChunkRange(_min, _max).toString() << " for collection " << _nss.ns() << " because we already have an identically named collection with UUID " @@ -732,8 +725,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, // 2. Synchronously delete any data which might have been left orphaned in the range // being moved, and wait for completion - const ChunkRange footprint(min, max); - auto notification = _notePending(opCtx, _nss, epoch, footprint); + const ChunkRange footprint(_min, _max); + auto notification = _notePending(opCtx, footprint); // Wait for the range deletion to report back if (!notification.waitStatus(opCtx).isOK()) { _setStateFail(redact(notification.waitStatus(opCtx).reason())); @@ -741,7 +734,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } // Wait for any other, overlapping queued deletions to drain - auto status = CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint); + auto status = CollectionShardingState::waitForClean(opCtx, _nss, _epoch, footprint); if (!status.isOK()) { _setStateFail(redact(status.reason())); return; @@ -782,9 +775,9 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, BSONObj localDoc; if (willOverrideLocalId(opCtx, _nss, - min, - max, - shardKeyPattern, + _min, + _max, + _shardKeyPattern, autoColl.getDb(), docToClone, &localDoc)) { @@ -804,12 +797,12 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, _numCloned++; _clonedBytes += docToClone.objsize(); } - if (writeConcern.shouldWaitForOtherNodes()) { + if (_writeConcern.shouldWaitForOtherNodes()) { repl::ReplicationCoordinator::StatusAndDuration replStatus = repl::ReplicationCoordinator::get(opCtx)->awaitReplication( opCtx, repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(), - writeConcern); + _writeConcern); if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) { warning() << "secondaryThrottle on, but doc insert timed out; " "continuing"; @@ -875,7 +868,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, break; } - _applyMigrateOp(opCtx, _nss, min, max, shardKeyPattern, mods, &lastOpApplied); + _applyMigrateOp(opCtx, mods, &lastOpApplied); const int maxIterations = 3600 * 50; @@ -888,7 +881,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, return; } - if (opReplicatedEnough(opCtx, lastOpApplied, writeConcern)) + if (opReplicatedEnough(opCtx, lastOpApplied, _writeConcern)) break; if (i > 100) { @@ -915,7 +908,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, log() << "Waiting for replication to catch up before entering critical section"; auto awaitReplicationResult = repl::ReplicationCoordinator::get(opCtx)->awaitReplication( - opCtx, lastOpApplied, writeConcern); + opCtx, lastOpApplied, _writeConcern); uassertStatusOKWithContext(awaitReplicationResult.status, awaitReplicationResult.status.codeString()); @@ -951,8 +944,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, auto mods = res.response; - if (mods["size"].number() > 0 && - _applyMigrateOp(opCtx, _nss, min, max, shardKeyPattern, mods, &lastOpApplied)) { + if (mods["size"].number() > 0 && _applyMigrateOp(opCtx, mods, &lastOpApplied)) { continue; } @@ -965,7 +957,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, // 1) The from side has told us that it has locked writes (COMMIT_START) // 2) We've checked at least one more time for un-transmitted mods if (getState() == COMMIT_START && transferAfterCommit == true) { - if (_flushPendingWrites(opCtx, _nss.ns(), min, max, lastOpApplied, writeConcern)) { + if (_flushPendingWrites(opCtx, lastOpApplied)) { break; } } @@ -997,10 +989,6 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx, } bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern, const BSONObj& xfer, repl::OpTime* lastOpApplied) { invariant(lastOpApplied); @@ -1011,12 +999,12 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, if (xfer["deleted"].isABSONObj()) { boost::optional<Helpers::RemoveSaver> rs; if (serverGlobalParams.moveParanoia) { - rs.emplace("moveChunk", nss.ns(), "removedDuring"); + rs.emplace("moveChunk", _nss.ns(), "removedDuring"); } BSONObjIterator i(xfer["deleted"].Obj()); while (i.more()) { - AutoGetCollection autoColl(opCtx, nss, MODE_IX); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX); uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Collection " << _nss.ns() << " was dropped in the middle of the migration", @@ -1026,8 +1014,8 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, // Do not apply delete if doc does not belong to the chunk being migrated BSONObj fullObj; - if (Helpers::findById(opCtx, autoColl.getDb(), nss.ns(), id, fullObj)) { - if (!isInRange(fullObj, min, max, shardKeyPattern)) { + if (Helpers::findById(opCtx, autoColl.getDb(), _nss.ns(), id, fullObj)) { + if (!isInRange(fullObj, _min, _max, _shardKeyPattern)) { if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeOperation)) { MONGO_UNREACHABLE; } @@ -1041,7 +1029,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, deleteObjects(opCtx, autoColl.getCollection(), - nss, + _nss, id, true /* justOne */, false /* god */, @@ -1056,7 +1044,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, if (xfer["reload"].isABSONObj()) { BSONObjIterator i(xfer["reload"].Obj()); while (i.more()) { - AutoGetCollection autoColl(opCtx, nss, MODE_IX); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX); uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Collection " << _nss.ns() << " was dropped in the middle of the migration", @@ -1065,7 +1053,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, BSONObj updatedDoc = i.next().Obj(); // do not apply insert/update if doc does not belong to the chunk being migrated - if (!isInRange(updatedDoc, min, max, shardKeyPattern)) { + if (!isInRange(updatedDoc, _min, _max, _shardKeyPattern)) { if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeOperation)) { MONGO_UNREACHABLE; } @@ -1074,10 +1062,10 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, BSONObj localDoc; if (willOverrideLocalId(opCtx, - nss, - min, - max, - shardKeyPattern, + _nss, + _min, + _max, + _shardKeyPattern, autoColl.getDb(), updatedDoc, &localDoc)) { @@ -1091,7 +1079,7 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, } // We are in write lock here, so sure we aren't killing - Helpers::upsert(opCtx, nss.ns(), updatedDoc, true); + Helpers::upsert(opCtx, _nss.ns(), updatedDoc, true); *lastOpApplied = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); didAnything = true; @@ -1102,41 +1090,35 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* opCtx, } bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx, - const std::string& ns, - BSONObj min, - BSONObj max, - const repl::OpTime& lastOpApplied, - const WriteConcernOptions& writeConcern) { - if (!opReplicatedEnough(opCtx, lastOpApplied, writeConcern)) { + const repl::OpTime& lastOpApplied) { + if (!opReplicatedEnough(opCtx, lastOpApplied, _writeConcern)) { repl::OpTime op(lastOpApplied); - OCCASIONALLY log() << "migrate commit waiting for a majority of slaves for '" << ns << "' " - << redact(min) << " -> " << redact(max) << " waiting for: " << op; + OCCASIONALLY log() << "migrate commit waiting for a majority of slaves for '" << _nss.ns() + << "' " << redact(_min) << " -> " << redact(_max) + << " waiting for: " << op; return false; } - log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << redact(min) - << " -> " << redact(max); + log() << "migrate commit succeeded flushing to secondaries for '" << _nss.ns() << "' " + << redact(_min) << " -> " << redact(_max); return true; } CollectionShardingState::CleanupNotification MigrationDestinationManager::_notePending( - OperationContext* opCtx, - NamespaceString const& nss, - OID const& epoch, - ChunkRange const& range) { + OperationContext* opCtx, ChunkRange const& range) { - AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(opCtx, nss); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X); + auto css = CollectionShardingState::get(opCtx, _nss); auto metadata = css->getMetadata(opCtx); // This can currently happen because drops aren't synchronized with in-migrations. The idea for // checking this here is that in the future we shouldn't have this problem. - if (!metadata || metadata->getCollVersion().epoch() != epoch) { + if (!metadata || metadata->getCollVersion().epoch() != _epoch) { return Status{ErrorCodes::StaleShardVersion, str::stream() << "not noting chunk " << redact(range.toString()) << " as pending because the epoch of " - << nss.ns() + << _nss.ns() << " changed"}; } @@ -1144,31 +1126,28 @@ CollectionShardingState::CleanupNotification MigrationDestinationManager::_noteP auto notification = css->beginReceive(range); if (notification.ready() && !notification.waitStatus(opCtx).isOK()) { return notification.waitStatus(opCtx).withContext( - str::stream() << "Collection " << nss.ns() << " range " << redact(range.toString()) + str::stream() << "Collection " << _nss.ns() << " range " << redact(range.toString()) << " migration aborted"); } return notification; } -void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, - const NamespaceString& nss, - OID const& epoch, - ChunkRange const& range) { +void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, ChunkRange const& range) { if (!_chunkMarkedPending) { // (no lock needed, only the migrate thread looks at this.) return; // no documents can have been moved in, so there is nothing to clean up. } UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X); - auto css = CollectionShardingState::get(opCtx, nss); + AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X); + auto css = CollectionShardingState::get(opCtx, _nss); auto metadata = css->getMetadata(opCtx); // This can currently happen because drops aren't synchronized with in-migrations. The idea for // checking this here is that in the future we shouldn't have this problem. - if (!metadata || metadata->getCollVersion().epoch() != epoch) { + if (!metadata || metadata->getCollVersion().epoch() != _epoch) { log() << "no need to forget pending chunk " << redact(range.toString()) - << " because the epoch for " << nss.ns() << " changed"; + << " because the epoch for " << _nss.ns() << " changed"; return; } diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h index 7fa514e107c..eedf65b14fb 100644 --- a/src/mongo/db/s/migration_destination_manager.h +++ b/src/mongo/db/s/migration_destination_manager.h @@ -144,33 +144,13 @@ private: /** * Thread which drives the migration apply process on the recipient side. */ - void _migrateThread(BSONObj min, - BSONObj max, - BSONObj shardKeyPattern, - OID epoch, - WriteConcernOptions writeConcern); - - void _migrateDriver(OperationContext* opCtx, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern, - const OID& epoch, - const WriteConcernOptions& writeConcern); - - bool _applyMigrateOp(OperationContext* opCtx, - const NamespaceString& ns, - const BSONObj& min, - const BSONObj& max, - const BSONObj& shardKeyPattern, - const BSONObj& xfer, - repl::OpTime* lastOpApplied); - - bool _flushPendingWrites(OperationContext* opCtx, - const std::string& ns, - BSONObj min, - BSONObj max, - const repl::OpTime& lastOpApplied, - const WriteConcernOptions& writeConcern); + void _migrateThread(); + + void _migrateDriver(OperationContext* opCtx); + + bool _applyMigrateOp(OperationContext* opCtx, const BSONObj& xfer, repl::OpTime* lastOpApplied); + + bool _flushPendingWrites(OperationContext* opCtx, const repl::OpTime& lastOpApplied); /** * Remembers a chunk range between 'min' and 'max' as a range which will have data migrated @@ -178,16 +158,13 @@ private: * it schedules deletion of any documents in the range, so that process must be seen to be * complete before migrating any new documents in. */ - CollectionShardingState::CleanupNotification _notePending(OperationContext*, - NamespaceString const&, - OID const&, - ChunkRange const&); + CollectionShardingState::CleanupNotification _notePending(OperationContext*, ChunkRange const&); /** * Stops tracking a chunk range between 'min' and 'max' that previously was having data * migrated into it, and schedules deletion of any such documents already migrated in. */ - void _forgetPending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&); + void _forgetPending(OperationContext*, ChunkRange const&); /** * Checks whether the MigrationDestinationManager is currently handling a migration by checking @@ -217,6 +194,10 @@ private: BSONObj _max; BSONObj _shardKeyPattern; + OID _epoch; + + WriteConcernOptions _writeConcern; + // Set to true once we have accepted the chunk as pending into our metadata. Used so that on // failure we can perform the appropriate cleanup. bool _chunkMarkedPending{false}; |