diff options
-rw-r--r-- | jstests/sharding/rs_stepdown_and_pooling.js | 103 | ||||
-rw-r--r-- | jstests/sharding/shard_kill_and_pooling.js | 80 | ||||
-rw-r--r-- | src/mongo/client/connpool.cpp | 7 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.cpp | 49 | ||||
-rw-r--r-- | src/mongo/client/dbclient_rs.h | 1 | ||||
-rw-r--r-- | src/mongo/client/dbclientinterface.h | 7 | ||||
-rw-r--r-- | src/mongo/client/syncclusterconnection.cpp | 12 | ||||
-rw-r--r-- | src/mongo/client/syncclusterconnection.h | 1 | ||||
-rw-r--r-- | src/mongo/db/instance.h | 5 | ||||
-rw-r--r-- | src/mongo/shell/replsettest.js | 7 | ||||
-rw-r--r-- | src/mongo/util/net/message_port.h | 4 | ||||
-rw-r--r-- | src/mongo/util/net/sock.cpp | 155 | ||||
-rw-r--r-- | src/mongo/util/net/sock.h | 5 |
13 files changed, 420 insertions, 16 deletions
diff --git a/jstests/sharding/rs_stepdown_and_pooling.js b/jstests/sharding/rs_stepdown_and_pooling.js new file mode 100644 index 00000000000..5ce5399f536 --- /dev/null +++ b/jstests/sharding/rs_stepdown_and_pooling.js @@ -0,0 +1,103 @@ +// +// Tests what happens when a replica set primary goes down with pooled connections. +// + +// Helper for running arbitrary commands + +var options = {separateConfig : true, sync : true}; + +var st = new ShardingTest({shards : {rs0 : {nodes : 2}}, mongos : 1, other : options}); + +// Stop balancer to eliminate weird conn stuff +st.stopBalancer(); + +var mongos = st.s0; +var coll = mongos.getCollection("foo.bar"); +var primary = st.rs0.getPrimary(); +var secondary = st.rs0.getSecondary(); + +jsTest.log("Creating new connections..."); + +// Create a bunch of connections to the primary node through mongos. +// jstest ->(x10)-> mongos ->(x10)-> primary +var conns = []; +for ( var i = 0; i < 50; i++) { + conns.push(new Mongo(mongos.host)); + conns[i].getCollection(coll + "").findOne(); +} + +assert.eq(primary.port, 31100); + +jsTest.log("Returning the connections back to the pool."); + +for ( var i = 0; i < conns.length; i++ ) { + conns[i] = null; +} +// Make sure we return connections back to the pool +gc(); + +// Don't make test fragile by linking to format of shardConnPoolStats, but this is useful if +// something goes wrong. +var connPoolStats = mongos.getDB("admin").runCommand({ shardConnPoolStats : 1 }); +printjson( connPoolStats ); + +jsTest.log("Stepdown primary and then step back up..."); + +var stepDown = function(node, timeSecs) { + var result = null; + try { + result = node.getDB("admin").runCommand({ replSetStepDown : timeSecs, force : true }); + // Should not get here + } catch (e) { + printjson(e); + } + + if (result != null) printjson(result); + assert.eq(null, result); +} + +stepDown(primary, 0); + +jsTest.log("Waiting for mongos to acknowledge stepdown..."); + +ReplSetTest.awaitRSClientHosts( mongos, + secondary, + { ismaster : true }, + st.rs0, + 2 * 60 * 1000 ); // slow hosts can take longer to recognize the sd + +jsTest.log("Stepping back up..."); + +stepDown(secondary, 10000); + +jsTest.log("Waiting for mongos to acknowledge step up..."); + +ReplSetTest.awaitRSClientHosts( mongos, + primary, + { ismaster : true }, + st.rs0, + 2 * 60 * 1000 ); + +jsTest.log("Waiting for socket timeout time..."); + +// Need to wait longer than the socket polling time. +sleep(2 * 5000); + +jsTest.log("Run queries using new connections."); + +var numErrors = 0; +for ( var i = 0; i < conns.length; i++) { + var newConn = new Mongo(mongos.host); + try { + printjson(newConn.getCollection("foo.bar").findOne()); + } catch (e) { + printjson(e); + numErrors++; + } +} + +assert.eq(0, numErrors); + +jsTest.log("DONE!"); + +st.stop(); diff --git a/jstests/sharding/shard_kill_and_pooling.js b/jstests/sharding/shard_kill_and_pooling.js new file mode 100644 index 00000000000..c053f1fbd7d --- /dev/null +++ b/jstests/sharding/shard_kill_and_pooling.js @@ -0,0 +1,80 @@ +// +// Tests what happens when a shard goes down with pooled connections. +// + +// Run through the same test twice, once with a hard -9 kill, once with a regular shutdown + +for ( var test = 0; test < 2; test++ ) { + +var killWith = (test == 0 ? 15 : 9); +var options = { separateConfig : true }; + +var st = new ShardingTest({shards : 2, mongos : 1, other : options}); + +// Stop balancer to eliminate weird conn stuff +st.stopBalancer(); + +var mongos = st.s0; +var coll = mongos.getCollection("foo.bar"); + +coll.insert({ hello : "world" }) +assert.eq( null, coll.getDB().getLastError() ); + +jsTest.log("Creating new connections..."); + +// Create a bunch of connections to the primary node through mongos. +// jstest ->(x10)-> mongos ->(x10)-> primary +var conns = []; +for ( var i = 0; i < 50; i++) { + conns.push(new Mongo(mongos.host)); + assert.neq( null, conns[i].getCollection(coll + "").findOne() ); +} + +jsTest.log("Returning the connections back to the pool."); + +for ( var i = 0; i < conns.length; i++ ) { + conns[i] = null; +} +// Make sure we return connections back to the pool +gc(); + +// Don't make test fragile by linking to format of shardConnPoolStats, but this is useful if +// something goes wrong. +var connPoolStats = mongos.getDB("admin").runCommand({ shardConnPoolStats : 1 }); +printjson( connPoolStats ); + +jsTest.log("Shutdown shard " + (killWith == 9 ? "uncleanly" : "" ) + "..."); + +MongoRunner.stopMongod( st.shard0, killWith ); + +jsTest.log("Restart shard..."); + +st.shard0 = MongoRunner.runMongod({ restart : st.shard0 }); + +jsTest.log("Waiting for socket timeout time..."); + +// Need to wait longer than the socket polling time. +sleep(2 * 5000); + +jsTest.log("Run queries using new connections."); + +var numErrors = 0; +for ( var i = 0; i < conns.length; i++) { + var newConn = new Mongo(mongos.host); + try { + assert.neq( null, newConn.getCollection("foo.bar").findOne() ); + } catch (e) { + printjson(e); + numErrors++; + } +} + +assert.eq(0, numErrors); + +st.stop(); + +jsTest.log("DONE test " + test); + +} // End test loop + +jsTest.log("DONE!");
\ No newline at end of file diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp index 740249d87bb..36cd5919e85 100644 --- a/src/mongo/client/connpool.cpp +++ b/src/mongo/client/connpool.cpp @@ -61,7 +61,8 @@ namespace mongo { microSec > _minValidCreationTimeMicroSec) { _minValidCreationTimeMicroSec = microSec; log() << "Detected bad connection created at " << _minValidCreationTimeMicroSec - << " microSec, clearing pool for " << _hostName << endl; + << " microSec, clearing pool for " << _hostName + << " of " << _pool.size() << " connections" << endl; clear(); } } @@ -148,8 +149,8 @@ namespace mongo { } bool PoolForHost::StoredConnection::ok( time_t now ) { - // if connection has been idle for 30 minutes, kill it - return ( now - when ) < 1800; + // Poke the connection to see if we're still ok + return conn->isStillConnected(); } void PoolForHost::createdOne( DBClientBase * base) { diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp index 2008ed59c3e..ddc20496df4 100644 --- a/src/mongo/client/dbclient_rs.cpp +++ b/src/mongo/client/dbclient_rs.cpp @@ -260,19 +260,30 @@ namespace mongo { /** * @return the connection associated with the monitor node. Will also attempt - * to establish connection if NULL. Can still return NULL if reconnect failed. + * to establish connection if NULL or broken in background. + * Can still return NULL if reconnect failed. */ - shared_ptr<DBClientConnection> _getConnWithRefresh(ReplicaSetMonitor::Node& node) { - if (node.conn.get() == NULL) { - ConnectionString connStr(node.addr); + shared_ptr<DBClientConnection> _getConnWithRefresh( ReplicaSetMonitor::Node& node ) { + if ( node.conn.get() == NULL || !node.conn->isStillConnected() ) { + + // Note: This constructor only works with MASTER connections + ConnectionString connStr( node.addr ); string errmsg; try { - node.conn.reset(dynamic_cast<DBClientConnection*>( - connStr.connect(errmsg, ReplicaSetMonitor::SOCKET_TIMEOUT_SECS))); + DBClientBase* conn = connStr.connect( errmsg, + ReplicaSetMonitor::SOCKET_TIMEOUT_SECS ); + if ( conn == NULL ) { + node.ok = false; + node.conn.reset(); + } + else { + node.conn.reset( dynamic_cast<DBClientConnection*>( conn ) ); + } } - catch (const AssertionException&) { + catch ( const AssertionException& ) { node.ok = false; + node.conn.reset(); } } @@ -1318,6 +1329,30 @@ namespace mongo { return rsm->getServerAddress(); } + // A replica set connection is never disconnected, since it controls its own reconnection + // logic. + // + // Has the side effect of proactively clearing any cached connections which have been + // disconnected in the background. + bool DBClientReplicaSet::isStillConnected() { + + if ( _master && !_master->isStillConnected() ) { + _master.reset(); + _masterHost = HostAndPort(); + // Don't notify monitor of bg failure, since it's not clear how long ago it happened + } + + if ( _lastSlaveOkConn && !_lastSlaveOkConn->isStillConnected() ) { + _lastSlaveOkConn.reset(); + _lastSlaveOkHost = HostAndPort(); + // Reset read pref too, since we're re-selecting the slaveOk host anyway + _lastReadPref.reset(); + // Don't notify monitor of bg failure, since it's not clear how long ago it happened + } + + return true; + } + DBClientConnection * DBClientReplicaSet::checkMaster() { ReplicaSetMonitorPtr monitor = _getMonitor(); HostAndPort h = monitor->getMaster(); diff --git a/src/mongo/client/dbclient_rs.h b/src/mongo/client/dbclient_rs.h index 2b91eb945b3..3d3ce0093fa 100644 --- a/src/mongo/client/dbclient_rs.h +++ b/src/mongo/client/dbclient_rs.h @@ -489,6 +489,7 @@ namespace mongo { // ----- status ------ virtual bool isFailed() const { return ! _master || _master->isFailed(); } + bool isStillConnected(); // ----- informational ---- diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h index 8bb715083ae..c3a93c2a9ad 100644 --- a/src/mongo/client/dbclientinterface.h +++ b/src/mongo/client/dbclientinterface.h @@ -1082,6 +1082,11 @@ namespace mongo { virtual bool isFailed() const = 0; + /** + * if not checked recently, checks whether the underlying socket/sockets are still valid + */ + virtual bool isStillConnected() = 0; + virtual void killCursor( long long cursorID ) = 0; virtual bool callRead( Message& toSend , Message& response ) = 0; @@ -1192,6 +1197,8 @@ namespace mongo { */ bool isFailed() const { return _failed; } + bool isStillConnected() { return p ? p->isStillConnected() : true; } + MessagingPort& port() { verify(p); return *p; } string toStringLong() const { diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp index 19d1e124bab..ec0bcf627a6 100644 --- a/src/mongo/client/syncclusterconnection.cpp +++ b/src/mongo/client/syncclusterconnection.cpp @@ -491,6 +491,18 @@ namespace mongo { verify(0); } + // A SCC should be reused only if all the existing connections haven't been broken in the + // background. + // Note: an SCC may have missing connections if a config server is temporarily offline, + // but reading from the others is still allowed. + bool SyncClusterConnection::isStillConnected() { + for ( size_t i = 0; i < _conns.size(); i++ ) { + if ( _conns[i] && !_conns[i]->isStillConnected() ) return false; + + } + return true; + } + void SyncClusterConnection::setAllSoTimeouts( double socketTimeout ){ _socketTimeout = socketTimeout; for ( size_t i=0; i<_conns.size(); i++ ) diff --git a/src/mongo/client/syncclusterconnection.h b/src/mongo/client/syncclusterconnection.h index 3511285117c..cd81be44c6e 100644 --- a/src/mongo/client/syncclusterconnection.h +++ b/src/mongo/client/syncclusterconnection.h @@ -92,6 +92,7 @@ namespace mongo { virtual string getServerAddress() const { return _address; } virtual bool isFailed() const { return false; } + virtual bool isStillConnected(); virtual string toString() { return _toString(); } virtual BSONObj getLastErrorDetailed(const std::string& db, diff --git a/src/mongo/db/instance.h b/src/mongo/db/instance.h index 09c1969c880..6d84a589797 100644 --- a/src/mongo/db/instance.h +++ b/src/mongo/db/instance.h @@ -90,6 +90,11 @@ namespace mongo { virtual bool isFailed() const { return false; } + + virtual bool isStillConnected() { + return true; + } + virtual string toString() { return "DBDirectClient"; } diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js index 05b8be4af29..fe0702b7435 100644 --- a/src/mongo/shell/replsettest.js +++ b/src/mongo/shell/replsettest.js @@ -327,7 +327,7 @@ ReplSetTest.prototype.callIsMaster = function() { return master || false; } -ReplSetTest.awaitRSClientHosts = function( conn, host, hostOk, rs ) { +ReplSetTest.awaitRSClientHosts = function( conn, host, hostOk, rs, timeout ) { var hostCount = host.length; if( hostCount ){ for( var i = 0; i < hostCount; i++ ) { @@ -336,6 +336,8 @@ ReplSetTest.awaitRSClientHosts = function( conn, host, hostOk, rs ) { return; } + timeout = timeout || 60 * 1000; + if( hostOk == undefined ) hostOk = { ok : true } if( host.host ) host = host.host if( rs && rs.getMaster ) rs = rs.name @@ -374,8 +376,7 @@ ReplSetTest.awaitRSClientHosts = function( conn, host, hostOk, rs ) { } } return false; - }, "timed out waiting for replica set client to recognize hosts", - 3 * 20 * 1000 /* ReplicaSetMonitorWatcher updates every 20s */ ) + }, "timed out waiting for replica set client to recognize hosts", timeout ) } diff --git a/src/mongo/util/net/message_port.h b/src/mongo/util/net/message_port.h index 9988f15a58b..fcbde98338e 100644 --- a/src/mongo/util/net/message_port.h +++ b/src/mongo/util/net/message_port.h @@ -115,6 +115,10 @@ namespace mongo { } #endif + bool isStillConnected() { + return psock->isStillConnected(); + } + uint64_t getSockCreationMicroSec() const { return psock->getSockCreationMicroSec(); } diff --git a/src/mongo/util/net/sock.cpp b/src/mongo/util/net/sock.cpp index f9df16c81be..c40fa3dc605 100644 --- a/src/mongo/util/net/sock.cpp +++ b/src/mongo/util/net/sock.cpp @@ -20,9 +20,9 @@ #include "mongo/util/net/sock.h" #if !defined(_WIN32) +# include <sys/poll.h> # include <sys/socket.h> # include <sys/types.h> -# include <sys/socket.h> # include <sys/un.h> # include <netinet/in.h> # include <netinet/tcp.h> @@ -392,7 +392,7 @@ namespace mongo { // ------------ Socket ----------------- Socket::Socket(int fd , const SockAddr& remote) : - _fd(fd), _remote(remote), _timeout(0) { + _fd(fd), _remote(remote), _timeout(0), _lastValidityCheckAtSecs(time(0)) { _logLevel = 0; _init(); } @@ -401,6 +401,7 @@ namespace mongo { _logLevel = ll; _fd = -1; _timeout = timeout; + _lastValidityCheckAtSecs = time(0); _init(); } @@ -426,11 +427,17 @@ namespace mongo { void Socket::close() { if ( _fd >= 0 ) { + // Stop any blocking reads/writes, and prevent new reads/writes +#if defined(_WIN32) + shutdown( _fd, SD_BOTH ); +#else + shutdown( _fd, SHUT_RDWR ); +#endif closesocket( _fd ); _fd = -1; } } - + #ifdef MONGO_SSL void Socket::secure(SSLManagerInterface* mgr) { fassert(16503, mgr); @@ -757,6 +764,148 @@ namespace mongo { setSockTimeouts( _fd, secs ); } + // TODO: allow modification? + // + // <positive value> : secs to wait between stillConnected checks + // 0 : always check + // -1 : never check + const int Socket::errorPollIntervalSecs( 5 ); + +#if defined(NTDDI_VERSION) && ( !defined(NTDDI_VISTA) || ( NTDDI_VERSION < NTDDI_VISTA ) ) + // Windows XP + + // pre-Vista windows doesn't have WSAPoll, so don't test connections + bool Socket::isStillConnected() { + return true; + } + +#else // Not Windows XP + + // Patch to allow better tolerance of flaky network connections that get broken + // while we aren't looking. + // TODO: Remove when better async changes come. + // + // isStillConnected() polls the socket at max every Socket::errorPollIntervalSecs to determine + // if any disconnection-type events have happened on the socket. + bool Socket::isStillConnected() { + + if ( errorPollIntervalSecs < 0 ) return true; + + time_t now = time( 0 ); + time_t idleTimeSecs = now - _lastValidityCheckAtSecs; + + // Only check once every 5 secs + if ( idleTimeSecs < errorPollIntervalSecs ) return true; + // Reset our timer, we're checking the connection + _lastValidityCheckAtSecs = now; + + // It's been long enough, poll to see if our socket is still connected + + pollfd pollInfo; + pollInfo.fd = _fd; + // We only care about reading the EOF message on clean close (and errors) + pollInfo.events = POLLIN; + + // Poll( info[], size, timeout ) - timeout == 0 => nonblocking +#if defined(_WIN32) + int nEvents = WSAPoll( &pollInfo, 1, 0 ); +#else + int nEvents = ::poll( &pollInfo, 1, 0 ); +#endif + + LOG( 2 ) << "polling for status of connection to " << remoteString() + << ", " << ( nEvents == 0 ? "no events" : + nEvents == -1 ? "error detected" : + "event detected" ) << endl; + + if ( nEvents == 0 ) { + // No events incoming, return still connected AFAWK + return true; + } + else if ( nEvents < 0 ) { + // Poll itself failed, this is weird, warn and log errno + warning() << "Socket poll() failed during connectivity check" + << " (idle " << idleTimeSecs << " secs," + << " remote host " << remoteString() << ")" + << causedBy(errnoWithDescription()) << endl; + + // Return true since it's not clear that we're disconnected. + return true; + } + + dassert( nEvents == 1 ); + dassert( pollInfo.revents > 0 ); + + // Return false at this point, some event happened on the socket, but log what the + // actual event was. + + if ( pollInfo.revents & POLLIN ) { + + // There shouldn't really be any data to recv here, so make sure this + // is a clean hangup. + + // Used concurrently, but we never actually read this data + static char testBuf[1]; + + int recvd = ::recv( _fd, testBuf, 1, portRecvFlags ); + + if ( recvd < 0 ) { + // An error occurred during recv, warn and log errno + warning() << "Socket recv() failed during connectivity check" + << " (idle " << idleTimeSecs << " secs," + << " remote host " << remoteString() << ")" + << causedBy(errnoWithDescription()) << endl; + } + else if ( recvd > 0 ) { + // We got nonzero data from this socket, very weird? + // Log and warn at runtime, log and abort at devtime + // TODO: Dump the data to the log somehow? + error() << "Socket found pending data during connectivity check" + << " (idle " << idleTimeSecs << " secs," + << " remote host " << remoteString() << ")" << endl; + dassert( false ); + } + else { + // recvd == 0, socket closed remotely, just return false + LOG( 0 ) << "Socket closed remotely, no longer connected" + << " (idle " << idleTimeSecs << " secs," + << " remote host " << remoteString() << ")" << endl; + } + } + else if ( pollInfo.revents & POLLHUP ) { + // A hangup has occurred on this socket + LOG( _logLevel ) << "Socket hangup detected, no longer connected" + << " (idle " << idleTimeSecs << " secs," + << " remote host " << remoteString() << ")" << endl; + } + else if ( pollInfo.revents & POLLERR ) { + // An error has occurred on this socket + LOG( _logLevel ) << "Socket error detected, no longer connected" + << " (idle " << idleTimeSecs << " secs," + << " remote host " << remoteString() << ")" << endl; + } + else if ( pollInfo.revents & POLLNVAL ) { + // Socket descriptor itself is weird + // Log and warn at runtime, log and abort at devtime + error() << "Socket descriptor detected as invalid" + << " (idle " << idleTimeSecs << " secs," + << " remote host " << remoteString() << ")" << endl; + dassert( false ); + } + else { + // Don't know what poll is saying here + // Log and warn at runtime, log and abort at devtime + error() << "Socket had unknown event (" << static_cast<int>(pollInfo.revents) << ")" + << " (idle " << idleTimeSecs << " secs," + << " remote host " << remoteString() << ")" << endl; + dassert( false ); + } + + return false; + } + +#endif // End Not Windows XP + #if defined(_WIN32) struct WinsockInit { WinsockInit() { diff --git a/src/mongo/util/net/sock.h b/src/mongo/util/net/sock.h index f867478da68..c1c610a8d01 100644 --- a/src/mongo/util/net/sock.h +++ b/src/mongo/util/net/sock.h @@ -177,6 +177,9 @@ namespace mongo { */ class Socket : boost::noncopyable { public: + + static const int errorPollIntervalSecs; + Socket(int sock, const SockAddr& farEnd); /** In some cases the timeout will actually be 2x this value - eg we do a partial send, @@ -211,6 +214,7 @@ namespace mongo { long long getBytesOut() const { return _bytesOut; } void setTimeout( double secs ); + bool isStillConnected(); #ifdef MONGO_SSL /** secures inline */ @@ -256,6 +260,7 @@ namespace mongo { long long _bytesIn; long long _bytesOut; + time_t _lastValidityCheckAtSecs; #ifdef MONGO_SSL SSL* _ssl; |