summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Charlie Swanson <charlie.swanson@mongodb.com> 2018-08-14 13:07:03 -0400
committer Charlie Swanson <charlie.swanson@mongodb.com> 2018-08-20 14:56:12 -0400
commit 0ef6e36f8c1d79801bca13b2adcf4a908a2bb720 (patch)
tree bc1da17246590deb3ce886ab0661a81361fef6a5
parent bb9f6662e1f98b633df4d22082b5810d786fb620 (diff)
download mongo-0ef6e36f8c1d79801bca13b2adcf4a908a2bb720.tar.gz
SERVER-33323 Refactor cluster_aggregate logic
Attempts to make it more obvious how commands for the shards are generated while also removing some methods from the Pipeline API.
-rw-r--r-- src/mongo/db/pipeline/pipeline.h 19
-rw-r--r-- src/mongo/s/query/cluster_aggregate.cpp 130
2 files changed, 75 insertions, 74 deletions
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index d4a2c813060..d3e99daae31 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -180,23 +180,8 @@ public:
}
/**
- * Returns true if this pipeline is the part of a split pipeline which should be targeted to the
- * shards.
- */
- bool isSplitForShards() const {
- return _splitState == SplitState::kSplitForShards;
- }
-
- /**
- * Returns true if this pipeline is the part of a split pipeline which is responsible for
- * merging the results from the shards.
- */
- bool isSplitForMerge() const {
- return _splitState == SplitState::kSplitForMerge;
- }
-
- /** If the pipeline starts with a $match, return its BSON predicate.
- * Returns empty BSON if the first stage isn't $match.
+ * If the pipeline starts with a stage which is or includes a query predicate (e.g. a $match),
+ * returns a BSON object representing that query. Otherwise, returns an empty BSON object.
*/
BSONObj getInitialQuery() const;
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 75d4db92668..151de50c736 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -162,60 +162,80 @@ std::set<ShardId> getTargetedShards(OperationContext* opCtx,
return getTargetedShardsForQuery(opCtx, *routingInfo, shardQuery, collation);
}
-BSONObj createCommandForTargetedShards(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards,
- const BSONObj collationObj,
- boost::optional<LogicalTime> atClusterTime) {
-
- // Create the command for the shards.
- MutableDocument targetedCmd(request.serializeToCommandObj());
- targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
-
- // If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough.
- if (pipelineForTargetedShards) {
- targetedCmd[AggregationRequest::kPipelineName] =
- Value(pipelineForTargetedShards->serialize());
-
- if (pipelineForTargetedShards->isSplitForShards()) {
- targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
- targetedCmd[AggregationRequest::kCursorName] =
- Value(DOC(AggregationRequest::kBatchSizeName << 0));
- }
- }
-
- // If this pipeline is not split, ensure that the write concern is propagated if present.
- if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) {
- targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
- }
-
+/**
+ * Finishes building a command destined for the shards by applying the settings which are needed
+ * both when the command is a passthrough sent to a single shard and when it carries the shards
+ * part of a split pipeline.
+ *
+ * In order, this:
+ *  - sets AggregationRequest::kFromMongosName to true on 'cmdForShards';
+ *  - wraps the command with wrapAggAsExplain() if 'request' has an explain verbosity;
+ *  - sets AggregationRequest::kCollationName to 'collationObj' if 'collationObj' is non-empty;
+ *  - if 'opCtx' has a transaction number, checks that the command does not already contain
+ *    OperationSessionInfo::kTxnNumberFieldName and then sets it;
+ *  - converts the command to BSON, passing it through appendAtClusterTime() if 'atClusterTime'
+ *    is set; and
+ *  - returns the result of appendAllowImplicitCreate() on that BSON, with 'true' as its second
+ *    argument.
+ *
+ * Note that the collation and transaction number are attached after any explain wrapping.
+ */
+BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
+ OperationContext* opCtx,
+ const AggregationRequest& request,
+ BSONObj collationObj,
+ boost::optional<LogicalTime> atClusterTime) {
+ cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
if (auto explainVerbosity = request.getExplain()) {
- targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity));
+ cmdForShards.reset(wrapAggAsExplain(cmdForShards.freeze(), *explainVerbosity));
}
if (!collationObj.isEmpty()) {
- targetedCmd[AggregationRequest::kCollationName] = Value(collationObj);
+ cmdForShards[AggregationRequest::kCollationName] = Value(collationObj);
}
if (opCtx->getTxnNumber()) {
- invariant(!targetedCmd.hasField(OperationSessionInfo::kTxnNumberFieldName));
- targetedCmd[OperationSessionInfo::kTxnNumberFieldName] =
+ invariant(!cmdForShards.hasField(OperationSessionInfo::kTxnNumberFieldName));
+ cmdForShards[OperationSessionInfo::kTxnNumberFieldName] =
Value(static_cast<long long>(*opCtx->getTxnNumber()));
}
- // TODO: SERVER-34078
BSONObj cmdObj =
- (atClusterTime ? appendAtClusterTime(targetedCmd.freeze().toBson(), *atClusterTime)
- : targetedCmd.freeze().toBson());
+ (atClusterTime ? appendAtClusterTime(cmdForShards.freeze().toBson(), *atClusterTime)
+ : cmdForShards.freeze().toBson());
// agg creates temp collection and should handle implicit create separately.
return appendAllowImplicitCreate(cmdObj, true);
}
+BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
+ const AggregationRequest& request,
+ Pipeline* pipeline,
+ const BSONObj& originalCmdObj,
+ BSONObj collationObj,
+ boost::optional<LogicalTime> atClusterTime) {
+ // Start from the serialized request, then override the pieces an unsplit command needs.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ if (pipeline) {  // 'pipeline' may be null, in which case the pipeline field is left as serialized.
+ targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
+ }
+ // This pipeline is not split, so propagate the original command's write concern, if present.
+ targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
+
+ return genericTransformForShards(
+ std::move(targetedCmd), opCtx, request, collationObj, atClusterTime);
+}
+
+BSONObj createCommandForTargetedShards(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const SplitPipeline& splitPipeline,
+ const BSONObj collationObj,
+ boost::optional<LogicalTime> atClusterTime) {
+
+ // Create the command for the shards. Unlike the passthrough case, 'writeConcern' is not copied.
+ MutableDocument targetedCmd(request.serializeToCommandObj());
+ // Always send the shards part of the split pipeline as re-serialized here, in case parsing
+ // it has defaulted any arguments or otherwise changed the spec. For example, $listSessions may
+ // have detected a logged in user and appended that user name to the $listSessions spec to
+ // send to the shards.
+ targetedCmd[AggregationRequest::kPipelineName] =
+ Value(splitPipeline.shardsPipeline->serialize());
+ targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
+ targetedCmd[AggregationRequest::kCursorName] =
+ Value(DOC(AggregationRequest::kBatchSizeName << 0));  // Batch size of 0 under the cursor option.
+
+ return genericTransformForShards(
+ std::move(targetedCmd), opCtx, request, collationObj, atClusterTime);
+}
+
BSONObj createCommandForMergingShard(const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const BSONObj originalCmdObj,
@@ -387,14 +407,10 @@ DispatchShardPipelineResults dispatchShardPipeline(
// Generate the command object for the targeted shards.
BSONObj targetedCommand = splitPipeline
- ? createCommandForTargetedShards(opCtx,
- aggRequest,
- originalCmdObj,
- splitPipeline->shardsPipeline,
- collationObj,
- atClusterTime)
- : createCommandForTargetedShards(
- opCtx, aggRequest, originalCmdObj, pipeline, collationObj, atClusterTime);
+ ? createCommandForTargetedShards(
+ opCtx, aggRequest, *splitPipeline, collationObj, atClusterTime)
+ : createPassthroughCommandForShard(
+ opCtx, aggRequest, pipeline.get(), originalCmdObj, collationObj, atClusterTime);
// Refresh the shard registry if we're targeting all shards. We need the shard registry
// to be at least as current as the logical time used when creating the command for
@@ -780,8 +796,8 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
const LiteParsedPipeline& litePipe,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
BSONObjBuilder* result) {
- // We should never receive a pipeline intended for the shards, or which cannot run on mongoS.
- invariant(!pipeline->isSplitForShards());
+ // We should never receive an explain request, or a pipeline which cannot run on mongoS.
+ invariant(!expCtx->explain);
invariant(pipeline->canRunOnMongos());
const auto& requestedNss = namespaces.requestedNss;
@@ -792,15 +808,7 @@ Status runPipelineOnMongoS(const boost::intrusive_ptr<ExpressionContext>& expCtx
str::stream() << "Aggregation pipeline must be run on mongoS, but "
<< pipeline->getSources().front()->getSourceName()
<< " is not capable of producing input",
- pipeline->isSplitForMerge() ||
- !pipeline->getSources().front()->constraints().requiresInputDocSource);
-
- if (expCtx->explain && !pipeline->isSplitForMerge()) {
- *result << "splitPipeline" << BSONNULL << "mongos"
- << Document{{"host", getHostNameCachedAndPort()},
- {"stages", pipeline->writeExplainOps(*expCtx->explain)}};
- return Status::OK();
- }
+ !pipeline->getSources().front()->constraints().requiresInputDocSource);
// Register the new mongoS cursor, and retrieve the initial batch of results.
auto cursorResponse = establishMergingMongosCursor(
@@ -951,6 +959,14 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Check whether the entire pipeline must be run on mongoS.
if (pipeline->requiredToRunOnMongos()) {
+ // If this is an explain write the explain output and return.
+ if (expCtx->explain) {
+ *result << "splitPipeline" << BSONNULL << "mongos"
+ << Document{{"host", getHostNameCachedAndPort()},
+ {"stages", pipeline->writeExplainOps(*expCtx->explain)}};
+ return Status::OK();
+ }
+
return runPipelineOnMongoS(
expCtx, namespaces, request, cmdObj, litePipe, std::move(pipeline), result);
}
@@ -1034,8 +1050,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
- cmdObj = CommandHelpers::filterCommandRequestForPassthrough(createCommandForTargetedShards(
- opCtx, aggRequest, cmdObj, nullptr, BSONObj(), atClusterTime));
+ cmdObj = CommandHelpers::filterCommandRequestForPassthrough(createPassthroughCommandForShard(
+ opCtx, aggRequest, nullptr, cmdObj, BSONObj(), atClusterTime));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,