summaryrefslogtreecommitdiff
path: root/src/mongo/util/net
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/net')
-rw-r--r--src/mongo/util/net/SConscript11
-rw-r--r--src/mongo/util/net/abstract_message_port.h6
-rw-r--r--src/mongo/util/net/asio_message_port.cpp46
-rw-r--r--src/mongo/util/net/asio_message_port.h3
-rw-r--r--src/mongo/util/net/listen.cpp11
-rw-r--r--src/mongo/util/net/listen.h12
-rw-r--r--src/mongo/util/net/message.h5
-rw-r--r--src/mongo/util/net/message_port.cpp46
-rw-r--r--src/mongo/util/net/message_port.h6
-rw-r--r--src/mongo/util/net/message_port_mock.cpp1
-rw-r--r--src/mongo/util/net/message_port_mock.h1
-rw-r--r--src/mongo/util/net/message_server.h83
-rw-r--r--src/mongo/util/net/message_server_port.cpp244
-rw-r--r--src/mongo/util/net/miniwebserver.cpp2
-rw-r--r--src/mongo/util/net/miniwebserver.h2
-rw-r--r--src/mongo/util/net/sockaddr.cpp19
16 files changed, 54 insertions, 444 deletions
diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript
index fa8899561e9..34105547ab6 100644
--- a/src/mongo/util/net/SConscript
+++ b/src/mongo/util/net/SConscript
@@ -95,17 +95,6 @@ env.Library(
)
env.Library(
- target="message_server_port",
- source=[
- "message_server_port.cpp",
- ],
- LIBDEPS=[
- 'network',
- '$BUILD_DIR/mongo/db/stats/counters',
- ],
-)
-
-env.Library(
target='miniwebserver',
source=[
'miniwebserver.cpp',
diff --git a/src/mongo/util/net/abstract_message_port.h b/src/mongo/util/net/abstract_message_port.h
index c4ff5503f41..e4884e75133 100644
--- a/src/mongo/util/net/abstract_message_port.h
+++ b/src/mongo/util/net/abstract_message_port.h
@@ -32,6 +32,7 @@
#include "mongo/config.h"
#include "mongo/logger/log_severity.h"
+#include "mongo/stdx/functional.h"
#include "mongo/util/net/message.h"
#include "mongo/util/net/sockaddr.h"
#include "mongo/util/time_support.h"
@@ -88,6 +89,11 @@ public:
virtual void say(Message& toSend, int responseTo = 0) = 0;
/**
+ * Sends the message (does not set headers).
+ */
+ virtual void say(const Message& toSend) = 0;
+
+ /**
* Sends the data over the socket.
*/
virtual void send(const char* data, int len, const char* context) = 0;
diff --git a/src/mongo/util/net/asio_message_port.cpp b/src/mongo/util/net/asio_message_port.cpp
index 88e37feb2d8..16fab7ea8c1 100644
--- a/src/mongo/util/net/asio_message_port.cpp
+++ b/src/mongo/util/net/asio_message_port.cpp
@@ -54,40 +54,6 @@ const char kGET[] = "GET";
const int kHeaderLen = sizeof(MSGHEADER::Value);
const int kInitialMessageSize = 1024;
-class ASIOPorts {
-public:
- void closeAll(AbstractMessagingPort::Tag skipMask) {
- stdx::lock_guard<stdx::mutex> lock_guard(mutex);
- for (auto&& port : _portSet) {
- if (port->getTag() & skipMask) {
- LOG(3) << "Skip closing connection # " << port->connectionId();
- continue;
- }
- LOG(3) << "Closing connection # " << port->connectionId();
- port->shutdown();
- }
- }
-
- void insert(ASIOMessagingPort* p) {
- stdx::lock_guard<stdx::mutex> lock_guard(mutex);
- _portSet.insert(p);
- }
-
- void erase(ASIOMessagingPort* p) {
- stdx::lock_guard<stdx::mutex> lock_guard(mutex);
- _portSet.erase(p);
- }
-
-private:
- stdx::mutex mutex;
- std::unordered_set<ASIOMessagingPort*> _portSet;
-};
-
-// We "new" this so it will still be around when other automatic global vars are being destructed
-// during termination. TODO: This is an artifact from MessagingPort and should be removed when we
-// decide where to put networking global state.
-ASIOPorts& asioPorts = *(new ASIOPorts());
-
#ifdef MONGO_CONFIG_SSL
struct ASIOSSLContextPair {
ASIOSSLContext server;
@@ -109,10 +75,6 @@ MONGO_INITIALIZER_WITH_PREREQUISITES(ASIOSSLContextSetup, ("SSLManager"))(Initia
} // namespace
-void ASIOMessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) {
- asioPorts.closeAll(skipMask);
-}
-
ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd)
: _service(1),
_timer(_service),
@@ -145,7 +107,6 @@ ASIOMessagingPort::ASIOMessagingPort(int fd, SockAddr farEnd)
farEnd.getType() == AF_UNIX ? 0 : IPPROTO_TCP),
fd) {
#endif // MONGO_CONFIG_SSL
- asioPorts.insert(this);
_getSocket().non_blocking(true);
_remote = HostAndPort(farEnd.getAddr(), farEnd.getPort());
_timer.expires_at(decltype(_timer)::time_point::max());
@@ -177,7 +138,6 @@ ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity l
#else
_sock(_service) {
#endif // MONGO_CONFIG_SSL
- asioPorts.insert(this);
if (*_timeout == Milliseconds(0)) {
_timeout = boost::none;
}
@@ -187,7 +147,6 @@ ASIOMessagingPort::ASIOMessagingPort(Milliseconds timeout, logger::LogSeverity l
ASIOMessagingPort::~ASIOMessagingPort() {
shutdown();
- asioPorts.erase(this);
}
void ASIOMessagingPort::setTimeout(Milliseconds millis) {
@@ -502,6 +461,11 @@ void ASIOMessagingPort::say(Message& toSend, int responseTo) {
invariant(!toSend.empty());
toSend.header().setId(nextMessageId());
toSend.header().setResponseToMsgId(responseTo);
+ return say(const_cast<const Message&>(toSend));
+}
+
+void ASIOMessagingPort::say(const Message& toSend) {
+ invariant(!toSend.empty());
auto buf = toSend.buf();
if (buf) {
send(buf, MsgData::ConstView(buf).getLen(), nullptr);
diff --git a/src/mongo/util/net/asio_message_port.h b/src/mongo/util/net/asio_message_port.h
index d474b03ef29..d3e5fc33b1c 100644
--- a/src/mongo/util/net/asio_message_port.h
+++ b/src/mongo/util/net/asio_message_port.h
@@ -78,6 +78,7 @@ public:
void reply(Message& received, Message& response) override;
void say(Message& toSend, int responseTo = 0) override;
+ void say(const Message& toSend) override;
void send(const char* data, int len, const char*) override;
void send(const std::vector<std::pair<char*, int>>& data, const char*) override;
@@ -118,8 +119,6 @@ public:
bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override;
- static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask);
-
private:
void _setTimerCallback();
asio::error_code _read(char* buf, std::size_t size);
diff --git a/src/mongo/util/net/listen.cpp b/src/mongo/util/net/listen.cpp
index deb30887e5c..65cec09e45a 100644
--- a/src/mongo/util/net/listen.cpp
+++ b/src/mongo/util/net/listen.cpp
@@ -280,7 +280,9 @@ void Listener::initAndListen() {
}
struct timeval maxSelectTime;
- while (!inShutdown()) {
+ // The check against _finished allows us to actually stop the listener by signalling it through
+ // the _finished flag.
+ while (!inShutdown() && !_finished.load()) {
fd_set fds[1];
FD_ZERO(fds);
@@ -601,7 +603,7 @@ void Listener::_accepted(const std::shared_ptr<Socket>& psocket, long long conne
port = stdx::make_unique<MessagingPort>(psocket);
}
port->setConnectionId(connectionId);
- accepted(port.release());
+ accepted(std::move(port));
}
// ----- ListeningSockets -------
@@ -648,9 +650,8 @@ void Listener::checkTicketNumbers() {
globalTicketHolder.resize(want);
}
-void Listener::closeMessagingPorts(AbstractMessagingPort::Tag skipMask) {
- ASIOMessagingPort::closeSockets(skipMask);
- MessagingPort::closeSockets(skipMask);
+void Listener::shutdown() {
+ _finished.store(true);
}
TicketHolder Listener::globalTicketHolder(DEFAULT_MAX_CONN);
diff --git a/src/mongo/util/net/listen.h b/src/mongo/util/net/listen.h
index 0bbd3ad8a0b..f0c66a41d60 100644
--- a/src/mongo/util/net/listen.h
+++ b/src/mongo/util/net/listen.h
@@ -66,7 +66,7 @@ public:
void initAndListen(); // never returns unless error (start a thread)
/* spawn a thread, etc., then return */
- virtual void accepted(AbstractMessagingPort* mp) = 0;
+ virtual void accepted(std::unique_ptr<AbstractMessagingPort> mp) = 0;
const int _port;
@@ -83,6 +83,8 @@ public:
*/
void waitUntilListening() const;
+ void shutdown();
+
private:
std::vector<SockAddr> _mine;
std::vector<SOCKET> _socks;
@@ -94,6 +96,7 @@ private:
mutable stdx::condition_variable _readyCondition; // Used to wait for changes to _ready
// Boolean that indicates whether this Listener is ready to accept incoming network requests
bool _ready;
+ AtomicBool _finished{false};
ServiceContext* _ctx;
bool _setAsServiceCtxDecoration;
@@ -119,13 +122,6 @@ public:
/** makes sure user input is sane */
static void checkTicketNumbers();
-
- /**
- * This will close implementations of AbstractMessagingPort, skipping any that have tags
- * matching `skipMask`.
- */
- static void closeMessagingPorts(
- AbstractMessagingPort::Tag skipMask = AbstractMessagingPort::kSkipAllMask);
};
class ListeningSockets {
diff --git a/src/mongo/util/net/message.h b/src/mongo/util/net/message.h
index 37a6c0bb66b..601d02b9e12 100644
--- a/src/mongo/util/net/message.h
+++ b/src/mongo/util/net/message.h
@@ -458,11 +458,16 @@ public:
return _buf.get();
}
+ const char* buf() const {
+ return _buf.get();
+ }
+
std::string toString() const;
SharedBuffer sharedBuffer() {
return _buf;
}
+
ConstSharedBuffer sharedBuffer() const {
return _buf;
}
diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp
index 5051962ed5d..137d494c4ac 100644
--- a/src/mongo/util/net/message_port.cpp
+++ b/src/mongo/util/net/message_port.cpp
@@ -63,41 +63,6 @@ using std::string;
/* messagingport -------------------------------------------------------------- */
-class Ports {
- std::set<MessagingPort*> ports;
- stdx::mutex m;
-
-public:
- Ports() : ports() {}
- void closeAll(AbstractMessagingPort::Tag skip_mask) {
- stdx::lock_guard<stdx::mutex> bl(m);
- for (std::set<MessagingPort*>::iterator i = ports.begin(); i != ports.end(); i++) {
- if ((*i)->getTag() & skip_mask) {
- LOG(3) << "Skip closing connection # " << (*i)->connectionId();
- continue;
- }
- LOG(3) << "Closing connection # " << (*i)->connectionId();
- (*i)->shutdown();
- }
- }
- void insert(MessagingPort* p) {
- stdx::lock_guard<stdx::mutex> bl(m);
- ports.insert(p);
- }
- void erase(MessagingPort* p) {
- stdx::lock_guard<stdx::mutex> bl(m);
- ports.erase(p);
- }
-};
-
-// we "new" this so it is still be around when other automatic global vars
-// are being destructed during termination.
-Ports& ports = *(new Ports());
-
-void MessagingPort::closeSockets(AbstractMessagingPort::Tag skipMask) {
- ports.closeAll(skipMask);
-}
-
MessagingPort::MessagingPort(int fd, const SockAddr& remote)
: MessagingPort(std::make_shared<Socket>(fd, remote)) {}
@@ -108,7 +73,6 @@ MessagingPort::MessagingPort(std::shared_ptr<Socket> sock)
: _x509SubjectName(), _connectionId(), _tag(), _psock(std::move(sock)) {
SockAddr sa = _psock->remoteAddr();
_remoteParsed = HostAndPort(sa.getAddr(), sa.getPort());
- ports.insert(this);
}
void MessagingPort::setTimeout(Milliseconds millis) {
@@ -122,7 +86,6 @@ void MessagingPort::shutdown() {
MessagingPort::~MessagingPort() {
shutdown();
- ports.erase(this);
}
bool MessagingPort::recv(Message& m) {
@@ -216,6 +179,7 @@ bool MessagingPort::call(Message& toSend, Message& response) {
say(toSend);
bool success = recv(response);
if (success) {
+ invariant(!response.empty());
if (response.header().getResponseToMsgId() != toSend.header().getId()) {
response.reset();
uasserted(40134, "Response ID did not match the sent message ID.");
@@ -225,9 +189,15 @@ bool MessagingPort::call(Message& toSend, Message& response) {
}
void MessagingPort::say(Message& toSend, int responseTo) {
- verify(!toSend.empty());
+ invariant(!toSend.empty());
toSend.header().setId(nextMessageId());
toSend.header().setResponseToMsgId(responseTo);
+
+ return say(const_cast<const Message&>(toSend));
+}
+
+void MessagingPort::say(const Message& toSend) {
+ invariant(!toSend.empty());
auto buf = toSend.buf();
if (buf) {
send(buf, MsgData::ConstView(buf).getLen(), "say");
diff --git a/src/mongo/util/net/message_port.h b/src/mongo/util/net/message_port.h
index 18931e20750..41ecc2fce3b 100644
--- a/src/mongo/util/net/message_port.h
+++ b/src/mongo/util/net/message_port.h
@@ -66,6 +66,7 @@ public:
bool call(Message& toSend, Message& response) override;
void say(Message& toSend, int responseTo = 0) override;
+ void say(const Message& toSend) override;
unsigned remotePort() const override {
return _psock->remotePort();
@@ -142,11 +143,6 @@ private:
long long _connectionId;
AbstractMessagingPort::Tag _tag;
std::shared_ptr<Socket> _psock;
-
-
-public:
- static void closeSockets(AbstractMessagingPort::Tag skipMask = kSkipAllMask);
};
-
} // namespace mongo
diff --git a/src/mongo/util/net/message_port_mock.cpp b/src/mongo/util/net/message_port_mock.cpp
index 6869723129f..de07ba032ae 100644
--- a/src/mongo/util/net/message_port_mock.cpp
+++ b/src/mongo/util/net/message_port_mock.cpp
@@ -55,6 +55,7 @@ void MessagingPortMock::reply(Message& received, Message& response, int32_t resp
void MessagingPortMock::reply(Message& received, Message& response) {}
void MessagingPortMock::say(Message& toSend, int responseTo) {}
+void MessagingPortMock::say(const Message& toSend) {}
bool MessagingPortMock::connect(SockAddr& farEnd) {
return true;
diff --git a/src/mongo/util/net/message_port_mock.h b/src/mongo/util/net/message_port_mock.h
index 61ab5be8958..721998f81f3 100644
--- a/src/mongo/util/net/message_port_mock.h
+++ b/src/mongo/util/net/message_port_mock.h
@@ -55,6 +55,7 @@ public:
void reply(Message& received, Message& response) override;
void say(Message& toSend, int responseTo = 0) override;
+ void say(const Message& toSend) override;
bool connect(SockAddr& farEnd) override;
diff --git a/src/mongo/util/net/message_server.h b/src/mongo/util/net/message_server.h
deleted file mode 100644
index 544c859c6f7..00000000000
--- a/src/mongo/util/net/message_server.h
+++ /dev/null
@@ -1,83 +0,0 @@
-// message_server.h
-
-/* Copyright 2009 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-/*
- abstract database server
- async io core, worker thread system
- */
-
-#pragma once
-
-#include "mongo/platform/basic.h"
-
-namespace mongo {
-
-class ServiceContext;
-
-class MessageHandler {
-public:
- virtual ~MessageHandler() {}
-
- /**
- * Called once when a socket is connected.
- */
- virtual void connected(AbstractMessagingPort* p) = 0;
-
- /**
- * Called every time a message comes in. Handler is responsible for responding to client.
- */
- virtual void process(Message& m, AbstractMessagingPort* p) = 0;
-
- /**
- * Called once, either when the client disconnects or when the process is shutting down. After
- * close() is called, this handler's AbstractMessagingPort pointer (passed in via the
- * connected() method) is no longer valid.
- */
- virtual void close() = 0;
-};
-
-class MessageServer {
-public:
- struct Options {
- int port; // port to bind to
- std::string ipList; // addresses to bind to
-
- Options() : port(0), ipList("") {}
- };
-
- virtual ~MessageServer() {}
- virtual void run() = 0;
- virtual bool setupSockets() = 0;
-};
-
-// TODO use a factory here to decide between port and asio variations
-MessageServer* createServer(const MessageServer::Options& opts,
- std::shared_ptr<MessageHandler> handler,
- ServiceContext* ctx);
-}
diff --git a/src/mongo/util/net/message_server_port.cpp b/src/mongo/util/net/message_server_port.cpp
deleted file mode 100644
index e8432f8c510..00000000000
--- a/src/mongo/util/net/message_server_port.cpp
+++ /dev/null
@@ -1,244 +0,0 @@
-// message_server_port.cpp
-
-/* Copyright 2009 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-#include <system_error>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/config.h"
-#include "mongo/db/lasterror.h"
-#include "mongo/db/server_options.h"
-#include "mongo/db/stats/counters.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/concurrency/thread_name.h"
-#include "mongo/util/concurrency/ticketholder.h"
-#include "mongo/util/debug_util.h"
-#include "mongo/util/exit.h"
-#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/net/abstract_message_port.h"
-#include "mongo/util/net/listen.h"
-#include "mongo/util/net/message.h"
-#include "mongo/util/net/message_server.h"
-#include "mongo/util/net/socket_exception.h"
-#include "mongo/util/net/ssl_manager.h"
-#include "mongo/util/net/thread_idle_callback.h"
-#include "mongo/util/quick_exit.h"
-#include "mongo/util/scopeguard.h"
-
-#ifdef __linux__ // TODO: consider making this ifndef _WIN32
-#include <sys/resource.h>
-#endif
-
-#if !defined(__has_feature)
-#define __has_feature(x) 0
-#endif
-
-namespace mongo {
-
-using std::unique_ptr;
-using std::endl;
-
-namespace {
-
-using MessagingPortWithHandler = std::pair<AbstractMessagingPort*, std::shared_ptr<MessageHandler>>;
-
-} // namespace
-
-class PortMessageServer : public MessageServer, public Listener {
-public:
- /**
- * Creates a new message server.
- *
- * @param opts
- * @param handler the handler to use.
- */
- PortMessageServer(const MessageServer::Options& opts,
- std::shared_ptr<MessageHandler> handler,
- ServiceContext* ctx)
- : Listener("", opts.ipList, opts.port, ctx, true), _handler(std::move(handler)) {}
-
- virtual void accepted(AbstractMessagingPort* mp) {
- ScopeGuard sleepAfterClosingPort = MakeGuard(sleepmillis, 2);
- auto portWithHandler = stdx::make_unique<MessagingPortWithHandler>(mp, _handler);
-
- if (!Listener::globalTicketHolder.tryAcquire()) {
- log() << "connection refused because too many open connections: "
- << Listener::globalTicketHolder.used() << endl;
- return;
- }
-
- try {
-#ifndef __linux__ // TODO: consider making this ifdef _WIN32
- {
- stdx::thread thr(stdx::bind(&handleIncomingMsg, portWithHandler.get()));
- thr.detach();
- }
-#else
- pthread_attr_t attrs;
- pthread_attr_init(&attrs);
- pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
-
- static const size_t STACK_SIZE =
- 1024 * 1024; // if we change this we need to update the warning
-
- struct rlimit limits;
- verify(getrlimit(RLIMIT_STACK, &limits) == 0);
- if (limits.rlim_cur > STACK_SIZE) {
- size_t stackSizeToSet = STACK_SIZE;
-#if !__has_feature(address_sanitizer)
- if (kDebugBuild)
- stackSizeToSet /= 2;
-#endif
- pthread_attr_setstacksize(&attrs, stackSizeToSet);
- } else if (limits.rlim_cur < 1024 * 1024) {
- warning() << "Stack size set to " << (limits.rlim_cur / 1024)
- << "KB. We suggest 1MB" << endl;
- }
-
-
- pthread_t thread;
- int failed = pthread_create(&thread, &attrs, &handleIncomingMsg, portWithHandler.get());
-
- pthread_attr_destroy(&attrs);
-
- if (failed) {
- log() << "pthread_create failed: " << errnoWithDescription(failed) << endl;
- throw std::system_error(
- std::make_error_code(std::errc::resource_unavailable_try_again));
- }
-#endif // __linux__
-
- portWithHandler.release();
- sleepAfterClosingPort.Dismiss();
- } catch (...) {
- Listener::globalTicketHolder.release();
- log() << "failed to create thread after accepting new connection, closing connection";
- }
- }
-
- virtual bool setupSockets() {
- return Listener::setupSockets();
- }
-
- void run() {
- initAndListen();
- }
-
- virtual bool useUnixSockets() const {
- return true;
- }
-
-private:
- const std::shared_ptr<MessageHandler> _handler;
-
- /**
- * Handles incoming messages from a given socket.
- *
- * Terminating conditions:
- * 1. Assertions while handling the request.
- * 2. Socket is closed.
- * 3. Server is shutting down (based on inShutdown)
- *
- * @param arg this method is in charge of cleaning up the arg object.
- *
- * @return NULL
- */
- static void* handleIncomingMsg(void* arg) {
- TicketHolderReleaser connTicketReleaser(&Listener::globalTicketHolder);
-
- invariant(arg);
- unique_ptr<MessagingPortWithHandler> portWithHandler(
- static_cast<MessagingPortWithHandler*>(arg));
- auto mp = portWithHandler->first;
- auto handler = portWithHandler->second;
-
- setThreadName(std::string(str::stream() << "conn" << mp->connectionId()));
- mp->setLogLevel(logger::LogSeverity::Debug(1));
-
- Message m;
- int64_t counter = 0;
- try {
- handler->connected(mp);
- ON_BLOCK_EXIT([handler]() { handler->close(); });
-
- while (!inShutdown()) {
- m.reset();
- mp->clearCounters();
-
- if (!mp->recv(m)) {
- if (!serverGlobalParams.quiet) {
- int conns = Listener::globalTicketHolder.used() - 1;
- const char* word = (conns == 1 ? " connection" : " connections");
- log() << "end connection " << mp->remote().toString() << " (" << conns
- << word << " now open)" << endl;
- }
- break;
- }
-
- handler->process(m, mp);
- networkCounter.hit(mp->getBytesIn(), mp->getBytesOut());
-
- // Occasionally we want to see if we're using too much memory.
- if ((counter++ & 0xf) == 0) {
- markThreadIdle();
- }
- }
- } catch (AssertionException& e) {
- log() << "AssertionException handling request, closing client connection: " << e
- << endl;
- } catch (SocketException& e) {
- log() << "SocketException handling request, closing client connection: " << e << endl;
- } catch (const DBException&
- e) { // must be right above std::exception to avoid catching subclasses
- log() << "DBException handling request, closing client connection: " << e << endl;
- } catch (std::exception& e) {
- error() << "Uncaught std::exception: " << e.what() << ", terminating" << endl;
- quickExit(EXIT_UNCAUGHT);
- }
- mp->shutdown();
-
- return NULL;
- }
-};
-
-
-MessageServer* createServer(const MessageServer::Options& opts,
- std::shared_ptr<MessageHandler> handler,
- ServiceContext* ctx) {
- return new PortMessageServer(opts, std::move(handler), ctx);
-}
-
-} // namespace mongo
diff --git a/src/mongo/util/net/miniwebserver.cpp b/src/mongo/util/net/miniwebserver.cpp
index ec64f174560..78f81f86134 100644
--- a/src/mongo/util/net/miniwebserver.cpp
+++ b/src/mongo/util/net/miniwebserver.cpp
@@ -204,7 +204,7 @@ void MiniWebServer::_accepted(const std::shared_ptr<Socket>& psock, long long co
}
}
-void MiniWebServer::accepted(AbstractMessagingPort* mp) {
+void MiniWebServer::accepted(std::unique_ptr<AbstractMessagingPort> mp) {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/util/net/miniwebserver.h b/src/mongo/util/net/miniwebserver.h
index a2d0690225e..122281555fc 100644
--- a/src/mongo/util/net/miniwebserver.h
+++ b/src/mongo/util/net/miniwebserver.h
@@ -69,7 +69,7 @@ public:
}
// This is not currently used for the MiniWebServer. See SERVER-24200
- void accepted(AbstractMessagingPort* mp) override;
+ void accepted(std::unique_ptr<AbstractMessagingPort> mp) override;
private:
void _accepted(const std::shared_ptr<Socket>& psocket, long long connectionId) override;
diff --git a/src/mongo/util/net/sockaddr.cpp b/src/mongo/util/net/sockaddr.cpp
index 00960f2fd97..5236a76892f 100644
--- a/src/mongo/util/net/sockaddr.cpp
+++ b/src/mongo/util/net/sockaddr.cpp
@@ -46,8 +46,8 @@
#endif
#endif
+#include "mongo/bson/util/builder.h"
#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/sock.h"
namespace mongo {
@@ -150,10 +150,19 @@ bool SockAddr::isLocalHost() const {
}
std::string SockAddr::toString(bool includePort) const {
- std::string out = getAddr();
- if (includePort && getType() != AF_UNIX && getType() != AF_UNSPEC)
- out += mongoutils::str::stream() << ':' << getPort();
- return out;
+ if (includePort && (getType() != AF_UNIX) && (getType() != AF_UNSPEC)) {
+ StringBuilder ss;
+
+ if (getType() == AF_INET6) {
+ ss << '[' << getAddr() << "]:" << getPort();
+ } else {
+ ss << getAddr() << ':' << getPort();
+ }
+
+ return ss.str();
+ } else {
+ return getAddr();
+ }
}
sa_family_t SockAddr::getType() const {