diff options
6 files changed, 397 insertions, 9 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index bf3001d84cd..fa5a4927e11 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -108,6 +108,7 @@ env.Library( 'shard_metadata_util.cpp', 'shard_server_catalog_cache_loader.cpp', 'shard_server_op_observer.cpp', + 'sharding_data_transform_cumulative_metrics.cpp', 'sharding_data_transform_metrics.cpp', 'sharding_data_transform_metrics_observer.cpp', 'sharding_initialization_mongod.cpp', @@ -508,6 +509,7 @@ env.CppUnitTest( 'shard_local_test.cpp', 'shard_metadata_util_test.cpp', 'shard_server_catalog_cache_loader_test.cpp', + 'sharding_data_transform_cumulative_metrics_test.cpp', 'sharding_initialization_mongod_test.cpp', 'sharding_initialization_op_observer_test.cpp', 'sharding_logging_test.cpp', diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp new file mode 100644 index 00000000000..33c5370f75d --- /dev/null +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp @@ -0,0 +1,66 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +ShardingDataTransformCumulativeMetrics::DeregistrationFunction +ShardingDataTransformCumulativeMetrics::registerInstanceMetrics(const InstanceObserver* metrics) { + auto it = insertMetrics(metrics); + return [=] { + stdx::unique_lock guard(_mutex); + _instanceMetrics.erase(it); + }; +} + +int64_t ShardingDataTransformCumulativeMetrics::getOldestOperationRemainingTimeMillis() const { + stdx::unique_lock guard(_mutex); + if (_instanceMetrics.empty()) { + return 0; + } + return (*_instanceMetrics.begin())->getRemainingTimeMillis(); +} + +size_t ShardingDataTransformCumulativeMetrics::getObservedMetricsCount() const { + stdx::unique_lock guard(_mutex); + return _instanceMetrics.size(); +} + +ShardingDataTransformCumulativeMetrics::MetricsSet::iterator +ShardingDataTransformCumulativeMetrics::insertMetrics(const InstanceObserver* metrics) { + stdx::unique_lock guard(_mutex); + auto before = _instanceMetrics.size(); + auto it = _instanceMetrics.insert(_instanceMetrics.end(), metrics); + invariant(before + 1 == _instanceMetrics.size()); + return it; +} + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h new file mode 100644 index 00000000000..ffb5c7c3770 --- /dev/null +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/s/sharding_data_transform_metrics_observer.h" +#include "mongo/platform/mutex.h" +#include "mongo/util/functional.h" +#include <set> + +namespace mongo { + +class ShardingDataTransformCumulativeMetrics { +public: + using InstanceObserver = ShardingDataTransformMetricsObserverInterface; + using DeregistrationFunction = unique_function<void()>; + + [[nodiscard]] DeregistrationFunction registerInstanceMetrics(const InstanceObserver* metrics); + int64_t getOldestOperationRemainingTimeMillis() const; + size_t getObservedMetricsCount() const; + +private: + struct MetricsComparer { + inline bool operator()(const InstanceObserver* a, const InstanceObserver* b) const { + auto aTime = a->getStartTimestamp(); + auto bTime = b->getStartTimestamp(); + if (aTime == bTime) { + return a->getUuid() < b->getUuid(); + } + return aTime < bTime; + } + }; + using MetricsSet = std::set<const InstanceObserver*, MetricsComparer>; + + MetricsSet::iterator insertMetrics(const InstanceObserver* metrics); + + mutable Mutex _mutex; + MetricsSet _instanceMetrics; +}; + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp new file mode 100644 index 00000000000..17e167d8970 --- /dev/null +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp @@ -0,0 +1,244 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" +#include "mongo/logv2/log.h" +#include "mongo/platform/random.h" +#include "mongo/stdx/thread.h" +#include "mongo/stdx/unordered_map.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/future.h" +#include "mongo/util/static_immortal.h" + +namespace mongo { +namespace { + +class ObserverMock : public ShardingDataTransformMetricsObserverInterface { +public: + ObserverMock(int64_t startTime, int64_t timeRemaining) + : _uuid{UUID::gen()}, _startTime{startTime}, _timeRemaining{timeRemaining} {} + + virtual const UUID& getUuid() const override { + return _uuid; + } + + virtual int64_t getRemainingTimeMillis() const override { + return _timeRemaining; + } + + virtual int64_t getStartTimestamp() const override { + return _startTime; + } + +private: + UUID _uuid; + int64_t _startTime; + int64_t _timeRemaining; +}; + +class ScopedObserverMock { +public: + using Ptr = std::unique_ptr<ScopedObserverMock>; + + ScopedObserverMock(int64_t startTime, + int64_t timeRemaining, + ShardingDataTransformCumulativeMetrics& parent) + : _mock{startTime, timeRemaining}, _deregister{parent.registerInstanceMetrics(&_mock)} {} + + ~ScopedObserverMock() { + if (_deregister) { + _deregister(); + } + } + +private: + ObserverMock _mock; + ShardingDataTransformCumulativeMetrics::DeregistrationFunction _deregister; +}; + +class ShardingDataTransformCumulativeMetricsTest : public unittest::Test { +protected: + constexpr static int64_t kYoungestTime = std::numeric_limits<int64_t>::max(); + constexpr static int64_t kOldestTime = 1; + + const ObserverMock* getYoungestObserver() { + static StaticImmortal<ObserverMock> youngest{kYoungestTime, kYoungestTime}; + return &youngest.value(); + } + + const ObserverMock* getOldestObserver() { + static StaticImmortal<ObserverMock> oldest{kOldestTime, kOldestTime}; + return &oldest.value(); + } + + using SpecialIndexBehaviorMap = stdx::unordered_map<int, std::function<void()>>; + const SpecialIndexBehaviorMap kNoSpecialBehavior{}; + SpecialIndexBehaviorMap registerAtIndex(int index, const ObserverMock* mock) { + return SpecialIndexBehaviorMap{ + {index, [=] { auto ignore = _metrics.registerInstanceMetrics(mock); }}}; + } + + void performRandomOperations(std::vector<ScopedObserverMock::Ptr>& inserted, + const int iterations, + const float removalOdds, + const int64_t seed, + const SpecialIndexBehaviorMap& specialBehaviors) { + constexpr auto kThresholdScale = 1000; + const auto kRemovalThreshold = kThresholdScale * removalOdds; + PseudoRandom rng(seed); + auto shouldPerformRemoval = [&] { + return (rng.nextInt32(kThresholdScale)) < kRemovalThreshold; + }; + auto performInsertion = [&] { + auto time = rng.nextInt64(kYoungestTime - 1) + 1; + inserted.emplace_back(std::make_unique<ScopedObserverMock>(time, time, _metrics)); + }; + auto performRemoval = [&] { + auto i = rng.nextInt32(inserted.size()); + inserted.erase(inserted.begin() + i); + }; + for (auto i = 0; i < iterations; i++) { + auto it = specialBehaviors.find(i); + if (it != specialBehaviors.end()) { + it->second(); + continue; + } + if (!inserted.empty() && shouldPerformRemoval()) { + performRemoval(); + } else { + performInsertion(); + } + } + } + + ShardingDataTransformCumulativeMetrics _metrics; +}; + +TEST_F(ShardingDataTransformCumulativeMetricsTest, AddAndRemoveMetrics) { + auto deregister = _metrics.registerInstanceMetrics(getOldestObserver()); + ASSERT_EQ(_metrics.getObservedMetricsCount(), 1); + deregister(); + ASSERT_EQ(_metrics.getObservedMetricsCount(), 0); +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, MetricsReportsOldestWhenInsertedFirst) { + auto deregisterOldest = _metrics.registerInstanceMetrics(getOldestObserver()); + auto deregisterYoungest = _metrics.registerInstanceMetrics(getYoungestObserver()); + ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime); +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, MetricsReportsOldestWhenInsertedLast) { + auto deregisterYoungest = _metrics.registerInstanceMetrics(getYoungestObserver()); + auto deregisterOldest = _metrics.registerInstanceMetrics(getOldestObserver()); + ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime); +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, RemainingTimeReports0WhenEmpty) { + ASSERT_EQ(_metrics.getObservedMetricsCount(), 0); + ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), 0); +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, UpdatesOldestWhenOldestIsRemoved) { + auto deregisterYoungest = _metrics.registerInstanceMetrics(getYoungestObserver()); + auto deregisterOldest = _metrics.registerInstanceMetrics(getOldestObserver()); + ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime); + deregisterOldest(); + ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kYoungestTime); +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, InsertsTwoWithSameStartTime) { + auto deregisterOldest = _metrics.registerInstanceMetrics(getOldestObserver()); + ObserverMock sameAsOldest{kOldestTime, kOldestTime}; + auto deregisterOldest2 = _metrics.registerInstanceMetrics(&sameAsOldest); + ASSERT_EQ(_metrics.getObservedMetricsCount(), 2); + ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime); +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, StillReportsOldestAfterRandomOperations) { + constexpr auto kIterations = 10000; + constexpr auto kRemovalOdds = 0.10f; + const auto seed = SecureRandom().nextInt64(); + LOGV2(6315200, "StillReportsOldestAfterRandomOperations", "seed"_attr = seed); + PseudoRandom rng(seed); + std::vector<ScopedObserverMock::Ptr> inserted; + performRandomOperations(inserted, + kIterations, + kRemovalOdds, + rng.nextInt64(), + registerAtIndex(rng.nextInt32(kIterations), getOldestObserver())); + ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime); +} + +TEST_F(ShardingDataTransformCumulativeMetricsTest, + StillReportsOldestAfterRandomOperationsMultithreaded) { + constexpr auto kIterations = 10000; + constexpr auto kRemovalOdds = 0.10f; + constexpr auto kThreadCount = 10; + const auto seed = SecureRandom().nextInt64(); + LOGV2(6315201, "StillReportsOldestAfterRandomOperationsMultithreaded", "seed"_attr = seed); + PseudoRandom rng(seed); + const auto threadToInsertOldest = rng.nextInt32(kThreadCount); + std::vector<std::vector<ScopedObserverMock::Ptr>> threadStorage(kThreadCount); + std::vector<PromiseAndFuture<void>> threadPFs; + for (auto i = 0; i < kThreadCount; i++) { + threadPFs.emplace_back(makePromiseFuture<void>()); + } + std::vector<stdx::thread> threads; + for (auto i = 0; i < kThreadCount; i++) { + auto& storage = threadStorage[i]; + auto seed = rng.nextInt64(); + auto specialBehavior = (i == threadToInsertOldest) + ? registerAtIndex(rng.nextInt32(kIterations), getOldestObserver()) + : kNoSpecialBehavior; + auto& done = threadPFs[i].promise; + threads.emplace_back([=, &storage, specialBehavior = std::move(specialBehavior), &done] { + performRandomOperations(storage, kIterations, kRemovalOdds, seed, specialBehavior); + done.emplaceValue(); + }); + } + for (auto& pf : threadPFs) { + pf.future.wait(); + } + ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime); + size_t expectedCount = 1; // Special insert for kOldest is not counted in vector size. + for (auto& v : threadStorage) { + expectedCount += v.size(); + } + ASSERT_EQ(_metrics.getObservedMetricsCount(), expectedCount); + for (auto& t : threads) { + t.join(); + } +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp index 81ae095f8c9..91b1b94aeb6 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp +++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp @@ -31,11 +31,21 @@ #include "mongo/util/assert_util.h" namespace mongo { -long ShardingDataTransformMetricsObserver::getRemainingTimeMillis() const { + +ShardingDataTransformMetricsObserver::ShardingDataTransformMetricsObserver( + ShardingDataTransformInstanceMetrics* metrics) + : _metrics(metrics) {} + +int64_t ShardingDataTransformMetricsObserver::getRemainingTimeMillis() const { + MONGO_UNREACHABLE; +} + +int64_t ShardingDataTransformMetricsObserver::getStartTimestamp() const { MONGO_UNREACHABLE; } -long ShardingDataTransformMetricsObserver::getTimeRunningMillis() const { +const UUID& ShardingDataTransformMetricsObserver::getUuid() const { MONGO_UNREACHABLE; } + } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.h b/src/mongo/db/s/sharding_data_transform_metrics_observer.h index d85e52e988c..7eff3e2521b 100644 --- a/src/mongo/db/s/sharding_data_transform_metrics_observer.h +++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.h @@ -30,21 +30,22 @@ #pragma once #include "mongo/db/s/sharding_data_transform_metrics.h" +#include "mongo/util/uuid.h" namespace mongo { class ShardingDataTransformMetricsObserverInterface { public: - virtual long getRemainingTimeMillis() const = 0; - virtual long getTimeRunningMillis() const = 0; + virtual int64_t getRemainingTimeMillis() const = 0; + virtual int64_t getStartTimestamp() const = 0; + virtual const UUID& getUuid() const = 0; }; class ShardingDataTransformMetricsObserver : public ShardingDataTransformMetricsObserverInterface { public: - ShardingDataTransformMetricsObserver(ShardingDataTransformInstanceMetrics* metrics) - : _metrics(metrics) {} - long getRemainingTimeMillis() const override; - - long getTimeRunningMillis() const override; + ShardingDataTransformMetricsObserver(ShardingDataTransformInstanceMetrics* metrics); + int64_t getRemainingTimeMillis() const override; + int64_t getStartTimestamp() const override; + const UUID& getUuid() const override; private: ShardingDataTransformInstanceMetrics* _metrics; |