summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author    Dan Larkin-York <dan.larkin-york@mongodb.com>  2022-11-02 17:13:34 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-11-02 17:57:11 +0000
commit    6c1b68295151a074848f8cfd36060fc8949928fe (patch)
tree      e189845f4356d866a652e6e421934ebce66bf2ee
parent    8e3ffddd878f3a1faa508c2dbddd5e0cdec314a8 (diff)
download  mongo-6c1b68295151a074848f8cfd36060fc8949928fe.tar.gz
SERVER-71020 Ensure aborting time series batch eventually removes bucket from catalog
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp       |  6
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp  | 67
2 files changed, 72 insertions, 1 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 31cb39aeab0..854a2805d66 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -860,7 +860,11 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
_useBucketInState(&stripe, stripeLock, batch->bucket().id, BucketState::kPrepared);
if (batch->finished()) {
- // Someone may have aborted it while we were waiting.
+ // Someone may have aborted it while we were waiting. Since we have the prepared batch, we
+ // should now be able to fully abort the bucket.
+ if (bucket) {
+ _abort(&stripe, stripeLock, batch, getBatchStatus());
+ }
return getBatchStatus();
} else if (!bucket) {
_abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id));
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 6733f4c9d89..7253111cbf2 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -911,6 +911,73 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
_bucketCatalog->finish(batch2, {});
}
+TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
+ auto batch1 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+
+ auto batch2 = _bucketCatalog
+ ->insert(_makeOperationContext().second.get(),
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+ auto batch3 = _bucketCatalog
+ ->insert(_makeOperationContext().second.get(),
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+ ASSERT_EQ(batch1->bucket().id, batch2->bucket().id);
+ ASSERT_EQ(batch1->bucket().id, batch3->bucket().id);
+
+ ASSERT(batch1->claimCommitRights());
+ ASSERT(batch2->claimCommitRights());
+ ASSERT(batch3->claimCommitRights());
+
+ // Batch 2 will not be able to commit until batch 1 has finished.
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
+ auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }};
+ // Add a little extra wait to make sure prepareCommit actually gets to the blocking point.
+ stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
+ ASSERT(task.future().valid());
+ ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1)))
+ << "prepareCommit finished before expected";
+
+ // If we abort the third batch, it should abort the second one too, as it isn't prepared.
+ // However, since the first batch is prepared, we can't abort it or clean up the bucket. We can
+ // then finish the first batch, which will allow the second batch to proceed. It should
+ // recognize it has been aborted and clean up the bucket.
+ _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
+ _bucketCatalog->finish(batch1, {});
+ task.future().wait();
+ ASSERT(batch2->finished());
+
+ // Make sure a new batch ends up in a new bucket.
+ auto batch4 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+ ASSERT_NE(batch2->bucket().id, batch4->bucket().id);
+}
+
TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
auto batch1 = _bucketCatalog
->insert(_opCtx,