summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/write_commands.cpp23
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp31
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.h2
3 files changed, 32 insertions, 24 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index d794025b2c0..9be0940221a 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -770,10 +770,11 @@ public:
invariant(batch->finished());
auto batchStatus = batch->getResult().getStatus();
tassert(5916402,
- str::stream() << "Got unexpected error (" << batch->getResult().getStatus()
+ str::stream() << "Got unexpected error (" << batchStatus
<< ") preparing time-series bucket to be committed for "
<< ns() << ": " << redact(request().toBSON({})),
batchStatus == ErrorCodes::TimeseriesBucketCleared ||
+ batchStatus.isA<ErrorCategory::Interruption>() ||
batchStatus.isA<ErrorCategory::StaleShardVersionError>());
docsToRetry->push_back(index);
@@ -863,15 +864,21 @@ public:
return left.get()->bucket() < right.get()->bucket();
});
+ boost::optional<Status> abortStatus;
+ ScopeGuard batchGuard{[&] {
+ for (auto batch : batchesToCommit) {
+ if (batch.get()) {
+ bucketCatalog.abort(batch, abortStatus);
+ }
+ }
+ }};
+
std::vector<write_ops::InsertCommandRequest> insertOps;
std::vector<write_ops::UpdateCommandRequest> updateOps;
for (auto batch : batchesToCommit) {
auto metadata = bucketCatalog.getMetadata(batch.get()->bucket());
if (!bucketCatalog.prepareCommit(batch)) {
- for (auto batchToAbort : batchesToCommit) {
- bucketCatalog.abort(batchToAbort);
- }
return false;
}
@@ -889,9 +896,7 @@ public:
auto result =
write_ops_exec::performAtomicTimeseriesWrites(opCtx, insertOps, updateOps);
if (!result.isOK()) {
- for (auto batch : batchesToCommit) {
- bucketCatalog.abort(batch, result);
- }
+ abortStatus = result;
return false;
}
@@ -915,10 +920,14 @@ public:
compressClosedBuckets = false;
}
if (!ret.canContinue) {
+ if (!ret.result.isOK()) {
+ abortStatus = ret.result.getStatus();
+ }
return false;
}
}
+ batchGuard.dismiss();
return true;
}
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 3b5da81a3bb..c6fecf6f47b 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -299,8 +299,6 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
batch->_prepareCommit();
_memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage);
- bucket->_batches.erase(batch->_opId);
-
return true;
}
@@ -351,18 +349,20 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
bucket.release();
auto lk = _lockExclusive();
- closedBucket =
- ClosedBucket{ptr->_id, ptr->getTimeField().toString(), ptr->numMeasurements()};
-
- // Only remove from _allBuckets and _idleBuckets. If it was marked full, we know that
- // happened in BucketAccess::rollover, and that there is already a new open bucket for
- // this metadata.
- _markBucketNotIdle(ptr, false /* locked */);
- {
- stdx::lock_guard statesLk{_statesMutex};
- _bucketStates.erase(ptr->_id);
+ if (_allBuckets.contains(ptr)) {
+ closedBucket =
+ ClosedBucket{ptr->_id, ptr->getTimeField().toString(), ptr->numMeasurements()};
+
+ // Only remove from _allBuckets and _idleBuckets. If it was marked full, we know
+ // that happened in BucketAccess::rollover, and that there is already a new open
+ // bucket for this metadata.
+ _markBucketNotIdle(ptr, false /* locked */);
+ {
+ stdx::lock_guard statesLk{_statesMutex};
+ _bucketStates.erase(ptr->_id);
+ }
+ _allBuckets.erase(ptr);
}
- _allBuckets.erase(ptr);
} else {
_markBucketIdle(bucket);
}
@@ -486,7 +486,7 @@ BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::_lockExclusive() const
void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch) {
while (true) {
BucketAccess bucket{this, batch->bucket()};
- if (!bucket) {
+ if (!bucket || batch->finished()) {
return;
}
@@ -494,6 +494,7 @@ void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch)
if (!current) {
// No other batches for this bucket are currently committing, so we can proceed.
bucket->_preparedBatch = batch;
+ bucket->_batches.erase(batch->_opId);
break;
}
@@ -1239,7 +1240,6 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) {
invariant(_commitRights.load());
invariant(!_active);
_promise.emplaceValue(info);
- _bucket = nullptr;
}
void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status,
@@ -1257,7 +1257,6 @@ void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status,
_promise.setError(status.value_or(
Status{ErrorCodes::TimeseriesBucketCleared,
str::stream() << "Time-series bucket " << bucketIdentification << "was cleared"}));
- _bucket = nullptr;
}
class BucketCatalog::ServerStatus : public ServerStatusSection {
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index 2c2b1994443..9936a9be4c3 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -158,7 +158,7 @@ public:
void _abort(const boost::optional<Status>& status, bool canAccessBucket);
- Bucket* _bucket;
+ Bucket* const _bucket;
OperationId _opId;
std::shared_ptr<ExecutionStats> _stats;