diff options
Diffstat (limited to 'src')
19 files changed, 158 insertions, 122 deletions
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp index 7b5e7ab7da2..9a45b38ec17 100644 --- a/src/mongo/db/pipeline/aggregation_request.cpp +++ b/src/mongo/db/pipeline/aggregation_request.cpp @@ -46,7 +46,6 @@ #include "mongo/db/query/query_request.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/db/write_concern_options.h" namespace mongo { @@ -225,6 +224,16 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON( } } else if (bypassDocumentValidationCommandOption() == fieldName) { request.setBypassDocumentValidation(elem.trueValue()); + } else if (WriteConcernOptions::kWriteConcernField == fieldName) { + if (elem.type() != BSONType::Object) { + return {ErrorCodes::TypeMismatch, + str::stream() << fieldName << " must be an object, not a " + << typeName(elem.type())}; + } + + WriteConcernOptions writeConcern; + uassertStatusOK(writeConcern.parse(elem.embeddedObject())); + request.setWriteConcern(writeConcern); } else if (!isGenericArgument(fieldName)) { return {ErrorCodes::FailedToParse, str::stream() << "unrecognized field '" << elem.fieldName() << "'"}; @@ -324,7 +333,10 @@ Document AggregationRequest::serializeToCommandObj() const { // Only serialize maxTimeMs if specified. {QueryRequest::cmdOptionMaxTimeMS, _maxTimeMS == 0 ? Value() : Value(static_cast<int>(_maxTimeMS))}, - {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()}}; + {kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()}, + {WriteConcernOptions::kWriteConcernField, + _writeConcern ? Value(_writeConcern->toBSON()) : Value()}, + }; } } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h index 019f46015f2..22134763ac9 100644 --- a/src/mongo/db/pipeline/aggregation_request.h +++ b/src/mongo/db/pipeline/aggregation_request.h @@ -38,6 +38,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/exchange_spec_gen.h" #include "mongo/db/query/explain_options.h" +#include "mongo/db/write_concern_options.h" namespace mongo { @@ -194,6 +195,10 @@ public: return _exchangeSpec; } + boost::optional<WriteConcernOptions> getWriteConcern() const { + return _writeConcern; + } + // // Setters for optional fields. // @@ -254,6 +259,10 @@ public: _exchangeSpec = std::move(spec); } + void setWriteConcern(WriteConcernOptions writeConcern) { + _writeConcern = writeConcern; + } + private: // Required fields. const NamespaceString _nss; @@ -299,5 +308,8 @@ private: // represents a producer running as a part of the exchange machinery. // This is an internal option; we do not expect it to be set on requests from users or drivers. boost::optional<ExchangeSpec> _exchangeSpec; + + // The explicit writeConcern for the operation or boost::none if the user did not specifiy one. + boost::optional<WriteConcernOptions> _writeConcern; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/aggregation_request_test.cpp b/src/mongo/db/pipeline/aggregation_request_test.cpp index d35a0c5b65a..502319d12dc 100644 --- a/src/mongo/db/pipeline/aggregation_request_test.cpp +++ b/src/mongo/db/pipeline/aggregation_request_test.cpp @@ -496,6 +496,12 @@ TEST(AggregationRequestTest, ShouldRejectExchangeInvalidSpec) { ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } +TEST(AggregationRequestTest, ShouldRejectInvalidWriteConcern) { + NamespaceString nss("a.collection"); + const BSONObj inputBson = + fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}"); + ASSERT_NOT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); +} // // Ignore fields parsed elsewhere. // @@ -507,12 +513,5 @@ TEST(AggregationRequestTest, ShouldIgnoreQueryOptions) { ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); } -TEST(AggregationRequestTest, ShouldIgnoreWriteConcernOption) { - NamespaceString nss("a.collection"); - const BSONObj inputBson = - fromjson("{pipeline: [{$match: {a: 'abc'}}], cursor: {}, writeConcern: 'invalid'}"); - ASSERT_OK(AggregationRequest::parseFromBSON(nss, inputBson).getStatus()); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index ea158dba17c..be19fbd681e 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -289,6 +289,7 @@ DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs, WriteModeEnum mode, std::set<FieldPath> uniqueKey) : DocumentSource(expCtx), + _writeConcern(expCtx->opCtx->getWriteConcern()), _done(false), _outputNs(std::move(outputNs)), _mode(mode), diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 8909e7dd88c..62deff0291a 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_out_gen.h" +#include "mongo/db/write_concern_options.h" namespace mongo { @@ -165,7 +166,8 @@ public: * Writes the documents in 'batch' to the write namespace. */ virtual void spill(BatchedObjects&& batch) { - pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), std::move(batch.objects)); + pExpCtx->mongoProcessInterface->insert( + pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern); }; /** @@ -188,6 +190,14 @@ public: static boost::intrusive_ptr<DocumentSource> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); +protected: + // Stash the writeConcern of the original command as the operation context may change by the + // time we start to spill $out writes. This is because certain aggregations (e.g. $exchange) + // establish cursors with batchSize 0 then run subsequent getMore's which use a new operation + // context. The getMore's will not have an attached writeConcern however we still want to + // respect the writeConcern of the original command. + WriteConcernOptions _writeConcern; + private: bool _initialized = false; bool _done = false; diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h index 2fc101d87a2..791d0985657 100644 --- a/src/mongo/db/pipeline/document_source_out_in_place.h +++ b/src/mongo/db/pipeline/document_source_out_in_place.h @@ -73,6 +73,7 @@ public: getWriteNs(), std::move(batch.uniqueKeys), std::move(batch.objects), + _writeConcern, upsert, multi); } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) { diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index ec91cba6780..da96b66c4b0 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -112,7 +112,8 @@ public: */ virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs) = 0; + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc) = 0; /** * Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException @@ -122,6 +123,7 @@ public: const NamespaceString& ns, std::vector<BSONObj>&& queries, std::vector<BSONObj>&& updates, + const WriteConcernOptions& wc, bool upsert, bool multi) = 0; diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 103ac12ec60..ee900586ba4 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -65,7 +65,8 @@ public: void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs) final { + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc) final { MONGO_UNREACHABLE; } @@ -73,6 +74,7 @@ public: const NamespaceString& ns, std::vector<BSONObj>&& queries, std::vector<BSONObj>&& updates, + const WriteConcernOptions& wc, bool upsert, bool multi) final { MONGO_UNREACHABLE; diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp index ce360bbd5bb..c6ccdb2d7ad 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp +++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp @@ -61,6 +61,18 @@ using write_ops::Insert; using write_ops::Update; using write_ops::UpdateOpEntry; +namespace { + +// Attaches the write concern to the given batch request. If it looks like 'writeConcern' has +// been default initialized to {w: 0, wtimeout: 0} then we do not bother attaching it. +void attachWriteConcern(BatchedCommandRequest* request, const WriteConcernOptions& writeConcern) { + if (!writeConcern.wMode.empty() || writeConcern.wNumNodes > 0) { + request->setWriteConcern(writeConcern.toBSON()); + } +} + +} // namespace + std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocumentKeyFields( OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const { invariant(serverGlobalParams.clusterRole == ClusterRole::ShardServer); @@ -121,15 +133,19 @@ std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocume void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs) { + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc) { BatchedCommandResponse response; BatchWriteExecStats stats; - ClusterWriter::write( - expCtx->opCtx, - BatchedCommandRequest(buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)), - &stats, - &response); + BatchedCommandRequest insertCommand( + buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); + + // If applicable, attach a write concern to the batched command request. + attachWriteConcern(&insertCommand, wc); + + ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response); + // TODO SERVER-35403: Add more context for which shard produced the error. uassertStatusOKWithContext(response.toStatus(), "Insert failed: "); } @@ -138,19 +154,24 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont const NamespaceString& ns, std::vector<BSONObj>&& queries, std::vector<BSONObj>&& updates, + const WriteConcernOptions& wc, bool upsert, bool multi) { BatchedCommandResponse response; BatchWriteExecStats stats; - ClusterWriter::write(expCtx->opCtx, - BatchedCommandRequest(buildUpdateOp(ns, - std::move(queries), - std::move(updates), - upsert, - multi, - expCtx->bypassDocumentValidation)), - &stats, - &response); + + BatchedCommandRequest updateCommand(buildUpdateOp(ns, + std::move(queries), + std::move(updates), + upsert, + multi, + expCtx->bypassDocumentValidation)); + + // If applicable, attach a write concern to the batched command request. + attachWriteConcern(&updateCommand, wc); + + ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response); + // TODO SERVER-35403: Add more context for which shard produced the error. uassertStatusOKWithContext(response.toStatus(), "Update failed: "); } diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h index 1550b9a6a62..39d550fc083 100644 --- a/src/mongo/db/pipeline/process_interface_shardsvr.h +++ b/src/mongo/db/pipeline/process_interface_shardsvr.h @@ -52,7 +52,8 @@ public: */ void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs) final; + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc) final; /** * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking, @@ -62,6 +63,7 @@ public: const NamespaceString& ns, std::vector<BSONObj>&& queries, std::vector<BSONObj>&& updates, + const WriteConcernOptions& wc, bool upsert, bool multi) final; }; diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 660ee570b43..27ef61d9b7e 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -155,7 +155,8 @@ Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss, void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs) { + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc) { auto writeResults = performInserts( expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation)); @@ -176,6 +177,7 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte const NamespaceString& ns, std::vector<BSONObj>&& queries, std::vector<BSONObj>&& updates, + const WriteConcernOptions& wc, bool upsert, bool multi) { auto writeResults = performUpdates(expCtx->opCtx, diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index e02ac9e6339..de64e9b0578 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -58,11 +58,13 @@ public: bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs) override; + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc) override; void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& queries, std::vector<BSONObj>&& updates, + const WriteConcernOptions& wc, bool upsert, bool multi) override; CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final; diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index c085850eccc..e822be1e7b1 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -61,7 +61,8 @@ public: void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, - std::vector<BSONObj>&& objs) override { + std::vector<BSONObj>&& objs, + const WriteConcernOptions& wc) override { MONGO_UNREACHABLE; } @@ -69,6 +70,7 @@ public: const NamespaceString& ns, std::vector<BSONObj>&& queries, std::vector<BSONObj>&& updates, + const WriteConcernOptions& wc, bool upsert, bool multi) final { MONGO_UNREACHABLE; diff --git a/src/mongo/s/commands/cluster_current_op.cpp b/src/mongo/s/commands/cluster_current_op.cpp index c95fe33280e..84e527fde77 100644 --- a/src/mongo/s/commands/cluster_current_op.cpp +++ b/src/mongo/s/commands/cluster_current_op.cpp @@ -72,16 +72,12 @@ private: virtual StatusWith<CursorResponse> runAggregation( OperationContext* opCtx, const AggregationRequest& request) const final { - auto aggCmdObj = request.serializeToCommandObj().toBson(); auto nss = request.getNamespaceString(); BSONObjBuilder responseBuilder; - auto status = ClusterAggregate::runAggregate(opCtx, - ClusterAggregate::Namespaces{nss, nss}, - request, - std::move(aggCmdObj), - &responseBuilder); + auto status = ClusterAggregate::runAggregate( + opCtx, ClusterAggregate::Namespaces{nss, nss}, request, &responseBuilder); if (!status.isOK()) { return status; diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 260de4dc13e..f1cb773895f 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -78,12 +78,8 @@ public: const auto& nss = aggregationRequest.getNamespaceString(); try { - uassertStatusOK( - ClusterAggregate::runAggregate(opCtx, - ClusterAggregate::Namespaces{nss, nss}, - aggregationRequest, - cmdObj, - result)); + uassertStatusOK(ClusterAggregate::runAggregate( + opCtx, ClusterAggregate::Namespaces{nss, nss}, aggregationRequest, result)); } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { // If the aggregation failed because the namespace is a view, re-run the command // with the resolved view pipeline and namespace. diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 6a94072cd5f..76617c91f05 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -55,6 +55,7 @@ #include "mongo/db/query/find_common.h" #include "mongo/db/views/resolved_view.h" #include "mongo/db/views/view.h" +#include "mongo/db/write_concern_options.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog_cache.h" @@ -89,6 +90,15 @@ constexpr unsigned ClusterAggregate::kMaxViewRetries; namespace { +Shard::RetryPolicy getDesiredRetryPolicy(const AggregationRequest& req) { + // The idempotent retry policy will retry even for writeConcern failures, so only set it if the + // pipeline does not support writeConcern. + if (req.getWriteConcern()) { + return Shard::RetryPolicy::kNotIdempotent; + } + return Shard::RetryPolicy::kIdempotent; +} + // Given a document representing an aggregation command such as // // {aggregate: "myCollection", pipeline: [], ...}, @@ -218,15 +228,12 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx, const AggregationRequest& request, const boost::optional<ShardId>& shardId, Pipeline* pipeline, - const BSONObj& originalCmdObj, BSONObj collationObj) { // Create the command for the shards. MutableDocument targetedCmd(request.serializeToCommandObj()); if (pipeline) { targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); } - // This pipeline is not split, ensure that the write concern is propagated if present. - targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj); } @@ -247,10 +254,17 @@ BSONObj createCommandForTargetedShards( // send to the shards. targetedCmd[AggregationRequest::kPipelineName] = Value(splitPipeline.shardsPipeline->serialize()); + // When running on many shards with the exchange we may not need merging. if (needsMerge) { targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); + + // For split pipelines which need merging, do *not* propagate the writeConcern to the shards + // part. Otherwise this is part of an exchange and in that case we should include the + // writeConcern. + targetedCmd[WriteConcernOptions::kWriteConcernField] = Value(); } + targetedCmd[AggregationRequest::kCursorName] = Value(DOC(AggregationRequest::kBatchSizeName << 0)); @@ -263,14 +277,12 @@ BSONObj createCommandForTargetedShards( BSONObj createCommandForMergingShard(const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, - const BSONObj originalCmdObj, const ShardId& shardId, const Pipeline* pipelineForMerging) { MutableDocument mergeCmd(request.serializeToCommandObj()); mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); mergeCmd[AggregationRequest::kFromMongosName] = Value(true); - mergeCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); // If the user didn't specify a collation already, make sure there's a collation attached to // the merge command, since the merging shard may not have the collection metadata. @@ -296,14 +308,14 @@ std::vector<RemoteCursor> establishShardCursors( const LiteParsedPipeline& litePipe, boost::optional<CachedCollectionRoutingInfo>& routingInfo, const BSONObj& cmdObj, + const AggregationRequest& request, const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery, - const BSONObj& collation) { + const BSONObj& shardQuery) { LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; const bool mustRunOnAll = mustRunOnAllShards(nss, litePipe); std::set<ShardId> shardIds = - getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, collation); + getTargetedShards(opCtx, mustRunOnAll, routingInfo, shardQuery, request.getCollation()); std::vector<std::pair<ShardId, BSONObj>> requests; // If we don't need to run on all shards, then we should always have a valid routing table. @@ -345,7 +357,8 @@ std::vector<RemoteCursor> establishShardCursors( nss, readPref, requests, - false /* do not allow partial results */); + false /* do not allow partial results */, + getDesiredRetryPolicy(request)); } struct DispatchShardPipelineResults { @@ -384,7 +397,6 @@ struct DispatchShardPipelineResults { DispatchShardPipelineResults dispatchShardPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& executionNss, - BSONObj originalCmdObj, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, @@ -406,8 +418,6 @@ DispatchShardPipelineResults dispatchShardPipeline( const auto shardQuery = pipeline->getInitialQuery(); - boost::optional<SplitPipeline> splitPipeline; - auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, executionNss); // If this is a $changeStream, we swallow NamespaceNotFound exceptions and continue. @@ -440,6 +450,7 @@ DispatchShardPipelineResults dispatchShardPipeline( *(shardIds.begin()) != executionNsRoutingInfo->db().primaryId())); boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec; + boost::optional<SplitPipeline> splitPipeline; if (needsSplit) { splitPipeline = cluster_aggregation_planner::splitPipeline(std::move(pipeline)); @@ -453,7 +464,7 @@ DispatchShardPipelineResults dispatchShardPipeline( ? createCommandForTargetedShards( opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true) : createPassthroughCommandForShard( - opCtx, aggRequest, boost::none, pipeline.get(), originalCmdObj, collationObj); + opCtx, aggRequest, boost::none, pipeline.get(), collationObj); // Refresh the shard registry if we're targeting all shards. We need the shard registry // to be at least as current as the logical time used when creating the command for @@ -497,9 +508,9 @@ DispatchShardPipelineResults dispatchShardPipeline( liteParsedPipeline, executionNsRoutingInfo, targetedCommand, + aggRequest, ReadPreferenceSetting::get(opCtx), - shardQuery, - aggRequest.getCollation()); + shardQuery); invariant(cursors.size() % shardIds.size() == 0, str::stream() << "Number of cursors (" << cursors.size() << ") is not a multiple of producers (" @@ -533,7 +544,6 @@ DispatchShardPipelineResults dispatchShardPipeline( DispatchShardPipelineResults dispatchExchangeConsumerPipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& executionNss, - BSONObj originalCmdObj, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, BSONObj collationObj, @@ -666,6 +676,7 @@ Status appendExplainResults(DispatchShardPipelineResults&& dispatchResults, Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, const NamespaceString& nss, + const AggregationRequest& request, const BSONObj mergeCmdObj, const ShardId& mergingShardId) { if (MONGO_FAIL_POINT(clusterAggregateFailToEstablishMergingShardCursor)) { @@ -677,19 +688,15 @@ Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx, const auto mergingShard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId)); - return uassertStatusOK( - mergingShard->runCommandWithFixedRetryAttempts(opCtx, - ReadPreferenceSetting::get(opCtx), - nss.db().toString(), - mergeCmdObj, - Shard::RetryPolicy::kIdempotent)); + Shard::RetryPolicy retryPolicy = getDesiredRetryPolicy(request); + return uassertStatusOK(mergingShard->runCommandWithFixedRetryAttempts( + opCtx, ReadPreferenceSetting::get(opCtx), nss.db().toString(), mergeCmdObj, retryPolicy)); } BSONObj establishMergingMongosCursor( OperationContext* opCtx, const AggregationRequest& request, const NamespaceString& requestedNss, - BSONObj cmdToRunOnNewShards, const LiteParsedPipeline& liteParsedPipeline, std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging) { @@ -963,7 +970,6 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx, const ClusterAggregate::Namespaces& namespaces, const AggregationRequest& request, - BSONObj cmdObj, const LiteParsedPipeline& litePipe, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, BSONObjBuilder* result) { @@ -982,8 +988,8 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx !pipeline->getSources().front()->constraints().requiresInputDocSource); // Register the new mongoS cursor, and retrieve the initial batch of results. - auto cursorResponse = establishMergingMongosCursor( - opCtx, request, requestedNss, cmdObj, litePipe, std::move(pipeline)); + auto cursorResponse = + establishMergingMongosCursor(opCtx, request, requestedNss, litePipe, std::move(pipeline)); // We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline // can never run on mongoS. Filter the command response and return immediately. @@ -994,7 +1000,6 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, const ClusterAggregate::Namespaces& namespaces, const AggregationRequest& request, - BSONObj cmdObj, const LiteParsedPipeline& litePipe, const boost::optional<CachedCollectionRoutingInfo>& routingInfo, DispatchShardPipelineResults&& shardDispatchResults, @@ -1027,7 +1032,6 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex return runPipelineOnMongoS(expCtx, namespaces, request, - shardDispatchResults.commandForTargetedShards, litePipe, std::move(shardDispatchResults.splitPipeline->mergePipeline), result); @@ -1049,12 +1053,11 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex targetedShards, routingInfo->db().primaryId()); - auto mergeCmdObj = - createCommandForMergingShard(request, expCtx, cmdObj, mergingShardId, mergePipeline); + auto mergeCmdObj = createCommandForMergingShard(request, expCtx, mergingShardId, mergePipeline); // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return. - auto mergeResponse = - establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId); + auto mergeResponse = establishMergingShardCursor( + opCtx, namespaces.executionNss, request, mergeCmdObj, mergingShardId); auto mergeCursorResponse = uassertStatusOK(storePossibleCursor( opCtx, namespaces.requestedNss, mergingShardId, mergeResponse, expCtx->tailableMode)); @@ -1084,7 +1087,6 @@ void appendEmptyResultSetWithStatus(OperationContext* opCtx, Status ClusterAggregate::runAggregate(OperationContext* opCtx, const Namespaces& namespaces, const AggregationRequest& request, - BSONObj cmdObj, BSONObjBuilder* result) { auto executionNsRoutingInfoStatus = getExecutionNsRoutingInfo(opCtx, namespaces.executionNss); boost::optional<CachedCollectionRoutingInfo> routingInfo; @@ -1119,7 +1121,7 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, litePipe.allowedToForwardFromMongos() && litePipe.allowedToPassthroughFromMongos()) { resolveInvolvedNamespaces(opCtx, litePipe); const auto primaryShardId = routingInfo->db().primary()->getId(); - return aggPassthrough(opCtx, namespaces, primaryShardId, cmdObj, request, litePipe, result); + return aggPassthrough(opCtx, namespaces, primaryShardId, request, litePipe, result); } // Populate the collection UUID and the appropriate collation to use. @@ -1147,17 +1149,12 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } return runPipelineOnMongoS( - expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), result); + expCtx, namespaces, request, litePipe, std::move(pipeline), result); } // If not, split the pipeline as necessary and dispatch to the relevant shards. - auto shardDispatchResults = dispatchShardPipeline(expCtx, - namespaces.executionNss, - cmdObj, - request, - litePipe, - std::move(pipeline), - collationObj); + auto shardDispatchResults = dispatchShardPipeline( + expCtx, namespaces.executionNss, request, litePipe, std::move(pipeline), collationObj); // If the operation is an explain, then we verify that it succeeded on all targeted shards, // write the results to the output builder, and return immediately. @@ -1183,7 +1180,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, if (shardDispatchResults.exchangeSpec) { shardDispatchResults = dispatchExchangeConsumerPipeline(expCtx, namespaces.executionNss, - cmdObj, request, litePipe, collationObj, @@ -1194,7 +1190,6 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, return dispatchMergingPipeline(expCtx, namespaces, request, - cmdObj, litePipe, routingInfo, std::move(shardDispatchResults), @@ -1223,7 +1218,6 @@ void ClusterAggregate::uassertAllShardsSupportExplain( Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, const Namespaces& namespaces, const ShardId& shardId, - BSONObj cmdObj, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, BSONObjBuilder* out) { @@ -1241,8 +1235,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. - cmdObj = CommandHelpers::filterCommandRequestForPassthrough( - createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, cmdObj, BSONObj())); + BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough( + createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, BSONObj())); auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( opCtx, @@ -1295,7 +1289,6 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx, } auto resolvedAggRequest = resolvedView.asExpandedViewAggregation(request); - auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); result->resetToEmpty(); if (auto txnRouter = TransactionRouter::get(opCtx)) { @@ -1310,8 +1303,7 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx, nsStruct.requestedNss = requestedNss; nsStruct.executionNss = resolvedView.getNamespace(); - auto status = - ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, result); + auto status = ClusterAggregate::runAggregate(opCtx, nsStruct, resolvedAggRequest, result); // If the underlying namespace was changed to a view during retry, then re-run the aggregation // on the new resolved namespace. diff --git a/src/mongo/s/query/cluster_aggregate.h b/src/mongo/s/query/cluster_aggregate.h index 3438c71b0b1..822ad3cc078 100644 --- a/src/mongo/s/query/cluster_aggregate.h +++ b/src/mongo/s/query/cluster_aggregate.h @@ -77,14 +77,11 @@ public: * over which the aggregation will actually execute. Typically these two namespaces are the * same, but they may differ in the case of a query on a view. * - * The raw aggregate command parameters should be passed in 'cmdObj'. - * * On success, fills out 'result' with the command response. */ static Status runAggregate(OperationContext* opCtx, const Namespaces& namespaces, const AggregationRequest& request, - BSONObj cmdObj, BSONObjBuilder* result); /** @@ -104,22 +101,9 @@ private: static void uassertAllShardsSupportExplain( const std::vector<AsyncRequestsSender::Response>& shardResults); - // These are temporary hacks because the runCommand method doesn't report the exact - // host the command was run on which is necessary for cursor support. The exact host - // could be different from conn->getServerAddress() for connections that map to - // multiple servers such as for replica sets. These also take care of registering - // returned cursors. - static BSONObj aggRunCommand(OperationContext* opCtx, - const ShardId& shardId, - DBClientBase* conn, - const Namespaces& namespaces, - const AggregationRequest& aggRequest, - BSONObj cmd); - static Status aggPassthrough(OperationContext*, const Namespaces&, const ShardId&, - BSONObj cmd, const AggregationRequest&, const LiteParsedPipeline&, BSONObjBuilder* result); diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index b9881c03d5f..0aff00c9c34 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -55,7 +55,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, const NamespaceString& nss, const ReadPreferenceSetting readPref, const std::vector<std::pair<ShardId, BSONObj>>& remotes, - bool allowPartialResults) { + bool allowPartialResults, + Shard::RetryPolicy retryPolicy) { // Construct the requests std::vector<AsyncRequestsSender::Request> requests; for (const auto& remote : remotes) { @@ -63,12 +64,8 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, } // Send the requests - MultiStatementTransactionRequestsSender ars(opCtx, - executor, - nss.db().toString(), - std::move(requests), - readPref, - Shard::RetryPolicy::kIdempotent); + MultiStatementTransactionRequestsSender ars( + opCtx, executor, nss.db().toString(), std::move(requests), readPref, retryPolicy); std::vector<RemoteCursor> remoteCursors; try { diff --git a/src/mongo/s/query/establish_cursors.h b/src/mongo/s/query/establish_cursors.h index e07a3d8b328..2c956b50b18 100644 --- a/src/mongo/s/query/establish_cursors.h +++ b/src/mongo/s/query/establish_cursors.h @@ -38,6 +38,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/db/cursor_id.h" #include "mongo/executor/task_executor.h" +#include "mongo/s/client/shard.h" #include "mongo/s/query/async_results_merger_params_gen.h" #include "mongo/stdx/mutex.h" #include "mongo/util/net/hostandport.h" @@ -63,12 +64,14 @@ class CursorResponse; * on reachable hosts are returned. * */ -std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, - executor::TaskExecutor* executor, - const NamespaceString& nss, - const ReadPreferenceSetting readPref, - const std::vector<std::pair<ShardId, BSONObj>>& remotes, - bool allowPartialResults); +std::vector<RemoteCursor> establishCursors( + OperationContext* opCtx, + executor::TaskExecutor* executor, + const NamespaceString& nss, + const ReadPreferenceSetting readPref, + const std::vector<std::pair<ShardId, BSONObj>>& remotes, + bool allowPartialResults, + Shard::RetryPolicy retryPolicy = Shard::RetryPolicy::kIdempotent); /** * Schedules a remote killCursor command for each of the cursors in 'remoteCursors'. |