summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-03 17:00:41 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-04-11 15:53:03 -0400
commitfff261ac550155065fce4b7b1529061f18980599 (patch)
tree09ce022d7b8319f1af3c2db2354427ecfe1aa389 /src/mongo/db/pipeline
parent0fa7bcb8bea5d4585fdbc1003b5116cd7bf28540 (diff)
downloadmongo-fff261ac550155065fce4b7b1529061f18980599.tar.gz
SERVER-29134: Support change streams on an entire database in a sharded cluster
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/document_source_add_fields.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp48
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h24
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp7
-rw-r--r--src/mongo/db/pipeline/document_source_project.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_replace_root.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.cpp6
-rw-r--r--src/mongo/db/pipeline/document_source_single_document_transformation.h9
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h14
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp44
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h5
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h5
12 files changed, 118 insertions, 63 deletions
diff --git a/src/mongo/db/pipeline/document_source_add_fields.cpp b/src/mongo/db/pipeline/document_source_add_fields.cpp
index 445feaa1a51..44490738890 100644
--- a/src/mongo/db/pipeline/document_source_add_fields.cpp
+++ b/src/mongo/db/pipeline/document_source_add_fields.cpp
@@ -47,9 +47,14 @@ REGISTER_DOCUMENT_SOURCE(addFields,
intrusive_ptr<DocumentSource> DocumentSourceAddFields::create(
BSONObj addFieldsSpec, const intrusive_ptr<ExpressionContext>& expCtx) {
+
+ const bool isIndependentOfAnyCollection = false;
intrusive_ptr<DocumentSourceSingleDocumentTransformation> addFields(
new DocumentSourceSingleDocumentTransformation(
- expCtx, ParsedAddFields::create(expCtx, addFieldsSpec), "$addFields"));
+ expCtx,
+ ParsedAddFields::create(expCtx, addFieldsSpec),
+ "$addFields",
+ isIndependentOfAnyCollection));
return addFields;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b227afeac5f..0ef9addfaac 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -498,10 +498,14 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or
intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage(
BSONObj changeStreamSpec, const intrusive_ptr<ExpressionContext>& expCtx) {
+ // Mark the transformation stage as independent of any collection if the change stream is
+ // watching all collections in the database.
+ const bool isIndependentOfAnyCollection = expCtx->ns.isCollectionlessAggregateNS();
return intrusive_ptr<DocumentSource>(new DocumentSourceSingleDocumentTransformation(
expCtx,
stdx::make_unique<Transformation>(expCtx, changeStreamSpec),
- kStageName.toString()));
+ kStageName.toString(),
+ isIndependentOfAnyCollection));
}
Document DocumentSourceChangeStream::Transformation::applyTransformation(const Document& input) {
@@ -532,28 +536,28 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
Value ns = input[repl::OplogEntry::kNamespaceFieldName];
checkValueType(ns, repl::OplogEntry::kNamespaceFieldName, BSONType::String);
Value uuid = input[repl::OplogEntry::kUuidFieldName];
- if (!uuid.missing()) {
+ std::vector<FieldPath> documentKeyFields;
+
+ // Deal with CRUD operations and commands.
+ auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op);
+
+ // Ignore commands in the oplog when looking up the document key fields since a command implies
+ // that the change stream is about to be invalidated (e.g. collection drop).
+ if (!uuid.missing() && opType != repl::OpTypeEnum::kCommand) {
checkValueType(uuid, repl::OplogEntry::kUuidFieldName, BSONType::BinData);
- // We need to retrieve the document key fields if our cached copy has not been populated. If
- // the collection was unsharded but has now transitioned to a sharded state, we must update
- // the documentKey fields to include the shard key. We only need to re-check the documentKey
- // while the collection is unsharded; if the collection is or becomes sharded, then the
- // documentKey is final and will not change.
- if (!_documentKeyFieldsSharded) {
- // If this is not a shard server, 'catalogCache' will be nullptr and we will skip the
- // routing table check.
- auto catalogCache = Grid::get(_expCtx->opCtx)->catalogCache();
- const bool collectionIsSharded = catalogCache && [catalogCache, this]() {
- auto routingInfo =
- catalogCache->getCollectionRoutingInfo(_expCtx->opCtx, _expCtx->ns);
- return routingInfo.isOK() && routingInfo.getValue().cm();
- }();
- if (_documentKeyFields.empty() || collectionIsSharded) {
- _documentKeyFields = _expCtx->mongoProcessInterface->collectDocumentKeyFields(
- _expCtx->opCtx, _expCtx->ns, uuid.getUuid());
- _documentKeyFieldsSharded = collectionIsSharded;
+ // We need to retrieve the document key fields if our cache does not have an entry for this
+ // UUID or if the cache entry is not definitively final, indicating that the collection was
+ // unsharded when the entry was last populated.
+ auto it = _documentKeyCache.find(uuid.getUuid());
+ if (it == _documentKeyCache.end() || !it->second.isFinal) {
+ auto docKeyFields = _expCtx->mongoProcessInterface->collectDocumentKeyFields(
+ _expCtx->opCtx, uuid.getUuid());
+ if (it == _documentKeyCache.end() || docKeyFields.second) {
+ _documentKeyCache[uuid.getUuid()] = DocumentKeyCacheEntry(docKeyFields);
}
}
+
+ documentKeyFields = _documentKeyCache.find(uuid.getUuid())->second.documentKeyFields;
}
NamespaceString nss(ns.getString());
Value id = input.getNestedField("o._id");
@@ -563,14 +567,12 @@ Document DocumentSourceChangeStream::Transformation::applyTransformation(const D
Value updateDescription;
Value documentKey;
- // Deal with CRUD operations and commands.
- auto opType = repl::OpType_parse(IDLParserErrorContext("ChangeStreamEntry.op"), op);
switch (opType) {
case repl::OpTypeEnum::kInsert: {
operationType = kInsertOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
documentKey = Value(document_path_support::extractDocumentKeyFromDoc(
- fullDocument.getDocument(), _documentKeyFields));
+ fullDocument.getDocument(), documentKeyFields));
break;
}
case repl::OpTypeEnum::kDelete: {
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 3360a9aca14..8d39159dc0f 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -106,13 +106,25 @@ public:
boost::intrusive_ptr<ExpressionContext> _expCtx;
BSONObj _changeStreamSpec;
- // Fields of the document key, in order, including the shard key if the collection is
- // sharded, and anyway "_id". Empty until the first oplog entry with a uuid is encountered.
- // Needed for transforming 'insert' oplog entries.
- std::vector<FieldPath> _documentKeyFields;
+ struct DocumentKeyCacheEntry {
+ DocumentKeyCacheEntry() = default;
+
+ DocumentKeyCacheEntry(std::pair<std::vector<FieldPath>, bool> documentKeyFieldsIn)
+ : documentKeyFields(documentKeyFieldsIn.first),
+ isFinal(documentKeyFieldsIn.second){};
+ // Fields of the document key, in order, including "_id" and the shard key if the
+ // collection is sharded. Empty until the first oplog entry with a uuid is encountered.
+ // Needed for transforming 'insert' oplog entries.
+ std::vector<FieldPath> documentKeyFields;
+
+ // Set to true if the document key fields for this entry are definitively known and will
+ // not change. This implies that either the collection has become sharded or has been
+ // dropped.
+ bool isFinal;
+ };
- // Set to true if the collection is found to be sharded while retrieving _documentKeyFields.
- bool _documentKeyFieldsSharded = false;
+ // Map of collection UUID to document key fields.
+ std::map<UUID, DocumentKeyCacheEntry> _documentKeyCache;
};
// The name of the field where the document key (_id and shard key, if present) will be found
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 32df8e68959..e89e9e0395b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -85,10 +85,9 @@ struct MockMongoInterface final : public StubMongoProcessInterface {
MockMongoInterface(std::vector<FieldPath> fields) : _fields(std::move(fields)) {}
- std::vector<FieldPath> collectDocumentKeyFields(OperationContext*,
- const NamespaceString&,
- UUID) const final {
- return _fields;
+ std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*,
+ UUID) const final {
+ return {_fields, true};
}
std::vector<FieldPath> _fields;
diff --git a/src/mongo/db/pipeline/document_source_project.cpp b/src/mongo/db/pipeline/document_source_project.cpp
index 6cf11fc5a51..f5d560c4c9d 100644
--- a/src/mongo/db/pipeline/document_source_project.cpp
+++ b/src/mongo/db/pipeline/document_source_project.cpp
@@ -47,8 +47,12 @@ REGISTER_DOCUMENT_SOURCE(project,
intrusive_ptr<DocumentSource> DocumentSourceProject::create(
BSONObj projectSpec, const intrusive_ptr<ExpressionContext>& expCtx) {
+ const bool isIndependentOfAnyCollection = false;
intrusive_ptr<DocumentSource> project(new DocumentSourceSingleDocumentTransformation(
- expCtx, ParsedAggregationProjection::create(expCtx, projectSpec), "$project"));
+ expCtx,
+ ParsedAggregationProjection::create(expCtx, projectSpec),
+ "$project",
+ isIndependentOfAnyCollection));
return project;
}
diff --git a/src/mongo/db/pipeline/document_source_replace_root.cpp b/src/mongo/db/pipeline/document_source_replace_root.cpp
index 089ff2d9c98..6e7ce90cc73 100644
--- a/src/mongo/db/pipeline/document_source_replace_root.cpp
+++ b/src/mongo/db/pipeline/document_source_replace_root.cpp
@@ -148,8 +148,12 @@ REGISTER_DOCUMENT_SOURCE(replaceRoot,
intrusive_ptr<DocumentSource> DocumentSourceReplaceRoot::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ const bool isIndependentOfAnyCollection = false;
return new DocumentSourceSingleDocumentTransformation(
- pExpCtx, ReplaceRootTransformation::create(pExpCtx, elem), "$replaceRoot");
+ pExpCtx,
+ ReplaceRootTransformation::create(pExpCtx, elem),
+ "$replaceRoot",
+ isIndependentOfAnyCollection);
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
index deec3f342ae..41be7bbb096 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.cpp
@@ -45,10 +45,12 @@ using boost::intrusive_ptr;
DocumentSourceSingleDocumentTransformation::DocumentSourceSingleDocumentTransformation(
const intrusive_ptr<ExpressionContext>& pExpCtx,
std::unique_ptr<TransformerInterface> parsedTransform,
- std::string name)
+ std::string name,
+ bool isIndependentOfAnyCollection)
: DocumentSource(pExpCtx),
_parsedTransform(std::move(parsedTransform)),
- _name(std::move(name)) {}
+ _name(std::move(name)),
+ _isIndependentOfAnyCollection(isIndependentOfAnyCollection) {}
const char* DocumentSourceSingleDocumentTransformation::getSourceName() const {
return _name.c_str();
diff --git a/src/mongo/db/pipeline/document_source_single_document_transformation.h b/src/mongo/db/pipeline/document_source_single_document_transformation.h
index bffa484c46b..5d0e850d3fd 100644
--- a/src/mongo/db/pipeline/document_source_single_document_transformation.h
+++ b/src/mongo/db/pipeline/document_source_single_document_transformation.h
@@ -94,7 +94,8 @@ public:
DocumentSourceSingleDocumentTransformation(
const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
std::unique_ptr<TransformerInterface> parsedTransform,
- std::string name);
+ std::string name,
+ bool independentOfAnyCollection);
// virtuals from DocumentSource
const char* getSourceName() const final;
@@ -122,6 +123,9 @@ public:
constraints.canSwapWithMatch = true;
constraints.canSwapWithLimit = true;
+ // This transformation could be part of a 'collectionless' change stream on an entire
+ // database or cluster, mark as independent of any collection if so.
+ constraints.isIndependentOfAnyCollection = _isIndependentOfAnyCollection;
return constraints;
}
@@ -146,6 +150,9 @@ private:
// Specific name of the transformation.
std::string _name;
+ // Set to true if this transformation stage can be run on the collectionless namespace.
+ bool _isIndependentOfAnyCollection;
+
// Cached stage options in case this DocumentSource is disposed before serialized (e.g. explain
// with a sort which will auto-dispose of the pipeline).
Document _cachedStageOptions;
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 3f63f7e66ff..dac98d07acc 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -183,12 +183,14 @@ public:
virtual std::string getShardName(OperationContext* opCtx) const = 0;
/**
- * Returns the fields of the document key (in order) for the collection given by 'nss' and
- * 'UUID', including the shard key and _id. If _id is not in the shard key, it is added last.
- */
- virtual std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx,
- const NamespaceString& nss,
- UUID uuid) const = 0;
+ * Returns the fields of the document key (in order) for the collection corresponding to 'uuid',
+ * including the shard key and _id. If _id is not in the shard key, it is added last. If the
+ * collection is not sharded or no longer exists, returns only _id. Also retrurns a boolean that
+ * indicates whether the returned fields of the document key are final and will never change for
+ * the given collection, either because the collection was dropped or has become sharded.
+ */
+ virtual std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(
+ OperationContext* opCtx, UUID uuid) const = 0;
/**
* Returns zero or one documents with the document key 'documentKey'. 'documentKey' is treated
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index a3f1bdf3275..52b7860c511 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_catalog.h"
+#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
@@ -743,10 +744,31 @@ std::string PipelineD::MongoDInterface::getShardName(OperationContext* opCtx) co
return std::string();
}
-std::vector<FieldPath> PipelineD::MongoDInterface::collectDocumentKeyFields(
- OperationContext* opCtx, const NamespaceString& nss, UUID uuid) const {
- if (!ShardingState::get(opCtx)->enabled()) {
- return {"_id"}; // Nothing is sharded.
+std::pair<std::vector<FieldPath>, bool> PipelineD::MongoDInterface::collectDocumentKeyFields(
+ OperationContext* opCtx, UUID uuid) const {
+ if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
+ return {{"_id"}, false}; // Nothing is sharded.
+ }
+
+ // An empty namespace indicates that the collection has been dropped. Treat it as unsharded and
+ // mark the fields as final.
+ auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid);
+ if (nss.isEmpty()) {
+ return {{"_id"}, true};
+ }
+
+ // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache
+ // to determine whether the collection is sharded in the first place.
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+
+ const bool collectionIsSharded = catalogCache && [&]() {
+ auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss);
+ return routingInfo.isOK() && routingInfo.getValue().cm();
+ }();
+
+ // Collection exists and is not sharded, mark as not final.
+ if (!collectionIsSharded) {
+ return {{"_id"}, false};
}
auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata {
@@ -754,15 +776,12 @@ std::vector<FieldPath> PipelineD::MongoDInterface::collectDocumentKeyFields(
return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
}();
- if (!scm) {
- return {"_id"}; // Collection is not sharded.
+ // Collection is not sharded or UUID mismatch implies collection has been dropped and recreated
+ // as sharded.
+ if (!scm || !scm->uuidMatches(uuid)) {
+ return {{"_id"}, false};
}
- uassert(ErrorCodes::InvalidUUID,
- str::stream() << "Collection " << nss.ns()
- << " UUID differs from UUID on change stream operations",
- scm->uuidMatches(uuid));
-
// Unpack the shard key.
std::vector<FieldPath> result;
bool gotId = false;
@@ -773,7 +792,8 @@ std::vector<FieldPath> PipelineD::MongoDInterface::collectDocumentKeyFields(
if (!gotId) { // If not part of the shard key, "_id" comes last.
result.emplace_back("_id");
}
- return result;
+ // Collection is now sharded so the document key fields will never change, mark as final.
+ return {result, true};
}
std::vector<GenericCursor> PipelineD::MongoDInterface::getCursors(
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index e9ab4fef8d2..4064b38e7c8 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -98,9 +98,8 @@ public:
Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final;
- std::vector<FieldPath> collectDocumentKeyFields(OperationContext* opCtx,
- const NamespaceString& nss,
- UUID uuid) const final;
+ std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext* opCtx,
+ UUID uuid) const final;
boost::optional<Document> lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 9df0bb7baba..69515dfd853 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -125,9 +125,8 @@ public:
MONGO_UNREACHABLE;
}
- std::vector<FieldPath> collectDocumentKeyFields(OperationContext*,
- const NamespaceString&,
- UUID) const override {
+ std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*,
+ UUID) const override {
MONGO_UNREACHABLE;
}