diff options
author | Lamont Nelson <lamont.nelson@mongodb.com> | 2020-03-26 22:26:54 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-10 03:15:45 +0000 |
commit | d65c4e67ff4cf3b69e24c292d0ce343aecc9f5a5 (patch) | |
tree | 4ff8d7b8254ba312a98b3ddca71829ea49868597 /src | |
parent | 9c5d82ff1fb2bb7367e10c9e21ac67b440e7097e (diff) | |
download | mongo-d65c4e67ff4cf3b69e24c292d0ce343aecc9f5a5.tar.gz |
SERVER-47121: Introduce pre/post handshake error handling in the RSM
Diffstat (limited to 'src')
17 files changed, 736 insertions, 48 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 6d9cfb4eefb..99cc1b539fe 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -1,6 +1,7 @@ # Group related errors into categories,can be checked against ErrorCodes::isXXXClassName methods. error_categories: - NetworkError + - NetworkTimeoutError - Interruption # isNotMasterError() includes all codes that indicate that the node that received the request # was not master at some point during command processing,regardless of whether some write may @@ -110,7 +111,7 @@ error_codes: - {code: 86,name: IndexKeySpecsConflict} - {code: 87,name: CannotSplit} - {code: 88,name: SplitFailed_OBSOLETE} - - {code: 89,name: NetworkTimeout,categories: [NetworkError,RetriableError]} + - {code: 89,name: NetworkTimeout,categories: [NetworkError,RetriableError,NetworkTimeoutError]} - {code: 90,name: CallbackCanceled,categories: [CancelationError]} - {code: 91,name: ShutdownInProgress,categories: [ShutdownError,CancelationError,RetriableError]} - {code: 92,name: SecondaryAheadOfPrimary} @@ -221,7 +222,7 @@ error_codes: - {code: 199,name: ReplicaSetMonitorRemoved} - {code: 200,name: ChunkRangeCleanupPending} - {code: 201,name: CannotBuildIndexKeys} - - {code: 202,name: NetworkInterfaceExceededTimeLimit,categories: [ExceededTimeLimitError]} + - {code: 202,name: NetworkInterfaceExceededTimeLimit,categories: [ExceededTimeLimitError,NetworkTimeoutError]} - {code: 203,name: ShardingStateNotInitialized} - {code: 204,name: TimeProofMismatch} - {code: 205,name: ClusterTimeFailsRateLimiter} @@ -285,7 +286,7 @@ error_codes: - {code: 259,name: KeyedExecutorRetry} - {code: 260,name: InvalidResumeToken} - {code: 261,name: TooManyLogicalSessions} - - {code: 262,name: ExceededTimeLimit,categories: [Interruption,ExceededTimeLimitError, RetriableError]} + - {code: 262,name: ExceededTimeLimit,categories: [Interruption,ExceededTimeLimitError,RetriableError]} - {code: 263,name: OperationNotSupportedInTransaction} - {code: 264,name: TooManyFilesOpen} - {code: 265,name: OrphanedRangeCleanUpFailed} diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 51c3df426c9..cb79546c3cd 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -199,6 +199,7 @@ clientDriverEnv.Library( 'scanning_replica_set_monitor.cpp', 'streamable_replica_set_monitor.cpp', 'streamable_replica_set_monitor_query_processor.cpp', + 'streamable_replica_set_monitor_error_handler.cpp', 'server_is_master_monitor.cpp', 'server_ping_monitor.cpp', ], @@ -333,6 +334,7 @@ env.CppUnitTest( 'scanning_replica_set_monitor_scan_test.cpp', 'scanning_replica_set_monitor_test_concurrent.cpp', 'scanning_replica_set_monitor_test_fixture.cpp', + 'streamable_replica_set_monitor_error_handler_test.cpp', 'server_ping_monitor_test.cpp', ], LIBDEPS=[ diff --git a/src/mongo/client/replica_set_monitor_interface.h b/src/mongo/client/replica_set_monitor_interface.h index 3826154dfe2..ad40412dbc4 100644 --- a/src/mongo/client/replica_set_monitor_interface.h +++ b/src/mongo/client/replica_set_monitor_interface.h @@ -97,11 +97,17 @@ public: * Notifies this Monitor that a host has failed because of the specified error 'status' and * should be considered down. * - * Call this when you get a connection error. If you get an error while trying to refresh our - * view of a host, call Refresher::failedHost instead because it bypasses taking the monitor's - * mutex. + * The sdam version of the Monitor makes a distinction between failures happening before or + * after the initial handshake for the connection. The failedHost method is kept for backwards + * compatibility, and is equivalent to failedHostPostHandshake. */ virtual void failedHost(const HostAndPort& host, const Status& status) = 0; + virtual void failedHostPreHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) = 0; + virtual void failedHostPostHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) = 0; /** * Returns true if this node is the master based ONLY on local data. Be careful, return may diff --git a/src/mongo/client/replica_set_monitor_manager.cpp b/src/mongo/client/replica_set_monitor_manager.cpp index 8986fd143ac..384c67c6c87 100644 --- a/src/mongo/client/replica_set_monitor_manager.cpp +++ b/src/mongo/client/replica_set_monitor_manager.cpp @@ -87,8 +87,23 @@ Status ReplicaSetMonitorManagerNetworkConnectionHook::validateHost( auto publisher = streamableMonitor->getEventsPublisher(); if (publisher) { - publisher->onServerHandshakeCompleteEvent( - isMasterReply.elapsedMillis.get(), remoteHost.toString(), isMasterReply.data); + try { + if (isMasterReply.status.isOK()) { + publisher->onServerHandshakeCompleteEvent(isMasterReply.elapsedMillis.get(), + remoteHost.toString(), + isMasterReply.data); + } else { + publisher->onServerHandshakeFailedEvent( + remoteHost.toString(), isMasterReply.status, isMasterReply.data); + } + } catch (const DBException& exception) { + LOGV2_ERROR(4712101, + "An error occurred publishing a ReplicaSetMonitor handshake event", + "error"_attr = exception.toStatus(), + "setName"_attr = monitor->getName(), + "handshakeStatus"_attr = isMasterReply.status); + return exception.toStatus(); + } } } } @@ -133,10 +148,14 @@ void ReplicaSetMonitorManager::_setupTaskExecutorInLock() { // construct task executor auto hookList = std::make_unique<rpc::EgressMetadataHookList>(); auto networkConnectionHook = std::make_unique<ReplicaSetMonitorManagerNetworkConnectionHook>(); - auto net = executor::makeNetworkInterface( + + std::shared_ptr<NetworkInterface> networkInterface = executor::makeNetworkInterface( "ReplicaSetMonitor-TaskExecutor", std::move(networkConnectionHook), std::move(hookList)); - auto pool = std::make_unique<NetworkInterfaceThreadPool>(net.get()); - _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), std::move(net)); + _connectionManager = std::make_unique<ReplicaSetMonitorConnectionManager>(networkInterface); + + auto pool = std::make_unique<NetworkInterfaceThreadPool>(networkInterface.get()); + + _taskExecutor = std::make_shared<ThreadPoolTaskExecutor>(std::move(pool), networkInterface); _taskExecutor->startup(); } @@ -173,7 +192,7 @@ shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorManager::getOrCreateMonitor(const newMonitor->init(); } else { LOGV2(4333205, "Starting Streamable ReplicaSetMonitor", "uri"_attr = uri.toString()); - newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor()); + newMonitor = StreamableReplicaSetMonitor::make(uri, getExecutor(), _getConnectionManager()); } _monitors[setName] = newMonitor; return newMonitor; @@ -220,6 +239,7 @@ void ReplicaSetMonitorManager::removeMonitor(StringData setName) { void ReplicaSetMonitorManager::shutdown() { decltype(_monitors) monitors; decltype(_taskExecutor) taskExecutor; + decltype(_connectionManager) connectionManager; { stdx::lock_guard<Latch> lk(_mutex); if (std::exchange(_isShutdown, true)) { @@ -227,6 +247,7 @@ void ReplicaSetMonitorManager::shutdown() { } monitors = std::exchange(_monitors, {}); + connectionManager = std::exchange(_connectionManager, {}); taskExecutor = std::exchange(_taskExecutor, {}); } @@ -278,6 +299,11 @@ std::shared_ptr<executor::TaskExecutor> ReplicaSetMonitorManager::getExecutor() return _taskExecutor; } +std::shared_ptr<executor::EgressTagCloser> ReplicaSetMonitorManager::_getConnectionManager() { + invariant(_connectionManager); + return _connectionManager; +} + ReplicaSetChangeNotifier& ReplicaSetMonitorManager::getNotifier() { return _notifier; } @@ -286,4 +312,8 @@ bool ReplicaSetMonitorManager::isShutdown() const { stdx::lock_guard<Latch> lk(_mutex); return _isShutdown; } + +void ReplicaSetMonitorConnectionManager::dropConnections(const HostAndPort& hostAndPort) { + _network->dropConnections(hostAndPort); +} } // namespace mongo diff --git a/src/mongo/client/replica_set_monitor_manager.h b/src/mongo/client/replica_set_monitor_manager.h index de628867b6e..07b5d4e3ba4 100644 --- a/src/mongo/client/replica_set_monitor_manager.h +++ b/src/mongo/client/replica_set_monitor_manager.h @@ -33,7 +33,9 @@ #include <vector> #include "mongo/client/replica_set_change_notifier.h" +#include "mongo/executor/egress_tag_closer.h" #include "mongo/executor/network_connection_hook.h" +#include "mongo/executor/network_interface.h" #include "mongo/executor/task_executor.h" #include "mongo/platform/mutex.h" #include "mongo/util/hierarchical_acquisition.h" @@ -62,6 +64,30 @@ public: executor::RemoteCommandResponse&& response) override; }; +class ReplicaSetMonitorConnectionManager : public executor::EgressTagCloser { + ReplicaSetMonitorConnectionManager() = delete; + +public: + ReplicaSetMonitorConnectionManager(std::shared_ptr<executor::NetworkInterface> network) + : _network(network) {} + + void dropConnections(const HostAndPort& hostAndPort) override; + + // Not supported. + void dropConnections(transport::Session::TagMask tags) override { + MONGO_UNREACHABLE; + }; + // Not supported. + void mutateTags(const HostAndPort& hostAndPort, + const std::function<transport::Session::TagMask(transport::Session::TagMask)>& + mutateFunc) override { + MONGO_UNREACHABLE; + }; + +private: + std::shared_ptr<executor::NetworkInterface> _network; +}; + /** * Manages the lifetime of a set of replica set monitors. */ @@ -122,7 +148,13 @@ public: bool isShutdown() const; + private: + /** + * Returns an EgressTagCloser controlling the executor's network interface. + */ + std::shared_ptr<executor::EgressTagCloser> _getConnectionManager(); + using ReplicaSetMonitorsMap = StringMap<std::weak_ptr<ReplicaSetMonitor>>; // Protects access to the replica set monitors @@ -132,6 +164,10 @@ private: // Executor for monitoring replica sets. std::shared_ptr<executor::TaskExecutor> _taskExecutor; + // Allows closing connections established by the network interface associated with the + // _taskExecutor instance + std::shared_ptr<ReplicaSetMonitorConnectionManager> _connectionManager; + // Widget to notify listeners when a RSM notices a change ReplicaSetChangeNotifier _notifier; diff --git a/src/mongo/client/scanning_replica_set_monitor.cpp b/src/mongo/client/scanning_replica_set_monitor.cpp index 5e400d6b009..66c1fde5b27 100644 --- a/src/mongo/client/scanning_replica_set_monitor.cpp +++ b/src/mongo/client/scanning_replica_set_monitor.cpp @@ -356,6 +356,18 @@ void ScanningReplicaSetMonitor::failedHost(const HostAndPort& host, const Status _state->checkInvariants(); } +void ScanningReplicaSetMonitor::failedHostPreHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) { + failedHost(host, status); +} + +void ScanningReplicaSetMonitor::failedHostPostHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) { + failedHost(host, status); +} + bool ScanningReplicaSetMonitor::isPrimary(const HostAndPort& host) const { stdx::lock_guard<Latch> lk(_state->mutex); Node* node = _state->findNode(host); diff --git a/src/mongo/client/scanning_replica_set_monitor.h b/src/mongo/client/scanning_replica_set_monitor.h index efd5f10be14..25208682297 100644 --- a/src/mongo/client/scanning_replica_set_monitor.h +++ b/src/mongo/client/scanning_replica_set_monitor.h @@ -72,8 +72,19 @@ public: HostAndPort getMasterOrUassert() override; + /* + * For the ScanningReplicaSetMonitor, all the failedHost methods are equivalent. + */ void failedHost(const HostAndPort& host, const Status& status) override; + void failedHostPreHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) override; + + void failedHostPostHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) override; + bool isPrimary(const HostAndPort& host) const override; bool isHostUp(const HostAndPort& host) const override; diff --git a/src/mongo/client/sdam/sdam_datatypes.cpp b/src/mongo/client/sdam/sdam_datatypes.cpp index a6be94e3014..2faf7acd361 100644 --- a/src/mongo/client/sdam/sdam_datatypes.cpp +++ b/src/mongo/client/sdam/sdam_datatypes.cpp @@ -145,4 +145,24 @@ const boost::optional<TopologyVersion>& IsMasterOutcome::getTopologyVersion() co const std::string& IsMasterOutcome::getErrorMsg() const { return _errorMsg; } + +BSONObj IsMasterOutcome::toBSON() const { + BSONObjBuilder builder; + builder.append("host", _server); + builder.append("success", _success); + + if (_errorMsg != "") + builder.append("errorMessage", _errorMsg); + + if (_topologyVersion) + builder.append("topologyVersion", _topologyVersion->toBSON()); + + if (_rtt) + builder.append("duration", _rtt->toBSON()); + + if (_response) + builder.append("response", *_response); + + return builder.obj(); +} }; // namespace mongo::sdam diff --git a/src/mongo/client/sdam/sdam_datatypes.h b/src/mongo/client/sdam/sdam_datatypes.h index 0da61e14d09..3ec5363965c 100644 --- a/src/mongo/client/sdam/sdam_datatypes.h +++ b/src/mongo/client/sdam/sdam_datatypes.h @@ -107,6 +107,7 @@ public: const boost::optional<IsMasterRTT>& getRtt() const; const boost::optional<TopologyVersion>& getTopologyVersion() const; const std::string& getErrorMsg() const; + BSONObj toBSON() const; private: ServerAddress _server; diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp index 34d8ef63563..48e1f2b147b 100644 --- a/src/mongo/client/sdam/topology_listener.cpp +++ b/src/mongo/client/sdam/topology_listener.cpp @@ -77,6 +77,21 @@ void TopologyEventsPublisher::onServerHandshakeCompleteEvent(IsMasterRTT duratio _scheduleNextDelivery(); } +void TopologyEventsPublisher::onServerHandshakeFailedEvent(const sdam::ServerAddress& address, + const Status& status, + const BSONObj reply) { + { + stdx::lock_guard<Mutex> lock(_eventQueueMutex); + EventPtr event = std::make_unique<Event>(); + event->type = EventType::HANDSHAKE_FAILURE; + event->hostAndPort = address; + event->reply = reply; + event->status = status; + _eventQueue.push_back(std::move(event)); + } + _scheduleNextDelivery(); +} + void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, const ServerAddress& hostAndPort, const BSONObj reply) { @@ -201,6 +216,9 @@ void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Eve case EventType::PING_FAILURE: listener->onServerPingFailedEvent(event.hostAndPort, event.status); break; + case EventType::HANDSHAKE_FAILURE: + listener->onServerHandshakeFailedEvent(event.hostAndPort, event.status, event.reply); + break; default: MONGO_UNREACHABLE; } diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h index 4adbb2329d2..5cc737706d0 100644 --- a/src/mongo/client/sdam/topology_listener.h +++ b/src/mongo/client/sdam/topology_listener.h @@ -64,6 +64,10 @@ public: const sdam::ServerAddress& address, const BSONObj reply = BSONObj()){}; + virtual void onServerHandshakeFailedEvent(const sdam::ServerAddress& address, + const Status& status, + const BSONObj reply){}; + /** * Called when a ServerHeartBeatSucceededEvent is published - A heartbeat sent to the server at * hostAndPort succeeded. durationMS is the execution time of the event, including the time it @@ -108,6 +112,11 @@ public: virtual void onServerHandshakeCompleteEvent(IsMasterRTT durationMs, const sdam::ServerAddress& address, const BSONObj reply = BSONObj()) override; + + void onServerHandshakeFailedEvent(const sdam::ServerAddress& address, + const Status& status, + const BSONObj reply) override; + void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, const ServerAddress& hostAndPort, const BSONObj reply) override; @@ -126,8 +135,10 @@ private: PING_SUCCESS, PING_FAILURE, TOPOLOGY_DESCRIPTION_CHANGED, - HANDSHAKE_COMPLETE + HANDSHAKE_COMPLETE, + HANDSHAKE_FAILURE }; + struct Event { EventType type; ServerAddress hostAndPort; diff --git a/src/mongo/client/sdam/topology_manager.cpp b/src/mongo/client/sdam/topology_manager.cpp index 75242baa796..74f8de94d27 100644 --- a/src/mongo/client/sdam/topology_manager.cpp +++ b/src/mongo/client/sdam/topology_manager.cpp @@ -148,7 +148,7 @@ void TopologyManager::onServerRTTUpdated(ServerAddress hostAndPort, IsMasterRTT // otherwise, the server was removed from the topology. Nothing to do. LOGV2(4333201, "Not updating RTT. Server {server} does not exist in {setName}", - "server"_attr = hostAndPort, + "host"_attr = hostAndPort, "setName"_attr = getTopologyDescription()->getSetName()); } diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 042828f76e7..6ae640dd9af 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -63,9 +63,11 @@ using std::vector; namespace { // Pull nested types to top-level scope +using executor::EgressTagCloser; using executor::TaskExecutor; using CallbackArgs = TaskExecutor::CallbackArgs; using CallbackHandle = TaskExecutor::CallbackHandle; +using HandshakeStage = StreamableReplicaSetMonitorErrorHandler::HandshakeStage; const ReadPreferenceSetting kPrimaryOnlyReadPreference(ReadPreference::PrimaryOnly, TagSet()); @@ -140,11 +142,15 @@ constexpr auto kZeroMs = Milliseconds(0); * functionality. Once they are shutdown in the drop() method the operations exposed via their api * are effectively no-ops. */ -StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri, - std::shared_ptr<TaskExecutor> executor) +StreamableReplicaSetMonitor::StreamableReplicaSetMonitor( + const MongoURI& uri, + std::shared_ptr<TaskExecutor> executor, + std::shared_ptr<executor::EgressTagCloser> connectionManager) : _serverSelector(std::make_unique<SdamServerSelector>(kServerSelectionConfig)), + _errorHandler(std::make_unique<SdamErrorHandler>(uri.getSetName())), _queryProcessor(std::make_shared<StreamableReplicaSetMonitorQueryProcessor>()), _uri(uri), + _connectionManager(connectionManager), _executor(executor), _random(PseudoRandom(SecureRandom().nextInt64())) { @@ -157,9 +163,11 @@ StreamableReplicaSetMonitor::StreamableReplicaSetMonitor(const MongoURI& uri, _sdamConfig = SdamConfiguration(seeds); } -ReplicaSetMonitorPtr StreamableReplicaSetMonitor::make(const MongoURI& uri, - std::shared_ptr<TaskExecutor> executor) { - auto result = std::make_shared<StreamableReplicaSetMonitor>(uri, executor); +ReplicaSetMonitorPtr StreamableReplicaSetMonitor::make( + const MongoURI& uri, + std::shared_ptr<TaskExecutor> executor, + std::shared_ptr<executor::EgressTagCloser> connectionManager) { + auto result = std::make_shared<StreamableReplicaSetMonitor>(uri, executor, connectionManager); result->init(); return result; } @@ -222,7 +230,7 @@ std::vector<HostAndPort> StreamableReplicaSetMonitor::_extractHosts( const std::vector<ServerDescriptionPtr>& serverDescriptions) { std::vector<HostAndPort> result; for (const auto& server : serverDescriptions) { - result.push_back(HostAndPort(server->getAddress())); + result.emplace_back(server->getAddress()); } return result; } @@ -301,7 +309,7 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutsta LOGV2_INFO(4333208, "RSM {setName} host selection timeout: {status}", "setName"_attr = getName(), - "status"_attr = errorStatus.toString()); + "error"_attr = errorStatus.toString()); }; auto swDeadlineHandle = _executor->scheduleWorkAt(query->deadline, deadlineCb); @@ -309,7 +317,7 @@ SemiFuture<std::vector<HostAndPort>> StreamableReplicaSetMonitor::_enqueueOutsta LOGV2_INFO(4333207, "RSM {setName} error scheduling deadline handler: {status}", "setName"_attr = getName(), - "status"_attr = swDeadlineHandle.getStatus()); + "error"_attr = swDeadlineHandle.getStatus()); return SemiFuture<HostAndPortList>::makeReady(swDeadlineHandle.getStatus()); } query->deadlineHandle = swDeadlineHandle.getValue(); @@ -343,16 +351,53 @@ sdam::TopologyEventsPublisherPtr StreamableReplicaSetMonitor::getEventsPublisher return _eventsPublisher; } - void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, const Status& status) { - failedHost(host, BSONObj(), status); + failedHostPostHandshake(host, status, BSONObj()); } -void StreamableReplicaSetMonitor::failedHost(const HostAndPort& host, - BSONObj bson, - const Status& status) { - IsMasterOutcome outcome(host.toString(), bson, status.toString()); - _topologyManager->onServerDescription(outcome); +void StreamableReplicaSetMonitor::failedHostPreHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) { + _failedHost(host, status, bson, HandshakeStage::kPreHandshake, true); +} + +void StreamableReplicaSetMonitor::failedHostPostHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) { + _failedHost(host, status, bson, HandshakeStage::kPostHandshake, true); +} + +void StreamableReplicaSetMonitor::_failedHost(const HostAndPort& host, + const Status& status, + BSONObj bson, + HandshakeStage stage, + bool isApplicationOperation) { + if (_isDropped.load()) + return; + + _doErrorActions( + host, + _errorHandler->computeErrorActions(host, status, stage, isApplicationOperation, bson)); +} + +void StreamableReplicaSetMonitor::_doErrorActions( + const HostAndPort& host, + const StreamableReplicaSetMonitorErrorHandler::ErrorActions& errorActions) const { + { + stdx::lock_guard lock(_mutex); + if (_isDropped.load()) + return; + + if (errorActions.dropConnections) + _connectionManager->dropConnections(host); + + if (errorActions.requestImmediateCheck) + _isMasterMonitor->requestImmediateCheck(); + } + + // Call outside of the lock since this may generate a topology change event. + if (errorActions.isMasterOutcome) + _topologyManager->onServerDescription(*errorActions.isMasterOutcome); } boost::optional<ServerDescriptionPtr> StreamableReplicaSetMonitor::_currentPrimary() const { @@ -556,7 +601,7 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent( kLowerLogLevel, "Skip publishing unconfirmed replica set members since there are " "no primaries or secondaries in the new topology", - "replicaSetName"_attr = getName()); + "setName"_attr = getName()); return; } @@ -580,29 +625,29 @@ void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT dura Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply) { - IsMasterOutcome outcome(hostAndPort, reply, errorStatus.toString()); - _topologyManager->onServerDescription(outcome); + _failedHost( + HostAndPort(hostAndPort), errorStatus, reply, HandshakeStage::kPostHandshake, false); } void StreamableReplicaSetMonitor::onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) { - LOGV2_DEBUG(4668133, - 0, - " StreamableReplicaSetMonitor::onServerPingFailedEvent, ServerPingMonitor got " - "status{status} ", - "addr"_attr = hostAndPort, - "status"_attr = status); - failedHost(HostAndPort(hostAndPort), status); + _failedHost(HostAndPort(hostAndPort), status, BSONObj(), HandshakeStage::kPostHandshake, false); } +void StreamableReplicaSetMonitor::onServerHandshakeFailedEvent(const sdam::ServerAddress& address, + const Status& status, + const BSONObj reply) { + _failedHost(HostAndPort(address), status, reply, HandshakeStage::kPreHandshake, false); +}; + void StreamableReplicaSetMonitor::onServerPingSucceededEvent(sdam::IsMasterRTT durationMS, const ServerAddress& hostAndPort) { LOGV2_DEBUG(4668132, - 1, - " StreamableReplicaSetMonitor::onServerPingSucceededEvent, ServerPingMonitor for " - "{addr} with {rtt}", - "addr"_attr = hostAndPort, - "rtt"_attr = durationMS); + kLowerLogLevel, + "ReplicaSetMonitor ping success", + "host"_attr = hostAndPort, + "setName"_attr = getName(), + "duration"_attr = durationMS); _topologyManager->onServerRTTUpdated(hostAndPort, durationMS); } diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index 5ec56495684..650d7abe237 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -41,6 +41,8 @@ #include "mongo/client/sdam/sdam.h" #include "mongo/client/server_is_master_monitor.h" #include "mongo/client/server_ping_monitor.h" +#include "mongo/client/streamable_replica_set_monitor_error_handler.h" +#include "mongo/executor/egress_tag_closer.h" #include "mongo/executor/task_executor.h" #include "mongo/logger/log_component.h" #include "mongo/util/concurrency/with_lock.h" @@ -76,14 +78,16 @@ public: static constexpr auto kCheckTimeout = Seconds(5); StreamableReplicaSetMonitor(const MongoURI& uri, - std::shared_ptr<executor::TaskExecutor> executor); + std::shared_ptr<executor::TaskExecutor> executor, + std::shared_ptr<executor::EgressTagCloser> connectionManager); void init(); void drop(); static ReplicaSetMonitorPtr make(const MongoURI& uri, - std::shared_ptr<executor::TaskExecutor> executor = nullptr); + std::shared_ptr<executor::TaskExecutor> executor, + std::shared_ptr<executor::EgressTagCloser> connectionCloser); SemiFuture<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref, Milliseconds maxWait = kDefaultFindHostTimeout); @@ -93,8 +97,13 @@ public: HostAndPort getMasterOrUassert(); - void failedHost(const HostAndPort& host, const Status& status); - void failedHost(const HostAndPort& host, BSONObj bson, const Status& status); + void failedHost(const HostAndPort& host, const Status& status) override; + void failedHostPreHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) override; + void failedHostPostHandshake(const HostAndPort& host, + const Status& status, + BSONObj bson) override; bool isPrimary(const HostAndPort& host) const; @@ -147,10 +156,12 @@ private: std::vector<HostAndPort> _extractHosts( const std::vector<sdam::ServerDescriptionPtr>& serverDescriptions); + boost::optional<std::vector<HostAndPort>> _getHosts(const TopologyDescriptionPtr& topology, const ReadPreferenceSetting& criteria); boost::optional<std::vector<HostAndPort>> _getHosts(const ReadPreferenceSetting& criteria); + // Incoming Events void onTopologyDescriptionChangedEvent(UUID topologyId, sdam::TopologyDescriptionPtr previousDescription, sdam::TopologyDescriptionPtr newDescription) override; @@ -159,6 +170,10 @@ private: const sdam::ServerAddress& hostAndPort, const BSONObj reply) override; + void onServerHandshakeFailedEvent(const sdam::ServerAddress& address, + const Status& status, + const BSONObj reply) override; + void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, Status errorStatus, const ServerAddress& hostAndPort, @@ -197,10 +212,22 @@ private: // Try to satisfy the outstanding queries for this instance with the given topology information. void _processOutstanding(const TopologyDescriptionPtr& topologyDescription); + // Take action on error for the given host. + void _doErrorActions( + const HostAndPort& host, + const StreamableReplicaSetMonitorErrorHandler::ErrorActions& errorActions) const; + + void _failedHost(const HostAndPort& host, + const Status& status, + BSONObj bson, + StreamableReplicaSetMonitorErrorHandler::HandshakeStage stage, + bool isApplicationOperation); + sdam::SdamConfiguration _sdamConfig; sdam::TopologyManagerPtr _topologyManager; sdam::ServerSelectorPtr _serverSelector; sdam::TopologyEventsPublisherPtr _eventsPublisher; + std::unique_ptr<StreamableReplicaSetMonitorErrorHandler> _errorHandler; ServerIsMasterMonitorPtr _isMasterMonitor; std::shared_ptr<ServerPingMonitor> _pingMonitor; @@ -210,6 +237,7 @@ private: const MongoURI _uri; + std::shared_ptr<executor::EgressTagCloser> _connectionManager; std::shared_ptr<executor::TaskExecutor> _executor; AtomicWord<bool> _isDropped{true}; diff --git a/src/mongo/client/streamable_replica_set_monitor_error_handler.cpp b/src/mongo/client/streamable_replica_set_monitor_error_handler.cpp new file mode 100644 index 00000000000..be19b1e64ec --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor_error_handler.cpp @@ -0,0 +1,163 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/client/streamable_replica_set_monitor_error_handler.h" + +#include "mongo/logv2/log.h" + +namespace mongo { +SdamErrorHandler::ErrorActions SdamErrorHandler::computeErrorActions(const HostAndPort& host, + const Status& status, + HandshakeStage handshakeStage, + bool isApplicationOperation, + BSONObj bson) noexcept { + // Initial state: don't drop connections, no immediate check, and don't generate an error server + // description. + ErrorActions result; + ON_BLOCK_EXIT([this, &result, &host, &status] { + if (result.isMasterOutcome) + _clearConsecutiveErrorsWithoutIsMasterOutcome(host); + + LOGV2(4712102, + "Host failed in replica set", + "setName"_attr = _setName, + "host"_attr = host, + "error"_attr = status, + "action"_attr = result); + }); + + // Helpers to mutate the actions + const auto setCreateServerDescriptionAction = [this, &result, &host, &status, bson]() { + result.isMasterOutcome = _createErrorIsMasterOutcome(host, bson, status); + }; + const auto setImmediateCheckAction = [&result]() { result.requestImmediateCheck = true; }; + const auto setDropConnectionsAction = [&result]() { result.dropConnections = true; }; + + if (!_isNetworkError(status) && !_isNotMasterOrNotRecovering(status)) { + setCreateServerDescriptionAction(); + return result; + } + + if (isApplicationOperation) { + if (_isNetworkError(status)) { + switch (handshakeStage) { + case HandshakeStage::kPreHandshake: + setCreateServerDescriptionAction(); + break; + case HandshakeStage::kPostHandshake: + if (!_isNetworkTimeout(status)) { + setCreateServerDescriptionAction(); + } + break; + } + setDropConnectionsAction(); + } else if (_isNotMasterOrNotRecovering(status)) { + setCreateServerDescriptionAction(); + setImmediateCheckAction(); + if (_isNodeShuttingDown(status)) { + setDropConnectionsAction(); + } + } + } else if (_isNetworkError(status)) { + switch (handshakeStage) { + case HandshakeStage::kPreHandshake: + setCreateServerDescriptionAction(); + break; + case HandshakeStage::kPostHandshake: + int errorCount = _getConsecutiveErrorsWithoutIsMasterOutcome(host); + if (errorCount == 1) { + setCreateServerDescriptionAction(); + } else { + setImmediateCheckAction(); + _incrementConsecutiveErrorsWithoutIsMasterOutcome(host); + } + break; + } + setDropConnectionsAction(); + } + + return result; +} + +BSONObj StreamableReplicaSetMonitorErrorHandler::ErrorActions::toBSON() const { + BSONObjBuilder builder; + builder.append("dropConnections", dropConnections); + builder.append("requestImmediateCheck", requestImmediateCheck); + if (isMasterOutcome) { + builder.append("outcome", isMasterOutcome->toBSON()); + } + return builder.obj(); +} + +bool SdamErrorHandler::_isNodeRecovering(const Status& status) const { + return ErrorCodes::isA<ErrorCategory::RetriableError>(status.code()); +} + +bool SdamErrorHandler::_isNetworkTimeout(const Status& status) const { + return ErrorCodes::isA<ErrorCategory::NetworkTimeoutError>(status.code()); +} + +bool SdamErrorHandler::_isNodeShuttingDown(const Status& status) const { + return ErrorCodes::isA<ErrorCategory::ShutdownError>(status.code()); +} + +bool SdamErrorHandler::_isNetworkError(const Status& status) const { + return ErrorCodes::isA<ErrorCategory::NetworkError>(status.code()); +} + +bool SdamErrorHandler::_isNotMasterOrNotRecovering(const Status& status) const { + return _isNodeRecovering(status) || _isNotMaster(status); +} + +bool SdamErrorHandler::_isNotMaster(const Status& status) const { + return ErrorCodes::isA<ErrorCategory::NotMasterError>(status.code()); +} + +int SdamErrorHandler::_getConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host) const { + stdx::lock_guard lock(_mutex); + if (auto it = _consecutiveErrorsWithoutIsMasterOutcome.find(host); + it != _consecutiveErrorsWithoutIsMasterOutcome.end()) { + return it->second; + } + return 0; +} + +void SdamErrorHandler::_incrementConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host) { + stdx::lock_guard lock(_mutex); + auto [iter, wasEmplaced] = _consecutiveErrorsWithoutIsMasterOutcome.emplace(host, 1); + if (!wasEmplaced) { + ++(iter->second); + } +} + +void SdamErrorHandler::_clearConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host) { + stdx::lock_guard lock(_mutex); + _consecutiveErrorsWithoutIsMasterOutcome.erase(host); +} +} // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor_error_handler.h b/src/mongo/client/streamable_replica_set_monitor_error_handler.h new file mode 100644 index 00000000000..86e9db135e7 --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor_error_handler.h @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once +#include <boost/optional.hpp> + +#include "mongo/client/sdam/sdam.h" +#include "mongo/executor/network_interface.h" +#include "mongo/platform/mutex.h" +#include "mongo/stdx/unordered_map.h" + +namespace mongo { +class StreamableReplicaSetMonitorErrorHandler { +public: + struct ErrorActions { + bool dropConnections = false; + bool requestImmediateCheck = false; + boost::optional<sdam::IsMasterOutcome> isMasterOutcome; + BSONObj toBSON() const; + }; + + // Indicates whether the initial handshake has completed. + enum class HandshakeStage { kPreHandshake, kPostHandshake }; + + virtual ~StreamableReplicaSetMonitorErrorHandler() = default; + + // Based on the error status, source of the error, and handshake stage determine what + // ErrorActions we should take. + virtual ErrorActions computeErrorActions(const HostAndPort& host, + const Status& status, + HandshakeStage handshakeStage, + bool isApplicationOperation, + BSONObj bson) noexcept = 0; + +protected: + sdam::IsMasterOutcome _createErrorIsMasterOutcome(const HostAndPort& host, + boost::optional<BSONObj> bson, + const Status& status) const { + return sdam::IsMasterOutcome(host.toString(), bson ? *bson : BSONObj(), status.toString()); + } +}; + +class SdamErrorHandler final : public StreamableReplicaSetMonitorErrorHandler { +public: + explicit SdamErrorHandler(std::string setName) : _setName(std::move(setName)){}; + + ErrorActions computeErrorActions(const HostAndPort& host, + const Status& status, + HandshakeStage handshakeStage, + bool isApplicationOperation, + BSONObj bson) noexcept override; + +private: + int _getConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host) const; + void _incrementConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host); + void _clearConsecutiveErrorsWithoutIsMasterOutcome(const HostAndPort& host); + + bool _isNodeRecovering(const Status& status) const; + bool _isNetworkTimeout(const Status& status) const; + bool _isNodeShuttingDown(const Status& status) const; + bool _isNetworkError(const Status& status) const; + bool _isNotMasterOrNotRecovering(const Status& status) const; + bool _isNotMaster(const Status& status) const; + + const std::string _setName; + mutable Mutex _mutex; + stdx::unordered_map<HostAndPort, int> _consecutiveErrorsWithoutIsMasterOutcome; +}; +} // namespace mongo diff --git a/src/mongo/client/streamable_replica_set_monitor_error_handler_test.cpp b/src/mongo/client/streamable_replica_set_monitor_error_handler_test.cpp new file mode 100644 index 00000000000..45aea46b787 --- /dev/null +++ b/src/mongo/client/streamable_replica_set_monitor_error_handler_test.cpp @@ -0,0 +1,210 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/client/streamable_replica_set_monitor_error_handler.h" + +#include <boost/optional/optional_io.hpp> + +#include "mongo/client/sdam/sdam.h" +#include "mongo/logv2/log.h" +#include "mongo/platform/basic.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +using HandshakeStage = StreamableReplicaSetMonitorErrorHandler::HandshakeStage; +using ErrorActions = StreamableReplicaSetMonitorErrorHandler::ErrorActions; +using Error = ErrorCodes::Error; + +class StreamableReplicaSetMonitorErrorHandlerTestFixture : public unittest::Test { +public: + Status makeStatus(Error error) { + return Status(error, kErrorMessage); + } + + void verifyActions(ErrorActions result, ErrorActions expectedResult) { + ASSERT_EQUALS(result.dropConnections, expectedResult.dropConnections); + ASSERT_EQUALS(result.requestImmediateCheck, expectedResult.requestImmediateCheck); + if (expectedResult.isMasterOutcome) { + ASSERT_FALSE(result.isMasterOutcome->isSuccess()); + std::string resultError = result.isMasterOutcome->getErrorMsg(); + ASSERT_NOT_EQUALS(resultError.find(expectedResult.isMasterOutcome->getErrorMsg()), + std::string::npos); + } else { + ASSERT_FALSE(result.isMasterOutcome); + } + } + + void testScenario(HandshakeStage stage, + bool isApplicationOperation, + std::vector<Error> errors, + std::function<ErrorActions(const Status&)> expectedResultGenerator, + int numAttempts = 1) { + auto testSubject = subject(); + + const auto prePost = (stage == HandshakeStage::kPreHandshake) ? "pre" : "post"; + const auto applicationOperation = (isApplicationOperation) ? "application" : "monitoring"; + LOGV2_INFO(4712105, + "Check Scenario", + "handshake"_attr = prePost, + "operationType"_attr = applicationOperation); + for (auto error : errors) { + LOGV2_INFO(4712106, "Check error ", "error"_attr = ErrorCodes::errorString(error)); + for (int attempt = 0; attempt < numAttempts; attempt++) { + auto result = testSubject->computeErrorActions( + kHost, makeStatus(error), stage, isApplicationOperation, kErrorBson); + verifyActions(result, expectedResultGenerator(makeStatus(error))); + LOGV2_INFO(4712107, "Attempt Successful", "num"_attr = attempt); + } + } + } + + void testScenario(HandshakeStage stage, + bool isApplicationOperation, + std::vector<Error> errors, + ErrorActions expectedResult, + int numAttempts = 1) { + testScenario(stage, isApplicationOperation, errors, [expectedResult](const Status&) { + return expectedResult; + }); + } + + std::unique_ptr<StreamableReplicaSetMonitorErrorHandler> subject() { + return std::make_unique<SdamErrorHandler>(kSetName); + } + + inline static const std::vector<Error> kNetworkErrors{ErrorCodes::SocketException, + ErrorCodes::NetworkTimeout, + ErrorCodes::HostNotFound, + ErrorCodes::HostUnreachable}; + inline static const std::vector<Error> kNetworkErrorsNoTimeout{ + ErrorCodes::SocketException, ErrorCodes::HostNotFound, ErrorCodes::HostUnreachable}; + inline static const std::vector<Error> kInternalError{ErrorCodes::InternalError}; + inline static const std::vector<Error> kNotMasterAndNodeRecovering{ + ErrorCodes::InterruptedAtShutdown, + ErrorCodes::InterruptedDueToReplStateChange, + ErrorCodes::NotMasterOrSecondary, + ErrorCodes::PrimarySteppedDown, + ErrorCodes::ShutdownInProgress, + ErrorCodes::NotMaster, + ErrorCodes::NotMasterNoSlaveOk}; + + inline static const std::string kSetName = "setName"; + inline static const HostAndPort kHost = HostAndPort("foobar:123"); + inline static const std::string kErrorMessage = "an error message"; + inline static const BSONObj kErrorBson = BSONObjBuilder().append("ok", 0).obj(); + inline static const sdam::IsMasterOutcome kErrorIsMasterOutcome = + sdam::IsMasterOutcome(kHost.toString(), kErrorBson, kErrorMessage); + + static constexpr bool kApplicationOperation = true; + static constexpr bool kMonitoringOperation = false; + + static constexpr int kThreeAttempts = 3; +}; + +TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, ApplicationNetworkErrorsPreHandshake) { + testScenario( + HandshakeStage::kPreHandshake, + kApplicationOperation, + kNetworkErrors, + StreamableReplicaSetMonitorErrorHandler::ErrorActions{true, false, kErrorIsMasterOutcome}); +}; + +// https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#network-error-when-reading-or-writing +TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, ApplicationNetworkErrorsPostHandshake) { + testScenario( + HandshakeStage::kPostHandshake, + kApplicationOperation, + kNetworkErrorsNoTimeout, + StreamableReplicaSetMonitorErrorHandler::ErrorActions{true, false, kErrorIsMasterOutcome}); +}; + +// https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#network-error-during-server-check +TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, MonitoringNetworkErrorsPostHandshake) { + // Two consecutive errors must occur to expect an unknown server description. + const auto errorServerDescriptionOnSecondNetworkFailure = [](const Status& status) { + static int count = 0; + count = (count + 1) % 2; + return (count == 1) + ? StreamableReplicaSetMonitorErrorHandler::ErrorActions{true, true, boost::none} + : StreamableReplicaSetMonitorErrorHandler::ErrorActions{ + true, false, kErrorIsMasterOutcome}; + }; + + testScenario(HandshakeStage::kPostHandshake, + kMonitoringOperation, + kNetworkErrors, + errorServerDescriptionOnSecondNetworkFailure, + kThreeAttempts); +} + +// https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#network-error-during-server-check +TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, MonitoringNetworkErrorsPreHandshake) { + testScenario( + HandshakeStage::kPreHandshake, + kMonitoringOperation, + kNetworkErrors, + StreamableReplicaSetMonitorErrorHandler::ErrorActions{true, false, kErrorIsMasterOutcome}, + kThreeAttempts); +} + +// https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#not-master-and-node-is-recovering +TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, ApplicationNotMasterOrNodeRecovering) { + const auto shutdownErrorsDropConnections = [](const Status& status) { + if (ErrorCodes::isA<ErrorCategory::ShutdownError>(status.code())) { + return StreamableReplicaSetMonitorErrorHandler::ErrorActions{ + true, true, kErrorIsMasterOutcome}; + } else { + return StreamableReplicaSetMonitorErrorHandler::ErrorActions{ + false, true, kErrorIsMasterOutcome}; + } + }; + + testScenario(HandshakeStage::kPostHandshake, + kApplicationOperation, + kNotMasterAndNodeRecovering, + shutdownErrorsDropConnections); +} + +TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, MonitoringNonNetworkError) { + testScenario( + HandshakeStage::kPostHandshake, + kMonitoringOperation, + kInternalError, + StreamableReplicaSetMonitorErrorHandler::ErrorActions{false, false, kErrorIsMasterOutcome}); +} + +TEST_F(StreamableReplicaSetMonitorErrorHandlerTestFixture, + ApplicationNonNetworkIsMasterOrRecoveringError) { + testScenario( + HandshakeStage::kPostHandshake, + kMonitoringOperation, + kInternalError, + StreamableReplicaSetMonitorErrorHandler::ErrorActions{false, false, kErrorIsMasterOutcome}); +} +} // namespace mongo |