/**
* Copyright (C) 2016 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/platform/basic.h"
#include
#include
#include
#include "mongo/transport/transport_layer_legacy.h"
#include "mongo/base/checked_cast.h"
#include "mongo/config.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/stdx/functional.h"
#include "mongo/transport/service_entry_point.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/net/abstract_message_port.h"
#include "mongo/util/net/socket_exception.h"
#include "mongo/util/net/ssl_types.h"
namespace mongo {
namespace transport {
/**
 * Builds the legacy transport layer's options from the server-wide parameters: `port` is
 * initialized from `params->port` and `ipList` from `params->bind_ips`.
 *
 * `params` is dereferenced unconditionally, so it must not be null.
 */
TransportLayerLegacy::Options::Options(const ServerGlobalParams* params)
: port(params->port), ipList(params->bind_ips) {}
/**
 * Constructs the listener used by the legacy transport layer.
 *
 * The base Listener is given one string of addresses, so the entries of `opts.ipList` are first
 * joined with "," (no leading or trailing separator). `callback` is moved into `_accepted` and
 * is what accepted() later invokes.
 */
TransportLayerLegacy::ListenerLegacy::ListenerLegacy(const TransportLayerLegacy::Options& opts,
                                                     NewConnectionCb callback)
    : Listener("",
               [](const std::vector& ips) -> std::string {
                   // The legacy networking code takes a single string, so flatten the list
                   // into the form "ip1,ip2,...".
                   const StringData separator = ",";
                   StringBuilder joined;
                   bool first = true;
                   for (const auto& address : ips) {
                       if (!first) {
                           joined << separator;
                       }
                       joined << address;
                       first = false;
                   }
                   return joined.str();
               }(opts.ipList),
               opts.port,
               getGlobalServiceContext(),
               true),
      _accepted(std::move(callback)) {}
/**
 * Forwards the messaging port `mp` to the `_accepted` callback that was supplied to the
 * constructor. Ownership of `mp` is moved into that callback.
 */
void TransportLayerLegacy::ListenerLegacy::accepted(std::unique_ptr mp) {
_accepted(std::move(mp));
}
/**
 * Creates a transport layer in the "not running" state.
 *
 * `sep` is stored as-is (it is only checked for null later, in _handleNewConnection()), and
 * `opts` is used both to build the listener and to initialize `_options`.
 */
TransportLayerLegacy::TransportLayerLegacy(const TransportLayerLegacy::Options& opts,
ServiceEntryPoint* sep)
: _sep(sep),
_listener(stdx::make_unique(
opts,
// The listener's new-connection callback is bound to `this`, so this object has to stay
// alive for as long as the listener may invoke it.
stdx::bind(&TransportLayerLegacy::_handleNewConnection, this, stdx::placeholders::_1))),
_running(false),
_options(opts) {}
/**
 * Factory for sessions: allocates a LegacySession that takes ownership of `amp` and remembers
 * `tl`, and returns it wrapped in a std::shared_ptr.
 */
std::shared_ptr TransportLayerLegacy::LegacySession::create(
std::unique_ptr amp, TransportLayerLegacy* tl) {
// NOTE(review): constructed with a plain `new` rather than a make_shared-style helper,
// presumably because the constructor is not publicly accessible; confirm against the header.
std::shared_ptr handle(new LegacySession(std::move(amp), tl));
return handle;
}
/**
 * Builds a session around the messaging port `amp`.
 *
 * The session id is taken from `amp->connectionId()`, the remote and local addresses are read
 * from `amp`, `tl` is stored for use in the destructor, and finally `amp` itself is moved into
 * the session's connection object.
 */
TransportLayerLegacy::LegacySession::LegacySession(std::unique_ptr amp,
TransportLayerLegacy* tl)
: Session(amp->connectionId()),
_remote(amp->remoteAddr()),
_local(amp->localAddr()),
_tl(tl),
// `amp` is moved from here, after the reads above. Members are initialized in declaration
// order, so this assumes `_connection` is declared after `_remote` and `_local` in the
// class -- TODO confirm against the header.
_connection(stdx::make_unique(std::move(amp))) {}
/**
 * Hands this session to the owning transport layer's _destroy() for teardown.
 *
 * `_tl` is dereferenced here, so the transport layer must still be alive whenever a session is
 * destroyed.
 */
TransportLayerLegacy::LegacySession::~LegacySession() {
_tl->_destroy(*this);
}
/**
 * Creates a ticket for `session` that carries the deferred operation `work`.
 *
 * The session's id is captured eagerly so that sessionId() does not need the session itself.
 * `expiration` is stored unchanged and `work` is moved into the ticket; fill() invokes it.
 */
TransportLayerLegacy::LegacyTicket::LegacyTicket(const LegacySessionHandle& session,
Date_t expiration,
WorkHandle work)
: _session(session),
_sessionId(session->id()),
_expiration(expiration),
_fill(std::move(work)) {}
/**
 * Returns the result of `_session.lock()`. Callers must check the returned handle before using
 * it; _runTicket() treats an empty handle as "session closed".
 */
