/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <type_traits>
#include <utility>

#include "mongo/base/system_error.h"
#include "mongo/config.h"
#include "mongo/db/stats/counters.h"
#include "mongo/transport/asio_utils.h"
#include "mongo/transport/baton.h"
#include "mongo/transport/transport_layer_asio.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/net/socket_utils.h"
#ifdef MONGO_CONFIG_SSL
#include "mongo/util/net/ssl_manager.h"
#include "mongo/util/net/ssl_types.h"
#endif
#include "asio.hpp"
#ifdef MONGO_CONFIG_SSL
#include "mongo/util/net/ssl.hpp"
#endif
namespace mongo {
namespace transport {
// Fail point: when enabled and the session is in Async blocking mode, opportunisticRead() and
// opportunisticWrite() transfer only the first byte synchronously and then, if more than one byte
// was requested, report would_block so the remainder goes through the asynchronous path.
// NOTE(review): this definition sits at namespace scope in a header; presumably the header is
// included by a single translation unit — confirm to avoid duplicate definitions.
MONGO_FAIL_POINT_DEFINE(transportLayerASIOshortOpportunisticReadWrite);
template
auto futurize(const std::error_code& ec, SuccessValue&& successValue) {
using Result = Future>;
if (MONGO_unlikely(ec)) {
return Result::makeReady(errorCodeToStatus(ec));
}
return Result::makeReady(successValue);
}
/**
 * Converts the outcome of a synchronous, valueless asio operation into a ready Future<void>.
 *
 * Declared inline because it is a non-template function defined in a header; without it, every
 * translation unit including this header would emit its own external definition.
 *
 * @param ec  error code produced by the asio call.
 * @return a ready Future holding errorCodeToStatus(ec) on failure, or success otherwise.
 */
inline Future<void> futurize(const std::error_code& ec) {
    using Result = Future<void>;
    if (MONGO_unlikely(ec)) {
        return Result::makeReady(errorCodeToStatus(ec));
    }
    return Result::makeReady();
}
// Address-family-agnostic stream socket used by ASIOSession. The concrete family is only known at
// runtime (the ASIOSession constructor checks for AF_INET/AF_INET6 before applying TCP options).
using GenericSocket = asio::generic::stream_protocol::socket;
/**
 * Session implementation backed by an asio stream socket, optionally wrapped in a TLS stream.
 *
 * The socket is switched between blocking (Sync) and non-blocking (Async) mode on demand: the
 * synchronous API (sourceMessage/sinkMessage) calls ensureSync() and the asynchronous API
 * (asyncSourceMessage/asyncSinkMessage) calls ensureAsync(). Reads and writes are first attempted
 * directly ("opportunistically") and only fall back to asio async operations, or to a Baton when
 * one is supplied, if the socket reports that it would block.
 */
class TransportLayerASIO::ASIOSession final : public Session {
    MONGO_DISALLOW_COPYING(ASIOSession);

public:
    // If the socket is disconnected while any of these options are being set, this constructor
    // may throw, but it is guaranteed to throw a mongo DBException.
    ASIOSession(TransportLayerASIO* tl, GenericSocket socket, bool isIngressSession) try
        : _socket(std::move(socket)),
          _tl(tl),
          _isIngressSession(isIngressSession) {
        auto family = endpointToSockAddr(_socket.local_endpoint()).getType();
        if (family == AF_INET || family == AF_INET6) {
            // TCP-specific options only make sense for IP sockets.
            _socket.set_option(asio::ip::tcp::no_delay(true));
            _socket.set_option(asio::socket_base::keep_alive(true));
            setSocketKeepAliveParams(_socket.native_handle());
        }

        _local = endpointToHostAndPort(_socket.local_endpoint());
        _remote = endpointToHostAndPort(_socket.remote_endpoint());
    } catch (const DBException&) {
        throw;
    } catch (const asio::system_error& error) {
        uasserted(ErrorCodes::SocketException, error.what());
    } catch (...) {
        uasserted(50797, str::stream() << "Unknown exception while configuring socket.");
    }

    ~ASIOSession() {
        end();
    }

    TransportLayer* getTransportLayer() const override {
        return _tl;
    }

    const HostAndPort& remote() const override {
        return _remote;
    }

