diff options
Diffstat (limited to 'src/mongo/db/pipeline')
25 files changed, 294 insertions, 140 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 61f38343e83..b26b9951be4 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -239,11 +239,23 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec)); } - // If the pipeline is built on MongoS, then the stage 'DSCSHandleTopologyChange' acts as the - // split point for the pipline. All stages before this stages will run on shards and all stages - // after and inclusive of this stage will run on the MongoS. + // If the pipeline is built on MongoS, we check for topology change events here. If a topology + // change event is detected, this stage forwards the event directly to the executor via an + // exception (bypassing the rest of the pipeline). MongoS must see all topology change events, + // so it's important that this stage occurs before any filtering is performed. if (expCtx->inMongos) { stages.push_back(DocumentSourceChangeStreamCheckTopologyChange::create(expCtx)); + } + + // If 'fullDocument' is set to "updateLookup", add the DSCSLookupPostImage stage here. + if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) { + stages.push_back(DocumentSourceChangeStreamLookupPostImage::create(expCtx)); + } + + // If the pipeline is built on MongoS, then the DSCSHandleTopologyChange stage acts as the + // split point for the pipline. All stages before this stages will run on shards and all + // stages after and inclusive of this stage will run on the MongoS. + if (expCtx->inMongos) { stages.push_back(DocumentSourceChangeStreamHandleTopologyChange::create(expCtx)); } @@ -263,12 +275,6 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui stages.push_back(DocumentSourceChangeStreamLookupPreImage::create(expCtx, spec)); } - // There should be only one post-image lookup stage. If we're on the shards and producing - // input to be merged, the lookup is done on the mongos. - if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) { - stages.push_back(DocumentSourceChangeStreamLookupPostImage::create(expCtx)); - } - return stages; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp index 8659f71e325..6ec641a23ed 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp @@ -123,24 +123,15 @@ Value DocumentSourceChangeStreamLookupPostImage::lookupPostImage(const Document& auto resumeToken = ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument()); - const auto readConcern = pExpCtx->inMongos - ? boost::optional<BSONObj>(BSON("level" - << "majority" - << "afterClusterTime" << resumeToken.getData().clusterTime)) - : boost::none; - + auto readConcern = BSON("level" + << "majority" + << "afterClusterTime" << resumeToken.getData().clusterTime); // Update lookup queries sent from mongoS to shards are allowed to use speculative majority // reads. - const auto allowSpeculativeMajorityRead = pExpCtx->inMongos; invariant(resumeToken.getData().uuid); - auto lookedUpDoc = - pExpCtx->mongoProcessInterface->lookupSingleDocument(pExpCtx, - nss, - *resumeToken.getData().uuid, - documentKey, - readConcern, - allowSpeculativeMajorityRead); + auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocument( + pExpCtx, nss, *resumeToken.getData().uuid, documentKey, std::move(readConcern)); // Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may // not have returned any results if the document was deleted in the time since the update op. diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h index 2a7c9666e68..4453e1f6473 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h +++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h @@ -66,12 +66,16 @@ public: StageConstraints constraints(Pipeline::SplitState pipeState) const final { invariant(pipeState != Pipeline::SplitState::kSplitForShards); + + // TODO SERVER-55659: remove the feature flag. + HostTypeRequirement hostTypeRequirement = + feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV() + ? HostTypeRequirement::kAnyShard + : HostTypeRequirement::kLocalOnly; + StageConstraints constraints(StreamType::kStreaming, PositionRequirement::kNone, - // If this is parsed on mongos it should stay on mongos. If - // we're not in a sharded cluster then it's okay to run on - // mongod. - HostTypeRequirement::kLocalOnly, + hostTypeRequirement, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, TransactionRequirement::kNotAllowed, diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 7452d9f7d98..ee6906ce199 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -158,8 +158,7 @@ struct MockMongoInterface final : public StubMongoProcessInterface { const NamespaceString& nss, UUID collectionUUID, const Document& documentKey, - boost::optional<BSONObj> readConcern, - bool allowSpeculativeMajorityRead) final { + boost::optional<BSONObj> readConcern) final { Matcher matcher(documentKey.toBson(), expCtx); auto it = std::find_if(_documentsForLookup.begin(), _documentsForLookup.end(), diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp index f550c2c68a2..f90fffe3c00 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp @@ -242,8 +242,10 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() { pipelineOpts.optimize = true; pipelineOpts.attachCursorSource = true; // By default, $graphLookup doesn't support a sharded 'from' collection. - pipelineOpts.allowTargetingShards = feature_flags::gFeatureFlagShardedLookup.isEnabled( - serverGlobalParams.featureCompatibility); + pipelineOpts.shardTargetingPolicy = feature_flags::gFeatureFlagShardedLookup.isEnabled( + serverGlobalParams.featureCompatibility) + ? ShardTargetingPolicy::kAllowed + : ShardTargetingPolicy::kNotAllowed; _variables.copyToExpCtx(_variablesParseState, _fromExpCtx.get()); auto pipeline = Pipeline::makePipeline(_fromPipeline, _fromExpCtx, pipelineOpts); while (auto next = pipeline->getNext()) { diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp index d22ee4b60bc..26345aa16ab 100644 --- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp @@ -64,7 +64,9 @@ public: : _results(std::move(results)) {} std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - Pipeline* ownedPipeline, bool allowTargetingShards) final { + Pipeline* ownedPipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none) final { std::unique_ptr<Pipeline, PipelineDeleter> pipeline( ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx)); pipeline->addInitialSource( diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp index 8adcb0a1bbc..e333143ae81 100644 --- a/src/mongo/db/pipeline/document_source_lookup.cpp +++ b/src/mongo/db/pipeline/document_source_lookup.cpp @@ -427,8 +427,10 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( pipelineOpts.attachCursorSource = true; pipelineOpts.validator = lookupPipeValidator; // By default, $lookup doesnt support sharded 'from' collections. - pipelineOpts.allowTargetingShards = feature_flags::gFeatureFlagShardedLookup.isEnabled( - serverGlobalParams.featureCompatibility); + pipelineOpts.shardTargetingPolicy = feature_flags::gFeatureFlagShardedLookup.isEnabled( + serverGlobalParams.featureCompatibility) + ? ShardTargetingPolicy::kAllowed + : ShardTargetingPolicy::kNotAllowed; return Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts); } @@ -457,10 +459,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline( if (!_cache->isServing()) { // The cache has either been abandoned or has not yet been built. Attach a cursor. + auto shardTargetingPolicy = feature_flags::gFeatureFlagShardedLookup.isEnabledAndIgnoreFCV() + ? ShardTargetingPolicy::kAllowed + : ShardTargetingPolicy::kNotAllowed; pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline( - pipeline.release(), - feature_flags::gFeatureFlagShardedLookup - .isEnabledAndIgnoreFCV() /* allowTargetingShards*/); + pipeline.release(), shardTargetingPolicy); } // If the cache has been abandoned, release it. diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp index f094518c668..6d5eff9a5d3 100644 --- a/src/mongo/db/pipeline/document_source_lookup_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp @@ -818,7 +818,9 @@ public: } std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - Pipeline* ownedPipeline, bool allowTargetingShards = true) final { + Pipeline* ownedPipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none) final { std::unique_ptr<Pipeline, PipelineDeleter> pipeline( ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx)); diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp index 131d30a9161..a8194b1f305 100644 --- a/src/mongo/db/pipeline/pipeline.cpp +++ b/src/mongo/db/pipeline/pipeline.cpp @@ -652,7 +652,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline( if (opts.attachCursorSource) { pipeline = expCtx->mongoProcessInterface->attachCursorSourceToPipeline( - pipeline.release(), opts.allowTargetingShards); + pipeline.release(), opts.shardTargetingPolicy, std::move(opts.readConcern)); } return pipeline; diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h index e3ff4714577..3099fe6e2d1 100644 --- a/src/mongo/db/pipeline/pipeline.h +++ b/src/mongo/db/pipeline/pipeline.h @@ -40,6 +40,7 @@ #include "mongo/db/matcher/expression_parser.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/dependencies.h" +#include "mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/query/query_knobs_gen.h" #include "mongo/executor/task_executor.h" @@ -69,8 +70,9 @@ using PipelineValidatorCallback = std::function<void(const Pipeline&)>; struct MakePipelineOptions { bool optimize = true; bool attachCursorSource = true; - bool allowTargetingShards = true; + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed; PipelineValidatorCallback validator = nullptr; + boost::optional<BSONObj> readConcern; }; /** diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp index 95f4e91e925..9ca6c92a5c5 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp @@ -55,7 +55,6 @@ #include "mongo/db/query/collection_index_usage_tracker_decoration.h" #include "mongo/db/query/collection_query_info.h" #include "mongo/db/repl/primary_only_service.h" -#include "mongo/db/repl/speculative_majority_read_info.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/transaction_coordinator_curop.h" @@ -326,19 +325,12 @@ std::vector<GenericCursor> CommonMongodProcessInterface::getIdleCursors( return CursorManager::get(expCtx->opCtx)->getIdleCursors(expCtx->opCtx, userMode); } -boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocument( +boost::optional<Document> CommonMongodProcessInterface::doLookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, UUID collectionUUID, const Document& documentKey, - boost::optional<BSONObj> readConcern, - bool allowSpeculativeMajorityRead) { - invariant(!readConcern); // We don't currently support a read concern on mongod - it's only - // expected to be necessary on mongos. - invariant(!allowSpeculativeMajorityRead); // We don't expect 'allowSpeculativeMajorityRead' on - // mongod - it's only expected to be necessary on - // mongos. - + MakePipelineOptions opts) { std::unique_ptr<Pipeline, PipelineDeleter> pipeline; try { // Be sure to do the lookup using the collection default collation @@ -346,11 +338,7 @@ boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocument( nss, collectionUUID, _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); - // When looking up on a mongoD, we only ever want to read from the local collection. By - // default, makePipeline will attach a cursor source which may read from remote if the - // collection is sharded, so we configure it to not allow that here. - MakePipelineOptions opts; - opts.allowTargetingShards = false; + pipeline = Pipeline::makePipeline({BSON("$match" << documentKey)}, foreignExpCtx, opts); } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { return boost::none; @@ -364,22 +352,6 @@ boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocument( << ", " << next->toString() << "]"); } - // Set the speculative read timestamp appropriately after we do a document lookup locally. We - // set the speculative read timestamp based on the timestamp used by the transaction. - repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo = - repl::SpeculativeMajorityReadInfo::get(expCtx->opCtx); - if (speculativeMajorityReadInfo.isSpeculativeRead()) { - // Speculative majority reads are required to use the 'kNoOverlap' read source. - // Storage engine operations require at least Global IS. - Lock::GlobalLock lk(expCtx->opCtx, MODE_IS); - invariant(expCtx->opCtx->recoveryUnit()->getTimestampReadSource() == - RecoveryUnit::ReadSource::kNoOverlap); - boost::optional<Timestamp> readTs = - expCtx->opCtx->recoveryUnit()->getPointInTimeReadTimestamp(expCtx->opCtx); - invariant(readTs); - speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(*readTs); - } - return lookedUpDocument; } diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h index 5321851c7bd..97e56b0b5dd 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h @@ -76,13 +76,6 @@ public: Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final; - boost::optional<Document> lookupSingleDocument( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern, - bool allowSpeculativeMajorityRead = false) final; std::vector<GenericCursor> getIdleCursors(const boost::intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const final; BackupCursorState openBackupCursor(OperationContext* opCtx, @@ -131,6 +124,13 @@ public: std::unique_ptr<TemporaryRecordStore> rs) const final; protected: + boost::optional<Document> doLookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + MakePipelineOptions opts); + /** * Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'. */ diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h index 6bb56446ff9..1be99ee1826 100644 --- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h @@ -46,6 +46,7 @@ #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h" #include "mongo/db/pipeline/storage_stats_spec_gen.h" #include "mongo/db/query/explain_options.h" #include "mongo/db/record_id.h" @@ -265,11 +266,13 @@ public: * Changing it to a unique_ptr introduces a circular dependency on certain platforms where the * compiler expects to find an implementation of PipelineDeleter. * - * If `allowTargetingShards` is true, the cursor will only be for local reads regardless of - * whether or not this function is called in a sharded environment. + * If `shardTargetingPolicy` is kNotAllowed, the cursor will only be for local reads regardless + * of whether or not this function is called in a sharded environment. */ virtual std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - Pipeline* pipeline, bool allowTargetingShards = true) = 0; + Pipeline* pipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none) = 0; /** * Accepts a pipeline and attaches a cursor source to it. Returns a BSONObj of the form @@ -351,14 +354,17 @@ public: * as a unique identifier of a document, and may include an _id or all fields from the shard key * and an _id. Throws if more than one match was found. Returns boost::none if no matching * documents were found, including cases where the given namespace does not exist. + * + * If this interface needs to send requests (possibly to other nodes) in order to look up the + * document, 'readConcern' will be attached to these requests. Otherwise 'readConcern' will be + * ignored. */ virtual boost::optional<Document> lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, UUID, const Document& documentKey, - boost::optional<BSONObj> readConcern, - bool allowSpeculativeMajorityRead = false) = 0; + boost::optional<BSONObj> readConcern) = 0; /** * Returns a vector of all idle (non-pinned) local cursors. diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 9461572ccf9..1e42994af7f 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -100,9 +100,16 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, } // namespace std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( - Pipeline* ownedPipeline, bool allowTargetingShards) { + Pipeline* ownedPipeline, + ShardTargetingPolicy shardTargetingPolicy, + boost::optional<BSONObj> readConcern) { // On mongos we can't have local cursors. - return sharded_agg_helpers::attachCursorToPipeline(ownedPipeline, allowTargetingShards); + tassert(5530900, + "shardTargetingPolicy cannot be kNotAllowed on mongos", + shardTargetingPolicy != ShardTargetingPolicy::kNotAllowed); + + return sharded_agg_helpers::attachCursorToPipeline( + ownedPipeline, shardTargetingPolicy, std::move(readConcern)); } BSONObj MongosProcessInterface::preparePipelineAndExplain(Pipeline* ownedPipeline, @@ -122,8 +129,7 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument( const NamespaceString& nss, UUID collectionUUID, const Document& filter, - boost::optional<BSONObj> readConcern, - bool allowSpeculativeMajorityRead) { + boost::optional<BSONObj> readConcern) { auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID); // Create the find command to be dispatched to the shard(s) in order to return the post-image. @@ -136,12 +142,10 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument( cmdBuilder.append("find", nss.coll()); } cmdBuilder.append("filter", filterObj); + cmdBuilder.append("allowSpeculativeMajorityRead", true); if (readConcern) { cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern); } - if (allowSpeculativeMajorityRead) { - cmdBuilder.append("allowSpeculativeMajorityRead", true); - } try { auto findCmd = cmdBuilder.obj(); diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h index 413f2a7e02e..83998c935a1 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h @@ -50,8 +50,7 @@ public: const NamespaceString& nss, UUID collectionUUID, const Document& documentKey, - boost::optional<BSONObj> readConcern, - bool allowSpeculativeMajorityRead = false) final; + boost::optional<BSONObj> readConcern) final; std::vector<GenericCursor> getIdleCursors(const boost::intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const final; @@ -234,7 +233,9 @@ public: * operation. */ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - Pipeline* pipeline, bool allowTargetingShards) final; + Pipeline* pipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none) final; std::unique_ptr<TemporaryRecordStore> createTemporaryRecordStore( const boost::intrusive_ptr<ExpressionContext>& expCtx) const final { diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp index 615ff53aea3..fb1e4b1e976 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp @@ -40,12 +40,15 @@ #include "mongo/db/db_raii.h" #include "mongo/db/index_builds_coordinator.h" #include "mongo/db/pipeline/document_source_cursor.h" +#include "mongo/db/repl/speculative_majority_read_info.h" namespace mongo { std::unique_ptr<Pipeline, PipelineDeleter> -NonShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipeline, - bool allowTargetingShards) { +NonShardServerProcessInterface::attachCursorSourceToPipeline( + Pipeline* ownedPipeline, + ShardTargetingPolicy shardTargetingPolicy, + boost::optional<BSONObj> readConcern) { return attachCursorSourceToPipelineForLocalRead(ownedPipeline); } @@ -66,6 +69,38 @@ std::vector<FieldPath> NonShardServerProcessInterface::collectDocumentKeyFieldsA return {"_id"}; // Nothing is sharded. } +boost::optional<Document> NonShardServerProcessInterface::lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) { + MakePipelineOptions opts; + opts.shardTargetingPolicy = ShardTargetingPolicy::kNotAllowed; + opts.readConcern = std::move(readConcern); + + auto lookedUpDocument = + doLookupSingleDocument(expCtx, nss, collectionUUID, documentKey, std::move(opts)); + + // Set the speculative read timestamp appropriately after we do a document lookup locally. We + // set the speculative read timestamp based on the timestamp used by the transaction. + repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo = + repl::SpeculativeMajorityReadInfo::get(expCtx->opCtx); + if (speculativeMajorityReadInfo.isSpeculativeRead()) { + // Speculative majority reads are required to use the 'kNoOverlap' read source. + // Storage engine operations require at least Global IS. + Lock::GlobalLock lk(expCtx->opCtx, MODE_IS); + invariant(expCtx->opCtx->recoveryUnit()->getTimestampReadSource() == + RecoveryUnit::ReadSource::kNoOverlap); + boost::optional<Timestamp> readTs = + expCtx->opCtx->recoveryUnit()->getPointInTimeReadTimestamp(expCtx->opCtx); + invariant(readTs); + speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(*readTs); + } + + return lookedUpDocument; +} + Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& objs, diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h index 144ac86bf1a..e402d549494 100644 --- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h @@ -52,7 +52,9 @@ public: bool includeBuildUUIDs) override; std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - Pipeline* pipeline, bool allowTargetingShards) override; + Pipeline* pipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none) override; BSONObj preparePipelineAndExplain(Pipeline* ownedPipeline, ExplainOptions::Verbosity verbosity); @@ -79,6 +81,13 @@ public: std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter( OperationContext*, const NamespaceString&) const final; + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) final; + Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& objs, diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index eb9df64242b..8eccb087f88 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -122,6 +122,21 @@ ShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(Operati return {{"_id"}, false}; } +boost::optional<Document> ShardServerProcessInterface::lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) { + // We only want to retrieve the one document that corresponds to 'documentKey', so we + // ignore collation when computing which shard to target. + MakePipelineOptions opts; + opts.shardTargetingPolicy = ShardTargetingPolicy::kForceTargetingWithSimpleCollation; + opts.readConcern = std::move(readConcern); + + return doLookupSingleDocument(expCtx, nss, collectionUUID, documentKey, std::move(opts)); +} + Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& ns, std::vector<BSONObj>&& objs, @@ -381,8 +396,10 @@ void ShardServerProcessInterface::dropCollection(OperationContext* opCtx, std::unique_ptr<Pipeline, PipelineDeleter> ShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipeline, - bool allowTargetingShards) { - return sharded_agg_helpers::attachCursorToPipeline(ownedPipeline, allowTargetingShards); + ShardTargetingPolicy shardTargetingPolicy, + boost::optional<BSONObj> readConcern) { + return sharded_agg_helpers::attachCursorToPipeline( + ownedPipeline, shardTargetingPolicy, std::move(readConcern)); } void ShardServerProcessInterface::setExpectedShardVersion( diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index f574ea938ac..b223907c792 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -71,6 +71,13 @@ public: uasserted(50997, "Unexpected attempt to consult catalog cache on a shard server"); } + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) final; + /** * Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking, * routing, stale config handling, etc. @@ -124,7 +131,9 @@ public: * operation. */ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - Pipeline* pipeline, bool allowTargetingShards) final; + Pipeline* pipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none) final; void setExpectedShardVersion(OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp index 0595eae3fbf..a8e15223613 100644 --- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp @@ -49,8 +49,10 @@ StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipelineForLocalRe } std::unique_ptr<Pipeline, PipelineDeleter> -StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipeline, - bool allowTargetingShards) { +StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipeline( + Pipeline* ownedPipeline, + ShardTargetingPolicy shardTargetingPolicy, + boost::optional<BSONObj> readConcern) { return attachCursorSourceToPipelineForLocalRead(ownedPipeline); } @@ -59,8 +61,7 @@ boost::optional<Document> StubLookupSingleDocumentProcessInterface::lookupSingle const NamespaceString& nss, UUID collectionUUID, const Document& documentKey, - boost::optional<BSONObj> readConcern, - bool allowSpeculativeMajorityRead) { + boost::optional<BSONObj> readConcern) { // The namespace 'nss' may be different than the namespace on the ExpressionContext in the // case of a change stream on a whole database so we need to make a copy of the // ExpressionContext with the new namespace. diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h index 7d00c69a0f1..05fbf53349d 100644 --- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h @@ -73,7 +73,10 @@ public: : _mockResults(std::move(mockResults)) {} std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - Pipeline* ownedPipeline, bool allowTargetingShards = true) final; + Pipeline* ownedPipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none) final; + std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead( Pipeline* ownedPipeline) final; @@ -82,8 +85,7 @@ public: const NamespaceString& nss, UUID collectionUUID, const Document& documentKey, - boost::optional<BSONObj> readConcern, - bool allowSpeculativeMajorityRead); + boost::optional<BSONObj> readConcern); std::unique_ptr<ShardFilterer> getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const override { diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h index a5a8a8dc99c..bf500cf70e3 100644 --- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h @@ -145,7 +145,9 @@ public: } std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline( - Pipeline* pipeline, bool allowTargetingShards) override { + Pipeline* pipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none) override { MONGO_UNREACHABLE; } @@ -197,8 +199,7 @@ public: const NamespaceString& nss, UUID collectionUUID, const Document& documentKey, - boost::optional<BSONObj> readConcern, - bool allowSpeculativeMajorityRead) { + boost::optional<BSONObj> readConcern) { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 961c7bc1bcb..165daf1332c 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -130,7 +130,8 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi BSONObj genericTransformForShards(MutableDocument&& cmdForShards, const boost::intrusive_ptr<ExpressionContext>& expCtx, boost::optional<ExplainOptions::Verbosity> explainVerbosity, - BSONObj collationObj) { + BSONObj collationObj, + boost::optional<BSONObj> readConcern) { // Serialize the variables. // Check whether we are in a mixed-version cluster and so must use the old serialization format. // This is only tricky in the case we are sending an aggregate from shard to shard. For this @@ -176,6 +177,10 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, Value(static_cast<long long>(*expCtx->opCtx->getTxnNumber())); } + if (readConcern) { + cmdForShards["readConcern"] = Value(std::move(*readConcern)); + } + return cmdForShards.freeze().toBson(); } @@ -636,7 +641,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( const boost::intrusive_ptr<ExpressionContext>& expCtx, stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommandRequest> targetRequest, - boost::optional<BSONObj> shardCursorsSortSpec) { + boost::optional<BSONObj> shardCursorsSortSpec, + ShardTargetingPolicy shardTargetingPolicy, + boost::optional<BSONObj> readConcern) { auto&& [aggRequest, pipeline] = [&] { return stdx::visit( visit_helper::Overloaded{ @@ -670,7 +677,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( auto shardDispatchResults = dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest), hasChangeStream, - std::move(pipeline)); + std::move(pipeline), + shardTargetingPolicy, + std::move(readConcern)); std::vector<ShardId> targetedShards; targetedShards.reserve(shardDispatchResults.remoteCursors.size()); @@ -827,15 +836,19 @@ BSONObj createPassthroughCommandForShard( Document serializedCommand, boost::optional<ExplainOptions::Verbosity> explainVerbosity, Pipeline* pipeline, - BSONObj collationObj) { + BSONObj collationObj, + boost::optional<BSONObj> readConcern) { // Create the command for the shards. MutableDocument targetedCmd(serializedCommand); if (pipeline) { targetedCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline->serialize()); } - auto shardCommand = - genericTransformForShards(std::move(targetedCmd), expCtx, explainVerbosity, collationObj); + auto shardCommand = genericTransformForShards(std::move(targetedCmd), + expCtx, + explainVerbosity, + std::move(collationObj), + std::move(readConcern)); // Apply filter and RW concern to the final shard command. return CommandHelpers::filterCommandRequestForPassthrough( @@ -849,7 +862,8 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont Document serializedCommand, const SplitPipeline& splitPipeline, const boost::optional<ShardedExchangePolicy> exchangeSpec, - bool needsMerge) { + bool needsMerge, + boost::optional<BSONObj> readConcern) { // Create the command for the shards. MutableDocument targetedCmd(serializedCommand); // If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it @@ -880,8 +894,11 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont targetedCmd[AggregateCommandRequest::kExchangeFieldName] = exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); - auto shardCommand = genericTransformForShards( - std::move(targetedCmd), expCtx, expCtx->explain, expCtx->getCollatorBSON()); + auto shardCommand = genericTransformForShards(std::move(targetedCmd), + expCtx, + expCtx->explain, + expCtx->getCollatorBSON(), + std::move(readConcern)); // Apply RW concern to the final shard command. return applyReadWriteConcern(expCtx->opCtx, @@ -898,7 +915,9 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont DispatchShardPipelineResults dispatchShardPipeline( Document serializedCommand, bool hasChangeStream, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + ShardTargetingPolicy shardTargetingPolicy, + boost::optional<BSONObj> readConcern) { auto expCtx = pipeline->getContext(); // The process is as follows: @@ -930,11 +949,19 @@ DispatchShardPipelineResults dispatchShardPipeline( ? std::move(executionNsRoutingInfoStatus.getValue()) : boost::optional<ChunkManager>{}; + // A $changeStream update lookup attempts to retrieve a single document by documentKey. In this + // case, we wish to target a single shard using the simple collation, but we also want to ensure + // that we use the collection-default collation on the shard so that the lookup can use the _id + // index. We therefore ignore the collation on the expCtx. + const auto& shardTargetingCollation = + shardTargetingPolicy == ShardTargetingPolicy::kForceTargetingWithSimpleCollation + ? CollationSpec::kSimpleSpec + : expCtx->getCollatorBSON(); + // Determine whether we can run the entire aggregation on a single shard. - const auto collationObj = expCtx->getCollatorBSON(); const bool mustRunOnAll = mustRunOnAllShards(expCtx->ns, hasChangeStream); - std::set<ShardId> shardIds = - getTargetedShards(expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, collationObj); + std::set<ShardId> shardIds = getTargetedShards( + expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, shardTargetingCollation); // Don't need to split the pipeline if we are only targeting a single shard, unless: // - There is a stage that needs to be run on the primary shard and the single target shard @@ -962,11 +989,18 @@ DispatchShardPipelineResults dispatchShardPipeline( // Generate the command object for the targeted shards. BSONObj targetedCommand = - (splitPipelines - ? createCommandForTargetedShards( - expCtx, serializedCommand, *splitPipelines, exchangeSpec, true /* needsMerge */) - : createPassthroughCommandForShard( - expCtx, serializedCommand, expCtx->explain, pipeline.get(), collationObj)); + (splitPipelines ? createCommandForTargetedShards(expCtx, + serializedCommand, + *splitPipelines, + exchangeSpec, + true /* needsMerge */, + std::move(readConcern)) + : createPassthroughCommandForShard(expCtx, + serializedCommand, + expCtx->explain, + pipeline.get(), + expCtx->getCollatorBSON(), + std::move(readConcern))); // A $changeStream pipeline must run on all shards, and will also open an extra cursor on the // config server in order to monitor for new shards. To guarantee that we do not miss any @@ -984,7 +1018,7 @@ DispatchShardPipelineResults dispatchShardPipeline( Grid::get(opCtx)->shardRegistry()->reload(opCtx); // Rebuild the set of shards as the shard registry might have changed. shardIds = getTargetedShards( - expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, collationObj); + expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, shardTargetingCollation); } // If there were no shards when we began execution, we wouldn't have run this aggregation in the @@ -1018,7 +1052,7 @@ DispatchShardPipelineResults dispatchShardPipeline( ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, shardQuery, - collationObj); + shardTargetingCollation); } } else { cursors = establishShardCursors(opCtx, @@ -1253,8 +1287,10 @@ bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream) { return nss.isCollectionlessAggregateNS() || hasChangeStream; } -std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* ownedPipeline, - bool allowTargetingShards) { +std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( + Pipeline* ownedPipeline, + ShardTargetingPolicy shardTargetingPolicy, + boost::optional<BSONObj> readConcern) { auto expCtx = ownedPipeline->getContext(); std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline, PipelineDeleter(expCtx->opCtx)); @@ -1274,15 +1310,19 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne return shardVersionRetry( expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() { auto pipelineToTarget = pipeline->clone(); - if (!allowTargetingShards || expCtx->ns.db() == "local" || - expCtx->ns.isReshardingLocalOplogBufferCollection()) { + if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed || + expCtx->ns.db() == "local" || expCtx->ns.isReshardingLocalOplogBufferCollection()) { // If the db is local, this may be a change stream examining the oplog. We know the // oplog, other local collections, and collections that store the local resharding // oplog buffer won't be sharded. return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( pipelineToTarget.release()); } - return targetShardsAndAddMergeCursors(expCtx, std::move(pipelineToTarget)); + return targetShardsAndAddMergeCursors(expCtx, + std::move(pipelineToTarget), + boost::none, + shardTargetingPolicy, + std::move(readConcern)); }); } diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index 1114c1a1d1a..1791411cdb4 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -124,20 +124,24 @@ SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline) DispatchShardPipelineResults dispatchShardPipeline( Document serializedCommand, bool hasChangeStream, - std::unique_ptr<Pipeline, PipelineDeleter> pipeline); + std::unique_ptr<Pipeline, PipelineDeleter> pipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none); BSONObj createPassthroughCommandForShard( const boost::intrusive_ptr<ExpressionContext>& expCtx, Document serializedCommand, boost::optional<ExplainOptions::Verbosity> explainVerbosity, Pipeline* pipeline, - BSONObj collationObj); + BSONObj collationObj, + boost::optional<BSONObj> readConcern); BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionContext>& expCtx, Document serializedCommand, const SplitPipeline& splitPipeline, const boost::optional<ShardedExchangePolicy> exchangeSpec, - bool needsMerge); + bool needsMerge, + boost::optional<BSONObj> readConcern = boost::none); /** * Creates a new DocumentSourceMergeCursors from the provided 'remoteCursors' and adds it to the @@ -189,8 +193,10 @@ Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx); * merging half executing in this process after attaching a $mergeCursors. Will retry on network * errors and also on StaleConfig errors to avoid restarting the entire operation. */ -std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* ownedPipeline, - bool allowTargetingShards); +std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline( + Pipeline* ownedPipeline, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none); /** * For a sharded collection, establishes remote cursors on each shard that may have results, and @@ -206,7 +212,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( const boost::intrusive_ptr<ExpressionContext>& expCtx, stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommandRequest> targetRequest, - boost::optional<BSONObj> shardCursorsSortSpec = boost::none); + boost::optional<BSONObj> shardCursorsSortSpec = boost::none, + ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed, + boost::optional<BSONObj> readConcern = boost::none); /** * For a sharded or unsharded collection, establishes a remote cursor on only the specified shard, diff --git a/src/mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h b/src/mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h new file mode 100644 index 00000000000..5c142344a2a --- /dev/null +++ b/src/mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +namespace mongo { +enum class ShardTargetingPolicy { + kNotAllowed, + kAllowed, + kForceTargetingWithSimpleCollation, +}; +} // namespace mongo |