summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/tenant_migration_shard_merge_util.cpp')
-rw-r--r--src/mongo/db/repl/tenant_migration_shard_merge_util.cpp213
1 files changed, 138 insertions, 75 deletions
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
index 5c39e53af19..700e0d34916 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
@@ -59,10 +59,16 @@ constexpr int kBackupCursorKeepAliveIntervalMillis = mongo::kCursorTimeoutMillis
namespace mongo::repl::shard_merge_utils {
namespace {
+MONGO_FAIL_POINT_DEFINE(skipDeleteTempDBPath);
using namespace fmt::literals;
void moveFile(const std::string& src, const std::string& dst) {
LOGV2_DEBUG(6114304, 1, "Moving file", "src"_attr = src, "dst"_attr = dst);
+
+ tassert(6114401,
+ "Destination file '{}' already exists"_format(dst),
+ !boost::filesystem::exists(dst));
+
// Boost filesystem functions clear "ec" on success.
boost::system::error_code ec;
boost::filesystem::rename(src, dst, ec);
@@ -122,75 +128,6 @@ Status connect(const HostAndPort& source, DBClientConnection* client) {
return replAuthenticate(client).withContext(str::stream()
<< "Failed to authenticate to " << source);
}
-} // namespace
-
-void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) {
- std::unique_ptr<DBClientConnection> client;
- std::unique_ptr<TenantMigrationSharedData> sharedData;
- auto writerPool =
- makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd);
-
- ON_BLOCK_EXIT([&] {
- client->shutdownAndDisallowReconnect();
-
- writerPool->shutdown();
- writerPool->join();
- });
-
- auto fileName = metadataDoc["filename"].str();
- auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName])));
- LOGV2_DEBUG(6113320,
- 1,
- "Cloning file",
- "migrationId"_attr = migrationId,
- "metadata"_attr = metadataDoc);
- auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName])));
- auto remoteDbpath = metadataDoc["remoteDbpath"].str();
- size_t fileSize = metadataDoc["fileSize"].safeNumberLong();
- auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str());
- invariant(!relativePath.empty());
-
- // Connect the client.
- if (!client) {
- auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str());
- client = std::make_unique<DBClientConnection>(true /* autoReconnect */);
- uassertStatusOK(connect(donor, client.get()));
- }
-
- if (!sharedData) {
- sharedData = std::make_unique<TenantMigrationSharedData>(
- getGlobalServiceContext()->getFastClockSource(), migrationId);
- }
-
- auto currentBackupFileCloner =
- std::make_unique<TenantFileCloner>(backupId,
- migrationId,
- fileName,
- fileSize,
- relativePath,
- sharedData.get(),
- client->getServerHostAndPort(),
- client.get(),
- repl::StorageInterface::get(cc().getServiceContext()),
- writerPool.get());
-
- auto cloneStatus = currentBackupFileCloner->run();
- if (!cloneStatus.isOK()) {
- LOGV2_WARNING(6113321,
- "Failed to clone file ",
- "migrationId"_attr = migrationId,
- "fileName"_attr = fileName,
- "error"_attr = cloneStatus);
- } else {
- LOGV2_DEBUG(6113322,
- 1,
- "Cloned file",
- "migrationId"_attr = migrationId,
- "fileName"_attr = fileName);
- }
-
- uassertStatusOK(cloneStatus);
-}
void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
const std::vector<CollectionImportMetadata>& metadatas,
@@ -199,12 +136,26 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
/*
* Move one collection file and one or more index files from temp dir to dbpath.
*/
- moveFile(constructSourcePath(importPath, collectionMetadata.importArgs.ident),
- constructDestinationPath(collectionMetadata.importArgs.ident));
+ auto collFileSourcePath =
+ constructSourcePath(importPath, collectionMetadata.importArgs.ident);
+ auto collFileDestPath = constructDestinationPath(collectionMetadata.importArgs.ident);
+
+ moveFile(collFileSourcePath, collFileDestPath);
+
+ ScopeGuard revertCollFileMove([&] { moveFile(collFileDestPath, collFileSourcePath); });
+
+ auto indexPaths = std::vector<std::tuple<std::string, std::string>>();
+ ScopeGuard revertIndexFileMove([&] {
+ for (const auto& pathTuple : indexPaths) {
+ moveFile(std::get<1>(pathTuple), std::get<0>(pathTuple));
+ }
+ });
for (auto&& indexImportArgs : collectionMetadata.indexes) {
- moveFile(constructSourcePath(importPath, indexImportArgs.ident),
- constructDestinationPath(indexImportArgs.ident));
+ auto indexFileSourcePath = constructSourcePath(importPath, indexImportArgs.ident);
+ auto indexFileDestPath = constructDestinationPath(indexImportArgs.ident);
+ moveFile(indexFileSourcePath, indexFileDestPath);
+ indexPaths.push_back(std::tuple(indexFileSourcePath, indexFileDestPath));
}
/*
@@ -222,7 +173,9 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
AutoGetDb autoDb(opCtx, nss.db(), MODE_IX);
Lock::CollectionLock collLock(opCtx, nss, MODE_X);
auto catalog = CollectionCatalog::get(opCtx);
- WriteUnitOfWork wunit(opCtx);
+ // TODO SERVER-63789 Uncomment WriteUnitOfWork declaration below when we
+ // make file import async.
+ // WriteUnitOfWork wunit(opCtx);
AutoStatsTracker statsTracker(opCtx,
nss,
Top::LockType::NotLocked,
@@ -264,15 +217,89 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
makeCountsChange(ownedCollection->getRecordStore(), collectionMetadata));
UncommittedCollections::addToTxn(opCtx, std::move(ownedCollection));
- wunit.commit();
+ // TODO SERVER-63789 Uncomment wunit.commit() call below when we
+ // make file copy/import async.
+ // wunit.commit();
LOGV2(6114300,
"Imported donor collection",
"ns"_attr = nss,
"numRecordsApprox"_attr = collectionMetadata.numRecords,
"dataSizeApprox"_attr = collectionMetadata.dataSize);
});
+
+ revertCollFileMove.dismiss();
+ revertIndexFileMove.dismiss();
}
}
+} // namespace
+
+void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) {
+ std::unique_ptr<DBClientConnection> client;
+ std::unique_ptr<TenantMigrationSharedData> sharedData;
+ auto writerPool =
+ makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd);
+
+ ON_BLOCK_EXIT([&] {
+ client->shutdownAndDisallowReconnect();
+
+ writerPool->shutdown();
+ writerPool->join();
+ });
+
+ auto fileName = metadataDoc["filename"].str();
+ auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName])));
+ LOGV2_DEBUG(6113320,
+ 1,
+ "Cloning file",
+ "migrationId"_attr = migrationId,
+ "metadata"_attr = metadataDoc);
+ auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName])));
+ auto remoteDbpath = metadataDoc["remoteDbpath"].str();
+ size_t fileSize = metadataDoc["fileSize"].safeNumberLong();
+ auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str());
+ invariant(!relativePath.empty());
+
+ // Connect the client.
+ if (!client) {
+ auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str());
+ client = std::make_unique<DBClientConnection>(true /* autoReconnect */);
+ uassertStatusOK(connect(donor, client.get()));
+ }
+
+ if (!sharedData) {
+ sharedData = std::make_unique<TenantMigrationSharedData>(
+ getGlobalServiceContext()->getFastClockSource(), migrationId);
+ }
+
+ auto currentBackupFileCloner =
+ std::make_unique<TenantFileCloner>(backupId,
+ migrationId,
+ fileName,
+ fileSize,
+ relativePath,
+ sharedData.get(),
+ client->getServerHostAndPort(),
+ client.get(),
+ repl::StorageInterface::get(cc().getServiceContext()),
+ writerPool.get());
+
+ auto cloneStatus = currentBackupFileCloner->run();
+ if (!cloneStatus.isOK()) {
+ LOGV2_WARNING(6113321,
+ "Failed to clone file ",
+ "migrationId"_attr = migrationId,
+ "fileName"_attr = fileName,
+ "error"_attr = cloneStatus);
+ } else {
+ LOGV2_DEBUG(6113322,
+ 1,
+ "Cloned file",
+ "migrationId"_attr = migrationId,
+ "fileName"_attr = fileName);
+ }
+
+ uassertStatusOK(cloneStatus);
+}
SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource,
std::shared_ptr<executor::TaskExecutor> executor,
@@ -296,4 +323,40 @@ SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource,
.onCompletion([](auto&&) {})
.semi();
}
+
+void importCopiedFiles(OperationContext* opCtx, UUID migrationId) {
+ auto tempWTDirectory = fileClonerTempDir(migrationId);
+ uassert(6113315,
+ str::stream() << "Missing file cloner's temporary dbpath directory: "
+ << tempWTDirectory.string(),
+ boost::filesystem::exists(tempWTDirectory));
+
+ // TODO SERVER-63204: Evaluate correct place to remove the temporary
+ // WT dbpath.
+ ON_BLOCK_EXIT([&tempWTDirectory, &migrationId] {
+ // TODO SERVER-63789: Delete skipDeleteTempDBPath failpoint
+ if (MONGO_unlikely(skipDeleteTempDBPath.shouldFail())) {
+ LOGV2(6114402,
+ "skipDeleteTempDBPath failpoint enabled, skipping temp directory cleanup.");
+ return;
+ }
+ LOGV2_DEBUG(6113324,
+ 1,
+ "Done importing files, removing the temporary WT dbpath",
+ "migrationId"_attr = migrationId,
+ "tempDbPath"_attr = tempWTDirectory.string());
+ boost::system::error_code ec;
+ boost::filesystem::remove_all(tempWTDirectory, ec);
+ });
+
+ auto metadatas = wiredTigerRollbackToStableAndGetMetadata(opCtx, tempWTDirectory.string());
+
+ // TODO SERVER-63122: Remove the try-catch block once logical cloning is removed for
+ // shard merge protocol.
+ try {
+ wiredTigerImportFromBackupCursor(opCtx, metadatas, tempWTDirectory.string());
+ } catch (const ExceptionFor<ErrorCodes::NamespaceExists>& ex) {
+ LOGV2_WARNING(6113314, "Temporarily ignoring the error", "error"_attr = ex.toStatus());
+ }
+}
} // namespace mongo::repl::shard_merge_utils