    const HostAndPort& local() const override {
        return _local;
    }

    /**
     * If the socket is still open, cancels outstanding operations and shuts down both
     * directions. A not_connected error from shutdown is ignored; any other error is logged.
     */
    void end() override {
        if (getSocket().is_open()) {
            std::error_code ec;
            cancelAsyncOperations();
            getSocket().shutdown(GenericSocket::shutdown_both, ec);
            if ((ec) && (ec != asio::error::not_connected)) {
                error() << "Error shutting down socket: " << ec.message();
            }
        }
    }

    /** Reads one complete message with the socket in blocking mode. */
    StatusWith<Message> sourceMessage() override {
        ensureSync();
        return sourceMessageImpl().getNoThrow();
    }

    /** Reads one complete message with the socket in non-blocking mode, optionally via `baton`. */
    Future<Message> asyncSourceMessage(const transport::BatonHandle& baton = nullptr) override {
        ensureAsync();
        return sourceMessageImpl(baton);
    }

    /** Writes `message` with the socket in blocking mode; counts sent bytes for ingress. */
    Status sinkMessage(Message message) override {
        ensureSync();

        // Capturing `message` by reference is fine here: the write runs in blocking mode and
        // getNoThrow() is called before this frame returns.
        return write(asio::buffer(message.buf(), message.size()))
            .then([this, &message] {
                if (_isIngressSession) {
                    networkCounter.hitPhysicalOut(message.size());
                }
            })
            .getNoThrow();
    }

    /** Writes `message` with the socket in non-blocking mode, optionally via `baton`. */
    Future<void> asyncSinkMessage(Message message,
                                  const transport::BatonHandle& baton = nullptr) override {
        ensureAsync();
        return write(asio::buffer(message.buf(), message.size()), baton)
            .then([this, message /*keep the buffer alive*/]() {
                if (_isIngressSession) {
                    networkCounter.hitPhysicalOut(message.size());
                }
            });
    }

    /**
     * Cancels pending I/O: through `baton` when one is supplied, otherwise directly on the
     * socket.
     */
    void cancelAsyncOperations(const transport::BatonHandle& baton = nullptr) override {
        LOG(3) << "Cancelling outstanding I/O operations on connection to " << _remote;
        if (baton) {
            baton->cancelSession(*this);
        } else {
            getSocket().cancel();
        }
    }

    /**
     * Records the timeout for synchronous operations. boost::none means "no timeout"; any other
     * value must be positive. The socket option itself is applied lazily by ensureSync().
     */
    void setTimeout(boost::optional<Milliseconds> timeout) override {
        invariant(!timeout || timeout->count() > 0);
        _configuredTimeout = timeout;
    }

    /**
     * Best-effort liveness check. Polls for readability without waiting; if data is readable,
     * peeks one byte. One byte means connected, 0 means the peer closed the connection, and -1
     * is logged and treated as disconnected.
     */
    bool isConnected() override {
        // socket.is_open() only returns whether the socket is a valid file descriptor and
        // if we haven't marked this socket as closed already.
        if (!getSocket().is_open())
            return false;

        auto swPollEvents = pollASIOSocket(getSocket(), POLLIN, Milliseconds{0});
        if (!swPollEvents.isOK()) {
            if (swPollEvents != ErrorCodes::NetworkTimeout) {
                warning() << "Failed to poll socket for connectivity check: "
                          << swPollEvents.getStatus();
                return false;
            }
            // Nothing readable yet: consistent with an idle but still-connected peer.
            return true;
        }

        auto revents = swPollEvents.getValue();
        if (revents & POLLIN) {
            char testByte;
            int size = ::recv(getSocket().native_handle(), &testByte, sizeof(testByte), MSG_PEEK);
            if (size == sizeof(testByte)) {
                return true;
            } else if (size == -1) {
                auto errDesc = errnoWithDescription(errno);
                warning() << "Failed to check socket connectivity: " << errDesc;
            }
            // If size == 0 then we got disconnected and we should return false.
        }

        return false;
    }

protected:
    friend class TransportLayerASIO;
    friend TransportLayerASIO::BatonASIO;

#ifdef MONGO_CONFIG_SSL
    // The unique_lock here is held by TransportLayerASIO to synchronize with the asyncConnect
    // timeout callback. It is unlocked before the SSL handshake actually begins.
    Future<void> handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex> lk,
                                               const HostAndPort& target) {
        if (!_tl->_egressSSLContext) {
            return Future<void>::makeReady(Status(ErrorCodes::SSLHandshakeFailed,
                                                  "SSL requested but SSL support is disabled"));
        }

        // The plain socket is moved into the TLS stream; getSocket() now reaches it through
        // _sslSocket.
        _sslSocket.emplace(
            std::move(_socket), *_tl->_egressSSLContext, removeFQDNRoot(target.host()));
        lk.unlock();

        auto doHandshake = [&] {
            if (_blockingMode == Sync) {
                std::error_code ec;
                _sslSocket->handshake(asio::ssl::stream_base::client, ec);
                return Future<void>::makeReady(errorCodeToStatus(ec));
            } else {
                return _sslSocket->async_handshake(asio::ssl::stream_base::client, UseFuture{});
            }
        };
        return doHandshake().then([this, target] {
            _ranHandshake = true;

            auto sslManager = getSSLManager();
            auto optPeerInfo = uassertStatusOK(sslManager->parseAndValidatePeerCertificate(
                _sslSocket->native_handle(), target.host()));

            if (optPeerInfo) {
                SSLPeerInfo::forSession(shared_from_this()) = std::move(*optPeerInfo);
            }
        });
    }

