/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
#include "mongo/platform/basic.h"
#include "mongo/s/commands/cluster_aggregate.h"
#include <boost/intrusive_ptr.hpp>
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
#include "mongo/db/curop.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/db/views/view.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/commands/pipeline_s.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/cluster_query_knobs.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/router_stage_update_on_add_shard.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
#include "mongo/util/net/sock.h"
namespace mongo {
MONGO_FP_DECLARE(clusterAggregateHangBeforeEstablishingShardCursors);
namespace {
// Given a document representing an aggregation command such as
//
// {aggregate: "myCollection", pipeline: [], ...},
//
// produces the corresponding explain command:
//
// {explain: {aggregate: "myCollection", pipeline: [], ...}, $queryOptions: {...}, verbosity: ...}
Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity verbosity) {
MutableDocument explainCommandBuilder;
explainCommandBuilder["explain"] = Value(aggregateCommand);
// Downstream host targeting code expects queryOptions at the top level of the command object.
explainCommandBuilder[QueryRequest::kUnwrappedReadPrefField] =
Value(aggregateCommand[QueryRequest::kUnwrappedReadPrefField]);
// readConcern needs to be promoted to the top-level of the request.
explainCommandBuilder[repl::ReadConcernArgs::kReadConcernFieldName] =
Value(aggregateCommand[repl::ReadConcernArgs::kReadConcernFieldName]);
// Add explain command options.
for (auto&& explainOption : ExplainOptions::toBSON(verbosity)) {
explainCommandBuilder[explainOption.fieldNameStringData()] = Value(explainOption);
}
return explainCommandBuilder.freeze();
}
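// Appends the explain output from the shards to 'result'. For a pipeline that was split for
// sharded execution, the appended fields have roughly the following shape:
//
//   {mergeType: "mongos" | "primaryShard" | "anyShard",
//    splitPipeline: {shardsPart: [...], mergerPart: [...]},
//    shards: {<shardId>: {host: "<host:port>", stages: [...]}, ...}}
//
// For an unsplit pipeline, 'splitPipeline' is null and 'mergeType' is omitted.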
Status appendExplainResults(
const std::vector<AsyncRequestsSender::Response>& shardResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards,
const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging,
BSONObjBuilder* result) {
if (pipelineForTargetedShards->isSplitForShards()) {
*result << "mergeType"
<< (pipelineForMerging->canRunOnMongos()
? "mongos"
: pipelineForMerging->needsPrimaryShardMerger() ? "primaryShard"
: "anyShard")
<< "splitPipeline"
<< Document{
{"shardsPart",
pipelineForTargetedShards->writeExplainOps(*mergeCtx->explain)},
{"mergerPart", pipelineForMerging->writeExplainOps(*mergeCtx->explain)}};
} else {
*result << "splitPipeline" << BSONNULL;
}
BSONObjBuilder shardExplains(result->subobjStart("shards"));
for (const auto& shardResult : shardResults) {
invariant(shardResult.shardHostAndPort);
shardExplains.append(shardResult.shardId.toString(),
BSON("host" << shardResult.shardHostAndPort->toString() << "stages"
<< shardResult.swResponse.getValue().data["stages"]));
}
return Status::OK();
}
Status appendCursorResponseToCommandResult(const ShardId& shardId,
const BSONObj cursorResponse,
BSONObjBuilder* result) {
// If a write concern error was encountered, append it to the output buffer first.
if (auto wcErrorElem = cursorResponse["writeConcernError"]) {
appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *result);
}
// Pass the results from the remote shard into our command response.
result->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(cursorResponse));
return getStatusFromCommandResult(result->asTempObj());
}
bool mustRunOnAllShards(const NamespaceString& nss,
const CachedCollectionRoutingInfo& routingInfo,
const LiteParsedPipeline& litePipe) {
// Any collectionless aggregation, such as $currentOp, must run on all shards, as must any
// change stream on a sharded collection.
const bool nsIsSharded = static_cast<bool>(routingInfo.cm());
return nss.isCollectionlessAggregateNS() || (nsIsSharded && litePipe.hasChangeStream());
}
StatusWith<CachedCollectionRoutingInfo> getExecutionNsRoutingInfo(OperationContext* opCtx,
const NamespaceString& execNss,
CatalogCache* catalogCache) {
// This call to getCollectionRoutingInfo will return !OK if the database does not exist.
auto swRoutingInfo = catalogCache->getCollectionRoutingInfo(opCtx, execNss);
// Collectionless aggregations, however, may be run on 'admin' (which should always exist) but
// are subsequently targeted towards the shards. If getCollectionRoutingInfo is OK, we perform a
// further check that at least one shard exists if the aggregation is collectionless.
if (swRoutingInfo.isOK() && execNss.isCollectionlessAggregateNS()) {
std::vector<ShardId> shardIds;
Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
if (shardIds.size() == 0) {
return {ErrorCodes::NamespaceNotFound, "No shards are present in the cluster"};
}
}
return swRoutingInfo;
}
std::set<ShardId> getTargetedShards(OperationContext* opCtx,
bool mustRunOnAllShards,
const CachedCollectionRoutingInfo& routingInfo,
const BSONObj shardQuery,
const BSONObj collation) {
if (mustRunOnAllShards) {
// The pipeline begins with a stage which must be run on all shards.
std::vector<ShardId> shardIds;
Grid::get(opCtx)->shardRegistry()->getAllShardIdsNoReload(&shardIds);
return {shardIds.begin(), shardIds.end()};
}
return getTargetedShardsForQuery(opCtx, routingInfo, shardQuery, collation);
}
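// Builds the aggregate command that will be dispatched to the targeted shards. For a pipeline
// that has been split for sharded execution, the resulting command looks roughly like:
//
//   {aggregate: "myCollection", pipeline: [<shards part>], fromMongos: true, needsMerge: true,
//    cursor: {batchSize: 0}, ...}
//
// For an unsplit pipeline (or an unsharded passthrough, where 'pipelineForTargetedShards' is
// null), the pipeline is sent as-is and any writeConcern from the original command is
// propagated.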
BSONObj createCommandForTargetedShards(
OperationContext* opCtx,
const AggregationRequest& request,
const BSONObj originalCmdObj,
const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards,
boost::optional<LogicalTime> atClusterTime) {
// Create the command for the shards.
MutableDocument targetedCmd(request.serializeToCommandObj());
targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
// If 'pipelineForTargetedShards' is 'nullptr', this is an unsharded direct passthrough.
if (pipelineForTargetedShards) {
targetedCmd[AggregationRequest::kPipelineName] =
Value(pipelineForTargetedShards->serialize());
if (pipelineForTargetedShards->isSplitForShards()) {
targetedCmd[AggregationRequest::kNeedsMergeName] = Value(true);
targetedCmd[AggregationRequest::kCursorName] =
Value(DOC(AggregationRequest::kBatchSizeName << 0));
}
}
// If this pipeline is not split, ensure that the write concern is propagated if present.
if (!pipelineForTargetedShards || !pipelineForTargetedShards->isSplitForShards()) {
targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
}
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
if (auto explainVerbosity = request.getExplain()) {
targetedCmd.reset(wrapAggAsExplain(targetedCmd.freeze(), *explainVerbosity));
}
if (opCtx->getTxnNumber()) {
invariant(!targetedCmd.hasField(OperationSessionInfo::kTxnNumberFieldName));
targetedCmd[OperationSessionInfo::kTxnNumberFieldName] =
Value(static_cast<long long>(*opCtx->getTxnNumber()));
}
// TODO: SERVER-34078
BSONObj cmdObj =
(atClusterTime ? appendAtClusterTime(targetedCmd.freeze().toBson(), *atClusterTime)
: targetedCmd.freeze().toBson());
// agg creates temp collection and should handle implicit create separately.
return appendAllowImplicitCreate(cmdObj, true);
}
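// Builds the aggregate command to be sent to the shard selected to perform the merge. The
// command contains the merging half of the pipeline and has roughly the shape:
//
//   {aggregate: "myCollection", pipeline: [<merger part>], fromMongos: true,
//    writeConcern: <original writeConcern, if any>,
//    collation: <explicit or default collation>, ...}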
BSONObj createCommandForMergingShard(
const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const BSONObj originalCmdObj,
const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
mergeCmd[AggregationRequest::kFromMongosName] = Value(true);
mergeCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
// If the user didn't specify a collation already, make sure there's a collation attached to
// the merge command, since the merging shard may not have the collection metadata.
if (mergeCmd.peek()["collation"].missing()) {
mergeCmd["collation"] = mergeCtx->getCollator()
? Value(mergeCtx->getCollator()->getSpec().toBSON())
: Value(Document{CollationSpec::kSimpleSpec});
}
// agg creates temp collection and should handle implicit create separately.
return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), true);
}
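// Targets the appropriate shards and establishes cursors on them by dispatching 'cmdObj'.
// Requests are unversioned when the pipeline must run on all shards, versioned via the routing
// table when the collection is sharded, and otherwise sent only to the database's primary
// shard.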
std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx,
const NamespaceString& nss,
const LiteParsedPipeline& litePipe,
CachedCollectionRoutingInfo* routingInfo,
const BSONObj& cmdObj,
const ReadPreferenceSetting& readPref,
const BSONObj& shardQuery,
const BSONObj& collation) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
bool mustRunOnAll = mustRunOnAllShards(nss, *routingInfo, litePipe);
std::set<ShardId> shardIds =
getTargetedShards(opCtx, mustRunOnAll, *routingInfo, shardQuery, collation);
std::vector<std::pair<ShardId, BSONObj>> requests;
if (mustRunOnAll) {
// The pipeline contains a stage which must be run on all shards. Skip versioning and
// enqueue the raw command objects.
for (auto&& shardId : shardIds) {
requests.emplace_back(std::move(shardId), cmdObj);
}
} else if (routingInfo->cm()) {
// The collection is sharded. Use the routing table to decide which shards to target
// based on the query and collation, and build versioned requests for them.
for (auto& shardId : shardIds) {
auto versionedCmdObj =
appendShardVersion(cmdObj, routingInfo->cm()->getVersion(shardId));
requests.emplace_back(std::move(shardId), std::move(versionedCmdObj));
}
} else {
// The collection is unsharded. Target only the primary shard for the database.
// Don't append shard version info when contacting the config servers.
requests.emplace_back(routingInfo->db().primaryId(),
!routingInfo->db().primary()->isConfig()
? appendShardVersion(cmdObj, ChunkVersion::UNSHARDED())
: cmdObj);
}
if (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
log() << "clusterAggregateHangBeforeEstablishingShardCursors fail point enabled. Blocking "
"until fail point is disabled.";
while (MONGO_FAIL_POINT(clusterAggregateHangBeforeEstablishingShardCursors)) {
sleepsecs(1);
}
}
// If we reach this point, we're either trying to establish cursors on a sharded execution
// namespace, or handling the case where a sharded collection was dropped and recreated as
// unsharded. Since views cannot be sharded, and because we will return an error rather than
// attempting to continue in the event that a recreated namespace is a view, we do not handle
// ErrorCodes::CommandOnShardedViewNotSupportedOnMongod here.
try {
return establishCursors(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
nss,
readPref,
requests,
false /* do not allow partial results */);
} catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>&) {
// If any shard returned a stale shardVersion error, invalidate the routing table cache.
// This will cause the cache to be refreshed the next time it is accessed.
Grid::get(opCtx)->catalogCache()->onStaleShardVersion(std::move(*routingInfo));
throw;
} catch (const ExceptionForCat<ErrorCategory::SnapshotError>&) {
// If any shard returned a snapshot error, recompute the atClusterTime.
throw;
}
}
struct DispatchShardPipelineResults {
// True if this pipeline was split, and the second half of the pipeline needs to be run on the
// primary shard for the database.
bool needsPrimaryShardMerge;
// Populated if this *is not* an explain; this vector represents the cursors on the remote
// shards.
std::vector<RemoteCursor> remoteCursors;
// Populated if this *is* an explain; this vector represents the results from each shard.
std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
// The half of the pipeline that was sent to each shard, or the entire pipeline if there was
// only one shard targeted.
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards;
// The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
// The command object to send to the targeted shards.
BSONObj commandForTargetedShards;
};
/**
* Targets shards for the pipeline and returns a struct with the remote cursors or results, and
* the pipeline that will need to be executed to merge the results from the remotes. If a stale
* shard version is encountered, refreshes the routing table and tries again.
*/
DispatchShardPipelineResults dispatchShardPipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& executionNss,
BSONObj originalCmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
// The process is as follows:
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
// - Call establishShardCursors to dispatch the aggregation to the targeted shards.
// - If we get a staleConfig exception, re-evaluate whether we need to split the pipeline with
// the refreshed routing table data.
// - If the pipeline is not split and we now need to target multiple shards, split it. If the
// pipeline is already split and we now only need to target a single shard, reassemble the
// original pipeline.
// - After exhausting 10 attempts to establish the cursors, we give up and throw.
auto cursors = std::vector<RemoteCursor>();
auto shardResults = std::vector<AsyncRequestsSender::Response>();
auto opCtx = expCtx->opCtx;
const bool needsPrimaryShardMerge =
(pipeline->needsPrimaryShardMerger() || internalQueryAlwaysMergeOnPrimaryShard.load());
const bool needsMongosMerge = pipeline->needsMongosMerger();
const auto shardQuery = pipeline->getInitialQuery();
auto pipelineForTargetedShards = std::move(pipeline);
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
BSONObj targetedCommand;
int numAttempts = 0;
while (++numAttempts <= kMaxNumStaleVersionRetries) {
// We need to grab a new routing table at the start of each iteration, since a stale config
// exception will invalidate the previous one.
auto executionNsRoutingInfo = uassertStatusOK(
Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, executionNss));
// Determine whether we can run the entire aggregation on a single shard.
bool mustRunOnAll =
mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline);
std::set<ShardId> shardIds = getTargetedShards(
opCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, aggRequest.getCollation());
uassert(ErrorCodes::ShardNotFound,
"No targets were found for this aggregation. All shards were removed from the "
"cluster mid-operation",
shardIds.size() > 0);
auto atClusterTime = computeAtClusterTime(
opCtx, mustRunOnAll, shardIds, executionNss, shardQuery, aggRequest.getCollation());
invariant(!atClusterTime || *atClusterTime != LogicalTime::kUninitialized);
// Don't need to split the pipeline if we are only targeting a single shard, unless:
// - There is a stage that needs to be run on the primary shard and the single target shard
// is not the primary.
// - The pipeline contains one or more stages which must always merge on mongoS.
const bool needsSplit = (shardIds.size() > 1u || needsMongosMerge ||
(needsPrimaryShardMerge &&
*(shardIds.begin()) != executionNsRoutingInfo.db().primaryId()));
const bool isSplit = pipelineForTargetedShards->isSplitForShards();
// If we have to run on multiple shards and the pipeline is not yet split, split it. If we
// can run on a single shard and the pipeline is already split, reassemble it.
if (needsSplit && !isSplit) {
pipelineForMerging = std::move(pipelineForTargetedShards);
pipelineForTargetedShards = pipelineForMerging->splitForSharded();
} else if (!needsSplit && isSplit) {
pipelineForTargetedShards->unsplitFromSharded(std::move(pipelineForMerging));
}
// Generate the command object for the targeted shards.
targetedCommand = createCommandForTargetedShards(
opCtx, aggRequest, originalCmdObj, pipelineForTargetedShards, atClusterTime);
// Refresh the shard registry if we're targeting all shards. We need the shard registry
// to be at least as current as the logical time used when creating the command for
// $changeStream to work reliably, so we do a "hard" reload.
if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
if (!shardRegistry->reload(opCtx)) {
shardRegistry->reload(opCtx);
}
}
// Explain does not produce a cursor, so instead we scatter-gather commands to the shards.
try {
if (expCtx->explain) {
if (mustRunOnAllShards(executionNss, executionNsRoutingInfo, liteParsedPipeline)) {
// Some stages (such as $currentOp) need to be broadcast to all shards, and
// should not participate in the shard version protocol.
shardResults =
scatterGatherUnversionedTargetAllShards(opCtx,
executionNss.db(),
targetedCommand,
ReadPreferenceSetting::get(opCtx),
Shard::RetryPolicy::kIdempotent);
} else {
// Aggregations on a real namespace should use the routing table to target
// shards, and should participate in the shard version protocol.
shardResults = scatterGatherVersionedTargetByRoutingTable(
opCtx,
executionNss.db(),
executionNss,
executionNsRoutingInfo,
targetedCommand,
ReadPreferenceSetting::get(opCtx),
Shard::RetryPolicy::kIdempotent,
shardQuery,
aggRequest.getCollation());
}
} else {
cursors = establishShardCursors(opCtx,
executionNss,
liteParsedPipeline,
&executionNsRoutingInfo,
targetedCommand,
ReadPreferenceSetting::get(opCtx),
shardQuery,
aggRequest.getCollation());
}
} catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& ex) {
LOG(1) << "got stale shardVersion error " << redact(ex) << " while dispatching "
<< redact(targetedCommand) << " after " << numAttempts
<< " dispatch attempts";
continue; // Try again if allowed.
} catch (const ExceptionForCat<ErrorCategory::SnapshotError>& ex) {
LOG(1) << "got snapshot error " << redact(ex) << " while dispatching "
<< redact(targetedCommand) << " after " << numAttempts
<< " dispatch attempts";
continue; // Try again if allowed.
}
// Record the number of shards involved in the aggregation. If we are required to merge on
// the primary shard, but the primary shard was not in the set of targeted shards, then we
// must increment the number of involved shards.
CurOp::get(opCtx)->debug().nShards = shardIds.size() +
(needsPrimaryShardMerge && !shardIds.count(executionNsRoutingInfo.db().primaryId()));
break; // Success!
}
return DispatchShardPipelineResults{needsPrimaryShardMerge,
std::move(cursors),
std::move(shardResults),
std::move(pipelineForTargetedShards),
std::move(pipelineForMerging),
targetedCommand};
}
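// Dispatches 'mergeCmdObj' (the merging half of the pipeline) to the shard chosen to perform
// the merge and returns that shard's response, from which the merge cursor is later extracted.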
Shard::CommandResponse establishMergingShardCursor(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj mergeCmdObj,
const ShardId& mergingShardId) {
const auto mergingShard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, mergingShardId));
return uassertStatusOK(
mergingShard->runCommandWithFixedRetryAttempts(opCtx,
ReadPreferenceSetting::get(opCtx),
nss.db().toString(),
mergeCmdObj,
Shard::RetryPolicy::kIdempotent));
}
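// Runs the merging pipeline on this mongoS, registers a cluster cursor if the results are not
// exhausted in the first batch, and returns the initial cursor response, which has roughly the
// standard shape:
//
//   {cursor: {id: <cursorId>, ns: "<requested namespace>", firstBatch: [...]}, ok: 1}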
BSONObj establishMergingMongosCursor(OperationContext* opCtx,
const AggregationRequest& request,
const NamespaceString& requestedNss,
BSONObj cmdToRunOnNewShards,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
std::vector<RemoteCursor> cursors) {
ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
params.originatingCommandObj = CurOp::get(opCtx)->opDescription().getOwned();
params.tailableMode = pipelineForMerging->getContext()->tailableMode;
params.mergePipeline = std::move(pipelineForMerging);
params.remotes = std::move(cursors);
// A batch size of 0 is legal for the initial aggregate, but not for getMores. The batch size
// we pass here is used for getMores, so do not specify a batch size if the initial request had
// a batch size of 0.
params.batchSize = request.getBatchSize() == 0
? boost::none
: boost::optional<long long>(request.getBatchSize());
params.lsid = opCtx->getLogicalSessionId();
params.txnNumber = opCtx->getTxnNumber();
if (liteParsedPipeline.hasChangeStream()) {
// For change streams, we need to set up a custom stage to establish cursors on new shards
// when they are added. Be careful to extract the targeted shard IDs before the remote
// cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger.
std::vector<ShardId> shardIds;
for (const auto& remote : params.remotes) {
shardIds.emplace_back(remote.getShardId().toString());
}
params.createCustomCursorSource = [cmdToRunOnNewShards,
shardIds](OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params) {
return stdx::make_unique<RouterStageUpdateOnAddShard>(
opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards);
};
}
auto ccc = ClusterClientCursorImpl::make(
opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
BSONObjBuilder cursorResponse;
CursorResponseBuilder responseBuilder(true, &cursorResponse);
for (long long objCount = 0; objCount < request.getBatchSize(); ++objCount) {
ClusterQueryResult next;
try {
next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind));
} catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
// This exception is thrown when a $changeStream stage encounters an event
// that invalidates the cursor. We should close the cursor and return without
// error.
cursorState = ClusterCursorManager::CursorState::Exhausted;
break;
}
// Check whether we have exhausted the pipeline's results.
if (next.isEOF()) {
// We reached end-of-stream. If the cursor is not tailable, then we mark it as
// exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when
// we reach end-of-stream. However, if all the remote cursors are exhausted, there is no
// hope of returning data and thus we need to close the mongos cursor as well.
if (!ccc->isTailable() || ccc->remotesExhausted()) {
cursorState = ClusterCursorManager::CursorState::Exhausted;
}
break;
}
// If this result will fit into the current batch, add it. Otherwise, stash it in the cursor
// to be returned on the next getMore.
auto nextObj = *next.getResult();
if (!FindCommon::haveSpaceForNext(nextObj, objCount, responseBuilder.bytesUsed())) {
ccc->queueResult(nextObj);
break;
}
responseBuilder.append(nextObj);
}
ccc->detachFromOperationContext();
int nShards = ccc->getNumRemotes();
CursorId clusterCursorId = 0;
if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
auto authUsers = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames();
clusterCursorId = uassertStatusOK(Grid::get(opCtx)->getCursorManager()->registerCursor(
opCtx,
ccc.releaseCursor(),
requestedNss,
ClusterCursorManager::CursorType::MultiTarget,
ClusterCursorManager::CursorLifetime::Mortal,
authUsers));
}
// Fill out the aggregation metrics in CurOp.
if (clusterCursorId > 0) {
CurOp::get(opCtx)->debug().cursorid = clusterCursorId;
}
CurOp::get(opCtx)->debug().nShards = std::max(CurOp::get(opCtx)->debug().nShards, nShards);
CurOp::get(opCtx)->debug().cursorExhausted = (clusterCursorId == 0);
CurOp::get(opCtx)->debug().nreturned = responseBuilder.numDocs();
responseBuilder.done(clusterCursorId, requestedNss.ns());
CommandHelpers::appendCommandStatus(cursorResponse, Status::OK());
return cursorResponse.obj();
}
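// Retrieves the default collation for an unsharded collection by running listCollections
// against the primary shard. The relevant portion of each returned collection entry looks
// roughly like:
//
//   {name: "<collection>", options: {collation: {locale: ..., ...}, ...}, ...}
//
// Returns an empty BSONObj if the collection does not exist or has no default collation.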
BSONObj getDefaultCollationForUnshardedCollection(const Shard* primaryShard,
const NamespaceString& nss) {
ScopedDbConnection conn(primaryShard->getConnString());
BSONObj defaultCollation;
std::list<BSONObj> all =
conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
if (all.empty()) {
return defaultCollation;
}
BSONObj collectionInfo = all.front();
if (collectionInfo["options"].type() == BSONType::Object) {
BSONObj collectionOptions = collectionInfo["options"].Obj();
BSONElement collationElement;
auto status = bsonExtractTypedField(
collectionOptions, "collation", BSONType::Object, &collationElement);
if (status.isOK()) {
defaultCollation = collationElement.Obj().getOwned();
uassert(ErrorCodes::BadValue,
"Default collation in collection metadata cannot be empty.",
!defaultCollation.isEmpty());
} else if (status != ErrorCodes::NoSuchKey) {
uassertStatusOK(status);
}
}
return defaultCollation;
}
ShardId pickMergingShard(OperationContext* opCtx,
const DispatchShardPipelineResults& dispatchResults,
ShardId primaryShard) {
auto& prng = opCtx->getClient()->getPrng();
// If we cannot merge on mongoS, establish the merge cursor on a shard. Perform the merging
// command on random shard, unless the pipeline dictates that it needs to be run on the primary
// shard for the database.
return dispatchResults.needsPrimaryShardMerge
? primaryShard
: dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())]
.getShardId()
.toString();
}
} // namespace
Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const Namespaces& namespaces,
const AggregationRequest& request,
BSONObj cmdObj,
BSONObjBuilder* result) {
const auto catalogCache = Grid::get(opCtx)->catalogCache();
auto executionNsRoutingInfoStatus =
getExecutionNsRoutingInfo(opCtx, namespaces.executionNss, catalogCache);
LiteParsedPipeline liteParsedPipeline(request);
if (!executionNsRoutingInfoStatus.isOK()) {
// Standard aggregations swallow 'NamespaceNotFound' and return an empty cursor with id 0 in
// the event that the database does not exist. For $changeStream aggregations, however, we
// throw the exception in all error cases, including that of a non-existent database.
if (liteParsedPipeline.hasChangeStream()) {
uassertStatusOKWithContext(executionNsRoutingInfoStatus.getStatus(),
"failed to open $changeStream");
}
appendEmptyResultSet(
opCtx, *result, executionNsRoutingInfoStatus.getStatus(), namespaces.requestedNss.ns());
return Status::OK();
}
auto executionNsRoutingInfo = executionNsRoutingInfoStatus.getValue();
// Determine the appropriate collation and 'resolve' involved namespaces to make the
// ExpressionContext.
// We won't try to execute anything on a mongos, but we still have to populate this map so that
// any $lookups, etc. will be able to have a resolved view definition. It's okay that this is
// incorrect, we will repopulate the real resolved namespace map on the mongod. Note that we
// need to check if any involved collections are sharded before forwarding an aggregation
// command on an unsharded collection.
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =
uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
uassert(
28769, str::stream() << nss.ns() << " cannot be sharded", !resolvedNsRoutingInfo.cm());
resolvedNamespaces.try_emplace(nss.coll(), nss, std::vector<BSONObj>{});
}
// If this pipeline is on an unsharded collection, is allowed to be forwarded to shards, does
// not need to run on all shards, and doesn't need transformation via
// DocumentSource::serialize(), then go ahead and pass it through to the owning shard
// unmodified.
if (!executionNsRoutingInfo.cm() &&
!mustRunOnAllShards(namespaces.executionNss, executionNsRoutingInfo, liteParsedPipeline) &&
liteParsedPipeline.allowedToForwardFromMongos() &&
liteParsedPipeline.allowedToPassthroughFromMongos()) {
return aggPassthrough(opCtx,
namespaces,
executionNsRoutingInfo.db().primary()->getId(),
cmdObj,
request,
liteParsedPipeline,
result);
}
std::unique_ptr<CollatorInterface> collation;
if (!request.getCollation().isEmpty()) {
collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(request.getCollation()));
} else if (const auto chunkMgr = executionNsRoutingInfo.cm()) {
if (chunkMgr->getDefaultCollator()) {
collation = chunkMgr->getDefaultCollator()->clone();
}
} else {
// Unsharded collection. Get collection metadata from primary chunk.
auto collationObj = getDefaultCollationForUnshardedCollection(
executionNsRoutingInfo.db().primary().get(), namespaces.executionNss);
if (!collationObj.isEmpty()) {
collation = uassertStatusOK(CollatorFactoryInterface::get(opCtx->getServiceContext())
->makeFromBSON(collationObj));
}
}
boost::intrusive_ptr<ExpressionContext> mergeCtx =
new ExpressionContext(opCtx,
request,
std::move(collation),
std::make_shared<PipelineS::MongoSInterface>(),
std::move(resolvedNamespaces));
mergeCtx->inMongos = true;
// explicitly *not* setting mergeCtx->tempDir
auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), mergeCtx));
pipeline->optimizePipeline();
// Check whether the entire pipeline must be run on mongoS.
if (pipeline->requiredToRunOnMongos()) {
uassert(ErrorCodes::IllegalOperation,
str::stream() << "Aggregation pipeline must be run on mongoS, but "
<< pipeline->getSources().front()->getSourceName()
<< " is not capable of producing input",
!pipeline->getSources().front()->constraints().requiresInputDocSource);
if (mergeCtx->explain) {
*result << "splitPipeline" << BSONNULL << "mongos"
<< Document{{"host", getHostNameCachedAndPort()},
{"stages", pipeline->writeExplainOps(*mergeCtx->explain)}};
return Status::OK();
}
auto cursorResponse = establishMergingMongosCursor(opCtx,
request,
namespaces.requestedNss,
cmdObj,
liteParsedPipeline,
std::move(pipeline),
{});
CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result);
return getStatusFromCommandResult(result->asTempObj());
}
auto dispatchResults = dispatchShardPipeline(mergeCtx,
namespaces.executionNss,
cmdObj,
request,
liteParsedPipeline,
std::move(pipeline));
if (mergeCtx->explain) {
// If we reach here, we've either succeeded in running the explain or exhausted all
// attempts. In either case, attempt to append the explain results to the output builder.
uassertAllShardsSupportExplain(dispatchResults.remoteExplainOutput);
return appendExplainResults(std::move(dispatchResults.remoteExplainOutput),
mergeCtx,
dispatchResults.pipelineForTargetedShards,
dispatchResults.pipelineForMerging,
result);
}
invariant(dispatchResults.remoteCursors.size() > 0);
// If we dispatched to a single shard, store the remote cursor and return immediately.
if (!dispatchResults.pipelineForTargetedShards->isSplitForShards()) {
invariant(dispatchResults.remoteCursors.size() == 1);
const auto& remoteCursor = dispatchResults.remoteCursors[0];
auto executorPool = Grid::get(opCtx)->getExecutorPool();
const BSONObj reply = uassertStatusOK(storePossibleCursor(
opCtx,
remoteCursor.getShardId().toString(),
remoteCursor.getHostAndPort(),
remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse),
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
mergeCtx->tailableMode));
return appendCursorResponseToCommandResult(
remoteCursor.getShardId().toString(), reply, result);
}
// If we reach here, we have a merge pipeline to dispatch.
auto mergingPipeline = std::move(dispatchResults.pipelineForMerging);
invariant(mergingPipeline);
// First, check whether we can merge on the mongoS. If the merge pipeline MUST run on mongoS,
// then ignore the internalQueryProhibitMergingOnMongoS parameter.
if (mergingPipeline->requiredToRunOnMongos() ||
(!internalQueryProhibitMergingOnMongoS.load() && mergingPipeline->canRunOnMongos())) {
// Register the new mongoS cursor, and retrieve the initial batch of results.
auto cursorResponse =
establishMergingMongosCursor(opCtx,
request,
namespaces.requestedNss,
dispatchResults.commandForTargetedShards,
liteParsedPipeline,
std::move(mergingPipeline),
std::move(dispatchResults.remoteCursors));
// We don't need to storePossibleCursor or propagate writeConcern errors; an $out pipeline
// can never run on mongoS. Filter the command response and return immediately.
CommandHelpers::filterCommandReplyForPassthrough(cursorResponse, result);
return getStatusFromCommandResult(result->asTempObj());
}
// TODO SERVER-33683 allowing an aggregation within a transaction can lead to a deadlock in the
// SessionCatalog when a pipeline with a $mergeCursors sends a getMore to itself.
uassert(50732,
"Cannot specify a transaction number in combination with an aggregation on mongos when "
"merigng on a shard",
!opCtx->getTxnNumber());
ShardId mergingShardId =
pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId());
cluster_aggregation_planner::addMergeCursorsSource(
mergingPipeline.get(),
std::move(dispatchResults.remoteCursors),
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
auto mergeResponse =
establishMergingShardCursor(opCtx, namespaces.executionNss, mergeCmdObj, mergingShardId);
// The merging shard is remote, so if a response was received, a HostAndPort must have been set.
invariant(mergeResponse.hostAndPort);
auto mergeCursorResponse = uassertStatusOK(
storePossibleCursor(opCtx,
mergingShardId,
*mergeResponse.hostAndPort,
mergeResponse.response,
namespaces.requestedNss,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager()));
return appendCursorResponseToCommandResult(mergingShardId, mergeCursorResponse, result);
}
void ClusterAggregate::uassertAllShardsSupportExplain(
const std::vector<AsyncRequestsSender::Response>& shardResults) {
for (const auto& result : shardResults) {
auto status = result.swResponse.getStatus();
if (status.isOK()) {
status = getStatusFromCommandResult(result.swResponse.getValue().data);
}
uassert(17403,
str::stream() << "Shard " << result.shardId.toString() << " failed: "
<< causedBy(status),
status.isOK());
uassert(17404,
str::stream() << "Shard " << result.shardId.toString()
<< " does not support $explain",
result.swResponse.getValue().data.hasField("stages"));
}
}
Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
const Namespaces& namespaces,
const ShardId& shardId,
BSONObj cmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
BSONObjBuilder* out) {
// Temporary hack. See comment on declaration for details.
auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId);
if (!swShard.isOK()) {
return swShard.getStatus();
}
auto shard = std::move(swShard.getValue());
// aggPassthrough is only used for unsharded collections; a change of the primary shard will
// cause an SSV error, so the shardId history does not need to be verified.
auto atClusterTime = computeAtClusterTimeForOneShard(opCtx, shardId);
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
createCommandForTargetedShards(opCtx, aggRequest, cmdObj, nullptr, atClusterTime));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,
ReadPreferenceSetting::get(opCtx),
namespaces.executionNss.db().toString(),
!shard->isConfig() ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED())
: std::move(cmdObj),
Shard::RetryPolicy::kIdempotent));
if (ErrorCodes::isStaleShardVersionError(cmdResponse.commandStatus.code())) {
uassertStatusOK(
cmdResponse.commandStatus.withContext("command failed because of stale config"));
} else if (ErrorCodes::isSnapshotError(cmdResponse.commandStatus.code())) {
uassertStatusOK(cmdResponse.commandStatus.withContext(
"command failed because can not establish a snapshot"));
}
BSONObj result;
if (aggRequest.getExplain()) {
// If this was an explain, then we get back an explain result object rather than a cursor.
result = cmdResponse.response;
} else {
// The command was run on a remote shard, so if a response was received, a HostAndPort must
// have been set.
invariant(cmdResponse.hostAndPort);
result = uassertStatusOK(storePossibleCursor(
opCtx,
shard->getId(),
*cmdResponse.hostAndPort,
cmdResponse.response,
namespaces.requestedNss,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData
: TailableModeEnum::kNormal));
}
// First append the properly constructed writeConcernError. It will then be skipped
// in appendElementsUnique.
if (auto wcErrorElem = result["writeConcernError"]) {
appendWriteConcernErrorToCmdResponse(shard->getId(), wcErrorElem, *out);
}
out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result));
auto status = getStatusFromCommandResult(out->asTempObj());
if (auto resolvedView = status.extraInfo<ResolvedView>()) {
auto resolvedAggRequest = resolvedView->asExpandedViewAggregation(aggRequest);
auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
out->resetToEmpty();
// We pass both the underlying collection namespace and the view namespace here. The
// underlying collection namespace is used to execute the aggregation on mongoD. Any cursor
// returned will be registered under the view namespace so that subsequent getMore and
// killCursors calls against the view have access.
Namespaces nsStruct;
nsStruct.requestedNss = namespaces.requestedNss;
nsStruct.executionNss = resolvedView->getNamespace();
return ClusterAggregate::runAggregate(
opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, out);
}
return status;
}
} // namespace mongo