summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service_test.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_test.cpp135
1 files changed, 135 insertions, 0 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
index 2527bd79986..9faa03acf2d 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_data_replication.h"
+#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
#include "mongo/db/s/resharding/resharding_recipient_service.h"
#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h"
#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
@@ -598,5 +599,139 @@ TEST_F(ReshardingRecipientServiceTest, TruncatesXLErrorOnRecipientDocument) {
}
}
+TEST_F(ReshardingRecipientServiceTest, RestoreMetricsAfterStepUp) {
+ const std::vector<RecipientStateEnum> recipientStates{RecipientStateEnum::kCreatingCollection,
+ RecipientStateEnum::kCloning,
+ RecipientStateEnum::kApplying,
+ RecipientStateEnum::kStrictConsistency,
+ RecipientStateEnum::kDone};
+ // TODO (SERVER-57194): enable lock-free reads.
+ bool disableLockFreeReadsOriginalValue = storageGlobalParams.disableLockFreeReads;
+ storageGlobalParams.disableLockFreeReads = true;
+ ON_BLOCK_EXIT(
+ [&] { storageGlobalParams.disableLockFreeReads = disableLockFreeReadsOriginalValue; });
+
+ PauseDuringStateTransitions stateTransitionsGuard{controller(), recipientStates};
+ auto doc = makeStateDocument(false);
+ auto instanceId =
+ BSON(ReshardingRecipientDocument::kReshardingUUIDFieldName << doc.getReshardingUUID());
+ auto opCtx = makeOperationContext();
+ auto prevState = RecipientStateEnum::kUnused;
+
+ auto reshardedDoc = BSON("_id" << 0 << "x" << 2 << "y" << 10);
+ long oplogEntriesAppliedOnEachDonor = 10L;
+
+ for (const auto state : recipientStates) {
+ auto recipient = [&] {
+ if (prevState == RecipientStateEnum::kUnused) {
+ RecipientStateMachine::insertStateDocument(opCtx.get(), doc);
+ return RecipientStateMachine::getOrCreate(opCtx.get(), _service, doc.toBSON());
+ } else {
+ auto maybeRecipient =
+ RecipientStateMachine::lookup(opCtx.get(), _service, instanceId);
+ ASSERT_TRUE(bool(maybeRecipient));
+
+ // Allow the transition to prevState to succeed on this primary-only service
+ // instance.
+ stateTransitionsGuard.unset(prevState);
+ return *maybeRecipient;
+ }
+ }();
+
+ if (prevState == RecipientStateEnum::kCloning) {
+ std::vector<InsertStatement> inserts{InsertStatement(reshardedDoc)};
+ resharding::data_copy::insertBatch(opCtx.get(), doc.getTempReshardingNss(), inserts);
+ } else if (prevState == RecipientStateEnum::kApplying) {
+ auto insertFn = [&](const NamespaceString nss, const InsertStatement insertStatement) {
+ resharding::data_copy::ensureCollectionExists(
+ opCtx.get(), nss, CollectionOptions());
+
+ std::vector<InsertStatement> inserts{insertStatement};
+ resharding::data_copy::insertBatch(opCtx.get(), nss, inserts);
+ };
+
+ auto donorShards = doc.getDonorShards();
+ unsigned int i = 0;
+ for (const auto& donor : donorShards) {
+ // Setup oplogBuffer collection.
+ insertFn(getLocalOplogBufferNamespace(doc.getSourceUUID(), donor.getShardId()),
+ InsertStatement{
+ BSON("_id" << (ReshardingDonorOplogId{{20, i}, {19, 0}}).toBSON())});
+ ++i;
+
+ // Setup reshardingApplierProgress collection.
+ auto progressDoc = BSON(
+ ReshardingOplogApplierProgress::kOplogSourceIdFieldName
+ << (ReshardingSourceId{doc.getReshardingUUID(), donor.getShardId()}).toBSON()
+ << ReshardingOplogApplierProgress::kNumEntriesAppliedFieldName
+ << oplogEntriesAppliedOnEachDonor);
+ insertFn(NamespaceString::kReshardingApplierProgressNamespace,
+ InsertStatement{progressDoc});
+ }
+ }
+
+ if (prevState != RecipientStateEnum::kUnused) {
+ // Allow the transition to prevState to succeed on this primary-only service
+ // instance.
+ stateTransitionsGuard.unset(prevState);
+ }
+
+ // Signal the coordinator's earliest state that allows the recipient's transition
+ // into 'state' to be valid. This mimics the real system where, upon step up, the
+ // new RecipientStateMachine instance gets refreshed with the coordinator's most
+ // recent state.
+ switch (state) {
+ case RecipientStateEnum::kCreatingCollection:
+ case RecipientStateEnum::kCloning: {
+ notifyToStartCloning(opCtx.get(), *recipient, doc);
+ break;
+ }
+ case RecipientStateEnum::kDone: {
+ notifyReshardingCommitting(opCtx.get(), *recipient, doc);
+ break;
+ }
+ default:
+ break;
+ }
+
+ // Step down before the transition to state can complete.
+ stateTransitionsGuard.wait(state);
+ if (state == RecipientStateEnum::kStrictConsistency) {
+ auto currOp = recipient
+ ->reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
+ MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
+ .get();
+ ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
+ ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
+ ASSERT_EQ(currOp.getStringField("recipientState"),
+ RecipientState_serializer(RecipientStateEnum::kApplying));
+ } else if (state == RecipientStateEnum::kDone) {
+ auto currOp = recipient
+ ->reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode::kExcludeIdle,
+ MongoProcessInterface::CurrentOpSessionsMode::kExcludeIdle)
+ .get();
+ ASSERT_EQ(currOp.getField("documentsCopied").Long(), 1L);
+ ASSERT_EQ(currOp.getField("bytesCopied").Long(), (long)reshardedDoc.objsize());
+ ASSERT_EQ(currOp.getField("oplogEntriesFetched").Long(),
+ (long)(1 * doc.getDonorShards().size()));
+ ASSERT_EQ(currOp.getField("oplogEntriesApplied").Long(),
+ oplogEntriesAppliedOnEachDonor * doc.getDonorShards().size());
+ ASSERT_EQ(currOp.getStringField("recipientState"),
+ RecipientState_serializer(RecipientStateEnum::kStrictConsistency));
+ }
+ stepDown();
+
+ ASSERT_EQ(recipient->getCompletionFuture().getNoThrow(),
+ ErrorCodes::InterruptedDueToReplStateChange);
+
+ prevState = state;
+
+ recipient.reset();
+ stepUp(opCtx.get());
+ }
+}
+
} // namespace
} // namespace mongo