author    | Tommaso Tocci <tommaso.tocci@mongodb.com> | 2020-05-25 13:34:23 +0200
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-06-03 14:13:02 +0000
commit    | f7c2b0c472b9c0ed9e12301cc5951ecf9f886722 (patch)
tree      | 2b706f4f15d5f720e399fe4ce09d260c1d8d108e /src/mongo/db/s/migration_util.cpp
parent    | 9b1ed5d3acb9e38a0ba53e1edd15f8c377a07312 (diff)
download  | mongo-f7c2b0c472b9c0ed9e12301cc5951ecf9f886722.tar.gz
SERVER-47985 Implement recovery of a shard's `shardVersion` before it is allowed to perform version checking
Diffstat (limited to 'src/mongo/db/s/migration_util.cpp')
-rw-r--r-- | src/mongo/db/s/migration_util.cpp | 280
1 file changed, 135 insertions, 145 deletions
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index ea0ed0fa467..6795cbddc39 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -818,158 +818,148 @@ void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const Namespa
 }
 
 void resumeMigrationCoordinationsOnStepUp(OperationContext* opCtx) {
-    LOGV2(22037, "Starting migration coordinator stepup recovery thread.");
+    LOGV2_DEBUG(47985010, 2, "Starting migration coordinator stepup recovery");
 
-    // Don't allow migrations to start until the recovery is complete. Otherwise, the
-    // migration may end up inserting a migrationCoordinator doc that the recovery thread
-    // reads and attempts to recovery the decision for by bumping the chunkVersion, which
-    // will cause the migration to abort on trying to commit anyway.
-    // Store it as shared_ptr so that it can be captured in the async recovery task below.
-    const auto migrationBlockingGuard =
-        std::make_shared<MigrationBlockingGuard>(opCtx, "migration coordinator stepup recovery");
+    unsigned long long unfinishedMigrationsCount = 0;
+    PersistentTaskStore<MigrationCoordinatorDocument> store(
+        NamespaceString::kMigrationCoordinatorsNamespace);
+    Query query;
+    store.forEach(opCtx,
+                  query,
+                  [&opCtx, &unfinishedMigrationsCount](const MigrationCoordinatorDocument& doc) {
+                      unfinishedMigrationsCount++;
+                      LOGV2_DEBUG(47985011,
+                                  3,
+                                  "Found unfinished migration on step-up",
+                                  "migrationCoordinatorDoc"_attr = redact(doc.toBSON()),
+                                  "unfinishedMigrationsCount"_attr = unfinishedMigrationsCount);
+
+                      const auto nss = doc.getNss();
+                      {
+                          AutoGetCollection autoColl(opCtx, nss, MODE_X);
+                          CollectionShardingRuntime::get(opCtx, nss)->clearFilteringMetadata();
+                      }
+
+                      const auto serviceContext = opCtx->getServiceContext();
+                      ExecutorFuture<void>(getMigrationUtilExecutor())
+                          .then([serviceContext, nss] {
+                              ThreadClient tc("TriggerMigrationRecovery", serviceContext);
+                              {
+                                  stdx::lock_guard<Client> lk(*tc.get());
+                                  tc->setSystemOperationKillable(lk);
+                              }
+
+                              auto opCtx = tc->makeOperationContext();
+
+                              hangBeforeFilteringMetadataRefresh.pauseWhileSet();
+
+                              onShardVersionMismatch(
+                                  opCtx.get(), nss, boost::none /* shardVersionReceived */);
+                          })
+                          .onError([](const Status& status) {
+                              LOGV2_WARNING(47985012,
+                                            "Error on deferred shardVersion recovery execution",
+                                            "error"_attr = redact(status));
+                          })
+                          .getAsync([](auto) {});
+
+                      return true;
+                  });
+
+    ShardingStatistics::get(opCtx).unfinishedMigrationFromPreviousPrimary.store(
+        unfinishedMigrationsCount);
+    LOGV2_DEBUG(47985013,
+                2,
+                "Finished migration coordinator stepup recovery",
+                "unfinishedMigrationsCount"_attr = unfinishedMigrationsCount);
+}
-    const auto serviceContext = opCtx->getServiceContext();
-    ExecutorFuture<void>(getMigrationUtilExecutor())
-        .then([serviceContext] {
-            ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
-            {
-                stdx::lock_guard<Client> lk(*tc.get());
-                tc->setSystemOperationKillable(lk);
-            }
-            auto uniqueOpCtx = tc->makeOperationContext();
-            auto opCtx = uniqueOpCtx.get();
 
+void recoverMigrationCoordinations(OperationContext* opCtx, NamespaceString nss) {
+    LOGV2_DEBUG(47985001, 2, "Starting migration recovery", "namespace"_attr = nss);
-            // Wait for the latest OpTime to be majority committed to ensure any decision that is
-            // read is on the true branch of history.
-            // Note (Esha): I don't think this is strictly required for correctness, but it is
-            // difficult to reason about, and being pessimistic by waiting for the decision to be
-            // majority committed does not cost much, since stepup should be rare. It *is* required
-            // that this node ensure a decision that it itself recovers is majority committed. For
-            // example, it is possible that this node is a stale primary, and the true primary has
-            // already sent a *commit* decision and re-received a chunk containing the minKey of
-            // this migration. In this case, this node would see that the minKey is still owned and
-            // assume the migration *aborted*. If this node communicated the abort decision to the
-            // recipient, the recipient (if it had not heard the decision yet) would delete data
-            // that the recipient actually owns. (The recipient does not currently wait to hear the
-            // range deletion decision for the first migration before being able to donate (any
-            // part of) the chunk again.)
-            auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient());
-            replClientInfo.setLastOpToSystemLastOpTime(opCtx);
-            const auto lastOpTime = replClientInfo.getLastOp();
-            LOGV2_DEBUG(22038,
+    unsigned migrationRecoveryCount = 0;
+    PersistentTaskStore<MigrationCoordinatorDocument> store(
+        NamespaceString::kMigrationCoordinatorsNamespace);
+    store.forEach(
+        opCtx,
+        QUERY(MigrationCoordinatorDocument::kNssFieldName << nss.toString()),
+        [&opCtx, &migrationRecoveryCount](const MigrationCoordinatorDocument& doc) {
+            LOGV2_DEBUG(47985002,
                         2,
-                        "Waiting for this OpTime to become majority committed",
-                        "lastOpTime"_attr = lastOpTime);
-            return WaitForMajorityService::get(serviceContext).waitUntilMajority(lastOpTime);
-        })
-        .thenRunOn(getMigrationUtilExecutor())
-        .then([serviceContext]() {
-            ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext);
-            {
-                stdx::lock_guard<Client> lk(*tc.get());
-                tc->setSystemOperationKillable(lk);
+                        "Recovering migration",
+                        "migrationCoordinatorDocument"_attr = redact(doc.toBSON()));
+            // ensure there is only one migrationCoordinatorDocument
+            // to be recovered for this namespace
+            invariant(++migrationRecoveryCount == 1,
+                      "Found more then one migration to recover for a single namespace");
+
+
+            // Create a MigrationCoordinator to complete the coordination.
+            MigrationCoordinator coordinator(doc);
+
+            if (doc.getDecision()) {
+                // The decision is already known.
+                coordinator.setMigrationDecision((*doc.getDecision()) == DecisionEnum::kCommitted
+                                                     ? MigrationCoordinator::Decision::kCommitted
+                                                     : MigrationCoordinator::Decision::kAborted);
+                coordinator.completeMigration(opCtx);
+                return true;
             }
-            auto uniqueOpCtx = tc->makeOperationContext();
-            auto opCtx = uniqueOpCtx.get();
-            long long migrationRecoveryCount = 0;
-            PersistentTaskStore<MigrationCoordinatorDocument> store(
-                NamespaceString::kMigrationCoordinatorsNamespace);
-            Query query;
-            store.forEach(
-                opCtx,
-                query,
-                [&opCtx, &migrationRecoveryCount](const MigrationCoordinatorDocument& doc) {
-                    LOGV2_DEBUG(22039,
-                                2,
-                                "Recovering migration using a given migration coordinator document",
-                                "migrationCoordinatorDoc"_attr = redact(doc.toBSON()));
-
-                    migrationRecoveryCount++;
-
-                    // Create a MigrationCoordinator to complete the coordination.
-                    MigrationCoordinator coordinator(doc);
-
-                    if (doc.getDecision()) {
-                        // The decision is already known.
-                        coordinator.setMigrationDecision(
-                            (*doc.getDecision()) == DecisionEnum::kCommitted
-                                ? MigrationCoordinator::Decision::kCommitted
-                                : MigrationCoordinator::Decision::kAborted);
-                        coordinator.completeMigration(opCtx);
-                        return true;
-                    }
-
-                    // The decision is not known. Recover the decision from the config server.
-
-                    ensureChunkVersionIsGreaterThan(
-                        opCtx, doc.getRange(), doc.getPreMigrationChunkVersion());
-
-                    hangBeforeFilteringMetadataRefresh.pauseWhileSet();
-
-                    refreshFilteringMetadataUntilSuccess(opCtx, doc.getNss());
-
-                    auto refreshedMetadata = [&] {
-                        AutoGetCollection autoColl(opCtx, doc.getNss(), MODE_IS);
-                        auto* const css = CollectionShardingRuntime::get(opCtx, doc.getNss());
-                        return css->getCurrentMetadataIfKnown();
-                    }();
-
-                    if (!refreshedMetadata || !refreshedMetadata->isSharded() ||
-                        !refreshedMetadata->uuidMatches(doc.getCollectionUuid())) {
-                        if (!refreshedMetadata || !refreshedMetadata->isSharded()) {
-                            LOGV2(
-                                22040,
-                                "Even after forced refresh, filtering metadata for this namespace "
-                                "is not known. Deleting the range deletion tasks on the donor "
-                                "and recipient as well as the migration coordinator document on "
-                                "this node",
-                                "migrationCoordinatorDocument"_attr = redact(doc.toBSON()));
-                        } else {
-                            LOGV2(
-                                46712004,
-                                "Even after forced refresh, the filtering metadata has a UUID that "
-                                "does not match the collection UUID in the migration coordinator "
-                                "document. Deleting the range deletion tasks on the donor and "
-                                "recipient as well as the migration coordinator document on this "
-                                "node",
-                                "migrationCoordinatorDocument"_attr = redact(doc.toBSON()),
-                                "refreshedMetadataUUID"_attr =
-                                    refreshedMetadata->getChunkManager()->getUUID(),
-                                "coordinatorDocumentUUID"_attr = doc.getCollectionUuid());
-                        }
-
-                        deleteRangeDeletionTaskOnRecipient(
-                            opCtx, doc.getRecipientShardId(), doc.getId());
-                        deleteRangeDeletionTaskLocally(opCtx, doc.getId());
-                        coordinator.forgetMigration(opCtx);
-                        return true;
-                    }
-
-                    // Note this should only extend the range boundaries (if there has been a shard
-                    // key refine since the migration began) and never truncate them.
-                    auto chunkRangeToCompareToMetadata =
-                        extendOrTruncateBoundsForMetadata(*refreshedMetadata, doc.getRange());
-                    if (refreshedMetadata->keyBelongsToMe(chunkRangeToCompareToMetadata.getMin())) {
-                        coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted);
-                    } else {
-                        coordinator.setMigrationDecision(
-                            MigrationCoordinator::Decision::kCommitted);
-                    }
-
-                    coordinator.completeMigration(opCtx);
-                    return true;
-                });
-
-            ShardingStatistics::get(opCtx).unfinishedMigrationFromPreviousPrimary.store(
-                migrationRecoveryCount);
-        })
-        .getAsync([migrationBlockingGuard](const Status& status) {
-            if (!status.isOK()) {
-                LOGV2(22041,
-                      "Failed to resume coordinator migrations on stepup",
-                      "error"_attr = redact(status));
+
+            // The decision is not known. Recover the decision from the config server.
+
+            ensureChunkVersionIsGreaterThan(
+                opCtx, doc.getRange(), doc.getPreMigrationChunkVersion());
+
+            refreshFilteringMetadataUntilSuccess(opCtx, doc.getNss());
+
+            const auto refreshedMetadata = [&] {
+                AutoGetCollection autoColl(opCtx, doc.getNss(), MODE_IS);
+                auto const optMetadata = CollectionShardingRuntime::get(opCtx, doc.getNss())
+                                             ->getCurrentMetadataIfKnown();
+                invariant(
+                    optMetadata,
+                    "Collection's metadata have been found UNKNOWN during migration recovery");
+                return optMetadata.get();
+            }();
+
+            if (!refreshedMetadata.isSharded() ||
+                !refreshedMetadata.uuidMatches(doc.getCollectionUuid())) {
+                if (!refreshedMetadata.isSharded()) {
+                    LOGV2(47985003,
+                          "During migration recovery the collection was discovered to have been "
+                          "dropped."
+                          "Deleting the range deletion tasks on the donor and the recipient "
+                          "as well as the migration coordinator document on this node",
+                          "migrationCoordinatorDocument"_attr = redact(doc.toBSON()));
+                } else {
+                    // UUID don't match
+                    LOGV2(47985004,
+                          "During migration recovery the collection was discovered to have been "
+                          "dropped and recreated. Collection has a UUID that "
+                          "does not match the one in the migration coordinator "
+                          "document. Deleting the range deletion tasks on the donor and "
+                          "recipient as well as the migration coordinator document on this node",
+                          "migrationCoordinatorDocument"_attr = redact(doc.toBSON()),
+                          "refreshedMetadataUUID"_attr =
+                              refreshedMetadata.getChunkManager()->getUUID(),
+                          "coordinatorDocumentUUID"_attr = doc.getCollectionUuid());
+                }
+
+                deleteRangeDeletionTaskOnRecipient(opCtx, doc.getRecipientShardId(), doc.getId());
+                deleteRangeDeletionTaskLocally(opCtx, doc.getId());
+                coordinator.forgetMigration(opCtx);
+                return true;
             }
+
+            if (refreshedMetadata.keyBelongsToMe(doc.getRange().getMin())) {
+                coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted);
+            } else {
+                coordinator.setMigrationDecision(MigrationCoordinator::Decision::kCommitted);
+            }
+
+            coordinator.completeMigration(opCtx);
+            return true;
         });
 }
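The heart of the added `recoverMigrationCoordinations` path is the decision-recovery rule: after `ensureChunkVersionIsGreaterThan` and a forced filtering-metadata refresh, the donor checks whether it still owns the migrated range's min key; if it does, the commit can never have been applied on the config server, so the migration is recovered as aborted, otherwise as committed. The sketch below is a minimal, self-contained model of just that rule; `Decision`, `ChunkRange`, and the `keyBelongsToMe` callback are hypothetical stand-ins for the real `CollectionMetadata`/`MigrationCoordinator` types, not the MongoDB API.

```cpp
// Minimal sketch of the decision-recovery rule, with stand-in types.
// Only the branching mirrors the diff; everything else is illustrative.
#include <functional>
#include <iostream>

enum class Decision { kAborted, kCommitted };

struct ChunkRange {
    int min;  // stand-in for the BSON min bound of the migrated range
};

Decision recoverDecision(const ChunkRange& range,
                         const std::function<bool(int)>& keyBelongsToMe) {
    // If the donor still owns the range's min key after the forced refresh,
    // the commit never reached the config server, so the migration aborted.
    // Otherwise ownership already moved to the recipient, so it committed.
    return keyBelongsToMe(range.min) ? Decision::kAborted : Decision::kCommitted;
}

int main() {
    const ChunkRange range{10};
    const auto donorStillOwnsKey = [](int) { return true; };
    const auto donorNoLongerOwnsKey = [](int) { return false; };

    std::cout << (recoverDecision(range, donorStillOwnsKey) == Decision::kAborted
                      ? "still owned -> aborted\n"
                      : "unexpected\n");
    std::cout << (recoverDecision(range, donorNoLongerOwnsKey) == Decision::kCommitted
                      ? "not owned -> committed\n"
                      : "unexpected\n");
    return 0;
}
```

Either outcome is then fed to the coordinator (`setMigrationDecision` followed by `completeMigration` in the diff), which is what makes the recovery idempotent on repeated step-ups.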