LOG(0) << "Filtering metadata for namespace in deletion task " << deletionTask.toBSON() << (css->getCurrentMetadataIfKnown() ? " has UUID that does not match UUID of the deletion task" : " is not known") << ", forcing a refresh of " << deletionTask.getNss(); // TODO (SERVER-45577): Add an asynchronous version of // forceShardFilteringMetadataRefresh to avoid blocking on the network in the // thread pool. autoColl.reset(); try { forceShardFilteringMetadataRefresh(opCtx, deletionTask.getNss(), true); } catch (const DBException& ex) { if (ex.toStatus() == ErrorCodes::NamespaceNotFound) { deleteRangeDeletionTaskLocally( opCtx, deletionTask.getId(), ShardingCatalogClient::kLocalWriteConcern); return false; } throw; } } autoColl.emplace(opCtx, deletionTask.getNss(), MODE_IS); if (!css->getCurrentMetadataIfKnown() || !css->getCurrentMetadata()->uuidMatches(deletionTask.getCollectionUuid())) { LOG(0) << "Even after forced refresh, filtering metadata for namespace in deletion " "task " << deletionTask.toBSON() << (css->getCurrentMetadataIfKnown() ? "has UUID that does not match UUID of the deletion task" : "is not known") << ", deleting the task."; autoColl.reset(); deleteRangeDeletionTaskLocally( opCtx, deletionTask.getId(), ShardingCatalogClient::kLocalWriteConcern); return false; } LOG(0) << "Submitting range deletion task " << deletionTask.toBSON(); const auto whenToClean = deletionTask.getWhenToClean() == CleanWhenEnum::kNow ? CollectionShardingRuntime::kNow : CollectionShardingRuntime::kDelayed; auto cleanupCompleteFuture = css->cleanUpRange(deletionTask.getRange(), whenToClean); if (cleanupCompleteFuture.isReady() && !cleanupCompleteFuture.getNoThrow(opCtx).isOK()) { LOG(0) << "Failed to submit range deletion task " << deletionTask.toBSON() << causedBy(cleanupCompleteFuture.getNoThrow(opCtx)); return false; } return true; }); } void submitPendingDeletions(OperationContext* opCtx) { PersistentTaskStore store(opCtx, NamespaceString::kRangeDeletionNamespace); auto query = QUERY("pending" << BSON("$exists" << false)); std::vector invalidRanges; store.forEach(opCtx, query, [&opCtx, &invalidRanges](const RangeDeletionTask& deletionTask) { migrationutil::submitRangeDeletionTask(opCtx, deletionTask); return true; }); } void resubmitRangeDeletionsOnStepUp(ServiceContext* serviceContext) { LOG(0) << "Starting pending deletion submission thread."; auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor(); ExecutorFuture(executor).getAsync([serviceContext](const Status& status) { ThreadClient tc("ResubmitRangeDeletions", serviceContext); { stdx::lock_guard lk(*tc.get()); tc->setSystemOperationKillable(lk); } auto opCtx = tc->makeOperationContext(); submitPendingDeletions(opCtx.get()); }); } void dropRangeDeletionsCollection(OperationContext* opCtx) { DBDirectClient client(opCtx); client.dropCollection(NamespaceString::kRangeDeletionNamespace.toString(), WriteConcerns::kMajorityWriteConcern); } template void forEachOrphanRange(OperationContext* opCtx, const NamespaceString& nss, Callable&& handler) { AutoGetCollection autoColl(opCtx, nss, MODE_IX); const auto css = CollectionShardingRuntime::get(opCtx, nss); const auto metadata = css->getCurrentMetadata(); const auto emptyChunkMap = RangeMap{SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap()}; if (!metadata->isSharded()) { LOG(0) << "Upgrade: skipping orphaned range enumeration for " << nss << ", collection is not sharded"; return; } auto startingKey = metadata->getMinKey(); while (true) { auto range = metadata->getNextOrphanRange(emptyChunkMap, startingKey); if (!range) { LOG(2) << "Upgrade: Completed orphaned range enumeration for " << nss.toString() << " starting from " << redact(startingKey) << ", no orphan ranges remain"; return; } handler(*range); startingKey = range->getMax(); } } void submitOrphanRanges(OperationContext* opCtx, const NamespaceString& nss, const UUID& uuid) { try { auto version = forceShardFilteringMetadataRefresh(opCtx, nss, true); if (version == ChunkVersion::UNSHARDED()) return; LOG(2) << "Upgrade: Cleaning up existing orphans for " << nss << " : " << uuid; std::vector deletions; forEachOrphanRange(opCtx, nss, [&deletions, &opCtx, &nss, &uuid](const auto& range) { // Since this is not part of an active migration, the migration UUID and the donor shard // are set to unused values so that they don't conflict. RangeDeletionTask task( UUID::gen(), nss, uuid, ShardId("fromFCVUpgrade"), range, CleanWhenEnum::kDelayed); deletions.emplace_back(task); }); if (deletions.empty()) return; PersistentTaskStore store(opCtx, NamespaceString::kRangeDeletionNamespace); for (const auto& task : deletions) { LOG(2) << "Upgrade: Submitting range for cleanup: " << task.getRange() << " from " << nss; store.add(opCtx, task); } } catch (ExceptionFor& e) { LOG(0) << "Upgrade: Failed to cleanup orphans for " << nss << " because the namespace was not found: " << e.what() << ", the collection must have been dropped"; } } void submitOrphanRangesForCleanup(OperationContext* opCtx) { auto& catalog = CollectionCatalog::get(opCtx); const auto& dbs = catalog.getAllDbNames(); for (const auto& dbName : dbs) { if (dbName == NamespaceString::kLocalDb) continue; for (auto collIt = catalog.begin(dbName); collIt != catalog.end(); ++collIt) { auto uuid = collIt.uuid().get(); auto nss = catalog.lookupNSSByUUID(opCtx, uuid).get(); LOG(2) << "Upgrade: processing collection: " << nss; submitOrphanRanges(opCtx, nss, uuid); } } } void persistMigrationCoordinatorLocally(OperationContext* opCtx, const MigrationCoordinatorDocument& migrationDoc) { PersistentTaskStore store( opCtx, NamespaceString::kMigrationCoordinatorsNamespace); try { store.add(opCtx, migrationDoc); } catch (const ExceptionFor&) { // Convert a DuplicateKey error to an anonymous error. uasserted( 31374, str::stream() << "While attempting to write migration information for migration " << ", found document with the same migration id. Attempted migration: " << migrationDoc.toBSON()); } } void persistRangeDeletionTaskLocally(OperationContext* opCtx, const RangeDeletionTask& deletionTask) { PersistentTaskStore store(opCtx, NamespaceString::kRangeDeletionNamespace); try { store.add(opCtx, deletionTask); } catch (const ExceptionFor&) { // Convert a DuplicateKey error to an anonymous error. uasserted(31375, str::stream() << "While attempting to write range deletion task for migration " << ", found document with the same migration id. Attempted range " "deletion task: " << deletionTask.toBSON()); } } void persistCommitDecision(OperationContext* opCtx, const UUID& migrationId) { PersistentTaskStore store( opCtx, NamespaceString::kMigrationCoordinatorsNamespace); store.update( opCtx, QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId), BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName << "committed"))); } void persistAbortDecision(OperationContext* opCtx, const UUID& migrationId) { PersistentTaskStore store( opCtx, NamespaceString::kMigrationCoordinatorsNamespace); store.update( opCtx, QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId), BSON("$set" << BSON(MigrationCoordinatorDocument::kDecisionFieldName << "aborted"))); } void deleteRangeDeletionTaskOnRecipient(OperationContext* opCtx, const ShardId& recipientId, const UUID& migrationId) { write_ops::Delete deleteOp(NamespaceString::kRangeDeletionNamespace); write_ops::DeleteOpEntry query(BSON(RangeDeletionTask::kIdFieldName << migrationId), false /*multi*/); deleteOp.setDeletes({query}); sendToRecipient(opCtx, recipientId, deleteOp); } void deleteRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& deletionTaskId, const WriteConcernOptions& writeConcern) { PersistentTaskStore store(opCtx, NamespaceString::kRangeDeletionNamespace); store.remove(opCtx, QUERY(RangeDeletionTask::kIdFieldName << deletionTaskId), writeConcern); } void deleteRangeDeletionTasksForCollectionLocally(OperationContext* opCtx, const UUID& collectionUuid) { PersistentTaskStore store(opCtx, NamespaceString::kRangeDeletionNamespace); store.remove(opCtx, QUERY(RangeDeletionTask::kCollectionUuidFieldName << collectionUuid)); } void markAsReadyRangeDeletionTaskOnRecipient(OperationContext* opCtx, const ShardId& recipientId, const UUID& migrationId) { write_ops::Update updateOp(NamespaceString::kRangeDeletionNamespace); auto queryFilter = BSON(RangeDeletionTask::kIdFieldName << migrationId); auto updateModification = write_ops::UpdateModification( BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << ""))); write_ops::UpdateOpEntry updateEntry(queryFilter, updateModification); updateEntry.setMulti(false); updateEntry.setUpsert(false); updateOp.setUpdates({updateEntry}); sendToRecipient(opCtx, recipientId, updateOp); } void markAsReadyRangeDeletionTaskLocally(OperationContext* opCtx, const UUID& migrationId) { PersistentTaskStore store(opCtx, NamespaceString::kRangeDeletionNamespace); auto query = QUERY(RangeDeletionTask::kIdFieldName << migrationId); auto update = BSON("$unset" << BSON(RangeDeletionTask::kPendingFieldName << "")); store.update(opCtx, query, update); } void deleteMigrationCoordinatorDocumentLocally(OperationContext* opCtx, const UUID& migrationId) { PersistentTaskStore store( opCtx, NamespaceString::kMigrationCoordinatorsNamespace); store.remove(opCtx, QUERY(MigrationCoordinatorDocument::kIdFieldName << migrationId), {1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)}); } void ensureChunkVersionIsGreaterThan(OperationContext* opCtx, const ChunkRange& range, const ChunkVersion& preMigrationChunkVersion) { ConfigsvrEnsureChunkVersionIsGreaterThan ensureChunkVersionIsGreaterThanRequest; ensureChunkVersionIsGreaterThanRequest.setDbName(NamespaceString::kAdminDb); ensureChunkVersionIsGreaterThanRequest.setMinKey(range.getMin()); ensureChunkVersionIsGreaterThanRequest.setMaxKey(range.getMax()); ensureChunkVersionIsGreaterThanRequest.setVersion(preMigrationChunkVersion); const auto ensureChunkVersionIsGreaterThanRequestBSON = ensureChunkVersionIsGreaterThanRequest.toBSON({}); for (int attempts = 1;; attempts++) { const auto ensureChunkVersionIsGreaterThanResponse = Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, "admin", ensureChunkVersionIsGreaterThanRequestBSON, Shard::RetryPolicy::kIdempotent); const auto ensureChunkVersionIsGreaterThanStatus = Shard::CommandResponse::getEffectiveStatus(ensureChunkVersionIsGreaterThanResponse); if (ensureChunkVersionIsGreaterThanStatus.isOK()) { break; } // If the server is already doing a clean shutdown, join the shutdown. if (globalInShutdownDeprecated()) { shutdown(waitForShutdown()); } opCtx->checkForInterrupt(); LOG(0) << "_configsvrEnsureChunkVersionIsGreaterThan failed after " << attempts << " attempts " << causedBy(ensureChunkVersionIsGreaterThanStatus) << " . Will try again."; } } void refreshFilteringMetadataUntilSuccess(OperationContext* opCtx, const NamespaceString& nss) { for (int attempts = 1;; attempts++) { try { forceShardFilteringMetadataRefresh(opCtx, nss, true); break; } catch (const DBException& ex) { // If the server is already doing a clean shutdown, join the shutdown. if (globalInShutdownDeprecated()) { shutdown(waitForShutdown()); } opCtx->checkForInterrupt(); LOG(0) << "Failed to refresh metadata for " << nss.ns() << " after " << attempts << " attempts " << causedBy(redact(ex.toStatus())) << ". Will try to refresh again."; } } } void resumeMigrationCoordinationsOnStepUp(ServiceContext* serviceContext) { LOG(0) << "Starting migration coordinator stepup recovery thread."; auto executor = Grid::get(serviceContext)->getExecutorPool()->getFixedExecutor(); ExecutorFuture(executor).getAsync([serviceContext](const Status& status) { try { ThreadClient tc("MigrationCoordinatorStepupRecovery", serviceContext); { stdx::lock_guard lk(*tc.get()); tc->setSystemOperationKillable(lk); } auto uniqueOpCtx = tc->makeOperationContext(); auto opCtx = uniqueOpCtx.get(); // Wait for the latest OpTime to be majority committed to ensure any decision that is // read is on the true branch of history. // Note (Esha): I don't think this is strictly required for correctness, but it is // is difficult to reason about, and being pessimistic by waiting for the decision to be // majority committed does not cost much, since stepup should be rare. It *is* required // that this node ensure a decision that it itself recovers is majority committed. For // example, it is possible that this node is a stale primary, and the true primary has // already sent a *commit* decision and re-received a chunk containing the minKey of // this migration. In this case, this node would see that the minKey is still owned and // assume the migration *aborted*. If this node communicated the abort decision to the // recipient, the recipient (if it had not heard the decision yet) would delete data // that the recipient actually owns. (The recipient does not currently wait to hear the // range deletion decision for the first migration before being able to donate (any // part of) the chunk again.) auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient()); replClientInfo.setLastOpToSystemLastOpTime(opCtx); const auto lastOpTime = replClientInfo.getLastOp(); LOG(0) << "Waiting for OpTime " << lastOpTime << " to become majority committed"; WriteConcernResult unusedWCResult; uassertStatusOK( waitForWriteConcern(opCtx, lastOpTime, WriteConcernOptions{WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, WriteConcernOptions::kNoTimeout}, &unusedWCResult)); PersistentTaskStore store( opCtx, NamespaceString::kMigrationCoordinatorsNamespace); Query query; store.forEach(opCtx, query, [&opCtx](const MigrationCoordinatorDocument& doc) { LOG(0) << "Recovering migration " << doc.toBSON(); // Create a MigrationCoordinator to complete the coordination. MigrationCoordinator coordinator(doc.getId(), doc.getDonorShardId(), doc.getRecipientShardId(), doc.getNss(), doc.getCollectionUuid(), doc.getRange(), doc.getPreMigrationChunkVersion()); if (doc.getDecision()) { // The decision is already known. coordinator.setMigrationDecision( (*doc.getDecision()) == DecisionEnum::kCommitted ? MigrationCoordinator::Decision::kCommitted : MigrationCoordinator::Decision::kAborted); coordinator.completeMigration(opCtx); return true; } // The decision is not known. Recover the decision from the config server. ensureChunkVersionIsGreaterThan( opCtx, doc.getRange(), doc.getPreMigrationChunkVersion()); hangBeforeFilteringMetadataRefresh.pauseWhileSet(); refreshFilteringMetadataUntilSuccess(opCtx, doc.getNss()); auto refreshedMetadata = [&] { AutoGetCollection autoColl(opCtx, doc.getNss(), MODE_IS); auto* const css = CollectionShardingRuntime::get(opCtx, doc.getNss()); return css->getCurrentMetadataIfKnown(); }(); if (!refreshedMetadata || !(*refreshedMetadata)->isSharded() || !(*refreshedMetadata)->uuidMatches(doc.getCollectionUuid())) { LOG(0) << "Even after forced refresh, filtering metadata for namespace in " "migration coordinator doc " << doc.toBSON() << (!refreshedMetadata || !(*refreshedMetadata)->isSharded() ? "is not known" : "has UUID that does not match the collection UUID in the " "coordinator doc") << ". Deleting the range deletion tasks on the donor and recipient as " "well as the migration coordinator document on this node."; // TODO (SERVER-45707): Test that range deletion tasks are eventually // deleted even if the collection is dropped before migration coordination // is resumed. deleteRangeDeletionTaskOnRecipient( opCtx, doc.getRecipientShardId(), doc.getId()); deleteRangeDeletionTaskLocally(opCtx, doc.getId()); coordinator.forgetMigration(opCtx); return true; } if ((*refreshedMetadata)->keyBelongsToMe(doc.getRange().getMin())) { coordinator.setMigrationDecision(MigrationCoordinator::Decision::kAborted); } else { coordinator.setMigrationDecision(MigrationCoordinator::Decision::kCommitted); } coordinator.completeMigration(opCtx); return true; }); } catch (const DBException& ex) { LOG(0) << "Failed to resume coordinating migrations on stepup " << causedBy(ex.toStatus()); } }); } } // namespace migrationutil } // namespace mongo