diff options
author | Randolph Tan <randolph@10gen.com> | 2022-04-22 21:15:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-22 22:13:00 +0000 |
commit | 5e9dcbad6a8b7f1443c824c3bd9d3498cfeae334 (patch) | |
tree | 4a992508b01575136640a39e816ed1d37bd29c4d /src/mongo/db | |
parent | 8330369765ad317d27f05277877be521bdb921de (diff) | |
download | mongo-5e9dcbad6a8b7f1443c824c3bd9d3498cfeae334.tar.gz |
SERVER-64386 Track and Report New Coordinator Fields in ServerStatus
Diffstat (limited to 'src/mongo/db')
4 files changed, 328 insertions, 8 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index c826c3dbb85..64d64da074b 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -564,6 +564,43 @@ BSONObj makeFlushRoutingTableCacheUpdatesCmd(const NamespaceString& nss) { BSON(WriteConcernOptions::kWriteConcernField << kMajorityWriteConcern.toBSON())); } +ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum toMetricsState( + CoordinatorStateEnum enumVal) { + switch (enumVal) { + case CoordinatorStateEnum::kUnused: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused; + + case CoordinatorStateEnum::kInitializing: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kInitializing; + + case CoordinatorStateEnum::kPreparingToDonate: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kPreparingToDonate; + + case CoordinatorStateEnum::kCloning: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCloning; + + case CoordinatorStateEnum::kApplying: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kApplying; + + case CoordinatorStateEnum::kBlockingWrites: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kBlockingWrites; + + case CoordinatorStateEnum::kAborting: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kAborting; + + case CoordinatorStateEnum::kCommitting: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kCommitting; + + case CoordinatorStateEnum::kDone: + return ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kDone; + default: + invariant(false, + str::stream() << "Unexpected resharding coordinator state: " + << CoordinatorState_serializer(enumVal)); + MONGO_UNREACHABLE; + } +} + } // namespace namespace resharding { @@ -792,6 +829,12 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, opCtx, updatedCoordinatorDoc, boost::none, boost::none, txnNumber); }, ShardingCatalogClient::kLocalWriteConcern); + + if (ShardingDataTransformMetrics::isEnabled()) { + ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext()) + ->onCoordinatorStateTransition(toMetricsState(coordinatorDoc.getState()), + toMetricsState(updatedCoordinatorDoc.getState())); + } } } // namespace resharding @@ -999,6 +1042,11 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator( if (coordinatorDoc.getState() > CoordinatorStateEnum::kInitializing) { _reshardingCoordinatorObserver->onReshardingParticipantTransition(coordinatorDoc); } + + if (ShardingDataTransformMetrics::isEnabled()) { + ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext()) + ->onCoordinatorStateTransition(boost::none, toMetricsState(coordinatorDoc.getState())); + } } void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc( @@ -1020,7 +1068,15 @@ void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc( "collectionUUID"_attr = doc.getSourceUUID(), "reshardingUUID"_attr = doc.getReshardingUUID()); + const auto previousState = _coordinatorDoc.getState(); _coordinatorDoc = doc; + + if (ShardingDataTransformMetrics::isEnabled()) { + ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext()) + ->onCoordinatorStateTransition(toMetricsState(previousState), + toMetricsState(_coordinatorDoc.getState())); + } + ShardingLogging::get(opCtx)->logChange(opCtx, "resharding.coordinator.transition", doc.getSourceNss().toString(), @@ -1373,6 +1429,12 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( } _reshardingCoordinatorObserver->interrupt(status); } + + if (ShardingDataTransformMetrics::isEnabled()) { + ShardingDataTransformCumulativeMetrics::getForResharding(cc().getServiceContext()) + ->onCoordinatorStateTransition(toMetricsState(_coordinatorDoc.getState()), + boost::none); + } }) .semi(); } diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp index 9e53bfd346b..877aca139fd 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.cpp @@ -29,6 +29,7 @@ #include "mongo/db/s/sharding_data_transform_cumulative_metrics.h" #include "mongo/util/assert_util.h" +#include <cstdint> namespace mongo { @@ -132,6 +133,7 @@ const auto getMetrics = ServiceContext::declareDecoration<MetricsPtr>(); const auto metricsRegisterer = ServiceContext::ConstructorActionRegisterer{ "ShardingDataTransformMetrics", [](ServiceContext* ctx) { getMetrics(ctx) = std::make_unique<Metrics>(); }}; + } // namespace ShardingDataTransformCumulativeMetrics* ShardingDataTransformCumulativeMetrics::getForResharding( @@ -150,7 +152,15 @@ ShardingDataTransformCumulativeMetrics::ShardingDataTransformCumulativeMetrics( const std::string& rootSectionName) : _rootSectionName{rootSectionName}, _instanceMetricsForAllRoles(ShardingDataTransformMetrics::kRoleCount), - _operationWasAttempted{false} {} + _operationWasAttempted{false}, + _coordinatorStateList{AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}, + AtomicWord<int64_t>{0}} {} ShardingDataTransformCumulativeMetrics::DeregistrationFunction ShardingDataTransformCumulativeMetrics::registerInstanceMetrics(const InstanceObserver* metrics) { @@ -253,13 +263,19 @@ void ShardingDataTransformCumulativeMetrics::reportLatencies(BSONObjBuilder* bob void ShardingDataTransformCumulativeMetrics::reportCurrentInSteps(BSONObjBuilder* bob) const { BSONObjBuilder s(bob->subobjStart(kCurrentInSteps)); - s.append(kCountInstancesInCoordinatorState1Initializing, kPlaceholderInt); - s.append(kCountInstancesInCoordinatorState2PreparingToDonate, kPlaceholderInt); - s.append(kCountInstancesInCoordinatorState3Cloning, kPlaceholderInt); - s.append(kCountInstancesInCoordinatorState4Applying, kPlaceholderInt); - s.append(kCountInstancesInCoordinatorState5BlockingWrites, kPlaceholderInt); - s.append(kCountInstancesInCoordinatorState6Aborting, kPlaceholderInt); - s.append(kCountInstancesInCoordinatorState7Committing, kPlaceholderInt); + + auto reportCoordinatorState = [this, &s](auto state) { + s.append(fieldNameFor(state), getCoordinatorStateCounter(state)->load()); + }; + + reportCoordinatorState(CoordinatorStateEnum::kInitializing); + reportCoordinatorState(CoordinatorStateEnum::kPreparingToDonate); + reportCoordinatorState(CoordinatorStateEnum::kCloning); + reportCoordinatorState(CoordinatorStateEnum::kApplying); + reportCoordinatorState(CoordinatorStateEnum::kBlockingWrites); + reportCoordinatorState(CoordinatorStateEnum::kAborting); + reportCoordinatorState(CoordinatorStateEnum::kCommitting); + s.append(kCountInstancesInRecipientState1AwaitingFetchTimestamp, kPlaceholderInt); s.append(kCountInstancesInRecipientState2CreatingCollection, kPlaceholderInt); s.append(kCountInstancesInRecipientState3Cloning, kPlaceholderInt); @@ -329,4 +345,78 @@ void ShardingDataTransformCumulativeMetrics::setLastOpEndingChunkImbalance(int64 _lastOpEndingChunkImbalance.store(imbalanceCount); } +AtomicWord<int64_t>* ShardingDataTransformCumulativeMetrics::getMutableCoordinatorStateCounter( + ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum state) { + if (state == ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused) { + return nullptr; + } + + invariant(static_cast<size_t>(state) < + static_cast<size_t>( + ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kNumStates)); + return &_coordinatorStateList[static_cast<size_t>(state)]; +} + +const AtomicWord<int64_t>* ShardingDataTransformCumulativeMetrics::getCoordinatorStateCounter( + ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum state) const { + if (state == ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused) { + return nullptr; + } + + invariant(static_cast<size_t>(state) < + static_cast<size_t>( + ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kNumStates)); + return &_coordinatorStateList[static_cast<size_t>(state)]; +} + +void ShardingDataTransformCumulativeMetrics::onCoordinatorStateTransition( + boost::optional<ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum> before, + boost::optional<ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum> after) { + if (before) { + if (auto counter = getMutableCoordinatorStateCounter(*before)) { + counter->fetchAndSubtract(1); + } + } + + if (after) { + if (auto counter = getMutableCoordinatorStateCounter(*after)) { + counter->fetchAndAdd(1); + } + } +} + +const char* ShardingDataTransformCumulativeMetrics::fieldNameFor( + ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum state) { + switch (state) { + case CoordinatorStateEnum::kInitializing: + return kCountInstancesInCoordinatorState1Initializing; + + case CoordinatorStateEnum::kPreparingToDonate: + return kCountInstancesInCoordinatorState2PreparingToDonate; + + case CoordinatorStateEnum::kCloning: + return kCountInstancesInCoordinatorState3Cloning; + + case CoordinatorStateEnum::kApplying: + return kCountInstancesInCoordinatorState4Applying; + + case CoordinatorStateEnum::kBlockingWrites: + return kCountInstancesInCoordinatorState5BlockingWrites; + + case CoordinatorStateEnum::kAborting: + return kCountInstancesInCoordinatorState6Aborting; + + case CoordinatorStateEnum::kCommitting: + return kCountInstancesInCoordinatorState7Committing; + + default: + uasserted(6438601, + str::stream() + << "no field name for coordinator state " << static_cast<int32_t>(state)); + break; + } + + MONGO_UNREACHABLE; +} + } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h index 8374f018753..4afc6520327 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics.h @@ -42,10 +42,24 @@ namespace mongo { class ShardingDataTransformCumulativeMetrics { public: + enum class CoordinatorStateEnum : int32_t { + kUnused = -1, + kInitializing, + kPreparingToDonate, + kCloning, + kApplying, + kBlockingWrites, + kAborting, + kCommitting, + kDone, + kNumStates + }; + using Role = ShardingDataTransformMetrics::Role; using InstanceObserver = ShardingDataTransformMetricsObserverInterface; using DeregistrationFunction = unique_function<void()>; + static ShardingDataTransformCumulativeMetrics* getForResharding(ServiceContext* context); static ShardingDataTransformCumulativeMetrics* getForGlobalIndexes(ServiceContext* context); @@ -62,6 +76,15 @@ public: void setLastOpEndingChunkImbalance(int64_t imbalanceCount); + /** + * The before can be boost::none to represent the initial state transition and + * after can be boost::none to represent cases where it is no longer active. + */ + void onCoordinatorStateTransition(boost::optional<CoordinatorStateEnum> before, + boost::optional<CoordinatorStateEnum> after); + + static const char* fieldNameFor(CoordinatorStateEnum state); + private: struct MetricsComparer { inline bool operator()(const InstanceObserver* a, const InstanceObserver* b) const { @@ -79,9 +102,13 @@ private: void reportOldestActive(BSONObjBuilder* bob) const; void reportLatencies(BSONObjBuilder* bob) const; void reportCurrentInSteps(BSONObjBuilder* bob) const; + MetricsSet& getMetricsSetForRole(Role role); const MetricsSet& getMetricsSetForRole(Role role) const; const InstanceObserver* getOldestOperation(WithLock, Role role) const; + const AtomicWord<int64_t>* getCoordinatorStateCounter(CoordinatorStateEnum state) const; + AtomicWord<int64_t>* getMutableCoordinatorStateCounter(CoordinatorStateEnum state); + MetricsSet::iterator insertMetrics(const InstanceObserver* metrics, MetricsSet& set); mutable Mutex _mutex; @@ -95,6 +122,9 @@ private: AtomicWord<int64_t> _countCancelled{0}; AtomicWord<int64_t> _lastOpEndingChunkImbalance{0}; + + std::array<AtomicWord<int64_t>, static_cast<size_t>(CoordinatorStateEnum::kNumStates)> + _coordinatorStateList; }; } // namespace mongo diff --git a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp index 15de8cf5199..11527443b62 100644 --- a/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp +++ b/src/mongo/db/s/sharding_data_transform_cumulative_metrics_test.cpp @@ -297,5 +297,143 @@ TEST_F(ShardingDataTransformCumulativeMetricsTest, ReportContainsLastChunkImbala } } +class ShardingDataTransformCumulativeStateTest : public ShardingDataTransformCumulativeMetricsTest { +public: + using CoordinatorStateEnum = ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum; + + BSONObj getStateSubObj(const ShardingDataTransformCumulativeMetrics& metrics) { + BSONObjBuilder bob; + metrics.reportForServerStatus(&bob); + auto report = bob.done(); + return report.getObjectField(kTestMetricsName).getObjectField("currentInSteps").getOwned(); + } + + bool checkCoordinateStateField(const ShardingDataTransformCumulativeMetrics& metrics, + boost::optional<CoordinatorStateEnum> expectedState) { + auto serverStatusSubObj = getStateSubObj(metrics); + std::map<std::string, int> expectedStateFieldCount; + + auto addExpectedField = [&](CoordinatorStateEnum stateToPopulate) { + expectedStateFieldCount.emplace( + ShardingDataTransformCumulativeMetrics::fieldNameFor(stateToPopulate), + ((expectedState && (stateToPopulate == expectedState)) ? 1 : 0)); + }; + + addExpectedField(CoordinatorStateEnum::kInitializing); + addExpectedField(CoordinatorStateEnum::kPreparingToDonate); + addExpectedField(CoordinatorStateEnum::kCloning); + addExpectedField(CoordinatorStateEnum::kApplying); + addExpectedField(CoordinatorStateEnum::kBlockingWrites); + addExpectedField(CoordinatorStateEnum::kAborting); + addExpectedField(CoordinatorStateEnum::kCommitting); + + for (const auto expectedState : expectedStateFieldCount) { + const auto actualValue = serverStatusSubObj.getIntField(expectedState.first); + if (actualValue != expectedState.second) { + LOGV2_DEBUG(6438600, + 0, + "coordinator state field value does not match expected value", + "field"_attr = expectedState.first, + "serverStatus"_attr = serverStatusSubObj); + return false; + } + } + + return true; + } +}; + +TEST_F(ShardingDataTransformCumulativeStateTest, + SimulatedNormalCoordinatorStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + + ASSERT(checkCoordinateStateField(_cumulativeMetrics, CoordinatorStateEnum::kUnused)); + + boost::optional<CoordinatorStateEnum> prevState; + boost::optional<CoordinatorStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) { + prevState = nextState; + nextState = newState; + _cumulativeMetrics.onCoordinatorStateTransition(prevState, nextState); + return checkCoordinateStateField(_cumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCloning)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kBlockingWrites)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kCommitting)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kDone)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ShardingDataTransformCumulativeStateTest, + SimulatedAbortedCoordinatorStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + + ASSERT(checkCoordinateStateField( + _cumulativeMetrics, ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused)); + + boost::optional<CoordinatorStateEnum> prevState; + boost::optional<CoordinatorStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) { + prevState = nextState; + nextState = newState; + _cumulativeMetrics.onCoordinatorStateTransition(prevState, nextState); + return checkCoordinateStateField(_cumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kAborting)); + ASSERT(simulateTransitionTo(boost::none)); +} + +TEST_F(ShardingDataTransformCumulativeStateTest, + SimulatedSteppedDownCoordinatorStateFromUnusedReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + + auto initState = CoordinatorStateEnum::kUnused; + ASSERT(checkCoordinateStateField(_cumulativeMetrics, initState)); + + _cumulativeMetrics.onCoordinatorStateTransition(initState, boost::none); + ASSERT(checkCoordinateStateField(_cumulativeMetrics, initState)); +} + +TEST_F(ShardingDataTransformCumulativeStateTest, + SimulatedSteppedDownCoordinatorStateTransitionReportsStateCorrectly) { + using Role = ShardingDataTransformMetrics::Role; + ObserverMock coordinator{Date_t::fromMillisSinceEpoch(200), 400, 300, Role::kCoordinator}; + auto ignore = _cumulativeMetrics.registerInstanceMetrics(&coordinator); + + ASSERT(checkCoordinateStateField( + _cumulativeMetrics, ShardingDataTransformCumulativeMetrics::CoordinatorStateEnum::kUnused)); + + boost::optional<CoordinatorStateEnum> prevState; + boost::optional<CoordinatorStateEnum> nextState; + + auto simulateTransitionTo = [&](boost::optional<CoordinatorStateEnum> newState) { + prevState = nextState; + nextState = newState; + _cumulativeMetrics.onCoordinatorStateTransition(prevState, nextState); + return checkCoordinateStateField(_cumulativeMetrics, nextState); + }; + + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kUnused)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kInitializing)); + ASSERT(simulateTransitionTo(CoordinatorStateEnum::kPreparingToDonate)); + ASSERT(simulateTransitionTo(boost::none)); +} + } // namespace } // namespace mongo |