summaryrefslogtreecommitdiff
path: root/src/mongo/executor/async_mock_stream_factory.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/executor/async_mock_stream_factory.cpp')
-rw-r--r--src/mongo/executor/async_mock_stream_factory.cpp109
1 files changed, 53 insertions, 56 deletions
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp
index ae8356a4982..c2fee263c60 100644
--- a/src/mongo/executor/async_mock_stream_factory.cpp
+++ b/src/mongo/executor/async_mock_stream_factory.cpp
@@ -124,18 +124,17 @@ AsyncMockStreamFactory::MockStream::~MockStream() {
void AsyncMockStreamFactory::MockStream::connect(asio::ip::tcp::resolver::iterator endpoints,
ConnectHandler&& connectHandler) {
// Suspend execution after "connecting"
- _defer(kBlockedBeforeConnect,
- [this, connectHandler, endpoints]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- // We shim a lambda to give connectHandler the right signature since it doesn't take
- // a size_t param.
- checkCanceled(
- _strand,
- &_state,
- [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); },
- 0);
- });
+ _defer(kBlockedBeforeConnect, [this, connectHandler, endpoints]() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ // We shim a lambda to give connectHandler the right signature since it doesn't take
+ // a size_t param.
+ checkCanceled(
+ _strand,
+ &_state,
+ [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); },
+ 0);
+ });
}
void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf,
@@ -147,11 +146,10 @@ void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf,
_writeQueue.push({begin, begin + size});
// Suspend execution after data is written.
- _defer_inlock(kBlockedAfterWrite,
- [this, writeHandler, size]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- checkCanceled(_strand, &_state, std::move(writeHandler), size);
- });
+ _defer_inlock(kBlockedAfterWrite, [this, writeHandler, size]() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ checkCanceled(_strand, &_state, std::move(writeHandler), size);
+ });
}
void AsyncMockStreamFactory::MockStream::cancel() {
@@ -169,45 +167,44 @@ void AsyncMockStreamFactory::MockStream::cancel() {
void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf,
StreamHandler&& readHandler) {
// Suspend execution before data is read.
- _defer(kBlockedBeforeRead,
- [this, buf, readHandler]() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- int nToCopy = 0;
-
- // If we've set an error, return that instead of a read.
- if (!_error) {
- auto nextRead = std::move(_readQueue.front());
- _readQueue.pop();
-
- auto beginDst = asio::buffer_cast<uint8_t*>(buf);
- nToCopy = std::min(nextRead.size(), asio::buffer_size(buf));
-
- auto endSrc = std::begin(nextRead);
- std::advance(endSrc, nToCopy);
-
- auto endDst = std::copy(std::begin(nextRead), endSrc, beginDst);
- invariant((endDst - beginDst) == static_cast<std::ptrdiff_t>(nToCopy));
- log() << "read " << nToCopy << " bytes, " << (nextRead.size() - nToCopy)
- << " remaining in buffer";
- }
-
- auto handler = readHandler;
-
- // If we did not receive all the bytes, we should return an error
- if (static_cast<size_t>(nToCopy) < asio::buffer_size(buf)) {
- handler = [readHandler](std::error_code ec, size_t len) {
- // If we have an error here we've been canceled, and that takes precedence
- if (ec)
- return readHandler(ec, len);
-
- // Call the original handler with an error
- readHandler(make_error_code(ErrorCodes::InvalidLength), len);
- };
- }
-
- checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error);
- _error.clear();
- });
+ _defer(kBlockedBeforeRead, [this, buf, readHandler]() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ int nToCopy = 0;
+
+ // If we've set an error, return that instead of a read.
+ if (!_error) {
+ auto nextRead = std::move(_readQueue.front());
+ _readQueue.pop();
+
+ auto beginDst = asio::buffer_cast<uint8_t*>(buf);
+ nToCopy = std::min(nextRead.size(), asio::buffer_size(buf));
+
+ auto endSrc = std::begin(nextRead);
+ std::advance(endSrc, nToCopy);
+
+ auto endDst = std::copy(std::begin(nextRead), endSrc, beginDst);
+ invariant((endDst - beginDst) == static_cast<std::ptrdiff_t>(nToCopy));
+ log() << "read " << nToCopy << " bytes, " << (nextRead.size() - nToCopy)
+ << " remaining in buffer";
+ }
+
+ auto handler = readHandler;
+
+ // If we did not receive all the bytes, we should return an error
+ if (static_cast<size_t>(nToCopy) < asio::buffer_size(buf)) {
+ handler = [readHandler](std::error_code ec, size_t len) {
+ // If we have an error here we've been canceled, and that takes precedence
+ if (ec)
+ return readHandler(ec, len);
+
+ // Call the original handler with an error
+ readHandler(make_error_code(ErrorCodes::InvalidLength), len);
+ };
+ }
+
+ checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error);
+ _error.clear();
+ });
}
void AsyncMockStreamFactory::MockStream::pushRead(std::vector<uint8_t> toRead) {