diff options
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_service.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_util.cpp | 53 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_migration_donor_util.h | 2 |
5 files changed, 60 insertions, 43 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index c13f444e191..b10b2c240b1 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -505,6 +505,10 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( opCtx, it->doc["_id"], it->doc); } + } else if (nss == NamespaceString::kTenantMigrationDonorsNamespace) { + for (auto it = first; it != last; it++) { + tenant_migration_donor::onWriteToDonorStateDoc(opCtx, it->doc); + } } } @@ -574,7 +578,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg ReadWriteConcernDefaults::get(opCtx).observeDirectWriteToConfigSettings( opCtx, args.updateArgs.updatedDoc["_id"], args.updateArgs.updatedDoc); } else if (args.nss == NamespaceString::kTenantMigrationDonorsNamespace) { - tenant_migration_donor::onDonorStateDocUpdate(opCtx, args.updateArgs.updatedDoc); + tenant_migration_donor::onWriteToDonorStateDoc(opCtx, args.updateArgs.updatedDoc); } } diff --git a/src/mongo/db/repl/tenant_migration_donor_service.cpp b/src/mongo/db/repl/tenant_migration_donor_service.cpp index a07833a2caf..cb73f47a053 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_service.cpp @@ -56,6 +56,12 @@ MONGO_FAIL_POINT_DEFINE(skipSendingRecipientSyncDataCommand); const Seconds kRecipientSyncDataTimeout(30); +std::shared_ptr<TenantMigrationAccessBlocker> getTenantMigrationAccessBlocker( + ServiceContext* serviceContext, StringData dbPrefix) { + return TenantMigrationAccessBlockerByPrefix::get(serviceContext) + .getTenantMigrationAccessBlockerForDbPrefix(dbPrefix); +} + } // namespace TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext, @@ -63,13 +69,6 @@ TenantMigrationDonorService::Instance::Instance(ServiceContext* serviceContext, : repl::PrimaryOnlyService::TypedInstance<Instance>(), _serviceContext(serviceContext) { _stateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("initialStateDoc"), initialState); - - _mtab = std::make_shared<TenantMigrationAccessBlocker>( - _serviceContext, - tenant_migration_donor::makeTenantMigrationExecutor(_serviceContext), - _stateDoc.getDatabasePrefix().toString()); - TenantMigrationAccessBlockerByPrefix::get(_serviceContext) - .add(_stateDoc.getDatabasePrefix(), _mtab); } Status TenantMigrationDonorService::Instance::checkIfOptionsConflict(BSONObj options) { @@ -316,7 +315,10 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( }) .then([this, executor] { // Enter "blocking" state. - _mtab->startBlockingWrites(); + auto mtab = + getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix()); + invariant(mtab); + mtab->startBlockingWrites(); const auto opTime = _updateStateDocument(TenantMigrationDonorStateEnum::kBlocking); return _waitForMajorityWriteConcern(executor, std::move(opTime)); }) @@ -344,15 +346,22 @@ SemiFuture<void> TenantMigrationDonorService::Instance::run( .then([this] { // Enter "commit" state. _updateStateDocument(TenantMigrationDonorStateEnum::kCommitted); + auto mtab = + getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix()); + invariant(mtab); + return mtab->onCompletion(); }) - .onError([this](Status status) { - // Enter "abort" state. - _abortReason.emplace(status); - _updateStateDocument(TenantMigrationDonorStateEnum::kAborted); - }) - .then([this] { - // Wait for the migration to commit or abort. - return _mtab->onCompletion(); + .onError([this, executor](Status status) { + auto mtab = + getTenantMigrationAccessBlocker(_serviceContext, _stateDoc.getDatabasePrefix()); + if (status == ErrorCodes::ConflictingOperationInProgress || !mtab) { + return SharedSemiFuture<void>(status); + } else { + // Enter "abort" state. + _abortReason.emplace(status); + _updateStateDocument(TenantMigrationDonorStateEnum::kAborted); + return mtab->onCompletion(); + } }) .onCompletion([this](Status status) { LOGV2(5006601, diff --git a/src/mongo/db/repl/tenant_migration_donor_service.h b/src/mongo/db/repl/tenant_migration_donor_service.h index 6d1da3ac6d4..6234a4b4619 100644 --- a/src/mongo/db/repl/tenant_migration_donor_service.h +++ b/src/mongo/db/repl/tenant_migration_donor_service.h @@ -146,7 +146,6 @@ public: TenantMigrationDonorDocument _stateDoc; - std::shared_ptr<TenantMigrationAccessBlocker> _mtab; boost::optional<Status> _abortReason; // Promise that is resolved when the donor has majority-committed the migration decision. diff --git a/src/mongo/db/repl/tenant_migration_donor_util.cpp b/src/mongo/db/repl/tenant_migration_donor_util.cpp index edae74ab4ff..906655a12f1 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.cpp +++ b/src/mongo/db/repl/tenant_migration_donor_util.cpp @@ -55,6 +55,21 @@ const char kPoolName[] = "TenantMigrationWorkerThreadPool"; const char kNetName[] = "TenantMigrationWorkerNetwork"; /** + * Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the data sync + * state. + */ +void onTransitionToDataSync(OperationContext* opCtx, + const TenantMigrationDonorDocument& donorStateDoc) { + invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kDataSync); + auto mtab = std::make_shared<TenantMigrationAccessBlocker>( + opCtx->getServiceContext(), + tenant_migration_donor::makeTenantMigrationExecutor(opCtx->getServiceContext()), + donorStateDoc.getDatabasePrefix().toString()); + TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()) + .add(donorStateDoc.getDatabasePrefix(), mtab); +} + +/** * Updates the TenantMigrationAccessBlocker when the tenant migration transitions to the blocking * state. */ @@ -63,26 +78,17 @@ void onTransitionToBlocking(OperationContext* opCtx, invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kBlocking); invariant(donorStateDoc.getBlockTimestamp()); - auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()); - auto mtab = - mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix()); + auto mtab = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()) + .getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix()); + invariant(mtab); if (!opCtx->writesAreReplicated()) { - // A primary must create the TenantMigrationAccessBlocker and call startBlockingWrites on it - // before reserving the OpTime for the "start blocking" write, so only secondaries create - // the TenantMigrationAccessBlocker and call startBlockingWrites on it in the op observer. - invariant(!mtab); - - mtab = std::make_shared<TenantMigrationAccessBlocker>( - opCtx->getServiceContext(), - tenant_migration_donor::makeTenantMigrationExecutor(opCtx->getServiceContext()), - donorStateDoc.getDatabasePrefix().toString()); - mtabByPrefix.add(donorStateDoc.getDatabasePrefix(), mtab); + // A primary must call startBlockingWrites on the TenantMigrationAccessBlocker before + // reserving the OpTime for the "start blocking" write, so only secondaries call + // startBlockingWrites on the TenantMigrationAccessBlocker in the op observer. mtab->startBlockingWrites(); } - invariant(mtab); - // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since // startBlockingReadsAfter just needs to be called before the "start blocking" write's oplog // hole is filled. @@ -97,9 +103,8 @@ void onTransitionToCommitted(OperationContext* opCtx, invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kCommitted); invariant(donorStateDoc.getCommitOrAbortOpTime()); - auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()); - auto mtab = - mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix()); + auto mtab = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()) + .getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix()); invariant(mtab); mtab->commit(donorStateDoc.getCommitOrAbortOpTime().get()); } @@ -112,9 +117,8 @@ void onTransitionToAborted(OperationContext* opCtx, invariant(donorStateDoc.getState() == TenantMigrationDonorStateEnum::kAborted); invariant(donorStateDoc.getCommitOrAbortOpTime()); - auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()); - auto mtab = - mtabByPrefix.getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix()); + auto mtab = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()) + .getTenantMigrationAccessBlockerForDbPrefix(donorStateDoc.getDatabasePrefix()); invariant(mtab); mtab->abort(donorStateDoc.getCommitOrAbortOpTime().get()); } @@ -133,7 +137,7 @@ std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor( executor::makeNetworkInterface(kNetName, nullptr, nullptr)); } -void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson) { +void onWriteToDonorStateDoc(OperationContext* opCtx, const BSONObj& donorStateDocBson) { auto donorStateDoc = TenantMigrationDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), donorStateDocBson); if (donorStateDoc.getExpireAt()) { @@ -142,6 +146,7 @@ void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDoc } else { switch (donorStateDoc.getState()) { case TenantMigrationDonorStateEnum::kDataSync: + onTransitionToDataSync(opCtx, donorStateDoc); break; case TenantMigrationDonorStateEnum::kBlocking: onTransitionToBlocking(opCtx, donorStateDoc); @@ -196,8 +201,8 @@ void checkIfLinearizableReadWasAllowedOrThrow(OperationContext* opCtx, StringDat } void onWriteToDatabase(OperationContext* opCtx, StringData dbName) { - auto& mtabByPrefix = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()); - auto mtab = mtabByPrefix.getTenantMigrationAccessBlockerForDbName(dbName); + auto mtab = TenantMigrationAccessBlockerByPrefix::get(opCtx->getServiceContext()) + .getTenantMigrationAccessBlockerForDbName(dbName); if (mtab) { mtab->checkIfCanWriteOrThrow(); diff --git a/src/mongo/db/repl/tenant_migration_donor_util.h b/src/mongo/db/repl/tenant_migration_donor_util.h index 4d3376fe170..6c47c594621 100644 --- a/src/mongo/db/repl/tenant_migration_donor_util.h +++ b/src/mongo/db/repl/tenant_migration_donor_util.h @@ -51,7 +51,7 @@ std::unique_ptr<executor::TaskExecutor> makeTenantMigrationExecutor(ServiceConte /** * Updates the donor's in-memory migration state to reflect the given persisted state. */ -void onDonorStateDocUpdate(OperationContext* opCtx, const BSONObj& donorStateDocBson); +void onWriteToDonorStateDoc(OperationContext* opCtx, const BSONObj& donorStateDocBson); /** * If the operation has read concern "snapshot" or includes afterClusterTime, and the database is |