diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-06-27 13:29:02 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-08-01 17:16:14 -0400 |
commit | ad30a49a33b8773cbc07388bb257d605cbd6aa12 (patch) | |
tree | 3707869546caa93ed42efd29cc8404c8f36f9e4a /src/mongo/db | |
parent | 2431e1356823d898ef8af16997d6f63b65b385a5 (diff) | |
download | mongo-ad30a49a33b8773cbc07388bb257d605cbd6aa12.tar.gz |
SERVER-29135 Add post-image lookup to $changeNotification
This patch only adds support on an unsharded collection.
Diffstat (limited to 'src/mongo/db')
23 files changed, 916 insertions, 158 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 0d67ccbb2df..093cbb5a5c9 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -135,7 +135,9 @@ env.CppUnitTest( 'document_source_geo_near_test.cpp', 'document_source_group_test.cpp', 'document_source_limit_test.cpp', + 'document_source_lookup_change_post_image_test.cpp', 'document_source_lookup_test.cpp', + 'document_source_graph_lookup_test.cpp', 'document_source_match_test.cpp', 'document_source_mock_test.cpp', 'document_source_project_test.cpp', @@ -231,7 +233,6 @@ docSourceEnv.Library( 'document_source_add_fields.cpp', 'document_source_bucket.cpp', 'document_source_bucket_auto.cpp', - 'document_source_change_notification.cpp', 'document_source_coll_stats.cpp', 'document_source_count.cpp', 'document_source_current_op.cpp', @@ -262,6 +263,7 @@ docSourceEnv.Library( '$BUILD_DIR/mongo/db/matcher/expression_algo', '$BUILD_DIR/mongo/db/matcher/expressions', '$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source', + '$BUILD_DIR/mongo/db/repl/oplog_entry', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/stats/top', '$BUILD_DIR/mongo/db/storage/encryption_hooks', @@ -309,8 +311,10 @@ env.Library( env.Library( target='document_source_lookup', source=[ + 'document_source_change_notification.cpp', 'document_source_graph_lookup.cpp', 'document_source_lookup.cpp', + 'document_source_lookup_change_post_image.cpp', ], LIBDEPS=[ 'document_source', @@ -318,20 +322,6 @@ env.Library( ], ) -env.CppUnitTest( - target='document_source_graph_lookup_test', - source='document_source_graph_lookup_test.cpp', - LIBDEPS=[ - 'document_source', - 'document_source_lookup', - 'document_value_test_util', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/query/query_test_service_context', - '$BUILD_DIR/mongo/db/service_context_noop_init', - '$BUILD_DIR/mongo/s/is_mongos', - ], -) - env.Library( target='document_source_facet', source=[ diff --git a/src/mongo/db/pipeline/document_source.cpp b/src/mongo/db/pipeline/document_source.cpp index 6e58adae2bf..a9d9f74181a 100644 --- a/src/mongo/db/pipeline/document_source.cpp +++ b/src/mongo/db/pipeline/document_source.cpp @@ -41,6 +41,7 @@ namespace mongo { using Parser = DocumentSource::Parser; using boost::intrusive_ptr; +using std::list; using std::string; using std::vector; @@ -60,7 +61,7 @@ void DocumentSource::registerParser(string name, Parser parser) { parserMap[name] = parser; } -vector<intrusive_ptr<DocumentSource>> DocumentSource::parse( +list<intrusive_ptr<DocumentSource>> DocumentSource::parse( const intrusive_ptr<ExpressionContext>& expCtx, BSONObj stageObj) { uassert(16435, "A pipeline stage specification object must contain exactly one field.", diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 2545f02d8de..383cd56e35f 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -85,7 +85,7 @@ class Document; MONGO_INITIALIZER(addToDocSourceParserMap_##key)(InitializerContext*) { \ auto fullParserWrapper = [](BSONElement stageSpec, \ const boost::intrusive_ptr<ExpressionContext>& expCtx) { \ - return std::vector<boost::intrusive_ptr<DocumentSource>>{ \ + return std::list<boost::intrusive_ptr<DocumentSource>>{ \ (fullParser)(stageSpec, expCtx)}; \ }; \ LiteParsedDocumentSource::registerParser("$" #key, liteParser); \ @@ -115,7 +115,7 @@ class Document; class DocumentSource : public IntrusiveCounterUnsigned { public: - using Parser = stdx::function<std::vector<boost::intrusive_ptr<DocumentSource>>( + using Parser = stdx::function<std::list<boost::intrusive_ptr<DocumentSource>>( BSONElement, const boost::intrusive_ptr<ExpressionContext>&)>; /** @@ -303,7 +303,7 @@ public: /** * Create a DocumentSource pipeline stage from 'stageObj'. */ - static std::vector<boost::intrusive_ptr<DocumentSource>> parse( + static std::list<boost::intrusive_ptr<DocumentSource>> parse( const boost::intrusive_ptr<ExpressionContext>& expCtx, BSONObj stageObj); /** diff --git a/src/mongo/db/pipeline/document_source_bucket.cpp b/src/mongo/db/pipeline/document_source_bucket.cpp index 853ef0aaa42..e209834b14d 100644 --- a/src/mongo/db/pipeline/document_source_bucket.cpp +++ b/src/mongo/db/pipeline/document_source_bucket.cpp @@ -37,6 +37,7 @@ namespace mongo { using boost::intrusive_ptr; using std::vector; +using std::list; REGISTER_MULTI_STAGE_ALIAS(bucket, LiteParsedDocumentSourceDefault::parse, @@ -52,7 +53,7 @@ intrusive_ptr<ExpressionConstant> getExpressionConstant( } } // namespace -vector<intrusive_ptr<DocumentSource>> DocumentSourceBucket::createFromBson( +list<intrusive_ptr<DocumentSource>> DocumentSourceBucket::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { uassert(40201, str::stream() << "Argument to $bucket stage must be an object, but found type: " diff --git a/src/mongo/db/pipeline/document_source_bucket.h b/src/mongo/db/pipeline/document_source_bucket.h index aa83ee07cc2..e059660fc99 100644 --- a/src/mongo/db/pipeline/document_source_bucket.h +++ b/src/mongo/db/pipeline/document_source_bucket.h @@ -40,7 +40,7 @@ public: /** * Returns a $group stage followed by a $sort stage. */ - static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( + static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); private: diff --git a/src/mongo/db/pipeline/document_source_bucket_test.cpp b/src/mongo/db/pipeline/document_source_bucket_test.cpp index 1a61934dcb5..26f18c4dca9 100644 --- a/src/mongo/db/pipeline/document_source_bucket_test.cpp +++ b/src/mongo/db/pipeline/document_source_bucket_test.cpp @@ -47,21 +47,22 @@ namespace mongo { namespace { -using std::vector; using boost::intrusive_ptr; +using std::list; +using std::vector; class BucketReturnsGroupAndSort : public AggregationContextFixture { public: void testCreateFromBsonResult(BSONObj bucketSpec, Value expectedGroupExplain) { - vector<intrusive_ptr<DocumentSource>> result = + list<intrusive_ptr<DocumentSource>> result = DocumentSourceBucket::createFromBson(bucketSpec.firstElement(), getExpCtx()); ASSERT_EQUALS(result.size(), 2UL); - const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result[0].get()); + const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result.front().get()); ASSERT(groupStage); - const auto* sortStage = dynamic_cast<DocumentSourceSort*>(result[1].get()); + const auto* sortStage = dynamic_cast<DocumentSourceSort*>(result.back().get()); ASSERT(sortStage); // Serialize the DocumentSourceGroup and DocumentSourceSort from $bucket so that we can @@ -154,7 +155,7 @@ TEST_F(BucketReturnsGroupAndSort, BucketSucceedsWithMultipleBoundaryValues) { class InvalidBucketSpec : public AggregationContextFixture { public: - vector<intrusive_ptr<DocumentSource>> createBucket(BSONObj bucketSpec) { + list<intrusive_ptr<DocumentSource>> createBucket(BSONObj bucketSpec) { auto sources = DocumentSourceBucket::createFromBson(bucketSpec.firstElement(), getExpCtx()); return sources; } @@ -267,14 +268,14 @@ TEST_F(InvalidBucketSpec, GroupFailsForBucketWithInvalidOutputField) { TEST_F(InvalidBucketSpec, SwitchFailsForBucketWhenNoDefaultSpecified) { const auto spec = fromjson("{$bucket : {groupBy : '$x', boundaries : [1, 2, 3]}}"); - vector<intrusive_ptr<DocumentSource>> bucketStages = createBucket(spec); + list<intrusive_ptr<DocumentSource>> bucketStages = createBucket(spec); ASSERT_EQUALS(bucketStages.size(), 2UL); - auto* groupStage = dynamic_cast<DocumentSourceGroup*>(bucketStages[0].get()); + auto* groupStage = dynamic_cast<DocumentSourceGroup*>(bucketStages.front().get()); ASSERT(groupStage); - const auto* sortStage = dynamic_cast<DocumentSourceSort*>(bucketStages[1].get()); + const auto* sortStage = dynamic_cast<DocumentSourceSort*>(bucketStages.back().get()); ASSERT(sortStage); auto doc = Document{{"x", 4}}; diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp index dff06f05802..d173d1183d5 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification.cpp @@ -30,30 +30,46 @@ #include "mongo/db/pipeline/document_source_change_notification.h" +#include "mongo/bson/simple_bsonelement_comparator.h" #include "mongo/db/pipeline/document_source_limit.h" +#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/oplog_entry_gen.h" #include "mongo/util/log.h" namespace mongo { using boost::intrusive_ptr; using boost::optional; -using std::vector; +using std::list; using std::string; +using std::vector; -constexpr StringData kNamespaceField = "ns"_sd; -constexpr StringData kTimestmapField = "ts"_sd; -constexpr StringData kOpTypeField = "op"_sd; -constexpr StringData kOField = "o"_sd; -constexpr StringData kIdField = "_id"_sd; - +// The $changeNotification stage is an alias for many stages, but we need to be able to serialize +// and re-parse the pipeline. To make this work, the 'transformation' stage will serialize itself +// with the original specification, and all other stages that are created during the alias expansion +// will not serialize themselves. REGISTER_MULTI_STAGE_ALIAS(changeNotification, DocumentSourceChangeNotification::LiteParsed::parse, DocumentSourceChangeNotification::createFromBson); +constexpr StringData DocumentSourceChangeNotification::kDocumentKeyField; +constexpr StringData DocumentSourceChangeNotification::kFullDocumentField; +constexpr StringData DocumentSourceChangeNotification::kIdField; +constexpr StringData DocumentSourceChangeNotification::kNamespaceField; +constexpr StringData DocumentSourceChangeNotification::kOperationTypeField; +constexpr StringData DocumentSourceChangeNotification::kStageName; +constexpr StringData DocumentSourceChangeNotification::kTimestmapField; +constexpr StringData DocumentSourceChangeNotification::kUpdateOpType; +constexpr StringData DocumentSourceChangeNotification::kDeleteOpType; +constexpr StringData DocumentSourceChangeNotification::kReplaceOpType; +constexpr StringData DocumentSourceChangeNotification::kInsertOpType; +constexpr StringData DocumentSourceChangeNotification::kInvalidateOpType; + namespace { static constexpr StringData kOplogMatchExplainName = "$_internalOplogMatch"_sd; @@ -127,7 +143,7 @@ BSONObj DocumentSourceChangeNotification::buildMatchFilter(const NamespaceString return BSON("ts" << GT << Timestamp() << "$or" << BSON_ARRAY(opMatch << commandMatch)); } -vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson( +list<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { // TODO: Add sharding support here (SERVER-29141). uassert(40470, @@ -137,17 +153,54 @@ vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFr "Only default collation is allowed when using a $changeNotification stage.", !expCtx->getCollator()); - BSONObj filter = buildMatchFilter(expCtx->ns); + uassert(40573, + str::stream() << "the $changeNotification stage must be specified as an object, got " + << typeName(elem.type()), + elem.type() == BSONType::Object); + + bool shouldLookupPostImage = false; + for (auto&& option : elem.embeddedObject()) { + auto optionName = option.fieldNameStringData(); + if (optionName == "fullDocument"_sd) { + uassert(40574, + str::stream() << "the 'fullDocument' option to the $changeNotification stage " + "must be a string, got " + << typeName(option.type()), + option.type() == BSONType::String); + auto fullDocOption = option.valueStringData(); + uassert(40575, + str::stream() << "unrecognized value for the 'fullDocument' option to the " + "$changeNotification stage. Expected \"none\" or " + "\"fullDocument\", got \"" + << option.String() + << "\"", + fullDocOption == "lookup"_sd || fullDocOption == "none"_sd); + shouldLookupPostImage = (fullDocOption == "lookup"_sd); + } else if (optionName == "resumeAfter"_sd) { + uasserted( + 40576, + "the 'resumeAfter' option to the $changeNotification stage is not yet supported"); + } else { + uasserted(40577, + str::stream() << "unrecognized option to $changeNotification stage: \"" + << optionName + << "\""); + } + } - auto oplogMatch = DocumentSourceOplogMatch::create(filter, expCtx); - auto transformation = createTransformationStage(expCtx); - return {oplogMatch, transformation}; + auto oplogMatch = DocumentSourceOplogMatch::create(buildMatchFilter(expCtx->ns), expCtx); + auto transformation = createTransformationStage(elem.embeddedObject(), expCtx); + list<intrusive_ptr<DocumentSource>> stages = {oplogMatch, transformation}; + if (shouldLookupPostImage) { + stages.push_back(DocumentSourceLookupChangePostImage::create(expCtx)); + } + return stages; } intrusive_ptr<DocumentSource> DocumentSourceChangeNotification::createTransformationStage( - const intrusive_ptr<ExpressionContext>& expCtx) { + BSONObj changeNotificationSpec, const intrusive_ptr<ExpressionContext>& expCtx) { return intrusive_ptr<DocumentSource>(new DocumentSourceSingleDocumentTransformation( - expCtx, stdx::make_unique<Transformation>(), "$changeNotification")); + expCtx, stdx::make_unique<Transformation>(changeNotificationSpec), kStageName.toString())); } namespace { @@ -167,89 +220,103 @@ Document DocumentSourceChangeNotification::Transformation::applyTransformation( MutableDocument doc; // Extract the fields we need. - checkValueType(input[kOpTypeField], kOpTypeField, BSONType::String); - string op = input[kOpTypeField].getString(); - Value ts = input[kTimestmapField]; - Value ns = input[kNamespaceField]; - checkValueType(ns, kNamespaceField, BSONType::String); + checkValueType(input[repl::OplogEntry::kOpTypeFieldName], + repl::OplogEntry::kOpTypeFieldName, + BSONType::String); + string op = input[repl::OplogEntry::kOpTypeFieldName].getString(); + Value ts = input[repl::OplogEntry::kTimestampFieldName]; + Value ns = input[repl::OplogEntry::kNamespaceFieldName]; + checkValueType(ns, repl::OplogEntry::kNamespaceFieldName, BSONType::String); NamespaceString nss(ns.getString()); Value id = input.getNestedField("o._id"); // Non-replace updates have the _id in field "o2". Value documentId = id.missing() ? input.getNestedField("o2._id") : id; - string operationType; - Value newDocument; + StringData operationType; + Value fullDocument = Value(BSONNULL); Value updateDescription; // Deal with CRUD operations and commands. - if (op == "i") { - operationType = "insert"; - newDocument = input[kOField]; - } else if (op == "d") { - operationType = "delete"; - } else if (op == "u") { - if (id.missing()) { - operationType = "update"; - checkValueType(input[kOField], kOField, BSONType::Object); - Document o = input[kOField].getDocument(); - Value updatedFields = o["$set"]; - Value removedFields = o["$unset"]; - - // Extract the field names of $unset document. - vector<Value> removedFieldsVector; - if (removedFields.getType() == BSONType::Object) { - auto iter = removedFields.getDocument().fieldIterator(); - while (iter.more()) { - removedFieldsVector.push_back(Value(iter.next().first)); + auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeNotificationEntry.op"), op); + switch (opType) { + case repl::OpTypeEnum::kInsert: { + operationType = kInsertOpType; + fullDocument = input[repl::OplogEntry::kObjectFieldName]; + break; + } + case repl::OpTypeEnum::kDelete: { + operationType = kDeleteOpType; + break; + } + case repl::OpTypeEnum::kUpdate: { + if (id.missing()) { + operationType = kUpdateOpType; + checkValueType(input[repl::OplogEntry::kObjectFieldName], + repl::OplogEntry::kObjectFieldName, + BSONType::Object); + Document opObject = input[repl::OplogEntry::kObjectFieldName].getDocument(); + Value updatedFields = opObject["$set"]; + Value removedFields = opObject["$unset"]; + + // Extract the field names of $unset document. + vector<Value> removedFieldsVector; + if (removedFields.getType() == BSONType::Object) { + auto iter = removedFields.getDocument().fieldIterator(); + while (iter.more()) { + removedFieldsVector.push_back(Value(iter.next().first)); + } } + updateDescription = Value(Document{ + {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields}, + {"removedFields", removedFieldsVector}}); + } else { + operationType = kReplaceOpType; + fullDocument = input[repl::OplogEntry::kObjectFieldName]; } - updateDescription = Value(Document{ - {"updatedFields", updatedFields.missing() ? Value(Document()) : updatedFields}, - {"removedFields", removedFieldsVector}}); - } else { - operationType = "replace"; - newDocument = input[kOField]; + break; + } + case repl::OpTypeEnum::kCommand: { + operationType = kInvalidateOpType; + // Make sure the result doesn't have a document id. + documentId = Value(); + break; } - } else if (op == "c") { - operationType = "invalidate"; - // Make sure the result doesn't have a document id. - documentId = Value(); + default: { MONGO_UNREACHABLE; } } - // Construct the result document. If document id is missing, it will not appear in the output. + // Construct the result document. Note that 'documentId' might be the missing value, in which + // case it will not appear in the output. doc.addField( kIdField, Value(Document{{kTimestmapField, ts}, {kNamespaceField, ns}, {kIdField, documentId}})); - doc.addField("operationType", Value(operationType)); + doc.addField(kOperationTypeField, Value(operationType)); + doc.addField(kFullDocumentField, fullDocument); // "invalidate" entry has fewer fields. - if (op == "c") { + if (opType == repl::OpTypeEnum::kCommand) { return doc.freeze(); } - // Add fields for normal operations. doc.addField(kNamespaceField, Value(Document{{"db", nss.db()}, {"coll", nss.coll()}})); - doc.addField("documentKey", Value(Document{{kIdField, documentId}})); + doc.addField(kDocumentKeyField, Value(Document{{kIdField, documentId}})); - // If newDocument or updateDescription is missing, it will not be serialized. - doc.addField("newDocument", newDocument); + // Note that 'updateDescription' might be the 'missing' value, in which case it will not be + // serialized. doc.addField("updateDescription", updateDescription); return doc.freeze(); } Document DocumentSourceChangeNotification::Transformation::serializeStageOptions( boost::optional<ExplainOptions::Verbosity> explain) const { - // TODO SERVER-29135 Be sure to re-serialize the 'postImage' argument. - // TODO SERVER-29131 Be sure to re-serialize the 'resumeAfter' argument. - return Document(); + return Document(_changeNotificationSpec); } DocumentSource::GetDepsReturn DocumentSourceChangeNotification::Transformation::addDependencies( DepsTracker* deps) const { - deps->fields.insert(kOpTypeField.toString()); - deps->fields.insert(kTimestmapField.toString()); - deps->fields.insert(kNamespaceField.toString()); - deps->fields.insert(kOField.toString()); - deps->fields.insert("o2"); + deps->fields.insert(repl::OplogEntry::kOpTypeFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kTimestampFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kNamespaceFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kObjectFieldName.toString()); + deps->fields.insert(repl::OplogEntry::kObject2FieldName.toString()); return DocumentSource::GetDepsReturn::EXHAUSTIVE_ALL; } diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h index 3b38a908626..ad10a5ad210 100644 --- a/src/mongo/db/pipeline/document_source_change_notification.h +++ b/src/mongo/db/pipeline/document_source_change_notification.h @@ -51,6 +51,8 @@ public: } stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { + // TODO SERVER-29138: we need to communicate that this stage will need to look up + // documents from different collections. return stdx::unordered_set<NamespaceString>(); } @@ -62,6 +64,8 @@ public: class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface { public: + Transformation(BSONObj changeNotificationSpec) + : _changeNotificationSpec(changeNotificationSpec.getOwned()) {} ~Transformation() = default; Document applyTransformation(const Document& input) final; TransformerType getType() const final { @@ -72,10 +76,48 @@ public: boost::optional<ExplainOptions::Verbosity> explain) const final; DocumentSource::GetDepsReturn addDependencies(DepsTracker* deps) const final; DocumentSource::GetModPathsReturn getModifiedPaths() const final; + + private: + BSONObj _changeNotificationSpec; }; + // The name of the field where the document key (_id and shard key, if present) will be found + // after the transformation. + static constexpr StringData kDocumentKeyField = "documentKey"_sd; + + // The name of the field where the full document will be found after the transformation. The + // full document is only present for certain types of operations, such as an insert. + static constexpr StringData kFullDocumentField = "fullDocument"_sd; + + // The name of the field where the change identifier will be located after the transformation. + static constexpr StringData kIdField = "_id"_sd; + + // The name of the field where the namespace of the change will be located after the + // transformation. + static constexpr StringData kNamespaceField = "ns"_sd; + + // The name of the field where the type of the operation will be located after the + // transformation. + static constexpr StringData kOperationTypeField = "operationType"_sd; + + // The name of this stage. + static constexpr StringData kStageName = "$changeNotification"_sd; + + // The name of the field where the timestamp of the change will be located after the + // transformation. The timestamp will be located inside the change identifier, so the full path + // to the timestamp will be kIdField + "." + kTimestampField. + static constexpr StringData kTimestmapField = "ts"_sd; + + // The different types of operations we can use for the operation type. + static constexpr StringData kUpdateOpType = "update"_sd; + static constexpr StringData kDeleteOpType = "delete"_sd; + static constexpr StringData kReplaceOpType = "replace"_sd; + static constexpr StringData kInsertOpType = "insert"_sd; + static constexpr StringData kInvalidateOpType = "invalidate"_sd; + /** - * Produce the BSON for the $match stage based on a $changeNotification stage. + * Produce the BSON object representing the filter for the $match stage to filter oplog entries + * to only those relevant for this $changeNotification stage. */ static BSONObj buildMatchFilter(const NamespaceString& nss); @@ -83,11 +125,11 @@ public: * Parses a $changeNotification stage from 'elem' and produces the $match and transformation * stages required. */ - static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( + static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); static boost::intrusive_ptr<DocumentSource> createTransformationStage( - const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + BSONObj changeNotificationSpec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); private: // It is illegal to construct a DocumentSourceChangeNotification directly, use createFromBson() diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp index b3595789577..9c08bd91637 100644 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_notification_test.cpp @@ -49,14 +49,18 @@ namespace mongo { namespace { -using std::vector; -using std::string; using boost::intrusive_ptr; -using repl::OplogEntry; using repl::OpTypeEnum; +using repl::OplogEntry; +using std::list; +using std::string; +using std::vector; + using D = Document; using V = Value; +using DSChangeNotification = DocumentSourceChangeNotification; + static const Timestamp ts(100, 1); static const repl::OpTime optime(ts, 1); static const NamespaceString nss("unittests.change_notification"); @@ -67,10 +71,10 @@ public: void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) { const auto spec = fromjson("{$changeNotification: {}}"); - vector<intrusive_ptr<DocumentSource>> result = - DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); + list<intrusive_ptr<DocumentSource>> result = + DSChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); - auto match = dynamic_cast<DocumentSourceMatch*>(result[0].get()); + auto match = dynamic_cast<DocumentSourceMatch*>(result.front().get()); ASSERT(match); auto mock = DocumentSourceMock::create(D(entry.toBSON())); match->setSource(mock.get()); @@ -78,7 +82,7 @@ public: // Check the oplog entry is transformed correctly. auto transform = result.back().get(); ASSERT(transform); - ASSERT_EQ(string(transform->getSourceName()), "$changeNotification"); + ASSERT_EQ(string(transform->getSourceName()), DSChangeNotification::kStageName); transform->setSource(match); auto next = transform->getNext(); @@ -94,16 +98,62 @@ public: } }; +TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedOption) { + auto expCtx = getExpCtx(); + + ASSERT_THROWS_CODE( + DSChangeNotification::createFromBson( + BSON(DSChangeNotification::kStageName << BSON("unexpected" << 4)).firstElement(), + expCtx), + UserException, + 40577); +} + +TEST_F(ChangeNotificationStageTest, ShouldRejectResumeAfterOption) { + // TODO SERVER-29131 change this test to accept the option. + auto expCtx = getExpCtx(); + + ASSERT_THROWS_CODE( + DSChangeNotification::createFromBson( + BSON(DSChangeNotification::kStageName << BSON("resumeAfter" << ts)).firstElement(), + expCtx), + UserException, + 40576); +} + +TEST_F(ChangeNotificationStageTest, ShouldRejectNonStringFullDocumentOption) { + auto expCtx = getExpCtx(); + + ASSERT_THROWS_CODE( + DSChangeNotification::createFromBson( + BSON(DSChangeNotification::kStageName << BSON("fullDocument" << true)).firstElement(), + expCtx), + UserException, + 40574); +} + +TEST_F(ChangeNotificationStageTest, ShouldRejectUnrecognizedFullDocumentOption) { + auto expCtx = getExpCtx(); + + ASSERT_THROWS_CODE(DSChangeNotification::createFromBson( + BSON(DSChangeNotification::kStageName << BSON("fullDocument" + << "unrecognized")) + .firstElement(), + expCtx), + UserException, + 40575); +} + TEST_F(ChangeNotificationStageTest, StagesGeneratedCorrectly) { const auto spec = fromjson("{$changeNotification: {}}"); - vector<intrusive_ptr<DocumentSource>> result = - DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); + list<intrusive_ptr<DocumentSource>> result = + DSChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); ASSERT_EQUALS(result.size(), 2UL); - ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result[0].get())); - ASSERT_EQUALS(string(result[0]->getSourceName()), "$changeNotification"); - ASSERT_EQUALS(string(result[1]->getSourceName()), "$changeNotification"); + ASSERT_TRUE(dynamic_cast<DocumentSourceMatch*>(result.front().get())); + ASSERT_EQUALS(string(result.front()->getSourceName()), DSChangeNotification::kStageName); + ASSERT_EQUALS(string(result.back()->getSourceName()), DSChangeNotification::kStageName); // TODO: Check explain result. } @@ -112,11 +162,11 @@ TEST_F(ChangeNotificationStageTest, TransformInsert) { OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 1)); // Insert Document expectedInsert{ - {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, - {"operationType", "insert"_sd}, - {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {"documentKey", D{{"_id", 1}}}, - {"newDocument", D{{"_id", 1}, {"x", 1}}}, + {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, + {DSChangeNotification::kOperationTypeField, DSChangeNotification::kInsertOpType}, + {DSChangeNotification::kFullDocumentField, D{{"_id", 1}, {"x", 1}}}, + {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}}, }; checkTransformation(insert, expectedInsert); } @@ -126,13 +176,15 @@ TEST_F(ChangeNotificationStageTest, TransformUpdateFields) { optime, 1, OpTypeEnum::kUpdate, nss, BSON("$set" << BSON("y" << 1)), BSON("_id" << 1)); // Update fields Document expectedUpdateField{ - {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, - {"operationType", "update"_sd}, - {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {"documentKey", D{{"_id", 1}}}, + {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, + {DSChangeNotification::kOperationTypeField, DSChangeNotification::kUpdateOpType}, + {DSChangeNotification::kFullDocumentField, BSONNULL}, + {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}}, { "updateDescription", D{{"updatedFields", D{{"y", 1}}}, {"removedFields", vector<V>()}}, - }}; + }, + }; checkTransformation(updateField, expectedUpdateField); } @@ -141,10 +193,11 @@ TEST_F(ChangeNotificationStageTest, TransformRemoveFields) { optime, 1, OpTypeEnum::kUpdate, nss, BSON("$unset" << BSON("y" << 1)), BSON("_id" << 1)); // Remove fields Document expectedRemoveField{ - {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, - {"operationType", "update"_sd}, - {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {"documentKey", D{{"_id", 1}}}, + {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, + {DSChangeNotification::kOperationTypeField, DSChangeNotification::kUpdateOpType}, + {DSChangeNotification::kFullDocumentField, BSONNULL}, + {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}}, { "updateDescription", D{{"updatedFields", D{}}, {"removedFields", vector<V>{V("y"_sd)}}}, }}; @@ -156,11 +209,11 @@ TEST_F(ChangeNotificationStageTest, TransformReplace) { optime, 1, OpTypeEnum::kUpdate, nss, BSON("_id" << 1 << "y" << 1), BSON("_id" << 1)); // Replace Document expectedReplace{ - {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, - {"operationType", "replace"_sd}, - {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {"documentKey", D{{"_id", 1}}}, - {"newDocument", D{{"_id", 1}, {"y", 1}}}, + {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, + {DSChangeNotification::kOperationTypeField, DSChangeNotification::kReplaceOpType}, + {DSChangeNotification::kFullDocumentField, D{{"_id", 1}, {"y", 1}}}, + {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}}, }; checkTransformation(replace, expectedReplace); } @@ -169,10 +222,11 @@ TEST_F(ChangeNotificationStageTest, TransformDelete) { OplogEntry deleteEntry(optime, 1, OpTypeEnum::kDelete, nss, BSON("_id" << 1)); // Delete Document expectedDelete{ - {"_id", D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, - {"operationType", "delete"_sd}, - {"ns", D{{"db", nss.db()}, {"coll", nss.coll()}}}, - {"documentKey", D{{"_id", 1}}}, + {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.ns()}, {"_id", 1}}}, + {DSChangeNotification::kOperationTypeField, DSChangeNotification::kDeleteOpType}, + {DSChangeNotification::kFullDocumentField, BSONNULL}, + {DSChangeNotification::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeNotification::kDocumentKeyField, D{{"_id", 1}}}, }; checkTransformation(deleteEntry, expectedDelete); } @@ -187,7 +241,9 @@ TEST_F(ChangeNotificationStageTest, TransformInvalidate) { // Invalidate entry includes $cmd namespace in _id and doesn't have a document id. Document expectedInvalidate{ - {"_id", D{{"ts", ts}, {"ns", nss.getCommandNS().ns()}}}, {"operationType", "invalidate"_sd}, + {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", nss.getCommandNS().ns()}}}, + {DSChangeNotification::kOperationTypeField, DSChangeNotification::kInvalidateOpType}, + {DSChangeNotification::kFullDocumentField, BSONNULL}, }; for (auto& entry : {dropColl, dropDB, rename}) { checkTransformation(entry, expectedInvalidate); @@ -203,8 +259,9 @@ TEST_F(ChangeNotificationStageTest, TransformInvalidateRenameDropTarget) { otherColl.getCommandNS(), BSON("renameCollection" << otherColl.ns() << "to" << nss.ns())); Document expectedInvalidate{ - {"_id", D{{"ts", ts}, {"ns", otherColl.getCommandNS().ns()}}}, - {"operationType", "invalidate"_sd}, + {DSChangeNotification::kIdField, D{{"ts", ts}, {"ns", otherColl.getCommandNS().ns()}}}, + {DSChangeNotification::kOperationTypeField, DSChangeNotification::kInvalidateOpType}, + {DSChangeNotification::kFullDocumentField, BSONNULL}, }; checkTransformation(rename, expectedInvalidate); } @@ -230,5 +287,38 @@ TEST_F(ChangeNotificationStageTest, MatchFiltersCreateIndex) { checkTransformation(createIndex, boost::none); } +TEST_F(ChangeNotificationStageTest, TransformationShouldBeAbleToReParseSerializedStage) { + auto expCtx = getExpCtx(); + + auto originalSpec = BSON(DSChangeNotification::kStageName << BSONObj()); + auto allStages = DSChangeNotification::createFromBson(originalSpec.firstElement(), expCtx); + ASSERT_EQ(allStages.size(), 2UL); + auto stage = allStages.back(); + ASSERT(dynamic_cast<DocumentSourceSingleDocumentTransformation*>(stage.get())); + + // + // Serialize the stage and confirm contents. + // + vector<Value> serialization; + stage->serializeToArray(serialization); + ASSERT_EQ(serialization.size(), 1UL); + ASSERT_EQ(serialization[0].getType(), BSONType::Object); + auto serializedDoc = serialization[0].getDocument(); + ASSERT_BSONOBJ_EQ(serializedDoc.toBson(), originalSpec); + + // + // Create a new stage from the serialization. Serialize the new stage and confirm that it is + // equivalent to the original serialization. + // + auto serializedBson = serializedDoc.toBson(); + auto roundTripped = uassertStatusOK(Pipeline::create( + DSChangeNotification::createFromBson(serializedBson.firstElement(), expCtx), expCtx)); + + auto newSerialization = roundTripped->serialize(); + + ASSERT_EQ(newSerialization.size(), 1UL); + ASSERT_VALUE_EQ(newSerialization[0], serialization[0]); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_count.cpp b/src/mongo/db/pipeline/document_source_count.cpp index 01c4044fece..0a2d299200b 100644 --- a/src/mongo/db/pipeline/document_source_count.cpp +++ b/src/mongo/db/pipeline/document_source_count.cpp @@ -39,14 +39,14 @@ namespace mongo { using boost::intrusive_ptr; -using std::vector; +using std::list; using std::string; REGISTER_MULTI_STAGE_ALIAS(count, LiteParsedDocumentSourceDefault::parse, DocumentSourceCount::createFromBson); -vector<intrusive_ptr<DocumentSource>> DocumentSourceCount::createFromBson( +list<intrusive_ptr<DocumentSource>> DocumentSourceCount::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { uassert(40156, str::stream() << "the count field must be a non-empty string", diff --git a/src/mongo/db/pipeline/document_source_count.h b/src/mongo/db/pipeline/document_source_count.h index 50fbccdf41b..e77ce3eb0ed 100644 --- a/src/mongo/db/pipeline/document_source_count.h +++ b/src/mongo/db/pipeline/document_source_count.h @@ -40,7 +40,7 @@ public: /** * Returns a $group stage followed by a $project stage. */ - static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( + static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); private: diff --git a/src/mongo/db/pipeline/document_source_count_test.cpp b/src/mongo/db/pipeline/document_source_count_test.cpp index e083399b0fa..254f011ff1d 100644 --- a/src/mongo/db/pipeline/document_source_count_test.cpp +++ b/src/mongo/db/pipeline/document_source_count_test.cpp @@ -45,22 +45,23 @@ namespace mongo { namespace { -using std::vector; using boost::intrusive_ptr; +using std::list; +using std::vector; class CountReturnsGroupAndProjectStages : public AggregationContextFixture { public: void testCreateFromBsonResult(BSONObj countSpec) { - vector<intrusive_ptr<DocumentSource>> result = + list<intrusive_ptr<DocumentSource>> result = DocumentSourceCount::createFromBson(countSpec.firstElement(), getExpCtx()); ASSERT_EQUALS(result.size(), 2UL); - const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result[0].get()); + const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result.front().get()); ASSERT(groupStage); const auto* projectStage = - dynamic_cast<DocumentSourceSingleDocumentTransformation*>(result[1].get()); + dynamic_cast<DocumentSourceSingleDocumentTransformation*>(result.back().get()); ASSERT(projectStage); auto explain = ExplainOptions::Verbosity::kQueryPlanner; @@ -94,7 +95,7 @@ TEST_F(CountReturnsGroupAndProjectStages, ValidStringSpec) { class InvalidCountSpec : public AggregationContextFixture { public: - vector<intrusive_ptr<DocumentSource>> createCount(BSONObj countSpec) { + list<intrusive_ptr<DocumentSource>> createCount(BSONObj countSpec) { auto specElem = countSpec.firstElement(); return DocumentSourceCount::createFromBson(specElem, getExpCtx()); } diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp new file mode 100644 index 00000000000..ee605710a54 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -0,0 +1,125 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" + +#include "mongo/bson/simple_bsonelement_comparator.h" + +namespace mongo { + +constexpr StringData DocumentSourceLookupChangePostImage::kStageName; +constexpr StringData DocumentSourceLookupChangePostImage::kFullDocumentFieldName; + +namespace { +Value assertFieldHasType(const Document& fullDoc, StringData fieldName, BSONType expectedType) { + auto val = fullDoc[fieldName]; + uassert(40578, + str::stream() << "failed to look up post image after change: expected \"" << fieldName + << "\" field to have type " + << typeName(expectedType) + << ", instead found type " + << typeName(val.getType()) + << ": " + << val.toString() + << ", full object: " + << fullDoc.toString(), + val.getType() == expectedType); + return val; +} +} // namespace + +DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::getNext() { + pExpCtx->checkForInterrupt(); + + auto input = pSource->getNext(); + if (!input.isAdvanced()) { + return input; + } + auto opTypeVal = assertFieldHasType(input.getDocument(), + DocumentSourceChangeNotification::kOperationTypeField, + BSONType::String); + if (opTypeVal.getString() != DocumentSourceChangeNotification::kUpdateOpType) { + return input; + } + + MutableDocument output(input.releaseDocument()); + output[kFullDocumentFieldName] = lookupPostImage(output.peek()); + return output.freeze(); +} + +NamespaceString DocumentSourceLookupChangePostImage::assertNamespaceMatches( + const Document& inputDoc) const { + auto namespaceObject = assertFieldHasType(inputDoc, + DocumentSourceChangeNotification::kNamespaceField, + BSONType::Object) + .getDocument(); + auto dbName = assertFieldHasType(namespaceObject, "db"_sd, BSONType::String); + auto collectionName = assertFieldHasType(namespaceObject, "coll"_sd, BSONType::String); + NamespaceString nss(dbName.getString(), collectionName.getString()); + uassert(40579, + str::stream() << "unexpected namespace during post image lookup: " << nss.ns() + << ", expected " + << pExpCtx->ns.ns(), + nss == pExpCtx->ns); + return nss; +} + +Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updateOp) const { + // Make sure we have a well-formed input. + auto nss = assertNamespaceMatches(updateOp); + + auto documentKey = assertFieldHasType( + updateOp, DocumentSourceChangeNotification::kDocumentKeyField, BSONType::Object); + auto matchSpec = BSON("$match" << documentKey); + + // TODO SERVER-29134 we need to extract the namespace from the document and set them on the new + // ExpressionContext if we're getting notifications from an entire database. + auto foreignExpCtx = pExpCtx->copyWith(nss); + auto pipeline = uassertStatusOK(_mongod->makePipeline({matchSpec}, foreignExpCtx)); + + if (auto first = pipeline->getNext()) { + auto lookedUpDocument = Value(*first); + if (auto next = pipeline->getNext()) { + uasserted(40580, + str::stream() << "found more than document with documentKey " + << documentKey.toString() + << " while looking up post image after change: [" + << lookedUpDocument.toString() + << ", " + << next->toString() + << "]"); + } + return lookedUpDocument; + } + // We couldn't find it with the documentKey, it may have been deleted. + return Value(BSONNULL); +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h new file mode 100644 index 00000000000..ca14656a63c --- /dev/null +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -0,0 +1,115 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_notification.h" + +namespace mongo { + +/** + * Part of the change notification API machinery used to look up the post-image of a document. Uses + * the "documentKey" field of the input to look up the new version of the document. + * + * Uses the ExpressionContext to determine what collection to look up into. + * TODO SERVER-29134 When we allow change streams on multiple collections, this will need to change. + */ +class DocumentSourceLookupChangePostImage final : public DocumentSourceNeedsMongod { +public: + static constexpr StringData kStageName = "$_internalLookupChangePostImage"_sd; + static constexpr StringData kFullDocumentFieldName = + DocumentSourceChangeNotification::kFullDocumentField; + + /** + * Creates a DocumentSourceLookupChangePostImage stage. + */ + static boost::intrusive_ptr<DocumentSourceLookupChangePostImage> create( + const boost::intrusive_ptr<ExpressionContext>& expCtx) { + return new DocumentSourceLookupChangePostImage(expCtx); + } + + /** + * Only modifies a single path: "fullDocument". + */ + GetModPathsReturn getModifiedPaths() const final { + return {GetModPathsReturn::Type::kFiniteSet, {kFullDocumentFieldName.toString()}, {}}; + } + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.canSwapWithMatch = true; + constraints.isAllowedInsideFacetStage = false; + return constraints; + } + + GetDepsReturn getDependencies(DepsTracker* deps) const { + // The namespace is not technically needed yet, but we will if there is more than one + // collection involved. + deps->fields.insert(DocumentSourceChangeNotification::kNamespaceField.toString()); + deps->fields.insert(DocumentSourceChangeNotification::kDocumentKeyField.toString()); + deps->fields.insert(DocumentSourceChangeNotification::kOperationTypeField.toString()); + // This stage does not restrict the output fields to a finite set, and has no impact on + // whether metadata is available or needed. + return SEE_NEXT; + } + + /** + * Performs the lookup to retrieve the full document. + */ + GetNextResult getNext() final; + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + if (explain) { + return Value{Document{{kStageName, Document()}}}; + } + return Value(); // Do not serialize this stage unless we're explaining. + } + + const char* getSourceName() const final { + return kStageName.rawData(); + } + +private: + DocumentSourceLookupChangePostImage(const boost::intrusive_ptr<ExpressionContext>& expCtx) + : DocumentSourceNeedsMongod(expCtx) {} + + /** + * Uses the "documentKey" field from 'updateOp' to look up the current version of the document. + * Returns Value(BSONNULL) if the document couldn't be found. + */ + Value lookupPostImage(const Document& updateOp) const; + + /** + * Throws a UserException if the namespace found in 'inputDoc' doesn't match the one on the + * ExpressionContext. + */ + NamespaceString assertNamespaceMatches(const Document& inputDoc) const; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp new file mode 100644 index 00000000000..728db433c55 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -0,0 +1,266 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <boost/intrusive_ptr.hpp> +#include <deque> +#include <vector> + +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/aggregation_context_fixture.h" +#include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source_change_notification.h" +#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" +#include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/field_path.h" +#include "mongo/db/pipeline/stub_mongod_interface.h" +#include "mongo/db/pipeline/value.h" + +namespace mongo { +namespace { +using boost::intrusive_ptr; +using std::deque; +using std::vector; + +// This provides access to getExpCtx(), but we'll use a different name for this test suite. +using DocumentSourceLookupChangePostImageTest = AggregationContextFixture; + +/** + * A mock MongodInterface which allows mocking a foreign pipeline. + */ +class MockMongodInterface final : public StubMongodInterface { +public: + MockMongodInterface(deque<DocumentSource::GetNextResult> mockResults) + : _mockResults(std::move(mockResults)) {} + + bool isSharded(const NamespaceString& ns) final { + return false; + } + + StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx) final { + auto pipeline = Pipeline::parse(rawPipeline, expCtx); + if (!pipeline.isOK()) { + return pipeline.getStatus(); + } + + pipeline.getValue()->addInitialSource(DocumentSourceMock::create(_mockResults)); + pipeline.getValue()->optimizePipeline(); + + return pipeline; + } + +private: + deque<DocumentSource::GetNextResult> _mockResults; +}; + +TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) { + auto expCtx = getExpCtx(); + + // Set up the $lookup stage. + auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + + // Mock its input with a document without a "documentKey" field. + auto mockLocalSource = DocumentSourceMock::create( + Document{{"operationType", "update"_sd}, + {"fullDocument", Document{{"_id", 0}}}, + {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}); + + lookupChangeStage->setSource(mockLocalSource.get()); + + // Mock out the foreign collection. + lookupChangeStage->injectMongodInterface( + std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + + ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40578); +} + +TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationType) { + auto expCtx = getExpCtx(); + + // Set up the $lookup stage. + auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + + // Mock its input with a document without a "ns" field. + auto mockLocalSource = DocumentSourceMock::create( + Document{{"documentKey", Document{{"_id", 0}}}, + {"fullDocument", Document{{"_id", 0}}}, + {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}); + + lookupChangeStage->setSource(mockLocalSource.get()); + + // Mock out the foreign collection. + lookupChangeStage->injectMongodInterface( + std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + + ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40578); +} + +TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) { + auto expCtx = getExpCtx(); + + // Set up the $lookup stage. + auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + + // Mock its input with a document without a "ns" field. + auto mockLocalSource = DocumentSourceMock::create(Document{ + {"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd}, + }); + + lookupChangeStage->setSource(mockLocalSource.get()); + + // Mock out the foreign collection. + lookupChangeStage->injectMongodInterface( + std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + + ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40578); +} + +TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType) { + auto expCtx = getExpCtx(); + + // Set up the $lookup stage. + auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + + // Mock its input with a document without a "ns" field. + auto mockLocalSource = DocumentSourceMock::create( + Document{{"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd}, {"ns", 4}}); + + lookupChangeStage->setSource(mockLocalSource.get()); + + // Mock out the foreign collection. + lookupChangeStage->injectMongodInterface( + std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + + ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40578); +} + +TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatchPipeline) { + auto expCtx = getExpCtx(); + + // Set up the $lookup stage. + auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + + // Mock its input with a document without a "ns" field. + auto mockLocalSource = DocumentSourceMock::create( + Document{{"documentKey", Document{{"_id", 0}}}, + {"operationType", "update"_sd}, + {"ns", Document{{"db", "DIFFERENT"_sd}, {"coll", expCtx->ns.coll()}}}}); + + lookupChangeStage->setSource(mockLocalSource.get()); + + // Mock out the foreign collection. + lookupChangeStage->injectMongodInterface( + std::make_shared<MockMongodInterface>(deque<DocumentSource::GetNextResult>{})); + + ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40579); +} + +TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUnique) { + auto expCtx = getExpCtx(); + + // Set up the $lookup stage. + auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + + // Mock its input with an update document. + auto mockLocalSource = DocumentSourceMock::create( + Document{{"documentKey", Document{{"_id", 0}}}, + {"operationType", "update"_sd}, + {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}); + + lookupChangeStage->setSource(mockLocalSource.get()); + + // Mock out the foreign collection to have two documents with the same document key. + deque<DocumentSource::GetNextResult> foreignCollection = {Document{{"_id", 0}}, + Document{{"_id", 0}}}; + lookupChangeStage->injectMongodInterface( + std::make_shared<MockMongodInterface>(std::move(foreignCollection))); + + ASSERT_THROWS_CODE(lookupChangeStage->getNext(), UserException, 40580); +} + +TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) { + auto expCtx = getExpCtx(); + + // Set up the $lookup stage. + auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + + // Mock its input, pausing every other result. + auto mockLocalSource = DocumentSourceMock::create( + {Document{{"documentKey", Document{{"_id", 0}}}, + {"operationType", "insert"_sd}, + {"fullDocument", Document{{"_id", 0}}}, + {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}, + DocumentSource::GetNextResult::makePauseExecution(), + Document{{"documentKey", Document{{"_id", 1}}}, + {"operationType", "update"_sd}, + {"fullDocument", BSONNULL}, + {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}, + DocumentSource::GetNextResult::makePauseExecution()}); + + lookupChangeStage->setSource(mockLocalSource.get()); + + // Mock out the foreign collection. + deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}, + Document{{"_id", 1}}}; + lookupChangeStage->injectMongodInterface( + std::make_shared<MockMongodInterface>(std::move(mockForeignContents))); + + auto next = lookupChangeStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ( + next.releaseDocument(), + (Document{{"documentKey", Document{{"_id", 0}}}, + {"operationType", "insert"_sd}, + {"fullDocument", Document{{"_id", 0}}}, + {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}})); + + ASSERT_TRUE(lookupChangeStage->getNext().isPaused()); + + next = lookupChangeStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ( + next.releaseDocument(), + (Document{{"documentKey", Document{{"_id", 1}}}, + {"operationType", "update"_sd}, + {"fullDocument", Document{{"_id", 1}}}, + {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}})); + + ASSERT_TRUE(lookupChangeStage->getNext().isPaused()); + + ASSERT_TRUE(lookupChangeStage->getNext().isEOF()); + ASSERT_TRUE(lookupChangeStage->getNext().isEOF()); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.cpp b/src/mongo/db/pipeline/document_source_sort_by_count.cpp index 48cfaa9f559..97c5e0f0297 100644 --- a/src/mongo/db/pipeline/document_source_sort_by_count.cpp +++ b/src/mongo/db/pipeline/document_source_sort_by_count.cpp @@ -39,13 +39,13 @@ namespace mongo { using boost::intrusive_ptr; -using std::vector; +using std::list; REGISTER_MULTI_STAGE_ALIAS(sortByCount, LiteParsedDocumentSourceDefault::parse, DocumentSourceSortByCount::createFromBson); -vector<intrusive_ptr<DocumentSource>> DocumentSourceSortByCount::createFromBson( +list<intrusive_ptr<DocumentSource>> DocumentSourceSortByCount::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { if (elem.type() == Object) { // Make sure that the sortByCount field is an expression inside an object diff --git a/src/mongo/db/pipeline/document_source_sort_by_count.h b/src/mongo/db/pipeline/document_source_sort_by_count.h index 69bf8d2c5e0..c8196c9b498 100644 --- a/src/mongo/db/pipeline/document_source_sort_by_count.h +++ b/src/mongo/db/pipeline/document_source_sort_by_count.h @@ -40,7 +40,7 @@ public: /** * Returns a $group stage followed by a $sort stage. */ - static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( + static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); private: diff --git a/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp b/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp index 3e0a007c59b..12975e96149 100644 --- a/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp +++ b/src/mongo/db/pipeline/document_source_sort_by_count_test.cpp @@ -46,8 +46,9 @@ namespace mongo { namespace { -using std::vector; using boost::intrusive_ptr; +using std::list; +using std::vector; /** * Fixture to test that $sortByCount returns a DocumentSourceGroup and DocumentSourceSort. @@ -55,15 +56,15 @@ using boost::intrusive_ptr; class SortByCountReturnsGroupAndSort : public AggregationContextFixture { public: void testCreateFromBsonResult(BSONObj sortByCountSpec, Value expectedGroupExplain) { - vector<intrusive_ptr<DocumentSource>> result = + list<intrusive_ptr<DocumentSource>> result = DocumentSourceSortByCount::createFromBson(sortByCountSpec.firstElement(), getExpCtx()); ASSERT_EQUALS(result.size(), 2UL); - const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result[0].get()); + const auto* groupStage = dynamic_cast<DocumentSourceGroup*>(result.front().get()); ASSERT(groupStage); - const auto* sortStage = dynamic_cast<DocumentSourceSort*>(result[1].get()); + const auto* sortStage = dynamic_cast<DocumentSourceSort*>(result.back().get()); ASSERT(sortStage); // Serialize the DocumentSourceGroup and DocumentSourceSort from $sortByCount so that we can @@ -111,7 +112,7 @@ TEST_F(SortByCountReturnsGroupAndSort, ExpressionInObjectSpec) { */ class InvalidSortByCountSpec : public AggregationContextFixture { public: - vector<intrusive_ptr<DocumentSource>> createSortByCount(BSONObj sortByCountSpec) { + list<intrusive_ptr<DocumentSource>> createSortByCount(BSONObj sortByCountSpec) { auto specElem = sortByCountSpec.firstElement(); return DocumentSourceSortByCount::createFromBson(specElem, getExpCtx()); } diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index d2a16a6ca87..f5f42d197c0 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -73,7 +73,7 @@ void ExpressionContext::setCollator(std::unique_ptr<CollatorInterface> coll) { } intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns) const { - intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(); + intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(std::move(ns)); expCtx->explain = explain; expCtx->inShard = inShard; @@ -81,7 +81,6 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns) expCtx->extSortAllowed = extSortAllowed; expCtx->bypassDocumentValidation = bypassDocumentValidation; - expCtx->ns = std::move(ns); expCtx->tempDir = tempDir; expCtx->opCtx = opCtx; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index d04b095aa4b..ba2690796f6 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -134,7 +134,9 @@ public: protected: static const int kInterruptCheckPeriod = 128; - ExpressionContext() : variablesParseState(variables.useIdGenerator()) {} + + ExpressionContext(NamespaceString nss) + : ns(std::move(nss)), variablesParseState(variables.useIdGenerator()) {} /** * Sets '_collator' and resets '_documentComparator' and '_valueComparator'. diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h index 234b54e1a10..542e5b91eea 100644 --- a/src/mongo/db/pipeline/expression_context_for_test.h +++ b/src/mongo/db/pipeline/expression_context_for_test.h @@ -40,7 +40,9 @@ namespace mongo { */ class ExpressionContextForTest : public ExpressionContext { public: - ExpressionContextForTest() = default; + ExpressionContextForTest() : ExpressionContext(NamespaceString{"test"_sd, "namespace"_sd}) {} + + ExpressionContextForTest(NamespaceString nss) : ExpressionContext(std::move(nss)) {} ExpressionContextForTest(OperationContext* opCtx, const AggregationRequest& request) : ExpressionContext(opCtx, request, nullptr, {}) {} diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index 1f37b30fc90..1402f6f504d 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -37,7 +37,11 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_source_change_notification.h" +#include "mongo/db/pipeline/document_source_lookup_change_post_image.h" +#include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" +#include "mongo/db/pipeline/document_source_project.h" #include "mongo/db/pipeline/document_value_test_util.h" #include "mongo/db/pipeline/expression_context_for_test.h" #include "mongo/db/pipeline/field_path.h" @@ -52,6 +56,8 @@ using boost::intrusive_ptr; using std::string; using std::vector; +const NamespaceString kTestNss = NamespaceString("a.collection"); + namespace Optimizations { using namespace mongo; @@ -77,7 +83,7 @@ void assertPipelineOptimizesAndSerializesTo(std::string inputPipeJson, ASSERT_EQUALS(stageElem.type(), BSONType::Object); rawPipeline.push_back(stageElem.embeddedObject()); } - AggregationRequest request(NamespaceString("a.collection"), rawPipeline); + AggregationRequest request(kTestNss, rawPipeline); intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(opCtx.get(), request); @@ -962,6 +968,52 @@ TEST(PipelineOptimizationTest, MatchOnMaxLengthShouldMoveAcrossRename) { assertPipelineOptimizesTo(inputPipe, outputPipe); } +TEST(PipelineOptimizationTest, ChangeNotificationLookupSwapsWithIndependentMatch) { + QueryTestServiceContext testServiceContext; + auto opCtx = testServiceContext.makeOperationContext(); + + intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); + expCtx->opCtx = opCtx.get(); + + auto spec = BSON("$changeNotification" << BSON("fullDocument" + << "lookup")); + auto stages = DocumentSourceChangeNotification::createFromBson(spec.firstElement(), expCtx); + ASSERT_EQ(stages.size(), 3UL); + // Make sure the change lookup is at the end. + ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); + + auto matchPredicate = BSON("extra" + << "predicate"); + stages.push_back(DocumentSourceMatch::create(matchPredicate, expCtx)); + auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx)); + pipeline->optimizePipeline(); + + // Make sure the $match stage has swapped before the change look up. + ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(pipeline->getSources().back().get())); +} + +TEST(PipelineOptimizationTest, ChangeNotificationLookupDoesNotSwapWithMatchOnPostImage) { + QueryTestServiceContext testServiceContext; + auto opCtx = testServiceContext.makeOperationContext(); + + intrusive_ptr<ExpressionContext> expCtx(new ExpressionContextForTest(kTestNss)); + expCtx->opCtx = opCtx.get(); + + auto spec = BSON("$changeNotification" << BSON("fullDocument" + << "lookup")); + auto stages = DocumentSourceChangeNotification::createFromBson(spec.firstElement(), expCtx); + ASSERT_EQ(stages.size(), 3UL); + // Make sure the change lookup is at the end. + ASSERT(dynamic_cast<DocumentSourceLookupChangePostImage*>(stages.back().get())); + + stages.push_back(DocumentSourceMatch::create( + BSON(DocumentSourceLookupChangePostImage::kFullDocumentFieldName << BSONNULL), expCtx)); + auto pipeline = uassertStatusOK(Pipeline::create(stages, expCtx)); + pipeline->optimizePipeline(); + + // Make sure the $match stage stays at the end. + ASSERT(dynamic_cast<DocumentSourceMatch*>(pipeline->getSources().back().get())); +} } // namespace Local namespace Sharded { @@ -986,7 +1038,7 @@ public: ASSERT_EQUALS(stageElem.type(), BSONType::Object); rawPipeline.push_back(stageElem.embeddedObject()); } - AggregationRequest request(NamespaceString("a.collection"), rawPipeline); + AggregationRequest request(kTestNss, rawPipeline); intrusive_ptr<ExpressionContextForTest> ctx = new ExpressionContextForTest(&_opCtx, request); @@ -1391,7 +1443,7 @@ TEST_F(PipelineInitialSourceNSTest, CollectionNSNotValidIfInitialStageIsCollecti auto collectionlessSource = DocumentSourceCollectionlessMock::create(); auto ctx = getExpCtx(); - ctx->ns = NamespaceString("a.collection"); + ctx->ns = kTestNss; ASSERT_NOT_OK(Pipeline::create({collectionlessSource}, ctx).getStatus()); } diff --git a/src/mongo/db/pipeline/value.h b/src/mongo/db/pipeline/value.h index 9f7a357f677..04e396dc58d 100644 --- a/src/mongo/db/pipeline/value.h +++ b/src/mongo/db/pipeline/value.h @@ -117,6 +117,9 @@ public: explicit Value(const MaxKeyLabeler&) : _storage(MaxKey) {} // MAXKEY explicit Value(const Date_t& date) : _storage(Date, date.toMillisSinceEpoch()) {} + explicit Value(const char*) = delete; // Use StringData instead to prevent accidentally + // terminating the string at the first null byte. + // TODO: add an unsafe version that can share storage with the BSONElement /// Deep-convert from BSONElement to Value explicit Value(const BSONElement& elem); |