summaryrefslogtreecommitdiff
path: root/src/mongo/executor/async_secure_stream.cpp
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2015-10-28 14:41:07 -0400
committerJason Carey <jcarey@argv.me>2015-11-09 18:03:18 -0500
commitc836472353e736424c9bb87868508c9e633b892d (patch)
tree71b6bd4303e722ea244ac5e3b538ad65ab417f5c /src/mongo/executor/async_secure_stream.cpp
parente8187cc8f07ac5fccd384430f33457d8a57f0381 (diff)
downloadmongo-c836472353e736424c9bb87868508c9e633b892d.tar.gz
SERVER-20143 Strand NetworkInterfaceASIO
Add strands (and an option for multiple io workers) in NetworkInterfaceASIO. strands are an asio specific mechanism for ensuring thread safety.
Diffstat (limited to 'src/mongo/executor/async_secure_stream.cpp')
-rw-r--r--src/mongo/executor/async_secure_stream.cpp32
1 files changed, 17 insertions, 15 deletions
diff --git a/src/mongo/executor/async_secure_stream.cpp b/src/mongo/executor/async_secure_stream.cpp
index 964f805d07d..a3a1acc5fb7 100644
--- a/src/mongo/executor/async_secure_stream.cpp
+++ b/src/mongo/executor/async_secure_stream.cpp
@@ -43,8 +43,9 @@
namespace mongo {
namespace executor {
-AsyncSecureStream::AsyncSecureStream(asio::io_service* io_service, asio::ssl::context* sslContext)
- : _stream(*io_service, *sslContext) {}
+AsyncSecureStream::AsyncSecureStream(asio::io_service::strand* strand,
+ asio::ssl::context* sslContext)
+ : _strand(strand), _stream(_strand->get_io_service(), *sslContext) {}
AsyncSecureStream::~AsyncSecureStream() {
destroyStream(&_stream.lowest_layer(), _connected);
@@ -55,33 +56,34 @@ void AsyncSecureStream::connect(const asio::ip::tcp::resolver::iterator endpoint
// Stash the connectHandler as we won't be able to call it until we re-enter the state
// machine.
_userHandler = std::move(connectHandler);
- asio::async_connect(_stream.lowest_layer(),
- std::move(endpoints),
- [this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) {
- if (ec) {
- return _userHandler(ec);
- }
- _connected = true;
- return _handleConnect(std::move(iter));
- });
+ asio::async_connect(
+ _stream.lowest_layer(),
+ std::move(endpoints),
+ _strand->wrap([this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) {
+ if (ec) {
+ return _userHandler(ec);
+ }
+ _connected = true;
+ return _handleConnect(std::move(iter));
+ }));
}
void AsyncSecureStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) {
- writeStream(&_stream, _connected, buffer, std::move(streamHandler));
+ writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
}
void AsyncSecureStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) {
- readStream(&_stream, _connected, buffer, std::move(streamHandler));
+ readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
}
void AsyncSecureStream::_handleConnect(asio::ip::tcp::resolver::iterator iter) {
_stream.async_handshake(decltype(_stream)::client,
- [this, iter](std::error_code ec) {
+ _strand->wrap([this, iter](std::error_code ec) {
if (ec) {
return _userHandler(ec);
}
return _handleHandshake(ec, iter->host_name());
- });
+ }));
}
void AsyncSecureStream::_handleHandshake(std::error_code ec, const std::string& hostName) {