author     Randolph Tan <randolph@10gen.com>                 2022-11-01 19:52:33 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-11-16 19:08:59 +0000
commit     051e9310e027a52591ae9100078c24efafebad47
tree       1b92a005c9254f41d59c1abe143f44131ae73a75
parent     8d9f1726104f30d90961611c7da54c4cacf9199a
SERVER-68361 Make migration properly handle cases when shard key value modification also results in changes to chunk membership
(cherry picked from commit e8ee517f32043af220c8db1a739c2d80b2cb0969)
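The scenario being fixed, as a minimal sketch (shard key and chunk bounds taken from the test below): an update inside a prepared transaction changes the shard key value, which can also change which chunk owns the document, and before this change the destination shard could end up with an outdated copy of such a document.

    // Shard key {x: 1}; the chunk [0, 1000) is being migrated off shard0.
    db.user.update({x: -1}, {$set: {x: 5}});   // document moves INTO the migrating chunk
    db.user.update({x: 50}, {$set: {x: -20}}); // document moves OUT of the migrating chunk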
-rw-r--r--  etc/backports_required_for_multiversion_tests.yml          4
-rw-r--r--  jstests/libs/chunk_manipulation_util.js                    3
-rw-r--r--  jstests/sharding/prepare_transaction_then_migrate.js     190
-rw-r--r--  src/mongo/db/op_observer_impl.cpp                          7
-rw-r--r--  src/mongo/db/repl/oplog_entry.h                           14
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp   98
6 files changed, 233 insertions, 83 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index 1755b51386d..dff8fb855fe 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -86,6 +86,8 @@ last-continuous:
ticket: SERVER-68628
- test_file: jstests/sharding/move_chunk_interrupt_postimage.js
ticket: SERVER-68728
+ - test_file: jstests/sharding/prepare_transaction_then_migrate.js
+ ticket: SERVER-68361
suites:
change_streams_multiversion_passthrough: null
change_streams_sharded_collections_multiversion_passthrough: null
@@ -269,6 +271,8 @@ last-lts:
ticket: SERVER-68628
- test_file: jstests/sharding/move_chunk_interrupt_postimage.js
ticket: SERVER-68728
+ - test_file: jstests/sharding/prepare_transaction_then_migrate.js
+ ticket: SERVER-68361
suites:
change_streams_multiversion_passthrough: null
change_streams_sharded_collections_multiversion_passthrough: null
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js
index 8f6b05ebf5d..e2ebfcbfd0b 100644
--- a/jstests/libs/chunk_manipulation_util.js
+++ b/jstests/libs/chunk_manipulation_util.js
@@ -138,7 +138,8 @@ function waitForMoveChunkStep(shardConnection, stepNumber) {
assert.soon(function() {
var inProgressStr = '';
- let in_progress = admin.aggregate([{$currentOp: {'allUsers': true}}]);
+ let in_progress =
+ admin.aggregate([{$currentOp: {'allUsers': true, idleConnections: true}}]);
while (in_progress.hasNext()) {
let op = in_progress.next();
diff --git a/jstests/sharding/prepare_transaction_then_migrate.js b/jstests/sharding/prepare_transaction_then_migrate.js
index 034259d02be..36a9581752d 100644
--- a/jstests/sharding/prepare_transaction_then_migrate.js
+++ b/jstests/sharding/prepare_transaction_then_migrate.js
@@ -9,6 +9,7 @@
(function() {
"use strict";
load('jstests/libs/chunk_manipulation_util.js');
+load('jstests/sharding/libs/create_sharded_collection_util.js');
load('jstests/sharding/libs/sharded_transactions_helpers.js');
const dbName = "test";
@@ -16,58 +17,149 @@ const collName = "user";
const staticMongod = MongoRunner.runMongod({}); // For startParallelOps.
-const st = new ShardingTest({shards: {rs0: {nodes: 1}, rs1: {nodes: 1}}});
-st.adminCommand({enableSharding: 'test'});
-st.ensurePrimaryShard('test', st.shard0.shardName);
-st.adminCommand({shardCollection: 'test.user', key: {_id: 1}});
+let runTest = function(withStepUp) {
+ const st = new ShardingTest({shards: {rs0: {nodes: withStepUp ? 2 : 1}, rs1: {nodes: 1}}});
+ const collection = st.s.getDB(dbName).getCollection(collName);
-const session = st.s.startSession({causalConsistency: false});
-const sessionDB = session.getDatabase(dbName);
-const sessionColl = sessionDB.getCollection(collName);
+ CreateShardedCollectionUtil.shardCollectionWithChunks(collection, {x: 1}, [
+ {min: {x: MinKey}, max: {x: 0}, shard: st.shard0.shardName},
+ {min: {x: 0}, max: {x: 1000}, shard: st.shard0.shardName},
+ {min: {x: 1000}, max: {x: MaxKey}, shard: st.shard1.shardName},
+ ]);
-assert.commandWorked(sessionColl.insert({_id: 1}));
+ assert.commandWorked(collection.insert([
+ {_id: 1, x: -1, note: "move into chunk range being migrated"},
+ {_id: 2, x: -2, note: "keep out of chunk range being migrated"},
+ {_id: 3, x: 50, note: "move out of chunk range being migrated"},
+ {_id: 4, x: 100, note: "keep in chunk range being migrated"},
+ ]));
-const lsid = {
- id: UUID()
+ const lsid = {id: UUID()};
+ const txnNumber = 0;
+ let stmtId = 0;
+
+ assert.commandWorked(st.s0.getDB(dbName).runCommand({
+ insert: collName,
+ documents: [
+ {_id: 5, x: -1.01, note: "move into chunk range being migrated"},
+ {_id: 6, x: -2.01, note: "keep out of chunk range being migrated"},
+ {_id: 7, x: 50.01, note: "move out of chunk range being migrated"},
+ {_id: 8, x: 100.01, note: "keep in chunk range being migrated"},
+ ],
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ stmtId: NumberInt(stmtId++),
+ startTransaction: true,
+ autocommit: false,
+ }));
+
+ assert.commandWorked(st.s.getDB(dbName).runCommand({
+ update: collName,
+ updates: [
+ {q: {x: -1}, u: {$set: {x: 5}}},
+ {q: {x: -2}, u: {$set: {x: -10}}},
+ {q: {x: 50}, u: {$set: {x: -20}}},
+ {q: {x: 100}, u: {$set: {x: 500}}},
+ {q: {x: -1.01}, u: {$set: {x: 5.01}}},
+ {q: {x: -2.01}, u: {$set: {x: -10.01}}},
+ {q: {x: 50.01}, u: {$set: {x: -20.01}}},
+ {q: {x: 100.01}, u: {$set: {x: 500.01}}},
+ ],
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ stmtId: NumberInt(stmtId++),
+ autocommit: false,
+ }));
+
+ const res = assert.commandWorked(st.shard0.getDB(dbName).adminCommand({
+ prepareTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false,
+ writeConcern: {w: "majority"},
+ }));
+
+ let prepareTimestamp = res.prepareTimestamp;
+
+ if (withStepUp) {
+ st.rs0.stepUp(st.rs0.getSecondary());
+ }
+
+ const joinMoveChunk =
+ moveChunkParallel(staticMongod, st.s.host, {x: 1}, null, 'test.user', st.shard1.shardName);
+
+ pauseMigrateAtStep(st.shard1, migrateStepNames.catchup);
+
+ // The donor shard only ignores prepare conflicts while scanning over the shard key index. We
+ // wait for the donor shard to finish buffering the RecordIds in memory from that index scan
+ // before committing the transaction. Notably, the donor shard doesn't ignore prepare conflicts
+ // when fetching the full contents of the documents during calls to _migrateClone.
+ //
+ // TODO: SERVER-71028 Remove comment after making changes.
+
+ waitForMoveChunkStep(st.shard0, moveChunkStepNames.startedMoveChunk);
+
+ assert.commandWorked(
+ st.shard0.getDB(dbName).adminCommand(Object.assign({
+ commitTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false,
+ },
+ {commitTimestamp: prepareTimestamp})));
+
+ unpauseMigrateAtStep(st.shard1, migrateStepNames.catchup);
+
+ joinMoveChunk();
+
+ class ArrayCursor {
+ constructor(arr) {
+ this.i = 0;
+ this.arr = arr;
+ }
+
+ hasNext() {
+ return this.i < this.arr.length;
+ }
+
+ next() {
+ return this.arr[this.i++];
+ }
+ }
+
+ const expected = new ArrayCursor([
+ {_id: 1, x: 5, note: "move into chunk range being migrated"},
+ {_id: 2, x: -10, note: "keep out of chunk range being migrated"},
+ {_id: 3, x: -20, note: "move out of chunk range being migrated"},
+ {_id: 4, x: 500, note: "keep in chunk range being migrated"},
+ {_id: 5, x: 5.01, note: "move into chunk range being migrated"},
+ {_id: 6, x: -10.01, note: "keep out of chunk range being migrated"},
+ {_id: 7, x: -20.01, note: "move out of chunk range being migrated"},
+ {_id: 8, x: 500.01, note: "keep in chunk range being migrated"},
+ ]);
+
+ const diff = ((diff) => {
+ return {
+ docsWithDifferentContents: diff.docsWithDifferentContents.map(
+ ({first, second}) => ({expected: first, actual: second})),
+ docsExtraAfterMigration: diff.docsMissingOnFirst,
+ docsMissingAfterMigration: diff.docsMissingOnSecond,
+ };
+ })(DataConsistencyChecker.getDiff(expected, collection.find().sort({_id: 1, x: 1})));
+
+ assert.eq(diff, {
+ docsWithDifferentContents: [],
+ docsExtraAfterMigration: [],
+ docsMissingAfterMigration: [],
+ });
+
+ st.stop();
};
-const txnNumber = 0;
-const stmtId = 0;
-
-assert.commandWorked(st.s0.getDB(dbName).runCommand({
- insert: collName,
- documents: [{_id: 2}, {_id: 5}, {_id: 15}],
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(stmtId),
- startTransaction: true,
- autocommit: false,
-}));
-
-const res = assert.commandWorked(st.shard0.getDB(dbName).adminCommand({
- prepareTransaction: 1,
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- autocommit: false,
-}));
-
-const joinMoveChunk =
- moveChunkParallel(staticMongod, st.s.host, {_id: 1}, null, 'test.user', st.shard1.shardName);
-
-// Wait for catchup to verify that the migration has exited the clone phase.
-waitForMigrateStep(st.shard1, migrateStepNames.catchup);
-
-assert.commandWorked(st.shard0.getDB(dbName).adminCommand({
- commitTransaction: 1,
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- autocommit: false,
- commitTimestamp: res.prepareTimestamp,
-}));
-
-joinMoveChunk();
-
-assert.eq(sessionColl.find({_id: 2}).count(), 1);
-
-st.stop();
+
+runTest(false);
+// TODO: SERVER-71219 Enable test after fixing.
+// runTest(true);
+
MongoRunner.stopMongod(staticMongod);
})();
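The test drives the transaction with raw commands rather than a session object so it can control each two-phase step explicitly. A minimal sketch of that handshake, assuming lsid and txnNumber are in scope and shardPrimary stands in for the donor shard's primary (st.shard0 in the test); prepareTransaction and commitTransaction are sent to the shard directly, not through mongos:

    // Prepare returns the timestamp the commit must carry.
    const res = assert.commandWorked(shardPrimary.getDB(dbName).adminCommand({
        prepareTransaction: 1,
        lsid: lsid,
        txnNumber: NumberLong(txnNumber),
        autocommit: false,
        writeConcern: {w: "majority"},
    }));
    assert.commandWorked(shardPrimary.getDB(dbName).adminCommand({
        commitTransaction: 1,
        lsid: lsid,
        txnNumber: NumberLong(txnNumber),
        autocommit: false,
        commitTimestamp: res.prepareTimestamp,
    }));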
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 5421aeae099..9c514890321 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -673,6 +673,13 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
operation.setPreImage(args.updateArgs.preImageDoc->getOwned());
}
+ auto collectionDescription =
+ CollectionShardingState::get(opCtx, args.nss)->getCollectionDescription(opCtx);
+ if (collectionDescription.isSharded()) {
+ operation.setPostImageDocumentKey(
+ collectionDescription.extractDocumentKey(args.updateArgs.updatedDoc).getOwned());
+ }
+
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
MutableOplogEntry oplogEntry;
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 9d196e2c85d..30585b6befc 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -61,11 +61,13 @@ public:
o.parseProtected(ctxt, bsonObject);
return o;
}
- const BSONObj& getPreImageDocumentKey() const {
- return _preImageDocumentKey;
+
+ const BSONObj& getPostImageDocumentKey() const {
+ return _postImageDocumentKey;
}
- void setPreImageDocumentKey(BSONObj value) {
- _preImageDocumentKey = std::move(value);
+
+ void setPostImageDocumentKey(BSONObj value) {
+ _postImageDocumentKey = std::move(value);
}
const BSONObj& getPreImage() const {
@@ -77,7 +79,9 @@ public:
}
private:
- BSONObj _preImageDocumentKey;
+ // Stores the post image document key: the _id plus the shard key values.
+ BSONObj _postImageDocumentKey;
+
BSONObj _fullPreImage;
};
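For reference, the post image document key introduced here holds the _id plus the shard key values of the updated document. A hypothetical helper showing what would be stored for the test's shard key {x: 1} (documentKeyFor is illustrative, not a real API):

    // Mirrors what extractDocumentKey() produces for shard key {x: 1}.
    function documentKeyFor(doc) {
        return {_id: doc._id, x: doc.x};
    }
    documentKeyFor({_id: 4, x: 500, note: "keep in chunk range being migrated"});
    // => {_id: 4, x: 500}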
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 1da7e80b7b2..48a708f3b93 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -75,12 +75,22 @@ const Hours kMaxWaitToCommitCloneForJumboChunk(6);
MONGO_FAIL_POINT_DEFINE(failTooMuchMemoryUsed);
-bool isInRange(const BSONObj& obj,
- const BSONObj& min,
- const BSONObj& max,
- const ShardKeyPattern& shardKeyPattern) {
- BSONObj k = shardKeyPattern.extractShardKeyFromDoc(obj);
- return k.woCompare(min) >= 0 && k.woCompare(max) < 0;
+/**
+ * Returns true if the given BSON object, which is expected to be in the shard key value pair
+ * format, is within the given range.
+ */
+bool isShardKeyValueInRange(const BSONObj& shardKeyValue, const BSONObj& min, const BSONObj& max) {
+ return shardKeyValue.woCompare(min) >= 0 && shardKeyValue.woCompare(max) < 0;
+}
+
+/**
+ * Returns true if the given BSON document is within the given chunk range.
+ */
+bool isDocInRange(const BSONObj& obj,
+ const BSONObj& min,
+ const BSONObj& max,
+ const ShardKeyPattern& shardKeyPattern) {
+ return isShardKeyValueInRange(shardKeyPattern.extractShardKeyFromDoc(obj), min, max);
}
BSONObj createRequestWithSessionId(StringData commandName,
@@ -180,14 +190,13 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
auto cloner = dynamic_cast<MigrationChunkClonerSourceLegacy*>(msm->getCloner().get());
auto opType = stmt.getOpType();
- auto documentKey = getDocumentKeyFromReplOperation(stmt, opType);
+ auto preImageDocKey = getDocumentKeyFromReplOperation(stmt, opType);
- auto idElement = documentKey["_id"];
+ auto idElement = preImageDocKey["_id"];
if (idElement.eoo()) {
LOGV2_WARNING(21994,
- "Received a document without an _id field, ignoring: {documentKey}",
"Received a document without an _id and will ignore that document",
- "documentKey"_attr = redact(documentKey));
+ "documentKey"_attr = redact(preImageDocKey));
continue;
}
@@ -195,18 +204,42 @@ void LogTransactionOperationsForShardingHandler::commit(boost::optional<Timestam
auto const& maxKey = cloner->_args.getMaxKey();
auto const& shardKeyPattern = cloner->_shardKeyPattern;
- if (!isInRange(documentKey, minKey, maxKey, shardKeyPattern)) {
- // If the preImageDoc is not in range but the postImageDoc was, we know that the
- // document has changed shard keys and no longer belongs in the chunk being cloned.
- // We will model the deletion of the preImage document so that the destination chunk
- // does not receive an outdated version of this document.
- if (opType == repl::OpTypeEnum::kUpdate &&
- isInRange(stmt.getPreImageDocumentKey(), minKey, maxKey, shardKeyPattern) &&
- !stmt.getPreImageDocumentKey()["_id"].eoo()) {
- opType = repl::OpTypeEnum::kDelete;
- idElement = stmt.getPreImageDocumentKey()["id"];
- } else {
- continue;
+ // Note: this assumes that prepared transactions will always have the post image
+ // document key set. There is a small window between when the create collection
+ // coordinator releases the critical section and when it writes down the chunks for
+ // non-empty collections, so in theory a transaction could be prepared while the
+ // collection is unsharded and the collection could become sharded midway. This doesn't
+ // happen in practice because the only way to have a prepared transaction on an
+ // unsharded collection is to connect directly to the shards and manually prepare the
+ // transaction. Another exception is a transaction prepared on an older version that
+ // doesn't set the post image document key.
+ auto postImageDocKey = stmt.getPostImageDocumentKey();
+ if (postImageDocKey.isEmpty()) {
+ LOGV2_WARNING(
+ 6836102,
+ "Migration encountered a transaction operation without a post image document key",
+ "preImageDocKey"_attr = preImageDocKey);
+ } else {
+ auto postShardKeyValues =
+ shardKeyPattern.extractShardKeyFromDocumentKey(postImageDocKey);
+ fassert(6836100, !postShardKeyValues.isEmpty());
+
+ if (!isShardKeyValueInRange(postShardKeyValues, minKey, maxKey)) {
+ // If the postImageDoc is no longer in range but the preImageDoc was, we know that
+ // the document's shard key value has changed and it no longer belongs in the chunk
+ // being cloned. We will model the deletion of the preImage document so that the
+ // destination chunk does not receive an outdated version of this document.
+
+ auto preImageShardKeyValues =
+ shardKeyPattern.extractShardKeyFromDocumentKey(preImageDocKey);
+ fassert(6836101, !preImageShardKeyValues.isEmpty());
+
+ if (opType == repl::OpTypeEnum::kUpdate &&
+ isShardKeyValueInRange(preImageShardKeyValues, minKey, maxKey)) {
+ opType = repl::OpTypeEnum::kDelete;
+ idElement = postImageDocKey["_id"];
+ } else {
+ continue;
+ }
}
}
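The classification above reduces to a small decision table over the half-open chunk range [min, max). A sketch with plain numbers standing in for woCompare-ordered shard key values (inRange and classifyUpdate are illustrative names, not the source's):

    function inRange(key, min, max) {
        return key >= min && key < max;
    }
    function classifyUpdate(preKey, postKey, min, max) {
        if (inRange(postKey, min, max))
            return "forward update"; // post image still owned by the chunk
        if (inRange(preKey, min, max))
            return "model delete";   // moved out: delete it on the destination
        return "ignore";             // never owned by the chunk being moved
    }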
@@ -409,7 +442,7 @@ void MigrationChunkClonerSourceLegacy::cancelClone(OperationContext* opCtx) {
}
bool MigrationChunkClonerSourceLegacy::isDocumentInMigratingChunk(const BSONObj& doc) {
- return isInRange(doc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern);
+ return isDocInRange(doc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern);
}
void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
@@ -428,7 +461,7 @@ void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
return;
}
- if (!isInRange(insertedDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
+ if (!isDocInRange(insertedDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
return;
}
@@ -463,13 +496,13 @@ void MigrationChunkClonerSourceLegacy::onUpdateOp(OperationContext* opCtx,
return;
}
- if (!isInRange(postImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
+ if (!isDocInRange(postImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
// If the postImageDoc is no longer in range but the preImageDoc was, we know that the
// document's shard key value has changed and it no longer belongs in the chunk being
// cloned. We will model the deletion of the preImage document so that the destination
// chunk does not receive an outdated version of this document.
if (preImageDoc &&
- isInRange(*preImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
+ isDocInRange(*preImageDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
onDeleteOp(opCtx, *preImageDoc, opTime, prePostImageOpTime);
}
return;
@@ -669,9 +702,20 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
auto nextRecordId = *iter;
lk.unlock();
+ ON_BLOCK_EXIT([&lk] { lk.lock(); });
Snapshotted<BSONObj> doc;
if (collection->findDoc(opCtx, nextRecordId, &doc)) {
+ // Do not send documents that are no longer in the chunk range being moved. This can
+ // happen when the document's shard key value changed after the initial index scan
+ // during cloning. This check is needed because the destination is very conservative
+ // in processing xferMod deletes and won't delete docs that are not in the range of
+ // the chunk being migrated.
+ if (!isDocInRange(
+ doc.value(), _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
+ continue;
+ }
+
// Use the builder size instead of accumulating the document sizes directly so
// that we take into consideration the overhead of BSONArray indices.
if (arrBuilder->arrSize() &&
@@ -683,8 +727,6 @@ void MigrationChunkClonerSourceLegacy::_nextCloneBatchFromCloneLocs(OperationCon
arrBuilder->append(doc.value());
ShardingStatistics::get(opCtx).countDocsClonedOnDonor.addAndFetch(1);
}
-
- lk.lock();
}
_cloneLocs.erase(_cloneLocs.begin(), iter);
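The filter added in this last hunk can be read as a guard at the top of the clone-batch loop. A minimal sketch, assuming shard key {x: 1} and with bufferedDocs, batch, min, and max as stand-ins for the cloner's state:

    // Donor-side filtering: the recipient is conservative about xferMod deletes,
    // so documents whose shard key moved them out of [min, max) after the index
    // scan must not be sent in the first place.
    for (const doc of bufferedDocs) {
        if (!(doc.x >= min && doc.x < max))
            continue; // shard key changed; no longer owned by the migrating chunk
        batch.push(doc);
    }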