diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-08-30 23:58:30 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2017-09-15 17:20:22 -0400 |
commit | 7626535bbcc2f90b7815cbf1a8e6d2c0bef732f1 (patch) | |
tree | 8638e4aafe02c50a616e8f319f8ed0cae068210f /src/mongo/db | |
parent | c9e5bcbc0dacfa8031f3a2aaa1c6e369d0bc26c3 (diff) | |
download | mongo-7626535bbcc2f90b7815cbf1a8e6d2c0bef732f1.tar.gz |
SERVER-30591 Do changeStream lookups by UUID instead of namespace.
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/db_raii.cpp | 37 | ||||
-rw-r--r-- | src/mongo/db/db_raii.h | 16 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image.h | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp | 57 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/expression_context.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 14 |
9 files changed, 131 insertions, 19 deletions
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index 018a2591ed3..07e65df9aa1 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -33,6 +33,7 @@ #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" +#include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/repl/replication_coordinator_global.h" @@ -134,6 +135,25 @@ AutoStatsTracker::~AutoStatsTracker() { } AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx, + const StringData dbName, + const UUID& uuid) { + // Lock the database since a UUID will always be in the same database even though its + // collection name may change. + Lock::DBLock dbSLock(opCtx, dbName, MODE_IS); + + auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid); + + // If the UUID doesn't exist, we leave _autoColl to be boost::none. + if (!nss.isEmpty()) { + _autoColl.emplace( + opCtx, nss, MODE_IS, AutoGetCollection::ViewMode::kViewsForbidden, std::move(dbSLock)); + + // Note: this can yield. + _ensureMajorityCommittedSnapshotIsValid(nss, opCtx); + } +} + +AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx, const NamespaceString& nss, AutoGetCollection::ViewMode viewMode) { _autoColl.emplace(opCtx, nss, MODE_IS, MODE_IS, viewMode); @@ -226,6 +246,23 @@ AutoGetCollectionOrViewForReadCommand::AutoGetCollectionOrViewForReadCommand( ? _autoCollForRead->getDb()->getViewCatalog()->lookup(opCtx, nss.ns()) : nullptr) {} +AutoGetCollectionForReadCommand::AutoGetCollectionForReadCommand(OperationContext* opCtx, + const StringData dbName, + const UUID& uuid) { + _autoCollForRead.emplace(opCtx, dbName, uuid); + if (_autoCollForRead->getCollection()) { + _statsTracker.emplace(opCtx, + _autoCollForRead->getCollection()->ns(), + Top::LockType::ReadLocked, + _autoCollForRead->getDb()->getProfilingLevel()); + + // We have both the DB and collection locked, which is the prerequisite to do a stable shard + // version check, but we'd like to do the check after we have a satisfactory snapshot. + auto css = CollectionShardingState::get(opCtx, _autoCollForRead->getCollection()->ns()); + css->checkShardVersionOrThrow(opCtx); + } +} + void AutoGetCollectionOrViewForReadCommand::releaseLocksForView() noexcept { invariant(_view); _view = nullptr; diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h index 7ff6b8f1a1d..e251d1fe05d 100644 --- a/src/mongo/db/db_raii.h +++ b/src/mongo/db/db_raii.h @@ -268,6 +268,12 @@ public: AutoGetCollectionForRead(OperationContext* opCtx, const NamespaceString& nss) : AutoGetCollectionForRead(opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden) {} + AutoGetCollectionForRead(OperationContext* opCtx, const NamespaceString& nss, Lock::DBLock lock) + : AutoGetCollectionForRead( + opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden, std::move(lock)) {} + + AutoGetCollectionForRead(OperationContext* opCtx, const StringData dbName, const UUID& uuid); + /** * This constructor is intended for internal use and should not be used outside this file. * AutoGetCollectionForReadCommand and AutoGetCollectionOrViewForReadCommand use 'viewMode' to @@ -283,10 +289,16 @@ public: AutoGetCollection::ViewMode viewMode, Lock::DBLock lock); Database* getDb() const { + if (!_autoColl) { + return nullptr; + } return _autoColl->getDb(); } Collection* getCollection() const { + if (!_autoColl) { + return nullptr; + } return _autoColl->getCollection(); } @@ -325,6 +337,10 @@ public: : AutoGetCollectionForReadCommand( opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden, std::move(lock)) {} + AutoGetCollectionForReadCommand(OperationContext* opCtx, + const StringData dbName, + const UUID& uuid); + Database* getDb() const { return _autoCollForRead->getDb(); } diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 5c10259ce4a..462afbecb2f 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -637,6 +637,8 @@ public: * for execution. The returned pipeline is optimized and has a cursor source prepared. * * This function returns a non-OK status if parsing the pipeline failed. + * NamespaceNotFound will be returned if ExpressionContext has a UUID and that UUID doesn't + * exist anymore. That should be the only case where NamespaceNotFound gets returned. */ virtual StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index 30f79ab8787..7c6295ea021 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -97,10 +97,18 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat updateOp, DocumentSourceChangeStream::kDocumentKeyField, BSONType::Object); auto matchSpec = BSON("$match" << documentKey); + // Extract the UUID from resume token and do change stream lookups by UUID. + ResumeToken resumeToken(updateOp[DocumentSourceChangeStream::kIdField]); + // TODO SERVER-29134 we need to extract the namespace from the document and set them on the new // ExpressionContext if we're getting notifications from an entire database. - auto foreignExpCtx = pExpCtx->copyWith(nss); - auto pipeline = uassertStatusOK(_mongod->makePipeline({matchSpec}, foreignExpCtx)); + auto foreignExpCtx = pExpCtx->copyWith(nss, resumeToken.getUuid()); + auto pipelineStatus = _mongod->makePipeline({matchSpec}, foreignExpCtx); + if (pipelineStatus.getStatus() == ErrorCodes::NamespaceNotFound) { + // We couldn't find the collection with UUID, it may have been dropped. + return Value(BSONNULL); + } + auto pipeline = uassertStatusOK(std::move(pipelineStatus)); if (auto first = pipeline->getNext()) { auto lookedUpDocument = Value(*first); diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h index b11c79e44d3..c51653ddb8d 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h @@ -74,6 +74,7 @@ public: deps->fields.insert(DocumentSourceChangeStream::kNamespaceField.toString()); deps->fields.insert(DocumentSourceChangeStream::kDocumentKeyField.toString()); deps->fields.insert(DocumentSourceChangeStream::kOperationTypeField.toString()); + deps->fields.insert(DocumentSourceChangeStream::kIdField.toString()); // This stage does not restrict the output fields to a finite set, and has no impact on // whether metadata is available or needed. return SEE_NEXT; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index fdeea5bffc6..66c408aa0d2 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -52,7 +52,27 @@ using std::deque; using std::vector; // This provides access to getExpCtx(), but we'll use a different name for this test suite. -using DocumentSourceLookupChangePostImageTest = AggregationContextFixture; +class DocumentSourceLookupChangePostImageTest : public AggregationContextFixture { +public: + /** + * This method is required to avoid a static initialization fiasco resulting from calling + * UUID::gen() in file static scope. + */ + static const UUID& testUuid() { + static const UUID* uuid_gen = new UUID(UUID::gen()); + return *uuid_gen; + } + + Document makeResumeToken(ImplicitValue id = Value()) { + const Timestamp ts(100, 1); + if (id.missing()) { + return {{"clusterTime", Document{{"ts", ts}}}, {"uuid", testUuid()}}; + } + return {{"clusterTime", Document{{"ts", ts}}}, + {"uuid", testUuid()}, + {"documentKey", Document{{"_id", id}}}}; + } +}; /** * A mock MongodInterface which allows mocking a foreign pipeline. @@ -92,7 +112,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyO // Mock its input with a document without a "documentKey" field. auto mockLocalSource = DocumentSourceMock::create( - Document{{"operationType", "update"_sd}, + Document{{"_id", makeResumeToken(0)}, + {"operationType", "update"_sd}, {"fullDocument", Document{{"_id", 0}}}, {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}); @@ -113,7 +134,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationTyp // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::create( - Document{{"documentKey", Document{{"_id", 0}}}, + Document{{"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, {"fullDocument", Document{{"_id", 0}}}, {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}); @@ -134,7 +156,9 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) { // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::create(Document{ - {"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd}, + {"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, + {"operationType", "update"_sd}, }); lookupChangeStage->setSource(mockLocalSource.get()); @@ -153,8 +177,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx); // Mock its input with a document without a "ns" field. - auto mockLocalSource = DocumentSourceMock::create( - Document{{"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd}, {"ns", 4}}); + auto mockLocalSource = + DocumentSourceMock::create(Document{{"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, + {"operationType", "update"_sd}, + {"ns", 4}}); lookupChangeStage->setSource(mockLocalSource.get()); @@ -173,7 +200,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatch // Mock its input with a document without a "ns" field. auto mockLocalSource = DocumentSourceMock::create( - Document{{"documentKey", Document{{"_id", 0}}}, + Document{{"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd}, {"ns", Document{{"db", "DIFFERENT"_sd}, {"coll", expCtx->ns.coll()}}}}); @@ -194,7 +222,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni // Mock its input with an update document. auto mockLocalSource = DocumentSourceMock::create( - Document{{"documentKey", Document{{"_id", 0}}}, + Document{{"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd}, {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}); @@ -217,12 +246,14 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) { // Mock its input, pausing every other result. auto mockLocalSource = DocumentSourceMock::create( - {Document{{"documentKey", Document{{"_id", 0}}}, + {Document{{"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, {"operationType", "insert"_sd}, {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}, {"fullDocument", Document{{"_id", 0}}}}, DocumentSource::GetNextResult::makePauseExecution(), - Document{{"documentKey", Document{{"_id", 1}}}, + Document{{"_id", makeResumeToken(1)}, + {"documentKey", Document{{"_id", 1}}}, {"operationType", "update"_sd}, {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}}, DocumentSource::GetNextResult::makePauseExecution()}); @@ -239,7 +270,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) { ASSERT_TRUE(next.isAdvanced()); ASSERT_DOCUMENT_EQ( next.releaseDocument(), - (Document{{"documentKey", Document{{"_id", 0}}}, + (Document{{"_id", makeResumeToken(0)}, + {"documentKey", Document{{"_id", 0}}}, {"operationType", "insert"_sd}, {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}, {"fullDocument", Document{{"_id", 0}}}})); @@ -250,7 +282,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) { ASSERT_TRUE(next.isAdvanced()); ASSERT_DOCUMENT_EQ( next.releaseDocument(), - (Document{{"documentKey", Document{{"_id", 1}}}, + (Document{{"_id", makeResumeToken(1)}, + {"documentKey", Document{{"_id", 1}}}, {"operationType", "update"_sd}, {"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}, {"fullDocument", Document{{"_id", 1}}}})); diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp index 0a6005c73b0..8f15fb5a3fe 100644 --- a/src/mongo/db/pipeline/expression_context.cpp +++ b/src/mongo/db/pipeline/expression_context.cpp @@ -74,9 +74,11 @@ void ExpressionContext::setCollator(std::unique_ptr<CollatorInterface> coll) { _valueComparator = ValueComparator(_collator.get()); } -intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns) const { +intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns, + boost::optional<UUID> uuid) const { intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(std::move(ns)); + expCtx->uuid = std::move(uuid); expCtx->explain = explain; expCtx->needsMerge = needsMerge; expCtx->fromMongos = fromMongos; diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index 037f1b81383..4fe6bc8e541 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -46,6 +46,7 @@ #include "mongo/db/query/tailable_mode.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/string_map.h" +#include "mongo/util/uuid.h" namespace mongo { @@ -88,9 +89,10 @@ public: /** * Returns an ExpressionContext that is identical to 'this' that can be used to execute a - * separate aggregation pipeline on 'ns'. + * separate aggregation pipeline on 'ns' with the optional 'uuid'. */ - boost::intrusive_ptr<ExpressionContext> copyWith(NamespaceString ns) const; + boost::intrusive_ptr<ExpressionContext> copyWith( + NamespaceString ns, boost::optional<UUID> uuid = boost::none) const; /** * Returns the ResolvedNamespace corresponding to 'nss'. It is an error to call this method on a @@ -125,6 +127,7 @@ public: bool from34Mongos = false; NamespaceString ns; + boost::optional<UUID> uuid; std::string tempDir; // Defaults to empty to prevent external sorting in mongos. OperationContext* opCtx; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 66d4baf9379..75fb836c319 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -204,7 +204,17 @@ public: pipeline.getValue()->optimizePipeline(); - AutoGetCollectionForReadCommand autoColl(expCtx->opCtx, expCtx->ns); + boost::optional<AutoGetCollectionForReadCommand> autoColl; + if (expCtx->uuid) { + autoColl.emplace(expCtx->opCtx, expCtx->ns.db(), *expCtx->uuid); + if (autoColl->getCollection() == nullptr) { + // The UUID doesn't exist anymore. + return {ErrorCodes::NamespaceNotFound, + "No namespace with UUID " + expCtx->uuid->toString()}; + } + } else { + autoColl.emplace(expCtx->opCtx, expCtx->ns); + } // makePipeline() is only called to perform secondary aggregation requests and expects the // collection representing the document source to be not-sharded. We confirm sharding state @@ -217,7 +227,7 @@ public: uassert(4567, "from collection cannot be sharded", !bool(css->getMetadata())); PipelineD::prepareCursorSource( - autoColl.getCollection(), expCtx->ns, nullptr, pipeline.getValue().get()); + autoColl->getCollection(), expCtx->ns, nullptr, pipeline.getValue().get()); return pipeline; } |