From 335bef8b6c3632ce7228553ef1c8fa3522768858 Mon Sep 17 00:00:00 2001 From: Gregory Noma Date: Mon, 28 Feb 2022 16:55:25 +0000 Subject: SERVER-61010 Require a `Status` for `BucketCatalog::abort` --- src/mongo/db/timeseries/bucket_catalog.cpp | 68 ++++++++++++++++--------- src/mongo/db/timeseries/bucket_catalog.h | 21 ++++---- src/mongo/db/timeseries/bucket_catalog_test.cpp | 42 ++++++++------- 3 files changed, 72 insertions(+), 59 deletions(-) (limited to 'src/mongo/db/timeseries') diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index d817f3e1bc5..f2710fff960 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -192,6 +192,17 @@ std::pair generateBucketId(const Date_t& time, const TimeseriesOpti return {bucketId, roundedTime}; } + +Status getTimeseriesBucketClearedError(const OID& bucketId, + const boost::optional& ns = boost::none) { + std::string nsIdentification; + if (ns) { + nsIdentification.assign(str::stream() << " for namespace " << *ns); + } + return {ErrorCodes::TimeseriesBucketCleared, + str::stream() << "Time-series bucket " << bucketId << nsIdentification + << " was cleared"}; +} } // namespace struct BucketCatalog::ExecutionStats { @@ -524,19 +535,12 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) { _promise.emplaceValue(info); } -void BucketCatalog::WriteBatch::_abort(const boost::optional& status, - const Bucket* bucket) { +void BucketCatalog::WriteBatch::_abort(const Status& status) { if (finished()) { return; } - std::string nsIdentification; - if (bucket) { - nsIdentification.append(str::stream() << " for namespace " << bucket->_ns); - } - _promise.setError(status.value_or(Status{ErrorCodes::TimeseriesBucketCleared, - str::stream() << "Time-series bucket " << _bucket.id - << nsIdentification << " was cleared"})); + _promise.setError(status); } BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) { @@ -675,10 +679,12 @@ StatusWith BucketCatalog::insert( return InsertResult{batch, closedBuckets}; } -bool BucketCatalog::prepareCommit(std::shared_ptr batch) { +Status BucketCatalog::prepareCommit(std::shared_ptr batch) { + auto getBatchStatus = [&] { return batch->_promise.getFuture().getNoThrow().getStatus(); }; + if (batch->finished()) { // In this case, someone else aborted the batch behind our back. Oops. - return false; + return getBatchStatus(); } auto& stripe = _stripes[batch->bucket().stripe]; @@ -690,17 +696,17 @@ bool BucketCatalog::prepareCommit(std::shared_ptr batch) { if (batch->finished()) { // Someone may have aborted it while we were waiting. - return false; + return getBatchStatus(); } else if (!bucket) { - _abort(&stripe, stripeLock, batch, boost::none); - return false; + _abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id)); + return getBatchStatus(); } auto prevMemoryUsage = bucket->_memoryUsage; batch->_prepareCommit(bucket); _memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage); - return true; + return Status::OK(); } boost::optional BucketCatalog::finish( @@ -741,7 +747,11 @@ boost::optional BucketCatalog::finish( if (it != stripe.allBuckets.end()) { bucket = it->second.get(); bucket->_preparedBatch.reset(); - _abort(&stripe, stripeLock, bucket, nullptr, boost::none); + _abort(&stripe, + stripeLock, + bucket, + nullptr, + getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); } } else if (bucket->allCommitted()) { if (bucket->_full) { @@ -772,8 +782,7 @@ boost::optional BucketCatalog::finish( return closedBucket; } -void BucketCatalog::abort(std::shared_ptr batch, - const boost::optional& status) { +void BucketCatalog::abort(std::shared_ptr batch, const Status& status) { invariant(batch); invariant(batch->_commitRights.load()); @@ -807,7 +816,11 @@ void BucketCatalog::clear(const std::function& sho stdx::lock_guard catalogLock{_mutex}; _executionStats.erase(bucket->_ns); } - _abort(&stripe, stripeLock, bucket.get(), nullptr, boost::none); + _abort(&stripe, + stripeLock, + bucket.get(), + nullptr, + getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); } it = nextIt; @@ -948,7 +961,12 @@ BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe, return bucket; } - _abort(stripe, stripeLock, bucket, nullptr, boost::none); + _abort(stripe, + stripeLock, + bucket, + nullptr, + getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); + return _allocateBucket(stripe, stripeLock, info); } @@ -1000,12 +1018,12 @@ bool BucketCatalog::_removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* b void BucketCatalog::_abort(Stripe* stripe, WithLock stripeLock, std::shared_ptr batch, - const boost::optional& status) { + const Status& status) { // Before we access the bucket, make sure it's still there. Bucket* bucket = _useBucket(stripe, stripeLock, batch->bucket().id, ReturnClearedBuckets::kYes); if (!bucket) { // Special case, bucket has already been cleared, and we need only abort this batch. - batch->_abort(status, nullptr); + batch->_abort(status); return; } @@ -1017,11 +1035,11 @@ void BucketCatalog::_abort(Stripe* stripe, WithLock stripeLock, Bucket* bucket, std::shared_ptr batch, - const boost::optional& status) { + const Status& status) { // Abort any unprepared batches. This should be safe since we have a lock on the stripe, // preventing anyone else from using these. for (const auto& [_, current] : bucket->_batches) { - current->_abort(status, bucket); + current->_abort(status); } bucket->_batches.clear(); @@ -1032,7 +1050,7 @@ void BucketCatalog::_abort(Stripe* stripe, if (auto& prepared = bucket->_preparedBatch) { if (prepared == batch) { // We own the prepared batch, so we can go ahead and abort it and remove the bucket. - prepared->_abort(status, bucket); + prepared->_abort(status); prepared.reset(); } else { doRemove = false; diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index b33a9025b3c..9807cdea1a1 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -160,10 +160,8 @@ public: /** * Abandons the write batch and notifies any waiters that the bucket has been cleared. - * Parameter 'bucket' provides a pointer to the bucket if still available, nullptr - * otherwise. */ - void _abort(const boost::optional& status, const Bucket* bucket); + void _abort(const Status& status); const BucketHandle _bucket; OperationId _opId; @@ -220,10 +218,10 @@ public: /** * Prepares a batch for commit, transitioning it to an inactive state. Caller must already have - * commit rights on batch. Returns true if the batch was successfully prepared, or false if the - * batch was aborted. + * commit rights on batch. Returns OK if the batch was successfully prepared, or a status + * indicating why the batch was previously aborted by another operation. */ - bool prepareCommit(std::shared_ptr batch); + Status prepareCommit(std::shared_ptr batch); /** * Records the result of a batch commit. Caller must already have commit rights on batch, and @@ -234,11 +232,10 @@ public: boost::optional finish(std::shared_ptr batch, const CommitInfo& info); /** - * Aborts the given write batch and any other outstanding batches on the same bucket. Uses the - * provided status when clearing the bucket, or TimeseriesBucketCleared if not provided. + * Aborts the given write batch and any other outstanding batches on the same bucket, using the + * provided status. */ - void abort(std::shared_ptr batch, - const boost::optional& status = boost::none); + void abort(std::shared_ptr batch, const Status& status); /** * Marks any bucket with the specified OID as cleared and prevents any future inserts from @@ -415,7 +412,7 @@ private: void _abort(Stripe* stripe, WithLock stripeLock, std::shared_ptr batch, - const boost::optional& status = boost::none); + const Status& status); /** * Aborts any unprepared batches for the given bucket, then removes the bucket if there is no @@ -426,7 +423,7 @@ private: WithLock stripeLock, Bucket* bucket, std::shared_ptr batch, - const boost::optional& status); + const Status& status); /** * Adds the bucket to a list of idle buckets to be expired at a later date. diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 93352e64bb9..b39b1065b89 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -158,7 +158,7 @@ void BucketCatalogTest::_commit(const std::shared_ptr uint16_t numPreviouslyCommittedMeasurements, size_t expectedBatchSize) { ASSERT(batch->claimCommitRights()); - _bucketCatalog->prepareCommit(batch); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), expectedBatchSize); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), numPreviouslyCommittedMeasurements); @@ -261,7 +261,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { // The batch hasn't actually been committed yet. ASSERT(!batch1->finished()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); // Still not finished. ASSERT(!batch1->finished()); @@ -290,7 +290,7 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) { .batch; ASSERT(batch->claimCommitRights()); auto bucket = batch->bucket(); - _bucketCatalog->abort(batch); + _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucket)); } @@ -502,7 +502,7 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { .getValue() .batch; ASSERT(batch1->claimCommitRights()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); ASSERT_EQ(batch1->measurements().size(), 1); ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); @@ -533,7 +533,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") { BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); auto& batch = result.getValue().batch; - _bucketCatalog->prepareCommit(batch); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); // BucketCatalog::prepareCommit uses dassert, so it will only invariant in debug mode. Ensure we // die here in non-debug mode as well. @@ -640,7 +640,7 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { .getValue() .batch; ASSERT(batch1->claimCommitRights()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); ASSERT_EQ(batch1->measurements().size(), 1); ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); @@ -657,7 +657,7 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { ASSERT_NE(batch1, batch2); ASSERT(batch2->claimCommitRights()); - _bucketCatalog->abort(batch2); + _bucketCatalog->abort(batch2, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(batch2->finished()); ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); @@ -680,8 +680,7 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { _bucketCatalog->clear(_ns1); - bool prepared = _bucketCatalog->prepareCommit(batch); - ASSERT(!prepared); + ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT(batch->finished()); ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); @@ -695,7 +694,7 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { .getValue() .batch; ASSERT(batch->claimCommitRights()); - _bucketCatalog->prepareCommit(batch); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); @@ -723,13 +722,13 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) { .getValue() .batch; ASSERT(batch->claimCommitRights()); - _bucketCatalog->prepareCommit(batch); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); ASSERT_THROWS(_bucketCatalog->clear(batch->bucket().id), WriteConflictException); - _bucketCatalog->abort(batch); + _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(batch->finished()); ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); } @@ -745,7 +744,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { .getValue() .batch; ASSERT(batch1->claimCommitRights()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); ASSERT_EQ(batch1->measurements().size(), 1); ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); @@ -767,7 +766,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { // Now try to prepare the second batch. Ensure it aborts the batch. ASSERT(batch2->claimCommitRights()); - ASSERT(!_bucketCatalog->prepareCommit(batch2)); + ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2)); ASSERT(batch2->finished()); ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); @@ -790,7 +789,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { ASSERT_NE(batch1->bucket().id, batch3->bucket().id); // Clean up this batch ASSERT(batch3->claimCommitRights()); - _bucketCatalog->abort(batch3); + _bucketCatalog->abort(batch3, {ErrorCodes::TimeseriesBucketCleared, ""}); // Make sure we can finish the cleanly prepared batch. _bucketCatalog->finish(batch1, {}); @@ -810,12 +809,11 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) { .batch; ASSERT(batch->claimCommitRights()); - _bucketCatalog->abort(batch); + _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(batch->finished()); ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); - bool prepared = _bucketCatalog->prepareCommit(batch); - ASSERT(!prepared); + ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT(batch->finished()); ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); } @@ -896,8 +894,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { ASSERT(batch2->claimCommitRights()); // Batch 2 will not be able to commit until batch 1 has finished. - _bucketCatalog->prepareCommit(batch1); - auto task = Task{[&]() { _bucketCatalog->prepareCommit(batch2); }}; + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }}; // Add a little extra wait to make sure prepareCommit actually gets to the blocking point. stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10)); ASSERT(task.future().valid()); @@ -932,7 +930,7 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { // Batch 2 is the first batch to commit the time field. ASSERT(batch2->claimCommitRights()); - _bucketCatalog->prepareCommit(batch2); + ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); ASSERT_EQ(batch2->newFieldNamesToBeInserted().size(), 1); ASSERT_EQ(batch2->newFieldNamesToBeInserted().begin()->first, _timeField); _bucketCatalog->finish(batch2, {}); @@ -940,7 +938,7 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { // Batch 1 was the first batch to insert the time field, but by commit time it was already // committed by batch 2. ASSERT(batch1->claimCommitRights()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); ASSERT(batch1->newFieldNamesToBeInserted().empty()); _bucketCatalog->finish(batch1, {}); } -- cgit v1.2.1