diff options
author:    Dan Larkin-York <dan.larkin-york@mongodb.com>    2021-04-23 21:25:00 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-04-23 21:57:17 +0000
commit:    1be3fe09bef0be96ec5b162b89edf0ef1e26270a (patch)
tree:      23e0c13c1516c6cf2ecb1e7dc0f56bf05f59a1ce
parent:    e46ccf98296b03819006153c87c85d3b87bd8987 (diff)
download:  mongo-1be3fe09bef0be96ec5b162b89edf0ef1e26270a.tar.gz
SERVER-56242 Clean up BucketCatalog state in case of exception
-rw-r--r--  src/mongo/db/commands/write_commands.cpp   |  5
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp | 17
2 files changed, 17 insertions, 5 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 5901411c82d..9614a8f8eee 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -644,6 +644,8 @@ public:
             docsToRetry->push_back(index);
             return;
         }
+        // Now that the batch is prepared, make sure we clean up if we throw.
+        auto batchGuard = makeGuard([&] { bucketCatalog.abort(batch); });

         hangTimeseriesInsertBeforeWrite.pauseWhileSet();

@@ -654,6 +656,7 @@ public:
         if (auto error = generateError(opCtx, result, start + index, errors->size())) {
             errors->push_back(*error);
             bucketCatalog.abort(batch, result.getStatus());
+            batchGuard.dismiss();
             return;
         }

@@ -661,7 +664,6 @@ public:
             result.getValue().getNModified() == 0) {
             // No document in the buckets collection was found to update, meaning that it was
             // removed.
-            bucketCatalog.abort(batch);
             docsToRetry->push_back(index);
             return;
         }

@@ -669,6 +671,7 @@ public:
         getOpTimeAndElectionId(opCtx, opTime, electionId);

         bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
+        batchGuard.dismiss();
     }

     bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx,
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index d5fd3a0c632..f11763c4a16 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -270,9 +270,10 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
     invariant(!batch->finished());
     invariant(!batch->active());

-    BucketAccess bucket(this, batch->bucket());
-
+    Bucket* ptr(batch->bucket());
     batch->_finish(info);
+
+    BucketAccess bucket(this, ptr);

     if (bucket) {
         invariant(_setBucketState(bucket->_id, BucketState::kNormal));
         bucket->_preparedBatch.reset();
@@ -291,13 +292,21 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
         bucket->_numCommittedMeasurements += batch->measurements().size();
     }

-    if (bucket && bucket->allCommitted()) {
+    if (!bucket) {
+        // It's possible that we cleared the bucket in between preparing the commit and finishing
+        // here. In this case, we should abort any other ongoing batches and clear the bucket from
+        // the catalog so it's not hanging around idle.
+        auto lk = _lockExclusive();
+        if (_allBuckets.contains(ptr)) {
+            stdx::unique_lock blk{ptr->_mutex};
+            _abort(blk, ptr, nullptr, boost::none);
+        }
+    } else if (bucket->allCommitted()) {
         if (bucket->_full) {
             // Everything in the bucket has been committed, and nothing more will be added since the
             // bucket is full. Thus, we can remove it.
             _memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
-            Bucket* ptr(bucket);
             bucket.release();
             auto lk = _lockExclusive();