summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/SConscript15
-rw-r--r--src/mongo/db/repl/freshness_scanner.cpp125
-rw-r--r--src/mongo/db/repl/freshness_scanner.h117
-rw-r--r--src/mongo/db/repl/freshness_scanner_test.cpp220
-rw-r--r--src/mongo/executor/network_interface_mock.cpp77
-rw-r--r--src/mongo/executor/network_interface_mock.h6
-rw-r--r--src/mongo/executor/network_interface_mock_test.cpp47
-rw-r--r--src/mongo/executor/task_executor_test_common.cpp6
8 files changed, 569 insertions, 44 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 8550b6560ba..4d0974f587d 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -337,6 +337,7 @@ env.Library('repl_coordinator_impl',
'check_quorum_for_config_change.cpp',
'elect_cmd_runner.cpp',
'freshness_checker.cpp',
+ 'freshness_scanner.cpp',
'repl_client_info.cpp',
'replica_set_config_checks.cpp',
'replication_coordinator_impl.cpp',
@@ -935,3 +936,17 @@ env.CppUnitTest(
],
)
+env.CppUnitTest(
+ target='freshness_scanner_test',
+ source=[
+ 'freshness_scanner_test.cpp',
+ ],
+ LIBDEPS=[
+ 'repl_coordinator_impl',
+ 'replica_set_messages',
+ 'replmocks',
+ '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
+ '$BUILD_DIR/mongo/db/commands_test_crutch',
+ '$BUILD_DIR/mongo/db/service_context_noop_init',
+ ],
+)
diff --git a/src/mongo/db/repl/freshness_scanner.cpp b/src/mongo/db/repl/freshness_scanner.cpp
new file mode 100644
index 00000000000..ed4f160ee8f
--- /dev/null
+++ b/src/mongo/db/repl/freshness_scanner.cpp
@@ -0,0 +1,125 @@
+/**
+ * Copyright 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/freshness_scanner.h"
+
+#include "mongo/base/status.h"
+#include "mongo/db/repl/bson_extract_optime.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/scatter_gather_runner.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace repl {
+
+using executor::RemoteCommandRequest;
+
+FreshnessScanner::Algorithm::Algorithm(const ReplicaSetConfig& rsConfig,
+ int myIndex,
+ Milliseconds timeout)
+ : _rsConfig(rsConfig), _myIndex(myIndex), _timeout(timeout) {
+ for (int index = 0; index < _rsConfig.getNumMembers(); index++) {
+ if (index != _myIndex) {
+ _targets.push_back(_rsConfig.getMemberAt(index).getHostAndPort());
+ }
+ }
+ _totalRequests = _targets.size();
+}
+
+std::vector<RemoteCommandRequest> FreshnessScanner::Algorithm::getRequests() const {
+ BSONObjBuilder cmdBuilder;
+ cmdBuilder << "replSetGetStatus" << 1;
+ const BSONObj getStatusCmd = cmdBuilder.obj();
+
+ std::vector<RemoteCommandRequest> requests;
+ for (auto& target : _targets) {
+ requests.push_back(RemoteCommandRequest(target, "admin", getStatusCmd, _timeout));
+ }
+ return requests;
+}
+
+void FreshnessScanner::Algorithm::processResponse(const RemoteCommandRequest& request,
+ const ResponseStatus& response) {
+ _responsesProcessed++;
+ if (!response.isOK()) { // failed response
+ LOG(2) << "FreshnessScanner: Got failed response from " << request.target << ": "
+ << response.getStatus();
+ } else {
+ BSONObj opTimesObj = response.getValue().data.getObjectField("optimes");
+ OpTime lastOpTime;
+ Status status = bsonExtractOpTimeField(opTimesObj, "appliedOpTime", &lastOpTime);
+ if (!status.isOK()) {
+ return;
+ }
+
+ int index = _rsConfig.findMemberIndexByHostAndPort(request.target);
+ FreshnessInfo freshnessInfo{index, lastOpTime};
+
+ auto cmp =
+ [](const FreshnessInfo& a, const FreshnessInfo& b) { return a.opTime > b.opTime; };
+ auto iter =
+ std::upper_bound(_freshnessInfos.begin(), _freshnessInfos.end(), freshnessInfo, cmp);
+ _freshnessInfos.insert(iter, freshnessInfo);
+ }
+}
+
+bool FreshnessScanner::Algorithm::hasReceivedSufficientResponses() const {
+ return _responsesProcessed == _totalRequests;
+}
+
+FreshnessScanner::Result FreshnessScanner::Algorithm::getResult() const {
+ invariant(hasReceivedSufficientResponses());
+ return _freshnessInfos;
+}
+
+StatusWith<ReplicationExecutor::EventHandle> FreshnessScanner::start(
+ ReplicationExecutor* executor,
+ const ReplicaSetConfig& rsConfig,
+ int myIndex,
+ Milliseconds timeout) {
+ _algorithm.reset(new Algorithm(rsConfig, myIndex, timeout));
+ _runner.reset(new ScatterGatherRunner(_algorithm.get(), executor));
+ return _runner->start();
+}
+
+void FreshnessScanner::cancel() {
+ _runner->cancel();
+}
+
+FreshnessScanner::Result FreshnessScanner::getResult() const {
+ return _algorithm->getResult();
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/freshness_scanner.h b/src/mongo/db/repl/freshness_scanner.h
new file mode 100644
index 00000000000..af71f5912e1
--- /dev/null
+++ b/src/mongo/db/repl/freshness_scanner.h
@@ -0,0 +1,117 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replica_set_config.h"
+#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/repl/scatter_gather_algorithm.h"
+#include "mongo/db/repl/scatter_gather_runner.h"
+#include "mongo/stdx/functional.h"
+
+namespace mongo {
+
+class Status;
+
+namespace repl {
+
+class ScatterGatherRunner;
+
+class FreshnessScanner {
+ MONGO_DISALLOW_COPYING(FreshnessScanner);
+
+public:
+ struct FreshnessInfo {
+ // The index of node in ReplicaSetConfig.
+ int index;
+ // The latest applied opTime on that node.
+ OpTime opTime;
+ };
+
+ using Result = std::vector<FreshnessInfo>;
+
+ class Algorithm : public ScatterGatherAlgorithm {
+ public:
+ Algorithm(const ReplicaSetConfig& rsConfig, int myIndex, Milliseconds timeout);
+ virtual std::vector<executor::RemoteCommandRequest> getRequests() const;
+ virtual void processResponse(const executor::RemoteCommandRequest& request,
+ const ResponseStatus& response);
+ virtual bool hasReceivedSufficientResponses() const;
+
+ /**
+ * Returns a sorted list of nodes in descending lastAppliedOptime order.
+ *
+ * It is invalid to call this before hasReceivedSufficientResponses returns true.
+ */
+ Result getResult() const;
+
+ private:
+ const ReplicaSetConfig _rsConfig;
+ std::vector<HostAndPort> _targets;
+ const int _myIndex;
+ const Milliseconds _timeout;
+ Result _freshnessInfos;
+ int _responsesProcessed = 0;
+ int _totalRequests = 0;
+ };
+
+ FreshnessScanner() = default;
+ virtual ~FreshnessScanner() = default;
+
+ /**
+ * Begins the process of sending replSetGetFreshness commands to all nodes
+ * in currentConfig, in attempt to find the most up-to-date oplog.
+ *
+ * evh can be used to schedule a callback when the process is complete.
+ * If this function returns Status::OK(), evh is then guaranteed to be signaled.
+ **/
+ StatusWith<ReplicationExecutor::EventHandle> start(ReplicationExecutor* executor,
+ const ReplicaSetConfig& rsConfig,
+ int myIndex,
+ Milliseconds timeout);
+
+ /**
+ * Informs the FreshnessScanner to cancel further processing.
+ */
+ void cancel();
+
+ Result getResult() const;
+
+private:
+ std::unique_ptr<Algorithm> _algorithm;
+ std::unique_ptr<ScatterGatherRunner> _runner;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/freshness_scanner_test.cpp b/src/mongo/db/repl/freshness_scanner_test.cpp
new file mode 100644
index 00000000000..5096b4ce9f2
--- /dev/null
+++ b/src/mongo/db/repl/freshness_scanner_test.cpp
@@ -0,0 +1,220 @@
+/**
+ * Copyright (C) 2016 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/base/status.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/freshness_scanner.h"
+#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/replication_executor.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace repl {
+namespace {
+
+using executor::NetworkInterfaceMock;
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+using unittest::assertGet;
+
+class FreshnessScannerTest : public mongo::unittest::Test {
+public:
+ NetworkInterfaceMock* getNet() {
+ return _net;
+ }
+ ReplicationExecutor* getExecutor() {
+ return _executor.get();
+ }
+
+ virtual void setUp() {
+ ASSERT_OK(
+ _config.initialize(BSON("_id"
+ << "rs0"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(
+ BSON("_id" << 0 << "host"
+ << "host0")
+ << BSON("_id" << 1 << "host"
+ << "host1") << BSON("_id" << 2 << "host"
+ << "host2")
+ << BSON("_id" << 3 << "host"
+ << "host3"
+ << "votes" << 0 << "priority" << 0)
+ << BSON("_id" << 4 << "host"
+ << "host4"
+ << "votes" << 0 << "priority" << 0)))));
+ ASSERT_OK(_config.validate());
+
+ _net = new NetworkInterfaceMock;
+ _executor = stdx::make_unique<ReplicationExecutor>(_net, 1 /* prng seed */);
+ _executorThread =
+ stdx::make_unique<stdx::thread>(stdx::bind(&ReplicationExecutor::run, _executor.get()));
+ }
+
+ virtual void tearDown() {
+ _executor->shutdown();
+ _executorThread->join();
+ }
+
+protected:
+ RemoteCommandRequest requestFrom(std::string hostname) {
+ return RemoteCommandRequest(HostAndPort(hostname),
+ "", // fields do not matter in FreshnessScanner
+ BSONObj(),
+ Milliseconds(0));
+ }
+
+ ResponseStatus makeResponseStatus(BSONObj response) {
+ return ResponseStatus(
+ NetworkInterfaceMock::Response(response, BSONObj(), Milliseconds(10)));
+ }
+
+ ResponseStatus badResponseStatus() {
+ return ResponseStatus(ErrorCodes::NodeNotFound, "not on my watch");
+ }
+
+ ResponseStatus goodResponseStatus(Timestamp timestamp, long long term) {
+ // OpTime part of replSetGetStatus.
+ BSONObj response =
+ BSON("optimes" << BSON("appliedOpTime" << OpTime(timestamp, term).toBSON()));
+ return makeResponseStatus(response);
+ }
+
+ ReplicaSetConfig _config;
+
+private:
+ // owned by _executor
+ NetworkInterfaceMock* _net;
+ std::unique_ptr<ReplicationExecutor> _executor;
+ std::unique_ptr<stdx::thread> _executorThread;
+};
+
+TEST_F(FreshnessScannerTest, ImmediateGoodResponse) {
+ FreshnessScanner::Algorithm algo(_config, 0, Milliseconds(2000));
+
+ ASSERT_FALSE(algo.hasReceivedSufficientResponses());
+ algo.processResponse(requestFrom("host1"), goodResponseStatus(Timestamp(1, 100), 1));
+ ASSERT_FALSE(algo.hasReceivedSufficientResponses());
+ algo.processResponse(requestFrom("host2"), goodResponseStatus(Timestamp(1, 200), 1));
+ ASSERT_FALSE(algo.hasReceivedSufficientResponses());
+ algo.processResponse(requestFrom("host3"), goodResponseStatus(Timestamp(1, 400), 1));
+ ASSERT_FALSE(algo.hasReceivedSufficientResponses());
+ algo.processResponse(requestFrom("host4"), goodResponseStatus(Timestamp(1, 300), 1));
+ ASSERT_TRUE(algo.hasReceivedSufficientResponses());
+ ASSERT_EQUALS((size_t)4, algo.getResult().size());
+ ASSERT_EQUALS(3, algo.getResult().front().index);
+ ASSERT_EQUALS(OpTime(Timestamp(1, 400), 1), algo.getResult().front().opTime);
+ ASSERT_EQUALS(1, algo.getResult().back().index);
+ ASSERT_EQUALS(OpTime(Timestamp(1, 100), 1), algo.getResult().back().opTime);
+}
+
+TEST_F(FreshnessScannerTest, ImmediateBadResponse) {
+ FreshnessScanner::Algorithm algo(_config, 0, Milliseconds(2000));
+
+ // Cannot access host 1 and host 2.
+ ASSERT_FALSE(algo.hasReceivedSufficientResponses());
+ algo.processResponse(requestFrom("host1"), badResponseStatus());
+ ASSERT_FALSE(algo.hasReceivedSufficientResponses());
+ algo.processResponse(requestFrom("host2"), badResponseStatus());
+ ASSERT_FALSE(algo.hasReceivedSufficientResponses());
+
+ // host 3 is in an old version, which doesn't include OpTimes in the response.
+ algo.processResponse(requestFrom("host3"), makeResponseStatus(BSONObj()));
+ ASSERT_FALSE(algo.hasReceivedSufficientResponses());
+
+ // Responses from host 4 in PV0 are considered as bad responses.
+ auto response4 = BSON("optimes" << BSON("appliedOpTime" << Timestamp(1, 300)));
+ algo.processResponse(requestFrom("host4"), makeResponseStatus(response4));
+ ASSERT_TRUE(algo.hasReceivedSufficientResponses());
+ ASSERT_EQUALS((size_t)0, algo.getResult().size());
+}
+
+TEST_F(FreshnessScannerTest, AllResponsesTimeout) {
+ Milliseconds timeout(2000);
+ FreshnessScanner scanner;
+ scanner.start(getExecutor(), _config, 0, timeout);
+
+ auto net = getNet();
+ net->enterNetwork();
+ ASSERT(net->hasReadyRequests());
+ // Black hole all requests.
+ while (net->hasReadyRequests()) {
+ net->blackHole(net->getNextReadyRequest());
+ }
+ auto later = net->now() + Milliseconds(2010);
+ ASSERT_EQ(later, net->runUntil(later));
+ net->exitNetwork();
+ ASSERT_EQUALS((size_t)0, scanner.getResult().size());
+}
+
+
+TEST_F(FreshnessScannerTest, BadResponsesAndTimeout) {
+ Milliseconds timeout(2000);
+ FreshnessScanner scanner;
+ scanner.start(getExecutor(), _config, 0, timeout);
+
+ auto net = getNet();
+ net->enterNetwork();
+
+ Date_t later = net->now() + Milliseconds(10);
+ // host 1 returns good response.
+ ASSERT(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ HostAndPort successfulHost = noi->getRequest().target;
+ net->scheduleResponse(noi, later, goodResponseStatus(Timestamp(1, 100), 1));
+
+ // host 2 has a bad connection.
+ ASSERT(net->hasReadyRequests());
+ net->scheduleResponse(net->getNextReadyRequest(), later, badResponseStatus());
+
+ // host 3 and 4 time out.
+ ASSERT(net->hasReadyRequests());
+ net->blackHole(net->getNextReadyRequest());
+ ASSERT(net->hasReadyRequests());
+ net->blackHole(net->getNextReadyRequest());
+
+ // Advance the clock.
+ ASSERT(!net->hasReadyRequests());
+ getNet()->runUntil(getNet()->now() + Milliseconds(2010));
+ getNet()->exitNetwork();
+
+ ASSERT_EQUALS((size_t)1, scanner.getResult().size());
+ auto freshnessInfo = scanner.getResult().front();
+ ASSERT_EQUALS(_config.findMemberIndexByHostAndPort(successfulHost), freshnessInfo.index);
+ ASSERT_EQUALS(OpTime(Timestamp(1, 100), 1), freshnessInfo.opTime);
+}
+
+} // namespace
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 341b2b05afb..054693178d2 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -44,6 +44,9 @@
namespace mongo {
namespace executor {
+using CallbackHandle = TaskExecutor::CallbackHandle;
+using ResponseStatus = TaskExecutor::ResponseStatus;
+
NetworkInterfaceMock::NetworkInterfaceMock()
: _waitingToRunMask(0),
_currentlyRunning(kNoThread),
@@ -75,7 +78,7 @@ std::string NetworkInterfaceMock::getHostName() {
return "thisisourhostname";
}
-Status NetworkInterfaceMock::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
+Status NetworkInterfaceMock::startCommand(const CallbackHandle& cbHandle,
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish) {
if (inShutdown()) {
@@ -109,37 +112,25 @@ void NetworkInterfaceMock::setHandshakeReplyForHost(
}
}
-static bool findAndCancelIf(
- const stdx::function<bool(const NetworkInterfaceMock::NetworkOperation&)>& matchFn,
- NetworkInterfaceMock::NetworkOperationList* other,
- NetworkInterfaceMock::NetworkOperationList* scheduled,
- const Date_t now) {
- const NetworkInterfaceMock::NetworkOperationIterator noi =
- std::find_if(other->begin(), other->end(), matchFn);
- if (noi == other->end()) {
- return false;
- }
- scheduled->splice(scheduled->begin(), *other, noi);
- noi->setResponse(
- now,
- TaskExecutor::ResponseStatus(ErrorCodes::CallbackCanceled, "Network operation canceled"));
- return true;
-}
-
-void NetworkInterfaceMock::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceMock::cancelCommand(const CallbackHandle& cbHandle) {
invariant(!inShutdown());
stdx::lock_guard<stdx::mutex> lk(_mutex);
- stdx::function<bool(const NetworkOperation&)> matchesHandle =
- stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle);
- const Date_t now = _now_inlock();
- if (findAndCancelIf(matchesHandle, &_unscheduled, &_scheduled, now)) {
- return;
- }
- if (findAndCancelIf(matchesHandle, &_blackHoled, &_scheduled, now)) {
- return;
- }
- if (findAndCancelIf(matchesHandle, &_scheduled, &_scheduled, now)) {
+ ResponseStatus response(ErrorCodes::CallbackCanceled, "Network operation canceled");
+ _cancelCommand_inlock(cbHandle, response);
+}
+
+
+void NetworkInterfaceMock::_cancelCommand_inlock(const CallbackHandle& cbHandle,
+ const ResponseStatus& response) {
+ auto matchFn = stdx::bind(&NetworkOperation::isForCallback, stdx::placeholders::_1, cbHandle);
+ for (auto list : {&_unscheduled, &_blackHoled, &_scheduled}) {
+ auto noi = std::find_if(list->begin(), list->end(), matchFn);
+ if (noi == list->end()) {
+ continue;
+ }
+ _scheduled.splice(_scheduled.begin(), *list, noi);
+ noi->setResponse(_now_inlock(), response);
return;
}
// No not-in-progress network command matched cbHandle. Oh, well.
@@ -191,9 +182,8 @@ void NetworkInterfaceMock::shutdown() {
_waitingToRunMask |= kExecutorThread; // Prevents network thread from scheduling.
lk.unlock();
for (NetworkOperationIterator iter = todo.begin(); iter != todo.end(); ++iter) {
- iter->setResponse(now,
- TaskExecutor::ResponseStatus(ErrorCodes::ShutdownInProgress,
- "Shutting down mock network"));
+ iter->setResponse(
+ now, ResponseStatus(ErrorCodes::ShutdownInProgress, "Shutting down mock network"));
iter->finishResponse();
}
lk.lock();
@@ -264,7 +254,7 @@ NetworkInterfaceMock::NetworkOperationIterator NetworkInterfaceMock::getFrontOfU
void NetworkInterfaceMock::scheduleResponse(NetworkOperationIterator noi,
Date_t when,
- const TaskExecutor::ResponseStatus& response) {
+ const ResponseStatus& response) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(_currentlyRunning == kNetworkThread);
NetworkOperationIterator insertBefore = _scheduled.begin();
@@ -363,6 +353,14 @@ void NetworkInterfaceMock::_enqueueOperation_inlock(
});
_unscheduled.emplace(insertBefore, std::move(op));
+
+ if (op.getRequest().timeout != RemoteCommandRequest::kNoTimeout) {
+ invariant(op.getRequest().timeout >= Milliseconds(0));
+ ResponseStatus response(ErrorCodes::NetworkTimeout, "Network timeout");
+ auto action = stdx::bind(
+ &NetworkInterfaceMock::_cancelCommand_inlock, this, op.getCallbackHandle(), response);
+ _alarms.emplace(_now_inlock() + op.getRequest().timeout, action);
+ }
}
void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort& target,
@@ -528,11 +526,10 @@ NetworkInterfaceMock::NetworkOperation::NetworkOperation()
_response(kUnsetResponse),
_onFinish() {}
-NetworkInterfaceMock::NetworkOperation::NetworkOperation(
- const TaskExecutor::CallbackHandle& cbHandle,
- const RemoteCommandRequest& theRequest,
- Date_t theRequestDate,
- const RemoteCommandCompletionFn& onFinish)
+NetworkInterfaceMock::NetworkOperation::NetworkOperation(const CallbackHandle& cbHandle,
+ const RemoteCommandRequest& theRequest,
+ Date_t theRequestDate,
+ const RemoteCommandCompletionFn& onFinish)
: _requestDate(theRequestDate),
_nextConsiderationDate(theRequestDate),
_responseDate(),
@@ -549,8 +546,8 @@ void NetworkInterfaceMock::NetworkOperation::setNextConsiderationDate(
_nextConsiderationDate = nextConsiderationDate;
}
-void NetworkInterfaceMock::NetworkOperation::setResponse(
- Date_t responseDate, const TaskExecutor::ResponseStatus& response) {
+void NetworkInterfaceMock::NetworkOperation::setResponse(Date_t responseDate,
+ const ResponseStatus& response) {
invariant(responseDate >= _requestDate);
_responseDate = responseDate;
_response = response;
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 787d6415817..3d69bad78a5 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -202,6 +202,12 @@ public:
*/
void setHandshakeReplyForHost(const HostAndPort& host, RemoteCommandResponse&& reply);
+ /**
+ * Cancel a command with specified response, e.g. NetworkTimeout or CallbackCanceled errors.
+ */
+ void _cancelCommand_inlock(const TaskExecutor::CallbackHandle& cbHandle,
+ const TaskExecutor::ResponseStatus& response);
+
private:
/**
* Information describing a scheduled alarm.
diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp
index a5bb39434b1..4e1e3f45a35 100644
--- a/src/mongo/executor/network_interface_mock_test.cpp
+++ b/src/mongo/executor/network_interface_mock_test.cpp
@@ -396,6 +396,53 @@ TEST_F(NetworkInterfaceMockTest, SetAlarmReturnsNotOKIfShutdownHasStarted) {
ASSERT_NOT_OK(net().setAlarm(net().now() + Milliseconds(100), [] {}));
}
+TEST_F(NetworkInterfaceMockTest, CommandTimeout) {
+ startNetwork();
+
+ TaskExecutor::CallbackHandle cb;
+ RemoteCommandRequest request;
+ request.timeout = Milliseconds(2000);
+
+ ErrorCodes::Error statusPropagated = ErrorCodes::OK;
+ auto finishFn =
+ [&](StatusWith<RemoteCommandResponse> resp) { statusPropagated = resp.getStatus().code(); };
+
+ //
+ // Command times out.
+ //
+ ASSERT_OK(net().startCommand(cb, request, finishFn));
+ net().enterNetwork();
+ ASSERT(net().hasReadyRequests());
+ net().blackHole(net().getNextReadyRequest());
+ net().runUntil(net().now() + Milliseconds(2010));
+ net().exitNetwork();
+ ASSERT_NOT_EQUALS(ErrorCodes::OK, statusPropagated);
+
+ //
+ // Command finishes before timeout.
+ //
+ Date_t start = net().now();
+
+ ASSERT_OK(net().startCommand(cb, request, finishFn));
+ net().enterNetwork();
+ // Consume the request. We'll schedule a successful response later.
+ ASSERT(net().hasReadyRequests());
+ auto noi = net().getNextReadyRequest();
+
+ // Assert the command hasn't timed out after 1000ms.
+ net().runUntil(start + Milliseconds(1000));
+ ASSERT_EQUALS(start + Milliseconds(1000), net().now());
+ ASSERT_NOT_EQUALS(ErrorCodes::OK, statusPropagated);
+ // Reply with a successful response.
+ StatusWith<RemoteCommandResponse> responseStatus(RemoteCommandResponse{});
+ net().scheduleResponse(noi, net().now(), responseStatus);
+ net().runReadyNetworkOperations();
+ net().exitNetwork();
+ ASSERT_EQUALS(ErrorCodes::OK, statusPropagated);
+ ASSERT_EQUALS(start + Milliseconds(1000), net().now());
+}
+
+
} // namespace
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index d57f2e15434..f427bdd40b4 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -439,14 +439,12 @@ COMMON_EXECUTOR_TEST(RemoteCommandWithTimeout) {
const Date_t startTime = net->now();
NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
net->scheduleResponse(
- noi,
- startTime + Milliseconds(2),
- TaskExecutor::ResponseStatus(ErrorCodes::ExceededTimeLimit, "I took too long"));
+ noi, startTime + Milliseconds(2), TaskExecutor::ResponseStatus(RemoteCommandResponse{}));
net->runUntil(startTime + Milliseconds(2));
ASSERT_EQUALS(startTime + Milliseconds(2), net->now());
net->exitNetwork();
executor.wait(cbHandle);
- ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
+ ASSERT_EQUALS(ErrorCodes::NetworkTimeout, status);
}
COMMON_EXECUTOR_TEST(CallbackHandleComparison) {