summaryrefslogtreecommitdiff
path: root/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
diff options
context:
space:
mode:
authorDan Larkin-York <dan.larkin-york@mongodb.com>2023-01-26 15:11:17 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-01-26 16:21:37 +0000
commitc79b825adb6c99540eeacc0bdb3cd04edc7ec73c (patch)
tree8df968ae4967205d52d46031911729bcf8b42df5 /src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
parent50a3a7934a8f5239177ed373709427c9274fb54f (diff)
downloadmongo-c79b825adb6c99540eeacc0bdb3cd04edc7ec73c.tar.gz
SERVER-73015 De-encapsulate bucket_catalog::WriteBatch and bucket_catalog::Bucket
Diffstat (limited to 'src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp')
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp284
1 files changed, 142 insertions, 142 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
index 2b2f2797e02..e371f5ba2d6 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_test.cpp
@@ -161,10 +161,10 @@ const CollatorInterface* BucketCatalogTest::_getCollator(const NamespaceString&
void BucketCatalogTest::_commit(const std::shared_ptr<WriteBatch>& batch,
uint16_t numPreviouslyCommittedMeasurements,
size_t expectedBatchSize) {
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), expectedBatchSize);
- ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), numPreviouslyCommittedMeasurements);
+ ASSERT_EQ(batch->measurements.size(), expectedBatchSize);
+ ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, numPreviouslyCommittedMeasurements);
_bucketCatalog->finish(batch, {});
}
@@ -241,7 +241,7 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
BSON(_timeField << Date_t::now()),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
auto batch1 = result1.getValue().batch;
- ASSERT(batch1->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch1));
// A subsequent insert into the same bucket should land in the same batch, but not be able to
// claim commit rights
@@ -254,25 +254,25 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucket) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
auto batch2 = result2.getValue().batch;
ASSERT_EQ(batch1, batch2);
- ASSERT(!batch2->claimCommitRights());
+ ASSERT(!claimWriteBatchCommitRights(*batch2));
// The batch hasn't actually been committed yet.
- ASSERT(!batch1->finished());
+ ASSERT(!isWriteBatchFinished(*batch1));
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
// Still not finished.
- ASSERT(!batch1->finished());
+ ASSERT(!isWriteBatchFinished(*batch1));
// The batch should contain both documents since they belong in the same bucket and happened
// in the same commit epoch. Nothing else has been committed in this bucket yet.
- ASSERT_EQ(batch1->measurements().size(), 2);
- ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
+ ASSERT_EQ(batch1->measurements.size(), 2);
+ ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0);
// Once the commit has occurred, the waiter should be notified.
_bucketCatalog->finish(batch1, {});
- ASSERT(batch2->finished());
- auto result3 = batch2->getResult();
+ ASSERT(isWriteBatchFinished(*batch2));
+ auto result3 = getWriteBatchResult(*batch2);
ASSERT_OK(result3.getStatus());
}
@@ -286,8 +286,8 @@ TEST_F(BucketCatalogTest, GetMetadataReturnsEmptyDocOnMissingBucket) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue()
.batch;
- ASSERT(batch->claimCommitRights());
- auto bucket = batch->bucket();
+ ASSERT(claimWriteBatchCommitRights(*batch));
+ auto bucket = batch->bucketHandle;
_bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(bucket));
}
@@ -322,10 +322,10 @@ TEST_F(BucketCatalogTest, InsertIntoDifferentBuckets) {
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << "123"),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj()),
- _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
- ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucket()).isEmpty());
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle));
+ ASSERT(_bucketCatalog->getMetadata(result3.getValue().batch->bucketHandle).isEmpty());
// Committing one bucket should only return the one document in that bucket and should not
// affect the other bucket.
@@ -355,9 +355,9 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketArray) {
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSON_ARRAY(BSON("a" << 0 << "b" << 1))),
- _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle));
}
TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
@@ -386,11 +386,11 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketObjArray) {
ASSERT_BSONOBJ_EQ(
BSON(_metaField << BSONObj(BSON(
"c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle));
ASSERT_BSONOBJ_EQ(
BSON(_metaField << BSONObj(BSON(
"c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1) << BSON("f" << 1 << "g" << 0))))),
- _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle));
}
@@ -422,11 +422,11 @@ TEST_F(BucketCatalogTest, InsertIntoSameBucketNestedArray) {
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON_ARRAY("123"
<< "456"))))),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle));
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONObj(BSON("c" << BSON_ARRAY(BSON("a" << 0 << "b" << 1)
<< BSON_ARRAY("123"
<< "456"))))),
- _bucketCatalog->getMetadata(result2.getValue().batch->bucket()));
+ _bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle));
}
TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) {
@@ -450,8 +450,8 @@ TEST_F(BucketCatalogTest, InsertNullAndMissingMetaFieldIntoDifferentBuckets) {
// Check metadata in buckets.
ASSERT_BSONOBJ_EQ(BSON(_metaField << BSONNULL),
- _bucketCatalog->getMetadata(result1.getValue().batch->bucket()));
- ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucket()).isEmpty());
+ _bucketCatalog->getMetadata(result1.getValue().batch->bucketHandle));
+ ASSERT(_bucketCatalog->getMetadata(result2.getValue().batch->bucketHandle).isEmpty());
// Committing one bucket should only return the one document in that bucket and should not
// affect the other bucket.
@@ -499,10 +499,10 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue()
.batch;
- ASSERT(batch1->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch1));
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
- ASSERT_EQ(batch1->measurements().size(), 1);
- ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
+ ASSERT_EQ(batch1->measurements.size(), 1);
+ ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0);
// Insert before finish so there's a second batch live at the same time.
auto batch2 = _bucketCatalog
@@ -517,7 +517,7 @@ TEST_F(BucketCatalogTest, InsertBetweenPrepareAndFinish) {
ASSERT_NE(batch1, batch2);
_bucketCatalog->finish(batch1, {});
- ASSERT(batch1->finished());
+ ASSERT(isWriteBatchFinished(*batch1));
// Verify the second batch still commits one doc, and that the first batch only committed one.
_commit(batch2, 1);
@@ -549,7 +549,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, GetMetadataReturnsEmptyDoc) {
.getValue()
.batch;
- ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucket()));
+ ASSERT_BSONOBJ_EQ(BSONObj(), _bucketCatalog->getMetadata(batch->bucketHandle));
_commit(batch, 0);
}
@@ -564,11 +564,11 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
ASSERT(result.isOK());
auto batch = result.getValue().batch;
- auto oldId = batch->bucket().bucketId;
+ auto oldId = batch->bucketHandle.bucketId;
_commit(batch, 0);
- ASSERT_EQ(2U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
- ASSERT(batch->newFieldNamesToBeInserted().count(_timeField)) << batch->toBSON();
- ASSERT(batch->newFieldNamesToBeInserted().count("a")) << batch->toBSON();
+ ASSERT_EQ(2U, batch->newFieldNamesToBeInserted.size()) << batch->toBSON();
+ ASSERT(batch->newFieldNamesToBeInserted.count(_timeField)) << batch->toBSON();
+ ASSERT(batch->newFieldNamesToBeInserted.count("a")) << batch->toBSON();
// Inserting a new measurement with the same fields should return an empty set of new fields.
@@ -581,7 +581,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
ASSERT(result.isOK());
batch = result.getValue().batch;
_commit(batch, 1);
- ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
+ ASSERT_EQ(0U, batch->newFieldNamesToBeInserted.size()) << batch->toBSON();
// Insert a new measurement with a new field.
result = _bucketCatalog->insert(_opCtx,
@@ -593,8 +593,8 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
ASSERT(result.isOK());
batch = result.getValue().batch;
_commit(batch, 2);
- ASSERT_EQ(1U, batch->newFieldNamesToBeInserted().size()) << batch->toBSON();
- ASSERT(batch->newFieldNamesToBeInserted().count("b")) << batch->toBSON();
+ ASSERT_EQ(1U, batch->newFieldNamesToBeInserted.size()) << batch->toBSON();
+ ASSERT(batch->newFieldNamesToBeInserted.count("b")) << batch->toBSON();
// Fill up the bucket.
for (auto i = 3; i < gTimeseriesBucketMaxCount; ++i) {
@@ -607,7 +607,7 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
ASSERT(result.isOK());
batch = result.getValue().batch;
_commit(batch, i);
- ASSERT_EQ(0U, batch->newFieldNamesToBeInserted().size()) << i << ":" << batch->toBSON();
+ ASSERT_EQ(0U, batch->newFieldNamesToBeInserted.size()) << i << ":" << batch->toBSON();
}
// When a bucket overflows, committing to the new overflow bucket should return the fields of
@@ -620,11 +620,11 @@ TEST_F(BucketCatalogWithoutMetadataTest, CommitReturnsNewFields) {
BSON(_timeField << Date_t::now() << "a" << gTimeseriesBucketMaxCount),
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
auto& batch2 = result2.getValue().batch;
- ASSERT_NE(oldId, batch2->bucket().bucketId);
+ ASSERT_NE(oldId, batch2->bucketHandle.bucketId);
_commit(batch2, 0);
- ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted().size()) << batch2->toBSON();
- ASSERT(batch2->newFieldNamesToBeInserted().count(_timeField)) << batch2->toBSON();
- ASSERT(batch2->newFieldNamesToBeInserted().count("a")) << batch2->toBSON();
+ ASSERT_EQ(2U, batch2->newFieldNamesToBeInserted.size()) << batch2->toBSON();
+ ASSERT(batch2->newFieldNamesToBeInserted.count(_timeField)) << batch2->toBSON();
+ ASSERT(batch2->newFieldNamesToBeInserted.count("a")) << batch2->toBSON();
}
TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
@@ -637,10 +637,10 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue()
.batch;
- ASSERT(batch1->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch1));
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
- ASSERT_EQ(batch1->measurements().size(), 1);
- ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
+ ASSERT_EQ(batch1->measurements.size(), 1);
+ ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0);
// Insert before finish so there's a second batch live at the same time.
auto batch2 = _bucketCatalog
@@ -654,14 +654,14 @@ TEST_F(BucketCatalogTest, AbortBatchOnBucketWithPreparedCommit) {
.batch;
ASSERT_NE(batch1, batch2);
- ASSERT(batch2->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch2));
_bucketCatalog->abort(batch2, {ErrorCodes::TimeseriesBucketCleared, ""});
- ASSERT(batch2->finished());
- ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+ ASSERT(isWriteBatchFinished(*batch2));
+ ASSERT_EQ(getWriteBatchResult(*batch2).getStatus(), ErrorCodes::TimeseriesBucketCleared);
_bucketCatalog->finish(batch1, {});
- ASSERT(batch1->finished());
- ASSERT_OK(batch1->getResult().getStatus());
+ ASSERT(isWriteBatchFinished(*batch1));
+ ASSERT_OK(getWriteBatchResult(*batch1).getStatus());
}
TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
@@ -674,13 +674,13 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue()
.batch;
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
_bucketCatalog->clear(_ns1);
ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT(batch->finished());
- ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+ ASSERT(isWriteBatchFinished(*batch));
+ ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared);
batch = _bucketCatalog
->insert(_opCtx,
@@ -691,10 +691,10 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue()
.batch;
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
- ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
+ ASSERT_EQ(batch->measurements.size(), 1);
+ ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0);
_bucketCatalog->clear(_ns1);
@@ -704,8 +704,8 @@ TEST_F(BucketCatalogTest, ClearNamespaceWithConcurrentWrites) {
// removed, and that's fine for our purposes. The finish just records the result to the batch
// and updates some statistics.
_bucketCatalog->finish(batch, {});
- ASSERT(batch->finished());
- ASSERT_OK(batch->getResult().getStatus());
+ ASSERT(isWriteBatchFinished(*batch));
+ ASSERT_OK(getWriteBatchResult(*batch).getStatus());
}
@@ -719,17 +719,17 @@ TEST_F(BucketCatalogTest, ClearBucketWithPreparedBatchThrowsConflict) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue()
.batch;
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
- ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
+ ASSERT_EQ(batch->measurements.size(), 1);
+ ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0);
- ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch->bucket().bucketId.oid),
+ ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch->bucketHandle.bucketId.oid),
WriteConflictException);
_bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
- ASSERT(batch->finished());
- ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+ ASSERT(isWriteBatchFinished(*batch));
+ ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared);
}
TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
@@ -742,10 +742,10 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue()
.batch;
- ASSERT(batch1->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch1));
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
- ASSERT_EQ(batch1->measurements().size(), 1);
- ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements(), 0);
+ ASSERT_EQ(batch1->measurements.size(), 1);
+ ASSERT_EQ(batch1->numPreviouslyCommittedMeasurements, 0);
// Insert before clear so there's a second batch live at the same time.
auto batch2 = _bucketCatalog
@@ -758,20 +758,20 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
.getValue()
.batch;
ASSERT_NE(batch1, batch2);
- ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId);
+ ASSERT_EQ(batch1->bucketHandle.bucketId, batch2->bucketHandle.bucketId);
// Now clear the bucket. Since there's a prepared batch it should conflict.
- ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucket().bucketId.oid),
+ ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucketHandle.bucketId.oid),
WriteConflictException);
// Now try to prepare the second batch. Ensure it aborts the batch.
- ASSERT(batch2->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch2));
ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch2));
- ASSERT(batch2->finished());
- ASSERT_EQ(batch2->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+ ASSERT(isWriteBatchFinished(*batch2));
+ ASSERT_EQ(getWriteBatchResult(*batch2).getStatus(), ErrorCodes::TimeseriesBucketCleared);
// Make sure we didn't clear the bucket state when we aborted the second batch.
- ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucket().bucketId.oid),
+ ASSERT_THROWS(_bucketCatalog->directWriteStart(_ns1, batch1->bucketHandle.bucketId.oid),
WriteConflictException);
// Make sure a subsequent insert, which opens a new bucket, doesn't corrupt the old bucket
@@ -787,15 +787,15 @@ TEST_F(BucketCatalogTest, PrepareCommitOnClearedBatchWithAlreadyPreparedBatch) {
.batch;
ASSERT_NE(batch1, batch3);
ASSERT_NE(batch2, batch3);
- ASSERT_NE(batch1->bucket().bucketId, batch3->bucket().bucketId);
+ ASSERT_NE(batch1->bucketHandle.bucketId, batch3->bucketHandle.bucketId);
// Clean up this batch
- ASSERT(batch3->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch3));
_bucketCatalog->abort(batch3, {ErrorCodes::TimeseriesBucketCleared, ""});
// Make sure we can finish the cleanly prepared batch.
_bucketCatalog->finish(batch1, {});
- ASSERT(batch1->finished());
- ASSERT_OK(batch1->getResult().getStatus());
+ ASSERT(isWriteBatchFinished(*batch1));
+ ASSERT_OK(getWriteBatchResult(*batch1).getStatus());
}
TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
@@ -808,15 +808,15 @@ TEST_F(BucketCatalogTest, PrepareCommitOnAlreadyAbortedBatch) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow)
.getValue()
.batch;
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
_bucketCatalog->abort(batch, {ErrorCodes::TimeseriesBucketCleared, ""});
- ASSERT(batch->finished());
- ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+ ASSERT(isWriteBatchFinished(*batch));
+ ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared);
ASSERT_NOT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT(batch->finished());
- ASSERT_EQ(batch->getResult().getStatus(), ErrorCodes::TimeseriesBucketCleared);
+ ASSERT(isWriteBatchFinished(*batch));
+ ASSERT_EQ(getWriteBatchResult(*batch).getStatus(), ErrorCodes::TimeseriesBucketCleared);
}
TEST_F(BucketCatalogTest, CombiningWithInsertsFromOtherClients) {
@@ -891,8 +891,8 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
.getValue()
.batch;
- ASSERT(batch1->claimCommitRights());
- ASSERT(batch2->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch1));
+ ASSERT(claimWriteBatchCommitRights(*batch2));
// Batch 2 will not be able to commit until batch 1 has finished.
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
@@ -904,11 +904,11 @@ TEST_F(BucketCatalogTest, CannotConcurrentlyCommitBatchesForSameBucket) {
// Finish the first batch.
_bucketCatalog->finish(batch1, {});
- ASSERT(batch1->finished());
+ ASSERT(isWriteBatchFinished(*batch1));
}
_bucketCatalog->finish(batch2, {});
- ASSERT(batch2->finished());
+ ASSERT(isWriteBatchFinished(*batch2));
}
TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
@@ -940,12 +940,12 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
.getValue()
.batch;
- ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId);
- ASSERT_EQ(batch1->bucket().bucketId, batch3->bucket().bucketId);
+ ASSERT_EQ(batch1->bucketHandle.bucketId, batch2->bucketHandle.bucketId);
+ ASSERT_EQ(batch1->bucketHandle.bucketId, batch3->bucketHandle.bucketId);
- ASSERT(batch1->claimCommitRights());
- ASSERT(batch2->claimCommitRights());
- ASSERT(batch3->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch1));
+ ASSERT(claimWriteBatchCommitRights(*batch2));
+ ASSERT(claimWriteBatchCommitRights(*batch3));
// Batch 2 will not be able to commit until batch 1 has finished.
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
@@ -961,12 +961,12 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
// recognize it has been aborted and clean up the bucket.
_bucketCatalog->abort(batch3, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
_bucketCatalog->finish(batch1, {});
- ASSERT(batch1->finished());
+ ASSERT(isWriteBatchFinished(*batch1));
}
// Wait for the batch 2 task to finish preparing commit. Since batch 1 finished, batch 2 should
// be unblocked. Note that after aborting batch 3, batch 2 was not in a prepared state, so we
// expect the prepareCommit() call to fail.
- ASSERT(batch2->finished());
+ ASSERT(isWriteBatchFinished(*batch2));
// Make sure a new batch ends up in a new bucket.
auto batch4 = _bucketCatalog
@@ -978,7 +978,7 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresBucketIsEventuallyClosed) {
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
.getValue()
.batch;
- ASSERT_NE(batch2->bucket().bucketId, batch4->bucket().bucketId);
+ ASSERT_NE(batch2->bucketHandle.bucketId, batch4->bucketHandle.bucketId);
}
TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
@@ -1003,17 +1003,17 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
.batch;
// Batch 1 and 2 use the same bucket.
- ASSERT_EQ(batch1->bucket().bucketId, batch2->bucket().bucketId);
- ASSERT(batch1->claimCommitRights());
- ASSERT(batch2->claimCommitRights());
+ ASSERT_EQ(batch1->bucketHandle.bucketId, batch2->bucketHandle.bucketId);
+ ASSERT(claimWriteBatchCommitRights(*batch1));
+ ASSERT(claimWriteBatchCommitRights(*batch2));
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
// Batch 1 will be in a prepared state now. Abort the second batch so that bucket 1 will be
// closed after batch 1 finishes.
_bucketCatalog->abort(batch2, Status{ErrorCodes::TimeseriesBucketCleared, "cleared"});
_bucketCatalog->finish(batch1, {});
- ASSERT(batch1->finished());
- ASSERT(batch2->finished());
+ ASSERT(isWriteBatchFinished(*batch1));
+ ASSERT(isWriteBatchFinished(*batch2));
// Ensure a batch started after batch 2 aborts, does not insert future measurements into the
// aborted batch/bucket.
@@ -1026,7 +1026,7 @@ TEST_F(BucketCatalogTest, AbortingBatchEnsuresNewInsertsGoToNewBucket) {
BucketCatalog::CombineWithInsertsFromOtherClients::kDisallow)
.getValue()
.batch;
- ASSERT_NE(batch1->bucket().bucketId, batch3->bucket().bucketId);
+ ASSERT_NE(batch1->bucketHandle.bucketId, batch3->bucketHandle.bucketId);
}
TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
@@ -1051,17 +1051,17 @@ TEST_F(BucketCatalogTest, DuplicateNewFieldNamesAcrossConcurrentBatches) {
.batch;
// Batch 2 is the first batch to commit the time field.
- ASSERT(batch2->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch2));
ASSERT_OK(_bucketCatalog->prepareCommit(batch2));
- ASSERT_EQ(batch2->newFieldNamesToBeInserted().size(), 1);
- ASSERT_EQ(batch2->newFieldNamesToBeInserted().begin()->first, _timeField);
+ ASSERT_EQ(batch2->newFieldNamesToBeInserted.size(), 1);
+ ASSERT_EQ(batch2->newFieldNamesToBeInserted.begin()->first, _timeField);
_bucketCatalog->finish(batch2, {});
// Batch 1 was the first batch to insert the time field, but by commit time it was already
// committed by batch 2.
- ASSERT(batch1->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch1));
ASSERT_OK(_bucketCatalog->prepareCommit(batch1));
- ASSERT(batch1->newFieldNamesToBeInserted().empty());
+ ASSERT(batch1->newFieldNamesToBeInserted.empty());
_bucketCatalog->finish(batch1, {});
}
@@ -1285,17 +1285,17 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertCompatibleMeasurement
ASSERT_EQ(0, _getExecutionStat(_ns1, kNumSchemaChanges));
auto batch = result.getValue().batch;
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
// The reopened bucket already contains three committed measurements.
- ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 3);
+ ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 3);
// Verify that the min and max is updated correctly when inserting new measurements.
- ASSERT_BSONOBJ_BINARY_EQ(batch->min(), BSON("u" << BSON("a" << -100)));
+ ASSERT_BSONOBJ_BINARY_EQ(batch->min, BSON("u" << BSON("a" << -100)));
ASSERT_BSONOBJ_BINARY_EQ(
- batch->max(),
+ batch->max,
BSON("u" << BSON("time" << Date_t::fromMillisSinceEpoch(1654529680000) << "b" << 100)));
_bucketCatalog->finish(batch, {});
@@ -1341,12 +1341,12 @@ TEST_F(BucketCatalogTest, ReopenUncompressedBucketAndInsertIncompatibleMeasureme
ASSERT_EQ(1, _getExecutionStat(_ns1, kNumSchemaChanges));
auto batch = result.getValue().batch;
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
// Since the reopened bucket was incompatible, we opened a new one.
- ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
+ ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0);
_bucketCatalog->finish(batch, {});
}
@@ -1397,17 +1397,17 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertCompatibleMeasurement)
ASSERT_EQ(0, _getExecutionStat(_ns1, kNumSchemaChanges));
auto batch = result.getValue().batch;
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
// The reopened bucket already contains three committed measurements.
- ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 3);
+ ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 3);
// Verify that the min and max is updated correctly when inserting new measurements.
- ASSERT_BSONOBJ_BINARY_EQ(batch->min(), BSON("u" << BSON("a" << -100)));
+ ASSERT_BSONOBJ_BINARY_EQ(batch->min, BSON("u" << BSON("a" << -100)));
ASSERT_BSONOBJ_BINARY_EQ(
- batch->max(),
+ batch->max,
BSON("u" << BSON("time" << Date_t::fromMillisSinceEpoch(1654529680000) << "b" << 100)));
_bucketCatalog->finish(batch, {});
@@ -1459,12 +1459,12 @@ TEST_F(BucketCatalogTest, ReopenCompressedBucketAndInsertIncompatibleMeasurement
ASSERT_EQ(1, _getExecutionStat(_ns1, kNumSchemaChanges));
auto batch = result.getValue().batch;
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
// Since the reopened bucket was incompatible, we opened a new one.
- ASSERT_EQ(batch->numPreviouslyCommittedMeasurements(), 0);
+ ASSERT_EQ(batch->numPreviouslyCommittedMeasurements, 0);
_bucketCatalog->finish(batch, {});
}
@@ -1488,7 +1488,7 @@ TEST_F(BucketCatalogTest, ArchivingUnderMemoryPressure) {
BucketCatalog::CombineWithInsertsFromOtherClients::kAllow);
ASSERT_OK(result.getStatus());
auto batch = result.getValue().batch;
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
_bucketCatalog->finish(batch, {});
@@ -1576,10 +1576,10 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
ASSERT_OK(result.getStatus());
auto batch = result.getValue().batch;
ASSERT(batch);
- auto bucketId = batch->bucket().bucketId;
- ASSERT(batch->claimCommitRights());
+ auto bucketId = batch->bucketHandle.bucketId;
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
_bucketCatalog->finish(batch, {});
// Time backwards should hint to re-open.
@@ -1621,11 +1621,11 @@ TEST_F(BucketCatalogTest, TryInsertWillNotCreateBucketWhenWeShouldTryToReopen) {
ASSERT_EQ(1, _getExecutionStat(_ns1, kNumArchivedDueToMemoryThreshold));
ASSERT_EQ(0, _getExecutionStat(_ns1, kNumClosedDueToMemoryThreshold));
batch = result.getValue().batch;
- ASSERT_NE(batch->bucket().bucketId, bucketId);
+ ASSERT_NE(batch->bucketHandle.bucketId, bucketId);
ASSERT(batch);
- ASSERT(batch->claimCommitRights());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
_bucketCatalog->finish(batch, {});
// If we try to insert something that could fit in the archived bucket, we should get it back as
@@ -1660,10 +1660,10 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket)
ASSERT_OK(result.getStatus());
auto batch = result.getValue().batch;
ASSERT(batch);
- auto bucketId = batch->bucket().bucketId;
- ASSERT(batch->claimCommitRights());
+ auto bucketId = batch->bucketHandle.bucketId;
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
_bucketCatalog->finish(batch, {});
// Incompatible schema would close the existing bucket, so we should expect to open a new bucket
@@ -1678,10 +1678,10 @@ TEST_F(BucketCatalogTest, TryInsertWillCreateBucketIfWeWouldCloseExistingBucket)
ASSERT_OK(result.getStatus());
batch = result.getValue().batch;
ASSERT(batch);
- ASSERT_NE(batch->bucket().bucketId, bucketId);
- ASSERT(batch->claimCommitRights());
+ ASSERT_NE(batch->bucketHandle.bucketId, bucketId);
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
_bucketCatalog->finish(batch, {});
}
@@ -1702,10 +1702,10 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
ASSERT_OK(result.getStatus());
auto batch = result.getValue().batch;
ASSERT(batch);
- auto oldBucketId = batch->bucket().bucketId;
- ASSERT(batch->claimCommitRights());
+ auto oldBucketId = batch->bucketHandle.bucketId;
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
_bucketCatalog->finish(batch, {});
BSONObj bucketDoc = ::mongo::fromjson(
@@ -1733,10 +1733,10 @@ TEST_F(BucketCatalogTest, InsertIntoReopenedBucket) {
ASSERT_OK(result.getStatus());
batch = result.getValue().batch;
ASSERT(batch);
- ASSERT_EQ(batch->bucket().bucketId.oid, bucketDoc["_id"].OID());
- ASSERT(batch->claimCommitRights());
+ ASSERT_EQ(batch->bucketHandle.bucketId.oid, bucketDoc["_id"].OID());
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
_bucketCatalog->finish(batch, {});
// Verify the old bucket was soft-closed
ASSERT_EQ(1, _getExecutionStat(_ns1, kNumClosedDueToReopening));
@@ -1776,10 +1776,10 @@ TEST_F(BucketCatalogTest, CannotInsertIntoOutdatedBucket) {
ASSERT_OK(result.getStatus());
auto batch = result.getValue().batch;
ASSERT(batch);
- auto oldBucketId = batch->bucket().bucketId;
- ASSERT(batch->claimCommitRights());
+ auto oldBucketId = batch->bucketHandle.bucketId;
+ ASSERT(claimWriteBatchCommitRights(*batch));
ASSERT_OK(_bucketCatalog->prepareCommit(batch));
- ASSERT_EQ(batch->measurements().size(), 1);
+ ASSERT_EQ(batch->measurements.size(), 1);
_bucketCatalog->finish(batch, {});
BSONObj bucketDoc = ::mongo::fromjson(