diff options
author | Matt Broadstone <mbroadst@mongodb.com> | 2022-08-25 14:12:09 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-25 15:29:42 +0000 |
commit | cb38f4cfafa311b6897cd2f36b64101fcd020e5c (patch) | |
tree | a6294bdc2cdf79d9fe2a8cc5fed993c14f9f3293 | |
parent | 3ae43985776cf6de9cff18775e4484b9c6904e3e (diff) | |
download | mongo-cb38f4cfafa311b6897cd2f36b64101fcd020e5c.tar.gz |
SERVER-68964 Reset lastCommitted to blockTimestamp after split
34 files changed, 197 insertions, 202 deletions
diff --git a/jstests/serverless/libs/basic_serverless_test.js b/jstests/serverless/libs/basic_serverless_test.js index 604d1e3e9d8..eeb7f5630f1 100644 --- a/jstests/serverless/libs/basic_serverless_test.js +++ b/jstests/serverless/libs/basic_serverless_test.js @@ -425,7 +425,7 @@ class BasicServerlessTest { /* * Look up tenant access blockers for the given tenant ids and will check, based upon the * expected state the access blockers are expected to be, that the different fields are - * properly set such as `blockTimestamp`, `abortOpTime` or `commitOpTime`. + * properly set such as `blockOpTime`, `abortOpTime` or `commitOpTime`. * @param {migrationId} the current shard split id. * @param {tenantIds} tenant ids of the shard split. * @param {expectedState} expected state the tenant access blocker to be in. @@ -438,9 +438,8 @@ class BasicServerlessTest { BasicServerlessTest.getTenantMigrationAccessBlocker({node: donorPrimary, tenantId}) .donor; const tenantAccessBlockersBlockRW = donorMtab.state == expectedState; - const tenantAccessBlockersBlockTimestamp = - bsonWoCompare(donorMtab.blockTimestamp, stateDoc.blockTimestamp) == 0; + bsonWoCompare(donorMtab.blockTimestamp, stateDoc.blockOpTime.ts) == 0; let tenantAccessBlockersAbortTimestamp = true; if (donorMtab.state > TenantMigrationTest.DonorAccessState.kBlockWritesAndReads) { @@ -566,6 +565,16 @@ class BasicServerlessTest { const donorRst = createRst(donorRstArgs, true); return donorRst.getPrimary(); } + + /** + * @returns A new ReplSetTest fixture representing the recipient set. + */ + getRecipient() { + const recipientRstArgs = createRstArgs(this.donor); + recipientRstArgs.nodeHosts = this.recipientNodes.map(node => node.host); + assert(recipientRstArgs.nodeHosts.length >= 3); + return createRst(recipientRstArgs, true); + } } BasicServerlessTest.kConfigSplitDonorsNS = "config.shardSplitDonors"; @@ -594,10 +603,10 @@ function assertMigrationState(primary, migrationId, state) { print(tojson(migrationDoc)); } - // If transitioning to "blocking", prove that we wrote that fact at the blockTimestamp. + // If transitioning to "blocking", prove that we wrote that fact at the blockOpTime. if (state === "blocking") { const oplogEntry = - primary.getDB("local").oplog.rs.find({ts: migrationDoc.blockTimestamp}).next(); + primary.getDB("local").oplog.rs.find({ts: migrationDoc.blockOpTime.ts}).next(); assert.neq(null, oplogEntry.o, oplogEntry); assert.neq(null, oplogEntry.o.state, oplogEntry); assert.eq(oplogEntry.o.state, state, oplogEntry); diff --git a/jstests/serverless/shard_split_apply_splitconfig.js b/jstests/serverless/shard_split_apply_splitconfig.js deleted file mode 100644 index 5527fb61321..00000000000 --- a/jstests/serverless/shard_split_apply_splitconfig.js +++ /dev/null @@ -1,70 +0,0 @@ -load("jstests/libs/fail_point_util.js"); // for "configureFailPoint" -load('jstests/libs/parallel_shell_helpers.js'); // for "startParallelShell" -load("jstests/serverless/libs/basic_serverless_test.js"); - -function populateRecipientMembers(splitConfig) { - return splitConfig.members.filter(m => m._id >= 3).map((member, idx) => { - member._id = idx; - member.votes = 1; - member.priority = 1; - member.hidden = false; - return member; - }); -} - -function runReconfigToSplitConfig() { - "use strict"; - - const kRecipientSetName = "receiveSet"; - - jsTestLog("Starting serverless"); - const test = - new BasicServerlessTest({recipientTagName: "recipientNode", recipientSetName: "recipient"}); - - jsTestLog("Adding recipient nodes"); - test.addRecipientNodes(); - - test.donor.awaitSecondaryNodes(); - - jsTestLog("Reconfigure the donor to apply a `splitConfig`"); - const config = test.donor.getReplSetConfigFromNode(); - const splitConfig = Object.extend({}, config, /* deepCopy */ true); - splitConfig._id = kRecipientSetName; - splitConfig.version++; - splitConfig.members = populateRecipientMembers(splitConfig); - - // TODO: possible future validation in replSetReconfig command? - delete splitConfig.settings.replicaSetId; - - const configWithSplitConfig = Object.extend({}, config, /* deepCopy */ true); - configWithSplitConfig.version++; - configWithSplitConfig.recipientConfig = splitConfig; - configWithSplitConfig.members = configWithSplitConfig.members.filter(m => m._id < 3); - - jsTestLog("Applying the split config, and waiting for it to propagate to recipient"); - const admin = test.donor.getPrimary().getDB("admin"); - assert.commandWorked(admin.runCommand({replSetReconfig: configWithSplitConfig})); - assert.soon(() => { - const recipientNode = test.recipientNodes[0]; - const status = - assert.commandWorked(recipientNode.getDB('admin').runCommand({replSetGetStatus: 1})); - return status.set === kRecipientSetName; - }, "waiting for split config to take", 30000, 2000); - - jsTestLog("Confirming we can write to recipient"); - let recipientPrimary = undefined; - assert.soon(function() { - recipientPrimary = test.recipientNodes.find(node => { - const n = node.adminCommand('hello'); - return n.isWritablePrimary || n.ismaster; - }); - return recipientPrimary != undefined; - }, "waiting for primary to be available", 30000, 1000); - - assert(recipientPrimary); - assert.commandWorked(recipientPrimary.getDB('foo').bar.insert({fake: 'document'})); - - test.stop(); -} - -runReconfigToSplitConfig(); diff --git a/jstests/serverless/shard_split_basic_test.js b/jstests/serverless/shard_split_basic_test.js index cfcd26c4c23..f79fbaad229 100644 --- a/jstests/serverless/shard_split_basic_test.js +++ b/jstests/serverless/shard_split_basic_test.js @@ -22,9 +22,7 @@ test.donor.awaitSecondaryNodes(); const donorPrimary = test.getDonorPrimary(); const operation = test.createSplitOperation(tenantIds); assert.commandWorked(operation.commit()); - assertMigrationState(donorPrimary, operation.migrationId, "committed"); - operation.forget(); const status = donorPrimary.adminCommand({serverStatus: 1}); @@ -33,6 +31,10 @@ assert.eq(status.shardSplits.totalAborted, 0); assert.gt(status.shardSplits.totalCommittedDurationMillis, 0); assert.gt(status.shardSplits.totalCommittedDurationWithoutCatchupMillis, 0); +const recipientPrimary = test.getRecipient().getPrimary(); +const recipientConfig = recipientPrimary.adminCommand({replSetGetConfig: 1}).config; +assert(!recipientConfig.settings.shardSplitBlockOpTime); + test.cleanupSuccesfulCommitted(operation.migrationId, tenantIds); test.stop(); })(); diff --git a/jstests/serverless/shard_split_concurrent_reads_on_donor_aborted.js b/jstests/serverless/shard_split_concurrent_reads_on_donor_aborted.js index 1c5af907e79..9691a191426 100644 --- a/jstests/serverless/shard_split_concurrent_reads_on_donor_aborted.js +++ b/jstests/serverless/shard_split_concurrent_reads_on_donor_aborted.js @@ -1,6 +1,6 @@ /** * Tests that the donor - * - does not rejects reads with atClusterTime/afterClusterTime >= blockTimestamp reads and + * - does not rejects reads with atClusterTime/afterClusterTime >= blockOpTime reads and * linearizable reads after the split aborts. * * @tags: [ @@ -29,7 +29,7 @@ const kTenantDefinedDbName = "0"; /** * Tests that after the split abort, the donor does not reject linearizable reads or reads with - * atClusterTime/afterClusterTime >= blockTimestamp. + * atClusterTime/afterClusterTime >= blockOpTime. */ function testDoNotRejectReadsAfterMigrationAborted(testCase, dbName, collName) { const tenantId = dbName.split('_')[0]; @@ -39,7 +39,7 @@ function testDoNotRejectReadsAfterMigrationAborted(testCase, dbName, collName) { const db = node.getDB(dbName); if (testCase.requiresReadTimestamp) { runCommandForConcurrentReadTest(db, - testCase.command(collName, donorDoc.blockTimestamp), + testCase.command(collName, donorDoc.blockOpTime.ts), null, testCase.isTransaction); runCommandForConcurrentReadTest( diff --git a/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking.js b/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking.js index d3bb1d3e85b..12d15fce0f1 100644 --- a/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking.js +++ b/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking.js @@ -1,6 +1,6 @@ /** * Tests that the donor - * - blocks reads with atClusterTime/afterClusterTime >= blockTimestamp that are executed while the + * - blocks reads with atClusterTime/afterClusterTime >= blockOpTime that are executed while the * split is in the blocking state but does not block linearizable reads. * * @tags: [ @@ -31,14 +31,14 @@ const kMaxTimeMS = 1 * 1000; /** * Tests that in the blocking state, the donor blocks reads with atClusterTime/afterClusterTime >= - * blockTimestamp but does not block linearizable reads. + * blockOpTime but does not block linearizable reads. */ let countBlockedReadsPrimary = 0; let countBlockedReadsSecondaries = 0; function testBlockReadsAfterMigrationEnteredBlocking(testCase, primary, dbName, collName) { const donorDoc = findSplitOperation(primary, operation.migrationId); const command = testCase.requiresReadTimestamp - ? testCase.command(collName, donorDoc.blockTimestamp) + ? testCase.command(collName, donorDoc.blockOpTime.ts) : testCase.command(collName); const shouldBlock = !testCase.isLinearizableRead; if (shouldBlock) { @@ -83,7 +83,7 @@ blockingFp.wait(); // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of // the oplog on all secondaries to ensure that snapshot reads on the secondaries with -// unspecified atClusterTime have read timestamp >= blockTimestamp. +// unspecified atClusterTime have read timestamp >= blockOpTime. donorRst.awaitLastOpCommitted(); for (const [testCaseName, testCase] of Object.entries(testCases)) { diff --git a/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_aborted.js b/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_aborted.js index bb76d0aa4aa..aca63f2daa2 100644 --- a/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_aborted.js +++ b/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_aborted.js @@ -1,8 +1,8 @@ /** * Tests that the donor - * - blocks reads with atClusterTime/afterClusterTime >= blockTimestamp that are executed while the + * - blocks reads with atClusterTime/afterClusterTime >= blockOpTime that are executed while the * split is in the blocking state but does not block linearizable reads. - * - does not reject reads with atClusterTime/afterClusterTime >= blockTimestamp and linearizable + * - does not reject reads with atClusterTime/afterClusterTime >= blockOpTime and linearizable * reads after the split aborts. * * @tags: [ @@ -86,12 +86,12 @@ function testUnblockBlockedReadsAfterMigrationAborted(testCase, dbName, collName // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of // the oplog on all secondaries to ensure that snapshot reads on the secondaries with - // unspecified atClusterTime have read timestamp >= blockTimestamp. + // unspecified atClusterTime have read timestamp >= blockOpTime. donorRst.awaitLastOpCommitted(); const donorDoc = findSplitOperation(donorPrimary, operation.migrationId); const command = testCase.requiresReadTimestamp - ? testCase.command(collName, donorDoc.blockTimestamp) + ? testCase.command(collName, donorDoc.blockOpTime.ts) : testCase.command(collName); // The split should unpause and abort after the read is blocked. Verify that the read diff --git a/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_committed.js b/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_committed.js index 6505439c7cb..22753bb01a6 100644 --- a/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_committed.js +++ b/jstests/serverless/shard_split_concurrent_reads_on_donor_blocking_then_committed.js @@ -1,6 +1,6 @@ /** * Tests that the donor - * - rejects reads with atClusterTime/afterClusterTime >= blockTimestamp reads and linearizable + * - rejects reads with atClusterTime/afterClusterTime >= blockOpTime reads and linearizable * reads after the split commits. * * @tags: [ @@ -82,12 +82,12 @@ function testRejectBlockedReadsAfterMigrationCommitted(testCase, dbName, collNam // Wait for the last oplog entry on the primary to be visible in the committed snapshot view of // the oplog on all secondaries to ensure that snapshot reads on the secondaries with - // unspecified atClusterTime have read timestamp >= blockTimestamp. + // unspecified atClusterTime have read timestamp >= blockOpTime. donorRst.awaitLastOpCommitted(); const donorDoc = findSplitOperation(donorPrimary, operation.migrationId); const command = testCase.requiresReadTimestamp - ? testCase.command(collName, donorDoc.blockTimestamp) + ? testCase.command(collName, donorDoc.blockOpTime.ts) : testCase.command(collName); // The split should unpause and commit after the read is blocked. Verify that the read diff --git a/jstests/serverless/shard_split_concurrent_reads_on_donor_committed.js b/jstests/serverless/shard_split_concurrent_reads_on_donor_committed.js index 16e16799fe6..fa35f54f169 100644 --- a/jstests/serverless/shard_split_concurrent_reads_on_donor_committed.js +++ b/jstests/serverless/shard_split_concurrent_reads_on_donor_committed.js @@ -1,6 +1,6 @@ /** * Tests that the donor - * - rejects reads with atClusterTime/afterClusterTime >= blockTimestamp reads and linearizable + * - rejects reads with atClusterTime/afterClusterTime >= blockOpTime reads and linearizable * reads after the split commits. * * @tags: [ @@ -29,7 +29,7 @@ const kTenantDefinedDbName = "0"; /** * Tests that after the split commits, the donor rejects linearizable reads and reads with - * atClusterTime/afterClusterTime >= blockTimestamp. + * atClusterTime/afterClusterTime >= blockOpTime. */ let countTenantMigrationCommittedErrorsPrimary = 0; let countTenantMigrationCommittedErrorsSecondaries = 0; @@ -57,7 +57,7 @@ function testRejectReadsAfterMigrationCommitted(testCase, primary, dbName, collN const db = node.getDB(dbName); if (testCase.requiresReadTimestamp) { runCommandForConcurrentReadTest(db, - testCase.command(collName, donorDoc.blockTimestamp), + testCase.command(collName, donorDoc.blockOpTime.ts), ErrorCodes.TenantMigrationCommitted, testCase.isTransaction); runCommandForConcurrentReadTest( diff --git a/jstests/serverless/shard_split_donor_current_op.js b/jstests/serverless/shard_split_donor_current_op.js index b9481cd8ea3..6513f7ca9c3 100644 --- a/jstests/serverless/shard_split_donor_current_op.js +++ b/jstests/serverless/shard_split_donor_current_op.js @@ -51,11 +51,12 @@ function checkStandardFieldsOK(ops, {migrationId, reachedDecision, tenantIds}) { const res = assert.commandWorked( donorPrimary.adminCommand({currentOp: true, desc: "shard split operation"})); + print(`CURR_OP: ${tojson(res)}`); checkStandardFieldsOK( res.inprog, {migrationId: operation.migrationId, reachedDecision: false, tenantIds: kTenantIds}); - assert(!res.inprog[0].blockTimestamp); + assert(!res.inprog[0].blockOpTime); fp.off(); @@ -88,7 +89,7 @@ function checkStandardFieldsOK(ops, {migrationId, reachedDecision, tenantIds}) { checkStandardFieldsOK( res.inprog, {migrationId: operation.migrationId, reachedDecision: false, tenantIds: kTenantIds}); - assert(res.inprog[0].blockTimestamp instanceof Timestamp); + assert(res.inprog[0].blockOpTime.ts instanceof Timestamp); fp.off(); @@ -140,7 +141,7 @@ function checkStandardFieldsOK(ops, {migrationId, reachedDecision, tenantIds}) { checkStandardFieldsOK( res.inprog, {migrationId: operation.migrationId, reachedDecision: true, tenantIds: kTenantIds}); - assert(res.inprog[0].blockTimestamp instanceof Timestamp); + assert(res.inprog[0].blockOpTime.ts instanceof Timestamp); assert(res.inprog[0].commitOrAbortOpTime.ts instanceof Timestamp); assert(res.inprog[0].commitOrAbortOpTime.t instanceof NumberLong); assert(!res.inprog[0].expireAt); @@ -159,7 +160,7 @@ function checkStandardFieldsOK(ops, {migrationId, reachedDecision, tenantIds}) { checkStandardFieldsOK( res.inprog, {migrationId: operation.migrationId, reachedDecision: true, tenantIds: kTenantIds}); - assert(res.inprog[0].blockTimestamp instanceof Timestamp); + assert(res.inprog[0].blockOpTime.ts instanceof Timestamp); assert(res.inprog[0].commitOrAbortOpTime.ts instanceof Timestamp); assert(res.inprog[0].commitOrAbortOpTime.t instanceof NumberLong); assert(res.inprog[0].expireAt instanceof Date); diff --git a/jstests/serverless/shard_split_shutdown_while_blocking_reads.js b/jstests/serverless/shard_split_shutdown_while_blocking_reads.js index bca8e73369a..451cd18d1ce 100644 --- a/jstests/serverless/shard_split_shutdown_while_blocking_reads.js +++ b/jstests/serverless/shard_split_shutdown_while_blocking_reads.js @@ -54,7 +54,7 @@ let readThread = new Thread((host, dbName, collName, afterClusterTime) => { readConcern: {afterClusterTime: Timestamp(afterClusterTime.t, afterClusterTime.i)} }); assert.commandFailedWithCode(res, ErrorCodes.InterruptedAtShutdown); -}, donorPrimary.host, kDbName, kCollName, donorDoc.blockTimestamp); +}, donorPrimary.host, kDbName, kCollName, donorDoc.blockOpTime.ts); readThread.start(); // Shut down the donor after the read starts blocking. diff --git a/jstests/serverless/shard_split_startup_recovery_aborted.js b/jstests/serverless/shard_split_startup_recovery_aborted.js index 9d85d45aa38..628cedc3e84 100644 --- a/jstests/serverless/shard_split_startup_recovery_aborted.js +++ b/jstests/serverless/shard_split_startup_recovery_aborted.js @@ -1,7 +1,7 @@ /** * Commits a shard split and abort it due to timeout prior to marking it for garbage collection and * checks that we recover the tenant access blockers since the split is aborted but not marked as - * garbage collectable. Checks that `abortOpTime` and `blockTimestamp` are set. + * garbage collectable. Checks that `abortOpTime` and `blockOpTime` are set. * @tags: [requires_fcv_52, featureFlagShardSplit] */ diff --git a/jstests/serverless/shard_split_startup_recovery_blocking.js b/jstests/serverless/shard_split_startup_recovery_blocking.js index a6914fd7be9..a7efd10fdbc 100644 --- a/jstests/serverless/shard_split_startup_recovery_blocking.js +++ b/jstests/serverless/shard_split_startup_recovery_blocking.js @@ -1,6 +1,6 @@ /** * Commits a shard split and shuts down while being in a blocking state. Tests that we recover the - * tenant access blockers in blocking state with `blockTimestamp` set. + * tenant access blockers in blocking state with `blockOpTime` set. * @tags: [requires_fcv_52, featureFlagShardSplit] */ diff --git a/jstests/serverless/shard_split_startup_recovery_committed.js b/jstests/serverless/shard_split_startup_recovery_committed.js index 231e34bf5c9..f55dedf091b 100644 --- a/jstests/serverless/shard_split_startup_recovery_committed.js +++ b/jstests/serverless/shard_split_startup_recovery_committed.js @@ -1,6 +1,6 @@ /** * Commits a shard split and shut down prior to marking the state document as garbage collectable. - * Checks that we recover the tenant access blockers with `commitOpTime` and `blockTimestamp` set. + * Checks that we recover the tenant access blockers with `commitOpTime` and `blockOpTime` set. * @tags: [requires_fcv_52, featureFlagShardSplit] */ diff --git a/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js b/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js index bf5dd5be8f6..9c04e042c61 100644 --- a/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js +++ b/jstests/serverless/shard_split_unblock_reads_and_writes_on_completion.js @@ -81,7 +81,7 @@ const kCollName = "testColl"; const laggedSecondary = test.donor.getSecondary(); const donorDoc = donorsColl.findOne({_id: operation.migrationId}); assert.neq(null, donorDoc); - const readThread = startReadThread(laggedSecondary, dbName, kCollName, donorDoc.blockTimestamp); + const readThread = startReadThread(laggedSecondary, dbName, kCollName, donorDoc.blockOpTime.ts); assert.soon(() => BasicServerlessTest.getNumBlockedReads(laggedSecondary, tenantId) == 1); // Disable snapshotting on that secondary, and wait for the split to abort and be garbage @@ -132,7 +132,7 @@ const kCollName = "testColl"; const laggedSecondary = test.donor.getSecondary(); const donorDoc = donorsColl.findOne({_id: operation.migrationId}); assert.neq(null, donorDoc); - const readThread = startReadThread(laggedSecondary, dbName, kCollName, donorDoc.blockTimestamp); + const readThread = startReadThread(laggedSecondary, dbName, kCollName, donorDoc.blockOpTime.ts); assert.soon(() => BasicServerlessTest.getNumBlockedReads(laggedSecondary, tenantId) == 1); // Disable snapshotting on that secondary, and wait for the split to commit and be garbage @@ -183,7 +183,7 @@ const kCollName = "testColl"; // Run a read command and a write command against the primary, and wait for them to block. const donorDoc = donorsColl.findOne({_id: operation.migrationId}); assert.neq(null, donorDoc); - const readThread = startReadThread(donorPrimary, dbName, kCollName, donorDoc.blockTimestamp); + const readThread = startReadThread(donorPrimary, dbName, kCollName, donorDoc.blockOpTime.ts); const writeThread = startWriteThread(donorPrimary, dbName, kCollName); assert.soon(() => BasicServerlessTest.getNumBlockedReads(donorPrimary, tenantId) == 1); assert.soon(() => BasicServerlessTest.getNumBlockedWrites(donorPrimary, tenantId) == 1); diff --git a/jstests/serverless/shard_split_wait_for_block_timestamp.js b/jstests/serverless/shard_split_wait_for_block_timestamp.js index e17deea7ba0..c5f15900ca3 100644 --- a/jstests/serverless/shard_split_wait_for_block_timestamp.js +++ b/jstests/serverless/shard_split_wait_for_block_timestamp.js @@ -1,5 +1,5 @@ /* - * Test that the shard split operation waits for recipient nodes to reach the blockTimestamp by + * Test that the shard split operation waits for recipient nodes to reach the blockOpTime by * pausing replication and observing the operation time out, then reenabling replication and * observing a successful split. * diff --git a/src/mongo/db/repl/repl_set_config.cpp b/src/mongo/db/repl/repl_set_config.cpp index 46f96bbbb59..4565587f3c3 100644 --- a/src/mongo/db/repl/repl_set_config.cpp +++ b/src/mongo/db/repl/repl_set_config.cpp @@ -791,6 +791,14 @@ bool ReplSetConfig::areWriteConcernModesTheSame(ReplSetConfig* otherConfig) cons return true; } +boost::optional<OpTime> ReplSetConfig::getShardSplitBlockOpTime() const { + return getSettings()->getShardSplitBlockOpTime(); +} + +void MutableReplSetConfig::removeShardSplitBlockOpTime() { + getSettings()->setShardSplitBlockOpTime(boost::none); +} + MemberConfig* MutableReplSetConfig::_findMemberByID(MemberId id) { for (auto it = getMembers().begin(); it != getMembers().end(); ++it) { if (it->getId() == id) { diff --git a/src/mongo/db/repl/repl_set_config.h b/src/mongo/db/repl/repl_set_config.h index b6e34afdc19..c8d3943e08d 100644 --- a/src/mongo/db/repl/repl_set_config.h +++ b/src/mongo/db/repl/repl_set_config.h @@ -139,6 +139,11 @@ public: */ void setSecondaryDelaySecsFieldDefault(MemberId memberId); + /** + * Removes the opTime field stored for an in-progress shard split operation. + */ + void removeShardSplitBlockOpTime(); + protected: MutableReplSetConfig() = default; @@ -560,6 +565,12 @@ public: */ bool areWriteConcernModesTheSame(ReplSetConfig* otherConfig) const; + /** + * Returns the opTime when an in-progress split operation started blocking requests, if one is + * currently running. + */ + boost::optional<OpTime> getShardSplitBlockOpTime() const; + private: /** * Sets replica set ID to 'defaultReplicaSetId' if 'cfg' does not contain an ID. diff --git a/src/mongo/db/repl/repl_set_config.idl b/src/mongo/db/repl/repl_set_config.idl index 8fc27bf199e..0848c3a02d5 100644 --- a/src/mongo/db/repl/repl_set_config.idl +++ b/src/mongo/db/repl/repl_set_config.idl @@ -129,6 +129,9 @@ structs: type: objectid optional: true validator: { callback: "validateReplicaSetIdNotNull"} + shardSplitBlockOpTime: + type: optime + optional: true ReplSetConfigBase: description: "The complete configuration for the replica set" diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 9ac44fdc62e..ddc80433aa7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -1362,7 +1362,8 @@ private: /** * Determines if the provided config is a split config, and validates it for installation. */ - std::tuple<StatusWith<ReplSetConfig>, bool> _resolveConfigToApply(const ReplSetConfig& config); + std::tuple<StatusWith<ReplSetConfig>, boost::optional<OpTime>> _resolveConfigToApply( + const ReplSetConfig& config); /** * Method to write a configuration transmitted via heartbeat message to stable storage. @@ -1376,7 +1377,7 @@ private: void _heartbeatReconfigFinish(const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, StatusWith<int> myIndex, - bool isRecipientConfig); + boost::optional<OpTime> shardSplitBlockOpTime); /** * Calculates the time (in millis) left in quiesce mode and converts the value to int64. diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 59c30e9cd41..681baa5e13f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -664,10 +664,10 @@ void ReplicationCoordinatorImpl::_scheduleHeartbeatReconfig(WithLock lk, .status_with_transitional_ignore(); } -std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolveConfigToApply( - const ReplSetConfig& config) { +std::tuple<StatusWith<ReplSetConfig>, boost::optional<OpTime>> +ReplicationCoordinatorImpl::_resolveConfigToApply(const ReplSetConfig& config) { if (!_settings.isServerless() || !config.isSplitConfig()) { - return {config, false}; + return {config, boost::none}; } stdx::unique_lock<Latch> lk(_mutex); @@ -684,12 +684,12 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve }); if (foundSelfInMembers) { - return {config, false}; + return {config, boost::none}; } return {Status(ErrorCodes::NotYetInitialized, "Cannot apply a split config if the current config is uninitialized"), - false}; + boost::none}; } auto recipientConfig = config.getRecipientConfig(); @@ -697,23 +697,25 @@ std::tuple<StatusWith<ReplSetConfig>, bool> ReplicationCoordinatorImpl::_resolve if (recipientConfig->findMemberByHostAndPort(selfMember.getHostAndPort())) { if (selfMember.getNumVotes() > 0) { return {Status(ErrorCodes::BadValue, "Cannot apply recipient config to a voting node"), - false}; + boost::none}; } if (_rsConfig.getReplSetName() == recipientConfig->getReplSetName()) { return {Status(ErrorCodes::InvalidReplicaSetConfig, "Cannot apply recipient config since current config and recipient " "config have the same set name."), - false}; + boost::none}; } + invariant(recipientConfig->getShardSplitBlockOpTime()); + auto shardSplitBlockOpTime = *recipientConfig->getShardSplitBlockOpTime(); auto mutableConfig = recipientConfig->getMutable(); - mutableConfig.setConfigVersion(1); - mutableConfig.setConfigTerm(1); - return {ReplSetConfig(std::move(mutableConfig)), true}; + mutableConfig.removeShardSplitBlockOpTime(); + + return {ReplSetConfig(std::move(mutableConfig)), shardSplitBlockOpTime}; } - return {config, false}; + return {config, boost::none}; } void ReplicationCoordinatorImpl::_heartbeatReconfigStore( @@ -729,7 +731,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( return; } - const auto [swConfig, isRecipientConfig] = _resolveConfigToApply(newConfig); + const auto [swConfig, shardSplitBlockOpTime] = _resolveConfigToApply(newConfig); if (!swConfig.isOK()) { LOGV2_WARNING(6234600, "Ignoring new configuration in heartbeat response because it is invalid", @@ -742,7 +744,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( } const auto configToApply = swConfig.getValue(); - if (isRecipientConfig) { + if (shardSplitBlockOpTime) { LOGV2(6309200, "Applying a recipient config for a shard split operation.", "config"_attr = configToApply); @@ -803,7 +805,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( auto opCtx = cc().makeOperationContext(); // Don't write the no-op for config learned via heartbeats. - auto status = [&, isRecipientConfig = isRecipientConfig]() { + auto status = [&, isRecipientConfig = shardSplitBlockOpTime.has_value()]() { if (isRecipientConfig) { return _externalState->replaceLocalConfigDocument(opCtx.get(), configToApply.toBSON()); @@ -857,7 +859,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( shouldStartDataReplication = true; } - if (isRecipientConfig) { + if (shardSplitBlockOpTime) { // Donor access blockers are removed from donor nodes via the shard split op observer. // Donor access blockers are removed from recipient nodes when the node applies the // recipient config. When the recipient primary steps up it will delete its state @@ -877,7 +879,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigStore( "configToApplyVersionAndTerm"_attr = configToApply.getConfigVersionAndTerm()); } - _heartbeatReconfigFinish(cbd, configToApply, myIndex, isRecipientConfig); + _heartbeatReconfigFinish(cbd, configToApply, myIndex, shardSplitBlockOpTime); // Start data replication after the config has been installed. if (shouldStartDataReplication) { @@ -908,7 +910,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( const executor::TaskExecutor::CallbackArgs& cbData, const ReplSetConfig& newConfig, StatusWith<int> myIndex, - const bool isRecipientConfig) { + boost::optional<OpTime> shardSplitBlockOpTime) { if (cbData.status == ErrorCodes::CallbackCanceled) { return; } @@ -922,7 +924,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( ->scheduleWorkAt(_replExecutor->now() + Milliseconds{10}, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _heartbeatReconfigFinish( - cbData, newConfig, myIndex, isRecipientConfig); + cbData, newConfig, myIndex, shardSplitBlockOpTime); }) .status_with_transitional_ignore(); return; @@ -947,7 +949,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( ->onEvent(electionFinishedEvent, [=](const executor::TaskExecutor::CallbackArgs& cbData) { _heartbeatReconfigFinish( - cbData, newConfig, myIndex, isRecipientConfig); + cbData, newConfig, myIndex, shardSplitBlockOpTime); }) .status_with_transitional_ignore(); return; @@ -1000,7 +1002,7 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( invariant(_rsConfigState == kConfigHBReconfiguring); invariant(!_rsConfig.isInitialized() || _rsConfig.getConfigVersionAndTerm() < newConfig.getConfigVersionAndTerm() || - _selfIndex < 0 || isRecipientConfig); + _selfIndex < 0 || shardSplitBlockOpTime); if (!myIndex.isOK()) { switch (myIndex.getStatus().code()) { @@ -1038,6 +1040,10 @@ void ReplicationCoordinatorImpl::_heartbeatReconfigFinish( const PostMemberStateUpdateAction action = _setCurrentRSConfig(lk, opCtx.get(), newConfig, myIndexValue); + if (shardSplitBlockOpTime) { + _topCoord->resetLastCommittedOpTime(*shardSplitBlockOpTime); + } + lk.unlock(); if (contentChanged) { _externalState->notifyOtherMemberDataChanged(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index b01a5d51ea3..23e1d594488 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -705,7 +705,8 @@ public: // Add the raw config object. auto conf = ReplSetConfig::parse(makeConfigObj(configVersion, termVersion)); - auto splitConf = serverless::makeSplitConfig(conf, _recipientSetName, _recipientTag); + auto splitConf = serverless::makeSplitConfig( + conf, _recipientSetName, _recipientTag, repl::OpTime(Timestamp(12345, 1), termVersion)); // makeSplitConf increment the config version. We don't want that here as it makes the unit // test case harder to follow. @@ -819,11 +820,14 @@ TEST_F(ReplCoordHBV1SplitConfigTest, RecipientNodeApplyConfig) { getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(responseObj)); getNet()->runReadyNetworkOperations(); - // The recipient's config and term versions are set to 1. - ASSERT_EQ(getReplCoord()->getConfigVersion(), 1); - ASSERT_EQ(getReplCoord()->getConfigTerm(), 1); + // The recipient's lastCommittedOpTime is reset to the blockOpTime on applying the recipient + // config. + ASSERT_EQ(getReplCoord()->getLastCommittedOpTime(), + repl::OpTime(Timestamp(12345, 1), getReplCoord()->getConfigTerm())); - validateNextRequest("", _recipientSetName, 1, 1); + // Applying the recipient config will increase the configVersion by 1. + validateNextRequest( + "", _recipientSetName, (_configVersion + 2), getReplCoord()->getConfigTerm()); } TEST_F(ReplCoordHBV1SplitConfigTest, RejectMismatchedSetNameInHeartbeatResponse) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp index 003e70c0ff7..3c61693428e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp @@ -1391,8 +1391,10 @@ TEST_F(ReplCoordReconfigTest, MustSendHeartbeatToSplitConfigRecipients) { BSONObjBuilder result; const auto opCtx = makeOperationContext(); - auto newConfig = mongo::serverless::makeSplitConfig( - ReplSetConfig::parse(oldConfigObj), "recipientSet", recipientTagName); + auto newConfig = mongo::serverless::makeSplitConfig(ReplSetConfig::parse(oldConfigObj), + "recipientSet", + recipientTagName, + repl::OpTime(Timestamp(100, 0), 1)); Status status(ErrorCodes::InternalError, "Not Set"); stdx::thread reconfigThread; diff --git a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp index ed285df2043..025c5495ca4 100644 --- a/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp +++ b/src/mongo/db/repl/tenant_migration_access_blocker_util.cpp @@ -452,20 +452,20 @@ void recoverTenantMigrationAccessBlockers(OperationContext* opCtx) { case ShardSplitDonorStateEnum::kAbortingIndexBuilds: break; case ShardSplitDonorStateEnum::kBlocking: - invariant(doc.getBlockTimestamp()); + invariant(doc.getBlockOpTime()); mtab->startBlockingWrites(); - mtab->startBlockingReadsAfter(doc.getBlockTimestamp().value()); + mtab->startBlockingReadsAfter(doc.getBlockOpTime()->getTimestamp()); break; case ShardSplitDonorStateEnum::kCommitted: - invariant(doc.getBlockTimestamp()); + invariant(doc.getBlockOpTime()); mtab->startBlockingWrites(); - mtab->startBlockingReadsAfter(doc.getBlockTimestamp().value()); + mtab->startBlockingReadsAfter(doc.getBlockOpTime()->getTimestamp()); mtab->setCommitOpTime(opCtx, doc.getCommitOrAbortOpTime().value()); break; case ShardSplitDonorStateEnum::kAborted: - if (doc.getBlockTimestamp()) { + if (doc.getBlockOpTime()) { mtab->startBlockingWrites(); - mtab->startBlockingReadsAfter(doc.getBlockTimestamp().value()); + mtab->startBlockingReadsAfter(doc.getBlockOpTime()->getTimestamp()); } mtab->setAbortOpTime(opCtx, doc.getCommitOrAbortOpTime().value()); break; diff --git a/src/mongo/db/repl/topology_coordinator.cpp b/src/mongo/db/repl/topology_coordinator.cpp index 030ad0a2792..b112eab9af0 100644 --- a/src/mongo/db/repl/topology_coordinator.cpp +++ b/src/mongo/db/repl/topology_coordinator.cpp @@ -2965,6 +2965,11 @@ bool TopologyCoordinator::advanceLastCommittedOpTimeAndWallTime(OpTimeAndWallTim return true; } +void TopologyCoordinator::resetLastCommittedOpTime(const OpTime& lastCommittedOpTime) { + LOGV2(8423364, "Resetting commit point", "lastCommittedOpTime"_attr = lastCommittedOpTime); + _lastCommittedOpTimeAndWallTime = OpTimeAndWallTime(lastCommittedOpTime, Date_t::now()); +} + OpTime TopologyCoordinator::getLastCommittedOpTime() const { return _lastCommittedOpTimeAndWallTime.opTime; } diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 3285a5b4825..6c2231d3904 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -322,6 +322,11 @@ public: bool forInitiate = false); /** + * Resets the commit point to the provided opTime, with a wall time of now. + */ + void resetLastCommittedOpTime(const OpTime& lastCommittedOpTime); + + /** * Returns the OpTime of the latest majority-committed op known to this server. */ OpTime getLastCommittedOpTime() const; diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp index 8aaedfb6fa2..0e25639559f 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.cpp @@ -63,10 +63,9 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { switch (donorStateDoc.getState()) { case ShardSplitDonorStateEnum::kUninitialized: uassert(ErrorCodes::BadValue, - fmt::format(errmsg, - "BlockTimeStamp should not be set in data sync state", - doc.toString()), - !donorStateDoc.getBlockTimestamp()); + fmt::format( + errmsg, "blockOpTime should not be set in data sync state", doc.toString()), + !donorStateDoc.getBlockOpTime()); uassert(ErrorCodes::BadValue, fmt::format(errmsg, "CommitOrAbortOpTime should not be set in data sync state", @@ -81,15 +80,15 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { case ShardSplitDonorStateEnum::kAbortingIndexBuilds: uassert(ErrorCodes::BadValue, errmsg, - !donorStateDoc.getBlockTimestamp() && !donorStateDoc.getCommitOrAbortOpTime() && + !donorStateDoc.getBlockOpTime() && !donorStateDoc.getCommitOrAbortOpTime() && !donorStateDoc.getAbortReason()); break; case ShardSplitDonorStateEnum::kBlocking: uassert(ErrorCodes::BadValue, fmt::format(errmsg, - "Missing blockTimeStamp while being in blocking state", + "Missing blockOpTime while being in blocking state", doc.toString()), - donorStateDoc.getBlockTimestamp()); + donorStateDoc.getBlockOpTime()); uassert( ErrorCodes::BadValue, fmt::format(errmsg, @@ -105,9 +104,9 @@ ShardSplitDonorDocument parseAndValidateDonorDocument(const BSONObj& doc) { case ShardSplitDonorStateEnum::kCommitted: uassert(ErrorCodes::BadValue, fmt::format(errmsg, - "Missing blockTimeStamp while being in committed state", + "Missing blockOpTime while being in committed state", doc.toString()), - donorStateDoc.getBlockTimestamp()); + donorStateDoc.getBlockOpTime()); uassert(ErrorCodes::BadValue, fmt::format(errmsg, "Missing CommitOrAbortOpTime while being in committed state", @@ -172,7 +171,7 @@ void onTransitionToAbortingIndexBuilds(OperationContext* opCtx, */ void onTransitionToBlocking(OperationContext* opCtx, const ShardSplitDonorDocument& donorStateDoc) { invariant(donorStateDoc.getState() == ShardSplitDonorStateEnum::kBlocking); - invariant(donorStateDoc.getBlockTimestamp()); + invariant(donorStateDoc.getBlockOpTime()); invariant(donorStateDoc.getTenantIds()); auto tenantIds = *donorStateDoc.getTenantIds(); @@ -191,7 +190,7 @@ void onTransitionToBlocking(OperationContext* opCtx, const ShardSplitDonorDocume // Both primaries and secondaries call startBlockingReadsAfter in the op observer, since // startBlockingReadsAfter just needs to be called before the "start blocking" write's oplog // hole is filled. - mtab->startBlockingReadsAfter(donorStateDoc.getBlockTimestamp().value()); + mtab->startBlockingReadsAfter(donorStateDoc.getBlockOpTime()->getTimestamp()); } } diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp index 93f7f3df93a..97c923524a3 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_op_observer_test.cpp @@ -295,7 +295,7 @@ TEST_F(ShardSplitDonorOpObserverTest, UpdateBlockingDocumentPrimary) { auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); - stateDocument.setBlockTimestamp(Timestamp(1, 1)); + stateDocument.setBlockOpTime(repl::OpTime(Timestamp(1, 1), 1)); auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) { ASSERT_TRUE(mtab); @@ -321,7 +321,7 @@ TEST_F(ShardSplitDonorOpObserverTest, UpdateBlockingDocumentSecondary) { auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); - stateDocument.setBlockTimestamp(Timestamp(1, 1)); + stateDocument.setBlockOpTime(repl::OpTime(Timestamp(1, 1), 1)); auto mtabVerifier = [opCtx = _opCtx.get()](std::shared_ptr<TenantMigrationAccessBlocker> mtab) { ASSERT_TRUE(mtab); @@ -369,7 +369,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToCommit) { auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kCommitted); - stateDocument.setBlockTimestamp(Timestamp(1, 2)); + stateDocument.setBlockOpTime(repl::OpTime(Timestamp(1, 2), 1)); stateDocument.setCommitOrAbortOpTime(commitOpTime); auto blockers = createBlockersAndStartBlockingWrites(_tenantIds, _opCtx.get()); @@ -400,7 +400,7 @@ TEST_F(ShardSplitDonorOpObserverTest, TransitionToAbort) { auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kAborted); - stateDocument.setBlockTimestamp(Timestamp(1, 2)); + stateDocument.setBlockOpTime(repl::OpTime(Timestamp(1, 2), 1)); stateDocument.setCommitOrAbortOpTime(abortOpTime); stateDocument.setAbortReason(bob.obj()); @@ -431,7 +431,7 @@ TEST_F(ShardSplitDonorOpObserverTest, SetExpireAtForAbortedRemoveBlockers) { auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kAborted); - stateDocument.setBlockTimestamp(Timestamp(1, 2)); + stateDocument.setBlockOpTime(repl::OpTime(Timestamp(1, 2), 1)); stateDocument.setCommitOrAbortOpTime(abortOpTime); stateDocument.setAbortReason(bob.obj()); stateDocument.setExpireAt(mongo::Date_t::fromMillisSinceEpoch(1000)); @@ -460,7 +460,7 @@ TEST_F(ShardSplitDonorOpObserverTest, DeleteAbortedDocumentDoesNotRemoveBlockers auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kAborted); - stateDocument.setBlockTimestamp(Timestamp(1, 2)); + stateDocument.setBlockOpTime(repl::OpTime(Timestamp(1, 2), 1)); stateDocument.setCommitOrAbortOpTime(abortOpTime); stateDocument.setAbortReason(bob.obj()); stateDocument.setExpireAt(mongo::Date_t::fromMillisSinceEpoch(1000)); @@ -502,7 +502,7 @@ TEST_F(ShardSplitDonorOpObserverTest, DeleteCommittedDocumentRemovesBlockers) { auto stateDocument = defaultStateDocument(); stateDocument.setState(ShardSplitDonorStateEnum::kCommitted); - stateDocument.setBlockTimestamp(Timestamp(1, 2)); + stateDocument.setBlockOpTime(repl::OpTime(Timestamp(1, 2), 1)); stateDocument.setCommitOrAbortOpTime(commitOpTime); stateDocument.setExpireAt(mongo::Date_t::fromMillisSinceEpoch(1000)); diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index 1d981d6bf29..f2bfdf0af70 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -357,7 +357,7 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); pauseShardSplitAfterBlocking.pauseWhileSet(opCtx.get()); - return _waitForRecipientToReachBlockTimestamp(executor, abortToken); + return _waitForRecipientToReachBlockOpTime(executor, abortToken); }) .then([this, executor, abortToken, criticalSectionWithoutCatchupTimer] { criticalSectionWithoutCatchupTimer->reset(); @@ -500,8 +500,8 @@ boost::optional<BSONObj> ShardSplitDonorService::DonorStateMachine::reportForCur if (tenantIds) { bob.append("tenantIds", *tenantIds); } - if (_stateDoc.getBlockTimestamp()) { - bob.append("blockTimestamp", *_stateDoc.getBlockTimestamp()); + if (_stateDoc.getBlockOpTime()) { + _stateDoc.getBlockOpTime()->append(&bob, "blockOpTime"); } if (_stateDoc.getCommitOrAbortOpTime()) { _stateDoc.getCommitOrAbortOpTime()->append(&bob, "commitOrAbortOpTime"); @@ -673,8 +673,7 @@ ShardSplitDonorService::DonorStateMachine::_abortIndexBuildsAndEnterBlockingStat }); } -ExecutorFuture<void> -ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestamp( +ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockOpTime( const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) { checkForTokenInterrupt(abortToken); @@ -686,9 +685,8 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestam auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext()); - invariant(_stateDoc.getBlockTimestamp()); - auto blockTimestamp = *_stateDoc.getBlockTimestamp(); - repl::OpTime blockOpTime = repl::OpTime(blockTimestamp, replCoord->getConfigTerm()); + invariant(_stateDoc.getBlockOpTime()); + auto blockOpTime = *_stateDoc.getBlockOpTime(); invariant(_stateDoc.getRecipientTagName()); auto recipientTagName = *_stateDoc.getRecipientTagName(); @@ -721,16 +719,18 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_applySplitConfi auto splitConfig = [&]() { stdx::lock_guard<Latch> lg(_mutex); - auto setName = _stateDoc.getRecipientSetName(); - invariant(setName); - auto tagName = _stateDoc.getRecipientTagName(); - invariant(tagName); + invariant(_stateDoc.getRecipientSetName()); + auto recipientSetName = _stateDoc.getRecipientSetName()->toString(); + invariant(_stateDoc.getRecipientTagName()); + auto recipientTagName = _stateDoc.getRecipientTagName()->toString(); + invariant(_stateDoc.getBlockOpTime()); + auto blockOpTime = *_stateDoc.getBlockOpTime(); auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext()); invariant(replCoord); return serverless::makeSplitConfig( - replCoord->getConfig(), setName->toString(), tagName->toString()); + replCoord->getConfig(), recipientSetName, recipientTagName, blockOpTime); }(); LOGV2(6309100, @@ -916,7 +916,7 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS case ShardSplitDonorStateEnum::kAbortingIndexBuilds: break; case ShardSplitDonorStateEnum::kBlocking: - _stateDoc.setBlockTimestamp(oplogSlot.getTimestamp()); + _stateDoc.setBlockOpTime(oplogSlot); break; case ShardSplitDonorStateEnum::kCommitted: _stateDoc.setCommitOrAbortOpTime(oplogSlot); diff --git a/src/mongo/db/serverless/shard_split_donor_service.h b/src/mongo/db/serverless/shard_split_donor_service.h index 43900186ba2..ddfd91b1f20 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.h +++ b/src/mongo/db/serverless/shard_split_donor_service.h @@ -168,8 +168,8 @@ private: ExecutorFuture<void> _abortIndexBuildsAndEnterBlockingState( const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken); - ExecutorFuture<void> _waitForRecipientToReachBlockTimestamp( - const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken); + ExecutorFuture<void> _waitForRecipientToReachBlockOpTime(const ScopedTaskExecutorPtr& executor, + const CancellationToken& abortToken); ExecutorFuture<void> _applySplitConfigToDonor(const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken); diff --git a/src/mongo/db/serverless/shard_split_donor_service_test.cpp b/src/mongo/db/serverless/shard_split_donor_service_test.cpp index e3fe18e8ac2..74fc568a7d1 100644 --- a/src/mongo/db/serverless/shard_split_donor_service_test.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service_test.cpp @@ -1003,7 +1003,7 @@ public: ShardSplitDonorDocument initialStateDocument() override { auto stateDocument = defaultStateDocument(); - stateDocument.setBlockTimestamp(Timestamp(1, 1)); + stateDocument.setBlockOpTime(repl::OpTime(Timestamp(1, 1), 1)); stateDocument.setState(ShardSplitDonorStateEnum::kBlocking); stateDocument.setRecipientConnectionString(ConnectionString::forLocal()); @@ -1070,7 +1070,7 @@ public: auto stateDocument = defaultStateDocument(); stateDocument.setState(mongo::ShardSplitDonorStateEnum::kAborted); - stateDocument.setBlockTimestamp(Timestamp(1, 1)); + stateDocument.setBlockOpTime(repl::OpTime(Timestamp(1, 1), 1)); stateDocument.setCommitOrAbortOpTime(repl::OpTime(Timestamp(1, 1), 1)); Status status(ErrorCodes::InternalError, abortReason); diff --git a/src/mongo/db/serverless/shard_split_state_machine.idl b/src/mongo/db/serverless/shard_split_state_machine.idl index ee3462f5a05..49fa577b252 100644 --- a/src/mongo/db/serverless/shard_split_state_machine.idl +++ b/src/mongo/db/serverless/shard_split_state_machine.idl @@ -76,10 +76,10 @@ structs: type: ShardSplitDonorState description: "The current state of the shard split operation." default: kUninitialized - blockTimestamp: - type: timestamp + blockOpTime: + type: optime description: >- - The timestamp at which writes and causal reads against the databases + The opTime at which writes and causal reads against the databases being migrated should start blocking. optional: true commitOrAbortOpTime: diff --git a/src/mongo/db/serverless/shard_split_utils.cpp b/src/mongo/db/serverless/shard_split_utils.cpp index 780ea0b7e63..35822ba118c 100644 --- a/src/mongo/db/serverless/shard_split_utils.cpp +++ b/src/mongo/db/serverless/shard_split_utils.cpp @@ -81,7 +81,8 @@ ConnectionString makeRecipientConnectionString(const repl::ReplSetConfig& config repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config, const std::string& recipientSetName, - const std::string& recipientTagName) { + const std::string& recipientTagName, + const repl::OpTime& blockOpTime) { dassert(!recipientSetName.empty() && recipientSetName != config.getReplSetName()); uassert(6201800, "We can not make a split config of an existing split config.", @@ -125,15 +126,18 @@ repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config, recipientConfigBob.append("_id", recipientSetName) .append("members", recipientMembers) .append("version", updatedVersion); - if (configNoMembersBson.hasField("settings") && - configNoMembersBson.getField("settings").isABSONObj()) { - BSONObj settings = configNoMembersBson.getField("settings").Obj(); - if (settings.hasField("replicaSetId")) { - recipientConfigBob.append( - "settings", - settings.removeField("replicaSetId").addFields(BSON("replicaSetId" << OID::gen()))); + + recipientConfigBob.append("settings", [&]() { + if (configNoMembersBson.hasField("settings") && + configNoMembersBson.getField("settings").isABSONObj()) { + BSONObj settings = configNoMembersBson.getField("settings").Obj(); + return settings.removeField("replicaSetId") + .addFields( + BSON("replicaSetId" << OID::gen() << "shardSplitBlockOpTime" << blockOpTime)); } - } + + return BSON("shardSplitBlockOpTime" << blockOpTime); + }()); BSONObjBuilder splitConfigBob(configNoMembersBson); splitConfigBob.append("version", updatedVersion); diff --git a/src/mongo/db/serverless/shard_split_utils.h b/src/mongo/db/serverless/shard_split_utils.h index db24008a055..a8387072ff1 100644 --- a/src/mongo/db/serverless/shard_split_utils.h +++ b/src/mongo/db/serverless/shard_split_utils.h @@ -60,7 +60,8 @@ ConnectionString makeRecipientConnectionString(const repl::ReplSetConfig& config */ repl::ReplSetConfig makeSplitConfig(const repl::ReplSetConfig& config, const std::string& recipientSetName, - const std::string& recipientTagName); + const std::string& recipientTagName, + const repl::OpTime& blockOpTime); /** * Inserts the shard split state document 'stateDoc' into diff --git a/src/mongo/db/serverless/shard_split_utils_test.cpp b/src/mongo/db/serverless/shard_split_utils_test.cpp index 0c85517c262..206807f5967 100644 --- a/src/mongo/db/serverless/shard_split_utils_test.cpp +++ b/src/mongo/db/serverless/shard_split_utils_test.cpp @@ -55,8 +55,8 @@ TEST(MakeSplitConfig, recipientConfigHasNewReplicaSetId) { << donorReplSetId))); const std::string recipientConfigSetName{"newSet"}; - const ReplSetConfig splitConfigResult = - serverless::makeSplitConfig(configA, recipientConfigSetName, recipientTagName); + const ReplSetConfig splitConfigResult = serverless::makeSplitConfig( + configA, recipientConfigSetName, recipientTagName, repl::OpTime(Timestamp(100, 0), 1)); ASSERT_EQ(splitConfigResult.getReplicaSetId(), donorReplSetId); ASSERT_NE(splitConfigResult.getReplicaSetId(), @@ -90,8 +90,8 @@ TEST(MakeSplitConfig, toBSONRoundTripAbility) { ASSERT_TRUE(configA == configB); const std::string recipientConfigSetName{"newSet"}; - const ReplSetConfig splitConfigResult = - serverless::makeSplitConfig(configA, recipientConfigSetName, recipientTagName); + const ReplSetConfig splitConfigResult = serverless::makeSplitConfig( + configA, recipientConfigSetName, recipientTagName, repl::OpTime(Timestamp(100, 0), 1)); // here we will test that the result from the method `makeSplitConfig` matches the hardcoded // resultSplitConfigBSON. We will also check that the recipient from the splitConfig matches @@ -155,8 +155,8 @@ TEST(MakeSplitConfig, ValidateSplitConfigIntegrityTest) { << BSON("electionTimeoutMillis" << 1000 << "replicaSetId" << OID::gen()))); - const ReplSetConfig splitConfig = - serverless::makeSplitConfig(config, recipientConfigSetName, recipientTagName); + const ReplSetConfig splitConfig = serverless::makeSplitConfig( + config, recipientConfigSetName, recipientTagName, repl::OpTime(Timestamp(100, 0), 1)); ASSERT_OK(splitConfig.validate()); ASSERT_EQ(splitConfig.getReplSetName(), donorConfigSetName); ASSERT_TRUE(splitConfig.toBSON().hasField("members")); @@ -177,10 +177,12 @@ TEST(MakeSplitConfig, ValidateSplitConfigIntegrityTest) { ASSERT_TRUE(recipientConfigPtr->getRecipientConfig() == nullptr); ASSERT_EQ(recipientConfigPtr->getReplSetName(), recipientConfigSetName); - ASSERT_THROWS_CODE( - serverless::makeSplitConfig(splitConfig, recipientConfigSetName, recipientTagName), - AssertionException, - 6201800 /*calling on a splitconfig*/); + ASSERT_THROWS_CODE(serverless::makeSplitConfig(splitConfig, + recipientConfigSetName, + recipientTagName, + repl::OpTime(Timestamp(100, 0), 1)), + AssertionException, + 6201800 /*calling on a splitconfig*/); } TEST(MakeSplitConfig, SplitConfigAssertionsTest) { @@ -195,7 +197,8 @@ TEST(MakeSplitConfig, SplitConfigAssertionsTest) { ASSERT_THROWS_CODE(serverless::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON), recipientConfigSetName, - recipientTagName), + recipientTagName, + repl::OpTime(Timestamp(100, 0), 1)), AssertionException, 6201801 /*no recipient members created*/); @@ -210,7 +213,8 @@ TEST(MakeSplitConfig, SplitConfigAssertionsTest) { ASSERT_THROWS_CODE(serverless::makeSplitConfig(ReplSetConfig::parse(baseConfigBSON), recipientConfigSetName, - recipientTagName), + recipientTagName, + repl::OpTime(Timestamp(100, 0), 1)), AssertionException, 6201802 /*no donor members created*/); } |