diff options
author | Mathias Stearn <mathias@10gen.com> | 2018-09-04 18:01:14 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2018-09-13 12:36:12 -0400 |
commit | 63868859a3213eae8e749c1f361eaac813083f59 (patch) | |
tree | 6d3ba8957515d23fc49bd2e45d180741dc69118d | |
parent | f4d62c2ba9a27dc03663779d0817bc399ab2e91f (diff) | |
download | mongo-63868859a3213eae8e749c1f361eaac813083f59.tar.gz |
SERVER-36468 Add a mechanism to allow breaking DBClientConnection out of a blocking operation
-rw-r--r-- | buildscripts/resmokeconfig/suites/integration_tests_replset.yml | 2 | ||||
-rw-r--r-- | buildscripts/resmokeconfig/suites/integration_tests_sharded.yml | 1 | ||||
-rw-r--r-- | src/mongo/client/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/client/connection_pool.cpp | 6 | ||||
-rw-r--r-- | src/mongo/client/connection_pool.h | 2 | ||||
-rw-r--r-- | src/mongo/client/dbclient_connection.cpp | 63 | ||||
-rw-r--r-- | src/mongo/client/dbclient_connection.h | 33 | ||||
-rw-r--r-- | src/mongo/client/dbclient_connection_integration_test.cpp | 104 |
8 files changed, 193 insertions, 30 deletions
diff --git a/buildscripts/resmokeconfig/suites/integration_tests_replset.yml b/buildscripts/resmokeconfig/suites/integration_tests_replset.yml index f0f62a7d6dc..2337449c4f5 100644 --- a/buildscripts/resmokeconfig/suites/integration_tests_replset.yml +++ b/buildscripts/resmokeconfig/suites/integration_tests_replset.yml @@ -2,6 +2,8 @@ test_kind: cpp_integration_test selector: root: build/integration_tests.txt + exclude_files: + - build/integration_tests/dbclient_connection_integration_test* # Needs connection to single host. executor: archive: diff --git a/buildscripts/resmokeconfig/suites/integration_tests_sharded.yml b/buildscripts/resmokeconfig/suites/integration_tests_sharded.yml index f931947b658..181fab0e990 100644 --- a/buildscripts/resmokeconfig/suites/integration_tests_sharded.yml +++ b/buildscripts/resmokeconfig/suites/integration_tests_sharded.yml @@ -4,6 +4,7 @@ selector: root: build/integration_tests.txt exclude_files: - build/integration_tests/network_interface_asio_integration_test* + - build/integration_tests/dbclient_connection_integration_test* # Needs sleep command executor: archive: diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index fe3cde63e2f..f6c04996994 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -217,6 +217,18 @@ env.CppIntegrationTest( ], ) +env.CppIntegrationTest( + target='dbclient_connection_integration_test', + source=[ + 'dbclient_connection_integration_test.cpp', + ], + LIBDEPS=[ + 'clientdriver_network', + '$BUILD_DIR/mongo/transport/transport_layer_egress_init', + '$BUILD_DIR/mongo/util/version_impl', + ], +) + env.Library( target='async_client', source=[ diff --git a/src/mongo/client/connection_pool.cpp b/src/mongo/client/connection_pool.cpp index c02ab68ed5d..703099e746e 100644 --- a/src/mongo/client/connection_pool.cpp +++ b/src/mongo/client/connection_pool.cpp @@ -92,20 +92,20 @@ void ConnectionPool::_cleanUpOlderThan_inlock(Date_t now, ConnectionList* hostCo } } -bool ConnectionPool::_shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const { +bool ConnectionPool::_shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) { const Date_t expirationDate = connInfo.creationDate + kMaxConnectionAge; if (expirationDate <= now) { return false; } - return true; + return !connInfo.conn->isFailed(); } void ConnectionPool::closeAllInUseConnections() { stdx::lock_guard<stdx::mutex> lk(_mutex); for (ConnectionList::iterator iter = _inUseConnections.begin(); iter != _inUseConnections.end(); ++iter) { - iter->conn->shutdown(); + iter->conn->shutdownAndDisallowReconnect(); } } diff --git a/src/mongo/client/connection_pool.h b/src/mongo/client/connection_pool.h index bd461f1bedb..79ed4f3b1b7 100644 --- a/src/mongo/client/connection_pool.h +++ b/src/mongo/client/connection_pool.h @@ -169,7 +169,7 @@ private: /** * Returns true if the given connection is young enough to keep in the pool. */ - bool _shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const; + static bool _shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo); /** * Apply cleanup policy to any host(s) not active in the last kCleanupInterval milliseconds. diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp index 870e98f1920..71fb7ca9a98 100644 --- a/src/mongo/client/dbclient_connection.cpp +++ b/src/mongo/client/dbclient_connection.cpp @@ -273,6 +273,13 @@ Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) { _serverAddress = serverAddress; _markFailed(kReleaseSession); + + if (_stayFailed.load()) { + // This is just an optimization so we don't waste time connecting just to throw it away. + // The check below is the one that is important for correctness. + return makeSocketError(SocketErrorKind::FAILED_STATE, toString()); + } + if (serverAddress.host().empty()) { return Status(ErrorCodes::InvalidOptions, str::stream() << "couldn't connect to server " << _serverAddress.toString() @@ -311,12 +318,20 @@ Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) { << sws.getStatus()); } - _session = std::move(sws.getValue()); + { + stdx::lock_guard<stdx::mutex> lk(_sessionMutex); + if (_stayFailed.load()) { + // This object is still in a failed state. The session we just created will be destroyed + // immediately since we aren't holding on to it. + return makeSocketError(SocketErrorKind::FAILED_STATE, toString()); + } + _session = std::move(sws.getValue()); + _failed.store(false); + } _sessionCreationMicros = curTimeMicros64(); _lastConnectivityCheck = Date_t::now(); _session->setTimeout(_socketTimeout); _session->setTags(_tagMask); - _failed = false; LOG(1) << "connected to server " << toString(); return Status::OK(); } @@ -365,12 +380,15 @@ rpc::UniqueReply DBClientConnection::parseCommandReplyMessage(const std::string& } void DBClientConnection::_markFailed(FailAction action) { - _failed = true; + _failed.store(true); if (_session) { if (action == kEndSession) { _session->end(); } else if (action == kReleaseSession) { - _session.reset(); + transport::SessionHandle destroyedOutsideMutex; + + stdx::lock_guard<stdx::mutex> lk(_sessionMutex); + _session.swap(destroyedOutsideMutex); } } } @@ -379,13 +397,19 @@ bool DBClientConnection::isStillConnected() { // This method tries to figure out whether the connection is still open, but with several // caveats. - // If we don't have a _session then we may have hit an error, or we may just not have - // connected yet - the _failed flag should indicate which. - // - // Otherwise, return false if we know we've had an error (_failed is true) - if (!_session) { - return !_failed; - } else if (_failed) { + // If we don't have a _session then we are definitely not connected. If we've been marked failed + // then we are supposed to pretend that we aren't connected, even though we may be. + // HOWEVER, some unit tests have poorly designed mocks that never populate _session, even when + // the DBClientConnection should be considered healthy and connected. + + if (_stayFailed.load()) { + // Ensures there is no chance that a perma-failed connection can go back into a pool. + return false; + } else if (!_session) { + // This should always return false in practice, but needs to do this to work around poorly + // designed mocks as described above. + return !_failed.load(); + } else if (_failed.load()) { return false; } @@ -400,7 +424,11 @@ bool DBClientConnection::isStillConnected() { // This will poll() the underlying socket and do a 1 byte recv to see if the connection // has been closed. - return _session->isConnected(); + if (_session->isConnected()) + return true; + + _markFailed(kSetFlag); + return false; } void DBClientConnection::setTags(transport::Session::TagMask tags) { @@ -410,13 +438,14 @@ void DBClientConnection::setTags(transport::Session::TagMask tags) { _session->setTags(tags); } -void DBClientConnection::shutdown() { +void DBClientConnection::shutdownAndDisallowReconnect() { + stdx::lock_guard<stdx::mutex> lk(_sessionMutex); + _stayFailed.store(true); _markFailed(kEndSession); } void DBClientConnection::_checkConnection() { - if (!_failed) - return; + dassert(_failed.load()); // only called when in failed state. if (!autoReconnect) throwSocketError(SocketErrorKind::FAILED_STATE, toString()); @@ -426,7 +455,6 @@ void DBClientConnection::_checkConnection() { LOG(_logLevel) << "trying reconnect to " << toString() << endl; string errmsg; - _failed = false; auto connectStatus = connect(_serverAddress, _applicationName); if (!connectStatus.isOK()) { _markFailed(kSetFlag); @@ -520,8 +548,7 @@ DBClientConnection::DBClientConnection(bool _autoReconnect, double so_timeout, MongoURI uri, const HandshakeValidationHook& hook) - : _failed(false), - autoReconnect(_autoReconnect), + : autoReconnect(_autoReconnect), autoReconnectBackoff(1000, 2000), _hook(hook), _uri(std::move(uri)) { diff --git a/src/mongo/client/dbclient_connection.h b/src/mongo/client/dbclient_connection.h index 7ccceb07590..3977d03e279 100644 --- a/src/mongo/client/dbclient_connection.h +++ b/src/mongo/client/dbclient_connection.h @@ -48,6 +48,7 @@ #include "mongo/rpc/protocol.h" #include "mongo/rpc/unique_message.h" #include "mongo/stdx/functional.h" +#include "mongo/stdx/mutex.h" #include "mongo/transport/message_compressor_manager.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_layer.h" @@ -63,9 +64,13 @@ class DBClientCursor; class DBClientCursorBatchIterator; /** - A basic connection to the database. - This is the main entry point for talking to a simple Mongo setup -*/ + * A basic connection to the database. + * This is the main entry point for talking to a simple Mongo setup + * + * In general, this type is only allowed to be used from one thread at a time. As a special + * exception, it is legal to call shutdownAndDisallowReconnect() from any thread as a way to + * interrupt the owning thread. + */ class DBClientConnection : public DBClientBase { public: using DBClientBase::query; @@ -177,14 +182,20 @@ public: a connection will transition back to an ok state after reconnecting. */ bool isFailed() const override { - return _failed; + return _failed.load(); } bool isStillConnected() override; void setTags(transport::Session::TagMask tag); - void shutdown(); + /** + * Causes an error to be reported the next time the connection is used. Will interrupt + * operations if they are currently blocked waiting for the network. + * + * This is the only method that is allowed to be called from other threads. + */ + void shutdownAndDisallowReconnect(); void setWireVersions(int minWireVersion, int maxWireVersion) { _minWireVersion = minWireVersion; @@ -204,7 +215,7 @@ public: ss << _serverAddress; if (!_resolvedAddress.empty()) ss << " (" << _resolvedAddress << ")"; - if (_failed) + if (_failed.load()) ss << " failed"; return ss.str(); } @@ -256,7 +267,7 @@ public: // throws a NetworkException if in failed state and not reconnecting or if waiting to reconnect void checkConnection() override { - if (_failed) + if (_failed.load()) _checkConnection(); } @@ -276,13 +287,19 @@ protected: void _auth(const BSONObj& params) override; + // The session mutex must be held to shutdown the _session from a non-owning thread, or to + // rebind the handle from the owning thread. The thread that owns this DBClientConnection is + // allowed to use the _session without locking the mutex. This mutex also guards writes to + // _stayFailed, although reads are allowed outside the mutex. + stdx::mutex _sessionMutex; transport::SessionHandle _session; boost::optional<Milliseconds> _socketTimeout; transport::Session::TagMask _tagMask = transport::Session::kEmptyTagMask; uint64_t _sessionCreationMicros = INVALID_SOCK_CREATION_TIME; Date_t _lastConnectivityCheck; - bool _failed = false; + AtomicBool _stayFailed{false}; + AtomicBool _failed{false}; const bool autoReconnect; Backoff autoReconnectBackoff; diff --git a/src/mongo/client/dbclient_connection_integration_test.cpp b/src/mongo/client/dbclient_connection_integration_test.cpp new file mode 100644 index 00000000000..926ac9080a6 --- /dev/null +++ b/src/mongo/client/dbclient_connection_integration_test.cpp @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/client/dbclient_connection.h" + +#include "mongo/base/checked_cast.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/unittest/integration_test.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +const auto sleepCmd = fromjson(R"({sleep: 1, locks: 'none', secs: 100})"); +constexpr StringData kAppName = "DBClientConnectionTest"_sd; + +class DBClientConnectionFixture : public unittest::Test { +public: + static std::unique_ptr<DBClientConnection> makeConn(StringData name = kAppName) { + std::string errMsg; + auto connHolder = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect(name, errMsg)); + uassert(ErrorCodes::SocketException, errMsg, connHolder); + + auto conn = dynamic_cast<DBClientConnection*>(connHolder.get()); + invariant(conn); + connHolder.release(); + return std::unique_ptr<DBClientConnection>(conn); + } + + void tearDown() override { + // resmoke hangs if there are any ops still running on the server, so try to clean up after + // ourselves. + auto conn = makeConn(kAppName + "-cleanup"); + + BSONObj currOp; + if (!conn->simpleCommand("admin", &currOp, "currentOp")) + uassertStatusOK(getStatusFromCommandResult(currOp)); + + for (auto&& op : currOp["inprog"].Obj()) { + if (op["clientMetadata"]["application"]["name"].String() != kAppName) + continue; + + // Ignore failures to clean up. + BSONObj ignored; + (void)conn->runCommand("admin", BSON("killOp" << 1 << "op" << op["opid"]), ignored); + } + } +}; + +TEST_F(DBClientConnectionFixture, shutdownWorksIfCalledFirst) { + auto conn = makeConn(); + + conn->shutdownAndDisallowReconnect(); + + BSONObj reply; + ASSERT_THROWS(conn->runCommand("admin", sleepCmd, reply), + ExceptionForCat<ErrorCategory::NetworkError>); // Currently SocketException. +} + +TEST_F(DBClientConnectionFixture, shutdownWorksIfRunCommandInProgress) { + auto conn = makeConn(); + + stdx::thread shutdownThread([&] { + // Try to make this run while the connection is blocked in recv. + sleepmillis(100); + conn->shutdownAndDisallowReconnect(); + }); + ON_BLOCK_EXIT([&] { shutdownThread.join(); }); + + BSONObj reply; + ASSERT_THROWS(conn->runCommand("admin", sleepCmd, reply), + ExceptionForCat<ErrorCategory::NetworkError>); // Currently HostUnreachable. +} + +} // namespace +} // namespace mongo |