    // For synchronous connections where we don't have an async timer, just take a dummy lock and
    // pass it to the WithLock version of handshakeSSLForEgress
    Future<void> handshakeSSLForEgress(const HostAndPort& target) {
        stdx::mutex mutex;
        return handshakeSSLForEgressWithLock(stdx::unique_lock<stdx::mutex>(mutex), target);
    }
#endif

    /**
     * Puts the socket into blocking mode if needed, then applies the configured timeout to both
     * the send and receive directions whenever it differs from the value last applied.
     */
    void ensureSync() {
        asio::error_code ec;
        if (_blockingMode != Sync) {
            getSocket().non_blocking(false, ec);
            fassert(40490, errorCodeToStatus(ec));
            _blockingMode = Sync;
        }

        if (_socketTimeout != _configuredTimeout) {
            // Change boost::none (which means no timeout) into a zero value for the socket option,
            // which also means no timeout.
            auto timeout = _configuredTimeout.value_or(Milliseconds{0});
            getSocket().set_option(ASIOSocketTimeoutOption<SO_SNDTIMEO>(timeout), ec);
            uassertStatusOK(errorCodeToStatus(ec));

            getSocket().set_option(ASIOSocketTimeoutOption<SO_RCVTIMEO>(timeout), ec);
            uassertStatusOK(errorCodeToStatus(ec));

            _socketTimeout = _configuredTimeout;
        }
    }

    /** Puts the socket into non-blocking mode if it is not already. */
    void ensureAsync() {
        if (_blockingMode == Async)
            return;

        // Socket timeouts currently only affect synchronous calls, so make sure the caller isn't
        // expecting a socket timeout when they do an async operation.
        invariant(!_configuredTimeout);

        asio::error_code ec;
        getSocket().non_blocking(true, ec);
        fassert(50706, errorCodeToStatus(ec));
        _blockingMode = Async;
    }

private:
    /**
     * asio SettableSocketOption wrapper for SOL_SOCKET-level timeout options (SO_SNDTIMEO /
     * SO_RCVTIMEO), selected by the `Name` template parameter.
     */
    template <int Name>
    class ASIOSocketTimeoutOption {
    public:
#ifdef _WIN32
        using TimeoutType = DWORD;

        ASIOSocketTimeoutOption(Milliseconds timeoutVal) : _timeout(timeoutVal.count()) {}

#else
        using TimeoutType = timeval;

        ASIOSocketTimeoutOption(Milliseconds timeoutVal) {
            _timeout.tv_sec = duration_cast<Seconds>(timeoutVal).count();
            const auto minusSeconds = timeoutVal - Seconds{_timeout.tv_sec};
            _timeout.tv_usec = duration_cast<Microseconds>(minusSeconds).count();
        }
#endif

