summaryrefslogtreecommitdiff
path: root/src/mongo/client
diff options
context:
space:
mode:
authorBen Caimano <ben.caimano@10gen.com>2019-03-11 11:28:47 -0400
committerBen Caimano <ben.caimano@10gen.com>2019-04-16 16:24:22 -0400
commitd48f6caba6f97a578828d89ec2b95c41cbc5c67a (patch)
tree7ec768d3f9c4287fcd028f403a41bdd0b87c1cb3 /src/mongo/client
parent6fd5b4d17b87aa652381fd28cefe2a7eb5ec8d5d (diff)
downloadmongo-d48f6caba6f97a578828d89ec2b95c41cbc5c67a.tar.gz
SERVER-39818 Split RSM notification functionality into a new class
Diffstat (limited to 'src/mongo/client')
-rw-r--r--src/mongo/client/SConscript1
-rw-r--r--src/mongo/client/replica_set_change_notifier.cpp150
-rw-r--r--src/mongo/client/replica_set_change_notifier.h173
-rw-r--r--src/mongo/client/replica_set_monitor.cpp137
-rw-r--r--src/mongo/client/replica_set_monitor.h33
-rw-r--r--src/mongo/client/replica_set_monitor_internal.h22
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp4
-rw-r--r--src/mongo/client/replica_set_monitor_manager.h6
-rw-r--r--src/mongo/client/replica_set_monitor_scan_test.cpp360
-rw-r--r--src/mongo/client/replica_set_monitor_test_fixture.h12
10 files changed, 785 insertions, 113 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index ffba4798db9..d1502b181bc 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -186,6 +186,7 @@ clientDriverEnv.Library(
'dbclient_rs.cpp',
'global_conn_pool.cpp',
env.Idlc('global_conn_pool.idl')[0],
+ 'replica_set_change_notifier.cpp',
'replica_set_monitor.cpp',
'replica_set_monitor_manager.cpp',
],
diff --git a/src/mongo/client/replica_set_change_notifier.cpp b/src/mongo/client/replica_set_change_notifier.cpp
new file mode 100644
index 00000000000..9e8526979a9
--- /dev/null
+++ b/src/mongo/client/replica_set_change_notifier.cpp
@@ -0,0 +1,150 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/replica_set_change_notifier.h"
+
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
+#include "mongo/util/stacktrace.h"
+
+namespace mongo {
+
+void ReplicaSetChangeNotifier::_addListener(Listener* listener) {
+ stdx::lock_guard lk(_mutex);
+
+ listener->init(this);
+ _listeners.push_back(listener);
+}
+
+void ReplicaSetChangeNotifier::_removeListener(Listener* listener) {
+ stdx::lock_guard lk(_mutex);
+
+ auto& listeners = _listeners;
+ listeners.erase(std::remove(listeners.begin(), listeners.end(), listener), listeners.end());
+}
+
+void ReplicaSetChangeNotifier::onFoundSet(const std::string& name) {
+ LOG(2) << "Signaling found set " << name;
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ _replicaSetStates.emplace(name, State{});
+
+ auto listeners = _listeners;
+ lk.unlock();
+
+ for (auto listener : listeners) {
+ listener->onFoundSet(name);
+ };
+}
+
+void ReplicaSetChangeNotifier::onPossibleSet(ConnectionString connectionString) {
+ LOG(2) << "Signaling possible set " << connectionString;
+
+ const auto& name = connectionString.getSetName();
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ auto state = [&] {
+ auto& state = _replicaSetStates[name];
+ ++state.generation;
+
+ state.connStr = connectionString;
+ state.primary = {};
+
+ return state;
+ }();
+
+ auto listeners = _listeners;
+ lk.unlock();
+
+ for (auto listener : listeners) {
+ listener->onPossibleSet(state);
+ };
+}
+
+void ReplicaSetChangeNotifier::onConfirmedSet(ConnectionString connectionString,
+ HostAndPort primary) {
+ LOG(2) << "Signaling confirmed set " << connectionString << " with primary " << primary;
+
+ const auto& name = connectionString.getSetName();
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ auto state = [&] {
+ auto& state = _replicaSetStates[name];
+ ++state.generation;
+
+ state.connStr = connectionString;
+ state.primary = primary;
+
+ return state;
+ }();
+
+ auto listeners = _listeners;
+ lk.unlock();
+
+ for (auto listener : listeners) {
+ listener->onConfirmedSet(state);
+ };
+}
+
+void ReplicaSetChangeNotifier::onDroppedSet(const std::string& name) {
+ LOG(2) << "Signaling dropped set " << name;
+
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ // If we never singaled the initial possible set, we should not on dropped set
+ auto it = _replicaSetStates.find(name);
+ if (it == _replicaSetStates.end()) {
+ return;
+ }
+
+ _replicaSetStates.erase(it);
+
+ auto listeners = _listeners;
+ lk.unlock();
+
+ for (auto listener : listeners) {
+ listener->onDroppedSet(name);
+ };
+}
+
+auto ReplicaSetChangeNotifier::Listener::getCurrentState(const Key& key) -> State {
+ invariant(_notifier);
+
+ stdx::lock_guard lk(_notifier->_mutex);
+
+ return _notifier->_replicaSetStates.at(key);
+}
+
+} // namespace mongo
diff --git a/src/mongo/client/replica_set_change_notifier.h b/src/mongo/client/replica_set_change_notifier.h
new file mode 100644
index 00000000000..23427c9fd0c
--- /dev/null
+++ b/src/mongo/client/replica_set_change_notifier.h
@@ -0,0 +1,173 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+#include <type_traits>
+#include <vector>
+
+#include "mongo/client/connection_string.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/functional.h"
+
+namespace mongo {
+
+/**
+ * A stateful notifier for events from a set of ReplicaSetMonitors
+ */
+class ReplicaSetChangeNotifier {
+public:
+ using Key = std::string;
+ class Listener;
+ using ListenerHandle = std::unique_ptr<Listener, unique_function<void(Listener*)>>;
+ struct State;
+
+public:
+ ReplicaSetChangeNotifier() = default;
+ ReplicaSetChangeNotifier(const ReplicaSetChangeNotifier&) = delete;
+ ReplicaSetChangeNotifier(ReplicaSetChangeNotifier&&) = delete;
+ ReplicaSetChangeNotifier& operator=(const ReplicaSetChangeNotifier&) = delete;
+ ReplicaSetChangeNotifier& operator=(ReplicaSetChangeNotifier&&) = delete;
+
+ /**
+ * Notify every listener that there is a new ReplicaSet and initialize the State
+ */
+ void onFoundSet(const std::string& replicaSet);
+
+ /**
+ * Notify every listener that a scan completed without finding a primary and update
+ */
+ void onPossibleSet(ConnectionString connectionString);
+
+ /**
+ * Notify every listener that a scan completed and found a new primary or config
+ */
+ void onConfirmedSet(ConnectionString connectionString, HostAndPort primary);
+
+ /**
+ * Notify every listener that a ReplicaSet is no longer in use and drop the State
+ */
+ void onDroppedSet(const std::string& replicaSet);
+
+ /**
+ * Create a listener of a given type and bind it to this notifier
+ */
+ template <typename DerivedT,
+ typename... Args,
+ typename = std::enable_if_t<std::is_constructible_v<DerivedT, Args...>>>
+ auto makeListener(Args&&... args) {
+ auto deleter = [this](auto listener) { _removeListener(listener); };
+ auto ptr = new DerivedT(std::forward<Args>(args)...);
+
+ _addListener(ptr);
+
+ return ListenerHandle(ptr, std::move(deleter));
+ }
+
+private:
+ void _addListener(Listener* listener);
+ void _removeListener(Listener* listener);
+
+ stdx::mutex _mutex;
+ std::vector<Listener*> _listeners;
+ stdx::unordered_map<Key, State> _replicaSetStates;
+};
+
+/**
+ * A listener for events from a set of ReplicaSetMonitors
+ *
+ * This class will normally be notified of events for every replica set in use in the system.
+ * The onSet functions are all called syncronously by the notifier,
+ * if your implementation would block or seriously delay execution,
+ * please schedule the majority of the work to complete asynchronously.
+ */
+class ReplicaSetChangeNotifier::Listener {
+public:
+ using Notifier = ReplicaSetChangeNotifier;
+ using Key = typename Notifier::Key;
+ using State = typename Notifier::State;
+
+public:
+ Listener(const Listener&) = delete;
+ Listener(Listener&&) = delete;
+ Listener& operator=(const Listener&) = delete;
+ Listener& operator=(Listener&&) = delete;
+
+ Listener() = default;
+ virtual ~Listener() = default;
+
+ /**
+ * Initialize this listener with a notifier
+ */
+ void init(Notifier* notifier) {
+ _notifier = notifier;
+ }
+
+ /**
+ * React to a new ReplicaSet that will soon be scanned
+ */
+ virtual void onFoundSet(const Key& key) = 0;
+
+ /**
+ * React to a finished scan that found no primary
+ */
+ virtual void onPossibleSet(const State& data) = 0;
+
+ /**
+ * React to a finished scan that found a primary
+ */
+ virtual void onConfirmedSet(const State& data) = 0;
+
+ /**
+ * React to a ReplicaSet being dropped from use
+ */
+ virtual void onDroppedSet(const Key& key) = 0;
+
+ /**
+ * Get the State as of the last signal function invoked on the Notifier
+ */
+ State getCurrentState(const Key& key);
+
+private:
+ Notifier* _notifier = nullptr;
+};
+
+using ReplicaSetChangeListenerHandle = ReplicaSetChangeNotifier::ListenerHandle;
+
+struct ReplicaSetChangeNotifier::State {
+ ConnectionString connStr;
+ HostAndPort primary;
+
+ int64_t generation = 0;
+};
+
+} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor.cpp b/src/mongo/client/replica_set_monitor.cpp
index 3e0501af9a8..a1185cb519e 100644
--- a/src/mongo/client/replica_set_monitor.cpp
+++ b/src/mongo/client/replica_set_monitor.cpp
@@ -47,7 +47,6 @@
#include "mongo/db/server_options.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/thread.h"
#include "mongo/util/background.h"
#include "mongo/util/debug_util.h"
#include "mongo/util/exit.h"
@@ -64,9 +63,6 @@ using std::set;
using std::string;
using std::vector;
-// Failpoint for disabling AsyncConfigChangeHook calls on updated RS nodes.
-MONGO_FAIL_POINT_DEFINE(failAsyncConfigChangeHook);
-
// Failpoint for changing the default refresh period
MONGO_FAIL_POINT_DEFINE(modifyReplicaSetMonitorDefaultRefreshPeriod);
@@ -95,10 +91,6 @@ const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOn
const Milliseconds kExpeditedRefreshPeriod(500);
AtomicWord<bool> areRefreshRetriesDisabledForTest{false}; // Only true in tests.
-// TODO: Move to ReplicaSetMonitorManager
-ReplicaSetMonitor::ConfigChangeHook asyncConfigChangeHook;
-ReplicaSetMonitor::ConfigChangeHook syncConfigChangeHook;
-
//
// Helpers for stl algorithms
//
@@ -190,7 +182,8 @@ Seconds ReplicaSetMonitor::getDefaultRefreshPeriod() {
ReplicaSetMonitor::ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {}
ReplicaSetMonitor::ReplicaSetMonitor(const MongoURI& uri)
- : ReplicaSetMonitor(std::make_shared<SetState>(uri, globalRSMonitorManager.getExecutor())) {}
+ : ReplicaSetMonitor(std::make_shared<SetState>(
+ uri, &globalRSMonitorManager.getNotifier(), globalRSMonitorManager.getExecutor())) {}
void ReplicaSetMonitor::init() {
if (areRefreshRetriesDisabledForTest.load()) {
@@ -198,25 +191,15 @@ void ReplicaSetMonitor::init() {
warning() << "*** Not starting background refresh because refresh retries are disabled.";
return;
}
+
+ _state->init();
+
stdx::lock_guard<stdx::mutex> lk(_state->mutex);
_scheduleRefresh(_state->now(), lk);
}
ReplicaSetMonitor::~ReplicaSetMonitor() {
- // need this lock because otherwise can get race with _scheduleRefresh()
- stdx::lock_guard<stdx::mutex> lk(_state->mutex);
- if (!_refresherHandle || !_state->executor) {
- return;
- }
-
- _state->currentScan.reset();
- _state->executor->cancel(_refresherHandle);
- // Note: calling _executor->wait(_refresherHandle); from the dispatcher thread will cause hang
- // Its ok not to call it because the d-tor is called only when the last owning pointer goes out
- // of scope, so as taskExecutor queue holds a weak pointer to RSM it will not be able to get a
- // task to execute eliminating the need to call method "wait".
- //
- _refresherHandle = {};
+ _state->drop();
}
void ReplicaSetMonitor::_scheduleRefresh(Date_t when, WithLock) {
@@ -367,7 +350,9 @@ std::string ReplicaSetMonitor::getName() const {
std::string ReplicaSetMonitor::getServerAddress() const {
stdx::lock_guard<stdx::mutex> lk(_state->mutex);
- return _state->getConfirmedServerAddress();
+ // We return our setUri until first confirmation
+ return _state->seedConnStr.isValid() ? _state->seedConnStr.toString()
+ : _state->setUri.connectionString().toString();
}
const MongoURI& ReplicaSetMonitor::getOriginalUri() const {
@@ -402,14 +387,8 @@ void ReplicaSetMonitor::remove(const string& name) {
globalConnPool.removeHost(name);
}
-void ReplicaSetMonitor::setAsynchronousConfigChangeHook(ConfigChangeHook hook) {
- invariant(!asyncConfigChangeHook);
- asyncConfigChangeHook = hook;
-}
-
-void ReplicaSetMonitor::setSynchronousConfigChangeHook(ConfigChangeHook hook) {
- invariant(!syncConfigChangeHook);
- syncConfigChangeHook = hook;
+ReplicaSetChangeNotifier& ReplicaSetMonitor::getNotifier() {
+ return globalRSMonitorManager.getNotifier();
}
// TODO move to correct order with non-statics before pushing
@@ -452,8 +431,6 @@ void ReplicaSetMonitor::shutdown() {
void ReplicaSetMonitor::cleanup() {
globalRSMonitorManager.removeAllMonitors();
- asyncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
- syncConfigChangeHook = ReplicaSetMonitor::ConfigChangeHook();
}
void ReplicaSetMonitor::disableRefreshRetries_forTest() {
@@ -605,20 +582,20 @@ Refresher::NextStep Refresher::getNextStep() {
// NOTE: we don't modify seedNodes or notify about set membership change in this
// case since it hasn't been confirmed by a master.
- const string oldAddr = _set->getUnconfirmedServerAddress();
for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
it != _scan->unconfirmedReplies.end();
++it) {
_set->findOrCreateNode(it->first)->update(it->second);
}
- const string newAddr = _set->getUnconfirmedServerAddress();
- if (oldAddr != newAddr && syncConfigChangeHook) {
- // Run the syncConfigChangeHook because the ShardRegistry needs to know about any
- // node we might talk to. Don't run the asyncConfigChangeHook because we don't
- // want to update the seed list stored on the config servers with unconfirmed hosts.
- syncConfigChangeHook(_set->name, _set->getUnconfirmedServerAddress());
+ auto connStr = _set->possibleConnectionString();
+ if (connStr != _set->workingConnStr) {
+ _set->workingConnStr = std::move(connStr);
+ _set->notifier->onPossibleSet(_set->workingConnStr);
}
+
+ // If at some point we care about lacking a primary, on it here
+ _set->lastSeenMaster = {};
}
if (_scan->foundAnyUpNodes) {
@@ -833,26 +810,23 @@ Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMa
}
}
- if (reply.normalHosts != _set->seedNodes) {
- const string oldAddr = _set->getConfirmedServerAddress();
+ bool changedHosts = reply.normalHosts != _set->seedNodes;
+ bool changedPrimary = reply.host != _set->lastSeenMaster;
+ if (changedHosts || changedPrimary) {
+ ++_set->seedGen;
_set->seedNodes = reply.normalHosts;
+ _set->seedConnStr = _set->confirmedConnectionString();
// LogLevel can be pretty low, since replica set reconfiguration should be pretty rare
// and we want to record our changes
- log() << "changing hosts to " << _set->getConfirmedServerAddress() << " from " << oldAddr;
-
- if (syncConfigChangeHook) {
- syncConfigChangeHook(_set->name, _set->getConfirmedServerAddress());
- }
+ log() << "Confirmed replica set for " << _set->name << " is " << _set->seedConnStr;
- if (asyncConfigChangeHook && !MONGO_FAIL_POINT(failAsyncConfigChangeHook)) {
- // call from a separate thread to avoid blocking and holding lock while potentially
- // going over the network
- stdx::thread bg(asyncConfigChangeHook, _set->name, _set->getConfirmedServerAddress());
- bg.detach();
- }
+ _set->notifier->onConfirmedSet(_set->seedConnStr, reply.host);
}
+ // Update our working string
+ _set->workingConnStr = _set->seedConnStr;
+
// Update other nodes's information based on replies we've already seen
for (UnconfirmedReplies::iterator it = _scan->unconfirmedReplies.begin();
it != _scan->unconfirmedReplies.end();
@@ -870,7 +844,6 @@ Status Refresher::receivedIsMasterFromMaster(const HostAndPort& from, const IsMa
void Refresher::receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply) {
invariant(!reply.isMaster);
- // This function doesn't alter _set at all. It only modifies the work queue in _scan.
// Add everyone this host claims is in the set to possibleNodes.
_scan->possibleNodes.insert(reply.normalHosts.begin(), reply.normalHosts.end());
@@ -1013,9 +986,12 @@ void Node::update(const IsMasterReply& reply) {
lastWriteDateUpdateTime = Date_t::now();
}
-SetState::SetState(const MongoURI& uri, executor::TaskExecutor* executor)
+SetState::SetState(const MongoURI& uri,
+ ReplicaSetChangeNotifier* notifier,
+ executor::TaskExecutor* executor)
: setUri(std::move(uri)),
name(setUri.getSetName()),
+ notifier(notifier),
executor(executor),
seedNodes(setUri.getServers().begin(), setUri.getServers().end()),
latencyThresholdMicros(serverGlobalParams.defaultLocalThresholdMillis * int64_t(1000)),
@@ -1223,33 +1199,21 @@ void SetState::updateNodeIfInNodes(const IsMasterReply& reply) {
node->update(reply);
}
-std::string SetState::getConfirmedServerAddress() const {
- StringBuilder ss;
- if (!name.empty())
- ss << name << "/";
-
- for (std::set<HostAndPort>::const_iterator it = seedNodes.begin(); it != seedNodes.end();
- ++it) {
- if (it != seedNodes.begin())
- ss << ",";
- it->append(ss);
- }
+ConnectionString SetState::confirmedConnectionString() const {
+ std::vector<HostAndPort> hosts(begin(seedNodes), end(seedNodes));
- return ss.str();
+ return ConnectionString::forReplicaSet(name, std::move(hosts));
}
-std::string SetState::getUnconfirmedServerAddress() const {
- StringBuilder ss;
- if (!name.empty())
- ss << name << "/";
+ConnectionString SetState::possibleConnectionString() const {
+ std::vector<HostAndPort> hosts;
+ hosts.reserve(nodes.size());
- for (std::vector<Node>::const_iterator it = nodes.begin(); it != nodes.end(); ++it) {
- if (it != nodes.begin())
- ss << ",";
- it->host.append(ss);
+ for (auto& node : nodes) {
+ hosts.push_back(node.host);
}
- return ss.str();
+ return ConnectionString::forReplicaSet(name, std::move(hosts));
}
void SetState::notify(bool finishedScan) {
@@ -1288,6 +1252,23 @@ Status SetState::makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criter
<< name);
}
+void SetState::init() {
+ notifier->onFoundSet(name);
+}
+
+void SetState::drop() {
+ {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ currentScan.reset();
+ notify(/*finishedScan*/ true);
+ }
+
+ // No point in notifying if we never started
+ if (workingConnStr.isValid()) {
+ notifier->onDroppedSet(name);
+ }
+}
+
void SetState::checkInvariants() const {
bool foundMaster = false;
for (size_t i = 0; i < nodes.size(); i++) {
@@ -1303,7 +1284,7 @@ void SetState::checkInvariants() const {
foundMaster = true;
// if we have a master it should be the same as lastSeenMaster
- invariant(nodes[i].host == lastSeenMaster);
+ invariant(lastSeenMaster.empty() || nodes[i].host == lastSeenMaster);
}
// should never end up with negative latencies
diff --git a/src/mongo/client/replica_set_monitor.h b/src/mongo/client/replica_set_monitor.h
index 6f0a9d6ddb2..a306b1b9547 100644
--- a/src/mongo/client/replica_set_monitor.h
+++ b/src/mongo/client/replica_set_monitor.h
@@ -37,6 +37,7 @@
#include "mongo/base/string_data.h"
#include "mongo/client/mongo_uri.h"
+#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/functional.h"
@@ -62,16 +63,9 @@ class ReplicaSetMonitor : public std::enable_shared_from_this<ReplicaSetMonitor>
public:
class Refresher;
- typedef stdx::function<void(const std::string& setName, const std::string& newConnectionString)>
- ConfigChangeHook;
-
/**
- * Initializes local state.
- *
- * seeds must not be empty.
+ * Initializes local state from a MongoURI.
*/
- ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds);
-
ReplicaSetMonitor(const MongoURI& uri);
/**
@@ -198,27 +192,9 @@ public:
static void remove(const std::string& name);
/**
- * Sets the hook to be called whenever the config of any replica set changes.
- * Currently only 1 globally, so this asserts if one already exists.
- *
- * The hook will be called from a fresh thread. It is responsible for initializing any
- * thread-local state and ensuring that no exceptions escape.
- *
- * The hook must not be changed while the program has multiple threads.
- */
- static void setAsynchronousConfigChangeHook(ConfigChangeHook hook);
-
- /**
- * Sets the hook to be called whenever the config of any replica set changes.
- * Currently only 1 globally, so this asserts if one already exists.
- *
- * The hook will be called inline while refreshing the ReplicaSetMonitor's view of the set
- * membership. It is important that the hook not block for long as it will be running under
- * the ReplicaSetMonitor's mutex.
- *
- * The hook must not be changed while the program has multiple threads.
+ * Returns the change notifier for the underlying ReplicaMonitorManager
*/
- static void setSynchronousConfigChangeHook(ConfigChangeHook hook);
+ static ReplicaSetChangeNotifier& getNotifier();
/**
* Permanently stops all monitoring on replica sets and clears all cached information
@@ -384,7 +360,6 @@ private:
/**
* Adjusts the _scan work queue based on information from this host.
* This should only be called with replies from non-masters.
- * Does not update _set at all.
*/
void receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply);
diff --git a/src/mongo/client/replica_set_monitor_internal.h b/src/mongo/client/replica_set_monitor_internal.h
index c7f72ae3671..04479b6dab9 100644
--- a/src/mongo/client/replica_set_monitor_internal.h
+++ b/src/mongo/client/replica_set_monitor_internal.h
@@ -145,7 +145,7 @@ public:
Promise<HostAndPort> promise;
};
- SetState(const MongoURI& uri, executor::TaskExecutor*);
+ SetState(const MongoURI& uri, ReplicaSetChangeNotifier*, executor::TaskExecutor*);
bool isUsable() const;
@@ -173,13 +173,13 @@ public:
* Returns the connection string of the nodes that are known the be in the set because we've
* seen them in the isMaster reply of a PRIMARY.
*/
- std::string getConfirmedServerAddress() const;
+ ConnectionString confirmedConnectionString() const;
/**
* Returns the connection string of the nodes that are believed to be in the set because we've
* seen them in the isMaster reply of non-PRIMARY nodes in our seed list.
*/
- std::string getUnconfirmedServerAddress() const;
+ ConnectionString possibleConnectionString() const;
/**
* Call this to notify waiters after a scan processes a valid reply or finishes.
@@ -193,6 +193,16 @@ public:
Status makeUnsatisfedReadPrefError(const ReadPreferenceSetting& criteria) const;
/**
+ * Notifies all listeners that the ReplicaSet is in use.
+ */
+ void init();
+
+ /**
+ * Resets the current scan and notifies all listeners that the ReplicaSet isn't in use.
+ */
+ void drop();
+
+ /**
* Before unlocking, do DEV checkInvariants();
*/
void checkInvariants() const;
@@ -200,17 +210,21 @@ public:
const MongoURI setUri; // URI passed to ctor -- THIS IS NOT UPDATED BY SCANS
const std::string name;
+ ReplicaSetChangeNotifier* const notifier;
executor::TaskExecutor* const executor;
stdx::mutex mutex; // You must hold this to access any member below.
// For starting scans
std::set<HostAndPort> seedNodes; // updated whenever a master reports set membership changes
+ ConnectionString seedConnStr; // The connection string from the last time we had valid seeds
+ int64_t seedGen = 0;
bool isMocked = false; // True if this set is using nodes from MockReplicaSet
// For tracking scans
- HostAndPort lastSeenMaster; // Empty if we have never seen a master
+ // lastSeenMaster is empty if we have never seen a master or the last scan didn't have one
+ HostAndPort lastSeenMaster;
int consecutiveFailedScans = 0;
Nodes nodes; // maintained sorted and unique by host
ConnectionString workingConnStr; // The connection string from our last scan
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index c2b05cc61ff..f8f6078b23a 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -224,4 +224,8 @@ TaskExecutor* ReplicaSetMonitorManager::getExecutor() {
return _taskExecutor.get();
}
+ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() {
+ return _notifier;
+}
+
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h
index ed92bc3b28c..1adf42fb16d 100644
--- a/src/mongo/client/replica_set_monitor_manager.h
+++ b/src/mongo/client/replica_set_monitor_manager.h
@@ -32,6 +32,7 @@
#include <string>
#include <vector>
+#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/executor/task_executor.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/string_map.h"
@@ -94,6 +95,8 @@ public:
*/
executor::TaskExecutor* getExecutor();
+ ReplicaSetChangeNotifier& getNotifier();
+
private:
using ReplicaSetMonitorsMap = StringMap<std::weak_ptr<ReplicaSetMonitor>>;
@@ -103,6 +106,9 @@ private:
// Executor for monitoring replica sets.
std::unique_ptr<executor::TaskExecutor> _taskExecutor;
+ // Widget to notify listeners when a RSM notices a change
+ ReplicaSetChangeNotifier _notifier;
+
// Needs to be after `_taskExecutor`, so that it will be destroyed before the `_taskExecutor`.
ReplicaSetMonitorsMap _monitors;
diff --git a/src/mongo/client/replica_set_monitor_scan_test.cpp b/src/mongo/client/replica_set_monitor_scan_test.cpp
index 4b4f3f4b12f..2f2cebf556a 100644
--- a/src/mongo/client/replica_set_monitor_scan_test.cpp
+++ b/src/mongo/client/replica_set_monitor_scan_test.cpp
@@ -1613,5 +1613,365 @@ TEST_F(MinOpTimeTest, MinOpTimeIgnored) {
ASSERT_EQUALS(notStale.host(), "c");
}
+// -- ReplicaSetChangeNotifier/Listener tests --
+
+class Listener : public ReplicaSetChangeNotifier::Listener {
+public:
+ void logEvent(StringData name, const Key& key) {
+ log() << name << ": " << key;
+ }
+ void logEvent(StringData name, const State& state) {
+ log() << name << ": "
+ << "(" << state.generation << ") " << state.connStr << " | " << state.primary;
+ }
+
+ void onFoundSet(const Key& key) override {
+ lastFoundSetId = ++eventId;
+ logEvent("FoundSet", key);
+ }
+ void onPossibleSet(const State& state) override {
+ lastPossibleSetId = ++eventId;
+ logEvent("PossibleSet", state);
+ lastState = state;
+ }
+ void onConfirmedSet(const State& state) override {
+ lastConfirmedSetId = ++eventId;
+ logEvent("ConfirmedSet", state);
+ lastState = state;
+ }
+ void onDroppedSet(const Key& key) override {
+ lastDroppedSetId = ++eventId;
+ logEvent("DroppedSet", key);
+ }
+
+ int64_t eventId = -1;
+ int64_t lastFoundSetId = -1;
+ int64_t lastPossibleSetId = -1;
+ int64_t lastConfirmedSetId = -1;
+ int64_t lastDroppedSetId = -1;
+ State lastState;
+};
+
+class ChangeNotifierTest : public ReplicaSetMonitorTest {
+public:
+ ChangeNotifierTest() = default;
+ virtual ~ChangeNotifierTest() = default;
+
+ enum class NodeState {
+ kUnknown = 0,
+ kPrimary,
+ kSecondary,
+ kStandalone,
+ };
+
+ auto& listener() const {
+ return *static_cast<Listener*>(_listener.get());
+ }
+
+ void updateSet(std::map<HostAndPort, NodeState> replicaSet) {
+ auto refresher = Refresher(_state);
+ std::set<HostAndPort> seen;
+ HostAndPort primary;
+ std::set<HostAndPort> members;
+
+ BSONArrayBuilder arrayBuilder;
+ for (const auto & [ host, nodeState ] : replicaSet) {
+ if (nodeState == NodeState::kStandalone) {
+ continue;
+ }
+
+ if (nodeState == NodeState::kPrimary) {
+ primary = host;
+ }
+
+ members.insert(host);
+
+ arrayBuilder.append(StringData(host.host()));
+ }
+ auto bsonHosts = arrayBuilder.arr();
+
+ auto markIsMaster = [&](auto host, bool isMaster) {
+ refresher.receivedIsMaster(
+ host,
+ -1,
+ BSON("setName" << kSetName << "ismaster" << isMaster << "secondary" << !isMaster
+ << "hosts"
+ << bsonHosts
+ << "ok"
+ << true));
+
+ };
+
+ auto markFailed = [&](auto host) {
+ refresher.failedHost(host, {ErrorCodes::InternalError, "Test error"});
+ };
+
+ auto gen = listener().lastState.generation;
+
+ NextStep ns = refresher.getNextStep();
+ for (; ns.step != NextStep::DONE; ns = refresher.getNextStep()) {
+ ASSERT_EQUALS(ns.step, NextStep::CONTACT_HOST);
+ ASSERT(replicaSet.count(ns.host));
+ ASSERT(!seen.count(ns.host));
+ seen.insert(ns.host);
+
+ // mock a reply
+ switch (replicaSet[ns.host]) {
+ case NodeState::kStandalone: {
+ markFailed(ns.host);
+ } break;
+ case NodeState::kPrimary: {
+ markIsMaster(ns.host, true);
+ } break;
+ case NodeState::kSecondary: {
+ markIsMaster(ns.host, false);
+ } break;
+ case NodeState::kUnknown:
+ MONGO_UNREACHABLE;
+ };
+ }
+
+ // Verify that the listener received the right data
+ if (gen != listener().lastState.generation) {
+ // Our State is what the notifier thinks it should be
+ ASSERT_EQUALS(listener().lastState.connStr, listener().getCurrentState().connStr);
+ ASSERT_EQUALS(listener().lastState.primary, listener().getCurrentState().primary);
+ ASSERT_EQUALS(listener().lastState.generation, listener().getCurrentState().generation);
+
+ // Our State is what we'd expect
+ ASSERT_EQUALS(listener().lastState.connStr.getSetName(), kSetName);
+ ASSERT_EQUALS(listener().lastState.connStr.getServers().size(), members.size());
+ ASSERT_EQUALS(listener().lastState.primary, primary);
+ }
+
+ ASSERT_EQUALS(ns.step, NextStep::DONE);
+ ASSERT(ns.host.empty());
+ }
+
+protected:
+ decltype(_notifier)::ListenerHandle _listener = _notifier.makeListener<Listener>();
+
+ std::shared_ptr<SetState> _state = makeState(basicUri);
+};
+
+TEST_F(ChangeNotifierTest, NotifyNominal) {
+ auto currentId = -1;
+
+ // State exists. Signal: null
+ ASSERT_EQ(listener().lastFoundSetId, currentId);
+
+ // Initializing the state. Signal: FoundSet
+ _state->init();
+ ASSERT_EQ(listener().lastFoundSetId, ++currentId);
+
+ // 'a' claims to be primary. Signal: Confirmed
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Getting another scan with the same details. Signal: null
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().eventId, currentId);
+
+ // Dropped. Signal: Dropped
+ _state->drop();
+ ASSERT_EQ(listener().lastDroppedSetId, ++currentId);
+}
+
+TEST_F(ChangeNotifierTest, NotifyElections) {
+ auto currentId = -1;
+
+ // State exists. Signal: null
+ ASSERT_EQ(listener().lastFoundSetId, currentId);
+
+ // Initializing the state. Signal: FoundSet
+ _state->init();
+ ASSERT_EQ(listener().lastFoundSetId, ++currentId);
+
+ // 'a' claims to be primary. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // 'b' claims to be primary. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // All hosts tell us that they are not primary. Signal: null
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().eventId, currentId);
+
+ // 'a' claims to be primary again. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Dropped. Signal: Dropped
+ _state->drop();
+ ASSERT_EQ(listener().lastDroppedSetId, ++currentId);
+}
+
+TEST_F(ChangeNotifierTest, NotifyReconfig) {
+ auto currentId = -1;
+
+ // State exists. Signal: null
+ ASSERT_EQ(listener().lastFoundSetId, currentId);
+
+ // Initializing the state. Signal: FoundSet
+ _state->init();
+ ASSERT_EQ(listener().lastFoundSetId, ++currentId);
+
+ // Update the set with a full scan showing no primary. Signal: PossibleSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().eventId, ++currentId);
+
+ // Mark 'a' as removed. Signal: null
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kStandalone,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().eventId, currentId);
+
+ // Discover 'd' as secondary. Signal: PossibleSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("d"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastPossibleSetId, ++currentId);
+
+ // Mark 'b' as primary, no 'd'. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("b"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("d"), NodeState::kStandalone,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Mark 'a' as removed. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("a"), NodeState::kStandalone,
+ },
+ {
+ HostAndPort("b"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Mark 'a' as secondary again. Signal: ConfirmedSet
+ updateSet({
+ {
+ HostAndPort("b"), NodeState::kPrimary,
+ },
+ {
+ HostAndPort("c"), NodeState::kSecondary,
+ },
+ {
+ HostAndPort("a"), NodeState::kSecondary,
+ },
+ });
+ ASSERT_EQ(listener().lastConfirmedSetId, ++currentId);
+
+ // Dropped. Signal: Dropped
+ _state->drop();
+ ASSERT_EQ(listener().lastDroppedSetId, ++currentId);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_test_fixture.h b/src/mongo/client/replica_set_monitor_test_fixture.h
index cd37e5ecdab..7e842008453 100644
--- a/src/mongo/client/replica_set_monitor_test_fixture.h
+++ b/src/mongo/client/replica_set_monitor_test_fixture.h
@@ -33,6 +33,7 @@
#include <type_traits>
#include <vector>
+#include "mongo/client/replica_set_change_notifier.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/client/replica_set_monitor_internal.h"
#include "mongo/unittest/unittest.h"
@@ -61,11 +62,15 @@ public:
template <typename... Args>
using StateIsConstructible =
- std::enable_if_t<std::is_constructible_v<SetState, Args..., executor::TaskExecutor* const>>;
+ std::enable_if_t<std::is_constructible_v<SetState,
+ Args...,
+ ReplicaSetChangeNotifier* const,
+ executor::TaskExecutor* const>>;
template <typename... Args, typename = StateIsConstructible<Args...>>
auto makeState(Args&&... args) {
- return std::make_shared<SetState>(std::forward<Args>(args)..., nullptr);
+ return std::make_shared<ReplicaSetMonitor::SetState>(
+ std::forward<Args>(args)..., &_notifier, nullptr);
}
void setUp() override {}
@@ -74,6 +79,9 @@ public:
static const std::vector<HostAndPort> basicSeeds;
static const std::set<HostAndPort> basicSeedsSet;
static const MongoURI basicUri;
+
+protected:
+ ReplicaSetChangeNotifier _notifier;
};
} // namespace mongo