summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_aggregation_planner.cpp
diff options
context:
space:
mode:
author: Mickey. J Winters <mickey.winters@mongodb.com> 2022-02-25 14:52:13 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-02-25 15:56:46 +0000
commit: 586663fec7c3a7d4a8b0185ff24825bd15e80dff (patch)
tree: 57539dcde8d2a38184536582367a6c4f6c96a592 /src/mongo/s/query/cluster_aggregation_planner.cpp
parent: f01a90660cb0a0a22d6b2166cd8b70d7990a6b12 (diff)
download: mongo-586663fec7c3a7d4a8b0185ff24825bd15e80dff.tar.gz
SERVER-62738 implement aggregate $_passthroughToShard option
Diffstat (limited to 'src/mongo/s/query/cluster_aggregation_planner.cpp')
-rw-r--r-- src/mongo/s/query/cluster_aggregation_planner.cpp | 119
1 files changed, 115 insertions, 4 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 6062c02e6ba..9430ddb8270 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -35,6 +35,7 @@
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connpool.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/change_stream_invalidation_info.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_skip.h"
@@ -101,7 +102,9 @@ AsyncRequestsSender::Response establishMergingShardCursor(OperationContext* opCt
ReadPreferenceSetting::get(opCtx),
sharded_agg_helpers::getDesiredRetryPolicy(opCtx));
const auto response = ars.next();
- invariant(ars.done());
+ tassert(6273807,
+ "requested and received data from just one shard, but results are still pending",
+ ars.done());
return response;
}
@@ -567,7 +570,14 @@ AggregationTargeter AggregationTargeter::make(
boost::optional<ChunkManager> cm,
stdx::unordered_set<NamespaceString> involvedNamespaces,
bool hasChangeStream,
- bool allowedToPassthrough) {
+ bool allowedToPassthrough,
+ bool perShardCursor) {
+ if (perShardCursor) {
+ uassert(6273800,
+ "featureFlagPerShardCursor must be enabled to use $_passthroughToShard",
+ feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV());
+ return {TargetingPolicy::kSpecificShardOnly, nullptr, cm};
+ }
// Check if any of the involved collections are sharded.
bool involvesShardedCollections = [&]() {
@@ -616,6 +626,19 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
Document serializedCommand,
const PrivilegeVector& privileges,
BSONObjBuilder* out) {
+ if (feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV()) {
+ return runPipelineOnSpecificShardOnly(expCtx,
+ namespaces,
+ cm,
+ explain,
+ serializedCommand,
+ privileges,
+ cm.dbPrimary(),
+ false,
+ out);
+ }
+ // TODO SERVER-58673 remove the if statement here, remove code below and just call
+ // runPipelineOnSpecificShardOnly directly. Make sure to clean up divergence between the two functions.
auto opCtx = expCtx->opCtx;
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
@@ -640,7 +663,9 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
ReadPreferenceSetting::get(opCtx),
Shard::RetryPolicy::kIdempotent);
auto response = ars.next();
- invariant(ars.done());
+ tassert(6273805,
+ "requested and received data from just one shard, but results are still pending",
+ ars.done());
uassertStatusOK(response.swResponse);
auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data);
@@ -649,7 +674,7 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
uassertStatusOK(commandStatus.withContext("command failed because of stale config"));
} else if (ErrorCodes::isSnapshotError(commandStatus.code())) {
uassertStatusOK(
- commandStatus.withContext("command failed because can not establish a snapshot"));
+ commandStatus.withContext("command failed because establishing a snapshot failed"));
}
BSONObj result;
@@ -820,5 +845,91 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
return {collation.isEmpty() ? getCollation() : collation, getUUID()};
}
+/**
+ * Runs the aggregation in 'serializedCommand' against the single shard 'shardId' and appends the
+ * shard's reply to 'out'.
+ *
+ * When 'forPerShardCursor' is false, the command is stamped with ChunkVersion::UNSHARDED() (unless
+ * the target is the config server) and 'cm.dbVersion()' is handed to appendDbVersionIfPresent().
+ * When it is true, neither is attached, and change_stream_constants::kSortSpec is passed as the
+ * final argument to storePossibleCursor().
+ *
+ * uasserts if the request to the shard fails, or if the shard reports a stale shard version or
+ * snapshot error. Otherwise returns the status extracted from the accumulated contents of 'out'.
+ */
+Status runPipelineOnSpecificShardOnly(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                      const ClusterAggregate::Namespaces& namespaces,
+                                      const ChunkManager& cm,
+                                      boost::optional<ExplainOptions::Verbosity> explain,
+                                      Document serializedCommand,
+                                      const PrivilegeVector& privileges,
+                                      ShardId shardId,
+                                      bool forPerShardCursor,
+                                      BSONObjBuilder* out) {
+    auto opCtx = expCtx->opCtx;
+
+    if (forPerShardCursor) {
+        tassert(6273804,
+                "Per shard cursors are supposed to pass fromMongos: false to shards",
+                !expCtx->inMongos);
+    }
+
+    // Format the command for the shard. This wraps the command as an explain if necessary, and
+    // rewrites the result into a format safe to forward to shards.
+    BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx,
+                                                                           serializedCommand,
+                                                                           explain,
+                                                                           nullptr, /* pipeline */
+                                                                           BSONObj(),
+                                                                           boost::none);
+
+    // Per shard cursors should not send any shard version or database version info.
+    if (!forPerShardCursor) {
+        if (shardId != ShardId::kConfigServerId) {
+            cmdObj = appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED());
+        }
+        cmdObj = appendDbVersionIfPresent(std::move(cmdObj), cm.dbVersion());
+    }
+
+    MultiStatementTransactionRequestsSender ars(
+        opCtx,
+        Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+        namespaces.executionNss.db().toString(),
+        {{shardId, cmdObj}},
+        ReadPreferenceSetting::get(opCtx),
+        Shard::RetryPolicy::kIdempotent);
+    auto response = ars.next();
+    // Exactly one request was dispatched, so nothing may remain pending after the first reply.
+    tassert(6273806,
+            "requested and received data from just one shard, but results are still pending",
+            ars.done());
+
+    uassertStatusOK(response.swResponse);
+    auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data);
+
+    // Only stale-version and snapshot errors are thrown here; any other command error is left in
+    // the reply and surfaces through the Status returned at the end of this function.
+    if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) {
+        uassertStatusOK(commandStatus.withContext("command failed because of stale config"));
+    } else if (ErrorCodes::isSnapshotError(commandStatus.code())) {
+        // Keep this wording identical to runPipelineOnPrimaryShard, which delegates to this
+        // function when featureFlagPerShardCursor is enabled.
+        uassertStatusOK(
+            commandStatus.withContext("command failed because establishing a snapshot failed"));
+    }
+
+    BSONObj result;
+    if (explain) {
+        // If this was an explain, then we get back an explain result object rather than a cursor.
+        result = response.swResponse.getValue().data;
+    } else {
+        result = uassertStatusOK(storePossibleCursor(
+            opCtx,
+            shardId,
+            *response.shardHostAndPort,
+            response.swResponse.getValue().data,
+            namespaces.requestedNss,
+            Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+            Grid::get(opCtx)->getCursorManager(),
+            privileges,
+            expCtx->tailableMode,
+            forPerShardCursor ? boost::optional<BSONObj>(change_stream_constants::kSortSpec)
+                              : boost::none));
+    }
+
+    // First append the properly constructed writeConcernError. It will then be skipped
+    // in appendElementsUnique.
+    if (auto wcErrorElem = result["writeConcernError"]) {
+        appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out);
+    }
+
+    out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result));
+
+    return getStatusFromCommandResult(out->asTempObj());
+}
+
} // namespace cluster_aggregation_planner
} // namespace mongo