diff options
author | jannaerin <golden.janna@gmail.com> | 2020-03-04 12:08:42 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-25 04:30:03 +0000 |
commit | 8124a8d047ce142f6d6defc089e5e71192721a5c (patch) | |
tree | dddfd98d579eb288b31bb28cb29072dbc37b2c20 /src/mongo/client/server_is_master_monitor.cpp | |
parent | 4601bd54dfd3f3ab20b357b71a4b17667143c0fb (diff) | |
download | mongo-8124a8d047ce142f6d6defc089e5e71192721a5c.tar.gz |
SERVER-44954 Streamable RSM uses exhaust isMaster
Diffstat (limited to 'src/mongo/client/server_is_master_monitor.cpp')
-rw-r--r-- | src/mongo/client/server_is_master_monitor.cpp | 154 |
1 files changed, 133 insertions, 21 deletions
diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp index 7f70753a772..f399ffeea38 100644 --- a/src/mongo/client/server_is_master_monitor.cpp +++ b/src/mongo/client/server_is_master_monitor.cpp @@ -36,10 +36,13 @@ #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" +#include "mongo/util/log.h" namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(overrideMaxAwaitTimeMS); + const BSONObj IS_MASTER_BSON = BSON("isMaster" << 1); using executor::NetworkInterface; @@ -53,10 +56,12 @@ const Milliseconds kZeroMs = Milliseconds{0}; SingleServerIsMasterMonitor::SingleServerIsMasterMonitor( const MongoURI& setUri, const sdam::ServerAddress& host, + boost::optional<TopologyVersion> topologyVersion, Milliseconds heartbeatFrequencyMS, sdam::TopologyEventsPublisherPtr eventListener, std::shared_ptr<executor::TaskExecutor> executor) : _host(host), + _topologyVersion(topologyVersion), _eventListener(eventListener), _executor(executor), _heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)), @@ -103,7 +108,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() { return; } - const auto currentRefreshPeriod = _currentRefreshPeriod(lock); + const auto currentRefreshPeriod = _currentRefreshPeriod(lock, false); const Milliseconds timeSinceLastCheck = (_lastIsMasterAt) ? _executor->now() - *_lastIsMasterAt : Milliseconds::max(); @@ -161,19 +166,111 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d } void SingleServerIsMasterMonitor::_doRemoteCommand() { - auto request = executor::RemoteCommandRequest( - HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS); - request.sslMode = _setUri.getSSLMode(); - stdx::lock_guard lock(_mutex); if (_isShutdown) return; + StatusWith<executor::TaskExecutor::CallbackHandle> swCbHandle = [&]() { + if (_topologyVersion) { + return _scheduleStreamableIsMaster(); + } + + return _scheduleSingleIsMaster(); + }(); + + if (!swCbHandle.isOK()) { + uasserted(46156012, swCbHandle.getStatus().toString()); + } + + _isMasterOutstanding = true; + _remoteCommandHandle = swCbHandle.getValue(); +} + +StatusWith<TaskExecutor::CallbackHandle> +SingleServerIsMasterMonitor::_scheduleStreamableIsMaster() { + auto maxAwaitTimeMS = durationCount<Milliseconds>(kMaxAwaitTimeMs); + overrideMaxAwaitTimeMS.execute( + [&](const BSONObj&) { maxAwaitTimeMS = durationCount<Milliseconds>(Milliseconds(1000)); }); + auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << maxAwaitTimeMS + << "topologyVersion" << _topologyVersion->toBSON()); + + _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS + kMaxAwaitTimeMs; + auto request = executor::RemoteCommandRequest( + HostAndPort(_host), "admin", isMasterCmd, nullptr, _timeoutMS); + request.sslMode = _setUri.getSSLMode(); + + Timer timer; + auto swCbHandle = _executor->scheduleExhaustRemoteCommand( + std::move(request), + [self = shared_from_this(), + timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { + Milliseconds nextRefreshPeriod; + { + stdx::lock_guard lk(self->_mutex); + + if (self->_isShutdown) { + self->_isMasterOutstanding = false; + LOGV2_DEBUG(4495400, + kLogLevel, + "RSM {setName} not processing response: {status}", + "status"_attr = result.response.status, + "setName"_attr = self->_setUri.getSetName()); + return; + } + + auto responseTopologyVersion = result.response.data.getField("topologyVersion"); + if (responseTopologyVersion) { + self->_topologyVersion = TopologyVersion::parse( + IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj()); + } else { + self->_topologyVersion = boost::none; + } + + self->_lastIsMasterAt = self->_executor->now(); + if (!result.response.isOK() || !result.response.moreToCome) { + self->_isMasterOutstanding = false; + nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK()); + self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + } + } + + // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for + // RTT instead + Microseconds latency(timer.micros()); + LOGV2_DEBUG(43332190, + 0, + "RSM {setName} exhuast latency is ms: {status}", + "status"_attr = result.response.status, + "setName"_attr = self->_setUri.getSetName()); + if (result.response.isOK()) { + self->_onIsMasterSuccess(latency, result.response.data); + } else { + self->_onIsMasterFailure(latency, result.response.status, result.response.data); + } + }); + + // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for RTT + // instead. Also move this block into _doRemoteCommand because we will not need the latency + // value here. + if (!swCbHandle.isOK()) { + Microseconds latency(timer.micros()); + _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); + } + + return swCbHandle; +} + +StatusWith<TaskExecutor::CallbackHandle> SingleServerIsMasterMonitor::_scheduleSingleIsMaster() { + _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS; + auto request = executor::RemoteCommandRequest( + HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS); + request.sslMode = _setUri.getSSLMode(); + Timer timer; auto swCbHandle = _executor->scheduleRemoteCommand( std::move(request), - [this, self = shared_from_this(), timer]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { + [self = shared_from_this(), + timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { Milliseconds nextRefreshPeriod; { stdx::lock_guard lk(self->_mutex); @@ -184,21 +281,29 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() { kLogLevel, "RSM {setName} not processing response: {status}", "status"_attr = result.response.status, - "setName"_attr = _setUri.getSetName()); + "setName"_attr = self->_setUri.getSetName()); return; } self->_lastIsMasterAt = self->_executor->now(); - nextRefreshPeriod = self->_currentRefreshPeriod(lk); - - LOGV2_DEBUG(4333228, - kLogLevel + 1, - "RSM {setName} next refresh period in {period}", - "period"_attr = nextRefreshPeriod.toString(), - "setName"_attr = _setUri.getSetName()); - self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + + auto responseTopologyVersion = result.response.data.getField("topologyVersion"); + if (responseTopologyVersion) { + self->_topologyVersion = TopologyVersion::parse( + IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj()); + } else { + self->_topologyVersion = boost::none; + } + + if (!result.response.isOK() || !result.response.moreToCome) { + self->_isMasterOutstanding = false; + nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK()); + self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + } } + // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for + // RTT instead Microseconds latency(timer.micros()); if (result.response.isOK()) { self->_onIsMasterSuccess(latency, result.response.data); @@ -207,20 +312,22 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() { } }); + // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for RTT + // instead. Also move this block into _doRemoteCommand because we will not need the latency + // value here. if (!swCbHandle.isOK()) { Microseconds latency(timer.micros()); _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); - uasserted(31448, swCbHandle.getStatus().toString()); } - _isMasterOutstanding = true; - _remoteCommandHandle = swCbHandle.getValue(); + return swCbHandle; } void SingleServerIsMasterMonitor::shutdown() { stdx::lock_guard lock(_mutex); - if (std::exchange(_isShutdown, true)) + if (std::exchange(_isShutdown, true)) { return; + } LOGV2_DEBUG(4333220, kLogLevel + 1, @@ -296,7 +403,11 @@ Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds or return r; } -Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) { +Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock, + bool scheduleImmediately) { + if (scheduleImmediately) + return Milliseconds(0); + return (_isExpedited) ? sdam::SdamConfiguration::kMinHeartbeatFrequencyMS : _heartbeatFrequencyMS; } @@ -386,6 +497,7 @@ void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent( _singleMonitors[serverAddress] = std::make_shared<SingleServerIsMasterMonitor>( _setUri, serverAddress, + serverDescription->getTopologyVersion(), _sdamConfiguration.getHeartBeatFrequency(), _eventPublisher, _executor); |