diff options
author | Scott Hernandez <scotthernandez@gmail.com> | 2014-07-02 15:46:35 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@gmail.com> | 2014-07-17 16:09:01 -0400 |
commit | ae0d61700c0e7f0d6c236743b9605ea97a1fb55d (patch) | |
tree | 08e0b4f830af3d736abb5bf1a62a87149e72cb4b /src | |
parent | 48e603792832e6d4914fdb1285c16ea745390c46 (diff) | |
download | mongo-ae0d61700c0e7f0d6c236743b9605ea97a1fb55d.tar.gz |
SERVER-14250: add timeout to replication executor
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/repl/network_interface_impl.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/network_interface_mock.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/network_interface_mock.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor_test.cpp | 25 |
6 files changed, 109 insertions, 4 deletions
diff --git a/src/mongo/db/repl/network_interface_impl.cpp b/src/mongo/db/repl/network_interface_impl.cpp index 2509d2d5dfa..ef14ea11b97 100644 --- a/src/mongo/db/repl/network_interface_impl.cpp +++ b/src/mongo/db/repl/network_interface_impl.cpp @@ -45,12 +45,38 @@ namespace repl { return curTimeMillis64(); } + namespace { + // Duplicated in mock impl + StatusWith<int> getTimeoutMillis(Date_t expDate) { + // check for timeout + int timeout = 0; + if (expDate != ReplicationExecutor::kNoExpirationDate) { + Date_t nowDate = curTimeMillis64(); + timeout = expDate >= nowDate ? expDate - nowDate : + ReplicationExecutor::kNoTimeout.total_milliseconds(); + if (timeout < 0 ) { + return StatusWith<int>(ErrorCodes::ExceededTimeLimit, + str::stream() << "Went to run command," + " but it was too late. Expiration was set to " + << expDate); + } + } + return StatusWith<int>(timeout); + } + } //namespace + StatusWith<BSONObj> NetworkInterfaceImpl::runCommand( const ReplicationExecutor::RemoteCommandRequest& request) { try { BSONObj output; - ScopedDbConnection conn(request.target.toString()); + + StatusWith<int> timeoutStatus = getTimeoutMillis(request.expirationDate); + if (!timeoutStatus.isOK()) + return StatusWith<BSONObj>(timeoutStatus.getStatus()); + + int timeout = timeoutStatus.getValue(); + ScopedDbConnection conn(request.target.toString(), timeout); conn->runCommand(request.dbname, request.cmdObj, output); conn.done(); return StatusWith<BSONObj>(output); diff --git a/src/mongo/db/repl/network_interface_mock.cpp b/src/mongo/db/repl/network_interface_mock.cpp index a9621842695..6edb99ad18e 100644 --- a/src/mongo/db/repl/network_interface_mock.cpp +++ b/src/mongo/db/repl/network_interface_mock.cpp @@ -57,8 +57,36 @@ namespace repl { return curTimeMillis64(); } + namespace { + // Duplicated in real impl + StatusWith<int> getTimeoutMillis(Date_t expDate) { + // check for timeout + int timeout = 0; + if (expDate != ReplicationExecutor::kNoExpirationDate) { + Date_t nowDate = curTimeMillis64(); + timeout = expDate >= nowDate ? expDate - nowDate : + ReplicationExecutor::kNoTimeout.total_milliseconds(); + if (timeout < 0 ) { + return StatusWith<int>(ErrorCodes::ExceededTimeLimit, + str::stream() << "Went to run command," + " but it was too late. Expiration was set to " + << expDate); + } + } + return StatusWith<int>(timeout); + } + } //namespace + StatusWith<BSONObj> NetworkInterfaceMock::runCommand( const ReplicationExecutor::RemoteCommandRequest& request) { + if (_simulatedNetworkLatencyMillis) { + sleepmillis(_simulatedNetworkLatencyMillis); + } + + StatusWith<int> toStatus = getTimeoutMillis(request.expirationDate); + if (!toStatus.isOK()) + return StatusWith<BSONObj>(toStatus.getStatus()); + return mapFindWithDefault( _responses, request, @@ -82,5 +110,9 @@ namespace repl { return _responses.insert(std::make_pair(request, response)).second; } + void NetworkInterfaceMock::simulatedNetworkLatency(int millis) { + _simulatedNetworkLatencyMillis = millis; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/network_interface_mock.h b/src/mongo/db/repl/network_interface_mock.h index 3d2527b0ade..8aca1f118f9 100644 --- a/src/mongo/db/repl/network_interface_mock.h +++ b/src/mongo/db/repl/network_interface_mock.h @@ -37,7 +37,7 @@ namespace repl { class NetworkInterfaceMock : public ReplicationExecutor::NetworkInterface { public: - NetworkInterfaceMock() {} + NetworkInterfaceMock() : _simulatedNetworkLatencyMillis(0) {} virtual ~NetworkInterfaceMock() {} virtual Date_t now(); virtual StatusWith<BSONObj> runCommand( @@ -45,13 +45,23 @@ namespace repl { virtual void runCallbackWithGlobalExclusiveLock( const stdx::function<void ()>& callback); + /** + * Add a response (StatusWith<BSONObj>) for this mock to return for a given request. + * For each request, the mock will return the corresponding response for all future calls. + */ bool addResponse(const ReplicationExecutor::RemoteCommandRequest& request, const StatusWith<BSONObj>& response); + /** + * Network latency added for each remote command, defaults to 0. + */ + void simulatedNetworkLatency(int millis); + private: typedef std::map<ReplicationExecutor::RemoteCommandRequest, StatusWith<BSONObj> > RequestResponseMap; RequestResponseMap _responses; + int _simulatedNetworkLatencyMillis; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index ec01b9b46c9..541d50013e4 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -42,6 +42,9 @@ namespace { stdx::function<void ()> makeNoExcept(const stdx::function<void ()> &fn); } // namespace + const ReplicationExecutor::Milliseconds ReplicationExecutor::kNoTimeout(-1); + const Date_t ReplicationExecutor::kNoExpirationDate(-1); + ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface) : _networkInterface(netInterface), _totalEventWaiters(0), @@ -441,10 +444,14 @@ namespace { ReplicationExecutor::RemoteCommandRequest::RemoteCommandRequest( const HostAndPort& theTarget, const std::string& theDbName, - const BSONObj& theCmdObj) : + const BSONObj& theCmdObj, + const Milliseconds timeoutMillis) : target(theTarget), dbname(theDbName), cmdObj(theCmdObj) { + expirationDate = timeoutMillis == kNoTimeout ? kNoExpirationDate : + Date_t(curTimeMillis64() + + timeoutMillis.total_milliseconds()); } ReplicationExecutor::RemoteCommandCallbackData::RemoteCommandCallbackData( diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index c973105512b..41ea9a2c158 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -111,6 +111,9 @@ namespace repl { struct RemoteCommandCallbackData; struct RemoteCommandRequest; + static const Milliseconds kNoTimeout; + static const Date_t kNoExpirationDate; + /** * Type of a regular callback function. * @@ -411,11 +414,13 @@ namespace repl { RemoteCommandRequest(); RemoteCommandRequest(const HostAndPort& theTarget, const std::string& theDbName, - const BSONObj& theCmdObj); + const BSONObj& theCmdObj, + const Milliseconds timeoutMillis = kNoTimeout); HostAndPort target; std::string dbname; BSONObj cmdObj; + Date_t expirationDate; }; struct ReplicationExecutor::RemoteCommandCallbackData { diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp index 0532807461c..6eb46063390 100644 --- a/src/mongo/db/repl/replication_executor_test.cpp +++ b/src/mongo/db/repl/replication_executor_test.cpp @@ -368,6 +368,31 @@ namespace { ASSERT_OK(status1); } + TEST(ReplicationExecutor, RemoteCommandWithTimeout) { + NetworkInterfaceMock* net = new NetworkInterfaceMock; + net->simulatedNetworkLatency(5); + ReplicationExecutor executor(net); + Status status(ErrorCodes::InternalError, ""); + const ReplicationExecutor::RemoteCommandRequest request( + HostAndPort("lazy", 27017), + "admin", + BSON("sleep" << 1), + ReplicationExecutor::Milliseconds(1)); + ReplicationExecutor::CallbackHandle cbHandle = unittest::assertGet( + executor.scheduleRemoteCommand( + request, + stdx::bind(setStatusOnRemoteCommandCompletion, + stdx::placeholders::_1, + request, + &status))); + sleepmillis(2); + boost::thread executorThread(stdx::bind(&ReplicationExecutor::run, &executor)); + executor.wait(cbHandle); + executor.shutdown(); + executorThread.join(); + ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status); + } + } // namespace } // namespace repl } // namespace mongo |