diff options
Diffstat (limited to 'src/mongo/db/s/migration_source_manager.cpp')
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 166 |
1 files changed, 83 insertions, 83 deletions
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 1de1af92316..5fb64445a75 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -76,7 +76,7 @@ MONGO_FP_DECLARE(migrationCommitNetworkError); MONGO_FP_DECLARE(failMigrationCommit); MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection); -MigrationSourceManager::MigrationSourceManager(OperationContext* txn, +MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx, MoveChunkRequest request, ConnectionString donorConnStr, HostAndPort recipientHost) @@ -84,7 +84,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn, _donorConnStr(std::move(donorConnStr)), _recipientHost(std::move(recipientHost)), _startTime() { - invariant(!txn->lockState()->isLocked()); + invariant(!opCtx->lockState()->isLocked()); // Disallow moving a chunk to ourselves uassert(ErrorCodes::InvalidOptions, @@ -95,11 +95,11 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn, << " with expected collection version epoch" << _args.getVersionEpoch(); // Now that the collection is locked, snapshot the metadata and fetch the latest versions - ShardingState* const shardingState = ShardingState::get(txn); + ShardingState* const shardingState = ShardingState::get(opCtx); ChunkVersion shardVersion; - Status refreshStatus = shardingState->refreshMetadataNow(txn, getNss(), &shardVersion); + Status refreshStatus = shardingState->refreshMetadataNow(opCtx, getNss(), &shardVersion); if (!refreshStatus.isOK()) { uasserted(refreshStatus.code(), str::stream() << "cannot start migrate of chunk " << _args.toString() @@ -117,10 +117,10 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn, // Snapshot the committed metadata from the time the migration starts { - ScopedTransaction scopedXact(txn, MODE_IS); - AutoGetCollection autoColl(txn, getNss(), MODE_IS); + ScopedTransaction scopedXact(opCtx, MODE_IS); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IS); - _collectionMetadata = CollectionShardingState::get(txn, getNss())->getMetadata(); + _collectionMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata(); _keyPattern = _collectionMetadata->getKeyPattern(); } @@ -163,34 +163,34 @@ NamespaceString MigrationSourceManager::getNss() const { return _args.getNss(); } -Status MigrationSourceManager::startClone(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::startClone(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCreated); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); - - grid.catalogClient(txn)->logChange(txn, - "moveChunk.start", - getNss().ns(), - BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() - << "from" - << _args.getFromShardId() - << "to" - << _args.getToShardId()), - ShardingCatalogClient::kMajorityWriteConcern); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); + + grid.catalogClient(opCtx)->logChange( + opCtx, + "moveChunk.start", + getNss().ns(), + BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" + << _args.getFromShardId() + << "to" + << _args.getToShardId()), + ShardingCatalogClient::kMajorityWriteConcern); _cloneDriver = stdx::make_unique<MigrationChunkClonerSourceLegacy>( _args, _collectionMetadata->getKeyPattern(), _donorConnStr, _recipientHost); { // Register for notifications from the replication subsystem - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); - auto css = CollectionShardingState::get(txn, getNss().ns()); - css->setMigrationSourceManager(txn, this); + auto css = CollectionShardingState::get(opCtx, getNss().ns()); + css->setMigrationSourceManager(opCtx, this); } - Status startCloneStatus = _cloneDriver->startClone(txn); + Status startCloneStatus = _cloneDriver->startClone(opCtx); if (!startCloneStatus.isOK()) { return startCloneStatus; } @@ -200,14 +200,14 @@ Status MigrationSourceManager::startClone(OperationContext* txn) { return Status::OK(); } -Status MigrationSourceManager::awaitToCatchUp(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::awaitToCatchUp(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCloning); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); // Block until the cloner deems it appropriate to enter the critical section. Status catchUpStatus = _cloneDriver->awaitUntilCriticalSectionIsAppropriate( - txn, kMaxWaitToEnterCriticalSectionTimeout); + opCtx, kMaxWaitToEnterCriticalSectionTimeout); if (!catchUpStatus.isOK()) { return catchUpStatus; } @@ -217,13 +217,13 @@ Status MigrationSourceManager::awaitToCatchUp(OperationContext* txn) { return Status::OK(); } -Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCloneCaughtUp); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); // Mark the shard as running critical operation, which requires recovery on crash - Status status = ShardingStateRecovery::startMetadataOp(txn); + Status status = ShardingStateRecovery::startMetadataOp(opCtx); if (!status.isOK()) { return status; } @@ -232,11 +232,11 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) { // The critical section must be entered with collection X lock in order to ensure there are // no writes which could have entered and passed the version check just before we entered // the crticial section, but managed to complete after we left it. - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); // Check that the collection has not been dropped or recreated since the migration began. - auto css = CollectionShardingState::get(txn, getNss().ns()); + auto css = CollectionShardingState::get(opCtx, getNss().ns()); auto metadata = css->getMetadata(); if (!metadata || (metadata->getCollVersion().epoch() != _collectionMetadata->getCollVersion().epoch())) { @@ -261,13 +261,13 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) { return Status::OK(); } -Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCriticalSection); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); // Tell the recipient shard to fetch the latest changes. - Status commitCloneStatus = _cloneDriver->commitClone(txn); + Status commitCloneStatus = _cloneDriver->commitClone(opCtx); if (MONGO_FAIL_POINT(failMigrationCommit) && commitCloneStatus.isOK()) { commitCloneStatus = {ErrorCodes::InternalError, @@ -284,10 +284,10 @@ Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* txn) { return Status::OK(); } -Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn) { - invariant(!txn->lockState()->isLocked()); +Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opCtx) { + invariant(!opCtx->lockState()->isLocked()); invariant(_state == kCloneCompleted); - auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); }); + auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); }); ChunkType migratedChunkType; migratedChunkType.setMin(_args.getMinKey()); @@ -319,7 +319,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn auto commitChunkMigrationResponse = grid.shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( - txn, + opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", builder.obj(), @@ -342,8 +342,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn "against the config server to obtain its latest optime" << causedBy(redact(migrationCommitStatus)); - Status status = grid.catalogClient(txn)->logChange( - txn, + Status status = grid.catalogClient(opCtx)->logChange( + opCtx, "moveChunk.validating", getNss().ns(), BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" @@ -376,13 +376,13 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn // up so that subsequent requests will try to do a full refresh. ChunkVersion unusedShardVersion; Status refreshStatus = - ShardingState::get(txn)->refreshMetadataNow(txn, getNss(), &unusedShardVersion); + ShardingState::get(opCtx)->refreshMetadataNow(opCtx, getNss(), &unusedShardVersion); if (refreshStatus.isOK()) { - ScopedTransaction scopedXact(txn, MODE_IS); - AutoGetCollection autoColl(txn, getNss(), MODE_IS); + ScopedTransaction scopedXact(opCtx, MODE_IS); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IS); - auto refreshedMetadata = CollectionShardingState::get(txn, getNss())->getMetadata(); + auto refreshedMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata(); if (!refreshedMetadata) { return {ErrorCodes::NamespaceNotSharded, @@ -402,10 +402,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn log() << "Migration succeeded and updated collection version to " << refreshedMetadata->getCollVersion(); } else { - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); - CollectionShardingState::get(txn, getNss())->refreshMetadata(txn, nullptr); + CollectionShardingState::get(opCtx, getNss())->refreshMetadata(opCtx, nullptr); log() << "Failed to refresh metadata after a failed commit attempt. Metadata was cleared " "so it will get a full refresh when accessed again" @@ -420,52 +420,52 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeLeavingCriticalSection); scopedGuard.Dismiss(); - _cleanup(txn); - - grid.catalogClient(txn)->logChange(txn, - "moveChunk.commit", - getNss().ns(), - BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() - << "from" - << _args.getFromShardId() - << "to" - << _args.getToShardId()), - ShardingCatalogClient::kMajorityWriteConcern); + _cleanup(opCtx); + + grid.catalogClient(opCtx)->logChange( + opCtx, + "moveChunk.commit", + getNss().ns(), + BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" + << _args.getFromShardId() + << "to" + << _args.getToShardId()), + ShardingCatalogClient::kMajorityWriteConcern); return Status::OK(); } -void MigrationSourceManager::cleanupOnError(OperationContext* txn) { +void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) { if (_state == kDone) { return; } - grid.catalogClient(txn)->logChange(txn, - "moveChunk.error", - getNss().ns(), - BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() - << "from" - << _args.getFromShardId() - << "to" - << _args.getToShardId()), - ShardingCatalogClient::kMajorityWriteConcern); - - _cleanup(txn); + grid.catalogClient(opCtx)->logChange( + opCtx, + "moveChunk.error", + getNss().ns(), + BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from" + << _args.getFromShardId() + << "to" + << _args.getToShardId()), + ShardingCatalogClient::kMajorityWriteConcern); + + _cleanup(opCtx); } -void MigrationSourceManager::_cleanup(OperationContext* txn) { +void MigrationSourceManager::_cleanup(OperationContext* opCtx) { invariant(_state != kDone); auto cloneDriver = [&]() { // Unregister from the collection's sharding state - ScopedTransaction scopedXact(txn, MODE_IX); - AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X); + ScopedTransaction scopedXact(opCtx, MODE_IX); + AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X); - auto css = CollectionShardingState::get(txn, getNss().ns()); + auto css = CollectionShardingState::get(opCtx, getNss().ns()); // The migration source manager is not visible anymore after it is unregistered from the // collection - css->clearMigrationSourceManager(txn); + css->clearMigrationSourceManager(opCtx); // Leave the critical section. if (_critSecSignal) { @@ -478,11 +478,11 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) { // Decrement the metadata op counter outside of the collection lock in order to hold it for as // short as possible. if (_state == kCriticalSection || _state == kCloneCompleted) { - ShardingStateRecovery::endMetadataOp(txn); + ShardingStateRecovery::endMetadataOp(opCtx); } if (cloneDriver) { - cloneDriver->cancelClone(txn); + cloneDriver->cancelClone(opCtx); } _state = kDone; |