diff options
author | Dan Larkin-York <dan.larkin-york@mongodb.com> | 2023-01-26 15:11:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-01-26 16:21:37 +0000 |
commit | c79b825adb6c99540eeacc0bdb3cd04edc7ec73c (patch) | |
tree | 8df968ae4967205d52d46031911729bcf8b42df5 /src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp | |
parent | 50a3a7934a8f5239177ed373709427c9274fb54f (diff) | |
download | mongo-c79b825adb6c99540eeacc0bdb3cd04edc7ec73c.tar.gz |
SERVER-73015 De-encapsulate bucket_catalog::WriteBatch and bucket_catalog::Bucket
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp')
-rw-r--r-- | src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp | 284 |
1 files changed, 142 insertions, 142 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp index 2b2f2797e02..e371f5ba2d6 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp @@ -161,10 +161,10 @@ const CollatorInterface* BucketCatalogTest::_getCollator(const NamespaceString& void BucketCatalogTest::_commit(const std::shared_ptr<WriteBatch>& batch, uint16_t numPreviouslyCommittedMeasurements, size_t expectedBatchSize) { - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), expectedBatchSize); - ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), numPreviouslyCommittedMeasurements); + ASSERT_EQ(batch->measurements.size(), expectedBatchSize); + ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, numPreviouslyCommittedMeasurements); _bucketCatalog->finish(batch, {}); } @@ -241,7 +241,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { BSON(_timeField << Date_t::now()), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); auto batch1 = result1.getValue().batch; - ASSERT(batch1->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch1)); // A subsequent insert into the same bucket should land in the same batch, but not be able to // claim commit rights @@ -254,25 +254,25 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); auto batch2 = result2.getValue().batch; ASSERT_EQ(batch1, batch2); - ASSERT(!batch2->claimCommitRights()); + ASSERT(!claimWriteBatchCommitRights(*batch2)); // The batch hasn't actually been committed yet. - ASSERT(!batch1->finished()); + ASSERT(!isWriteBatchFinished(*batch1)); ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); // Still not finished. - ASSERT(!batch1->finished()); + ASSERT(!isWriteBatchFinished(*batch1)); // The batch should contain both documents since they belong in the same bucket and happened // in the same commit epoch. Nothing else has been committed in this bucket yet. - ASSERT_EQ(batch1->measurements().size(), 2); - ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); + ASSERT_EQ(batch1->measurements.size(), 2); + ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0); // Once the commit has occurred, the waiter should be notified. _bucketCatalog->finish(batch1, {}); - ASSERT(batch2->finished()); - auto result3 = batch2->getResult(); + ASSERT(isWriteBatchFinished(*batch2)); + auto result3 = getWriteBatchResult(*batch2); ASSERT_OK(result3.getStatus()); } @@ -286,8 +286,8 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - ASSERT(batch->claimCommitRights()); - auto bucket = batch->bucket(); + ASSERT(claimWriteBatchCommitRights(*batch)); + auto bucket = batch->bucketHandle; _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucket)); } @@ -322,10 +322,10 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) { // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"), - _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()), - _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); - ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucket()).isEmpty()); + _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle)); + ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucketHandle).isEmpty()); // Committing one bucket should only return the one document in that bucket and should not // affect the other bucket. @@ -355,9 +355,9 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) { // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))), - _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); + _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle)); } TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { @@ -386,11 +386,11 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) { ASSERT_BSONOBJ_EQ( BSON(_metaField << BSONObj(BSON( "c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))), - _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); ASSERT_BSONOBJ_EQ( BSON(_metaField << BSONObj(BSON( "c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))), - _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); + _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle)); } @@ -422,11 +422,11 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) { ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON_ARRAY("123" << "456"))))), - _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); + _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON_ARRAY("123" << "456"))))), - _bucketCatalog->getMetadata(result2.getValue().batch->bucket())); + _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle)); } TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) { @@ -450,8 +450,8 @@ TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) { // Check metadata in buckets. ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL), - _bucketCatalog->getMetadata(result1.getValue().batch->bucket())); - ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucket()).isEmpty()); + _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle)); + ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle).isEmpty()); // Committing one bucket should only return the one document in that bucket and should not // affect the other bucket. @@ -499,10 +499,10 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - ASSERT(batch1->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch1)); ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); - ASSERT_EQ(batch1->measurements().size(), 1); - ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); + ASSERT_EQ(batch1->measurements.size(), 1); + ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0); // Insert before finish so there's a second batch live at the same time. auto batch2 = _bucketCatalog @@ -517,7 +517,7 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) { ASSERT_NE(batch1, batch2); _bucketCatalog->finish(batch1, {}); - ASSERT(batch1->finished()); + ASSERT(isWriteBatchFinished(*batch1)); // Verify the second batch still commits one doc, and that the first batch only commited one. _commit(batch2, 1); @@ -549,7 +549,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) { .getValue() .batch; - ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucket())); + ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucketHandle)); _commit(batch, 0); } @@ -564,11 +564,11 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); ASSERT(result.isOK()); auto batch = result.getValue().batch; - auto oldId = batch->bucket().bucketId; + auto oldId = batch->bucketHandle.bucketId; _commit(batch, 0); - ASSERT_EQ(2U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON(); - ASSERT(batch->newFieldNamesToBeInserted().count(_timeField)) << batch->toBSON(); - ASSERT(batch->newFieldNamesToBeInserted().count("a")) << batch->toBSON(); + ASSERT_EQ(2U, batch->newFieldNamesToBeInserted.size()) << batch->toBSON(); + ASSERT(batch->newFieldNamesToBeInserted.count(_timeField)) << batch->toBSON(); + ASSERT(batch->newFieldNamesToBeInserted.count("a")) << batch->toBSON(); // Inserting a new measurement with the same fields should return an empty set of new fields. @@ -581,7 +581,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { ASSERT(result.isOK()); batch = result.getValue().batch; _commit(batch, 1); - ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON(); + ASSERT_EQ(0U, batch->newFieldNamesToBeInserted.size()) << batch->toBSON(); // Insert a new measurement with the a new field. result = _bucketCatalog->insert(_opCtx, @@ -593,8 +593,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { ASSERT(result.isOK()); batch = result.getValue().batch; _commit(batch, 2); - ASSERT_EQ(1U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON(); - ASSERT(batch->newFieldNamesToBeInserted().count("b")) << batch->toBSON(); + ASSERT_EQ(1U, batch->newFieldNamesToBeInserted.size()) << batch->toBSON(); + ASSERT(batch->newFieldNamesToBeInserted.count("b")) << batch->toBSON(); // Fill up the bucket. for (auto i = 3; i < gTimeseriesBucketMaxCount; ++i) { @@ -607,7 +607,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { ASSERT(result.isOK()); batch = result.getValue().batch; _commit(batch, i); - ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << i << ":" << batch->toBSON(); + ASSERT_EQ(0U, batch->newFieldNamesToBeInserted.size()) << i << ":" << batch->toBSON(); } // When a bucket overflows, committing to the new overflow bucket should return the fields of @@ -620,11 +620,11 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) { BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount), BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); auto& batch2 = result2.getValue().batch; - ASSERT_NE(oldId, batch2->bucket().bucketId); + ASSERT_NE(oldId, batch2->bucketHandle.bucketId); _commit(batch2, 0); - ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted().size()) << batch2->toBSON(); - ASSERT(batch2->newFieldNamesToBeInserted().count(_timeField)) << batch2->toBSON(); - ASSERT(batch2->newFieldNamesToBeInserted().count("a")) << batch2->toBSON(); + ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted.size()) << batch2->toBSON(); + ASSERT(batch2->newFieldNamesToBeInserted.count(_timeField)) << batch2->toBSON(); + ASSERT(batch2->newFieldNamesToBeInserted.count("a")) << batch2->toBSON(); } TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { @@ -637,10 +637,10 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - ASSERT(batch1->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch1)); ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); - ASSERT_EQ(batch1->measurements().size(), 1); - ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); + ASSERT_EQ(batch1->measurements.size(), 1); + ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0); // Insert before finish so there's a second batch live at the same time. auto batch2 = _bucketCatalog @@ -654,14 +654,14 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) { .batch; ASSERT_NE(batch1, batch2); - ASSERT(batch2->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch2)); _bucketCatalog->abort(batch2, {ErrorCodes::TimeseriesBucketCleared, ""}); - ASSERT(batch2->finished()); - ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); + ASSERT(isWriteBatchFinished(*batch2)); + ASSERT_EQ(getWriteBatchResult(*batch2).getStatus(), ErrorCodes::TimeseriesBucketCleared); _bucketCatalog->finish(batch1, {}); - ASSERT(batch1->finished()); - ASSERT_OK(batch1->getResult().getStatus()); + ASSERT(isWriteBatchFinished(*batch1)); + ASSERT_OK(getWriteBatchResult(*batch1).getStatus()); } TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { @@ -674,13 +674,13 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); _bucketCatalog->clear(_ns1); ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT(batch->finished()); - ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); + ASSERT(isWriteBatchFinished(*batch)); + ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared); batch = _bucketCatalog ->insert(_opCtx, @@ -691,10 +691,10 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); - ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); + ASSERT_EQ(batch->measurements.size(), 1); + ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0); _bucketCatalog->clear(_ns1); @@ -704,8 +704,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) { // removed, and that's fine for our purposes. The finish just records the result to the batch // and updates some statistics. _bucketCatalog->finish(batch, {}); - ASSERT(batch->finished()); - ASSERT_OK(batch->getResult().getStatus()); + ASSERT(isWriteBatchFinished(*batch)); + ASSERT_OK(getWriteBatchResult(*batch).getStatus()); } @@ -719,17 +719,17 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); - ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); + ASSERT_EQ(batch->measurements.size(), 1); + ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0); - ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch->bucket().bucketId.oid), + ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch->bucketHandle.bucketId.oid), WriteConflictException); _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); - ASSERT(batch->finished()); - ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); + ASSERT(isWriteBatchFinished(*batch)); + ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared); } TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { @@ -742,10 +742,10 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - ASSERT(batch1->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch1)); ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); - ASSERT_EQ(batch1->measurements().size(), 1); - ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0); + ASSERT_EQ(batch1->measurements.size(), 1); + ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0); // Insert before clear so there's a second batch live at the same time. auto batch2 = _bucketCatalog @@ -758,20 +758,20 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { .getValue() .batch; ASSERT_NE(batch1, batch2); - ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId); + ASSERT_EQ(batch1->bucketHandle.bucketId, batch2->bucketHandle.bucketId); // Now clear the bucket. Since there's a prepared batch it should conflict. - ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucket().bucketId.oid), + ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucketHandle.bucketId.oid), WriteConflictException); // Now try to prepare the second batch. Ensure it aborts the batch. - ASSERT(batch2->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch2)); ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2)); - ASSERT(batch2->finished()); - ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); + ASSERT(isWriteBatchFinished(*batch2)); + ASSERT_EQ(getWriteBatchResult(*batch2).getStatus(), ErrorCodes::TimeseriesBucketCleared); // Make sure we didn't clear the bucket state when we aborted the second batch. - ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucket().bucketId.oid), + ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucketHandle.bucketId.oid), WriteConflictException); // Make sure a subsequent insert, which opens a new bucket, doesn't corrupt the old bucket @@ -787,15 +787,15 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) { .batch; ASSERT_NE(batch1, batch3); ASSERT_NE(batch2, batch3); - ASSERT_NE(batch1->bucket().bucketId, batch3->bucket().bucketId); + ASSERT_NE(batch1->bucketHandle.bucketId, batch3->bucketHandle.bucketId); // Clean up this batch - ASSERT(batch3->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch3)); _bucketCatalog->abort(batch3, {ErrorCodes::TimeseriesBucketCleared, ""}); // Make sure we can finish the cleanly prepared batch. _bucketCatalog->finish(batch1, {}); - ASSERT(batch1->finished()); - ASSERT_OK(batch1->getResult().getStatus()); + ASSERT(isWriteBatchFinished(*batch1)); + ASSERT_OK(getWriteBatchResult(*batch1).getStatus()); } TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) { @@ -808,15 +808,15 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow) .getValue() .batch; - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); _bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""}); - ASSERT(batch->finished()); - ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); + ASSERT(isWriteBatchFinished(*batch)); + ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared); ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT(batch->finished()); - ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared); + ASSERT(isWriteBatchFinished(*batch)); + ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared); } TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) { @@ -891,8 +891,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { .getValue() .batch; - ASSERT(batch1->claimCommitRights()); - ASSERT(batch2->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch1)); + ASSERT(claimWriteBatchCommitRights(*batch2)); // Batch 2 will not be able to commit until batch 1 has finished. ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); @@ -904,11 +904,11 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) { // Finish the first batch. _bucketCatalog->finish(batch1, {}); - ASSERT(batch1->finished()); + ASSERT(isWriteBatchFinished(*batch1)); } _bucketCatalog->finish(batch2, {}); - ASSERT(batch2->finished()); + ASSERT(isWriteBatchFinished(*batch2)); } TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { @@ -940,12 +940,12 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId); - ASSERT_EQ(batch1->bucket().bucketId, batch3->bucket().bucketId); + ASSERT_EQ(batch1->bucketHandle.bucketId, batch2->bucketHandle.bucketId); + ASSERT_EQ(batch1->bucketHandle.bucketId, batch3->bucketHandle.bucketId); - ASSERT(batch1->claimCommitRights()); - ASSERT(batch2->claimCommitRights()); - ASSERT(batch3->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch1)); + ASSERT(claimWriteBatchCommitRights(*batch2)); + ASSERT(claimWriteBatchCommitRights(*batch3)); // Batch 2 will not be able to commit until batch 1 has finished. ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); @@ -961,12 +961,12 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { // recognize it has been aborted and clean up the bucket. _bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); _bucketCatalog->finish(batch1, {}); - ASSERT(batch1->finished()); + ASSERT(isWriteBatchFinished(*batch1)); } // Wait for the batch 2 task to finish preparing commit. Since batch 1 finished, batch 2 should // be unblocked. Note that after aborting batch 3, batch 2 was not in a prepared state, so we // expect the prepareCommit() call to fail. - ASSERT(batch2->finished()); + ASSERT(isWriteBatchFinished(*batch2)); // Make sure a new batch ends up in a new bucket. auto batch4 = _bucketCatalog @@ -978,7 +978,7 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) { BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - ASSERT_NE(batch2->bucket().bucketId, batch4->bucket().bucketId); + ASSERT_NE(batch2->bucketHandle.bucketId, batch4->bucketHandle.bucketId); } TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) { @@ -1003,17 +1003,17 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) { .batch; // Batch 1 and 2 use the same bucket. - ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId); - ASSERT(batch1->claimCommitRights()); - ASSERT(batch2->claimCommitRights()); + ASSERT_EQ(batch1->bucketHandle.bucketId, batch2->bucketHandle.bucketId); + ASSERT(claimWriteBatchCommitRights(*batch1)); + ASSERT(claimWriteBatchCommitRights(*batch2)); ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); // Batch 1 will be in a prepared state now. Abort the second batch so that bucket 1 will be // closed after batch 1 finishes. _bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"}); _bucketCatalog->finish(batch1, {}); - ASSERT(batch1->finished()); - ASSERT(batch2->finished()); + ASSERT(isWriteBatchFinished(*batch1)); + ASSERT(isWriteBatchFinished(*batch2)); // Ensure a batch started after batch 2 aborts, does not insert future measurements into the // aborted batch/bucket. @@ -1026,7 +1026,7 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) { BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow) .getValue() .batch; - ASSERT_NE(batch1->bucket().bucketId, batch3->bucket().bucketId); + ASSERT_NE(batch1->bucketHandle.bucketId, batch3->bucketHandle.bucketId); } TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { @@ -1051,17 +1051,17 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) { .batch; // Batch 2 is the first batch to commit the time field. - ASSERT(batch2->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch2)); ASSERT_OK(_bucketCatalog->prepareCommit(batch2)); - ASSERT_EQ(batch2->newFieldNamesToBeInserted().size(), 1); - ASSERT_EQ(batch2->newFieldNamesToBeInserted().begin()->first, _timeField); + ASSERT_EQ(batch2->newFieldNamesToBeInserted.size(), 1); + ASSERT_EQ(batch2->newFieldNamesToBeInserted.begin()->first, _timeField); _bucketCatalog->finish(batch2, {}); // Batch 1 was the first batch to insert the time field, but by commit time it was already // committed by batch 2. - ASSERT(batch1->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch1)); ASSERT_OK(_bucketCatalog->prepareCommit(batch1)); - ASSERT(batch1->newFieldNamesToBeInserted().empty()); + ASSERT(batch1->newFieldNamesToBeInserted.empty()); _bucketCatalog->finish(batch1, {}); } @@ -1285,17 +1285,17 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement ASSERT_EQ(0, _getExecutionStat(_ns1, kNumSchemaChanges)); auto batch = result.getValue().batch; - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); // The reopened bucket already contains three committed measurements. - ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 3); + ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 3); // Verify that the min and max is updated correctly when inserting new measurements. - ASSERT_BSONOBJ_BINARY_EQ(batch->min(), BSON("u" << BSON("a" << -100))); + ASSERT_BSONOBJ_BINARY_EQ(batch->min, BSON("u" << BSON("a" << -100))); ASSERT_BSONOBJ_BINARY_EQ( - batch->max(), + batch->max, BSON("u" << BSON("time" << Date_t::fromMillisSinceEpoch(1654529680000) << "b" << 100))); _bucketCatalog->finish(batch, {}); @@ -1341,12 +1341,12 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme ASSERT_EQ(1, _getExecutionStat(_ns1, kNumSchemaChanges)); auto batch = result.getValue().batch; - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); // Since the reopened bucket was incompatible, we opened a new one. - ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); + ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0); _bucketCatalog->finish(batch, {}); } @@ -1397,17 +1397,17 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement) ASSERT_EQ(0, _getExecutionStat(_ns1, kNumSchemaChanges)); auto batch = result.getValue().batch; - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); // The reopened bucket already contains three committed measurements. - ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 3); + ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 3); // Verify that the min and max is updated correctly when inserting new measurements. - ASSERT_BSONOBJ_BINARY_EQ(batch->min(), BSON("u" << BSON("a" << -100))); + ASSERT_BSONOBJ_BINARY_EQ(batch->min, BSON("u" << BSON("a" << -100))); ASSERT_BSONOBJ_BINARY_EQ( - batch->max(), + batch->max, BSON("u" << BSON("time" << Date_t::fromMillisSinceEpoch(1654529680000) << "b" << 100))); _bucketCatalog->finish(batch, {}); @@ -1459,12 +1459,12 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement ASSERT_EQ(1, _getExecutionStat(_ns1, kNumSchemaChanges)); auto batch = result.getValue().batch; - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); // Since the reopened bucket was incompatible, we opened a new one. - ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0); + ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0); _bucketCatalog->finish(batch, {}); } @@ -1488,7 +1488,7 @@ TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) { BucketCatalog::CombineWithInsertsFromOtherClients::kAllow); ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); _bucketCatalog->finish(batch, {}); @@ -1576,10 +1576,10 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); - auto bucketId = batch->bucket().bucketId; - ASSERT(batch->claimCommitRights()); + auto bucketId = batch->bucketHandle.bucketId; + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); _bucketCatalog->finish(batch, {}); // Time backwards should hint to re-open. @@ -1621,11 +1621,11 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) { ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold)); ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold)); batch = result.getValue().batch; - ASSERT_NE(batch->bucket().bucketId, bucketId); + ASSERT_NE(batch->bucketHandle.bucketId, bucketId); ASSERT(batch); - ASSERT(batch->claimCommitRights()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); _bucketCatalog->finish(batch, {}); // If we try to insert something that could fit in the archived bucket, we should get it back as @@ -1660,10 +1660,10 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket) ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); - auto bucketId = batch->bucket().bucketId; - ASSERT(batch->claimCommitRights()); + auto bucketId = batch->bucketHandle.bucketId; + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); _bucketCatalog->finish(batch, {}); // Incompatible schema would close the existing bucket, so we should expect to open a new bucket @@ -1678,10 +1678,10 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket) ASSERT_OK(result.getStatus()); batch = result.getValue().batch; ASSERT(batch); - ASSERT_NE(batch->bucket().bucketId, bucketId); - ASSERT(batch->claimCommitRights()); + ASSERT_NE(batch->bucketHandle.bucketId, bucketId); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); _bucketCatalog->finish(batch, {}); } @@ -1702,10 +1702,10 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); - auto oldBucketId = batch->bucket().bucketId; - ASSERT(batch->claimCommitRights()); + auto oldBucketId = batch->bucketHandle.bucketId; + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); _bucketCatalog->finish(batch, {}); BSONObj bucketDoc = ::mongo::fromjson( @@ -1733,10 +1733,10 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) { ASSERT_OK(result.getStatus()); batch = result.getValue().batch; ASSERT(batch); - ASSERT_EQ(batch->bucket().bucketId.oid, bucketDoc["_id"].OID()); - ASSERT(batch->claimCommitRights()); + ASSERT_EQ(batch->bucketHandle.bucketId.oid, bucketDoc["_id"].OID()); + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); _bucketCatalog->finish(batch, {}); // Verify the old bucket was soft-closed ASSERT_EQ(1, _getExecutionStat(_ns1, kNumClosedDueToReopening)); @@ -1776,10 +1776,10 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) { ASSERT_OK(result.getStatus()); auto batch = result.getValue().batch; ASSERT(batch); - auto oldBucketId = batch->bucket().bucketId; - ASSERT(batch->claimCommitRights()); + auto oldBucketId = batch->bucketHandle.bucketId; + ASSERT(claimWriteBatchCommitRights(*batch)); ASSERT_OK(_bucketCatalog->prepareCommit(batch)); - ASSERT_EQ(batch->measurements().size(), 1); + ASSERT_EQ(batch->measurements.size(), 1); _bucketCatalog->finish(batch, {}); BSONObj bucketDoc = ::mongo::fromjson( |