diff options
author | seanzimm <sean.zimmerman@mongodb.com> | 2023-02-08 21:20:47 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-09 07:33:58 +0000 |
commit | dc89715710fcbb11151a87a510f3bdaeced5fac9 (patch) | |
tree | d9bea6216808c757435d7e1f06189a819d9f5c91 | |
parent | 183748b052afa6fecdc42a2a72c0327964ddbd13 (diff) | |
download | mongo-dc89715710fcbb11151a87a510f3bdaeced5fac9.tar.gz |
SERVER-71900 Support Errors in bulkWrite cursor response
-rw-r--r-- | jstests/core/write/bulk/bulk_write_cursor.js | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/db/commands/bulk_write.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/commands/bulk_write.idl | 54 | ||||
-rw-r--r-- | src/mongo/db/commands/bulk_write_parser.cpp | 173 | ||||
-rw-r--r-- | src/mongo/db/commands/bulk_write_parser.h | 171 | ||||
-rw-r--r-- | src/mongo/s/write_ops/bulk_write_exec.cpp | 2 |
7 files changed, 363 insertions, 51 deletions
diff --git a/jstests/core/write/bulk/bulk_write_cursor.js b/jstests/core/write/bulk/bulk_write_cursor.js index a3a54aebe4e..dc60fb0afe8 100644 --- a/jstests/core/write/bulk/bulk_write_cursor.js +++ b/jstests/core/write/bulk/bulk_write_cursor.js @@ -110,6 +110,9 @@ assert.commandWorked(res); assert(res.cursor.id == 0); cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0}); cursorEntryValidator(res.cursor.firstBatch[1], {ok: 0, idx: 1, code: 11000}); +// Make sure that error extra info was correctly added +assert.docEq(res.cursor.firstBatch[1].keyPattern, {_id: 1}); +assert.docEq(res.cursor.firstBatch[1].keyValue, {_id: 1}); assert(!res.cursor.firstBatch[2]); assert.eq(coll.find().itcount(), 1); diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 8d5c82fbb12..7e59024159b 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -270,9 +270,7 @@ env.Library( env.Library( target='bulk_write_parser', - source=[ - 'bulk_write.idl', - ], + source=['bulk_write.idl', 'bulk_write_parser.cpp'], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/crypto/fle_fields', '$BUILD_DIR/mongo/db/commands', diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp index 0500e2ad144..7b43c365e49 100644 --- a/src/mongo/db/commands/bulk_write.cpp +++ b/src/mongo/db/commands/bulk_write.cpp @@ -39,6 +39,7 @@ #include "mongo/db/catalog/document_validation.h" #include "mongo/db/commands.h" #include "mongo/db/commands/bulk_write_gen.h" +#include "mongo/db/commands/bulk_write_parser.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/exec/queued_data_stage.h" #include "mongo/db/not_primary_error_tracker.h" @@ -184,12 +185,10 @@ public: // message generation. if (auto error = write_ops_exec::generateError( opCtx, writes.results[i].getStatus(), i, 0 /* numErrors */)) { - auto replyItem = BulkWriteReplyItem(0, idx); - replyItem.setCode(error.get().getStatus().code()); - replyItem.setErrmsg(StringData(error.get().getStatus().reason())); + auto replyItem = BulkWriteReplyItem(idx, error.get().getStatus()); _replies.emplace_back(replyItem); } else { - auto replyItem = BulkWriteReplyItem(1, idx); + auto replyItem = BulkWriteReplyItem(idx); replyItem.setN(writes.results[i].getValue().getN()); _replies.emplace_back(replyItem); } diff --git a/src/mongo/db/commands/bulk_write.idl b/src/mongo/db/commands/bulk_write.idl index 379023ef2eb..767c7561444 100644 --- a/src/mongo/db/commands/bulk_write.idl +++ b/src/mongo/db/commands/bulk_write.idl @@ -30,6 +30,7 @@ global: cpp_namespace: "mongo" cpp_includes: - "mongo/client/read_preference.h" + - "mongo/db/commands/bulk_write_parser.h" - "mongo/db/repl/optime.h" imports: @@ -40,6 +41,15 @@ imports: - "mongo/db/repl/replication_types.idl" - "mongo/db/write_concern_options.idl" +types: + BulkWriteReplyItem: + bson_serialization_type: object + description: "Structure used to report a single reply resulting from a batch write + command." + cpp_type: "mongo::BulkWriteReplyItem" + serializer: "mongo::BulkWriteReplyItem::serialize" + deserializer: "mongo::BulkWriteReplyItem::parse" + structs: BulkWriteInsertOp: @@ -79,49 +89,7 @@ structs: mongos on the time-series view, but got rewritten to target time-series buckets namespace before being sent to shards." type: optionalBool - stability: internal - - BulkWriteReplyItem: - description: "A single item in a batch of results in a 'bulkWrite' command response." - strict: true - fields: - ok: - type: safeDouble - validator: { gte: 0.0, lte: 1.0 } - stability: unstable - idx: - description: "Holds the index of the batch entry." - type: int - validator: { gte: 0 } - stability: unstable - n: - description: "For insert: number of documents inserted. - For update: number of documents that matched the query predicate. - For delete: number of documents deleted." - type: int - optional: true - stability: unstable - nModified: - description: "Number of updated documents." - type: int - optional: true - stability: unstable - upserted: - type: Upserted - optional: true - stability: unstable - value: - type: IDLAnyTypeOwned - optional: true - stability: unstable - code: - type: int - optional: true - stability: unstable - errmsg: - type: string - optional: true - stability: unstable + stability: internal BulkWriteCommandResponseCursor: description: "Cursor holding results for a successful 'bulkWrite' command." diff --git a/src/mongo/db/commands/bulk_write_parser.cpp b/src/mongo/db/commands/bulk_write_parser.cpp new file mode 100644 index 00000000000..1e9404e80f4 --- /dev/null +++ b/src/mongo/db/commands/bulk_write_parser.cpp @@ -0,0 +1,173 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/commands/bulk_write_parser.h" + +#include <string> + +#include "mongo/bson/bsonobjbuilder.h" + +namespace mongo { + +constexpr StringData BulkWriteReplyItem::kCodeFieldName; +constexpr StringData BulkWriteReplyItem::kErrmsgFieldName; +constexpr StringData BulkWriteReplyItem::kIdxFieldName; +constexpr StringData BulkWriteReplyItem::kNFieldName; +constexpr StringData BulkWriteReplyItem::kNModifiedFieldName; +constexpr StringData BulkWriteReplyItem::kOkFieldName; +constexpr StringData BulkWriteReplyItem::kUpsertedFieldName; +constexpr StringData BulkWriteReplyItem::kValueFieldName; + + +BulkWriteReplyItem::BulkWriteReplyItem() + : _ok(mongo::idl::preparsedValue<decltype(_ok)>()), + _idx(mongo::idl::preparsedValue<decltype(_idx)>()), + _hasOk(false), + _hasIdx(false) { + // Used for initialization only +} +BulkWriteReplyItem::BulkWriteReplyItem(std::int32_t idx, Status status) + : _idx(idx), _status(std::move(status)), _hasOk(true), _hasIdx(true) { + _ok = _status.isOK() ? 1.0 : 0.0; +} + +void BulkWriteReplyItem::validateOk(const double value) { + if (!(value >= 0.0)) { + throwComparisonError<double>("ok", ">="_sd, value, 0.0); + } + if (!(value <= 1.0)) { + throwComparisonError<double>("ok", "<="_sd, value, 1.0); + } +} + +void BulkWriteReplyItem::validateIdx(const std::int32_t value) { + if (!(value >= 0)) { + throwComparisonError<std::int32_t>("idx", ">="_sd, value, 0); + } +} + + +BulkWriteReplyItem BulkWriteReplyItem::parse(const BSONObj& bsonObject) { + auto object = mongo::idl::preparsedValue<BulkWriteReplyItem>(); + object.parseProtected(bsonObject); + return object; +} + +void BulkWriteReplyItem::parseProtected(const BSONObj& bsonObject) { + boost::optional<std::int32_t> code; + boost::optional<std::string> errmsg; + for (const auto& element : bsonObject) { + const auto fieldName = element.fieldNameStringData(); + + if (fieldName == kOkFieldName) { + _hasOk = true; + auto value = element.Double(); + validateOk(value); + _ok = std::move(value); + } else if (fieldName == kIdxFieldName) { + _hasIdx = true; + auto value = element.Int(); + validateIdx(value); + _idx = std::move(value); + } else if (fieldName == kNFieldName) { + _n = element.Int(); + } else if (fieldName == kNModifiedFieldName) { + _nModified = element.Int(); + } else if (fieldName == kUpsertedFieldName) { + IDLParserContext ctxt("bulkWrite"); + const auto localObject = element.Obj(); + _upserted = mongo::write_ops::Upserted::parse(ctxt, localObject); + } else if (fieldName == kValueFieldName) { + const BSONObj localObject = element.Obj(); + _value = BSONObj::getOwned(localObject); + } else if (fieldName == kCodeFieldName) { + code = element.Int(); + } else if (fieldName == kErrmsgFieldName) { + errmsg = element.str(); + } + } + if (code) { + uassert(ErrorCodes::BadValue, "ok must be 0.0 if error code is supplied", _ok == 0.0); + std::string err = ""; + if (errmsg) { + err = errmsg.get(); + } + _status = Status(ErrorCodes::Error(code.get()), err, bsonObject); + } else { + uassert(ErrorCodes::BadValue, "ok must be 1.0 if no error code is supplied", _ok == 1.0); + } +} + + +BSONObj BulkWriteReplyItem::serialize() const { + invariant(_hasOk && _hasIdx); + + BSONObjBuilder builder; + + builder.append(kOkFieldName, _ok); + + builder.append(kIdxFieldName, _idx); + + if (!_status.isOK()) { + invariant(_ok == 0.0); + + builder.append(kCodeFieldName, int32_t(_status.code())); + builder.append(kErrmsgFieldName, _status.reason()); + if (auto extraInfo = _status.extraInfo()) { + extraInfo->serialize(&builder); + } + } else { + invariant(_ok == 1.0); + } + + if (_n) { + builder.append(kNFieldName, _n.get()); + } + + if (_nModified) { + builder.append(kNModifiedFieldName, _nModified.get()); + } + + if (_upserted) { + BSONObjBuilder subObjBuilder(builder.subobjStart(kUpsertedFieldName)); + _upserted.get().serialize(&subObjBuilder); + } + + if (_value) { + builder.append(kValueFieldName, _value.get()); + } + + return builder.obj(); +} + + +BSONObj BulkWriteReplyItem::toBSON() const { + return serialize(); +} +} // namespace mongo diff --git a/src/mongo/db/commands/bulk_write_parser.h b/src/mongo/db/commands/bulk_write_parser.h new file mode 100644 index 00000000000..08cd9dfd8d9 --- /dev/null +++ b/src/mongo/db/commands/bulk_write_parser.h @@ -0,0 +1,171 @@ +/** + * Copyright (C) 2023-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> + +#include "mongo/base/status.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/ops/write_ops_gen.h" + +namespace mongo { + +/** + * A single item in a batch of results in a 'bulkWrite' command response. + */ +class BulkWriteReplyItem { +public: + static constexpr auto kCodeFieldName = "code"_sd; + static constexpr auto kErrmsgFieldName = "errmsg"_sd; + static constexpr auto kIdxFieldName = "idx"_sd; + static constexpr auto kNFieldName = "n"_sd; + static constexpr auto kNModifiedFieldName = "nModified"_sd; + static constexpr auto kOkFieldName = "ok"_sd; + static constexpr auto kUpsertedFieldName = "upserted"_sd; + static constexpr auto kValueFieldName = "value"_sd; + + BulkWriteReplyItem(); + BulkWriteReplyItem(std::int32_t idx, Status status = Status::OK()); + + BSONObj serialize() const; + BSONObj toBSON() const; + + /** + * Factory function that parses a BulkWriteReplyItem from a BSONObj. A BulkWriteReplyItem parsed + * this way is strictly a view onto that BSONObj; the BSONObj must be kept valid to + * ensure the validity any members of this struct that point-into the BSONObj (i.e. + * unowned + * objects). + */ + static BulkWriteReplyItem parse(const BSONObj& bsonObject); + + double getOk() const { + return _ok; + } + + void setOk(double value) { + validateOk(value); + _ok = std::move(value); + _hasOk = true; + } + + /** + * Holds the index of the batch entry. + */ + std::int32_t getIdx() const { + return _idx; + } + + void setIdx(std::int32_t value) { + validateIdx(value); + _idx = std::move(value); + _hasIdx = true; + } + + /** + * For insert: number of documents inserted. + * For update: number of documents that matched the query predicate. + * For delete: number of documents deleted. + */ + boost::optional<std::int32_t> getN() const { + return _n; + } + + void setN(boost::optional<std::int32_t> value) { + _n = std::move(value); + } + + /** + * Number of updated documents. + */ + boost::optional<std::int32_t> getNModified() const { + return _nModified; + } + + void setNModified(boost::optional<std::int32_t> value) { + _nModified = std::move(value); + } + + /** + * Contains documents that have been upserted. + */ + const boost::optional<mongo::write_ops::Upserted>& getUpserted() const { + return _upserted; + } + + void setUpserted(boost::optional<mongo::write_ops::Upserted> value) { + _upserted = std::move(value); + } + + /** + * The document after the write, if the 'return' field of the request is 'post'. + * Otherwise, the document before the write. + */ + const boost::optional<mongo::BSONObj>& getValue() const { + return _value; + } + + void setValue(boost::optional<mongo::BSONObj> value) { + _value = std::move(value); + } + + /** + * The status associated with the reply potentially containing error data for why the + * operation failed. + */ + const Status& getStatus() const { + return _status; + } + + void setStatus(const Status status) { + _status = std::move(status); + } + +protected: + void parseProtected(const BSONObj& bsonObject); + +private: + void validateOk(double value); + + void validateIdx(std::int32_t value); + +private: + double _ok; + std::int32_t _idx; + boost::optional<std::int32_t> _n; + boost::optional<std::int32_t> _nModified; + boost::optional<mongo::write_ops::Upserted> _upserted; + boost::optional<BSONObj> _value; + Status _status = Status::OK(); + bool _hasOk : 1; + bool _hasIdx : 1; +}; + +} // namespace mongo diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp index 82ce973b67c..51016edfae2 100644 --- a/src/mongo/s/write_ops/bulk_write_exec.cpp +++ b/src/mongo/s/write_ops/bulk_write_exec.cpp @@ -73,7 +73,7 @@ void execute(OperationContext* opCtx, // Reassemble the final response based on responses from sub-batches. auto replies = std::vector<BulkWriteReplyItem>(); - replies.emplace_back(1, 0); + replies.emplace_back(0); reply->setCursor(BulkWriteCommandResponseCursor(0, replies)); LOGV2_DEBUG(7263701, 4, "Finished execution of bulkWrite"); |