summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Caplinger <christopher.caplinger@mongodb.com>2022-08-26 17:35:09 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-26 18:56:53 +0000
commit2c64cc5a0536aa4c496204af0ee38263426a4085 (patch)
tree17d1bfd6ab89afee3a56554ac0947960e0b2012c
parentc53c0cf2dbea9ea36af6746f59b7f3647370e210 (diff)
downloadmongo-2c64cc5a0536aa4c496204af0ee38263426a4085.tar.gz
SERVER-66150: Share writer pool and client connection for file import
-rw-r--r--src/mongo/db/repl/tenant_file_importer_service.cpp115
-rw-r--r--src/mongo/db/repl/tenant_file_importer_service.h80
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp3
-rw-r--r--src/mongo/db/repl/tenant_migration_shard_merge_util.cpp50
-rw-r--r--src/mongo/db/repl/tenant_migration_shard_merge_util.h8
5 files changed, 180 insertions, 76 deletions
diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp
index f3f347915b7..19699a80c85 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.cpp
+++ b/src/mongo/db/repl/tenant_file_importer_service.cpp
@@ -37,9 +37,12 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/repl/replication_auth.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_shard_merge_util.h"
+#include "mongo/db/repl/tenant_migration_shared_data.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_import.h"
#include "mongo/executor/network_interface_factory.h"
@@ -63,6 +66,17 @@ const auto _TenantFileImporterService =
const ReplicaSetAwareServiceRegistry::Registerer<TenantFileImporterService>
_TenantFileImporterServiceRegisterer("TenantFileImporterService");
+/**
+ * Makes a connection to the provided 'source'.
+ */
+Status connect(const HostAndPort& source, DBClientConnection* client) {
+ Status status = client->connect(source, "TenantFileImporterService", boost::none);
+ if (!status.isOK())
+ return status;
+ return replAuthenticate(client).withContext(str::stream()
+ << "Failed to authenticate to " << source);
+}
+
void importCopiedFiles(OperationContext* opCtx, UUID& migrationId) {
auto tempWTDirectory = fileClonerTempDir(migrationId);
uassert(6113315,
@@ -110,8 +124,7 @@ TenantFileImporterService* TenantFileImporterService::get(ServiceContext* servic
return &_TenantFileImporterService(serviceContext);
}
-void TenantFileImporterService::startMigration(const UUID& migrationId,
- const StringData& donorConnectionString) {
+void TenantFileImporterService::startMigration(const UUID& migrationId) {
stdx::lock_guard lk(_mutex);
if (migrationId == _migrationId && _state >= State::kStarted && _state < State::kInterrupted) {
return;
@@ -119,7 +132,6 @@ void TenantFileImporterService::startMigration(const UUID& migrationId,
_reset(lk);
_migrationId = migrationId;
- _donorConnectionString = donorConnectionString.toString();
_eventQueue = std::make_shared<Queue>();
_state = State::kStarted;
@@ -129,7 +141,14 @@ void TenantFileImporterService::startMigration(const UUID& migrationId,
"TenantFileImporterService starting worker thread",
"migrationId"_attr = migrationId.toString());
auto opCtx = cc().makeOperationContext();
- _handleEvents(opCtx.get());
+ try {
+ _handleEvents(opCtx.get());
+ } catch (const DBException& err) {
+ LOGV2_DEBUG(6615001,
+ 1,
+ "TenantFileImporterService encountered an error",
+ "error"_attr = err.toString());
+ }
});
}
@@ -202,25 +221,49 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) {
using eventType = ImporterEvent::Type;
boost::optional<UUID> migrationId;
-
- std::shared_ptr<Queue> eventQueueRef;
+ std::shared_ptr<Queue> eventQueue;
{
stdx::lock_guard lk(_mutex);
- invariant(_eventQueue);
- eventQueueRef = _eventQueue;
migrationId = _migrationId;
+ invariant(_eventQueue);
+ eventQueue = _eventQueue;
}
- ImporterEvent event{eventType::kNone, UUID::gen()};
+ std::shared_ptr<DBClientConnection> donorConnection;
+ std::shared_ptr<ThreadPool> writerPool;
+ std::shared_ptr<TenantMigrationSharedData> sharedData;
+
+ auto setUpImporterResourcesIfNeeded = [&](const BSONObj& metadataDoc) {
+ // Return early if we have already set up the donor connection.
+ if (_donorConnection) {
+ return;
+ }
+
+ auto conn = std::make_shared<DBClientConnection>(true /* autoReconnect */);
+ auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str());
+ uassertStatusOK(connect(donor, conn.get()));
+
+ stdx::lock_guard lk(_mutex);
+ uassert(ErrorCodes::Interrupted,
+ str::stream() << "TenantFileImporterService was interrupted for migrationId=\""
+ << _migrationId << "\"",
+ _state != State::kInterrupted);
+
+ _donorConnection = std::move(conn);
+ _writerPool =
+ makeReplWriterPool(tenantApplierThreadCount, "TenantFileImporterServiceWriter"_sd);
+ _sharedData = std::make_shared<TenantMigrationSharedData>(
+ getGlobalServiceContext()->getFastClockSource(), _migrationId.get());
+
+ donorConnection = _donorConnection;
+ writerPool = _writerPool;
+ sharedData = _sharedData;
+ };
+
while (true) {
opCtx->checkForInterrupt();
- try {
- event = eventQueueRef->pop(opCtx);
- } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>& err) {
- LOGV2_WARNING(6378900, "Event queue was interrupted", "error"_attr = err);
- break;
- }
+ auto event = eventQueue->pop(opCtx);
// Out-of-order events for a different migration are not permitted.
invariant(event.migrationId == migrationId);
@@ -228,9 +271,19 @@ void TenantFileImporterService::_handleEvents(OperationContext* opCtx) {
switch (event.type) {
case eventType::kNone:
continue;
- case eventType::kLearnedFileName:
- cloneFile(opCtx, event.metadataDoc);
+ case eventType::kLearnedFileName: {
+ // we won't have valid donor metadata until the first
+ // 'TenantFileImporterService::learnedFilename' call, so we need to set up the
+ // connection for the first kLearnedFileName event.
+ setUpImporterResourcesIfNeeded(event.metadataDoc);
+
+ cloneFile(opCtx,
+ donorConnection.get(),
+ writerPool.get(),
+ sharedData.get(),
+ event.metadataDoc);
continue;
+ }
case eventType::kLearnedAllFilenames:
importCopiedFiles(opCtx, event.migrationId);
_voteImportedFiles(opCtx);
@@ -273,8 +326,21 @@ void TenantFileImporterService::_interrupt(WithLock) {
return;
}
- // TODO SERVER-66150: interrupt the tenant file cloner by closing the dbClientConnnection via
- // shutdownAndDisallowReconnect() and shutting down the writer pool.
+ if (_donorConnection) {
+ _donorConnection->shutdownAndDisallowReconnect();
+ }
+
+ if (_writerPool) {
+ _writerPool->shutdown();
+ }
+
+ if (_sharedData) {
+ stdx::lock_guard<TenantMigrationSharedData> sharedDatalk(*_sharedData);
+ // Prevent the TenantFileCloner from getting retried on retryable errors.
+ _sharedData->setStatusIfOK(
+ sharedDatalk, Status{ErrorCodes::CallbackCanceled, "TenantFileCloner canceled"});
+ }
+
if (_eventQueue) {
_eventQueue->closeConsumerEnd();
}
@@ -299,11 +365,16 @@ void TenantFileImporterService::_reset(WithLock) {
if (_thread && _thread->joinable()) {
_thread->join();
- _thread.reset();
}
- if (_eventQueue) {
- _eventQueue.reset();
+ if (_writerPool) {
+ _writerPool->join();
+ }
+
+ // Reset _donorConnection so that we create a new DBClientConnection.
+ // for the next migration.
+ if (_donorConnection) {
+ _donorConnection.reset();
}
// TODO SERVER-66907: how should we be resetting _opCtx?
diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h
index d7188f9a0e6..98e87ef9246 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.h
+++ b/src/mongo/db/repl/tenant_file_importer_service.h
@@ -31,24 +31,48 @@
#include "boost/optional/optional.hpp"
+#include "mongo/client/dbclient_connection.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replica_set_aware_service.h"
+#include "mongo/db/repl/tenant_migration_shared_data.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/producer_consumer_queue.h"
#include "mongo/util/string_map.h"
#include "mongo/util/uuid.h"
namespace mongo::repl {
-// Runs on tenant migration recipient primary and secondaries. Copies and imports donor files.
+// Runs on tenant migration recipient primary and secondaries. Copies and imports donor files and
+// then informs the primary that it has finished by running recipientVoteImportedFiles.
class TenantFileImporterService : public ReplicaSetAwareService<TenantFileImporterService> {
public:
static constexpr StringData kTenantFileImporterServiceName = "TenantFileImporterService"_sd;
static TenantFileImporterService* get(ServiceContext* serviceContext);
TenantFileImporterService() = default;
- void startMigration(const UUID& migrationId, const StringData& donorConnectionString);
+
+ /**
+ * Begins the process of copying and importing files for a given migration.
+ */
+ void startMigration(const UUID& migrationId);
+
+ /**
+ * Called for each file to be copied for a given migration.
+ */
void learnedFilename(const UUID& migrationId, const BSONObj& metadataDoc);
+
+ /**
+ * Called after all files have been copied for a given migration.
+ */
void learnedAllFilenames(const UUID& migrationId);
+
+ /**
+ * Interrupts an in-progress migration with the provided migration id.
+ */
void interrupt(const UUID& migrationId);
+
+ /**
+ * Causes any in-progress migration be interrupted.
+ */
void interruptAll();
private:
@@ -70,19 +94,26 @@ private:
void onBecomeArbiter() final {}
+ /**
+ * A worker function that waits for ImporterEvents and handles cloning and importing files.
+ */
void _handleEvents(OperationContext* opCtx);
+ /**
+ * Called to inform the primary that we have finished copying and importing all files.
+ */
void _voteImportedFiles(OperationContext* opCtx);
+ /**
+ * Called internally by interrupt and interruptAll to interrupt a running file import operation.
+ */
void _interrupt(WithLock);
+ /**
+ * Waits for all async work to be finished and then resets internal state.
+ */
void _reset(WithLock);
- std::unique_ptr<stdx::thread> _thread;
- boost::optional<UUID> _migrationId;
- std::string _donorConnectionString;
- Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex");
-
// Explicit State enum ordering defined here because we rely on comparison
// operators for state checking in various TenantFileImporterService methods.
enum class State {
@@ -110,8 +141,6 @@ private:
return StringData();
}
- State _state;
-
struct ImporterEvent {
enum class Type { kNone, kLearnedFileName, kLearnedAllFilenames };
Type type;
@@ -125,7 +154,38 @@ private:
using Queue =
MultiProducerSingleConsumerQueue<ImporterEvent,
producer_consumer_queue_detail::DefaultCostFunction>;
+ Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex");
+
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+ // (R) Read-only in concurrent operation; no synchronization required.
+ // (S) Self-synchronizing; access according to class's own rules.
+ // (M) Reads and writes guarded by _mutex.
+ // (W) Synchronization required only for writes.
+ // (I) Independently synchronized, see member variable comment.
+
+ // The worker thread that processes ImporterEvents
+ std::unique_ptr<stdx::thread> _thread; // (M)
+
+ // The UUID of the current running migration.
+ boost::optional<UUID> _migrationId; // (M)
+
+ // The state of the current running migration.
+ State _state; // (M)
+
+ // The DBClientConnection to the donor used for cloning files.
+ std::shared_ptr<DBClientConnection>
+ _donorConnection; // (I) pointer set under mutex, copied by callers.
+
+ // The ThreadPool used for cloning files.
+ std::shared_ptr<ThreadPool> _writerPool; // (I) pointer set under mutex, copied by callers.
+
+ // The TenantMigrationSharedData used for cloning files.
+ std::shared_ptr<TenantMigrationSharedData>
+ _sharedData; // (I) pointer set under mutex, copied by callers.
- std::shared_ptr<Queue> _eventQueue;
+ // The Queue used for processing ImporterEvents.
+ std::shared_ptr<Queue> _eventQueue; // (I) pointer set under mutex, copied by callers.
};
} // namespace mongo::repl
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index 6aee09a4052..e2a047876bf 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -104,8 +104,7 @@ void handleShardMergeStateChange(OperationContext* opCtx,
case TenantMigrationRecipientStateEnum::kUninitialized:
break;
case TenantMigrationRecipientStateEnum::kStarted:
- fileImporter->startMigration(recipientStateDoc.getId(),
- recipientStateDoc.getDonorConnectionString());
+ fileImporter->startMigration(recipientStateDoc.getId());
break;
case TenantMigrationRecipientStateEnum::kLearnedFilenames:
fileImporter->learnedAllFilenames(recipientStateDoc.getId());
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
index a600fdc5289..e60b86efdb0 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
@@ -117,17 +117,6 @@ std::string _getPathRelativeTo(const std::string& path, const std::string& baseP
std::replace(result.begin(), result.end(), '\\', '/');
return result;
}
-
-/**
- * Makes a connection to the provided 'source'.
- */
-Status connect(const HostAndPort& source, DBClientConnection* client) {
- Status status = client->connect(source, "TenantFileCloner", boost::none);
- if (!status.isOK())
- return status;
- return replAuthenticate(client).withContext(str::stream()
- << "Failed to authenticate to " << source);
-}
} // namespace
void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
@@ -230,20 +219,11 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
}
}
-void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) {
- std::unique_ptr<DBClientConnection> client;
- std::unique_ptr<TenantMigrationSharedData> sharedData;
- auto writerPool =
- makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd);
-
- ON_BLOCK_EXIT([&] {
- if (client) {
- client->shutdownAndDisallowReconnect();
- }
- writerPool->shutdown();
- writerPool->join();
- });
-
+void cloneFile(OperationContext* opCtx,
+ DBClientConnection* clientConnection,
+ ThreadPool* writerPool,
+ TenantMigrationSharedData* sharedData,
+ const BSONObj& metadataDoc) {
auto fileName = metadataDoc["filename"].str();
auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName])));
auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName])));
@@ -258,29 +238,17 @@ void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) {
"destinationRelativePath"_attr = relativePath);
invariant(!relativePath.empty());
- // Connect the client.
- if (!client) {
- auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str());
- client = std::make_unique<DBClientConnection>(true /* autoReconnect */);
- uassertStatusOK(connect(donor, client.get()));
- }
-
- if (!sharedData) {
- sharedData = std::make_unique<TenantMigrationSharedData>(
- getGlobalServiceContext()->getFastClockSource(), migrationId);
- }
-
auto currentBackupFileCloner =
std::make_unique<TenantFileCloner>(backupId,
migrationId,
fileName,
fileSize,
relativePath,
- sharedData.get(),
- client->getServerHostAndPort(),
- client.get(),
+ sharedData,
+ clientConnection->getServerHostAndPort(),
+ clientConnection,
repl::StorageInterface::get(cc().getServiceContext()),
- writerPool.get());
+ writerPool);
auto cloneStatus = currentBackupFileCloner->run();
if (!cloneStatus.isOK()) {
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.h b/src/mongo/db/repl/tenant_migration_shard_merge_util.h
index f1493b1459b..a44ec08d642 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.h
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.h
@@ -38,9 +38,11 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/tenant_migration_shared_data.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_import.h"
#include "mongo/executor/scoped_task_executor.h"
#include "mongo/util/cancellation.h"
+#include "mongo/util/concurrency/thread_pool.h"
namespace mongo::repl::shard_merge_utils {
@@ -104,7 +106,11 @@ struct MetadataInfo {
/**
* Copy a file from the donor.
*/
-void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc);
+void cloneFile(OperationContext* opCtx,
+ DBClientConnection* clientConnection,
+ ThreadPool* writerPool,
+ TenantMigrationSharedData* sharedData,
+ const BSONObj& metadataDoc);
/**
* Import a donor collection after its files have been cloned to a temp dir.