diff options
Diffstat (limited to 'src/mongo')
5 files changed, 73 insertions, 12 deletions
diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp index 57c569e7d31..37cbf1d7935 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog.cpp @@ -449,16 +449,14 @@ void BucketCatalog::directWriteStart(const NamespaceString& ns, const OID& oid) BucketId{ns, oid}, [](boost::optional<BucketState> input, std::uint64_t) -> boost::optional<BucketState> { if (input.has_value()) { - return input.value().setFlag(BucketStateFlag::kPendingDirectWrite); + return input.value().addDirectWrite(); } // The underlying bucket isn't tracked by the catalog, but we need to insert a state // here so that we can conflict reopening this bucket until we've completed our write // and the reader has refetched. - return BucketState{} - .setFlag(BucketStateFlag::kPendingDirectWrite) - .setFlag(BucketStateFlag::kUntracked); + return BucketState{}.setFlag(BucketStateFlag::kUntracked).addDirectWrite(); }); - if (result && result.value().isPrepared()) { + if (result.has_value() && result.value().isPrepared()) { hangTimeseriesDirectModificationBeforeWriteConflict.pauseWhileSet(); throwWriteConflictException("Prepared bucket can no longer be inserted into."); } @@ -483,9 +481,7 @@ void BucketCatalog::directWriteFinish(const NamespaceString& ns, const OID& oid) // state. return boost::none; } - return input.value() - .unsetFlag(BucketStateFlag::kPendingDirectWrite) - .setFlag(BucketStateFlag::kCleared); + return input.value().removeDirectWrite(); }); } diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp index e62b095d365..cc796c7394f 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_catalog_helpers.cpp @@ -330,8 +330,8 @@ void handleDirectWrite(OperationContext* opCtx, const NamespaceString& ns, const auto& bucketCatalog = BucketCatalog::get(opCtx); bucketCatalog.directWriteStart(resolvedNs, bucketId); - // Then register callbacks so we can let the BucketCatalog know that we are done with our - // direct write after the actual write takes place (or is abandoned), and allow reopening. + // Then register callbacks so we can let the BucketCatalog know that we are done with our direct + // write after the actual write takes place (or is abandoned), and allow reopening. opCtx->recoveryUnit()->onCommit( [svcCtx = opCtx->getServiceContext(), resolvedNs, bucketId](boost::optional<Timestamp>) { auto& bucketCatalog = BucketCatalog::get(svcCtx); diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_state.cpp index 4f1321e30b7..73a3fcc86a4 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_state.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state.cpp @@ -43,16 +43,39 @@ BucketState& BucketState::unsetFlag(BucketStateFlag flag) { return *this; } +BucketState& BucketState::addDirectWrite() { + // Track the number of DirectWrites on the Bucket so we can properly unset the flag later on. + _numberOfDirectWrites++; + return setFlag(BucketStateFlag::kPendingDirectWrite); +} + +BucketState& BucketState::removeDirectWrite() { + invariant(isSet(BucketStateFlag::kPendingDirectWrite)); + + // We only unset the 'kPendingDirectWrite' flag when the number of direct writers reaches 0. + _numberOfDirectWrites--; + if (_numberOfDirectWrites > 0) { + return *this; + } + + // The last pending direct write must set the 'kCleared' flag. + return unsetFlag(BucketStateFlag::kPendingDirectWrite).setFlag(BucketStateFlag::kCleared); +} + BucketState& BucketState::reset() { _state = 0; + _numberOfDirectWrites = 0; return *this; } +int32_t BucketState::getNumberOfDirectWrites() const { + return _numberOfDirectWrites; +} + bool BucketState::isSet(BucketStateFlag flag) const { return _state & static_cast<decltype(_state)>(flag); } - bool BucketState::isPrepared() const { constexpr decltype(_state) mask = static_cast<decltype(_state)>(BucketStateFlag::kPrepared); return _state & mask; @@ -103,7 +126,7 @@ std::string BucketState::toString() const { } if (isSet(BucketStateFlag::kPendingDirectWrite)) { - output("pendingDirectWrite"); + output("pendingDirectWrite(count=" + std::to_string(_numberOfDirectWrites) + ")"); } if (isSet(BucketStateFlag::kUntracked)) { diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state.h b/src/mongo/db/timeseries/bucket_catalog/bucket_state.h index d11f1bf67dc..f8976ddac5c 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_state.h +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state.h @@ -55,8 +55,12 @@ class BucketState { public: BucketState& setFlag(BucketStateFlag); BucketState& unsetFlag(BucketStateFlag); + BucketState& addDirectWrite(); + BucketState& removeDirectWrite(); BucketState& reset(); + int32_t getNumberOfDirectWrites() const; + bool isSet(BucketStateFlag) const; bool isPrepared() const; bool conflictsWithReopening() const; @@ -67,6 +71,7 @@ public: private: std::underlying_type<BucketStateFlag>::type _state = 0; + int32_t _numberOfDirectWrites = 0; }; } // namespace mongo::timeseries::bucket_catalog diff --git a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp index d63356e97d5..58be5cbff5d 100644 --- a/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp +++ b/src/mongo/db/timeseries/bucket_catalog/bucket_state_registry_test.cpp @@ -614,5 +614,42 @@ TEST_F(BucketStateRegistryTest, DirectWriteFinishRemovesBucketState) { ASSERT_FALSE(state.has_value()); } +TEST_F(BucketStateRegistryTest, TestDirectWriteStartCounter) { + RAIIServerParameterControllerForTest controller{"featureFlagTimeseriesScalabilityImprovements", + true}; + auto bucket = createBucket(info1); + auto bucketId = bucket->bucketId; + + // Under the hood, the BucketState will contain a counter on the number of ongoing DirectWrites. + int32_t dwCounter = 0; + + // If no direct write has been initiated, the direct write counter should be 0. + auto state = getBucketState(_bucketStateRegistry, bucketId); + ASSERT_TRUE(state.has_value()); + ASSERT_EQ(dwCounter, state.value().getNumberOfDirectWrites()); + + // Start a direct write and ensure the counter is incremented correctly. + while (dwCounter < 4) { + directWriteStart(ns1, bucketId.oid); + dwCounter++; + state = getBucketState(_bucketStateRegistry, bucketId); + ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); + ASSERT_EQ(dwCounter, state.value().getNumberOfDirectWrites()); + } + + while (dwCounter > 1) { + directWriteFinish(ns1, bucketId.oid); + dwCounter--; + state = getBucketState(_bucketStateRegistry, bucketId); + ASSERT_TRUE(state.value().isSet(BucketStateFlag::kPendingDirectWrite)); + ASSERT_EQ(dwCounter, state.value().getNumberOfDirectWrites()); + } + + // When the number of direct writes reaches 0, we should clear the bucket. + directWriteFinish(ns1, bucketId.oid); + state = getBucketState(_bucketStateRegistry, bucketId); + ASSERT_TRUE(hasBeenCleared(bucket)); +} + } // namespace } // namespace mongo::timeseries::bucket_catalog |