summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source.h
diff options
context:
space:
mode:
authorDavid Storch <david.storch@10gen.com>2018-02-28 18:08:21 -0500
committerDavid Storch <david.storch@10gen.com>2018-03-09 17:20:42 -0500
commited1e2b4d2a4987e3744484f9482fdc7a0e119e94 (patch)
tree8096db9198fb62cd62e2192a38b15faf3d5100a6 /src/mongo/db/pipeline/document_source.h
parentb9e20190b647fea262a8f4e154bbf18d9934a3ba (diff)
downloadmongo-ed1e2b4d2a4987e3744484f9482fdc7a0e119e94.tar.gz
SERVER-33541 Add readConcern level 'snapshot' support for aggregation.
Diffstat (limited to 'src/mongo/db/pipeline/document_source.h')
-rw-r--r--src/mongo/db/pipeline/document_source.h33
1 files changed, 33 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 17622862af1..f1ee7d5717c 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -187,18 +187,26 @@ public:
*/
enum class FacetRequirement { kAllowed, kNotAllowed };
+ /**
+ * Indicates whether or not this stage is legal when the read concern for the aggregate has
+ * readConcern level "snapshot" or is running inside of a multi-document transaction.
+ */
+ enum class TransactionRequirement { kNotAllowed, kAllowed };
+
StageConstraints(
StreamType streamType,
PositionRequirement requiredPosition,
HostTypeRequirement hostRequirement,
DiskUseRequirement diskRequirement,
FacetRequirement facetRequirement,
+ TransactionRequirement transactionRequirement,
ChangeStreamRequirement changeStreamRequirement = ChangeStreamRequirement::kBlacklist)
: requiredPosition(requiredPosition),
hostRequirement(hostRequirement),
diskRequirement(diskRequirement),
changeStreamRequirement(changeStreamRequirement),
facetRequirement(facetRequirement),
+ transactionRequirement(transactionRequirement),
streamType(streamType) {
// Stages which are allowed to run in $facet must not have any position requirements.
invariant(
@@ -219,6 +227,18 @@ public:
// A stage which is whitelisted for $changeStream cannot have a position requirement.
invariant(!(changeStreamRequirement == ChangeStreamRequirement::kWhitelist &&
requiredPosition != PositionRequirement::kNone));
+
+ // Change stream stages should not be permitted with readConcern level "snapshot" or
+ // inside of a multi-document transaction.
+ if (isChangeStreamStage()) {
+ invariant(!isAllowedInTransaction());
+ }
+
+ // Stages which write data to user collections should not be permitted with readConcern
+ // level "snapshot" or inside of a multi-document transaction.
+ if (diskRequirement == DiskUseRequirement::kWritesPersistentData) {
+ invariant(!isAllowedInTransaction());
+ }
}
/**
@@ -262,6 +282,14 @@ public:
return changeStreamRequirement == ChangeStreamRequirement::kChangeStreamStage;
}
+ /**
+ * Returns true if this stage is legal when the readConcern level is "snapshot" or when this
+ * aggregation is being run within a multi-document transaction.
+ */
+ bool isAllowedInTransaction() const {
+ return transactionRequirement == TransactionRequirement::kAllowed;
+ }
+
// Indicates whether this stage needs to be at a particular position in the pipeline.
const PositionRequirement requiredPosition;
@@ -280,6 +308,10 @@ public:
// Indicates whether this stage may run inside a $facet stage.
const FacetRequirement facetRequirement;
+ // Indicates whether this stage is legal when the readConcern level is "snapshot" or the
+ // aggregate is running inside of a multi-document transaction.
+ const TransactionRequirement transactionRequirement;
+
// Indicates whether this is a streaming or blocking stage.
const StreamType streamType;
@@ -309,6 +341,7 @@ public:
using DiskUseRequirement = StageConstraints::DiskUseRequirement;
using FacetRequirement = StageConstraints::FacetRequirement;
using StreamType = StageConstraints::StreamType;
+ using TransactionRequirement = StageConstraints::TransactionRequirement;
/**
* This is what is returned from the main DocumentSource API: getNext(). It is essentially a