diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/write_commands.cpp | 177 |
1 files changed, 115 insertions, 62 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index 86e2e49c775..9b63f16641c 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -319,45 +319,78 @@ boost::optional<std::pair<Status, bool>> checkFailUnorderedTimeseriesInsertFailP return boost::none; } -boost::optional<write_ops::WriteError> generateError(OperationContext* opCtx, - const Status& status, - int index, - size_t numErrors) { +boost::optional<BSONObj> generateError(OperationContext* opCtx, + const Status& status, + int index, + size_t numErrors) { if (status.isOK()) { return boost::none; } - boost::optional<Status> overwrittenStatus; + auto errorMessage = [numErrors, errorSize = size_t(0)](StringData rawMessage) mutable { + // Start truncating error messages once both of these limits are exceeded. + constexpr size_t kErrorSizeTruncationMin = 1024 * 1024; + constexpr size_t kErrorCountTruncationMin = 2; + if (errorSize >= kErrorSizeTruncationMin && numErrors >= kErrorCountTruncationMin) { + return ""_sd; + } + + errorSize += rawMessage.size(); + return rawMessage; + }; - if (status == ErrorCodes::TenantMigrationConflict) { + BSONSizeTracker errorsSizeTracker; + BSONObjBuilder error(errorsSizeTracker); + error.append("index", index); + if (auto staleInfo = status.extraInfo<StaleConfigInfo>()) { + error.append("code", int(ErrorCodes::StaleShardVersion)); // Different from exception! + { + BSONObjBuilder errInfo(error.subobjStart("errInfo")); + staleInfo->serialize(&errInfo); + } + } else if (auto docValidationError = + status.extraInfo<doc_validation_error::DocumentValidationFailureInfo>()) { + error.append("code", static_cast<int>(ErrorCodes::DocumentValidationFailure)); + error.append("errInfo", docValidationError->getDetails()); + } else if (status.code() == ErrorCodes::TenantMigrationConflict) { hangWriteBeforeWaitingForMigrationDecision.pauseWhileSet(opCtx); - overwrittenStatus.emplace( - tenant_migration_access_blocker::handleTenantMigrationConflict(opCtx, status)); + auto migrationStatus = + tenant_migration_access_blocker::handleTenantMigrationConflict(opCtx, status); // Interruption errors encountered during batch execution fail the entire batch, so throw on // such errors here for consistency. - if (ErrorCodes::isInterruption(*overwrittenStatus)) { - uassertStatusOK(*overwrittenStatus); + if (ErrorCodes::isInterruption(migrationStatus)) { + uassertStatusOK(migrationStatus); } - } - constexpr size_t kErrorCountTruncationMin = 2; - if (numErrors >= kErrorCountTruncationMin) - overwrittenStatus = - overwrittenStatus ? overwrittenStatus->withReason("") : status.withReason(""); + error.append("code", migrationStatus.code()); - if (overwrittenStatus) - return write_ops::WriteError(index, std::move(*overwrittenStatus)); - else - return write_ops::WriteError(index, status); + // We want to append an empty errmsg for the errors after the first one, so let the + // code below that appends errmsg do that. + if (status.reason() != "") { + error.append("errmsg", errorMessage(migrationStatus.reason())); + } + } else { + error.append("code", int(status.code())); + if (auto const extraInfo = status.extraInfo()) { + extraInfo->serialize(&error); + } + } + + // Skip appending errmsg if it has already been appended like in the case of + // TenantMigrationConflict. + if (!error.hasField("errmsg")) { + error.append("errmsg", errorMessage(status.reason())); + } + return error.obj(); } template <typename T> -boost::optional<write_ops::WriteError> generateError(OperationContext* opCtx, - const StatusWith<T>& result, - int index, - size_t numErrors) { +boost::optional<BSONObj> generateError(OperationContext* opCtx, + const StatusWith<T>& result, + int index, + size_t numErrors) { return generateError(opCtx, result.getStatus(), index, numErrors); } @@ -406,10 +439,11 @@ void populateReply(OperationContext* opCtx, } long long nVal = 0; - std::vector<write_ops::WriteError> errors; + std::vector<BSONObj> errors; + for (size_t i = 0; i < result.results.size(); ++i) { if (auto error = generateError(opCtx, result.results[i], i, errors.size())) { - errors.emplace_back(std::move(*error)); + errors.push_back(*error); continue; } @@ -422,13 +456,17 @@ void populateReply(OperationContext* opCtx, } auto& replyBase = cmdReply->getWriteCommandReplyBase(); - replyBase.setN(nVal); if (!result.retriedStmtIds.empty()) { - replyBase.setRetriedStmtIds(std::move(result.retriedStmtIds)); + replyBase.setRetriedStmtIds(result.retriedStmtIds); } + if (!errors.empty()) { - replyBase.setWriteErrors(std::move(errors)); + std::vector<write_ops::WriteError> writeErrors; + for (const auto& e : errors) { + writeErrors.emplace_back(write_ops::WriteError::parse(e)); + } + replyBase.setWriteErrors(std::move(writeErrors)); } // writeConcernError field is handled by command processor. @@ -446,6 +484,7 @@ void populateReply(OperationContext* opCtx, } } + // Call the called-defined post processing handler. if (hooks && hooks->postProcessHandler) hooks->postProcessHandler(); } @@ -737,7 +776,7 @@ public: size_t start, size_t index, std::vector<StmtId>&& stmtIds, - std::vector<write_ops::WriteError>* errors, + std::vector<BSONObj>* errors, boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, std::vector<size_t>* docsToRetry) const { @@ -762,7 +801,7 @@ public: _performTimeseriesInsert(opCtx, batch, metadata, std::move(stmtIds)); if (auto error = generateError(opCtx, output.result, start + index, errors->size())) { - errors->emplace_back(std::move(*error)); + errors->push_back(*error); bucketCatalog.abort(batch, output.result.getStatus()); batchGuard.dismiss(); return output.canContinue; @@ -777,7 +816,7 @@ public: _performTimeseriesUpdate(opCtx, batch, metadata, std::move(stmtIds)); if (auto error = generateError(opCtx, output.result, start + index, errors->size())) { - errors->emplace_back(std::move(*error)); + errors->push_back(*error); bucketCatalog.abort(batch, output.result.getStatus()); batchGuard.dismiss(); return output.canContinue; @@ -801,7 +840,7 @@ public: auto output = _performTimeseriesBucketCompression(opCtx, *closedBucket); if (auto error = generateError(opCtx, output.result, start + index, errors->size())) { - errors->emplace_back(std::move(*error)); + errors->push_back(*error); return output.canContinue; } } @@ -811,7 +850,7 @@ public: bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx, TimeseriesBatches* batches, TimeseriesStmtIds&& stmtIds, - std::vector<write_ops::WriteError>* errors, + std::vector<BSONObj>* errors, boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId) const { auto& bucketCatalog = BucketCatalog::get(opCtx); @@ -909,7 +948,7 @@ public: size_t start, size_t numDocs, const std::vector<size_t>& indices, - std::vector<write_ops::WriteError>* errors, + std::vector<BSONObj>* errors, bool* containsRetry) const { auto& bucketCatalog = BucketCatalog::get(opCtx); @@ -953,7 +992,7 @@ public: _canCombineTimeseriesInsertWithOtherClients(opCtx)); if (auto error = generateError(opCtx, result, start + index, errors->size())) { - errors->emplace_back(std::move(*error)); + errors->push_back(*error); return false; } else { const auto& batch = result.getValue().batch; @@ -977,7 +1016,7 @@ public: // Bucket compression only fail when we may not try to perform any other // write operation. When handleError() inside write_ops_exec.cpp return // false. - errors->emplace_back(std::move(*error)); + errors->push_back(*error); canContinue = false; return false; } @@ -1003,16 +1042,28 @@ public: canContinue}; } + BSONObj _cloneErrorWithIndex(BSONObj error, size_t index) const { + BSONObjBuilder bob; + for (auto&& elem : error) { + if (elem.fieldNameStringData() == "index") { + bob.append("index", static_cast<int>(index)); + } else { + bob.append(elem); + } + } + return bob.obj(); + } + void _getTimeseriesBatchResults(OperationContext* opCtx, const TimeseriesBatches& batches, size_t start, size_t indexOfLastProcessedBatch, bool canContinue, - std::vector<write_ops::WriteError>* errors, + std::vector<BSONObj>* errors, boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, std::vector<size_t>* docsToRetry = nullptr) const { - boost::optional<write_ops::WriteError> lastError; + boost::optional<BSONObj> lastError; if (!errors->empty()) { lastError = errors->back(); } @@ -1033,7 +1084,7 @@ public: 6023100, "there should be at least one error if the batch processing exited early", lastError); - errors->emplace_back(start + index, lastError->getStatus()); + errors->push_back(_cloneErrorWithIndex(*lastError, start + index)); continue; } @@ -1045,7 +1096,7 @@ public: } if (auto error = generateError( opCtx, swCommitInfo.getStatus(), start + index, errors->size())) { - errors->emplace_back(std::move(*error)); + errors->push_back(*error); continue; } @@ -1062,14 +1113,14 @@ public: // error. if (!canContinue && docsToRetry) { for (auto&& index : *docsToRetry) { - errors->emplace_back(start + index, lastError->getStatus()); + errors->push_back(_cloneErrorWithIndex(*lastError, start + index)); } docsToRetry->clear(); } } bool _performOrderedTimeseriesWritesAtomically(OperationContext* opCtx, - std::vector<write_ops::WriteError>* errors, + std::vector<BSONObj>* errors, boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, bool* containsRetry) const { @@ -1098,7 +1149,7 @@ public: * Returns the number of documents that were inserted. */ size_t _performOrderedTimeseriesWrites(OperationContext* opCtx, - std::vector<write_ops::WriteError>* errors, + std::vector<BSONObj>* errors, boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, bool* containsRetry) const { @@ -1124,15 +1175,14 @@ public: * can be passed as the 'indices' parameter in a subsequent call to this function, in order * to to be retried. */ - std::vector<size_t> _performUnorderedTimeseriesWrites( - OperationContext* opCtx, - size_t start, - size_t numDocs, - const std::vector<size_t>& indices, - std::vector<write_ops::WriteError>* errors, - boost::optional<repl::OpTime>* opTime, - boost::optional<OID>* electionId, - bool* containsRetry) const { + std::vector<size_t> _performUnorderedTimeseriesWrites(OperationContext* opCtx, + size_t start, + size_t numDocs, + const std::vector<size_t>& indices, + std::vector<BSONObj>* errors, + boost::optional<repl::OpTime>* opTime, + boost::optional<OID>* electionId, + bool* containsRetry) const { auto [batches, bucketStmtIds, _, canContinue] = _insertIntoBucketCatalog(opCtx, start, numDocs, indices, errors, containsRetry); @@ -1176,14 +1226,13 @@ public: return docsToRetry; } - void _performUnorderedTimeseriesWritesWithRetries( - OperationContext* opCtx, - size_t start, - size_t numDocs, - std::vector<write_ops::WriteError>* errors, - boost::optional<repl::OpTime>* opTime, - boost::optional<OID>* electionId, - bool* containsRetry) const { + void _performUnorderedTimeseriesWritesWithRetries(OperationContext* opCtx, + size_t start, + size_t numDocs, + std::vector<BSONObj>* errors, + boost::optional<repl::OpTime>* opTime, + boost::optional<OID>* electionId, + bool* containsRetry) const { std::vector<size_t> docsToRetry; do { docsToRetry = _performUnorderedTimeseriesWrites( @@ -1222,7 +1271,7 @@ public: curOp.debug().additiveMetrics.ninserted = 0; } - std::vector<write_ops::WriteError> errors; + std::vector<BSONObj> errors; boost::optional<repl::OpTime> opTime; boost::optional<OID> electionId; bool containsRetry = false; @@ -1245,7 +1294,11 @@ public: } if (!errors.empty()) { - baseReply.setWriteErrors(std::move(errors)); + std::vector<write_ops::WriteError> writeErrors; + for (const auto& e : errors) { + writeErrors.emplace_back(write_ops::WriteError::parse(e)); + } + baseReply.setWriteErrors(std::move(writeErrors)); } if (opTime) { baseReply.setOpTime(*opTime); |