diff options
author | jannaerin <golden.janna@gmail.com> | 2020-03-04 12:08:42 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-04-17 16:47:41 +0000 |
commit | 23e6f954d7ef5ab73f5540b46c6b3794b7ecfbdc (patch) | |
tree | 53d74dc86d460640a189477eb3bac78fb188a3dc /src/mongo/client | |
parent | 97e181dfdf8516ea4b7543e62130057e61a5ebc3 (diff) | |
download | mongo-23e6f954d7ef5ab73f5540b46c6b3794b7ecfbdc.tar.gz |
SERVER-44954 Streamable RSM uses exhaust isMaster
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/async_client.cpp | 52 | ||||
-rw-r--r-- | src/mongo/client/async_client.h | 28 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener.cpp | 16 | ||||
-rw-r--r-- | src/mongo/client/sdam/topology_listener.h | 14 | ||||
-rw-r--r-- | src/mongo/client/server_is_master_monitor.cpp | 181 | ||||
-rw-r--r-- | src/mongo/client/server_is_master_monitor.h | 20 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.cpp | 21 | ||||
-rw-r--r-- | src/mongo/client/streamable_replica_set_monitor.h | 6 |
8 files changed, 192 insertions, 146 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index 99d93dab68a..f25830de073 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -316,55 +316,41 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( }); } -Future<void> AsyncDBClient::_continueReceiveExhaustResponse( - ExhaustRequestParameters&& exhaustRequestParameters, - boost::optional<int32_t> msgId, - const BatonHandle& baton) { +Future<executor::RemoteCommandResponse> AsyncDBClient::_continueReceiveExhaustResponse( + ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton) { return _waitForResponse(msgId, baton) - .then([exhaustParameters = std::move(exhaustRequestParameters), msgId, baton, this]( - Message responseMsg) mutable -> Future<void> { - // Run callback - auto now = exhaustParameters.clkSource->now(); - auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start); + .then([stopwatch, msgId, baton, this](Message responseMsg) mutable { bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome); rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg)); - exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration), - isMoreToComeSet); - - if (!isMoreToComeSet) { - return Status::OK(); - } - - exhaustParameters.start = now; - return _continueReceiveExhaustResponse( - std::move(exhaustParameters), boost::none, baton); + auto rcResponse = executor::RemoteCommandResponse( + *response, duration_cast<Milliseconds>(stopwatch.elapsed()), isMoreToComeSet); + return rcResponse; }); } -Future<void> AsyncDBClient::runExhaustCommand(OpMsgRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton) { +Future<executor::RemoteCommandResponse> AsyncDBClient::awaitExhaustCommand( + const BatonHandle& baton) { + return _continueReceiveExhaustResponse(ClockSource::StopWatch(), boost::none, baton); +} + +Future<executor::RemoteCommandResponse> AsyncDBClient::runExhaustCommand(OpMsgRequest request, + const BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); OpMsg::setFlag(&requestMsg, OpMsg::kExhaustSupported); - auto clkSource = _svcCtx->getPreciseClockSource(); - auto start = clkSource->now(); auto msgId = nextMessageId(); - return _call(std::move(requestMsg), msgId, baton) - .then([msgId, baton, cb = std::move(cb), clkSource, start, this]() mutable { - ExhaustRequestParameters exhaustParameters{std::move(cb), clkSource, start}; - return _continueReceiveExhaustResponse(std::move(exhaustParameters), msgId, baton); - }); + return _call(std::move(requestMsg), msgId, baton).then([msgId, baton, this]() mutable { + return _continueReceiveExhaustResponse(ClockSource::StopWatch(), msgId, baton); + }); } -Future<void> AsyncDBClient::runExhaustCommandRequest(executor::RemoteCommandRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton) { +Future<executor::RemoteCommandResponse> AsyncDBClient::beginExhaustCommandRequest( + executor::RemoteCommandRequest request, const BatonHandle& baton) { auto opMsgRequest = OpMsgRequest::fromDBAndBody( std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); - return runExhaustCommand(std::move(opMsgRequest), std::move(cb), baton); + return runExhaustCommand(std::move(opMsgRequest), baton); } void AsyncDBClient::cancel(const BatonHandle& baton) { diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h index c7917bb5461..d2279374dda 100644 --- a/src/mongo/client/async_client.h +++ b/src/mongo/client/async_client.h @@ -47,19 +47,6 @@ namespace mongo { class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> { public: - using RemoteCommandCallbackFn = - unique_function<void(const executor::RemoteCommandResponse&, bool isMoreToComeSet)>; - - struct ExhaustRequestParameters { - ExhaustRequestParameters(ExhaustRequestParameters&&) = default; - ExhaustRequestParameters(const ExhaustRequestParameters&) = delete; - ExhaustRequestParameters& operator=(const ExhaustRequestParameters&) = delete; - - RemoteCommandCallbackFn cb; - ClockSource* clkSource; - Date_t start; - }; - explicit AsyncDBClient(const HostAndPort& peer, transport::SessionHandle session, ServiceContext* svcCtx) @@ -79,12 +66,11 @@ public: const BatonHandle& baton = nullptr, bool fireAndForget = false); - Future<void> runExhaustCommandRequest(executor::RemoteCommandRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton = nullptr); - Future<void> runExhaustCommand(OpMsgRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton = nullptr); + Future<executor::RemoteCommandResponse> beginExhaustCommandRequest( + executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr); + Future<executor::RemoteCommandResponse> runExhaustCommand(OpMsgRequest request, + const BatonHandle& baton = nullptr); + Future<executor::RemoteCommandResponse> awaitExhaustCommand(const BatonHandle& baton = nullptr); Future<void> authenticate(const BSONObj& params); @@ -108,8 +94,8 @@ public: const HostAndPort& local() const; private: - Future<void> _continueReceiveExhaustResponse( - ExhaustRequestParameters&& exhaustRequestParameters, + Future<executor::RemoteCommandResponse> _continueReceiveExhaustResponse( + ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton = nullptr); Future<Message> _waitForResponse(boost::optional<int32_t> msgId, diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp index 48e1f2b147b..3d4fe2b92cb 100644 --- a/src/mongo/client/sdam/topology_listener.cpp +++ b/src/mongo/client/sdam/topology_listener.cpp @@ -92,14 +92,12 @@ void TopologyEventsPublisher::onServerHandshakeFailedEvent(const sdam::ServerAdd _scheduleNextDelivery(); } -void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, - const ServerAddress& hostAndPort, +void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, const BSONObj reply) { { stdx::lock_guard lock(_eventQueueMutex); EventPtr event = std::make_unique<Event>(); event->type = EventType::HEARTBEAT_SUCCESS; - event->duration = duration_cast<IsMasterRTT>(durationMs); event->hostAndPort = hostAndPort; event->reply = reply; _eventQueue.push_back(std::move(event)); @@ -107,15 +105,13 @@ void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durati _scheduleNextDelivery(); } -void TopologyEventsPublisher::onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, +void TopologyEventsPublisher::onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply) { { stdx::lock_guard lock(_eventQueueMutex); EventPtr event = std::make_unique<Event>(); event->type = EventType::HEARTBEAT_FAILURE; - event->duration = duration_cast<IsMasterRTT>(durationMs); event->hostAndPort = hostAndPort; event->reply = reply; event->status = errorStatus; @@ -189,14 +185,10 @@ void TopologyEventsPublisher::_nextDelivery() { void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Event& event) { switch (event.type) { case EventType::HEARTBEAT_SUCCESS: - listener->onServerHeartbeatSucceededEvent( - duration_cast<IsMasterRTT>(event.duration), event.hostAndPort, event.reply); + listener->onServerHeartbeatSucceededEvent(event.hostAndPort, event.reply); break; case EventType::HEARTBEAT_FAILURE: - listener->onServerHeartbeatFailureEvent(duration_cast<IsMasterRTT>(event.duration), - event.status, - event.hostAndPort, - event.reply); + listener->onServerHeartbeatFailureEvent(event.status, event.hostAndPort, event.reply); break; case EventType::TOPOLOGY_DESCRIPTION_CHANGED: // TODO SERVER-46497: fix uuid or just remove diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h index 5cc737706d0..ff550309068 100644 --- a/src/mongo/client/sdam/topology_listener.h +++ b/src/mongo/client/sdam/topology_listener.h @@ -52,8 +52,7 @@ public: TopologyDescriptionPtr previousDescription, TopologyDescriptionPtr newDescription){}; - virtual void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, + virtual void onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply){}; /** @@ -73,8 +72,7 @@ public: * hostAndPort succeeded. durationMS is the execution time of the event, including the time it * took to send the message and recieve the reply from the server. */ - virtual void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, - const ServerAddress& hostAndPort, + virtual void onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, const BSONObj reply){}; /* @@ -115,13 +113,11 @@ public: void onServerHandshakeFailedEvent(const sdam::ServerAddress& address, const Status& status, - const BSONObj reply) override; + const BSONObj reply); - void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, - const ServerAddress& hostAndPort, + void onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, const BSONObj reply) override; - void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, + void onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply) override; void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override; diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp index 63eed95be4d..aa9f2b4b8c4 100644 --- a/src/mongo/client/server_is_master_monitor.cpp +++ b/src/mongo/client/server_is_master_monitor.cpp @@ -40,6 +40,8 @@ namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(overrideMaxAwaitTimeMS); + const BSONObj IS_MASTER_BSON = BSON("isMaster" << 1); using executor::NetworkInterface; @@ -53,10 +55,12 @@ const Milliseconds kZeroMs = Milliseconds{0}; SingleServerIsMasterMonitor::SingleServerIsMasterMonitor( const MongoURI& setUri, const sdam::ServerAddress& host, + boost::optional<TopologyVersion> topologyVersion, Milliseconds heartbeatFrequencyMS, sdam::TopologyEventsPublisherPtr eventListener, std::shared_ptr<executor::TaskExecutor> executor) : _host(host), + _topologyVersion(topologyVersion), _eventListener(eventListener), _executor(executor), _heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)), @@ -83,7 +87,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() { // The previous refresh period may or may not have been expedited. // Saving the value here before we change to expedited mode. - const auto previousRefreshPeriod = _currentRefreshPeriod(lock); + const auto previousRefreshPeriod = _currentRefreshPeriod(lock, false); if (!_isExpedited) { // save some log lines. @@ -98,7 +102,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() { } // Get the new expedited refresh period. - const auto expeditedRefreshPeriod = _currentRefreshPeriod(lock); + const auto expeditedRefreshPeriod = _currentRefreshPeriod(lock, false); if (_isMasterOutstanding) { LOGV2_DEBUG(4333216, @@ -164,7 +168,6 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d invariant(!_isMasterOutstanding); - Timer timer; auto swCbHandle = _executor->scheduleWorkAt( _executor->now() + delay, [self = shared_from_this()](const executor::TaskExecutor::CallbackArgs& cbData) { @@ -175,8 +178,7 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d }); if (!swCbHandle.isOK()) { - Microseconds latency(timer.micros()); - _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); + _onIsMasterFailure(swCbHandle.getStatus(), BSONObj()); return; } @@ -184,18 +186,93 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d } void SingleServerIsMasterMonitor::_doRemoteCommand() { - auto request = executor::RemoteCommandRequest( - HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS); - request.sslMode = _setUri.getSSLMode(); - stdx::lock_guard lock(_mutex); if (_isShutdown) return; - Timer timer; + StatusWith<executor::TaskExecutor::CallbackHandle> swCbHandle = [&]() { + if (_topologyVersion) { + return _scheduleStreamableIsMaster(); + } + + return _scheduleSingleIsMaster(); + }(); + + if (!swCbHandle.isOK()) { + _onIsMasterFailure(swCbHandle.getStatus(), BSONObj()); + uasserted(46156012, swCbHandle.getStatus().toString()); + } + + _isMasterOutstanding = true; + _remoteCommandHandle = swCbHandle.getValue(); +} + +StatusWith<TaskExecutor::CallbackHandle> +SingleServerIsMasterMonitor::_scheduleStreamableIsMaster() { + auto maxAwaitTimeMS = durationCount<Milliseconds>(kMaxAwaitTimeMs); + overrideMaxAwaitTimeMS.execute( + [&](const BSONObj&) { maxAwaitTimeMS = durationCount<Milliseconds>(Milliseconds(1000)); }); + auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << maxAwaitTimeMS + << "topologyVersion" << _topologyVersion->toBSON()); + + _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS + kMaxAwaitTimeMs; + auto request = executor::RemoteCommandRequest( + HostAndPort(_host), "admin", isMasterCmd, nullptr, _timeoutMS); + request.sslMode = _setUri.getSSLMode(); + + auto swCbHandle = _executor->scheduleExhaustRemoteCommand( + std::move(request), + [self = shared_from_this()]( + const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { + Milliseconds nextRefreshPeriod; + { + stdx::lock_guard lk(self->_mutex); + + if (self->_isShutdown) { + self->_isMasterOutstanding = false; + LOGV2_DEBUG(4495400, + kLogLevel, + "RSM {setName} not processing response: {status}", + "status"_attr = result.response.status, + "setName"_attr = self->_setUri.getSetName()); + return; + } + + auto responseTopologyVersion = result.response.data.getField("topologyVersion"); + if (responseTopologyVersion) { + self->_topologyVersion = TopologyVersion::parse( + IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj()); + } else { + self->_topologyVersion = boost::none; + } + + self->_lastIsMasterAt = self->_executor->now(); + if (!result.response.isOK() || !result.response.moreToCome) { + self->_isMasterOutstanding = false; + nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK()); + self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + } + } + + if (result.response.isOK()) { + self->_onIsMasterSuccess(result.response.data); + } else { + self->_onIsMasterFailure(result.response.status, result.response.data); + } + }); + + return swCbHandle; +} + +StatusWith<TaskExecutor::CallbackHandle> SingleServerIsMasterMonitor::_scheduleSingleIsMaster() { + _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS; + auto request = executor::RemoteCommandRequest( + HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS); + request.sslMode = _setUri.getSSLMode(); + auto swCbHandle = _executor->scheduleRemoteCommand( std::move(request), - [this, self = shared_from_this(), timer]( + [self = shared_from_this()]( const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { Milliseconds nextRefreshPeriod; { @@ -207,43 +284,42 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() { kLogLevel, "RSM {setName} not processing response: {status}", "status"_attr = result.response.status, - "setName"_attr = _setUri.getSetName()); + "setName"_attr = self->_setUri.getSetName()); return; } self->_lastIsMasterAt = self->_executor->now(); - nextRefreshPeriod = self->_currentRefreshPeriod(lk); - - LOGV2_DEBUG(4333228, - kLogLevel + 1, - "RSM {setName} next refresh period in {period}", - "period"_attr = nextRefreshPeriod.toString(), - "setName"_attr = _setUri.getSetName()); - self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + + auto responseTopologyVersion = result.response.data.getField("topologyVersion"); + if (responseTopologyVersion) { + self->_topologyVersion = TopologyVersion::parse( + IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj()); + } else { + self->_topologyVersion = boost::none; + } + + if (!result.response.isOK() || !result.response.moreToCome) { + self->_isMasterOutstanding = false; + nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK()); + self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + } } - Microseconds latency(timer.micros()); if (result.response.isOK()) { - self->_onIsMasterSuccess(latency, result.response.data); + self->_onIsMasterSuccess(result.response.data); } else { - self->_onIsMasterFailure(latency, result.response.status, result.response.data); + self->_onIsMasterFailure(result.response.status, result.response.data); } }); - if (!swCbHandle.isOK()) { - Microseconds latency(timer.micros()); - _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); - uasserted(31448, swCbHandle.getStatus().toString()); - } - - _isMasterOutstanding = true; - _remoteCommandHandle = swCbHandle.getValue(); + return swCbHandle; } void SingleServerIsMasterMonitor::shutdown() { stdx::lock_guard lock(_mutex); - if (std::exchange(_isShutdown, true)) + if (std::exchange(_isShutdown, true)) { return; + } LOGV2_DEBUG(4333220, kLogLevel + 1, @@ -274,35 +350,27 @@ void SingleServerIsMasterMonitor::_cancelOutstandingRequest(WithLock) { _isMasterOutstanding = false; } -void SingleServerIsMasterMonitor::_onIsMasterSuccess(sdam::IsMasterRTT latency, - const BSONObj bson) { +void SingleServerIsMasterMonitor::_onIsMasterSuccess(const BSONObj bson) { LOGV2_DEBUG(4333221, kLogLevel + 1, - "RSM {setName} received successful isMaster for server {host} ({latency}): {bson}", + "RSM {setName} received successful isMaster for server {host}: {bson}", "host"_attr = _host, - "latency"_attr = latency, "setName"_attr = _setUri.getSetName(), "bson"_attr = bson.toString()); - _eventListener->onServerHeartbeatSucceededEvent( - duration_cast<Milliseconds>(latency), _host, bson); + _eventListener->onServerHeartbeatSucceededEvent(_host, bson); } -void SingleServerIsMasterMonitor::_onIsMasterFailure(sdam::IsMasterRTT latency, - const Status& status, - const BSONObj bson) { - LOGV2_DEBUG( - 4333222, - kLogLevel, - "RSM {setName} received failed isMaster for server {host}: {status} ({latency}): {bson}", - "host"_attr = _host, - "status"_attr = status.toString(), - "latency"_attr = latency, - "setName"_attr = _setUri.getSetName(), - "bson"_attr = bson.toString()); - - _eventListener->onServerHeartbeatFailureEvent( - duration_cast<Milliseconds>(latency), status, _host, bson); +void SingleServerIsMasterMonitor::_onIsMasterFailure(const Status& status, const BSONObj bson) { + LOGV2_DEBUG(4333222, + kLogLevel, + "RSM {setName} received failed isMaster for server {host}: {status}: {bson}", + "host"_attr = _host, + "status"_attr = status.toString(), + "setName"_attr = _setUri.getSetName(), + "bson"_attr = bson.toString()); + + _eventListener->onServerHeartbeatFailureEvent(status, _host, bson); } Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds original) { @@ -319,7 +387,11 @@ Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds or return r; } -Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) { +Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock, + bool scheduleImmediately) { + if (scheduleImmediately) + return Milliseconds(0); + return (_isExpedited) ? sdam::SdamConfiguration::kMinHeartbeatFrequencyMS : _heartbeatFrequencyMS; } @@ -409,6 +481,7 @@ void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent( _singleMonitors[serverAddress] = std::make_shared<SingleServerIsMasterMonitor>( _setUri, serverAddress, + serverDescription->getTopologyVersion(), _sdamConfiguration.getHeartBeatFrequency(), _eventPublisher, _executor); diff --git a/src/mongo/client/server_is_master_monitor.h b/src/mongo/client/server_is_master_monitor.h index c41bd6c8ce1..e788cc3c95a 100644 --- a/src/mongo/client/server_is_master_monitor.h +++ b/src/mongo/client/server_is_master_monitor.h @@ -40,6 +40,7 @@ class SingleServerIsMasterMonitor public: explicit SingleServerIsMasterMonitor(const MongoURI& setUri, const ServerAddress& host, + boost::optional<TopologyVersion> topologyVersion, Milliseconds heartbeatFrequencyMS, TopologyEventsPublisherPtr eventListener, std::shared_ptr<executor::TaskExecutor> executor); @@ -65,16 +66,28 @@ public: const Milliseconds& expeditedRefreshPeriod, const Milliseconds& previousRefreshPeriod); + // Sent in the initial isMaster request when using the streamable exhaust protocol. The max + // duration a server should wait for a significant topology change before sending a response. + static constexpr Milliseconds kMaxAwaitTimeMs = Milliseconds(10000); + private: void _scheduleNextIsMaster(WithLock, Milliseconds delay); void _rescheduleNextIsMaster(WithLock, Milliseconds delay); void _doRemoteCommand(); - void _onIsMasterSuccess(IsMasterRTT latency, const BSONObj bson); - void _onIsMasterFailure(IsMasterRTT latency, const Status& status, const BSONObj bson); + // Use the awaitable isMaster protocol with the exhaust bit set. Attach _topologyVersion and + // kMaxAwaitTimeMS to the request. + StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleStreamableIsMaster(); + + // Use the old isMaster protocol. Do not attach _topologyVersion or kMaxAwaitTimeMS to the + // request. + StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleSingleIsMaster(); + + void _onIsMasterSuccess(const BSONObj bson); + void _onIsMasterFailure(const Status& status, const BSONObj bson); Milliseconds _overrideRefreshPeriod(Milliseconds original); - Milliseconds _currentRefreshPeriod(WithLock); + Milliseconds _currentRefreshPeriod(WithLock, bool scheduleImmediately); void _cancelOutstandingRequest(WithLock); boost::optional<Milliseconds> _timeSinceLastCheck() const; @@ -84,6 +97,7 @@ private: Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "SingleServerIsMasterMonitor::mutex"); ServerAddress _host; + boost::optional<TopologyVersion> _topologyVersion; TopologyEventsPublisherPtr _eventListener; std::shared_ptr<executor::TaskExecutor> _executor; Milliseconds _heartbeatFrequencyMS; diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 6ae640dd9af..88a200a33e8 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -199,17 +199,20 @@ void StreamableReplicaSetMonitor::init() { } void StreamableReplicaSetMonitor::drop() { - stdx::lock_guard lock(_mutex); - if (_isDropped.swap(true)) { - return; + { + stdx::lock_guard lock(_mutex); + if (_isDropped.swap(true)) + return; + + _eventsPublisher->close(); + _failOutstandingWithStatus( + lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"}); } + LOGV2(4333209, "Closing Replica Set Monitor {setName}", "setName"_attr = getName()); - _eventsPublisher->close(); _queryProcessor->shutdown(); _pingMonitor->shutdown(); _isMasterMonitor->shutdown(); - _failOutstandingWithStatus( - lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"}); ReplicaSetMonitorManager::get()->getNotifier().onDroppedSet(getName()); LOGV2(4333210, "Done closing Replica Set Monitor {setName}", "setName"_attr = getName()); @@ -613,16 +616,14 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent( } } -void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs, - const ServerAddress& hostAndPort, +void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, const BSONObj reply) { // After the inital handshake, isMasterResponses should not update the RTT with durationMs. IsMasterOutcome outcome(hostAndPort, reply, boost::none); _topologyManager->onServerDescription(outcome); } -void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, +void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply) { _failedHost( diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index 650d7abe237..07873fe199e 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -166,16 +166,14 @@ private: sdam::TopologyDescriptionPtr previousDescription, sdam::TopologyDescriptionPtr newDescription) override; - void onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs, - const sdam::ServerAddress& hostAndPort, + void onServerHeartbeatSucceededEvent(const sdam::ServerAddress& hostAndPort, const BSONObj reply) override; void onServerHandshakeFailedEvent(const sdam::ServerAddress& address, const Status& status, const BSONObj reply) override; - void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, + void onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply) override; |