diff options
Diffstat (limited to 'src/mongo/executor')
30 files changed, 459 insertions, 440 deletions
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp index ae8356a4982..c2fee263c60 100644 --- a/src/mongo/executor/async_mock_stream_factory.cpp +++ b/src/mongo/executor/async_mock_stream_factory.cpp @@ -124,18 +124,17 @@ AsyncMockStreamFactory::MockStream::~MockStream() { void AsyncMockStreamFactory::MockStream::connect(asio::ip::tcp::resolver::iterator endpoints, ConnectHandler&& connectHandler) { // Suspend execution after "connecting" - _defer(kBlockedBeforeConnect, - [this, connectHandler, endpoints]() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - // We shim a lambda to give connectHandler the right signature since it doesn't take - // a size_t param. - checkCanceled( - _strand, - &_state, - [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); }, - 0); - }); + _defer(kBlockedBeforeConnect, [this, connectHandler, endpoints]() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + // We shim a lambda to give connectHandler the right signature since it doesn't take + // a size_t param. + checkCanceled( + _strand, + &_state, + [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); }, + 0); + }); } void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf, @@ -147,11 +146,10 @@ void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf, _writeQueue.push({begin, begin + size}); // Suspend execution after data is written. - _defer_inlock(kBlockedAfterWrite, - [this, writeHandler, size]() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - checkCanceled(_strand, &_state, std::move(writeHandler), size); - }); + _defer_inlock(kBlockedAfterWrite, [this, writeHandler, size]() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + checkCanceled(_strand, &_state, std::move(writeHandler), size); + }); } void AsyncMockStreamFactory::MockStream::cancel() { @@ -169,45 +167,44 @@ void AsyncMockStreamFactory::MockStream::cancel() { void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf, StreamHandler&& readHandler) { // Suspend execution before data is read. - _defer(kBlockedBeforeRead, - [this, buf, readHandler]() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - int nToCopy = 0; - - // If we've set an error, return that instead of a read. - if (!_error) { - auto nextRead = std::move(_readQueue.front()); - _readQueue.pop(); - - auto beginDst = asio::buffer_cast<uint8_t*>(buf); - nToCopy = std::min(nextRead.size(), asio::buffer_size(buf)); - - auto endSrc = std::begin(nextRead); - std::advance(endSrc, nToCopy); - - auto endDst = std::copy(std::begin(nextRead), endSrc, beginDst); - invariant((endDst - beginDst) == static_cast<std::ptrdiff_t>(nToCopy)); - log() << "read " << nToCopy << " bytes, " << (nextRead.size() - nToCopy) - << " remaining in buffer"; - } - - auto handler = readHandler; - - // If we did not receive all the bytes, we should return an error - if (static_cast<size_t>(nToCopy) < asio::buffer_size(buf)) { - handler = [readHandler](std::error_code ec, size_t len) { - // If we have an error here we've been canceled, and that takes precedence - if (ec) - return readHandler(ec, len); - - // Call the original handler with an error - readHandler(make_error_code(ErrorCodes::InvalidLength), len); - }; - } - - checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error); - _error.clear(); - }); + _defer(kBlockedBeforeRead, [this, buf, readHandler]() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + int nToCopy = 0; + + // If we've set an error, return that instead of a read. + if (!_error) { + auto nextRead = std::move(_readQueue.front()); + _readQueue.pop(); + + auto beginDst = asio::buffer_cast<uint8_t*>(buf); + nToCopy = std::min(nextRead.size(), asio::buffer_size(buf)); + + auto endSrc = std::begin(nextRead); + std::advance(endSrc, nToCopy); + + auto endDst = std::copy(std::begin(nextRead), endSrc, beginDst); + invariant((endDst - beginDst) == static_cast<std::ptrdiff_t>(nToCopy)); + log() << "read " << nToCopy << " bytes, " << (nextRead.size() - nToCopy) + << " remaining in buffer"; + } + + auto handler = readHandler; + + // If we did not receive all the bytes, we should return an error + if (static_cast<size_t>(nToCopy) < asio::buffer_size(buf)) { + handler = [readHandler](std::error_code ec, size_t len) { + // If we have an error here we've been canceled, and that takes precedence + if (ec) + return readHandler(ec, len); + + // Call the original handler with an error + readHandler(make_error_code(ErrorCodes::InvalidLength), len); + }; + } + + checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error); + _error.clear(); + }); } void AsyncMockStreamFactory::MockStream::pushRead(std::vector<uint8_t> toRead) { diff --git a/src/mongo/executor/async_mock_stream_factory.h b/src/mongo/executor/async_mock_stream_factory.h index 11c10128065..0c1e33ad3b4 100644 --- a/src/mongo/executor/async_mock_stream_factory.h +++ b/src/mongo/executor/async_mock_stream_factory.h @@ -30,8 +30,8 @@ #include <asio.hpp> #include <cstdint> -#include <queue> #include <memory> +#include <queue> #include <unordered_map> #include "mongo/executor/async_stream_factory_interface.h" diff --git a/src/mongo/executor/async_stream_test.cpp b/src/mongo/executor/async_stream_test.cpp index b772b5ac0e1..2731df5d713 100644 --- a/src/mongo/executor/async_stream_test.cpp +++ b/src/mongo/executor/async_stream_test.cpp @@ -123,11 +123,10 @@ TEST(AsyncStreamTest, IsOpen) { executor::Deferred<bool> opened; log() << "opening up outgoing connection"; - stream.connect(endpoints, - [opened](std::error_code ec) mutable { - log() << "opened outgoing connection"; - opened.emplace(!ec); - }); + stream.connect(endpoints, [opened](std::error_code ec) mutable { + log() << "opened outgoing connection"; + opened.emplace(!ec); + }); ASSERT_TRUE(opened.get()); ASSERT_TRUE(stream.isOpen()); diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp index e6c0d150694..77e9f542af7 100644 --- a/src/mongo/executor/connection_pool.cpp +++ b/src/mongo/executor/connection_pool.cpp @@ -31,8 +31,8 @@ #include "mongo/executor/connection_pool.h" -#include "mongo/executor/connection_pool_stats.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/executor/connection_pool_stats.h" #include "mongo/executor/remote_command_request.h" #include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" @@ -363,30 +363,29 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk // Our strategy for refreshing connections is to check them out and // immediately check them back in (which kicks off the refresh logic in // returnConnection - connPtr->setTimeout(_parent->_options.refreshRequirement, - [this, connPtr]() { - OwnedConnection conn; + connPtr->setTimeout(_parent->_options.refreshRequirement, [this, connPtr]() { + OwnedConnection conn; - stdx::unique_lock<stdx::mutex> lk(_parent->_mutex); + stdx::unique_lock<stdx::mutex> lk(_parent->_mutex); - if (!_readyPool.count(connPtr)) { - // We've already been checked out. We don't need to refresh - // ourselves. - return; - } + if (!_readyPool.count(connPtr)) { + // We've already been checked out. We don't need to refresh + // ourselves. + return; + } - conn = takeFromPool(_readyPool, connPtr); + conn = takeFromPool(_readyPool, connPtr); - // If we're in shutdown, we don't need to refresh connections - if (_state == State::kInShutdown) - return; + // If we're in shutdown, we don't need to refresh connections + if (_state == State::kInShutdown) + return; - _checkedOutPool[connPtr] = std::move(conn); + _checkedOutPool[connPtr] = std::move(conn); - connPtr->indicateSuccess(); + connPtr->indicateSuccess(); - returnConnection(connPtr, std::move(lk)); - }); + returnConnection(connPtr, std::move(lk)); + }); fulfillRequests(lk); } @@ -608,31 +607,29 @@ void ConnectionPool::SpecificPool::updateStateInLock() { // We set a timer for the most recent request, then invoke each timed // out request we couldn't service - _requestTimer->setTimeout( - timeout, - [this]() { - stdx::unique_lock<stdx::mutex> lk(_parent->_mutex); - - auto now = _parent->_factory->now(); - - while (_requests.size()) { - auto& x = _requests.top(); - - if (x.first <= now) { - auto cb = std::move(x.second); - _requests.pop(); - - lk.unlock(); - cb(Status(ErrorCodes::ExceededTimeLimit, - "Couldn't get a connection within the time limit")); - lk.lock(); - } else { - break; - } + _requestTimer->setTimeout(timeout, [this]() { + stdx::unique_lock<stdx::mutex> lk(_parent->_mutex); + + auto now = _parent->_factory->now(); + + while (_requests.size()) { + auto& x = _requests.top(); + + if (x.first <= now) { + auto cb = std::move(x.second); + _requests.pop(); + + lk.unlock(); + cb(Status(ErrorCodes::ExceededTimeLimit, + "Couldn't get a connection within the time limit")); + lk.lock(); + } else { + break; } + } - updateStateInLock(); - }); + updateStateInLock(); + }); } else if (_checkedOutPool.size()) { // If we have no requests, but someone's using a connection, we just // hang around until the next request or a return diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h index 1ac0b7194ee..68ea3ad4eba 100644 --- a/src/mongo/executor/connection_pool.h +++ b/src/mongo/executor/connection_pool.h @@ -28,8 +28,8 @@ #pragma once #include <memory> -#include <unordered_map> #include <queue> +#include <unordered_map> #include "mongo/base/disallow_copying.h" #include "mongo/stdx/chrono.h" diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp index 7be1874e7b3..2cb395df612 100644 --- a/src/mongo/executor/connection_pool_asio.cpp +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -231,18 +231,16 @@ void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { cb(this, failedResponse.getStatus()); }); - _global->_impl->_asyncRunCommand( - op, - [this, op](std::error_code ec, size_t bytes) { - cancelTimeout(); + _global->_impl->_asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) { + cancelTimeout(); - auto cb = std::move(_refreshCallback); + auto cb = std::move(_refreshCallback); - if (ec) - return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); + if (ec) + return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); - cb(this, Status::OK()); - }); + cb(this, Status::OK()); + }); }); } diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h index 36286fe7803..00b09667c1c 100644 --- a/src/mongo/executor/connection_pool_asio.h +++ b/src/mongo/executor/connection_pool_asio.h @@ -31,10 +31,10 @@ #include <memory> +#include "mongo/executor/async_stream_interface.h" #include "mongo/executor/connection_pool.h" -#include "mongo/executor/network_interface_asio.h" #include "mongo/executor/network_interface.h" -#include "mongo/executor/async_stream_interface.h" +#include "mongo/executor/network_interface_asio.h" #include "mongo/stdx/mutex.h" namespace mongo { diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp index 24949a142c7..5f7cf6676da 100644 --- a/src/mongo/executor/connection_pool_test.cpp +++ b/src/mongo/executor/connection_pool_test.cpp @@ -30,9 +30,9 @@ #include "mongo/executor/connection_pool_test_fixture.h" #include "mongo/executor/connection_pool.h" -#include "mongo/unittest/unittest.h" -#include "mongo/stdx/memory.h" #include "mongo/stdx/future.h" +#include "mongo/stdx/memory.h" +#include "mongo/unittest/unittest.h" namespace mongo { namespace executor { @@ -56,7 +56,7 @@ private: }; #define CONN2ID(swConn) \ - [](StatusWith<ConnectionPool::ConnectionHandle> & swConn) { \ + [](StatusWith<ConnectionPool::ConnectionHandle>& swConn) { \ ASSERT(swConn.isOK()); \ return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \ }(swConn) diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp index 2d48ad0b5e2..53badbbe1f3 100644 --- a/src/mongo/executor/connection_pool_test_fixture.cpp +++ b/src/mongo/executor/connection_pool_test_fixture.cpp @@ -159,9 +159,9 @@ void ConnectionImpl::cancelTimeout() { void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) { _setupCallback = std::move(cb); - _timer.setTimeout( - timeout, - [this] { _setupCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); }); + _timer.setTimeout(timeout, [this] { + _setupCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); + }); _setupQueue.push_back(this); @@ -175,9 +175,9 @@ void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) { void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) { _refreshCallback = std::move(cb); - _timer.setTimeout( - timeout, - [this] { _refreshCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); }); + _timer.setTimeout(timeout, [this] { + _refreshCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); + }); _refreshQueue.push_back(this); diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp index e03e8a0b2d4..667c955988f 100644 --- a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp +++ b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp @@ -68,8 +68,10 @@ StatusWith<std::tuple<CursorId, BSONArray>> getBatchFromReply(std::int32_t reque if (header.getResponseToMsgId() != requestId) { return {ErrorCodes::ProtocolError, str::stream() << "responseTo field of OP_REPLY header with value '" - << header.getResponseToMsgId() << "' does not match requestId '" - << requestId << "'"}; + << header.getResponseToMsgId() + << "' does not match requestId '" + << requestId + << "'"}; } if ((header.dataLen() < 0) || @@ -95,7 +97,8 @@ StatusWith<std::tuple<CursorId, BSONArray>> getBatchFromReply(std::int32_t reque if (qr.getNReturned() != 1) { return {ErrorCodes::BadValue, str::stream() << "ResultFlag_ErrSet flag set on reply, but nReturned was '" - << qr.getNReturned() << "' - expected 1"}; + << qr.getNReturned() + << "' - expected 1"}; } // Convert error document to a Status. // Will throw if first document is invalid BSON. @@ -121,7 +124,8 @@ StatusWith<std::tuple<CursorId, BSONArray>> getBatchFromReply(std::int32_t reque return {ErrorCodes::InvalidLength, str::stream() << "Count of documents in OP_REPLY message (" << batch.arrSize() << ") did not match the value specified in the nReturned field (" - << qr.getNReturned() << ")"}; + << qr.getNReturned() + << ")"}; } return {std::make_tuple(qr.getCursorId(), batch.arr())}; diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index ca54f813e37..54dbeb7434f 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -155,7 +155,8 @@ void NetworkInterfaceASIO::startup() { _io_service.run(); } catch (...) { severe() << "Uncaught exception in NetworkInterfaceASIO IO " - "worker thread of type: " << exceptionToStatus(); + "worker thread of type: " + << exceptionToStatus(); fassertFailed(28820); } }); diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 2bbded1202d..130e9e83e3d 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -43,8 +43,8 @@ #include "mongo/base/system_error.h" #include "mongo/executor/async_stream_factory_interface.h" #include "mongo/executor/async_stream_interface.h" -#include "mongo/executor/connection_pool.h" #include "mongo/executor/async_timer_interface.h" +#include "mongo/executor/connection_pool.h" #include "mongo/executor/network_connection_hook.h" #include "mongo/executor/network_interface.h" #include "mongo/executor/remote_command_request.h" diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp index df01ad3b5c3..52d7029d42a 100644 --- a/src/mongo/executor/network_interface_asio_auth.cpp +++ b/src/mongo/executor/network_interface_asio_auth.cpp @@ -134,10 +134,9 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) { }; - _asyncRunCommand(op, - [this, op, parseIsMaster](std::error_code ec, size_t bytes) { - _validateAndRun(op, ec, std::move(parseIsMaster)); - }); + _asyncRunCommand(op, [this, op, parseIsMaster](std::error_code ec, size_t bytes) { + _validateAndRun(op, ec, std::move(parseIsMaster)); + }); } void NetworkInterfaceASIO::_authenticate(AsyncOp* op) { diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp index db7ba556d26..f6812fd9c5a 100644 --- a/src/mongo/executor/network_interface_asio_command.cpp +++ b/src/mongo/executor/network_interface_asio_command.cpp @@ -143,9 +143,11 @@ ResponseStatus decodeRPC(Message* received, return Status(ErrorCodes::RPCProtocolNegotiationFailed, str::stream() << "Mismatched RPC protocols - request was '" - << requestProtocol.getValue().toString() << "' '" + << requestProtocol.getValue().toString() + << "' '" << " but reply was '" - << networkOpToString(received->operation()) << "'"); + << networkOpToString(received->operation()) + << "'"); } auto commandReply = reply->getCommandReply(); auto replyMetadata = reply->getMetadata(); @@ -246,10 +248,9 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) { return _completeOperation(op, beginStatus); } - _asyncRunCommand(op, - [this, op](std::error_code ec, size_t bytes) { - _validateAndRun(op, ec, [this, op]() { _completedOpCallback(op); }); - }); + _asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) { + _validateAndRun(op, ec, [this, op]() { _completedOpCallback(op); }); + }); } void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) { @@ -370,39 +371,34 @@ void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op, NetworkOpHandler handle size_t bytes) { // The operation could have been canceled after starting the command, but before // receiving the header - _validateAndRun(op, - ec, - [this, op, recvMessageCallback, ec, bytes, cmd, handler] { - // validate response id - uint32_t expectedId = cmd->toSend().header().getId(); - uint32_t actualId = cmd->header().constView().getResponseToMsgId(); - if (actualId != expectedId) { - LOG(3) << "got wrong response:" - << " expected response id: " << expectedId - << ", got response id: " << actualId; - return handler(make_error_code(ErrorCodes::ProtocolError), bytes); - } - - asyncRecvMessageBody(cmd->conn().stream(), - &cmd->header(), - &cmd->toRecv(), - std::move(recvMessageCallback)); - }); + _validateAndRun(op, ec, [this, op, recvMessageCallback, ec, bytes, cmd, handler] { + // validate response id + uint32_t expectedId = cmd->toSend().header().getId(); + uint32_t actualId = cmd->header().constView().getResponseToMsgId(); + if (actualId != expectedId) { + LOG(3) << "got wrong response:" + << " expected response id: " << expectedId + << ", got response id: " << actualId; + return handler(make_error_code(ErrorCodes::ProtocolError), bytes); + } + + asyncRecvMessageBody(cmd->conn().stream(), + &cmd->header(), + &cmd->toRecv(), + std::move(recvMessageCallback)); + }); }; // Step 2 - auto sendMessageCallback = - [this, cmd, handler, recvHeaderCallback, op](std::error_code ec, size_t bytes) { - _validateAndRun(op, - ec, - [this, cmd, op, recvHeaderCallback] { - asyncRecvMessageHeader(cmd->conn().stream(), - &cmd->header(), - std::move(recvHeaderCallback)); - }); + auto sendMessageCallback = [this, cmd, handler, recvHeaderCallback, op](std::error_code ec, + size_t bytes) { + _validateAndRun(op, ec, [this, cmd, op, recvHeaderCallback] { + asyncRecvMessageHeader( + cmd->conn().stream(), &cmd->header(), std::move(recvHeaderCallback)); + }); - }; + }; // Step 1 asyncSendMessage(cmd->conn().stream(), &cmd->toSend(), std::move(sendMessageCallback)); @@ -451,10 +447,9 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) { return _beginCommunication(op); }; - return _asyncRunCommand(op, - [this, op, finishHook](std::error_code ec, std::size_t bytes) { - _validateAndRun(op, ec, finishHook); - }); + return _asyncRunCommand(op, [this, op, finishHook](std::error_code ec, std::size_t bytes) { + _validateAndRun(op, ec, finishHook); + }); } diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp index a72db76bfe7..4bda92b42b9 100644 --- a/src/mongo/executor/network_interface_asio_connect.cpp +++ b/src/mongo/executor/network_interface_asio_connect.cpp @@ -102,10 +102,9 @@ void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, tcp::resolver::iterator end auto& stream = op->connection().stream(); - stream.connect(std::move(endpoints), - [this, op](std::error_code ec) { - _validateAndRun(op, ec, [this, op]() { _runIsMaster(op); }); - }); + stream.connect(std::move(endpoints), [this, op](std::error_code ec) { + _validateAndRun(op, ec, [this, op]() { _runIsMaster(op); }); + }); } } // namespace executor diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp index 617ae363937..9272e6abac4 100644 --- a/src/mongo/executor/network_interface_asio_integration_test.cpp +++ b/src/mongo/executor/network_interface_asio_integration_test.cpp @@ -90,11 +90,10 @@ public: Deferred<StatusWith<RemoteCommandResponse>> runCommand( const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) { Deferred<StatusWith<RemoteCommandResponse>> deferred; - net().startCommand(cbHandle, - request, - [deferred](StatusWith<RemoteCommandResponse> resp) mutable { - deferred.emplace(std::move(resp)); - }); + net().startCommand( + cbHandle, request, [deferred](StatusWith<RemoteCommandResponse> resp) mutable { + deferred.emplace(std::move(resp)); + }); return deferred; } @@ -153,7 +152,8 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, Timeouts) { assertCommandFailsOnClient("admin", BSON("sleep" << 1 << "lock" << "none" - << "secs" << 10), + << "secs" + << 10), Milliseconds(100), ErrorCodes::ExceededTimeLimit); @@ -161,7 +161,8 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, Timeouts) { assertCommandOK("admin", BSON("sleep" << 1 << "lock" << "none" - << "secs" << 1), + << "secs" + << 1), Milliseconds(10000000)); } @@ -173,25 +174,25 @@ public: Deferred<Status> run(Fixture* fixture, Pool* pool, Milliseconds timeout = Milliseconds(60000)) { auto cb = makeCallbackHandle(); auto self = *this; - auto out = - fixture->runCommand(cb, - {unittest::getFixtureConnectionString().getServers()[0], - "admin", - _command, - timeout}) - .then(pool, - [self](StatusWith<RemoteCommandResponse> resp) -> Status { - auto status = resp.isOK() - ? getStatusFromCommandResult(resp.getValue().data) - : resp.getStatus(); - - return status == self._expected - ? Status::OK() - : Status{ErrorCodes::BadValue, - str::stream() << "Expected " - << ErrorCodes::errorString(self._expected) - << " but got " << status.toString()}; - }); + auto out = fixture + ->runCommand(cb, + {unittest::getFixtureConnectionString().getServers()[0], + "admin", + _command, + timeout}) + .then(pool, [self](StatusWith<RemoteCommandResponse> resp) -> Status { + auto status = resp.isOK() + ? getStatusFromCommandResult(resp.getValue().data) + : resp.getStatus(); + + return status == self._expected + ? Status::OK() + : Status{ErrorCodes::BadValue, + str::stream() << "Expected " + << ErrorCodes::errorString(self._expected) + << " but got " + << status.toString()}; + }); if (_cancel) { invariant(fixture->randomNumberGenerator()); sleepmillis(fixture->randomNumberGenerator()->nextInt32(10)); @@ -203,33 +204,41 @@ public: static Deferred<Status> runTimeoutOp(Fixture* fixture, Pool* pool) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" - << "secs" << 1), + << "secs" + << 1), ErrorCodes::ExceededTimeLimit, - false).run(fixture, pool, Milliseconds(100)); + false) + .run(fixture, pool, Milliseconds(100)); } static Deferred<Status> runCompleteOp(Fixture* fixture, Pool* pool) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" - << "millis" << 100), + << "millis" + << 100), ErrorCodes::OK, - false).run(fixture, pool); + false) + .run(fixture, pool); } static Deferred<Status> runCancelOp(Fixture* fixture, Pool* pool) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" - << "secs" << 10), + << "secs" + << 10), ErrorCodes::CallbackCanceled, - true).run(fixture, pool); + true) + .run(fixture, pool); } static Deferred<Status> runLongOp(Fixture* fixture, Pool* pool) { return StressTestOp(BSON("sleep" << 1 << "lock" << "none" - << "secs" << 30), + << "secs" + << 30), ErrorCodes::OK, - false).run(fixture, pool, RemoteCommandRequest::kNoTimeout); + false) + .run(fixture, pool, RemoteCommandRequest::kNoTimeout); } private: @@ -265,26 +274,24 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) { pool.join(); }); - std::generate_n(std::back_inserter(ops), - numOps, - [&rng, &pool, this] { + std::generate_n(std::back_inserter(ops), numOps, [&rng, &pool, this] { - // stagger operations slightly to mitigate connection pool contention - sleepmillis(rng.nextInt32(10)); + // stagger operations slightly to mitigate connection pool contention + sleepmillis(rng.nextInt32(10)); - auto i = rng.nextCanonicalDouble(); + auto i = rng.nextCanonicalDouble(); - if (i < .3) { - return StressTestOp::runCancelOp(this, &pool); - } else if (i < .7) { - return StressTestOp::runCompleteOp(this, &pool); - } else if (i < .99) { - return StressTestOp::runTimeoutOp(this, &pool); - } else { - // Just a sprinkling of long ops, to mitigate connection pool contention - return StressTestOp::runLongOp(this, &pool); - } - }); + if (i < .3) { + return StressTestOp::runCancelOp(this, &pool); + } else if (i < .7) { + return StressTestOp::runCompleteOp(this, &pool); + } else if (i < .99) { + return StressTestOp::runTimeoutOp(this, &pool); + } else { + // Just a sprinkling of long ops, to mitigate connection pool contention + return StressTestOp::runLongOp(this, &pool); + } + }); log() << "running ops"; auto res = helpers::collect(ops, &pool) @@ -313,7 +320,8 @@ class HangingHook : public executor::NetworkConnectionHook { "admin", BSON("sleep" << 1 << "lock" << "none" - << "secs" << 100000000), + << "secs" + << 100000000), BSONObj()))}; } diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index 7c0e66c6eda..1fe4f33b88d 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -370,9 +370,9 @@ bool NetworkInterfaceASIO::AsyncOp::operator==(const AsyncOp& other) const { } bool NetworkInterfaceASIO::AsyncOp::_hasSeenState(AsyncOp::State state) const { - return std::any_of(std::begin(_states), - std::end(_states), - [state](AsyncOp::State _state) { return _state == state; }); + return std::any_of(std::begin(_states), std::end(_states), [state](AsyncOp::State _state) { + return _state == state; + }); } void NetworkInterfaceASIO::AsyncOp::_transitionToState(AsyncOp::State newState) { @@ -393,9 +393,9 @@ void NetworkInterfaceASIO::AsyncOp::_transitionToState_inlock(AsyncOp::State new // multiple times. Ignore that transition if we're already cancelled. if (newState == State::kCanceled) { // Find the current state - auto iter = std::find_if_not(_states.rbegin(), - _states.rend(), - [](const State& state) { return state == State::kNoState; }); + auto iter = std::find_if_not(_states.rbegin(), _states.rend(), [](const State& state) { + return state == State::kNoState; + }); // If its cancelled, just return if (iter != _states.rend() && *iter == State::kCanceled) { diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp index 7462070bad9..a6193dbccde 100644 --- a/src/mongo/executor/network_interface_asio_test.cpp +++ b/src/mongo/executor/network_interface_asio_test.cpp @@ -166,8 +166,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelOperation) { // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); { // Cancel operation while blocked in the write for determinism. By calling cancel here we @@ -196,8 +197,9 @@ TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) { auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); auto& result = deferred.get(); ASSERT(result == ErrorCodes::CallbackCanceled); @@ -214,8 +216,9 @@ TEST_F(NetworkInterfaceASIOTest, LateCancel) { auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); // Simulate user command stream->simulateServer(rpc::Protocol::kOpCommandV1, @@ -244,8 +247,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) { // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); { WriteEvent{stream}.skip(); @@ -272,8 +276,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) { ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); { WriteEvent write{stream}; @@ -299,8 +304,9 @@ TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) { auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); { WriteEvent{stream}.skip(); @@ -327,8 +333,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeoutAndNetworkError) { auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); { WriteEvent{stream}.skip(); @@ -358,8 +365,9 @@ TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) { // Simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); { // Wait for the operation to block on write so we know it's been added. @@ -393,27 +401,28 @@ TEST_F(NetworkInterfaceASIOTest, StartCommand) { // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); auto expectedMetadata = BSON("meep" << "beep"); auto expectedCommandReply = BSON("boop" << "bop" - << "ok" << 1.0); + << "ok" + << 1.0); // simulate user command - stream->simulateServer(rpc::Protocol::kOpCommandV1, - [&](RemoteCommandRequest request) -> RemoteCommandResponse { - ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()}, - "foo"); - ASSERT_EQ(request.dbname, "testDB"); + stream->simulateServer( + rpc::Protocol::kOpCommandV1, [&](RemoteCommandRequest request) -> RemoteCommandResponse { + ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()}, "foo"); + ASSERT_EQ(request.dbname, "testDB"); - RemoteCommandResponse response; - response.data = expectedCommandReply; - response.metadata = expectedMetadata; - return response; - }); + RemoteCommandResponse response; + response.data = expectedCommandReply; + response.metadata = expectedMetadata; + return response; + }); auto& res = deferred.get(); @@ -450,8 +459,9 @@ public: auto stream = streamFactory().blockUntilStreamExists(testHost); ConnectEvent{stream}.skip(); stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); uint32_t messageId = 0; @@ -499,11 +509,9 @@ public: }; TEST_F(MalformedMessageTest, messageHeaderWrongResponseTo) { - runMessageTest(ErrorCodes::ProtocolError, - false, - [](MsgData::View message) { - message.setResponseToMsgId(message.getResponseToMsgId() + 1); - }); + runMessageTest(ErrorCodes::ProtocolError, false, [](MsgData::View message) { + message.setResponseToMsgId(message.getResponseToMsgId() + 1); + }); } TEST_F(MalformedMessageTest, messageHeaderlenZero) { @@ -512,15 +520,15 @@ TEST_F(MalformedMessageTest, messageHeaderlenZero) { } TEST_F(MalformedMessageTest, MessageHeaderLenTooSmall) { - runMessageTest(ErrorCodes::InvalidLength, - false, - [](MsgData::View message) { message.setLen(6); }); // min is 16 + runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) { + message.setLen(6); + }); // min is 16 } TEST_F(MalformedMessageTest, MessageHeaderLenTooLarge) { - runMessageTest(ErrorCodes::InvalidLength, - false, - [](MsgData::View message) { message.setLen(48000001); }); // max is 48000000 + runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) { + message.setLen(48000001); + }); // max is 48000000 } TEST_F(MalformedMessageTest, MessageHeaderLenNegative) { @@ -529,27 +537,27 @@ TEST_F(MalformedMessageTest, MessageHeaderLenNegative) { } TEST_F(MalformedMessageTest, MessageLenSmallerThanActual) { - runMessageTest(ErrorCodes::InvalidBSON, - true, - [](MsgData::View message) { message.setLen(message.getLen() - 10); }); + runMessageTest(ErrorCodes::InvalidBSON, true, [](MsgData::View message) { + message.setLen(message.getLen() - 10); + }); } TEST_F(MalformedMessageTest, FailedToReadAllBytesForMessage) { - runMessageTest(ErrorCodes::InvalidLength, - true, - [](MsgData::View message) { message.setLen(message.getLen() + 100); }); + runMessageTest(ErrorCodes::InvalidLength, true, [](MsgData::View message) { + message.setLen(message.getLen() + 100); + }); } TEST_F(MalformedMessageTest, UnsupportedOpcode) { - runMessageTest(ErrorCodes::UnsupportedFormat, - true, - [](MsgData::View message) { message.setOperation(2222); }); + runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) { + message.setOperation(2222); + }); } TEST_F(MalformedMessageTest, MismatchedOpcode) { - runMessageTest(ErrorCodes::UnsupportedFormat, - true, - [](MsgData::View message) { message.setOperation(2006); }); + runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) { + message.setOperation(2006); + }); } class NetworkInterfaceASIOConnectionHookTest : public NetworkInterfaceASIOTest { @@ -605,14 +613,14 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, ValidateHostInvalid) { // simulate isMaster reply. stream->simulateServer( - rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) -> RemoteCommandResponse { + rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse { RemoteCommandResponse response; response.data = - BSON("minWireVersion" - << mongo::WireSpec::instance().minWireVersionIncoming << "maxWireVersion" - << mongo::WireSpec::instance().maxWireVersionIncoming << "TESTKEY" - << "TESTVALUE"); + BSON("minWireVersion" << mongo::WireSpec::instance().minWireVersionIncoming + << "maxWireVersion" + << mongo::WireSpec::instance().maxWireVersionIncoming + << "TESTKEY" + << "TESTVALUE"); return response; }); @@ -637,8 +645,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) { Status makeRequestError{ErrorCodes::DBPathInUse, "bloooh"}; start(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) - -> Status { return Status::OK(); }, + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { + return Status::OK(); + }, [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { makeRequestCalled = true; return makeRequestError; @@ -659,8 +668,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) { // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); // We should stop here. auto& res = deferred.get(); @@ -676,8 +686,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) { bool handleReplyCalled = false; start(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) - -> Status { return Status::OK(); }, + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { + return Status::OK(); + }, [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { makeRequestCalled = true; return {boost::none}; @@ -692,7 +703,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) { auto commandReply = BSON("foo" << "boo" - << "ok" << 1.0); + << "ok" + << 1.0); auto metadata = BSON("aaa" << "bbb"); @@ -704,8 +716,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) { // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); // Simulate user command. stream->simulateServer(rpc::Protocol::kOpCommandV1, @@ -739,14 +752,16 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) { BSONObj hookCommandReply = BSON("blah" << "blah" - << "ok" << 1.0); + << "ok" + << 1.0); BSONObj hookReplyMetadata = BSON("1111" << 2222); Status handleReplyError{ErrorCodes::AuthSchemaIncompatible, "daowdjkpowkdjpow"}; start(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) - -> Status { return Status::OK(); }, + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { + return Status::OK(); + }, [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { makeRequestCalled = true; return {boost::make_optional<RemoteCommandRequest>( @@ -769,8 +784,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) { // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); // Simulate hook reply stream->simulateServer(rpc::Protocol::kOpCommandV1, @@ -873,8 +889,9 @@ TEST_F(NetworkInterfaceASIOMetadataTest, Metadata) { // simulate isMaster reply. stream->simulateServer(rpc::Protocol::kOpQuery, - [](RemoteCommandRequest request) - -> RemoteCommandResponse { return simulateIsMaster(request); }); + [](RemoteCommandRequest request) -> RemoteCommandResponse { + return simulateIsMaster(request); + }); // Simulate hook reply stream->simulateServer(rpc::Protocol::kOpCommandV1, diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h index bfd489514b0..0c1ff03cf11 100644 --- a/src/mongo/executor/network_interface_asio_test_utils.h +++ b/src/mongo/executor/network_interface_asio_test_utils.h @@ -135,23 +135,22 @@ static Deferred<std::vector<T>> collect(std::vector<Deferred<T>>& ds, ThreadPool collectState->mem.resize(collectState->goal); for (std::size_t i = 0; i < ds.size(); ++i) { - ds[i].then(pool, - [collectState, out, i](T res) mutable { - // The bool return is unused. - stdx::lock_guard<stdx::mutex> lk(collectState->mtx); - collectState->mem[i] = std::move(res); - - // If we're done. - if (collectState->goal == ++collectState->numFinished) { - std::vector<T> outInitialized; - outInitialized.reserve(collectState->mem.size()); - for (auto&& mem_entry : collectState->mem) { - outInitialized.emplace_back(std::move(*mem_entry)); - } - out.emplace(outInitialized); - } - return true; - }); + ds[i].then(pool, [collectState, out, i](T res) mutable { + // The bool return is unused. + stdx::lock_guard<stdx::mutex> lk(collectState->mtx); + collectState->mem[i] = std::move(res); + + // If we're done. + if (collectState->goal == ++collectState->numFinished) { + std::vector<T> outInitialized; + outInitialized.reserve(collectState->mem.size()); + for (auto&& mem_entry : collectState->mem) { + outInitialized.emplace_back(std::move(*mem_entry)); + } + out.emplace(outInitialized); + } + return true; + }); } return out; } diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 1c822c9c48b..2a121e8fbb5 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -30,8 +30,8 @@ #include "mongo/platform/basic.h" -#include "mongo/executor/network_interface_mock.h" #include "mongo/executor/network_connection_hook.h" +#include "mongo/executor/network_interface_mock.h" #include <algorithm> #include <iterator> @@ -400,27 +400,27 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort } // The completion handler for the postconnect command schedules the original command. - auto postconnectCompletionHandler = - [this, op](StatusWith<RemoteCommandResponse> response) mutable { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (!response.isOK()) { - op.setResponse(_now_inlock(), response.getStatus()); - op.finishResponse(); - return; - } - - auto handleStatus = - _hook->handleReply(op.getRequest().target, std::move(response.getValue())); - - if (!handleStatus.isOK()) { - op.setResponse(_now_inlock(), handleStatus); - op.finishResponse(); - return; - } - - _enqueueOperation_inlock(std::move(op)); - _connections.emplace(op.getRequest().target); - }; + auto postconnectCompletionHandler = [this, + op](StatusWith<RemoteCommandResponse> response) mutable { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (!response.isOK()) { + op.setResponse(_now_inlock(), response.getStatus()); + op.finishResponse(); + return; + } + + auto handleStatus = + _hook->handleReply(op.getRequest().target, std::move(response.getValue())); + + if (!handleStatus.isOK()) { + op.setResponse(_now_inlock(), handleStatus); + op.finishResponse(); + return; + } + + _enqueueOperation_inlock(std::move(op)); + _connections.emplace(op.getRequest().target); + }; auto postconnectOp = NetworkOperation(op.getCallbackHandle(), std::move(*hookPostconnectCommand), diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp index 4e1e3f45a35..d59f7970474 100644 --- a/src/mongo/executor/network_interface_mock_test.cpp +++ b/src/mongo/executor/network_interface_mock_test.cpp @@ -164,16 +164,14 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) { BSONObj(), Milliseconds(0)}; - ASSERT_OK(net().startCommand(cb, - actualCommandExpected, - [&](StatusWith<RemoteCommandResponse> resp) { - commandFinished = true; - if (resp.isOK()) { - gotCorrectCommandReply = - (actualResponseExpected.toString() == - resp.getValue().toString()); - } - })); + ASSERT_OK( + net().startCommand(cb, actualCommandExpected, [&](StatusWith<RemoteCommandResponse> resp) { + commandFinished = true; + if (resp.isOK()) { + gotCorrectCommandReply = + (actualResponseExpected.toString() == resp.getValue().toString()); + } + })); // At this point validate and makeRequest should have been called. ASSERT(validateCalled); @@ -224,10 +222,12 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) { // We just need some obscure non-OK code. return {ErrorCodes::ConflictingOperationInProgress, "blah"}; }, - [&](const HostAndPort& remoteHost) - -> StatusWith<boost::optional<RemoteCommandRequest>> { MONGO_UNREACHABLE; }, - [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) - -> Status { MONGO_UNREACHABLE; })); + [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { + MONGO_UNREACHABLE; + }, + [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status { + MONGO_UNREACHABLE; + })); startNetwork(); @@ -261,14 +261,16 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) { TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) { bool makeRequestCalled = false; net().setConnectionHook(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) - -> Status { return Status::OK(); }, + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { + return Status::OK(); + }, [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { makeRequestCalled = true; return {boost::none}; }, - [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) - -> Status { MONGO_UNREACHABLE; })); + [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status { + MONGO_UNREACHABLE; + })); startNetwork(); @@ -296,14 +298,16 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) { TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) { bool makeRequestCalled = false; net().setConnectionHook(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) - -> Status { return Status::OK(); }, + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { + return Status::OK(); + }, [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { makeRequestCalled = true; return {ErrorCodes::InvalidSyncSource, "blah"}; }, - [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) - -> Status { MONGO_UNREACHABLE; })); + [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status { + MONGO_UNREACHABLE; + })); startNetwork(); @@ -333,8 +337,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) { TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) { bool handleReplyCalled = false; net().setConnectionHook(makeTestHook( - [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) - -> Status { return Status::OK(); }, + [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status { + return Status::OK(); + }, [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> { return boost::make_optional<RemoteCommandRequest>({}); }, @@ -404,8 +409,9 @@ TEST_F(NetworkInterfaceMockTest, CommandTimeout) { request.timeout = Milliseconds(2000); ErrorCodes::Error statusPropagated = ErrorCodes::OK; - auto finishFn = - [&](StatusWith<RemoteCommandResponse> resp) { statusPropagated = resp.getStatus().code(); }; + auto finishFn = [&](StatusWith<RemoteCommandResponse> resp) { + statusPropagated = resp.getStatus().code(); + }; // // Command times out. diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp index b678a9486d1..f556b2aab35 100644 --- a/src/mongo/executor/network_interface_thread_pool.cpp +++ b/src/mongo/executor/network_interface_thread_pool.cpp @@ -132,12 +132,11 @@ void NetworkInterfaceThreadPool::consumeTasks(stdx::unique_lock<stdx::mutex> lk) if (!_registeredAlarm) { _registeredAlarm = true; lk.unlock(); - _net->setAlarm(_net->now(), - [this] { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _registeredAlarm = false; - consumeTasks(std::move(lk)); - }); + _net->setAlarm(_net->now(), [this] { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _registeredAlarm = false; + consumeTasks(std::move(lk)); + }); } return; diff --git a/src/mongo/executor/network_interface_thread_pool_test.cpp b/src/mongo/executor/network_interface_thread_pool_test.cpp index 319d0299b27..b9a8c070b2d 100644 --- a/src/mongo/executor/network_interface_thread_pool_test.cpp +++ b/src/mongo/executor/network_interface_thread_pool_test.cpp @@ -33,10 +33,10 @@ #include "mongo/base/init.h" #include "mongo/executor/network_interface_asio.h" #include "mongo/executor/network_interface_thread_pool.h" +#include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/concurrency/thread_pool_test_common.h" #include "mongo/util/concurrency/thread_pool_test_fixture.h" -#include "mongo/stdx/memory.h" namespace { using namespace mongo; diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h index b6cedb8f976..3e997e448de 100644 --- a/src/mongo/executor/remote_command_response.h +++ b/src/mongo/executor/remote_command_response.h @@ -29,8 +29,8 @@ #pragma once #include <iosfwd> -#include <string> #include <memory> +#include <string> #include "mongo/db/jsobj.h" #include "mongo/util/net/message.h" diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index ec68a837c8e..fd630e527b7 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -28,9 +28,9 @@ #pragma once -#include <string> -#include <memory> #include <functional> +#include <memory> +#include <string> #include "mongo/base/disallow_copying.h" #include "mongo/base/status.h" diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp index d6c479a1a32..ed956b93567 100644 --- a/src/mongo/executor/task_executor_test_common.cpp +++ b/src/mongo/executor/task_executor_test_common.cpp @@ -90,20 +90,20 @@ public: } }; -#define COMMON_EXECUTOR_TEST(TEST_NAME) \ - class CET_##TEST_NAME : public CommonTaskExecutorTestFixture { \ - public: \ - CET_##TEST_NAME(ExecutorFactory makeExecutor) \ - : CommonTaskExecutorTestFixture(std::move(makeExecutor)) {} \ - \ - private: \ - void _doTest() override; \ - static const CetRegistrationAgent _agent; \ - }; \ - const CetRegistrationAgent CET_##TEST_NAME::_agent(#TEST_NAME, \ - [](ExecutorFactory makeExecutor) { \ - return stdx::make_unique<CET_##TEST_NAME>(std::move(makeExecutor)); \ - }); \ +#define COMMON_EXECUTOR_TEST(TEST_NAME) \ + class CET_##TEST_NAME : public CommonTaskExecutorTestFixture { \ + public: \ + CET_##TEST_NAME(ExecutorFactory makeExecutor) \ + : CommonTaskExecutorTestFixture(std::move(makeExecutor)) {} \ + \ + private: \ + void _doTest() override; \ + static const CetRegistrationAgent _agent; \ + }; \ + const CetRegistrationAgent CET_##TEST_NAME::_agent( \ + #TEST_NAME, [](ExecutorFactory makeExecutor) { \ + return stdx::make_unique<CET_##TEST_NAME>(std::move(makeExecutor)); \ + }); \ void CET_##TEST_NAME::_doTest() void setStatus(const TaskExecutor::CallbackArgs& cbData, Status* target) { @@ -132,9 +132,10 @@ void scheduleSetStatusAndShutdown(const TaskExecutor::CallbackArgs& cbData, *outStatus1 = cbData.status; return; } - *outStatus1 = cbData.executor->scheduleWork(stdx::bind(setStatusAndShutdown, - stdx::placeholders::_1, - outStatus2)).getStatus(); + *outStatus1 = + cbData.executor + ->scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, outStatus2)) + .getStatus(); } COMMON_EXECUTOR_TEST(RunOne) { @@ -180,10 +181,10 @@ COMMON_EXECUTOR_TEST(OneSchedulesAnother) { TaskExecutor& executor = getExecutor(); Status status1 = getDetectableErrorStatus(); Status status2 = getDetectableErrorStatus(); - ASSERT_OK(executor.scheduleWork(stdx::bind(scheduleSetStatusAndShutdown, - stdx::placeholders::_1, - &status1, - &status2)).getStatus()); + ASSERT_OK(executor + .scheduleWork(stdx::bind( + scheduleSetStatusAndShutdown, stdx::placeholders::_1, &status1, &status2)) + .getStatus()); launchExecutorThread(); joinExecutorThread(); ASSERT_OK(status1); @@ -360,9 +361,10 @@ static void setStatusOnRemoteCommandCompletion( Status* outStatus) { if (cbData.request != expectedRequest) { *outStatus = Status(ErrorCodes::BadValue, - mongoutils::str::stream() - << "Actual request: " << getRequestDescription(cbData.request) - << "; expected: " << getRequestDescription(expectedRequest)); + mongoutils::str::stream() << "Actual request: " + << getRequestDescription(cbData.request) + << "; expected: " + << getRequestDescription(expectedRequest)); return; } *outStatus = cbData.response.getStatus(); diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 6d58745f194..9b5d18cbd84 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -264,20 +264,18 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt( return cbHandle; } lk.unlock(); - _net->setAlarm(when, - [this, when, cbHandle] { - auto cbState = - checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue())); - if (cbState->canceled.load()) { - return; - } - invariant(now() >= when); - stdx::unique_lock<stdx::mutex> lk(_mutex); - if (cbState->canceled.load()) { - return; - } - scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk)); - }); + _net->setAlarm(when, [this, when, cbHandle] { + auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue())); + if (cbState->canceled.load()) { + return; + } + invariant(now() >= when); + stdx::unique_lock<stdx::mutex> lk(_mutex); + if (cbState->canceled.load()) { + return; + } + scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk)); + }); return cbHandle; } @@ -350,9 +348,9 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC if (_inShutdown) { return; } - LOG(3) << "Received remote response: " << (response.isOK() - ? response.getValue().toString() - : response.getStatus().toString()); + LOG(3) << "Received remote response: " + << (response.isOK() ? response.getValue().toString() + : response.getStatus().toString()); swap(cbState->callback, newCb); scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk)); }); diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index f4afb7b58c9..9cd55507649 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -32,8 +32,8 @@ #include "mongo/base/disallow_copying.h" #include "mongo/executor/task_executor.h" -#include "mongo/stdx/list.h" #include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/list.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp index b2d43e669a0..887cfa59d45 100644 --- a/src/mongo/executor/thread_pool_task_executor_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_test.cpp @@ -48,10 +48,9 @@ namespace executor { namespace { MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) { - addTestsForExecutor("ThreadPoolExecutorCommon", - [](std::unique_ptr<NetworkInterfaceMock>* net) { - return makeThreadPoolTestExecutor(std::move(*net)); - }); + addTestsForExecutor("ThreadPoolExecutorCommon", [](std::unique_ptr<NetworkInterfaceMock>* net) { + return makeThreadPoolTestExecutor(std::move(*net)); + }); return Status::OK(); } @@ -100,14 +99,16 @@ TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) { auto& executor = getExecutor(); launchExecutorThread(); - ASSERT_OK(executor.scheduleWork([&](const TaskExecutor::CallbackArgs& cbData) { - status1 = cbData.status; - if (!status1.isOK()) - return; - barrier.countDownAndWait(); - cb2 = cbData.executor->scheduleWork( - [&status2](const TaskExecutor::CallbackArgs& cbData) { status2 = cbData.status; }); - }).getStatus()); + ASSERT_OK(executor + .scheduleWork([&](const TaskExecutor::CallbackArgs& cbData) { + status1 = cbData.status; + if (!status1.isOK()) + return; + barrier.countDownAndWait(); + cb2 = cbData.executor->scheduleWork([&status2]( + const TaskExecutor::CallbackArgs& cbData) { status2 = cbData.status; }); + }) + .getStatus()); auto fpTPTE1 = getGlobalFailPointRegistry()->getFailPoint("scheduleIntoPoolSpinsUntilThreadPoolShutsDown"); diff --git a/src/mongo/executor/thread_pool_task_executor_test_fixture.h b/src/mongo/executor/thread_pool_task_executor_test_fixture.h index 04a3089bbab..b549184b26f 100644 --- a/src/mongo/executor/thread_pool_task_executor_test_fixture.h +++ b/src/mongo/executor/thread_pool_task_executor_test_fixture.h @@ -31,8 +31,8 @@ #include <memory> #include "mongo/executor/network_interface_mock.h" -#include "mongo/executor/thread_pool_task_executor.h" #include "mongo/executor/task_executor_test_fixture.h" +#include "mongo/executor/thread_pool_task_executor.h" namespace mongo { namespace executor { |