diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2022-02-28 16:55:25 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-28 17:39:42 +0000 |
commit | 335bef8b6c3632ce7228553ef1c8fa3522768858 (patch) | |
tree | 8ed3f2fc3775c83eef237bf8523da0bd0d2568c6 /src/mongo/db | |
parent | c05a030a3e807d1f444b4ee91b2331d5b7c19916 (diff) | |
download | mongo-335bef8b6c3632ce7228553ef1c8fa3522768858.tar.gz |
SERVER-61010 Require a `Status` for `BucketCatalog::abort`
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/write_commands.cpp | 108 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog.h | 21 | ||||
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog_test.cpp | 42 |
4 files changed, 124 insertions, 115 deletions
diff --git a/src/mongo/db/commands/write_commands.cpp b/src/mongo/db/commands/write_commands.cpp index 276af6bb02a..47c688309f9 100644 --- a/src/mongo/db/commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands.cpp @@ -740,18 +740,16 @@ public: std::vector<write_ops::WriteError>* errors, boost::optional<repl::OpTime>* opTime, boost::optional<OID>* electionId, - std::vector<size_t>* docsToRetry) const { + std::vector<size_t>* docsToRetry) const try { auto& bucketCatalog = BucketCatalog::get(opCtx); auto metadata = bucketCatalog.getMetadata(batch->bucket()); - bool prepared = bucketCatalog.prepareCommit(batch); - if (!prepared) { + auto status = bucketCatalog.prepareCommit(batch); + if (!status.isOK()) { invariant(batch->finished()); docsToRetry->push_back(index); return true; } - // Now that the batch is prepared, make sure we clean up if we throw. - ScopeGuard batchGuard([&] { bucketCatalog.abort(batch); }); hangTimeseriesInsertBeforeWrite.pauseWhileSet(); @@ -764,7 +762,6 @@ public: generateError(opCtx, output.result, start + index, errors->size())) { errors->emplace_back(std::move(*error)); bucketCatalog.abort(batch, output.result.getStatus()); - batchGuard.dismiss(); return output.canContinue; } @@ -779,7 +776,6 @@ public: generateError(opCtx, output.result, start + index, errors->size())) { errors->emplace_back(std::move(*error)); bucketCatalog.abort(batch, output.result.getStatus()); - batchGuard.dismiss(); return output.canContinue; } @@ -794,8 +790,6 @@ public: auto closedBucket = bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId}); - batchGuard.dismiss(); - if (closedBucket) { // If this write closed a bucket, compress the bucket auto output = _performTimeseriesBucketCompression(opCtx, *closedBucket); @@ -806,6 +800,9 @@ public: } } return true; + } catch (const DBException& ex) { + BucketCatalog::get(opCtx).abort(batch, ex.toStatus()); + throw; } bool _commitTimeseriesBucketsAtomically(OperationContext* opCtx, @@ -834,7 +831,7 @@ public: return left.get()->bucket().id < right.get()->bucket().id; }); - boost::optional<Status> abortStatus; + Status abortStatus = Status::OK(); ScopeGuard batchGuard{[&] { for (auto batch : batchesToCommit) { if (batch.get()) { @@ -843,58 +840,63 @@ public: } }}; - std::vector<write_ops::InsertCommandRequest> insertOps; - std::vector<write_ops::UpdateCommandRequest> updateOps; + try { + std::vector<write_ops::InsertCommandRequest> insertOps; + std::vector<write_ops::UpdateCommandRequest> updateOps; - for (auto batch : batchesToCommit) { - auto metadata = bucketCatalog.getMetadata(batch.get()->bucket()); - if (!bucketCatalog.prepareCommit(batch)) { - return false; - } + for (auto batch : batchesToCommit) { + auto metadata = bucketCatalog.getMetadata(batch.get()->bucket()); + auto prepareCommitStatus = bucketCatalog.prepareCommit(batch); + if (!prepareCommitStatus.isOK()) { + abortStatus = prepareCommitStatus; + return false; + } - if (batch.get()->numPreviouslyCommittedMeasurements() == 0) { - insertOps.push_back(_makeTimeseriesInsertOp( - batch, metadata, std::move(stmtIds[batch.get()->bucket().id]))); - } else { - updateOps.push_back(_makeTimeseriesUpdateOp( - opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucket().id]))); + if (batch.get()->numPreviouslyCommittedMeasurements() == 0) { + insertOps.push_back(_makeTimeseriesInsertOp( + batch, metadata, std::move(stmtIds[batch.get()->bucket().id]))); + } else { + updateOps.push_back(_makeTimeseriesUpdateOp( + opCtx, batch, metadata, std::move(stmtIds[batch.get()->bucket().id]))); + } } - } - hangTimeseriesInsertBeforeWrite.pauseWhileSet(); + hangTimeseriesInsertBeforeWrite.pauseWhileSet(); - auto result = - write_ops_exec::performAtomicTimeseriesWrites(opCtx, insertOps, updateOps); - if (!result.isOK()) { - abortStatus = result; - return false; - } + auto result = + write_ops_exec::performAtomicTimeseriesWrites(opCtx, insertOps, updateOps); + if (!result.isOK()) { + abortStatus = result; + return false; + } - getOpTimeAndElectionId(opCtx, opTime, electionId); + getOpTimeAndElectionId(opCtx, opTime, electionId); - bool compressClosedBuckets = true; - for (auto batch : batchesToCommit) { - auto closedBucket = - bucketCatalog.finish(batch, BucketCatalog::CommitInfo{*opTime, *electionId}); - batch.get().reset(); + bool compressClosedBuckets = true; + for (auto batch : batchesToCommit) { + auto closedBucket = bucketCatalog.finish( + batch, BucketCatalog::CommitInfo{*opTime, *electionId}); + batch.get().reset(); - if (!closedBucket || !compressClosedBuckets) { - continue; - } + if (!closedBucket || !compressClosedBuckets) { + continue; + } - // If this write closed a bucket, compress the bucket - auto ret = _performTimeseriesBucketCompression(opCtx, *closedBucket); - if (!ret.result.isOK()) { - // Don't try to compress any other buckets if we fail. We're not allowed to do - // more write operations. - compressClosedBuckets = false; - } - if (!ret.canContinue) { + // If this write closed a bucket, compress the bucket + auto ret = _performTimeseriesBucketCompression(opCtx, *closedBucket); if (!ret.result.isOK()) { + // Don't try to compress any other buckets if we fail. We're not allowed to + // do more write operations. + compressClosedBuckets = false; + } + if (!ret.canContinue) { abortStatus = ret.result.getStatus(); + return false; } - return false; } + } catch (const DBException& ex) { + abortStatus = ex.toStatus(); + throw; } batchGuard.dismiss(); @@ -1026,13 +1028,7 @@ public: // If there are any unprocessed batches, we mark them as error with the last known // error. if (itr > indexOfLastProcessedBatch && batch->claimCommitRights()) { - auto& bucketCatalog = BucketCatalog::get(opCtx); - bucketCatalog.abort(batch); - - tassert( - 6023100, - "there should be at least one error if the batch processing exited early", - lastError); + BucketCatalog::get(opCtx).abort(batch, lastError->getStatus()); errors->emplace_back(start + index, lastError->getStatus()); continue; } diff --git a/src/mongo/db/timeseries/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog.cpp index d817f3e1bc5..f2710fff960 100644 --- a/src/mongo/db/timeseries/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog.cpp @@ -192,6 +192,17 @@ std::pair<OID, Date_t> generateBucketId(const Date_t& time, const TimeseriesOpti return {bucketId, roundedTime}; } + +Status getTimeseriesBucketClearedError(const OID& bucketId, + const boost::optional<NamespaceString>& ns = boost::none) { + std::string nsIdentification; + if (ns) { + nsIdentification.assign(str::stream() << " for namespace " << *ns); + } + return {ErrorCodes::TimeseriesBucketCleared, + str::stream() << "Time-series bucket " << bucketId << nsIdentification + << " was cleared"}; +} } // namespace struct BucketCatalog::ExecutionStats { @@ -524,19 +535,12 @@ void BucketCatalog::WriteBatch::_finish(const CommitInfo& info) { _promise.emplaceValue(info); } -void BucketCatalog::WriteBatch::_abort(const boost::optional<Status>& status, - const Bucket* bucket) { +void BucketCatalog::WriteBatch::_abort(const Status& status) { if (finished()) { return; } - std::string nsIdentification; - if (bucket) { - nsIdentification.append(str::stream() << " for namespace " << bucket->_ns); - } - _promise.setError(status.value_or(Status{ErrorCodes::TimeseriesBucketCleared, - str::stream() << "Time-series bucket " << _bucket.id - << nsIdentification << " was cleared"})); + _promise.setError(status); } BucketCatalog& BucketCatalog::get(ServiceContext* svcCtx) { @@ -675,10 +679,12 @@ StatusWith<BucketCatalog::InsertResult> BucketCatalog::insert( return InsertResult{batch, closedBuckets}; } -bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { +Status BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { + auto getBatchStatus = [&] { return batch->_promise.getFuture().getNoThrow().getStatus(); }; + if (batch->finished()) { // In this case, someone else aborted the batch behind our back. Oops. - return false; + return getBatchStatus(); } auto& stripe = _stripes[batch->bucket().stripe]; @@ -690,17 +696,17 @@ bool BucketCatalog::prepareCommit(std::shared_ptr<WriteBatch> batch) { if (batch->finished()) { // Someone may have aborted it while we were waiting. - return false; + return getBatchStatus(); } else if (!bucket) { - _abort(&stripe, stripeLock, batch, boost::none); - return false; + _abort(&stripe, stripeLock, batch, getTimeseriesBucketClearedError(batch->bucket().id)); + return getBatchStatus(); } auto prevMemoryUsage = bucket->_memoryUsage; batch->_prepareCommit(bucket); _memoryUsage.fetchAndAdd(bucket->_memoryUsage - prevMemoryUsage); - return true; + return Status::OK(); } boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( @@ -741,7 +747,11 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( if (it != stripe.allBuckets.end()) { bucket = it->second.get(); bucket->_preparedBatch.reset(); - _abort(&stripe, stripeLock, bucket, nullptr, boost::none); + _abort(&stripe, + stripeLock, + bucket, + nullptr, + getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); } } else if (bucket->allCommitted()) { if (bucket->_full) { @@ -772,8 +782,7 @@ boost::optional<BucketCatalog::ClosedBucket> BucketCatalog::finish( return closedBucket; } -void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, - const boost::optional<Status>& status) { +void BucketCatalog::abort(std::shared_ptr<WriteBatch> batch, const Status& status) { invariant(batch); invariant(batch->_commitRights.load()); @@ -807,7 +816,11 @@ void BucketCatalog::clear(const std::function<bool(const NamespaceString&)>& sho stdx::lock_guard catalogLock{_mutex}; _executionStats.erase(bucket->_ns); } - _abort(&stripe, stripeLock, bucket.get(), nullptr, boost::none); + _abort(&stripe, + stripeLock, + bucket.get(), + nullptr, + getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); } it = nextIt; @@ -948,7 +961,12 @@ BucketCatalog::Bucket* BucketCatalog::_useOrCreateBucket(Stripe* stripe, return bucket; } - _abort(stripe, stripeLock, bucket, nullptr, boost::none); + _abort(stripe, + stripeLock, + bucket, + nullptr, + getTimeseriesBucketClearedError(bucket->id(), bucket->_ns)); + return _allocateBucket(stripe, stripeLock, info); } @@ -1000,12 +1018,12 @@ bool BucketCatalog::_removeBucket(Stripe* stripe, WithLock stripeLock, Bucket* b void BucketCatalog::_abort(Stripe* stripe, WithLock stripeLock, std::shared_ptr<WriteBatch> batch, - const boost::optional<Status>& status) { + const Status& status) { // Before we access the bucket, make sure it's still there. Bucket* bucket = _useBucket(stripe, stripeLock, batch->bucket().id, ReturnClearedBuckets::kYes); if (!bucket) { // Special case, bucket has already been cleared, and we need only abort this batch. - batch->_abort(status, nullptr); + batch->_abort(status); return; } @@ -1017,11 +1035,11 @@ void BucketCatalog::_abort(Stripe* stripe, WithLock stripeLock, Bucket* bucket, std::shared_ptr<WriteBatch> batch, - const boost::optional<Status>& status) { + const Status& status) { // Abort any unprepared batches. This should be safe since we have a lock on the stripe, // preventing anyone else from using these. for (const auto& [_, current] : bucket->_batches) { - current->_abort(status, bucket); + current->_abort(status); } bucket->_batches.clear(); @@ -1032,7 +1050,7 @@ void BucketCatalog::_abort(Stripe* stripe, if (auto& prepared = bucket->_preparedBatch) { if (prepared == batch) { // We own the prepared batch, so we can go ahead and abort it and remove the bucket. - prepared->_abort(status, bucket); + prepared->_abort(status); prepared.reset(); } else { doRemove = false; diff --git a/src/mongo/db/timeseries/bucket_catalog.h b/src/mongo/db/timeseries/bucket_catalog.h index b33a9025b3c..9807cdea1a1 100644 --- a/src/mongo/db/timeseries/bucket_catalog.h +++ b/src/mongo/db/timeseries/bucket_catalog.h @@ -160,10 +160,8 @@ public: /** * Abandons the write batch and notifies any waiters that the bucket has been cleared. - * Parameter 'bucket' provides a pointer to the bucket if still available, nullptr - * otherwise. */ - void _abort(const boost::optional<Status>& status, const Bucket* bucket); + void _abort(const Status& status); const BucketHandle _bucket; OperationId _opId; @@ -220,10 +218,10 @@ public: /** * Prepares a batch for commit, transitioning it to an inactive state. Caller must already have - * commit rights on batch. Returns true if the batch was successfully prepared, or false if the - * batch was aborted. + * commit rights on batch. Returns OK if the batch was successfully prepared, or a status + * indicating why the batch was previously aborted by another operation. */ - bool prepareCommit(std::shared_ptr<WriteBatch> batch); + Status prepareCommit(std::shared_ptr<WriteBatch> batch); /** * Records the result of a batch commit. Caller must already have commit rights on batch, and @@ -234,11 +232,10 @@ public: boost::optional<ClosedBucket> finish(std::shared_ptr<WriteBatch> batch, const CommitInfo& info); /** - * Aborts the given write batch and any other outstanding batches on the same bucket. Uses the - * provided status when clearing the bucket, or TimeseriesBucketCleared if not provided. + * Aborts the given write batch and any other outstanding batches on the same bucket, using the + * provided status. */ - void abort(std::shared_ptr<WriteBatch> batch, - const boost::optional<Status>& status = boost::none); + void abort(std::shared_ptr<WriteBatch> batch, const Status& status); /** * Marks any bucket with the specified OID as cleared and prevents any future inserts from @@ -415,7 +412,7 @@ private: void _abort(Stripe* stripe, WithLock stripeLock, std::shared_ptr<WriteBatch> batch, - const boost::optional<Status>& status = boost::none); + const Status& status); /** * Aborts any unprepared batches for the given bucket, then removes the bucket if there is no @@ -426,7 +423,7 @@ private: WithLock stripeLock, Bucket* bucket, std::shared_ptr<WriteBatch> batch, - const boost::optional<Status>& status); + const Status& status); /** * Adds the bucket to a list of idle buckets to be expired at a later date. diff --git a/src/mongo/db/timeseries/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog_test.cpp index 93352e64bb9..b39b1065b89 100644 --- a/src/mongo/db/timeseries/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog_test.cpp @@ -158,7 +158,7 @@ void BucketCatalogTest::_commit(const std::shared_ptr<BucketCatalog::WriteBatch> uint16_t numPreviouslyCommittedMeasurements, size_t expectedBatchSize) { ASSERT(batch->claimCommitRights()); - _bucketCatalog->prepareCommit(batch); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), expectedBatchSize); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), numPreviouslyCommittedMeasurements); @@ -261,7 +261,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { // The batch hasn't actually been committed yet. ASSERT(!batch1->finished()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); // Still not finished. ASSERT(!batch1->finished()); @@ -290,7 +290,7 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) { .batch; ASSERT(batch->claimCommitRights()); auto bucket = batch->bucket(); - _bucketCatalog->abort(batch); + _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucket)); } @@ -502,7 +502,7 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { .getValue() .batch; ASSERT(batch1->claimCommitRights()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); ASSERT_EQ(batch1->measurements().size(), 1); ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); @@ -533,7 +533,7 @@ DEATH_TEST_F(BucketCatalogTest, CannotCommitWithoutRights, "invariant") { BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); auto& batch = result.getValue().batch; - _bucketCatalog->prepareCommit(batch); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); // BucketCatalog::prepareCommit uses dassert, so it will only invariant in debug mode. Ensure we // die here in non-debug mode as well. @@ -640,7 +640,7 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { .getValue() .batch; ASSERT(batch1->claimCommitRights()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); ASSERT_EQ(batch1->measurements().size(), 1); ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); @@ -657,7 +657,7 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { ASSERT_NE(batch1, batch2); ASSERT(batch2->claimCommitRights()); - _bucketCatalog->abort(batch2); + _bucketCatalog->abort(batch2, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(batch2->finished()); ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); @@ -680,8 +680,7 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { _bucketCatalog->clear(_ns1); - bool prepared = _bucketCatalog->prepareCommit(batch); - ASSERT(!prepared); + ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT(batch->finished()); ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); @@ -695,7 +694,7 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { .getValue() .batch; ASSERT(batch->claimCommitRights()); - _bucketCatalog->prepareCommit(batch); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); @@ -723,13 +722,13 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) { .getValue() .batch; ASSERT(batch->claimCommitRights()); - _bucketCatalog->prepareCommit(batch); + ASSERT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT_EQ(batch->measurements().size(), 1); ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); ASSERT_THROWS(_bucketCatalog->clear(batch->bucket().id), WriteConflictException); - _bucketCatalog->abort(batch); + _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(batch->finished()); ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); } @@ -745,7 +744,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { .getValue() .batch; ASSERT(batch1->claimCommitRights()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); ASSERT_EQ(batch1->measurements().size(), 1); ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); @@ -767,7 +766,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { // Now try to prepare the second batch. Ensure it aborts the batch. ASSERT(batch2->claimCommitRights()); - ASSERT(!_bucketCatalog->prepareCommit(batch2)); + ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2)); ASSERT(batch2->finished()); ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); @@ -790,7 +789,7 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { ASSERT_NE(batch1->bucket().id, batch3->bucket().id); // Clean up this batch ASSERT(batch3->claimCommitRights()); - _bucketCatalog->abort(batch3); + _bucketCatalog->abort(batch3, {ErrorCodes::TimeseriesBucketCleared, ""}); // Make sure we can finish the cleanly prepared batch. _bucketCatalog->finish(batch1, {}); @@ -810,12 +809,11 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) { .batch; ASSERT(batch->claimCommitRights()); - _bucketCatalog->abort(batch); + _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT(batch->finished()); ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); - bool prepared = _bucketCatalog->prepareCommit(batch); - ASSERT(!prepared); + ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch)); ASSERT(batch->finished()); ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); } @@ -896,8 +894,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { ASSERT(batch2->claimCommitRights()); // Batch 2 will not be able to commit until batch 1 has finished. - _bucketCatalog->prepareCommit(batch1); - auto task = Task{[&]() { _bucketCatalog->prepareCommit(batch2); }}; + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); + auto task = Task{[&]() { ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); }}; // Add a little extra wait to make sure prepareCommit actually gets to the blocking point. stdx::this_thread::sleep_for(stdx::chrono::milliseconds(10)); ASSERT(task.future().valid()); @@ -932,7 +930,7 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { // Batch 2 is the first batch to commit the time field. ASSERT(batch2->claimCommitRights()); - _bucketCatalog->prepareCommit(batch2); + ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); ASSERT_EQ(batch2->newFieldNamesToBeInserted().size(), 1); ASSERT_EQ(batch2->newFieldNamesToBeInserted().begin()->first, _timeField); _bucketCatalog->finish(batch2, {}); @@ -940,7 +938,7 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { // Batch 1 was the first batch to insert the time field, but by commit time it was already // committed by batch 2. ASSERT(batch1->claimCommitRights()); - _bucketCatalog->prepareCommit(batch1); + ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); ASSERT(batch1->newFieldNamesToBeInserted().empty()); _bucketCatalog->finish(batch1, {}); } |