summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/migration_source_manager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/migration_source_manager.cpp')
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp166
1 files changed, 83 insertions, 83 deletions
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 1de1af92316..5fb64445a75 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -76,7 +76,7 @@ MONGO_FP_DECLARE(migrationCommitNetworkError);
MONGO_FP_DECLARE(failMigrationCommit);
MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection);
-MigrationSourceManager::MigrationSourceManager(OperationContext* txn,
+MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
MoveChunkRequest request,
ConnectionString donorConnStr,
HostAndPort recipientHost)
@@ -84,7 +84,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn,
_donorConnStr(std::move(donorConnStr)),
_recipientHost(std::move(recipientHost)),
_startTime() {
- invariant(!txn->lockState()->isLocked());
+ invariant(!opCtx->lockState()->isLocked());
// Disallow moving a chunk to ourselves
uassert(ErrorCodes::InvalidOptions,
@@ -95,11 +95,11 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn,
<< " with expected collection version epoch" << _args.getVersionEpoch();
// Now that the collection is locked, snapshot the metadata and fetch the latest versions
- ShardingState* const shardingState = ShardingState::get(txn);
+ ShardingState* const shardingState = ShardingState::get(opCtx);
ChunkVersion shardVersion;
- Status refreshStatus = shardingState->refreshMetadataNow(txn, getNss(), &shardVersion);
+ Status refreshStatus = shardingState->refreshMetadataNow(opCtx, getNss(), &shardVersion);
if (!refreshStatus.isOK()) {
uasserted(refreshStatus.code(),
str::stream() << "cannot start migrate of chunk " << _args.toString()
@@ -117,10 +117,10 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* txn,
// Snapshot the committed metadata from the time the migration starts
{
- ScopedTransaction scopedXact(txn, MODE_IS);
- AutoGetCollection autoColl(txn, getNss(), MODE_IS);
+ ScopedTransaction scopedXact(opCtx, MODE_IS);
+ AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
- _collectionMetadata = CollectionShardingState::get(txn, getNss())->getMetadata();
+ _collectionMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata();
_keyPattern = _collectionMetadata->getKeyPattern();
}
@@ -163,34 +163,34 @@ NamespaceString MigrationSourceManager::getNss() const {
return _args.getNss();
}
-Status MigrationSourceManager::startClone(OperationContext* txn) {
- invariant(!txn->lockState()->isLocked());
+Status MigrationSourceManager::startClone(OperationContext* opCtx) {
+ invariant(!opCtx->lockState()->isLocked());
invariant(_state == kCreated);
- auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); });
-
- grid.catalogClient(txn)->logChange(txn,
- "moveChunk.start",
- getNss().ns(),
- BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey()
- << "from"
- << _args.getFromShardId()
- << "to"
- << _args.getToShardId()),
- ShardingCatalogClient::kMajorityWriteConcern);
+ auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
+
+ grid.catalogClient(opCtx)->logChange(
+ opCtx,
+ "moveChunk.start",
+ getNss().ns(),
+ BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
+ << _args.getFromShardId()
+ << "to"
+ << _args.getToShardId()),
+ ShardingCatalogClient::kMajorityWriteConcern);
_cloneDriver = stdx::make_unique<MigrationChunkClonerSourceLegacy>(
_args, _collectionMetadata->getKeyPattern(), _donorConnStr, _recipientHost);
{
// Register for notifications from the replication subsystem
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X);
+ ScopedTransaction scopedXact(opCtx, MODE_IX);
+ AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
- auto css = CollectionShardingState::get(txn, getNss().ns());
- css->setMigrationSourceManager(txn, this);
+ auto css = CollectionShardingState::get(opCtx, getNss().ns());
+ css->setMigrationSourceManager(opCtx, this);
}
- Status startCloneStatus = _cloneDriver->startClone(txn);
+ Status startCloneStatus = _cloneDriver->startClone(opCtx);
if (!startCloneStatus.isOK()) {
return startCloneStatus;
}
@@ -200,14 +200,14 @@ Status MigrationSourceManager::startClone(OperationContext* txn) {
return Status::OK();
}
-Status MigrationSourceManager::awaitToCatchUp(OperationContext* txn) {
- invariant(!txn->lockState()->isLocked());
+Status MigrationSourceManager::awaitToCatchUp(OperationContext* opCtx) {
+ invariant(!opCtx->lockState()->isLocked());
invariant(_state == kCloning);
- auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); });
+ auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
// Block until the cloner deems it appropriate to enter the critical section.
Status catchUpStatus = _cloneDriver->awaitUntilCriticalSectionIsAppropriate(
- txn, kMaxWaitToEnterCriticalSectionTimeout);
+ opCtx, kMaxWaitToEnterCriticalSectionTimeout);
if (!catchUpStatus.isOK()) {
return catchUpStatus;
}
@@ -217,13 +217,13 @@ Status MigrationSourceManager::awaitToCatchUp(OperationContext* txn) {
return Status::OK();
}
-Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) {
- invariant(!txn->lockState()->isLocked());
+Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
+ invariant(!opCtx->lockState()->isLocked());
invariant(_state == kCloneCaughtUp);
- auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); });
+ auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
// Mark the shard as running critical operation, which requires recovery on crash
- Status status = ShardingStateRecovery::startMetadataOp(txn);
+ Status status = ShardingStateRecovery::startMetadataOp(opCtx);
if (!status.isOK()) {
return status;
}
@@ -232,11 +232,11 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) {
// The critical section must be entered with collection X lock in order to ensure there are
// no writes which could have entered and passed the version check just before we entered
// the crticial section, but managed to complete after we left it.
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X);
+ ScopedTransaction scopedXact(opCtx, MODE_IX);
+ AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
// Check that the collection has not been dropped or recreated since the migration began.
- auto css = CollectionShardingState::get(txn, getNss().ns());
+ auto css = CollectionShardingState::get(opCtx, getNss().ns());
auto metadata = css->getMetadata();
if (!metadata ||
(metadata->getCollVersion().epoch() != _collectionMetadata->getCollVersion().epoch())) {
@@ -261,13 +261,13 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* txn) {
return Status::OK();
}
-Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* txn) {
- invariant(!txn->lockState()->isLocked());
+Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* opCtx) {
+ invariant(!opCtx->lockState()->isLocked());
invariant(_state == kCriticalSection);
- auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); });
+ auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
// Tell the recipient shard to fetch the latest changes.
- Status commitCloneStatus = _cloneDriver->commitClone(txn);
+ Status commitCloneStatus = _cloneDriver->commitClone(opCtx);
if (MONGO_FAIL_POINT(failMigrationCommit) && commitCloneStatus.isOK()) {
commitCloneStatus = {ErrorCodes::InternalError,
@@ -284,10 +284,10 @@ Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* txn) {
return Status::OK();
}
-Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn) {
- invariant(!txn->lockState()->isLocked());
+Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opCtx) {
+ invariant(!opCtx->lockState()->isLocked());
invariant(_state == kCloneCompleted);
- auto scopedGuard = MakeGuard([&] { cleanupOnError(txn); });
+ auto scopedGuard = MakeGuard([&] { cleanupOnError(opCtx); });
ChunkType migratedChunkType;
migratedChunkType.setMin(_args.getMinKey());
@@ -319,7 +319,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn
auto commitChunkMigrationResponse =
grid.shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
- txn,
+ opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
builder.obj(),
@@ -342,8 +342,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn
"against the config server to obtain its latest optime"
<< causedBy(redact(migrationCommitStatus));
- Status status = grid.catalogClient(txn)->logChange(
- txn,
+ Status status = grid.catalogClient(opCtx)->logChange(
+ opCtx,
"moveChunk.validating",
getNss().ns(),
BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
@@ -376,13 +376,13 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn
// up so that subsequent requests will try to do a full refresh.
ChunkVersion unusedShardVersion;
Status refreshStatus =
- ShardingState::get(txn)->refreshMetadataNow(txn, getNss(), &unusedShardVersion);
+ ShardingState::get(opCtx)->refreshMetadataNow(opCtx, getNss(), &unusedShardVersion);
if (refreshStatus.isOK()) {
- ScopedTransaction scopedXact(txn, MODE_IS);
- AutoGetCollection autoColl(txn, getNss(), MODE_IS);
+ ScopedTransaction scopedXact(opCtx, MODE_IS);
+ AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
- auto refreshedMetadata = CollectionShardingState::get(txn, getNss())->getMetadata();
+ auto refreshedMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata();
if (!refreshedMetadata) {
return {ErrorCodes::NamespaceNotSharded,
@@ -402,10 +402,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn
log() << "Migration succeeded and updated collection version to "
<< refreshedMetadata->getCollVersion();
} else {
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X);
+ ScopedTransaction scopedXact(opCtx, MODE_IX);
+ AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
- CollectionShardingState::get(txn, getNss())->refreshMetadata(txn, nullptr);
+ CollectionShardingState::get(opCtx, getNss())->refreshMetadata(opCtx, nullptr);
log() << "Failed to refresh metadata after a failed commit attempt. Metadata was cleared "
"so it will get a full refresh when accessed again"
@@ -420,52 +420,52 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* txn
MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeLeavingCriticalSection);
scopedGuard.Dismiss();
- _cleanup(txn);
-
- grid.catalogClient(txn)->logChange(txn,
- "moveChunk.commit",
- getNss().ns(),
- BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey()
- << "from"
- << _args.getFromShardId()
- << "to"
- << _args.getToShardId()),
- ShardingCatalogClient::kMajorityWriteConcern);
+ _cleanup(opCtx);
+
+ grid.catalogClient(opCtx)->logChange(
+ opCtx,
+ "moveChunk.commit",
+ getNss().ns(),
+ BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
+ << _args.getFromShardId()
+ << "to"
+ << _args.getToShardId()),
+ ShardingCatalogClient::kMajorityWriteConcern);
return Status::OK();
}
-void MigrationSourceManager::cleanupOnError(OperationContext* txn) {
+void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) {
if (_state == kDone) {
return;
}
- grid.catalogClient(txn)->logChange(txn,
- "moveChunk.error",
- getNss().ns(),
- BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey()
- << "from"
- << _args.getFromShardId()
- << "to"
- << _args.getToShardId()),
- ShardingCatalogClient::kMajorityWriteConcern);
-
- _cleanup(txn);
+ grid.catalogClient(opCtx)->logChange(
+ opCtx,
+ "moveChunk.error",
+ getNss().ns(),
+ BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
+ << _args.getFromShardId()
+ << "to"
+ << _args.getToShardId()),
+ ShardingCatalogClient::kMajorityWriteConcern);
+
+ _cleanup(opCtx);
}
-void MigrationSourceManager::_cleanup(OperationContext* txn) {
+void MigrationSourceManager::_cleanup(OperationContext* opCtx) {
invariant(_state != kDone);
auto cloneDriver = [&]() {
// Unregister from the collection's sharding state
- ScopedTransaction scopedXact(txn, MODE_IX);
- AutoGetCollection autoColl(txn, getNss(), MODE_IX, MODE_X);
+ ScopedTransaction scopedXact(opCtx, MODE_IX);
+ AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
- auto css = CollectionShardingState::get(txn, getNss().ns());
+ auto css = CollectionShardingState::get(opCtx, getNss().ns());
// The migration source manager is not visible anymore after it is unregistered from the
// collection
- css->clearMigrationSourceManager(txn);
+ css->clearMigrationSourceManager(opCtx);
// Leave the critical section.
if (_critSecSignal) {
@@ -478,11 +478,11 @@ void MigrationSourceManager::_cleanup(OperationContext* txn) {
// Decrement the metadata op counter outside of the collection lock in order to hold it for as
// short as possible.
if (_state == kCriticalSection || _state == kCloneCompleted) {
- ShardingStateRecovery::endMetadataOp(txn);
+ ShardingStateRecovery::endMetadataOp(opCtx);
}
if (cloneDriver) {
- cloneDriver->cancelClone(txn);
+ cloneDriver->cancelClone(opCtx);
}
_state = kDone;