summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/async_mock_stream_factory.cpp109
-rw-r--r--src/mongo/executor/async_mock_stream_factory.h2
-rw-r--r--src/mongo/executor/async_stream_test.cpp9
-rw-r--r--src/mongo/executor/connection_pool.cpp79
-rw-r--r--src/mongo/executor/connection_pool.h2
-rw-r--r--src/mongo/executor/connection_pool_asio.cpp16
-rw-r--r--src/mongo/executor/connection_pool_asio.h4
-rw-r--r--src/mongo/executor/connection_pool_test.cpp6
-rw-r--r--src/mongo/executor/connection_pool_test_fixture.cpp12
-rw-r--r--src/mongo/executor/downconvert_find_and_getmore_commands.cpp12
-rw-r--r--src/mongo/executor/network_interface_asio.cpp3
-rw-r--r--src/mongo/executor/network_interface_asio.h2
-rw-r--r--src/mongo/executor/network_interface_asio_auth.cpp7
-rw-r--r--src/mongo/executor/network_interface_asio_command.cpp71
-rw-r--r--src/mongo/executor/network_interface_asio_connect.cpp7
-rw-r--r--src/mongo/executor/network_interface_asio_integration_test.cpp112
-rw-r--r--src/mongo/executor/network_interface_asio_operation.cpp12
-rw-r--r--src/mongo/executor/network_interface_asio_test.cpp169
-rw-r--r--src/mongo/executor/network_interface_asio_test_utils.h33
-rw-r--r--src/mongo/executor/network_interface_mock.cpp44
-rw-r--r--src/mongo/executor/network_interface_mock_test.cpp58
-rw-r--r--src/mongo/executor/network_interface_thread_pool.cpp11
-rw-r--r--src/mongo/executor/network_interface_thread_pool_test.cpp2
-rw-r--r--src/mongo/executor/remote_command_response.h2
-rw-r--r--src/mongo/executor/task_executor.h4
-rw-r--r--src/mongo/executor/task_executor_test_common.cpp50
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp32
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h2
-rw-r--r--src/mongo/executor/thread_pool_task_executor_test.cpp25
-rw-r--r--src/mongo/executor/thread_pool_task_executor_test_fixture.h2
30 files changed, 459 insertions, 440 deletions
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp
index ae8356a4982..c2fee263c60 100644
--- a/src/mongo/executor/async_mock_stream_factory.cpp
+++ b/src/mongo/executor/async_mock_stream_factory.cpp
@@ -124,18 +124,17 @@ AsyncMockStreamFactory::MockStream::~MockStream() {
void AsyncMockStreamFactory::MockStream::connect(asio::ip::tcp::resolver::iterator endpoints,
ConnectHandler&& connectHandler) {
// Suspend execution after "connecting"
- _defer(kBlockedBeforeConnect,
- [this, connectHandler, endpoints]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- // We shim a lambda to give connectHandler the right signature since it doesn't take
- // a size_t param.
- checkCanceled(
- _strand,
- &_state,
- [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); },
- 0);
- });
+ _defer(kBlockedBeforeConnect, [this, connectHandler, endpoints]() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ // We shim a lambda to give connectHandler the right signature since it doesn't take
+ // a size_t param.
+ checkCanceled(
+ _strand,
+ &_state,
+ [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); },
+ 0);
+ });
}
void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf,
@@ -147,11 +146,10 @@ void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf,
_writeQueue.push({begin, begin + size});
// Suspend execution after data is written.
- _defer_inlock(kBlockedAfterWrite,
- [this, writeHandler, size]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- checkCanceled(_strand, &_state, std::move(writeHandler), size);
- });
+ _defer_inlock(kBlockedAfterWrite, [this, writeHandler, size]() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ checkCanceled(_strand, &_state, std::move(writeHandler), size);
+ });
}
void AsyncMockStreamFactory::MockStream::cancel() {
@@ -169,45 +167,44 @@ void AsyncMockStreamFactory::MockStream::cancel() {
void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf,
StreamHandler&& readHandler) {
// Suspend execution before data is read.
- _defer(kBlockedBeforeRead,
- [this, buf, readHandler]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- int nToCopy = 0;
-
- // If we've set an error, return that instead of a read.
- if (!_error) {
- auto nextRead = std::move(_readQueue.front());
- _readQueue.pop();
-
- auto beginDst = asio::buffer_cast<uint8_t*>(buf);
- nToCopy = std::min(nextRead.size(), asio::buffer_size(buf));
-
- auto endSrc = std::begin(nextRead);
- std::advance(endSrc, nToCopy);
-
- auto endDst = std::copy(std::begin(nextRead), endSrc, beginDst);
- invariant((endDst - beginDst) == static_cast<std::ptrdiff_t>(nToCopy));
- log() << "read " << nToCopy << " bytes, " << (nextRead.size() - nToCopy)
- << " remaining in buffer";
- }
-
- auto handler = readHandler;
-
- // If we did not receive all the bytes, we should return an error
- if (static_cast<size_t>(nToCopy) < asio::buffer_size(buf)) {
- handler = [readHandler](std::error_code ec, size_t len) {
- // If we have an error here we've been canceled, and that takes precedence
- if (ec)
- return readHandler(ec, len);
-
- // Call the original handler with an error
- readHandler(make_error_code(ErrorCodes::InvalidLength), len);
- };
- }
-
- checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error);
- _error.clear();
- });
+ _defer(kBlockedBeforeRead, [this, buf, readHandler]() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ int nToCopy = 0;
+
+ // If we've set an error, return that instead of a read.
+ if (!_error) {
+ auto nextRead = std::move(_readQueue.front());
+ _readQueue.pop();
+
+ auto beginDst = asio::buffer_cast<uint8_t*>(buf);
+ nToCopy = std::min(nextRead.size(), asio::buffer_size(buf));
+
+ auto endSrc = std::begin(nextRead);
+ std::advance(endSrc, nToCopy);
+
+ auto endDst = std::copy(std::begin(nextRead), endSrc, beginDst);
+ invariant((endDst - beginDst) == static_cast<std::ptrdiff_t>(nToCopy));
+ log() << "read " << nToCopy << " bytes, " << (nextRead.size() - nToCopy)
+ << " remaining in buffer";
+ }
+
+ auto handler = readHandler;
+
+ // If we did not receive all the bytes, we should return an error
+ if (static_cast<size_t>(nToCopy) < asio::buffer_size(buf)) {
+ handler = [readHandler](std::error_code ec, size_t len) {
+ // If we have an error here we've been canceled, and that takes precedence
+ if (ec)
+ return readHandler(ec, len);
+
+ // Call the original handler with an error
+ readHandler(make_error_code(ErrorCodes::InvalidLength), len);
+ };
+ }
+
+ checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error);
+ _error.clear();
+ });
}
void AsyncMockStreamFactory::MockStream::pushRead(std::vector<uint8_t> toRead) {
diff --git a/src/mongo/executor/async_mock_stream_factory.h b/src/mongo/executor/async_mock_stream_factory.h
index 11c10128065..0c1e33ad3b4 100644
--- a/src/mongo/executor/async_mock_stream_factory.h
+++ b/src/mongo/executor/async_mock_stream_factory.h
@@ -30,8 +30,8 @@
#include <asio.hpp>
#include <cstdint>
-#include <queue>
#include <memory>
+#include <queue>
#include <unordered_map>
#include "mongo/executor/async_stream_factory_interface.h"
diff --git a/src/mongo/executor/async_stream_test.cpp b/src/mongo/executor/async_stream_test.cpp
index b772b5ac0e1..2731df5d713 100644
--- a/src/mongo/executor/async_stream_test.cpp
+++ b/src/mongo/executor/async_stream_test.cpp
@@ -123,11 +123,10 @@ TEST(AsyncStreamTest, IsOpen) {
executor::Deferred<bool> opened;
log() << "opening up outgoing connection";
- stream.connect(endpoints,
- [opened](std::error_code ec) mutable {
- log() << "opened outgoing connection";
- opened.emplace(!ec);
- });
+ stream.connect(endpoints, [opened](std::error_code ec) mutable {
+ log() << "opened outgoing connection";
+ opened.emplace(!ec);
+ });
ASSERT_TRUE(opened.get());
ASSERT_TRUE(stream.isOpen());
diff --git a/src/mongo/executor/connection_pool.cpp b/src/mongo/executor/connection_pool.cpp
index e6c0d150694..77e9f542af7 100644
--- a/src/mongo/executor/connection_pool.cpp
+++ b/src/mongo/executor/connection_pool.cpp
@@ -31,8 +31,8 @@
#include "mongo/executor/connection_pool.h"
-#include "mongo/executor/connection_pool_stats.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/executor/connection_pool_stats.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
@@ -363,30 +363,29 @@ void ConnectionPool::SpecificPool::addToReady(stdx::unique_lock<stdx::mutex>& lk
// Our strategy for refreshing connections is to check them out and
// immediately check them back in (which kicks off the refresh logic in
// returnConnection
- connPtr->setTimeout(_parent->_options.refreshRequirement,
- [this, connPtr]() {
- OwnedConnection conn;
+ connPtr->setTimeout(_parent->_options.refreshRequirement, [this, connPtr]() {
+ OwnedConnection conn;
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+ stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
- if (!_readyPool.count(connPtr)) {
- // We've already been checked out. We don't need to refresh
- // ourselves.
- return;
- }
+ if (!_readyPool.count(connPtr)) {
+ // We've already been checked out. We don't need to refresh
+ // ourselves.
+ return;
+ }
- conn = takeFromPool(_readyPool, connPtr);
+ conn = takeFromPool(_readyPool, connPtr);
- // If we're in shutdown, we don't need to refresh connections
- if (_state == State::kInShutdown)
- return;
+ // If we're in shutdown, we don't need to refresh connections
+ if (_state == State::kInShutdown)
+ return;
- _checkedOutPool[connPtr] = std::move(conn);
+ _checkedOutPool[connPtr] = std::move(conn);
- connPtr->indicateSuccess();
+ connPtr->indicateSuccess();
- returnConnection(connPtr, std::move(lk));
- });
+ returnConnection(connPtr, std::move(lk));
+ });
fulfillRequests(lk);
}
@@ -608,31 +607,29 @@ void ConnectionPool::SpecificPool::updateStateInLock() {
// We set a timer for the most recent request, then invoke each timed
// out request we couldn't service
- _requestTimer->setTimeout(
- timeout,
- [this]() {
- stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
-
- auto now = _parent->_factory->now();
-
- while (_requests.size()) {
- auto& x = _requests.top();
-
- if (x.first <= now) {
- auto cb = std::move(x.second);
- _requests.pop();
-
- lk.unlock();
- cb(Status(ErrorCodes::ExceededTimeLimit,
- "Couldn't get a connection within the time limit"));
- lk.lock();
- } else {
- break;
- }
+ _requestTimer->setTimeout(timeout, [this]() {
+ stdx::unique_lock<stdx::mutex> lk(_parent->_mutex);
+
+ auto now = _parent->_factory->now();
+
+ while (_requests.size()) {
+ auto& x = _requests.top();
+
+ if (x.first <= now) {
+ auto cb = std::move(x.second);
+ _requests.pop();
+
+ lk.unlock();
+ cb(Status(ErrorCodes::ExceededTimeLimit,
+ "Couldn't get a connection within the time limit"));
+ lk.lock();
+ } else {
+ break;
}
+ }
- updateStateInLock();
- });
+ updateStateInLock();
+ });
} else if (_checkedOutPool.size()) {
// If we have no requests, but someone's using a connection, we just
// hang around until the next request or a return
diff --git a/src/mongo/executor/connection_pool.h b/src/mongo/executor/connection_pool.h
index 1ac0b7194ee..68ea3ad4eba 100644
--- a/src/mongo/executor/connection_pool.h
+++ b/src/mongo/executor/connection_pool.h
@@ -28,8 +28,8 @@
#pragma once
#include <memory>
-#include <unordered_map>
#include <queue>
+#include <unordered_map>
#include "mongo/base/disallow_copying.h"
#include "mongo/stdx/chrono.h"
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp
index 7be1874e7b3..2cb395df612 100644
--- a/src/mongo/executor/connection_pool_asio.cpp
+++ b/src/mongo/executor/connection_pool_asio.cpp
@@ -231,18 +231,16 @@ void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
cb(this, failedResponse.getStatus());
});
- _global->_impl->_asyncRunCommand(
- op,
- [this, op](std::error_code ec, size_t bytes) {
- cancelTimeout();
+ _global->_impl->_asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) {
+ cancelTimeout();
- auto cb = std::move(_refreshCallback);
+ auto cb = std::move(_refreshCallback);
- if (ec)
- return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
+ if (ec)
+ return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
- cb(this, Status::OK());
- });
+ cb(this, Status::OK());
+ });
});
}
diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h
index 36286fe7803..00b09667c1c 100644
--- a/src/mongo/executor/connection_pool_asio.h
+++ b/src/mongo/executor/connection_pool_asio.h
@@ -31,10 +31,10 @@
#include <memory>
+#include "mongo/executor/async_stream_interface.h"
#include "mongo/executor/connection_pool.h"
-#include "mongo/executor/network_interface_asio.h"
#include "mongo/executor/network_interface.h"
-#include "mongo/executor/async_stream_interface.h"
+#include "mongo/executor/network_interface_asio.h"
#include "mongo/stdx/mutex.h"
namespace mongo {
diff --git a/src/mongo/executor/connection_pool_test.cpp b/src/mongo/executor/connection_pool_test.cpp
index 24949a142c7..5f7cf6676da 100644
--- a/src/mongo/executor/connection_pool_test.cpp
+++ b/src/mongo/executor/connection_pool_test.cpp
@@ -30,9 +30,9 @@
#include "mongo/executor/connection_pool_test_fixture.h"
#include "mongo/executor/connection_pool.h"
-#include "mongo/unittest/unittest.h"
-#include "mongo/stdx/memory.h"
#include "mongo/stdx/future.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/unittest/unittest.h"
namespace mongo {
namespace executor {
@@ -56,7 +56,7 @@ private:
};
#define CONN2ID(swConn) \
- [](StatusWith<ConnectionPool::ConnectionHandle> & swConn) { \
+ [](StatusWith<ConnectionPool::ConnectionHandle>& swConn) { \
ASSERT(swConn.isOK()); \
return static_cast<ConnectionImpl*>(swConn.getValue().get())->id(); \
}(swConn)
diff --git a/src/mongo/executor/connection_pool_test_fixture.cpp b/src/mongo/executor/connection_pool_test_fixture.cpp
index 2d48ad0b5e2..53badbbe1f3 100644
--- a/src/mongo/executor/connection_pool_test_fixture.cpp
+++ b/src/mongo/executor/connection_pool_test_fixture.cpp
@@ -159,9 +159,9 @@ void ConnectionImpl::cancelTimeout() {
void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) {
_setupCallback = std::move(cb);
- _timer.setTimeout(
- timeout,
- [this] { _setupCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); });
+ _timer.setTimeout(timeout, [this] {
+ _setupCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout"));
+ });
_setupQueue.push_back(this);
@@ -175,9 +175,9 @@ void ConnectionImpl::setup(Milliseconds timeout, SetupCallback cb) {
void ConnectionImpl::refresh(Milliseconds timeout, RefreshCallback cb) {
_refreshCallback = std::move(cb);
- _timer.setTimeout(
- timeout,
- [this] { _refreshCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout")); });
+ _timer.setTimeout(timeout, [this] {
+ _refreshCallback(this, Status(ErrorCodes::ExceededTimeLimit, "timeout"));
+ });
_refreshQueue.push_back(this);
diff --git a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp
index e03e8a0b2d4..667c955988f 100644
--- a/src/mongo/executor/downconvert_find_and_getmore_commands.cpp
+++ b/src/mongo/executor/downconvert_find_and_getmore_commands.cpp
@@ -68,8 +68,10 @@ StatusWith<std::tuple<CursorId, BSONArray>> getBatchFromReply(std::int32_t reque
if (header.getResponseToMsgId() != requestId) {
return {ErrorCodes::ProtocolError,
str::stream() << "responseTo field of OP_REPLY header with value '"
- << header.getResponseToMsgId() << "' does not match requestId '"
- << requestId << "'"};
+ << header.getResponseToMsgId()
+ << "' does not match requestId '"
+ << requestId
+ << "'"};
}
if ((header.dataLen() < 0) ||
@@ -95,7 +97,8 @@ StatusWith<std::tuple<CursorId, BSONArray>> getBatchFromReply(std::int32_t reque
if (qr.getNReturned() != 1) {
return {ErrorCodes::BadValue,
str::stream() << "ResultFlag_ErrSet flag set on reply, but nReturned was '"
- << qr.getNReturned() << "' - expected 1"};
+ << qr.getNReturned()
+ << "' - expected 1"};
}
// Convert error document to a Status.
// Will throw if first document is invalid BSON.
@@ -121,7 +124,8 @@ StatusWith<std::tuple<CursorId, BSONArray>> getBatchFromReply(std::int32_t reque
return {ErrorCodes::InvalidLength,
str::stream() << "Count of documents in OP_REPLY message (" << batch.arrSize()
<< ") did not match the value specified in the nReturned field ("
- << qr.getNReturned() << ")"};
+ << qr.getNReturned()
+ << ")"};
}
return {std::make_tuple(qr.getCursorId(), batch.arr())};
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index ca54f813e37..54dbeb7434f 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -155,7 +155,8 @@ void NetworkInterfaceASIO::startup() {
_io_service.run();
} catch (...) {
severe() << "Uncaught exception in NetworkInterfaceASIO IO "
- "worker thread of type: " << exceptionToStatus();
+ "worker thread of type: "
+ << exceptionToStatus();
fassertFailed(28820);
}
});
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 2bbded1202d..130e9e83e3d 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -43,8 +43,8 @@
#include "mongo/base/system_error.h"
#include "mongo/executor/async_stream_factory_interface.h"
#include "mongo/executor/async_stream_interface.h"
-#include "mongo/executor/connection_pool.h"
#include "mongo/executor/async_timer_interface.h"
+#include "mongo/executor/connection_pool.h"
#include "mongo/executor/network_connection_hook.h"
#include "mongo/executor/network_interface.h"
#include "mongo/executor/remote_command_request.h"
diff --git a/src/mongo/executor/network_interface_asio_auth.cpp b/src/mongo/executor/network_interface_asio_auth.cpp
index df01ad3b5c3..52d7029d42a 100644
--- a/src/mongo/executor/network_interface_asio_auth.cpp
+++ b/src/mongo/executor/network_interface_asio_auth.cpp
@@ -134,10 +134,9 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
};
- _asyncRunCommand(op,
- [this, op, parseIsMaster](std::error_code ec, size_t bytes) {
- _validateAndRun(op, ec, std::move(parseIsMaster));
- });
+ _asyncRunCommand(op, [this, op, parseIsMaster](std::error_code ec, size_t bytes) {
+ _validateAndRun(op, ec, std::move(parseIsMaster));
+ });
}
void NetworkInterfaceASIO::_authenticate(AsyncOp* op) {
diff --git a/src/mongo/executor/network_interface_asio_command.cpp b/src/mongo/executor/network_interface_asio_command.cpp
index db7ba556d26..f6812fd9c5a 100644
--- a/src/mongo/executor/network_interface_asio_command.cpp
+++ b/src/mongo/executor/network_interface_asio_command.cpp
@@ -143,9 +143,11 @@ ResponseStatus decodeRPC(Message* received,
return Status(ErrorCodes::RPCProtocolNegotiationFailed,
str::stream() << "Mismatched RPC protocols - request was '"
- << requestProtocol.getValue().toString() << "' '"
+ << requestProtocol.getValue().toString()
+ << "' '"
<< " but reply was '"
- << networkOpToString(received->operation()) << "'");
+ << networkOpToString(received->operation())
+ << "'");
}
auto commandReply = reply->getCommandReply();
auto replyMetadata = reply->getMetadata();
@@ -246,10 +248,9 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
return _completeOperation(op, beginStatus);
}
- _asyncRunCommand(op,
- [this, op](std::error_code ec, size_t bytes) {
- _validateAndRun(op, ec, [this, op]() { _completedOpCallback(op); });
- });
+ _asyncRunCommand(op, [this, op](std::error_code ec, size_t bytes) {
+ _validateAndRun(op, ec, [this, op]() { _completedOpCallback(op); });
+ });
}
void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) {
@@ -370,39 +371,34 @@ void NetworkInterfaceASIO::_asyncRunCommand(AsyncOp* op, NetworkOpHandler handle
size_t bytes) {
// The operation could have been canceled after starting the command, but before
// receiving the header
- _validateAndRun(op,
- ec,
- [this, op, recvMessageCallback, ec, bytes, cmd, handler] {
- // validate response id
- uint32_t expectedId = cmd->toSend().header().getId();
- uint32_t actualId = cmd->header().constView().getResponseToMsgId();
- if (actualId != expectedId) {
- LOG(3) << "got wrong response:"
- << " expected response id: " << expectedId
- << ", got response id: " << actualId;
- return handler(make_error_code(ErrorCodes::ProtocolError), bytes);
- }
-
- asyncRecvMessageBody(cmd->conn().stream(),
- &cmd->header(),
- &cmd->toRecv(),
- std::move(recvMessageCallback));
- });
+ _validateAndRun(op, ec, [this, op, recvMessageCallback, ec, bytes, cmd, handler] {
+ // validate response id
+ uint32_t expectedId = cmd->toSend().header().getId();
+ uint32_t actualId = cmd->header().constView().getResponseToMsgId();
+ if (actualId != expectedId) {
+ LOG(3) << "got wrong response:"
+ << " expected response id: " << expectedId
+ << ", got response id: " << actualId;
+ return handler(make_error_code(ErrorCodes::ProtocolError), bytes);
+ }
+
+ asyncRecvMessageBody(cmd->conn().stream(),
+ &cmd->header(),
+ &cmd->toRecv(),
+ std::move(recvMessageCallback));
+ });
};
// Step 2
- auto sendMessageCallback =
- [this, cmd, handler, recvHeaderCallback, op](std::error_code ec, size_t bytes) {
- _validateAndRun(op,
- ec,
- [this, cmd, op, recvHeaderCallback] {
- asyncRecvMessageHeader(cmd->conn().stream(),
- &cmd->header(),
- std::move(recvHeaderCallback));
- });
+ auto sendMessageCallback = [this, cmd, handler, recvHeaderCallback, op](std::error_code ec,
+ size_t bytes) {
+ _validateAndRun(op, ec, [this, cmd, op, recvHeaderCallback] {
+ asyncRecvMessageHeader(
+ cmd->conn().stream(), &cmd->header(), std::move(recvHeaderCallback));
+ });
- };
+ };
// Step 1
asyncSendMessage(cmd->conn().stream(), &cmd->toSend(), std::move(sendMessageCallback));
@@ -451,10 +447,9 @@ void NetworkInterfaceASIO::_runConnectionHook(AsyncOp* op) {
return _beginCommunication(op);
};
- return _asyncRunCommand(op,
- [this, op, finishHook](std::error_code ec, std::size_t bytes) {
- _validateAndRun(op, ec, finishHook);
- });
+ return _asyncRunCommand(op, [this, op, finishHook](std::error_code ec, std::size_t bytes) {
+ _validateAndRun(op, ec, finishHook);
+ });
}
diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp
index a72db76bfe7..4bda92b42b9 100644
--- a/src/mongo/executor/network_interface_asio_connect.cpp
+++ b/src/mongo/executor/network_interface_asio_connect.cpp
@@ -102,10 +102,9 @@ void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, tcp::resolver::iterator end
auto& stream = op->connection().stream();
- stream.connect(std::move(endpoints),
- [this, op](std::error_code ec) {
- _validateAndRun(op, ec, [this, op]() { _runIsMaster(op); });
- });
+ stream.connect(std::move(endpoints), [this, op](std::error_code ec) {
+ _validateAndRun(op, ec, [this, op]() { _runIsMaster(op); });
+ });
}
} // namespace executor
diff --git a/src/mongo/executor/network_interface_asio_integration_test.cpp b/src/mongo/executor/network_interface_asio_integration_test.cpp
index 617ae363937..9272e6abac4 100644
--- a/src/mongo/executor/network_interface_asio_integration_test.cpp
+++ b/src/mongo/executor/network_interface_asio_integration_test.cpp
@@ -90,11 +90,10 @@ public:
Deferred<StatusWith<RemoteCommandResponse>> runCommand(
const TaskExecutor::CallbackHandle& cbHandle, const RemoteCommandRequest& request) {
Deferred<StatusWith<RemoteCommandResponse>> deferred;
- net().startCommand(cbHandle,
- request,
- [deferred](StatusWith<RemoteCommandResponse> resp) mutable {
- deferred.emplace(std::move(resp));
- });
+ net().startCommand(
+ cbHandle, request, [deferred](StatusWith<RemoteCommandResponse> resp) mutable {
+ deferred.emplace(std::move(resp));
+ });
return deferred;
}
@@ -153,7 +152,8 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, Timeouts) {
assertCommandFailsOnClient("admin",
BSON("sleep" << 1 << "lock"
<< "none"
- << "secs" << 10),
+ << "secs"
+ << 10),
Milliseconds(100),
ErrorCodes::ExceededTimeLimit);
@@ -161,7 +161,8 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, Timeouts) {
assertCommandOK("admin",
BSON("sleep" << 1 << "lock"
<< "none"
- << "secs" << 1),
+ << "secs"
+ << 1),
Milliseconds(10000000));
}
@@ -173,25 +174,25 @@ public:
Deferred<Status> run(Fixture* fixture, Pool* pool, Milliseconds timeout = Milliseconds(60000)) {
auto cb = makeCallbackHandle();
auto self = *this;
- auto out =
- fixture->runCommand(cb,
- {unittest::getFixtureConnectionString().getServers()[0],
- "admin",
- _command,
- timeout})
- .then(pool,
- [self](StatusWith<RemoteCommandResponse> resp) -> Status {
- auto status = resp.isOK()
- ? getStatusFromCommandResult(resp.getValue().data)
- : resp.getStatus();
-
- return status == self._expected
- ? Status::OK()
- : Status{ErrorCodes::BadValue,
- str::stream() << "Expected "
- << ErrorCodes::errorString(self._expected)
- << " but got " << status.toString()};
- });
+ auto out = fixture
+ ->runCommand(cb,
+ {unittest::getFixtureConnectionString().getServers()[0],
+ "admin",
+ _command,
+ timeout})
+ .then(pool, [self](StatusWith<RemoteCommandResponse> resp) -> Status {
+ auto status = resp.isOK()
+ ? getStatusFromCommandResult(resp.getValue().data)
+ : resp.getStatus();
+
+ return status == self._expected
+ ? Status::OK()
+ : Status{ErrorCodes::BadValue,
+ str::stream() << "Expected "
+ << ErrorCodes::errorString(self._expected)
+ << " but got "
+ << status.toString()};
+ });
if (_cancel) {
invariant(fixture->randomNumberGenerator());
sleepmillis(fixture->randomNumberGenerator()->nextInt32(10));
@@ -203,33 +204,41 @@ public:
static Deferred<Status> runTimeoutOp(Fixture* fixture, Pool* pool) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
- << "secs" << 1),
+ << "secs"
+ << 1),
ErrorCodes::ExceededTimeLimit,
- false).run(fixture, pool, Milliseconds(100));
+ false)
+ .run(fixture, pool, Milliseconds(100));
}
static Deferred<Status> runCompleteOp(Fixture* fixture, Pool* pool) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
- << "millis" << 100),
+ << "millis"
+ << 100),
ErrorCodes::OK,
- false).run(fixture, pool);
+ false)
+ .run(fixture, pool);
}
static Deferred<Status> runCancelOp(Fixture* fixture, Pool* pool) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
- << "secs" << 10),
+ << "secs"
+ << 10),
ErrorCodes::CallbackCanceled,
- true).run(fixture, pool);
+ true)
+ .run(fixture, pool);
}
static Deferred<Status> runLongOp(Fixture* fixture, Pool* pool) {
return StressTestOp(BSON("sleep" << 1 << "lock"
<< "none"
- << "secs" << 30),
+ << "secs"
+ << 30),
ErrorCodes::OK,
- false).run(fixture, pool, RemoteCommandRequest::kNoTimeout);
+ false)
+ .run(fixture, pool, RemoteCommandRequest::kNoTimeout);
}
private:
@@ -265,26 +274,24 @@ TEST_F(NetworkInterfaceASIOIntegrationTest, StressTest) {
pool.join();
});
- std::generate_n(std::back_inserter(ops),
- numOps,
- [&rng, &pool, this] {
+ std::generate_n(std::back_inserter(ops), numOps, [&rng, &pool, this] {
- // stagger operations slightly to mitigate connection pool contention
- sleepmillis(rng.nextInt32(10));
+ // stagger operations slightly to mitigate connection pool contention
+ sleepmillis(rng.nextInt32(10));
- auto i = rng.nextCanonicalDouble();
+ auto i = rng.nextCanonicalDouble();
- if (i < .3) {
- return StressTestOp::runCancelOp(this, &pool);
- } else if (i < .7) {
- return StressTestOp::runCompleteOp(this, &pool);
- } else if (i < .99) {
- return StressTestOp::runTimeoutOp(this, &pool);
- } else {
- // Just a sprinkling of long ops, to mitigate connection pool contention
- return StressTestOp::runLongOp(this, &pool);
- }
- });
+ if (i < .3) {
+ return StressTestOp::runCancelOp(this, &pool);
+ } else if (i < .7) {
+ return StressTestOp::runCompleteOp(this, &pool);
+ } else if (i < .99) {
+ return StressTestOp::runTimeoutOp(this, &pool);
+ } else {
+ // Just a sprinkling of long ops, to mitigate connection pool contention
+ return StressTestOp::runLongOp(this, &pool);
+ }
+ });
log() << "running ops";
auto res = helpers::collect(ops, &pool)
@@ -313,7 +320,8 @@ class HangingHook : public executor::NetworkConnectionHook {
"admin",
BSON("sleep" << 1 << "lock"
<< "none"
- << "secs" << 100000000),
+ << "secs"
+ << 100000000),
BSONObj()))};
}
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
index 7c0e66c6eda..1fe4f33b88d 100644
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ b/src/mongo/executor/network_interface_asio_operation.cpp
@@ -370,9 +370,9 @@ bool NetworkInterfaceASIO::AsyncOp::operator==(const AsyncOp& other) const {
}
bool NetworkInterfaceASIO::AsyncOp::_hasSeenState(AsyncOp::State state) const {
- return std::any_of(std::begin(_states),
- std::end(_states),
- [state](AsyncOp::State _state) { return _state == state; });
+ return std::any_of(std::begin(_states), std::end(_states), [state](AsyncOp::State _state) {
+ return _state == state;
+ });
}
void NetworkInterfaceASIO::AsyncOp::_transitionToState(AsyncOp::State newState) {
@@ -393,9 +393,9 @@ void NetworkInterfaceASIO::AsyncOp::_transitionToState_inlock(AsyncOp::State new
// multiple times. Ignore that transition if we're already cancelled.
if (newState == State::kCanceled) {
// Find the current state
- auto iter = std::find_if_not(_states.rbegin(),
- _states.rend(),
- [](const State& state) { return state == State::kNoState; });
+ auto iter = std::find_if_not(_states.rbegin(), _states.rend(), [](const State& state) {
+ return state == State::kNoState;
+ });
// If its cancelled, just return
if (iter != _states.rend() && *iter == State::kCanceled) {
diff --git a/src/mongo/executor/network_interface_asio_test.cpp b/src/mongo/executor/network_interface_asio_test.cpp
index 7462070bad9..a6193dbccde 100644
--- a/src/mongo/executor/network_interface_asio_test.cpp
+++ b/src/mongo/executor/network_interface_asio_test.cpp
@@ -166,8 +166,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelOperation) {
// simulate isMaster reply.
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
{
// Cancel operation while blocked in the write for determinism. By calling cancel here we
@@ -196,8 +197,9 @@ TEST_F(NetworkInterfaceASIOTest, ImmediateCancel) {
auto stream = streamFactory().blockUntilStreamExists(testHost);
ConnectEvent{stream}.skip();
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
auto& result = deferred.get();
ASSERT(result == ErrorCodes::CallbackCanceled);
@@ -214,8 +216,9 @@ TEST_F(NetworkInterfaceASIOTest, LateCancel) {
auto stream = streamFactory().blockUntilStreamExists(testHost);
ConnectEvent{stream}.skip();
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
// Simulate user command
stream->simulateServer(rpc::Protocol::kOpCommandV1,
@@ -244,8 +247,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithNetworkError) {
// simulate isMaster reply.
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
{
WriteEvent{stream}.skip();
@@ -272,8 +276,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeout) {
ConnectEvent{stream}.skip();
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
{
WriteEvent write{stream};
@@ -299,8 +304,9 @@ TEST_F(NetworkInterfaceASIOTest, TimeoutWithNetworkError) {
auto stream = streamFactory().blockUntilStreamExists(testHost);
ConnectEvent{stream}.skip();
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
{
WriteEvent{stream}.skip();
@@ -327,8 +333,9 @@ TEST_F(NetworkInterfaceASIOTest, CancelWithTimeoutAndNetworkError) {
auto stream = streamFactory().blockUntilStreamExists(testHost);
ConnectEvent{stream}.skip();
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
{
WriteEvent{stream}.skip();
@@ -358,8 +365,9 @@ TEST_F(NetworkInterfaceASIOTest, AsyncOpTimeout) {
// Simulate isMaster reply.
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
{
// Wait for the operation to block on write so we know it's been added.
@@ -393,27 +401,28 @@ TEST_F(NetworkInterfaceASIOTest, StartCommand) {
// simulate isMaster reply.
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
auto expectedMetadata = BSON("meep"
<< "beep");
auto expectedCommandReply = BSON("boop"
<< "bop"
- << "ok" << 1.0);
+ << "ok"
+ << 1.0);
// simulate user command
- stream->simulateServer(rpc::Protocol::kOpCommandV1,
- [&](RemoteCommandRequest request) -> RemoteCommandResponse {
- ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()},
- "foo");
- ASSERT_EQ(request.dbname, "testDB");
+ stream->simulateServer(
+ rpc::Protocol::kOpCommandV1, [&](RemoteCommandRequest request) -> RemoteCommandResponse {
+ ASSERT_EQ(std::string{request.cmdObj.firstElementFieldName()}, "foo");
+ ASSERT_EQ(request.dbname, "testDB");
- RemoteCommandResponse response;
- response.data = expectedCommandReply;
- response.metadata = expectedMetadata;
- return response;
- });
+ RemoteCommandResponse response;
+ response.data = expectedCommandReply;
+ response.metadata = expectedMetadata;
+ return response;
+ });
auto& res = deferred.get();
@@ -450,8 +459,9 @@ public:
auto stream = streamFactory().blockUntilStreamExists(testHost);
ConnectEvent{stream}.skip();
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
uint32_t messageId = 0;
@@ -499,11 +509,9 @@ public:
};
TEST_F(MalformedMessageTest, messageHeaderWrongResponseTo) {
- runMessageTest(ErrorCodes::ProtocolError,
- false,
- [](MsgData::View message) {
- message.setResponseToMsgId(message.getResponseToMsgId() + 1);
- });
+ runMessageTest(ErrorCodes::ProtocolError, false, [](MsgData::View message) {
+ message.setResponseToMsgId(message.getResponseToMsgId() + 1);
+ });
}
TEST_F(MalformedMessageTest, messageHeaderlenZero) {
@@ -512,15 +520,15 @@ TEST_F(MalformedMessageTest, messageHeaderlenZero) {
}
TEST_F(MalformedMessageTest, MessageHeaderLenTooSmall) {
- runMessageTest(ErrorCodes::InvalidLength,
- false,
- [](MsgData::View message) { message.setLen(6); }); // min is 16
+ runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) {
+ message.setLen(6);
+ }); // min is 16
}
TEST_F(MalformedMessageTest, MessageHeaderLenTooLarge) {
- runMessageTest(ErrorCodes::InvalidLength,
- false,
- [](MsgData::View message) { message.setLen(48000001); }); // max is 48000000
+ runMessageTest(ErrorCodes::InvalidLength, false, [](MsgData::View message) {
+ message.setLen(48000001);
+ }); // max is 48000000
}
TEST_F(MalformedMessageTest, MessageHeaderLenNegative) {
@@ -529,27 +537,27 @@ TEST_F(MalformedMessageTest, MessageHeaderLenNegative) {
}
TEST_F(MalformedMessageTest, MessageLenSmallerThanActual) {
- runMessageTest(ErrorCodes::InvalidBSON,
- true,
- [](MsgData::View message) { message.setLen(message.getLen() - 10); });
+ runMessageTest(ErrorCodes::InvalidBSON, true, [](MsgData::View message) {
+ message.setLen(message.getLen() - 10);
+ });
}
TEST_F(MalformedMessageTest, FailedToReadAllBytesForMessage) {
- runMessageTest(ErrorCodes::InvalidLength,
- true,
- [](MsgData::View message) { message.setLen(message.getLen() + 100); });
+ runMessageTest(ErrorCodes::InvalidLength, true, [](MsgData::View message) {
+ message.setLen(message.getLen() + 100);
+ });
}
TEST_F(MalformedMessageTest, UnsupportedOpcode) {
- runMessageTest(ErrorCodes::UnsupportedFormat,
- true,
- [](MsgData::View message) { message.setOperation(2222); });
+ runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) {
+ message.setOperation(2222);
+ });
}
TEST_F(MalformedMessageTest, MismatchedOpcode) {
- runMessageTest(ErrorCodes::UnsupportedFormat,
- true,
- [](MsgData::View message) { message.setOperation(2006); });
+ runMessageTest(ErrorCodes::UnsupportedFormat, true, [](MsgData::View message) {
+ message.setOperation(2006);
+ });
}
class NetworkInterfaceASIOConnectionHookTest : public NetworkInterfaceASIOTest {
@@ -605,14 +613,14 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, ValidateHostInvalid) {
// simulate isMaster reply.
stream->simulateServer(
- rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ rpc::Protocol::kOpQuery, [](RemoteCommandRequest request) -> RemoteCommandResponse {
RemoteCommandResponse response;
response.data =
- BSON("minWireVersion"
- << mongo::WireSpec::instance().minWireVersionIncoming << "maxWireVersion"
- << mongo::WireSpec::instance().maxWireVersionIncoming << "TESTKEY"
- << "TESTVALUE");
+ BSON("minWireVersion" << mongo::WireSpec::instance().minWireVersionIncoming
+ << "maxWireVersion"
+ << mongo::WireSpec::instance().maxWireVersionIncoming
+ << "TESTKEY"
+ << "TESTVALUE");
return response;
});
@@ -637,8 +645,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) {
Status makeRequestError{ErrorCodes::DBPathInUse, "bloooh"};
start(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply)
- -> Status { return Status::OK(); },
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
+ return Status::OK();
+ },
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
makeRequestCalled = true;
return makeRequestError;
@@ -659,8 +668,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsError) {
// simulate isMaster reply.
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
// We should stop here.
auto& res = deferred.get();
@@ -676,8 +686,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) {
bool handleReplyCalled = false;
start(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply)
- -> Status { return Status::OK(); },
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
+ return Status::OK();
+ },
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
makeRequestCalled = true;
return {boost::none};
@@ -692,7 +703,8 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) {
auto commandReply = BSON("foo"
<< "boo"
- << "ok" << 1.0);
+ << "ok"
+ << 1.0);
auto metadata = BSON("aaa"
<< "bbb");
@@ -704,8 +716,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, MakeRequestReturnsNone) {
// simulate isMaster reply.
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
// Simulate user command.
stream->simulateServer(rpc::Protocol::kOpCommandV1,
@@ -739,14 +752,16 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) {
BSONObj hookCommandReply = BSON("blah"
<< "blah"
- << "ok" << 1.0);
+ << "ok"
+ << 1.0);
BSONObj hookReplyMetadata = BSON("1111" << 2222);
Status handleReplyError{ErrorCodes::AuthSchemaIncompatible, "daowdjkpowkdjpow"};
start(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply)
- -> Status { return Status::OK(); },
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
+ return Status::OK();
+ },
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
makeRequestCalled = true;
return {boost::make_optional<RemoteCommandRequest>(
@@ -769,8 +784,9 @@ TEST_F(NetworkInterfaceASIOConnectionHookTest, HandleReplyReturnsError) {
// simulate isMaster reply.
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
// Simulate hook reply
stream->simulateServer(rpc::Protocol::kOpCommandV1,
@@ -873,8 +889,9 @@ TEST_F(NetworkInterfaceASIOMetadataTest, Metadata) {
// simulate isMaster reply.
stream->simulateServer(rpc::Protocol::kOpQuery,
- [](RemoteCommandRequest request)
- -> RemoteCommandResponse { return simulateIsMaster(request); });
+ [](RemoteCommandRequest request) -> RemoteCommandResponse {
+ return simulateIsMaster(request);
+ });
// Simulate hook reply
stream->simulateServer(rpc::Protocol::kOpCommandV1,
diff --git a/src/mongo/executor/network_interface_asio_test_utils.h b/src/mongo/executor/network_interface_asio_test_utils.h
index bfd489514b0..0c1ff03cf11 100644
--- a/src/mongo/executor/network_interface_asio_test_utils.h
+++ b/src/mongo/executor/network_interface_asio_test_utils.h
@@ -135,23 +135,22 @@ static Deferred<std::vector<T>> collect(std::vector<Deferred<T>>& ds, ThreadPool
collectState->mem.resize(collectState->goal);
for (std::size_t i = 0; i < ds.size(); ++i) {
- ds[i].then(pool,
- [collectState, out, i](T res) mutable {
- // The bool return is unused.
- stdx::lock_guard<stdx::mutex> lk(collectState->mtx);
- collectState->mem[i] = std::move(res);
-
- // If we're done.
- if (collectState->goal == ++collectState->numFinished) {
- std::vector<T> outInitialized;
- outInitialized.reserve(collectState->mem.size());
- for (auto&& mem_entry : collectState->mem) {
- outInitialized.emplace_back(std::move(*mem_entry));
- }
- out.emplace(outInitialized);
- }
- return true;
- });
+ ds[i].then(pool, [collectState, out, i](T res) mutable {
+ // The bool return is unused.
+ stdx::lock_guard<stdx::mutex> lk(collectState->mtx);
+ collectState->mem[i] = std::move(res);
+
+ // If we're done.
+ if (collectState->goal == ++collectState->numFinished) {
+ std::vector<T> outInitialized;
+ outInitialized.reserve(collectState->mem.size());
+ for (auto&& mem_entry : collectState->mem) {
+ outInitialized.emplace_back(std::move(*mem_entry));
+ }
+ out.emplace(outInitialized);
+ }
+ return true;
+ });
}
return out;
}
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 1c822c9c48b..2a121e8fbb5 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -30,8 +30,8 @@
#include "mongo/platform/basic.h"
-#include "mongo/executor/network_interface_mock.h"
#include "mongo/executor/network_connection_hook.h"
+#include "mongo/executor/network_interface_mock.h"
#include <algorithm>
#include <iterator>
@@ -400,27 +400,27 @@ void NetworkInterfaceMock::_connectThenEnqueueOperation_inlock(const HostAndPort
}
// The completion handler for the postconnect command schedules the original command.
- auto postconnectCompletionHandler =
- [this, op](StatusWith<RemoteCommandResponse> response) mutable {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (!response.isOK()) {
- op.setResponse(_now_inlock(), response.getStatus());
- op.finishResponse();
- return;
- }
-
- auto handleStatus =
- _hook->handleReply(op.getRequest().target, std::move(response.getValue()));
-
- if (!handleStatus.isOK()) {
- op.setResponse(_now_inlock(), handleStatus);
- op.finishResponse();
- return;
- }
-
- _enqueueOperation_inlock(std::move(op));
- _connections.emplace(op.getRequest().target);
- };
+ auto postconnectCompletionHandler = [this,
+ op](StatusWith<RemoteCommandResponse> response) mutable {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (!response.isOK()) {
+ op.setResponse(_now_inlock(), response.getStatus());
+ op.finishResponse();
+ return;
+ }
+
+ auto handleStatus =
+ _hook->handleReply(op.getRequest().target, std::move(response.getValue()));
+
+ if (!handleStatus.isOK()) {
+ op.setResponse(_now_inlock(), handleStatus);
+ op.finishResponse();
+ return;
+ }
+
+ _enqueueOperation_inlock(std::move(op));
+ _connections.emplace(op.getRequest().target);
+ };
auto postconnectOp = NetworkOperation(op.getCallbackHandle(),
std::move(*hookPostconnectCommand),
diff --git a/src/mongo/executor/network_interface_mock_test.cpp b/src/mongo/executor/network_interface_mock_test.cpp
index 4e1e3f45a35..d59f7970474 100644
--- a/src/mongo/executor/network_interface_mock_test.cpp
+++ b/src/mongo/executor/network_interface_mock_test.cpp
@@ -164,16 +164,14 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHook) {
BSONObj(),
Milliseconds(0)};
- ASSERT_OK(net().startCommand(cb,
- actualCommandExpected,
- [&](StatusWith<RemoteCommandResponse> resp) {
- commandFinished = true;
- if (resp.isOK()) {
- gotCorrectCommandReply =
- (actualResponseExpected.toString() ==
- resp.getValue().toString());
- }
- }));
+ ASSERT_OK(
+ net().startCommand(cb, actualCommandExpected, [&](StatusWith<RemoteCommandResponse> resp) {
+ commandFinished = true;
+ if (resp.isOK()) {
+ gotCorrectCommandReply =
+ (actualResponseExpected.toString() == resp.getValue().toString());
+ }
+ }));
// At this point validate and makeRequest should have been called.
ASSERT(validateCalled);
@@ -224,10 +222,12 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) {
// We just need some obscure non-OK code.
return {ErrorCodes::ConflictingOperationInProgress, "blah"};
},
- [&](const HostAndPort& remoteHost)
- -> StatusWith<boost::optional<RemoteCommandRequest>> { MONGO_UNREACHABLE; },
- [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response)
- -> Status { MONGO_UNREACHABLE; }));
+ [&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
+ MONGO_UNREACHABLE;
+ },
+ [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status {
+ MONGO_UNREACHABLE;
+ }));
startNetwork();
@@ -261,14 +261,16 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookFailedValidation) {
TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) {
bool makeRequestCalled = false;
net().setConnectionHook(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply)
- -> Status { return Status::OK(); },
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
+ return Status::OK();
+ },
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
makeRequestCalled = true;
return {boost::none};
},
- [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response)
- -> Status { MONGO_UNREACHABLE; }));
+ [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status {
+ MONGO_UNREACHABLE;
+ }));
startNetwork();
@@ -296,14 +298,16 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookNoRequest) {
TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) {
bool makeRequestCalled = false;
net().setConnectionHook(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply)
- -> Status { return Status::OK(); },
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
+ return Status::OK();
+ },
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
makeRequestCalled = true;
return {ErrorCodes::InvalidSyncSource, "blah"};
},
- [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response)
- -> Status { MONGO_UNREACHABLE; }));
+ [&](const HostAndPort& remoteHost, RemoteCommandResponse&& response) -> Status {
+ MONGO_UNREACHABLE;
+ }));
startNetwork();
@@ -333,8 +337,9 @@ TEST_F(NetworkInterfaceMockTest, ConnectionHookMakeRequestFails) {
TEST_F(NetworkInterfaceMockTest, ConnectionHookHandleReplyFails) {
bool handleReplyCalled = false;
net().setConnectionHook(makeTestHook(
- [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply)
- -> Status { return Status::OK(); },
+ [&](const HostAndPort& remoteHost, const RemoteCommandResponse& isMasterReply) -> Status {
+ return Status::OK();
+ },
[&](const HostAndPort& remoteHost) -> StatusWith<boost::optional<RemoteCommandRequest>> {
return boost::make_optional<RemoteCommandRequest>({});
},
@@ -404,8 +409,9 @@ TEST_F(NetworkInterfaceMockTest, CommandTimeout) {
request.timeout = Milliseconds(2000);
ErrorCodes::Error statusPropagated = ErrorCodes::OK;
- auto finishFn =
- [&](StatusWith<RemoteCommandResponse> resp) { statusPropagated = resp.getStatus().code(); };
+ auto finishFn = [&](StatusWith<RemoteCommandResponse> resp) {
+ statusPropagated = resp.getStatus().code();
+ };
//
// Command times out.
diff --git a/src/mongo/executor/network_interface_thread_pool.cpp b/src/mongo/executor/network_interface_thread_pool.cpp
index b678a9486d1..f556b2aab35 100644
--- a/src/mongo/executor/network_interface_thread_pool.cpp
+++ b/src/mongo/executor/network_interface_thread_pool.cpp
@@ -132,12 +132,11 @@ void NetworkInterfaceThreadPool::consumeTasks(stdx::unique_lock<stdx::mutex> lk)
if (!_registeredAlarm) {
_registeredAlarm = true;
lk.unlock();
- _net->setAlarm(_net->now(),
- [this] {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _registeredAlarm = false;
- consumeTasks(std::move(lk));
- });
+ _net->setAlarm(_net->now(), [this] {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _registeredAlarm = false;
+ consumeTasks(std::move(lk));
+ });
}
return;
diff --git a/src/mongo/executor/network_interface_thread_pool_test.cpp b/src/mongo/executor/network_interface_thread_pool_test.cpp
index 319d0299b27..b9a8c070b2d 100644
--- a/src/mongo/executor/network_interface_thread_pool_test.cpp
+++ b/src/mongo/executor/network_interface_thread_pool_test.cpp
@@ -33,10 +33,10 @@
#include "mongo/base/init.h"
#include "mongo/executor/network_interface_asio.h"
#include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/concurrency/thread_pool_test_common.h"
#include "mongo/util/concurrency/thread_pool_test_fixture.h"
-#include "mongo/stdx/memory.h"
namespace {
using namespace mongo;
diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h
index b6cedb8f976..3e997e448de 100644
--- a/src/mongo/executor/remote_command_response.h
+++ b/src/mongo/executor/remote_command_response.h
@@ -29,8 +29,8 @@
#pragma once
#include <iosfwd>
-#include <string>
#include <memory>
+#include <string>
#include "mongo/db/jsobj.h"
#include "mongo/util/net/message.h"
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index ec68a837c8e..fd630e527b7 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -28,9 +28,9 @@
#pragma once
-#include <string>
-#include <memory>
#include <functional>
+#include <memory>
+#include <string>
#include "mongo/base/disallow_copying.h"
#include "mongo/base/status.h"
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index d6c479a1a32..ed956b93567 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -90,20 +90,20 @@ public:
}
};
-#define COMMON_EXECUTOR_TEST(TEST_NAME) \
- class CET_##TEST_NAME : public CommonTaskExecutorTestFixture { \
- public: \
- CET_##TEST_NAME(ExecutorFactory makeExecutor) \
- : CommonTaskExecutorTestFixture(std::move(makeExecutor)) {} \
- \
- private: \
- void _doTest() override; \
- static const CetRegistrationAgent _agent; \
- }; \
- const CetRegistrationAgent CET_##TEST_NAME::_agent(#TEST_NAME, \
- [](ExecutorFactory makeExecutor) { \
- return stdx::make_unique<CET_##TEST_NAME>(std::move(makeExecutor)); \
- }); \
+#define COMMON_EXECUTOR_TEST(TEST_NAME) \
+ class CET_##TEST_NAME : public CommonTaskExecutorTestFixture { \
+ public: \
+ CET_##TEST_NAME(ExecutorFactory makeExecutor) \
+ : CommonTaskExecutorTestFixture(std::move(makeExecutor)) {} \
+ \
+ private: \
+ void _doTest() override; \
+ static const CetRegistrationAgent _agent; \
+ }; \
+ const CetRegistrationAgent CET_##TEST_NAME::_agent( \
+ #TEST_NAME, [](ExecutorFactory makeExecutor) { \
+ return stdx::make_unique<CET_##TEST_NAME>(std::move(makeExecutor)); \
+ }); \
void CET_##TEST_NAME::_doTest()
void setStatus(const TaskExecutor::CallbackArgs& cbData, Status* target) {
@@ -132,9 +132,10 @@ void scheduleSetStatusAndShutdown(const TaskExecutor::CallbackArgs& cbData,
*outStatus1 = cbData.status;
return;
}
- *outStatus1 = cbData.executor->scheduleWork(stdx::bind(setStatusAndShutdown,
- stdx::placeholders::_1,
- outStatus2)).getStatus();
+ *outStatus1 =
+ cbData.executor
+ ->scheduleWork(stdx::bind(setStatusAndShutdown, stdx::placeholders::_1, outStatus2))
+ .getStatus();
}
COMMON_EXECUTOR_TEST(RunOne) {
@@ -180,10 +181,10 @@ COMMON_EXECUTOR_TEST(OneSchedulesAnother) {
TaskExecutor& executor = getExecutor();
Status status1 = getDetectableErrorStatus();
Status status2 = getDetectableErrorStatus();
- ASSERT_OK(executor.scheduleWork(stdx::bind(scheduleSetStatusAndShutdown,
- stdx::placeholders::_1,
- &status1,
- &status2)).getStatus());
+ ASSERT_OK(executor
+ .scheduleWork(stdx::bind(
+ scheduleSetStatusAndShutdown, stdx::placeholders::_1, &status1, &status2))
+ .getStatus());
launchExecutorThread();
joinExecutorThread();
ASSERT_OK(status1);
@@ -360,9 +361,10 @@ static void setStatusOnRemoteCommandCompletion(
Status* outStatus) {
if (cbData.request != expectedRequest) {
*outStatus = Status(ErrorCodes::BadValue,
- mongoutils::str::stream()
- << "Actual request: " << getRequestDescription(cbData.request)
- << "; expected: " << getRequestDescription(expectedRequest));
+ mongoutils::str::stream() << "Actual request: "
+ << getRequestDescription(cbData.request)
+ << "; expected: "
+ << getRequestDescription(expectedRequest));
return;
}
*outStatus = cbData.response.getStatus();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 6d58745f194..9b5d18cbd84 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -264,20 +264,18 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleWorkAt(
return cbHandle;
}
lk.unlock();
- _net->setAlarm(when,
- [this, when, cbHandle] {
- auto cbState =
- checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue()));
- if (cbState->canceled.load()) {
- return;
- }
- invariant(now() >= when);
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (cbState->canceled.load()) {
- return;
- }
- scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
- });
+ _net->setAlarm(when, [this, when, cbHandle] {
+ auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle.getValue()));
+ if (cbState->canceled.load()) {
+ return;
+ }
+ invariant(now() >= when);
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ if (cbState->canceled.load()) {
+ return;
+ }
+ scheduleIntoPool_inlock(&_sleepersQueue, cbState->iter, std::move(lk));
+ });
return cbHandle;
}
@@ -350,9 +348,9 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
if (_inShutdown) {
return;
}
- LOG(3) << "Received remote response: " << (response.isOK()
- ? response.getValue().toString()
- : response.getStatus().toString());
+ LOG(3) << "Received remote response: "
+ << (response.isOK() ? response.getValue().toString()
+ : response.getStatus().toString());
swap(cbState->callback, newCb);
scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
});
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index f4afb7b58c9..9cd55507649 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -32,8 +32,8 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/executor/task_executor.h"
-#include "mongo/stdx/list.h"
#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/thread.h"
diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp
index b2d43e669a0..887cfa59d45 100644
--- a/src/mongo/executor/thread_pool_task_executor_test.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_test.cpp
@@ -48,10 +48,9 @@ namespace executor {
namespace {
MONGO_INITIALIZER(ThreadPoolExecutorCommonTests)(InitializerContext*) {
- addTestsForExecutor("ThreadPoolExecutorCommon",
- [](std::unique_ptr<NetworkInterfaceMock>* net) {
- return makeThreadPoolTestExecutor(std::move(*net));
- });
+ addTestsForExecutor("ThreadPoolExecutorCommon", [](std::unique_ptr<NetworkInterfaceMock>* net) {
+ return makeThreadPoolTestExecutor(std::move(*net));
+ });
return Status::OK();
}
@@ -100,14 +99,16 @@ TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) {
auto& executor = getExecutor();
launchExecutorThread();
- ASSERT_OK(executor.scheduleWork([&](const TaskExecutor::CallbackArgs& cbData) {
- status1 = cbData.status;
- if (!status1.isOK())
- return;
- barrier.countDownAndWait();
- cb2 = cbData.executor->scheduleWork(
- [&status2](const TaskExecutor::CallbackArgs& cbData) { status2 = cbData.status; });
- }).getStatus());
+ ASSERT_OK(executor
+ .scheduleWork([&](const TaskExecutor::CallbackArgs& cbData) {
+ status1 = cbData.status;
+ if (!status1.isOK())
+ return;
+ barrier.countDownAndWait();
+ cb2 = cbData.executor->scheduleWork([&status2](
+ const TaskExecutor::CallbackArgs& cbData) { status2 = cbData.status; });
+ })
+ .getStatus());
auto fpTPTE1 =
getGlobalFailPointRegistry()->getFailPoint("scheduleIntoPoolSpinsUntilThreadPoolShutsDown");
diff --git a/src/mongo/executor/thread_pool_task_executor_test_fixture.h b/src/mongo/executor/thread_pool_task_executor_test_fixture.h
index 04a3089bbab..b549184b26f 100644
--- a/src/mongo/executor/thread_pool_task_executor_test_fixture.h
+++ b/src/mongo/executor/thread_pool_task_executor_test_fixture.h
@@ -31,8 +31,8 @@
#include <memory>
#include "mongo/executor/network_interface_mock.h"
-#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/executor/task_executor_test_fixture.h"
+#include "mongo/executor/thread_pool_task_executor.h"
namespace mongo {
namespace executor {