diff options
author | James Wahlin <james@mongodb.com> | 2019-03-07 08:30:27 -0500 |
---|---|---|
committer | James Wahlin <james@mongodb.com> | 2019-04-11 14:59:55 -0400 |
commit | 6b47868e5a82822a21176db3a7d3abd2df429e1f (patch) | |
tree | ca2adddd4590b62cdefbc641d97cc58b5cc05479 /src/mongo/db/ops | |
parent | 95bb948f7e5e573ca1473ba43dd6fd8e53cb5f50 (diff) | |
download | mongo-6b47868e5a82822a21176db3a7d3abd2df429e1f.tar.gz |
SERVER-40381 Add the ability to specify a pipeline to an update command
Diffstat (limited to 'src/mongo/db/ops')
-rw-r--r-- | src/mongo/db/ops/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/ops/parsed_update.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/ops/update_request.h | 13 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops.idl | 10 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_parsers.cpp | 53 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_parsers.h | 82 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_parsers_test.cpp | 79 |
8 files changed, 250 insertions, 12 deletions
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript index 5ded7b6a151..727eed3a62f 100644 --- a/src/mongo/db/ops/SConscript +++ b/src/mongo/db/ops/SConscript @@ -35,6 +35,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/dbmessage', + '$BUILD_DIR/mongo/db/pipeline/aggregation_request', '$BUILD_DIR/mongo/idl/idl_parser', ], ) diff --git a/src/mongo/db/ops/parsed_update.cpp b/src/mongo/db/ops/parsed_update.cpp index 6958425529b..c4b7f40c8ee 100644 --- a/src/mongo/db/ops/parsed_update.cpp +++ b/src/mongo/db/ops/parsed_update.cpp @@ -56,6 +56,12 @@ Status ParsedUpdate::parseRequest() { invariant(_request->getProj().isEmpty() || _request->shouldReturnAnyDocs()); if (!_request->getCollation().isEmpty()) { + // TODO SERVER-40398: Remove once collation is supported and tested for pipeline updates. + uassert(ErrorCodes::NotImplemented, + "Collation is not yet supported for pipeline-style updates", + _request->getUpdateModification().type() != + write_ops::UpdateModification::Type::kPipeline); + auto collator = CollatorFactoryInterface::get(_opCtx->getServiceContext()) ->makeFromBSON(_request->getCollation()); if (!collator.isOK()) { @@ -145,7 +151,7 @@ void ParsedUpdate::parseUpdate() { _driver.setLogOp(true); _driver.setFromOplogApplication(_request->isFromOplogApplication()); - _driver.parse(_request->getUpdates(), _arrayFilters, _request->isMulti()); + _driver.parse(_request->getUpdateModification(), _arrayFilters, _request->isMulti()); } StatusWith<std::map<StringData, std::unique_ptr<ExpressionWithPlaceholder>>> diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h index 00249500343..587fdb1b718 100644 --- a/src/mongo/db/ops/update_request.h +++ b/src/mongo/db/ops/update_request.h @@ -33,6 +33,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/query/explain.h" #include "mongo/util/str.h" @@ -101,12 +102,12 @@ public: return _collation; } - inline void setUpdates(const BSONObj& updates) { - _updates = updates; + inline void setUpdateModification(const write_ops::UpdateModification& updateMod) { + _updateMod = updateMod; } - inline const BSONObj& getUpdates() const { - return _updates; + inline const write_ops::UpdateModification& getUpdateModification() const { + return _updateMod; } inline void setArrayFilters(const std::vector<BSONObj>& arrayFilters) { @@ -206,7 +207,7 @@ public: builder << " projection: " << _proj; builder << " sort: " << _sort; builder << " collation: " << _collation; - builder << " updates: " << _updates; + builder << " updateModification: " << _updateMod.toString(); builder << " stmtId: " << _stmtId; builder << " arrayFilters: ["; @@ -245,7 +246,7 @@ private: BSONObj _collation; // Contains the modifiers to apply to matched objects, or a replacement document. - BSONObj _updates; + write_ops::UpdateModification _updateMod; // Filters to specify which array elements should be updated. std::vector<BSONObj> _arrayFilters; diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl index 800688c4b04..c570b799b36 100644 --- a/src/mongo/db/ops/write_ops.idl +++ b/src/mongo/db/ops/write_ops.idl @@ -44,6 +44,14 @@ types: serializer: "::mongo::write_ops::writeMultiDeleteProperty" deserializer: "::mongo::write_ops::readMultiDeleteProperty" + update_modification: + bson_serialization_type: any + description: "Holds the contents of the update command 'u' field, describing the + modifications to apply on update." + cpp_type: "mongo::write_ops::UpdateModification" + serializer: "mongo::write_ops::UpdateModification::serializeToBSON" + deserializer: "mongo::write_ops::UpdateModification::parseFromBSON" + structs: WriteCommandBase: @@ -91,7 +99,7 @@ structs: type: object u: description: "Set of modifications to apply." - type: object + type: update_modification arrayFilters: description: "Specifies which array elements an update modifier should apply to." type: array<object> diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index e7165c3da34..cc6d2fcd041 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -672,7 +672,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext* UpdateRequest request(ns); request.setQuery(op.getQ()); - request.setUpdates(op.getU()); + request.setUpdateModification(op.getU()); request.setCollation(write_ops::collationOf(op)); request.setStmtId(stmtId); request.setArrayFilters(write_ops::arrayFiltersOf(op)); @@ -736,6 +736,20 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who out.results.reserve(wholeOp.getUpdates().size()); for (auto&& singleOp : wholeOp.getUpdates()) { + if (singleOp.getU().type() == write_ops::UpdateModification::Type::kPipeline) { + // TODO SERVER-40400: Remove once bypassDocumentValidation is supported and tested for + // pipeline updates. + uassert(ErrorCodes::NotImplemented, + "bypassDocumentValidation is not yet supported for pipeline-style updates", + !wholeOp.getWriteCommandBase().getBypassDocumentValidation()); + + // TODO SERVER-40402: Remove once writeConcern is supported and tested for pipeline + // updates. + uassert(ErrorCodes::NotImplemented, + "writeConcern is not yet supported for pipeline-style updates", + opCtx->getWriteConcern().usedDefault); + } + const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); if (opCtx->getTxnNumber()) { if (!txnParticipant.inMultiDocumentTransaction()) { diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp index 7d6a03ec977..c15aafce0d0 100644 --- a/src/mongo/db/ops/write_ops_parsers.cpp +++ b/src/mongo/db/ops/write_ops_parsers.cpp @@ -31,8 +31,10 @@ #include "mongo/db/ops/write_ops_parsers.h" +#include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/dbmessage.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/pipeline/aggregation_request.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" @@ -166,7 +168,8 @@ write_ops::Update UpdateOp::parseLegacy(const Message& msgRaw) { singleUpdate.setUpsert(flags & UpdateOption_Upsert); singleUpdate.setMulti(flags & UpdateOption_Multi); singleUpdate.setQ(msg.nextJsObj()); - singleUpdate.setU(msg.nextJsObj()); + singleUpdate.setU( + write_ops::UpdateModification::parseLegacyOpUpdateFromBSON(msg.nextJsObj())); return updates; }()); @@ -209,4 +212,52 @@ write_ops::Delete DeleteOp::parseLegacy(const Message& msgRaw) { return op; } +write_ops::UpdateModification::UpdateModification(BSONElement update) { + const auto type = update.type(); + if (type == BSONType::Object) { + _classicUpdate = update.Obj(); + _type = Type::kClassic; + return; + } + + uassert( + ErrorCodes::FailedToParse, "Update argument must be an object", getTestCommandsEnabled()); + + uassert(ErrorCodes::FailedToParse, + "Update argument must be either an object or an array", + type == BSONType::Array); + + _type = Type::kPipeline; + + _pipeline = uassertStatusOK(AggregationRequest::parsePipelineFromBSON(update)); +} + +write_ops::UpdateModification::UpdateModification(const BSONObj& update) { + _classicUpdate = update; + _type = Type::kClassic; +} + +write_ops::UpdateModification write_ops::UpdateModification::parseFromBSON(BSONElement elem) { + return UpdateModification(elem); +} + +write_ops::UpdateModification write_ops::UpdateModification::parseLegacyOpUpdateFromBSON( + const BSONObj& obj) { + return UpdateModification(obj); +} + +void write_ops::UpdateModification::serializeToBSON(StringData fieldName, + BSONObjBuilder* bob) const { + if (_type == Type::kClassic) { + *bob << fieldName << *_classicUpdate; + return; + } + + BSONArrayBuilder arrayBuilder(bob->subarrayStart(fieldName)); + for (auto&& stage : *_pipeline) { + arrayBuilder << stage; + } + arrayBuilder.doneFast(); +} + } // namespace mongo diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h index fba5f5aa9b1..77ccb46eded 100644 --- a/src/mongo/db/ops/write_ops_parsers.h +++ b/src/mongo/db/ops/write_ops_parsers.h @@ -32,10 +32,16 @@ #include "mongo/base/string_data.h" #include "mongo/bson/bsonelement.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/pipeline/value.h" namespace mongo { namespace write_ops { +// Conservative per array element overhead. This value was calculated as 1 byte (element type) + 5 +// bytes (max string encoding of the array index encoded as string and the maximum key is 99999) + 1 +// byte (zero terminator) = 7 bytes +constexpr int kBSONArrayPerElementOverheadBytes = 7; + /** * Parses the 'limit' property of a delete entry, which has inverted meaning from the 'multi' * property of an update. @@ -47,5 +53,81 @@ bool readMultiDeleteProperty(const BSONElement& limitElement); */ void writeMultiDeleteProperty(bool isMulti, StringData fieldName, BSONObjBuilder* builder); +class UpdateModification { +public: + enum class Type { kClassic, kPipeline }; + + static StringData typeToString(Type type) { + return (type == Type::kClassic ? "Classic"_sd : "Pipeline"_sd); + } + + UpdateModification() = default; + UpdateModification(BSONElement update); + + // This constructor exists only to provide a fast-path for constructing classic-style updates. + UpdateModification(const BSONObj& update); + + + /** + * These methods support IDL parsing of the "u" field from the update command and OP_UPDATE. + */ + static UpdateModification parseFromBSON(BSONElement elem); + void serializeToBSON(StringData fieldName, BSONObjBuilder* bob) const; + + // When parsing from legacy OP_UPDATE messages, we receive the "u" field as an object. When an + // array is parsed, we receive it as an object with numeric fields names and can't differentiate + // between a user constructed object and an array. For that reason, we don't support pipeline + // style update via OP_UPDATE and 'obj' is assumed to be a classic update. + // + // If a user did send a pipeline-style update via OP_UPDATE, it would fail parsing a field + // representing an aggregation stage, due to the leading '$'' character. + static UpdateModification parseLegacyOpUpdateFromBSON(const BSONObj& obj); + + int objsize() const { + if (_type == Type::kClassic) { + return _classicUpdate->objsize(); + } + + int size = 0; + std::for_each(_pipeline->begin(), _pipeline->end(), [&size](const BSONObj& obj) { + size += obj.objsize() + kBSONArrayPerElementOverheadBytes; + }); + + return size + kBSONArrayPerElementOverheadBytes; + } + + Type type() const { + return _type; + } + + BSONObj getUpdateClassic() const { + invariant(_type == Type::kClassic); + return *_classicUpdate; + } + + const std::vector<BSONObj>& getUpdatePipeline() const { + invariant(_type == Type::kPipeline); + return *_pipeline; + } + + std::string toString() const { + StringBuilder sb; + sb << "{type: " << typeToString(_type) << ", update: "; + + if (_type == Type::kClassic) { + sb << *_classicUpdate << "}"; + } else { + sb << Value(*_pipeline).toString(); + } + + return sb.str(); + } + +private: + Type _type = Type::kClassic; + boost::optional<BSONObj> _classicUpdate; + boost::optional<std::vector<BSONObj>> _pipeline; +}; + } // namespace write_ops } // namespace mongo diff --git a/src/mongo/db/ops/write_ops_parsers_test.cpp b/src/mongo/db/ops/write_ops_parsers_test.cpp index e014a35b7d2..4c9b98d252b 100644 --- a/src/mongo/db/ops/write_ops_parsers_test.cpp +++ b/src/mongo/db/ops/write_ops_parsers_test.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/catalog/document_validation.h" +#include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/dbmessage.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/ops/write_ops_parsers.h" @@ -338,7 +339,11 @@ TEST(CommandWriteOpsParsers, Update) { ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true); ASSERT_EQ(op.getUpdates().size(), 1u); ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getQ(), query); - ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getU(), update); + + const auto& updateMod = op.getUpdates()[0].getU(); + ASSERT(updateMod.type() == write_ops::UpdateModification::Type::kClassic); + ASSERT_BSONOBJ_EQ(updateMod.getUpdateClassic(), update); + ASSERT_BSONOBJ_EQ(write_ops::collationOf(op.getUpdates()[0]), collation); ASSERT_EQ(write_ops::arrayFiltersOf(op.getUpdates()[0]).size(), 1u); ASSERT_BSONOBJ_EQ(write_ops::arrayFiltersOf(op.getUpdates()[0]).front(), @@ -351,6 +356,46 @@ TEST(CommandWriteOpsParsers, Update) { } } +TEST(CommandWriteOpsParsers, UpdateWithPipeline) { + // TODO SERVER-40419: Remove 'setTestCommandsEnable(true)' for this test. + setTestCommandsEnabled(true); + const auto ns = NamespaceString("test", "foo"); + const BSONObj query = BSON("q" << BSON("x" << 1)); + std::vector<BSONObj> pipeline{BSON("$addFields" << BSON("x" << 1))}; + const BSONObj update = BSON("u" << pipeline); + const BSONObj collation = BSON("locale" + << "en_US"); + for (bool upsert : {false, true}) { + for (bool multi : {false, true}) { + auto rawUpdate = BSON( + "q" << query["q"] << "u" << update["u"] << "multi" << multi << "upsert" << upsert + << "collation" + << collation); + auto cmd = BSON("update" << ns.coll() << "updates" << BSON_ARRAY(rawUpdate)); + for (bool seq : {false, true}) { + auto request = toOpMsg(ns.db(), cmd, seq); + auto op = UpdateOp::parse(request); + ASSERT_EQ(op.getNamespace().ns(), ns.ns()); + ASSERT(!op.getWriteCommandBase().getBypassDocumentValidation()); + ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true); + ASSERT_EQ(op.getUpdates().size(), 1u); + ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getQ(), query["q"].Obj()); + + const auto& updateMod = op.getUpdates()[0].getU(); + const auto& updateModPipeline = updateMod.getUpdatePipeline(); + ASSERT(updateMod.type() == write_ops::UpdateModification::Type::kPipeline); + ASSERT_EQ(updateModPipeline.size(), 1u); + ASSERT_BSONOBJ_EQ(updateModPipeline[0], pipeline[0]); + + ASSERT_BSONOBJ_EQ(write_ops::collationOf(op.getUpdates()[0]), collation); + ASSERT_EQ(op.getUpdates()[0].getUpsert(), upsert); + ASSERT_EQ(op.getUpdates()[0].getMulti(), multi); + ASSERT_BSONOBJ_EQ(op.getUpdates()[0].toBSON(), rawUpdate); + } + } + } +} + TEST(CommandWriteOpsParsers, Remove) { const auto ns = NamespaceString("test", "foo"); const BSONObj query = BSON("x" << 1); @@ -451,7 +496,37 @@ TEST(LegacyWriteOpsParsers, Update) { ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true); ASSERT_EQ(op.getUpdates().size(), 1u); ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getQ(), query); - ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getU(), update); + ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getU().getUpdateClassic(), update); + ASSERT_EQ(op.getUpdates()[0].getUpsert(), upsert); + ASSERT_EQ(op.getUpdates()[0].getMulti(), multi); + } + } +} + +// When parsing from legacy OP_UPDATE messages, we receive the "u" field as an object. When an array +// is parsed, we receive it as an object with numeric fields names and can't differentiate between a +// user constructed object and an array. For that reason, we parse as a classic-style update rather +// than as pipeline-style. +TEST(LegacyWriteOpsParsers, UpdateWithArrayUpdateFieldIsParsedAsReplacementStyleUpdate) { + const std::string ns = "test.foo"; + const BSONObj query = BSON("x" << 1); + const BSONObj update = BSON_ARRAY(BSON("$addFields" << BSON("x" << 1))); + for (bool upsert : {false, true}) { + for (bool multi : {false, true}) { + auto message = makeUpdateMessage(ns, + query, + update, + (upsert ? UpdateOption_Upsert : 0) | + (multi ? UpdateOption_Multi : 0)); + const auto op = UpdateOp::parseLegacy(message); + ASSERT_EQ(op.getNamespace().ns(), ns); + ASSERT(!op.getWriteCommandBase().getBypassDocumentValidation()); + ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true); + ASSERT_EQ(op.getUpdates().size(), 1u); + ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getQ(), query); + ASSERT(op.getUpdates()[0].getU().type() == + write_ops::UpdateModification::Type::kClassic); + ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getU().getUpdateClassic(), update); ASSERT_EQ(op.getUpdates()[0].getUpsert(), upsert); ASSERT_EQ(op.getUpdates()[0].getMulti(), multi); } |