diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-08-14 13:07:03 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-08-20 14:56:12 -0400 |
commit | 0ef6e36f8c1d79801bca13b2adcf4a908a2bb720 (patch) | |
tree | bc1da17246590deb3ce886ab0661a81361fef6a5 | |
parent | bb9f6662e1f98b633df4d22082b5810d786fb620 (diff) | |
download | mongo-0ef6e36f8c1d79801bca13b2adcf4a908a2bb720.tar.gz |
SERVER-33323 Refactor cluster_aggregate logic

Attempts to make it more obvious how commands for the shards are
generated while also removing some methods from the Pipeline API.
-rw-r--r-- | src/mongo/db/pipeline/pipeline.h | 19 |
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 130 |
2 files changed, 75 insertions, 74 deletions
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index d4a2c813060..d3e99daae31 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -180,23 +180,8 @@ public: } /** - * Returns true if this pipeline is the part of a split pipeline which should be targeted to the - * shards. - */ - bool isSplitForShards() const { - return _splitState == SplitState::kSplitForShards; - } - - /** - * Returns true if this pipeline is the part of a split pipeline which is responsible for - * merging the results from the shards. - */ - bool isSplitForMerge() const { - return _splitState == SplitState::kSplitForMerge; - } - - /** If the pipeline starts with a $match, return its BSON predicate. - * Returns empty BSON if the first stage isn't $match. + * If the pipeline starts with a stage which is or includes a query predicate (e.g. a $match), + * returns a BSON object representing that query. Otherwise, returns an empty BSON object. */ BSONObj getInitialQuery() const; diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 75d4db92668..151de50c736 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -162,60 +162,80 @@ std::set<ShardId> getTargetedShards(OperationContext* opCtx, return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation); } -BSONObj createCommandForTargetedShards( - OperationContext* opCtx, - const AggregationRequest& request, - const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards, - const BSONObj collationObj, - boost::optional<LogicalTime> atClusterTime) { - - // Create the command for the shards. - MutableDocument targetedCmd(request.serializeToCommandObj()); - targetedCmd[AggregationRequest::kFromMongosName] = Value(true); - - // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough. 
- if (pipelineForTargetedShards) { - targetedCmd[AggregationRequest::kPipelineName] = - Value(pipelineForTargetedShards->serialize()); - - if (pipelineForTargetedShards->isSplitForShards()) { - targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); - targetedCmd[AggregationRequest::kCursorName] = - Value(DOC(AggregationRequest::kBatchSizeName << 0)); - } - } - - // If this pipeline is not split, ensure that the write concern is propagated if present. - if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) { - targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); - } - +/** + * Appends information to the command sent to the shards which should be appended both if this is a + * passthrough sent to a single shard and if this is a split pipeline. + */ +BSONObj genericTransformForShards(MutableDocument&& cmdForShards, + OperationContext* opCtx, + const AggregationRequest& request, + BSONObj collationObj, + boost::optional<LogicalTime> atClusterTime) { + cmdForShards[AggregationRequest::kFromMongosName] = Value(true); // If this is a request for an aggregation explain, then we must wrap the aggregate inside an // explain command. 
if (auto explainVerbosity = request.getExplain()) { - targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity)); + cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity)); } if (!collationObj.isEmpty()) { - targetedCmd[AggregationRequest::kCollationName] = Value(collationObj); + cmdForShards[AggregationRequest::kCollationName] = Value(collationObj); } if (opCtx->getTxnNumber()) { - invariant(!targetedCmd.hasField(OperationSessionInfo::kTxnNumberFieldName)); - targetedCmd[OperationSessionInfo::kTxnNumberFieldName] = + invariant(!cmdForShards.hasField(OperationSessionInfo::kTxnNumberFieldName)); + cmdForShards[OperationSessionInfo::kTxnNumberFieldName] = Value(static_cast<long long>(*opCtx->getTxnNumber())); } - // TODO: SERVER-34078 BSONObj cmdObj = - (atClusterTime ? appendAtClusterTime(targetedCmd.freeze().toBson(), *atClusterTime) - : targetedCmd.freeze().toBson()); + (atClusterTime ? appendAtClusterTime(cmdForShards.freeze().toBson(), *atClusterTime) + : cmdForShards.freeze().toBson()); // agg creates temp collection and should handle implicit create separately. return appendAllowImplicitCreate(cmdObj, true); } +BSONObj createPassthroughCommandForShard(OperationContext* opCtx, + const AggregationRequest& request, + Pipeline* pipeline, + const BSONObj& originalCmdObj, + BSONObj collationObj, + boost::optional<LogicalTime> atClusterTime) { + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + if (pipeline) { + targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize()); + } + // This pipeline is not split, ensure that the write concern is propagated if present. 
+ targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); + + return genericTransformForShards( + std::move(targetedCmd), opCtx, request, collationObj, atClusterTime); +} + +BSONObj createCommandForTargetedShards(OperationContext* opCtx, + const AggregationRequest& request, + const SplitPipeline& splitPipeline, + const BSONObj collationObj, + boost::optional<LogicalTime> atClusterTime) { + + // Create the command for the shards. + MutableDocument targetedCmd(request.serializeToCommandObj()); + // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it + // has defaulted any arguments or otherwise changed the spec. For example, $listSessions may + // have detected a logged in user and appended that user name to the $listSessions spec to + // send to the shards. + targetedCmd[AggregationRequest::kPipelineName] = + Value(splitPipeline.shardsPipeline->serialize()); + targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true); + targetedCmd[AggregationRequest::kCursorName] = + Value(DOC(AggregationRequest::kBatchSizeName << 0)); + + return genericTransformForShards( + std::move(targetedCmd), opCtx, request, collationObj, atClusterTime); +} + BSONObj createCommandForMergingShard(const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const BSONObj originalCmdObj, @@ -387,14 +407,10 @@ DispatchShardPipelineResults dispatchShardPipeline( // Generate the command object for the targeted shards. BSONObj targetedCommand = splitPipeline - ? createCommandForTargetedShards(opCtx, - aggRequest, - originalCmdObj, - splitPipeline->shardsPipeline, - collationObj, - atClusterTime) - : createCommandForTargetedShards( - opCtx, aggRequest, originalCmdObj, pipeline, collationObj, atClusterTime); + ? 
createCommandForTargetedShards( + opCtx, aggRequest, *splitPipeline, collationObj, atClusterTime) + : createPassthroughCommandForShard( + opCtx, aggRequest, pipeline.get(), originalCmdObj, collationObj, atClusterTime); // Refresh the shard registry if we're targeting all shards. We need the shard registry // to be at least as current as the logical time used when creating the command for @@ -780,8 +796,8 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx const LiteParsedPipeline& litePipe, std::unique_ptr<Pipeline, PipelineDeleter> pipeline, BSONObjBuilder* result) { - // We should never receive a pipeline intended for the shards, or which cannot run on mongoS. - invariant(!pipeline->isSplitForShards()); + // We should never receive a pipeline which cannot run on mongoS. + invariant(!expCtx->explain); invariant(pipeline->canRunOnMongos()); const auto& requestedNss = namespaces.requestedNss; @@ -792,15 +808,7 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx str::stream() << "Aggregation pipeline must be run on mongoS, but " << pipeline->getSources().front()->getSourceName() << " is not capable of producing input", - pipeline->isSplitForMerge() || - !pipeline->getSources().front()->constraints().requiresInputDocSource); - - if (expCtx->explain && !pipeline->isSplitForMerge()) { - *result << "splitPipeline" << BSONNULL << "mongos" - << Document{{"host", getHostNameCachedAndPort()}, - {"stages", pipeline->writeExplainOps(*expCtx->explain)}}; - return Status::OK(); - } + !pipeline->getSources().front()->constraints().requiresInputDocSource); // Register the new mongoS cursor, and retrieve the initial batch of results. auto cursorResponse = establishMergingMongosCursor( @@ -951,6 +959,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // Check whether the entire pipeline must be run on mongoS. 
if (pipeline->requiredToRunOnMongos()) { + // If this is an explain write the explain output and return. + if (expCtx->explain) { + *result << "splitPipeline" << BSONNULL << "mongos" + << Document{{"host", getHostNameCachedAndPort()}, + {"stages", pipeline->writeExplainOps(*expCtx->explain)}}; + return Status::OK(); + } + return runPipelineOnMongoS( expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), result); } @@ -1034,8 +1050,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. - cmdObj = CommandHelpers::filterCommandRequestForPassthrough(createCommandForTargetedShards( - opCtx, aggRequest, cmdObj, nullptr, BSONObj(), atClusterTime)); + cmdObj = CommandHelpers::filterCommandRequestForPassthrough(createPassthroughCommandForShard( + opCtx, aggRequest, nullptr, cmdObj, BSONObj(), atClusterTime)); auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( opCtx, |