author     mathisbessamdb <mathis.bessa@mongodb.com>        2022-02-17 22:10:09 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-02-17 22:47:16 +0000
commit     909df54290217588dfb01b0942f8ba2cac642bac (patch)
tree       a3fa49bcfcf158480934e68a13c8e9c8d2c324ff /src/mongo/db
parent     34308d1aab95d7604384fb0e17591b0c9ccb7a54 (diff)
SERVER-62366 Add forgetShardSplit command
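
For context, a minimal sketch of how the new command could be driven from server-side C++. The command and field names come from the IDL change below; the helper itself is hypothetical and not part of this commit — it simply reuses the DBDirectClient::runCommand pattern the commit itself uses for TTL-index creation:

    // Hypothetical helper (not in this commit): issue forgetShardSplit locally.
    void runForgetShardSplit(OperationContext* opCtx, const UUID& migrationId) {
        DBDirectClient client(opCtx);
        BSONObj result;
        // The parser declares "namespace: ignored", so the command value is unused;
        // migrationId identifies the split whose state document should be expired.
        client.runCommand("admin",
                          BSON("forgetShardSplit" << 1 << "migrationId" << migrationId),
                          result);
        uassertStatusOK(getStatusFromCommandResult(result));
    }
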
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/repl_server_parameters.idl                |  10
-rw-r--r--  src/mongo/db/serverless/shard_split_commands.cpp            |  79
-rw-r--r--  src/mongo/db/serverless/shard_split_commands.idl            |  11
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_op_observer.cpp   | 151
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.cpp       | 113
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.h         |  42
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service_test.cpp  | 146
-rw-r--r--  src/mongo/db/serverless/shard_split_state_machine.idl       |   6
-rw-r--r--  src/mongo/db/serverless/shard_split_utils.cpp               | 114
-rw-r--r--  src/mongo/db/serverless/shard_split_utils.h                 |  56
-rw-r--r--  src/mongo/db/serverless/shard_split_utils_test.cpp          |  23

11 files changed, 667 insertions(+), 84 deletions(-)
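
The first hunk below adds the shardSplitGarbageCollectionDelayMS server parameter (default 15 * 60 * 1000 ms, i.e. 15 minutes) with set_at: [ startup, runtime ], so it should be tunable either way. Illustrative invocations (values are examples, not from this commit):

    mongod --setParameter shardSplitGarbageCollectionDelayMS=60000
    db.adminCommand({ setParameter: 1, shardSplitGarbageCollectionDelayMS: 60000 })
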
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 0531535742c..24e940d0f66 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -632,6 +632,16 @@ server_parameters:
       cpp_varname: allowMultipleArbiters
       default: false
 
+    shardSplitGarbageCollectionDelayMS:
+      description: >-
+        The amount of time in milliseconds that the donor should wait before
+        removing the shard split state document after receiving forgetShardSplit.
+      set_at: [ startup, runtime ]
+      cpp_vartype: AtomicWord<int>
+      cpp_varname: shardSplitGarbageCollectionDelayMS
+      default:
+        expr: 15 * 60 * 1000
+
 feature_flags:
     featureFlagRetryableFindAndModify:
         description: >-
diff --git a/src/mongo/db/serverless/shard_split_commands.cpp b/src/mongo/db/serverless/shard_split_commands.cpp
index 32143d5bb4c..27b010d4ee9 100644
--- a/src/mongo/db/serverless/shard_split_commands.cpp
+++ b/src/mongo/db/serverless/shard_split_commands.cpp
@@ -75,7 +75,7 @@ public:
 
         uassertStatusOK(donorPtr->checkIfOptionsConflict(stateDoc));
 
-        auto state = donorPtr->completionFuture().get(opCtx);
+        auto state = donorPtr->decisionFuture().get(opCtx);
 
         auto response = Response(state.state);
         if (state.abortReason) {
@@ -149,7 +149,7 @@ public:
 
             instance->tryAbort();
 
-            auto state = instance->completionFuture().get(opCtx);
+            auto state = instance->decisionFuture().get(opCtx);
 
             uassert(ErrorCodes::CommandFailed,
                     "Failed to abort shard split",
@@ -192,5 +192,80 @@ public:
     }
 } abortShardSplitCmd;
 
+class ForgetShardSplitCmd : public TypedCommand<ForgetShardSplitCmd> {
+public:
+    using Request = ForgetShardSplit;
+
+    class Invocation : public InvocationBase {
+
+    public:
+        using InvocationBase::InvocationBase;
+
+        void typedRun(OperationContext* opCtx) {
+            uassert(6236600,
+                    "feature \"shard split\" not supported",
+                    repl::feature_flags::gShardSplit.isEnabled(
+                        serverGlobalParams.featureCompatibility));
+
+            const RequestType& cmd = request();
+
+            opCtx->setAlwaysInterruptAtStepDownOrUp();
+
+            auto splitService = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+                                    ->lookupServiceByName(ShardSplitDonorService::kServiceName);
+            auto optionalDonor = ShardSplitDonorService::DonorStateMachine::lookup(
+                opCtx, splitService, BSON("_id" << cmd.getMigrationId()));
+
+            uassert(ErrorCodes::NoSuchTenantMigration,
+                    str::stream() << "Could not find shard split with id " << cmd.getMigrationId(),
+                    optionalDonor);
+
+            auto donorPtr = optionalDonor.get();
+
+            auto decision = donorPtr->decisionFuture().get(opCtx);
+
+            uassert(
+                ErrorCodes::TenantMigrationInProgress,
+                "Could not forget migration with id {} since no decision has been made yet"_format(
+                    cmd.getMigrationId().toString()),
+                decision.state == ShardSplitDonorStateEnum::kCommitted ||
+                    decision.state == ShardSplitDonorStateEnum::kAborted);
+
+            donorPtr->tryForget();
+            donorPtr->completionFuture().get(opCtx);
+        }
+
+    private:
+        void doCheckAuthorization(OperationContext* opCtx) const final {
+            uassert(ErrorCodes::Unauthorized,
+                    "Unauthorized",
+                    AuthorizationSession::get(opCtx->getClient())
+                        ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+                                                           ActionType::runTenantMigration));
+        }
+
+        bool supportsWriteConcern() const override {
+            return true;
+        }
+
+        NamespaceString ns() const {
+            return NamespaceString(request().getDbName(), "");
+        }
+    };
+
+    std::string help() const override {
+        return "Forget the shard split operation.";
+    }
+
+    bool adminOnly() const override {
+        return true;
+    }
+
+    BasicCommand::AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+        return BasicCommand::AllowedOnSecondary::kNever;
+    }
+} forgetShardSplitCmd;
+
+
 }  // namespace
 }  // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_commands.idl b/src/mongo/db/serverless/shard_split_commands.idl
index f7aa6b1cf98..d3b042f408c 100644
--- a/src/mongo/db/serverless/shard_split_commands.idl
+++ b/src/mongo/db/serverless/shard_split_commands.idl
@@ -81,3 +81,14 @@ commands:
             migrationId:
                 description: "Unique identifier for the shard split operation."
                 type: uuid
+
+    forgetShardSplit:
+        description: "Parser for the `forgetShardSplit` command."
+        command_name: forgetShardSplit
+        strict: true
+        namespace: ignored
+        api_version: ""
+        fields:
+            migrationId:
+                description: "Unique identifier for the shard split operation."
+                type: uuid
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index 48c660fb12f..0f40fd7a434 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -44,31 +44,71 @@ const auto tenantIdsToDeleteDecoration =
 ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
     auto donorStateDoc =
         ShardSplitDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), doc);
-    const std::string errmsg = str::stream() << "invalid donor state doc " << doc;
+    const std::string errmsg = "Invalid donor state doc, {}: {}";
 
     switch (donorStateDoc.getState()) {
         case ShardSplitDonorStateEnum::kUninitialized:
             uassert(ErrorCodes::BadValue,
-                    errmsg,
-                    !donorStateDoc.getBlockTimestamp() && !donorStateDoc.getCommitOrAbortOpTime() &&
-                        !donorStateDoc.getAbortReason());
+                    fmt::format(errmsg,
+                                "BlockTimeStamp should not be set in data sync state",
+                                doc.toString()),
+                    !donorStateDoc.getBlockTimestamp());
+            uassert(ErrorCodes::BadValue,
+                    fmt::format(errmsg,
+                                "CommitOrAbortOpTime should not be set in data sync state",
+                                doc.toString()),
+                    !donorStateDoc.getCommitOrAbortOpTime());
+            uassert(ErrorCodes::BadValue,
+                    fmt::format(errmsg,
+                                "Cannot have abortReason while being in data sync state",
+                                doc.toString()),
+                    !donorStateDoc.getAbortReason());
             break;
         case ShardSplitDonorStateEnum::kBlocking:
             uassert(ErrorCodes::BadValue,
-                    errmsg,
-                    donorStateDoc.getBlockTimestamp() && !donorStateDoc.getCommitOrAbortOpTime() &&
-                        !donorStateDoc.getAbortReason());
+                    fmt::format(errmsg,
+                                "Missing blockTimeStamp while being in blocking state",
+                                doc.toString()),
+                    donorStateDoc.getBlockTimestamp());
+            uassert(
+                ErrorCodes::BadValue,
+                fmt::format(errmsg,
+                            "CommitOrAbortOpTime shouldn't be set while being in blocking state",
+                            doc.toString()),
+                !donorStateDoc.getCommitOrAbortOpTime());
+            uassert(ErrorCodes::BadValue,
+                    fmt::format(errmsg,
+                                "Cannot have an abortReason while being in blocking state",
+                                doc.toString()),
+                    !donorStateDoc.getAbortReason());
             break;
         case ShardSplitDonorStateEnum::kCommitted:
             uassert(ErrorCodes::BadValue,
-                    errmsg,
-                    donorStateDoc.getBlockTimestamp() && donorStateDoc.getCommitOrAbortOpTime() &&
-                        !donorStateDoc.getAbortReason());
+                    fmt::format(errmsg,
+                                "Missing blockTimeStamp while being in committed state",
+                                doc.toString()),
+                    donorStateDoc.getBlockTimestamp());
+            uassert(ErrorCodes::BadValue,
+                    fmt::format(errmsg,
+                                "Missing CommitOrAbortOpTime while being in committed state",
+                                doc.toString()),
+                    donorStateDoc.getCommitOrAbortOpTime());
+            uassert(ErrorCodes::BadValue,
+                    fmt::format(errmsg,
+                                "Cannot have abortReason while being in committed state",
+                                doc.toString()),
+                    !donorStateDoc.getAbortReason());
             break;
         case ShardSplitDonorStateEnum::kAborted:
             uassert(ErrorCodes::BadValue,
-                    errmsg,
-                    donorStateDoc.getAbortReason() && donorStateDoc.getCommitOrAbortOpTime());
+                    fmt::format(
+                        errmsg, "Missing abortReason while being in aborted state", doc.toString()),
+                    donorStateDoc.getAbortReason());
+            uassert(ErrorCodes::BadValue,
+                    fmt::format(errmsg,
+                                "Missing CommitOrAbortOpTime while being in aborted state",
+                                doc.toString()),
+                    donorStateDoc.getCommitOrAbortOpTime());
             break;
         default:
             MONGO_UNREACHABLE;
@@ -95,7 +135,7 @@ void onBlockerInitialization(OperationContext* opCtx,
 
     auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
     auto recipientConnectionString =
-        repl::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
+        serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
 
     for (const auto& tenantId : optionalTenants.get()) {
         auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
@@ -125,23 +165,26 @@ void onTransitionToBlocking(OperationContext* opCtx, const ShardSplitDonorDocume
     invariant(donorStateDoc.getBlockTimestamp());
     invariant(donorStateDoc.getTenantIds());
 
-    auto tenantIds = donorStateDoc.getTenantIds().get();
-    for (auto tenantId : tenantIds) {
-        auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
-            opCtx->getServiceContext(), tenantId);
-        invariant(mtab);
-
-        if (!opCtx->writesAreReplicated()) {
-            // A primary calls startBlockingWrites on the TenantMigrationDonorAccessBlocker before
-            // reserving the OpTime for the "start blocking" write, so only secondaries call
-            // startBlockingWrites on the TenantMigrationDonorAccessBlocker in the op observer.
-            mtab->startBlockingWrites();
+    if (donorStateDoc.getTenantIds()) {
+        auto tenantIds = donorStateDoc.getTenantIds().get();
+        for (auto tenantId : tenantIds) {
+            auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
+                opCtx->getServiceContext(), tenantId);
+            invariant(mtab);
+
+            if (!opCtx->writesAreReplicated()) {
+                // A primary calls startBlockingWrites on the TenantMigrationDonorAccessBlocker
+                // before reserving the OpTime for the "start blocking" write, so only secondaries
+                // call startBlockingWrites on the TenantMigrationDonorAccessBlocker in the op
+                // observer.
+                mtab->startBlockingWrites();
+            }
+
+            // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since
+            // startBlockingReadsAfter just needs to be called before the "start blocking" write's
+            // oplog hole is filled.
+            mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get());
         }
-
-        // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since
-        // startBlockingReadsAfter just needs to be called before the "start blocking" write's oplog
-        // hole is filled.
-        mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get());
     }
 }
@@ -201,6 +244,51 @@ public:
         : _opCtx(opCtx), _donorStateDoc(std::move(donorStateDoc)) {}
 
     void commit(boost::optional<Timestamp>) override {
+        if (_donorStateDoc.getExpireAt()) {
+            if (_donorStateDoc.getTenantIds()) {
+                auto tenantIds = _donorStateDoc.getTenantIds().get();
+                for (auto tenantId : tenantIds) {
+                    auto mtab =
+                        tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
+                            _opCtx->getServiceContext(), tenantId);
+
+                    if (!mtab) {
+                        // The state doc and TenantMigrationDonorAccessBlocker for this migration
+                        // were removed immediately after expireAt was set. This is unlikely to
+                        // occur in production where the garbage collection delay should be
+                        // sufficiently large.
+                        continue;
+                    }
+
+                    if (!_opCtx->writesAreReplicated()) {
+                        // Setting expireAt implies that the TenantMigrationDonorAccessBlocker for
+                        // this migration will be removed shortly after this. However, a lagged
+                        // secondary might not manage to advance its majority commit point past the
+                        // migration commit or abort opTime and consequently transition out of the
+                        // blocking state before the TenantMigrationDonorAccessBlocker is removed.
+                        // When this occurs, blocked reads or writes will be left waiting for the
+                        // migration decision indefinitely. To avoid that, notify the
+                        // TenantMigrationDonorAccessBlocker here that the commit or abort opTime
+                        // has been majority committed (guaranteed to be true since by design the
+                        // donor never marks its state doc as garbage collectable before the
+                        // migration decision is majority committed).
+                        mtab->onMajorityCommitPointUpdate(
+                            _donorStateDoc.getCommitOrAbortOpTime().get());
+                    }
+
+                    if (_donorStateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
+                        invariant(mtab->inStateAborted());
+                        // The migration durably aborted and is now marked as garbage collectable,
+                        // remove its TenantMigrationDonorAccessBlocker right away to allow
+                        // back-to-back migration retries.
+                        TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext())
+                            .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
+                    }
+                }
+            }
+            return;
+        }
+
         switch (_donorStateDoc.getState()) {
             case ShardSplitDonorStateEnum::kCommitted:
                 onTransitionToCommitted(_opCtx, _donorStateDoc);
@@ -286,6 +374,11 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
     }
 
     auto donorStateDoc = parseAndValidateDonorDocument(doc);
+    uassert(ErrorCodes::IllegalOperation,
+            str::stream() << "cannot delete a donor's state document " << doc
+                          << " since it has not been marked as garbage collectable.",
+            donorStateDoc.getExpireAt());
+
     if (donorStateDoc.getTenantIds()) {
         auto tenantIds = *donorStateDoc.getTenantIds();
         std::vector<std::string> result;
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 8c346a643ac..bd0cbf32b44 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -34,6 +34,7 @@
 #include "mongo/client/streamable_replica_set_monitor.h"
 #include "mongo/db/catalog_raii.h"
 #include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
 #include "mongo/db/dbhelpers.h"
 #include "mongo/db/repl/repl_client_info.h"
 #include "mongo/db/repl/tenant_migration_access_blocker_util.h"
@@ -107,6 +108,8 @@ MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeBlocking);
 MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking);
 MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance);
 
+const std::string kTTLIndexName = "ShardSplitDonorTTLIndex";
+
 }  // namespace
 
 namespace detail {
@@ -161,7 +164,7 @@ SemiFuture<void> makeRecipientAcceptSplitFuture(ExecutorPtr executor,
     auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
     invariant(replCoord);
 
-    auto recipientConnectionString = repl::makeRecipientConnectionString(
+    auto recipientConnectionString = serverless::makeRecipientConnectionString(
         replCoord->getConfig(), recipientTagName, recipientSetName);
     auto monitor = ReplicaSetMonitor::createIfNeeded(MongoURI{recipientConnectionString});
     invariant(monitor);
@@ -201,6 +204,36 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ShardSplitDonorService::cons
         ShardSplitDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), initialState));
 }
 
+ExecutorFuture<void> ShardSplitDonorService::_createStateDocumentTTLIndex(
+    std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
+    return AsyncTry([this] {
+               auto nss = getStateDocumentsNS();
+
+               AllowOpCtxWhenServiceRebuildingBlock allowOpCtxBlock(Client::getCurrent());
+               auto opCtxHolder = cc().makeOperationContext();
+               auto opCtx = opCtxHolder.get();
+               DBDirectClient client(opCtx);
+
+               BSONObj result;
+               client.runCommand(
+                   nss.db().toString(),
+                   BSON("createIndexes"
+                        << nss.coll().toString() << "indexes"
+                        << BSON_ARRAY(BSON("key" << BSON("expireAt" << 1) << "name" << kTTLIndexName
+                                                 << "expireAfterSeconds" << 0))),
+                   result);
+               uassertStatusOK(getStatusFromCommandResult(result));
+           })
+        .until([](Status status) { return status.isOK(); })
+        .withBackoffBetweenIterations(kExponentialBackoff)
+        .on(**executor, token);
+}
+
+ExecutorFuture<void> ShardSplitDonorService::_rebuildService(
+    std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
+    return _createStateDocumentTTLIndex(executor, token);
+}
+
 ShardSplitDonorService::DonorStateMachine::DonorStateMachine(
     ServiceContext* serviceContext,
     ShardSplitDonorService* splitService,
@@ -227,6 +260,17 @@ void ShardSplitDonorService::DonorStateMachine::tryAbort() {
     }
 }
 
+void ShardSplitDonorService::DonorStateMachine::tryForget() {
+    LOGV2(6236601, "Forgetting shard split", "id"_attr = _migrationId);
+
+    stdx::lock_guard<Latch> lg(_mutex);
+    if (_forgetShardSplitReceivedPromise.getFuture().isReady()) {
+        LOGV2(6236602, "Donor Forget Migration promise is already ready", "id"_attr = _migrationId);
+        return;
+    }
+    _forgetShardSplitReceivedPromise.emplaceValue();
+}
+
 Status ShardSplitDonorService::DonorStateMachine::checkIfOptionsConflict(
     const ShardSplitDonorDocument& stateDoc) const {
     stdx::lock_guard<Latch> lg(_mutex);
@@ -266,13 +310,12 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
           "id"_attr = _migrationId,
           "timeout"_attr = repl::shardSplitTimeoutMS.load());
 
-    _completionPromise.setWith([&] {
+    _decisionPromise.setWith([&] {
         return ExecutorFuture(**executor)
             .then([this, executor, primaryToken] {
                 // Note we do not use the abort split token here because the abortShardSplit
                 // command waits for a decision to be persisted which will not happen if
                 // inserting the initial state document fails.
-
                 return _writeInitialDocument(executor, primaryToken);
             })
             .then([this] { pauseShardSplitBeforeBlocking.pauseWhileSet(); })
@@ -310,7 +353,17 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
             .unsafeToInlineFuture();
     });
 
-    return _completionPromise.getFuture().semi().ignoreValue();
+    _completionPromise.setFrom(
+        _decisionPromise.getFuture()
+            .semi()
+            .ignoreValue()
+            .thenRunOn(**executor)
+            .then([this, anchor = shared_from_this(), executor, primaryToken] {
+                return _waitForForgetCmdThenMarkGarbageCollectible(executor, primaryToken);
+            })
+            .unsafeToInlineFuture());
+
+    return _completionPromise.getFuture().semi();
 }
 
 void ShardSplitDonorService::DonorStateMachine::interrupt(Status status) {}
@@ -361,7 +414,7 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestamp
     invariant(_stateDoc.getRecipientTagName());
     auto recipientTagName = *_stateDoc.getRecipientTagName();
-    auto recipientNodes = getRecipientMembers(replCoord->getConfig(), recipientTagName);
+    auto recipientNodes = serverless::getRecipientMembers(replCoord->getConfig(), recipientTagName);
 
     WriteConcernOptions writeConcern;
     writeConcern.w = WTags{{recipientTagName.toString(), recipientNodes.size()}};
@@ -667,4 +720,54 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
         return DurableState{_stateDoc.getState(), _abortReason};
     });
 }
+
+ExecutorFuture<repl::OpTime>
+ShardSplitDonorService::DonorStateMachine::_markStateDocAsGarbageCollectable(
+    std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
+    stdx::lock_guard<Latch> lg(_mutex);
+
+    _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
+                          Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
+
+    return AsyncTry([this, self = shared_from_this()] {
+               auto opCtxHolder = cc().makeOperationContext();
+               auto opCtx = opCtxHolder.get();
+
+               uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
+               return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+           })
+        .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+        .withBackoffBetweenIterations(kExponentialBackoff)
+        .on(**executor, token);
+}
+
+ExecutorFuture<void>
+ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectible(
+    const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
+    LOGV2(6236603,
+          "Waiting to receive 'forgetShardSplit' command.",
+          "migrationId"_attr = _migrationId);
+    auto expiredAt = [&]() {
+        stdx::lock_guard<Latch> lg(_mutex);
+        return _stateDoc.getExpireAt();
+    }();
+
+    if (expiredAt) {
+        LOGV2(6236604, "expiredAt is already set", "migrationId"_attr = _migrationId);
+        return ExecutorFuture(**executor);
+    }
+
+    return std::move(_forgetShardSplitReceivedPromise.getFuture())
+        .thenRunOn(**executor)
+        .then([this, self = shared_from_this(), executor, token] {
+            LOGV2(6236606,
+                  "Marking shard split as garbage-collectable.",
+                  "migrationId"_attr = _migrationId);
+            return _markStateDocAsGarbageCollectable(executor, token);
+        })
+        .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+            return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
+        });
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h
index ff42a57147e..58a2a984703 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.h
+++ b/src/mongo/db/serverless/shard_split_donor_service.h
@@ -80,6 +80,12 @@ protected:
     std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override;
 
 private:
+    ExecutorFuture<void> _createStateDocumentTTLIndex(
+        std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token);
+
+    ExecutorFuture<void> _rebuildService(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+                                         const CancellationToken& token) override;
+
     ServiceContext* const _serviceContext;
 };
 
@@ -103,6 +109,12 @@ public:
      */
     void tryAbort();
 
+    /**
+     * Try to forget the shard split operation. If the operation is not in a final state, the
+     * promise will be set but the garbage collection will be skipped.
+     */
+    void tryForget();
+
     Status checkIfOptionsConflict(const ShardSplitDonorDocument& stateDoc) const;
 
     SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
@@ -110,7 +122,11 @@ public:
 
     void interrupt(Status status) override;
 
-    SharedSemiFuture<DurableState> completionFuture() const {
+    SharedSemiFuture<DurableState> decisionFuture() const {
+        return _decisionPromise.getFuture();
+    }
+
+    SharedSemiFuture<void> completionFuture() const {
         return _completionPromise.getFuture();
     }
 
@@ -129,6 +145,14 @@ public:
         MongoProcessInterface::CurrentOpConnectionsMode connMode,
         MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override;
 
+    /**
+     * Returns true if the state doc was marked to expire (marked garbage collectable).
+     */
+    bool isGarbageCollectable() const {
+        stdx::lock_guard<Latch> lg(_mutex);
+        return !!_stateDoc.getExpireAt();
+    }
+
 private:
     // Tasks
     ExecutorFuture<void> _enterBlockingState(const ScopedTaskExecutorPtr& executor,
@@ -164,6 +188,12 @@ private:
         const CancellationToken& instanceAbortToken,
         const CancellationToken& abortToken);
 
+    ExecutorFuture<repl::OpTime> _markStateDocAsGarbageCollectable(
+        std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token);
+
+    ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectible(
+        const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
+
 private:
     const NamespaceString _stateDocumentsNS = NamespaceString::kTenantSplitDonorsNamespace;
     mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardSplitDonorService::_mutex");
@@ -186,11 +216,17 @@ private:
     // A promise fulfilled when the replicaSetMonitor has been created;
     SharedPromise<void> _replicaSetMonitorCreatedPromise;
 
-    // A promise fulfilled when the shard split operation has fully completed
-    SharedPromise<DurableState> _completionPromise;
+    // A promise fulfilled when the shard split has committed or aborted.
+    SharedPromise<DurableState> _decisionPromise;
+
+    // A promise fulfilled when the shard split operation has fully completed.
+    SharedPromise<void> _completionPromise;
 
     // A promise fulfilled when all recipient nodes have accepted the split.
     SharedPromise<void> _recipientAcceptedSplit;
+
+    // A promise fulfilled when tryForget is called.
+    SharedPromise<void> _forgetShardSplitReceivedPromise;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 7dc4bccd866..8a979fbff5c 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -36,6 +36,7 @@
 #include "mongo/client/streamable_replica_set_monitor_for_testing.h"
 #include "mongo/db/catalog/database_holder_mock.h"
 #include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
 #include "mongo/db/op_observer_impl.h"
 #include "mongo/db/op_observer_registry.h"
 #include "mongo/db/repl/drop_pending_collection_reaper.h"
@@ -49,6 +50,7 @@
 #include "mongo/db/serverless/shard_split_donor_service.h"
 #include "mongo/db/serverless/shard_split_state_machine_gen.h"
 #include "mongo/db/serverless/shard_split_test_utils.h"
+#include "mongo/db/serverless/shard_split_utils.h"
 #include "mongo/db/service_context_d_test_fixture.h"
 #include "mongo/dbtests/mock/mock_conn_registry.h"
 #include "mongo/dbtests/mock/mock_replica_set.h"
@@ -62,6 +64,7 @@
 #include "mongo/idl/server_parameter_test_util.h"
 #include "mongo/logv2/log.h"
 #include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/unittest/death_test.h"
 #include "mongo/unittest/log_test.h"
 #include "mongo/unittest/unittest.h"
 #include "mongo/util/clock_source_mock.h"
@@ -108,6 +111,27 @@ std::ostringstream& operator<<(std::ostringstream& builder,
     return builder;
 }
 
+void fastForwardCommittedSnapshotOpTime(
+    std::shared_ptr<ShardSplitDonorService::DonorStateMachine> instance,
+    ServiceContext* serviceContext,
+    OperationContext* opCtx,
+    const UUID& uuid) {
+    // When a state document is transitioned to kAborted, the ShardSplitDonorOpObserver will
+    // transition tenant access blockers to a kAborted state if, and only if, the abort timestamp
+    // is less than or equal to the currentCommittedSnapshotOpTime. Since we are using the
+    // ReplicationCoordinatorMock, we must manually manage the currentCommittedSnapshotOpTime
+    // using this method.
+    auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
+        repl::ReplicationCoordinator::get(serviceContext));
+
+    auto foundStateDoc = uassertStatusOK(serverless::getStateDocument(opCtx, uuid));
+    invariant(foundStateDoc.getCommitOrAbortOpTime());
+
+    replCoord->setCurrentCommittedSnapshotOpTime(*foundStateDoc.getCommitOrAbortOpTime());
+    serviceContext->getOpObserver()->onMajorityCommitPointUpdate(
+        serviceContext, *foundStateDoc.getCommitOrAbortOpTime());
+}
+
 class ShardSplitDonorServiceTest : public repl::PrimaryOnlyServiceMongoDTest {
 public:
     void setUp() override {
@@ -170,7 +194,7 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
     ASSERT(serviceInstance.get());
     ASSERT_EQ(_uuid, serviceInstance->getId());
 
-    auto completionFuture = serviceInstance->completionFuture();
+    auto decisionFuture = serviceInstance->decisionFuture();
 
     std::shared_ptr<TopologyDescription> topologyDescriptionOld =
         std::make_shared<sdam::TopologyDescription>(sdam::SdamConfiguration());
@@ -189,18 +213,24 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
 
     publisher->onTopologyDescriptionChangedEvent(topologyDescriptionOld, topologyDescriptionNew);
 
-    completionFuture.wait();
+    decisionFuture.wait();
 
-    auto result = completionFuture.get();
+    auto result = decisionFuture.get();
     ASSERT(!result.abortReason);
     ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted);
+
+    serviceInstance->tryForget();
+
+    ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+    ASSERT_TRUE(serviceInstance->isGarbageCollectable());
 }
 
 TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) {
     auto opCtx = makeOperationContext();
+    auto serviceContext = getServiceContext();
     test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
     test::shard_split::reconfigToAddRecipientNodes(
-        getServiceContext(), _recipientTagName, _replSet.getHosts());
+        serviceContext, _recipientTagName, _replSet.getHosts());
 
     auto stateDocument = defaultStateDocument();
@@ -213,12 +243,18 @@ TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) {
     ASSERT(serviceInstance.get());
     ASSERT_EQ(_uuid, serviceInstance->getId());
 
-    auto completionFuture = serviceInstance->completionFuture();
+    auto decisionFuture = serviceInstance->decisionFuture();
 
-    auto result = completionFuture.get();
+    auto result = decisionFuture.get();
 
     ASSERT(result.abortReason);
     ASSERT_EQ(result.abortReason->code(), ErrorCodes::ExceededTimeLimit);
+
+    fastForwardCommittedSnapshotOpTime(serviceInstance, serviceContext, opCtx.get(), _uuid);
+    serviceInstance->tryForget();
+
+    ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+    ASSERT_TRUE(serviceInstance->isGarbageCollectable());
 }
 
 // Abort scenario : abortSplit called before startSplit.
@@ -234,20 +270,26 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortState) {
             opCtx.get(), _service, stateDocument.toBSON());
     ASSERT(serviceInstance.get());
 
-    auto result = serviceInstance->completionFuture().get(opCtx.get());
+    auto result = serviceInstance->decisionFuture().get(opCtx.get());
 
     ASSERT(!!result.abortReason);
     ASSERT_EQ(result.abortReason->code(), ErrorCodes::TenantMigrationAborted);
     ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);
+
+    serviceInstance->tryForget();
+
+    ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+    ASSERT_TRUE(serviceInstance->isGarbageCollectable());
 }
 
 // Abort scenario : instance created through startSplit then calling abortSplit.
 TEST_F(ShardSplitDonorServiceTest, CreateInstanceThenAbort) {
     auto opCtx = makeOperationContext();
+    auto serviceContext = getServiceContext();
     test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
     test::shard_split::reconfigToAddRecipientNodes(
-        getServiceContext(), _recipientTagName, _replSet.getHosts());
+        serviceContext, _recipientTagName, _replSet.getHosts());
 
     std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance;
     {
@@ -262,11 +304,18 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceThenAbort) {
 
         serviceInstance->tryAbort();
     }
-    auto result = serviceInstance->completionFuture().get(opCtx.get());
+
+    auto result = serviceInstance->decisionFuture().get(opCtx.get());
 
     ASSERT(!!result.abortReason);
     ASSERT_EQ(result.abortReason->code(), ErrorCodes::TenantMigrationAborted);
     ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);
+
+    fastForwardCommittedSnapshotOpTime(serviceInstance, serviceContext, opCtx.get(), _uuid);
+    serviceInstance->tryForget();
+
+    ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+    ASSERT_TRUE(serviceInstance->isGarbageCollectable());
 }
 
 TEST_F(ShardSplitDonorServiceTest, StepDownTest) {
@@ -290,9 +339,76 @@ TEST_F(ShardSplitDonorServiceTest, StepDownTest) {
         stepDown();
     }
 
-    auto result = serviceInstance->completionFuture().getNoThrow();
+    auto result = serviceInstance->decisionFuture().getNoThrow();
     ASSERT_FALSE(result.isOK());
     ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.getStatus());
+
+    ASSERT_EQ(serviceInstance->completionFuture().getNoThrow(),
+              ErrorCodes::InterruptedDueToReplStateChange);
+    ASSERT_FALSE(serviceInstance->isGarbageCollectable());
+}
+
+// TODO(SERVER-62363) : Remove this test once the ticket is completed.
+DEATH_TEST_F(ShardSplitDonorServiceTest, StartWithExpireAtAlreadySet, "invariant") {
+    auto opCtx = makeOperationContext();
+
+    test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
+
+    auto stateDocument = defaultStateDocument();
+    stateDocument.setState(ShardSplitDonorStateEnum::kCommitted);
+
+    auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+        opCtx.get(), _service, stateDocument.toBSON());
+
+    ASSERT(serviceInstance.get());
+
+    // this triggers an invariant updateResult.numDocsModified == 1 in _updateStateDocument when
+    // going in the _handleErrorOrEnterAbortedState.
+    auto result = serviceInstance->decisionFuture().get(opCtx.get());
+}
+
+TEST_F(ShardSplitDonorServiceTest, StepUpWithkCommitted) {
+    auto opCtx = makeOperationContext();
+
+    test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
+    test::shard_split::reconfigToAddRecipientNodes(
+        getServiceContext(), _recipientTagName, _replSet.getHosts());
+
+    auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+    auto stateDocument = defaultStateDocument();
+    stateDocument.setState(ShardSplitDonorStateEnum::kUninitialized);
+
+    // insert the document for the first time.
+    ASSERT_OK(serverless::insertStateDoc(opCtx.get(), stateDocument));
+
+    {
+        // update to kCommitted
+        stateDocument.setState(ShardSplitDonorStateEnum::kCommitted);
+        boost::optional<mongo::Date_t> expireAt = getServiceContext()->getFastClockSource()->now() +
+            Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()};
+        stateDocument.setExpireAt(expireAt);
+        stateDocument.setBlockTimestamp(Timestamp(1, 1));
+        stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1, 1), 1));
+
+        ASSERT_OK(serverless::updateStateDoc(opCtx.get(), stateDocument));
+
+        auto foundStateDoc = uassertStatusOK(serverless::getStateDocument(opCtx.get(), _uuid));
+        invariant(foundStateDoc.getExpireAt());
+        ASSERT_EQ(*foundStateDoc.getExpireAt(), *expireAt);
+    }
+
+    auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+        opCtx.get(), _service, stateDocument.toBSON());
+    ASSERT(serviceInstance.get());
+
+    auto result = serviceInstance->decisionFuture().get();
+    ASSERT(!result.abortReason);
+    ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted);
+
+    // we don't need to call tryForget since expireAt is already set the completionPromise will
+    // complete.
+    ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+    ASSERT_TRUE(serviceInstance->isGarbageCollectable());
 }
 
 TEST_F(ShardSplitDonorServiceTest, TimeoutAbortsAwaitReplication) {
@@ -327,10 +443,16 @@ TEST_F(ShardSplitDonorServiceTest, TimeoutAbortsAwaitReplication) {
         });
     }
 
-    uassertStatusOK(serviceInstance->completionFuture().getNoThrow());
-    auto result = serviceInstance->completionFuture().get(opCtx.get());
+    uassertStatusOK(serviceInstance->decisionFuture().getNoThrow());
+    auto result = serviceInstance->decisionFuture().get(opCtx.get());
 
     ASSERT(result.abortReason);
     ASSERT_EQ(result.abortReason->code(), ErrorCodes::ExceededTimeLimit);
+
+    fastForwardCommittedSnapshotOpTime(serviceInstance, getServiceContext(), opCtx.get(), _uuid);
+    serviceInstance->tryForget();
+
+    ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+    ASSERT_TRUE(serviceInstance->isGarbageCollectable());
 }
 
 class SplitReplicaSetObserverTest : public ServiceContextTest {
diff --git a/src/mongo/db/serverless/shard_split_state_machine.idl b/src/mongo/db/serverless/shard_split_state_machine.idl
index 2140427732c..81cef079bff 100644
--- a/src/mongo/db/serverless/shard_split_state_machine.idl
+++ b/src/mongo/db/serverless/shard_split_state_machine.idl
@@ -85,3 +85,9 @@ structs:
                 type: object_owned
                 description: "The error that caused the split to abort."
                 optional: true
+            expireAt:
+                type: date
+                description: >-
+                    The wall-clock time at which the state machine document should be
+                    removed by the TTL monitor.
+                optional: true
diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp
index 35c138122b3..c1d6afcb913 100644
--- a/src/mongo/db/serverless/shard_split_utils.cpp
+++ b/src/mongo/db/serverless/shard_split_utils.cpp
@@ -28,17 +28,23 @@
  */
 
 #include "mongo/db/serverless/shard_split_utils.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/lock_manager_defs.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
 #include "mongo/db/repl/repl_set_config.h"
 
 namespace mongo {
-namespace repl {
-std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config,
-                                              const StringData& recipientTagName) {
-    std::vector<MemberConfig> result;
+
+namespace serverless {
+std::vector<repl::MemberConfig> getRecipientMembers(const repl::ReplSetConfig& config,
+                                                    const StringData& recipientTagName) {
+    std::vector<repl::MemberConfig> result;
     const auto& tagConfig = config.getTagConfig();
     for (auto member : config.members()) {
         auto matchesTag =
-            std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const ReplSetTag& tag) {
+            std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const repl::ReplSetTag& tag) {
                 return tagConfig.getTagKey(tag) == recipientTagName;
             });
 
@@ -51,7 +57,7 @@ std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config,
 }
 
 
-ConnectionString makeRecipientConnectionString(const ReplSetConfig& config,
+ConnectionString makeRecipientConnectionString(const repl::ReplSetConfig& config,
                                                const StringData& recipientTagName,
                                                const StringData& recipientSetName) {
     auto recipientMembers = getRecipientMembers(config, recipientTagName);
@@ -59,14 +65,14 @@ ConnectionString makeRecipientConnectionString(const ReplSetConfig& config,
     std::transform(recipientMembers.cbegin(),
                    recipientMembers.cend(),
                    std::back_inserter(recipientNodes),
-                   [](const MemberConfig& member) { return member.getHostAndPort(); });
+                   [](const repl::MemberConfig& member) { return member.getHostAndPort(); });
 
     return ConnectionString::forReplicaSet(recipientSetName.toString(), recipientNodes);
 }
 
-ReplSetConfig makeSplitConfig(const ReplSetConfig& config,
-                              const std::string& recipientSetName,
-                              const std::string& recipientTagName) {
+repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config,
+                                    const std::string& recipientSetName,
+                                    const std::string& recipientTagName) {
     dassert(!recipientSetName.empty() && recipientSetName != config.getReplSetName());
     uassert(6201800,
             "We can not make a split config of an existing split config.",
@@ -77,7 +83,7 @@ ReplSetConfig makeSplitConfig(const ReplSetConfig& config,
     int donorIndex = 0, recipientIndex = 0;
     for (const auto& member : config.members()) {
         bool isRecipient =
-            std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const ReplSetTag& tag) {
+            std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const repl::ReplSetTag& tag) {
                 return tagConfig.getTagKey(tag) == recipientTagName;
             });
         if (isRecipient) {
@@ -114,7 +120,89 @@ ReplSetConfig makeSplitConfig(const ReplSetConfig& config,
     splitConfigBob.append("members", donorMembers);
     splitConfigBob.append("recipientConfig", recipientConfigBob.obj());
 
-    return ReplSetConfig::parse(splitConfigBob.obj());
+    return repl::ReplSetConfig::parse(splitConfigBob.obj());
+}
+
+Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc) {
+    const auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+    AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+    uassert(ErrorCodes::PrimarySteppedDown,
+            str::stream() << "No longer primary while attempting to insert shard split"
+                             " state document",
+            repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
+
+    return writeConflictRetry(opCtx, "insertShardSplitStateDoc", nss.ns(), [&]() -> Status {
+        const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName
+                                 << stateDoc.getId() << ShardSplitDonorDocument::kExpireAtFieldName
+                                 << BSON("$exists" << false));
+        const auto updateMod = BSON("$setOnInsert" << stateDoc.toBSON());
+        auto updateResult =
+            Helpers::upsert(opCtx, nss.ns(), filter, updateMod, /*fromMigrate=*/false);
+
+        invariant(!updateResult.numDocsModified);
+        if (updateResult.upsertedId.isEmpty()) {
+            return {ErrorCodes::ConflictingOperationInProgress,
+                    str::stream() << "Failed to insert the shard split state doc: "
+                                  << stateDoc.toBSON()};
+        }
+        return Status::OK();
+    });
+}
+
+Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc) {
+    const auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+    AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+    if (!collection) {
+        return Status(ErrorCodes::NamespaceNotFound,
+                      str::stream() << nss.ns() << " does not exist");
+    }
+
+    return writeConflictRetry(opCtx, "updateShardSplitStateDoc", nss.ns(), [&]() -> Status {
+        auto updateResult =
+            Helpers::upsert(opCtx, nss.ns(), stateDoc.toBSON(), /*fromMigrate=*/false);
+        if (updateResult.numMatched == 0) {
+            return {ErrorCodes::NoSuchKey,
+                    str::stream() << "Existing shard split state document not found for id: "
+                                  << stateDoc.getId()};
+        }
+
+        return Status::OK();
+    });
+}
+
+StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx,
+                                                     const UUID& shardSplitId) {
+    // Read the most up to date data.
+    ReadSourceScope readSourceScope(opCtx, RecoveryUnit::ReadSource::kNoTimestamp);
+    AutoGetCollectionForRead collection(opCtx, NamespaceString::kTenantSplitDonorsNamespace);
+    if (!collection) {
+        return Status(ErrorCodes::NamespaceNotFound,
+                      str::stream() << "Collection not found looking for state document: "
+                                    << NamespaceString::kTenantSplitDonorsNamespace.ns());
+    }
+
+    BSONObj result;
+    auto foundDoc = Helpers::findOne(
+        opCtx, collection.getCollection(), BSON("_id" << shardSplitId), result, true);
+
+    if (!foundDoc) {
+        return Status(ErrorCodes::NoMatchingDocument,
+                      str::stream()
+                          << "No matching state doc found with shard split id: " << shardSplitId);
+    }
+
+    try {
+        return ShardSplitDonorDocument::parse(IDLParserErrorContext("shardSplitStateDocument"),
+                                              result);
+    } catch (DBException& ex) {
+        return ex.toStatus(str::stream()
+                           << "Invalid BSON found for matching document with shard split id: "
+                           << shardSplitId << " , res: " << result);
+    }
 }
-}  // namespace repl
+
+
+}  // namespace serverless
 }  // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h
index ce3a4e6783f..99060abfa5c 100644
--- a/src/mongo/db/serverless/shard_split_utils.h
+++ b/src/mongo/db/serverless/shard_split_utils.h
@@ -30,16 +30,16 @@
 #pragma once
 
 #include "mongo/db/repl/repl_set_config.h"
+#include "mongo/db/serverless/shard_split_state_machine_gen.h"
 
 namespace mongo {
-namespace repl {
-
+namespace serverless {
 /**
  * @returns A list of `MemberConfig` for member nodes which match a provided replica set tag name
 */
-std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config,
-                                              const StringData& recipientTagName);
+std::vector<repl::MemberConfig> getRecipientMembers(const repl::ReplSetConfig& config,
+                                                    const StringData& recipientTagName);
 
 
 /**
@@ -47,7 +47,7 @@ std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config,
 * `recipientTagName`. The `recipientSetName` is the `replSet` parameter of the recipient
 * connection string.
 */
-ConnectionString makeRecipientConnectionString(const ReplSetConfig& config,
+ConnectionString makeRecipientConnectionString(const repl::ReplSetConfig& config,
                                                const StringData& recipientTagName,
                                                const StringData& recipientSetName);
 
@@ -57,9 +57,47 @@ ConnectionString makeRecipientConnectionString(const ReplSetConfig& config,
 * to filter the local member list for recipient nodes. The `recipientSetName` is used to validate
 * that we are indeed generating a config for a recipient set with a new name.
 */
-ReplSetConfig makeSplitConfig(const ReplSetConfig& config,
-                              const std::string& recipientSetName,
-                              const std::string& recipientTagName);
+repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config,
+                                    const std::string& recipientSetName,
+                                    const std::string& recipientTagName);
+
+/**
+ * Inserts the shard split state document 'stateDoc' into
+ * 'config.tenantSplitDonors' collection. Also, creates the collection if not present
+ * before inserting the document.
+ *
+ * NOTE: A state doc might get inserted based on a decision made out of a stale read within a
+ * storage transaction. Callers are expected to have their own concurrency mechanism to handle
+ * write skew problem.
+ *
+ * @Returns 'ConflictingOperationInProgress' error code if an active shard split op found for the
+ * given state doc id provided in the 'stateDoc'.
+ *
+ * Throws 'DuplicateKey' error code if a document already exists on the disk with the same
+ * shardSplitId, irrespective of the document marked for garbage collect or not.
+ */
+Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc);
+
+/**
+ * Updates the shard split state doc in the database.
+ *
+ * Returns 'NoSuchKey' error code if no state document already exists on the disk with the same
+ * shardSplitId.
+ */
+Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc);
+
+/**
+ * Returns the state doc matching the document with shardSplitId from the disk if it
+ * exists. Reads at "no" timestamp i.e, reading with the "latest" snapshot reflecting up to date
+ * data.
+ *
+ * If the stored state doc on disk contains invalid BSON, the 'InvalidBSON' error code is
+ * returned.
+ *
+ * Returns 'NoMatchingDocument' error code if no document with 'shardSplitId' is found.
+ */
+StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx,
+                                                     const UUID& shardSplitId);
 
-}  // namespace repl
+}  // namespace serverless
 }  // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_utils_test.cpp b/src/mongo/db/serverless/shard_split_utils_test.cpp
index 12994379de3..1d24b091090 100644
--- a/src/mongo/db/serverless/shard_split_utils_test.cpp
+++ b/src/mongo/db/serverless/shard_split_utils_test.cpp
@@ -82,7 +82,7 @@ TEST(MakeSplitConfig, toBSONRoundTripAbility) {
         << "recipientConfig" << resultRecipientConfigBSON);
 
     const ReplSetConfig splitConfigResult =
-        repl::makeSplitConfig(configA, recipientConfigSetName, recipientTagName);
+        serverless::makeSplitConfig(configA, recipientConfigSetName, recipientTagName);
 
     ASSERT_OK(splitConfigResult.validate());
     ASSERT_TRUE(splitConfigResult == ReplSetConfig::parse(splitConfigResult.toBSON()));
@@ -119,7 +119,7 @@ TEST(MakeSplitConfig, ValidateSplitConfigIntegrityTest) {
 
 
     const ReplSetConfig splitConfig =
-        repl::makeSplitConfig(config, recipientConfigSetName, recipientTagName);
+        serverless::makeSplitConfig(config, recipientConfigSetName, recipientTagName);
     ASSERT_OK(splitConfig.validate());
     ASSERT_EQ(splitConfig.getReplSetName(), donorConfigSetName);
     ASSERT_TRUE(splitConfig.toBSON().hasField("members"));
@@ -135,9 +135,10 @@ TEST(MakeSplitConfig, ValidateSplitConfigIntegrityTest) {
     ASSERT_TRUE(recipientConfigPtr->getRecipientConfig() == nullptr);
     ASSERT_EQ(recipientConfigPtr->getReplSetName(), recipientConfigSetName);
 
-    ASSERT_THROWS_CODE(repl::makeSplitConfig(splitConfig, recipientConfigSetName, recipientTagName),
-                       AssertionException,
-                       6201800 /*calling on a splitconfig*/);
+    ASSERT_THROWS_CODE(
+        serverless::makeSplitConfig(splitConfig, recipientConfigSetName, recipientTagName),
+        AssertionException,
+        6201800 /*calling on a splitconfig*/);
 }
 
 TEST(MakeSplitConfig, SplitConfigAssertionsTest) {
@@ -150,9 +151,9 @@ TEST(MakeSplitConfig, SplitConfigAssertionsTest) {
                                        << "localhost:20002"
                                        << "priority" << 0 << "votes" << 0)));
 
-    ASSERT_THROWS_CODE(repl::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON),
-                                             recipientConfigSetName,
-                                             recipientTagName),
+    ASSERT_THROWS_CODE(serverless::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON),
+                                                   recipientConfigSetName,
+                                                   recipientTagName),
                        AssertionException,
                        6201801 /*no recipient members created*/);
 
@@ -165,9 +166,9 @@ TEST(MakeSplitConfig, SplitConfigAssertionsTest) {
                                        << BSON(recipientTagName << "one")))
                         << "settings" << BSON("electionTimeoutMillis" << 1000));
 
-    ASSERT_THROWS_CODE(repl::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON),
-                                             recipientConfigSetName,
-                                             recipientTagName),
+    ASSERT_THROWS_CODE(serverless::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON),
+                                                   recipientConfigSetName,
+                                                   recipientTagName),
                        AssertionException,
                        6201802 /*no donor members created*/);
 }
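
Taken together, the updated unit tests exercise the new donor lifecycle in roughly this order (a condensed sketch of the test pattern above; setup and error handling elided):

    // decisionFuture() resolves once the split commits or aborts ...
    auto result = serviceInstance->decisionFuture().get(opCtx);
    ASSERT_EQ(result.state, ShardSplitDonorStateEnum::kCommitted);
    // ... tryForget() stands in for receiving the forgetShardSplit command ...
    serviceInstance->tryForget();
    // ... and completionFuture() resolves once the state doc carries expireAt.
    ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
    ASSERT_TRUE(serviceInstance->isGarbageCollectable());
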