author      Cheahuychou Mao <cheahuychou.mao@mongodb.com>    2019-12-11 21:46:37 +0000
committer   evergreen <evergreen@mongodb.com>                2019-12-11 21:46:37 +0000
commit      6dfa4748f017315300a32982b63fe71e8a68d42d (patch)
tree        bb089ddc8ccd769339bb1dfc2263f47020db86fe /src
parent      57acc8b666b8c9dfc34eaf03c226ab26ac225781 (diff)
SERVER-44911 Make index operations abort concurrent outgoing migrations
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/auth/auth_op_observer.h | 2
-rw-r--r--  src/mongo/db/catalog/multi_index_block.cpp | 24
-rw-r--r--  src/mongo/db/free_mon/free_mon_op_observer.h | 2
-rw-r--r--  src/mongo/db/op_observer.h | 6
-rw-r--r--  src/mongo/db/op_observer_impl.cpp | 31
-rw-r--r--  src/mongo/db/op_observer_impl.h | 1
-rw-r--r--  src/mongo/db/op_observer_noop.h | 3
-rw-r--r--  src/mongo/db/op_observer_registry.h | 8
-rw-r--r--  src/mongo/db/s/config_server_op_observer.h | 3
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp | 186
-rw-r--r--  src/mongo/db/s/migration_source_manager.h | 29
-rw-r--r--  src/mongo/db/s/move_chunk_command.cpp | 10
-rw-r--r--  src/mongo/db/s/shard_server_op_observer.cpp | 45
-rw-r--r--  src/mongo/db/s/shard_server_op_observer.h | 8
14 files changed, 226 insertions, 132 deletions
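
At a high level, the change wires index DDL through the op observer into the migration machinery: ShardServerOpObserver's index hooks look up the MigrationSourceManager registered on the CollectionShardingRuntime and call abortDueToConflictingIndexOperation(), which marks the moveChunk command's OperationContext as killed so the migration fails at its next interruption point. The following standalone sketch uses hypothetical, simplified stand-ins for the MongoDB classes (not the real types) to illustrate that control flow:

    #include <atomic>
    #include <iostream>
    #include <stdexcept>

    // Hypothetical stand-in for OperationContext: markKilled() makes the next
    // interruption check throw, which is roughly how the real moveChunk thread
    // notices that it has been aborted.
    struct FakeOperationContext {
        std::atomic<bool> killed{false};
        void markKilled() { killed = true; }
        void checkForInterrupt() const {
            if (killed) throw std::runtime_error("Interrupted: conflicting index operation");
        }
    };

    // Hypothetical stand-in for MigrationSourceManager: it remembers the opCtx of
    // the moveChunk request that constructed it (the new _opCtx member in this commit).
    struct FakeMigrationSourceManager {
        explicit FakeMigrationSourceManager(FakeOperationContext* opCtx) : _opCtx(opCtx) {}
        void abortDueToConflictingIndexOperation() { _opCtx->markKilled(); }
    private:
        FakeOperationContext* const _opCtx;
    };

    // Hypothetical stand-in for the shard-server observer hook: abort any active
    // outgoing migration for the namespace when an index operation is observed.
    void onStartIndexBuildSinglePhase(FakeMigrationSourceManager* activeMigration) {
        if (activeMigration) activeMigration->abortDueToConflictingIndexOperation();
    }

    int main() {
        FakeOperationContext moveChunkOpCtx;
        FakeMigrationSourceManager msm(&moveChunkOpCtx);
        onStartIndexBuildSinglePhase(&msm);      // index build begins on the donor shard
        try {
            moveChunkOpCtx.checkForInterrupt();  // moveChunk's next interruption point
        } catch (const std::exception& e) {
            std::cout << "migration aborted: " << e.what() << "\n";
        }
        return 0;
    }
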
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index 931cb7e0747..7287876df0b 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -58,6 +58,8 @@ public:
const std::vector<BSONObj>& indexes,
bool fromMigrate) final {}
+ void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {}
+
void onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
CollectionUUID collUUID,
diff --git a/src/mongo/db/catalog/multi_index_block.cpp b/src/mongo/db/catalog/multi_index_block.cpp
index f60bbc40147..6ee21034efc 100644
--- a/src/mongo/db/catalog/multi_index_block.cpp
+++ b/src/mongo/db/catalog/multi_index_block.cpp
@@ -186,29 +186,7 @@ MultiIndexBlock::OnInitFn MultiIndexBlock::kNoopOnInitFn =
MultiIndexBlock::OnInitFn MultiIndexBlock::makeTimestampedIndexOnInitFn(OperationContext* opCtx,
const Collection* coll) {
return [opCtx, ns = coll->ns()](std::vector<BSONObj>& specs) -> Status {
- // This function sets a timestamp for the initial catalog write when beginning an index
- // build, if necessary. There are four scenarios:
-
- // 1. A timestamp is already set -- replication application sets a timestamp ahead of time.
- // This could include the phase of initial sync where it applies oplog entries. Also,
- // primaries performing an index build via `applyOps` may have a wrapping commit timestamp.
- if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull())
- return Status::OK();
-
- // 2. If the node is initial syncing, we do not set a timestamp.
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- if (replCoord->isReplEnabled() && replCoord->getMemberState().startup2())
- return Status::OK();
-
- // 3. If the index build is on the local database, do not timestamp.
- if (ns.isLocal())
- return Status::OK();
-
- // 4. All other cases, we generate a timestamp by writing a no-op oplog entry. This is
- // better than using a ghost timestamp. Writing an oplog entry ensures this node is
- // primary.
- opCtx->getServiceContext()->getOpObserver()->onOpMessage(
- opCtx, BSON("msg" << std::string(str::stream() << "Creating indexes. Coll: " << ns)));
+ opCtx->getServiceContext()->getOpObserver()->onStartIndexBuildSinglePhase(opCtx, ns);
return Status::OK();
};
}
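
The timestamping policy that used to live inline in makeTimestampedIndexOnInitFn (the four scenarios in the removed comment) now sits behind the new OpObserver::onStartIndexBuildSinglePhase hook, so MultiIndexBlock only announces the event and every registered observer, including the new shard-server one, gets a chance to react. A simplified sketch of the resulting shape, with hypothetical stub types standing in for the MongoDB ones:

    #include <functional>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    // Hypothetical, simplified stand-ins for Status / OpObserver / OperationContext.
    struct Status { static Status OK() { return {}; } };

    struct OperationContextStub;

    struct OpObserverStub {
        virtual ~OpObserverStub() = default;
        virtual void onStartIndexBuildSinglePhase(OperationContextStub* opCtx,
                                                  const std::string& nss) = 0;
    };

    struct OperationContextStub {
        OpObserverStub* observer;
    };

    using OnInitFn = std::function<Status(std::vector<std::string>& specs)>;

    // After this commit the init callback no longer carries the timestamping
    // policy itself; it only announces the single-phase index build start and
    // lets each registered observer decide how to react.
    OnInitFn makeTimestampedIndexOnInitFn(OperationContextStub* opCtx, std::string ns) {
        return [opCtx, ns = std::move(ns)](std::vector<std::string>&) -> Status {
            opCtx->observer->onStartIndexBuildSinglePhase(opCtx, ns);
            return Status::OK();
        };
    }

    struct PrintingObserver : OpObserverStub {
        void onStartIndexBuildSinglePhase(OperationContextStub*, const std::string& nss) override {
            std::cout << "onStartIndexBuildSinglePhase(" << nss << ")\n";
        }
    };

    int main() {
        PrintingObserver observer;
        OperationContextStub opCtx{&observer};
        std::vector<std::string> specs;
        makeTimestampedIndexOnInitFn(&opCtx, "test.coll")(specs);
        return 0;
    }
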
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index 00c565d0dea..3220c2b3985 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -58,6 +58,8 @@ public:
const std::vector<BSONObj>& indexes,
bool fromMigrate) final {}
+ void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final {}
+
void onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
CollectionUUID collUUID,
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index acb478fc68c..2c98b853cd9 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -101,6 +101,12 @@ public:
const std::vector<BSONObj>& indexes,
bool fromMigrate) = 0;
+ /**
+ * TODO (SERVER-45017): Remove when v4.4 becomes last-stable.
+ */
+ virtual void onStartIndexBuildSinglePhase(OperationContext* opCtx,
+ const NamespaceString& nss) = 0;
+
virtual void onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
CollectionUUID collUUID,
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 1b012b4a8ad..b47dabd1d01 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -281,6 +281,37 @@ void OpObserverImpl::onStartIndexBuild(OperationContext* opCtx,
logOperation(opCtx, &oplogEntry);
}
+void OpObserverImpl::onStartIndexBuildSinglePhase(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ // This function sets a timestamp for the initial catalog write when beginning an index
+ // build, if necessary. There are four scenarios:
+
+ // 1. A timestamp is already set -- replication application sets a timestamp ahead of time.
+ // This could include the phase of initial sync where it applies oplog entries. Also,
+ // primaries performing an index build via `applyOps` may have a wrapping commit timestamp.
+ if (!opCtx->recoveryUnit()->getCommitTimestamp().isNull())
+ return;
+
+ // 2. If the node is initial syncing, we do not set a timestamp.
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord->isReplEnabled() && replCoord->getMemberState().startup2())
+ return;
+
+ // 3. If the index build is on the local database, do not timestamp.
+ if (nss.isLocal())
+ return;
+
+ // 4. All other cases, we generate a timestamp by writing a no-op oplog entry. This is
+ // better than using a ghost timestamp. Writing an oplog entry ensures this node is
+ // primary.
+ onInternalOpMessage(
+ opCtx,
+ {},
+ boost::none,
+ BSON("msg" << std::string(str::stream() << "Creating indexes. Coll: " << nss)),
+ boost::none);
+}
+
void OpObserverImpl::onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
CollectionUUID collUUID,
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index 1a512eb7f3a..b2d09b6d58c 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -53,6 +53,7 @@ public:
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
bool fromMigrate) final;
+ void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) final;
void onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index b45fe089338..e22a3d9097b 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -48,6 +48,9 @@ public:
const std::vector<BSONObj>& indexes,
bool fromMigrate) override {}
+ void onStartIndexBuildSinglePhase(OperationContext* opCtx,
+ const NamespaceString& nss) override {}
+
void onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
CollectionUUID collUUID,
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index 12370e0b561..8da173e1c91 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -80,6 +80,14 @@ public:
}
}
+ virtual void onStartIndexBuildSinglePhase(OperationContext* opCtx,
+ const NamespaceString& nss) override {
+ ReservedTimes times{opCtx};
+ for (auto& o : _observers) {
+ o->onStartIndexBuildSinglePhase(opCtx, nss);
+ }
+ }
+
virtual void onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
CollectionUUID collUUID,
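
OpObserverRegistry forwards the new hook to every registered observer under a single ReservedTimes block, exactly like the existing hooks, so OpObserverImpl keeps doing the oplog/timestamp work while ShardServerOpObserver independently aborts migrations. A minimal standalone sketch of that fan-out pattern (hypothetical names, not the real registry):

    #include <iostream>
    #include <memory>
    #include <string>
    #include <vector>

    struct Observer {
        virtual ~Observer() = default;
        virtual void onStartIndexBuildSinglePhase(const std::string& nss) = 0;
    };

    // The registry is itself an observer that forwards each event to every
    // registered observer, in registration order.
    struct ObserverRegistry : Observer {
        void addObserver(std::unique_ptr<Observer> o) { _observers.push_back(std::move(o)); }
        void onStartIndexBuildSinglePhase(const std::string& nss) override {
            for (auto& o : _observers) o->onStartIndexBuildSinglePhase(nss);
        }
    private:
        std::vector<std::unique_ptr<Observer>> _observers;
    };

    struct OplogWriter : Observer {  // stands in for OpObserverImpl
        void onStartIndexBuildSinglePhase(const std::string& nss) override {
            std::cout << "write no-op oplog entry for " << nss << "\n";
        }
    };

    struct MigrationAborter : Observer {  // stands in for ShardServerOpObserver
        void onStartIndexBuildSinglePhase(const std::string& nss) override {
            std::cout << "abort outgoing migration for " << nss << "\n";
        }
    };

    int main() {
        ObserverRegistry registry;
        registry.addObserver(std::make_unique<OplogWriter>());
        registry.addObserver(std::make_unique<MigrationAborter>());
        registry.onStartIndexBuildSinglePhase("test.coll");
        return 0;
    }
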
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 89c420b356c..38f31c6caef 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -58,6 +58,9 @@ public:
const std::vector<BSONObj>& indexes,
bool fromMigrate) override {}
+ void onStartIndexBuildSinglePhase(OperationContext* opCtx,
+ const NamespaceString& nss) override {}
+
void onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
CollectionUUID collUUID,
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 42583b3baaa..31f451fa00d 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -134,11 +134,12 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
MoveChunkRequest request,
ConnectionString donorConnStr,
HostAndPort recipientHost)
- : _args(std::move(request)),
+ : _opCtx(opCtx),
+ _args(std::move(request)),
_donorConnStr(std::move(donorConnStr)),
_recipientHost(std::move(recipientHost)),
- _stats(ShardingStatistics::get(opCtx)) {
- invariant(!opCtx->lockState()->isLocked());
+ _stats(ShardingStatistics::get(_opCtx)) {
+ invariant(!_opCtx->lockState()->isLocked());
// Disallow moving a chunk to ourselves
uassert(ErrorCodes::InvalidOptions,
@@ -149,12 +150,12 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
<< " with expected collection version epoch " << _args.getVersionEpoch();
// Force refresh of the metadata to ensure we have the latest
- forceShardFilteringMetadataRefresh(opCtx, getNss());
+ forceShardFilteringMetadataRefresh(_opCtx, getNss());
// Snapshot the committed metadata from the time the migration starts
const auto collectionMetadataAndUUID = [&] {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS);
uassert(ErrorCodes::InvalidOptions,
"cannot move chunks for a collection that doesn't exist",
autoColl.getCollection());
@@ -163,7 +164,7 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
collectionUUID = autoColl.getCollection()->uuid();
auto optMetadata =
- CollectionShardingState::get(opCtx, getNss())->getCurrentMetadataIfKnown();
+ CollectionShardingState::get(_opCtx, getNss())->getCurrentMetadataIfKnown();
uassert(ErrorCodes::ConflictingOperationInProgress,
"The collection's sharding state was cleared by a concurrent operation",
optMetadata);
@@ -215,14 +216,14 @@ NamespaceString MigrationSourceManager::getNss() const {
return _args.getNss();
}
-Status MigrationSourceManager::startClone(OperationContext* opCtx) {
- invariant(!opCtx->lockState()->isLocked());
+Status MigrationSourceManager::startClone() {
+ invariant(!_opCtx->lockState()->isLocked());
invariant(_state == kCreated);
- auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); });
+ auto scopedGuard = makeGuard([&] { cleanupOnError(); });
_stats.countDonorMoveChunkStarted.addAndFetch(1);
- const Status logStatus = ShardingLogging::get(opCtx)->logChangeChecked(
- opCtx,
+ const Status logStatus = ShardingLogging::get(_opCtx)->logChangeChecked(
+ _opCtx,
"moveChunk.start",
getNss().ns(),
BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
@@ -234,11 +235,11 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) {
_cloneAndCommitTimer.reset();
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
auto replEnabled = replCoord->isReplEnabled();
{
- const auto metadata = _getCurrentMetadataAndCheckEpoch(opCtx);
+ const auto metadata = _getCurrentMetadataAndCheckEpoch();
// Having the metadata manager registered on the collection sharding state is what indicates
// that a chunk on that collection is being migrated. With an active migration, write
@@ -249,23 +250,23 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) {
boost::optional<AutoGetCollection> autoColl;
if (replEnabled) {
- autoColl.emplace(opCtx,
+ autoColl.emplace(_opCtx,
getNss(),
MODE_IX,
AutoGetCollection::ViewMode::kViewsForbidden,
- opCtx->getServiceContext()->getPreciseClockSource()->now() +
+ _opCtx->getServiceContext()->getPreciseClockSource()->now() +
Milliseconds(migrationLockAcquisitionMaxWaitMS.load()));
} else {
- autoColl.emplace(opCtx,
+ autoColl.emplace(_opCtx,
getNss(),
MODE_X,
AutoGetCollection::ViewMode::kViewsForbidden,
- opCtx->getServiceContext()->getPreciseClockSource()->now() +
+ _opCtx->getServiceContext()->getPreciseClockSource()->now() +
Milliseconds(migrationLockAcquisitionMaxWaitMS.load()));
}
- auto csr = CollectionShardingRuntime::get(opCtx, getNss());
- auto lockedCsr = CollectionShardingRuntime::CSRLock::lockExclusive(opCtx, csr);
+ auto csr = CollectionShardingRuntime::get(_opCtx, getNss());
+ auto lockedCsr = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
invariant(nullptr == std::exchange(msmForCsr(csr), this));
_state = kCloning;
@@ -275,12 +276,12 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) {
auto const readConcernArgs = repl::ReadConcernArgs(
replCoord->getMyLastAppliedOpTime(), repl::ReadConcernLevel::kLocalReadConcern);
- uassertStatusOK(waitForReadConcern(opCtx, readConcernArgs, false));
+ uassertStatusOK(waitForReadConcern(_opCtx, readConcernArgs, false));
setPrepareConflictBehaviorForReadConcern(
- opCtx, readConcernArgs, PrepareConflictBehavior::kEnforce);
+ _opCtx, readConcernArgs, PrepareConflictBehavior::kEnforce);
}
- Status startCloneStatus = _cloneDriver->startClone(opCtx);
+ Status startCloneStatus = _cloneDriver->startClone(_opCtx);
if (!startCloneStatus.isOK()) {
return startCloneStatus;
}
@@ -289,16 +290,16 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) {
return Status::OK();
}
-Status MigrationSourceManager::awaitToCatchUp(OperationContext* opCtx) {
- invariant(!opCtx->lockState()->isLocked());
+Status MigrationSourceManager::awaitToCatchUp() {
+ invariant(!_opCtx->lockState()->isLocked());
invariant(_state == kCloning);
- auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); });
+ auto scopedGuard = makeGuard([&] { cleanupOnError(); });
_stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
_cloneAndCommitTimer.reset();
// Block until the cloner deems it appropriate to enter the critical section.
Status catchUpStatus = _cloneDriver->awaitUntilCriticalSectionIsAppropriate(
- opCtx, kMaxWaitToEnterCriticalSectionTimeout);
+ _opCtx, kMaxWaitToEnterCriticalSectionTimeout);
if (!catchUpStatus.isOK()) {
return catchUpStatus;
}
@@ -308,26 +309,26 @@ Status MigrationSourceManager::awaitToCatchUp(OperationContext* opCtx) {
return Status::OK();
}
-Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
- invariant(!opCtx->lockState()->isLocked());
+Status MigrationSourceManager::enterCriticalSection() {
+ invariant(!_opCtx->lockState()->isLocked());
invariant(_state == kCloneCaughtUp);
- auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); });
+ auto scopedGuard = makeGuard([&] { cleanupOnError(); });
_stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
_cloneAndCommitTimer.reset();
- _notifyChangeStreamsOnRecipientFirstChunk(opCtx, _getCurrentMetadataAndCheckEpoch(opCtx));
+ _notifyChangeStreamsOnRecipientFirstChunk(_getCurrentMetadataAndCheckEpoch());
// Mark the shard as running critical operation, which requires recovery on crash.
//
// NOTE: The 'migrateChunkToNewShard' oplog message written by the above call to
// '_notifyChangeStreamsOnRecipientFirstChunk' depends on this majority write to carry its local
// write to majority committed.
- Status status = ShardingStateRecovery::startMetadataOp(opCtx);
+ Status status = ShardingStateRecovery::startMetadataOp(_opCtx);
if (!status.isOK()) {
return status;
}
- _critSec.emplace(opCtx, _args.getNss());
+ _critSec.emplace(_opCtx, _args.getNss());
_state = kCriticalSection;
@@ -338,7 +339,7 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
// Note: this write must occur after the critSec flag is set, to ensure the secondary refresh
// will stall behind the flag.
Status signalStatus = updateShardCollectionsEntry(
- opCtx,
+ _opCtx,
BSON(ShardCollectionType::kNssFieldName << getNss().ns()),
BSONObj(),
BSON(ShardCollectionType::kEnterCriticalSectionCounterFieldName << 1),
@@ -356,13 +357,13 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
return Status::OK();
}
-Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* opCtx) {
- invariant(!opCtx->lockState()->isLocked());
+Status MigrationSourceManager::commitChunkOnRecipient() {
+ invariant(!_opCtx->lockState()->isLocked());
invariant(_state == kCriticalSection);
- auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); });
+ auto scopedGuard = makeGuard([&] { cleanupOnError(); });
// Tell the recipient shard to fetch the latest changes.
- auto commitCloneStatus = _cloneDriver->commitClone(opCtx);
+ auto commitCloneStatus = _cloneDriver->commitClone(_opCtx);
if (MONGO_unlikely(failMigrationCommit.shouldFail()) && commitCloneStatus.isOK()) {
commitCloneStatus = {ErrorCodes::InternalError,
@@ -380,10 +381,10 @@ Status MigrationSourceManager::commitChunkOnRecipient(OperationContext* opCtx) {
return Status::OK();
}
-Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opCtx) {
- invariant(!opCtx->lockState()->isLocked());
+Status MigrationSourceManager::commitChunkMetadataOnConfig() {
+ invariant(!_opCtx->lockState()->isLocked());
invariant(_state == kCloneCompleted);
- auto scopedGuard = makeGuard([&] { cleanupOnError(opCtx); });
+ auto scopedGuard = makeGuard([&] { cleanupOnError(); });
// If we have chunks left on the FROM shard, bump the version of one of them as well. This will
// change the local collection major version, which indicates to other processes that the chunk
@@ -391,7 +392,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
BSONObjBuilder builder;
{
- const auto metadata = _getCurrentMetadataAndCheckEpoch(opCtx);
+ const auto metadata = _getCurrentMetadataAndCheckEpoch();
ChunkType migratedChunkType;
migratedChunkType.setMin(_args.getMinKey());
@@ -404,7 +405,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
_args.getToShardId(),
migratedChunkType,
metadata->getCollVersion(),
- LogicalClock::get(opCtx)->getClusterTime().asTimestamp());
+ LogicalClock::get(_opCtx)->getClusterTime().asTimestamp());
builder.append(kWriteConcernField, kMajorityWriteConcern.toBSON());
}
@@ -416,8 +417,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
Timer t;
auto commitChunkMigrationResponse =
- Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
- opCtx,
+ Grid::get(_opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+ _opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
"admin",
builder.obj(),
@@ -439,8 +440,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
"against the config server to obtain its latest optime"
<< causedBy(redact(migrationCommitStatus));
- Status status = ShardingLogging::get(opCtx)->logChangeChecked(
- opCtx,
+ Status status = ShardingLogging::get(_opCtx)->logChangeChecked(
+ _opCtx,
"moveChunk.validating",
getNss().ns(),
BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
@@ -462,10 +463,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
// metadata for this collection, forcing subsequent callers to do a full refresh. Check if
// this node can accept writes for this collection as a proxy for it being primary.
if (!status.isOK()) {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, getNss(), MODE_IX);
- if (!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, getNss())) {
- CollectionShardingRuntime::get(opCtx, getNss())->clearFilteringMetadata();
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX);
+ if (!repl::ReplicationCoordinator::get(_opCtx)->canAcceptWritesFor(_opCtx, getNss())) {
+ CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata();
uassertStatusOK(status.withContext(
str::stream() << "Unable to verify migration commit for chunk: "
<< redact(_args.toString())
@@ -488,12 +489,12 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
// section. It is okay if the refresh fails because that will cause the metadata to be cleared
// and subsequent callers will try to do a full refresh.
try {
- forceShardFilteringMetadataRefresh(opCtx, getNss(), true);
+ forceShardFilteringMetadataRefresh(_opCtx, getNss(), true);
} catch (const DBException& ex) {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, getNss(), MODE_IX);
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX);
- CollectionShardingRuntime::get(opCtx, getNss())->clearFilteringMetadata();
+ CollectionShardingRuntime::get(_opCtx, getNss())->clearFilteringMetadata();
log() << "Failed to refresh metadata after a "
<< (migrationCommitStatus.isOK() ? "failed commit attempt" : "successful commit")
@@ -509,7 +510,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
<< ex.toString() << "' after commit failed");
}
- const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch(opCtx);
+ const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch();
if (refreshedMetadata->keyBelongsToMe(_args.getMinKey())) {
// This condition may only happen if the migration commit has failed for any reason
@@ -517,7 +518,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
severe() << "The migration commit succeeded, but the new chunk placement was not "
"reflected after metadata refresh, which is an indication of an "
"afterOpTime bug.";
- severe() << "The current config server opTime is " << Grid::get(opCtx)->configOpTime();
+ severe() << "The current config server opTime is " << Grid::get(_opCtx)->configOpTime();
severe() << "The commit response came from "
<< redact(commitChunkMigrationResponse.getValue().hostAndPort->toString())
<< " and contained";
@@ -543,10 +544,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
// Exit the critical section and ensure that all the necessary state is fully persisted before
// scheduling orphan cleanup.
- _cleanup(opCtx);
+ _cleanup();
- ShardingLogging::get(opCtx)->logChange(
- opCtx,
+ ShardingLogging::get(_opCtx)->logChange(
+ _opCtx,
"moveChunk.commit",
getNss().ns(),
BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
@@ -559,14 +560,14 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
auto notification = [&] {
auto const whenToClean = _args.getWaitForDelete() ? CollectionShardingRuntime::kNow
: CollectionShardingRuntime::kDelayed;
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
- return CollectionShardingRuntime::get(opCtx, getNss())->cleanUpRange(range, whenToClean);
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS);
+ return CollectionShardingRuntime::get(_opCtx, getNss())->cleanUpRange(range, whenToClean);
}();
if (!MONGO_unlikely(doNotRefreshRecipientAfterCommit.shouldFail())) {
// Best-effort make the recipient refresh its routing table to the new collection version.
- refreshRecipientRoutingTable(opCtx,
+ refreshRecipientRoutingTable(_opCtx,
getNss(),
_args.getToShardId(),
_recipientHost,
@@ -580,7 +581,7 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
if (_args.getWaitForDelete()) {
log() << "Waiting for cleanup of " << getNss().ns() << " range "
<< redact(range.toString());
- auto deleteStatus = notification.waitStatus(opCtx);
+ auto deleteStatus = notification.waitStatus(_opCtx);
if (!deleteStatus.isOK()) {
return {ErrorCodes::OrphanedRangeCleanUpFailed,
orphanedRangeCleanUpErrMsg + redact(deleteStatus)};
@@ -588,9 +589,9 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
return Status::OK();
}
- if (notification.ready() && !notification.waitStatus(opCtx).isOK()) {
+ if (notification.ready() && !notification.waitStatus(_opCtx).isOK()) {
return {ErrorCodes::OrphanedRangeCleanUpFailed,
- orphanedRangeCleanUpErrMsg + redact(notification.waitStatus(opCtx))};
+ orphanedRangeCleanUpErrMsg + redact(notification.waitStatus(_opCtx))};
} else {
log() << "Leaving cleanup of " << getNss().ns() << " range " << redact(range.toString())
<< " to complete in background";
@@ -600,13 +601,13 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
return Status::OK();
}
-void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) {
+void MigrationSourceManager::cleanupOnError() {
if (_state == kDone) {
return;
}
- ShardingLogging::get(opCtx)->logChange(
- opCtx,
+ ShardingLogging::get(_opCtx)->logChange(
+ _opCtx,
"moveChunk.error",
getNss().ns(),
BSON("min" << _args.getMinKey() << "max" << _args.getMaxKey() << "from"
@@ -614,19 +615,22 @@ void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) {
ShardingCatalogClient::kMajorityWriteConcern);
try {
- _cleanup(opCtx);
+ _cleanup();
} catch (const ExceptionForCat<ErrorCategory::NotMasterError>& ex) {
warning() << "Failed to clean up migration: " << redact(_args.toString())
<< "due to: " << redact(ex);
}
}
-ScopedCollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch(
- OperationContext* opCtx) {
+void MigrationSourceManager::abortDueToConflictingIndexOperation() {
+ _opCtx->markKilled();
+}
+
+ScopedCollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() {
auto metadata = [&] {
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
- auto* const css = CollectionShardingRuntime::get(opCtx, getNss());
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS);
+ auto* const css = CollectionShardingRuntime::get(_opCtx, getNss());
const auto optMetadata = css->getCurrentMetadataIfKnown();
uassert(ErrorCodes::ConflictingOperationInProgress,
@@ -647,7 +651,7 @@ ScopedCollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoc
}
void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
- OperationContext* opCtx, const ScopedCollectionMetadata& metadata) {
+ const ScopedCollectionMetadata& metadata) {
// If this is not the first donation, there is nothing to be done
if (metadata->getChunkManager()->getVersion(_args.getToShardId()).isSet())
return;
@@ -662,28 +666,28 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
<< "from" << _args.getFromShardId() << "to"
<< _args.getToShardId());
- auto const serviceContext = opCtx->getClient()->getServiceContext();
+ auto const serviceContext = _opCtx->getClient()->getServiceContext();
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, NamespaceString::kRsOplogNamespace, MODE_IX);
writeConflictRetry(
- opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
- WriteUnitOfWork uow(opCtx);
+ _opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ WriteUnitOfWork uow(_opCtx);
serviceContext->getOpObserver()->onInternalOpMessage(
- opCtx, getNss(), _collectionUuid, BSON("msg" << dbgMessage), o2Message);
+ _opCtx, getNss(), _collectionUuid, BSON("msg" << dbgMessage), o2Message);
uow.commit();
});
}
-void MigrationSourceManager::_cleanup(OperationContext* opCtx) {
+void MigrationSourceManager::_cleanup() {
invariant(_state != kDone);
auto cloneDriver = [&]() {
// Unregister from the collection's sharding state and exit the migration critical section.
- UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetCollection autoColl(opCtx, getNss(), MODE_IX);
- auto* const csr = CollectionShardingRuntime::get(opCtx, getNss());
- auto csrLock = CollectionShardingState::CSRLock::lockExclusive(opCtx, csr);
+ UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
+ AutoGetCollection autoColl(_opCtx, getNss(), MODE_IX);
+ auto* const csr = CollectionShardingRuntime::get(_opCtx, getNss());
+ auto csrLock = CollectionShardingState::CSRLock::lockExclusive(_opCtx, csr);
if (_state != kCreated) {
invariant(msmForCsr(csr));
@@ -705,7 +709,7 @@ void MigrationSourceManager::_cleanup(OperationContext* opCtx) {
// outside of the collection X lock
if (cloneDriver) {
- cloneDriver->cancelClone(opCtx);
+ cloneDriver->cancelClone(_opCtx);
}
if (_state == kCriticalSection || _state == kCloneCompleted) {
@@ -723,11 +727,11 @@ void MigrationSourceManager::_cleanup(OperationContext* opCtx) {
// possible that the persisted metadata is rolled back after step down, but the write which
// cleared the 'inMigration' flag is not, a secondary node will report itself at an older
// shard version.
- CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, getNss());
+ CatalogCacheLoader::get(_opCtx).waitForCollectionFlush(_opCtx, getNss());
// Clear the 'minOpTime recovery' document so that the next time a node from this shard
// becomes a primary, it won't have to recover the config server optime.
- ShardingStateRecovery::endMetadataOp(opCtx);
+ ShardingStateRecovery::endMetadataOp(_opCtx);
}
_state = kDone;
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index cf0a14fd9df..fbb0c73fab5 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -113,7 +113,7 @@ public:
* Expected state: kCreated
* Resulting state: kCloning on success, kDone on failure
*/
- Status startClone(OperationContext* opCtx);
+ Status startClone();
/**
* Waits for the cloning to catch up sufficiently so we won't have to stay in the critical
@@ -123,7 +123,7 @@ public:
* Expected state: kCloning
* Resulting state: kCloneCaughtUp on success, kDone on failure
*/
- Status awaitToCatchUp(OperationContext* opCtx);
+ Status awaitToCatchUp();
/**
* Waits for the active clone operation to catch up and enters critical section. Once this call
@@ -134,7 +134,7 @@ public:
* Expected state: kCloneCaughtUp
* Resulting state: kCriticalSection on success, kDone on failure
*/
- Status enterCriticalSection(OperationContext* opCtx);
+ Status enterCriticalSection();
/**
* Tells the recipient of the chunk to commit the chunk contents, which it received.
@@ -142,7 +142,7 @@ public:
* Expected state: kCriticalSection
* Resulting state: kCloneCompleted on success, kDone on failure
*/
- Status commitChunkOnRecipient(OperationContext* opCtx);
+ Status commitChunkOnRecipient();
/**
* Tells the recipient shard to fetch the latest portion of data from the donor and to commit it
@@ -156,7 +156,7 @@ public:
* Expected state: kCloneCompleted
* Resulting state: kDone
*/
- Status commitChunkMetadataOnConfig(OperationContext* opCtx);
+ Status commitChunkMetadataOnConfig();
/**
* May be called at any time. Unregisters the migration source manager from the collection,
@@ -166,7 +166,13 @@ public:
* Expected state: Any
* Resulting state: kDone
*/
- void cleanupOnError(OperationContext* opCtx);
+ void cleanupOnError();
+
+ /**
+ * Aborts the migration after observing a concurrent index operation by marking its operation
+ * context as killed.
+ */
+ void abortDueToConflictingIndexOperation();
/**
* Returns the cloner which is being used for this migration. This value is available only if
@@ -191,21 +197,24 @@ private:
// comments explaining the various state transitions.
enum State { kCreated, kCloning, kCloneCaughtUp, kCriticalSection, kCloneCompleted, kDone };
- ScopedCollectionMetadata _getCurrentMetadataAndCheckEpoch(OperationContext* opCtx);
+ ScopedCollectionMetadata _getCurrentMetadataAndCheckEpoch();
/**
* If this donation moves the first chunk to the recipient (i.e., the recipient didn't have any
* chunks), this function writes a no-op message to the oplog, so that change stream will notice
* that and close the cursor in order to notify mongos to target the new shard as well.
*/
- void _notifyChangeStreamsOnRecipientFirstChunk(OperationContext* opCtx,
- const ScopedCollectionMetadata& metadata);
+ void _notifyChangeStreamsOnRecipientFirstChunk(const ScopedCollectionMetadata& metadata);
/**
* Called when any of the states fails. May only be called once and will put the migration
* manager into the kDone state.
*/
- void _cleanup(OperationContext* opCtx);
+ void _cleanup();
+
+ // This is the opCtx of the moveChunk request that constructed the MigrationSourceManager.
+ // The caller must guarantee it outlives the MigrationSourceManager.
+ OperationContext* const _opCtx;
// The parameters to the moveChunk command
const MoveChunkRequest _args;
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index 8afc52f0a7f..1fa019e7ebd 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -225,20 +225,20 @@ private:
moveTimingHelper.done(2);
moveChunkHangAtStep2.pauseWhileSet();
- uassertStatusOKWithWarning(migrationSourceManager.startClone(opCtx));
+ uassertStatusOKWithWarning(migrationSourceManager.startClone());
moveTimingHelper.done(3);
moveChunkHangAtStep3.pauseWhileSet();
- uassertStatusOKWithWarning(migrationSourceManager.awaitToCatchUp(opCtx));
+ uassertStatusOKWithWarning(migrationSourceManager.awaitToCatchUp());
moveTimingHelper.done(4);
moveChunkHangAtStep4.pauseWhileSet();
- uassertStatusOKWithWarning(migrationSourceManager.enterCriticalSection(opCtx));
- uassertStatusOKWithWarning(migrationSourceManager.commitChunkOnRecipient(opCtx));
+ uassertStatusOKWithWarning(migrationSourceManager.enterCriticalSection());
+ uassertStatusOKWithWarning(migrationSourceManager.commitChunkOnRecipient());
moveTimingHelper.done(5);
moveChunkHangAtStep5.pauseWhileSet();
- uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig(opCtx));
+ uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig());
moveTimingHelper.done(6);
moveChunkHangAtStep6.pauseWhileSet();
}
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index f8df5dc8e6b..b58479cbcde 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -188,6 +188,19 @@ void incrementChunkOnInsertOrUpdate(OperationContext* opCtx,
}
}
+/**
+ * Aborts any ongoing migration for the given namespace. Should only be called when observing index
+ * operations.
+ */
+void abortOngoingMigration(OperationContext* opCtx, const NamespaceString nss) {
+ auto* const csr = CollectionShardingRuntime::get(opCtx, nss);
+ auto csrLock = CollectionShardingRuntime::CSRLock::lockShared(opCtx, csr);
+ auto msm = MigrationSourceManager::get(csr, csrLock);
+ if (msm) {
+ msm->abortDueToConflictingIndexOperation();
+ }
+}
+
} // namespace
ShardServerOpObserver::ShardServerOpObserver() = default;
@@ -429,4 +442,36 @@ repl::OpTime ShardServerOpObserver::onDropCollection(OperationContext* opCtx,
return {};
}
+void ShardServerOpObserver::onStartIndexBuild(OperationContext* opCtx,
+ const NamespaceString& nss,
+ CollectionUUID collUUID,
+ const UUID& indexBuildUUID,
+ const std::vector<BSONObj>& indexes,
+ bool fromMigrate) {
+ abortOngoingMigration(opCtx, nss);
+};
+
+void ShardServerOpObserver::onStartIndexBuildSinglePhase(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ abortOngoingMigration(opCtx, nss);
+}
+
+void ShardServerOpObserver::onDropIndex(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ const std::string& indexName,
+ const BSONObj& indexInfo) {
+ abortOngoingMigration(opCtx, nss);
+};
+
+void ShardServerOpObserver::onCollMod(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OptionalCollectionUUID uuid,
+ const BSONObj& collModCmd,
+ const CollectionOptions& oldCollOptions,
+ boost::optional<TTLCollModInfo> ttlInfo) {
+ abortOngoingMigration(opCtx, nss);
+};
+
+
} // namespace mongo
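
All four observed index operations (onStartIndexBuild, onStartIndexBuildSinglePhase, onDropIndex, onCollMod) funnel into the abortOngoingMigration helper above, which takes the CSR lock in shared mode, looks up the registered MigrationSourceManager, and only signals an abort if a migration is actually in progress. A simplified standalone sketch of that lookup-and-signal pattern, using std::shared_mutex as a stand-in for the real CSRLock (assumption: the real lock is MongoDB's own type, not a standard mutex):

    #include <iostream>
    #include <shared_mutex>

    struct FakeMigrationSourceManager {
        void abortDueToConflictingIndexOperation() { std::cout << "migration opCtx killed\n"; }
    };

    // Hypothetical stand-in for CollectionShardingRuntime: the active outgoing
    // migration (if any) is registered here and guarded by the CSR lock.
    struct FakeCollectionShardingRuntime {
        std::shared_mutex csrLock;
        FakeMigrationSourceManager* activeMigration = nullptr;
    };

    // Mirrors the helper added in shard_server_op_observer.cpp: take the CSR lock
    // in shared mode to read the registration, and only signal the migration if
    // one is actually in progress.
    void abortOngoingMigration(FakeCollectionShardingRuntime& csr) {
        std::shared_lock<std::shared_mutex> lk(csr.csrLock);
        if (auto* msm = csr.activeMigration) {
            msm->abortDueToConflictingIndexOperation();
        }
    }

    int main() {
        FakeCollectionShardingRuntime csr;
        abortOngoingMigration(csr);   // no active migration: no-op
        FakeMigrationSourceManager msm;
        csr.activeMigration = &msm;
        abortOngoingMigration(csr);   // active migration gets aborted
        return 0;
    }
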
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 08822c4e33a..b834fb751f7 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -57,7 +57,9 @@ public:
CollectionUUID collUUID,
const UUID& indexBuildUUID,
const std::vector<BSONObj>& indexes,
- bool fromMigrate) override {}
+ bool fromMigrate) override;
+
+ void onStartIndexBuildSinglePhase(OperationContext* opCtx, const NamespaceString& nss) override;
void onCommitIndexBuild(OperationContext* opCtx,
const NamespaceString& nss,
@@ -112,7 +114,7 @@ public:
OptionalCollectionUUID uuid,
const BSONObj& collModCmd,
const CollectionOptions& oldCollOptions,
- boost::optional<TTLCollModInfo> ttlInfo) override {}
+ boost::optional<TTLCollModInfo> ttlInfo) override;
void onDropDatabase(OperationContext* opCtx, const std::string& dbName) override {}
@@ -126,7 +128,7 @@ public:
const NamespaceString& nss,
OptionalCollectionUUID uuid,
const std::string& indexName,
- const BSONObj& indexInfo) override {}
+ const BSONObj& indexInfo) override;
void onRenameCollection(OperationContext* opCtx,
const NamespaceString& fromCollection,