diff options
author | Mickey. J Winters <mickey.winters@mongodb.com> | 2022-03-17 13:26:57 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-17 17:59:23 +0000 |
commit | 1fe77b5bd9fb13f9eb74275359dcc4ba69f2d5e9 (patch) | |
tree | 05e986c89f98df1afdae8a7d52fdc181dd91a6a8 /src/mongo/s/query | |
parent | c5837542821a86feeea41e6a00c96ef746f03b15 (diff) | |
download | mongo-1fe77b5bd9fb13f9eb74275359dcc4ba69f2d5e9.tar.gz |
SERVER-58673 Delete per shard cursor feature flag
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 90 |
1 files changed, 9 insertions, 81 deletions
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 211715b0e1c..4e567e542ff 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -573,9 +573,6 @@ AggregationTargeter AggregationTargeter::make( bool allowedToPassthrough, bool perShardCursor) { if (perShardCursor) { - uassert(6273800, - "featureFlagPerShardCursor must be enabled to use $_passthroughToShard", - feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV()); return {TargetingPolicy::kSpecificShardOnly, nullptr, cm}; } @@ -626,84 +623,15 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>& Document serializedCommand, const PrivilegeVector& privileges, BSONObjBuilder* out) { - if (feature_flags::gFeatureFlagPerShardCursor.isEnabledAndIgnoreFCV()) { - return runPipelineOnSpecificShardOnly(expCtx, - namespaces, - boost::optional<DatabaseVersion>(cm.dbVersion()), - explain, - serializedCommand, - privileges, - cm.dbPrimary(), - false, - out); - } - // TODO SERVER-58673 remove the if statement here, remove code below and just call - // runPipelineOnSpecificDirectly. make sure to clean up divergence between the two functions. - auto opCtx = expCtx->opCtx; - - // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an - // explain if necessary, and rewrites the result into a format safe to forward to shards. - BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx, - serializedCommand, - explain, - nullptr, /* pipeline */ - BSONObj(), - boost::none, - boost::none); - - const auto shardId = cm.dbPrimary(); - const auto cmdObjWithShardVersion = (shardId != ShardId::kConfigServerId) - ? appendShardVersion(std::move(cmdObj), ChunkVersion::UNSHARDED()) - : std::move(cmdObj); - - MultiStatementTransactionRequestsSender ars( - opCtx, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - namespaces.executionNss.db().toString(), - {{shardId, appendDbVersionIfPresent(cmdObjWithShardVersion, cm.dbVersion())}}, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent); - auto response = ars.next(); - tassert(6273805, - "requested and received data from just one shard, but results are still pending", - ars.done()); - - uassertStatusOK(response.swResponse); - auto commandStatus = getStatusFromCommandResult(response.swResponse.getValue().data); - - if (ErrorCodes::isStaleShardVersionError(commandStatus.code())) { - uassertStatusOK(commandStatus.withContext("command failed because of stale config")); - } else if (ErrorCodes::isSnapshotError(commandStatus.code())) { - uassertStatusOK( - commandStatus.withContext("command failed because establishing a snapshot failed")); - } - - BSONObj result; - if (explain) { - // If this was an explain, then we get back an explain result object rather than a cursor. - result = response.swResponse.getValue().data; - } else { - result = uassertStatusOK( - storePossibleCursor(opCtx, - shardId, - *response.shardHostAndPort, - response.swResponse.getValue().data, - namespaces.requestedNss, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - Grid::get(opCtx)->getCursorManager(), - privileges, - TailableModeEnum::kNormal)); - } - - // First append the properly constructed writeConcernError. It will then be skipped - // in appendElementsUnique. - if (auto wcErrorElem = result["writeConcernError"]) { - appendWriteConcernErrorToCmdResponse(shardId, wcErrorElem, *out); - } - - out->appendElementsUnique(CommandHelpers::filterCommandReplyForPassthrough(result)); - - return getStatusFromCommandResult(out->asTempObj()); + return runPipelineOnSpecificShardOnly(expCtx, + namespaces, + boost::optional<DatabaseVersion>(cm.dbVersion()), + explain, + serializedCommand, + privileges, + cm.dbPrimary(), + false, + out); } Status runPipelineOnMongoS(const ClusterAggregate::Namespaces& namespaces, |