diff options
19 files changed, 550 insertions, 765 deletions
diff --git a/jstests/sharding/aggregates_during_balancing.js b/jstests/sharding/aggregates_during_balancing.js index 06db4cb6955..4b613e07d53 100644 --- a/jstests/sharding/aggregates_during_balancing.js +++ b/jstests/sharding/aggregates_during_balancing.js @@ -1,116 +1,119 @@ // Inserts some interesting data into a sharded collection, enables the balancer, and tests that // various kinds of aggregations return the expected results. (function() { +'use strict'; + load('jstests/aggregation/extras/utils.js'); -var shardedAggTest = - new ShardingTest({shards: 2, mongos: 1, other: {chunkSize: 1, enableBalancer: true}}); +const shardedAggTest = new ShardingTest({shards: 2, mongos: 1}); -shardedAggTest.adminCommand({enablesharding: "aggShard"}); -db = shardedAggTest.getDB("aggShard"); +assert.commandWorked(shardedAggTest.s0.adminCommand({enablesharding: "aggShard"})); shardedAggTest.ensurePrimaryShard('aggShard', shardedAggTest.shard0.shardName); -db.ts1.drop(); -db.literal.drop(); - -shardedAggTest.adminCommand({shardcollection: "aggShard.ts1", key: {"_id": 1}}); -shardedAggTest.adminCommand({shardcollection: "aggShard.literal", key: {"_id": 1}}); +const database = shardedAggTest.getDB("aggShard"); -/* -Test combining results in mongos for operations that sub-aggregate on shards. +assert.commandWorked( + shardedAggTest.s0.adminCommand({shardcollection: "aggShard.ts1", key: {"_id": 1}})); -The unusual operators here are $avg, $pushToSet, $push. In the case of $avg, -the shard pipeline produces an object with the current subtotal and item count -so that these can be combined in mongos by totalling the subtotals counts -before performing the final division. For $pushToSet and $push, the shard -pipelines produce arrays, but in mongos these are combined rather than simply -being added as arrays within arrays. -*/ +// Test combining results in mongos for operations that sub-aggregate on shards. +// +// The unusual operators here are $avg, $pushToSet, $push. In the case of $avg, the shard pipeline +// produces an object with the current subtotal and item count so that these can be combined in +// mongos by totalling the subtotals counts before performing the final division. For $pushToSet and +// $push, the shard pipelines produce arrays, but in mongos these are combined rather than simply +// being added as arrays within arrays. -var count = 0; -var strings = [ +jsTestLog("Bulk inserting test data"); +const strings = [ "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve", "thirteen", "fourteen", "fifteen", "sixteen", "seventeen", "eighteen", "nineteen", "twenty" ]; -jsTestLog("Bulk inserting data"); -var nItems = 200000; -var bulk = db.ts1.initializeUnorderedBulkOp(); -for (i = 0; i < nItems; ++i) { - bulk.insert({ - _id: i, - counter: ++count, - number: strings[i % 20], - random: Math.random(), - filler: "0123456789012345678901234567890123456789" - }); +const nItems = 200000; +var bulk = database.ts1.initializeUnorderedBulkOp(); +for (var i = 0; i < nItems; ++i) { + bulk.insert({_id: i, counter: i + 1, number: strings[i % 20], random: Math.random()}); + + // Generate one chunk for each 1000 documents so the balancer is kept busy throughout the + // execution of the test + if (i % 1000 == 0) { + assert.commandWorked(shardedAggTest.splitAt("aggShard.ts1", {_id: i})); + } } assert.commandWorked(bulk.execute()); -jsTestLog('a project and group in shards, result combined in mongos'); -var a1 = db.ts1 - .aggregate([ - {$project: {cMod10: {$mod: ["$counter", 10]}, number: 1, counter: 1}}, - { - $group: { - _id: "$cMod10", - numberSet: {$addToSet: "$number"}, - avgCounter: {$avg: "$cMod10"} - } - }, - {$sort: {_id: 1}} - ]) - .toArray(); - -for (i = 0; i < 10; ++i) { - assert.eq(a1[i].avgCounter, a1[i]._id, 'agg sharded test avgCounter failed'); - assert.eq(a1[i].numberSet.length, 2, 'agg sharded test numberSet length failed'); -} +jsTestLog("Bulk insert completed, starting balancer"); +shardedAggTest.startBalancer(); +shardedAggTest.awaitBalancerRound(); + +(function testProjectAndGroup() { + jsTestLog('Testing project and group in shards, result combined in mongos'); + var a1 = database.ts1 + .aggregate([ + {$project: {cMod10: {$mod: ["$counter", 10]}, number: 1, counter: 1}}, + { + $group: { + _id: "$cMod10", + numberSet: {$addToSet: "$number"}, + avgCounter: {$avg: "$cMod10"} + } + }, + {$sort: {_id: 1}} + ]) + .toArray(); + assert.eq(a1.length, 10, tojson(a1)); + for (var i = 0; i < 10; ++i) { + assert.eq(a1[i].avgCounter, a1[i]._id, 'agg sharded test avgCounter failed'); + assert.eq(a1[i].numberSet.length, 2, 'agg sharded test numberSet length failed'); + } -jsTestLog('an initial group starts the group in the shards, and combines them in mongos'); -var a2 = db.ts1.aggregate([{$group: {_id: "all", total: {$sum: "$counter"}}}]).toArray(); + jsTestLog('an initial group starts the group in the shards, and combines them in mongos'); + var a2 = database.ts1.aggregate([{$group: {_id: "all", total: {$sum: "$counter"}}}]).toArray(); -jsTestLog('sum of an arithmetic progression S(n) = (n/2)(a(1) + a(n));'); -assert.eq(a2[0].total, (nItems / 2) * (1 + nItems), 'agg sharded test counter sum failed'); + jsTestLog('sum of an arithmetic progression S(n) = (n/2)(a(1) + a(n));'); + assert.eq(a2[0].total, (nItems / 2) * (1 + nItems), 'agg sharded test counter sum failed'); -jsTestLog('A group combining all documents into one, averaging a null field.'); -assert.eq(db.ts1.aggregate([{$group: {_id: null, avg: {$avg: "$missing"}}}]).toArray(), - [{_id: null, avg: null}]); + jsTestLog('A group combining all documents into one, averaging a null field.'); + assert.eq(database.ts1.aggregate([{$group: {_id: null, avg: {$avg: "$missing"}}}]).toArray(), + [{_id: null, avg: null}]); -jsTestLog('an initial group starts the group in the shards, and combines them in mongos'); -var a3 = - db.ts1.aggregate([{$group: {_id: "$number", total: {$sum: 1}}}, {$sort: {_id: 1}}]).toArray(); + jsTestLog('an initial group starts the group in the shards, and combines them in mongos'); + var a3 = + database.ts1.aggregate([{$group: {_id: "$number", total: {$sum: 1}}}, {$sort: {_id: 1}}]) + .toArray(); -for (i = 0; i < strings.length; ++i) { - assert.eq(a3[i].total, nItems / strings.length, 'agg sharded test sum numbers failed'); -} + for (var i = 0; i < strings.length; ++i) { + assert.eq(a3[i].total, nItems / strings.length, 'agg sharded test sum numbers failed'); + } -jsTestLog('a match takes place in the shards; just returning the results from mongos'); -var a4 = db.ts1 - .aggregate([{ - $match: { - $or: [ - {counter: 55}, - {counter: 1111}, - {counter: 2222}, - {counter: 33333}, - {counter: 99999}, - {counter: 55555} - ] - } - }]) - .toArray(); -assert.eq(a4.length, 6, tojson(a4)); -for (i = 0; i < 6; ++i) { - c = a4[i].counter; - printjson({c: c}); - assert((c == 55) || (c == 1111) || (c == 2222) || (c == 33333) || (c == 99999) || (c == 55555), - 'agg sharded test simple match failed'); -} + jsTestLog('a match takes place in the shards; just returning the results from mongos'); + var a4 = database.ts1 + .aggregate([{ + $match: { + $or: [ + {counter: 55}, + {counter: 1111}, + {counter: 2222}, + {counter: 33333}, + {counter: 99999}, + {counter: 55555} + ] + } + }]) + .toArray(); + assert.eq(a4.length, 6, tojson(a4)); + for (var i = 0; i < 6; ++i) { + var c = a4[i].counter; + printjson({c: c}); + assert( + (c == 55) || (c == 1111) || (c == 2222) || (c == 33333) || (c == 99999) || (c == 55555), + 'agg sharded test simple match failed'); + } +}()); function testSkipLimit(ops, expectedCount) { - jsTestLog('testSkipLimit(' + tojson(ops) + ', ' + expectedCount + ')'); + jsTestLog('Testing $skip with $limit: ' + tojson(ops) + ', ' + expectedCount); if (expectedCount > 10) { // make shard -> mongos intermediate results less than 16MB ops.unshift({$project: {_id: 1}}); @@ -118,10 +121,9 @@ function testSkipLimit(ops, expectedCount) { ops.push({$group: {_id: 1, count: {$sum: 1}}}); - var out = db.ts1.aggregate(ops).toArray(); + var out = database.ts1.aggregate(ops).toArray(); assert.eq(out[0].count, expectedCount); } - testSkipLimit([], nItems); // control testSkipLimit([{$skip: 10}], nItems - 10); testSkipLimit([{$limit: 10}], 10); @@ -133,11 +135,11 @@ testSkipLimit([{$limit: 10}, {$skip: 5}, {$skip: 3}], 10 - 3 - 5); // test sort + limit (using random to pull from both shards) function testSortLimit(limit, direction) { - jsTestLog('testSortLimit(' + limit + ', ' + direction + ')'); + jsTestLog('Testing $sort with $limit: ' + limit + ', ' + direction); var from_cursor = - db.ts1.find({}, {random: 1, _id: 0}).sort({random: direction}).limit(limit).toArray(); + database.ts1.find({}, {random: 1, _id: 0}).sort({random: direction}).limit(limit).toArray(); var from_agg = - db.ts1 + database.ts1 .aggregate( [{$project: {random: 1, _id: 0}}, {$sort: {random: direction}}, {$limit: limit}]) .toArray(); @@ -150,11 +152,11 @@ testSortLimit(10, -1); testSortLimit(100, 1); testSortLimit(100, -1); -function testAvgStdDev() { - jsTestLog('testing $avg and $stdDevPop in sharded $group'); +(function testAvgStdDev() { + jsTestLog('Testing $avg and $stdDevPop in sharded $group'); // $stdDevPop can vary slightly between runs if a migration occurs. This is why we use // assert.close below. - var res = db.ts1 + var res = database.ts1 .aggregate([{ $group: { _id: null, @@ -170,83 +172,82 @@ function testAvgStdDev() { // http://en.wikipedia.org/wiki/Arithmetic_progression#Standard_deviation var stdDev = Math.sqrt(((nItems - 1) * (nItems + 1)) / 12); assert.close(res[0].stdDevPop, stdDev, '', 10 /*decimal places*/); -} -testAvgStdDev(); - -function testSample() { - jsTestLog('testing $sample'); - [0, 1, 10, nItems, nItems + 1].forEach(function(size) { - // TODO: SERVER-29446 Remove this try catch block after completing SERVER-29446. - let res = {}; - try { - res = db.ts1.aggregate([{$sample: {size: size}}]).toArray(); - } catch (e) { - assert.eq(e.code, 28799, e); - return; - } - assert.eq(res.length, Math.min(nItems, size)); - }); -} - -testSample(); - -jsTestLog('test $out by copying source collection verbatim to output'); -var outCollection = db.ts1_out; -var res = db.ts1.aggregate([{$out: outCollection.getName()}]).toArray(); -assert.eq(db.ts1.find().itcount(), outCollection.find().itcount()); -assert.eq(db.ts1.find().sort({_id: 1}).toArray(), outCollection.find().sort({_id: 1}).toArray()); - -// Make sure we error out if $out collection is sharded -assert.commandFailed( - db.runCommand({aggregate: outCollection.getName(), pipeline: [{$out: db.ts1.getName()}]})); - -assert.commandWorked(db.literal.save({dollar: false})); - -result = - db.literal - .aggregate([{ - $project: {_id: 0, cost: {$cond: ['$dollar', {$literal: '$1.00'}, {$literal: '$.99'}]}} - }]) - .toArray(); - -assert.eq([{cost: '$.99'}], result); +}()); -(function() { -jsTestLog('Testing a $match stage on the shard key.'); - -var outCollection = 'testShardKeyMatchOut'; - -// Point query. -var targetId = Math.floor(nItems * Math.random()); -var pipeline = [{$match: {_id: targetId}}, {$project: {_id: 1}}, {$sort: {_id: 1}}]; -var expectedDocs = [{_id: targetId}]; -// Normal pipeline. -assert.eq(db.ts1.aggregate(pipeline).toArray(), expectedDocs); -// With $out. -db[outCollection].drop(); -pipeline.push({$out: outCollection}); -db.ts1.aggregate(pipeline); -assert.eq(db[outCollection].find().toArray(), expectedDocs); - -// Range query. -var range = 500; -var targetStart = Math.floor((nItems - range) * Math.random()); -pipeline = [ - {$match: {_id: {$gte: targetStart, $lt: targetStart + range}}}, - {$project: {_id: 1}}, - {$sort: {_id: 1}} -]; -expectedDocs = []; -for (var i = targetStart; i < targetStart + range; i++) { - expectedDocs.push({_id: i}); +function testSample(size) { + jsTestLog('Testing $sample of size ' + size); + let result = database.ts1.aggregate([{$sample: {size: size}}]).toArray(); + assert.eq(result.length, Math.min(nItems, size)); } -// Normal pipeline. -assert.eq(db.ts1.aggregate(pipeline).toArray(), expectedDocs); -// With $out. -db[outCollection].drop(); -pipeline.push({$out: outCollection}); -db.ts1.aggregate(pipeline); -assert.eq(db[outCollection].find().toArray(), expectedDocs); +testSample(0); +testSample(1); +testSample(10); +testSample(nItems); +testSample(nItems + 1); + +(function testOutWithCopy() { + jsTestLog('Testing $out by copying source collection verbatim to output'); + assert.commandWorked( + shardedAggTest.s0.adminCommand({shardcollection: "aggShard.literal", key: {"_id": 1}})); + + var outCollection = database.ts1_out; + assert.eq(database.ts1.aggregate([{$out: outCollection.getName()}]).toArray(), ""); + assert.eq(database.ts1.find().itcount(), outCollection.find().itcount()); + assert.eq(database.ts1.find().sort({_id: 1}).toArray(), + outCollection.find().sort({_id: 1}).toArray()); + + // Make sure we error out if $out collection is sharded + assert.commandFailed(database.runCommand( + {aggregate: outCollection.getName(), pipeline: [{$out: database.ts1.getName()}]})); + + assert.commandWorked(database.literal.save({dollar: false})); + + let result = + database.literal + .aggregate([{ + $project: + {_id: 0, cost: {$cond: ['$dollar', {$literal: '$1.00'}, {$literal: '$.99'}]}} + }]) + .toArray(); + assert.eq([{cost: '$.99'}], result); +}()); + +(function testMatch() { + jsTestLog('Testing a $match stage on the shard key.'); + + var outCollection = 'testShardKeyMatchOut'; + + // Point query. + var targetId = Math.floor(nItems * Math.random()); + var pipeline = [{$match: {_id: targetId}}, {$project: {_id: 1}}, {$sort: {_id: 1}}]; + var expectedDocs = [{_id: targetId}]; + // Normal pipeline. + assert.eq(database.ts1.aggregate(pipeline).toArray(), expectedDocs); + // With $out. + database[outCollection].drop(); + pipeline.push({$out: outCollection}); + database.ts1.aggregate(pipeline); + assert.eq(database[outCollection].find().toArray(), expectedDocs); + + // Range query. + var range = 500; + var targetStart = Math.floor((nItems - range) * Math.random()); + pipeline = [ + {$match: {_id: {$gte: targetStart, $lt: targetStart + range}}}, + {$project: {_id: 1}}, + {$sort: {_id: 1}} + ]; + expectedDocs = []; + for (var i = targetStart; i < targetStart + range; i++) { + expectedDocs.push({_id: i}); + } + // Normal pipeline. + assert.eq(database.ts1.aggregate(pipeline).toArray(), expectedDocs); + // With $out. + database[outCollection].drop(); + pipeline.push({$out: outCollection}); + database.ts1.aggregate(pipeline); + assert.eq(database[outCollection].find().toArray(), expectedDocs); }()); shardedAggTest.stop(); diff --git a/jstests/sharding/merge_stale_on_fields.js b/jstests/sharding/merge_stale_on_fields.js index 87a48a0482c..59066b5b59d 100644 --- a/jstests/sharding/merge_stale_on_fields.js +++ b/jstests/sharding/merge_stale_on_fields.js @@ -8,42 +8,41 @@ load("jstests/aggregation/extras/merge_helpers.js"); // For withEachMergeMode, const st = new ShardingTest({shards: 2, mongos: 2}); const dbName = "merge_stale_unique_key"; -assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); +assert.commandWorked(st.s0.adminCommand({enableSharding: dbName})); const source = st.s0.getDB(dbName).source; const target = st.s0.getDB(dbName).target; +st.shardColl(source, {_id: 1} /* shardKey */, {_id: 0} /* splitAt */, {_id: 1} /* chunkToMove*/); +assert.commandWorked(source.insert({_id: 'seed'})); + // Test that an $merge through a stale mongos can still use the correct "on" fields and succeed. (function testDefaultOnFieldsIsRecent() { const freshMongos = st.s0; const staleMongos = st.s1; - - // Set up two collections for an aggregate with an $merge: The source collection will be - // unsharded and the target collection will be sharded amongst the two shards. const staleMongosDB = staleMongos.getDB(dbName); - st.shardColl(source, {_id: 1}, {_id: 0}, {_id: 1}); (function setupStaleMongos() { - // Shard the collection through 'staleMongos', setting up 'staleMongos' to believe the - // collection is sharded by {sk: 1, _id: 1}. + // Shard the collection through 'staleMongos', setting it up to believe the collection is + // sharded by {sk: 1, _id: 1}. assert.commandWorked(staleMongosDB.adminCommand( {shardCollection: target.getFullName(), key: {sk: 1, _id: 1}})); // Perform a query through that mongos to ensure the cache is populated. assert.eq(0, staleMongosDB[target.getName()].find().itcount()); - // Drop the collection from the other mongos - it is no longer sharded but the stale - // mongos doesn't know that yet. + // Drop the collection from the other mongos - it is no longer sharded but the stale mongos + // doesn't know that yet. target.drop(); }()); - // At this point 'staleMongos' will believe that the target collection is sharded. This - // should not prevent it from running an $merge without "on" fields specified. + // At this point 'staleMongos' will believe that the target collection is sharded. This should + // not prevent it from running an $merge without "on" fields specified. + // // Specifically, the mongos should force a refresh of its cache before defaulting the "on" // fields. - assert.commandWorked(source.insert({_id: 'seed'})); - // If we had used the stale "on" fields, this aggregation would fail since the documents do - // not have an 'sk' field. + // If we had used the stale "on" fields, this aggregation would fail since the documents do not + // have an 'sk' field. assert.doesNotThrow( () => staleMongosDB[source.getName()].aggregate( [{$merge: {into: target.getName(), whenMatched: 'fail', whenNotMatched: 'insert'}}])); @@ -51,11 +50,11 @@ const target = st.s0.getDB(dbName).target; target.drop(); }()); -// Test that if the collection is dropped and re-sharded during the course of the aggregation -// that the operation will fail rather than proceed with the old shard key. +// Test that if the collection is dropped and re-sharded during the course of the aggregation that +// the operation will fail rather than proceed with the old shard key. function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) { - // Converts a single string or an array of strings into it's object spec form. For instance, - // for input ["a", "b"] the returned object would be {a: 1, b: 1}. + // Converts a single string or an array of strings into it's object spec form. For instance, for + // input ["a", "b"] the returned object would be {a: 1, b: 1}. function indexSpecFromOnFields(onFields) { let spec = {}; if (typeof (onFields) == "string") { @@ -68,15 +67,16 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) { return spec; } + // Drop the collection and reshard it with a different shard key target.drop(); if (mergeSpec.hasOwnProperty('on')) { assert.commandWorked( target.createIndex(indexSpecFromOnFields(mergeSpec.on), {unique: true})); - assert.commandWorked(st.s.adminCommand( + assert.commandWorked(st.s0.adminCommand( {shardCollection: target.getFullName(), key: indexSpecFromOnFields(mergeSpec.on)})); } else { assert.commandWorked( - st.s.adminCommand({shardCollection: target.getFullName(), key: {sk: 1, _id: 1}})); + st.s0.adminCommand({shardCollection: target.getFullName(), key: {sk: 1, _id: 1}})); } // Use a failpoint to make the query feeding into the aggregate hang while we drop the @@ -85,6 +85,7 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) { assert.commandWorked(mongod.adminCommand( {configureFailPoint: failpoint, mode: "alwaysOn", data: failpointData || {}})); }); + let parallelShellJoiner; try { let parallelCode = ` @@ -97,12 +98,12 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) { `; if (mergeSpec.hasOwnProperty("on")) { - // If a user specifies their own "on" fields, we don't need to fail an aggregation - // if the collection is dropped and recreated or the epoch otherwise changes. We are - // allowed to fail such an operation should we choose to in the future, but for now - // we don't expect to because we do not do anything special on mongos to ensure the - // catalog cache is up to date, so do not want to attach mongos's believed epoch to - // the command for the shards. + // If a user specifies their own "on" fields, we don't need to fail an aggregation if + // the collection is dropped and recreated or the epoch otherwise changes. We are + // allowed to fail such an operation should we choose to in the future, but for now we + // don't expect to because we do not do anything special on mongos to ensure the catalog + // cache is up to date, so do not want to attach mongos's believed epoch to the command + // for the shards. parallelCode = ` const source = db.getSiblingDB("${dbName}").${source.getName()}; assert.doesNotThrow(() => source.aggregate([ @@ -112,12 +113,12 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) { `; } - parallelShellJoiner = startParallelShell(parallelCode, st.s.port); + parallelShellJoiner = startParallelShell(parallelCode, st.s0.port); - // Wait for the merging $merge to appear in the currentOp output from the shards. We - // should see that the $merge stage has an 'epoch' field serialized from the mongos. + // Wait for the merging $merge to appear in the currentOp output from the shards. We should + // see that the $merge stage has an 'epoch' field serialized from the mongos. const getAggOps = function() { - return st.s.getDB("admin") + return st.s0.getDB("admin") .aggregate([ {$currentOp: {}}, {$match: {"cursor.originatingCommand.pipeline": {$exists: true}}} @@ -135,7 +136,7 @@ function testEpochChangeDuringAgg({mergeSpec, failpoint, failpointData}) { }; assert.soon(hasMergeRunning, () => tojson(getAggOps())); - // Drop the collection so that the epoch changes. + // Drop the collection so that the epoch changes while the merge operation is executing target.drop(); } finally { [st.rs0.getPrimary(), st.rs1.getPrimary()].forEach((mongod) => { @@ -153,9 +154,8 @@ for (let i = 0; i < 1000; ++i) { assert.commandWorked(bulk.execute()); withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => { - // Skip the combination of merge modes which will fail depending on the contents of the - // source and target collection, as this will cause a different assertion error from the one - // expected. + // Skip the combination of merge modes which will fail depending on the contents of the source + // and target collection, as this will cause a different assertion error from the one expected. if (whenNotMatchedMode == "fail") return; @@ -179,8 +179,9 @@ withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => { failpointData: {namespace: source.getFullName()} }); }); -// Test with some different failpoints to prove we will detect an epoch change in the middle -// of the inserts or updates. + +// Test with some different failpoints to prove we will detect an epoch change in the middle of the +// inserts or updates. testEpochChangeDuringAgg({ mergeSpec: {into: target.getName(), whenMatched: "fail", whenNotMatched: "insert"}, failpoint: "hangDuringBatchInsert", diff --git a/jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js b/jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js index 700d47990f8..558045c328d 100644 --- a/jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js +++ b/jstests/sharding/stale_mongos_and_restarted_shards_agree_on_shard_version.js @@ -12,11 +12,11 @@ // TODO (SERVER-32198) remove after SERVER-32198 is fixed TestData.skipCheckOrphans = true; -var st = new ShardingTest({shards: 2, mongos: 2}); +const st = new ShardingTest({shards: 2, mongos: 2}); // Used to get the shard destination ids for the moveChunks commands -var shard0Name = st.shard0.shardName; -var shard1Name = st.shard1.shardName; +const shard0Name = st.shard0.shardName; +const shard1Name = st.shard1.shardName; var database = 'TestDB'; st.enableSharding(database); diff --git a/src/mongo/db/exec/shard_filterer_impl.cpp b/src/mongo/db/exec/shard_filterer_impl.cpp index dbe4c057c7d..131cc8b40f4 100644 --- a/src/mongo/db/exec/shard_filterer_impl.cpp +++ b/src/mongo/db/exec/shard_filterer_impl.cpp @@ -29,11 +29,10 @@ #include "mongo/platform/basic.h" -#include "mongo/db/exec/filter.h" #include "mongo/db/exec/shard_filterer_impl.h" +#include "mongo/db/exec/filter.h" #include "mongo/db/matcher/matchable.h" -#include "mongo/db/s/scoped_collection_metadata.h" namespace mongo { diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 0a32967e3fe..188f8de3551 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -42,7 +42,6 @@ #include "mongo/db/query/collation/collation_spec.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/repl/read_concern_args.h" -#include "mongo/db/s/scoped_collection_metadata.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/redaction.h" #include "mongo/s/catalog_cache.h" diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 06cfbd0aaea..7154b659dc2 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -359,39 +359,29 @@ private: * * Does *not* retry or retarget if the metadata is stale. */ - static Status _commandOpWrite(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& command, - BatchItemRef targetingBatchItem, - std::vector<Strategy::CommandResult>* results) { - // Note that this implementation will not handle targeting retries and does not completely - // emulate write behavior - ChunkManagerTargeter targeter(nss); - Status status = targeter.init(opCtx); - if (!status.isOK()) - return status; - - auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> { + static void _commandOpWrite(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& command, + BatchItemRef targetingBatchItem, + std::vector<Strategy::CommandResult>* results) { + auto endpoints = [&] { + // Note that this implementation will not handle targeting retries and does not + // completely emulate write behavior + ChunkManagerTargeter targeter(opCtx, nss); + if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Insert) { - auto swEndpoint = targeter.targetInsert(opCtx, targetingBatchItem.getDocument()); - if (!swEndpoint.isOK()) - return swEndpoint.getStatus(); - return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())}; + return std::vector{targeter.targetInsert(opCtx, targetingBatchItem.getDocument())}; } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Update) { return targeter.targetUpdate(opCtx, targetingBatchItem.getUpdate()); } else if (targetingBatchItem.getOpType() == BatchedCommandRequest::BatchType_Delete) { return targeter.targetDelete(opCtx, targetingBatchItem.getDelete()); - } else { - MONGO_UNREACHABLE; } + MONGO_UNREACHABLE; }(); - if (!swEndpoints.isOK()) - return swEndpoints.getStatus(); - // Assemble requests std::vector<AsyncRequestsSender::Request> requests; - for (const auto& endpoint : swEndpoints.getValue()) { + for (const auto& endpoint : endpoints) { BSONObj cmdObjWithVersions = BSONObj(command); if (endpoint.databaseVersion) { cmdObjWithVersions = @@ -412,17 +402,10 @@ private: readPref, Shard::RetryPolicy::kNoRetry); - // Receive the responses. - - Status dispatchStatus = Status::OK(); while (!ars.done()) { // Block until a response is available. auto response = ars.next(); - - if (!response.swResponse.isOK()) { - dispatchStatus = std::move(response.swResponse.getStatus()); - break; - } + uassertStatusOK(response.swResponse); Strategy::CommandResult result; @@ -435,8 +418,6 @@ private: results->push_back(result); } - - return dispatchStatus; } }; @@ -596,8 +577,8 @@ private: // Target the command to the shards based on the singleton batch item. BatchItemRef targetingBatchItem(&_batchedRequest, 0); std::vector<Strategy::CommandResult> shardResults; - uassertStatusOK(_commandOpWrite( - opCtx, _batchedRequest.getNS(), explainCmd, targetingBatchItem, &shardResults)); + _commandOpWrite( + opCtx, _batchedRequest.getNS(), explainCmd, targetingBatchItem, &shardResults); auto bodyBuilder = result->getBodyBuilder(); uassertStatusOK(ClusterExplain::buildExplainResult( opCtx, shardResults, ClusterExplain::kWriteOnShards, timer.millis(), &bodyBuilder)); diff --git a/src/mongo/s/ns_targeter.h b/src/mongo/s/ns_targeter.h index c3f86a4ea29..36ce6269680 100644 --- a/src/mongo/s/ns_targeter.h +++ b/src/mongo/s/ns_targeter.h @@ -67,7 +67,7 @@ struct ShardEndpoint { * 1a. On targeting failure we may need to refresh, note that it happened. * 1b. On stale config from a child write operation we may need to refresh, note the error. * - * 2. RefreshIfNeeded() to get newer targeting information + * 2. refreshIfNeeded() to get newer targeting information * * 3. Goto 0. * @@ -78,8 +78,7 @@ struct ShardEndpoint { * Implementers are free to define more specific targeting error codes to allow more complex * error handling. * - * Interface must be externally synchronized if used in multiple threads, for now. - * TODO: Determine if we should internally synchronize. + * The interface must not be used from multiple threads. */ class NSTargeter { public: @@ -91,36 +90,29 @@ public: virtual const NamespaceString& getNS() const = 0; /** - * Returns a ShardEndpoint for a single document write. - * - * Returns !OK with message if document could not be targeted for other reasons. + * Returns a ShardEndpoint for a single document write or throws ShardKeyNotFound if 'doc' is + * malformed with respect to the shard key pattern of the collection. */ - virtual StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx, - const BSONObj& doc) const = 0; + virtual ShardEndpoint targetInsert(OperationContext* opCtx, const BSONObj& doc) const = 0; /** - * Returns a vector of ShardEndpoints for a potentially multi-shard update. - * - * Returns OK and fills the endpoints; returns a status describing the error otherwise. + * Returns a vector of ShardEndpoints for a potentially multi-shard update or throws + * ShardKeyNotFound if 'updateOp' misses a shard key, but the type of update requires it. */ - virtual StatusWith<std::vector<ShardEndpoint>> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const = 0; + virtual std::vector<ShardEndpoint> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const = 0; /** - * Returns a vector of ShardEndpoints for a potentially multi-shard delete. - * - * Returns OK and fills the endpoints; returns a status describing the error otherwise. + * Returns a vector of ShardEndpoints for a potentially multi-shard delete or throws + * ShardKeyNotFound if 'deleteOp' misses a shard key, but the type of delete requires it. */ - virtual StatusWith<std::vector<ShardEndpoint>> targetDelete( - OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const = 0; + virtual std::vector<ShardEndpoint> targetDelete( + OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const = 0; /** * Returns a vector of ShardEndpoints for all shards. - * - * Returns !OK with message if all shards could not be targeted. */ - virtual StatusWith<std::vector<ShardEndpoint>> targetAllShards( - OperationContext* opCtx) const = 0; + virtual std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const = 0; /** * Informs the targeter that a targeting failure occurred during one of the last targeting @@ -160,9 +152,8 @@ public: * information used here was changed. * * NOTE: This function may block for shared resources or network calls. - * Returns !OK with message if could not refresh */ - virtual Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) = 0; + virtual void refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) = 0; /** * Returns whether this write targets the config server. Invariants if the write targets the diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index 0145cc9cdc4..d2d0ff46af9 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -403,8 +403,7 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, abortBatch = true; // Throw when there is a transient transaction error since this should be a - // top - // level error and not just a write error. + // top level error and not just a write error. if (isTransientTransactionError(status.code(), false, false)) { uassertStatusOK(status); } @@ -429,19 +428,18 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, // bool targeterChanged = false; - Status refreshStatus = targeter.refreshIfNeeded(opCtx, &targeterChanged); - - LOGV2_DEBUG(22909, - 4, - "executeBatch targeter changed: {targeterChanged}", - "targeterChanged"_attr = targeterChanged); - - if (!refreshStatus.isOK()) { - // It's okay if we can't refresh, we'll just record errors for the ops if - // needed. + try { + targeter.refreshIfNeeded(opCtx, &targeterChanged); + } catch (const ExceptionFor<ErrorCodes::StaleEpoch>& ex) { + batchOp.abortBatch(errorFromStatus( + ex.toStatus("collection was dropped in the middle of the operation"))); + break; + } catch (const DBException& ex) { + // It's okay if we can't refresh, we'll just record errors for the ops if needed LOGV2_WARNING(22911, - "could not refresh targeter{causedBy_refreshStatus_reason}", - "causedBy_refreshStatus_reason"_attr = causedBy(refreshStatus.reason())); + "Could not refresh targeter due to {error}", + "Could not refresh targeter", + "error"_attr = redact(ex)); } // @@ -468,8 +466,8 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, } auto nShardsOwningChunks = batchOp.getNShardsOwningChunks(); - if (nShardsOwningChunks.is_initialized()) - stats->noteNumShardsOwningChunks(nShardsOwningChunks.get()); + if (nShardsOwningChunks) + stats->noteNumShardsOwningChunks(*nShardsOwningChunks); batchOp.buildClientResponse(clientResponse); diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 8853ca9ab94..39e3c1c609f 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -412,7 +412,7 @@ TEST_F(BatchWriteExecTest, StaleShardVersionReturnedFromBatchWithSingleMultiWrit public: using MockNSTargeter::MockNSTargeter; - StatusWith<std::vector<ShardEndpoint>> targetUpdate( + std::vector<ShardEndpoint> targetUpdate( OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { return std::vector<ShardEndpoint>{ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), @@ -513,7 +513,7 @@ TEST_F(BatchWriteExecTest, public: using MockNSTargeter::MockNSTargeter; - StatusWith<std::vector<ShardEndpoint>> targetUpdate( + std::vector<ShardEndpoint> targetUpdate( OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { return std::vector<ShardEndpoint>{ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), @@ -629,7 +629,7 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1Firs) { public: using MockNSTargeter::MockNSTargeter; - StatusWith<std::vector<ShardEndpoint>> targetUpdate( + std::vector<ShardEndpoint> targetUpdate( OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { return std::vector<ShardEndpoint>{ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), @@ -756,7 +756,7 @@ TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1FirstOK public: using MockNSTargeter::MockNSTargeter; - StatusWith<std::vector<ShardEndpoint>> targetUpdate( + std::vector<ShardEndpoint> targetUpdate( OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { return std::vector<ShardEndpoint>{ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 0f849bd3bc8..9d763c328a1 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -318,7 +318,12 @@ Status BatchWriteOp::targetBatch(const NSTargeter& targeter, OwnedPointerVector<TargetedWrite> writesOwned; vector<TargetedWrite*>& writes = writesOwned.mutableVector(); - Status targetStatus = writeOp.targetWrites(_opCtx, targeter, &writes); + Status targetStatus = Status::OK(); + try { + writeOp.targetWrites(_opCtx, targeter, &writes); + } catch (const DBException& ex) { + targetStatus = ex.toStatus(); + } if (!targetStatus.isOK()) { WriteErrorDetail targetError; diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp index ae05b9e3fff..c904bd7c829 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp @@ -71,13 +71,12 @@ ServerStatusMetricField<Counter64> updateOneOpStyleBroadcastWithExactIDStats( * or * coll.update({x: 1}, [{$addFields: {y: 2}}]) */ -StatusWith<UpdateType> getUpdateExprType(const write_ops::UpdateOpEntry& updateDoc) { - const auto updateMod = updateDoc.getU(); +UpdateType getUpdateExprType(const write_ops::UpdateOpEntry& updateDoc) { + const auto& updateMod = updateDoc.getU(); if (updateMod.type() == write_ops::UpdateModification::Type::kPipeline) { return UpdateType::kOpStyle; } - // Obtain the update expression from the request. const auto& updateExpr = updateMod.getUpdateClassic(); // Empty update is replacement-style by default. @@ -90,18 +89,18 @@ StatusWith<UpdateType> getUpdateExprType(const write_ops::UpdateOpEntry& updateD : UpdateType::kReplacement); // If the current field's type does not match the existing updateType, abort. - if (updateType != curFieldType && updateType != UpdateType::kUnknown) { - return {ErrorCodes::UnsupportedFormat, - str::stream() << "update document " << updateExpr - << " has mixed $operator and non-$operator style fields"}; - } - updateType = curFieldType; - } + if (updateType == UpdateType::kUnknown) + updateType = curFieldType; - if (updateType == UpdateType::kReplacement && updateDoc.getMulti()) { - return {ErrorCodes::InvalidOptions, "Replacement-style updates cannot be {multi:true}"}; + uassert(ErrorCodes::UnsupportedFormat, + str::stream() << "update document " << updateExpr + << " has mixed $operator and non-$operator style fields", + updateType == curFieldType); } + uassert(ErrorCodes::InvalidOptions, + "Replacement-style updates cannot be {multi:true}", + updateType == UpdateType::kOpStyle || !updateDoc.getMulti()); return updateType; } @@ -113,29 +112,29 @@ StatusWith<UpdateType> getUpdateExprType(const write_ops::UpdateOpEntry& updateD * generating the new document in the case of an upsert. It is therefore always correct to target * the operation on the basis of the combined updateExpr and query. */ -StatusWith<BSONObj> getUpdateExprForTargeting(OperationContext* opCtx, - const ShardKeyPattern& shardKeyPattern, - const NamespaceString& nss, - const UpdateType updateType, - const write_ops::UpdateOpEntry& updateDoc) { +BSONObj getUpdateExprForTargeting(OperationContext* opCtx, + const ShardKeyPattern& shardKeyPattern, + const NamespaceString& nss, + UpdateType updateType, + const write_ops::UpdateOpEntry& updateOp) { // We should never see an invalid update type here. invariant(updateType != UpdateType::kUnknown); + const auto& updateMod = updateOp.getU(); + // If this is not a replacement update, then the update expression remains unchanged. if (updateType != UpdateType::kReplacement) { - const auto& updateMod = updateDoc.getU(); BSONObjBuilder objBuilder; updateMod.serializeToBSON("u", &objBuilder); return objBuilder.obj(); } // Extract the raw update expression from the request. - const auto& updateMod = updateDoc.getU(); invariant(updateMod.type() == write_ops::UpdateModification::Type::kClassic); - auto updateExpr = updateMod.getUpdateClassic(); // Replace any non-existent shard key values with a null value. - updateExpr = shardKeyPattern.emplaceMissingShardKeyValuesForDocument(updateExpr); + auto updateExpr = + shardKeyPattern.emplaceMissingShardKeyValuesForDocument(updateMod.getUpdateClassic()); // If we aren't missing _id, return the update expression as-is. if (updateExpr.hasField(kIdFieldName)) { @@ -146,10 +145,8 @@ StatusWith<BSONObj> getUpdateExprForTargeting(OperationContext* opCtx, // This will guarantee that we can target a single shard, but it is not necessarily fatal if no // exact _id can be found. const auto idFromQuery = - kVirtualIdShardKey.extractShardKeyFromQuery(opCtx, nss, updateDoc.getQ()); - if (!idFromQuery.isOK()) { - return idFromQuery; - } else if (auto idElt = idFromQuery.getValue()[kIdFieldName]) { + uassertStatusOK(kVirtualIdShardKey.extractShardKeyFromQuery(opCtx, nss, updateOp.getQ())); + if (auto idElt = idFromQuery[kIdFieldName]) { updateExpr = updateExpr.addField(idElt); } @@ -205,6 +202,7 @@ bool isExactIdQuery(OperationContext* opCtx, return cq.isOK() && isExactIdQuery(opCtx, *cq.getValue(), manager); } + // // Utilities to compare shard and db versions // @@ -351,24 +349,16 @@ bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA, } // namespace -ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss, +ChunkManagerTargeter::ChunkManagerTargeter(OperationContext* opCtx, + const NamespaceString& nss, boost::optional<OID> targetEpoch) - : _nss(nss), _needsTargetingRefresh(false), _targetEpoch(targetEpoch) {} - - -Status ChunkManagerTargeter::init(OperationContext* opCtx) { - try { - createShardDatabase(opCtx, _nss.db()); - } catch (const DBException& e) { - return e.toStatus(); - } - - const auto routingInfoStatus = getCollectionRoutingInfoForTxnCmd(opCtx, _nss); - if (!routingInfoStatus.isOK()) { - return routingInfoStatus.getStatus(); - } + : _nss(nss), _needsTargetingRefresh(false), _targetEpoch(std::move(targetEpoch)) { + _init(opCtx); +} - _routingInfo = std::move(routingInfoStatus.getValue()); +void ChunkManagerTargeter::_init(OperationContext* opCtx) { + createShardDatabase(opCtx, _nss.db()); + _routingInfo = uassertStatusOK(getCollectionRoutingInfoForTxnCmd(opCtx, _nss)); if (_targetEpoch) { uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _routingInfo->cm()); @@ -376,22 +366,20 @@ Status ChunkManagerTargeter::init(OperationContext* opCtx) { "Collection epoch has changed", _routingInfo->cm()->getVersion().epoch() == *_targetEpoch); } - - return Status::OK(); } const NamespaceString& ChunkManagerTargeter::getNS() const { return _nss; } -StatusWith<ShardEndpoint> ChunkManagerTargeter::targetInsert(OperationContext* opCtx, - const BSONObj& doc) const { +ShardEndpoint ChunkManagerTargeter::targetInsert(OperationContext* opCtx, + const BSONObj& doc) const { BSONObj shardKey; if (_routingInfo->cm()) { shardKey = _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromDoc(doc); - // The shard key would only be empty after extraction if we encountered an error case, - // such as the shard key possessing an array value or array descendants. If the shard key + // The shard key would only be empty after extraction if we encountered an error case, such + // as the shard key possessing an array value or array descendants. If the shard key // presented to the targeter was empty, we would emplace the missing fields, and the // extracted key here would *not* be empty. uassert(ErrorCodes::ShardKeyNotFound, @@ -401,24 +389,17 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::targetInsert(OperationContext* o // Target the shard key or database primary if (!shardKey.isEmpty()) { - return _targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize()); - } else { - if (!_routingInfo->db().primary()) { - return Status(ErrorCodes::NamespaceNotFound, - str::stream() << "could not target insert in collection " << getNS().ns() - << "; no metadata found"); - } - - return ShardEndpoint(_routingInfo->db().primary()->getId(), - ChunkVersion::UNSHARDED(), - _routingInfo->db().databaseVersion()); + return uassertStatusOK( + _targetShardKey(shardKey, CollationSpec::kSimpleSpec, doc.objsize())); } - return Status::OK(); + return ShardEndpoint(_routingInfo->db().primary()->getId(), + ChunkVersion::UNSHARDED(), + _routingInfo->db().databaseVersion()); } -StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const { +std::vector<ShardEndpoint> ChunkManagerTargeter::targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const { // If the update is replacement-style: // 1. Attempt to target using the query. If this fails, AND the query targets more than one // shard, @@ -432,173 +413,132 @@ StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetUpdate( // as if the the shard key values are specified as NULL. A replacement document is also allowed // to have a missing '_id', and if the '_id' exists in the query, it will be emplaced in the // replacement document for targeting purposes. - const auto updateType = getUpdateExprType(updateDoc); - if (!updateType.isOK()) { - return updateType.getStatus(); - } + const auto updateType = getUpdateExprType(updateOp); // If the collection is not sharded, forward the update to the primary shard. if (!_routingInfo->cm()) { - if (!_routingInfo->db().primary()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "could not target update on " << getNS().ns() - << "; no metadata found"}; - } return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(), ChunkVersion::UNSHARDED(), _routingInfo->db().databaseVersion()}}; } const auto& shardKeyPattern = _routingInfo->cm()->getShardKeyPattern(); - const auto collation = write_ops::collationOf(updateDoc); + const auto collation = write_ops::collationOf(updateOp); - const auto updateExpr = getUpdateExprForTargeting( - opCtx, shardKeyPattern, getNS(), updateType.getValue(), updateDoc); - const bool isUpsert = updateDoc.getUpsert(); - const auto query = updateDoc.getQ(); - if (!updateExpr.isOK()) { - return updateExpr.getStatus(); - } + const auto updateExpr = + getUpdateExprForTargeting(opCtx, shardKeyPattern, _nss, updateType, updateOp); + const bool isUpsert = updateOp.getUpsert(); + const auto query = updateOp.getQ(); // Utility function to target an update by shard key, and to handle any potential error results. - const auto targetByShardKey = [&collation, - this](StatusWith<BSONObj> shardKey, - StringData msg) -> StatusWith<std::vector<ShardEndpoint>> { - if (!shardKey.isOK()) { - return shardKey.getStatus().withContext(msg); - } - if (shardKey.getValue().isEmpty()) { - return {ErrorCodes::ShardKeyNotFound, - str::stream() << msg << " :: could not extract exact shard key"}; - } - auto swEndPoint = _targetShardKey(shardKey.getValue(), collation, 0); - if (!swEndPoint.isOK()) { - return swEndPoint.getStatus().withContext(msg); - } - return {{swEndPoint.getValue()}}; + auto targetByShardKey = [this, &collation](StatusWith<BSONObj> swShardKey, std::string msg) { + const auto& shardKey = uassertStatusOKWithContext(std::move(swShardKey), msg); + uassert(ErrorCodes::ShardKeyNotFound, + str::stream() << msg << " :: could not extract exact shard key", + !shardKey.isEmpty()); + return std::vector{ + uassertStatusOKWithContext(_targetShardKey(shardKey, collation, 0), msg)}; }; // If this is an upsert, then the query must contain an exact match on the shard key. If we were // to target based on the replacement doc, it could result in an insertion even if a document // matching the query exists on another shard. if (isUpsert) { - return targetByShardKey(shardKeyPattern.extractShardKeyFromQuery(opCtx, getNS(), query), + return targetByShardKey(shardKeyPattern.extractShardKeyFromQuery(opCtx, _nss, query), "Failed to target upsert by query"); } // We first try to target based on the update's query. It is always valid to forward any update // or upsert to a single shard, so return immediately if we are able to target a single shard. - auto shardEndPoints = _targetQuery(opCtx, query, collation); - if (!shardEndPoints.isOK() || shardEndPoints.getValue().size() == 1) { - return shardEndPoints; + auto endPoints = uassertStatusOK(_targetQuery(opCtx, query, collation)); + if (endPoints.size() == 1) { + return endPoints; } // Replacement-style updates must always target a single shard. If we were unable to do so using // the query, we attempt to extract the shard key from the replacement and target based on it. if (updateType == UpdateType::kReplacement) { - return targetByShardKey(shardKeyPattern.extractShardKeyFromDoc(updateExpr.getValue()), + return targetByShardKey(shardKeyPattern.extractShardKeyFromDoc(updateExpr), "Failed to target update by replacement document"); } // If we are here then this is an op-style update and we were not able to target a single shard. // Non-multi updates must target a single shard or an exact _id. - if (!updateDoc.getMulti() && - !isExactIdQuery(opCtx, getNS(), query, collation, _routingInfo->cm().get())) { - return {ErrorCodes::InvalidOptions, - str::stream() << "A {multi:false} update on a sharded collection must either " - "contain an exact match on _id or must target a single shard but " - "this update targeted _id (and have the collection default " - "collation) or must target a single shard (and have the simple " - "collation), but this update targeted " - << shardEndPoints.getValue().size() - << " shards. Update request: " << updateDoc.toBSON() - << ", shard key pattern: " << shardKeyPattern.toString()}; - } + uassert( + ErrorCodes::InvalidOptions, + str::stream() << "A {multi:false} update on a sharded collection must either contain an " + "exact match on _id or must target a single shard, but this update " + "targeted _id (and have the collection default collation) or must target " + "a single shard (and have the simple collation), but this update targeted " + << endPoints.size() << " shards. Update request: " << updateOp.toBSON() + << ", shard key pattern: " << shardKeyPattern.toString(), + updateOp.getMulti() || + isExactIdQuery(opCtx, _nss, query, collation, _routingInfo->cm().get())); // If the request is {multi:false}, then this is a single op-style update which we are // broadcasting to multiple shards by exact _id. Record this event in our serverStatus metrics. - if (!updateDoc.getMulti()) { + if (!updateOp.getMulti()) { updateOneOpStyleBroadcastWithExactIDCount.increment(1); } - return shardEndPoints; + + return endPoints; } -StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetDelete( - OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const { +std::vector<ShardEndpoint> ChunkManagerTargeter::targetDelete( + OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const { BSONObj shardKey; if (_routingInfo->cm()) { - // // Sharded collections have the following further requirements for targeting: // // Limit-1 deletes must be targeted exactly by shard key *or* exact _id - // - - // Get the shard key - StatusWith<BSONObj> status = - _routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery( - opCtx, getNS(), deleteDoc.getQ()); - - // Bad query - if (!status.isOK()) - return status.getStatus(); - - shardKey = status.getValue(); + shardKey = + uassertStatusOK(_routingInfo->cm()->getShardKeyPattern().extractShardKeyFromQuery( + opCtx, _nss, deleteOp.getQ())); } - const auto collation = write_ops::collationOf(deleteDoc); + const auto collation = write_ops::collationOf(deleteOp); // Target the shard key or delete query if (!shardKey.isEmpty()) { auto swEndpoint = _targetShardKey(shardKey, collation, 0); if (swEndpoint.isOK()) { - return {{swEndpoint.getValue()}}; + return std::vector{std::move(swEndpoint.getValue())}; } } // We failed to target a single shard. // Parse delete query. - auto qr = std::make_unique<QueryRequest>(getNS()); - qr->setFilter(deleteDoc.getQ()); + auto qr = std::make_unique<QueryRequest>(_nss); + qr->setFilter(deleteOp.getQ()); if (!collation.isEmpty()) { qr->setCollation(collation); } const boost::intrusive_ptr<ExpressionContext> expCtx; - auto cq = CanonicalQuery::canonicalize(opCtx, - std::move(qr), - expCtx, - ExtensionsCallbackNoop(), - MatchExpressionParser::kAllowAllSpecialFeatures); - if (!cq.isOK()) { - return cq.getStatus().withContext(str::stream() - << "Could not parse delete query " << deleteDoc.getQ()); - } + auto cq = uassertStatusOKWithContext( + CanonicalQuery::canonicalize(opCtx, + std::move(qr), + expCtx, + ExtensionsCallbackNoop(), + MatchExpressionParser::kAllowAllSpecialFeatures), + str::stream() << "Could not parse delete query " << deleteOp.getQ()); // Single deletes must target a single shard or be exact-ID. - if (_routingInfo->cm() && !deleteDoc.getMulti() && - !isExactIdQuery(opCtx, *cq.getValue(), _routingInfo->cm().get())) { - return Status(ErrorCodes::ShardKeyNotFound, - str::stream() - << "A single delete on a sharded collection must contain an exact " - "match on _id (and have the collection default collation) or " - "contain the shard key (and have the simple collation). Delete " - "request: " - << deleteDoc.toBSON() << ", shard key pattern: " - << _routingInfo->cm()->getShardKeyPattern().toString()); - } - - return _targetQuery(opCtx, deleteDoc.getQ(), collation); + uassert(ErrorCodes::ShardKeyNotFound, + str::stream() << "A single delete on a sharded collection must contain an exact match " + "on _id (and have the collection default collation) or contain the " + "shard key (and have the simple collation). Delete request: " + << deleteOp.toBSON() << ", shard key pattern: " + << _routingInfo->cm()->getShardKeyPattern().toString(), + !_routingInfo->cm() || deleteOp.getMulti() || + isExactIdQuery(opCtx, *cq, _routingInfo->cm().get())); + + return uassertStatusOK(_targetQuery(opCtx, deleteOp.getQ(), collation)); } StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::_targetQuery( OperationContext* opCtx, const BSONObj& query, const BSONObj& collation) const { - if (!_routingInfo->db().primary() && !_routingInfo->cm()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "could not target query in " << getNS().ns() - << "; no metadata found"}; - } - if (!_routingInfo->cm()) { return std::vector<ShardEndpoint>{{_routingInfo->db().primaryId(), ChunkVersion::UNSHARDED(), @@ -632,21 +572,14 @@ StatusWith<ShardEndpoint> ChunkManagerTargeter::_targetShardKey(const BSONObj& s MONGO_UNREACHABLE; } -StatusWith<std::vector<ShardEndpoint>> ChunkManagerTargeter::targetAllShards( - OperationContext* opCtx) const { - if (!_routingInfo->db().primary() && !_routingInfo->cm()) { - return {ErrorCodes::NamespaceNotFound, - str::stream() << "could not target every shard with versions for " << getNS().ns() - << "; metadata not found"}; - } - - std::vector<ShardId> shardIds; - Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); - +std::vector<ShardEndpoint> ChunkManagerTargeter::targetAllShards(OperationContext* opCtx) const { // This function is only called if doing a multi write that targets more than one shard. This // implies the collection is sharded, so we should always have a chunk manager. invariant(_routingInfo->cm()); + std::vector<ShardId> shardIds; + Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds); + std::vector<ShardEndpoint> endpoints; for (auto&& shardId : shardIds) { endpoints.emplace_back(std::move(shardId), _routingInfo->cm()->getVersion(shardId)); @@ -724,7 +657,7 @@ void ChunkManagerTargeter::noteStaleDbResponse(const ShardEndpoint& endpoint, _remoteDbVersion = remoteDbVersion; } -Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) { +void ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) { bool dummy; if (!wasChanged) { wasChanged = &dummy; @@ -740,14 +673,14 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC "ChunkManagerTargeter checking if refresh is needed", "needsTargetingRefresh"_attr = _needsTargetingRefresh, "hasRemoteShardVersions"_attr = !_remoteShardVersions.empty(), - "hasRemoteDbVersion"_attr = static_cast<bool>(_remoteDbVersion)); + "hasRemoteDbVersion"_attr = bool{_remoteDbVersion}); // // Did we have any stale config or targeting errors at all? // if (!_needsTargetingRefresh && _remoteShardVersions.empty() && !_remoteDbVersion) { - return Status::OK(); + return; } // @@ -757,10 +690,7 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC auto lastManager = _routingInfo->cm(); auto lastDbVersion = _routingInfo->db().databaseVersion(); - auto initStatus = init(opCtx); - if (!initStatus.isOK()) { - return initStatus; - } + _init(opCtx); // We now have the latest metadata from the cache. @@ -782,12 +712,12 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC // If didn't already refresh the targeting information, refresh it if (!alreadyRefreshed) { // To match previous behavior, we just need an incremental refresh here - return _refreshShardVersionNow(opCtx); + _refreshShardVersionNow(opCtx); + return; } *wasChanged = isMetadataDifferent( lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); - return Status::OK(); } else if (!_remoteShardVersions.empty()) { // If we got stale shard versions from remote shards, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues @@ -805,12 +735,12 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC if (result == CompareResult_Unknown || result == CompareResult_LT) { // Our current shard versions aren't all comparable to the old versions, maybe drop - return _refreshShardVersionNow(opCtx); + _refreshShardVersionNow(opCtx); + return; } *wasChanged = isMetadataDifferent( lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); - return Status::OK(); } else if (_remoteDbVersion) { // If we got stale database versions from the remote shard, we may need to refresh // NOTE: Not sure yet if this can happen simultaneously with targeting issues @@ -829,23 +759,16 @@ Status ChunkManagerTargeter::refreshIfNeeded(OperationContext* opCtx, bool* wasC if (result == CompareResult_Unknown || result == CompareResult_LT) { // Our current db version isn't always comparable to the old version, it may have been // dropped - return _refreshDbVersionNow(opCtx); + _refreshDbVersionNow(opCtx); + return; } *wasChanged = isMetadataDifferent( lastManager, lastDbVersion, _routingInfo->cm(), _routingInfo->db().databaseVersion()); - return Status::OK(); } - - MONGO_UNREACHABLE; } bool ChunkManagerTargeter::endpointIsConfigServer() const { - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "could not verify full range of " << getNS().ns() - << "; metadata not found", - _routingInfo->db().primary() || _routingInfo->cm()); - if (!_routingInfo->cm()) { return _routingInfo->db().primaryId() == ShardRegistry::kConfigServerShardId; } @@ -872,21 +795,18 @@ int ChunkManagerTargeter::getNShardsOwningChunks() const { return 0; } -Status ChunkManagerTargeter::_refreshShardVersionNow(OperationContext* opCtx) { - auto routingInfoStatus = - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss, true); - if (!routingInfoStatus.isOK()) { - return routingInfoStatus.getStatus(); - } +void ChunkManagerTargeter::_refreshShardVersionNow(OperationContext* opCtx) { + uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(opCtx, _nss, true)); - return init(opCtx); + _init(opCtx); } -Status ChunkManagerTargeter::_refreshDbVersionNow(OperationContext* opCtx) { +void ChunkManagerTargeter::_refreshDbVersionNow(OperationContext* opCtx) { Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion( _nss.db(), std::move(_routingInfo->db().databaseVersion())); - return init(opCtx); + _init(opCtx); } } // namespace mongo diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h index fa215756f0b..d280d6383b9 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter.h +++ b/src/mongo/s/write_ops/chunk_manager_targeter.h @@ -60,37 +60,28 @@ public: enum class UpdateType { kReplacement, kOpStyle, kUnknown }; /** - * If 'targetEpoch' is not boost::none, throws a 'StaleEpoch' exception if the collection given - * by 'nss' is ever found to not have the target epoch. - */ - ChunkManagerTargeter(const NamespaceString& nss, - boost::optional<OID> targetEpoch = boost::none); - - /** - * Initializes the ChunkManagerTargeter with the latest targeting information for the - * namespace. May need to block and load information from a remote config server. + * Initializes the targeter with the latest routing information for the namespace, which means + * it may have to block and load information from the config server. * - * Throws a 'StaleEpoch' exception if the collection targeted has an epoch which does not match - * '_targetEpoch' - * Returns !OK if the information could not be initialized. + * If 'expectedEpoch' is specified, the targeter will throws 'StaleEpoch' exception if the epoch + * for 'nss' ever becomes different from 'expectedEpoch'. Otherwise, the targeter will continue + * targeting even if the collection gets dropped and recreated. */ - Status init(OperationContext* opCtx); + ChunkManagerTargeter(OperationContext* opCtx, + const NamespaceString& nss, + boost::optional<OID> expectedEpoch = boost::none); const NamespaceString& getNS() const override; - // Returns ShardKeyNotFound if document does not have a full shard key. - StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx, - const BSONObj& doc) const override; + ShardEndpoint targetInsert(OperationContext* opCtx, const BSONObj& doc) const override; - // Returns ShardKeyNotFound if the update can't be targeted without a shard key. - StatusWith<std::vector<ShardEndpoint>> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override; + std::vector<ShardEndpoint> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const override; - // Returns ShardKeyNotFound if the delete can't be targeted without a shard key. - StatusWith<std::vector<ShardEndpoint>> targetDelete( - OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const override; + std::vector<ShardEndpoint> targetDelete( + OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const override; - StatusWith<std::vector<ShardEndpoint>> targetAllShards(OperationContext* opCtx) const override; + std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const override; void noteCouldNotTarget() override; @@ -109,22 +100,24 @@ public: * * Also see NSTargeter::refreshIfNeeded(). */ - Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override; + void refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override; - virtual bool endpointIsConfigServer() const override; + bool endpointIsConfigServer() const override; int getNShardsOwningChunks() const override; private: + void _init(OperationContext* opCtx); + /** * Performs an actual refresh from the config server. */ - Status _refreshShardVersionNow(OperationContext* opCtx); + void _refreshShardVersionNow(OperationContext* opCtx); /** * Performs an actual refresh from the config server. */ - Status _refreshDbVersionNow(OperationContext* opCtx); + void _refreshDbVersionNow(OperationContext* opCtx); /** * Returns a vector of ShardEndpoints for a potentially multi-shard query. diff --git a/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp b/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp index 541b8b30011..7da11f654f6 100644 --- a/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp +++ b/src/mongo/s/write_ops/chunk_manager_targeter_test.cpp @@ -65,9 +65,7 @@ public: ChunkManagerTargeter prepare(BSONObj shardKeyPattern, const std::vector<BSONObj>& splitPoints) { chunkManager = makeChunkManager(kNss, ShardKeyPattern(shardKeyPattern), nullptr, false, splitPoints); - ChunkManagerTargeter cmTargeter(kNss); - auto status = cmTargeter.init(operationContext()); - return cmTargeter; + return ChunkManagerTargeter(operationContext(), kNss); } std::shared_ptr<ChunkManager> chunkManager; }; @@ -83,29 +81,24 @@ TEST_F(ChunkManagerTargeterTest, TargetInsertWithRangePrefixHashedShardKey) { splitPoints); auto res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: -111}, c: {d: '1'}}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "1"); + ASSERT_EQUALS(res.shardName, "1"); res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: -10}}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "2"); + ASSERT_EQUALS(res.shardName, "2"); res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: 4}}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "3"); + ASSERT_EQUALS(res.shardName, "3"); res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 1000}, c: null, d: {}}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "4"); + ASSERT_EQUALS(res.shardName, "4"); // Missing field will be treated as null and will be targeted to the chunk which holds null, // which is shard '1'. res = cmTargeter.targetInsert(operationContext(), BSONObj()); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "1"); + ASSERT_EQUALS(res.shardName, "1"); + res = cmTargeter.targetInsert(operationContext(), BSON("a" << 10)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "1"); + ASSERT_EQUALS(res.shardName, "1"); // Arrays along shard key path are not allowed. ASSERT_THROWS_CODE(cmTargeter.targetInsert(operationContext(), fromjson("{a: [1,2]}")), @@ -131,14 +124,13 @@ TEST_F(ChunkManagerTargeterTest, TargetInsertsWithVaryingHashedPrefixAndConstant for (int i = 0; i < 1000; i++) { auto insertObj = BSON("a" << BSON("b" << i) << "c" << BSON("d" << 10)); - const auto res = cmTargeter.targetInsert(operationContext(), insertObj); - ASSERT_OK(res.getStatus()); + auto res = cmTargeter.targetInsert(operationContext(), insertObj); // Verify that the given document is being routed based on hashed value of 'i'. auto chunk = chunkManager->findIntersectingChunkWithSimpleCollation( BSON("a.b" << BSONElementHasher::hash64(insertObj["a"]["b"], BSONElementHasher::DEFAULT_HASH_SEED))); - ASSERT_EQUALS(res.getValue().shardName, chunk.getShardId()); + ASSERT_EQUALS(res.shardName, chunk.getShardId()); } // Arrays along shard key path are not allowed. @@ -168,29 +160,24 @@ TEST_F(ChunkManagerTargeterTest, TargetInsertsWithConstantHashedPrefixAndVarying splitPoints); auto res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: -111}}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "1"); + ASSERT_EQUALS(res.shardName, "1"); res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: -11}}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "2"); + ASSERT_EQUALS(res.shardName, "2"); res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: 0}}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "3"); + ASSERT_EQUALS(res.shardName, "3"); res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}, c: {d: 111}}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "4"); + ASSERT_EQUALS(res.shardName, "4"); // Missing field will be treated as null and will be targeted to the chunk which holds null, // which is shard '1'. res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "1"); + ASSERT_EQUALS(res.shardName, "1"); + res = cmTargeter.targetInsert(operationContext(), fromjson("{a: {b: 0}}, c: 5}")); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().shardName, "1"); + ASSERT_EQUALS(res.shardName, "1"); } TEST_F(ChunkManagerTargeterTest, TargetUpdateWithRangePrefixHashedShardKey) { @@ -207,9 +194,8 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithRangePrefixHashedShardKey) { auto res = cmTargeter.targetUpdate( operationContext(), buildUpdate(fromjson("{'a.b': {$gt : 2}}"), fromjson("{a: {b: -1}}"), false)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, "2"); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, "2"); // When update targets using query. res = cmTargeter.targetUpdate( @@ -217,47 +203,44 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithRangePrefixHashedShardKey) { buildUpdate(fromjson("{$and: [{'a.b': {$gte : 0}}, {'a.b': {$lt: 99}}]}}"), fromjson("{$set: {p : 1}}"), false)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, "3"); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, "3"); res = cmTargeter.targetUpdate( operationContext(), buildUpdate(fromjson("{'a.b': {$lt : -101}}"), fromjson("{a: {b: 111}}"), false)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, "1"); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, "1"); // For op-style updates, query on _id gets targeted to all shards. res = cmTargeter.targetUpdate( operationContext(), buildUpdate(fromjson("{_id: 1}"), fromjson("{$set: {p: 111}}"), false)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 5); + ASSERT_EQUALS(res.size(), 5); // For replacement style updates, query on _id uses replacement doc to target. If the // replacement doc doesn't have shard key fields, then update should be routed to the shard // holding 'null' shard key documents. res = cmTargeter.targetUpdate(operationContext(), buildUpdate(fromjson("{_id: 1}"), fromjson("{p: 111}}"), false)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, "1"); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, "1"); // Upsert requires full shard key in query, even if the query can target a single shard. - res = cmTargeter.targetUpdate(operationContext(), - buildUpdate(fromjson("{'a.b': 100, 'c.d' : {$exists: false}}}"), - fromjson("{a: {b: -111}}"), - true)); - ASSERT_EQUALS(res.getStatus(), ErrorCodes::ShardKeyNotFound); + ASSERT_THROWS_CODE( + cmTargeter.targetUpdate(operationContext(), + buildUpdate(fromjson("{'a.b': 100, 'c.d' : {$exists: false}}}"), + fromjson("{a: {b: -111}}"), + true)), + DBException, + ErrorCodes::ShardKeyNotFound); // Upsert success case. res = cmTargeter.targetUpdate( operationContext(), buildUpdate(fromjson("{'a.b': 100, 'c.d': 'val'}"), fromjson("{a: {b: -111}}"), true)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, "4"); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, "4"); } TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) { @@ -282,10 +265,8 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) { // 'updateQueryObj'. const auto res = cmTargeter.targetUpdate( operationContext(), buildUpdate(updateQueryObj, fromjson("{$set: {p: 1}}"), false)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, - findChunk(updateQueryObj["a"]["b"]).getShardId()); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, findChunk(updateQueryObj["a"]["b"]).getShardId()); } // Range queries on hashed field cannot be used for targeting. In this case, update will be @@ -293,13 +274,14 @@ TEST_F(ChunkManagerTargeterTest, TargetUpdateWithHashedPrefixHashedShardKey) { const auto updateObj = fromjson("{a: {b: -1}}"); auto res = cmTargeter.targetUpdate( operationContext(), buildUpdate(fromjson("{'a.b': {$gt : 101}}"), updateObj, false)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, findChunk(updateObj["a"]["b"]).getShardId()); - res = cmTargeter.targetUpdate( - operationContext(), - buildUpdate(fromjson("{'a.b': {$gt : 101}}"), fromjson("{$set: {p: 1}}"), false)); - ASSERT_EQUALS(res.getStatus(), ErrorCodes::InvalidOptions); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, findChunk(updateObj["a"]["b"]).getShardId()); + ASSERT_THROWS_CODE( + cmTargeter.targetUpdate( + operationContext(), + buildUpdate(fromjson("{'a.b': {$gt : 101}}"), fromjson("{$set: {p: 1}}"), false)), + DBException, + ErrorCodes::InvalidOptions); } TEST_F(ChunkManagerTargeterTest, TargetDeleteWithRangePrefixHashedShardKey) { @@ -313,33 +295,32 @@ TEST_F(ChunkManagerTargeterTest, TargetDeleteWithRangePrefixHashedShardKey) { splitPoints); // Cannot delete without full shardkey in the query. - auto res = - cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 2}}"))); - ASSERT_EQUALS(res.getStatus(), ErrorCodes::ShardKeyNotFound); - res = cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': -101}"))); - ASSERT_EQUALS(res.getStatus(), ErrorCodes::ShardKeyNotFound); + ASSERT_THROWS_CODE( + cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 2}}"))), + DBException, + ErrorCodes::ShardKeyNotFound); + ASSERT_THROWS_CODE( + cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': -101}"))), + DBException, + ErrorCodes::ShardKeyNotFound); // Delete targeted correctly with full shard key in query. - res = cmTargeter.targetDelete(operationContext(), - buildDelete(fromjson("{'a.b': -101, 'c.d': 5}"))); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, "1"); - - // Query with MinKey value should go to chunk '0' because MinKey is smaller than - // BSONNULL. + auto res = cmTargeter.targetDelete(operationContext(), + buildDelete(fromjson("{'a.b': -101, 'c.d': 5}"))); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, "1"); + + // Query with MinKey value should go to chunk '0' because MinKey is smaller than BSONNULL. res = cmTargeter.targetDelete( operationContext(), buildDelete(BSONObjBuilder().appendMinKey("a.b").append("c.d", 4).obj())); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, "0"); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, "0"); res = cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': 0, 'c.d': 5}"))); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, "3"); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, "3"); } TEST_F(ChunkManagerTargeterTest, TargetDeleteWithHashedPrefixHashedShardKey) { @@ -362,15 +343,15 @@ TEST_F(ChunkManagerTargeterTest, TargetDeleteWithHashedPrefixHashedShardKey) { // Verify that the given document is being routed based on hashed value of 'i' in // 'queryObj'. const auto res = cmTargeter.targetDelete(operationContext(), buildDelete(queryObj)); - ASSERT_OK(res.getStatus()); - ASSERT_EQUALS(res.getValue().size(), 1); - ASSERT_EQUALS(res.getValue()[0].shardName, findChunk(queryObj["a"]["b"]).getShardId()); + ASSERT_EQUALS(res.size(), 1); + ASSERT_EQUALS(res[0].shardName, findChunk(queryObj["a"]["b"]).getShardId()); } // Range queries on hashed field cannot be used for targeting. - auto res = - cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 101}}"))); - ASSERT_EQUALS(res.getStatus(), ErrorCodes::ShardKeyNotFound); + ASSERT_THROWS_CODE( + cmTargeter.targetDelete(operationContext(), buildDelete(fromjson("{'a.b': {$gt : 101}}"))), + DBException, + ErrorCodes::ShardKeyNotFound); } } // namespace diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp index 5d544dcfd9d..f3c4b97f8d6 100644 --- a/src/mongo/s/write_ops/cluster_write.cpp +++ b/src/mongo/s/write_ops/cluster_write.cpp @@ -33,73 +33,22 @@ #include "mongo/s/write_ops/cluster_write.h" -#include <algorithm> - -#include "mongo/base/status.h" -#include "mongo/client/connpool.h" -#include "mongo/client/dbclient_cursor.h" #include "mongo/db/lasterror.h" -#include "mongo/db/write_concern_options.h" -#include "mongo/s/balancer_configuration.h" -#include "mongo/s/catalog/type_collection.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/chunk_writes_tracker.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/config_server_client.h" #include "mongo/s/grid.h" -#include "mongo/s/shard_util.h" #include "mongo/s/write_ops/chunk_manager_targeter.h" -#include "mongo/util/str.h" namespace mongo { -namespace { - -void toBatchError(const Status& status, BatchedCommandResponse* response) { - response->clear(); - response->setStatus(status); - dassert(response->isValid(nullptr)); -} - -} // namespace void ClusterWriter::write(OperationContext* opCtx, const BatchedCommandRequest& request, BatchWriteExecStats* stats, BatchedCommandResponse* response, boost::optional<OID> targetEpoch) { - const NamespaceString& nss = request.getNS(); - LastError::Disabled disableLastError(&LastError::get(opCtx->getClient())); - // Config writes and shard writes are done differently - if (nss.db() == NamespaceString::kAdminDb) { - Grid::get(opCtx)->catalogClient()->writeConfigServerDirect(opCtx, request, response); - return; - } - - ChunkManagerTargeter targeter(request.getNS(), targetEpoch); - - Status targetInitStatus = targeter.init(opCtx); - if (!targetInitStatus.isOK()) { - toBatchError(targetInitStatus.withContext( - str::stream() - << "unable to initialize targeter for write op for collection " - << request.getNS().ns()), - response); - return; - } - - bool endpointIsConfigServer; - try { - endpointIsConfigServer = targeter.endpointIsConfigServer(); - } catch (DBException& ex) { - toBatchError(ex.toStatus(str::stream() - << "unable to target write op for collection " << request.getNS()), - response); - return; - } + ChunkManagerTargeter targeter(opCtx, request.getNS(), targetEpoch); - if (endpointIsConfigServer) { + if (targeter.endpointIsConfigServer()) { Grid::get(opCtx)->catalogClient()->writeConfigServerDirect(opCtx, request, response); return; } diff --git a/src/mongo/s/write_ops/mock_ns_targeter.cpp b/src/mongo/s/write_ops/mock_ns_targeter.cpp index b23c088f27a..15496780487 100644 --- a/src/mongo/s/write_ops/mock_ns_targeter.cpp +++ b/src/mongo/s/write_ops/mock_ns_targeter.cpp @@ -32,14 +32,9 @@ #include "mongo/s/write_ops/mock_ns_targeter.h" namespace mongo { +namespace { -MockNSTargeter::MockNSTargeter(const NamespaceString& nss, std::vector<MockRange> mockRanges) - : _nss(nss), _mockRanges(std::move(mockRanges)) { - ASSERT(_nss.isValid()); - ASSERT(!_mockRanges.empty()); -} - -ChunkRange MockNSTargeter::_parseRange(const BSONObj& query) { +ChunkRange parseRange(const BSONObj& query) { const StringData fieldName(query.firstElement().fieldName()); if (query.firstElement().isNumber()) { @@ -63,6 +58,29 @@ ChunkRange MockNSTargeter::_parseRange(const BSONObj& query) { MONGO_UNREACHABLE; } +} // namespace + +MockNSTargeter::MockNSTargeter(const NamespaceString& nss, std::vector<MockRange> mockRanges) + : _nss(nss), _mockRanges(std::move(mockRanges)) { + ASSERT(_nss.isValid()); + ASSERT(!_mockRanges.empty()); +} + +std::vector<ShardEndpoint> MockNSTargeter::_targetQuery(const BSONObj& query) const { + const ChunkRange queryRange(parseRange(query)); + + std::vector<ShardEndpoint> endpoints; + + for (const auto& range : _mockRanges) { + if (queryRange.overlapWith(range.range)) { + endpoints.push_back(range.endpoint); + } + } + + uassert(ErrorCodes::UnknownError, "no mock ranges found for query", !endpoints.empty()); + return endpoints; +} + void assertEndpointsEqual(const ShardEndpoint& endpointA, const ShardEndpoint& endpointB) { ASSERT_EQUALS(endpointA.shardName, endpointB.shardName); ASSERT_EQUALS(endpointA.shardVersion.toLong(), endpointB.shardVersion.toLong()); diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h index 1a308e19ce0..ec6fb892834 100644 --- a/src/mongo/s/write_ops/mock_ns_targeter.h +++ b/src/mongo/s/write_ops/mock_ns_targeter.h @@ -64,35 +64,31 @@ public: /** * Returns a ShardEndpoint for the doc from the mock ranges */ - StatusWith<ShardEndpoint> targetInsert(OperationContext* opCtx, - const BSONObj& doc) const override { - auto swEndpoints = _targetQuery(doc); - if (!swEndpoints.isOK()) - return swEndpoints.getStatus(); - - ASSERT_EQ(1U, swEndpoints.getValue().size()); - return swEndpoints.getValue().front(); + ShardEndpoint targetInsert(OperationContext* opCtx, const BSONObj& doc) const override { + auto endpoints = _targetQuery(doc); + ASSERT_EQ(1U, endpoints.size()); + return endpoints.front(); } /** * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle * queries of the form { field : { $gte : <value>, $lt : <value> } }. */ - StatusWith<std::vector<ShardEndpoint>> targetUpdate( - OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { - return _targetQuery(updateDoc.getQ()); + std::vector<ShardEndpoint> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateOp) const override { + return _targetQuery(updateOp.getQ()); } /** * Returns the first ShardEndpoint for the query from the mock ranges. Only can handle * queries of the form { field : { $gte : <value>, $lt : <value> } }. */ - StatusWith<std::vector<ShardEndpoint>> targetDelete( - OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteDoc) const override { - return _targetQuery(deleteDoc.getQ()); + std::vector<ShardEndpoint> targetDelete( + OperationContext* opCtx, const write_ops::DeleteOpEntry& deleteOp) const override { + return _targetQuery(deleteOp.getQ()); } - StatusWith<std::vector<ShardEndpoint>> targetAllShards(OperationContext* opCtx) const override { + std::vector<ShardEndpoint> targetAllShards(OperationContext* opCtx) const override { std::vector<ShardEndpoint> endpoints; for (const auto& range : _mockRanges) { endpoints.push_back(range.endpoint); @@ -115,11 +111,10 @@ public: // No-op } - Status refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override { + void refreshIfNeeded(OperationContext* opCtx, bool* wasChanged) override { // No-op if (wasChanged) *wasChanged = false; - return Status::OK(); } bool endpointIsConfigServer() const override { @@ -133,28 +128,11 @@ public: } private: - static ChunkRange _parseRange(const BSONObj& query); - /** * Returns the first ShardEndpoint for the query from the mock ranges. Only handles queries of * the form { field : { $gte : <value>, $lt : <value> } }. */ - StatusWith<std::vector<ShardEndpoint>> _targetQuery(const BSONObj& query) const { - const ChunkRange queryRange(_parseRange(query)); - - std::vector<ShardEndpoint> endpoints; - - for (const auto& range : _mockRanges) { - if (queryRange.overlapWith(range.range)) { - endpoints.push_back(range.endpoint); - } - } - - if (endpoints.empty()) - return {ErrorCodes::UnknownError, "no mock ranges found for query"}; - - return endpoints; - } + std::vector<ShardEndpoint> _targetQuery(const BSONObj& query) const; NamespaceString _nss; diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index ca2407843d3..c7efb0b3c7b 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -52,23 +52,18 @@ const WriteErrorDetail& WriteOp::getOpError() const { return *_error; } -Status WriteOp::targetWrites(OperationContext* opCtx, - const NSTargeter& targeter, - std::vector<TargetedWrite*>* targetedWrites) { - auto swEndpoints = [&]() -> StatusWith<std::vector<ShardEndpoint>> { +void WriteOp::targetWrites(OperationContext* opCtx, + const NSTargeter& targeter, + std::vector<TargetedWrite*>* targetedWrites) { + auto endpoints = [&] { if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Insert) { - auto swEndpoint = targeter.targetInsert(opCtx, _itemRef.getDocument()); - if (!swEndpoint.isOK()) - return swEndpoint.getStatus(); - - return std::vector<ShardEndpoint>{std::move(swEndpoint.getValue())}; + return std::vector{targeter.targetInsert(opCtx, _itemRef.getDocument())}; } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Update) { return targeter.targetUpdate(opCtx, _itemRef.getUpdate()); } else if (_itemRef.getOpType() == BatchedCommandRequest::BatchType_Delete) { return targeter.targetDelete(opCtx, _itemRef.getDelete()); - } else { - MONGO_UNREACHABLE; } + MONGO_UNREACHABLE; }(); // Unless executing as part of a transaction, if we're targeting more than one endpoint with an @@ -77,40 +72,33 @@ Status WriteOp::targetWrites(OperationContext* opCtx, // NOTE: Index inserts are currently specially targeted only at the current collection to avoid // creating collections everywhere. const bool inTransaction = bool(TransactionRouter::get(opCtx)); - if (swEndpoints.isOK() && swEndpoints.getValue().size() > 1u && !inTransaction) { - swEndpoints = targeter.targetAllShards(opCtx); + if (endpoints.size() > 1u && !inTransaction) { + endpoints = targeter.targetAllShards(opCtx); } - // If we had an error, stop here - if (!swEndpoints.isOK()) - return swEndpoints.getStatus(); - - auto& endpoints = swEndpoints.getValue(); - for (auto&& endpoint : endpoints) { - // if the operation was already successfull on that shard, there is no need to repeat the - // write - if (!_successfulShardSet.count(endpoint.shardName)) { - _childOps.emplace_back(this); - - WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1); - - // Outside of a transaction, multiple endpoints currently imply no versioning, since we - // can't retry half a regular multi-write. - if (endpoints.size() > 1u && !inTransaction) { - endpoint.shardVersion = ChunkVersion::IGNORED(); - endpoint.shardVersion.canThrowSSVOnIgnored(); - } + // If the operation was already successfull on that shard, do not repeat it + if (_successfulShardSet.count(endpoint.shardName)) + continue; - targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref)); + _childOps.emplace_back(this); - _childOps.back().pendingWrite = targetedWrites->back(); - _childOps.back().state = WriteOpState_Pending; + WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1); + + // Outside of a transaction, multiple endpoints currently imply no versioning, since we + // can't retry half a regular multi-write. + if (endpoints.size() > 1u && !inTransaction) { + endpoint.shardVersion = ChunkVersion::IGNORED(); + endpoint.shardVersion.canThrowSSVOnIgnored(); } + + targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref)); + + _childOps.back().pendingWrite = targetedWrites->back(); + _childOps.back().state = WriteOpState_Pending; } _state = WriteOpState_Pending; - return Status::OK(); } size_t WriteOp::getNumTargeted() { diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h index ca959dc783f..138b466a556 100644 --- a/src/mongo/s/write_ops/write_op.h +++ b/src/mongo/s/write_ops/write_op.h @@ -119,9 +119,9 @@ public: * Returns !OK if the targeting process itself fails * (no TargetedWrites will be added, state unchanged) */ - Status targetWrites(OperationContext* opCtx, - const NSTargeter& targeter, - std::vector<TargetedWrite*>* targetedWrites); + void targetWrites(OperationContext* opCtx, + const NSTargeter& targeter, + std::vector<TargetedWrite*>* targetedWrites); /** * Returns the number of child writes that were last targeted. diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp index fee63d63f09..193ab8fe943 100644 --- a/src/mongo/s/write_ops/write_op_test.cpp +++ b/src/mongo/s/write_ops/write_op_test.cpp @@ -113,9 +113,7 @@ TEST_F(WriteOpTest, TargetSingle) { OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(_opCtx, targeter, &targeted); - - ASSERT(status.isOK()); + writeOp.targetWrites(_opCtx, targeter, &targeted); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); ASSERT_EQUALS(targeted.size(), 1u); assertEndpointsEqual(targeted.front()->endpoint, endpoint); @@ -147,9 +145,7 @@ TEST_F(WriteOpTest, TargetMultiOneShard) { OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(_opCtx, targeter, &targeted); - - ASSERT(status.isOK()); + writeOp.targetWrites(_opCtx, targeter, &targeted); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); ASSERT_EQUALS(targeted.size(), 1u); assertEndpointsEqual(targeted.front()->endpoint, endpointA); @@ -182,9 +178,7 @@ TEST_F(WriteOpTest, TargetMultiAllShards) { OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(_opCtx, targeter, &targeted); - - ASSERT(status.isOK()); + writeOp.targetWrites(_opCtx, targeter, &targeted); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); ASSERT_EQUALS(targeted.size(), 3u); sortByEndpoint(&targeted); @@ -222,9 +216,7 @@ TEST_F(WriteOpTest, TargetMultiAllShardsAndErrorSingleChildOp) { OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(_opCtx, targeter, &targeted); - - ASSERT(status.isOK()); + writeOp.targetWrites(_opCtx, targeter, &targeted); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); ASSERT_EQUALS(targeted.size(), 2u); sortByEndpoint(&targeted); @@ -266,9 +258,7 @@ TEST_F(WriteOpTest, ErrorSingle) { OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(_opCtx, targeter, &targeted); - - ASSERT(status.isOK()); + writeOp.targetWrites(_opCtx, targeter, &targeted); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); ASSERT_EQUALS(targeted.size(), 1u); assertEndpointsEqual(targeted.front()->endpoint, endpoint); @@ -302,9 +292,7 @@ TEST_F(WriteOpTest, CancelSingle) { OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(_opCtx, targeter, &targeted); - - ASSERT(status.isOK()); + writeOp.targetWrites(_opCtx, targeter, &targeted); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); ASSERT_EQUALS(targeted.size(), 1u); assertEndpointsEqual(targeted.front()->endpoint, endpoint); @@ -337,9 +325,7 @@ TEST_F(WriteOpTest, RetrySingleOp) { OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(_opCtx, targeter, &targeted); - - ASSERT(status.isOK()); + writeOp.targetWrites(_opCtx, targeter, &targeted); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); ASSERT_EQUALS(targeted.size(), 1u); assertEndpointsEqual(targeted.front()->endpoint, endpoint); @@ -382,10 +368,9 @@ TEST_F(WriteOpTransactionTest, TargetMultiDoesNotTargetAllShards) { OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(_opCtx, targeter, &targeted); + writeOp.targetWrites(_opCtx, targeter, &targeted); // The write should only target shardA and shardB and send real shard versions to each. - ASSERT(status.isOK()); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); ASSERT_EQUALS(targeted.size(), 2u); sortByEndpoint(&targeted); @@ -425,9 +410,7 @@ TEST_F(WriteOpTransactionTest, TargetMultiAllShardsAndErrorSingleChildOp) { OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); - Status status = writeOp.targetWrites(_opCtx, targeter, &targeted); - - ASSERT(status.isOK()); + writeOp.targetWrites(_opCtx, targeter, &targeted); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); ASSERT_EQUALS(targeted.size(), 2u); sortByEndpoint(&targeted); |