diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service_test.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service_test.cpp | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp index 2527bd79986..9faa03acf2d 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp @@ -42,6 +42,7 @@ #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_data_replication.h" +#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h" #include "mongo/db/s/resharding/resharding_recipient_service.h" #include "mongo/db/s/resharding/resharding_recipient_service_external_state.h" #include "mongo/db/s/resharding/resharding_service_test_helpers.h" @@ -598,5 +599,139 @@ TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) { } } +TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) { + const std::vector<RecipientStateEnum> recipientStates{RecipientStateEnum::kCreatingCollection, + RecipientStateEnum::kCloning, + RecipientStateEnum::kApplying, + RecipientStateEnum::kStrictConsistency, + RecipientStateEnum::kDone}; + // TODO (SERVER-57194): enable lock-free reads. + bool disableLockFreeReadsOriginalValue = storageGlobalParams.disableLockFreeReads; + storageGlobalParams.disableLockFreeReads = true; + ON_BLOCK_EXIT( + [&] { storageGlobalParams.disableLockFreeReads = disableLockFreeReadsOriginalValue; }); + + PauseDuringStateTransitions stateTransitionsGuard{controller(), recipientStates}; + auto doc = makeStateDocument(false); + auto instanceId = + BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName << doc.getReshardingUUID()); + auto opCtx = makeOperationContext(); + auto prevState = RecipientStateEnum::kUnused; + + auto reshardedDoc = BSON("_id" << 0 << "x" << 2 << "y" << 10); + long oplogEntriesAppliedOnEachDonor = 10L; + + for (const auto state : recipientStates) { + auto recipient = [&] { + if (prevState == RecipientStateEnum::kUnused) { + RecipientStateMachine::insertStateDocument(opCtx.get(), doc); + return RecipientStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON()); + } else { + auto maybeRecipient = + RecipientStateMachine::lookup(opCtx.get(), _service, instanceId); + ASSERT_TRUE(bool(maybeRecipient)); + + // Allow the transition to prevState to succeed on this primary-only service + // instance. + stateTransitionsGuard.unset(prevState); + return *maybeRecipient; + } + }(); + + if (prevState == RecipientStateEnum::kCloning) { + std::vector<InsertStatement> inserts{InsertStatement(reshardedDoc)}; + resharding::data_copy::insertBatch(opCtx.get(), doc.getTempReshardingNss(), inserts); + } else if (prevState == RecipientStateEnum::kApplying) { + auto insertFn = [&](const NamespaceString nss, const InsertStatement insertStatement) { + resharding::data_copy::ensureCollectionExists( + opCtx.get(), nss, CollectionOptions()); + + std::vector<InsertStatement> inserts{insertStatement}; + resharding::data_copy::insertBatch(opCtx.get(), nss, inserts); + }; + + auto donorShards = doc.getDonorShards(); + unsigned int i = 0; + for (const auto& donor : donorShards) { + // Setup oplogBuffer collection. + insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()), + InsertStatement{ + BSON("_id" << (ReshardingDonorOplogId{{20, i}, {19, 0}}).toBSON())}); + ++i; + + // Setup reshardingApplierProgress collection. + auto progressDoc = BSON( + ReshardingOplogApplierProgress::kOplogSourceIdFieldName + << (ReshardingSourceId{doc.getReshardingUUID(), donor.getShardId()}).toBSON() + << ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName + << oplogEntriesAppliedOnEachDonor); + insertFn(NamespaceString::kReshardingApplierProgressNamespace, + InsertStatement{progressDoc}); + } + } + + if (prevState != RecipientStateEnum::kUnused) { + // Allow the transition to prevState to succeed on this primary-only service + // instance. + stateTransitionsGuard.unset(prevState); + } + + // Signal the coordinator's earliest state that allows the recipient's transition + // into 'state' to be valid. This mimics the real system where, upon step up, the + // new RecipientStateMachine instance gets refreshed with the coordinator's most + // recent state. + switch (state) { + case RecipientStateEnum::kCreatingCollection: + case RecipientStateEnum::kCloning: { + notifyToStartCloning(opCtx.get(), *recipient, doc); + break; + } + case RecipientStateEnum::kDone: { + notifyReshardingCommitting(opCtx.get(), *recipient, doc); + break; + } + default: + break; + } + + // Step down before the transition to state can complete. + stateTransitionsGuard.wait(state); + if (state == RecipientStateEnum::kStrictConsistency) { + auto currOp = recipient + ->reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle, + MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle) + .get(); + ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L); + ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize()); + ASSERT_EQ(currOp.getStringField("recipientState"), + RecipientState_serializer(RecipientStateEnum::kApplying)); + } else if (state == RecipientStateEnum::kDone) { + auto currOp = recipient + ->reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle, + MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle) + .get(); + ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L); + ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize()); + ASSERT_EQ(currOp.getField("oplogEntriesFetched").Long(), + (long)(1 * doc.getDonorShards().size())); + ASSERT_EQ(currOp.getField("oplogEntriesApplied").Long(), + oplogEntriesAppliedOnEachDonor * doc.getDonorShards().size()); + ASSERT_EQ(currOp.getStringField("recipientState"), + RecipientState_serializer(RecipientStateEnum::kStrictConsistency)); + } + stepDown(); + + ASSERT_EQ(recipient->getCompletionFuture().getNoThrow(), + ErrorCodes::InterruptedDueToReplStateChange); + + prevState = state; + + recipient.reset(); + stepUp(opCtx.get()); + } +} + } // namespace } // namespace mongo |