summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2020-08-07 06:16:48 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-07 19:48:44 +0000
commit2931f3c764f8e093ae31adb4b2b62c1ce01d7421 (patch)
tree77c8f40a0db32708f106b6498e9479dc650037af /src/mongo/db
parentf72268197eb8845b11ea24dcf458db4a0c463034 (diff)
downloadmongo-2931f3c764f8e093ae31adb4b2b62c1ce01d7421.tar.gz
SERVER-50174 Make MigrationCoordinator recovery acquire the MigrationBlockingGuard
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/s/migration_coordinator.cpp40
-rw-r--r--src/mongo/db/s/migration_source_manager.cpp20
-rw-r--r--src/mongo/db/s/migration_source_manager.h13
-rw-r--r--src/mongo/db/s/migration_util.cpp31
4 files changed, 61 insertions, 43 deletions
diff --git a/src/mongo/db/s/migration_coordinator.cpp b/src/mongo/db/s/migration_coordinator.cpp
index a4fe84afb6d..22102845062 100644
--- a/src/mongo/db/s/migration_coordinator.cpp
+++ b/src/mongo/db/s/migration_coordinator.cpp
@@ -182,15 +182,17 @@ SemiFuture<void> MigrationCoordinator::_commitMigrationOnDonorAndRecipient(
23894, 2, "Making commit decision durable", "migrationId"_attr = _migrationInfo.getId());
migrationutil::persistCommitDecision(opCtx, _migrationInfo.getId());
- LOGV2_DEBUG(23895,
- 2,
- "Bumping transaction number with lsid {lsid} and current txnNumber "
- "{currentTxnNumber} on recipient shard {recipientShardId}",
- "Bumping transaction number on recipient shard",
- "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
- "lsid"_attr = _migrationInfo.getLsid().toBSON(),
- "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
- "migrationId"_attr = _migrationInfo.getId());
+ LOGV2_DEBUG(
+ 23895,
+ 2,
+ "Bumping transaction number with lsid {lsid} and current txnNumber {currentTxnNumber} on "
+ "recipient shard {recipientShardId} for commit of collection {nss}",
+ "Bumping transaction number on recipient shard for commit",
+ "namespace"_attr = _migrationInfo.getNss(),
+ "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
+ "lsid"_attr = _migrationInfo.getLsid(),
+ "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
+ "migrationId"_attr = _migrationInfo.getId());
migrationutil::advanceTransactionOnRecipient(opCtx,
_migrationInfo.getRecipientShardId(),
_migrationInfo.getLsid(),
@@ -233,15 +235,17 @@ void MigrationCoordinator::_abortMigrationOnDonorAndRecipient(OperationContext*
23899, 2, "Making abort decision durable", "migrationId"_attr = _migrationInfo.getId());
migrationutil::persistAbortDecision(opCtx, _migrationInfo.getId());
- LOGV2_DEBUG(23900,
- 2,
- "Bumping transaction number with lsid {lsid} and current txnNumber "
- "{currentTxnNumber} on recipient shard {recipientShardId}",
- "Bumping transaction number on recipient shard",
- "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
- "lsid"_attr = _migrationInfo.getLsid().toBSON(),
- "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
- "migrationId"_attr = _migrationInfo.getId());
+ LOGV2_DEBUG(
+ 23900,
+ 2,
+ "Bumping transaction number with lsid {lsid} and current txnNumber {currentTxnNumber} on "
+ "recipient shard {recipientShardId} for abort of collection {nss}",
+ "Bumping transaction number on recipient shard for abort",
+ "namespace"_attr = _migrationInfo.getNss(),
+ "recipientShardId"_attr = _migrationInfo.getRecipientShardId(),
+ "lsid"_attr = _migrationInfo.getLsid(),
+ "currentTxnNumber"_attr = _migrationInfo.getTxnNumber(),
+ "migrationId"_attr = _migrationInfo.getId());
migrationutil::advanceTransactionOnRecipient(opCtx,
_migrationInfo.getRecipientShardId(),
_migrationInfo.getLsid(),
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 5e59b07115f..02539348de4 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -145,15 +145,14 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
onShardVersionMismatch(_opCtx, getNss(), boost::none);
// Snapshot the committed metadata from the time the migration starts
- const auto collectionMetadataAndUUID = [&] {
+ const auto [collectionMetadata, collectionUUID] = [&] {
UninterruptibleLockGuard noInterrupt(_opCtx->lockState());
AutoGetCollection autoColl(_opCtx, getNss(), MODE_IS);
uassert(ErrorCodes::InvalidOptions,
"cannot move chunks for a collection that doesn't exist",
autoColl.getCollection());
- boost::optional<UUID> collectionUUID;
- collectionUUID = autoColl.getCollection()->uuid();
+ UUID collectionUUID = autoColl.getCollection()->uuid();
auto optMetadata =
CollectionShardingRuntime::get(_opCtx, getNss())->getCurrentMetadataIfKnown();
@@ -169,8 +168,6 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
return std::make_tuple(std::move(metadata), std::move(collectionUUID));
}();
- const auto& collectionMetadata = std::get<0>(collectionMetadataAndUUID);
-
const auto collectionVersion = collectionMetadata.getCollVersion();
const auto shardVersion = collectionMetadata.getShardVersion();
@@ -195,11 +192,12 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
str::stream() << "Unable to move chunk with arguments '"
<< redact(_args.toString()));
+ _collectionEpoch = collectionVersion.epoch();
+ _collectionUUID = collectionUUID;
+
_chunkVersion = collectionMetadata.getChunkManager()
->findIntersectingChunkWithSimpleCollation(_args.getMinKey())
.getLastmod();
- _collectionEpoch = collectionVersion.epoch();
- _collectionUuid = std::get<1>(collectionMetadataAndUUID);
}
MigrationSourceManager::~MigrationSourceManager() {
@@ -259,7 +257,7 @@ Status MigrationSourceManager::startClone() {
_args.getFromShardId(),
_args.getToShardId(),
getNss(),
- _collectionUuid.get(),
+ *_collectionUUID,
ChunkRange(_args.getMinKey(), _args.getMaxKey()),
_chunkVersion,
_args.getWaitForDelete());
@@ -617,10 +615,10 @@ CollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch() {
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "The collection's epoch has changed since the migration began. "
"Expected collection epoch: "
- << _collectionEpoch.toString() << ", but found: "
+ << _collectionEpoch->toString() << ", but found: "
<< (metadata.isSharded() ? metadata.getCollVersion().epoch().toString()
: "unsharded collection"),
- metadata.isSharded() && metadata.getCollVersion().epoch() == _collectionEpoch);
+ metadata.isSharded() && metadata.getCollVersion().epoch() == *_collectionEpoch);
return metadata;
}
@@ -649,7 +647,7 @@ void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
_opCtx, "migrateChunkToNewShard", NamespaceString::kRsOplogNamespace.ns(), [&] {
WriteUnitOfWork uow(_opCtx);
serviceContext->getOpObserver()->onInternalOpMessage(
- _opCtx, getNss(), _collectionUuid, BSON("msg" << dbgMessage), o2Message);
+ _opCtx, getNss(), *_collectionUUID, BSON("msg" << dbgMessage), o2Message);
uow.commit();
});
}
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index efefcda401e..3bbae9988af 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -242,16 +242,15 @@ private:
// The current state. Used only for diagnostics and validation.
State _state{kCreated};
+ // The epoch of the collection being migrated and its UUID, as of the time the migration
+ // started. Values are boost::optional up until the constructor runs, because UUID doesn't have
+ // a default constructor.
+ boost::optional<OID> _collectionEpoch;
+ boost::optional<UUID> _collectionUUID;
+
// The version of the chunk at the time the migration started.
ChunkVersion _chunkVersion;
- // The version of the collection at the time migration started.
- OID _collectionEpoch;
-
- // The UUID of the the collection whose chunks are being moved. Default to empty if the
- // collection doesn't have UUID.
- boost::optional<UUID> _collectionUuid;
-
// Contains logic for ensuring the donor's and recipient's config.rangeDeletions entries are
// correctly updated based on whether the migration committed or aborted.
std::unique_ptr<migrationutil::MigrationCoordinator> _coordinator;
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 1c2cc75c844..6d240a3a23b 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -776,15 +776,25 @@ void ensureChunkVersionIsGreaterThan(OperationContext* opCtx,
}
void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) {
- LOGV2_DEBUG(4798510, 2, "Starting migration coordinator stepup recovery");
+ LOGV2_DEBUG(4798510, 2, "Starting migration coordinator step-up recovery");
unsigned long long unfinishedMigrationsCount = 0;
+
PersistentTaskStore<MigrationCoordinatorDocument> store(
NamespaceString::kMigrationCoordinatorsNamespace);
- Query query;
store.forEach(opCtx,
- query,
+ Query{},
[&opCtx, &unfinishedMigrationsCount](const MigrationCoordinatorDocument& doc) {
+ // MigrationCoordinators are only created under the MigrationBlockingGuard,
+ // which means that only one can possibly exist on an instance at a time.
+ // Furthermore, recovery of an incomplete MigrationCoordator also acquires the
+ // MigrationBlockingGuard. Because of this it is not possible to have more
+ // than one unfinished migration.
+ invariant(unfinishedMigrationsCount == 0,
+ str::stream()
+ << "Upon step-up a second migration coordinator was found"
+ << redact(doc.toBSON()));
+
unfinishedMigrationsCount++;
LOGV2_DEBUG(4798511,
3,
@@ -792,15 +802,21 @@ void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) {
"migrationCoordinatorDoc"_attr = redact(doc.toBSON()),
"unfinishedMigrationsCount"_attr = unfinishedMigrationsCount);
- const auto nss = doc.getNss();
+ const auto& nss = doc.getNss();
+
{
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata(opCtx);
}
- const auto serviceContext = opCtx->getServiceContext();
+ auto mbg = std::make_shared<MigrationBlockingGuard>(
+ opCtx,
+ str::stream() << "Recovery of migration session "
+ << doc.getMigrationSessionId().toString()
+ << " on collection " << nss);
+
ExecutorFuture<void>(getMigrationUtilExecutor())
- .then([serviceContext, nss] {
+ .then([serviceContext = opCtx->getServiceContext(), nss, mbg] {
ThreadClient tc("TriggerMigrationRecovery", serviceContext);
{
stdx::lock_guard<Client> lk(*tc.get());
@@ -826,9 +842,10 @@ void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) {
ShardingStatistics::get(opCtx).unfinishedMigrationFromPreviousPrimary.store(
unfinishedMigrationsCount);
+
LOGV2_DEBUG(4798513,
2,
- "Finished migration coordinator stepup recovery",
+ "Finished migration coordinator step-up recovery",
"unfinishedMigrationsCount"_attr = unfinishedMigrationsCount);
}