summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2014-07-02 15:46:35 -0400
committerScott Hernandez <scotthernandez@gmail.com>2014-07-17 16:09:01 -0400
commitae0d61700c0e7f0d6c236743b9605ea97a1fb55d (patch)
tree08e0b4f830af3d736abb5bf1a62a87149e72cb4b /src
parent48e603792832e6d4914fdb1285c16ea745390c46 (diff)
downloadmongo-ae0d61700c0e7f0d6c236743b9605ea97a1fb55d.tar.gz
SERVER-14250: add timeout to replication executor
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/network_interface_impl.cpp28
-rw-r--r--src/mongo/db/repl/network_interface_mock.cpp32
-rw-r--r--src/mongo/db/repl/network_interface_mock.h12
-rw-r--r--src/mongo/db/repl/replication_executor.cpp9
-rw-r--r--src/mongo/db/repl/replication_executor.h7
-rw-r--r--src/mongo/db/repl/replication_executor_test.cpp25
6 files changed, 109 insertions, 4 deletions
diff --git a/src/mongo/db/repl/network_interface_impl.cpp b/src/mongo/db/repl/network_interface_impl.cpp
index 2509d2d5dfa..ef14ea11b97 100644
--- a/src/mongo/db/repl/network_interface_impl.cpp
+++ b/src/mongo/db/repl/network_interface_impl.cpp
@@ -45,12 +45,38 @@ namespace repl {
return curTimeMillis64();
}
+ namespace {
+ // Duplicated in mock impl
+ StatusWith<int> getTimeoutMillis(Date_t expDate) {
+ // check for timeout
+ int timeout = 0;
+ if (expDate != ReplicationExecutor::kNoExpirationDate) {
+ Date_t nowDate = curTimeMillis64();
+ timeout = expDate >= nowDate ? expDate - nowDate :
+ ReplicationExecutor::kNoTimeout.total_milliseconds();
+ if (timeout < 0 ) {
+ return StatusWith<int>(ErrorCodes::ExceededTimeLimit,
+ str::stream() << "Went to run command,"
+ " but it was too late. Expiration was set to "
+ << expDate);
+ }
+ }
+ return StatusWith<int>(timeout);
+ }
+ } //namespace
+
StatusWith<BSONObj> NetworkInterfaceImpl::runCommand(
const ReplicationExecutor::RemoteCommandRequest& request) {
try {
BSONObj output;
- ScopedDbConnection conn(request.target.toString());
+
+ StatusWith<int> timeoutStatus = getTimeoutMillis(request.expirationDate);
+ if (!timeoutStatus.isOK())
+ return StatusWith<BSONObj>(timeoutStatus.getStatus());
+
+ int timeout = timeoutStatus.getValue();
+ ScopedDbConnection conn(request.target.toString(), timeout);
conn->runCommand(request.dbname, request.cmdObj, output);
conn.done();
return StatusWith<BSONObj>(output);
diff --git a/src/mongo/db/repl/network_interface_mock.cpp b/src/mongo/db/repl/network_interface_mock.cpp
index a9621842695..6edb99ad18e 100644
--- a/src/mongo/db/repl/network_interface_mock.cpp
+++ b/src/mongo/db/repl/network_interface_mock.cpp
@@ -57,8 +57,36 @@ namespace repl {
return curTimeMillis64();
}
+ namespace {
+ // Duplicated in real impl
+ StatusWith<int> getTimeoutMillis(Date_t expDate) {
+ // check for timeout
+ int timeout = 0;
+ if (expDate != ReplicationExecutor::kNoExpirationDate) {
+ Date_t nowDate = curTimeMillis64();
+ timeout = expDate >= nowDate ? expDate - nowDate :
+ ReplicationExecutor::kNoTimeout.total_milliseconds();
+ if (timeout < 0 ) {
+ return StatusWith<int>(ErrorCodes::ExceededTimeLimit,
+ str::stream() << "Went to run command,"
+ " but it was too late. Expiration was set to "
+ << expDate);
+ }
+ }
+ return StatusWith<int>(timeout);
+ }
+ } //namespace
+
StatusWith<BSONObj> NetworkInterfaceMock::runCommand(
const ReplicationExecutor::RemoteCommandRequest& request) {
+ if (_simulatedNetworkLatencyMillis) {
+ sleepmillis(_simulatedNetworkLatencyMillis);
+ }
+
+ StatusWith<int> toStatus = getTimeoutMillis(request.expirationDate);
+ if (!toStatus.isOK())
+ return StatusWith<BSONObj>(toStatus.getStatus());
+
return mapFindWithDefault(
_responses,
request,
@@ -82,5 +110,9 @@ namespace repl {
return _responses.insert(std::make_pair(request, response)).second;
}
+ void NetworkInterfaceMock::simulatedNetworkLatency(int millis) {
+ _simulatedNetworkLatencyMillis = millis;
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/network_interface_mock.h b/src/mongo/db/repl/network_interface_mock.h
index 3d2527b0ade..8aca1f118f9 100644
--- a/src/mongo/db/repl/network_interface_mock.h
+++ b/src/mongo/db/repl/network_interface_mock.h
@@ -37,7 +37,7 @@ namespace repl {
class NetworkInterfaceMock : public ReplicationExecutor::NetworkInterface {
public:
- NetworkInterfaceMock() {}
+ NetworkInterfaceMock() : _simulatedNetworkLatencyMillis(0) {}
virtual ~NetworkInterfaceMock() {}
virtual Date_t now();
virtual StatusWith<BSONObj> runCommand(
@@ -45,13 +45,23 @@ namespace repl {
virtual void runCallbackWithGlobalExclusiveLock(
const stdx::function<void ()>& callback);
+ /**
+ * Add a response (StatusWith<BSONObj>) for this mock to return for a given request.
+ * For each request, the mock will return the corresponding response for all future calls.
+ */
bool addResponse(const ReplicationExecutor::RemoteCommandRequest& request,
const StatusWith<BSONObj>& response);
+ /**
+ * Network latency added for each remote command, defaults to 0.
+ */
+ void simulatedNetworkLatency(int millis);
+
private:
typedef std::map<ReplicationExecutor::RemoteCommandRequest,
StatusWith<BSONObj> > RequestResponseMap;
RequestResponseMap _responses;
+ int _simulatedNetworkLatencyMillis;
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp
index ec01b9b46c9..541d50013e4 100644
--- a/src/mongo/db/repl/replication_executor.cpp
+++ b/src/mongo/db/repl/replication_executor.cpp
@@ -42,6 +42,9 @@ namespace {
stdx::function<void ()> makeNoExcept(const stdx::function<void ()> &fn);
} // namespace
+ const ReplicationExecutor::Milliseconds ReplicationExecutor::kNoTimeout(-1);
+ const Date_t ReplicationExecutor::kNoExpirationDate(-1);
+
ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface) :
_networkInterface(netInterface),
_totalEventWaiters(0),
@@ -441,10 +444,14 @@ namespace {
ReplicationExecutor::RemoteCommandRequest::RemoteCommandRequest(
const HostAndPort& theTarget,
const std::string& theDbName,
- const BSONObj& theCmdObj) :
+ const BSONObj& theCmdObj,
+ const Milliseconds timeoutMillis) :
target(theTarget),
dbname(theDbName),
cmdObj(theCmdObj) {
+ expirationDate = timeoutMillis == kNoTimeout ? kNoExpirationDate :
+ Date_t(curTimeMillis64() +
+ timeoutMillis.total_milliseconds());
}
ReplicationExecutor::RemoteCommandCallbackData::RemoteCommandCallbackData(
diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h
index c973105512b..41ea9a2c158 100644
--- a/src/mongo/db/repl/replication_executor.h
+++ b/src/mongo/db/repl/replication_executor.h
@@ -111,6 +111,9 @@ namespace repl {
struct RemoteCommandCallbackData;
struct RemoteCommandRequest;
+ static const Milliseconds kNoTimeout;
+ static const Date_t kNoExpirationDate;
+
/**
* Type of a regular callback function.
*
@@ -411,11 +414,13 @@ namespace repl {
RemoteCommandRequest();
RemoteCommandRequest(const HostAndPort& theTarget,
const std::string& theDbName,
- const BSONObj& theCmdObj);
+ const BSONObj& theCmdObj,
+ const Milliseconds timeoutMillis = kNoTimeout);
HostAndPort target;
std::string dbname;
BSONObj cmdObj;
+ Date_t expirationDate;
};
struct ReplicationExecutor::RemoteCommandCallbackData {
diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp
index 0532807461c..6eb46063390 100644
--- a/src/mongo/db/repl/replication_executor_test.cpp
+++ b/src/mongo/db/repl/replication_executor_test.cpp
@@ -368,6 +368,31 @@ namespace {
ASSERT_OK(status1);
}
+ TEST(ReplicationExecutor, RemoteCommandWithTimeout) {
+ NetworkInterfaceMock* net = new NetworkInterfaceMock;
+ net->simulatedNetworkLatency(5);
+ ReplicationExecutor executor(net);
+ Status status(ErrorCodes::InternalError, "");
+ const ReplicationExecutor::RemoteCommandRequest request(
+ HostAndPort("lazy", 27017),
+ "admin",
+ BSON("sleep" << 1),
+ ReplicationExecutor::Milliseconds(1));
+ ReplicationExecutor::CallbackHandle cbHandle = unittest::assertGet(
+ executor.scheduleRemoteCommand(
+ request,
+ stdx::bind(setStatusOnRemoteCommandCompletion,
+ stdx::placeholders::_1,
+ request,
+ &status)));
+ sleepmillis(2);
+ boost::thread executorThread(stdx::bind(&ReplicationExecutor::run, &executor));
+ executor.wait(cbHandle);
+ executor.shutdown();
+ executorThread.join();
+ ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
+ }
+
} // namespace
} // namespace repl
} // namespace mongo