summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/sharding/use_rsm_data_for_cs.js6
-rw-r--r--src/mongo/client/SConscript1
-rw-r--r--src/mongo/client/replica_set_change_notifier.cpp150
-rw-r--r--src/mongo/client/replica_set_change_notifier.h173
-rw-r--r--src/mongo/client/replica_set_monitor.cpp137
-rw-r--r--src/mongo/client/replica_set_monitor.h33
-rw-r--r--src/mongo/client/replica_set_monitor_internal.h22
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp4
-rw-r--r--src/mongo/client/replica_set_monitor_manager.h6
-rw-r--r--src/mongo/client/replica_set_monitor_scan_test.cpp360
-rw-r--r--src/mongo/client/replica_set_monitor_test_fixture.h12
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp8
-rw-r--r--src/mongo/db/s/sharding_initialization_mongod.cpp107
-rw-r--r--src/mongo/db/s/sharding_initialization_mongod.h11
-rw-r--r--src/mongo/s/client/shard_registry.cpp66
-rw-r--r--src/mongo/s/client/shard_registry.h15
-rw-r--r--src/mongo/s/server.cpp58
17 files changed, 956 insertions, 213 deletions
diff --git a/jstests/sharding/use_rsm_data_for_cs.js b/jstests/sharding/use_rsm_data_for_cs.js
index 0b10a0542cd..c2fafec4889 100644
--- a/jstests/sharding/use_rsm_data_for_cs.js
+++ b/jstests/sharding/use_rsm_data_for_cs.js
@@ -13,9 +13,11 @@
assert.eq(db.foo.find({_id: 1}).next().x, 1);
// prevent RSM on all nodes to update config shard
- mongos.adminCommand({configureFailPoint: "failAsyncConfigChangeHook", mode: "alwaysOn"});
+ mongos.adminCommand(
+ {configureFailPoint: "failReplicaSetChangeConfigServerUpdateHook", mode: "alwaysOn"});
rs.nodes.forEach(function(node) {
- node.adminCommand({configureFailPoint: "failAsyncConfigChangeHook", mode: "alwaysOn"});
+ node.adminCommand(
+ {configureFailPoint: "failUpdateShardIdentityConfigString", mode: "alwaysOn"});
});
// add a node to shard rs
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index ffba4798db9..d1502b181bc 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -186,6 +186,7 @@ clientDriverEnv.Library(
'dbclient_rs.cpp',
'global_conn_pool.cpp',
env.Idlc('global_conn_pool.idl')[0],
+ 'replica_set_change_notifier.cpp',
'replica_set_monitor.cpp',
'replica_set_monitor_manager.cpp',
],
diff --git a/src/mongo/client/replica_set_change_notifier.cpp b/src/mongo/client/replica_set_change_notifier.cpp
new file mode 100644
index 00000000000..9e8526979a9
--- /dev/null
+++ b/src/mongo/client/replica_set_change_notifier.cpp
@@ -0,0 +1,150 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/replica_set_change_notifier.h"
+
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
+#include "mongo/util/stacktrace.h"
+
+namespace mongo {
+
+void ReplicaSetChangeNotifier::_addListener(Listener* listener) {
+ stdx::lock_guard lk(_mutex);
+
+ listener->init(this);
+ _listeners.push_back(listener);
+}
+
+void ReplicaSetChangeNotifier::_removeListener(Listener* listener) {
+ stdx::lock_guard lk(_mutex);
+
+ auto& listeners = _listeners;
+ listeners.erase(std::remove(listeners.begin(), listeners.end(), listener), listeners.end());
+}
+
+void ReplicaSetChangeNotifier::onFoundSet(const std::string& name) {
+ LOG(2) << "Signaling found set " << name;
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ _replicaSetStates.emplace(name, State{});
+
+ auto listeners = _listeners;
+ lk.unlock();
+
+ for (auto listener : listeners) {
+ listener->onFoundSet(name);
+ };
+}
+
+void ReplicaSetChangeNotifier::onPossibleSet(ConnectionString connectionString) {
+ LOG(2) << "Signaling possible set " << connectionString;
+
+ const auto& name = connectionString.getSetName();
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ auto state = [&] {
+ auto& state = _replicaSetStates[name];
+ ++state.generation;
+
+ state.connStr = connectionString;
+ state.primary = {};
+
+ return state;
+ }();
+
+ auto listeners = _listeners;
+ lk.unlock();
+
+ for (auto listener : listeners) {
+ listener->onPossibleSet(state);
+ };
+}
+
+void ReplicaSetChangeNotifier::onConfirmedSet(ConnectionString connectionString,
+ HostAndPort primary) {
+ LOG(2) << "Signaling confirmed set " << connectionString << " with primary " << primary;
+
+ const auto& name = connectionString.getSetName();
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ auto state = [&] {
+ auto& state = _replicaSetStates[name];
+ ++state.generation;
+
+ state.connStr = connectionString;
+ state.primary = primary;
+
+ return state;
+ }();
+
+ auto listeners = _listeners;
+ lk.unlock();
+
+ for (auto listener : listeners) {
+ listener->onConfirmedSet(state);
+ };
+}
+
+void ReplicaSetChangeNotifier::onDroppedSet(const std::string& name) {
+ LOG(2) << "Signaling dropped set " << name;
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ // If we never singaled the initial possible set, we should not on dropped set
+ auto it = _replicaSetStates.find(name);
+ if (it == _replicaSetStates.end()) {
+ return;
+ }
+
+ _replicaSetStates.erase(it);
+
+ auto listeners = _listeners;
+ lk.unlock();
+
+ for (auto listener : listeners) {
+ listener->onDroppedSet(name);
+ };
+}
+
+auto ReplicaSetChangeNotifier::Listener::getCurrentState(const Key& key) -> State {
+ invariant(_notifier);
+
+ stdx::lock_guard lk(_notifier->_mutex);
+
+ return _notifier->_replicaSetStates.at(key);
+}
+
+} // namespace mongo
diff --git a/src/mongo/client/replica_set_change_notifier.h b/src/mongo/client/replica_set_change_notifier.h
new file mode 100644
index 00000000000..23427c9fd0c
--- /dev/null
+++ b/src/mongo/client/replica_set_change_notifier.h
@@ -0,0 +1,173 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "mongo/client/connection_string.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/functional.h"
+
+namespace mongo {
+
+/**
+ * A stateful notifier for events from a set of ReplicaSetMonitors
+ */
+class ReplicaSetChangeNotifier {
+public:
+ using Key = std::string;
+ class Listener;
+ using ListenerHandle = std::unique_ptr<Listener, unique_function<void(Listener*)>>;
+ struct State;
+
+public:
+ ReplicaSetChangeNotifier() = default;
+ ReplicaSetChangeNotifier(const ReplicaSetChangeNotifier&) = delete;
+ ReplicaSetChangeNotifier(ReplicaSetChangeNotifier&&) = delete;
+ ReplicaSetChangeNotifier& operator=(const ReplicaSetChangeNotifier&) = delete;
+ ReplicaSetChangeNotifier& operator=(ReplicaSetChangeNotifier&&) = delete;
+
+ /**
+ * Notify every listener that there is a new ReplicaSet and initialize the State
+ */
+ void onFoundSet(const std::string& replicaSet);
+
+ /**
+ * Notify every listener that a scan completed without finding a primary and update
+ */
+ void onPossibleSet(ConnectionString connectionString);
+
+ /**
+ * Notify every listener that a scan completed and found a new primary or config
+ */
+ void onConfirmedSet(ConnectionString connectionString, HostAndPort primary);
+
+ /**
+ * Notify every listener that a ReplicaSet is no longer in use and drop the State
+ */
+ void onDroppedSet(const std::string& replicaSet);
+
+ /**
+ * Create a listener of a given type and bind it to this notifier
+ */
+ template <typename DerivedT,
+ typename... Args,
+ typename = std::enable_if_t<std::is_constructible_v<DerivedT, Args...>>>
+ auto makeListener(Args&&... args) {
+ auto deleter = [this](auto listener) { _removeListener(listener); };
+ auto ptr = new DerivedT(std::forward<Args>(args)...);
+
+ _addListener(ptr);
+
+ return ListenerHandle(ptr, std::move(deleter));
+ }
+
+private:
+ void _addListener(Listener* listener);
+ void _removeListener(Listener* listener);
+
+ stdx::mutex _mutex;
+ std::vector<Listener*> _listeners;
+ stdx::unordered_map<Key, State> _replicaSetStates;
+};
+
+/**
+ * A listener for events from a set of ReplicaSetMonitors
+ *
+ * This class will normally be notified of events for every replica set in use in the system.
+ * The onSet functions are all called syncronously by the notifier,
+ * if your implementation would block or seriously delay execution,
+ * please schedule the majority of the work to complete asynchronously.
+ */
+class ReplicaSetChangeNotifier::Listener {
+public:
+ using Notifier = ReplicaSetChangeNotifier;
+ using Key = typename Notifier::Key;
+ using State = typename Notifier::State;
+
+public:
+ Listener(const Listener&) = delete;
+ Listener(Listener&&) = delete;
+ Listener& operator=(const Listener&) = delete;
+ Listener& operator=(Listener&&) = delete;
+
+ Listener() = default;
+ virtual ~Listener() = default;
+
+ /**
+ * Initialize this listener with a notifier
+ */
+ void init(Notifier* notifier) {
+ _notifier = notifier;
+ }
+
+ /**
+ * React to a new ReplicaSet that will soon be scanned
+ */
+ virtual void onFoundSet(const Key& key) = 0;
+
+ /**
+ * React to a finished scan that found no primary
+ */
+ virtual void onPossibleSet(const State& data) = 0;
+
+ /**
+ * React to a finished scan that found a primary
+ */
+ virtual void onConfirmedSet(const State& data) = 0;
+
+ /**
+ * React to a ReplicaSet being dropped from use
+ */
+ virtual void onDroppedSet(const Key& key) = 0;
+
+ /**
+ * Get the State as of the last signal function invoked on the Notifier
+ */
+ State getCurrentState(const Key& key);
+
+private:
+ Notifier* _notifier = nullptr;
+};
+
+using ReplicaSetChangeListenerHandle = ReplicaSetChangeNotifier::ListenerHandle;
+
+struct ReplicaSetChangeNotifier::State {
+ ConnectionString connStr;
+ HostAndPort primary;
+
+ int64_t generation = 0;
+};
+
+} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 3e0501af9a8..a1185cb519e 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -47,7 +47,6 @@
#include "mongo/db/server_options.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/thread.h"
#include "mongo/util/background.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/exit.h"
@@ -64,9 +63,6 @@ using std::set;
using std::string;
using std::vector;
-// Failpoint for disabling AsyncConfigChangeHook calls on updated RS nodes.
-MONGO_FAIL_POINT_DEFINE(failAsyncConfigChangeHook);
-
// Failpoint for changing the default refresh period
MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod);
@@ -95,10 +91,6 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn
const Milliseconds kExpeditedRefreshPeriod(500);
AtomicWord<bool> areRefreshRetriesDisabledForTest{false}; // Only true in tests.
-// TODO: Move to ReplicaSetMonitorManager
-ReplicaSetMonitor::ConfigChangeHook asyncConfigChangeHook;
-ReplicaSetMonitor::ConfigChangeHook syncConfigChangeHook;
-
//
// Helpers for stl algorithms
//
@@ -190,7 +182,8 @@ Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() {
ReplicaSetMonitor::ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {}
ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri)
- : ReplicaSetMonitor(std::make_shared<SetState>(uri, globalRSMonitorManager.getExecutor())) {}
+ : ReplicaSetMonitor(std::make_shared<SetState>(
+ uri, &globalRSMonitorManager.getNotifier(), globalRSMonitorManager.getExecutor())) {}
void ReplicaSetMonitor::init() {
if (areRefreshRetriesDisabledForTest.load()) {
@@ -198,25 +191,15 @@ void ReplicaSetMonitor::init() {
warning() << "*** Not starting background refresh because refresh retries are disabled.";
return;
}
+
+ _state->init();
+
stdx::lock_guard<stdx::mutex> lk(_state->mutex);
_scheduleRefresh(_state->now(), lk);
}
ReplicaSetMonitor::~ReplicaSetMonitor() {
- // need this lock because otherwise can get race with _scheduleRefresh()
- stdx::lock_guard<stdx::mutex> lk(_state->mutex);
- if (!_refresherHandle || !_state->executor) {
- return;
- }
-
- _state->currentScan.reset();
- _state->executor->cancel(_refresherHandle);
- // Note: calling _executor->wait(_refresherHandle); from the dispatcher thread will cause hang
- // Its ok not to call it because the d-tor is called only when the last owning pointer goes out
- // of scope, so as taskExecutor queue holds a weak pointer to RSM it will not be able to get a
- // task to execute eliminating the need to call method "wait".
- //
- _refresherHandle = {};
+ _state->drop();
}
void ReplicaSetMonitor::_scheduleRefresh(Date_t when, WithLock) {
@@ -367,7 +350,9 @@ std::string ReplicaSetMonitor::getName() const {
std::string ReplicaSetMonitor::getServerAddress() const {
stdx::lock_guard<stdx::mutex> lk(_state->mutex);
- return _state->getConfirmedServerAddress();
+ // We return our setUri until first confirmation
+ return _state->seedConnStr.isValid() ? _state->seedConnStr.toString()
+ : _state->setUri.connectionString().toString();
}
const MongoURI& ReplicaSetMonitor::getOriginalUri() const {
@@ -402,14 +387,8 @@ void ReplicaSetMonitor::remove(const string& name) {
globalConnPool.removeHost(name);
}
-void ReplicaSetMonitor::setAsynchronousConfigChangeHook(ConfigChangeHook hook) {
- invariant(!asyncConfigChangeHook);
- asyncConfigChangeHook = hook;
-}
-
-void ReplicaSetMonitor::setSynchronousConfigChangeHook(ConfigChangeHook hook) {
- invariant(!syncConfigChangeHook);
- syncConfigChangeHook = hook;
+ReplicaSetChangeNotifier& ReplicaSetMonitor::getNotifier() {
+ return globalRSMonitorManager.getNotifier();
}
// TODO move to correct order with non-statics before pushing
@@ -452,8 +431,6 @@ void ReplicaSetMonitor::shutdown() {
void ReplicaSetMonitor::cleanup() {
globalRSMonitorManager.removeAllMonitors();
- asyncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
- syncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
}
void ReplicaSetMonitor::disableRefreshRetries_forTest() {
@@ -605,20 +582,20 @@ Refresher::NextStep Refresher::getNextStep() {
// NOTE: we don't modify seedNodes or notify about set membership change in this
// case since it hasn't been confirmed by a master.
- const string oldAddr = _set->getUnconfirmedServerAddress();
for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
it != _scan->unconfirmedReplies.end();
++it) {
_set->findOrCreateNode(it->first)->update(it->second);
}
- const string newAddr = _set->getUnconfirmedServerAddress();
- if (oldAddr != newAddr && syncConfigChangeHook) {
- // Run the syncConfigChangeHook because the ShardRegistry needs to know about any
- // node we might talk to. Don't run the asyncConfigChangeHook because we don't
- // want to update the seed list stored on the config servers with unconfirmed hosts.
- syncConfigChangeHook(_set->name, _set->getUnconfirmedServerAddress());
+ auto connStr = _set->possibleConnectionString();
+ if (connStr != _set->workingConnStr) {
+ _set->workingConnStr = std::move(connStr);
+ _set->notifier->onPossibleSet(_set->workingConnStr);
}
+
+ // If at some point we care about lacking a primary, on it here
+ _set->lastSeenMaster = {};
}
if (_scan->foundAnyUpNodes) {
@@ -833,26 +810,23 @@ Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMa
}
}
- if (reply.normalHosts != _set->seedNodes) {
- const string oldAddr = _set->getConfirmedServerAddress();
+ bool changedHosts = reply.normalHosts != _set->seedNodes;
+ bool changedPrimary = reply.host != _set->lastSeenMaster;
+ if (changedHosts || changedPrimary) {
+ ++_set->seedGen;
_set->seedNodes = reply.normalHosts;
+ _set->seedConnStr = _set->confirmedConnectionString();
// LogLevel can be pretty low, since replica set reconfiguration should be pretty rare
// and we want to record our changes
- log() << "changing hosts to " << _set->getConfirmedServerAddress() << " from " << oldAddr;
-
- if (syncConfigChangeHook) {
- syncConfigChangeHook(_set->name, _set->getConfirmedServerAddress());
- }
+ log() << "Confirmed replica set for " << _set->name << " is " << _set->seedConnStr;
- if (asyncConfigChangeHook && !MONGO_FAIL_POINT(failAsyncConfigChangeHook)) {
- // call from a separate thread to avoid blocking and holding lock while potentially
- // going over the network
- stdx::thread bg(asyncConfigChangeHook, _set->name, _set->getConfirmedServerAddress());
- bg.detach();
- }
+ _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host);
}
+ // Update our working string
+ _set->workingConnStr = _set->seedConnStr;
+
// Update other nodes's information based on replies we've already seen
for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
it != _scan->unconfirmedReplies.end();
@@ -870,7 +844,6 @@ Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMa
void Refresher::receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply) {
invariant(!reply.isMaster);
- // This function doesn't alter _set at all. It only modifies the work queue in _scan.
// Add everyone this host claims is in the set to possibleNodes.
_scan->possibleNodes.insert(reply.normalHosts.begin(), reply.normalHosts.end());
@@ -1013,9 +986,12 @@ void Node::update(const IsMasterReply& reply) {
lastWriteDateUpdateTime = Date_t::now();
}
-SetState::SetState(const MongoURI& uri, executor::TaskExecutor* executor)
+SetState::SetState(const MongoURI& uri,
+ ReplicaSetChangeNotifier* notifier,
+ executor::TaskExecutor* executor)
: setUri(std::move(uri)),
name(setUri.getSetName()),
+ notifier(notifier),
executor(executor),
seedNodes(setUri.getServers().begin(), setUri.getServers().end()),
latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)),
@@ -1223,33 +1199,21 @@ void SetState::updateNodeIfInNodes(const IsMasterReply& reply) {
node->update(reply);
}
-std::string SetState::getConfirmedServerAddress() const {
- StringBuilder ss;
- if (!name.empty())
- ss << name << "/";
-
- for (std::set<HostAndPort>::const_iterator it = seedNodes.begin(); it != seedNodes.end();
- ++it) {
- if (it != seedNodes.begin())
- ss << ",";
- it->append(ss);
- }
+ConnectionString SetState::confirmedConnectionString() const {
+ std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes));
- return ss.str();
+ return ConnectionString::forReplicaSet(name, std::move(hosts));
}
-std::string SetState::getUnconfirmedServerAddress() const {
- StringBuilder ss;
- if (!name.empty())
- ss << name << "/";
+ConnectionString SetState::possibleConnectionString() const {
+ std::vector<HostAndPort> hosts;
+ hosts.reserve(nodes.size());
- for (std::vector<Node>::const_iterator it = nodes.begin(); it != nodes.end(); ++it) {
- if (it != nodes.begin())
- ss << ",";
- it->host.append(ss);
+ for (auto& node : nodes) {
+ hosts.push_back(node.host);
}
- return ss.str();
+ return ConnectionString::forReplicaSet(name, std::move(hosts));
}
void SetState::notify(bool finishedScan) {
@@ -1288,6 +1252,23 @@ Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criter
<< name);
}
+void SetState::init() {
+ notifier->onFoundSet(name);
+}
+
+void SetState::drop() {
+ {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ currentScan.reset();
+ notify(/*finishedScan*/ true);
+ }
+
+ // No point in notifying if we never started
+ if (workingConnStr.isValid()) {
+ notifier->onDroppedSet(name);
+ }
+}
+
void SetState::checkInvariants() const {
bool foundMaster = false;
for (size_t i = 0; i < nodes.size(); i++) {
@@ -1303,7 +1284,7 @@ void SetState::checkInvariants() const {
foundMaster = true;
// if we have a master it should be the same as lastSeenMaster
- invariant(nodes[i].host == lastSeenMaster);
+ invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster);
}
// should never end up with negative latencies
diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h
index 6f0a9d6ddb2..a306b1b9547 100644
--- a/src/mongo/client/replica_set_monitor.h
+++ b/src/mongo/client/replica_set_monitor.h
@@ -37,6 +37,7 @@
#include "mongo/base/string_data.h"
#include "mongo/client/mongo_uri.h"
+#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/functional.h"
@@ -62,16 +63,9 @@ class ReplicaSetMonitor : public std::enable_shared_from_this<ReplicaSetMonitor>
public:
class Refresher;
- typedef stdx::function<void(const std::string& setName, const std::string& newConnectionString)>
- ConfigChangeHook;
-
/**
- * Initializes local state.
- *
- * seeds must not be empty.
+ * Initializes local state from a MongoURI.
*/
- ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds);
-
ReplicaSetMonitor(const MongoURI& uri);
/**
@@ -198,27 +192,9 @@ public:
static void remove(const std::string& name);
/**
- * Sets the hook to be called whenever the config of any replica set changes.
- * Currently only 1 globally, so this asserts if one already exists.
- *
- * The hook will be called from a fresh thread. It is responsible for initializing any
- * thread-local state and ensuring that no exceptions escape.
- *
- * The hook must not be changed while the program has multiple threads.
- */
- static void setAsynchronousConfigChangeHook(ConfigChangeHook hook);
-
- /**
- * Sets the hook to be called whenever the config of any replica set changes.
- * Currently only 1 globally, so this asserts if one already exists.
- *
- * The hook will be called inline while refreshing the ReplicaSetMonitor's view of the set
- * membership. It is important that the hook not block for long as it will be running under
- * the ReplicaSetMonitor's mutex.
- *
- * The hook must not be changed while the program has multiple threads.
+ * Returns the change notifier for the underlying ReplicaMonitorManager
*/
- static void setSynchronousConfigChangeHook(ConfigChangeHook hook);
+ static ReplicaSetChangeNotifier& getNotifier();
/**
* Permanently stops all monitoring on replica sets and clears all cached information
@@ -384,7 +360,6 @@ private:
/**
* Adjusts the _scan work queue based on information from this host.
* This should only be called with replies from non-masters.
- * Does not update _set at all.
*/
void receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply);
diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h
index c7f72ae3671..04479b6dab9 100644
--- a/src/mongo/client/replica_set_monitor_internal.h
+++ b/src/mongo/client/replica_set_monitor_internal.h
@@ -145,7 +145,7 @@ public:
Promise<HostAndPort> promise;
};
- SetState(const MongoURI& uri, executor::TaskExecutor*);
+ SetState(const MongoURI& uri, ReplicaSetChangeNotifier*, executor::TaskExecutor*);
bool isUsable() const;
@@ -173,13 +173,13 @@ public:
* Returns the connection string of the nodes that are known the be in the set because we've
* seen them in the isMaster reply of a PRIMARY.
*/
- std::string getConfirmedServerAddress() const;
+ ConnectionString confirmedConnectionString() const;
/**
* Returns the connection string of the nodes that are believed to be in the set because we've
* seen them in the isMaster reply of non-PRIMARY nodes in our seed list.
*/
- std::string getUnconfirmedServerAddress() const;
+ ConnectionString possibleConnectionString() const;
/**
* Call this to notify waiters after a scan processes a valid reply or finishes.
@@ -193,6 +193,16 @@ public:
Status makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const;
/**
+ * Notifies all listeners that the ReplicaSet is in use.
+ */
+ void init();
+
+ /**
+ * Resets the current scan and notifies all listeners that the ReplicaSet isn't in use.
+ */
+ void drop();
+
+ /**
* Before unlocking, do DEV checkInvariants();
*/
void checkInvariants() const;
@@ -200,17 +210,21 @@ public:
const MongoURI setUri; // URI passed to ctor -- THIS IS NOT UPDATED BY SCANS
const std::string name;
+ ReplicaSetChangeNotifier* const notifier;
executor::TaskExecutor* const executor;
stdx::mutex mutex; // You must hold this to access any member below.
// For starting scans
std::set<HostAndPort> seedNodes; // updated whenever a master reports set membership changes
+ ConnectionString seedConnStr; // The connection string from the last time we had valid seeds
+ int64_t seedGen = 0;
bool isMocked = false; // True if this set is using nodes from MockReplicaSet
// For tracking scans
- HostAndPort lastSeenMaster; // Empty if we have never seen a master
+ // lastSeenMaster is empty if we have never seen a master or the last scan didn't have one
+ HostAndPort lastSeenMaster;
int consecutiveFailedScans = 0;
Nodes nodes; // maintained sorted and unique by host
ConnectionString workingConnStr; // The connection string from our last scan
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index c2b05cc61ff..f8f6078b23a 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -224,4 +224,8 @@ TaskExecutor* ReplicaSetMonitorManager::getExecutor() {
return _taskExecutor.get();
}
+ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() {
+ return _notifier;
+}
+
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h
index ed92bc3b28c..1adf42fb16d 100644
--- a/src/mongo/client/replica_set_monitor_manager.h
+++ b/src/mongo/client/replica_set_monitor_manager.h
@@ -32,6 +32,7 @@
#include <string>
#include <vector>
+#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/executor/task_executor.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/string_map.h"
@@ -94,6 +95,8 @@ public:
*/
executor::TaskExecutor* getExecutor();
+ ReplicaSetChangeNotifier& getNotifier();
+
private:
using ReplicaSetMonitorsMap = StringMap<std::weak_ptr<ReplicaSetMonitor>>;
@@ -103,6 +106,9 @@ private:
// Executor for monitoring replica sets.
std::unique_ptr<executor::TaskExecutor> _taskExecutor;
+ // Widget to notify listeners when a RSM notices a change
+ ReplicaSetChangeNotifier _notifier;
+
// Needs to be after `_taskExecutor`, so that it will be destroyed before the `_taskExecutor`.
ReplicaSetMonitorsMap _monitors;
diff --git a/src/mongo/client/replica_set_monitor_scan_test.cpp b/src/mongo/client/replica_set_monitor_scan_test.cpp
index 4b4f3f4b12f..2f2cebf556a 100644
--- a/src/mongo/client/replica_set_monitor_scan_test.cpp
+++ b/src/mongo/client/replica_set_monitor_scan_test.cpp
@@ -1613,5 +1613,365 @@ TEST_F(MinOpTimeTest, MinOpTimeIgnored) {
ASSERT_EQUALS(notStale.host(), "c");
}
+// -- ReplicaSetChangeNotifier/Listener tests --
+
+class Listener : public ReplicaSetChangeNotifier::Listener {
+public:
+ void logEvent(StringData name, const Key& key) {
+ log() << name << ": " << key;
+ }
+ void logEvent(StringData name, const State& state) {
+ log() << name << ": "
+ << "(" << state.generation << ") " << state.connStr << " | " << state.primary;
+ }
+
+ void onFoundSet(const Key& key) override {
+ lastFoundSetId = ++eventId;
+ logEvent("FoundSet", key);
+ }
+ void onPossibleSet(const State& state) override {
+ lastPossibleSetId = ++eventId;
+ logEvent("PossibleSet", state);
+ lastState = state;
+ }
+ void onConfirmedSet(const State& state) override {
+ lastConfirmedSetId = ++eventId;
+ logEvent("ConfirmedSet", state);
+ lastState = state;
+ }
+ void onDroppedSet(const Key& key) override {
+ lastDroppedSetId = ++eventId;
+ logEvent("DroppedSet", key);
+ }
+
+ int64_t eventId = -1;
+ int64_t lastFoundSetId = -1;
+ int64_t lastPossibleSetId = -1;
+ int64_t lastConfirmedSetId = -1;
+ int64_t lastDroppedSetId = -1;
+ State lastState;
+};
+
+class ChangeNotifierTest : public ReplicaSetMonitorTest {
+public:
+ ChangeNotifierTest() = default;
+ virtual ~ChangeNotifierTest() = default;
+
+ enum class NodeState {
+ kUnknown = 0,
+ kPrimary,
+ kSecondary,
+ kStandalone,
+ };
+
+ auto& listener() const {
+ return *static_cast<Listener*>(_listener.get());
+ }
+
+ void updateSet(std::map<HostAndPort, NodeState> replicaSet) {
+ auto refresher = Refresher(_state);
+ std::set<HostAndPort> seen;
+ HostAndPort primary;
+ std::set<HostAndPort> members;
+
+ BSONArrayBuilder arrayBuilder;
+ for (const auto & [ host, nodeState ] : replicaSet) {
+ if (nodeState == NodeState::kStandalone) {
+ continue;
+ }
+
+ if (nodeState == NodeState::kPrimary) {
+ primary = host;
+ }
+
+ members.insert(host);
+
+ arrayBuilder.append(StringData(host.host()));
+ }
+ auto bsonHosts = arrayBuilder.arr();
+
+ auto markIsMaster = [&](auto host, bool isMaster) {
+ refresher.receivedIsMaster(
+ host,
+ -1,
+ BSON("setName" << kSetName << "ismaster" << isMaster << "secondary" << !isMaster
+ << "hosts"
+ << bsonHosts
+ << "ok"
+ << true));
+
+ };
+
+ auto markFailed = [&](auto host) {
+ refresher.failedHost(host, {ErrorCodes::InternalError, "Test error"});
+ };
+
+ auto gen = listener().lastState.generation;
+
+ NextStep ns = refresher.getNextStep();
+ for (; ns.step != NextStep::DONE; ns = refresher.getNextStep()) {
+ ASSERT_EQUALS(ns.step, NextStep::CONTACT_HOST);
+ ASSERT(replicaSet.count(ns.host));
+ ASSERT(!seen.count(ns.host));
+ seen.insert(ns.host);
+
+ // mock a reply
+ switch (replicaSet[ns.host]) {
+ case NodeState::kStandalone: {
+ markFailed(ns.host);
+ } break;
+ case NodeState::kPrimary: {
+ markIsMaster(ns.host, true);
+ } break;
+ case NodeState::kSecondary: {
+ markIsMaster(ns.host, false);
+ } break;
+ case NodeState::kUnknown:
+ MONGO_UNREACHABLE;
+ };
+ }
+
+ // Verify that the listener received the right data
+ if (gen != listener().lastState.generation) {
+ // Our State is what the notifier thinks it should be
+ ASSERT_EQUALS(listener().lastState.connStr, listener().getCurrentState().connStr);
+ ASSERT_EQUALS(listener().lastState.primary, listener().getCurrentState().primary);
+ ASSERT_EQUALS(listener().lastState.generation, listener().getCurrentState().generation);
+
+ // Our State is what we'd expect
+ ASSERT_EQUALS(listener().lastState.connStr.getSetName(), kSetName);
+ ASSERT_EQUALS(listener().lastState.connStr.getServers().size(), members.size());
+ ASSERT_EQUALS(listener().lastState.primary, primary);
+ }
+
+ ASSERT_EQUALS(ns.step, NextStep::DONE);
+ ASSERT(ns.host.empty());
+ }
+
+protected:
+ decltype(_notifier)::ListenerHandle _listener = _notifier.makeListener<Listener>();
+
+ std::shared_ptr<SetState> _state = makeState(basicUri);
+};
+
+TEST_F(ChangeNotifierTest, NotifyNominal) {
+ auto currentId = -1;
+
+ // State exists. Signal: null
+ ASSERT_EQ(listener().lastFoundSetId, currentId);
+
+ // Initializing the state. Signal: FoundSet
+ _state->init();
+ ASSERT_EQ(listener().lastFoundSetId, ++currentId);
+
+ // 'a' claims to be primary. Signal: Confirmed
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Getting another scan with the same details. Signal: null
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().eventId, currentId);
+
+ // Dropped. Signal: Dropped
+ _state->drop();
+ ASSERT_EQ(listener().lastDroppedSetId, ++currentId);
+}
+
+TEST_F(ChangeNotifierTest, NotifyElections) {
+ auto currentId = -1;
+
+ // State exists. Signal: null
+ ASSERT_EQ(listener().lastFoundSetId, currentId);
+
+ // Initializing the state. Signal: FoundSet
+ _state->init();
+ ASSERT_EQ(listener().lastFoundSetId, ++currentId);
+
+ // 'a' claims to be primary. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // 'b' claims to be primary. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // All hosts tell us that they are not primary. Signal: null
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().eventId, currentId);
+
+ // 'a' claims to be primary again. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Dropped. Signal: Dropped
+ _state->drop();
+ ASSERT_EQ(listener().lastDroppedSetId, ++currentId);
+}
+
+TEST_F(ChangeNotifierTest, NotifyReconfig) {
+ auto currentId = -1;
+
+ // State exists. Signal: null
+ ASSERT_EQ(listener().lastFoundSetId, currentId);
+
+ // Initializing the state. Signal: FoundSet
+ _state->init();
+ ASSERT_EQ(listener().lastFoundSetId, ++currentId);
+
+ // Update the set with a full scan showing no primary. Signal: PossibleSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().eventId, ++currentId);
+
+ // Mark 'a' as removed. Signal: null
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kStandalone,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().eventId, currentId);
+
+ // Discover 'd' as secondary. Signal: PossibleSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("d"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastPossibleSetId, ++currentId);
+
+ // Mark 'b' as primary, no 'd'. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("d"), NodeState::kStandalone,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Mark 'a' as removed. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kStandalone,
+ },
+ {
+ HostAndPort("b"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Mark 'a' as secondary again. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("b"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Dropped. Signal: Dropped
+ _state->drop();
+ ASSERT_EQ(listener().lastDroppedSetId, ++currentId);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_test_fixture.h b/src/mongo/client/replica_set_monitor_test_fixture.h
index cd37e5ecdab..7e842008453 100644
--- a/src/mongo/client/replica_set_monitor_test_fixture.h
+++ b/src/mongo/client/replica_set_monitor_test_fixture.h
@@ -33,6 +33,7 @@
#include <type_traits>
#include <vector>
+#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/client/replica_set_monitor_internal.h"
#include "mongo/unittest/unittest.h"
@@ -61,11 +62,15 @@ public:
template <typename... Args>
using StateIsConstructible =
- std::enable_if_t<std::is_constructible_v<SetState, Args..., executor::TaskExecutor* const>>;
+ std::enable_if_t<std::is_constructible_v<SetState,
+ Args...,
+ ReplicaSetChangeNotifier* const,
+ executor::TaskExecutor* const>>;
template <typename... Args, typename = StateIsConstructible<Args...>>
auto makeState(Args&&... args) {
- return std::make_shared<SetState>(std::forward<Args>(args)..., nullptr);
+ return std::make_shared<ReplicaSetMonitor::SetState>(
+ std::forward<Args>(args)..., &_notifier, nullptr);
}
void setUp() override {}
@@ -74,6 +79,9 @@ public:
static const std::vector<HostAndPort> basicSeeds;
static const std::set<HostAndPort> basicSeedsSet;
static const MongoURI basicUri;
+
+protected:
+ ReplicaSetChangeNotifier _notifier;
};
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 9d8162067fc..c34fe99d4f7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -779,12 +779,8 @@ void ReplicationCoordinatorExternalStateImpl::_shardingOnTransitionToPrimaryHook
const auto configsvrConnStr =
Grid::get(opCtx)->shardRegistry()->getConfigShard()->getConnString();
- status = ShardingInitializationMongoD::get(opCtx)->updateShardIdentityConfigString(
- opCtx, configsvrConnStr);
- if (!status.isOK()) {
- warning() << "error encountered while trying to update config connection string to "
- << configsvrConnStr << causedBy(status);
- }
+ ShardingInitializationMongoD::get(opCtx)->updateShardIdentityConfigString(opCtx,
+ configsvrConnStr);
CatalogCacheLoader::get(_service).onStepUp();
ChunkSplitter::get(_service).onStepUp();
diff --git a/src/mongo/db/s/sharding_initialization_mongod.cpp b/src/mongo/db/s/sharding_initialization_mongod.cpp
index 1902d884cee..7bee0b6a789 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.cpp
+++ b/src/mongo/db/s/sharding_initialization_mongod.cpp
@@ -67,6 +67,10 @@
#include "mongo/util/log.h"
namespace mongo {
+
+// Failpoint for disabling updateShardIdentityConfigString calls on signaled RS nodes.
+MONGO_FAIL_POINT_DEFINE(failUpdateShardIdentityConfigString);
+
namespace {
const auto getInstance = ServiceContext::declareDecoration<ShardingInitializationMongoD>();
@@ -83,41 +87,73 @@ auto makeEgressHooksList(ServiceContext* service) {
/**
* Updates the config server field of the shardIdentity document with the given connection string if
* setName is equal to the config server replica set name.
- *
- * NOTE: This is intended to be used on a new thread that hasn't called Client::initThread.
- *
- * One example use case is for the ReplicaSetMonitor asynchronous callback when it detects changes
- * to replica set membership.
*/
-void updateShardIdentityConfigStringCB(const std::string& setName,
- const std::string& newConnectionString) {
- auto configsvrConnStr =
- Grid::get(getGlobalServiceContext())->shardRegistry()->getConfigServerConnectionString();
- if (configsvrConnStr.getSetName() != setName) {
- // Ignore all change notification for other sets that are not the config server.
- return;
- }
+class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener {
+public:
+ ShardingReplicaSetChangeListener(ServiceContext* serviceContext)
+ : _serviceContext(serviceContext) {}
+ ~ShardingReplicaSetChangeListener() final = default;
+
+ void onFoundSet(const Key&) final {}
+
+ // Update the shard identy config string
+ void onConfirmedSet(const State& state) final {
+ auto connStr = state.connStr;
+
+ auto fun = [ serviceContext = _serviceContext, connStr ](auto args) {
+ if (ErrorCodes::isCancelationError(args.status.code())) {
+ return;
+ }
+ uassertStatusOK(args.status);
+
+ LOG(0) << "Updating config server with confirmed set " << connStr;
+ Grid::get(serviceContext)->shardRegistry()->updateReplSetHosts(connStr);
+
+ if (MONGO_FAIL_POINT(failUpdateShardIdentityConfigString)) {
+ return;
+ }
+
+ auto configsvrConnStr =
+ Grid::get(serviceContext)->shardRegistry()->getConfigServerConnectionString();
- Client::initThread("updateShardIdentityConfigConnString");
- auto uniqOpCtx = Client::getCurrent()->makeOperationContext();
+ // Only proceed if the notification is for the configsvr
+ if (configsvrConnStr.getSetName() != connStr.getSetName()) {
+ return;
+ }
+
+ ThreadClient tc("updateShardIdentityConfigString", serviceContext);
+ auto opCtx = tc->makeOperationContext();
+
+ ShardingInitializationMongoD::updateShardIdentityConfigString(opCtx.get(), connStr);
+ };
- auto status = ShardingInitializationMongoD::updateShardIdentityConfigString(
- uniqOpCtx.get(), uassertStatusOK(ConnectionString::parse(newConnectionString)));
- if (!status.isOK() && !ErrorCodes::isNotMasterError(status.code())) {
- warning() << "Error encountered while trying to update config connection string to "
- << newConnectionString << causedBy(redact(status));
+ auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+ auto schedStatus = executor->scheduleWork(std::move(fun)).getStatus();
+ if (ErrorCodes::isCancelationError(schedStatus.code())) {
+ LOG(2) << "Unable to schedule confirmed set update due to " << schedStatus;
+ return;
+ }
+ uassertStatusOK(schedStatus);
}
-}
+ void onPossibleSet(const State& state) final {
+ Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr);
+ }
+ void onDroppedSet(const Key&) final {}
-void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx,
- const ShardIdentity& shardIdentity,
- StringData distLockProcessId) {
+private:
+ ServiceContext* _serviceContext;
+};
+
+} // namespace
+
+void ShardingInitializationMongoD::initializeShardingEnvironmentOnShardServer(
+ OperationContext* opCtx, const ShardIdentity& shardIdentity, StringData distLockProcessId) {
initializeGlobalShardingStateForMongoD(
opCtx, shardIdentity.getConfigsvrConnectionString(), distLockProcessId);
- ReplicaSetMonitor::setSynchronousConfigChangeHook(
- &ShardRegistry::replicaSetChangeShardRegistryUpdateHook);
- ReplicaSetMonitor::setAsynchronousConfigChangeHook(&updateShardIdentityConfigStringCB);
+ _replicaSetChangeListener =
+ ReplicaSetMonitor::getNotifier().makeListener<ShardingReplicaSetChangeListener>(
+ opCtx->getServiceContext());
// Determine primary/secondary/standalone state in order to properly initialize sharding
// components.
@@ -141,10 +177,10 @@ void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx,
<< (isStandaloneOrPrimary ? "primary" : "secondary") << " node.";
}
-} // namespace
-
ShardingInitializationMongoD::ShardingInitializationMongoD()
- : _initFunc(initializeShardingEnvironmentOnShardServer) {}
+ : _initFunc([this](auto... args) {
+ this->initializeShardingEnvironmentOnShardServer(std::forward<decltype(args)>(args)...);
+ }) {}
ShardingInitializationMongoD::~ShardingInitializationMongoD() = default;
@@ -166,6 +202,7 @@ void ShardingInitializationMongoD::shutDown(OperationContext* opCtx) {
grid->getExecutorPool()->shutdownAndJoin();
grid->catalogClient()->shutDown(opCtx);
grid->shardRegistry()->shutdown();
+ _replicaSetChangeListener.reset();
}
bool ShardingInitializationMongoD::initializeShardingAwarenessIfNeeded(OperationContext* opCtx) {
@@ -306,7 +343,7 @@ void ShardingInitializationMongoD::initializeFromShardIdentity(
}
}
-Status ShardingInitializationMongoD::updateShardIdentityConfigString(
+void ShardingInitializationMongoD::updateShardIdentityConfigString(
OperationContext* opCtx, const ConnectionString& newConnectionString) {
BSONObj updateObj(
ShardIdentityType::createConfigServerUpdateObject(newConnectionString.toString()));
@@ -328,10 +365,12 @@ Status ShardingInitializationMongoD::updateShardIdentityConfigString(
<< newConnectionString;
}
} catch (const DBException& exception) {
- return exception.toStatus();
+ auto status = exception.toStatus();
+ if (!ErrorCodes::isNotMasterError(status.code())) {
+ warning() << "Error encountered while trying to update config connection string to "
+ << newConnectionString.toString() << causedBy(redact(status));
+ }
}
-
- return Status::OK();
}
void initializeGlobalShardingStateForMongoD(OperationContext* opCtx,
diff --git a/src/mongo/db/s/sharding_initialization_mongod.h b/src/mongo/db/s/sharding_initialization_mongod.h
index e26302b4bb2..a205d68d1b2 100644
--- a/src/mongo/db/s/sharding_initialization_mongod.h
+++ b/src/mongo/db/s/sharding_initialization_mongod.h
@@ -30,6 +30,7 @@
#pragma once
#include "mongo/base/string_data.h"
+#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/type_shard_identity.h"
#include "mongo/stdx/functional.h"
@@ -60,6 +61,10 @@ public:
static ShardingInitializationMongoD* get(OperationContext* opCtx);
static ShardingInitializationMongoD* get(ServiceContext* service);
+ void initializeShardingEnvironmentOnShardServer(OperationContext* opCtx,
+ const ShardIdentity& shardIdentity,
+ StringData distLockProcessId);
+
/**
* If started with --shardsvr, initializes sharding awareness from the shardIdentity document on
* disk, if there is one.
@@ -94,8 +99,8 @@ public:
* Updates the config server field of the shardIdentity document with the given connection
* string.
*/
- static Status updateShardIdentityConfigString(OperationContext* opCtx,
- const ConnectionString& newConnectionString);
+ static void updateShardIdentityConfigString(OperationContext* opCtx,
+ const ConnectionString& newConnectionString);
/**
* For testing only. Mock the initialization method used by initializeFromConfigConnString and
@@ -112,6 +117,8 @@ private:
// Function for initializing the sharding environment components (i.e. everything on the Grid)
ShardingEnvironmentInitFunc _initFunc;
+
+ ReplicaSetChangeListenerHandle _replicaSetChangeListener;
};
/**
diff --git a/src/mongo/s/client/shard_registry.cpp b/src/mongo/s/client/shard_registry.cpp
index 54b609b92f5..fd4c6b138c3 100644
--- a/src/mongo/s/client/shard_registry.cpp
+++ b/src/mongo/s/client/shard_registry.cpp
@@ -251,7 +251,8 @@ void ShardRegistry::_internalReload(const CallbackArgs& cbArgs) {
}
ThreadClient tc("shard-registry-reload", getGlobalServiceContext());
- auto opCtx = cc().makeOperationContext();
+
+ auto opCtx = tc->makeOperationContext();
try {
reload(opCtx.get());
@@ -341,49 +342,36 @@ bool ShardRegistry::reload(OperationContext* opCtx) {
return true;
}
-void ShardRegistry::replicaSetChangeShardRegistryUpdateHook(
- const std::string& setName, const std::string& newConnectionString) {
- // Inform the ShardRegsitry of the new connection string for the shard.
- auto connString = fassert(28805, ConnectionString::parse(newConnectionString));
- invariant(setName == connString.getSetName());
- Grid::get(getGlobalServiceContext())->shardRegistry()->updateReplSetHosts(connString);
-}
+void ShardRegistry::updateReplicaSetOnConfigServer(ServiceContext* serviceContext,
+ const ConnectionString& connStr) noexcept {
+ ThreadClient tc("UpdateReplicaSetOnConfigServer", serviceContext);
-void ShardRegistry::replicaSetChangeConfigServerUpdateHook(const std::string& setName,
- const std::string& newConnectionString) {
- // This is run in it's own thread. Exceptions escaping would result in a call to terminate.
- Client::initThread("replSetChange");
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = tc->makeOperationContext();
auto const grid = Grid::get(opCtx.get());
- try {
- std::shared_ptr<Shard> s = grid->shardRegistry()->lookupRSName(setName);
- if (!s) {
- LOG(1) << "shard not found for set: " << newConnectionString
- << " when attempting to inform config servers of updated set membership";
- return;
- }
+ std::shared_ptr<Shard> s = grid->shardRegistry()->lookupRSName(connStr.getSetName());
+ if (!s) {
+ LOG(1) << "shard not found for set: " << connStr
+ << " when attempting to inform config servers of updated set membership";
+ return;
+ }
- if (s->isConfig()) {
- // No need to tell the config servers their own connection string.
- return;
- }
+ if (s->isConfig()) {
+ // No need to tell the config servers their own connection string.
+ return;
+ }
- auto status = grid->catalogClient()->updateConfigDocument(
- opCtx.get(),
- ShardType::ConfigNS,
- BSON(ShardType::name(s->getId().toString())),
- BSON("$set" << BSON(ShardType::host(newConnectionString))),
- false,
- ShardingCatalogClient::kMajorityWriteConcern);
- if (!status.isOK()) {
- error() << "RSChangeWatcher: could not update config db for set: " << setName
- << " to: " << newConnectionString << causedBy(status.getStatus());
- }
- } catch (const std::exception& e) {
- warning() << "caught exception while updating config servers: " << e.what();
- } catch (...) {
- warning() << "caught unknown exception while updating config servers";
+ auto swWasUpdated = grid->catalogClient()->updateConfigDocument(
+ opCtx.get(),
+ ShardType::ConfigNS,
+ BSON(ShardType::name(s->getId().toString())),
+ BSON("$set" << BSON(ShardType::host(connStr.toString()))),
+ false,
+ ShardingCatalogClient::kMajorityWriteConcern);
+ auto status = swWasUpdated.getStatus();
+ if (!status.isOK()) {
+ error() << "RSChangeWatcher: could not update config db with connection string " << connStr
+ << causedBy(redact(status));
}
}
diff --git a/src/mongo/s/client/shard_registry.h b/src/mongo/s/client/shard_registry.h
index 87ad1cc0523..f9707b2ae3e 100644
--- a/src/mongo/s/client/shard_registry.h
+++ b/src/mongo/s/client/shard_registry.h
@@ -259,22 +259,11 @@ public:
void shutdown();
/**
- * For use in mongos and mongod which needs notifications about changes to shard and config
- * server replset membership to update the ShardRegistry.
- *
- * This is expected to be run in an existing thread.
- */
- static void replicaSetChangeShardRegistryUpdateHook(const std::string& setName,
- const std::string& newConnectionString);
-
- /**
* For use in mongos which needs notifications about changes to shard replset membership to
* update the config.shards collection.
- *
- * This is expected to be run in a brand new thread.
*/
- static void replicaSetChangeConfigServerUpdateHook(const std::string& setName,
- const std::string& newConnectionString);
+ static void updateReplicaSetOnConfigServer(ServiceContext* serviceContex,
+ const ConnectionString& connStr) noexcept;
private:
/**
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 9a8e6ebe71a..29c490ba8b0 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -122,6 +122,9 @@ using logger::LogComponent;
#define __has_feature(x) 0
#endif
+// Failpoint for disabling replicaSetChangeConfigServerUpdateHook calls on signaled mongos.
+MONGO_FAIL_POINT_DEFINE(failReplicaSetChangeConfigServerUpdateHook);
+
namespace {
#if defined(_WIN32)
@@ -341,6 +344,52 @@ void initWireSpec() {
spec.isInternalClient = true;
}
+class ShardingReplicaSetChangeListener final : public ReplicaSetChangeNotifier::Listener {
+public:
+ ShardingReplicaSetChangeListener(ServiceContext* serviceContext)
+ : _serviceContext(serviceContext) {}
+ ~ShardingReplicaSetChangeListener() final = default;
+
+ void onFoundSet(const Key& key) final {}
+
+ void onConfirmedSet(const State& state) final {
+ auto connStr = state.connStr;
+
+ auto fun = [ serviceContext = _serviceContext, connStr ](auto args) {
+ if (ErrorCodes::isCancelationError(args.status.code())) {
+ return;
+ }
+ uassertStatusOK(args.status);
+
+ LOG(0) << "Updating sharding state with confirmed set " << connStr;
+
+ Grid::get(serviceContext)->shardRegistry()->updateReplSetHosts(connStr);
+
+ if (MONGO_FAIL_POINT(failReplicaSetChangeConfigServerUpdateHook)) {
+ return;
+ }
+ ShardRegistry::updateReplicaSetOnConfigServer(serviceContext, connStr);
+ };
+
+ auto executor = Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor();
+ auto schedStatus = executor->scheduleWork(std::move(fun)).getStatus();
+ if (ErrorCodes::isCancelationError(schedStatus.code())) {
+ LOG(2) << "Unable to schedule confirmed set update due to " << schedStatus;
+ return;
+ }
+ uassertStatusOK(schedStatus);
+ }
+
+ void onPossibleSet(const State& state) final {
+ Grid::get(_serviceContext)->shardRegistry()->updateReplSetHosts(state.connStr);
+ }
+
+ void onDroppedSet(const Key& key) final {}
+
+private:
+ ServiceContext* _serviceContext;
+};
+
ExitCode runMongosServer(ServiceContext* serviceContext) {
Client::initThread("mongosMain");
printShardingVersionInfo(false);
@@ -379,10 +428,11 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
shardConnectionPool.addHook(new ShardingConnectionHook(true, std::move(shardedHookList)));
- ReplicaSetMonitor::setAsynchronousConfigChangeHook(
- &ShardRegistry::replicaSetChangeConfigServerUpdateHook);
- ReplicaSetMonitor::setSynchronousConfigChangeHook(
- &ShardRegistry::replicaSetChangeShardRegistryUpdateHook);
+ // Hook up a Listener for changes from the ReplicaSetMonitor
+ // This will last for the scope of this function. i.e. until shutdown finishes
+ auto shardingRSCL =
+ ReplicaSetMonitor::getNotifier().makeListener<ShardingReplicaSetChangeListener>(
+ serviceContext);
// Mongos connection pools already takes care of authenticating new connections so the
// replica set connection shouldn't need to.