diff options
author | Didier Nadeau <didier.nadeau@mongodb.com> | 2022-06-15 18:01:55 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-15 20:18:42 +0000 |
commit | bfd0810609bf8213c92d004313b87a68b3b66981 (patch) | |
tree | 5ea07d5e26fc9148eeb8831c4efc119de14f3617 | |
parent | fecef7a1f75e196a24715fabb0721124e71e170b (diff) | |
download | mongo-bfd0810609bf8213c92d004313b87a68b3b66981.tar.gz |
SERVER-66705 Move aborting index builds out of critical section
17 files changed, 429 insertions, 363 deletions
diff --git a/buildscripts/resmokelib/testing/hooks/shard_split.py b/buildscripts/resmokelib/testing/hooks/shard_split.py index d7cb0d05a72..10748f6e1e4 100644 --- a/buildscripts/resmokelib/testing/hooks/shard_split.py +++ b/buildscripts/resmokelib/testing/hooks/shard_split.py @@ -473,7 +473,7 @@ class _ShardSplitThread(threading.Thread): # pylint: disable=too-many-instance- while True: try: res = donor_node_client.config.command({ - "count": "tenantSplitDonors", + "count": "shardSplitDonors", "query": {"tenantIds": split_opts.tenant_ids} }) if res["n"] == 0: @@ -502,7 +502,7 @@ class _ShardSplitThread(threading.Thread): # pylint: disable=too-many-instance- while True: try: res = recipient_node_client.config.command({ - "count": "tenantSplitDonors", + "count": "shardSplitDonors", "query": {"tenantIds": split_opts.tenant_ids} }) if res["n"] == 0: diff --git a/jstests/libs/override_methods/inject_tenant_prefix.js b/jstests/libs/override_methods/inject_tenant_prefix.js index 92749a1b9de..2d46f138291 100644 --- a/jstests/libs/override_methods/inject_tenant_prefix.js +++ b/jstests/libs/override_methods/inject_tenant_prefix.js @@ -435,7 +435,7 @@ function convertServerConnectionStringToURI(input) { * that there is only one such operation. */ function getOperationStateDocument(conn) { - const collection = isShardSplitPassthrough() ? "tenantSplitDonors" : "tenantMigrationDonors"; + const collection = isShardSplitPassthrough() ? "shardSplitDonors" : "tenantMigrationDonors"; const filter = isShardSplitPassthrough() ? {tenantIds: TestData.tenantIds} : {tenantId: TestData.tenantId}; const findRes = assert.commandWorked( diff --git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js index bba2a651cc7..6017c17c756 100644 --- a/jstests/serverless/libs/basic_serverless_test.js +++ b/jstests/serverless/libs/basic_serverless_test.js @@ -347,7 +347,7 @@ class BasicServerlessTest { /* * Wait for state document garbage collection by polling for when the document has been removed - * from the tenantSplitDonors namespace, and all access blockers have been removed. + * from the 'shardSplitDonors' namespace, and all access blockers have been removed. * @param {migrationId} id that was used for the commitShardSplit command. * @param {tenantIds} tenant ids of the shard split. */ @@ -538,7 +538,7 @@ class BasicServerlessTest { } } -BasicServerlessTest.kConfigSplitDonorsNS = "config.tenantSplitDonors"; +BasicServerlessTest.kConfigSplitDonorsNS = "config.shardSplitDonors"; BasicServerlessTest.DonorState = { kUninitialized: "uninitialized", kBlocking: "blocking", diff --git a/jstests/serverless/shard_split_performance_test.js b/jstests/serverless/shard_split_performance_test.js index cb5d0dd20cb..2633d804dec 100644 --- a/jstests/serverless/shard_split_performance_test.js +++ b/jstests/serverless/shard_split_performance_test.js @@ -8,7 +8,6 @@ load("jstests/serverless/libs/basic_serverless_test.js"); load("jstests/replsets/rslib.js"); const kBlockStart = "Entering 'blocking' state."; -const kAbortingIndex = "Aborting index build for shard split."; const kReconfig = "Applying the split config"; const kWaitForRecipients = "Waiting for recipient to accept the split."; const kEndMsg = "Shard split decision reached"; @@ -82,18 +81,15 @@ function runOneSplit() { assertMigrationState(test.donor.getPrimary(), operation.migrationId, "committed"); const blockTS = extractTs(checkLog.getLogMessage(primary, kBlockStart)); - const abortingTS = extractTs(checkLog.getLogMessage(primary, kAbortingIndex)); const reconfigTS = extractTs(checkLog.getLogMessage(primary, kReconfig)); const waitForRecipientsTS = extractTs(checkLog.getLogMessage(primary, kWaitForRecipients)); const endTS = extractTs(checkLog.getLogMessage(primary, kEndMsg)); const blockDurationMs = endTS - blockTS; - const abortingIndexDurationMs = endTS - abortingTS; const waitForRecipientsDurationMs = endTS - waitForRecipientsTS; const reconfigDurationMs = endTS - reconfigTS; - const splitResult = - {blockDurationMs, abortingIndexDurationMs, reconfigDurationMs, waitForRecipientsDurationMs}; + const splitResult = {blockDurationMs, reconfigDurationMs, waitForRecipientsDurationMs}; jsTestLog(`Performance result of shard split: ${tojson(splitResult)}`); const maximumReconfigDuration = 500; diff --git a/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js b/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js index ac41ae32af2..bf5dd5be8f6 100644 --- a/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js +++ b/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js @@ -194,7 +194,7 @@ const kCollName = "testColl"; // Cannot mark the state doc as garbage collectable before the migration commits or aborts. assert.commandFailedWithCode(donorsColl.update({recipientSetName: operation.recipientSetName}, {$set: {expireAt: new Date()}}), - ErrorCodes.IllegalOperation); + ErrorCodes.BadValue); // Can drop the state doc collection but this will not cause all blocked reads and writes to // hang. diff --git a/jstests/serverless/shard_split_write_during_split_stepdown.js b/jstests/serverless/shard_split_write_during_split_stepdown.js index d147f09412b..1670b5b57b3 100644 --- a/jstests/serverless/shard_split_write_during_split_stepdown.js +++ b/jstests/serverless/shard_split_write_during_split_stepdown.js @@ -38,12 +38,9 @@ tenantIds.forEach(id => { [{_id: 0, x: 0}, {_id: 1, x: 1}, {_id: 2, x: 2}], {writeConcern: {w: "majority"}})); }); -const operation = test.createSplitOperation(tenantIds); - const blockingFP = configureFailPoint(donorPrimary.getDB("admin"), "pauseShardSplitAfterBlocking"); - +const operation = test.createSplitOperation(tenantIds); const splitThread = operation.commitAsync(); - blockingFP.wait(); const donorRst = createRstArgs(test.donor); diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 9d29b806e5d..dd17cf16877 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -95,8 +95,8 @@ const NamespaceString NamespaceString::kTenantMigrationRecipientsNamespace( const NamespaceString NamespaceString::kTenantMigrationOplogView( NamespaceString::kLocalDb, "system.tenantMigration.oplogView"); -const NamespaceString NamespaceString::kTenantSplitDonorsNamespace(NamespaceString::kConfigDb, - "tenantSplitDonors"); +const NamespaceString NamespaceString::kShardSplitDonorsNamespace(NamespaceString::kConfigDb, + "shardSplitDonors"); const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(NamespaceString::kConfigDb, "cache.collections"); diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h index 66872635967..c633ecab4fa 100644 --- a/src/mongo/db/namespace_string.h +++ b/src/mongo/db/namespace_string.h @@ -156,7 +156,7 @@ public: static const NamespaceString kTenantMigrationOplogView; // Namespace for storing the persisted state of tenant split donors. - static const NamespaceString kTenantSplitDonorsNamespace; + static const NamespaceString kShardSplitDonorsNamespace; // Namespace for replica set configuration settings. static const NamespaceString kSystemReplSetNamespace; diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp index 53e7b24f135..fc693f64c20 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp @@ -437,7 +437,7 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) { // Recover TenantMigrationDonorAccessBlockers for ShardSplit. PersistentTaskStore<ShardSplitDonorDocument> shardSplitDonorStore( - NamespaceString::kTenantSplitDonorsNamespace); + NamespaceString::kShardSplitDonorsNamespace); shardSplitDonorStore.forEach(opCtx, {}, [&](const ShardSplitDonorDocument& doc) { // Skip creating a TenantMigrationDonorAccessBlocker for terminal shard split that have been @@ -462,6 +462,8 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) { .add(tenantId.toString(), mtab); switch (doc.getState()) { + case ShardSplitDonorStateEnum::kAbortingIndexBuilds: + break; case ShardSplitDonorStateEnum::kBlocking: invariant(doc.getBlockTimestamp()); mtab->startBlockingWrites(); diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index ce1d0e55ddf..9de2da2e33d 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -42,6 +42,10 @@ bool isSecondary(const OperationContext* opCtx) { return !opCtx->writesAreReplicated(); } +bool isPrimary(const OperationContext* opCtx) { + return opCtx->writesAreReplicated(); +} + const auto tenantIdsToDeleteDecoration = OperationContext::declareDecoration<boost::optional<std::vector<std::string>>>(); @@ -50,6 +54,13 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { ShardSplitDonorDocument::parse(IDLParserErrorContext("donorStateDoc"), doc); const std::string errmsg = "Invalid donor state doc, {}: {}"; + if (donorStateDoc.getExpireAt()) { + uassert(ErrorCodes::BadValue, + "Contains 'expireAt' but the split has not committed or aborted", + donorStateDoc.getState() == ShardSplitDonorStateEnum::kCommitted || + donorStateDoc.getState() == ShardSplitDonorStateEnum::kAborted); + } + switch (donorStateDoc.getState()) { case ShardSplitDonorStateEnum::kUninitialized: uassert(ErrorCodes::BadValue, @@ -68,6 +79,12 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { doc.toString()), !donorStateDoc.getAbortReason()); break; + case ShardSplitDonorStateEnum::kAbortingIndexBuilds: + uassert(ErrorCodes::BadValue, + errmsg, + !donorStateDoc.getBlockTimestamp() && !donorStateDoc.getCommitOrAbortOpTime() && + !donorStateDoc.getAbortReason()); + break; case ShardSplitDonorStateEnum::kBlocking: uassert(ErrorCodes::BadValue, fmt::format(errmsg, @@ -125,54 +142,61 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { * Initializes the TenantMigrationDonorAccessBlocker for the tenant migration denoted by the given * state doc. */ -void onBlockerInitialization(OperationContext* opCtx, - const ShardSplitDonorDocument& donorStateDoc) { - invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kBlocking); - invariant(donorStateDoc.getBlockTimestamp()); - - auto optionalTenants = donorStateDoc.getTenantIds(); - invariant(optionalTenants); - - const auto& tenantIds = optionalTenants.get(); +void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, + const ShardSplitDonorDocument& donorStateDoc) { + invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kAbortingIndexBuilds); + invariant(donorStateDoc.getTenantIds()); + invariant(donorStateDoc.getRecipientConnectionString()); + + auto tenantIds = *donorStateDoc.getTenantIds(); + auto recipientConnectionString = *donorStateDoc.getRecipientConnectionString(); + for (const auto& tenantId : tenantIds) { + auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>( + opCtx->getServiceContext(), + donorStateDoc.getId(), + tenantId.toString(), + MigrationProtocolEnum::kMultitenantMigrations, + recipientConnectionString.toString()); + + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab); + } - // The primary create and sets the tenant access blocker to blocking within the - // ShardSplitDonorService. - if (isSecondary(opCtx)) { - auto recipientConnectionString = [stateDoc = donorStateDoc]() { - if (stateDoc.getRecipientConnectionString()) { - return *stateDoc.getRecipientConnectionString(); + if (isPrimary(opCtx)) { + // onRollback is not registered on secondaries since secondaries should not fail to + // apply the write. + opCtx->recoveryUnit()->onRollback([opCtx, tenantIds] { + for (const auto& tenantId : tenantIds) { + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor); } - - auto recipientTagName = stateDoc.getRecipientTagName(); - invariant(recipientTagName); - auto recipientSetName = stateDoc.getRecipientSetName(); - invariant(recipientSetName); - auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); - return serverless::makeRecipientConnectionString( - config, *recipientTagName, *recipientSetName); - }(); - - for (const auto& tenantId : tenantIds) { - auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>( - opCtx->getServiceContext(), - donorStateDoc.getId(), - tenantId.toString(), - MigrationProtocolEnum::kMultitenantMigrations, - recipientConnectionString.toString()); - - TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) - .add(tenantId, mtab); - - // No rollback handler is necessary as the write should not fail on secondaries. - mtab->startBlockingWrites(); - } + }); } +} - for (const auto& tenantId : tenantIds) { +/** + * Transitions the TenantMigrationDonorAccessBlocker to the blocking state. + */ +void onTransitionToBlocking(OperationContext* opCtx, const ShardSplitDonorDocument& donorStateDoc) { + invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kBlocking); + invariant(donorStateDoc.getBlockTimestamp()); + invariant(donorStateDoc.getTenantIds()); + + auto tenantIds = *donorStateDoc.getTenantIds(); + for (auto tenantId : tenantIds) { auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker( opCtx->getServiceContext(), tenantId); invariant(mtab); + if (isSecondary(opCtx)) { + // A primary calls startBlockingWrites on the TenantMigrationDonorAccessBlocker before + // reserving the OpTime for the "start blocking" write, so only secondaries call + // startBlockingWrites on the TenantMigrationDonorAccessBlocker in the op observer. + mtab->startBlockingWrites(); + } + + // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since + // startBlockingReadsAfter just needs to be called before the "start blocking" write's oplog + // hole is filled. mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().get()); } } @@ -206,9 +230,9 @@ void onTransitionToAborted(OperationContext* opCtx, const ShardSplitDonorDocumen auto tenants = donorStateDoc.getTenantIds(); if (!tenants) { - // The only case where there can be no tenants is when the instance is created by the abort - // command. In that case, no tenant migration blockers are created and the state will go - // straight to abort. + // The only case where there can be no tenants is when the instance is created by the + // abort command. In that case, no tenant migration blockers are created and the state + // will go straight to abort. invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized); return; } @@ -242,34 +266,35 @@ public: _opCtx->getServiceContext(), tenantId); if (!mtab) { - // The state doc and TenantMigrationDonorAccessBlocker for this migration - // were removed immediately after expireAt was set. This is unlikely to - // occur in production where the garbage collection delay should be - // sufficiently large. + // The state doc and TenantMigrationDonorAccessBlocker for this + // migration were removed immediately after expireAt was set. This is + // unlikely to occur in production where the garbage collection delay + // should be sufficiently large. continue; } - if (!_opCtx->writesAreReplicated()) { - // Setting expireAt implies that the TenantMigrationDonorAccessBlocker for - // this migration will be removed shortly after this. However, a lagged - // secondary might not manage to advance its majority commit point past the - // migration commit or abort opTime and consequently transition out of the - // blocking state before the TenantMigrationDonorAccessBlocker is removed. - // When this occurs, blocked reads or writes will be left waiting for the - // migration decision indefinitely. To avoid that, notify the - // TenantMigrationDonorAccessBlocker here that the commit or abort opTime - // has been majority committed (guaranteed to be true since by design the - // donor never marks its state doc as garbage collectable before the - // migration decision is majority committed). + if (isSecondary(_opCtx)) { + // Setting expireAt implies that the TenantMigrationDonorAccessBlocker + // for this migration will be removed shortly after this. However, a + // lagged secondary might not manage to advance its majority commit + // point past the migration commit or abort opTime and consequently + // transition out of the blocking state before the + // TenantMigrationDonorAccessBlocker is removed. When this occurs, + // blocked reads or writes will be left waiting for the migration + // decision indefinitely. To avoid that, notify the + // TenantMigrationDonorAccessBlocker here that the commit or abort + // opTime has been majority committed (guaranteed to be true since by + // design the donor never marks its state doc as garbage collectable + // before the migration decision is majority committed). mtab->onMajorityCommitPointUpdate( _donorStateDoc.getCommitOrAbortOpTime().get()); } if (_donorStateDoc.getState() == ShardSplitDonorStateEnum::kAborted) { invariant(mtab->inStateAborted()); - // The migration durably aborted and is now marked as garbage collectable, - // remove its TenantMigrationDonorAccessBlocker right away to allow - // back-to-back migration retries. + // The migration durably aborted and is now marked as garbage + // collectable, remove its TenantMigrationDonorAccessBlocker right away + // to allow back-to-back migration retries. TenantMigrationAccessBlockerRegistry::get(_opCtx->getServiceContext()) .remove(tenantId, TenantMigrationAccessBlocker::BlockerType::kDonor); } @@ -305,7 +330,7 @@ void ShardSplitDonorOpObserver::onInserts(OperationContext* opCtx, std::vector<InsertStatement>::const_iterator first, std::vector<InsertStatement>::const_iterator last, bool fromMigrate) { - if (nss != NamespaceString::kTenantSplitDonorsNamespace || + if (nss != NamespaceString::kShardSplitDonorsNamespace || tenant_migration_access_blocker::inRecoveryMode(opCtx)) { return; } @@ -313,45 +338,41 @@ void ShardSplitDonorOpObserver::onInserts(OperationContext* opCtx, for (auto it = first; it != last; it++) { auto donorStateDoc = parseAndValidateDonorDocument(it->doc); switch (donorStateDoc.getState()) { - case ShardSplitDonorStateEnum::kBlocking: - onBlockerInitialization(opCtx, donorStateDoc); + case ShardSplitDonorStateEnum::kAbortingIndexBuilds: + onTransitionToAbortingIndexBuilds(opCtx, donorStateDoc); break; case ShardSplitDonorStateEnum::kAborted: // If the operation starts aborted, do not do anything. break; - case ShardSplitDonorStateEnum::kUninitialized: - case ShardSplitDonorStateEnum::kCommitted: - uasserted(ErrorCodes::IllegalOperation, - "cannot insert a donor's state doc with 'state' other than 'kAborted' or " - "'kBlocking'"); - break; default: - MONGO_UNREACHABLE; + uasserted(ErrorCodes::IllegalOperation, + "Cannot insert donor's state document with state other than 'aborted' or " + "'aborting index builds'."); } } } void ShardSplitDonorOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { - if (args.nss != NamespaceString::kTenantSplitDonorsNamespace || + if (args.nss != NamespaceString::kShardSplitDonorsNamespace || tenant_migration_access_blocker::inRecoveryMode(opCtx)) { return; } auto donorStateDoc = parseAndValidateDonorDocument(args.updateArgs->updatedDoc); switch (donorStateDoc.getState()) { + case ShardSplitDonorStateEnum::kBlocking: + onTransitionToBlocking(opCtx, donorStateDoc); + break; case ShardSplitDonorStateEnum::kCommitted: case ShardSplitDonorStateEnum::kAborted: opCtx->recoveryUnit()->registerChange( std::make_unique<TenantMigrationDonorCommitOrAbortHandler>(opCtx, donorStateDoc)); break; - case ShardSplitDonorStateEnum::kBlocking: - uasserted(ErrorCodes::IllegalOperation, - "The state document should be inserted as blocking and never transition to " - "blocking"); - break; default: - MONGO_UNREACHABLE; + uasserted(ErrorCodes::IllegalOperation, + "Cannot update donor's state document with state other than 'aborted', " + "'committed', or 'aborted'"); } } @@ -359,13 +380,12 @@ void ShardSplitDonorOpObserver::aboutToDelete(OperationContext* opCtx, NamespaceString const& nss, const UUID& uuid, BSONObj const& doc) { - if (nss != NamespaceString::kTenantSplitDonorsNamespace || + if (nss != NamespaceString::kShardSplitDonorsNamespace || tenant_migration_access_blocker::inRecoveryMode(opCtx)) { return; } auto donorStateDoc = parseAndValidateDonorDocument(doc); - uassert(ErrorCodes::IllegalOperation, str::stream() << "cannot delete a donor's state document " << doc << " since it has not been marked as garbage collectable and is not a" @@ -390,8 +410,7 @@ void ShardSplitDonorOpObserver::onDelete(OperationContext* opCtx, const UUID& uuid, StmtId stmtId, const OplogDeleteEntryArgs& args) { - if (nss != NamespaceString::kTenantSplitDonorsNamespace || - !tenantIdsToDeleteDecoration(opCtx) || + if (nss != NamespaceString::kShardSplitDonorsNamespace || !tenantIdsToDeleteDecoration(opCtx) || tenant_migration_access_blocker::inRecoveryMode(opCtx)) { return; } @@ -414,7 +433,7 @@ repl::OpTime ShardSplitDonorOpObserver::onDropCollection(OperationContext* opCtx const UUID& uuid, std::uint64_t numRecords, const CollectionDropType dropType) { - if (collectionName == NamespaceString::kTenantSplitDonorsNamespace) { + if (collectionName == NamespaceString::kShardSplitDonorsNamespace) { opCtx->recoveryUnit()->onCommit([opCtx](boost::optional<Timestamp>) { TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) .removeAll(TenantMigrationAccessBlocker::BlockerType::kDonor); diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp index c52868126e6..6f2e376de47 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/serverless/shard_split_donor_op_observer.h" #include "mongo/db/serverless/shard_split_state_machine_gen.h" #include "mongo/db/serverless/shard_split_test_utils.h" +#include "mongo/db/serverless/shard_split_utils.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/dbtests/mock/mock_replica_set.h" @@ -129,7 +130,8 @@ protected: std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> createBlockersAndStartBlockingWrites(const std::vector<std::string>& tenants, OperationContext* opCtx, - const std::string& connectionStr) { + const std::string& connectionStr, + bool isSecondary = false) { auto uuid = UUID::gen(); std::vector<std::shared_ptr<TenantMigrationDonorAccessBlocker>> blockers; for (const auto& tenant : tenants) { @@ -141,7 +143,10 @@ protected: _connectionStr); blockers.push_back(mtab); - mtab->startBlockingWrites(); + if (!isSecondary) { + mtab->startBlockingWrites(); + } + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenant, mtab); } @@ -160,7 +165,7 @@ protected: MockReplicaSet("donorSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); MockReplicaSet _recipientReplSet = MockReplicaSet("recipientSet", 3, true /* hasPrimary */, true /* dollarPrefixHosts */); - const NamespaceString _nss = NamespaceString::kTenantSplitDonorsNamespace; + const NamespaceString _nss = NamespaceString::kShardSplitDonorsNamespace; std::vector<std::string> _tenantIds = {"tenant1", "tenantAB"}; std::string _connectionStr = _replSet.getConnectionString(); UUID _uuid = UUID::gen(); @@ -253,7 +258,30 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertValidAbortedDocument) { } } -TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentPrimary) { +TEST_F(ShardSplitDonorOpObserverTest, InsertAbortingIndexDocumentPrimary) { + test::shard_split::reconfigToAddRecipientNodes( + getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts()); + + auto stateDocument = defaultStateDocument(); + stateDocument.setState(ShardSplitDonorStateEnum::kAbortingIndexBuilds); + stateDocument.setRecipientConnectionString(mongo::serverless::makeRecipientConnectionString( + repl::ReplicationCoordinator::get(_opCtx.get())->getConfig(), + _recipientTagName, + _recipientSetName)); + + auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) { + ASSERT_TRUE(mtab); + // The OpObserver does not set the mtab to blocking for primaries. + ASSERT_OK(mtab->checkIfCanWrite(Timestamp(1, 1))); + ASSERT_OK(mtab->checkIfCanWrite(Timestamp(1, 3))); + ASSERT_OK(mtab->checkIfLinearizableReadWasAllowed(opCtx)); + ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); + }; + + runInsertTestCase(stateDocument, _tenantIds, mtabVerifier); +} + +TEST_F(ShardSplitDonorOpObserverTest, UpdateBlockingDocumentPrimary) { test::shard_split::reconfigToAddRecipientNodes( getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts()); @@ -274,15 +302,16 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentPrimary) { ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); }; - runInsertTestCase(stateDocument, _tenantIds, mtabVerifier); + runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier); } -TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentSecondary) { +TEST_F(ShardSplitDonorOpObserverTest, UpdateBlockingDocumentSecondary) { test::shard_split::reconfigToAddRecipientNodes( getServiceContext(), _recipientTagName, _replSet.getHosts(), _recipientReplSet.getHosts()); // This indicates the instance is secondary for the OpObserver. repl::UnreplicatedWritesBlock setSecondary(_opCtx.get()); + createBlockersAndStartBlockingWrites(_tenantIds, _opCtx.get(), _connectionStr, true); auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); @@ -299,18 +328,15 @@ TEST_F(ShardSplitDonorOpObserverTest, InsertBlockingDocumentSecondary) { ASSERT_EQ(mtab->checkIfCanBuildIndex().code(), ErrorCodes::TenantMigrationConflict); }; - runInsertTestCase(stateDocument, _tenantIds, mtabVerifier); + runUpdateTestCase(stateDocument, _tenantIds, mtabVerifier); } - -TEST_F(ShardSplitDonorOpObserverTest, TransitionToBlockingFail) { +TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbortingIndexBuildsFail) { // This indicates the instance is secondary for the OpObserver. repl::UnreplicatedWritesBlock setSecondary(_opCtx.get()); auto stateDocument = defaultStateDocument(); - stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); - stateDocument.setBlockTimestamp(Timestamp(1, 1)); - + stateDocument.setState(ShardSplitDonorStateEnum::kAbortingIndexBuilds); CollectionUpdateArgs updateArgs; updateArgs.stmtIds = {}; diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index e2a039fbab0..deb78f1779b 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -74,51 +74,10 @@ MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterReceivingAbortCmd); const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); -bool shouldStopInsertingDonorStateDoc(Status status) { - return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress; -} - -void setStateDocTimestamps(WithLock, - ShardSplitDonorStateEnum nextState, - repl::OpTime time, - ShardSplitDonorDocument& stateDoc) { - switch (nextState) { - case ShardSplitDonorStateEnum::kUninitialized: - break; - case ShardSplitDonorStateEnum::kBlocking: - stateDoc.setBlockTimestamp(time.getTimestamp()); - break; - case ShardSplitDonorStateEnum::kAborted: - stateDoc.setCommitOrAbortOpTime(time); - break; - case ShardSplitDonorStateEnum::kCommitted: - stateDoc.setCommitOrAbortOpTime(time); - break; - default: - MONGO_UNREACHABLE; - } -} - bool isAbortedDocumentPersistent(WithLock, ShardSplitDonorDocument& stateDoc) { return !!stateDoc.getAbortReason(); } -void setMtabToBlockingForTenants(ServiceContext* context, - OperationContext* opCtx, - const std::vector<StringData>& tenantIds) { - // Start blocking writes before getting an oplog slot to guarantee no - // writes to the tenant's data can commit with a timestamp after the - // block timestamp. - for (const auto& tenantId : tenantIds) { - auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(context, - tenantId); - invariant(mtab); - mtab->startBlockingWrites(); - - opCtx->recoveryUnit()->onRollback([mtab] { mtab->rollBackStartBlocking(); }); - } -} - void checkForTokenInterrupt(const CancellationToken& token) { uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled()); } @@ -423,19 +382,16 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( // Note we do not use the abort split token here because the abortShardSplit // command waits for a decision to be persisted which will not happen if // inserting the initial state document fails. - if (MONGO_unlikely(pauseShardSplitBeforeBlockingState.shouldFail())) { - pauseShardSplitBeforeBlockingState.pauseWhileSet(); - } - return _enterBlockingOrAbortedState(executor, primaryToken, abortToken); + return _enterAbortIndexBuildsOrAbortedState(executor, primaryToken, abortToken); + }) + .then([this, executor, abortToken] { + // Start tracking the abortToken for killing operation contexts + _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); + return _abortIndexBuildsAndEnterBlockingState(executor, abortToken); }) .then([this, executor, abortToken, criticalSectionTimer] { criticalSectionTimer->reset(); - checkForTokenInterrupt(abortToken); - _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); - _abortIndexBuilds(abortToken); - }) - .then([this, executor, abortToken] { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); pauseShardSplitAfterBlocking.pauseWhileSet(opCtx.get()); @@ -583,6 +539,143 @@ bool ShardSplitDonorService::DonorStateMachine::_hasInstalledSplitConfig(WithLoc config.getRecipientConfig()->getReplSetName() == *_stateDoc.getRecipientSetName(); } +ConnectionString ShardSplitDonorService::DonorStateMachine::_setupAcceptanceMonitoring( + WithLock lock, const CancellationToken& abortToken) { + auto recipientConnectionString = [stateDoc = _stateDoc]() { + if (stateDoc.getRecipientConnectionString()) { + return *stateDoc.getRecipientConnectionString(); + } + + auto recipientTagName = stateDoc.getRecipientTagName(); + invariant(recipientTagName); + auto recipientSetName = stateDoc.getRecipientSetName(); + invariant(recipientSetName); + auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); + return serverless::makeRecipientConnectionString( + config, *recipientTagName, *recipientSetName); + }(); + + // Always start the replica set monitor if we haven't reached a decision yet + _splitAcceptancePromise.setWith([&]() -> Future<void> { + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking || + MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { + return SemiFuture<void>::makeReady().unsafeToInlineFuture(); + } + + // Optionally select a task executor for unit testing + auto executor = _splitAcceptanceTaskExecutorForTest + ? *_splitAcceptanceTaskExecutorForTest + : _shardSplitService->getInstanceCleanupExecutor(); + + LOGV2(6142508, + "Monitoring recipient nodes for split acceptance.", + "id"_attr = _migrationId, + "recipientConnectionString"_attr = recipientConnectionString); + + return detail::makeRecipientAcceptSplitFuture( + executor, abortToken, recipientConnectionString, _migrationId) + .unsafeToInlineFuture(); + }); + + return recipientConnectionString; +} + +ExecutorFuture<void> +ShardSplitDonorService::DonorStateMachine::_enterAbortIndexBuildsOrAbortedState( + const ScopedTaskExecutorPtr& executor, + const CancellationToken& primaryToken, + const CancellationToken& abortToken) { + ShardSplitDonorStateEnum nextState; + { + stdx::lock_guard<Latch> lg(_mutex); + if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) { + if (isAbortedDocumentPersistent(lg, _stateDoc)) { + // Node has step up and created an instance using a document in abort state. No + // need to write the document as it already exists. + return ExecutorFuture(**executor); + } + + _abortReason = + Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'."); + BSONObjBuilder bob; + _abortReason->serializeErrorToBSON(&bob); + _stateDoc.setAbortReason(bob.obj()); + _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + + Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}); + nextState = ShardSplitDonorStateEnum::kAborted; + + LOGV2(6670500, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId()); + } else { + // Always set up acceptance monitoring. + auto recipientConnectionString = _setupAcceptanceMonitoring(lg, abortToken); + + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) { + // Node has stepped up and resumed a shard split. No need to write the document as + // it already exists. + return ExecutorFuture(**executor); + } + + _stateDoc.setRecipientConnectionString(recipientConnectionString); + nextState = ShardSplitDonorStateEnum::kAbortingIndexBuilds; + + LOGV2( + 6670501, "Entering 'aborting index builds' state.", "id"_attr = _stateDoc.getId()); + } + } + + return _updateStateDocument(executor, primaryToken, nextState) + .then([this, executor, primaryToken](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken); + }) + .then([this, executor, nextState]() { + uassert(ErrorCodes::TenantMigrationAborted, + "Shard split operation aborted.", + nextState != ShardSplitDonorStateEnum::kAborted); + }); +} + +ExecutorFuture<void> +ShardSplitDonorService::DonorStateMachine::_abortIndexBuildsAndEnterBlockingState( + const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) { + checkForTokenInterrupt(abortToken); + + boost::optional<std::vector<StringData>> tenantIds; + { + stdx::lock_guard<Latch> lg(_mutex); + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kAbortingIndexBuilds) { + return ExecutorFuture(**executor); + } + + tenantIds = _stateDoc.getTenantIds(); + invariant(tenantIds); + } + + LOGV2(6436100, "Aborting index builds for shard split.", "id"_attr = _migrationId); + + // Abort any in-progress index builds. No new index builds can start while we are doing this + // because the mtab prevents it. + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto* indexBuildsCoordinator = IndexBuildsCoordinator::get(opCtx.get()); + for (const auto& tenantId : *tenantIds) { + indexBuildsCoordinator->abortTenantIndexBuilds( + opCtx.get(), MigrationProtocolEnum::kMultitenantMigrations, tenantId, "shard split"); + } + + if (MONGO_unlikely(pauseShardSplitBeforeBlockingState.shouldFail())) { + pauseShardSplitBeforeBlockingState.pauseWhileSet(); + } + + { + stdx::lock_guard<Latch> lg(_mutex); + LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId()); + } + + return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kBlocking) + .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken); + }); +} + ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestamp( const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) { @@ -795,170 +888,80 @@ ShardSplitDonorService::DonorStateMachine::_triggerElectionAndEnterCommitedState }); } -ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState( - const ScopedTaskExecutorPtr& executor, - const CancellationToken& primaryToken, - const CancellationToken& abortToken) { - ShardSplitDonorStateEnum nextState; - { - stdx::lock_guard<Latch> lg(_mutex); - if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) { - if (isAbortedDocumentPersistent(lg, _stateDoc)) { - // Node has step up and created an instance using a document in abort state. No - // need to write the document as it already exists. - return ExecutorFuture(**executor); - } - - _abortReason = - Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'."); - BSONObjBuilder bob; - _abortReason->serializeErrorToBSON(&bob); - _stateDoc.setAbortReason(bob.obj()); - _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + - Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}); - nextState = ShardSplitDonorStateEnum::kAborted; - - LOGV2(8423355, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId()); - } else { - auto recipientConnectionString = [stateDoc = _stateDoc]() { - if (stateDoc.getRecipientConnectionString()) { - return *stateDoc.getRecipientConnectionString(); - } - - auto recipientTagName = stateDoc.getRecipientTagName(); - invariant(recipientTagName); - auto recipientSetName = stateDoc.getRecipientSetName(); - invariant(recipientSetName); - auto config = - repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); - return serverless::makeRecipientConnectionString( - config, *recipientTagName, *recipientSetName); - }(); - - // Always start the replica set monitor if we haven't reached a decision yet - _splitAcceptancePromise.setWith([&]() -> Future<void> { - if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking || - MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { - return SemiFuture<void>::makeReady().unsafeToInlineFuture(); - } - - // Optionally select a task executor for unit testing - auto executor = _splitAcceptanceTaskExecutorForTest - ? *_splitAcceptanceTaskExecutorForTest - : _shardSplitService->getInstanceCleanupExecutor(); - - LOGV2(6142508, - "Monitoring recipient nodes for split acceptance.", - "id"_attr = _migrationId, - "recipientConnectionString"_attr = recipientConnectionString); - - return detail::makeRecipientAcceptSplitFuture( - executor, abortToken, recipientConnectionString, _migrationId) - .unsafeToInlineFuture(); - }); - - if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) { - // Node has step up and resumed a shard split. No need to write the document as - // it already exists. - return ExecutorFuture(**executor); - } - - // Otherwise, record the recipient connection string - _stateDoc.setRecipientConnectionString(recipientConnectionString); - _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking); - nextState = ShardSplitDonorStateEnum::kBlocking; - - LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId()); - } - } - - return AsyncTry([this, nextState, uuid = _migrationId]() { - auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto opCtx = opCtxHolder.get(); - - AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); - - writeConflictRetry( - opCtx, "ShardSplitDonorInsertStateDoc", _stateDocumentsNS.ns(), [&] { - const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid); - const auto getUpdatedStateDocBson = [&]() { - stdx::lock_guard<Latch> lg(_mutex); - return _stateDoc.toBSON(); - }; - - WriteUnitOfWork wuow(opCtx); - if (nextState == ShardSplitDonorStateEnum::kBlocking) { - stdx::lock_guard<Latch> lg(_mutex); - - insertTenantAccessBlocker(lg, opCtx, _stateDoc); - - auto tenantIds = _stateDoc.getTenantIds(); - invariant(tenantIds); - setMtabToBlockingForTenants(_serviceContext, opCtx, tenantIds.get()); - } - - // Reserve an opTime for the write. - auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; - setStateDocTimestamps( - stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc); - - auto updateResult = Helpers::upsert(opCtx, - _stateDocumentsNS.ns(), - filter, - getUpdatedStateDocBson(), - /*fromMigrate=*/false); - - - // We only want to insert, not modify, document - invariant(updateResult.numMatched == 0); - wuow.commit(); - }); - - return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - }) - .until([](StatusWith<repl::OpTime> swOpTime) { - return shouldStopInsertingDonorStateDoc(swOpTime.getStatus()); - }) - .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, primaryToken) - .then([this, executor, primaryToken](repl::OpTime opTime) { - return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken); - }) - .then([this, executor, nextState]() { - uassert(ErrorCodes::TenantMigrationAborted, - "Shard split operation aborted.", - nextState != ShardSplitDonorStateEnum::kAborted); - }); -} - ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateStateDocument( const ScopedTaskExecutorPtr& executor, const CancellationToken& token, ShardSplitDonorStateEnum nextState) { - auto tenantIds = [&]() { + auto [tenantIds, isInsert] = [&]() { stdx::lock_guard<Latch> lg(_mutex); - _stateDoc.setState(nextState); - - return _stateDoc.getTenantIds(); + auto isInsert = _stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized || + _stateDoc.getState() == ShardSplitDonorStateEnum::kAborted; + return std::make_pair(_stateDoc.getTenantIds(), isInsert); }(); - return AsyncTry([this, tenantIds = std::move(tenantIds), uuid = _migrationId, nextState] { + return AsyncTry([this, + tenantIds = std::move(tenantIds), + isInsert = isInsert, + uuid = _migrationId, + nextState] { auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto opCtx = opCtxHolder.get(); AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << _stateDocumentsNS.ns() << " does not exist", - collection); + + if (!isInsert) { + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << _stateDocumentsNS.ns() << " does not exist", + collection); + } writeConflictRetry( - opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&] { + opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&]() { WriteUnitOfWork wuow(opCtx); + if (nextState == ShardSplitDonorStateEnum::kBlocking) { + // Start blocking writes before getting an oplog slot to guarantee no + // writes to the tenant's data can commit with a timestamp after the + // block timestamp. + for (const auto& tenantId : *tenantIds) { + auto mtab = tenant_migration_access_blocker:: + getTenantMigrationDonorAccessBlocker(_serviceContext, tenantId); + invariant(mtab); + mtab->startBlockingWrites(); + + opCtx->recoveryUnit()->onRollback( + [mtab] { mtab->rollBackStartBlocking(); }); + } + } + // Reserve an opTime for the write. auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; - setStateDocTimestamps( - stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc); + { + stdx::lock_guard<Latch> lg(_mutex); + _stateDoc.setState(nextState); + switch (nextState) { + case ShardSplitDonorStateEnum::kUninitialized: + case ShardSplitDonorStateEnum::kAbortingIndexBuilds: + break; + case ShardSplitDonorStateEnum::kBlocking: + _stateDoc.setBlockTimestamp(oplogSlot.getTimestamp()); + break; + case ShardSplitDonorStateEnum::kCommitted: + _stateDoc.setCommitOrAbortOpTime(oplogSlot); + break; + case ShardSplitDonorStateEnum::kAborted: { + _stateDoc.setCommitOrAbortOpTime(oplogSlot); + + invariant(_abortReason); + BSONObjBuilder bob; + _abortReason.get().serializeErrorToBSON(&bob); + _stateDoc.setAbortReason(bob.obj()); + break; + } + default: + MONGO_UNREACHABLE; + } + } const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid); const auto updatedStateDocBson = [&]() { @@ -971,15 +974,19 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS updatedStateDocBson, /*fromMigrate=*/false); - invariant(updateResult.numDocsModified == 1); + if (isInsert) { + invariant(!updateResult.existing); + invariant(!updateResult.upsertedId.isEmpty()); + } else { + invariant(updateResult.numDocsModified == 1); + } + wuow.commit(); }); return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([](StatusWith<repl::OpTime> swOpTime) { - return shouldStopInsertingDonorStateDoc(swOpTime.getStatus()); - }) + .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, token); } @@ -1177,30 +1184,4 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_cleanRecipientS .on(**executor, primaryToken) .ignoreValue(); } - -void ShardSplitDonorService::DonorStateMachine::_abortIndexBuilds( - const CancellationToken& abortToken) { - checkForTokenInterrupt(abortToken); - - boost::optional<std::vector<StringData>> tenantIds; - { - stdx::lock_guard<Latch> lg(_mutex); - if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) { - return; - } - tenantIds = _stateDoc.getTenantIds(); - invariant(tenantIds); - } - - LOGV2(6436100, "Aborting index build for shard split.", "id"_attr = _migrationId); - - // Before applying the split config, abort any in-progress index builds. No new index builds - // can start while we are doing this because the mtab prevents it. - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto* indexBuildsCoordinator = IndexBuildsCoordinator::get(opCtx.get()); - for (const auto& tenantId : *tenantIds) { - indexBuildsCoordinator->abortTenantIndexBuilds( - opCtx.get(), MigrationProtocolEnum::kMultitenantMigrations, tenantId, "shard split"); - } -} } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h index 40ed22f96a5..bf1548527dc 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.h +++ b/src/mongo/db/serverless/shard_split_donor_service.h @@ -56,7 +56,7 @@ public: } NamespaceString getStateDocumentsNS() const override { - return NamespaceString::kTenantSplitDonorsNamespace; + return NamespaceString::kShardSplitDonorsNamespace; } ThreadPool::Limits getThreadPoolLimits() const override; @@ -156,9 +156,12 @@ public: private: // Tasks - ExecutorFuture<void> _enterBlockingOrAbortedState(const ScopedTaskExecutorPtr& executor, - const CancellationToken& primaryToken, - const CancellationToken& abortToken); + ExecutorFuture<void> _enterAbortIndexBuildsOrAbortedState(const ScopedTaskExecutorPtr& executor, + const CancellationToken& primaryToken, + const CancellationToken& abortToken); + + ExecutorFuture<void> _abortIndexBuildsAndEnterBlockingState( + const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken); ExecutorFuture<void> _waitForRecipientToReachBlockTimestamp( const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken); @@ -195,7 +198,7 @@ private: void _initiateTimeout(const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken); - + ConnectionString _setupAcceptanceMonitoring(WithLock lock, const CancellationToken& abortToken); bool _hasInstalledSplitConfig(WithLock lock); /* @@ -205,10 +208,8 @@ private: ExecutorFuture<void> _cleanRecipientStateDoc(const ScopedTaskExecutorPtr& executor, const CancellationToken& token); - void _abortIndexBuilds(const CancellationToken& abortToken); - private: - const NamespaceString _stateDocumentsNS = NamespaceString::kTenantSplitDonorsNamespace; + const NamespaceString _stateDocumentsNS = NamespaceString::kShardSplitDonorsNamespace; mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardSplitDonorService::_mutex"); const UUID _migrationId; diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index f061e686c13..5824029d097 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -90,11 +90,11 @@ StatusWith<ShardSplitDonorDocument> getStateDocument(OperationContext* opCtx, const UUID& shardSplitId) { // Use kLastApplied so that we can read the state document as a secondary. ReadSourceScope readSourceScope(opCtx, RecoveryUnit::ReadSource::kLastApplied); - AutoGetCollectionForRead collection(opCtx, NamespaceString::kTenantSplitDonorsNamespace); + AutoGetCollectionForRead collection(opCtx, NamespaceString::kShardSplitDonorsNamespace); if (!collection) { return Status(ErrorCodes::NamespaceNotFound, str::stream() << "Collection not found looking for state document: " - << NamespaceString::kTenantSplitDonorsNamespace.ns()); + << NamespaceString::kShardSplitDonorsNamespace.ns()); } BSONObj result; @@ -192,6 +192,9 @@ std::ostringstream& operator<<(std::ostringstream& builder, case mongo::ShardSplitDonorStateEnum::kUninitialized: builder << "kUninitialized"; break; + case mongo::ShardSplitDonorStateEnum::kAbortingIndexBuilds: + builder << "kAbortingIndexBuilds"; + break; case mongo::ShardSplitDonorStateEnum::kAborted: builder << "kAborted"; break; @@ -348,8 +351,7 @@ public: // The database needs to be open before using shard split donor service. { auto opCtx = cc().makeOperationContext(); - AutoGetDb autoDb( - opCtx.get(), NamespaceString::kTenantSplitDonorsNamespace.db(), MODE_X); + AutoGetDb autoDb(opCtx.get(), NamespaceString::kShardSplitDonorsNamespace.db(), MODE_X); auto db = autoDb.ensureDbExists(opCtx.get()); ASSERT_TRUE(db); } @@ -484,18 +486,14 @@ TEST_F(ShardSplitDonorServiceTest, BasicShardSplitDonorServiceInstanceCreation) ASSERT_EQ(_uuid, serviceInstance->getId()); waitForMonitorAndProcessHello(); - waitForReplSetStepUp(Status(ErrorCodes::OK, "")); auto result = serviceInstance->decisionFuture().get(); - ASSERT_TRUE(hasActiveSplitForTenants(opCtx.get(), _tenantIds)); - ASSERT(!result.abortReason); ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kCommitted); serviceInstance->tryForget(); - auto completionFuture = serviceInstance->completionFuture(); completionFuture.wait(); @@ -692,7 +690,7 @@ TEST_F(ShardSplitDonorServiceTest, ReconfigToRemoveSplitConfig) { } // Abort scenario : abortSplit called before startSplit. -TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortState) { +TEST_F(ShardSplitDonorServiceTest, CreateInstanceInAbortedState) { auto opCtx = makeOperationContext(); auto serviceContext = getServiceContext(); @@ -1067,4 +1065,49 @@ TEST_F(ShardSplitRecipientCleanupTest, ShardSplitRecipientCleanup) { ErrorCodes::NoMatchingDocument); } +class ShardSplitAbortedStepUpTest : public ShardSplitPersistenceTest { +public: + repl::ReplSetConfig initialDonorConfig() override { + BSONArrayBuilder members; + members.append(BSON("_id" << 1 << "host" + << "node1")); + + return repl::ReplSetConfig::parse(BSON("_id" + << "donorSetName" + << "version" << 1 << "protocolVersion" << 1 + << "members" << members.arr())); + } + + ShardSplitDonorDocument initialStateDocument() override { + + auto stateDocument = defaultStateDocument(); + + stateDocument.setState(mongo::ShardSplitDonorStateEnum::kAborted); + stateDocument.setBlockTimestamp(Timestamp(1, 1)); + stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1, 1), 1)); + + Status status(ErrorCodes::InternalError, abortReason); + BSONObjBuilder bob; + status.serializeErrorToBSON(&bob); + stateDocument.setAbortReason(bob.obj()); + + return stateDocument; + } + + std::string abortReason{"Testing simulated error"}; +}; + +TEST_F(ShardSplitAbortedStepUpTest, ShardSplitAbortedStepUp) { + auto opCtx = makeOperationContext(); + auto splitService = repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext()) + ->lookupServiceByName(ShardSplitDonorService::kServiceName); + auto optionalDonor = ShardSplitDonorService::DonorStateMachine::lookup( + opCtx.get(), splitService, BSON("_id" << _uuid)); + + ASSERT(optionalDonor); + auto result = optionalDonor->get()->decisionFuture().get(); + + ASSERT_EQ(result.state, mongo::ShardSplitDonorStateEnum::kAborted); +} + } // namespace mongo diff --git a/src/mongo/db/serverless/shard_split_state_machine.idl b/src/mongo/db/serverless/shard_split_state_machine.idl index 8aa65017c1b..ee3462f5a05 100644 --- a/src/mongo/db/serverless/shard_split_state_machine.idl +++ b/src/mongo/db/serverless/shard_split_state_machine.idl @@ -40,6 +40,7 @@ enums: type: string values: kUninitialized: "uninitialized" + kAbortingIndexBuilds: "aborting index builds" kBlocking: "blocking" kCommitted: "committed" kAborted: "aborted" diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp index b9bb407220d..041c133b02b 100644 --- a/src/mongo/db/serverless/shard_split_utils.cpp +++ b/src/mongo/db/serverless/shard_split_utils.cpp @@ -149,7 +149,7 @@ repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config, } Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc) { - const auto nss = NamespaceString::kTenantSplitDonorsNamespace; + const auto nss = NamespaceString::kShardSplitDonorsNamespace; AutoGetCollection collection(opCtx, nss, MODE_IX); uassert(ErrorCodes::PrimarySteppedDown, @@ -176,7 +176,7 @@ Status insertStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st } Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& stateDoc) { - const auto nss = NamespaceString::kTenantSplitDonorsNamespace; + const auto nss = NamespaceString::kShardSplitDonorsNamespace; AutoGetCollection collection(opCtx, nss, MODE_IX); if (!collection) { @@ -198,7 +198,7 @@ Status updateStateDoc(OperationContext* opCtx, const ShardSplitDonorDocument& st } StatusWith<bool> deleteStateDoc(OperationContext* opCtx, const UUID& shardSplitId) { - const auto nss = NamespaceString::kTenantSplitDonorsNamespace; + const auto nss = NamespaceString::kShardSplitDonorsNamespace; AutoGetCollection collection(opCtx, nss, MODE_IX); if (!collection) { diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h index b58f24b5a1a..2d9ab8402e7 100644 --- a/src/mongo/db/serverless/shard_split_utils.h +++ b/src/mongo/db/serverless/shard_split_utils.h @@ -64,7 +64,7 @@ repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config, /** * Inserts the shard split state document 'stateDoc' into - * 'config.tenantSplitDonors' collection. Also, creates the collection if not present + * 'config.shardSplitDonors' collection. Also, creates the collection if not present * before inserting the document. * * NOTE: A state doc might get inserted based on a decision made out of a stale read within a |