summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp12
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp4
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_state.cpp27
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_state.h5
-rw-r--r--src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp37
5 files changed, 73 insertions, 12 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp
index 57c569e7d31..37cbf1d7935 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp
@@ -449,16 +449,14 @@ void BucketCatalog::directWriteStart(const NamespaceString& ns, const OID& oid)
BucketId{ns, oid},
[](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> {
if (input.has_value()) {
- return input.value().setFlag(BucketStateFlag::kPendingDirectWrite);
+ return input.value().addDirectWrite();
}
// The underlying bucket isn't tracked by the catalog, but we need to insert a state
// here so that we can conflict reopening this bucket until we've completed our write
// and the reader has refetched.
- return BucketState{}
- .setFlag(BucketStateFlag::kPendingDirectWrite)
- .setFlag(BucketStateFlag::kUntracked);
+ return BucketState{}.setFlag(BucketStateFlag::kUntracked).addDirectWrite();
});
- if (result && result.value().isPrepared()) {
+ if (result.has_value() && result.value().isPrepared()) {
hangTimeseriesDirectModificationBeforeWriteConflict.pauseWhileSet();
throwWriteConflictException("Prepared bucket can no longer be inserted into.");
}
@@ -483,9 +481,7 @@ void BucketCatalog::directWriteFinish(const NamespaceString& ns, const OID& oid)
// state.
return boost::none;
}
- return input.value()
- .unsetFlag(BucketStateFlag::kPendingDirectWrite)
- .setFlag(BucketStateFlag::kCleared);
+ return input.value().removeDirectWrite();
});
}
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp
index e62b095d365..cc796c7394f 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp
@@ -330,8 +330,8 @@ void handleDirectWrite(OperationContext* opCtx, const NamespaceString& ns, const
auto& bucketCatalog = BucketCatalog::get(opCtx);
bucketCatalog.directWriteStart(resolvedNs, bucketId);
- // Then register callbacks so we can let the BucketCatalog know that we are done with our
- // direct write after the actual write takes place (or is abandoned), and allow reopening.
+ // Then register callbacks so we can let the BucketCatalog know that we are done with our direct
+ // write after the actual write takes place (or is abandoned), and allow reopening.
opCtx->recoveryUnit()->onCommit(
[svcCtx = opCtx->getServiceContext(), resolvedNs, bucketId](boost::optional<Timestamp>) {
auto& bucketCatalog = BucketCatalog::get(svcCtx);
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_state.cpp
index 4f1321e30b7..73a3fcc86a4 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_state.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state.cpp
@@ -43,16 +43,39 @@ BucketState& BucketState::unsetFlag(BucketStateFlag flag) {
return *this;
}
+BucketState& BucketState::addDirectWrite() {
+ // Track the number of DirectWrites on the Bucket so we can properly unset the flag later on.
+ _numberOfDirectWrites++;
+ return setFlag(BucketStateFlag::kPendingDirectWrite);
+}
+
+BucketState& BucketState::removeDirectWrite() {
+ invariant(isSet(BucketStateFlag::kPendingDirectWrite));
+
+ // We only unset the 'kPendingDirectWrite' flag when the number of direct writers reaches 0.
+ _numberOfDirectWrites--;
+ if (_numberOfDirectWrites > 0) {
+ return *this;
+ }
+
+ // The last pending direct write must set the 'kCleared' flag.
+ return unsetFlag(BucketStateFlag::kPendingDirectWrite).setFlag(BucketStateFlag::kCleared);
+}
+
BucketState& BucketState::reset() {
_state = 0;
+ _numberOfDirectWrites = 0;
return *this;
}
+int32_t BucketState::getNumberOfDirectWrites() const {
+ return _numberOfDirectWrites;
+}
+
bool BucketState::isSet(BucketStateFlag flag) const {
return _state & static_cast<decltype(_state)>(flag);
}
-
bool BucketState::isPrepared() const {
constexpr decltype(_state) mask = static_cast<decltype(_state)>(BucketStateFlag::kPrepared);
return _state & mask;
@@ -103,7 +126,7 @@ std::string BucketState::toString() const {
}
if (isSet(BucketStateFlag::kPendingDirectWrite)) {
- output("pendingDirectWrite");
+ output("pendingDirectWrite(count=" + std::to_string(_numberOfDirectWrites) + ")");
}
if (isSet(BucketStateFlag::kUntracked)) {
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state.h b/src/mongo/db/timeseries/bucket_catalog/bucket_state.h
index d11f1bf67dc..f8976ddac5c 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_state.h
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state.h
@@ -55,8 +55,12 @@ class BucketState {
public:
BucketState& setFlag(BucketStateFlag);
BucketState& unsetFlag(BucketStateFlag);
+ BucketState& addDirectWrite();
+ BucketState& removeDirectWrite();
BucketState& reset();
+ int32_t getNumberOfDirectWrites() const;
+
bool isSet(BucketStateFlag) const;
bool isPrepared() const;
bool conflictsWithReopening() const;
@@ -67,6 +71,7 @@ public:
private:
std::underlying_type<BucketStateFlag>::type _state = 0;
+ int32_t _numberOfDirectWrites = 0;
};
} // namespace mongo::timeseries::bucket_catalog
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp
index d63356e97d5..58be5cbff5d 100644
--- a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp
+++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp
@@ -614,5 +614,42 @@ TEST_F(BucketStateRegistryTest, DirectWriteFinishRemovesBucketState) {
ASSERT_FALSE(state.has_value());
}
+TEST_F(BucketStateRegistryTest, TestDirectWriteStartCounter) {
+ RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements",
+ true};
+ auto bucket = createBucket(info1);
+ auto bucketId = bucket->bucketId;
+
+ // Under the hood, the BucketState will contain a counter on the number of ongoing DirectWrites.
+ int32_t dwCounter = 0;
+
+ // If no direct write has been initiated, the direct write counter should be 0.
+ auto state = getBucketState(_bucketStateRegistry, bucketId);
+ ASSERT_TRUE(state.has_value());
+ ASSERT_EQ(dwCounter, state.value().getNumberOfDirectWrites());
+
+ // Start a direct write and ensure the counter is incremented correctly.
+ while (dwCounter < 4) {
+ directWriteStart(ns1, bucketId.oid);
+ dwCounter++;
+ state = getBucketState(_bucketStateRegistry, bucketId);
+ ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite));
+ ASSERT_EQ(dwCounter, state.value().getNumberOfDirectWrites());
+ }
+
+ while (dwCounter > 1) {
+ directWriteFinish(ns1, bucketId.oid);
+ dwCounter--;
+ state = getBucketState(_bucketStateRegistry, bucketId);
+ ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite));
+ ASSERT_EQ(dwCounter, state.value().getNumberOfDirectWrites());
+ }
+
+ // When the number of direct writes reaches 0, we should clear the bucket.
+ directWriteFinish(ns1, bucketId.oid);
+ state = getBucketState(_bucketStateRegistry, bucketId);
+ ASSERT_TRUE(hasBeenCleared(bucket));
+}
+
} // namespace
} // namespace mongo::timeseries::bucket_catalog