diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_metrics.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_metrics.cpp | 78 |
1 files changed, 78 insertions, 0 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index fe3fba53fb3..a18067a00fe 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -34,6 +34,7 @@ #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/logv2/log.h" #include "mongo/platform/compiler.h" +#include "mongo/util/with_alignment.h" namespace mongo { @@ -62,6 +63,7 @@ constexpr auto kDonorState = "donorState"; constexpr auto kRecipientState = "recipientState"; constexpr auto kOpStatus = "opStatus"; constexpr auto kLastOpEndingChunkImbalance = "lastOpEndingChunkImbalance"; +constexpr auto kOpCounters = "opcounters"; using MetricsPtr = std::unique_ptr<ReshardingMetrics>; @@ -97,6 +99,50 @@ static StringData serializeState(boost::optional<CoordinatorStateEnum> e) { return CoordinatorState_serializer(*e); } +// Enables resharding to distinguish the ops it does to keep up with client writes from the +// client workload tracked in the globalOpCounters. +class ReshardingOpCounters { +public: + void gotInsert() noexcept { + _checkWrap(&ReshardingOpCounters::_insert); + } + void gotUpdate() noexcept { + _checkWrap(&ReshardingOpCounters::_update); + } + void gotDelete() noexcept { + _checkWrap(&ReshardingOpCounters::_delete); + } + + BSONObj getObj() const noexcept { + BSONObjBuilder b; + b.append("insert", _insert.loadRelaxed()); + b.append("update", _update.loadRelaxed()); + b.append("delete", _delete.loadRelaxed()); + return b.obj(); + } + +private: + // Increment member `counter` by 1, resetting all counters if it was > 2^60. + void _checkWrap(CacheAligned<AtomicWord<long long>> ReshardingOpCounters::*counter) { + static constexpr auto maxCount = 1LL << 60; + auto oldValue = (this->*counter).fetchAndAddRelaxed(1); + if (oldValue > maxCount - 1) { + LOGV2(5776000, + "ReshardingOpCounters exceeded maximum value, resetting all to 0", + "insert"_attr = _insert.loadRelaxed(), + "update"_attr = _update.loadRelaxed(), + "delete"_attr = _delete.loadRelaxed()); + _insert.store(0); + _update.store(0); + _delete.store(0); + } + } + + CacheAligned<AtomicWord<long long>> _insert; + CacheAligned<AtomicWord<long long>> _update; + CacheAligned<AtomicWord<long long>> _delete; +}; + // Allows tracking elapsed time for the resharding operation and its sub operations (e.g., // applying oplog entries). class TimeInterval { @@ -136,6 +182,10 @@ public: boost::optional<Milliseconds> remainingOperationTime(Date_t now) const; + void gotInsert() noexcept; + void gotUpdate() noexcept; + void gotDelete() noexcept; + TimeInterval runningOperation; ReshardingOperationStatusEnum opStatus = ReshardingOperationStatusEnum::kInactive; @@ -154,11 +204,26 @@ public: int64_t chunkImbalanceCount = 0; + // The ops done by resharding to keep up with the client writes. + ReshardingOpCounters opCounters; + boost::optional<DonorStateEnum> donorState; boost::optional<RecipientStateEnum> recipientState; boost::optional<CoordinatorStateEnum> coordinatorState; }; +void ReshardingMetrics::OperationMetrics::gotInsert() noexcept { + opCounters.gotInsert(); +} + +void ReshardingMetrics::OperationMetrics::gotUpdate() noexcept { + opCounters.gotUpdate(); +} + +void ReshardingMetrics::OperationMetrics::gotDelete() noexcept { + opCounters.gotDelete(); +} + boost::optional<Milliseconds> ReshardingMetrics::OperationMetrics::remainingOperationTime( Date_t now) const { if (recipientState > RecipientStateEnum::kCloning && oplogEntriesFetched == 0) { @@ -446,6 +511,18 @@ void ReshardingMetrics::onDocumentsCopiedForCurrentOp(int64_t documents, int64_t _currentOp->bytesCopied += bytes; } +void ReshardingMetrics::gotInsert() noexcept { + _cumulativeOp->gotInsert(); +} + +void ReshardingMetrics::gotUpdate() noexcept { + _cumulativeOp->gotUpdate(); +} + +void ReshardingMetrics::gotDelete() noexcept { + _cumulativeOp->gotDelete(); +} + void ReshardingMetrics::startCopyingDocuments(Date_t start) { stdx::lock_guard<Latch> lk(_mutex); _currentOp->copyingDocuments.start(start); @@ -602,6 +679,7 @@ void ReshardingMetrics::serializeCumulativeOpMetrics(BSONObjBuilder* bob) const bob->append(kWritesDuringCritialSection, ops.writesDuringCriticalSection); bob->append(kOplogsFetched, ops.oplogEntriesFetched); bob->append(kLastOpEndingChunkImbalance, ops.chunkImbalanceCount); + bob->append(kOpCounters, ops.opCounters.getObj()); } Date_t ReshardingMetrics::_now() const { |