        template <typename Protocol>
        int name(const Protocol&) const {
            return Name;
        }

        template <typename Protocol>
        const TimeoutType* data(const Protocol&) const {
            return &_timeout;
        }

        template <typename Protocol>
        std::size_t size(const Protocol&) const {
            return sizeof(_timeout);
        }

        template <typename Protocol>
        int level(const Protocol&) const {
            return SOL_SOCKET;
        }

    private:
        TimeoutType _timeout;
    };

    /** Returns the underlying plain socket, reaching through the TLS stream when one exists. */
    GenericSocket& getSocket() {
#ifdef MONGO_CONFIG_SSL
        if (_sslSocket) {
            return static_cast<GenericSocket&>(_sslSocket->lowest_layer());
        }
#endif
        return _socket;
    }

    /**
     * Reads one wire-protocol message. It first reads the header, answers an HTTP "GET " with an
     * error response, and rejects lengths outside [header size, MaxMessageSizeBytes]. It then
     * reads the remaining body into a buffer that also holds the header.
     */
    Future<Message> sourceMessageImpl(const transport::BatonHandle& baton = nullptr) {
        static constexpr auto kHeaderSize = sizeof(MSGHEADER::Value);

        auto headerBuffer = SharedBuffer::allocate(kHeaderSize);
        // Taken before headerBuffer is moved into the continuation below.
        auto ptr = headerBuffer.get();
        return read(asio::buffer(ptr, kHeaderSize), baton)
            .then([ headerBuffer = std::move(headerBuffer), this, baton ]() mutable {
                if (checkForHTTPRequest(asio::buffer(headerBuffer.get(), kHeaderSize))) {
                    return sendHTTPResponse(baton);
                }

                const auto msgLen = size_t(MSGHEADER::View(headerBuffer.get()).getMessageLength());
                if (msgLen < kHeaderSize || msgLen > MaxMessageSizeBytes) {
                    StringBuilder sb;
                    sb << "recv(): message msgLen " << msgLen << " is invalid. "
                       << "Min " << kHeaderSize << " Max: " << MaxMessageSizeBytes;
                    const auto str = sb.str();
                    LOG(0) << str;

                    return Future<Message>::makeReady(Status(ErrorCodes::ProtocolError, str));
                }

                if (msgLen == kHeaderSize) {
                    // This probably isn't a real case since all (current) messages have bodies.
                    if (_isIngressSession) {
                        networkCounter.hitPhysicalIn(msgLen);
                    }
                    return Future<Message>::makeReady(Message(std::move(headerBuffer)));
                }

                auto buffer = SharedBuffer::allocate(msgLen);
                memcpy(buffer.get(), headerBuffer.get(), kHeaderSize);

                MsgData::View msgView(buffer.get());
                return read(asio::buffer(msgView.data(), msgView.dataLen()), baton)
                    .then([ this, buffer = std::move(buffer), msgLen ]() mutable {
                        if (_isIngressSession) {
                            networkCounter.hitPhysicalIn(msgLen);
                        }
                        return Message(std::move(buffer));
                    });
            });
    }

    /**
     * Fills `buffers` from the session. On the first read of a session without TLS, the bytes are
     * read from the plain socket and inspected by maybeHandshakeSSLForIngress(). If that reports
     * true (a TLS handshake consumed them), the read is repeated, now through _sslSocket.
     */
    template <typename MutableBufferSequence>
    Future<void> read(const MutableBufferSequence& buffers,
                      const transport::BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSL
        if (_sslSocket) {
            return opportunisticRead(*_sslSocket, buffers, baton);
        } else if (!_ranHandshake) {
            invariant(asio::buffer_size(buffers) >= sizeof(MSGHEADER::Value));

            return opportunisticRead(_socket, buffers, baton)
                .then([this, buffers]() mutable {
                    _ranHandshake = true;
                    return maybeHandshakeSSLForIngress(buffers);
                })
                .then([this, buffers, baton](bool needsRead) mutable {
                    if (needsRead) {
                        return read(buffers, baton);
                    } else {
                        return Future<void>::makeReady();
                    }
                });
        }
#endif
        return opportunisticRead(_socket, buffers, baton);
    }

