diff options
-rw-r--r-- | jstests/sharding/use_rsm_data_for_cs.js | 6 | ||||
-rw-r--r-- | src/mongo/client/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/client/replica_set_change_notifier.cpp | 150 | ||||
-rw-r--r-- | src/mongo/client/replica_set_change_notifier.h | 173 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.cpp | 137 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor.h | 33 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_internal.h | 22 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.cpp | 4 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_manager.h | 6 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_scan_test.cpp | 360 | ||||
-rw-r--r-- | src/mongo/client/replica_set_monitor_test_fixture.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.cpp | 107 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_initialization_mongod.h | 11 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.cpp | 66 | ||||
-rw-r--r-- | src/mongo/s/client/shard_registry.h | 15 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 58 |
17 files changed, 956 insertions, 213 deletions
diff --git a/jstests/sharding/use_rsm_data_for_cs.js b/jstests/sharding/use_rsm_data_for_cs.js index 0b10a0542cd..c2fafec4889 100644 --- a/jstests/sharding/use_rsm_data_for_cs.js +++ b/jstests/sharding/use_rsm_data_for_cs.js @@ -13,9 +13,11 @@ assert.eq(db.foo.find({_id: 1}).next().x, 1); // prevent RSM on all nodes to update config shard - mongos.adminCommand({configureFailPoint: "failAsyncConfigChangeHook", mode: "alwaysOn"}); + mongos.adminCommand( + {configureFailPoint: "failReplicaSetChangeConfigServerUpdateHook", mode: "alwaysOn"}); rs.nodes.forEach(function(node) { - node.adminCommand({configureFailPoint: "failAsyncConfigChangeHook", mode: "alwaysOn"}); + node.adminCommand( + {configureFailPoint: "failUpdateShardIdentityConfigString", mode: "alwaysOn"}); }); // add a node to shard rs diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index ffba4798db9..d1502b181bc 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -186,6 +186,7 @@ clientDriverEnv.Library( 'dbclient_rs.cpp', 'global_conn_pool.cpp', env.Idlc('global_conn_pool.idl')[0], + 'replica_set_change_notifier.cpp', 'replica_set_monitor.cpp', 'replica_set_monitor_manager.cpp', ], diff --git a/src/mongo/client/replica_set_change_notifier.cpp b/src/mongo/client/replica_set_change_notifier.cpp new file mode 100644 index 00000000000..9e8526979a9 --- /dev/null +++ b/src/mongo/client/replica_set_change_notifier.cpp @@ -0,0 +1,150 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/client/replica_set_change_notifier.h" + +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" +#include "mongo/util/stacktrace.h" + +namespace mongo { + +void ReplicaSetChangeNotifier::_addListener(Listener* listener) { + stdx::lock_guard lk(_mutex); + + listener->init(this); + _listeners.push_back(listener); +} + +void ReplicaSetChangeNotifier::_removeListener(Listener* listener) { + stdx::lock_guard lk(_mutex); + + auto& listeners = _listeners; + listeners.erase(std::remove(listeners.begin(), listeners.end(), listener), listeners.end()); +} + +void ReplicaSetChangeNotifier::onFoundSet(const std::string& name) { + LOG(2) << "Signaling found set " << name; + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + _replicaSetStates.emplace(name, State{}); + + auto listeners = _listeners; + lk.unlock(); + + for (auto listener : listeners) { + 
listener->onFoundSet(name); + }; +} + +void ReplicaSetChangeNotifier::onPossibleSet(ConnectionString connectionString) { + LOG(2) << "Signaling possible set " << connectionString; + + const auto& name = connectionString.getSetName(); + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + auto state = [&] { + auto& state = _replicaSetStates[name]; + ++state.generation; + + state.connStr = connectionString; + state.primary = {}; + + return state; + }(); + + auto listeners = _listeners; + lk.unlock(); + + for (auto listener : listeners) { + listener->onPossibleSet(state); + }; +} + +void ReplicaSetChangeNotifier::onConfirmedSet(ConnectionString connectionString, + HostAndPort primary) { + LOG(2) << "Signaling confirmed set " << connectionString << " with primary " << primary; + + const auto& name = connectionString.getSetName(); + stdx::unique_lock<stdx::mutex> lk(_mutex); + + auto state = [&] { + auto& state = _replicaSetStates[name]; + ++state.generation; + + state.connStr = connectionString; + state.primary = primary; + + return state; + }(); + + auto listeners = _listeners; + lk.unlock(); + + for (auto listener : listeners) { + listener->onConfirmedSet(state); + }; +} + +void ReplicaSetChangeNotifier::onDroppedSet(const std::string& name) { + LOG(2) << "Signaling dropped set " << name; + + stdx::unique_lock<stdx::mutex> lk(_mutex); + + // If we never signaled the initial possible set, we should not signal on dropped set + auto it = _replicaSetStates.find(name); + if (it == _replicaSetStates.end()) { + return; + } + + _replicaSetStates.erase(it); + + auto listeners = _listeners; + lk.unlock(); + + for (auto listener : listeners) { + listener->onDroppedSet(name); + }; +} + +auto ReplicaSetChangeNotifier::Listener::getCurrentState(const Key& key) -> State { + invariant(_notifier); + + stdx::lock_guard lk(_notifier->_mutex); + + return _notifier->_replicaSetStates.at(key); +} + +} // namespace mongo diff --git a/src/mongo/client/replica_set_change_notifier.h 
b/src/mongo/client/replica_set_change_notifier.h new file mode 100644 index 00000000000..23427c9fd0c --- /dev/null +++ b/src/mongo/client/replica_set_change_notifier.h @@ -0,0 +1,173 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <string> +#include <type_traits> +#include <vector> + +#include "mongo/client/connection_string.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/unordered_map.h" +#include "mongo/util/functional.h" + +namespace mongo { + +/** + * A stateful notifier for events from a set of ReplicaSetMonitors + */ +class ReplicaSetChangeNotifier { +public: + using Key = std::string; + class Listener; + using ListenerHandle = std::unique_ptr<Listener, unique_function<void(Listener*)>>; + struct State; + +public: + ReplicaSetChangeNotifier() = default; + ReplicaSetChangeNotifier(const ReplicaSetChangeNotifier&) = delete; + ReplicaSetChangeNotifier(ReplicaSetChangeNotifier&&) = delete; + ReplicaSetChangeNotifier& operator=(const ReplicaSetChangeNotifier&) = delete; + ReplicaSetChangeNotifier& operator=(ReplicaSetChangeNotifier&&) = delete; + + /** + * Notify every listener that there is a new ReplicaSet and initialize the State + */ + void onFoundSet(const std::string& replicaSet); + + /** + * Notify every listener that a scan completed without finding a primary and update + */ + void onPossibleSet(ConnectionString connectionString); + + /** + * Notify every listener that a scan completed and found a new primary or config + */ + void onConfirmedSet(ConnectionString connectionString, HostAndPort primary); + + /** + * Notify every listener that a ReplicaSet is no longer in use and drop the State + */ + void onDroppedSet(const std::string& replicaSet); + + /** + * Create a listener of a given type and bind it to this notifier + */ + template <typename DerivedT, + typename... Args, + typename = std::enable_if_t<std::is_constructible_v<DerivedT, Args...>>> + auto makeListener(Args&&... 
args) { + auto deleter = [this](auto listener) { _removeListener(listener); }; + auto ptr = new DerivedT(std::forward<Args>(args)...); + + _addListener(ptr); + + return ListenerHandle(ptr, std::move(deleter)); + } + +private: + void _addListener(Listener* listener); + void _removeListener(Listener* listener); + + stdx::mutex _mutex; + std::vector<Listener*> _listeners; + stdx::unordered_map<Key, State> _replicaSetStates; +}; + +/** + * A listener for events from a set of ReplicaSetMonitors + * + * This class will normally be notified of events for every replica set in use in the system. + * The onSet functions are all called synchronously by the notifier, + * if your implementation would block or seriously delay execution, + * please schedule the majority of the work to complete asynchronously. + */ +class ReplicaSetChangeNotifier::Listener { +public: + using Notifier = ReplicaSetChangeNotifier; + using Key = typename Notifier::Key; + using State = typename Notifier::State; + +public: + Listener(const Listener&) = delete; + Listener(Listener&&) = delete; + Listener& operator=(const Listener&) = delete; + Listener& operator=(Listener&&) = delete; + + Listener() = default; + virtual ~Listener() = default; + + /** + * Initialize this listener with a notifier + */ + void init(Notifier* notifier) { + _notifier = notifier; + } + + /** + * React to a new ReplicaSet that will soon be scanned + */ + virtual void onFoundSet(const Key& key) = 0; + + /** + * React to a finished scan that found no primary + */ + virtual void onPossibleSet(const State& data) = 0; + + /** + * React to a finished scan that found a primary + */ + virtual void onConfirmedSet(const State& data) = 0; + + /** + * React to a ReplicaSet being dropped from use + */ + virtual void onDroppedSet(const Key& key) = 0; + + /** + * Get the State as of the last signal function invoked on the Notifier + */ + State getCurrentState(const Key& key); + +private: + Notifier* _notifier = nullptr; +}; + +using 
ReplicaSetChangeListenerHandle = ReplicaSetChangeNotifier::ListenerHandle; + +struct ReplicaSetChangeNotifier::State { + ConnectionString connStr; + HostAndPort primary; + + int64_t generation = 0; +}; + +} // namespace mongo diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp index 3e0501af9a8..a1185cb519e 100644 --- a/src/mongo/client/replica_set_monitor.cpp +++ b/src/mongo/client/replica_set_monitor.cpp @@ -47,7 +47,6 @@ #include "mongo/db/server_options.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" -#include "mongo/stdx/thread.h" #include "mongo/util/background.h" #include "mongo/util/debug_util.h" #include "mongo/util/exit.h" @@ -64,9 +63,6 @@ using std::set; using std::string; using std::vector; -// Failpoint for disabling AsyncConfigChangeHook calls on updated RS nodes. -MONGO_FAIL_POINT_DEFINE(failAsyncConfigChangeHook); - // Failpoint for changing the default refresh period MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod); @@ -95,10 +91,6 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn const Milliseconds kExpeditedRefreshPeriod(500); AtomicWord<bool> areRefreshRetriesDisabledForTest{false}; // Only true in tests. 
-// TODO: Move to ReplicaSetMonitorManager -ReplicaSetMonitor::ConfigChangeHook asyncConfigChangeHook; -ReplicaSetMonitor::ConfigChangeHook syncConfigChangeHook; - // // Helpers for stl algorithms // @@ -190,7 +182,8 @@ Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() { ReplicaSetMonitor::ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {} ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri) - : ReplicaSetMonitor(std::make_shared<SetState>(uri, globalRSMonitorManager.getExecutor())) {} + : ReplicaSetMonitor(std::make_shared<SetState>( + uri, &globalRSMonitorManager.getNotifier(), globalRSMonitorManager.getExecutor())) {} void ReplicaSetMonitor::init() { if (areRefreshRetriesDisabledForTest.load()) { @@ -198,25 +191,15 @@ void ReplicaSetMonitor::init() { warning() << "*** Not starting background refresh because refresh retries are disabled."; return; } + + _state->init(); + stdx::lock_guard<stdx::mutex> lk(_state->mutex); _scheduleRefresh(_state->now(), lk); } ReplicaSetMonitor::~ReplicaSetMonitor() { - // need this lock because otherwise can get race with _scheduleRefresh() - stdx::lock_guard<stdx::mutex> lk(_state->mutex); - if (!_refresherHandle || !_state->executor) { - return; - } - - _state->currentScan.reset(); - _state->executor->cancel(_refresherHandle); - // Note: calling _executor->wait(_refresherHandle); from the dispatcher thread will cause hang - // Its ok not to call it because the d-tor is called only when the last owning pointer goes out - // of scope, so as taskExecutor queue holds a weak pointer to RSM it will not be able to get a - // task to execute eliminating the need to call method "wait". 
- // - _refresherHandle = {}; + _state->drop(); } void ReplicaSetMonitor::_scheduleRefresh(Date_t when, WithLock) { @@ -367,7 +350,9 @@ std::string ReplicaSetMonitor::getName() const { std::string ReplicaSetMonitor::getServerAddress() const { stdx::lock_guard<stdx::mutex> lk(_state->mutex); - return _state->getConfirmedServerAddress(); + // We return our setUri until first confirmation + return _state->seedConnStr.isValid() ? _state->seedConnStr.toString() + : _state->setUri.connectionString().toString(); } const MongoURI& ReplicaSetMonitor::getOriginalUri() const { @@ -402,14 +387,8 @@ void ReplicaSetMonitor::remove(const string& name) { globalConnPool.removeHost(name); } -void ReplicaSetMonitor::setAsynchronousConfigChangeHook(ConfigChangeHook hook) { - invariant(!asyncConfigChangeHook); - asyncConfigChangeHook = hook; -} - -void ReplicaSetMonitor::setSynchronousConfigChangeHook(ConfigChangeHook hook) { - invariant(!syncConfigChangeHook); - syncConfigChangeHook = hook; +ReplicaSetChangeNotifier& ReplicaSetMonitor::getNotifier() { + return globalRSMonitorManager.getNotifier(); } // TODO move to correct order with non-statics before pushing @@ -452,8 +431,6 @@ void ReplicaSetMonitor::shutdown() { void ReplicaSetMonitor::cleanup() { globalRSMonitorManager.removeAllMonitors(); - asyncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook(); - syncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook(); } void ReplicaSetMonitor::disableRefreshRetries_forTest() { @@ -605,20 +582,20 @@ Refresher::NextStep Refresher::getNextStep() { // NOTE: we don't modify seedNodes or notify about set membership change in this // case since it hasn't been confirmed by a master. 
- const string oldAddr = _set->getUnconfirmedServerAddress(); for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); it != _scan->unconfirmedReplies.end(); ++it) { _set->findOrCreateNode(it->first)->update(it->second); } - const string newAddr = _set->getUnconfirmedServerAddress(); - if (oldAddr != newAddr && syncConfigChangeHook) { - // Run the syncConfigChangeHook because the ShardRegistry needs to know about any - // node we might talk to. Don't run the asyncConfigChangeHook because we don't - // want to update the seed list stored on the config servers with unconfirmed hosts. - syncConfigChangeHook(_set->name, _set->getUnconfirmedServerAddress()); + auto connStr = _set->possibleConnectionString(); + if (connStr != _set->workingConnStr) { + _set->workingConnStr = std::move(connStr); + _set->notifier->onPossibleSet(_set->workingConnStr); } + + // If at some point we care about lacking a primary, on it here + _set->lastSeenMaster = {}; } if (_scan->foundAnyUpNodes) { @@ -833,26 +810,23 @@ Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMa } } - if (reply.normalHosts != _set->seedNodes) { - const string oldAddr = _set->getConfirmedServerAddress(); + bool changedHosts = reply.normalHosts != _set->seedNodes; + bool changedPrimary = reply.host != _set->lastSeenMaster; + if (changedHosts || changedPrimary) { + ++_set->seedGen; _set->seedNodes = reply.normalHosts; + _set->seedConnStr = _set->confirmedConnectionString(); // LogLevel can be pretty low, since replica set reconfiguration should be pretty rare // and we want to record our changes - log() << "changing hosts to " << _set->getConfirmedServerAddress() << " from " << oldAddr; - - if (syncConfigChangeHook) { - syncConfigChangeHook(_set->name, _set->getConfirmedServerAddress()); - } + log() << "Confirmed replica set for " << _set->name << " is " << _set->seedConnStr; - if (asyncConfigChangeHook && !MONGO_FAIL_POINT(failAsyncConfigChangeHook)) { - // call from a 
separate thread to avoid blocking and holding lock while potentially - // going over the network - stdx::thread bg(asyncConfigChangeHook, _set->name, _set->getConfirmedServerAddress()); - bg.detach(); - } + _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host); } + // Update our working string + _set->workingConnStr = _set->seedConnStr; + // Update other nodes's information based on replies we've already seen for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin(); it != _scan->unconfirmedReplies.end(); @@ -870,7 +844,6 @@ Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMa void Refresher::receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply) { invariant(!reply.isMaster); - // This function doesn't alter _set at all. It only modifies the work queue in _scan. // Add everyone this host claims is in the set to possibleNodes. _scan->possibleNodes.insert(reply.normalHosts.begin(), reply.normalHosts.end()); @@ -1013,9 +986,12 @@ void Node::update(const IsMasterReply& reply) { lastWriteDateUpdateTime = Date_t::now(); } -SetState::SetState(const MongoURI& uri, executor::TaskExecutor* executor) +SetState::SetState(const MongoURI& uri, + ReplicaSetChangeNotifier* notifier, + executor::TaskExecutor* executor) : setUri(std::move(uri)), name(setUri.getSetName()), + notifier(notifier), executor(executor), seedNodes(setUri.getServers().begin(), setUri.getServers().end()), latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)), @@ -1223,33 +1199,21 @@ void SetState::updateNodeIfInNodes(const IsMasterReply& reply) { node->update(reply); } -std::string SetState::getConfirmedServerAddress() const { - StringBuilder ss; - if (!name.empty()) - ss << name << "/"; - - for (std::set<HostAndPort>::const_iterator it = seedNodes.begin(); it != seedNodes.end(); - ++it) { - if (it != seedNodes.begin()) - ss << ","; - it->append(ss); - } +ConnectionString SetState::confirmedConnectionString() const { + 
std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes)); - return ss.str(); + return ConnectionString::forReplicaSet(name, std::move(hosts)); } -std::string SetState::getUnconfirmedServerAddress() const { - StringBuilder ss; - if (!name.empty()) - ss << name << "/"; +ConnectionString SetState::possibleConnectionString() const { + std::vector<HostAndPort> hosts; + hosts.reserve(nodes.size()); - for (std::vector<Node>::const_iterator it = nodes.begin(); it != nodes.end(); ++it) { - if (it != nodes.begin()) - ss << ","; - it->host.append(ss); + for (auto& node : nodes) { + hosts.push_back(node.host); } - return ss.str(); + return ConnectionString::forReplicaSet(name, std::move(hosts)); } void SetState::notify(bool finishedScan) { @@ -1288,6 +1252,23 @@ Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criter << name); } +void SetState::init() { + notifier->onFoundSet(name); +} + +void SetState::drop() { + { + stdx::lock_guard<stdx::mutex> lk(mutex); + currentScan.reset(); + notify(/*finishedScan*/ true); + } + + // No point in notifying if we never started + if (workingConnStr.isValid()) { + notifier->onDroppedSet(name); + } +} + void SetState::checkInvariants() const { bool foundMaster = false; for (size_t i = 0; i < nodes.size(); i++) { @@ -1303,7 +1284,7 @@ void SetState::checkInvariants() const { foundMaster = true; // if we have a master it should be the same as lastSeenMaster - invariant(nodes[i].host == lastSeenMaster); + invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster); } // should never end up with negative latencies diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h index 6f0a9d6ddb2..a306b1b9547 100644 --- a/src/mongo/client/replica_set_monitor.h +++ b/src/mongo/client/replica_set_monitor.h @@ -37,6 +37,7 @@ #include "mongo/base/string_data.h" #include "mongo/client/mongo_uri.h" +#include "mongo/client/replica_set_change_notifier.h" #include 
"mongo/executor/task_executor.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/functional.h" @@ -62,16 +63,9 @@ class ReplicaSetMonitor : public std::enable_shared_from_this<ReplicaSetMonitor> public: class Refresher; - typedef stdx::function<void(const std::string& setName, const std::string& newConnectionString)> - ConfigChangeHook; - /** - * Initializes local state. - * - * seeds must not be empty. + * Initializes local state from a MongoURI. */ - ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds); - ReplicaSetMonitor(const MongoURI& uri); /** @@ -198,27 +192,9 @@ public: static void remove(const std::string& name); /** - * Sets the hook to be called whenever the config of any replica set changes. - * Currently only 1 globally, so this asserts if one already exists. - * - * The hook will be called from a fresh thread. It is responsible for initializing any - * thread-local state and ensuring that no exceptions escape. - * - * The hook must not be changed while the program has multiple threads. - */ - static void setAsynchronousConfigChangeHook(ConfigChangeHook hook); - - /** - * Sets the hook to be called whenever the config of any replica set changes. - * Currently only 1 globally, so this asserts if one already exists. - * - * The hook will be called inline while refreshing the ReplicaSetMonitor's view of the set - * membership. It is important that the hook not block for long as it will be running under - * the ReplicaSetMonitor's mutex. - * - * The hook must not be changed while the program has multiple threads. + * Returns the change notifier for the underlying ReplicaMonitorManager */ - static void setSynchronousConfigChangeHook(ConfigChangeHook hook); + static ReplicaSetChangeNotifier& getNotifier(); /** * Permanently stops all monitoring on replica sets and clears all cached information @@ -384,7 +360,6 @@ private: /** * Adjusts the _scan work queue based on information from this host. 
* This should only be called with replies from non-masters. - * Does not update _set at all. */ void receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply); diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h index c7f72ae3671..04479b6dab9 100644 --- a/src/mongo/client/replica_set_monitor_internal.h +++ b/src/mongo/client/replica_set_monitor_internal.h @@ -145,7 +145,7 @@ public: Promise<HostAndPort> promise; }; - SetState(const MongoURI& uri, executor::TaskExecutor*); + SetState(const MongoURI& uri, ReplicaSetChangeNotifier*, executor::TaskExecutor*); bool isUsable() const; @@ -173,13 +173,13 @@ public: * Returns the connection string of the nodes that are known the be in the set because we've * seen them in the isMaster reply of a PRIMARY. */ - std::string getConfirmedServerAddress() const; + ConnectionString confirmedConnectionString() const; /** * Returns the connection string of the nodes that are believed to be in the set because we've * seen them in the isMaster reply of non-PRIMARY nodes in our seed list. */ - std::string getUnconfirmedServerAddress() const; + ConnectionString possibleConnectionString() const; /** * Call this to notify waiters after a scan processes a valid reply or finishes. @@ -193,6 +193,16 @@ public: Status makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const; /** + * Notifies all listeners that the ReplicaSet is in use. + */ + void init(); + + /** + * Resets the current scan and notifies all listeners that the ReplicaSet isn't in use. + */ + void drop(); + + /** * Before unlocking, do DEV checkInvariants(); */ void checkInvariants() const; @@ -200,17 +210,21 @@ public: const MongoURI setUri; // URI passed to ctor -- THIS IS NOT UPDATED BY SCANS const std::string name; + ReplicaSetChangeNotifier* const notifier; executor::TaskExecutor* const executor; stdx::mutex mutex; // You must hold this to access any member below. 
// For starting scans std::set<HostAndPort> seedNodes; // updated whenever a master reports set membership changes + ConnectionString seedConnStr; // The connection string from the last time we had valid seeds + int64_t seedGen = 0; bool isMocked = false; // True if this set is using nodes from MockReplicaSet // For tracking scans - HostAndPort lastSeenMaster; // Empty if we have never seen a master + // lastSeenMaster is empty if we have never seen a master or the last scan didn't have one + HostAndPort lastSeenMaster; int consecutiveFailedScans = 0; Nodes nodes; // maintained sorted and unique by host ConnectionString workingConnStr; // The connection string from our last scan diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index c2b05cc61ff..f8f6078b23a 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -224,4 +224,8 @@ TaskExecutor* ReplicaSetMonitorManager::getExecutor() { return _taskExecutor.get(); } +ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() { + return _notifier; +} + } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h index ed92bc3b28c..1adf42fb16d 100644 --- a/src/mongo/client/replica_set_monitor_manager.h +++ b/src/mongo/client/replica_set_monitor_manager.h @@ -32,6 +32,7 @@ #include <string> #include <vector> +#include "mongo/client/replica_set_change_notifier.h" #include "mongo/executor/task_executor.h" #include "mongo/stdx/mutex.h" #include "mongo/util/string_map.h" @@ -94,6 +95,8 @@ public: */ executor::TaskExecutor* getExecutor(); + ReplicaSetChangeNotifier& getNotifier(); + private: using ReplicaSetMonitorsMap = StringMap<std::weak_ptr<ReplicaSetMonitor>>; @@ -103,6 +106,9 @@ private: // Executor for monitoring replica sets. 
std::unique_ptr<executor::TaskExecutor> _taskExecutor; + // Widget to notify listeners when a RSM notices a change + ReplicaSetChangeNotifier _notifier; + // Needs to be after `_taskExecutor`, so that it will be destroyed before the `_taskExecutor`. ReplicaSetMonitorsMap _monitors; diff --git a/src/mongo/client/replica_set_monitor_scan_test.cpp b/src/mongo/client/replica_set_monitor_scan_test.cpp index 4b4f3f4b12f..2f2cebf556a 100644 --- a/src/mongo/client/replica_set_monitor_scan_test.cpp +++ b/src/mongo/client/replica_set_monitor_scan_test.cpp @@ -1613,5 +1613,365 @@ TEST_F(MinOpTimeTest, MinOpTimeIgnored) { ASSERT_EQUALS(notStale.host(), "c"); } +// -- ReplicaSetChangeNotifier/Listener tests -- + +class Listener : public ReplicaSetChangeNotifier::Listener { +public: + void logEvent(StringData name, const Key& key) { + log() << name << ": " << key; + } + void logEvent(StringData name, const State& state) { + log() << name << ": " + << "(" << state.generation << ") " << state.connStr << " | " << state.primary; + } + + void onFoundSet(const Key& key) override { + lastFoundSetId = ++eventId; + logEvent("FoundSet", key); + } + void onPossibleSet(const State& state) override { + lastPossibleSetId = ++eventId; + logEvent("PossibleSet", state); + lastState = state; + } + void onConfirmedSet(const State& state) override { + lastConfirmedSetId = ++eventId; + logEvent("ConfirmedSet", state); + lastState = state; + } + void onDroppedSet(const Key& key) override { + lastDroppedSetId = ++eventId; + logEvent("DroppedSet", key); + } + + int64_t eventId = -1; + int64_t lastFoundSetId = -1; + int64_t lastPossibleSetId = -1; + int64_t lastConfirmedSetId = -1; + int64_t lastDroppedSetId = -1; + State lastState; +}; + +class ChangeNotifierTest : public ReplicaSetMonitorTest { +public: + ChangeNotifierTest() = default; + virtual ~ChangeNotifierTest() = default; + + enum class NodeState { + kUnknown = 0, + kPrimary, + kSecondary, + kStandalone, + }; + + auto& listener() const { + 
return *static_cast<Listener*>(_listener.get()); + } + + void updateSet(std::map<HostAndPort, NodeState> replicaSet) { + auto refresher = Refresher(_state); + std::set<HostAndPort> seen; + HostAndPort primary; + std::set<HostAndPort> members; + + BSONArrayBuilder arrayBuilder; + for (const auto & [ host, nodeState ] : replicaSet) { + if (nodeState == NodeState::kStandalone) { + continue; + } + + if (nodeState == NodeState::kPrimary) { + primary = host; + } + + members.insert(host); + + arrayBuilder.append(StringData(host.host())); + } + auto bsonHosts = arrayBuilder.arr(); + + auto markIsMaster = [&](auto host, bool isMaster) { + refresher.receivedIsMaster( + host, + -1, + BSON("setName" << kSetName << "ismaster" << isMaster << "secondary" << !isMaster + << "hosts" + << bsonHosts + << "ok" + << true)); + + }; + + auto markFailed = [&](auto host) { + refresher.failedHost(host, {ErrorCodes::InternalError, "Test error"}); + }; + + auto gen = listener().lastState.generation; + + NextStep ns = refresher.getNextStep(); + for (; ns.step != NextStep::DONE; ns = refresher.getNextStep()) { + ASSERT_EQUALS(ns.step, NextStep::CONTACT_HOST); + ASSERT(replicaSet.count(ns.host)); + ASSERT(!seen.count(ns.host)); + seen.insert(ns.host); + + // mock a reply + switch (replicaSet[ns.host]) { + case NodeState::kStandalone: { + markFailed(ns.host); + } break; + case NodeState::kPrimary: { + markIsMaster(ns.host, true); + } break; + case NodeState::kSecondary: { + markIsMaster(ns.host, false); + } break; + case NodeState::kUnknown: + MONGO_UNREACHABLE; + }; + } + + // Verify that the listener received the right data + if (gen != listener().lastState.generation) { + // Our State is what the notifier thinks it should be + ASSERT_EQUALS(listener().lastState.connStr, listener().getCurrentState().connStr); + ASSERT_EQUALS(listener().lastState.primary, listener().getCurrentState().primary); + ASSERT_EQUALS(listener().lastState.generation, listener().getCurrentState().generation); + + // Our 
State is what we'd expect + ASSERT_EQUALS(listener().lastState.connStr.getSetName(), kSetName); + ASSERT_EQUALS(listener().lastState.connStr.getServers().size(), members.size()); + ASSERT_EQUALS(listener().lastState.primary, primary); + } + + ASSERT_EQUALS(ns.step, NextStep::DONE); + ASSERT(ns.host.empty()); + } + +protected: + decltype(_notifier)::ListenerHandle _listener = _notifier.makeListener<Listener>(); + + std::shared_ptr<SetState> _state = makeState(basicUri); +}; + +TEST_F(ChangeNotifierTest, NotifyNominal) { + auto currentId = -1; + + // State exists. Signal: null + ASSERT_EQ(listener().lastFoundSetId, currentId); + + // Initializing the state. Signal: FoundSet + _state->init(); + ASSERT_EQ(listener().lastFoundSetId, ++currentId); + + // 'a' claims to be primary. Signal: Confirmed + updateSet({ + { + HostAndPort("a"), NodeState::kPrimary, + }, + { + HostAndPort("b"), NodeState::kSecondary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().lastConfirmedSetId, ++currentId); + + // Getting another scan with the same details. Signal: null + updateSet({ + { + HostAndPort("a"), NodeState::kPrimary, + }, + { + HostAndPort("b"), NodeState::kSecondary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().eventId, currentId); + + // Dropped. Signal: Dropped + _state->drop(); + ASSERT_EQ(listener().lastDroppedSetId, ++currentId); +} + +TEST_F(ChangeNotifierTest, NotifyElections) { + auto currentId = -1; + + // State exists. Signal: null + ASSERT_EQ(listener().lastFoundSetId, currentId); + + // Initializing the state. Signal: FoundSet + _state->init(); + ASSERT_EQ(listener().lastFoundSetId, ++currentId); + + // 'a' claims to be primary. 
Signal: ConfirmedSet + updateSet({ + { + HostAndPort("a"), NodeState::kPrimary, + }, + { + HostAndPort("b"), NodeState::kSecondary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().lastConfirmedSetId, ++currentId); + + // 'b' claims to be primary. Signal: ConfirmedSet + updateSet({ + { + HostAndPort("a"), NodeState::kSecondary, + }, + { + HostAndPort("b"), NodeState::kPrimary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().lastConfirmedSetId, ++currentId); + + // All hosts tell us that they are not primary. Signal: null + updateSet({ + { + HostAndPort("a"), NodeState::kSecondary, + }, + { + HostAndPort("b"), NodeState::kSecondary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().eventId, currentId); + + // 'a' claims to be primary again. Signal: ConfirmedSet + updateSet({ + { + HostAndPort("a"), NodeState::kPrimary, + }, + { + HostAndPort("b"), NodeState::kSecondary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().lastConfirmedSetId, ++currentId); + + // Dropped. Signal: Dropped + _state->drop(); + ASSERT_EQ(listener().lastDroppedSetId, ++currentId); +} + +TEST_F(ChangeNotifierTest, NotifyReconfig) { + auto currentId = -1; + + // State exists. Signal: null + ASSERT_EQ(listener().lastFoundSetId, currentId); + + // Initializing the state. Signal: FoundSet + _state->init(); + ASSERT_EQ(listener().lastFoundSetId, ++currentId); + + // Update the set with a full scan showing no primary. Signal: PossibleSet + updateSet({ + { + HostAndPort("a"), NodeState::kSecondary, + }, + { + HostAndPort("b"), NodeState::kSecondary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().eventId, ++currentId); + + // Mark 'a' as removed. 
Signal: null + updateSet({ + { + HostAndPort("a"), NodeState::kStandalone, + }, + { + HostAndPort("b"), NodeState::kSecondary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().eventId, currentId); + + // Discover 'd' as secondary. Signal: PossibleSet + updateSet({ + { + HostAndPort("a"), NodeState::kSecondary, + }, + { + HostAndPort("b"), NodeState::kSecondary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + { + HostAndPort("d"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().lastPossibleSetId, ++currentId); + + // Mark 'b' as primary, no 'd'. Signal: ConfirmedSet + updateSet({ + { + HostAndPort("a"), NodeState::kSecondary, + }, + { + HostAndPort("b"), NodeState::kPrimary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + { + HostAndPort("d"), NodeState::kStandalone, + }, + }); + ASSERT_EQ(listener().lastConfirmedSetId, ++currentId); + + // Mark 'a' as removed. Signal: ConfirmedSet + updateSet({ + { + HostAndPort("a"), NodeState::kStandalone, + }, + { + HostAndPort("b"), NodeState::kPrimary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().lastConfirmedSetId, ++currentId); + + // Mark 'a' as secondary again. Signal: ConfirmedSet + updateSet({ + { + HostAndPort("b"), NodeState::kPrimary, + }, + { + HostAndPort("c"), NodeState::kSecondary, + }, + { + HostAndPort("a"), NodeState::kSecondary, + }, + }); + ASSERT_EQ(listener().lastConfirmedSetId, ++currentId); + + // Dropped. 
Signal: Dropped + _state->drop(); + ASSERT_EQ(listener().lastDroppedSetId, ++currentId); +} + } // namespace } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_test_fixture.h b/src/mongo/client/replica_set_monitor_test_fixture.h index cd37e5ecdab..7e842008453 100644 --- a/src/mongo/client/replica_set_monitor_test_fixture.h +++ b/src/mongo/client/replica_set_monitor_test_fixture.h @@ -33,6 +33,7 @@ #include <type_traits> #include <vector> +#include "mongo/client/replica_set_change_notifier.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/client/replica_set_monitor_internal.h" #include "mongo/unittest/unittest.h" @@ -61,11 +62,15 @@ public: template <typename... Args> using StateIsConstructible = - std::enable_if_t<std::is_constructible_v<SetState, Args..., executor::TaskExecutor* const>>; + std::enable_if_t<std::is_constructible_v<SetState, + Args..., + ReplicaSetChangeNotifier* const, + executor::TaskExecutor* const>>; template <typename... Args, typename = StateIsConstructible<Args...>> auto makeState(Args&&... 
args) { - return std::make_shared<SetState>(std::forward<Args>(args)..., nullptr); + return std::make_shared<ReplicaSetMonitor::SetState>( + std::forward<Args>(args)..., &_notifier, nullptr); } void setUp() override {} @@ -74,6 +79,9 @@ public: static const std::vector<HostAndPort> basicSeeds; static const std::set<HostAndPort> basicSeedsSet; static const MongoURI basicUri; + +protected: + ReplicaSetChangeNotifier _notifier; }; } // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 9d8162067fc..c34fe99d4f7 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -779,12 +779,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook const auto configsvrConnStr = Grid::get(opCtx)->shardRegistry()->getConfigShard()->getConnString(); - status = ShardingInitializationMongoD::get(opCtx)->updateShardIdentityConfigString( - opCtx, configsvrConnStr); - if (!status.isOK()) { - warning() << "error encountered while trying to update config connection string to " - << configsvrConnStr << causedBy(status); - } + ShardingInitializationMongoD::get(opCtx)->updateShardIdentityConfigString(opCtx, + configsvrConnStr); CatalogCacheLoader::get(_service).onStepUp(); ChunkSplitter::get(_service).onStepUp(); diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp index 1902d884cee..7bee0b6a789 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.cpp +++ b/src/mongo/db/s/sharding_initialization_mongod.cpp @@ -67,6 +67,10 @@ #include "mongo/util/log.h" namespace mongo { + +// Failpoint for disabling updateShardIdentityConfigString calls on signaled RS nodes. 
+MONGO_FAIL_POINT_DEFINE(failUpdateShardIdentityConfigString); + namespace { const auto getInstance = ServiceContext::declareDecoration<ShardingInitializationMongoD>(); @@ -83,41 +87,73 @@ auto makeEgressHooksList(ServiceContext* service) { /** * Updates the config server field of the shardIdentity document with the given connection string if * setName is equal to the config server replica set name. - * - * NOTE: This is intended to be used on a new thread that hasn't called Client::initThread. - * - * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes - * to replica set membership. */ -void updateShardIdentityConfigStringCB(const std::string& setName, - const std::string& newConnectionString) { - auto configsvrConnStr = - Grid::get(getGlobalServiceContext())->shardRegistry()->getConfigServerConnectionString(); - if (configsvrConnStr.getSetName() != setName) { - // Ignore all change notification for other sets that are not the config server. - return; - } +class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener { +public: + ShardingReplicaSetChangeListener(ServiceContext* serviceContext) + : _serviceContext(serviceContext) {} + ~ShardingReplicaSetChangeListener() final = default; + + void onFoundSet(const Key&) final {} + + // Update the shard identy config string + void onConfirmedSet(const State& state) final { + auto connStr = state.connStr; + + auto fun = [ serviceContext = _serviceContext, connStr ](auto args) { + if (ErrorCodes::isCancelationError(args.status.code())) { + return; + } + uassertStatusOK(args.status); + + LOG(0) << "Updating config server with confirmed set " << connStr; + Grid::get(serviceContext)->shardRegistry()->updateReplSetHosts(connStr); + + if (MONGO_FAIL_POINT(failUpdateShardIdentityConfigString)) { + return; + } + + auto configsvrConnStr = + Grid::get(serviceContext)->shardRegistry()->getConfigServerConnectionString(); - 
Client::initThread("updateShardIdentityConfigConnString"); - auto uniqOpCtx = Client::getCurrent()->makeOperationContext(); + // Only proceed if the notification is for the configsvr + if (configsvrConnStr.getSetName() != connStr.getSetName()) { + return; + } + + ThreadClient tc("updateShardIdentityConfigString", serviceContext); + auto opCtx = tc->makeOperationContext(); + + ShardingInitializationMongoD::updateShardIdentityConfigString(opCtx.get(), connStr); + }; - auto status = ShardingInitializationMongoD::updateShardIdentityConfigString( - uniqOpCtx.get(), uassertStatusOK(ConnectionString::parse(newConnectionString))); - if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) { - warning() << "Error encountered while trying to update config connection string to " - << newConnectionString << causedBy(redact(status)); + auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); + auto schedStatus = executor->scheduleWork(std::move(fun)).getStatus(); + if (ErrorCodes::isCancelationError(schedStatus.code())) { + LOG(2) << "Unable to schedule confirmed set update due to " << schedStatus; + return; + } + uassertStatusOK(schedStatus); } -} + void onPossibleSet(const State& state) final { + Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr); + } + void onDroppedSet(const Key&) final {} -void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, - const ShardIdentity& shardIdentity, - StringData distLockProcessId) { +private: + ServiceContext* _serviceContext; +}; + +} // namespace + +void ShardingInitializationMongoD::initializeShardingEnvironmentOnShardServer( + OperationContext* opCtx, const ShardIdentity& shardIdentity, StringData distLockProcessId) { initializeGlobalShardingStateForMongoD( opCtx, shardIdentity.getConfigsvrConnectionString(), distLockProcessId); - ReplicaSetMonitor::setSynchronousConfigChangeHook( - &ShardRegistry::replicaSetChangeShardRegistryUpdateHook); - 
ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB); + _replicaSetChangeListener = + ReplicaSetMonitor::getNotifier().makeListener<ShardingReplicaSetChangeListener>( + opCtx->getServiceContext()); // Determine primary/secondary/standalone state in order to properly initialize sharding // components. @@ -141,10 +177,10 @@ void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, << (isStandaloneOrPrimary ? "primary" : "secondary") << " node."; } -} // namespace - ShardingInitializationMongoD::ShardingInitializationMongoD() - : _initFunc(initializeShardingEnvironmentOnShardServer) {} + : _initFunc([this](auto... args) { + this->initializeShardingEnvironmentOnShardServer(std::forward<decltype(args)>(args)...); + }) {} ShardingInitializationMongoD::~ShardingInitializationMongoD() = default; @@ -166,6 +202,7 @@ void ShardingInitializationMongoD::shutDown(OperationContext* opCtx) { grid->getExecutorPool()->shutdownAndJoin(); grid->catalogClient()->shutDown(opCtx); grid->shardRegistry()->shutdown(); + _replicaSetChangeListener.reset(); } bool ShardingInitializationMongoD::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) { @@ -306,7 +343,7 @@ void ShardingInitializationMongoD::initializeFromShardIdentity( } } -Status ShardingInitializationMongoD::updateShardIdentityConfigString( +void ShardingInitializationMongoD::updateShardIdentityConfigString( OperationContext* opCtx, const ConnectionString& newConnectionString) { BSONObj updateObj( ShardIdentityType::createConfigServerUpdateObject(newConnectionString.toString())); @@ -328,10 +365,12 @@ Status ShardingInitializationMongoD::updateShardIdentityConfigString( << newConnectionString; } } catch (const DBException& exception) { - return exception.toStatus(); + auto status = exception.toStatus(); + if (!ErrorCodes::isNotMasterError(status.code())) { + warning() << "Error encountered while trying to update config connection string to " + << 
newConnectionString.toString() << causedBy(redact(status)); + } } - - return Status::OK(); } void initializeGlobalShardingStateForMongoD(OperationContext* opCtx, diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h index e26302b4bb2..a205d68d1b2 100644 --- a/src/mongo/db/s/sharding_initialization_mongod.h +++ b/src/mongo/db/s/sharding_initialization_mongod.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/base/string_data.h" +#include "mongo/client/replica_set_change_notifier.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/s/type_shard_identity.h" #include "mongo/stdx/functional.h" @@ -60,6 +61,10 @@ public: static ShardingInitializationMongoD* get(OperationContext* opCtx); static ShardingInitializationMongoD* get(ServiceContext* service); + void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx, + const ShardIdentity& shardIdentity, + StringData distLockProcessId); + /** * If started with --shardsvr, initializes sharding awareness from the shardIdentity document on * disk, if there is one. @@ -94,8 +99,8 @@ public: * Updates the config server field of the shardIdentity document with the given connection * string. */ - static Status updateShardIdentityConfigString(OperationContext* opCtx, - const ConnectionString& newConnectionString); + static void updateShardIdentityConfigString(OperationContext* opCtx, + const ConnectionString& newConnectionString); /** * For testing only. Mock the initialization method used by initializeFromConfigConnString and @@ -112,6 +117,8 @@ private: // Function for initializing the sharding environment components (i.e. 
everything on the Grid) ShardingEnvironmentInitFunc _initFunc; + + ReplicaSetChangeListenerHandle _replicaSetChangeListener; }; /** diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp index 54b609b92f5..fd4c6b138c3 100644 --- a/src/mongo/s/client/shard_registry.cpp +++ b/src/mongo/s/client/shard_registry.cpp @@ -251,7 +251,8 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) { } ThreadClient tc("shard-registry-reload", getGlobalServiceContext()); - auto opCtx = cc().makeOperationContext(); + + auto opCtx = tc->makeOperationContext(); try { reload(opCtx.get()); @@ -341,49 +342,36 @@ bool ShardRegistry::reload(OperationContext* opCtx) { return true; } -void ShardRegistry::replicaSetChangeShardRegistryUpdateHook( - const std::string& setName, const std::string& newConnectionString) { - // Inform the ShardRegsitry of the new connection string for the shard. - auto connString = fassert(28805, ConnectionString::parse(newConnectionString)); - invariant(setName == connString.getSetName()); - Grid::get(getGlobalServiceContext())->shardRegistry()->updateReplSetHosts(connString); -} +void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext, + const ConnectionString& connStr) noexcept { + ThreadClient tc("UpdateReplicaSetOnConfigServer", serviceContext); -void ShardRegistry::replicaSetChangeConfigServerUpdateHook(const std::string& setName, - const std::string& newConnectionString) { - // This is run in it's own thread. Exceptions escaping would result in a call to terminate. 
- Client::initThread("replSetChange"); - auto opCtx = cc().makeOperationContext(); + auto opCtx = tc->makeOperationContext(); auto const grid = Grid::get(opCtx.get()); - try { - std::shared_ptr<Shard> s = grid->shardRegistry()->lookupRSName(setName); - if (!s) { - LOG(1) << "shard not found for set: " << newConnectionString - << " when attempting to inform config servers of updated set membership"; - return; - } + std::shared_ptr<Shard> s = grid->shardRegistry()->lookupRSName(connStr.getSetName()); + if (!s) { + LOG(1) << "shard not found for set: " << connStr + << " when attempting to inform config servers of updated set membership"; + return; + } - if (s->isConfig()) { - // No need to tell the config servers their own connection string. - return; - } + if (s->isConfig()) { + // No need to tell the config servers their own connection string. + return; + } - auto status = grid->catalogClient()->updateConfigDocument( - opCtx.get(), - ShardType::ConfigNS, - BSON(ShardType::name(s->getId().toString())), - BSON("$set" << BSON(ShardType::host(newConnectionString))), - false, - ShardingCatalogClient::kMajorityWriteConcern); - if (!status.isOK()) { - error() << "RSChangeWatcher: could not update config db for set: " << setName - << " to: " << newConnectionString << causedBy(status.getStatus()); - } - } catch (const std::exception& e) { - warning() << "caught exception while updating config servers: " << e.what(); - } catch (...) 
{ - warning() << "caught unknown exception while updating config servers"; + auto swWasUpdated = grid->catalogClient()->updateConfigDocument( + opCtx.get(), + ShardType::ConfigNS, + BSON(ShardType::name(s->getId().toString())), + BSON("$set" << BSON(ShardType::host(connStr.toString()))), + false, + ShardingCatalogClient::kMajorityWriteConcern); + auto status = swWasUpdated.getStatus(); + if (!status.isOK()) { + error() << "RSChangeWatcher: could not update config db with connection string " << connStr + << causedBy(redact(status)); } } diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h index 87ad1cc0523..f9707b2ae3e 100644 --- a/src/mongo/s/client/shard_registry.h +++ b/src/mongo/s/client/shard_registry.h @@ -259,22 +259,11 @@ public: void shutdown(); /** - * For use in mongos and mongod which needs notifications about changes to shard and config - * server replset membership to update the ShardRegistry. - * - * This is expected to be run in an existing thread. - */ - static void replicaSetChangeShardRegistryUpdateHook(const std::string& setName, - const std::string& newConnectionString); - - /** * For use in mongos which needs notifications about changes to shard replset membership to * update the config.shards collection. - * - * This is expected to be run in a brand new thread. */ - static void replicaSetChangeConfigServerUpdateHook(const std::string& setName, - const std::string& newConnectionString); + static void updateReplicaSetOnConfigServer(ServiceContext* serviceContex, + const ConnectionString& connStr) noexcept; private: /** diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 9a8e6ebe71a..29c490ba8b0 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -122,6 +122,9 @@ using logger::LogComponent; #define __has_feature(x) 0 #endif +// Failpoint for disabling replicaSetChangeConfigServerUpdateHook calls on signaled mongos. 
+MONGO_FAIL_POINT_DEFINE(failReplicaSetChangeConfigServerUpdateHook); + namespace { #if defined(_WIN32) @@ -341,6 +344,52 @@ void initWireSpec() { spec.isInternalClient = true; } +class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener { +public: + ShardingReplicaSetChangeListener(ServiceContext* serviceContext) + : _serviceContext(serviceContext) {} + ~ShardingReplicaSetChangeListener() final = default; + + void onFoundSet(const Key& key) final {} + + void onConfirmedSet(const State& state) final { + auto connStr = state.connStr; + + auto fun = [ serviceContext = _serviceContext, connStr ](auto args) { + if (ErrorCodes::isCancelationError(args.status.code())) { + return; + } + uassertStatusOK(args.status); + + LOG(0) << "Updating sharding state with confirmed set " << connStr; + + Grid::get(serviceContext)->shardRegistry()->updateReplSetHosts(connStr); + + if (MONGO_FAIL_POINT(failReplicaSetChangeConfigServerUpdateHook)) { + return; + } + ShardRegistry::updateReplicaSetOnConfigServer(serviceContext, connStr); + }; + + auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor(); + auto schedStatus = executor->scheduleWork(std::move(fun)).getStatus(); + if (ErrorCodes::isCancelationError(schedStatus.code())) { + LOG(2) << "Unable to schedule confirmed set update due to " << schedStatus; + return; + } + uassertStatusOK(schedStatus); + } + + void onPossibleSet(const State& state) final { + Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr); + } + + void onDroppedSet(const Key& key) final {} + +private: + ServiceContext* _serviceContext; +}; + ExitCode runMongosServer(ServiceContext* serviceContext) { Client::initThread("mongosMain"); printShardingVersionInfo(false); @@ -379,10 +428,11 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { shardConnectionPool.addHook(new ShardingConnectionHook(true, std::move(shardedHookList))); - 
ReplicaSetMonitor::setAsynchronousConfigChangeHook( - &ShardRegistry::replicaSetChangeConfigServerUpdateHook); - ReplicaSetMonitor::setSynchronousConfigChangeHook( - &ShardRegistry::replicaSetChangeShardRegistryUpdateHook); + // Hook up a Listener for changes from the ReplicaSetMonitor + // This will last for the scope of this function. i.e. until shutdown finishes + auto shardingRSCL = + ReplicaSetMonitor::getNotifier().makeListener<ShardingReplicaSetChangeListener>( + serviceContext); // Mongos connection pools already takes care of authenticating new connections so the // replica set connection shouldn't need to. |