path: root/src/mongo/db/s/migration_util.cpp
author    Tommaso Tocci <tommaso.tocci@mongodb.com>    2020-05-25 13:34:23 +0200
committer Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-06-03 14:13:02 +0000
commit    f7c2b0c472b9c0ed9e12301cc5951ecf9f886722 (patch)
tree      2b706f4f15d5f720e399fe4ce09d260c1d8d108e /src/mongo/db/s/migration_util.cpp
parent    9b1ed5d3acb9e38a0ba53e1edd15f8c377a07312 (diff)
download  mongo-f7c2b0c472b9c0ed9e12301cc5951ecf9f886722.tar.gz
SERVER-47985 Implement recovery of a shard's `shardVersion` before it is allowed to perform version checking
Diffstat (limited to 'src/mongo/db/s/migration_util.cpp')
-rw-r--r--    src/mongo/db/s/migration_util.cpp    280
1 file changed, 135 insertions(+), 145 deletions(-)
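In brief: before this patch, resumeMigrationCoordinationsOnStepUp recovered every unfinished migration inline on the step-up thread; after it, step-up only counts the unfinished migrations, clears the filtering metadata of each affected namespace, and defers the per-namespace work (now in recoverMigrationCoordinations) to the migration-util executor. A minimal standalone sketch of that shape, using std::async in place of MongoDB's ExecutorFuture, with invented stand-ins (CoordinatorDoc, filteringMetadataCache, recoverNamespace) for the real types:

#include <future>
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-ins for the persisted coordinator documents and the
// per-namespace cached filtering metadata used by the real code.
struct CoordinatorDoc {
    std::string nss;  // namespace of the unfinished migration
};

std::mutex cacheMutex;
std::unordered_map<std::string, std::string> filteringMetadataCache;

// Placeholder for the real recovery entry point (onShardVersionMismatch ->
// recoverMigrationCoordinations in the actual patch).
void recoverNamespace(const std::string& nss) {
    std::cout << "recovering shardVersion for " << nss << "\n";
}

// Shape of the new step-up flow: scan the persisted docs synchronously,
// invalidate the cached metadata so any later access is forced through
// recovery, and defer the actual recovery work to an executor.
void resumeOnStepUp(const std::vector<CoordinatorDoc>& docs) {
    unsigned long long unfinished = 0;
    std::vector<std::future<void>> deferred;
    for (const auto& doc : docs) {
        ++unfinished;
        {
            std::lock_guard<std::mutex> lk(cacheMutex);
            filteringMetadataCache.erase(doc.nss);  // analogous to clearFilteringMetadata()
        }
        deferred.push_back(std::async(std::launch::async, [nss = doc.nss] {
            try {
                recoverNamespace(nss);
            } catch (const std::exception& e) {
                // The real code only logs deferred-recovery errors; step-up never fails.
                std::cerr << "deferred recovery failed: " << e.what() << "\n";
            }
        }));
    }
    std::cout << "unfinished migrations found on step-up: " << unfinished << "\n";
    // Unlike the real code's getAsync (fire and forget), these futures join
    // when `deferred` goes out of scope here.
}

int main() {
    resumeOnStepUp({{"test.orders"}, {"test.users"}});
}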
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index ea0ed0fa467..6795cbddc39 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -818,158 +818,148 @@ void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const Namespa
}
void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) {
- LOGV2(22037, "Starting migration coordinator stepup recovery thread.");
+ LOGV2_DEBUG(47985010, 2, "Starting migration coordinator stepup recovery");
- // Don't allow migrations to start until the recovery is complete. Otherwise, the
- // migration may end up inserting a migrationCoordinator doc that the recovery thread
- // reads and attempts to recover the decision for by bumping the chunkVersion, which
- // will cause the migration to abort on trying to commit anyway.
- // Store it as shared_ptr so that it can be captured in the async recovery task below.
- const auto migrationBlockingGuard =
- std::make_shared<MigrationBlockingGuard>(opCtx, "migration coordinator stepup recovery");
+ unsigned long long unfinishedMigrationsCount = 0;
+ PersistentTaskStore<MigrationCoordinatorDocument> store(
+ NamespaceString::kMigrationCoordinatorsNamespace);
+ Query query;
+ store.forEach(opCtx,
+ query,
+ [&opCtx, &unfinishedMigrationsCount](const MigrationCoordinatorDocument& doc) {
+ unfinishedMigrationsCount++;
+ LOGV2_DEBUG(47985011,
+ 3,
+ "Found unfinished migration on step-up",
+ "migrationCoordinatorDoc"_attr = redact(doc.toBSON()),
+ "unfinishedMigrationsCount"_attr = unfinishedMigrationsCount);
+
+ const auto nss = doc.getNss();
+ {
+ AutoGetCollection autoColl(opCtx, nss, MODE_X);
+ CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata();
+ }
+
+ const auto serviceContext = opCtx->getServiceContext();
+ ExecutorFuture<void>(getMigrationUtilExecutor())
+ .then([serviceContext, nss] {
+ ThreadClient tc("TriggerMigrationRecovery", serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillable(lk);
+ }
+
+ auto opCtx = tc->makeOperationContext();
+
+ hangBeforeFilteringMetadataRefresh.pauseWhileSet();
+
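+                          // The filtering metadata for this namespace was cleared
+                          // above, so this check finds the shardVersion unknown and
+                          // performs the recovery as part of the forced refresh.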
+ onShardVersionMismatch(
+ opCtx.get(), nss, boost::none /* shardVersionReceived */);
+ })
+ .onError([](const Status& status) {
+ LOGV2_WARNING(47985012,
+ "Error on deferred shardVersion recovery execution",
+ "error"_attr = redact(status));
+ })
+ .getAsync([](auto) {});
+
+ return true;
+ });
+
+ ShardingStatistics::get(opCtx).unfinishedMigrationFromPreviousPrimary.store(
+ unfinishedMigrationsCount);
+ LOGV2_DEBUG(47985013,
+ 2,
+ "Finished migration coordinator stepup recovery",
+ "unfinishedMigrationsCount"_attr = unfinishedMigrationsCount);
+}
- const auto serviceContext = opCtx->getServiceContext();
- ExecutorFuture<void>(getMigrationUtilExecutor())
- .then([serviceContext] {
- ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
- {
- stdx::lock_guard<Client> lk(*tc.get());
- tc->setSystemOperationKillable(lk);
- }
- auto uniqueOpCtx = tc->makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
+void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss) {
+ LOGV2_DEBUG(47985001, 2, "Starting migration recovery", "namespace"_attr = nss);
- // Wait for the latest OpTime to be majority committed to ensure any decision that is
- // read is on the true branch of history.
- // Note (Esha): I don't think this is strictly required for correctness, but it is
- // difficult to reason about, and being pessimistic by waiting for the decision to be
- // majority committed does not cost much, since stepup should be rare. It *is* required
- // that this node ensure a decision that it itself recovers is majority committed. For
- // example, it is possible that this node is a stale primary, and the true primary has
- // already sent a *commit* decision and re-received a chunk containing the minKey of
- // this migration. In this case, this node would see that the minKey is still owned and
- // assume the migration *aborted*. If this node communicated the abort decision to the
- // recipient, the recipient (if it had not heard the decision yet) would delete data
- // that the recipient actually owns. (The recipient does not currently wait to hear the
- // range deletion decision for the first migration before being able to donate (any
- // part of) the chunk again.)
- auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient());
- replClientInfo.setLastOpToSystemLastOpTime(opCtx);
- const auto lastOpTime = replClientInfo.getLastOp();
- LOGV2_DEBUG(22038,
+ unsigned migrationRecoveryCount = 0;
+ PersistentTaskStore<MigrationCoordinatorDocument> store(
+ NamespaceString::kMigrationCoordinatorsNamespace);
+ store.forEach(
+ opCtx,
+ QUERY(MigrationCoordinatorDocument::kNssFieldName << nss.toString()),
+ [&opCtx, &migrationRecoveryCount](const MigrationCoordinatorDocument& doc) {
+ LOGV2_DEBUG(47985002,
2,
- "Waiting for this OpTime to become majority committed",
- "lastOpTime"_attr = lastOpTime);
- return WaitForMajorityService::get(serviceContext).waitUntilMajority(lastOpTime);
- })
- .thenRunOn(getMigrationUtilExecutor())
- .then([serviceContext]() {
- ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
- {
- stdx::lock_guard<Client> lk(*tc.get());
- tc->setSystemOperationKillable(lk);
+ "Recovering migration",
+ "migrationCoordinatorDocument"_attr = redact(doc.toBSON()));
+            // Ensure there is only one migrationCoordinatorDocument
+            // to be recovered for this namespace.
+            invariant(++migrationRecoveryCount == 1,
+                      "Found more than one migration to recover for a single namespace");
+
+
+ // Create a MigrationCoordinator to complete the coordination.
+ MigrationCoordinator coordinator(doc);
+
+ if (doc.getDecision()) {
+ // The decision is already known.
+ coordinator.setMigrationDecision((*doc.getDecision()) == DecisionEnum::kCommitted
+ ? MigrationCoordinator::Decision::kCommitted
+ : MigrationCoordinator::Decision::kAborted);
+ coordinator.completeMigration(opCtx);
+ return true;
}
- auto uniqueOpCtx = tc->makeOperationContext();
- auto opCtx = uniqueOpCtx.get();
- long long migrationRecoveryCount = 0;
- PersistentTaskStore<MigrationCoordinatorDocument> store(
- NamespaceString::kMigrationCoordinatorsNamespace);
- Query query;
- store.forEach(
- opCtx,
- query,
- [&opCtx, &migrationRecoveryCount](const MigrationCoordinatorDocument& doc) {
- LOGV2_DEBUG(22039,
- 2,
- "Recovering migration using a given migration coordinator document",
- "migrationCoordinatorDoc"_attr = redact(doc.toBSON()));
-
- migrationRecoveryCount++;
-
- // Create a MigrationCoordinator to complete the coordination.
- MigrationCoordinator coordinator(doc);
-
- if (doc.getDecision()) {
- // The decision is already known.
- coordinator.setMigrationDecision(
- (*doc.getDecision()) == DecisionEnum::kCommitted
- ? MigrationCoordinator::Decision::kCommitted
- : MigrationCoordinator::Decision::kAborted);
- coordinator.completeMigration(opCtx);
- return true;
- }
-
- // The decision is not known. Recover the decision from the config server.
-
- ensureChunkVersionIsGreaterThan(
- opCtx, doc.getRange(), doc.getPreMigrationChunkVersion());
-
- hangBeforeFilteringMetadataRefresh.pauseWhileSet();
-
- refreshFilteringMetadataUntilSuccess(opCtx, doc.getNss());
-
- auto refreshedMetadata = [&] {
- AutoGetCollection autoColl(opCtx, doc.getNss(), MODE_IS);
- auto* const css = CollectionShardingRuntime::get(opCtx, doc.getNss());
- return css->getCurrentMetadataIfKnown();
- }();
-
- if (!refreshedMetadata || !refreshedMetadata->isSharded() ||
- !refreshedMetadata->uuidMatches(doc.getCollectionUuid())) {
- if (!refreshedMetadata || !refreshedMetadata->isSharded()) {
- LOGV2(
- 22040,
- "Even after forced refresh, filtering metadata for this namespace "
- "is not known. Deleting the range deletion tasks on the donor "
- "and recipient as well as the migration coordinator document on "
- "this node",
- "migrationCoordinatorDocument"_attr = redact(doc.toBSON()));
- } else {
- LOGV2(
- 46712004,
- "Even after forced refresh, the filtering metadata has a UUID that "
- "does not match the collection UUID in the migration coordinator "
- "document. Deleting the range deletion tasks on the donor and "
- "recipient as well as the migration coordinator document on this "
- "node",
- "migrationCoordinatorDocument"_attr = redact(doc.toBSON()),
- "refreshedMetadataUUID"_attr =
- refreshedMetadata->getChunkManager()->getUUID(),
- "coordinatorDocumentUUID"_attr = doc.getCollectionUuid());
- }
-
- deleteRangeDeletionTaskOnRecipient(
- opCtx, doc.getRecipientShardId(), doc.getId());
- deleteRangeDeletionTaskLocally(opCtx, doc.getId());
- coordinator.forgetMigration(opCtx);
- return true;
- }
-
- // Note this should only extend the range boundaries (if there has been a shard
- // key refine since the migration began) and never truncate them.
- auto chunkRangeToCompareToMetadata =
- extendOrTruncateBoundsForMetadata(*refreshedMetadata, doc.getRange());
- if (refreshedMetadata->keyBelongsToMe(chunkRangeToCompareToMetadata.getMin())) {
- coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted);
- } else {
- coordinator.setMigrationDecision(
- MigrationCoordinator::Decision::kCommitted);
- }
-
- coordinator.completeMigration(opCtx);
- return true;
- });
-
- ShardingStatistics::get(opCtx).unfinishedMigrationFromPreviousPrimary.store(
- migrationRecoveryCount);
- })
- .getAsync([migrationBlockingGuard](const Status& status) {
- if (!status.isOK()) {
- LOGV2(22041,
- "Failed to resume coordinator migrations on stepup",
- "error"_attr = redact(status));
+ // The decision is not known. Recover the decision from the config server.
+
+ ensureChunkVersionIsGreaterThan(
+ opCtx, doc.getRange(), doc.getPreMigrationChunkVersion());
+
+ refreshFilteringMetadataUntilSuccess(opCtx, doc.getNss());
+
+ const auto refreshedMetadata = [&] {
+ AutoGetCollection autoColl(opCtx, doc.getNss(), MODE_IS);
+ auto const optMetadata = CollectionShardingRuntime::get(opCtx, doc.getNss())
+ ->getCurrentMetadataIfKnown();
+ invariant(
+ optMetadata,
+                    "Collection's metadata was found to be UNKNOWN during migration recovery");
+ return optMetadata.get();
+ }();
+
+ if (!refreshedMetadata.isSharded() ||
+ !refreshedMetadata.uuidMatches(doc.getCollectionUuid())) {
+ if (!refreshedMetadata.isSharded()) {
+ LOGV2(47985003,
+ "During migration recovery the collection was discovered to have been "
+                          "dropped. "
+ "Deleting the range deletion tasks on the donor and the recipient "
+ "as well as the migration coordinator document on this node",
+ "migrationCoordinatorDocument"_attr = redact(doc.toBSON()));
+ } else {
+                // UUIDs don't match
+ LOGV2(47985004,
+ "During migration recovery the collection was discovered to have been "
+ "dropped and recreated. Collection has a UUID that "
+ "does not match the one in the migration coordinator "
+ "document. Deleting the range deletion tasks on the donor and "
+ "recipient as well as the migration coordinator document on this node",
+ "migrationCoordinatorDocument"_attr = redact(doc.toBSON()),
+ "refreshedMetadataUUID"_attr =
+ refreshedMetadata.getChunkManager()->getUUID(),
+ "coordinatorDocumentUUID"_attr = doc.getCollectionUuid());
+ }
+
+ deleteRangeDeletionTaskOnRecipient(opCtx, doc.getRecipientShardId(), doc.getId());
+ deleteRangeDeletionTaskLocally(opCtx, doc.getId());
+ coordinator.forgetMigration(opCtx);
+ return true;
}
+
+ if (refreshedMetadata.keyBelongsToMe(doc.getRange().getMin())) {
+ coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted);
+ } else {
+ coordinator.setMigrationDecision(MigrationCoordinator::Decision::kCommitted);
+ }
+
+ coordinator.completeMigration(opCtx);
+ return true;
});
}
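For the unknown-decision path above, the recovered decision reduces to a single predicate: after refreshFilteringMetadataUntilSuccess pulls fresh routing information from the config server, keyBelongsToMe(range.getMin()) means the donor still owns the chunk, so the commit never reached the config server and the migration must be aborted; otherwise it committed. A toy restatement of just that rule, where the integer range and metadata types are illustrative, not the real ChunkRange/CollectionMetadata API:

#include <cassert>

// Illustrative stand-ins for the refreshed collection metadata and the
// migrated chunk range carried by the coordinator document.
struct ChunkRange {
    int min;
    int max;
};

struct RefreshedMetadata {
    int ownedMin;  // toy model: this shard owns keys in [ownedMin, ownedMax)
    int ownedMax;
    bool keyBelongsToMe(int key) const {
        return key >= ownedMin && key < ownedMax;
    }
};

enum class Decision { kCommitted, kAborted };

// If the donor still owns the range's min key after refreshing from the
// config server, the commit never made it there: abort. Otherwise the
// ownership moved, so the migration committed.
Decision recoverDecision(const RefreshedMetadata& metadata, const ChunkRange& range) {
    return metadata.keyBelongsToMe(range.min) ? Decision::kAborted : Decision::kCommitted;
}

int main() {
    RefreshedMetadata stillOwns{0, 100};   // donor still owns [0, 100)
    RefreshedMetadata moved{100, 200};     // migrated range no longer owned
    assert(recoverDecision(stillOwns, {10, 20}) == Decision::kAborted);
    assert(recoverDecision(moved, {10, 20}) == Decision::kCommitted);
}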