diff options
author | Rui Liu <rui.liu@mongodb.com> | 2022-03-03 14:49:25 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-03 16:08:32 +0000 |
commit | 340e8998c2ff2058e4c8cbaff599acf34e38876b (patch) | |
tree | fd0f6d134ca6d6ff5345b1be70ff6c82b9ea18e0 | |
parent | 9fb59e90fdb97de986174f745bd76747ece5bc40 (diff) | |
download | mongo-340e8998c2ff2058e4c8cbaff599acf34e38876b.tar.gz |
SERVER-64110 Stop retrying unordered writes after committing ordered writes hits a non-continuable error
(cherry picked from commit d53632893e0a3915a63e54fa48b55813636eea0b)
-rw-r--r-- | src/mongo/db/commands/write_commands.cpp | 69 |
1 files changed, 43 insertions, 26 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index 865a69dbc7b..7a0a9f863da 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -846,12 +846,19 @@ public: return true; } - bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx, - TimeseriesBatches* batches, - TimeseriesStmtIds&& stmtIds, - std::vector<BSONObj>* errors, - boost::optional<repl::OpTime>* opTime, - boost::optional<OID>* electionId) const { + enum struct TimeseriesAtomicWriteResult { + kSuccess, + kContinuableError, + kNonContinuableError, + }; + + TimeseriesAtomicWriteResult _commitTimeseriesBucketsAtomically( + OperationContext* opCtx, + TimeseriesBatches* batches, + TimeseriesStmtIds&& stmtIds, + std::vector<BSONObj>* errors, + boost::optional<repl::OpTime>* opTime, + boost::optional<OID>* electionId) const { auto& bucketCatalog = BucketCatalog::get(opCtx); std::vector<std::reference_wrapper<std::shared_ptr<BucketCatalog::WriteBatch>>> @@ -864,7 +871,7 @@ public: } if (batchesToCommit.empty()) { - return true; + return TimeseriesAtomicWriteResult::kSuccess; } // Sort by bucket so that preparing the commit for each batch cannot deadlock. @@ -887,7 +894,7 @@ public: for (auto batch : batchesToCommit) { auto metadata = bucketCatalog.getMetadata(batch.get()->bucket()); if (!bucketCatalog.prepareCommit(batch)) { - return false; + return TimeseriesAtomicWriteResult::kContinuableError; } if (batch.get()->numPreviouslyCommittedMeasurements() == 0) { @@ -905,7 +912,7 @@ public: write_ops_exec::performAtomicTimeseriesWrites(opCtx, insertOps, updateOps); if (!result.isOK()) { abortStatus = result; - return false; + return TimeseriesAtomicWriteResult::kContinuableError; } getOpTimeAndElectionId(opCtx, opTime, electionId); @@ -931,12 +938,12 @@ public: if (!ret.result.isOK()) { abortStatus = ret.result.getStatus(); } - return false; + return TimeseriesAtomicWriteResult::kNonContinuableError; } } batchGuard.dismiss(); - return true; + return TimeseriesAtomicWriteResult::kSuccess; } std::tuple<TimeseriesBatches, @@ -1118,30 +1125,30 @@ public: } } - bool _performOrderedTimeseriesWritesAtomically(OperationContext* opCtx, - std::vector<BSONObj>* errors, - boost::optional<repl::OpTime>* opTime, - boost::optional<OID>* electionId, - bool* containsRetry) const { + TimeseriesAtomicWriteResult _performOrderedTimeseriesWritesAtomically( + OperationContext* opCtx, + std::vector<BSONObj>* errors, + boost::optional<repl::OpTime>* opTime, + boost::optional<OID>* electionId, + bool* containsRetry) const { auto [batches, stmtIds, numInserted, canContinue] = _insertIntoBucketCatalog( opCtx, 0, request().getDocuments().size(), {}, errors, containsRetry); if (!canContinue) { - // If we are not allowed to continue with any write operation return true here to - // prevent the ordered inserts from being retried one by one. - return true; + return TimeseriesAtomicWriteResult::kNonContinuableError; } hangTimeseriesInsertBeforeCommit.pauseWhileSet(); - if (!_commitTimeseriesBucketsAtomically( - opCtx, &batches, std::move(stmtIds), errors, opTime, electionId)) { - return false; + auto result = _commitTimeseriesBucketsAtomically( + opCtx, &batches, std::move(stmtIds), errors, opTime, electionId); + if (result != TimeseriesAtomicWriteResult::kSuccess) { + return result; } _getTimeseriesBatchResults( opCtx, batches, 0, batches.size(), true, errors, opTime, electionId); - return true; + return TimeseriesAtomicWriteResult::kSuccess; } /** @@ -1152,9 +1159,19 @@ public: boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, bool* containsRetry) const { - if (_performOrderedTimeseriesWritesAtomically( - opCtx, errors, opTime, electionId, containsRetry)) { - return request().getDocuments().size(); + auto result = _performOrderedTimeseriesWritesAtomically( + opCtx, errors, opTime, electionId, containsRetry); + switch (result) { + case TimeseriesAtomicWriteResult::kSuccess: + return request().getDocuments().size(); + case TimeseriesAtomicWriteResult::kNonContinuableError: + // If we can't continue, we know that 0 were inserted since this function should + // guarantee that the inserts are atomic. + return 0; + case TimeseriesAtomicWriteResult::kContinuableError: + break; + default: + MONGO_UNREACHABLE; } for (size_t i = 0; i < request().getDocuments().size(); ++i) { |