summaryrefslogtreecommitdiff
path: root/src/mongo/client
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2020-03-04 12:08:42 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-17 16:47:41 +0000
commit23e6f954d7ef5ab73f5540b46c6b3794b7ecfbdc (patch)
tree53d74dc86d460640a189477eb3bac78fb188a3dc /src/mongo/client
parent97e181dfdf8516ea4b7543e62130057e61a5ebc3 (diff)
downloadmongo-23e6f954d7ef5ab73f5540b46c6b3794b7ecfbdc.tar.gz
SERVER-44954 Streamable RSM uses exhaust isMaster
Diffstat (limited to 'src/mongo/client')
-rw-r--r--src/mongo/client/async_client.cpp52
-rw-r--r--src/mongo/client/async_client.h28
-rw-r--r--src/mongo/client/sdam/topology_listener.cpp16
-rw-r--r--src/mongo/client/sdam/topology_listener.h14
-rw-r--r--src/mongo/client/server_is_master_monitor.cpp181
-rw-r--r--src/mongo/client/server_is_master_monitor.h20
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.cpp21
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.h6
8 files changed, 192 insertions, 146 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index 99d93dab68a..f25830de073 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -316,55 +316,41 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
});
}
-Future<void> AsyncDBClient::_continueReceiveExhaustResponse(
- ExhaustRequestParameters&& exhaustRequestParameters,
- boost::optional<int32_t> msgId,
- const BatonHandle& baton) {
+Future<executor::RemoteCommandResponse> AsyncDBClient::_continueReceiveExhaustResponse(
+ ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton) {
return _waitForResponse(msgId, baton)
- .then([exhaustParameters = std::move(exhaustRequestParameters), msgId, baton, this](
- Message responseMsg) mutable -> Future<void> {
- // Run callback
- auto now = exhaustParameters.clkSource->now();
- auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start);
+ .then([stopwatch, msgId, baton, this](Message responseMsg) mutable {
bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome);
rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg));
- exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration),
- isMoreToComeSet);
-
- if (!isMoreToComeSet) {
- return Status::OK();
- }
-
- exhaustParameters.start = now;
- return _continueReceiveExhaustResponse(
- std::move(exhaustParameters), boost::none, baton);
+ auto rcResponse = executor::RemoteCommandResponse(
+ *response, duration_cast<Milliseconds>(stopwatch.elapsed()), isMoreToComeSet);
+ return rcResponse;
});
}
-Future<void> AsyncDBClient::runExhaustCommand(OpMsgRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton) {
+Future<executor::RemoteCommandResponse> AsyncDBClient::awaitExhaustCommand(
+ const BatonHandle& baton) {
+ return _continueReceiveExhaustResponse(ClockSource::StopWatch(), boost::none, baton);
+}
+
+Future<executor::RemoteCommandResponse> AsyncDBClient::runExhaustCommand(OpMsgRequest request,
+ const BatonHandle& baton) {
invariant(_negotiatedProtocol);
auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
OpMsg::setFlag(&requestMsg, OpMsg::kExhaustSupported);
- auto clkSource = _svcCtx->getPreciseClockSource();
- auto start = clkSource->now();
auto msgId = nextMessageId();
- return _call(std::move(requestMsg), msgId, baton)
- .then([msgId, baton, cb = std::move(cb), clkSource, start, this]() mutable {
- ExhaustRequestParameters exhaustParameters{std::move(cb), clkSource, start};
- return _continueReceiveExhaustResponse(std::move(exhaustParameters), msgId, baton);
- });
+ return _call(std::move(requestMsg), msgId, baton).then([msgId, baton, this]() mutable {
+ return _continueReceiveExhaustResponse(ClockSource::StopWatch(), msgId, baton);
+ });
}
-Future<void> AsyncDBClient::runExhaustCommandRequest(executor::RemoteCommandRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton) {
+Future<executor::RemoteCommandResponse> AsyncDBClient::beginExhaustCommandRequest(
+ executor::RemoteCommandRequest request, const BatonHandle& baton) {
auto opMsgRequest = OpMsgRequest::fromDBAndBody(
std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
- return runExhaustCommand(std::move(opMsgRequest), std::move(cb), baton);
+ return runExhaustCommand(std::move(opMsgRequest), baton);
}
void AsyncDBClient::cancel(const BatonHandle& baton) {
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index c7917bb5461..d2279374dda 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -47,19 +47,6 @@ namespace mongo {
class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> {
public:
- using RemoteCommandCallbackFn =
- unique_function<void(const executor::RemoteCommandResponse&, bool isMoreToComeSet)>;
-
- struct ExhaustRequestParameters {
- ExhaustRequestParameters(ExhaustRequestParameters&&) = default;
- ExhaustRequestParameters(const ExhaustRequestParameters&) = delete;
- ExhaustRequestParameters& operator=(const ExhaustRequestParameters&) = delete;
-
- RemoteCommandCallbackFn cb;
- ClockSource* clkSource;
- Date_t start;
- };
-
explicit AsyncDBClient(const HostAndPort& peer,
transport::SessionHandle session,
ServiceContext* svcCtx)
@@ -79,12 +66,11 @@ public:
const BatonHandle& baton = nullptr,
bool fireAndForget = false);
- Future<void> runExhaustCommandRequest(executor::RemoteCommandRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton = nullptr);
- Future<void> runExhaustCommand(OpMsgRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton = nullptr);
+ Future<executor::RemoteCommandResponse> beginExhaustCommandRequest(
+ executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr);
+ Future<executor::RemoteCommandResponse> runExhaustCommand(OpMsgRequest request,
+ const BatonHandle& baton = nullptr);
+ Future<executor::RemoteCommandResponse> awaitExhaustCommand(const BatonHandle& baton = nullptr);
Future<void> authenticate(const BSONObj& params);
@@ -108,8 +94,8 @@ public:
const HostAndPort& local() const;
private:
- Future<void> _continueReceiveExhaustResponse(
- ExhaustRequestParameters&& exhaustRequestParameters,
+ Future<executor::RemoteCommandResponse> _continueReceiveExhaustResponse(
+ ClockSource::StopWatch stopwatch,
boost::optional<int32_t> msgId,
const BatonHandle& baton = nullptr);
Future<Message> _waitForResponse(boost::optional<int32_t> msgId,
diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp
index 48e1f2b147b..3d4fe2b92cb 100644
--- a/src/mongo/client/sdam/topology_listener.cpp
+++ b/src/mongo/client/sdam/topology_listener.cpp
@@ -92,14 +92,12 @@ void TopologyEventsPublisher::onServerHandshakeFailedEvent(const sdam::ServerAdd
_scheduleNextDelivery();
}
-void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
- const ServerAddress& hostAndPort,
+void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort,
const BSONObj reply) {
{
stdx::lock_guard lock(_eventQueueMutex);
EventPtr event = std::make_unique<Event>();
event->type = EventType::HEARTBEAT_SUCCESS;
- event->duration = duration_cast<IsMasterRTT>(durationMs);
event->hostAndPort = hostAndPort;
event->reply = reply;
_eventQueue.push_back(std::move(event));
@@ -107,15 +105,13 @@ void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durati
_scheduleNextDelivery();
}
-void TopologyEventsPublisher::onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+void TopologyEventsPublisher::onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply) {
{
stdx::lock_guard lock(_eventQueueMutex);
EventPtr event = std::make_unique<Event>();
event->type = EventType::HEARTBEAT_FAILURE;
- event->duration = duration_cast<IsMasterRTT>(durationMs);
event->hostAndPort = hostAndPort;
event->reply = reply;
event->status = errorStatus;
@@ -189,14 +185,10 @@ void TopologyEventsPublisher::_nextDelivery() {
void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Event& event) {
switch (event.type) {
case EventType::HEARTBEAT_SUCCESS:
- listener->onServerHeartbeatSucceededEvent(
- duration_cast<IsMasterRTT>(event.duration), event.hostAndPort, event.reply);
+ listener->onServerHeartbeatSucceededEvent(event.hostAndPort, event.reply);
break;
case EventType::HEARTBEAT_FAILURE:
- listener->onServerHeartbeatFailureEvent(duration_cast<IsMasterRTT>(event.duration),
- event.status,
- event.hostAndPort,
- event.reply);
+ listener->onServerHeartbeatFailureEvent(event.status, event.hostAndPort, event.reply);
break;
case EventType::TOPOLOGY_DESCRIPTION_CHANGED:
// TODO SERVER-46497: fix uuid or just remove
diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h
index 5cc737706d0..ff550309068 100644
--- a/src/mongo/client/sdam/topology_listener.h
+++ b/src/mongo/client/sdam/topology_listener.h
@@ -52,8 +52,7 @@ public:
TopologyDescriptionPtr previousDescription,
TopologyDescriptionPtr newDescription){};
- virtual void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+ virtual void onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply){};
/**
@@ -73,8 +72,7 @@ public:
* hostAndPort succeeded. durationMS is the execution time of the event, including the time it
* took to send the message and recieve the reply from the server.
*/
- virtual void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
- const ServerAddress& hostAndPort,
+ virtual void onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort,
const BSONObj reply){};
/*
@@ -115,13 +113,11 @@ public:
void onServerHandshakeFailedEvent(const sdam::ServerAddress& address,
const Status& status,
- const BSONObj reply) override;
+ const BSONObj reply);
- void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
- const ServerAddress& hostAndPort,
+ void onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort,
const BSONObj reply) override;
- void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+ void onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply) override;
void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override;
diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp
index 63eed95be4d..aa9f2b4b8c4 100644
--- a/src/mongo/client/server_is_master_monitor.cpp
+++ b/src/mongo/client/server_is_master_monitor.cpp
@@ -40,6 +40,8 @@
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(overrideMaxAwaitTimeMS);
+
const BSONObj IS_MASTER_BSON = BSON("isMaster" << 1);
using executor::NetworkInterface;
@@ -53,10 +55,12 @@ const Milliseconds kZeroMs = Milliseconds{0};
SingleServerIsMasterMonitor::SingleServerIsMasterMonitor(
const MongoURI& setUri,
const sdam::ServerAddress& host,
+ boost::optional<TopologyVersion> topologyVersion,
Milliseconds heartbeatFrequencyMS,
sdam::TopologyEventsPublisherPtr eventListener,
std::shared_ptr<executor::TaskExecutor> executor)
: _host(host),
+ _topologyVersion(topologyVersion),
_eventListener(eventListener),
_executor(executor),
_heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)),
@@ -83,7 +87,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() {
// The previous refresh period may or may not have been expedited.
// Saving the value here before we change to expedited mode.
- const auto previousRefreshPeriod = _currentRefreshPeriod(lock);
+ const auto previousRefreshPeriod = _currentRefreshPeriod(lock, false);
if (!_isExpedited) {
// save some log lines.
@@ -98,7 +102,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() {
}
// Get the new expedited refresh period.
- const auto expeditedRefreshPeriod = _currentRefreshPeriod(lock);
+ const auto expeditedRefreshPeriod = _currentRefreshPeriod(lock, false);
if (_isMasterOutstanding) {
LOGV2_DEBUG(4333216,
@@ -164,7 +168,6 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d
invariant(!_isMasterOutstanding);
- Timer timer;
auto swCbHandle = _executor->scheduleWorkAt(
_executor->now() + delay,
[self = shared_from_this()](const executor::TaskExecutor::CallbackArgs& cbData) {
@@ -175,8 +178,7 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d
});
if (!swCbHandle.isOK()) {
- Microseconds latency(timer.micros());
- _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
+ _onIsMasterFailure(swCbHandle.getStatus(), BSONObj());
return;
}
@@ -184,18 +186,93 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d
}
void SingleServerIsMasterMonitor::_doRemoteCommand() {
- auto request = executor::RemoteCommandRequest(
- HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS);
- request.sslMode = _setUri.getSSLMode();
-
stdx::lock_guard lock(_mutex);
if (_isShutdown)
return;
- Timer timer;
+ StatusWith<executor::TaskExecutor::CallbackHandle> swCbHandle = [&]() {
+ if (_topologyVersion) {
+ return _scheduleStreamableIsMaster();
+ }
+
+ return _scheduleSingleIsMaster();
+ }();
+
+ if (!swCbHandle.isOK()) {
+ _onIsMasterFailure(swCbHandle.getStatus(), BSONObj());
+ uasserted(46156012, swCbHandle.getStatus().toString());
+ }
+
+ _isMasterOutstanding = true;
+ _remoteCommandHandle = swCbHandle.getValue();
+}
+
+StatusWith<TaskExecutor::CallbackHandle>
+SingleServerIsMasterMonitor::_scheduleStreamableIsMaster() {
+ auto maxAwaitTimeMS = durationCount<Milliseconds>(kMaxAwaitTimeMs);
+ overrideMaxAwaitTimeMS.execute(
+ [&](const BSONObj&) { maxAwaitTimeMS = durationCount<Milliseconds>(Milliseconds(1000)); });
+ auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << maxAwaitTimeMS
+ << "topologyVersion" << _topologyVersion->toBSON());
+
+ _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS + kMaxAwaitTimeMs;
+ auto request = executor::RemoteCommandRequest(
+ HostAndPort(_host), "admin", isMasterCmd, nullptr, _timeoutMS);
+ request.sslMode = _setUri.getSSLMode();
+
+ auto swCbHandle = _executor->scheduleExhaustRemoteCommand(
+ std::move(request),
+ [self = shared_from_this()](
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
+ Milliseconds nextRefreshPeriod;
+ {
+ stdx::lock_guard lk(self->_mutex);
+
+ if (self->_isShutdown) {
+ self->_isMasterOutstanding = false;
+ LOGV2_DEBUG(4495400,
+ kLogLevel,
+ "RSM {setName} not processing response: {status}",
+ "status"_attr = result.response.status,
+ "setName"_attr = self->_setUri.getSetName());
+ return;
+ }
+
+ auto responseTopologyVersion = result.response.data.getField("topologyVersion");
+ if (responseTopologyVersion) {
+ self->_topologyVersion = TopologyVersion::parse(
+ IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj());
+ } else {
+ self->_topologyVersion = boost::none;
+ }
+
+ self->_lastIsMasterAt = self->_executor->now();
+ if (!result.response.isOK() || !result.response.moreToCome) {
+ self->_isMasterOutstanding = false;
+ nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK());
+ self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+ }
+ }
+
+ if (result.response.isOK()) {
+ self->_onIsMasterSuccess(result.response.data);
+ } else {
+ self->_onIsMasterFailure(result.response.status, result.response.data);
+ }
+ });
+
+ return swCbHandle;
+}
+
+StatusWith<TaskExecutor::CallbackHandle> SingleServerIsMasterMonitor::_scheduleSingleIsMaster() {
+ _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS;
+ auto request = executor::RemoteCommandRequest(
+ HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS);
+ request.sslMode = _setUri.getSSLMode();
+
auto swCbHandle = _executor->scheduleRemoteCommand(
std::move(request),
- [this, self = shared_from_this(), timer](
+ [self = shared_from_this()](
const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
Milliseconds nextRefreshPeriod;
{
@@ -207,43 +284,42 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() {
kLogLevel,
"RSM {setName} not processing response: {status}",
"status"_attr = result.response.status,
- "setName"_attr = _setUri.getSetName());
+ "setName"_attr = self->_setUri.getSetName());
return;
}
self->_lastIsMasterAt = self->_executor->now();
- nextRefreshPeriod = self->_currentRefreshPeriod(lk);
-
- LOGV2_DEBUG(4333228,
- kLogLevel + 1,
- "RSM {setName} next refresh period in {period}",
- "period"_attr = nextRefreshPeriod.toString(),
- "setName"_attr = _setUri.getSetName());
- self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+
+ auto responseTopologyVersion = result.response.data.getField("topologyVersion");
+ if (responseTopologyVersion) {
+ self->_topologyVersion = TopologyVersion::parse(
+ IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj());
+ } else {
+ self->_topologyVersion = boost::none;
+ }
+
+ if (!result.response.isOK() || !result.response.moreToCome) {
+ self->_isMasterOutstanding = false;
+ nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK());
+ self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+ }
}
- Microseconds latency(timer.micros());
if (result.response.isOK()) {
- self->_onIsMasterSuccess(latency, result.response.data);
+ self->_onIsMasterSuccess(result.response.data);
} else {
- self->_onIsMasterFailure(latency, result.response.status, result.response.data);
+ self->_onIsMasterFailure(result.response.status, result.response.data);
}
});
- if (!swCbHandle.isOK()) {
- Microseconds latency(timer.micros());
- _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
- uasserted(31448, swCbHandle.getStatus().toString());
- }
-
- _isMasterOutstanding = true;
- _remoteCommandHandle = swCbHandle.getValue();
+ return swCbHandle;
}
void SingleServerIsMasterMonitor::shutdown() {
stdx::lock_guard lock(_mutex);
- if (std::exchange(_isShutdown, true))
+ if (std::exchange(_isShutdown, true)) {
return;
+ }
LOGV2_DEBUG(4333220,
kLogLevel + 1,
@@ -274,35 +350,27 @@ void SingleServerIsMasterMonitor::_cancelOutstandingRequest(WithLock) {
_isMasterOutstanding = false;
}
-void SingleServerIsMasterMonitor::_onIsMasterSuccess(sdam::IsMasterRTT latency,
- const BSONObj bson) {
+void SingleServerIsMasterMonitor::_onIsMasterSuccess(const BSONObj bson) {
LOGV2_DEBUG(4333221,
kLogLevel + 1,
- "RSM {setName} received successful isMaster for server {host} ({latency}): {bson}",
+ "RSM {setName} received successful isMaster for server {host}: {bson}",
"host"_attr = _host,
- "latency"_attr = latency,
"setName"_attr = _setUri.getSetName(),
"bson"_attr = bson.toString());
- _eventListener->onServerHeartbeatSucceededEvent(
- duration_cast<Milliseconds>(latency), _host, bson);
+ _eventListener->onServerHeartbeatSucceededEvent(_host, bson);
}
-void SingleServerIsMasterMonitor::_onIsMasterFailure(sdam::IsMasterRTT latency,
- const Status& status,
- const BSONObj bson) {
- LOGV2_DEBUG(
- 4333222,
- kLogLevel,
- "RSM {setName} received failed isMaster for server {host}: {status} ({latency}): {bson}",
- "host"_attr = _host,
- "status"_attr = status.toString(),
- "latency"_attr = latency,
- "setName"_attr = _setUri.getSetName(),
- "bson"_attr = bson.toString());
-
- _eventListener->onServerHeartbeatFailureEvent(
- duration_cast<Milliseconds>(latency), status, _host, bson);
+void SingleServerIsMasterMonitor::_onIsMasterFailure(const Status& status, const BSONObj bson) {
+ LOGV2_DEBUG(4333222,
+ kLogLevel,
+ "RSM {setName} received failed isMaster for server {host}: {status}: {bson}",
+ "host"_attr = _host,
+ "status"_attr = status.toString(),
+ "setName"_attr = _setUri.getSetName(),
+ "bson"_attr = bson.toString());
+
+ _eventListener->onServerHeartbeatFailureEvent(status, _host, bson);
}
Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds original) {
@@ -319,7 +387,11 @@ Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds or
return r;
}
-Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) {
+Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock,
+ bool scheduleImmediately) {
+ if (scheduleImmediately)
+ return Milliseconds(0);
+
return (_isExpedited) ? sdam::SdamConfiguration::kMinHeartbeatFrequencyMS
: _heartbeatFrequencyMS;
}
@@ -409,6 +481,7 @@ void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent(
_singleMonitors[serverAddress] = std::make_shared<SingleServerIsMasterMonitor>(
_setUri,
serverAddress,
+ serverDescription->getTopologyVersion(),
_sdamConfiguration.getHeartBeatFrequency(),
_eventPublisher,
_executor);
diff --git a/src/mongo/client/server_is_master_monitor.h b/src/mongo/client/server_is_master_monitor.h
index c41bd6c8ce1..e788cc3c95a 100644
--- a/src/mongo/client/server_is_master_monitor.h
+++ b/src/mongo/client/server_is_master_monitor.h
@@ -40,6 +40,7 @@ class SingleServerIsMasterMonitor
public:
explicit SingleServerIsMasterMonitor(const MongoURI& setUri,
const ServerAddress& host,
+ boost::optional<TopologyVersion> topologyVersion,
Milliseconds heartbeatFrequencyMS,
TopologyEventsPublisherPtr eventListener,
std::shared_ptr<executor::TaskExecutor> executor);
@@ -65,16 +66,28 @@ public:
const Milliseconds& expeditedRefreshPeriod,
const Milliseconds& previousRefreshPeriod);
+ // Sent in the initial isMaster request when using the streamable exhaust protocol. The max
+ // duration a server should wait for a significant topology change before sending a response.
+ static constexpr Milliseconds kMaxAwaitTimeMs = Milliseconds(10000);
+
private:
void _scheduleNextIsMaster(WithLock, Milliseconds delay);
void _rescheduleNextIsMaster(WithLock, Milliseconds delay);
void _doRemoteCommand();
- void _onIsMasterSuccess(IsMasterRTT latency, const BSONObj bson);
- void _onIsMasterFailure(IsMasterRTT latency, const Status& status, const BSONObj bson);
+ // Use the awaitable isMaster protocol with the exhaust bit set. Attach _topologyVersion and
+ // kMaxAwaitTimeMS to the request.
+ StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleStreamableIsMaster();
+
+ // Use the old isMaster protocol. Do not attach _topologyVersion or kMaxAwaitTimeMS to the
+ // request.
+ StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleSingleIsMaster();
+
+ void _onIsMasterSuccess(const BSONObj bson);
+ void _onIsMasterFailure(const Status& status, const BSONObj bson);
Milliseconds _overrideRefreshPeriod(Milliseconds original);
- Milliseconds _currentRefreshPeriod(WithLock);
+ Milliseconds _currentRefreshPeriod(WithLock, bool scheduleImmediately);
void _cancelOutstandingRequest(WithLock);
boost::optional<Milliseconds> _timeSinceLastCheck() const;
@@ -84,6 +97,7 @@ private:
Mutex _mutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "SingleServerIsMasterMonitor::mutex");
ServerAddress _host;
+ boost::optional<TopologyVersion> _topologyVersion;
TopologyEventsPublisherPtr _eventListener;
std::shared_ptr<executor::TaskExecutor> _executor;
Milliseconds _heartbeatFrequencyMS;
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 6ae640dd9af..88a200a33e8 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -199,17 +199,20 @@ void StreamableReplicaSetMonitor::init() {
}
void StreamableReplicaSetMonitor::drop() {
- stdx::lock_guard lock(_mutex);
- if (_isDropped.swap(true)) {
- return;
+ {
+ stdx::lock_guard lock(_mutex);
+ if (_isDropped.swap(true))
+ return;
+
+ _eventsPublisher->close();
+ _failOutstandingWithStatus(
+ lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"});
}
+
LOGV2(4333209, "Closing Replica Set Monitor {setName}", "setName"_attr = getName());
- _eventsPublisher->close();
_queryProcessor->shutdown();
_pingMonitor->shutdown();
_isMasterMonitor->shutdown();
- _failOutstandingWithStatus(
- lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"});
ReplicaSetMonitorManager::get()->getNotifier().onDroppedSet(getName());
LOGV2(4333210, "Done closing Replica Set Monitor {setName}", "setName"_attr = getName());
@@ -613,16 +616,14 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent(
}
}
-void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs,
- const ServerAddress& hostAndPort,
+void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort,
const BSONObj reply) {
// After the inital handshake, isMasterResponses should not update the RTT with durationMs.
IsMasterOutcome outcome(hostAndPort, reply, boost::none);
_topologyManager->onServerDescription(outcome);
}
-void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply) {
_failedHost(
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index 650d7abe237..07873fe199e 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -166,16 +166,14 @@ private:
sdam::TopologyDescriptionPtr previousDescription,
sdam::TopologyDescriptionPtr newDescription) override;
- void onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs,
- const sdam::ServerAddress& hostAndPort,
+ void onServerHeartbeatSucceededEvent(const sdam::ServerAddress& hostAndPort,
const BSONObj reply) override;
void onServerHandshakeFailedEvent(const sdam::ServerAddress& address,
const Status& status,
const BSONObj reply) override;
- void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+ void onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply) override;