TransportLayerLegacy::LegacySessionHandle TransportLayerLegacy::LegacyTicket::getSession() {
return _session.lock();
}
/**
 * Returns the id of the session this ticket was created for, as recorded by the constructor.
 */
Session::Id TransportLayerLegacy::LegacyTicket::sessionId() const {
return _sessionId;
}
/**
 * Returns the expiration date that was given to the constructor.
 */
Date_t TransportLayerLegacy::LegacyTicket::expiration() const {
return _expiration;
}
/**
 * Runs the work stored in this ticket against the messaging port `amp` and returns the Status
 * that the work produces.
 */
Status TransportLayerLegacy::LegacyTicket::fill(AbstractMessagingPort* amp) {
return _fill(amp);
}
/**
 * Asks the listener to set up its sockets.
 *
 * Returns Status::OK() on success. On failure an error is logged and InternalError is
 * returned.
 */
Status TransportLayerLegacy::setup() {
    if (_listener->setupSockets()) {
        return Status::OK();
    }

    error() << "Failed to set up sockets during startup.";
    return {ErrorCodes::InternalError, "Failed to set up sockets"};
}
/**
 * Launches the listener thread and blocks until the listener reports that it is listening.
 *
 * Returns Status::OK() on success, or InternalError if the transport layer is already running
 * or if starting the listener throws. If the failure happened before a listener thread was
 * created, the running flag is cleared again so that this object does not keep reporting
 * itself as running (and start() can be retried).
 */
Status TransportLayerLegacy::start() {
    // swap() hands back the previous value: 'true' means an earlier call already started us.
    if (_running.swap(true)) {
        return {ErrorCodes::InternalError, "TransportLayer is already running"};
    }

    try {
        _listenerThread = stdx::thread([this]() { _listener->initAndListen(); });
        _listener->waitUntilListening();
        return Status::OK();
    } catch (...) {
        // Roll back the optimistic flag flip only when no thread exists. If a thread was
        // created, the flag stays set so that a second start() cannot assign over a joinable
        // thread; shutdown() is then responsible for joining it and clearing the flag.
        if (!_listenerThread.joinable()) {
            _running.store(false);
        }
        return {ErrorCodes::InternalError, "Failed to start listener thread."};
    }
}
// Defaulted destructor: it does not call shutdown().
// NOTE(review): assuming stdx::thread behaves like std::thread, destroying this object while
// `_listenerThread` is still joinable would terminate the process, so callers presumably have
// to invoke shutdown() first -- confirm.
TransportLayerLegacy::~TransportLayerLegacy() = default;
/**
 * Creates a ticket that, when run, receives one message from the session's messaging port into
 * `*message`.
 *
 * Nothing is read here; the receive happens when the ticket is passed to wait(). The pointer
 * `message` is captured by value, so the pointed-to Message must remain valid until then. The
 * deferred work returns HostUnreachable if recv() reports failure; on success it records the
 * message size with networkCounter and returns OK.
 */
Ticket TransportLayerLegacy::sourceMessage(const SessionHandle& session,
                                           Message* message,
                                           Date_t expiration) {
    auto receiveWork = [message](AbstractMessagingPort* port) -> Status {
        if (port->recv(*message)) {
            networkCounter.hitPhysicalIn(message->size());
            return Status::OK();
        }
        return {ErrorCodes::HostUnreachable, "Recv failed"};
    };

    auto ticketImpl = stdx::make_unique(
        checked_pointer_cast(session), expiration, std::move(receiveWork));
    return Ticket(this, std::move(ticketImpl));
}
/**
 * Creates a ticket that, when run, sends `message` over the session's messaging port.
 *
 * Nothing is written here; the send happens when the ticket is passed to wait(). `message` is
 * captured by reference, so the caller's Message must stay alive until the ticket has been
 * run. A SocketException thrown while sending is turned into HostUnreachable carrying the
 * exception's what() text; on success the message size is recorded with networkCounter.
 */
Ticket TransportLayerLegacy::sinkMessage(const SessionHandle& session,
                                         const Message& message,
                                         Date_t expiration) {
    auto sendWork = [&message](AbstractMessagingPort* port) -> Status {
        try {
            port->say(message);
            networkCounter.hitPhysicalOut(message.size());
            return Status::OK();
        } catch (const SocketException& ex) {
            return {ErrorCodes::HostUnreachable, ex.what()};
        }
    };

    auto ticketImpl = stdx::make_unique(
        checked_pointer_cast(session), expiration, std::move(sendWork));
    return Ticket(this, std::move(ticketImpl));
}
/**
 * Runs `ticket` synchronously on the calling thread and returns the resulting Status. All of
 * the work is delegated to _runTicket(), which takes the ticket by value.
 */
Status TransportLayerLegacy::wait(Ticket&& ticket) {
return _runTicket(std::move(ticket));
}
/**
 * Not supported by the legacy transport layer: both arguments are ignored and the call goes
 * straight to MONGO_UNREACHABLE. Use wait() instead.
 */
