summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/base/error_codes.yml1
-rw-r--r--src/mongo/client/async_client.cpp52
-rw-r--r--src/mongo/client/async_client.h28
-rw-r--r--src/mongo/client/sdam/topology_listener.cpp16
-rw-r--r--src/mongo/client/sdam/topology_listener.h14
-rw-r--r--src/mongo/client/server_is_master_monitor.cpp187
-rw-r--r--src/mongo/client/server_is_master_monitor.h20
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.cpp21
-rw-r--r--src/mongo/client/streamable_replica_set_monitor.h6
-rw-r--r--src/mongo/executor/network_interface.h3
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.cpp6
-rw-r--r--src/mongo/executor/network_interface_tl.cpp96
-rw-r--r--src/mongo/executor/network_interface_tl.h7
-rw-r--r--src/mongo/executor/remote_command_response.cpp11
-rw-r--r--src/mongo/executor/remote_command_response.h7
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp98
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h3
-rw-r--r--src/mongo/executor/thread_pool_task_executor_integration_test.cpp5
-rw-r--r--src/mongo/transport/transport_layer_asio_integration_test.cpp84
19 files changed, 386 insertions, 279 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index e1124282dd7..0d1403edc5a 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -353,6 +353,7 @@ error_codes:
- {code: 307,name: RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist}
- {code: 308,name: CurrentConfigNotCommittedYet}
+ - {code: 309,name: ExhaustCommandFinished}
- {code: 310,name: PeriodicJobIsStopped,categories: [CancelationError]}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index 99d93dab68a..f25830de073 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -316,55 +316,41 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
});
}
-Future<void> AsyncDBClient::_continueReceiveExhaustResponse(
- ExhaustRequestParameters&& exhaustRequestParameters,
- boost::optional<int32_t> msgId,
- const BatonHandle& baton) {
+Future<executor::RemoteCommandResponse> AsyncDBClient::_continueReceiveExhaustResponse(
+ ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton) {
return _waitForResponse(msgId, baton)
- .then([exhaustParameters = std::move(exhaustRequestParameters), msgId, baton, this](
- Message responseMsg) mutable -> Future<void> {
- // Run callback
- auto now = exhaustParameters.clkSource->now();
- auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start);
+ .then([stopwatch, msgId, baton, this](Message responseMsg) mutable {
bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome);
rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg));
- exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration),
- isMoreToComeSet);
-
- if (!isMoreToComeSet) {
- return Status::OK();
- }
-
- exhaustParameters.start = now;
- return _continueReceiveExhaustResponse(
- std::move(exhaustParameters), boost::none, baton);
+ auto rcResponse = executor::RemoteCommandResponse(
+ *response, duration_cast<Milliseconds>(stopwatch.elapsed()), isMoreToComeSet);
+ return rcResponse;
});
}
-Future<void> AsyncDBClient::runExhaustCommand(OpMsgRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton) {
+Future<executor::RemoteCommandResponse> AsyncDBClient::awaitExhaustCommand(
+ const BatonHandle& baton) {
+ return _continueReceiveExhaustResponse(ClockSource::StopWatch(), boost::none, baton);
+}
+
+Future<executor::RemoteCommandResponse> AsyncDBClient::runExhaustCommand(OpMsgRequest request,
+ const BatonHandle& baton) {
invariant(_negotiatedProtocol);
auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
OpMsg::setFlag(&requestMsg, OpMsg::kExhaustSupported);
- auto clkSource = _svcCtx->getPreciseClockSource();
- auto start = clkSource->now();
auto msgId = nextMessageId();
- return _call(std::move(requestMsg), msgId, baton)
- .then([msgId, baton, cb = std::move(cb), clkSource, start, this]() mutable {
- ExhaustRequestParameters exhaustParameters{std::move(cb), clkSource, start};
- return _continueReceiveExhaustResponse(std::move(exhaustParameters), msgId, baton);
- });
+ return _call(std::move(requestMsg), msgId, baton).then([msgId, baton, this]() mutable {
+ return _continueReceiveExhaustResponse(ClockSource::StopWatch(), msgId, baton);
+ });
}
-Future<void> AsyncDBClient::runExhaustCommandRequest(executor::RemoteCommandRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton) {
+Future<executor::RemoteCommandResponse> AsyncDBClient::beginExhaustCommandRequest(
+ executor::RemoteCommandRequest request, const BatonHandle& baton) {
auto opMsgRequest = OpMsgRequest::fromDBAndBody(
std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
- return runExhaustCommand(std::move(opMsgRequest), std::move(cb), baton);
+ return runExhaustCommand(std::move(opMsgRequest), baton);
}
void AsyncDBClient::cancel(const BatonHandle& baton) {
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index c7917bb5461..d2279374dda 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -47,19 +47,6 @@ namespace mongo {
class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> {
public:
- using RemoteCommandCallbackFn =
- unique_function<void(const executor::RemoteCommandResponse&, bool isMoreToComeSet)>;
-
- struct ExhaustRequestParameters {
- ExhaustRequestParameters(ExhaustRequestParameters&&) = default;
- ExhaustRequestParameters(const ExhaustRequestParameters&) = delete;
- ExhaustRequestParameters& operator=(const ExhaustRequestParameters&) = delete;
-
- RemoteCommandCallbackFn cb;
- ClockSource* clkSource;
- Date_t start;
- };
-
explicit AsyncDBClient(const HostAndPort& peer,
transport::SessionHandle session,
ServiceContext* svcCtx)
@@ -79,12 +66,11 @@ public:
const BatonHandle& baton = nullptr,
bool fireAndForget = false);
- Future<void> runExhaustCommandRequest(executor::RemoteCommandRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton = nullptr);
- Future<void> runExhaustCommand(OpMsgRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton = nullptr);
+ Future<executor::RemoteCommandResponse> beginExhaustCommandRequest(
+ executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr);
+ Future<executor::RemoteCommandResponse> runExhaustCommand(OpMsgRequest request,
+ const BatonHandle& baton = nullptr);
+ Future<executor::RemoteCommandResponse> awaitExhaustCommand(const BatonHandle& baton = nullptr);
Future<void> authenticate(const BSONObj& params);
@@ -108,8 +94,8 @@ public:
const HostAndPort& local() const;
private:
- Future<void> _continueReceiveExhaustResponse(
- ExhaustRequestParameters&& exhaustRequestParameters,
+ Future<executor::RemoteCommandResponse> _continueReceiveExhaustResponse(
+ ClockSource::StopWatch stopwatch,
boost::optional<int32_t> msgId,
const BatonHandle& baton = nullptr);
Future<Message> _waitForResponse(boost::optional<int32_t> msgId,
diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp
index 48e1f2b147b..3d4fe2b92cb 100644
--- a/src/mongo/client/sdam/topology_listener.cpp
+++ b/src/mongo/client/sdam/topology_listener.cpp
@@ -92,14 +92,12 @@ void TopologyEventsPublisher::onServerHandshakeFailedEvent(const sdam::ServerAdd
_scheduleNextDelivery();
}
-void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
- const ServerAddress& hostAndPort,
+void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort,
const BSONObj reply) {
{
stdx::lock_guard lock(_eventQueueMutex);
EventPtr event = std::make_unique<Event>();
event->type = EventType::HEARTBEAT_SUCCESS;
- event->duration = duration_cast<IsMasterRTT>(durationMs);
event->hostAndPort = hostAndPort;
event->reply = reply;
_eventQueue.push_back(std::move(event));
@@ -107,15 +105,13 @@ void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durati
_scheduleNextDelivery();
}
-void TopologyEventsPublisher::onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+void TopologyEventsPublisher::onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply) {
{
stdx::lock_guard lock(_eventQueueMutex);
EventPtr event = std::make_unique<Event>();
event->type = EventType::HEARTBEAT_FAILURE;
- event->duration = duration_cast<IsMasterRTT>(durationMs);
event->hostAndPort = hostAndPort;
event->reply = reply;
event->status = errorStatus;
@@ -189,14 +185,10 @@ void TopologyEventsPublisher::_nextDelivery() {
void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Event& event) {
switch (event.type) {
case EventType::HEARTBEAT_SUCCESS:
- listener->onServerHeartbeatSucceededEvent(
- duration_cast<IsMasterRTT>(event.duration), event.hostAndPort, event.reply);
+ listener->onServerHeartbeatSucceededEvent(event.hostAndPort, event.reply);
break;
case EventType::HEARTBEAT_FAILURE:
- listener->onServerHeartbeatFailureEvent(duration_cast<IsMasterRTT>(event.duration),
- event.status,
- event.hostAndPort,
- event.reply);
+ listener->onServerHeartbeatFailureEvent(event.status, event.hostAndPort, event.reply);
break;
case EventType::TOPOLOGY_DESCRIPTION_CHANGED:
// TODO SERVER-46497: fix uuid or just remove
diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h
index 5cc737706d0..ff550309068 100644
--- a/src/mongo/client/sdam/topology_listener.h
+++ b/src/mongo/client/sdam/topology_listener.h
@@ -52,8 +52,7 @@ public:
TopologyDescriptionPtr previousDescription,
TopologyDescriptionPtr newDescription){};
- virtual void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+ virtual void onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply){};
/**
@@ -73,8 +72,7 @@ public:
* hostAndPort succeeded. durationMS is the execution time of the event, including the time it
* took to send the message and recieve the reply from the server.
*/
- virtual void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
- const ServerAddress& hostAndPort,
+ virtual void onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort,
const BSONObj reply){};
/*
@@ -115,13 +113,11 @@ public:
void onServerHandshakeFailedEvent(const sdam::ServerAddress& address,
const Status& status,
- const BSONObj reply) override;
+ const BSONObj reply);
- void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs,
- const ServerAddress& hostAndPort,
+ void onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort,
const BSONObj reply) override;
- void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+ void onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply) override;
void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override;
diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp
index b4dc993a6d1..bdb81499b30 100644
--- a/src/mongo/client/server_is_master_monitor.cpp
+++ b/src/mongo/client/server_is_master_monitor.cpp
@@ -41,6 +41,8 @@
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(overrideMaxAwaitTimeMS);
+
using executor::NetworkInterface;
using executor::NetworkInterfaceThreadPool;
using executor::TaskExecutor;
@@ -52,10 +54,12 @@ const Milliseconds kZeroMs = Milliseconds{0};
SingleServerIsMasterMonitor::SingleServerIsMasterMonitor(
const MongoURI& setUri,
const sdam::ServerAddress& host,
+ boost::optional<TopologyVersion> topologyVersion,
Milliseconds heartbeatFrequencyMS,
sdam::TopologyEventsPublisherPtr eventListener,
std::shared_ptr<executor::TaskExecutor> executor)
: _host(host),
+ _topologyVersion(topologyVersion),
_eventListener(eventListener),
_executor(executor),
_heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)),
@@ -82,7 +86,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() {
// The previous refresh period may or may not have been expedited.
// Saving the value here before we change to expedited mode.
- const auto previousRefreshPeriod = _currentRefreshPeriod(lock);
+ const auto previousRefreshPeriod = _currentRefreshPeriod(lock, false);
if (!_isExpedited) {
// save some log lines.
@@ -97,7 +101,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() {
}
// Get the new expedited refresh period.
- const auto expeditedRefreshPeriod = _currentRefreshPeriod(lock);
+ const auto expeditedRefreshPeriod = _currentRefreshPeriod(lock, false);
if (_isMasterOutstanding) {
LOGV2_DEBUG(4333216,
@@ -163,7 +167,6 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d
invariant(!_isMasterOutstanding);
- Timer timer;
auto swCbHandle = _executor->scheduleWorkAt(
_executor->now() + delay,
[self = shared_from_this()](const executor::TaskExecutor::CallbackArgs& cbData) {
@@ -174,8 +177,7 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d
});
if (!swCbHandle.isOK()) {
- Microseconds latency(timer.micros());
- _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
+ _onIsMasterFailure(swCbHandle.getStatus(), BSONObj());
return;
}
@@ -183,24 +185,107 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d
}
void SingleServerIsMasterMonitor::_doRemoteCommand() {
+ stdx::lock_guard lock(_mutex);
+ if (_isShutdown)
+ return;
+
+ StatusWith<executor::TaskExecutor::CallbackHandle> swCbHandle = [&]() {
+ if (_topologyVersion) {
+ return _scheduleStreamableIsMaster();
+ }
+
+ return _scheduleSingleIsMaster();
+ }();
+
+ if (!swCbHandle.isOK()) {
+ _onIsMasterFailure(swCbHandle.getStatus(), BSONObj());
+ uasserted(46156012, swCbHandle.getStatus().toString());
+ }
+
+ _isMasterOutstanding = true;
+ _remoteCommandHandle = swCbHandle.getValue();
+}
+
+StatusWith<TaskExecutor::CallbackHandle>
+SingleServerIsMasterMonitor::_scheduleStreamableIsMaster() {
+ auto maxAwaitTimeMS = durationCount<Milliseconds>(kMaxAwaitTimeMs);
+ overrideMaxAwaitTimeMS.execute(
+ [&](const BSONObj&) { maxAwaitTimeMS = durationCount<Milliseconds>(Milliseconds(1000)); });
+
BSONObjBuilder bob;
bob.append("isMaster", 1);
+ bob.append("maxAwaitTimeMS", maxAwaitTimeMS);
+ bob.append("topologyVersion", _topologyVersion->toBSON());
+
if (WireSpec::instance().isInternalClient) {
WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob);
}
+ _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS + kMaxAwaitTimeMs;
auto request =
executor::RemoteCommandRequest(HostAndPort(_host), "admin", bob.obj(), nullptr, _timeoutMS);
request.sslMode = _setUri.getSSLMode();
- stdx::lock_guard lock(_mutex);
- if (_isShutdown)
- return;
+ auto swCbHandle = _executor->scheduleExhaustRemoteCommand(
+ std::move(request),
+ [self = shared_from_this()](
+ const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
+ Milliseconds nextRefreshPeriod;
+ {
+ stdx::lock_guard lk(self->_mutex);
+
+ if (self->_isShutdown) {
+ self->_isMasterOutstanding = false;
+ LOGV2_DEBUG(4495400,
+ kLogLevel,
+ "RSM {setName} not processing response: {status}",
+ "status"_attr = result.response.status,
+ "setName"_attr = self->_setUri.getSetName());
+ return;
+ }
+
+ auto responseTopologyVersion = result.response.data.getField("topologyVersion");
+ if (responseTopologyVersion) {
+ self->_topologyVersion = TopologyVersion::parse(
+ IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj());
+ } else {
+ self->_topologyVersion = boost::none;
+ }
+
+ self->_lastIsMasterAt = self->_executor->now();
+ if (!result.response.isOK() || !result.response.moreToCome) {
+ self->_isMasterOutstanding = false;
+ nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK());
+ self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+ }
+ }
+
+ if (result.response.isOK()) {
+ self->_onIsMasterSuccess(result.response.data);
+ } else {
+ self->_onIsMasterFailure(result.response.status, result.response.data);
+ }
+ });
+
+ return swCbHandle;
+}
+
+StatusWith<TaskExecutor::CallbackHandle> SingleServerIsMasterMonitor::_scheduleSingleIsMaster() {
+ _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS;
+
+ BSONObjBuilder bob;
+ bob.append("isMaster", 1);
+ if (WireSpec::instance().isInternalClient) {
+ WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob);
+ }
+
+ auto request =
+ executor::RemoteCommandRequest(HostAndPort(_host), "admin", bob.obj(), nullptr, _timeoutMS);
+ request.sslMode = _setUri.getSSLMode();
- Timer timer;
auto swCbHandle = _executor->scheduleRemoteCommand(
std::move(request),
- [this, self = shared_from_this(), timer](
+ [self = shared_from_this()](
const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
Milliseconds nextRefreshPeriod;
{
@@ -212,43 +297,42 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() {
kLogLevel,
"RSM {setName} not processing response: {status}",
"status"_attr = result.response.status,
- "setName"_attr = _setUri.getSetName());
+ "setName"_attr = self->_setUri.getSetName());
return;
}
self->_lastIsMasterAt = self->_executor->now();
- nextRefreshPeriod = self->_currentRefreshPeriod(lk);
-
- LOGV2_DEBUG(4333228,
- kLogLevel + 1,
- "RSM {setName} next refresh period in {period}",
- "period"_attr = nextRefreshPeriod.toString(),
- "setName"_attr = _setUri.getSetName());
- self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+
+ auto responseTopologyVersion = result.response.data.getField("topologyVersion");
+ if (responseTopologyVersion) {
+ self->_topologyVersion = TopologyVersion::parse(
+ IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj());
+ } else {
+ self->_topologyVersion = boost::none;
+ }
+
+ if (!result.response.isOK() || !result.response.moreToCome) {
+ self->_isMasterOutstanding = false;
+ nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK());
+ self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+ }
}
- Microseconds latency(timer.micros());
if (result.response.isOK()) {
- self->_onIsMasterSuccess(latency, result.response.data);
+ self->_onIsMasterSuccess(result.response.data);
} else {
- self->_onIsMasterFailure(latency, result.response.status, result.response.data);
+ self->_onIsMasterFailure(result.response.status, result.response.data);
}
});
- if (!swCbHandle.isOK()) {
- Microseconds latency(timer.micros());
- _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
- uasserted(31448, swCbHandle.getStatus().toString());
- }
-
- _isMasterOutstanding = true;
- _remoteCommandHandle = swCbHandle.getValue();
+ return swCbHandle;
}
void SingleServerIsMasterMonitor::shutdown() {
stdx::lock_guard lock(_mutex);
- if (std::exchange(_isShutdown, true))
+ if (std::exchange(_isShutdown, true)) {
return;
+ }
LOGV2_DEBUG(4333220,
kLogLevel + 1,
@@ -279,35 +363,27 @@ void SingleServerIsMasterMonitor::_cancelOutstandingRequest(WithLock) {
_isMasterOutstanding = false;
}
-void SingleServerIsMasterMonitor::_onIsMasterSuccess(sdam::IsMasterRTT latency,
- const BSONObj bson) {
+void SingleServerIsMasterMonitor::_onIsMasterSuccess(const BSONObj bson) {
LOGV2_DEBUG(4333221,
kLogLevel + 1,
- "RSM {setName} received successful isMaster for server {host} ({latency}): {bson}",
+ "RSM {setName} received successful isMaster for server {host}: {bson}",
"host"_attr = _host,
- "latency"_attr = latency,
"setName"_attr = _setUri.getSetName(),
"bson"_attr = bson.toString());
- _eventListener->onServerHeartbeatSucceededEvent(
- duration_cast<Milliseconds>(latency), _host, bson);
+ _eventListener->onServerHeartbeatSucceededEvent(_host, bson);
}
-void SingleServerIsMasterMonitor::_onIsMasterFailure(sdam::IsMasterRTT latency,
- const Status& status,
- const BSONObj bson) {
- LOGV2_DEBUG(
- 4333222,
- kLogLevel,
- "RSM {setName} received failed isMaster for server {host}: {status} ({latency}): {bson}",
- "host"_attr = _host,
- "status"_attr = status.toString(),
- "latency"_attr = latency,
- "setName"_attr = _setUri.getSetName(),
- "bson"_attr = bson.toString());
-
- _eventListener->onServerHeartbeatFailureEvent(
- duration_cast<Milliseconds>(latency), status, _host, bson);
+void SingleServerIsMasterMonitor::_onIsMasterFailure(const Status& status, const BSONObj bson) {
+ LOGV2_DEBUG(4333222,
+ kLogLevel,
+ "RSM {setName} received failed isMaster for server {host}: {status}: {bson}",
+ "host"_attr = _host,
+ "status"_attr = status.toString(),
+ "setName"_attr = _setUri.getSetName(),
+ "bson"_attr = bson.toString());
+
+ _eventListener->onServerHeartbeatFailureEvent(status, _host, bson);
}
Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds original) {
@@ -324,7 +400,11 @@ Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds or
return r;
}
-Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) {
+Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock,
+ bool scheduleImmediately) {
+ if (scheduleImmediately)
+ return Milliseconds(0);
+
return (_isExpedited) ? sdam::SdamConfiguration::kMinHeartbeatFrequencyMS
: _heartbeatFrequencyMS;
}
@@ -414,6 +494,7 @@ void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent(
_singleMonitors[serverAddress] = std::make_shared<SingleServerIsMasterMonitor>(
_setUri,
serverAddress,
+ serverDescription->getTopologyVersion(),
_sdamConfiguration.getHeartBeatFrequency(),
_eventPublisher,
_executor);
diff --git a/src/mongo/client/server_is_master_monitor.h b/src/mongo/client/server_is_master_monitor.h
index c41bd6c8ce1..e788cc3c95a 100644
--- a/src/mongo/client/server_is_master_monitor.h
+++ b/src/mongo/client/server_is_master_monitor.h
@@ -40,6 +40,7 @@ class SingleServerIsMasterMonitor
public:
explicit SingleServerIsMasterMonitor(const MongoURI& setUri,
const ServerAddress& host,
+ boost::optional<TopologyVersion> topologyVersion,
Milliseconds heartbeatFrequencyMS,
TopologyEventsPublisherPtr eventListener,
std::shared_ptr<executor::TaskExecutor> executor);
@@ -65,16 +66,28 @@ public:
const Milliseconds& expeditedRefreshPeriod,
const Milliseconds& previousRefreshPeriod);
+ // Sent in the initial isMaster request when using the streamable exhaust protocol. The max
+ // duration a server should wait for a significant topology change before sending a response.
+ static constexpr Milliseconds kMaxAwaitTimeMs = Milliseconds(10000);
+
private:
void _scheduleNextIsMaster(WithLock, Milliseconds delay);
void _rescheduleNextIsMaster(WithLock, Milliseconds delay);
void _doRemoteCommand();
- void _onIsMasterSuccess(IsMasterRTT latency, const BSONObj bson);
- void _onIsMasterFailure(IsMasterRTT latency, const Status& status, const BSONObj bson);
+ // Use the awaitable isMaster protocol with the exhaust bit set. Attach _topologyVersion and
+ // kMaxAwaitTimeMS to the request.
+ StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleStreamableIsMaster();
+
+ // Use the old isMaster protocol. Do not attach _topologyVersion or kMaxAwaitTimeMS to the
+ // request.
+ StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleSingleIsMaster();
+
+ void _onIsMasterSuccess(const BSONObj bson);
+ void _onIsMasterFailure(const Status& status, const BSONObj bson);
Milliseconds _overrideRefreshPeriod(Milliseconds original);
- Milliseconds _currentRefreshPeriod(WithLock);
+ Milliseconds _currentRefreshPeriod(WithLock, bool scheduleImmediately);
void _cancelOutstandingRequest(WithLock);
boost::optional<Milliseconds> _timeSinceLastCheck() const;
@@ -84,6 +97,7 @@ private:
Mutex _mutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "SingleServerIsMasterMonitor::mutex");
ServerAddress _host;
+ boost::optional<TopologyVersion> _topologyVersion;
TopologyEventsPublisherPtr _eventListener;
std::shared_ptr<executor::TaskExecutor> _executor;
Milliseconds _heartbeatFrequencyMS;
diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp
index 6ae640dd9af..88a200a33e8 100644
--- a/src/mongo/client/streamable_replica_set_monitor.cpp
+++ b/src/mongo/client/streamable_replica_set_monitor.cpp
@@ -199,17 +199,20 @@ void StreamableReplicaSetMonitor::init() {
}
void StreamableReplicaSetMonitor::drop() {
- stdx::lock_guard lock(_mutex);
- if (_isDropped.swap(true)) {
- return;
+ {
+ stdx::lock_guard lock(_mutex);
+ if (_isDropped.swap(true))
+ return;
+
+ _eventsPublisher->close();
+ _failOutstandingWithStatus(
+ lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"});
}
+
LOGV2(4333209, "Closing Replica Set Monitor {setName}", "setName"_attr = getName());
- _eventsPublisher->close();
_queryProcessor->shutdown();
_pingMonitor->shutdown();
_isMasterMonitor->shutdown();
- _failOutstandingWithStatus(
- lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"});
ReplicaSetMonitorManager::get()->getNotifier().onDroppedSet(getName());
LOGV2(4333210, "Done closing Replica Set Monitor {setName}", "setName"_attr = getName());
@@ -613,16 +616,14 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent(
}
}
-void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs,
- const ServerAddress& hostAndPort,
+void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort,
const BSONObj reply) {
// After the inital handshake, isMasterResponses should not update the RTT with durationMs.
IsMasterOutcome outcome(hostAndPort, reply, boost::none);
_topologyManager->onServerDescription(outcome);
}
-void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply) {
_failedHost(
diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h
index 650d7abe237..07873fe199e 100644
--- a/src/mongo/client/streamable_replica_set_monitor.h
+++ b/src/mongo/client/streamable_replica_set_monitor.h
@@ -166,16 +166,14 @@ private:
sdam::TopologyDescriptionPtr previousDescription,
sdam::TopologyDescriptionPtr newDescription) override;
- void onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs,
- const sdam::ServerAddress& hostAndPort,
+ void onServerHeartbeatSucceededEvent(const sdam::ServerAddress& hostAndPort,
const BSONObj reply) override;
void onServerHandshakeFailedEvent(const sdam::ServerAddress& address,
const Status& status,
const BSONObj reply) override;
- void onServerHeartbeatFailureEvent(IsMasterRTT durationMs,
- Status errorStatus,
+ void onServerHeartbeatFailureEvent(Status errorStatus,
const ServerAddress& hostAndPort,
const BSONObj reply) override;
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index 6f9b40e8103..6e6f9244d93 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -58,8 +58,7 @@ public:
using Response = RemoteCommandResponse;
using RemoteCommandCompletionFn =
unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>;
- using RemoteCommandOnReplyFn =
- unique_function<void(const TaskExecutor::ResponseOnAnyStatus&, bool isMoreToComeSet)>;
+ using RemoteCommandOnReplyFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>;
virtual ~NetworkInterface();
diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp
index 66c67b9079c..ae22469902a 100644
--- a/src/mongo/executor/network_interface_integration_fixture.cpp
+++ b/src/mongo/executor/network_interface_integration_fixture.cpp
@@ -133,16 +133,16 @@ Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand(
cbHandle,
rcroa,
[p = std::move(pf.promise), exhaustUtilCB = std::move(exhaustUtilCB)](
- const TaskExecutor::ResponseOnAnyStatus& rs, bool isMoreToComeSet) mutable {
+ const TaskExecutor::ResponseOnAnyStatus& rs) mutable {
exhaustUtilCB(rs);
if (!rs.status.isOK()) {
- invariant(!isMoreToComeSet);
+ invariant(!rs.moreToCome);
p.setError(rs.status);
return;
}
- if (!isMoreToComeSet) {
+ if (!rs.moreToCome) {
p.emplaceValue();
}
},
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index 91289e3e61c..2f16d4a2b62 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -878,12 +878,13 @@ auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface
state->promise = std::move(promise);
std::move(future)
.onError([state](Status error) {
- stdx::lock_guard lk(state->_onReplyMutex);
state->onReplyFn(RemoteCommandOnAnyResponse(
- boost::none, std::move(error), state->stopwatch.elapsed()),
- false);
+ boost::none, std::move(error), state->stopwatch.elapsed()));
})
- .getAsync([state](Status status) { state->tryFinish(status); });
+ .getAsync([state](Status status) {
+ state->tryFinish(
+ Status{ErrorCodes::ExhaustCommandFinished, "Exhaust command finished"});
+ });
{
stdx::lock_guard lk(interface->_inProgressMutex);
@@ -901,38 +902,17 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendReque
auto requestState = requestManager->getRequest(reqId);
invariant(requestState);
- auto clientCallback = [this, requestState](const RemoteCommandResponse& response,
- bool isMoreToComeSet) {
- // Stash this response on the command state to be used to fulfill the promise.
- prevResponse = response;
- auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response);
- doMetadataHook(onAnyResponse);
-
- // If the command failed, we will call 'onReply' as a part of the future chain paired with
- // the promise. This is to be sure that all error paths will run 'onReply' only once upon
- // future completion.
- if (!getStatusFromCommandResult(response.data).isOK()) {
- // The moreToCome bit should *not* be set if the command failed
- invariant(!isMoreToComeSet);
- return;
- }
+ setTimer();
+ requestState->getClient(requestState->conn)
+ ->beginExhaustCommandRequest(*requestState->request, baton)
+ .thenRunOn(requestState->interface()->_reactor)
+ .getAsync([this, requestState](StatusWith<RemoteCommandResponse> swResponse) mutable {
+ continueExhaustRequest(std::move(requestState), swResponse);
+ });
- // Reset the stopwatch to measure the correct duration for the folowing reply
- stopwatch.restart();
- setTimer();
-
- stdx::lock_guard lk(_onReplyMutex);
- onReplyFn(onAnyResponse, isMoreToComeSet);
- };
-
- return makeReadyFutureWith(
- [this, requestState, clientCallback = std::move(clientCallback)]() mutable {
- setTimer();
- return RequestState::getClient(requestState->conn)
- ->runExhaustCommandRequest(
- *requestState->request, std::move(clientCallback), baton);
- })
- .then([this, requestState] { return prevResponse; });
+ auto [promise, future] = makePromiseFuture<RemoteCommandResponse>();
+ finalResponsePromise = std::move(promise);
+ return std::move(future).then([this](const auto& finalResponse) { return finalResponse; });
}
void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise(
@@ -949,6 +929,52 @@ void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise(
promise.emplaceValue();
}
+void NetworkInterfaceTL::ExhaustCommandState::continueExhaustRequest(
+ std::shared_ptr<RequestState> requestState, StatusWith<RemoteCommandResponse> swResponse) {
+ RemoteCommandResponse response;
+ if (!swResponse.isOK()) {
+ response = RemoteCommandResponse(std::move(swResponse.getStatus()));
+ } else {
+ response = std::move(swResponse.getValue());
+ }
+
+ if (requestState->interface()->inShutdown() ||
+ ErrorCodes::isCancelationError(response.status)) {
+ finalResponsePromise.emplaceValue(response);
+ return;
+ }
+
+ auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response);
+ doMetadataHook(onAnyResponse);
+
+ // If the command failed, we will call 'onReply' as a part of the future chain paired with
+ // the promise. This is to be sure that all error paths will run 'onReply' only once upon
+ // future completion.
+ if (!response.status.isOK() || !getStatusFromCommandResult(response.data).isOK()) {
+ // The moreToCome bit should *not* be set if the command failed
+ invariant(!response.moreToCome);
+
+ finalResponsePromise.emplaceValue(response);
+ return;
+ }
+
+ onReplyFn(onAnyResponse);
+
+ // Reset the stopwatch to measure the correct duration for the following reply
+ stopwatch.restart();
+ if (deadline != RemoteCommandRequest::kNoExpirationDate) {
+ deadline = stopwatch.start() + requestOnAny.timeout;
+ }
+ setTimer();
+
+ requestState->getClient(requestState->conn)
+ ->awaitExhaustCommand(baton)
+ .thenRunOn(requestState->interface()->_reactor)
+ .getAsync([this, requestState](StatusWith<RemoteCommandResponse> swResponse) mutable {
+ continueExhaustRequest(std::move(requestState), swResponse);
+ });
+}
+
Status NetworkInterfaceTL::startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequestOnAny& request,
RemoteCommandOnReplyFn&& onReply,
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index c989ee56003..f49ac1ecad5 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -199,10 +199,11 @@ private:
void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override;
+ void continueExhaustRequest(std::shared_ptr<RequestState> requestState,
+ StatusWith<RemoteCommandResponse> swResponse);
+
Promise<void> promise;
- RemoteCommandResponse prevResponse;
- Mutex _onReplyMutex =
- MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_onReplyMutex");
+ Promise<RemoteCommandResponse> finalResponsePromise;
RemoteCommandOnReplyFn onReplyFn;
};
diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp
index 8baafdb3d67..0b88b3d340d 100644
--- a/src/mongo/executor/remote_command_response.cpp
+++ b/src/mongo/executor/remote_command_response.cpp
@@ -55,8 +55,10 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(Status s, Milliseconds mill
invariant(!isOK());
};
-RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis)
- : data(std::move(dataObj)), elapsedMillis(millis) {
+RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj,
+ Milliseconds millis,
+ bool moreToCome)
+ : data(std::move(dataObj)), elapsedMillis(millis), moreToCome(moreToCome) {
// The buffer backing the default empty BSONObj has static duration so it is effectively
// owned.
invariant(data.isOwned() || data.objdata() == BSONObj().objdata());
@@ -65,8 +67,9 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Millisecon
// TODO(amidvidy): we currently discard output docs when we use this constructor. We should
// have RCR hold those too, but we need more machinery before that is possible.
RemoteCommandResponseBase::RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply,
- Milliseconds millis)
- : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis)) {}
+ Milliseconds millis,
+ bool moreToCome)
+ : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis), moreToCome) {}
bool RemoteCommandResponseBase::isOK() const {
return status.isOK();
diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h
index 2cd90b37974..7842ebdfe14 100644
--- a/src/mongo/executor/remote_command_response.h
+++ b/src/mongo/executor/remote_command_response.h
@@ -63,15 +63,18 @@ struct RemoteCommandResponseBase {
RemoteCommandResponseBase(Status s, Milliseconds millis);
- RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis);
+ RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis, bool moreToCome = false);
- RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, Milliseconds millis);
+ RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply,
+ Milliseconds millis,
+ bool moreToCome = false);
bool isOK() const;
BSONObj data; // Always owned. May point into message.
boost::optional<Milliseconds> elapsedMillis;
Status status = Status::OK();
+ bool moreToCome = false; // Whether or not the moreToCome bit is set on an exhaust message.
protected:
~RemoteCommandResponseBase() = default;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 10a57699045..7b83a05c7b7 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -101,6 +101,9 @@ public:
AtomicWord<bool> isFinished{false};
boost::optional<stdx::condition_variable> finishedCondition;
BatonHandle baton;
+ AtomicWord<bool> exhaustErased{
+ false}; // Used only in the exhaust path. Used to indicate that a cbState associated with
+ // an exhaust request has been removed from the '_networkInProgressQueue'.
};
class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState {
@@ -690,8 +693,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust
auto commandStatus = _net->startExhaustCommand(
swCbHandle.getValue(),
scheduledRequest,
- [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response,
- bool isMoreToComeSet) {
+ [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response) {
using std::swap;
LOGV2_DEBUG(
@@ -701,8 +703,37 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust
"response_isOK_response_response_status_toString"_attr =
redact(response.isOK() ? response.toString() : response.status.toString()));
+ // The cbState remains in the '_networkInProgressQueue' for the entirety of the
+ // request's lifetime and is added to and removed from the '_poolInProgressQueue' each
+ // time a response is received and its callback run respectively. It must be erased from
+ // the '_networkInProgressQueue' when either the request is cancelled or a response is
+ // received that has moreToCome == false to avoid shutting down with a task still in the
+ // '_networkInProgressQueue'. It is also possible that we receive both of these
+ // responses around the same time, so the 'exhaustErased' bool protects against
+ // attempting to erase the same cbState twice.
+
stdx::unique_lock<Latch> lk(_mutex);
- if (_inShutdown_inlock()) {
+ if (_inShutdown_inlock() || cbState->exhaustErased.load()) {
+ if (cbState->exhaustIter) {
+ _poolInProgressQueue.erase(cbState->exhaustIter.get());
+ cbState->exhaustIter = boost::none;
+ }
+ return;
+ }
+
+ if (cbState->canceled.load()) {
+ // Release any resources the callback function is holding
+ TaskExecutor::CallbackFn callback = [](const CallbackArgs&) {};
+ std::swap(cbState->callback, callback);
+
+ _networkInProgressQueue.erase(cbState->iter);
+ cbState->exhaustErased.store(1);
+
+ if (cbState->exhaustIter) {
+ _poolInProgressQueue.erase(cbState->exhaustIter.get());
+ cbState->exhaustIter = boost::none;
+ }
+
return;
}
@@ -714,8 +745,15 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust
// If this is the last response, invoke the non-exhaust path. This will mark cbState as
// finished and remove the task from _networkInProgressQueue
- if (!isMoreToComeSet) {
- scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
+ if (!response.moreToCome) {
+ _networkInProgressQueue.erase(cbState->iter);
+ cbState->exhaustErased.store(1);
+
+ WorkQueue result;
+ result.emplace_front(cbState);
+ result.front()->iter = result.begin();
+
+ scheduleIntoPool_inlock(&result, std::move(lk));
return;
}
@@ -733,12 +771,13 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call
stdx::unique_lock<Latch> lk) {
_poolInProgressQueue.push_back(cbState);
cbState->exhaustIter = --_poolInProgressQueue.end();
+ auto expectedExhaustIter = cbState->exhaustIter.get();
lk.unlock();
if (cbState->baton) {
- cbState->baton->schedule([this, cbState](Status status) {
+ cbState->baton->schedule([this, cbState, expectedExhaustIter](Status status) {
if (status.isOK()) {
- runCallbackExhaust(cbState);
+ runCallbackExhaust(cbState, expectedExhaustIter);
return;
}
@@ -747,14 +786,14 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call
cbState->canceled.store(1);
}
- _pool->schedule([this, cbState](auto status) {
+ _pool->schedule([this, cbState, expectedExhaustIter](auto status) {
invariant(status.isOK() || ErrorCodes::isCancelationError(status.code()));
- runCallbackExhaust(cbState);
+ runCallbackExhaust(cbState, expectedExhaustIter);
});
});
} else {
- _pool->schedule([this, cbState](auto status) {
+ _pool->schedule([this, cbState, expectedExhaustIter](auto status) {
if (ErrorCodes::isCancelationError(status.code())) {
stdx::lock_guard<Latch> lk(_mutex);
@@ -763,14 +802,15 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call
fassert(4615617, status);
}
- runCallbackExhaust(cbState);
+ runCallbackExhaust(cbState, expectedExhaustIter);
});
}
_net->signalWorkAvailable();
}
-void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) {
+void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState,
+ WorkQueue::iterator expectedExhaustIter) {
CallbackHandle cbHandle;
setCallbackForHandle(&cbHandle, cbState);
CallbackArgs args(this,
@@ -778,21 +818,35 @@ void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> c
cbState->canceled.load()
? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
: Status::OK());
- invariant(!cbState->isFinished.load());
- {
- // After running callback function, clear 'cbStateArg->callback' to release any resources
- // that might be held by this function object.
- // Swap 'cbStateArg->callback' with temporary copy before running callback for exception
- // safety.
- TaskExecutor::CallbackFn callback;
+
+ if (!cbState->isFinished.load()) {
+ TaskExecutor::CallbackFn callback = [](const CallbackArgs&) {};
std::swap(cbState->callback, callback);
callback(std::move(args));
+
+ // Leave the empty callback function if the request has been marked canceled or finished
+ // while running the callback to avoid leaking resources.
+ if (!cbState->canceled.load() && !cbState->isFinished.load()) {
+ std::swap(callback, cbState->callback);
+ }
}
- // Do not mark cbState as finished. It will be marked as finished on the last reply.
+ // Do not mark cbState as finished. It will be marked as finished on the last reply which is
+ // handled in 'runCallback'.
stdx::lock_guard<Latch> lk(_mutex);
- invariant(cbState->exhaustIter);
- _poolInProgressQueue.erase(cbState->exhaustIter.get());
+
+ // It is possible that we receive multiple responses in quick succession. If this happens, the
+ // later responses can overwrite the 'exhaustIter' value on the cbState when adding the cbState
+ // to the '_poolInProgressQueue' if the previous responses have not been run yet. We take in the
+ // 'expectedExhaustIter' so that we can still remove this task from the '_poolInProgressQueue' if
+ // this happens, but we do not want to reset the 'exhaustIter' value in this case.
+ if (cbState->exhaustIter) {
+ _poolInProgressQueue.erase(expectedExhaustIter);
+ if (cbState->exhaustIter.get() == expectedExhaustIter) {
+ cbState->exhaustIter = boost::none;
+ }
+ }
+
if (_inShutdown_inlock() && _poolInProgressQueue.empty()) {
_stateChange.notify_all();
}
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 4fbd38b2ba6..16230554948 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -198,7 +198,8 @@ private:
/**
* Executes the callback specified by "cbState". Will not mark cbState as finished.
*/
- void runCallbackExhaust(std::shared_ptr<CallbackState> cbState);
+ void runCallbackExhaust(std::shared_ptr<CallbackState> cbState,
+ WorkQueue::iterator expectedExhaustIter);
bool _inShutdown_inlock() const;
void _setState_inlock(State newState);
diff --git a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
index d5eeeac0504..f6a8ccf4f75 100644
--- a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
@@ -163,11 +163,6 @@ TEST_F(TaskExecutorFixture, RunExhaustShouldReceiveMultipleResponses) {
ASSERT(cbHandle.isValid());
executor()->cancel(cbHandle);
ASSERT(cbHandle.isCanceled());
- auto counters = exhaustRequestHandler.getCountersWhenReady();
-
- // The command was cancelled so the 'fail' counter should be incremented
- ASSERT_EQ(counters._success, 2);
- ASSERT_EQ(counters._failed, 1);
// The tasks should be removed after 'isMaster' fails
ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5)));
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp
index ec7981b615a..9cf669ec978 100644
--- a/src/mongo/transport/transport_layer_asio_integration_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp
@@ -152,42 +152,6 @@ TEST(TransportLayerASIO, asyncConnectTimeoutCleansUpSocket) {
ASSERT_EQ(client.getStatus(), ErrorCodes::NetworkTimeout);
}
-class ExhaustRequestHandlerUtil {
-public:
- AsyncDBClient::RemoteCommandCallbackFn&& getExhaustRequestCallbackFn() {
- return std::move(_callbackFn);
- }
-
- executor::RemoteCommandResponse getReplyObjectWhenReady() {
- stdx::unique_lock<Latch> lk(_mutex);
- _cv.wait(_mutex, [&] { return _replyUpdated; });
- _replyUpdated = false;
- return _reply;
- }
-
-private:
- // holds the server's response once it sent one
- executor::RemoteCommandResponse _reply;
- // set to true once 'reply' has been set. Used to indicate that a new response has been set and
- // should be inspected.
- bool _replyUpdated = false;
-
- Mutex _mutex = MONGO_MAKE_LATCH();
- stdx::condition_variable _cv;
-
- // called when a server sends a new isMaster exhaust response. Updates _reply and _replyUpdated.
- AsyncDBClient::RemoteCommandCallbackFn _callbackFn =
- [&](const executor::RemoteCommandResponse& response, bool isMoreToComeSet) {
- {
- stdx::unique_lock<Latch> lk(_mutex);
- _reply = response;
- _replyUpdated = true;
- }
-
- _cv.notify_all();
- };
-};
-
TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) {
auto connectionString = unittest::getFixtureConnectionString();
auto server = connectionString.getServers().front();
@@ -217,29 +181,27 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) {
BSONObj(),
nullptr};
- ExhaustRequestHandlerUtil exhaustRequestHandler;
- Future<void> exhaustFuture = handle->runExhaustCommandRequest(
- isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn());
+ Future<executor::RemoteCommandResponse> beginExhaustFuture =
+ handle->beginExhaustCommandRequest(isMasterRequest);
Date_t prevTime;
TopologyVersion topologyVersion;
{
- auto reply = exhaustRequestHandler.getReplyObjectWhenReady();
+ auto reply = beginExhaustFuture.get();
- ASSERT(!exhaustFuture.isReady());
ASSERT_OK(reply.status);
+ ASSERT(reply.moreToCome);
prevTime = reply.data.getField("localTime").Date();
topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"),
reply.data.getField("topologyVersion").Obj());
}
+ Future<executor::RemoteCommandResponse> awaitExhaustFuture = handle->awaitExhaustCommand();
{
- auto reply = exhaustRequestHandler.getReplyObjectWhenReady();
+ auto reply = awaitExhaustFuture.get();
- // The moreToCome bit is still set
- ASSERT(!exhaustFuture.isReady());
ASSERT_OK(reply.status);
-
+ ASSERT(reply.moreToCome);
auto replyTime = reply.data.getField("localTime").Date();
ASSERT_GT(replyTime, prevTime);
@@ -249,12 +211,22 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) {
ASSERT_EQ(replyTopologyVersion.getCounter(), topologyVersion.getCounter());
}
- handle->cancel();
- handle->end();
- auto error = exhaustFuture.getNoThrow();
- // exhaustFuture will resolve with CallbackCanceled unless the socket is already closed, in
- // which case it will resolve with HostUnreachable.
- ASSERT((error == ErrorCodes::CallbackCanceled) || (error == ErrorCodes::HostUnreachable));
+ Future<executor::RemoteCommandResponse> cancelExhaustFuture = handle->awaitExhaustCommand();
+ {
+ handle->cancel();
+ handle->end();
+ auto swReply = cancelExhaustFuture.getNoThrow();
+
+ // The original isMaster request has maxAwaitTimeMs = 1000 ms. If the cancel executes before
+ // the 1000ms then we expect the future to resolve with an error. It should resolve with
+ // CallbackCanceled unless the socket is already closed, in which case it will resolve with
+ // HostUnreachable. If the network is slow, the server may respond before the cancel
+ // executes, however.
+ if (!swReply.getStatus().isOK()) {
+ ASSERT((swReply.getStatus() == ErrorCodes::CallbackCanceled) ||
+ (swReply.getStatus() == ErrorCodes::HostUnreachable));
+ }
+ }
}
TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) {
@@ -323,16 +295,14 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) {
BSONObj(),
nullptr};
- ExhaustRequestHandlerUtil exhaustRequestHandler;
- Future<void> exhaustFuture = isMasterHandle->runExhaustCommandRequest(
- isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn());
-
+ Future<executor::RemoteCommandResponse> beginExhaustFuture =
+ isMasterHandle->beginExhaustCommandRequest(isMasterRequest);
{
- auto reply = exhaustRequestHandler.getReplyObjectWhenReady();
+ auto reply = beginExhaustFuture.get();
- exhaustFuture.get();
ASSERT_OK(reply.status);
ASSERT_EQ(reply.data["ok"].Double(), 0.0);
+ ASSERT(!reply.moreToCome);
}
}