summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-10-22 16:32:24 -0400
committerIan Boros <ian.boros@10gen.com>2018-10-31 13:20:36 -0400
commitc9290364162cd76697ebee76c63ec148cd305101 (patch)
treeaef6b15af18651fe86d6274ed85257ec7af9a879
parent3caa3c4a4be7b84823f22f481365f58b124d6d00 (diff)
downloadmongo-c9290364162cd76697ebee76c63ec148cd305101.tar.gz
SERVER-37191 writeConcern for $out
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml3
-rw-r--r--buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml3
-rw-r--r--jstests/aggregation/sources/out/use_cases.js12
-rw-r--r--jstests/sharding/commands_that_write_accept_wc_shards.js57
-rw-r--r--jstests/sharding/out_write_concern.js92
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp16
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h12
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp13
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp1
-rw-r--r--src/mongo/db/pipeline/document_source_out.h12
-rw-r--r--src/mongo/db/pipeline/document_source_out_in_place.h1
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.cpp51
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.h4
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp4
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h4
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h4
-rw-r--r--src/mongo/s/commands/cluster_current_op.cpp8
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp8
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp92
-rw-r--r--src/mongo/s/query/cluster_aggregate.h16
-rw-r--r--src/mongo/s/query/establish_cursors.cpp11
-rw-r--r--src/mongo/s/query/establish_cursors.h15
24 files changed, 257 insertions, 190 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 9d4e90c2d5e..3cbe1c9b096 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -61,7 +61,8 @@ selector:
- jstests/sharding/txn_writes_during_movechunk.js
- jstests/sharding/update_sharded.js
- jstests/sharding/shard_existing_coll_chunk_count.js
- # Enable if SERVER-20865 is backported or 4.2 becomes last-stable
+ - jstests/sharding/out_write_concern.js
+# Enable if SERVER-20865 is backported or 4.2 becomes last-stable
- jstests/sharding/sharding_statistics_server_status.js
executor:
config:
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index 44aa9ba96ae..91c1737b1cb 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -375,7 +375,8 @@ selector:
- jstests/sharding/txn_coordinator_commands_basic_requirements.js
- jstests/sharding/txn_writes_during_movechunk.js
- jstests/sharding/update_sharded.js
- # Enable if SERVER-20865 is backported or 4.2 becomes last-stable
+ - jstests/sharding/out_write_concern.js
+# Enable if SERVER-20865 is backported or 4.2 becomes last-stable
- jstests/sharding/sharding_statistics_server_status.js
executor:
diff --git a/jstests/aggregation/sources/out/use_cases.js b/jstests/aggregation/sources/out/use_cases.js
index dd936c5c533..243f1f78ff4 100644
--- a/jstests/aggregation/sources/out/use_cases.js
+++ b/jstests/aggregation/sources/out/use_cases.js
@@ -102,15 +102,9 @@
runAggregate(hourSix, "insertDocuments");
- // TODO SERVER-37191 reenable when fixed.
- // assert.eq(3, res.length, tojson(res));
- // assert.eq(res[2], {_id: "2018-08-15T06", ticks: ticksSum, avgTemp: tempSum /
- // samplesPerHour});
- // also remove the assert.soon workaround.
- assert.soon(() => {
- res = rollupColl.find().sort({_id: 1}).toArray();
- return res.length == 3;
- });
+ res = rollupColl.find().sort({_id: 1}).toArray();
+ assert.eq(3, res.length, tojson(res));
+ assert.eq(res[2], {_id: "2018-08-15T06", ticks: ticksSum, avgTemp: tempSum / samplesPerHour});
st.stop();
}());
diff --git a/jstests/sharding/commands_that_write_accept_wc_shards.js b/jstests/sharding/commands_that_write_accept_wc_shards.js
index 6d843652d12..aebf9d5ad18 100644
--- a/jstests/sharding/commands_that_write_accept_wc_shards.js
+++ b/jstests/sharding/commands_that_write_accept_wc_shards.js
@@ -127,63 +127,6 @@ load('jstests/libs/write_concern_util.js');
admin: false
});
- // Aggregate with passthrough.
- commands.push({
- req: {aggregate: collName, pipeline: [{$sort: {type: 1}}, {$out: "foo"}], cursor: {}},
- setupFunc: function() {
- coll.insert({_id: 1, x: 3, type: 'oak'});
- coll.insert({_id: 2, x: 13, type: 'maple'});
- },
- confirmFunc: function() {
- assert.eq(db.foo.find({type: 'oak'}).itcount(), 1);
- assert.eq(db.foo.find({type: 'maple'}).itcount(), 1);
- db.foo.drop();
- },
- admin: false
- });
-
- // Aggregate that only matches one shard.
- commands.push({
- req: {
- aggregate: collName,
- pipeline: [{$match: {x: -3}}, {$match: {type: {$exists: 1}}}, {$out: "foo"}],
- cursor: {}
- },
- setupFunc: function() {
- shardCollectionWithChunks(st, coll);
- coll.insert({_id: 1, x: -3, type: 'oak'});
- coll.insert({_id: 2, x: -4, type: 'maple'});
- },
- confirmFunc: function() {
- assert.eq(db.foo.find().itcount(), 1);
- assert.eq(db.foo.find({type: 'oak'}).itcount(), 1);
- assert.eq(db.foo.find({type: 'maple'}).itcount(), 0);
- db.foo.drop();
- },
- admin: false
- });
-
- // Aggregate that must go to multiple shards.
- commands.push({
- req: {
- aggregate: collName,
- pipeline: [{$match: {type: {$exists: 1}}}, {$sort: {type: 1}}, {$out: "foo"}],
- cursor: {}
- },
- setupFunc: function() {
- shardCollectionWithChunks(st, coll);
- coll.insert({_id: 1, x: -3, type: 'oak'});
- coll.insert({_id: 2, x: 23, type: 'maple'});
- },
- confirmFunc: function() {
- assert.eq(db.foo.find().itcount(), 2);
- assert.eq(db.foo.find({type: 'oak'}).itcount(), 1);
- assert.eq(db.foo.find({type: 'maple'}).itcount(), 1);
- db.foo.drop();
- },
- admin: false
- });
-
// MapReduce on an unsharded collection.
commands.push({
req: {
diff --git a/jstests/sharding/out_write_concern.js b/jstests/sharding/out_write_concern.js
new file mode 100644
index 00000000000..d4aeb75c1d0
--- /dev/null
+++ b/jstests/sharding/out_write_concern.js
@@ -0,0 +1,92 @@
+// Tests that $out respects the writeConcern set on the original aggregation command.
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/out_helpers.js"); // For withEachOutMode() and isSharded().
+
+ const st = new ShardingTest({shards: 2, rs: {nodes: 3}, config: 1});
+
+ const mongosDB = st.s0.getDB("out_write_concern");
+ const source = mongosDB["source"];
+ const target = mongosDB["target"];
+ const shard0 = st.rs0;
+ const shard1 = st.rs1;
+
+ // Enable sharding on the test DB and ensure its primary is shard0.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+ function testWriteConcernError(rs) {
+ // Make sure that there are only 2 nodes up so w:3 writes will always time out.
+ const stoppedSecondary = rs.getSecondary();
+ rs.stop(stoppedSecondary);
+
+ // Test that $out correctly returns a WC error.
+ withEachOutMode((mode) => {
+ // Skip mode "replaceCollection" if the target collection is sharded, as it is
+ // unsupported. Also skip mode "replaceCollection" if the test is expecting a timeout
+ // against the non-primary shard, since this mode writes to a temp collection on the
+ // primary only.
+ if (mode == "replaceCollection" && (rs != shard0 || FixtureHelpers.isSharded(target)))
+ return;
+
+ const res = mongosDB.runCommand({
+ aggregate: "source",
+ pipeline: [{$out: {to: "target", mode: mode}}],
+ writeConcern: {w: 3, wtimeout: 100},
+ cursor: {},
+ });
+
+ // $out writeConcern errors are handled differently from normal writeConcern
+ // errors. Rather than returing ok:1 and a WriteConcernError, the entire operation
+ // fails.
+ assert.commandFailedWithCode(res, ErrorCodes.WriteConcernFailed);
+ assert.commandWorked(target.remove({}));
+ });
+
+ // Restart the stopped node and verify that the $out's now pass.
+ rs.restart(rs.getSecondary());
+ rs.awaitReplication();
+ withEachOutMode((mode) => {
+ // "replaceCollection" is banned when the target collection is sharded.
+ if (mode == "replaceCollection" && FixtureHelpers.isSharded(target))
+ return;
+
+ const res = mongosDB.runCommand({
+ aggregate: "source",
+ pipeline: [{$out: {to: "target", mode: mode}}],
+ writeConcern: {w: 3},
+ cursor: {},
+ });
+
+ // Ensure that the write concern is satisfied within a reasonable amount of time. This
+ // prevents the test from hanging if for some reason the write concern can't be
+ // satisfied.
+ assert.soon(() => assert.commandWorked(res), "writeConcern was not satisfied");
+ assert.commandWorked(target.remove({}));
+ });
+ }
+
+ // Test that when both collections are unsharded, all writes are directed to the primary shard.
+ assert.commandWorked(source.insert([{_id: -1}, {_id: 0}, {_id: 1}, {_id: 2}]));
+ testWriteConcernError(shard0);
+
+ // Shard the source collection and continue to expect writes to the primary shard.
+ st.shardColl(source, {_id: 1}, {_id: 0}, {_id: 1}, mongosDB.getName());
+ testWriteConcernError(shard0);
+
+ // Shard the target collection, however make sure that all writes go to the primary shard by
+ // splitting the collection at {_id: 10} and keeping all values in the same chunk.
+ st.shardColl(target, {_id: 1}, {_id: 10}, {_id: 10}, mongosDB.getName());
+ assert.eq(FixtureHelpers.isSharded(target), true);
+ testWriteConcernError(shard0);
+
+ // Write a few documents to the source collection which will be $out-ed to the second shard.
+ assert.commandWorked(source.insert([{_id: 11}, {_id: 12}, {_id: 13}]));
+
+ // Verify that either shard can produce a WriteConcernError since writes are going to both.
+ testWriteConcernError(shard0);
+ testWriteConcernError(shard1);
+
+ st.stop();
+}());
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 7b5e7ab7da2..9a45b38ec17 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -46,7 +46,6 @@
#include "mongo/db/query/query_request.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/storage/storage_options.h"
-#include "mongo/db/write_concern_options.h"
namespace mongo {
@@ -225,6 +224,16 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
}
} else if (bypassDocumentValidationCommandOption() == fieldName) {
request.setBypassDocumentValidation(elem.trueValue());
+ } else if (WriteConcernOptions::kWriteConcernField == fieldName) {
+ if (elem.type() != BSONType::Object) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << fieldName << " must be an object, not a "
+ << typeName(elem.type())};
+ }
+
+ WriteConcernOptions writeConcern;
+ uassertStatusOK(writeConcern.parse(elem.embeddedObject()));
+ request.setWriteConcern(writeConcern);
} else if (!isGenericArgument(fieldName)) {
return {ErrorCodes::FailedToParse,
str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
@@ -324,7 +333,10 @@ Document AggregationRequest::serializeToCommandObj() const {
// Only serialize maxTimeMs if specified.
{QueryRequest::cmdOptionMaxTimeMS,
_maxTimeMS == 0 ? Value() : Value(static_cast<int>(_maxTimeMS))},
- {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()}};
+ {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()},
+ {WriteConcernOptions::kWriteConcernField,
+ _writeConcern ? Value(_writeConcern->toBSON()) : Value()},
+ };
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 019f46015f2..22134763ac9 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -38,6 +38,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/exchange_spec_gen.h"
#include "mongo/db/query/explain_options.h"
+#include "mongo/db/write_concern_options.h"
namespace mongo {
@@ -194,6 +195,10 @@ public:
return _exchangeSpec;
}
+ boost::optional<WriteConcernOptions> getWriteConcern() const {
+ return _writeConcern;
+ }
+
//
// Setters for optional fields.
//
@@ -254,6 +259,10 @@ public:
_exchangeSpec = std::move(spec);
}
+ void setWriteConcern(WriteConcernOptions writeConcern) {
+ _writeConcern = writeConcern;
+ }
+
private:
// Required fields.
const NamespaceString _nss;
@@ -299,5 +308,8 @@ private:
// represents a producer running as a part of the exchange machinery.
// This is an internal option; we do not expect it to be set on requests from users or drivers.
boost::optional<ExchangeSpec> _exchangeSpec;
+
+ // The explicit writeConcern for the operation or boost::none if the user did not specifiy one.
+ boost::optional<WriteConcernOptions> _writeConcern;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index d35a0c5b65a..502319d12dc 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -496,6 +496,12 @@ TEST(AggregationRequestTest, ShouldRejectExchangeInvalidSpec) {
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
+TEST(AggregationRequestTest, ShouldRejectInvalidWriteConcern) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
//
// Ignore fields parsed elsewhere.
//
@@ -507,12 +513,5 @@ TEST(AggregationRequestTest, ShouldIgnoreQueryOptions) {
ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
-TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) {
- NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}");
- ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index ea158dba17c..be19fbd681e 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -289,6 +289,7 @@ DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs,
WriteModeEnum mode,
std::set<FieldPath> uniqueKey)
: DocumentSource(expCtx),
+ _writeConcern(expCtx->opCtx->getWriteConcern()),
_done(false),
_outputNs(std::move(outputNs)),
_mode(mode),
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 8909e7dd88c..62deff0291a 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_out_gen.h"
+#include "mongo/db/write_concern_options.h"
namespace mongo {
@@ -165,7 +166,8 @@ public:
* Writes the documents in 'batch' to the write namespace.
*/
virtual void spill(BatchedObjects&& batch) {
- pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), std::move(batch.objects));
+ pExpCtx->mongoProcessInterface->insert(
+ pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern);
};
/**
@@ -188,6 +190,14 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+protected:
+ // Stash the writeConcern of the original command as the operation context may change by the
+ // time we start to spill $out writes. This is because certain aggregations (e.g. $exchange)
+ // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation
+ // context. The getMore's will not have an attached writeConcern however we still want to
+ // respect the writeConcern of the original command.
+ WriteConcernOptions _writeConcern;
+
private:
bool _initialized = false;
bool _done = false;
diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h
index 2fc101d87a2..791d0985657 100644
--- a/src/mongo/db/pipeline/document_source_out_in_place.h
+++ b/src/mongo/db/pipeline/document_source_out_in_place.h
@@ -73,6 +73,7 @@ public:
getWriteNs(),
std::move(batch.uniqueKeys),
std::move(batch.objects),
+ _writeConcern,
upsert,
multi);
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index ec91cba6780..da96b66c4b0 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -112,7 +112,8 @@ public:
*/
virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) = 0;
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) = 0;
/**
* Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException
@@ -122,6 +123,7 @@ public:
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) = 0;
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 103ac12ec60..ee900586ba4 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -65,7 +65,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) final {
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) final {
MONGO_UNREACHABLE;
}
@@ -73,6 +74,7 @@ public:
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) final {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index ce360bbd5bb..c6ccdb2d7ad 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -61,6 +61,18 @@ using write_ops::Insert;
using write_ops::Update;
using write_ops::UpdateOpEntry;
+namespace {
+
+// Attaches the write concern to the given batch request. If it looks like 'writeConcern' has
+// been default initialized to {w: 0, wtimeout: 0} then we do not bother attaching it.
+void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOptions& writeConcern) {
+ if (!writeConcern.wMode.empty() || writeConcern.wNumNodes > 0) {
+ request->setWriteConcern(writeConcern.toBSON());
+ }
+}
+
+} // namespace
+
std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocumentKeyFields(
OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const {
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
@@ -121,15 +133,19 @@ std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocume
void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) {
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- ClusterWriter::write(
- expCtx->opCtx,
- BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)),
- &stats,
- &response);
+ BatchedCommandRequest insertCommand(
+ buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+
+ // If applicable, attach a write concern to the batched command request.
+ attachWriteConcern(&insertCommand, wc);
+
+ ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response);
+
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
}
@@ -138,19 +154,24 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- ClusterWriter::write(expCtx->opCtx,
- BatchedCommandRequest(buildUpdateOp(ns,
- std::move(queries),
- std::move(updates),
- upsert,
- multi,
- expCtx->bypassDocumentValidation)),
- &stats,
- &response);
+
+ BatchedCommandRequest updateCommand(buildUpdateOp(ns,
+ std::move(queries),
+ std::move(updates),
+ upsert,
+ multi,
+ expCtx->bypassDocumentValidation));
+
+ // If applicable, attach a write concern to the batched command request.
+ attachWriteConcern(&updateCommand, wc);
+
+ ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response);
+
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
}
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 1550b9a6a62..39d550fc083 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -52,7 +52,8 @@ public:
*/
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) final;
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) final;
/**
* Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
@@ -62,6 +63,7 @@ public:
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) final;
};
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 660ee570b43..27ef61d9b7e 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -155,7 +155,8 @@ Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss,
void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) {
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) {
auto writeResults = performInserts(
expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
@@ -176,6 +177,7 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) {
auto writeResults = performUpdates(expCtx->opCtx,
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index e02ac9e6339..de64e9b0578 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -58,11 +58,13 @@ public:
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) override;
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) override;
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) override;
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index c085850eccc..e822be1e7b1 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -61,7 +61,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) override {
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) override {
MONGO_UNREACHABLE;
}
@@ -69,6 +70,7 @@ public:
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) final {
MONGO_UNREACHABLE;
diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp
index c95fe33280e..84e527fde77 100644
--- a/src/mongo/s/commands/cluster_current_op.cpp
+++ b/src/mongo/s/commands/cluster_current_op.cpp
@@ -72,16 +72,12 @@ private:
virtual StatusWith<CursorResponse> runAggregation(
OperationContext* opCtx, const AggregationRequest& request) const final {
- auto aggCmdObj = request.serializeToCommandObj().toBson();
auto nss = request.getNamespaceString();
BSONObjBuilder responseBuilder;
- auto status = ClusterAggregate::runAggregate(opCtx,
- ClusterAggregate::Namespaces{nss, nss},
- request,
- std::move(aggCmdObj),
- &responseBuilder);
+ auto status = ClusterAggregate::runAggregate(
+ opCtx, ClusterAggregate::Namespaces{nss, nss}, request, &responseBuilder);
if (!status.isOK()) {
return status;
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 260de4dc13e..f1cb773895f 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -78,12 +78,8 @@ public:
const auto& nss = aggregationRequest.getNamespaceString();
try {
- uassertStatusOK(
- ClusterAggregate::runAggregate(opCtx,
- ClusterAggregate::Namespaces{nss, nss},
- aggregationRequest,
- cmdObj,
- result));
+ uassertStatusOK(ClusterAggregate::runAggregate(
+ opCtx, ClusterAggregate::Namespaces{nss, nss}, aggregationRequest, result));
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
// If the aggregation failed because the namespace is a view, re-run the command
// with the resolved view pipeline and namespace.
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 6a94072cd5f..76617c91f05 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -55,6 +55,7 @@
#include "mongo/db/query/find_common.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/db/views/view.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
@@ -89,6 +90,15 @@ constexpr unsigned ClusterAggregate::kMaxViewRetries;
namespace {
+Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) {
+ // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
+ // pipeline does not support writeConcern.
+ if (req.getWriteConcern()) {
+ return Shard::RetryPolicy::kNotIdempotent;
+ }
+ return Shard::RetryPolicy::kIdempotent;
+}
+
// Given a document representing an aggregation command such as
//
// {aggregate: "myCollection", pipeline: [], ...},
@@ -218,15 +228,12 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
const AggregationRequest& request,
const boost::optional<ShardId>& shardId,
Pipeline* pipeline,
- const BSONObj& originalCmdObj,
BSONObj collationObj) {
// Create the command for the shards.
MutableDocument targetedCmd(request.serializeToCommandObj());
if (pipeline) {
targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
}
- // This pipeline is not split, ensure that the write concern is propagated if present.
- targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj);
}
@@ -247,10 +254,17 @@ BSONObj createCommandForTargetedShards(
// send to the shards.
targetedCmd[AggregationRequest::kPipelineName] =
Value(splitPipeline.shardsPipeline->serialize());
+
// When running on many shards with the exchange we may not need merging.
if (needsMerge) {
targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+
+ // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
+ // part. Otherwise this is part of an exchange and in that case we should include the
+ // writeConcern.
+ targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
}
+
targetedCmd[AggregationRequest::kCursorName] =
Value(DOC(AggregationRequest::kBatchSizeName << 0));
@@ -263,14 +277,12 @@ BSONObj createCommandForTargetedShards(
BSONObj createCommandForMergingShard(const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- const BSONObj originalCmdObj,
const ShardId& shardId,
const Pipeline* pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
mergeCmd[AggregationRequest::kFromMongosName] = Value(true);
- mergeCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
// If the user didn't specify a collation already, make sure there's a collation attached to
// the merge command, since the merging shard may not have the collection metadata.
@@ -296,14 +308,14 @@ std::vector<RemoteCursor> establishShardCursors(
const LiteParsedPipeline& litePipe,
boost::optional<CachedCollectionRoutingInfo>& routingInfo,
const BSONObj& cmdObj,
+ const AggregationRequest& request,
const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
+ const BSONObj& shardQuery) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
std::set<ShardId> shardIds =
- getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, collation);
+ getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
std::vector<std::pair<ShardId, BSONObj>> requests;
// If we don't need to run on all shards, then we should always have a valid routing table.
@@ -345,7 +357,8 @@ std::vector<RemoteCursor> establishShardCursors(
nss,
readPref,
requests,
- false /* do not allow partial results */);
+ false /* do not allow partial results */,
+ getDesiredRetryPolicy(request));
}
struct DispatchShardPipelineResults {
@@ -384,7 +397,6 @@ struct DispatchShardPipelineResults {
DispatchShardPipelineResults dispatchShardPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
- BSONObj originalCmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
@@ -406,8 +418,6 @@ DispatchShardPipelineResults dispatchShardPipeline(
const auto shardQuery = pipeline->getInitialQuery();
- boost::optional<SplitPipeline> splitPipeline;
-
auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
// If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
@@ -440,6 +450,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
*(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+ boost::optional<SplitPipeline> splitPipeline;
if (needsSplit) {
splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
@@ -453,7 +464,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
? createCommandForTargetedShards(
opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
: createPassthroughCommandForShard(
- opCtx, aggRequest, boost::none, pipeline.get(), originalCmdObj, collationObj);
+ opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
// Refresh the shard registry if we're targeting all shards. We need the shard registry
// to be at least as current as the logical time used when creating the command for
@@ -497,9 +508,9 @@ DispatchShardPipelineResults dispatchShardPipeline(
liteParsedPipeline,
executionNsRoutingInfo,
targetedCommand,
+ aggRequest,
ReadPreferenceSetting::get(opCtx),
- shardQuery,
- aggRequest.getCollation());
+ shardQuery);
invariant(cursors.size() % shardIds.size() == 0,
str::stream() << "Number of cursors (" << cursors.size()
<< ") is not a multiple of producers ("
@@ -533,7 +544,6 @@ DispatchShardPipelineResults dispatchShardPipeline(
DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
- BSONObj originalCmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
BSONObj collationObj,
@@ -666,6 +676,7 @@ Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults,
Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const NamespaceString& nss,
+ const AggregationRequest& request,
const BSONObj mergeCmdObj,
const ShardId& mergingShardId) {
if (MONGO_FAIL_POINT(clusterAggregateFailToEstablishMergingShardCursor)) {
@@ -677,19 +688,15 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const auto mergingShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
- return uassertStatusOK(
- mergingShard->runCommandWithFixedRetryAttempts(opCtx,
- ReadPreferenceSetting::get(opCtx),
- nss.db().toString(),
- mergeCmdObj,
- Shard::RetryPolicy::kIdempotent));
+ Shard::RetryPolicy retryPolicy = getDesiredRetryPolicy(request);
+ return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts(
+ opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy));
}
BSONObj establishMergingMongosCursor(
OperationContext* opCtx,
const AggregationRequest& request,
const NamespaceString& requestedNss,
- BSONObj cmdToRunOnNewShards,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging) {
@@ -963,7 +970,6 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext*
Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const ClusterAggregate::Namespaces& namespaces,
const AggregationRequest& request,
- BSONObj cmdObj,
const LiteParsedPipeline& litePipe,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
BSONObjBuilder* result) {
@@ -982,8 +988,8 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
!pipeline->getSources().front()->constraints().requiresInputDocSource);
// Register the new mongoS cursor, and retrieve the initial batch of results.
- auto cursorResponse = establishMergingMongosCursor(
- opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline));
+ auto cursorResponse =
+ establishMergingMongosCursor(opCtx, request, requestedNss, litePipe, std::move(pipeline));
// We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
// can never run on mongoS. Filter the command response and return immediately.
@@ -994,7 +1000,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const ClusterAggregate::Namespaces& namespaces,
const AggregationRequest& request,
- BSONObj cmdObj,
const LiteParsedPipeline& litePipe,
const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
DispatchShardPipelineResults&& shardDispatchResults,
@@ -1027,7 +1032,6 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
return runPipelineOnMongoS(expCtx,
namespaces,
request,
- shardDispatchResults.commandForTargetedShards,
litePipe,
std::move(shardDispatchResults.splitPipeline->mergePipeline),
result);
@@ -1049,12 +1053,11 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
targetedShards,
routingInfo->db().primaryId());
- auto mergeCmdObj =
- createCommandForMergingShard(request, expCtx, cmdObj, mergingShardId, mergePipeline);
+ auto mergeCmdObj = createCommandForMergingShard(request, expCtx, mergingShardId, mergePipeline);
// Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
- auto mergeResponse =
- establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId);
+ auto mergeResponse = establishMergingShardCursor(
+ opCtx, namespaces.executionNss, request, mergeCmdObj, mergingShardId);
auto mergeCursorResponse = uassertStatusOK(storePossibleCursor(
opCtx, namespaces.requestedNss, mergingShardId, mergeResponse, expCtx->tailableMode));
@@ -1084,7 +1087,6 @@ void appendEmptyResultSetWithStatus(OperationContext* opCtx,
Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const Namespaces& namespaces,
const AggregationRequest& request,
- BSONObj cmdObj,
BSONObjBuilder* result) {
auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
boost::optional<CachedCollectionRoutingInfo> routingInfo;
@@ -1119,7 +1121,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) {
resolveInvolvedNamespaces(opCtx, litePipe);
const auto primaryShardId = routingInfo->db().primary()->getId();
- return aggPassthrough(opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result);
+ return aggPassthrough(opCtx, namespaces, primaryShardId, request, litePipe, result);
}
// Populate the collection UUID and the appropriate collation to use.
@@ -1147,17 +1149,12 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
return runPipelineOnMongoS(
- expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), result);
+ expCtx, namespaces, request, litePipe, std::move(pipeline), result);
}
// If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = dispatchShardPipeline(expCtx,
- namespaces.executionNss,
- cmdObj,
- request,
- litePipe,
- std::move(pipeline),
- collationObj);
+ auto shardDispatchResults = dispatchShardPipeline(
+ expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
// If the operation is an explain, then we verify that it succeeded on all targeted shards,
// write the results to the output builder, and return immediately.
@@ -1183,7 +1180,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
if (shardDispatchResults.exchangeSpec) {
shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx,
namespaces.executionNss,
- cmdObj,
request,
litePipe,
collationObj,
@@ -1194,7 +1190,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return dispatchMergingPipeline(expCtx,
namespaces,
request,
- cmdObj,
litePipe,
routingInfo,
std::move(shardDispatchResults),
@@ -1223,7 +1218,6 @@ void ClusterAggregate::uassertAllShardsSupportExplain(
Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
const Namespaces& namespaces,
const ShardId& shardId,
- BSONObj cmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
BSONObjBuilder* out) {
@@ -1241,8 +1235,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
- cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, cmdObj, BSONObj()));
+ BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
+ createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, BSONObj()));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,
@@ -1295,7 +1289,6 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx,
}
auto resolvedAggRequest = resolvedView.asExpandedViewAggregation(request);
- auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
result->resetToEmpty();
if (auto txnRouter = TransactionRouter::get(opCtx)) {
@@ -1310,8 +1303,7 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx,
nsStruct.requestedNss = requestedNss;
nsStruct.executionNss = resolvedView.getNamespace();
- auto status =
- ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, result);
+ auto status = ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, result);
// If the underlying namespace was changed to a view during retry, then re-run the aggregation
// on the new resolved namespace.
diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h
index 3438c71b0b1..822ad3cc078 100644
--- a/src/mongo/s/query/cluster_aggregate.h
+++ b/src/mongo/s/query/cluster_aggregate.h
@@ -77,14 +77,11 @@ public:
* over which the aggregation will actually execute. Typically these two namespaces are the
* same, but they may differ in the case of a query on a view.
*
- * The raw aggregate command parameters should be passed in 'cmdObj'.
- *
* On success, fills out 'result' with the command response.
*/
static Status runAggregate(OperationContext* opCtx,
const Namespaces& namespaces,
const AggregationRequest& request,
- BSONObj cmdObj,
BSONObjBuilder* result);
/**
@@ -104,22 +101,9 @@ private:
static void uassertAllShardsSupportExplain(
const std::vector<AsyncRequestsSender::Response>& shardResults);
- // These are temporary hacks because the runCommand method doesn't report the exact
- // host the command was run on which is necessary for cursor support. The exact host
- // could be different from conn->getServerAddress() for connections that map to
- // multiple servers such as for replica sets. These also take care of registering
- // returned cursors.
- static BSONObj aggRunCommand(OperationContext* opCtx,
- const ShardId& shardId,
- DBClientBase* conn,
- const Namespaces& namespaces,
- const AggregationRequest& aggRequest,
- BSONObj cmd);
-
static Status aggPassthrough(OperationContext*,
const Namespaces&,
const ShardId&,
- BSONObj cmd,
const AggregationRequest&,
const LiteParsedPipeline&,
BSONObjBuilder* result);
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index b9881c03d5f..0aff00c9c34 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -55,7 +55,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
const NamespaceString& nss,
const ReadPreferenceSetting readPref,
const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults) {
+ bool allowPartialResults,
+ Shard::RetryPolicy retryPolicy) {
// Construct the requests
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& remote : remotes) {
@@ -63,12 +64,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
}
// Send the requests
- MultiStatementTransactionRequestsSender ars(opCtx,
- executor,
- nss.db().toString(),
- std::move(requests),
- readPref,
- Shard::RetryPolicy::kIdempotent);
+ MultiStatementTransactionRequestsSender ars(
+ opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy);
std::vector<RemoteCursor> remoteCursors;
try {
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index e07a3d8b328..2c956b50b18 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -38,6 +38,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/s/client/shard.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
@@ -63,12 +64,14 @@ class CursorResponse;
* on reachable hosts are returned.
*
*/
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults);
+std::vector<RemoteCursor> establishCursors(
+ OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults,
+ Shard::RetryPolicy retryPolicy = Shard::RetryPolicy::kIdempotent);
/**
* Schedules a remote killCursor command for each of the cursors in 'remoteCursors'.