30 files changed, 1176 insertions, 688 deletions
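For orientation, this change teaches $out about write modes and a configurable unique key. Below is a minimal mongo shell sketch of the syntax exercised by the new tests; the collection names "source" and "target" are illustrative placeholders, not part of the diff.

// Legacy string form: drops and recreates the target collection; the unique
// key defaults to {_id: 1}.
db.source.aggregate([{$out: "target"}]);

// Object form with an explicit mode. "replaceCollection" recreates the target,
// "insertDocuments" performs plain inserts (a duplicate key fails with code
// 16996), and "replaceDocuments" upserts by the unique key (failures surface
// as 50904 or 50905).
db.source.aggregate([{$out: {to: "target", mode: "insertDocuments"}}]);

// "replaceDocuments" with an explicit target database and a compound unique
// key; a unique index matching the key pattern is expected to exist on the
// target collection.
db.source.aggregate([{
    $out: {
        to: "target",
        db: "otherDB",
        mode: "replaceDocuments",
        uniqueKey: {_id: 1, "a.b": 1}
    }
}]);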
diff --git a/jstests/aggregation/sources/out/batch_writes.js b/jstests/aggregation/sources/out/batch_writes.js new file mode 100644 index 00000000000..6bbaac7737a --- /dev/null +++ b/jstests/aggregation/sources/out/batch_writes.js @@ -0,0 +1,61 @@ +// Tests the behavior of an $out stage which encounters an error in the middle of processing. We +// don't guarantee any particular behavior in this scenario, but this test exists to make sure +// nothing horrendous happens and to characterize the current behavior. +// @tags: [assumes_unsharded_collection] +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. + + const coll = db.batch_writes; + const outColl = db.batch_writes_out; + coll.drop(); + outColl.drop(); + + // Test with 2 very large documents that do not fit into a single batch. + const kSize15MB = 15 * 1024 * 1024; + const largeArray = new Array(kSize15MB).join("a"); + assert.commandWorked(coll.insert({_id: 0, a: largeArray})); + assert.commandWorked(coll.insert({_id: 1, a: largeArray})); + + // Make sure the $out succeeds without any duplicate keys. + ["replaceCollection", "insertDocuments", "replaceDocuments"].forEach(mode => { + coll.aggregate([{$out: {to: outColl.getName(), mode: mode}}]); + assert.eq(2, outColl.find().itcount()); + outColl.drop(); + }); + + coll.drop(); + for (let i = 0; i < 10; i++) { + assert.commandWorked(coll.insert({_id: i, a: i})); + } + + // Create a unique index on 'a' in the output collection to create a unique key violation when + // running the $out. The second document to be written ({_id: 1, a: 1}) will conflict with the + // existing document in the output collection. We use a unique index on a field other than _id + // because "replaceDocuments" mode will not change _id when one already exists. + outColl.drop(); + assert.commandWorked(outColl.insert({_id: 2, a: 1})); + assert.commandWorked(outColl.createIndex({a: 1}, {unique: true})); + + // Test that both batched updates and inserts will successfully write the first document but + // fail on the second. We don't guarantee any particular behavior in this case, but this test is + // meant to characterize the current behavior. + assertErrorCode(coll, [{$out: {to: outColl.getName(), mode: "insertDocuments"}}], 16996); + assert.soon(() => { + return outColl.find().itcount() == 2; + }); + + assertErrorCode(coll, [{$out: {to: outColl.getName(), mode: "replaceDocuments"}}], 50904); + assert.soon(() => { + return outColl.find().itcount() == 2; + }); + + // Mode "replaceCollection" will drop the contents of the output collection, so there is no + // duplicate key error. + outColl.drop(); + assert.commandWorked(outColl.insert({_id: 2, a: 1})); + assert.commandWorked(outColl.createIndex({a: 1}, {unique: true})); + coll.aggregate([{$out: {to: outColl.getName(), mode: "replaceCollection"}}]); + assert.eq(10, outColl.find().itcount()); +}()); diff --git a/jstests/aggregation/sources/out/mode_replace_documents.js b/jstests/aggregation/sources/out/mode_replace_documents.js new file mode 100644 index 00000000000..8335981a1ff --- /dev/null +++ b/jstests/aggregation/sources/out/mode_replace_documents.js @@ -0,0 +1,93 @@ +// Tests for the $out stage with mode set to "replaceDocuments". +// @tags: [assumes_unsharded_collection] +(function() { + "use strict"; + + load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. 
+ + const coll = db.replace_docs; + const outColl = db.replace_docs_out; + coll.drop(); + outColl.drop(); + + const nDocs = 10; + for (let i = 0; i < nDocs; i++) { + assert.commandWorked(coll.insert({_id: i, a: i})); + } + + // Test that a $out with 'replaceDocuments' mode will default the unique key to "_id". + coll.aggregate([{$out: {to: outColl.getName(), mode: "replaceDocuments"}}]); + assert.eq(nDocs, outColl.find().itcount()); + + // Test that 'replaceDocuments' mode will update existing documents that match the unique key. + const nDocsReplaced = 5; + coll.aggregate([ + {$project: {_id: {$mod: ["$_id", nDocsReplaced]}}}, + {$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {_id: 1}}} + ]); + assert.eq(nDocsReplaced, outColl.find({a: {$exists: false}}).itcount()); + + // Test 'replaceDocuments' mode with a dotted path unique key. + coll.drop(); + outColl.drop(); + assert.commandWorked(coll.insert({_id: 0, a: {b: 1}})); + assert.commandWorked(coll.insert({_id: 1, a: {b: 1}, c: 1})); + coll.aggregate([ + {$addFields: {_id: 0}}, + {$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {_id: 1, "a.b": 1}}} + ]); + assert.eq([{_id: 0, a: {b: 1}, c: 1}], outColl.find().toArray()); + + // TODO SERVER-36100: 'replaceDocuments' mode should allow a missing "_id" unique key. + assertErrorCode(coll, + [ + {$project: {_id: 0}}, + { + $out: { + to: outColl.getName(), + mode: "replaceDocuments", + } + } + ], + 50905); + + // Test that 'replaceDocuments' mode with a missing non-id unique key fails. + assertErrorCode( + coll, + [{$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {missing: 1}}}], + 50905); + + // Test that a replace fails to insert a document if it violates a unique index constraint. In + // this example, $out will attempt to insert multiple documents with {a: 0} which is not allowed + // with the unique index on {a: 1}. + coll.drop(); + assert.commandWorked(coll.insert([{_id: 0}, {_id: 1}])); + + outColl.drop(); + assert.commandWorked(outColl.createIndex({a: 1}, {unique: true})); + assertErrorCode( + coll, + [{$addFields: {a: 0}}, {$out: {to: outColl.getName(), mode: "replaceDocuments"}}], + 50904); + + // Test that $out fails if the unique key contains an array. + coll.drop(); + assert.commandWorked(coll.insert({_id: 0, a: [1, 2]})); + assertErrorCode( + coll, + [ + {$addFields: {_id: 0}}, + {$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {_id: 1, "a.b": 1}}} + ], + 50905); + + coll.drop(); + assert.commandWorked(coll.insert({_id: 0, a: [{b: 1}]})); + assertErrorCode( + coll, + [ + {$addFields: {_id: 0}}, + {$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {_id: 1, "a.b": 1}}} + ], + 50905); +}()); diff --git a/src/mongo/bson/bsonobj.cpp b/src/mongo/bson/bsonobj.cpp index 244d4ff5c67..ff03f9aac79 100644 --- a/src/mongo/bson/bsonobj.cpp +++ b/src/mongo/bson/bsonobj.cpp @@ -271,20 +271,6 @@ BSONElement BSONObj::getFieldUsingIndexNames(StringData fieldName, const BSONObj return BSONElement(); } -/* grab names of all the fields in this object */ -int BSONObj::getFieldNames(set<string>& fields) const { - int n = 0; - BSONObjIterator i(*this); - while (i.moreWithEOO()) { - BSONElement e = i.next(); - if (e.eoo()) - break; - fields.insert(e.fieldName()); - n++; - } - return n; -} - /* note: addFields always adds _id even if not specified returns n added not counting _id unless requested. 
*/ diff --git a/src/mongo/bson/bsonobj.h b/src/mongo/bson/bsonobj.h index c83f1abf368..ee441046c25 100644 --- a/src/mongo/bson/bsonobj.h +++ b/src/mongo/bson/bsonobj.h @@ -274,8 +274,11 @@ public: */ int nFields() const; - /** adds the field names to the fields set. does NOT clear it (appends). */ - int getFieldNames(std::set<std::string>& fields) const; + /** + * Returns a 'Container' populated with the field names of the object. + */ + template <class Container> + Container getFieldNames() const; /** Get the field of the specified name. eoo() is true on the returned element if not found. @@ -838,4 +841,16 @@ inline void BSONObj::getFields(const std::array<StringData, N>& fieldNames, break; } } + +template <class Container> +Container BSONObj::getFieldNames() const { + Container fields; + for (auto&& elem : *this) { + if (elem.eoo()) + break; + fields.insert(elem.fieldName()); + } + return fields; +} + } // namespace mongo diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 0c218665a46..6d391e461bf 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -240,6 +240,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands', '$BUILD_DIR/mongo/db/curop_failpoint_helpers', '$BUILD_DIR/mongo/db/ops/write_ops_exec', + '$BUILD_DIR/mongo/db/pipeline/mongod_process_interface', '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/db/repair_database', '$BUILD_DIR/mongo/db/rw_concern_d', diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 60efa126c3c..3fa7c3ffa93 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -49,6 +49,7 @@ #include "mongo/db/pipeline/expression.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" +#include "mongo/db/pipeline/mongod_process_interface.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/db/query/collation/collator_factory_interface.h" @@ -490,7 +491,7 @@ Status runAggregate(OperationContext* opCtx, new ExpressionContext(opCtx, request, std::move(*collatorToUse), - std::make_shared<PipelineD::MongoDInterface>(opCtx), + std::make_shared<MongoDInterface>(opCtx), uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)), uuid)); expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 0195c38f596..2b53ed9c181 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -261,6 +261,41 @@ env.Library( ] ) +env.Library( + target='mongo_process_common', + source=[ + 'mongo_process_common.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/auth/authorization_manager_global', + ] +) + +env.Library( + target='mongod_process_interface', + source=[ + 'mongod_process_interface.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/query_exec', + '$BUILD_DIR/mongo/db/ops/write_ops_exec', + '$BUILD_DIR/mongo/db/stats/top', + 'mongo_process_common', + ] +) + +env.Library( + target='mongos_process_interface', + source=[ + 'mongos_process_interface.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/s/query/async_results_merger', + '$BUILD_DIR/mongo/s/commands/cluster_commands_helpers', + 'mongo_process_common', + ] +) + pipelineeEnv = env.Clone() pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy']) pipelineeEnv.Library( @@ -310,7 +345,6 @@ pipelineeEnv.Library( "cluster_aggregation_planner.cpp", 'document_source_tee_consumer.cpp', 
'document_source_unwind.cpp', - 'mongo_process_common.cpp', 'pipeline.cpp', 'sequential_document_cache.cpp', 'tee_buffer.cpp', @@ -333,7 +367,6 @@ pipelineeEnv.Library( '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/sessions_collection', - '$BUILD_DIR/mongo/db/stats/top', '$BUILD_DIR/mongo/db/storage/encryption_hooks', '$BUILD_DIR/mongo/db/storage/storage_options', '$BUILD_DIR/mongo/s/is_mongos', diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h index 76d207276ce..21991ef483a 100644 --- a/src/mongo/db/pipeline/document.h +++ b/src/mongo/db/pipeline/document.h @@ -500,8 +500,9 @@ public: /** Gets/Sets a nested field given a path. * - * All fields along path are created as empty Documents if they don't exist - * or are any other type. + * All fields along path are created as empty Documents if they don't exist or are any other + * type. Does *not* traverse nested arrays when evaluating a nested path, instead returning + * Value() if the dotted field points to a nested object within an array. */ MutableValue getNestedField(const FieldPath& dottedField); void setNestedField(const FieldPath& dottedField, const Value& val) { diff --git a/src/mongo/db/pipeline/document_path_support.cpp b/src/mongo/db/pipeline/document_path_support.cpp index 2ac115a7a3a..a2b1b11378d 100644 --- a/src/mongo/db/pipeline/document_path_support.cpp +++ b/src/mongo/db/pipeline/document_path_support.cpp @@ -151,13 +151,5 @@ BSONObj documentToBsonWithPaths(const Document& input, const std::set<std::strin return outputBuilder.obj(); } -Document extractDocumentKeyFromDoc(const Document& doc, const std::vector<FieldPath>& paths) { - MutableDocument result; - for (auto& field : paths) { - result.addField(field.fullPath(), doc.getNestedField(field)); - } - return result.freeze(); -} - } // namespace document_path_support } // namespace mongo diff --git a/src/mongo/db/pipeline/document_path_support.h b/src/mongo/db/pipeline/document_path_support.h index d8dd894a76d..b493a4bcaf8 100644 --- a/src/mongo/db/pipeline/document_path_support.h +++ b/src/mongo/db/pipeline/document_path_support.h @@ -66,8 +66,18 @@ BSONObj documentToBsonWithPaths(const Document&, const std::set<std::string>& pa /** * Extracts 'paths' from the input document to a flat document. + * + * This does *not* traverse arrays when extracting values from dotted paths. For example, the path + * "a.b" will not populate the result document if the input document is {a: [{b: 1}]}. 
*/ -Document extractDocumentKeyFromDoc(const Document& doc, const std::vector<FieldPath>& paths); +template <class Container> +Document extractPathsFromDoc(const Document& doc, const Container& paths) { + MutableDocument result; + for (const auto& field : paths) { + result.addField(field.fullPath(), doc.getNestedField(field)); + } + return result.freeze(); +} } // namespace document_path_support } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 06e3c6230bf..6b744558186 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -240,7 +240,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document case repl::OpTypeEnum::kInsert: { operationType = DocumentSourceChangeStream::kInsertOpType; fullDocument = input[repl::OplogEntry::kObjectFieldName]; - documentKey = Value(document_path_support::extractDocumentKeyFromDoc( + documentKey = Value(document_path_support::extractPathsFromDoc( fullDocument.getDocument(), documentKeyFields)); break; } diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp index 867a9f8a539..fc5d6b1e2ee 100644 --- a/src/mongo/db/pipeline/document_source_group.cpp +++ b/src/mongo/db/pipeline/document_source_group.cpp @@ -686,8 +686,7 @@ boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const { // for every permutation of group by (a, b, c), since we are guaranteed that documents with // the same value of (a, b, c) will be consecutive in the input stream, no matter what our // _id is. - std::set<std::string> fieldNames; - obj.getFieldNames(fieldNames); + auto fieldNames = obj.getFieldNames<std::set<std::string>>(); if (fieldNames == deps.fields) { return obj; } diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index aac7f5d31c8..071c9bcb37f 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/pipeline/document_path_support.h" #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/document_source_out_gen.h" #include "mongo/db/pipeline/document_source_out_in_place.h" @@ -83,13 +84,6 @@ const char* DocumentSourceOut::getSourceName() const { return "$out"; } -void DocumentSourceOut::spill(const vector<BSONObj>& toInsert) { - BSONObj err = pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), toInsert); - uassert(16996, - str::stream() << "insert for $out failed: " << err, - DBClientBase::getLastErrorString(err).empty()); -} - DocumentSource::GetNextResult DocumentSourceOut::getNext() { pExpCtx->checkForInterrupt(); @@ -103,24 +97,31 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() { } // Insert all documents into temp collection, batching to perform vectored inserts. 
- vector<BSONObj> bufferedObjects; + BatchedObjects batch; int bufferedBytes = 0; auto nextInput = pSource->getNext(); for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) { - BSONObj toInsert = nextInput.releaseDocument().toBson(); - - bufferedBytes += toInsert.objsize(); - if (!bufferedObjects.empty() && (bufferedBytes > BSONObjMaxUserSize || - bufferedObjects.size() >= write_ops::kMaxWriteBatchSize)) { - spill(bufferedObjects); - bufferedObjects.clear(); - bufferedBytes = toInsert.objsize(); + auto doc = nextInput.releaseDocument(); + + // Extract the unique key before converting the document to BSON. + auto uniqueKey = document_path_support::extractPathsFromDoc(doc, _uniqueKeyFields); + auto insertObj = doc.toBson(); + uassert(50905, + str::stream() << "Failed to extract unique key from input document: " << insertObj, + uniqueKey.size() == _uniqueKeyFields.size()); + + bufferedBytes += insertObj.objsize(); + if (!batch.empty() && + (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) { + spill(batch); + batch.clear(); + bufferedBytes = insertObj.objsize(); } - bufferedObjects.push_back(toInsert); + batch.emplace(std::move(insertObj), uniqueKey.toBson()); } - if (!bufferedObjects.empty()) - spill(bufferedObjects); + if (!batch.empty()) + spill(batch); switch (nextInput.getStatus()) { case GetNextResult::ReturnStatus::kAdvanced: { @@ -144,12 +145,12 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() { DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx, WriteModeEnum mode, - boost::optional<Document> uniqueKey) + std::set<FieldPath> uniqueKey) : DocumentSource(expCtx), _done(false), _outputNs(outputNs), _mode(mode), - _uniqueKey(uniqueKey) {} + _uniqueKeyFields(std::move(uniqueKey)) {} intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { @@ -164,22 +165,24 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( readConcernLevel != repl::ReadConcernLevel::kMajorityReadConcern); auto mode = WriteModeEnum::kModeReplaceCollection; - boost::optional<Document> uniqueKey; + std::set<FieldPath> uniqueKey; NamespaceString outputNs; if (elem.type() == BSONType::String) { outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str()); + uniqueKey.emplace("_id"); } else if (elem.type() == BSONType::Object) { auto spec = DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject()); mode = spec.getMode(); - uassert(ErrorCodes::InvalidOptions, - str::stream() << "$out is not currently supported with mode " - << WriteMode_serializer(mode), - mode != WriteModeEnum::kModeReplaceDocuments); - if (auto uniqueKeyDoc = spec.getUniqueKey()) { - uniqueKey = Document{{uniqueKeyDoc.get()}}; + // Convert unique key object to a vector of FieldPaths. + if (auto uniqueKeyObj = spec.getUniqueKey()) { + uniqueKey = uniqueKeyObj->getFieldNames<std::set<FieldPath>>(); + } else { + // TODO SERVER-35954: If not present, build the unique key from the shard key of the + // output collection. 
+ uniqueKey.emplace("_id"); } // Retrieve the target database from the user command, otherwise use the namespace from the @@ -203,6 +206,8 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( return new DocumentSourceOutReplaceColl(outputNs, expCtx, mode, uniqueKey); case WriteModeEnum::kModeInsertDocuments: return new DocumentSourceOutInPlace(outputNs, expCtx, mode, uniqueKey); + case WriteModeEnum::kModeReplaceDocuments: + return new DocumentSourceOutInPlaceReplace(outputNs, expCtx, mode, uniqueKey); default: MONGO_UNREACHABLE; } @@ -213,9 +218,11 @@ Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> ex Document{{DocumentSourceOutSpec::kTargetCollectionFieldName, _outputNs.coll()}, {DocumentSourceOutSpec::kTargetDbFieldName, _outputNs.db()}, {DocumentSourceOutSpec::kModeFieldName, WriteMode_serializer(_mode)}}); - if (_uniqueKey) { - serialized[DocumentSourceOutSpec::kUniqueKeyFieldName] = Value(_uniqueKey.get()); + BSONObjBuilder uniqueKeyBob; + for (auto path : _uniqueKeyFields) { + uniqueKeyBob.append(path.fullPath(), 1); } + serialized[DocumentSourceOutSpec::kUniqueKeyFieldName] = Value(uniqueKeyBob.done()); return Value(Document{{getSourceName(), serialized.freeze()}}); } diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h index 18a44cd8ac3..5168346957b 100644 --- a/src/mongo/db/pipeline/document_source_out.h +++ b/src/mongo/db/pipeline/document_source_out.h @@ -44,7 +44,7 @@ public: DocumentSourceOut(const NamespaceString& outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx, WriteModeEnum mode, - boost::optional<Document> uniqueKey); + std::set<FieldPath> uniqueKey); virtual ~DocumentSourceOut() = default; @@ -86,6 +86,44 @@ public: virtual void initializeWriteNs() = 0; /** + * Storage for a batch of BSON Objects to be inserted/updated to the write namespace. The + * extracted unique key values are also stored in a batch, used by $out with mode + * "replaceDocuments" as the query portion of the update. + * + */ + struct BatchedObjects { + void emplace(BSONObj obj, BSONObj key) { + objects.emplace_back(std::move(obj)); + uniqueKeys.emplace_back(std::move(key)); + } + + bool empty() const { + return objects.empty(); + } + + size_t size() const { + return objects.size(); + } + + void clear() { + objects.clear(); + uniqueKeys.clear(); + } + + std::vector<BSONObj> objects; + // Store the unique keys as BSON objects instead of Documents for compatibility with the + // batch update command. (e.g. {q: <array of uniqueKeys>, u: <array of objects>}) + std::vector<BSONObj> uniqueKeys; + }; + + /** + * Writes the documents in 'batch' to the write namespace. + */ + virtual void spill(const BatchedObjects& batch) { + pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), batch.objects); + }; + + /** * Finalize the output collection, called when there are no more documents to write. */ virtual void finalize() = 0; @@ -94,17 +132,16 @@ public: BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); private: - /** - * Inserts all of 'toInsert' into the collection returned from getWriteNs(). - */ - void spill(const std::vector<BSONObj>& toInsert); - bool _initialized = false; bool _done = false; const NamespaceString _outputNs; WriteModeEnum _mode; - boost::optional<Document> _uniqueKey; + + // Holds the unique key used for uniquely identifying documents. There must exist a unique index + // with this key pattern (up to order). 
Default is "_id" for unsharded collections, and "_id" + // plus the shard key for sharded collections. + std::set<FieldPath> _uniqueKeyFields; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h index 6bdb13ee395..fa28ae78cae 100644 --- a/src/mongo/db/pipeline/document_source_out_in_place.h +++ b/src/mongo/db/pipeline/document_source_out_in_place.h @@ -33,9 +33,10 @@ namespace mongo { /** - * Version of $out which writes directly to the output collection. + * Version of $out which performs inserts directly to the output collection, failing if there's a + * duplicate key. */ -class DocumentSourceOutInPlace final : public DocumentSourceOut { +class DocumentSourceOutInPlace : public DocumentSourceOut { public: using DocumentSourceOut::DocumentSourceOut; @@ -51,4 +52,23 @@ public: void finalize() final{}; }; + +/** + * Version of $out which replaces the documents in the output collection that match the unique key, + * or inserts the document if there is no match. + */ +class DocumentSourceOutInPlaceReplace final : public DocumentSourceOutInPlace { +public: + using DocumentSourceOutInPlace::DocumentSourceOutInPlace; + + void spill(const BatchedObjects& batch) final { + // Set upsert to true and multi to false as there should be at most one document to update + // or insert. + constexpr auto upsert = true; + constexpr auto multi = false; + pExpCtx->mongoProcessInterface->update( + pExpCtx, getWriteNs(), batch.uniqueKeys, batch.objects, upsert, multi); + } +}; + } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp index 412f49e7ab8..1c01b88ab35 100644 --- a/src/mongo/db/pipeline/document_source_out_test.cpp +++ b/src/mongo/db/pipeline/document_source_out_test.cpp @@ -40,13 +40,18 @@ namespace { using boost::intrusive_ptr; +StringData kModeFieldName = DocumentSourceOutSpec::kModeFieldName; +StringData kUniqueKeyFieldName = DocumentSourceOutSpec::kUniqueKeyFieldName; StringData kDefaultMode = WriteMode_serializer(WriteModeEnum::kModeReplaceCollection); class DocumentSourceOutTest : public AggregationContextFixture { public: - intrusive_ptr<DocumentSource> createOutStage(BSONObj spec) { + intrusive_ptr<DocumentSourceOut> createOutStage(BSONObj spec) { auto specElem = spec.firstElement(); - return DocumentSourceOut::createFromBson(specElem, getExpCtx()); + intrusive_ptr<DocumentSourceOut> outStage = dynamic_cast<DocumentSourceOut*>( + DocumentSourceOut::createFromBson(specElem, getExpCtx()).get()); + ASSERT_TRUE(outStage); + return outStage; } }; @@ -61,41 +66,124 @@ TEST_F(DocumentSourceOutTest, FailsToParseIncorrectType) { TEST_F(DocumentSourceOutTest, AcceptsStringArgument) { BSONObj spec = BSON("$out" << "some_collection"); - auto docSource = createOutStage(spec); - auto outStage = dynamic_cast<DocumentSourceOut*>(docSource.get()); + auto outStage = createOutStage(spec); ASSERT_EQ(outStage->getOutputNs().coll(), "some_collection"); } TEST_F(DocumentSourceOutTest, SerializeDefaultsModeRecreateCollection) { BSONObj spec = BSON("$out" << "some_collection"); - auto docSource = createOutStage(spec); - auto outStage = dynamic_cast<DocumentSourceOut*>(docSource.get()); + auto outStage = createOutStage(spec); auto serialized = outStage->serialize().getDocument(); - ASSERT_EQ(serialized["$out"][DocumentSourceOutSpec::kModeFieldName].getStringData(), - kDefaultMode); + ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), 
kDefaultMode); // Make sure we can reparse the serialized BSON. - auto reparsedDocSource = createOutStage(serialized.toBson()); - auto reparsedOut = dynamic_cast<DocumentSourceOut*>(reparsedDocSource.get()); - auto reSerialized = reparsedOut->serialize().getDocument(); - ASSERT_EQ(reSerialized["$out"][DocumentSourceOutSpec::kModeFieldName].getStringData(), - kDefaultMode); + auto reparsedOutStage = createOutStage(serialized.toBson()); + auto reSerialized = reparsedOutStage->serialize().getDocument(); + ASSERT_EQ(reSerialized["$out"][kModeFieldName].getStringData(), kDefaultMode); } -TEST_F(DocumentSourceOutTest, SerializeUniqueKeyOnlyIfSpecified) { +TEST_F(DocumentSourceOutTest, SerializeUniqueKeyDefaultsToId) { + BSONObj spec = BSON("$out" << BSON("to" + << "target" + << "mode" + << kDefaultMode)); + auto outStage = createOutStage(spec); + auto serialized = outStage->serialize().getDocument(); + ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); + ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), + (Document{{"_id", 1}})); + + spec = BSON("$out" + << "some_collection"); + outStage = createOutStage(spec); + serialized = outStage->serialize().getDocument(); + ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); + ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), + (Document{{"_id", 1}})); +} + +TEST_F(DocumentSourceOutTest, SerializeCompoundUniqueKey) { BSONObj spec = BSON("$out" << BSON("to" << "target" << "mode" << kDefaultMode << "uniqueKey" << BSON("_id" << 1 << "shardKey" << 1))); - auto docSource = createOutStage(spec); - auto outStage = dynamic_cast<DocumentSourceOut*>(docSource.get()); + auto outStage = createOutStage(spec); + auto serialized = outStage->serialize().getDocument(); + ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); + ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), + (Document{{"_id", 1}, {"shardKey", 1}})); +} + +TEST_F(DocumentSourceOutTest, SerializeDottedPathUniqueKey) { + BSONObj spec = BSON("$out" << BSON("to" + << "target" + << "mode" + << kDefaultMode + << "uniqueKey" + << BSON("_id" << 1 << "a.b" << 1))); + auto outStage = createOutStage(spec); + auto serialized = outStage->serialize().getDocument(); + ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); + ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), + (Document{{"_id", 1}, {"a.b", 1}})); + + spec = BSON("$out" << BSON("to" + << "target" + << "mode" + << kDefaultMode + << "uniqueKey" + << BSON("_id.a" << 1))); + outStage = createOutStage(spec); + serialized = outStage->serialize().getDocument(); + ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); + ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), + (Document{{"_id.a", 1}})); +} + +TEST_F(DocumentSourceOutTest, SerializeDottedPathUniqueKeySharedPrefix) { + BSONObj spec = BSON("$out" << BSON("to" + << "target" + << "mode" + << kDefaultMode + << "uniqueKey" + << BSON("_id" << 1 << "a.b" << 1 << "a.c" << 1))); + auto outStage = createOutStage(spec); auto serialized = outStage->serialize().getDocument(); - ASSERT_EQ(serialized["$out"][DocumentSourceOutSpec::kModeFieldName].getStringData(), - kDefaultMode); - ASSERT_DOCUMENT_EQ(serialized["$out"][DocumentSourceOutSpec::kUniqueKeyFieldName].getDocument(), + ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); + 
ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), + (Document{{"_id", 1}, {"a.b", 1}, {"a.c", 1}})); +} + +TEST_F(DocumentSourceOutTest, SerializeDuplicateUniqueKey) { + BSONObj spec = BSON("$out" << BSON("to" + << "target" + << "mode" + << kDefaultMode + << "uniqueKey" + << BSON("_id" << 1 << "dupKey" << 1 << "dupKey" << 1))); + auto outStage = createOutStage(spec); + auto serialized = outStage->serialize().getDocument(); + ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); + ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), + (Document{{"_id", 1}, {"dupKey", 1}})); +} + +// TODO SERVER-36367: Nested objects should not be allowed in the uniqueKey spec. +TEST_F(DocumentSourceOutTest, SerializeNestedObjectInUniqueKey) { + BSONObj spec = BSON("$out" << BSON("to" + << "target" + << "mode" + << kDefaultMode + << "uniqueKey" + << BSON("_id" << 1 << "shardKey" + << BSON("subShardKey" << 1)))); + auto outStage = createOutStage(spec); + auto serialized = outStage->serialize().getDocument(); + ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode); + ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(), (Document{{"_id", 1}, {"shardKey", 1}})); } @@ -241,20 +329,10 @@ TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbIfSpecified) { BSONObj spec = BSON("$out" << BSON("to" << targetColl << "mode" << kDefaultMode << "db" << targetDb)); - auto docSource = createOutStage(spec); - auto outStage = dynamic_cast<DocumentSourceOut*>(docSource.get()); + auto outStage = createOutStage(spec); ASSERT_EQ(outStage->getOutputNs().db(), targetDb); ASSERT_EQ(outStage->getOutputNs().coll(), targetColl); } -TEST_F(DocumentSourceOutTest, ModeReplaceDocumentsNotSupported) { - BSONObj spec = BSON("$out" << BSON("to" - << "test" - << "mode" - << WriteMode_serializer( - WriteModeEnum::kModeReplaceDocuments))); - ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::InvalidOptions); -} - } // namespace } // namespace mongo diff --git a/src/mongo/db/pipeline/field_path.h b/src/mongo/db/pipeline/field_path.h index 171bf89a5a3..64fa7fe0611 100644 --- a/src/mongo/db/pipeline/field_path.h +++ b/src/mongo/db/pipeline/field_path.h @@ -119,4 +119,8 @@ private: // lookup. std::vector<size_t> _fieldPathDotPosition; }; + +inline bool operator<(const FieldPath& lhs, const FieldPath& rhs) { + return lhs.fullPath() < rhs.fullPath(); +} } diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index 3d0dafc306b..b650855d438 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -95,11 +95,22 @@ public: virtual bool isSharded(OperationContext* opCtx, const NamespaceString& ns) = 0; /** - * Inserts 'objs' into 'ns' and returns the "detailed" last error object. + * Inserts 'objs' into 'ns' and throws a UserException if the insert fails. */ - virtual BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& objs) = 0; + virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) = 0; + + /** + * Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException + * if any of the updates fail. 
+ */ + virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& queries, + const std::vector<BSONObj>& updates, + bool upsert, + bool multi) = 0; virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) = 0; diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/mongod_process_interface.cpp new file mode 100644 index 00000000000..e80f65da3df --- /dev/null +++ b/src/mongo/db/pipeline/mongod_process_interface.cpp @@ -0,0 +1,424 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/mongod_process_interface.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/catalog/uuid_catalog.h" +#include "mongo/db/curop.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/ops/write_ops_gen.h" +#include "mongo/db/pipeline/document_source_cursor.h" +#include "mongo/db/pipeline/pipeline_d.h" +#include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/stats/fill_locker_info.h" +#include "mongo/db/stats/storage_stats.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/grid.h" +#include "mongo/util/log.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using write_ops::Update; + +MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {} + +void MongoDInterface::setOperationContext(OperationContext* opCtx) { + _client.setOpCtx(opCtx); +} + +DBClientBase* MongoDInterface::directClient() { + return &_client; +} + +bool MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { + AutoGetCollectionForReadCommand autoColl(opCtx, nss); + auto const css = CollectionShardingState::get(opCtx, nss); + return css->getMetadata(opCtx)->isSharded(); +} + +void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) { + boost::optional<DisableDocumentValidation> maybeDisableValidation; + if (expCtx->bypassDocumentValidation) + maybeDisableValidation.emplace(expCtx->opCtx); + + _client.insert(ns.ns(), objs); + uassert(16996, + str::stream() << "Insert failed: " << _client.getLastError(), + _client.getLastError().empty()); +} + +void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& queries, + const std::vector<BSONObj>& updates, + bool upsert, + bool multi) { + BSONObjBuilder updateBob; + updateBob.append(Update::kCommandName, ns.coll()); + updateBob.append(Update::kDbNameFieldName, ns.db()); + updateBob.append(Update::kBypassDocumentValidationFieldName, expCtx->bypassDocumentValidation); + + // Build the array of UpdateOp entries. + invariant(queries.size() == updates.size()); + BSONArrayBuilder updatesArray; + for (size_t index = 0; index < queries.size(); ++index) { + updatesArray.append(BSON("q" << queries[index] << "u" << updates[index] << "multi" << multi + << "upsert" + << upsert)); + } + updateBob.append(Update::kUpdatesFieldName, updatesArray.arr()); + + auto updateObj = updateBob.done(); + auto writeResults = + performUpdates(expCtx->opCtx, Update::parse(IDLParserErrorContext("update"), updateObj)); + + // Verify that each of the update results is successful. 
+ for (const auto& result : writeResults.results) { + uassert(50904, + str::stream() << "Update failed: " << result.getStatus(), + result.getStatus() == Status::OK()); + } +} + +CollectionIndexUsageMap MongoDInterface::getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) { + AutoGetCollectionForReadCommand autoColl(opCtx, ns); + + Collection* collection = autoColl.getCollection(); + if (!collection) { + LOG(2) << "Collection not found on index stats retrieval: " << ns.ns(); + return CollectionIndexUsageMap(); + } + + return collection->infoCache()->getIndexUsageStats(); +} + +void MongoDInterface::appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const { + Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder); +} + +Status MongoDInterface::appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const { + return appendCollectionStorageStats(opCtx, nss, param, builder); +} + +Status MongoDInterface::appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const { + return appendCollectionRecordCount(opCtx, nss, builder); +} + +BSONObj MongoDInterface::getCollectionOptions(const NamespaceString& nss) { + const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); + return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); +} + +void MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) { + Lock::GlobalWrite globalLock(opCtx); + + uassert(ErrorCodes::CommandFailed, + str::stream() << "collection options of target collection " << targetNs.ns() + << " changed during processing. Original options: " + << originalCollectionOptions + << ", new options: " + << getCollectionOptions(targetNs), + SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions == + getCollectionOptions(targetNs))); + + auto currentIndexes = _client.getIndexSpecs(targetNs.ns()); + uassert(ErrorCodes::CommandFailed, + str::stream() << "indexes of target collection " << targetNs.ns() + << " changed during processing.", + originalIndexes.size() == currentIndexes.size() && + std::equal(originalIndexes.begin(), + originalIndexes.end(), + currentIndexes.begin(), + SimpleBSONObjComparator::kInstance.makeEqualTo())); + + BSONObj info; + uassert(ErrorCodes::CommandFailed, + str::stream() << "renameCollection failed: " << info, + _client.runCommand("admin", renameCommandObj, info)); +} + +StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoDInterface::makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) { + auto pipeline = Pipeline::parse(rawPipeline, expCtx); + if (!pipeline.isOK()) { + return pipeline.getStatus(); + } + + if (opts.optimize) { + pipeline.getValue()->optimizePipeline(); + } + + Status cursorStatus = Status::OK(); + + if (opts.attachCursorSource) { + cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); + } + + return cursorStatus.isOK() ? 
std::move(pipeline) : cursorStatus; +} + +Status MongoDInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { + invariant(pipeline->getSources().empty() || + !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); + + boost::optional<AutoGetCollectionForReadCommand> autoColl; + if (expCtx->uuid) { + try { + autoColl.emplace(expCtx->opCtx, + NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}); + } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { + // The UUID doesn't exist anymore + return ex.toStatus(); + } + } else { + autoColl.emplace(expCtx->opCtx, expCtx->ns); + } + + // makePipeline() is only called to perform secondary aggregation requests and expects the + // collection representing the document source to be not-sharded. We confirm sharding state + // here to avoid taking a collection lock elsewhere for this purpose alone. + // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor + // until after we release the lock, leaving room for a collection to be sharded in-between. + auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns); + uassert(4567, + str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded", + !css->getMetadata(expCtx->opCtx)->isSharded()); + + PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); + + return Status::OK(); +} + +std::string MongoDInterface::getShardName(OperationContext* opCtx) const { + if (ShardingState::get(opCtx)->enabled()) { + return ShardingState::get(opCtx)->shardId().toString(); + } + + return std::string(); +} + +std::pair<std::vector<FieldPath>, bool> MongoDInterface::collectDocumentKeyFields( + OperationContext* opCtx, UUID uuid) const { + if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { + return {{"_id"}, false}; // Nothing is sharded. + } + + // An empty namespace indicates that the collection has been dropped. Treat it as unsharded and + // mark the fields as final. + auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid); + if (nss.isEmpty()) { + return {{"_id"}, true}; + } + + // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache + // to determine whether the collection is sharded in the first place. + auto catalogCache = Grid::get(opCtx)->catalogCache(); + + const bool collectionIsSharded = catalogCache && [&]() { + auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); + return routingInfo.isOK() && routingInfo.getValue().cm(); + }(); + + // Collection exists and is not sharded, mark as not final. + if (!collectionIsSharded) { + return {{"_id"}, false}; + } + + auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata { + AutoGetCollection autoColl(opCtx, nss, MODE_IS); + return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx); + }(); + + // Collection is not sharded or UUID mismatch implies collection has been dropped and recreated + // as sharded. + if (!scm->isSharded() || !scm->uuidMatches(uuid)) { + return {{"_id"}, false}; + } + + // Unpack the shard key. + std::vector<FieldPath> result; + bool gotId = false; + for (auto& field : scm->getKeyPatternFields()) { + result.emplace_back(field->dottedField()); + gotId |= (result.back().fullPath() == "_id"); + } + if (!gotId) { // If not part of the shard key, "_id" comes last. 
+ result.emplace_back("_id"); + } + // Collection is now sharded so the document key fields will never change, mark as final. + return {result, true}; +} + +std::vector<GenericCursor> MongoDInterface::getCursors( + const intrusive_ptr<ExpressionContext>& expCtx) const { + return CursorManager::getAllCursors(expCtx->opCtx); +} + +boost::optional<Document> MongoDInterface::lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) { + invariant(!readConcern); // We don't currently support a read concern on mongod - it's only + // expected to be necessary on mongos. + + std::unique_ptr<Pipeline, PipelineDeleter> pipeline; + try { + // Be sure to do the lookup using the collection default collation + auto foreignExpCtx = expCtx->copyWith( + nss, + collectionUUID, + _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); + pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx)); + } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { + return boost::none; + } + + auto lookedUpDocument = pipeline->getNext(); + if (auto next = pipeline->getNext()) { + uasserted(ErrorCodes::TooManyMatchingDocuments, + str::stream() << "found more than one document with document key " + << documentKey.toString() + << " [" + << lookedUpDocument->toString() + << ", " + << next->toString() + << "]"); + } + return lookedUpDocument; +} + +BSONObj MongoDInterface::_reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps) const { + BSONObjBuilder builder; + + CurOp::reportCurrentOpForClient( + opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder); + + OperationContext* clientOpCtx = client->getOperationContext(); + + if (clientOpCtx) { + if (auto opCtxSession = OperationContextSession::get(clientOpCtx)) { + opCtxSession->reportUnstashedState(repl::ReadConcernArgs::get(clientOpCtx), &builder); + } + + // Append lock stats before returning. + if (auto lockerInfo = clientOpCtx->lockState()->getLockerInfo()) { + fillLockerInfo(*lockerInfo, builder); + } + } + + return builder.obj(); +} + +void MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector<BSONObj>* ops) const { + auto sessionCatalog = SessionCatalog::get(opCtx); + + const bool authEnabled = + AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled(); + + // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to + // create a pattern that will match against all authenticated usernames for the current client. + // If the user is listing ops for all users, we create an empty pattern; constructing an + // instance of SessionKiller::Matcher with this empty pattern will return all sessions. + auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers + ? 
makeSessionFilterForAuthenticatedUsers(opCtx) + : KillAllSessionsByPatternSet{{}}); + + sessionCatalog->scanSessions(opCtx, + {std::move(sessionFilter)}, + [&](OperationContext* opCtx, Session* session) { + auto op = session->reportStashedState(); + if (!op.isEmpty()) { + ops->emplace_back(op); + } + }); +} + +std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollator( + OperationContext* opCtx, StringData dbName, UUID collectionUUID) { + auto it = _collatorCache.find(collectionUUID); + if (it == _collatorCache.end()) { + auto collator = [&]() -> std::unique_ptr<CollatorInterface> { + AutoGetCollection autoColl(opCtx, {dbName.toString(), collectionUUID}, MODE_IS); + if (!autoColl.getCollection()) { + // This collection doesn't exist, so assume a nullptr default collation + return nullptr; + } else { + auto defaultCollator = autoColl.getCollection()->getDefaultCollator(); + // Clone the collator so that we can safely use the pointer if the collection + // disappears right after we release the lock. + return defaultCollator ? defaultCollator->clone() : nullptr; + } + }(); + + it = _collatorCache.emplace(collectionUUID, std::move(collator)).first; + } + + auto& collator = it->second; + return collator ? collator->clone() : nullptr; +} + +} // namespace mongo diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/mongod_process_interface.h new file mode 100644 index 00000000000..2b9ab8d8f19 --- /dev/null +++ b/src/mongo/db/pipeline/mongod_process_interface.h @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/pipeline/mongo_process_common.h" +#include "mongo/db/pipeline/pipeline.h" + +namespace mongo { + +/** + * Class to provide access to mongod-specific implementations of methods required by some + * document sources. 
+ */ +class MongoDInterface final : public MongoProcessCommon { +public: + MongoDInterface(OperationContext* opCtx); + + void setOperationContext(OperationContext* opCtx) final; + DBClientBase* directClient() final; + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; + void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) final; + void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& queries, + const std::vector<BSONObj>& updates, + bool upsert, + bool multi) final; + + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final; + void appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const final; + Status appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const final; + Status appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const final; + BSONObj getCollectionOptions(const NamespaceString& nss) final; + void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) final; + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts = MakePipelineOptions{}) final; + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final; + std::string getShardName(OperationContext* opCtx) const final; + std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext* opCtx, + UUID uuid) const final; + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) final; + std::vector<GenericCursor> getCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; + +protected: + BSONObj _reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps) const final; + + void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector<BSONObj>* ops) const final; + +private: + /** + * Looks up the collection default collator for the collection given by 'collectionUUID'. A + * collection's default collation is not allowed to change, so we cache the result to allow + * for quick lookups in the future. Looks up the collection by UUID, and returns 'nullptr' + * if the collection does not exist or if the collection's default collation is the simple + * collation. 
+ */ + std::unique_ptr<CollatorInterface> _getCollectionDefaultCollator(OperationContext* opCtx, + StringData dbName, + UUID collectionUUID); + + DBDirectClient _client; + std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache; +}; + +} // namespace mongo diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 4d91ddfff08..7e3e451a63c 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -28,7 +28,7 @@ #include "mongo/platform/basic.h" -#include "mongo/s/commands/pipeline_s.h" +#include "mongo/db/pipeline/mongos_process_interface.h" #include "mongo/db/curop.h" #include "mongo/db/pipeline/document_source.h" @@ -90,7 +90,7 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo( } // namespace -boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( +boost::optional<Document> MongoSInterface::lookupSingleDocument( const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, UUID collectionUUID, @@ -185,8 +185,9 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument( return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{}); } -BSONObj PipelineS::MongoSInterface::_reportCurrentOpForClient( - OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const { +BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps) const { BSONObjBuilder builder; CurOp::reportCurrentOpForClient( @@ -195,7 +196,7 @@ BSONObj PipelineS::MongoSInterface::_reportCurrentOpForClient( return builder.obj(); } -std::vector<GenericCursor> PipelineS::MongoSInterface::getCursors( +std::vector<GenericCursor> MongoSInterface::getCursors( const intrusive_ptr<ExpressionContext>& expCtx) const { invariant(hasGlobalServiceContext()); auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager(); diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h new file mode 100644 index 00000000000..f01a3390deb --- /dev/null +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. 
If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/mongo_process_common.h" +#include "mongo/db/pipeline/pipeline.h" + +namespace mongo { + +/** + * Class to provide access to mongos-specific implementations of methods required by some + * document sources. + */ +class MongoSInterface final : public MongoProcessCommon { +public: + MongoSInterface() = default; + + virtual ~MongoSInterface() = default; + + void setOperationContext(OperationContext* opCtx) final {} + + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) final; + + std::vector<GenericCursor> getCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; + + DBClientBase* directClient() final { + MONGO_UNREACHABLE; + } + + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { + MONGO_UNREACHABLE; + } + + void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) final { + MONGO_UNREACHABLE; + } + + void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& queries, + const std::vector<BSONObj>& updates, + bool upsert, + bool multi) final { + MONGO_UNREACHABLE; + } + + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) final { + MONGO_UNREACHABLE; + } + + void appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + Status appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + Status appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + BSONObj getCollectionOptions(const NamespaceString& nss) final { + MONGO_UNREACHABLE; + } + + void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) final { + MONGO_UNREACHABLE; + } + + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { + MONGO_UNREACHABLE; + } + + std::string getShardName(OperationContext* opCtx) const final { + MONGO_UNREACHABLE; + } + + std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*, + UUID) const final { + MONGO_UNREACHABLE; + } + + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions pipelineOptions) final { + MONGO_UNREACHABLE; + } + +protected: + BSONObj _reportCurrentOpForClient(OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps) const final; + + void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector<BSONObj>* ops) const final { + // This implementation is a no-op, since 
mongoS does not maintain a SessionCatalog or + // hold stashed locks for idle sessions. + } +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index f49b3dfdd8b..f3317c81b3b 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -33,24 +33,18 @@ #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/bson/simple_bsonobj_comparator.h" -#include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" -#include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/index_catalog.h" -#include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/multi_iterator.h" #include "mongo/db/exec/shard_filter.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/index/index_access_method.h" -#include "mongo/db/kill_sessions.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/document_source.h" @@ -73,9 +67,6 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" -#include "mongo/db/session_catalog.h" -#include "mongo/db/stats/fill_locker_info.h" -#include "mongo/db/stats/storage_stats.h" #include "mongo/db/stats/top.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" @@ -671,330 +662,4 @@ void PipelineD::getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut->usedDisk = usedDisk; } -PipelineD::MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {} - -void PipelineD::MongoDInterface::setOperationContext(OperationContext* opCtx) { - _client.setOpCtx(opCtx); -} - -DBClientBase* PipelineD::MongoDInterface::directClient() { - return &_client; -} - -bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - AutoGetCollectionForReadCommand autoColl(opCtx, nss); - auto const css = CollectionShardingState::get(opCtx, nss); - return css->getMetadata(opCtx)->isSharded(); -} - -BSONObj PipelineD::MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& objs) { - boost::optional<DisableDocumentValidation> maybeDisableValidation; - if (expCtx->bypassDocumentValidation) - maybeDisableValidation.emplace(expCtx->opCtx); - - _client.insert(ns.ns(), objs); - return _client.getLastErrorDetailed(); -} - -CollectionIndexUsageMap PipelineD::MongoDInterface::getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) { - AutoGetCollectionForReadCommand autoColl(opCtx, ns); - - Collection* collection = autoColl.getCollection(); - if (!collection) { - LOG(2) << "Collection not found on index stats retrieval: " << ns.ns(); - return CollectionIndexUsageMap(); - } - - return collection->infoCache()->getIndexUsageStats(); -} - -void PipelineD::MongoDInterface::appendLatencyStats(OperationContext* opCtx, - const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const { - Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder); -} - -Status 
PipelineD::MongoDInterface::appendStorageStats(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const { - return appendCollectionStorageStats(opCtx, nss, param, builder); -} - -Status PipelineD::MongoDInterface::appendRecordCount(OperationContext* opCtx, - const NamespaceString& nss, - BSONObjBuilder* builder) const { - return appendCollectionRecordCount(opCtx, nss, builder); -} - -BSONObj PipelineD::MongoDInterface::getCollectionOptions(const NamespaceString& nss) { - const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); - return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); -} - -void PipelineD::MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged( - OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) { - Lock::GlobalWrite globalLock(opCtx); - - uassert(ErrorCodes::CommandFailed, - str::stream() << "collection options of target collection " << targetNs.ns() - << " changed during processing. Original options: " - << originalCollectionOptions - << ", new options: " - << getCollectionOptions(targetNs), - SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions == - getCollectionOptions(targetNs))); - - auto currentIndexes = _client.getIndexSpecs(targetNs.ns()); - uassert(ErrorCodes::CommandFailed, - str::stream() << "indexes of target collection " << targetNs.ns() - << " changed during processing.", - originalIndexes.size() == currentIndexes.size() && - std::equal(originalIndexes.begin(), - originalIndexes.end(), - currentIndexes.begin(), - SimpleBSONObjComparator::kInstance.makeEqualTo())); - - BSONObj info; - uassert(ErrorCodes::CommandFailed, - str::stream() << "renameCollection failed: " << info, - _client.runCommand("admin", renameCommandObj, info)); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineD::MongoDInterface::makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } - - if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); - } - - Status cursorStatus = Status::OK(); - - if (opts.attachCursorSource) { - cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); - } - - return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; -} - -Status PipelineD::MongoDInterface::attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); - - boost::optional<AutoGetCollectionForReadCommand> autoColl; - if (expCtx->uuid) { - try { - autoColl.emplace(expCtx->opCtx, - NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}); - } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { - // The UUID doesn't exist anymore - return ex.toStatus(); - } - } else { - autoColl.emplace(expCtx->opCtx, expCtx->ns); - } - - // makePipeline() is only called to perform secondary aggregation requests and expects the - // collection representing the document source to be not-sharded. We confirm sharding state - // here to avoid taking a collection lock elsewhere for this purpose alone. 
- // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor - // until after we release the lock, leaving room for a collection to be sharded in-between. - auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns); - uassert(4567, - str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded", - !css->getMetadata(expCtx->opCtx)->isSharded()); - - PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); - - return Status::OK(); -} - -std::string PipelineD::MongoDInterface::getShardName(OperationContext* opCtx) const { - if (ShardingState::get(opCtx)->enabled()) { - return ShardingState::get(opCtx)->shardId().toString(); - } - - return std::string(); -} - -std::pair<std::vector<FieldPath>, bool> PipelineD::MongoDInterface::collectDocumentKeyFields( - OperationContext* opCtx, UUID uuid) const { - if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { - return {{"_id"}, false}; // Nothing is sharded. - } - - // An empty namespace indicates that the collection has been dropped. Treat it as unsharded and - // mark the fields as final. - auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid); - if (nss.isEmpty()) { - return {{"_id"}, true}; - } - - // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache - // to determine whether the collection is sharded in the first place. - auto catalogCache = Grid::get(opCtx)->catalogCache(); - - const bool collectionIsSharded = catalogCache && [&]() { - auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); - return routingInfo.isOK() && routingInfo.getValue().cm(); - }(); - - // Collection exists and is not sharded, mark as not final. - if (!collectionIsSharded) { - return {{"_id"}, false}; - } - - auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata { - AutoGetCollection autoColl(opCtx, nss, MODE_IS); - return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx); - }(); - - // Collection is not sharded or UUID mismatch implies collection has been dropped and recreated - // as sharded. - if (!scm->isSharded() || !scm->uuidMatches(uuid)) { - return {{"_id"}, false}; - } - - // Unpack the shard key. - std::vector<FieldPath> result; - bool gotId = false; - for (auto& field : scm->getKeyPatternFields()) { - result.emplace_back(field->dottedField()); - gotId |= (result.back().fullPath() == "_id"); - } - if (!gotId) { // If not part of the shard key, "_id" comes last. - result.emplace_back("_id"); - } - // Collection is now sharded so the document key fields will never change, mark as final. - return {result, true}; -} - -std::vector<GenericCursor> PipelineD::MongoDInterface::getCursors( - const intrusive_ptr<ExpressionContext>& expCtx) const { - return CursorManager::getAllCursors(expCtx->opCtx); -} - -boost::optional<Document> PipelineD::MongoDInterface::lookupSingleDocument( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern) { - invariant(!readConcern); // We don't currently support a read concern on mongod - it's only - // expected to be necessary on mongos. 
- - std::unique_ptr<Pipeline, PipelineDeleter> pipeline; - try { - // Be sure to do the lookup using the collection default collation - auto foreignExpCtx = expCtx->copyWith( - nss, - collectionUUID, - _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); - pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx)); - } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { - return boost::none; - } - - auto lookedUpDocument = pipeline->getNext(); - if (auto next = pipeline->getNext()) { - uasserted(ErrorCodes::TooManyMatchingDocuments, - str::stream() << "found more than one document with document key " - << documentKey.toString() - << " [" - << lookedUpDocument->toString() - << ", " - << next->toString() - << "]"); - } - return lookedUpDocument; -} - -BSONObj PipelineD::MongoDInterface::_reportCurrentOpForClient( - OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const { - BSONObjBuilder builder; - - CurOp::reportCurrentOpForClient( - opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder); - - OperationContext* clientOpCtx = client->getOperationContext(); - - if (clientOpCtx) { - if (auto opCtxSession = OperationContextSession::get(clientOpCtx)) { - opCtxSession->reportUnstashedState(repl::ReadConcernArgs::get(clientOpCtx), &builder); - } - - // Append lock stats before returning. - if (auto lockerInfo = clientOpCtx->lockState()->getLockerInfo()) { - fillLockerInfo(*lockerInfo, builder); - } - } - - return builder.obj(); -} - -void PipelineD::MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector<BSONObj>* ops) const { - auto sessionCatalog = SessionCatalog::get(opCtx); - - const bool authEnabled = - AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled(); - - // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to - // create a pattern that will match against all authenticated usernames for the current client. - // If the user is listing ops for all users, we create an empty pattern; constructing an - // instance of SessionKiller::Matcher with this empty pattern will return all sessions. - auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers - ? makeSessionFilterForAuthenticatedUsers(opCtx) - : KillAllSessionsByPatternSet{{}}); - - sessionCatalog->scanSessions(opCtx, - {std::move(sessionFilter)}, - [&](OperationContext* opCtx, Session* session) { - auto op = session->reportStashedState(); - if (!op.isEmpty()) { - ops->emplace_back(op); - } - }); -} - -std::unique_ptr<CollatorInterface> PipelineD::MongoDInterface::_getCollectionDefaultCollator( - OperationContext* opCtx, StringData dbName, UUID collectionUUID) { - auto it = _collatorCache.find(collectionUUID); - if (it == _collatorCache.end()) { - auto collator = [&]() -> std::unique_ptr<CollatorInterface> { - AutoGetCollection autoColl(opCtx, {dbName.toString(), collectionUUID}, MODE_IS); - if (!autoColl.getCollection()) { - // This collection doesn't exist, so assume a nullptr default collation - return nullptr; - } else { - auto defaultCollator = autoColl.getCollection()->getDefaultCollator(); - // Clone the collator so that we can safely use the pointer if the collection - // disappears right after we release the lock. - return defaultCollator ? 
defaultCollator->clone() : nullptr; - } - }(); - - it = _collatorCache.emplace(collectionUUID, std::move(collator)).first; - } - - auto& collator = it->second; - return collator ? collator->clone() : nullptr; -} - } // namespace mongo diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 8732461fe56..5c3d0464462 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -32,11 +32,9 @@ #include <memory> #include "mongo/bson/bsonobj.h" -#include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/document_source_cursor.h" -#include "mongo/db/pipeline/mongo_process_common.h" #include "mongo/db/query/plan_executor.h" namespace mongo { @@ -62,79 +60,6 @@ struct DepsTracker; */ class PipelineD { public: - class MongoDInterface final : public MongoProcessCommon { - public: - MongoDInterface(OperationContext* opCtx); - - void setOperationContext(OperationContext* opCtx) final; - DBClientBase* directClient() final; - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final; - BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& objs) final; - CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) final; - void appendLatencyStats(OperationContext* opCtx, - const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const final; - Status appendStorageStats(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const final; - Status appendRecordCount(OperationContext* opCtx, - const NamespaceString& nss, - BSONObjBuilder* builder) const final; - BSONObj getCollectionOptions(const NamespaceString& nss) final; - void renameIfOptionsAndIndexesHaveNotChanged( - OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) final; - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts = MakePipelineOptions{}) final; - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final; - std::string getShardName(OperationContext* opCtx) const final; - std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext* opCtx, - UUID uuid) const final; - boost::optional<Document> lookupSingleDocument( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern) final; - std::vector<GenericCursor> getCursors( - const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; - - protected: - BSONObj _reportCurrentOpForClient(OperationContext* opCtx, - Client* client, - CurrentOpTruncateMode truncateOps) const final; - - void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector<BSONObj>* ops) const final; - - private: - /** - * Looks up the collection default collator for the collection given by 'collectionUUID'. A - * collection's default collation is not allowed to change, so we cache the result to allow - * for quick lookups in the future. 
Looks up the collection by UUID, and returns 'nullptr' - * if the collection does not exist or if the collection's default collation is the simple - * collation. - */ - std::unique_ptr<CollatorInterface> _getCollectionDefaultCollator(OperationContext* opCtx, - StringData dbName, - UUID collectionUUID); - - DBDirectClient _client; - std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache; - }; - /** * If the first stage in the pipeline does not generate its own output documents, attaches a * cursor document source to the front of the pipeline which will output documents from the diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp index bdfae082c72..9d28060c00d 100644 --- a/src/mongo/db/pipeline/pipeline_test.cpp +++ b/src/mongo/db/pipeline/pipeline_test.cpp @@ -2120,7 +2120,8 @@ class Out : public needsPrimaryShardMergerBase { } string mergePipeJson() { return "[{$out: {to: 'outColl', db: 'a', mode: '" + - WriteMode_serializer(WriteModeEnum::kModeReplaceCollection) + "'}}]"; + WriteMode_serializer(WriteModeEnum::kModeReplaceCollection) + + "', uniqueKey: {_id: 1}}}]"; } }; diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 6c7ed0b7138..3e80f90e3a5 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -57,9 +57,18 @@ public: MONGO_UNREACHABLE; } - BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& objs) override { + void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) override { + MONGO_UNREACHABLE; + } + + void update(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& queries, + const std::vector<BSONObj>& updates, + bool upsert, + bool multi) final { MONGO_UNREACHABLE; } diff --git a/src/mongo/db/repl/idempotency_update_sequence_test.cpp b/src/mongo/db/repl/idempotency_update_sequence_test.cpp index b9a251dabbb..11797d5a6a6 100644 --- a/src/mongo/db/repl/idempotency_update_sequence_test.cpp +++ b/src/mongo/db/repl/idempotency_update_sequence_test.cpp @@ -126,8 +126,7 @@ TEST(UpdateGenTest, UpdatesHaveValidPaths) { FAIL(sb.str()); } - std::set<std::string> argPaths; - updateArg.getFieldNames(argPaths); + auto argPaths = updateArg.getFieldNames<std::set<std::string>>(); std::set<std::string> correctPaths{"a", "b", "a.0", "a.b", "b.0"}; for (auto path : argPaths) { FieldRef pathRef(path); @@ -165,8 +164,7 @@ TEST(UpdateGenTest, UpdatesAreNotAmbiguous) { sb << "The generated update is not a $set or $unset BSONObj: " << update; FAIL(sb.str()); } - std::set<std::string> argPathsSet; - updateArg.getFieldNames(argPathsSet); + auto argPathsSet = updateArg.getFieldNames<std::set<std::string>>(); std::vector<std::unique_ptr<FieldRef>> argPathsRefVec; FieldRefSet pathRefSet; @@ -211,8 +209,7 @@ TEST(UpdateGenTest, UpdatesPreserveDepthConstraint) { setElem = update["$set"]; BSONObj updateArg = setElem.Obj(); - std::set<std::string> argPaths; - updateArg.getFieldNames(argPaths); + auto argPaths = updateArg.getFieldNames<std::set<std::string>>(); for (auto path : argPaths) { auto pathDepth = getPathDepth_forTest(path); auto particularSetArgument = updateArg[path]; diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index e4aa90e5524..60ba31793e0 100644 --- 
a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -99,7 +99,6 @@ env.Library( 'cluster_write_cmd.cpp', 'commands_public.cpp', 'kill_sessions_remote.cpp', - 'pipeline_s.cpp', 'strategy.cpp', env.Idlc('cluster_multicast.idl')[0], ], @@ -118,6 +117,7 @@ env.Library( '$BUILD_DIR/mongo/db/ftdc/ftdc_server', '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/pipeline/aggregation', + '$BUILD_DIR/mongo/db/pipeline/mongos_process_interface', '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/db/views/views', '$BUILD_DIR/mongo/executor/async_multicaster', diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 1aa3045ea5f..867c6932d21 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -48,6 +48,7 @@ #include "mongo/db/pipeline/document_source_out.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/lite_parsed_pipeline.h" +#include "mongo/db/pipeline/mongos_process_interface.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/collation/collator_factory_interface.h" #include "mongo/db/query/cursor_response.h" @@ -59,7 +60,6 @@ #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" -#include "mongo/s/commands/pipeline_s.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_client_cursor_impl.h" #include "mongo/s/query/cluster_client_cursor_params.h" @@ -781,7 +781,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* auto mergeCtx = new ExpressionContext(opCtx, request, std::move(collation), - std::make_shared<PipelineS::MongoSInterface>(), + std::make_shared<MongoSInterface>(), resolveInvolvedNamespaces(opCtx, litePipe), uuid); diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h deleted file mode 100644 index ebbe8ab72ca..00000000000 --- a/src/mongo/s/commands/pipeline_s.h +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. 
- */ - -#pragma once - -#include "mongo/db/pipeline/mongo_process_common.h" -#include "mongo/db/pipeline/pipeline.h" - -namespace mongo { -/** - * PipelineS is an extension of the Pipeline class to provide additional utility functions on - * mongoS. For example, it can inject the pipeline with an implementation of MongoProcessInterface - * which provides mongos-specific versions of methods required by some document sources. - */ -class PipelineS { -public: - /** - * Class to provide access to mongos-specific implementations of methods required by some - * document sources. - */ - class MongoSInterface final : public MongoProcessCommon { - public: - MongoSInterface() = default; - - virtual ~MongoSInterface() = default; - - void setOperationContext(OperationContext* opCtx) final {} - - boost::optional<Document> lookupSingleDocument( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern) final; - - std::vector<GenericCursor> getCursors( - const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; - - DBClientBase* directClient() final { - MONGO_UNREACHABLE; - } - - bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { - MONGO_UNREACHABLE; - } - - BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& objs) final { - MONGO_UNREACHABLE; - } - - CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) final { - MONGO_UNREACHABLE; - } - - void appendLatencyStats(OperationContext* opCtx, - const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const final { - MONGO_UNREACHABLE; - } - - Status appendStorageStats(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const final { - MONGO_UNREACHABLE; - } - - Status appendRecordCount(OperationContext* opCtx, - const NamespaceString& nss, - BSONObjBuilder* builder) const final { - MONGO_UNREACHABLE; - } - - BSONObj getCollectionOptions(const NamespaceString& nss) final { - MONGO_UNREACHABLE; - } - - void renameIfOptionsAndIndexesHaveNotChanged( - OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) final { - MONGO_UNREACHABLE; - } - - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { - MONGO_UNREACHABLE; - } - - std::string getShardName(OperationContext* opCtx) const final { - MONGO_UNREACHABLE; - } - - std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*, - UUID) const final { - MONGO_UNREACHABLE; - } - - StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions) final { - MONGO_UNREACHABLE; - } - - protected: - BSONObj _reportCurrentOpForClient(OperationContext* opCtx, - Client* client, - CurrentOpTruncateMode truncateOps) const final; - - void _reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector<BSONObj>* ops) const final { - // This implementation is a no-op, since mongoS does not maintain a SessionCatalog or - // hold stashed locks for idle sessions. 
- } - }; - -private: - PipelineS() = delete; // Should never be instantiated. -}; - -} // namespace mongo
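
For reference, a minimal usage sketch of the refactored write interface declared in this change. Everything below is an editor's illustration, not code from the patch: the helper name flushOutBatch, the header path, and the _id-based upsert policy are assumptions; only the insert()/update() signatures come from the MongoDInterface declaration above.

// Editor's illustrative sketch only -- not part of the patch. Shows how a write
// stage might drive the insert()/update() methods declared on MongoDInterface;
// 'flushOutBatch' and the batching policy are hypothetical.
#include <vector>

#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/mongod_process_interface.h"  // assumed header name for MongoDInterface

namespace mongo {

void flushOutBatch(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                   MongoDInterface& mongod,
                   const NamespaceString& outNs,
                   const std::vector<BSONObj>& batch,
                   bool replaceMatching) {
    if (!replaceMatching) {
        // Plain inserts: the refactored insert() returns void rather than a
        // getLastError-style BSONObj, so failures are not reported via a return value.
        mongod.insert(expCtx, outNs, batch);
        return;
    }

    // Replace-style writes: upsert each document, matching on its _id value.
    std::vector<BSONObj> queries;
    std::vector<BSONObj> updates;
    for (auto&& doc : batch) {
        queries.push_back(doc["_id"].wrap());  // builds {_id: <value>}
        updates.push_back(doc);
    }
    mongod.update(expCtx, outNs, queries, updates, true /* upsert */, false /* multi */);
}

}  // namespace mongo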