summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorAndrew Witten <andrew.witten@mongodb.com>2022-08-11 14:01:43 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-11 14:31:47 +0000
commitbbc0bb5e9878e48b6bc3b666affbdf102b379450 (patch)
tree479875f04f1bffc34185e4dc6b43cef4db067623 /src/mongo
parent2f27abe1eaad20074706fd740dcde445001201a6 (diff)
downloadmongo-bbc0bb5e9878e48b6bc3b666affbdf102b379450.tar.gz
SERVER-67650 persist additional recipient resharding metrics
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/resharding/recipient_document.idl5
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.cpp70
-rw-r--r--src/mongo/db/s/resharding/resharding_metrics.h29
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp58
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h2
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_test.cpp42
6 files changed, 163 insertions, 43 deletions
diff --git a/src/mongo/db/s/resharding/recipient_document.idl b/src/mongo/db/s/resharding/recipient_document.idl
index e3128de0db5..1eda4620b8c 100644
--- a/src/mongo/db/s/resharding/recipient_document.idl
+++ b/src/mongo/db/s/resharding/recipient_document.idl
@@ -82,6 +82,11 @@ structs:
startConfigTxnCloneTime:
type: date
optional: true
+ approxBytesToCopy:
+ type: long
+ description: >-
+ Approximate number of bytes to copy during cloning
+ optional: true
metrics:
type: ReshardingRecipientMetrics
description: "Metrics related to this recipient."
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 6e4e4e041e9..48576c26f45 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -406,6 +406,60 @@ void ReshardingMetrics::onStepUp(Role role) noexcept {
// instead of starting from the current time.
}
+void ReshardingMetrics::onStepUp(RecipientStateEnum state,
+ const ReshardingRecipientCountsAndMetrics& recipientMetrics) {
+ stdx::lock_guard<Latch> lk(_mutex);
+
+ _emplaceCurrentOpForRole(Role::kRecipient, boost::none);
+ _onStepUpCalled = true;
+
+ invariant(_currentOp, kNoOperationInProgress);
+ invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore);
+ invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore);
+
+ _currentOp->recipientState = state;
+ _currentOp->documentsCopied = recipientMetrics.documentCountCopied;
+ _currentOp->bytesCopied = recipientMetrics.documentBytesCopied;
+ _currentOp->oplogEntriesFetched = recipientMetrics.oplogEntriesFetched;
+ _currentOp->oplogEntriesApplied = recipientMetrics.oplogEntriesApplied;
+
+ if (recipientMetrics.approxBytesToCopy)
+ _currentOp->bytesToCopy = recipientMetrics.approxBytesToCopy.get();
+
+
+ const auto& timeIntervals = recipientMetrics.metrics;
+
+ // Restore in memory state of document copy metrics.
+ // Not calling startCopyingDocuments or endCopyingDocuments because they acquire a mutex that we
+ // already have.
+ //
+ // Also, note that it is possible for documentCopyInterval->getStart() to be none and for
+ // documentCopyInterval->getStop() to be not none. That can happen if the cluster is upgraded
+ // to include code for persisting time intervals during a resharding operation.
+ // In that case, restore neither the start nor stop time. The resharding coordinator will still
+ // treat this scenario as the recipient shard being completely caught up after a primary
+ // failover and engage the critical section too early.
+ const auto& documentCopyInterval = timeIntervals.getDocumentCopy();
+ if (documentCopyInterval && documentCopyInterval->getStart()) {
+ _currentOp->copyingDocuments.start(documentCopyInterval->getStart().get());
+ if (documentCopyInterval->getStop()) {
+ _currentOp->copyingDocuments.end(documentCopyInterval->getStop().get());
+ }
+ }
+ // Restore in memory state of oplog application metrics.
+ // Not calling startApplyingOplogEntries or endApplyingOplogEntries because they acquire a mutex
+ // that we already have.
+ const auto& oplogApplicationInterval = timeIntervals.getOplogApplication();
+ if (oplogApplicationInterval && oplogApplicationInterval->getStart()) {
+ _currentOp->applyingOplogEntries.start(oplogApplicationInterval->getStart().get());
+ if (oplogApplicationInterval->getStop()) {
+ _currentOp->applyingOplogEntries.end(oplogApplicationInterval->getStop().get());
+ }
+ }
+}
+
void ReshardingMetrics::onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics) {
stdx::lock_guard<Latch> lk(_mutex);
auto operationRuntime = donorMetrics.getOperationRuntime();
@@ -662,22 +716,6 @@ void ReshardingMetrics::onOplogEntriesApplied(int64_t entries) noexcept {
_cumulativeOp->oplogEntriesApplied += entries;
}
-void ReshardingMetrics::restoreForCurrentOp(int64_t documentCountCopied,
- int64_t documentBytesCopied,
- int64_t oplogEntriesFetched,
- int64_t oplogEntriesApplied) noexcept {
- invariant(_currentOp, kNoOperationInProgress);
- invariant(_currentOp->documentsCopied == 0, kMetricsSetBeforeRestore);
- invariant(_currentOp->bytesCopied == 0, kMetricsSetBeforeRestore);
- invariant(_currentOp->oplogEntriesFetched == 0, kMetricsSetBeforeRestore);
- invariant(_currentOp->oplogEntriesApplied == 0, kMetricsSetBeforeRestore);
-
- _currentOp->documentsCopied = documentCountCopied;
- _currentOp->bytesCopied = documentBytesCopied;
- _currentOp->oplogEntriesFetched = oplogEntriesFetched;
- _currentOp->oplogEntriesApplied = oplogEntriesApplied;
-}
-
void ReshardingMetrics::onWriteDuringCriticalSection(int64_t writes) noexcept {
stdx::lock_guard<Latch> lk(_mutex);
if (!_currentOp)
diff --git a/src/mongo/db/s/resharding/resharding_metrics.h b/src/mongo/db/s/resharding/resharding_metrics.h
index a6964c9d611..9ba80d35417 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.h
+++ b/src/mongo/db/s/resharding/resharding_metrics.h
@@ -35,6 +35,7 @@
#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/s/resharding/donor_document_gen.h"
+#include "mongo/db/s/resharding/recipient_document_gen.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/mutex.h"
#include "mongo/s/resharding/common_types_gen.h"
@@ -71,6 +72,29 @@ public:
void onStepUp(DonorStateEnum state, ReshardingDonorMetrics donorMetrics);
+ struct ReshardingRecipientCountsAndMetrics {
+ ReshardingRecipientCountsAndMetrics(int64_t documentCountCopied,
+ int64_t documentBytesCopied,
+ int64_t oplogEntriesFetched,
+ int64_t oplogEntriesApplied,
+ boost::optional<int64_t> approxBytesToCopy,
+ ReshardingRecipientMetrics metrics)
+ : documentCountCopied{documentCountCopied},
+ documentBytesCopied{documentBytesCopied},
+ oplogEntriesFetched{oplogEntriesFetched},
+ oplogEntriesApplied{oplogEntriesApplied},
+ approxBytesToCopy{approxBytesToCopy},
+ metrics{metrics} {}
+ int64_t documentCountCopied;
+ int64_t documentBytesCopied;
+ int64_t oplogEntriesFetched;
+ int64_t oplogEntriesApplied;
+ boost::optional<int64_t> approxBytesToCopy;
+ ReshardingRecipientMetrics metrics;
+ };
+
+ void onStepUp(RecipientStateEnum, const ReshardingRecipientCountsAndMetrics&);
+
// So long as a resharding operation is in progress, the following may be used to update the
// state of a donor, a recipient, and a coordinator, respectively.
void setDonorState(DonorStateEnum) noexcept;
@@ -113,11 +137,6 @@ public:
// Allows restoring "oplog entries to apply" metrics.
void onOplogEntriesApplied(int64_t entries) noexcept;
- void restoreForCurrentOp(int64_t documentCountCopied,
- int64_t documentBytesCopied,
- int64_t oplogEntriesFetched,
- int64_t oplogEntriesApplied) noexcept;
-
// Allows tracking writes during a critical section when the donor's state is either of
// "donating-oplog-entries" or "blocking-writes".
void onWriteDuringCriticalSection(int64_t writes) noexcept;
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index ccfd5fefdea..7c07564a688 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -145,6 +145,8 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
_recipientCtx{recipientDoc.getMutableState()},
_donorShards{recipientDoc.getDonorShards()},
_cloneTimestamp{recipientDoc.getCloneTimestamp()},
+ _timeIntervals{recipientDoc.getMetrics().get_value_or({})},
+ _approxBytesToCopy{recipientDoc.getApproxBytesToCopy()},
_externalState{std::move(externalState)},
_startConfigTxnCloneAt{recipientDoc.getStartConfigTxnCloneTime()},
_markKilledExecutor(std::make_shared<ThreadPool>([] {
@@ -166,6 +168,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
return donor.getShardId() == myShardId;
}) != _donorShards.end();
}()) {
+
invariant(_externalState);
}
@@ -813,27 +816,53 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionToCloning(
const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kCloning);
+ auto cloningStartTime = getCurrentTime();
+
+ // Record cloning start time.
+ ReshardingMetricsTimeInterval interval;
+ interval.setStart(cloningStartTime);
+ _timeIntervals.setDocumentCopy(interval);
+
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- _metrics()->startCopyingDocuments(getCurrentTime());
+ _metrics()->startCopyingDocuments(cloningStartTime);
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToApplying(
const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kApplying);
+ auto oplogApplicationStartTime = getCurrentTime();
+
+ // Record oplog application start time.
+ ReshardingMetricsTimeInterval interval;
+ interval.setStart(oplogApplicationStartTime);
+ _timeIntervals.setOplogApplication(interval);
+
+ // Record document copy stop time.
+ ReshardingMetricsTimeInterval documentCopy{_timeIntervals.getDocumentCopy().get_value_or({})};
+ documentCopy.setStop(oplogApplicationStartTime);
+ _timeIntervals.setDocumentCopy(documentCopy);
+
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- auto currentTime = getCurrentTime();
- _metrics()->endCopyingDocuments(currentTime);
- _metrics()->startApplyingOplogEntries(currentTime);
+ _metrics()->endCopyingDocuments(oplogApplicationStartTime);
+ _metrics()->startApplyingOplogEntries(oplogApplicationStartTime);
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToStrictConsistency(
const CancelableOperationContextFactory& factory) {
auto newRecipientCtx = _recipientCtx;
newRecipientCtx.setState(RecipientStateEnum::kStrictConsistency);
+ auto oplogApplicationStopTime = getCurrentTime();
+
+ // Record oplog application stop time
+ ReshardingMetricsTimeInterval oplogApplication{
+ _timeIntervals.getOplogApplication().get_value_or({})};
+ oplogApplication.setStop(oplogApplicationStopTime);
+ _timeIntervals.setOplogApplication(oplogApplication);
+
+
_transitionState(std::move(newRecipientCtx), boost::none, boost::none, factory);
- auto currentTime = getCurrentTime();
- _metrics()->endApplyingOplogEntries(currentTime);
+ _metrics()->endApplyingOplogEntries(oplogApplicationStopTime);
}
void ReshardingRecipientService::RecipientStateMachine::_transitionToError(
@@ -985,6 +1014,9 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
setBuilder.append(ReshardingRecipientDocument::kDonorShardsFieldName,
donorShardsArrayBuilder.arr());
+
+ setBuilder.append(ReshardingRecipientDocument::kApproxBytesToCopyFieldName,
+ cloneDetails->approxBytesToCopy);
}
if (configStartTime) {
@@ -992,6 +1024,8 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
*configStartTime);
}
+ setBuilder.append(ReshardingRecipientDocument::kMetricsFieldName, _timeIntervals.toBSON());
+
setBuilder.doneFast();
}
@@ -1009,6 +1043,7 @@ void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument
if (cloneDetails) {
_cloneTimestamp = cloneDetails->cloneTimestamp;
_donorShards = std::move(cloneDetails->donorShards);
+ _approxBytesToCopy = cloneDetails->approxBytesToCopy;
}
if (configStartTime) {
@@ -1068,7 +1103,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMe
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {
- _metrics()->onStepUp(ReshardingMetrics::Role::kRecipient);
return _restoreMetricsWithRetry(executor, abortToken);
}
_metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime());
@@ -1078,7 +1112,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_startMe
ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_restoreMetricsWithRetry(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
const CancellationToken& abortToken) {
- _metrics()->setRecipientState(_recipientCtx.getState());
return _retryingCancelableOpCtxFactory
->withAutomaticRetry(
[this, executor, abortToken](const auto& factory) { _restoreMetrics(factory); })
@@ -1144,8 +1177,13 @@ void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics(
}
}
- _metrics()->restoreForCurrentOp(
- documentCountCopied, documentBytesCopied, oplogEntriesFetched, oplogEntriesApplied);
+ _metrics()->onStepUp(_recipientCtx.getState(),
+ ReshardingMetrics::ReshardingRecipientCountsAndMetrics{documentCountCopied,
+ documentBytesCopied,
+ oplogEntriesFetched,
+ oplogEntriesApplied,
+ _approxBytesToCopy,
+ _timeIntervals});
}
CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index e1fe504a970..86d3236b9c9 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -288,6 +288,8 @@ private:
RecipientShardContext _recipientCtx;
std::vector<DonorShardFetchTimestamp> _donorShards;
boost::optional<Timestamp> _cloneTimestamp;
+ ReshardingRecipientMetrics _timeIntervals;
+ boost::optional<int64_t> _approxBytesToCopy;
const std::unique_ptr<RecipientStateMachineExternalState> _externalState;
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 43ccab32bec..b243a433995 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -50,8 +50,10 @@
#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
#include "mongo/logv2/log.h"
#include "mongo/unittest/death_test.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/fail_point.h"
+
namespace mongo {
namespace {
@@ -208,6 +210,8 @@ public:
*/
class ReshardingRecipientServiceTest : public repl::PrimaryOnlyServiceMongoDTest {
public:
+ ReshardingRecipientServiceTest() : PrimaryOnlyServiceMongoDTest(Options{}.useMockClock(true)) {}
+
using RecipientStateMachine = ReshardingRecipientService::RecipientStateMachine;
std::unique_ptr<repl::PrimaryOnlyService> makeService(ServiceContext* serviceContext) override {
@@ -222,7 +226,6 @@ public:
repl::DropPendingCollectionReaper::set(
serviceContext, std::make_unique<repl::DropPendingCollectionReaper>(storageMock.get()));
repl::StorageInterface::set(serviceContext, std::move(storageMock));
-
_controller = std::make_shared<RecipientStateTransitionController>();
_opObserverRegistry->addObserver(std::make_unique<RecipientOpObserverForTest>(_controller));
}
@@ -816,22 +819,33 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
}
// Step down before the transition to state can complete.
stateTransitionsGuard.wait(state);
- if (state == RecipientStateEnum::kStrictConsistency) {
- auto currOp = recipient
- ->reportForCurrentOp(
- MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
- MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
- .get();
+
+ dynamic_cast<ClockSourceMock*>(getServiceContext()->getFastClockSource())
+ ->advance(Seconds(1));
+ auto currOp =
+ recipient
+ ->reportForCurrentOp(MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
+ MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
+ .get();
+
+
+ if (state == RecipientStateEnum::kApplying) {
+ ASSERT_EQ(currOp.getField("totalApplyTimeElapsedSecs").Long(), 0);
+ ASSERT_EQ(currOp.getStringField("recipientState"),
+ RecipientState_serializer(RecipientStateEnum::kCloning));
+ ASSERT_GT(currOp.getField("totalCopyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("approxBytesToCopy").Long(), 0);
+
+ } else if (state == RecipientStateEnum::kStrictConsistency) {
ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
ASSERT_EQ(currOp.getStringField("recipientState"),
RecipientState_serializer(RecipientStateEnum::kApplying));
+ ASSERT_GT(currOp.getField("totalCopyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("totalApplyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("approxBytesToCopy").Long(), 0);
+
} else if (state == RecipientStateEnum::kDone) {
- auto currOp = recipient
- ->reportForCurrentOp(
- MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
- MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
- .get();
ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
ASSERT_EQ(currOp.getField("oplogEntriesFetched").Long(),
@@ -840,7 +854,11 @@ TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
oplogEntriesAppliedOnEachDonor * doc.getDonorShards().size());
ASSERT_EQ(currOp.getStringField("recipientState"),
RecipientState_serializer(RecipientStateEnum::kStrictConsistency));
+ ASSERT_GT(currOp.getField("totalCopyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("totalApplyTimeElapsedSecs").Long(), 0);
+ ASSERT_GT(currOp.getField("approxBytesToCopy").Long(), 0);
}
+
stepDown();
ASSERT_EQ(recipient->getCompletionFuture().getNoThrow(),