author    Nick Zolnierz <nicholas.zolnierz@mongodb.com>  2018-07-26 16:00:20 -0400
committer Nick Zolnierz <nicholas.zolnierz@mongodb.com>  2018-08-02 12:23:03 -0400
commit    afbe688f0f18c5cb474fb1bcd933d6e06c0c5291 (patch)
tree      a7c1a3192f335c5e2947e395159eaa03782c3a50
parent    afaf46687eb3930ddbfc8b528bd68295b6a09676 (diff)
SERVER-35896: Support 'replaceDocuments' mode in $out
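
$out's new "replaceDocuments" mode matches each result document against the output
collection by a "uniqueKey" (defaulting to {_id: 1}), replacing documents that match
and inserting the rest. A minimal shell sketch of the syntax this commit enables
(collection names are illustrative):

    // Upsert results into "target", matching on _id (the default uniqueKey).
    db.source.aggregate([{$out: {to: "target", mode: "replaceDocuments"}}]);

    // Match on a compound unique key instead. A unique index with this key pattern
    // must exist on the output collection.
    db.source.aggregate(
        [{$out: {to: "target", mode: "replaceDocuments", uniqueKey: {region: 1, sku: 1}}}]);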
-rw-r--r--  jstests/aggregation/sources/out/batch_writes.js | 61
-rw-r--r--  jstests/aggregation/sources/out/mode_replace_documents.js | 93
-rw-r--r--  src/mongo/bson/bsonobj.cpp | 14
-rw-r--r--  src/mongo/bson/bsonobj.h | 19
-rw-r--r--  src/mongo/db/commands/SConscript | 1
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp | 3
-rw-r--r--  src/mongo/db/pipeline/SConscript | 37
-rw-r--r--  src/mongo/db/pipeline/document.h | 5
-rw-r--r--  src/mongo/db/pipeline/document_path_support.cpp | 8
-rw-r--r--  src/mongo/db/pipeline/document_path_support.h | 12
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 2
-rw-r--r--  src/mongo/db/pipeline/document_source_group.cpp | 3
-rw-r--r--  src/mongo/db/pipeline/document_source_out.cpp | 67
-rw-r--r--  src/mongo/db/pipeline/document_source_out.h | 51
-rw-r--r--  src/mongo/db/pipeline/document_source_out_in_place.h | 24
-rw-r--r--  src/mongo/db/pipeline/document_source_out_test.cpp | 138
-rw-r--r--  src/mongo/db/pipeline/field_path.h | 4
-rw-r--r--  src/mongo/db/pipeline/mongo_process_interface.h | 19
-rw-r--r--  src/mongo/db/pipeline/mongod_process_interface.cpp | 424
-rw-r--r--  src/mongo/db/pipeline/mongod_process_interface.h | 119
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.cpp (renamed from src/mongo/s/commands/pipeline_s.cpp) | 11
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.h | 152
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp | 335
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.h | 75
-rw-r--r--  src/mongo/db/pipeline/pipeline_test.cpp | 3
-rw-r--r--  src/mongo/db/pipeline/stub_mongo_process_interface.h | 15
-rw-r--r--  src/mongo/db/repl/idempotency_update_sequence_test.cpp | 9
-rw-r--r--  src/mongo/s/commands/SConscript | 2
-rw-r--r--  src/mongo/s/commands/cluster_aggregate.cpp | 4
-rw-r--r--  src/mongo/s/commands/pipeline_s.h | 154
30 files changed, 1176 insertions(+), 688 deletions(-)
diff --git a/jstests/aggregation/sources/out/batch_writes.js b/jstests/aggregation/sources/out/batch_writes.js
new file mode 100644
index 00000000000..6bbaac7737a
--- /dev/null
+++ b/jstests/aggregation/sources/out/batch_writes.js
@@ -0,0 +1,61 @@
+// Tests the behavior of an $out stage which encounters an error in the middle of processing. We
+// don't guarantee any particular behavior in this scenario, but this test exists to make sure
+// nothing horrendous happens and to characterize the current behavior.
+// @tags: [assumes_unsharded_collection]
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+
+ const coll = db.batch_writes;
+ const outColl = db.batch_writes_out;
+ coll.drop();
+ outColl.drop();
+
+ // Test with 2 very large documents that do not fit into a single batch.
+ const kSize15MB = 15 * 1024 * 1024;
+ const largeArray = new Array(kSize15MB).join("a");
+ assert.commandWorked(coll.insert({_id: 0, a: largeArray}));
+ assert.commandWorked(coll.insert({_id: 1, a: largeArray}));
+
+ // Make sure the $out succeeds without any duplicate keys.
+ ["replaceCollection", "insertDocuments", "replaceDocuments"].forEach(mode => {
+ coll.aggregate([{$out: {to: outColl.getName(), mode: mode}}]);
+ assert.eq(2, outColl.find().itcount());
+ outColl.drop();
+ });
+
+ coll.drop();
+ for (let i = 0; i < 10; i++) {
+ assert.commandWorked(coll.insert({_id: i, a: i}));
+ }
+
+ // Create a unique index on 'a' in the output collection to create a unique key violation when
+ // running the $out. The second document to be written ({_id: 1, a: 1}) will conflict with the
+ // existing document in the output collection. We use a unique index on a field other than _id
+ // because "replaceDocuments" mode will not change _id when one already exists.
+ outColl.drop();
+ assert.commandWorked(outColl.insert({_id: 2, a: 1}));
+ assert.commandWorked(outColl.createIndex({a: 1}, {unique: true}));
+
+ // Test that both batched updates and inserts will successfully write the first document but
+ // fail on the second. We don't guarantee any particular behavior in this case, but this test is
+ // meant to characterize the current behavior.
+ assertErrorCode(coll, [{$out: {to: outColl.getName(), mode: "insertDocuments"}}], 16996);
+ assert.soon(() => {
+ return outColl.find().itcount() == 2;
+ });
+
+ assertErrorCode(coll, [{$out: {to: outColl.getName(), mode: "replaceDocuments"}}], 50904);
+ assert.soon(() => {
+ return outColl.find().itcount() == 2;
+ });
+
+ // Mode "replaceCollection" will drop the contents of the output collection, so there is no
+ // duplicate key error.
+ outColl.drop();
+ assert.commandWorked(outColl.insert({_id: 2, a: 1}));
+ assert.commandWorked(outColl.createIndex({a: 1}, {unique: true}));
+ coll.aggregate([{$out: {to: outColl.getName(), mode: "replaceCollection"}}]);
+ assert.eq(10, outColl.find().itcount());
+}());
diff --git a/jstests/aggregation/sources/out/mode_replace_documents.js b/jstests/aggregation/sources/out/mode_replace_documents.js
new file mode 100644
index 00000000000..8335981a1ff
--- /dev/null
+++ b/jstests/aggregation/sources/out/mode_replace_documents.js
@@ -0,0 +1,93 @@
+// Tests for the $out stage with mode set to "replaceDocuments".
+// @tags: [assumes_unsharded_collection]
+(function() {
+ "use strict";
+
+ load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+
+ const coll = db.replace_docs;
+ const outColl = db.replace_docs_out;
+ coll.drop();
+ outColl.drop();
+
+ const nDocs = 10;
+ for (let i = 0; i < nDocs; i++) {
+ assert.commandWorked(coll.insert({_id: i, a: i}));
+ }
+
+ // Test that a $out with 'replaceDocuments' mode will default the unique key to "_id".
+ coll.aggregate([{$out: {to: outColl.getName(), mode: "replaceDocuments"}}]);
+ assert.eq(nDocs, outColl.find().itcount());
+
+ // Test that 'replaceDocuments' mode will update existing documents that match the unique key.
+ const nDocsReplaced = 5;
+ coll.aggregate([
+ {$project: {_id: {$mod: ["$_id", nDocsReplaced]}}},
+ {$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {_id: 1}}}
+ ]);
+ assert.eq(nDocsReplaced, outColl.find({a: {$exists: false}}).itcount());
+
+ // Test 'replaceDocuments' mode with a dotted path unique key.
+ coll.drop();
+ outColl.drop();
+ assert.commandWorked(coll.insert({_id: 0, a: {b: 1}}));
+ assert.commandWorked(coll.insert({_id: 1, a: {b: 1}, c: 1}));
+ coll.aggregate([
+ {$addFields: {_id: 0}},
+ {$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {_id: 1, "a.b": 1}}}
+ ]);
+ assert.eq([{_id: 0, a: {b: 1}, c: 1}], outColl.find().toArray());
+
+ // TODO SERVER-36100: 'replaceDocuments' mode should allow a missing "_id" unique key.
+ assertErrorCode(coll,
+ [
+ {$project: {_id: 0}},
+ {
+ $out: {
+ to: outColl.getName(),
+ mode: "replaceDocuments",
+ }
+ }
+ ],
+ 50905);
+
+ // Test that 'replaceDocuments' mode with a missing non-id unique key fails.
+ assertErrorCode(
+ coll,
+ [{$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {missing: 1}}}],
+ 50905);
+
+ // Test that a replace fails to insert a document if it violates a unique index constraint. In
+ // this example, $out will attempt to insert multiple documents with {a: 0} which is not allowed
+ // with the unique index on {a: 1}.
+ coll.drop();
+ assert.commandWorked(coll.insert([{_id: 0}, {_id: 1}]));
+
+ outColl.drop();
+ assert.commandWorked(outColl.createIndex({a: 1}, {unique: true}));
+ assertErrorCode(
+ coll,
+ [{$addFields: {a: 0}}, {$out: {to: outColl.getName(), mode: "replaceDocuments"}}],
+ 50904);
+
+ // Test that $out fails if the unique key contains an array.
+ coll.drop();
+ assert.commandWorked(coll.insert({_id: 0, a: [1, 2]}));
+ assertErrorCode(
+ coll,
+ [
+ {$addFields: {_id: 0}},
+ {$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {_id: 1, "a.b": 1}}}
+ ],
+ 50905);
+
+ coll.drop();
+ assert.commandWorked(coll.insert({_id: 0, a: [{b: 1}]}));
+ assertErrorCode(
+ coll,
+ [
+ {$addFields: {_id: 0}},
+ {$out: {to: outColl.getName(), mode: "replaceDocuments", uniqueKey: {_id: 1, "a.b": 1}}}
+ ],
+ 50905);
+}());
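
Note that unlike "replaceCollection", mode "replaceDocuments" leaves existing documents
in place when they match no incoming unique key, which the replacement test above relies
on. A shell sketch of that behavior (collection names are illustrative):

    assert.commandWorked(db.target.insert({_id: 99, stale: true}));
    db.source.aggregate([{$out: {to: "target", mode: "replaceDocuments"}}]);
    // {_id: 99, stale: true} is still present: only documents whose _id matched an
    // aggregation result were replaced, and unmatched results were inserted.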
diff --git a/src/mongo/bson/bsonobj.cpp b/src/mongo/bson/bsonobj.cpp
index 244d4ff5c67..ff03f9aac79 100644
--- a/src/mongo/bson/bsonobj.cpp
+++ b/src/mongo/bson/bsonobj.cpp
@@ -271,20 +271,6 @@ BSONElement BSONObj::getFieldUsingIndexNames(StringData fieldName, const BSONObj
return BSONElement();
}
-/* grab names of all the fields in this object */
-int BSONObj::getFieldNames(set<string>& fields) const {
- int n = 0;
- BSONObjIterator i(*this);
- while (i.moreWithEOO()) {
- BSONElement e = i.next();
- if (e.eoo())
- break;
- fields.insert(e.fieldName());
- n++;
- }
- return n;
-}
-
/* note: addFields always adds _id even if not specified
returns n added not counting _id unless requested.
*/
diff --git a/src/mongo/bson/bsonobj.h b/src/mongo/bson/bsonobj.h
index c83f1abf368..ee441046c25 100644
--- a/src/mongo/bson/bsonobj.h
+++ b/src/mongo/bson/bsonobj.h
@@ -274,8 +274,11 @@ public:
*/
int nFields() const;
- /** adds the field names to the fields set. does NOT clear it (appends). */
- int getFieldNames(std::set<std::string>& fields) const;
+ /**
+ * Returns a 'Container' populated with the field names of the object.
+ */
+ template <class Container>
+ Container getFieldNames() const;
/** Get the field of the specified name. eoo() is true on the returned
element if not found.
@@ -838,4 +841,16 @@ inline void BSONObj::getFields(const std::array<StringData, N>& fieldNames,
break;
}
}
+
+template <class Container>
+Container BSONObj::getFieldNames() const {
+ Container fields;
+ for (auto&& elem : *this) {
+ if (elem.eoo())
+ break;
+ fields.insert(elem.fieldName());
+ }
+ return fields;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 0c218665a46..6d391e461bf 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -240,6 +240,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands',
'$BUILD_DIR/mongo/db/curop_failpoint_helpers',
'$BUILD_DIR/mongo/db/ops/write_ops_exec',
+ '$BUILD_DIR/mongo/db/pipeline/mongod_process_interface',
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/db/repair_database',
'$BUILD_DIR/mongo/db/rw_concern_d',
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 60efa126c3c..3fa7c3ffa93 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/pipeline/expression.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
+#include "mongo/db/pipeline/mongod_process_interface.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
@@ -490,7 +491,7 @@ Status runAggregate(OperationContext* opCtx,
new ExpressionContext(opCtx,
request,
std::move(*collatorToUse),
- std::make_shared<PipelineD::MongoDInterface>(opCtx),
+ std::make_shared<MongoDInterface>(opCtx),
uassertStatusOK(resolveInvolvedNamespaces(opCtx, request)),
uuid));
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 0195c38f596..2b53ed9c181 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -261,6 +261,41 @@ env.Library(
]
)
+env.Library(
+ target='mongo_process_common',
+ source=[
+ 'mongo_process_common.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_global',
+ ]
+)
+
+env.Library(
+ target='mongod_process_interface',
+ source=[
+ 'mongod_process_interface.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/query_exec',
+ '$BUILD_DIR/mongo/db/ops/write_ops_exec',
+ '$BUILD_DIR/mongo/db/stats/top',
+ 'mongo_process_common',
+ ]
+)
+
+env.Library(
+ target='mongos_process_interface',
+ source=[
+ 'mongos_process_interface.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/s/query/async_results_merger',
+ '$BUILD_DIR/mongo/s/commands/cluster_commands_helpers',
+ 'mongo_process_common',
+ ]
+)
+
pipelineeEnv = env.Clone()
pipelineeEnv.InjectThirdPartyIncludePaths(libraries=['snappy'])
pipelineeEnv.Library(
@@ -310,7 +345,6 @@ pipelineeEnv.Library(
"cluster_aggregation_planner.cpp",
'document_source_tee_consumer.cpp',
'document_source_unwind.cpp',
- 'mongo_process_common.cpp',
'pipeline.cpp',
'sequential_document_cache.cpp',
'tee_buffer.cpp',
@@ -333,7 +367,6 @@ pipelineeEnv.Library(
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/service_context',
'$BUILD_DIR/mongo/db/sessions_collection',
- '$BUILD_DIR/mongo/db/stats/top',
'$BUILD_DIR/mongo/db/storage/encryption_hooks',
'$BUILD_DIR/mongo/db/storage/storage_options',
'$BUILD_DIR/mongo/s/is_mongos',
diff --git a/src/mongo/db/pipeline/document.h b/src/mongo/db/pipeline/document.h
index 76d207276ce..21991ef483a 100644
--- a/src/mongo/db/pipeline/document.h
+++ b/src/mongo/db/pipeline/document.h
@@ -500,8 +500,9 @@ public:
/** Gets/Sets a nested field given a path.
*
- * All fields along path are created as empty Documents if they don't exist
- * or are any other type.
+ * All fields along path are created as empty Documents if they don't exist or are any other
+ * type. Does *not* traverse nested arrays when evaluating a nested path, instead returning
+ * Value() if the dotted field points to a nested object within an array.
*/
MutableValue getNestedField(const FieldPath& dottedField);
void setNestedField(const FieldPath& dottedField, const Value& val) {
diff --git a/src/mongo/db/pipeline/document_path_support.cpp b/src/mongo/db/pipeline/document_path_support.cpp
index 2ac115a7a3a..a2b1b11378d 100644
--- a/src/mongo/db/pipeline/document_path_support.cpp
+++ b/src/mongo/db/pipeline/document_path_support.cpp
@@ -151,13 +151,5 @@ BSONObj documentToBsonWithPaths(const Document& input, const std::set<std::strin
return outputBuilder.obj();
}
-Document extractDocumentKeyFromDoc(const Document& doc, const std::vector<FieldPath>& paths) {
- MutableDocument result;
- for (auto& field : paths) {
- result.addField(field.fullPath(), doc.getNestedField(field));
- }
- return result.freeze();
-}
-
} // namespace document_path_support
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_path_support.h b/src/mongo/db/pipeline/document_path_support.h
index d8dd894a76d..b493a4bcaf8 100644
--- a/src/mongo/db/pipeline/document_path_support.h
+++ b/src/mongo/db/pipeline/document_path_support.h
@@ -66,8 +66,18 @@ BSONObj documentToBsonWithPaths(const Document&, const std::set<std::string>& pa
/**
* Extracts 'paths' from the input document to a flat document.
+ *
+ * This does *not* traverse arrays when extracting values from dotted paths. For example, the path
+ * "a.b" will not populate the result document if the input document is {a: [{b: 1}]}.
*/
-Document extractDocumentKeyFromDoc(const Document& doc, const std::vector<FieldPath>& paths);
+template <class Container>
+Document extractPathsFromDoc(const Document& doc, const Container& paths) {
+ MutableDocument result;
+ for (const auto& field : paths) {
+ result.addField(field.fullPath(), doc.getNestedField(field));
+ }
+ return result.freeze();
+}
} // namespace document_path_support
} // namespace mongo
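
Because extractPathsFromDoc() does not traverse arrays, $out with mode "replaceDocuments"
rejects any document whose unique key path resolves into an array (error code 50905, via
the uassert in document_source_out.cpp below). A shell sketch of the two cases, assuming
a uniqueKey of {_id: 1, "a.b": 1}:

    assert.commandWorked(db.input.insert({_id: 0, a: {b: 1}}));
    // The key {_id: 0, "a.b": 1} is extracted and the write succeeds.
    db.input.aggregate(
        [{$out: {to: "out", mode: "replaceDocuments", uniqueKey: {_id: 1, "a.b": 1}}}]);

    assert.commandWorked(db.input.update({_id: 0}, {$set: {a: [{b: 1}]}}));
    // "a.b" is not populated from the array, the extracted key is incomplete, and the
    // aggregation fails with error code 50905.
    assert.throws(() => db.input.aggregate(
        [{$out: {to: "out", mode: "replaceDocuments", uniqueKey: {_id: 1, "a.b": 1}}}]));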
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 06e3c6230bf..6b744558186 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -240,7 +240,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
case repl::OpTypeEnum::kInsert: {
operationType = DocumentSourceChangeStream::kInsertOpType;
fullDocument = input[repl::OplogEntry::kObjectFieldName];
- documentKey = Value(document_path_support::extractDocumentKeyFromDoc(
+ documentKey = Value(document_path_support::extractPathsFromDoc(
fullDocument.getDocument(), documentKeyFields));
break;
}
diff --git a/src/mongo/db/pipeline/document_source_group.cpp b/src/mongo/db/pipeline/document_source_group.cpp
index 867a9f8a539..fc5d6b1e2ee 100644
--- a/src/mongo/db/pipeline/document_source_group.cpp
+++ b/src/mongo/db/pipeline/document_source_group.cpp
@@ -686,8 +686,7 @@ boost::optional<BSONObj> DocumentSourceGroup::findRelevantInputSort() const {
// for every permutation of group by (a, b, c), since we are guaranteed that documents with
// the same value of (a, b, c) will be consecutive in the input stream, no matter what our
// _id is.
- std::set<std::string> fieldNames;
- obj.getFieldNames(fieldNames);
+ auto fieldNames = obj.getFieldNames<std::set<std::string>>();
if (fieldNames == deps.fields) {
return obj;
}
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index aac7f5d31c8..071c9bcb37f 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_out_gen.h"
#include "mongo/db/pipeline/document_source_out_in_place.h"
@@ -83,13 +84,6 @@ const char* DocumentSourceOut::getSourceName() const {
return "$out";
}
-void DocumentSourceOut::spill(const vector<BSONObj>& toInsert) {
- BSONObj err = pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), toInsert);
- uassert(16996,
- str::stream() << "insert for $out failed: " << err,
- DBClientBase::getLastErrorString(err).empty());
-}
-
DocumentSource::GetNextResult DocumentSourceOut::getNext() {
pExpCtx->checkForInterrupt();
@@ -103,24 +97,31 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() {
}
// Insert all documents into temp collection, batching to perform vectored inserts.
- vector<BSONObj> bufferedObjects;
+ BatchedObjects batch;
int bufferedBytes = 0;
auto nextInput = pSource->getNext();
for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
- BSONObj toInsert = nextInput.releaseDocument().toBson();
-
- bufferedBytes += toInsert.objsize();
- if (!bufferedObjects.empty() && (bufferedBytes > BSONObjMaxUserSize ||
- bufferedObjects.size() >= write_ops::kMaxWriteBatchSize)) {
- spill(bufferedObjects);
- bufferedObjects.clear();
- bufferedBytes = toInsert.objsize();
+ auto doc = nextInput.releaseDocument();
+
+ // Extract the unique key before converting the document to BSON.
+ auto uniqueKey = document_path_support::extractPathsFromDoc(doc, _uniqueKeyFields);
+ auto insertObj = doc.toBson();
+ uassert(50905,
+ str::stream() << "Failed to extract unique key from input document: " << insertObj,
+ uniqueKey.size() == _uniqueKeyFields.size());
+
+ bufferedBytes += insertObj.objsize();
+ if (!batch.empty() &&
+ (bufferedBytes > BSONObjMaxUserSize || batch.size() >= write_ops::kMaxWriteBatchSize)) {
+ spill(batch);
+ batch.clear();
+ bufferedBytes = insertObj.objsize();
}
- bufferedObjects.push_back(toInsert);
+ batch.emplace(std::move(insertObj), uniqueKey.toBson());
}
- if (!bufferedObjects.empty())
- spill(bufferedObjects);
+ if (!batch.empty())
+ spill(batch);
switch (nextInput.getStatus()) {
case GetNextResult::ReturnStatus::kAdvanced: {
@@ -144,12 +145,12 @@ DocumentSource::GetNextResult DocumentSourceOut::getNext() {
DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
- boost::optional<Document> uniqueKey)
+ std::set<FieldPath> uniqueKey)
: DocumentSource(expCtx),
_done(false),
_outputNs(outputNs),
_mode(mode),
- _uniqueKey(uniqueKey) {}
+ _uniqueKeyFields(std::move(uniqueKey)) {}
intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
@@ -164,22 +165,24 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
readConcernLevel != repl::ReadConcernLevel::kMajorityReadConcern);
auto mode = WriteModeEnum::kModeReplaceCollection;
- boost::optional<Document> uniqueKey;
+ std::set<FieldPath> uniqueKey;
NamespaceString outputNs;
if (elem.type() == BSONType::String) {
outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str());
+ uniqueKey.emplace("_id");
} else if (elem.type() == BSONType::Object) {
auto spec =
DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject());
mode = spec.getMode();
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "$out is not currently supported with mode "
- << WriteMode_serializer(mode),
- mode != WriteModeEnum::kModeReplaceDocuments);
- if (auto uniqueKeyDoc = spec.getUniqueKey()) {
- uniqueKey = Document{{uniqueKeyDoc.get()}};
+ // Convert the uniqueKey object into a set of FieldPaths.
+ if (auto uniqueKeyObj = spec.getUniqueKey()) {
+ uniqueKey = uniqueKeyObj->getFieldNames<std::set<FieldPath>>();
+ } else {
+ // TODO SERVER-35954: If not present, build the unique key from the shard key of the
+ // output collection.
+ uniqueKey.emplace("_id");
}
// Retrieve the target database from the user command, otherwise use the namespace from the
@@ -203,6 +206,8 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
return new DocumentSourceOutReplaceColl(outputNs, expCtx, mode, uniqueKey);
case WriteModeEnum::kModeInsertDocuments:
return new DocumentSourceOutInPlace(outputNs, expCtx, mode, uniqueKey);
+ case WriteModeEnum::kModeReplaceDocuments:
+ return new DocumentSourceOutInPlaceReplace(outputNs, expCtx, mode, uniqueKey);
default:
MONGO_UNREACHABLE;
}
@@ -213,9 +218,11 @@ Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> ex
Document{{DocumentSourceOutSpec::kTargetCollectionFieldName, _outputNs.coll()},
{DocumentSourceOutSpec::kTargetDbFieldName, _outputNs.db()},
{DocumentSourceOutSpec::kModeFieldName, WriteMode_serializer(_mode)}});
- if (_uniqueKey) {
- serialized[DocumentSourceOutSpec::kUniqueKeyFieldName] = Value(_uniqueKey.get());
+ BSONObjBuilder uniqueKeyBob;
+ for (const auto& path : _uniqueKeyFields) {
+ uniqueKeyBob.append(path.fullPath(), 1);
}
+ serialized[DocumentSourceOutSpec::kUniqueKeyFieldName] = Value(uniqueKeyBob.done());
return Value(Document{{getSourceName(), serialized.freeze()}});
}
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 18a44cd8ac3..5168346957b 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -44,7 +44,7 @@ public:
DocumentSourceOut(const NamespaceString& outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
- boost::optional<Document> uniqueKey);
+ std::set<FieldPath> uniqueKey);
virtual ~DocumentSourceOut() = default;
@@ -86,6 +86,44 @@ public:
virtual void initializeWriteNs() = 0;
/**
+ * Storage for a batch of BSON objects to be inserted into or updated in the write
+ * namespace. For $out with mode "replaceDocuments", the unique key extracted from each
+ * document is stored alongside it and serves as the query portion of the corresponding
+ * update.
+ */
+ struct BatchedObjects {
+ void emplace(BSONObj obj, BSONObj key) {
+ objects.emplace_back(std::move(obj));
+ uniqueKeys.emplace_back(std::move(key));
+ }
+
+ bool empty() const {
+ return objects.empty();
+ }
+
+ size_t size() const {
+ return objects.size();
+ }
+
+ void clear() {
+ objects.clear();
+ uniqueKeys.clear();
+ }
+
+ std::vector<BSONObj> objects;
+ // Store the unique keys as BSON objects instead of Documents for compatibility with the
+ // batch update command, where entry i becomes {q: uniqueKeys[i], u: objects[i], ...}.
+ std::vector<BSONObj> uniqueKeys;
+ };
+
+ /**
+ * Writes the documents in 'batch' to the write namespace.
+ */
+ virtual void spill(const BatchedObjects& batch) {
+ pExpCtx->mongoProcessInterface->insert(pExpCtx, getWriteNs(), batch.objects);
+ };
+
+ /**
* Finalize the output collection, called when there are no more documents to write.
*/
virtual void finalize() = 0;
@@ -94,17 +132,16 @@ public:
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
private:
- /**
- * Inserts all of 'toInsert' into the collection returned from getWriteNs().
- */
- void spill(const std::vector<BSONObj>& toInsert);
-
bool _initialized = false;
bool _done = false;
const NamespaceString _outputNs;
WriteModeEnum _mode;
- boost::optional<Document> _uniqueKey;
+
+ // The unique key used to identify documents in the output collection. There must exist a
+ // unique index with this key pattern (up to order). Defaults to "_id" for unsharded
+ // collections, and to "_id" plus the shard key for sharded collections.
+ std::set<FieldPath> _uniqueKeyFields;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h
index 6bdb13ee395..fa28ae78cae 100644
--- a/src/mongo/db/pipeline/document_source_out_in_place.h
+++ b/src/mongo/db/pipeline/document_source_out_in_place.h
@@ -33,9 +33,10 @@
namespace mongo {
/**
- * Version of $out which writes directly to the output collection.
+ * Version of $out which inserts directly into the output collection, failing on a
+ * duplicate key.
*/
-class DocumentSourceOutInPlace final : public DocumentSourceOut {
+class DocumentSourceOutInPlace : public DocumentSourceOut {
public:
using DocumentSourceOut::DocumentSourceOut;
@@ -51,4 +52,23 @@ public:
void finalize() final{};
};
+
+/**
+ * Version of $out which replaces the documents in the output collection that match the unique key,
+ * or inserts the document if there is no match.
+ */
+class DocumentSourceOutInPlaceReplace final : public DocumentSourceOutInPlace {
+public:
+ using DocumentSourceOutInPlace::DocumentSourceOutInPlace;
+
+ void spill(const BatchedObjects& batch) final {
+ // Set upsert to true and multi to false as there should be at most one document to update
+ // or insert.
+ constexpr auto upsert = true;
+ constexpr auto multi = false;
+ pExpCtx->mongoProcessInterface->update(
+ pExpCtx, getWriteNs(), batch.uniqueKeys, batch.objects, upsert, multi);
+ }
+};
+
} // namespace mongo
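
With mode "replaceDocuments", spill() therefore issues each batch as a single batched
update command, pairing each buffered unique key (q) with its document (u) and setting
upsert: true and multi: false on every entry. A sketch of the command generated for a
two-document batch (values illustrative; see MongoDInterface::update below):

    {
        update: "target",
        bypassDocumentValidation: false,
        updates: [
            {q: {_id: 0}, u: {_id: 0, a: 0}, multi: false, upsert: true},
            {q: {_id: 1}, u: {_id: 1, a: 1}, multi: false, upsert: true}
        ]
    }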
diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp
index 412f49e7ab8..1c01b88ab35 100644
--- a/src/mongo/db/pipeline/document_source_out_test.cpp
+++ b/src/mongo/db/pipeline/document_source_out_test.cpp
@@ -40,13 +40,18 @@ namespace {
using boost::intrusive_ptr;
+StringData kModeFieldName = DocumentSourceOutSpec::kModeFieldName;
+StringData kUniqueKeyFieldName = DocumentSourceOutSpec::kUniqueKeyFieldName;
StringData kDefaultMode = WriteMode_serializer(WriteModeEnum::kModeReplaceCollection);
class DocumentSourceOutTest : public AggregationContextFixture {
public:
- intrusive_ptr<DocumentSource> createOutStage(BSONObj spec) {
+ intrusive_ptr<DocumentSourceOut> createOutStage(BSONObj spec) {
auto specElem = spec.firstElement();
- return DocumentSourceOut::createFromBson(specElem, getExpCtx());
+ intrusive_ptr<DocumentSourceOut> outStage = dynamic_cast<DocumentSourceOut*>(
+ DocumentSourceOut::createFromBson(specElem, getExpCtx()).get());
+ ASSERT_TRUE(outStage);
+ return outStage;
}
};
@@ -61,41 +66,124 @@ TEST_F(DocumentSourceOutTest, FailsToParseIncorrectType) {
TEST_F(DocumentSourceOutTest, AcceptsStringArgument) {
BSONObj spec = BSON("$out"
<< "some_collection");
- auto docSource = createOutStage(spec);
- auto outStage = dynamic_cast<DocumentSourceOut*>(docSource.get());
+ auto outStage = createOutStage(spec);
ASSERT_EQ(outStage->getOutputNs().coll(), "some_collection");
}
TEST_F(DocumentSourceOutTest, SerializeDefaultsModeRecreateCollection) {
BSONObj spec = BSON("$out"
<< "some_collection");
- auto docSource = createOutStage(spec);
- auto outStage = dynamic_cast<DocumentSourceOut*>(docSource.get());
+ auto outStage = createOutStage(spec);
auto serialized = outStage->serialize().getDocument();
- ASSERT_EQ(serialized["$out"][DocumentSourceOutSpec::kModeFieldName].getStringData(),
- kDefaultMode);
+ ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
// Make sure we can reparse the serialized BSON.
- auto reparsedDocSource = createOutStage(serialized.toBson());
- auto reparsedOut = dynamic_cast<DocumentSourceOut*>(reparsedDocSource.get());
- auto reSerialized = reparsedOut->serialize().getDocument();
- ASSERT_EQ(reSerialized["$out"][DocumentSourceOutSpec::kModeFieldName].getStringData(),
- kDefaultMode);
+ auto reparsedOutStage = createOutStage(serialized.toBson());
+ auto reSerialized = reparsedOutStage->serialize().getDocument();
+ ASSERT_EQ(reSerialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
}
-TEST_F(DocumentSourceOutTest, SerializeUniqueKeyOnlyIfSpecified) {
+TEST_F(DocumentSourceOutTest, SerializeUniqueKeyDefaultsToId) {
+ BSONObj spec = BSON("$out" << BSON("to"
+ << "target"
+ << "mode"
+ << kDefaultMode));
+ auto outStage = createOutStage(spec);
+ auto serialized = outStage->serialize().getDocument();
+ ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
+ ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
+ (Document{{"_id", 1}}));
+
+ spec = BSON("$out"
+ << "some_collection");
+ outStage = createOutStage(spec);
+ serialized = outStage->serialize().getDocument();
+ ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
+ ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
+ (Document{{"_id", 1}}));
+}
+
+TEST_F(DocumentSourceOutTest, SerializeCompoundUniqueKey) {
BSONObj spec = BSON("$out" << BSON("to"
<< "target"
<< "mode"
<< kDefaultMode
<< "uniqueKey"
<< BSON("_id" << 1 << "shardKey" << 1)));
- auto docSource = createOutStage(spec);
- auto outStage = dynamic_cast<DocumentSourceOut*>(docSource.get());
+ auto outStage = createOutStage(spec);
+ auto serialized = outStage->serialize().getDocument();
+ ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
+ ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
+ (Document{{"_id", 1}, {"shardKey", 1}}));
+}
+
+TEST_F(DocumentSourceOutTest, SerializeDottedPathUniqueKey) {
+ BSONObj spec = BSON("$out" << BSON("to"
+ << "target"
+ << "mode"
+ << kDefaultMode
+ << "uniqueKey"
+ << BSON("_id" << 1 << "a.b" << 1)));
+ auto outStage = createOutStage(spec);
+ auto serialized = outStage->serialize().getDocument();
+ ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
+ ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
+ (Document{{"_id", 1}, {"a.b", 1}}));
+
+ spec = BSON("$out" << BSON("to"
+ << "target"
+ << "mode"
+ << kDefaultMode
+ << "uniqueKey"
+ << BSON("_id.a" << 1)));
+ outStage = createOutStage(spec);
+ serialized = outStage->serialize().getDocument();
+ ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
+ ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
+ (Document{{"_id.a", 1}}));
+}
+
+TEST_F(DocumentSourceOutTest, SerializeDottedPathUniqueKeySharedPrefix) {
+ BSONObj spec = BSON("$out" << BSON("to"
+ << "target"
+ << "mode"
+ << kDefaultMode
+ << "uniqueKey"
+ << BSON("_id" << 1 << "a.b" << 1 << "a.c" << 1)));
+ auto outStage = createOutStage(spec);
auto serialized = outStage->serialize().getDocument();
- ASSERT_EQ(serialized["$out"][DocumentSourceOutSpec::kModeFieldName].getStringData(),
- kDefaultMode);
- ASSERT_DOCUMENT_EQ(serialized["$out"][DocumentSourceOutSpec::kUniqueKeyFieldName].getDocument(),
+ ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
+ ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
+ (Document{{"_id", 1}, {"a.b", 1}, {"a.c", 1}}));
+}
+
+TEST_F(DocumentSourceOutTest, SerializeDuplicateUniqueKey) {
+ BSONObj spec = BSON("$out" << BSON("to"
+ << "target"
+ << "mode"
+ << kDefaultMode
+ << "uniqueKey"
+ << BSON("_id" << 1 << "dupKey" << 1 << "dupKey" << 1)));
+ auto outStage = createOutStage(spec);
+ auto serialized = outStage->serialize().getDocument();
+ ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
+ ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
+ (Document{{"_id", 1}, {"dupKey", 1}}));
+}
+
+// TODO SERVER-36367: Nested objects should not be allowed in the uniqueKey spec.
+TEST_F(DocumentSourceOutTest, SerializeNestedObjectInUniqueKey) {
+ BSONObj spec = BSON("$out" << BSON("to"
+ << "target"
+ << "mode"
+ << kDefaultMode
+ << "uniqueKey"
+ << BSON("_id" << 1 << "shardKey"
+ << BSON("subShardKey" << 1))));
+ auto outStage = createOutStage(spec);
+ auto serialized = outStage->serialize().getDocument();
+ ASSERT_EQ(serialized["$out"][kModeFieldName].getStringData(), kDefaultMode);
+ ASSERT_DOCUMENT_EQ(serialized["$out"][kUniqueKeyFieldName].getDocument(),
(Document{{"_id", 1}, {"shardKey", 1}}));
}
@@ -241,20 +329,10 @@ TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbIfSpecified) {
BSONObj spec =
BSON("$out" << BSON("to" << targetColl << "mode" << kDefaultMode << "db" << targetDb));
- auto docSource = createOutStage(spec);
- auto outStage = dynamic_cast<DocumentSourceOut*>(docSource.get());
+ auto outStage = createOutStage(spec);
ASSERT_EQ(outStage->getOutputNs().db(), targetDb);
ASSERT_EQ(outStage->getOutputNs().coll(), targetColl);
}
-TEST_F(DocumentSourceOutTest, ModeReplaceDocumentsNotSupported) {
- BSONObj spec = BSON("$out" << BSON("to"
- << "test"
- << "mode"
- << WriteMode_serializer(
- WriteModeEnum::kModeReplaceDocuments)));
- ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::InvalidOptions);
-}
-
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/pipeline/field_path.h b/src/mongo/db/pipeline/field_path.h
index 171bf89a5a3..64fa7fe0611 100644
--- a/src/mongo/db/pipeline/field_path.h
+++ b/src/mongo/db/pipeline/field_path.h
@@ -119,4 +119,8 @@ private:
// lookup.
std::vector<size_t> _fieldPathDotPosition;
};
+
+inline bool operator<(const FieldPath& lhs, const FieldPath& rhs) {
+ return lhs.fullPath() < rhs.fullPath();
+}
}
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index 3d0dafc306b..b650855d438 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -95,11 +95,22 @@ public:
virtual bool isSharded(OperationContext* opCtx, const NamespaceString& ns) = 0;
/**
- * Inserts 'objs' into 'ns' and returns the "detailed" last error object.
+ * Inserts 'objs' into 'ns' and throws a UserException if the insert fails.
*/
- virtual BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- const std::vector<BSONObj>& objs) = 0;
+ virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) = 0;
+
+ /**
+ * Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException
+ * if any of the updates fail.
+ */
+ virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& queries,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool multi) = 0;
virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) = 0;
diff --git a/src/mongo/db/pipeline/mongod_process_interface.cpp b/src/mongo/db/pipeline/mongod_process_interface.cpp
new file mode 100644
index 00000000000..e80f65da3df
--- /dev/null
+++ b/src/mongo/db/pipeline/mongod_process_interface.cpp
@@ -0,0 +1,424 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/mongod_process_interface.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/catalog/uuid_catalog.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/ops/write_ops_exec.h"
+#include "mongo/db/ops/write_ops_gen.h"
+#include "mongo/db/pipeline/document_source_cursor.h"
+#include "mongo/db/pipeline/pipeline_d.h"
+#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/session_catalog.h"
+#include "mongo/db/stats/fill_locker_info.h"
+#include "mongo/db/stats/storage_stats.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+using boost::intrusive_ptr;
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using write_ops::Update;
+
+MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {}
+
+void MongoDInterface::setOperationContext(OperationContext* opCtx) {
+ _client.setOpCtx(opCtx);
+}
+
+DBClientBase* MongoDInterface::directClient() {
+ return &_client;
+}
+
+bool MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
+ AutoGetCollectionForReadCommand autoColl(opCtx, nss);
+ auto const css = CollectionShardingState::get(opCtx, nss);
+ return css->getMetadata(opCtx)->isSharded();
+}
+
+void MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) {
+ boost::optional<DisableDocumentValidation> maybeDisableValidation;
+ if (expCtx->bypassDocumentValidation)
+ maybeDisableValidation.emplace(expCtx->opCtx);
+
+ _client.insert(ns.ns(), objs);
+ uassert(16996,
+ str::stream() << "Insert failed: " << _client.getLastError(),
+ _client.getLastError().empty());
+}
+
+void MongoDInterface::update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& queries,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool multi) {
+ BSONObjBuilder updateBob;
+ updateBob.append(Update::kCommandName, ns.coll());
+ updateBob.append(Update::kDbNameFieldName, ns.db());
+ updateBob.append(Update::kBypassDocumentValidationFieldName, expCtx->bypassDocumentValidation);
+
+ // Build the array of UpdateOp entries.
+ invariant(queries.size() == updates.size());
+ BSONArrayBuilder updatesArray;
+ for (size_t index = 0; index < queries.size(); ++index) {
+ updatesArray.append(BSON("q" << queries[index] << "u" << updates[index] << "multi" << multi
+ << "upsert"
+ << upsert));
+ }
+ updateBob.append(Update::kUpdatesFieldName, updatesArray.arr());
+
+ auto updateObj = updateBob.done();
+ auto writeResults =
+ performUpdates(expCtx->opCtx, Update::parse(IDLParserErrorContext("update"), updateObj));
+
+ // Verify that each of the update results is successful.
+ for (const auto& result : writeResults.results) {
+ uassert(50904,
+ str::stream() << "Update failed: " << result.getStatus(),
+ result.getStatus() == Status::OK());
+ }
+}
+
+CollectionIndexUsageMap MongoDInterface::getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) {
+ AutoGetCollectionForReadCommand autoColl(opCtx, ns);
+
+ Collection* collection = autoColl.getCollection();
+ if (!collection) {
+ LOG(2) << "Collection not found on index stats retrieval: " << ns.ns();
+ return CollectionIndexUsageMap();
+ }
+
+ return collection->infoCache()->getIndexUsageStats();
+}
+
+void MongoDInterface::appendLatencyStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const {
+ Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder);
+}
+
+Status MongoDInterface::appendStorageStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const {
+ return appendCollectionStorageStats(opCtx, nss, param, builder);
+}
+
+Status MongoDInterface::appendRecordCount(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BSONObjBuilder* builder) const {
+ return appendCollectionRecordCount(opCtx, nss, builder);
+}
+
+BSONObj MongoDInterface::getCollectionOptions(const NamespaceString& nss) {
+ const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
+ return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned();
+}
+
+void MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged(
+ OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) {
+ Lock::GlobalWrite globalLock(opCtx);
+
+ uassert(ErrorCodes::CommandFailed,
+ str::stream() << "collection options of target collection " << targetNs.ns()
+ << " changed during processing. Original options: "
+ << originalCollectionOptions
+ << ", new options: "
+ << getCollectionOptions(targetNs),
+ SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions ==
+ getCollectionOptions(targetNs)));
+
+ auto currentIndexes = _client.getIndexSpecs(targetNs.ns());
+ uassert(ErrorCodes::CommandFailed,
+ str::stream() << "indexes of target collection " << targetNs.ns()
+ << " changed during processing.",
+ originalIndexes.size() == currentIndexes.size() &&
+ std::equal(originalIndexes.begin(),
+ originalIndexes.end(),
+ currentIndexes.begin(),
+ SimpleBSONObjComparator::kInstance.makeEqualTo()));
+
+ BSONObj info;
+ uassert(ErrorCodes::CommandFailed,
+ str::stream() << "renameCollection failed: " << info,
+ _client.runCommand("admin", renameCommandObj, info));
+}
+
+StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> MongoDInterface::makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts) {
+ auto pipeline = Pipeline::parse(rawPipeline, expCtx);
+ if (!pipeline.isOK()) {
+ return pipeline.getStatus();
+ }
+
+ if (opts.optimize) {
+ pipeline.getValue()->optimizePipeline();
+ }
+
+ Status cursorStatus = Status::OK();
+
+ if (opts.attachCursorSource) {
+ cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
+ }
+
+ return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
+}
+
+Status MongoDInterface::attachCursorSourceToPipeline(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
+ invariant(pipeline->getSources().empty() ||
+ !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
+
+ boost::optional<AutoGetCollectionForReadCommand> autoColl;
+ if (expCtx->uuid) {
+ try {
+ autoColl.emplace(expCtx->opCtx,
+ NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid});
+ } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
+ // The UUID doesn't exist anymore
+ return ex.toStatus();
+ }
+ } else {
+ autoColl.emplace(expCtx->opCtx, expCtx->ns);
+ }
+
+ // makePipeline() is only called to perform secondary aggregation requests and expects the
+ // collection representing the document source to be unsharded. We confirm sharding state
+ // here to avoid taking a collection lock elsewhere for this purpose alone.
+ // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor
+ // until after we release the lock, leaving room for a collection to be sharded in-between.
+ auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns);
+ uassert(4567,
+ str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded",
+ !css->getMetadata(expCtx->opCtx)->isSharded());
+
+ PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
+
+ return Status::OK();
+}
+
+std::string MongoDInterface::getShardName(OperationContext* opCtx) const {
+ if (ShardingState::get(opCtx)->enabled()) {
+ return ShardingState::get(opCtx)->shardId().toString();
+ }
+
+ return std::string();
+}
+
+std::pair<std::vector<FieldPath>, bool> MongoDInterface::collectDocumentKeyFields(
+ OperationContext* opCtx, UUID uuid) const {
+ if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
+ return {{"_id"}, false}; // Nothing is sharded.
+ }
+
+ // An empty namespace indicates that the collection has been dropped. Treat it as unsharded and
+ // mark the fields as final.
+ auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid);
+ if (nss.isEmpty()) {
+ return {{"_id"}, true};
+ }
+
+ // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache
+ // to determine whether the collection is sharded in the first place.
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+
+ const bool collectionIsSharded = catalogCache && [&]() {
+ auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss);
+ return routingInfo.isOK() && routingInfo.getValue().cm();
+ }();
+
+ // Collection exists and is not sharded, mark as not final.
+ if (!collectionIsSharded) {
+ return {{"_id"}, false};
+ }
+
+ auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IS);
+ return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
+ }();
+
+ // The collection is unsharded, or a UUID mismatch implies that it has been dropped and
+ // recreated as sharded.
+ if (!scm->isSharded() || !scm->uuidMatches(uuid)) {
+ return {{"_id"}, false};
+ }
+
+ // Unpack the shard key.
+ std::vector<FieldPath> result;
+ bool gotId = false;
+ for (auto& field : scm->getKeyPatternFields()) {
+ result.emplace_back(field->dottedField());
+ gotId |= (result.back().fullPath() == "_id");
+ }
+ if (!gotId) { // If not part of the shard key, "_id" comes last.
+ result.emplace_back("_id");
+ }
+ // Collection is now sharded so the document key fields will never change, mark as final.
+ return {result, true};
+}
+
+std::vector<GenericCursor> MongoDInterface::getCursors(
+ const intrusive_ptr<ExpressionContext>& expCtx) const {
+ return CursorManager::getAllCursors(expCtx->opCtx);
+}
+
+boost::optional<Document> MongoDInterface::lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) {
+ invariant(!readConcern); // We don't currently support a read concern on mongod - it's only
+ // expected to be necessary on mongos.
+
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
+ try {
+ // Be sure to do the lookup using the collection default collation
+ auto foreignExpCtx = expCtx->copyWith(
+ nss,
+ collectionUUID,
+ _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
+ pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx));
+ } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
+ return boost::none;
+ }
+
+ auto lookedUpDocument = pipeline->getNext();
+ if (auto next = pipeline->getNext()) {
+ uasserted(ErrorCodes::TooManyMatchingDocuments,
+ str::stream() << "found more than one document with document key "
+ << documentKey.toString()
+ << " ["
+ << lookedUpDocument->toString()
+ << ", "
+ << next->toString()
+ << "]");
+ }
+ return lookedUpDocument;
+}
+
+BSONObj MongoDInterface::_reportCurrentOpForClient(OperationContext* opCtx,
+ Client* client,
+ CurrentOpTruncateMode truncateOps) const {
+ BSONObjBuilder builder;
+
+ CurOp::reportCurrentOpForClient(
+ opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder);
+
+ OperationContext* clientOpCtx = client->getOperationContext();
+
+ if (clientOpCtx) {
+ if (auto opCtxSession = OperationContextSession::get(clientOpCtx)) {
+ opCtxSession->reportUnstashedState(repl::ReadConcernArgs::get(clientOpCtx), &builder);
+ }
+
+ // Append lock stats before returning.
+ if (auto lockerInfo = clientOpCtx->lockState()->getLockerInfo()) {
+ fillLockerInfo(*lockerInfo, builder);
+ }
+ }
+
+ return builder.obj();
+}
+
+void MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx,
+ CurrentOpUserMode userMode,
+ std::vector<BSONObj>* ops) const {
+ auto sessionCatalog = SessionCatalog::get(opCtx);
+
+ const bool authEnabled =
+ AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled();
+
+ // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to
+ // create a pattern that will match against all authenticated usernames for the current client.
+ // If the user is listing ops for all users, we create an empty pattern; constructing an
+ // instance of SessionKiller::Matcher with this empty pattern will return all sessions.
+ auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers
+ ? makeSessionFilterForAuthenticatedUsers(opCtx)
+ : KillAllSessionsByPatternSet{{}});
+
+ sessionCatalog->scanSessions(opCtx,
+ {std::move(sessionFilter)},
+ [&](OperationContext* opCtx, Session* session) {
+ auto op = session->reportStashedState();
+ if (!op.isEmpty()) {
+ ops->emplace_back(op);
+ }
+ });
+}
+
+std::unique_ptr<CollatorInterface> MongoDInterface::_getCollectionDefaultCollator(
+ OperationContext* opCtx, StringData dbName, UUID collectionUUID) {
+ auto it = _collatorCache.find(collectionUUID);
+ if (it == _collatorCache.end()) {
+ auto collator = [&]() -> std::unique_ptr<CollatorInterface> {
+ AutoGetCollection autoColl(opCtx, {dbName.toString(), collectionUUID}, MODE_IS);
+ if (!autoColl.getCollection()) {
+ // This collection doesn't exist, so assume a nullptr default collation
+ return nullptr;
+ } else {
+ auto defaultCollator = autoColl.getCollection()->getDefaultCollator();
+ // Clone the collator so that we can safely use the pointer if the collection
+ // disappears right after we release the lock.
+ return defaultCollator ? defaultCollator->clone() : nullptr;
+ }
+ }();
+
+ it = _collatorCache.emplace(collectionUUID, std::move(collator)).first;
+ }
+
+ auto& collator = it->second;
+ return collator ? collator->clone() : nullptr;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongod_process_interface.h b/src/mongo/db/pipeline/mongod_process_interface.h
new file mode 100644
index 00000000000..2b9ab8d8f19
--- /dev/null
+++ b/src/mongo/db/pipeline/mongod_process_interface.h
@@ -0,0 +1,119 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/pipeline/mongo_process_common.h"
+#include "mongo/db/pipeline/pipeline.h"
+
+namespace mongo {
+
+/**
+ * Class to provide access to mongod-specific implementations of methods required by some
+ * document sources.
+ */
+class MongoDInterface final : public MongoProcessCommon {
+public:
+ MongoDInterface(OperationContext* opCtx);
+
+ void setOperationContext(OperationContext* opCtx) final;
+ DBClientBase* directClient() final;
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
+ void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) final;
+ void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& queries,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool multi) final;
+
+ CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
+ void appendLatencyStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const final;
+ Status appendStorageStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const final;
+ Status appendRecordCount(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BSONObjBuilder* builder) const final;
+ BSONObj getCollectionOptions(const NamespaceString& nss) final;
+ void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) final;
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions opts = MakePipelineOptions{}) final;
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final;
+ std::string getShardName(OperationContext* opCtx) const final;
+ std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext* opCtx,
+ UUID uuid) const final;
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) final;
+ std::vector<GenericCursor> getCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
+
+protected:
+ BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
+ Client* client,
+ CurrentOpTruncateMode truncateOps) const final;
+
+ void _reportCurrentOpsForIdleSessions(OperationContext* opCtx,
+ CurrentOpUserMode userMode,
+ std::vector<BSONObj>* ops) const final;
+
+private:
+ /**
+ * Looks up the collection default collator for the collection given by 'collectionUUID'. A
+ * collection's default collation is not allowed to change, so we cache the result to allow
+ * for quick lookups in the future. Looks up the collection by UUID, and returns 'nullptr'
+ * if the collection does not exist or if the collection's default collation is the simple
+ * collation.
+ */
+ std::unique_ptr<CollatorInterface> _getCollectionDefaultCollator(OperationContext* opCtx,
+ StringData dbName,
+ UUID collectionUUID);
+
+ DBDirectClient _client;
+ std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache;
+};
+
+} // namespace mongo
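
This header is the mongod-side declaration of the process interface, and its new update() entry point is the primitive the 'replaceDocuments' $out mode is built on: one query per output document, derived from the uniqueKey, paired with the document itself as an upserting, non-multi update. A minimal sketch of that calling pattern, assuming a vector of Documents named 'batch' and a target namespace 'outputNs' (both invented here; the real batching logic lives in document_source_out.cpp, which this excerpt does not show):

    // Hedged illustration only; everything except update()'s signature is assumed.
    std::vector<BSONObj> queries;
    std::vector<BSONObj> updates;
    for (auto&& doc : batch) {
        BSONObj obj = doc.toBson();
        // With the default uniqueKey of {_id: 1}, each document is matched by
        // its _id and replaced wholesale; upsert inserts unmatched documents.
        queries.push_back(obj["_id"].wrap());
        updates.push_back(obj);
    }
    expCtx->mongoProcessInterface->update(
        expCtx, outputNs, queries, updates, true /* upsert */, false /* multi */);
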
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 4d91ddfff08..7e3e451a63c 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -28,7 +28,7 @@
#include "mongo/platform/basic.h"
-#include "mongo/s/commands/pipeline_s.h"
+#include "mongo/db/pipeline/mongos_process_interface.h"
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/document_source.h"
@@ -90,7 +90,7 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(
} // namespace
-boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
+boost::optional<Document> MongoSInterface::lookupSingleDocument(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
UUID collectionUUID,
@@ -185,8 +185,9 @@ boost::optional<Document> PipelineS::MongoSInterface::lookupSingleDocument(
return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{});
}
-BSONObj PipelineS::MongoSInterface::_reportCurrentOpForClient(
- OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const {
+BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx,
+ Client* client,
+ CurrentOpTruncateMode truncateOps) const {
BSONObjBuilder builder;
CurOp::reportCurrentOpForClient(
@@ -195,7 +196,7 @@ BSONObj PipelineS::MongoSInterface::_reportCurrentOpForClient(
return builder.obj();
}
-std::vector<GenericCursor> PipelineS::MongoSInterface::getCursors(
+std::vector<GenericCursor> MongoSInterface::getCursors(
const intrusive_ptr<ExpressionContext>& expCtx) const {
invariant(hasGlobalServiceContext());
auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager();
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
new file mode 100644
index 00000000000..f01a3390deb
--- /dev/null
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -0,0 +1,152 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/pipeline/mongo_process_common.h"
+#include "mongo/db/pipeline/pipeline.h"
+
+namespace mongo {
+
+/**
+ * Class to provide access to mongos-specific implementations of methods required by some
+ * document sources.
+ */
+class MongoSInterface final : public MongoProcessCommon {
+public:
+ MongoSInterface() = default;
+
+ virtual ~MongoSInterface() = default;
+
+ void setOperationContext(OperationContext* opCtx) final {}
+
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) final;
+
+ std::vector<GenericCursor> getCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
+
+ DBClientBase* directClient() final {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& queries,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool multi) final {
+ MONGO_UNREACHABLE;
+ }
+
+ CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void appendLatencyStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status appendStorageStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status appendRecordCount(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj getCollectionOptions(const NamespaceString& nss) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void renameIfOptionsAndIndexesHaveNotChanged(OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*,
+ UUID) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions pipelineOptions) final {
+ MONGO_UNREACHABLE;
+ }
+
+protected:
+ BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
+ Client* client,
+ CurrentOpTruncateMode truncateOps) const final;
+
+ void _reportCurrentOpsForIdleSessions(OperationContext* opCtx,
+ CurrentOpUserMode userMode,
+ std::vector<BSONObj>* ops) const final {
+ // This implementation is a no-op, since mongoS does not maintain a SessionCatalog or
+ // hold stashed locks for idle sessions.
+ }
+};
+
+} // namespace mongo
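
The MONGO_UNREACHABLE stubs above encode a routing invariant rather than missing functionality: stages that need local reads or writes (such as $out) must merge on the primary shard, so by the time insert() or update() runs, the pipeline is executing inside a mongod and this class is never the active interface. A sketch of the kind of up-front guard a stage can use, assuming the ExpressionContext's 'inMongos' flag and 'mongoProcessInterface' member used elsewhere in this tree (the error code and variables are invented for illustration):

    uassert(50900,  // Hypothetical error code.
            "$out cannot write from mongos; it must merge on the primary shard",
            !expCtx->inMongos);
    expCtx->mongoProcessInterface->insert(expCtx, outputNs, batchOfBsonObjs);
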
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index f49b3dfdd8b..f3317c81b3b 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -33,24 +33,18 @@
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
-#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
-#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_catalog.h"
-#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/multi_iterator.h"
#include "mongo/db/exec/shard_filter.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/index/index_access_method.h"
-#include "mongo/db/kill_sessions.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/document_source.h"
@@ -73,9 +67,6 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/session_catalog.h"
-#include "mongo/db/stats/fill_locker_info.h"
-#include "mongo/db/stats/storage_stats.h"
#include "mongo/db/stats/top.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h"
@@ -671,330 +662,4 @@ void PipelineD::getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats*
statsOut->usedDisk = usedDisk;
}
-PipelineD::MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {}
-
-void PipelineD::MongoDInterface::setOperationContext(OperationContext* opCtx) {
- _client.setOpCtx(opCtx);
-}
-
-DBClientBase* PipelineD::MongoDInterface::directClient() {
- return &_client;
-}
-
-bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- AutoGetCollectionForReadCommand autoColl(opCtx, nss);
- auto const css = CollectionShardingState::get(opCtx, nss);
- return css->getMetadata(opCtx)->isSharded();
-}
-
-BSONObj PipelineD::MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- const std::vector<BSONObj>& objs) {
- boost::optional<DisableDocumentValidation> maybeDisableValidation;
- if (expCtx->bypassDocumentValidation)
- maybeDisableValidation.emplace(expCtx->opCtx);
-
- _client.insert(ns.ns(), objs);
- return _client.getLastErrorDetailed();
-}
-
-CollectionIndexUsageMap PipelineD::MongoDInterface::getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) {
- AutoGetCollectionForReadCommand autoColl(opCtx, ns);
-
- Collection* collection = autoColl.getCollection();
- if (!collection) {
- LOG(2) << "Collection not found on index stats retrieval: " << ns.ns();
- return CollectionIndexUsageMap();
- }
-
- return collection->infoCache()->getIndexUsageStats();
-}
-
-void PipelineD::MongoDInterface::appendLatencyStats(OperationContext* opCtx,
- const NamespaceString& nss,
- bool includeHistograms,
- BSONObjBuilder* builder) const {
- Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder);
-}
-
-Status PipelineD::MongoDInterface::appendStorageStats(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const {
- return appendCollectionStorageStats(opCtx, nss, param, builder);
-}
-
-Status PipelineD::MongoDInterface::appendRecordCount(OperationContext* opCtx,
- const NamespaceString& nss,
- BSONObjBuilder* builder) const {
- return appendCollectionRecordCount(opCtx, nss, builder);
-}
-
-BSONObj PipelineD::MongoDInterface::getCollectionOptions(const NamespaceString& nss) {
- const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
- return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned();
-}
-
-void PipelineD::MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged(
- OperationContext* opCtx,
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) {
- Lock::GlobalWrite globalLock(opCtx);
-
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "collection options of target collection " << targetNs.ns()
- << " changed during processing. Original options: "
- << originalCollectionOptions
- << ", new options: "
- << getCollectionOptions(targetNs),
- SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions ==
- getCollectionOptions(targetNs)));
-
- auto currentIndexes = _client.getIndexSpecs(targetNs.ns());
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "indexes of target collection " << targetNs.ns()
- << " changed during processing.",
- originalIndexes.size() == currentIndexes.size() &&
- std::equal(originalIndexes.begin(),
- originalIndexes.end(),
- currentIndexes.begin(),
- SimpleBSONObjComparator::kInstance.makeEqualTo()));
-
- BSONObj info;
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "renameCollection failed: " << info,
- _client.runCommand("admin", renameCommandObj, info));
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineD::MongoDInterface::makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
-
- if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
- }
-
- Status cursorStatus = Status::OK();
-
- if (opts.attachCursorSource) {
- cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
- }
-
- return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
-}
-
-Status PipelineD::MongoDInterface::attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
-
- boost::optional<AutoGetCollectionForReadCommand> autoColl;
- if (expCtx->uuid) {
- try {
- autoColl.emplace(expCtx->opCtx,
- NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid});
- } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
- // The UUID doesn't exist anymore
- return ex.toStatus();
- }
- } else {
- autoColl.emplace(expCtx->opCtx, expCtx->ns);
- }
-
- // makePipeline() is only called to perform secondary aggregation requests and expects the
- // collection representing the document source to be not-sharded. We confirm sharding state
- // here to avoid taking a collection lock elsewhere for this purpose alone.
- // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor
- // until after we release the lock, leaving room for a collection to be sharded in-between.
- auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns);
- uassert(4567,
- str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded",
- !css->getMetadata(expCtx->opCtx)->isSharded());
-
- PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
-
- return Status::OK();
-}
-
-std::string PipelineD::MongoDInterface::getShardName(OperationContext* opCtx) const {
- if (ShardingState::get(opCtx)->enabled()) {
- return ShardingState::get(opCtx)->shardId().toString();
- }
-
- return std::string();
-}
-
-std::pair<std::vector<FieldPath>, bool> PipelineD::MongoDInterface::collectDocumentKeyFields(
- OperationContext* opCtx, UUID uuid) const {
- if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
- return {{"_id"}, false}; // Nothing is sharded.
- }
-
- // An empty namespace indicates that the collection has been dropped. Treat it as unsharded and
- // mark the fields as final.
- auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid);
- if (nss.isEmpty()) {
- return {{"_id"}, true};
- }
-
- // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache
- // to determine whether the collection is sharded in the first place.
- auto catalogCache = Grid::get(opCtx)->catalogCache();
-
- const bool collectionIsSharded = catalogCache && [&]() {
- auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss);
- return routingInfo.isOK() && routingInfo.getValue().cm();
- }();
-
- // Collection exists and is not sharded, mark as not final.
- if (!collectionIsSharded) {
- return {{"_id"}, false};
- }
-
- auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata {
- AutoGetCollection autoColl(opCtx, nss, MODE_IS);
- return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
- }();
-
- // Collection is not sharded or UUID mismatch implies collection has been dropped and recreated
- // as sharded.
- if (!scm->isSharded() || !scm->uuidMatches(uuid)) {
- return {{"_id"}, false};
- }
-
- // Unpack the shard key.
- std::vector<FieldPath> result;
- bool gotId = false;
- for (auto& field : scm->getKeyPatternFields()) {
- result.emplace_back(field->dottedField());
- gotId |= (result.back().fullPath() == "_id");
- }
- if (!gotId) { // If not part of the shard key, "_id" comes last.
- result.emplace_back("_id");
- }
- // Collection is now sharded so the document key fields will never change, mark as final.
- return {result, true};
-}
-
-std::vector<GenericCursor> PipelineD::MongoDInterface::getCursors(
- const intrusive_ptr<ExpressionContext>& expCtx) const {
- return CursorManager::getAllCursors(expCtx->opCtx);
-}
-
-boost::optional<Document> PipelineD::MongoDInterface::lookupSingleDocument(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern) {
- invariant(!readConcern); // We don't currently support a read concern on mongod - it's only
- // expected to be necessary on mongos.
-
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
- try {
- // Be sure to do the lookup using the collection default collation
- auto foreignExpCtx = expCtx->copyWith(
- nss,
- collectionUUID,
- _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
- pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx));
- } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
- return boost::none;
- }
-
- auto lookedUpDocument = pipeline->getNext();
- if (auto next = pipeline->getNext()) {
- uasserted(ErrorCodes::TooManyMatchingDocuments,
- str::stream() << "found more than one document with document key "
- << documentKey.toString()
- << " ["
- << lookedUpDocument->toString()
- << ", "
- << next->toString()
- << "]");
- }
- return lookedUpDocument;
-}
-
-BSONObj PipelineD::MongoDInterface::_reportCurrentOpForClient(
- OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const {
- BSONObjBuilder builder;
-
- CurOp::reportCurrentOpForClient(
- opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder);
-
- OperationContext* clientOpCtx = client->getOperationContext();
-
- if (clientOpCtx) {
- if (auto opCtxSession = OperationContextSession::get(clientOpCtx)) {
- opCtxSession->reportUnstashedState(repl::ReadConcernArgs::get(clientOpCtx), &builder);
- }
-
- // Append lock stats before returning.
- if (auto lockerInfo = clientOpCtx->lockState()->getLockerInfo()) {
- fillLockerInfo(*lockerInfo, builder);
- }
- }
-
- return builder.obj();
-}
-
-void PipelineD::MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx,
- CurrentOpUserMode userMode,
- std::vector<BSONObj>* ops) const {
- auto sessionCatalog = SessionCatalog::get(opCtx);
-
- const bool authEnabled =
- AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled();
-
- // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to
- // create a pattern that will match against all authenticated usernames for the current client.
- // If the user is listing ops for all users, we create an empty pattern; constructing an
- // instance of SessionKiller::Matcher with this empty pattern will return all sessions.
- auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers
- ? makeSessionFilterForAuthenticatedUsers(opCtx)
- : KillAllSessionsByPatternSet{{}});
-
- sessionCatalog->scanSessions(opCtx,
- {std::move(sessionFilter)},
- [&](OperationContext* opCtx, Session* session) {
- auto op = session->reportStashedState();
- if (!op.isEmpty()) {
- ops->emplace_back(op);
- }
- });
-}
-
-std::unique_ptr<CollatorInterface> PipelineD::MongoDInterface::_getCollectionDefaultCollator(
- OperationContext* opCtx, StringData dbName, UUID collectionUUID) {
- auto it = _collatorCache.find(collectionUUID);
- if (it == _collatorCache.end()) {
- auto collator = [&]() -> std::unique_ptr<CollatorInterface> {
- AutoGetCollection autoColl(opCtx, {dbName.toString(), collectionUUID}, MODE_IS);
- if (!autoColl.getCollection()) {
- // This collection doesn't exist, so assume a nullptr default collation
- return nullptr;
- } else {
- auto defaultCollator = autoColl.getCollection()->getDefaultCollator();
- // Clone the collator so that we can safely use the pointer if the collection
- // disappears right after we release the lock.
- return defaultCollator ? defaultCollator->clone() : nullptr;
- }
- }();
-
- it = _collatorCache.emplace(collectionUUID, std::move(collator)).first;
- }
-
- auto& collator = it->second;
- return collator ? collator->clone() : nullptr;
-}
-
} // namespace mongo
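
The collectDocumentKeyFields() logic moved out of this file is easiest to follow with a concrete case. For a collection sharded on {region: 1, "user.id": 1}, the loop over getKeyPatternFields() yields the paths 'region' and 'user.id'; since neither is '_id', it is appended last, and the result is marked final because a shard key cannot change. An existing unsharded collection short-circuits to {_id} with 'final' false, while a dropped collection (UUID no longer in the catalog) returns {_id} with 'final' true. A sketch of the expected results, assuming 'iface', 'opCtx', and the two UUIDs exist for illustration:

    auto sharded = iface.collectDocumentKeyFields(opCtx, shardedCollUuid);
    // sharded.first  == {FieldPath("region"), FieldPath("user.id"), FieldPath("_id")}
    // sharded.second == true   (shard keys are immutable, so the key set is final)

    auto unsharded = iface.collectDocumentKeyFields(opCtx, unshardedCollUuid);
    // unsharded.first  == {FieldPath("_id")}
    // unsharded.second == false  (the collection could still become sharded later)
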
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 8732461fe56..5c3d0464462 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -32,11 +32,9 @@
#include <memory>
#include "mongo/bson/bsonobj.h"
-#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/document_source_cursor.h"
-#include "mongo/db/pipeline/mongo_process_common.h"
#include "mongo/db/query/plan_executor.h"
namespace mongo {
@@ -62,79 +60,6 @@ struct DepsTracker;
*/
class PipelineD {
public:
- class MongoDInterface final : public MongoProcessCommon {
- public:
- MongoDInterface(OperationContext* opCtx);
-
- void setOperationContext(OperationContext* opCtx) final;
- DBClientBase* directClient() final;
- bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
- BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- const std::vector<BSONObj>& objs) final;
- CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) final;
- void appendLatencyStats(OperationContext* opCtx,
- const NamespaceString& nss,
- bool includeHistograms,
- BSONObjBuilder* builder) const final;
- Status appendStorageStats(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const final;
- Status appendRecordCount(OperationContext* opCtx,
- const NamespaceString& nss,
- BSONObjBuilder* builder) const final;
- BSONObj getCollectionOptions(const NamespaceString& nss) final;
- void renameIfOptionsAndIndexesHaveNotChanged(
- OperationContext* opCtx,
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) final;
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts = MakePipelineOptions{}) final;
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final;
- std::string getShardName(OperationContext* opCtx) const final;
- std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext* opCtx,
- UUID uuid) const final;
- boost::optional<Document> lookupSingleDocument(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern) final;
- std::vector<GenericCursor> getCursors(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
-
- protected:
- BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
- Client* client,
- CurrentOpTruncateMode truncateOps) const final;
-
- void _reportCurrentOpsForIdleSessions(OperationContext* opCtx,
- CurrentOpUserMode userMode,
- std::vector<BSONObj>* ops) const final;
-
- private:
- /**
- * Looks up the collection default collator for the collection given by 'collectionUUID'. A
- * collection's default collation is not allowed to change, so we cache the result to allow
- * for quick lookups in the future. Looks up the collection by UUID, and returns 'nullptr'
- * if the collection does not exist or if the collection's default collation is the simple
- * collation.
- */
- std::unique_ptr<CollatorInterface> _getCollectionDefaultCollator(OperationContext* opCtx,
- StringData dbName,
- UUID collectionUUID);
-
- DBDirectClient _client;
- std::map<UUID, std::unique_ptr<const CollatorInterface>> _collatorCache;
- };
-
/**
* If the first stage in the pipeline does not generate its own output documents, attaches a
* cursor document source to the front of the pipeline which will output documents from the
diff --git a/src/mongo/db/pipeline/pipeline_test.cpp b/src/mongo/db/pipeline/pipeline_test.cpp
index bdfae082c72..9d28060c00d 100644
--- a/src/mongo/db/pipeline/pipeline_test.cpp
+++ b/src/mongo/db/pipeline/pipeline_test.cpp
@@ -2120,7 +2120,8 @@ class Out : public needsPrimaryShardMergerBase {
}
string mergePipeJson() {
return "[{$out: {to: 'outColl', db: 'a', mode: '" +
- WriteMode_serializer(WriteModeEnum::kModeReplaceCollection) + "'}}]";
+ WriteMode_serializer(WriteModeEnum::kModeReplaceCollection) +
+ "', uniqueKey: {_id: 1}}}]";
}
};
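
The updated expectation above indicates that $out now serializes a 'uniqueKey' field, defaulting to {_id: 1} when none is given, alongside the existing 'to', 'db', and 'mode' fields. Written out, the merge-pipeline stage this test now expects is (values mirror the test fixture, not new behavior):

    [{$out: {to: 'outColl', db: 'a', mode: 'replaceCollection', uniqueKey: {_id: 1}}}]
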
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index 6c7ed0b7138..3e80f90e3a5 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -57,9 +57,18 @@ public:
MONGO_UNREACHABLE;
}
- BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- const std::vector<BSONObj>& objs) override {
+ void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) override {
+ MONGO_UNREACHABLE;
+ }
+
+ void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& queries,
+ const std::vector<BSONObj>& updates,
+ bool upsert,
+ bool multi) final {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/repl/idempotency_update_sequence_test.cpp b/src/mongo/db/repl/idempotency_update_sequence_test.cpp
index b9a251dabbb..11797d5a6a6 100644
--- a/src/mongo/db/repl/idempotency_update_sequence_test.cpp
+++ b/src/mongo/db/repl/idempotency_update_sequence_test.cpp
@@ -126,8 +126,7 @@ TEST(UpdateGenTest, UpdatesHaveValidPaths) {
FAIL(sb.str());
}
- std::set<std::string> argPaths;
- updateArg.getFieldNames(argPaths);
+ auto argPaths = updateArg.getFieldNames<std::set<std::string>>();
std::set<std::string> correctPaths{"a", "b", "a.0", "a.b", "b.0"};
for (auto path : argPaths) {
FieldRef pathRef(path);
@@ -165,8 +164,7 @@ TEST(UpdateGenTest, UpdatesAreNotAmbiguous) {
sb << "The generated update is not a $set or $unset BSONObj: " << update;
FAIL(sb.str());
}
- std::set<std::string> argPathsSet;
- updateArg.getFieldNames(argPathsSet);
+ auto argPathsSet = updateArg.getFieldNames<std::set<std::string>>();
std::vector<std::unique_ptr<FieldRef>> argPathsRefVec;
FieldRefSet pathRefSet;
@@ -211,8 +209,7 @@ TEST(UpdateGenTest, UpdatesPreserveDepthConstraint) {
setElem = update["$set"];
BSONObj updateArg = setElem.Obj();
- std::set<std::string> argPaths;
- updateArg.getFieldNames(argPaths);
+ auto argPaths = updateArg.getFieldNames<std::set<std::string>>();
for (auto path : argPaths) {
auto pathDepth = getPathDepth_forTest(path);
auto particularSetArgument = updateArg[path];
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index e4aa90e5524..60ba31793e0 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -99,7 +99,6 @@ env.Library(
'cluster_write_cmd.cpp',
'commands_public.cpp',
'kill_sessions_remote.cpp',
- 'pipeline_s.cpp',
'strategy.cpp',
env.Idlc('cluster_multicast.idl')[0],
],
@@ -118,6 +117,7 @@ env.Library(
'$BUILD_DIR/mongo/db/ftdc/ftdc_server',
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/pipeline/aggregation',
+ '$BUILD_DIR/mongo/db/pipeline/mongos_process_interface',
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/db/views/views',
'$BUILD_DIR/mongo/executor/async_multicaster',
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 1aa3045ea5f..867c6932d21 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -48,6 +48,7 @@
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/lite_parsed_pipeline.h"
+#include "mongo/db/pipeline/mongos_process_interface.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/collation/collator_factory_interface.h"
#include "mongo/db/query/cursor_response.h"
@@ -59,7 +60,6 @@
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
-#include "mongo/s/commands/pipeline_s.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
@@ -781,7 +781,7 @@ boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext*
auto mergeCtx = new ExpressionContext(opCtx,
request,
std::move(collation),
- std::make_shared<PipelineS::MongoSInterface>(),
+ std::make_shared<MongoSInterface>(),
resolveInvolvedNamespaces(opCtx, litePipe),
uuid);
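
With PipelineS gone, mongos wires its process interface the same way mongod does: construct the concrete class, hand it to the ExpressionContext, and let document sources reach it polymorphically. A sketch of the consumer side, assuming the 'mongoProcessInterface' member name used elsewhere in this tree and invented variables for the lookup arguments:

    // From inside a document source running on mongos:
    auto foundDoc = pExpCtx->mongoProcessInterface->lookupSingleDocument(
        pExpCtx, foreignNss, foreignUuid, documentKey, boost::none /* readConcern */);
    if (foundDoc) {
        // ... merge *foundDoc into the output document ...
    }
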
diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h
deleted file mode 100644
index ebbe8ab72ca..00000000000
--- a/src/mongo/s/commands/pipeline_s.h
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/pipeline/mongo_process_common.h"
-#include "mongo/db/pipeline/pipeline.h"
-
-namespace mongo {
-/**
- * PipelineS is an extension of the Pipeline class to provide additional utility functions on
- * mongoS. For example, it can inject the pipeline with an implementation of MongoProcessInterface
- * which provides mongos-specific versions of methods required by some document sources.
- */
-class PipelineS {
-public:
- /**
- * Class to provide access to mongos-specific implementations of methods required by some
- * document sources.
- */
- class MongoSInterface final : public MongoProcessCommon {
- public:
- MongoSInterface() = default;
-
- virtual ~MongoSInterface() = default;
-
- void setOperationContext(OperationContext* opCtx) final {}
-
- boost::optional<Document> lookupSingleDocument(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern) final;
-
- std::vector<GenericCursor> getCursors(
- const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
-
- DBClientBase* directClient() final {
- MONGO_UNREACHABLE;
- }
-
- bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
- MONGO_UNREACHABLE;
- }
-
- BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- const std::vector<BSONObj>& objs) final {
- MONGO_UNREACHABLE;
- }
-
- CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) final {
- MONGO_UNREACHABLE;
- }
-
- void appendLatencyStats(OperationContext* opCtx,
- const NamespaceString& nss,
- bool includeHistograms,
- BSONObjBuilder* builder) const final {
- MONGO_UNREACHABLE;
- }
-
- Status appendStorageStats(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const final {
- MONGO_UNREACHABLE;
- }
-
- Status appendRecordCount(OperationContext* opCtx,
- const NamespaceString& nss,
- BSONObjBuilder* builder) const final {
- MONGO_UNREACHABLE;
- }
-
- BSONObj getCollectionOptions(const NamespaceString& nss) final {
- MONGO_UNREACHABLE;
- }
-
- void renameIfOptionsAndIndexesHaveNotChanged(
- OperationContext* opCtx,
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) final {
- MONGO_UNREACHABLE;
- }
-
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
- MONGO_UNREACHABLE;
- }
-
- std::string getShardName(OperationContext* opCtx) const final {
- MONGO_UNREACHABLE;
- }
-
- std::pair<std::vector<FieldPath>, bool> collectDocumentKeyFields(OperationContext*,
- UUID) const final {
- MONGO_UNREACHABLE;
- }
-
- StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions pipelineOptions) final {
- MONGO_UNREACHABLE;
- }
-
- protected:
- BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
- Client* client,
- CurrentOpTruncateMode truncateOps) const final;
-
- void _reportCurrentOpsForIdleSessions(OperationContext* opCtx,
- CurrentOpUserMode userMode,
- std::vector<BSONObj>* ops) const final {
- // This implementation is a no-op, since mongoS does not maintain a SessionCatalog or
- // hold stashed locks for idle sessions.
- }
- };
-
-private:
- PipelineS() = delete; // Should never be instantiated.
-};
-
-} // namespace mongo