summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author: Kshitij Gupta <kshitij.gupta@mongodb.com> 2021-06-08 14:31:47 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-06-11 13:16:54 +0000
commit: fee0349f0f83aa84bf47d66c1af139bc07dc541d (patch)
tree: 2f3122d85ea263d041dc9640941d9da3236aacd5
parent: 1dbdd89ea20ad7d4f5c6f7e8953d2348bc336270 (diff)
download: mongo-fee0349f0f83aa84bf47d66c1af139bc07dc541d.tar.gz
SERVER-53912: ReshardingRecipientService instances to load metrics state
upon instantiation.
-rw-r--r-- src/mongo/db/s/resharding/resharding_metrics.cpp 1
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service.cpp 55
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service.h 3
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service_test.cpp 135
4 files changed, 193 insertions, 1 deletions
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp
index 610b2c38005..6ffd3b2daab 100644
--- a/src/mongo/db/s/resharding/resharding_metrics.cpp
+++ b/src/mongo/db/s/resharding/resharding_metrics.cpp
@@ -153,7 +153,6 @@ void ReshardingMetrics::onStepUp(Role role) noexcept {
// TODO SERVER-53913 Implement donor metrics rehydration.
// TODO SERVER-53914 Implement coordinator metrics rehydration.
- // TODO SERVER-53912 Implement recipient metrics rehydration.
// TODO SERVER-57094 Resume the runningOperation duration from a timestamp stored on disk
// instead of starting from the current time.
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index ba96366a3fb..b59638c31da 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -38,6 +38,8 @@
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/index_builds_coordinator.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/persistent_task_store.h"
#include "mongo/db/query/collation/collation_spec.h"
@@ -950,11 +952,64 @@ ReshardingMetrics* ReshardingRecipientService::RecipientStateMachine::_metrics()
void ReshardingRecipientService::RecipientStateMachine::_startMetrics() {  // Starts recipient metrics: step-up path if already past kAwaitingFetchTimestamp, otherwise a fresh start.
if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) {  // Persisted state is beyond kAwaitingFetchTimestamp.
_metrics()->onStepUp(ReshardingMetrics::Role::kRecipient);
+ _restoreMetrics();  // After onStepUp(), reload the recipient state and counters via _restoreMetrics().
} else {
_metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime());  // Fresh start stamped with the current time.
}
}
+void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() {  // Restores recipient metrics from persisted collections; called by _startMetrics() right after onStepUp().
+ _metrics()->setRecipientState(_recipientCtx.getState());  // Report the recipient state currently held in _recipientCtx.
+
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());  // One operation context is reused for every read below.
+ {  // Block scope bounds the lifetime of the temp-collection acquisition.
+ AutoGetCollection tempReshardingColl(
+ opCtx.get(), _metadata.getTempReshardingNss(), MODE_IS);
+ if (tempReshardingColl) {  // Presumably false when the temporary resharding collection does not exist - confirm.
+ int64_t bytesCopied = tempReshardingColl->dataSize(opCtx.get());  // Data size of the temp collection is used as bytes copied.
+ int64_t documentsCopied = tempReshardingColl->numRecords(opCtx.get());  // Record count of the temp collection is used as documents copied.
+ if (bytesCopied > 0) {  // Nothing is reported when the collection holds no data.
+ _metrics()->onDocumentsCopiedForCurrentOp(documentsCopied, bytesCopied);
+ }
+ }
+ }
+
+ for (const auto& donor : _donorShards) {  // Per-donor counters: fetched oplog entries, then applied oplog entries.
+ {
+ AutoGetCollection oplogBufferColl(
+ opCtx.get(),
+ getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId()),  // Oplog buffer namespace derived from the source UUID and this donor's shard id.
+ MODE_IS);
+ if (oplogBufferColl) {
+ int64_t recordsFetched = oplogBufferColl->numRecords(opCtx.get());  // Each record in this donor's buffer is counted as one fetched entry.
+ if (recordsFetched > 0)
+ _metrics()->onOplogEntriesFetchedForCurrentOp(recordsFetched);
+ }
+ }
+
+ {
+ AutoGetCollection progressApplierColl(
+ opCtx.get(), NamespaceString::kReshardingApplierProgressNamespace, MODE_IS);  // Same namespace for every donor; the query below selects this donor's document.
+ if (progressApplierColl) {
+ BSONObj result;  // Starts empty; filled in by findOne() below.
+ Helpers::findOne(
+ opCtx.get(),
+ progressApplierColl.getCollection(),
+ BSON(ReshardingOplogApplierProgress::kOplogSourceIdFieldName  // Match on the source id built from the resharding UUID and this donor's shard id.
+ << (ReshardingSourceId{_metadata.getReshardingUUID(), donor.getShardId()})
+ .toBSON()),
+ result);
+
+ if (!result.isEmpty()) {  // Still empty presumably means no progress document exists for this donor - confirm findOne() semantics.
+ _metrics()->onOplogEntriesAppliedForCurrentOp(
+ result.getField(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName)
+ .Long());  // Reads the persisted applied-entries count as a long.
+ }
+ }
+ }
+ }
+}
+
CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource(
const CancellationToken& stepdownToken) {
{
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index df7a6c55a90..e0e4357cead 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -229,6 +229,9 @@ private:
void _startMetrics();
+ // Restores recipient metrics (state, copied documents/bytes, fetched and applied oplog entries) from persisted collections after stepping up.
+ void _restoreMetrics();
+
// Initializes the _abortSource and generates a token from it to return back the caller.
//
// Should only be called once per lifetime.
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 2527bd79986..9faa03acf2d 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_data_replication.h"
+#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
#include "mongo/db/s/resharding/resharding_recipient_service.h"
#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h"
#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
@@ -598,5 +599,139 @@ TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) {
}
}
+TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {  // Steps down/up around each listed state and checks the metrics reported by reportForCurrentOp().
+ const std::vector<RecipientStateEnum> recipientStates{RecipientStateEnum::kCreatingCollection,
+ RecipientStateEnum::kCloning,
+ RecipientStateEnum::kApplying,
+ RecipientStateEnum::kStrictConsistency,
+ RecipientStateEnum::kDone};
+ // TODO (SERVER-57194): enable lock-free reads.
+ bool disableLockFreeReadsOriginalValue = storageGlobalParams.disableLockFreeReads;
+ storageGlobalParams.disableLockFreeReads = true;
+ ON_BLOCK_EXIT(
+ [&] { storageGlobalParams.disableLockFreeReads = disableLockFreeReadsOriginalValue; });  // Put the global setting back when the test scope exits.
+
+ PauseDuringStateTransitions stateTransitionsGuard{controller(), recipientStates};  // Guard used below via wait()/unset() to hold and release each listed transition.
+ auto doc = makeStateDocument(false);
+ auto instanceId =
+ BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName << doc.getReshardingUUID());
+ auto opCtx = makeOperationContext();
+ auto prevState = RecipientStateEnum::kUnused;  // kUnused marks the first iteration, before any instance exists.
+
+ auto reshardedDoc = BSON("_id" << 0 << "x" << 2 << "y" << 10);  // The single document seeded into the temp resharding collection.
+ long oplogEntriesAppliedOnEachDonor = 10L;  // Applied-entries count written into each donor's progress document.
+
+ for (const auto state : recipientStates) {
+ auto recipient = [&] {
+ if (prevState == RecipientStateEnum::kUnused) {  // First iteration: insert the state document and create the instance.
+ RecipientStateMachine::insertStateDocument(opCtx.get(), doc);
+ return RecipientStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
+ } else {  // Later iterations: look up the instance by its resharding UUID.
+ auto maybeRecipient =
+ RecipientStateMachine::lookup(opCtx.get(), _service, instanceId);
+ ASSERT_TRUE(bool(maybeRecipient));
+
+ // Allow the transition to prevState to succeed on this primary-only service
+ // instance.
+ stateTransitionsGuard.unset(prevState);
+ return *maybeRecipient;
+ }
+ }();
+
+ if (prevState == RecipientStateEnum::kCloning) {  // Seed the temp resharding collection; documentsCopied/bytesCopied are asserted in later iterations.
+ std::vector<InsertStatement> inserts{InsertStatement(reshardedDoc)};
+ resharding::data_copy::insertBatch(opCtx.get(), doc.getTempReshardingNss(), inserts);
+ } else if (prevState == RecipientStateEnum::kApplying) {  // Seed per-donor oplog buffer and applier-progress documents.
+ auto insertFn = [&](const NamespaceString nss, const InsertStatement insertStatement) {  // Ensures the collection exists, then inserts the single statement.
+ resharding::data_copy::ensureCollectionExists(
+ opCtx.get(), nss, CollectionOptions());
+
+ std::vector<InsertStatement> inserts{insertStatement};
+ resharding::data_copy::insertBatch(opCtx.get(), nss, inserts);
+ };
+
+ auto donorShards = doc.getDonorShards();
+ unsigned int i = 0;  // Varies the oplog id inserted for each donor.
+ for (const auto& donor : donorShards) {
+ // Setup oplogBuffer collection.
+ insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()),
+ InsertStatement{
+ BSON("_id" << (ReshardingDonorOplogId{{20, i}, {19, 0}}).toBSON())});  // Exactly one buffered entry per donor.
+ ++i;
+
+ // Setup reshardingApplierProgress collection.
+ auto progressDoc = BSON(
+ ReshardingOplogApplierProgress::kOplogSourceIdFieldName
+ << (ReshardingSourceId{doc.getReshardingUUID(), donor.getShardId()}).toBSON()
+ << ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName
+ << oplogEntriesAppliedOnEachDonor);
+ insertFn(NamespaceString::kReshardingApplierProgressNamespace,
+ InsertStatement{progressDoc});
+ }
+ }
+
+ if (prevState != RecipientStateEnum::kUnused) {  // NOTE(review): the lambda above already calls unset(prevState) on this same path; this repeat looks redundant - confirm.
+ // Allow the transition to prevState to succeed on this primary-only service
+ // instance.
+ stateTransitionsGuard.unset(prevState);
+ }
+
+ // Signal the coordinator's earliest state that allows the recipient's transition
+ // into 'state' to be valid. This mimics the real system where, upon step up, the
+ // new RecipientStateMachine instance gets refreshed with the coordinator's most
+ // recent state.
+ switch (state) {
+ case RecipientStateEnum::kCreatingCollection:
+ case RecipientStateEnum::kCloning: {
+ notifyToStartCloning(opCtx.get(), *recipient, doc);
+ break;
+ }
+ case RecipientStateEnum::kDone: {
+ notifyReshardingCommitting(opCtx.get(), *recipient, doc);
+ break;
+ }
+ default:  // No coordinator signal is sent for the remaining states.
+ break;
+ }
+
+ // Step down before the transition to state can complete.
+ stateTransitionsGuard.wait(state);
+ if (state == RecipientStateEnum::kStrictConsistency) {  // Held before kStrictConsistency: expects the copied-document counters and recipientState kApplying.
+ auto currOp = recipient
+ ->reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
+ MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
+ .get();
+ ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
+ ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
+ ASSERT_EQ(currOp.getStringField("recipientState"),
+ RecipientState_serializer(RecipientStateEnum::kApplying));
+ } else if (state == RecipientStateEnum::kDone) {  // Held before kDone: additionally expects one fetched entry per donor and the seeded applied count per donor.
+ auto currOp = recipient
+ ->reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
+ MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
+ .get();
+ ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
+ ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
+ ASSERT_EQ(currOp.getField("oplogEntriesFetched").Long(),
+ (long)(1 * doc.getDonorShards().size()));
+ ASSERT_EQ(currOp.getField("oplogEntriesApplied").Long(),
+ oplogEntriesAppliedOnEachDonor * doc.getDonorShards().size());
+ ASSERT_EQ(currOp.getStringField("recipientState"),
+ RecipientState_serializer(RecipientStateEnum::kStrictConsistency));
+ }
+ stepDown();
+
+ ASSERT_EQ(recipient->getCompletionFuture().getNoThrow(),  // The interrupted instance is expected to finish with InterruptedDueToReplStateChange.
+ ErrorCodes::InterruptedDueToReplStateChange);
+
+ prevState = state;
+
+ recipient.reset();  // Release this iteration's handle before stepping up again.
+ stepUp(opCtx.get());
+ }
+}
+
} // namespace
} // namespace mongo