/** * Copyright (C) 2022-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, * as published by MongoDB, Inc. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * Server Side Public License for more details. * * You should have received a copy of the Server Side Public License * along with this program. If not, see * . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the Server Side Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest #include "mongo/platform/basic.h" #include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" #include "mongo/db/s/sharding_data_transform_instance_metrics.h" #include "mongo/logv2/log.h" #include "mongo/platform/random.h" #include "mongo/stdx/thread.h" #include "mongo/stdx/unordered_map.h" #include "mongo/unittest/unittest.h" #include "mongo/util/clock_source_mock.h" #include "mongo/util/future.h" #include "mongo/util/static_immortal.h" namespace mongo { class ObserverMock : public ShardingDataTransformMetricsObserverInterface { public: constexpr static auto kDefaultRole = ShardingDataTransformMetrics::Role::kCoordinator; ObserverMock(Date_t startTime, int64_t timeRemaining) : ObserverMock{startTime, timeRemaining, timeRemaining, kDefaultRole} {} ObserverMock(Date_t startTime, int64_t timeRemainingHigh, int64_t timeRemainingLow, ShardingDataTransformMetrics::Role role) : _uuid{UUID::gen()}, _startTime{startTime}, _timeRemainingHigh{timeRemainingHigh}, _timeRemainingLow{timeRemainingLow}, _role{role} { invariant(timeRemainingHigh >= timeRemainingLow); } virtual const UUID& getUuid() const override { return _uuid; } virtual int64_t getHighEstimateRemainingTimeMillis() const override { return _timeRemainingHigh; } virtual int64_t getLowEstimateRemainingTimeMillis() const override { return _timeRemainingLow; } virtual Date_t getStartTimestamp() const override { return _startTime; } virtual ShardingDataTransformMetrics::Role getRole() const override { return _role; } private: UUID _uuid; Date_t _startTime; int64_t _timeRemainingHigh; int64_t _timeRemainingLow; ShardingDataTransformMetrics::Role _role; }; class ShardingDataTransformMetricsTestFixture : public unittest::Test { protected: constexpr static auto kTestMetricsName = "testMetrics"; constexpr static auto kYoungestTime = Date_t::fromMillisSinceEpoch(std::numeric_limits::max()); constexpr static int64_t kYoungestTimeLeft = 5000; constexpr static auto kOldestTime = Date_t::fromMillisSinceEpoch(std::numeric_limits::min()); constexpr static int64_t kOldestTimeLeft = 3000; using Role = ShardingDataTransformInstanceMetrics::Role; const NamespaceString kTestNamespace = NamespaceString("test.source"); const BSONObj kTestCommand = BSON("command" << "test"); ShardingDataTransformMetricsTestFixture() : _cumulativeMetrics{kTestMetricsName} {} const ObserverMock* getYoungestObserver() { static StaticImmortal youngest{kYoungestTime, kYoungestTimeLeft}; return &youngest.value(); } const ObserverMock* getOldestObserver() { static StaticImmortal oldest{kOldestTime, kOldestTimeLeft}; return &oldest.value(); } ClockSourceMock* getClockSource() { static StaticImmortal clock; return &clock.value(); } using SpecialIndexBehaviorMap = stdx::unordered_map>; const SpecialIndexBehaviorMap kNoSpecialBehavior{}; SpecialIndexBehaviorMap registerAtIndex(int index, const ObserverMock* mock) { return SpecialIndexBehaviorMap{ {index, [=] { auto ignore = _cumulativeMetrics.registerInstanceMetrics(mock); }}}; } template void performRandomOperations(std::vector>& inserted, const int iterations, const float removalOdds, const int64_t seed, const SpecialIndexBehaviorMap& specialBehaviors) { constexpr auto kThresholdScale = 1000; const auto kRemovalThreshold = kThresholdScale * removalOdds; PseudoRandom rng(seed); auto shouldPerformRemoval = [&] { return (rng.nextInt32(kThresholdScale)) < kRemovalThreshold; }; auto performInsertion = [&] { auto timeLeft = rng.nextInt64(std::numeric_limits::max()); auto startTime = rng.nextInt64(); startTime = (startTime == kOldestTime.asInt64()) ? startTime + 1 : startTime; inserted.emplace_back( std::make_unique(Date_t::fromMillisSinceEpoch(startTime), timeLeft, getClockSource(), &_cumulativeMetrics)); }; auto performRemoval = [&] { auto i = rng.nextInt32(inserted.size()); inserted.erase(inserted.begin() + i); }; for (auto i = 0; i < iterations; i++) { auto it = specialBehaviors.find(i); if (it != specialBehaviors.end()) { it->second(); continue; } if (!inserted.empty() && shouldPerformRemoval()) { performRemoval(); } else { performInsertion(); } } } template void doRandomOperationsTest() { constexpr auto kIterations = 10000; constexpr auto kRemovalOdds = 0.10f; const auto seed = SecureRandom().nextInt64(); LOGV2(6315200, "StillReportsOldestAfterRandomOperations", "seed"_attr = seed); PseudoRandom rng(seed); std::vector> inserted; performRandomOperations(inserted, kIterations, kRemovalOdds, rng.nextInt64(), registerAtIndex(rng.nextInt32(kIterations), getOldestObserver())); ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), kOldestTimeLeft); } template void doRandomOperationsMultithreadedTest() { constexpr auto kIterations = 10000; constexpr auto kRemovalOdds = 0.10f; constexpr auto kThreadCount = 10; const auto seed = SecureRandom().nextInt64(); LOGV2(6315201, "StillReportsOldestAfterRandomOperationsMultithreaded", "seed"_attr = seed); PseudoRandom rng(seed); const auto threadToInsertOldest = rng.nextInt32(kThreadCount); std::vector>> threadStorage(kThreadCount); std::vector> threadPFs; for (auto i = 0; i < kThreadCount; i++) { threadPFs.emplace_back(makePromiseFuture()); } std::vector threads; for (auto i = 0; i < kThreadCount; i++) { auto& storage = threadStorage[i]; auto seed = rng.nextInt64(); auto specialBehavior = (i == threadToInsertOldest) ? registerAtIndex(rng.nextInt32(kIterations), getOldestObserver()) : kNoSpecialBehavior; auto& done = threadPFs[i].promise; threads.emplace_back( [=, &storage, specialBehavior = std::move(specialBehavior), &done] { performRandomOperations( storage, kIterations, kRemovalOdds, seed, specialBehavior); done.emplaceValue(); }); } for (auto& pf : threadPFs) { pf.future.wait(); } ASSERT_EQ(_cumulativeMetrics.getOldestOperationHighEstimateRemainingTimeMillis( ObserverMock::kDefaultRole), kOldestTimeLeft); size_t expectedCount = 1; // Special insert for kOldest is not counted in vector size. for (auto& v : threadStorage) { expectedCount += v.size(); } ASSERT_EQ(_cumulativeMetrics.getObservedMetricsCount(), expectedCount); for (auto& t : threads) { t.join(); } } ShardingDataTransformCumulativeMetrics _cumulativeMetrics; }; } // namespace mongo