summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2018-02-28 18:08:21 -0500
committerDavid Storch <david.storch@10gen.com>2018-03-09 17:20:42 -0500
commited1e2b4d2a4987e3744484f9482fdc7a0e119e94 (patch)
tree8096db9198fb62cd62e2192a38b15faf3d5100a6 /src/mongo
parentb9e20190b647fea262a8f4e154bbf18d9934a3ba (diff)
downloadmongo-ed1e2b4d2a4987e3744484f9482fdc7a0e119e94.tar.gz
SERVER-33541 Add readConcern level 'snapshot' support for aggregation.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/find_cmd.cpp16
-rw-r--r--src/mongo/db/pipeline/document_source.h33
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_auto.h3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_check_resume_token.h2
-rw-r--r--src/mongo/db/pipeline/document_source_coll_stats.h3
-rw-r--r--src/mongo/db/pipeline/document_source_current_op.h3
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.cpp10
-rw-r--r--src/mongo/db/pipeline/document_source_cursor.h3
-rw-r--r--src/mongo/db/pipeline/document_source_facet.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_facet_test.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_geo_near.h3
-rw-r--r--src/mongo/db/pipeline/document_source_graph_lookup.h3
-rw-r--r--src/mongo/db/pipeline/document_source_group.h3
-rw-r--r--src/mongo/db/pipeline/document_source_index_stats.h3
-rw-r--r--src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h3
-rw-r--r--src/mongo/db/pipeline/document_source_internal_split_pipeline.h3
-rw-r--r--src/mongo/db/pipeline/document_source_limit.h3
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_cursors.h3
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.h3
-rw-r--r--src/mongo/db/pipeline/document_source_list_sessions.h3
-rw-r--r--src/mongo/db/pipeline/document_source_lookup.h3
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h1
-rw-r--r--src/mongo/db/pipeline/document_source_match.h1
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.h5
-rw-r--r--src/mongo/db/pipeline/document_source_mock.h3
-rw-r--r--src/mongo/db/pipeline/document_source_out.h3
-rw-r--r--src/mongo/db/pipeline/document_source_redact.h1
-rw-r--r--src/mongo/db/pipeline/document_source_sample.h3
-rw-r--r--src/mongo/db/pipeline/document_source_sample_from_random_cursor.h3
-rw-r--r--src/mongo/db/pipeline/document_source_sequential_document_cache.h3
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h3
-rw-r--r--src/mongo/db/pipeline/document_source_skip.h3
-rw-r--r--src/mongo/db/pipeline/document_source_sort.h1
-rw-r--r--src/mongo/db/pipeline/document_source_tee_consumer.h3
-rw-r--r--src/mongo/db/pipeline/document_source_unwind.h3
-rw-r--r--src/mongo/db/pipeline/pipeline.cpp39
-rw-r--r--src/mongo/db/pipeline/pipeline.h24
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp3
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp86
-rw-r--r--src/mongo/db/query/find.cpp3
-rw-r--r--src/mongo/db/query/get_executor.cpp33
-rw-r--r--src/mongo/db/query/get_executor.h10
-rw-r--r--src/mongo/db/service_entry_point_common.cpp3
-rw-r--r--src/mongo/s/query/document_source_router_adapter.h3
45 files changed, 265 insertions, 92 deletions
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 7cd41a2a6a8..9f4e12cd062 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -197,13 +197,7 @@ public:
Collection* const collection = ctx->getCollection();
// We have a parsed query. Time to get the execution plan for it.
- auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- auto yieldPolicy =
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
- ? PlanExecutor::INTERRUPT_ONLY
- : PlanExecutor::YIELD_AUTO;
- auto statusWithPlanExecutor =
- getExecutorFind(opCtx, collection, nss, std::move(cq), yieldPolicy);
+ auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq));
if (!statusWithPlanExecutor.isOK()) {
return statusWithPlanExecutor.getStatus();
}
@@ -315,13 +309,7 @@ public:
Collection* const collection = ctx->getCollection();
// Get the execution plan for the query.
- auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- auto yieldPolicy =
- readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
- ? PlanExecutor::INTERRUPT_ONLY
- : PlanExecutor::YIELD_AUTO;
- auto statusWithPlanExecutor =
- getExecutorFind(opCtx, collection, nss, std::move(cq), yieldPolicy);
+ auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq));
if (!statusWithPlanExecutor.isOK()) {
return CommandHelpers::appendCommandStatus(result, statusWithPlanExecutor.getStatus());
}
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 17622862af1..f1ee7d5717c 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -187,18 +187,26 @@ public:
*/
enum class FacetRequirement { kAllowed, kNotAllowed };
+ /**
+ * Indicates whether or not this stage is legal when the read concern for the aggregate has
+ * readConcern level "snapshot" or is running inside of a multi-document transaction.
+ */
+ enum class TransactionRequirement { kNotAllowed, kAllowed };
+
StageConstraints(
StreamType streamType,
PositionRequirement requiredPosition,
HostTypeRequirement hostRequirement,
DiskUseRequirement diskRequirement,
FacetRequirement facetRequirement,
+ TransactionRequirement transactionRequirement,
ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist)
: requiredPosition(requiredPosition),
hostRequirement(hostRequirement),
diskRequirement(diskRequirement),
changeStreamRequirement(changeStreamRequirement),
facetRequirement(facetRequirement),
+ transactionRequirement(transactionRequirement),
streamType(streamType) {
// Stages which are allowed to run in $facet must not have any position requirements.
invariant(
@@ -219,6 +227,18 @@ public:
// A stage which is whitelisted for $changeStream cannot have a position requirement.
invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
requiredPosition != PositionRequirement::kNone));
+
+ // Change stream stages should not be permitted with readConcern level "snapshot" or
+ // inside of a multi-document transaction.
+ if (isChangeStreamStage()) {
+ invariant(!isAllowedInTransaction());
+ }
+
+ // Stages which write data to user collections should not be permitted with readConcern
+ // level "snapshot" or inside of a multi-document transaction.
+ if (diskRequirement == DiskUseRequirement::kWritesPersistentData) {
+ invariant(!isAllowedInTransaction());
+ }
}
/**
@@ -262,6 +282,14 @@ public:
return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage;
}
+ /**
+ * Returns true if this stage is legal when the readConcern level is "snapshot" or when this
+ * aggregation is being run within a multi-document transaction.
+ */
+ bool isAllowedInTransaction() const {
+ return transactionRequirement == TransactionRequirement::kAllowed;
+ }
+
// Indicates whether this stage needs to be at a particular position in the pipeline.
const PositionRequirement requiredPosition;
@@ -280,6 +308,10 @@ public:
// Indicates whether this stage may run inside a $facet stage.
const FacetRequirement facetRequirement;
+ // Indicates whether this stage is legal when the readConcern level is "snapshot" or the
+ // aggregate is running inside of a multi-document transaction.
+ const TransactionRequirement transactionRequirement;
+
// Indicates whether this is a streaming or blocking stage.
const StreamType streamType;
@@ -309,6 +341,7 @@ public:
using DiskUseRequirement = StageConstraints::DiskUseRequirement;
using FacetRequirement = StageConstraints::FacetRequirement;
using StreamType = StageConstraints::StreamType;
+ using TransactionRequirement = StageConstraints::TransactionRequirement;
/**
* This is what is returned from the main DocumentSource API: getNext(). It is essentially a
diff --git a/src/mongo/db/pipeline/document_source_bucket_auto.h b/src/mongo/db/pipeline/document_source_bucket_auto.h
index 77fd0173d9c..a5b041d741e 100644
--- a/src/mongo/db/pipeline/document_source_bucket_auto.h
+++ b/src/mongo/db/pipeline/document_source_bucket_auto.h
@@ -53,7 +53,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kWritesTmpData,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
/**
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 6f4ac3ffc57..f4cee26c9c2 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -106,6 +106,7 @@ DocumentSource::StageConstraints DocumentSourceOplogMatch::constraints(
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
ChangeStreamRequirement::kChangeStreamStage};
}
@@ -157,6 +158,7 @@ public:
: HostTypeRequirement::kMongoS),
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
ChangeStreamRequirement::kChangeStreamStage};
}
diff --git a/src/mongo/db/pipeline/document_source_check_resume_token.h b/src/mongo/db/pipeline/document_source_check_resume_token.h
index a0d5b643d96..3401f9e7ba1 100644
--- a/src/mongo/db/pipeline/document_source_check_resume_token.h
+++ b/src/mongo/db/pipeline/document_source_check_resume_token.h
@@ -66,6 +66,7 @@ public:
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
ChangeStreamRequirement::kChangeStreamStage};
}
@@ -104,6 +105,7 @@ public:
: HostTypeRequirement::kMongoS),
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
ChangeStreamRequirement::kChangeStreamStage};
}
diff --git a/src/mongo/db/pipeline/document_source_coll_stats.h b/src/mongo/db/pipeline/document_source_coll_stats.h
index 5b7bf719a14..17cd974cb60 100644
--- a/src/mongo/db/pipeline/document_source_coll_stats.h
+++ b/src/mongo/db/pipeline/document_source_coll_stats.h
@@ -79,7 +79,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed);
constraints.requiresInputDocSource = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_current_op.h b/src/mongo/db/pipeline/document_source_current_op.h
index 14e95983f70..cb180a99085 100644
--- a/src/mongo/db/pipeline/document_source_current_op.h
+++ b/src/mongo/db/pipeline/document_source_current_op.h
@@ -83,7 +83,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed);
constraints.isIndependentOfAnyCollection = true;
constraints.requiresInputDocSource = false;
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index b090f195bbe..10d049cef28 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -210,8 +210,9 @@ Value DocumentSourceCursor::serialize(boost::optional<ExplainOptions::Verbosity>
{
auto opCtx = pExpCtx->opCtx;
- AutoGetDb dbLock(opCtx, _exec->nss().db(), MODE_IS);
- Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), MODE_IS);
+ auto lockMode = getLockModeForQuery(opCtx);
+ AutoGetDb dbLock(opCtx, _exec->nss().db(), lockMode);
+ Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), lockMode);
auto collection =
dbLock.getDb() ? dbLock.getDb()->getCollection(opCtx, _exec->nss()) : nullptr;
@@ -266,8 +267,9 @@ void DocumentSourceCursor::cleanupExecutor() {
// already have been marked as killed when the collection was dropped, and we won't need to
// access the CursorManager to properly dispose of it.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
- AutoGetDb dbLock(opCtx, _exec->nss().db(), MODE_IS);
- Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), MODE_IS);
+ auto lockMode = getLockModeForQuery(opCtx);
+ AutoGetDb dbLock(opCtx, _exec->nss().db(), lockMode);
+ Lock::CollectionLock collLock(opCtx->lockState(), _exec->nss().ns(), lockMode);
auto collection = dbLock.getDb() ? dbLock.getDb()->getCollection(opCtx, _exec->nss()) : nullptr;
auto cursorManager = collection ? collection->getCursorManager() : nullptr;
_exec->dispose(opCtx, cursorManager);
diff --git a/src/mongo/db/pipeline/document_source_cursor.h b/src/mongo/db/pipeline/document_source_cursor.h
index 6f4e4a1fc01..fc9b37f9403 100644
--- a/src/mongo/db/pipeline/document_source_cursor.h
+++ b/src/mongo/db/pipeline/document_source_cursor.h
@@ -58,7 +58,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed);
constraints.requiresInputDocSource = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_facet.cpp b/src/mongo/db/pipeline/document_source_facet.cpp
index 050ffa9f331..bc49ab6371f 100644
--- a/src/mongo/db/pipeline/document_source_facet.cpp
+++ b/src/mongo/db/pipeline/document_source_facet.cpp
@@ -263,7 +263,8 @@ DocumentSource::StageConstraints DocumentSourceFacet::constraints(
PositionRequirement::kNone,
host,
mayUseDisk ? DiskUseRequirement::kWritesTmpData : DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed};
}
DocumentSource::GetDepsReturn DocumentSourceFacet::getDependencies(DepsTracker* deps) const {
diff --git a/src/mongo/db/pipeline/document_source_facet_test.cpp b/src/mongo/db/pipeline/document_source_facet_test.cpp
index c706a5ca31e..25f8cdc7a07 100644
--- a/src/mongo/db/pipeline/document_source_facet_test.cpp
+++ b/src/mongo/db/pipeline/document_source_facet_test.cpp
@@ -216,7 +216,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
DocumentSource::GetNextResult getNext() final {
@@ -671,7 +672,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kPrimaryShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
static boost::intrusive_ptr<DocumentSourceNeedsPrimaryShard> create() {
diff --git a/src/mongo/db/pipeline/document_source_geo_near.h b/src/mongo/db/pipeline/document_source_geo_near.h
index 376e4a03ea0..f5a7f0cf67d 100644
--- a/src/mongo/db/pipeline/document_source_geo_near.h
+++ b/src/mongo/db/pipeline/document_source_geo_near.h
@@ -54,7 +54,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed);
constraints.requiresInputDocSource = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_graph_lookup.h b/src/mongo/db/pipeline/document_source_graph_lookup.h
index 9b5882e3f52..527c71a21c2 100644
--- a/src/mongo/db/pipeline/document_source_graph_lookup.h
+++ b/src/mongo/db/pipeline/document_source_graph_lookup.h
@@ -58,7 +58,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kPrimaryShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed);
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed);
constraints.canSwapWithMatch = true;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_group.h b/src/mongo/db/pipeline/document_source_group.h
index 0b7120c3fc0..af262fd739e 100644
--- a/src/mongo/db/pipeline/document_source_group.h
+++ b/src/mongo/db/pipeline/document_source_group.h
@@ -74,7 +74,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kWritesTmpData,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
/**
diff --git a/src/mongo/db/pipeline/document_source_index_stats.h b/src/mongo/db/pipeline/document_source_index_stats.h
index b485db6d748..68ebc5ecfd1 100644
--- a/src/mongo/db/pipeline/document_source_index_stats.h
+++ b/src/mongo/db/pipeline/document_source_index_stats.h
@@ -74,7 +74,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed);
constraints.requiresInputDocSource = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
index 806779641af..9fbf57fe8cc 100644
--- a/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
+++ b/src/mongo/db/pipeline/document_source_internal_inhibit_optimization.h
@@ -57,7 +57,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
GetNextResult getNext() final;
diff --git a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
index d2f0e89754a..bf93c58c100 100644
--- a/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
+++ b/src/mongo/db/pipeline/document_source_internal_split_pipeline.h
@@ -71,7 +71,8 @@ public:
PositionRequirement::kNone,
_mergeType,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
GetNextResult getNext() final;
diff --git a/src/mongo/db/pipeline/document_source_limit.h b/src/mongo/db/pipeline/document_source_limit.h
index f58321289d8..3c8c21c2511 100644
--- a/src/mongo/db/pipeline/document_source_limit.h
+++ b/src/mongo/db/pipeline/document_source_limit.h
@@ -53,7 +53,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
GetNextResult getNext() final;
diff --git a/src/mongo/db/pipeline/document_source_list_local_cursors.h b/src/mongo/db/pipeline/document_source_list_local_cursors.h
index 4746c4933e7..7bd9e3ffed6 100644
--- a/src/mongo/db/pipeline/document_source_list_local_cursors.h
+++ b/src/mongo/db/pipeline/document_source_list_local_cursors.h
@@ -87,7 +87,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kLocalOnly,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed);
constraints.isIndependentOfAnyCollection = true;
constraints.requiresInputDocSource = false;
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
index 9674feb6e66..e6e87e0c75c 100644
--- a/src/mongo/db/pipeline/document_source_list_local_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -98,7 +98,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kLocalOnly,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed);
constraints.isIndependentOfAnyCollection = true;
constraints.requiresInputDocSource = false;
diff --git a/src/mongo/db/pipeline/document_source_list_sessions.h b/src/mongo/db/pipeline/document_source_list_sessions.h
index 57ab2e2eb5d..32db9a6157c 100644
--- a/src/mongo/db/pipeline/document_source_list_sessions.h
+++ b/src/mongo/db/pipeline/document_source_list_sessions.h
@@ -90,7 +90,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed};
}
static boost::intrusive_ptr<DocumentSource> createFromBson(
diff --git a/src/mongo/db/pipeline/document_source_lookup.h b/src/mongo/db/pipeline/document_source_lookup.h
index 530c62f985c..c9f8872f580 100644
--- a/src/mongo/db/pipeline/document_source_lookup.h
+++ b/src/mongo/db/pipeline/document_source_lookup.h
@@ -110,7 +110,8 @@ public:
HostTypeRequirement::kPrimaryShard,
mayUseDisk ? DiskUseRequirement::kWritesTmpData
: DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed);
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed);
constraints.canSwapWithMatch = true;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index 416088fb742..8f377b46986 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -70,6 +70,7 @@ public:
: HostTypeRequirement::kMongoS,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed,
ChangeStreamRequirement::kChangeStreamStage);
constraints.canSwapWithMatch = true;
diff --git a/src/mongo/db/pipeline/document_source_match.h b/src/mongo/db/pipeline/document_source_match.h
index 4614ee5d50b..4deb71b7118 100644
--- a/src/mongo/db/pipeline/document_source_match.h
+++ b/src/mongo/db/pipeline/document_source_match.h
@@ -56,6 +56,7 @@ public:
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed,
ChangeStreamRequirement::kWhitelist};
}
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.h b/src/mongo/db/pipeline/document_source_merge_cursors.h
index e23772f411b..cb74f4bd9ef 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.h
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.h
@@ -86,7 +86,10 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ // TODO SERVER-33683: Permit $mergeCursors with readConcern
+ // level "snapshot".
+ TransactionRequirement::kNotAllowed);
constraints.requiresInputDocSource = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_mock.h b/src/mongo/db/pipeline/document_source_mock.h
index 70f8bbc2d8e..cc81d533fb2 100644
--- a/src/mongo/db/pipeline/document_source_mock.h
+++ b/src/mongo/db/pipeline/document_source_mock.h
@@ -54,7 +54,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed);
constraints.requiresInputDocSource = false;
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 26e49e9f54c..ad6659f42b0 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -49,7 +49,8 @@ public:
PositionRequirement::kLast,
HostTypeRequirement::kPrimaryShard,
DiskUseRequirement::kWritesPersistentData,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kNotAllowed};
}
// Virtuals for SplittableDocumentSource
diff --git a/src/mongo/db/pipeline/document_source_redact.h b/src/mongo/db/pipeline/document_source_redact.h
index c6c83794aec..af21e236d17 100644
--- a/src/mongo/db/pipeline/document_source_redact.h
+++ b/src/mongo/db/pipeline/document_source_redact.h
@@ -46,6 +46,7 @@ public:
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed,
ChangeStreamRequirement::kWhitelist};
}
diff --git a/src/mongo/db/pipeline/document_source_sample.h b/src/mongo/db/pipeline/document_source_sample.h
index deb3e3b033e..1ac496305ee 100644
--- a/src/mongo/db/pipeline/document_source_sample.h
+++ b/src/mongo/db/pipeline/document_source_sample.h
@@ -48,7 +48,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kWritesTmpData,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
GetDepsReturn getDependencies(DepsTracker* deps) const final {
diff --git a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
index 9dc724dbcfb..bf7f75252ed 100644
--- a/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
+++ b/src/mongo/db/pipeline/document_source_sample_from_random_cursor.h
@@ -49,7 +49,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kAnyShard,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed};
}
static boost::intrusive_ptr<DocumentSourceSampleFromRandomCursor> create(
diff --git a/src/mongo/db/pipeline/document_source_sequential_document_cache.h b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
index 48a98a5b0c3..102731d4a32 100644
--- a/src/mongo/db/pipeline/document_source_sequential_document_cache.h
+++ b/src/mongo/db/pipeline/document_source_sequential_document_cache.h
@@ -55,7 +55,8 @@ public:
: PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed);
constraints.requiresInputDocSource = (_cache->isBuilding());
return constraints;
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 22689123099..bffa484c46b 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -114,6 +114,9 @@ public:
? FacetRequirement::kNotAllowed
: FacetRequirement::kAllowed),
(getType() == TransformerInterface::TransformerType::kChangeStreamTransformation
+ ? TransactionRequirement::kNotAllowed
+ : TransactionRequirement::kAllowed),
+ (getType() == TransformerInterface::TransformerType::kChangeStreamTransformation
? ChangeStreamRequirement::kChangeStreamStage
: ChangeStreamRequirement::kWhitelist));
diff --git a/src/mongo/db/pipeline/document_source_skip.h b/src/mongo/db/pipeline/document_source_skip.h
index cc3bca48d1b..760e887f8b4 100644
--- a/src/mongo/db/pipeline/document_source_skip.h
+++ b/src/mongo/db/pipeline/document_source_skip.h
@@ -55,7 +55,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
GetNextResult getNext() final;
diff --git a/src/mongo/db/pipeline/document_source_sort.h b/src/mongo/db/pipeline/document_source_sort.h
index a9b2ed14c2f..d2ad1cd9ec4 100644
--- a/src/mongo/db/pipeline/document_source_sort.h
+++ b/src/mongo/db/pipeline/document_source_sort.h
@@ -69,6 +69,7 @@ public:
HostTypeRequirement::kNone,
_mergingPresorted ? DiskUseRequirement::kNoDiskUse : DiskUseRequirement::kWritesTmpData,
_mergingPresorted ? FacetRequirement::kNotAllowed : FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed,
_mergingPresorted ? ChangeStreamRequirement::kWhitelist
: ChangeStreamRequirement::kBlacklist);
diff --git a/src/mongo/db/pipeline/document_source_tee_consumer.h b/src/mongo/db/pipeline/document_source_tee_consumer.h
index 397c5d28128..75b33ea8415 100644
--- a/src/mongo/db/pipeline/document_source_tee_consumer.h
+++ b/src/mongo/db/pipeline/document_source_tee_consumer.h
@@ -58,7 +58,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
GetNextResult getNext() final;
diff --git a/src/mongo/db/pipeline/document_source_unwind.h b/src/mongo/db/pipeline/document_source_unwind.h
index a2c68f9c8e0..763d0523642 100644
--- a/src/mongo/db/pipeline/document_source_unwind.h
+++ b/src/mongo/db/pipeline/document_source_unwind.h
@@ -51,7 +51,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed);
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed);
constraints.canSwapWithMatch = true;
return constraints;
diff --git a/src/mongo/db/pipeline/pipeline.cpp b/src/mongo/db/pipeline/pipeline.cpp
index 0eb286bca28..fafcd0d910f 100644
--- a/src/mongo/db/pipeline/pipeline.cpp
+++ b/src/mongo/db/pipeline/pipeline.cpp
@@ -120,11 +120,7 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createTopLevelO
std::unique_ptr<Pipeline, PipelineDeleter> pipeline(new Pipeline(std::move(stages), expCtx),
PipelineDeleter(expCtx->opCtx));
try {
- if (isFacetPipeline) {
- pipeline->validateFacetPipeline();
- } else {
- pipeline->validatePipeline();
- }
+ pipeline->validate(isFacetPipeline);
} catch (const DBException& ex) {
return ex.toStatus();
}
@@ -133,7 +129,17 @@ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> Pipeline::createTopLevelO
return std::move(pipeline);
}
-void Pipeline::validatePipeline() const {
+void Pipeline::validate(bool isFacetPipeline) const {
+ if (isFacetPipeline) {
+ validateFacetPipeline();
+ } else {
+ validateTopLevelPipeline();
+ }
+
+ validateCommon();
+}
+
+void Pipeline::validateTopLevelPipeline() const {
// Verify that the specified namespace is valid for the initial stage of this pipeline.
const NamespaceString& nss = pCtx->ns;
@@ -173,9 +179,6 @@ void Pipeline::validatePipeline() const {
}
}
}
-
- // Verify that each stage is in a legal position within the pipeline.
- ensureAllStagesAreInLegalPositions();
}
void Pipeline::validateFacetPipeline() const {
@@ -194,14 +197,12 @@ void Pipeline::validateFacetPipeline() const {
invariant(stageConstraints.requiredPosition == PositionRequirement::kNone);
invariant(!stageConstraints.isIndependentOfAnyCollection);
}
-
- // Facet pipelines cannot have any stages which are initial sources. We've already validated the
- // first stage, and the 'ensureAllStagesAreInLegalPositions' method checks that there are no
- // initial sources in positions 1...N, so we can just return its result directly.
- ensureAllStagesAreInLegalPositions();
}
-void Pipeline::ensureAllStagesAreInLegalPositions() const {
+void Pipeline::validateCommon() const {
+ // TODO SERVER-33551: Don't use presence of WUOW to decide whether we are in a snapshot read or
+ // multi-doc transaction.
+ const bool isSnapshotReadOrTxn = static_cast<bool>(pCtx->opCtx->getWriteUnitOfWork());
size_t i = 0;
for (auto&& stage : _sources) {
auto constraints = stage->constraints(_splitState);
@@ -229,6 +230,14 @@ void Pipeline::ensureAllStagesAreInLegalPositions() const {
uassert(40644,
str::stream() << stage->getSourceName() << " can only be run on mongoS",
!(constraints.hostRequirement == HostTypeRequirement::kMongoS && !pCtx->inMongos));
+
+ if (isSnapshotReadOrTxn) {
+ uassert(50742,
+ str::stream() << "Stage not supported with readConcern level \"snapshot\" "
+ "or inside of a multi-document transaction: "
+ << stage->getSourceName(),
+ constraints.isAllowedInTransaction());
+ }
}
}
diff --git a/src/mongo/db/pipeline/pipeline.h b/src/mongo/db/pipeline/pipeline.h
index 6aba6d38d66..bcb7ee64521 100644
--- a/src/mongo/db/pipeline/pipeline.h
+++ b/src/mongo/db/pipeline/pipeline.h
@@ -338,21 +338,29 @@ private:
* is present then it must come last in the pipeline, while initial stages such as $indexStats
* must be at the start.
*/
- void validatePipeline() const;
+ void validate(bool isFacetPipeline) const;
/**
- * Throws if the $facet pipeline fails any of a set of semantic checks. For example, the
- * pipeline cannot be empty and may not contain any initial stages.
+ * Performs validation checking specific to top-level pipelines. Throws if the pipeline is
+ * invalid.
+ */
+ void validateTopLevelPipeline() const;
+
+ /**
+ * Performs validation checking specific to nested $facet pipelines. Throws if the pipeline is
+ * invalid.
*/
void validateFacetPipeline() const;
/**
- * Helper method which validates that each stage in pipeline is in a legal position. For
- * example, $out must be at the end, while a $match stage with a text query must be at the
- * start. Note that this method accepts an initial source as the first stage, which is illegal
- * for $facet pipelines.
+ * Performs common validation for top-level or facet pipelines. Throws if the pipeline is
+ * invalid.
+ *
+ * Includes checking for illegal stage positioning. For example, $out must be at the end, while
+ * a $match stage with a text query must be at the start. Note that this method accepts an
+ * initial source as the first stage, which is illegal for $facet pipelines.
*/
- void ensureAllStagesAreInLegalPositions() const;
+ void validateCommon() const;
/**
* Returns Status::OK if the pipeline can run on mongoS, or an error with a message explaining
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index b0718930f2c..ae130fa7595 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -214,8 +214,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
return {cq.getStatus()};
}
- return getExecutorFind(
- opCtx, collection, nss, std::move(cq.getValue()), PlanExecutor::YIELD_AUTO, plannerOpts);
+ return getExecutorFind(opCtx, collection, nss, std::move(cq.getValue()), plannerOpts);
}
BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) {
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 305e0f20d0f..19df0485811 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -2181,7 +2181,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kMongoS,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed};
}
static boost::intrusive_ptr<DocumentSourceMustRunOnMongoS> create() {
@@ -2337,9 +2338,10 @@ TEST(PipelineInitialSource, MatchInitialQuery) {
ASSERT_BSONOBJ_EQ(pipe->getInitialQuery(), BSON("a" << 4));
}
-namespace Namespaces {
+// Contains test cases for validation done on pipeline creation.
+namespace pipeline_validate {
-using PipelineInitialSourceNSTest = AggregationContextFixture;
+using PipelineValidateTest = AggregationContextFixture;
class DocumentSourceCollectionlessMock : public DocumentSourceMock {
public:
@@ -2350,7 +2352,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed);
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed);
constraints.isIndependentOfAnyCollection = true;
constraints.requiresInputDocSource = false;
return constraints;
@@ -2361,7 +2364,7 @@ public:
}
};
-TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidForEmptyPipeline) {
+TEST_F(PipelineValidateTest, AggregateOneNSNotValidForEmptyPipeline) {
const std::vector<BSONObj> rawPipeline = {};
auto ctx = getExpCtx();
@@ -2370,7 +2373,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidForEmptyPipeline) {
ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
}
-TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidIfInitialStageRequiresCollection) {
+TEST_F(PipelineValidateTest, AggregateOneNSNotValidIfInitialStageRequiresCollection) {
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {}}")};
auto ctx = getExpCtx();
@@ -2379,7 +2382,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSNotValidIfInitialStageRequires
ASSERT_NOT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
}
-TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidIfInitialStageIsCollectionless) {
+TEST_F(PipelineValidateTest, AggregateOneNSValidIfInitialStageIsCollectionless) {
auto collectionlessSource = DocumentSourceCollectionlessMock::create();
auto ctx = getExpCtx();
@@ -2388,7 +2391,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidIfInitialStageIsCollectio
ASSERT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus());
}
-TEST_F(PipelineInitialSourceNSTest, CollectionNSNotValidIfInitialStageIsCollectionless) {
+TEST_F(PipelineValidateTest, CollectionNSNotValidIfInitialStageIsCollectionless) {
auto collectionlessSource = DocumentSourceCollectionlessMock::create();
auto ctx = getExpCtx();
@@ -2397,7 +2400,7 @@ TEST_F(PipelineInitialSourceNSTest, CollectionNSNotValidIfInitialStageIsCollecti
ASSERT_NOT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus());
}
-TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardlessOfInitialStage) {
+TEST_F(PipelineValidateTest, AggregateOneNSValidForFacetPipelineRegardlessOfInitialStage) {
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {}}")};
auto ctx = getExpCtx();
@@ -2406,7 +2409,7 @@ TEST_F(PipelineInitialSourceNSTest, AggregateOneNSValidForFacetPipelineRegardles
ASSERT_OK(Pipeline::parseFacetPipeline(rawPipeline, ctx).getStatus());
}
-TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsValidAsFirstStage) {
+TEST_F(PipelineValidateTest, ChangeStreamIsValidAsFirstStage) {
const std::vector<BSONObj> rawPipeline = {fromjson("{$changeStream: {}}")};
auto ctx = getExpCtx();
setMockReplicationCoordinatorOnOpCtx(ctx->opCtx);
@@ -2414,7 +2417,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsValidAsFirstStage) {
ASSERT_OK(Pipeline::parse(rawPipeline, ctx).getStatus());
}
-TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStage) {
+TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStage) {
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
fromjson("{$changeStream: {}}")};
auto ctx = getExpCtx();
@@ -2424,7 +2427,7 @@ TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStage) {
ASSERT_EQ(parseStatus, ErrorCodes::duplicateCodeForTest(40602));
}
-TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStageInFacet) {
+TEST_F(PipelineValidateTest, ChangeStreamIsNotValidIfNotFirstStageInFacet) {
const std::vector<BSONObj> rawPipeline = {fromjson("{$match: {custom: 'filter'}}"),
fromjson("{$changeStream: {}}")};
auto ctx = getExpCtx();
@@ -2435,7 +2438,61 @@ TEST_F(PipelineInitialSourceNSTest, ChangeStreamIsNotValidIfNotFirstStageInFacet
ASSERT(std::string::npos != parseStatus.reason().find("$changeStream"));
}
-} // namespace Namespaces
+class DocumentSourceDisallowedWithSnapshotReads : public DocumentSourceMock {
+public:
+ DocumentSourceDisallowedWithSnapshotReads() : DocumentSourceMock({}) {}
+
+ StageConstraints constraints(Pipeline::SplitState pipeState) const final {
+ return StageConstraints{StreamType::kStreaming,
+ PositionRequirement::kNone,
+ HostTypeRequirement::kNone,
+ DiskUseRequirement::kNoDiskUse,
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kNotAllowed};
+ }
+
+ static boost::intrusive_ptr<DocumentSourceDisallowedWithSnapshotReads> create() {
+ return new DocumentSourceDisallowedWithSnapshotReads();
+ }
+};
+
+TEST_F(PipelineValidateTest, TopLevelPipelineValidatedForStagesIllegalWithSnapshotReads) {
+ BSONObj readConcernSnapshot = BSON("readConcern" << BSON("level"
+ << "snapshot"));
+ auto ctx = getExpCtx();
+ auto&& readConcernArgs = repl::ReadConcernArgs::get(ctx->opCtx);
+ ASSERT_OK(readConcernArgs.initialize(readConcernSnapshot["readConcern"]));
+ ASSERT(readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern);
+ ctx->opCtx->setWriteUnitOfWork(stdx::make_unique<WriteUnitOfWork>(ctx->opCtx));
+
+ // Make a pipeline with a legal $match, and then an illegal mock stage, and verify that pipeline
+ // creation fails with the expected error code.
+ auto matchStage = DocumentSourceMatch::create(BSON("_id" << 3), ctx);
+ auto illegalStage = DocumentSourceDisallowedWithSnapshotReads::create();
+ auto pipeline = Pipeline::create({matchStage, illegalStage}, ctx);
+ ASSERT_NOT_OK(pipeline.getStatus());
+ ASSERT_EQ(pipeline.getStatus(), ErrorCodes::duplicateCodeForTest(50742));
+}
+
+TEST_F(PipelineValidateTest, FacetPipelineValidatedForStagesIllegalWithSnapshotReads) {
+ BSONObj readConcernSnapshot = BSON("readConcern" << BSON("level"
+ << "snapshot"));
+ auto ctx = getExpCtx();
+ auto&& readConcernArgs = repl::ReadConcernArgs::get(ctx->opCtx);
+ ASSERT_OK(readConcernArgs.initialize(readConcernSnapshot["readConcern"]));
+ ASSERT(readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern);
+ ctx->opCtx->setWriteUnitOfWork(stdx::make_unique<WriteUnitOfWork>(ctx->opCtx));
+
+ // Make a pipeline with a legal $match, and then an illegal mock stage, and verify that pipeline
+ // creation fails with the expected error code.
+ auto matchStage = DocumentSourceMatch::create(BSON("_id" << 3), ctx);
+ auto illegalStage = DocumentSourceDisallowedWithSnapshotReads::create();
+ auto pipeline = Pipeline::createFacetPipeline({matchStage, illegalStage}, ctx);
+ ASSERT_NOT_OK(pipeline.getStatus());
+ ASSERT_EQ(pipeline.getStatus(), ErrorCodes::duplicateCodeForTest(50742));
+}
+
+} // namespace pipeline_validate
namespace Dependencies {
@@ -2468,7 +2525,8 @@ public:
PositionRequirement::kNone,
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kAllowed};
+ FacetRequirement::kAllowed,
+ TransactionRequirement::kAllowed};
}
};
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index cf2a8fcfe36..85370429156 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -572,8 +572,7 @@ std::string runQuery(OperationContext* opCtx,
}
// We have a parsed query. Time to get the execution plan for it.
- auto exec = uassertStatusOK(
- getExecutorFind(opCtx, collection, nss, std::move(cq), PlanExecutor::YIELD_AUTO));
+ auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, nss, std::move(cq)));
const QueryRequest& qr = exec->getCanonicalQuery()->getQueryRequest();
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index 111d2884174..4074dab4ca4 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -641,9 +641,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
opCtx, std::move(ws), std::move(cs), std::move(cq), collection, PlanExecutor::YIELD_AUTO);
}
-} // namespace
-
-StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
+StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind(
OperationContext* opCtx,
Collection* collection,
const NamespaceString& nss,
@@ -665,6 +663,35 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
return getExecutor(opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions);
}
+} // namespace
+
+StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
+ OperationContext* opCtx,
+ Collection* collection,
+ const NamespaceString& nss,
+ unique_ptr<CanonicalQuery> canonicalQuery,
+ size_t plannerOptions) {
+ auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ auto yieldPolicy = readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
+ ? PlanExecutor::INTERRUPT_ONLY
+ : PlanExecutor::YIELD_AUTO;
+ return _getExecutorFind(
+ opCtx, collection, nss, std::move(canonicalQuery), yieldPolicy, plannerOptions);
+}
+
+StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind(
+ OperationContext* opCtx,
+ Collection* collection,
+ const NamespaceString& nss,
+ std::unique_ptr<CanonicalQuery> canonicalQuery) {
+ return _getExecutorFind(opCtx,
+ collection,
+ nss,
+ std::move(canonicalQuery),
+ PlanExecutor::YIELD_AUTO,
+ QueryPlannerParams::DEFAULT);
+}
+
namespace {
/**
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index 34857084bf9..6a801c1a81b 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -91,10 +91,18 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind
Collection* collection,
const NamespaceString& nss,
std::unique_ptr<CanonicalQuery> canonicalQuery,
- PlanExecutor::YieldPolicy yieldPolicy,
size_t plannerOptions = QueryPlannerParams::DEFAULT);
/**
+ * Returns a plan executor for a legacy OP_QUERY find.
+ */
+StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind(
+ OperationContext* opCtx,
+ Collection* collection,
+ const NamespaceString& nss,
+ std::unique_ptr<CanonicalQuery> canonicalQuery);
+
+/**
* If possible, turn the provided QuerySolution into a QuerySolution that uses a DistinctNode
* to provide results for the distinct command.
*
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 08b766b01e7..99b54a425c8 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -129,9 +129,10 @@ const StringMap<int> sessionCheckoutWhitelist = {{"aggregate", 1},
// The command names for which readConcern level snapshot is allowed. The getMore command is
// implicitly allowed to operate on a cursor which was opened under readConcern level snapshot.
-const StringMap<int> readConcernSnapshotWhitelist = {{"find", 1},
+const StringMap<int> readConcernSnapshotWhitelist = {{"aggregate", 1},
{"count", 1},
{"delete", 1},
+ {"find", 1},
{"geoSearch", 1},
{"insert", 1},
{"parallelCollectionScan", 1},
diff --git a/src/mongo/s/query/document_source_router_adapter.h b/src/mongo/s/query/document_source_router_adapter.h
index 1520713edd5..5c1a6a0935c 100644
--- a/src/mongo/s/query/document_source_router_adapter.h
+++ b/src/mongo/s/query/document_source_router_adapter.h
@@ -49,7 +49,8 @@ public:
PositionRequirement::kFirst,
HostTypeRequirement::kMongoS,
DiskUseRequirement::kNoDiskUse,
- FacetRequirement::kNotAllowed};
+ FacetRequirement::kNotAllowed,
+ TransactionRequirement::kAllowed};
}
GetNextResult getNext() final;