author      Christopher Caplinger <christopher.caplinger@mongodb.com>   2022-08-26 17:35:09 +0000
committer   Evergreen Agent <no-reply@evergreen.mongodb.com>            2022-08-26 18:56:53 +0000
commit      2c64cc5a0536aa4c496204af0ee38263426a4085
tree        17d1bfd6ab89afee3a56554ac0947960e0b2012c
parent      c53c0cf2dbea9ea36af6746f59b7f3647370e210
download    mongo-2c64cc5a0536aa4c496204af0ee38263426a4085.tar.gz
SERVER-66150: Share writer pool and client connection for file import
5 files changed, 180 insertions, 76 deletions
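
In broad strokes, the change replaces per-file resource creation with per-migration resources: before this commit, each cloneFile() call opened its own donor DBClientConnection, built its own TenantMigrationSharedData, and spun up a fresh writer ThreadPool; after it, TenantFileImporterService lazily creates those objects once, on the first kLearnedFileName event, and hands raw pointers to cloneFile() for every subsequent file. The following is a minimal, self-contained sketch of that pattern only; Connection, WriterPool, FileImporter, and cloneOneFile are illustrative stand-ins, not the MongoDB classes that appear in the diff below.

#include <iostream>
#include <memory>
#include <string>

// Stand-in for the donor DBClientConnection (hypothetical type).
struct Connection {
    explicit Connection(std::string h) : host(std::move(h)) {}
    std::string host;
};

// Stand-in for the writer ThreadPool (hypothetical type).
struct WriterPool {
    void join() {}  // placeholder: wait for outstanding work to drain
};

// Per-file work borrows the caller's resources via raw pointers, mirroring the
// new cloneFile(opCtx, connection, writerPool, sharedData, metadataDoc) shape.
void cloneOneFile(Connection* conn, WriterPool* /*pool*/, const std::string& fileName) {
    std::cout << "cloning " << fileName << " from " << conn->host << "\n";
}

class FileImporter {
public:
    // Handles one "learned filename" event; resources are created on the first call only.
    void learnedFilename(const std::string& donorHost, const std::string& fileName) {
        setUpResourcesIfNeeded(donorHost);
        cloneOneFile(_conn.get(), _pool.get(), fileName);
    }

    // Tears down per-migration resources once all files are handled.
    void reset() {
        if (_pool) {
            _pool->join();
        }
        _conn.reset();
        _pool.reset();
    }

private:
    void setUpResourcesIfNeeded(const std::string& donorHost) {
        if (_conn) {
            return;  // already set up for this migration
        }
        _conn = std::make_shared<Connection>(donorHost);
        _pool = std::make_shared<WriterPool>();
    }

    std::shared_ptr<Connection> _conn;
    std::shared_ptr<WriterPool> _pool;
};

int main() {
    FileImporter importer;
    importer.learnedFilename("donor:27017", "collection-1.wt");  // connects and builds the pool
    importer.learnedFilename("donor:27017", "index-2.wt");       // reuses both
    importer.reset();
    return 0;
}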
diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp
index f3f347915b7..19699a80c85 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.cpp
+++ b/src/mongo/db/repl/tenant_file_importer_service.cpp
@@ -37,9 +37,12 @@
 #include "mongo/db/catalog_raii.h"
 #include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h"
 #include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/repl/replication_auth.h"
 #include "mongo/db/repl/replication_coordinator.h"
 #include "mongo/db/repl/tenant_migration_access_blocker_util.h"
 #include "mongo/db/repl/tenant_migration_shard_merge_util.h"
+#include "mongo/db/repl/tenant_migration_shared_data.h"
 #include "mongo/db/service_context.h"
 #include "mongo/db/storage/wiredtiger/wiredtiger_import.h"
 #include "mongo/executor/network_interface_factory.h"
@@ -63,6 +66,17 @@ const auto _TenantFileImporterService =
 const ReplicaSetAwareServiceRegistry::Registerer<TenantFileImporterService>
     _TenantFileImporterServiceRegisterer("TenantFileImporterService");
 
+/**
+ * Makes a connection to the provided 'source'.
+ */
+Status connect(const HostAndPort& source, DBClientConnection* client) {
+    Status status = client->connect(source, "TenantFileImporterService", boost::none);
+    if (!status.isOK())
+        return status;
+    return replAuthenticate(client).withContext(str::stream()
+                                                << "Failed to authenticate to " << source);
+}
+
 void importCopiedFiles(OperationContext* opCtx, UUID& migrationId) {
     auto tempWTDirectory = fileClonerTempDir(migrationId);
     uassert(6113315,
@@ -110,8 +124,7 @@ TenantFileImporterService* TenantFileImporterService::get(ServiceContext* servic
     return &_TenantFileImporterService(serviceContext);
 }
 
-void TenantFileImporterService::startMigration(const UUID& migrationId,
-                                               const StringData& donorConnectionString) {
+void TenantFileImporterService::startMigration(const UUID& migrationId) {
     stdx::lock_guard lk(_mutex);
     if (migrationId == _migrationId && _state >= State::kStarted && _state < State::kInterrupted) {
         return;
@@ -119,7 +132,6 @@ void TenantFileImporterService::startMigration(const UUID& migrationId,
 
     _reset(lk);
     _migrationId = migrationId;
-    _donorConnectionString = donorConnectionString.toString();
     _eventQueue = std::make_shared<Queue>();
     _state = State::kStarted;
 
@@ -129,7 +141,14 @@ void TenantFileImporterService::startMigration(const UUID& migrationId,
                   "TenantFileImporterService starting worker thread",
                   "migrationId"_attr = migrationId.toString());
         auto opCtx = cc().makeOperationContext();
-        _handleEvents(opCtx.get());
+        try {
+            _handleEvents(opCtx.get());
+        } catch (const DBException& err) {
+            LOGV2_DEBUG(6615001,
+                        1,
+                        "TenantFileImporterService encountered an error",
+                        "error"_attr = err.toString());
+        }
     });
 }
 
@@ -202,25 +221,49 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) {
     using eventType = ImporterEvent::Type;
 
     boost::optional<UUID> migrationId;
-
-    std::shared_ptr<Queue> eventQueueRef;
+    std::shared_ptr<Queue> eventQueue;
     {
         stdx::lock_guard lk(_mutex);
-        invariant(_eventQueue);
-        eventQueueRef = _eventQueue;
         migrationId = _migrationId;
+        invariant(_eventQueue);
+        eventQueue = _eventQueue;
     }
 
-    ImporterEvent event{eventType::kNone, UUID::gen()};
+    std::shared_ptr<DBClientConnection> donorConnection;
+    std::shared_ptr<ThreadPool> writerPool;
+    std::shared_ptr<TenantMigrationSharedData> sharedData;
+
+    auto setUpImporterResourcesIfNeeded = [&](const BSONObj& metadataDoc) {
+        // Return early if we have already set up the donor connection.
+        if (_donorConnection) {
+            return;
+        }
+
+        auto conn = std::make_shared<DBClientConnection>(true /* autoReconnect */);
+        auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str());
+        uassertStatusOK(connect(donor, conn.get()));
+
+        stdx::lock_guard lk(_mutex);
+        uassert(ErrorCodes::Interrupted,
+                str::stream() << "TenantFileImporterService was interrupted for migrationId=\""
+                              << _migrationId << "\"",
+                _state != State::kInterrupted);
+
+        _donorConnection = std::move(conn);
+        _writerPool =
+            makeReplWriterPool(tenantApplierThreadCount, "TenantFileImporterServiceWriter"_sd);
+        _sharedData = std::make_shared<TenantMigrationSharedData>(
+            getGlobalServiceContext()->getFastClockSource(), _migrationId.get());
+
+        donorConnection = _donorConnection;
+        writerPool = _writerPool;
+        sharedData = _sharedData;
+    };
+
     while (true) {
         opCtx->checkForInterrupt();
 
-        try {
-            event = eventQueueRef->pop(opCtx);
-        } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>& err) {
-            LOGV2_WARNING(6378900, "Event queue was interrupted", "error"_attr = err);
-            break;
-        }
+        auto event = eventQueue->pop(opCtx);
 
         // Out-of-order events for a different migration are not permitted.
         invariant(event.migrationId == migrationId);
@@ -228,9 +271,19 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) {
         switch (event.type) {
            case eventType::kNone:
                continue;
-            case eventType::kLearnedFileName:
-                cloneFile(opCtx, event.metadataDoc);
+            case eventType::kLearnedFileName: {
+                // we won't have valid donor metadata until the first
+                // 'TenantFileImporterService::learnedFilename' call, so we need to set up the
+                // connection for the first kLearnedFileName event.
+                setUpImporterResourcesIfNeeded(event.metadataDoc);
+
+                cloneFile(opCtx,
+                          donorConnection.get(),
+                          writerPool.get(),
+                          sharedData.get(),
+                          event.metadataDoc);
                 continue;
+            }
            case eventType::kLearnedAllFilenames:
                importCopiedFiles(opCtx, event.migrationId);
                _voteImportedFiles(opCtx);
@@ -273,8 +326,21 @@ void TenantFileImporterService::_interrupt(WithLock) {
         return;
     }
 
-    // TODO SERVER-66150: interrupt the tenant file cloner by closing the dbClientConnnection via
-    // shutdownAndDisallowReconnect() and shutting down the writer pool.
+    if (_donorConnection) {
+        _donorConnection->shutdownAndDisallowReconnect();
+    }
+
+    if (_writerPool) {
+        _writerPool->shutdown();
+    }
+
+    if (_sharedData) {
+        stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
+        // Prevent the TenantFileCloner from getting retried on retryable errors.
+        _sharedData->setStatusIfOK(
+            sharedDatalk, Status{ErrorCodes::CallbackCanceled, "TenantFileCloner canceled"});
+    }
+
     if (_eventQueue) {
         _eventQueue->closeConsumerEnd();
     }
@@ -299,11 +365,16 @@ void TenantFileImporterService::_reset(WithLock) {
 
     if (_thread && _thread->joinable()) {
         _thread->join();
-        _thread.reset();
     }
 
-    if (_eventQueue) {
-        _eventQueue.reset();
+    if (_writerPool) {
+        _writerPool->join();
+    }
+
+    // Reset _donorConnection so that we create a new DBClientConnection.
+    // for the next migration.
+    if (_donorConnection) {
+        _donorConnection.reset();
     }
 
     // TODO SERVER-66907: how should we be resetting _opCtx?
diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h
index d7188f9a0e6..98e87ef9246 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.h
+++ b/src/mongo/db/repl/tenant_file_importer_service.h
@@ -31,24 +31,48 @@
 #include "boost/optional/optional.hpp"
 
+#include "mongo/client/dbclient_connection.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/repl/replica_set_aware_service.h"
+#include "mongo/db/repl/tenant_migration_shared_data.h"
 #include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/thread_pool.h"
 #include "mongo/util/producer_consumer_queue.h"
 #include "mongo/util/string_map.h"
 #include "mongo/util/uuid.h"
 
 namespace mongo::repl {
 
-// Runs on tenant migration recipient primary and secondaries. Copies and imports donor files.
+// Runs on tenant migration recipient primary and secondaries. Copies and imports donor files and
+// then informs the primary that it has finished by running recipientVoteImportedFiles.
 class TenantFileImporterService : public ReplicaSetAwareService<TenantFileImporterService> {
 public:
     static constexpr StringData kTenantFileImporterServiceName = "TenantFileImporterService"_sd;
     static TenantFileImporterService* get(ServiceContext* serviceContext);
     TenantFileImporterService() = default;
-    void startMigration(const UUID& migrationId, const StringData& donorConnectionString);
+
+    /**
+     * Begins the process of copying and importing files for a given migration.
+     */
+    void startMigration(const UUID& migrationId);
+
+    /**
+     * Called for each file to be copied for a given migration.
+     */
     void learnedFilename(const UUID& migrationId, const BSONObj& metadataDoc);
+
+    /**
+     * Called after all files have been copied for a given migration.
+     */
     void learnedAllFilenames(const UUID& migrationId);
+
+    /**
+     * Interrupts an in-progress migration with the provided migration id.
+     */
     void interrupt(const UUID& migrationId);
+
+    /**
+     * Causes any in-progress migration be interrupted.
+     */
     void interruptAll();
 
 private:
@@ -70,19 +94,26 @@ private:
 
     void onBecomeArbiter() final {}
 
+    /**
+     * A worker function that waits for ImporterEvents and handles cloning and importing files.
+     */
     void _handleEvents(OperationContext* opCtx);
 
+    /**
+     * Called to inform the primary that we have finished copying and importing all files.
+     */
     void _voteImportedFiles(OperationContext* opCtx);
 
+    /**
+     * Called internally by interrupt and interruptAll to interrupt a running file import operation.
+     */
     void _interrupt(WithLock);
 
+    /**
+     * Waits for all async work to be finished and then resets internal state.
+     */
     void _reset(WithLock);
 
-    std::unique_ptr<stdx::thread> _thread;
-    boost::optional<UUID> _migrationId;
-    std::string _donorConnectionString;
-    Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex");
-
     // Explicit State enum ordering defined here because we rely on comparison
     // operators for state checking in various TenantFileImporterService methods.
     enum class State {
@@ -110,8 +141,6 @@ private:
         return StringData();
     }
 
-    State _state;
-
     struct ImporterEvent {
         enum class Type { kNone, kLearnedFileName, kLearnedAllFilenames };
         Type type;
@@ -125,7 +154,38 @@ private:
     using Queue = MultiProducerSingleConsumerQueue<ImporterEvent,
                                                    producer_consumer_queue_detail::DefaultCostFunction>;
 
+    Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex");
+
+    // All member variables are labeled with one of the following codes indicating the
+    // synchronization rules for accessing them.
+    //
+    // (R)  Read-only in concurrent operation; no synchronization required.
+    // (S)  Self-synchronizing; access according to class's own rules.
+    // (M)  Reads and writes guarded by _mutex.
+    // (W)  Synchronization required only for writes.
+    // (I)  Independently synchronized, see member variable comment.
+
+    // The worker thread that processes ImporterEvents
+    std::unique_ptr<stdx::thread> _thread;  // (M)
+
+    // The UUID of the current running migration.
+    boost::optional<UUID> _migrationId;  // (M)
+
+    // The state of the current running migration.
+    State _state;  // (M)
+
+    // The DBClientConnection to the donor used for cloning files.
+    std::shared_ptr<DBClientConnection>
+        _donorConnection;  // (I) pointer set under mutex, copied by callers.
+
+    // The ThreadPool used for cloning files.
+    std::shared_ptr<ThreadPool> _writerPool;  // (I) pointer set under mutex, copied by callers.
+
+    // The TenantMigrationSharedData used for cloning files.
+    std::shared_ptr<TenantMigrationSharedData>
+        _sharedData;  // (I) pointer set under mutex, copied by callers.
-    std::shared_ptr<Queue> _eventQueue;
+    // The Queue used for processing ImporterEvents.
+    std::shared_ptr<Queue> _eventQueue;  // (I) pointer set under mutex, copied by callers.
 };
 
 }  // namespace mongo::repl
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index 6aee09a4052..e2a047876bf 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -104,8 +104,7 @@ void handleShardMergeStateChange(OperationContext* opCtx,
         case TenantMigrationRecipientStateEnum::kUninitialized:
             break;
         case TenantMigrationRecipientStateEnum::kStarted:
-            fileImporter->startMigration(recipientStateDoc.getId(),
-                                         recipientStateDoc.getDonorConnectionString());
+            fileImporter->startMigration(recipientStateDoc.getId());
             break;
         case TenantMigrationRecipientStateEnum::kLearnedFilenames:
             fileImporter->learnedAllFilenames(recipientStateDoc.getId());
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
index a600fdc5289..e60b86efdb0 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
@@ -117,17 +117,6 @@ std::string _getPathRelativeTo(const std::string& path, const std::string& baseP
     std::replace(result.begin(), result.end(), '\\', '/');
     return result;
 }
-
-/**
- * Makes a connection to the provided 'source'.
- */
-Status connect(const HostAndPort& source, DBClientConnection* client) {
-    Status status = client->connect(source, "TenantFileCloner", boost::none);
-    if (!status.isOK())
-        return status;
-    return replAuthenticate(client).withContext(str::stream()
-                                                << "Failed to authenticate to " << source);
-}
 }  // namespace
 
 void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
@@ -230,20 +219,11 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
     }
 }
 
-void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) {
-    std::unique_ptr<DBClientConnection> client;
-    std::unique_ptr<TenantMigrationSharedData> sharedData;
-    auto writerPool =
-        makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd);
-
-    ON_BLOCK_EXIT([&] {
-        if (client) {
-            client->shutdownAndDisallowReconnect();
-        }
-        writerPool->shutdown();
-        writerPool->join();
-    });
-
+void cloneFile(OperationContext* opCtx,
+               DBClientConnection* clientConnection,
+               ThreadPool* writerPool,
+               TenantMigrationSharedData* sharedData,
+               const BSONObj& metadataDoc) {
     auto fileName = metadataDoc["filename"].str();
     auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName])));
     auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName])));
@@ -258,29 +238,17 @@ void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) {
                 "destinationRelativePath"_attr = relativePath);
     invariant(!relativePath.empty());
 
-    // Connect the client.
-    if (!client) {
-        auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str());
-        client = std::make_unique<DBClientConnection>(true /* autoReconnect */);
-        uassertStatusOK(connect(donor, client.get()));
-    }
-
-    if (!sharedData) {
-        sharedData = std::make_unique<TenantMigrationSharedData>(
-            getGlobalServiceContext()->getFastClockSource(), migrationId);
-    }
-
     auto currentBackupFileCloner =
         std::make_unique<TenantFileCloner>(backupId,
                                            migrationId,
                                            fileName,
                                            fileSize,
                                            relativePath,
-                                           sharedData.get(),
-                                           client->getServerHostAndPort(),
-                                           client.get(),
+                                           sharedData,
+                                           clientConnection->getServerHostAndPort(),
+                                           clientConnection,
                                            repl::StorageInterface::get(cc().getServiceContext()),
-                                           writerPool.get());
+                                           writerPool);
 
     auto cloneStatus = currentBackupFileCloner->run();
     if (!cloneStatus.isOK()) {
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.h b/src/mongo/db/repl/tenant_migration_shard_merge_util.h
index f1493b1459b..a44ec08d642 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.h
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.h
@@ -38,9 +38,11 @@
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/tenant_migration_shared_data.h"
 #include "mongo/db/storage/wiredtiger/wiredtiger_import.h"
 #include "mongo/executor/scoped_task_executor.h"
 #include "mongo/util/cancellation.h"
+#include "mongo/util/concurrency/thread_pool.h"
 
 namespace mongo::repl::shard_merge_utils {
 
@@ -104,7 +106,11 @@ struct MetadataInfo {
 /**
  * Copy a file from the donor.
  */
-void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc);
+void cloneFile(OperationContext* opCtx,
+               DBClientConnection* clientConnection,
+               ThreadPool* writerPool,
+               TenantMigrationSharedData* sharedData,
+               const BSONObj& metadataDoc);
 
 /**
  * Import a donor collection after its files have been cloned to a temp dir.
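
The commit also splits teardown of the now-shared resources across _interrupt() and _reset(): _interrupt() quickly stops new work by shutting down the donor connection and writer pool and cancelling the shared status, while _reset() later joins the pool and drops the connection so the next migration starts from scratch. Below is a rough, self-contained sketch of that two-phase teardown; Connection, WriterPool, and Importer are stand-in types, not the MongoDB classes in the diff above.

#include <iostream>
#include <memory>
#include <mutex>

// Stand-in for the donor DBClientConnection (hypothetical type).
struct Connection {
    void shutdownAndDisallowReconnect() { std::cout << "connection shut down\n"; }
};

// Stand-in for the writer ThreadPool (hypothetical type).
struct WriterPool {
    void shutdown() { std::cout << "pool: stop accepting tasks\n"; }
    void join() { std::cout << "pool: outstanding tasks drained\n"; }
};

class Importer {
public:
    // Phase 1: interrupt. Non-blocking; makes in-flight cloning fail fast.
    void interrupt() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_conn) {
            _conn->shutdownAndDisallowReconnect();
        }
        if (_pool) {
            _pool->shutdown();
        }
        _canceled = true;  // analogous to setting an error status on the shared data
    }

    // Phase 2: reset. May block while the pool drains; drops the connection so a
    // later migration builds fresh resources.
    void reset() {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_pool) {
            _pool->join();
        }
        _conn.reset();
        _pool.reset();
    }

private:
    std::mutex _mutex;
    bool _canceled = false;
    std::shared_ptr<Connection> _conn = std::make_shared<Connection>();
    std::shared_ptr<WriterPool> _pool = std::make_shared<WriterPool>();
};

int main() {
    Importer importer;
    importer.interrupt();  // stop new work quickly
    importer.reset();      // wait for in-flight work, then release resources
    return 0;
}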