diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-02 16:48:04 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-04 17:01:01 -0400 |
commit | 69005c338fe103892d2941791b12f5c06aae3394 (patch) | |
tree | 02fad539a49ccffa20a96c99949c977b7c0896a5 /src/mongo/db | |
parent | b7631b5698effad672d22014156dc16d6f51bf3d (diff) | |
download | mongo-69005c338fe103892d2941791b12f5c06aae3394.tar.gz |
SERVER-33820: Allow change streams with 'updateLookup': "fullDocument" against an entire database
Diffstat (limited to 'src/mongo/db')
4 files changed, 76 insertions, 19 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 2575b233d0f..f393efbeae9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -434,12 +434,6 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( << "\"", fullDocOption == "updateLookup"_sd || fullDocOption == "default"_sd); - // TODO: SERVER-33820 should add support for 'updateLookup' with a change stream on a whole - // database. - uassert(50761, - "'updateLookup' not supported with a change stream on an entire database.", - fullDocOption != "updateLookup"_sd || !expCtx->ns.isCollectionlessAggregateNS()); - const bool shouldLookupPostImage = (fullDocOption == "updateLookup"_sd); list<intrusive_ptr<DocumentSource>> stages; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index b0141c7988f..c6f5c456c7d 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -73,7 +73,7 @@ DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::getNext() { return output.freeze(); } -NamespaceString DocumentSourceLookupChangePostImage::assertNamespaceMatches( +NamespaceString DocumentSourceLookupChangePostImage::assertValidNamespace( const Document& inputDoc) const { auto namespaceObject = assertFieldHasType(inputDoc, DocumentSourceChangeStream::kNamespaceField, BSONType::Object) @@ -81,17 +81,20 @@ NamespaceString DocumentSourceLookupChangePostImage::assertNamespaceMatches( auto dbName = assertFieldHasType(namespaceObject, "db"_sd, BSONType::String); auto collectionName = assertFieldHasType(namespaceObject, "coll"_sd, BSONType::String); NamespaceString nss(dbName.getString(), collectionName.getString()); + + // Change streams on an entire database only need to verify that the database names match. uassert(40579, str::stream() << "unexpected namespace during post image lookup: " << nss.ns() << ", expected " << pExpCtx->ns.ns(), - nss == pExpCtx->ns); + nss == pExpCtx->ns || + (pExpCtx->ns.isCollectionlessAggregateNS() && nss.db() == pExpCtx->ns.db())); return nss; } Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updateOp) const { // Make sure we have a well-formed input. - auto nss = assertNamespaceMatches(updateOp); + auto nss = assertValidNamespace(updateOp); auto documentKey = assertFieldHasType(updateOp, DocumentSourceChangeStream::kDocumentKeyField, diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index 8f377b46986..efd5886d6ee 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -117,9 +117,10 @@ private: /** * Throws a AssertionException if the namespace found in 'inputDoc' doesn't match the one on the - * ExpressionContext. + * ExpressionContext. If the namespace on the ExpressionContext is 'collectionless', then this + * function verifies that the only the database names match. */ - NamespaceString assertNamespaceMatches(const Document& inputDoc) const; + NamespaceString assertValidNamespace(const Document& inputDoc) const; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index 175806d5f7e..b90ca65f594 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -119,7 +119,11 @@ public: UUID collectionUUID, const Document& documentKey, boost::optional<BSONObj> readConcern) { - auto swPipeline = makePipeline({BSON("$match" << documentKey)}, expCtx); + // The namespace 'nss' may be different than the namespace on the ExpressionContext in the + // case of a change stream on a whole database so we need to make a copy of the + // ExpressionContext with the new namespace. + auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID, boost::none); + auto swPipeline = makePipeline({BSON("$match" << documentKey)}, foreignExpCtx); if (swPipeline == ErrorCodes::NamespaceNotFound) { return boost::none; } @@ -146,7 +150,7 @@ private: TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyOnUpdate) { auto expCtx = getExpCtx(); - // Set up the $lookup stage. + // Set up the lookup change post image stage. auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); // Mock its input with a document without a "documentKey" field. @@ -168,7 +172,7 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyO TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationType) { auto expCtx = getExpCtx(); - // Set up the $lookup stage. + // Set up the lookup change post image stage. auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); // Mock its input with a document without a "ns" field. @@ -190,7 +194,7 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationTyp TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) { auto expCtx = getExpCtx(); - // Set up the $lookup stage. + // Set up the lookup change post image stage. auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); // Mock its input with a document without a "ns" field. @@ -212,7 +216,7 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) { TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType) { auto expCtx = getExpCtx(); - // Set up the $lookup stage. + // Set up the lookup change post image stage. auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); // Mock its input with a document without a "ns" field. @@ -234,7 +238,7 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatchPipeline) { auto expCtx = getExpCtx(); - // Set up the $lookup stage. + // Set up the lookup change post image stage. auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); // Mock its input with a document without a "ns" field. @@ -253,10 +257,65 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatch ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579); } +TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDatabaseMismatchOnCollectionlessNss) { + auto expCtx = getExpCtx(); + + expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test"); + + // Set up the lookup change post image stage. + auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + + // Mock its input with a document without a "ns" field. + auto mockLocalSource = DocumentSourceMock::create( + Document{{"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, + {"operationType", "update"_sd}, + {"ns", Document{{"db", "DIFFERENT"_sd}, {"coll", "irrelevant"_sd}}}}); + + lookupChangeStage->setSource(mockLocalSource.get()); + + // Mock out the foreign collection. + deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}}; + expCtx->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(mockForeignContents); + + ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40579); +} + +TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPassIfDatabaseMatchesOnCollectionlessNss) { + auto expCtx = getExpCtx(); + + expCtx->ns = NamespaceString::makeCollectionlessAggregateNSS("test"); + + // Set up the lookup change post image stage. + auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); + + // Mock out the foreign collection. + deque<DocumentSource::GetNextResult> mockForeignContents{Document{{"_id", 0}}}; + expCtx->mongoProcessInterface = stdx::make_unique<MockMongoInterface>(mockForeignContents); + + auto mockLocalSource = DocumentSourceMock::create( + Document{{"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, + {"operationType", "update"_sd}, + {"ns", Document{{"db", expCtx->ns.db()}, {"coll", "irrelevant"_sd}}}}); + + lookupChangeStage->setSource(mockLocalSource.get()); + + auto next = lookupChangeStage->getNext(); + ASSERT_TRUE(next.isAdvanced()); + ASSERT_DOCUMENT_EQ( + next.releaseDocument(), + (Document{{"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, + {"operationType", "update"_sd}, + {"ns", Document{{"db", expCtx->ns.db()}, {"coll", "irrelevant"_sd}}}, + {"fullDocument", Document{{"_id", 0}}}})); +} + TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUnique) { auto expCtx = getExpCtx(); - // Set up the $lookup stage. + // Set up the lookup change post image stage. auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); // Mock its input with an update document. @@ -281,7 +340,7 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) { auto expCtx = getExpCtx(); - // Set up the $lookup stage. + // Set up the lookup change post image stage. auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); // Mock its input, pausing every other result. |