diff options
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_shard_merge_util.cpp')
-rw-r--r-- | src/mongo/db/repl/tenant_migration_shard_merge_util.cpp | 213 |
1 files changed, 138 insertions, 75 deletions
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp index 5c39e53af19..700e0d34916 100644 --- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp +++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp @@ -59,10 +59,16 @@ constexpr int kBackupCursorKeepAliveIntervalMillis = mongo::kCursorTimeoutMillis namespace mongo::repl::shard_merge_utils { namespace { +MONGO_FAIL_POINT_DEFINE(skipDeleteTempDBPath); using namespace fmt::literals; void moveFile(const std::string& src, const std::string& dst) { LOGV2_DEBUG(6114304, 1, "Moving file", "src"_attr = src, "dst"_attr = dst); + + tassert(6114401, + "Destination file '{}' already exists"_format(dst), + !boost::filesystem::exists(dst)); + // Boost filesystem functions clear "ec" on success. boost::system::error_code ec; boost::filesystem::rename(src, dst, ec); @@ -122,75 +128,6 @@ Status connect(const HostAndPort& source, DBClientConnection* client) { return replAuthenticate(client).withContext(str::stream() << "Failed to authenticate to " << source); } -} // namespace - -void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) { - std::unique_ptr<DBClientConnection> client; - std::unique_ptr<TenantMigrationSharedData> sharedData; - auto writerPool = - makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd); - - ON_BLOCK_EXIT([&] { - client->shutdownAndDisallowReconnect(); - - writerPool->shutdown(); - writerPool->join(); - }); - - auto fileName = metadataDoc["filename"].str(); - auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName]))); - LOGV2_DEBUG(6113320, - 1, - "Cloning file", - "migrationId"_attr = migrationId, - "metadata"_attr = metadataDoc); - auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName]))); - auto remoteDbpath = metadataDoc["remoteDbpath"].str(); - size_t fileSize = metadataDoc["fileSize"].safeNumberLong(); - auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str()); - invariant(!relativePath.empty()); - - // Connect the client. - if (!client) { - auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str()); - client = std::make_unique<DBClientConnection>(true /* autoReconnect */); - uassertStatusOK(connect(donor, client.get())); - } - - if (!sharedData) { - sharedData = std::make_unique<TenantMigrationSharedData>( - getGlobalServiceContext()->getFastClockSource(), migrationId); - } - - auto currentBackupFileCloner = - std::make_unique<TenantFileCloner>(backupId, - migrationId, - fileName, - fileSize, - relativePath, - sharedData.get(), - client->getServerHostAndPort(), - client.get(), - repl::StorageInterface::get(cc().getServiceContext()), - writerPool.get()); - - auto cloneStatus = currentBackupFileCloner->run(); - if (!cloneStatus.isOK()) { - LOGV2_WARNING(6113321, - "Failed to clone file ", - "migrationId"_attr = migrationId, - "fileName"_attr = fileName, - "error"_attr = cloneStatus); - } else { - LOGV2_DEBUG(6113322, - 1, - "Cloned file", - "migrationId"_attr = migrationId, - "fileName"_attr = fileName); - } - - uassertStatusOK(cloneStatus); -} void wiredTigerImportFromBackupCursor(OperationContext* opCtx, const std::vector<CollectionImportMetadata>& metadatas, @@ -199,12 +136,26 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx, /* * Move one collection file and one or more index files from temp dir to dbpath. */ - moveFile(constructSourcePath(importPath, collectionMetadata.importArgs.ident), - constructDestinationPath(collectionMetadata.importArgs.ident)); + auto collFileSourcePath = + constructSourcePath(importPath, collectionMetadata.importArgs.ident); + auto collFileDestPath = constructDestinationPath(collectionMetadata.importArgs.ident); + + moveFile(collFileSourcePath, collFileDestPath); + + ScopeGuard revertCollFileMove([&] { moveFile(collFileDestPath, collFileSourcePath); }); + + auto indexPaths = std::vector<std::tuple<std::string, std::string>>(); + ScopeGuard revertIndexFileMove([&] { + for (const auto& pathTuple : indexPaths) { + moveFile(std::get<1>(pathTuple), std::get<0>(pathTuple)); + } + }); for (auto&& indexImportArgs : collectionMetadata.indexes) { - moveFile(constructSourcePath(importPath, indexImportArgs.ident), - constructDestinationPath(indexImportArgs.ident)); + auto indexFileSourcePath = constructSourcePath(importPath, indexImportArgs.ident); + auto indexFileDestPath = constructDestinationPath(indexImportArgs.ident); + moveFile(indexFileSourcePath, indexFileDestPath); + indexPaths.push_back(std::tuple(indexFileSourcePath, indexFileDestPath)); } /* @@ -222,7 +173,9 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx, AutoGetDb autoDb(opCtx, nss.db(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_X); auto catalog = CollectionCatalog::get(opCtx); - WriteUnitOfWork wunit(opCtx); + // TODO SERVER-63789 Uncomment WriteUnitOfWork declaration below when we + // make file import async. + // WriteUnitOfWork wunit(opCtx); AutoStatsTracker statsTracker(opCtx, nss, Top::LockType::NotLocked, @@ -264,15 +217,89 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx, makeCountsChange(ownedCollection->getRecordStore(), collectionMetadata)); UncommittedCollections::addToTxn(opCtx, std::move(ownedCollection)); - wunit.commit(); + // TODO SERVER-63789 Uncomment wunit.commit() call below when we + // make file copy/import async. + // wunit.commit(); LOGV2(6114300, "Imported donor collection", "ns"_attr = nss, "numRecordsApprox"_attr = collectionMetadata.numRecords, "dataSizeApprox"_attr = collectionMetadata.dataSize); }); + + revertCollFileMove.dismiss(); + revertIndexFileMove.dismiss(); } } +} // namespace + +void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) { + std::unique_ptr<DBClientConnection> client; + std::unique_ptr<TenantMigrationSharedData> sharedData; + auto writerPool = + makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd); + + ON_BLOCK_EXIT([&] { + client->shutdownAndDisallowReconnect(); + + writerPool->shutdown(); + writerPool->join(); + }); + + auto fileName = metadataDoc["filename"].str(); + auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName]))); + LOGV2_DEBUG(6113320, + 1, + "Cloning file", + "migrationId"_attr = migrationId, + "metadata"_attr = metadataDoc); + auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName]))); + auto remoteDbpath = metadataDoc["remoteDbpath"].str(); + size_t fileSize = metadataDoc["fileSize"].safeNumberLong(); + auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str()); + invariant(!relativePath.empty()); + + // Connect the client. + if (!client) { + auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str()); + client = std::make_unique<DBClientConnection>(true /* autoReconnect */); + uassertStatusOK(connect(donor, client.get())); + } + + if (!sharedData) { + sharedData = std::make_unique<TenantMigrationSharedData>( + getGlobalServiceContext()->getFastClockSource(), migrationId); + } + + auto currentBackupFileCloner = + std::make_unique<TenantFileCloner>(backupId, + migrationId, + fileName, + fileSize, + relativePath, + sharedData.get(), + client->getServerHostAndPort(), + client.get(), + repl::StorageInterface::get(cc().getServiceContext()), + writerPool.get()); + + auto cloneStatus = currentBackupFileCloner->run(); + if (!cloneStatus.isOK()) { + LOGV2_WARNING(6113321, + "Failed to clone file ", + "migrationId"_attr = migrationId, + "fileName"_attr = fileName, + "error"_attr = cloneStatus); + } else { + LOGV2_DEBUG(6113322, + 1, + "Cloned file", + "migrationId"_attr = migrationId, + "fileName"_attr = fileName); + } + + uassertStatusOK(cloneStatus); +} SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource, std::shared_ptr<executor::TaskExecutor> executor, @@ -296,4 +323,40 @@ SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource, .onCompletion([](auto&&) {}) .semi(); } + +void importCopiedFiles(OperationContext* opCtx, UUID migrationId) { + auto tempWTDirectory = fileClonerTempDir(migrationId); + uassert(6113315, + str::stream() << "Missing file cloner's temporary dbpath directory: " + << tempWTDirectory.string(), + boost::filesystem::exists(tempWTDirectory)); + + // TODO SERVER-63204: Evaluate correct place to remove the temporary + // WT dbpath. + ON_BLOCK_EXIT([&tempWTDirectory, &migrationId] { + // TODO SERVER-63789: Delete skipDeleteTempDBPath failpoint + if (MONGO_unlikely(skipDeleteTempDBPath.shouldFail())) { + LOGV2(6114402, + "skipDeleteTempDBPath failpoint enabled, skipping temp directory cleanup."); + return; + } + LOGV2_DEBUG(6113324, + 1, + "Done importing files, removing the temporary WT dbpath", + "migrationId"_attr = migrationId, + "tempDbPath"_attr = tempWTDirectory.string()); + boost::system::error_code ec; + boost::filesystem::remove_all(tempWTDirectory, ec); + }); + + auto metadatas = wiredTigerRollbackToStableAndGetMetadata(opCtx, tempWTDirectory.string()); + + // TODO SERVER-63122: Remove the try-catch block once logical cloning is removed for + // shard merge protocol. + try { + wiredTigerImportFromBackupCursor(opCtx, metadatas, tempWTDirectory.string()); + } catch (const ExceptionFor<ErrorCodes::NamespaceExists>& ex) { + LOGV2_WARNING(6113314, "Temporarily ignoring the error", "error"_attr = ex.toStatus()); + } +} } // namespace mongo::repl::shard_merge_utils |