summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorNathan Myers <ncm@cantrip.org>2017-10-10 00:09:08 -0400
committerNathan Myers <nathan.myers@10gen.com>2017-10-20 16:42:06 -0400
commitf32371804b610a22fabd7ef8bc726c431c547cf3 (patch)
tree9e14810485ca9cd13a3997bae0dc8ef3539670f4 /src/mongo/db
parentcebd8dfd83281b224bc6bb155b134f7f12d1f37f (diff)
downloadmongo-f32371804b610a22fabd7ef8bc726c431c547cf3.tar.gz
SERVER-31192 Make Change Stream extract documentKey from insert op log entry
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/pipeline/document_path_support.cpp8
-rw-r--r--src/mongo/db/pipeline/document_path_support.h5
-rw-r--r--src/mongo/db/pipeline/document_source.h7
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp61
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h19
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp39
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h4
10 files changed, 149 insertions, 11 deletions
diff --git a/src/mongo/db/pipeline/document_path_support.cpp b/src/mongo/db/pipeline/document_path_support.cpp
index a2b1b11378d..2ac115a7a3a 100644
--- a/src/mongo/db/pipeline/document_path_support.cpp
+++ b/src/mongo/db/pipeline/document_path_support.cpp
@@ -151,5 +151,13 @@ BSONObj documentToBsonWithPaths(const Document& input, const std::set<std::strin
return outputBuilder.obj();
}
+Document extractDocumentKeyFromDoc(const Document& doc, const std::vector<FieldPath>& paths) {
+ MutableDocument result;
+ for (auto& field : paths) {
+ result.addField(field.fullPath(), doc.getNestedField(field));
+ }
+ return result.freeze();
+}
+
} // namespace document_path_support
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_path_support.h b/src/mongo/db/pipeline/document_path_support.h
index 8882bf816dd..d8dd894a76d 100644
--- a/src/mongo/db/pipeline/document_path_support.h
+++ b/src/mongo/db/pipeline/document_path_support.h
@@ -64,5 +64,10 @@ StatusWith<Value> extractElementAlongNonArrayPath(const Document& doc, const Fie
*/
BSONObj documentToBsonWithPaths(const Document&, const std::set<std::string>& paths);
+/**
+ * Extracts 'paths' from the input document to a flat document.
+ */
+Document extractDocumentKeyFromDoc(const Document& doc, const std::vector<FieldPath>& paths);
+
} // namespace document_path_support
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index bc9dce714ad..6b3754f5b8e 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -47,6 +47,7 @@
#include "mongo/db/pipeline/dependencies.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/value.h"
@@ -832,6 +833,12 @@ public:
*/
virtual std::string getShardName(OperationContext* opCtx) const = 0;
+ /**
+ * Returns the fields of the document key (in order) for the current collection, including
+ * the shard key and _id. If _id is not in the shard key, it is added last.
+ */
+ virtual std::vector<FieldPath> collectDocumentKeyFields(UUID) const = 0;
+
// Add new methods as needed.
};
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 5e385da0420..b42cb41bdc0 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/commands/feature_compatibility_version_command_parser.h"
#include "mongo/db/pipeline/close_change_stream_exception.h"
+#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"
@@ -383,6 +384,9 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
Value uuid = input[repl::OplogEntry::kUuidFieldName];
if (!uuid.missing()) {
checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData);
+ if (_mongoProcess && _documentKeyFields.empty()) {
+ _documentKeyFields = _mongoProcess->collectDocumentKeyFields(uuid.getUuid());
+ }
}
NamespaceString nss(ns.getString());
Value id = input.getNestedField("o._id");
@@ -398,7 +402,8 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
case repl::OpTypeEnum::kInsert: {
operationType = kInsertOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
- documentKey = Value(Document{{kIdField, id}});
+ documentKey = Value(document_path_support::extractDocumentKeyFromDoc(
+ fullDocument.getDocument(), _documentKeyFields));
break;
}
case repl::OpTypeEnum::kDelete: {
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index fe1d6bfa199..e2d1498e482 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -32,6 +32,7 @@
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/document_sources_gen.h"
+#include "mongo/db/pipeline/field_path.h"
namespace mongo {
@@ -88,6 +89,11 @@ public:
private:
boost::intrusive_ptr<ExpressionContext> _expCtx;
BSONObj _changeStreamSpec;
+
+ // Fields of the document key, in order, including the shard key if the collection is
+ // sharded, and anyway "_id". Empty until the first oplog entry with a uuid is encountered.
+ // Needed for transforming 'insert' oplog entries.
+ std::vector<FieldPath> _documentKeyFields;
};
// The sort pattern used to merge results from multiple change streams on a mongos.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 7682972c787..f59545111f0 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -36,12 +36,14 @@
#include "mongo/db/pipeline/aggregation_context_fixture.h"
#include "mongo/db/pipeline/close_change_stream_exception.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream.h"
#include "mongo/db/pipeline/document_source_limit.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_mock.h"
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/stub_mongo_process_interface.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/pipeline/value_comparator.h"
#include "mongo/db/repl/oplog_entry.h"
@@ -81,6 +83,18 @@ private:
EnsureFCV _ensureFCV;
};
+// This is needed only for the "insert" tests.
+struct MockMongoProcessInterface final : public StubMongoProcessInterface {
+
+ MockMongoProcessInterface(std::vector<FieldPath> fields) : _fields(std::move(fields)) {}
+
+ std::vector<FieldPath> collectDocumentKeyFields(UUID) const final {
+ return _fields;
+ }
+
+ std::vector<FieldPath> _fields;
+};
+
class ChangeStreamStageTest : public ChangeStreamStageTestNoSetup {
public:
ChangeStreamStageTest() : ChangeStreamStageTestNoSetup() {
@@ -89,10 +103,16 @@ public:
getExpCtx()->opCtx->getServiceContext()));
}
- void checkTransformation(const OplogEntry& entry, const boost::optional<Document> expectedDoc) {
+ void checkTransformation(const OplogEntry& entry,
+ const boost::optional<Document> expectedDoc,
+ std::vector<FieldPath> docKeyFields = {}) {
vector<intrusive_ptr<DocumentSource>> stages = makeStages(entry);
auto transform = stages[2].get();
+ auto mongoProcess = std::make_shared<MockMongoProcessInterface>(docKeyFields);
+ using NeedyDS = DocumentSourceNeedsMongoProcessInterface;
+ dynamic_cast<NeedyDS&>(*transform).injectMongoProcessInterface(std::move(mongoProcess));
+
auto next = transform->getNext();
// Match stage should pass the doc down if expectedDoc is given.
ASSERT_EQ(next.isAdvanced(), static_cast<bool>(expectedDoc));
@@ -217,20 +237,45 @@ TEST_F(ChangeStreamStageTest, StagesGeneratedCorrectly) {
// TODO: Check explain result.
}
-TEST_F(ChangeStreamStageTest, TransformInsert) {
- OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 1));
+TEST_F(ChangeStreamStageTest, TransformInsertDocKeyXAndId) {
+ OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 2));
+ insert.setUuid(testUuid());
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("x" << 2 << "_id" << 1))},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"x", 2}, {"_id", 1}}}, // Note _id <-> x reversal.
+ };
+ checkTransformation(insert, expectedInsert, {{"x"}, {"_id"}});
+ insert.setFromMigrate(false); // also check actual "fromMigrate: false" not filtered
+ checkTransformation(insert, expectedInsert, {{"x"}, {"_id"}});
+}
+
+TEST_F(ChangeStreamStageTest, TransformInsertDocKeyIdAndX) {
+ OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("x" << 2 << "_id" << 1));
+ insert.setUuid(testUuid());
+ Document expectedInsert{
+ {DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1 << "x" << 2))},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
+ {DSChangeStream::kFullDocumentField, D{{"x", 2}, {"_id", 1}}},
+ {DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
+ {DSChangeStream::kDocumentKeyField, D{{"_id", 1}, {"x", 2}}}, // _id first
+ };
+ checkTransformation(insert, expectedInsert, {{"_id"}, {"x"}});
+}
+
+TEST_F(ChangeStreamStageTest, TransformInsertDocKeyJustId) {
+ OplogEntry insert(optime, 1, OpTypeEnum::kInsert, nss, BSON("_id" << 1 << "x" << 2));
insert.setUuid(testUuid());
- // Insert
Document expectedInsert{
{DSChangeStream::kIdField, makeResumeToken(ts, testUuid(), BSON("_id" << 1))},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInsertOpType},
- {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 1}}},
+ {DSChangeStream::kFullDocumentField, D{{"_id", 1}, {"x", 2}}},
{DSChangeStream::kNamespaceField, D{{"db", nss.db()}, {"coll", nss.coll()}}},
{DSChangeStream::kDocumentKeyField, D{{"_id", 1}}},
};
- checkTransformation(insert, expectedInsert);
- insert.setFromMigrate(false); // also check actual "fromMigrate: false" not filtered
- checkTransformation(insert, expectedInsert);
+ checkTransformation(insert, expectedInsert, {{"_id"}});
}
TEST_F(ChangeStreamStageTest, TransformInsertFromMigrate) {
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
index e425699e80a..d3758128296 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
@@ -46,7 +46,9 @@ DocumentSourceSingleDocumentTransformation::DocumentSourceSingleDocumentTransfor
const intrusive_ptr<ExpressionContext>& pExpCtx,
std::unique_ptr<TransformerInterface> parsedTransform,
std::string name)
- : DocumentSource(pExpCtx), _parsedTransform(std::move(parsedTransform)), _name(name) {}
+ : DocumentSourceNeedsMongoProcessInterface(pExpCtx),
+ _parsedTransform(std::move(parsedTransform)),
+ _name(std::move(name)) {}
const char* DocumentSourceSingleDocumentTransformation::getSourceName() const {
return _name.c_str();
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index 70403cdf3c2..2a07072aef6 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -38,7 +38,8 @@ namespace mongo {
* a ParsedSingleDocumentTransformation. It is not a registered DocumentSource, and it cannot be
* created from BSON.
*/
-class DocumentSourceSingleDocumentTransformation final : public DocumentSource {
+class DocumentSourceSingleDocumentTransformation final
+ : public DocumentSourceNeedsMongoProcessInterface {
public:
/**
* This class defines the minimal interface that every parser wishing to take advantage of
@@ -86,6 +87,16 @@ public:
virtual bool isSubsetOfProjection(const BSONObj& proj) const {
return false;
}
+
+ protected:
+ MongoProcessInterface* _mongoProcess{nullptr};
+
+ private:
+ void injectMongoProcess(MongoProcessInterface* p) {
+ _mongoProcess = p;
+ }
+
+ friend class DocumentSourceSingleDocumentTransformation;
};
DocumentSourceSingleDocumentTransformation(
@@ -101,6 +112,12 @@ public:
DocumentSource::GetDepsReturn getDependencies(DepsTracker* deps) const final;
GetModPathsReturn getModifiedPaths() const final;
+ void doInjectMongoProcessInterface(
+ std::shared_ptr<MongoProcessInterface> mongoProcessInterface) override {
+
+ _parsedTransform->injectMongoProcess(mongoProcessInterface.get());
+ }
+
StageConstraints constraints(Pipeline::SplitState pipeState) const final {
StageConstraints constraints(
StreamType::kStreaming,
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 5262f04ea0e..d36b047af68 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/concurrency/d_concurrency.h"
@@ -65,7 +66,9 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_planner.h"
+#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/metadata_manager.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/fill_locker_info.h"
@@ -74,7 +77,10 @@
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h"
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_version.h"
+#include "mongo/s/grid.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/net/sock.h"
@@ -88,6 +94,7 @@ using std::string;
using std::unique_ptr;
namespace {
+
class MongodProcessInterface final
: public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface {
public:
@@ -342,6 +349,38 @@ public:
return std::string();
}
+ std::vector<FieldPath> collectDocumentKeyFields(UUID uuid) const final {
+ if (!ShardingState::get(_ctx->opCtx)->enabled()) {
+ return {"_id"}; // Nothing is sharded.
+ }
+
+ auto scm = [this]() -> ScopedCollectionMetadata {
+ AutoGetCollection autoColl(_ctx->opCtx, _ctx->ns, MODE_IS);
+ return CollectionShardingState::get(_ctx->opCtx, _ctx->ns)->getMetadata();
+ }();
+
+ if (!scm) {
+ return {"_id"}; // Collection is not sharded.
+ }
+
+ uassert(ErrorCodes::StaleConfig,
+ str::stream() << "Collection " << _ctx->ns.ns()
+ << " UUID differs from UUID on change stream operations",
+ scm->uuidMatches(uuid));
+
+ // Unpack the shard key.
+ std::vector<FieldPath> result;
+ bool gotId = false;
+ for (auto& field : scm->getKeyPatternFields()) {
+ result.emplace_back(field->dottedField());
+ gotId |= (result.back().fullPath() == "_id");
+ }
+ if (!gotId) { // If not part of the shard key, "_id" comes last.
+ result.emplace_back("_id");
+ }
+ return result;
+ }
+
private:
intrusive_ptr<ExpressionContext> _ctx;
DBDirectClient _client;
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 9aaae5bb27f..1a1a72f1224 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -113,5 +113,9 @@ public:
std::string getShardName(OperationContext* opCtx) const override {
MONGO_UNREACHABLE;
}
+
+ std::vector<FieldPath> collectDocumentKeyFields(UUID) const override {
+ MONGO_UNREACHABLE;
+ }
};
} // namespace mongo