summaryrefslogtreecommitdiff
path: root/src/mongo/db/ops
diff options
context:
space:
mode:
authorJames Wahlin <james@mongodb.com>2019-03-07 08:30:27 -0500
committerJames Wahlin <james@mongodb.com>2019-04-11 14:59:55 -0400
commit6b47868e5a82822a21176db3a7d3abd2df429e1f (patch)
treeca2adddd4590b62cdefbc641d97cc58b5cc05479 /src/mongo/db/ops
parent95bb948f7e5e573ca1473ba43dd6fd8e53cb5f50 (diff)
downloadmongo-6b47868e5a82822a21176db3a7d3abd2df429e1f.tar.gz
SERVER-40381 Add the ability to specify a pipeline to an update command
Diffstat (limited to 'src/mongo/db/ops')
-rw-r--r--src/mongo/db/ops/SConscript1
-rw-r--r--src/mongo/db/ops/parsed_update.cpp8
-rw-r--r--src/mongo/db/ops/update_request.h13
-rw-r--r--src/mongo/db/ops/write_ops.idl10
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp16
-rw-r--r--src/mongo/db/ops/write_ops_parsers.cpp53
-rw-r--r--src/mongo/db/ops/write_ops_parsers.h82
-rw-r--r--src/mongo/db/ops/write_ops_parsers_test.cpp79
8 files changed, 250 insertions, 12 deletions
diff --git a/src/mongo/db/ops/SConscript b/src/mongo/db/ops/SConscript
index 5ded7b6a151..727eed3a62f 100644
--- a/src/mongo/db/ops/SConscript
+++ b/src/mongo/db/ops/SConscript
@@ -35,6 +35,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/dbmessage',
+ '$BUILD_DIR/mongo/db/pipeline/aggregation_request',
'$BUILD_DIR/mongo/idl/idl_parser',
],
)
diff --git a/src/mongo/db/ops/parsed_update.cpp b/src/mongo/db/ops/parsed_update.cpp
index 6958425529b..c4b7f40c8ee 100644
--- a/src/mongo/db/ops/parsed_update.cpp
+++ b/src/mongo/db/ops/parsed_update.cpp
@@ -56,6 +56,12 @@ Status ParsedUpdate::parseRequest() {
invariant(_request->getProj().isEmpty() || _request->shouldReturnAnyDocs());
if (!_request->getCollation().isEmpty()) {
+ // TODO SERVER-40398: Remove once collation is supported and tested for pipeline updates.
+ uassert(ErrorCodes::NotImplemented,
+ "Collation is not yet supported for pipeline-style updates",
+ _request->getUpdateModification().type() !=
+ write_ops::UpdateModification::Type::kPipeline);
+
auto collator = CollatorFactoryInterface::get(_opCtx->getServiceContext())
->makeFromBSON(_request->getCollation());
if (!collator.isOK()) {
@@ -145,7 +151,7 @@ void ParsedUpdate::parseUpdate() {
_driver.setLogOp(true);
_driver.setFromOplogApplication(_request->isFromOplogApplication());
- _driver.parse(_request->getUpdates(), _arrayFilters, _request->isMulti());
+ _driver.parse(_request->getUpdateModification(), _arrayFilters, _request->isMulti());
}
StatusWith<std::map<StringData, std::unique_ptr<ExpressionWithPlaceholder>>>
diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h
index 00249500343..587fdb1b718 100644
--- a/src/mongo/db/ops/update_request.h
+++ b/src/mongo/db/ops/update_request.h
@@ -33,6 +33,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/query/explain.h"
#include "mongo/util/str.h"
@@ -101,12 +102,12 @@ public:
return _collation;
}
- inline void setUpdates(const BSONObj& updates) {
- _updates = updates;
+ inline void setUpdateModification(const write_ops::UpdateModification& updateMod) {
+ _updateMod = updateMod;
}
- inline const BSONObj& getUpdates() const {
- return _updates;
+ inline const write_ops::UpdateModification& getUpdateModification() const {
+ return _updateMod;
}
inline void setArrayFilters(const std::vector<BSONObj>& arrayFilters) {
@@ -206,7 +207,7 @@ public:
builder << " projection: " << _proj;
builder << " sort: " << _sort;
builder << " collation: " << _collation;
- builder << " updates: " << _updates;
+ builder << " updateModification: " << _updateMod.toString();
builder << " stmtId: " << _stmtId;
builder << " arrayFilters: [";
@@ -245,7 +246,7 @@ private:
BSONObj _collation;
// Contains the modifiers to apply to matched objects, or a replacement document.
- BSONObj _updates;
+ write_ops::UpdateModification _updateMod;
// Filters to specify which array elements should be updated.
std::vector<BSONObj> _arrayFilters;
diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl
index 800688c4b04..c570b799b36 100644
--- a/src/mongo/db/ops/write_ops.idl
+++ b/src/mongo/db/ops/write_ops.idl
@@ -44,6 +44,14 @@ types:
serializer: "::mongo::write_ops::writeMultiDeleteProperty"
deserializer: "::mongo::write_ops::readMultiDeleteProperty"
+ update_modification:
+ bson_serialization_type: any
+ description: "Holds the contents of the update command 'u' field, describing the
+ modifications to apply on update."
+ cpp_type: "mongo::write_ops::UpdateModification"
+ serializer: "mongo::write_ops::UpdateModification::serializeToBSON"
+ deserializer: "mongo::write_ops::UpdateModification::parseFromBSON"
+
structs:
WriteCommandBase:
@@ -91,7 +99,7 @@ structs:
type: object
u:
description: "Set of modifications to apply."
- type: object
+ type: update_modification
arrayFilters:
description: "Specifies which array elements an update modifier should apply to."
type: array<object>
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index e7165c3da34..cc6d2fcd041 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -672,7 +672,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(OperationContext*
UpdateRequest request(ns);
request.setQuery(op.getQ());
- request.setUpdates(op.getU());
+ request.setUpdateModification(op.getU());
request.setCollation(write_ops::collationOf(op));
request.setStmtId(stmtId);
request.setArrayFilters(write_ops::arrayFiltersOf(op));
@@ -736,6 +736,20 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
out.results.reserve(wholeOp.getUpdates().size());
for (auto&& singleOp : wholeOp.getUpdates()) {
+ if (singleOp.getU().type() == write_ops::UpdateModification::Type::kPipeline) {
+ // TODO SERVER-40400: Remove once bypassDocumentValidation is supported and tested for
+ // pipeline updates.
+ uassert(ErrorCodes::NotImplemented,
+ "bypassDocumentValidation is not yet supported for pipeline-style updates",
+ !wholeOp.getWriteCommandBase().getBypassDocumentValidation());
+
+ // TODO SERVER-40402: Remove once writeConcern is supported and tested for pipeline
+ // updates.
+ uassert(ErrorCodes::NotImplemented,
+ "writeConcern is not yet supported for pipeline-style updates",
+ opCtx->getWriteConcern().usedDefault);
+ }
+
const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
if (opCtx->getTxnNumber()) {
if (!txnParticipant.inMultiDocumentTransaction()) {
diff --git a/src/mongo/db/ops/write_ops_parsers.cpp b/src/mongo/db/ops/write_ops_parsers.cpp
index 7d6a03ec977..c15aafce0d0 100644
--- a/src/mongo/db/ops/write_ops_parsers.cpp
+++ b/src/mongo/db/ops/write_ops_parsers.cpp
@@ -31,8 +31,10 @@
#include "mongo/db/ops/write_ops_parsers.h"
+#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
@@ -166,7 +168,8 @@ write_ops::Update UpdateOp::parseLegacy(const Message& msgRaw) {
singleUpdate.setUpsert(flags & UpdateOption_Upsert);
singleUpdate.setMulti(flags & UpdateOption_Multi);
singleUpdate.setQ(msg.nextJsObj());
- singleUpdate.setU(msg.nextJsObj());
+ singleUpdate.setU(
+ write_ops::UpdateModification::parseLegacyOpUpdateFromBSON(msg.nextJsObj()));
return updates;
}());
@@ -209,4 +212,52 @@ write_ops::Delete DeleteOp::parseLegacy(const Message& msgRaw) {
return op;
}
+write_ops::UpdateModification::UpdateModification(BSONElement update) {
+ const auto type = update.type();
+ if (type == BSONType::Object) {
+ _classicUpdate = update.Obj();
+ _type = Type::kClassic;
+ return;
+ }
+
+ uassert(
+ ErrorCodes::FailedToParse, "Update argument must be an object", getTestCommandsEnabled());
+
+ uassert(ErrorCodes::FailedToParse,
+ "Update argument must be either an object or an array",
+ type == BSONType::Array);
+
+ _type = Type::kPipeline;
+
+ _pipeline = uassertStatusOK(AggregationRequest::parsePipelineFromBSON(update));
+}
+
+write_ops::UpdateModification::UpdateModification(const BSONObj& update) {
+ _classicUpdate = update;
+ _type = Type::kClassic;
+}
+
+write_ops::UpdateModification write_ops::UpdateModification::parseFromBSON(BSONElement elem) {
+ return UpdateModification(elem);
+}
+
+write_ops::UpdateModification write_ops::UpdateModification::parseLegacyOpUpdateFromBSON(
+ const BSONObj& obj) {
+ return UpdateModification(obj);
+}
+
+void write_ops::UpdateModification::serializeToBSON(StringData fieldName,
+ BSONObjBuilder* bob) const {
+ if (_type == Type::kClassic) {
+ *bob << fieldName << *_classicUpdate;
+ return;
+ }
+
+ BSONArrayBuilder arrayBuilder(bob->subarrayStart(fieldName));
+ for (auto&& stage : *_pipeline) {
+ arrayBuilder << stage;
+ }
+ arrayBuilder.doneFast();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h
index fba5f5aa9b1..77ccb46eded 100644
--- a/src/mongo/db/ops/write_ops_parsers.h
+++ b/src/mongo/db/ops/write_ops_parsers.h
@@ -32,10 +32,16 @@
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonelement.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/pipeline/value.h"
namespace mongo {
namespace write_ops {
+// Conservative per array element overhead. This value was calculated as 1 byte (element type) + 5
+// bytes (max string encoding of the array index encoded as string and the maximum key is 99999) + 1
+// byte (zero terminator) = 7 bytes
+constexpr int kBSONArrayPerElementOverheadBytes = 7;
+
/**
* Parses the 'limit' property of a delete entry, which has inverted meaning from the 'multi'
* property of an update.
@@ -47,5 +53,81 @@ bool readMultiDeleteProperty(const BSONElement& limitElement);
*/
void writeMultiDeleteProperty(bool isMulti, StringData fieldName, BSONObjBuilder* builder);
+class UpdateModification {
+public:
+ enum class Type { kClassic, kPipeline };
+
+ static StringData typeToString(Type type) {
+ return (type == Type::kClassic ? "Classic"_sd : "Pipeline"_sd);
+ }
+
+ UpdateModification() = default;
+ UpdateModification(BSONElement update);
+
+ // This constructor exists only to provide a fast-path for constructing classic-style updates.
+ UpdateModification(const BSONObj& update);
+
+
+ /**
+ * These methods support IDL parsing of the "u" field from the update command and OP_UPDATE.
+ */
+ static UpdateModification parseFromBSON(BSONElement elem);
+ void serializeToBSON(StringData fieldName, BSONObjBuilder* bob) const;
+
+ // When parsing from legacy OP_UPDATE messages, we receive the "u" field as an object. When an
+ // array is parsed, we receive it as an object with numeric fields names and can't differentiate
+ // between a user constructed object and an array. For that reason, we don't support pipeline
+ // style update via OP_UPDATE and 'obj' is assumed to be a classic update.
+ //
+ // If a user did send a pipeline-style update via OP_UPDATE, it would fail parsing a field
+ // representing an aggregation stage, due to the leading '$'' character.
+ static UpdateModification parseLegacyOpUpdateFromBSON(const BSONObj& obj);
+
+ int objsize() const {
+ if (_type == Type::kClassic) {
+ return _classicUpdate->objsize();
+ }
+
+ int size = 0;
+ std::for_each(_pipeline->begin(), _pipeline->end(), [&size](const BSONObj& obj) {
+ size += obj.objsize() + kBSONArrayPerElementOverheadBytes;
+ });
+
+ return size + kBSONArrayPerElementOverheadBytes;
+ }
+
+ Type type() const {
+ return _type;
+ }
+
+ BSONObj getUpdateClassic() const {
+ invariant(_type == Type::kClassic);
+ return *_classicUpdate;
+ }
+
+ const std::vector<BSONObj>& getUpdatePipeline() const {
+ invariant(_type == Type::kPipeline);
+ return *_pipeline;
+ }
+
+ std::string toString() const {
+ StringBuilder sb;
+ sb << "{type: " << typeToString(_type) << ", update: ";
+
+ if (_type == Type::kClassic) {
+ sb << *_classicUpdate << "}";
+ } else {
+ sb << Value(*_pipeline).toString();
+ }
+
+ return sb.str();
+ }
+
+private:
+ Type _type = Type::kClassic;
+ boost::optional<BSONObj> _classicUpdate;
+ boost::optional<std::vector<BSONObj>> _pipeline;
+};
+
} // namespace write_ops
} // namespace mongo
diff --git a/src/mongo/db/ops/write_ops_parsers_test.cpp b/src/mongo/db/ops/write_ops_parsers_test.cpp
index e014a35b7d2..4c9b98d252b 100644
--- a/src/mongo/db/ops/write_ops_parsers_test.cpp
+++ b/src/mongo/db/ops/write_ops_parsers_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/ops/write_ops_parsers.h"
@@ -338,7 +339,11 @@ TEST(CommandWriteOpsParsers, Update) {
ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true);
ASSERT_EQ(op.getUpdates().size(), 1u);
ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getQ(), query);
- ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getU(), update);
+
+ const auto& updateMod = op.getUpdates()[0].getU();
+ ASSERT(updateMod.type() == write_ops::UpdateModification::Type::kClassic);
+ ASSERT_BSONOBJ_EQ(updateMod.getUpdateClassic(), update);
+
ASSERT_BSONOBJ_EQ(write_ops::collationOf(op.getUpdates()[0]), collation);
ASSERT_EQ(write_ops::arrayFiltersOf(op.getUpdates()[0]).size(), 1u);
ASSERT_BSONOBJ_EQ(write_ops::arrayFiltersOf(op.getUpdates()[0]).front(),
@@ -351,6 +356,46 @@ TEST(CommandWriteOpsParsers, Update) {
}
}
+TEST(CommandWriteOpsParsers, UpdateWithPipeline) {
+ // TODO SERVER-40419: Remove 'setTestCommandsEnable(true)' for this test.
+ setTestCommandsEnabled(true);
+ const auto ns = NamespaceString("test", "foo");
+ const BSONObj query = BSON("q" << BSON("x" << 1));
+ std::vector<BSONObj> pipeline{BSON("$addFields" << BSON("x" << 1))};
+ const BSONObj update = BSON("u" << pipeline);
+ const BSONObj collation = BSON("locale"
+ << "en_US");
+ for (bool upsert : {false, true}) {
+ for (bool multi : {false, true}) {
+ auto rawUpdate = BSON(
+ "q" << query["q"] << "u" << update["u"] << "multi" << multi << "upsert" << upsert
+ << "collation"
+ << collation);
+ auto cmd = BSON("update" << ns.coll() << "updates" << BSON_ARRAY(rawUpdate));
+ for (bool seq : {false, true}) {
+ auto request = toOpMsg(ns.db(), cmd, seq);
+ auto op = UpdateOp::parse(request);
+ ASSERT_EQ(op.getNamespace().ns(), ns.ns());
+ ASSERT(!op.getWriteCommandBase().getBypassDocumentValidation());
+ ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true);
+ ASSERT_EQ(op.getUpdates().size(), 1u);
+ ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getQ(), query["q"].Obj());
+
+ const auto& updateMod = op.getUpdates()[0].getU();
+ const auto& updateModPipeline = updateMod.getUpdatePipeline();
+ ASSERT(updateMod.type() == write_ops::UpdateModification::Type::kPipeline);
+ ASSERT_EQ(updateModPipeline.size(), 1u);
+ ASSERT_BSONOBJ_EQ(updateModPipeline[0], pipeline[0]);
+
+ ASSERT_BSONOBJ_EQ(write_ops::collationOf(op.getUpdates()[0]), collation);
+ ASSERT_EQ(op.getUpdates()[0].getUpsert(), upsert);
+ ASSERT_EQ(op.getUpdates()[0].getMulti(), multi);
+ ASSERT_BSONOBJ_EQ(op.getUpdates()[0].toBSON(), rawUpdate);
+ }
+ }
+ }
+}
+
TEST(CommandWriteOpsParsers, Remove) {
const auto ns = NamespaceString("test", "foo");
const BSONObj query = BSON("x" << 1);
@@ -451,7 +496,37 @@ TEST(LegacyWriteOpsParsers, Update) {
ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true);
ASSERT_EQ(op.getUpdates().size(), 1u);
ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getQ(), query);
- ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getU(), update);
+ ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getU().getUpdateClassic(), update);
+ ASSERT_EQ(op.getUpdates()[0].getUpsert(), upsert);
+ ASSERT_EQ(op.getUpdates()[0].getMulti(), multi);
+ }
+ }
+}
+
+// When parsing from legacy OP_UPDATE messages, we receive the "u" field as an object. When an array
+// is parsed, we receive it as an object with numeric fields names and can't differentiate between a
+// user constructed object and an array. For that reason, we parse as a classic-style update rather
+// than as pipeline-style.
+TEST(LegacyWriteOpsParsers, UpdateWithArrayUpdateFieldIsParsedAsReplacementStyleUpdate) {
+ const std::string ns = "test.foo";
+ const BSONObj query = BSON("x" << 1);
+ const BSONObj update = BSON_ARRAY(BSON("$addFields" << BSON("x" << 1)));
+ for (bool upsert : {false, true}) {
+ for (bool multi : {false, true}) {
+ auto message = makeUpdateMessage(ns,
+ query,
+ update,
+ (upsert ? UpdateOption_Upsert : 0) |
+ (multi ? UpdateOption_Multi : 0));
+ const auto op = UpdateOp::parseLegacy(message);
+ ASSERT_EQ(op.getNamespace().ns(), ns);
+ ASSERT(!op.getWriteCommandBase().getBypassDocumentValidation());
+ ASSERT_EQ(op.getWriteCommandBase().getOrdered(), true);
+ ASSERT_EQ(op.getUpdates().size(), 1u);
+ ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getQ(), query);
+ ASSERT(op.getUpdates()[0].getU().type() ==
+ write_ops::UpdateModification::Type::kClassic);
+ ASSERT_BSONOBJ_EQ(op.getUpdates()[0].getU().getUpdateClassic(), update);
ASSERT_EQ(op.getUpdates()[0].getUpsert(), upsert);
ASSERT_EQ(op.getUpdates()[0].getMulti(), multi);
}