summary refs log tree commit diff
path: root/src/mongo/client/server_is_master_monitor.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/client/server_is_master_monitor.cpp')
-rw-r--r-- src/mongo/client/server_is_master_monitor.cpp 154
1 files changed, 133 insertions, 21 deletions
diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp
index 7f70753a772..f399ffeea38 100644
--- a/src/mongo/client/server_is_master_monitor.cpp
+++ b/src/mongo/client/server_is_master_monitor.cpp
@@ -36,10 +36,13 @@
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/util/log.h"
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(overrideMaxAwaitTimeMS);
+
const BSONObj IS_MASTER_BSON = BSON("isMaster" << 1);
using executor::NetworkInterface;
@@ -53,10 +56,12 @@ const Milliseconds kZeroMs = Milliseconds{0};
SingleServerIsMasterMonitor::SingleServerIsMasterMonitor(
const MongoURI& setUri,
const sdam::ServerAddress& host,
+ boost::optional<TopologyVersion> topologyVersion,
Milliseconds heartbeatFrequencyMS,
sdam::TopologyEventsPublisherPtr eventListener,
std::shared_ptr<executor::TaskExecutor> executor)
: _host(host),
+ _topologyVersion(topologyVersion),
_eventListener(eventListener),
_executor(executor),
_heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)),
@@ -103,7 +108,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() {
return;
}
- const auto currentRefreshPeriod = _currentRefreshPeriod(lock);
+ const auto currentRefreshPeriod = _currentRefreshPeriod(lock, false);
const Milliseconds timeSinceLastCheck =
(_lastIsMasterAt) ? _executor->now() - *_lastIsMasterAt : Milliseconds::max();
@@ -161,19 +166,111 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d
}
void SingleServerIsMasterMonitor::_doRemoteCommand() {  // Schedules one isMaster request and records it as outstanding; does nothing once shut down.
- auto request = executor::RemoteCommandRequest(
- HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS);
- request.sslMode = _setUri.getSSLMode();
-
stdx::lock_guard lock(_mutex);  // Held until the function returns, so both _schedule* helpers below run under _mutex.
if (_isShutdown)
return;
+ StatusWith<executor::TaskExecutor::CallbackHandle> swCbHandle = [&]() {
+ if (_topologyVersion) {  // A stored topologyVersion selects the streamable request; otherwise a single isMaster is scheduled.
+ return _scheduleStreamableIsMaster();
+ }
+
+ return _scheduleSingleIsMaster();
+ }();
+
+ if (!swCbHandle.isOK()) {  // Scheduling failed; the helper has already invoked _onIsMasterFailure with this status.
+ uasserted(46156012, swCbHandle.getStatus().toString());
+ }
+
+ _isMasterOutstanding = true;  // Only reached when scheduling succeeded.
+ _remoteCommandHandle = swCbHandle.getValue();  // Keeps the handle of the request that was just scheduled.
+}
+
+/**
+ * Schedules a streamable isMaster (an exhaust command carrying maxAwaitTimeMS and the stored
+ * topologyVersion) against _host.
+ *
+ * Preconditions: _topologyVersion is engaged (it is dereferenced below), and the caller holds
+ * _mutex (_doRemoteCommand calls this while holding its lock_guard).
+ *
+ * Returns the executor's scheduling result. If scheduling fails, _onIsMasterFailure is invoked
+ * with the failure status before the result is returned.
+ */
+StatusWith<TaskExecutor::CallbackHandle>
+SingleServerIsMasterMonitor::_scheduleStreamableIsMaster() {
+    auto maxAwaitTimeMS = durationCount<Milliseconds>(kMaxAwaitTimeMs);
+    // The failpoint replaces the await time sent to the server with one second.
+    overrideMaxAwaitTimeMS.execute(
+        [&](const BSONObj&) { maxAwaitTimeMS = durationCount<Milliseconds>(Milliseconds(1000)); });
+    auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << maxAwaitTimeMS
+                                       << "topologyVersion" << _topologyVersion->toBSON());
+
+    // NOTE(review): the request timeout is always derived from kMaxAwaitTimeMs, even when the
+    // failpoint above shortens the await time sent to the server -- confirm this is intended.
+    _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS + kMaxAwaitTimeMs;
+    auto request = executor::RemoteCommandRequest(
+        HostAndPort(_host), "admin", isMasterCmd, nullptr, _timeoutMS);
+    request.sslMode = _setUri.getSSLMode();
+
+    Timer timer;
+    auto swCbHandle = _executor->scheduleExhaustRemoteCommand(
+        std::move(request),
+        [self = shared_from_this(),
+         timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
+            {
+                stdx::lock_guard lk(self->_mutex);
+
+                if (self->_isShutdown) {
+                    self->_isMasterOutstanding = false;
+                    LOGV2_DEBUG(4495400,
+                                kLogLevel,
+                                "RSM {setName} not processing response: {status}",
+                                "status"_attr = result.response.status,
+                                "setName"_attr = self->_setUri.getSetName());
+                    return;
+                }
+
+                // Track the topologyVersion reported by the server. A reply without one clears
+                // the stored value, so the next _doRemoteCommand schedules a single isMaster.
+                auto responseTopologyVersion = result.response.data.getField("topologyVersion");
+                if (responseTopologyVersion) {
+                    self->_topologyVersion = TopologyVersion::parse(
+                        IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj());
+                } else {
+                    self->_topologyVersion = boost::none;
+                }
+
+                self->_lastIsMasterAt = self->_executor->now();
+                // Only reschedule when this reply ends the exchange: either it is an error or
+                // the server signalled that no further replies will follow.
+                if (!result.response.isOK() || !result.response.moreToCome) {
+                    self->_isMasterOutstanding = false;
+                    const Milliseconds nextRefreshPeriod =
+                        self->_currentRefreshPeriod(lk, result.response.isOK());
+                    self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+                }
+            }
+
+            // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for
+            // RTT instead
+            Microseconds latency(timer.micros());
+            // Logged at kLogLevel, like the other debug message in this callback, and with the
+            // latency value the message text refers to.
+            LOGV2_DEBUG(43332190,
+                        kLogLevel,
+                        "RSM {setName} exhaust latency is {latency}: {status}",
+                        "latency"_attr = latency.toString(),
+                        "status"_attr = result.response.status,
+                        "setName"_attr = self->_setUri.getSetName());
+            if (result.response.isOK()) {
+                self->_onIsMasterSuccess(latency, result.response.data);
+            } else {
+                self->_onIsMasterFailure(latency, result.response.status, result.response.data);
+            }
+        });
+
+    // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for RTT
+    // instead. Also move this block into _doRemoteCommand because we will not need the latency
+    // value here.
+    if (!swCbHandle.isOK()) {
+        Microseconds latency(timer.micros());
+        _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
+    }
+
+    return swCbHandle;
+}
+
+StatusWith<TaskExecutor::CallbackHandle> SingleServerIsMasterMonitor::_scheduleSingleIsMaster() {
+ _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS;
+ auto request = executor::RemoteCommandRequest(
+ HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS);
+ request.sslMode = _setUri.getSSLMode();
+
Timer timer;
auto swCbHandle = _executor->scheduleRemoteCommand(
std::move(request),
- [this, self = shared_from_this(), timer](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
+ [self = shared_from_this(),
+ timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
Milliseconds nextRefreshPeriod;
{
stdx::lock_guard lk(self->_mutex);
@@ -184,21 +281,29 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() {
kLogLevel,
"RSM {setName} not processing response: {status}",
"status"_attr = result.response.status,
- "setName"_attr = _setUri.getSetName());
+ "setName"_attr = self->_setUri.getSetName());
return;
}
self->_lastIsMasterAt = self->_executor->now();
- nextRefreshPeriod = self->_currentRefreshPeriod(lk);
-
- LOGV2_DEBUG(4333228,
- kLogLevel + 1,
- "RSM {setName} next refresh period in {period}",
- "period"_attr = nextRefreshPeriod.toString(),
- "setName"_attr = _setUri.getSetName());
- self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+
+ auto responseTopologyVersion = result.response.data.getField("topologyVersion");
+ if (responseTopologyVersion) {
+ self->_topologyVersion = TopologyVersion::parse(
+ IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj());
+ } else {
+ self->_topologyVersion = boost::none;
+ }
+
+ if (!result.response.isOK() || !result.response.moreToCome) {
+ self->_isMasterOutstanding = false;
+ nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK());
+ self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+ }
}
+ // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for
+ // RTT instead
Microseconds latency(timer.micros());
if (result.response.isOK()) {
self->_onIsMasterSuccess(latency, result.response.data);
@@ -207,20 +312,22 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() {
}
});
+ // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for RTT
+ // instead. Also move this block into _doRemoteCommand because we will not need the latency
+ // value here.
if (!swCbHandle.isOK()) {
Microseconds latency(timer.micros());
_onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
- uasserted(31448, swCbHandle.getStatus().toString());
}
- _isMasterOutstanding = true;
- _remoteCommandHandle = swCbHandle.getValue();
+ return swCbHandle;
}
void SingleServerIsMasterMonitor::shutdown() {
stdx::lock_guard lock(_mutex);
- if (std::exchange(_isShutdown, true))
+ if (std::exchange(_isShutdown, true)) {
return;
+ }
LOGV2_DEBUG(4333220,
kLogLevel + 1,
@@ -296,7 +403,11 @@ Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds or
return r;
}
-Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) {
+Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock,
+ bool scheduleImmediately) {  // Returns the delay to wait before the next isMaster check.
+ if (scheduleImmediately)  // Callers visible in this diff pass either false or response.isOK().
+ return Milliseconds(0);
+
return (_isExpedited) ? sdam::SdamConfiguration::kMinHeartbeatFrequencyMS  // Expedited mode uses the SDAM minimum; otherwise the monitor's own _heartbeatFrequencyMS.
: _heartbeatFrequencyMS;
}
@@ -386,6 +497,7 @@ void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent(
_singleMonitors[serverAddress] = std::make_shared<SingleServerIsMasterMonitor>(
_setUri,
serverAddress,
+ serverDescription->getTopologyVersion(),
_sdamConfiguration.getHeartBeatFrequency(),
_eventPublisher,
_executor);