author     Didier Nadeau <didier.nadeau@mongodb.com>    2022-06-15 18:01:55 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2022-06-15 20:18:42 +0000
commit     bfd0810609bf8213c92d004313b87a68b3b66981 (patch)
tree       5ea07d5e26fc9148eeb8831c4efc119de14f3617
parent     fecef7a1f75e196a24715fabb0721124e71e170b (diff)
download   mongo-bfd0810609bf8213c92d004313b87a68b3b66981.tar.gz
SERVER-66705 Move aborting index builds out of critical section
-rw-r--r--  buildscripts/resmokelib/testing/hooks/shard_split.py | 4
-rw-r--r--  jstests/libs/override_methods/inject_tenant_prefix.js | 2
-rw-r--r--  jstests/serverless/libs/basic_serverless_test.js | 4
-rw-r--r--  jstests/serverless/shard_split_performance_test.js | 6
-rw-r--r--  jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js | 2
-rw-r--r--  jstests/serverless/shard_split_write_during_split_stepdown.js | 5
-rw-r--r--  src/mongo/db/namespace_string.cpp | 4
-rw-r--r--  src/mongo/db/namespace_string.h | 2
-rw-r--r--  src/mongo/db/repl/tenant_migration_access_blocker_util.cpp | 4
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_op_observer.cpp | 187
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp | 50
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.cpp | 435
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.h | 17
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service_test.cpp | 61
-rw-r--r--  src/mongo/db/serverless/shard_split_state_machine.idl | 1
-rw-r--r--  src/mongo/db/serverless/shard_split_utils.cpp | 6
-rw-r--r--  src/mongo/db/serverless/shard_split_utils.h | 2
17 files changed, 429 insertions, 363 deletions
diff --git a/buildscripts/resmokelib/testing/hooks/shard_split.py b/buildscripts/resmokelib/testing/hooks/shard_split.py
index d7cb0d05a72..10748f6e1e4 100644
--- a/buildscripts/resmokelib/testing/hooks/shard_split.py
+++ b/buildscripts/resmokelib/testing/hooks/shard_split.py
@@ -473,7 +473,7 @@ class _ShardSplitThread(threading.Thread): # pylint: disable=too-many-instance-
while True:
try:
res = donor_node_client.config.command({
- "count": "tenantSplitDonors",
+ "count": "shardSplitDonors",
"query": {"tenantIds": split_opts.tenant_ids}
})
if res["n"] == 0:
@@ -502,7 +502,7 @@ class _ShardSplitThread(threading.Thread): # pylint: disable=too-many-instance-
while True:
try:
res = recipient_node_client.config.command({
- "count": "tenantSplitDonors",
+ "count": "shardSplitDonors",
"query": {"tenantIds": split_opts.tenant_ids}
})
if res["n"] == 0:
diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js
index 92749a1b9de..2d46f138291 100644
--- a/jstests/libs/override_methods/inject_tenant_prefix.js
+++ b/jstests/libs/override_methods/inject_tenant_prefix.js
@@ -435,7 +435,7 @@ function convertServerConnectionStringToURI(input) {
* that there is only one such operation.
*/
function getOperationStateDocument(conn) {
- const collection = isShardSplitPassthrough() ? "tenantSplitDonors" : "tenantMigrationDonors";
+ const collection = isShardSplitPassthrough() ? "shardSplitDonors" : "tenantMigrationDonors";
const filter =
isShardSplitPassthrough() ? {tenantIds: TestData.tenantIds} : {tenantId: TestData.tenantId};
const findRes = assert.commandWorked(
diff --git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js
index bba2a651cc7..6017c17c756 100644
--- a/jstests/serverless/libs/basic_serverless_test.js
+++ b/jstests/serverless/libs/basic_serverless_test.js
@@ -347,7 +347,7 @@ class BasicServerlessTest {
/*
* Wait for state document garbage collection by polling for when the document has been removed
- * from the tenantSplitDonors namespace, and all access blockers have been removed.
+ * from the 'shardSplitDonors' namespace, and all access blockers have been removed.
* @param {migrationId} id that was used for the commitShardSplit command.
* @param {tenantIds} tenant ids of the shard split.
*/
@@ -538,7 +538,7 @@ class BasicServerlessTest {
}
}
-BasicServerlessTest.kConfigSplitDonorsNS = "config.tenantSplitDonors";
+BasicServerlessTest.kConfigSplitDonorsNS = "config.shardSplitDonors";
BasicServerlessTest.DonorState = {
kUninitialized: "uninitialized",
kBlocking: "blocking",
diff --git a/jstests/serverless/shard_split_performance_test.js b/jstests/serverless/shard_split_performance_test.js
index cb5d0dd20cb..2633d804dec 100644
--- a/jstests/serverless/shard_split_performance_test.js
+++ b/jstests/serverless/shard_split_performance_test.js
@@ -8,7 +8,6 @@ load("jstests/serverless/libs/basic_serverless_test.js");
load("jstests/replsets/rslib.js");
const kBlockStart = "Entering 'blocking' state.";
-const kAbortingIndex = "Aborting index build for shard split.";
const kReconfig = "Applying the split config";
const kWaitForRecipients = "Waiting for recipient to accept the split.";
const kEndMsg = "Shard split decision reached";
@@ -82,18 +81,15 @@ function runOneSplit() {
assertMigrationState(test.donor.getPrimary(), operation.migrationId, "committed");
const blockTS = extractTs(checkLog.getLogMessage(primary, kBlockStart));
- const abortingTS = extractTs(checkLog.getLogMessage(primary, kAbortingIndex));
const reconfigTS = extractTs(checkLog.getLogMessage(primary, kReconfig));
const waitForRecipientsTS = extractTs(checkLog.getLogMessage(primary, kWaitForRecipients));
const endTS = extractTs(checkLog.getLogMessage(primary, kEndMsg));
const blockDurationMs = endTS - blockTS;
- const abortingIndexDurationMs = endTS - abortingTS;
const waitForRecipientsDurationMs = endTS - waitForRecipientsTS;
const reconfigDurationMs = endTS - reconfigTS;
- const splitResult =
- {blockDurationMs, abortingIndexDurationMs, reconfigDurationMs, waitForRecipientsDurationMs};
+ const splitResult = {blockDurationMs, reconfigDurationMs, waitForRecipientsDurationMs};
jsTestLog(`Performance result of shard split: ${tojson(splitResult)}`);
const maximumReconfigDuration = 500;
diff --git a/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js b/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js
index ac41ae32af2..bf5dd5be8f6 100644
--- a/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js
+++ b/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js
@@ -194,7 +194,7 @@ const kCollName = "testColl";
// Cannot mark the state doc as garbage collectable before the migration commits or aborts.
assert.commandFailedWithCode(donorsColl.update({recipientSetName: operation.recipientSetName},
{$set: {expireAt: new Date()}}),
- ErrorCodes.IllegalOperation);
+ ErrorCodes.BadValue);
// Can drop the state doc collection but this will not cause all blocked reads and writes to
// hang.
diff --git a/jstests/serverless/shard_split_write_during_split_stepdown.js b/jstests/serverless/shard_split_write_during_split_stepdown.js
index d147f09412b..1670b5b57b3 100644
--- a/jstests/serverless/shard_split_write_during_split_stepdown.js
+++ b/jstests/serverless/shard_split_write_during_split_stepdown.js
@@ -38,12 +38,9 @@ tenantIds.forEach(id => {
[{_id: 0, x: 0}, {_id: 1, x: 1}, {_id: 2, x: 2}], {writeConcern: {w: "majority"}}));
});
-const operation = test.createSplitOperation(tenantIds);
-
const blockingFP = configureFailPoint(donorPrimary.getDB("admin"), "pauseShardSplitAfterBlocking");
-
+const operation = test.createSplitOperation(tenantIds);
const splitThread = operation.commitAsync();
-
blockingFP.wait();
const donorRst = createRstArgs(test.donor);
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 9d29b806e5d..dd17cf16877 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -95,8 +95,8 @@ const NamespaceString NamespaceString::kTenantMigrationRecipientsNamespace(
const NamespaceString NamespaceString::kTenantMigrationOplogView(
NamespaceString::kLocalDb, "system.tenantMigration.oplogView");
-const NamespaceString NamespaceString::kTenantSplitDonorsNamespace(NamespaceString::kConfigDb,
- "tenantSplitDonors");
+const NamespaceString NamespaceString::kShardSplitDonorsNamespace(NamespaceString::kConfigDb,
+ "shardSplitDonors");
const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(NamespaceString::kConfigDb,
"cache.collections");
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 66872635967..c633ecab4fa 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -156,7 +156,7 @@ public:
static const NamespaceString kTenantMigrationOplogView;
// Namespace for storing the persisted state of tenant split donors.
- static const NamespaceString kTenantSplitDonorsNamespace;
+ static const NamespaceString kShardSplitDonorsNamespace;
// Namespace for replica set configuration settings.
static const NamespaceString kSystemReplSetNamespace;
diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
index 53e7b24f135..fc693f64c20 100644
--- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp
@@ -437,7 +437,7 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) {
// Recover TenantMigrationDonorAccessBlockers for ShardSplit.
PersistentTaskStore<ShardSplitDonorDocument> shardSplitDonorStore(
- NamespaceString::kTenantSplitDonorsNamespace);
+ NamespaceString::kShardSplitDonorsNamespace);
shardSplitDonorStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) {
// Skip creating a TenantMigrationDonorAccessBlocker for terminal shard split that have been
@@ -462,6 +462,8 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) {
.add(tenantId.toString(), mtab);
switch (doc.getState()) {
+ case ShardSplitDonorStateEnum::kAbortingIndexBuilds:
+ break;
case ShardSplitDonorStateEnum::kBlocking:
invariant(doc.getBlockTimestamp());
mtab->startBlockingWrites();
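For orientation, a minimal standalone sketch of the recovery rule the new switch case above encodes: a state document recovered in the 'aborting index builds' state gets its donor access blocker installed but replays no blocking, while a document in the 'blocking' state replays startBlockingWrites. The enum and blocker type below are illustrative stand-ins, not MongoDB's types, and the terminal-state cases of the real switch are not shown in this hunk.

#include <cassert>

enum class DonorState { kUninitialized, kAbortingIndexBuilds, kBlocking, kCommitted, kAborted };

struct MockDonorAccessBlocker {
    bool blockingWrites = false;
    void startBlockingWrites() { blockingWrites = true; }
};

// Replays the blocker side effects for the states visible in the hunk above:
// nothing for 'aborting index builds', startBlockingWrites for 'blocking'.
void replayOnRecovery(DonorState state, MockDonorAccessBlocker& mtab) {
    switch (state) {
        case DonorState::kAbortingIndexBuilds:
            break;  // blocker exists, but writes are not blocked yet
        case DonorState::kBlocking:
            mtab.startBlockingWrites();
            break;
        default:
            break;  // committed/aborted handling is outside this diff hunk
    }
}

int main() {
    MockDonorAccessBlocker a, b;
    replayOnRecovery(DonorState::kAbortingIndexBuilds, a);
    replayOnRecovery(DonorState::kBlocking, b);
    assert(!a.blockingWrites && b.blockingWrites);
    return 0;
}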
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
index ce1d0e55ddf..9de2da2e33d 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp
@@ -42,6 +42,10 @@ bool isSecondary(const OperationContext* opCtx) {
return !opCtx->writesAreReplicated();
}
+bool isPrimary(const OperationContext* opCtx) {
+ return opCtx->writesAreReplicated();
+}
+
const auto tenantIdsToDeleteDecoration =
OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>();
@@ -50,6 +54,13 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
ShardSplitDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), doc);
const std::string errmsg = "Invalid donor state doc, {}: {}";
+ if (donorStateDoc.getExpireAt()) {
+ uassert(ErrorCodes::BadValue,
+ "Contains 'expireAt' but the split has not committed or aborted",
+ donorStateDoc.getState() == ShardSplitDonorStateEnum::kCommitted ||
+ donorStateDoc.getState() == ShardSplitDonorStateEnum::kAborted);
+ }
+
switch (donorStateDoc.getState()) {
case ShardSplitDonorStateEnum::kUninitialized:
uassert(ErrorCodes::BadValue,
@@ -68,6 +79,12 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
doc.toString()),
!donorStateDoc.getAbortReason());
break;
+ case ShardSplitDonorStateEnum::kAbortingIndexBuilds:
+ uassert(ErrorCodes::BadValue,
+ errmsg,
+ !donorStateDoc.getBlockTimestamp() && !donorStateDoc.getCommitOrAbortOpTime() &&
+ !donorStateDoc.getAbortReason());
+ break;
case ShardSplitDonorStateEnum::kBlocking:
uassert(ErrorCodes::BadValue,
fmt::format(errmsg,
@@ -125,54 +142,61 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) {
* Initializes the TenantMigrationDonorAccessBlocker for the tenant migration denoted by the given
* state doc.
*/
-void onBlockerInitialization(OperationContext* opCtx,
- const ShardSplitDonorDocument& donorStateDoc) {
- invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kBlocking);
- invariant(donorStateDoc.getBlockTimestamp());
-
- auto optionalTenants = donorStateDoc.getTenantIds();
- invariant(optionalTenants);
-
- const auto& tenantIds = optionalTenants.get();
+void onTransitionToAbortingIndexBuilds(OperationContext* opCtx,
+ const ShardSplitDonorDocument& donorStateDoc) {
+ invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kAbortingIndexBuilds);
+ invariant(donorStateDoc.getTenantIds());
+ invariant(donorStateDoc.getRecipientConnectionString());
+
+ auto tenantIds = *donorStateDoc.getTenantIds();
+ auto recipientConnectionString = *donorStateDoc.getRecipientConnectionString();
+ for (const auto& tenantId : tenantIds) {
+ auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
+ opCtx->getServiceContext(),
+ donorStateDoc.getId(),
+ tenantId.toString(),
+ MigrationProtocolEnum::kMultitenantMigrations,
+ recipientConnectionString.toString());
+
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab);
+ }
- // The primary create and sets the tenant access blocker to blocking within the
- // ShardSplitDonorService.
- if (isSecondary(opCtx)) {
- auto recipientConnectionString = [stateDoc = donorStateDoc]() {
- if (stateDoc.getRecipientConnectionString()) {
- return *stateDoc.getRecipientConnectionString();
+ if (isPrimary(opCtx)) {
+ // onRollback is not registered on secondaries since secondaries should not fail to
+ // apply the write.
+ opCtx->recoveryUnit()->onRollback([opCtx, tenantIds] {
+ for (const auto& tenantId : tenantIds) {
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
+ .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
-
- auto recipientTagName = stateDoc.getRecipientTagName();
- invariant(recipientTagName);
- auto recipientSetName = stateDoc.getRecipientSetName();
- invariant(recipientSetName);
- auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
- return serverless::makeRecipientConnectionString(
- config, *recipientTagName, *recipientSetName);
- }();
-
- for (const auto& tenantId : tenantIds) {
- auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
- opCtx->getServiceContext(),
- donorStateDoc.getId(),
- tenantId.toString(),
- MigrationProtocolEnum::kMultitenantMigrations,
- recipientConnectionString.toString());
-
- TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .add(tenantId, mtab);
-
- // No rollback handler is necessary as the write should not fail on secondaries.
- mtab->startBlockingWrites();
- }
+ });
}
+}
- for (const auto& tenantId : tenantIds) {
+/**
+ * Transitions the TenantMigrationDonorAccessBlocker to the blocking state.
+ */
+void onTransitionToBlocking(OperationContext* opCtx, const ShardSplitDonorDocument& donorStateDoc) {
+ invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kBlocking);
+ invariant(donorStateDoc.getBlockTimestamp());
+ invariant(donorStateDoc.getTenantIds());
+
+ auto tenantIds = *donorStateDoc.getTenantIds();
+ for (auto tenantId : tenantIds) {
auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(
opCtx->getServiceContext(), tenantId);
invariant(mtab);
+ if (isSecondary(opCtx)) {
+ // A primary calls startBlockingWrites on the TenantMigrationDonorAccessBlocker before
+ // reserving the OpTime for the "start blocking" write, so only secondaries call
+ // startBlockingWrites on the TenantMigrationDonorAccessBlocker in the op observer.
+ mtab->startBlockingWrites();
+ }
+
+ // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since
+ // startBlockingReadsAfter just needs to be called before the "start blocking" write's oplog
+ // hole is filled.
mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get());
}
}
@@ -206,9 +230,9 @@ void onTransitionToAborted(OperationContext* opCtx, const ShardSplitDonorDocumen
auto tenants = donorStateDoc.getTenantIds();
if (!tenants) {
- // The only case where there can be no tenants is when the instance is created by the abort
- // command. In that case, no tenant migration blockers are created and the state will go
- // straight to abort.
+ // The only case where there can be no tenants is when the instance is created by the
+ // abort command. In that case, no tenant migration blockers are created and the state
+ // will go straight to abort.
invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized);
return;
}
@@ -242,34 +266,35 @@ public:
_opCtx->getServiceContext(), tenantId);
if (!mtab) {
- // The state doc and TenantMigrationDonorAccessBlocker for this migration
- // were removed immediately after expireAt was set. This is unlikely to
- // occur in production where the garbage collection delay should be
- // sufficiently large.
+ // The state doc and TenantMigrationDonorAccessBlocker for this
+ // migration were removed immediately after expireAt was set. This is
+ // unlikely to occur in production where the garbage collection delay
+ // should be sufficiently large.
continue;
}
- if (!_opCtx->writesAreReplicated()) {
- // Setting expireAt implies that the TenantMigrationDonorAccessBlocker for
- // this migration will be removed shortly after this. However, a lagged
- // secondary might not manage to advance its majority commit point past the
- // migration commit or abort opTime and consequently transition out of the
- // blocking state before the TenantMigrationDonorAccessBlocker is removed.
- // When this occurs, blocked reads or writes will be left waiting for the
- // migration decision indefinitely. To avoid that, notify the
- // TenantMigrationDonorAccessBlocker here that the commit or abort opTime
- // has been majority committed (guaranteed to be true since by design the
- // donor never marks its state doc as garbage collectable before the
- // migration decision is majority committed).
+ if (isSecondary(_opCtx)) {
+ // Setting expireAt implies that the TenantMigrationDonorAccessBlocker
+ // for this migration will be removed shortly after this. However, a
+ // lagged secondary might not manage to advance its majority commit
+ // point past the migration commit or abort opTime and consequently
+ // transition out of the blocking state before the
+ // TenantMigrationDonorAccessBlocker is removed. When this occurs,
+ // blocked reads or writes will be left waiting for the migration
+ // decision indefinitely. To avoid that, notify the
+ // TenantMigrationDonorAccessBlocker here that the commit or abort
+ // opTime has been majority committed (guaranteed to be true since by
+ // design the donor never marks its state doc as garbage collectable
+ // before the migration decision is majority committed).
mtab->onMajorityCommitPointUpdate(
_donorStateDoc.getCommitOrAbortOpTime().get());
}
if (_donorStateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
invariant(mtab->inStateAborted());
- // The migration durably aborted and is now marked as garbage collectable,
- // remove its TenantMigrationDonorAccessBlocker right away to allow
- // back-to-back migration retries.
+ // The migration durably aborted and is now marked as garbage
+ // collectable, remove its TenantMigrationDonorAccessBlocker right away
+ // to allow back-to-back migration retries.
TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext())
.remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor);
}
@@ -305,7 +330,7 @@ void ShardSplitDonorOpObserver::onInserts(OperationContext* opCtx,
std::vector<InsertStatement>::const_iterator first,
std::vector<InsertStatement>::const_iterator last,
bool fromMigrate) {
- if (nss != NamespaceString::kTenantSplitDonorsNamespace ||
+ if (nss != NamespaceString::kShardSplitDonorsNamespace ||
tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
return;
}
@@ -313,45 +338,41 @@ void ShardSplitDonorOpObserver::onInserts(OperationContext* opCtx,
for (auto it = first; it != last; it++) {
auto donorStateDoc = parseAndValidateDonorDocument(it->doc);
switch (donorStateDoc.getState()) {
- case ShardSplitDonorStateEnum::kBlocking:
- onBlockerInitialization(opCtx, donorStateDoc);
+ case ShardSplitDonorStateEnum::kAbortingIndexBuilds:
+ onTransitionToAbortingIndexBuilds(opCtx, donorStateDoc);
break;
case ShardSplitDonorStateEnum::kAborted:
// If the operation starts aborted, do not do anything.
break;
- case ShardSplitDonorStateEnum::kUninitialized:
- case ShardSplitDonorStateEnum::kCommitted:
- uasserted(ErrorCodes::IllegalOperation,
- "cannot insert a donor's state doc with 'state' other than 'kAborted' or "
- "'kBlocking'");
- break;
default:
- MONGO_UNREACHABLE;
+ uasserted(ErrorCodes::IllegalOperation,
+ "Cannot insert donor's state document with state other than 'aborted' or "
+ "'aborting index builds'.");
}
}
}
void ShardSplitDonorOpObserver::onUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args) {
- if (args.nss != NamespaceString::kTenantSplitDonorsNamespace ||
+ if (args.nss != NamespaceString::kShardSplitDonorsNamespace ||
tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
return;
}
auto donorStateDoc = parseAndValidateDonorDocument(args.updateArgs->updatedDoc);
switch (donorStateDoc.getState()) {
+ case ShardSplitDonorStateEnum::kBlocking:
+ onTransitionToBlocking(opCtx, donorStateDoc);
+ break;
case ShardSplitDonorStateEnum::kCommitted:
case ShardSplitDonorStateEnum::kAborted:
opCtx->recoveryUnit()->registerChange(
std::make_unique<TenantMigrationDonorCommitOrAbortHandler>(opCtx, donorStateDoc));
break;
- case ShardSplitDonorStateEnum::kBlocking:
- uasserted(ErrorCodes::IllegalOperation,
- "The state document should be inserted as blocking and never transition to "
- "blocking");
- break;
default:
- MONGO_UNREACHABLE;
+ uasserted(ErrorCodes::IllegalOperation,
+ "Cannot update donor's state document with state other than 'aborted', "
+ "'committed', or 'aborted'");
}
}
@@ -359,13 +380,12 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx,
NamespaceString const& nss,
const UUID& uuid,
BSONObj const& doc) {
- if (nss != NamespaceString::kTenantSplitDonorsNamespace ||
+ if (nss != NamespaceString::kShardSplitDonorsNamespace ||
tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
return;
}
auto donorStateDoc = parseAndValidateDonorDocument(doc);
-
uassert(ErrorCodes::IllegalOperation,
str::stream() << "cannot delete a donor's state document " << doc
<< " since it has not been marked as garbage collectable and is not a"
@@ -390,8 +410,7 @@ void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx,
const UUID& uuid,
StmtId stmtId,
const OplogDeleteEntryArgs& args) {
- if (nss != NamespaceString::kTenantSplitDonorsNamespace ||
- !tenantIdsToDeleteDecoration(opCtx) ||
+ if (nss != NamespaceString::kShardSplitDonorsNamespace || !tenantIdsToDeleteDecoration(opCtx) ||
tenant_migration_access_blocker::inRecoveryMode(opCtx)) {
return;
}
@@ -414,7 +433,7 @@ repl::OpTime ShardSplitDonorOpObserver::onDropCollection(OperationContext* opCtx
const UUID& uuid,
std::uint64_t numRecords,
const CollectionDropType dropType) {
- if (collectionName == NamespaceString::kTenantSplitDonorsNamespace) {
+ if (collectionName == NamespaceString::kShardSplitDonorsNamespace) {
opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) {
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
.removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor);
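The comments in onTransitionToBlocking above capture an ordering invariant: a primary calls startBlockingWrites on each access blocker before reserving the OpTime for the "start blocking" write, so only secondaries start blocking writes from the op observer, while both roles call startBlockingReadsAfter once the block timestamp is known. A minimal standalone sketch of that split, using a mock blocker and plain integers for timestamps rather than the MongoDB types:

#include <cassert>
#include <optional>

// Mock of the donor access blocker's blocking surface.
struct MockDonorAccessBlocker {
    bool blockingWrites = false;
    std::optional<unsigned long long> blockingReadsAfterTs;
    void startBlockingWrites() { blockingWrites = true; }
    void startBlockingReadsAfter(unsigned long long ts) { blockingReadsAfterTs = ts; }
};

// Primary path: block writes first, then take the reserved slot as the block
// timestamp, then block reads at that timestamp (the op observer skips
// startBlockingWrites on primaries).
unsigned long long primaryStartBlocking(MockDonorAccessBlocker& mtab,
                                        unsigned long long reservedSlotTs) {
    mtab.startBlockingWrites();
    unsigned long long blockTs = reservedSlotTs;  // stands in for the reserved oplog slot
    mtab.startBlockingReadsAfter(blockTs);
    return blockTs;
}

// Secondary path: the op observer applies both steps when it sees the
// 'blocking' state document with its block timestamp already set.
void secondaryApplyBlocking(MockDonorAccessBlocker& mtab, unsigned long long blockTs) {
    mtab.startBlockingWrites();
    mtab.startBlockingReadsAfter(blockTs);
}

int main() {
    MockDonorAccessBlocker primary, secondary;
    auto blockTs = primaryStartBlocking(primary, 100);
    secondaryApplyBlocking(secondary, blockTs);
    assert(primary.blockingWrites && secondary.blockingWrites);
    assert(primary.blockingReadsAfterTs == secondary.blockingReadsAfterTs);
    return 0;
}

Either way, no tenant write can commit with a timestamp later than the block timestamp, which is the guarantee the op observer comments describe.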
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
index c52868126e6..6f2e376de47 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/serverless/shard_split_donor_op_observer.h"
#include "mongo/db/serverless/shard_split_state_machine_gen.h"
#include "mongo/db/serverless/shard_split_test_utils.h"
+#include "mongo/db/serverless/shard_split_utils.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/dbtests/mock/mock_replica_set.h"
@@ -129,7 +130,8 @@ protected:
std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>>
createBlockersAndStartBlockingWrites(const std::vector<std::string>& tenants,
OperationContext* opCtx,
- const std::string& connectionStr) {
+ const std::string& connectionStr,
+ bool isSecondary = false) {
auto uuid = UUID::gen();
std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> blockers;
for (const auto& tenant : tenants) {
@@ -141,7 +143,10 @@ protected:
_connectionStr);
blockers.push_back(mtab);
- mtab->startBlockingWrites();
+ if (!isSecondary) {
+ mtab->startBlockingWrites();
+ }
+
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenant, mtab);
}
@@ -160,7 +165,7 @@ protected:
MockReplicaSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
MockReplicaSet _recipientReplSet =
MockReplicaSet("recipientSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */);
- const NamespaceString _nss = NamespaceString::kTenantSplitDonorsNamespace;
+ const NamespaceString _nss = NamespaceString::kShardSplitDonorsNamespace;
std::vector<std::string> _tenantIds = {"tenant1", "tenantAB"};
std::string _connectionStr = _replSet.getConnectionString();
UUID _uuid = UUID::gen();
@@ -253,7 +258,30 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertValidAbortedDocument) {
}
}
-TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentPrimary) {
+TEST_F(ShardSplitDonorOpObserverTest, InsertAbortingIndexDocumentPrimary) {
+ test::shard_split::reconfigToAddRecipientNodes(
+ getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts());
+
+ auto stateDocument = defaultStateDocument();
+ stateDocument.setState(ShardSplitDonorStateEnum::kAbortingIndexBuilds);
+ stateDocument.setRecipientConnectionString(mongo::serverless::makeRecipientConnectionString(
+ repl::ReplicationCoordinator::get(_opCtx.get())->getConfig(),
+ _recipientTagName,
+ _recipientSetName));
+
+ auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) {
+ ASSERT_TRUE(mtab);
+ // The OpObserver does not set the mtab to blocking for primaries.
+ ASSERT_OK(mtab->checkIfCanWrite(Timestamp(1, 1)));
+ ASSERT_OK(mtab->checkIfCanWrite(Timestamp(1, 3)));
+ ASSERT_OK(mtab->checkIfLinearizableReadWasAllowed(opCtx));
+ ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict);
+ };
+
+ runInsertTestCase(stateDocument, _tenantIds, mtabVerifier);
+}
+
+TEST_F(ShardSplitDonorOpObserverTest, UpdateBlockingDocumentPrimary) {
test::shard_split::reconfigToAddRecipientNodes(
getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts());
@@ -274,15 +302,16 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentPrimary) {
ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict);
};
- runInsertTestCase(stateDocument, _tenantIds, mtabVerifier);
+ runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier);
}
-TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentSecondary) {
+TEST_F(ShardSplitDonorOpObserverTest, UpdateBlockingDocumentSecondary) {
test::shard_split::reconfigToAddRecipientNodes(
getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts());
// This indicates the instance is secondary for the OpObserver.
repl::UnreplicatedWritesBlock setSecondary(_opCtx.get());
+ createBlockersAndStartBlockingWrites(_tenantIds, _opCtx.get(), _connectionStr, true);
auto stateDocument = defaultStateDocument();
stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
@@ -299,18 +328,15 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentSecondary) {
ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict);
};
- runInsertTestCase(stateDocument, _tenantIds, mtabVerifier);
+ runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier);
}
-
-TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingFail) {
+TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbortingIndexBuildsFail) {
// This indicates the instance is secondary for the OpObserver.
repl::UnreplicatedWritesBlock setSecondary(_opCtx.get());
auto stateDocument = defaultStateDocument();
- stateDocument.setState(ShardSplitDonorStateEnum::kBlocking);
- stateDocument.setBlockTimestamp(Timestamp(1, 1));
-
+ stateDocument.setState(ShardSplitDonorStateEnum::kAbortingIndexBuilds);
CollectionUpdateArgs updateArgs;
updateArgs.stmtIds = {};
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index e2a039fbab0..deb78f1779b 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -74,51 +74,10 @@ MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterReceivingAbortCmd);
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
-bool shouldStopInsertingDonorStateDoc(Status status) {
- return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress;
-}
-
-void setStateDocTimestamps(WithLock,
- ShardSplitDonorStateEnum nextState,
- repl::OpTime time,
- ShardSplitDonorDocument& stateDoc) {
- switch (nextState) {
- case ShardSplitDonorStateEnum::kUninitialized:
- break;
- case ShardSplitDonorStateEnum::kBlocking:
- stateDoc.setBlockTimestamp(time.getTimestamp());
- break;
- case ShardSplitDonorStateEnum::kAborted:
- stateDoc.setCommitOrAbortOpTime(time);
- break;
- case ShardSplitDonorStateEnum::kCommitted:
- stateDoc.setCommitOrAbortOpTime(time);
- break;
- default:
- MONGO_UNREACHABLE;
- }
-}
-
bool isAbortedDocumentPersistent(WithLock, ShardSplitDonorDocument& stateDoc) {
return !!stateDoc.getAbortReason();
}
-void setMtabToBlockingForTenants(ServiceContext* context,
- OperationContext* opCtx,
- const std::vector<StringData>& tenantIds) {
- // Start blocking writes before getting an oplog slot to guarantee no
- // writes to the tenant's data can commit with a timestamp after the
- // block timestamp.
- for (const auto& tenantId : tenantIds) {
- auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(context,
- tenantId);
- invariant(mtab);
- mtab->startBlockingWrites();
-
- opCtx->recoveryUnit()->onRollback([mtab] { mtab->rollBackStartBlocking(); });
- }
-}
-
void checkForTokenInterrupt(const CancellationToken& token) {
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
}
@@ -423,19 +382,16 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
// Note we do not use the abort split token here because the abortShardSplit
// command waits for a decision to be persisted which will not happen if
// inserting the initial state document fails.
- if (MONGO_unlikely(pauseShardSplitBeforeBlockingState.shouldFail())) {
- pauseShardSplitBeforeBlockingState.pauseWhileSet();
- }
- return _enterBlockingOrAbortedState(executor, primaryToken, abortToken);
+ return _enterAbortIndexBuildsOrAbortedState(executor, primaryToken, abortToken);
+ })
+ .then([this, executor, abortToken] {
+ // Start tracking the abortToken for killing operation contexts
+ _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
+ return _abortIndexBuildsAndEnterBlockingState(executor, abortToken);
})
.then([this, executor, abortToken, criticalSectionTimer] {
criticalSectionTimer->reset();
- checkForTokenInterrupt(abortToken);
- _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
- _abortIndexBuilds(abortToken);
- })
- .then([this, executor, abortToken] {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
pauseShardSplitAfterBlocking.pauseWhileSet(opCtx.get());
@@ -583,6 +539,143 @@ bool ShardSplitDonorService::DonorStateMachine::_hasInstalledSplitConfig(WithLoc
config.getRecipientConfig()->getReplSetName() == *_stateDoc.getRecipientSetName();
}
+ConnectionString ShardSplitDonorService::DonorStateMachine::_setupAcceptanceMonitoring(
+ WithLock lock, const CancellationToken& abortToken) {
+ auto recipientConnectionString = [stateDoc = _stateDoc]() {
+ if (stateDoc.getRecipientConnectionString()) {
+ return *stateDoc.getRecipientConnectionString();
+ }
+
+ auto recipientTagName = stateDoc.getRecipientTagName();
+ invariant(recipientTagName);
+ auto recipientSetName = stateDoc.getRecipientSetName();
+ invariant(recipientSetName);
+ auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
+ return serverless::makeRecipientConnectionString(
+ config, *recipientTagName, *recipientSetName);
+ }();
+
+ // Always start the replica set monitor if we haven't reached a decision yet
+ _splitAcceptancePromise.setWith([&]() -> Future<void> {
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking ||
+ MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) {
+ return SemiFuture<void>::makeReady().unsafeToInlineFuture();
+ }
+
+ // Optionally select a task executor for unit testing
+ auto executor = _splitAcceptanceTaskExecutorForTest
+ ? *_splitAcceptanceTaskExecutorForTest
+ : _shardSplitService->getInstanceCleanupExecutor();
+
+ LOGV2(6142508,
+ "Monitoring recipient nodes for split acceptance.",
+ "id"_attr = _migrationId,
+ "recipientConnectionString"_attr = recipientConnectionString);
+
+ return detail::makeRecipientAcceptSplitFuture(
+ executor, abortToken, recipientConnectionString, _migrationId)
+ .unsafeToInlineFuture();
+ });
+
+ return recipientConnectionString;
+}
+
+ExecutorFuture<void>
+ShardSplitDonorService::DonorStateMachine::_enterAbortIndexBuildsOrAbortedState(
+ const ScopedTaskExecutorPtr& executor,
+ const CancellationToken& primaryToken,
+ const CancellationToken& abortToken) {
+ ShardSplitDonorStateEnum nextState;
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
+ if (isAbortedDocumentPersistent(lg, _stateDoc)) {
+ // Node has stepped up and created an instance using a document in abort state. No
+ // need to write the document as it already exists.
+ return ExecutorFuture(**executor);
+ }
+
+ _abortReason =
+ Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'.");
+ BSONObjBuilder bob;
+ _abortReason->serializeErrorToBSON(&bob);
+ _stateDoc.setAbortReason(bob.obj());
+ _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
+ Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
+ nextState = ShardSplitDonorStateEnum::kAborted;
+
+ LOGV2(6670500, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId());
+ } else {
+ // Always set up acceptance monitoring.
+ auto recipientConnectionString = _setupAcceptanceMonitoring(lg, abortToken);
+
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) {
+ // Node has stepped up and resumed a shard split. No need to write the document as
+ // it already exists.
+ return ExecutorFuture(**executor);
+ }
+
+ _stateDoc.setRecipientConnectionString(recipientConnectionString);
+ nextState = ShardSplitDonorStateEnum::kAbortingIndexBuilds;
+
+ LOGV2(
+ 6670501, "Entering 'aborting index builds' state.", "id"_attr = _stateDoc.getId());
+ }
+ }
+
+ return _updateStateDocument(executor, primaryToken, nextState)
+ .then([this, executor, primaryToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken);
+ })
+ .then([this, executor, nextState]() {
+ uassert(ErrorCodes::TenantMigrationAborted,
+ "Shard split operation aborted.",
+ nextState != ShardSplitDonorStateEnum::kAborted);
+ });
+}
+
+ExecutorFuture<void>
+ShardSplitDonorService::DonorStateMachine::_abortIndexBuildsAndEnterBlockingState(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
+ checkForTokenInterrupt(abortToken);
+
+ boost::optional<std::vector<StringData>> tenantIds;
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kAbortingIndexBuilds) {
+ return ExecutorFuture(**executor);
+ }
+
+ tenantIds = _stateDoc.getTenantIds();
+ invariant(tenantIds);
+ }
+
+ LOGV2(6436100, "Aborting index builds for shard split.", "id"_attr = _migrationId);
+
+ // Abort any in-progress index builds. No new index builds can start while we are doing this
+ // because the mtab prevents it.
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto* indexBuildsCoordinator = IndexBuildsCoordinator::get(opCtx.get());
+ for (const auto& tenantId : *tenantIds) {
+ indexBuildsCoordinator->abortTenantIndexBuilds(
+ opCtx.get(), MigrationProtocolEnum::kMultitenantMigrations, tenantId, "shard split");
+ }
+
+ if (MONGO_unlikely(pauseShardSplitBeforeBlockingState.shouldFail())) {
+ pauseShardSplitBeforeBlockingState.pauseWhileSet();
+ }
+
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId());
+ }
+
+ return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kBlocking)
+ .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken);
+ });
+}
+
ExecutorFuture<void>
ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestamp(
const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
@@ -795,170 +888,80 @@ ShardSplitDonorService::DonorStateMachine::_triggerElectionAndEnterCommitedState
});
}
-ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState(
- const ScopedTaskExecutorPtr& executor,
- const CancellationToken& primaryToken,
- const CancellationToken& abortToken) {
- ShardSplitDonorStateEnum nextState;
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
- if (isAbortedDocumentPersistent(lg, _stateDoc)) {
- // Node has step up and created an instance using a document in abort state. No
- // need to write the document as it already exists.
- return ExecutorFuture(**executor);
- }
-
- _abortReason =
- Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'.");
- BSONObjBuilder bob;
- _abortReason->serializeErrorToBSON(&bob);
- _stateDoc.setAbortReason(bob.obj());
- _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
- Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
- nextState = ShardSplitDonorStateEnum::kAborted;
-
- LOGV2(8423355, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId());
- } else {
- auto recipientConnectionString = [stateDoc = _stateDoc]() {
- if (stateDoc.getRecipientConnectionString()) {
- return *stateDoc.getRecipientConnectionString();
- }
-
- auto recipientTagName = stateDoc.getRecipientTagName();
- invariant(recipientTagName);
- auto recipientSetName = stateDoc.getRecipientSetName();
- invariant(recipientSetName);
- auto config =
- repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
- return serverless::makeRecipientConnectionString(
- config, *recipientTagName, *recipientSetName);
- }();
-
- // Always start the replica set monitor if we haven't reached a decision yet
- _splitAcceptancePromise.setWith([&]() -> Future<void> {
- if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking ||
- MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) {
- return SemiFuture<void>::makeReady().unsafeToInlineFuture();
- }
-
- // Optionally select a task executor for unit testing
- auto executor = _splitAcceptanceTaskExecutorForTest
- ? *_splitAcceptanceTaskExecutorForTest
- : _shardSplitService->getInstanceCleanupExecutor();
-
- LOGV2(6142508,
- "Monitoring recipient nodes for split acceptance.",
- "id"_attr = _migrationId,
- "recipientConnectionString"_attr = recipientConnectionString);
-
- return detail::makeRecipientAcceptSplitFuture(
- executor, abortToken, recipientConnectionString, _migrationId)
- .unsafeToInlineFuture();
- });
-
- if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) {
- // Node has step up and resumed a shard split. No need to write the document as
- // it already exists.
- return ExecutorFuture(**executor);
- }
-
- // Otherwise, record the recipient connection string
- _stateDoc.setRecipientConnectionString(recipientConnectionString);
- _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking);
- nextState = ShardSplitDonorStateEnum::kBlocking;
-
- LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId());
- }
- }
-
- return AsyncTry([this, nextState, uuid = _migrationId]() {
- auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc());
- auto opCtx = opCtxHolder.get();
-
- AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX);
-
- writeConflictRetry(
- opCtx, "ShardSplitDonorInsertStateDoc", _stateDocumentsNS.ns(), [&] {
- const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid);
- const auto getUpdatedStateDocBson = [&]() {
- stdx::lock_guard<Latch> lg(_mutex);
- return _stateDoc.toBSON();
- };
-
- WriteUnitOfWork wuow(opCtx);
- if (nextState == ShardSplitDonorStateEnum::kBlocking) {
- stdx::lock_guard<Latch> lg(_mutex);
-
- insertTenantAccessBlocker(lg, opCtx, _stateDoc);
-
- auto tenantIds = _stateDoc.getTenantIds();
- invariant(tenantIds);
- setMtabToBlockingForTenants(_serviceContext, opCtx, tenantIds.get());
- }
-
- // Reserve an opTime for the write.
- auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
- setStateDocTimestamps(
- stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc);
-
- auto updateResult = Helpers::upsert(opCtx,
- _stateDocumentsNS.ns(),
- filter,
- getUpdatedStateDocBson(),
- /*fromMigrate=*/false);
-
-
- // We only want to insert, not modify, document
- invariant(updateResult.numMatched == 0);
- wuow.commit();
- });
-
- return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- })
- .until([](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
- })
- .withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, primaryToken)
- .then([this, executor, primaryToken](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken);
- })
- .then([this, executor, nextState]() {
- uassert(ErrorCodes::TenantMigrationAborted,
- "Shard split operation aborted.",
- nextState != ShardSplitDonorStateEnum::kAborted);
- });
-}
-
ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateStateDocument(
const ScopedTaskExecutorPtr& executor,
const CancellationToken& token,
ShardSplitDonorStateEnum nextState) {
- auto tenantIds = [&]() {
+ auto [tenantIds, isInsert] = [&]() {
stdx::lock_guard<Latch> lg(_mutex);
- _stateDoc.setState(nextState);
-
- return _stateDoc.getTenantIds();
+ auto isInsert = _stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized ||
+ _stateDoc.getState() == ShardSplitDonorStateEnum::kAborted;
+ return std::make_pair(_stateDoc.getTenantIds(), isInsert);
}();
- return AsyncTry([this, tenantIds = std::move(tenantIds), uuid = _migrationId, nextState] {
+ return AsyncTry([this,
+ tenantIds = std::move(tenantIds),
+ isInsert = isInsert,
+ uuid = _migrationId,
+ nextState] {
auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto opCtx = opCtxHolder.get();
AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX);
- uassert(ErrorCodes::NamespaceNotFound,
- str::stream() << _stateDocumentsNS.ns() << " does not exist",
- collection);
+
+ if (!isInsert) {
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << _stateDocumentsNS.ns() << " does not exist",
+ collection);
+ }
writeConflictRetry(
- opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&] {
+ opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&]() {
WriteUnitOfWork wuow(opCtx);
+ if (nextState == ShardSplitDonorStateEnum::kBlocking) {
+ // Start blocking writes before getting an oplog slot to guarantee no
+ // writes to the tenant's data can commit with a timestamp after the
+ // block timestamp.
+ for (const auto& tenantId : *tenantIds) {
+ auto mtab = tenant_migration_access_blocker::
+ getTenantMigrationDonorAccessBlocker(_serviceContext, tenantId);
+ invariant(mtab);
+ mtab->startBlockingWrites();
+
+ opCtx->recoveryUnit()->onRollback(
+ [mtab] { mtab->rollBackStartBlocking(); });
+ }
+ }
+
// Reserve an opTime for the write.
auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
- setStateDocTimestamps(
- stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc);
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _stateDoc.setState(nextState);
+ switch (nextState) {
+ case ShardSplitDonorStateEnum::kUninitialized:
+ case ShardSplitDonorStateEnum::kAbortingIndexBuilds:
+ break;
+ case ShardSplitDonorStateEnum::kBlocking:
+ _stateDoc.setBlockTimestamp(oplogSlot.getTimestamp());
+ break;
+ case ShardSplitDonorStateEnum::kCommitted:
+ _stateDoc.setCommitOrAbortOpTime(oplogSlot);
+ break;
+ case ShardSplitDonorStateEnum::kAborted: {
+ _stateDoc.setCommitOrAbortOpTime(oplogSlot);
+
+ invariant(_abortReason);
+ BSONObjBuilder bob;
+ _abortReason.get().serializeErrorToBSON(&bob);
+ _stateDoc.setAbortReason(bob.obj());
+ break;
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid);
const auto updatedStateDocBson = [&]() {
@@ -971,15 +974,19 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS
updatedStateDocBson,
/*fromMigrate=*/false);
- invariant(updateResult.numDocsModified == 1);
+ if (isInsert) {
+ invariant(!updateResult.existing);
+ invariant(!updateResult.upsertedId.isEmpty());
+ } else {
+ invariant(updateResult.numDocsModified == 1);
+ }
+
wuow.commit();
});
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
- })
+ .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -1177,30 +1184,4 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_cleanRecipientS
.on(**executor, primaryToken)
.ignoreValue();
}
-
-void ShardSplitDonorService::DonorStateMachine::_abortIndexBuilds(
- const CancellationToken& abortToken) {
- checkForTokenInterrupt(abortToken);
-
- boost::optional<std::vector<StringData>> tenantIds;
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) {
- return;
- }
- tenantIds = _stateDoc.getTenantIds();
- invariant(tenantIds);
- }
-
- LOGV2(6436100, "Aborting index build for shard split.", "id"_attr = _migrationId);
-
- // Before applying the split config, abort any in-progress index builds. No new index builds
- // can start while we are doing this because the mtab prevents it.
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- auto* indexBuildsCoordinator = IndexBuildsCoordinator::get(opCtx.get());
- for (const auto& tenantId : *tenantIds) {
- indexBuildsCoordinator->abortTenantIndexBuilds(
- opCtx.get(), MigrationProtocolEnum::kMultitenantMigrations, tenantId, "shard split");
- }
-}
} // namespace mongo
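The consolidated _updateStateDocument above does two things in one write path: for the 'blocking' transition it starts blocking writes before reserving the oplog slot (so no tenant write can commit with a timestamp after the block timestamp), and it stamps the reserved slot into whichever state-document field matches the next state. A minimal standalone sketch of that state-to-field mapping, with illustrative stand-ins for repl::OpTime and the IDL document; the abort-reason bookkeeping of the real kAborted case is omitted:

#include <cassert>
#include <optional>

struct OpTime {
    unsigned long long ts = 0;  // stand-in for the reserved oplog slot
};

enum class DonorState { kUninitialized, kAbortingIndexBuilds, kBlocking, kCommitted, kAborted };

struct DonorDoc {
    DonorState state = DonorState::kUninitialized;
    std::optional<OpTime> blockTimestamp;       // the real doc stores only the Timestamp part
    std::optional<OpTime> commitOrAbortOpTime;
};

// Mirrors the switch in _updateStateDocument: the reserved slot lands in
// blockTimestamp for the blocking transition and in commitOrAbortOpTime for
// the terminal transitions; earlier states record no timestamp.
void stampStateDoc(DonorDoc& doc, DonorState nextState, OpTime reservedSlot) {
    doc.state = nextState;
    switch (nextState) {
        case DonorState::kUninitialized:
        case DonorState::kAbortingIndexBuilds:
            break;
        case DonorState::kBlocking:
            doc.blockTimestamp = reservedSlot;
            break;
        case DonorState::kCommitted:
        case DonorState::kAborted:
            doc.commitOrAbortOpTime = reservedSlot;
            break;
    }
}

int main() {
    DonorDoc doc;
    stampStateDoc(doc, DonorState::kBlocking, OpTime{42});
    assert(doc.blockTimestamp && !doc.commitOrAbortOpTime);
    stampStateDoc(doc, DonorState::kCommitted, OpTime{43});
    assert(doc.commitOrAbortOpTime && doc.commitOrAbortOpTime->ts == 43);
    return 0;
}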
diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h
index 40ed22f96a5..bf1548527dc 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.h
+++ b/src/mongo/db/serverless/shard_split_donor_service.h
@@ -56,7 +56,7 @@ public:
}
NamespaceString getStateDocumentsNS() const override {
- return NamespaceString::kTenantSplitDonorsNamespace;
+ return NamespaceString::kShardSplitDonorsNamespace;
}
ThreadPool::Limits getThreadPoolLimits() const override;
@@ -156,9 +156,12 @@ public:
private:
// Tasks
- ExecutorFuture<void> _enterBlockingOrAbortedState(const ScopedTaskExecutorPtr& executor,
- const CancellationToken& primaryToken,
- const CancellationToken& abortToken);
+ ExecutorFuture<void> _enterAbortIndexBuildsOrAbortedState(const ScopedTaskExecutorPtr& executor,
+ const CancellationToken& primaryToken,
+ const CancellationToken& abortToken);
+
+ ExecutorFuture<void> _abortIndexBuildsAndEnterBlockingState(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken);
ExecutorFuture<void> _waitForRecipientToReachBlockTimestamp(
const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken);
@@ -195,7 +198,7 @@ private:
void _initiateTimeout(const ScopedTaskExecutorPtr& executor,
const CancellationToken& abortToken);
-
+ ConnectionString _setupAcceptanceMonitoring(WithLock lock, const CancellationToken& abortToken);
bool _hasInstalledSplitConfig(WithLock lock);
/*
@@ -205,10 +208,8 @@ private:
ExecutorFuture<void> _cleanRecipientStateDoc(const ScopedTaskExecutorPtr& executor,
const CancellationToken& token);
- void _abortIndexBuilds(const CancellationToken& abortToken);
-
private:
- const NamespaceString _stateDocumentsNS = NamespaceString::kTenantSplitDonorsNamespace;
+ const NamespaceString _stateDocumentsNS = NamespaceString::kShardSplitDonorsNamespace;
mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardSplitDonorService::_mutex");
const UUID _migrationId;
diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
index f061e686c13..5824029d097 100644
--- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp
@@ -90,11 +90,11 @@ StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx,
const UUID& shardSplitId) {
// Use kLastApplied so that we can read the state document as a secondary.
ReadSourceScope readSourceScope(opCtx, RecoveryUnit::ReadSource::kLastApplied);
- AutoGetCollectionForRead collection(opCtx, NamespaceString::kTenantSplitDonorsNamespace);
+ AutoGetCollectionForRead collection(opCtx, NamespaceString::kShardSplitDonorsNamespace);
if (!collection) {
return Status(ErrorCodes::NamespaceNotFound,
str::stream() << "Collection not found looking for state document: "
- << NamespaceString::kTenantSplitDonorsNamespace.ns());
+ << NamespaceString::kShardSplitDonorsNamespace.ns());
}
BSONObj result;
@@ -192,6 +192,9 @@ std::ostringstream& operator<<(std::ostringstream& builder,
case mongo::ShardSplitDonorStateEnum::kUninitialized:
builder << "kUninitialized";
break;
+ case mongo::ShardSplitDonorStateEnum::kAbortingIndexBuilds:
+ builder << "kAbortingIndexBuilds";
+ break;
case mongo::ShardSplitDonorStateEnum::kAborted:
builder << "kAborted";
break;
@@ -348,8 +351,7 @@ public:
// The database needs to be open before using shard split donor service.
{
auto opCtx = cc().makeOperationContext();
- AutoGetDb autoDb(
- opCtx.get(), NamespaceString::kTenantSplitDonorsNamespace.db(), MODE_X);
+ AutoGetDb autoDb(opCtx.get(), NamespaceString::kShardSplitDonorsNamespace.db(), MODE_X);
auto db = autoDb.ensureDbExists(opCtx.get());
ASSERT_TRUE(db);
}
@@ -484,18 +486,14 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation)
ASSERT_EQ(_uuid, serviceInstance->getId());
waitForMonitorAndProcessHello();
-
waitForReplSetStepUp(Status(ErrorCodes::OK, ""));
auto result = serviceInstance->decisionFuture().get();
-
ASSERT_TRUE(hasActiveSplitForTenants(opCtx.get(), _tenantIds));
-
ASSERT(!result.abortReason);
ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted);
serviceInstance->tryForget();
-
auto completionFuture = serviceInstance->completionFuture();
completionFuture.wait();
@@ -692,7 +690,7 @@ TEST_F(ShardSplitDonorServiceTest, ReconfigToRemoveSplitConfig) {
}
// Abort scenario : abortSplit called before startSplit.
-TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortState) {
+TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortedState) {
auto opCtx = makeOperationContext();
auto serviceContext = getServiceContext();
@@ -1067,4 +1065,49 @@ TEST_F(ShardSplitRecipientCleanupTest, ShardSplitRecipientCleanup) {
ErrorCodes::NoMatchingDocument);
}
+class ShardSplitAbortedStepUpTest : public ShardSplitPersistenceTest {
+public:
+ repl::ReplSetConfig initialDonorConfig() override {
+ BSONArrayBuilder members;
+ members.append(BSON("_id" << 1 << "host"
+ << "node1"));
+
+ return repl::ReplSetConfig::parse(BSON("_id"
+ << "donorSetName"
+ << "version" << 1 << "protocolVersion" << 1
+ << "members" << members.arr()));
+ }
+
+ ShardSplitDonorDocument initialStateDocument() override {
+
+ auto stateDocument = defaultStateDocument();
+
+ stateDocument.setState(mongo::ShardSplitDonorStateEnum::kAborted);
+ stateDocument.setBlockTimestamp(Timestamp(1, 1));
+ stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1, 1), 1));
+
+ Status status(ErrorCodes::InternalError, abortReason);
+ BSONObjBuilder bob;
+ status.serializeErrorToBSON(&bob);
+ stateDocument.setAbortReason(bob.obj());
+
+ return stateDocument;
+ }
+
+ std::string abortReason{"Testing simulated error"};
+};
+
+TEST_F(ShardSplitAbortedStepUpTest, ShardSplitAbortedStepUp) {
+ auto opCtx = makeOperationContext();
+ auto splitService = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+ ->lookupServiceByName(ShardSplitDonorService::kServiceName);
+ auto optionalDonor = ShardSplitDonorService::DonorStateMachine::lookup(
+ opCtx.get(), splitService, BSON("_id" << _uuid));
+
+ ASSERT(optionalDonor);
+ auto result = optionalDonor->get()->decisionFuture().get();
+
+ ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/serverless/shard_split_state_machine.idl b/src/mongo/db/serverless/shard_split_state_machine.idl
index 8aa65017c1b..ee3462f5a05 100644
--- a/src/mongo/db/serverless/shard_split_state_machine.idl
+++ b/src/mongo/db/serverless/shard_split_state_machine.idl
@@ -40,6 +40,7 @@ enums:
type: string
values:
kUninitialized: "uninitialized"
+ kAbortingIndexBuilds: "aborting index builds"
kBlocking: "blocking"
kCommitted: "committed"
kAborted: "aborted"
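The enum above inserts the new state between 'uninitialized' and 'blocking', and several hunks in this change rely on that declaration order for idempotent resume checks (for example the `_stateDoc.getState() > ShardSplitDonorStateEnum::kAbortingIndexBuilds` guard in _abortIndexBuildsAndEnterBlockingState). A minimal standalone sketch of that ordering-based check, using an illustrative enum rather than the IDL-generated type:

#include <cassert>

// Illustrative stand-in for ShardSplitDonorStateEnum; declaration order is
// what makes the '>' comparisons meaningful.
enum class DonorState {
    kUninitialized,
    kAbortingIndexBuilds,
    kBlocking,
    kCommitted,
    kAborted,
};

// True when a resumed instance has already moved past the "aborting index
// builds" step and can skip straight to later work.
bool alreadyPastAbortingIndexBuilds(DonorState state) {
    return state > DonorState::kAbortingIndexBuilds;
}

int main() {
    assert(!alreadyPastAbortingIndexBuilds(DonorState::kAbortingIndexBuilds));
    assert(alreadyPastAbortingIndexBuilds(DonorState::kBlocking));
    assert(alreadyPastAbortingIndexBuilds(DonorState::kCommitted));
    return 0;
}

Because the comparisons depend on position, the new value is added in state-machine order rather than appended at the end of the enum.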
diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp
index b9bb407220d..041c133b02b 100644
--- a/src/mongo/db/serverless/shard_split_utils.cpp
+++ b/src/mongo/db/serverless/shard_split_utils.cpp
@@ -149,7 +149,7 @@ repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config,
}
Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc) {
- const auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+ const auto nss = NamespaceString::kShardSplitDonorsNamespace;
AutoGetCollection collection(opCtx, nss, MODE_IX);
uassert(ErrorCodes::PrimarySteppedDown,
@@ -176,7 +176,7 @@ Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st
}
Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc) {
- const auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+ const auto nss = NamespaceString::kShardSplitDonorsNamespace;
AutoGetCollection collection(opCtx, nss, MODE_IX);
if (!collection) {
@@ -198,7 +198,7 @@ Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st
}
StatusWith<bool> deleteStateDoc(OperationContext* opCtx, const UUID& shardSplitId) {
- const auto nss = NamespaceString::kTenantSplitDonorsNamespace;
+ const auto nss = NamespaceString::kShardSplitDonorsNamespace;
AutoGetCollection collection(opCtx, nss, MODE_IX);
if (!collection) {
diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h
index b58f24b5a1a..2d9ab8402e7 100644
--- a/src/mongo/db/serverless/shard_split_utils.h
+++ b/src/mongo/db/serverless/shard_split_utils.h
@@ -64,7 +64,7 @@ repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config,
/**
* Inserts the shard split state document 'stateDoc' into
- * 'config.tenantSplitDonors' collection. Also, creates the collection if not present
+ * 'config.shardSplitDonors' collection. Also, creates the collection if not present
* before inserting the document.
*
* NOTE: A state doc might get inserted based on a decision made out of a stale read within a