    /** Writes all of `buffers` to the session, through the TLS stream when one exists. */
    template <typename ConstBufferSequence>
    Future<void> write(const ConstBufferSequence& buffers,
                       const transport::BatonHandle& baton = nullptr) {
#ifdef MONGO_CONFIG_SSL
        // Once anything has been written, later reads skip read()'s ingress TLS-detection path.
        _ranHandshake = true;
        if (_sslSocket) {
#ifdef __linux__
            // We do some trickery in asio (see moreToSend), which appears to work well on linux,
            // but fails on other platforms.
            return opportunisticWrite(*_sslSocket, buffers, baton);
#else
            if (_blockingMode == Async) {
                // Opportunistic writes are broken for async egress SSL (switching between blocking
                // and non-blocking mode corrupts the TLS exchange).
                return asio::async_write(*_sslSocket, buffers, UseFuture{}).ignoreValue();
            } else {
                return opportunisticWrite(*_sslSocket, buffers, baton);
            }
#endif
        }
#endif
        return opportunisticWrite(_socket, buffers, baton);
    }

    /**
     * Attempts a direct asio::read into `buffers`. In Async mode, a would_block/try_again result
     * continues the read (from where it stopped) via the baton, or via asio::async_read when no
     * baton is given. Any other outcome is returned as a ready future.
     */
    template <typename Stream, typename MutableBufferSequence>
    Future<void> opportunisticRead(Stream& stream,
                                   const MutableBufferSequence& buffers,
                                   const transport::BatonHandle& baton = nullptr) {
        std::error_code ec;
        size_t size;

        if (MONGO_FAIL_POINT(transportLayerASIOshortOpportunisticReadWrite) &&
            _blockingMode == Async) {
            asio::mutable_buffer localBuffer = buffers;

            if (buffers.size()) {
                localBuffer = asio::mutable_buffer(buffers.data(), 1);
            }

            size = asio::read(stream, localBuffer, ec);
            if (!ec && buffers.size() > 1) {
                ec = asio::error::would_block;
            }
        } else {
            size = asio::read(stream, buffers, ec);
        }

        if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) &&
            (_blockingMode == Async)) {
            // asio::read is a loop internally, so some of buffers may have been read into already.
            // So we need to adjust the buffers passed into async_read to be offset by size, if
            // size is > 0.
            MutableBufferSequence asyncBuffers(buffers);
            if (size > 0) {
                asyncBuffers += size;
            }

            if (baton) {
                return baton->addSession(*this, Baton::Type::In)
                    .then([&stream, asyncBuffers, baton, this] {
                        return opportunisticRead(stream, asyncBuffers, baton);
                    });
            }

            return asio::async_read(stream, asyncBuffers, UseFuture{}).ignoreValue();
        } else {
            return futurize(ec);
        }
    }

    /**
     * moreToSend checks the ssl socket after an opportunisticWrite. If there are still bytes to
     * send, we manually send them off the underlying socket. Then we hook that up with a future
     * that gets us back to sending from the ssl side.
     *
     * There are two variants because we call opportunisticWrite on generic sockets and ssl sockets.
     * The generic socket impl never has more to send (because it doesn't have an inner socket it
     * needs to keep sending).
     */
    template <typename ConstBufferSequence>
    boost::optional<Future<void>> moreToSend(GenericSocket& socket,
                                             const ConstBufferSequence& buffers,
                                             const transport::BatonHandle& baton) {
        return boost::none;
    }

#ifdef MONGO_CONFIG_SSL
    template <typename ConstBufferSequence>
    boost::optional<Future<void>> moreToSend(asio::ssl::stream<GenericSocket>& socket,
                                             const ConstBufferSequence& buffers,
                                             const transport::BatonHandle& baton) {
        if (_sslSocket->getCoreOutputBuffer().size()) {
            return opportunisticWrite(getSocket(), _sslSocket->getCoreOutputBuffer(), baton)
                .then([this, &socket, buffers, baton] {
                    return opportunisticWrite(socket, buffers, baton);
                });
        }

        return boost::none;
    }
