summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Witten <andrew.witten@mongodb.com>2022-08-11 14:01:43 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-12 19:25:24 +0000
commit5a750e26658c06386d16291e4cf55e05e3379dbe (patch)
tree42865c1e157791e148d17692358862a9263e25e5
parent50b526fecf6e13b2ac8541b69564f368ac17a8ac (diff)
downloadmongo-5a750e26658c06386d16291e4cf55e05e3379dbe.tar.gz
SERVER-67650 persist additional recipient resharding metrics
(cherry picked from commit bbc0bb5e9878e48b6bc3b666affbdf102b379450) fix compilation errors after cherry-pick
-rw-r--r--src/mongo/db/s/resharding/recipient_document.idl5
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp69
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.h29
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp58
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h2
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_test.cpp44
6 files changed, 163 insertions, 44 deletions
diff --git a/src/mongo/db/s/resharding/recipient_document.idl b/src/mongo/db/s/resharding/recipient_document.idl
index e3128de0db5..1eda4620b8c 100644
--- a/src/mongo/db/s/resharding/recipient_document.idl
+++ b/src/mongo/db/s/resharding/recipient_document.idl
@@ -82,6 +82,11 @@ structs:
startConfigTxnCloneTime:
type: date
optional: true
+ approxBytesToCopy:
+ type: long
+ description: >-
+ Approximate number of bytes to copy during cloning
+ optional: true
metrics:
type: ReshardingRecipientMetrics
description: "Metrics related to this recipient."
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 2f7f99f186b..fcd4abc7f27 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -406,6 +406,59 @@ void ReshardingMetrics::onStepUp(Role role) noexcept {
// instead of starting from the current time.
}
+void ReshardingMetrics::onStepUp(RecipientStateEnum state,
+ const ReshardingRecipientCountsAndMetrics& recipientMetrics) {
+ stdx::lock_guard<Latch> lk(_mutex);
+
+ _emplaceCurrentOpForRole(Role::kRecipient, boost::none);
+
+ invariant(_currentOp, kNoOperationInProgress);
+ invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore);
+
+ _currentOp->recipientState = state;
+ _currentOp->documentsCopied = recipientMetrics.documentCountCopied;
+ _currentOp->bytesCopied = recipientMetrics.documentBytesCopied;
+ _currentOp->oplogEntriesFetched = recipientMetrics.oplogEntriesFetched;
+ _currentOp->oplogEntriesApplied = recipientMetrics.oplogEntriesApplied;
+
+ if (recipientMetrics.approxBytesToCopy)
+ _currentOp->bytesToCopy = recipientMetrics.approxBytesToCopy.get();
+
+
+ const auto& timeIntervals = recipientMetrics.metrics;
+
+ // Restore the in-memory state of the document copy metrics.
+ // startCopyingDocuments and endCopyingDocuments are not called here because they acquire a
+ // mutex that this function already holds.
+ //
+ // Note that documentCopyInterval->getStart() can be none while
+ // documentCopyInterval->getStop() is not none. That can happen if the cluster is upgraded
+ // to include code for persisting time intervals while a resharding operation is in progress.
+ // In that case, neither the start nor the stop time is restored. The resharding coordinator
+ // will still treat this scenario as the recipient shard being completely caught up after a
+ // primary failover and engage the critical section too early.
+ const auto& documentCopyInterval = timeIntervals.getDocumentCopy();
+ if (documentCopyInterval && documentCopyInterval->getStart()) {
+ _currentOp->copyingDocuments.start(documentCopyInterval->getStart().get());
+ if (documentCopyInterval->getStop()) {
+ _currentOp->copyingDocuments.end(documentCopyInterval->getStop().get());
+ }
+ }
+ // Restore the in-memory state of the oplog application metrics.
+ // startApplyingOplogEntries and endApplyingOplogEntries are not called here because they
+ // acquire a mutex that this function already holds.
+ const auto& oplogApplicationInterval = timeIntervals.getOplogApplication();
+ if (oplogApplicationInterval && oplogApplicationInterval->getStart()) {
+ _currentOp->applyingOplogEntries.start(oplogApplicationInterval->getStart().get());
+ if (oplogApplicationInterval->getStop()) {
+ _currentOp->applyingOplogEntries.end(oplogApplicationInterval->getStop().get());
+ }
+ }
+}
+
void ReshardingMetrics::onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics) {
stdx::lock_guard<Latch> lk(_mutex);
auto operationRuntime = donorMetrics.getOperationRuntime();
@@ -661,22 +714,6 @@ void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
_cumulativeOp->oplogEntriesApplied += entries;
}
-void ReshardingMetrics::restoreForCurrentOp(int64_t documentCountCopied,
- int64_t documentBytesCopied,
- int64_t oplogEntriesFetched,
- int64_t oplogEntriesApplied) noexcept {
- invariant(_currentOp, kNoOperationInProgress);
- invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore);
- invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore);
- invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore);
- invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore);
-
- _currentOp->documentsCopied = documentCountCopied;
- _currentOp->bytesCopied = documentBytesCopied;
- _currentOp->oplogEntriesFetched = oplogEntriesFetched;
- _currentOp->oplogEntriesApplied = oplogEntriesApplied;
-}
-
void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
if (!_currentOp)
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index a92ba12ca5d..0368d90861d 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -35,6 +35,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/resharding/donor_document_gen.h"
+#include "mongo/db/s/resharding/recipient_document_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/mutex.h"
#include "mongo/s/resharding/common_types_gen.h"
@@ -71,6 +72,29 @@ public:
void onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics);
+ struct ReshardingRecipientCountsAndMetrics {
+ ReshardingRecipientCountsAndMetrics(int64_t documentCountCopied,
+ int64_t documentBytesCopied,
+ int64_t oplogEntriesFetched,
+ int64_t oplogEntriesApplied,
+ boost::optional<int64_t> approxBytesToCopy,
+ ReshardingRecipientMetrics metrics)
+ : documentCountCopied{documentCountCopied},
+ documentBytesCopied{documentBytesCopied},
+ oplogEntriesFetched{oplogEntriesFetched},
+ oplogEntriesApplied{oplogEntriesApplied},
+ approxBytesToCopy{approxBytesToCopy},
+ metrics{metrics} {}
+ int64_t documentCountCopied;
+ int64_t documentBytesCopied;
+ int64_t oplogEntriesFetched;
+ int64_t oplogEntriesApplied;
+ boost::optional<int64_t> approxBytesToCopy;
+ ReshardingRecipientMetrics metrics;
+ };
+
+ void onStepUp(RecipientStateEnum, const ReshardingRecipientCountsAndMetrics&);
+
// So long as a resharding operation is in progress, the following may be used to update the
// state of a donor, a recipient, and a coordinator, respectively.
void setDonorState(DonorStateEnum) noexcept;
@@ -113,11 +137,6 @@ public:
// Allows restoring "oplog entries to apply" metrics.
void onOplogEntriesApplied(int64_t entries) noexcept;
- void restoreForCurrentOp(int64_t documentCountCopied,
- int64_t documentBytesCopied,
- int64_t oplogEntriesFetched,
- int64_t oplogEntriesApplied) noexcept;
-
// Allows tracking writes during a critical section when the donor's state is either of
// "donating-oplog-entries" or "blocking-writes".
void onWriteDuringCriticalSection(int64_t writes) noexcept;
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index ff6442a6b12..88fd4dfa38f 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -141,6 +141,8 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
_recipientCtx{recipientDoc.getMutableState()},
_donorShards{recipientDoc.getDonorShards()},
_cloneTimestamp{recipientDoc.getCloneTimestamp()},
+ _timeIntervals{recipientDoc.getMetrics().get_value_or({})},
+ _approxBytesToCopy{recipientDoc.getApproxBytesToCopy()},
_externalState{std::move(externalState)},
_startConfigTxnCloneAt{recipientDoc.getStartConfigTxnCloneTime()},
_markKilledExecutor(std::make_shared<ThreadPool>([] {
@@ -162,6 +164,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
return donor.getShardId() == myShardId;
}) != _donorShards.end();
}()) {
+
invariant(_externalState);
}
@@ -801,27 +804,53 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning(
const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kCloning);
+ auto cloningStartTime = getCurrentTime();
+
+ // Record cloning start time.
+ ReshardingMetricsTimeInterval interval;
+ interval.setStart(cloningStartTime);
+ _timeIntervals.setDocumentCopy(interval);
+
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- _metrics()->startCopyingDocuments(getCurrentTime());
+ _metrics()->startCopyingDocuments(cloningStartTime);
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying(
const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kApplying);
+ auto oplogApplicationStartTime = getCurrentTime();
+
+ // Record oplog application start time.
+ ReshardingMetricsTimeInterval interval;
+ interval.setStart(oplogApplicationStartTime);
+ _timeIntervals.setOplogApplication(interval);
+
+ // Record document copy stop time.
+ ReshardingMetricsTimeInterval documentCopy{_timeIntervals.getDocumentCopy().get_value_or({})};
+ documentCopy.setStop(oplogApplicationStartTime);
+ _timeIntervals.setDocumentCopy(documentCopy);
+
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- auto currentTime = getCurrentTime();
- _metrics()->endCopyingDocuments(currentTime);
- _metrics()->startApplyingOplogEntries(currentTime);
+ _metrics()->endCopyingDocuments(oplogApplicationStartTime);
+ _metrics()->startApplyingOplogEntries(oplogApplicationStartTime);
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency(
const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency);
+ auto oplogApplicationStopTime = getCurrentTime();
+
+ // Record oplog application stop time
+ ReshardingMetricsTimeInterval oplogApplication{
+ _timeIntervals.getOplogApplication().get_value_or({})};
+ oplogApplication.setStop(oplogApplicationStopTime);
+ _timeIntervals.setOplogApplication(oplogApplication);
+
+
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- auto currentTime = getCurrentTime();
- _metrics()->endApplyingOplogEntries(currentTime);
+ _metrics()->endApplyingOplogEntries(oplogApplicationStopTime);
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToError(
@@ -973,6 +1002,9 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
setBuilder.append(ReshardingRecipientDocument::kDonorShardsFieldName,
donorShardsArrayBuilder.arr());
+
+ setBuilder.append(ReshardingRecipientDocument::kApproxBytesToCopyFieldName,
+ cloneDetails->approxBytesToCopy);
}
if (configStartTime) {
@@ -980,6 +1012,8 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
*configStartTime);
}
+ setBuilder.append(ReshardingRecipientDocument::kMetricsFieldName, _timeIntervals.toBSON());
+
setBuilder.doneFast();
}
@@ -997,6 +1031,7 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
if (cloneDetails) {
_cloneTimestamp = cloneDetails->cloneTimestamp;
_donorShards = std::move(cloneDetails->donorShards);
+ _approxBytesToCopy = cloneDetails->approxBytesToCopy;
}
if (configStartTime) {
@@ -1056,7 +1091,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMe
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
- _metrics()->onStepUp(ReshardingMetrics::Role::kRecipient);
return _restoreMetricsWithRetry(executor, abortToken);
}
_metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime());
@@ -1066,7 +1100,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMe
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restoreMetricsWithRetry(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
- _metrics()->setRecipientState(_recipientCtx.getState());
return _retryingCancelableOpCtxFactory
->withAutomaticRetry(
[this, executor, abortToken](const auto& factory) { _restoreMetrics(factory); })
@@ -1132,8 +1165,13 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
}
}
- _metrics()->restoreForCurrentOp(
- documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied);
+ _metrics()->onStepUp(_recipientCtx.getState(),
+ ReshardingMetrics::ReshardingRecipientCountsAndMetrics{documentCountCopied,
+ documentBytesCopied,
+ oplogEntriesFetched,
+ oplogEntriesApplied,
+ _approxBytesToCopy,
+ _timeIntervals});
}
CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 3e9a56b4646..48b1445b080 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -278,6 +278,8 @@ private:
RecipientShardContext _recipientCtx;
std::vector<DonorShardFetchTimestamp> _donorShards;
boost::optional<Timestamp> _cloneTimestamp;
+ ReshardingRecipientMetrics _timeIntervals;
+ boost::optional<int64_t> _approxBytesToCopy;
const std::unique_ptr<RecipientStateMachineExternalState> _externalState;
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 1886736f134..0a8beca187e 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -50,8 +50,10 @@
#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/death_test.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/fail_point.h"
+
namespace mongo {
namespace {
@@ -213,13 +215,14 @@ public:
void setUp() override {
repl::PrimaryOnlyServiceMongoDTest::setUp();
-
+ auto clockSource = std::make_unique<ClockSourceMock>();
+ clockSource->advance(Seconds(1));
+ getServiceContext()->setFastClockSource(std::move(clockSource));
auto serviceContext = getServiceContext();
auto storageMock = std::make_unique<repl::StorageInterfaceMock>();
repl::DropPendingCollectionReaper::set(
serviceContext, std::make_unique<repl::DropPendingCollectionReaper>(storageMock.get()));
repl::StorageInterface::set(serviceContext, std::move(storageMock));
-
_controller = std::make_shared<RecipientStateTransitionController>();
_opObserverRegistry->addObserver(std::make_unique<RecipientOpObserverForTest>(_controller));
}
@@ -842,22 +845,33 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
}
// Step down before the transition to state can complete.
stateTransitionsGuard.wait(state);
- if (state == RecipientStateEnum::kStrictConsistency) {
- auto currOp = recipient
- ->reportForCurrentOp(
- MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
- MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
- .get();
+
+ dynamic_cast<ClockSourceMock*>(getServiceContext()->getFastClockSource())
+ ->advance(Seconds(1));
+ auto currOp =
+ recipient
+ ->reportForCurrentOp(MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
+ MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
+ .get();
+
+
+ if (state == RecipientStateEnum::kApplying) {
+ ASSERT_EQ(currOp.getField("totalApplyTimeElapsedSecs").Long(), 0);
+ ASSERT_EQ(currOp.getStringField("recipientState"),
+ RecipientState_serializer(RecipientStateEnum::kCloning));
+ ASSERT_GT(currOp.getField("totalCopyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("approxBytesToCopy").Long(), 0);
+
+ } else if (state == RecipientStateEnum::kStrictConsistency) {
ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
ASSERT_EQ(currOp.getStringField("recipientState"),
RecipientState_serializer(RecipientStateEnum::kApplying));
+ ASSERT_GT(currOp.getField("totalCopyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("totalApplyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("approxBytesToCopy").Long(), 0);
+
} else if (state == RecipientStateEnum::kDone) {
- auto currOp = recipient
- ->reportForCurrentOp(
- MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
- MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
- .get();
ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
ASSERT_EQ(currOp.getField("oplogEntriesFetched").Long(),
@@ -866,7 +880,11 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
oplogEntriesAppliedOnEachDonor * doc.getDonorShards().size());
ASSERT_EQ(currOp.getStringField("recipientState"),
RecipientState_serializer(RecipientStateEnum::kStrictConsistency));
+ ASSERT_GT(currOp.getField("totalCopyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("totalApplyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("approxBytesToCopy").Long(), 0);
}
+
stepDown();
ASSERT_EQ(recipient->getCompletionFuture().getNoThrow(),