diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/write_commands.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 2 |
3 files changed, 32 insertions, 24 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index d794025b2c0..9be0940221a 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -770,10 +770,11 @@ public: invariant(batch->finished()); auto batchStatus = batch->getResult().getStatus(); tassert(5916402, - str::stream() << "Got unexpected error (" << batch->getResult().getStatus() + str::stream() << "Got unexpected error (" << batchStatus << ") preparing time-series bucket to be committed for " << ns() << ": " << redact(request().toBSON({})), batchStatus == ErrorCodes::TimeseriesBucketCleared || + batchStatus.isA<ErrorCategory::Interruption>() || batchStatus.isA<ErrorCategory::StaleShardVersionError>()); docsToRetry->push_back(index); @@ -863,15 +864,21 @@ public: return left.get()->bucket() < right.get()->bucket(); }); + boost::optional<Status> abortStatus; + ScopeGuard batchGuard{[&] { + for (auto batch : batchesToCommit) { + if (batch.get()) { + bucketCatalog.abort(batch, abortStatus); + } + } + }}; + std::vector<write_ops::InsertCommandRequest> insertOps; std::vector<write_ops::UpdateCommandRequest> updateOps; for (auto batch : batchesToCommit) { auto metadata = bucketCatalog.getMetadata(batch.get()->bucket()); if (!bucketCatalog.prepareCommit(batch)) { - for (auto batchToAbort : batchesToCommit) { - bucketCatalog.abort(batchToAbort); - } return false; } @@ -889,9 +896,7 @@ public: auto result = write_ops_exec::performAtomicTimeseriesWrites(opCtx, insertOps, updateOps); if (!result.isOK()) { - for (auto batch : batchesToCommit) { - bucketCatalog.abort(batch, result); - } + abortStatus = result; return false; } @@ -915,10 +920,14 @@ public: compressClosedBuckets = false; } if (!ret.canContinue) { + if (!ret.result.isOK()) { + abortStatus = ret.result.getStatus(); + } return false; } } + batchGuard.dismiss(); return true; } diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 3b5da81a3bb..c6fecf6f47b 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -299,8 +299,6 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { batch->_prepareCommit(); _memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage); - bucket->_batches.erase(batch->_opId); - return true; } @@ -351,18 +349,20 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( bucket.release(); auto lk = _lockExclusive(); - closedBucket = - ClosedBucket{ptr->_id, ptr->getTimeField().toString(), ptr->numMeasurements()}; - - // Only remove from _allBuckets and _idleBuckets. If it was marked full, we know that - // happened in BucketAccess::rollover, and that there is already a new open bucket for - // this metadata. - _markBucketNotIdle(ptr, false /* locked */); - { - stdx::lock_guard statesLk{_statesMutex}; - _bucketStates.erase(ptr->_id); + if (_allBuckets.contains(ptr)) { + closedBucket = + ClosedBucket{ptr->_id, ptr->getTimeField().toString(), ptr->numMeasurements()}; + + // Only remove from _allBuckets and _idleBuckets. If it was marked full, we know + // that happened in BucketAccess::rollover, and that there is already a new open + // bucket for this metadata. + _markBucketNotIdle(ptr, false /* locked */); + { + stdx::lock_guard statesLk{_statesMutex}; + _bucketStates.erase(ptr->_id); + } + _allBuckets.erase(ptr); } - _allBuckets.erase(ptr); } else { _markBucketIdle(bucket); } @@ -486,7 +486,7 @@ BucketCatalog::StripedMutex::ExclusiveLock BucketCatalog::_lockExclusive() const void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch) { while (true) { BucketAccess bucket{this, batch->bucket()}; - if (!bucket) { + if (!bucket || batch->finished()) { return; } @@ -494,6 +494,7 @@ void BucketCatalog::_waitToCommitBatch(const std::shared_ptr<WriteBatch>& batch) if (!current) { // No other batches for this bucket are currently committing, so we can proceed. bucket->_preparedBatch = batch; + bucket->_batches.erase(batch->_opId); break; } @@ -1239,7 +1240,6 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) { invariant(_commitRights.load()); invariant(!_active); _promise.emplaceValue(info); - _bucket = nullptr; } void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status, @@ -1257,7 +1257,6 @@ void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status, _promise.setError(status.value_or( Status{ErrorCodes::TimeseriesBucketCleared, str::stream() << "Time-series bucket " << bucketIdentification << "was cleared"})); - _bucket = nullptr; } class BucketCatalog::ServerStatus : public ServerStatusSection { diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index 2c2b1994443..9936a9be4c3 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -158,7 +158,7 @@ public: void _abort(const boost::optional<Status>& status, bool canAccessBucket); - Bucket* _bucket; + Bucket* const _bucket; OperationId _opId; std::shared_ptr<ExecutionStats> _stats; |