summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-06-27 13:29:02 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-08-01 17:16:14 -0400
commitad30a49a33b8773cbc07388bb257d605cbd6aa12 (patch)
tree3707869546caa93ed42efd29cc8404c8f36f9e4a /src/mongo/db
parent2431e1356823d898ef8af16997d6f63b65b385a5 (diff)
downloadmongo-ad30a49a33b8773cbc07388bb257d605cbd6aa12.tar.gz
SERVER-29135 Add post-image lookup to $changeNotification
This patch only adds support on an unsharded collection.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/SConscript20
-rw-r--r--src/mongo/db/pipeline/document_source.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source.h6
-rw-r--r--src/mongo/db/pipeline/document_source_bucket.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_bucket.h2
-rw-r--r--src/mongo/db/pipeline/document_source_bucket_test.cpp17
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.cpp197
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.h48
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification_test.cpp166
-rw-r--r--src/mongo/db/pipeline/document_source_count.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_count.h2
-rw-r--r--src/mongo/db/pipeline/document_source_count_test.cpp11
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp125
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h115
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp266
-rw-r--r--src/mongo/db/pipeline/document_source_sort_by_count.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_sort_by_count.h2
-rw-r--r--src/mongo/db/pipeline/document_source_sort_by_count_test.cpp11
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp3
-rw-r--r--src/mongo/db/pipeline/expression_context.h4
-rw-r--r--src/mongo/db/pipeline/expression_context_for_test.h4
-rw-r--r--src/mongo/db/pipeline/pipeline_test.cpp58
-rw-r--r--src/mongo/db/pipeline/value.h3
23 files changed, 916 insertions, 158 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 0d67ccbb2df..093cbb5a5c9 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -135,7 +135,9 @@ env.CppUnitTest(
'document_source_geo_near_test.cpp',
'document_source_group_test.cpp',
'document_source_limit_test.cpp',
+ 'document_source_lookup_change_post_image_test.cpp',
'document_source_lookup_test.cpp',
+ 'document_source_graph_lookup_test.cpp',
'document_source_match_test.cpp',
'document_source_mock_test.cpp',
'document_source_project_test.cpp',
@@ -231,7 +233,6 @@ docSourceEnv.Library(
'document_source_add_fields.cpp',
'document_source_bucket.cpp',
'document_source_bucket_auto.cpp',
- 'document_source_change_notification.cpp',
'document_source_coll_stats.cpp',
'document_source_count.cpp',
'document_source_current_op.cpp',
@@ -262,6 +263,7 @@ docSourceEnv.Library(
'$BUILD_DIR/mongo/db/matcher/expression_algo',
'$BUILD_DIR/mongo/db/matcher/expressions',
'$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source',
+ '$BUILD_DIR/mongo/db/repl/oplog_entry',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/stats/top',
'$BUILD_DIR/mongo/db/storage/encryption_hooks',
@@ -309,8 +311,10 @@ env.Library(
env.Library(
target='document_source_lookup',
source=[
+ 'document_source_change_notification.cpp',
'document_source_graph_lookup.cpp',
'document_source_lookup.cpp',
+ 'document_source_lookup_change_post_image.cpp',
],
LIBDEPS=[
'document_source',
@@ -318,20 +322,6 @@ env.Library(
],
)
-env.CppUnitTest(
- target='document_source_graph_lookup_test',
- source='document_source_graph_lookup_test.cpp',
- LIBDEPS=[
- 'document_source',
- 'document_source_lookup',
- 'document_value_test_util',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/query/query_test_service_context',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
- '$BUILD_DIR/mongo/s/is_mongos',
- ],
-)
-
env.Library(
target='document_source_facet',
source=[
diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp
index 6e58adae2bf..a9d9f74181a 100644
--- a/src/mongo/db/pipeline/document_source.cpp
+++ b/src/mongo/db/pipeline/document_source.cpp
@@ -41,6 +41,7 @@ namespace mongo {
using Parser = DocumentSource::Parser;
using boost::intrusive_ptr;
+using std::list;
using std::string;
using std::vector;
@@ -60,7 +61,7 @@ void DocumentSource::registerParser(string name, Parser parser) {
parserMap[name] = parser;
}
-vector<intrusive_ptr<DocumentSource>> DocumentSource::parse(
+list<intrusive_ptr<DocumentSource>> DocumentSource::parse(
const intrusive_ptr<ExpressionContext>& expCtx, BSONObj stageObj) {
uassert(16435,
"A pipeline stage specification object must contain exactly one field.",
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 2545f02d8de..383cd56e35f 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -85,7 +85,7 @@ class Document;
MONGO_INITIALIZER(addToDocSourceParserMap_##key)(InitializerContext*) { \
auto fullParserWrapper = [](BSONElement stageSpec, \
const boost::intrusive_ptr<ExpressionContext>& expCtx) { \
- return std::vector<boost::intrusive_ptr<DocumentSource>>{ \
+ return std::list<boost::intrusive_ptr<DocumentSource>>{ \
(fullParser)(stageSpec, expCtx)}; \
}; \
LiteParsedDocumentSource::registerParser("$" #key, liteParser); \
@@ -115,7 +115,7 @@ class Document;
class DocumentSource : public IntrusiveCounterUnsigned {
public:
- using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>(
+ using Parser = stdx::function<std::list<boost::intrusive_ptr<DocumentSource>>(
BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>;
/**
@@ -303,7 +303,7 @@ public:
/**
* Create a DocumentSource pipeline stage from 'stageObj'.
*/
- static std::vector<boost::intrusive_ptr<DocumentSource>> parse(
+ static std::list<boost::intrusive_ptr<DocumentSource>> parse(
const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj stageObj);
/**
diff --git a/src/mongo/db/pipeline/document_source_bucket.cpp b/src/mongo/db/pipeline/document_source_bucket.cpp
index 853ef0aaa42..e209834b14d 100644
--- a/src/mongo/db/pipeline/document_source_bucket.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket.cpp
@@ -37,6 +37,7 @@ namespace mongo {
using boost::intrusive_ptr;
using std::vector;
+using std::list;
REGISTER_MULTI_STAGE_ALIAS(bucket,
LiteParsedDocumentSourceDefault::parse,
@@ -52,7 +53,7 @@ intrusive_ptr<ExpressionConstant> getExpressionConstant(
}
} // namespace
-vector<intrusive_ptr<DocumentSource>> DocumentSourceBucket::createFromBson(
+list<intrusive_ptr<DocumentSource>> DocumentSourceBucket::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(40201,
str::stream() << "Argument to $bucket stage must be an object, but found type: "
diff --git a/src/mongo/db/pipeline/document_source_bucket.h b/src/mongo/db/pipeline/document_source_bucket.h
index aa83ee07cc2..e059660fc99 100644
--- a/src/mongo/db/pipeline/document_source_bucket.h
+++ b/src/mongo/db/pipeline/document_source_bucket.h
@@ -40,7 +40,7 @@ public:
/**
* Returns a $group stage followed by a $sort stage.
*/
- static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
private:
diff --git a/src/mongo/db/pipeline/document_source_bucket_test.cpp b/src/mongo/db/pipeline/document_source_bucket_test.cpp
index 1a61934dcb5..26f18c4dca9 100644
--- a/src/mongo/db/pipeline/document_source_bucket_test.cpp
+++ b/src/mongo/db/pipeline/document_source_bucket_test.cpp
@@ -47,21 +47,22 @@
namespace mongo {
namespace {
-using std::vector;
using boost::intrusive_ptr;
+using std::list;
+using std::vector;
class BucketReturnsGroupAndSort : public AggregationContextFixture {
public:
void testCreateFromBsonResult(BSONObj bucketSpec, Value expectedGroupExplain) {
- vector<intrusive_ptr<DocumentSource>> result =
+ list<intrusive_ptr<DocumentSource>> result =
DocumentSourceBucket::createFromBson(bucketSpec.firstElement(), getExpCtx());
ASSERT_EQUALS(result.size(), 2UL);
- const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result[0].get());
+ const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result.front().get());
ASSERT(groupStage);
- const auto* sortStage = dynamic_cast<DocumentSourceSort*>(result[1].get());
+ const auto* sortStage = dynamic_cast<DocumentSourceSort*>(result.back().get());
ASSERT(sortStage);
// Serialize the DocumentSourceGroup and DocumentSourceSort from $bucket so that we can
@@ -154,7 +155,7 @@ TEST_F(BucketReturnsGroupAndSort, BucketSucceedsWithMultipleBoundaryValues) {
class InvalidBucketSpec : public AggregationContextFixture {
public:
- vector<intrusive_ptr<DocumentSource>> createBucket(BSONObj bucketSpec) {
+ list<intrusive_ptr<DocumentSource>> createBucket(BSONObj bucketSpec) {
auto sources = DocumentSourceBucket::createFromBson(bucketSpec.firstElement(), getExpCtx());
return sources;
}
@@ -267,14 +268,14 @@ TEST_F(InvalidBucketSpec, GroupFailsForBucketWithInvalidOutputField) {
TEST_F(InvalidBucketSpec, SwitchFailsForBucketWhenNoDefaultSpecified) {
const auto spec = fromjson("{$bucket : {groupBy : '$x', boundaries : [1, 2, 3]}}");
- vector<intrusive_ptr<DocumentSource>> bucketStages = createBucket(spec);
+ list<intrusive_ptr<DocumentSource>> bucketStages = createBucket(spec);
ASSERT_EQUALS(bucketStages.size(), 2UL);
- auto* groupStage = dynamic_cast<DocumentSourceGroup*>(bucketStages[0].get());
+ auto* groupStage = dynamic_cast<DocumentSourceGroup*>(bucketStages.front().get());
ASSERT(groupStage);
- const auto* sortStage = dynamic_cast<DocumentSourceSort*>(bucketStages[1].get());
+ const auto* sortStage = dynamic_cast<DocumentSourceSort*>(bucketStages.back().get());
ASSERT(sortStage);
auto doc = Document{{"x", 4}};
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
index dff06f05802..d173d1183d5 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification.cpp
@@ -30,30 +30,46 @@
#include "mongo/db/pipeline/document_source_change_notification.h"
+#include "mongo/bson/simple_bsonelement_comparator.h"
#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/util/log.h"
namespace mongo {
using boost::intrusive_ptr;
using boost::optional;
-using std::vector;
+using std::list;
using std::string;
+using std::vector;
-constexpr StringData kNamespaceField = "ns"_sd;
-constexpr StringData kTimestmapField = "ts"_sd;
-constexpr StringData kOpTypeField = "op"_sd;
-constexpr StringData kOField = "o"_sd;
-constexpr StringData kIdField = "_id"_sd;
-
+// The $changeNotification stage is an alias for many stages, but we need to be able to serialize
+// and re-parse the pipeline. To make this work, the 'transformation' stage will serialize itself
+// with the original specification, and all other stages that are created during the alias expansion
+// will not serialize themselves.
REGISTER_MULTI_STAGE_ALIAS(changeNotification,
DocumentSourceChangeNotification::LiteParsed::parse,
DocumentSourceChangeNotification::createFromBson);
+constexpr StringData DocumentSourceChangeNotification::kDocumentKeyField;
+constexpr StringData DocumentSourceChangeNotification::kFullDocumentField;
+constexpr StringData DocumentSourceChangeNotification::kIdField;
+constexpr StringData DocumentSourceChangeNotification::kNamespaceField;
+constexpr StringData DocumentSourceChangeNotification::kOperationTypeField;
+constexpr StringData DocumentSourceChangeNotification::kStageName;
+constexpr StringData DocumentSourceChangeNotification::kTimestmapField;
+constexpr StringData DocumentSourceChangeNotification::kUpdateOpType;
+constexpr StringData DocumentSourceChangeNotification::kDeleteOpType;
+constexpr StringData DocumentSourceChangeNotification::kReplaceOpType;
+constexpr StringData DocumentSourceChangeNotification::kInsertOpType;
+constexpr StringData DocumentSourceChangeNotification::kInvalidateOpType;
+
namespace {
static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd;
@@ -127,7 +143,7 @@ BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString
return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch));
}
-vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson(
+list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
// TODO: Add sharding support here (SERVER-29141).
uassert(40470,
@@ -137,17 +153,54 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr
"Only default collation is allowed when using a $changeNotification stage.",
!expCtx->getCollator());
- BSONObj filter = buildMatchFilter(expCtx->ns);
+ uassert(40573,
+ str::stream() << "the $changeNotification stage must be specified as an object, got "
+ << typeName(elem.type()),
+ elem.type() == BSONType::Object);
+
+ bool shouldLookupPostImage = false;
+ for (auto&& option : elem.embeddedObject()) {
+ auto optionName = option.fieldNameStringData();
+ if (optionName == "fullDocument"_sd) {
+ uassert(40574,
+ str::stream() << "the 'fullDocument' option to the $changeNotification stage "
+ "must be a string, got "
+ << typeName(option.type()),
+ option.type() == BSONType::String);
+ auto fullDocOption = option.valueStringData();
+ uassert(40575,
+ str::stream() << "unrecognized value for the 'fullDocument' option to the "
+ "$changeNotification stage. Expected \"none\" or "
+ "\"fullDocument\", got \""
+ << option.String()
+ << "\"",
+ fullDocOption == "lookup"_sd || fullDocOption == "none"_sd);
+ shouldLookupPostImage = (fullDocOption == "lookup"_sd);
+ } else if (optionName == "resumeAfter"_sd) {
+ uasserted(
+ 40576,
+ "the 'resumeAfter' option to the $changeNotification stage is not yet supported");
+ } else {
+ uasserted(40577,
+ str::stream() << "unrecognized option to $changeNotification stage: \""
+ << optionName
+ << "\"");
+ }
+ }
- auto oplogMatch = DocumentSourceOplogMatch::create(filter, expCtx);
- auto transformation = createTransformationStage(expCtx);
- return {oplogMatch, transformation};
+ auto oplogMatch = DocumentSourceOplogMatch::create(buildMatchFilter(expCtx->ns), expCtx);
+ auto transformation = createTransformationStage(elem.embeddedObject(), expCtx);
+ list<intrusive_ptr<DocumentSource>> stages = {oplogMatch, transformation};
+ if (shouldLookupPostImage) {
+ stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx));
+ }
+ return stages;
}
intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage(
- const intrusive_ptr<ExpressionContext>& expCtx) {
+ BSONObj changeNotificationSpec, const intrusive_ptr<ExpressionContext>& expCtx) {
return intrusive_ptr<DocumentSource>(new DocumentSourceSingleDocumentTransformation(
- expCtx, stdx::make_unique<Transformation>(), "$changeNotification"));
+ expCtx, stdx::make_unique<Transformation>(changeNotificationSpec), kStageName.toString()));
}
namespace {
@@ -167,89 +220,103 @@ Document DocumentSourceChangeNotification::Transformation::applyTransformation(
MutableDocument doc;
// Extract the fields we need.
- checkValueType(input[kOpTypeField], kOpTypeField, BSONType::String);
- string op = input[kOpTypeField].getString();
- Value ts = input[kTimestmapField];
- Value ns = input[kNamespaceField];
- checkValueType(ns, kNamespaceField, BSONType::String);
+ checkValueType(input[repl::OplogEntry::kOpTypeFieldName],
+ repl::OplogEntry::kOpTypeFieldName,
+ BSONType::String);
+ string op = input[repl::OplogEntry::kOpTypeFieldName].getString();
+ Value ts = input[repl::OplogEntry::kTimestampFieldName];
+ Value ns = input[repl::OplogEntry::kNamespaceFieldName];
+ checkValueType(ns, repl::OplogEntry::kNamespaceFieldName, BSONType::String);
NamespaceString nss(ns.getString());
Value id = input.getNestedField("o._id");
// Non-replace updates have the _id in field "o2".
Value documentId = id.missing() ? input.getNestedField("o2._id") : id;
- string operationType;
- Value newDocument;
+ StringData operationType;
+ Value fullDocument = Value(BSONNULL);
Value updateDescription;
// Deal with CRUD operations and commands.
- if (op == "i") {
- operationType = "insert";
- newDocument = input[kOField];
- } else if (op == "d") {
- operationType = "delete";
- } else if (op == "u") {
- if (id.missing()) {
- operationType = "update";
- checkValueType(input[kOField], kOField, BSONType::Object);
- Document o = input[kOField].getDocument();
- Value updatedFields = o["$set"];
- Value removedFields = o["$unset"];
-
- // Extract the field names of $unset document.
- vector<Value> removedFieldsVector;
- if (removedFields.getType() == BSONType::Object) {
- auto iter = removedFields.getDocument().fieldIterator();
- while (iter.more()) {
- removedFieldsVector.push_back(Value(iter.next().first));
+ auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeNotificationEntry.op"), op);
+ switch (opType) {
+ case repl::OpTypeEnum::kInsert: {
+ operationType = kInsertOpType;
+ fullDocument = input[repl::OplogEntry::kObjectFieldName];
+ break;
+ }
+ case repl::OpTypeEnum::kDelete: {
+ operationType = kDeleteOpType;
+ break;
+ }
+ case repl::OpTypeEnum::kUpdate: {
+ if (id.missing()) {
+ operationType = kUpdateOpType;
+ checkValueType(input[repl::OplogEntry::kObjectFieldName],
+ repl::OplogEntry::kObjectFieldName,
+ BSONType::Object);
+ Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument();
+ Value updatedFields = opObject["$set"];
+ Value removedFields = opObject["$unset"];
+
+ // Extract the field names of $unset document.
+ vector<Value> removedFieldsVector;
+ if (removedFields.getType() == BSONType::Object) {
+ auto iter = removedFields.getDocument().fieldIterator();
+ while (iter.more()) {
+ removedFieldsVector.push_back(Value(iter.next().first));
+ }
}
+ updateDescription = Value(Document{
+ {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields},
+ {"removedFields", removedFieldsVector}});
+ } else {
+ operationType = kReplaceOpType;
+ fullDocument = input[repl::OplogEntry::kObjectFieldName];
}
- updateDescription = Value(Document{
- {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields},
- {"removedFields", removedFieldsVector}});
- } else {
- operationType = "replace";
- newDocument = input[kOField];
+ break;
+ }
+ case repl::OpTypeEnum::kCommand: {
+ operationType = kInvalidateOpType;
+ // Make sure the result doesn't have a document id.
+ documentId = Value();
+ break;
}
- } else if (op == "c") {
- operationType = "invalidate";
- // Make sure the result doesn't have a document id.
- documentId = Value();
+ default: { MONGO_UNREACHABLE; }
}
- // Construct the result document. If document id is missing, it will not appear in the output.
+ // Construct the result document. Note that 'documentId' might be the missing value, in which
+ // case it will not appear in the output.
doc.addField(
kIdField,
Value(Document{{kTimestmapField, ts}, {kNamespaceField, ns}, {kIdField, documentId}}));
- doc.addField("operationType", Value(operationType));
+ doc.addField(kOperationTypeField, Value(operationType));
+ doc.addField(kFullDocumentField, fullDocument);
// "invalidate" entry has fewer fields.
- if (op == "c") {
+ if (opType == repl::OpTypeEnum::kCommand) {
return doc.freeze();
}
- // Add fields for normal operations.
doc.addField(kNamespaceField, Value(Document{{"db", nss.db()}, {"coll", nss.coll()}}));
- doc.addField("documentKey", Value(Document{{kIdField, documentId}}));
+ doc.addField(kDocumentKeyField, Value(Document{{kIdField, documentId}}));
- // If newDocument or updateDescription is missing, it will not be serialized.
- doc.addField("newDocument", newDocument);
+ // Note that 'updateDescription' might be the 'missing' value, in which case it will not be
+ // serialized.
doc.addField("updateDescription", updateDescription);
return doc.freeze();
}
Document DocumentSourceChangeNotification::Transformation::serializeStageOptions(
boost::optional<ExplainOptions::Verbosity> explain) const {
- // TODO SERVER-29135 Be sure to re-serialize the 'postImage' argument.
- // TODO SERVER-29131 Be sure to re-serialize the 'resumeAfter' argument.
- return Document();
+ return Document(_changeNotificationSpec);
}
DocumentSource::GetDepsReturn DocumentSourceChangeNotification::Transformation::addDependencies(
DepsTracker* deps) const {
- deps->fields.insert(kOpTypeField.toString());
- deps->fields.insert(kTimestmapField.toString());
- deps->fields.insert(kNamespaceField.toString());
- deps->fields.insert(kOField.toString());
- deps->fields.insert("o2");
+ deps->fields.insert(repl::OplogEntry::kOpTypeFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kTimestampFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kNamespaceFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kObjectFieldName.toString());
+ deps->fields.insert(repl::OplogEntry::kObject2FieldName.toString());
return DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL;
}
diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h
index 3b38a908626..ad10a5ad210 100644
--- a/src/mongo/db/pipeline/document_source_change_notification.h
+++ b/src/mongo/db/pipeline/document_source_change_notification.h
@@ -51,6 +51,8 @@ public:
}
stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
+ // TODO SERVER-29138: we need to communicate that this stage will need to look up
+ // documents from different collections.
return stdx::unordered_set<NamespaceString>();
}
@@ -62,6 +64,8 @@ public:
class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface {
public:
+ Transformation(BSONObj changeNotificationSpec)
+ : _changeNotificationSpec(changeNotificationSpec.getOwned()) {}
~Transformation() = default;
Document applyTransformation(const Document& input) final;
TransformerType getType() const final {
@@ -72,10 +76,48 @@ public:
boost::optional<ExplainOptions::Verbosity> explain) const final;
DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const final;
DocumentSource::GetModPathsReturn getModifiedPaths() const final;
+
+ private:
+ BSONObj _changeNotificationSpec;
};
+ // The name of the field where the document key (_id and shard key, if present) will be found
+ // after the transformation.
+ static constexpr StringData kDocumentKeyField = "documentKey"_sd;
+
+ // The name of the field where the full document will be found after the transformation. The
+ // full document is only present for certain types of operations, such as an insert.
+ static constexpr StringData kFullDocumentField = "fullDocument"_sd;
+
+ // The name of the field where the change identifier will be located after the transformation.
+ static constexpr StringData kIdField = "_id"_sd;
+
+ // The name of the field where the namespace of the change will be located after the
+ // transformation.
+ static constexpr StringData kNamespaceField = "ns"_sd;
+
+ // The name of the field where the type of the operation will be located after the
+ // transformation.
+ static constexpr StringData kOperationTypeField = "operationType"_sd;
+
+ // The name of this stage.
+ static constexpr StringData kStageName = "$changeNotification"_sd;
+
+ // The name of the field where the timestamp of the change will be located after the
+ // transformation. The timestamp will be located inside the change identifier, so the full path
+ // to the timestamp will be kIdField + "." + kTimestampField.
+ static constexpr StringData kTimestmapField = "ts"_sd;
+
+ // The different types of operations we can use for the operation type.
+ static constexpr StringData kUpdateOpType = "update"_sd;
+ static constexpr StringData kDeleteOpType = "delete"_sd;
+ static constexpr StringData kReplaceOpType = "replace"_sd;
+ static constexpr StringData kInsertOpType = "insert"_sd;
+ static constexpr StringData kInvalidateOpType = "invalidate"_sd;
+
/**
- * Produce the BSON for the $match stage based on a $changeNotification stage.
+ * Produce the BSON object representing the filter for the $match stage to filter oplog entries
+ * to only those relevant for this $changeNotification stage.
*/
static BSONObj buildMatchFilter(const NamespaceString& nss);
@@ -83,11 +125,11 @@ public:
* Parses a $changeNotification stage from 'elem' and produces the $match and transformation
* stages required.
*/
- static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
static boost::intrusive_ptr<DocumentSource> createTransformationStage(
- const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ BSONObj changeNotificationSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
private:
// It is illegal to construct a DocumentSourceChangeNotification directly, use createFromBson()
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
index b3595789577..9c08bd91637 100644
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
@@ -49,14 +49,18 @@
namespace mongo {
namespace {
-using std::vector;
-using std::string;
using boost::intrusive_ptr;
-using repl::OplogEntry;
using repl::OpTypeEnum;
+using repl::OplogEntry;
+using std::list;
+using std::string;
+using std::vector;
+
using D = Document;
using V = Value;
+using DSChangeNotification = DocumentSourceChangeNotification;
+
static const Timestamp ts(100, 1);
static const repl::OpTime optime(ts, 1);
static const NamespaceString nss("unittests.change_notification");
@@ -67,10 +71,10 @@ public:
void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) {
const auto spec = fromjson("{$changeNotification: {}}");
- vector<intrusive_ptr<DocumentSource>> result =
- DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
+ list<intrusive_ptr<DocumentSource>> result =
+ DSChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
- auto match = dynamic_cast<DocumentSourceMatch*>(result[0].get());
+ auto match = dynamic_cast<DocumentSourceMatch*>(result.front().get());
ASSERT(match);
auto mock = DocumentSourceMock::create(D(entry.toBSON()));
match->setSource(mock.get());
@@ -78,7 +82,7 @@ public:
// Check the oplog entry is transformed correctly.
auto transform = result.back().get();
ASSERT(transform);
- ASSERT_EQ(string(transform->getSourceName()), "$changeNotification");
+ ASSERT_EQ(string(transform->getSourceName()), DSChangeNotification::kStageName);
transform->setSource(match);
auto next = transform->getNext();
@@ -94,16 +98,62 @@ public:
}
};
+TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedOption) {
+ auto expCtx = getExpCtx();
+
+ ASSERT_THROWS_CODE(
+ DSChangeNotification::createFromBson(
+ BSON(DSChangeNotification::kStageName << BSON("unexpected" << 4)).firstElement(),
+ expCtx),
+ UserException,
+ 40577);
+}
+
+TEST_F(ChangeNotificationStageTest, ShouldRejectResumeAfterOption) {
+ // TODO SERVER-29131 change this test to accept the option.
+ auto expCtx = getExpCtx();
+
+ ASSERT_THROWS_CODE(
+ DSChangeNotification::createFromBson(
+ BSON(DSChangeNotification::kStageName << BSON("resumeAfter" << ts)).firstElement(),
+ expCtx),
+ UserException,
+ 40576);
+}
+
+TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) {
+ auto expCtx = getExpCtx();
+
+ ASSERT_THROWS_CODE(
+ DSChangeNotification::createFromBson(
+ BSON(DSChangeNotification::kStageName << BSON("fullDocument" << true)).firstElement(),
+ expCtx),
+ UserException,
+ 40574);
+}
+
+TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption) {
+ auto expCtx = getExpCtx();
+
+ ASSERT_THROWS_CODE(DSChangeNotification::createFromBson(
+ BSON(DSChangeNotification::kStageName << BSON("fullDocument"
+ << "unrecognized"))
+ .firstElement(),
+ expCtx),
+ UserException,
+ 40575);
+}
+
TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) {
const auto spec = fromjson("{$changeNotification: {}}");
- vector<intrusive_ptr<DocumentSource>> result =
- DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
+ list<intrusive_ptr<DocumentSource>> result =
+ DSChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
ASSERT_EQUALS(result.size(), 2UL);
- ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result[0].get()));
- ASSERT_EQUALS(string(result[0]->getSourceName()), "$changeNotification");
- ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification");
+ ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result.front().get()));
+ ASSERT_EQUALS(string(result.front()->getSourceName()), DSChangeNotification::kStageName);
+ ASSERT_EQUALS(string(result.back()->getSourceName()), DSChangeNotification::kStageName);
// TODO: Check explain result.
}
@@ -112,11 +162,11 @@ TEST_F(ChangeNotificationStageTest, TransformInsert) {
OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 1));
// Insert
Document expectedInsert{
- {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
- {"operationType", "insert"_sd},
- {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {"documentKey", D{{"_id", 1}}},
- {"newDocument", D{{"_id", 1}, {"x", 1}}},
+ {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
+ {DSChangeNotification::kOperationTypeField, DSChangeNotification::kInsertOpType},
+ {DSChangeNotification::kFullDocumentField, D{{"_id", 1}, {"x", 1}}},
+ {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}},
};
checkTransformation(insert, expectedInsert);
}
@@ -126,13 +176,15 @@ TEST_F(ChangeNotificationStageTest, TransformUpdateFields) {
optime, 1, OpTypeEnum::kUpdate, nss, BSON("$set" << BSON("y" << 1)), BSON("_id" << 1));
// Update fields
Document expectedUpdateField{
- {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
- {"operationType", "update"_sd},
- {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {"documentKey", D{{"_id", 1}}},
+ {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
+ {DSChangeNotification::kOperationTypeField, DSChangeNotification::kUpdateOpType},
+ {DSChangeNotification::kFullDocumentField, BSONNULL},
+ {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}},
{
"updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}},
- }};
+ },
+ };
checkTransformation(updateField, expectedUpdateField);
}
@@ -141,10 +193,11 @@ TEST_F(ChangeNotificationStageTest, TransformRemoveFields) {
optime, 1, OpTypeEnum::kUpdate, nss, BSON("$unset" << BSON("y" << 1)), BSON("_id" << 1));
// Remove fields
Document expectedRemoveField{
- {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
- {"operationType", "update"_sd},
- {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {"documentKey", D{{"_id", 1}}},
+ {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
+ {DSChangeNotification::kOperationTypeField, DSChangeNotification::kUpdateOpType},
+ {DSChangeNotification::kFullDocumentField, BSONNULL},
+ {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}},
{
"updateDescription", D{{"updatedFields", D{}}, {"removedFields", vector<V>{V("y"_sd)}}},
}};
@@ -156,11 +209,11 @@ TEST_F(ChangeNotificationStageTest, TransformReplace) {
optime, 1, OpTypeEnum::kUpdate, nss, BSON("_id" << 1 << "y" << 1), BSON("_id" << 1));
// Replace
Document expectedReplace{
- {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
- {"operationType", "replace"_sd},
- {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {"documentKey", D{{"_id", 1}}},
- {"newDocument", D{{"_id", 1}, {"y", 1}}},
+ {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
+ {DSChangeNotification::kOperationTypeField, DSChangeNotification::kReplaceOpType},
+ {DSChangeNotification::kFullDocumentField, D{{"_id", 1}, {"y", 1}}},
+ {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}},
};
checkTransformation(replace, expectedReplace);
}
@@ -169,10 +222,11 @@ TEST_F(ChangeNotificationStageTest, TransformDelete) {
OplogEntry deleteEntry(optime, 1, OpTypeEnum::kDelete, nss, BSON("_id" << 1));
// Delete
Document expectedDelete{
- {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
- {"operationType", "delete"_sd},
- {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}},
- {"documentKey", D{{"_id", 1}}},
+ {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}},
+ {DSChangeNotification::kOperationTypeField, DSChangeNotification::kDeleteOpType},
+ {DSChangeNotification::kFullDocumentField, BSONNULL},
+ {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}},
};
checkTransformation(deleteEntry, expectedDelete);
}
@@ -187,7 +241,9 @@ TEST_F(ChangeNotificationStageTest, TransformInvalidate) {
// Invalidate entry includes $cmd namespace in _id and doesn't have a document id.
Document expectedInvalidate{
- {"_id", D{{"ts", ts}, {"ns", nss.getCommandNS().ns()}}}, {"operationType", "invalidate"_sd},
+ {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.getCommandNS().ns()}}},
+ {DSChangeNotification::kOperationTypeField, DSChangeNotification::kInvalidateOpType},
+ {DSChangeNotification::kFullDocumentField, BSONNULL},
};
for (auto& entry : {dropColl, dropDB, rename}) {
checkTransformation(entry, expectedInvalidate);
@@ -203,8 +259,9 @@ TEST_F(ChangeNotificationStageTest, TransformInvalidateRenameDropTarget) {
otherColl.getCommandNS(),
BSON("renameCollection" << otherColl.ns() << "to" << nss.ns()));
Document expectedInvalidate{
- {"_id", D{{"ts", ts}, {"ns", otherColl.getCommandNS().ns()}}},
- {"operationType", "invalidate"_sd},
+ {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", otherColl.getCommandNS().ns()}}},
+ {DSChangeNotification::kOperationTypeField, DSChangeNotification::kInvalidateOpType},
+ {DSChangeNotification::kFullDocumentField, BSONNULL},
};
checkTransformation(rename, expectedInvalidate);
}
@@ -230,5 +287,38 @@ TEST_F(ChangeNotificationStageTest, MatchFiltersCreateIndex) {
checkTransformation(createIndex, boost::none);
}
+TEST_F(ChangeNotificationStageTest, TransformationShouldBeAbleToReParseSerializedStage) {
+ auto expCtx = getExpCtx();
+
+ auto originalSpec = BSON(DSChangeNotification::kStageName << BSONObj());
+ auto allStages = DSChangeNotification::createFromBson(originalSpec.firstElement(), expCtx);
+ ASSERT_EQ(allStages.size(), 2UL);
+ auto stage = allStages.back();
+ ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>(stage.get()));
+
+ //
+ // Serialize the stage and confirm contents.
+ //
+ vector<Value> serialization;
+ stage->serializeToArray(serialization);
+ ASSERT_EQ(serialization.size(), 1UL);
+ ASSERT_EQ(serialization[0].getType(), BSONType::Object);
+ auto serializedDoc = serialization[0].getDocument();
+ ASSERT_BSONOBJ_EQ(serializedDoc.toBson(), originalSpec);
+
+ //
+ // Create a new stage from the serialization. Serialize the new stage and confirm that it is
+ // equivalent to the original serialization.
+ //
+ auto serializedBson = serializedDoc.toBson();
+ auto roundTripped = uassertStatusOK(Pipeline::create(
+ DSChangeNotification::createFromBson(serializedBson.firstElement(), expCtx), expCtx));
+
+ auto newSerialization = roundTripped->serialize();
+
+ ASSERT_EQ(newSerialization.size(), 1UL);
+ ASSERT_VALUE_EQ(newSerialization[0], serialization[0]);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_count.cpp b/src/mongo/db/pipeline/document_source_count.cpp
index 01c4044fece..0a2d299200b 100644
--- a/src/mongo/db/pipeline/document_source_count.cpp
+++ b/src/mongo/db/pipeline/document_source_count.cpp
@@ -39,14 +39,14 @@
namespace mongo {
using boost::intrusive_ptr;
-using std::vector;
+using std::list;
using std::string;
REGISTER_MULTI_STAGE_ALIAS(count,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceCount::createFromBson);
-vector<intrusive_ptr<DocumentSource>> DocumentSourceCount::createFromBson(
+list<intrusive_ptr<DocumentSource>> DocumentSourceCount::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
uassert(40156,
str::stream() << "the count field must be a non-empty string",
diff --git a/src/mongo/db/pipeline/document_source_count.h b/src/mongo/db/pipeline/document_source_count.h
index 50fbccdf41b..e77ce3eb0ed 100644
--- a/src/mongo/db/pipeline/document_source_count.h
+++ b/src/mongo/db/pipeline/document_source_count.h
@@ -40,7 +40,7 @@ public:
/**
* Returns a $group stage followed by a $project stage.
*/
- static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
private:
diff --git a/src/mongo/db/pipeline/document_source_count_test.cpp b/src/mongo/db/pipeline/document_source_count_test.cpp
index e083399b0fa..254f011ff1d 100644
--- a/src/mongo/db/pipeline/document_source_count_test.cpp
+++ b/src/mongo/db/pipeline/document_source_count_test.cpp
@@ -45,22 +45,23 @@
namespace mongo {
namespace {
-using std::vector;
using boost::intrusive_ptr;
+using std::list;
+using std::vector;
class CountReturnsGroupAndProjectStages : public AggregationContextFixture {
public:
void testCreateFromBsonResult(BSONObj countSpec) {
- vector<intrusive_ptr<DocumentSource>> result =
+ list<intrusive_ptr<DocumentSource>> result =
DocumentSourceCount::createFromBson(countSpec.firstElement(), getExpCtx());
ASSERT_EQUALS(result.size(), 2UL);
- const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result[0].get());
+ const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result.front().get());
ASSERT(groupStage);
const auto* projectStage =
- dynamic_cast<DocumentSourceSingleDocumentTransformation*>(result[1].get());
+ dynamic_cast<DocumentSourceSingleDocumentTransformation*>(result.back().get());
ASSERT(projectStage);
auto explain = ExplainOptions::Verbosity::kQueryPlanner;
@@ -94,7 +95,7 @@ TEST_F(CountReturnsGroupAndProjectStages, ValidStringSpec) {
class InvalidCountSpec : public AggregationContextFixture {
public:
- vector<intrusive_ptr<DocumentSource>> createCount(BSONObj countSpec) {
+ list<intrusive_ptr<DocumentSource>> createCount(BSONObj countSpec) {
auto specElem = countSpec.firstElement();
return DocumentSourceCount::createFromBson(specElem, getExpCtx());
}
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
new file mode 100644
index 00000000000..ee605710a54
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
+
+#include "mongo/bson/simple_bsonelement_comparator.h"
+
+namespace mongo {
+
+constexpr StringData DocumentSourceLookupChangePostImage::kStageName;
+constexpr StringData DocumentSourceLookupChangePostImage::kFullDocumentFieldName;
+
+namespace {
+Value assertFieldHasType(const Document& fullDoc, StringData fieldName, BSONType expectedType) {
+ auto val = fullDoc[fieldName];
+ uassert(40578,
+ str::stream() << "failed to look up post image after change: expected \"" << fieldName
+ << "\" field to have type "
+ << typeName(expectedType)
+ << ", instead found type "
+ << typeName(val.getType())
+ << ": "
+ << val.toString()
+ << ", full object: "
+ << fullDoc.toString(),
+ val.getType() == expectedType);
+ return val;
+}
+} // namespace
+
+DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ auto input = pSource->getNext();
+ if (!input.isAdvanced()) {
+ return input;
+ }
+ auto opTypeVal = assertFieldHasType(input.getDocument(),
+ DocumentSourceChangeNotification::kOperationTypeField,
+ BSONType::String);
+ if (opTypeVal.getString() != DocumentSourceChangeNotification::kUpdateOpType) {
+ return input;
+ }
+
+ MutableDocument output(input.releaseDocument());
+ output[kFullDocumentFieldName] = lookupPostImage(output.peek());
+ return output.freeze();
+}
+
+NamespaceString DocumentSourceLookupChangePostImage::assertNamespaceMatches(
+ const Document& inputDoc) const {
+ auto namespaceObject = assertFieldHasType(inputDoc,
+ DocumentSourceChangeNotification::kNamespaceField,
+ BSONType::Object)
+ .getDocument();
+ auto dbName = assertFieldHasType(namespaceObject, "db"_sd, BSONType::String);
+ auto collectionName = assertFieldHasType(namespaceObject, "coll"_sd, BSONType::String);
+ NamespaceString nss(dbName.getString(), collectionName.getString());
+ uassert(40579,
+ str::stream() << "unexpected namespace during post image lookup: " << nss.ns()
+ << ", expected "
+ << pExpCtx->ns.ns(),
+ nss == pExpCtx->ns);
+ return nss;
+}
+
+Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updateOp) const {
+ // Make sure we have a well-formed input.
+ auto nss = assertNamespaceMatches(updateOp);
+
+ auto documentKey = assertFieldHasType(
+ updateOp, DocumentSourceChangeNotification::kDocumentKeyField, BSONType::Object);
+ auto matchSpec = BSON("$match" << documentKey);
+
+ // TODO SERVER-29134 we need to extract the namespace from the document and set them on the new
+ // ExpressionContext if we're getting notifications from an entire database.
+ auto foreignExpCtx = pExpCtx->copyWith(nss);
+ auto pipeline = uassertStatusOK(_mongod->makePipeline({matchSpec}, foreignExpCtx));
+
+ if (auto first = pipeline->getNext()) {
+ auto lookedUpDocument = Value(*first);
+ if (auto next = pipeline->getNext()) {
+ uasserted(40580,
+ str::stream() << "found more than document with documentKey "
+ << documentKey.toString()
+ << " while looking up post image after change: ["
+ << lookedUpDocument.toString()
+ << ", "
+ << next->toString()
+ << "]");
+ }
+ return lookedUpDocument;
+ }
+ // We couldn't find it with the documentKey, it may have been deleted.
+ return Value(BSONNULL);
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
new file mode 100644
index 00000000000..ca14656a63c
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -0,0 +1,115 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_notification.h"
+
+namespace mongo {
+
+/**
+ * Part of the change notification API machinery used to look up the post-image of a document. Uses
+ * the "documentKey" field of the input to look up the new version of the document.
+ *
+ * Uses the ExpressionContext to determine what collection to look up into.
+ * TODO SERVER-29134 When we allow change streams on multiple collections, this will need to change.
+ */
+class DocumentSourceLookupChangePostImage final : public DocumentSourceNeedsMongod {
+public:
+ static constexpr StringData kStageName = "$_internalLookupChangePostImage"_sd;
+ static constexpr StringData kFullDocumentFieldName =
+ DocumentSourceChangeNotification::kFullDocumentField;
+
+ /**
+ * Creates a DocumentSourceLookupChangePostImage stage.
+ */
+ static boost::intrusive_ptr<DocumentSourceLookupChangePostImage> create(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) {
+ return new DocumentSourceLookupChangePostImage(expCtx);
+ }
+
+ /**
+ * Only modifies a single path: "fullDocument".
+ */
+ GetModPathsReturn getModifiedPaths() const final {
+ return {GetModPathsReturn::Type::kFiniteSet, {kFullDocumentFieldName.toString()}, {}};
+ }
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.canSwapWithMatch = true;
+ constraints.isAllowedInsideFacetStage = false;
+ return constraints;
+ }
+
+ GetDepsReturn getDependencies(DepsTracker* deps) const {
+ // The namespace is not technically needed yet, but we will if there is more than one
+ // collection involved.
+ deps->fields.insert(DocumentSourceChangeNotification::kNamespaceField.toString());
+ deps->fields.insert(DocumentSourceChangeNotification::kDocumentKeyField.toString());
+ deps->fields.insert(DocumentSourceChangeNotification::kOperationTypeField.toString());
+ // This stage does not restrict the output fields to a finite set, and has no impact on
+ // whether metadata is available or needed.
+ return SEE_NEXT;
+ }
+
+ /**
+ * Performs the lookup to retrieve the full document.
+ */
+ GetNextResult getNext() final;
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ if (explain) {
+ return Value{Document{{kStageName, Document()}}};
+ }
+ return Value(); // Do not serialize this stage unless we're explaining.
+ }
+
+ const char* getSourceName() const final {
+ return kStageName.rawData();
+ }
+
+private:
+ DocumentSourceLookupChangePostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx)
+ : DocumentSourceNeedsMongod(expCtx) {}
+
+ /**
+ * Uses the "documentKey" field from 'updateOp' to look up the current version of the document.
+ * Returns Value(BSONNULL) if the document couldn't be found.
+ */
+ Value lookupPostImage(const Document& updateOp) const;
+
+ /**
+ * Throws a UserException if the namespace found in 'inputDoc' doesn't match the one on the
+ * ExpressionContext.
+ */
+ NamespaceString assertNamespaceMatches(const Document& inputDoc) const;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
new file mode 100644
index 00000000000..728db433c55
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -0,0 +1,266 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <boost/intrusive_ptr.hpp>
+#include <deque>
+#include <vector>
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source_change_notification.h"
+#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/field_path.h"
+#include "mongo/db/pipeline/stub_mongod_interface.h"
+#include "mongo/db/pipeline/value.h"
+
+namespace mongo {
+namespace {
+using boost::intrusive_ptr;
+using std::deque;
+using std::vector;
+
+// This provides access to getExpCtx(), but we'll use a different name for this test suite.
+using DocumentSourceLookupChangePostImageTest = AggregationContextFixture;
+
+/**
+ * A mock MongodInterface which allows mocking a foreign pipeline.
+ */
+class MockMongodInterface final : public StubMongodInterface {
+public:
+ MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults)
+ : _mockResults(std::move(mockResults)) {}
+
+ bool isSharded(const NamespaceString& ns) final {
+ return false;
+ }
+
+ StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) final {
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
+ if (!pipeline.isOK()) {
+ return pipeline.getStatus();
+ }
+
+ pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults));
+ pipeline.getValue()->optimizePipeline();
+
+ return pipeline;
+ }
+
+private:
+ deque<DocumentSource::GetNextResult> _mockResults;
+};
+
+TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) {
+ auto expCtx = getExpCtx();
+
+ // Set up the $lookup stage.
+ auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+
+ // Mock its input with a document without a "documentKey" field.
+ auto mockLocalSource = DocumentSourceMock::create(
+ Document{{"operationType", "update"_sd},
+ {"fullDocument", Document{{"_id", 0}}},
+ {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}});
+
+ lookupChangeStage->setSource(mockLocalSource.get());
+
+ // Mock out the foreign collection.
+ lookupChangeStage->injectMongodInterface(
+ std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{}));
+
+ ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40578);
+}
+
+TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationType) {
+ auto expCtx = getExpCtx();
+
+ // Set up the $lookup stage.
+ auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+
+ // Mock its input with a document without a "ns" field.
+ auto mockLocalSource = DocumentSourceMock::create(
+ Document{{"documentKey", Document{{"_id", 0}}},
+ {"fullDocument", Document{{"_id", 0}}},
+ {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}});
+
+ lookupChangeStage->setSource(mockLocalSource.get());
+
+ // Mock out the foreign collection.
+ lookupChangeStage->injectMongodInterface(
+ std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{}));
+
+ ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40578);
+}
+
+TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) {
+ auto expCtx = getExpCtx();
+
+ // Set up the $lookup stage.
+ auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+
+ // Mock its input with a document without a "ns" field.
+ auto mockLocalSource = DocumentSourceMock::create(Document{
+ {"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd},
+ });
+
+ lookupChangeStage->setSource(mockLocalSource.get());
+
+ // Mock out the foreign collection.
+ lookupChangeStage->injectMongodInterface(
+ std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{}));
+
+ ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40578);
+}
+
+TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType) {
+ auto expCtx = getExpCtx();
+
+ // Set up the $lookup stage.
+ auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+
+ // Mock its input with a document without a "ns" field.
+ auto mockLocalSource = DocumentSourceMock::create(
+ Document{{"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd}, {"ns", 4}});
+
+ lookupChangeStage->setSource(mockLocalSource.get());
+
+ // Mock out the foreign collection.
+ lookupChangeStage->injectMongodInterface(
+ std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{}));
+
+ ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40578);
+}
+
+TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatchPipeline) {
+ auto expCtx = getExpCtx();
+
+ // Set up the $lookup stage.
+ auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+
+ // Mock its input with a document without a "ns" field.
+ auto mockLocalSource = DocumentSourceMock::create(
+ Document{{"documentKey", Document{{"_id", 0}}},
+ {"operationType", "update"_sd},
+ {"ns", Document{{"db", "DIFFERENT"_sd}, {"coll", expCtx->ns.coll()}}}});
+
+ lookupChangeStage->setSource(mockLocalSource.get());
+
+ // Mock out the foreign collection.
+ lookupChangeStage->injectMongodInterface(
+ std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{}));
+
+ ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40579);
+}
+
+TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUnique) {
+ auto expCtx = getExpCtx();
+
+ // Set up the $lookup stage.
+ auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+
+ // Mock its input with an update document.
+ auto mockLocalSource = DocumentSourceMock::create(
+ Document{{"documentKey", Document{{"_id", 0}}},
+ {"operationType", "update"_sd},
+ {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}});
+
+ lookupChangeStage->setSource(mockLocalSource.get());
+
+ // Mock out the foreign collection to have two documents with the same document key.
+ deque<DocumentSource::GetNextResult> foreignCollection = {Document{{"_id", 0}},
+ Document{{"_id", 0}}};
+ lookupChangeStage->injectMongodInterface(
+ std::make_shared<MockMongodInterface>(std::move(foreignCollection)));
+
+ ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40580);
+}
+
+TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) {
+ auto expCtx = getExpCtx();
+
+ // Set up the $lookup stage.
+ auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
+
+ // Mock its input, pausing every other result.
+ auto mockLocalSource = DocumentSourceMock::create(
+ {Document{{"documentKey", Document{{"_id", 0}}},
+ {"operationType", "insert"_sd},
+ {"fullDocument", Document{{"_id", 0}}},
+ {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}},
+ DocumentSource::GetNextResult::makePauseExecution(),
+ Document{{"documentKey", Document{{"_id", 1}}},
+ {"operationType", "update"_sd},
+ {"fullDocument", BSONNULL},
+ {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}},
+ DocumentSource::GetNextResult::makePauseExecution()});
+
+ lookupChangeStage->setSource(mockLocalSource.get());
+
+ // Mock out the foreign collection.
+ deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}},
+ Document{{"_id", 1}}};
+ lookupChangeStage->injectMongodInterface(
+ std::make_shared<MockMongodInterface>(std::move(mockForeignContents)));
+
+ auto next = lookupChangeStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.releaseDocument(),
+ (Document{{"documentKey", Document{{"_id", 0}}},
+ {"operationType", "insert"_sd},
+ {"fullDocument", Document{{"_id", 0}}},
+ {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}));
+
+ ASSERT_TRUE(lookupChangeStage->getNext().isPaused());
+
+ next = lookupChangeStage->getNext();
+ ASSERT_TRUE(next.isAdvanced());
+ ASSERT_DOCUMENT_EQ(
+ next.releaseDocument(),
+ (Document{{"documentKey", Document{{"_id", 1}}},
+ {"operationType", "update"_sd},
+ {"fullDocument", Document{{"_id", 1}}},
+ {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}));
+
+ ASSERT_TRUE(lookupChangeStage->getNext().isPaused());
+
+ ASSERT_TRUE(lookupChangeStage->getNext().isEOF());
+ ASSERT_TRUE(lookupChangeStage->getNext().isEOF());
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.cpp b/src/mongo/db/pipeline/document_source_sort_by_count.cpp
index 48cfaa9f559..97c5e0f0297 100644
--- a/src/mongo/db/pipeline/document_source_sort_by_count.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_by_count.cpp
@@ -39,13 +39,13 @@
namespace mongo {
using boost::intrusive_ptr;
-using std::vector;
+using std::list;
REGISTER_MULTI_STAGE_ALIAS(sortByCount,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceSortByCount::createFromBson);
-vector<intrusive_ptr<DocumentSource>> DocumentSourceSortByCount::createFromBson(
+list<intrusive_ptr<DocumentSource>> DocumentSourceSortByCount::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
if (elem.type() == Object) {
// Make sure that the sortByCount field is an expression inside an object
diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.h b/src/mongo/db/pipeline/document_source_sort_by_count.h
index 69bf8d2c5e0..c8196c9b498 100644
--- a/src/mongo/db/pipeline/document_source_sort_by_count.h
+++ b/src/mongo/db/pipeline/document_source_sort_by_count.h
@@ -40,7 +40,7 @@ public:
/**
* Returns a $group stage followed by a $sort stage.
*/
- static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
+ static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
private:
diff --git a/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp b/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp
index 3e0a007c59b..12975e96149 100644
--- a/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp
+++ b/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp
@@ -46,8 +46,9 @@
namespace mongo {
namespace {
-using std::vector;
using boost::intrusive_ptr;
+using std::list;
+using std::vector;
/**
* Fixture to test that $sortByCount returns a DocumentSourceGroup and DocumentSourceSort.
@@ -55,15 +56,15 @@ using boost::intrusive_ptr;
class SortByCountReturnsGroupAndSort : public AggregationContextFixture {
public:
void testCreateFromBsonResult(BSONObj sortByCountSpec, Value expectedGroupExplain) {
- vector<intrusive_ptr<DocumentSource>> result =
+ list<intrusive_ptr<DocumentSource>> result =
DocumentSourceSortByCount::createFromBson(sortByCountSpec.firstElement(), getExpCtx());
ASSERT_EQUALS(result.size(), 2UL);
- const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result[0].get());
+ const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result.front().get());
ASSERT(groupStage);
- const auto* sortStage = dynamic_cast<DocumentSourceSort*>(result[1].get());
+ const auto* sortStage = dynamic_cast<DocumentSourceSort*>(result.back().get());
ASSERT(sortStage);
// Serialize the DocumentSourceGroup and DocumentSourceSort from $sortByCount so that we can
@@ -111,7 +112,7 @@ TEST_F(SortByCountReturnsGroupAndSort, ExpressionInObjectSpec) {
*/
class InvalidSortByCountSpec : public AggregationContextFixture {
public:
- vector<intrusive_ptr<DocumentSource>> createSortByCount(BSONObj sortByCountSpec) {
+ list<intrusive_ptr<DocumentSource>> createSortByCount(BSONObj sortByCountSpec) {
auto specElem = sortByCountSpec.firstElement();
return DocumentSourceSortByCount::createFromBson(specElem, getExpCtx());
}
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index d2a16a6ca87..f5f42d197c0 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -73,7 +73,7 @@ void ExpressionContext::setCollator(std::unique_ptr<CollatorInterface> coll) {
}
intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns) const {
- intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext();
+ intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(std::move(ns));
expCtx->explain = explain;
expCtx->inShard = inShard;
@@ -81,7 +81,6 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns)
expCtx->extSortAllowed = extSortAllowed;
expCtx->bypassDocumentValidation = bypassDocumentValidation;
- expCtx->ns = std::move(ns);
expCtx->tempDir = tempDir;
expCtx->opCtx = opCtx;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index d04b095aa4b..ba2690796f6 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -134,7 +134,9 @@ public:
protected:
static const int kInterruptCheckPeriod = 128;
- ExpressionContext() : variablesParseState(variables.useIdGenerator()) {}
+
+ ExpressionContext(NamespaceString nss)
+ : ns(std::move(nss)), variablesParseState(variables.useIdGenerator()) {}
/**
* Sets '_collator' and resets '_documentComparator' and '_valueComparator'.
diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h
index 234b54e1a10..542e5b91eea 100644
--- a/src/mongo/db/pipeline/expression_context_for_test.h
+++ b/src/mongo/db/pipeline/expression_context_for_test.h
@@ -40,7 +40,9 @@ namespace mongo {
*/
class ExpressionContextForTest : public ExpressionContext {
public:
- ExpressionContextForTest() = default;
+ ExpressionContextForTest() : ExpressionContext(NamespaceString{"test"_sd, "namespace"_sd}) {}
+
+ ExpressionContextForTest(NamespaceString nss) : ExpressionContext(std::move(nss)) {}
ExpressionContextForTest(OperationContext* opCtx, const AggregationRequest& request)
: ExpressionContext(opCtx, request, nullptr, {}) {}
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index 1f37b30fc90..1402f6f504d 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -37,7 +37,11 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_source_change_notification.h"
+#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
+#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/pipeline/document_source_project.h"
#include "mongo/db/pipeline/document_value_test_util.h"
#include "mongo/db/pipeline/expression_context_for_test.h"
#include "mongo/db/pipeline/field_path.h"
@@ -52,6 +56,8 @@ using boost::intrusive_ptr;
using std::string;
using std::vector;
+const NamespaceString kTestNss = NamespaceString("a.collection");
+
namespace Optimizations {
using namespace mongo;
@@ -77,7 +83,7 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson,
ASSERT_EQUALS(stageElem.type(), BSONType::Object);
rawPipeline.push_back(stageElem.embeddedObject());
}
- AggregationRequest request(NamespaceString("a.collection"), rawPipeline);
+ AggregationRequest request(kTestNss, rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx =
new ExpressionContextForTest(opCtx.get(), request);
@@ -962,6 +968,52 @@ TEST(PipelineOptimizationTest, MatchOnMaxLengthShouldMoveAcrossRename) {
assertPipelineOptimizesTo(inputPipe, outputPipe);
}
+TEST(PipelineOptimizationTest, ChangeNotificationLookupSwapsWithIndependentMatch) {
+ QueryTestServiceContext testServiceContext;
+ auto opCtx = testServiceContext.makeOperationContext();
+
+ intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
+ expCtx->opCtx = opCtx.get();
+
+ auto spec = BSON("$changeNotification" << BSON("fullDocument"
+ << "lookup"));
+ auto stages = DocumentSourceChangeNotification::createFromBson(spec.firstElement(), expCtx);
+ ASSERT_EQ(stages.size(), 3UL);
+ // Make sure the change lookup is at the end.
+ ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));
+
+ auto matchPredicate = BSON("extra"
+ << "predicate");
+ stages.push_back(DocumentSourceMatch::create(matchPredicate, expCtx));
+ auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx));
+ pipeline->optimizePipeline();
+
+ // Make sure the $match stage has swapped before the change look up.
+ ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(pipeline->getSources().back().get()));
+}
+
+TEST(PipelineOptimizationTest, ChangeNotificationLookupDoesNotSwapWithMatchOnPostImage) {
+ QueryTestServiceContext testServiceContext;
+ auto opCtx = testServiceContext.makeOperationContext();
+
+ intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss));
+ expCtx->opCtx = opCtx.get();
+
+ auto spec = BSON("$changeNotification" << BSON("fullDocument"
+ << "lookup"));
+ auto stages = DocumentSourceChangeNotification::createFromBson(spec.firstElement(), expCtx);
+ ASSERT_EQ(stages.size(), 3UL);
+ // Make sure the change lookup is at the end.
+ ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get()));
+
+ stages.push_back(DocumentSourceMatch::create(
+ BSON(DocumentSourceLookupChangePostImage::kFullDocumentFieldName << BSONNULL), expCtx));
+ auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx));
+ pipeline->optimizePipeline();
+
+ // Make sure the $match stage stays at the end.
+ ASSERT(dynamic_cast<DocumentSourceMatch*>(pipeline->getSources().back().get()));
+}
} // namespace Local
namespace Sharded {
@@ -986,7 +1038,7 @@ public:
ASSERT_EQUALS(stageElem.type(), BSONType::Object);
rawPipeline.push_back(stageElem.embeddedObject());
}
- AggregationRequest request(NamespaceString("a.collection"), rawPipeline);
+ AggregationRequest request(kTestNss, rawPipeline);
intrusive_ptr<ExpressionContextForTest> ctx =
new ExpressionContextForTest(&_opCtx, request);
@@ -1391,7 +1443,7 @@ TEST_F(PipelineInitialSourceNSTest, CollectionNSNotValidIfInitialStageIsCollecti
auto collectionlessSource = DocumentSourceCollectionlessMock::create();
auto ctx = getExpCtx();
- ctx->ns = NamespaceString("a.collection");
+ ctx->ns = kTestNss;
ASSERT_NOT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus());
}
diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h
index 9f7a357f677..04e396dc58d 100644
--- a/src/mongo/db/pipeline/value.h
+++ b/src/mongo/db/pipeline/value.h
@@ -117,6 +117,9 @@ public:
explicit Value(const MaxKeyLabeler&) : _storage(MaxKey) {} // MAXKEY
explicit Value(const Date_t& date) : _storage(Date, date.toMillisSinceEpoch()) {}
+ explicit Value(const char*) = delete; // Use StringData instead to prevent accidentally
+ // terminating the string at the first null byte.
+
// TODO: add an unsafe version that can share storage with the BSONElement
/// Deep-convert from BSONElement to Value
explicit Value(const BSONElement& elem);