diff options
author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2022-11-02 17:13:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-11-02 17:57:11 +0000 |
commit | 6c1b68295151a074848f8cfd36060fc8949928fe (patch) | |
tree | e189845f4356d866a652e6e421934ebce66bf2ee | |
parent | 8e3ffddd878f3a1faa508c2dbddd5e0cdec314a8 (diff) | |
download | mongo-6c1b68295151a074848f8cfd36060fc8949928fe.tar.gz |
SERVER-71020 Ensure aborting time series batch eventually removes bucket from catalog
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 67 |
2 files changed, 72 insertions, 1 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index 31cb39aeab0..854a2805d66 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -860,7 +860,11 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { _useBucketInState(&stripe, stripeLock, batch->bucket().id, BucketState::kPrepared); if (batch->finished()) { - // Someone may have aborted it while we were waiting. + // Someone may have aborted it while we were waiting. Since we have the prepared batch, we + // should now be able to fully abort the bucket. + if (bucket) { + _abort(&stripe, stripeLock, batch, getBatchStatus()); + } return getBatchStatus(); } else if (!bucket) { _abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id)); diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 6733f4c9d89..7253111cbf2 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -911,6 +911,73 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { _bucketCatalog->finish(batch2, {}); } +TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { + auto batch1 = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + + auto batch2 = _bucketCatalog + ->insert(_makeOperationContext().second.get(), + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + auto batch3 = _bucketCatalog + ->insert(_makeOperationContext().second.get(), + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + ASSERT_EQ(batch1->bucket().id, batch2->bucket().id); + ASSERT_EQ(batch1->bucket().id, batch3->bucket().id); + + ASSERT(batch1->claimCommitRights()); + ASSERT(batch2->claimCommitRights()); + ASSERT(batch3->claimCommitRights()); + + // Batch 2 will not be able to commit until batch 1 has finished. + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }}; + // Add a little extra wait to make sure prepareCommit actually gets to the blocking point. + stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10)); + ASSERT(task.future().valid()); + ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1))) + << "prepareCommit finished before expected"; + + // If we abort the third batch, it should abort the second one too, as it isn't prepared. + // However, since the first batch is prepared, we can't abort it or clean up the bucket. We can + // then finish the first batch, which will allow the second batch to proceed. It should + // recognize it has been aborted and clean up the bucket. + _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); + _bucketCatalog->finish(batch1, {}); + task.future().wait(); + ASSERT(batch2->finished()); + + // Make sure a new batch ends up in a new bucket. + auto batch4 = _bucketCatalog + ->insert(_opCtx, + _ns1, + _getCollator(_ns1), + _getTimeseriesOptions(_ns1), + BSON(_timeField << Date_t::now()), + BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) + .getValue() + .batch; + ASSERT_NE(batch2->bucket().id, batch4->bucket().id); +} + TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { auto batch1 = _bucketCatalog ->insert(_opCtx, |