summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Larkin-York <dan.larkin-york@mongodb.com>2021-04-23 21:25:00 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-23 21:57:17 +0000
commit1be3fe09bef0be96ec5b162b89edf0ef1e26270a (patch)
tree23e0c13c1516c6cf2ecb1e7dc0f56bf05f59a1ce
parente46ccf98296b03819006153c87c85d3b87bd8987 (diff)
downloadmongo-1be3fe09bef0be96ec5b162b89edf0ef1e26270a.tar.gz
SERVER-56242 Clean up BucketCatalog state in case of exception
-rw-r--r--src/mongo/db/commands/write_commands.cpp5
-rw-r--r--src/mongo/db/timeseries/bucket_catalog.cpp17
2 files changed, 17 insertions, 5 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp
index 5901411c82d..9614a8f8eee 100644
--- a/src/mongo/db/commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands.cpp
@@ -644,6 +644,8 @@ public:
docsToRetry->push_back(index);
return;
}
+ // Now that the batch is prepared, make sure we clean up if we throw.
+ auto batchGuard = makeGuard([&] { bucketCatalog.abort(batch); });
hangTimeseriesInsertBeforeWrite.pauseWhileSet();
@@ -654,6 +656,7 @@ public:
if (auto error = generateError(opCtx, result, start + index, errors->size())) {
errors->push_back(*error);
bucketCatalog.abort(batch, result.getStatus());
+ batchGuard.dismiss();
return;
}
@@ -661,7 +664,6 @@ public:
result.getValue().getNModified() == 0) {
// No document in the buckets collection was found to update, meaning that it was
// removed.
- bucketCatalog.abort(batch);
docsToRetry->push_back(index);
return;
}
@@ -669,6 +671,7 @@ public:
getOpTimeAndElectionId(opCtx, opTime, electionId);
bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId});
+ batchGuard.dismiss();
}
bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx,
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index d5fd3a0c632..f11763c4a16 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -270,9 +270,10 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
invariant(!batch->finished());
invariant(!batch->active());
- BucketAccess bucket(this, batch->bucket());
-
+ Bucket* ptr(batch->bucket());
batch->_finish(info);
+
+ BucketAccess bucket(this, ptr);
if (bucket) {
invariant(_setBucketState(bucket->_id, BucketState::kNormal));
bucket->_preparedBatch.reset();
@@ -291,13 +292,21 @@ void BucketCatalog::finish(std::shared_ptr<WriteBatch> batch, const CommitInfo&
bucket->_numCommittedMeasurements += batch->measurements().size();
}
- if (bucket && bucket->allCommitted()) {
+ if (!bucket) {
+ // It's possible that we cleared the bucket in between preparing the commit and finishing
+ // here. In this case, we should abort any other ongoing batches and clear the bucket from
+ // the catalog so it's not hanging around idle.
+ auto lk = _lockExclusive();
+ if (_allBuckets.contains(ptr)) {
+ stdx::unique_lock blk{ptr->_mutex};
+ _abort(blk, ptr, nullptr, boost::none);
+ }
+ } else if (bucket->allCommitted()) {
if (bucket->_full) {
// Everything in the bucket has been committed, and nothing more will be added since the
// bucket is full. Thus, we can remove it.
_memoryUsage.fetchAndSubtract(bucket->_memoryUsage);
- Bucket* ptr(bucket);
bucket.release();
auto lk = _lockExclusive();