summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorseanzimm <sean.zimmerman@mongodb.com>2023-02-08 21:20:47 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-02-09 07:33:58 +0000
commitdc89715710fcbb11151a87a510f3bdaeced5fac9 (patch)
treed9bea6216808c757435d7e1f06189a819d9f5c91
parent183748b052afa6fecdc42a2a72c0327964ddbd13 (diff)
downloadmongo-dc89715710fcbb11151a87a510f3bdaeced5fac9.tar.gz
SERVER-71900 Support Errors in bulkWrite cursor response
-rw-r--r--jstests/core/write/bulk/bulk_write_cursor.js3
-rw-r--r--src/mongo/db/commands/SConscript4
-rw-r--r--src/mongo/db/commands/bulk_write.cpp7
-rw-r--r--src/mongo/db/commands/bulk_write.idl54
-rw-r--r--src/mongo/db/commands/bulk_write_parser.cpp173
-rw-r--r--src/mongo/db/commands/bulk_write_parser.h171
-rw-r--r--src/mongo/s/write_ops/bulk_write_exec.cpp2
7 files changed, 363 insertions, 51 deletions
diff --git a/jstests/core/write/bulk/bulk_write_cursor.js b/jstests/core/write/bulk/bulk_write_cursor.js
index a3a54aebe4e..dc60fb0afe8 100644
--- a/jstests/core/write/bulk/bulk_write_cursor.js
+++ b/jstests/core/write/bulk/bulk_write_cursor.js
@@ -110,6 +110,9 @@ assert.commandWorked(res);
assert(res.cursor.id == 0);
cursorEntryValidator(res.cursor.firstBatch[0], {ok: 1, n: 1, idx: 0});
cursorEntryValidator(res.cursor.firstBatch[1], {ok: 0, idx: 1, code: 11000});
+// Make sure that error extra info was correctly added
+assert.docEq(res.cursor.firstBatch[1].keyPattern, {_id: 1});
+assert.docEq(res.cursor.firstBatch[1].keyValue, {_id: 1});
assert(!res.cursor.firstBatch[2]);
assert.eq(coll.find().itcount(), 1);
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 8d5c82fbb12..7e59024159b 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -270,9 +270,7 @@ env.Library(
env.Library(
target='bulk_write_parser',
- source=[
- 'bulk_write.idl',
- ],
+ source=['bulk_write.idl', 'bulk_write_parser.cpp'],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/crypto/fle_fields',
'$BUILD_DIR/mongo/db/commands',
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index 0500e2ad144..7b43c365e49 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
#include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/commands/bulk_write_parser.h"
#include "mongo/db/cursor_manager.h"
#include "mongo/db/exec/queued_data_stage.h"
#include "mongo/db/not_primary_error_tracker.h"
@@ -184,12 +185,10 @@ public:
// message generation.
if (auto error = write_ops_exec::generateError(
opCtx, writes.results[i].getStatus(), i, 0 /* numErrors */)) {
- auto replyItem = BulkWriteReplyItem(0, idx);
- replyItem.setCode(error.get().getStatus().code());
- replyItem.setErrmsg(StringData(error.get().getStatus().reason()));
+ auto replyItem = BulkWriteReplyItem(idx, error.get().getStatus());
_replies.emplace_back(replyItem);
} else {
- auto replyItem = BulkWriteReplyItem(1, idx);
+ auto replyItem = BulkWriteReplyItem(idx);
replyItem.setN(writes.results[i].getValue().getN());
_replies.emplace_back(replyItem);
}
diff --git a/src/mongo/db/commands/bulk_write.idl b/src/mongo/db/commands/bulk_write.idl
index 379023ef2eb..767c7561444 100644
--- a/src/mongo/db/commands/bulk_write.idl
+++ b/src/mongo/db/commands/bulk_write.idl
@@ -30,6 +30,7 @@ global:
cpp_namespace: "mongo"
cpp_includes:
- "mongo/client/read_preference.h"
+ - "mongo/db/commands/bulk_write_parser.h"
- "mongo/db/repl/optime.h"
imports:
@@ -40,6 +41,15 @@ imports:
- "mongo/db/repl/replication_types.idl"
- "mongo/db/write_concern_options.idl"
+types:
+ BulkWriteReplyItem:
+ bson_serialization_type: object
+ description: "Structure used to report a single reply resulting from a batch write
+ command."
+ cpp_type: "mongo::BulkWriteReplyItem"
+ serializer: "mongo::BulkWriteReplyItem::serialize"
+ deserializer: "mongo::BulkWriteReplyItem::parse"
+
structs:
BulkWriteInsertOp:
@@ -79,49 +89,7 @@ structs:
mongos on the time-series view, but got rewritten to target
time-series buckets namespace before being sent to shards."
type: optionalBool
- stability: internal
-
- BulkWriteReplyItem:
- description: "A single item in a batch of results in a 'bulkWrite' command response."
- strict: true
- fields:
- ok:
- type: safeDouble
- validator: { gte: 0.0, lte: 1.0 }
- stability: unstable
- idx:
- description: "Holds the index of the batch entry."
- type: int
- validator: { gte: 0 }
- stability: unstable
- n:
- description: "For insert: number of documents inserted.
- For update: number of documents that matched the query predicate.
- For delete: number of documents deleted."
- type: int
- optional: true
- stability: unstable
- nModified:
- description: "Number of updated documents."
- type: int
- optional: true
- stability: unstable
- upserted:
- type: Upserted
- optional: true
- stability: unstable
- value:
- type: IDLAnyTypeOwned
- optional: true
- stability: unstable
- code:
- type: int
- optional: true
- stability: unstable
- errmsg:
- type: string
- optional: true
- stability: unstable
+ stability: internal
BulkWriteCommandResponseCursor:
description: "Cursor holding results for a successful 'bulkWrite' command."
diff --git a/src/mongo/db/commands/bulk_write_parser.cpp b/src/mongo/db/commands/bulk_write_parser.cpp
new file mode 100644
index 00000000000..1e9404e80f4
--- /dev/null
+++ b/src/mongo/db/commands/bulk_write_parser.cpp
@@ -0,0 +1,173 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/commands/bulk_write_parser.h"
+
+#include <string>
+
+#include "mongo/bson/bsonobjbuilder.h"
+
+namespace mongo {
+
+constexpr StringData BulkWriteReplyItem::kCodeFieldName;
+constexpr StringData BulkWriteReplyItem::kErrmsgFieldName;
+constexpr StringData BulkWriteReplyItem::kIdxFieldName;
+constexpr StringData BulkWriteReplyItem::kNFieldName;
+constexpr StringData BulkWriteReplyItem::kNModifiedFieldName;
+constexpr StringData BulkWriteReplyItem::kOkFieldName;
+constexpr StringData BulkWriteReplyItem::kUpsertedFieldName;
+constexpr StringData BulkWriteReplyItem::kValueFieldName;
+
+
+BulkWriteReplyItem::BulkWriteReplyItem()
+ : _ok(mongo::idl::preparsedValue<decltype(_ok)>()),
+ _idx(mongo::idl::preparsedValue<decltype(_idx)>()),
+ _hasOk(false),
+ _hasIdx(false) {
+ // Used for initialization only
+}
+BulkWriteReplyItem::BulkWriteReplyItem(std::int32_t idx, Status status)
+ : _idx(idx), _status(std::move(status)), _hasOk(true), _hasIdx(true) {
+ _ok = _status.isOK() ? 1.0 : 0.0;
+}
+
+void BulkWriteReplyItem::validateOk(const double value) {
+ if (!(value >= 0.0)) {
+ throwComparisonError<double>("ok", ">="_sd, value, 0.0);
+ }
+ if (!(value <= 1.0)) {
+ throwComparisonError<double>("ok", "<="_sd, value, 1.0);
+ }
+}
+
+void BulkWriteReplyItem::validateIdx(const std::int32_t value) {
+ if (!(value >= 0)) {
+ throwComparisonError<std::int32_t>("idx", ">="_sd, value, 0);
+ }
+}
+
+
+BulkWriteReplyItem BulkWriteReplyItem::parse(const BSONObj& bsonObject) {
+ auto object = mongo::idl::preparsedValue<BulkWriteReplyItem>();
+ object.parseProtected(bsonObject);
+ return object;
+}
+
+void BulkWriteReplyItem::parseProtected(const BSONObj& bsonObject) {
+ boost::optional<std::int32_t> code;
+ boost::optional<std::string> errmsg;
+ for (const auto& element : bsonObject) {
+ const auto fieldName = element.fieldNameStringData();
+
+ if (fieldName == kOkFieldName) {
+ _hasOk = true;
+ auto value = element.Double();
+ validateOk(value);
+ _ok = std::move(value);
+ } else if (fieldName == kIdxFieldName) {
+ _hasIdx = true;
+ auto value = element.Int();
+ validateIdx(value);
+ _idx = std::move(value);
+ } else if (fieldName == kNFieldName) {
+ _n = element.Int();
+ } else if (fieldName == kNModifiedFieldName) {
+ _nModified = element.Int();
+ } else if (fieldName == kUpsertedFieldName) {
+ IDLParserContext ctxt("bulkWrite");
+ const auto localObject = element.Obj();
+ _upserted = mongo::write_ops::Upserted::parse(ctxt, localObject);
+ } else if (fieldName == kValueFieldName) {
+ const BSONObj localObject = element.Obj();
+ _value = BSONObj::getOwned(localObject);
+ } else if (fieldName == kCodeFieldName) {
+ code = element.Int();
+ } else if (fieldName == kErrmsgFieldName) {
+ errmsg = element.str();
+ }
+ }
+ if (code) {
+ uassert(ErrorCodes::BadValue, "ok must be 0.0 if error code is supplied", _ok == 0.0);
+ std::string err = "";
+ if (errmsg) {
+ err = errmsg.get();
+ }
+ _status = Status(ErrorCodes::Error(code.get()), err, bsonObject);
+ } else {
+ uassert(ErrorCodes::BadValue, "ok must be 1.0 if no error code is supplied", _ok == 1.0);
+ }
+}
+
+
+BSONObj BulkWriteReplyItem::serialize() const {
+ invariant(_hasOk && _hasIdx);
+
+ BSONObjBuilder builder;
+
+ builder.append(kOkFieldName, _ok);
+
+ builder.append(kIdxFieldName, _idx);
+
+ if (!_status.isOK()) {
+ invariant(_ok == 0.0);
+
+ builder.append(kCodeFieldName, int32_t(_status.code()));
+ builder.append(kErrmsgFieldName, _status.reason());
+ if (auto extraInfo = _status.extraInfo()) {
+ extraInfo->serialize(&builder);
+ }
+ } else {
+ invariant(_ok == 1.0);
+ }
+
+ if (_n) {
+ builder.append(kNFieldName, _n.get());
+ }
+
+ if (_nModified) {
+ builder.append(kNModifiedFieldName, _nModified.get());
+ }
+
+ if (_upserted) {
+ BSONObjBuilder subObjBuilder(builder.subobjStart(kUpsertedFieldName));
+ _upserted.get().serialize(&subObjBuilder);
+ }
+
+ if (_value) {
+ builder.append(kValueFieldName, _value.get());
+ }
+
+ return builder.obj();
+}
+
+
+BSONObj BulkWriteReplyItem::toBSON() const {
+ return serialize();
+}
+} // namespace mongo
diff --git a/src/mongo/db/commands/bulk_write_parser.h b/src/mongo/db/commands/bulk_write_parser.h
new file mode 100644
index 00000000000..08cd9dfd8d9
--- /dev/null
+++ b/src/mongo/db/commands/bulk_write_parser.h
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+
+#include "mongo/base/status.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/ops/write_ops_gen.h"
+
+namespace mongo {
+
+/**
+ * A single item in a batch of results in a 'bulkWrite' command response.
+ */
+class BulkWriteReplyItem {
+public:
+ static constexpr auto kCodeFieldName = "code"_sd;
+ static constexpr auto kErrmsgFieldName = "errmsg"_sd;
+ static constexpr auto kIdxFieldName = "idx"_sd;
+ static constexpr auto kNFieldName = "n"_sd;
+ static constexpr auto kNModifiedFieldName = "nModified"_sd;
+ static constexpr auto kOkFieldName = "ok"_sd;
+ static constexpr auto kUpsertedFieldName = "upserted"_sd;
+ static constexpr auto kValueFieldName = "value"_sd;
+
+ BulkWriteReplyItem();
+ BulkWriteReplyItem(std::int32_t idx, Status status = Status::OK());
+
+ BSONObj serialize() const;
+ BSONObj toBSON() const;
+
+ /**
+ * Factory function that parses a BulkWriteReplyItem from a BSONObj. A BulkWriteReplyItem parsed
+ * this way is strictly a view onto that BSONObj; the BSONObj must be kept valid to
+ * ensure the validity any members of this struct that point-into the BSONObj (i.e.
+ * unowned
+ * objects).
+ */
+ static BulkWriteReplyItem parse(const BSONObj& bsonObject);
+
+ double getOk() const {
+ return _ok;
+ }
+
+ void setOk(double value) {
+ validateOk(value);
+ _ok = std::move(value);
+ _hasOk = true;
+ }
+
+ /**
+ * Holds the index of the batch entry.
+ */
+ std::int32_t getIdx() const {
+ return _idx;
+ }
+
+ void setIdx(std::int32_t value) {
+ validateIdx(value);
+ _idx = std::move(value);
+ _hasIdx = true;
+ }
+
+ /**
+ * For insert: number of documents inserted.
+ * For update: number of documents that matched the query predicate.
+ * For delete: number of documents deleted.
+ */
+ boost::optional<std::int32_t> getN() const {
+ return _n;
+ }
+
+ void setN(boost::optional<std::int32_t> value) {
+ _n = std::move(value);
+ }
+
+ /**
+ * Number of updated documents.
+ */
+ boost::optional<std::int32_t> getNModified() const {
+ return _nModified;
+ }
+
+ void setNModified(boost::optional<std::int32_t> value) {
+ _nModified = std::move(value);
+ }
+
+ /**
+ * Contains documents that have been upserted.
+ */
+ const boost::optional<mongo::write_ops::Upserted>& getUpserted() const {
+ return _upserted;
+ }
+
+ void setUpserted(boost::optional<mongo::write_ops::Upserted> value) {
+ _upserted = std::move(value);
+ }
+
+ /**
+ * The document after the write, if the 'return' field of the request is 'post'.
+ * Otherwise, the document before the write.
+ */
+ const boost::optional<mongo::BSONObj>& getValue() const {
+ return _value;
+ }
+
+ void setValue(boost::optional<mongo::BSONObj> value) {
+ _value = std::move(value);
+ }
+
+ /**
+ * The status associated with the reply potentially containing error data for why the
+ * operation failed.
+ */
+ const Status& getStatus() const {
+ return _status;
+ }
+
+ void setStatus(const Status status) {
+ _status = std::move(status);
+ }
+
+protected:
+ void parseProtected(const BSONObj& bsonObject);
+
+private:
+ void validateOk(double value);
+
+ void validateIdx(std::int32_t value);
+
+private:
+ double _ok;
+ std::int32_t _idx;
+ boost::optional<std::int32_t> _n;
+ boost::optional<std::int32_t> _nModified;
+ boost::optional<mongo::write_ops::Upserted> _upserted;
+ boost::optional<BSONObj> _value;
+ Status _status = Status::OK();
+ bool _hasOk : 1;
+ bool _hasIdx : 1;
+};
+
+} // namespace mongo
diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp
index 82ce973b67c..51016edfae2 100644
--- a/src/mongo/s/write_ops/bulk_write_exec.cpp
+++ b/src/mongo/s/write_ops/bulk_write_exec.cpp
@@ -73,7 +73,7 @@ void execute(OperationContext* opCtx,
// Reassemble the final response based on responses from sub-batches.
auto replies = std::vector<BulkWriteReplyItem>();
- replies.emplace_back(1, 0);
+ replies.emplace_back(0);
reply->setCursor(BulkWriteCommandResponseCursor(0, replies));
LOGV2_DEBUG(7263701, 4, "Finished execution of bulkWrite");