summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/op_observer_impl.cpp4
-rw-r--r--src/mongo/db/repl/SConscript16
-rw-r--r--src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp14
-rw-r--r--src/mongo/db/repl/migrating_tenant_donor_util.cpp80
-rw-r--r--src/mongo/db/repl/migrating_tenant_donor_util.h12
6 files changed, 115 insertions, 12 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 10e1afc525c..ffaa6527748 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -796,6 +796,7 @@ env.Library(
'views/views_mongod',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/catalog/collection_catalog',
+ "$BUILD_DIR/mongo/db/repl/migrating_tenant",
'$BUILD_DIR/mongo/s/coreshard',
"$BUILD_DIR/mongo/s/grid",
],
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index f051925c4b4..bb935f1cfb7 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/op_observer_util.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/read_write_concern_defaults.h"
+#include "mongo/db/repl/migrating_tenant_donor_util.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -572,6 +573,9 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
} else if (args.nss == NamespaceString::kConfigSettingsNamespace) {
ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings(
opCtx, args.updateArgs.updatedDoc["_id"], args.updateArgs.updatedDoc);
+ } else if (args.nss == NamespaceString::kMigrationDonorsNamespace) {
+ migrating_tenant_donor_util::onTenantMigrationDonorStateTransition(
+ opCtx, args.updateArgs.updatedDoc);
}
}
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index b0d1660b815..7c68f650174 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1165,28 +1165,34 @@ env.Library(
)
env.Library(
- target='migrating_tenant_access_blocker',
+ target='migrating_tenant',
source=[
'migrating_tenant_access_blocker.cpp',
'migrating_tenant_access_blocker_by_prefix.cpp',
'migrating_tenant_access_blocker_server_status_section.cpp',
+ 'migrating_tenant_donor_util.cpp',
+ env.Idlc('migrate_tenant_state_machine.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/catalog_raii',
+ '$BUILD_DIR/mongo/db/commands/server_status',
+ '$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/service_context',
+ '$BUILD_DIR/mongo/executor/network_interface_factory',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
+ 'local_oplog_info',
'optime',
- 'repl_coordinator_impl'
+ 'repl_coordinator_interface',
],
)
env.Library(
target="serveronly_repl",
source=[
- 'migrating_tenant_donor_util.cpp',
'noop_writer.cpp',
"replication_coordinator_external_state_impl.cpp",
"sync_source_feedback.cpp",
- env.Idlc('migrate_tenant_state_machine.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
@@ -1214,7 +1220,7 @@ env.Library(
'$BUILD_DIR/mongo/util/fail_point',
'bgsync',
'drop_pending_collection_reaper',
- 'migrating_tenant_access_blocker',
+ 'migrating_tenant',
'oplog_application',
'oplog_buffer_collection',
'oplog_interface_remote',
diff --git a/src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp b/src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp
index 6c59e14975a..2b6bf6a9547 100644
--- a/src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp
+++ b/src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp
@@ -45,8 +45,8 @@ void MigratingTenantAccessBlockerByPrefix::add(StringData dbPrefix,
std::shared_ptr<MigratingTenantAccessBlocker> mtab) {
stdx::lock_guard<Latch> lg(_mutex);
- auto blockerIterator = _migratingTenantAccessBlockers.find(dbPrefix);
- invariant(blockerIterator != _migratingTenantAccessBlockers.end());
+ auto it = _migratingTenantAccessBlockers.find(dbPrefix);
+ invariant(it == _migratingTenantAccessBlockers.end());
_migratingTenantAccessBlockers.emplace(dbPrefix, mtab);
}
@@ -80,14 +80,14 @@ MigratingTenantAccessBlockerByPrefix::getMigratingTenantBlocker(StringData dbNam
return dbName.startsWith(dbPrefix);
};
- auto blockerMatchedIterator = std::find_if(_migratingTenantAccessBlockers.begin(),
- _migratingTenantAccessBlockers.end(),
- doesDBNameStartWithPrefix);
+ auto it = std::find_if(_migratingTenantAccessBlockers.begin(),
+ _migratingTenantAccessBlockers.end(),
+ doesDBNameStartWithPrefix);
- if (blockerMatchedIterator == _migratingTenantAccessBlockers.end()) {
+ if (it == _migratingTenantAccessBlockers.end()) {
return nullptr;
} else {
- return blockerMatchedIterator->second;
+ return it->second;
}
}
diff --git a/src/mongo/db/repl/migrating_tenant_donor_util.cpp b/src/mongo/db/repl/migrating_tenant_donor_util.cpp
index 5aac1584f02..8e1501fc5ae 100644
--- a/src/mongo/db/repl/migrating_tenant_donor_util.cpp
+++ b/src/mongo/db/repl/migrating_tenant_donor_util.cpp
@@ -34,10 +34,57 @@
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
+#include "mongo/db/repl/migrate_tenant_state_machine_gen.h"
+#include "mongo/db/repl/migrating_tenant_access_blocker_by_prefix.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/util/concurrency/thread_pool.h"
namespace mongo {
namespace migrating_tenant_donor_util {
+
+namespace {
+
+const char kThreadNamePrefix[] = "TenantMigrationWorker-";
+const char kPoolName[] = "TenantMigrationWorkerThreadPool";
+const char kNetName[] = "TenantMigrationWorkerNetwork";
+
+/**
+ * Updates the MigratingTenantAccessBlocker when the tenant migration transitions to the blocking
+ * state.
+ */
+void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocument& donorDoc) {
+ invariant(donorDoc.getState() == TenantMigrationDonorStateEnum::kBlocking);
+ invariant(donorDoc.getBlockTimestamp());
+
+ auto& mtabByPrefix = MigratingTenantAccessBlockerByPrefix::get(opCtx->getServiceContext());
+ auto mtab = mtabByPrefix.getMigratingTenantBlocker(donorDoc.getDatabasePrefix());
+
+ if (!opCtx->writesAreReplicated()) {
+ // A primary must create the MigratingTenantAccessBlocker and call startBlockingWrites on it
+ // before reserving the OpTime for the "start blocking" write, so only secondaries create
+ // the MigratingTenantAccessBlocker and call startBlockingWrites on it in the op observer.
+ invariant(!mtab);
+
+ mtab = std::make_shared<MigratingTenantAccessBlocker>(
+ opCtx->getServiceContext(),
+ migrating_tenant_donor_util::getTenantMigrationExecutor(opCtx->getServiceContext())
+ .get());
+ mtabByPrefix.add(donorDoc.getDatabasePrefix(), mtab);
+ mtab->startBlockingWrites();
+ }
+
+ invariant(mtab);
+
+ // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since
+ // startBlockingReadsAfter just needs to be called before the "start blocking" write's oplog
+ // hole is filled.
+ mtab->startBlockingReadsAfter(donorDoc.getBlockTimestamp().get());
+}
+
+} // namespace
+
void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& originalDoc) {
// Send recipientSyncData.
@@ -95,6 +142,39 @@ void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& origi
return Status::OK();
}));
}
+
+std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContext* serviceContext) {
+ ThreadPool::Options tpOptions;
+ tpOptions.threadNamePrefix = kThreadNamePrefix;
+ tpOptions.poolName = kPoolName;
+ tpOptions.maxThreads = ThreadPool::Options::kUnlimited;
+ tpOptions.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+
+ return std::make_unique<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(tpOptions),
+ executor::makeNetworkInterface(kNetName, nullptr, nullptr));
+}
+
+void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc) {
+ auto donorDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorDoc"), doc);
+
+ switch (donorDoc.getState()) {
+ case TenantMigrationDonorStateEnum::kDataSync:
+ break;
+ case TenantMigrationDonorStateEnum::kBlocking:
+ onTransitionToBlocking(opCtx, donorDoc);
+ break;
+ case TenantMigrationDonorStateEnum::kCommitted:
+ break;
+ case TenantMigrationDonorStateEnum::kAborted:
+ break;
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
} // namespace migrating_tenant_donor_util
} // namespace mongo
diff --git a/src/mongo/db/repl/migrating_tenant_donor_util.h b/src/mongo/db/repl/migrating_tenant_donor_util.h
index 14f93fd2bb5..49db3fce5c1 100644
--- a/src/mongo/db/repl/migrating_tenant_donor_util.h
+++ b/src/mongo/db/repl/migrating_tenant_donor_util.h
@@ -31,6 +31,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/migrate_tenant_state_machine_gen.h"
+#include "mongo/executor/task_executor.h"
namespace mongo {
@@ -42,6 +43,17 @@ namespace migrating_tenant_donor_util {
*/
void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& donorDoc);
+/**
+ * Creates a task executor to be used for tenant migration.
+ */
+std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContext* serviceContext);
+
+/**
+ * Updates the MigratingTenantAccessBlocker for the tenant migration represented by the given
+ * config.migrationDonors document.
+ */
+void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc);
+
} // namespace migrating_tenant_donor_util
} // namespace mongo