diff options
44 files changed, 511 insertions, 379 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 6aaac5cdef3..d82ec7e909b 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -46,6 +46,19 @@ env.Library( ] ) +env.Library( + target='remote_command_executor', + source=[ + 'remote_command_executor.cpp', + 'remote_command_executor_impl.cpp', + ], + LIBDEPS=[ + 'connection_pool', + '$BUILD_DIR/mongo/db/query/getmore_request', + '$BUILD_DIR/mongo/db/query/lite_parsed_query', + ] +) + env.CppUnitTest("replica_set_monitor_test", ["replica_set_monitor_test.cpp"], LIBDEPS=["clientdriver"]) diff --git a/src/mongo/client/remote_command_executor.cpp b/src/mongo/client/remote_command_executor.cpp new file mode 100644 index 00000000000..23688c55827 --- /dev/null +++ b/src/mongo/client/remote_command_executor.cpp @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/remote_command_executor.h" + +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + + std::string RemoteCommandRequest::toString() const { + str::stream out; + out << "RemoteCommand -- target:" << target.toString() << " db:" << dbname; + + if (expirationDate != kNoExpirationDate) { + out << " expDate:" << expirationDate.toString(); + } + + out << " cmd:" << cmdObj.toString(); + return out; + } + + std::string RemoteCommandResponse::toString() const { + str::stream out; + out << "RemoteResponse -- " << " cmd:" << data.toString(); + + return out; + } + +} // namespace mongo diff --git a/src/mongo/client/remote_command_executor.h b/src/mongo/client/remote_command_executor.h new file mode 100644 index 00000000000..9eb7f5cf57c --- /dev/null +++ b/src/mongo/client/remote_command_executor.h @@ -0,0 +1,124 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/base/disallow_copying.h" +#include "mongo/db/jsobj.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { + + template<typename T> class StatusWith; + + + const Milliseconds kNoTimeout(-1); + const Date_t kNoExpirationDate(-1); + + + /** + * Type of object describing a command to execute against a remote MongoDB node. + */ + struct RemoteCommandRequest { + RemoteCommandRequest() : timeout(kNoTimeout), + expirationDate(kNoExpirationDate) { + + } + + RemoteCommandRequest(const HostAndPort& theTarget, + const std::string& theDbName, + const BSONObj& theCmdObj, + const Milliseconds timeoutMillis = kNoTimeout) + : target(theTarget), + dbname(theDbName), + cmdObj(theCmdObj), + timeout(timeoutMillis) { + + if (timeoutMillis == kNoTimeout) { + expirationDate = kNoExpirationDate; + } + } + + std::string toString() const; + + HostAndPort target; + std::string dbname; + BSONObj cmdObj; + Milliseconds timeout; + + // Deadline by when the request must be completed + Date_t expirationDate; + }; + + + /** + * Type of object describing the response of previously sent RemoteCommandRequest. + */ + struct RemoteCommandResponse { + RemoteCommandResponse() : data(), + elapsedMillis(Milliseconds(0)) { + + } + + RemoteCommandResponse(BSONObj obj, Milliseconds millis) : data(obj), + elapsedMillis(millis) { + + } + + std::string toString() const; + + BSONObj data; + Milliseconds elapsedMillis; + }; + + + /** + * Abstract interface used for executing commands against a MongoDB instance and retrieving + * results. It abstracts the logic of managing connections and turns the remote instance into + * a stateless request-response service. + */ + class RemoteCommandExecutor { + MONGO_DISALLOW_COPYING(RemoteCommandExecutor); + public: + virtual ~RemoteCommandExecutor() = default; + + /** + * Synchronously invokes the command described by "request" and returns the server's + * response or any status. + */ + virtual StatusWith<RemoteCommandResponse> runCommand( + const RemoteCommandRequest& request) = 0; + + protected: + RemoteCommandExecutor() = default; + }; + +} // namespace mongo diff --git a/src/mongo/db/repl/network_interface_impl_downconvert_find_getmore.cpp b/src/mongo/client/remote_command_executor_impl.cpp index 280ee8b6a9c..3bac4886a5e 100644 --- a/src/mongo/db/repl/network_interface_impl_downconvert_find_getmore.cpp +++ b/src/mongo/client/remote_command_executor_impl.cpp @@ -28,22 +28,46 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/network_interface_impl_downconvert_find_getmore.h" +#include "mongo/client/remote_command_executor_impl.h" -#include <memory> - -#include "mongo/client/dbclientinterface.h" #include "mongo/db/commands.h" #include "mongo/db/commands/cursor_responses.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/getmore_request.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/base/status_with.h" +#include "mongo/util/assert_util.h" namespace mongo { -namespace repl { - namespace { /** + * Calculates the timeout for a network operation expiring at "expDate", given that it is now + * "nowDate". + * + * Returns 0 to indicate no expiration date, a number of milliseconds until "expDate", or + * ErrorCodes::ExceededTimeLimit if "expDate" is not later than "nowDate". + * + * TODO: Change return type to StatusWith<Milliseconds> once Milliseconds supports default + * construction or StatusWith<T> supports not constructing T when the result is a non-OK + * status. + */ + StatusWith<int64_t> getTimeoutMillis(const Date_t expDate, const Date_t nowDate) { + if (expDate == kNoExpirationDate) { + return StatusWith<int64_t>(0); + } + + if (expDate <= nowDate) { + return StatusWith<int64_t>( + ErrorCodes::ExceededTimeLimit, + str::stream() << "Went to run command, but it was too late. " + "Expiration was set to " << dateToISOStringUTC(expDate)); + } + + return StatusWith<int64_t>(expDate.asInt64() - nowDate.asInt64()); + } + + /** * Updates command output document with status. */ BSONObj getCommandResultFromStatus(const Status& status) { @@ -61,30 +85,37 @@ namespace { if (!cursor.peekError(&error)) { return Status::OK(); } + BSONElement e = error.getField("code"); - return Status(e.isNumber() ? ErrorCodes::fromInt(e.numberInt()) : ErrorCodes::UnknownError, + return Status(e.isNumber() ? ErrorCodes::fromInt(e.numberInt()) : + ErrorCodes::UnknownError, getErrField(error).valuestrsafe()); } -} // namespace - + /** + * Downconverts the specified find command to a find protocol operation and sends it to the + * server on the specified connection. + */ Status runDownconvertedFindCommand(DBClientConnection* conn, const std::string& dbname, const BSONObj& cmdObj, BSONObj* output) { - NamespaceString nss(dbname, cmdObj.firstElement().String()); + + const NamespaceString nss(dbname, cmdObj.firstElement().String()); const std::string& ns = nss.ns(); + std::unique_ptr<LiteParsedQuery> lpq; { LiteParsedQuery* lpqRaw; - // It is a little heavy handed to use LiteParsedQuery to convert the command - // object to query() arguments but we get validation and consistent behavior - // with the find command implementation on the remote server. + // It is a little heavy handed to use LiteParsedQuery to convert the command object to + // query() arguments but we get validation and consistent behavior with the find + // command implementation on the remote server. Status status = LiteParsedQuery::make(ns, cmdObj, false, &lpqRaw); if (!status.isOK()) { *output = getCommandResultFromStatus(status); return status; } + lpq.reset(lpqRaw); } @@ -115,6 +146,7 @@ namespace { while (cursor->moreInCurrentBatch()) { batch.append(cursor->next()); } + BSONObjBuilder result; appendCursorResponseObject(cursor->getCursorId(), ns, batch.arr(), &result); Command::appendCommandStatus(result, Status::OK()); @@ -122,16 +154,22 @@ namespace { return Status::OK(); } + /** + * Downconverts the specified getMore command to legacy getMore operation and sends it to the + * server on the specified connection. + */ Status runDownconvertedGetMoreCommand(DBClientConnection* conn, const std::string& dbname, const BSONObj& cmdObj, BSONObj* output) { + StatusWith<GetMoreRequest> parseResult = GetMoreRequest::parseFromBSON(dbname, cmdObj); if (!parseResult.isOK()) { const Status& status = parseResult.getStatus(); *output = getCommandResultFromStatus(status); return status; } + const GetMoreRequest& req = parseResult.getValue(); const std::string& ns = req.nss.ns(); @@ -148,6 +186,7 @@ namespace { while (cursor->moreInCurrentBatch()) { batch.append(cursor->next()); } + BSONObjBuilder result; appendGetMoreResponseObject(cursor->getCursorId(), ns, batch.arr(), &result); Command::appendCommandStatus(result, Status::OK()); @@ -155,5 +194,90 @@ namespace { return Status::OK(); } -} // namespace repl +} //namespace + + RemoteCommandExecutorImpl::RemoteCommandExecutorImpl(int messagingPortTags) + : _connPool(messagingPortTags), + _shutDown(false) { + + } + + RemoteCommandExecutorImpl::~RemoteCommandExecutorImpl() { + invariant(_shutDown); + } + + void RemoteCommandExecutorImpl::shutdown() { + if (_shutDown) { + return; + } + + _shutDown = true; + _connPool.closeAllInUseConnections(); + } + + StatusWith<RemoteCommandResponse> RemoteCommandExecutorImpl::runCommand( + const RemoteCommandRequest& request) { + try { + BSONObj output; + + const Date_t requestStartDate = curTimeMillis64(); + StatusWith<int64_t> timeoutMillis = getTimeoutMillis(request.expirationDate, + requestStartDate); + if (!timeoutMillis.isOK()) { + return StatusWith<RemoteCommandResponse>(timeoutMillis.getStatus()); + } + + ConnectionPool::ConnectionPtr conn(&_connPool, + request.target, + requestStartDate, + Milliseconds(timeoutMillis.getValue())); + + bool ok = conn.get()->runCommand(request.dbname, request.cmdObj, output); + + // If remote server does not support either find or getMore commands, down convert + // to using DBClientInterface::query()/getMore(). + // TODO: Perform down conversion based on wire protocol version. + // Refer to the down conversion implementation in the shell. + if (!ok && + getStatusFromCommandResult(output).code() == ErrorCodes::CommandNotFound) { + + // 'commandName' will be an empty string if the command object is an empty BSON + // document. + StringData commandName = request.cmdObj.firstElement().fieldNameStringData(); + if (commandName == "find") { + runDownconvertedFindCommand( + conn.get(), + request.dbname, + request.cmdObj, + &output); + } + else if (commandName == "getMore") { + runDownconvertedGetMoreCommand( + conn.get(), + request.dbname, + request.cmdObj, + &output); + } + } + + const Date_t requestFinishDate = curTimeMillis64(); + conn.done(requestFinishDate); + + return StatusWith<RemoteCommandResponse>( + RemoteCommandResponse(output, + Milliseconds(requestFinishDate - requestStartDate))); + } + catch (const DBException& ex) { + return StatusWith<RemoteCommandResponse>(ex.toStatus()); + } + catch (const std::exception& ex) { + return StatusWith<RemoteCommandResponse>( + ErrorCodes::UnknownError, + str::stream() << "Sending command " << request.cmdObj << " on database " + << request.dbname << " over network to " + << request.target.toString() << " received exception " + << ex.what()); + } + } + } // namespace mongo diff --git a/src/mongo/db/repl/network_interface_impl_downconvert_find_getmore.h b/src/mongo/client/remote_command_executor_impl.h index 38dfe0ad8ea..84852341276 100644 --- a/src/mongo/db/repl/network_interface_impl_downconvert_find_getmore.h +++ b/src/mongo/client/remote_command_executor_impl.h @@ -28,34 +28,29 @@ #pragma once -#include <string> - -#include "mongo/base/status.h" +#include "mongo/client/connection_pool.h" +#include "mongo/client/remote_command_executor.h" namespace mongo { - class BSONObj; - class DBClientConnection; - -namespace repl { - - /** - * Run find command as a downconverted DBClientInterface::query() on remote servers - * that do not support the find command. - */ - Status runDownconvertedFindCommand(DBClientConnection* conn, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObj* output); - - /** - * Run getMore command as a downconverted DBClientInterface::getMore() on remote servers - * that do not support the find command. - */ - Status runDownconvertedGetMoreCommand(DBClientConnection* conn, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObj* output); - -} // namespace repl + class RemoteCommandExecutorImpl : public RemoteCommandExecutor { + public: + RemoteCommandExecutorImpl(int messagingPortTags); + virtual ~RemoteCommandExecutorImpl(); + + /** + * Closes all underlying connections. Must be called before the destructor runs. + */ + void shutdown(); + + virtual StatusWith<RemoteCommandResponse> runCommand(const RemoteCommandRequest& request); + + private: + // The connection pool on which to send requests + ConnectionPool _connPool; + + // Whether shutdown has been called + bool _shutDown; + }; + } // namespace mongo diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index dcb2084f1ea..fbb37662882 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -19,16 +19,13 @@ env.Library( target='network_interface_impl', source=[ 'network_interface_impl.cpp', - 'network_interface_impl_downconvert_find_getmore.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base/base', '$BUILD_DIR/mongo/bson/bson', - '$BUILD_DIR/mongo/client/connection_pool', + '$BUILD_DIR/mongo/client/remote_command_executor', '$BUILD_DIR/mongo/db/coredb', '$BUILD_DIR/mongo/db/common', - '$BUILD_DIR/mongo/db/query/getmore_request', - '$BUILD_DIR/mongo/db/query/lite_parsed_query', ]) env.Library( diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index 5ad6bcaaf39..45d1ca8b29d 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -143,7 +143,7 @@ namespace repl { const BSONObj& obj) { auto net = getNet(); ReplicationExecutor::Milliseconds millis(0); - ReplicationExecutor::RemoteCommandResponse response(obj, millis); + RemoteCommandResponse response(obj, millis); ReplicationExecutor::ResponseStatus responseStatus(response); net->scheduleResponse(noi, net->now(), responseStatus); } diff --git a/src/mongo/db/repl/check_quorum_for_config_change.cpp b/src/mongo/db/repl/check_quorum_for_config_change.cpp index 7064b9a473d..7ddc31f3b43 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change.cpp @@ -70,11 +70,11 @@ namespace repl { QuorumChecker::~QuorumChecker() {} - std::vector<ReplicationExecutor::RemoteCommandRequest> QuorumChecker::getRequests() const { + std::vector<RemoteCommandRequest> QuorumChecker::getRequests() const { const bool isInitialConfig = _rsConfig->getConfigVersion() == 1; const MemberConfig& myConfig = _rsConfig->getMemberAt(_myIndex); - std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + std::vector<RemoteCommandRequest> requests; if (hasReceivedSufficientResponses()) { return requests; } @@ -97,7 +97,7 @@ namespace repl { // No need to check self for liveness or unreadiness. continue; } - requests.push_back(ReplicationExecutor::RemoteCommandRequest( + requests.push_back(RemoteCommandRequest( _rsConfig->getMemberAt(i).getHostAndPort(), "admin", hbRequest, @@ -108,7 +108,7 @@ namespace repl { } void QuorumChecker::processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response) { _tabulateHeartbeatResponse(request, response); @@ -178,7 +178,7 @@ namespace repl { } void QuorumChecker::_tabulateHeartbeatResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response) { ++_numResponses; diff --git a/src/mongo/db/repl/check_quorum_for_config_change.h b/src/mongo/db/repl/check_quorum_for_config_change.h index 396ac4dea39..c38a49106be 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change.h +++ b/src/mongo/db/repl/check_quorum_for_config_change.h @@ -59,9 +59,9 @@ namespace repl { QuorumChecker(const ReplicaSetConfig* rsConfig, int myIndex); virtual ~QuorumChecker(); - virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const; + virtual std::vector<RemoteCommandRequest> getRequests() const; virtual void processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response); virtual bool hasReceivedSufficientResponses() const; @@ -80,9 +80,8 @@ namespace repl { /** * Updates the QuorumChecker state based on the data from a single heartbeat response. */ - void _tabulateHeartbeatResponse( - const ReplicationExecutor::RemoteCommandRequest& request, - const ResponseStatus& response); + void _tabulateHeartbeatResponse(const RemoteCommandRequest& request, + const ResponseStatus& response); // Pointer to the replica set configuration for which we're checking quorum. const ReplicaSetConfig* const _rsConfig; diff --git a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp index 49d87e9cf4f..677141c727e 100644 --- a/src/mongo/db/repl/check_quorum_for_config_change_test.cpp +++ b/src/mongo/db/repl/check_quorum_for_config_change_test.cpp @@ -62,8 +62,6 @@ namespace mongo { namespace repl { namespace { - typedef ReplicationExecutor::RemoteCommandRequest RemoteCommandRequest; - class CheckQuorumTest : public mongo::unittest::Test { protected: CheckQuorumTest(); @@ -244,14 +242,14 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << "Already saw " << request.target.toString(); _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), Milliseconds(8)))); } _net->runUntil(startDate + 10); @@ -286,7 +284,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -299,7 +297,7 @@ namespace { else { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), Milliseconds(8)))); } } @@ -341,7 +339,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -349,14 +347,14 @@ namespace { if (request.target == HostAndPort("h4", 1)) { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 0 << "mismatch" << true), Milliseconds(8)))); } else { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), Milliseconds(8)))); } } @@ -397,7 +395,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -405,7 +403,7 @@ namespace { if (request.target == HostAndPort("h5", 1)) { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 0 << "set" << "rs0" << "v" << 1), @@ -414,7 +412,7 @@ namespace { else { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), Milliseconds(8)))); } } @@ -458,7 +456,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -466,7 +464,7 @@ namespace { if (request.target == HostAndPort("h5", 1)) { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 0 << "set" << "rs0" << "v" << 1), @@ -514,7 +512,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -525,7 +523,7 @@ namespace { if (request.target == HostAndPort("h5", 1)) { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( hbResp.toBSON(), Milliseconds(8)))); } @@ -566,7 +564,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -574,7 +572,7 @@ namespace { if (request.target == HostAndPort("h1", 1)) { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 0 << "set" << "rs0" << "v" << 5), @@ -618,7 +616,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -626,7 +624,7 @@ namespace { if (request.target == HostAndPort("h2", 1)) { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 0 << "mismatch" << true), Milliseconds(8)))); } @@ -672,7 +670,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -680,7 +678,7 @@ namespace { if (request.target == HostAndPort("h1", 1) || request.target == HostAndPort("h5", 1)) { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), Milliseconds(8)))); } @@ -725,7 +723,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -733,7 +731,7 @@ namespace { if (request.target == HostAndPort("h5", 1)) { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), Milliseconds(8)))); } @@ -774,7 +772,7 @@ namespace { _net->enterNetwork(); for (int i = 0; i < numCommandsExpected; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = _net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS("admin", request.dbname); ASSERT_EQUALS(hbRequest, request.cmdObj); ASSERT(seenHosts.insert(request.target).second) << @@ -782,7 +780,7 @@ namespace { if (request.target == HostAndPort("h1", 1) || request.target == HostAndPort("h2", 1)) { _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), Milliseconds(8)))); } diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 43f222dd01c..4d1964c59ae 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -52,8 +52,8 @@ namespace repl { using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>; using CallbackFn = ReplicationExecutor::CallbackFn; - using Request = ReplicationExecutor::RemoteCommandRequest; - using Response = ReplicationExecutor::RemoteCommandResponse; + using Request = RemoteCommandRequest; + using Response = RemoteCommandResponse; using CommandCallbackData = ReplicationExecutor::RemoteCommandCallbackData; // typedef void (*run_func)(); diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index a4fa9afbbd6..db8ed712737 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -85,7 +85,7 @@ namespace { NetworkInterfaceMock* net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); ReplicationExecutor::Milliseconds millis(0); - ReplicationExecutor::RemoteCommandResponse response(obj, millis); + RemoteCommandResponse response(obj, millis); ReplicationExecutor::ResponseStatus responseStatus(response); net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); } diff --git a/src/mongo/db/repl/elect_cmd_runner.cpp b/src/mongo/db/repl/elect_cmd_runner.cpp index 640d01031fb..adb908a2306 100644 --- a/src/mongo/db/repl/elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -60,11 +60,11 @@ namespace repl { ElectCmdRunner::Algorithm::~Algorithm() {} - std::vector<ReplicationExecutor::RemoteCommandRequest> + std::vector<RemoteCommandRequest> ElectCmdRunner::Algorithm::getRequests() const { const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex); - std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + std::vector<RemoteCommandRequest> requests; BSONObjBuilder electCmdBuilder; electCmdBuilder.append("replSetElect", 1); electCmdBuilder.append("set", _rsConfig.getReplSetName()); @@ -80,7 +80,7 @@ namespace repl { ++it) { invariant(*it != selfConfig.getHostAndPort()); - requests.push_back(ReplicationExecutor::RemoteCommandRequest( + requests.push_back(RemoteCommandRequest( *it, "admin", replSetElectCmd, @@ -107,7 +107,7 @@ namespace repl { } void ElectCmdRunner::Algorithm::processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response) { ++_actualResponses; diff --git a/src/mongo/db/repl/elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h index 3007f5dc2b5..d92ba9326a2 100644 --- a/src/mongo/db/repl/elect_cmd_runner.h +++ b/src/mongo/db/repl/elect_cmd_runner.h @@ -57,9 +57,9 @@ namespace repl { OID round); virtual ~Algorithm(); - virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const; + virtual std::vector<RemoteCommandRequest> getRequests() const; virtual void processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response); virtual bool hasReceivedSufficientResponses() const; diff --git a/src/mongo/db/repl/elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp index 983e39a3b1b..ef9751dc084 100644 --- a/src/mongo/db/repl/elect_cmd_runner_test.cpp +++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp @@ -48,8 +48,6 @@ namespace mongo { namespace repl { namespace { - typedef ReplicationExecutor::RemoteCommandRequest RemoteCommandRequest; - class ElectCmdRunnerTest : public mongo::unittest::Test { public: void startTest(ElectCmdRunner* electCmdRunner, @@ -202,7 +200,7 @@ namespace { ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target); _net->scheduleResponse(noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1 << "vote" << 1 << "round" << 380865962699346850ll), diff --git a/src/mongo/db/repl/election_winner_declarer.cpp b/src/mongo/db/repl/election_winner_declarer.cpp index f56d1064c37..0172f61a49b 100644 --- a/src/mongo/db/repl/election_winner_declarer.cpp +++ b/src/mongo/db/repl/election_winner_declarer.cpp @@ -52,7 +52,7 @@ namespace repl { ElectionWinnerDeclarer::Algorithm::~Algorithm() {} - std::vector<ReplicationExecutor::RemoteCommandRequest> + std::vector<RemoteCommandRequest> ElectionWinnerDeclarer::Algorithm::getRequests() const { BSONObjBuilder declareElectionWinnerCmdBuilder; declareElectionWinnerCmdBuilder.append("replSetDeclareElectionWinner", 1); @@ -61,9 +61,9 @@ namespace repl { declareElectionWinnerCmdBuilder.append("term", _term); const BSONObj declareElectionWinnerCmd = declareElectionWinnerCmdBuilder.obj(); - std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + std::vector<RemoteCommandRequest> requests; for (const auto& target : _targets) { - requests.push_back(ReplicationExecutor::RemoteCommandRequest( + requests.push_back(RemoteCommandRequest( target, "admin", declareElectionWinnerCmd, @@ -74,7 +74,7 @@ namespace repl { } void ElectionWinnerDeclarer::Algorithm::processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response) { _responsesProcessed++; Status cmdResponseStatus = getStatusFromCommandResult(response.getValue().data); diff --git a/src/mongo/db/repl/election_winner_declarer.h b/src/mongo/db/repl/election_winner_declarer.h index 70da7f1ee25..16d5f04cc0a 100644 --- a/src/mongo/db/repl/election_winner_declarer.h +++ b/src/mongo/db/repl/election_winner_declarer.h @@ -58,9 +58,9 @@ namespace repl { long long term, const std::vector<HostAndPort>& targets); virtual ~Algorithm(); - virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const; + virtual std::vector<RemoteCommandRequest> getRequests() const; virtual void processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response); virtual bool hasReceivedSufficientResponses() const; diff --git a/src/mongo/db/repl/election_winner_declarer_test.cpp b/src/mongo/db/repl/election_winner_declarer_test.cpp index 1f957fa3ae4..bf006dd4355 100644 --- a/src/mongo/db/repl/election_winner_declarer_test.cpp +++ b/src/mongo/db/repl/election_winner_declarer_test.cpp @@ -48,8 +48,6 @@ namespace { using unittest::assertGet; - using RemoteCommandRequest = ReplicationExecutor::RemoteCommandRequest; - bool stringContains(const std::string &haystack, const std::string& needle) { return haystack.find(needle) != std::string::npos; } diff --git a/src/mongo/db/repl/fetcher.cpp b/src/mongo/db/repl/fetcher.cpp index 18f927fd206..6ae2419d50d 100644 --- a/src/mongo/db/repl/fetcher.cpp +++ b/src/mongo/db/repl/fetcher.cpp @@ -224,7 +224,7 @@ namespace { StatusWith<ReplicationExecutor::CallbackHandle> scheduleResult = _executor->scheduleRemoteCommand( - ReplicationExecutor::RemoteCommandRequest(_source, _dbname, cmdObj), + RemoteCommandRequest(_source, _dbname, cmdObj), stdx::bind(&Fetcher::_callback, this, stdx::placeholders::_1, batchFieldName)); if (!scheduleResult.isOK()) { diff --git a/src/mongo/db/repl/fetcher_test.cpp b/src/mongo/db/repl/fetcher_test.cpp index 3fd8d6612ea..427f888ff50 100644 --- a/src/mongo/db/repl/fetcher_test.cpp +++ b/src/mongo/db/repl/fetcher_test.cpp @@ -110,7 +110,7 @@ namespace { NetworkInterfaceMock* net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); ReplicationExecutor::Milliseconds millis(0); - ReplicationExecutor::RemoteCommandResponse response(obj, millis); + RemoteCommandResponse response(obj, millis); ReplicationExecutor::ResponseStatus responseStatus(response); net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); } diff --git a/src/mongo/db/repl/freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp index 2281d0b54ee..2c5ef4872b8 100644 --- a/src/mongo/db/repl/freshness_checker.cpp +++ b/src/mongo/db/repl/freshness_checker.cpp @@ -79,7 +79,7 @@ namespace repl { FreshnessChecker::Algorithm::~Algorithm() {} - std::vector<ReplicationExecutor::RemoteCommandRequest> + std::vector<RemoteCommandRequest> FreshnessChecker::Algorithm::getRequests() const { const MemberConfig& selfConfig = _rsConfig.getMemberAt(_selfIndex); @@ -94,12 +94,12 @@ namespace repl { freshCmdBuilder.append("id", selfConfig.getId()); const BSONObj replSetFreshCmd = freshCmdBuilder.obj(); - std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + std::vector<RemoteCommandRequest> requests; for (std::vector<HostAndPort>::const_iterator it = _targets.begin(); it != _targets.end(); ++it) { invariant(*it != selfConfig.getHostAndPort()); - requests.push_back(ReplicationExecutor::RemoteCommandRequest( + requests.push_back(RemoteCommandRequest( *it, "admin", replSetFreshCmd, @@ -127,7 +127,7 @@ namespace repl { } void FreshnessChecker::Algorithm::processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response) { ++_responsesProcessed; bool votingMember = _isVotingMember(request.target); diff --git a/src/mongo/db/repl/freshness_checker.h b/src/mongo/db/repl/freshness_checker.h index 3570ee8b051..c2d6c99343b 100644 --- a/src/mongo/db/repl/freshness_checker.h +++ b/src/mongo/db/repl/freshness_checker.h @@ -64,9 +64,9 @@ namespace repl { int selfIndex, const std::vector<HostAndPort>& targets); virtual ~Algorithm(); - virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const; + virtual std::vector<RemoteCommandRequest> getRequests() const; virtual void processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response); virtual bool hasReceivedSufficientResponses() const; ElectionAbortReason shouldAbortElection() const; diff --git a/src/mongo/db/repl/freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp index 2a7480e8d56..08766cfd7ca 100644 --- a/src/mongo/db/repl/freshness_checker_test.cpp +++ b/src/mongo/db/repl/freshness_checker_test.cpp @@ -51,8 +51,6 @@ namespace { using unittest::assertGet; - typedef ReplicationExecutor::RemoteCommandRequest RemoteCommandRequest; - bool stringContains(const std::string &haystack, const std::string& needle) { return haystack.find(needle) != std::string::npos; } @@ -189,7 +187,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1 << "id" << 2 << "set" << "rs0" << @@ -257,7 +255,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1 << "id" << 2 << "set" << "rs0" << @@ -303,7 +301,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1 << "id" << 2 << "set" << "rs0" << @@ -347,7 +345,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1 << "id" << 2 << "set" << "rs0" << @@ -394,7 +392,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1 << "id" << 2 << "set" << "rs0" << @@ -469,7 +467,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( responseBuilder.obj(), Milliseconds(8)))); } @@ -530,7 +528,7 @@ namespace { _net->scheduleResponse( noi, startDate + 20, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( responseBuilder.obj(), Milliseconds(8)))); } @@ -545,7 +543,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( responseBuilder.obj(), Milliseconds(8)))); } @@ -609,7 +607,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( responseBuilder.obj(), Milliseconds(8)))); } @@ -669,7 +667,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( responseBuilder.obj(), Milliseconds(8)))); } @@ -733,7 +731,7 @@ namespace { _net->scheduleResponse( noi, startDate + 20, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( responseBuilder.obj(), Milliseconds(8)))); } @@ -748,7 +746,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( responseBuilder.obj(), Milliseconds(8)))); } @@ -816,7 +814,7 @@ namespace { _net->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( responseBuilder.obj(), Milliseconds(8)))); } diff --git a/src/mongo/db/repl/network_interface_impl.cpp b/src/mongo/db/repl/network_interface_impl.cpp index 2b05ee39893..3dd08e51a14 100644 --- a/src/mongo/db/repl/network_interface_impl.cpp +++ b/src/mongo/db/repl/network_interface_impl.cpp @@ -38,7 +38,6 @@ #include "mongo/client/connection_pool.h" #include "mongo/db/client.h" #include "mongo/db/operation_context_impl.h" -#include "mongo/db/repl/network_interface_impl_downconvert_find_getmore.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" @@ -61,8 +60,9 @@ namespace { _lastFullUtilizationDate(), _isExecutorRunnable(false), _inShutdown(false), + _commandExec(kMessagingPortKeepOpen), _numActiveNetworkRequests(0) { - _connPool.reset(new ConnectionPool(kMessagingPortKeepOpen)); + } NetworkInterfaceImpl::~NetworkInterfaceImpl() { } @@ -126,7 +126,7 @@ namespace { ThreadList threadsToJoin; swap(threadsToJoin, _threads); lk.unlock(); - _connPool->closeAllInUseConnections(); + _commandExec.shutdown(); std::for_each(threadsToJoin.begin(), threadsToJoin.end(), stdx::bind(&boost::thread::join, stdx::placeholders::_1)); @@ -198,7 +198,7 @@ namespace { ++_numActiveNetworkRequests; --_numIdleThreads; lk.unlock(); - ResponseStatus result = _runCommand(todo.request); + ResponseStatus result = _commandExec.runCommand(todo.request); LOG(2) << "Network status of sending " << todo.request.cmdObj.firstElementFieldName() << " to " << todo.request.target << " was " << result.getStatus(); todo.onFinish(result); @@ -229,7 +229,7 @@ namespace { void NetworkInterfaceImpl::startCommand( const ReplicationExecutor::CallbackHandle& cbHandle, - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { LOG(2) << "Scheduling " << request.cmdObj.firstElementFieldName() << " to " << request.target; @@ -273,96 +273,6 @@ namespace { return curTimeMillis64(); } - namespace { - - /** - * Calculates the timeout for a network operation expiring at "expDate", given - * that it is now "nowDate". - * - * Returns 0 to indicate no expiration date, a number of milliseconds until "expDate", or - * ErrorCodes::ExceededTimeLimit if "expDate" is not later than "nowDate". - * - * TODO: Change return type to StatusWith<Milliseconds> once Milliseconds supports default - * construction or StatusWith<T> supports not constructing T when the result is a non-OK - * status. - */ - StatusWith<int64_t> getTimeoutMillis(const Date_t expDate, const Date_t nowDate) { - if (expDate == ReplicationExecutor::kNoExpirationDate) { - return StatusWith<int64_t>(0); - } - if (expDate <= nowDate) { - return StatusWith<int64_t>( - ErrorCodes::ExceededTimeLimit, - str::stream() << "Went to run command, but it was too late. " - "Expiration was set to " << dateToISOStringUTC(expDate)); - } - return StatusWith<int64_t>(expDate.asInt64() - nowDate.asInt64()); - } - - } //namespace - - ResponseStatus NetworkInterfaceImpl::_runCommand( - const ReplicationExecutor::RemoteCommandRequest& request) { - - try { - BSONObj output; - - const Date_t requestStartDate = now(); - StatusWith<int64_t> timeoutMillis = getTimeoutMillis(request.expirationDate, - requestStartDate); - if (!timeoutMillis.isOK()) { - return ResponseStatus(timeoutMillis.getStatus()); - } - - ConnectionPool::ConnectionPtr conn(_connPool.get(), - request.target, - requestStartDate, - Milliseconds(timeoutMillis.getValue())); - bool ok = conn.get()->runCommand(request.dbname, request.cmdObj, output); - - // If remote server does not support either find or getMore commands, down convert - // to using DBClientInterface::query()/getMore(). - // TODO: Perform down conversion based on wire protocol version. - // Refer to the down conversion implementation in the shell. - if (!ok && - getStatusFromCommandResult(output).code() == ErrorCodes::CommandNotFound) { - - // 'commandName' will be an empty string if the command object is an empty BSON - // document. - StringData commandName = request.cmdObj.firstElement().fieldNameStringData(); - if (commandName == "find") { - runDownconvertedFindCommand( - conn.get(), - request.dbname, - request.cmdObj, - &output); - } - else if (commandName == "getMore") { - runDownconvertedGetMoreCommand( - conn.get(), - request.dbname, - request.cmdObj, - &output); - } - } - const Date_t requestFinishDate = now(); - conn.done(requestFinishDate); - return ResponseStatus(Response(output, - Milliseconds(requestFinishDate - requestStartDate))); - } - catch (const DBException& ex) { - return ResponseStatus(ex.toStatus()); - } - catch (const std::exception& ex) { - return ResponseStatus( - ErrorCodes::UnknownError, - str::stream() << "Sending command " << request.cmdObj << " on database " - << request.dbname << " over network to " - << request.target.toString() << " received exception " - << ex.what()); - } - } - OperationContext* NetworkInterfaceImpl::createOperationContext() { Client::initThreadIfNotAlready(); return new OperationContextImpl(); diff --git a/src/mongo/db/repl/network_interface_impl.h b/src/mongo/db/repl/network_interface_impl.h index 13a3e55bb43..0e59fefa5ca 100644 --- a/src/mongo/db/repl/network_interface_impl.h +++ b/src/mongo/db/repl/network_interface_impl.h @@ -29,20 +29,17 @@ #pragma once -#include <boost/scoped_ptr.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> #include <boost/thread/condition_variable.hpp> #include <boost/thread/mutex.hpp> #include <vector> +#include "mongo/client/remote_command_executor_impl.h" #include "mongo/db/repl/replication_executor.h" #include "mongo/stdx/list.h" namespace mongo { - - class ConnectionPool; - namespace repl { /** @@ -82,7 +79,7 @@ namespace repl { virtual Date_t now(); virtual void startCommand( const ReplicationExecutor::CallbackHandle& cbHandle, - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish); virtual void cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle); OperationContext* createOperationContext() override; @@ -95,7 +92,7 @@ namespace repl { */ struct CommandData { ReplicationExecutor::CallbackHandle cbHandle; - ReplicationExecutor::RemoteCommandRequest request; + RemoteCommandRequest request; RemoteCommandCompletionFn onFinish; }; typedef stdx::list<CommandData> CommandDataList; @@ -114,11 +111,6 @@ namespace repl { void _consumeNetworkRequests(); /** - * Synchronously invokes the command described by "request". - */ - ResponseStatus _runCommand(const ReplicationExecutor::RemoteCommandRequest& request); - - /** * Notifies the network threads that there is work available. */ void _signalWorkAvailable_inlock(); @@ -128,8 +120,8 @@ namespace repl { */ void _startNewNetworkThread_inlock(); - // Mutex guarding the state of the network interface, except for the pool pointed to by - // _connPool. + // Mutex guarding the state of this network interface, except for the remote command + // executor, which has its own concurrency control. boost::mutex _mutex; // Condition signaled to indicate that there is work in the _pending queue. @@ -161,9 +153,8 @@ namespace repl { // Flag indicating when this interface is being shut down (because shutdown() has executed). bool _inShutdown; - // Pool of connections to remote nodes, used by the worker threads to execute network - // requests. - boost::scoped_ptr<ConnectionPool> _connPool; // (R) + // Interface for executing remote commands + RemoteCommandExecutorImpl _commandExec; // Number of active network requests size_t _numActiveNetworkRequests; diff --git a/src/mongo/db/repl/network_interface_mock.cpp b/src/mongo/db/repl/network_interface_mock.cpp index 34ac78b8772..ed10d4743a0 100644 --- a/src/mongo/db/repl/network_interface_mock.cpp +++ b/src/mongo/db/repl/network_interface_mock.cpp @@ -73,7 +73,7 @@ namespace repl { void NetworkInterfaceMock::startCommand( const ReplicationExecutor::CallbackHandle& cbHandle, - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish) { boost::lock_guard<boost::mutex> lk(_mutex); @@ -353,7 +353,7 @@ namespace repl { return _waitingToRunMask & kExecutorThread; } - static const StatusWith<ReplicationExecutor::RemoteCommandResponse> kUnsetResponse( + static const StatusWith<RemoteCommandResponse> kUnsetResponse( ErrorCodes::InternalError, "NetworkOperation::_response never set"); @@ -368,7 +368,7 @@ namespace repl { NetworkInterfaceMock::NetworkOperation::NetworkOperation( const ReplicationExecutor::CallbackHandle& cbHandle, - const ReplicationExecutor::RemoteCommandRequest& theRequest, + const RemoteCommandRequest& theRequest, Date_t theRequestDate, const RemoteCommandCompletionFn& onFinish) : _requestDate(theRequestDate), diff --git a/src/mongo/db/repl/network_interface_mock.h b/src/mongo/db/repl/network_interface_mock.h index 1682c3beaeb..255eb7efec7 100644 --- a/src/mongo/db/repl/network_interface_mock.h +++ b/src/mongo/db/repl/network_interface_mock.h @@ -82,7 +82,7 @@ namespace repl { virtual void signalWorkAvailable(); virtual Date_t now(); virtual void startCommand(const ReplicationExecutor::CallbackHandle& cbHandle, - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const RemoteCommandCompletionFn& onFinish); virtual void cancelCommand(const ReplicationExecutor::CallbackHandle& cbHandle); OperationContext* createOperationContext() override; @@ -263,7 +263,7 @@ namespace repl { public: NetworkOperation(); NetworkOperation(const ReplicationExecutor::CallbackHandle& cbHandle, - const ReplicationExecutor::RemoteCommandRequest& theRequest, + const RemoteCommandRequest& theRequest, Date_t theRequestDate, const RemoteCommandCompletionFn& onFinish); ~NetworkOperation(); @@ -290,7 +290,7 @@ namespace repl { /** * Gets the request that initiated this operation. */ - const ReplicationExecutor::RemoteCommandRequest& getRequest() const { return _request; } + const RemoteCommandRequest& getRequest() const { return _request; } /** * Gets the virtual time at which the operation was started. @@ -319,7 +319,7 @@ namespace repl { Date_t _nextConsiderationDate; Date_t _responseDate; ReplicationExecutor::CallbackHandle _cbHandle; - ReplicationExecutor::RemoteCommandRequest _request; + RemoteCommandRequest _request; ResponseStatus _response; RemoteCommandCompletionFn _onFinish; }; diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp index 5ea30666316..db4104ed761 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp @@ -48,8 +48,6 @@ namespace mongo { namespace repl { namespace { - typedef ReplicationExecutor::RemoteCommandRequest RemoteCommandRequest; - class ReplCoordElectTest : public ReplCoordTest { protected: void simulateEnoughHeartbeatsForElectability(); @@ -63,7 +61,7 @@ namespace { net->enterNetwork(); for (int i = 0; i < rsConfig.getNumMembers() - 1; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgs hbArgs; if (hbArgs.initialize(request.cmdObj).isOK()) { @@ -93,7 +91,7 @@ namespace { net->enterNetwork(); for (int i = 0; i < rsConfig.getNumMembers() - 1; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; if (request.cmdObj.firstElement().fieldNameStringData() == "replSetFresh") { net->scheduleResponse(noi, net->now(), makeResponseStatus( @@ -241,7 +239,7 @@ namespace { net->enterNetwork(); while (net->hasReadyRequests()) { const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; if (request.target != HostAndPort("node2", 12345)) { net->blackHole(noi); @@ -288,7 +286,7 @@ namespace { net->enterNetwork(); while (net->hasReadyRequests()) { const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; if (request.target != HostAndPort("node2", 12345)) { net->blackHole(noi); @@ -372,7 +370,7 @@ namespace { net->enterNetwork(); for (int i = 0; i < 2; ++i) { const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgs hbArgs; if (hbArgs.initialize(request.cmdObj).isOK()) { diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index d51ea95e690..cbe8b1448ce 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -55,8 +55,8 @@ namespace mongo { namespace repl { namespace { + typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus; - typedef ReplicationExecutor::RemoteCommandRequest CmdRequest; typedef ReplicationExecutor::CallbackHandle CBHandle; } //namespace @@ -77,7 +77,10 @@ namespace { _settings.ourSetName(), target); - const CmdRequest request(target, "admin", hbRequest.first.toBSON(), hbRequest.second); + const RemoteCommandRequest request(target, + "admin", + hbRequest.first.toBSON(), + hbRequest.second); const ReplicationExecutor::RemoteCommandCallbackFn callback = stdx::bind( &ReplicationCoordinatorImpl::_handleHeartbeatResponse, this, @@ -252,7 +255,7 @@ namespace { } // namespace void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort& target) { - CmdRequest request(target, "admin", BSON("replSetStepDown" << 1)); + RemoteCommandRequest request(target, "admin", BSON("replSetStepDown" << 1)); log() << "Requesting " << target << " step down from primary"; CBHStatus cbh = _replExecutor.scheduleRemoteCommand( diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp index 9008dbb9854..30c564284b1 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp @@ -100,7 +100,7 @@ namespace { enterNetwork(); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS(HostAndPort("h1", 1), request.target); ReplSetHeartbeatArgs hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); @@ -159,7 +159,7 @@ namespace { enterNetwork(); NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); ASSERT_EQUALS(HostAndPort("h1", 1), request.target); ReplSetHeartbeatArgs hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); @@ -222,7 +222,7 @@ namespace { // process heartbeat enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus( BSON("ok" << 0.0 << diff --git a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp index f29a830bb6f..086a574d630 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_reconfig_test.cpp @@ -48,7 +48,6 @@ namespace repl { namespace { typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; - typedef ReplicationExecutor::RemoteCommandRequest RemoteCommandRequest; TEST_F(ReplCoordTest, ReconfigBeforeInitialized) { // start up but do not initiate @@ -216,7 +215,7 @@ namespace { NetworkInterfaceMock* net = getNet(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); repl::ReplSetHeartbeatArgs hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); repl::ReplSetHeartbeatResponse hbResp; @@ -254,7 +253,7 @@ namespace { NetworkInterfaceMock* net = getNet(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); repl::ReplSetHeartbeatArgs hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); repl::ReplSetHeartbeatResponse hbResp; @@ -363,7 +362,7 @@ namespace { NetworkInterfaceMock* net = getNet(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); repl::ReplSetHeartbeatArgs hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); repl::ReplSetHeartbeatResponse hbResp; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index ca87ebf9fe3..f486fed8694 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -270,8 +270,7 @@ namespace { getNet()->scheduleResponse( noi, startDate + 10, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse(hbResp.toBSON(), - Milliseconds(8)))); + ResponseStatus(RemoteCommandResponse(hbResp.toBSON(), Milliseconds(8)))); getNet()->runUntil(startDate + 10); getNet()->exitNetwork(); ASSERT_EQUALS(startDate + 10, getNet()->now()); @@ -990,7 +989,7 @@ namespace { getNet()->runUntil(getNet()->now() + 2000); ASSERT(getNet()->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); - ReplicationExecutor::RemoteCommandRequest request = noi->getRequest(); + RemoteCommandRequest request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgs hbArgs; if (hbArgs.initialize(request.cmdObj).isOK()) { @@ -1178,7 +1177,7 @@ namespace { getNet()->runUntil(getNet()->now() + 2000); ASSERT(getNet()->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); - ReplicationExecutor::RemoteCommandRequest request = noi->getRequest(); + RemoteCommandRequest request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgs hbArgs; if (hbArgs.initialize(request.cmdObj).isOK()) { @@ -1700,7 +1699,7 @@ namespace { NetworkInterfaceMock* net = getNet(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); repl::ReplSetHeartbeatArgs hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); repl::ReplSetHeartbeatResponse hbResp; @@ -1770,7 +1769,7 @@ namespace { NetworkInterfaceMock* net = getNet(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); repl::ReplSetHeartbeatArgs hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); repl::ReplSetHeartbeatResponse hbResp; @@ -1838,7 +1837,7 @@ namespace { NetworkInterfaceMock* net = getNet(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); repl::ReplSetHeartbeatArgs hbArgs; ASSERT_OK(hbArgs.initialize(request.cmdObj)); repl::ReplSetHeartbeatResponse hbResp; diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index 808bffe819c..160cb55281d 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -160,7 +160,7 @@ namespace { ResponseStatus ReplCoordTest::makeResponseStatus(const BSONObj& doc, Milliseconds millis) { log() << "Responding with " << doc; - return ResponseStatus(ReplicationExecutor::RemoteCommandResponse(doc, millis)); + return ResponseStatus(RemoteCommandResponse(doc, millis)); } void ReplCoordTest::simulateSuccessfulElection() { @@ -174,7 +174,7 @@ namespace { log() << "Waiting on network in state " << replCoord->getMemberState(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgs hbArgs; if (hbArgs.initialize(request.cmdObj).isOK()) { @@ -236,7 +236,7 @@ namespace { getNet()->enterNetwork(); net->runUntil(net->now() + 10000); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); - const ReplicationExecutor::RemoteCommandRequest& request = noi->getRequest(); + const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgs hbArgs; if (hbArgs.initialize(request.cmdObj).isOK()) { diff --git a/src/mongo/db/repl/replication_executor.cpp b/src/mongo/db/repl/replication_executor.cpp index 65bc66ab2ff..be8dbb0ba9f 100644 --- a/src/mongo/db/repl/replication_executor.cpp +++ b/src/mongo/db/repl/replication_executor.cpp @@ -43,9 +43,6 @@ namespace { stdx::function<void ()> makeNoExcept(const stdx::function<void ()> &fn); } // namespace - const ReplicationExecutor::Milliseconds ReplicationExecutor::kNoTimeout(-1); - const Date_t ReplicationExecutor::kNoExpirationDate(-1); - ReplicationExecutor::ReplicationExecutor(NetworkInterface* netInterface, int64_t prngSeed) : _random(prngSeed), _networkInterface(netInterface), @@ -236,7 +233,7 @@ namespace { static void remoteCommandFinished( const ReplicationExecutor::CallbackData& cbData, const ReplicationExecutor::RemoteCommandCallbackFn& cb, - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response) { if (cbData.status.isOK()) { @@ -255,7 +252,7 @@ namespace { static void remoteCommandFailedEarly( const ReplicationExecutor::CallbackData& cbData, const ReplicationExecutor::RemoteCommandCallbackFn& cb, - const ReplicationExecutor::RemoteCommandRequest& request) { + const RemoteCommandRequest& request) { invariant(!cbData.status.isOK()); cb(ReplicationExecutor::RemoteCommandCallbackData( @@ -543,35 +540,6 @@ namespace { txn(theTxn) { } - ReplicationExecutor::RemoteCommandRequest::RemoteCommandRequest() : - timeout(kNoTimeout), - expirationDate(kNoExpirationDate) { - } - - ReplicationExecutor::RemoteCommandRequest::RemoteCommandRequest( - const HostAndPort& theTarget, - const std::string& theDbName, - const BSONObj& theCmdObj, - const Milliseconds timeoutMillis) : - target(theTarget), - dbname(theDbName), - cmdObj(theCmdObj), - timeout(timeoutMillis) { - if (timeoutMillis == kNoTimeout) { - expirationDate = kNoExpirationDate; - } - } - - std::string ReplicationExecutor::RemoteCommandRequest::getDiagnosticString() { - str::stream out; - out << "RemoteCommand -- target:" << target.toString() << " db:" << dbname; - - if (expirationDate != kNoExpirationDate) - out << " expDate:" << expirationDate.toString(); - - out << " cmd:" << cmdObj.getOwned().toString(); - return out; - } ReplicationExecutor::RemoteCommandCallbackData::RemoteCommandCallbackData( ReplicationExecutor* theExecutor, diff --git a/src/mongo/db/repl/replication_executor.h b/src/mongo/db/repl/replication_executor.h index 7ed930f0f66..ea19ac411c2 100644 --- a/src/mongo/db/repl/replication_executor.h +++ b/src/mongo/db/repl/replication_executor.h @@ -38,7 +38,7 @@ #include "mongo/base/status.h" #include "mongo/base/status_with.h" #include "mongo/base/string_data.h" -#include "mongo/db/jsobj.h" +#include "mongo/client/remote_command_executor.h" #include "mongo/db/concurrency/lock_manager_defs.h" #include "mongo/db/repl/task_runner.h" #include "mongo/platform/compiler.h" @@ -116,13 +116,8 @@ namespace repl { class EventHandle; class NetworkInterface; struct RemoteCommandCallbackData; - struct RemoteCommandRequest; - struct RemoteCommandResponse; typedef StatusWith<RemoteCommandResponse> ResponseStatus; - static const Milliseconds kNoTimeout; - static const Date_t kNoExpirationDate; - /** * Type of a regular callback function. * @@ -517,36 +512,6 @@ namespace repl { }; /** - * Type of object describing a command to execute against a remote MongoDB node. - */ - struct ReplicationExecutor::RemoteCommandRequest { - RemoteCommandRequest(); - RemoteCommandRequest(const HostAndPort& theTarget, - const std::string& theDbName, - const BSONObj& theCmdObj, - const Milliseconds timeoutMillis = kNoTimeout); - - // Returns diagnostic info. - std::string getDiagnosticString(); - - HostAndPort target; - std::string dbname; - BSONObj cmdObj; - Milliseconds timeout; - Date_t expirationDate; // Set by scheduleRemoteCommand. - }; - - struct ReplicationExecutor::RemoteCommandResponse { - RemoteCommandResponse() : data(), elapsedMillis(Milliseconds(0)) {} - RemoteCommandResponse(BSONObj obj, Milliseconds millis) - : data(obj), - elapsedMillis(millis) {} - - BSONObj data; - Milliseconds elapsedMillis; - }; - - /** * Interface to networking and lock manager. */ class ReplicationExecutor::NetworkInterface { diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp index f5ec39f3f8e..5b1c62ea13d 100644 --- a/src/mongo/db/repl/replication_executor_test.cpp +++ b/src/mongo/db/repl/replication_executor_test.cpp @@ -48,15 +48,15 @@ namespace repl { namespace { - bool operator==(const ReplicationExecutor::RemoteCommandRequest lhs, - const ReplicationExecutor::RemoteCommandRequest rhs) { + bool operator==(const RemoteCommandRequest lhs, + const RemoteCommandRequest rhs) { return lhs.target == rhs.target && lhs.dbname == rhs.dbname && lhs.cmdObj == rhs.cmdObj; } - bool operator!=(const ReplicationExecutor::RemoteCommandRequest lhs, - const ReplicationExecutor::RemoteCommandRequest rhs) { + bool operator!=(const RemoteCommandRequest lhs, + const RemoteCommandRequest rhs) { return !(lhs == rhs); } @@ -309,14 +309,14 @@ namespace { ASSERT_EQUALS(status3, ErrorCodes::CallbackCanceled); } - std::string getRequestDescription(const ReplicationExecutor::RemoteCommandRequest& request) { + std::string getRequestDescription(const RemoteCommandRequest& request) { return mongoutils::str::stream() << "Request(" << request.target.toString() << ", " << request.dbname << ", " << request.cmdObj << ')'; } static void setStatusOnRemoteCommandCompletion( const ReplicationExecutor::RemoteCommandCallbackData& cbData, - const ReplicationExecutor::RemoteCommandRequest& expectedRequest, + const RemoteCommandRequest& expectedRequest, Status* outStatus) { if (cbData.request != expectedRequest) { @@ -335,7 +335,7 @@ namespace { ReplicationExecutor& executor = getExecutor(); launchExecutorThread(); Status status1(ErrorCodes::InternalError, "Not mutated"); - const ReplicationExecutor::RemoteCommandRequest request( + const RemoteCommandRequest request( HostAndPort("localhost", 27017), "mydb", BSON("whatsUp" << "doc")); @@ -362,7 +362,7 @@ namespace { TEST_F(ReplicationExecutorTest, ScheduleAndCancelRemoteCommand) { ReplicationExecutor& executor = getExecutor(); Status status1(ErrorCodes::InternalError, "Not mutated"); - const ReplicationExecutor::RemoteCommandRequest request( + const RemoteCommandRequest request( HostAndPort("localhost", 27017), "mydb", BSON("whatsUp" << "doc")); @@ -453,7 +453,7 @@ namespace { ReplicationExecutor& executor = getExecutor(); Status status(ErrorCodes::InternalError, ""); launchExecutorThread(); - const ReplicationExecutor::RemoteCommandRequest request( + const RemoteCommandRequest request( HostAndPort("lazy", 27017), "admin", BSON("sleep" << 1), @@ -480,7 +480,7 @@ namespace { TEST_F(ReplicationExecutorTest, CallbackHandleComparison) { ReplicationExecutor& executor = getExecutor(); Status status(ErrorCodes::InternalError, ""); - const ReplicationExecutor::RemoteCommandRequest request( + const RemoteCommandRequest request( HostAndPort("lazy", 27017), "admin", BSON("cmd" << 1)); diff --git a/src/mongo/db/repl/reporter.cpp b/src/mongo/db/repl/reporter.cpp index f532642dd25..a3120adf01c 100644 --- a/src/mongo/db/repl/reporter.cpp +++ b/src/mongo/db/repl/reporter.cpp @@ -98,7 +98,7 @@ namespace repl { _updatePositionSource->prepareReplSetUpdatePositionCommand(&cmd); StatusWith<ReplicationExecutor::CallbackHandle> scheduleResult = _executor->scheduleRemoteCommand( - ReplicationExecutor::RemoteCommandRequest(_target, "admin", cmd.obj()), + RemoteCommandRequest(_target, "admin", cmd.obj()), stdx::bind(&Reporter::_callback, this, stdx::placeholders::_1)); if (!scheduleResult.isOK()) { diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index d0a9bc51638..c060f714339 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -100,7 +100,7 @@ namespace { NetworkInterfaceMock* net = getNet(); ASSERT_TRUE(net->hasReadyRequests()); ReplicationExecutor::Milliseconds millis(0); - ReplicationExecutor::RemoteCommandResponse response(obj, millis); + RemoteCommandResponse response(obj, millis); ReplicationExecutor::ResponseStatus responseStatus(response); net->scheduleResponse(net->getNextReadyRequest(), net->now(), responseStatus); } diff --git a/src/mongo/db/repl/scatter_gather_algorithm.h b/src/mongo/db/repl/scatter_gather_algorithm.h index 7622f0e385e..9b6bd05d896 100644 --- a/src/mongo/db/repl/scatter_gather_algorithm.h +++ b/src/mongo/db/repl/scatter_gather_algorithm.h @@ -56,14 +56,13 @@ namespace repl { /** * Returns the list of requests that should be sent. */ - virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const = 0; + virtual std::vector<RemoteCommandRequest> getRequests() const = 0; /** * Method to call once for each received response. */ - virtual void processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, - const ResponseStatus& response) = 0; + virtual void processResponse(const RemoteCommandRequest& request, + const ResponseStatus& response) = 0; /** * Returns true if no more calls to processResponse are needed to consider the diff --git a/src/mongo/db/repl/scatter_gather_runner.cpp b/src/mongo/db/repl/scatter_gather_runner.cpp index ce2d8a7dbb9..16b3aff915c 100644 --- a/src/mongo/db/repl/scatter_gather_runner.cpp +++ b/src/mongo/db/repl/scatter_gather_runner.cpp @@ -95,7 +95,7 @@ namespace repl { stdx::placeholders::_1, this); - std::vector<ReplicationExecutor::RemoteCommandRequest> requests = _algorithm->getRequests(); + std::vector<RemoteCommandRequest> requests = _algorithm->getRequests(); for (size_t i = 0; i < requests.size(); ++i) { const StatusWith<ReplicationExecutor::CallbackHandle> cbh = executor->scheduleRemoteCommand(requests[i], cb); diff --git a/src/mongo/db/repl/scatter_gather_test.cpp b/src/mongo/db/repl/scatter_gather_test.cpp index e878ab38f62..a98c5a1f51f 100644 --- a/src/mongo/db/repl/scatter_gather_test.cpp +++ b/src/mongo/db/repl/scatter_gather_test.cpp @@ -54,10 +54,10 @@ namespace { _numResponses(0), _maxResponses(maxResponses) {} - virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const { - std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + virtual std::vector<RemoteCommandRequest> getRequests() const { + std::vector<RemoteCommandRequest> requests; for (int i = 0; i < 3; i++) { - requests.push_back(ReplicationExecutor::RemoteCommandRequest( + requests.push_back(RemoteCommandRequest( HostAndPort("hostname", i), "admin", BSONObj(), @@ -67,7 +67,7 @@ namespace { } virtual void processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response) { _numResponses++; } @@ -187,7 +187,7 @@ namespace { NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now()+2000, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); ASSERT_FALSE(ranCompletion); @@ -195,7 +195,7 @@ namespace { noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now()+2000, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); ASSERT_FALSE(ranCompletion); @@ -203,7 +203,7 @@ namespace { noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now()+5000, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); ASSERT_FALSE(ranCompletion); @@ -291,7 +291,7 @@ namespace { NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now()+2000, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); ASSERT_FALSE(ranCompletion); @@ -299,7 +299,7 @@ namespace { noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now()+2000, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); ASSERT_FALSE(ranCompletion); @@ -307,7 +307,7 @@ namespace { noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now()+5000, - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); ASSERT_FALSE(ranCompletion); @@ -359,7 +359,7 @@ namespace { NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now(), - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); net->runReadyNetworkOperations(); @@ -368,7 +368,7 @@ namespace { noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now(), - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); net->runReadyNetworkOperations(); @@ -377,7 +377,7 @@ namespace { noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now(), - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); net->runReadyNetworkOperations(); @@ -398,7 +398,7 @@ namespace { NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now(), - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); net->runReadyNetworkOperations(); @@ -410,7 +410,7 @@ namespace { noi = net->getNextReadyRequest(); net->scheduleResponse(noi, net->now(), - ResponseStatus(ReplicationExecutor::RemoteCommandResponse( + ResponseStatus(RemoteCommandResponse( BSON("ok" << 1), boost::posix_time::milliseconds(10)))); net->runReadyNetworkOperations(); diff --git a/src/mongo/db/repl/vote_requester.cpp b/src/mongo/db/repl/vote_requester.cpp index ecabc80fbd5..b2116649b4f 100644 --- a/src/mongo/db/repl/vote_requester.cpp +++ b/src/mongo/db/repl/vote_requester.cpp @@ -61,7 +61,7 @@ namespace repl { VoteRequester::Algorithm::~Algorithm() {} - std::vector<ReplicationExecutor::RemoteCommandRequest> + std::vector<RemoteCommandRequest> VoteRequester::Algorithm::getRequests() const { BSONObjBuilder requestVotesCmdBuilder; requestVotesCmdBuilder.append("replSetDeclareElectionWinner", 1); @@ -77,9 +77,9 @@ namespace repl { const BSONObj requestVotesCmd = requestVotesCmdBuilder.obj(); - std::vector<ReplicationExecutor::RemoteCommandRequest> requests; + std::vector<RemoteCommandRequest> requests; for (const auto& target : _targets) { - requests.push_back(ReplicationExecutor::RemoteCommandRequest( + requests.push_back(RemoteCommandRequest( target, "admin", requestVotesCmd, @@ -90,7 +90,7 @@ namespace repl { } void VoteRequester::Algorithm::processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response) { _responsesProcessed++; if (!response.isOK()) { // failed response diff --git a/src/mongo/db/repl/vote_requester.h b/src/mongo/db/repl/vote_requester.h index 72c4f90c3f7..578e54c05a5 100644 --- a/src/mongo/db/repl/vote_requester.h +++ b/src/mongo/db/repl/vote_requester.h @@ -59,9 +59,9 @@ namespace repl { long long term, OpTime lastOplogEntry); virtual ~Algorithm(); - virtual std::vector<ReplicationExecutor::RemoteCommandRequest> getRequests() const; + virtual std::vector<RemoteCommandRequest> getRequests() const; virtual void processResponse( - const ReplicationExecutor::RemoteCommandRequest& request, + const RemoteCommandRequest& request, const ResponseStatus& response); virtual bool hasReceivedSufficientResponses() const; diff --git a/src/mongo/db/repl/vote_requester_test.cpp b/src/mongo/db/repl/vote_requester_test.cpp index 5a36015f8c6..43a1bbba3ae 100644 --- a/src/mongo/db/repl/vote_requester_test.cpp +++ b/src/mongo/db/repl/vote_requester_test.cpp @@ -45,7 +45,7 @@ namespace repl { namespace { using unittest::assertGet; - using RemoteCommandRequest = ReplicationExecutor::RemoteCommandRequest; + using RemoteCommandRequest = RemoteCommandRequest; bool stringContains(const std::string &haystack, const std::string& needle) { return haystack.find(needle) != std::string::npos; |