void TransportLayerLegacy::asyncWait(Ticket&& ticket, TicketCallback callback) {
// Left unimplemented because there is no reasonable way to offer general async waiting besides
// offering a background thread that can handle waits for multiple tickets. We may never
// implement this for the legacy TL.
MONGO_UNREACHABLE;
}
/**
 * Ends `session` by closing its underlying connection. Closing is idempotent: a connection
 * that is already marked closed is left alone by _closeConnection().
 */
void TransportLayerLegacy::end(const SessionHandle& session) {
    // The temporary handle produced by the cast lives until the end of this statement, which
    // covers the whole _closeConnection() call.
    _closeConnection(checked_pointer_cast(session)->conn());
}
/**
 * Marks `conn` as closed and shuts down its messaging port, at most once per connection.
 *
 * The check and the update of `conn->closed` are both made while holding `conn->closeMutex`,
 * so concurrent callers cannot both perform the shutdown.
 */
void TransportLayerLegacy::_closeConnection(Connection* conn) {
    stdx::lock_guard lk(conn->closeMutex);

    if (!conn->closed) {
        conn->closed = true;
        conn->amp->shutdown();
    }
}
/**
 * Stops the transport layer.
 *
 * In order: clears the running flag (after which _runTicket() returns ShutdownStatus), calls
 * closeAll() on ListeningSockets, shuts the listener down, and joins the listener thread.
 */
void TransportLayerLegacy::shutdown() {
_running.store(false);
// NOTE(review): ListeningSockets::get() looks like a process-wide registry rather than
// something owned by this instance -- confirm that closing everything is intended.
ListeningSockets::get()->closeAll();
_listener->shutdown();
// The thread only exists if start() got far enough to create it.
if (_listenerThread.joinable()) {
_listenerThread.join();
}
}
/**
 * Tears down `session` (called from the LegacySession destructor): closes its connection
 * unless it is already marked closed.
 */
void TransportLayerLegacy::_destroy(LegacySession& session) {
    // Cheap pre-check without the lock; _closeConnection() checks again under closeMutex.
    if (session.conn()->closed) {
        return;
    }

    _closeConnection(session.conn());
}
/**
 * Synchronously performs the work attached to `ticket` and returns its outcome.
 *
 * Checked in this order:
 *  - TransportLayer::ShutdownStatus if the transport layer is not running;
 *  - Ticket::ExpiredStatus if the ticket's expiration is earlier than the current time;
 *  - TransportLayer::TicketSessionClosedStatus if the ticket's session no longer exists or
 *    its connection is marked closed;
 *  - otherwise the Status returned by the ticket's work, with any exception thrown by that
 *    work converted through exceptionToStatus().
 *
 * In SSL builds, the session's peer info is refreshed afterwards if it had no subject name
 * yet, regardless of whether the work succeeded.
 */
Status TransportLayerLegacy::_runTicket(Ticket ticket) {
    if (!_running.load()) {
        return TransportLayer::ShutdownStatus;
    }

    if (ticket.expiration() < Date_t::now()) {
        return Ticket::ExpiredStatus;
    }

    // The ticket does not keep its session alive. Take a strong handle for the duration of
    // this call; an empty handle means the session has already been destroyed.
    auto legacyTicket = checked_cast(getTicketImpl(ticket));
    auto session = legacyTicket->getSession();
    if (!session) {
        return TransportLayer::TicketSessionClosedStatus;
    }

    auto conn = session->conn();
    // NOTE(review): `closed` is read here without holding conn->closeMutex, unlike in
    // _closeConnection() -- confirm that this unsynchronized read is intended.
    if (conn->closed) {
        return TransportLayer::TicketSessionClosedStatus;
    }

    Status res = Status::OK();
    try {
        res = legacyTicket->fill(conn->amp.get());
    } catch (...) {
        res = exceptionToStatus();
    }

#ifdef MONGO_CONFIG_SSL
    // If we didn't have an X509 subject name, see if we have one now. The strong handle
    // obtained above is reused instead of asking the ticket for the session a second time.
    auto& sslPeerInfo = SSLPeerInfo::forSession(session);
    if (sslPeerInfo.subjectName.empty()) {
        auto info = conn->amp->getX509PeerInfo();
        if (!info.subjectName.empty()) {
            sslPeerInfo = info;
        }
    }
#endif

    return res;
}
/**
 * Callback given to the listener (bound in the constructor) for each new messaging port.
 *
 * Sets the port's log level to Debug(1), wraps the port in a new LegacySession owned by this
 * transport layer, and hands that session to the service entry point.
 */
void TransportLayerLegacy::_handleNewConnection(std::unique_ptr amp) {
amp->setLogLevel(logger::LogSeverity::Debug(1));
auto session = LegacySession::create(std::move(amp), this);
// `_sep` is stored unchecked by the constructor; this is where a null entry point is caught.
invariant(_sep);
_sep->startSession(std::move(session));
}
} // namespace transport
} // namespace mongo