diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2021-07-12 17:58:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-08-19 19:18:21 +0000 |
commit | 98937ba21a64a127d6238d641fb676bdef797cf4 (patch) | |
tree | ff5df17a4f2006a28eeb5b9f42d95d5e467555ac | |
parent | 8c0f53e5b73298c520cf09f2f3d0dc7853d58011 (diff) | |
download | mongo-98937ba21a64a127d6238d641fb676bdef797cf4.tar.gz |
SERVER-58081 Make _flushReshardingStateChange return instead of blocking if the critical section is held
(cherry picked from commit 2ca1f733d619809d1e712860fc0070f0cc8d81f5)
4 files changed, 161 insertions, 58 deletions
diff --git a/src/mongo/db/s/flush_resharding_state_change_command.cpp b/src/mongo/db/s/flush_resharding_state_change_command.cpp index a159c530de0..b97dd33b5ea 100644 --- a/src/mongo/db/s/flush_resharding_state_change_command.cpp +++ b/src/mongo/db/s/flush_resharding_state_change_command.cpp @@ -35,19 +35,45 @@ #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/privilege.h" +#include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/op_observer.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_state.h" #include "mongo/logv2/log.h" #include "mongo/s/catalog_cache_loader.h" +#include "mongo/s/grid.h" #include "mongo/s/request_types/flush_resharding_state_change_gen.h" namespace mongo { namespace { +void doNoopWrite(OperationContext* opCtx, const NamespaceString& nss) { + writeConflictRetry( + opCtx, "_flushReshardingStateChange no-op", NamespaceString::kRsOplogNamespace.ns(), [&] { + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); + + const std::string msg = str::stream() + << "no-op for _flushReshardingStateChange on " << nss; + WriteUnitOfWork wuow(opCtx); + opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( + opCtx, + {}, + boost::none, + BSON("msg" << msg), + boost::none, + boost::none, + boost::none, + boost::none, + boost::none); + wuow.commit(); + }); +} + class FlushReshardingStateChangeCmd final : public TypedCommand<FlushReshardingStateChangeCmd> { public: using Request = _flushReshardingStateChange; @@ -101,11 +127,27 @@ public: "Can't call _flushReshardingStateChange if in read-only mode", !storageGlobalParams.readOnly); - onShardVersionMismatch(opCtx, ns(), boost::none /* shardVersionReceived */); - - CatalogCacheLoader::get(opCtx).waitForCollectionFlush(opCtx, ns()); - - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + ExecutorFuture<void>(Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor()) + .then([svcCtx = opCtx->getServiceContext(), nss = ns()] { + ThreadClient tc("FlushReshardingStateChange", svcCtx); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillableByStepdown(lk); + } + + auto opCtx = tc->makeOperationContext(); + onShardVersionMismatch( + opCtx.get(), nss, boost::none /* shardVersionReceived */); + }) + .onError([](const Status& status) { + LOGV2_WARNING(5808100, + "Error on deferred _flushReshardingStateChange execution", + "error"_attr = redact(status)); + }) + .getAsync([](auto) {}); + + // Ensure the command isn't run on a stale primary. + doNoopWrite(opCtx, ns()); } }; } _flushReshardingStateChange; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index acfa9b205cc..3ddaae7ff63 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -58,6 +58,7 @@ #include "mongo/s/request_types/abort_reshard_collection_gen.h" #include "mongo/s/request_types/commit_reshard_collection_gen.h" #include "mongo/s/request_types/flush_resharding_state_change_gen.h" +#include "mongo/s/request_types/flush_routing_table_cache_updates_gen.h" #include "mongo/s/shard_id.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/fail_point.h" @@ -593,6 +594,13 @@ void executeMetadataChangesInTxn( }); } +BSONObj makeFlushRoutingTableCacheUpdatesCmd(const NamespaceString& nss) { + auto cmd = _flushRoutingTableCacheUpdatesWithWriteConcern(nss); + cmd.setSyncFromConfig(true); + cmd.setDbName(nss.db()); + return CommandHelpers::appendMajorityWriteConcern(cmd.toBSON({})).getOwned(); +} + } // namespace namespace resharding { @@ -1071,9 +1079,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsReshard }) .then([this, executor] { pauseBeforeTellDonorToRefresh.pauseWhileSet(); - _tellAllDonorsToRefresh(executor); + _establishAllDonorsAsParticipants(executor); }) - .then([this, executor] { _tellAllRecipientsToRefresh(executor); }) + .then([this, executor] { _establishAllRecipientsAsParticipants(executor); }) .onCompletion([this](Status status) { // Swap back to using operation contexts canceled upon abort until ready to // persist the decision or unrecoverable error. @@ -1242,7 +1250,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_commitAndFinishReshardOper return _commit(updatedCoordinatorDoc); }) .then([this, executor] { - _tellAllParticipantsToRefresh(_coordinatorDoc.getSourceNss(), executor); + _tellAllParticipantsToCommit(_coordinatorDoc.getSourceNss(), executor); }) .then([this] { _updateChunkImbalanceMetrics(_coordinatorDoc.getSourceNss()); }) .then([this, executor] { return _awaitAllParticipantShardsDone(executor); }) @@ -1278,6 +1286,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_commitAndFinishReshardOper "error"_attr = redact(status)); }); } + SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& stepdownToken) noexcept { @@ -1740,11 +1749,66 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: installCoordinatorDoc(opCtx.get(), updatedCoordinatorDoc); } -void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { +void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllParticipants( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto recipientIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); + auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); + auto recipientShardIds = + extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); + std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()}; + participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end()); + _reshardingCoordinatorExternalState->sendCommandToShards( + opCtx.get(), + NamespaceString::kAdminDb, + command, + {participantShardIds.begin(), participantShardIds.end()}, + **executor); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllRecipients( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) { + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto recipientShardIds = + extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); + + _reshardingCoordinatorExternalState->sendCommandToShards( + opCtx.get(), + NamespaceString::kAdminDb, + command, + {recipientShardIds.begin(), recipientShardIds.end()}, + **executor); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::_sendCommandToAllDonors( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command) { + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); + + _reshardingCoordinatorExternalState->sendCommandToShards( + opCtx.get(), + NamespaceString::kAdminDb, + command, + {donorShardIds.begin(), donorShardIds.end()}, + **executor); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::_establishAllDonorsAsParticipants( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + invariant(_coordinatorDoc.getState() == CoordinatorStateEnum::kPreparingToDonate); + auto flushCmd = makeFlushRoutingTableCacheUpdatesCmd(_coordinatorDoc.getSourceNss()); + _sendCommandToAllDonors(executor, flushCmd); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::_establishAllRecipientsAsParticipants( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { + invariant(_coordinatorDoc.getState() == CoordinatorStateEnum::kPreparingToDonate); + auto flushCmd = makeFlushRoutingTableCacheUpdatesCmd(_coordinatorDoc.getTempReshardingNss()); + _sendCommandToAllRecipients(executor, flushCmd); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { NamespaceString nssToRefresh; // Refresh the temporary namespace if the coordinator is in a state prior to 'kCommitting'. // A refresh of recipients while in 'kCommitting' should be accompanied by a refresh of @@ -1757,68 +1821,30 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe auto refreshCmd = createFlushReshardingStateChangeCommand(nssToRefresh, _coordinatorDoc.getReshardingUUID()); - - _reshardingCoordinatorExternalState->sendCommandToShards( - opCtx.get(), - NamespaceString::kAdminDb, - refreshCmd, - {recipientIds.begin(), recipientIds.end()}, - **executor); + _sendCommandToAllRecipients(executor, refreshCmd); } void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto donorIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); - auto refreshCmd = createFlushReshardingStateChangeCommand(_coordinatorDoc.getSourceNss(), _coordinatorDoc.getReshardingUUID()); - _reshardingCoordinatorExternalState->sendCommandToShards(opCtx.get(), - NamespaceString::kAdminDb, - refreshCmd, - {donorIds.begin(), donorIds.end()}, - **executor); + _sendCommandToAllDonors(executor, refreshCmd); } -void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh( +void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToCommit( const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - - auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); - auto recipientShardIds = - extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); - std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()}; - participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end()); - auto commitCmd = createShardsvrCommitReshardCollectionCmd(nss, _coordinatorDoc.getReshardingUUID()); - _reshardingCoordinatorExternalState->sendCommandToShards( - opCtx.get(), - NamespaceString::kAdminDb, - commitCmd, - {participantShardIds.begin(), participantShardIds.end()}, - **executor); + _sendCommandToAllParticipants(executor, commitCmd); } void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToAbort( const std::shared_ptr<executor::ScopedTaskExecutor>& executor, bool isUserAborted) { - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - - auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); - auto recipientShardIds = - extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); - std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()}; - participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end()); - ShardsvrAbortReshardCollection abortCmd(_coordinatorDoc.getReshardingUUID(), isUserAborted); abortCmd.setDbName("admin"); - - sharding_util::sendCommandToShards(opCtx.get(), - NamespaceString::kAdminDb, - abortCmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField - << WriteConcernOptions::Majority)), - {participantShardIds.begin(), participantShardIds.end()}, - **executor); + _sendCommandToAllParticipants(executor, + abortCmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField + << WriteConcernOptions::Majority))); } void ReshardingCoordinatorService::ReshardingCoordinator::_updateChunkImbalanceMetrics( diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index ec479666469..ec34d0c2a88 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -411,6 +411,30 @@ private: boost::optional<Status> abortReason = boost::none); /** + * Sends the command to the specified participants asynchronously. + */ + void _sendCommandToAllParticipants( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor, const BSONObj& command); + void _sendCommandToAllDonors(const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const BSONObj& command); + void _sendCommandToAllRecipients(const std::shared_ptr<executor::ScopedTaskExecutor>& executor, + const BSONObj& command); + + /** + * Sends '_flushRoutingTableCacheUpdatesWithWriteConcern' to ensure donor state machine creation + * by the time the refresh completes. + */ + void _establishAllDonorsAsParticipants( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Sends '_flushRoutingTableCacheUpdatesWithWriteConcern' to ensure recipient state machine + * creation by the time the refresh completes. + */ + void _establishAllRecipientsAsParticipants( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** * Sends '_flushReshardingStateChange' to all recipient shards. * * When the coordinator is in a state before 'kCommitting', refreshes the temporary @@ -425,9 +449,9 @@ private: void _tellAllDonorsToRefresh(const std::shared_ptr<executor::ScopedTaskExecutor>& executor); /** - * Sends '_flushReshardingStateChange' for the original namespace to all participant shards. + * Sends '_shardsvrCommitReshardCollection' to all participant shards. */ - void _tellAllParticipantsToRefresh( + void _tellAllParticipantsToCommit( const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor); /** diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 8e1a936a524..f7342dec5cd 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -344,6 +344,17 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_finishReshardin _dropOriginalCollectionThenTransitionToDone(); } else if (_donorCtx.getState() != DonorStateEnum::kDone) { + { + // Unblock the RecoverRefreshThread as quickly as possible when aborting. + stdx::lock_guard<Latch> lk(_mutex); + ensureFulfilledPromise(lk, + _critSecWasAcquired, + {ErrorCodes::ReshardCollectionAborted, "aborted"}); + ensureFulfilledPromise(lk, + _critSecWasPromoted, + {ErrorCodes::ReshardCollectionAborted, "aborted"}); + } + // If aborted, the donor must be allowed to transition to done from any state. _transitionState(DonorStateEnum::kDone); } |