diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2022-02-23 17:16:31 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-23 18:58:11 +0000 |
commit | 23d4be01ca042cd28d0d616f9334e02598e79510 (patch) | |
tree | f4ad008c37f1c9f29812f274b9734646c5de7839 /src/mongo | |
parent | 5d99bcaa999e8ca595e36642bdd174abe6986929 (diff) | |
download | mongo-23d4be01ca042cd28d0d616f9334e02598e79510.tar.gz |
SERVER-63331 Convert WriteErrors parsing into IDL
Diffstat (limited to 'src/mongo')
22 files changed, 383 insertions, 313 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f15f7660846..6ef68939530 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1944,11 +1944,11 @@ env.Library( 'vector_clock_mutable', ], LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/db/s/sharding_api_d', '$BUILD_DIR/mongo/s/grid', 'dbdirectclient', 'repl/replica_set_aware_service', 'rw_concern_d', + 's/sharding_api_d', 'server_options_core', ], ) diff --git a/src/mongo/db/commands/user_management_commands.cpp b/src/mongo/db/commands/user_management_commands.cpp index d2bb8bbb225..14a4688e01a 100644 --- a/src/mongo/db/commands/user_management_commands.cpp +++ b/src/mongo/db/commands/user_management_commands.cpp @@ -253,22 +253,9 @@ Status insertAuthzDocument(OperationContext* opCtx, const BSONObj& document) { try { DBDirectClient client(opCtx); - - BSONObj res; - client.runCommand(collectionName.db().toString(), - [&] { - write_ops::InsertCommandRequest insertOp(collectionName); - insertOp.setDocuments({document}); - return insertOp.toBSON({}); - }(), - res); - - BatchedCommandResponse response; - std::string errmsg; - if (!response.parseBSON(res, &errmsg)) { - return Status(ErrorCodes::FailedToParse, errmsg); - } - return response.toStatus(); + write_ops::checkWriteErrors( + client.insert(write_ops::InsertCommandRequest(collectionName, {document}))); + return Status::OK(); } catch (const DBException& e) { return e.toStatus(); } @@ -289,33 +276,22 @@ Status updateAuthzDocuments(OperationContext* opCtx, std::int64_t* numMatched) { try { DBDirectClient client(opCtx); - - BSONObj res; - client.runCommand(collectionName.db().toString(), - [&] { - write_ops::UpdateCommandRequest updateOp(collectionName); - updateOp.setUpdates({[&] { - write_ops::UpdateOpEntry entry; - entry.setQ(query); - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( - updatePattern)); - entry.setMulti(multi); - entry.setUpsert(upsert); - return entry; - }()}); - return updateOp.toBSON({}); - }(), - res); - - BatchedCommandResponse response; - std::string errmsg; - if (!response.parseBSON(res, &errmsg)) { - return Status(ErrorCodes::FailedToParse, errmsg); - } - if (response.getOk()) { - *numMatched = response.getN(); - } - return response.toStatus(); + auto result = client.update([&] { + write_ops::UpdateCommandRequest updateOp(collectionName); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(query); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(updatePattern)); + entry.setMulti(multi); + entry.setUpsert(upsert); + return entry; + }()}); + return updateOp; + }()); + + *numMatched = result.getN(); + write_ops::checkWriteErrors(result); + return Status::OK(); } catch (const DBException& e) { return e.toStatus(); } @@ -363,30 +339,20 @@ Status removeAuthzDocuments(OperationContext* opCtx, std::int64_t* numRemoved) { try { DBDirectClient client(opCtx); - - BSONObj res; - client.runCommand(collectionName.db().toString(), - [&] { - write_ops::DeleteCommandRequest deleteOp(collectionName); - deleteOp.setDeletes({[&] { - write_ops::DeleteOpEntry entry; - entry.setQ(query); - entry.setMulti(true); - return entry; - }()}); - return deleteOp.toBSON({}); - }(), - res); - - BatchedCommandResponse response; - std::string errmsg; - if (!response.parseBSON(res, &errmsg)) { - return Status(ErrorCodes::FailedToParse, errmsg); - } - if (response.getOk()) { - *numRemoved = response.getN(); - } - return response.toStatus(); + auto result = client.remove([&] { + write_ops::DeleteCommandRequest deleteOp(collectionName); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(query); + entry.setMulti(true); + return entry; + }()}); + return deleteOp; + }()); + + *numRemoved = result.getN(); + write_ops::checkWriteErrors(result); + return Status::OK(); } catch (const DBException& e) { return e.toStatus(); } diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index 865a69dbc7b..9b63f16641c 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -417,7 +417,6 @@ void populateReply(OperationContext* opCtx, write_ops_exec::WriteResult result, CommandReplyType* cmdReply, boost::optional<PopulateReplyHooks> hooks = boost::none) { - invariant(cmdReply); if (shouldSkipOutput(opCtx)) @@ -463,7 +462,11 @@ void populateReply(OperationContext* opCtx, } if (!errors.empty()) { - replyBase.setWriteErrors(errors); + std::vector<write_ops::WriteError> writeErrors; + for (const auto& e : errors) { + writeErrors.emplace_back(write_ops::WriteError::parse(e)); + } + replyBase.setWriteErrors(std::move(writeErrors)); } // writeConcernError field is handled by command processor. @@ -545,22 +548,20 @@ public: write_ops::InsertCommandReply typedRun(OperationContext* opCtx) final try { transactionChecks(opCtx, ns()); - write_ops::InsertCommandReply insertReply; if (isTimeseries(opCtx, request())) { // Re-throw parsing exceptions to be consistent with CmdInsert::Invocation's // constructor. try { - _performTimeseriesWrites(opCtx, &insertReply); + return _performTimeseriesWrites(opCtx); } catch (DBException& ex) { ex.addContext(str::stream() << "time-series insert failed: " << ns().ns()); throw; } - - return insertReply; } auto reply = write_ops_exec::performInserts(opCtx, request()); + write_ops::InsertCommandReply insertReply; populateReply(opCtx, !request().getWriteCommandRequestBase().getOrdered(), request().getDocuments().size(), @@ -1239,8 +1240,7 @@ public: } while (!docsToRetry.empty()); } - void _performTimeseriesWrites(OperationContext* opCtx, - write_ops::InsertCommandReply* insertReply) const { + write_ops::InsertCommandReply _performTimeseriesWrites(OperationContext* opCtx) const { auto& curOp = *CurOp::get(opCtx); ON_BLOCK_EXIT([&] { // This is the only part of finishCurOp we need to do for inserts because they reuse @@ -1276,7 +1276,8 @@ public: boost::optional<OID> electionId; bool containsRetry = false; - auto& baseReply = insertReply->getWriteCommandReplyBase(); + write_ops::InsertCommandReply insertReply; + auto& baseReply = insertReply.getWriteCommandReplyBase(); if (request().getOrdered()) { baseReply.setN(_performOrderedTimeseriesWrites( @@ -1293,7 +1294,11 @@ public: } if (!errors.empty()) { - baseReply.setWriteErrors(errors); + std::vector<write_ops::WriteError> writeErrors; + for (const auto& e : errors) { + writeErrors.emplace_back(write_ops::WriteError::parse(e)); + } + baseReply.setWriteErrors(std::move(writeErrors)); } if (opTime) { baseReply.setOpTime(*opTime); @@ -1306,6 +1311,7 @@ public: } curOp.debug().additiveMetrics.ninserted = baseReply.getN(); + return insertReply; } }; } cmdInsert; diff --git a/src/mongo/db/dbdirectclient_test.cpp b/src/mongo/db/dbdirectclient_test.cpp index f46097d4004..19ba4c35e86 100644 --- a/src/mongo/db/dbdirectclient_test.cpp +++ b/src/mongo/db/dbdirectclient_test.cpp @@ -77,7 +77,7 @@ TEST_F(DBDirectClientTest, InsertDuplicateDocumentDoesNotThrow) { ASSERT_EQ(insertReply.getN(), 1); auto writeErrors = insertReply.getWriteErrors().get(); ASSERT_EQ(writeErrors.size(), 1); - ASSERT_EQ(writeErrors[0].getIntField("code"), ErrorCodes::DuplicateKey); + ASSERT_EQ(writeErrors[0].getStatus(), ErrorCodes::DuplicateKey); } TEST_F(DBDirectClientTest, UpdateSingleDocumentSuccessfully) { @@ -113,7 +113,7 @@ TEST_F(DBDirectClientTest, UpdateDuplicateImmutableFieldDoesNotThrow) { ASSERT_EQ(updateReply.getNModified(), 0); auto writeErrors = updateReply.getWriteErrors().get(); ASSERT_EQ(writeErrors.size(), 1); - ASSERT_EQ(writeErrors[0].getIntField("code"), ErrorCodes::ImmutableField); + ASSERT_EQ(writeErrors[0].getStatus(), ErrorCodes::ImmutableField); } TEST_F(DBDirectClientTest, DeleteSingleDocumentSuccessful) { @@ -154,7 +154,7 @@ TEST_F(DBDirectClientTest, DeleteDocumentIncorrectHintDoesNotThrow) { ASSERT_EQ(deleteReply.getN(), 0); auto writeErrors = deleteReply.getWriteErrors().get(); ASSERT_EQ(writeErrors.size(), 1); - ASSERT_EQ(writeErrors[0].getIntField("code"), ErrorCodes::BadValue); + ASSERT_EQ(writeErrors[0].getStatus(), ErrorCodes::BadValue); } TEST_F(DBDirectClientTest, ExhaustQuery) { diff --git a/src/mongo/db/ops/write_ops.cpp b/src/mongo/db/ops/write_ops.cpp index c4b6d5bf54a..14c5ca97aa0 100644 --- a/src/mongo/db/ops/write_ops.cpp +++ b/src/mongo/db/ops/write_ops.cpp @@ -35,7 +35,6 @@ #include "mongo/db/pipeline/aggregation_request_helper.h" #include "mongo/db/update/update_oplog_entry_serialization.h" #include "mongo/db/update/update_oplog_entry_version.h" -#include "mongo/logv2/redaction.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" @@ -43,11 +42,16 @@ namespace mongo { +using write_ops::DeleteCommandReply; using write_ops::DeleteCommandRequest; using write_ops::DeleteOpEntry; +using write_ops::FindAndModifyCommandReply; +using write_ops::InsertCommandReply; using write_ops::InsertCommandRequest; +using write_ops::UpdateCommandReply; using write_ops::UpdateCommandRequest; using write_ops::UpdateOpEntry; +using write_ops::WriteCommandRequestBase; namespace { @@ -64,13 +68,13 @@ void checkOpCountForCommand(const T& op, size_t numOps) { str::stream() << "Number of statement ids must match the number of batch entries. Got " << stmtIds->size() << " statement ids but " << numOps << " operations. Statement ids: " << BSON("stmtIds" << *stmtIds) - << ". Write command: " << redact(op.toBSON({})), + << ". Write command: " << op.toBSON({}), stmtIds->size() == numOps); uassert(ErrorCodes::InvalidOptions, str::stream() << "May not specify both stmtId and stmtIds in write command. Got " << BSON("stmtId" << *op.getWriteCommandRequestBase().getStmtId() << "stmtIds" << *stmtIds) - << ". Write command: " << redact(op.toBSON({})), + << ". Write command: " << op.toBSON({}), !op.getWriteCommandRequestBase().getStmtId()); } } @@ -139,94 +143,19 @@ bool isClassicalUpdateReplacement(const BSONObj& update) { return update.firstElementFieldName()[0] != '$'; } -} // namespace write_ops - -write_ops::InsertCommandRequest InsertOp::parse(const OpMsgRequest& request) { - auto insertOp = InsertCommandRequest::parse(IDLParserErrorContext("insert"), request); - - validate(insertOp); - return insertOp; -} - -write_ops::InsertCommandRequest InsertOp::parseLegacy(const Message& msgRaw) { - DbMessage msg(msgRaw); - - InsertCommandRequest op(NamespaceString(msg.getns())); - - { - write_ops::WriteCommandRequestBase writeCommandBase; - writeCommandBase.setBypassDocumentValidation(false); - writeCommandBase.setOrdered(!(msg.reservedField() & InsertOption_ContinueOnError)); - op.setWriteCommandRequestBase(std::move(writeCommandBase)); - } - - uassert(ErrorCodes::InvalidLength, "Need at least one object to insert", msg.moreJSObjs()); - - op.setDocuments([&] { - std::vector<BSONObj> documents; - while (msg.moreJSObjs()) { - documents.push_back(msg.nextJsObj()); - } - - return documents; - }()); - - validate(op); - return op; -} - -write_ops::InsertCommandReply InsertOp::parseResponse(const BSONObj& obj) { - uassertStatusOK(getStatusFromCommandResult(obj)); - return write_ops::InsertCommandReply::parse(IDLParserErrorContext("insertReply"), obj); -} - -void InsertOp::validate(const write_ops::InsertCommandRequest& insertOp) { - const auto& docs = insertOp.getDocuments(); - checkOpCountForCommand(insertOp, docs.size()); -} - -write_ops::UpdateCommandRequest UpdateOp::parse(const OpMsgRequest& request) { - auto updateOp = UpdateCommandRequest::parse(IDLParserErrorContext("update"), request); - - checkOpCountForCommand(updateOp, updateOp.getUpdates().size()); - return updateOp; -} - -write_ops::UpdateCommandReply UpdateOp::parseResponse(const BSONObj& obj) { - uassertStatusOK(getStatusFromCommandResult(obj)); - - return write_ops::UpdateCommandReply::parse(IDLParserErrorContext("updateReply"), obj); -} - -void UpdateOp::validate(const UpdateCommandRequest& updateOp) { - checkOpCountForCommand(updateOp, updateOp.getUpdates().size()); -} - -write_ops::FindAndModifyCommandReply FindAndModifyOp::parseResponse(const BSONObj& obj) { - uassertStatusOK(getStatusFromCommandResult(obj)); - - return write_ops::FindAndModifyCommandReply::parse(IDLParserErrorContext("findAndModifyReply"), - obj); -} - -write_ops::DeleteCommandRequest DeleteOp::parse(const OpMsgRequest& request) { - auto deleteOp = DeleteCommandRequest::parse(IDLParserErrorContext("delete"), request); +void checkWriteErrors(const WriteCommandReplyBase& reply) { + if (!reply.getWriteErrors()) + return; - checkOpCountForCommand(deleteOp, deleteOp.getDeletes().size()); - return deleteOp; -} + const auto& writeErrors = *reply.getWriteErrors(); + uassert(633310, "Write errors must not be empty", !writeErrors.empty()); -write_ops::DeleteCommandReply DeleteOp::parseResponse(const BSONObj& obj) { - uassertStatusOK(getStatusFromCommandResult(obj)); - return write_ops::DeleteCommandReply::parse(IDLParserErrorContext("deleteReply"), obj); + const auto& firstError = writeErrors.front(); + uassertStatusOK(firstError.getStatus()); } -void DeleteOp::validate(const DeleteCommandRequest& deleteOp) { - checkOpCountForCommand(deleteOp, deleteOp.getDeletes().size()); -} - -write_ops::UpdateModification write_ops::UpdateModification::parseFromOplogEntry( - const BSONObj& oField, const DiffOptions& options) { +UpdateModification UpdateModification::parseFromOplogEntry(const BSONObj& oField, + const DiffOptions& options) { BSONElement vField = oField[kUpdateOplogEntryVersionFieldName]; BSONElement idField = oField["_id"]; @@ -257,13 +186,13 @@ write_ops::UpdateModification write_ops::UpdateModification::parseFromOplogEntry } } -write_ops::UpdateModification::UpdateModification(doc_diff::Diff diff, DiffOptions options) +UpdateModification::UpdateModification(doc_diff::Diff diff, DiffOptions options) : _update(DeltaUpdate{std::move(diff), options}) {} -write_ops::UpdateModification::UpdateModification(TransformFunc transform) +UpdateModification::UpdateModification(TransformFunc transform) : _update(TransformUpdate{std::move(transform)}) {} -write_ops::UpdateModification::UpdateModification(BSONElement update) { +UpdateModification::UpdateModification(BSONElement update) { const auto type = update.type(); if (type == BSONType::Object) { _update = UpdateModification(update.Obj(), ClassicTag{})._update; @@ -279,9 +208,7 @@ write_ops::UpdateModification::UpdateModification(BSONElement update) { // If we know whether the update is a replacement, use that value. For example, when we're parsing // the oplog entry, we know if the update is a replacement by checking whether there's an _id field. -write_ops::UpdateModification::UpdateModification(const BSONObj& update, - ClassicTag, - bool isReplacement) { +UpdateModification::UpdateModification(const BSONObj& update, ClassicTag, bool isReplacement) { if (isReplacement) { _update = ReplacementUpdate{update}; } else { @@ -292,21 +219,21 @@ write_ops::UpdateModification::UpdateModification(const BSONObj& update, // If we don't know whether the update is a replacement, for example while we are parsing a user // request, we infer this by checking whether the first element is a $-field to distinguish modifier // style updates. -write_ops::UpdateModification::UpdateModification(const BSONObj& update, ClassicTag) +UpdateModification::UpdateModification(const BSONObj& update, ClassicTag) : UpdateModification(update, ClassicTag{}, isClassicalUpdateReplacement(update)) {} -write_ops::UpdateModification::UpdateModification(std::vector<BSONObj> pipeline) +UpdateModification::UpdateModification(std::vector<BSONObj> pipeline) : _update{PipelineUpdate{std::move(pipeline)}} {} /** * IMPORTANT: The method should not be modified, as API version input/output guarantees could * break because of it. */ -write_ops::UpdateModification write_ops::UpdateModification::parseFromBSON(BSONElement elem) { +UpdateModification UpdateModification::parseFromBSON(BSONElement elem) { return UpdateModification(elem); } -int write_ops::UpdateModification::objsize() const { +int UpdateModification::objsize() const { return stdx::visit( visit_helper::Overloaded{ [](const ReplacementUpdate& replacement) -> int { return replacement.bson.objsize(); }, @@ -324,8 +251,7 @@ int write_ops::UpdateModification::objsize() const { _update); } - -write_ops::UpdateModification::Type write_ops::UpdateModification::type() const { +UpdateModification::Type UpdateModification::type() const { return stdx::visit( visit_helper::Overloaded{ [](const ReplacementUpdate& replacement) { return Type::kReplacement; }, @@ -340,8 +266,7 @@ write_ops::UpdateModification::Type write_ops::UpdateModification::type() const * IMPORTANT: The method should not be modified, as API version input/output guarantees could * break because of it. */ -void write_ops::UpdateModification::serializeToBSON(StringData fieldName, - BSONObjBuilder* bob) const { +void UpdateModification::serializeToBSON(StringData fieldName, BSONObjBuilder* bob) const { stdx::visit( visit_helper::Overloaded{ @@ -363,4 +288,145 @@ void write_ops::UpdateModification::serializeToBSON(StringData fieldName, _update); } +WriteError::WriteError(int32_t index, Status status) : _index(index), _status(std::move(status)) {} + +WriteError WriteError::parse(const BSONObj& obj) { + auto index = int32_t(obj[WriteError::kIndexFieldName].Int()); + auto status = [&] { + auto code = ErrorCodes::Error(obj[WriteError::kCodeFieldName].Int()); + auto errmsg = obj[WriteError::kErrmsgFieldName].valueStringDataSafe(); + + // At least up to FCV 5.x, the write commands operation used to convert StaleConfig errors + // into StaleShardVersion and store the extra info of StaleConfig in a sub-field called + // "errInfo". + // + // TODO (SERVER-63327): This special parsing should be removed in the stable version + // following the resolution of this ticket. + if (code == ErrorCodes::StaleShardVersion) { + return Status(ErrorCodes::StaleConfig, + std::move(errmsg), + obj[WriteError::kErrInfoFieldName].Obj()); + } + + // All remaining errors have the error stored at the same level as the code and errmsg (in + // the same way that Status is serialised as part of regular command response) + return Status(code, std::move(errmsg), obj); + }(); + + return WriteError(index, std::move(status)); +} + +BSONObj WriteError::serialize() const { + BSONObjBuilder errBuilder; + errBuilder.append(WriteError::kIndexFieldName, _index); + + // At least up to FCV 5.x, the write commands operation used to convert StaleConfig errors into + // StaleShardVersion and store the extra info of StaleConfig in a sub-field called "errInfo". + // This logic preserves this for backwards compatibility. + // + // TODO (SERVER-63327): This special serialisation should be removed in the stable version + // following the resolution of this ticket. + if (_status == ErrorCodes::StaleConfig) { + errBuilder.append(WriteError::kCodeFieldName, int32_t(ErrorCodes::StaleShardVersion)); + errBuilder.append(WriteError::kErrmsgFieldName, _status.reason()); + auto extraInfo = _status.extraInfo(); + invariant(extraInfo); + BSONObjBuilder extraInfoBuilder(errBuilder.subobjStart(WriteError::kErrInfoFieldName)); + extraInfo->serialize(&extraInfoBuilder); + } else { + errBuilder.append(WriteError::kCodeFieldName, int32_t(_status.code())); + errBuilder.append(WriteError::kErrmsgFieldName, _status.reason()); + if (auto extraInfo = _status.extraInfo()) { + extraInfo->serialize(&errBuilder); + } + } + + return errBuilder.obj(); +} + +} // namespace write_ops + +InsertCommandRequest InsertOp::parse(const OpMsgRequest& request) { + auto insertOp = InsertCommandRequest::parse(IDLParserErrorContext("insert"), request); + + validate(insertOp); + return insertOp; +} + +InsertCommandRequest InsertOp::parseLegacy(const Message& msgRaw) { + DbMessage msg(msgRaw); + + InsertCommandRequest op(NamespaceString(msg.getns())); + + { + WriteCommandRequestBase writeCommandBase; + writeCommandBase.setBypassDocumentValidation(false); + writeCommandBase.setOrdered(!(msg.reservedField() & InsertOption_ContinueOnError)); + op.setWriteCommandRequestBase(std::move(writeCommandBase)); + } + + uassert(ErrorCodes::InvalidLength, "Need at least one object to insert", msg.moreJSObjs()); + + op.setDocuments([&] { + std::vector<BSONObj> documents; + while (msg.moreJSObjs()) { + documents.push_back(msg.nextJsObj()); + } + + return documents; + }()); + + validate(op); + return op; +} + +InsertCommandReply InsertOp::parseResponse(const BSONObj& obj) { + uassertStatusOK(getStatusFromCommandResult(obj)); + return InsertCommandReply::parse(IDLParserErrorContext("insertReply"), obj); +} + +void InsertOp::validate(const InsertCommandRequest& insertOp) { + const auto& docs = insertOp.getDocuments(); + checkOpCountForCommand(insertOp, docs.size()); +} + +UpdateCommandRequest UpdateOp::parse(const OpMsgRequest& request) { + auto updateOp = UpdateCommandRequest::parse(IDLParserErrorContext("update"), request); + + checkOpCountForCommand(updateOp, updateOp.getUpdates().size()); + return updateOp; +} + +UpdateCommandReply UpdateOp::parseResponse(const BSONObj& obj) { + uassertStatusOK(getStatusFromCommandResult(obj)); + + return UpdateCommandReply::parse(IDLParserErrorContext("updateReply"), obj); +} + +void UpdateOp::validate(const UpdateCommandRequest& updateOp) { + checkOpCountForCommand(updateOp, updateOp.getUpdates().size()); +} + +FindAndModifyCommandReply FindAndModifyOp::parseResponse(const BSONObj& obj) { + uassertStatusOK(getStatusFromCommandResult(obj)); + + return FindAndModifyCommandReply::parse(IDLParserErrorContext("findAndModifyReply"), obj); +} + +DeleteCommandRequest DeleteOp::parse(const OpMsgRequest& request) { + auto deleteOp = DeleteCommandRequest::parse(IDLParserErrorContext("delete"), request); + + checkOpCountForCommand(deleteOp, deleteOp.getDeletes().size()); + return deleteOp; +} + +DeleteCommandReply DeleteOp::parseResponse(const BSONObj& obj) { + uassertStatusOK(getStatusFromCommandResult(obj)); + return DeleteCommandReply::parse(IDLParserErrorContext("deleteReply"), obj); +} + +void DeleteOp::validate(const DeleteCommandRequest& deleteOp) { + checkOpCountForCommand(deleteOp, deleteOp.getDeletes().size()); +} + } // namespace mongo diff --git a/src/mongo/db/ops/write_ops.h b/src/mongo/db/ops/write_ops.h index 5582147aba0..d78791fdfdc 100644 --- a/src/mongo/db/ops/write_ops.h +++ b/src/mongo/db/ops/write_ops.h @@ -103,5 +103,20 @@ const std::vector<BSONObj>& arrayFiltersOf(const T& opEntry) { return opEntry.getArrayFilters().get_value_or(emptyBSONArray); } +/** + * If the response from a write command contains any write errors, it will throw the first one. All + * the remaining errors will be disregarded. + * + * Usages of this utility for anything other than single-document writes would be suspicious due to + * the fact that it will swallow the remaining ones. + */ +void checkWriteErrors(const WriteCommandReplyBase& reply); + +template <class T> +T checkWriteErrors(T op) { + checkWriteErrors(op.getWriteCommandReplyBase()); + return std::move(op); +} + } // namespace write_ops } // namespace mongo diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl index cf58747c7c5..41ee21b5346 100644 --- a/src/mongo/db/ops/write_ops.idl +++ b/src/mongo/db/ops/write_ops.idl @@ -64,6 +64,14 @@ types: serializer: "::mongo::write_ops::opTimeSerializerWithTermCheck" deserializer: "::mongo::write_ops::opTimeParser" + write_error: + bson_serialization_type: object + description: "Structure used to report a single error entry resulting from a batch write + command." + cpp_type: "mongo::write_ops::WriteError" + serializer: "mongo::write_ops::WriteError::serialize" + deserializer: "mongo::write_ops::WriteError::parse" + structs: WriteCommandReplyBase: @@ -88,7 +96,7 @@ structs: unstable: false writeErrors: description: "Contains all the errors encoutered." - type: array<object_owned> + type: array<write_error> optional: true unstable: false retriedStmtIds: @@ -119,6 +127,8 @@ structs: UpdateCommandReply: description: "Contains information related to update command reply." strict: false + chained_structs: + WriteCommandReplyBase: writeCommandReplyBase fields: upserted: description: "An array contains information about upserted documents." @@ -131,9 +141,6 @@ structs: default: 0 unstable: false - chained_structs: - WriteCommandReplyBase: writeCommandReplyBase - DeleteCommandReply: description: "Contains information related to delete command reply." strict: false diff --git a/src/mongo/db/ops/write_ops_parsers.h b/src/mongo/db/ops/write_ops_parsers.h index 797af279df7..8a012d19eeb 100644 --- a/src/mongo/db/ops/write_ops_parsers.h +++ b/src/mongo/db/ops/write_ops_parsers.h @@ -40,6 +40,7 @@ namespace mongo { namespace write_ops { + // Conservative per array element overhead. This value was calculated as 1 byte (element type) + 5 // bytes (max string encoding of the array index encoded as string and the maximum key is 99999) + 1 // byte (zero terminator) = 7 bytes @@ -203,5 +204,45 @@ private: _update; }; +/** + * Class to abstract the vagaries of how write errors are reported in write commands, which is not + * consistent between the different errors. Specifically, errors such as StaleShardVersion report + * their extraInfo in a field called errInfo, which is not consistent with how Status(es) are + * serialised and parsed. + * + * TODO (SERVER-63327): The purpose of this class is to unify that reporting in subsequent versions + * after which it can become a proper IDL type. + */ +class WriteError { +public: + static constexpr auto kIndexFieldName = "index"_sd; + static constexpr auto kCodeFieldName = "code"_sd; + static constexpr auto kErrmsgFieldName = "errmsg"_sd; + static constexpr auto kErrInfoFieldName = "errInfo"_sd; + + static WriteError parse(const BSONObj& obj); + BSONObj serialize() const; + + WriteError(int32_t index, Status status); + + int32_t getIndex() const { + return _index; + } + void setIndex(int32_t index) { + _index = index; + } + + const Status& getStatus() const { + return _status; + } + void setStatus(const Status& status) { + _status = status; + } + +private: + int32_t _index; + Status _status; +}; + } // namespace write_ops } // namespace mongo diff --git a/src/mongo/db/persistent_task_store.h b/src/mongo/db/persistent_task_store.h index e34fa209e66..8e59d6ed280 100644 --- a/src/mongo/db/persistent_task_store.h +++ b/src/mongo/db/persistent_task_store.h @@ -187,7 +187,7 @@ private: WriteConcerns::kMajorityWriteConcernShardingTimeout) { DBDirectClient dbClient(opCtx); - auto commandResponse = dbClient.update([&] { + auto commandResponse = write_ops::checkWriteErrors(dbClient.update([&] { write_ops::UpdateCommandRequest updateOp(_storageNss); auto updateModification = write_ops::UpdateModification::parseFromClassicUpdate(update); write_ops::UpdateOpEntry updateEntry(filter, updateModification); @@ -195,14 +195,7 @@ private: updateEntry.setUpsert(upsert); updateOp.setUpdates({updateEntry}); return updateOp; - }()); - - auto writeErrors = commandResponse.getWriteErrors(); - if (writeErrors) { - BSONObj firstWriteError = writeErrors->front(); - uasserted(ErrorCodes::Error(firstWriteError.getIntField("code")), - firstWriteError.getStringField("errmsg")); - } + }())); uassert(ErrorCodes::NoMatchingDocument, "No matching document found for query {} on namespace {}"_format( diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp index b07e0b3caec..01db33c2337 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.cpp @@ -42,6 +42,7 @@ #include "mongo/db/operation_time_tracker.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/future.h" diff --git a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h index 0d954fd1035..c61f654e844 100644 --- a/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/replica_set_node_process_interface.h @@ -32,7 +32,6 @@ #include "mongo/db/ops/write_ops_gen.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/pipeline/process_interface/non_shardsvr_process_interface.h" -#include "mongo/s/write_ops/batched_command_request.h" namespace mongo { diff --git a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp index c6b93df26dc..5fd607714d1 100644 --- a/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp +++ b/src/mongo/db/s/balancer/balancer_defragmentation_policy_impl.cpp @@ -146,15 +146,6 @@ bool areMergeable(const ChunkType& firstChunk, SimpleBSONObjComparator::kInstance.evaluate(firstChunk.getMax() == secondChunk.getMin()); } -void checkForWriteErrors(const write_ops::UpdateCommandReply& response) { - const auto& writeErrors = response.getWriteErrors(); - if (writeErrors) { - BSONObj firstWriteError = writeErrors->front(); - uasserted(ErrorCodes::Error(firstWriteError.getIntField("code")), - firstWriteError.getStringField("errmsg")); - } -} - class MergeAndMeasureChunksPhase : public DefragmentationPhase { public: static std::unique_ptr<MergeAndMeasureChunksPhase> build(OperationContext* opCtx, @@ -1780,8 +1771,7 @@ void BalancerDefragmentationPolicyImpl::_persistPhaseUpdate(OperationContext* op << DefragmentationPhase_serializer(phase))))); return entry; }()}); - auto response = dbClient.update(updateOp); - checkForWriteErrors(response); + auto response = write_ops::checkWriteErrors(dbClient.update(updateOp)); uassert(ErrorCodes::NoMatchingDocument, "Collection {} not found while persisting phase change"_format(uuid.toString()), response.getN() > 0); @@ -1794,29 +1784,29 @@ void BalancerDefragmentationPolicyImpl::_persistPhaseUpdate(OperationContext* op void BalancerDefragmentationPolicyImpl::_clearDefragmentationState(OperationContext* opCtx, const UUID& uuid) { DBDirectClient dbClient(opCtx); + // Clear datasize estimates from chunks - write_ops::UpdateCommandRequest removeDataSize(ChunkType::ConfigNS); - removeDataSize.setUpdates({[&] { - write_ops::UpdateOpEntry entry; - entry.setQ(BSON(CollectionType::kUuidFieldName << uuid)); - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( - BSON("$unset" << BSON(ChunkType::estimatedSizeBytes.name() << "")))); - entry.setMulti(true); - return entry; - }()}); - checkForWriteErrors(dbClient.update(removeDataSize)); + write_ops::checkWriteErrors(dbClient.update(write_ops::UpdateCommandRequest( + ChunkType::ConfigNS, {[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(CollectionType::kUuidFieldName << uuid)); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( + BSON("$unset" << BSON(ChunkType::estimatedSizeBytes.name() << "")))); + entry.setMulti(true); + return entry; + }()}))); + // Clear defragmentation phase and defragmenting flag from collection - write_ops::UpdateCommandRequest removeCollectionFlags(CollectionType::ConfigNS); - removeCollectionFlags.setUpdates({[&] { - write_ops::UpdateOpEntry entry; - entry.setQ(BSON(CollectionType::kUuidFieldName << uuid)); - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate( - BSON("$unset" << BSON(CollectionType::kDefragmentCollectionFieldName - << "" << CollectionType::kDefragmentationPhaseFieldName << "")))); - return entry; - }()}); - auto response = dbClient.update(removeCollectionFlags); - checkForWriteErrors(response); + write_ops::checkWriteErrors(dbClient.update(write_ops::UpdateCommandRequest( + CollectionType::ConfigNS, {[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(BSON(CollectionType::kUuidFieldName << uuid)); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(BSON( + "$unset" << BSON(CollectionType::kDefragmentCollectionFieldName + << "" << CollectionType::kDefragmentationPhaseFieldName << "")))); + return entry; + }()}))); + WriteConcernResult ignoreResult; const auto latestOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); uassertStatusOK(waitForWriteConcern( diff --git a/src/mongo/db/s/config/configsvr_configure_collection_balancing.cpp b/src/mongo/db/s/config/configsvr_configure_collection_balancing.cpp index 784baeed259..3a025368c6d 100644 --- a/src/mongo/db/s/config/configsvr_configure_collection_balancing.cpp +++ b/src/mongo/db/s/config/configsvr_configure_collection_balancing.cpp @@ -45,7 +45,6 @@ #include "mongo/s/balancer_configuration.h" #include "mongo/s/grid.h" #include "mongo/s/request_types/configure_collection_balancing_gen.h" -#include "mongo/s/write_ops/batched_command_request.h" namespace mongo { namespace { diff --git a/src/mongo/db/s/persistent_task_queue.h b/src/mongo/db/s/persistent_task_queue.h index 0c7e873786d..c1802278afe 100644 --- a/src/mongo/db/s/persistent_task_queue.h +++ b/src/mongo/db/s/persistent_task_queue.h @@ -134,16 +134,8 @@ TaskId PersistentTaskQueue<T>::push(OperationContext* opCtx, const T& t) { builder.append("_id", recordId); builder.append("task", t.toBSON()); - write_ops::InsertCommandRequest insertOp(_storageNss); - insertOp.setDocuments({builder.obj()}); - auto response = dbClient.insert(insertOp); - - if (response.getWriteErrors()) { - BSONObj firstWriteError = response.getWriteErrors()->front(); - uasserted(ErrorCodes::Error(firstWriteError.getIntField("code")), - firstWriteError.getStringField("errmsg")); - } - + auto response = write_ops::checkWriteErrors( + dbClient.insert(write_ops::InsertCommandRequest(_storageNss, {builder.obj()}))); _count++; } @@ -168,14 +160,7 @@ TaskId PersistentTaskQueue<T>::pop(OperationContext* opCtx) { write_ops::DeleteCommandRequest deleteOp(_storageNss); deleteOp.setDeletes({write_ops::DeleteOpEntry(builder.obj(), false)}); - auto response = client.remove(deleteOp); - auto writeErrors = response.getWriteErrors(); - if (writeErrors) { - BSONObj firstWriteError = writeErrors->front(); - uasserted(ErrorCodes::Error(firstWriteError.getIntField("code")), - firstWriteError.getStringField("errmsg")); - } - + write_ops::checkWriteErrors(client.remove(deleteOp)); _count--; TaskId id = _currentFront->id; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index f8ff97e1946..a24569ecc44 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -38,7 +38,6 @@ #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_tags.h" #include "mongo/s/shard_id.h" -#include "mongo/s/write_ops/batched_command_request.h" #include "mongo/util/future.h" namespace mongo { diff --git a/src/mongo/db/s/resharding/resharding_util.h b/src/mongo/db/s/resharding/resharding_util.h index 021a5083a98..856d7cbb081 100644 --- a/src/mongo/db/s/resharding/resharding_util.h +++ b/src/mongo/db/s/resharding/resharding_util.h @@ -46,7 +46,6 @@ #include "mongo/s/chunk_manager.h" #include "mongo/s/resharding/common_types_gen.h" #include "mongo/s/shard_id.h" -#include "mongo/s/write_ops/batched_command_request.h" #include "mongo/util/str.h" namespace mongo { diff --git a/src/mongo/db/s/set_allow_migrations_coordinator.cpp b/src/mongo/db/s/set_allow_migrations_coordinator.cpp index 2016b395226..1cf2edac166 100644 --- a/src/mongo/db/s/set_allow_migrations_coordinator.cpp +++ b/src/mongo/db/s/set_allow_migrations_coordinator.cpp @@ -35,8 +35,6 @@ #include "mongo/logv2/log.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/grid.h" -#include "mongo/s/write_ops/batched_command_request.h" -#include "mongo/s/write_ops/batched_command_response.h" namespace mongo { diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index ad79360157d..1f91e67abc9 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -51,7 +51,6 @@ #include "mongo/db/transaction_participant.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/scopeguard.h" @@ -151,61 +150,46 @@ int removeSessionsTransactionRecords(OperationContext* opCtx, // // We opt for this rather than performing the two sets of deletes in a single transaction simply // to reduce code complexity. - write_ops::DeleteCommandRequest imageDeleteOp(NamespaceString::kConfigImagesNamespace); - imageDeleteOp.setWriteCommandRequestBase([] { - write_ops::WriteCommandRequestBase base; - base.setOrdered(false); - return base; - }()); - imageDeleteOp.setDeletes([&] { - std::vector<write_ops::DeleteOpEntry> entries; - for (const auto& lsid : expiredSessionIds) { - entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()), - false /* multi = false */); - } - return entries; - }()); - - BatchedCommandResponse response; - std::string errmsg; - BSONObj result; - DBDirectClient client(opCtx); - client.runCommand( - NamespaceString::kConfigImagesNamespace.db().toString(), imageDeleteOp.toBSON({}), result); - - uassert(ErrorCodes::FailedToParse, - str::stream() << "Failed to parse response " << result, - response.parseBSON(result, &errmsg)); - uassertStatusOK(response.getTopLevelStatus()); + write_ops::checkWriteErrors(client.remove([&] { + write_ops::DeleteCommandRequest imageDeleteOp(NamespaceString::kConfigImagesNamespace); + imageDeleteOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase base; + base.setOrdered(false); + return base; + }()); + imageDeleteOp.setDeletes([&] { + std::vector<write_ops::DeleteOpEntry> entries; + for (const auto& lsid : expiredSessionIds) { + entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()), + false /* multi = false */); + } + return entries; + }()); + return imageDeleteOp; + }())); // Remove the session ids from the on-disk catalog - write_ops::DeleteCommandRequest sessionDeleteOp( - NamespaceString::kSessionTransactionsTableNamespace); - sessionDeleteOp.setWriteCommandRequestBase([] { - write_ops::WriteCommandRequestBase base; - base.setOrdered(false); - return base; - }()); - sessionDeleteOp.setDeletes([&] { - std::vector<write_ops::DeleteOpEntry> entries; - for (const auto& lsid : expiredSessionIds) { - entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()), - false /* multi = false */); - } - return entries; - }()); - - - client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(), - sessionDeleteOp.toBSON({}), - result); + auto sessionDeleteReply = write_ops::checkWriteErrors(client.remove([&] { + write_ops::DeleteCommandRequest sessionDeleteOp( + NamespaceString::kSessionTransactionsTableNamespace); + sessionDeleteOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase base; + base.setOrdered(false); + return base; + }()); + sessionDeleteOp.setDeletes([&] { + std::vector<write_ops::DeleteOpEntry> entries; + for (const auto& lsid : expiredSessionIds) { + entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()), + false /* multi = false */); + } + return entries; + }()); + return sessionDeleteOp; + }())); - uassert(ErrorCodes::FailedToParse, - str::stream() << "Failed to parse response " << result, - response.parseBSON(result, &errmsg)); - uassertStatusOK(response.getTopLevelStatus()); - return response.getN(); + return sessionDeleteReply.getN(); } void createTransactionTable(OperationContext* opCtx) { diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index 17cf310e86b..697cd611cfc 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -43,7 +43,6 @@ #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/write_ops/batched_command_response.h" namespace mongo { namespace { diff --git a/src/mongo/s/chunk_manager_targeter.h b/src/mongo/s/chunk_manager_targeter.h index 5dfb08dc878..0950484eb7d 100644 --- a/src/mongo/s/chunk_manager_targeter.h +++ b/src/mongo/s/chunk_manager_targeter.h @@ -37,7 +37,6 @@ #include "mongo/db/namespace_string.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/ns_targeter.h" -#include "mongo/s/write_ops/batched_command_request.h" namespace mongo { diff --git a/src/mongo/s/client/shard.cpp b/src/mongo/s/client/shard.cpp index 5a94298bf82..ac360694af4 100644 --- a/src/mongo/s/client/shard.cpp +++ b/src/mongo/s/client/shard.cpp @@ -33,12 +33,9 @@ #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/operation_context.h" +#include "mongo/logv2/log.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" -#include "mongo/s/write_ops/batched_command_request.h" -#include "mongo/s/write_ops/batched_command_response.h" - -#include "mongo/logv2/log.h" namespace mongo { namespace { diff --git a/src/mongo/s/write_ops/batched_command_response_test.cpp b/src/mongo/s/write_ops/batched_command_response_test.cpp index 7088c2fbfe4..e11b42945d7 100644 --- a/src/mongo/s/write_ops/batched_command_response_test.cpp +++ b/src/mongo/s/write_ops/batched_command_response_test.cpp @@ -33,14 +33,13 @@ #include <string> #include "mongo/db/jsobj.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/s/stale_exception.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/s/write_ops/write_error_detail.h" #include "mongo/unittest/unittest.h" namespace mongo { - -using std::string; - namespace { TEST(BatchedCommandResponse, Basic) { @@ -66,7 +65,7 @@ TEST(BatchedCommandResponse, Basic) { << writeErrorsArray << BatchedCommandResponse::writeConcernError() << writeConcernError << "ok" << 1.0); - string errMsg; + std::string errMsg; BatchedCommandResponse response; bool ok = response.parseBSON(origResponseObj, &errMsg); ASSERT_TRUE(ok); @@ -186,5 +185,33 @@ TEST(BatchedCommandResponse, NoDuplicateErrInfo) { verifySingleErrInfo(elem.embeddedObject()); } } + +TEST(BatchedCommandResponse, CompatibilityFromWriteErrorToBatchCommandResponse) { + ChunkVersion versionReceived(1, 0, OID::gen(), Timestamp(2, 0)); + + write_ops::UpdateCommandReply reply; + reply.getWriteCommandReplyBase().setN(1); + reply.getWriteCommandReplyBase().setWriteErrors(std::vector<write_ops::WriteError>{ + write_ops::WriteError(1, + Status(StaleConfigInfo(NamespaceString("TestDB", "TestColl"), + versionReceived, + boost::none, + ShardId("TestShard")), + "Test stale config")), + }); + + BatchedCommandResponse response; + ASSERT_TRUE(response.parseBSON(reply.toBSON(), nullptr)); + ASSERT_EQ(1U, response.getErrDetails().size()); + ASSERT_EQ(ErrorCodes::StaleShardVersion, response.getErrDetailsAt(0)->toStatus().code()); + ASSERT_EQ("Test stale config", response.getErrDetailsAt(0)->toStatus().reason()); + auto staleInfo = + StaleConfigInfo::parseFromCommandError(response.getErrDetailsAt(0)->getErrInfo()); + ASSERT_EQ("TestDB.TestColl", staleInfo.getNss().ns()); + ASSERT_EQ(versionReceived, staleInfo.getVersionReceived()); + ASSERT(!staleInfo.getVersionWanted()); + ASSERT_EQ(ShardId("TestShard"), staleInfo.getShardId()); +} + } // namespace } // namespace mongo |