#endif

    /**
     * Attempts a direct asio::write of `buffers`. In Async mode, a would_block/try_again result
     * first flushes any pending TLS output (moreToSend). It then continues the write (from where
     * it stopped) via the baton, or via asio::async_write when no baton is given. Any other
     * outcome is returned as a ready future.
     */
    template <typename Stream, typename ConstBufferSequence>
    Future<void> opportunisticWrite(Stream& stream,
                                    const ConstBufferSequence& buffers,
                                    const transport::BatonHandle& baton = nullptr) {
        std::error_code ec;
        std::size_t size;

        if (MONGO_FAIL_POINT(transportLayerASIOshortOpportunisticReadWrite) &&
            _blockingMode == Async) {
            asio::const_buffer localBuffer = buffers;

            if (buffers.size()) {
                localBuffer = asio::const_buffer(buffers.data(), 1);
            }

            size = asio::write(stream, localBuffer, ec);
            if (!ec && buffers.size() > 1) {
                ec = asio::error::would_block;
            }
        } else {
            size = asio::write(stream, buffers, ec);
        }

        if (((ec == asio::error::would_block) || (ec == asio::error::try_again)) &&
            (_blockingMode == Async)) {
            // asio::write is a loop internally, so some of the buffers may already have been
            // written. So we need to adjust the buffers passed into async_write to be offset by
            // size, if size is > 0.
            ConstBufferSequence asyncBuffers(buffers);
            if (size > 0) {
                asyncBuffers += size;
            }

            if (auto more = moreToSend(stream, asyncBuffers, baton)) {
                return std::move(*more);
            }

            if (baton) {
                return baton->addSession(*this, Baton::Type::Out)
                    .then([&stream, asyncBuffers, baton, this] {
                        return opportunisticWrite(stream, asyncBuffers, baton);
                    });
            }

            return asio::async_write(stream, asyncBuffers, UseFuture{}).ignoreValue();
        } else {
            return futurize(ec);
        }
    }

#ifdef MONGO_CONFIG_SSL
    /**
     * Inspects the first bytes read from an ingress connection and decides whether to start a
     * server-side TLS handshake. Resolves to true when a handshake was performed, in which case
     * the caller must re-read through the TLS stream, and to false for plaintext/HTTP traffic.
     */
    template <typename MutableBufferSequence>
    Future<bool> maybeHandshakeSSLForIngress(const MutableBufferSequence& buffer) {
        invariant(asio::buffer_size(buffer) >= sizeof(MSGHEADER::Value));
        MSGHEADER::ConstView headerView(asio::buffer_cast<const char*>(buffer));
        auto responseTo = headerView.getResponseToMsgId();

        if (checkForHTTPRequest(buffer)) {
            return Future<bool>::makeReady(false);
        }
        // This logic was taken from the old mongo/util/net/sock.cpp.
        //
        // It lets us run both TLS and unencrypted mongo over the same port.
        //
        // The responseTo field of the first wire protocol message received from the client must
        // be 0 or -1. Otherwise the connection is either sending garbage or a TLS Hello packet,
        // which will be caught by the TLS handshake.
        if (responseTo != 0 && responseTo != -1) {
            if (!_tl->_ingressSSLContext) {
                return Future<bool>::makeReady(
                    Status(ErrorCodes::SSLHandshakeFailed,
                           "SSL handshake received but server is started without SSL support"));
            }

            auto tlsAlert = checkTLSRequest(buffer);
            if (tlsAlert) {
                return opportunisticWrite(getSocket(),
                                          asio::buffer(tlsAlert->data(), tlsAlert->size()))
                    .then([] {
                        return Future<bool>::makeReady(
                            Status(ErrorCodes::SSLHandshakeFailed,
                                   "SSL handshake failed, as client requested disabled protocol"));
                    });
            }

            _sslSocket.emplace(std::move(_socket), *_tl->_ingressSSLContext, "");
            auto doHandshake = [&] {
                if (_blockingMode == Sync) {
                    std::error_code ec;
                    _sslSocket->handshake(asio::ssl::stream_base::server, buffer, ec);
                    return futurize(ec, asio::buffer_size(buffer));
                } else {
                    return _sslSocket->async_handshake(
                        asio::ssl::stream_base::server, buffer, UseFuture{});
                }
            };
            return doHandshake().then([this](size_t size) {
                auto& sslPeerInfo = SSLPeerInfo::forSession(shared_from_this());

                if (sslPeerInfo.subjectName.empty()) {
                    auto sslManager = getSSLManager();
                    auto swPeerInfo = sslManager->parseAndValidatePeerCertificate(
                        _sslSocket->native_handle(), "");

                    // The value of swPeerInfo is a bit complicated:
                    //
                    // If !swPeerInfo.isOK(), then there was an error doing the SSL
                    // handshake and we should reject the connection.
                    //
                    // If !swPeerInfo.getValue(), then the SSL handshake was successful,
                    // but the peer didn't provide a SSL certificate, and we do not require
                    // one. sslPeerInfo should be empty.
                    //
                    // Otherwise the SSL handshake was successful and the peer did provide
                    // a certificate that is valid, and we should store that info on the
                    // session's SSLPeerInfo decoration.
                    if (auto optPeerInfo = uassertStatusOK(swPeerInfo)) {
                        sslPeerInfo = *optPeerInfo;
                    }
                }

                return true;
            });
        } else if (_tl->_sslMode() == SSLParams::SSLMode_requireSSL) {
            uasserted(ErrorCodes::SSLHandshakeFailed,
                      "The server is configured to only allow SSL connections");
        } else {
            if (!sslGlobalParams.disableNonSSLConnectionLogging &&
                _tl->_sslMode() == SSLParams::SSLMode_preferSSL) {
                LOG(0) << "SSL mode is set to 'preferred' and connection " << id() << " to "
                       << remote() << " is not using SSL.";
            }
            return Future<bool>::makeReady(false);
        }
    }
