summaryrefslogtreecommitdiff
path: root/src/mongo/s/query
diff options
context:
space:
mode:
authorMickey. J Winters <mickey.winters@mongodb.com>2022-03-17 13:26:57 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-17 17:59:23 +0000
commit1fe77b5bd9fb13f9eb74275359dcc4ba69f2d5e9 (patch)
tree05e986c89f98df1afdae8a7d52fdc181dd91a6a8 /src/mongo/s/query
parentc5837542821a86feeea41e6a00c96ef746f03b15 (diff)
downloadmongo-1fe77b5bd9fb13f9eb74275359dcc4ba69f2d5e9.tar.gz
SERVER-58673 Delete per shard cursor feature flag
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp90
1 files changed, 9 insertions, 81 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 211715b0e1c..4e567e542ff 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -573,9 +573,6 @@ AggregationTargeter AggregationTargeter::make(
bool allowedToPassthrough,
bool perShardCursor) {
if (perShardCursor) {
- uassert(6273800,
- "featureFlagPerShardCursor must be enabled to use $_passthroughToShard",
- feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV());
return {TargetingPolicy::kSpecificShardOnly, nullptr, cm};
}
@@ -626,84 +623,15 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
Document serializedCommand,
const PrivilegeVector& privileges,
BSONObjBuilder* out) {
- if (feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV()) {
- return runPipelineOnSpecificShardOnly(expCtx,
- namespaces,
- boost::optional<DatabaseVersion>(cm.dbVersion()),
- explain,
- serializedCommand,
- privileges,
- cm.dbPrimary(),
- false,
- out);
- }
- // TODO SERVER-58673 remove the if statement here, remove code below and just call
- // runPipelineOnSpecificDirectly. make sure to clean up divergence between the two functions.
- auto opCtx = expCtx->opCtx;
-
- // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
- // explain if necessary, and rewrites the result into a format safe to forward to shards.
- BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx,
- serializedCommand,
- explain,
- nullptr, /* pipeline */
- BSONObj(),
- boost::none,
- boost::none);
-
- const auto shardId = cm.dbPrimary();
- const auto cmdObjWithShardVersion = (shardId != ShardId::kConfigServerId)
- ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED())
- : std::move(cmdObj);
-
- MultiStatementTransactionRequestsSender ars(
- opCtx,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- namespaces.executionNss.db().toString(),
- {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())}},
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent);
- auto response = ars.next();
- tassert(6273805,
- "requested and received data from just one shard, but results are still pending",
- ars.done());
-
- uassertStatusOK(response.swResponse);
- auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data);
-
- if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) {
- uassertStatusOK(commandStatus.withContext("command failed because of stale config"));
- } else if (ErrorCodes::isSnapshotError(commandStatus.code())) {
- uassertStatusOK(
- commandStatus.withContext("command failed because establishing a snapshot failed"));
- }
-
- BSONObj result;
- if (explain) {
- // If this was an explain, then we get back an explain result object rather than a cursor.
- result = response.swResponse.getValue().data;
- } else {
- result = uassertStatusOK(
- storePossibleCursor(opCtx,
- shardId,
- *response.shardHostAndPort,
- response.swResponse.getValue().data,
- namespaces.requestedNss,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- Grid::get(opCtx)->getCursorManager(),
- privileges,
- TailableModeEnum::kNormal));
- }
-
- // First append the properly constructed writeConcernError. It will then be skipped
- // in appendElementsUnique.
- if (auto wcErrorElem = result["writeConcernError"]) {
- appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out);
- }
-
- out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result));
-
- return getStatusFromCommandResult(out->asTempObj());
+ return runPipelineOnSpecificShardOnly(expCtx,
+ namespaces,
+ boost::optional<DatabaseVersion>(cm.dbVersion()),
+ explain,
+ serializedCommand,
+ privileges,
+ cm.dbPrimary(),
+ false,
+ out);
}
Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces,