summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp16
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h12
-rw-r--r--src/mongo/db/pipeline/aggregation_request_test.cpp13
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp1
-rw-r--r--src/mongo/db/pipeline/document_source_out.h12
-rw-r--r--src/mongo/db/pipeline/document_source_out_in_place.h1
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h4
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.cpp51
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.h4
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp4
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h4
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h4
-rw-r--r--src/mongo/s/commands/cluster_current_op.cpp8
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp8
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp92
-rw-r--r--src/mongo/s/query/cluster_aggregate.h16
-rw-r--r--src/mongo/s/query/establish_cursors.cpp11
-rw-r--r--src/mongo/s/query/establish_cursors.h15
19 files changed, 158 insertions, 122 deletions
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index 7b5e7ab7da2..9a45b38ec17 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -46,7 +46,6 @@
#include "mongo/db/query/query_request.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/storage/storage_options.h"
-#include "mongo/db/write_concern_options.h"
namespace mongo {
@@ -225,6 +224,16 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
}
} else if (bypassDocumentValidationCommandOption() == fieldName) {
request.setBypassDocumentValidation(elem.trueValue());
+ } else if (WriteConcernOptions::kWriteConcernField == fieldName) {
+ if (elem.type() != BSONType::Object) {
+ return {ErrorCodes::TypeMismatch,
+ str::stream() << fieldName << " must be an object, not a "
+ << typeName(elem.type())};
+ }
+
+ WriteConcernOptions writeConcern;
+ uassertStatusOK(writeConcern.parse(elem.embeddedObject()));
+ request.setWriteConcern(writeConcern);
} else if (!isGenericArgument(fieldName)) {
return {ErrorCodes::FailedToParse,
str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
@@ -324,7 +333,10 @@ Document AggregationRequest::serializeToCommandObj() const {
// Only serialize maxTimeMs if specified.
{QueryRequest::cmdOptionMaxTimeMS,
_maxTimeMS == 0 ? Value() : Value(static_cast<int>(_maxTimeMS))},
- {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()}};
+ {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()},
+ {WriteConcernOptions::kWriteConcernField,
+ _writeConcern ? Value(_writeConcern->toBSON()) : Value()},
+ };
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index 019f46015f2..22134763ac9 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -38,6 +38,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/exchange_spec_gen.h"
#include "mongo/db/query/explain_options.h"
+#include "mongo/db/write_concern_options.h"
namespace mongo {
@@ -194,6 +195,10 @@ public:
return _exchangeSpec;
}
+ boost::optional<WriteConcernOptions> getWriteConcern() const {
+ return _writeConcern;
+ }
+
//
// Setters for optional fields.
//
@@ -254,6 +259,10 @@ public:
_exchangeSpec = std::move(spec);
}
+ void setWriteConcern(WriteConcernOptions writeConcern) {
+ _writeConcern = writeConcern;
+ }
+
private:
// Required fields.
const NamespaceString _nss;
@@ -299,5 +308,8 @@ private:
// represents a producer running as a part of the exchange machinery.
// This is an internal option; we do not expect it to be set on requests from users or drivers.
boost::optional<ExchangeSpec> _exchangeSpec;
+
+ // The explicit writeConcern for the operation or boost::none if the user did not specifiy one.
+ boost::optional<WriteConcernOptions> _writeConcern;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp
index d35a0c5b65a..502319d12dc 100644
--- a/src/mongo/db/pipeline/aggregation_request_test.cpp
+++ b/src/mongo/db/pipeline/aggregation_request_test.cpp
@@ -496,6 +496,12 @@ TEST(AggregationRequestTest, ShouldRejectExchangeInvalidSpec) {
ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
+TEST(AggregationRequestTest, ShouldRejectInvalidWriteConcern) {
+ NamespaceString nss("a.collection");
+ const BSONObj inputBson =
+ fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}");
+ ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
+}
//
// Ignore fields parsed elsewhere.
//
@@ -507,12 +513,5 @@ TEST(AggregationRequestTest, ShouldIgnoreQueryOptions) {
ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
}
-TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) {
- NamespaceString nss("a.collection");
- const BSONObj inputBson =
- fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}");
- ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus());
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index ea158dba17c..be19fbd681e 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -289,6 +289,7 @@ DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs,
WriteModeEnum mode,
std::set<FieldPath> uniqueKey)
: DocumentSource(expCtx),
+ _writeConcern(expCtx->opCtx->getWriteConcern()),
_done(false),
_outputNs(std::move(outputNs)),
_mode(mode),
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 8909e7dd88c..62deff0291a 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_out_gen.h"
+#include "mongo/db/write_concern_options.h"
namespace mongo {
@@ -165,7 +166,8 @@ public:
* Writes the documents in 'batch' to the write namespace.
*/
virtual void spill(BatchedObjects&& batch) {
- pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), std::move(batch.objects));
+ pExpCtx->mongoProcessInterface->insert(
+ pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern);
};
/**
@@ -188,6 +190,14 @@ public:
static boost::intrusive_ptr<DocumentSource> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+protected:
+ // Stash the writeConcern of the original command as the operation context may change by the
+ // time we start to spill $out writes. This is because certain aggregations (e.g. $exchange)
+ // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation
+ // context. The getMore's will not have an attached writeConcern however we still want to
+ // respect the writeConcern of the original command.
+ WriteConcernOptions _writeConcern;
+
private:
bool _initialized = false;
bool _done = false;
diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h
index 2fc101d87a2..791d0985657 100644
--- a/src/mongo/db/pipeline/document_source_out_in_place.h
+++ b/src/mongo/db/pipeline/document_source_out_in_place.h
@@ -73,6 +73,7 @@ public:
getWriteNs(),
std::move(batch.uniqueKeys),
std::move(batch.objects),
+ _writeConcern,
upsert,
multi);
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index ec91cba6780..da96b66c4b0 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -112,7 +112,8 @@ public:
*/
virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) = 0;
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) = 0;
/**
* Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException
@@ -122,6 +123,7 @@ public:
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) = 0;
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 103ac12ec60..ee900586ba4 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -65,7 +65,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) final {
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) final {
MONGO_UNREACHABLE;
}
@@ -73,6 +74,7 @@ public:
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) final {
MONGO_UNREACHABLE;
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index ce360bbd5bb..c6ccdb2d7ad 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -61,6 +61,18 @@ using write_ops::Insert;
using write_ops::Update;
using write_ops::UpdateOpEntry;
+namespace {
+
+// Attaches the write concern to the given batch request. If it looks like 'writeConcern' has
+// been default initialized to {w: 0, wtimeout: 0} then we do not bother attaching it.
+void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOptions& writeConcern) {
+ if (!writeConcern.wMode.empty() || writeConcern.wNumNodes > 0) {
+ request->setWriteConcern(writeConcern.toBSON());
+ }
+}
+
+} // namespace
+
std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocumentKeyFields(
OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const {
invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer);
@@ -121,15 +133,19 @@ std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocume
void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) {
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- ClusterWriter::write(
- expCtx->opCtx,
- BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)),
- &stats,
- &response);
+ BatchedCommandRequest insertCommand(
+ buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
+
+ // If applicable, attach a write concern to the batched command request.
+ attachWriteConcern(&insertCommand, wc);
+
+ ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response);
+
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
}
@@ -138,19 +154,24 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- ClusterWriter::write(expCtx->opCtx,
- BatchedCommandRequest(buildUpdateOp(ns,
- std::move(queries),
- std::move(updates),
- upsert,
- multi,
- expCtx->bypassDocumentValidation)),
- &stats,
- &response);
+
+ BatchedCommandRequest updateCommand(buildUpdateOp(ns,
+ std::move(queries),
+ std::move(updates),
+ upsert,
+ multi,
+ expCtx->bypassDocumentValidation));
+
+ // If applicable, attach a write concern to the batched command request.
+ attachWriteConcern(&updateCommand, wc);
+
+ ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response);
+
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
}
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 1550b9a6a62..39d550fc083 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -52,7 +52,8 @@ public:
*/
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) final;
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) final;
/**
* Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
@@ -62,6 +63,7 @@ public:
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) final;
};
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 660ee570b43..27ef61d9b7e 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -155,7 +155,8 @@ Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss,
void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) {
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) {
auto writeResults = performInserts(
expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
@@ -176,6 +177,7 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) {
auto writeResults = performUpdates(expCtx->opCtx,
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index e02ac9e6339..de64e9b0578 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -58,11 +58,13 @@ public:
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) override;
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) override;
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) override;
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index c085850eccc..e822be1e7b1 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -61,7 +61,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
- std::vector<BSONObj>&& objs) override {
+ std::vector<BSONObj>&& objs,
+ const WriteConcernOptions& wc) override {
MONGO_UNREACHABLE;
}
@@ -69,6 +70,7 @@ public:
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
+ const WriteConcernOptions& wc,
bool upsert,
bool multi) final {
MONGO_UNREACHABLE;
diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp
index c95fe33280e..84e527fde77 100644
--- a/src/mongo/s/commands/cluster_current_op.cpp
+++ b/src/mongo/s/commands/cluster_current_op.cpp
@@ -72,16 +72,12 @@ private:
virtual StatusWith<CursorResponse> runAggregation(
OperationContext* opCtx, const AggregationRequest& request) const final {
- auto aggCmdObj = request.serializeToCommandObj().toBson();
auto nss = request.getNamespaceString();
BSONObjBuilder responseBuilder;
- auto status = ClusterAggregate::runAggregate(opCtx,
- ClusterAggregate::Namespaces{nss, nss},
- request,
- std::move(aggCmdObj),
- &responseBuilder);
+ auto status = ClusterAggregate::runAggregate(
+ opCtx, ClusterAggregate::Namespaces{nss, nss}, request, &responseBuilder);
if (!status.isOK()) {
return status;
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 260de4dc13e..f1cb773895f 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -78,12 +78,8 @@ public:
const auto& nss = aggregationRequest.getNamespaceString();
try {
- uassertStatusOK(
- ClusterAggregate::runAggregate(opCtx,
- ClusterAggregate::Namespaces{nss, nss},
- aggregationRequest,
- cmdObj,
- result));
+ uassertStatusOK(ClusterAggregate::runAggregate(
+ opCtx, ClusterAggregate::Namespaces{nss, nss}, aggregationRequest, result));
} catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
// If the aggregation failed because the namespace is a view, re-run the command
// with the resolved view pipeline and namespace.
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 6a94072cd5f..76617c91f05 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -55,6 +55,7 @@
#include "mongo/db/query/find_common.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/db/views/view.h"
+#include "mongo/db/write_concern_options.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
@@ -89,6 +90,15 @@ constexpr unsigned ClusterAggregate::kMaxViewRetries;
namespace {
+Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) {
+ // The idempotent retry policy will retry even for writeConcern failures, so only set it if the
+ // pipeline does not support writeConcern.
+ if (req.getWriteConcern()) {
+ return Shard::RetryPolicy::kNotIdempotent;
+ }
+ return Shard::RetryPolicy::kIdempotent;
+}
+
// Given a document representing an aggregation command such as
//
// {aggregate: "myCollection", pipeline: [], ...},
@@ -218,15 +228,12 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
const AggregationRequest& request,
const boost::optional<ShardId>& shardId,
Pipeline* pipeline,
- const BSONObj& originalCmdObj,
BSONObj collationObj) {
// Create the command for the shards.
MutableDocument targetedCmd(request.serializeToCommandObj());
if (pipeline) {
targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
}
- // This pipeline is not split, ensure that the write concern is propagated if present.
- targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj);
}
@@ -247,10 +254,17 @@ BSONObj createCommandForTargetedShards(
// send to the shards.
targetedCmd[AggregationRequest::kPipelineName] =
Value(splitPipeline.shardsPipeline->serialize());
+
// When running on many shards with the exchange we may not need merging.
if (needsMerge) {
targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+
+ // For split pipelines which need merging, do *not* propagate the writeConcern to the shards
+ // part. Otherwise this is part of an exchange and in that case we should include the
+ // writeConcern.
+ targetedCmd[WriteConcernOptions::kWriteConcernField] = Value();
}
+
targetedCmd[AggregationRequest::kCursorName] =
Value(DOC(AggregationRequest::kBatchSizeName << 0));
@@ -263,14 +277,12 @@ BSONObj createCommandForTargetedShards(
BSONObj createCommandForMergingShard(const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- const BSONObj originalCmdObj,
const ShardId& shardId,
const Pipeline* pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
mergeCmd[AggregationRequest::kFromMongosName] = Value(true);
- mergeCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
// If the user didn't specify a collation already, make sure there's a collation attached to
// the merge command, since the merging shard may not have the collection metadata.
@@ -296,14 +308,14 @@ std::vector<RemoteCursor> establishShardCursors(
const LiteParsedPipeline& litePipe,
boost::optional<CachedCollectionRoutingInfo>& routingInfo,
const BSONObj& cmdObj,
+ const AggregationRequest& request,
const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
+ const BSONObj& shardQuery) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe);
std::set<ShardId> shardIds =
- getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, collation);
+ getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation());
std::vector<std::pair<ShardId, BSONObj>> requests;
// If we don't need to run on all shards, then we should always have a valid routing table.
@@ -345,7 +357,8 @@ std::vector<RemoteCursor> establishShardCursors(
nss,
readPref,
requests,
- false /* do not allow partial results */);
+ false /* do not allow partial results */,
+ getDesiredRetryPolicy(request));
}
struct DispatchShardPipelineResults {
@@ -384,7 +397,6 @@ struct DispatchShardPipelineResults {
DispatchShardPipelineResults dispatchShardPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
- BSONObj originalCmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
@@ -406,8 +418,6 @@ DispatchShardPipelineResults dispatchShardPipeline(
const auto shardQuery = pipeline->getInitialQuery();
- boost::optional<SplitPipeline> splitPipeline;
-
auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss);
// If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue.
@@ -440,6 +450,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
*(shardIds.begin()) != executionNsRoutingInfo->db().primaryId()));
boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec;
+ boost::optional<SplitPipeline> splitPipeline;
if (needsSplit) {
splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline));
@@ -453,7 +464,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
? createCommandForTargetedShards(
opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
: createPassthroughCommandForShard(
- opCtx, aggRequest, boost::none, pipeline.get(), originalCmdObj, collationObj);
+ opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
// Refresh the shard registry if we're targeting all shards. We need the shard registry
// to be at least as current as the logical time used when creating the command for
@@ -497,9 +508,9 @@ DispatchShardPipelineResults dispatchShardPipeline(
liteParsedPipeline,
executionNsRoutingInfo,
targetedCommand,
+ aggRequest,
ReadPreferenceSetting::get(opCtx),
- shardQuery,
- aggRequest.getCollation());
+ shardQuery);
invariant(cursors.size() % shardIds.size() == 0,
str::stream() << "Number of cursors (" << cursors.size()
<< ") is not a multiple of producers ("
@@ -533,7 +544,6 @@ DispatchShardPipelineResults dispatchShardPipeline(
DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
- BSONObj originalCmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
BSONObj collationObj,
@@ -666,6 +676,7 @@ Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults,
Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const NamespaceString& nss,
+ const AggregationRequest& request,
const BSONObj mergeCmdObj,
const ShardId& mergingShardId) {
if (MONGO_FAIL_POINT(clusterAggregateFailToEstablishMergingShardCursor)) {
@@ -677,19 +688,15 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const auto mergingShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
- return uassertStatusOK(
- mergingShard->runCommandWithFixedRetryAttempts(opCtx,
- ReadPreferenceSetting::get(opCtx),
- nss.db().toString(),
- mergeCmdObj,
- Shard::RetryPolicy::kIdempotent));
+ Shard::RetryPolicy retryPolicy = getDesiredRetryPolicy(request);
+ return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts(
+ opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy));
}
BSONObj establishMergingMongosCursor(
OperationContext* opCtx,
const AggregationRequest& request,
const NamespaceString& requestedNss,
- BSONObj cmdToRunOnNewShards,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging) {
@@ -963,7 +970,6 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext*
Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const ClusterAggregate::Namespaces& namespaces,
const AggregationRequest& request,
- BSONObj cmdObj,
const LiteParsedPipeline& litePipe,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
BSONObjBuilder* result) {
@@ -982,8 +988,8 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
!pipeline->getSources().front()->constraints().requiresInputDocSource);
// Register the new mongoS cursor, and retrieve the initial batch of results.
- auto cursorResponse = establishMergingMongosCursor(
- opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline));
+ auto cursorResponse =
+ establishMergingMongosCursor(opCtx, request, requestedNss, litePipe, std::move(pipeline));
// We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
// can never run on mongoS. Filter the command response and return immediately.
@@ -994,7 +1000,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const ClusterAggregate::Namespaces& namespaces,
const AggregationRequest& request,
- BSONObj cmdObj,
const LiteParsedPipeline& litePipe,
const boost::optional<CachedCollectionRoutingInfo>& routingInfo,
DispatchShardPipelineResults&& shardDispatchResults,
@@ -1027,7 +1032,6 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
return runPipelineOnMongoS(expCtx,
namespaces,
request,
- shardDispatchResults.commandForTargetedShards,
litePipe,
std::move(shardDispatchResults.splitPipeline->mergePipeline),
result);
@@ -1049,12 +1053,11 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
targetedShards,
routingInfo->db().primaryId());
- auto mergeCmdObj =
- createCommandForMergingShard(request, expCtx, cmdObj, mergingShardId, mergePipeline);
+ auto mergeCmdObj = createCommandForMergingShard(request, expCtx, mergingShardId, mergePipeline);
// Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
- auto mergeResponse =
- establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId);
+ auto mergeResponse = establishMergingShardCursor(
+ opCtx, namespaces.executionNss, request, mergeCmdObj, mergingShardId);
auto mergeCursorResponse = uassertStatusOK(storePossibleCursor(
opCtx, namespaces.requestedNss, mergingShardId, mergeResponse, expCtx->tailableMode));
@@ -1084,7 +1087,6 @@ void appendEmptyResultSetWithStatus(OperationContext* opCtx,
Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const Namespaces& namespaces,
const AggregationRequest& request,
- BSONObj cmdObj,
BSONObjBuilder* result) {
auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss);
boost::optional<CachedCollectionRoutingInfo> routingInfo;
@@ -1119,7 +1121,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) {
resolveInvolvedNamespaces(opCtx, litePipe);
const auto primaryShardId = routingInfo->db().primary()->getId();
- return aggPassthrough(opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result);
+ return aggPassthrough(opCtx, namespaces, primaryShardId, request, litePipe, result);
}
// Populate the collection UUID and the appropriate collation to use.
@@ -1147,17 +1149,12 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
return runPipelineOnMongoS(
- expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), result);
+ expCtx, namespaces, request, litePipe, std::move(pipeline), result);
}
// If not, split the pipeline as necessary and dispatch to the relevant shards.
- auto shardDispatchResults = dispatchShardPipeline(expCtx,
- namespaces.executionNss,
- cmdObj,
- request,
- litePipe,
- std::move(pipeline),
- collationObj);
+ auto shardDispatchResults = dispatchShardPipeline(
+ expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj);
// If the operation is an explain, then we verify that it succeeded on all targeted shards,
// write the results to the output builder, and return immediately.
@@ -1183,7 +1180,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
if (shardDispatchResults.exchangeSpec) {
shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx,
namespaces.executionNss,
- cmdObj,
request,
litePipe,
collationObj,
@@ -1194,7 +1190,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
return dispatchMergingPipeline(expCtx,
namespaces,
request,
- cmdObj,
litePipe,
routingInfo,
std::move(shardDispatchResults),
@@ -1223,7 +1218,6 @@ void ClusterAggregate::uassertAllShardsSupportExplain(
Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
const Namespaces& namespaces,
const ShardId& shardId,
- BSONObj cmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
BSONObjBuilder* out) {
@@ -1241,8 +1235,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
- cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, cmdObj, BSONObj()));
+ BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
+ createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, BSONObj()));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,
@@ -1295,7 +1289,6 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx,
}
auto resolvedAggRequest = resolvedView.asExpandedViewAggregation(request);
- auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
result->resetToEmpty();
if (auto txnRouter = TransactionRouter::get(opCtx)) {
@@ -1310,8 +1303,7 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx,
nsStruct.requestedNss = requestedNss;
nsStruct.executionNss = resolvedView.getNamespace();
- auto status =
- ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, result);
+ auto status = ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, result);
// If the underlying namespace was changed to a view during retry, then re-run the aggregation
// on the new resolved namespace.
diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h
index 3438c71b0b1..822ad3cc078 100644
--- a/src/mongo/s/query/cluster_aggregate.h
+++ b/src/mongo/s/query/cluster_aggregate.h
@@ -77,14 +77,11 @@ public:
* over which the aggregation will actually execute. Typically these two namespaces are the
* same, but they may differ in the case of a query on a view.
*
- * The raw aggregate command parameters should be passed in 'cmdObj'.
- *
* On success, fills out 'result' with the command response.
*/
static Status runAggregate(OperationContext* opCtx,
const Namespaces& namespaces,
const AggregationRequest& request,
- BSONObj cmdObj,
BSONObjBuilder* result);
/**
@@ -104,22 +101,9 @@ private:
static void uassertAllShardsSupportExplain(
const std::vector<AsyncRequestsSender::Response>& shardResults);
- // These are temporary hacks because the runCommand method doesn't report the exact
- // host the command was run on which is necessary for cursor support. The exact host
- // could be different from conn->getServerAddress() for connections that map to
- // multiple servers such as for replica sets. These also take care of registering
- // returned cursors.
- static BSONObj aggRunCommand(OperationContext* opCtx,
- const ShardId& shardId,
- DBClientBase* conn,
- const Namespaces& namespaces,
- const AggregationRequest& aggRequest,
- BSONObj cmd);
-
static Status aggPassthrough(OperationContext*,
const Namespaces&,
const ShardId&,
- BSONObj cmd,
const AggregationRequest&,
const LiteParsedPipeline&,
BSONObjBuilder* result);
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index b9881c03d5f..0aff00c9c34 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -55,7 +55,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
const NamespaceString& nss,
const ReadPreferenceSetting readPref,
const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults) {
+ bool allowPartialResults,
+ Shard::RetryPolicy retryPolicy) {
// Construct the requests
std::vector<AsyncRequestsSender::Request> requests;
for (const auto& remote : remotes) {
@@ -63,12 +64,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
}
// Send the requests
- MultiStatementTransactionRequestsSender ars(opCtx,
- executor,
- nss.db().toString(),
- std::move(requests),
- readPref,
- Shard::RetryPolicy::kIdempotent);
+ MultiStatementTransactionRequestsSender ars(
+ opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy);
std::vector<RemoteCursor> remoteCursors;
try {
diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h
index e07a3d8b328..2c956b50b18 100644
--- a/src/mongo/s/query/establish_cursors.h
+++ b/src/mongo/s/query/establish_cursors.h
@@ -38,6 +38,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/cursor_id.h"
#include "mongo/executor/task_executor.h"
+#include "mongo/s/client/shard.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/net/hostandport.h"
@@ -63,12 +64,14 @@ class CursorResponse;
* on reachable hosts are returned.
*
*/
-std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
- executor::TaskExecutor* executor,
- const NamespaceString& nss,
- const ReadPreferenceSetting readPref,
- const std::vector<std::pair<ShardId, BSONObj>>& remotes,
- bool allowPartialResults);
+std::vector<RemoteCursor> establishCursors(
+ OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ const NamespaceString& nss,
+ const ReadPreferenceSetting readPref,
+ const std::vector<std::pair<ShardId, BSONObj>>& remotes,
+ bool allowPartialResults,
+ Shard::RetryPolicy retryPolicy = Shard::RetryPolicy::kIdempotent);
/**
* Schedules a remote killCursor command for each of the cursors in 'remoteCursors'.