diff options
author | jannaerin <golden.janna@gmail.com> | 2020-03-04 12:08:42 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-25 04:30:03 +0000 |
commit | 8124a8d047ce142f6d6defc089e5e71192721a5c (patch) | |
tree | dddfd98d579eb288b31bb28cb29072dbc37b2c20 | |
parent | 4601bd54dfd3f3ab20b357b71a4b17667143c0fb (diff) | |
download | mongo-8124a8d047ce142f6d6defc089e5e71192721a5c.tar.gz |
SERVER-44954 Streamable RSM uses exhaust isMaster
17 files changed, 317 insertions, 196 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 8abedc1d771..3174589f4ff 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -28,8 +28,9 @@ selector: - jstests/sharding/test_stacked_migration_cleanup.js - jstests/sharding/killop.js - jstests/sharding/verify_sessions_expiration_sharded.js - # Enable when SERVER-44733 is backported - jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js + - jstests/sharding/repl_monitor_refresh.js + - jstests/sharding/retryable_write_error_labels.js # Enable when SERVER-43310 is backported - jstests/sharding/cluster_create_indexes_always_routes_through_primary.js diff --git a/jstests/sharding/repl_monitor_refresh.js b/jstests/sharding/repl_monitor_refresh.js index 20f1d930d98..b41fc42d761 100644 --- a/jstests/sharding/repl_monitor_refresh.js +++ b/jstests/sharding/repl_monitor_refresh.js @@ -3,6 +3,7 @@ load("jstests/replsets/rslib.js"); /** * Test for making sure that the replica seed list in the config server does not * become invalid when a replica set reconfig happens. + * @tags: [multiversion_incompatible] */ (function() { "use strict"; @@ -45,6 +46,14 @@ jsTest.log('Changing conf to ' + tojson(confDoc)); reconfig(replTest, confDoc); awaitRSClientHosts(mongos, {host: targetHostName}, {ok: true, ismaster: true}); +let rsConfig = st.rs0.getReplSetConfigFromNode(); +assert.soon(function() { + const res = st.rs0.getPrimary().adminCommand({replSetGetStatus: 1}); + return ((res.members[0].configVersion === rsConfig.version) && + (res.members[2].configVersion === rsConfig.version) && + (res.members[0].configTerm === rsConfig.term) && + (res.members[2].configTerm === rsConfig.term)); +}); // Remove first node from set confDoc.members.shift(); diff --git a/jstests/sharding/retryable_write_error_labels.js b/jstests/sharding/retryable_write_error_labels.js index fd0eb021137..54b6cd41424 100644 --- a/jstests/sharding/retryable_write_error_labels.js +++ b/jstests/sharding/retryable_write_error_labels.js @@ -14,7 +14,11 @@ const dbName = "test"; const collName = "retryable_write_error_labels"; // Use ShardingTest because we need to test both mongod and mongos behaviors. -const st = new ShardingTest({config: 1, mongos: 1, shards: 1}); +const st = new ShardingTest({ + config: 1, + mongos: {s0: {setParameter: {"failpoint.overrideMaxAwaitTimeMS": "{'mode':'alwaysOn'}"}}}, + shards: 1 +}); const primary = st.rs0.getPrimary(); assert.commandWorked(primary.getDB(dbName).runCommand( @@ -76,8 +80,11 @@ function runTest(errorCode, expectLabel, isWCError, isMongos) { // Test retryable writes. jsTestLog("Retryable write should return error " + errorCode + withOrWithout + " RetryableWriteError label"); - let res = testDB.runCommand( - {insert: collName, documents: [{a: errorCode, b: "retryable"}], txnNumber: NumberLong(0)}); + let res = testDB.runCommand({ + insert: collName, + documents: [{a: errorCode, b: "retryable"}], + txnNumber: NumberLong(0), + }); checkErrorCode(res, errorCode, isWCError); checkErrorLabels(res, expectLabel); @@ -96,7 +103,7 @@ function runTest(errorCode, expectLabel, isWCError, isMongos) { res = sessionDb.adminCommand({ commitTransaction: 1, txnNumber: NumberLong(session.getTxnNumber_forTesting()), - autocommit: false + autocommit: false, }); checkErrorCode(res, errorCode, isWCError); checkErrorLabels(res, expectLabel); @@ -133,7 +140,7 @@ function runTest(errorCode, expectLabel, isWCError, isMongos) { res = sessionDb.adminCommand({ abortTransaction: 1, txnNumber: NumberLong(session.getTxnNumber_forTesting()), - autocommit: false + autocommit: false, }); checkErrorCode(res, errorCode, isWCError); checkErrorLabels(res, expectLabel); @@ -202,5 +209,7 @@ runTest(ErrorCodes.WriteConcernFailed, true /* isWCError */, true /* isMongos */); +st.s.adminCommand({"configureFailPoint": "overrideMaxAwaitTimeMS", "mode": "off"}); + st.stop(); }()); diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 6ecc44257aa..899a94eaae3 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -351,6 +351,7 @@ error_codes: - {code: 306,name: ReadThroughCacheLookupCanceled} - {code: 307,name: RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist} + - {code: 308,name: ExhaustCommandFinished} # Error codes 4000-8999 are reserved. diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index 99d93dab68a..f25830de073 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -316,55 +316,41 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( }); } -Future<void> AsyncDBClient::_continueReceiveExhaustResponse( - ExhaustRequestParameters&& exhaustRequestParameters, - boost::optional<int32_t> msgId, - const BatonHandle& baton) { +Future<executor::RemoteCommandResponse> AsyncDBClient::_continueReceiveExhaustResponse( + ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton) { return _waitForResponse(msgId, baton) - .then([exhaustParameters = std::move(exhaustRequestParameters), msgId, baton, this]( - Message responseMsg) mutable -> Future<void> { - // Run callback - auto now = exhaustParameters.clkSource->now(); - auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start); + .then([stopwatch, msgId, baton, this](Message responseMsg) mutable { bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome); rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg)); - exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration), - isMoreToComeSet); - - if (!isMoreToComeSet) { - return Status::OK(); - } - - exhaustParameters.start = now; - return _continueReceiveExhaustResponse( - std::move(exhaustParameters), boost::none, baton); + auto rcResponse = executor::RemoteCommandResponse( + *response, duration_cast<Milliseconds>(stopwatch.elapsed()), isMoreToComeSet); + return rcResponse; }); } -Future<void> AsyncDBClient::runExhaustCommand(OpMsgRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton) { +Future<executor::RemoteCommandResponse> AsyncDBClient::awaitExhaustCommand( + const BatonHandle& baton) { + return _continueReceiveExhaustResponse(ClockSource::StopWatch(), boost::none, baton); +} + +Future<executor::RemoteCommandResponse> AsyncDBClient::runExhaustCommand(OpMsgRequest request, + const BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); OpMsg::setFlag(&requestMsg, OpMsg::kExhaustSupported); - auto clkSource = _svcCtx->getPreciseClockSource(); - auto start = clkSource->now(); auto msgId = nextMessageId(); - return _call(std::move(requestMsg), msgId, baton) - .then([msgId, baton, cb = std::move(cb), clkSource, start, this]() mutable { - ExhaustRequestParameters exhaustParameters{std::move(cb), clkSource, start}; - return _continueReceiveExhaustResponse(std::move(exhaustParameters), msgId, baton); - }); + return _call(std::move(requestMsg), msgId, baton).then([msgId, baton, this]() mutable { + return _continueReceiveExhaustResponse(ClockSource::StopWatch(), msgId, baton); + }); } -Future<void> AsyncDBClient::runExhaustCommandRequest(executor::RemoteCommandRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton) { +Future<executor::RemoteCommandResponse> AsyncDBClient::beginExhaustCommandRequest( + executor::RemoteCommandRequest request, const BatonHandle& baton) { auto opMsgRequest = OpMsgRequest::fromDBAndBody( std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); - return runExhaustCommand(std::move(opMsgRequest), std::move(cb), baton); + return runExhaustCommand(std::move(opMsgRequest), baton); } void AsyncDBClient::cancel(const BatonHandle& baton) { diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h index c7917bb5461..d2279374dda 100644 --- a/src/mongo/client/async_client.h +++ b/src/mongo/client/async_client.h @@ -47,19 +47,6 @@ namespace mongo { class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> { public: - using RemoteCommandCallbackFn = - unique_function<void(const executor::RemoteCommandResponse&, bool isMoreToComeSet)>; - - struct ExhaustRequestParameters { - ExhaustRequestParameters(ExhaustRequestParameters&&) = default; - ExhaustRequestParameters(const ExhaustRequestParameters&) = delete; - ExhaustRequestParameters& operator=(const ExhaustRequestParameters&) = delete; - - RemoteCommandCallbackFn cb; - ClockSource* clkSource; - Date_t start; - }; - explicit AsyncDBClient(const HostAndPort& peer, transport::SessionHandle session, ServiceContext* svcCtx) @@ -79,12 +66,11 @@ public: const BatonHandle& baton = nullptr, bool fireAndForget = false); - Future<void> runExhaustCommandRequest(executor::RemoteCommandRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton = nullptr); - Future<void> runExhaustCommand(OpMsgRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton = nullptr); + Future<executor::RemoteCommandResponse> beginExhaustCommandRequest( + executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr); + Future<executor::RemoteCommandResponse> runExhaustCommand(OpMsgRequest request, + const BatonHandle& baton = nullptr); + Future<executor::RemoteCommandResponse> awaitExhaustCommand(const BatonHandle& baton = nullptr); Future<void> authenticate(const BSONObj& params); @@ -108,8 +94,8 @@ public: const HostAndPort& local() const; private: - Future<void> _continueReceiveExhaustResponse( - ExhaustRequestParameters&& exhaustRequestParameters, + Future<executor::RemoteCommandResponse> _continueReceiveExhaustResponse( + ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton = nullptr); Future<Message> _waitForResponse(boost::optional<int32_t> msgId, diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp index 7f70753a772..f399ffeea38 100644 --- a/src/mongo/client/server_is_master_monitor.cpp +++ b/src/mongo/client/server_is_master_monitor.cpp @@ -36,10 +36,13 @@ #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" #include "mongo/rpc/metadata/egress_metadata_hook_list.h" +#include "mongo/util/log.h" namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(overrideMaxAwaitTimeMS); + const BSONObj IS_MASTER_BSON = BSON("isMaster" << 1); using executor::NetworkInterface; @@ -53,10 +56,12 @@ const Milliseconds kZeroMs = Milliseconds{0}; SingleServerIsMasterMonitor::SingleServerIsMasterMonitor( const MongoURI& setUri, const sdam::ServerAddress& host, + boost::optional<TopologyVersion> topologyVersion, Milliseconds heartbeatFrequencyMS, sdam::TopologyEventsPublisherPtr eventListener, std::shared_ptr<executor::TaskExecutor> executor) : _host(host), + _topologyVersion(topologyVersion), _eventListener(eventListener), _executor(executor), _heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)), @@ -103,7 +108,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() { return; } - const auto currentRefreshPeriod = _currentRefreshPeriod(lock); + const auto currentRefreshPeriod = _currentRefreshPeriod(lock, false); const Milliseconds timeSinceLastCheck = (_lastIsMasterAt) ? _executor->now() - *_lastIsMasterAt : Milliseconds::max(); @@ -161,19 +166,111 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d } void SingleServerIsMasterMonitor::_doRemoteCommand() { - auto request = executor::RemoteCommandRequest( - HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS); - request.sslMode = _setUri.getSSLMode(); - stdx::lock_guard lock(_mutex); if (_isShutdown) return; + StatusWith<executor::TaskExecutor::CallbackHandle> swCbHandle = [&]() { + if (_topologyVersion) { + return _scheduleStreamableIsMaster(); + } + + return _scheduleSingleIsMaster(); + }(); + + if (!swCbHandle.isOK()) { + uasserted(46156012, swCbHandle.getStatus().toString()); + } + + _isMasterOutstanding = true; + _remoteCommandHandle = swCbHandle.getValue(); +} + +StatusWith<TaskExecutor::CallbackHandle> +SingleServerIsMasterMonitor::_scheduleStreamableIsMaster() { + auto maxAwaitTimeMS = durationCount<Milliseconds>(kMaxAwaitTimeMs); + overrideMaxAwaitTimeMS.execute( + [&](const BSONObj&) { maxAwaitTimeMS = durationCount<Milliseconds>(Milliseconds(1000)); }); + auto isMasterCmd = BSON("isMaster" << 1 << "maxAwaitTimeMS" << maxAwaitTimeMS + << "topologyVersion" << _topologyVersion->toBSON()); + + _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS + kMaxAwaitTimeMs; + auto request = executor::RemoteCommandRequest( + HostAndPort(_host), "admin", isMasterCmd, nullptr, _timeoutMS); + request.sslMode = _setUri.getSSLMode(); + + Timer timer; + auto swCbHandle = _executor->scheduleExhaustRemoteCommand( + std::move(request), + [self = shared_from_this(), + timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { + Milliseconds nextRefreshPeriod; + { + stdx::lock_guard lk(self->_mutex); + + if (self->_isShutdown) { + self->_isMasterOutstanding = false; + LOGV2_DEBUG(4495400, + kLogLevel, + "RSM {setName} not processing response: {status}", + "status"_attr = result.response.status, + "setName"_attr = self->_setUri.getSetName()); + return; + } + + auto responseTopologyVersion = result.response.data.getField("topologyVersion"); + if (responseTopologyVersion) { + self->_topologyVersion = TopologyVersion::parse( + IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj()); + } else { + self->_topologyVersion = boost::none; + } + + self->_lastIsMasterAt = self->_executor->now(); + if (!result.response.isOK() || !result.response.moreToCome) { + self->_isMasterOutstanding = false; + nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK()); + self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + } + } + + // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for + // RTT instead + Microseconds latency(timer.micros()); + LOGV2_DEBUG(43332190, + 0, + "RSM {setName} exhuast latency is ms: {status}", + "status"_attr = result.response.status, + "setName"_attr = self->_setUri.getSetName()); + if (result.response.isOK()) { + self->_onIsMasterSuccess(latency, result.response.data); + } else { + self->_onIsMasterFailure(latency, result.response.status, result.response.data); + } + }); + + // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for RTT + // instead. Also move this block into _doRemoteCommand because we will not need the latency + // value here. + if (!swCbHandle.isOK()) { + Microseconds latency(timer.micros()); + _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); + } + + return swCbHandle; +} + +StatusWith<TaskExecutor::CallbackHandle> SingleServerIsMasterMonitor::_scheduleSingleIsMaster() { + _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS; + auto request = executor::RemoteCommandRequest( + HostAndPort(_host), "admin", IS_MASTER_BSON, nullptr, _timeoutMS); + request.sslMode = _setUri.getSSLMode(); + Timer timer; auto swCbHandle = _executor->scheduleRemoteCommand( std::move(request), - [this, self = shared_from_this(), timer]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { + [self = shared_from_this(), + timer](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { Milliseconds nextRefreshPeriod; { stdx::lock_guard lk(self->_mutex); @@ -184,21 +281,29 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() { kLogLevel, "RSM {setName} not processing response: {status}", "status"_attr = result.response.status, - "setName"_attr = _setUri.getSetName()); + "setName"_attr = self->_setUri.getSetName()); return; } self->_lastIsMasterAt = self->_executor->now(); - nextRefreshPeriod = self->_currentRefreshPeriod(lk); - - LOGV2_DEBUG(4333228, - kLogLevel + 1, - "RSM {setName} next refresh period in {period}", - "period"_attr = nextRefreshPeriod.toString(), - "setName"_attr = _setUri.getSetName()); - self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + + auto responseTopologyVersion = result.response.data.getField("topologyVersion"); + if (responseTopologyVersion) { + self->_topologyVersion = TopologyVersion::parse( + IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj()); + } else { + self->_topologyVersion = boost::none; + } + + if (!result.response.isOK() || !result.response.moreToCome) { + self->_isMasterOutstanding = false; + nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK()); + self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + } } + // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for + // RTT instead Microseconds latency(timer.micros()); if (result.response.isOK()) { self->_onIsMasterSuccess(latency, result.response.data); @@ -207,20 +312,22 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() { } }); + // TODO SERVER-46681: Do not pass this latency, we will use the ServerPingMonitor for RTT + // instead. Also move this block into _doRemoteCommand because we will not need the latency + // value here. if (!swCbHandle.isOK()) { Microseconds latency(timer.micros()); _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); - uasserted(31448, swCbHandle.getStatus().toString()); } - _isMasterOutstanding = true; - _remoteCommandHandle = swCbHandle.getValue(); + return swCbHandle; } void SingleServerIsMasterMonitor::shutdown() { stdx::lock_guard lock(_mutex); - if (std::exchange(_isShutdown, true)) + if (std::exchange(_isShutdown, true)) { return; + } LOGV2_DEBUG(4333220, kLogLevel + 1, @@ -296,7 +403,11 @@ Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds or return r; } -Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) { +Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock, + bool scheduleImmediately) { + if (scheduleImmediately) + return Milliseconds(0); + return (_isExpedited) ? sdam::SdamConfiguration::kMinHeartbeatFrequencyMS : _heartbeatFrequencyMS; } @@ -386,6 +497,7 @@ void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent( _singleMonitors[serverAddress] = std::make_shared<SingleServerIsMasterMonitor>( _setUri, serverAddress, + serverDescription->getTopologyVersion(), _sdamConfiguration.getHeartBeatFrequency(), _eventPublisher, _executor); diff --git a/src/mongo/client/server_is_master_monitor.h b/src/mongo/client/server_is_master_monitor.h index bdc3da9ec69..75121c44001 100644 --- a/src/mongo/client/server_is_master_monitor.h +++ b/src/mongo/client/server_is_master_monitor.h @@ -40,6 +40,7 @@ class SingleServerIsMasterMonitor public: explicit SingleServerIsMasterMonitor(const MongoURI& setUri, const ServerAddress& host, + boost::optional<TopologyVersion> topologyVersion, Milliseconds heartbeatFrequencyMS, TopologyEventsPublisherPtr eventListener, std::shared_ptr<executor::TaskExecutor> executor); @@ -56,15 +57,25 @@ public: void requestImmediateCheck(); void disableExpeditedChecking(); + static constexpr Milliseconds kMaxAwaitTimeMs = Milliseconds(10000); + private: void _scheduleNextIsMaster(WithLock, Milliseconds delay); void _doRemoteCommand(); + // Use the awaitable isMaster protocol with the exhaust bit set. Attach _topologyVersion and + // kMaxAwaitTimeMS to the request. + StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleStreamableIsMaster(); + + // Use the old isMaster protocol. Do not attach _topologyVersion or kMaxAwaitTimeMS to the + // request. + StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleSingleIsMaster(); + void _onIsMasterSuccess(IsMasterRTT latency, const BSONObj bson); void _onIsMasterFailure(IsMasterRTT latency, const Status& status, const BSONObj bson); Milliseconds _overrideRefreshPeriod(Milliseconds original); - Milliseconds _currentRefreshPeriod(WithLock); + Milliseconds _currentRefreshPeriod(WithLock, bool scheduleImmediately); void _cancelOutstandingRequest(WithLock); static constexpr auto kLogLevel = 0; @@ -72,6 +83,7 @@ private: Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "SingleServerIsMasterMonitor::mutex"); ServerAddress _host; + boost::optional<TopologyVersion> _topologyVersion; TopologyEventsPublisherPtr _eventListener; std::shared_ptr<executor::TaskExecutor> _executor; Milliseconds _heartbeatFrequencyMS; diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index d269fb2443f..abc02a2db61 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -58,8 +58,7 @@ public: using Response = RemoteCommandResponse; using RemoteCommandCompletionFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>; - using RemoteCommandOnReplyFn = - unique_function<void(const TaskExecutor::ResponseOnAnyStatus&, bool isMoreToComeSet)>; + using RemoteCommandOnReplyFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>; virtual ~NetworkInterface(); diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp index 66c67b9079c..ae22469902a 100644 --- a/src/mongo/executor/network_interface_integration_fixture.cpp +++ b/src/mongo/executor/network_interface_integration_fixture.cpp @@ -133,16 +133,16 @@ Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand( cbHandle, rcroa, [p = std::move(pf.promise), exhaustUtilCB = std::move(exhaustUtilCB)]( - const TaskExecutor::ResponseOnAnyStatus& rs, bool isMoreToComeSet) mutable { + const TaskExecutor::ResponseOnAnyStatus& rs) mutable { exhaustUtilCB(rs); if (!rs.status.isOK()) { - invariant(!isMoreToComeSet); + invariant(!rs.moreToCome); p.setError(rs.status); return; } - if (!isMoreToComeSet) { + if (!rs.moreToCome) { p.emplaceValue(); } }, diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index fcae4dee869..da7c3bcc11a 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -874,10 +874,12 @@ auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface .onError([state](Status error) { stdx::lock_guard lk(state->_onReplyMutex); state->onReplyFn(RemoteCommandOnAnyResponse( - boost::none, std::move(error), state->stopwatch.elapsed()), - false); + boost::none, std::move(error), state->stopwatch.elapsed())); }) - .getAsync([state](Status status) { state->tryFinish(status); }); + .getAsync([state](Status status) { + state->tryFinish( + Status{ErrorCodes::ExhaustCommandFinished, "Exhaust command finished"}); + }); { stdx::lock_guard lk(interface->_inProgressMutex); @@ -891,37 +893,64 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendReque auto requestState = requestManager->getRequest(reqId); invariant(requestState); - auto clientCallback = [this, requestState](const RemoteCommandResponse& response, - bool isMoreToComeSet) { - // Stash this response on the command state to be used to fulfill the promise. - prevResponse = response; + auto clientCallback = [this, requestState](const RemoteCommandResponse& response) { auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response); doMetadataHook(onAnyResponse); // If the command failed, we will call 'onReply' as a part of the future chain paired with // the promise. This is to be sure that all error paths will run 'onReply' only once upon // future completion. - if (!getStatusFromCommandResult(response.data).isOK()) { + if (!response.status.isOK() || !getStatusFromCommandResult(response.data).isOK()) { // The moreToCome bit should *not* be set if the command failed - invariant(!isMoreToComeSet); + invariant(!response.moreToCome); + return; + } + + stdx::lock_guard lk(_onReplyMutex); + onReplyFn(onAnyResponse); + }; + + auto& reactor = requestState->interface()->_reactor; + handleExhaustResponseFn = + [ this, requestState, reactor, clientCallback = std::move(clientCallback) ]( + StatusWith<RemoteCommandResponse> swResponse) mutable noexcept { + RemoteCommandResponse response; + if (!swResponse.isOK()) { + response = RemoteCommandResponse(std::move(swResponse.getStatus())); + } else { + response = std::move(swResponse.getValue()); + } + + clientCallback(response); + + if (!response.moreToCome) { + finalResponsePromise.emplaceValue(response); + return; + } + + if (requestState->interface()->inShutdown()) { return; } // Reset the stopwatch to measure the correct duration for the folowing reply stopwatch.restart(); + if (deadline != RemoteCommandRequest::kNoExpirationDate) { + deadline = stopwatch.start() + requestOnAny.timeout; + } setTimer(); - - stdx::lock_guard lk(_onReplyMutex); - onReplyFn(onAnyResponse, isMoreToComeSet); + requestState->client()->awaitExhaustCommand(baton).thenRunOn(reactor).getAsync( + handleExhaustResponseFn); }; - return makeReadyFutureWith( - [this, requestState, clientCallback = std::move(clientCallback)]() mutable { - setTimer(); - return requestState->client()->runExhaustCommandRequest( - *requestState->request, std::move(clientCallback), baton); - }) - .then([this, requestState] { return prevResponse; }); + setTimer(); + requestState->client() + ->beginExhaustCommandRequest(*requestState->request, baton) + .thenRunOn(reactor) + .getAsync(handleExhaustResponseFn); + + auto [promise, future] = makePromiseFuture<RemoteCommandResponse>(); + finalResponsePromise = std::move(promise); + return std::move(future).then([this](const auto& finalResponse) { return finalResponse; }); } void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise( diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index 34bec18a411..c35d9d30023 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -199,10 +199,10 @@ private: void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override; Promise<void> promise; - RemoteCommandResponse prevResponse; - Mutex _onReplyMutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_onReplyMutex"); + Promise<RemoteCommandResponse> finalResponsePromise; + Mutex _onReplyMutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::_onReplyMutex"); RemoteCommandOnReplyFn onReplyFn; + std::function<void(StatusWith<RemoteCommandResponse>)> handleExhaustResponseFn; }; enum class ConnStatus { Unset, OK, Failed }; diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp index 8baafdb3d67..0b88b3d340d 100644 --- a/src/mongo/executor/remote_command_response.cpp +++ b/src/mongo/executor/remote_command_response.cpp @@ -55,8 +55,10 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(Status s, Milliseconds mill invariant(!isOK()); }; -RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis) - : data(std::move(dataObj)), elapsedMillis(millis) { +RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, + Milliseconds millis, + bool moreToCome) + : data(std::move(dataObj)), elapsedMillis(millis), moreToCome(moreToCome) { // The buffer backing the default empty BSONObj has static duration so it is effectively // owned. invariant(data.isOwned() || data.objdata() == BSONObj().objdata()); @@ -65,8 +67,9 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Millisecon // TODO(amidvidy): we currently discard output docs when we use this constructor. We should // have RCR hold those too, but we need more machinery before that is possible. RemoteCommandResponseBase::RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, - Milliseconds millis) - : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis)) {} + Milliseconds millis, + bool moreToCome) + : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis), moreToCome) {} bool RemoteCommandResponseBase::isOK() const { return status.isOK(); diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h index 2cd90b37974..7842ebdfe14 100644 --- a/src/mongo/executor/remote_command_response.h +++ b/src/mongo/executor/remote_command_response.h @@ -63,15 +63,18 @@ struct RemoteCommandResponseBase { RemoteCommandResponseBase(Status s, Milliseconds millis); - RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis); + RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis, bool moreToCome = false); - RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, Milliseconds millis); + RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, + Milliseconds millis, + bool moreToCome = false); bool isOK() const; BSONObj data; // Always owned. May point into message. boost::optional<Milliseconds> elapsedMillis; Status status = Status::OK(); + bool moreToCome = false; // Whether or not the moreToCome bit is set on an exhaust message. protected: ~RemoteCommandResponseBase() = default; diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 10a57699045..7b76b6853b8 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -690,8 +690,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust auto commandStatus = _net->startExhaustCommand( swCbHandle.getValue(), scheduledRequest, - [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response, - bool isMoreToComeSet) { + [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response) { using std::swap; LOGV2_DEBUG( @@ -706,16 +705,26 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust return; } + if (cbState->canceled.load()) { + _networkInProgressQueue.erase(cbState->iter); + return; + } + // Swap the callback function with the new one CallbackFn newCb = [cb, scheduledRequest, response](const CallbackArgs& cbData) { remoteCommandFinished(cbData, cb, scheduledRequest, response); }; swap(cbState->callback, newCb); - // If this is the last response, invoke the non-exhaust path. This will mark cbState as // finished and remove the task from _networkInProgressQueue - if (!isMoreToComeSet) { - scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk)); + if (!response.moreToCome) { + _networkInProgressQueue.erase(cbState->iter); + + WorkQueue result; + result.emplace_front(cbState); + result.front()->iter = result.begin(); + + scheduleIntoPool_inlock(&result, std::move(lk)); return; } @@ -773,26 +782,23 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) { CallbackHandle cbHandle; setCallbackForHandle(&cbHandle, cbState); + auto canceled = cbState->canceled.load(); CallbackArgs args(this, std::move(cbHandle), - cbState->canceled.load() - ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) - : Status::OK()); - invariant(!cbState->isFinished.load()); - { - // After running callback function, clear 'cbStateArg->callback' to release any resources - // that might be held by this function object. - // Swap 'cbStateArg->callback' with temporary copy before running callback for exception - // safety. - TaskExecutor::CallbackFn callback; - std::swap(cbState->callback, callback); - callback(std::move(args)); + canceled ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) + : Status::OK()); + if (!cbState->isFinished.load()) { + cbState->callback(std::move(args)); } // Do not mark cbState as finished. It will be marked as finished on the last reply. stdx::lock_guard<Latch> lk(_mutex); - invariant(cbState->exhaustIter); - _poolInProgressQueue.erase(cbState->exhaustIter.get()); + + if (cbState->exhaustIter) { + _poolInProgressQueue.erase(cbState->exhaustIter.get()); + cbState->exhaustIter = boost::none; + } + if (_inShutdown_inlock() && _poolInProgressQueue.empty()) { _stateChange.notify_all(); } diff --git a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp index d5eeeac0504..f6a8ccf4f75 100644 --- a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp @@ -163,11 +163,6 @@ TEST_F(TaskExecutorFixture, RunExhaustShouldReceiveMultipleResponses) { ASSERT(cbHandle.isValid()); executor()->cancel(cbHandle); ASSERT(cbHandle.isCanceled()); - auto counters = exhaustRequestHandler.getCountersWhenReady(); - - // The command was cancelled so the 'fail' counter should be incremented - ASSERT_EQ(counters._success, 2); - ASSERT_EQ(counters._failed, 1); // The tasks should be removed after 'isMaster' fails ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5))); diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp index ec7981b615a..9cf669ec978 100644 --- a/src/mongo/transport/transport_layer_asio_integration_test.cpp +++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp @@ -152,42 +152,6 @@ TEST(TransportLayerASIO, asyncConnectTimeoutCleansUpSocket) { ASSERT_EQ(client.getStatus(), ErrorCodes::NetworkTimeout); } -class ExhaustRequestHandlerUtil { -public: - AsyncDBClient::RemoteCommandCallbackFn&& getExhaustRequestCallbackFn() { - return std::move(_callbackFn); - } - - executor::RemoteCommandResponse getReplyObjectWhenReady() { - stdx::unique_lock<Latch> lk(_mutex); - _cv.wait(_mutex, [&] { return _replyUpdated; }); - _replyUpdated = false; - return _reply; - } - -private: - // holds the server's response once it sent one - executor::RemoteCommandResponse _reply; - // set to true once 'reply' has been set. Used to indicate that a new response has been set and - // should be inspected. - bool _replyUpdated = false; - - Mutex _mutex = MONGO_MAKE_LATCH(); - stdx::condition_variable _cv; - - // called when a server sends a new isMaster exhaust response. Updates _reply and _replyUpdated. - AsyncDBClient::RemoteCommandCallbackFn _callbackFn = - [&](const executor::RemoteCommandResponse& response, bool isMoreToComeSet) { - { - stdx::unique_lock<Latch> lk(_mutex); - _reply = response; - _replyUpdated = true; - } - - _cv.notify_all(); - }; -}; - TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) { auto connectionString = unittest::getFixtureConnectionString(); auto server = connectionString.getServers().front(); @@ -217,29 +181,27 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) { BSONObj(), nullptr}; - ExhaustRequestHandlerUtil exhaustRequestHandler; - Future<void> exhaustFuture = handle->runExhaustCommandRequest( - isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn()); + Future<executor::RemoteCommandResponse> beginExhaustFuture = + handle->beginExhaustCommandRequest(isMasterRequest); Date_t prevTime; TopologyVersion topologyVersion; { - auto reply = exhaustRequestHandler.getReplyObjectWhenReady(); + auto reply = beginExhaustFuture.get(); - ASSERT(!exhaustFuture.isReady()); ASSERT_OK(reply.status); + ASSERT(reply.moreToCome); prevTime = reply.data.getField("localTime").Date(); topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"), reply.data.getField("topologyVersion").Obj()); } + Future<executor::RemoteCommandResponse> awaitExhaustFuture = handle->awaitExhaustCommand(); { - auto reply = exhaustRequestHandler.getReplyObjectWhenReady(); + auto reply = awaitExhaustFuture.get(); - // The moreToCome bit is still set - ASSERT(!exhaustFuture.isReady()); ASSERT_OK(reply.status); - + ASSERT(reply.moreToCome); auto replyTime = reply.data.getField("localTime").Date(); ASSERT_GT(replyTime, prevTime); @@ -249,12 +211,22 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) { ASSERT_EQ(replyTopologyVersion.getCounter(), topologyVersion.getCounter()); } - handle->cancel(); - handle->end(); - auto error = exhaustFuture.getNoThrow(); - // exhaustFuture will resolve with CallbackCanceled unless the socket is already closed, in - // which case it will resolve with HostUnreachable. - ASSERT((error == ErrorCodes::CallbackCanceled) || (error == ErrorCodes::HostUnreachable)); + Future<executor::RemoteCommandResponse> cancelExhaustFuture = handle->awaitExhaustCommand(); + { + handle->cancel(); + handle->end(); + auto swReply = cancelExhaustFuture.getNoThrow(); + + // The original isMaster request has maxAwaitTimeMs = 1000 ms, if the cancel executes before + // the 1000ms then we expect the future to resolve with an error. It should resolve with + // CallbackCanceled unless the socket is already closed, in which case it will resolve with + // HostUnreachable. If the network is slow, the server may response before the cancel + // executes however. + if (!swReply.getStatus().isOK()) { + ASSERT((swReply.getStatus() == ErrorCodes::CallbackCanceled) || + (swReply.getStatus() == ErrorCodes::HostUnreachable)); + } + } } TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) { @@ -323,16 +295,14 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) { BSONObj(), nullptr}; - ExhaustRequestHandlerUtil exhaustRequestHandler; - Future<void> exhaustFuture = isMasterHandle->runExhaustCommandRequest( - isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn()); - + Future<executor::RemoteCommandResponse> beginExhaustFuture = + isMasterHandle->beginExhaustCommandRequest(isMasterRequest); { - auto reply = exhaustRequestHandler.getReplyObjectWhenReady(); + auto reply = beginExhaustFuture.get(); - exhaustFuture.get(); ASSERT_OK(reply.status); ASSERT_EQ(reply.data["ok"].Double(), 0.0); + ASSERT(!reply.moreToCome); } } |