author     Rui Liu <rui.liu@mongodb.com>                     2022-03-03 14:49:25 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-03-03 16:43:44 +0000
commit     a4a6069c61519fb4473d3c9d77276476f19836c9 (patch)
tree       03c8e6a78f408b3d3bb6794b23c3b813349eb33a
parent     2041c2351fef9734e0b6452b1f459d590de9bc2d (diff)
download   mongo-a4a6069c61519fb4473d3c9d77276476f19836c9.tar.gz
SERVER-64110 Stop retrying unordered writes after committing ordered writes hits a non-continuable error
(cherry picked from commit d53632893e0a3915a63e54fa48b55813636eea0b)
-rw-r--r--  src/mongo/db/commands/write_commands.cpp | 69
1 file changed, 43 insertions(+), 26 deletions(-)
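
Before the diff itself, here is a minimal sketch of the pattern the patch introduces: the bool returned by the atomic commit path is replaced with a three-valued result so the caller can tell a failure that is safe to retry per document apart from one that must abort the whole batch. All names below (WriteResult, performOrderedWrites, commitAtomically, docCount) are hypothetical stand-ins for illustration, not the code in write_commands.cpp.

// Hypothetical sketch of the tri-state result pattern; not MongoDB code.
#include <cstddef>
#include <functional>
#include <iostream>

enum struct WriteResult {
    kSuccess,              // all documents committed atomically
    kContinuableError,     // atomic path failed, but per-document retries are safe
    kNonContinuableError,  // atomic path failed and retries must be skipped
};

// Mirrors the caller in the diff: try the atomic path first, and only fall
// back to one-by-one retries on a continuable error.
std::size_t performOrderedWrites(
    std::size_t docCount,
    const std::function<WriteResult(std::size_t)>& commitAtomically) {
    switch (commitAtomically(docCount)) {
        case WriteResult::kSuccess:
            return docCount;  // every document was inserted
        case WriteResult::kNonContinuableError:
            return 0;         // atomic path is all-or-nothing, so nothing was inserted
        case WriteResult::kContinuableError:
            break;            // fall through to per-document retries
    }
    std::size_t inserted = 0;
    for (std::size_t i = 0; i < docCount; ++i) {
        ++inserted;  // placeholder for retrying each document individually
    }
    return inserted;
}

int main() {
    auto alwaysFatal = [](std::size_t) { return WriteResult::kNonContinuableError; };
    // Prints 0: with the old bool return this case was indistinguishable from
    // "retry each document one by one", which is what SERVER-64110 fixes.
    std::cout << performOrderedWrites(3, alwaysFatal) << '\n';
}
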
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 5f84b93af1e..bd8d56a80ca 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -843,12 +843,19 @@ public:
return true;
}
- bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx,
- TimeseriesBatches* batches,
- TimeseriesStmtIds&& stmtIds,
- std::vector<BSONObj>* errors,
- boost::optional<repl::OpTime>* opTime,
- boost::optional<OID>* electionId) const {
+ enum struct TimeseriesAtomicWriteResult {
+ kSuccess,
+ kContinuableError,
+ kNonContinuableError,
+ };
+
+ TimeseriesAtomicWriteResult _commitTimeseriesBucketsAtomically(
+ OperationContext* opCtx,
+ TimeseriesBatches* batches,
+ TimeseriesStmtIds&& stmtIds,
+ std::vector<BSONObj>* errors,
+ boost::optional<repl::OpTime>* opTime,
+ boost::optional<OID>* electionId) const {
auto& bucketCatalog = BucketCatalog::get(opCtx);
std::vector<std::reference_wrapper<std::shared_ptr<BucketCatalog::WriteBatch>>>
@@ -861,7 +868,7 @@ public:
}
if (batchesToCommit.empty()) {
- return true;
+ return TimeseriesAtomicWriteResult::kSuccess;
}
// Sort by bucket so that preparing the commit for each batch cannot deadlock.
@@ -884,7 +891,7 @@ public:
for (auto batch : batchesToCommit) {
auto metadata = bucketCatalog.getMetadata(batch.get()->bucketId());
if (!bucketCatalog.prepareCommit(batch)) {
- return false;
+ return TimeseriesAtomicWriteResult::kContinuableError;
}
if (batch.get()->numPreviouslyCommittedMeasurements() == 0) {
@@ -902,7 +909,7 @@ public:
write_ops_exec::performAtomicTimeseriesWrites(opCtx, insertOps, updateOps);
if (!result.isOK()) {
abortStatus = result;
- return false;
+ return TimeseriesAtomicWriteResult::kContinuableError;
}
getOpTimeAndElectionId(opCtx, opTime, electionId);
@@ -928,12 +935,12 @@ public:
if (!ret.result.isOK()) {
abortStatus = ret.result.getStatus();
}
- return false;
+ return TimeseriesAtomicWriteResult::kNonContinuableError;
}
}
batchGuard.dismiss();
- return true;
+ return TimeseriesAtomicWriteResult::kSuccess;
}
std::tuple<TimeseriesBatches,
@@ -1115,30 +1122,30 @@ public:
}
}
- bool _performOrderedTimeseriesWritesAtomically(OperationContext* opCtx,
- std::vector<BSONObj>* errors,
- boost::optional<repl::OpTime>* opTime,
- boost::optional<OID>* electionId,
- bool* containsRetry) const {
+ TimeseriesAtomicWriteResult _performOrderedTimeseriesWritesAtomically(
+ OperationContext* opCtx,
+ std::vector<BSONObj>* errors,
+ boost::optional<repl::OpTime>* opTime,
+ boost::optional<OID>* electionId,
+ bool* containsRetry) const {
auto [batches, stmtIds, numInserted, canContinue] = _insertIntoBucketCatalog(
opCtx, 0, request().getDocuments().size(), {}, errors, containsRetry);
if (!canContinue) {
- // If we are not allowed to continue with any write operation return true here to
- // prevent the ordered inserts from being retried one by one.
- return true;
+ return TimeseriesAtomicWriteResult::kNonContinuableError;
}
hangTimeseriesInsertBeforeCommit.pauseWhileSet();
- if (!_commitTimeseriesBucketsAtomically(
- opCtx, &batches, std::move(stmtIds), errors, opTime, electionId)) {
- return false;
+ auto result = _commitTimeseriesBucketsAtomically(
+ opCtx, &batches, std::move(stmtIds), errors, opTime, electionId);
+ if (result != TimeseriesAtomicWriteResult::kSuccess) {
+ return result;
}
_getTimeseriesBatchResults(
opCtx, batches, 0, batches.size(), true, errors, opTime, electionId);
- return true;
+ return TimeseriesAtomicWriteResult::kSuccess;
}
/**
@@ -1149,9 +1156,19 @@ public:
boost::optional<repl::OpTime>* opTime,
boost::optional<OID>* electionId,
bool* containsRetry) const {
- if (_performOrderedTimeseriesWritesAtomically(
- opCtx, errors, opTime, electionId, containsRetry)) {
- return request().getDocuments().size();
+ auto result = _performOrderedTimeseriesWritesAtomically(
+ opCtx, errors, opTime, electionId, containsRetry);
+ switch (result) {
+ case TimeseriesAtomicWriteResult::kSuccess:
+ return request().getDocuments().size();
+ case TimeseriesAtomicWriteResult::kNonContinuableError:
+ // If we can't continue, we know that 0 were inserted since this function should
+ // guarantee that the inserts are atomic.
+ return 0;
+ case TimeseriesAtomicWriteResult::kContinuableError:
+ break;
+ default:
+ MONGO_UNREACHABLE;
}
for (size_t i = 0; i < request().getDocuments().size(); ++i) {