diff options
author | Kshitij Gupta <kshitij.gupta@mongodb.com> | 2021-06-08 14:31:47 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-06-11 13:16:54 +0000 |
commit | fee0349f0f83aa84bf47d66c1af139bc07dc541d (patch) | |
tree | 2f3122d85ea263d041dc9640941d9da3236aacd5 | |
parent | 1dbdd89ea20ad7d4f5c6f7e8953d2348bc336270 (diff) | |
download | mongo-fee0349f0f83aa84bf47d66c1af139bc07dc541d.tar.gz |
SERVER-53912: ReshardingRecipientService instances to load metrics state
upon instantiation.
4 files changed, 193 insertions, 1 deletion
diff --git a/src/mongo/db/s/resharding/resharding_metrics.cpp b/src/mongo/db/s/resharding/resharding_metrics.cpp index 610b2c38005..6ffd3b2daab 100644 --- a/src/mongo/db/s/resharding/resharding_metrics.cpp +++ b/src/mongo/db/s/resharding/resharding_metrics.cpp @@ -153,7 +153,6 @@ void ReshardingMetrics::onStepUp(Role role) noexcept { // TODO SERVER-53913 Implement donor metrics rehydration. // TODO SERVER-53914 Implement coordinator metrics rehydration. - // TODO SERVER-53912 Implement recipient metrics rehydration. // TODO SERVER-57094 Resume the runningOperation duration from a timestamp stored on disk // instead of starting from the current time. diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index ba96366a3fb..b59638c31da 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -38,6 +38,8 @@ #include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/index_builds_coordinator.h" #include "mongo/db/ops/delete.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/query/collation/collation_spec.h" @@ -950,11 +952,64 @@ ReshardingMetrics* ReshardingRecipientService::RecipientStateMachine::_metrics() void ReshardingRecipientService::RecipientStateMachine::_startMetrics() { if (_recipientCtx.getState() > RecipientStateEnum::kAwaitingFetchTimestamp) { _metrics()->onStepUp(ReshardingMetrics::Role::kRecipient); + _restoreMetrics(); } else { _metrics()->onStart(ReshardingMetrics::Role::kRecipient, getCurrentTime()); } } +void ReshardingRecipientService::RecipientStateMachine::_restoreMetrics() { + _metrics()->setRecipientState(_recipientCtx.getState()); + + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + { + AutoGetCollection tempReshardingColl( + 
opCtx.get(), _metadata.getTempReshardingNss(), MODE_IS); + if (tempReshardingColl) { + int64_t bytesCopied = tempReshardingColl->dataSize(opCtx.get()); + int64_t documentsCopied = tempReshardingColl->numRecords(opCtx.get()); + if (bytesCopied > 0) { + _metrics()->onDocumentsCopiedForCurrentOp(documentsCopied, bytesCopied); + } + } + } + + for (const auto& donor : _donorShards) { + { + AutoGetCollection oplogBufferColl( + opCtx.get(), + getLocalOplogBufferNamespace(_metadata.getSourceUUID(), donor.getShardId()), + MODE_IS); + if (oplogBufferColl) { + int64_t recordsFetched = oplogBufferColl->numRecords(opCtx.get()); + if (recordsFetched > 0) + _metrics()->onOplogEntriesFetchedForCurrentOp(recordsFetched); + } + } + + { + AutoGetCollection progressApplierColl( + opCtx.get(), NamespaceString::kReshardingApplierProgressNamespace, MODE_IS); + if (progressApplierColl) { + BSONObj result; + Helpers::findOne( + opCtx.get(), + progressApplierColl.getCollection(), + BSON(ReshardingOplogApplierProgress::kOplogSourceIdFieldName + << (ReshardingSourceId{_metadata.getReshardingUUID(), donor.getShardId()}) + .toBSON()), + result); + + if (!result.isEmpty()) { + _metrics()->onOplogEntriesAppliedForCurrentOp( + result.getField(ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName) + .Long()); + } + } + } + } +} + CancellationToken ReshardingRecipientService::RecipientStateMachine::_initAbortSource( const CancellationToken& stepdownToken) { { diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index df7a6c55a90..e0e4357cead 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -229,6 +229,9 @@ private: void _startMetrics(); + // Restore metrics using the persisted metrics after stepping up. + void _restoreMetrics(); + // Initializes the _abortSource and generates a token from it to return back the caller. 
// // Should only be called once per lifetime. diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 2527bd79986..9faa03acf2d 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -42,6 +42,7 @@ #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_data_replication.h" +#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" #include "mongo/db/s/resharding/resharding_recipient_service.h" #include "mongo/db/s/resharding/resharding_recipient_service_external_state.h" #include "mongo/db/s/resharding/resharding_service_test_helpers.h" @@ -598,5 +599,139 @@ TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) { } } +TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { + const std::vector<RecipientStateEnum> recipientStates{RecipientStateEnum::kCreatingCollection, + RecipientStateEnum::kCloning, + RecipientStateEnum::kApplying, + RecipientStateEnum::kStrictConsistency, + RecipientStateEnum::kDone}; + // TODO (SERVER-57194): enable lock-free reads. 
+ bool disableLockFreeReadsOriginalValue = storageGlobalParams.disableLockFreeReads; + storageGlobalParams.disableLockFreeReads = true; + ON_BLOCK_EXIT( + [&] { storageGlobalParams.disableLockFreeReads = disableLockFreeReadsOriginalValue; }); + + PauseDuringStateTransitions stateTransitionsGuard{controller(), recipientStates}; + auto doc = makeStateDocument(false); + auto instanceId = + BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName << doc.getReshardingUUID()); + auto opCtx = makeOperationContext(); + auto prevState = RecipientStateEnum::kUnused; + + auto reshardedDoc = BSON("_id" << 0 << "x" << 2 << "y" << 10); + long oplogEntriesAppliedOnEachDonor = 10L; + + for (const auto state : recipientStates) { + auto recipient = [&] { + if (prevState == RecipientStateEnum::kUnused) { + RecipientStateMachine::insertStateDocument(opCtx.get(), doc); + return RecipientStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); + } else { + auto maybeRecipient = + RecipientStateMachine::lookup(opCtx.get(), _service, instanceId); + ASSERT_TRUE(bool(maybeRecipient)); + + // Allow the transition to prevState to succeed on this primary-only service + // instance. + stateTransitionsGuard.unset(prevState); + return *maybeRecipient; + } + }(); + + if (prevState == RecipientStateEnum::kCloning) { + std::vector<InsertStatement> inserts{InsertStatement(reshardedDoc)}; + resharding::data_copy::insertBatch(opCtx.get(), doc.getTempReshardingNss(), inserts); + } else if (prevState == RecipientStateEnum::kApplying) { + auto insertFn = [&](const NamespaceString nss, const InsertStatement insertStatement) { + resharding::data_copy::ensureCollectionExists( + opCtx.get(), nss, CollectionOptions()); + + std::vector<InsertStatement> inserts{insertStatement}; + resharding::data_copy::insertBatch(opCtx.get(), nss, inserts); + }; + + auto donorShards = doc.getDonorShards(); + unsigned int i = 0; + for (const auto& donor : donorShards) { + // Setup oplogBuffer collection. 
+ insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()), + InsertStatement{ + BSON("_id" << (ReshardingDonorOplogId{{20, i}, {19, 0}}).toBSON())}); + ++i; + + // Setup reshardingApplierProgress collection. + auto progressDoc = BSON( + ReshardingOplogApplierProgress::kOplogSourceIdFieldName + << (ReshardingSourceId{doc.getReshardingUUID(), donor.getShardId()}).toBSON() + << ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName + << oplogEntriesAppliedOnEachDonor); + insertFn(NamespaceString::kReshardingApplierProgressNamespace, + InsertStatement{progressDoc}); + } + } + + if (prevState != RecipientStateEnum::kUnused) { + // Allow the transition to prevState to succeed on this primary-only service + // instance. + stateTransitionsGuard.unset(prevState); + } + + // Signal the coordinator's earliest state that allows the recipient's transition + // into 'state' to be valid. This mimics the real system where, upon step up, the + // new RecipientStateMachine instance gets refreshed with the coordinator's most + // recent state. + switch (state) { + case RecipientStateEnum::kCreatingCollection: + case RecipientStateEnum::kCloning: { + notifyToStartCloning(opCtx.get(), *recipient, doc); + break; + } + case RecipientStateEnum::kDone: { + notifyReshardingCommitting(opCtx.get(), *recipient, doc); + break; + } + default: + break; + } + + // Step down before the transition to state can complete. 
+ stateTransitionsGuard.wait(state); + if (state == RecipientStateEnum::kStrictConsistency) { + auto currOp = recipient + ->reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle, + MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle) + .get(); + ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L); + ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize()); + ASSERT_EQ(currOp.getStringField("recipientState"), + RecipientState_serializer(RecipientStateEnum::kApplying)); + } else if (state == RecipientStateEnum::kDone) { + auto currOp = recipient + ->reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle, + MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle) + .get(); + ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L); + ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize()); + ASSERT_EQ(currOp.getField("oplogEntriesFetched").Long(), + (long)(1 * doc.getDonorShards().size())); + ASSERT_EQ(currOp.getField("oplogEntriesApplied").Long(), + oplogEntriesAppliedOnEachDonor * doc.getDonorShards().size()); + ASSERT_EQ(currOp.getStringField("recipientState"), + RecipientState_serializer(RecipientStateEnum::kStrictConsistency)); + } + stepDown(); + + ASSERT_EQ(recipient->getCompletionFuture().getNoThrow(), + ErrorCodes::InterruptedDueToReplStateChange); + + prevState = state; + + recipient.reset(); + stepUp(opCtx.get()); + } +} + } // namespace } // namespace mongo |