summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Christopher Caplinger <christopher.caplinger@mongodb.com> 2022-06-23 15:58:45 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-06-23 16:34:43 +0000
commit: 0642839041dcab4ec72395259544c498cc2e3dc5 (patch)
tree: 1a40d46f90f94159e49ac6ce2a4175d374acf5bf
parent: a06bc8bbced8f0c60b94ed784f5f105f2f01ed5d (diff)
download: mongo-0642839041dcab4ec72395259544c498cc2e3dc5.tar.gz
SERVER-63789 Async file copy/import
-rw-r--r-- jstests/replsets/libs/tenant_migration_test.js | 4
-rw-r--r-- jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js | 11
-rw-r--r-- src/mongo/db/repl/tenant_file_importer_service.cpp | 280
-rw-r--r-- src/mongo/db/repl/tenant_file_importer_service.h | 134
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp | 83
-rw-r--r-- src/mongo/db/repl/tenant_migration_shard_merge_util.cpp | 25
6 files changed, 280 insertions, 257 deletions
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js
index b12512b2b8d..40d73eb5884 100644
--- a/jstests/replsets/libs/tenant_migration_test.js
+++ b/jstests/replsets/libs/tenant_migration_test.js
@@ -343,6 +343,10 @@ function TenantMigrationTest({
donorNodes = donorNodes || donorRst.nodes;
recipientNodes = recipientNodes || recipientRst.nodes;
+ if (typeof migrationId === "string") {
+ migrationId = UUID(migrationId);
+ }
+
donorNodes.forEach(node => {
const configDonorsColl = node.getCollection(TenantMigrationTest.kConfigDonorsNS);
assert.soon(() => 0 === configDonorsColl.count({_id: migrationId}), tojson(node));
diff --git a/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js
index d5055bb0337..c4ca54443af 100644
--- a/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js
+++ b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js
@@ -34,21 +34,15 @@ if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) {
return;
}
-if (TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) {
- // TODO SERVER-63789: Re-enable this test for Shard Merge
- tenantMigrationTest.stop();
- jsTestLog("Temporarily skipping Shard Merge test, dependent on SERVER-63789.");
- return;
-}
-
const kDataDir =
`${recipientPrimary.dbpath}/migrationTmpFiles.${extractUUIDFromObject(migrationId)}`;
assert.eq(runNonMongoProgram("mkdir", "-p", kDataDir), 0);
+const dbName = "myDatabase";
+
(function() {
jsTestLog("Generate test data");
-const dbName = "myDatabase";
const db = donorPrimary.getDB(dbName);
const collection = db["myCollection"];
const capped = db["myCappedCollection"];
@@ -69,7 +63,6 @@ configureFailPoint(
recipientPrimary, "WTWriteConflictExceptionForImportCollection", {} /* data */, {times: 1});
configureFailPoint(
recipientPrimary, "WTWriteConflictExceptionForImportIndex", {} /* data */, {times: 1});
-configureFailPoint(recipientPrimary, "skipDeleteTempDBPath");
jsTestLog("Run migration");
// The old multitenant migrations won't copy myDatabase since it doesn't start with testTenantId,
diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp
index 9d1076eb179..af565c3c713 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.cpp
+++ b/src/mongo/db/repl/tenant_file_importer_service.cpp
@@ -34,6 +34,7 @@
#include <fmt/format.h>
#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/catalog_raii.h"
#include "mongo/db/commands/tenant_migration_recipient_cmds_gen.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -54,15 +55,8 @@ namespace mongo::repl {
using namespace fmt::literals;
using namespace shard_merge_utils;
using namespace tenant_migration_access_blocker;
-using executor::NetworkInterface;
-using executor::NetworkInterfaceThreadPool;
-using executor::TaskExecutor;
-using executor::ThreadPoolTaskExecutor;
namespace {
-
-MONGO_FAIL_POINT_DEFINE(skipDeleteTempDBPath);
-
const auto _TenantFileImporterService =
ServiceContext::declareDecoration<TenantFileImporterService>();
@@ -70,7 +64,7 @@ const ReplicaSetAwareServiceRegistry::Registerer<TenantFileImporterService>
_TenantFileImporterServiceRegisterer("TenantFileImporterService");
void importCopiedFiles(OperationContext* opCtx,
- const UUID& migrationId,
+ UUID& migrationId,
const StringData& donorConnectionString) {
auto tempWTDirectory = fileClonerTempDir(migrationId);
uassert(6113315,
@@ -80,12 +74,6 @@ void importCopiedFiles(OperationContext* opCtx,
// TODO SERVER-63204: Evaluate correct place to remove the temporary WT dbpath.
ON_BLOCK_EXIT([&tempWTDirectory, &migrationId] {
- // TODO SERVER-63789: Delete skipDeleteTempDBPath failpoint
- if (MONGO_unlikely(skipDeleteTempDBPath.shouldFail())) {
- LOGV2(6114402,
- "skipDeleteTempDBPath failpoint enabled, skipping temp directory cleanup.");
- return;
- }
LOGV2_INFO(6113324,
"Done importing files, removing the temporary WT dbpath",
"migrationId"_attr = migrationId,
@@ -113,6 +101,7 @@ void importCopiedFiles(OperationContext* opCtx,
auto catalog = CollectionCatalog::get(opCtx);
for (auto&& m : metadatas) {
+ AutoGetDb dbLock(opCtx, m.ns.db(), MODE_IX);
Lock::CollectionLock systemViewsLock(
opCtx,
NamespaceString(m.ns.dbName(), NamespaceString::kSystemDotViewsCollectionName),
@@ -126,142 +115,205 @@ TenantFileImporterService* TenantFileImporterService::get(ServiceContext* servic
return &_TenantFileImporterService(serviceContext);
}
-void TenantFileImporterService::onStartup(OperationContext*) {
- auto net = executor::makeNetworkInterface("TenantFileImporterService-TaskExecutor");
- auto pool = std::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
- _executor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
- _executor->startup();
-}
-
void TenantFileImporterService::startMigration(const UUID& migrationId,
const StringData& donorConnectionString) {
stdx::lock_guard lk(_mutex);
+ if (migrationId == _migrationId && _state >= State::kStarted && _state < State::kInterrupted) {
+ return;
+ }
+
_reset(lk);
_migrationId = migrationId;
_donorConnectionString = donorConnectionString.toString();
- _scopedExecutor = std::make_shared<executor::ScopedTaskExecutor>(
- _executor,
- Status{ErrorCodes::CallbackCanceled, "TenantFileImporterService executor cancelled"});
- _state.setState(ImporterState::State::kCopyingFiles);
+ _eventQueue = std::make_shared<Queue>();
+ _state = State::kStarted;
+
+ _thread = std::make_unique<stdx::thread>([this, migrationId] {
+ Client::initThread("TenantFileImporterService");
+ LOGV2_INFO(6378904,
+ "TenantFileImporterService starting worker thread",
+ "migrationId"_attr = migrationId.toString());
+ auto opCtx = cc().makeOperationContext();
+ _handleEvents(opCtx.get());
+ });
}
void TenantFileImporterService::learnedFilename(const UUID& migrationId,
const BSONObj& metadataDoc) {
- auto opCtx = cc().getOperationContext();
- {
- stdx::lock_guard lk(_mutex);
- uassert(8423347,
- "Called learnedFilename with migrationId {}, but {} is active"_format(
- migrationId.toString(), _migrationId ? _migrationId->toString() : "(null)"),
- migrationId == _migrationId);
+ stdx::lock_guard lk(_mutex);
+ if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) {
+ return;
}
- try {
- // TODO (SERVER-62734): Do this work asynchronously on the executor.
- cloneFile(opCtx, metadataDoc);
- } catch (const DBException& ex) {
- LOGV2_ERROR(6229306,
- "Error cloning files",
- "migrationUUID"_attr = migrationId,
- "error"_attr = ex.toStatus());
- // TODO (SERVER-63390): On error, vote shard merge abort to recipient primary.
- }
+ tassert(8423347,
+ "Called learnedFilename with migrationId {}, but {} is active"_format(
+ migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"),
+ migrationId == _migrationId);
+
+ _state = State::kLearnedFilename;
+ ImporterEvent event{ImporterEvent::Type::kLearnedFileName, migrationId};
+ event.metadataDoc = metadataDoc.getOwned();
+ invariant(_eventQueue);
+ auto success = _eventQueue->tryPush(std::move(event));
+
+ uassert(6378903,
+ "TenantFileImporterService failed to push '{}' event without blocking"_format(
+ stateToString(_state)),
+ success);
}
void TenantFileImporterService::learnedAllFilenames(const UUID& migrationId) {
+ stdx::lock_guard lk(_mutex);
+ if (migrationId == _migrationId && _state >= State::kLearnedAllFilenames) {
+ return;
+ }
+
+ tassert(8423345,
+ "Called learnedAllFilenames with migrationId {}, but {} is active"_format(
+ migrationId.toString(), _migrationId ? _migrationId->toString() : "no migration"),
+ migrationId == _migrationId);
+
+ _state = State::kLearnedAllFilenames;
+ invariant(_eventQueue);
+ auto success = _eventQueue->tryPush({ImporterEvent::Type::kLearnedAllFilenames, migrationId});
+ uassert(6378902,
+ "TenantFileImporterService failed to push '{}' event without blocking"_format(
+ stateToString(_state)),
+ success);
+}
+
+void TenantFileImporterService::interrupt(const UUID& migrationId) {
+ stdx::lock_guard lk(_mutex);
+ if (migrationId != _migrationId) {
+ LOGV2_WARNING(
+ 6378901,
+ "Called interrupt with migrationId {migrationId}, but {activeMigrationId} is active",
+ "migrationId"_attr = migrationId.toString(),
+ "activeMigrationId"_attr = _migrationId ? _migrationId->toString() : "no migration");
+ return;
+ }
+ _interrupt(lk);
+}
+
+void TenantFileImporterService::interruptAll() {
+ stdx::lock_guard lk(_mutex);
+ if (!_migrationId) {
+ return;
+ }
+ _interrupt(lk);
+}
+
+void TenantFileImporterService::_handleEvents(OperationContext* opCtx) {
+ using eventType = ImporterEvent::Type;
+
std::string donorConnectionString;
+ boost::optional<UUID> migrationId;
+
+ std::shared_ptr<Queue> eventQueueRef;
{
stdx::lock_guard lk(_mutex);
- if (!_state.is(ImporterState::State::kCopyingFiles)) {
- return;
+ invariant(_eventQueue);
+ eventQueueRef = _eventQueue;
+ donorConnectionString = _donorConnectionString;
+ migrationId = _migrationId;
+ }
+
+ ImporterEvent event{eventType::kNone, UUID::gen()};
+ while (true) {
+ opCtx->checkForInterrupt();
+
+ try {
+ event = eventQueueRef->pop(opCtx);
+ } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>& err) {
+ LOGV2_WARNING(6378900, "Event queue was interrupted", "error"_attr = err);
+ break;
}
- uassert(8423345,
- "Called learnedAllFilenames with migrationId {}, but {} is active"_format(
- migrationId.toString(), _migrationId ? _migrationId->toString() : "(null)"),
- migrationId == _migrationId);
+ // Out-of-order events for a different migration are not permitted.
+ invariant(event.migrationId == migrationId);
+
+ switch (event.type) {
+ case eventType::kNone:
+ continue;
+ case eventType::kLearnedFileName:
+ cloneFile(opCtx, event.metadataDoc);
+ continue;
+ case eventType::kLearnedAllFilenames:
+ importCopiedFiles(opCtx, event.migrationId, donorConnectionString);
+ _voteImportedFiles(opCtx);
+ break;
+ }
+ break;
+ }
+}
- _state.setState(ImporterState::State::kCopiedFiles);
- donorConnectionString = _donorConnectionString;
+void TenantFileImporterService::_voteImportedFiles(OperationContext* opCtx) {
+ boost::optional<UUID> migrationId;
+ {
+ stdx::lock_guard lk(_mutex);
+ migrationId = _migrationId;
}
+ invariant(migrationId);
+
+ auto replCoord = ReplicationCoordinator::get(getGlobalServiceContext());
- auto opCtx = cc().getOperationContext();
- // TODO SERVER-63789: Revisit use of AllowLockAcquisitionOnTimestampedUnitOfWork and
- // remove if possible.
- // No other threads will try to acquire conflicting locks: we are acquiring
- // database/collection locks for new tenants.
- AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState());
+ RecipientVoteImportedFiles cmd(*migrationId, replCoord->getMyHostAndPort(), true /* success */);
- importCopiedFiles(opCtx, migrationId, donorConnectionString);
+ auto voteResponse = replCoord->runCmdOnPrimaryAndAwaitResponse(
+ opCtx,
+ NamespaceString::kAdminDb.toString(),
+ cmd.toBSON({}),
+ [](executor::TaskExecutor::CallbackHandle handle) {},
+ [](executor::TaskExecutor::CallbackHandle handle) {});
- // TODO (SERVER-62734): Keep count of files remaining to import, wait before voting.
- stdx::lock_guard lk(_mutex);
- if (!_state.is(ImporterState::State::kCopiedFiles) || migrationId != _migrationId) {
- LOGV2_INFO(6114103,
- "Not calling recipientVoteImportedFiles: migration ended",
- "currentMigrationId"_attr = _migrationId,
- "previousMigrationId"_attr = migrationId);
- return;
+ auto voteStatus = getStatusFromCommandResult(voteResponse);
+ if (!voteStatus.isOK()) {
+ LOGV2_WARNING(6113403,
+ "Failed to run recipientVoteImportedFiles command on primary",
+ "status"_attr = voteStatus);
+ // TODO SERVER-64192: handle this case, retry, and/or throw error, etc.
}
- _voteImportedFiles(migrationId, lk);
- _state.setState(ImporterState::State::kImportedFiles);
}
-void TenantFileImporterService::reset(const UUID& migrationId) {
- stdx::lock_guard lk(_mutex);
- if (migrationId != _migrationId) {
- LOGV2_DEBUG(6114106,
- 1,
- "Ignoring reset for unknown migrationId",
- "currentMigrationId"_attr = _migrationId,
- "unknownMigrationId"_attr = migrationId);
+void TenantFileImporterService::_interrupt(WithLock) {
+ if (_state == State::kInterrupted) {
return;
}
- _reset(lk);
-}
-void TenantFileImporterService::_voteImportedFiles(const UUID& migrationId, WithLock) {
- auto replCoord = ReplicationCoordinator::get(getGlobalServiceContext());
- // Call the command on the primary (which is self if this node is primary).
- auto primary = replCoord->getCurrentPrimaryHostAndPort();
- if (primary.empty()) {
- LOGV2_WARNING(
- 6113406,
- "No primary for recipientVoteImportedFiles command, cannot continue migration",
- "migrationId"_attr = migrationId);
- return;
+ // TODO SERVER-66150: interrupt the tenant file cloner by closing the dbClientConnection via
+ // shutdownAndDisallowReconnect() and shutting down the writer pool.
+ if (_eventQueue) {
+ _eventQueue->closeConsumerEnd();
}
- RecipientVoteImportedFiles cmd(migrationId, replCoord->getMyHostAndPort(), true /* success */);
- executor::RemoteCommandRequest request(primary, "admin", cmd.toBSON({}), nullptr);
- request.sslMode = transport::kGlobalSSLMode;
- auto scheduleResult =
- (*_scopedExecutor)
- ->scheduleRemoteCommand(
- request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
- if (!args.response.isOK()) {
- LOGV2_WARNING(6113405,
- "recipientVoteImportedFiles command failed",
- "error"_attr = redact(args.response.status));
- return;
- }
- auto status = getStatusFromCommandResult(args.response.data);
- if (!status.isOK()) {
- LOGV2_WARNING(6113404,
- "recipientVoteImportedFiles command failed",
- "error"_attr = redact(status));
- }
- });
- if (!scheduleResult.isOK()) {
- LOGV2_WARNING(6113403,
- "Failed to schedule recipientVoteImportedFiles command on primary",
- "status"_attr = scheduleResult.getStatus());
+ {
+ // TODO SERVER-66907: Uncomment op ctx interrupt logic.
+ // OperationContext* ptr = _opCtx.get();
+ // stdx::lock_guard<Client> lk(*ptr->getClient());
+ // _opCtx->markKilled(ErrorCodes::Interrupted);
}
+
+ _state = State::kInterrupted;
}
void TenantFileImporterService::_reset(WithLock) {
- _scopedExecutor.reset(); // Shuts down and joins the executor.
- _migrationId.reset();
- _state.setState(ImporterState::State::kUninitialized);
+ if (_migrationId) {
+ LOGV2_INFO(6378905,
+ "TenantFileImporterService resetting migration",
+ "migrationId"_attr = _migrationId->toString());
+ _migrationId.reset();
+ }
+
+ if (_thread && _thread->joinable()) {
+ _thread->join();
+ _thread.reset();
+ }
+
+ if (_eventQueue) {
+ _eventQueue.reset();
+ }
+
+ // TODO SERVER-66907: how should we be resetting _opCtx?
+ _state = State::kUninitialized;
}
} // namespace mongo::repl
diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h
index 92aa2baa426..d7188f9a0e6 100644
--- a/src/mongo/db/repl/tenant_file_importer_service.h
+++ b/src/mongo/db/repl/tenant_file_importer_service.h
@@ -33,9 +33,8 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replica_set_aware_service.h"
-#include "mongo/executor/scoped_task_executor.h"
-#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/util/producer_consumer_queue.h"
#include "mongo/util/string_map.h"
#include "mongo/util/uuid.h"
@@ -49,107 +48,84 @@ public:
void startMigration(const UUID& migrationId, const StringData& donorConnectionString);
void learnedFilename(const UUID& migrationId, const BSONObj& metadataDoc);
void learnedAllFilenames(const UUID& migrationId);
- void reset(const UUID& migrationId);
+ void interrupt(const UUID& migrationId);
+ void interruptAll();
private:
- void onStartup(OperationContext* opCtx) final;
-
- void onInitialDataAvailable(OperationContext* opCtx, bool isMajorityDataAvailable) final {}
+ void onInitialDataAvailable(OperationContext*, bool) final {}
void onShutdown() final {
- stdx::lock_guard lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
+ _interrupt(lk);
_reset(lk);
}
- void onStepUpBegin(OperationContext* opCtx, long long term) final {
- stdx::lock_guard lk(_mutex);
- _reset(lk);
- }
+ void onStartup(OperationContext*) final {}
- void onStepUpComplete(OperationContext* opCtx, long long term) final {
- stdx::lock_guard lk(_mutex);
- _reset(lk);
- }
+ void onStepUpBegin(OperationContext*, long long) final {}
- void onStepDown() final {
- stdx::lock_guard lk(_mutex);
- _reset(lk);
- }
+ void onStepUpComplete(OperationContext*, long long) final {}
- void onBecomeArbiter() final {
- stdx::lock_guard lk(_mutex);
- _reset(lk);
- }
+ void onStepDown() final {}
+
+ void onBecomeArbiter() final {}
+
+ void _handleEvents(OperationContext* opCtx);
- void _voteImportedFiles(const UUID& migrationId, WithLock);
+ void _voteImportedFiles(OperationContext* opCtx);
+
+ void _interrupt(WithLock);
void _reset(WithLock);
- // Lasts for the lifetime of the process.
- std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor;
- // Wraps _executor. Created by learnedFilename and destroyed by _reset.
- std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor;
+ std::unique_ptr<stdx::thread> _thread;
boost::optional<UUID> _migrationId;
std::string _donorConnectionString;
Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex");
- class ImporterState {
- public:
- enum class State { kUninitialized, kCopyingFiles, kCopiedFiles, kImportedFiles };
-
- void setState(State nextState) {
- tassert(6114403,
- str::stream() << "current state: " << toString(_state)
- << ", new state: " << toString(nextState),
- isValidTransition(nextState));
- _state = nextState;
- }
-
- bool is(State state) const {
- return _state == state;
- }
+ // Explicit State enum ordering defined here because we rely on comparison
+ // operators for state checking in various TenantFileImporterService methods.
+ enum class State {
+ kUninitialized = 0,
+ kStarted = 1,
+ kLearnedFilename = 2,
+ kLearnedAllFilenames = 3,
+ kInterrupted = 4
+ };
- StringData toString() const {
- return toString(_state);
+ static StringData stateToString(State state) {
+ switch (state) {
+ case State::kUninitialized:
+ return "uninitialized";
+ case State::kStarted:
+ return "started";
+ case State::kLearnedFilename:
+ return "learned filename";
+ case State::kLearnedAllFilenames:
+ return "learned all filenames";
+ case State::kInterrupted:
+ return "interrupted";
}
+ MONGO_UNREACHABLE;
+ return StringData();
+ }
- private:
- static StringData toString(State value) {
- switch (value) {
- case State::kUninitialized:
- return "uninitialized";
- case State::kCopyingFiles:
- return "copying files";
- case State::kCopiedFiles:
- return "copied files";
- case State::kImportedFiles:
- return "imported files";
- }
- MONGO_UNREACHABLE;
- return StringData();
- }
+ State _state;
- bool isValidTransition(State newState) {
- if (_state == newState) {
- return true;
- }
-
- switch (_state) {
- case State::kUninitialized:
- return newState == State::kCopyingFiles;
- case State::kCopyingFiles:
- return newState == State::kCopiedFiles || newState == State::kUninitialized;
- case State::kCopiedFiles:
- return newState == State::kImportedFiles || newState == State::kUninitialized;
- case State::kImportedFiles:
- return newState == State::kUninitialized;
- }
- MONGO_UNREACHABLE;
- }
+ struct ImporterEvent {
+ enum class Type { kNone, kLearnedFileName, kLearnedAllFilenames };
+ Type type;
+ UUID migrationId;
+ BSONObj metadataDoc;
- State _state = State::kUninitialized;
+ ImporterEvent(Type _type, const UUID& _migrationId)
+ : type(_type), migrationId(_migrationId) {}
};
- ImporterState _state;
+ using Queue =
+ MultiProducerSingleConsumerQueue<ImporterEvent,
+ producer_consumer_queue_detail::DefaultCostFunction>;
+
+ std::shared_ptr<Queue> _eventQueue;
};
} // namespace mongo::repl
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index 26f8d75ba3c..4cfdb60b43c 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -155,17 +155,12 @@ void TenantMigrationRecipientOpObserver::onInserts(
return;
}
- try {
- auto fileImporter = repl::TenantFileImporterService::get(opCtx->getServiceContext());
- for (auto it = first; it != last; it++) {
- const auto& metadataDoc = it->doc;
- auto migrationId =
- uassertStatusOK(UUID::parse(metadataDoc[shard_merge_utils::kMigrationIdFieldName]));
- fileImporter->learnedFilename(migrationId, metadataDoc);
- }
- } catch (const DBException& exc) {
- LOGV2_ERROR(
- 8423349, "TenantMigrationRecipientOpObserver::onInserts", "exception"_attr = exc);
+ auto fileImporter = repl::TenantFileImporterService::get(opCtx->getServiceContext());
+ for (auto it = first; it != last; it++) {
+ const auto& metadataDoc = it->doc;
+ auto migrationId =
+ uassertStatusOK(UUID::parse(metadataDoc[shard_merge_utils::kMigrationIdFieldName]));
+ fileImporter->learnedFilename(migrationId, metadataDoc);
}
}
@@ -182,8 +177,8 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
if (recipientStateDoc.getExpireAt() && mtab) {
if (mtab->inStateReject()) {
// The TenantMigrationRecipientAccessBlocker entry needs to be removed to
- // re-allow reads and future migrations with the same tenantId as this migration
- // has already been aborted and forgotten.
+ // re-allow reads and future migrations with the same tenantId as this
+ // migration has already been aborted and forgotten.
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.remove(recipientStateDoc.getTenantId(),
TenantMigrationAccessBlocker::BlockerType::kRecipient);
@@ -209,9 +204,6 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
break;
case TenantMigrationRecipientStateEnum::kStarted:
createAccessBlockerIfNeeded(opCtx, recipientStateDoc);
- repl::TenantFileImporterService::get(opCtx->getServiceContext())
- ->startMigration(recipientStateDoc.getId(),
- recipientStateDoc.getDonorConnectionString());
break;
case TenantMigrationRecipientStateEnum::kLearnedFilenames:
break;
@@ -221,34 +213,36 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
}
break;
case TenantMigrationRecipientStateEnum::kDone:
- repl::TenantFileImporterService::get(opCtx->getServiceContext())
- ->reset(recipientStateDoc.getId());
break;
}
- });
- // Perform TenantFileImporterService::learnedAllFilenames work outside of the above onCommit
- // hook because of work done in a WriteUnitOfWork.
- // TODO SERVER-63789: Revisit this when we make file import async and move
- // within onCommit hook.
- auto state = recipientStateDoc.getState();
- auto protocol = recipientStateDoc.getProtocol().value_or(kDefaultMigrationProtocol);
- if (state == TenantMigrationRecipientStateEnum::kLearnedFilenames) {
- tassert(6114400,
- "Bad state '{}' for protocol '{}'"_format(
- TenantMigrationRecipientState_serializer(state),
- MigrationProtocol_serializer(protocol)),
- protocol == MigrationProtocolEnum::kShardMerge);
-
- try {
+ if (protocol != MigrationProtocolEnum::kShardMerge) {
+ return;
+ }
+
+ if (recipientStateDoc.getExpireAt() && mtab) {
repl::TenantFileImporterService::get(opCtx->getServiceContext())
- ->learnedAllFilenames(recipientStateDoc.getId());
- } catch (const DBException& exc) {
- LOGV2_ERROR(6114104,
- "Calling TenantFileImporterService::learnedAllFilenames",
- "exception"_attr = exc);
+ ->interrupt(recipientStateDoc.getId());
}
- }
+
+ auto fileImporter = repl::TenantFileImporterService::get(opCtx->getServiceContext());
+
+ switch (state) {
+ case TenantMigrationRecipientStateEnum::kUninitialized:
+ break;
+ case TenantMigrationRecipientStateEnum::kStarted:
+ fileImporter->startMigration(recipientStateDoc.getId(),
+ recipientStateDoc.getDonorConnectionString());
+ break;
+ case TenantMigrationRecipientStateEnum::kLearnedFilenames:
+ fileImporter->learnedAllFilenames(recipientStateDoc.getId());
+ break;
+ case TenantMigrationRecipientStateEnum::kConsistent:
+ break;
+ case TenantMigrationRecipientStateEnum::kDone:
+ break;
+ }
+ });
}
}
@@ -301,9 +295,12 @@ void TenantMigrationRecipientOpObserver::onDelete(OperationContext* opCtx,
LOGV2_INFO(6114101,
"Removing expired 'shard merge' migration",
"migrationId"_attr = migrationId);
- TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .removeRecipientAccessBlockersForMigration(migrationId);
- repl::TenantFileImporterService::get(opCtx->getServiceContext())->reset(migrationId);
+ opCtx->recoveryUnit()->onCommit([opCtx, migrationId](boost::optional<Timestamp>) {
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
+ .removeRecipientAccessBlockersForMigration(migrationId);
+ repl::TenantFileImporterService::get(opCtx->getServiceContext())
+ ->interrupt(migrationId);
+ });
}
}
}
@@ -318,6 +315,8 @@ repl::OpTime TenantMigrationRecipientOpObserver::onDropCollection(
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kRecipient);
+
+ repl::TenantFileImporterService::get(opCtx->getServiceContext())->interruptAll();
});
}
return {};
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
index 3d5deee7ce2..b683fe3aebf 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
@@ -176,9 +176,7 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
invariant(db);
Lock::CollectionLock collLock(opCtx, nss, MODE_X);
auto catalog = CollectionCatalog::get(opCtx);
- // TODO SERVER-63789 Uncomment WriteUnitOfWork declaration below when we
- // make file import async.
- // WriteUnitOfWork wunit(opCtx);
+ WriteUnitOfWork wunit(opCtx);
AutoStatsTracker statsTracker(opCtx,
nss,
Top::LockType::NotLocked,
@@ -218,9 +216,8 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
makeCountsChange(ownedCollection->getRecordStore(), collectionMetadata));
CollectionCatalog::get(opCtx)->onCreateCollection(opCtx, std::move(ownedCollection));
- // TODO SERVER-63789 Uncomment wunit.commit() call below when we
- // make file copy/import async.
- // wunit.commit();
+ wunit.commit();
+
LOGV2(6114300,
"Imported donor collection",
"ns"_attr = nss,
@@ -240,23 +237,25 @@ void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) {
makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd);
ON_BLOCK_EXIT([&] {
- client->shutdownAndDisallowReconnect();
-
+ if (client) {
+ client->shutdownAndDisallowReconnect();
+ }
writerPool->shutdown();
writerPool->join();
});
auto fileName = metadataDoc["filename"].str();
auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName])));
- LOGV2_DEBUG(6113320,
- 1,
- "Cloning file",
- "migrationId"_attr = migrationId,
- "metadata"_attr = metadataDoc);
auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName])));
auto remoteDbpath = metadataDoc["remoteDbpath"].str();
size_t fileSize = std::max(0ll, metadataDoc["fileSize"].safeNumberLong());
auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str());
+ LOGV2_DEBUG(6113320,
+ 1,
+ "Cloning file",
+ "migrationId"_attr = migrationId,
+ "metadata"_attr = metadataDoc,
+ "destinationRelativePath"_attr = relativePath);
invariant(!relativePath.empty());
// Connect the client.