-rw-r--r--  src/mongo/db/commands/SConscript  1
-rw-r--r--  src/mongo/db/pipeline/SConscript  32
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp  50
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.h  61
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_close_cursor.h  5
-rw-r--r--  src/mongo/db/pipeline/document_source_check_resume_token.h  15
-rw-r--r--  src/mongo/db/pipeline/document_source_update_on_add_shard.cpp (renamed from src/mongo/s/query/document_source_update_on_add_shard.cpp)  100
-rw-r--r--  src/mongo/db/pipeline/document_source_update_on_add_shard.h (renamed from src/mongo/s/query/document_source_update_on_add_shard.h)  34
-rw-r--r--  src/mongo/db/pipeline/expression_context.cpp  3
-rw-r--r--  src/mongo/db/pipeline/expression_context.h  3
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp  50
-rw-r--r--  src/mongo/s/query/SConscript  1
-rw-r--r--  src/mongo/s/query/async_results_merger.cpp  2
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp  20
-rw-r--r--  src/mongo/s/query/cluster_aggregation_planner.cpp  25
-rw-r--r--  src/mongo/s/query/document_source_merge_cursors.cpp  12
-rw-r--r--  src/mongo/s/query/document_source_merge_cursors.h  16
17 files changed, 232 insertions, 198 deletions
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 16dc1d93843..325e74d7c51 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -160,6 +160,7 @@ env.Library(
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/logical_session_id',
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
+ '$BUILD_DIR/mongo/db/pipeline/change_stream_pipeline',
'$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/repl/isself',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 1554ab17ab3..2ad08caa453 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -223,17 +223,10 @@ pipelineEnv.InjectThirdParty(libraries=['snappy'])
pipelineEnv.Library(
target='pipeline',
source=[
- 'change_stream_document_diff_parser.cpp',
'document_source.cpp',
'document_source_add_fields.cpp',
'document_source_bucket.cpp',
'document_source_bucket_auto.cpp',
- 'document_source_change_stream.cpp',
- 'document_source_change_stream_close_cursor.cpp',
- 'document_source_change_stream_transform.cpp',
- 'document_source_change_stream_unwind_transactions.cpp',
- 'document_source_check_invalidate.cpp',
- 'document_source_check_resume_token.cpp',
'document_source_coll_stats.cpp',
'document_source_count.cpp',
'document_source_current_op.cpp',
@@ -251,8 +244,6 @@ pipelineEnv.Library(
'document_source_list_local_sessions.cpp',
'document_source_list_sessions.cpp',
'document_source_lookup.cpp',
- 'document_source_lookup_change_post_image.cpp',
- 'document_source_lookup_change_pre_image.cpp',
'document_source_match.cpp',
'document_source_merge.cpp',
'document_source_operation_metrics.cpp',
@@ -339,6 +330,28 @@ pipelineEnv.Library(
)
env.Library(
+ target="change_stream_pipeline",
+ source=[
+ 'change_stream_document_diff_parser.cpp',
+ 'document_source_change_stream.cpp',
+ 'document_source_change_stream_close_cursor.cpp',
+ 'document_source_change_stream_transform.cpp',
+ 'document_source_change_stream_unwind_transactions.cpp',
+ 'document_source_check_invalidate.cpp',
+ 'document_source_check_resume_token.cpp',
+ 'document_source_lookup_change_post_image.cpp',
+ 'document_source_lookup_change_pre_image.cpp',
+ 'document_source_update_on_add_shard.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/pipeline/pipeline',
+ '$BUILD_DIR/mongo/db/pipeline/sharded_agg_helpers',
+ '$BUILD_DIR/mongo/s/query/router_exec_stage',
+ ],
+)
+
+
+env.Library(
target='runtime_constants_idl',
source=[
'legacy_runtime_constants.idl',
@@ -497,6 +510,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/util/clock_source_mock',
'accumulator',
'aggregation_request_helper',
+ 'change_stream_pipeline',
'document_source_mock',
'document_sources_idl',
'expression_context',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b450cae4c72..05133ece477 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
#include "mongo/db/pipeline/document_source_lookup_change_pre_image.h"
#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/resume_token.h"
@@ -448,6 +449,23 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
// The resume stage must come after the check invalidate stage so that the former can determine
// whether the event that matches the resume token should be followed by an "invalidate" event.
stages.push_back(DocumentSourceCheckInvalidate::create(expCtx, startAfterInvalidate));
+
+ // The resume stage 'DocumentSourceCheckResumability' should come before the split point stage
+ // 'DocumentSourceUpdateOnAddShard'.
+ if (resumeStage &&
+ resumeStage->getSourceName() == DocumentSourceCheckResumability::kStageName) {
+ stages.push_back(resumeStage);
+ resumeStage.reset();
+ }
+
+ // If the pipeline is built on MongoS, then the stage 'DocumentSourceUpdateOnAddShard' acts as
+ // the split point for the pipeline. All stages before this stage will run on the shards, and
+ // this stage and all stages after it will run on the MongoS.
+ if (expCtx->inMongos) {
+ stages.push_back(DocumentSourceUpdateOnAddShard::create(expCtx));
+ }
+
+ // Any resume stage remaining at this point must be 'DocumentSourceEnsureResumeTokenPresent'.
if (resumeStage) {
stages.push_back(resumeStage);
}
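The net effect of the ordering above: the shard-side resume check ('DocumentSourceCheckResumability') lands before the split point, while any remaining resume stage lands after it on the mongoS side. A minimal, self-contained sketch of this branch logic, using strings in place of the real intrusive_ptr stages (illustrative only, not MongoDB source):

    #include <iostream>
    #include <list>
    #include <string>

    int main() {
        const bool inMongos = true;
        // Stands in for the 'resumeStage' pointer in buildPipeline().
        std::string resumeStage = "DocumentSourceCheckResumability";
        std::list<std::string> stages = {"DocumentSourceChangeStreamTransform",
                                         "DocumentSourceCheckInvalidate"};

        // The shard-side resume check goes before the split point.
        if (resumeStage == "DocumentSourceCheckResumability") {
            stages.push_back(resumeStage);
            resumeStage.clear();
        }
        // The split point: this stage and everything after run on mongoS.
        if (inMongos)
            stages.push_back("DocumentSourceUpdateOnAddShard");
        // A resume stage still present here is DocumentSourceEnsureResumeTokenPresent.
        if (!resumeStage.empty())
            stages.push_back(resumeStage);

        for (const auto& s : stages)
            std::cout << s << '\n';
    }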
@@ -491,16 +509,8 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
auto stages = buildPipeline(expCtx, spec, elem);
- const bool csOptFeatureFlag =
- feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV();
-
- if (expCtx->inMongos && csOptFeatureFlag) {
- // TODO SERVER-55491: replace with DocumentSourceUpdateOnAddShard.
- stages.push_back(DocumentSourceChangeStreamPipelineSplitter::create(expCtx));
- }
-
if (!expCtx->needsMerge) {
- if (!csOptFeatureFlag) {
+ if (!feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()) {
// There should only be one close cursor stage. If we're on the shards and producing
// input to be merged, do not add a close cursor stage, since the mongos will already
// have one.
@@ -527,28 +537,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
return stages;
}
-BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(BSONObj originalCmdObj,
- Document resumeToken) {
- Document originalCmd(originalCmdObj);
- auto pipeline = originalCmd[AggregateCommandRequest::kPipelineFieldName].getArray();
- // A $changeStream must be the first element of the pipeline in order to be able
- // to replace (or add) a resume token.
- invariant(!pipeline[0][DocumentSourceChangeStream::kStageName].missing());
-
- MutableDocument changeStreamStage(
- pipeline[0][DocumentSourceChangeStream::kStageName].getDocument());
- changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken);
-
- // If the command was initially specified with a startAtOperationTime, we need to remove it to
- // use the new resume token.
- changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = Value();
- pipeline[0] =
- Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}});
- MutableDocument newCmd(std::move(originalCmd));
- newCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline);
- return newCmd.freeze().toBson();
-}
-
void DocumentSourceChangeStream::assertIsLegalSpecification(
const intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) {
// If 'allChangesForCluster' is true, the stream must be opened on the 'admin' database with
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index eb40d2908ab..4368ffb01fd 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -184,14 +184,6 @@ public:
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
/**
- * Given a BSON object containing an aggregation command with a $changeStream stage, and a
- * resume token, returns a new BSON object with the same command except with the addition of a
- * resumeAfter: option containing the resume token. If there was a previous resumeAfter:
- * option, it is removed.
- */
- static BSONObj replaceResumeTokenInCommand(BSONObj originalCmdObj, Document resumeToken);
-
- /**
* Helper used by various change stream stages. Used for asserting that a certain Value of a
* field has a certain type. Will uassert() if the field does not have the expected type.
*/
@@ -269,57 +261,4 @@ private:
using DocumentSourceMatch::DocumentSourceMatch;
};
-/**
- * A DocumentSource that if part of the pipeline, directly passes on the received documents to the
- * next stages without interpreting it and marks where a sharded change streams pipeline should be
- * split. This stage should only ever be created by a mongoS.
- *
- * TODO SERVER-55491: replace this class with DocumentSourceUpdateOnAddShard.
- */
-class DocumentSourceChangeStreamPipelineSplitter final : public DocumentSource {
-public:
- static constexpr StringData kStageName = "$_internalChangeStreamPipelineSplitter"_sd;
-
- static boost::intrusive_ptr<DocumentSourceChangeStreamPipelineSplitter> create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return new DocumentSourceChangeStreamPipelineSplitter(expCtx);
- }
-
- const char* getSourceName() const final {
- return DocumentSourceChangeStreamPipelineSplitter::kStageName.rawData();
- }
-
- StageConstraints constraints(Pipeline::SplitState pipeState) const final {
- return {StreamType::kStreaming,
- PositionRequirement::kNone,
- HostTypeRequirement::kMongoS,
- DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed,
- TransactionRequirement::kNotAllowed,
- LookupRequirement::kNotAllowed,
- UnionRequirement::kNotAllowed,
- ChangeStreamRequirement::kChangeStreamStage};
- }
-
- Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
- return (explain ? Value(Document{{kStageName, Document{}}}) : Value());
- }
-
- boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
- return DistributedPlanLogic{nullptr, nullptr, change_stream_constants::kSortSpec};
- }
-
-private:
- DocumentSourceChangeStreamPipelineSplitter(
- const boost::intrusive_ptr<ExpressionContext>& expCtx)
- : DocumentSource(kStageName, expCtx) {
- invariant(expCtx->inMongos);
- }
-
- GetNextResult doGetNext() final {
- // Pass on the document to the next stage without interpreting.
- return pSource->getNext();
- }
-};
-
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
index 5713af11d58..59be3f550b3 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_close_cursor.h
@@ -77,10 +77,7 @@ public:
}
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
- // This stage must run on mongos to ensure it sees any invalidation in the correct order,
- // and to ensure that all remote cursors are cleaned up properly.
- // {shardsStage, mergingStage, sortPattern}
- return DistributedPlanLogic{nullptr, this, change_stream_constants::kSortSpec};
+ return boost::none;
}
private:
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index c3e19a4a450..2af09ea780d 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -85,7 +85,7 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<DistributedPlanLogic> distributedPlanLogic() override {
+ boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
return boost::none;
}
@@ -134,19 +134,6 @@ public:
ChangeStreamRequirement::kChangeStreamStage};
}
- boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
- DistributedPlanLogic logic;
- // This stage must run on mongos to ensure it sees the resume token, which could have come
- // from any shard. We also must include a mergingPresorted $sort stage to communicate to
- // the AsyncResultsMerger that we need to merge the streams in a particular order.
- logic.mergingStage = this;
- // Also add logic to the shards to ensure that each shard has enough oplog history to resume
- // the change stream.
- logic.shardsStage = DocumentSourceCheckResumability::create(pExpCtx, _tokenFromClient);
- logic.inputSortPattern = change_stream_constants::kSortSpec;
- return logic;
- };
-
static boost::intrusive_ptr<DocumentSourceEnsureResumeTokenPresent> create(
const boost::intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token);
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp
index d531065306f..7aec2272d25 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.cpp
+++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.cpp
@@ -27,13 +27,16 @@
* it in the license file.
*/
-#include "mongo/s/query/document_source_update_on_add_shard.h"
+#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
#include <algorithm>
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
#include "mongo/s/query/establish_cursors.h"
@@ -59,25 +62,26 @@ bool isShardConfigEvent(const Document& eventDoc) {
} // namespace
boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> DocumentSourceUpdateOnAddShard::create(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
- std::vector<ShardId> shardsWithCursors,
- BSONObj cmdToRunOnNewShards) {
- return new DocumentSourceUpdateOnAddShard(
- expCtx, mergeCursors, std::move(shardsWithCursors), cmdToRunOnNewShards);
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceUpdateOnAddShard(expCtx);
}
DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const boost::intrusive_ptr<DocumentSourceMergeCursors>& mergeCursors,
- std::vector<ShardId>&& shardsWithCursors,
- BSONObj cmdToRunOnNewShards)
- : DocumentSource(kStageName, expCtx),
- _mergeCursors(mergeCursors),
- _shardsWithCursors(shardsWithCursors.begin(), shardsWithCursors.end()),
- _cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {}
+ const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSource(kStageName, expCtx) {}
DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() {
+ // On the first call to doGetNext(), '_mergeCursors' will be null and must be populated. We
+ // also resolve the original aggregation command from the expression context.
+ if (!_mergeCursors) {
+ _mergeCursors = dynamic_cast<DocumentSourceMergeCursors*>(pSource);
+ _originalAggregateCommand = pExpCtx->originalAggregateCommand.getOwned();
+
+ tassert(5549100, "Missing $mergeCursors stage", _mergeCursors);
+ tassert(
+ 5549101, "Empty $changeStream command object", !_originalAggregateCommand.isEmpty());
+ }
+
auto childResult = pSource->getNext();
// If this is an insertion into the 'config.shards' collection, open a cursor on the new shard.
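The lazy initialization above depends on the $mergeCursors stage sitting immediately ahead of this stage in the merge pipeline. A self-contained sketch of the pattern: cache a downcast of the preceding stage on the first call and verify it succeeded (simplified shapes, not MongoDB source):

    #include <cassert>
    #include <iostream>

    struct Source {
        virtual ~Source() = default;
    };

    // Stand-in for DocumentSourceMergeCursors.
    struct MergeCursors : Source {
        int numRemotes() const { return 2; }
    };

    // Stand-in for DocumentSourceUpdateOnAddShard.
    struct UpdateOnAddShard {
        Source* pSource = nullptr;             // the stage immediately ahead of us
        MergeCursors* mergeCursors = nullptr;  // cached on first use

        void getNext() {
            if (!mergeCursors) {
                // The $mergeCursors stage must sit directly ahead in the pipeline.
                mergeCursors = dynamic_cast<MergeCursors*>(pSource);
                assert(mergeCursors && "Missing $mergeCursors stage");
            }
            std::cout << "remotes: " << mergeCursors->numRemotes() << '\n';
        }
    };

    int main() {
        MergeCursors mc;
        UpdateOnAddShard stage;
        stage.pSource = &mc;
        stage.getNext();  // the first call performs the downcast and validation
    }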
@@ -107,17 +111,12 @@ std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsO
auto newShard = uassertStatusOK(ShardType::fromBSON(newShardSpec.getDocument().toBson()));
// Make sure we are not attempting to open a cursor on a shard that already has one.
- if (!_shardsWithCursors.insert(newShard.getName()).second) {
+ if (_mergeCursors->getShardIds().count(newShard.getName()) != 0) {
return {};
}
- // We must start the new cursor from the moment at which the shard became visible.
- const auto newShardAddedTime = LogicalTime{
- newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp()};
- auto resumeTokenForNewShard =
- ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp());
- auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
- _cmdToRunOnNewShards, resumeTokenForNewShard.toDocument());
+ auto cmdObj = createUpdatedCommandForNewShard(
+ newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp());
const bool allowPartialResults = false; // partial results are not allowed
return establishCursors(opCtx,
@@ -128,4 +127,59 @@ std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsO
allowPartialResults);
}
+BSONObj DocumentSourceUpdateOnAddShard::createUpdatedCommandForNewShard(Timestamp shardAddedTime) {
+ // We must start the new cursor from the moment at which the shard became visible.
+ const auto newShardAddedTime = LogicalTime{shardAddedTime};
+ auto resumeTokenForNewShard =
+ ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp());
+
+ // Create a new shard command object containing the new resume token.
+ auto shardCommand = replaceResumeTokenInCommand(resumeTokenForNewShard.toDocument());
+
+ auto* opCtx = pExpCtx->opCtx;
+ bool apiStrict = APIParameters::get(opCtx).getAPIStrict().value_or(false);
+
+ // Create the 'AggregateCommandRequest' object from which the pipeline will be parsed.
+ auto aggCmdRequest = aggregation_request_helper::parseFromBSON(
+ pExpCtx->ns, shardCommand, boost::none, apiStrict);
+
+ // Parse and optimize the pipeline.
+ auto pipeline = Pipeline::parse(aggCmdRequest.getPipeline(), pExpCtx);
+ pipeline->optimizePipeline();
+
+ // Split the full pipeline to get the shard pipeline.
+ auto splitPipelines = sharded_agg_helpers::splitPipeline(std::move(pipeline));
+
+ // Create the new command that will run on the shard.
+ return sharded_agg_helpers::createCommandForTargetedShards(pExpCtx,
+ Document{shardCommand},
+ splitPipelines,
+ boost::none, /* exchangeSpec */
+ true /* needsMerge */);
+}
+
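The resume point chosen above is the shard's add time plus one tick, so the cursor on the new shard starts just past the cluster time at which the shard became visible. A toy model of that arithmetic, with plain structs in place of the real LogicalTime/ResumeToken types:

    #include <cstdint>
    #include <iostream>

    // Models Timestamp's (seconds, increment) pair and LogicalTime::addTicks().
    struct Timestamp {
        uint32_t secs;
        uint32_t inc;
    };

    Timestamp addTicks(Timestamp t, uint32_t ticks) {
        return {t.secs, t.inc + ticks};
    }

    int main() {
        const Timestamp shardAddedTime{1620000000, 5};
        const Timestamp resumePoint = addTicks(shardAddedTime, 1);
        std::cout << "high water mark at (" << resumePoint.secs << ", "
                  << resumePoint.inc << ")\n";  // prints (1620000000, 6)
    }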
+BSONObj DocumentSourceUpdateOnAddShard::replaceResumeTokenInCommand(Document resumeToken) {
+ Document originalCmd(_originalAggregateCommand);
+ auto pipeline = originalCmd[AggregateCommandRequest::kPipelineFieldName].getArray();
+
+ // A $changeStream must be the first element of the pipeline in order to be able
+ // to replace (or add) a resume token.
+ tassert(5549102,
+ "Invalid $changeStream command object",
+ !pipeline[0][DocumentSourceChangeStream::kStageName].missing());
+
+ MutableDocument changeStreamStage(
+ pipeline[0][DocumentSourceChangeStream::kStageName].getDocument());
+ changeStreamStage[DocumentSourceChangeStreamSpec::kResumeAfterFieldName] = Value(resumeToken);
+
+ // If the command was initially specified with a startAtOperationTime, we need to remove it to
+ // use the new resume token.
+ changeStreamStage[DocumentSourceChangeStreamSpec::kStartAtOperationTimeFieldName] = Value();
+ pipeline[0] =
+ Value(Document{{DocumentSourceChangeStream::kStageName, changeStreamStage.freeze()}});
+ MutableDocument newCmd(std::move(originalCmd));
+ newCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline);
+ return newCmd.freeze().toBson();
+}
+
} // namespace mongo
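replaceResumeTokenInCommand() above is pure document surgery: overwrite 'resumeAfter' in the leading $changeStream stage and clear any 'startAtOperationTime', since the two options are mutually exclusive. A toy model using std::map in place of BSON documents (illustrative, not MongoDB source):

    #include <cassert>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    // One pipeline stage, e.g. {"$changeStream": {"resumeAfter": "..."}}.
    using Stage = std::map<std::string, std::map<std::string, std::string>>;

    int main() {
        std::vector<Stage> pipeline = {
            {{"$changeStream", {{"startAtOperationTime", "Timestamp(10, 1)"}}}},
            {{"$match", {{"operationType", "insert"}}}},
        };

        // $changeStream must be the first stage for the rewrite to be legal.
        assert(pipeline.front().count("$changeStream"));

        auto& changeStreamStage = pipeline.front()["$changeStream"];
        changeStreamStage.erase("startAtOperationTime");      // mutually exclusive option
        changeStreamStage["resumeAfter"] = "<resume token>";  // overwrite or add

        for (const auto& [field, value] : changeStreamStage)
            std::cout << field << ": " << value << '\n';
    }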
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.h b/src/mongo/db/pipeline/document_source_update_on_add_shard.h
index 621b508bf63..82e981572c1 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.h
+++ b/src/mongo/db/pipeline/document_source_update_on_add_shard.h
@@ -31,6 +31,7 @@
#include <memory>
+#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/s/query/document_source_merge_cursors.h"
#include "mongo/s/shard_id.h"
@@ -53,15 +54,10 @@ public:
* by 'mergeCursorsStage' whenever a new shard is detected by a change stream.
*/
static boost::intrusive_ptr<DocumentSourceUpdateOnAddShard> create(
- const boost::intrusive_ptr<ExpressionContext>&,
- const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
- std::vector<ShardId> shardsWithCursors,
- BSONObj cmdToRunOnNewShards);
+ const boost::intrusive_ptr<ExpressionContext>&);
Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const final {
- // We only ever expect to add this stage if the pipeline is being executed locally on a
- // mongos. In this case, it should never be serialized.
- MONGO_UNREACHABLE;
+ return (explain ? Value(Document{{kStageName, Value()}}) : Value());
}
virtual StageConstraints constraints(Pipeline::SplitState) const {
@@ -77,14 +73,11 @@ public:
}
boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
- return boost::none;
+ return DistributedPlanLogic{nullptr, this, change_stream_constants::kSortSpec};
}
private:
- DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&,
- const boost::intrusive_ptr<DocumentSourceMergeCursors>&,
- std::vector<ShardId>&& shardsWithCursors,
- BSONObj cmdToRunOnNewShards);
+ DocumentSourceUpdateOnAddShard(const boost::intrusive_ptr<ExpressionContext>&);
GetNextResult doGetNext() final;
@@ -98,8 +91,21 @@ private:
*/
std::vector<RemoteCursor> establishShardCursorsOnNewShards(const Document& newShardDetectedObj);
+ /**
+ * Updates the $changeStream stage in the '_originalAggregateCommand' to reflect the start time
+ * for the newly-added shard(s), then generates the final command object to be run on those
+ * shards.
+ */
+ BSONObj createUpdatedCommandForNewShard(Timestamp shardAddedTime);
+
+ /**
+ * Given the '_originalAggregateCommand' and a resume token, returns a new BSON object with the
+ * same command except with the addition of a resumeAfter option containing the resume token.
+     * Any existing resumeAfter option is overwritten and any startAtOperationTime is removed.
+ */
+ BSONObj replaceResumeTokenInCommand(Document resumeToken);
+
boost::intrusive_ptr<DocumentSourceMergeCursors> _mergeCursors;
- std::set<ShardId> _shardsWithCursors;
- BSONObj _cmdToRunOnNewShards;
+ BSONObj _originalAggregateCommand;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index d63c9f4962a..c4cb0f6e411 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -214,6 +214,9 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(
expCtx->exprUnstableForApiV1 = exprUnstableForApiV1;
expCtx->exprDeprectedForApiV1 = exprDeprectedForApiV1;
+ expCtx->initialPostBatchResumeToken = initialPostBatchResumeToken.getOwned();
+ expCtx->originalAggregateCommand = originalAggregateCommand.getOwned();
+
// Note that we intentionally skip copying the value of '_interruptCounter' because 'expCtx' is
// intended to be used for executing a separate aggregation pipeline.
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 84d9f46236d..5ca843b1498 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -383,6 +383,9 @@ public:
enum class CollationMatchesDefault { kNoDefault, kYes, kNo };
CollationMatchesDefault collationMatchesDefault = CollationMatchesDefault::kNoDefault;
+ // When non-empty, contains the unmodified, user-provided aggregation command.
+ BSONObj originalAggregateCommand;
+
protected:
static const int kInterruptCheckPeriod = 128;
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 9fe9104da25..7d405768fc2 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/pipeline/document_source_skip.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
+#include "mongo/db/pipeline/document_source_update_on_add_shard.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
#include "mongo/db/pipeline/semantic_analysis.h"
#include "mongo/db/vector_clock.h"
@@ -56,7 +57,6 @@
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/query/cluster_query_knobs_gen.h"
#include "mongo/s/query/document_source_merge_cursors.h"
-#include "mongo/s/query/document_source_update_on_add_shard.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/fail_point.h"
@@ -273,11 +273,8 @@ boost::optional<BSONObj> findSplitPoint(Pipeline::SourceContainer* shardPipe, Pi
continue;
}
- // TODO SERVER-55491: remove this 'if' to make the invariant unconditional.
- if (distributedPlanLogic->shardsStage && distributedPlanLogic->mergingStage) {
- // A source may not simultaneously be present on both sides of the split.
- invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage);
- }
+ // A source may not simultaneously be present on both sides of the split.
+ invariant(distributedPlanLogic->shardsStage != distributedPlanLogic->mergingStage);
if (distributedPlanLogic->shardsStage)
shardPipe->push_back(std::move(distributedPlanLogic->shardsStage));
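With the invariant now unconditional, a stage may contribute a shard-side part, a merge-side part, or neither, but never the same object to both halves. A minimal model of how findSplitPoint() consumes this: stages run on the shards until the first stage returning DistributedPlanLogic, and that stage plus everything after merge on mongoS (simplified to a split stage with no shard-side part, as with DocumentSourceUpdateOnAddShard; illustrative names only):

    #include <iostream>
    #include <list>
    #include <string>

    struct Stage {
        std::string name;
        // Models distributedPlanLogic() returning a value; simplified to a split
        // stage whose shardsStage part is nullptr.
        bool isSplitPoint = false;
    };

    int main() {
        const std::list<Stage> pipeline = {{"$_oplogScan"},
                                           {"$_checkResumability"},
                                           {"$_updateOnAddShard", true},
                                           {"$_ensureResumeTokenPresent"}};
        std::list<std::string> shardPipe, mergePipe;
        bool split = false;
        for (const auto& stage : pipeline) {
            if (stage.isSplitPoint)
                split = true;  // this stage and everything after merge on mongoS
            (split ? mergePipe : shardPipe).push_back(stage.name);
        }
        std::cout << "shards:";
        for (const auto& n : shardPipe) std::cout << ' ' << n;
        std::cout << "\nmongoS:";
        for (const auto& n : mergePipe) std::cout << ' ' << n;
        std::cout << '\n';
    }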
@@ -837,8 +834,15 @@ BSONObj createPassthroughCommandForShard(
targetedCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline->serialize());
}
- return genericTransformForShards(
- std::move(targetedCmd), expCtx, explainVerbosity, collationObj);
+ auto shardCommand =
+ genericTransformForShards(std::move(targetedCmd), expCtx, explainVerbosity, collationObj);
+
+ // Apply filter and RW concern to the final shard command.
+ return CommandHelpers::filterCommandRequestForPassthrough(
+ applyReadWriteConcern(expCtx->opCtx,
+ true, /* appendRC */
+ !explainVerbosity, /* appendWC */
+ shardCommand));
}
BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -876,8 +880,14 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont
targetedCmd[AggregateCommandRequest::kExchangeFieldName] =
exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
- return genericTransformForShards(
+ auto shardCommand = genericTransformForShards(
std::move(targetedCmd), expCtx, expCtx->explain, expCtx->getCollatorBSON());
+
+ // Apply RW concern to the final shard command.
+ return applyReadWriteConcern(expCtx->opCtx,
+ true, /* appendRC */
+ !expCtx->explain, /* appendWC */
+ shardCommand);
}
/**
@@ -951,15 +961,12 @@ DispatchShardPipelineResults dispatchShardPipeline(
}
// Generate the command object for the targeted shards.
- BSONObj targetedCommand = applyReadWriteConcern(
- opCtx,
- true, /* appendRC */
- !expCtx->explain, /* appendWC */
- splitPipelines
- ? createCommandForTargetedShards(
- expCtx, serializedCommand, *splitPipelines, exchangeSpec, true)
- : createPassthroughCommandForShard(
- expCtx, serializedCommand, expCtx->explain, pipeline.get(), collationObj));
+ BSONObj targetedCommand =
+ (splitPipelines
+ ? createCommandForTargetedShards(
+ expCtx, serializedCommand, *splitPipelines, exchangeSpec, true /* needsMerge */)
+ : createPassthroughCommandForShard(
+ expCtx, serializedCommand, expCtx->explain, pipeline.get(), collationObj));
// A $changeStream pipeline must run on all shards, and will also open an extra cursor on the
// config server in order to monitor for new shards. To guarantee that we do not miss any
@@ -1097,16 +1104,9 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
armParams.setRemotes(std::move(remoteCursors));
- // For change streams, we need to set up a custom stage to establish cursors on new shards when
- // they are added, to ensure we don't miss results from the new shards.
auto mergeCursorsStage =
DocumentSourceMergeCursors::create(mergePipeline->getContext(), std::move(armParams));
- if (hasChangeStream) {
- mergePipeline->addInitialSource(DocumentSourceUpdateOnAddShard::create(
- mergePipeline->getContext(), mergeCursorsStage, targetedShards, cmdSentToShards));
- }
-
mergePipeline->addInitialSource(std::move(mergeCursorsStage));
}
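Note the shape of the two builder changes above: applyReadWriteConcern() moves from the call sites into createPassthroughCommandForShard() and createCommandForTargetedShards() themselves, so no dispatch path can forget it. A minimal sketch of that refactor, with illustrative names and string commands in place of the real BSON types:

    #include <iostream>
    #include <string>

    // Stand-in for applyReadWriteConcern(): appends concern fields to a command.
    std::string applyReadWriteConcern(std::string cmd, bool appendWC) {
        cmd += " +readConcern";
        if (appendWC)
            cmd += " +writeConcern";
        return cmd;
    }

    // After the refactor the builder finishes the command itself, so callers
    // no longer wrap the result.
    std::string createCommandForTargetedShards(bool isExplain) {
        std::string cmd = "aggregate ...";
        return applyReadWriteConcern(std::move(cmd), /*appendWC=*/!isExplain);
    }

    int main() {
        std::cout << createCommandForTargetedShards(false) << '\n';  // writeConcern appended
        std::cout << createCommandForTargetedShards(true) << '\n';   // explain: no writeConcern
    }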
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 2c05aee0f20..37dee4875f3 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -56,7 +56,6 @@ env.Library(
target="router_exec_stage",
source=[
'document_source_merge_cursors.cpp',
- 'document_source_update_on_add_shard.cpp',
'router_stage_limit.cpp',
'router_stage_mock.cpp',
'router_stage_pipeline.cpp',
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 111fde047f1..5ac4777ad48 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -128,6 +128,8 @@ AsyncResultsMerger::AsyncResultsMerger(OperationContext* opCtx,
tassert(
5493704, "Found invalidated cursor on the first batch", !_remotes.back().invalidated);
+ _remotes.back().shardId = remote.getShardId().toString();
+
// We don't check the return value of _addBatchToBuffer here; if there was an error,
// it will be stored in the remote and the first call to ready() will return true.
_addBatchToBuffer(WithLock::withoutLock(), remoteIndex, remote.getCursorResponse());
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 2521315c8e3..b9f68107a40 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -105,7 +105,8 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
const AggregateCommandRequest& request,
BSONObj collationObj,
boost::optional<UUID> uuid,
- StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces) {
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces,
+ bool hasChangeStream) {
std::unique_ptr<CollatorInterface> collation;
if (!collationObj.isEmpty()) {
@@ -126,6 +127,15 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(
uuid);
mergeCtx->inMongos = true;
+
+ // Serialize the 'AggregateCommandRequest' and save it so that the original command can be
+ // reconstructed for dispatch to a new shard, which is sometimes necessary for change stream
+ // pipelines.
+ if (hasChangeStream) {
+ mergeCtx->originalAggregateCommand =
+ aggregation_request_helper::serializeToCommandObj(request);
+ }
+
return mergeCtx;
}
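The hand-off works as follows: mongoS stashes the unmodified aggregate command on the ExpressionContext whenever the pipeline contains a $changeStream, and DocumentSourceUpdateOnAddShard later reads it back in doGetNext() to build commands for newly added shards. A sketch with simplified types (not the real ExpressionContext):

    #include <iostream>
    #include <string>

    struct ExpressionContext {
        // Empty unless the pipeline contains a $changeStream stage.
        std::string originalAggregateCommand;
    };

    int main() {
        ExpressionContext expCtx;
        const bool hasChangeStream = true;
        const std::string request = R"({aggregate: "coll", pipeline: [{$changeStream: {}}]})";
        if (hasChangeStream)
            expCtx.originalAggregateCommand = request;  // saved for new-shard dispatch
        std::cout << expCtx.originalAggregateCommand << '\n';
    }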
@@ -293,8 +303,12 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// Build an ExpressionContext for the pipeline. This instantiates an appropriate collator,
// resolves all involved namespaces, and creates a shared MongoProcessInterface for use by
// the pipeline's stages.
- expCtx = makeExpressionContext(
- opCtx, request, collationObj, uuid, resolveInvolvedNamespaces(involvedNamespaces));
+ expCtx = makeExpressionContext(opCtx,
+ request,
+ collationObj,
+ uuid,
+ resolveInvolvedNamespaces(involvedNamespaces),
+ hasChangeStream);
// Parse and optimize the full pipeline.
auto pipeline = Pipeline::parse(request.getPipeline(), expCtx);
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index 8827907d5a2..8714c6b1aad 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -410,14 +410,15 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline(
consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none);
- auto consumerCmdObj = sharded_agg_helpers::createCommandForTargetedShards(
- expCtx, serializedCommand, consumerPipelines.back(), boost::none, false);
+ auto consumerCmdObj =
+ sharded_agg_helpers::createCommandForTargetedShards(expCtx,
+ serializedCommand,
+ consumerPipelines.back(),
+ boost::none, /* exchangeSpec */
+ false /* needsMerge */);
requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx],
- applyReadWriteConcern(opCtx,
- true, /* appendRC */
- !expCtx->explain, /* appendWC */
- consumerCmdObj));
+ consumerCmdObj);
}
auto cursors = establishCursors(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
@@ -620,13 +621,11 @@ Status runPipelineOnPrimaryShard(const boost::intrusive_ptr<ExpressionContext>&
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
- BSONObj cmdObj =
- applyReadWriteConcern(opCtx,
- true, /* appendRC */
- !explain, /* appendWC */
- CommandHelpers::filterCommandRequestForPassthrough(
- sharded_agg_helpers::createPassthroughCommandForShard(
- expCtx, serializedCommand, explain, nullptr, BSONObj())));
+ BSONObj cmdObj = sharded_agg_helpers::createPassthroughCommandForShard(expCtx,
+ serializedCommand,
+ explain,
+ nullptr, /* pipeline */
+ BSONObj());
const auto shardId = cm.dbPrimary();
const auto cmdObjWithShardVersion = (shardId != ShardId::kConfigServerId)
diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp
index a08e966817a..737ad721006 100644
--- a/src/mongo/s/query/document_source_merge_cursors.cpp
+++ b/src/mongo/s/query/document_source_merge_cursors.cpp
@@ -53,6 +53,9 @@ DocumentSourceMergeCursors::DocumentSourceMergeCursors(
_armParamsObj(std::move(ownedParamsSpec)),
_armParams(std::move(armParams)) {
_armParams->setRecordRemoteOpWaitTime(true);
+
+ // Populate the shard IDs from the initial remote cursors.
+ recordRemoteCursorShardIds(_armParams->getRemotes());
}
std::size_t DocumentSourceMergeCursors::getNumRemotes() const {
@@ -153,4 +156,13 @@ void DocumentSourceMergeCursors::doDispose() {
}
}
+
+void DocumentSourceMergeCursors::recordRemoteCursorShardIds(
+ const std::vector<RemoteCursor>& remoteCursors) {
+ for (const auto& remoteCursor : remoteCursors) {
+ tassert(5549103, "Encountered invalid shard ID", !remoteCursor.getShardId().empty());
+ _shardsWithCursors.emplace(remoteCursor.getShardId().toString());
+ }
+}
+
} // namespace mongo
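Together with the getShardIds() accessor added to the header below, this bookkeeping is what lets DocumentSourceUpdateOnAddShard skip shards that already have cursors. A self-contained sketch of that guard, with std::string in place of ShardId:

    #include <iostream>
    #include <set>
    #include <string>

    int main() {
        std::set<std::string> shardsWithCursors = {"shard0", "shard1"};

        auto openCursorIfNew = [&](const std::string& shard) {
            if (shardsWithCursors.count(shard)) {
                std::cout << shard << ": cursor already open, skipping\n";
                return;
            }
            shardsWithCursors.insert(shard);
            std::cout << shard << ": opening new cursor\n";
        };

        openCursorIfNew("shard1");  // already tracked, so this is a no-op
        openCursorIfNew("shard2");  // genuinely new, so a cursor is opened
    }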
diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h
index b637e21e27a..d79c192d4b3 100644
--- a/src/mongo/s/query/document_source_merge_cursors.h
+++ b/src/mongo/s/query/document_source_merge_cursors.h
@@ -103,6 +103,13 @@ public:
std::size_t getNumRemotes() const;
/**
+     * Returns the set of shard IDs for which a cursor has already been established.
+ */
+ const std::set<ShardId>& getShardIds() const {
+ return _shardsWithCursors;
+ }
+
+ /**
* Returns the high water mark sort key for the given cursor, if it exists; otherwise, returns
* an empty BSONObj. Calling this method causes the underlying BlockingResultsMerger to be
* populated and assumes ownership of the remote cursors.
@@ -133,6 +140,7 @@ public:
*/
void addNewShardCursors(std::vector<RemoteCursor>&& newCursors) {
invariant(_blockingResultsMerger);
+ recordRemoteCursorShardIds(newCursors);
_blockingResultsMerger->addNewShardCursors(std::move(newCursors));
}
@@ -159,6 +167,11 @@ private:
*/
void populateMerger();
+ /**
+     * Adds the shard IDs of the given remote cursors to the '_shardsWithCursors' set.
+ */
+ void recordRemoteCursorShardIds(const std::vector<RemoteCursor>& remoteCursors);
+
// When we have parsed the params out of a BSONObj, the object needs to stay around while the
// params are in use. We store them here.
boost::optional<BSONObj> _armParamsObj;
@@ -182,6 +195,9 @@ private:
// Indicates whether the cursors stored in _armParams are "owned", meaning the cursors should be
// killed upon disposal of this DocumentSource.
bool _ownCursors = true;
+
+ // The set of shard IDs with open cursors.
+ std::set<ShardId> _shardsWithCursors;
};
} // namespace mongo