author     Bernard Gorman <bernard.gorman@mongodb.com>  2019-10-17 20:26:03 +0000
committer  evergreen <evergreen@mongodb.com>            2019-10-17 20:26:03 +0000
commit     97cc7b5838db4ef13ede3149c44bceca8f5c2977 (patch)
tree       38d9b78346777033545f99048f947559a67a9ab1 /src/mongo
parent     1daf063435aa3f748840c476f8fa3bd13d7d2a68 (diff)
SERVER-42723 New shard with new database can be ignored by change streams
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp            | 59
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.h              |  1
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.idl            |  8
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp  | 13
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp                      | 58
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp                         |  2
-rw-r--r--  src/mongo/s/query/document_source_update_on_add_shard.cpp         | 72
-rw-r--r--  src/mongo/s/query/document_source_update_on_add_shard.h            |  2
8 files changed, 143 insertions(+), 72 deletions(-)
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index a9d80a0e2a0..1049a9b04aa 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -244,8 +244,7 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac
BSONObj DocumentSourceChangeStream::buildMatchFilter(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Timestamp startFrom,
- bool startFromInclusive,
+ Timestamp startFromInclusive,
bool showMigrationEvents) {
auto nss = expCtx->ns;
@@ -297,6 +296,11 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// 2.1) Normal CRUD ops.
auto normalOpTypeMatch = BSON("op" << NE << "n");
+ // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility
+ // with 4.2, even though we no longer rely on them to detect new shards. We may wish to remove
+ // this mechanism in 4.6, or retain it for future cases where a change stream is targeted to a
+ // subset of shards. See SERVER-44039 for details.
+
// 2.2) A chunk gets migrated to a new shard that doesn't have any chunks.
auto chunkMigratedNewShardMatch = BSON("op"
<< "n"
@@ -326,7 +330,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// Only include CRUD operations tagged "fromMigrate" when the "showMigrationEvents" option is
// set - exempt all other operations and commands with that tag. Include the resume token, if
// resuming, so we can verify it was still present in the oplog.
- return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom)
+ return BSON("$and" << BSON_ARRAY(BSON("ts" << GTE << startFromInclusive)
<< BSON(OR(opMatch, commandAndApplyOpsMatch))));
}
@@ -388,6 +392,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
}
}
+ // If we do not have a 'resumeAfter' starting point, check for 'startAtOperationTime'.
if (auto startAtOperationTime = spec.getStartAtOperationTime()) {
uassert(40674,
"Only one type of resume option is allowed, but multiple were found.",
@@ -396,8 +401,7 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
resumeStage = DocumentSourceShardCheckResumability::create(expCtx, *startFrom);
}
- // There might not be a starting point if we're on mongos, otherwise we should either have a
- // 'resumeAfter' starting point, or should start from the latest majority committed operation.
+ // We can only run on a replica set, or through mongoS. Confirm that this is the case.
auto replCoord = repl::ReplicationCoordinator::get(expCtx->opCtx);
uassert(
40573,
@@ -405,24 +409,29 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
expCtx->inMongos ||
(replCoord &&
replCoord->getReplicationMode() == repl::ReplicationCoordinator::Mode::modeReplSet));
- if (!startFrom && !expCtx->inMongos) {
- startFrom = replCoord->getMyLastAppliedOpTime().getTimestamp();
+
+ // If we do not have an explicit starting point, we should start from the latest majority
+ // committed operation. If we are on mongoS and do not have a starting point, set it to the
+ // current clusterTime so that all shards start in sync. We always start one tick beyond the
+ // most recent operation, to ensure that the stream does not return it.
+ if (!startFrom) {
+ const auto currentTime = !expCtx->inMongos
+ ? LogicalTime{replCoord->getMyLastAppliedOpTime().getTimestamp()}
+ : LogicalClock::get(expCtx->opCtx)->getClusterTime();
+ startFrom = currentTime.addTicks(1).asTimestamp();
}
- if (startFrom) {
- const bool startFromInclusive = (resumeStage != nullptr);
- stages.push_back(DocumentSourceOplogMatch::create(
- DocumentSourceChangeStream::buildMatchFilter(
- expCtx, *startFrom, startFromInclusive, showMigrationEvents),
- expCtx));
-
- // If we haven't already populated the initial PBRT, then we are starting from a specific
- // timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
- if (expCtx->initialPostBatchResumeToken.isEmpty()) {
- Timestamp startTime{startFrom->getSecs(), startFrom->getInc() + (!startFromInclusive)};
- expCtx->initialPostBatchResumeToken =
- ResumeToken::makeHighWaterMarkToken(startTime).toDocument().toBson();
- }
+ // We must always build the DSOplogMatch stage even on mongoS, since our validation logic relies
+ // upon the fact that it is always the first stage in the pipeline.
+ stages.push_back(DocumentSourceOplogMatch::create(
+ DocumentSourceChangeStream::buildMatchFilter(expCtx, *startFrom, showMigrationEvents),
+ expCtx));
+
+ // If we haven't already populated the initial PBRT, then we are starting from a specific
+ // timestamp rather than a resume token. Initialize the PBRT to a high water mark token.
+ if (expCtx->initialPostBatchResumeToken.isEmpty()) {
+ expCtx->initialPostBatchResumeToken =
+ ResumeToken::makeHighWaterMarkToken(*startFrom).toDocument().toBson();
}
// Obtain the current FCV and use it to create the DocumentSourceChangeStreamTransform stage.
@@ -516,12 +525,14 @@ void DocumentSourceChangeStream::assertIsLegalSpecification(
(expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS()));
// Prevent $changeStream from running on internal databases. A stream may run against the
- // 'admin' database iff 'allChangesForCluster' is true.
+ // 'admin' database iff 'allChangesForCluster' is true. A stream may run against the 'config'
+ // database iff 'allowToRunOnConfigDB' is true.
+ const bool isNotBannedInternalDB =
+ !expCtx->ns.isLocal() && (!expCtx->ns.isConfigDB() || spec.getAllowToRunOnConfigDB());
uassert(ErrorCodes::InvalidNamespace,
str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db()
<< " database",
- expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster()
- : (!expCtx->ns.isLocal() && !expCtx->ns.isConfigDB()));
+ expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() : isNotBannedInternalDB);
// Prevent $changeStream from running on internal collections in any database.
uassert(ErrorCodes::InvalidNamespace,
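
The hunk above collapses the (startFrom, startFromInclusive) pair into a single inclusive bound: callers now pre-advance the start point by one logical-clock tick, so {ts: {$gte: startFrom}} excludes the most recent operation just as the old exclusive $gt did. A minimal self-contained sketch of that equivalence, using a simplified stand-in for the real Timestamp/LogicalTime classes (the (secs, inc) packing into one 64-bit value mirrors the BSON timestamp layout; the class itself is illustrative):

#include <cassert>
#include <cstdint>

struct Timestamp {
    uint64_t v;  // high 32 bits: seconds; low 32 bits: increment
    Timestamp(uint32_t secs, uint32_t inc) : v((uint64_t(secs) << 32) | inc) {}
    uint32_t secs() const { return uint32_t(v >> 32); }
    uint32_t inc() const { return uint32_t(v); }
};

Timestamp addTicks(Timestamp t, uint64_t ticks) {
    t.v += ticks;  // increment overflow carries into the seconds field
    return t;
}

int main() {
    Timestamp lastApplied(1571344000, 0xFFFFFFFF);  // increment at its maximum
    Timestamp startFrom = addTicks(lastApplied, 1);
    assert(startFrom.secs() == 1571344001 && startFrom.inc() == 0);
    // {ts: {$gte: startFrom}} now excludes lastApplied itself, matching the
    // old exclusive {ts: {$gt: lastApplied}} without an inclusivity flag.
    assert(lastApplied.v < startFrom.v);
}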
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 182a9373e12..d3492b0c319 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -166,7 +166,6 @@ public:
*/
static BSONObj buildMatchFilter(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Timestamp startFrom,
- bool startFromInclusive,
bool showMigrationEvents);
/**
diff --git a/src/mongo/db/pipeline/document_source_change_stream.idl b/src/mongo/db/pipeline/document_source_change_stream.idl
index 93a92c25173..410e5ab9f15 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.idl
+++ b/src/mongo/db/pipeline/document_source_change_stream.idl
@@ -109,3 +109,11 @@ structs:
deletes may appear that do not reflect actual deletions or insertions
of data. Instead they reflect this data moving from one shard to
another.
+ allowToRunOnConfigDB:
+ cpp_name: allowToRunOnConfigDB
+ type: bool
+ default: false
+ description: A flag indicating whether the change stream may be opened on the
+ 'config' database, which is usually banned. This flag is used
+ internally to allow mongoS to open a stream on 'config.shards', in
order to monitor for the addition of new shards to the cluster.
\ No newline at end of file
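
The new flag defaults to false, so change streams on the 'config' database stay banned unless the caller opts in; mongoS sets it only on the internal 'config.shards' monitor. A minimal sketch of the namespace gate this drives (C++17; ChangeStreamSpec and isLegalNamespace are hypothetical stand-ins for the IDL-generated spec and the assertIsLegalSpecification check shown earlier):

#include <cassert>
#include <string>

struct ChangeStreamSpec {
    bool allChangesForCluster = false;
    bool allowToRunOnConfigDB = false;  // the new flag; banned by default
};

bool isLegalNamespace(const std::string& db, const ChangeStreamSpec& spec) {
    if (db == "admin") return spec.allChangesForCluster;
    if (db == "local") return false;  // always banned
    if (db == "config") return spec.allowToRunOnConfigDB;
    return true;
}

int main() {
    assert(!isLegalNamespace("config", {}));            // external users: still banned
    assert(isLegalNamespace("config", {false, true}));  // internal shard monitor
    assert(!isLegalNamespace("local", {false, true}));  // flag does not unlock 'local'
}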
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index a147e5271c3..9d86e364189 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -362,14 +362,11 @@ Value DocumentSourceChangeStreamTransform::serialize(
changeStreamOptions[DocumentSourceChangeStreamSpec::kStartAfterFieldName].missing()) {
MutableDocument newChangeStreamOptions(changeStreamOptions);
- // Use the current cluster time plus 1 tick since the oplog query will include all
- // operations/commands equal to or greater than the 'startAtOperationTime' timestamp. In
- // particular, avoid including the last operation that went through mongos in an attempt to
- // match the behavior of a replica set more closely.
- auto clusterTime = LogicalClock::get(pExpCtx->opCtx)->getClusterTime();
- clusterTime.addTicks(1);
- newChangeStreamOptions[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] =
- Value(clusterTime.asTimestamp());
+ // Configure the serialized $changeStream to start from the initial high-watermark
+ // postBatchResumeToken which we generated while parsing the $changeStream pipeline.
+ invariant(!pExpCtx->initialPostBatchResumeToken.isEmpty());
+ newChangeStreamOptions[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] =
+ Value(pExpCtx->initialPostBatchResumeToken);
changeStreamOptions = newChangeStreamOptions.freeze();
}
return Value(Document{{getSourceName(), changeStreamOptions}});
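
Serializing a 'resumeAfter' token instead of a freshly recomputed clusterTime + 1 keeps mongoS and the shards agreed on one fixed starting point. A high-water-mark token carries a clusterTime but sorts before any event token at that same time, so resuming after it still surfaces every event at the start timestamp. A simplified sketch of that ordering (the two-field layout and comparison are assumptions for illustration, not the real KeyString encoding):

#include <cassert>
#include <cstdint>
#include <tuple>

enum TokenType : int { kHighWaterMark = 0, kEvent = 128 };

struct ResumeToken {
    uint64_t clusterTime;
    int tokenType;
    bool operator<(const ResumeToken& o) const {
        return std::tie(clusterTime, tokenType) < std::tie(o.clusterTime, o.tokenType);
    }
};

int main() {
    ResumeToken hwm{42, kHighWaterMark};  // the serialized 'resumeAfter' value
    ResumeToken firstEvent{42, kEvent};   // first real event at that clusterTime
    assert(hwm < firstEvent);             // resuming after hwm still returns it
}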
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index a469b6d9807..f451d139d21 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -36,13 +36,17 @@
#include "mongo/client/connpool.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/curop.h"
+#include "mongo/db/logical_clock.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/query/find_common.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
+#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/s/grid.h"
#include "mongo/s/multi_statement_transaction_requests_sender.h"
#include "mongo/s/query/cluster_aggregation_planner.h"
#include "mongo/s/query/cluster_cursor_manager.h"
@@ -109,6 +113,33 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v
return explainCommandBuilder.freeze();
}
+/**
+ * Open a $changeStream cursor on the 'config.shards' collection to watch for new shards.
+ */
+RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Timestamp startMonitoringAtTime) {
+ const auto& configShard = Grid::get(expCtx->opCtx)->shardRegistry()->getConfigShard();
+ // Pipeline: {$changeStream: {startAtOperationTime: [now], allowToRunOnConfigDB: true}}
+ AggregationRequest aggReq(
+ ShardType::ConfigNS,
+ {BSON(DocumentSourceChangeStream::kStageName
+ << BSON(DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName
+ << startMonitoringAtTime
+ << DocumentSourceChangeStreamSpec::kAllowToRunOnConfigDBFieldName << true))});
+ aggReq.setFromMongos(true);
+ aggReq.setNeedsMerge(true);
+ aggReq.setBatchSize(0);
+ auto configCursor =
+ establishCursors(expCtx->opCtx,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ aggReq.getNamespaceString(),
+ ReadPreferenceSetting{ReadPreference::PrimaryPreferred},
+ {{configShard->getId(), aggReq.serializeToCommandObj().toBson()}},
+ false);
+ invariant(configCursor.size() == 1);
+ return std::move(*configCursor.begin());
+}
+
Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx) {
// The idempotent retry policy will retry even for writeConcern failures, so only set it if the
// pipeline does not support writeConcern.
@@ -575,13 +606,17 @@ DispatchShardPipelineResults dispatchShardPipeline(
pipeline.get(),
expCtx->collation);
- // In order for a $changeStream to work reliably, we need the shard registry to be at least as
- // current as the logical time at which the pipeline was serialized to 'targetedCommand' above.
- // We therefore hard-reload and retarget the shards here. We don't refresh for other pipelines
- // that must run on all shards (e.g. $currentOp) because, unlike $changeStream, those pipelines
- // may not have been forced to split if there was only one shard in the cluster when the command
- // began execution. If a shard was added since the earlier targeting logic ran, then refreshing
- // here may cause us to illegally target an unsplit pipeline to more than one shard.
+ // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the
+ // config server in order to monitor for new shards. To guarantee that we do not miss any
+ // shards, we must ensure that the list of shards to which we initially dispatch the pipeline is
+ // at least as current as the logical time at which the stream begins scanning for new shards.
+ // We therefore set 'shardRegistryReloadTime' to the current clusterTime and then hard-reload
+ // the shard registry. We don't refresh for other pipelines that must run on all shards (e.g.
+ // $currentOp) because, unlike $changeStream, those pipelines may not have been forced to split
+ // if there was only one shard in the cluster when the command began execution. If a shard was
+ // added since the earlier targeting logic ran, then refreshing here may cause us to illegally
+ // target an unsplit pipeline to more than one shard.
+ auto shardRegistryReloadTime = LogicalClock::get(opCtx)->getClusterTime().asTimestamp();
if (hasChangeStream) {
auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
if (!shardRegistry->reload(opCtx)) {
@@ -636,12 +671,19 @@ DispatchShardPipelineResults dispatchShardPipeline(
invariant(cursors.size() % shardIds.size() == 0,
str::stream() << "Number of cursors (" << cursors.size()
<< ") is not a multiple of producers (" << shardIds.size() << ")");
+
+ // For $changeStream, we must open an extra cursor on the 'config.shards' collection, so
+ // that we can monitor for the addition of new shards inline with real events.
+ if (hasChangeStream && expCtx->ns.db() != ShardType::ConfigNS.db()) {
+ cursors.emplace_back(openChangeStreamNewShardMonitor(expCtx, shardRegistryReloadTime));
+ }
}
// Convert remote cursors into a vector of "owned" cursors.
std::vector<OwnedRemoteCursor> ownedCursors;
for (auto&& cursor : cursors) {
- ownedCursors.emplace_back(OwnedRemoteCursor(opCtx, std::move(cursor), expCtx->ns));
+ auto cursorNss = cursor.getCursorResponse().getNSS();
+ ownedCursors.emplace_back(opCtx, std::move(cursor), std::move(cursorNss));
}
// Record the number of shards involved in the aggregation. If we are required to merge on
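
The ordering in this file is what closes SERVER-42723's race: record the clusterTime, hard-reload the shard registry, dispatch to the reloaded shard list, then open the config.shards monitor from the recorded time. Every shard is then either in the registry snapshot or produces an insert event at or after the recorded time; the overlap at the boundary is deduplicated downstream. A self-contained sketch of that invariant (Cluster, registrySnapshot and watchFrom are illustrative stand-ins, not MongoDB APIs):

#include <cstdint>
#include <iostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

using Timestamp = uint64_t;

struct Cluster {
    Timestamp clusterTime = 0;
    std::vector<std::pair<Timestamp, std::string>> configShardsOplog;  // (ts, shardId)

    void addShard(const std::string& id) {
        configShardsOplog.emplace_back(++clusterTime, id);
    }
    std::set<std::string> registrySnapshot() const {  // the "hard reload"
        std::set<std::string> s;
        for (const auto& [ts, id] : configShardsOplog) s.insert(id);
        return s;
    }
    std::vector<std::string> watchFrom(Timestamp start) const {  // monitor cursor
        std::vector<std::string> out;
        for (const auto& [ts, id] : configShardsOplog)
            if (ts >= start) out.push_back(id);  // inclusive, like startAtOperationTime
        return out;
    }
};

int main() {
    Cluster c;
    c.addShard("shard0");
    Timestamp monitorStart = c.clusterTime;  // 1. record the time
    auto targeted = c.registrySnapshot();    // 2. hard-reload and dispatch
    c.addShard("shard1");                    //    (a shard is added concurrently)
    for (const auto& id : c.watchFrom(monitorStart))  // 3. watch config.shards
        targeted.insert(id);                 // boundary overlap dedups via the set
    std::cout << targeted.size() << " shards covered\n";  // prints: 2 shards covered
}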
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index bc89f6aa19a..ae6aaf092b4 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -396,7 +396,7 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
}
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.getNss().db().toString(), cmdObj, _opCtx);
+ remote.getTargetHost(), remote.cursorNss.db().toString(), cmdObj, _opCtx);
auto callbackStatus =
_executor->scheduleRemoteCommand(request, [this, remoteIndex](auto const& cbData) {
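
This one-line change is needed because the merged cursor set is no longer homogeneous: the shard cursors run on the stream's own namespace while the new monitor cursor lives on 'config.shards', and a getMore must be addressed to the database its cursor was actually opened on. A sketch with hypothetical stand-in types:

#include <iostream>
#include <string>
#include <vector>

struct RemoteCursor {
    long long cursorId;
    std::string cursorNss;  // e.g. "test.coll" or "config.shards"
};

// Previously the db came from the merger's single params nss; deriving it
// per-cursor lets mixed-namespace cursor sets issue correct getMores.
std::string getMoreDb(const RemoteCursor& cursor) {
    return cursor.cursorNss.substr(0, cursor.cursorNss.find('.'));
}

int main() {
    std::vector<RemoteCursor> cursors{{101, "test.coll"}, {202, "config.shards"}};
    for (const auto& c : cursors)
        std::cout << "getMore " << c.cursorId << " against db " << getMoreDb(c) << "\n";
}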
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp
index d94a99518b7..4ae4318c997 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.cpp
+++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp
@@ -32,6 +32,7 @@
#include <algorithm>
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
@@ -40,10 +41,20 @@
namespace mongo {
namespace {
-// Returns true if the change stream document has an 'operationType' of 'newShardDetected'.
-bool needsUpdate(const Document& childResult) {
- return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
- DocumentSourceChangeStream::kNewShardDetectedOpType;
+// Returns true if the change stream document is an event in 'config.shards'.
+bool isShardConfigEvent(const Document& eventDoc) {
+ // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility
+ // with 4.2, even though we no longer rely on them to detect new shards. We swallow the event
+ here. We may wish to remove this mechanism entirely in 4.6, or retain it for future cases where
+ // a change stream is targeted to a subset of shards. See SERVER-44039 for details.
+ if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
+ DocumentSourceChangeStream::kNewShardDetectedOpType) {
+ return true;
+ }
+ auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField];
+ return nsObj.getType() == BSONType::Object &&
+ nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() &&
+ nsObj["coll"_sd].getStringData() == ShardType::ConfigNS.coll();
}
} // namespace
@@ -69,14 +80,19 @@ DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
: DocumentSource(kStageName, expCtx),
_executor(std::move(executor)),
_mergeCursors(mergeCursors),
- _shardsWithCursors(std::move(shardsWithCursors)),
+ _shardsWithCursors(shardsWithCursors.begin(), shardsWithCursors.end()),
_cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {}
DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() {
auto childResult = pSource->getNext();
- while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) {
- addNewShardCursors(childResult.getDocument());
+ // If this is an insertion into the 'config.shards' collection, open a cursor on the new shard.
+ while (childResult.isAdvanced() && isShardConfigEvent(childResult.getDocument())) {
+ auto opType = childResult.getDocument()[DocumentSourceChangeStream::kOperationTypeField];
+ if (opType.getStringData() == DocumentSourceChangeStream::kInsertOpType) {
+ addNewShardCursors(childResult.getDocument());
+ }
+ // For shard removal or update, we do nothing. We also swallow kNewShardDetectedOpType.
childResult = pSource->getNext();
}
return childResult;
@@ -88,41 +104,39 @@ void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShard
std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards(
const Document& newShardDetectedObj) {
- auto* opCtx = pExpCtx->opCtx;
// Reload the shard registry. We need to ensure a reload initiated after calling this method
- // caused the reload, otherwise we aren't guaranteed to get all the new shards.
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
+ // caused the reload, otherwise we may not see the new shard, so we perform a "hard" reload.
+ auto* opCtx = pExpCtx->opCtx;
+ if (!Grid::get(opCtx)->shardRegistry()->reload(opCtx)) {
// A 'false' return from shardRegistry.reload() means a reload was already in progress and
// it completed before reload() returned. So another reload(), regardless of return value,
// will ensure a reload started after the first call to reload().
- shardRegistry->reload(opCtx);
+ Grid::get(opCtx)->shardRegistry()->reload(opCtx);
}
- std::vector<ShardId> shardIds, newShardIds;
- shardRegistry->getAllShardIdsNoReload(&shardIds);
- std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end());
- std::sort(shardIds.begin(), shardIds.end());
- std::set_difference(shardIds.begin(),
- shardIds.end(),
- _shardsWithCursors.begin(),
- _shardsWithCursors.end(),
- std::back_inserter(newShardIds));
+ // Parse the new shard's information from the document inserted into 'config.shards'.
+ auto newShardSpec = newShardDetectedObj[DocumentSourceChangeStream::kFullDocumentField];
+ auto newShard = uassertStatusOK(ShardType::fromBSON(newShardSpec.getDocument().toBson()));
- auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
- _cmdToRunOnNewShards,
- newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument());
- std::vector<std::pair<ShardId, BSONObj>> requests;
- for (const auto& shardId : newShardIds) {
- requests.emplace_back(shardId, cmdObj);
- _shardsWithCursors.push_back(shardId);
+ // Make sure we are not attempting to open a cursor on a shard that already has one.
+ if (!_shardsWithCursors.insert(newShard.getName()).second) {
+ return {};
}
+
+ // We must start the new cursor from the moment at which the shard became visible.
+ const auto newShardAddedTime = LogicalTime{
+ newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp()};
+ auto resumeTokenForNewShard =
+ ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp());
+ auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
+ _cmdToRunOnNewShards, resumeTokenForNewShard.toDocument());
+
const bool allowPartialResults = false; // partial results are not allowed
return establishCursors(opCtx,
_executor,
pExpCtx->ns,
ReadPreferenceSetting::get(opCtx),
- requests,
+ {{newShard.getName(), cmdObj}},
allowPartialResults);
}
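
Switching _shardsWithCursors from a sorted vector to a std::set collapses the old sort + std::set_difference bookkeeping into a single insert(), whose boolean return value doubles as the duplicate-cursor guard. A minimal sketch of that idiom:

#include <cassert>
#include <set>
#include <string>

int main() {
    std::set<std::string> shardsWithCursors{"shard0", "shard1"};

    // First config.shards insert event for "shard2": insert() returns
    // {iterator, true}, so a cursor is opened on the new shard, resuming
    // from the event's clusterTime plus one tick.
    assert(shardsWithCursors.insert("shard2").second);

    // A duplicate event for the same shard is swallowed: .second is false.
    assert(!shardsWithCursors.insert("shard2").second);
}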
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/s/query/document_source_update_on_add_shard.h
index 0b41fde92d1..ff76d2ce90e 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.h
+++ b/src/mongo/s/query/document_source_update_on_add_shard.h
@@ -102,7 +102,7 @@ private:
std::shared_ptr<executor::TaskExecutor> _executor;
boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors;
- std::vector<ShardId> _shardsWithCursors;
+ std::set<ShardId> _shardsWithCursors;
BSONObj _cmdToRunOnNewShards;
};
} // namespace mongo