summary | refs | log | tree | commit | diff
path: root/src/mongo/db/s/resharding/resharding_metrics.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_metrics.cpp')
-rw-r--r-- src/mongo/db/s/resharding/resharding_metrics.cpp 78
1 files changed, 78 insertions, 0 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index fe3fba53fb3..a18067a00fe 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/logv2/log.h"
#include "mongo/platform/compiler.h"
+#include "mongo/util/with_alignment.h"
namespace mongo {
@@ -62,6 +63,7 @@ constexpr auto kDonorState = "donorState";
constexpr auto kRecipientState = "recipientState";
constexpr auto kOpStatus = "opStatus";
constexpr auto kLastOpEndingChunkImbalance = "lastOpEndingChunkImbalance";
+constexpr auto kOpCounters = "opcounters";
using MetricsPtr = std::unique_ptr<ReshardingMetrics>;
@@ -97,6 +99,50 @@ static StringData serializeState(boost::optional<CoordinatorStateEnum> e) {
return CoordinatorState_serializer(*e);
}
+// Counts the inserts, updates, and deletes that resharding does to keep up with client writes,
+// separately from the client workload tracked in the globalOpCounters.
+class ReshardingOpCounters {
+public:
+ void gotInsert() noexcept {  // Adds 1 to the insert counter.
+ _checkWrap(&ReshardingOpCounters::_insert);
+ }
+ void gotUpdate() noexcept {  // Adds 1 to the update counter.
+ _checkWrap(&ReshardingOpCounters::_update);
+ }
+ void gotDelete() noexcept {  // Adds 1 to the delete counter.
+ _checkWrap(&ReshardingOpCounters::_delete);
+ }
+
+ BSONObj getObj() const noexcept {  // Returns the counters as {insert, update, delete}.
+ BSONObjBuilder b;
+ b.append("insert", _insert.loadRelaxed());  // Each counter is read with its own relaxed load.
+ b.append("update", _update.loadRelaxed());
+ b.append("delete", _delete.loadRelaxed());
+ return b.obj();
+ }
+
+private:
+ // Adds 1 to member `counter`; if its prior value was >= 2^60, logs and resets all counters.
+ void _checkWrap(CacheAligned<AtomicWord<long long>> ReshardingOpCounters::*counter) {
+ static constexpr auto maxCount = 1LL << 60;  // 2^60
+ auto oldValue = (this->*counter).fetchAndAddRelaxed(1);  // Presumably the pre-increment value.
+ if (oldValue > maxCount - 1) {  // Same as oldValue >= 2^60.
+ LOGV2(5776000,
+ "ReshardingOpCounters exceeded maximum value, resetting all to 0",
+ "insert"_attr = _insert.loadRelaxed(),
+ "update"_attr = _update.loadRelaxed(),
+ "delete"_attr = _delete.loadRelaxed());
+ _insert.store(0);  // NOTE(review): three separate stores, not one combined atomic reset.
+ _update.store(0);
+ _delete.store(0);
+ }
+ }
+
+ CacheAligned<AtomicWord<long long>> _insert;  // Number of gotInsert() calls since last reset.
+ CacheAligned<AtomicWord<long long>> _update;  // Number of gotUpdate() calls since last reset.
+ CacheAligned<AtomicWord<long long>> _delete;  // Number of gotDelete() calls since last reset.
+};
+
// Allows tracking elapsed time for the resharding operation and its sub operations (e.g.,
// applying oplog entries).
class TimeInterval {
@@ -136,6 +182,10 @@ public:
boost::optional<Milliseconds> remainingOperationTime(Date_t now) const;
+ void gotInsert() noexcept;
+ void gotUpdate() noexcept;
+ void gotDelete() noexcept;
+
TimeInterval runningOperation;
ReshardingOperationStatusEnum opStatus = ReshardingOperationStatusEnum::kInactive;
@@ -154,11 +204,26 @@ public:
int64_t chunkImbalanceCount = 0;
+ // The ops done by resharding to keep up with the client writes.
+ ReshardingOpCounters opCounters;
+
boost::optional<DonorStateEnum> donorState;
boost::optional<RecipientStateEnum> recipientState;
boost::optional<CoordinatorStateEnum> coordinatorState;
};
+void ReshardingMetrics::OperationMetrics::gotInsert() noexcept {
+ opCounters.gotInsert();  // Forwards to this OperationMetrics' ReshardingOpCounters member.
+}
+
+void ReshardingMetrics::OperationMetrics::gotUpdate() noexcept {
+ opCounters.gotUpdate();  // Forwards to this OperationMetrics' ReshardingOpCounters member.
+}
+
+void ReshardingMetrics::OperationMetrics::gotDelete() noexcept {
+ opCounters.gotDelete();  // Forwards to this OperationMetrics' ReshardingOpCounters member.
+}
+
boost::optional<Milliseconds> ReshardingMetrics::OperationMetrics::remainingOperationTime(
Date_t now) const {
if (recipientState > RecipientStateEnum::kCloning && oplogEntriesFetched == 0) {
@@ -446,6 +511,18 @@ void ReshardingMetrics::onDocumentsCopiedForCurrentOp(int64_t documents, int64_t
_currentOp->bytesCopied += bytes;
}
+void ReshardingMetrics::gotInsert() noexcept {
+ _cumulativeOp->gotInsert();  // Counted on _cumulativeOp only; _mutex is not taken here.
+}
+
+void ReshardingMetrics::gotUpdate() noexcept {
+ _cumulativeOp->gotUpdate();  // Counted on _cumulativeOp only; _mutex is not taken here.
+}
+
+void ReshardingMetrics::gotDelete() noexcept {
+ _cumulativeOp->gotDelete();  // Counted on _cumulativeOp only; _mutex is not taken here.
+}
+
void ReshardingMetrics::startCopyingDocuments(Date_t start) {
stdx::lock_guard<Latch> lk(_mutex);
_currentOp->copyingDocuments.start(start);
@@ -602,6 +679,7 @@ void ReshardingMetrics::serializeCumulativeOpMetrics(BSONObjBuilder* bob) const
bob->append(kWritesDuringCritialSection, ops.writesDuringCriticalSection);
bob->append(kOplogsFetched, ops.oplogEntriesFetched);
bob->append(kLastOpEndingChunkImbalance, ops.chunkImbalanceCount);
+ bob->append(kOpCounters, ops.opCounters.getObj());
}
Date_t ReshardingMetrics::_now() const {