diff options
author | Nathan Myers <ncm@cantrip.org> | 2017-10-10 00:09:08 -0400 |
---|---|---|
committer | Nathan Myers <nathan.myers@10gen.com> | 2017-10-20 16:42:06 -0400 |
commit | f32371804b610a22fabd7ef8bc726c431c547cf3 (patch) | |
tree | 9e14810485ca9cd13a3997bae0dc8ef3539670f4 /src/mongo/db | |
parent | cebd8dfd83281b224bc6bb155b134f7f12d1f37f (diff) | |
download | mongo-f32371804b610a22fabd7ef8bc726c431c547cf3.tar.gz |
SERVER-31192 Make Change Stream extract documentKey from insert op log entry
Diffstat (limited to 'src/mongo/db')
10 files changed, 149 insertions, 11 deletions
diff --git a/src/mongo/db/pipeline/document_path_support.cpp b/src/mongo/db/pipeline/document_path_support.cpp index a2b1b11378d..2ac115a7a3a 100644 --- a/src/mongo/db/pipeline/document_path_support.cpp +++ b/src/mongo/db/pipeline/document_path_support.cpp @@ -151,5 +151,13 @@ BSONObj documentToBsonWithPaths(const Document& input, const std::set<std::strin return outputBuilder.obj(); } +Document extractDocumentKeyFromDoc(const Document& doc, const std::vector<FieldPath>& paths) { + MutableDocument result; + for (auto& field : paths) { + result.addField(field.fullPath(), doc.getNestedField(field)); + } + return result.freeze(); +} + } // namespace document_path_support } // namespace mongo diff --git a/src/mongo/db/pipeline/document_path_support.h b/src/mongo/db/pipeline/document_path_support.h index 8882bf816dd..d8dd894a76d 100644 --- a/src/mongo/db/pipeline/document_path_support.h +++ b/src/mongo/db/pipeline/document_path_support.h @@ -64,5 +64,10 @@ StatusWith<Value> extractElementAlongNonArrayPath(const Document& doc, const Fie */ BSONObj documentToBsonWithPaths(const Document&, const std::set<std::string>& paths); +/** + * Extracts 'paths' from the input document to a flat document. + */ +Document extractDocumentKeyFromDoc(const Document& doc, const std::vector<FieldPath>& paths); + } // namespace document_path_support } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index bc9dce714ad..6b3754f5b8e 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -47,6 +47,7 @@ #include "mongo/db/pipeline/dependencies.h" #include "mongo/db/pipeline/document.h" #include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/pipeline/field_path.h" #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/value.h" @@ -832,6 +833,12 @@ public: */ virtual std::string getShardName(OperationContext* opCtx) const = 0; + /** + * Returns the fields of the document key (in order) for the current collection, including + * the shard key and _id. If _id is not in the shard key, it is added last. + */ + virtual std::vector<FieldPath> collectDocumentKeyFields(UUID) const = 0; + // Add new methods as needed. }; diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 5e385da0420..b42cb41bdc0 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -35,6 +35,7 @@ #include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/commands/feature_compatibility_version_command_parser.h" #include "mongo/db/pipeline/close_change_stream_exception.h" +#include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_check_resume_token.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_lookup_change_post_image.h" @@ -383,6 +384,9 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D Value uuid = input[repl::OplogEntry::kUuidFieldName]; if (!uuid.missing()) { checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData); + if (_mongoProcess && _documentKeyFields.empty()) { + _documentKeyFields = _mongoProcess->collectDocumentKeyFields(uuid.getUuid()); + } } NamespaceString nss(ns.getString()); Value id = input.getNestedField("o._id"); @@ -398,7 +402,8 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D case repl::OpTypeEnum::kInsert: { operationType = kInsertOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; - documentKey = Value(Document{{kIdField, id}}); + documentKey = Value(document_path_support::extractDocumentKeyFromDoc( + fullDocument.getDocument(), _documentKeyFields)); break; } case repl::OpTypeEnum::kDelete: { diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index fe1d6bfa199..e2d1498e482 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -32,6 +32,7 @@ #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_single_document_transformation.h" #include "mongo/db/pipeline/document_sources_gen.h" +#include "mongo/db/pipeline/field_path.h" namespace mongo { @@ -88,6 +89,11 @@ public: private: boost::intrusive_ptr<ExpressionContext> _expCtx; BSONObj _changeStreamSpec; + + // Fields of the document key, in order, including the shard key if the collection is + // sharded, and anyway "_id". Empty until the first oplog entry with a uuid is encountered. + // Needed for transforming 'insert' oplog entries. + std::vector<FieldPath> _documentKeyFields; }; // The sort pattern used to merge results from multiple change streams on a mongos. diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 7682972c787..f59545111f0 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -36,12 +36,14 @@ #include "mongo/db/pipeline/aggregation_context_fixture.h" #include "mongo/db/pipeline/close_change_stream_exception.h" #include "mongo/db/pipeline/document.h" +#include "mongo/db/pipeline/document_source.h" #include "mongo/db/pipeline/document_source_change_stream.h" #include "mongo/db/pipeline/document_source_limit.h" #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_mock.h" #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_value_test_util.h" +#include "mongo/db/pipeline/stub_mongo_process_interface.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/pipeline/value_comparator.h" #include "mongo/db/repl/oplog_entry.h" @@ -81,6 +83,18 @@ private: EnsureFCV _ensureFCV; }; +// This is needed only for the "insert" tests. +struct MockMongoProcessInterface final : public StubMongoProcessInterface { + + MockMongoProcessInterface(std::vector<FieldPath> fields) : _fields(std::move(fields)) {} + + std::vector<FieldPath> collectDocumentKeyFields(UUID) const final { + return _fields; + } + + std::vector<FieldPath> _fields; +}; + class ChangeStreamStageTest : public ChangeStreamStageTestNoSetup { public: ChangeStreamStageTest() : ChangeStreamStageTestNoSetup() { @@ -89,10 +103,16 @@ public: getExpCtx()->opCtx->getServiceContext())); } - void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) { + void checkTransformation(const OplogEntry& entry, + const boost::optional<Document> expectedDoc, + std::vector<FieldPath> docKeyFields = {}) { vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry); auto transform = stages[2].get(); + auto mongoProcess = std::make_shared<MockMongoProcessInterface>(docKeyFields); + using NeedyDS = DocumentSourceNeedsMongoProcessInterface; + dynamic_cast<NeedyDS&>(*transform).injectMongoProcessInterface(std::move(mongoProcess)); + auto next = transform->getNext(); // Match stage should pass the doc down if expectedDoc is given. ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc)); @@ -217,20 +237,45 @@ TEST_F(ChangeStreamStageTest, StagesGeneratedCorrectly) { // TODO: Check explain result. } -TEST_F(ChangeStreamStageTest, TransformInsert) { - OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 1)); +TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) { + OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 2)); + insert.setUuid(testUuid()); + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal. + }; + checkTransformation(insert, expectedInsert, {{"x"}, {"_id"}}); + insert.setFromMigrate(false); // also check actual "fromMigrate: false" not filtered + checkTransformation(insert, expectedInsert, {{"x"}, {"_id"}}); +} + +TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) { + OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("x" << 2 << "_id" << 1)); + insert.setUuid(testUuid()); + Document expectedInsert{ + {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1 << "x" << 2))}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, + {DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}}, + {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, + {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first + }; + checkTransformation(insert, expectedInsert, {{"_id"}, {"x"}}); +} + +TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) { + OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 2)); insert.setUuid(testUuid()); - // Insert Document expectedInsert{ {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1))}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType}, - {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 1}}}, + {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}}, {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}}, {DSChangeStream::kDocumentKeyField, D{{"_id", 1}}}, }; - checkTransformation(insert, expectedInsert); - insert.setFromMigrate(false); // also check actual "fromMigrate: false" not filtered - checkTransformation(insert, expectedInsert); + checkTransformation(insert, expectedInsert, {{"_id"}}); } TEST_F(ChangeStreamStageTest, TransformInsertFromMigrate) { diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp index e425699e80a..d3758128296 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp @@ -46,7 +46,9 @@ DocumentSourceSingleDocumentTransformation::DocumentSourceSingleDocumentTransfor const intrusive_ptr<ExpressionContext>& pExpCtx, std::unique_ptr<TransformerInterface> parsedTransform, std::string name) - : DocumentSource(pExpCtx), _parsedTransform(std::move(parsedTransform)), _name(name) {} + : DocumentSourceNeedsMongoProcessInterface(pExpCtx), + _parsedTransform(std::move(parsedTransform)), + _name(std::move(name)) {} const char* DocumentSourceSingleDocumentTransformation::getSourceName() const { return _name.c_str(); diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h index 70403cdf3c2..2a07072aef6 100644 --- a/src/mongo/db/pipeline/document_source_single_document_transformation.h +++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h @@ -38,7 +38,8 @@ namespace mongo { * a ParsedSingleDocumentTransformation. It is not a registered DocumentSource, and it cannot be * created from BSON. */ -class DocumentSourceSingleDocumentTransformation final : public DocumentSource { +class DocumentSourceSingleDocumentTransformation final + : public DocumentSourceNeedsMongoProcessInterface { public: /** * This class defines the minimal interface that every parser wishing to take advantage of @@ -86,6 +87,16 @@ public: virtual bool isSubsetOfProjection(const BSONObj& proj) const { return false; } + + protected: + MongoProcessInterface* _mongoProcess{nullptr}; + + private: + void injectMongoProcess(MongoProcessInterface* p) { + _mongoProcess = p; + } + + friend class DocumentSourceSingleDocumentTransformation; }; DocumentSourceSingleDocumentTransformation( @@ -101,6 +112,12 @@ public: DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final; GetModPathsReturn getModifiedPaths() const final; + void doInjectMongoProcessInterface( + std::shared_ptr<MongoProcessInterface> mongoProcessInterface) override { + + _parsedTransform->injectMongoProcess(mongoProcessInterface.get()); + } + StageConstraints constraints(Pipeline::SplitState pipeState) const final { StageConstraints constraints( StreamType::kStreaming, diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 5262f04ea0e..d36b047af68 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -37,6 +37,7 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/concurrency/d_concurrency.h" @@ -65,7 +66,9 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner.h" +#include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/metadata_manager.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/fill_locker_info.h" @@ -74,7 +77,10 @@ #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/grid.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/net/sock.h" @@ -88,6 +94,7 @@ using std::string; using std::unique_ptr; namespace { + class MongodProcessInterface final : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface { public: @@ -342,6 +349,38 @@ public: return std::string(); } + std::vector<FieldPath> collectDocumentKeyFields(UUID uuid) const final { + if (!ShardingState::get(_ctx->opCtx)->enabled()) { + return {"_id"}; // Nothing is sharded. + } + + auto scm = [this]() -> ScopedCollectionMetadata { + AutoGetCollection autoColl(_ctx->opCtx, _ctx->ns, MODE_IS); + return CollectionShardingState::get(_ctx->opCtx, _ctx->ns)->getMetadata(); + }(); + + if (!scm) { + return {"_id"}; // Collection is not sharded. + } + + uassert(ErrorCodes::StaleConfig, + str::stream() << "Collection " << _ctx->ns.ns() + << " UUID differs from UUID on change stream operations", + scm->uuidMatches(uuid)); + + // Unpack the shard key. + std::vector<FieldPath> result; + bool gotId = false; + for (auto& field : scm->getKeyPatternFields()) { + result.emplace_back(field->dottedField()); + gotId |= (result.back().fullPath() == "_id"); + } + if (!gotId) { // If not part of the shard key, "_id" comes last. + result.emplace_back("_id"); + } + return result; + } + private: intrusive_ptr<ExpressionContext> _ctx; DBDirectClient _client; diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 9aaae5bb27f..1a1a72f1224 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -113,5 +113,9 @@ public: std::string getShardName(OperationContext* opCtx) const override { MONGO_UNREACHABLE; } + + std::vector<FieldPath> collectDocumentKeyFields(UUID) const override { + MONGO_UNREACHABLE; + } }; } // namespace mongo |