diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-03 17:00:41 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-11 15:53:03 -0400 |
commit | fff261ac550155065fce4b7b1529061f18980599 (patch) | |
tree | 09ce022d7b8319f1af3c2db2354427ecfe1aa389 /src/mongo/db/pipeline | |
parent | 0fa7bcb8bea5d4585fdbc1003b5116cd7bf28540 (diff) | |
download | mongo-fff261ac550155065fce4b7b1529061f18980599.tar.gz |
SERVER-29134: Support change streams on an entire database in a sharded cluster
Diffstat (limited to 'src/mongo/db/pipeline')
12 files changed, 118 insertions, 63 deletions
diff --git a/src/mongo/db/pipeline/document_source_add_fields.cpp b/src/mongo/db/pipeline/document_source_add_fields.cpp index 445feaa1a51..44490738890 100644 --- a/src/mongo/db/pipeline/document_source_add_fields.cpp +++ b/src/mongo/db/pipeline/document_source_add_fields.cpp @@ -47,9 +47,14 @@ REGISTER_DOCUMENT_SOURCE(addFields, intrusive_ptr<DocumentSource> DocumentSourceAddFields::create( BSONObj addFieldsSpec, const intrusive_ptr<ExpressionContext>& expCtx) { + + const bool isIndependentOfAnyCollection = false; intrusive_ptr<DocumentSourceSingleDocumentTransformation> addFields( new DocumentSourceSingleDocumentTransformation( - expCtx, ParsedAddFields::create(expCtx, addFieldsSpec), "$addFields")); + expCtx, + ParsedAddFields::create(expCtx, addFieldsSpec), + "$addFields", + isIndependentOfAnyCollection)); return addFields; } diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index b227afeac5f..0ef9addfaac 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -498,10 +498,14 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage( BSONObj changeStreamSpec, const intrusive_ptr<ExpressionContext>& expCtx) { + // Mark the transformation stage as independent of any collection if the change stream is + // watching all collections in the database. 
+ const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS(); return intrusive_ptr<DocumentSource>(new DocumentSourceSingleDocumentTransformation( expCtx, stdx::make_unique<Transformation>(expCtx, changeStreamSpec), - kStageName.toString())); + kStageName.toString(), + isIndependentOfAnyCollection)); } Document DocumentSourceChangeStream::Transformation::applyTransformation(const Document& input) { @@ -532,28 +536,28 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D Value ns = input[repl::OplogEntry::kNamespaceFieldName]; checkValueType(ns, repl::OplogEntry::kNamespaceFieldName, BSONType::String); Value uuid = input[repl::OplogEntry::kUuidFieldName]; - if (!uuid.missing()) { + std::vector<FieldPath> documentKeyFields; + + // Deal with CRUD operations and commands. + auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op); + + // Ignore commands in the oplog when looking up the document key fields since a command implies + // that the change stream is about to be invalidated (e.g. collection drop). + if (!uuid.missing() && opType != repl::OpTypeEnum::kCommand) { checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData); - // We need to retrieve the document key fields if our cached copy has not been populated. If - // the collection was unsharded but has now transitioned to a sharded state, we must update - // the documentKey fields to include the shard key. We only need to re-check the documentKey - // while the collection is unsharded; if the collection is or becomes sharded, then the - // documentKey is final and will not change. - if (!_documentKeyFieldsSharded) { - // If this is not a shard server, 'catalogCache' will be nullptr and we will skip the - // routing table check. 
- auto catalogCache = Grid::get(_expCtx->opCtx)->catalogCache(); - const bool collectionIsSharded = catalogCache && [catalogCache, this]() { - auto routingInfo = - catalogCache->getCollectionRoutingInfo(_expCtx->opCtx, _expCtx->ns); - return routingInfo.isOK() && routingInfo.getValue().cm(); - }(); - if (_documentKeyFields.empty() || collectionIsSharded) { - _documentKeyFields = _expCtx->mongoProcessInterface->collectDocumentKeyFields( - _expCtx->opCtx, _expCtx->ns, uuid.getUuid()); - _documentKeyFieldsSharded = collectionIsSharded; + // We need to retrieve the document key fields if our cache does not have an entry for this + // UUID or if the cache entry is not definitively final, indicating that the collection was + // unsharded when the entry was last populated. + auto it = _documentKeyCache.find(uuid.getUuid()); + if (it == _documentKeyCache.end() || !it->second.isFinal) { + auto docKeyFields = _expCtx->mongoProcessInterface->collectDocumentKeyFields( + _expCtx->opCtx, uuid.getUuid()); + if (it == _documentKeyCache.end() || docKeyFields.second) { + _documentKeyCache[uuid.getUuid()] = DocumentKeyCacheEntry(docKeyFields); } } + + documentKeyFields = _documentKeyCache.find(uuid.getUuid())->second.documentKeyFields; } NamespaceString nss(ns.getString()); Value id = input.getNestedField("o._id"); @@ -563,14 +567,12 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D Value updateDescription; Value documentKey; - // Deal with CRUD operations and commands. 
- auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op); switch (opType) { case repl::OpTypeEnum::kInsert: { operationType = kInsertOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; documentKey = Value(document_path_support::extractDocumentKeyFromDoc( - fullDocument.getDocument(), _documentKeyFields)); + fullDocument.getDocument(), documentKeyFields)); break; } case repl::OpTypeEnum::kDelete: { diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 3360a9aca14..8d39159dc0f 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -106,13 +106,25 @@ public: boost::intrusive_ptr<ExpressionContext> _expCtx; BSONObj _changeStreamSpec; - // Fields of the document key, in order, including the shard key if the collection is - // sharded, and anyway "_id". Empty until the first oplog entry with a uuid is encountered. - // Needed for transforming 'insert' oplog entries. - std::vector<FieldPath> _documentKeyFields; + struct DocumentKeyCacheEntry { + DocumentKeyCacheEntry() = default; + + DocumentKeyCacheEntry(std::pair<std::vector<FieldPath>, bool> documentKeyFieldsIn) + : documentKeyFields(documentKeyFieldsIn.first), + isFinal(documentKeyFieldsIn.second){}; + // Fields of the document key, in order, including "_id" and the shard key if the + // collection is sharded. Empty until the first oplog entry with a uuid is encountered. + // Needed for transforming 'insert' oplog entries. + std::vector<FieldPath> documentKeyFields; + + // Set to true if the document key fields for this entry are definitively known and will + // not change. This implies that either the collection has become sharded or has been + // dropped. + bool isFinal; + }; - // Set to true if the collection is found to be sharded while retrieving _documentKeyFields. 
- bool _documentKeyFieldsSharded = false; + // Map of collection UUID to document key fields. + std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache; }; // The name of the field where the document key (_id and shard key, if present) will be found diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 32df8e68959..e89e9e0395b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -85,10 +85,9 @@ struct MockMongoInterface final : public StubMongoProcessInterface { MockMongoInterface(std::vector<FieldPath> fields) : _fields(std::move(fields)) {} - std::vector<FieldPath> collectDocumentKeyFields(OperationContext*, - const NamespaceString&, - UUID) const final { - return _fields; + std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*, + UUID) const final { + return {_fields, true}; } std::vector<FieldPath> _fields; diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp index 6cf11fc5a51..f5d560c4c9d 100644 --- a/src/mongo/db/pipeline/document_source_project.cpp +++ b/src/mongo/db/pipeline/document_source_project.cpp @@ -47,8 +47,12 @@ REGISTER_DOCUMENT_SOURCE(project, intrusive_ptr<DocumentSource> DocumentSourceProject::create( BSONObj projectSpec, const intrusive_ptr<ExpressionContext>& expCtx) { + const bool isIndependentOfAnyCollection = false; intrusive_ptr<DocumentSource> project(new DocumentSourceSingleDocumentTransformation( - expCtx, ParsedAggregationProjection::create(expCtx, projectSpec), "$project")); + expCtx, + ParsedAggregationProjection::create(expCtx, projectSpec), + "$project", + isIndependentOfAnyCollection)); return project; } diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp index 089ff2d9c98..6e7ce90cc73 100644 
--- a/src/mongo/db/pipeline/document_source_replace_root.cpp +++ b/src/mongo/db/pipeline/document_source_replace_root.cpp @@ -148,8 +148,12 @@ REGISTER_DOCUMENT_SOURCE(replaceRoot, intrusive_ptr<DocumentSource> DocumentSourceReplaceRoot::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { + const bool isIndependentOfAnyCollection = false; return new DocumentSourceSingleDocumentTransformation( - pExpCtx, ReplaceRootTransformation::create(pExpCtx, elem), "$replaceRoot"); + pExpCtx, + ReplaceRootTransformation::create(pExpCtx, elem), + "$replaceRoot", + isIndependentOfAnyCollection); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp index deec3f342ae..41be7bbb096 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp @@ -45,10 +45,12 @@ using boost::intrusive_ptr; DocumentSourceSingleDocumentTransformation::DocumentSourceSingleDocumentTransformation( const intrusive_ptr<ExpressionContext>& pExpCtx, std::unique_ptr<TransformerInterface> parsedTransform, - std::string name) + std::string name, + bool isIndependentOfAnyCollection) : DocumentSource(pExpCtx), _parsedTransform(std::move(parsedTransform)), - _name(std::move(name)) {} + _name(std::move(name)), + _isIndependentOfAnyCollection(isIndependentOfAnyCollection) {} const char* DocumentSourceSingleDocumentTransformation::getSourceName() const { return _name.c_str(); diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index bffa484c46b..5d0e850d3fd 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -94,7 +94,8 @@ public: 
DocumentSourceSingleDocumentTransformation( const boost::intrusive_ptr<ExpressionContext>& pExpCtx, std::unique_ptr<TransformerInterface> parsedTransform, - std::string name); + std::string name, + bool independentOfAnyCollection); // virtuals from DocumentSource const char* getSourceName() const final; @@ -122,6 +123,9 @@ public: constraints.canSwapWithMatch = true; constraints.canSwapWithLimit = true; + // This transformation could be part of a 'collectionless' change stream on an entire + // database or cluster, mark as independent of any collection if so. + constraints.isIndependentOfAnyCollection = _isIndependentOfAnyCollection; return constraints; } @@ -146,6 +150,9 @@ private: // Specific name of the transformation. std::string _name; + // Set to true if this transformation stage can be run on the collectionless namespace. + bool _isIndependentOfAnyCollection; + // Cached stage options in case this DocumentSource is disposed before serialized (e.g. explain // with a sort which will auto-dispose of the pipeline). Document _cachedStageOptions; diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 3f63f7e66ff..dac98d07acc 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -183,12 +183,14 @@ public: virtual std::string getShardName(OperationContext* opCtx) const = 0; /** - * Returns the fields of the document key (in order) for the collection given by 'nss' and - * 'UUID', including the shard key and _id. If _id is not in the shard key, it is added last. - */ - virtual std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx, - const NamespaceString& nss, - UUID uuid) const = 0; + * Returns the fields of the document key (in order) for the collection corresponding to 'uuid', + * including the shard key and _id. If _id is not in the shard key, it is added last. 
If the + * collection is not sharded or no longer exists, returns only _id. Also returns a boolean that + * indicates whether the returned fields of the document key are final and will never change for + * the given collection, either because the collection was dropped or has become sharded. + */ + virtual std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields( + OperationContext* opCtx, UUID uuid) const = 0; /** * Returns zero or one documents with the document key 'documentKey'. 'documentKey' is treated diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index a3f1bdf3275..52b7860c511 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -40,6 +40,7 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/index_catalog.h" +#include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" @@ -743,10 +744,31 @@ std::string PipelineD::MongoDInterface::getShardName(OperationContext* opCtx) co return std::string(); } -std::vector<FieldPath> PipelineD::MongoDInterface::collectDocumentKeyFields( - OperationContext* opCtx, const NamespaceString& nss, UUID uuid) const { - if (!ShardingState::get(opCtx)->enabled()) { - return {"_id"}; // Nothing is sharded. +std::pair<std::vector<FieldPath>, bool> PipelineD::MongoDInterface::collectDocumentKeyFields( + OperationContext* opCtx, UUID uuid) const { + if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { + return {{"_id"}, false}; // Nothing is sharded. + } + + // An empty namespace indicates that the collection has been dropped. Treat it as unsharded and + // mark the fields as final. 
+ auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid); + if (nss.isEmpty()) { + return {{"_id"}, true}; + } + + // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache + // to determine whether the collection is sharded in the first place. + auto catalogCache = Grid::get(opCtx)->catalogCache(); + + const bool collectionIsSharded = catalogCache && [&]() { + auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); + return routingInfo.isOK() && routingInfo.getValue().cm(); + }(); + + // Collection exists and is not sharded, mark as not final. + if (!collectionIsSharded) { + return {{"_id"}, false}; } auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata { @@ -754,15 +776,12 @@ std::vector<FieldPath> PipelineD::MongoDInterface::collectDocumentKeyFields( return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx); }(); - if (!scm) { - return {"_id"}; // Collection is not sharded. + // Collection is not sharded or UUID mismatch implies collection has been dropped and recreated + // as sharded. + if (!scm || !scm->uuidMatches(uuid)) { + return {{"_id"}, false}; } - uassert(ErrorCodes::InvalidUUID, - str::stream() << "Collection " << nss.ns() - << " UUID differs from UUID on change stream operations", - scm->uuidMatches(uuid)); - // Unpack the shard key. std::vector<FieldPath> result; bool gotId = false; @@ -773,7 +792,8 @@ std::vector<FieldPath> PipelineD::MongoDInterface::collectDocumentKeyFields( if (!gotId) { // If not part of the shard key, "_id" comes last. result.emplace_back("_id"); } - return result; + // Collection is now sharded so the document key fields will never change, mark as final. 
+ return {result, true}; } std::vector<GenericCursor> PipelineD::MongoDInterface::getCursors( diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index e9ab4fef8d2..4064b38e7c8 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -98,9 +98,8 @@ public: Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final; - std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx, - const NamespaceString& nss, - UUID uuid) const final; + std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext* opCtx, + UUID uuid) const final; boost::optional<Document> lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 9df0bb7baba..69515dfd853 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -125,9 +125,8 @@ public: MONGO_UNREACHABLE; } - std::vector<FieldPath> collectDocumentKeyFields(OperationContext*, - const NamespaceString&, - UUID) const override { + std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*, + UUID) const override { MONGO_UNREACHABLE; } |