diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/migrating_tenant_donor_util.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/repl/migrating_tenant_donor_util.h | 12 |
6 files changed, 115 insertions, 12 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 10e1afc525c..ffaa6527748 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -796,6 +796,7 @@ env.Library( 'views/views_mongod', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/catalog/collection_catalog', + "$BUILD_DIR/mongo/db/repl/migrating_tenant", '$BUILD_DIR/mongo/s/coreshard', "$BUILD_DIR/mongo/s/grid", ], diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index f051925c4b4..bb935f1cfb7 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -51,6 +51,7 @@ #include "mongo/db/op_observer_util.h" #include "mongo/db/operation_context.h" #include "mongo/db/read_write_concern_defaults.h" +#include "mongo/db/repl/migrating_tenant_donor_util.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/db/repl/replication_coordinator.h" @@ -572,6 +573,9 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg } else if (args.nss == NamespaceString::kConfigSettingsNamespace) { ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( opCtx, args.updateArgs.updatedDoc["_id"], args.updateArgs.updatedDoc); + } else if (args.nss == NamespaceString::kMigrationDonorsNamespace) { + migrating_tenant_donor_util::onTenantMigrationDonorStateTransition( + opCtx, args.updateArgs.updatedDoc); } } diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index b0d1660b815..7c68f650174 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1165,28 +1165,34 @@ env.Library( ) env.Library( - target='migrating_tenant_access_blocker', + target='migrating_tenant', source=[ 'migrating_tenant_access_blocker.cpp', 'migrating_tenant_access_blocker_by_prefix.cpp', 'migrating_tenant_access_blocker_server_status_section.cpp', + 'migrating_tenant_donor_util.cpp', + env.Idlc('migrate_tenant_state_machine.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/catalog_raii', + '$BUILD_DIR/mongo/db/commands/server_status', + '$BUILD_DIR/mongo/db/dbhelpers', '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/mongo/executor/network_interface_factory', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', + 'local_oplog_info', 'optime', - 'repl_coordinator_impl' + 'repl_coordinator_interface', ], ) env.Library( target="serveronly_repl", source=[ - 'migrating_tenant_donor_util.cpp', 'noop_writer.cpp', "replication_coordinator_external_state_impl.cpp", "sync_source_feedback.cpp", - env.Idlc('migrate_tenant_state_machine.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/base', @@ -1214,7 +1220,7 @@ env.Library( '$BUILD_DIR/mongo/util/fail_point', 'bgsync', 'drop_pending_collection_reaper', - 'migrating_tenant_access_blocker', + 'migrating_tenant', 'oplog_application', 'oplog_buffer_collection', 'oplog_interface_remote', diff --git a/src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp b/src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp index 6c59e14975a..2b6bf6a9547 100644 --- a/src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp +++ b/src/mongo/db/repl/migrating_tenant_access_blocker_by_prefix.cpp @@ -45,8 +45,8 @@ void MigratingTenantAccessBlockerByPrefix::add(StringData dbPrefix, std::shared_ptr<MigratingTenantAccessBlocker> mtab) { stdx::lock_guard<Latch> lg(_mutex); - auto blockerIterator = _migratingTenantAccessBlockers.find(dbPrefix); - invariant(blockerIterator != _migratingTenantAccessBlockers.end()); + auto it = _migratingTenantAccessBlockers.find(dbPrefix); + invariant(it == _migratingTenantAccessBlockers.end()); _migratingTenantAccessBlockers.emplace(dbPrefix, mtab); } @@ -80,14 +80,14 @@ MigratingTenantAccessBlockerByPrefix::getMigratingTenantBlocker(StringData dbNam return dbName.startsWith(dbPrefix); }; - auto blockerMatchedIterator = std::find_if(_migratingTenantAccessBlockers.begin(), - _migratingTenantAccessBlockers.end(), - doesDBNameStartWithPrefix); + auto it = std::find_if(_migratingTenantAccessBlockers.begin(), + _migratingTenantAccessBlockers.end(), + doesDBNameStartWithPrefix); - if (blockerMatchedIterator == _migratingTenantAccessBlockers.end()) { + if (it == _migratingTenantAccessBlockers.end()) { return nullptr; } else { - return blockerMatchedIterator->second; + return it->second; } } diff --git a/src/mongo/db/repl/migrating_tenant_donor_util.cpp b/src/mongo/db/repl/migrating_tenant_donor_util.cpp index 5aac1584f02..8e1501fc5ae 100644 --- a/src/mongo/db/repl/migrating_tenant_donor_util.cpp +++ b/src/mongo/db/repl/migrating_tenant_donor_util.cpp @@ -34,10 +34,57 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" +#include "mongo/db/repl/migrate_tenant_state_machine_gen.h" +#include "mongo/db/repl/migrating_tenant_access_blocker_by_prefix.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/util/concurrency/thread_pool.h" namespace mongo { namespace migrating_tenant_donor_util { + +namespace { + +const char kThreadNamePrefix[] = "TenantMigrationWorker-"; +const char kPoolName[] = "TenantMigrationWorkerThreadPool"; +const char kNetName[] = "TenantMigrationWorkerNetwork"; + +/** + * Updates the MigratingTenantAccessBlocker when the tenant migration transitions to the blocking + * state. + */ +void onTransitionToBlocking(OperationContext* opCtx, TenantMigrationDonorDocument& donorDoc) { + invariant(donorDoc.getState() == TenantMigrationDonorStateEnum::kBlocking); + invariant(donorDoc.getBlockTimestamp()); + + auto& mtabByPrefix = MigratingTenantAccessBlockerByPrefix::get(opCtx->getServiceContext()); + auto mtab = mtabByPrefix.getMigratingTenantBlocker(donorDoc.getDatabasePrefix()); + + if (!opCtx->writesAreReplicated()) { + // A primary must create the MigratingTenantAccessBlocker and call startBlockingWrites on it + // before reserving the OpTime for the "start blocking" write, so only secondaries create + // the MigratingTenantAccessBlocker and call startBlockingWrites on it in the op observer. + invariant(!mtab); + + mtab = std::make_shared<MigratingTenantAccessBlocker>( + opCtx->getServiceContext(), + migrating_tenant_donor_util::getTenantMigrationExecutor(opCtx->getServiceContext()) + .get()); + mtabByPrefix.add(donorDoc.getDatabasePrefix(), mtab); + mtab->startBlockingWrites(); + } + + invariant(mtab); + + // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since + // startBlockingReadsAfter just needs to be called before the "start blocking" write's oplog + // hole is filled. + mtab->startBlockingReadsAfter(donorDoc.getBlockTimestamp().get()); +} + +} // namespace + void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& originalDoc) { // Send recipientSyncData. @@ -95,6 +142,39 @@ void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& origi return Status::OK(); })); } + +std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContext* serviceContext) { + ThreadPool::Options tpOptions; + tpOptions.threadNamePrefix = kThreadNamePrefix; + tpOptions.poolName = kPoolName; + tpOptions.maxThreads = ThreadPool::Options::kUnlimited; + tpOptions.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + }; + + return std::make_unique<executor::ThreadPoolTaskExecutor>( + std::make_unique<ThreadPool>(tpOptions), + executor::makeNetworkInterface(kNetName, nullptr, nullptr)); +} + +void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc) { + auto donorDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorDoc"), doc); + + switch (donorDoc.getState()) { + case TenantMigrationDonorStateEnum::kDataSync: + break; + case TenantMigrationDonorStateEnum::kBlocking: + onTransitionToBlocking(opCtx, donorDoc); + break; + case TenantMigrationDonorStateEnum::kCommitted: + break; + case TenantMigrationDonorStateEnum::kAborted: + break; + default: + MONGO_UNREACHABLE; + } +} + } // namespace migrating_tenant_donor_util } // namespace mongo diff --git a/src/mongo/db/repl/migrating_tenant_donor_util.h b/src/mongo/db/repl/migrating_tenant_donor_util.h index 14f93fd2bb5..49db3fce5c1 100644 --- a/src/mongo/db/repl/migrating_tenant_donor_util.h +++ b/src/mongo/db/repl/migrating_tenant_donor_util.h @@ -31,6 +31,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/repl/migrate_tenant_state_machine_gen.h" +#include "mongo/executor/task_executor.h" namespace mongo { @@ -42,6 +43,17 @@ namespace migrating_tenant_donor_util { */ void dataSync(OperationContext* opCtx, const TenantMigrationDonorDocument& donorDoc); +/** + * Creates a task executor to be used for tenant migration. + */ +std::shared_ptr<executor::TaskExecutor> getTenantMigrationExecutor(ServiceContext* serviceContext); + +/** + * Updates the MigratingTenantAccessBlocker for the tenant migration represented by the given + * config.migrationDonors document. + */ +void onTenantMigrationDonorStateTransition(OperationContext* opCtx, const BSONObj& doc); + } // namespace migrating_tenant_donor_util } // namespace mongo |