summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorDrew Paroski <drew.paroski@mongodb.com>2021-06-25 19:06:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-08 05:53:42 +0000
commit50315039df4c36d9cde809eaaf56c439ee767340 (patch)
tree5d0b6dec8597ad9d841ce3d46ca0289e6f2edf26 /src/mongo/db
parentfd4bca02d3ec8ceb03e8ff62d22469e14b093889 (diff)
downloadmongo-50315039df4c36d9cde809eaaf56c439ee767340.tar.gz
SERVER-55309 Re-order post-image lookup stages when expanding $changeStream
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp24
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp19
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup_test.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.cpp13
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_test.cpp4
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp2
-rw-r--r--src/mongo/db/pipeline/pipeline.h4
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp34
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h14
-rw-r--r--src/mongo/db/pipeline/process_interface/mongo_process_interface.h16
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp18
-rw-r--r--src/mongo/db/pipeline/process_interface/mongos_process_interface.h7
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp39
-rw-r--r--src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h11
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp21
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h11
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp9
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h8
-rw-r--r--src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h7
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp90
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.h20
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h38
-rw-r--r--src/mongo/db/s/resharding/resharding_agg_test.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp4
27 files changed, 300 insertions, 142 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 61f38343e83..b26b9951be4 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -239,11 +239,23 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
stages.push_back(DocumentSourceChangeStreamCheckResumability::create(expCtx, spec));
}
- // If the pipeline is built on MongoS, then the stage 'DSCSHandleTopologyChange' acts as the
- // split point for the pipline. All stages before this stages will run on shards and all stages
- // after and inclusive of this stage will run on the MongoS.
+ // If the pipeline is built on MongoS, we check for topology change events here. If a topology
+ // change event is detected, this stage forwards the event directly to the executor via an
+ // exception (bypassing the rest of the pipeline). MongoS must see all topology change events,
+ // so it's important that this stage occurs before any filtering is performed.
if (expCtx->inMongos) {
stages.push_back(DocumentSourceChangeStreamCheckTopologyChange::create(expCtx));
+ }
+
+ // If 'fullDocument' is set to "updateLookup", add the DSCSLookupPostImage stage here.
+ if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) {
+ stages.push_back(DocumentSourceChangeStreamLookupPostImage::create(expCtx));
+ }
+
+ // If the pipeline is built on MongoS, then the DSCSHandleTopologyChange stage acts as the
+ // split point for the pipline. All stages before this stages will run on shards and all
+ // stages after and inclusive of this stage will run on the MongoS.
+ if (expCtx->inMongos) {
stages.push_back(DocumentSourceChangeStreamHandleTopologyChange::create(expCtx));
}
@@ -263,12 +275,6 @@ std::list<boost::intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::_bui
stages.push_back(DocumentSourceChangeStreamLookupPreImage::create(expCtx, spec));
}
- // There should be only one post-image lookup stage. If we're on the shards and producing
- // input to be merged, the lookup is done on the mongos.
- if (spec.getFullDocument() == FullDocumentModeEnum::kUpdateLookup) {
- stages.push_back(DocumentSourceChangeStreamLookupPostImage::create(expCtx));
- }
-
return stages;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp
index 8659f71e325..6ec641a23ed 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.cpp
@@ -123,24 +123,15 @@ Value DocumentSourceChangeStreamLookupPostImage::lookupPostImage(const Document&
auto resumeToken =
ResumeToken::parse(updateOp[DocumentSourceChangeStream::kIdField].getDocument());
- const auto readConcern = pExpCtx->inMongos
- ? boost::optional<BSONObj>(BSON("level"
- << "majority"
- << "afterClusterTime" << resumeToken.getData().clusterTime))
- : boost::none;
-
+ auto readConcern = BSON("level"
+ << "majority"
+ << "afterClusterTime" << resumeToken.getData().clusterTime);
// Update lookup queries sent from mongoS to shards are allowed to use speculative majority
// reads.
- const auto allowSpeculativeMajorityRead = pExpCtx->inMongos;
invariant(resumeToken.getData().uuid);
- auto lookedUpDoc =
- pExpCtx->mongoProcessInterface->lookupSingleDocument(pExpCtx,
- nss,
- *resumeToken.getData().uuid,
- documentKey,
- readConcern,
- allowSpeculativeMajorityRead);
+ auto lookedUpDoc = pExpCtx->mongoProcessInterface->lookupSingleDocument(
+ pExpCtx, nss, *resumeToken.getData().uuid, documentKey, std::move(readConcern));
// Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may
// not have returned any results if the document was deleted in the time since the update op.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h
index 2a7c9666e68..4453e1f6473 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_lookup_post_image.h
@@ -66,12 +66,16 @@ public:
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
invariant(pipeState != Pipeline::SplitState::kSplitForShards);
+
+ // TODO SERVER-55659: remove the feature flag.
+ HostTypeRequirement hostTypeRequirement =
+ feature_flags::gFeatureFlagChangeStreamsOptimization.isEnabledAndIgnoreFCV()
+ ? HostTypeRequirement::kAnyShard
+ : HostTypeRequirement::kLocalOnly;
+
StageConstraints constraints(StreamType::kStreaming,
PositionRequirement::kNone,
- // If this is parsed on mongos it should stay on mongos. If
- // we're not in a sharded cluster then it's okay to run on
- // mongod.
- HostTypeRequirement::kLocalOnly,
+ hostTypeRequirement,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
TransactionRequirement::kNotAllowed,
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 7452d9f7d98..ee6906ce199 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -158,8 +158,7 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
const NamespaceString& nss,
UUID collectionUUID,
const Document& documentKey,
- boost::optional<BSONObj> readConcern,
- bool allowSpeculativeMajorityRead) final {
+ boost::optional<BSONObj> readConcern) final {
Matcher matcher(documentKey.toBson(), expCtx);
auto it = std::find_if(_documentsForLookup.begin(),
_documentsForLookup.end(),
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.cpp b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
index f550c2c68a2..f90fffe3c00 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.cpp
@@ -242,8 +242,10 @@ void DocumentSourceGraphLookUp::doBreadthFirstSearch() {
pipelineOpts.optimize = true;
pipelineOpts.attachCursorSource = true;
// By default, $graphLookup doesn't support a sharded 'from' collection.
- pipelineOpts.allowTargetingShards = feature_flags::gFeatureFlagShardedLookup.isEnabled(
- serverGlobalParams.featureCompatibility);
+ pipelineOpts.shardTargetingPolicy = feature_flags::gFeatureFlagShardedLookup.isEnabled(
+ serverGlobalParams.featureCompatibility)
+ ? ShardTargetingPolicy::kAllowed
+ : ShardTargetingPolicy::kNotAllowed;
_variables.copyToExpCtx(_variablesParseState, _fromExpCtx.get());
auto pipeline = Pipeline::makePipeline(_fromPipeline, _fromExpCtx, pipelineOpts);
while (auto next = pipeline->getNext()) {
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
index d22ee4b60bc..26345aa16ab 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_graph_lookup_test.cpp
@@ -64,7 +64,9 @@ public:
: _results(std::move(results)) {}
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* ownedPipeline, bool allowTargetingShards) final {
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) final {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(
ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx));
pipeline->addInitialSource(
diff --git a/src/mongo/db/pipeline/document_source_lookup.cpp b/src/mongo/db/pipeline/document_source_lookup.cpp
index 8adcb0a1bbc..e333143ae81 100644
--- a/src/mongo/db/pipeline/document_source_lookup.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup.cpp
@@ -427,8 +427,10 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
pipelineOpts.attachCursorSource = true;
pipelineOpts.validator = lookupPipeValidator;
// By default, $lookup doesnt support sharded 'from' collections.
- pipelineOpts.allowTargetingShards = feature_flags::gFeatureFlagShardedLookup.isEnabled(
- serverGlobalParams.featureCompatibility);
+ pipelineOpts.shardTargetingPolicy = feature_flags::gFeatureFlagShardedLookup.isEnabled(
+ serverGlobalParams.featureCompatibility)
+ ? ShardTargetingPolicy::kAllowed
+ : ShardTargetingPolicy::kNotAllowed;
return Pipeline::makePipeline(_resolvedPipeline, _fromExpCtx, pipelineOpts);
}
@@ -457,10 +459,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> DocumentSourceLookUp::buildPipeline(
if (!_cache->isServing()) {
// The cache has either been abandoned or has not yet been built. Attach a cursor.
+ auto shardTargetingPolicy = feature_flags::gFeatureFlagShardedLookup.isEnabledAndIgnoreFCV()
+ ? ShardTargetingPolicy::kAllowed
+ : ShardTargetingPolicy::kNotAllowed;
pipeline = pExpCtx->mongoProcessInterface->attachCursorSourceToPipeline(
- pipeline.release(),
- feature_flags::gFeatureFlagShardedLookup
- .isEnabledAndIgnoreFCV() /* allowTargetingShards*/);
+ pipeline.release(), shardTargetingPolicy);
}
// If the cache has been abandoned, release it.
diff --git a/src/mongo/db/pipeline/document_source_lookup_test.cpp b/src/mongo/db/pipeline/document_source_lookup_test.cpp
index f094518c668..6d5eff9a5d3 100644
--- a/src/mongo/db/pipeline/document_source_lookup_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_test.cpp
@@ -818,7 +818,9 @@ public:
}
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* ownedPipeline, bool allowTargetingShards = true) final {
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) final {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(
ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx));
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 131d30a9161..a8194b1f305 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -652,7 +652,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> Pipeline::makePipeline(
if (opts.attachCursorSource) {
pipeline = expCtx->mongoProcessInterface->attachCursorSourceToPipeline(
- pipeline.release(), opts.allowTargetingShards);
+ pipeline.release(), opts.shardTargetingPolicy, std::move(opts.readConcern));
}
return pipeline;
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index e3ff4714577..3099fe6e2d1 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -40,6 +40,7 @@
#include "mongo/db/matcher/expression_parser.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/dependencies.h"
+#include "mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/executor/task_executor.h"
@@ -69,8 +70,9 @@ using PipelineValidatorCallback = std::function<void(const Pipeline&)>;
struct MakePipelineOptions {
bool optimize = true;
bool attachCursorSource = true;
- bool allowTargetingShards = true;
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed;
PipelineValidatorCallback validator = nullptr;
+ boost::optional<BSONObj> readConcern;
};
/**
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
index 95f4e91e925..9ca6c92a5c5 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.cpp
@@ -55,7 +55,6 @@
#include "mongo/db/query/collection_index_usage_tracker_decoration.h"
#include "mongo/db/query/collection_query_info.h"
#include "mongo/db/repl/primary_only_service.h"
-#include "mongo/db/repl/speculative_majority_read_info.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/transaction_coordinator_curop.h"
@@ -326,19 +325,12 @@ std::vector<GenericCursor> CommonMongodProcessInterface::getIdleCursors(
return CursorManager::get(expCtx->opCtx)->getIdleCursors(expCtx->opCtx, userMode);
}
-boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocument(
+boost::optional<Document> CommonMongodProcessInterface::doLookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
UUID collectionUUID,
const Document& documentKey,
- boost::optional<BSONObj> readConcern,
- bool allowSpeculativeMajorityRead) {
- invariant(!readConcern); // We don't currently support a read concern on mongod - it's only
- // expected to be necessary on mongos.
- invariant(!allowSpeculativeMajorityRead); // We don't expect 'allowSpeculativeMajorityRead' on
- // mongod - it's only expected to be necessary on
- // mongos.
-
+ MakePipelineOptions opts) {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
try {
// Be sure to do the lookup using the collection default collation
@@ -346,11 +338,7 @@ boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocument(
nss,
collectionUUID,
_getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
- // When looking up on a mongoD, we only ever want to read from the local collection. By
- // default, makePipeline will attach a cursor source which may read from remote if the
- // collection is sharded, so we configure it to not allow that here.
- MakePipelineOptions opts;
- opts.allowTargetingShards = false;
+
pipeline = Pipeline::makePipeline({BSON("$match" << documentKey)}, foreignExpCtx, opts);
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
return boost::none;
@@ -364,22 +352,6 @@ boost::optional<Document> CommonMongodProcessInterface::lookupSingleDocument(
<< ", " << next->toString() << "]");
}
- // Set the speculative read timestamp appropriately after we do a document lookup locally. We
- // set the speculative read timestamp based on the timestamp used by the transaction.
- repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
- repl::SpeculativeMajorityReadInfo::get(expCtx->opCtx);
- if (speculativeMajorityReadInfo.isSpeculativeRead()) {
- // Speculative majority reads are required to use the 'kNoOverlap' read source.
- // Storage engine operations require at least Global IS.
- Lock::GlobalLock lk(expCtx->opCtx, MODE_IS);
- invariant(expCtx->opCtx->recoveryUnit()->getTimestampReadSource() ==
- RecoveryUnit::ReadSource::kNoOverlap);
- boost::optional<Timestamp> readTs =
- expCtx->opCtx->recoveryUnit()->getPointInTimeReadTimestamp(expCtx->opCtx);
- invariant(readTs);
- speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(*readTs);
- }
-
return lookedUpDocument;
}
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
index 5321851c7bd..97e56b0b5dd 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
@@ -76,13 +76,6 @@ public:
Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final;
- boost::optional<Document> lookupSingleDocument(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern,
- bool allowSpeculativeMajorityRead = false) final;
std::vector<GenericCursor> getIdleCursors(const boost::intrusive_ptr<ExpressionContext>& expCtx,
CurrentOpUserMode userMode) const final;
BackupCursorState openBackupCursor(OperationContext* opCtx,
@@ -131,6 +124,13 @@ public:
std::unique_ptr<TemporaryRecordStore> rs) const final;
protected:
+ boost::optional<Document> doLookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ MakePipelineOptions opts);
+
/**
* Builds an ordered insert op on namespace 'nss' and documents to be written 'objs'.
*/
diff --git a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
index 6bb56446ff9..1be99ee1826 100644
--- a/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongo_process_interface.h
@@ -46,6 +46,7 @@
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h"
#include "mongo/db/pipeline/storage_stats_spec_gen.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/record_id.h"
@@ -265,11 +266,13 @@ public:
* Changing it to a unique_ptr introduces a circular dependency on certain platforms where the
* compiler expects to find an implementation of PipelineDeleter.
*
- * If `allowTargetingShards` is true, the cursor will only be for local reads regardless of
- * whether or not this function is called in a sharded environment.
+ * If `shardTargetingPolicy` is kNotAllowed, the cursor will only be for local reads regardless
+ * of whether or not this function is called in a sharded environment.
*/
virtual std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* pipeline, bool allowTargetingShards = true) = 0;
+ Pipeline* pipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) = 0;
/**
* Accepts a pipeline and attaches a cursor source to it. Returns a BSONObj of the form
@@ -351,14 +354,17 @@ public:
* as a unique identifier of a document, and may include an _id or all fields from the shard key
* and an _id. Throws if more than one match was found. Returns boost::none if no matching
* documents were found, including cases where the given namespace does not exist.
+ *
+ * If this interface needs to send requests (possibly to other nodes) in order to look up the
+ * document, 'readConcern' will be attached to these requests. Otherwise 'readConcern' will be
+ * ignored.
*/
virtual boost::optional<Document> lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
UUID,
const Document& documentKey,
- boost::optional<BSONObj> readConcern,
- bool allowSpeculativeMajorityRead = false) = 0;
+ boost::optional<BSONObj> readConcern) = 0;
/**
* Returns a vector of all idle (non-pinned) local cursors.
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
index 9461572ccf9..1e42994af7f 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp
@@ -100,9 +100,16 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
} // namespace
std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline(
- Pipeline* ownedPipeline, bool allowTargetingShards) {
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy,
+ boost::optional<BSONObj> readConcern) {
// On mongos we can't have local cursors.
- return sharded_agg_helpers::attachCursorToPipeline(ownedPipeline, allowTargetingShards);
+ tassert(5530900,
+ "shardTargetingPolicy cannot be kNotAllowed on mongos",
+ shardTargetingPolicy != ShardTargetingPolicy::kNotAllowed);
+
+ return sharded_agg_helpers::attachCursorToPipeline(
+ ownedPipeline, shardTargetingPolicy, std::move(readConcern));
}
BSONObj MongosProcessInterface::preparePipelineAndExplain(Pipeline* ownedPipeline,
@@ -122,8 +129,7 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument(
const NamespaceString& nss,
UUID collectionUUID,
const Document& filter,
- boost::optional<BSONObj> readConcern,
- bool allowSpeculativeMajorityRead) {
+ boost::optional<BSONObj> readConcern) {
auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID);
// Create the find command to be dispatched to the shard(s) in order to return the post-image.
@@ -136,12 +142,10 @@ boost::optional<Document> MongosProcessInterface::lookupSingleDocument(
cmdBuilder.append("find", nss.coll());
}
cmdBuilder.append("filter", filterObj);
+ cmdBuilder.append("allowSpeculativeMajorityRead", true);
if (readConcern) {
cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern);
}
- if (allowSpeculativeMajorityRead) {
- cmdBuilder.append("allowSpeculativeMajorityRead", true);
- }
try {
auto findCmd = cmdBuilder.obj();
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
index 413f2a7e02e..83998c935a1 100644
--- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.h
@@ -50,8 +50,7 @@ public:
const NamespaceString& nss,
UUID collectionUUID,
const Document& documentKey,
- boost::optional<BSONObj> readConcern,
- bool allowSpeculativeMajorityRead = false) final;
+ boost::optional<BSONObj> readConcern) final;
std::vector<GenericCursor> getIdleCursors(const boost::intrusive_ptr<ExpressionContext>& expCtx,
CurrentOpUserMode userMode) const final;
@@ -234,7 +233,9 @@ public:
* operation.
*/
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* pipeline, bool allowTargetingShards) final;
+ Pipeline* pipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) final;
std::unique_ptr<TemporaryRecordStore> createTemporaryRecordStore(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const final {
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
index 615ff53aea3..fb1e4b1e976 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.cpp
@@ -40,12 +40,15 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/pipeline/document_source_cursor.h"
+#include "mongo/db/repl/speculative_majority_read_info.h"
namespace mongo {
std::unique_ptr<Pipeline, PipelineDeleter>
-NonShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipeline,
- bool allowTargetingShards) {
+NonShardServerProcessInterface::attachCursorSourceToPipeline(
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy,
+ boost::optional<BSONObj> readConcern) {
return attachCursorSourceToPipelineForLocalRead(ownedPipeline);
}
@@ -66,6 +69,38 @@ std::vector<FieldPath> NonShardServerProcessInterface::collectDocumentKeyFieldsA
return {"_id"}; // Nothing is sharded.
}
+boost::optional<Document> NonShardServerProcessInterface::lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) {
+ MakePipelineOptions opts;
+ opts.shardTargetingPolicy = ShardTargetingPolicy::kNotAllowed;
+ opts.readConcern = std::move(readConcern);
+
+ auto lookedUpDocument =
+ doLookupSingleDocument(expCtx, nss, collectionUUID, documentKey, std::move(opts));
+
+ // Set the speculative read timestamp appropriately after we do a document lookup locally. We
+ // set the speculative read timestamp based on the timestamp used by the transaction.
+ repl::SpeculativeMajorityReadInfo& speculativeMajorityReadInfo =
+ repl::SpeculativeMajorityReadInfo::get(expCtx->opCtx);
+ if (speculativeMajorityReadInfo.isSpeculativeRead()) {
+ // Speculative majority reads are required to use the 'kNoOverlap' read source.
+ // Storage engine operations require at least Global IS.
+ Lock::GlobalLock lk(expCtx->opCtx, MODE_IS);
+ invariant(expCtx->opCtx->recoveryUnit()->getTimestampReadSource() ==
+ RecoveryUnit::ReadSource::kNoOverlap);
+ boost::optional<Timestamp> readTs =
+ expCtx->opCtx->recoveryUnit()->getPointInTimeReadTimestamp(expCtx->opCtx);
+ invariant(readTs);
+ speculativeMajorityReadInfo.setSpeculativeReadTimestampForward(*readTs);
+ }
+
+ return lookedUpDocument;
+}
+
Status NonShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
diff --git a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
index 144ac86bf1a..e402d549494 100644
--- a/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h
@@ -52,7 +52,9 @@ public:
bool includeBuildUUIDs) override;
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* pipeline, bool allowTargetingShards) override;
+ Pipeline* pipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) override;
BSONObj preparePipelineAndExplain(Pipeline* ownedPipeline, ExplainOptions::Verbosity verbosity);
@@ -79,6 +81,13 @@ public:
std::vector<FieldPath> collectDocumentKeyFieldsActingAsRouter(
OperationContext*, const NamespaceString&) const final;
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) final;
+
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index eb9df64242b..8eccb087f88 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -122,6 +122,21 @@ ShardServerProcessInterface::collectDocumentKeyFieldsForHostedCollection(Operati
return {{"_id"}, false};
}
+boost::optional<Document> ShardServerProcessInterface::lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) {
+ // We only want to retrieve the one document that corresponds to 'documentKey', so we
+ // ignore collation when computing which shard to target.
+ MakePipelineOptions opts;
+ opts.shardTargetingPolicy = ShardTargetingPolicy::kForceTargetingWithSimpleCollation;
+ opts.readConcern = std::move(readConcern);
+
+ return doLookupSingleDocument(expCtx, nss, collectionUUID, documentKey, std::move(opts));
+}
+
Status ShardServerProcessInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
@@ -381,8 +396,10 @@ void ShardServerProcessInterface::dropCollection(OperationContext* opCtx,
std::unique_ptr<Pipeline, PipelineDeleter>
ShardServerProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipeline,
- bool allowTargetingShards) {
- return sharded_agg_helpers::attachCursorToPipeline(ownedPipeline, allowTargetingShards);
+ ShardTargetingPolicy shardTargetingPolicy,
+ boost::optional<BSONObj> readConcern) {
+ return sharded_agg_helpers::attachCursorToPipeline(
+ ownedPipeline, shardTargetingPolicy, std::move(readConcern));
}
void ShardServerProcessInterface::setExpectedShardVersion(
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index f574ea938ac..b223907c792 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -71,6 +71,13 @@ public:
uasserted(50997, "Unexpected attempt to consult catalog cache on a shard server");
}
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) final;
+
/**
* Inserts the documents 'objs' into the namespace 'ns' using the ClusterWriter for locking,
* routing, stale config handling, etc.
@@ -124,7 +131,9 @@ public:
* operation.
*/
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* pipeline, bool allowTargetingShards) final;
+ Pipeline* pipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) final;
void setExpectedShardVersion(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp
index 0595eae3fbf..a8e15223613 100644
--- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.cpp
@@ -49,8 +49,10 @@ StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipelineForLocalRe
}
std::unique_ptr<Pipeline, PipelineDeleter>
-StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipeline(Pipeline* ownedPipeline,
- bool allowTargetingShards) {
+StubLookupSingleDocumentProcessInterface::attachCursorSourceToPipeline(
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy,
+ boost::optional<BSONObj> readConcern) {
return attachCursorSourceToPipelineForLocalRead(ownedPipeline);
}
@@ -59,8 +61,7 @@ boost::optional<Document> StubLookupSingleDocumentProcessInterface::lookupSingle
const NamespaceString& nss,
UUID collectionUUID,
const Document& documentKey,
- boost::optional<BSONObj> readConcern,
- bool allowSpeculativeMajorityRead) {
+ boost::optional<BSONObj> readConcern) {
// The namespace 'nss' may be different than the namespace on the ExpressionContext in the
// case of a change stream on a whole database so we need to make a copy of the
// ExpressionContext with the new namespace.
diff --git a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h
index 7d00c69a0f1..05fbf53349d 100644
--- a/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_lookup_single_document_process_interface.h
@@ -73,7 +73,10 @@ public:
: _mockResults(std::move(mockResults)) {}
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* ownedPipeline, bool allowTargetingShards = true) final;
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) final;
+
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead(
Pipeline* ownedPipeline) final;
@@ -82,8 +85,7 @@ public:
const NamespaceString& nss,
UUID collectionUUID,
const Document& documentKey,
- boost::optional<BSONObj> readConcern,
- bool allowSpeculativeMajorityRead);
+ boost::optional<BSONObj> readConcern);
std::unique_ptr<ShardFilterer> getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const override {
diff --git a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
index a5a8a8dc99c..bf500cf70e3 100644
--- a/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/stub_mongo_process_interface.h
@@ -145,7 +145,9 @@ public:
}
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* pipeline, bool allowTargetingShards) override {
+ Pipeline* pipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) override {
MONGO_UNREACHABLE;
}
@@ -197,8 +199,7 @@ public:
const NamespaceString& nss,
UUID collectionUUID,
const Document& documentKey,
- boost::optional<BSONObj> readConcern,
- bool allowSpeculativeMajorityRead) {
+ boost::optional<BSONObj> readConcern) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 961c7bc1bcb..165daf1332c 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -130,7 +130,8 @@ RemoteCursor openChangeStreamNewShardMonitor(const boost::intrusive_ptr<Expressi
BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
boost::optional<ExplainOptions::Verbosity> explainVerbosity,
- BSONObj collationObj) {
+ BSONObj collationObj,
+ boost::optional<BSONObj> readConcern) {
// Serialize the variables.
// Check whether we are in a mixed-version cluster and so must use the old serialization format.
// This is only tricky in the case we are sending an aggregate from shard to shard. For this
@@ -176,6 +177,10 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
Value(static_cast<long long>(*expCtx->opCtx->getTxnNumber()));
}
+ if (readConcern) {
+ cmdForShards["readConcern"] = Value(std::move(*readConcern));
+ }
+
return cmdForShards.freeze().toBson();
}
@@ -636,7 +641,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommandRequest>
targetRequest,
- boost::optional<BSONObj> shardCursorsSortSpec) {
+ boost::optional<BSONObj> shardCursorsSortSpec,
+ ShardTargetingPolicy shardTargetingPolicy,
+ boost::optional<BSONObj> readConcern) {
auto&& [aggRequest, pipeline] = [&] {
return stdx::visit(
visit_helper::Overloaded{
@@ -670,7 +677,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
auto shardDispatchResults =
dispatchShardPipeline(aggregation_request_helper::serializeToCommandDoc(aggRequest),
hasChangeStream,
- std::move(pipeline));
+ std::move(pipeline),
+ shardTargetingPolicy,
+ std::move(readConcern));
std::vector<ShardId> targetedShards;
targetedShards.reserve(shardDispatchResults.remoteCursors.size());
@@ -827,15 +836,19 @@ BSONObj createPassthroughCommandForShard(
Document serializedCommand,
boost::optional<ExplainOptions::Verbosity> explainVerbosity,
Pipeline* pipeline,
- BSONObj collationObj) {
+ BSONObj collationObj,
+ boost::optional<BSONObj> readConcern) {
// Create the command for the shards.
MutableDocument targetedCmd(serializedCommand);
if (pipeline) {
targetedCmd[AggregateCommandRequest::kPipelineFieldName] = Value(pipeline->serialize());
}
- auto shardCommand =
- genericTransformForShards(std::move(targetedCmd), expCtx, explainVerbosity, collationObj);
+ auto shardCommand = genericTransformForShards(std::move(targetedCmd),
+ expCtx,
+ explainVerbosity,
+ std::move(collationObj),
+ std::move(readConcern));
// Apply filter and RW concern to the final shard command.
return CommandHelpers::filterCommandRequestForPassthrough(
@@ -849,7 +862,8 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont
Document serializedCommand,
const SplitPipeline& splitPipeline,
const boost::optional<ShardedExchangePolicy> exchangeSpec,
- bool needsMerge) {
+ bool needsMerge,
+ boost::optional<BSONObj> readConcern) {
// Create the command for the shards.
MutableDocument targetedCmd(serializedCommand);
// If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
@@ -880,8 +894,11 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont
targetedCmd[AggregateCommandRequest::kExchangeFieldName] =
exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
- auto shardCommand = genericTransformForShards(
- std::move(targetedCmd), expCtx, expCtx->explain, expCtx->getCollatorBSON());
+ auto shardCommand = genericTransformForShards(std::move(targetedCmd),
+ expCtx,
+ expCtx->explain,
+ expCtx->getCollatorBSON(),
+ std::move(readConcern));
// Apply RW concern to the final shard command.
return applyReadWriteConcern(expCtx->opCtx,
@@ -898,7 +915,9 @@ BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionCont
DispatchShardPipelineResults dispatchShardPipeline(
Document serializedCommand,
bool hasChangeStream,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ ShardTargetingPolicy shardTargetingPolicy,
+ boost::optional<BSONObj> readConcern) {
auto expCtx = pipeline->getContext();
// The process is as follows:
@@ -930,11 +949,19 @@ DispatchShardPipelineResults dispatchShardPipeline(
? std::move(executionNsRoutingInfoStatus.getValue())
: boost::optional<ChunkManager>{};
+ // A $changeStream update lookup attempts to retrieve a single document by documentKey. In this
+ // case, we wish to target a single shard using the simple collation, but we also want to ensure
+ // that we use the collection-default collation on the shard so that the lookup can use the _id
+ // index. We therefore ignore the collation on the expCtx.
+ const auto& shardTargetingCollation =
+ shardTargetingPolicy == ShardTargetingPolicy::kForceTargetingWithSimpleCollation
+ ? CollationSpec::kSimpleSpec
+ : expCtx->getCollatorBSON();
+
// Determine whether we can run the entire aggregation on a single shard.
- const auto collationObj = expCtx->getCollatorBSON();
const bool mustRunOnAll = mustRunOnAllShards(expCtx->ns, hasChangeStream);
- std::set<ShardId> shardIds =
- getTargetedShards(expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, collationObj);
+ std::set<ShardId> shardIds = getTargetedShards(
+ expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, shardTargetingCollation);
// Don't need to split the pipeline if we are only targeting a single shard, unless:
// - There is a stage that needs to be run on the primary shard and the single target shard
@@ -962,11 +989,18 @@ DispatchShardPipelineResults dispatchShardPipeline(
// Generate the command object for the targeted shards.
BSONObj targetedCommand =
- (splitPipelines
- ? createCommandForTargetedShards(
- expCtx, serializedCommand, *splitPipelines, exchangeSpec, true /* needsMerge */)
- : createPassthroughCommandForShard(
- expCtx, serializedCommand, expCtx->explain, pipeline.get(), collationObj));
+ (splitPipelines ? createCommandForTargetedShards(expCtx,
+ serializedCommand,
+ *splitPipelines,
+ exchangeSpec,
+ true /* needsMerge */,
+ std::move(readConcern))
+ : createPassthroughCommandForShard(expCtx,
+ serializedCommand,
+ expCtx->explain,
+ pipeline.get(),
+ expCtx->getCollatorBSON(),
+ std::move(readConcern)));
// A $changeStream pipeline must run on all shards, and will also open an extra cursor on the
// config server in order to monitor for new shards. To guarantee that we do not miss any
@@ -984,7 +1018,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
Grid::get(opCtx)->shardRegistry()->reload(opCtx);
// Rebuild the set of shards as the shard registry might have changed.
shardIds = getTargetedShards(
- expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, collationObj);
+ expCtx, mustRunOnAll, executionNsRoutingInfo, shardQuery, shardTargetingCollation);
}
// If there were no shards when we began execution, we wouldn't have run this aggregation in the
@@ -1018,7 +1052,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
ReadPreferenceSetting::get(opCtx),
Shard::RetryPolicy::kIdempotent,
shardQuery,
- collationObj);
+ shardTargetingCollation);
}
} else {
cursors = establishShardCursors(opCtx,
@@ -1253,8 +1287,10 @@ bool mustRunOnAllShards(const NamespaceString& nss, bool hasChangeStream) {
return nss.isCollectionlessAggregateNS() || hasChangeStream;
}
-std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* ownedPipeline,
- bool allowTargetingShards) {
+std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy,
+ boost::optional<BSONObj> readConcern) {
auto expCtx = ownedPipeline->getContext();
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(ownedPipeline,
PipelineDeleter(expCtx->opCtx));
@@ -1274,15 +1310,19 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne
return shardVersionRetry(
expCtx->opCtx, catalogCache, expCtx->ns, "targeting pipeline to attach cursors"_sd, [&]() {
auto pipelineToTarget = pipeline->clone();
- if (!allowTargetingShards || expCtx->ns.db() == "local" ||
- expCtx->ns.isReshardingLocalOplogBufferCollection()) {
+ if (shardTargetingPolicy == ShardTargetingPolicy::kNotAllowed ||
+ expCtx->ns.db() == "local" || expCtx->ns.isReshardingLocalOplogBufferCollection()) {
// If the db is local, this may be a change stream examining the oplog. We know the
// oplog, other local collections, and collections that store the local resharding
// oplog buffer won't be sharded.
return expCtx->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
pipelineToTarget.release());
}
- return targetShardsAndAddMergeCursors(expCtx, std::move(pipelineToTarget));
+ return targetShardsAndAddMergeCursors(expCtx,
+ std::move(pipelineToTarget),
+ boost::none,
+ shardTargetingPolicy,
+ std::move(readConcern));
});
}
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index 1114c1a1d1a..1791411cdb4 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -124,20 +124,24 @@ SplitPipeline splitPipeline(std::unique_ptr<Pipeline, PipelineDeleter> pipeline)
DispatchShardPipelineResults dispatchShardPipeline(
Document serializedCommand,
bool hasChangeStream,
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline);
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none);
BSONObj createPassthroughCommandForShard(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
Document serializedCommand,
boost::optional<ExplainOptions::Verbosity> explainVerbosity,
Pipeline* pipeline,
- BSONObj collationObj);
+ BSONObj collationObj,
+ boost::optional<BSONObj> readConcern);
BSONObj createCommandForTargetedShards(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Document serializedCommand,
const SplitPipeline& splitPipeline,
const boost::optional<ShardedExchangePolicy> exchangeSpec,
- bool needsMerge);
+ bool needsMerge,
+ boost::optional<BSONObj> readConcern = boost::none);
/**
* Creates a new DocumentSourceMergeCursors from the provided 'remoteCursors' and adds it to the
@@ -189,8 +193,10 @@ Shard::RetryPolicy getDesiredRetryPolicy(OperationContext* opCtx);
* merging half executing in this process after attaching a $mergeCursors. Will retry on network
* errors and also on StaleConfig errors to avoid restarting the entire operation.
*/
-std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* ownedPipeline,
- bool allowTargetingShards);
+std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none);
/**
* For a sharded collection, establishes remote cursors on each shard that may have results, and
@@ -206,7 +212,9 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommandRequest>
targetRequest,
- boost::optional<BSONObj> shardCursorsSortSpec = boost::none);
+ boost::optional<BSONObj> shardCursorsSortSpec = boost::none,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none);
/**
* For a sharded or unsharded collection, establishes a remote cursor on only the specified shard,
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h b/src/mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h
new file mode 100644
index 00000000000..5c142344a2a
--- /dev/null
+++ b/src/mongo/db/pipeline/sharded_agg_helpers_targeting_policy.h
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+enum class ShardTargetingPolicy {
+ kNotAllowed,
+ kAllowed,
+ kForceTargetingWithSimpleCollation,
+};
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_agg_test.cpp b/src/mongo/db/s/resharding/resharding_agg_test.cpp
index 58fb77a7cb6..adbe1a16ad6 100644
--- a/src/mongo/db/s/resharding/resharding_agg_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_agg_test.cpp
@@ -113,7 +113,9 @@ public:
: _mockResults(std::move(mockResults)) {}
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* ownedPipeline, bool allowTargetingShards = true) final {
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) final {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(
ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx));
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
index f47186f8e91..d6b063c965d 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
@@ -60,7 +60,9 @@ public:
: _mockResults(std::move(mockResults)) {}
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipeline(
- Pipeline* ownedPipeline, bool allowTargetingShards = true) final {
+ Pipeline* ownedPipeline,
+ ShardTargetingPolicy shardTargetingPolicy = ShardTargetingPolicy::kAllowed,
+ boost::optional<BSONObj> readConcern = boost::none) final {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(
ownedPipeline, PipelineDeleter(ownedPipeline->getContext()->opCtx));