#endif

    /** Returns true when the first four bytes of `buffers` are exactly "GET ". */
    template <typename Buffer>
    bool checkForHTTPRequest(const Buffer& buffers) {
        invariant(asio::buffer_size(buffers) >= 4);
        const StringData bufferAsStr(asio::buffer_cast<const char*>(buffers), 4);
        return (bufferAsStr == "GET "_sd);
    }

    // Called from read() to send an HTTP response back to a client that's trying to use HTTP
    // over a native MongoDB port. This returns a Future<Message> to match its only caller, but it
    // always contains an error, so its value type is otherwise irrelevant.
    Future<Message> sendHTTPResponse(const BatonHandle& baton = nullptr) {
        constexpr auto userMsg =
            "It looks like you are trying to access MongoDB over HTTP"
            " on the native driver port.\r\n"_sd;

        static const std::string httpResp = str::stream() << "HTTP/1.0 200 OK\r\n"
                                                             "Connection: close\r\n"
                                                             "Content-Type: text/plain\r\n"
                                                             "Content-Length: "
                                                          << userMsg.size() << "\r\n\r\n"
                                                          << userMsg;

        return write(asio::buffer(httpResp.data(), httpResp.size()), baton)
            .onError(
                [](const Status& status) {
                    return Status(
                        ErrorCodes::ProtocolError,
                        str::stream()
                            << "Client sent an HTTP request over a native MongoDB connection, "
                               "but there was an error sending a response: "
                            << status.toString());
                })
            .then([] {
                return StatusWith<Message>(
                    ErrorCodes::ProtocolError,
                    "Client sent an HTTP request over a native MongoDB connection");
            });
    }

    enum BlockingMode {
        Unknown,
        Sync,
        Async,
    };

    // Current blocking mode of the socket, maintained by ensureSync()/ensureAsync().
    BlockingMode _blockingMode = Unknown;

    HostAndPort _remote;
    HostAndPort _local;

    // Timeout requested via setTimeout() vs. the value last applied to the socket by
    // ensureSync(); boost::none means no timeout.
    boost::optional<Milliseconds> _configuredTimeout;
    boost::optional<Milliseconds> _socketTimeout;

    GenericSocket _socket;
#ifdef MONGO_CONFIG_SSL
    // Engaged once a TLS stream is set up; _socket is moved into it at that point.
    boost::optional<asio::ssl::stream<GenericSocket>> _sslSocket;
    // Set once the session is past the point where read() would attempt ingress TLS detection.
    bool _ranHandshake = false;
#endif

    TransportLayerASIO* const _tl;
    bool _isIngressSession;
};
} // namespace transport
} // namespace mongo