diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2016-04-26 16:00:52 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2016-05-02 16:18:54 -0400 |
commit | 7bf7e4ee94def9b1a61d4cd6f1f8a50e315a0324 (patch) | |
tree | 07cc8824c697f3a578b089442a6a71add6888b44 /src | |
parent | 2a7f7fa63e555f4bc0e42e831fbb0b3ab3dac568 (diff) | |
download | mongo-7bf7e4ee94def9b1a61d4cd6f1f8a50e315a0324.tar.gz |
SERVER-23660 Implement scatter gather runner to scan all nodes for highest oplog entry
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/SConscript | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/freshness_scanner.cpp | 125 | ||||
-rw-r--r-- | src/mongo/db/repl/freshness_scanner.h | 117 | ||||
-rw-r--r-- | src/mongo/db/repl/freshness_scanner_test.cpp | 220 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 77 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock_test.cpp | 47 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_common.cpp | 6 |
8 files changed, 569 insertions, 44 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 8550b6560ba..4d0974f587d 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -337,6 +337,7 @@ env.Library('repl_coordinator_impl', 'check_quorum_for_config_change.cpp', 'elect_cmd_runner.cpp', 'freshness_checker.cpp', + 'freshness_scanner.cpp', 'repl_client_info.cpp', 'replica_set_config_checks.cpp', 'replication_coordinator_impl.cpp', @@ -935,3 +936,17 @@ env.CppUnitTest( ], ) +env.CppUnitTest( + target='freshness_scanner_test', + source=[ + 'freshness_scanner_test.cpp', + ], + LIBDEPS=[ + 'repl_coordinator_impl', + 'replica_set_messages', + 'replmocks', + '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', + '$BUILD_DIR/mongo/db/commands_test_crutch', + '$BUILD_DIR/mongo/db/service_context_noop_init', + ], +) diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp new file mode 100644 index 00000000000..ed4f160ee8f --- /dev/null +++ b/src/mongo/db/repl/freshness_scanner.cpp @@ -0,0 +1,125 @@ +/** + * Copyright 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/freshness_scanner.h" + +#include "mongo/base/status.h" +#include "mongo/db/repl/bson_extract_optime.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/scatter_gather_runner.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { + +using executor::RemoteCommandRequest; + +FreshnessScanner::Algorithm::Algorithm(const ReplicaSetConfig& rsConfig, + int myIndex, + Milliseconds timeout) + : _rsConfig(rsConfig), _myIndex(myIndex), _timeout(timeout) { + for (int index = 0; index < _rsConfig.getNumMembers(); index++) { + if (index != _myIndex) { + _targets.push_back(_rsConfig.getMemberAt(index).getHostAndPort()); + } + } + _totalRequests = _targets.size(); +} + +std::vector<RemoteCommandRequest> FreshnessScanner::Algorithm::getRequests() const { + BSONObjBuilder cmdBuilder; + cmdBuilder << "replSetGetStatus" << 1; + const BSONObj getStatusCmd = cmdBuilder.obj(); + + std::vector<RemoteCommandRequest> 
requests; + for (auto& target : _targets) { + requests.push_back(RemoteCommandRequest(target, "admin", getStatusCmd, _timeout)); + } + return requests; +} + +void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& request, + const ResponseStatus& response) { + _responsesProcessed++; + if (!response.isOK()) { // failed response + LOG(2) << "FreshnessScanner: Got failed response from " << request.target << ": " + << response.getStatus(); + } else { + BSONObj opTimesObj = response.getValue().data.getObjectField("optimes"); + OpTime lastOpTime; + Status status = bsonExtractOpTimeField(opTimesObj, "appliedOpTime", &lastOpTime); + if (!status.isOK()) { + return; + } + + int index = _rsConfig.findMemberIndexByHostAndPort(request.target); + FreshnessInfo freshnessInfo{index, lastOpTime}; + + auto cmp = + [](const FreshnessInfo& a, const FreshnessInfo& b) { return a.opTime > b.opTime; }; + auto iter = + std::upper_bound(_freshnessInfos.begin(), _freshnessInfos.end(), freshnessInfo, cmp); + _freshnessInfos.insert(iter, freshnessInfo); + } +} + +bool FreshnessScanner::Algorithm::hasReceivedSufficientResponses() const { + return _responsesProcessed == _totalRequests; +} + +FreshnessScanner::Result FreshnessScanner::Algorithm::getResult() const { + invariant(hasReceivedSufficientResponses()); + return _freshnessInfos; +} + +StatusWith<ReplicationExecutor::EventHandle> FreshnessScanner::start( + ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + int myIndex, + Milliseconds timeout) { + _algorithm.reset(new Algorithm(rsConfig, myIndex, timeout)); + _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor)); + return _runner->start(); +} + +void FreshnessScanner::cancel() { + _runner->cancel(); +} + +FreshnessScanner::Result FreshnessScanner::getResult() const { + return _algorithm->getResult(); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/freshness_scanner.h b/src/mongo/db/repl/freshness_scanner.h 
new file mode 100644 index 00000000000..af71f5912e1 --- /dev/null +++ b/src/mongo/db/repl/freshness_scanner.h @@ -0,0 +1,117 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include <memory> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/bson/timestamp.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replica_set_config.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/repl/scatter_gather_algorithm.h" +#include "mongo/db/repl/scatter_gather_runner.h" +#include "mongo/stdx/functional.h" + +namespace mongo { + +class Status; + +namespace repl { + +class ScatterGatherRunner; + +class FreshnessScanner { + MONGO_DISALLOW_COPYING(FreshnessScanner); + +public: + struct FreshnessInfo { + // The index of the node in the ReplicaSetConfig. + int index; + // The latest applied opTime reported by that node. + OpTime opTime; + }; + + using Result = std::vector<FreshnessInfo>; + + class Algorithm : public ScatterGatherAlgorithm { + public: + Algorithm(const ReplicaSetConfig& rsConfig, int myIndex, Milliseconds timeout); + virtual std::vector<executor::RemoteCommandRequest> getRequests() const; + virtual void processResponse(const executor::RemoteCommandRequest& request, + const ResponseStatus& response); + virtual bool hasReceivedSufficientResponses() const; + + /** + * Returns the nodes that reported an applied opTime, sorted in descending opTime order. + * + * It is invalid to call this before hasReceivedSufficientResponses returns true. + */ + Result getResult() const; + + private: + const ReplicaSetConfig _rsConfig; + std::vector<HostAndPort> _targets; + const int _myIndex; + const Milliseconds _timeout; + Result _freshnessInfos; + int _responsesProcessed = 0; + int _totalRequests = 0; + }; + + FreshnessScanner() = default; + virtual ~FreshnessScanner() = default; + + /** + * Begins the process of sending replSetGetStatus commands to every node in rsConfig + * other than the one at myIndex, in an attempt to find the most up-to-date oplog. + * + * The returned event handle can be used to schedule a callback when the process is complete. + * If this function returns successfully, that event is then guaranteed to be signaled. 
+ **/ + StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor, + const ReplicaSetConfig& rsConfig, + int myIndex, + Milliseconds timeout); + + /** + * Informs the FreshnessScanner to cancel further processing. + */ + void cancel(); + + Result getResult() const; + +private: + std::unique_ptr<Algorithm> _algorithm; + std::unique_ptr<ScatterGatherRunner> _runner; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/freshness_scanner_test.cpp b/src/mongo/db/repl/freshness_scanner_test.cpp new file mode 100644 index 00000000000..5096b4ce9f2 --- /dev/null +++ b/src/mongo/db/repl/freshness_scanner_test.cpp @@ -0,0 +1,220 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. 
If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/base/status.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/freshness_scanner.h" +#include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" +#include "mongo/stdx/thread.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace repl { +namespace { + +using executor::NetworkInterfaceMock; +using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; +using unittest::assertGet; + +class FreshnessScannerTest : public mongo::unittest::Test { +public: + NetworkInterfaceMock* getNet() { + return _net; + } + ReplicationExecutor* getExecutor() { + return _executor.get(); + } + + virtual void setUp() { + ASSERT_OK( + _config.initialize(BSON("_id" + << "rs0" + << "version" << 1 << "members" + << BSON_ARRAY( + BSON("_id" << 0 << "host" + << "host0") + << BSON("_id" << 1 << "host" + << "host1") << BSON("_id" << 2 << "host" + << "host2") + << BSON("_id" << 3 << "host" + << "host3" + << "votes" << 0 << "priority" << 0) + << BSON("_id" << 4 << "host" + << "host4" + << "votes" << 0 << "priority" << 0))))); + ASSERT_OK(_config.validate()); + + _net = new NetworkInterfaceMock; + _executor = stdx::make_unique<ReplicationExecutor>(_net, 1 /* prng seed */); + _executorThread = + stdx::make_unique<stdx::thread>(stdx::bind(&ReplicationExecutor::run, _executor.get())); + } + + virtual void tearDown() { + _executor->shutdown(); + _executorThread->join(); + } + +protected: + RemoteCommandRequest requestFrom(std::string hostname) { + return RemoteCommandRequest(HostAndPort(hostname), + "", // fields do not matter in 
FreshnessScanner + BSONObj(), + Milliseconds(0)); + } + + ResponseStatus makeResponseStatus(BSONObj response) { + return ResponseStatus( + NetworkInterfaceMock::Response(response, BSONObj(), Milliseconds(10))); + } + + ResponseStatus badResponseStatus() { + return ResponseStatus(ErrorCodes::NodeNotFound, "not on my watch"); + } + + ResponseStatus goodResponseStatus(Timestamp timestamp, long long term) { + // OpTime part of replSetGetStatus. + BSONObj response = + BSON("optimes" << BSON("appliedOpTime" << OpTime(timestamp, term).toBSON())); + return makeResponseStatus(response); + } + + ReplicaSetConfig _config; + +private: + // owned by _executor + NetworkInterfaceMock* _net; + std::unique_ptr<ReplicationExecutor> _executor; + std::unique_ptr<stdx::thread> _executorThread; +}; + +TEST_F(FreshnessScannerTest, ImmediateGoodResponse) { + FreshnessScanner::Algorithm algo(_config, 0, Milliseconds(2000)); + + ASSERT_FALSE(algo.hasReceivedSufficientResponses()); + algo.processResponse(requestFrom("host1"), goodResponseStatus(Timestamp(1, 100), 1)); + ASSERT_FALSE(algo.hasReceivedSufficientResponses()); + algo.processResponse(requestFrom("host2"), goodResponseStatus(Timestamp(1, 200), 1)); + ASSERT_FALSE(algo.hasReceivedSufficientResponses()); + algo.processResponse(requestFrom("host3"), goodResponseStatus(Timestamp(1, 400), 1)); + ASSERT_FALSE(algo.hasReceivedSufficientResponses()); + algo.processResponse(requestFrom("host4"), goodResponseStatus(Timestamp(1, 300), 1)); + ASSERT_TRUE(algo.hasReceivedSufficientResponses()); + ASSERT_EQUALS((size_t)4, algo.getResult().size()); + ASSERT_EQUALS(3, algo.getResult().front().index); + ASSERT_EQUALS(OpTime(Timestamp(1, 400), 1), algo.getResult().front().opTime); + ASSERT_EQUALS(1, algo.getResult().back().index); + ASSERT_EQUALS(OpTime(Timestamp(1, 100), 1), algo.getResult().back().opTime); +} + +TEST_F(FreshnessScannerTest, ImmediateBadResponse) { + FreshnessScanner::Algorithm algo(_config, 0, Milliseconds(2000)); + + // Cannot 
access host 1 and host 2. + ASSERT_FALSE(algo.hasReceivedSufficientResponses()); + algo.processResponse(requestFrom("host1"), badResponseStatus()); + ASSERT_FALSE(algo.hasReceivedSufficientResponses()); + algo.processResponse(requestFrom("host2"), badResponseStatus()); + ASSERT_FALSE(algo.hasReceivedSufficientResponses()); + + // host 3 is in an old version, which doesn't include OpTimes in the response. + algo.processResponse(requestFrom("host3"), makeResponseStatus(BSONObj())); + ASSERT_FALSE(algo.hasReceivedSufficientResponses()); + + // Responses from host 4 in PV0 are considered as bad responses. + auto response4 = BSON("optimes" << BSON("appliedOpTime" << Timestamp(1, 300))); + algo.processResponse(requestFrom("host4"), makeResponseStatus(response4)); + ASSERT_TRUE(algo.hasReceivedSufficientResponses()); + ASSERT_EQUALS((size_t)0, algo.getResult().size()); +} + +TEST_F(FreshnessScannerTest, AllResponsesTimeout) { + Milliseconds timeout(2000); + FreshnessScanner scanner; + scanner.start(getExecutor(), _config, 0, timeout); + + auto net = getNet(); + net->enterNetwork(); + ASSERT(net->hasReadyRequests()); + // Black hole all requests. + while (net->hasReadyRequests()) { + net->blackHole(net->getNextReadyRequest()); + } + auto later = net->now() + Milliseconds(2010); + ASSERT_EQ(later, net->runUntil(later)); + net->exitNetwork(); + ASSERT_EQUALS((size_t)0, scanner.getResult().size()); +} + + +TEST_F(FreshnessScannerTest, BadResponsesAndTimeout) { + Milliseconds timeout(2000); + FreshnessScanner scanner; + scanner.start(getExecutor(), _config, 0, timeout); + + auto net = getNet(); + net->enterNetwork(); + + Date_t later = net->now() + Milliseconds(10); + // host 1 returns good response. + ASSERT(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + HostAndPort successfulHost = noi->getRequest().target; + net->scheduleResponse(noi, later, goodResponseStatus(Timestamp(1, 100), 1)); + + // host 2 has a bad connection. 
+ ASSERT(net->hasReadyRequests()); + net->scheduleResponse(net->getNextReadyRequest(), later, badResponseStatus()); + + // host 3 and 4 time out. + ASSERT(net->hasReadyRequests()); + net->blackHole(net->getNextReadyRequest()); + ASSERT(net->hasReadyRequests()); + net->blackHole(net->getNextReadyRequest()); + + // Advance the clock. + ASSERT(!net->hasReadyRequests()); + getNet()->runUntil(getNet()->now() + Milliseconds(2010)); + getNet()->exitNetwork(); + + ASSERT_EQUALS((size_t)1, scanner.getResult().size()); + auto freshnessInfo = scanner.getResult().front(); + ASSERT_EQUALS(_config.findMemberIndexByHostAndPort(successfulHost), freshnessInfo.index); + ASSERT_EQUALS(OpTime(Timestamp(1, 100), 1), freshnessInfo.opTime); +} + +} // namespace +} // namespace repl +} // namespace mongo diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 341b2b05afb..054693178d2 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -44,6 +44,9 @@ namespace mongo { namespace executor { +using CallbackHandle = TaskExecutor::CallbackHandle; +using ResponseStatus = TaskExecutor::ResponseStatus; + NetworkInterfaceMock::NetworkInterfaceMock() : _waitingToRunMask(0), _currentlyRunning(kNoThread), @@ -75,7 +78,7 @@ std::string NetworkInterfaceMock::getHostName() { return "thisisourhostname"; } -Status NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHandle, +Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle, const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { if (inShutdown()) { @@ -109,37 +112,25 @@ void NetworkInterfaceMock::setHandshakeReplyForHost( } } -static bool findAndCancelIf( - const stdx::function<bool(const NetworkInterfaceMock::NetworkOperation&)>& matchFn, - NetworkInterfaceMock::NetworkOperationList* other, - NetworkInterfaceMock::NetworkOperationList* scheduled, - const Date_t now) { - 
const NetworkInterfaceMock::NetworkOperationIterator noi = - std::find_if(other->begin(), other->end(), matchFn); - if (noi == other->end()) { - return false; - } - scheduled->splice(scheduled->begin(), *other, noi); - noi->setResponse( - now, - TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Network operation canceled")); - return true; -} - -void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { +void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) { invariant(!inShutdown()); stdx::lock_guard<stdx::mutex> lk(_mutex); - stdx::function<bool(const NetworkOperation&)> matchesHandle = - stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle); - const Date_t now = _now_inlock(); - if (findAndCancelIf(matchesHandle, &_unscheduled, &_scheduled, now)) { - return; - } - if (findAndCancelIf(matchesHandle, &_blackHoled, &_scheduled, now)) { - return; - } - if (findAndCancelIf(matchesHandle, &_scheduled, &_scheduled, now)) { + ResponseStatus response(ErrorCodes::CallbackCanceled, "Network operation canceled"); + _cancelCommand_inlock(cbHandle, response); +} + + +void NetworkInterfaceMock::_cancelCommand_inlock(const CallbackHandle& cbHandle, + const ResponseStatus& response) { + auto matchFn = stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle); + for (auto list : {&_unscheduled, &_blackHoled, &_scheduled}) { + auto noi = std::find_if(list->begin(), list->end(), matchFn); + if (noi == list->end()) { + continue; + } + _scheduled.splice(_scheduled.begin(), *list, noi); + noi->setResponse(_now_inlock(), response); return; } // No not-in-progress network command matched cbHandle. Oh, well. @@ -191,9 +182,8 @@ void NetworkInterfaceMock::shutdown() { _waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling. 
lk.unlock(); for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) { - iter->setResponse(now, - TaskExecutor::ResponseStatus(ErrorCodes::ShutdownInProgress, - "Shutting down mock network")); + iter->setResponse( + now, ResponseStatus(ErrorCodes::ShutdownInProgress, "Shutting down mock network")); iter->finishResponse(); } lk.lock(); @@ -264,7 +254,7 @@ NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getFrontOfU void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi, Date_t when, - const TaskExecutor::ResponseStatus& response) { + const ResponseStatus& response) { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(_currentlyRunning == kNetworkThread); NetworkOperationIterator insertBefore = _scheduled.begin(); @@ -363,6 +353,18 @@ void NetworkInterfaceMock::_enqueueOperation_inlock( }); + // Read what the timeout alarm needs before 'op' is moved from by the emplace below. + const auto timeout = op.getRequest().timeout; + const auto cbHandle = op.getCallbackHandle(); _unscheduled.emplace(insertBefore, std::move(op)); + + // A request with a timeout gets an alarm that cancels it with NetworkTimeout. + if (timeout != RemoteCommandRequest::kNoTimeout) { + invariant(timeout >= Milliseconds(0)); + ResponseStatus response(ErrorCodes::NetworkTimeout, "Network timeout"); + auto action = stdx::bind( + &NetworkInterfaceMock::_cancelCommand_inlock, this, cbHandle, response); + _alarms.emplace(_now_inlock() + timeout, action); + } } void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort& target, @@ -528,11 +530,10 @@ NetworkInterfaceMock::NetworkOperation::NetworkOperation() _response(kUnsetResponse), _onFinish() {} -NetworkInterfaceMock::NetworkOperation::NetworkOperation( - const TaskExecutor::CallbackHandle& cbHandle, - const RemoteCommandRequest& theRequest, - Date_t theRequestDate, - const RemoteCommandCompletionFn& onFinish) +NetworkInterfaceMock::NetworkOperation::NetworkOperation(const CallbackHandle& cbHandle, + const RemoteCommandRequest& theRequest, + Date_t theRequestDate, + const RemoteCommandCompletionFn& onFinish) : _requestDate(theRequestDate), 
_nextConsiderationDate(theRequestDate), _responseDate(), @@ -549,8 +546,8 @@ void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate( _nextConsiderationDate = nextConsiderationDate; } -void NetworkInterfaceMock::NetworkOperation::setResponse( - Date_t responseDate, const TaskExecutor::ResponseStatus& response) { +void NetworkInterfaceMock::NetworkOperation::setResponse(Date_t responseDate, + const ResponseStatus& response) { invariant(responseDate >= _requestDate); _responseDate = responseDate; _response = response; diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 787d6415817..3d69bad78a5 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -202,6 +202,12 @@ public: */ void setHandshakeReplyForHost(const HostAndPort& host, RemoteCommandResponse&& reply); + /** + * Cancel a command with specified response, e.g. NetworkTimeout or CallbackCanceled errors. + */ + void _cancelCommand_inlock(const TaskExecutor::CallbackHandle& cbHandle, + const TaskExecutor::ResponseStatus& response); + private: /** * Information describing a scheduled alarm. 
diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp index a5bb39434b1..4e1e3f45a35 100644 --- a/src/mongo/executor/network_interface_mock_test.cpp +++ b/src/mongo/executor/network_interface_mock_test.cpp @@ -396,6 +396,55 @@ TEST_F(NetworkInterfaceMockTest, SetAlarmReturnsNotOKIfShutdownHasStarted) { ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), [] {})); } +TEST_F(NetworkInterfaceMockTest, CommandTimeout) { + startNetwork(); + + TaskExecutor::CallbackHandle cb; + RemoteCommandRequest request; + request.timeout = Milliseconds(2000); + + ErrorCodes::Error statusPropagated = ErrorCodes::OK; + auto finishFn = + [&](StatusWith<RemoteCommandResponse> resp) { statusPropagated = resp.getStatus().code(); }; + + // + // Command times out. + // + ASSERT_OK(net().startCommand(cb, request, finishFn)); + net().enterNetwork(); + ASSERT(net().hasReadyRequests()); + net().blackHole(net().getNextReadyRequest()); + net().runUntil(net().now() + Milliseconds(2010)); + net().exitNetwork(); + ASSERT_NOT_EQUALS(ErrorCodes::OK, statusPropagated); + + // + // Command finishes before timeout. + // + Date_t start = net().now(); + // Reset to a sentinel so the checks below can tell whether finishFn ran for this command. + statusPropagated = ErrorCodes::InternalError; + + ASSERT_OK(net().startCommand(cb, request, finishFn)); + net().enterNetwork(); + // Consume the request. We'll schedule a successful response later. + ASSERT(net().hasReadyRequests()); + auto noi = net().getNextReadyRequest(); + + // Assert the command has neither timed out nor completed after 1000ms. + net().runUntil(start + Milliseconds(1000)); + ASSERT_EQUALS(start + Milliseconds(1000), net().now()); + ASSERT_EQUALS(ErrorCodes::InternalError, statusPropagated); + // Reply with a successful response. 
+ StatusWith<RemoteCommandResponse> responseStatus(RemoteCommandResponse{}); + net().scheduleResponse(noi, net().now(), responseStatus); + net().runReadyNetworkOperations(); + net().exitNetwork(); + ASSERT_EQUALS(ErrorCodes::OK, statusPropagated); + ASSERT_EQUALS(start + Milliseconds(1000), net().now()); +} + + } // namespace } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index d57f2e15434..f427bdd40b4 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -439,14 +439,12 @@ COMMON_EXECUTOR_TEST(RemoteCommandWithTimeout) { const Date_t startTime = net->now(); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); net->scheduleResponse( - noi, - startTime + Milliseconds(2), - TaskExecutor::ResponseStatus(ErrorCodes::ExceededTimeLimit, "I took too long")); + noi, startTime + Milliseconds(2), TaskExecutor::ResponseStatus(RemoteCommandResponse{})); net->runUntil(startTime + Milliseconds(2)); ASSERT_EQUALS(startTime + Milliseconds(2), net->now()); net->exitNetwork(); executor.wait(cbHandle); - ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status); + ASSERT_EQUALS(ErrorCodes::NetworkTimeout, status); } COMMON_EXECUTOR_TEST(CallbackHandleComparison) { |