author     Dan Larkin-York <dan.larkin-york@mongodb.com>     2022-11-02 17:13:34 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2022-12-21 16:03:38 +0000
commit     e7acc50f7d2e0ee599a13721b6d235495c6c0ca9 (patch)
tree       cff13dd145571190e34ae7b156af79db79b51c6a
parent     9ba29038b9316acb6f0a5a414285ca39fddb3fa5 (diff)
download   mongo-e7acc50f7d2e0ee599a13721b6d235495c6c0ca9.tar.gz
SERVER-71020 SERVER-71939 Ensure aborting time series batch eventually removes bucket from catalog
(cherry picked from commit 6c1b68295151a074848f8cfd36060fc8949928fe) (cherry picked from commit bc4e993a11b434a7cf60aff416182a8ee640d6b3)
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog.cpp       |  15
-rw-r--r--  src/mongo/db/timeseries/bucket_catalog_test.cpp  | 185
2 files changed, 163 insertions(+), 37 deletions(-)
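
The core of the fix is in prepareCommit(): if a batch is aborted by another thread while this thread is blocked waiting on a conflicting prepared batch, the bucket itself must also be aborted, or it lingers in the catalog indefinitely. The following is a minimal sketch of the control flow the patch establishes; Status, Bucket, WriteBatch, and the helper functions here are hypothetical simplified stand-ins, not the real BucketCatalog interface.

#include <memory>
#include <string>

// Hypothetical stand-ins; only the branching mirrors the patched code.
struct Status {
    bool ok;
    std::string reason;
};
struct Bucket {};
struct WriteBatch {
    bool finished = false;
    Status result{true, ""};
};

// Stand-in: returns nullptr if the bucket was cleared while we waited.
Bucket* useBucketInPreparedState(const WriteBatch&) {
    static Bucket b;
    return &b;
}

// Stand-in: aborts the batch with the given status.
void abortBucket(WriteBatch& batch, const Status& why) {
    batch.finished = true;
    batch.result = why;
}

Status prepareCommitSketch(const std::shared_ptr<WriteBatch>& batch) {
    Bucket* bucket = useBucketInPreparedState(*batch);

    if (batch->finished) {
        // Someone aborted the batch while we were waiting. Before this patch
        // the function simply returned here, leaving the bucket behind in the
        // catalog; now the bucket is aborted as well.
        if (bucket) {
            abortBucket(*batch, batch->result);
        }
        return batch->result;
    }
    if (!bucket) {
        // The bucket was cleared out from under us: abort this batch instead.
        abortBucket(*batch, Status{false, "time-series bucket cleared"});
        return batch->result;
    }
    return Status{true, ""};  // Safe to proceed with the commit.
}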
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index 10e0e2df903..5a935ae439e 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -51,6 +51,7 @@ void normalizeObject(BSONObjBuilder* builder, const BSONObj& obj);
const auto getBucketCatalog = ServiceContext::declareDecoration<BucketCatalog>();
MONGO_FAIL_POINT_DEFINE(hangTimeseriesDirectModificationBeforeWriteConflict);
+MONGO_FAIL_POINT_DEFINE(hangWaitingForConflictingPreparedBatch);
uint8_t numDigits(uint32_t num) {
uint8_t numDigits = 0;
@@ -746,7 +747,11 @@ Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
_useBucketInState(&stripe, stripeLock, batch->bucket().id, BucketState::kPrepared);
if (batch->finished()) {
- // Someone may have aborted it while we were waiting.
+ // Someone may have aborted it while we were waiting. Since we have the prepared batch, we
+ // should now be able to fully abort the bucket.
+ if (bucket) {
+ _abort(&stripe, stripeLock, batch, getBatchStatus());
+ }
return getBatchStatus();
} else if (!bucket) {
_abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id));
@@ -1051,6 +1056,10 @@ void BucketCatalog::_waitToCommitBatch(Stripe* stripe, const std::shared_ptr<Wri
}
}
+ // We only hit this failpoint when there are conflicting prepared batches on the same
+ // bucket.
+ hangWaitingForConflictingPreparedBatch.pauseWhileSet();
+
// We have to wait for someone else to finish.
current->getResult().getStatus().ignore(); // We don't care about the result.
}
@@ -1108,7 +1117,7 @@ void BucketCatalog::_abort(Stripe* stripe,
// user is doing with it, but we need to keep the bucket around until
// that batch is finished.
if (auto& prepared = bucket->_preparedBatch) {
- if (prepared == batch) {
+ if (batch && prepared == batch) {
// We own the prepared batch, so we can go ahead and abort it and remove the bucket.
prepared->_abort(status);
prepared.reset();
@@ -1119,6 +1128,8 @@ void BucketCatalog::_abort(Stripe* stripe,
if (doRemove) {
[[maybe_unused]] bool removed = _removeBucket(stripe, stripeLock, bucket);
+ } else {
+ _setBucketState(bucket->id(), BucketState::kCleared);
}
}
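
The companion change in _abort() handles the case where the bucket cannot be removed immediately because another writer still owns a prepared batch on it. Previously the bucket simply remained usable; the patch now flips it to BucketState::kCleared, so new inserts open a fresh bucket and the owner's eventual finish (or abort) removes the old one. A condensed sketch of the patched branching, again with hypothetical simplified types:

#include <memory>

enum class BucketState { kNormal, kPrepared, kCleared };

struct WriteBatch {};
struct Bucket {
    std::shared_ptr<WriteBatch> preparedBatch;  // set while a batch is prepared
    BucketState state = BucketState::kNormal;
};

void removeBucket(Bucket&) { /* erase the bucket from the catalog */ }

void abortSketch(Bucket& bucket, const std::shared_ptr<WriteBatch>& batch) {
    bool doRemove = true;
    if (auto& prepared = bucket.preparedBatch) {
        // The null check on batch is added by this patch: _abort may be
        // called without a batch (e.g. when a bucket is cleared directly).
        if (batch && prepared == batch) {
            // We own the prepared batch: abort it and remove the bucket.
            prepared.reset();
        } else {
            // Another writer owns a prepared batch; keep the bucket alive
            // until that batch finishes.
            doRemove = false;
        }
    }
    if (doRemove) {
        removeBucket(bucket);
    } else {
        // New in this patch: mark the bucket cleared so that subsequent
        // inserts go to a new bucket instead of this doomed one.
        bucket.state = BucketState::kCleared;
    }
}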
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index b39b1065b89..99ce87d5b1b 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -43,17 +43,13 @@ namespace mongo {
namespace {
class BucketCatalogTest : public CatalogTestFixture {
protected:
- class Task {
- AtomicWord<bool> _running{false};
- stdx::packaged_task<void()> _task;
- stdx::future<void> _future;
+ class RunBackgroundTaskAndWaitForFailpoint {
stdx::thread _taskThread;
public:
- Task(std::function<void()>&& fn);
- ~Task();
-
- const stdx::future<void>& future();
+ RunBackgroundTaskAndWaitForFailpoint(const std::string& failpointName,
+ std::function<void()>&& fn);
+ ~RunBackgroundTaskAndWaitForFailpoint();
};
void setUp() override;
@@ -96,25 +92,6 @@ protected:
BSONObj _makeTimeseriesOptionsForCreate() const override;
};
-BucketCatalogTest::Task::Task(std::function<void()>&& fn)
- : _task{[this, fn = std::move(fn)]() {
- _running.store(true);
- fn();
- }},
- _future{_task.get_future()},
- _taskThread{std::move(_task)} {
- while (!_running.load()) {
- stdx::this_thread::yield();
- }
-}
-BucketCatalogTest::Task::~Task() {
- _taskThread.join();
-}
-
-const stdx::future<void>& BucketCatalogTest::Task::future() {
- return _future;
-}
-
void BucketCatalogTest::setUp() {
CatalogTestFixture::setUp();
@@ -129,6 +106,23 @@ void BucketCatalogTest::setUp() {
}
}
+BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::RunBackgroundTaskAndWaitForFailpoint(
+ const std::string& failpointName, std::function<void()>&& fn) {
+ auto fp = globalFailPointRegistry().find(failpointName);
+ auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0);
+
+ // Start background job.
+ _taskThread = stdx::thread(std::move(fn));
+
+ // Once we hit the failpoint once, turn it off.
+ fp->waitForTimesEntered(timesEntered + 1);
+ fp->setMode(FailPoint::off, 0);
+}
+
+BucketCatalogTest::RunBackgroundTaskAndWaitForFailpoint::~RunBackgroundTaskAndWaitForFailpoint() {
+ _taskThread.join();
+}
+
std::pair<ServiceContext::UniqueClient, ServiceContext::UniqueOperationContext>
BucketCatalogTest::_makeOperationContext() {
auto client = getServiceContext()->makeClient("BucketCatalogTest");
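
On the test side, the sleep-based Task helper (a 10 ms sleep plus a future wait_for) is replaced by a deterministic failpoint rendezvous. The pattern, condensed from the helper above (the FailPoint calls are the ones visible in this diff; the surrounding test scaffolding is omitted):

// Enable the failpoint and remember how many times it has been entered.
auto fp = globalFailPointRegistry().find("hangWaitingForConflictingPreparedBatch");
auto timesEntered = fp->setMode(FailPoint::alwaysOn, 0);

// Start the work that is expected to block, e.g. a conflicting prepareCommit().
stdx::thread worker([&] { /* ... */ });

// Returns only once the worker has actually reached the failpoint: no
// arbitrary sleeps, and no chance of asserting before the thread is parked.
fp->waitForTimesEntered(timesEntered + 1);
fp->setMode(FailPoint::off, 0);  // release the worker
worker.join();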
@@ -895,16 +889,137 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
// Batch 2 will not be able to commit until batch 1 has finished.
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
- auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }};
- // Add a little extra wait to make sure prepareCommit actually gets to the blocking point.
- stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
- ASSERT(task.future().valid());
- ASSERT(stdx::future_status::timeout == task.future().wait_for(stdx::chrono::microseconds(1)))
- << "prepareCommit finished before expected";
- _bucketCatalog->finish(batch1, {});
- task.future().wait();
+ {
+ auto task = RunBackgroundTaskAndWaitForFailpoint{
+ "hangWaitingForConflictingPreparedBatch",
+ [&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }};
+
+ // Finish the first batch.
+ _bucketCatalog->finish(batch1, {});
+ ASSERT(batch1->finished());
+ }
+
_bucketCatalog->finish(batch2, {});
+ ASSERT(batch2->finished());
+}
+
+TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
+ auto batch1 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+
+ auto batch2 = _bucketCatalog
+ ->insert(_makeOperationContext().second.get(),
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+ auto batch3 = _bucketCatalog
+ ->insert(_makeOperationContext().second.get(),
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+ ASSERT_EQ(batch1->bucket().id, batch2->bucket().id);
+ ASSERT_EQ(batch1->bucket().id, batch3->bucket().id);
+
+ ASSERT(batch1->claimCommitRights());
+ ASSERT(batch2->claimCommitRights());
+ ASSERT(batch3->claimCommitRights());
+
+ // Batch 2 will not be able to commit until batch 1 has finished.
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
+
+ {
+ auto task = RunBackgroundTaskAndWaitForFailpoint{
+ "hangWaitingForConflictingPreparedBatch",
+ [&]() { ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2)); }};
+
+ // If we abort the third batch, it should abort the second one too, as it isn't prepared.
+ // However, since the first batch is prepared, we can't abort it or clean up the bucket. We
+ // can then finish the first batch, which will allow the second batch to proceed. It should
+ // recognize it has been aborted and clean up the bucket.
+ _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
+ _bucketCatalog->finish(batch1, {});
+ ASSERT(batch1->finished());
+ }
+ // By this point the scoped task above has joined, so batch 2's prepareCommit() call has
+ // returned: once batch 1 finished, batch 2 was unblocked. Since batch 2 was aborted (via
+ // batch 3) while still unprepared, we expect that prepareCommit() call to have failed.
+ ASSERT(batch2->finished());
+
+ // Make sure a new batch ends up in a new bucket.
+ auto batch4 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+ ASSERT_NE(batch2->bucket().id, batch4->bucket().id);
+}
+
+TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
+ auto batch1 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+
+ auto batch2 = _bucketCatalog
+ ->insert(_makeOperationContext().second.get(),
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+
+ // Batch 1 and 2 use the same bucket.
+ ASSERT_EQ(batch1->bucket().id, batch2->bucket().id);
+ ASSERT(batch1->claimCommitRights());
+ ASSERT(batch2->claimCommitRights());
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
+
+ // Batch 1 will be in a prepared state now. Abort the second batch so that bucket 1 will be
+ // closed after batch 1 finishes.
+ _bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
+ _bucketCatalog->finish(batch1, {});
+ ASSERT(batch1->finished());
+ ASSERT(batch2->finished());
+
+ // Ensure a batch started after batch 2 aborts does not insert future measurements into the
+ // aborted batch/bucket.
+ auto batch3 = _bucketCatalog
+ ->insert(_opCtx,
+ _ns1,
+ _getCollator(_ns1),
+ _getTimeseriesOptions(_ns1),
+ BSON(_timeField << Date_t::now()),
+ BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
+ .getValue()
+ .batch;
+ ASSERT_NE(batch1->bucket().id, batch3->bucket().id);
}
TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {