diff options
Diffstat (limited to 'src/mongo/s/commands')
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 66 | ||||
-rw-r--r-- | src/mongo/s/commands/pipeline_s.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 2 |
3 files changed, 41 insertions, 33 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 88c888c586a..d6dbd316763 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -41,6 +41,7 @@ #include "mongo/db/curop.h" #include "mongo/db/logical_clock.h" #include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/cluster_aggregation_planner.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_merge_cursors.h" #include "mongo/db/pipeline/document_source_out.h" @@ -274,15 +275,14 @@ bool verifyTargetedShardsAtClusterTime(OperationContext* opCtx, return true; } -std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursors( - OperationContext* opCtx, - const NamespaceString& nss, - const LiteParsedPipeline& litePipe, - CachedCollectionRoutingInfo* routingInfo, - const BSONObj& cmdObj, - const ReadPreferenceSetting& readPref, - const BSONObj& shardQuery, - const BSONObj& collation) { +std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx, + const NamespaceString& nss, + const LiteParsedPipeline& litePipe, + CachedCollectionRoutingInfo* routingInfo, + const BSONObj& cmdObj, + const ReadPreferenceSetting& readPref, + const BSONObj& shardQuery, + const BSONObj& collation) { LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards"; std::set<ShardId> shardIds = @@ -351,7 +351,7 @@ struct DispatchShardPipelineResults { // Populated if this *is not* an explain, this vector represents the cursors on the remote // shards. - std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors; + std::vector<RemoteCursor> remoteCursors; // Populated if this *is* an explain, this vector represents the results from each shard. std::vector<AsyncRequestsSender::Response> remoteExplainOutput; @@ -389,7 +389,7 @@ DispatchShardPipelineResults dispatchShardPipeline( // pipeline is already split and we now only need to target a single shard, reassemble the // original pipeline. // - After exhausting 10 attempts to establish the cursors, we give up and throw. - auto cursors = std::vector<ClusterClientCursorParams::RemoteCursor>(); + auto cursors = std::vector<RemoteCursor>(); auto shardResults = std::vector<AsyncRequestsSender::Response>(); auto opCtx = expCtx->opCtx; @@ -558,7 +558,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, BSONObj cmdToRunOnNewShards, const LiteParsedPipeline& liteParsedPipeline, std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, - std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { + std::vector<RemoteCursor> cursors) { ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx)); @@ -566,7 +566,6 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, params.tailableMode = pipelineForMerging->getContext()->tailableMode; params.mergePipeline = std::move(pipelineForMerging); params.remotes = std::move(cursors); - // A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch // size we pass here is used for getMores, so do not specify a batch size if the initial request // had a batch size of 0. @@ -576,12 +575,19 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, if (liteParsedPipeline.hasChangeStream()) { // For change streams, we need to set up a custom stage to establish cursors on new shards - // when they are added. - params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx, - executor::TaskExecutor* executor, - ClusterClientCursorParams* params) { + // when they are added. Be careful to extract the targeted shard IDs before the remote + // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger. + std::vector<ShardId> shardIds; + for (const auto& remote : params.remotes) { + shardIds.emplace_back(remote.getShardId().toString()); + } + + params.createCustomCursorSource = [cmdToRunOnNewShards, + shardIds](OperationContext* opCtx, + executor::TaskExecutor* executor, + ClusterClientCursorParams* params) { return stdx::make_unique<RouterStageUpdateOnAddShard>( - opCtx, executor, params, cmdToRunOnNewShards); + opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards); }; } auto ccc = ClusterClientCursorImpl::make( @@ -630,7 +636,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx, ccc->detachFromOperationContext(); - int nShards = ccc->getRemotes().size(); + int nShards = ccc->getNumRemotes(); CursorId clusterCursorId = 0; if (cursorState == ClusterCursorManager::CursorState::NotExhausted) { @@ -696,7 +702,8 @@ ShardId pickMergingShard(OperationContext* opCtx, return dispatchResults.needsPrimaryShardMerge ? primaryShard : dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())] - .shardId; + .getShardId() + .toString(); } } // namespace @@ -849,15 +856,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, auto executorPool = Grid::get(opCtx)->getExecutorPool(); const BSONObj reply = uassertStatusOK(storePossibleCursor( opCtx, - remoteCursor.shardId, - remoteCursor.hostAndPort, - remoteCursor.cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse), + remoteCursor.getShardId().toString(), + remoteCursor.getHostAndPort(), + remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse), namespaces.requestedNss, executorPool->getArbitraryExecutor(), Grid::get(opCtx)->getCursorManager(), mergeCtx->tailableMode)); - return appendCursorResponseToCommandResult(remoteCursor.shardId, reply, result); + return appendCursorResponseToCommandResult( + remoteCursor.getShardId().toString(), reply, result); } // If we reach here, we have a merge pipeline to dispatch. @@ -893,10 +901,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, ShardId mergingShardId = pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId()); - mergingPipeline->addInitialSource(DocumentSourceMergeCursors::create( + cluster_aggregation_planner::addMergeCursorsSource( + mergingPipeline.get(), std::move(dispatchResults.remoteCursors), - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - mergeCtx)); + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()); auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline); auto mergeResponse = @@ -999,8 +1007,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, namespaces.requestedNss, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), Grid::get(opCtx)->getCursorManager(), - liteParsedPipeline.hasChangeStream() ? TailableMode::kTailableAndAwaitData - : TailableMode::kNormal)); + liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData + : TailableModeEnum::kNormal)); } // First append the properly constructed writeConcernError. It will then be skipped diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp index 060c5533877..4d91ddfff08 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -114,7 +114,7 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern); } - auto shardResult = std::vector<ClusterClientCursorParams::RemoteCursor>(); + auto shardResult = std::vector<RemoteCursor>(); auto findCmd = cmdBuilder.obj(); size_t numAttempts = 0; while (++numAttempts <= kMaxNumStaleVersionRetries) { @@ -164,13 +164,13 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( invariant(shardResult.size() == 1u); - auto& cursor = shardResult.front().cursorResponse; + auto& cursor = shardResult.front().getCursorResponse(); auto& batch = cursor.getBatch(); // We should have at most 1 result, and the cursor should be exhausted. uassert(ErrorCodes::InternalError, str::stream() << "Shard cursor was unexpectedly open after lookup: " - << shardResult.front().hostAndPort + << shardResult.front().getHostAndPort() << ", id: " << cursor.getCursorId(), cursor.getCursorId() == 0); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 8ae277237ae..baafa3bbce6 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -579,7 +579,7 @@ DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss } uassertStatusOK(statusGetDb); - boost::optional<long long> batchSize; + boost::optional<std::int64_t> batchSize; if (ntoreturn) { batchSize = ntoreturn; } |