summary refs log tree commit diff
diff options
context:
space:
mode:
author jannaerin <golden.janna@gmail.com> 2020-03-04 12:08:42 -0500
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-03-25 04:30:03 +0000
commit 8124a8d047ce142f6d6defc089e5e71192721a5c (patch)
tree dddfd98d579eb288b31bb28cb29072dbc37b2c20
parent 4601bd54dfd3f3ab20b357b71a4b17667143c0fb (diff)
download mongo-8124a8d047ce142f6d6defc089e5e71192721a5c.tar.gz
SERVER-44954 Streamable RSM uses exhaust isMaster
-rw-r--r-- buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 3
-rw-r--r-- jstests/sharding/repl_monitor_refresh.js | 9
-rw-r--r-- jstests/sharding/retryable_write_error_labels.js | 19
-rw-r--r-- src/mongo/base/error_codes.yml | 1
-rw-r--r-- src/mongo/client/async_client.cpp | 52
-rw-r--r-- src/mongo/client/async_client.h | 28
-rw-r--r-- src/mongo/client/server_is_master_monitor.cpp | 154
-rw-r--r-- src/mongo/client/server_is_master_monitor.h | 14
-rw-r--r-- src/mongo/executor/network_interface.h | 3
-rw-r--r-- src/mongo/executor/network_interface_integration_fixture.cpp | 6
-rw-r--r-- src/mongo/executor/network_interface_tl.cpp | 67
-rw-r--r-- src/mongo/executor/network_interface_tl.h | 6
-rw-r--r-- src/mongo/executor/remote_command_response.cpp | 11
-rw-r--r-- src/mongo/executor/remote_command_response.h | 7
-rw-r--r-- src/mongo/executor/thread_pool_task_executor.cpp | 44
-rw-r--r-- src/mongo/executor/thread_pool_task_executor_integration_test.cpp | 5
-rw-r--r-- src/mongo/transport/transport_layer_asio_integration_test.cpp | 84
17 files changed, 317 insertions, 196 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 8abedc1d771..3174589f4ff 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -28,8 +28,9 @@ selector:
- jstests/sharding/test_stacked_migration_cleanup.js
- jstests/sharding/killop.js
- jstests/sharding/verify_sessions_expiration_sharded.js
- # Enable when SERVER-44733 is backported
- jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
+ - jstests/sharding/repl_monitor_refresh.js
+ - jstests/sharding/retryable_write_error_labels.js
# Enable when SERVER-43310 is backported
- jstests/sharding/cluster_create_indexes_always_routes_through_primary.js
diff --git a/jstests/sharding/repl_monitor_refresh.js b/jstests/sharding/repl_monitor_refresh.js
index 20f1d930d98..b41fc42d761 100644
--- a/jstests/sharding/repl_monitor_refresh.js
+++ b/jstests/sharding/repl_monitor_refresh.js
@@ -3,6 +3,7 @@ load("jstests/replsets/rslib.js");
/**
* Test for making sure that the replica seed list in the config server does not
* become invalid when a replica set reconfig happens.
+ * @tags: [multiversion_incompatible]
*/
(function() {
"use strict";
@@ -45,6 +46,14 @@ jsTest.log('Changing conf to ' + tojson(confDoc));
reconfig(replTest, confDoc);
awaitRSClientHosts(mongos, {host: targetHostName}, {ok: true, ismaster: true});
+let rsConfig = st.rs0.getReplSetConfigFromNode();
+assert.soon(function() {
+ const res = st.rs0.getPrimary().adminCommand({replSetGetStatus: 1});
+ return ((res.members[0].configVersion === rsConfig.version) &&
+ (res.members[2].configVersion === rsConfig.version) &&
+ (res.members[0].configTerm === rsConfig.term) &&
+ (res.members[2].configTerm === rsConfig.term));
+});
// Remove first node from set
confDoc.members.shift();
diff --git a/jstests/sharding/retryable_write_error_labels.js b/jstests/sharding/retryable_write_error_labels.js
index fd0eb021137..54b6cd41424 100644
--- a/jstests/sharding/retryable_write_error_labels.js
+++ b/jstests/sharding/retryable_write_error_labels.js
@@ -14,7 +14,11 @@ const dbName = "test";
const collName = "retryable_write_error_labels";
// Use ShardingTest because we need to test both mongod and mongos behaviors.
-const st = new ShardingTest({config: 1, mongos: 1, shards: 1});
+const st = new ShardingTest({
+ config: 1,
+ mongos: {s0: {setParameter: {"failpoint.overrideMaxAwaitTimeMS": "{'mode':'alwaysOn'}"}}},
+ shards: 1
+});
const primary = st.rs0.getPrimary();
assert.commandWorked(primary.getDB(dbName).runCommand(
@@ -76,8 +80,11 @@ function runTest(errorCode, expectLabel, isWCError, isMongos) {
// Test retryable writes.
jsTestLog("Retryable write should return error " + errorCode + withOrWithout +
" RetryableWriteError label");
- let res = testDB.runCommand(
- {insert: collName, documents: [{a: errorCode, b: "retryable"}], txnNumber: NumberLong(0)});
+ let res = testDB.runCommand({
+ insert: collName,
+ documents: [{a: errorCode, b: "retryable"}],
+ txnNumber: NumberLong(0),
+ });
checkErrorCode(res, errorCode, isWCError);
checkErrorLabels(res, expectLabel);
@@ -96,7 +103,7 @@ function runTest(errorCode, expectLabel, isWCError, isMongos) {
res = sessionDb.adminCommand({
commitTransaction: 1,
txnNumber: NumberLong(session.getTxnNumber_forTesting()),
- autocommit: false
+ autocommit: false,
});
checkErrorCode(res, errorCode, isWCError);
checkErrorLabels(res, expectLabel);
@@ -133,7 +140,7 @@ function runTest(errorCode, expectLabel, isWCError, isMongos) {
res = sessionDb.adminCommand({
abortTransaction: 1,
txnNumber: NumberLong(session.getTxnNumber_forTesting()),
- autocommit: false
+ autocommit: false,
});
checkErrorCode(res, errorCode, isWCError);
checkErrorLabels(res, expectLabel);
@@ -202,5 +209,7 @@ runTest(ErrorCodes.WriteConcernFailed,
true /* isWCError */,
true /* isMongos */);
+st.s.adminCommand({"configureFailPoint": "overrideMaxAwaitTimeMS", "mode": "off"});
+
st.stop();
}());
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 6ecc44257aa..899a94eaae3 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -351,6 +351,7 @@ error_codes:
- {code: 306,name: ReadThroughCacheLookupCanceled}
- {code: 307,name: RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist}
+ - {code: 308,name: ExhaustCommandFinished}
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index 99d93dab68a..f25830de073 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -316,55 +316,41 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
});
}
-Future<void> AsyncDBClient::_continueReceiveExhaustResponse(
- ExhaustRequestParameters&& exhaustRequestParameters,
- boost::optional<int32_t> msgId,
- const BatonHandle& baton) {
+Future<executor::RemoteCommandResponse> AsyncDBClient::_continueReceiveExhaustResponse(
+ ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton) {
return _waitForResponse(msgId, baton)
- .then([exhaustParameters = std::move(exhaustRequestParameters), msgId, baton, this](
- Message responseMsg) mutable -> Future<void> {
- // Run callback
- auto now = exhaustParameters.clkSource->now();
- auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start);
+ .then([stopwatch, msgId, baton, this](Message responseMsg) mutable {
bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome);
rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg));
- exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration),
- isMoreToComeSet);
-
- if (!isMoreToComeSet) {
- return Status::OK();
- }
-
- exhaustParameters.start = now;
- return _continueReceiveExhaustResponse(
- std::move(exhaustParameters), boost::none, baton);
+ auto rcResponse = executor::RemoteCommandResponse(
+ *response, duration_cast<Milliseconds>(stopwatch.elapsed()), isMoreToComeSet);
+ return rcResponse;
});
}
-Future<void> AsyncDBClient::runExhaustCommand(OpMsgRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton) {
+Future<executor::RemoteCommandResponse> AsyncDBClient::awaitExhaustCommand(
+ const BatonHandle& baton) {
+ return _continueReceiveExhaustResponse(ClockSource::StopWatch(), boost::none, baton);
+}
+
+Future<executor::RemoteCommandResponse> AsyncDBClient::runExhaustCommand(OpMsgRequest request,
+ const BatonHandle& baton) {
invariant(_negotiatedProtocol);
auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
OpMsg::setFlag(&requestMsg, OpMsg::kExhaustSupported);
- auto clkSource = _svcCtx->getPreciseClockSource();
- auto start = clkSource->now();
auto msgId = nextMessageId();
- return _call(std::move(requestMsg), msgId, baton)
- .then([msgId, baton, cb = std::move(cb), clkSource, start, this]() mutable {
- ExhaustRequestParameters exhaustParameters{std::move(cb), clkSource, start};
- return _continueReceiveExhaustResponse(std::move(exhaustParameters), msgId, baton);
- });
+ return _call(std::move(requestMsg), msgId, baton).then([msgId, baton, this]() mutable {
+ return _continueReceiveExhaustResponse(ClockSource::StopWatch(), msgId, baton);
+ });
}
-Future<void> AsyncDBClient::runExhaustCommandRequest(executor::RemoteCommandRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton) {
+Future<executor::RemoteCommandResponse> AsyncDBClient::beginExhaustCommandRequest(
+ executor::RemoteCommandRequest request, const BatonHandle& baton) {
auto opMsgRequest = OpMsgRequest::fromDBAndBody(
std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
- return runExhaustCommand(std::move(opMsgRequest), std::move(cb), baton);
+ return runExhaustCommand(std::move(opMsgRequest), baton);
}
void AsyncDBClient::cancel(const BatonHandle& baton) {
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index c7917bb5461..d2279374dda 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -47,19 +47,6 @@ namespace mongo {
class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> {
public:
- using RemoteCommandCallbackFn =
- unique_function<void(const executor::RemoteCommandResponse&, bool isMoreToComeSet)>;
-
- struct ExhaustRequestParameters {
- ExhaustRequestParameters(ExhaustRequestParameters&&) = default;
- ExhaustRequestParameters(const ExhaustRequestParameters&) = delete;
- ExhaustRequestParameters& operator=(const ExhaustRequestParameters&) = delete;
-
- RemoteCommandCallbackFn cb;
- ClockSource* clkSource;
- Date_t start;
- };
-
explicit AsyncDBClient(const HostAndPort& peer,
transport::SessionHandle session,
ServiceContext* svcCtx)
@@ -79,12 +66,11 @@ public:
const BatonHandle& baton = nullptr,
bool fireAndForget = false);
- Future<void> runExhaustCommandRequest(executor::RemoteCommandRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton = nullptr);
- Future<void> runExhaustCommand(OpMsgRequest request,
- RemoteCommandCallbackFn&& cb,
- const BatonHandle& baton = nullptr);
+ Future<executor::RemoteCommandResponse> beginExhaustCommandRequest(
+ executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr);
+ Future<executor::RemoteCommandResponse> runExhaustCommand(OpMsgRequest request,
+ const BatonHandle& baton = nullptr);
+ Future<executor::RemoteCommandResponse> awaitExhaustCommand(const BatonHandle& baton = nullptr);
Future<void> authenticate(const BSONObj& params);
@@ -108,8 +94,8 @@ public:
const HostAndPort& local() const;
private:
- Future<void> _continueReceiveExhaustResponse(
- ExhaustRequestParameters&& exhaustRequestParameters,
+ Future<executor::RemoteCommandResponse> _continueReceiveExhaustResponse(
+ ClockSource::StopWatch stopwatch,
boost::optional<int32_t> msgId,
const BatonHandle& baton = nullptr);
Future<Message> _waitForResponse(boost::optional<int32_t> msgId,
diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp
index 7f70753a772..f399ffeea38 100644
--- a/src/mongo/client/server_is_master_monitor.cpp
+++ b/src/mongo/client/server_is_master_monitor.cpp
@@ -36,10 +36,13 @@
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/egress_metadata_hook_list.h"
+#include "mongo/util/log.h"
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(overrideMaxAwaitTimeMS);
+
const BSONObj IS_MASTER_BSON = BSON("isMaster" << 1);
using executor::NetworkInterface;
@@ -53,10 +56,12 @@ const Milliseconds kZeroMs = Milliseconds{0};
SingleServerIsMasterMonitor::SingleServerIsMasterMonitor(
const MongoURI& setUri,
const sdam::ServerAddress& host,
+ boost::optional<TopologyVersion> topologyVersion,
Milliseconds heartbeatFrequencyMS,
sdam::TopologyEventsPublisherPtr eventListener,
std::shared_ptr<executor::TaskExecutor> executor)
: _host(host),
+ _topologyVersion(topologyVersion),
_eventListener(eventListener),
_executor(executor),
_heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)),
@@ -103,7 +108,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() {
return;
}
- const auto currentRefreshPeriod = _currentRefreshPeriod(lock);
+ const auto currentRefreshPeriod = _currentRefreshPeriod(lock, false);
const Milliseconds timeSinceLastCheck =
(_lastIsMasterAt) ? _executor->now() - *_lastIsMasterAt : Milliseconds::max();
@@ -161,19 +166,111 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d
}
void SingleServerIsMasterMonitor::_doRemoteCommand() {
- auto request = executor::RemoteCommandRequest(
- HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS);
- request.sslMode = _setUri.getSSLMode();
-
stdx::lock_guard lock(_mutex);
if (_isShutdown)
return;
+ StatusWith<executor::TaskExecutor::CallbackHandle> swCbHandle = [&]() {
+ if (_topologyVersion) {
+ return _scheduleStreamableIsMaster();
+ }
+
+ return _scheduleSingleIsMaster();
+ }();
+
+ if (!swCbHandle.isOK()) {
+ uasserted(46156012, swCbHandle.getStatus().toString());
+ }
+
+ _isMasterOutstanding = true;
+ _remoteCommandHandle = swCbHandle.getValue();
+}
+
+StatusWith<TaskExecutor::CallbackHandle>
+SingleServerIsMasterMonitor::_scheduleStreamableIsMaster() {
+ auto maxAwaitTimeMS = durationCount<Milliseconds>(kMaxAwaitTimeMs);
+ overrideMaxAwaitTimeMS.execute(
+ [&](const BSONObj&) { maxAwaitTimeMS = durationCount<Milliseconds>(Milliseconds(1000)); });
+ auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << maxAwaitTimeMS
+ << "topologyVersion" << _topologyVersion->toBSON());
+
+ _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS + kMaxAwaitTimeMs;
+ auto request = executor::RemoteCommandRequest(
+ HostAndPort(_host), "admin", isMasterCmd, nullptr, _timeoutMS);
+ request.sslMode = _setUri.getSSLMode();
+
+ Timer timer;
+ auto swCbHandle = _executor->scheduleExhaustRemoteCommand(
+ std::move(request),
+ [self = shared_from_this(),
+ timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
+ Milliseconds nextRefreshPeriod;
+ {
+ stdx::lock_guard lk(self->_mutex);
+
+ if (self->_isShutdown) {
+ self->_isMasterOutstanding = false;
+ LOGV2_DEBUG(4495400,
+ kLogLevel,
+ "RSM {setName} not processing response: {status}",
+ "status"_attr = result.response.status,
+ "setName"_attr = self->_setUri.getSetName());
+ return;
+ }
+
+ auto responseTopologyVersion = result.response.data.getField("topologyVersion");
+ if (responseTopologyVersion) {
+ self->_topologyVersion = TopologyVersion::parse(
+ IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj());
+ } else {
+ self->_topologyVersion = boost::none;
+ }
+
+ self->_lastIsMasterAt = self->_executor->now();
+ if (!result.response.isOK() || !result.response.moreToCome) {
+ self->_isMasterOutstanding = false;
+ nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK());
+ self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+ }
+ }
+
+ // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for
+ // RTT instead
+ Microseconds latency(timer.micros());
+ LOGV2_DEBUG(43332190,
+ 0,
+ "RSM {setName} exhuast latency is ms: {status}",
+ "status"_attr = result.response.status,
+ "setName"_attr = self->_setUri.getSetName());
+ if (result.response.isOK()) {
+ self->_onIsMasterSuccess(latency, result.response.data);
+ } else {
+ self->_onIsMasterFailure(latency, result.response.status, result.response.data);
+ }
+ });
+
+ // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for RTT
+ // instead. Also move this block into _doRemoteCommand because we will not need the latency
+ // value here.
+ if (!swCbHandle.isOK()) {
+ Microseconds latency(timer.micros());
+ _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
+ }
+
+ return swCbHandle;
+}
+
+StatusWith<TaskExecutor::CallbackHandle> SingleServerIsMasterMonitor::_scheduleSingleIsMaster() {
+ _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS;
+ auto request = executor::RemoteCommandRequest(
+ HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS);
+ request.sslMode = _setUri.getSSLMode();
+
Timer timer;
auto swCbHandle = _executor->scheduleRemoteCommand(
std::move(request),
- [this, self = shared_from_this(), timer](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
+ [self = shared_from_this(),
+ timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable {
Milliseconds nextRefreshPeriod;
{
stdx::lock_guard lk(self->_mutex);
@@ -184,21 +281,29 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() {
kLogLevel,
"RSM {setName} not processing response: {status}",
"status"_attr = result.response.status,
- "setName"_attr = _setUri.getSetName());
+ "setName"_attr = self->_setUri.getSetName());
return;
}
self->_lastIsMasterAt = self->_executor->now();
- nextRefreshPeriod = self->_currentRefreshPeriod(lk);
-
- LOGV2_DEBUG(4333228,
- kLogLevel + 1,
- "RSM {setName} next refresh period in {period}",
- "period"_attr = nextRefreshPeriod.toString(),
- "setName"_attr = _setUri.getSetName());
- self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+
+ auto responseTopologyVersion = result.response.data.getField("topologyVersion");
+ if (responseTopologyVersion) {
+ self->_topologyVersion = TopologyVersion::parse(
+ IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj());
+ } else {
+ self->_topologyVersion = boost::none;
+ }
+
+ if (!result.response.isOK() || !result.response.moreToCome) {
+ self->_isMasterOutstanding = false;
+ nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK());
+ self->_scheduleNextIsMaster(lk, nextRefreshPeriod);
+ }
}
+ // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for
+ // RTT instead
Microseconds latency(timer.micros());
if (result.response.isOK()) {
self->_onIsMasterSuccess(latency, result.response.data);
@@ -207,20 +312,22 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() {
}
});
+ // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for RTT
+ // instead. Also move this block into _doRemoteCommand because we will not need the latency
+ // value here.
if (!swCbHandle.isOK()) {
Microseconds latency(timer.micros());
_onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj());
- uasserted(31448, swCbHandle.getStatus().toString());
}
- _isMasterOutstanding = true;
- _remoteCommandHandle = swCbHandle.getValue();
+ return swCbHandle;
}
void SingleServerIsMasterMonitor::shutdown() {
stdx::lock_guard lock(_mutex);
- if (std::exchange(_isShutdown, true))
+ if (std::exchange(_isShutdown, true)) {
return;
+ }
LOGV2_DEBUG(4333220,
kLogLevel + 1,
@@ -296,7 +403,11 @@ Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds or
return r;
}
-Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) {
+Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock,
+ bool scheduleImmediately) {
+ if (scheduleImmediately)
+ return Milliseconds(0);
+
return (_isExpedited) ? sdam::SdamConfiguration::kMinHeartbeatFrequencyMS
: _heartbeatFrequencyMS;
}
@@ -386,6 +497,7 @@ void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent(
_singleMonitors[serverAddress] = std::make_shared<SingleServerIsMasterMonitor>(
_setUri,
serverAddress,
+ serverDescription->getTopologyVersion(),
_sdamConfiguration.getHeartBeatFrequency(),
_eventPublisher,
_executor);
diff --git a/src/mongo/client/server_is_master_monitor.h b/src/mongo/client/server_is_master_monitor.h
index bdc3da9ec69..75121c44001 100644
--- a/src/mongo/client/server_is_master_monitor.h
+++ b/src/mongo/client/server_is_master_monitor.h
@@ -40,6 +40,7 @@ class SingleServerIsMasterMonitor
public:
explicit SingleServerIsMasterMonitor(const MongoURI& setUri,
const ServerAddress& host,
+ boost::optional<TopologyVersion> topologyVersion,
Milliseconds heartbeatFrequencyMS,
TopologyEventsPublisherPtr eventListener,
std::shared_ptr<executor::TaskExecutor> executor);
@@ -56,15 +57,25 @@ public:
void requestImmediateCheck();
void disableExpeditedChecking();
+ static constexpr Milliseconds kMaxAwaitTimeMs = Milliseconds(10000);
+
private:
void _scheduleNextIsMaster(WithLock, Milliseconds delay);
void _doRemoteCommand();
+ // Use the awaitable isMaster protocol with the exhaust bit set. Attach _topologyVersion and
+ // kMaxAwaitTimeMs to the request.
+ StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleStreamableIsMaster();
+
+ // Use the old isMaster protocol. Do not attach _topologyVersion or kMaxAwaitTimeMs to the
+ // request.
+ StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleSingleIsMaster();
+
void _onIsMasterSuccess(IsMasterRTT latency, const BSONObj bson);
void _onIsMasterFailure(IsMasterRTT latency, const Status& status, const BSONObj bson);
Milliseconds _overrideRefreshPeriod(Milliseconds original);
- Milliseconds _currentRefreshPeriod(WithLock);
+ Milliseconds _currentRefreshPeriod(WithLock, bool scheduleImmediately);
void _cancelOutstandingRequest(WithLock);
static constexpr auto kLogLevel = 0;
@@ -72,6 +83,7 @@ private:
Mutex _mutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "SingleServerIsMasterMonitor::mutex");
ServerAddress _host;
+ boost::optional<TopologyVersion> _topologyVersion;
TopologyEventsPublisherPtr _eventListener;
std::shared_ptr<executor::TaskExecutor> _executor;
Milliseconds _heartbeatFrequencyMS;
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index d269fb2443f..abc02a2db61 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -58,8 +58,7 @@ public:
using Response = RemoteCommandResponse;
using RemoteCommandCompletionFn =
unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>;
- using RemoteCommandOnReplyFn =
- unique_function<void(const TaskExecutor::ResponseOnAnyStatus&, bool isMoreToComeSet)>;
+ using RemoteCommandOnReplyFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>;
virtual ~NetworkInterface();
diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp
index 66c67b9079c..ae22469902a 100644
--- a/src/mongo/executor/network_interface_integration_fixture.cpp
+++ b/src/mongo/executor/network_interface_integration_fixture.cpp
@@ -133,16 +133,16 @@ Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand(
cbHandle,
rcroa,
[p = std::move(pf.promise), exhaustUtilCB = std::move(exhaustUtilCB)](
- const TaskExecutor::ResponseOnAnyStatus& rs, bool isMoreToComeSet) mutable {
+ const TaskExecutor::ResponseOnAnyStatus& rs) mutable {
exhaustUtilCB(rs);
if (!rs.status.isOK()) {
- invariant(!isMoreToComeSet);
+ invariant(!rs.moreToCome);
p.setError(rs.status);
return;
}
- if (!isMoreToComeSet) {
+ if (!rs.moreToCome) {
p.emplaceValue();
}
},
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index fcae4dee869..da7c3bcc11a 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -874,10 +874,12 @@ auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface
.onError([state](Status error) {
stdx::lock_guard lk(state->_onReplyMutex);
state->onReplyFn(RemoteCommandOnAnyResponse(
- boost::none, std::move(error), state->stopwatch.elapsed()),
- false);
+ boost::none, std::move(error), state->stopwatch.elapsed()));
})
- .getAsync([state](Status status) { state->tryFinish(status); });
+ .getAsync([state](Status status) {
+ state->tryFinish(
+ Status{ErrorCodes::ExhaustCommandFinished, "Exhaust command finished"});
+ });
{
stdx::lock_guard lk(interface->_inProgressMutex);
@@ -891,37 +893,64 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendReque
auto requestState = requestManager->getRequest(reqId);
invariant(requestState);
- auto clientCallback = [this, requestState](const RemoteCommandResponse& response,
- bool isMoreToComeSet) {
- // Stash this response on the command state to be used to fulfill the promise.
- prevResponse = response;
+ auto clientCallback = [this, requestState](const RemoteCommandResponse& response) {
auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response);
doMetadataHook(onAnyResponse);
// If the command failed, we will call 'onReply' as a part of the future chain paired with
// the promise. This is to be sure that all error paths will run 'onReply' only once upon
// future completion.
- if (!getStatusFromCommandResult(response.data).isOK()) {
+ if (!response.status.isOK() || !getStatusFromCommandResult(response.data).isOK()) {
// The moreToCome bit should *not* be set if the command failed
- invariant(!isMoreToComeSet);
+ invariant(!response.moreToCome);
+ return;
+ }
+
+ stdx::lock_guard lk(_onReplyMutex);
+ onReplyFn(onAnyResponse);
+ };
+
+ auto& reactor = requestState->interface()->_reactor;
+ handleExhaustResponseFn =
+ [ this, requestState, reactor, clientCallback = std::move(clientCallback) ](
+ StatusWith<RemoteCommandResponse> swResponse) mutable noexcept {
+ RemoteCommandResponse response;
+ if (!swResponse.isOK()) {
+ response = RemoteCommandResponse(std::move(swResponse.getStatus()));
+ } else {
+ response = std::move(swResponse.getValue());
+ }
+
+ clientCallback(response);
+
+ if (!response.moreToCome) {
+ finalResponsePromise.emplaceValue(response);
+ return;
+ }
+
+ if (requestState->interface()->inShutdown()) {
return;
}
// Reset the stopwatch to measure the correct duration for the following reply
stopwatch.restart();
+ if (deadline != RemoteCommandRequest::kNoExpirationDate) {
+ deadline = stopwatch.start() + requestOnAny.timeout;
+ }
setTimer();
-
- stdx::lock_guard lk(_onReplyMutex);
- onReplyFn(onAnyResponse, isMoreToComeSet);
+ requestState->client()->awaitExhaustCommand(baton).thenRunOn(reactor).getAsync(
+ handleExhaustResponseFn);
};
- return makeReadyFutureWith(
- [this, requestState, clientCallback = std::move(clientCallback)]() mutable {
- setTimer();
- return requestState->client()->runExhaustCommandRequest(
- *requestState->request, std::move(clientCallback), baton);
- })
- .then([this, requestState] { return prevResponse; });
+ setTimer();
+ requestState->client()
+ ->beginExhaustCommandRequest(*requestState->request, baton)
+ .thenRunOn(reactor)
+ .getAsync(handleExhaustResponseFn);
+
+ auto [promise, future] = makePromiseFuture<RemoteCommandResponse>();
+ finalResponsePromise = std::move(promise);
+ return std::move(future).then([this](const auto& finalResponse) { return finalResponse; });
}
void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise(
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index 34bec18a411..c35d9d30023 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -199,10 +199,10 @@ private:
void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override;
Promise<void> promise;
- RemoteCommandResponse prevResponse;
- Mutex _onReplyMutex =
- MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_onReplyMutex");
+ Promise<RemoteCommandResponse> finalResponsePromise;
+ Mutex _onReplyMutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::_onReplyMutex");
RemoteCommandOnReplyFn onReplyFn;
+ std::function<void(StatusWith<RemoteCommandResponse>)> handleExhaustResponseFn;
};
enum class ConnStatus { Unset, OK, Failed };
diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp
index 8baafdb3d67..0b88b3d340d 100644
--- a/src/mongo/executor/remote_command_response.cpp
+++ b/src/mongo/executor/remote_command_response.cpp
@@ -55,8 +55,10 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(Status s, Milliseconds mill
invariant(!isOK());
};
-RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis)
- : data(std::move(dataObj)), elapsedMillis(millis) {
+RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj,
+ Milliseconds millis,
+ bool moreToCome)
+ : data(std::move(dataObj)), elapsedMillis(millis), moreToCome(moreToCome) {
// The buffer backing the default empty BSONObj has static duration so it is effectively
// owned.
invariant(data.isOwned() || data.objdata() == BSONObj().objdata());
@@ -65,8 +67,9 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Millisecon
// TODO(amidvidy): we currently discard output docs when we use this constructor. We should
// have RCR hold those too, but we need more machinery before that is possible.
RemoteCommandResponseBase::RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply,
- Milliseconds millis)
- : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis)) {}
+ Milliseconds millis,
+ bool moreToCome)
+ : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis), moreToCome) {}
bool RemoteCommandResponseBase::isOK() const {
return status.isOK();
diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h
index 2cd90b37974..7842ebdfe14 100644
--- a/src/mongo/executor/remote_command_response.h
+++ b/src/mongo/executor/remote_command_response.h
@@ -63,15 +63,18 @@ struct RemoteCommandResponseBase {
RemoteCommandResponseBase(Status s, Milliseconds millis);
- RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis);
+ RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis, bool moreToCome = false);
- RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, Milliseconds millis);
+ RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply,
+ Milliseconds millis,
+ bool moreToCome = false);
bool isOK() const;
BSONObj data; // Always owned. May point into message.
boost::optional<Milliseconds> elapsedMillis;
Status status = Status::OK();
+ bool moreToCome = false; // Whether or not the moreToCome bit is set on an exhaust message.
protected:
~RemoteCommandResponseBase() = default;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 10a57699045..7b76b6853b8 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -690,8 +690,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust
auto commandStatus = _net->startExhaustCommand(
swCbHandle.getValue(),
scheduledRequest,
- [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response,
- bool isMoreToComeSet) {
+ [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response) {
using std::swap;
LOGV2_DEBUG(
@@ -706,16 +705,26 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust
return;
}
+ if (cbState->canceled.load()) {
+ _networkInProgressQueue.erase(cbState->iter);
+ return;
+ }
+
// Swap the callback function with the new one
CallbackFn newCb = [cb, scheduledRequest, response](const CallbackArgs& cbData) {
remoteCommandFinished(cbData, cb, scheduledRequest, response);
};
swap(cbState->callback, newCb);
-
// If this is the last response, invoke the non-exhaust path. This will mark cbState as
// finished and remove the task from _networkInProgressQueue
- if (!isMoreToComeSet) {
- scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
+ if (!response.moreToCome) {
+ _networkInProgressQueue.erase(cbState->iter);
+
+ WorkQueue result;
+ result.emplace_front(cbState);
+ result.front()->iter = result.begin();
+
+ scheduleIntoPool_inlock(&result, std::move(lk));
return;
}
@@ -773,26 +782,23 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call
void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) {
CallbackHandle cbHandle;
setCallbackForHandle(&cbHandle, cbState);
+ auto canceled = cbState->canceled.load();
CallbackArgs args(this,
std::move(cbHandle),
- cbState->canceled.load()
- ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
- : Status::OK());
- invariant(!cbState->isFinished.load());
- {
- // After running callback function, clear 'cbStateArg->callback' to release any resources
- // that might be held by this function object.
- // Swap 'cbStateArg->callback' with temporary copy before running callback for exception
- // safety.
- TaskExecutor::CallbackFn callback;
- std::swap(cbState->callback, callback);
- callback(std::move(args));
+ canceled ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
+ : Status::OK());
+ if (!cbState->isFinished.load()) {
+ cbState->callback(std::move(args));
}
// Do not mark cbState as finished. It will be marked as finished on the last reply.
stdx::lock_guard<Latch> lk(_mutex);
- invariant(cbState->exhaustIter);
- _poolInProgressQueue.erase(cbState->exhaustIter.get());
+
+ if (cbState->exhaustIter) {
+ _poolInProgressQueue.erase(cbState->exhaustIter.get());
+ cbState->exhaustIter = boost::none;
+ }
+
if (_inShutdown_inlock() && _poolInProgressQueue.empty()) {
_stateChange.notify_all();
}
diff --git a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
index d5eeeac0504..f6a8ccf4f75 100644
--- a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
@@ -163,11 +163,6 @@ TEST_F(TaskExecutorFixture, RunExhaustShouldReceiveMultipleResponses) {
ASSERT(cbHandle.isValid());
executor()->cancel(cbHandle);
ASSERT(cbHandle.isCanceled());
- auto counters = exhaustRequestHandler.getCountersWhenReady();
-
- // The command was cancelled so the 'fail' counter should be incremented
- ASSERT_EQ(counters._success, 2);
- ASSERT_EQ(counters._failed, 1);
// The tasks should be removed after 'isMaster' fails
ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5)));
diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp
index ec7981b615a..9cf669ec978 100644
--- a/src/mongo/transport/transport_layer_asio_integration_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp
@@ -152,42 +152,6 @@ TEST(TransportLayerASIO, asyncConnectTimeoutCleansUpSocket) {
ASSERT_EQ(client.getStatus(), ErrorCodes::NetworkTimeout);
}
-class ExhaustRequestHandlerUtil {
-public:
- AsyncDBClient::RemoteCommandCallbackFn&& getExhaustRequestCallbackFn() {
- return std::move(_callbackFn);
- }
-
- executor::RemoteCommandResponse getReplyObjectWhenReady() {
- stdx::unique_lock<Latch> lk(_mutex);
- _cv.wait(_mutex, [&] { return _replyUpdated; });
- _replyUpdated = false;
- return _reply;
- }
-
-private:
- // holds the server's response once it sent one
- executor::RemoteCommandResponse _reply;
- // set to true once 'reply' has been set. Used to indicate that a new response has been set and
- // should be inspected.
- bool _replyUpdated = false;
-
- Mutex _mutex = MONGO_MAKE_LATCH();
- stdx::condition_variable _cv;
-
- // called when a server sends a new isMaster exhaust response. Updates _reply and _replyUpdated.
- AsyncDBClient::RemoteCommandCallbackFn _callbackFn =
- [&](const executor::RemoteCommandResponse& response, bool isMoreToComeSet) {
- {
- stdx::unique_lock<Latch> lk(_mutex);
- _reply = response;
- _replyUpdated = true;
- }
-
- _cv.notify_all();
- };
-};
-
TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) {
auto connectionString = unittest::getFixtureConnectionString();
auto server = connectionString.getServers().front();
@@ -217,29 +181,27 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) {
BSONObj(),
nullptr};
- ExhaustRequestHandlerUtil exhaustRequestHandler;
- Future<void> exhaustFuture = handle->runExhaustCommandRequest(
- isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn());
+ Future<executor::RemoteCommandResponse> beginExhaustFuture =
+ handle->beginExhaustCommandRequest(isMasterRequest);
Date_t prevTime;
TopologyVersion topologyVersion;
{
- auto reply = exhaustRequestHandler.getReplyObjectWhenReady();
+ auto reply = beginExhaustFuture.get();
- ASSERT(!exhaustFuture.isReady());
ASSERT_OK(reply.status);
+ ASSERT(reply.moreToCome);
prevTime = reply.data.getField("localTime").Date();
topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"),
reply.data.getField("topologyVersion").Obj());
}
+ Future<executor::RemoteCommandResponse> awaitExhaustFuture = handle->awaitExhaustCommand();
{
- auto reply = exhaustRequestHandler.getReplyObjectWhenReady();
+ auto reply = awaitExhaustFuture.get();
- // The moreToCome bit is still set
- ASSERT(!exhaustFuture.isReady());
ASSERT_OK(reply.status);
-
+ ASSERT(reply.moreToCome);
auto replyTime = reply.data.getField("localTime").Date();
ASSERT_GT(replyTime, prevTime);
@@ -249,12 +211,22 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) {
ASSERT_EQ(replyTopologyVersion.getCounter(), topologyVersion.getCounter());
}
- handle->cancel();
- handle->end();
- auto error = exhaustFuture.getNoThrow();
- // exhaustFuture will resolve with CallbackCanceled unless the socket is already closed, in
- // which case it will resolve with HostUnreachable.
- ASSERT((error == ErrorCodes::CallbackCanceled) || (error == ErrorCodes::HostUnreachable));
+ Future<executor::RemoteCommandResponse> cancelExhaustFuture = handle->awaitExhaustCommand();
+ {
+ handle->cancel();
+ handle->end();
+ auto swReply = cancelExhaustFuture.getNoThrow();
+
+ // The original isMaster request has maxAwaitTimeMs = 1000 ms. If the cancel executes before
+ // those 1000 ms elapse, we expect the future to resolve with an error: CallbackCanceled,
+ // unless the socket is already closed, in which case it resolves with HostUnreachable.
+ // If the network is slow, however, the server may respond before the cancel executes, so
+ // a successful reply is also acceptable.
+ if (!swReply.getStatus().isOK()) {
+ ASSERT((swReply.getStatus() == ErrorCodes::CallbackCanceled) ||
+ (swReply.getStatus() == ErrorCodes::HostUnreachable));
+ }
+ }
}
TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) {
@@ -323,16 +295,14 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) {
BSONObj(),
nullptr};
- ExhaustRequestHandlerUtil exhaustRequestHandler;
- Future<void> exhaustFuture = isMasterHandle->runExhaustCommandRequest(
- isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn());
-
+ Future<executor::RemoteCommandResponse> beginExhaustFuture =
+ isMasterHandle->beginExhaustCommandRequest(isMasterRequest);
{
- auto reply = exhaustRequestHandler.getReplyObjectWhenReady();
+ auto reply = beginExhaustFuture.get();
- exhaustFuture.get();
ASSERT_OK(reply.status);
ASSERT_EQ(reply.data["ok"].Double(), 0.0);
+ ASSERT(!reply.moreToCome);
}
}