summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBrett Nawrocki <brett.nawrocki@mongodb.com>2022-02-11 18:49:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-15 02:00:12 +0000
commit1fa6d9faa1812f78ce97bbd988fca10b10946ead (patch)
tree57538c0340bdfeefa91d9397d78339a1655e23dd
parent5c03327ab12e2ff0e1d49ce8a4517623dd8cc7ba (diff)
downloadmongo-1fa6d9faa1812f78ce97bbd988fca10b10946ead.tar.gz
SERVER-63152 Implement ShardingDataTransformCumulativeMetrics stub
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp66
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics.h65
-rw-r--r--src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp244
-rw-r--r--src/mongo/db/s/sharding_data_transform_metrics_observer.cpp14
-rw-r--r--src/mongo/db/s/sharding_data_transform_metrics_observer.h15
6 files changed, 397 insertions, 9 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index bf3001d84cd..fa5a4927e11 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -108,6 +108,7 @@ env.Library(
'shard_metadata_util.cpp',
'shard_server_catalog_cache_loader.cpp',
'shard_server_op_observer.cpp',
+ 'sharding_data_transform_cumulative_metrics.cpp',
'sharding_data_transform_metrics.cpp',
'sharding_data_transform_metrics_observer.cpp',
'sharding_initialization_mongod.cpp',
@@ -508,6 +509,7 @@ env.CppUnitTest(
'shard_local_test.cpp',
'shard_metadata_util_test.cpp',
'shard_server_catalog_cache_loader_test.cpp',
+ 'sharding_data_transform_cumulative_metrics_test.cpp',
'sharding_initialization_mongod_test.cpp',
'sharding_initialization_op_observer_test.cpp',
'sharding_logging_test.cpp',
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
new file mode 100644
index 00000000000..33c5370f75d
--- /dev/null
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp
@@ -0,0 +1,66 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+ShardingDataTransformCumulativeMetrics::DeregistrationFunction
+ShardingDataTransformCumulativeMetrics::registerInstanceMetrics(const InstanceObserver* metrics) {
+ auto it = insertMetrics(metrics);
+ return [=] {
+ stdx::unique_lock guard(_mutex);
+ _instanceMetrics.erase(it);
+ };
+}
+
+int64_t ShardingDataTransformCumulativeMetrics::getOldestOperationRemainingTimeMillis() const {
+ stdx::unique_lock guard(_mutex);
+ if (_instanceMetrics.empty()) {
+ return 0;
+ }
+ return (*_instanceMetrics.begin())->getRemainingTimeMillis();
+}
+
+size_t ShardingDataTransformCumulativeMetrics::getObservedMetricsCount() const {
+ stdx::unique_lock guard(_mutex);
+ return _instanceMetrics.size();
+}
+
+ShardingDataTransformCumulativeMetrics::MetricsSet::iterator
+ShardingDataTransformCumulativeMetrics::insertMetrics(const InstanceObserver* metrics) {
+ stdx::unique_lock guard(_mutex);
+ auto before = _instanceMetrics.size();
+ auto it = _instanceMetrics.insert(_instanceMetrics.end(), metrics);
+ invariant(before + 1 == _instanceMetrics.size());
+ return it;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
new file mode 100644
index 00000000000..ffb5c7c3770
--- /dev/null
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h
@@ -0,0 +1,65 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/s/sharding_data_transform_metrics_observer.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/util/functional.h"
+#include <set>
+
+namespace mongo {
+
+class ShardingDataTransformCumulativeMetrics {
+public:
+ using InstanceObserver = ShardingDataTransformMetricsObserverInterface;
+ using DeregistrationFunction = unique_function<void()>;
+
+ [[nodiscard]] DeregistrationFunction registerInstanceMetrics(const InstanceObserver* metrics);
+ int64_t getOldestOperationRemainingTimeMillis() const;
+ size_t getObservedMetricsCount() const;
+
+private:
+ struct MetricsComparer {
+ inline bool operator()(const InstanceObserver* a, const InstanceObserver* b) const {
+ auto aTime = a->getStartTimestamp();
+ auto bTime = b->getStartTimestamp();
+ if (aTime == bTime) {
+ return a->getUuid() < b->getUuid();
+ }
+ return aTime < bTime;
+ }
+ };
+ using MetricsSet = std::set<const InstanceObserver*, MetricsComparer>;
+
+ MetricsSet::iterator insertMetrics(const InstanceObserver* metrics);
+
+ mutable Mutex _mutex;
+ MetricsSet _instanceMetrics;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
new file mode 100644
index 00000000000..17e167d8970
--- /dev/null
+++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp
@@ -0,0 +1,244 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/sharding_data_transform_cumulative_metrics.h"
+#include "mongo/logv2/log.h"
+#include "mongo/platform/random.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/future.h"
+#include "mongo/util/static_immortal.h"
+
+namespace mongo {
+namespace {
+
+class ObserverMock : public ShardingDataTransformMetricsObserverInterface {
+public:
+ ObserverMock(int64_t startTime, int64_t timeRemaining)
+ : _uuid{UUID::gen()}, _startTime{startTime}, _timeRemaining{timeRemaining} {}
+
+ virtual const UUID& getUuid() const override {
+ return _uuid;
+ }
+
+ virtual int64_t getRemainingTimeMillis() const override {
+ return _timeRemaining;
+ }
+
+ virtual int64_t getStartTimestamp() const override {
+ return _startTime;
+ }
+
+private:
+ UUID _uuid;
+ int64_t _startTime;
+ int64_t _timeRemaining;
+};
+
+class ScopedObserverMock {
+public:
+ using Ptr = std::unique_ptr<ScopedObserverMock>;
+
+ ScopedObserverMock(int64_t startTime,
+ int64_t timeRemaining,
+ ShardingDataTransformCumulativeMetrics& parent)
+ : _mock{startTime, timeRemaining}, _deregister{parent.registerInstanceMetrics(&_mock)} {}
+
+ ~ScopedObserverMock() {
+ if (_deregister) {
+ _deregister();
+ }
+ }
+
+private:
+ ObserverMock _mock;
+ ShardingDataTransformCumulativeMetrics::DeregistrationFunction _deregister;
+};
+
+class ShardingDataTransformCumulativeMetricsTest : public unittest::Test {
+protected:
+ constexpr static int64_t kYoungestTime = std::numeric_limits<int64_t>::max();
+ constexpr static int64_t kOldestTime = 1;
+
+ const ObserverMock* getYoungestObserver() {
+ static StaticImmortal<ObserverMock> youngest{kYoungestTime, kYoungestTime};
+ return &youngest.value();
+ }
+
+ const ObserverMock* getOldestObserver() {
+ static StaticImmortal<ObserverMock> oldest{kOldestTime, kOldestTime};
+ return &oldest.value();
+ }
+
+ using SpecialIndexBehaviorMap = stdx::unordered_map<int, std::function<void()>>;
+ const SpecialIndexBehaviorMap kNoSpecialBehavior{};
+ SpecialIndexBehaviorMap registerAtIndex(int index, const ObserverMock* mock) {
+ return SpecialIndexBehaviorMap{
+ {index, [=] { auto ignore = _metrics.registerInstanceMetrics(mock); }}};
+ }
+
+ void performRandomOperations(std::vector<ScopedObserverMock::Ptr>& inserted,
+ const int iterations,
+ const float removalOdds,
+ const int64_t seed,
+ const SpecialIndexBehaviorMap& specialBehaviors) {
+ constexpr auto kThresholdScale = 1000;
+ const auto kRemovalThreshold = kThresholdScale * removalOdds;
+ PseudoRandom rng(seed);
+ auto shouldPerformRemoval = [&] {
+ return (rng.nextInt32(kThresholdScale)) < kRemovalThreshold;
+ };
+ auto performInsertion = [&] {
+ auto time = rng.nextInt64(kYoungestTime - 1) + 1;
+ inserted.emplace_back(std::make_unique<ScopedObserverMock>(time, time, _metrics));
+ };
+ auto performRemoval = [&] {
+ auto i = rng.nextInt32(inserted.size());
+ inserted.erase(inserted.begin() + i);
+ };
+ for (auto i = 0; i < iterations; i++) {
+ auto it = specialBehaviors.find(i);
+ if (it != specialBehaviors.end()) {
+ it->second();
+ continue;
+ }
+ if (!inserted.empty() && shouldPerformRemoval()) {
+ performRemoval();
+ } else {
+ performInsertion();
+ }
+ }
+ }
+
+ ShardingDataTransformCumulativeMetrics _metrics;
+};
+
+TEST_F(ShardingDataTransformCumulativeMetricsTest, AddAndRemoveMetrics) {
+ auto deregister = _metrics.registerInstanceMetrics(getOldestObserver());
+ ASSERT_EQ(_metrics.getObservedMetricsCount(), 1);
+ deregister();
+ ASSERT_EQ(_metrics.getObservedMetricsCount(), 0);
+}
+
+TEST_F(ShardingDataTransformCumulativeMetricsTest, MetricsReportsOldestWhenInsertedFirst) {
+ auto deregisterOldest = _metrics.registerInstanceMetrics(getOldestObserver());
+ auto deregisterYoungest = _metrics.registerInstanceMetrics(getYoungestObserver());
+ ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime);
+}
+
+TEST_F(ShardingDataTransformCumulativeMetricsTest, MetricsReportsOldestWhenInsertedLast) {
+ auto deregisterYoungest = _metrics.registerInstanceMetrics(getYoungestObserver());
+ auto deregisterOldest = _metrics.registerInstanceMetrics(getOldestObserver());
+ ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime);
+}
+
+TEST_F(ShardingDataTransformCumulativeMetricsTest, RemainingTimeReports0WhenEmpty) {
+ ASSERT_EQ(_metrics.getObservedMetricsCount(), 0);
+ ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), 0);
+}
+
+TEST_F(ShardingDataTransformCumulativeMetricsTest, UpdatesOldestWhenOldestIsRemoved) {
+ auto deregisterYoungest = _metrics.registerInstanceMetrics(getYoungestObserver());
+ auto deregisterOldest = _metrics.registerInstanceMetrics(getOldestObserver());
+ ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime);
+ deregisterOldest();
+ ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kYoungestTime);
+}
+
+TEST_F(ShardingDataTransformCumulativeMetricsTest, InsertsTwoWithSameStartTime) {
+ auto deregisterOldest = _metrics.registerInstanceMetrics(getOldestObserver());
+ ObserverMock sameAsOldest{kOldestTime, kOldestTime};
+ auto deregisterOldest2 = _metrics.registerInstanceMetrics(&sameAsOldest);
+ ASSERT_EQ(_metrics.getObservedMetricsCount(), 2);
+ ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime);
+}
+
+TEST_F(ShardingDataTransformCumulativeMetricsTest, StillReportsOldestAfterRandomOperations) {
+ constexpr auto kIterations = 10000;
+ constexpr auto kRemovalOdds = 0.10f;
+ const auto seed = SecureRandom().nextInt64();
+ LOGV2(6315200, "StillReportsOldestAfterRandomOperations", "seed"_attr = seed);
+ PseudoRandom rng(seed);
+ std::vector<ScopedObserverMock::Ptr> inserted;
+ performRandomOperations(inserted,
+ kIterations,
+ kRemovalOdds,
+ rng.nextInt64(),
+ registerAtIndex(rng.nextInt32(kIterations), getOldestObserver()));
+ ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime);
+}
+
+TEST_F(ShardingDataTransformCumulativeMetricsTest,
+ StillReportsOldestAfterRandomOperationsMultithreaded) {
+ constexpr auto kIterations = 10000;
+ constexpr auto kRemovalOdds = 0.10f;
+ constexpr auto kThreadCount = 10;
+ const auto seed = SecureRandom().nextInt64();
+ LOGV2(6315201, "StillReportsOldestAfterRandomOperationsMultithreaded", "seed"_attr = seed);
+ PseudoRandom rng(seed);
+ const auto threadToInsertOldest = rng.nextInt32(kThreadCount);
+ std::vector<std::vector<ScopedObserverMock::Ptr>> threadStorage(kThreadCount);
+ std::vector<PromiseAndFuture<void>> threadPFs;
+ for (auto i = 0; i < kThreadCount; i++) {
+ threadPFs.emplace_back(makePromiseFuture<void>());
+ }
+ std::vector<stdx::thread> threads;
+ for (auto i = 0; i < kThreadCount; i++) {
+ auto& storage = threadStorage[i];
+ auto seed = rng.nextInt64();
+ auto specialBehavior = (i == threadToInsertOldest)
+ ? registerAtIndex(rng.nextInt32(kIterations), getOldestObserver())
+ : kNoSpecialBehavior;
+ auto& done = threadPFs[i].promise;
+ threads.emplace_back([=, &storage, specialBehavior = std::move(specialBehavior), &done] {
+ performRandomOperations(storage, kIterations, kRemovalOdds, seed, specialBehavior);
+ done.emplaceValue();
+ });
+ }
+ for (auto& pf : threadPFs) {
+ pf.future.wait();
+ }
+ ASSERT_EQ(_metrics.getOldestOperationRemainingTimeMillis(), kOldestTime);
+ size_t expectedCount = 1; // Special insert for kOldest is not counted in vector size.
+ for (auto& v : threadStorage) {
+ expectedCount += v.size();
+ }
+ ASSERT_EQ(_metrics.getObservedMetricsCount(), expectedCount);
+ for (auto& t : threads) {
+ t.join();
+ }
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp
index 81ae095f8c9..91b1b94aeb6 100644
--- a/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp
+++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.cpp
@@ -31,11 +31,21 @@
#include "mongo/util/assert_util.h"
namespace mongo {
-long ShardingDataTransformMetricsObserver::getRemainingTimeMillis() const {
+
+ShardingDataTransformMetricsObserver::ShardingDataTransformMetricsObserver(
+ ShardingDataTransformInstanceMetrics* metrics)
+ : _metrics(metrics) {}
+
+int64_t ShardingDataTransformMetricsObserver::getRemainingTimeMillis() const {
+ MONGO_UNREACHABLE;
+}
+
+int64_t ShardingDataTransformMetricsObserver::getStartTimestamp() const {
MONGO_UNREACHABLE;
}
-long ShardingDataTransformMetricsObserver::getTimeRunningMillis() const {
+const UUID& ShardingDataTransformMetricsObserver::getUuid() const {
MONGO_UNREACHABLE;
}
+
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_data_transform_metrics_observer.h b/src/mongo/db/s/sharding_data_transform_metrics_observer.h
index d85e52e988c..7eff3e2521b 100644
--- a/src/mongo/db/s/sharding_data_transform_metrics_observer.h
+++ b/src/mongo/db/s/sharding_data_transform_metrics_observer.h
@@ -30,21 +30,22 @@
#pragma once
#include "mongo/db/s/sharding_data_transform_metrics.h"
+#include "mongo/util/uuid.h"
namespace mongo {
class ShardingDataTransformMetricsObserverInterface {
public:
- virtual long getRemainingTimeMillis() const = 0;
- virtual long getTimeRunningMillis() const = 0;
+ virtual int64_t getRemainingTimeMillis() const = 0;
+ virtual int64_t getStartTimestamp() const = 0;
+ virtual const UUID& getUuid() const = 0;
};
class ShardingDataTransformMetricsObserver : public ShardingDataTransformMetricsObserverInterface {
public:
- ShardingDataTransformMetricsObserver(ShardingDataTransformInstanceMetrics* metrics)
- : _metrics(metrics) {}
- long getRemainingTimeMillis() const override;
-
- long getTimeRunningMillis() const override;
+ ShardingDataTransformMetricsObserver(ShardingDataTransformInstanceMetrics* metrics);
+ int64_t getRemainingTimeMillis() const override;
+ int64_t getStartTimestamp() const override;
+ const UUID& getUuid() const override;
private:
ShardingDataTransformInstanceMetrics* _metrics;