summaryrefslogtreecommitdiff
path: root/src/mongo/client
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client')
-rw-r--r--src/mongo/client/SConscript2
-rw-r--r--src/mongo/client/replica_set_monitor_interface.h12
-rw-r--r--src/mongo/client/replica_set_monitor_manager.cpp42
-rw-r--r--src/mongo/client/replica_set_monitor_manager.h36
-rw-r--r--src/mongo/client/scanning_replica_set_monitor.cpp12
-rw-r--r--src/mongo/client/scanning_replica_set_monitor.h11
-rw-r--r--src/mongo/client/sdam/sdam_datatypes.cpp20
-rw-r--r--src/mongo/client/sdam/sdam_datatypes.h1
-rw-r--r--src/mongo/client/sdam/topology_listener.cpp18
-rw-r--r--src/mongo/client/sdam/topology_listener.h13
-rw-r--r--src/mongo/client/sdam/topology_manager.cpp2
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.cpp105
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.h36
-rw-r--r--src/mongo/client/streamable_replica_set_monitor_error_handler.cpp163
-rw-r--r--src/mongo/client/streamable_replica_set_monitor_error_handler.h94
-rw-r--r--src/mongo/client/streamable_replica_set_monitor_error_handler_test.cpp210
16 files changed, 732 insertions, 45 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 51c3df426c9..cb79546c3cd 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -199,6 +199,7 @@ clientDriverEnv.Library(
'scanning_replica_set_monitor.cpp',
'streamable_replica_set_monitor.cpp',
'streamable_replica_set_monitor_query_processor.cpp',
+ 'streamable_replica_set_monitor_error_handler.cpp',
'server_is_master_monitor.cpp',
'server_ping_monitor.cpp',
],
@@ -333,6 +334,7 @@ env.CppUnitTest(
'scanning_replica_set_monitor_scan_test.cpp',
'scanning_replica_set_monitor_test_concurrent.cpp',
'scanning_replica_set_monitor_test_fixture.cpp',
+ 'streamable_replica_set_monitor_error_handler_test.cpp',
'server_ping_monitor_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/client/replica_set_monitor_interface.h b/src/mongo/client/replica_set_monitor_interface.h
index 3826154dfe2..ad40412dbc4 100644
--- a/src/mongo/client/replica_set_monitor_interface.h
+++ b/src/mongo/client/replica_set_monitor_interface.h
@@ -97,11 +97,17 @@ public:
* Notifies this Monitor that a host has failed because of the specified error 'status' and
* should be considered down.
*
- * Call this when you get a connection error. If you get an error while trying to refresh our
- * view of a host, call Refresher::failedHost instead because it bypasses taking the monitor's
- * mutex.
+ * The sdam version of the Monitor makes a distinction between failures happening before or
+ * after the initial handshake for the connection. The failedHost method is kept for backwards
+ * compatibility, and is equivalent to failedHostPostHandshake.
*/
virtual void failedHost(const HostAndPort& host, const Status& status) = 0;
+ virtual void failedHostPreHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) = 0;
+ virtual void failedHostPostHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) = 0;
/**
* Returns true if this node is the master based ONLY on local data. Be careful, return may
diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp
index 8986fd143ac..384c67c6c87 100644
--- a/src/mongo/client/replica_set_monitor_manager.cpp
+++ b/src/mongo/client/replica_set_monitor_manager.cpp
@@ -87,8 +87,23 @@ Status ReplicaSetMonitorManagerNetworkConnectionHook::validateHost(
auto publisher = streamableMonitor->getEventsPublisher();
if (publisher) {
- publisher->onServerHandshakeCompleteEvent(
- isMasterReply.elapsedMillis.get(), remoteHost.toString(), isMasterReply.data);
+ try {
+ if (isMasterReply.status.isOK()) {
+ publisher->onServerHandshakeCompleteEvent(isMasterReply.elapsedMillis.get(),
+ remoteHost.toString(),
+ isMasterReply.data);
+ } else {
+ publisher->onServerHandshakeFailedEvent(
+ remoteHost.toString(), isMasterReply.status, isMasterReply.data);
+ }
+ } catch (const DBException& exception) {
+ LOGV2_ERROR(4712101,
+ "An error occurred publishing a ReplicaSetMonitor handshake event",
+ "error"_attr = exception.toStatus(),
+ "setName"_attr = monitor->getName(),
+ "handshakeStatus"_attr = isMasterReply.status);
+ return exception.toStatus();
+ }
}
}
}
@@ -133,10 +148,14 @@ void ReplicaSetMonitorManager::_setupTaskExecutorInLock() {
// construct task executor
auto hookList = std::make_unique<rpc::EgressMetadataHookList>();
auto networkConnectionHook = std::make_unique<ReplicaSetMonitorManagerNetworkConnectionHook>();
- auto net = executor::makeNetworkInterface(
+
+ std::shared_ptr<NetworkInterface> networkInterface = executor::makeNetworkInterface(
"ReplicaSetMonitor-TaskExecutor", std::move(networkConnectionHook), std::move(hookList));
- auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get());
- _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _connectionManager = std::make_unique<ReplicaSetMonitorConnectionManager>(networkInterface);
+
+ auto pool = std::make_unique<NetworkInterfaceThreadPool>(networkInterface.get());
+
+ _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), networkInterface);
_taskExecutor->startup();
}
@@ -173,7 +192,7 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const
newMonitor->init();
} else {
LOGV2(4333205, "Starting Streamable ReplicaSetMonitor", "uri"_attr = uri.toString());
- newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor());
+ newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor(), _getConnectionManager());
}
_monitors[setName] = newMonitor;
return newMonitor;
@@ -220,6 +239,7 @@ void ReplicaSetMonitorManager::removeMonitor(StringData setName) {
void ReplicaSetMonitorManager::shutdown() {
decltype(_monitors) monitors;
decltype(_taskExecutor) taskExecutor;
+ decltype(_connectionManager) connectionManager;
{
stdx::lock_guard<Latch> lk(_mutex);
if (std::exchange(_isShutdown, true)) {
@@ -227,6 +247,7 @@ void ReplicaSetMonitorManager::shutdown() {
}
monitors = std::exchange(_monitors, {});
+ connectionManager = std::exchange(_connectionManager, {});
taskExecutor = std::exchange(_taskExecutor, {});
}
@@ -278,6 +299,11 @@ std::shared_ptr<executor::TaskExecutor> ReplicaSetMonitorManager::getExecutor()
return _taskExecutor;
}
+std::shared_ptr<executor::EgressTagCloser> ReplicaSetMonitorManager::_getConnectionManager() {
+ invariant(_connectionManager);
+ return _connectionManager;
+}
+
ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() {
return _notifier;
}
@@ -286,4 +312,8 @@ bool ReplicaSetMonitorManager::isShutdown() const {
stdx::lock_guard<Latch> lk(_mutex);
return _isShutdown;
}
+
+void ReplicaSetMonitorConnectionManager::dropConnections(const HostAndPort& hostAndPort) {
+ _network->dropConnections(hostAndPort);
+}
} // namespace mongo
diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h
index de628867b6e..07b5d4e3ba4 100644
--- a/src/mongo/client/replica_set_monitor_manager.h
+++ b/src/mongo/client/replica_set_monitor_manager.h
@@ -33,7 +33,9 @@
#include <vector>
#include "mongo/client/replica_set_change_notifier.h"
+#include "mongo/executor/egress_tag_closer.h"
#include "mongo/executor/network_connection_hook.h"
+#include "mongo/executor/network_interface.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/util/hierarchical_acquisition.h"
@@ -62,6 +64,30 @@ public:
executor::RemoteCommandResponse&& response) override;
};
+class ReplicaSetMonitorConnectionManager : public executor::EgressTagCloser {
+ ReplicaSetMonitorConnectionManager() = delete;
+
+public:
+ ReplicaSetMonitorConnectionManager(std::shared_ptr<executor::NetworkInterface> network)
+ : _network(network) {}
+
+ void dropConnections(const HostAndPort& hostAndPort) override;
+
+ // Not supported.
+ void dropConnections(transport::Session::TagMask tags) override {
+ MONGO_UNREACHABLE;
+ };
+ // Not supported.
+ void mutateTags(const HostAndPort& hostAndPort,
+ const std::function<transport::Session::TagMask(transport::Session::TagMask)>&
+ mutateFunc) override {
+ MONGO_UNREACHABLE;
+ };
+
+private:
+ std::shared_ptr<executor::NetworkInterface> _network;
+};
+
/**
* Manages the lifetime of a set of replica set monitors.
*/
@@ -122,7 +148,13 @@ public:
bool isShutdown() const;
+
private:
+ /**
+ * Returns an EgressTagCloser controlling the executor's network interface.
+ */
+ std::shared_ptr<executor::EgressTagCloser> _getConnectionManager();
+
using ReplicaSetMonitorsMap = StringMap<std::weak_ptr<ReplicaSetMonitor>>;
// Protects access to the replica set monitors
@@ -132,6 +164,10 @@ private:
// Executor for monitoring replica sets.
std::shared_ptr<executor::TaskExecutor> _taskExecutor;
+ // Allows closing connections established by the network interface associated with the
+ // _taskExecutor instance
+ std::shared_ptr<ReplicaSetMonitorConnectionManager> _connectionManager;
+
// Widget to notify listeners when a RSM notices a change
ReplicaSetChangeNotifier _notifier;
diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp
index 5e400d6b009..66c1fde5b27 100644
--- a/src/mongo/client/scanning_replica_set_monitor.cpp
+++ b/src/mongo/client/scanning_replica_set_monitor.cpp
@@ -356,6 +356,18 @@ void ScanningReplicaSetMonitor::failedHost(const HostAndPort& host, const Status
_state->checkInvariants();
}
+void ScanningReplicaSetMonitor::failedHostPreHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) {
+ failedHost(host, status);
+}
+
+void ScanningReplicaSetMonitor::failedHostPostHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) {
+ failedHost(host, status);
+}
+
bool ScanningReplicaSetMonitor::isPrimary(const HostAndPort& host) const {
stdx::lock_guard<Latch> lk(_state->mutex);
Node* node = _state->findNode(host);
diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h
index efd5f10be14..25208682297 100644
--- a/src/mongo/client/scanning_replica_set_monitor.h
+++ b/src/mongo/client/scanning_replica_set_monitor.h
@@ -72,8 +72,19 @@ public:
HostAndPort getMasterOrUassert() override;
+ /*
+ * For the ScanningReplicaSetMonitor, all the failedHost methods are equivalent.
+ */
void failedHost(const HostAndPort& host, const Status& status) override;
+ void failedHostPreHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) override;
+
+ void failedHostPostHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) override;
+
bool isPrimary(const HostAndPort& host) const override;
bool isHostUp(const HostAndPort& host) const override;
diff --git a/src/mongo/client/sdam/sdam_datatypes.cpp b/src/mongo/client/sdam/sdam_datatypes.cpp
index a6be94e3014..2faf7acd361 100644
--- a/src/mongo/client/sdam/sdam_datatypes.cpp
+++ b/src/mongo/client/sdam/sdam_datatypes.cpp
@@ -145,4 +145,24 @@ const boost::optional<TopologyVersion>& IsMasterOutcome::getTopologyVersion() co
const std::string& IsMasterOutcome::getErrorMsg() const {
return _errorMsg;
}
+
+BSONObj IsMasterOutcome::toBSON() const {
+ BSONObjBuilder builder;
+ builder.append("host", _server);
+ builder.append("success", _success);
+
+ if (_errorMsg != "")
+ builder.append("errorMessage", _errorMsg);
+
+ if (_topologyVersion)
+ builder.append("topologyVersion", _topologyVersion->toBSON());
+
+ if (_rtt)
+ builder.append("duration", _rtt->toBSON());
+
+ if (_response)
+ builder.append("response", *_response);
+
+ return builder.obj();
+}
}; // namespace mongo::sdam
diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h
index 0da61e14d09..3ec5363965c 100644
--- a/src/mongo/client/sdam/sdam_datatypes.h
+++ b/src/mongo/client/sdam/sdam_datatypes.h
@@ -107,6 +107,7 @@ public:
const boost::optional<IsMasterRTT>& getRtt() const;
const boost::optional<TopologyVersion>& getTopologyVersion() const;
const std::string& getErrorMsg() const;
+ BSONObj toBSON() const;
private:
ServerAddress _server;
diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp
index 34d8ef63563..48e1f2b147b 100644
--- a/src/mongo/client/sdam/topology_listener.cpp
+++ b/src/mongo/client/sdam/topology_listener.cpp
@@ -77,6 +77,21 @@ void TopologyEventsPublisher::onServerHandshakeCompleteEvent(IsMasterRTT duratio
_scheduleNextDelivery();
}
+void TopologyEventsPublisher::onServerHandshakeFailedEvent(const sdam::ServerAddress& address,
+ const Status& status,
+ const BSONObj reply) {
+ {
+ stdx::lock_guard<Mutex> lock(_eventQueueMutex);
+ EventPtr event = std::make_unique<Event>();
+ event->type = EventType::HANDSHAKE_FAILURE;
+ event->hostAndPort = address;
+ event->reply = reply;
+ event->status = status;
+ _eventQueue.push_back(std::move(event));
+ }
+ _scheduleNextDelivery();
+}
+
void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
const ServerAddress& hostAndPort,
const BSONObj reply) {
@@ -201,6 +216,9 @@ void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Eve
case EventType::PING_FAILURE:
listener->onServerPingFailedEvent(event.hostAndPort, event.status);
break;
+ case EventType::HANDSHAKE_FAILURE:
+ listener->onServerHandshakeFailedEvent(event.hostAndPort, event.status, event.reply);
+ break;
default:
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h
index 4adbb2329d2..5cc737706d0 100644
--- a/src/mongo/client/sdam/topology_listener.h
+++ b/src/mongo/client/sdam/topology_listener.h
@@ -64,6 +64,10 @@ public:
const sdam::ServerAddress& address,
const BSONObj reply = BSONObj()){};
+ virtual void onServerHandshakeFailedEvent(const sdam::ServerAddress& address,
+ const Status& status,
+ const BSONObj reply){};
+
/**
* Called when a ServerHeartBeatSucceededEvent is published - A heartbeat sent to the server at
* hostAndPort succeeded. durationMS is the execution time of the event, including the time it
@@ -108,6 +112,11 @@ public:
virtual void onServerHandshakeCompleteEvent(IsMasterRTT durationMs,
const sdam::ServerAddress& address,
const BSONObj reply = BSONObj()) override;
+
+ void onServerHandshakeFailedEvent(const sdam::ServerAddress& address,
+ const Status& status,
+ const BSONObj reply) override;
+
void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
const ServerAddress& hostAndPort,
const BSONObj reply) override;
@@ -126,8 +135,10 @@ private:
PING_SUCCESS,
PING_FAILURE,
TOPOLOGY_DESCRIPTION_CHANGED,
- HANDSHAKE_COMPLETE
+ HANDSHAKE_COMPLETE,
+ HANDSHAKE_FAILURE
};
+
struct Event {
EventType type;
ServerAddress hostAndPort;
diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp
index 75242baa796..74f8de94d27 100644
--- a/src/mongo/client/sdam/topology_manager.cpp
+++ b/src/mongo/client/sdam/topology_manager.cpp
@@ -148,7 +148,7 @@ void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT
// otherwise, the server was removed from the topology. Nothing to do.
LOGV2(4333201,
"Not updating RTT. Server {server} does not exist in {setName}",
- "server"_attr = hostAndPort,
+ "host"_attr = hostAndPort,
"setName"_attr = getTopologyDescription()->getSetName());
}
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 042828f76e7..6ae640dd9af 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -63,9 +63,11 @@ using std::vector;
namespace {
// Pull nested types to top-level scope
+using executor::EgressTagCloser;
using executor::TaskExecutor;
using CallbackArgs = TaskExecutor::CallbackArgs;
using CallbackHandle = TaskExecutor::CallbackHandle;
+using HandshakeStage = StreamableReplicaSetMonitorErrorHandler::HandshakeStage;
const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet());
@@ -140,11 +142,15 @@ constexpr auto kZeroMs = Milliseconds(0);
* functionality. Once they are shutdown in the drop() method the operations exposed via their api
* are effectively no-ops.
*/
-StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri,
- std::shared_ptr<TaskExecutor> executor)
+StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(
+ const MongoURI& uri,
+ std::shared_ptr<TaskExecutor> executor,
+ std::shared_ptr<executor::EgressTagCloser> connectionManager)
: _serverSelector(std::make_unique<SdamServerSelector>(kServerSelectionConfig)),
+ _errorHandler(std::make_unique<SdamErrorHandler>(uri.getSetName())),
_queryProcessor(std::make_shared<StreamableReplicaSetMonitorQueryProcessor>()),
_uri(uri),
+ _connectionManager(connectionManager),
_executor(executor),
_random(PseudoRandom(SecureRandom().nextInt64())) {
@@ -157,9 +163,11 @@ StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri,
_sdamConfig = SdamConfiguration(seeds);
}
-ReplicaSetMonitorPtr StreamableReplicaSetMonitor::make(const MongoURI& uri,
- std::shared_ptr<TaskExecutor> executor) {
- auto result = std::make_shared<StreamableReplicaSetMonitor>(uri, executor);
+ReplicaSetMonitorPtr StreamableReplicaSetMonitor::make(
+ const MongoURI& uri,
+ std::shared_ptr<TaskExecutor> executor,
+ std::shared_ptr<executor::EgressTagCloser> connectionManager) {
+ auto result = std::make_shared<StreamableReplicaSetMonitor>(uri, executor, connectionManager);
result->init();
return result;
}
@@ -222,7 +230,7 @@ std::vector<HostAndPort> StreamableReplicaSetMonitor::_extractHosts(
const std::vector<ServerDescriptionPtr>& serverDescriptions) {
std::vector<HostAndPort> result;
for (const auto& server : serverDescriptions) {
- result.push_back(HostAndPort(server->getAddress()));
+ result.emplace_back(server->getAddress());
}
return result;
}
@@ -301,7 +309,7 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutsta
LOGV2_INFO(4333208,
"RSM {setName} host selection timeout: {status}",
"setName"_attr = getName(),
- "status"_attr = errorStatus.toString());
+ "error"_attr = errorStatus.toString());
};
auto swDeadlineHandle = _executor->scheduleWorkAt(query->deadline, deadlineCb);
@@ -309,7 +317,7 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutsta
LOGV2_INFO(4333207,
"RSM {setName} error scheduling deadline handler: {status}",
"setName"_attr = getName(),
- "status"_attr = swDeadlineHandle.getStatus());
+ "error"_attr = swDeadlineHandle.getStatus());
return SemiFuture<HostAndPortList>::makeReady(swDeadlineHandle.getStatus());
}
query->deadlineHandle = swDeadlineHandle.getValue();
@@ -343,16 +351,53 @@ sdam::TopologyEventsPublisherPtr StreamableReplicaSetMonitor::getEventsPublisher
return _eventsPublisher;
}
-
void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) {
- failedHost(host, BSONObj(), status);
+ failedHostPostHandshake(host, status, BSONObj());
}
-void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host,
- BSONObj bson,
- const Status& status) {
- IsMasterOutcome outcome(host.toString(), bson, status.toString());
- _topologyManager->onServerDescription(outcome);
+void StreamableReplicaSetMonitor::failedHostPreHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) {
+ _failedHost(host, status, bson, HandshakeStage::kPreHandshake, true);
+}
+
+void StreamableReplicaSetMonitor::failedHostPostHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) {
+ _failedHost(host, status, bson, HandshakeStage::kPostHandshake, true);
+}
+
+void StreamableReplicaSetMonitor::_failedHost(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson,
+ HandshakeStage stage,
+ bool isApplicationOperation) {
+ if (_isDropped.load())
+ return;
+
+ _doErrorActions(
+ host,
+ _errorHandler->computeErrorActions(host, status, stage, isApplicationOperation, bson));
+}
+
+void StreamableReplicaSetMonitor::_doErrorActions(
+ const HostAndPort& host,
+ const StreamableReplicaSetMonitorErrorHandler::ErrorActions& errorActions) const {
+ {
+ stdx::lock_guard lock(_mutex);
+ if (_isDropped.load())
+ return;
+
+ if (errorActions.dropConnections)
+ _connectionManager->dropConnections(host);
+
+ if (errorActions.requestImmediateCheck)
+ _isMasterMonitor->requestImmediateCheck();
+ }
+
+ // Call outside of the lock since this may generate a topology change event.
+ if (errorActions.isMasterOutcome)
+ _topologyManager->onServerDescription(*errorActions.isMasterOutcome);
}
boost::optional<ServerDescriptionPtr> StreamableReplicaSetMonitor::_currentPrimary() const {
@@ -556,7 +601,7 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent(
kLowerLogLevel,
"Skip publishing unconfirmed replica set members since there are "
"no primaries or secondaries in the new topology",
- "replicaSetName"_attr = getName());
+ "setName"_attr = getName());
return;
}
@@ -580,29 +625,29 @@ void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT dura
Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply) {
- IsMasterOutcome outcome(hostAndPort, reply, errorStatus.toString());
- _topologyManager->onServerDescription(outcome);
+ _failedHost(
+ HostAndPort(hostAndPort), errorStatus, reply, HandshakeStage::kPostHandshake, false);
}
void StreamableReplicaSetMonitor::onServerPingFailedEvent(const ServerAddress& hostAndPort,
const Status& status) {
- LOGV2_DEBUG(4668133,
- 0,
- " StreamableReplicaSetMonitor::onServerPingFailedEvent, ServerPingMonitor got "
- "status{status} ",
- "addr"_attr = hostAndPort,
- "status"_attr = status);
- failedHost(HostAndPort(hostAndPort), status);
+ _failedHost(HostAndPort(hostAndPort), status, BSONObj(), HandshakeStage::kPostHandshake, false);
}
+void StreamableReplicaSetMonitor::onServerHandshakeFailedEvent(const sdam::ServerAddress& address,
+ const Status& status,
+ const BSONObj reply) {
+ _failedHost(HostAndPort(address), status, reply, HandshakeStage::kPreHandshake, false);
+};
+
void StreamableReplicaSetMonitor::onServerPingSucceededEvent(sdam::IsMasterRTT durationMS,
const ServerAddress& hostAndPort) {
LOGV2_DEBUG(4668132,
- 1,
- " StreamableReplicaSetMonitor::onServerPingSucceededEvent, ServerPingMonitor for "
- "{addr} with {rtt}",
- "addr"_attr = hostAndPort,
- "rtt"_attr = durationMS);
+ kLowerLogLevel,
+ "ReplicaSetMonitor ping success",
+ "host"_attr = hostAndPort,
+ "setName"_attr = getName(),
+ "duration"_attr = durationMS);
_topologyManager->onServerRTTUpdated(hostAndPort, durationMS);
}
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index 5ec56495684..650d7abe237 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -41,6 +41,8 @@
#include "mongo/client/sdam/sdam.h"
#include "mongo/client/server_is_master_monitor.h"
#include "mongo/client/server_ping_monitor.h"
+#include "mongo/client/streamable_replica_set_monitor_error_handler.h"
+#include "mongo/executor/egress_tag_closer.h"
#include "mongo/executor/task_executor.h"
#include "mongo/logger/log_component.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -76,14 +78,16 @@ public:
static constexpr auto kCheckTimeout = Seconds(5);
StreamableReplicaSetMonitor(const MongoURI& uri,
- std::shared_ptr<executor::TaskExecutor> executor);
+ std::shared_ptr<executor::TaskExecutor> executor,
+ std::shared_ptr<executor::EgressTagCloser> connectionManager);
void init();
void drop();
static ReplicaSetMonitorPtr make(const MongoURI& uri,
- std::shared_ptr<executor::TaskExecutor> executor = nullptr);
+ std::shared_ptr<executor::TaskExecutor> executor,
+ std::shared_ptr<executor::EgressTagCloser> connectionCloser);
SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
Milliseconds maxWait = kDefaultFindHostTimeout);
@@ -93,8 +97,13 @@ public:
HostAndPort getMasterOrUassert();
- void failedHost(const HostAndPort& host, const Status& status);
- void failedHost(const HostAndPort& host, BSONObj bson, const Status& status);
+ void failedHost(const HostAndPort& host, const Status& status) override;
+ void failedHostPreHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) override;
+ void failedHostPostHandshake(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson) override;
bool isPrimary(const HostAndPort& host) const;
@@ -147,10 +156,12 @@ private:
std::vector<HostAndPort> _extractHosts(
const std::vector<sdam::ServerDescriptionPtr>& serverDescriptions);
+
boost::optional<std::vector<HostAndPort>> _getHosts(const TopologyDescriptionPtr& topology,
const ReadPreferenceSetting& criteria);
boost::optional<std::vector<HostAndPort>> _getHosts(const ReadPreferenceSetting& criteria);
+ // Incoming Events
void onTopologyDescriptionChangedEvent(UUID topologyId,
sdam::TopologyDescriptionPtr previousDescription,
sdam::TopologyDescriptionPtr newDescription) override;
@@ -159,6 +170,10 @@ private:
const sdam::ServerAddress& hostAndPort,
const BSONObj reply) override;
+ void onServerHandshakeFailedEvent(const sdam::ServerAddress& address,
+ const Status& status,
+ const BSONObj reply) override;
+
void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
Status errorStatus,
const ServerAddress& hostAndPort,
@@ -197,10 +212,22 @@ private:
// Try to satisfy the outstanding queries for this instance with the given topology information.
void _processOutstanding(const TopologyDescriptionPtr& topologyDescription);
+ // Take action on error for the given host.
+ void _doErrorActions(
+ const HostAndPort& host,
+ const StreamableReplicaSetMonitorErrorHandler::ErrorActions& errorActions) const;
+
+ void _failedHost(const HostAndPort& host,
+ const Status& status,
+ BSONObj bson,
+ StreamableReplicaSetMonitorErrorHandler::HandshakeStage stage,
+ bool isApplicationOperation);
+
sdam::SdamConfiguration _sdamConfig;
sdam::TopologyManagerPtr _topologyManager;
sdam::ServerSelectorPtr _serverSelector;
sdam::TopologyEventsPublisherPtr _eventsPublisher;
+ std::unique_ptr<StreamableReplicaSetMonitorErrorHandler> _errorHandler;
ServerIsMasterMonitorPtr _isMasterMonitor;
std::shared_ptr<ServerPingMonitor> _pingMonitor;
@@ -210,6 +237,7 @@ private:
const MongoURI _uri;
+ std::shared_ptr<executor::EgressTagCloser> _connectionManager;
std::shared_ptr<executor::TaskExecutor> _executor;
AtomicWord<bool> _isDropped{true};
diff --git a/src/mongo/client/streamable_replica_set_monitor_error_handler.cpp b/src/mongo/client/streamable_replica_set_monitor_error_handler.cpp
new file mode 100644
index 00000000000..be19b1e64ec
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor_error_handler.cpp
@@ -0,0 +1,163 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+#include "mongo/client/streamable_replica_set_monitor_error_handler.h"
+
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+SdamErrorHandler::ErrorActions SdamErrorHandler::computeErrorActions(const HostAndPort& host,
+ const Status& status,
+ HandshakeStage handshakeStage,
+ bool isApplicationOperation,
+ BSONObj bson) noexcept {
+ // Initial state: don't drop connections, no immediate check, and don't generate an error server
+ // description.
+ ErrorActions result;
+ ON_BLOCK_EXIT([this, &result, &host, &status] {
+ if (result.isMasterOutcome)
+ _clearConsecutiveErrorsWithoutIsMasterOutcome(host);
+
+ LOGV2(4712102,
+ "Host failed in replica set",
+ "setName"_attr = _setName,
+ "host"_attr = host,
+ "error"_attr = status,
+ "action"_attr = result);
+ });
+
+ // Helpers to mutate the actions
+ const auto setCreateServerDescriptionAction = [this, &result, &host, &status, bson]() {
+ result.isMasterOutcome = _createErrorIsMasterOutcome(host, bson, status);
+ };
+ const auto setImmediateCheckAction = [&result]() { result.requestImmediateCheck = true; };
+ const auto setDropConnectionsAction = [&result]() { result.dropConnections = true; };
+
+ if (!_isNetworkError(status) && !_isNotMasterOrNotRecovering(status)) {
+ setCreateServerDescriptionAction();
+ return result;
+ }
+
+ if (isApplicationOperation) {
+ if (_isNetworkError(status)) {
+ switch (handshakeStage) {
+ case HandshakeStage::kPreHandshake:
+ setCreateServerDescriptionAction();
+ break;
+ case HandshakeStage::kPostHandshake:
+ if (!_isNetworkTimeout(status)) {
+ setCreateServerDescriptionAction();
+ }
+ break;
+ }
+ setDropConnectionsAction();
+ } else if (_isNotMasterOrNotRecovering(status)) {
+ setCreateServerDescriptionAction();
+ setImmediateCheckAction();
+ if (_isNodeShuttingDown(status)) {
+ setDropConnectionsAction();
+ }
+ }
+ } else if (_isNetworkError(status)) {
+ switch (handshakeStage) {
+ case HandshakeStage::kPreHandshake:
+ setCreateServerDescriptionAction();
+ break;
+ case HandshakeStage::kPostHandshake:
+ int errorCount = _getConsecutiveErrorsWithoutIsMasterOutcome(host);
+ if (errorCount == 1) {
+ setCreateServerDescriptionAction();
+ } else {
+ setImmediateCheckAction();
+ _incrementConsecutiveErrorsWithoutIsMasterOutcome(host);
+ }
+ break;
+ }
+ setDropConnectionsAction();
+ }
+
+ return result;
+}
+
+BSONObj StreamableReplicaSetMonitorErrorHandler::ErrorActions::toBSON() const {
+ BSONObjBuilder builder;
+ builder.append("dropConnections", dropConnections);
+ builder.append("requestImmediateCheck", requestImmediateCheck);
+ if (isMasterOutcome) {
+ builder.append("outcome", isMasterOutcome->toBSON());
+ }
+ return builder.obj();
+}
+
+bool SdamErrorHandler::_isNodeRecovering(const Status& status) const {
+ return ErrorCodes::isA<ErrorCategory::RetriableError>(status.code());
+}
+
+bool SdamErrorHandler::_isNetworkTimeout(const Status& status) const {
+ return ErrorCodes::isA<ErrorCategory::NetworkTimeoutError>(status.code());
+}
+
+bool SdamErrorHandler::_isNodeShuttingDown(const Status& status) const {
+ return ErrorCodes::isA<ErrorCategory::ShutdownError>(status.code());
+}
+
+bool SdamErrorHandler::_isNetworkError(const Status& status) const {
+ return ErrorCodes::isA<ErrorCategory::NetworkError>(status.code());
+}
+
+bool SdamErrorHandler::_isNotMasterOrNotRecovering(const Status& status) const {
+ return _isNodeRecovering(status) || _isNotMaster(status);
+}
+
+bool SdamErrorHandler::_isNotMaster(const Status& status) const {
+ return ErrorCodes::isA<ErrorCategory::NotMasterError>(status.code());
+}
+
+int SdamErrorHandler::_getConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host) const {
+ stdx::lock_guard lock(_mutex);
+ if (auto it = _consecutiveErrorsWithoutIsMasterOutcome.find(host);
+ it != _consecutiveErrorsWithoutIsMasterOutcome.end()) {
+ return it->second;
+ }
+ return 0;
+}
+
+void SdamErrorHandler::_incrementConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host) {
+ stdx::lock_guard lock(_mutex);
+ auto [iter, wasEmplaced] = _consecutiveErrorsWithoutIsMasterOutcome.emplace(host, 1);
+ if (!wasEmplaced) {
+ ++(iter->second);
+ }
+}
+
+void SdamErrorHandler::_clearConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host) {
+ stdx::lock_guard lock(_mutex);
+ _consecutiveErrorsWithoutIsMasterOutcome.erase(host);
+}
+} // namespace mongo
diff --git a/src/mongo/client/streamable_replica_set_monitor_error_handler.h b/src/mongo/client/streamable_replica_set_monitor_error_handler.h
new file mode 100644
index 00000000000..86e9db135e7
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor_error_handler.h
@@ -0,0 +1,94 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+#include <boost/optional.hpp>
+
+#include "mongo/client/sdam/sdam.h"
+#include "mongo/executor/network_interface.h"
+#include "mongo/platform/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+
+namespace mongo {
+class StreamableReplicaSetMonitorErrorHandler {
+public:
+ struct ErrorActions {
+ bool dropConnections = false;
+ bool requestImmediateCheck = false;
+ boost::optional<sdam::IsMasterOutcome> isMasterOutcome;
+ BSONObj toBSON() const;
+ };
+
+ // Indicates whether the initial handshake has completed.
+ enum class HandshakeStage { kPreHandshake, kPostHandshake };
+
+ virtual ~StreamableReplicaSetMonitorErrorHandler() = default;
+
+ // Based on the error status, source of the error, and handshake stage determine what
+ // ErrorActions we should take.
+ virtual ErrorActions computeErrorActions(const HostAndPort& host,
+ const Status& status,
+ HandshakeStage handshakeStage,
+ bool isApplicationOperation,
+ BSONObj bson) noexcept = 0;
+
+protected:
+ sdam::IsMasterOutcome _createErrorIsMasterOutcome(const HostAndPort& host,
+ boost::optional<BSONObj> bson,
+ const Status& status) const {
+ return sdam::IsMasterOutcome(host.toString(), bson ? *bson : BSONObj(), status.toString());
+ }
+};
+
+class SdamErrorHandler final : public StreamableReplicaSetMonitorErrorHandler {
+public:
+ explicit SdamErrorHandler(std::string setName) : _setName(std::move(setName)){};
+
+ ErrorActions computeErrorActions(const HostAndPort& host,
+ const Status& status,
+ HandshakeStage handshakeStage,
+ bool isApplicationOperation,
+ BSONObj bson) noexcept override;
+
+private:
+ int _getConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host) const;
+ void _incrementConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host);
+ void _clearConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host);
+
+ bool _isNodeRecovering(const Status& status) const;
+ bool _isNetworkTimeout(const Status& status) const;
+ bool _isNodeShuttingDown(const Status& status) const;
+ bool _isNetworkError(const Status& status) const;
+ bool _isNotMasterOrNotRecovering(const Status& status) const;
+ bool _isNotMaster(const Status& status) const;
+
+ const std::string _setName;
+ mutable Mutex _mutex;
+ stdx::unordered_map<HostAndPort, int> _consecutiveErrorsWithoutIsMasterOutcome;
+};
+} // namespace mongo
diff --git a/src/mongo/client/streamable_replica_set_monitor_error_handler_test.cpp b/src/mongo/client/streamable_replica_set_monitor_error_handler_test.cpp
new file mode 100644
index 00000000000..45aea46b787
--- /dev/null
+++ b/src/mongo/client/streamable_replica_set_monitor_error_handler_test.cpp
@@ -0,0 +1,210 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+#include "mongo/client/streamable_replica_set_monitor_error_handler.h"
+
+#include <boost/optional/optional_io.hpp>
+
+#include "mongo/client/sdam/sdam.h"
+#include "mongo/logv2/log.h"
+#include "mongo/platform/basic.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+using HandshakeStage = StreamableReplicaSetMonitorErrorHandler::HandshakeStage;
+using ErrorActions = StreamableReplicaSetMonitorErrorHandler::ErrorActions;
+using Error = ErrorCodes::Error;
+
+class StreamableReplicaSetMonitorErrorHandlerTestFixture : public unittest::Test {
+public:
+ Status makeStatus(Error error) {
+ return Status(error, kErrorMessage);
+ }
+
+ void verifyActions(ErrorActions result, ErrorActions expectedResult) {
+ ASSERT_EQUALS(result.dropConnections, expectedResult.dropConnections);
+ ASSERT_EQUALS(result.requestImmediateCheck, expectedResult.requestImmediateCheck);
+ if (expectedResult.isMasterOutcome) {
+ ASSERT_FALSE(result.isMasterOutcome->isSuccess());
+ std::string resultError = result.isMasterOutcome->getErrorMsg();
+ ASSERT_NOT_EQUALS(resultError.find(expectedResult.isMasterOutcome->getErrorMsg()),
+ std::string::npos);
+ } else {
+ ASSERT_FALSE(result.isMasterOutcome);
+ }
+ }
+
+ void testScenario(HandshakeStage stage,
+ bool isApplicationOperation,
+ std::vector<Error> errors,
+ std::function<ErrorActions(const Status&)> expectedResultGenerator,
+ int numAttempts = 1) {
+ auto testSubject = subject();
+
+ const auto prePost = (stage == HandshakeStage::kPreHandshake) ? "pre" : "post";
+ const auto applicationOperation = (isApplicationOperation) ? "application" : "monitoring";
+ LOGV2_INFO(4712105,
+ "Check Scenario",
+ "handshake"_attr = prePost,
+ "operationType"_attr = applicationOperation);
+ for (auto error : errors) {
+ LOGV2_INFO(4712106, "Check error ", "error"_attr = ErrorCodes::errorString(error));
+ for (int attempt = 0; attempt < numAttempts; attempt++) {
+ auto result = testSubject->computeErrorActions(
+ kHost, makeStatus(error), stage, isApplicationOperation, kErrorBson);
+ verifyActions(result, expectedResultGenerator(makeStatus(error)));
+ LOGV2_INFO(4712107, "Attempt Successful", "num"_attr = attempt);
+ }
+ }
+ }
+
+ void testScenario(HandshakeStage stage,
+ bool isApplicationOperation,
+ std::vector<Error> errors,
+ ErrorActions expectedResult,
+ int numAttempts = 1) {
+ testScenario(stage, isApplicationOperation, errors, [expectedResult](const Status&) {
+ return expectedResult;
+ });
+ }
+
+ std::unique_ptr<StreamableReplicaSetMonitorErrorHandler> subject() {
+ return std::make_unique<SdamErrorHandler>(kSetName);
+ }
+
+ inline static const std::vector<Error> kNetworkErrors{ErrorCodes::SocketException,
+ ErrorCodes::NetworkTimeout,
+ ErrorCodes::HostNotFound,
+ ErrorCodes::HostUnreachable};
+ inline static const std::vector<Error> kNetworkErrorsNoTimeout{
+ ErrorCodes::SocketException, ErrorCodes::HostNotFound, ErrorCodes::HostUnreachable};
+ inline static const std::vector<Error> kInternalError{ErrorCodes::InternalError};
+ inline static const std::vector<Error> kNotMasterAndNodeRecovering{
+ ErrorCodes::InterruptedAtShutdown,
+ ErrorCodes::InterruptedDueToReplStateChange,
+ ErrorCodes::NotMasterOrSecondary,
+ ErrorCodes::PrimarySteppedDown,
+ ErrorCodes::ShutdownInProgress,
+ ErrorCodes::NotMaster,
+ ErrorCodes::NotMasterNoSlaveOk};
+
+ inline static const std::string kSetName = "setName";
+ inline static const HostAndPort kHost = HostAndPort("foobar:123");
+ inline static const std::string kErrorMessage = "an error message";
+ inline static const BSONObj kErrorBson = BSONObjBuilder().append("ok", 0).obj();
+ inline static const sdam::IsMasterOutcome kErrorIsMasterOutcome =
+ sdam::IsMasterOutcome(kHost.toString(), kErrorBson, kErrorMessage);
+
+ static constexpr bool kApplicationOperation = true;
+ static constexpr bool kMonitoringOperation = false;
+
+ static constexpr int kThreeAttempts = 3;
+};
+
+TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, ApplicationNetworkErrorsPreHandshake) {
+ testScenario(
+ HandshakeStage::kPreHandshake,
+ kApplicationOperation,
+ kNetworkErrors,
+ StreamableReplicaSetMonitorErrorHandler::ErrorActions{true, false, kErrorIsMasterOutcome});
+};
+
+// https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#network-error-when-reading-or-writing
+TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, ApplicationNetworkErrorsPostHandshake) {
+ testScenario(
+ HandshakeStage::kPostHandshake,
+ kApplicationOperation,
+ kNetworkErrorsNoTimeout,
+ StreamableReplicaSetMonitorErrorHandler::ErrorActions{true, false, kErrorIsMasterOutcome});
+};
+
+// https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#network-error-during-server-check
+TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, MonitoringNetworkErrorsPostHandshake) {
+ // Two consecutive errors must occur to expect an unknown server description.
+ const auto errorServerDescriptionOnSecondNetworkFailure = [](const Status& status) {
+ static int count = 0;
+ count = (count + 1) % 2;
+ return (count == 1)
+ ? StreamableReplicaSetMonitorErrorHandler::ErrorActions{true, true, boost::none}
+ : StreamableReplicaSetMonitorErrorHandler::ErrorActions{
+ true, false, kErrorIsMasterOutcome};
+ };
+
+ testScenario(HandshakeStage::kPostHandshake,
+ kMonitoringOperation,
+ kNetworkErrors,
+ errorServerDescriptionOnSecondNetworkFailure,
+ kThreeAttempts);
+}
+
+// https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#network-error-during-server-check
+TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, MonitoringNetworkErrorsPreHandshake) {
+ testScenario(
+ HandshakeStage::kPreHandshake,
+ kMonitoringOperation,
+ kNetworkErrors,
+ StreamableReplicaSetMonitorErrorHandler::ErrorActions{true, false, kErrorIsMasterOutcome},
+ kThreeAttempts);
+}
+
+// https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#not-master-and-node-is-recovering
+TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, ApplicationNotMasterOrNodeRecovering) {
+ const auto shutdownErrorsDropConnections = [](const Status& status) {
+ if (ErrorCodes::isA<ErrorCategory::ShutdownError>(status.code())) {
+ return StreamableReplicaSetMonitorErrorHandler::ErrorActions{
+ true, true, kErrorIsMasterOutcome};
+ } else {
+ return StreamableReplicaSetMonitorErrorHandler::ErrorActions{
+ false, true, kErrorIsMasterOutcome};
+ }
+ };
+
+ testScenario(HandshakeStage::kPostHandshake,
+ kApplicationOperation,
+ kNotMasterAndNodeRecovering,
+ shutdownErrorsDropConnections);
+}
+
+TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, MonitoringNonNetworkError) {
+ testScenario(
+ HandshakeStage::kPostHandshake,
+ kMonitoringOperation,
+ kInternalError,
+ StreamableReplicaSetMonitorErrorHandler::ErrorActions{false, false, kErrorIsMasterOutcome});
+}
+
+TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture,
+ ApplicationNonNetworkIsMasterOrRecoveringError) {
+ testScenario(
+ HandshakeStage::kPostHandshake,
+ kMonitoringOperation,
+ kInternalError,
+ StreamableReplicaSetMonitorErrorHandler::ErrorActions{false, false, kErrorIsMasterOutcome});
+}
+} // namespace mongo