summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2022-02-26 08:31:49 +0100
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-26 07:58:31 +0000
commitfcad5cd7a9267980fefda51b1e4a3db0a12000ec (patch)
treeccebaaea6a52422e5c887ddf55a49224a9b47f82 /src/mongo
parent3703e0ec90ebed2d332720b20dafa6a5bcb72a23 (diff)
downloadmongo-fcad5cd7a9267980fefda51b1e4a3db0a12000ec.tar.gz
Revert "SERVER-63331 Remove unnecessary conversion between BSON and WriteError IDL"
This reverts commit 8aca92da9115a723c9f4c16be96c64cdb4b8362b.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/write_commands.cpp177
1 files changed, 115 insertions, 62 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 86e2e49c775..9b63f16641c 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -319,45 +319,78 @@ boost::optional<std::pair<Status, bool>> checkFailUnorderedTimeseriesInsertFailP
return boost::none;
}
-boost::optional<write_ops::WriteError> generateError(OperationContext* opCtx,
- const Status& status,
- int index,
- size_t numErrors) {
+boost::optional<BSONObj> generateError(OperationContext* opCtx,
+ const Status& status,
+ int index,
+ size_t numErrors) {
if (status.isOK()) {
return boost::none;
}
- boost::optional<Status> overwrittenStatus;
+ auto errorMessage = [numErrors, errorSize = size_t(0)](StringData rawMessage) mutable {
+ // Start truncating error messages once both of these limits are exceeded.
+ constexpr size_t kErrorSizeTruncationMin = 1024 * 1024;
+ constexpr size_t kErrorCountTruncationMin = 2;
+ if (errorSize >= kErrorSizeTruncationMin && numErrors >= kErrorCountTruncationMin) {
+ return ""_sd;
+ }
+
+ errorSize += rawMessage.size();
+ return rawMessage;
+ };
- if (status == ErrorCodes::TenantMigrationConflict) {
+ BSONSizeTracker errorsSizeTracker;
+ BSONObjBuilder error(errorsSizeTracker);
+ error.append("index", index);
+ if (auto staleInfo = status.extraInfo<StaleConfigInfo>()) {
+ error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception!
+ {
+ BSONObjBuilder errInfo(error.subobjStart("errInfo"));
+ staleInfo->serialize(&errInfo);
+ }
+ } else if (auto docValidationError =
+ status.extraInfo<doc_validation_error::DocumentValidationFailureInfo>()) {
+ error.append("code", static_cast<int>(ErrorCodes::DocumentValidationFailure));
+ error.append("errInfo", docValidationError->getDetails());
+ } else if (status.code() == ErrorCodes::TenantMigrationConflict) {
hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx);
- overwrittenStatus.emplace(
- tenant_migration_access_blocker::handleTenantMigrationConflict(opCtx, status));
+ auto migrationStatus =
+ tenant_migration_access_blocker::handleTenantMigrationConflict(opCtx, status);
// Interruption errors encountered during batch execution fail the entire batch, so throw on
// such errors here for consistency.
- if (ErrorCodes::isInterruption(*overwrittenStatus)) {
- uassertStatusOK(*overwrittenStatus);
+ if (ErrorCodes::isInterruption(migrationStatus)) {
+ uassertStatusOK(migrationStatus);
}
- }
- constexpr size_t kErrorCountTruncationMin = 2;
- if (numErrors >= kErrorCountTruncationMin)
- overwrittenStatus =
- overwrittenStatus ? overwrittenStatus->withReason("") : status.withReason("");
+ error.append("code", migrationStatus.code());
- if (overwrittenStatus)
- return write_ops::WriteError(index, std::move(*overwrittenStatus));
- else
- return write_ops::WriteError(index, status);
+ // We want to append an empty errmsg for the errors after the first one, so let the
+ // code below that appends errmsg do that.
+ if (status.reason() != "") {
+ error.append("errmsg", errorMessage(migrationStatus.reason()));
+ }
+ } else {
+ error.append("code", int(status.code()));
+ if (auto const extraInfo = status.extraInfo()) {
+ extraInfo->serialize(&error);
+ }
+ }
+
+ // Skip appending errmsg if it has already been appended like in the case of
+ // TenantMigrationConflict.
+ if (!error.hasField("errmsg")) {
+ error.append("errmsg", errorMessage(status.reason()));
+ }
+ return error.obj();
}
template <typename T>
-boost::optional<write_ops::WriteError> generateError(OperationContext* opCtx,
- const StatusWith<T>& result,
- int index,
- size_t numErrors) {
+boost::optional<BSONObj> generateError(OperationContext* opCtx,
+ const StatusWith<T>& result,
+ int index,
+ size_t numErrors) {
return generateError(opCtx, result.getStatus(), index, numErrors);
}
@@ -406,10 +439,11 @@ void populateReply(OperationContext* opCtx,
}
long long nVal = 0;
- std::vector<write_ops::WriteError> errors;
+ std::vector<BSONObj> errors;
+
for (size_t i = 0; i < result.results.size(); ++i) {
if (auto error = generateError(opCtx, result.results[i], i, errors.size())) {
- errors.emplace_back(std::move(*error));
+ errors.push_back(*error);
continue;
}
@@ -422,13 +456,17 @@ void populateReply(OperationContext* opCtx,
}
auto& replyBase = cmdReply->getWriteCommandReplyBase();
-
replyBase.setN(nVal);
if (!result.retriedStmtIds.empty()) {
- replyBase.setRetriedStmtIds(std::move(result.retriedStmtIds));
+ replyBase.setRetriedStmtIds(result.retriedStmtIds);
}
+
if (!errors.empty()) {
- replyBase.setWriteErrors(std::move(errors));
+ std::vector<write_ops::WriteError> writeErrors;
+ for (const auto& e : errors) {
+ writeErrors.emplace_back(write_ops::WriteError::parse(e));
+ }
+ replyBase.setWriteErrors(std::move(writeErrors));
}
// writeConcernError field is handled by command processor.
@@ -446,6 +484,7 @@ void populateReply(OperationContext* opCtx,
}
}
+ // Call the called-defined post processing handler.
if (hooks && hooks->postProcessHandler)
hooks->postProcessHandler();
}
@@ -737,7 +776,7 @@ public:
size_t start,
size_t index,
std::vector<StmtId>&& stmtIds,
- std::vector<write_ops::WriteError>* errors,
+ std::vector<BSONObj>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
std::vector<size_t>* docsToRetry) const {
@@ -762,7 +801,7 @@ public:
_performTimeseriesInsert(opCtx, batch, metadata, std::move(stmtIds));
if (auto error =
generateError(opCtx, output.result, start + index, errors->size())) {
- errors->emplace_back(std::move(*error));
+ errors->push_back(*error);
bucketCatalog.abort(batch, output.result.getStatus());
batchGuard.dismiss();
return output.canContinue;
@@ -777,7 +816,7 @@ public:
_performTimeseriesUpdate(opCtx, batch, metadata, std::move(stmtIds));
if (auto error =
generateError(opCtx, output.result, start + index, errors->size())) {
- errors->emplace_back(std::move(*error));
+ errors->push_back(*error);
bucketCatalog.abort(batch, output.result.getStatus());
batchGuard.dismiss();
return output.canContinue;
@@ -801,7 +840,7 @@ public:
auto output = _performTimeseriesBucketCompression(opCtx, *closedBucket);
if (auto error =
generateError(opCtx, output.result, start + index, errors->size())) {
- errors->emplace_back(std::move(*error));
+ errors->push_back(*error);
return output.canContinue;
}
}
@@ -811,7 +850,7 @@ public:
bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx,
TimeseriesBatches* batches,
TimeseriesStmtIds&& stmtIds,
- std::vector<write_ops::WriteError>* errors,
+ std::vector<BSONObj>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId) const {
auto& bucketCatalog = BucketCatalog::get(opCtx);
@@ -909,7 +948,7 @@ public:
size_t start,
size_t numDocs,
const std::vector<size_t>& indices,
- std::vector<write_ops::WriteError>* errors,
+ std::vector<BSONObj>* errors,
bool* containsRetry) const {
auto& bucketCatalog = BucketCatalog::get(opCtx);
@@ -953,7 +992,7 @@ public:
_canCombineTimeseriesInsertWithOtherClients(opCtx));
if (auto error = generateError(opCtx, result, start + index, errors->size())) {
- errors->emplace_back(std::move(*error));
+ errors->push_back(*error);
return false;
} else {
const auto& batch = result.getValue().batch;
@@ -977,7 +1016,7 @@ public:
// Bucket compression only fail when we may not try to perform any other
// write operation. When handleError() inside write_ops_exec.cpp return
// false.
- errors->emplace_back(std::move(*error));
+ errors->push_back(*error);
canContinue = false;
return false;
}
@@ -1003,16 +1042,28 @@ public:
canContinue};
}
+ BSONObj _cloneErrorWithIndex(BSONObj error, size_t index) const {
+ BSONObjBuilder bob;
+ for (auto&& elem : error) {
+ if (elem.fieldNameStringData() == "index") {
+ bob.append("index", static_cast<int>(index));
+ } else {
+ bob.append(elem);
+ }
+ }
+ return bob.obj();
+ }
+
void _getTimeseriesBatchResults(OperationContext* opCtx,
const TimeseriesBatches& batches,
size_t start,
size_t indexOfLastProcessedBatch,
bool canContinue,
- std::vector<write_ops::WriteError>* errors,
+ std::vector<BSONObj>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
std::vector<size_t>* docsToRetry = nullptr) const {
- boost::optional<write_ops::WriteError> lastError;
+ boost::optional<BSONObj> lastError;
if (!errors->empty()) {
lastError = errors->back();
}
@@ -1033,7 +1084,7 @@ public:
6023100,
"there should be at least one error if the batch processing exited early",
lastError);
- errors->emplace_back(start + index, lastError->getStatus());
+ errors->push_back(_cloneErrorWithIndex(*lastError, start + index));
continue;
}
@@ -1045,7 +1096,7 @@ public:
}
if (auto error = generateError(
opCtx, swCommitInfo.getStatus(), start + index, errors->size())) {
- errors->emplace_back(std::move(*error));
+ errors->push_back(*error);
continue;
}
@@ -1062,14 +1113,14 @@ public:
// error.
if (!canContinue && docsToRetry) {
for (auto&& index : *docsToRetry) {
- errors->emplace_back(start + index, lastError->getStatus());
+ errors->push_back(_cloneErrorWithIndex(*lastError, start + index));
}
docsToRetry->clear();
}
}
bool _performOrderedTimeseriesWritesAtomically(OperationContext* opCtx,
- std::vector<write_ops::WriteError>* errors,
+ std::vector<BSONObj>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry) const {
@@ -1098,7 +1149,7 @@ public:
* Returns the number of documents that were inserted.
*/
size_t _performOrderedTimeseriesWrites(OperationContext* opCtx,
- std::vector<write_ops::WriteError>* errors,
+ std::vector<BSONObj>* errors,
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry) const {
@@ -1124,15 +1175,14 @@ public:
* can be passed as the 'indices' parameter in a subsequent call to this function, in order
* to to be retried.
*/
- std::vector<size_t> _performUnorderedTimeseriesWrites(
- OperationContext* opCtx,
- size_t start,
- size_t numDocs,
- const std::vector<size_t>& indices,
- std::vector<write_ops::WriteError>* errors,
- boost::optional<repl::OpTime>* opTime,
- boost::optional<OID>* electionId,
- bool* containsRetry) const {
+ std::vector<size_t> _performUnorderedTimeseriesWrites(OperationContext* opCtx,
+ size_t start,
+ size_t numDocs,
+ const std::vector<size_t>& indices,
+ std::vector<BSONObj>* errors,
+ boost::optional<repl::OpTime>* opTime,
+ boost::optional<OID>* electionId,
+ bool* containsRetry) const {
auto [batches, bucketStmtIds, _, canContinue] =
_insertIntoBucketCatalog(opCtx, start, numDocs, indices, errors, containsRetry);
@@ -1176,14 +1226,13 @@ public:
return docsToRetry;
}
- void _performUnorderedTimeseriesWritesWithRetries(
- OperationContext* opCtx,
- size_t start,
- size_t numDocs,
- std::vector<write_ops::WriteError>* errors,
- boost::optional<repl::OpTime>* opTime,
- boost::optional<OID>* electionId,
- bool* containsRetry) const {
+ void _performUnorderedTimeseriesWritesWithRetries(OperationContext* opCtx,
+ size_t start,
+ size_t numDocs,
+ std::vector<BSONObj>* errors,
+ boost::optional<repl::OpTime>* opTime,
+ boost::optional<OID>* electionId,
+ bool* containsRetry) const {
std::vector<size_t> docsToRetry;
do {
docsToRetry = _performUnorderedTimeseriesWrites(
@@ -1222,7 +1271,7 @@ public:
curOp.debug().additiveMetrics.ninserted = 0;
}
- std::vector<write_ops::WriteError> errors;
+ std::vector<BSONObj> errors;
boost::optional<repl::OpTime> opTime;
boost::optional<OID> electionId;
bool containsRetry = false;
@@ -1245,7 +1294,11 @@ public:
}
if (!errors.empty()) {
- baseReply.setWriteErrors(std::move(errors));
+ std::vector<write_ops::WriteError> writeErrors;
+ for (const auto& e : errors) {
+ writeErrors.emplace_back(write_ops::WriteError::parse(e));
+ }
+ baseReply.setWriteErrors(std::move(writeErrors));
}
if (opTime) {
baseReply.setOpTime(*opTime);