diff options
Diffstat (limited to 'src')
20 files changed, 6 insertions, 2044 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index 2e063b97746..5f054e5588d 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -295,22 +295,6 @@ env.CppUnitTest( ], ) -env.CppUnitTest( - target='scoped_db_conn_test', - source=[ - 'scoped_db_conn_test.cpp', - ], - LIBDEPS=[ - 'clientdriver', - '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init', - '$BUILD_DIR/mongo/db/service_context_noop_init', - '$BUILD_DIR/mongo/transport/transport_layer', - '$BUILD_DIR/mongo/transport/transport_layer_egress_init', - '$BUILD_DIR/mongo/util/net/network', - '$BUILD_DIR/mongo/util/version_impl', - ], -) - env.Library( target='fetcher', source=[ diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h index 457839a9fba..ed79c9cf51c 100644 --- a/src/mongo/client/dbclientinterface.h +++ b/src/mongo/client/dbclientinterface.h @@ -39,6 +39,7 @@ #include "mongo/db/dbmessage.h" #include "mongo/db/jsobj.h" #include "mongo/db/write_concern_options.h" +#include "mongo/logger/log_severity.h" #include "mongo/platform/atomic_word.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/protocol.h" diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp deleted file mode 100644 index 50c00521947..00000000000 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ /dev/null @@ -1,613 +0,0 @@ -/* Copyright 2012 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault - -#include "mongo/platform/basic.h" - -#include <string> -#include <vector> - -#include "mongo/client/connpool.h" -#include "mongo/client/global_conn_pool.h" -#include "mongo/db/service_context.h" -#include "mongo/db/wire_version.h" -#include "mongo/rpc/factory.h" -#include "mongo/rpc/reply_builder_interface.h" -#include "mongo/stdx/mutex.h" -#include "mongo/stdx/thread.h" -#include "mongo/transport/service_entry_point.h" -#include "mongo/transport/session.h" -#include "mongo/transport/transport_layer.h" -#include "mongo/transport/transport_layer_asio.h" -#include "mongo/unittest/unittest.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" -#include "mongo/util/net/listen.h" -#include "mongo/util/net/socket_exception.h" -#include "mongo/util/quick_exit.h" -#include "mongo/util/time_support.h" -#include "mongo/util/timer.h" - -/** - * Tests for ScopedDbConnection, particularly in connection pool management. - * The tests also indirectly tests DBClientConnection's failure detection - * logic (hence the use of the dummy server as opposed to mocking the - * connection). - */ - -namespace mongo { - -using std::unique_ptr; -using std::string; -using std::vector; - -class Client; -class OperationContext; - -namespace { - -const string TARGET_HOST = "localhost:27017"; -const int TARGET_PORT = 27017; - -class DummyServiceEntryPoint : public ServiceEntryPoint { - MONGO_DISALLOW_COPYING(DummyServiceEntryPoint); - -public: - DummyServiceEntryPoint() {} - - virtual ~DummyServiceEntryPoint() { - for (auto& t : _threads) { - t.join(); - } - } - - void startSession(transport::SessionHandle session) override { - _threads.emplace_back(&DummyServiceEntryPoint::run, this, std::move(session)); - } - - // This is not used in this test, so it is only here to complete the interface of - // ServiceEntryPoint - void endAllSessions(transport::Session::TagMask tags) override { - MONGO_UNREACHABLE; - } - - bool shutdown(Milliseconds timeout) override { - return true; - } - - Stats sessionStats() const override { - return {}; - } - - size_t numOpenSessions() const override { - return 0ULL; - } - - void setReplyDelay(Milliseconds delay) { - _replyDelay = delay; - } - - DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { - MONGO_UNREACHABLE; - } - -private: - void run(transport::SessionHandle session) { - auto swInMessage = session->sourceMessage(); - if (!swInMessage.isOK()) { - return; - } - Message inMessage = swInMessage.getValue(); - - auto request = rpc::opMsgRequestFromAnyProtocol(inMessage); - commandRequestHook(request); - - auto reply = rpc::makeReplyBuilder(rpc::protocolForMessage(inMessage)); - - BSONObjBuilder commandResponse; - - // We need to handle the isMaster received during connection. - if (request.getCommandName() == "isMaster") { - commandResponse.append("maxWireVersion", WireVersion::LATEST_WIRE_VERSION); - commandResponse.append("minWireVersion", WireVersion::RELEASE_2_4_AND_BEFORE); - } - - auto response = reply->setCommandReply(commandResponse.obj()) - .setMetadata(rpc::makeEmptyMetadata()) - .done(); - - response.header().setResponseToMsgId(inMessage.header().getId()); - - if (_replyDelay.count() > 0) { - log() << "Delaying response for " << _replyDelay; - sleepFor(_replyDelay); - } - - session->sinkMessage(response).ignore(); - } - - /** - * Subclasses can override this in order to make assertions about the command request. - */ - virtual void commandRequestHook(const OpMsgRequest& request) const {} - - std::vector<stdx::thread> _threads; - Milliseconds _replyDelay{0}; -}; - -// TODO: Take this out and make it as a reusable class in a header file. The only -// thing that is preventing this from happening is the dependency on the inShutdown -// method to terminate the socket listener thread. -/** - * Very basic server that accepts connections. Note: can only create one instance - * at a time. Should not create as a static global instance because of dependency - * with ListeningSockets::_instance. - * - * Warning: Not thread-safe - * - * Note: external symbols used: - * shutDownMutex, shuttingDown - */ -class DummyServer { -public: - /** - * Creates a new server that listens to the given port. - * - * @param port the port number to listen to. - */ - DummyServer(int port) : _port(port) {} - - ~DummyServer() { - stop(); - } - - /** - * Starts the server if it is not yet running. - * - * @param messageHandler the message handler to use for this server. Ownership - * of this object is passed to this server. - */ - void run(ServiceEntryPoint* serviceEntryPoint) { - if (_server) { - return; - } - - transport::TransportLayerASIO::Options options; - options.port = _port; - - _server = stdx::make_unique<transport::TransportLayerASIO>(options, serviceEntryPoint); - _serverThread = stdx::thread(runServer, _server.get()); - } - - /** - * Stops the server if it is running. - */ - void stop() { - if (!_server) { - return; - } - - ListeningSockets::get()->closeAll(); - _serverThread.join(); - - int connCount = Listener::globalTicketHolder.used(); - size_t iterCount = 0; - while (connCount > 0) { - if ((++iterCount % 20) == 0) { - log() << "DummyServer: Waiting for " << connCount << " connections to close." - << std::endl; - } - - sleepmillis(500); - connCount = Listener::globalTicketHolder.used(); - } - _server->shutdown(); - _server.reset(); - } - - /** - * Helper method for running the server on a separate thread. - */ - static void runServer(transport::TransportLayer* server) { - server->setup().transitional_ignore(); - server->start().transitional_ignore(); - } - -private: - const int _port; - - stdx::thread _serverThread; - unique_ptr<transport::TransportLayer> _server; -}; - -/** - * Warning: cannot run in parallel - */ -class DummyServerFixture : public unittest::Test { -public: - void setUp() { - WireSpec::instance().isInternalClient = isInternalClient(); - - _maxPoolSizePerHost = globalConnPool.getMaxPoolSize(); - _dummyServer = stdx::make_unique<DummyServer>(TARGET_PORT); - - _dummyServiceEntryPoint = makeServiceEntryPoint(); - _dummyServer->run(_dummyServiceEntryPoint.get()); - DBClientConnection conn; - Timer timer; - - // Make sure the dummy server is up and running before proceeding - while (true) { - auto connectStatus = conn.connect(HostAndPort{TARGET_HOST}, StringData()); - if (connectStatus.isOK()) { - break; - } - if (timer.seconds() > 20) { - FAIL(str::stream() << "Timed out connecting to dummy server: " - << connectStatus.toString()); - } - } - } - - void tearDown() { - ScopedDbConnection::clearPool(); - _dummyServer.reset(); - _dummyServiceEntryPoint.reset(); - - globalConnPool.setMaxPoolSize(_maxPoolSizePerHost); - } - -protected: - static void assertGreaterThan(uint64_t a, uint64_t b) { - ASSERT_GREATER_THAN(a, b); - } - - static void assertNotEqual(uint64_t a, uint64_t b) { - ASSERT_NOT_EQUALS(a, b); - } - - void setReplyDelay(Milliseconds delay) { - _dummyServiceEntryPoint->setReplyDelay(delay); - } - - /** - * Tries to grab a series of connections from the pool, perform checks on - * them, then put them back into the globalConnPool. After that, it checks these - * connections can be retrieved again from the pool. - * - * @param checkFunc method for comparing new connections and arg2. - * @param arg2 the value to pass as the 2nd parameter of checkFunc. - * @param newConnsToCreate the number of new connections to make. - */ - void checkNewConns(void (*checkFunc)(uint64_t, uint64_t), - uint64_t arg2, - const int newConnsToCreate) { - vector<ScopedDbConnection*> newConnList; - - for (int x = 0; x < newConnsToCreate; x++) { - ScopedDbConnection* newConn = new ScopedDbConnection(TARGET_HOST); - checkFunc(newConn->get()->getSockCreationMicroSec(), arg2); - newConnList.push_back(newConn); - } - - std::set<long long> connIds; - int prevNumBadConns = globalConnPool.getNumBadConns(TARGET_HOST); - for (vector<ScopedDbConnection*>::iterator iter = newConnList.begin(); - iter != newConnList.end(); - ++iter) { - - connIds.insert((*iter)->get()->getConnectionId()); - - // ScopedDbConnection::done() may not successfuly add the connection back to - // the pool if the connection has failed. If the connection is not addded back - // to the pool, we increment the number of bad connections in the pool by one - // (see PoolForHost::done()). We then use the number of bad connections to - // verify the number of connections successfully added back to the pool. - (*iter)->done(); - delete *iter; - } - int numBadConns = globalConnPool.getNumBadConns(TARGET_HOST) - prevNumBadConns; - const int numConnsInPool = globalConnPool.getNumAvailableConns(TARGET_HOST); - ASSERT_EQ(numConnsInPool, newConnsToCreate - numBadConns); - - newConnList.clear(); - - // Check that connections created after the purge were put back to the pool. - int numReusedConns = 0; - prevNumBadConns = globalConnPool.getNumBadConns(TARGET_HOST); - for (int x = 0; x < newConnsToCreate; x++) { - // ScopedDBConnection will attempt to reuse a connection from the pool if - // the pool is not empty. It may fail to reuse that connection if that - // connection has gone bad, in that case, it'll try to get another connection - // from the pool and increment the number of bad connections in the pool - // If the pool is empty however, it'll create a new connection. - // See PoolForHost::get() and DBConnectionPool::get(). - ScopedDbConnection* newConn = new ScopedDbConnection(TARGET_HOST); - newConnList.push_back(newConn); - if (connIds.count(newConn->get()->getConnectionId())) { - numReusedConns++; - } - } - numBadConns = globalConnPool.getNumBadConns(TARGET_HOST) - prevNumBadConns; - // Each bad connection is not a reused connection. Therefore the number of reused - // connections plus the number of bad ones should be equal to the number of connections - // that were in the pool. - ASSERT_EQ(numConnsInPool, numReusedConns + numBadConns); - ASSERT_EQ(globalConnPool.getNumAvailableConns(TARGET_HOST), 0); - - for (vector<ScopedDbConnection*>::iterator iter = newConnList.begin(); - iter != newConnList.end(); - ++iter) { - (*iter)->done(); - delete *iter; - } - } - -private: - static void runServer(transport::TransportLayer* server) { - server->setup().transitional_ignore(); - server->start().transitional_ignore(); - } - - /** - * Subclasses can override this in order to use a specialized service entry point. - */ - virtual std::unique_ptr<DummyServiceEntryPoint> makeServiceEntryPoint() const { - return stdx::make_unique<DummyServiceEntryPoint>(); - } - - /** - * Subclasses can override this to make the client code behave like an internal client (e.g. - * mongod or mongos) as opposed to an external one (e.g. the shell). - */ - virtual bool isInternalClient() const { - return false; - } - - std::unique_ptr<DummyServer> _dummyServer; - std::unique_ptr<DummyServiceEntryPoint> _dummyServiceEntryPoint; - uint32_t _maxPoolSizePerHost; -}; - -TEST_F(DummyServerFixture, BasicScopedDbConnection) { - ScopedDbConnection conn1(TARGET_HOST); - ScopedDbConnection conn2(TARGET_HOST); - - DBClientBase* conn1Ptr = conn1.get(); - conn1.done(); - - ScopedDbConnection conn3(TARGET_HOST); - ASSERT_EQUALS(conn1Ptr, conn3.get()); - - conn2.done(); - conn3.done(); -} - -TEST_F(DummyServerFixture, ScopedDbConnectionWithTimeout) { - auto delay = Milliseconds{8000}; - auto gracePeriod = Milliseconds{100}; - auto uri_sw = MongoURI::parse("mongodb://" + TARGET_HOST + "/?socketTimeoutMS=4000"); - ASSERT_OK(uri_sw.getStatus()); - auto uri = uri_sw.getValue(); - Date_t start, end; - const auto uriTimeout = Seconds{4}; - const auto overrideTimeout = Seconds{1}; - - ScopedDbConnection conn1(TARGET_HOST); - - setReplyDelay(delay); - - log() << "Testing ConnectionString timeouts"; - start = Date_t::now(); - ASSERT_THROWS(ScopedDbConnection conn2(TARGET_HOST, overrideTimeout.count()), NetworkException); - end = Date_t::now(); - // We add 100 milliseconds here because on some platforms the connection might timeout after - // 997ms instead of >= 1000ms. - ASSERT_GTE((end - start) + gracePeriod, overrideTimeout); - ASSERT_LT(end - start, uriTimeout); - - log() << "Testing MongoURI with explicit timeout"; - start = Date_t::now(); - ASSERT_THROWS(ScopedDbConnection conn4(uri, overrideTimeout.count()), AssertionException); - end = Date_t::now(); - ASSERT_GTE((end - start) + gracePeriod, overrideTimeout); - ASSERT_LT(end - start, uriTimeout); - - log() << "Testing MongoURI doesn't timeout"; - start = Date_t::now(); - ScopedDbConnection conn5(uri); - end = Date_t::now(); - ASSERT_GREATER_THAN((end - start) + gracePeriod, uriTimeout); - - setReplyDelay(Milliseconds{0}); -} - -TEST_F(DummyServerFixture, InvalidateBadConnInPool) { - ScopedDbConnection conn1(TARGET_HOST); - ScopedDbConnection conn2(TARGET_HOST); - ScopedDbConnection conn3(TARGET_HOST); - - conn1.done(); - conn3.done(); - - const uint64_t badCreationTime = curTimeMicros64(); - - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::alwaysOn); - - try { - conn2->query("test.user", Query()); - } catch (const NetworkException&) { - } - - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::off); - conn2.done(); - - checkNewConns(assertGreaterThan, badCreationTime, 10); -} - -TEST_F(DummyServerFixture, DontReturnKnownBadConnToPool) { - ScopedDbConnection conn1(TARGET_HOST); - ScopedDbConnection conn2(TARGET_HOST); - ScopedDbConnection conn3(TARGET_HOST); - - conn1.done(); - - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::alwaysOn); - - try { - conn3->query("test.user", Query()); - } catch (const NetworkException&) { - } - - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::off); - - const uint64_t badCreationTime = conn3->getSockCreationMicroSec(); - conn3.done(); - // attempting to put a 'bad' connection back to the pool - conn2.done(); - - checkNewConns(assertGreaterThan, badCreationTime, 10); -} - -TEST_F(DummyServerFixture, InvalidateBadConnEvenWhenPoolIsFull) { - globalConnPool.setMaxPoolSize(2); - - ScopedDbConnection conn1(TARGET_HOST); - ScopedDbConnection conn2(TARGET_HOST); - ScopedDbConnection conn3(TARGET_HOST); - - conn1.done(); - conn3.done(); - - const uint64_t badCreationTime = curTimeMicros64(); - - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::alwaysOn); - - try { - conn2->query("test.user", Query()); - } catch (const NetworkException&) { - } - - getGlobalFailPointRegistry()->getFailPoint("throwSockExcep")->setMode(FailPoint::off); - conn2.done(); - - checkNewConns(assertGreaterThan, badCreationTime, 2); -} - -TEST_F(DummyServerFixture, DontReturnConnGoneBadToPool) { - ScopedDbConnection conn1(TARGET_HOST); - - const uint64_t conn1CreationTime = conn1->getSockCreationMicroSec(); - - uint64_t conn2CreationTime = 0; - - { - ScopedDbConnection conn2(TARGET_HOST); - conn2CreationTime = conn2->getSockCreationMicroSec(); - - conn1.done(); - // conn2 gets out of scope without calling done() - } - - // conn2 should not have been put back into the pool but it should - // also not invalidate older connections since it didn't encounter - // a socket exception. - - ScopedDbConnection conn1Again(TARGET_HOST); - ASSERT_EQUALS(conn1CreationTime, conn1Again->getSockCreationMicroSec()); - - checkNewConns(assertNotEqual, conn2CreationTime, 10); - - conn1Again.done(); -} - -class DummyServiceEntryPointWithInternalClientInfoCheck final : public DummyServiceEntryPoint { -private: - void commandRequestHook(const OpMsgRequest& request) const final { - if (request.getCommandName() != "isMaster") { - // It's not an isMaster request. Nothing to do. - return; - } - - auto internalClientElem = request.body["internalClient"]; - ASSERT_EQ(internalClientElem.type(), BSONType::Object); - auto minWireVersionElem = internalClientElem.Obj()["minWireVersion"]; - auto maxWireVersionElem = internalClientElem.Obj()["maxWireVersion"]; - ASSERT_EQ(minWireVersionElem.type(), BSONType::NumberInt); - ASSERT_EQ(maxWireVersionElem.type(), BSONType::NumberInt); - ASSERT_EQ(minWireVersionElem.numberInt(), WireSpec::instance().outgoing.minWireVersion); - ASSERT_EQ(maxWireVersionElem.numberInt(), WireSpec::instance().outgoing.maxWireVersion); - } -}; - -class DummyServerFixtureWithInternalClientInfoCheck : public DummyServerFixture { -private: - std::unique_ptr<DummyServiceEntryPoint> makeServiceEntryPoint() const final { - return stdx::make_unique<DummyServiceEntryPointWithInternalClientInfoCheck>(); - } - - bool isInternalClient() const final { - return true; - } -}; - -TEST_F(DummyServerFixtureWithInternalClientInfoCheck, VerifyIsMasterRequestOnConnectionOpen) { - // The isMaster handshake will occur on connection open. The request is verified by the test - // fixture. - ScopedDbConnection conn(TARGET_HOST); - conn.done(); -} - -class DummyServiceEntryPointWithInternalClientMissingCheck final : public DummyServiceEntryPoint { -private: - void commandRequestHook(const OpMsgRequest& request) const final { - if (request.getCommandName() != "isMaster") { - // It's not an isMaster request. Nothing to do. - return; - } - - ASSERT_FALSE(request.body["internalClient"]); - } -}; - -class DummyServerFixtureWithInternalClientMissingCheck : public DummyServerFixture { -private: - std::unique_ptr<DummyServiceEntryPoint> makeServiceEntryPoint() const final { - return stdx::make_unique<DummyServiceEntryPointWithInternalClientMissingCheck>(); - } -}; - -TEST_F(DummyServerFixtureWithInternalClientMissingCheck, VerifyIsMasterRequestOnConnectionOpen) { - // The isMaster handshake will occur on connection open. The request is verified by the test - // fixture. - ScopedDbConnection conn(TARGET_HOST); - conn.done(); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/auth/authorization_manager_test.cpp b/src/mongo/db/auth/authorization_manager_test.cpp index ad15977de7e..90cd5a6b243 100644 --- a/src/mongo/db/auth/authorization_manager_test.cpp +++ b/src/mongo/db/auth/authorization_manager_test.cpp @@ -51,7 +51,6 @@ #include "mongo/transport/transport_layer_mock.h" #include "mongo/unittest/unittest.h" #include "mongo/util/map_util.h" -#include "mongo/util/net/message_port.h" #define ASSERT_NULL(EXPR) ASSERT_FALSE(EXPR) #define ASSERT_NON_NULL(EXPR) ASSERT_TRUE(EXPR) diff --git a/src/mongo/db/auth/authz_manager_external_state_local.cpp b/src/mongo/db/auth/authz_manager_external_state_local.cpp index 9bef6410790..56cd2e2969f 100644 --- a/src/mongo/db/auth/authz_manager_external_state_local.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_local.cpp @@ -42,6 +42,7 @@ #include "mongo/db/server_options.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/ssl_types.h" namespace mongo { diff --git a/src/mongo/db/auth/authz_manager_external_state_s.cpp b/src/mongo/db/auth/authz_manager_external_state_s.cpp index 53700bb9ac9..14fef957ec9 100644 --- a/src/mongo/db/auth/authz_manager_external_state_s.cpp +++ b/src/mongo/db/auth/authz_manager_external_state_s.cpp @@ -47,6 +47,7 @@ #include "mongo/s/grid.h" #include "mongo/stdx/memory.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/ssl_types.h" #include "mongo/util/stringutils.h" namespace mongo { diff --git a/src/mongo/db/client.h b/src/mongo/db/client.h index 40e77ac04ab..99b172505e3 100644 --- a/src/mongo/db/client.h +++ b/src/mongo/db/client.h @@ -47,7 +47,6 @@ #include "mongo/transport/session.h" #include "mongo/util/concurrency/spin_lock.h" #include "mongo/util/decorable.h" -#include "mongo/util/net/abstract_message_port.h" #include "mongo/util/net/hostandport.h" namespace mongo { diff --git a/src/mongo/db/commands/authentication_commands.cpp b/src/mongo/db/commands/authentication_commands.cpp index 3787189f1c5..f701d2f760b 100644 --- a/src/mongo/db/commands/authentication_commands.cpp +++ b/src/mongo/db/commands/authentication_commands.cpp @@ -55,6 +55,7 @@ #include "mongo/util/concurrency/mutex.h" #include "mongo/util/log.h" #include "mongo/util/net/ssl_manager.h" +#include "mongo/util/net/ssl_types.h" #include "mongo/util/text.h" namespace mongo { diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 80f6629dec1..745d235926a 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -158,7 +158,7 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/fast_clock_source_factory.h" #include "mongo/util/log.h" -#include "mongo/util/net/listen.h" +#include "mongo/util/net/sock.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/ntservice.h" #include "mongo/util/options_parser/startup_options.h" diff --git a/src/mongo/db/initialize_server_global_state.cpp b/src/mongo/db/initialize_server_global_state.cpp index 9e222e45c48..17b17180073 100644 --- a/src/mongo/db/initialize_server_global_state.cpp +++ b/src/mongo/db/initialize_server_global_state.cpp @@ -65,7 +65,6 @@ #include "mongo/platform/process_id.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" -#include "mongo/util/net/listen.h" #include "mongo/util/net/ssl_manager.h" #include "mongo/util/processinfo.h" #include "mongo/util/quick_exit.h" @@ -366,7 +365,6 @@ MONGO_INITIALIZER(MungeUmask)(InitializerContext*) { #endif bool initializeServerGlobalState() { - Listener::globalTicketHolder.resize(serverGlobalParams.maxConns).transitional_ignore(); #ifndef _WIN32 if (!serverGlobalParams.noUnixSocket && !fs::is_directory(serverGlobalParams.socket)) { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 1a1f842b9d9..bb87c7312d7 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -99,7 +99,6 @@ #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/hostandport.h" -#include "mongo/util/net/listen.h" #include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" diff --git a/src/mongo/db/server_options_helpers.cpp b/src/mongo/db/server_options_helpers.cpp index b3e25106d89..96275fb5083 100644 --- a/src/mongo/db/server_options_helpers.cpp +++ b/src/mongo/db/server_options_helpers.cpp @@ -54,7 +54,7 @@ #include "mongo/util/log.h" #include "mongo/util/map_util.h" #include "mongo/util/mongoutils/str.h" -#include "mongo/util/net/listen.h" // For DEFAULT_MAX_CONN +#include "mongo/util/net/sock.h" #include "mongo/util/net/ssl_options.h" #include "mongo/util/options_parser/startup_options.h" diff --git a/src/mongo/db/storage/mmap_v1/record_access_tracker.cpp b/src/mongo/db/storage/mmap_v1/record_access_tracker.cpp index 67e0f1fd79a..1d55d272efc 100644 --- a/src/mongo/db/storage/mmap_v1/record_access_tracker.cpp +++ b/src/mongo/db/storage/mmap_v1/record_access_tracker.cpp @@ -39,7 +39,6 @@ #include "mongo/stdx/memory.h" #include "mongo/util/clock_source.h" #include "mongo/util/debug_util.h" -#include "mongo/util/net/listen.h" #include "mongo/util/processinfo.h" namespace mongo { diff --git a/src/mongo/util/net/SConscript b/src/mongo/util/net/SConscript index 133ee826998..818a1344331 100644 --- a/src/mongo/util/net/SConscript +++ b/src/mongo/util/net/SConscript @@ -22,9 +22,7 @@ env.Library( env.Library( target='network', source=[ - "listen.cpp", "message.cpp", - "message_port.cpp", "op_msg.cpp", "private/socket_poll.cpp", "private/ssl_expiration.cpp", diff --git a/src/mongo/util/net/abstract_message_port.h b/src/mongo/util/net/abstract_message_port.h deleted file mode 100644 index d6db353cfa3..00000000000 --- a/src/mongo/util/net/abstract_message_port.h +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <vector> - -#include "mongo/config.h" -#include "mongo/logger/log_severity.h" -#include "mongo/stdx/functional.h" -#include "mongo/util/net/message.h" -#include "mongo/util/net/sockaddr.h" -#include "mongo/util/net/ssl_types.h" -#include "mongo/util/time_support.h" - -namespace mongo { - -class SSLManagerInterface; - -class AbstractMessagingPort { - MONGO_DISALLOW_COPYING(AbstractMessagingPort); - -protected: - AbstractMessagingPort() = default; - -public: - virtual ~AbstractMessagingPort() = default; - - using Tag = uint32_t; - - /** - * Used when closing sockets. This value will not close any tagged sockets. - */ - static const Tag kSkipAllMask = 0xffffffff; - - /** - * Sets the maximum amount of time (in ms) to wait for io operations to run. - */ - virtual void setTimeout(Milliseconds millis) = 0; - - /** - * Closes the underlying socket. - */ - virtual void shutdown() = 0; - - /** - * Sends a message and waits for a response. This is equivalent to calling `say` then `recv`. - */ - virtual bool call(const Message& toSend, Message& response) = 0; - - /** - * Reads the next message from the socket. - */ - virtual bool recv(Message& m) = 0; - - /** - * Sends the message (does not set headers). - */ - virtual void say(const Message& toSend) = 0; - - /** - * Sends the data over the socket. - */ - virtual void send(const char* data, int len, const char* context) = 0; - virtual void send(const std::vector<std::pair<char*, int>>& data, const char* context) = 0; - - /** - * Connect to the remote endpoint. - */ - virtual bool connect(SockAddr& farEnd) = 0; - - /** - * The remote endpoint. - */ - virtual HostAndPort remote() const = 0; - - /** - * The port of the remote endpoint. - */ - virtual unsigned remotePort() const = 0; - - /** - * The remote endpoint. - */ - virtual SockAddr remoteAddr() const = 0; - - /** - * The local endpoint. - */ - virtual SockAddr localAddr() const = 0; - - /** - * Whether or not this is still connected. - */ - virtual bool isStillConnected() const = 0; - - /** - * Point in time (in micro seconds) when this was created. - */ - virtual uint64_t getSockCreationMicroSec() const = 0; - - /** - * Sets the severity level for all logging. - */ - virtual void setLogLevel(logger::LogSeverity logLevel) = 0; - - /** - * Clear the io counters. - */ - virtual void clearCounters() = 0; - - /** - * The total number of bytes read (since initialization or clearing the counters). - */ - virtual long long getBytesIn() const = 0; - - /** - * The total number of bytes written (since initialization or clearing the counters). - */ - virtual long long getBytesOut() const = 0; - - /** - * Set the x509 peer information (used for SSL). - */ - virtual void setX509PeerInfo(SSLPeerInfo x509PeerInfo) = 0; - - /** - * Get the current x509 peer information (used for SSL). - */ - virtual const SSLPeerInfo& getX509PeerInfo() const = 0; - - /** - * Set the connection ID. - */ - virtual void setConnectionId(const long long connectionId) = 0; - - /** - * Get the connection ID. - */ - virtual long long connectionId() const = 0; - - /** - * Set the tag for this messaging port, used when closing with a mask. - * @see Listener::closeMessagingPorts() - */ - virtual void setTag(const Tag tag) = 0; - - /** - * Get the tag for this messaging port. - */ - virtual Tag getTag() const = 0; - - /** - * Initiates the TLS/SSL handshake on this AbstractMessagingPort. When this function returns, - * further communication on this AbstractMessagingPort will be encrypted. - * ssl - Pointer to the global SSLManager. - * remoteHost - The hostname of the remote server. - */ - virtual bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) = 0; -}; - -} // namespace mongo diff --git a/src/mongo/util/net/listen.cpp b/src/mongo/util/net/listen.cpp deleted file mode 100644 index c53e5a2a9b9..00000000000 --- a/src/mongo/util/net/listen.cpp +++ /dev/null @@ -1,684 +0,0 @@ -// listen.cpp - -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include "mongo/util/net/listen.h" - -#include <memory> -#include <vector> - -#include "mongo/base/owned_pointer_vector.h" -#include "mongo/base/status.h" -#include "mongo/config.h" -#include "mongo/db/server_options.h" -#include "mongo/db/service_context.h" -#include "mongo/stdx/memory.h" -#include "mongo/util/concurrency/idle_thread_block.h" -#include "mongo/util/exit.h" -#include "mongo/util/log.h" -#include "mongo/util/net/message_port.h" -#include "mongo/util/net/ssl_manager.h" -#include "mongo/util/net/ssl_options.h" -#include "mongo/util/scopeguard.h" - -#ifndef _WIN32 - -#ifndef __sun -#include <ifaddrs.h> -#endif -#include <sys/resource.h> -#include <sys/stat.h> - -#include <arpa/inet.h> -#include <errno.h> -#include <netdb.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <sys/un.h> -#ifdef __OpenBSD__ -#include <sys/uio.h> -#endif - -#else - -// errno doesn't work for winsock. -#undef errno -#define errno WSAGetLastError() - -#endif - -namespace mongo { - -namespace { -const auto getListener = ServiceContext::declareDecoration<Listener*>(); -} // namespace - -using std::shared_ptr; -using std::string; -using std::vector; - -// ----- Listener ------- - -vector<SockAddr> ipToAddrs(const char* ips, int port, bool useUnixSockets) { - vector<SockAddr> out; - if (*ips == '\0') { - out.push_back(SockAddr("127.0.0.1", port, AF_INET)); // IPv4 localhost - - if (IPv6Enabled()) - out.push_back(SockAddr("::1", port, AF_INET6)); // IPv6 localhost -#ifndef _WIN32 - if (useUnixSockets) - out.push_back(SockAddr(makeUnixSockPath(port), port, AF_UNIX)); // Unix socket -#endif - return out; - } - - while (*ips) { - string ip; - const char* comma = strchr(ips, ','); - if (comma) { - ip = string(ips, comma - ips); - ips = comma + 1; - } else { - ip = string(ips); - ips = ""; - } - - SockAddr sa(ip.c_str(), port, IPv6Enabled() ? AF_UNSPEC : AF_INET); - out.push_back(sa); - -#ifndef _WIN32 - if (sa.isValid() && useUnixSockets && - (sa.getAddr() == "127.0.0.1" || sa.getAddr() == "0.0.0.0")) // only IPv4 - out.push_back(SockAddr(makeUnixSockPath(port), port, AF_INET)); -#endif - } - return out; -} - -Listener* Listener::get(ServiceContext* ctx) { - return getListener(ctx); -} - -Listener::Listener(const string& name, - const string& ip, - int port, - ServiceContext* ctx, - bool setAsServiceCtxDecoration, - bool logConnect) - : _port(port), - _name(name), - _ip(ip), - _setupSocketsSuccessful(false), - _logConnect(logConnect), - _ready(false), - _ctx(ctx), - _setAsServiceCtxDecoration(setAsServiceCtxDecoration) { -#ifdef MONGO_CONFIG_SSL - _ssl = getSSLManager(); -#endif - if (setAsServiceCtxDecoration) { - auto& listener = getListener(ctx); - invariant(!listener); - listener = this; - } -} - -Listener::~Listener() { - if (_setAsServiceCtxDecoration) { - auto& listener = getListener(_ctx); - listener = nullptr; - } -} - -bool Listener::setupSockets() { - if (!_setAsServiceCtxDecoration) { - checkTicketNumbers(); - } - -#if !defined(_WIN32) - _mine = ipToAddrs(_ip.c_str(), _port, (!serverGlobalParams.noUnixSocket && useUnixSockets())); -#else - _mine = ipToAddrs(_ip.c_str(), _port, false); -#endif - - for (std::vector<SockAddr>::const_iterator it = _mine.begin(), end = _mine.end(); it != end; - ++it) { - const SockAddr& me = *it; - - if (!me.isValid()) { - error() << "listen(): socket is invalid."; - return _setupSocketsSuccessful; - } - - SOCKET sock = ::socket(me.getType(), SOCK_STREAM, 0); - ScopeGuard socketGuard = MakeGuard(&closesocket, sock); - massert(15863, - str::stream() << "listen(): invalid socket? " << errnoWithDescription(), - sock >= 0); - - if (me.getType() == AF_UNIX) { -#if !defined(_WIN32) - if (unlink(me.getAddr().c_str()) == -1) { - if (errno != ENOENT) { - error() << "Failed to unlink socket file " << me << " " - << errnoWithDescription(errno); - fassertFailedNoTrace(28578); - } - } -#endif - } else if (me.getType() == AF_INET6) { - // IPv6 can also accept IPv4 connections as mapped addresses (::ffff:127.0.0.1) - // That causes a conflict if we don't do set it to IPV6_ONLY - const int one = 1; - setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&one, sizeof(one)); - } - -#if !defined(_WIN32) - { - const int one = 1; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0) - log() << "Failed to set socket opt, SO_REUSEADDR"; - } -#endif - - if (::bind(sock, me.raw(), me.addressSize) != 0) { - int x = errno; - error() << "listen(): bind() failed " << errnoWithDescription(x) - << " for socket: " << me.toString(); - if (x == EADDRINUSE) - error() << " addr already in use"; - return _setupSocketsSuccessful; - } - -#if !defined(_WIN32) - if (me.getType() == AF_UNIX) { - if (chmod(me.getAddr().c_str(), serverGlobalParams.unixSocketPermissions) == -1) { - error() << "Failed to chmod socket file " << me << " " - << errnoWithDescription(errno); - fassertFailedNoTrace(28582); - } - ListeningSockets::get()->addPath(me.getAddr()); - } -#endif - - _socks.push_back(sock); - socketGuard.Dismiss(); - } - - _setupSocketsSuccessful = true; - return _setupSocketsSuccessful; -} - - -#if !defined(_WIN32) -void Listener::initAndListen() { - if (!_setupSocketsSuccessful) { - return; - } - - SOCKET maxfd = 0; // needed for select() - for (unsigned i = 0; i < _socks.size(); i++) { - if (::listen(_socks[i], serverGlobalParams.listenBacklog) != 0) { - error() << "listen(): listen() failed " << errnoWithDescription(); - return; - } - - ListeningSockets::get()->add(_socks[i]); - - if (_socks[i] > maxfd) { - maxfd = _socks[i]; - } - } - - if (maxfd >= FD_SETSIZE) { - error() << "socket " << maxfd << " is higher than " << FD_SETSIZE - 1 << "; not supported" - << warnings; - return; - } - -#ifdef MONGO_CONFIG_SSL - _logListen(_port, _ssl); -#else - _logListen(_port, false); -#endif - - { - // Wake up any threads blocked in waitUntilListening() - stdx::lock_guard<stdx::mutex> lock(_readyMutex); - _ready = true; - _readyCondition.notify_all(); - } - - struct timeval maxSelectTime; - // The check against _finished allows us to actually stop the listener by signalling it through - // the _finished flag. - while (!globalInShutdownDeprecated() && !_finished.load()) { - fd_set fds[1]; - FD_ZERO(fds); - - for (vector<SOCKET>::iterator it = _socks.begin(), end = _socks.end(); it != end; ++it) { - FD_SET(*it, fds); - } - - maxSelectTime.tv_sec = 0; - maxSelectTime.tv_usec = 250000; - const int ret = [&] { - MONGO_IDLE_THREAD_BLOCK; - return select(maxfd + 1, fds, nullptr, nullptr, &maxSelectTime); - }(); - - if (ret == 0) { - continue; - } else if (ret < 0) { - int x = errno; -#ifdef EINTR - if (x == EINTR) { - log() << "select() signal caught, continuing"; - continue; - } -#endif - if (!globalInShutdownDeprecated()) - log() << "select() failure: ret=" << ret << " " << errnoWithDescription(x); - return; - } - - for (vector<SOCKET>::iterator it = _socks.begin(), end = _socks.end(); it != end; ++it) { - if (!(FD_ISSET(*it, fds))) - continue; - SockAddr from; - int s = accept(*it, from.raw(), &from.addressSize); - if (s < 0) { - int x = errno; // so no global issues - if (x == EBADF) { - log() << "Port " << _port << " is no longer valid"; - return; - } else if (x == ECONNABORTED) { - log() << "Connection on port " << _port << " aborted"; - continue; - } - if (x == 0 && globalInShutdownDeprecated()) { - return; // socket closed - } - if (!globalInShutdownDeprecated()) { - log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x); - if (x == EMFILE || x == ENFILE) { - // Connection still in listen queue but we can't accept it yet - error() << "Out of file descriptors. Waiting one second before trying to " - "accept more connections." - << warnings; - sleepsecs(1); - } - } - continue; - } - - long long myConnectionNumber = globalConnectionNumber.addAndFetch(1); - - if (_logConnect && !_setAsServiceCtxDecoration && !serverGlobalParams.quiet.load()) { - int conns = globalTicketHolder.used() + 1; - const char* word = (conns == 1 ? " connection" : " connections"); - log() << "connection accepted from " << from.toString() << " #" - << myConnectionNumber << " (" << conns << word << " now open)"; - } - - if (from.getType() != AF_UNIX) - disableNagle(s); - -#ifdef SO_NOSIGPIPE - // ignore SIGPIPE signals on osx, to avoid process exit - const int one = 1; - setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof(int)); -#endif - - std::shared_ptr<Socket> pnewSock(new Socket(s, from)); -#ifdef MONGO_CONFIG_SSL - if (_ssl) { - pnewSock->secureAccepted(_ssl); - } -#endif - _accepted(pnewSock, myConnectionNumber); - } - } -} - -#else -// Windows - -// Given a SOCKET, turns off nonblocking mode -static void disableNonblockingMode(SOCKET socket) { - unsigned long resultBuffer = 0; - unsigned long resultBufferBytesWritten = 0; - unsigned long newNonblockingEnabled = 0; - const int status = WSAIoctl(socket, - FIONBIO, - &newNonblockingEnabled, - sizeof(unsigned long), - &resultBuffer, - sizeof(resultBuffer), - &resultBufferBytesWritten, - NULL, - NULL); - if (status == SOCKET_ERROR) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSAIoctl returned " << errnoWithDescription(mongo_errno); - fassertFailed(16726); - } -} - -// RAII wrapper class to ensure we do not leak WSAEVENTs. -class EventHolder { - WSAEVENT _socketEventHandle; - -public: - EventHolder() { - _socketEventHandle = WSACreateEvent(); - if (_socketEventHandle == WSA_INVALID_EVENT) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSACreateEvent returned " << errnoWithDescription(mongo_errno); - fassertFailed(16728); - } - } - ~EventHolder() { - BOOL bstatus = WSACloseEvent(_socketEventHandle); - if (bstatus == FALSE) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSACloseEvent returned " << errnoWithDescription(mongo_errno); - fassertFailed(16725); - } - } - WSAEVENT get() { - return _socketEventHandle; - } -}; - -void Listener::initAndListen() { - if (!_setupSocketsSuccessful) { - return; - } - - for (unsigned i = 0; i < _socks.size(); i++) { - if (::listen(_socks[i], serverGlobalParams.listenBacklog) != 0) { - error() << "listen(): listen() failed " << errnoWithDescription(); - return; - } - - ListeningSockets::get()->add(_socks[i]); - } - -#ifdef MONGO_CONFIG_SSL - _logListen(_port, _ssl); -#else - _logListen(_port, false); -#endif - - { - // Wake up any threads blocked in waitUntilListening() - stdx::lock_guard<stdx::mutex> lock(_readyMutex); - _ready = true; - _readyCondition.notify_all(); - } - - std::vector<std::unique_ptr<EventHolder>> eventHolders; - std::unique_ptr<WSAEVENT[]> events(new WSAEVENT[_socks.size()]); - - - // Populate events array with an event for each socket we are watching - for (size_t count = 0; count < _socks.size(); ++count) { - auto ev = stdx::make_unique<EventHolder>(); - eventHolders.push_back(std::move(ev)); - events[count] = eventHolders.back()->get(); - } - - // The check against _finished allows us to actually stop the listener by signalling it through - // the _finished flag. - while (!globalInShutdownDeprecated() && !_finished.load()) { - // Turn on listening for accept-ready sockets - for (size_t count = 0; count < _socks.size(); ++count) { - int status = WSAEventSelect(_socks[count], events[count], FD_ACCEPT | FD_CLOSE); - if (status == SOCKET_ERROR) { - const int mongo_errno = WSAGetLastError(); - - // We may fail to listen on the socket if it has - // already been closed. If we are not in shutdown, - // that is possibly interesting, so log an error. - if (!globalInShutdownDeprecated()) { - error() << "Windows WSAEventSelect returned " - << errnoWithDescription(mongo_errno); - } - - return; - } - } - - // Wait till one of them goes active, or we time out - DWORD result = WSAWaitForMultipleEvents(_socks.size(), - events.get(), - FALSE, // don't wait for all the events - 250, // timeout, in ms - FALSE); // do not allow I/O interruptions - - if (result == WSA_WAIT_TIMEOUT) { - continue; - } else if (result == WSA_WAIT_FAILED) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSAWaitForMultipleEvents returned " - << errnoWithDescription(mongo_errno); - fassertFailed(16723); - } - - // Determine which socket is ready - DWORD eventIndex = result - WSA_WAIT_EVENT_0; - WSANETWORKEVENTS networkEvents; - // Extract event details, and clear event for next pass - int status = WSAEnumNetworkEvents(_socks[eventIndex], events[eventIndex], &networkEvents); - if (status == SOCKET_ERROR) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSAEnumNetworkEvents returned " - << errnoWithDescription(mongo_errno); - continue; - } - - if (networkEvents.lNetworkEvents & FD_CLOSE) { - log() << "listen socket closed"; - break; - } - - if (!(networkEvents.lNetworkEvents & FD_ACCEPT)) { - error() << "Unexpected network event: " << networkEvents.lNetworkEvents; - continue; - } - - int iec = networkEvents.iErrorCode[FD_ACCEPT_BIT]; - if (iec != 0) { - error() << "Windows socket accept did not work:" << errnoWithDescription(iec); - continue; - } - - status = WSAEventSelect(_socks[eventIndex], NULL, 0); - if (status == SOCKET_ERROR) { - const int mongo_errno = WSAGetLastError(); - error() << "Windows WSAEventSelect returned " << errnoWithDescription(mongo_errno); - continue; - } - - disableNonblockingMode(_socks[eventIndex]); - - SockAddr from; - int s = accept(_socks[eventIndex], from.raw(), &from.addressSize); - if (s < 0) { - int x = errno; // so no global issues - if (x == EBADF) { - log() << "Port " << _port << " is no longer valid"; - continue; - } else if (x == ECONNABORTED) { - log() << "Listener on port " << _port << " aborted"; - continue; - } - if (x == 0 && globalInShutdownDeprecated()) { - return; // socket closed - } - if (!globalInShutdownDeprecated()) { - log() << "Listener: accept() returns " << s << " " << errnoWithDescription(x); - if (x == EMFILE || x == ENFILE) { - // Connection still in listen queue but we can't accept it yet - error() << "Out of file descriptors. Waiting one second before" - " trying to accept more connections." - << warnings; - sleepsecs(1); - } - } - continue; - } - - long long myConnectionNumber = globalConnectionNumber.addAndFetch(1); - - if (_logConnect && !_setAsServiceCtxDecoration && !serverGlobalParams.quiet.load()) { - int conns = globalTicketHolder.used() + 1; - const char* word = (conns == 1 ? " connection" : " connections"); - log() << "connection accepted from " << from.toString() << " #" << myConnectionNumber - << " (" << conns << word << " now open)"; - } - - if (from.getType() != AF_UNIX) - disableNagle(s); - - std::shared_ptr<Socket> pnewSock(new Socket(s, from)); -#ifdef MONGO_CONFIG_SSL - if (_ssl) { - pnewSock->secureAccepted(_ssl); - } -#endif - _accepted(pnewSock, myConnectionNumber); - } -} -#endif - -void Listener::_logListen(int port, bool ssl) { - log() << _name << (_name.size() ? " " : "") << "waiting for connections on port " << port - << (ssl ? " ssl" : ""); -} - -void Listener::waitUntilListening() const { - stdx::unique_lock<stdx::mutex> lock(_readyMutex); - while (!_ready) { - _readyCondition.wait(lock); - } -} - -void Listener::_accepted(const std::shared_ptr<Socket>& psocket, long long connectionId) { - std::unique_ptr<AbstractMessagingPort> port; - port = stdx::make_unique<MessagingPort>(psocket); - port->setConnectionId(connectionId); - accepted(std::move(port)); -} - -// ----- ListeningSockets ------- - -ListeningSockets* ListeningSockets::_instance = new ListeningSockets(); - -ListeningSockets* ListeningSockets::get() { - return _instance; -} - -// ------ connection ticket and control ------ - -int getMaxConnections() { -#ifdef _WIN32 - return DEFAULT_MAX_CONN; -#else - struct rlimit limit; - verify(getrlimit(RLIMIT_NOFILE, &limit) == 0); - - int max = (int)(limit.rlim_cur * .8); - - LOG(1) << "fd limit" - << " hard:" << limit.rlim_max << " soft:" << limit.rlim_cur << " max conn: " << max; - - return max; -#endif -} - -void Listener::checkTicketNumbers() { - int want = getMaxConnections(); - int current = globalTicketHolder.outof(); - if (current != DEFAULT_MAX_CONN) { - if (current < want) { - // they want fewer than they can handle - // which is fine - LOG(1) << " only allowing " << current << " connections"; - return; - } - if (current > want) { - log() << " --maxConns too high, can only handle " << want; - } - } - globalTicketHolder.resize(want).transitional_ignore(); -} - -void Listener::shutdown() { - _finished.store(true); -} - -TicketHolder Listener::globalTicketHolder(DEFAULT_MAX_CONN); -AtomicInt64 Listener::globalConnectionNumber; - -void ListeningSockets::closeAll() { - std::set<int>* sockets; - std::set<std::string>* paths; - - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - sockets = _sockets; - _sockets = new std::set<int>(); - paths = _socketPaths; - _socketPaths = new std::set<std::string>(); - } - - for (std::set<int>::iterator i = sockets->begin(); i != sockets->end(); i++) { - int sock = *i; - log() << "closing listening socket: " << sock; - closesocket(sock); - } - delete sockets; - - for (std::set<std::string>::iterator i = paths->begin(); i != paths->end(); i++) { - std::string path = *i; - log() << "removing socket file: " << path; - ::remove(path.c_str()); - } - delete paths; -} -} diff --git a/src/mongo/util/net/listen.h b/src/mongo/util/net/listen.h deleted file mode 100644 index eb473bae009..00000000000 --- a/src/mongo/util/net/listen.h +++ /dev/null @@ -1,149 +0,0 @@ -// listen.h - -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include <set> -#include <string> -#include <vector> - -#include "mongo/config.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" -#include "mongo/util/concurrency/ticketholder.h" -#include "mongo/util/net/abstract_message_port.h" -#include "mongo/util/net/sock.h" - -namespace mongo { - -class ServiceContext; - -class Listener { - MONGO_DISALLOW_COPYING(Listener); - -public: - /** Obtain the Listener for a provided ServiceContext. */ - static Listener* get(ServiceContext* context); - - Listener(const std::string& name, - const std::string& ip, - int port, - ServiceContext* ctx, - bool setAsServiceCtxDecoration, - bool logConnect = true); - - virtual ~Listener(); - - void initAndListen(); // never returns unless error (start a thread) - - /* spawn a thread, etc., then return */ - virtual void accepted(std::unique_ptr<AbstractMessagingPort> mp) = 0; - - const int _port; - - /** - * Allocate sockets for the listener and set _setupSocketsSuccessful to true - * iff the process was successful. - * Returns _setupSocketsSuccessful. - */ - bool setupSockets(); - - /** - * Blocks until initAndListen has been called on this instance and gotten far enough that - * it is ready to receive incoming network requests. - */ - void waitUntilListening() const; - - void shutdown(); - -private: - std::vector<SockAddr> _mine; - std::vector<SOCKET> _socks; - std::string _name; - std::string _ip; - bool _setupSocketsSuccessful; - bool _logConnect; - mutable stdx::mutex _readyMutex; // Protects _ready - mutable stdx::condition_variable _readyCondition; // Used to wait for changes to _ready - // Boolean that indicates whether this Listener is ready to accept incoming network requests - bool _ready; - AtomicBool _finished{false}; - - ServiceContext* _ctx; - bool _setAsServiceCtxDecoration; - - virtual void _accepted(const std::shared_ptr<Socket>& psocket, long long connectionId); - -#ifdef MONGO_CONFIG_SSL - SSLManagerInterface* _ssl; -#endif - - void _logListen(int port, bool ssl); - - virtual bool useUnixSockets() const { - return false; - } - -public: - /** the "next" connection number. every connection to this process has a unique number */ - static AtomicInt64 globalConnectionNumber; - - /** keeps track of how many allowed connections there are and how many are being used*/ - static TicketHolder globalTicketHolder; - - /** makes sure user input is sane */ - static void checkTicketNumbers(); -}; - -class ListeningSockets { -public: - ListeningSockets() : _sockets(new std::set<int>()), _socketPaths(new std::set<std::string>()) {} - void add(int sock) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _sockets->insert(sock); - } - void addPath(const std::string& path) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _socketPaths->insert(path); - } - void remove(int sock) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _sockets->erase(sock); - } - void closeAll(); - static ListeningSockets* get(); - -private: - stdx::mutex _mutex; - std::set<int>* _sockets; - std::set<std::string>* _socketPaths; // for unix domain sockets - static ListeningSockets* _instance; -}; -} diff --git a/src/mongo/util/net/message_port.cpp b/src/mongo/util/net/message_port.cpp deleted file mode 100644 index 504d61c8eeb..00000000000 --- a/src/mongo/util/net/message_port.cpp +++ /dev/null @@ -1,239 +0,0 @@ -// message_port.cpp - -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork - -#include "mongo/platform/basic.h" - -#include "mongo/util/net/message_port.h" - -#include <fcntl.h> -#include <time.h> - -#include "mongo/config.h" -#include "mongo/util/allocator.h" -#include "mongo/util/background.h" -#include "mongo/util/log.h" -#include "mongo/util/net/listen.h" -#include "mongo/util/net/message.h" -#include "mongo/util/net/socket_exception.h" -#include "mongo/util/net/ssl_manager.h" -#include "mongo/util/net/ssl_options.h" -#include "mongo/util/scopeguard.h" -#include "mongo/util/time_support.h" - -#ifndef _WIN32 -#ifndef __sun -#include <ifaddrs.h> -#endif -#include <sys/resource.h> -#include <sys/stat.h> -#endif - -namespace mongo { - -using std::shared_ptr; -using std::string; - -/* messagingport -------------------------------------------------------------- */ - -MessagingPort::MessagingPort(int fd, const SockAddr& remote) - : MessagingPort(std::make_shared<Socket>(fd, remote)) {} - -MessagingPort::MessagingPort(double timeout, logger::LogSeverity ll) - : MessagingPort(std::make_shared<Socket>(timeout, ll)) {} - -MessagingPort::MessagingPort(std::shared_ptr<Socket> sock) - : _x509PeerInfo(), _connectionId(), _tag(), _psock(std::move(sock)) { - SockAddr sa = _psock->remoteAddr(); - _remoteParsed = HostAndPort(sa.getAddr(), sa.getPort()); -} - -void MessagingPort::setTimeout(Milliseconds millis) { - double timeout = double(millis.count()) / 1000; - _psock->setTimeout(timeout); -} - -void MessagingPort::shutdown() { - _psock->close(); -} - -MessagingPort::~MessagingPort() { - shutdown(); -} - -bool MessagingPort::recv(Message& m) { - try { -#ifdef MONGO_CONFIG_SSL - again: -#endif - MSGHEADER::Value header; - _psock->recv((char*)&header, sizeof(header)); - int len = header.constView().getMessageLength(); - - if (len == 542393671) { - // an http GET - string msg = - "It looks like you are trying to access MongoDB over HTTP on the native driver " - "port.\n"; - LOG(_psock->getLogLevel()) << msg; - std::stringstream ss; - ss << "HTTP/1.0 200 OK\r\nConnection: close\r\nContent-Type: " - "text/plain\r\nContent-Length: " - << msg.size() << "\r\n\r\n" - << msg; - string s = ss.str(); - send(s.c_str(), s.size(), "http"); - return false; - } - // If responseTo is not 0 or -1 for first packet assume SSL - else if (_psock->isAwaitingHandshake()) { -#ifndef MONGO_CONFIG_SSL - if (header.constView().getResponseToMsgId() != 0 && - header.constView().getResponseToMsgId() != -1) { - uasserted(17133, - "SSL handshake requested, SSL feature not available in this build"); - } -#else - if (header.constView().getResponseToMsgId() != 0 && - header.constView().getResponseToMsgId() != -1) { - uassert(17132, - "SSL handshake received but server is started without SSL support", - sslGlobalParams.sslMode.load() != SSLParams::SSLMode_disabled); - setX509PeerInfo( - _psock->doSSLHandshake(reinterpret_cast<const char*>(&header), sizeof(header))); - LOG(1) << "new ssl connection, SNI server name [" << _psock->getSNIServerName() - << "]"; - _psock->setHandshakeReceived(); - - goto again; - } - - auto sslMode = sslGlobalParams.sslMode.load(); - - uassert(17189, - "The server is configured to only allow SSL connections", - sslMode != SSLParams::SSLMode_requireSSL); - - // For users attempting to upgrade their applications from no SSL to SSL, provide - // information about connections that still aren't using SSL (but only once per - // connection) - if (!sslGlobalParams.disableNonSSLConnectionLogging && - (sslMode == SSLParams::SSLMode_preferSSL)) { - LOG(0) << "SSL mode is set to 'preferred' and connection " << _connectionId - << " to " << remote() << " is not using SSL."; - } - -#endif // MONGO_CONFIG_SSL - } - if (static_cast<size_t>(len) < sizeof(header) || - static_cast<size_t>(len) > MaxMessageSizeBytes) { - LOG(0) << "recv(): message len " << len << " is invalid. " - << "Min " << sizeof(header) << " Max: " << MaxMessageSizeBytes; - return false; - } - - _psock->setHandshakeReceived(); - - auto buf = SharedBuffer::allocate(len); - MsgData::View md = buf.get(); - memcpy(md.view2ptr(), &header, sizeof(header)); - - const int left = len - sizeof(header); - if (left) - _psock->recv(md.data(), left); - - m.setData(std::move(buf)); - return true; - - } catch (const NetworkException& e) { - LOG(_psock->getLogLevel()) << "NetworkException: remote: " << remote() << " error: " << e; - m.reset(); - return false; - } -} - -bool MessagingPort::call(const Message& toSend, Message& response) { - say(toSend); - bool success = recv(response); - if (success) { - invariant(!response.empty()); - if (response.header().getResponseToMsgId() != toSend.header().getId()) { - response.reset(); - uasserted(40134, "Response ID did not match the sent message ID."); - } - } - return success; -} - -void MessagingPort::say(const Message& toSend) { - invariant(!toSend.empty()); - auto buf = toSend.buf(); - if (buf) { - send(buf, MsgData::ConstView(buf).getLen(), "say"); - } -} - -HostAndPort MessagingPort::remote() const { - return _remoteParsed; -} - -SockAddr MessagingPort::remoteAddr() const { - return _psock->remoteAddr(); -} - -SockAddr MessagingPort::localAddr() const { - return _psock->localAddr(); -} - -void MessagingPort::setX509PeerInfo(SSLPeerInfo x509PeerInfo) { - _x509PeerInfo = std::move(x509PeerInfo); -} - -const SSLPeerInfo& MessagingPort::getX509PeerInfo() const { - return _x509PeerInfo; -} - -void MessagingPort::setConnectionId(const long long connectionId) { - _connectionId = connectionId; -} - -long long MessagingPort::connectionId() const { - return _connectionId; -} - -void MessagingPort::setTag(const AbstractMessagingPort::Tag tag) { - _tag = tag; -} - -AbstractMessagingPort::Tag MessagingPort::getTag() const { - return _tag; -} - -} // namespace mongo diff --git a/src/mongo/util/net/message_port.h b/src/mongo/util/net/message_port.h deleted file mode 100644 index 4744451db5f..00000000000 --- a/src/mongo/util/net/message_port.h +++ /dev/null @@ -1,145 +0,0 @@ -// message_port.h - -/* Copyright 2009 10gen Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include <vector> - -#include "mongo/config.h" -#include "mongo/util/net/abstract_message_port.h" -#include "mongo/util/net/message.h" -#include "mongo/util/net/sock.h" - -namespace mongo { - -class MessagingPort; - -class MessagingPort final : public AbstractMessagingPort { -public: - MessagingPort(int fd, const SockAddr& remote); - - // in some cases the timeout will actually be 2x this value - eg we do a partial send, - // then the timeout fires, then we try to send again, then the timeout fires again with - // no data sent, then we detect that the other side is down - MessagingPort(double so_timeout = 0, logger::LogSeverity logLevel = logger::LogSeverity::Log()); - - MessagingPort(std::shared_ptr<Socket> socket); - - ~MessagingPort() override; - - void setTimeout(Milliseconds millis) override; - - void shutdown() override; - - /* it's assumed if you reuse a message object, that it doesn't cross MessagingPort's. - also, the Message data will go out of scope on the subsequent recv call. - */ - bool recv(Message& m) override; - bool call(const Message& toSend, Message& response) override; - - void say(const Message& toSend) override; - - unsigned remotePort() const override { - return _psock->remotePort(); - } - virtual HostAndPort remote() const override; - virtual SockAddr remoteAddr() const override; - virtual SockAddr localAddr() const override; - - void send(const char* data, int len, const char* context) override { - _psock->send(data, len, context); - } - void send(const std::vector<std::pair<char*, int>>& data, const char* context) override { - _psock->send(data, context); - } - bool connect(SockAddr& farEnd) override { - return _psock->connect(farEnd); - } - - void setLogLevel(logger::LogSeverity ll) override { - _psock->setLogLevel(ll); - } - - void clearCounters() override { - _psock->clearCounters(); - } - - long long getBytesIn() const override { - return _psock->getBytesIn(); - } - - long long getBytesOut() const override { - return _psock->getBytesOut(); - } - - void setX509PeerInfo(SSLPeerInfo x509PeerInfo) override; - - const SSLPeerInfo& getX509PeerInfo() const override; - - void setConnectionId(const long long connectionId) override; - - long long connectionId() const override; - - void setTag(const AbstractMessagingPort::Tag tag) override; - - AbstractMessagingPort::Tag getTag() const override; - - /** - * Initiates the TLS/SSL handshake on this MessagingPort. - * When this function returns, further communication on this - * MessagingPort will be encrypted. - * ssl - Pointer to the global SSLManager. - * remoteHost - The hostname of the remote server. - */ - bool secure(SSLManagerInterface* ssl, const std::string& remoteHost) override { -#ifdef MONGO_CONFIG_SSL - return _psock->secure(ssl, remoteHost); -#else - return false; -#endif - } - - bool isStillConnected() const override { - return _psock->isStillConnected(); - } - - uint64_t getSockCreationMicroSec() const override { - return _psock->getSockCreationMicroSec(); - } - -private: - // this is the parsed version of remote - HostAndPort _remoteParsed; - SSLPeerInfo _x509PeerInfo; - long long _connectionId; - AbstractMessagingPort::Tag _tag; - std::shared_ptr<Socket> _psock; -}; - -} // namespace mongo diff --git a/src/mongo/util/tcmalloc_server_status_section.cpp b/src/mongo/util/tcmalloc_server_status_section.cpp index f420dc055f4..68513ef842b 100644 --- a/src/mongo/util/tcmalloc_server_status_section.cpp +++ b/src/mongo/util/tcmalloc_server_status_section.cpp @@ -43,7 +43,6 @@ #include "mongo/transport/service_entry_point.h" #include "mongo/transport/thread_idle_callback.h" #include "mongo/util/log.h" -#include "mongo/util/net/listen.h" namespace mongo { |