author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2020-03-29 10:51:58 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-03-31 15:02:41 +0000
commit     b2696ab65cf20e5c1082c162990a381661530ec2 (patch)
tree       fcad5f967824027f703766c4faaedcabd669f548
parent     49e9747069d526816310629f713f00f9934fcb5b (diff)
SERVER-47170 Get rid of Status returns from the NSTargeter API
Instead, use exceptions uniformly.
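
The gist of the change is visible in the ns_targeter.h hunk further down: targeting methods that used to return Status/StatusWith now either return their result or throw. The following is a minimal standalone sketch of that before/after shape; ShardEndpoint, the exception type, and targetUpdate below are simplified stand-ins for illustration, not the real mongo classes.

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

// Simplified stand-in for mongo::ShardEndpoint.
struct ShardEndpoint {
    std::string shardName;
};

// Before SERVER-47170 the signature was roughly
//   StatusWith<std::vector<ShardEndpoint>> targetUpdate(...);
// and every caller had to check isOK() and forward the Status by hand.
//
// After the change the method returns the endpoints directly and reports
// failures (e.g. ShardKeyNotFound) by throwing, as the real code does with
// uassert/DBException.
std::vector<ShardEndpoint> targetUpdate(const std::string& shardKeyValue) {
    if (shardKeyValue.empty()) {
        throw std::runtime_error("ShardKeyNotFound: could not extract exact shard key");
    }
    return {ShardEndpoint{"shard0000"}};
}

int main() {
    try {
        auto endpoints = targetUpdate("");  // deliberately malformed request
        std::cout << "targeted " << endpoints.size() << " shard(s)\n";
    } catch (const std::exception& ex) {
        std::cout << "targeting failed: " << ex.what() << "\n";
    }
}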
-rw-r--r--  jstests/sharding/aggregates_during_balancing.js                                339
-rw-r--r--  jstests/sharding/merge_stale_on_fields.js                                       73
-rw-r--r--  jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js     6
-rw-r--r--  src/mongo/db/exec/shard_filterer_impl.cpp                                        3
-rw-r--r--  src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp             1
-rw-r--r--  src/mongo/s/commands/cluster_write_cmd.cpp                                      51
-rw-r--r--  src/mongo/s/ns_targeter.h                                                       39
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.cpp                                      30
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp                                  8
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp                                         7
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.cpp                               318
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.h                                  47
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter_test.cpp                          157
-rw-r--r--  src/mongo/s/write_ops/cluster_write.cpp                                         55
-rw-r--r--  src/mongo/s/write_ops/mock_ns_targeter.cpp                                      32
-rw-r--r--  src/mongo/s/write_ops/mock_ns_targeter.h                                        48
-rw-r--r--  src/mongo/s/write_ops/write_op.cpp                                              60
-rw-r--r--  src/mongo/s/write_ops/write_op.h                                                 6
-rw-r--r--  src/mongo/s/write_ops/write_op_test.cpp                                         35
19 files changed, 550 insertions, 765 deletions
diff --git a/jstests/sharding/aggregates_during_balancing.js b/jstests/sharding/aggregates_during_balancing.js
index 06db4cb6955..4b613e07d53 100644
--- a/jstests/sharding/aggregates_during_balancing.js
+++ b/jstests/sharding/aggregates_during_balancing.js
@@ -1,116 +1,119 @@
// Inserts some interesting data into a sharded collection, enables the balancer, and tests that
// various kinds of aggregations return the expected results.
(function() {
+'use strict';
+
load('jstests/aggregation/extras/utils.js');
-var shardedAggTest =
- new ShardingTest({shards: 2, mongos: 1, other: {chunkSize: 1, enableBalancer: true}});
+const shardedAggTest = new ShardingTest({shards: 2, mongos: 1});
-shardedAggTest.adminCommand({enablesharding: "aggShard"});
-db = shardedAggTest.getDB("aggShard");
+assert.commandWorked(shardedAggTest.s0.adminCommand({enablesharding: "aggShard"}));
shardedAggTest.ensurePrimaryShard('aggShard', shardedAggTest.shard0.shardName);
-db.ts1.drop();
-db.literal.drop();
-
-shardedAggTest.adminCommand({shardcollection: "aggShard.ts1", key: {"_id": 1}});
-shardedAggTest.adminCommand({shardcollection: "aggShard.literal", key: {"_id": 1}});
+const database = shardedAggTest.getDB("aggShard");
-/*
-Test combining results in mongos for operations that sub-aggregate on shards.
+assert.commandWorked(
+ shardedAggTest.s0.adminCommand({shardcollection: "aggShard.ts1", key: {"_id": 1}}));
-The unusual operators here are $avg, $pushToSet, $push. In the case of $avg,
-the shard pipeline produces an object with the current subtotal and item count
-so that these can be combined in mongos by totalling the subtotals counts
-before performing the final division. For $pushToSet and $push, the shard
-pipelines produce arrays, but in mongos these are combined rather than simply
-being added as arrays within arrays.
-*/
+// Test combining results in mongos for operations that sub-aggregate on shards.
+//
+// The unusual operators here are $avg, $pushToSet, $push. In the case of $avg, the shard pipeline
+// produces an object with the current subtotal and item count so that these can be combined in
+// mongos by totalling the subtotals counts before performing the final division. For $pushToSet and
+// $push, the shard pipelines produce arrays, but in mongos these are combined rather than simply
+// being added as arrays within arrays.
-var count = 0;
-var strings = [
+jsTestLog("Bulk inserting test data");
+const strings = [
"one", "two", "three", "four", "five", "six", "seven",
"eight", "nine", "ten", "eleven", "twelve", "thirteen", "fourteen",
"fifteen", "sixteen", "seventeen", "eighteen", "nineteen", "twenty"
];
-jsTestLog("Bulk inserting data");
-var nItems = 200000;
-var bulk = db.ts1.initializeUnorderedBulkOp();
-for (i = 0; i < nItems; ++i) {
- bulk.insert({
- _id: i,
- counter: ++count,
- number: strings[i % 20],
- random: Math.random(),
- filler: "0123456789012345678901234567890123456789"
- });
+const nItems = 200000;
+var bulk = database.ts1.initializeUnorderedBulkOp();
+for (var i = 0; i < nItems; ++i) {
+ bulk.insert({_id: i, counter: i + 1, number: strings[i % 20], random: Math.random()});
+
+ // Generate one chunk for each 1000 documents so the balancer is kept busy throughout the
+ // execution of the test
+ if (i % 1000 == 0) {
+ assert.commandWorked(shardedAggTest.splitAt("aggShard.ts1", {_id: i}));
+ }
}
assert.commandWorked(bulk.execute());
-jsTestLog('a project and group in shards, result combined in mongos');
-var a1 = db.ts1
- .aggregate([
- {$project: {cMod10: {$mod: ["$counter", 10]}, number: 1, counter: 1}},
- {
- $group: {
- _id: "$cMod10",
- numberSet: {$addToSet: "$number"},
- avgCounter: {$avg: "$cMod10"}
- }
- },
- {$sort: {_id: 1}}
- ])
- .toArray();
-
-for (i = 0; i < 10; ++i) {
- assert.eq(a1[i].avgCounter, a1[i]._id, 'agg sharded test avgCounter failed');
- assert.eq(a1[i].numberSet.length, 2, 'agg sharded test numberSet length failed');
-}
+jsTestLog("Bulk insert completed, starting balancer");
+shardedAggTest.startBalancer();
+shardedAggTest.awaitBalancerRound();
+
+(function testProjectAndGroup() {
+ jsTestLog('Testing project and group in shards, result combined in mongos');
+ var a1 = database.ts1
+ .aggregate([
+ {$project: {cMod10: {$mod: ["$counter", 10]}, number: 1, counter: 1}},
+ {
+ $group: {
+ _id: "$cMod10",
+ numberSet: {$addToSet: "$number"},
+ avgCounter: {$avg: "$cMod10"}
+ }
+ },
+ {$sort: {_id: 1}}
+ ])
+ .toArray();
+ assert.eq(a1.length, 10, tojson(a1));
+ for (var i = 0; i < 10; ++i) {
+ assert.eq(a1[i].avgCounter, a1[i]._id, 'agg sharded test avgCounter failed');
+ assert.eq(a1[i].numberSet.length, 2, 'agg sharded test numberSet length failed');
+ }
-jsTestLog('an initial group starts the group in the shards, and combines them in mongos');
-var a2 = db.ts1.aggregate([{$group: {_id: "all", total: {$sum: "$counter"}}}]).toArray();
+ jsTestLog('an initial group starts the group in the shards, and combines them in mongos');
+ var a2 = database.ts1.aggregate([{$group: {_id: "all", total: {$sum: "$counter"}}}]).toArray();
-jsTestLog('sum of an arithmetic progression S(n) = (n/2)(a(1) + a(n));');
-assert.eq(a2[0].total, (nItems / 2) * (1 + nItems), 'agg sharded test counter sum failed');
+ jsTestLog('sum of an arithmetic progression S(n) = (n/2)(a(1) + a(n));');
+ assert.eq(a2[0].total, (nItems / 2) * (1 + nItems), 'agg sharded test counter sum failed');
-jsTestLog('A group combining all documents into one, averaging a null field.');
-assert.eq(db.ts1.aggregate([{$group: {_id: null, avg: {$avg: "$missing"}}}]).toArray(),
- [{_id: null, avg: null}]);
+ jsTestLog('A group combining all documents into one, averaging a null field.');
+ assert.eq(database.ts1.aggregate([{$group: {_id: null, avg: {$avg: "$missing"}}}]).toArray(),
+ [{_id: null, avg: null}]);
-jsTestLog('an initial group starts the group in the shards, and combines them in mongos');
-var a3 =
- db.ts1.aggregate([{$group: {_id: "$number", total: {$sum: 1}}}, {$sort: {_id: 1}}]).toArray();
+ jsTestLog('an initial group starts the group in the shards, and combines them in mongos');
+ var a3 =
+ database.ts1.aggregate([{$group: {_id: "$number", total: {$sum: 1}}}, {$sort: {_id: 1}}])
+ .toArray();
-for (i = 0; i < strings.length; ++i) {
- assert.eq(a3[i].total, nItems / strings.length, 'agg sharded test sum numbers failed');
-}
+ for (var i = 0; i < strings.length; ++i) {
+ assert.eq(a3[i].total, nItems / strings.length, 'agg sharded test sum numbers failed');
+ }
-jsTestLog('a match takes place in the shards; just returning the results from mongos');
-var a4 = db.ts1
- .aggregate([{
- $match: {
- $or: [
- {counter: 55},
- {counter: 1111},
- {counter: 2222},
- {counter: 33333},
- {counter: 99999},
- {counter: 55555}
- ]
- }
- }])
- .toArray();
-assert.eq(a4.length, 6, tojson(a4));
-for (i = 0; i < 6; ++i) {
- c = a4[i].counter;
- printjson({c: c});
- assert((c == 55) || (c == 1111) || (c == 2222) || (c == 33333) || (c == 99999) || (c == 55555),
- 'agg sharded test simple match failed');
-}
+ jsTestLog('a match takes place in the shards; just returning the results from mongos');
+ var a4 = database.ts1
+ .aggregate([{
+ $match: {
+ $or: [
+ {counter: 55},
+ {counter: 1111},
+ {counter: 2222},
+ {counter: 33333},
+ {counter: 99999},
+ {counter: 55555}
+ ]
+ }
+ }])
+ .toArray();
+ assert.eq(a4.length, 6, tojson(a4));
+ for (var i = 0; i < 6; ++i) {
+ var c = a4[i].counter;
+ printjson({c: c});
+ assert(
+ (c == 55) || (c == 1111) || (c == 2222) || (c == 33333) || (c == 99999) || (c == 55555),
+ 'agg sharded test simple match failed');
+ }
+}());
function testSkipLimit(ops, expectedCount) {
- jsTestLog('testSkipLimit(' + tojson(ops) + ', ' + expectedCount + ')');
+ jsTestLog('Testing $skip with $limit: ' + tojson(ops) + ', ' + expectedCount);
if (expectedCount > 10) {
// make shard -> mongos intermediate results less than 16MB
ops.unshift({$project: {_id: 1}});
@@ -118,10 +121,9 @@ function testSkipLimit(ops, expectedCount) {
ops.push({$group: {_id: 1, count: {$sum: 1}}});
- var out = db.ts1.aggregate(ops).toArray();
+ var out = database.ts1.aggregate(ops).toArray();
assert.eq(out[0].count, expectedCount);
}
-
testSkipLimit([], nItems); // control
testSkipLimit([{$skip: 10}], nItems - 10);
testSkipLimit([{$limit: 10}], 10);
@@ -133,11 +135,11 @@ testSkipLimit([{$limit: 10}, {$skip: 5}, {$skip: 3}], 10 - 3 - 5);
// test sort + limit (using random to pull from both shards)
function testSortLimit(limit, direction) {
- jsTestLog('testSortLimit(' + limit + ', ' + direction + ')');
+ jsTestLog('Testing $sort with $limit: ' + limit + ', ' + direction);
var from_cursor =
- db.ts1.find({}, {random: 1, _id: 0}).sort({random: direction}).limit(limit).toArray();
+ database.ts1.find({}, {random: 1, _id: 0}).sort({random: direction}).limit(limit).toArray();
var from_agg =
- db.ts1
+ database.ts1
.aggregate(
[{$project: {random: 1, _id: 0}}, {$sort: {random: direction}}, {$limit: limit}])
.toArray();
@@ -150,11 +152,11 @@ testSortLimit(10, -1);
testSortLimit(100, 1);
testSortLimit(100, -1);
-function testAvgStdDev() {
- jsTestLog('testing $avg and $stdDevPop in sharded $group');
+(function testAvgStdDev() {
+ jsTestLog('Testing $avg and $stdDevPop in sharded $group');
// $stdDevPop can vary slightly between runs if a migration occurs. This is why we use
// assert.close below.
- var res = db.ts1
+ var res = database.ts1
.aggregate([{
$group: {
_id: null,
@@ -170,83 +172,82 @@ function testAvgStdDev() {
// http://en.wikipedia.org/wiki/Arithmetic_progression#Standard_deviation
var stdDev = Math.sqrt(((nItems - 1) * (nItems + 1)) / 12);
assert.close(res[0].stdDevPop, stdDev, '', 10 /*decimal places*/);
-}
-testAvgStdDev();
-
-function testSample() {
- jsTestLog('testing $sample');
- [0, 1, 10, nItems, nItems + 1].forEach(function(size) {
- // TODO: SERVER-29446 Remove this try catch block after completing SERVER-29446.
- let res = {};
- try {
- res = db.ts1.aggregate([{$sample: {size: size}}]).toArray();
- } catch (e) {
- assert.eq(e.code, 28799, e);
- return;
- }
- assert.eq(res.length, Math.min(nItems, size));
- });
-}
-
-testSample();
-
-jsTestLog('test $out by copying source collection verbatim to output');
-var outCollection = db.ts1_out;
-var res = db.ts1.aggregate([{$out: outCollection.getName()}]).toArray();
-assert.eq(db.ts1.find().itcount(), outCollection.find().itcount());
-assert.eq(db.ts1.find().sort({_id: 1}).toArray(), outCollection.find().sort({_id: 1}).toArray());
-
-// Make sure we error out if $out collection is sharded
-assert.commandFailed(
- db.runCommand({aggregate: outCollection.getName(), pipeline: [{$out: db.ts1.getName()}]}));
-
-assert.commandWorked(db.literal.save({dollar: false}));
-
-result =
- db.literal
- .aggregate([{
- $project: {_id: 0, cost: {$cond: ['$dollar', {$literal: '$1.00'}, {$literal: '$.99'}]}}
- }])
- .toArray();
-
-assert.eq([{cost: '$.99'}], result);
+}());
-(function() {
-jsTestLog('Testing a $match stage on the shard key.');
-
-var outCollection = 'testShardKeyMatchOut';
-
-// Point query.
-var targetId = Math.floor(nItems * Math.random());
-var pipeline = [{$match: {_id: targetId}}, {$project: {_id: 1}}, {$sort: {_id: 1}}];
-var expectedDocs = [{_id: targetId}];
-// Normal pipeline.
-assert.eq(db.ts1.aggregate(pipeline).toArray(), expectedDocs);
-// With $out.
-db[outCollection].drop();
-pipeline.push({$out: outCollection});
-db.ts1.aggregate(pipeline);
-assert.eq(db[outCollection].find().toArray(), expectedDocs);
-
-// Range query.
-var range = 500;
-var targetStart = Math.floor((nItems - range) * Math.random());
-pipeline = [
- {$match: {_id: {$gte: targetStart, $lt: targetStart + range}}},
- {$project: {_id: 1}},
- {$sort: {_id: 1}}
-];
-expectedDocs = [];
-for (var i = targetStart; i < targetStart + range; i++) {
- expectedDocs.push({_id: i});
+function testSample(size) {
+ jsTestLog('Testing $sample of size ' + size);
+ let result = database.ts1.aggregate([{$sample: {size: size}}]).toArray();
+ assert.eq(result.length, Math.min(nItems, size));
}
-// Normal pipeline.
-assert.eq(db.ts1.aggregate(pipeline).toArray(), expectedDocs);
-// With $out.
-db[outCollection].drop();
-pipeline.push({$out: outCollection});
-db.ts1.aggregate(pipeline);
-assert.eq(db[outCollection].find().toArray(), expectedDocs);
+testSample(0);
+testSample(1);
+testSample(10);
+testSample(nItems);
+testSample(nItems + 1);
+
+(function testOutWithCopy() {
+ jsTestLog('Testing $out by copying source collection verbatim to output');
+ assert.commandWorked(
+ shardedAggTest.s0.adminCommand({shardcollection: "aggShard.literal", key: {"_id": 1}}));
+
+ var outCollection = database.ts1_out;
+ assert.eq(database.ts1.aggregate([{$out: outCollection.getName()}]).toArray(), "");
+ assert.eq(database.ts1.find().itcount(), outCollection.find().itcount());
+ assert.eq(database.ts1.find().sort({_id: 1}).toArray(),
+ outCollection.find().sort({_id: 1}).toArray());
+
+ // Make sure we error out if $out collection is sharded
+ assert.commandFailed(database.runCommand(
+ {aggregate: outCollection.getName(), pipeline: [{$out: database.ts1.getName()}]}));
+
+ assert.commandWorked(database.literal.save({dollar: false}));
+
+ let result =
+ database.literal
+ .aggregate([{
+ $project:
+ {_id: 0, cost: {$cond: ['$dollar', {$literal: '$1.00'}, {$literal: '$.99'}]}}
+ }])
+ .toArray();
+ assert.eq([{cost: '$.99'}], result);
+}());
+
+(function testMatch() {
+ jsTestLog('Testing a $match stage on the shard key.');
+
+ var outCollection = 'testShardKeyMatchOut';
+
+ // Point query.
+ var targetId = Math.floor(nItems * Math.random());
+ var pipeline = [{$match: {_id: targetId}}, {$project: {_id: 1}}, {$sort: {_id: 1}}];
+ var expectedDocs = [{_id: targetId}];
+ // Normal pipeline.
+ assert.eq(database.ts1.aggregate(pipeline).toArray(), expectedDocs);
+ // With $out.
+ database[outCollection].drop();
+ pipeline.push({$out: outCollection});
+ database.ts1.aggregate(pipeline);
+ assert.eq(database[outCollection].find().toArray(), expectedDocs);
+
+ // Range query.
+ var range = 500;
+ var targetStart = Math.floor((nItems - range) * Math.random());
+ pipeline = [
+ {$match: {_id: {$gte: targetStart, $lt: targetStart + range}}},
+ {$project: {_id: 1}},
+ {$sort: {_id: 1}}
+ ];
+ expectedDocs = [];
+ for (var i = targetStart; i < targetStart + range; i++) {
+ expectedDocs.push({_id: i});
+ }
+ // Normal pipeline.
+ assert.eq(database.ts1.aggregate(pipeline).toArray(), expectedDocs);
+ // With $out.
+ database[outCollection].drop();
+ pipeline.push({$out: outCollection});
+ database.ts1.aggregate(pipeline);
+ assert.eq(database[outCollection].find().toArray(), expectedDocs);
}());
shardedAggTest.stop();
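
For reference, the closed-form values the rewritten test above asserts against (the arithmetic-progression comments in testProjectAndGroup and testAvgStdDev) evaluate as follows for nItems = 200000, with counter running from 1 to nItems:

$$ S(n) = \frac{n}{2}\,(a_1 + a_n) = \frac{200000}{2}\,(1 + 200000) = 20{,}000{,}100{,}000 $$
$$ \sigma_{\text{pop}} = \sqrt{\frac{(n-1)(n+1)}{12}} = \sqrt{\frac{199999 \cdot 200001}{12}} \approx 57735.03 $$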
diff --git a/jstests/sharding/merge_stale_on_fields.js b/jstests/sharding/merge_stale_on_fields.js
index 87a48a0482c..59066b5b59d 100644
--- a/jstests/sharding/merge_stale_on_fields.js
+++ b/jstests/sharding/merge_stale_on_fields.js
@@ -8,42 +8,41 @@ load("jstests/aggregation/extras/merge_helpers.js"); // For withEachMergeMode,
const st = new ShardingTest({shards: 2, mongos: 2});
const dbName = "merge_stale_unique_key";
-assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s0.adminCommand({enableSharding: dbName}));
const source = st.s0.getDB(dbName).source;
const target = st.s0.getDB(dbName).target;
+st.shardColl(source, {_id: 1} /* shardKey */, {_id: 0} /* splitAt */, {_id: 1} /* chunkToMove*/);
+assert.commandWorked(source.insert({_id: 'seed'}));
+
// Test that an $merge through a stale mongos can still use the correct "on" fields and succeed.
(function testDefaultOnFieldsIsRecent() {
const freshMongos = st.s0;
const staleMongos = st.s1;
-
- // Set up two collections for an aggregate with an $merge: The source collection will be
- // unsharded and the target collection will be sharded amongst the two shards.
const staleMongosDB = staleMongos.getDB(dbName);
- st.shardColl(source, {_id: 1}, {_id: 0}, {_id: 1});
(function setupStaleMongos() {
- // Shard the collection through 'staleMongos', setting up 'staleMongos' to believe the
- // collection is sharded by {sk: 1, _id: 1}.
+ // Shard the collection through 'staleMongos', setting it up to believe the collection is
+ // sharded by {sk: 1, _id: 1}.
assert.commandWorked(staleMongosDB.adminCommand(
{shardCollection: target.getFullName(), key: {sk: 1, _id: 1}}));
// Perform a query through that mongos to ensure the cache is populated.
assert.eq(0, staleMongosDB[target.getName()].find().itcount());
- // Drop the collection from the other mongos - it is no longer sharded but the stale
- // mongos doesn't know that yet.
+ // Drop the collection from the other mongos - it is no longer sharded but the stale mongos
+ // doesn't know that yet.
target.drop();
}());
- // At this point 'staleMongos' will believe that the target collection is sharded. This
- // should not prevent it from running an $merge without "on" fields specified.
+ // At this point 'staleMongos' will believe that the target collection is sharded. This should
+ // not prevent it from running an $merge without "on" fields specified.
+ //
// Specifically, the mongos should force a refresh of its cache before defaulting the "on"
// fields.
- assert.commandWorked(source.insert({_id: 'seed'}));
- // If we had used the stale "on" fields, this aggregation would fail since the documents do
- // not have an 'sk' field.
+ // If we had used the stale "on" fields, this aggregation would fail since the documents do not
+ // have an 'sk' field.
assert.doesNotThrow(
() => staleMongosDB[source.getName()].aggregate(
[{$merge: {into: target.getName(), whenMatched: 'fail', whenNotMatched: 'insert'}}]));
@@ -51,11 +50,11 @@ const target = st.s0.getDB(dbName).target;
target.drop();
}());
-// Test that if the collection is dropped and re-sharded during the course of the aggregation
-// that the operation will fail rather than proceed with the old shard key.
+// Test that if the collection is dropped and re-sharded during the course of the aggregation that
+// the operation will fail rather than proceed with the old shard key.
function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) {
- // Converts a single string or an array of strings into it's object spec form. For instance,
- // for input ["a", "b"] the returned object would be {a: 1, b: 1}.
+ // Converts a single string or an array of strings into it's object spec form. For instance, for
+ // input ["a", "b"] the returned object would be {a: 1, b: 1}.
function indexSpecFromOnFields(onFields) {
let spec = {};
if (typeof (onFields) == "string") {
@@ -68,15 +67,16 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) {
return spec;
}
+ // Drop the collection and reshard it with a different shard key
target.drop();
if (mergeSpec.hasOwnProperty('on')) {
assert.commandWorked(
target.createIndex(indexSpecFromOnFields(mergeSpec.on), {unique: true}));
- assert.commandWorked(st.s.adminCommand(
+ assert.commandWorked(st.s0.adminCommand(
{shardCollection: target.getFullName(), key: indexSpecFromOnFields(mergeSpec.on)}));
} else {
assert.commandWorked(
- st.s.adminCommand({shardCollection: target.getFullName(), key: {sk: 1, _id: 1}}));
+ st.s0.adminCommand({shardCollection: target.getFullName(), key: {sk: 1, _id: 1}}));
}
// Use a failpoint to make the query feeding into the aggregate hang while we drop the
@@ -85,6 +85,7 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) {
assert.commandWorked(mongod.adminCommand(
{configureFailPoint: failpoint, mode: "alwaysOn", data: failpointData || {}}));
});
+
let parallelShellJoiner;
try {
let parallelCode = `
@@ -97,12 +98,12 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) {
`;
if (mergeSpec.hasOwnProperty("on")) {
- // If a user specifies their own "on" fields, we don't need to fail an aggregation
- // if the collection is dropped and recreated or the epoch otherwise changes. We are
- // allowed to fail such an operation should we choose to in the future, but for now
- // we don't expect to because we do not do anything special on mongos to ensure the
- // catalog cache is up to date, so do not want to attach mongos's believed epoch to
- // the command for the shards.
+ // If a user specifies their own "on" fields, we don't need to fail an aggregation if
+ // the collection is dropped and recreated or the epoch otherwise changes. We are
+ // allowed to fail such an operation should we choose to in the future, but for now we
+ // don't expect to because we do not do anything special on mongos to ensure the catalog
+ // cache is up to date, so do not want to attach mongos's believed epoch to the command
+ // for the shards.
parallelCode = `
const source = db.getSiblingDB("${dbName}").${source.getName()};
assert.doesNotThrow(() => source.aggregate([
@@ -112,12 +113,12 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) {
`;
}
- parallelShellJoiner = startParallelShell(parallelCode, st.s.port);
+ parallelShellJoiner = startParallelShell(parallelCode, st.s0.port);
- // Wait for the merging $merge to appear in the currentOp output from the shards. We
- // should see that the $merge stage has an 'epoch' field serialized from the mongos.
+ // Wait for the merging $merge to appear in the currentOp output from the shards. We should
+ // see that the $merge stage has an 'epoch' field serialized from the mongos.
const getAggOps = function() {
- return st.s.getDB("admin")
+ return st.s0.getDB("admin")
.aggregate([
{$currentOp: {}},
{$match: {"cursor.originatingCommand.pipeline": {$exists: true}}}
@@ -135,7 +136,7 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) {
};
assert.soon(hasMergeRunning, () => tojson(getAggOps()));
- // Drop the collection so that the epoch changes.
+ // Drop the collection so that the epoch changes while the merge operation is executing
target.drop();
} finally {
[st.rs0.getPrimary(), st.rs1.getPrimary()].forEach((mongod) => {
@@ -153,9 +154,8 @@ for (let i = 0; i < 1000; ++i) {
assert.commandWorked(bulk.execute());
withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
- // Skip the combination of merge modes which will fail depending on the contents of the
- // source and target collection, as this will cause a different assertion error from the one
- // expected.
+ // Skip the combination of merge modes which will fail depending on the contents of the source
+ // and target collection, as this will cause a different assertion error from the one expected.
if (whenNotMatchedMode == "fail")
return;
@@ -179,8 +179,9 @@ withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
failpointData: {namespace: source.getFullName()}
});
});
-// Test with some different failpoints to prove we will detect an epoch change in the middle
-// of the inserts or updates.
+
+// Test with some different failpoints to prove we will detect an epoch change in the middle of the
+// inserts or updates.
testEpochChangeDuringAgg({
mergeSpec: {into: target.getName(), whenMatched: "fail", whenNotMatched: "insert"},
failpoint: "hangDuringBatchInsert",
diff --git a/jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js b/jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js
index 700d47990f8..558045c328d 100644
--- a/jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js
+++ b/jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js
@@ -12,11 +12,11 @@
// TODO (SERVER-32198) remove after SERVER-32198 is fixed
TestData.skipCheckOrphans = true;
-var st = new ShardingTest({shards: 2, mongos: 2});
+const st = new ShardingTest({shards: 2, mongos: 2});
// Used to get the shard destination ids for the moveChunks commands
-var shard0Name = st.shard0.shardName;
-var shard1Name = st.shard1.shardName;
+const shard0Name = st.shard0.shardName;
+const shard1Name = st.shard1.shardName;
var database = 'TestDB';
st.enableSharding(database);
diff --git a/src/mongo/db/exec/shard_filterer_impl.cpp b/src/mongo/db/exec/shard_filterer_impl.cpp
index dbe4c057c7d..131cc8b40f4 100644
--- a/src/mongo/db/exec/shard_filterer_impl.cpp
+++ b/src/mongo/db/exec/shard_filterer_impl.cpp
@@ -29,11 +29,10 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/exec/filter.h"
#include "mongo/db/exec/shard_filterer_impl.h"
+#include "mongo/db/exec/filter.h"
#include "mongo/db/matcher/matchable.h"
-#include "mongo/db/s/scoped_collection_metadata.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 0a32967e3fe..188f8de3551 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -42,7 +42,6 @@
#include "mongo/db/query/collation/collation_spec.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/repl/read_concern_args.h"
-#include "mongo/db/s/scoped_collection_metadata.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/redaction.h"
#include "mongo/s/catalog_cache.h"
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 06cfbd0aaea..7154b659dc2 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -359,39 +359,29 @@ private:
*
* Does *not* retry or retarget if the metadata is stale.
*/
- static Status _commandOpWrite(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& command,
- BatchItemRef targetingBatchItem,
- std::vector<Strategy::CommandResult>* results) {
- // Note that this implementation will not handle targeting retries and does not completely
- // emulate write behavior
- ChunkManagerTargeter targeter(nss);
- Status status = targeter.init(opCtx);
- if (!status.isOK())
- return status;
-
- auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> {
+ static void _commandOpWrite(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& command,
+ BatchItemRef targetingBatchItem,
+ std::vector<Strategy::CommandResult>* results) {
+ auto endpoints = [&] {
+ // Note that this implementation will not handle targeting retries and does not
+ // completely emulate write behavior
+ ChunkManagerTargeter targeter(opCtx, nss);
+
if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) {
- auto swEndpoint = targeter.targetInsert(opCtx, targetingBatchItem.getDocument());
- if (!swEndpoint.isOK())
- return swEndpoint.getStatus();
- return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())};
+ return std::vector{targeter.targetInsert(opCtx, targetingBatchItem.getDocument())};
} else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) {
return targeter.targetUpdate(opCtx, targetingBatchItem.getUpdate());
} else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete) {
return targeter.targetDelete(opCtx, targetingBatchItem.getDelete());
- } else {
- MONGO_UNREACHABLE;
}
+ MONGO_UNREACHABLE;
}();
- if (!swEndpoints.isOK())
- return swEndpoints.getStatus();
-
// Assemble requests
std::vector<AsyncRequestsSender::Request> requests;
- for (const auto& endpoint : swEndpoints.getValue()) {
+ for (const auto& endpoint : endpoints) {
BSONObj cmdObjWithVersions = BSONObj(command);
if (endpoint.databaseVersion) {
cmdObjWithVersions =
@@ -412,17 +402,10 @@ private:
readPref,
Shard::RetryPolicy::kNoRetry);
- // Receive the responses.
-
- Status dispatchStatus = Status::OK();
while (!ars.done()) {
// Block until a response is available.
auto response = ars.next();
-
- if (!response.swResponse.isOK()) {
- dispatchStatus = std::move(response.swResponse.getStatus());
- break;
- }
+ uassertStatusOK(response.swResponse);
Strategy::CommandResult result;
@@ -435,8 +418,6 @@ private:
results->push_back(result);
}
-
- return dispatchStatus;
}
};
@@ -596,8 +577,8 @@ private:
// Target the command to the shards based on the singleton batch item.
BatchItemRef targetingBatchItem(&_batchedRequest, 0);
std::vector<Strategy::CommandResult> shardResults;
- uassertStatusOK(_commandOpWrite(
- opCtx, _batchedRequest.getNS(), explainCmd, targetingBatchItem, &shardResults));
+ _commandOpWrite(
+ opCtx, _batchedRequest.getNS(), explainCmd, targetingBatchItem, &shardResults);
auto bodyBuilder = result->getBodyBuilder();
uassertStatusOK(ClusterExplain::buildExplainResult(
opCtx, shardResults, ClusterExplain::kWriteOnShards, timer.millis(), &bodyBuilder));
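
The rewritten _commandOpWrite above computes its endpoints with an immediately-invoked lambda, so the per-op-type branching yields a value directly and any targeting failure simply propagates as an exception instead of being threaded back through StatusWith. A compilable sketch of that idiom follows; OpType, ShardEndpoint, and the hard-coded shard names are illustrative stand-ins, not the real mongo types.

#include <stdexcept>
#include <string>
#include <vector>

enum class OpType { kInsert, kUpdate, kDelete };

struct ShardEndpoint {
    std::string shardName;
};

std::vector<ShardEndpoint> targetSingleOp(OpType opType) {
    // Immediately-invoked lambda: select the endpoints in one expression.
    // There is no StatusWith to unwrap; a targeting error thrown inside the
    // lambda propagates straight out of this function.
    auto endpoints = [&]() -> std::vector<ShardEndpoint> {
        switch (opType) {
            case OpType::kInsert:
                return {ShardEndpoint{"shard0000"}};
            case OpType::kUpdate:
            case OpType::kDelete:
                return {ShardEndpoint{"shard0000"}, ShardEndpoint{"shard0001"}};
        }
        throw std::logic_error("unreachable");  // stands in for MONGO_UNREACHABLE
    }();

    // ... assemble and dispatch one request per endpoint ...
    return endpoints;
}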
diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h
index c3f86a4ea29..36ce6269680 100644
--- a/src/mongo/s/ns_targeter.h
+++ b/src/mongo/s/ns_targeter.h
@@ -67,7 +67,7 @@ struct ShardEndpoint {
* 1a. On targeting failure we may need to refresh, note that it happened.
* 1b. On stale config from a child write operation we may need to refresh, note the error.
*
- * 2. RefreshIfNeeded() to get newer targeting information
+ * 2. refreshIfNeeded() to get newer targeting information
*
* 3. Goto 0.
*
@@ -78,8 +78,7 @@ struct ShardEndpoint {
* Implementers are free to define more specific targeting error codes to allow more complex
* error handling.
*
- * Interface must be externally synchronized if used in multiple threads, for now.
- * TODO: Determine if we should internally synchronize.
+ * The interface must not be used from multiple threads.
*/
class NSTargeter {
public:
@@ -91,36 +90,29 @@ public:
virtual const NamespaceString& getNS() const = 0;
/**
- * Returns a ShardEndpoint for a single document write.
- *
- * Returns !OK with message if document could not be targeted for other reasons.
+ * Returns a ShardEndpoint for a single document write or throws ShardKeyNotFound if 'doc' is
+ * malformed with respect to the shard key pattern of the collection.
*/
- virtual StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx,
- const BSONObj& doc) const = 0;
+ virtual ShardEndpoint targetInsert(OperationContext* opCtx, const BSONObj& doc) const = 0;
/**
- * Returns a vector of ShardEndpoints for a potentially multi-shard update.
- *
- * Returns OK and fills the endpoints; returns a status describing the error otherwise.
+ * Returns a vector of ShardEndpoints for a potentially multi-shard update or throws
+ * ShardKeyNotFound if 'updateOp' misses a shard key, but the type of update requires it.
*/
- virtual StatusWith<std::vector<ShardEndpoint>> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const = 0;
+ virtual std::vector<ShardEndpoint> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const = 0;
/**
- * Returns a vector of ShardEndpoints for a potentially multi-shard delete.
- *
- * Returns OK and fills the endpoints; returns a status describing the error otherwise.
+ * Returns a vector of ShardEndpoints for a potentially multi-shard delete or throws
+ * ShardKeyNotFound if 'deleteOp' misses a shard key, but the type of delete requires it.
*/
- virtual StatusWith<std::vector<ShardEndpoint>> targetDelete(
- OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const = 0;
+ virtual std::vector<ShardEndpoint> targetDelete(
+ OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const = 0;
/**
* Returns a vector of ShardEndpoints for all shards.
- *
- * Returns !OK with message if all shards could not be targeted.
*/
- virtual StatusWith<std::vector<ShardEndpoint>> targetAllShards(
- OperationContext* opCtx) const = 0;
+ virtual std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const = 0;
/**
* Informs the targeter that a targeting failure occurred during one of the last targeting
@@ -160,9 +152,8 @@ public:
* information used here was changed.
*
* NOTE: This function may block for shared resources or network calls.
- * Returns !OK with message if could not refresh
*/
- virtual Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) = 0;
+ virtual void refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) = 0;
/**
* Returns whether this write targets the config server. Invariants if the write targets the
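
The class comment above describes a target / note-error / refreshIfNeeded / retry loop. With the new exception-based signatures, the caller side of that loop looks roughly like the standalone sketch below; Targeter, StaleConfigException, and the shard names are simplified stand-ins for NSTargeter and the real error types.

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct StaleConfigException : std::runtime_error {
    using std::runtime_error::runtime_error;
};

struct Targeter {
    int staleRounds = 1;  // pretend the routing info is stale for one round

    std::vector<std::string> targetAllShards() {
        if (staleRounds > 0)
            throw StaleConfigException("routing table is stale");
        return {"shard0000", "shard0001"};
    }

    // refreshIfNeeded() now returns void; failures surface as exceptions.
    void refreshIfNeeded(bool* wasChanged) {
        *wasChanged = staleRounds > 0;
        staleRounds = 0;
    }
};

int main() {
    Targeter targeter;
    for (int attempt = 0; attempt < 3; ++attempt) {
        try {
            auto endpoints = targeter.targetAllShards();  // 0. target the ops
            std::cout << "targeted " << endpoints.size() << " shards\n";
            break;
        } catch (const StaleConfigException&) {           // 1. note the failure
            bool wasChanged = false;
            targeter.refreshIfNeeded(&wasChanged);        // 2. refresh, then retry (3)
        }
    }
}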
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 0145cc9cdc4..d2d0ff46af9 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -403,8 +403,7 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
abortBatch = true;
// Throw when there is a transient transaction error since this should be a
- // top
- // level error and not just a write error.
+ // top level error and not just a write error.
if (isTransientTransactionError(status.code(), false, false)) {
uassertStatusOK(status);
}
@@ -429,19 +428,18 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
//
bool targeterChanged = false;
- Status refreshStatus = targeter.refreshIfNeeded(opCtx, &targeterChanged);
-
- LOGV2_DEBUG(22909,
- 4,
- "executeBatch targeter changed: {targeterChanged}",
- "targeterChanged"_attr = targeterChanged);
-
- if (!refreshStatus.isOK()) {
- // It's okay if we can't refresh, we'll just record errors for the ops if
- // needed.
+ try {
+ targeter.refreshIfNeeded(opCtx, &targeterChanged);
+ } catch (const ExceptionFor<ErrorCodes::StaleEpoch>& ex) {
+ batchOp.abortBatch(errorFromStatus(
+ ex.toStatus("collection was dropped in the middle of the operation")));
+ break;
+ } catch (const DBException& ex) {
+ // It's okay if we can't refresh, we'll just record errors for the ops if needed
LOGV2_WARNING(22911,
- "could not refresh targeter{causedBy_refreshStatus_reason}",
- "causedBy_refreshStatus_reason"_attr = causedBy(refreshStatus.reason()));
+ "Could not refresh targeter due to {error}",
+ "Could not refresh targeter",
+ "error"_attr = redact(ex));
}
//
@@ -468,8 +466,8 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
}
auto nShardsOwningChunks = batchOp.getNShardsOwningChunks();
- if (nShardsOwningChunks.is_initialized())
- stats->noteNumShardsOwningChunks(nShardsOwningChunks.get());
+ if (nShardsOwningChunks)
+ stats->noteNumShardsOwningChunks(*nShardsOwningChunks);
batchOp.buildClientResponse(clientResponse);
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 8853ca9ab94..39e3c1c609f 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -412,7 +412,7 @@ TEST_F(BatchWriteExecTest, StaleShardVersionReturnedFromBatchWithSingleMultiWrit
public:
using MockNSTargeter::MockNSTargeter;
- StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ std::vector<ShardEndpoint> targetUpdate(
OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
return std::vector<ShardEndpoint>{
ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
@@ -513,7 +513,7 @@ TEST_F(BatchWriteExecTest,
public:
using MockNSTargeter::MockNSTargeter;
- StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ std::vector<ShardEndpoint> targetUpdate(
OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
return std::vector<ShardEndpoint>{
ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
@@ -629,7 +629,7 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1Firs) {
public:
using MockNSTargeter::MockNSTargeter;
- StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ std::vector<ShardEndpoint> targetUpdate(
OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
return std::vector<ShardEndpoint>{
ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
@@ -756,7 +756,7 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1FirstOK
public:
using MockNSTargeter::MockNSTargeter;
- StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ std::vector<ShardEndpoint> targetUpdate(
OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
return std::vector<ShardEndpoint>{
ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 0f849bd3bc8..9d763c328a1 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -318,7 +318,12 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
OwnedPointerVector<TargetedWrite> writesOwned;
vector<TargetedWrite*>& writes = writesOwned.mutableVector();
- Status targetStatus = writeOp.targetWrites(_opCtx, targeter, &writes);
+ Status targetStatus = Status::OK();
+ try {
+ writeOp.targetWrites(_opCtx, targeter, &writes);
+ } catch (const DBException& ex) {
+ targetStatus = ex.toStatus();
+ }
if (!targetStatus.isOK()) {
WriteErrorDetail targetError;
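
The batch_write_op.cpp hunk above keeps its existing Status-based error reporting by converting the exception back into a Status right at the call site. A minimal sketch of that boundary adapter follows; Status here is a tiny stand-in, and the real code catches DBException and uses ex.toStatus().

#include <stdexcept>
#include <string>

// Tiny stand-in for mongo::Status.
struct Status {
    bool ok = true;
    std::string reason;
    static Status OK() { return {}; }
};

// Stands in for WriteOp::targetWrites(), which now throws on failure.
void targetWrites(bool fail) {
    if (fail)
        throw std::runtime_error("ShardKeyNotFound");
}

Status targetOneWrite(bool fail) {
    Status targetStatus = Status::OK();
    try {
        targetWrites(fail);
    } catch (const std::exception& ex) {          // the real code catches DBException
        targetStatus = Status{false, ex.what()};  // ex.toStatus() in the real code
    }
    return targetStatus;  // downstream error handling still consumes a Status
}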
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index ae05b9e3fff..c904bd7c829 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -71,13 +71,12 @@ ServerStatusMetricField<Counter64> updateOneOpStyleBroadcastWithExactIDStats(
* or
* coll.update({x: 1}, [{$addFields: {y: 2}}])
*/
-StatusWith<UpdateType> getUpdateExprType(const write_ops::UpdateOpEntry& updateDoc) {
- const auto updateMod = updateDoc.getU();
+UpdateType getUpdateExprType(const write_ops::UpdateOpEntry& updateDoc) {
+ const auto& updateMod = updateDoc.getU();
if (updateMod.type() == write_ops::UpdateModification::Type::kPipeline) {
return UpdateType::kOpStyle;
}
- // Obtain the update expression from the request.
const auto& updateExpr = updateMod.getUpdateClassic();
// Empty update is replacement-style by default.
@@ -90,18 +89,18 @@ StatusWith<UpdateType> getUpdateExprType(const write_ops::UpdateOpEntry& updateD
: UpdateType::kReplacement);
// If the current field's type does not match the existing updateType, abort.
- if (updateType != curFieldType && updateType != UpdateType::kUnknown) {
- return {ErrorCodes::UnsupportedFormat,
- str::stream() << "update document " << updateExpr
- << " has mixed $operator and non-$operator style fields"};
- }
- updateType = curFieldType;
- }
+ if (updateType == UpdateType::kUnknown)
+ updateType = curFieldType;
- if (updateType == UpdateType::kReplacement && updateDoc.getMulti()) {
- return {ErrorCodes::InvalidOptions, "Replacement-style updates cannot be {multi:true}"};
+ uassert(ErrorCodes::UnsupportedFormat,
+ str::stream() << "update document " << updateExpr
+ << " has mixed $operator and non-$operator style fields",
+ updateType == curFieldType);
}
+ uassert(ErrorCodes::InvalidOptions,
+ "Replacement-style updates cannot be {multi:true}",
+ updateType == UpdateType::kOpStyle || !updateDoc.getMulti());
return updateType;
}
@@ -113,29 +112,29 @@ StatusWith<UpdateType> getUpdateExprType(const write_ops::UpdateOpEntry& updateD
* generating the new document in the case of an upsert. It is therefore always correct to target
* the operation on the basis of the combined updateExpr and query.
*/
-StatusWith<BSONObj> getUpdateExprForTargeting(OperationContext* opCtx,
- const ShardKeyPattern& shardKeyPattern,
- const NamespaceString& nss,
- const UpdateType updateType,
- const write_ops::UpdateOpEntry& updateDoc) {
+BSONObj getUpdateExprForTargeting(OperationContext* opCtx,
+ const ShardKeyPattern& shardKeyPattern,
+ const NamespaceString& nss,
+ UpdateType updateType,
+ const write_ops::UpdateOpEntry& updateOp) {
// We should never see an invalid update type here.
invariant(updateType != UpdateType::kUnknown);
+ const auto& updateMod = updateOp.getU();
+
// If this is not a replacement update, then the update expression remains unchanged.
if (updateType != UpdateType::kReplacement) {
- const auto& updateMod = updateDoc.getU();
BSONObjBuilder objBuilder;
updateMod.serializeToBSON("u", &objBuilder);
return objBuilder.obj();
}
// Extract the raw update expression from the request.
- const auto& updateMod = updateDoc.getU();
invariant(updateMod.type() == write_ops::UpdateModification::Type::kClassic);
- auto updateExpr = updateMod.getUpdateClassic();
// Replace any non-existent shard key values with a null value.
- updateExpr = shardKeyPattern.emplaceMissingShardKeyValuesForDocument(updateExpr);
+ auto updateExpr =
+ shardKeyPattern.emplaceMissingShardKeyValuesForDocument(updateMod.getUpdateClassic());
// If we aren't missing _id, return the update expression as-is.
if (updateExpr.hasField(kIdFieldName)) {
@@ -146,10 +145,8 @@ StatusWith<BSONObj> getUpdateExprForTargeting(OperationContext* opCtx,
// This will guarantee that we can target a single shard, but it is not necessarily fatal if no
// exact _id can be found.
const auto idFromQuery =
- kVirtualIdShardKey.extractShardKeyFromQuery(opCtx, nss, updateDoc.getQ());
- if (!idFromQuery.isOK()) {
- return idFromQuery;
- } else if (auto idElt = idFromQuery.getValue()[kIdFieldName]) {
+ uassertStatusOK(kVirtualIdShardKey.extractShardKeyFromQuery(opCtx, nss, updateOp.getQ()));
+ if (auto idElt = idFromQuery[kIdFieldName]) {
updateExpr = updateExpr.addField(idElt);
}
@@ -205,6 +202,7 @@ bool isExactIdQuery(OperationContext* opCtx,
return cq.isOK() && isExactIdQuery(opCtx, *cq.getValue(), manager);
}
+
//
// Utilities to compare shard and db versions
//
@@ -351,24 +349,16 @@ bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA,
} // namespace
-ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss,
+ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx,
+ const NamespaceString& nss,
boost::optional<OID> targetEpoch)
- : _nss(nss), _needsTargetingRefresh(false), _targetEpoch(targetEpoch) {}
-
-
-Status ChunkManagerTargeter::init(OperationContext* opCtx) {
- try {
- createShardDatabase(opCtx, _nss.db());
- } catch (const DBException& e) {
- return e.toStatus();
- }
-
- const auto routingInfoStatus = getCollectionRoutingInfoForTxnCmd(opCtx, _nss);
- if (!routingInfoStatus.isOK()) {
- return routingInfoStatus.getStatus();
- }
+ : _nss(nss), _needsTargetingRefresh(false), _targetEpoch(std::move(targetEpoch)) {
+ _init(opCtx);
+}
- _routingInfo = std::move(routingInfoStatus.getValue());
+void ChunkManagerTargeter::_init(OperationContext* opCtx) {
+ createShardDatabase(opCtx, _nss.db());
+ _routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss));
if (_targetEpoch) {
uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _routingInfo->cm());
@@ -376,22 +366,20 @@ Status ChunkManagerTargeter::init(OperationContext* opCtx) {
"Collection epoch has changed",
_routingInfo->cm()->getVersion().epoch() == *_targetEpoch);
}
-
- return Status::OK();
}
const NamespaceString& ChunkManagerTargeter::getNS() const {
return _nss;
}
-StatusWith<ShardEndpoint> ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
- const BSONObj& doc) const {
+ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx,
+ const BSONObj& doc) const {
BSONObj shardKey;
if (_routingInfo->cm()) {
shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc);
- // The shard key would only be empty after extraction if we encountered an error case,
- // such as the shard key possessing an array value or array descendants. If the shard key
+ // The shard key would only be empty after extraction if we encountered an error case, such
+ // as the shard key possessing an array value or array descendants. If the shard key
// presented to the targeter was empty, we would emplace the missing fields, and the
// extracted key here would *not* be empty.
uassert(ErrorCodes::ShardKeyNotFound,
@@ -401,24 +389,17 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::targetInsert(OperationContext* o
// Target the shard key or database primary
if (!shardKey.isEmpty()) {
- return _targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize());
- } else {
- if (!_routingInfo->db().primary()) {
- return Status(ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target insert in collection " << getNS().ns()
- << "; no metadata found");
- }
-
- return ShardEndpoint(_routingInfo->db().primary()->getId(),
- ChunkVersion::UNSHARDED(),
- _routingInfo->db().databaseVersion());
+ return uassertStatusOK(
+ _targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()));
}
- return Status::OK();
+ return ShardEndpoint(_routingInfo->db().primary()->getId(),
+ ChunkVersion::UNSHARDED(),
+ _routingInfo->db().databaseVersion());
}
-StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const {
+std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const {
// If the update is replacement-style:
// 1. Attempt to target using the query. If this fails, AND the query targets more than one
// shard,
@@ -432,173 +413,132 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetUpdate(
// as if the the shard key values are specified as NULL. A replacement document is also allowed
// to have a missing '_id', and if the '_id' exists in the query, it will be emplaced in the
// replacement document for targeting purposes.
- const auto updateType = getUpdateExprType(updateDoc);
- if (!updateType.isOK()) {
- return updateType.getStatus();
- }
+ const auto updateType = getUpdateExprType(updateOp);
// If the collection is not sharded, forward the update to the primary shard.
if (!_routingInfo->cm()) {
- if (!_routingInfo->db().primary()) {
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target update on " << getNS().ns()
- << "; no metadata found"};
- }
return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
ChunkVersion::UNSHARDED(),
_routingInfo->db().databaseVersion()}};
}
const auto& shardKeyPattern = _routingInfo->cm()->getShardKeyPattern();
- const auto collation = write_ops::collationOf(updateDoc);
+ const auto collation = write_ops::collationOf(updateOp);
- const auto updateExpr = getUpdateExprForTargeting(
- opCtx, shardKeyPattern, getNS(), updateType.getValue(), updateDoc);
- const bool isUpsert = updateDoc.getUpsert();
- const auto query = updateDoc.getQ();
- if (!updateExpr.isOK()) {
- return updateExpr.getStatus();
- }
+ const auto updateExpr =
+ getUpdateExprForTargeting(opCtx, shardKeyPattern, _nss, updateType, updateOp);
+ const bool isUpsert = updateOp.getUpsert();
+ const auto query = updateOp.getQ();
// Utility function to target an update by shard key, and to handle any potential error results.
- const auto targetByShardKey = [&collation,
- this](StatusWith<BSONObj> shardKey,
- StringData msg) -> StatusWith<std::vector<ShardEndpoint>> {
- if (!shardKey.isOK()) {
- return shardKey.getStatus().withContext(msg);
- }
- if (shardKey.getValue().isEmpty()) {
- return {ErrorCodes::ShardKeyNotFound,
- str::stream() << msg << " :: could not extract exact shard key"};
- }
- auto swEndPoint = _targetShardKey(shardKey.getValue(), collation, 0);
- if (!swEndPoint.isOK()) {
- return swEndPoint.getStatus().withContext(msg);
- }
- return {{swEndPoint.getValue()}};
+ auto targetByShardKey = [this, &collation](StatusWith<BSONObj> swShardKey, std::string msg) {
+ const auto& shardKey = uassertStatusOKWithContext(std::move(swShardKey), msg);
+ uassert(ErrorCodes::ShardKeyNotFound,
+ str::stream() << msg << " :: could not extract exact shard key",
+ !shardKey.isEmpty());
+ return std::vector{
+ uassertStatusOKWithContext(_targetShardKey(shardKey, collation, 0), msg)};
};
// If this is an upsert, then the query must contain an exact match on the shard key. If we were
// to target based on the replacement doc, it could result in an insertion even if a document
// matching the query exists on another shard.
if (isUpsert) {
- return targetByShardKey(shardKeyPattern.extractShardKeyFromQuery(opCtx, getNS(), query),
+ return targetByShardKey(shardKeyPattern.extractShardKeyFromQuery(opCtx, _nss, query),
"Failed to target upsert by query");
}
// We first try to target based on the update's query. It is always valid to forward any update
// or upsert to a single shard, so return immediately if we are able to target a single shard.
- auto shardEndPoints = _targetQuery(opCtx, query, collation);
- if (!shardEndPoints.isOK() || shardEndPoints.getValue().size() == 1) {
- return shardEndPoints;
+ auto endPoints = uassertStatusOK(_targetQuery(opCtx, query, collation));
+ if (endPoints.size() == 1) {
+ return endPoints;
}
// Replacement-style updates must always target a single shard. If we were unable to do so using
// the query, we attempt to extract the shard key from the replacement and target based on it.
if (updateType == UpdateType::kReplacement) {
- return targetByShardKey(shardKeyPattern.extractShardKeyFromDoc(updateExpr.getValue()),
+ return targetByShardKey(shardKeyPattern.extractShardKeyFromDoc(updateExpr),
"Failed to target update by replacement document");
}
// If we are here then this is an op-style update and we were not able to target a single shard.
// Non-multi updates must target a single shard or an exact _id.
- if (!updateDoc.getMulti() &&
- !isExactIdQuery(opCtx, getNS(), query, collation, _routingInfo->cm().get())) {
- return {ErrorCodes::InvalidOptions,
- str::stream() << "A {multi:false} update on a sharded collection must either "
- "contain an exact match on _id or must target a single shard but "
- "this update targeted _id (and have the collection default "
- "collation) or must target a single shard (and have the simple "
- "collation), but this update targeted "
- << shardEndPoints.getValue().size()
- << " shards. Update request: " << updateDoc.toBSON()
- << ", shard key pattern: " << shardKeyPattern.toString()};
- }
+ uassert(
+ ErrorCodes::InvalidOptions,
+ str::stream() << "A {multi:false} update on a sharded collection must either contain an "
+ "exact match on _id or must target a single shard, but this update "
+ "targeted _id (and have the collection default collation) or must target "
+ "a single shard (and have the simple collation), but this update targeted "
+ << endPoints.size() << " shards. Update request: " << updateOp.toBSON()
+ << ", shard key pattern: " << shardKeyPattern.toString(),
+ updateOp.getMulti() ||
+ isExactIdQuery(opCtx, _nss, query, collation, _routingInfo->cm().get()));
// If the request is {multi:false}, then this is a single op-style update which we are
// broadcasting to multiple shards by exact _id. Record this event in our serverStatus metrics.
- if (!updateDoc.getMulti()) {
+ if (!updateOp.getMulti()) {
updateOneOpStyleBroadcastWithExactIDCount.increment(1);
}
- return shardEndPoints;
+
+ return endPoints;
}
-StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetDelete(
- OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const {
+std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete(
+ OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const {
BSONObj shardKey;
if (_routingInfo->cm()) {
- //
// Sharded collections have the following further requirements for targeting:
//
// Limit-1 deletes must be targeted exactly by shard key *or* exact _id
- //
-
- // Get the shard key
- StatusWith<BSONObj> status =
- _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(
- opCtx, getNS(), deleteDoc.getQ());
-
- // Bad query
- if (!status.isOK())
- return status.getStatus();
-
- shardKey = status.getValue();
+ shardKey =
+ uassertStatusOK(_routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery(
+ opCtx, _nss, deleteOp.getQ()));
}
- const auto collation = write_ops::collationOf(deleteDoc);
+ const auto collation = write_ops::collationOf(deleteOp);
// Target the shard key or delete query
if (!shardKey.isEmpty()) {
auto swEndpoint = _targetShardKey(shardKey, collation, 0);
if (swEndpoint.isOK()) {
- return {{swEndpoint.getValue()}};
+ return std::vector{std::move(swEndpoint.getValue())};
}
}
// We failed to target a single shard.
// Parse delete query.
- auto qr = std::make_unique<QueryRequest>(getNS());
- qr->setFilter(deleteDoc.getQ());
+ auto qr = std::make_unique<QueryRequest>(_nss);
+ qr->setFilter(deleteOp.getQ());
if (!collation.isEmpty()) {
qr->setCollation(collation);
}
const boost::intrusive_ptr<ExpressionContext> expCtx;
- auto cq = CanonicalQuery::canonicalize(opCtx,
- std::move(qr),
- expCtx,
- ExtensionsCallbackNoop(),
- MatchExpressionParser::kAllowAllSpecialFeatures);
- if (!cq.isOK()) {
- return cq.getStatus().withContext(str::stream()
- << "Could not parse delete query " << deleteDoc.getQ());
- }
+ auto cq = uassertStatusOKWithContext(
+ CanonicalQuery::canonicalize(opCtx,
+ std::move(qr),
+ expCtx,
+ ExtensionsCallbackNoop(),
+ MatchExpressionParser::kAllowAllSpecialFeatures),
+ str::stream() << "Could not parse delete query " << deleteOp.getQ());
// Single deletes must target a single shard or be exact-ID.
- if (_routingInfo->cm() && !deleteDoc.getMulti() &&
- !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) {
- return Status(ErrorCodes::ShardKeyNotFound,
- str::stream()
- << "A single delete on a sharded collection must contain an exact "
- "match on _id (and have the collection default collation) or "
- "contain the shard key (and have the simple collation). Delete "
- "request: "
- << deleteDoc.toBSON() << ", shard key pattern: "
- << _routingInfo->cm()->getShardKeyPattern().toString());
- }
-
- return _targetQuery(opCtx, deleteDoc.getQ(), collation);
+ uassert(ErrorCodes::ShardKeyNotFound,
+ str::stream() << "A single delete on a sharded collection must contain an exact match "
+ "on _id (and have the collection default collation) or contain the "
+ "shard key (and have the simple collation). Delete request: "
+ << deleteOp.toBSON() << ", shard key pattern: "
+ << _routingInfo->cm()->getShardKeyPattern().toString(),
+ !_routingInfo->cm() || deleteOp.getMulti() ||
+ isExactIdQuery(opCtx, *cq, _routingInfo->cm().get()));
+
+ return uassertStatusOK(_targetQuery(opCtx, deleteOp.getQ(), collation));
}
StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery(
OperationContext* opCtx, const BSONObj& query, const BSONObj& collation) const {
- if (!_routingInfo->db().primary() && !_routingInfo->cm()) {
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target query in " << getNS().ns()
- << "; no metadata found"};
- }
-
if (!_routingInfo->cm()) {
return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(),
ChunkVersion::UNSHARDED(),
@@ -632,21 +572,14 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s
MONGO_UNREACHABLE;
}
-StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetAllShards(
- OperationContext* opCtx) const {
- if (!_routingInfo->db().primary() && !_routingInfo->cm()) {
- return {ErrorCodes::NamespaceNotFound,
- str::stream() << "could not target every shard with versions for " << getNS().ns()
- << "; metadata not found"};
- }
-
- std::vector<ShardId> shardIds;
- Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
-
+std::vector<ShardEndpoint> ChunkManagerTargeter::targetAllShards(OperationContext* opCtx) const {
// This function is only called if doing a multi write that targets more than one shard. This
// implies the collection is sharded, so we should always have a chunk manager.
invariant(_routingInfo->cm());
+ std::vector<ShardId> shardIds;
+ Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
+
std::vector<ShardEndpoint> endpoints;
for (auto&& shardId : shardIds) {
endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId));
@@ -724,7 +657,7 @@ void ChunkManagerTargeter::noteStaleDbResponse(const ShardEndpoint& endpoint,
_remoteDbVersion = remoteDbVersion;
}
-Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) {
+void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) {
bool dummy;
if (!wasChanged) {
wasChanged = &dummy;
@@ -740,14 +673,14 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
"ChunkManagerTargeter checking if refresh is needed",
"needsTargetingRefresh"_attr = _needsTargetingRefresh,
"hasRemoteShardVersions"_attr = !_remoteShardVersions.empty(),
- "hasRemoteDbVersion"_attr = static_cast<bool>(_remoteDbVersion));
+ "hasRemoteDbVersion"_attr = bool{_remoteDbVersion});
//
// Did we have any stale config or targeting errors at all?
//
if (!_needsTargetingRefresh && _remoteShardVersions.empty() && !_remoteDbVersion) {
- return Status::OK();
+ return;
}
//
@@ -757,10 +690,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
auto lastManager = _routingInfo->cm();
auto lastDbVersion = _routingInfo->db().databaseVersion();
- auto initStatus = init(opCtx);
- if (!initStatus.isOK()) {
- return initStatus;
- }
+ _init(opCtx);
// We now have the latest metadata from the cache.
@@ -782,12 +712,12 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
        // If we didn't already refresh the targeting information, refresh it
if (!alreadyRefreshed) {
// To match previous behavior, we just need an incremental refresh here
- return _refreshShardVersionNow(opCtx);
+ _refreshShardVersionNow(opCtx);
+ return;
}
*wasChanged = isMetadataDifferent(
lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
- return Status::OK();
} else if (!_remoteShardVersions.empty()) {
// If we got stale shard versions from remote shards, we may need to refresh
// NOTE: Not sure yet if this can happen simultaneously with targeting issues
@@ -805,12 +735,12 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
if (result == CompareResult_Unknown || result == CompareResult_LT) {
        // Our current shard versions aren't all comparable to the old versions; the collection may have been dropped
- return _refreshShardVersionNow(opCtx);
+ _refreshShardVersionNow(opCtx);
+ return;
}
*wasChanged = isMetadataDifferent(
lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
- return Status::OK();
} else if (_remoteDbVersion) {
// If we got stale database versions from the remote shard, we may need to refresh
// NOTE: Not sure yet if this can happen simultaneously with targeting issues
@@ -829,23 +759,16 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC
if (result == CompareResult_Unknown || result == CompareResult_LT) {
        // Our current db version isn't always comparable to the old version; it may have been
// dropped
- return _refreshDbVersionNow(opCtx);
+ _refreshDbVersionNow(opCtx);
+ return;
}
*wasChanged = isMetadataDifferent(
lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion());
- return Status::OK();
}
-
- MONGO_UNREACHABLE;
}
bool ChunkManagerTargeter::endpointIsConfigServer() const {
- uassert(ErrorCodes::NamespaceNotFound,
- str::stream() << "could not verify full range of " << getNS().ns()
- << "; metadata not found",
- _routingInfo->db().primary() || _routingInfo->cm());
-
if (!_routingInfo->cm()) {
return _routingInfo->db().primaryId() == ShardRegistry::kConfigServerShardId;
}
@@ -872,21 +795,18 @@ int ChunkManagerTargeter::getNShardsOwningChunks() const {
return 0;
}
-Status ChunkManagerTargeter::_refreshShardVersionNow(OperationContext* opCtx) {
- auto routingInfoStatus =
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss, true);
- if (!routingInfoStatus.isOK()) {
- return routingInfoStatus.getStatus();
- }
+void ChunkManagerTargeter::_refreshShardVersionNow(OperationContext* opCtx) {
+ uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss, true));
- return init(opCtx);
+ _init(opCtx);
}
-Status ChunkManagerTargeter::_refreshDbVersionNow(OperationContext* opCtx) {
+void ChunkManagerTargeter::_refreshDbVersionNow(OperationContext* opCtx) {
Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(
_nss.db(), std::move(_routingInfo->db().databaseVersion()));
- return init(opCtx);
+ _init(opCtx);
}
} // namespace mongo
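The hunks above replace Status/StatusWith returns inside ChunkManagerTargeter with uassert-style throwing helpers. Below is a minimal, self-contained sketch of that conversion pattern under simplified assumptions; StatusOr, TargeterException, unwrapOrThrow and targetQueryLegacy are illustrative stand-ins, not the real mongo::StatusWith or uassertStatusOK machinery.

#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Exception carrying an error code, standing in for DBException.
struct TargeterException : std::runtime_error {
    TargeterException(int code, const std::string& msg) : std::runtime_error(msg), code(code) {}
    int code;
};

// Poor man's StatusWith: either a value or an error code plus reason.
template <typename T>
struct StatusOr {
    std::optional<T> value;
    int code = 0;
    std::string reason;
};

// Rough analogue of uassertStatusOK: unwrap the value or throw the error.
template <typename T>
T unwrapOrThrow(StatusOr<T> sw) {
    if (!sw.value)
        throw TargeterException(sw.code, sw.reason);
    return std::move(*sw.value);
}

// Legacy-shaped helper that reports failure through the return value.
StatusOr<std::vector<std::string>> targetQueryLegacy(bool haveMetadata) {
    if (!haveMetadata)
        return {std::nullopt, 1, "could not target query; no metadata found"};
    return {std::vector<std::string>{"shard0", "shard1"}, 0, ""};
}

// New-shaped wrapper: the happy path returns endpoints and failures become
// exceptions, mirroring `return uassertStatusOK(_targetQuery(...));` above.
std::vector<std::string> targetQuery(bool haveMetadata) {
    return unwrapOrThrow(targetQueryLegacy(haveMetadata));
}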
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index fa215756f0b..d280d6383b9 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -60,37 +60,28 @@ public:
enum class UpdateType { kReplacement, kOpStyle, kUnknown };
/**
- * If 'targetEpoch' is not boost::none, throws a 'StaleEpoch' exception if the collection given
- * by 'nss' is ever found to not have the target epoch.
- */
- ChunkManagerTargeter(const NamespaceString& nss,
- boost::optional<OID> targetEpoch = boost::none);
-
- /**
- * Initializes the ChunkManagerTargeter with the latest targeting information for the
- * namespace. May need to block and load information from a remote config server.
+ * Initializes the targeter with the latest routing information for the namespace, which means
+ * it may have to block and load information from the config server.
*
- * Throws a 'StaleEpoch' exception if the collection targeted has an epoch which does not match
- * '_targetEpoch'
- * Returns !OK if the information could not be initialized.
+ * If 'expectedEpoch' is specified, the targeter will throw a 'StaleEpoch' exception if the epoch
+ * for 'nss' ever becomes different from 'expectedEpoch'. Otherwise, the targeter will continue
+ * targeting even if the collection gets dropped and recreated.
*/
- Status init(OperationContext* opCtx);
+ ChunkManagerTargeter(OperationContext* opCtx,
+ const NamespaceString& nss,
+ boost::optional<OID> expectedEpoch = boost::none);
const NamespaceString& getNS() const override;
- // Returns ShardKeyNotFound if document does not have a full shard key.
- StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx,
- const BSONObj& doc) const override;
+ ShardEndpoint targetInsert(OperationContext* opCtx, const BSONObj& doc) const override;
- // Returns ShardKeyNotFound if the update can't be targeted without a shard key.
- StatusWith<std::vector<ShardEndpoint>> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override;
+ std::vector<ShardEndpoint> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const override;
- // Returns ShardKeyNotFound if the delete can't be targeted without a shard key.
- StatusWith<std::vector<ShardEndpoint>> targetDelete(
- OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const override;
+ std::vector<ShardEndpoint> targetDelete(
+ OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const override;
- StatusWith<std::vector<ShardEndpoint>> targetAllShards(OperationContext* opCtx) const override;
+ std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const override;
void noteCouldNotTarget() override;
@@ -109,22 +100,24 @@ public:
*
* Also see NSTargeter::refreshIfNeeded().
*/
- Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override;
+ void refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override;
- virtual bool endpointIsConfigServer() const override;
+ bool endpointIsConfigServer() const override;
int getNShardsOwningChunks() const override;
private:
+ void _init(OperationContext* opCtx);
+
/**
* Performs an actual refresh from the config server.
*/
- Status _refreshShardVersionNow(OperationContext* opCtx);
+ void _refreshShardVersionNow(OperationContext* opCtx);
/**
* Performs an actual refresh from the config server.
*/
- Status _refreshDbVersionNow(OperationContext* opCtx);
+ void _refreshDbVersionNow(OperationContext* opCtx);
/**
* Returns a vector of ShardEndpoints for a potentially multi-shard query.
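With the header now declaring value-returning, throwing targeting methods, call sites trade per-call isOK() checks for a single catch at the dispatch boundary. The sketch below shows that shape under simplified assumptions; Targeter, Endpoint, FixedTargeter and runDelete are invented placeholders, not the NSTargeter API.

#include <exception>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct Endpoint {
    std::string shardName;
};

// Post-patch interface shape: targeting returns endpoints and throws on failure.
class Targeter {
public:
    virtual ~Targeter() = default;
    virtual std::vector<Endpoint> targetDelete(const std::string& query) const = 0;
};

class FixedTargeter : public Targeter {
public:
    std::vector<Endpoint> targetDelete(const std::string& query) const override {
        if (query.empty())
            throw std::runtime_error("query cannot be targeted");
        return {{"shard0"}};
    }
};

void runDelete(const Targeter& targeter, const std::string& query) {
    try {
        for (const auto& endpoint : targeter.targetDelete(query))
            std::cout << "dispatching delete to " << endpoint.shardName << '\n';
    } catch (const std::exception& ex) {
        // One catch at the dispatch boundary replaces per-call isOK() checks.
        std::cout << "targeting failed: " << ex.what() << '\n';
    }
}

int main() {
    FixedTargeter targeter;
    runDelete(targeter, "{_id: 1}");  // prints the targeted shard
    runDelete(targeter, "");          // prints the targeting failure
}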
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp b/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp
index 541b8b30011..7da11f654f6 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp
@@ -65,9 +65,7 @@ public:
ChunkManagerTargeter prepare(BSONObj shardKeyPattern, const std::vector<BSONObj>& splitPoints) {
chunkManager =
makeChunkManager(kNss, ShardKeyPattern(shardKeyPattern), nullptr, false, splitPoints);
- ChunkManagerTargeter cmTargeter(kNss);
- auto status = cmTargeter.init(operationContext());
- return cmTargeter;
+ return ChunkManagerTargeter(operationContext(), kNss);
}
std::shared_ptr<ChunkManager> chunkManager;
};
@@ -83,29 +81,24 @@ TEST_F(ChunkManagerTargeterTest, TargetInsertWithRangePrefixHashedShardKey) {
splitPoints);
auto res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: -111}, c: {d: '1'}}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "1");
+ ASSERT_EQUALS(res.shardName, "1");
res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: -10}}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "2");
+ ASSERT_EQUALS(res.shardName, "2");
res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: 4}}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "3");
+ ASSERT_EQUALS(res.shardName, "3");
res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 1000}, c: null, d: {}}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "4");
+ ASSERT_EQUALS(res.shardName, "4");
// Missing field will be treated as null and will be targeted to the chunk which holds null,
// which is shard '1'.
res = cmTargeter.targetInsert(operationContext(), BSONObj());
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "1");
+ ASSERT_EQUALS(res.shardName, "1");
+
res = cmTargeter.targetInsert(operationContext(), BSON("a" << 10));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "1");
+ ASSERT_EQUALS(res.shardName, "1");
// Arrays along shard key path are not allowed.
ASSERT_THROWS_CODE(cmTargeter.targetInsert(operationContext(), fromjson("{a: [1,2]}")),
@@ -131,14 +124,13 @@ TEST_F(ChunkManagerTargeterTest, TargetInsertsWithVaryingHashedPrefixAndConstant
for (int i = 0; i < 1000; i++) {
auto insertObj = BSON("a" << BSON("b" << i) << "c" << BSON("d" << 10));
- const auto res = cmTargeter.targetInsert(operationContext(), insertObj);
- ASSERT_OK(res.getStatus());
+ auto res = cmTargeter.targetInsert(operationContext(), insertObj);
// Verify that the given document is being routed based on hashed value of 'i'.
auto chunk = chunkManager->findIntersectingChunkWithSimpleCollation(
BSON("a.b" << BSONElementHasher::hash64(insertObj["a"]["b"],
BSONElementHasher::DEFAULT_HASH_SEED)));
- ASSERT_EQUALS(res.getValue().shardName, chunk.getShardId());
+ ASSERT_EQUALS(res.shardName, chunk.getShardId());
}
// Arrays along shard key path are not allowed.
@@ -168,29 +160,24 @@ TEST_F(ChunkManagerTargeterTest, TargetInsertsWithConstantHashedPrefixAndVarying
splitPoints);
auto res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: -111}}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "1");
+ ASSERT_EQUALS(res.shardName, "1");
res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: -11}}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "2");
+ ASSERT_EQUALS(res.shardName, "2");
res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: 0}}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "3");
+ ASSERT_EQUALS(res.shardName, "3");
res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: 111}}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "4");
+ ASSERT_EQUALS(res.shardName, "4");
// Missing field will be treated as null and will be targeted to the chunk which holds null,
// which is shard '1'.
res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "1");
+ ASSERT_EQUALS(res.shardName, "1");
+
res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}}, c: 5}"));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().shardName, "1");
+ ASSERT_EQUALS(res.shardName, "1");
}
TEST_F(ChunkManagerTargeterTest, TargetUpdateWithRangePrefixHashedShardKey) {
@@ -207,9 +194,8 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithRangePrefixHashedShardKey) {
auto res = cmTargeter.targetUpdate(
operationContext(),
buildUpdate(fromjson("{'a.b': {$gt : 2}}"), fromjson("{a: {b: -1}}"), false));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, "2");
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, "2");
// When update targets using query.
res = cmTargeter.targetUpdate(
@@ -217,47 +203,44 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithRangePrefixHashedShardKey) {
buildUpdate(fromjson("{$and: [{'a.b': {$gte : 0}}, {'a.b': {$lt: 99}}]}}"),
fromjson("{$set: {p : 1}}"),
false));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, "3");
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, "3");
res = cmTargeter.targetUpdate(
operationContext(),
buildUpdate(fromjson("{'a.b': {$lt : -101}}"), fromjson("{a: {b: 111}}"), false));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, "1");
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, "1");
// For op-style updates, query on _id gets targeted to all shards.
res = cmTargeter.targetUpdate(
operationContext(), buildUpdate(fromjson("{_id: 1}"), fromjson("{$set: {p: 111}}"), false));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 5);
+ ASSERT_EQUALS(res.size(), 5);
// For replacement style updates, query on _id uses replacement doc to target. If the
// replacement doc doesn't have shard key fields, then update should be routed to the shard
// holding 'null' shard key documents.
res = cmTargeter.targetUpdate(operationContext(),
buildUpdate(fromjson("{_id: 1}"), fromjson("{p: 111}}"), false));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, "1");
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, "1");
// Upsert requires full shard key in query, even if the query can target a single shard.
- res = cmTargeter.targetUpdate(operationContext(),
- buildUpdate(fromjson("{'a.b': 100, 'c.d' : {$exists: false}}}"),
- fromjson("{a: {b: -111}}"),
- true));
- ASSERT_EQUALS(res.getStatus(), ErrorCodes::ShardKeyNotFound);
+ ASSERT_THROWS_CODE(
+ cmTargeter.targetUpdate(operationContext(),
+ buildUpdate(fromjson("{'a.b': 100, 'c.d' : {$exists: false}}}"),
+ fromjson("{a: {b: -111}}"),
+ true)),
+ DBException,
+ ErrorCodes::ShardKeyNotFound);
// Upsert success case.
res = cmTargeter.targetUpdate(
operationContext(),
buildUpdate(fromjson("{'a.b': 100, 'c.d': 'val'}"), fromjson("{a: {b: -111}}"), true));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, "4");
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, "4");
}
TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) {
@@ -282,10 +265,8 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) {
// 'updateQueryObj'.
const auto res = cmTargeter.targetUpdate(
operationContext(), buildUpdate(updateQueryObj, fromjson("{$set: {p: 1}}"), false));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName,
- findChunk(updateQueryObj["a"]["b"]).getShardId());
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, findChunk(updateQueryObj["a"]["b"]).getShardId());
}
// Range queries on hashed field cannot be used for targeting. In this case, update will be
@@ -293,13 +274,14 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) {
const auto updateObj = fromjson("{a: {b: -1}}");
auto res = cmTargeter.targetUpdate(
operationContext(), buildUpdate(fromjson("{'a.b': {$gt : 101}}"), updateObj, false));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, findChunk(updateObj["a"]["b"]).getShardId());
- res = cmTargeter.targetUpdate(
- operationContext(),
- buildUpdate(fromjson("{'a.b': {$gt : 101}}"), fromjson("{$set: {p: 1}}"), false));
- ASSERT_EQUALS(res.getStatus(), ErrorCodes::InvalidOptions);
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, findChunk(updateObj["a"]["b"]).getShardId());
+ ASSERT_THROWS_CODE(
+ cmTargeter.targetUpdate(
+ operationContext(),
+ buildUpdate(fromjson("{'a.b': {$gt : 101}}"), fromjson("{$set: {p: 1}}"), false)),
+ DBException,
+ ErrorCodes::InvalidOptions);
}
TEST_F(ChunkManagerTargeterTest, TargetDeleteWithRangePrefixHashedShardKey) {
@@ -313,33 +295,32 @@ TEST_F(ChunkManagerTargeterTest, TargetDeleteWithRangePrefixHashedShardKey) {
splitPoints);
// Cannot delete without full shardkey in the query.
- auto res =
- cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 2}}")));
- ASSERT_EQUALS(res.getStatus(), ErrorCodes::ShardKeyNotFound);
- res = cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': -101}")));
- ASSERT_EQUALS(res.getStatus(), ErrorCodes::ShardKeyNotFound);
+ ASSERT_THROWS_CODE(
+ cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 2}}"))),
+ DBException,
+ ErrorCodes::ShardKeyNotFound);
+ ASSERT_THROWS_CODE(
+ cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': -101}"))),
+ DBException,
+ ErrorCodes::ShardKeyNotFound);
// Delete targeted correctly with full shard key in query.
- res = cmTargeter.targetDelete(operationContext(),
- buildDelete(fromjson("{'a.b': -101, 'c.d': 5}")));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, "1");
-
- // Query with MinKey value should go to chunk '0' because MinKey is smaller than
- // BSONNULL.
+ auto res = cmTargeter.targetDelete(operationContext(),
+ buildDelete(fromjson("{'a.b': -101, 'c.d': 5}")));
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, "1");
+
+ // Query with MinKey value should go to chunk '0' because MinKey is smaller than BSONNULL.
res = cmTargeter.targetDelete(
operationContext(),
buildDelete(BSONObjBuilder().appendMinKey("a.b").append("c.d", 4).obj()));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, "0");
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, "0");
res =
cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': 0, 'c.d': 5}")));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, "3");
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, "3");
}
TEST_F(ChunkManagerTargeterTest, TargetDeleteWithHashedPrefixHashedShardKey) {
@@ -362,15 +343,15 @@ TEST_F(ChunkManagerTargeterTest, TargetDeleteWithHashedPrefixHashedShardKey) {
// Verify that the given document is being routed based on hashed value of 'i' in
// 'queryObj'.
const auto res = cmTargeter.targetDelete(operationContext(), buildDelete(queryObj));
- ASSERT_OK(res.getStatus());
- ASSERT_EQUALS(res.getValue().size(), 1);
- ASSERT_EQUALS(res.getValue()[0].shardName, findChunk(queryObj["a"]["b"]).getShardId());
+ ASSERT_EQUALS(res.size(), 1);
+ ASSERT_EQUALS(res[0].shardName, findChunk(queryObj["a"]["b"]).getShardId());
}
// Range queries on hashed field cannot be used for targeting.
- auto res =
- cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 101}}")));
- ASSERT_EQUALS(res.getStatus(), ErrorCodes::ShardKeyNotFound);
+ ASSERT_THROWS_CODE(
+ cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 101}}"))),
+ DBException,
+ ErrorCodes::ShardKeyNotFound);
}
} // namespace
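The test changes above swap ASSERT_EQUALS on a returned Status for ASSERT_THROWS_CODE on the thrown exception. A rough, self-contained equivalent of that assertion pattern is sketched below; CodedError and assertThrowsCode are stand-ins for the real unit-test macros, and the error-code value is illustrative only.

#include <cassert>
#include <functional>
#include <stdexcept>

// Exception carrying an error code, standing in for DBException.
struct CodedError : std::runtime_error {
    CodedError(int code, const char* msg) : std::runtime_error(msg), code(code) {}
    int code;
};

// Free-function equivalent of the ASSERT_THROWS_CODE idea: the callable must
// throw a CodedError with the expected code, otherwise the "test" fails.
void assertThrowsCode(const std::function<void()>& fn, int expectedCode) {
    bool threw = false;
    try {
        fn();
    } catch (const CodedError& ex) {
        threw = true;
        assert(ex.code == expectedCode);
    }
    assert(threw);
}

int main() {
    const int kShardKeyNotFound = 61;  // illustrative code value
    assertThrowsCode(
        [] { throw CodedError(61, "A single delete must contain the shard key"); },
        kShardKeyNotFound);
}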
diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp
index 5d544dcfd9d..f3c4b97f8d6 100644
--- a/src/mongo/s/write_ops/cluster_write.cpp
+++ b/src/mongo/s/write_ops/cluster_write.cpp
@@ -33,73 +33,22 @@
#include "mongo/s/write_ops/cluster_write.h"
-#include <algorithm>
-
-#include "mongo/base/status.h"
-#include "mongo/client/connpool.h"
-#include "mongo/client/dbclient_cursor.h"
#include "mongo/db/lasterror.h"
-#include "mongo/db/write_concern_options.h"
-#include "mongo/s/balancer_configuration.h"
-#include "mongo/s/catalog/type_collection.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/chunk_writes_tracker.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/config_server_client.h"
#include "mongo/s/grid.h"
-#include "mongo/s/shard_util.h"
#include "mongo/s/write_ops/chunk_manager_targeter.h"
-#include "mongo/util/str.h"
namespace mongo {
-namespace {
-
-void toBatchError(const Status& status, BatchedCommandResponse* response) {
- response->clear();
- response->setStatus(status);
- dassert(response->isValid(nullptr));
-}
-
-} // namespace
void ClusterWriter::write(OperationContext* opCtx,
const BatchedCommandRequest& request,
BatchWriteExecStats* stats,
BatchedCommandResponse* response,
boost::optional<OID> targetEpoch) {
- const NamespaceString& nss = request.getNS();
-
LastError::Disabled disableLastError(&LastError::get(opCtx->getClient()));
- // Config writes and shard writes are done differently
- if (nss.db() == NamespaceString::kAdminDb) {
- Grid::get(opCtx)->catalogClient()->writeConfigServerDirect(opCtx, request, response);
- return;
- }
-
- ChunkManagerTargeter targeter(request.getNS(), targetEpoch);
-
- Status targetInitStatus = targeter.init(opCtx);
- if (!targetInitStatus.isOK()) {
- toBatchError(targetInitStatus.withContext(
- str::stream()
- << "unable to initialize targeter for write op for collection "
- << request.getNS().ns()),
- response);
- return;
- }
-
- bool endpointIsConfigServer;
- try {
- endpointIsConfigServer = targeter.endpointIsConfigServer();
- } catch (DBException& ex) {
- toBatchError(ex.toStatus(str::stream()
- << "unable to target write op for collection " << request.getNS()),
- response);
- return;
- }
+ ChunkManagerTargeter targeter(opCtx, request.getNS(), targetEpoch);
- if (endpointIsConfigServer) {
+ if (targeter.endpointIsConfigServer()) {
Grid::get(opCtx)->catalogClient()->writeConfigServerDirect(opCtx, request, response);
return;
}
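ClusterWriter::write no longer calls a separate init() and converts its failure into a batch error; the targeter constructor now performs the routing lookup and throws on failure. Here is a small sketch of that constructor-does-init shape, with RoutingTable and loadRoutingTable as made-up placeholders for the catalog-cache lookup.

#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>

struct RoutingTable {
    std::string primaryShard;
};

// Stand-in for the catalog-cache lookup; an empty namespace simulates "metadata not found".
std::optional<RoutingTable> loadRoutingTable(const std::string& nss) {
    if (nss.empty())
        return std::nullopt;
    return RoutingTable{"shard0"};
}

class Targeter {
public:
    // Pre-patch: Targeter(nss) plus a Status init() every caller had to check.
    // Post-patch: the constructor loads the routing info and throws on failure,
    // so a fully constructed Targeter is always usable.
    explicit Targeter(const std::string& nss) {
        auto routing = loadRoutingTable(nss);
        if (!routing)
            throw std::runtime_error("unable to initialize targeter for collection " + nss);
        _routing = *routing;
    }

    const std::string& primaryShard() const {
        return _routing.primaryShard;
    }

private:
    RoutingTable _routing;
};

int main() {
    try {
        Targeter targeter("test.coll");
        std::cout << "primary shard: " << targeter.primaryShard() << '\n';
        Targeter broken("");  // throws; previously this was a Status turned into a batch error
    } catch (const std::exception& ex) {
        std::cout << "write failed: " << ex.what() << '\n';
    }
}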
diff --git a/src/mongo/s/write_ops/mock_ns_targeter.cpp b/src/mongo/s/write_ops/mock_ns_targeter.cpp
index b23c088f27a..15496780487 100644
--- a/src/mongo/s/write_ops/mock_ns_targeter.cpp
+++ b/src/mongo/s/write_ops/mock_ns_targeter.cpp
@@ -32,14 +32,9 @@
#include "mongo/s/write_ops/mock_ns_targeter.h"
namespace mongo {
+namespace {
-MockNSTargeter::MockNSTargeter(const NamespaceString& nss, std::vector<MockRange> mockRanges)
- : _nss(nss), _mockRanges(std::move(mockRanges)) {
- ASSERT(_nss.isValid());
- ASSERT(!_mockRanges.empty());
-}
-
-ChunkRange MockNSTargeter::_parseRange(const BSONObj& query) {
+ChunkRange parseRange(const BSONObj& query) {
const StringData fieldName(query.firstElement().fieldName());
if (query.firstElement().isNumber()) {
@@ -63,6 +58,29 @@ ChunkRange MockNSTargeter::_parseRange(const BSONObj& query) {
MONGO_UNREACHABLE;
}
+} // namespace
+
+MockNSTargeter::MockNSTargeter(const NamespaceString& nss, std::vector<MockRange> mockRanges)
+ : _nss(nss), _mockRanges(std::move(mockRanges)) {
+ ASSERT(_nss.isValid());
+ ASSERT(!_mockRanges.empty());
+}
+
+std::vector<ShardEndpoint> MockNSTargeter::_targetQuery(const BSONObj& query) const {
+ const ChunkRange queryRange(parseRange(query));
+
+ std::vector<ShardEndpoint> endpoints;
+
+ for (const auto& range : _mockRanges) {
+ if (queryRange.overlapWith(range.range)) {
+ endpoints.push_back(range.endpoint);
+ }
+ }
+
+ uassert(ErrorCodes::UnknownError, "no mock ranges found for query", !endpoints.empty());
+ return endpoints;
+}
+
void assertEndpointsEqual(const ShardEndpoint& endpointA, const ShardEndpoint& endpointB) {
ASSERT_EQUALS(endpointA.shardName, endpointB.shardName);
ASSERT_EQUALS(endpointA.shardVersion.toLong(), endpointB.shardVersion.toLong());
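MockNSTargeter::_targetQuery, now defined out of line, collects every mock range that overlaps the query range and uasserts if none match. Below is a self-contained sketch of that overlap-based targeting using simplified integer ranges; IntRange, Endpoint and MockRange are placeholders for ChunkRange, ShardEndpoint and the mock's range type.

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct IntRange {
    int min;
    int max;  // half-open [min, max)
};

struct Endpoint {
    std::string shardName;
};

struct MockRange {
    IntRange range;
    Endpoint endpoint;
};

bool overlaps(const IntRange& a, const IntRange& b) {
    return a.min < b.max && b.min < a.max;
}

// Every mock range overlapping the query range contributes its endpoint;
// an empty result is treated as a test setup error, like the uassert above.
std::vector<Endpoint> targetQuery(const std::vector<MockRange>& mockRanges,
                                  const IntRange& queryRange) {
    std::vector<Endpoint> endpoints;
    for (const auto& mock : mockRanges) {
        if (overlaps(queryRange, mock.range))
            endpoints.push_back(mock.endpoint);
    }
    if (endpoints.empty())
        throw std::runtime_error("no mock ranges found for query");
    return endpoints;
}

int main() {
    const std::vector<MockRange> ranges = {{{0, 10}, {"shardA"}}, {{10, 20}, {"shardB"}}};
    for (const auto& endpoint : targetQuery(ranges, {5, 15}))
        std::cout << endpoint.shardName << '\n';  // both ranges overlap [5, 15)
}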
diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h
index 1a308e19ce0..ec6fb892834 100644
--- a/src/mongo/s/write_ops/mock_ns_targeter.h
+++ b/src/mongo/s/write_ops/mock_ns_targeter.h
@@ -64,35 +64,31 @@ public:
/**
* Returns a ShardEndpoint for the doc from the mock ranges
*/
- StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx,
- const BSONObj& doc) const override {
- auto swEndpoints = _targetQuery(doc);
- if (!swEndpoints.isOK())
- return swEndpoints.getStatus();
-
- ASSERT_EQ(1U, swEndpoints.getValue().size());
- return swEndpoints.getValue().front();
+ ShardEndpoint targetInsert(OperationContext* opCtx, const BSONObj& doc) const override {
+ auto endpoints = _targetQuery(doc);
+ ASSERT_EQ(1U, endpoints.size());
+ return endpoints.front();
}
/**
     * Returns the first ShardEndpoint for the query from the mock ranges. Can only handle
* queries of the form { field : { $gte : <value>, $lt : <value> } }.
*/
- StatusWith<std::vector<ShardEndpoint>> targetUpdate(
- OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
- return _targetQuery(updateDoc.getQ());
+ std::vector<ShardEndpoint> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const override {
+ return _targetQuery(updateOp.getQ());
}
/**
     * Returns the first ShardEndpoint for the query from the mock ranges. Can only handle
* queries of the form { field : { $gte : <value>, $lt : <value> } }.
*/
- StatusWith<std::vector<ShardEndpoint>> targetDelete(
- OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const override {
- return _targetQuery(deleteDoc.getQ());
+ std::vector<ShardEndpoint> targetDelete(
+ OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const override {
+ return _targetQuery(deleteOp.getQ());
}
- StatusWith<std::vector<ShardEndpoint>> targetAllShards(OperationContext* opCtx) const override {
+ std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const override {
std::vector<ShardEndpoint> endpoints;
for (const auto& range : _mockRanges) {
endpoints.push_back(range.endpoint);
@@ -115,11 +111,10 @@ public:
// No-op
}
- Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override {
+ void refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override {
// No-op
if (wasChanged)
*wasChanged = false;
- return Status::OK();
}
bool endpointIsConfigServer() const override {
@@ -133,28 +128,11 @@ public:
}
private:
- static ChunkRange _parseRange(const BSONObj& query);
-
/**
* Returns the first ShardEndpoint for the query from the mock ranges. Only handles queries of
* the form { field : { $gte : <value>, $lt : <value> } }.
*/
- StatusWith<std::vector<ShardEndpoint>> _targetQuery(const BSONObj& query) const {
- const ChunkRange queryRange(_parseRange(query));
-
- std::vector<ShardEndpoint> endpoints;
-
- for (const auto& range : _mockRanges) {
- if (queryRange.overlapWith(range.range)) {
- endpoints.push_back(range.endpoint);
- }
- }
-
- if (endpoints.empty())
- return {ErrorCodes::UnknownError, "no mock ranges found for query"};
-
- return endpoints;
- }
+ std::vector<ShardEndpoint> _targetQuery(const BSONObj& query) const;
NamespaceString _nss;
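refreshIfNeeded keeps its optional bool* wasChanged out-parameter after losing the Status return, redirecting a null pointer to a local dummy so the body can write unconditionally. A small sketch of that shape follows; refreshRouting is a made-up stand-in for the actual refresh logic.

#include <iostream>

// Stand-in for the actual refresh; pretend the routing metadata changed.
bool refreshRouting() {
    return true;
}

void refreshIfNeeded(bool* wasChanged) {
    bool dummy;
    if (!wasChanged)
        wasChanged = &dummy;  // caller passed nullptr, write somewhere harmless
    *wasChanged = false;

    // Failures along the refresh path now surface as exceptions rather than a
    // Status return; the out-parameter only reports whether metadata changed.
    *wasChanged = refreshRouting();
}

int main() {
    refreshIfNeeded(nullptr);  // result intentionally discarded

    bool changed = false;
    refreshIfNeeded(&changed);
    std::cout << "metadata changed: " << std::boolalpha << changed << '\n';
}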
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index ca2407843d3..c7efb0b3c7b 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -52,23 +52,18 @@ const WriteErrorDetail& WriteOp::getOpError() const {
return *_error;
}
-Status WriteOp::targetWrites(OperationContext* opCtx,
- const NSTargeter& targeter,
- std::vector<TargetedWrite*>* targetedWrites) {
- auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> {
+void WriteOp::targetWrites(OperationContext* opCtx,
+ const NSTargeter& targeter,
+ std::vector<TargetedWrite*>* targetedWrites) {
+ auto endpoints = [&] {
if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) {
- auto swEndpoint = targeter.targetInsert(opCtx, _itemRef.getDocument());
- if (!swEndpoint.isOK())
- return swEndpoint.getStatus();
-
- return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())};
+ return std::vector{targeter.targetInsert(opCtx, _itemRef.getDocument())};
} else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) {
return targeter.targetUpdate(opCtx, _itemRef.getUpdate());
} else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) {
return targeter.targetDelete(opCtx, _itemRef.getDelete());
- } else {
- MONGO_UNREACHABLE;
}
+ MONGO_UNREACHABLE;
}();
// Unless executing as part of a transaction, if we're targeting more than one endpoint with an
@@ -77,40 +72,33 @@ Status WriteOp::targetWrites(OperationContext* opCtx,
// NOTE: Index inserts are currently specially targeted only at the current collection to avoid
// creating collections everywhere.
const bool inTransaction = bool(TransactionRouter::get(opCtx));
- if (swEndpoints.isOK() && swEndpoints.getValue().size() > 1u && !inTransaction) {
- swEndpoints = targeter.targetAllShards(opCtx);
+ if (endpoints.size() > 1u && !inTransaction) {
+ endpoints = targeter.targetAllShards(opCtx);
}
- // If we had an error, stop here
- if (!swEndpoints.isOK())
- return swEndpoints.getStatus();
-
- auto& endpoints = swEndpoints.getValue();
-
for (auto&& endpoint : endpoints) {
- // if the operation was already successfull on that shard, there is no need to repeat the
- // write
- if (!_successfulShardSet.count(endpoint.shardName)) {
- _childOps.emplace_back(this);
-
- WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1);
-
- // Outside of a transaction, multiple endpoints currently imply no versioning, since we
- // can't retry half a regular multi-write.
- if (endpoints.size() > 1u && !inTransaction) {
- endpoint.shardVersion = ChunkVersion::IGNORED();
- endpoint.shardVersion.canThrowSSVOnIgnored();
- }
+ // If the operation was already successful on that shard, do not repeat it
+ if (_successfulShardSet.count(endpoint.shardName))
+ continue;
- targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref));
+ _childOps.emplace_back(this);
- _childOps.back().pendingWrite = targetedWrites->back();
- _childOps.back().state = WriteOpState_Pending;
+ WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1);
+
+ // Outside of a transaction, multiple endpoints currently imply no versioning, since we
+ // can't retry half a regular multi-write.
+ if (endpoints.size() > 1u && !inTransaction) {
+ endpoint.shardVersion = ChunkVersion::IGNORED();
+ endpoint.shardVersion.canThrowSSVOnIgnored();
}
+
+ targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref));
+
+ _childOps.back().pendingWrite = targetedWrites->back();
+ _childOps.back().state = WriteOpState_Pending;
}
_state = WriteOpState_Pending;
- return Status::OK();
}
size_t WriteOp::getNumTargeted() {
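WriteOp::targetWrites now selects the endpoints with an immediately-invoked lambda keyed on the op type and falls back to broadcasting when a non-transactional write targets more than one shard. The sketch below mirrors that control flow under simplified assumptions; OpType, Endpoint and the target* functions are invented placeholders.

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

enum class OpType { kInsert, kUpdate, kDelete };

struct Endpoint {
    std::string shardName;
};

Endpoint targetInsert() { return {"shard0"}; }
std::vector<Endpoint> targetUpdate() { return {{"shard1"}, {"shard2"}}; }
std::vector<Endpoint> targetDelete() { return {{"shard1"}}; }
std::vector<Endpoint> targetAllShards() { return {{"shard0"}, {"shard1"}, {"shard2"}}; }

std::vector<Endpoint> chooseEndpoints(OpType opType, bool inTransaction) {
    // Immediately-invoked lambda keyed on the op type, as in targetWrites.
    auto endpoints = [&] {
        switch (opType) {
            case OpType::kInsert:
                return std::vector<Endpoint>{targetInsert()};
            case OpType::kUpdate:
                return targetUpdate();
            case OpType::kDelete:
                return targetDelete();
        }
        throw std::logic_error("unknown op type");
    }();

    // Outside of a transaction, a write targeting more than one shard is broadcast.
    if (endpoints.size() > 1u && !inTransaction)
        endpoints = targetAllShards();

    return endpoints;
}

int main() {
    const auto endpoints = chooseEndpoints(OpType::kUpdate, /*inTransaction=*/false);
    std::cout << endpoints.size() << " endpoints targeted\n";  // broadcast to all 3 shards
}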
diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h
index ca959dc783f..138b466a556 100644
--- a/src/mongo/s/write_ops/write_op.h
+++ b/src/mongo/s/write_ops/write_op.h
@@ -119,9 +119,9 @@ public:
     * Throws if the targeting process itself fails
* (no TargetedWrites will be added, state unchanged)
*/
- Status targetWrites(OperationContext* opCtx,
- const NSTargeter& targeter,
- std::vector<TargetedWrite*>* targetedWrites);
+ void targetWrites(OperationContext* opCtx,
+ const NSTargeter& targeter,
+ std::vector<TargetedWrite*>* targetedWrites);
/**
* Returns the number of child writes that were last targeted.
diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp
index fee63d63f09..193ab8fe943 100644
--- a/src/mongo/s/write_ops/write_op_test.cpp
+++ b/src/mongo/s/write_ops/write_op_test.cpp
@@ -113,9 +113,7 @@ TEST_F(WriteOpTest, TargetSingle) {
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(_opCtx, targeter, &targeted);
-
- ASSERT(status.isOK());
+ writeOp.targetWrites(_opCtx, targeter, &targeted);
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
ASSERT_EQUALS(targeted.size(), 1u);
assertEndpointsEqual(targeted.front()->endpoint, endpoint);
@@ -147,9 +145,7 @@ TEST_F(WriteOpTest, TargetMultiOneShard) {
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(_opCtx, targeter, &targeted);
-
- ASSERT(status.isOK());
+ writeOp.targetWrites(_opCtx, targeter, &targeted);
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
ASSERT_EQUALS(targeted.size(), 1u);
assertEndpointsEqual(targeted.front()->endpoint, endpointA);
@@ -182,9 +178,7 @@ TEST_F(WriteOpTest, TargetMultiAllShards) {
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(_opCtx, targeter, &targeted);
-
- ASSERT(status.isOK());
+ writeOp.targetWrites(_opCtx, targeter, &targeted);
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
ASSERT_EQUALS(targeted.size(), 3u);
sortByEndpoint(&targeted);
@@ -222,9 +216,7 @@ TEST_F(WriteOpTest, TargetMultiAllShardsAndErrorSingleChildOp) {
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(_opCtx, targeter, &targeted);
-
- ASSERT(status.isOK());
+ writeOp.targetWrites(_opCtx, targeter, &targeted);
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
ASSERT_EQUALS(targeted.size(), 2u);
sortByEndpoint(&targeted);
@@ -266,9 +258,7 @@ TEST_F(WriteOpTest, ErrorSingle) {
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(_opCtx, targeter, &targeted);
-
- ASSERT(status.isOK());
+ writeOp.targetWrites(_opCtx, targeter, &targeted);
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
ASSERT_EQUALS(targeted.size(), 1u);
assertEndpointsEqual(targeted.front()->endpoint, endpoint);
@@ -302,9 +292,7 @@ TEST_F(WriteOpTest, CancelSingle) {
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(_opCtx, targeter, &targeted);
-
- ASSERT(status.isOK());
+ writeOp.targetWrites(_opCtx, targeter, &targeted);
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
ASSERT_EQUALS(targeted.size(), 1u);
assertEndpointsEqual(targeted.front()->endpoint, endpoint);
@@ -337,9 +325,7 @@ TEST_F(WriteOpTest, RetrySingleOp) {
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(_opCtx, targeter, &targeted);
-
- ASSERT(status.isOK());
+ writeOp.targetWrites(_opCtx, targeter, &targeted);
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
ASSERT_EQUALS(targeted.size(), 1u);
assertEndpointsEqual(targeted.front()->endpoint, endpoint);
@@ -382,10 +368,9 @@ TEST_F(WriteOpTransactionTest, TargetMultiDoesNotTargetAllShards) {
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(_opCtx, targeter, &targeted);
+ writeOp.targetWrites(_opCtx, targeter, &targeted);
// The write should only target shardA and shardB and send real shard versions to each.
- ASSERT(status.isOK());
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
ASSERT_EQUALS(targeted.size(), 2u);
sortByEndpoint(&targeted);
@@ -425,9 +410,7 @@ TEST_F(WriteOpTransactionTest, TargetMultiAllShardsAndErrorSingleChildOp) {
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
- Status status = writeOp.targetWrites(_opCtx, targeter, &targeted);
-
- ASSERT(status.isOK());
+ writeOp.targetWrites(_opCtx, targeter, &targeted);
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
ASSERT_EQUALS(targeted.size(), 2u);
sortByEndpoint(&targeted);