commit c9290364162cd76697ebee76c63ec148cd305101
tree aef6b15af18651fe86d6274ed85257ec7af9a879
parent 3caa3c4a4be7b84823f22f481365f58b124d6d00
author Nick Zolnierz <nicholas.zolnierz@mongodb.com> 2018-10-22 16:32:24 -0400
committer Ian Boros <ian.boros@10gen.com> 2018-10-31 13:20:36 -0400
SERVER-37191 writeConcern for $out
24 files changed, 257 insertions, 190 deletions
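In shell terms, this commit makes the writeConcern on an aggregate containing $out take effect on the $out writes. A minimal sketch of the command shape involved, using hypothetical collection names (the $out spec and writeConcern fields mirror the tests added below):

```js
// Run against a mongos. After this patch, the writeConcern given here is
// attached to the batched inserts that $out performs on the shards
// (see attachWriteConcern in process_interface_shardsvr.cpp below).
const res = db.runCommand({
    aggregate: "source",
    pipeline: [{$out: {to: "target", mode: "insertDocuments"}}],
    writeConcern: {w: "majority", wtimeout: 5000},
    cursor: {},
});
assert.commandWorked(res);
```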
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 9d4e90c2d5e..3cbe1c9b096 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -61,7 +61,8 @@ selector:
   - jstests/sharding/txn_writes_during_movechunk.js
   - jstests/sharding/update_sharded.js
   - jstests/sharding/shard_existing_coll_chunk_count.js
-  # Enable if SERVER-20865 is backported or 4.2 becomes last-stable
+  - jstests/sharding/out_write_concern.js
+# Enable if SERVER-20865 is backported or 4.2 becomes last-stable
   - jstests/sharding/sharding_statistics_server_status.js
 executor:
   config:
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index 44aa9ba96ae..91c1737b1cb 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -375,7 +375,8 @@ selector:
   - jstests/sharding/txn_coordinator_commands_basic_requirements.js
   - jstests/sharding/txn_writes_during_movechunk.js
   - jstests/sharding/update_sharded.js
-  # Enable if SERVER-20865 is backported or 4.2 becomes last-stable
+  - jstests/sharding/out_write_concern.js
+# Enable if SERVER-20865 is backported or 4.2 becomes last-stable
   - jstests/sharding/sharding_statistics_server_status.js
 executor:
diff --git a/jstests/aggregation/sources/out/use_cases.js b/jstests/aggregation/sources/out/use_cases.js
index dd936c5c533..243f1f78ff4 100644
--- a/jstests/aggregation/sources/out/use_cases.js
+++ b/jstests/aggregation/sources/out/use_cases.js
@@ -102,15 +102,9 @@

     runAggregate(hourSix, "insertDocuments");

-    // TODO SERVER-37191 reenable when fixed.
-    // assert.eq(3, res.length, tojson(res));
-    // assert.eq(res[2], {_id: "2018-08-15T06", ticks: ticksSum, avgTemp: tempSum /
-    // samplesPerHour});
-    // also remove the assert.soon workaround.
-    assert.soon(() => {
-        res = rollupColl.find().sort({_id: 1}).toArray();
-        return res.length == 3;
-    });
+    res = rollupColl.find().sort({_id: 1}).toArray();
+    assert.eq(3, res.length, tojson(res));
+    assert.eq(res[2], {_id: "2018-08-15T06", ticks: ticksSum, avgTemp: tempSum / samplesPerHour});

     st.stop();
 }());
diff --git a/jstests/sharding/commands_that_write_accept_wc_shards.js b/jstests/sharding/commands_that_write_accept_wc_shards.js
index 6d843652d12..aebf9d5ad18 100644
--- a/jstests/sharding/commands_that_write_accept_wc_shards.js
+++ b/jstests/sharding/commands_that_write_accept_wc_shards.js
@@ -127,63 +127,6 @@ load('jstests/libs/write_concern_util.js');
         admin: false
     });

-    // Aggregate with passthrough.
-    commands.push({
-        req: {aggregate: collName, pipeline: [{$sort: {type: 1}}, {$out: "foo"}], cursor: {}},
-        setupFunc: function() {
-            coll.insert({_id: 1, x: 3, type: 'oak'});
-            coll.insert({_id: 2, x: 13, type: 'maple'});
-        },
-        confirmFunc: function() {
-            assert.eq(db.foo.find({type: 'oak'}).itcount(), 1);
-            assert.eq(db.foo.find({type: 'maple'}).itcount(), 1);
-            db.foo.drop();
-        },
-        admin: false
-    });
-
-    // Aggregate that only matches one shard.
-    commands.push({
-        req: {
-            aggregate: collName,
-            pipeline: [{$match: {x: -3}}, {$match: {type: {$exists: 1}}}, {$out: "foo"}],
-            cursor: {}
-        },
-        setupFunc: function() {
-            shardCollectionWithChunks(st, coll);
-            coll.insert({_id: 1, x: -3, type: 'oak'});
-            coll.insert({_id: 2, x: -4, type: 'maple'});
-        },
-        confirmFunc: function() {
-            assert.eq(db.foo.find().itcount(), 1);
-            assert.eq(db.foo.find({type: 'oak'}).itcount(), 1);
-            assert.eq(db.foo.find({type: 'maple'}).itcount(), 0);
-            db.foo.drop();
-        },
-        admin: false
-    });
-
-    // Aggregate that must go to multiple shards.
-    commands.push({
-        req: {
-            aggregate: collName,
-            pipeline: [{$match: {type: {$exists: 1}}}, {$sort: {type: 1}}, {$out: "foo"}],
-            cursor: {}
-        },
-        setupFunc: function() {
-            shardCollectionWithChunks(st, coll);
-            coll.insert({_id: 1, x: -3, type: 'oak'});
-            coll.insert({_id: 2, x: 23, type: 'maple'});
-        },
-        confirmFunc: function() {
-            assert.eq(db.foo.find().itcount(), 2);
-            assert.eq(db.foo.find({type: 'oak'}).itcount(), 1);
-            assert.eq(db.foo.find({type: 'maple'}).itcount(), 1);
-            db.foo.drop();
-        },
-        admin: false
-    });
-
     // MapReduce on an unsharded collection.
     commands.push({
         req: {
diff --git a/jstests/sharding/out_write_concern.js b/jstests/sharding/out_write_concern.js
new file mode 100644
index 00000000000..d4aeb75c1d0
--- /dev/null
+++ b/jstests/sharding/out_write_concern.js
@@ -0,0 +1,92 @@
+// Tests that $out respects the writeConcern set on the original aggregation command.
+(function() {
+    "use strict";
+
+    load("jstests/aggregation/extras/out_helpers.js");  // For withEachOutMode() and isSharded().
+
+    const st = new ShardingTest({shards: 2, rs: {nodes: 3}, config: 1});
+
+    const mongosDB = st.s0.getDB("out_write_concern");
+    const source = mongosDB["source"];
+    const target = mongosDB["target"];
+    const shard0 = st.rs0;
+    const shard1 = st.rs1;
+
+    // Enable sharding on the test DB and ensure its primary is shard0.
+    assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+    st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+    function testWriteConcernError(rs) {
+        // Make sure that there are only 2 nodes up so w:3 writes will always time out.
+        const stoppedSecondary = rs.getSecondary();
+        rs.stop(stoppedSecondary);
+
+        // Test that $out correctly returns a WC error.
+        withEachOutMode((mode) => {
+            // Skip mode "replaceCollection" if the target collection is sharded, as it is
+            // unsupported. Also skip mode "replaceCollection" if the test is expecting a timeout
+            // against the non-primary shard, since this mode writes to a temp collection on the
+            // primary only.
+            if (mode == "replaceCollection" && (rs != shard0 || FixtureHelpers.isSharded(target)))
+                return;
+
+            const res = mongosDB.runCommand({
+                aggregate: "source",
+                pipeline: [{$out: {to: "target", mode: mode}}],
+                writeConcern: {w: 3, wtimeout: 100},
+                cursor: {},
+            });
+
+            // $out writeConcern errors are handled differently from normal writeConcern
+            // errors. Rather than returning ok:1 and a WriteConcernError, the entire operation
+            // fails.
+            assert.commandFailedWithCode(res, ErrorCodes.WriteConcernFailed);
+            assert.commandWorked(target.remove({}));
+        });
+
+        // Restart the stopped node and verify that the $out commands now pass.
+        rs.restart(rs.getSecondary());
+        rs.awaitReplication();
+        withEachOutMode((mode) => {
+            // "replaceCollection" is banned when the target collection is sharded.
+            if (mode == "replaceCollection" && FixtureHelpers.isSharded(target))
+                return;
+
+            const res = mongosDB.runCommand({
+                aggregate: "source",
+                pipeline: [{$out: {to: "target", mode: mode}}],
+                writeConcern: {w: 3},
+                cursor: {},
+            });
+
+            // Ensure that the write concern is satisfied within a reasonable amount of time. This
+            // prevents the test from hanging if for some reason the write concern can't be
+            // satisfied.
+            assert.soon(() => assert.commandWorked(res), "writeConcern was not satisfied");
+            assert.commandWorked(target.remove({}));
+        });
+    }
+
+    // Test that when both collections are unsharded, all writes are directed to the primary shard.
+    assert.commandWorked(source.insert([{_id: -1}, {_id: 0}, {_id: 1}, {_id: 2}]));
+    testWriteConcernError(shard0);
+
+    // Shard the source collection and continue to expect writes to the primary shard.
+    st.shardColl(source, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName());
+    testWriteConcernError(shard0);
+
+    // Shard the target collection, but make sure that all writes go to the primary shard by
+    // splitting the collection at {_id: 10} and keeping all values in the same chunk.
+    st.shardColl(target, {_id: 1}, {_id: 10}, {_id: 10}, mongosDB.getName());
+    assert.eq(FixtureHelpers.isSharded(target), true);
+    testWriteConcernError(shard0);
+
+    // Write a few documents to the source collection which will be $out-ed to the second shard.
+    assert.commandWorked(source.insert([{_id: 11}, {_id: 12}, {_id: 13}]));
+
+    // Verify that either shard can produce a WriteConcernError since writes are going to both.
+    testWriteConcernError(shard0);
+    testWriteConcernError(shard1);
+
+    st.stop();
+}());
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 7b5e7ab7da2..9a45b38ec17 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -46,7 +46,6 @@
 #include "mongo/db/query/query_request.h"
 #include "mongo/db/repl/read_concern_args.h"
 #include "mongo/db/storage/storage_options.h"
-#include "mongo/db/write_concern_options.h"

 namespace mongo {

@@ -225,6 +224,16 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
             }
         } else if (bypassDocumentValidationCommandOption() == fieldName) {
             request.setBypassDocumentValidation(elem.trueValue());
+        } else if (WriteConcernOptions::kWriteConcernField == fieldName) {
+            if (elem.type() != BSONType::Object) {
+                return {ErrorCodes::TypeMismatch,
+                        str::stream() << fieldName << " must be an object, not a "
+                                      << typeName(elem.type())};
+            }
+
+            WriteConcernOptions writeConcern;
+            uassertStatusOK(writeConcern.parse(elem.embeddedObject()));
+            request.setWriteConcern(writeConcern);
         } else if (!isGenericArgument(fieldName)) {
             return {ErrorCodes::FailedToParse,
                     str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
@@ -324,7 +333,10 @@ Document AggregationRequest::serializeToCommandObj() const {
         // Only serialize maxTimeMs if specified.
         {QueryRequest::cmdOptionMaxTimeMS,
          _maxTimeMS == 0 ? Value() : Value(static_cast<int>(_maxTimeMS))},
-        {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()}};
+        {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()},
+        {WriteConcernOptions::kWriteConcernField,
+         _writeConcern ? Value(_writeConcern->toBSON()) : Value()},
+    };
 }

 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 019f46015f2..22134763ac9 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -38,6 +38,7 @@
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/pipeline/exchange_spec_gen.h"
 #include "mongo/db/query/explain_options.h"
+#include "mongo/db/write_concern_options.h"

 namespace mongo {

@@ -194,6 +195,10 @@ public:
         return _exchangeSpec;
     }

+    boost::optional<WriteConcernOptions> getWriteConcern() const {
+        return _writeConcern;
+    }
+
     //
     // Setters for optional fields.
     //
@@ -254,6 +259,10 @@ public:
         _exchangeSpec = std::move(spec);
     }

+    void setWriteConcern(WriteConcernOptions writeConcern) {
+        _writeConcern = writeConcern;
+    }
+
 private:
     // Required fields.
     const NamespaceString _nss;
@@ -299,5 +308,8 @@ private:
     // represents a producer running as a part of the exchange machinery.
     // This is an internal option; we do not expect it to be set on requests from users or drivers.
     boost::optional<ExchangeSpec> _exchangeSpec;
+
+    // The explicit writeConcern for the operation, or boost::none if the user did not specify one.
+    boost::optional<WriteConcernOptions> _writeConcern;
 };
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index d35a0c5b65a..502319d12dc 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -496,6 +496,12 @@ TEST(AggregationRequestTest, ShouldRejectExchangeInvalidSpec) {
     ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
 }

+TEST(AggregationRequestTest, ShouldRejectInvalidWriteConcern) {
+    NamespaceString nss("a.collection");
+    const BSONObj inputBson =
+        fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}");
+    ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
 //
 // Ignore fields parsed elsewhere.
 //
@@ -507,12 +513,5 @@ TEST(AggregationRequestTest, ShouldIgnoreQueryOptions) {
     ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
 }

-TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) {
-    NamespaceString nss("a.collection");
-    const BSONObj inputBson =
-        fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}");
-    ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
-}
-
 }  // namespace
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index ea158dba17c..be19fbd681e 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -289,6 +289,7 @@ DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs,
                                      WriteModeEnum mode,
                                      std::set<FieldPath> uniqueKey)
     : DocumentSource(expCtx),
+      _writeConcern(expCtx->opCtx->getWriteConcern()),
       _done(false),
       _outputNs(std::move(outputNs)),
       _mode(mode),
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 8909e7dd88c..62deff0291a 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -32,6 +32,7 @@

 #include "mongo/db/pipeline/document_source.h"
 #include "mongo/db/pipeline/document_source_out_gen.h"
+#include "mongo/db/write_concern_options.h"

 namespace mongo {

@@ -165,7 +166,8 @@ public:
      * Writes the documents in 'batch' to the write namespace.
      */
     virtual void spill(BatchedObjects&& batch) {
-        pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), std::move(batch.objects));
+        pExpCtx->mongoProcessInterface->insert(
+            pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern);
     };

     /**
@@ -188,6 +190,14 @@ public:
     static boost::intrusive_ptr<DocumentSource> createFromBson(
         BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);

+protected:
+    // Stash the writeConcern of the original command, as the operation context may change by the
+    // time we start to spill $out writes. This is because certain aggregations (e.g. $exchange)
+    // establish cursors with batchSize 0 and then run subsequent getMores, which use a new
+    // operation context. The getMores will not have an attached writeConcern, but we still want
+    // to respect the writeConcern of the original command.
+    WriteConcernOptions _writeConcern;
+
 private:
     bool _initialized = false;
     bool _done = false;
diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h
index 2fc101d87a2..791d0985657 100644
--- a/src/mongo/db/pipeline/document_source_out_in_place.h
+++ b/src/mongo/db/pipeline/document_source_out_in_place.h
@@ -73,6 +73,7 @@ public:
                 getWriteNs(),
                 std::move(batch.uniqueKeys),
                 std::move(batch.objects),
+                _writeConcern,
                 upsert,
                 multi);
         } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index ec91cba6780..da96b66c4b0 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -112,7 +112,8 @@ public:
      */
     virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                         const NamespaceString& ns,
-                        std::vector<BSONObj>&& objs) = 0;
+                        std::vector<BSONObj>&& objs,
+                        const WriteConcernOptions& wc) = 0;

     /**
      * Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException
@@ -122,6 +123,7 @@ public:
                         const NamespaceString& ns,
                         std::vector<BSONObj>&& queries,
                         std::vector<BSONObj>&& updates,
+                        const WriteConcernOptions& wc,
                         bool upsert,
                         bool multi) = 0;

diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 103ac12ec60..ee900586ba4 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -65,7 +65,8 @@ public:

     void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
-                std::vector<BSONObj>&& objs) final {
+                std::vector<BSONObj>&& objs,
+                const WriteConcernOptions& wc) final {
         MONGO_UNREACHABLE;
     }

@@ -73,6 +74,7 @@ public:
                 const NamespaceString& ns,
                 std::vector<BSONObj>&& queries,
                 std::vector<BSONObj>&& updates,
+                const WriteConcernOptions& wc,
                 bool upsert,
                 bool multi) final {
         MONGO_UNREACHABLE;
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index ce360bbd5bb..c6ccdb2d7ad 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -61,6 +61,18 @@ using write_ops::Insert;
 using write_ops::Update;
 using write_ops::UpdateOpEntry;

+namespace {
+
+// Attaches the write concern to the given batch request. If it looks like 'writeConcern' has
+// been default initialized to {w: 0, wtimeout: 0} then we do not bother attaching it.
+void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOptions& writeConcern) {
+    if (!writeConcern.wMode.empty() || writeConcern.wNumNodes > 0) {
+        request->setWriteConcern(writeConcern.toBSON());
+    }
+}
+
+}  // namespace
+
 std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocumentKeyFields(
     OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const {
     invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
@@ -121,15 +133,19 @@ std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocume
 void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                        const NamespaceString& ns,
-                                       std::vector<BSONObj>&& objs) {
+                                       std::vector<BSONObj>&& objs,
+                                       const WriteConcernOptions& wc) {
     BatchedCommandResponse response;
     BatchWriteExecStats stats;
-    ClusterWriter::write(
-        expCtx->opCtx,
-        BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)),
-        &stats,
-        &response);
+    BatchedCommandRequest insertCommand(
+        buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+
+    // If applicable, attach a write concern to the batched command request.
+    attachWriteConcern(&insertCommand, wc);
+
+    ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response);
+
+    // TODO SERVER-35403: Add more context for which shard produced the error.
     uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
 }

@@ -138,19 +154,24 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
                                        const NamespaceString& ns,
                                        std::vector<BSONObj>&& queries,
                                        std::vector<BSONObj>&& updates,
+                                       const WriteConcernOptions& wc,
                                        bool upsert,
                                        bool multi) {
     BatchedCommandResponse response;
     BatchWriteExecStats stats;
-    ClusterWriter::write(expCtx->opCtx,
-                         BatchedCommandRequest(buildUpdateOp(ns,
-                                                             std::move(queries),
-                                                             std::move(updates),
-                                                             upsert,
-                                                             multi,
-                                                             expCtx->bypassDocumentValidation)),
-                         &stats,
-                         &response);
+
+    BatchedCommandRequest updateCommand(buildUpdateOp(ns,
+                                                      std::move(queries),
+                                                      std::move(updates),
+                                                      upsert,
+                                                      multi,
+                                                      expCtx->bypassDocumentValidation));
+
+    // If applicable, attach a write concern to the batched command request.
+    attachWriteConcern(&updateCommand, wc);
+
+    ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response);
+
+    // TODO SERVER-35403: Add more context for which shard produced the error.
     uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
 }
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 1550b9a6a62..39d550fc083 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -52,7 +52,8 @@ public:
      */
     void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
-                std::vector<BSONObj>&& objs) final;
+                std::vector<BSONObj>&& objs,
+                const WriteConcernOptions& wc) final;

     /**
      * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
@@ -62,6 +63,7 @@ public:
                 const NamespaceString& ns,
                 std::vector<BSONObj>&& queries,
                 std::vector<BSONObj>&& updates,
+                const WriteConcernOptions& wc,
                 bool upsert,
                 bool multi) final;
 };
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 660ee570b43..27ef61d9b7e 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -155,7 +155,8 @@ Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss,

 void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                       const NamespaceString& ns,
-                                      std::vector<BSONObj>&& objs) {
+                                      std::vector<BSONObj>&& objs,
+                                      const WriteConcernOptions& wc) {
     auto writeResults = performInserts(
         expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));

@@ -176,6 +177,7 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
                                       const NamespaceString& ns,
                                       std::vector<BSONObj>&& queries,
                                       std::vector<BSONObj>&& updates,
+                                      const WriteConcernOptions& wc,
                                       bool upsert,
                                       bool multi) {
     auto writeResults = performUpdates(expCtx->opCtx,
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index e02ac9e6339..de64e9b0578 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -58,11 +58,13 @@ public:
     bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
     void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
-                std::vector<BSONObj>&& objs) override;
+                std::vector<BSONObj>&& objs,
+                const WriteConcernOptions& wc) override;
     void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
                 std::vector<BSONObj>&& queries,
                 std::vector<BSONObj>&& updates,
+                const WriteConcernOptions& wc,
                 bool upsert,
                 bool multi) override;
     CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index c085850eccc..e822be1e7b1 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -61,7 +61,8 @@ public:

     void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
-                std::vector<BSONObj>&& objs) override {
+                std::vector<BSONObj>&& objs,
+                const WriteConcernOptions& wc) override {
         MONGO_UNREACHABLE;
     }

@@ -69,6 +70,7 @@ public:
                 const NamespaceString& ns,
                 std::vector<BSONObj>&& queries,
                 std::vector<BSONObj>&& updates,
+                const WriteConcernOptions& wc,
                 bool upsert,
                 bool multi) final {
         MONGO_UNREACHABLE;
diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp
index c95fe33280e..84e527fde77 100644
--- a/src/mongo/s/commands/cluster_current_op.cpp
+++ b/src/mongo/s/commands/cluster_current_op.cpp
@@ -72,16 +72,12 @@ private:
     virtual StatusWith<CursorResponse> runAggregation(
         OperationContext* opCtx, const AggregationRequest& request) const final {
-        auto aggCmdObj = request.serializeToCommandObj().toBson();
         auto nss = request.getNamespaceString();

         BSONObjBuilder responseBuilder;

-        auto status = ClusterAggregate::runAggregate(opCtx,
-                                                     ClusterAggregate::Namespaces{nss, nss},
-                                                     request,
-                                                     std::move(aggCmdObj),
-                                                     &responseBuilder);
+        auto status = ClusterAggregate::runAggregate(
+            opCtx, ClusterAggregate::Namespaces{nss, nss}, request, &responseBuilder);

         if (!status.isOK()) {
             return status;
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 260de4dc13e..f1cb773895f 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -78,12 +78,8 @@ public:
         const auto& nss = aggregationRequest.getNamespaceString();

         try {
-            uassertStatusOK(
-                ClusterAggregate::runAggregate(opCtx,
-                                               ClusterAggregate::Namespaces{nss, nss},
-                                               aggregationRequest,
-                                               cmdObj,
-                                               result));
+            uassertStatusOK(ClusterAggregate::runAggregate(
+                opCtx, ClusterAggregate::Namespaces{nss, nss}, aggregationRequest, result));
         } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
             // If the aggregation failed because the namespace is a view, re-run the command
             // with the resolved view pipeline and namespace.
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 6a94072cd5f..76617c91f05 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -55,6 +55,7 @@
 #include "mongo/db/query/find_common.h"
 #include "mongo/db/views/resolved_view.h"
 #include "mongo/db/views/view.h"
+#include "mongo/db/write_concern_options.h"
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/catalog_cache.h"
@@ -89,6 +90,15 @@ constexpr unsigned ClusterAggregate::kMaxViewRetries;

 namespace {

+Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) {
+    // The idempotent retry policy will retry even for writeConcern failures, so only use it if
+    // the request does not specify a writeConcern.
+    if (req.getWriteConcern()) {
+        return Shard::RetryPolicy::kNotIdempotent;
+    }
+    return Shard::RetryPolicy::kIdempotent;
+}
+
 // Given a document representing an aggregation command such as
 //
 //   {aggregate: "myCollection", pipeline: [], ...},
 //
@@ -218,15 +228,12 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
                                          const AggregationRequest& request,
                                          const boost::optional<ShardId>& shardId,
                                          Pipeline* pipeline,
-                                         const BSONObj& originalCmdObj,
                                          BSONObj collationObj) {
     // Create the command for the shards.
     MutableDocument targetedCmd(request.serializeToCommandObj());
     if (pipeline) {
         targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
     }
-    // This pipeline is not split, ensure that the write concern is propagated if present.
-    targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);

     return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj);
 }
@@ -247,10 +254,17 @@ BSONObj createCommandForTargetedShards(
     // send to the shards.
     targetedCmd[AggregationRequest::kPipelineName] =
         Value(splitPipeline.shardsPipeline->serialize());
+
     // When running on many shards with the exchange we may not need merging.
     if (needsMerge) {
         targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+
+        // For split pipelines which need merging, do *not* propagate the writeConcern to the
+        // shards part. Otherwise this is part of an exchange and in that case we should include
+        // the writeConcern.
+        targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
     }
+
     targetedCmd[AggregationRequest::kCursorName] =
         Value(DOC(AggregationRequest::kBatchSizeName << 0));

@@ -263,14 +277,12 @@ BSONObj createCommandForTargetedShards(

 BSONObj createCommandForMergingShard(const AggregationRequest& request,
                                      const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
-                                     const BSONObj originalCmdObj,
                                      const ShardId& shardId,
                                      const Pipeline* pipelineForMerging) {
     MutableDocument mergeCmd(request.serializeToCommandObj());

     mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
     mergeCmd[AggregationRequest::kFromMongosName] = Value(true);
-    mergeCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);

     // If the user didn't specify a collation already, make sure there's a collation attached to
     // the merge command, since the merging shard may not have the collection metadata.
@@ -296,14 +308,14 @@ std::vector<RemoteCursor> establishShardCursors(
     const LiteParsedPipeline& litePipe,
     boost::optional<CachedCollectionRoutingInfo>& routingInfo,
     const BSONObj& cmdObj,
+    const AggregationRequest& request,
     const ReadPreferenceSetting& readPref,
-    const BSONObj& shardQuery,
-    const BSONObj& collation) {
+    const BSONObj& shardQuery) {
     LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";

     const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
     std::set<ShardId> shardIds =
-        getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, collation);
+        getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
     std::vector<std::pair<ShardId, BSONObj>> requests;

     // If we don't need to run on all shards, then we should always have a valid routing table.
@@ -345,7 +357,8 @@ std::vector<RemoteCursor> establishShardCursors(
                             nss,
                             readPref,
                             requests,
-                            false /* do not allow partial results */);
+                            false /* do not allow partial results */,
+                            getDesiredRetryPolicy(request));
 }

 struct DispatchShardPipelineResults {
@@ -384,7 +397,6 @@ struct DispatchShardPipelineResults {
 DispatchShardPipelineResults dispatchShardPipeline(
     const boost::intrusive_ptr<ExpressionContext>& expCtx,
     const NamespaceString& executionNss,
-    BSONObj originalCmdObj,
     const AggregationRequest& aggRequest,
     const LiteParsedPipeline& liteParsedPipeline,
     std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
@@ -406,8 +418,6 @@ DispatchShardPipelineResults dispatchShardPipeline(

     const auto shardQuery = pipeline->getInitialQuery();

-    boost::optional<SplitPipeline> splitPipeline;
-
     auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);

     // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
@@ -440,6 +450,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
                        *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));

     boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+    boost::optional<SplitPipeline> splitPipeline;

     if (needsSplit) {
         splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
@@ -453,7 +464,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
             ? createCommandForTargetedShards(
                   opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
             : createPassthroughCommandForShard(
-                  opCtx, aggRequest, boost::none, pipeline.get(), originalCmdObj, collationObj);
+                  opCtx, aggRequest, boost::none, pipeline.get(), collationObj);

     // Refresh the shard registry if we're targeting all shards. We need the shard registry
     // to be at least as current as the logical time used when creating the command for
@@ -497,9 +508,9 @@ DispatchShardPipelineResults dispatchShardPipeline(
                                           liteParsedPipeline,
                                           executionNsRoutingInfo,
                                           targetedCommand,
+                                          aggRequest,
                                           ReadPreferenceSetting::get(opCtx),
-                                          shardQuery,
-                                          aggRequest.getCollation());
+                                          shardQuery);
         invariant(cursors.size() % shardIds.size() == 0,
                   str::stream() << "Number of cursors (" << cursors.size()
                                 << ") is not a multiple of producers ("
@@ -533,7 +544,6 @@ DispatchShardPipelineResults dispatchShardPipeline(
 DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
     const boost::intrusive_ptr<ExpressionContext>& expCtx,
     const NamespaceString& executionNss,
-    BSONObj originalCmdObj,
     const AggregationRequest& aggRequest,
     const LiteParsedPipeline& liteParsedPipeline,
     BSONObj collationObj,
@@ -666,6 +676,7 @@ Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults,

 Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
                                                    const NamespaceString& nss,
+                                                   const AggregationRequest& request,
                                                    const BSONObj mergeCmdObj,
                                                    const ShardId& mergingShardId) {
     if (MONGO_FAIL_POINT(clusterAggregateFailToEstablishMergingShardCursor)) {
@@ -677,19 +688,15 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
     const auto mergingShard =
         uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));

-    return uassertStatusOK(
-        mergingShard->runCommandWithFixedRetryAttempts(opCtx,
-                                                       ReadPreferenceSetting::get(opCtx),
-                                                       nss.db().toString(),
-                                                       mergeCmdObj,
-                                                       Shard::RetryPolicy::kIdempotent));
+    Shard::RetryPolicy retryPolicy = getDesiredRetryPolicy(request);
+    return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts(
+        opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy));
 }

 BSONObj establishMergingMongosCursor(
     OperationContext* opCtx,
     const AggregationRequest& request,
     const NamespaceString& requestedNss,
-    BSONObj cmdToRunOnNewShards,
     const LiteParsedPipeline& liteParsedPipeline,
     std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging) {

@@ -963,7 +970,6 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext*
 Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                            const ClusterAggregate::Namespaces& namespaces,
                            const AggregationRequest& request,
-                           BSONObj cmdObj,
                            const LiteParsedPipeline& litePipe,
                            std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
                            BSONObjBuilder* result) {
@@ -982,8 +988,8 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
             !pipeline->getSources().front()->constraints().requiresInputDocSource);

     // Register the new mongoS cursor, and retrieve the initial batch of results.
-    auto cursorResponse = establishMergingMongosCursor(
-        opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline));
+    auto cursorResponse =
+        establishMergingMongosCursor(opCtx, request, requestedNss, litePipe, std::move(pipeline));

     // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
     // can never run on mongoS. Filter the command response and return immediately.
@@ -994,7 +1000,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
 Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                const ClusterAggregate::Namespaces& namespaces,
                                const AggregationRequest& request,
-                               BSONObj cmdObj,
                                const LiteParsedPipeline& litePipe,
                                const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
                                DispatchShardPipelineResults&& shardDispatchResults,
@@ -1027,7 +1032,6 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
         return runPipelineOnMongoS(expCtx,
                                    namespaces,
                                    request,
-                                   shardDispatchResults.commandForTargetedShards,
                                    litePipe,
                                    std::move(shardDispatchResults.splitPipeline->mergePipeline),
                                    result);
@@ -1049,12 +1053,11 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
                                                  targetedShards,
                                                  routingInfo->db().primaryId());

-    auto mergeCmdObj =
-        createCommandForMergingShard(request, expCtx, cmdObj, mergingShardId, mergePipeline);
+    auto mergeCmdObj = createCommandForMergingShard(request, expCtx, mergingShardId, mergePipeline);

     // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
-    auto mergeResponse =
-        establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId);
+    auto mergeResponse = establishMergingShardCursor(
+        opCtx, namespaces.executionNss, request, mergeCmdObj, mergingShardId);

     auto mergeCursorResponse = uassertStatusOK(storePossibleCursor(
         opCtx, namespaces.requestedNss, mergingShardId, mergeResponse, expCtx->tailableMode));
@@ -1084,7 +1087,6 @@ void appendEmptyResultSetWithStatus(OperationContext* opCtx,
 Status ClusterAggregate::runAggregate(OperationContext* opCtx,
                                       const Namespaces& namespaces,
                                       const AggregationRequest& request,
-                                      BSONObj cmdObj,
                                       BSONObjBuilder* result) {
     auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
     boost::optional<CachedCollectionRoutingInfo> routingInfo;
@@ -1119,7 +1121,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
         litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) {
         resolveInvolvedNamespaces(opCtx, litePipe);
         const auto primaryShardId = routingInfo->db().primary()->getId();
-        return aggPassthrough(opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result);
+        return aggPassthrough(opCtx, namespaces, primaryShardId, request, litePipe, result);
     }

     // Populate the collection UUID and the appropriate collation to use.
@@ -1147,17 +1149,12 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
         }

         return runPipelineOnMongoS(
-            expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), result);
+            expCtx, namespaces, request, litePipe, std::move(pipeline), result);
     }

     // If not, split the pipeline as necessary and dispatch to the relevant shards.
-    auto shardDispatchResults = dispatchShardPipeline(expCtx,
-                                                      namespaces.executionNss,
-                                                      cmdObj,
-                                                      request,
-                                                      litePipe,
-                                                      std::move(pipeline),
-                                                      collationObj);
+    auto shardDispatchResults = dispatchShardPipeline(
+        expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);

     // If the operation is an explain, then we verify that it succeeded on all targeted shards,
     // write the results to the output builder, and return immediately.
@@ -1183,7 +1180,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
     if (shardDispatchResults.exchangeSpec) {
         shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx,
                                                                 namespaces.executionNss,
-                                                                cmdObj,
                                                                 request,
                                                                 litePipe,
                                                                 collationObj,
@@ -1194,7 +1190,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
     return dispatchMergingPipeline(expCtx,
                                    namespaces,
                                    request,
-                                   cmdObj,
                                    litePipe,
                                    routingInfo,
                                    std::move(shardDispatchResults),
@@ -1223,7 +1218,6 @@ void ClusterAggregate::uassertAllShardsSupportExplain(
 Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
                                         const Namespaces& namespaces,
                                         const ShardId& shardId,
-                                        BSONObj cmdObj,
                                         const AggregationRequest& aggRequest,
                                         const LiteParsedPipeline& liteParsedPipeline,
                                         BSONObjBuilder* out) {
@@ -1241,8 +1235,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
     // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
     // explain if necessary, and rewrites the result into a format safe to forward to shards.
-    cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
-        createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, cmdObj, BSONObj()));
+    BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
+        createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, BSONObj()));

     auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
         opCtx,
@@ -1295,7 +1289,6 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx,
     }

     auto resolvedAggRequest = resolvedView.asExpandedViewAggregation(request);
-    auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
     result->resetToEmpty();

     if (auto txnRouter = TransactionRouter::get(opCtx)) {
@@ -1310,8 +1303,7 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx,
     nsStruct.requestedNss = requestedNss;
     nsStruct.executionNss = resolvedView.getNamespace();

-    auto status =
-        ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, result);
+    auto status = ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, result);

     // If the underlying namespace was changed to a view during retry, then re-run the aggregation
     // on the new resolved namespace.
diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h
index 3438c71b0b1..822ad3cc078 100644
--- a/src/mongo/s/query/cluster_aggregate.h
+++ b/src/mongo/s/query/cluster_aggregate.h
@@ -77,14 +77,11 @@ public:
      * over which the aggregation will actually execute. Typically these two namespaces are the
      * same, but they may differ in the case of a query on a view.
      *
-     * The raw aggregate command parameters should be passed in 'cmdObj'.
-     *
      * On success, fills out 'result' with the command response.
      */
     static Status runAggregate(OperationContext* opCtx,
                                const Namespaces& namespaces,
                                const AggregationRequest& request,
-                               BSONObj cmdObj,
                                BSONObjBuilder* result);

     /**
@@ -104,22 +101,9 @@ private:
     static void uassertAllShardsSupportExplain(
         const std::vector<AsyncRequestsSender::Response>& shardResults);

-    // These are temporary hacks because the runCommand method doesn't report the exact
-    // host the command was run on which is necessary for cursor support. The exact host
-    // could be different from conn->getServerAddress() for connections that map to
-    // multiple servers such as for replica sets. These also take care of registering
-    // returned cursors.
-    static BSONObj aggRunCommand(OperationContext* opCtx,
-                                 const ShardId& shardId,
-                                 DBClientBase* conn,
-                                 const Namespaces& namespaces,
-                                 const AggregationRequest& aggRequest,
-                                 BSONObj cmd);
-
     static Status aggPassthrough(OperationContext*,
                                  const Namespaces&,
                                  const ShardId&,
-                                 BSONObj cmd,
                                  const AggregationRequest&,
                                  const LiteParsedPipeline&,
                                  BSONObjBuilder* result);
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index b9881c03d5f..0aff00c9c34 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -55,7 +55,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
                                            const NamespaceString& nss,
                                            const ReadPreferenceSetting readPref,
                                            const std::vector<std::pair<ShardId, BSONObj>>& remotes,
-                                           bool allowPartialResults) {
+                                           bool allowPartialResults,
+                                           Shard::RetryPolicy retryPolicy) {
     // Construct the requests
     std::vector<AsyncRequestsSender::Request> requests;
     for (const auto& remote : remotes) {
@@ -63,12 +64,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
     }

     // Send the requests
-    MultiStatementTransactionRequestsSender ars(opCtx,
-                                                executor,
-                                                nss.db().toString(),
-                                                std::move(requests),
-                                                readPref,
-                                                Shard::RetryPolicy::kIdempotent);
+    MultiStatementTransactionRequestsSender ars(
+        opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy);

     std::vector<RemoteCursor> remoteCursors;
     try {
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index e07a3d8b328..2c956b50b18 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -38,6 +38,7 @@
 #include "mongo/bson/bsonobj.h"
 #include "mongo/db/cursor_id.h"
 #include "mongo/executor/task_executor.h"
+#include "mongo/s/client/shard.h"
 #include "mongo/s/query/async_results_merger_params_gen.h"
 #include "mongo/stdx/mutex.h"
 #include "mongo/util/net/hostandport.h"
@@ -63,12 +64,14 @@ class CursorResponse;
  * on reachable hosts are returned.
  *
  */
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
-                                           executor::TaskExecutor* executor,
-                                           const NamespaceString& nss,
-                                           const ReadPreferenceSetting readPref,
-                                           const std::vector<std::pair<ShardId, BSONObj>>& remotes,
-                                           bool allowPartialResults);
+std::vector<RemoteCursor> establishCursors(
+    OperationContext* opCtx,
+    executor::TaskExecutor* executor,
+    const NamespaceString& nss,
+    const ReadPreferenceSetting readPref,
+    const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+    bool allowPartialResults,
+    Shard::RetryPolicy retryPolicy = Shard::RetryPolicy::kIdempotent);

 /**
  * Schedules a remote killCursor command for each of the cursors in 'remoteCursors'.
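A closing note on the error contract exercised by jstests/sharding/out_write_concern.js above: when the writeConcern cannot be satisfied, $out fails the entire aggregate rather than reporting partial success. A minimal sketch, assuming a cluster where w:3 cannot currently be met (as in the test, with one secondary stopped):

```js
// Hypothetical collection names; option shapes taken from the test above.
const res = db.runCommand({
    aggregate: "source",
    pipeline: [{$out: {to: "target", mode: "insertDocuments"}}],
    writeConcern: {w: 3, wtimeout: 100},
    cursor: {},
});

// An ordinary write command would report ok: 1 together with a
// writeConcernError subdocument; $out instead fails the whole command.
assert.commandFailedWithCode(res, ErrorCodes.WriteConcernFailed);
```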