path: root/jstests/sharding/out_stale_unique_key.js
diff options
Diffstat (limited to 'jstests/sharding/out_stale_unique_key.js')
1 files changed, 170 insertions, 0 deletions
diff --git a/jstests/sharding/out_stale_unique_key.js b/jstests/sharding/out_stale_unique_key.js
new file mode 100644
index 00000000000..d42fffa0b51
--- /dev/null
+++ b/jstests/sharding/out_stale_unique_key.js
@@ -0,0 +1,170 @@
+// Tests that an $out stage is able to default the uniqueKey to the correct value - even if one or
+// more of the involved nodes has a stale cache of the routing information.
+(function() {
+ "use strict";
+ const st = new ShardingTest({shards: 2, mongos: 2});
+ const dbName = "out_stale_unique_key";
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ const source = st.s0.getDB(dbName).source;
+ const target = st.s0.getDB(dbName).target;
+ // Test that an $out through a stale mongos can still use the correct uniqueKey and succeed.
+ (function testDefaultUniqueKeyIsRecent() {
+ const freshMongos = st.s0;
+ const staleMongos = st.s1;
+ // Set up two collections for an aggregate with an $out: The source collection will be
+ // unsharded and the target collection will be sharded amongst the two shards.
+ const staleMongosDB = staleMongos.getDB(dbName);
+ st.shardColl(source, {_id: 1}, {_id: 0}, {_id: 1});
+ (function setupStaleMongos() {
+ // Shard the collection through 'staleMongos', setting up 'staleMongos' to believe the
+ // collection is sharded by {sk: 1, _id: 1}.
+ assert.commandWorked(staleMongosDB.adminCommand(
+ {shardCollection: target.getFullName(), key: {sk: 1, _id: 1}}));
+ // Perform a query through that mongos to ensure the cache is populated.
+ assert.eq(0, staleMongosDB[target.getName()].find().itcount());
+ // Drop the collection from the other mongos - it is no longer sharded but the stale
+ // mongos doesn't know that yet.
+ target.drop();
+ }());
+ // At this point 'staleMongos' will believe that the target collection is sharded. This
+ // should not prevent it from running an $out without a uniqueKey specified. Specifically,
+ // the mongos should force a refresh of its cache before defaulting the uniqueKey.
+ assert.commandWorked(source.insert({_id: 'seed'}));
+ // If we had used the stale uniqueKey, this aggregation would fail since the documents do
+ // not have an 'sk' field.
+ assert.doesNotThrow(() => staleMongosDB[source.getName()].aggregate(
+ [{$out: {to: target.getName(), mode: 'insertDocuments'}}]));
+ assert.eq(target.find().toArray(), [{_id: 'seed'}]);
+ target.drop();
+ }());
+ // Test that if the collection is dropped and re-sharded during the course of the aggregation
+ // that the operation will fail rather than proceed with the old shard key.
+ function testEpochChangeDuringAgg({outSpec, failpoint, failpointData}) {
+ target.drop();
+ if (outSpec.hasOwnProperty("uniqueKey")) {
+ assert.commandWorked(target.createIndex(outSpec.uniqueKey, {unique: true}));
+ assert.commandWorked(
+ st.s.adminCommand({shardCollection: target.getFullName(), key: outSpec.uniqueKey}));
+ } else {
+ assert.commandWorked(
+ st.s.adminCommand({shardCollection: target.getFullName(), key: {sk: 1, _id: 1}}));
+ }
+ // Use a failpoint to make the query feeding into the aggregate hang while we drop the
+ // collection.
+ [st.rs0.getPrimary(), st.rs1.getPrimary()].forEach((mongod) => {
+ assert.commandWorked(mongod.adminCommand(
+ {configureFailPoint: failpoint, mode: "alwaysOn", data: failpointData || {}}));
+ });
+ let parallelShellJoiner;
+ try {
+ let parallelCode = `
+ const source = db.getSiblingDB("${dbName}").${source.getName()};
+ const error = assert.throws(() => source.aggregate([
+ {$addFields: {sk: "$_id"}},
+ {$out: ${tojsononeline(outSpec)}}
+ ]));
+ assert.eq(error.code, ErrorCodes.StaleEpoch);
+ `;
+ if (outSpec.hasOwnProperty("uniqueKey")) {
+ // If a user specifies their own uniqueKey, we don't need to fail an aggregation if
+ // the collection is dropped and recreated or the epoch otherwise changes. We are
+ // allowed to fail such an operation should we choose to in the future, but for now
+ // we don't expect to because we do not do anything special on mongos to ensure the
+ // catalog cache is up to date, so do not want to attach mongos's believed epoch to
+ // the command for the shards.
+ parallelCode = `
+ const source = db.getSiblingDB("${dbName}").${source.getName()};
+ assert.doesNotThrow(() => source.aggregate([
+ {$addFields: {sk: "$_id"}},
+ {$out: ${tojsononeline(outSpec)}}
+ ]));
+ `;
+ }
+ parallelShellJoiner = startParallelShell(parallelCode, st.s.port);
+ // Wait for the merging $out to appear in the currentOp output from the shards. We
+ // should see that the $out stage has an 'epoch' field serialized from the mongos.
+ const getAggOps = function() {
+ return st.s.getDB("admin")
+ .aggregate([
+ {$currentOp: {}},
+ {$match: {"cursor.originatingCommand.pipeline": {$exists: true}}}
+ ])
+ .toArray();
+ };
+ const hasOutRunning = function() {
+ return getAggOps()
+ .filter((op) => {
+ const pipeline = op.cursor.originatingCommand.pipeline;
+ return pipeline.length > 0 &&
+ pipeline[pipeline.length - 1].hasOwnProperty("$out");
+ })
+ .length >= 1;
+ };
+ assert.soon(hasOutRunning, () => tojson(getAggOps()));
+ // Drop the collection so that the epoch changes.
+ target.drop();
+ } finally {
+ [st.rs0.getPrimary(), st.rs1.getPrimary()].forEach((mongod) => {
+ assert.commandWorked(
+ mongod.adminCommand({configureFailPoint: failpoint, mode: "off"}));
+ });
+ }
+ parallelShellJoiner();
+ }
+ // Insert enough documents to force a yield.
+ const bulk = source.initializeUnorderedBulkOp();
+ for (let i = 0; i < 1000; ++i) {
+ bulk.insert({_id: i});
+ }
+ assert.commandWorked(bulk.execute());
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "insertDocuments"},
+ failpoint: "setYieldAllLocksHang",
+ failpointData: {namespace: source.getFullName()}
+ });
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "insertDocuments", uniqueKey: {sk: 1}},
+ failpoint: "setYieldAllLocksHang",
+ failpointData: {namespace: source.getFullName()}
+ });
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "replaceDocuments"},
+ failpoint: "setYieldAllLocksHang",
+ failpointData: {namespace: source.getFullName()}
+ });
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "replaceDocuments", uniqueKey: {sk: 1}},
+ failpoint: "setYieldAllLocksHang",
+ failpointData: {namespace: source.getFullName()}
+ });
+ // Test with some different failpoints to prove we will detect an epoch change in the middle of
+ // the inserts or updates.
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "insertDocuments"},
+ failpoint: "hangDuringBatchInsert"
+ });
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "replaceDocuments"},
+ failpoint: "hangDuringBatchUpdate"
+ });
+ st.stop();