summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/router_stage_update_on_add_shard.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/query/router_stage_update_on_add_shard.cpp')
-rw-r--r--src/mongo/s/query/router_stage_update_on_add_shard.cpp21
1 files changed, 12 insertions, 9 deletions
diff --git a/src/mongo/s/query/router_stage_update_on_add_shard.cpp b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
index 61fa2a9176d..451a1ee9699 100644
--- a/src/mongo/s/query/router_stage_update_on_add_shard.cpp
+++ b/src/mongo/s/query/router_stage_update_on_add_shard.cpp
@@ -56,11 +56,9 @@ bool needsUpdate(const StatusWith<ClusterQueryResult>& childResult) {
RouterStageUpdateOnAddShard::RouterStageUpdateOnAddShard(OperationContext* opCtx,
executor::TaskExecutor* executor,
ClusterClientCursorParams* params,
- std::vector<ShardId> shardIds,
BSONObj cmdToRunOnNewShards)
: RouterExecStage(opCtx, stdx::make_unique<RouterStageMerge>(opCtx, executor, params)),
_params(params),
- _shardIds(std::move(shardIds)),
_cmdToRunOnNewShards(cmdToRunOnNewShards) {}
StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
@@ -75,12 +73,18 @@ StatusWith<ClusterQueryResult> RouterStageUpdateOnAddShard::next(
}
void RouterStageUpdateOnAddShard::addNewShardCursors(BSONObj newShardDetectedObj) {
+ std::vector<ShardId> existingShardIds;
+ for (const auto& remote : _params->remotes) {
+ existingShardIds.push_back(remote.shardId);
+ }
checked_cast<RouterStageMerge*>(getChildStage())
- ->addNewShardCursors(establishShardCursorsOnNewShards(newShardDetectedObj));
+ ->addNewShardCursors(
+ establishShardCursorsOnNewShards(std::move(existingShardIds), newShardDetectedObj));
}
-std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(
- const BSONObj& newShardDetectedObj) {
+std::vector<ClusterClientCursorParams::RemoteCursor>
+RouterStageUpdateOnAddShard::establishShardCursorsOnNewShards(std::vector<ShardId> existingShardIds,
+ const BSONObj& newShardDetectedObj) {
auto* opCtx = getOpCtx();
// Reload the shard registry. We need to ensure a reload initiated after calling this method
// caused the reload, otherwise we aren't guaranteed to get all the new shards.
@@ -94,12 +98,12 @@ std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNe
std::vector<ShardId> shardIds, newShardIds;
shardRegistry->getAllShardIdsNoReload(&shardIds);
- std::sort(_shardIds.begin(), _shardIds.end());
+ std::sort(existingShardIds.begin(), existingShardIds.end());
std::sort(shardIds.begin(), shardIds.end());
std::set_difference(shardIds.begin(),
shardIds.end(),
- _shardIds.begin(),
- _shardIds.end(),
+ existingShardIds.begin(),
+ existingShardIds.end(),
std::back_inserter(newShardIds));
auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
@@ -108,7 +112,6 @@ std::vector<RemoteCursor> RouterStageUpdateOnAddShard::establishShardCursorsOnNe
std::vector<std::pair<ShardId, BSONObj>> requests;
for (const auto& shardId : newShardIds) {
requests.emplace_back(shardId, cmdObj);
- _shardIds.push_back(shardId);
}
const bool allowPartialResults = false; // partial results are not allowed
return establishCursors(opCtx,