author     mathisbessamdb <mathis.bessa@mongodb.com>         2022-02-17 22:10:09 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-02-17 22:47:16 +0000
commit     909df54290217588dfb01b0942f8ba2cac642bac (patch)
tree       a3fa49bcfcf158480934e68a13c8e9c8d2c324ff /src/mongo/db
parent     34308d1aab95d7604384fb0e17591b0c9ccb7a54 (diff)
SERVER-62366 Add forgetShardSplit command
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/repl/repl_server_parameters.idl               |  10
-rw-r--r--  src/mongo/db/serverless/shard_split_commands.cpp           |  79
-rw-r--r--  src/mongo/db/serverless/shard_split_commands.idl           |  11
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_op_observer.cpp  | 151
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.cpp      | 113
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.h        |  42
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service_test.cpp | 146
-rw-r--r--  src/mongo/db/serverless/shard_split_state_machine.idl      |   6
-rw-r--r--  src/mongo/db/serverless/shard_split_utils.cpp              | 114
-rw-r--r--  src/mongo/db/serverless/shard_split_utils.h                |  56
-rw-r--r--  src/mongo/db/serverless/shard_split_utils_test.cpp         |  23
11 files changed, 667 insertions(+), 84 deletions(-)
diff --git a/src/mongo/db/repl/repl_server_parameters.idl b/src/mongo/db/repl/repl_server_parameters.idl
index 0531535742c..24e940d0f66 100644
--- a/src/mongo/db/repl/repl_server_parameters.idl
+++ b/src/mongo/db/repl/repl_server_parameters.idl
@@ -632,6 +632,16 @@ server_parameters:
cpp_varname: allowMultipleArbiters
default: false
+ shardSplitGarbageCollectionDelayMS:
+ description: >-
+ The amount of time in milliseconds that the donor should wait before
+ removing the shard split state document after receiving forgetShardSplit.
+ set_at: [ startup, runtime ]
+ cpp_vartype: AtomicWord<int>
+ cpp_varname: shardSplitGarbageCollectionDelayMS
+ default:
+ expr: 15 * 60 * 1000
+
feature_flags:
featureFlagRetryableFindAndModify:
description: >-
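Note: the parameter above is declared with cpp_vartype `AtomicWord<int>`, so readers load it atomically. A minimal sketch of how the delay feeds the garbage-collection timestamp; `computeExpireAt` is a hypothetical helper, the real call site being `_markStateDocAsGarbageCollectable` further down in this commit:

    // Sketch only, assuming a ServiceContext* is at hand.
    Date_t computeExpireAt(ServiceContext* serviceContext) {
        // AtomicWord server parameters are read with load(); the default
        // expression above resolves to 15 * 60 * 1000 ms, i.e. 15 minutes.
        auto delay = Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()};
        return serviceContext->getFastClockSource()->now() + delay;
    }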
diff --git a/src/mongo/db/serverless/shard_split_commands.cpp b/src/mongo/db/serverless/shard_split_commands.cpp
index 32143d5bb4c..27b010d4ee9 100644
--- a/src/mongo/db/serverless/shard_split_commands.cpp
+++ b/src/mongo/db/serverless/shard_split_commands.cpp
@@ -75,7 +75,7 @@ public:
uassertStatusOK(donorPtr->checkIfOptionsConflict(stateDoc));
- auto state = donorPtr->completionFuture().get(opCtx);
+ auto state = donorPtr->decisionFuture().get(opCtx);
auto response = Response(state.state);
if (state.abortReason) {
@@ -149,7 +149,7 @@ public:
instance->tryAbort();
- auto state = instance->completionFuture().get(opCtx);
+ auto state = instance->decisionFuture().get(opCtx);
uassert(ErrorCodes::CommandFailed,
"Failed to abort shard split",
@@ -192,5 +192,80 @@ public:
}
} abortShardSplitCmd;
+class ForgetShardSplitCmd : public TypedCommand<ForgetShardSplitCmd> {
+public:
+ using Request = ForgetShardSplit;
+
+ class Invocation : public InvocationBase {
+
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ uassert(6236600,
+ "feature \"shard split\" not supported",
+ repl::feature_flags::gShardSplit.isEnabled(
+ serverGlobalParams.featureCompatibility));
+
+ const RequestType& cmd = request();
+
+ opCtx->setAlwaysInterruptAtStepDownOrUp();
+
+ auto splitService = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+ ->lookupServiceByName(ShardSplitDonorService::kServiceName);
+ auto optionalDonor = ShardSplitDonorService::DonorStateMachine::lookup(
+ opCtx, splitService, BSON("_id" << cmd.getMigrationId()));
+
+ uassert(ErrorCodes::NoSuchTenantMigration,
+ str::stream() << "Could not find shard split with id " << cmd.getMigrationId(),
+ optionalDonor);
+
+ auto donorPtr = optionalDonor.get();
+
+ auto decision = donorPtr->decisionFuture().get(opCtx);
+
+ uassert(
+ ErrorCodes::TenantMigrationInProgress,
+ "Could not forget migration with id {} since no decision has been made yet"_format(
+ cmd.getMigrationId().toString()),
+ decision.state == ShardSplitDonorStateEnum::kCommitted ||
+ decision.state == ShardSplitDonorStateEnum::kAborted);
+
+ donorPtr->tryForget();
+ donorPtr->completionFuture().get(opCtx);
+ }
+
+ private:
+ void doCheckAuthorization(OperationContext* opCtx) const final {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::runTenantMigration));
+ }
+
+ bool supportsWriteConcern() const override {
+ return true;
+ }
+
+ NamespaceString ns() const {
+ return NamespaceString(request().getDbName(), "");
+ }
+ };
+
+ std::string help() const override {
+ return "Forget the shard split operation.";
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ BasicCommand::AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return BasicCommand::AllowedOnSecondary::kNever;
+ }
+} forgetShardSplitCmd;
+
+
} // namespace
} // namespace mongo
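The new command is admin-only and parses per the IDL in the next file, taking a single `migrationId` UUID. A hedged sketch of the request document a caller would send to the donor primary's admin database (illustration only, not part of the commit):

    // Build the forgetShardSplit command document; migrationId is the UUID of
    // the split operation to forget.
    BSONObjBuilder bob;
    bob.append("forgetShardSplit", 1);
    migrationId.appendToBuilder(&bob, "migrationId");
    const BSONObj cmd = bob.obj();
    // e.g. DBDirectClient(opCtx).runCommand("admin", cmd, result);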
diff --git a/src/mongo/db/serverless/shard_split_commands.idl b/src/mongo/db/serverless/shard_split_commands.idl
index f7aa6b1cf98..d3b042f408c 100644
--- a/src/mongo/db/serverless/shard_split_commands.idl
+++ b/src/mongo/db/serverless/shard_split_commands.idl
@@ -81,3 +81,14 @@ commands:
migrationId:
description: "Unique identifier for the shard split operation."
type: uuid
+
+ forgetShardSplit:
+ description: "Parser for the `forgetShardSplit` command."
+ command_name: forgetShardSplit
+ strict: true
+ namespace: ignored
+ api_version: ""
+ fields:
+ migrationId:
+ description: "Unique identifier for the shard split operation."
+ type: uuid
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index 48c660fb12f..0f40fd7a434 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -44,31 +44,71 @@ const auto tenantIdsToDeleteDecoration =
ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
auto donorStateDoc =
ShardSplitDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), doc);
- const std::string errmsg = str::stream() << "invalid donor state doc " << doc;
+ const std::string errmsg = "Invalid donor state doc, {}: {}";
switch (donorStateDoc.getState()) {
case ShardSplitDonorStateEnum::kUninitialized:
uassert(ErrorCodes::BadValue,
- errmsg,
- !donorStateDoc.getBlockTimestamp() && !donorStateDoc.getCommitOrAbortOpTime() &&
- !donorStateDoc.getAbortReason());
+ fmt::format(errmsg,
+ "BlockTimeStamp should not be set in data sync state",
+ doc.toString()),
+ !donorStateDoc.getBlockTimestamp());
+ uassert(ErrorCodes::BadValue,
+ fmt::format(errmsg,
+ "CommitOrAbortOpTime should not be set in data sync state",
+ doc.toString()),
+ !donorStateDoc.getCommitOrAbortOpTime());
+ uassert(ErrorCodes::BadValue,
+ fmt::format(errmsg,
+ "Cannot have abortReason while being in data sync state",
+ doc.toString()),
+ !donorStateDoc.getAbortReason());
break;
case ShardSplitDonorStateEnum::kBlocking:
uassert(ErrorCodes::BadValue,
- errmsg,
- donorStateDoc.getBlockTimestamp() && !donorStateDoc.getCommitOrAbortOpTime() &&
- !donorStateDoc.getAbortReason());
+ fmt::format(errmsg,
+ "Missing blockTimeStamp while being in blocking state",
+ doc.toString()),
+ donorStateDoc.getBlockTimestamp());
+ uassert(
+ ErrorCodes::BadValue,
+ fmt::format(errmsg,
+ "CommitOrAbortOpTime shouldn't be set while being in blocking state",
+ doc.toString()),
+ !donorStateDoc.getCommitOrAbortOpTime());
+ uassert(ErrorCodes::BadValue,
+ fmt::format(errmsg,
+ "Cannot have an abortReason while being in blocking state",
+ doc.toString()),
+ !donorStateDoc.getAbortReason());
break;
case ShardSplitDonorStateEnum::kCommitted:
uassert(ErrorCodes::BadValue,
- errmsg,
- donorStateDoc.getBlockTimestamp() && donorStateDoc.getCommitOrAbortOpTime() &&
- !donorStateDoc.getAbortReason());
+ fmt::format(errmsg,
+ "Missing blockTimeStamp while being in committed state",
+ doc.toString()),
+ donorStateDoc.getBlockTimestamp());
+ uassert(ErrorCodes::BadValue,
+ fmt::format(errmsg,
+ "Missing CommitOrAbortOpTime while being in committed state",
+ doc.toString()),
+ donorStateDoc.getCommitOrAbortOpTime());
+ uassert(ErrorCodes::BadValue,
+ fmt::format(errmsg,
+ "Cannot have abortReason while being in committed state",
+ doc.toString()),
+ !donorStateDoc.getAbortReason());
break;
case ShardSplitDonorStateEnum::kAborted:
uassert(ErrorCodes::BadValue,
- errmsg,
- donorStateDoc.getAbortReason() && donorStateDoc.getCommitOrAbortOpTime());
+ fmt::format(
+ errmsg, "Missing abortReason while being in aborted state", doc.toString()),
+ donorStateDoc.getAbortReason());
+ uassert(ErrorCodes::BadValue,
+ fmt::format(errmsg,
+ "Missing CommitOrAbortOpTime while being in aborted state",
+ doc.toString()),
+ donorStateDoc.getCommitOrAbortOpTime());
break;
default:
MONGO_UNREACHABLE;
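For reference, the per-state invariants the validator now enforces, summarized from the uasserts above (kAborted places no constraint on blockTimestamp):

    // State            blockTimestamp   commitOrAbortOpTime   abortReason
    // kUninitialized   unset            unset                 unset
    // kBlocking        set              unset                 unset
    // kCommitted       set              set                   unset
    // kAborted         (unchecked)      set                   set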
@@ -95,7 +135,7 @@ void onBlockerInitialization(OperationContext* opCtx,
auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
auto recipientConnectionString =
- repl::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
+ serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
for (const auto& tenantId : optionalTenants.get()) {
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
@@ -125,23 +165,26 @@ void onTransitionToBlocking(OperationContext* opCtx, const ShardSplitDonorDocume
invariant(donorStateDoc.getBlockTimestamp());
invariant(donorStateDoc.getTenantIds());
- auto tenantIds = donorStateDoc.getTenantIds().get();
- for (auto tenantId : tenantIds) {
- auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
- opCtx->getServiceContext(), tenantId);
- invariant(mtab);
-
- if (!opCtx->writesAreReplicated()) {
- // A primary calls startBlockingWrites on the TenantMigrationDonorAccessBlocker before
- // reserving the OpTime for the "start blocking" write, so only secondaries call
- // startBlockingWrites on the TenantMigrationDonorAccessBlocker in the op observer.
- mtab->startBlockingWrites();
+ if (donorStateDoc.getTenantIds()) {
+ auto tenantIds = donorStateDoc.getTenantIds().get();
+ for (auto tenantId : tenantIds) {
+ auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
+ opCtx->getServiceContext(), tenantId);
+ invariant(mtab);
+
+ if (!opCtx->writesAreReplicated()) {
+ // A primary calls startBlockingWrites on the TenantMigrationDonorAccessBlocker
+ // before reserving the OpTime for the "start blocking" write, so only secondaries
+ // call startBlockingWrites on the TenantMigrationDonorAccessBlocker in the op
+ // observer.
+ mtab->startBlockingWrites();
+ }
+
+ // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since
+ // startBlockingReadsAfter just needs to be called before the "start blocking" write's
+ // oplog hole is filled.
+ mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get());
}
-
- // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since
- // startBlockingReadsAfter just needs to be called before the "start blocking" write's oplog
- // hole is filled.
- mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get());
}
}
@@ -201,6 +244,51 @@ public:
: _opCtx(opCtx), _donorStateDoc(std::move(donorStateDoc)) {}
void commit(boost::optional<Timestamp>) override {
+ if (_donorStateDoc.getExpireAt()) {
+ if (_donorStateDoc.getTenantIds()) {
+ auto tenantIds = _donorStateDoc.getTenantIds().get();
+ for (auto tenantId : tenantIds) {
+ auto mtab =
+ tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
+ _opCtx->getServiceContext(), tenantId);
+
+ if (!mtab) {
+ // The state doc and TenantMigrationDonorAccessBlocker for this migration
+ // were removed immediately after expireAt was set. This is unlikely to
+ // occur in production where the garbage collection delay should be
+ // sufficiently large.
+ continue;
+ }
+
+ if (!_opCtx->writesAreReplicated()) {
+ // Setting expireAt implies that the TenantMigrationDonorAccessBlocker for
+ // this migration will be removed shortly after this. However, a lagged
+ // secondary might not manage to advance its majority commit point past the
+ // migration commit or abort opTime and consequently transition out of the
+ // blocking state before the TenantMigrationDonorAccessBlocker is removed.
+ // When this occurs, blocked reads or writes will be left waiting for the
+ // migration decision indefinitely. To avoid that, notify the
+ // TenantMigrationDonorAccessBlocker here that the commit or abort opTime
+ // has been majority committed (guaranteed to be true since by design the
+ // donor never marks its state doc as garbage collectable before the
+ // migration decision is majority committed).
+ mtab->onMajorityCommitPointUpdate(
+ _donorStateDoc.getCommitOrAbortOpTime().get());
+ }
+
+ if (_donorStateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
+ invariant(mtab->inStateAborted());
+ // The migration durably aborted and is now marked as garbage collectable,
+ // remove its TenantMigrationDonorAccessBlocker right away to allow
+ // back-to-back migration retries.
+ TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext())
+ .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
+ }
+ }
+ }
+ return;
+ }
+
switch (_donorStateDoc.getState()) {
case ShardSplitDonorStateEnum::kCommitted:
onTransitionToCommitted(_opCtx, _donorStateDoc);
@@ -286,6 +374,11 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
}
auto donorStateDoc = parseAndValidateDonorDocument(doc);
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "cannot delete a donor's state document " << doc
+ << " since it has not been marked as garbage collectable.",
+ donorStateDoc.getExpireAt());
+
if (donorStateDoc.getTenantIds()) {
auto tenantIds = *donorStateDoc.getTenantIds();
std::vector<std::string> result;
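Summarizing the observer changes (a sketch of the control flow above, not code from the commit):

    // Commit handler precedence:
    // 1. expireAt set: on secondaries, notify each donor access blocker that the
    //    commit/abort opTime is majority committed; for aborted splits, remove the
    //    blocker immediately to allow back-to-back retries; then return without
    //    running any state-transition logic.
    // 2. expireAt unset: dispatch on the state (kCommitted/kAborted) as before.
    // aboutToDelete now uasserts that expireAt is set, so only documents already
    // marked garbage collectable can be deleted.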
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 8c346a643ac..bd0cbf32b44 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -34,6 +34,7 @@
#include "mongo/client/streamable_replica_set_monitor.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
@@ -107,6 +108,8 @@ MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeBlocking);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterBlocking);
MONGO_FAIL_POINT_DEFINE(skipShardSplitWaitForSplitAcceptance);
+const std::string kTTLIndexName = "ShardSplitDonorTTLIndex";
+
} // namespace
namespace detail {
@@ -161,7 +164,7 @@ SemiFuture<void> makeRecipientAcceptSplitFuture(ExecutorPtr executor,
auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
invariant(replCoord);
- auto recipientConnectionString = repl::makeRecipientConnectionString(
+ auto recipientConnectionString = serverless::makeRecipientConnectionString(
replCoord->getConfig(), recipientTagName, recipientSetName);
auto monitor = ReplicaSetMonitor::createIfNeeded(MongoURI{recipientConnectionString});
invariant(monitor);
@@ -201,6 +204,36 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ShardSplitDonorService::cons
ShardSplitDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), initialState));
}
+ExecutorFuture<void> ShardSplitDonorService::_createStateDocumentTTLIndex(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
+ return AsyncTry([this] {
+ auto nss = getStateDocumentsNS();
+
+ AllowOpCtxWhenServiceRebuildingBlock allowOpCtxBlock(Client::getCurrent());
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+ DBDirectClient client(opCtx);
+
+ BSONObj result;
+ client.runCommand(
+ nss.db().toString(),
+ BSON("createIndexes"
+ << nss.coll().toString() << "indexes"
+ << BSON_ARRAY(BSON("key" << BSON("expireAt" << 1) << "name" << kTTLIndexName
+ << "expireAfterSeconds" << 0))),
+ result);
+ uassertStatusOK(getStatusFromCommandResult(result));
+ })
+ .until([](Status status) { return status.isOK(); })
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(**executor, token);
+}
+
+ExecutorFuture<void> ShardSplitDonorService::_rebuildService(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
+ return _createStateDocumentTTLIndex(executor, token);
+}
+
ShardSplitDonorService::DonorStateMachine::DonorStateMachine(
ServiceContext* serviceContext,
ShardSplitDonorService* splitService,
@@ -227,6 +260,17 @@ void ShardSplitDonorService::DonorStateMachine::tryAbort() {
}
}
+void ShardSplitDonorService::DonorStateMachine::tryForget() {
+ LOGV2(6236601, "Forgetting shard split", "id"_attr = _migrationId);
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_forgetShardSplitReceivedPromise.getFuture().isReady()) {
+ LOGV2(6236602, "Donor Forget Migration promise is already ready", "id"_attr = _migrationId);
+ return;
+ }
+ _forgetShardSplitReceivedPromise.emplaceValue();
+}
+
Status ShardSplitDonorService::DonorStateMachine::checkIfOptionsConflict(
const ShardSplitDonorDocument& stateDoc) const {
stdx::lock_guard<Latch> lg(_mutex);
@@ -266,13 +310,12 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
"id"_attr = _migrationId,
"timeout"_attr = repl::shardSplitTimeoutMS.load());
- _completionPromise.setWith([&] {
+ _decisionPromise.setWith([&] {
return ExecutorFuture(**executor)
.then([this, executor, primaryToken] {
// Note we do not use the abort split token here because the abortShardSplit
// command waits for a decision to be persisted which will not happen if
// inserting the initial state document fails.
-
return _writeInitialDocument(executor, primaryToken);
})
.then([this] { pauseShardSplitBeforeBlocking.pauseWhileSet(); })
@@ -310,7 +353,17 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
.unsafeToInlineFuture();
});
- return _completionPromise.getFuture().semi().ignoreValue();
+ _completionPromise.setFrom(
+ _decisionPromise.getFuture()
+ .semi()
+ .ignoreValue()
+ .thenRunOn(**executor)
+ .then([this, anchor = shared_from_this(), executor, primaryToken] {
+ return _waitForForgetCmdThenMarkGarbageCollectible(executor, primaryToken);
+ })
+ .unsafeToInlineFuture());
+
+ return _completionPromise.getFuture().semi();
}
void ShardSplitDonorService::DonorStateMachine::interrupt(Status status) {}
@@ -361,7 +414,7 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestam
invariant(_stateDoc.getRecipientTagName());
auto recipientTagName = *_stateDoc.getRecipientTagName();
- auto recipientNodes = getRecipientMembers(replCoord->getConfig(), recipientTagName);
+ auto recipientNodes = serverless::getRecipientMembers(replCoord->getConfig(), recipientTagName);
WriteConcernOptions writeConcern;
writeConcern.w = WTags{{recipientTagName.toString(), recipientNodes.size()}};
@@ -667,4 +720,54 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
return DurableState{_stateDoc.getState(), _abortReason};
});
}
+
+ExecutorFuture<repl::OpTime>
+ShardSplitDonorService::DonorStateMachine::_markStateDocAsGarbageCollectable(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
+ Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
+
+ return AsyncTry([this, self = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ })
+ .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(**executor, token);
+}
+
+ExecutorFuture<void>
+ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectible(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
+ LOGV2(6236603,
+ "Waiting to receive 'forgetShardSplit' command.",
+ "migrationId"_attr = _migrationId);
+ auto expiredAt = [&]() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ return _stateDoc.getExpireAt();
+ }();
+
+ if (expiredAt) {
+ LOGV2(6236604, "expiredAt is already set", "migrationId"_attr = _migrationId);
+ return ExecutorFuture(**executor);
+ }
+
+ return std::move(_forgetShardSplitReceivedPromise.getFuture())
+ .thenRunOn(**executor)
+ .then([this, self = shared_from_this(), executor, token] {
+ LOGV2(6236606,
+ "Marking shard split as garbage-collectable.",
+ "migrationId"_attr = _migrationId);
+ return _markStateDocAsGarbageCollectable(executor, token);
+ })
+ .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
+ });
+}
+
} // namespace mongo
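`_rebuildService` now ensures a TTL index on `expireAt` every time the primary-only service rebuilds. Because the index uses `expireAfterSeconds: 0`, the TTL monitor deletes a state document as soon as the wall clock passes its `expireAt`, which `_markStateDocAsGarbageCollectable` sets to now + `shardSplitGarbageCollectionDelayMS`. The index spec in isolation (a sketch mirroring the createIndexes call above):

    // TTL index built on the config.tenantSplitDonors collection.
    const BSONObj createIndexesCmd =
        BSON("createIndexes" << "tenantSplitDonors"
                             << "indexes"
                             << BSON_ARRAY(BSON("key" << BSON("expireAt" << 1)
                                                      << "name" << "ShardSplitDonorTTLIndex"
                                                      << "expireAfterSeconds" << 0)));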
diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h
index ff42a57147e..58a2a984703 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.h
+++ b/src/mongo/db/serverless/shard_split_donor_service.h
@@ -80,6 +80,12 @@ protected:
std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override;
private:
+ ExecutorFuture<void> _createStateDocumentTTLIndex(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token);
+
+ ExecutorFuture<void> _rebuildService(std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) override;
+
ServiceContext* const _serviceContext;
};
@@ -103,6 +109,12 @@ public:
*/
void tryAbort();
+ /**
+ * Try to forget the shard split operation. If the operation is not in a final state, the
+ * promise will be set but the garbage collection will be skipped.
+ */
+ void tryForget();
+
Status checkIfOptionsConflict(const ShardSplitDonorDocument& stateDoc) const;
SemiFuture<void> run(std::shared_ptr<executor::ScopedTaskExecutor> executor,
@@ -110,7 +122,11 @@ public:
void interrupt(Status status) override;
- SharedSemiFuture<DurableState> completionFuture() const {
+ SharedSemiFuture<DurableState> decisionFuture() const {
+ return _decisionPromise.getFuture();
+ }
+
+ SharedSemiFuture<void> completionFuture() const {
return _completionPromise.getFuture();
}
@@ -129,6 +145,14 @@ public:
MongoProcessInterface::CurrentOpConnectionsMode connMode,
MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override;
+ /**
+ * Returns true if the state doc was marked to expire (marked garbage collectable).
+ */
+ bool isGarbageCollectable() const {
+ stdx::lock_guard<Latch> lg(_mutex);
+ return !!_stateDoc.getExpireAt();
+ }
+
private:
// Tasks
ExecutorFuture<void> _enterBlockingState(const ScopedTaskExecutorPtr& executor,
@@ -164,6 +188,12 @@ private:
const CancellationToken& instanceAbortToken,
const CancellationToken& abortToken);
+ ExecutorFuture<repl::OpTime> _markStateDocAsGarbageCollectable(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token);
+
+ ExecutorFuture<void> _waitForForgetCmdThenMarkGarbageCollectible(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& token);
+
private:
const NamespaceString _stateDocumentsNS = NamespaceString::kTenantSplitDonorsNamespace;
mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardSplitDonorService::_mutex");
@@ -186,11 +216,17 @@ private:
// A promise fulfilled when the replicaSetMonitor has been created;
SharedPromise<void> _replicaSetMonitorCreatedPromise;
- // A promise fulfilled when the shard split operation has fully completed
- SharedPromise<DurableState> _completionPromise;
+ // A promise fulfilled when the shard split has committed or aborted.
+ SharedPromise<DurableState> _decisionPromise;
+
+ // A promise fulfilled when the shard split operation has fully completed.
+ SharedPromise<void> _completionPromise;
// A promise fulfilled when all recipient nodes have accepted the split.
SharedPromise<void> _recipientAcceptedSplit;
+
+ // A promise fulfilled when tryForget is called.
+ SharedPromise<void> _forgetShardSplitReceivedPromise;
};
} // namespace mongo
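With the promise split above, an instance now exposes two futures with distinct lifetimes. A hedged caller-side sketch, assuming `donorPtr` is a looked-up DonorStateMachine (this mirrors what the command implementations earlier in the commit do):

    // decisionFuture(): ready once the split has durably committed or aborted.
    auto decision = donorPtr->decisionFuture().get(opCtx);
    invariant(decision.state == ShardSplitDonorStateEnum::kCommitted ||
              decision.state == ShardSplitDonorStateEnum::kAborted);

    // completionFuture(): ready only after forgetShardSplit marks the state doc
    // garbage collectable (or immediately, if expireAt was already set on step-up).
    donorPtr->tryForget();
    donorPtr->completionFuture().get(opCtx);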
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index 7dc4bccd866..8a979fbff5c 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -36,6 +36,7 @@
#include "mongo/client/streamable_replica_set_monitor_for_testing.h"
#include "mongo/db/catalog/database_holder_mock.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/op_observer_impl.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/repl/drop_pending_collection_reaper.h"
@@ -49,6 +50,7 @@
#include "mongo/db/serverless/shard_split_donor_service.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_test_utils.h"
+#include "mongo/db/serverless/shard_split_utils.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/dbtests/mock/mock_conn_registry.h"
#include "mongo/dbtests/mock/mock_replica_set.h"
@@ -62,6 +64,7 @@
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/log_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/clock_source_mock.h"
@@ -108,6 +111,27 @@ std::ostringstream& operator<<(std::ostringstream& builder,
return builder;
}
+void fastForwardCommittedSnapshotOpTime(
+ std::shared_ptr<ShardSplitDonorService::DonorStateMachine> instance,
+ ServiceContext* serviceContext,
+ OperationContext* opCtx,
+ const UUID& uuid) {
+ // When a state document is transitioned to kAborted, the ShardSplitDonorOpObserver will
+ // transition tenant access blockers to a kAborted state if, and only if, the abort timestamp
+ // is less than or equal to the currentCommittedSnapshotOpTime. Since we are using the
+ // ReplicationCoordinatorMock, we must manually manage the currentCommittedSnapshotOpTime
+ // using this method.
+ auto replCoord = dynamic_cast<repl::ReplicationCoordinatorMock*>(
+ repl::ReplicationCoordinator::get(serviceContext));
+
+ auto foundStateDoc = uassertStatusOK(serverless::getStateDocument(opCtx, uuid));
+ invariant(foundStateDoc.getCommitOrAbortOpTime());
+
+ replCoord->setCurrentCommittedSnapshotOpTime(*foundStateDoc.getCommitOrAbortOpTime());
+ serviceContext->getOpObserver()->onMajorityCommitPointUpdate(
+ serviceContext, *foundStateDoc.getCommitOrAbortOpTime());
+}
+
class ShardSplitDonorServiceTest : public repl::PrimaryOnlyServiceMongoDTest {
public:
void setUp() override {
@@ -170,7 +194,7 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
ASSERT(serviceInstance.get());
ASSERT_EQ(_uuid, serviceInstance->getId());
- auto completionFuture = serviceInstance->completionFuture();
+ auto decisionFuture = serviceInstance->decisionFuture();
std::shared_ptr<TopologyDescription> topologyDescriptionOld =
std::make_shared<sdam::TopologyDescription>(sdam::SdamConfiguration());
@@ -189,18 +213,24 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
publisher->onTopologyDescriptionChangedEvent(topologyDescriptionOld, topologyDescriptionNew);
- completionFuture.wait();
+ decisionFuture.wait();
- auto result = completionFuture.get();
+ auto result = decisionFuture.get();
ASSERT(!result.abortReason);
ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted);
+
+ serviceInstance->tryForget();
+
+ ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+ ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) {
auto opCtx = makeOperationContext();
+ auto serviceContext = getServiceContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
test::shard_split::reconfigToAddRecipientNodes(
- getServiceContext(), _recipientTagName, _replSet.getHosts());
+ serviceContext, _recipientTagName, _replSet.getHosts());
auto stateDocument = defaultStateDocument();
@@ -213,12 +243,18 @@ TEST_F(ShardSplitDonorServiceTest, ShardSplitDonorServiceTimeout) {
ASSERT(serviceInstance.get());
ASSERT_EQ(_uuid, serviceInstance->getId());
- auto completionFuture = serviceInstance->completionFuture();
+ auto decisionFuture = serviceInstance->decisionFuture();
- auto result = completionFuture.get();
+ auto result = decisionFuture.get();
ASSERT(result.abortReason);
ASSERT_EQ(result.abortReason->code(), ErrorCodes::ExceededTimeLimit);
+
+ fastForwardCommittedSnapshotOpTime(serviceInstance, serviceContext, opCtx.get(), _uuid);
+ serviceInstance->tryForget();
+
+ ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+ ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
// Abort scenario : abortSplit called before startSplit.
@@ -234,20 +270,26 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortState) {
opCtx.get(), _service, stateDocument.toBSON());
ASSERT(serviceInstance.get());
- auto result = serviceInstance->completionFuture().get(opCtx.get());
+ auto result = serviceInstance->decisionFuture().get(opCtx.get());
ASSERT(!!result.abortReason);
ASSERT_EQ(result.abortReason->code(), ErrorCodes::TenantMigrationAborted);
ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);
+
+ serviceInstance->tryForget();
+
+ ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+ ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
// Abort scenario : instance created through startSplit then calling abortSplit.
TEST_F(ShardSplitDonorServiceTest, CreateInstanceThenAbort) {
auto opCtx = makeOperationContext();
+ auto serviceContext = getServiceContext();
test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
test::shard_split::reconfigToAddRecipientNodes(
- getServiceContext(), _recipientTagName, _replSet.getHosts());
+ serviceContext, _recipientTagName, _replSet.getHosts());
std::shared_ptr<ShardSplitDonorService::DonorStateMachine> serviceInstance;
{
@@ -262,11 +304,18 @@ TEST_F(ShardSplitDonorServiceTest, CreateInstanceThenAbort) {
serviceInstance->tryAbort();
}
- auto result = serviceInstance->completionFuture().get(opCtx.get());
+
+ auto result = serviceInstance->decisionFuture().get(opCtx.get());
ASSERT(!!result.abortReason);
ASSERT_EQ(result.abortReason->code(), ErrorCodes::TenantMigrationAborted);
ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);
+
+ fastForwardCommittedSnapshotOpTime(serviceInstance, serviceContext, opCtx.get(), _uuid);
+ serviceInstance->tryForget();
+
+ ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+ ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
TEST_F(ShardSplitDonorServiceTest, StepDownTest) {
@@ -290,9 +339,76 @@ TEST_F(ShardSplitDonorServiceTest, StepDownTest) {
stepDown();
}
- auto result = serviceInstance->completionFuture().getNoThrow();
+ auto result = serviceInstance->decisionFuture().getNoThrow();
ASSERT_FALSE(result.isOK());
ASSERT_EQ(ErrorCodes::InterruptedDueToReplStateChange, result.getStatus());
+
+ ASSERT_EQ(serviceInstance->completionFuture().getNoThrow(),
+ ErrorCodes::InterruptedDueToReplStateChange);
+ ASSERT_FALSE(serviceInstance->isGarbageCollectable());
+}
+
+// TODO(SERVER-62363): Remove this test once the ticket is completed.
+DEATH_TEST_F(ShardSplitDonorServiceTest, StartWithExpireAtAlreadySet, "invariant") {
+ auto opCtx = makeOperationContext();
+
+ test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
+
+ auto stateDocument = defaultStateDocument();
+ stateDocument.setState(ShardSplitDonorStateEnum::kCommitted);
+
+ auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+ opCtx.get(), _service, stateDocument.toBSON());
+
+ ASSERT(serviceInstance.get());
+
+    // This triggers the invariant `updateResult.numDocsModified == 1` in _updateStateDocument
+    // when entering _handleErrorOrEnterAbortedState.
+ auto result = serviceInstance->decisionFuture().get(opCtx.get());
+}
+
+TEST_F(ShardSplitDonorServiceTest, StepUpWithkCommitted) {
+ auto opCtx = makeOperationContext();
+
+ test::shard_split::ScopedTenantAccessBlocker scopedTenants(_tenantIds, opCtx.get());
+ test::shard_split::reconfigToAddRecipientNodes(
+ getServiceContext(), _recipientTagName, _replSet.getHosts());
+
+ auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+ auto stateDocument = defaultStateDocument();
+ stateDocument.setState(ShardSplitDonorStateEnum::kUninitialized);
+
+ // insert the document for the first time.
+ ASSERT_OK(serverless::insertStateDoc(opCtx.get(), stateDocument));
+
+ {
+ // update to kCommitted
+ stateDocument.setState(ShardSplitDonorStateEnum::kCommitted);
+ boost::optional<mongo::Date_t> expireAt = getServiceContext()->getFastClockSource()->now() +
+ Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()};
+ stateDocument.setExpireAt(expireAt);
+ stateDocument.setBlockTimestamp(Timestamp(1, 1));
+ stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1, 1), 1));
+
+ ASSERT_OK(serverless::updateStateDoc(opCtx.get(), stateDocument));
+
+ auto foundStateDoc = uassertStatusOK(serverless::getStateDocument(opCtx.get(), _uuid));
+ invariant(foundStateDoc.getExpireAt());
+ ASSERT_EQ(*foundStateDoc.getExpireAt(), *expireAt);
+ }
+
+ auto serviceInstance = ShardSplitDonorService::DonorStateMachine::getOrCreate(
+ opCtx.get(), _service, stateDocument.toBSON());
+ ASSERT(serviceInstance.get());
+
+ auto result = serviceInstance->decisionFuture().get();
+ ASSERT(!result.abortReason);
+ ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted);
+
+    // We don't need to call tryForget here: since expireAt is already set, the
+    // completionPromise will complete on its own.
+ ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+ ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
TEST_F(ShardSplitDonorServiceTest, TimeoutAbortsAwaitReplication) {
@@ -327,10 +443,16 @@ TEST_F(ShardSplitDonorServiceTest, TimeoutAbortsAwaitReplication) {
});
}
- uassertStatusOK(serviceInstance->completionFuture().getNoThrow());
- auto result = serviceInstance->completionFuture().get(opCtx.get());
+ uassertStatusOK(serviceInstance->decisionFuture().getNoThrow());
+ auto result = serviceInstance->decisionFuture().get(opCtx.get());
ASSERT(result.abortReason);
ASSERT_EQ(result.abortReason->code(), ErrorCodes::ExceededTimeLimit);
+
+ fastForwardCommittedSnapshotOpTime(serviceInstance, getServiceContext(), opCtx.get(), _uuid);
+ serviceInstance->tryForget();
+
+ ASSERT_OK(serviceInstance->completionFuture().getNoThrow());
+ ASSERT_TRUE(serviceInstance->isGarbageCollectable());
}
class SplitReplicaSetObserverTest : public ServiceContextTest {
diff --git a/src/mongo/db/serverless/shard_split_state_machine.idl b/src/mongo/db/serverless/shard_split_state_machine.idl
index 2140427732c..81cef079bff 100644
--- a/src/mongo/db/serverless/shard_split_state_machine.idl
+++ b/src/mongo/db/serverless/shard_split_state_machine.idl
@@ -85,3 +85,9 @@ structs:
type: object_owned
description: "The error that caused the split to abort."
optional: true
+ expireAt:
+ type: date
+ description: >-
+ The wall-clock time at which the state machine document should be
+ removed by the TTL monitor.
+ optional: true
diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp
index 35c138122b3..c1d6afcb913 100644
--- a/src/mongo/db/serverless/shard_split_utils.cpp
+++ b/src/mongo/db/serverless/shard_split_utils.cpp
@@ -28,17 +28,23 @@
*/
#include "mongo/db/serverless/shard_split_utils.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/lock_manager_defs.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/repl/repl_set_config.h"
namespace mongo {
-namespace repl {
-std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config,
- const StringData& recipientTagName) {
- std::vector<MemberConfig> result;
+
+namespace serverless {
+std::vector<repl::MemberConfig> getRecipientMembers(const repl::ReplSetConfig& config,
+ const StringData& recipientTagName) {
+ std::vector<repl::MemberConfig> result;
const auto& tagConfig = config.getTagConfig();
for (auto member : config.members()) {
auto matchesTag =
- std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const ReplSetTag& tag) {
+ std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const repl::ReplSetTag& tag) {
return tagConfig.getTagKey(tag) == recipientTagName;
});
@@ -51,7 +57,7 @@ std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config,
}
-ConnectionString makeRecipientConnectionString(const ReplSetConfig& config,
+ConnectionString makeRecipientConnectionString(const repl::ReplSetConfig& config,
const StringData& recipientTagName,
const StringData& recipientSetName) {
auto recipientMembers = getRecipientMembers(config, recipientTagName);
@@ -59,14 +65,14 @@ ConnectionString makeRecipientConnectionString(const ReplSetConfig& config,
std::transform(recipientMembers.cbegin(),
recipientMembers.cend(),
std::back_inserter(recipientNodes),
- [](const MemberConfig& member) { return member.getHostAndPort(); });
+ [](const repl::MemberConfig& member) { return member.getHostAndPort(); });
return ConnectionString::forReplicaSet(recipientSetName.toString(), recipientNodes);
}
-ReplSetConfig makeSplitConfig(const ReplSetConfig& config,
- const std::string& recipientSetName,
- const std::string& recipientTagName) {
+repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config,
+ const std::string& recipientSetName,
+ const std::string& recipientTagName) {
dassert(!recipientSetName.empty() && recipientSetName != config.getReplSetName());
uassert(6201800,
"We can not make a split config of an existing split config.",
@@ -77,7 +83,7 @@ ReplSetConfig makeSplitConfig(const ReplSetConfig& config,
int donorIndex = 0, recipientIndex = 0;
for (const auto& member : config.members()) {
bool isRecipient =
- std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const ReplSetTag& tag) {
+ std::any_of(member.tagsBegin(), member.tagsEnd(), [&](const repl::ReplSetTag& tag) {
return tagConfig.getTagKey(tag) == recipientTagName;
});
if (isRecipient) {
@@ -114,7 +120,89 @@ ReplSetConfig makeSplitConfig(const ReplSetConfig& config,
splitConfigBob.append("members", donorMembers);
splitConfigBob.append("recipientConfig", recipientConfigBob.obj());
- return ReplSetConfig::parse(splitConfigBob.obj());
+ return repl::ReplSetConfig::parse(splitConfigBob.obj());
+}
+
+Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc) {
+ const auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+ AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+ uassert(ErrorCodes::PrimarySteppedDown,
+ str::stream() << "No longer primary while attempting to insert shard split"
+ " state document",
+ repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss));
+
+ return writeConflictRetry(opCtx, "insertShardSplitStateDoc", nss.ns(), [&]() -> Status {
+ const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName
+ << stateDoc.getId() << ShardSplitDonorDocument::kExpireAtFieldName
+ << BSON("$exists" << false));
+ const auto updateMod = BSON("$setOnInsert" << stateDoc.toBSON());
+ auto updateResult =
+ Helpers::upsert(opCtx, nss.ns(), filter, updateMod, /*fromMigrate=*/false);
+
+ invariant(!updateResult.numDocsModified);
+ if (updateResult.upsertedId.isEmpty()) {
+ return {ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Failed to insert the shard split state doc: "
+ << stateDoc.toBSON()};
+ }
+ return Status::OK();
+ });
+}
+
+Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc) {
+ const auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+ AutoGetCollection collection(opCtx, nss, MODE_IX);
+
+ if (!collection) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << nss.ns() << " does not exist");
+ }
+
+ return writeConflictRetry(opCtx, "updateShardSplitStateDoc", nss.ns(), [&]() -> Status {
+ auto updateResult =
+ Helpers::upsert(opCtx, nss.ns(), stateDoc.toBSON(), /*fromMigrate=*/false);
+ if (updateResult.numMatched == 0) {
+ return {ErrorCodes::NoSuchKey,
+ str::stream() << "Existing shard split state document not found for id: "
+ << stateDoc.getId()};
+ }
+
+ return Status::OK();
+ });
+}
+
+StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx,
+ const UUID& shardSplitId) {
+ // Read the most up to date data.
+ ReadSourceScope readSourceScope(opCtx, RecoveryUnit::ReadSource::kNoTimestamp);
+ AutoGetCollectionForRead collection(opCtx, NamespaceString::kTenantSplitDonorsNamespace);
+ if (!collection) {
+ return Status(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Collection not found looking for state document: "
+ << NamespaceString::kTenantSplitDonorsNamespace.ns());
+ }
+
+ BSONObj result;
+ auto foundDoc = Helpers::findOne(
+ opCtx, collection.getCollection(), BSON("_id" << shardSplitId), result, true);
+
+ if (!foundDoc) {
+ return Status(ErrorCodes::NoMatchingDocument,
+ str::stream()
+ << "No matching state doc found with shard split id: " << shardSplitId);
+ }
+
+ try {
+ return ShardSplitDonorDocument::parse(IDLParserErrorContext("shardSplitStateDocument"),
+ result);
+ } catch (DBException& ex) {
+ return ex.toStatus(str::stream()
+ << "Invalid BSON found for matching document with shard split id: "
+ << shardSplitId << " , res: " << result);
+ }
}
-} // namespace repl
+
+
+} // namespace serverless
} // namespace mongo
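Taken together, the relocated helpers give the donor service a small persistence surface over the state collection. A hedged end-to-end sketch, assuming an `opCtx` and a populated ShardSplitDonorDocument `stateDoc`:

    // First write: upserts keyed on _id; a second insert for the same id returns
    // ConflictingOperationInProgress (or throws DuplicateKey once expireAt is set).
    uassertStatusOK(serverless::insertStateDoc(opCtx, stateDoc));

    // Mark the document garbage collectable and persist the change.
    stateDoc.setExpireAt(Date_t::now());
    uassertStatusOK(serverless::updateStateDoc(opCtx, stateDoc));  // NoSuchKey if absent

    // Read back at the latest snapshot, bypassing timestamped reads.
    auto onDisk = uassertStatusOK(serverless::getStateDocument(opCtx, stateDoc.getId()));
    invariant(onDisk.getExpireAt());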
diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h
index ce3a4e6783f..99060abfa5c 100644
--- a/src/mongo/db/serverless/shard_split_utils.h
+++ b/src/mongo/db/serverless/shard_split_utils.h
@@ -30,16 +30,16 @@
#pragma once
#include "mongo/db/repl/repl_set_config.h"
+#include "mongo/db/serverless/shard_split_state_machine_gen.h"
namespace mongo {
-namespace repl {
-
+namespace serverless {
/**
* @returns A list of `MemberConfig` for member nodes which match a provided replica set tag name
*/
-std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config,
- const StringData& recipientTagName);
+std::vector<repl::MemberConfig> getRecipientMembers(const repl::ReplSetConfig& config,
+ const StringData& recipientTagName);
/**
@@ -47,7 +47,7 @@ std::vector<MemberConfig> getRecipientMembers(const ReplSetConfig& config,
* `recipientTagName`. The `recipientSetName` is the `replSet` parameter of the recipient
* connection string.
*/
-ConnectionString makeRecipientConnectionString(const ReplSetConfig& config,
+ConnectionString makeRecipientConnectionString(const repl::ReplSetConfig& config,
const StringData& recipientTagName,
const StringData& recipientSetName);
@@ -57,9 +57,47 @@ ConnectionString makeRecipientConnectionString(const ReplSetConfig& config,
* to filter the local member list for recipient nodes. The `recipientSetName` is used to validate
* that we are indeed generating a config for a recipient set with a new name.
*/
-ReplSetConfig makeSplitConfig(const ReplSetConfig& config,
- const std::string& recipientSetName,
- const std::string& recipientTagName);
+repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config,
+ const std::string& recipientSetName,
+ const std::string& recipientTagName);
+
+/**
+ * Inserts the shard split state document 'stateDoc' into the
+ * 'config.tenantSplitDonors' collection, creating the collection first if it is not
+ * already present.
+ *
+ * NOTE: A state doc might get inserted based on a decision made from a stale read within a
+ * storage transaction. Callers are expected to provide their own concurrency mechanism to
+ * handle the write-skew problem.
+ *
+ * Returns the 'ConflictingOperationInProgress' error code if an active shard split op is
+ * found for the state doc id provided in 'stateDoc'.
+ *
+ * Throws the 'DuplicateKey' error code if a document already exists on disk with the same
+ * shardSplitId, irrespective of whether the document is marked for garbage collection.
+ */
+Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc);
+
+/**
+ * Updates the shard split state doc in the database.
+ *
+ * Returns the 'NoSuchKey' error code if no state document exists on disk with the given
+ * shardSplitId.
+ */
+Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc);
+
+/**
+ * Returns the state doc matching shardSplitId from disk, if it exists. Reads at "no"
+ * timestamp, i.e. with the "latest" snapshot reflecting up-to-date data.
+ *
+ * If the stored state doc on disk contains invalid BSON, the 'InvalidBSON' error code is
+ * returned.
+ *
+ * Returns 'NoMatchingDocument' error code if no document with 'shardSplitId' is found.
+ */
+StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx,
+ const UUID& shardSplitId);
-} // namespace repl
+} // namespace serverless
} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_utils_test.cpp b/src/mongo/db/serverless/shard_split_utils_test.cpp
index 12994379de3..1d24b091090 100644
--- a/src/mongo/db/serverless/shard_split_utils_test.cpp
+++ b/src/mongo/db/serverless/shard_split_utils_test.cpp
@@ -82,7 +82,7 @@ TEST(MakeSplitConfig, toBSONRoundTripAbility) {
<< "recipientConfig" << resultRecipientConfigBSON);
const ReplSetConfig splitConfigResult =
- repl::makeSplitConfig(configA, recipientConfigSetName, recipientTagName);
+ serverless::makeSplitConfig(configA, recipientConfigSetName, recipientTagName);
ASSERT_OK(splitConfigResult.validate());
ASSERT_TRUE(splitConfigResult == ReplSetConfig::parse(splitConfigResult.toBSON()));
@@ -119,7 +119,7 @@ TEST(MakeSplitConfig, ValidateSplitConfigIntegrityTest) {
const ReplSetConfig splitConfig =
- repl::makeSplitConfig(config, recipientConfigSetName, recipientTagName);
+ serverless::makeSplitConfig(config, recipientConfigSetName, recipientTagName);
ASSERT_OK(splitConfig.validate());
ASSERT_EQ(splitConfig.getReplSetName(), donorConfigSetName);
ASSERT_TRUE(splitConfig.toBSON().hasField("members"));
@@ -135,9 +135,10 @@ TEST(MakeSplitConfig, ValidateSplitConfigIntegrityTest) {
ASSERT_TRUE(recipientConfigPtr->getRecipientConfig() == nullptr);
ASSERT_EQ(recipientConfigPtr->getReplSetName(), recipientConfigSetName);
- ASSERT_THROWS_CODE(repl::makeSplitConfig(splitConfig, recipientConfigSetName, recipientTagName),
- AssertionException,
- 6201800 /*calling on a splitconfig*/);
+ ASSERT_THROWS_CODE(
+ serverless::makeSplitConfig(splitConfig, recipientConfigSetName, recipientTagName),
+ AssertionException,
+ 6201800 /*calling on a splitconfig*/);
}
TEST(MakeSplitConfig, SplitConfigAssertionsTest) {
@@ -150,9 +151,9 @@ TEST(MakeSplitConfig, SplitConfigAssertionsTest) {
<< "localhost:20002"
<< "priority" << 0 << "votes" << 0)));
- ASSERT_THROWS_CODE(repl::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON),
- recipientConfigSetName,
- recipientTagName),
+ ASSERT_THROWS_CODE(serverless::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON),
+ recipientConfigSetName,
+ recipientTagName),
AssertionException,
6201801 /*no recipient members created*/);
@@ -165,9 +166,9 @@ TEST(MakeSplitConfig, SplitConfigAssertionsTest) {
<< BSON(recipientTagName << "one")))
<< "settings" << BSON("electionTimeoutMillis" << 1000));
- ASSERT_THROWS_CODE(repl::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON),
- recipientConfigSetName,
- recipientTagName),
+ ASSERT_THROWS_CODE(serverless::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON),
+ recipientConfigSetName,
+ recipientTagName),
AssertionException,
6201802 /*no donor members created*/);
}