diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2020-04-24 15:38:30 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-05-13 21:51:36 +0000 |
commit | c97f0868afcd8d8ce72a14f09283c827b33d44e9 (patch) | |
tree | e78eac02bfb310049f8c5061912897430218f65b | |
parent | 0765846b1a3a5035db49e29e61648921529402aa (diff) | |
download | mongo-c97f0868afcd8d8ce72a14f09283c827b33d44e9.tar.gz |
SERVER-47710 Make onConfirmedSet update config.shards serially
(cherry picked from commit 71a01bc43a26ec328657d7d9768ce11cad6f89a7)
-rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.cpp | 169 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 141 |
2 files changed, 238 insertions, 72 deletions
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 4c42d866007..989ccb56629 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -91,7 +91,9 @@ auto makeEgressHooksList(ServiceContext* service) { * Updates the config server field of the shardIdentity document with the given connection string if * setName is equal to the config server replica set name. */ -class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener { +class ShardingReplicaSetChangeListener final + : public ReplicaSetChangeNotifier::Listener, + public std::enable_shared_from_this<ShardingReplicaSetChangeListener> { public: ShardingReplicaSetChangeListener(ServiceContext* serviceContext) : _serviceContext(serviceContext) {} @@ -111,49 +113,24 @@ public: LOGV2(471692, "Unable to update the shard registry", "error"_attr = e); } - Grid::get(_serviceContext) - ->getExecutorPool() - ->getFixedExecutor() - ->schedule([serviceContext = _serviceContext, connStr](Status status) { - if (ErrorCodes::isCancelationError(status.code())) { - LOGV2_DEBUG(22067, - 2, - "Unable to schedule confirmed replica set update due to {error}", - "Unable to schedule confirmed replica set update", - "error"_attr = status); - return; - } - invariant(status); - - try { - LOGV2(22068, - "Updating config server with confirmed replica set {connectionString}", - "Updating config server with confirmed replica set", - "connectionString"_attr = connStr); - - if (MONGO_unlikely(failUpdateShardIdentityConfigString.shouldFail())) { - return; - } - - auto configsvrConnStr = Grid::get(serviceContext) - ->shardRegistry() - ->getConfigServerConnectionString(); - - // Only proceed if the notification is for the configsvr - if (configsvrConnStr.getSetName() != connStr.getSetName()) { - return; - } - - ThreadClient tc("updateShardIdentityConfigString", serviceContext); - auto opCtx = tc->makeOperationContext(); - - ShardingInitializationMongoD::updateShardIdentityConfigString(opCtx.get(), - connStr); - } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) { - LOGV2(22069, "Unable to update config server", "error"_attr = e); - } - }); + auto setName = connStr.getSetName(); + bool updateInProgress = false; + { + stdx::lock_guard lock(_mutex); + if (!_hasUpdateState(lock, setName)) { + _updateStates.emplace(setName, std::make_shared<ReplSetConfigUpdateState>()); + } + + auto updateState = _updateStates.at(setName); + updateState->nextUpdateToSend = connStr; + updateInProgress = updateState->updateInProgress; + } + + if (!updateInProgress) { + _scheduleUpdateShardIdentityConfigString(setName); + } } + void onPossibleSet(const State& state) noexcept final { try { Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr); @@ -164,10 +141,116 @@ public: "error"_attr = ex); } } + void onDroppedSet(const Key&) noexcept final {} private: + // Schedules updates to the shard identity config string while preserving order. + void _scheduleUpdateShardIdentityConfigString(std::string setName) { + ConnectionString update; + { + stdx::lock_guard lock(_mutex); + if (!_hasUpdateState(lock, setName)) { + return; + } + auto updateState = _updateStates.at(setName); + if (updateState->updateInProgress) { + return; + } + updateState->updateInProgress = true; + update = updateState->nextUpdateToSend.get(); + updateState->nextUpdateToSend = boost::none; + } + + auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); + executor->schedule([self = shared_from_this(), setName, update](Status status) { + self->_updateShardIdentityConfigString(status, setName, update); + }); + } + + void _updateShardIdentityConfigString(Status status, + std::string setName, + ConnectionString update) { + if (ErrorCodes::isCancelationError(status.code())) { + LOGV2_DEBUG(22067, + 2, + "Unable to schedule confirmed replica set update due to {error}", + "Unable to schedule confirmed replica set update", + "error"_attr = status); + stdx::lock_guard lk(_mutex); + _updateStates.erase(setName); + return; + } + invariant(status); + + if (MONGO_unlikely(failUpdateShardIdentityConfigString.shouldFail())) { + _endUpdateShardIdentityConfigString(setName, update); + return; + } + + auto configsvrConnStr = + Grid::get(_serviceContext)->shardRegistry()->getConfigServerConnectionString(); + + // Only proceed if the notification is for the configsvr. + if (configsvrConnStr.getSetName() != update.getSetName()) { + _endUpdateShardIdentityConfigString(setName, update); + return; + } + + try { + LOGV2(22068, + "Updating shard identity config string with confirmed replica set " + "{connectionString}", + "Updating shard identity config string with confirmed replica set", + "connectionString"_attr = update); + + + ThreadClient tc("updateShardIdentityConfigString", _serviceContext); + auto opCtx = tc->makeOperationContext(); + ShardingInitializationMongoD::updateShardIdentityConfigString(opCtx.get(), update); + } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) { + LOGV2(22069, "Unable to update shard identity config string", "error"_attr = e); + } catch (...) { + _endUpdateShardIdentityConfigString(setName, update); + throw; + } + _endUpdateShardIdentityConfigString(setName, update); + } + + void _endUpdateShardIdentityConfigString(std::string setName, ConnectionString update) { + bool moreUpdates = false; + { + stdx::lock_guard lock(_mutex); + invariant(_hasUpdateState(lock, setName)); + auto updateState = _updateStates.at(setName); + updateState->updateInProgress = false; + moreUpdates = (updateState->nextUpdateToSend != boost::none); + if (!moreUpdates) { + _updateStates.erase(setName); + } + } + if (moreUpdates) { + auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); + executor->schedule([self = shared_from_this(), setName](auto args) { + self->_scheduleUpdateShardIdentityConfigString(setName); + }); + } + } + + // Returns true if a ReplSetConfigUpdateState exists for replica set setName. + bool _hasUpdateState(WithLock, std::string setName) { + return (_updateStates.find(setName) != _updateStates.end()); + } + ServiceContext* _serviceContext; + mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardingReplicaSetChangeListenerMongod::mutex"); + + struct ReplSetConfigUpdateState { + bool updateInProgress = false; + boost::optional<ConnectionString> nextUpdateToSend; + }; + + stdx::unordered_map<std::string, std::shared_ptr<ReplSetConfigUpdateState>> _updateStates; }; } // namespace diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 8f378702a40..e2f3550486c 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -470,7 +470,9 @@ void initWireSpec() { spec.isInternalClient = true; } -class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener { +class ShardingReplicaSetChangeListener final + : public ReplicaSetChangeNotifier::Listener, + public std::enable_shared_from_this<ShardingReplicaSetChangeListener> { public: ShardingReplicaSetChangeListener(ServiceContext* serviceContext) : _serviceContext(serviceContext) {} @@ -491,29 +493,63 @@ public: "error"_attr = e); } - auto fun = [serviceContext = _serviceContext, connStr](auto args) { - if (ErrorCodes::isCancelationError(args.status.code())) { + auto setName = connStr.getSetName(); + bool updateInProgress = false; + { + stdx::lock_guard lock(_mutex); + if (!_hasUpdateState(lock, setName)) { + _updateStates.emplace(setName, std::make_shared<ReplSetConfigUpdateState>()); + } + auto updateState = _updateStates.at(setName); + updateState->nextUpdateToSend = connStr; + updateInProgress = updateState->updateInProgress; + } + + if (!updateInProgress) { + _scheduleUpdateConfigServer(setName); + } + } + + void onPossibleSet(const State& state) noexcept final { + try { + Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr); + } catch (const DBException& ex) { + LOGV2_DEBUG(22849, + 2, + "Unable to update sharding state with possible replica set due to {error}", + "Unable to update sharding state with possible replica set", + "error"_attr = ex); + } + } + + void onDroppedSet(const Key& key) noexcept final {} + +private: + // Schedules updates for replica set 'setName' on the config server. Loosly preserves ordering + // of update execution. Newer updates will not be overwritten by older updates in config.shards. + void _scheduleUpdateConfigServer(std::string setName) { + ConnectionString update; + { + stdx::lock_guard lock(_mutex); + if (!_hasUpdateState(lock, setName)) { return; } - invariant(args.status); - - try { - LOGV2(22846, - "Updating sharding state with confirmed replica set", - "connectionString"_attr = connStr); - if (MONGO_unlikely(failReplicaSetChangeConfigServerUpdateHook.shouldFail())) { - return; - } - ShardRegistry::updateReplicaSetOnConfigServer(serviceContext, connStr); - } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) { - LOGV2(22847, - "Unable to update sharding state with confirmed replica set", - "error"_attr = e); + auto updateState = _updateStates.at(setName); + if (updateState->updateInProgress) { + return; } - }; + updateState->updateInProgress = true; + update = updateState->nextUpdateToSend.get(); + updateState->nextUpdateToSend = boost::none; + } auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); - auto schedStatus = executor->scheduleWork(std::move(fun)).getStatus(); + auto schedStatus = + executor + ->scheduleWork([self = shared_from_this(), setName, update](auto args) { + self->_updateConfigServer(args.status, setName, update); + }) + .getStatus(); if (ErrorCodes::isCancelationError(schedStatus.code())) { LOGV2_DEBUG(22848, 2, @@ -526,22 +562,69 @@ public: uassertStatusOK(schedStatus); } - void onPossibleSet(const State& state) noexcept final { + void _updateConfigServer(Status status, std::string setName, ConnectionString update) { + if (ErrorCodes::isCancelationError(status.code())) { + stdx::lock_guard lock(_mutex); + _updateStates.erase(setName); + return; + } + + if (MONGO_unlikely(failReplicaSetChangeConfigServerUpdateHook.shouldFail())) { + _endUpdateConfigServer(setName, update); + return; + } + try { - Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr); - } catch (const DBException& ex) { - LOGV2_DEBUG(22849, - 2, - "Unable to update sharding state with possible replica set due to {error}", - "Unable to update sharding state with possible replica set", - "error"_attr = ex); + LOGV2(22846, + "Updating sharding state with confirmed replica set", + "connectionString"_attr = update); + ShardRegistry::updateReplicaSetOnConfigServer(_serviceContext, update); + } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) { + LOGV2(22847, + "Unable to update sharding state with confirmed replica set", + "error"_attr = e); + } catch (...) { + _endUpdateConfigServer(setName, update); + throw; } + _endUpdateConfigServer(setName, update); } - void onDroppedSet(const Key& key) noexcept final {} + void _endUpdateConfigServer(std::string setName, ConnectionString update) { + bool moreUpdates = false; + { + stdx::lock_guard lock(_mutex); + invariant(_hasUpdateState(lock, setName)); + auto updateState = _updateStates.at(setName); + updateState->updateInProgress = false; + moreUpdates = (updateState->nextUpdateToSend != boost::none); + if (!moreUpdates) { + _updateStates.erase(setName); + } + } + if (moreUpdates) { + auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); + executor->schedule([self = shared_from_this(), setName](auto args) { + self->_scheduleUpdateConfigServer(setName); + }); + } + } + + // Returns true if a ReplSetConfigUpdateState exists for replica set setName. + bool _hasUpdateState(WithLock, std::string setName) { + return (_updateStates.find(setName) != _updateStates.end()); + } -private: ServiceContext* _serviceContext; + + mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardingReplicaSetChangeListenerMongod::mutex"); + + struct ReplSetConfigUpdateState { + // True when an update to the config.shards is in progress. + bool updateInProgress = false; + boost::optional<ConnectionString> nextUpdateToSend; + }; + stdx::unordered_map<std::string, std::shared_ptr<ReplSetConfigUpdateState>> _updateStates; }; ExitCode runMongosServer(ServiceContext* serviceContext) { |