diff options
Diffstat (limited to 'src/mongo')
19 files changed, 386 insertions, 279 deletions
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index e1124282dd7..0d1403edc5a 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -353,6 +353,7 @@ error_codes: - {code: 307,name: RangeDeletionAbandonedBecauseTaskDocumentDoesNotExist} - {code: 308,name: CurrentConfigNotCommittedYet} + - {code: 309,name: ExhaustCommandFinished} - {code: 310,name: PeriodicJobIsStopped,categories: [CancelationError]} # Error codes 4000-8999 are reserved. diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index 99d93dab68a..f25830de073 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -316,55 +316,41 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( }); } -Future<void> AsyncDBClient::_continueReceiveExhaustResponse( - ExhaustRequestParameters&& exhaustRequestParameters, - boost::optional<int32_t> msgId, - const BatonHandle& baton) { +Future<executor::RemoteCommandResponse> AsyncDBClient::_continueReceiveExhaustResponse( + ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton) { return _waitForResponse(msgId, baton) - .then([exhaustParameters = std::move(exhaustRequestParameters), msgId, baton, this]( - Message responseMsg) mutable -> Future<void> { - // Run callback - auto now = exhaustParameters.clkSource->now(); - auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start); + .then([stopwatch, msgId, baton, this](Message responseMsg) mutable { bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome); rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg)); - exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration), - isMoreToComeSet); - - if (!isMoreToComeSet) { - return Status::OK(); - } - - exhaustParameters.start = now; - return _continueReceiveExhaustResponse( - std::move(exhaustParameters), boost::none, baton); + auto rcResponse = executor::RemoteCommandResponse( + *response, duration_cast<Milliseconds>(stopwatch.elapsed()), isMoreToComeSet); + return rcResponse; }); } -Future<void> AsyncDBClient::runExhaustCommand(OpMsgRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton) { +Future<executor::RemoteCommandResponse> AsyncDBClient::awaitExhaustCommand( + const BatonHandle& baton) { + return _continueReceiveExhaustResponse(ClockSource::StopWatch(), boost::none, baton); +} + +Future<executor::RemoteCommandResponse> AsyncDBClient::runExhaustCommand(OpMsgRequest request, + const BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); OpMsg::setFlag(&requestMsg, OpMsg::kExhaustSupported); - auto clkSource = _svcCtx->getPreciseClockSource(); - auto start = clkSource->now(); auto msgId = nextMessageId(); - return _call(std::move(requestMsg), msgId, baton) - .then([msgId, baton, cb = std::move(cb), clkSource, start, this]() mutable { - ExhaustRequestParameters exhaustParameters{std::move(cb), clkSource, start}; - return _continueReceiveExhaustResponse(std::move(exhaustParameters), msgId, baton); - }); + return _call(std::move(requestMsg), msgId, baton).then([msgId, baton, this]() mutable { + return _continueReceiveExhaustResponse(ClockSource::StopWatch(), msgId, baton); + }); } -Future<void> AsyncDBClient::runExhaustCommandRequest(executor::RemoteCommandRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton) { +Future<executor::RemoteCommandResponse> AsyncDBClient::beginExhaustCommandRequest( + executor::RemoteCommandRequest request, const BatonHandle& baton) { auto opMsgRequest = OpMsgRequest::fromDBAndBody( std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); - return runExhaustCommand(std::move(opMsgRequest), std::move(cb), baton); + return runExhaustCommand(std::move(opMsgRequest), baton); } void AsyncDBClient::cancel(const BatonHandle& baton) { diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h index c7917bb5461..d2279374dda 100644 --- a/src/mongo/client/async_client.h +++ b/src/mongo/client/async_client.h @@ -47,19 +47,6 @@ namespace mongo { class AsyncDBClient : public std::enable_shared_from_this<AsyncDBClient> { public: - using RemoteCommandCallbackFn = - unique_function<void(const executor::RemoteCommandResponse&, bool isMoreToComeSet)>; - - struct ExhaustRequestParameters { - ExhaustRequestParameters(ExhaustRequestParameters&&) = default; - ExhaustRequestParameters(const ExhaustRequestParameters&) = delete; - ExhaustRequestParameters& operator=(const ExhaustRequestParameters&) = delete; - - RemoteCommandCallbackFn cb; - ClockSource* clkSource; - Date_t start; - }; - explicit AsyncDBClient(const HostAndPort& peer, transport::SessionHandle session, ServiceContext* svcCtx) @@ -79,12 +66,11 @@ public: const BatonHandle& baton = nullptr, bool fireAndForget = false); - Future<void> runExhaustCommandRequest(executor::RemoteCommandRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton = nullptr); - Future<void> runExhaustCommand(OpMsgRequest request, - RemoteCommandCallbackFn&& cb, - const BatonHandle& baton = nullptr); + Future<executor::RemoteCommandResponse> beginExhaustCommandRequest( + executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr); + Future<executor::RemoteCommandResponse> runExhaustCommand(OpMsgRequest request, + const BatonHandle& baton = nullptr); + Future<executor::RemoteCommandResponse> awaitExhaustCommand(const BatonHandle& baton = nullptr); Future<void> authenticate(const BSONObj& params); @@ -108,8 +94,8 @@ public: const HostAndPort& local() const; private: - Future<void> _continueReceiveExhaustResponse( - ExhaustRequestParameters&& exhaustRequestParameters, + Future<executor::RemoteCommandResponse> _continueReceiveExhaustResponse( + ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton = nullptr); Future<Message> _waitForResponse(boost::optional<int32_t> msgId, diff --git a/src/mongo/client/sdam/topology_listener.cpp b/src/mongo/client/sdam/topology_listener.cpp index 48e1f2b147b..3d4fe2b92cb 100644 --- a/src/mongo/client/sdam/topology_listener.cpp +++ b/src/mongo/client/sdam/topology_listener.cpp @@ -92,14 +92,12 @@ void TopologyEventsPublisher::onServerHandshakeFailedEvent(const sdam::ServerAdd _scheduleNextDelivery(); } -void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, - const ServerAddress& hostAndPort, +void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, const BSONObj reply) { { stdx::lock_guard lock(_eventQueueMutex); EventPtr event = std::make_unique<Event>(); event->type = EventType::HEARTBEAT_SUCCESS; - event->duration = duration_cast<IsMasterRTT>(durationMs); event->hostAndPort = hostAndPort; event->reply = reply; _eventQueue.push_back(std::move(event)); @@ -107,15 +105,13 @@ void TopologyEventsPublisher::onServerHeartbeatSucceededEvent(IsMasterRTT durati _scheduleNextDelivery(); } -void TopologyEventsPublisher::onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, +void TopologyEventsPublisher::onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply) { { stdx::lock_guard lock(_eventQueueMutex); EventPtr event = std::make_unique<Event>(); event->type = EventType::HEARTBEAT_FAILURE; - event->duration = duration_cast<IsMasterRTT>(durationMs); event->hostAndPort = hostAndPort; event->reply = reply; event->status = errorStatus; @@ -189,14 +185,10 @@ void TopologyEventsPublisher::_nextDelivery() { void TopologyEventsPublisher::_sendEvent(TopologyListenerPtr listener, const Event& event) { switch (event.type) { case EventType::HEARTBEAT_SUCCESS: - listener->onServerHeartbeatSucceededEvent( - duration_cast<IsMasterRTT>(event.duration), event.hostAndPort, event.reply); + listener->onServerHeartbeatSucceededEvent(event.hostAndPort, event.reply); break; case EventType::HEARTBEAT_FAILURE: - listener->onServerHeartbeatFailureEvent(duration_cast<IsMasterRTT>(event.duration), - event.status, - event.hostAndPort, - event.reply); + listener->onServerHeartbeatFailureEvent(event.status, event.hostAndPort, event.reply); break; case EventType::TOPOLOGY_DESCRIPTION_CHANGED: // TODO SERVER-46497: fix uuid or just remove diff --git a/src/mongo/client/sdam/topology_listener.h b/src/mongo/client/sdam/topology_listener.h index 5cc737706d0..ff550309068 100644 --- a/src/mongo/client/sdam/topology_listener.h +++ b/src/mongo/client/sdam/topology_listener.h @@ -52,8 +52,7 @@ public: TopologyDescriptionPtr previousDescription, TopologyDescriptionPtr newDescription){}; - virtual void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, + virtual void onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply){}; /** @@ -73,8 +72,7 @@ public: * hostAndPort succeeded. durationMS is the execution time of the event, including the time it * took to send the message and recieve the reply from the server. */ - virtual void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, - const ServerAddress& hostAndPort, + virtual void onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, const BSONObj reply){}; /* @@ -115,13 +113,11 @@ public: void onServerHandshakeFailedEvent(const sdam::ServerAddress& address, const Status& status, - const BSONObj reply) override; + const BSONObj reply); - void onServerHeartbeatSucceededEvent(IsMasterRTT durationMs, - const ServerAddress& hostAndPort, + void onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, const BSONObj reply) override; - void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, + void onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply) override; void onServerPingFailedEvent(const ServerAddress& hostAndPort, const Status& status) override; diff --git a/src/mongo/client/server_is_master_monitor.cpp b/src/mongo/client/server_is_master_monitor.cpp index b4dc993a6d1..bdb81499b30 100644 --- a/src/mongo/client/server_is_master_monitor.cpp +++ b/src/mongo/client/server_is_master_monitor.cpp @@ -41,6 +41,8 @@ namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(overrideMaxAwaitTimeMS); + using executor::NetworkInterface; using executor::NetworkInterfaceThreadPool; using executor::TaskExecutor; @@ -52,10 +54,12 @@ const Milliseconds kZeroMs = Milliseconds{0}; SingleServerIsMasterMonitor::SingleServerIsMasterMonitor( const MongoURI& setUri, const sdam::ServerAddress& host, + boost::optional<TopologyVersion> topologyVersion, Milliseconds heartbeatFrequencyMS, sdam::TopologyEventsPublisherPtr eventListener, std::shared_ptr<executor::TaskExecutor> executor) : _host(host), + _topologyVersion(topologyVersion), _eventListener(eventListener), _executor(executor), _heartbeatFrequencyMS(_overrideRefreshPeriod(heartbeatFrequencyMS)), @@ -82,7 +86,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() { // The previous refresh period may or may not have been expedited. // Saving the value here before we change to expedited mode. - const auto previousRefreshPeriod = _currentRefreshPeriod(lock); + const auto previousRefreshPeriod = _currentRefreshPeriod(lock, false); if (!_isExpedited) { // save some log lines. @@ -97,7 +101,7 @@ void SingleServerIsMasterMonitor::requestImmediateCheck() { } // Get the new expedited refresh period. - const auto expeditedRefreshPeriod = _currentRefreshPeriod(lock); + const auto expeditedRefreshPeriod = _currentRefreshPeriod(lock, false); if (_isMasterOutstanding) { LOGV2_DEBUG(4333216, @@ -163,7 +167,6 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d invariant(!_isMasterOutstanding); - Timer timer; auto swCbHandle = _executor->scheduleWorkAt( _executor->now() + delay, [self = shared_from_this()](const executor::TaskExecutor::CallbackArgs& cbData) { @@ -174,8 +177,7 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d }); if (!swCbHandle.isOK()) { - Microseconds latency(timer.micros()); - _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); + _onIsMasterFailure(swCbHandle.getStatus(), BSONObj()); return; } @@ -183,24 +185,107 @@ void SingleServerIsMasterMonitor::_scheduleNextIsMaster(WithLock, Milliseconds d } void SingleServerIsMasterMonitor::_doRemoteCommand() { + stdx::lock_guard lock(_mutex); + if (_isShutdown) + return; + + StatusWith<executor::TaskExecutor::CallbackHandle> swCbHandle = [&]() { + if (_topologyVersion) { + return _scheduleStreamableIsMaster(); + } + + return _scheduleSingleIsMaster(); + }(); + + if (!swCbHandle.isOK()) { + _onIsMasterFailure(swCbHandle.getStatus(), BSONObj()); + uasserted(46156012, swCbHandle.getStatus().toString()); + } + + _isMasterOutstanding = true; + _remoteCommandHandle = swCbHandle.getValue(); +} + +StatusWith<TaskExecutor::CallbackHandle> +SingleServerIsMasterMonitor::_scheduleStreamableIsMaster() { + auto maxAwaitTimeMS = durationCount<Milliseconds>(kMaxAwaitTimeMs); + overrideMaxAwaitTimeMS.execute( + [&](const BSONObj&) { maxAwaitTimeMS = durationCount<Milliseconds>(Milliseconds(1000)); }); + BSONObjBuilder bob; bob.append("isMaster", 1); + bob.append("maxAwaitTimeMS", maxAwaitTimeMS); + bob.append("topologyVersion", _topologyVersion->toBSON()); + if (WireSpec::instance().isInternalClient) { WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob); } + _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS + kMaxAwaitTimeMs; auto request = executor::RemoteCommandRequest(HostAndPort(_host), "admin", bob.obj(), nullptr, _timeoutMS); request.sslMode = _setUri.getSSLMode(); - stdx::lock_guard lock(_mutex); - if (_isShutdown) - return; + auto swCbHandle = _executor->scheduleExhaustRemoteCommand( + std::move(request), + [self = shared_from_this()]( + const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { + Milliseconds nextRefreshPeriod; + { + stdx::lock_guard lk(self->_mutex); + + if (self->_isShutdown) { + self->_isMasterOutstanding = false; + LOGV2_DEBUG(4495400, + kLogLevel, + "RSM {setName} not processing response: {status}", + "status"_attr = result.response.status, + "setName"_attr = self->_setUri.getSetName()); + return; + } + + auto responseTopologyVersion = result.response.data.getField("topologyVersion"); + if (responseTopologyVersion) { + self->_topologyVersion = TopologyVersion::parse( + IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj()); + } else { + self->_topologyVersion = boost::none; + } + + self->_lastIsMasterAt = self->_executor->now(); + if (!result.response.isOK() || !result.response.moreToCome) { + self->_isMasterOutstanding = false; + nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK()); + self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + } + } + + if (result.response.isOK()) { + self->_onIsMasterSuccess(result.response.data); + } else { + self->_onIsMasterFailure(result.response.status, result.response.data); + } + }); + + return swCbHandle; +} + +StatusWith<TaskExecutor::CallbackHandle> SingleServerIsMasterMonitor::_scheduleSingleIsMaster() { + _timeoutMS = SdamConfiguration::kDefaultConnectTimeoutMS; + + BSONObjBuilder bob; + bob.append("isMaster", 1); + if (WireSpec::instance().isInternalClient) { + WireSpec::appendInternalClientWireVersion(WireSpec::instance().outgoing, &bob); + } + + auto request = + executor::RemoteCommandRequest(HostAndPort(_host), "admin", bob.obj(), nullptr, _timeoutMS); + request.sslMode = _setUri.getSSLMode(); - Timer timer; auto swCbHandle = _executor->scheduleRemoteCommand( std::move(request), - [this, self = shared_from_this(), timer]( + [self = shared_from_this()]( const executor::TaskExecutor::RemoteCommandCallbackArgs& result) mutable { Milliseconds nextRefreshPeriod; { @@ -212,43 +297,42 @@ void SingleServerIsMasterMonitor::_doRemoteCommand() { kLogLevel, "RSM {setName} not processing response: {status}", "status"_attr = result.response.status, - "setName"_attr = _setUri.getSetName()); + "setName"_attr = self->_setUri.getSetName()); return; } self->_lastIsMasterAt = self->_executor->now(); - nextRefreshPeriod = self->_currentRefreshPeriod(lk); - - LOGV2_DEBUG(4333228, - kLogLevel + 1, - "RSM {setName} next refresh period in {period}", - "period"_attr = nextRefreshPeriod.toString(), - "setName"_attr = _setUri.getSetName()); - self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + + auto responseTopologyVersion = result.response.data.getField("topologyVersion"); + if (responseTopologyVersion) { + self->_topologyVersion = TopologyVersion::parse( + IDLParserErrorContext("TopologyVersion"), responseTopologyVersion.Obj()); + } else { + self->_topologyVersion = boost::none; + } + + if (!result.response.isOK() || !result.response.moreToCome) { + self->_isMasterOutstanding = false; + nextRefreshPeriod = self->_currentRefreshPeriod(lk, result.response.isOK()); + self->_scheduleNextIsMaster(lk, nextRefreshPeriod); + } } - Microseconds latency(timer.micros()); if (result.response.isOK()) { - self->_onIsMasterSuccess(latency, result.response.data); + self->_onIsMasterSuccess(result.response.data); } else { - self->_onIsMasterFailure(latency, result.response.status, result.response.data); + self->_onIsMasterFailure(result.response.status, result.response.data); } }); - if (!swCbHandle.isOK()) { - Microseconds latency(timer.micros()); - _onIsMasterFailure(latency, swCbHandle.getStatus(), BSONObj()); - uasserted(31448, swCbHandle.getStatus().toString()); - } - - _isMasterOutstanding = true; - _remoteCommandHandle = swCbHandle.getValue(); + return swCbHandle; } void SingleServerIsMasterMonitor::shutdown() { stdx::lock_guard lock(_mutex); - if (std::exchange(_isShutdown, true)) + if (std::exchange(_isShutdown, true)) { return; + } LOGV2_DEBUG(4333220, kLogLevel + 1, @@ -279,35 +363,27 @@ void SingleServerIsMasterMonitor::_cancelOutstandingRequest(WithLock) { _isMasterOutstanding = false; } -void SingleServerIsMasterMonitor::_onIsMasterSuccess(sdam::IsMasterRTT latency, - const BSONObj bson) { +void SingleServerIsMasterMonitor::_onIsMasterSuccess(const BSONObj bson) { LOGV2_DEBUG(4333221, kLogLevel + 1, - "RSM {setName} received successful isMaster for server {host} ({latency}): {bson}", + "RSM {setName} received successful isMaster for server {host}: {bson}", "host"_attr = _host, - "latency"_attr = latency, "setName"_attr = _setUri.getSetName(), "bson"_attr = bson.toString()); - _eventListener->onServerHeartbeatSucceededEvent( - duration_cast<Milliseconds>(latency), _host, bson); + _eventListener->onServerHeartbeatSucceededEvent(_host, bson); } -void SingleServerIsMasterMonitor::_onIsMasterFailure(sdam::IsMasterRTT latency, - const Status& status, - const BSONObj bson) { - LOGV2_DEBUG( - 4333222, - kLogLevel, - "RSM {setName} received failed isMaster for server {host}: {status} ({latency}): {bson}", - "host"_attr = _host, - "status"_attr = status.toString(), - "latency"_attr = latency, - "setName"_attr = _setUri.getSetName(), - "bson"_attr = bson.toString()); - - _eventListener->onServerHeartbeatFailureEvent( - duration_cast<Milliseconds>(latency), status, _host, bson); +void SingleServerIsMasterMonitor::_onIsMasterFailure(const Status& status, const BSONObj bson) { + LOGV2_DEBUG(4333222, + kLogLevel, + "RSM {setName} received failed isMaster for server {host}: {status}: {bson}", + "host"_attr = _host, + "status"_attr = status.toString(), + "setName"_attr = _setUri.getSetName(), + "bson"_attr = bson.toString()); + + _eventListener->onServerHeartbeatFailureEvent(status, _host, bson); } Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds original) { @@ -324,7 +400,11 @@ Milliseconds SingleServerIsMasterMonitor::_overrideRefreshPeriod(Milliseconds or return r; } -Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock) { +Milliseconds SingleServerIsMasterMonitor::_currentRefreshPeriod(WithLock, + bool scheduleImmediately) { + if (scheduleImmediately) + return Milliseconds(0); + return (_isExpedited) ? sdam::SdamConfiguration::kMinHeartbeatFrequencyMS : _heartbeatFrequencyMS; } @@ -414,6 +494,7 @@ void ServerIsMasterMonitor::onTopologyDescriptionChangedEvent( _singleMonitors[serverAddress] = std::make_shared<SingleServerIsMasterMonitor>( _setUri, serverAddress, + serverDescription->getTopologyVersion(), _sdamConfiguration.getHeartBeatFrequency(), _eventPublisher, _executor); diff --git a/src/mongo/client/server_is_master_monitor.h b/src/mongo/client/server_is_master_monitor.h index c41bd6c8ce1..e788cc3c95a 100644 --- a/src/mongo/client/server_is_master_monitor.h +++ b/src/mongo/client/server_is_master_monitor.h @@ -40,6 +40,7 @@ class SingleServerIsMasterMonitor public: explicit SingleServerIsMasterMonitor(const MongoURI& setUri, const ServerAddress& host, + boost::optional<TopologyVersion> topologyVersion, Milliseconds heartbeatFrequencyMS, TopologyEventsPublisherPtr eventListener, std::shared_ptr<executor::TaskExecutor> executor); @@ -65,16 +66,28 @@ public: const Milliseconds& expeditedRefreshPeriod, const Milliseconds& previousRefreshPeriod); + // Sent in the initial isMaster request when using the streamable exhaust protocol. The max + // duration a server should wait for a significant topology change before sending a response. + static constexpr Milliseconds kMaxAwaitTimeMs = Milliseconds(10000); + private: void _scheduleNextIsMaster(WithLock, Milliseconds delay); void _rescheduleNextIsMaster(WithLock, Milliseconds delay); void _doRemoteCommand(); - void _onIsMasterSuccess(IsMasterRTT latency, const BSONObj bson); - void _onIsMasterFailure(IsMasterRTT latency, const Status& status, const BSONObj bson); + // Use the awaitable isMaster protocol with the exhaust bit set. Attach _topologyVersion and + // kMaxAwaitTimeMS to the request. + StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleStreamableIsMaster(); + + // Use the old isMaster protocol. Do not attach _topologyVersion or kMaxAwaitTimeMS to the + // request. + StatusWith<executor::TaskExecutor::CallbackHandle> _scheduleSingleIsMaster(); + + void _onIsMasterSuccess(const BSONObj bson); + void _onIsMasterFailure(const Status& status, const BSONObj bson); Milliseconds _overrideRefreshPeriod(Milliseconds original); - Milliseconds _currentRefreshPeriod(WithLock); + Milliseconds _currentRefreshPeriod(WithLock, bool scheduleImmediately); void _cancelOutstandingRequest(WithLock); boost::optional<Milliseconds> _timeSinceLastCheck() const; @@ -84,6 +97,7 @@ private: Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "SingleServerIsMasterMonitor::mutex"); ServerAddress _host; + boost::optional<TopologyVersion> _topologyVersion; TopologyEventsPublisherPtr _eventListener; std::shared_ptr<executor::TaskExecutor> _executor; Milliseconds _heartbeatFrequencyMS; diff --git a/src/mongo/client/streamable_replica_set_monitor.cpp b/src/mongo/client/streamable_replica_set_monitor.cpp index 6ae640dd9af..88a200a33e8 100644 --- a/src/mongo/client/streamable_replica_set_monitor.cpp +++ b/src/mongo/client/streamable_replica_set_monitor.cpp @@ -199,17 +199,20 @@ void StreamableReplicaSetMonitor::init() { } void StreamableReplicaSetMonitor::drop() { - stdx::lock_guard lock(_mutex); - if (_isDropped.swap(true)) { - return; + { + stdx::lock_guard lock(_mutex); + if (_isDropped.swap(true)) + return; + + _eventsPublisher->close(); + _failOutstandingWithStatus( + lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"}); } + LOGV2(4333209, "Closing Replica Set Monitor {setName}", "setName"_attr = getName()); - _eventsPublisher->close(); _queryProcessor->shutdown(); _pingMonitor->shutdown(); _isMasterMonitor->shutdown(); - _failOutstandingWithStatus( - lock, Status{ErrorCodes::ShutdownInProgress, "the ReplicaSetMonitor is shutting down"}); ReplicaSetMonitorManager::get()->getNotifier().onDroppedSet(getName()); LOGV2(4333210, "Done closing Replica Set Monitor {setName}", "setName"_attr = getName()); @@ -613,16 +616,14 @@ void StreamableReplicaSetMonitor::onTopologyDescriptionChangedEvent( } } -void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs, - const ServerAddress& hostAndPort, +void StreamableReplicaSetMonitor::onServerHeartbeatSucceededEvent(const ServerAddress& hostAndPort, const BSONObj reply) { // After the inital handshake, isMasterResponses should not update the RTT with durationMs. IsMasterOutcome outcome(hostAndPort, reply, boost::none); _topologyManager->onServerDescription(outcome); } -void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, +void StreamableReplicaSetMonitor::onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply) { _failedHost( diff --git a/src/mongo/client/streamable_replica_set_monitor.h b/src/mongo/client/streamable_replica_set_monitor.h index 650d7abe237..07873fe199e 100644 --- a/src/mongo/client/streamable_replica_set_monitor.h +++ b/src/mongo/client/streamable_replica_set_monitor.h @@ -166,16 +166,14 @@ private: sdam::TopologyDescriptionPtr previousDescription, sdam::TopologyDescriptionPtr newDescription) override; - void onServerHeartbeatSucceededEvent(sdam::IsMasterRTT durationMs, - const sdam::ServerAddress& hostAndPort, + void onServerHeartbeatSucceededEvent(const sdam::ServerAddress& hostAndPort, const BSONObj reply) override; void onServerHandshakeFailedEvent(const sdam::ServerAddress& address, const Status& status, const BSONObj reply) override; - void onServerHeartbeatFailureEvent(IsMasterRTT durationMs, - Status errorStatus, + void onServerHeartbeatFailureEvent(Status errorStatus, const ServerAddress& hostAndPort, const BSONObj reply) override; diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index 6f9b40e8103..6e6f9244d93 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -58,8 +58,7 @@ public: using Response = RemoteCommandResponse; using RemoteCommandCompletionFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>; - using RemoteCommandOnReplyFn = - unique_function<void(const TaskExecutor::ResponseOnAnyStatus&, bool isMoreToComeSet)>; + using RemoteCommandOnReplyFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>; virtual ~NetworkInterface(); diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp index 66c67b9079c..ae22469902a 100644 --- a/src/mongo/executor/network_interface_integration_fixture.cpp +++ b/src/mongo/executor/network_interface_integration_fixture.cpp @@ -133,16 +133,16 @@ Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand( cbHandle, rcroa, [p = std::move(pf.promise), exhaustUtilCB = std::move(exhaustUtilCB)]( - const TaskExecutor::ResponseOnAnyStatus& rs, bool isMoreToComeSet) mutable { + const TaskExecutor::ResponseOnAnyStatus& rs) mutable { exhaustUtilCB(rs); if (!rs.status.isOK()) { - invariant(!isMoreToComeSet); + invariant(!rs.moreToCome); p.setError(rs.status); return; } - if (!isMoreToComeSet) { + if (!rs.moreToCome) { p.emplaceValue(); } }, diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index 91289e3e61c..2f16d4a2b62 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -878,12 +878,13 @@ auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface state->promise = std::move(promise); std::move(future) .onError([state](Status error) { - stdx::lock_guard lk(state->_onReplyMutex); state->onReplyFn(RemoteCommandOnAnyResponse( - boost::none, std::move(error), state->stopwatch.elapsed()), - false); + boost::none, std::move(error), state->stopwatch.elapsed())); }) - .getAsync([state](Status status) { state->tryFinish(status); }); + .getAsync([state](Status status) { + state->tryFinish( + Status{ErrorCodes::ExhaustCommandFinished, "Exhaust command finished"}); + }); { stdx::lock_guard lk(interface->_inProgressMutex); @@ -901,38 +902,17 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendReque auto requestState = requestManager->getRequest(reqId); invariant(requestState); - auto clientCallback = [this, requestState](const RemoteCommandResponse& response, - bool isMoreToComeSet) { - // Stash this response on the command state to be used to fulfill the promise. - prevResponse = response; - auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response); - doMetadataHook(onAnyResponse); - - // If the command failed, we will call 'onReply' as a part of the future chain paired with - // the promise. This is to be sure that all error paths will run 'onReply' only once upon - // future completion. - if (!getStatusFromCommandResult(response.data).isOK()) { - // The moreToCome bit should *not* be set if the command failed - invariant(!isMoreToComeSet); - return; - } + setTimer(); + requestState->getClient(requestState->conn) + ->beginExhaustCommandRequest(*requestState->request, baton) + .thenRunOn(requestState->interface()->_reactor) + .getAsync([this, requestState](StatusWith<RemoteCommandResponse> swResponse) mutable { + continueExhaustRequest(std::move(requestState), swResponse); + }); - // Reset the stopwatch to measure the correct duration for the folowing reply - stopwatch.restart(); - setTimer(); - - stdx::lock_guard lk(_onReplyMutex); - onReplyFn(onAnyResponse, isMoreToComeSet); - }; - - return makeReadyFutureWith( - [this, requestState, clientCallback = std::move(clientCallback)]() mutable { - setTimer(); - return RequestState::getClient(requestState->conn) - ->runExhaustCommandRequest( - *requestState->request, std::move(clientCallback), baton); - }) - .then([this, requestState] { return prevResponse; }); + auto [promise, future] = makePromiseFuture<RemoteCommandResponse>(); + finalResponsePromise = std::move(promise); + return std::move(future).then([this](const auto& finalResponse) { return finalResponse; }); } void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise( @@ -949,6 +929,52 @@ void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise( promise.emplaceValue(); } +void NetworkInterfaceTL::ExhaustCommandState::continueExhaustRequest( + std::shared_ptr<RequestState> requestState, StatusWith<RemoteCommandResponse> swResponse) { + RemoteCommandResponse response; + if (!swResponse.isOK()) { + response = RemoteCommandResponse(std::move(swResponse.getStatus())); + } else { + response = std::move(swResponse.getValue()); + } + + if (requestState->interface()->inShutdown() || + ErrorCodes::isCancelationError(response.status)) { + finalResponsePromise.emplaceValue(response); + return; + } + + auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response); + doMetadataHook(onAnyResponse); + + // If the command failed, we will call 'onReply' as a part of the future chain paired with + // the promise. This is to be sure that all error paths will run 'onReply' only once upon + // future completion. + if (!response.status.isOK() || !getStatusFromCommandResult(response.data).isOK()) { + // The moreToCome bit should *not* be set if the command failed + invariant(!response.moreToCome); + + finalResponsePromise.emplaceValue(response); + return; + } + + onReplyFn(onAnyResponse); + + // Reset the stopwatch to measure the correct duration for the folowing reply + stopwatch.restart(); + if (deadline != RemoteCommandRequest::kNoExpirationDate) { + deadline = stopwatch.start() + requestOnAny.timeout; + } + setTimer(); + + requestState->getClient(requestState->conn) + ->awaitExhaustCommand(baton) + .thenRunOn(requestState->interface()->_reactor) + .getAsync([this, requestState](StatusWith<RemoteCommandResponse> swResponse) mutable { + continueExhaustRequest(std::move(requestState), swResponse); + }); +} + Status NetworkInterfaceTL::startExhaustCommand(const TaskExecutor::CallbackHandle& cbHandle, RemoteCommandRequestOnAny& request, RemoteCommandOnReplyFn&& onReply, diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index c989ee56003..f49ac1ecad5 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -199,10 +199,11 @@ private: void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override; + void continueExhaustRequest(std::shared_ptr<RequestState> requestState, + StatusWith<RemoteCommandResponse> swResponse); + Promise<void> promise; - RemoteCommandResponse prevResponse; - Mutex _onReplyMutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_onReplyMutex"); + Promise<RemoteCommandResponse> finalResponsePromise; RemoteCommandOnReplyFn onReplyFn; }; diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp index 8baafdb3d67..0b88b3d340d 100644 --- a/src/mongo/executor/remote_command_response.cpp +++ b/src/mongo/executor/remote_command_response.cpp @@ -55,8 +55,10 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(Status s, Milliseconds mill invariant(!isOK()); }; -RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis) - : data(std::move(dataObj)), elapsedMillis(millis) { +RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, + Milliseconds millis, + bool moreToCome) + : data(std::move(dataObj)), elapsedMillis(millis), moreToCome(moreToCome) { // The buffer backing the default empty BSONObj has static duration so it is effectively // owned. invariant(data.isOwned() || data.objdata() == BSONObj().objdata()); @@ -65,8 +67,9 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Millisecon // TODO(amidvidy): we currently discard output docs when we use this constructor. We should // have RCR hold those too, but we need more machinery before that is possible. RemoteCommandResponseBase::RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, - Milliseconds millis) - : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis)) {} + Milliseconds millis, + bool moreToCome) + : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis), moreToCome) {} bool RemoteCommandResponseBase::isOK() const { return status.isOK(); diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h index 2cd90b37974..7842ebdfe14 100644 --- a/src/mongo/executor/remote_command_response.h +++ b/src/mongo/executor/remote_command_response.h @@ -63,15 +63,18 @@ struct RemoteCommandResponseBase { RemoteCommandResponseBase(Status s, Milliseconds millis); - RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis); + RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis, bool moreToCome = false); - RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, Milliseconds millis); + RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, + Milliseconds millis, + bool moreToCome = false); bool isOK() const; BSONObj data; // Always owned. May point into message. boost::optional<Milliseconds> elapsedMillis; Status status = Status::OK(); + bool moreToCome = false; // Whether or not the moreToCome bit is set on an exhaust message. protected: ~RemoteCommandResponseBase() = default; diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 10a57699045..7b83a05c7b7 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -101,6 +101,9 @@ public: AtomicWord<bool> isFinished{false}; boost::optional<stdx::condition_variable> finishedCondition; BatonHandle baton; + AtomicWord<bool> exhaustErased{ + false}; // Used only in the exhaust path. Used to indicate that a cbState associated with + // an exhaust request has been removed from the '_networkInProgressQueue'. }; class ThreadPoolTaskExecutor::EventState : public TaskExecutor::EventState { @@ -690,8 +693,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust auto commandStatus = _net->startExhaustCommand( swCbHandle.getValue(), scheduledRequest, - [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response, - bool isMoreToComeSet) { + [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response) { using std::swap; LOGV2_DEBUG( @@ -701,8 +703,37 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust "response_isOK_response_response_status_toString"_attr = redact(response.isOK() ? response.toString() : response.status.toString())); + // The cbState remains in the '_networkInProgressQueue' for the entirety of the + // request's lifetime and is added to and removed from the '_poolInProgressQueue' each + // time a response is received and its callback run respectively. It must be erased from + // the '_networkInProgressQueue' when either the request is cancelled or a response is + // received that has moreToCome == false to avoid shutting down with a task still in the + // '_networkInProgressQueue'. It is also possible that we receive both of these + // responses around the same time, so the 'exhaustErased' bool protects against + // attempting to erase the same cbState twice. + stdx::unique_lock<Latch> lk(_mutex); - if (_inShutdown_inlock()) { + if (_inShutdown_inlock() || cbState->exhaustErased.load()) { + if (cbState->exhaustIter) { + _poolInProgressQueue.erase(cbState->exhaustIter.get()); + cbState->exhaustIter = boost::none; + } + return; + } + + if (cbState->canceled.load()) { + // Release any resources the callback function is holding + TaskExecutor::CallbackFn callback = [](const CallbackArgs&) {}; + std::swap(cbState->callback, callback); + + _networkInProgressQueue.erase(cbState->iter); + cbState->exhaustErased.store(1); + + if (cbState->exhaustIter) { + _poolInProgressQueue.erase(cbState->exhaustIter.get()); + cbState->exhaustIter = boost::none; + } + return; } @@ -714,8 +745,15 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust // If this is the last response, invoke the non-exhaust path. This will mark cbState as // finished and remove the task from _networkInProgressQueue - if (!isMoreToComeSet) { - scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk)); + if (!response.moreToCome) { + _networkInProgressQueue.erase(cbState->iter); + cbState->exhaustErased.store(1); + + WorkQueue result; + result.emplace_front(cbState); + result.front()->iter = result.begin(); + + scheduleIntoPool_inlock(&result, std::move(lk)); return; } @@ -733,12 +771,13 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call stdx::unique_lock<Latch> lk) { _poolInProgressQueue.push_back(cbState); cbState->exhaustIter = --_poolInProgressQueue.end(); + auto expectedExhaustIter = cbState->exhaustIter.get(); lk.unlock(); if (cbState->baton) { - cbState->baton->schedule([this, cbState](Status status) { + cbState->baton->schedule([this, cbState, expectedExhaustIter](Status status) { if (status.isOK()) { - runCallbackExhaust(cbState); + runCallbackExhaust(cbState, expectedExhaustIter); return; } @@ -747,14 +786,14 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call cbState->canceled.store(1); } - _pool->schedule([this, cbState](auto status) { + _pool->schedule([this, cbState, expectedExhaustIter](auto status) { invariant(status.isOK() || ErrorCodes::isCancelationError(status.code())); - runCallbackExhaust(cbState); + runCallbackExhaust(cbState, expectedExhaustIter); }); }); } else { - _pool->schedule([this, cbState](auto status) { + _pool->schedule([this, cbState, expectedExhaustIter](auto status) { if (ErrorCodes::isCancelationError(status.code())) { stdx::lock_guard<Latch> lk(_mutex); @@ -763,14 +802,15 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call fassert(4615617, status); } - runCallbackExhaust(cbState); + runCallbackExhaust(cbState, expectedExhaustIter); }); } _net->signalWorkAvailable(); } -void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) { +void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState, + WorkQueue::iterator expectedExhaustIter) { CallbackHandle cbHandle; setCallbackForHandle(&cbHandle, cbState); CallbackArgs args(this, @@ -778,21 +818,35 @@ void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> c cbState->canceled.load() ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) : Status::OK()); - invariant(!cbState->isFinished.load()); - { - // After running callback function, clear 'cbStateArg->callback' to release any resources - // that might be held by this function object. - // Swap 'cbStateArg->callback' with temporary copy before running callback for exception - // safety. - TaskExecutor::CallbackFn callback; + + if (!cbState->isFinished.load()) { + TaskExecutor::CallbackFn callback = [](const CallbackArgs&) {}; std::swap(cbState->callback, callback); callback(std::move(args)); + + // Leave the empty callback function if the request has been marked canceled or finished + // while running the callback to avoid leaking resources. + if (!cbState->canceled.load() && !cbState->isFinished.load()) { + std::swap(callback, cbState->callback); + } } - // Do not mark cbState as finished. It will be marked as finished on the last reply. + // Do not mark cbState as finished. It will be marked as finished on the last reply which is + // handled in 'runCallback'. stdx::lock_guard<Latch> lk(_mutex); - invariant(cbState->exhaustIter); - _poolInProgressQueue.erase(cbState->exhaustIter.get()); + + // It is possible that we receive multiple responses in quick succession. If this happens, the + // later responses can overwrite the 'exhaustIter' value on the cbState when adding the cbState + // to the '_poolInProgressQueue' if the previous responses have not been run yet. We take in the + // 'expectedExhaustIter' so that we can still remove this task from the 'poolInProgressQueue' if + // this happens, but we do not want to reset the 'exhaustIter' value in this case. + if (cbState->exhaustIter) { + _poolInProgressQueue.erase(expectedExhaustIter); + if (cbState->exhaustIter.get() == expectedExhaustIter) { + cbState->exhaustIter = boost::none; + } + } + if (_inShutdown_inlock() && _poolInProgressQueue.empty()) { _stateChange.notify_all(); } diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 4fbd38b2ba6..16230554948 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -198,7 +198,8 @@ private: /** * Executes the callback specified by "cbState". Will not mark cbState as finished. */ - void runCallbackExhaust(std::shared_ptr<CallbackState> cbState); + void runCallbackExhaust(std::shared_ptr<CallbackState> cbState, + WorkQueue::iterator expectedExhaustIter); bool _inShutdown_inlock() const; void _setState_inlock(State newState); diff --git a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp index d5eeeac0504..f6a8ccf4f75 100644 --- a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp @@ -163,11 +163,6 @@ TEST_F(TaskExecutorFixture, RunExhaustShouldReceiveMultipleResponses) { ASSERT(cbHandle.isValid()); executor()->cancel(cbHandle); ASSERT(cbHandle.isCanceled()); - auto counters = exhaustRequestHandler.getCountersWhenReady(); - - // The command was cancelled so the 'fail' counter should be incremented - ASSERT_EQ(counters._success, 2); - ASSERT_EQ(counters._failed, 1); // The tasks should be removed after 'isMaster' fails ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5))); diff --git a/src/mongo/transport/transport_layer_asio_integration_test.cpp b/src/mongo/transport/transport_layer_asio_integration_test.cpp index ec7981b615a..9cf669ec978 100644 --- a/src/mongo/transport/transport_layer_asio_integration_test.cpp +++ b/src/mongo/transport/transport_layer_asio_integration_test.cpp @@ -152,42 +152,6 @@ TEST(TransportLayerASIO, asyncConnectTimeoutCleansUpSocket) { ASSERT_EQ(client.getStatus(), ErrorCodes::NetworkTimeout); } -class ExhaustRequestHandlerUtil { -public: - AsyncDBClient::RemoteCommandCallbackFn&& getExhaustRequestCallbackFn() { - return std::move(_callbackFn); - } - - executor::RemoteCommandResponse getReplyObjectWhenReady() { - stdx::unique_lock<Latch> lk(_mutex); - _cv.wait(_mutex, [&] { return _replyUpdated; }); - _replyUpdated = false; - return _reply; - } - -private: - // holds the server's response once it sent one - executor::RemoteCommandResponse _reply; - // set to true once 'reply' has been set. Used to indicate that a new response has been set and - // should be inspected. - bool _replyUpdated = false; - - Mutex _mutex = MONGO_MAKE_LATCH(); - stdx::condition_variable _cv; - - // called when a server sends a new isMaster exhaust response. Updates _reply and _replyUpdated. - AsyncDBClient::RemoteCommandCallbackFn _callbackFn = - [&](const executor::RemoteCommandResponse& response, bool isMoreToComeSet) { - { - stdx::unique_lock<Latch> lk(_mutex); - _reply = response; - _replyUpdated = true; - } - - _cv.notify_all(); - }; -}; - TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) { auto connectionString = unittest::getFixtureConnectionString(); auto server = connectionString.getServers().front(); @@ -217,29 +181,27 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) { BSONObj(), nullptr}; - ExhaustRequestHandlerUtil exhaustRequestHandler; - Future<void> exhaustFuture = handle->runExhaustCommandRequest( - isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn()); + Future<executor::RemoteCommandResponse> beginExhaustFuture = + handle->beginExhaustCommandRequest(isMasterRequest); Date_t prevTime; TopologyVersion topologyVersion; { - auto reply = exhaustRequestHandler.getReplyObjectWhenReady(); + auto reply = beginExhaustFuture.get(); - ASSERT(!exhaustFuture.isReady()); ASSERT_OK(reply.status); + ASSERT(reply.moreToCome); prevTime = reply.data.getField("localTime").Date(); topologyVersion = TopologyVersion::parse(IDLParserErrorContext("TopologyVersion"), reply.data.getField("topologyVersion").Obj()); } + Future<executor::RemoteCommandResponse> awaitExhaustFuture = handle->awaitExhaustCommand(); { - auto reply = exhaustRequestHandler.getReplyObjectWhenReady(); + auto reply = awaitExhaustFuture.get(); - // The moreToCome bit is still set - ASSERT(!exhaustFuture.isReady()); ASSERT_OK(reply.status); - + ASSERT(reply.moreToCome); auto replyTime = reply.data.getField("localTime").Date(); ASSERT_GT(replyTime, prevTime); @@ -249,12 +211,22 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldReceiveMultipleReplies) { ASSERT_EQ(replyTopologyVersion.getCounter(), topologyVersion.getCounter()); } - handle->cancel(); - handle->end(); - auto error = exhaustFuture.getNoThrow(); - // exhaustFuture will resolve with CallbackCanceled unless the socket is already closed, in - // which case it will resolve with HostUnreachable. - ASSERT((error == ErrorCodes::CallbackCanceled) || (error == ErrorCodes::HostUnreachable)); + Future<executor::RemoteCommandResponse> cancelExhaustFuture = handle->awaitExhaustCommand(); + { + handle->cancel(); + handle->end(); + auto swReply = cancelExhaustFuture.getNoThrow(); + + // The original isMaster request has maxAwaitTimeMs = 1000 ms, if the cancel executes before + // the 1000ms then we expect the future to resolve with an error. It should resolve with + // CallbackCanceled unless the socket is already closed, in which case it will resolve with + // HostUnreachable. If the network is slow, the server may response before the cancel + // executes however. + if (!swReply.getStatus().isOK()) { + ASSERT((swReply.getStatus() == ErrorCodes::CallbackCanceled) || + (swReply.getStatus() == ErrorCodes::HostUnreachable)); + } + } } TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) { @@ -323,16 +295,14 @@ TEST(TransportLayerASIO, exhaustIsMasterShouldStopOnFailure) { BSONObj(), nullptr}; - ExhaustRequestHandlerUtil exhaustRequestHandler; - Future<void> exhaustFuture = isMasterHandle->runExhaustCommandRequest( - isMasterRequest, exhaustRequestHandler.getExhaustRequestCallbackFn()); - + Future<executor::RemoteCommandResponse> beginExhaustFuture = + isMasterHandle->beginExhaustCommandRequest(isMasterRequest); { - auto reply = exhaustRequestHandler.getReplyObjectWhenReady(); + auto reply = beginExhaustFuture.get(); - exhaustFuture.get(); ASSERT_OK(reply.status); ASSERT_EQ(reply.data["ok"].Double(), 0.0); + ASSERT(!reply.moreToCome); } } |