Diffstat (limited to 'src/mongo/s/commands')
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp  66
-rw-r--r--  src/mongo/s/commands/pipeline_s.cpp           6
-rw-r--r--  src/mongo/s/commands/strategy.cpp             2
3 files changed, 41 insertions, 33 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 88c888c586a..d6dbd316763 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/curop.h"
#include "mongo/db/logical_clock.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/cluster_aggregation_planner.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_merge_cursors.h"
#include "mongo/db/pipeline/document_source_out.h"
@@ -274,15 +275,14 @@ bool verifyTargetedShardsAtClusterTime(OperationContext* opCtx,
return true;
}
-std::vector<ClusterClientCursorParams::RemoteCursor> establishShardCursors(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const LiteParsedPipeline& litePipe,
- CachedCollectionRoutingInfo* routingInfo,
- const BSONObj& cmdObj,
- const ReadPreferenceSetting& readPref,
- const BSONObj& shardQuery,
- const BSONObj& collation) {
+std::vector<RemoteCursor> establishShardCursors(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LiteParsedPipeline& litePipe,
+ CachedCollectionRoutingInfo* routingInfo,
+ const BSONObj& cmdObj,
+ const ReadPreferenceSetting& readPref,
+ const BSONObj& shardQuery,
+ const BSONObj& collation) {
LOG(1) << "Dispatching command " << redact(cmdObj) << " to establish cursors on shards";
std::set<ShardId> shardIds =
@@ -351,7 +351,7 @@ struct DispatchShardPipelineResults {
// Populated if this *is not* an explain, this vector represents the cursors on the remote
// shards.
- std::vector<ClusterClientCursorParams::RemoteCursor> remoteCursors;
+ std::vector<RemoteCursor> remoteCursors;
// Populated if this *is* an explain, this vector represents the results from each shard.
std::vector<AsyncRequestsSender::Response> remoteExplainOutput;
@@ -389,7 +389,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
// pipeline is already split and we now only need to target a single shard, reassemble the
// original pipeline.
// - After exhausting 10 attempts to establish the cursors, we give up and throw.
- auto cursors = std::vector<ClusterClientCursorParams::RemoteCursor>();
+ auto cursors = std::vector<RemoteCursor>();
auto shardResults = std::vector<AsyncRequestsSender::Response>();
auto opCtx = expCtx->opCtx;
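
The comment at the top of this hunk describes a bounded retry loop around cursor establishment that gives up after 10 attempts. A minimal standalone sketch of that retry pattern, with hypothetical names rather than the mongos implementation:

#include <iostream>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for establishing cursors on the targeted shards; may
// throw on a transient error, mirroring the retry reasoning in the comment.
std::vector<int> tryEstablishCursors(int attempt) {
    if (attempt < 3) {
        throw std::runtime_error("stale shard version");  // simulated transient failure
    }
    return {1, 2, 3};  // pretend these are remote cursor ids
}

std::vector<int> establishWithRetries() {
    constexpr int kMaxAttempts = 10;  // give up and rethrow after 10 attempts
    for (int attempt = 1; attempt <= kMaxAttempts; ++attempt) {
        try {
            return tryEstablishCursors(attempt);
        } catch (const std::runtime_error&) {
            if (attempt == kMaxAttempts) {
                throw;  // exhausted the retry budget
            }
            // On a retriable error we would refresh routing info and try again.
        }
    }
    return {};  // unreachable
}

int main() {
    std::cout << "established " << establishWithRetries().size() << " cursors\n";
}
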
@@ -558,7 +558,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
BSONObj cmdToRunOnNewShards,
const LiteParsedPipeline& liteParsedPipeline,
std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
+ std::vector<RemoteCursor> cursors) {
ClusterClientCursorParams params(requestedNss, ReadPreferenceSetting::get(opCtx));
@@ -566,7 +566,6 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
params.tailableMode = pipelineForMerging->getContext()->tailableMode;
params.mergePipeline = std::move(pipelineForMerging);
params.remotes = std::move(cursors);
-
// A batch size of 0 is legal for the initial aggregate, but not valid for getMores, the batch
// size we pass here is used for getMores, so do not specify a batch size if the initial request
// had a batch size of 0.
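
The batch-size comment above captures a subtle rule: the batch size stored here feeds getMore, so it is only set when the initial request asked for a nonzero one. A small sketch of that guard, using a simplified params struct with hypothetical names:

#include <cstdint>
#include <iostream>
#include <optional>

struct CursorParams {
    // Batch size applied to subsequent getMore commands; unset means "use the default".
    std::optional<std::int64_t> batchSize;
};

// A batch size of 0 is only legal on the initial request, so it is not
// propagated into the getMore parameters.
void applyInitialBatchSize(CursorParams& params, std::int64_t requestedBatchSize) {
    if (requestedBatchSize != 0) {
        params.batchSize = requestedBatchSize;
    }
}

int main() {
    CursorParams params;
    applyInitialBatchSize(params, 0);
    std::cout << (params.batchSize ? "batchSize set" : "batchSize left unset") << "\n";
}
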
@@ -576,12 +575,19 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
if (liteParsedPipeline.hasChangeStream()) {
// For change streams, we need to set up a custom stage to establish cursors on new shards
- // when they are added.
- params.createCustomCursorSource = [cmdToRunOnNewShards](OperationContext* opCtx,
- executor::TaskExecutor* executor,
- ClusterClientCursorParams* params) {
+ // when they are added. Be careful to extract the targeted shard IDs before the remote
+ // cursors are transferred from the ClusterClientCursorParams to the AsyncResultsMerger.
+ std::vector<ShardId> shardIds;
+ for (const auto& remote : params.remotes) {
+ shardIds.emplace_back(remote.getShardId().toString());
+ }
+
+ params.createCustomCursorSource = [cmdToRunOnNewShards,
+ shardIds](OperationContext* opCtx,
+ executor::TaskExecutor* executor,
+ ClusterClientCursorParams* params) {
return stdx::make_unique<RouterStageUpdateOnAddShard>(
- opCtx, executor, params, cmdToRunOnNewShards);
+ opCtx, executor, params, std::move(shardIds), cmdToRunOnNewShards);
};
}
auto ccc = ClusterClientCursorImpl::make(
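
The added comment in this hunk is about lifetime: the lambda must capture its own copy of the targeted shard IDs, because params.remotes is later moved out of the params struct. A standalone sketch of that capture-before-move pattern, with hypothetical types rather than the mongos classes:

#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

struct Remote {
    std::string shardId;
};

struct Params {
    std::vector<Remote> remotes;
    std::function<void()> customSource;
};

int main() {
    Params params;
    params.remotes = {{"shard01"}, {"shard02"}};

    // Copy the shard ids out *before* the remotes are transferred elsewhere,
    // so the callback never reads from a moved-from vector.
    std::vector<std::string> shardIds;
    for (const auto& remote : params.remotes) {
        shardIds.push_back(remote.shardId);
    }

    params.customSource = [shardIds]() {
        for (const auto& id : shardIds) {
            std::cout << "already targeted: " << id << "\n";
        }
    };

    // Simulates handing the remote cursors off to a results merger.
    std::vector<Remote> transferred = std::move(params.remotes);

    params.customSource();  // still safe: it owns its own copy of the ids
    std::cout << "merger holds " << transferred.size() << " remotes\n";
}
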
@@ -630,7 +636,7 @@ BSONObj establishMergingMongosCursor(OperationContext* opCtx,
ccc->detachFromOperationContext();
- int nShards = ccc->getRemotes().size();
+ int nShards = ccc->getNumRemotes();
CursorId clusterCursorId = 0;
if (cursorState == ClusterCursorManager::CursorState::NotExhausted) {
@@ -696,7 +702,8 @@ ShardId pickMergingShard(OperationContext* opCtx,
return dispatchResults.needsPrimaryShardMerge
? primaryShard
: dispatchResults.remoteCursors[prng.nextInt32(dispatchResults.remoteCursors.size())]
- .shardId;
+ .getShardId()
+ .toString();
}
} // namespace
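
pickMergingShard uses the primary shard when a primary-shard merge is required and otherwise picks one of the already-targeted shards at random. A simplified standalone sketch of that selection, with hypothetical names:

#include <iostream>
#include <random>
#include <string>
#include <vector>

std::string pickMergingShard(bool needsPrimaryShardMerge,
                             const std::string& primaryShard,
                             const std::vector<std::string>& targetedShards,
                             std::mt19937& prng) {
    if (needsPrimaryShardMerge) {
        return primaryShard;  // e.g. the pipeline must run its merge on the primary
    }
    // Otherwise spread the merge work by picking one of the shards that
    // already holds a cursor for this aggregation.
    std::uniform_int_distribution<std::size_t> pick(0, targetedShards.size() - 1);
    return targetedShards[pick(prng)];
}

int main() {
    std::mt19937 prng{std::random_device{}()};
    std::vector<std::string> shards{"shard01", "shard02", "shard03"};
    std::cout << pickMergingShard(false, "shard01", shards, prng) << "\n";
}
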
@@ -849,15 +856,16 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
auto executorPool = Grid::get(opCtx)->getExecutorPool();
const BSONObj reply = uassertStatusOK(storePossibleCursor(
opCtx,
- remoteCursor.shardId,
- remoteCursor.hostAndPort,
- remoteCursor.cursorResponse.toBSON(CursorResponse::ResponseType::InitialResponse),
+ remoteCursor.getShardId().toString(),
+ remoteCursor.getHostAndPort(),
+ remoteCursor.getCursorResponse().toBSON(CursorResponse::ResponseType::InitialResponse),
namespaces.requestedNss,
executorPool->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
mergeCtx->tailableMode));
- return appendCursorResponseToCommandResult(remoteCursor.shardId, reply, result);
+ return appendCursorResponseToCommandResult(
+ remoteCursor.getShardId().toString(), reply, result);
}
// If we reach here, we have a merge pipeline to dispatch.
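
The hunk above switches call sites from direct member access (shardId, hostAndPort, cursorResponse) to getter methods on RemoteCursor. A minimal sketch of that accessor-style value type, assuming a simplified class rather than the real IDL-generated one:

#include <iostream>
#include <string>
#include <utility>

// Simplified stand-in for an IDL-generated type: members are private and
// exposed through getters instead of being public struct fields.
class RemoteCursor {
public:
    RemoteCursor(std::string shardId, std::string hostAndPort, long long cursorId)
        : _shardId(std::move(shardId)),
          _hostAndPort(std::move(hostAndPort)),
          _cursorId(cursorId) {}

    const std::string& getShardId() const { return _shardId; }
    const std::string& getHostAndPort() const { return _hostAndPort; }
    long long getCursorId() const { return _cursorId; }

private:
    std::string _shardId;
    std::string _hostAndPort;
    long long _cursorId;
};

int main() {
    RemoteCursor cursor("shard01", "host1:27018", 42);
    // Call sites read fields through getters, mirroring the diff's
    // remoteCursor.getShardId() / getHostAndPort() usage.
    std::cout << cursor.getShardId() << " @ " << cursor.getHostAndPort()
              << " cursor " << cursor.getCursorId() << "\n";
}
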
@@ -893,10 +901,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
ShardId mergingShardId =
pickMergingShard(opCtx, dispatchResults, executionNsRoutingInfo.db().primaryId());
- mergingPipeline->addInitialSource(DocumentSourceMergeCursors::create(
+ cluster_aggregation_planner::addMergeCursorsSource(
+ mergingPipeline.get(),
std::move(dispatchResults.remoteCursors),
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- mergeCtx));
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor());
auto mergeCmdObj = createCommandForMergingShard(request, mergeCtx, cmdObj, mergingPipeline);
auto mergeResponse =
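
The hunk above moves construction of the merge-cursors stage behind a planner helper that installs it at the front of the merging pipeline. A toy sketch of that "add an initial source" idea, with hypothetical types rather than the real planner:

#include <deque>
#include <iostream>
#include <memory>
#include <string>

struct Stage {
    std::string name;
};

using Pipeline = std::deque<std::unique_ptr<Stage>>;

// Helper that owns the details of building the cursor-merging stage and
// installing it as the first stage of the merging pipeline.
void addMergeCursorsSource(Pipeline& pipeline, int numRemoteCursors) {
    auto mergeStage = std::make_unique<Stage>();
    mergeStage->name = "$mergeCursors(" + std::to_string(numRemoteCursors) + ")";
    pipeline.push_front(std::move(mergeStage));
}

int main() {
    Pipeline merging;
    merging.push_back(std::make_unique<Stage>(Stage{"$group"}));
    addMergeCursorsSource(merging, 3);
    for (const auto& stage : merging) {
        std::cout << stage->name << "\n";
    }
}
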
@@ -999,8 +1007,8 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
namespaces.requestedNss,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
Grid::get(opCtx)->getCursorManager(),
- liteParsedPipeline.hasChangeStream() ? TailableMode::kTailableAndAwaitData
- : TailableMode::kNormal));
+ liteParsedPipeline.hasChangeStream() ? TailableModeEnum::kTailableAndAwaitData
+ : TailableModeEnum::kNormal));
}
// First append the properly constructed writeConcernError. It will then be skipped
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index 060c5533877..4d91ddfff08 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -114,7 +114,7 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern);
}
- auto shardResult = std::vector<ClusterClientCursorParams::RemoteCursor>();
+ auto shardResult = std::vector<RemoteCursor>();
auto findCmd = cmdBuilder.obj();
size_t numAttempts = 0;
while (++numAttempts <= kMaxNumStaleVersionRetries) {
@@ -164,13 +164,13 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
invariant(shardResult.size() == 1u);
- auto& cursor = shardResult.front().cursorResponse;
+ auto& cursor = shardResult.front().getCursorResponse();
auto& batch = cursor.getBatch();
// We should have at most 1 result, and the cursor should be exhausted.
uassert(ErrorCodes::InternalError,
str::stream() << "Shard cursor was unexpectedly open after lookup: "
- << shardResult.front().hostAndPort
+ << shardResult.front().getHostAndPort()
<< ", id: "
<< cursor.getCursorId(),
cursor.getCursorId() == 0);
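
The assertion above enforces that a single-document lookup comes back with at most one result and an already-exhausted cursor (cursor id 0). A standalone sketch of that check, with hypothetical types:

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct CursorResponse {
    long long cursorId = 0;          // 0 means the cursor is exhausted
    std::vector<std::string> batch;  // documents returned in the first batch
};

// Returns the single looked-up document, or an empty string if none matched.
std::string lookupSingleDocument(const CursorResponse& response) {
    if (response.cursorId != 0) {
        throw std::runtime_error("shard cursor was unexpectedly open after lookup");
    }
    if (response.batch.size() > 1) {
        throw std::runtime_error("expected at most one matching document");
    }
    return response.batch.empty() ? std::string{} : response.batch.front();
}

int main() {
    CursorResponse response{0, {"{_id: 1}"}};
    std::cout << lookupSingleDocument(response) << "\n";
}
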
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 8ae277237ae..baafa3bbce6 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -579,7 +579,7 @@ DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss
}
uassertStatusOK(statusGetDb);
- boost::optional<long long> batchSize;
+ boost::optional<std::int64_t> batchSize;
if (ntoreturn) {
batchSize = ntoreturn;
}
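
The last hunk switches the optional's element type from long long to std::int64_t while keeping the same logic: a legacy ntoreturn of 0 leaves the batch size unset. A small sketch of that translation, with a hypothetical helper name:

#include <cstdint>
#include <iostream>
#include <optional>

// Legacy OP_GET_MORE carries an ntoreturn value; 0 means "no explicit batch
// size", so the optional stays unset in that case.
std::optional<std::int64_t> batchSizeFromNtoreturn(int ntoreturn) {
    std::optional<std::int64_t> batchSize;
    if (ntoreturn) {
        batchSize = ntoreturn;
    }
    return batchSize;
}

int main() {
    auto batchSize = batchSizeFromNtoreturn(0);
    std::cout << (batchSize ? "explicit batch size" : "default batching") << "\n";
}
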