diff options
Diffstat (limited to 'src/mongo')
10 files changed, 148 insertions, 29 deletions
diff --git a/src/mongo/db/s/config/configsvr_coordinator.cpp b/src/mongo/db/s/config/configsvr_coordinator.cpp index af9e21db355..0160c830e8b 100644 --- a/src/mongo/db/s/config/configsvr_coordinator.cpp +++ b/src/mongo/db/s/config/configsvr_coordinator.cpp @@ -72,6 +72,17 @@ void ConfigsvrCoordinator::_removeStateDocument(OperationContext* opCtx) { WriteConcerns::kMajorityWriteConcernNoTimeout); } +OperationSessionInfo ConfigsvrCoordinator::_getCurrentSession() const { + const auto& coordinatorMetadata = metadata(); + invariant(coordinatorMetadata.getSession()); + ConfigsvrCoordinatorSession coordinatorSession = *coordinatorMetadata.getSession(); + + OperationSessionInfo osi; + osi.setSessionId(coordinatorSession.getLsid()); + osi.setTxnNumber(coordinatorSession.getTxnNumber()); + return osi; +} + void ConfigsvrCoordinator::interrupt(Status status) noexcept { LOGV2_DEBUG(6347303, 1, @@ -113,6 +124,15 @@ SemiFuture<void> ConfigsvrCoordinator::run(std::shared_ptr<executor::ScopedTaskE auto opCtxHolder = cc().makeOperationContext(); auto* opCtx = opCtxHolder.get(); _removeStateDocument(opCtx); + + const auto session = metadata().getSession(); + if (session && status.isOK()) { + // Return lsid to the InternalSessionPool. If status is not OK, let the session + // be discarded. + InternalSessionPool::get(opCtx)->release( + {session->getLsid(), session->getTxnNumber()}); + } + } catch (DBException& ex) { LOGV2_WARNING(6347302, "Failed to remove ConfigsvrCoordinator state document", diff --git a/src/mongo/db/s/config/configsvr_coordinator.h b/src/mongo/db/s/config/configsvr_coordinator.h index 426223e8dae..6c7facecb9a 100644 --- a/src/mongo/db/s/config/configsvr_coordinator.h +++ b/src/mongo/db/s/config/configsvr_coordinator.h @@ -65,10 +65,47 @@ protected: virtual ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept = 0; + virtual const ConfigsvrCoordinatorMetadata& metadata() const = 0; + void interrupt(Status status) noexcept override final; void _removeStateDocument(OperationContext* opCtx); + template <typename StateDoc> + void _updateStateDocument(OperationContext* opCtx, const StateDoc& newDoc) { + PersistentTaskStore<StateDoc> store(NamespaceString::kConfigsvrCoordinatorsNamespace); + store.update(opCtx, + BSON(StateDoc::kIdFieldName << newDoc.getId().toBSON()), + newDoc.toBSON(), + WriteConcerns::kMajorityWriteConcernShardingTimeout); + } + + template <typename StateDoc> + StateDoc _updateSession(OperationContext* opCtx, const StateDoc& doc) { + const auto newCoordinatorMetadata = [&] { + ConfigsvrCoordinatorMetadata newMetadata = doc.getConfigsvrCoordinatorMetadata(); + + const auto optPrevSession = doc.getSession(); + if (optPrevSession) { + newMetadata.setSession(ConfigsvrCoordinatorSession( + optPrevSession->getLsid(), optPrevSession->getTxnNumber() + 1)); + } else { + const auto newSession = InternalSessionPool::get(opCtx)->acquireSystemSession(); + newMetadata.setSession(ConfigsvrCoordinatorSession(newSession.getSessionId(), + newSession.getTxnNumber())); + } + + return newMetadata; + }(); + + StateDoc newDoc(doc); + newDoc.setConfigsvrCoordinatorMetadata(std::move(newCoordinatorMetadata)); + _updateStateDocument(opCtx, newDoc); + return newDoc; + } + + OperationSessionInfo _getCurrentSession() const; + Mutex _mutex = MONGO_MAKE_LATCH("ConfigsvrCoordinator::_mutex"); SharedPromise<void> _completionPromise; }; diff --git a/src/mongo/db/s/config/configsvr_coordinator.idl b/src/mongo/db/s/config/configsvr_coordinator.idl index dccf776b84d..b97e29fe6a4 100644 --- a/src/mongo/db/s/config/configsvr_coordinator.idl +++ b/src/mongo/db/s/config/configsvr_coordinator.idl @@ -52,6 +52,16 @@ structs: description: "Type of the ConfigsvrCoordinator" type: ConfigsvrCoordinatorType + ConfigsvrCoordinatorSession: + description: "Container for configsvr coordinator session used for guaranteeing remote + command idempotency." + strict: false + fields: + lsid: + type: LogicalSessionId + txnNumber: + type: TxnNumber + ConfigsvrCoordinatorMetadata: description: "Common metadata for all ConfigsvrCoordinators" generate_comparison_operators: false @@ -60,6 +70,6 @@ structs: _id: cpp_name: id type: ConfigsvrCoordinatorId - recoveredFromDisk: - type: bool - default: false + session: + type: ConfigsvrCoordinatorSession + optional: true diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp index 1a1a95b253e..209ebf8a2f3 100644 --- a/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp +++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator.cpp @@ -87,6 +87,10 @@ void SetClusterParameterCoordinator::_enterPhase(Phase newPhase) { _doc = std::move(newDoc); } +const ConfigsvrCoordinatorMetadata& SetClusterParameterCoordinator::metadata() const { + return _doc.getConfigsvrCoordinatorMetadata(); +} + ExecutorFuture<void> SetClusterParameterCoordinator::_runImpl( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept { diff --git a/src/mongo/db/s/config/set_cluster_parameter_coordinator.h b/src/mongo/db/s/config/set_cluster_parameter_coordinator.h index 95aa2823608..ec0f0fead04 100644 --- a/src/mongo/db/s/config/set_cluster_parameter_coordinator.h +++ b/src/mongo/db/s/config/set_cluster_parameter_coordinator.h @@ -57,6 +57,8 @@ private: ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept override; + const ConfigsvrCoordinatorMetadata& metadata() const override; + template <typename Func> auto _executePhase(const Phase& newPhase, Func&& func) { return [=] { diff --git a/src/mongo/db/s/config/set_user_write_block_mode_coordinator.cpp b/src/mongo/db/s/config/set_user_write_block_mode_coordinator.cpp index 28ff0648424..e719bdecc40 100644 --- a/src/mongo/db/s/config/set_user_write_block_mode_coordinator.cpp +++ b/src/mongo/db/s/config/set_user_write_block_mode_coordinator.cpp @@ -62,18 +62,19 @@ ShardsvrSetUserWriteBlockMode makeShardsvrSetUserWriteBlockModeCommand( void sendSetUserWriteBlockModeCmdToAllShards(OperationContext* opCtx, std::shared_ptr<executor::TaskExecutor> executor, bool block, - ShardsvrSetUserWriteBlockModePhaseEnum phase) { + ShardsvrSetUserWriteBlockModePhaseEnum phase, + const OperationSessionInfo& osi) { const auto allShards = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx); const auto shardsvrSetUserWriteBlockModeCmd = makeShardsvrSetUserWriteBlockModeCommand(block, phase); - sharding_util::sendCommandToShards( - opCtx, - shardsvrSetUserWriteBlockModeCmd.getDbName(), - CommandHelpers::appendMajorityWriteConcern(shardsvrSetUserWriteBlockModeCmd.toBSON({})), - allShards, - executor); + sharding_util::sendCommandToShards(opCtx, + shardsvrSetUserWriteBlockModeCmd.getDbName(), + CommandHelpers::appendMajorityWriteConcern( + shardsvrSetUserWriteBlockModeCmd.toBSON(osi.toBSON())), + allShards, + executor); } } // namespace @@ -115,15 +116,16 @@ void SetUserWriteBlockModeCoordinator::_enterPhase(Phase newPhase) { if (_doc.getPhase() == Phase::kUnset) { store.add(opCtx.get(), newDoc, WriteConcerns::kMajorityWriteConcernShardingTimeout); } else { - store.update(opCtx.get(), - BSON(StateDoc::kIdFieldName << _coordId.toBSON()), - newDoc.toBSON(), - WriteConcerns::kMajorityWriteConcernNoTimeout); + _updateStateDocument(opCtx.get(), newDoc); } _doc = std::move(newDoc); } +const ConfigsvrCoordinatorMetadata& SetUserWriteBlockModeCoordinator::metadata() const { + return _doc.getConfigsvrCoordinatorMetadata(); +} + ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept { @@ -135,6 +137,11 @@ ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + // Get an incremented {lsid, txNnumber} pair that will be attached to the command + // sent to the shards to guarantee message replay protection. + _doc = _updateSession(opCtx, _doc); + const auto session = _getCurrentSession(); + // Ensure the topology is stable so we don't miss propagating the write blocking // state to any concurrently added shard. Keep it stable until we have persisted the // user write blocking state on the configsvr so that new shards that get added will @@ -147,7 +154,8 @@ ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl( opCtx, executor, _doc.getBlock(), - ShardsvrSetUserWriteBlockModePhaseEnum::kPrepare); + ShardsvrSetUserWriteBlockModePhaseEnum::kPrepare, + session); // Durably store the state on the configsvr. if (_doc.getBlock()) { @@ -177,6 +185,11 @@ ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl( auto* opCtx = opCtxHolder.get(); auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + // Get an incremented {lsid, txNnumber} pair that will be attached to the command sent + // to the shards to guarantee message replay protection. + _doc = _updateSession(opCtx, _doc); + const auto session = _getCurrentSession(); + // Ensure the topology is stable so we don't miss propagating the write blocking state // to any concurrently added shard. Keep it stable until we have persisted the user // write blocking state on the configsvr so that new shards that get added will e see @@ -189,7 +202,8 @@ ExecutorFuture<void> SetUserWriteBlockModeCoordinator::_runImpl( opCtx, executor, _doc.getBlock(), - ShardsvrSetUserWriteBlockModePhaseEnum::kComplete); + ShardsvrSetUserWriteBlockModePhaseEnum::kComplete, + session); // Durably store the state on the configsvr. if (_doc.getBlock()) { diff --git a/src/mongo/db/s/config/set_user_write_block_mode_coordinator.h b/src/mongo/db/s/config/set_user_write_block_mode_coordinator.h index f704ac4087b..6ab9d93304b 100644 --- a/src/mongo/db/s/config/set_user_write_block_mode_coordinator.h +++ b/src/mongo/db/s/config/set_user_write_block_mode_coordinator.h @@ -57,6 +57,8 @@ private: ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) noexcept override; + const ConfigsvrCoordinatorMetadata& metadata() const override; + template <typename Func> auto _executePhase(const Phase& newPhase, Func&& func) { return [=] { diff --git a/src/mongo/db/s/shardsvr_set_user_write_block_mode_command.cpp b/src/mongo/db/s/shardsvr_set_user_write_block_mode_command.cpp index ef4f067f483..5ecfdc73bd6 100644 --- a/src/mongo/db/s/shardsvr_set_user_write_block_mode_command.cpp +++ b/src/mongo/db/s/shardsvr_set_user_write_block_mode_command.cpp @@ -33,6 +33,8 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/sharding_ddl_coordinator.h" #include "mongo/db/s/sharding_ddl_coordinator_service.h" #include "mongo/db/s/user_writes_recoverable_critical_section_service.h" @@ -63,10 +65,25 @@ public: hangInShardsvrSetUserWriteBlockMode.pauseWhileSet(); - const auto startBlocking = request().getGlobal(); + _runImpl(opCtx, request()); + + // Since it is possible that no actual write happened with this txnNumber, we need to + // make a dummy write so that secondaries can be aware of this txn. + DBDirectClient client(opCtx); + client.update(NamespaceString::kServerConfigurationNamespace.ns(), + BSON("_id" + << "SetUseWriteBlockModeStats"), + BSON("$inc" << BSON("count" << 1)), + true /* upsert */, + false /* multi */); + } + + private: + void _runImpl(OperationContext* opCtx, const Request& request) { + const auto startBlocking = request.getGlobal(); if (startBlocking) { - switch (request().getPhase()) { + switch (request.getPhase()) { case ShardsvrSetUserWriteBlockModePhaseEnum::kPrepare: UserWritesRecoverableCriticalSectionService::get(opCtx) ->acquireRecoverableCriticalSectionBlockNewShardedDDL( @@ -77,8 +94,8 @@ public: // Wait for ongoing ShardingDDLCoordinators to finish. This ensures that all // coordinators that started before enabling blocking have finish, and that // any new coordinator that is started after this point will see the - // blocking is enabled. Wait only for coordinators that don't have - // the user-write-blocking bypass enabled -- the ones allowed to bypass user + // blocking is enabled. Wait only for coordinators that don't have the + // user-write-blocking bypass enabled -- the ones allowed to bypass user // write blocking don't care about the write blocking state. { const auto shouldWaitPred = @@ -113,7 +130,7 @@ public: MONGO_UNREACHABLE; } } else { - switch (request().getPhase()) { + switch (request.getPhase()) { case ShardsvrSetUserWriteBlockModePhaseEnum::kPrepare: UserWritesRecoverableCriticalSectionService::get(opCtx) ->demoteRecoverableCriticalSectionToNoLongerBlockUserWrites( @@ -134,7 +151,6 @@ public: } } - private: NamespaceString ns() const override { return NamespaceString(); } diff --git a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp index e54abafe898..5aec4a6521f 100644 --- a/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp +++ b/src/mongo/db/s/user_writes_recoverable_critical_section_service.cpp @@ -334,12 +334,25 @@ void UserWritesRecoverableCriticalSectionService::releaseRecoverableCriticalSect // Release the critical section by deleting the critical section document. The OpObserver // will release the in-memory CS when reacting to the delete event. - PersistentTaskStore<UserWriteBlockingCriticalSectionDocument> store( - NamespaceString::kUserWritesCriticalSectionsNamespace); - store.remove( - opCtx, - BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName << nss.toString()), - ShardingCatalogClient::kLocalWriteConcern); + DBDirectClient dbClient(opCtx); + const auto cmdResponse = dbClient.runCommand([&] { + write_ops::DeleteCommandRequest deleteOp( + NamespaceString::kUserWritesCriticalSectionsNamespace); + + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(BSON(UserWriteBlockingCriticalSectionDocument::kNssFieldName + << nss.toString())); + // At most one doc can possibly match the above query. + entry.setMulti(false); + return entry; + }()}); + + return deleteOp.serialize({}); + }()); + + const auto commandReply = cmdResponse->getCommandReply(); + uassertStatusOK(getStatusFromWriteCommandReply(commandReply)); } LOGV2_DEBUG( diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp index 39811a02295..5ae750ec2f5 100644 --- a/src/mongo/db/transaction_validation.cpp +++ b/src/mongo/db/transaction_validation.cpp @@ -60,7 +60,8 @@ const StringMap<int> retryableWriteCommands = {{"delete", 1}, {"_configsvrRenameCollectionMetadata", 1}, {"_shardsvrParticipantBlock", 1}, {"_configsvrCollMod", 1}, - {"_shardsvrCollModParticipant", 1}}; + {"_shardsvrCollModParticipant", 1}, + {"_shardsvrSetUserWriteBlockMode", 1}}; // Commands that can be sent with session info but should not check out a session. const StringMap<int> skipSessionCheckoutList = { |