summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2020-04-24 15:38:30 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-05-13 21:51:36 +0000
commitc97f0868afcd8d8ce72a14f09283c827b33d44e9 (patch)
treee78eac02bfb310049f8c5061912897430218f65b
parent0765846b1a3a5035db49e29e61648921529402aa (diff)
downloadmongo-c97f0868afcd8d8ce72a14f09283c827b33d44e9.tar.gz
SERVER-47710 Make onConfirmedSet update config.shards serially
(cherry picked from commit 71a01bc43a26ec328657d7d9768ce11cad6f89a7)
-rw-r--r--src/mongo/db/s/sharding_initialization_mongod.cpp169
-rw-r--r--src/mongo/s/server.cpp141
2 files changed, 238 insertions, 72 deletions
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index 4c42d866007..989ccb56629 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -91,7 +91,9 @@ auto makeEgressHooksList(ServiceContext* service) {
* Updates the config server field of the shardIdentity document with the given connection string if
* setName is equal to the config server replica set name.
*/
-class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener {
+class ShardingReplicaSetChangeListener final
+ : public ReplicaSetChangeNotifier::Listener,
+ public std::enable_shared_from_this<ShardingReplicaSetChangeListener> {
public:
ShardingReplicaSetChangeListener(ServiceContext* serviceContext)
: _serviceContext(serviceContext) {}
@@ -111,49 +113,24 @@ public:
LOGV2(471692, "Unable to update the shard registry", "error"_attr = e);
}
- Grid::get(_serviceContext)
- ->getExecutorPool()
- ->getFixedExecutor()
- ->schedule([serviceContext = _serviceContext, connStr](Status status) {
- if (ErrorCodes::isCancelationError(status.code())) {
- LOGV2_DEBUG(22067,
- 2,
- "Unable to schedule confirmed replica set update due to {error}",
- "Unable to schedule confirmed replica set update",
- "error"_attr = status);
- return;
- }
- invariant(status);
-
- try {
- LOGV2(22068,
- "Updating config server with confirmed replica set {connectionString}",
- "Updating config server with confirmed replica set",
- "connectionString"_attr = connStr);
-
- if (MONGO_unlikely(failUpdateShardIdentityConfigString.shouldFail())) {
- return;
- }
-
- auto configsvrConnStr = Grid::get(serviceContext)
- ->shardRegistry()
- ->getConfigServerConnectionString();
-
- // Only proceed if the notification is for the configsvr
- if (configsvrConnStr.getSetName() != connStr.getSetName()) {
- return;
- }
-
- ThreadClient tc("updateShardIdentityConfigString", serviceContext);
- auto opCtx = tc->makeOperationContext();
-
- ShardingInitializationMongoD::updateShardIdentityConfigString(opCtx.get(),
- connStr);
- } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) {
- LOGV2(22069, "Unable to update config server", "error"_attr = e);
- }
- });
+ auto setName = connStr.getSetName();
+ bool updateInProgress = false;
+ {
+ stdx::lock_guard lock(_mutex);
+ if (!_hasUpdateState(lock, setName)) {
+ _updateStates.emplace(setName, std::make_shared<ReplSetConfigUpdateState>());
+ }
+
+ auto updateState = _updateStates.at(setName);
+ updateState->nextUpdateToSend = connStr;
+ updateInProgress = updateState->updateInProgress;
+ }
+
+ if (!updateInProgress) {
+ _scheduleUpdateShardIdentityConfigString(setName);
+ }
}
+
void onPossibleSet(const State& state) noexcept final {
try {
Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr);
@@ -164,10 +141,116 @@ public:
"error"_attr = ex);
}
}
+
void onDroppedSet(const Key&) noexcept final {}
private:
+ // Schedules updates to the shard identity config string while preserving order.
+ void _scheduleUpdateShardIdentityConfigString(std::string setName) {
+ ConnectionString update;
+ {
+ stdx::lock_guard lock(_mutex);
+ if (!_hasUpdateState(lock, setName)) {
+ return;
+ }
+ auto updateState = _updateStates.at(setName);
+ if (updateState->updateInProgress) {
+ return;
+ }
+ updateState->updateInProgress = true;
+ update = updateState->nextUpdateToSend.get();
+ updateState->nextUpdateToSend = boost::none;
+ }
+
+ auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+ executor->schedule([self = shared_from_this(), setName, update](Status status) {
+ self->_updateShardIdentityConfigString(status, setName, update);
+ });
+ }
+
+ void _updateShardIdentityConfigString(Status status,
+ std::string setName,
+ ConnectionString update) {
+ if (ErrorCodes::isCancelationError(status.code())) {
+ LOGV2_DEBUG(22067,
+ 2,
+ "Unable to schedule confirmed replica set update due to {error}",
+ "Unable to schedule confirmed replica set update",
+ "error"_attr = status);
+ stdx::lock_guard lk(_mutex);
+ _updateStates.erase(setName);
+ return;
+ }
+ invariant(status);
+
+ if (MONGO_unlikely(failUpdateShardIdentityConfigString.shouldFail())) {
+ _endUpdateShardIdentityConfigString(setName, update);
+ return;
+ }
+
+ auto configsvrConnStr =
+ Grid::get(_serviceContext)->shardRegistry()->getConfigServerConnectionString();
+
+ // Only proceed if the notification is for the configsvr.
+ if (configsvrConnStr.getSetName() != update.getSetName()) {
+ _endUpdateShardIdentityConfigString(setName, update);
+ return;
+ }
+
+ try {
+ LOGV2(22068,
+ "Updating shard identity config string with confirmed replica set "
+ "{connectionString}",
+ "Updating shard identity config string with confirmed replica set",
+ "connectionString"_attr = update);
+
+
+ ThreadClient tc("updateShardIdentityConfigString", _serviceContext);
+ auto opCtx = tc->makeOperationContext();
+ ShardingInitializationMongoD::updateShardIdentityConfigString(opCtx.get(), update);
+ } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) {
+ LOGV2(22069, "Unable to update shard identity config string", "error"_attr = e);
+ } catch (...) {
+ _endUpdateShardIdentityConfigString(setName, update);
+ throw;
+ }
+ _endUpdateShardIdentityConfigString(setName, update);
+ }
+
+ void _endUpdateShardIdentityConfigString(std::string setName, ConnectionString update) {
+ bool moreUpdates = false;
+ {
+ stdx::lock_guard lock(_mutex);
+ invariant(_hasUpdateState(lock, setName));
+ auto updateState = _updateStates.at(setName);
+ updateState->updateInProgress = false;
+ moreUpdates = (updateState->nextUpdateToSend != boost::none);
+ if (!moreUpdates) {
+ _updateStates.erase(setName);
+ }
+ }
+ if (moreUpdates) {
+ auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+ executor->schedule([self = shared_from_this(), setName](auto args) {
+ self->_scheduleUpdateShardIdentityConfigString(setName);
+ });
+ }
+ }
+
+ // Returns true if a ReplSetConfigUpdateState exists for replica set setName.
+ bool _hasUpdateState(WithLock, std::string setName) {
+ return (_updateStates.find(setName) != _updateStates.end());
+ }
+
ServiceContext* _serviceContext;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardingReplicaSetChangeListenerMongod::mutex");
+
+ struct ReplSetConfigUpdateState {
+ bool updateInProgress = false;
+ boost::optional<ConnectionString> nextUpdateToSend;
+ };
+
+ stdx::unordered_map<std::string, std::shared_ptr<ReplSetConfigUpdateState>> _updateStates;
};
} // namespace
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 8f378702a40..e2f3550486c 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -470,7 +470,9 @@ void initWireSpec() {
spec.isInternalClient = true;
}
-class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener {
+class ShardingReplicaSetChangeListener final
+ : public ReplicaSetChangeNotifier::Listener,
+ public std::enable_shared_from_this<ShardingReplicaSetChangeListener> {
public:
ShardingReplicaSetChangeListener(ServiceContext* serviceContext)
: _serviceContext(serviceContext) {}
@@ -491,29 +493,63 @@ public:
"error"_attr = e);
}
- auto fun = [serviceContext = _serviceContext, connStr](auto args) {
- if (ErrorCodes::isCancelationError(args.status.code())) {
+ auto setName = connStr.getSetName();
+ bool updateInProgress = false;
+ {
+ stdx::lock_guard lock(_mutex);
+ if (!_hasUpdateState(lock, setName)) {
+ _updateStates.emplace(setName, std::make_shared<ReplSetConfigUpdateState>());
+ }
+ auto updateState = _updateStates.at(setName);
+ updateState->nextUpdateToSend = connStr;
+ updateInProgress = updateState->updateInProgress;
+ }
+
+ if (!updateInProgress) {
+ _scheduleUpdateConfigServer(setName);
+ }
+ }
+
+ void onPossibleSet(const State& state) noexcept final {
+ try {
+ Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr);
+ } catch (const DBException& ex) {
+ LOGV2_DEBUG(22849,
+ 2,
+ "Unable to update sharding state with possible replica set due to {error}",
+ "Unable to update sharding state with possible replica set",
+ "error"_attr = ex);
+ }
+ }
+
+ void onDroppedSet(const Key& key) noexcept final {}
+
+private:
+ // Schedules updates for replica set 'setName' on the config server. Loosly preserves ordering
+ // of update execution. Newer updates will not be overwritten by older updates in config.shards.
+ void _scheduleUpdateConfigServer(std::string setName) {
+ ConnectionString update;
+ {
+ stdx::lock_guard lock(_mutex);
+ if (!_hasUpdateState(lock, setName)) {
return;
}
- invariant(args.status);
-
- try {
- LOGV2(22846,
- "Updating sharding state with confirmed replica set",
- "connectionString"_attr = connStr);
- if (MONGO_unlikely(failReplicaSetChangeConfigServerUpdateHook.shouldFail())) {
- return;
- }
- ShardRegistry::updateReplicaSetOnConfigServer(serviceContext, connStr);
- } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) {
- LOGV2(22847,
- "Unable to update sharding state with confirmed replica set",
- "error"_attr = e);
+ auto updateState = _updateStates.at(setName);
+ if (updateState->updateInProgress) {
+ return;
}
- };
+ updateState->updateInProgress = true;
+ update = updateState->nextUpdateToSend.get();
+ updateState->nextUpdateToSend = boost::none;
+ }
auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
- auto schedStatus = executor->scheduleWork(std::move(fun)).getStatus();
+ auto schedStatus =
+ executor
+ ->scheduleWork([self = shared_from_this(), setName, update](auto args) {
+ self->_updateConfigServer(args.status, setName, update);
+ })
+ .getStatus();
if (ErrorCodes::isCancelationError(schedStatus.code())) {
LOGV2_DEBUG(22848,
2,
@@ -526,22 +562,69 @@ public:
uassertStatusOK(schedStatus);
}
- void onPossibleSet(const State& state) noexcept final {
+ void _updateConfigServer(Status status, std::string setName, ConnectionString update) {
+ if (ErrorCodes::isCancelationError(status.code())) {
+ stdx::lock_guard lock(_mutex);
+ _updateStates.erase(setName);
+ return;
+ }
+
+ if (MONGO_unlikely(failReplicaSetChangeConfigServerUpdateHook.shouldFail())) {
+ _endUpdateConfigServer(setName, update);
+ return;
+ }
+
try {
- Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr);
- } catch (const DBException& ex) {
- LOGV2_DEBUG(22849,
- 2,
- "Unable to update sharding state with possible replica set due to {error}",
- "Unable to update sharding state with possible replica set",
- "error"_attr = ex);
+ LOGV2(22846,
+ "Updating sharding state with confirmed replica set",
+ "connectionString"_attr = update);
+ ShardRegistry::updateReplicaSetOnConfigServer(_serviceContext, update);
+ } catch (const ExceptionForCat<ErrorCategory::ShutdownError>& e) {
+ LOGV2(22847,
+ "Unable to update sharding state with confirmed replica set",
+ "error"_attr = e);
+ } catch (...) {
+ _endUpdateConfigServer(setName, update);
+ throw;
}
+ _endUpdateConfigServer(setName, update);
}
- void onDroppedSet(const Key& key) noexcept final {}
+ void _endUpdateConfigServer(std::string setName, ConnectionString update) {
+ bool moreUpdates = false;
+ {
+ stdx::lock_guard lock(_mutex);
+ invariant(_hasUpdateState(lock, setName));
+ auto updateState = _updateStates.at(setName);
+ updateState->updateInProgress = false;
+ moreUpdates = (updateState->nextUpdateToSend != boost::none);
+ if (!moreUpdates) {
+ _updateStates.erase(setName);
+ }
+ }
+ if (moreUpdates) {
+ auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+ executor->schedule([self = shared_from_this(), setName](auto args) {
+ self->_scheduleUpdateConfigServer(setName);
+ });
+ }
+ }
+
+ // Returns true if a ReplSetConfigUpdateState exists for replica set setName.
+ bool _hasUpdateState(WithLock, std::string setName) {
+ return (_updateStates.find(setName) != _updateStates.end());
+ }
-private:
ServiceContext* _serviceContext;
+
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ShardingReplicaSetChangeListenerMongod::mutex");
+
+ struct ReplSetConfigUpdateState {
+ // True when an update to the config.shards is in progress.
+ bool updateInProgress = false;
+ boost::optional<ConnectionString> nextUpdateToSend;
+ };
+ stdx::unordered_map<std::string, std::shared_ptr<ReplSetConfigUpdateState>> _updateStates;
};
ExitCode runMongosServer(ServiceContext* serviceContext) {