summary | refs | log | tree | commit | diff
path: root/src/mongo/db/timeseries
diff options
context:
space:
mode:
author: Gregory Noma <gregory.noma@gmail.com> 2022-02-28 16:55:25 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-02-28 17:39:42 +0000
commit: 335bef8b6c3632ce7228553ef1c8fa3522768858 (patch)
tree: 8ed3f2fc3775c83eef237bf8523da0bd0d2568c6 /src/mongo/db/timeseries
parent: c05a030a3e807d1f444b4ee91b2331d5b7c19916 (diff)
download: mongo-335bef8b6c3632ce7228553ef1c8fa3522768858.tar.gz
SERVER-61010 Require a `Status` for `BucketCatalog::abort`
Diffstat (limited to 'src/mongo/db/timeseries')
-rw-r--r-- src/mongo/db/timeseries/bucket_catalog.cpp      | 68
-rw-r--r-- src/mongo/db/timeseries/bucket_catalog.h        | 21
-rw-r--r-- src/mongo/db/timeseries/bucket_catalog_test.cpp | 42
3 files changed, 72 insertions, 59 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp
index d817f3e1bc5..f2710fff960 100644
--- a/src/mongo/db/timeseries/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog.cpp
@@ -192,6 +192,17 @@ std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOpti
return {bucketId, roundedTime};
}
+
+Status getTimeseriesBucketClearedError(const OID& bucketId,
+ const boost::optional<NamespaceString>& ns = boost::none) {
+ std::string nsIdentification;
+ if (ns) {
+ nsIdentification.assign(str::stream() << " for namespace " << *ns);
+ }
+ return {ErrorCodes::TimeseriesBucketCleared,
+ str::stream() << "Time-series bucket " << bucketId << nsIdentification
+ << " was cleared"};
+}
} // namespace
struct BucketCatalog::ExecutionStats {
@@ -524,19 +535,12 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) {
_promise.emplaceValue(info);
}
-void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status,
- const Bucket* bucket) {
+void BucketCatalog::WriteBatch::_abort(const Status& status) {
if (finished()) {
return;
}
- std::string nsIdentification;
- if (bucket) {
- nsIdentification.append(str::stream() << " for namespace " << bucket->_ns);
- }
- _promise.setError(status.value_or(Status{ErrorCodes::TimeseriesBucketCleared,
- str::stream() << "Time-series bucket " << _bucket.id
- << nsIdentification << " was cleared"}));
+ _promise.setError(status);
}
BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) {
@@ -675,10 +679,12 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert(
return InsertResult{batch, closedBuckets};
}
-bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
+Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
+ auto getBatchStatus = [&] { return batch->_promise.getFuture().getNoThrow().getStatus(); };
+
if (batch->finished()) {
// In this case, someone else aborted the batch behind our back. Oops.
- return false;
+ return getBatchStatus();
}
auto& stripe = _stripes[batch->bucket().stripe];
@@ -690,17 +696,17 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) {
if (batch->finished()) {
// Someone may have aborted it while we were waiting.
- return false;
+ return getBatchStatus();
} else if (!bucket) {
- _abort(&stripe, stripeLock, batch, boost::none);
- return false;
+ _abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id));
+ return getBatchStatus();
}
auto prevMemoryUsage = bucket->_memoryUsage;
batch->_prepareCommit(bucket);
_memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage);
- return true;
+ return Status::OK();
}
boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
@@ -741,7 +747,11 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
if (it != stripe.allBuckets.end()) {
bucket = it->second.get();
bucket->_preparedBatch.reset();
- _abort(&stripe, stripeLock, bucket, nullptr, boost::none);
+ _abort(&stripe,
+ stripeLock,
+ bucket,
+ nullptr,
+ getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
}
} else if (bucket->allCommitted()) {
if (bucket->_full) {
@@ -772,8 +782,7 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish(
return closedBucket;
}
-void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status) {
+void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, const Status& status) {
invariant(batch);
invariant(batch->_commitRights.load());
@@ -807,7 +816,11 @@ void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& sho
stdx::lock_guard catalogLock{_mutex};
_executionStats.erase(bucket->_ns);
}
- _abort(&stripe, stripeLock, bucket.get(), nullptr, boost::none);
+ _abort(&stripe,
+ stripeLock,
+ bucket.get(),
+ nullptr,
+ getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
}
it = nextIt;
@@ -948,7 +961,12 @@ BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe,
return bucket;
}
- _abort(stripe, stripeLock, bucket, nullptr, boost::none);
+ _abort(stripe,
+ stripeLock,
+ bucket,
+ nullptr,
+ getTimeseriesBucketClearedError(bucket->id(), bucket->_ns));
+
return _allocateBucket(stripe, stripeLock, info);
}
@@ -1000,12 +1018,12 @@ bool BucketCatalog::_removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* b
void BucketCatalog::_abort(Stripe* stripe,
WithLock stripeLock,
std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status) {
+ const Status& status) {
// Before we access the bucket, make sure it's still there.
Bucket* bucket = _useBucket(stripe, stripeLock, batch->bucket().id, ReturnClearedBuckets::kYes);
if (!bucket) {
// Special case, bucket has already been cleared, and we need only abort this batch.
- batch->_abort(status, nullptr);
+ batch->_abort(status);
return;
}
@@ -1017,11 +1035,11 @@ void BucketCatalog::_abort(Stripe* stripe,
WithLock stripeLock,
Bucket* bucket,
std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status) {
+ const Status& status) {
// Abort any unprepared batches. This should be safe since we have a lock on the stripe,
// preventing anyone else from using these.
for (const auto& [_, current] : bucket->_batches) {
- current->_abort(status, bucket);
+ current->_abort(status);
}
bucket->_batches.clear();
@@ -1032,7 +1050,7 @@ void BucketCatalog::_abort(Stripe* stripe,
if (auto& prepared = bucket->_preparedBatch) {
if (prepared == batch) {
// We own the prepared batch, so we can go ahead and abort it and remove the bucket.
- prepared->_abort(status, bucket);
+ prepared->_abort(status);
prepared.reset();
} else {
doRemove = false;
diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h
index b33a9025b3c..9807cdea1a1 100644
--- a/src/mongo/db/timeseries/bucket_catalog.h
+++ b/src/mongo/db/timeseries/bucket_catalog.h
@@ -160,10 +160,8 @@ public:
/**
* Abandons the write batch and notifies any waiters that the bucket has been cleared.
- * Parameter 'bucket' provides a pointer to the bucket if still available, nullptr
- * otherwise.
*/
- void _abort(const boost::optional<Status>& status, const Bucket* bucket);
+ void _abort(const Status& status);
const BucketHandle _bucket;
OperationId _opId;
@@ -220,10 +218,10 @@ public:
/**
* Prepares a batch for commit, transitioning it to an inactive state. Caller must already have
- * commit rights on batch. Returns true if the batch was successfully prepared, or false if the
- * batch was aborted.
+ * commit rights on batch. Returns OK if the batch was successfully prepared, or a status
+ * indicating why the batch was previously aborted by another operation.
*/
- bool prepareCommit(std::shared_ptr<WriteBatch> batch);
+ Status prepareCommit(std::shared_ptr<WriteBatch> batch);
/**
* Records the result of a batch commit. Caller must already have commit rights on batch, and
@@ -234,11 +232,10 @@ public:
boost::optional<ClosedBucket> finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info);
/**
- * Aborts the given write batch and any other outstanding batches on the same bucket. Uses the
- * provided status when clearing the bucket, or TimeseriesBucketCleared if not provided.
+ * Aborts the given write batch and any other outstanding batches on the same bucket, using the
+ * provided status.
*/
- void abort(std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status = boost::none);
+ void abort(std::shared_ptr<WriteBatch> batch, const Status& status);
/**
* Marks any bucket with the specified OID as cleared and prevents any future inserts from
@@ -415,7 +412,7 @@ private:
void _abort(Stripe* stripe,
WithLock stripeLock,
std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status = boost::none);
+ const Status& status);
/**
* Aborts any unprepared batches for the given bucket, then removes the bucket if there is no
@@ -426,7 +423,7 @@ private:
WithLock stripeLock,
Bucket* bucket,
std::shared_ptr<WriteBatch> batch,
- const boost::optional<Status>& status);
+ const Status& status);
/**
* Adds the bucket to a list of idle buckets to be expired at a later date.
diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp
index 93352e64bb9..b39b1065b89 100644
--- a/src/mongo/db/timeseries/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp
@@ -158,7 +158,7 @@ void BucketCatalogTest::_commit(const std::shared_ptr<BucketCatalog::WriteBatch>
uint16_t numPreviouslyCommittedMeasurements,
size_t expectedBatchSize) {
ASSERT(batch->claimCommitRights());
- _bucketCatalog->prepareCommit(batch);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), expectedBatchSize);
ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), numPreviouslyCommittedMeasurements);
@@ -261,7 +261,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
// The batch hasn't actually been committed yet.
ASSERT(!batch1->finished());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
// Still not finished.
ASSERT(!batch1->finished());
@@ -290,7 +290,7 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) {
.batch;
ASSERT(batch->claimCommitRights());
auto bucket = batch->bucket();
- _bucketCatalog->abort(batch);
+ _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucket));
}
@@ -502,7 +502,7 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
.getValue()
.batch;
ASSERT(batch1->claimCommitRights());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
ASSERT_EQ(batch1->measurements().size(), 1);
ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
@@ -533,7 +533,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") {
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
auto& batch = result.getValue().batch;
- _bucketCatalog->prepareCommit(batch);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
// BucketCatalog::prepareCommit uses dassert, so it will only invariant in debug mode. Ensure we
// die here in non-debug mode as well.
@@ -640,7 +640,7 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
.getValue()
.batch;
ASSERT(batch1->claimCommitRights());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
ASSERT_EQ(batch1->measurements().size(), 1);
ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
@@ -657,7 +657,7 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
ASSERT_NE(batch1, batch2);
ASSERT(batch2->claimCommitRights());
- _bucketCatalog->abort(batch2);
+ _bucketCatalog->abort(batch2, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT(batch2->finished());
ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
@@ -680,8 +680,7 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
_bucketCatalog->clear(_ns1);
- bool prepared = _bucketCatalog->prepareCommit(batch);
- ASSERT(!prepared);
+ ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT(batch->finished());
ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
@@ -695,7 +694,7 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
.getValue()
.batch;
ASSERT(batch->claimCommitRights());
- _bucketCatalog->prepareCommit(batch);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
@@ -723,13 +722,13 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) {
.getValue()
.batch;
ASSERT(batch->claimCommitRights());
- _bucketCatalog->prepareCommit(batch);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT_EQ(batch->measurements().size(), 1);
ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
ASSERT_THROWS(_bucketCatalog->clear(batch->bucket().id), WriteConflictException);
- _bucketCatalog->abort(batch);
+ _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT(batch->finished());
ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
}
@@ -745,7 +744,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
.getValue()
.batch;
ASSERT(batch1->claimCommitRights());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
ASSERT_EQ(batch1->measurements().size(), 1);
ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
@@ -767,7 +766,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
// Now try to prepare the second batch. Ensure it aborts the batch.
ASSERT(batch2->claimCommitRights());
- ASSERT(!_bucketCatalog->prepareCommit(batch2));
+ ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2));
ASSERT(batch2->finished());
ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
@@ -790,7 +789,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
ASSERT_NE(batch1->bucket().id, batch3->bucket().id);
// Clean up this batch
ASSERT(batch3->claimCommitRights());
- _bucketCatalog->abort(batch3);
+ _bucketCatalog->abort(batch3, {ErrorCodes::TimeseriesBucketCleared, ""});
// Make sure we can finish the cleanly prepared batch.
_bucketCatalog->finish(batch1, {});
@@ -810,12 +809,11 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
.batch;
ASSERT(batch->claimCommitRights());
- _bucketCatalog->abort(batch);
+ _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT(batch->finished());
ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
- bool prepared = _bucketCatalog->prepareCommit(batch);
- ASSERT(!prepared);
+ ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch));
ASSERT(batch->finished());
ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
}
@@ -896,8 +894,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
ASSERT(batch2->claimCommitRights());
// Batch 2 will not be able to commit until batch 1 has finished.
- _bucketCatalog->prepareCommit(batch1);
- auto task = Task{[&]() { _bucketCatalog->prepareCommit(batch2); }};
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
+ auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }};
// Add a little extra wait to make sure prepareCommit actually gets to the blocking point.
stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10));
ASSERT(task.future().valid());
@@ -932,7 +930,7 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
// Batch 2 is the first batch to commit the time field.
ASSERT(batch2->claimCommitRights());
- _bucketCatalog->prepareCommit(batch2);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch2));
ASSERT_EQ(batch2->newFieldNamesToBeInserted().size(), 1);
ASSERT_EQ(batch2->newFieldNamesToBeInserted().begin()->first, _timeField);
_bucketCatalog->finish(batch2, {});
@@ -940,7 +938,7 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
// Batch 1 was the first batch to insert the time field, but by commit time it was already
// committed by batch 2.
ASSERT(batch1->claimCommitRights());
- _bucketCatalog->prepareCommit(batch1);
+ ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
ASSERT(batch1->newFieldNamesToBeInserted().empty());
_bucketCatalog->finish(batch1, {});
}