author | Randolph Tan <randolph@10gen.com> | 2021-06-15 14:43:35 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-08-11 20:38:34 +0000
commit | 900707746299e4684fa6b7f29beb65f6ff13b97c (patch)
tree | 6324cfd18872eed8e2bb3140464dda668559153e
parent | 3be1e000ea09a88202cf7bf754d4d490ba2863e7 (diff)
download | mongo-900707746299e4684fa6b7f29beb65f6ff13b97c.tar.gz
SERVER-50937 Refactor resharding coordinator to consolidate distinct error handling into separate phases.
(cherry picked from commit cbddf73dc78aa6a208fe3a43ca5e8674f67d5b87)
5 files changed, 367 insertions, 171 deletions
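The core of the change is that the single `_runUntilReadyToPersistDecision` chain is split into three phases (`_initializeCoordinator`, `_runUntilReadyToCommit`, `_commitAndFinishReshardOperation`), each with its own error handling. The sketch below illustrates that control-flow split only, using plain C++ rather than the server's ExecutorFuture machinery; the `Status` struct and the phase bodies are stand-ins, not MongoDB code.

    // Illustration only: the three-phase split this commit introduces, modeled with plain
    // functions and a stand-in Status type instead of the server's ExecutorFuture chain.
    // The phase names mirror the new coordinator methods; the bodies are placeholders.
    #include <iostream>
    #include <string>

    struct Status {
        bool ok;
        std::string reason;
    };

    // Phase 1: persist the coordinator document and the initial chunks. A failure here is
    // handled by aborting the coordinator alone, because participants may not know about
    // the operation yet.
    Status initializeCoordinator() {
        return {true, ""};
    }

    // Phase 2: drive donors and recipients until the decision is ready to commit. A failure
    // here must also abort the participants, since their state machines exist by now.
    Status runUntilReadyToCommit() {
        return {true, ""};
    }

    // Phase 3: commit the decision and finish the resharding operation.
    Status commitAndFinishReshardOperation() {
        return {true, ""};
    }

    int main() {
        if (auto s = initializeCoordinator(); !s.ok) {
            std::cout << "abort coordinator only: " << s.reason << '\n';
            return 1;
        }
        if (auto s = runUntilReadyToCommit(); !s.ok) {
            std::cout << "abort coordinator and participants: " << s.reason << '\n';
            return 1;
        }
        if (auto s = commitAndFinishReshardOperation(); !s.ok) {
            std::cout << "clean up after a failed commit: " << s.reason << '\n';
            return 1;
        }
        std::cout << "resharding committed\n";
        return 0;
    }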
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index d27aea7d17e..7bd2b48d0a7 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -430,6 +430,7 @@ env.Library(
     target='sharding_mongod_test_fixture',
     source=[
         'sharding_mongod_test_fixture.cpp',
+        'resharding/resharding_service_test_helpers.cpp',
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/db/repl/drop_pending_collection_reaper',
@@ -502,7 +503,6 @@ env.CppUnitTest(
         'resharding/resharding_recipient_service_external_state_test.cpp',
         'resharding/resharding_recipient_service_test.cpp',
         'resharding/resharding_donor_service_test.cpp',
-        'resharding/resharding_service_test_helpers.cpp',
         'resharding/resharding_txn_cloner_test.cpp',
         'session_catalog_migration_destination_test.cpp',
         'session_catalog_migration_source_test.cpp',
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index e438004e78f..514da4f01b3 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -699,7 +699,7 @@ void writeParticipantShardsAndTempCollInfo(
         writeToConfigCollectionsForTempNss(
             opCtx, updatedCoordinatorDoc, chunkVersion, CollationSpec::kSimpleSpec, txnNumber);

-        insertChunkAndTagDocsForTempNss(opCtx, std::move(initialChunks), zones, txnNumber);
+        insertChunkAndTagDocsForTempNss(opCtx, initialChunks, zones, txnNumber);
     });
 }

@@ -981,6 +981,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(
       }())),
       _reshardingCoordinatorExternalState(externalState) {
     _reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>();
+
+    // If the coordinator is recovering from step-up, make sure to properly initialize the
+    // promises to reflect the latest state of this resharding operation.
+    if (coordinatorDoc.getState() != CoordinatorStateEnum::kUnused) {
+        _reshardingCoordinatorObserver->onReshardingParticipantTransition(coordinatorDoc);
+    }
 }

 void ReshardingCoordinatorService::ReshardingCoordinator::installCoordinatorDoc(
@@ -1044,50 +1050,96 @@ BSONObj createShardsvrCommitReshardCollectionCmd(const NamespaceString& nss,
         BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
 }

-ExecutorFuture<ReshardingCoordinatorDocument>
-ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDecision(
-    const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept {
+ExecutorFuture<void>
+ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsReshardingStarted(
+    const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+    if (_coordinatorDoc.getState() > CoordinatorStateEnum::kPreparingToDonate) {
+        return ExecutorFuture<void>(**executor, Status::OK());
+    }
+
+    return ExecutorFuture<void>(**executor)
+        .then([this] {
+            // Ensure the flushes to create participant state machines don't get interrupted
+            // upon abort.
+            _cancelableOpCtxFactory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor);
+        })
+        .then([this, executor] {
+            pauseBeforeTellDonorToRefresh.pauseWhileSet();
+            _tellAllDonorsToRefresh(executor);
+        })
+        .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
+        .onCompletion([this, self = shared_from_this()](Status status) {
+            // Swap back to using operation contexts canceled upon abort until ready to
+            // persist the decision or unrecoverable error.
+            _cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor);
+
+            return status;
+        });
+}
+
+ExecutorFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::_initializeCoordinator(
+    const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
     return ExecutorFuture<void>(**executor)
         .then([this, executor] { _insertCoordDocAndChangeOrigCollEntry(); })
         .then([this, executor] { _calculateParticipantsAndChunksThenWriteToDisk(); })
         .onCompletion([this, executor](Status status) {
             if (_ctHolder->isSteppingOrShuttingDown()) {
-                // Propagate any errors from the coordinator stepping down.
                 return ExecutorFuture<void>(**executor, status);
             }

             if (_coordinatorDoc.getState() < CoordinatorStateEnum::kPreparingToDonate) {
-                // Propagate any errors if the coordinator failed before transitioning to
-                // kPreparingToDonate, meaning participants were never and should never be made
-                // aware of this failed resharding operation.
-                invariant(!status.isOK());
                 return ExecutorFuture<void>(**executor, status);
             }

             // Regardless of error or non-error, guarantee that once the coordinator completes its
             // transition to kPreparingToDonate, participants are aware of the resharding operation
             // and their state machines are created.
-            return ExecutorFuture<void>(**executor)
-                .then([this] {
-                    // Ensure the flushes to create participant state machines don't get interrupted
-                    // upon abort.
-                    _cancelableOpCtxFactory.emplace(_ctHolder->getStepdownToken(),
-                                                    _markKilledExecutor);
-                })
-                .then([this, executor] {
-                    pauseBeforeTellDonorToRefresh.pauseWhileSet();
-
-                    _tellAllDonorsToRefresh(executor);
-                })
-                .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
-                .then([this] {
-                    // Swap back to using operation contexts canceled upon abort until ready to
-                    // persist the decision or unrecoverable error.
-                    _cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(),
-                                                    _markKilledExecutor);
-                })
-                .then([status] { return status; });
+            return _tellAllParticipantsReshardingStarted(executor);
         })
+        .onError([this, self = shared_from_this(), executor](Status status) -> Status {
             {
                 auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
                 reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
             }
+
             if (_ctHolder->isSteppingOrShuttingDown()) {
                 return status;
             }
+
             auto nss = _coordinatorDoc.getSourceNss();
             LOGV2(4956903,
                   "Resharding failed",
                   "namespace"_attr = nss.ns(),
                   "newShardKeyPattern"_attr = _coordinatorDoc.getReshardingKey(),
                   "error"_attr = status);
+
             if (_ctHolder->isAborted()) {
                 // If the abort cancellation token was triggered, implying that a user ran the abort
+                // command, override status with a resharding abort error.
+                //
+                // Note for debugging purposes: Ensure the original error status is recorded in the
+                // logs before replacing it.
+                status = {ErrorCodes::ReshardCollectionAborted, "aborted"};
+            }
+
+            if (_coordinatorDoc.getState() == CoordinatorStateEnum::kUnused) {
+                return status;
+            }
+
+            if (_coordinatorDoc.getState() < CoordinatorStateEnum::kPreparingToDonate) {
+                _onAbortCoordinatorOnly(executor, status);
+            } else {
+                _onAbortCoordinatorAndParticipants(executor, status);
+            }
+
+            return status;
+        });
+}
+
+ExecutorFuture<ReshardingCoordinatorDocument>
+ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToCommit(
+    const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept {
+    return ExecutorFuture<void>(**executor)
         .then([this, executor] { return _awaitAllDonorsReadyToDonate(executor); })
         .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
         .then([this, executor] { return _awaitAllRecipientsFinishedCloning(executor); })
@@ -1126,28 +1178,21 @@ ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDeci
                 status = {ErrorCodes::ReshardCollectionAborted, "aborted"};
             }

-            if (_coordinatorDoc.getState() == CoordinatorStateEnum::kUnused) {
-                return status;
-            }
+            invariant(_coordinatorDoc.getState() >= CoordinatorStateEnum::kPreparingToDonate);
+
+            _onAbortCoordinatorAndParticipants(executor, status);
-            if (_coordinatorDoc.getState() < CoordinatorStateEnum::kPreparingToDonate) {
-                // Participants were never made aware of the resharding operation. Abort without
-                // waiting for participant acknowledgement.
-                _onAbortCoordinatorOnly(executor, status);
-            } else {
-                _onAbortCoordinatorAndParticipants(executor, status);
-            }

             return status;
         });
 }

 ExecutorFuture<void>
-ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishReshardOperation(
+ReshardingCoordinatorService::ReshardingCoordinator::_commitAndFinishReshardOperation(
     const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
     const ReshardingCoordinatorDocument& updatedCoordinatorDoc) noexcept {
     return ExecutorFuture<void>(**executor)
         .then([this, self = shared_from_this(), executor, updatedCoordinatorDoc] {
-            return _persistDecision(updatedCoordinatorDoc);
+            return _commit(updatedCoordinatorDoc);
         })
         .then([this, self = shared_from_this(), executor] {
             _tellAllParticipantsToRefresh(_coordinatorDoc.getSourceNss(), executor);
@@ -1181,10 +1226,13 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
     _markKilledExecutor->startup();
     _cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor);

-    return _runUntilReadyToPersistDecision(executor)
+    return _initializeCoordinator(executor)
+        .then([this, self = shared_from_this(), executor] {
+            return _runUntilReadyToCommit(executor);
+        })
         .then([this, self = shared_from_this(), executor](
                   const ReshardingCoordinatorDocument& updatedCoordinatorDoc) {
-            return _persistDecisionAndFinishReshardOperation(executor, updatedCoordinatorDoc);
+            return _commitAndFinishReshardOperation(executor, updatedCoordinatorDoc);
         })
         .onCompletion([this, self = shared_from_this(), executor](Status status) {
             if (!_ctHolder->isSteppingOrShuttingDown() &&
@@ -1519,7 +1567,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrict
         .thenRunOn(**executor);
 }

-Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecision(
+Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_commit(
     const ReshardingCoordinatorDocument& coordinatorDoc) {
     if (_coordinatorDoc.getState() > CoordinatorStateEnum::kBlockingWrites) {
         return Status::OK();
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index b0a6a3a40c1..6f52a19e11e 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -277,19 +277,31 @@ private:
     };

     /**
+     * Construct the initial chunks splits and write down the initial coordinator state to storage.
+     */
+    ExecutorFuture<void> _initializeCoordinator(
+        const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+    /**
      * Runs resharding up through preparing to persist the decision.
      */
-    ExecutorFuture<ReshardingCoordinatorDocument> _runUntilReadyToPersistDecision(
+    ExecutorFuture<ReshardingCoordinatorDocument> _runUntilReadyToCommit(
         const std::shared_ptr<executor::ScopedTaskExecutor>& executor) noexcept;

     /**
      * Runs resharding through persisting the decision until cleanup.
      */
-    ExecutorFuture<void> _persistDecisionAndFinishReshardOperation(
+    ExecutorFuture<void> _commitAndFinishReshardOperation(
         const std::shared_ptr<executor::ScopedTaskExecutor>& executor,
         const ReshardingCoordinatorDocument& updatedCoordinatorDoc) noexcept;

     /**
+     * Inform all of the donors and recipients of this resharding operation to begin.
+     */
+    ExecutorFuture<void> _tellAllParticipantsReshardingStarted(
+        const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+    /**
      * Runs abort cleanup logic when only the coordinator is aware of the resharding operation.
      *
      * Only safe to call if an unrecoverable error is encountered before the coordinator completes
@@ -373,7 +385,7 @@ private:
      *
      * Transitions to 'kCommitting'.
      */
-    Future<void> _persistDecision(const ReshardingCoordinatorDocument& updatedDoc);
+    Future<void> _commit(const ReshardingCoordinatorDocument& updatedDoc);

     /**
      * Waits on _reshardingCoordinatorObserver to notify that:
@@ -459,8 +471,7 @@ private:
     boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory;

     /**
-     * Must be locked while the `_canEnterCritical` or `_completionPromise`
-     * promises are fulfilled.
+     * Must be locked while the `_canEnterCritical` promise is being fulfilled.
      */
     mutable Mutex _fulfillmentMutex =
         MONGO_MAKE_LATCH("ReshardingCoordinatorService::_fulfillmentMutex");
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
index 36699779728..071ec5c94ae 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service_test.cpp
@@ -42,7 +42,9 @@
 #include "mongo/db/repl/wait_for_majority_service.h"
 #include "mongo/db/s/config/config_server_test_fixture.h"
 #include "mongo/db/s/resharding/resharding_coordinator_service.h"
+#include "mongo/db/s/resharding/resharding_op_observer.h"
 #include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
+#include "mongo/db/s/resharding/resharding_service_test_helpers.h"
 #include "mongo/db/s/resharding_util.h"
 #include "mongo/db/s/transaction_coordinator_service.h"
 #include "mongo/idl/server_parameter_test_util.h"
@@ -55,107 +57,23 @@
 namespace mongo {
 namespace {

-class OpObserverForTest;
-class PauseDuringStateTransitions;
+using CoordinatorStateTransitionController =
+    resharding_service_test_helpers::StateTransitionController<CoordinatorStateEnum>;
+using OpObserverForTest =
+    resharding_service_test_helpers::OpObserverForTest<CoordinatorStateEnum,
+                                                       ReshardingCoordinatorDocument>;
+using PauseDuringStateTransitions =
+    resharding_service_test_helpers::PauseDuringStateTransitions<CoordinatorStateEnum>;

-class CoordinatorStateTransitionController {
+class CoordinatorOpObserverForTest : public OpObserverForTest {
 public:
-    CoordinatorStateTransitionController() = default;
+    CoordinatorOpObserverForTest(std::shared_ptr<CoordinatorStateTransitionController> controller)
+        : OpObserverForTest(std::move(controller),
+                            NamespaceString::kConfigReshardingOperationsNamespace) {}

-    void waitUntilStateIsReached(CoordinatorStateEnum state) {
-        stdx::unique_lock lk(_mutex);
-        _waitUntilUnpausedCond.wait(lk, [this, state] { return _state == state; });
+    CoordinatorStateEnum getState(const ReshardingCoordinatorDocument& coordinatorDoc) override {
+        return coordinatorDoc.getState();
     }
-
-private:
-    friend OpObserverForTest;
-    friend PauseDuringStateTransitions;
-
-    void setPauseDuringTransition(CoordinatorStateEnum state) {
-        stdx::lock_guard lk(_mutex);
-        _pauseDuringTransition.insert(state);
-    }
-
-    void unsetPauseDuringTransition(CoordinatorStateEnum state) {
-        stdx::lock_guard lk(_mutex);
-        _pauseDuringTransition.erase(state);
-        _pauseDuringTransitionCond.notify_all();
-    }
-
-    void notifyNewStateAndWaitUntilUnpaused(OperationContext* opCtx,
-                                            CoordinatorStateEnum newState) {
-        stdx::unique_lock lk(_mutex);
-        _state = newState;
-        _waitUntilUnpausedCond.notify_all();
-        opCtx->waitForConditionOrInterrupt(_pauseDuringTransitionCond, lk, [this, newState] {
-            return _pauseDuringTransition.count(newState) == 0;
-        });
-    }
-
-    Mutex _mutex = MONGO_MAKE_LATCH("CoordinatorStateTransitionController::_mutex");
-    stdx::condition_variable _pauseDuringTransitionCond;
-    stdx::condition_variable _waitUntilUnpausedCond;
-
-    std::set<CoordinatorStateEnum> _pauseDuringTransition;
-    CoordinatorStateEnum _state = CoordinatorStateEnum::kUnused;
-};
-
-class PauseDuringStateTransitions {
-public:
-    PauseDuringStateTransitions(CoordinatorStateTransitionController* controller,
-                                CoordinatorStateEnum state)
-        : PauseDuringStateTransitions(controller, std::vector<CoordinatorStateEnum>{state}) {}
-
-    PauseDuringStateTransitions(CoordinatorStateTransitionController* controller,
-                                std::vector<CoordinatorStateEnum> states)
-        : _controller{controller}, _states{std::move(states)} {
-        for (auto state : _states) {
-            _controller->setPauseDuringTransition(state);
-        }
-    }
-
-    ~PauseDuringStateTransitions() {
-        for (auto state : _states) {
-            _controller->unsetPauseDuringTransition(state);
-        }
-    }
-
-    PauseDuringStateTransitions(const PauseDuringStateTransitions&) = delete;
-    PauseDuringStateTransitions& operator=(const PauseDuringStateTransitions&) = delete;
-
-    PauseDuringStateTransitions(PauseDuringStateTransitions&&) = delete;
-    PauseDuringStateTransitions& operator=(PauseDuringStateTransitions&&) = delete;
-
-    void wait(CoordinatorStateEnum state) {
-        _controller->waitUntilStateIsReached(state);
-    }
-
-    void unset(CoordinatorStateEnum state) {
-        _controller->unsetPauseDuringTransition(state);
-    }
-
-private:
-    CoordinatorStateTransitionController* const _controller;
-    const std::vector<CoordinatorStateEnum> _states;
-};
-
-class OpObserverForTest : public OpObserverNoop {
-public:
-    OpObserverForTest(std::shared_ptr<CoordinatorStateTransitionController> controller)
-        : _controller{std::move(controller)} {}
-
-    void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) override {
-        if (args.nss != NamespaceString::kConfigReshardingOperationsNamespace) {
-            return;
-        }
-        auto doc =
-            ReshardingCoordinatorDocument::parse({"OpObserverForTest"}, args.updateArgs.updatedDoc);
-
-        _controller->notifyNewStateAndWaitUntilUnpaused(opCtx, doc.getState());
-    }
-
-private:
-    std::shared_ptr<CoordinatorStateTransitionController> _controller;
 };

 class ExternalStateForTest : public ReshardingCoordinatorExternalState {
@@ -184,7 +102,8 @@ class ExternalStateForTest : public ReshardingCoordinatorExternalState {
                 version.incMinor();
             }
         }
-        return ParticipantShardsAndChunks({{}, {}, initialChunks});
+        return ParticipantShardsAndChunks(
+            {coordinatorDoc.getDonorShards(), coordinatorDoc.getRecipientShards(), initialChunks});
     }

     void sendCommandToShards(OperationContext* opCtx,
@@ -247,7 +166,9 @@ public:
         invariant(_opObserverRegistry);
         _opObserverRegistry->addObserver(std::make_unique<OpObserverImpl>());
-        _opObserverRegistry->addObserver(std::make_unique<OpObserverForTest>(_controller));
+        _opObserverRegistry->addObserver(std::make_unique<ReshardingOpObserver>());
+        _opObserverRegistry->addObserver(
+            std::make_unique<CoordinatorOpObserverForTest>(_controller));

         _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
         auto service = makeService(getServiceContext());
@@ -283,8 +204,61 @@ public:
         return doc;
     }

-    void notifyDonorsReadyToDonate(ReshardingCoordinator& coordinator,
-                                   ReshardingCoordinatorDocument& coordDoc) {
+    std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordinator> getCoordinator(
+        OperationContext* opCtx, repl::PrimaryOnlyService::InstanceID instanceId) {
+        auto coordinator = getCoordinatorIfExists(opCtx, instanceId);
+        ASSERT_TRUE(bool(coordinator));
+        return coordinator;
+    }
+
+    std::shared_ptr<ReshardingCoordinatorService::ReshardingCoordinator> getCoordinatorIfExists(
+        OperationContext* opCtx, repl::PrimaryOnlyService::InstanceID instanceId) {
+        auto coordinatorOpt = ReshardingCoordinatorService::ReshardingCoordinator::lookup(
+            opCtx, _service, instanceId);
+        if (!coordinatorOpt) {
+            return nullptr;
+        }
+
+        auto coordinator = *coordinatorOpt;
+        return coordinator ? coordinator : nullptr;
+    }
+
+    ReshardingCoordinatorDocument getCoordinatorDoc(OperationContext* opCtx) {
+        DBDirectClient client(opCtx);
+
+        auto doc = client.findOne(NamespaceString::kConfigReshardingOperationsNamespace.ns(), {});
+        IDLParserErrorContext errCtx("reshardingCoordFromTest");
+        return ReshardingCoordinatorDocument::parse(errCtx, doc);
+    }
+
+    void replaceCoordinatorDoc(OperationContext* opCtx,
+                               const ReshardingCoordinatorDocument& newDoc) {
+        DBDirectClient client(opCtx);
+
+        const BSONObj query(BSON(ReshardingCoordinatorDocument::kReshardingUUIDFieldName
+                                 << newDoc.getReshardingUUID()));
+        client.update(
+            NamespaceString::kConfigReshardingOperationsNamespace.ns(), {}, newDoc.toBSON());
+    }
+
+    void waitUntilCommittedCoordinatorDocReach(OperationContext* opCtx,
+                                               CoordinatorStateEnum state) {
+        DBDirectClient client(opCtx);
+
+        while (true) {
+            auto coordinatorDoc = getCoordinatorDoc(opCtx);
+
+            if (coordinatorDoc.getState() == state) {
+                break;
+            }
+
+            sleepmillis(50);
+        }
+    }
+
+    void makeDonorsReadyToDonate(OperationContext* opCtx) {
+        auto coordDoc = getCoordinatorDoc(opCtx);
+
         auto donorShards = coordDoc.getDonorShards();
         DonorShardContext donorCtx;
         donorCtx.setState(DonorStateEnum::kDonatingInitialData);
@@ -294,11 +268,12 @@ public:
         }
         coordDoc.setDonorShards(donorShards);

-        coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+        replaceCoordinatorDoc(opCtx, coordDoc);
     }

-    void notifyRecipientsFinishedCloning(ReshardingCoordinator& coordinator,
-                                         ReshardingCoordinatorDocument& coordDoc) {
+    void makeRecipientsFinishedCloning(OperationContext* opCtx) {
+        auto coordDoc = getCoordinatorDoc(opCtx);
+
         auto shards = coordDoc.getRecipientShards();
         RecipientShardContext ctx;
         ctx.setState(RecipientStateEnum::kApplying);
@@ -307,11 +282,12 @@ public:
         }
         coordDoc.setRecipientShards(shards);

-        coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+        replaceCoordinatorDoc(opCtx, coordDoc);
     }

-    void notifyRecipientsInStrictConsistency(ReshardingCoordinator& coordinator,
-                                             ReshardingCoordinatorDocument& coordDoc) {
+    void makeRecipientsBeInStrictConsistency(OperationContext* opCtx) {
+        auto coordDoc = getCoordinatorDoc(opCtx);
+
         auto shards = coordDoc.getRecipientShards();
         RecipientShardContext ctx;
         ctx.setState(RecipientStateEnum::kStrictConsistency);
@@ -320,11 +296,11 @@ public:
         }
         coordDoc.setRecipientShards(shards);

-        coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+        replaceCoordinatorDoc(opCtx, coordDoc);
     }

-    void notifyDonorsDone(ReshardingCoordinator& coordinator,
-                          ReshardingCoordinatorDocument& coordDoc) {
+    void makeDonorsProceedToDone(OperationContext* opCtx) {
+        auto coordDoc = getCoordinatorDoc(opCtx);
         auto donorShards = coordDoc.getDonorShards();
         DonorShardContext donorCtx;
         donorCtx.setState(DonorStateEnum::kDone);
@@ -332,13 +308,12 @@ public:
             shard.setMutableState(donorCtx);
         }
         coordDoc.setDonorShards(donorShards);
-        coordDoc.setState(CoordinatorStateEnum::kCommitting);
-        coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+        replaceCoordinatorDoc(opCtx, coordDoc);
     }

-    void notifyRecipientsDone(ReshardingCoordinator& coordinator,
-                              ReshardingCoordinatorDocument& coordDoc) {
+    void makeRecipientsProceedToDone(OperationContext* opCtx) {
+        auto coordDoc = getCoordinatorDoc(opCtx);
         auto shards = coordDoc.getRecipientShards();
         RecipientShardContext ctx;
         ctx.setState(RecipientStateEnum::kDone);
@@ -346,9 +321,8 @@ public:
             shard.setMutableState(ctx);
         }
         coordDoc.setRecipientShards(shards);
-        coordDoc.setState(CoordinatorStateEnum::kCommitting);
-        coordinator.getObserver()->onReshardingParticipantTransition(coordDoc);
+        replaceCoordinatorDoc(opCtx, coordDoc);
     }

     CollectionType makeOriginalCollectionCatalogEntry(
@@ -464,6 +438,51 @@ public:
         return donorChunk;
     }

+    void stepUp(OperationContext* opCtx) {
+        auto replCoord = repl::ReplicationCoordinator::get(getServiceContext());
+        auto currOpTime = replCoord->getMyLastAppliedOpTime();
+
+        // Advance the term and last applied opTime. We retain the timestamp component of the
+        // current last applied opTime to avoid log messages from
+        // ReplClientInfo::setLastOpToSystemLastOpTime() about the opTime having moved backwards.
+        ++_term;
+        auto newOpTime = repl::OpTime{currOpTime.getTimestamp(), _term};
+
+        ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+        ASSERT_OK(replCoord->updateTerm(opCtx, _term));
+        replCoord->setMyLastAppliedOpTimeAndWallTime({newOpTime, {}});
+
+        _registry->onStepUpComplete(opCtx, _term);
+    }
+
+    void stepDown(OperationContext* opCtx) {
+        ASSERT_OK(repl::ReplicationCoordinator::get(getServiceContext())
+                      ->setFollowerMode(repl::MemberState::RS_SECONDARY));
+        _registry->onStepDown();
+
+        // Some opCtx can be created via AlternativeClientRegion and not tied to any resharding
+        // cancellation token, so we also need to simulate the repl step down killOp thread.
+
+        auto serviceCtx = opCtx->getServiceContext();
+        for (ServiceContext::LockedClientsCursor cursor(serviceCtx);
+             Client* client = cursor.next();) {
+            stdx::lock_guard<Client> lk(*client);
+            if (client->isFromSystemConnection() && !client->canKillSystemOperationInStepdown(lk)) {
+                continue;
+            }
+
+            OperationContext* toKill = client->getOperationContext();
+
+            if (toKill && !toKill->isKillPending() && toKill->getOpID() != opCtx->getOpID()) {
+                auto locker = toKill->lockState();
+                if (toKill->shouldAlwaysInterruptAtStepDownOrUp() ||
+                    locker->wasGlobalLockTakenInModeConflictingWithWrites()) {
+                    serviceCtx->killOperation(lk, toKill);
+                }
+            }
+        }
+    }
+
     repl::PrimaryOnlyService* _service = nullptr;
     std::shared_ptr<CoordinatorStateTransitionController> _controller;
@@ -497,6 +516,8 @@ public:
         "reshardingMinimumOperationDurationMillis", 0};

     const std::vector<ShardId> _recipientShards = {{"shard0001"}};
+
+    long long _term = 0;
 };

 TEST_F(ReshardingCoordinatorServiceTest, ReshardingCoordinatorSuccessfullyTransitionsTokDone) {
@@ -524,31 +545,144 @@ TEST_F(ReshardingCoordinatorServiceTest, ReshardingCoordinatorSuccessfullyTransi
     doc.setPresetReshardedChunks(presetReshardedChunks);

     auto coordinator = ReshardingCoordinator::getOrCreate(opCtx, _service, doc.toBSON());
+
     stateTransitionsGuard.wait(CoordinatorStateEnum::kPreparingToDonate);
     stateTransitionsGuard.unset(CoordinatorStateEnum::kPreparingToDonate);

-    notifyDonorsReadyToDonate(*coordinator, doc);
+    waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kPreparingToDonate);
+    makeDonorsReadyToDonate(opCtx);

     stateTransitionsGuard.wait(CoordinatorStateEnum::kCloning);
     stateTransitionsGuard.unset(CoordinatorStateEnum::kCloning);
+    waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kCloning);

-    notifyRecipientsFinishedCloning(*coordinator, doc);
+    makeRecipientsFinishedCloning(opCtx);

     stateTransitionsGuard.wait(CoordinatorStateEnum::kApplying);
     stateTransitionsGuard.unset(CoordinatorStateEnum::kApplying);
+    waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kApplying);

     coordinator->onOkayToEnterCritical();
     stateTransitionsGuard.wait(CoordinatorStateEnum::kBlockingWrites);
     stateTransitionsGuard.unset(CoordinatorStateEnum::kBlockingWrites);
+    waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kBlockingWrites);

-    notifyRecipientsInStrictConsistency(*coordinator, doc);
+    makeRecipientsBeInStrictConsistency(opCtx);

     stateTransitionsGuard.wait(CoordinatorStateEnum::kCommitting);
     stateTransitionsGuard.unset(CoordinatorStateEnum::kCommitting);

-    notifyDonorsDone(*coordinator, doc);
-    notifyRecipientsDone(*coordinator, doc);
+    waitUntilCommittedCoordinatorDocReach(opCtx, CoordinatorStateEnum::kCommitting);
+
+    makeDonorsProceedToDone(opCtx);
+    makeRecipientsProceedToDone(opCtx);

     coordinator->getCompletionFuture().get(opCtx);
 }

+/**
+ * Test stepping down right when coordinator doc is being updated. Causing the change to be
+ * rolled back and redo the work again on step up.
+ */
+TEST_F(ReshardingCoordinatorServiceTest, StepDownStepUpEachTransition) {
+    const std::vector<CoordinatorStateEnum> coordinatorStates{
+        // Skip kInitializing, as we don't write that state transition to storage.
+        CoordinatorStateEnum::kPreparingToDonate,
+        CoordinatorStateEnum::kCloning,
+        CoordinatorStateEnum::kApplying,
+        CoordinatorStateEnum::kBlockingWrites,
+        CoordinatorStateEnum::kCommitting,
+        CoordinatorStateEnum::kDone};
+    PauseDuringStateTransitions stateTransitionsGuard{controller(), coordinatorStates};
+
+    auto doc = insertStateAndCatalogEntries(CoordinatorStateEnum::kUnused, _originalEpoch);
+    auto opCtx = operationContext();
+    auto donorChunk = makeAndInsertChunksForDonorShard(
+        _originalUUID, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()});
+
+    auto initialChunks =
+        makeChunks(_reshardingUUID, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()});
+
+    std::vector<ReshardedChunk> presetReshardedChunks;
+    for (const auto& chunk : initialChunks) {
+        presetReshardedChunks.emplace_back(chunk.getShard(), chunk.getMin(), chunk.getMax());
+    }
+
+    doc.setPresetReshardedChunks(presetReshardedChunks);
+
+    (void)ReshardingCoordinator::getOrCreate(opCtx, _service, doc.toBSON());
+    auto instanceId =
+        BSON(ReshardingCoordinatorDocument::kReshardingUUIDFieldName << doc.getReshardingUUID());
+
+    for (const auto state : coordinatorStates) {
+        auto coordinator = getCoordinator(opCtx, instanceId);
+
+        LOGV2(5093701,
+              "Running step down test case",
+              "stepDownAfter"_attr = (CoordinatorState_serializer(state).toString()));
+
+        switch (state) {
+            case CoordinatorStateEnum::kCloning: {
+                makeDonorsReadyToDonate(opCtx);
+                break;
+            }
+
+            case CoordinatorStateEnum::kApplying: {
+                makeRecipientsFinishedCloning(opCtx);
+                break;
+            }
+
+            case CoordinatorStateEnum::kBlockingWrites: {
+                // Pretend that the recipients are already ready to commit.
+                coordinator->onOkayToEnterCritical();
+                break;
+            }
+
+            case CoordinatorStateEnum::kCommitting: {
+                makeRecipientsBeInStrictConsistency(opCtx);
+                break;
+            }
+
+            case CoordinatorStateEnum::kDone: {
+                makeDonorsProceedToDone(opCtx);
+                makeRecipientsProceedToDone(opCtx);
+                break;
+            }
+
+            default:
+                break;
+        }
+
+        if (state != CoordinatorStateEnum::kDone) {
+            // 'done' state is never written to storage so don't wait for it
+            stateTransitionsGuard.wait(state);
+        }
+
+        stepDown(opCtx);
+
+        ASSERT_EQ(coordinator->getCompletionFuture().getNoThrow(),
+                  ErrorCodes::InterruptedDueToReplStateChange);
+
+        coordinator.reset();
+        stepUp(opCtx);
+
+        stateTransitionsGuard.unset(state);
+
+        if (state == CoordinatorStateEnum::kBlockingWrites) {
+            // We have to fake this again as the effects are not persistent.
+            coordinator = getCoordinator(opCtx, instanceId);
+            coordinator->onOkayToEnterCritical();
+        }
+
+        if (state != CoordinatorStateEnum::kDone) {
+            // 'done' state is never written to storage so don't wait for it.
+            waitUntilCommittedCoordinatorDocReach(opCtx, state);
+        }
+    }
+
+    // Join the coordinator if it has not yet been cleaned up.
+    if (auto coordinator = getCoordinatorIfExists(opCtx, instanceId)) {
+        coordinator->getCompletionFuture().get(opCtx);
+    }
+}
+
 }  // namespace
 }  // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
index e512fde17f3..01b800167e6 100644
--- a/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
+++ b/src/mongo/db/s/resharding/resharding_service_test_helpers.cpp
@@ -142,12 +142,15 @@ void OpObserverForTest<StateEnum, ReshardingDocument>::onUpdate(OperationContext

 template class StateTransitionController<DonorStateEnum>;
 template class StateTransitionController<RecipientStateEnum>;
+template class StateTransitionController<CoordinatorStateEnum>;

 template class PauseDuringStateTransitions<DonorStateEnum>;
 template class PauseDuringStateTransitions<RecipientStateEnum>;
+template class PauseDuringStateTransitions<CoordinatorStateEnum>;

 template class OpObserverForTest<DonorStateEnum, ReshardingDonorDocument>;
 template class OpObserverForTest<RecipientStateEnum, ReshardingRecipientDocument>;
+template class OpObserverForTest<CoordinatorStateEnum, ReshardingCoordinatorDocument>;

 }  // namespace resharding_service_test_helpers
 }  // namespace mongo
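For context on the test-side consolidation: the coordinator test now reuses the shared `StateTransitionController`, `PauseDuringStateTransitions`, and `OpObserverForTest` templates from `resharding_service_test_helpers` instead of keeping its own copies. The stand-alone sketch below models the underlying pause/wait handshake with standard C++ primitives (std::mutex and std::condition_variable, an int in place of CoordinatorStateEnum); it illustrates the pattern and is not the helper code itself.

    // Illustration only: the pause-during-state-transition handshake used by the shared
    // resharding test helpers, reduced to standard C++. The op observer publishes each new
    // state and parks there while the test holds a pause on that state.
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <set>
    #include <thread>

    class StateTransitionController {
    public:
        // Test thread: block until the service under test has reached `state`.
        void waitUntilStateIsReached(int state) {
            std::unique_lock lk(_mutex);
            _reached.wait(lk, [&] { return _state == state; });
        }
        // Test thread: ask the observer side to park the service when it hits `state`.
        void setPauseDuringTransition(int state) {
            std::lock_guard lk(_mutex);
            _paused.insert(state);
        }
        void unsetPauseDuringTransition(int state) {
            std::lock_guard lk(_mutex);
            _paused.erase(state);
            _unpaused.notify_all();
        }
        // Observer side: publish the new state, then wait while that state is paused.
        void notifyNewStateAndWaitUntilUnpaused(int newState) {
            std::unique_lock lk(_mutex);
            _state = newState;
            _reached.notify_all();
            _unpaused.wait(lk, [&] { return _paused.count(newState) == 0; });
        }

    private:
        std::mutex _mutex;
        std::condition_variable _reached;
        std::condition_variable _unpaused;
        std::set<int> _paused;
        int _state = 0;
    };

    int main() {
        StateTransitionController controller;
        controller.setPauseDuringTransition(2);

        // Stand-in for the coordinator writing state transitions seen by the op observer.
        std::thread service([&] {
            for (int state : {1, 2, 3}) {
                controller.notifyNewStateAndWaitUntilUnpaused(state);
            }
        });

        controller.waitUntilStateIsReached(2);  // service is now parked at state 2
        std::cout << "service paused at state 2; the test can step down or inspect here\n";
        controller.unsetPauseDuringTransition(2);
        service.join();
        return 0;
    }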