summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Mathias Stearn <mathias@10gen.com> 2018-09-04 18:01:14 -0400
committer: Mathias Stearn <mathias@10gen.com> 2018-09-13 12:36:12 -0400
commit: 63868859a3213eae8e749c1f361eaac813083f59 (patch)
tree: 6d3ba8957515d23fc49bd2e45d180741dc69118d
parent: f4d62c2ba9a27dc03663779d0817bc399ab2e91f (diff)
download: mongo-63868859a3213eae8e749c1f361eaac813083f59.tar.gz
SERVER-36468 Add a mechanism to allow breaking DBClientConnection out of a blocking operation
-rw-r--r-- buildscripts/resmokeconfig/suites/integration_tests_replset.yml | 2
-rw-r--r-- buildscripts/resmokeconfig/suites/integration_tests_sharded.yml | 1
-rw-r--r-- src/mongo/client/SConscript | 12
-rw-r--r-- src/mongo/client/connection_pool.cpp | 6
-rw-r--r-- src/mongo/client/connection_pool.h | 2
-rw-r--r-- src/mongo/client/dbclient_connection.cpp | 63
-rw-r--r-- src/mongo/client/dbclient_connection.h | 33
-rw-r--r-- src/mongo/client/dbclient_connection_integration_test.cpp | 104
8 files changed, 193 insertions, 30 deletions
diff --git a/buildscripts/resmokeconfig/suites/integration_tests_replset.yml b/buildscripts/resmokeconfig/suites/integration_tests_replset.yml
index f0f62a7d6dc..2337449c4f5 100644
--- a/buildscripts/resmokeconfig/suites/integration_tests_replset.yml
+++ b/buildscripts/resmokeconfig/suites/integration_tests_replset.yml
@@ -2,6 +2,8 @@ test_kind: cpp_integration_test
selector:
root: build/integration_tests.txt
+ exclude_files:
+ - build/integration_tests/dbclient_connection_integration_test* # Needs connection to single host.
executor:
archive:
diff --git a/buildscripts/resmokeconfig/suites/integration_tests_sharded.yml b/buildscripts/resmokeconfig/suites/integration_tests_sharded.yml
index f931947b658..181fab0e990 100644
--- a/buildscripts/resmokeconfig/suites/integration_tests_sharded.yml
+++ b/buildscripts/resmokeconfig/suites/integration_tests_sharded.yml
@@ -4,6 +4,7 @@ selector:
root: build/integration_tests.txt
exclude_files:
- build/integration_tests/network_interface_asio_integration_test*
+ - build/integration_tests/dbclient_connection_integration_test* # Needs sleep command
executor:
archive:
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index fe3cde63e2f..f6c04996994 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -217,6 +217,18 @@ env.CppIntegrationTest(
],
)
+env.CppIntegrationTest(
+ target='dbclient_connection_integration_test',
+ source=[
+ 'dbclient_connection_integration_test.cpp',
+ ],
+ LIBDEPS=[
+ 'clientdriver_network',
+ '$BUILD_DIR/mongo/transport/transport_layer_egress_init',
+ '$BUILD_DIR/mongo/util/version_impl',
+ ],
+)
+
env.Library(
target='async_client',
source=[
diff --git a/src/mongo/client/connection_pool.cpp b/src/mongo/client/connection_pool.cpp
index c02ab68ed5d..703099e746e 100644
--- a/src/mongo/client/connection_pool.cpp
+++ b/src/mongo/client/connection_pool.cpp
@@ -92,20 +92,20 @@ void ConnectionPool::_cleanUpOlderThan_inlock(Date_t now, ConnectionList* hostCo
}
}
-bool ConnectionPool::_shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const {
+bool ConnectionPool::_shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) {
const Date_t expirationDate = connInfo.creationDate + kMaxConnectionAge;
if (expirationDate <= now) {
return false;
}
- return true;
+ return !connInfo.conn->isFailed();
}
void ConnectionPool::closeAllInUseConnections() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
for (ConnectionList::iterator iter = _inUseConnections.begin(); iter != _inUseConnections.end();
++iter) {
- iter->conn->shutdown();
+ iter->conn->shutdownAndDisallowReconnect();
}
}
diff --git a/src/mongo/client/connection_pool.h b/src/mongo/client/connection_pool.h
index bd461f1bedb..79ed4f3b1b7 100644
--- a/src/mongo/client/connection_pool.h
+++ b/src/mongo/client/connection_pool.h
@@ -169,7 +169,7 @@ private:
/**
* Returns true if the given connection is young enough to keep in the pool.
*/
- bool _shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo) const;
+ static bool _shouldKeepConnection(Date_t now, const ConnectionInfo& connInfo);
/**
* Apply cleanup policy to any host(s) not active in the last kCleanupInterval milliseconds.
diff --git a/src/mongo/client/dbclient_connection.cpp b/src/mongo/client/dbclient_connection.cpp
index 870e98f1920..71fb7ca9a98 100644
--- a/src/mongo/client/dbclient_connection.cpp
+++ b/src/mongo/client/dbclient_connection.cpp
@@ -273,6 +273,13 @@ Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) {
_serverAddress = serverAddress;
_markFailed(kReleaseSession);
+
+ if (_stayFailed.load()) {
+ // This is just an optimization so we don't waste time connecting just to throw it away.
+ // The check below is the one that is important for correctness.
+ return makeSocketError(SocketErrorKind::FAILED_STATE, toString());
+ }
+
if (serverAddress.host().empty()) {
return Status(ErrorCodes::InvalidOptions,
str::stream() << "couldn't connect to server " << _serverAddress.toString()
@@ -311,12 +318,20 @@ Status DBClientConnection::connectSocketOnly(const HostAndPort& serverAddress) {
<< sws.getStatus());
}
- _session = std::move(sws.getValue());
+ {
+ stdx::lock_guard<stdx::mutex> lk(_sessionMutex);
+ if (_stayFailed.load()) {
+ // This object is still in a failed state. The session we just created will be destroyed
+ // immediately since we aren't holding on to it.
+ return makeSocketError(SocketErrorKind::FAILED_STATE, toString());
+ }
+ _session = std::move(sws.getValue());
+ _failed.store(false);
+ }
_sessionCreationMicros = curTimeMicros64();
_lastConnectivityCheck = Date_t::now();
_session->setTimeout(_socketTimeout);
_session->setTags(_tagMask);
- _failed = false;
LOG(1) << "connected to server " << toString();
return Status::OK();
}
@@ -365,12 +380,15 @@ rpc::UniqueReply DBClientConnection::parseCommandReplyMessage(const std::string&
}
void DBClientConnection::_markFailed(FailAction action) {
- _failed = true;
+ _failed.store(true);
if (_session) {
if (action == kEndSession) {
_session->end();
} else if (action == kReleaseSession) {
- _session.reset();
+ transport::SessionHandle destroyedOutsideMutex;
+
+ stdx::lock_guard<stdx::mutex> lk(_sessionMutex);
+ _session.swap(destroyedOutsideMutex);
}
}
}
@@ -379,13 +397,19 @@ bool DBClientConnection::isStillConnected() {
// This method tries to figure out whether the connection is still open, but with several
// caveats.
- // If we don't have a _session then we may have hit an error, or we may just not have
- // connected yet - the _failed flag should indicate which.
- //
- // Otherwise, return false if we know we've had an error (_failed is true)
- if (!_session) {
- return !_failed;
- } else if (_failed) {
+ // If we don't have a _session then we are definitely not connected. If we've been marked failed
+ // then we are supposed to pretend that we aren't connected, even though we may be.
+ // HOWEVER, some unit tests have poorly designed mocks that never populate _session, even when
+ // the DBClientConnection should be considered healthy and connected.
+
+ if (_stayFailed.load()) {
+ // Ensures there is no chance that a perma-failed connection can go back into a pool.
+ return false;
+ } else if (!_session) {
+ // This should always return false in practice, but needs to do this to work around poorly
+ // designed mocks as described above.
+ return !_failed.load();
+ } else if (_failed.load()) {
return false;
}
@@ -400,7 +424,11 @@ bool DBClientConnection::isStillConnected() {
// This will poll() the underlying socket and do a 1 byte recv to see if the connection
// has been closed.
- return _session->isConnected();
+ if (_session->isConnected())
+ return true;
+
+ _markFailed(kSetFlag);
+ return false;
}
void DBClientConnection::setTags(transport::Session::TagMask tags) {
@@ -410,13 +438,14 @@ void DBClientConnection::setTags(transport::Session::TagMask tags) {
_session->setTags(tags);
}
-void DBClientConnection::shutdown() {
+void DBClientConnection::shutdownAndDisallowReconnect() {
+ stdx::lock_guard<stdx::mutex> lk(_sessionMutex);
+ _stayFailed.store(true);
_markFailed(kEndSession);
}
void DBClientConnection::_checkConnection() {
- if (!_failed)
- return;
+ dassert(_failed.load()); // only called when in failed state.
if (!autoReconnect)
throwSocketError(SocketErrorKind::FAILED_STATE, toString());
@@ -426,7 +455,6 @@ void DBClientConnection::_checkConnection() {
LOG(_logLevel) << "trying reconnect to " << toString() << endl;
string errmsg;
- _failed = false;
auto connectStatus = connect(_serverAddress, _applicationName);
if (!connectStatus.isOK()) {
_markFailed(kSetFlag);
@@ -520,8 +548,7 @@ DBClientConnection::DBClientConnection(bool _autoReconnect,
double so_timeout,
MongoURI uri,
const HandshakeValidationHook& hook)
- : _failed(false),
- autoReconnect(_autoReconnect),
+ : autoReconnect(_autoReconnect),
autoReconnectBackoff(1000, 2000),
_hook(hook),
_uri(std::move(uri)) {
diff --git a/src/mongo/client/dbclient_connection.h b/src/mongo/client/dbclient_connection.h
index 7ccceb07590..3977d03e279 100644
--- a/src/mongo/client/dbclient_connection.h
+++ b/src/mongo/client/dbclient_connection.h
@@ -48,6 +48,7 @@
#include "mongo/rpc/protocol.h"
#include "mongo/rpc/unique_message.h"
#include "mongo/stdx/functional.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/session.h"
#include "mongo/transport/transport_layer.h"
@@ -63,9 +64,13 @@ class DBClientCursor;
class DBClientCursorBatchIterator;
/**
- A basic connection to the database.
- This is the main entry point for talking to a simple Mongo setup
-*/
+ * A basic connection to the database.
+ * This is the main entry point for talking to a simple Mongo setup
+ *
+ * In general, this type is only allowed to be used from one thread at a time. As a special
+ * exception, it is legal to call shutdownAndDisallowReconnect() from any thread as a way to
+ * interrupt the owning thread.
+ */
class DBClientConnection : public DBClientBase {
public:
using DBClientBase::query;
@@ -177,14 +182,20 @@ public:
a connection will transition back to an ok state after reconnecting.
*/
bool isFailed() const override {
- return _failed;
+ return _failed.load();
}
bool isStillConnected() override;
void setTags(transport::Session::TagMask tag);
- void shutdown();
+ /**
+ * Causes an error to be reported the next time the connection is used. Will interrupt
+ * operations if they are currently blocked waiting for the network.
+ *
+ * This is the only method that is allowed to be called from other threads.
+ */
+ void shutdownAndDisallowReconnect();
void setWireVersions(int minWireVersion, int maxWireVersion) {
_minWireVersion = minWireVersion;
@@ -204,7 +215,7 @@ public:
ss << _serverAddress;
if (!_resolvedAddress.empty())
ss << " (" << _resolvedAddress << ")";
- if (_failed)
+ if (_failed.load())
ss << " failed";
return ss.str();
}
@@ -256,7 +267,7 @@ public:
// throws a NetworkException if in failed state and not reconnecting or if waiting to reconnect
void checkConnection() override {
- if (_failed)
+ if (_failed.load())
_checkConnection();
}
@@ -276,13 +287,19 @@ protected:
void _auth(const BSONObj& params) override;
+ // The session mutex must be held to shutdown the _session from a non-owning thread, or to
+ // rebind the handle from the owning thread. The thread that owns this DBClientConnection is
+ // allowed to use the _session without locking the mutex. This mutex also guards writes to
+ // _stayFailed, although reads are allowed outside the mutex.
+ stdx::mutex _sessionMutex;
transport::SessionHandle _session;
boost::optional<Milliseconds> _socketTimeout;
transport::Session::TagMask _tagMask = transport::Session::kEmptyTagMask;
uint64_t _sessionCreationMicros = INVALID_SOCK_CREATION_TIME;
Date_t _lastConnectivityCheck;
- bool _failed = false;
+ AtomicBool _stayFailed{false};
+ AtomicBool _failed{false};
const bool autoReconnect;
Backoff autoReconnectBackoff;
diff --git a/src/mongo/client/dbclient_connection_integration_test.cpp b/src/mongo/client/dbclient_connection_integration_test.cpp
new file mode 100644
index 00000000000..926ac9080a6
--- /dev/null
+++ b/src/mongo/client/dbclient_connection_integration_test.cpp
@@ -0,0 +1,104 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/dbclient_connection.h"
+
+#include "mongo/base/checked_cast.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/unittest/integration_test.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+const auto sleepCmd = fromjson(R"({sleep: 1, locks: 'none', secs: 100})");
+constexpr StringData kAppName = "DBClientConnectionTest"_sd;
+
+class DBClientConnectionFixture : public unittest::Test {
+public:
+ static std::unique_ptr<DBClientConnection> makeConn(StringData name = kAppName) {
+ std::string errMsg;
+ auto connHolder = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect(name, errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, connHolder);
+
+ auto conn = dynamic_cast<DBClientConnection*>(connHolder.get());
+ invariant(conn);
+ connHolder.release();
+ return std::unique_ptr<DBClientConnection>(conn);
+ }
+
+ void tearDown() override {
+ // resmoke hangs if there are any ops still running on the server, so try to clean up after
+ // ourselves.
+ auto conn = makeConn(kAppName + "-cleanup");
+
+ BSONObj currOp;
+ if (!conn->simpleCommand("admin", &currOp, "currentOp"))
+ uassertStatusOK(getStatusFromCommandResult(currOp));
+
+ for (auto&& op : currOp["inprog"].Obj()) {
+ if (op["clientMetadata"]["application"]["name"].String() != kAppName)
+ continue;
+
+ // Ignore failures to clean up.
+ BSONObj ignored;
+ (void)conn->runCommand("admin", BSON("killOp" << 1 << "op" << op["opid"]), ignored);
+ }
+ }
+};
+
+TEST_F(DBClientConnectionFixture, shutdownWorksIfCalledFirst) {
+ auto conn = makeConn();
+
+ conn->shutdownAndDisallowReconnect();
+
+ BSONObj reply;
+ ASSERT_THROWS(conn->runCommand("admin", sleepCmd, reply),
+ ExceptionForCat<ErrorCategory::NetworkError>); // Currently SocketException.
+}
+
+TEST_F(DBClientConnectionFixture, shutdownWorksIfRunCommandInProgress) {
+ auto conn = makeConn();
+
+ stdx::thread shutdownThread([&] {
+ // Try to make this run while the connection is blocked in recv.
+ sleepmillis(100);
+ conn->shutdownAndDisallowReconnect();
+ });
+ ON_BLOCK_EXIT([&] { shutdownThread.join(); });
+
+ BSONObj reply;
+ ASSERT_THROWS(conn->runCommand("admin", sleepCmd, reply),
+ ExceptionForCat<ErrorCategory::NetworkError>); // Currently HostUnreachable.
+}
+
+} // namespace
+} // namespace mongo