diff options
author | Randolph Tan <randolph@10gen.com> | 2022-11-01 19:52:33 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-16 19:08:59 +0000 |
commit | 051e9310e027a52591ae9100078c24efafebad47 (patch) | |
tree | 1b92a005c9254f41d59c1abe143f44131ae73a75 | |
parent | 8d9f1726104f30d90961611c7da54c4cacf9199a (diff) | |
download | mongo-051e9310e027a52591ae9100078c24efafebad47.tar.gz |
SERVER-68361 Make migration properly handle cases when shard key value modification also results in changes in chunk membership
(cherry picked from commit e8ee517f32043af220c8db1a739c2d80b2cb0969)
-rw-r--r-- | etc/backports_required_for_multiversion_tests.yml | 4 | ||||
-rw-r--r-- | jstests/libs/chunk_manipulation_util.js | 3 | ||||
-rw-r--r-- | jstests/sharding/prepare_transaction_then_migrate.js | 190 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 14 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 98 |
6 files changed, 233 insertions, 83 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 1755b51386d..dff8fb855fe 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -86,6 +86,8 @@ last-continuous: ticket: SERVER-68628 - test_file: jstests/sharding/move_chunk_interrupt_postimage.js ticket: SERVER-68728 + - test_file: jstests/sharding/prepare_transaction_then_migrate.js + ticket: SERVER-68361 suites: change_streams_multiversion_passthrough: null change_streams_sharded_collections_multiversion_passthrough: null @@ -269,6 +271,8 @@ last-lts: ticket: SERVER-68628 - test_file: jstests/sharding/move_chunk_interrupt_postimage.js ticket: SERVER-68728 + - test_file: jstests/sharding/prepare_transaction_then_migrate.js + ticket: SERVER-68361 suites: change_streams_multiversion_passthrough: null change_streams_sharded_collections_multiversion_passthrough: null diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js index 8f6b05ebf5d..e2ebfcbfd0b 100644 --- a/jstests/libs/chunk_manipulation_util.js +++ b/jstests/libs/chunk_manipulation_util.js @@ -138,7 +138,8 @@ function waitForMoveChunkStep(shardConnection, stepNumber) { assert.soon(function() { var inProgressStr = ''; - let in_progress = admin.aggregate([{$currentOp: {'allUsers': true}}]); + let in_progress = + admin.aggregate([{$currentOp: {'allUsers': true, idleConnections: true}}]); while (in_progress.hasNext()) { let op = in_progress.next(); diff --git a/jstests/sharding/prepare_transaction_then_migrate.js b/jstests/sharding/prepare_transaction_then_migrate.js index 034259d02be..36a9581752d 100644 --- a/jstests/sharding/prepare_transaction_then_migrate.js +++ b/jstests/sharding/prepare_transaction_then_migrate.js @@ -9,6 +9,7 @@ (function() { "use strict"; load('jstests/libs/chunk_manipulation_util.js'); +load('jstests/sharding/libs/create_sharded_collection_util.js'); 
load('jstests/sharding/libs/sharded_transactions_helpers.js'); const dbName = "test"; @@ -16,58 +17,149 @@ const collName = "user"; const staticMongod = MongoRunner.runMongod({}); // For startParallelOps. -const st = new ShardingTest({shards: {rs0: {nodes: 1}, rs1: {nodes: 1}}}); -st.adminCommand({enableSharding: 'test'}); -st.ensurePrimaryShard('test', st.shard0.shardName); -st.adminCommand({shardCollection: 'test.user', key: {_id: 1}}); +let runTest = function(withStepUp) { + const st = new ShardingTest({shards: {rs0: {nodes: withStepUp ? 2 : 1}, rs1: {nodes: 1}}}); + const collection = st.s.getDB(dbName).getCollection(collName); -const session = st.s.startSession({causalConsistency: false}); -const sessionDB = session.getDatabase(dbName); -const sessionColl = sessionDB.getCollection(collName); + CreateShardedCollectionUtil.shardCollectionWithChunks(collection, {x: 1}, [ + {min: {x: MinKey}, max: {x: 0}, shard: st.shard0.shardName}, + {min: {x: 0}, max: {x: 1000}, shard: st.shard0.shardName}, + {min: {x: 1000}, max: {x: MaxKey}, shard: st.shard1.shardName}, + ]); -assert.commandWorked(sessionColl.insert({_id: 1})); + assert.commandWorked(collection.insert([ + {_id: 1, x: -1, note: "move into chunk range being migrated"}, + {_id: 2, x: -2, note: "keep out of chunk range being migrated"}, + {_id: 3, x: 50, note: "move out of chunk range being migrated"}, + {_id: 4, x: 100, note: "keep in chunk range being migrated"}, + ])); -const lsid = { - id: UUID() + const lsid = {id: UUID()}; + const txnNumber = 0; + let stmtId = 0; + + assert.commandWorked(st.s0.getDB(dbName).runCommand({ + insert: collName, + documents: [ + {_id: 5, x: -1.01, note: "move into chunk range being migrated"}, + {_id: 6, x: -2.01, note: "keep out of chunk range being migrated"}, + {_id: 7, x: 50.01, note: "move out of chunk range being migrated"}, + {_id: 8, x: 100.01, note: "keep in chunk range being migrated"}, + ], + lsid: lsid, + txnNumber: NumberLong(txnNumber), + stmtId: 
NumberInt(stmtId++), + startTransaction: true, + autocommit: false, + })); + + assert.commandWorked(st.s.getDB(dbName).runCommand({ + update: collName, + updates: [ + {q: {x: -1}, u: {$set: {x: 5}}}, + {q: {x: -2}, u: {$set: {x: -10}}}, + {q: {x: 50}, u: {$set: {x: -20}}}, + {q: {x: 100}, u: {$set: {x: 500}}}, + {q: {x: -1.01}, u: {$set: {x: 5.01}}}, + {q: {x: -2.01}, u: {$set: {x: -10.01}}}, + {q: {x: 50.01}, u: {$set: {x: -20.01}}}, + {q: {x: 100.01}, u: {$set: {x: 500.01}}}, + ], + lsid: lsid, + txnNumber: NumberLong(txnNumber), + stmtId: NumberInt(stmtId++), + autocommit: false, + })); + + const res = assert.commandWorked(st.shard0.getDB(dbName).adminCommand({ + prepareTransaction: 1, + lsid: lsid, + txnNumber: NumberLong(txnNumber), + autocommit: false, + writeConcern: {w: "majority"}, + })); + + let prepareTimestamp = res.prepareTimestamp; + + if (withStepUp) { + st.rs0.stepUp(st.rs0.getSecondary()); + } + + const joinMoveChunk = + moveChunkParallel(staticMongod, st.s.host, {x: 1}, null, 'test.user', st.shard1.shardName); + + pauseMigrateAtStep(st.shard1, migrateStepNames.catchup); + + // The donor shard only ignores prepare conflicts while scanning over the shard key index. We + // wait for donor shard to have finished buffering the RecordIds into memory from scanning over + // the shard key index before committing the transaction. Notably, the donor shard doesn't + // ignore prepare conflicts when fetching the full contents of the documents during calls to + // _migrateClone. + // + // TODO: SERVER-71028 Remove comment after making changes. 
+ + waitForMoveChunkStep(st.shard0, moveChunkStepNames.startedMoveChunk); + + assert.commandWorked( + st.shard0.getDB(dbName).adminCommand(Object.assign({ + commitTransaction: 1, + lsid: lsid, + txnNumber: NumberLong(txnNumber), + autocommit: false, + }, + {commitTimestamp: prepareTimestamp}))); + + unpauseMigrateAtStep(st.shard1, migrateStepNames.catchup); + + joinMoveChunk(); + + class ArrayCursor { + constructor(arr) { + this.i = 0; + this.arr = arr; + } + + hasNext() { + return this.i < this.arr.length; + } + + next() { + return this.arr[this.i++]; + } + } + + const expected = new ArrayCursor([ + {_id: 1, x: 5, note: "move into chunk range being migrated"}, + {_id: 2, x: -10, note: "keep out of chunk range being migrated"}, + {_id: 3, x: -20, note: "move out of chunk range being migrated"}, + {_id: 4, x: 500, note: "keep in chunk range being migrated"}, + {_id: 5, x: 5.01, note: "move into chunk range being migrated"}, + {_id: 6, x: -10.01, note: "keep out of chunk range being migrated"}, + {_id: 7, x: -20.01, note: "move out of chunk range being migrated"}, + {_id: 8, x: 500.01, note: "keep in chunk range being migrated"}, + ]); + + const diff = ((diff) => { + return { + docsWithDifferentContents: diff.docsWithDifferentContents.map( + ({first, second}) => ({expected: first, actual: second})), + docsExtraAfterMigration: diff.docsMissingOnFirst, + docsMissingAfterMigration: diff.docsMissingOnSecond, + }; + })(DataConsistencyChecker.getDiff(expected, collection.find().sort({_id: 1, x: 1}))); + + assert.eq(diff, { + docsWithDifferentContents: [], + docsExtraAfterMigration: [], + docsMissingAfterMigration: [], + }); + + st.stop(); }; -const txnNumber = 0; -const stmtId = 0; - -assert.commandWorked(st.s0.getDB(dbName).runCommand({ - insert: collName, - documents: [{_id: 2}, {_id: 5}, {_id: 15}], - lsid: lsid, - txnNumber: NumberLong(txnNumber), - stmtId: NumberInt(stmtId), - startTransaction: true, - autocommit: false, -})); - -const res = 
assert.commandWorked(st.shard0.getDB(dbName).adminCommand({ - prepareTransaction: 1, - lsid: lsid, - txnNumber: NumberLong(txnNumber), - autocommit: false, -})); - -const joinMoveChunk = - moveChunkParallel(staticMongod, st.s.host, {_id: 1}, null, 'test.user', st.shard1.shardName); - -// Wait for catchup to verify that the migration has exited the clone phase. -waitForMigrateStep(st.shard1, migrateStepNames.catchup); - -assert.commandWorked(st.shard0.getDB(dbName).adminCommand({ - commitTransaction: 1, - lsid: lsid, - txnNumber: NumberLong(txnNumber), - autocommit: false, - commitTimestamp: res.prepareTimestamp, -})); - -joinMoveChunk(); - -assert.eq(sessionColl.find({_id: 2}).count(), 1); - -st.stop(); + +runTest(false); +// TODO: SERVER-71219 Enable test after fixing. +// runTest(true); + MongoRunner.stopMongod(staticMongod); })(); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 5421aeae099..9c514890321 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -673,6 +673,13 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg operation.setPreImage(args.updateArgs.preImageDoc->getOwned()); } + auto collectionDescription = + CollectionShardingState::get(opCtx, args.nss)->getCollectionDescription(opCtx); + if (collectionDescription.isSharded()) { + operation.setPostImageDocumentKey( + collectionDescription.extractDocumentKey(args.updateArgs.updatedDoc).getOwned()); + } + txnParticipant.addTransactionOperation(opCtx, operation); } else { MutableOplogEntry oplogEntry; diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 9d196e2c85d..30585b6befc 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -61,11 +61,13 @@ public: o.parseProtected(ctxt, bsonObject); return o; } - const BSONObj& getPreImageDocumentKey() const { - return _preImageDocumentKey; + + const BSONObj& getPostImageDocumentKey() const { + 
return _postImageDocumentKey; } - void setPreImageDocumentKey(BSONObj value) { - _preImageDocumentKey = std::move(value); + + void setPostImageDocumentKey(BSONObj value) { + _postImageDocumentKey = std::move(value); } const BSONObj& getPreImage() const { @@ -77,7 +79,9 @@ public: } private: - BSONObj _preImageDocumentKey; + // Stores the post image _id + shard key values. + BSONObj _postImageDocumentKey; + BSONObj _fullPreImage; }; diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index 1da7e80b7b2..48a708f3b93 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -75,12 +75,22 @@ const Hours kMaxWaitToCommitCloneForJumboChunk(6); MONGO_FAIL_POINT_DEFINE(failTooMuchMemoryUsed); -bool isInRange(const BSONObj& obj, - const BSONObj& min, - const BSONObj& max, - const ShardKeyPattern& shardKeyPattern) { - BSONObj k = shardKeyPattern.extractShardKeyFromDoc(obj); - return k.woCompare(min) >= 0 && k.woCompare(max) < 0; +/** + * Returns true if the given BSON object in the shard key value pair format is within the given + * range. + */ +bool isShardKeyValueInRange(const BSONObj& shardKeyValue, const BSONObj& min, const BSONObj& max) { + return shardKeyValue.woCompare(min) >= 0 && shardKeyValue.woCompare(max) < 0; +} + +/** + * Returns true if the given BSON document is within the given chunk range. 
+ */ +bool isDocInRange(const BSONObj& obj, + const BSONObj& min, + const BSONObj& max, + const ShardKeyPattern& shardKeyPattern) { + return isShardKeyValueInRange(shardKeyPattern.extractShardKeyFromDoc(obj), min, max); } BSONObj createRequestWithSessionId(StringData commandName, @@ -180,14 +190,13 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam auto cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(msm->getCloner().get()); auto opType = stmt.getOpType(); - auto documentKey = getDocumentKeyFromReplOperation(stmt, opType); + auto preImageDocKey = getDocumentKeyFromReplOperation(stmt, opType); - auto idElement = documentKey["_id"]; + auto idElement = preImageDocKey["_id"]; if (idElement.eoo()) { LOGV2_WARNING(21994, - "Received a document without an _id field, ignoring: {documentKey}", "Received a document without an _id and will ignore that document", - "documentKey"_attr = redact(documentKey)); + "documentKey"_attr = redact(preImageDocKey)); continue; } @@ -195,18 +204,42 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam auto const& maxKey = cloner->_args.getMaxKey(); auto const& shardKeyPattern = cloner->_shardKeyPattern; - if (!isInRange(documentKey, minKey, maxKey, shardKeyPattern)) { - // If the preImageDoc is not in range but the postImageDoc was, we know that the - // document has changed shard keys and no longer belongs in the chunk being cloned. - // We will model the deletion of the preImage document so that the destination chunk - // does not receive an outdated version of this document. - if (opType == repl::OpTypeEnum::kUpdate && - isInRange(stmt.getPreImageDocumentKey(), minKey, maxKey, shardKeyPattern) && - !stmt.getPreImageDocumentKey()["_id"].eoo()) { - opType = repl::OpTypeEnum::kDelete; - idElement = stmt.getPreImageDocumentKey()["id"]; - } else { - continue; + // Note: This assumes that prepared transactions will always have post document key + // set. 
There is a small window where create collection coordinator releases the critical + // section and before it writes down the chunks for non-empty collections. So in theory, + // it is possible to have a prepared transaction while collection is unsharded + // and becomes sharded midway. This doesn't happen in practice because the only way to + // have a prepared transactions without being sharded is by directly connecting to the + // shards and manually preparing the transaction. Another exception is when transaction + // is prepared on an older version that doesn't set the post image document key. + auto postImageDocKey = stmt.getPostImageDocumentKey(); + if (postImageDocKey.isEmpty()) { + LOGV2_WARNING( + 6836102, + "Migration encountered a transaction operation without a post image document key", + "preImageDocKey"_attr = preImageDocKey); + } else { + auto postShardKeyValues = + shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey); + fassert(6836100, !postShardKeyValues.isEmpty()); + + if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) { + // If the preImageDoc is not in range but the postImageDoc was, we know that the + // document has changed shard keys and no longer belongs in the chunk being cloned. + // We will model the deletion of the preImage document so that the destination chunk + // does not receive an outdated version of this document. 
+ + auto preImageShardKeyValues = + shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey); + fassert(6836101, !preImageShardKeyValues.isEmpty()); + + if (opType == repl::OpTypeEnum::kUpdate && + isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) { + opType = repl::OpTypeEnum::kDelete; + idElement = postImageDocKey["_id"]; + } else { + continue; + } } } @@ -409,7 +442,7 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) { } bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& doc) { - return isInRange(doc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern); + return isDocInRange(doc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern); } void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, @@ -428,7 +461,7 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx, return; } - if (!isInRange(insertedDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { + if (!isDocInRange(insertedDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { return; } @@ -463,13 +496,13 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx, return; } - if (!isInRange(postImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { + if (!isDocInRange(postImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { // If the preImageDoc is not in range but the postImageDoc was, we know that the document // has changed shard keys and no longer belongs in the chunk being cloned. We will model // the deletion of the preImage document so that the destination chunk does not receive an // outdated version of this document. 
if (preImageDoc && - isInRange(*preImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { + isDocInRange(*preImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { onDeleteOp(opCtx, *preImageDoc, opTime, prePostImageOpTime); } return; @@ -669,9 +702,20 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon auto nextRecordId = *iter; lk.unlock(); + ON_BLOCK_EXIT([&lk] { lk.lock(); }); Snapshotted<BSONObj> doc; if (collection->findDoc(opCtx, nextRecordId, &doc)) { + // Do not send documents that are no longer in the chunk range being moved. This can + // happen when document shard key value of the document changed after the initial + // index scan during cloning. This is needed because the destination is very + // conservative in processing xferMod deletes and won't delete docs that are not in + // the range of the chunk being migrated. + if (!isDocInRange( + doc.value(), _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) { + continue; + } + // Use the builder size instead of accumulating the document sizes directly so // that we take into consideration the overhead of BSONArray indices. if (arrBuilder->arrSize() && @@ -683,8 +727,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon arrBuilder->append(doc.value()); ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1); } - - lk.lock(); } _cloneLocs.erase(_cloneLocs.begin(), iter); |