-rw-r--r--    jstests/sharding/rs_stepdown_and_pooling.js    103
-rw-r--r--    jstests/sharding/shard_kill_and_pooling.js      80
-rw-r--r--    src/mongo/client/connpool.cpp                    7
-rw-r--r--    src/mongo/client/dbclient_rs.cpp                49
-rw-r--r--    src/mongo/client/dbclient_rs.h                   1
-rw-r--r--    src/mongo/client/dbclientinterface.h             7
-rw-r--r--    src/mongo/client/syncclusterconnection.cpp      12
-rw-r--r--    src/mongo/client/syncclusterconnection.h         1
-rw-r--r--    src/mongo/db/instance.h                          5
-rw-r--r--    src/mongo/shell/replsettest.js                   7
-rw-r--r--    src/mongo/util/net/message_port.h                4
-rw-r--r--    src/mongo/util/net/sock.cpp                    155
-rw-r--r--    src/mongo/util/net/sock.h                        5
13 files changed, 420 insertions(+), 16 deletions(-)
diff --git a/jstests/sharding/rs_stepdown_and_pooling.js b/jstests/sharding/rs_stepdown_and_pooling.js
new file mode 100644
index 00000000000..5ce5399f536
--- /dev/null
+++ b/jstests/sharding/rs_stepdown_and_pooling.js
@@ -0,0 +1,103 @@
+//
+// Tests what happens when a replica set primary goes down with pooled connections.
+//
+
+// Helper for running arbitrary commands
+
+var options = {separateConfig : true, sync : true};
+
+var st = new ShardingTest({shards : {rs0 : {nodes : 2}}, mongos : 1, other : options});
+
+// Stop balancer to eliminate weird conn stuff
+st.stopBalancer();
+
+var mongos = st.s0;
+var coll = mongos.getCollection("foo.bar");
+var primary = st.rs0.getPrimary();
+var secondary = st.rs0.getSecondary();
+
+jsTest.log("Creating new connections...");
+
+// Create a bunch of connections to the primary node through mongos.
+// jstest ->(x50)-> mongos ->(pooled)-> primary
+var conns = [];
+for ( var i = 0; i < 50; i++) {
+ conns.push(new Mongo(mongos.host));
+ conns[i].getCollection(coll + "").findOne();
+}
+
+assert.eq(primary.port, 31100);
+
+jsTest.log("Returning the connections back to the pool.");
+
+for ( var i = 0; i < conns.length; i++ ) {
+ conns[i] = null;
+}
+// Make sure we return connections back to the pool
+gc();
+
+// Don't make the test fragile by depending on the format of shardConnPoolStats, but this is
+// useful if something goes wrong.
+var connPoolStats = mongos.getDB("admin").runCommand({ shardConnPoolStats : 1 });
+printjson( connPoolStats );
+
+jsTest.log("Stepdown primary and then step back up...");
+
+var stepDown = function(node, timeSecs) {
+ var result = null;
+ try {
+ result = node.getDB("admin").runCommand({ replSetStepDown : timeSecs, force : true });
+ // Should not get here
+ } catch (e) {
+ printjson(e);
+ }
+
+ if (result != null) printjson(result);
+ assert.eq(null, result);
+}
+
+stepDown(primary, 0);
+
+jsTest.log("Waiting for mongos to acknowledge stepdown...");
+
+ReplSetTest.awaitRSClientHosts( mongos,
+ secondary,
+ { ismaster : true },
+ st.rs0,
+ 2 * 60 * 1000 ); // slow hosts can take longer to recognize the stepdown
+
+jsTest.log("Stepping back up...");
+
+stepDown(secondary, 10000);
+
+jsTest.log("Waiting for mongos to acknowledge step up...");
+
+ReplSetTest.awaitRSClientHosts( mongos,
+ primary,
+ { ismaster : true },
+ st.rs0,
+ 2 * 60 * 1000 );
+
+jsTest.log("Waiting for socket timeout time...");
+
+// Need to wait longer than the socket polling time.
+sleep(2 * 5000);
+
+jsTest.log("Run queries using new connections.");
+
+var numErrors = 0;
+for ( var i = 0; i < conns.length; i++) {
+ var newConn = new Mongo(mongos.host);
+ try {
+ printjson(newConn.getCollection("foo.bar").findOne());
+ } catch (e) {
+ printjson(e);
+ numErrors++;
+ }
+}
+
+assert.eq(0, numErrors);
+
+jsTest.log("DONE!");
+
+st.stop();
diff --git a/jstests/sharding/shard_kill_and_pooling.js b/jstests/sharding/shard_kill_and_pooling.js
new file mode 100644
index 00000000000..c053f1fbd7d
--- /dev/null
+++ b/jstests/sharding/shard_kill_and_pooling.js
@@ -0,0 +1,80 @@
+//
+// Tests what happens when a shard goes down with pooled connections.
+//
+
+// Run through the same test twice, once with a regular shutdown, once with a hard kill -9
+
+for ( var test = 0; test < 2; test++ ) {
+
+var killWith = (test == 0 ? 15 : 9);
+var options = { separateConfig : true };
+
+var st = new ShardingTest({shards : 2, mongos : 1, other : options});
+
+// Stop balancer to eliminate weird conn stuff
+st.stopBalancer();
+
+var mongos = st.s0;
+var coll = mongos.getCollection("foo.bar");
+
+coll.insert({ hello : "world" })
+assert.eq( null, coll.getDB().getLastError() );
+
+jsTest.log("Creating new connections...");
+
+// Create a bunch of connections to the shard through mongos.
+// jstest ->(x50)-> mongos ->(pooled)-> shard
+var conns = [];
+for ( var i = 0; i < 50; i++) {
+ conns.push(new Mongo(mongos.host));
+ assert.neq( null, conns[i].getCollection(coll + "").findOne() );
+}
+
+jsTest.log("Returning the connections back to the pool.");
+
+for ( var i = 0; i < conns.length; i++ ) {
+ conns[i] = null;
+}
+// Make sure we return connections back to the pool
+gc();
+
+// Don't make the test fragile by depending on the format of shardConnPoolStats, but this is
+// useful if something goes wrong.
+var connPoolStats = mongos.getDB("admin").runCommand({ shardConnPoolStats : 1 });
+printjson( connPoolStats );
+
+jsTest.log("Shutdown shard " + (killWith == 9 ? "uncleanly" : "cleanly" ) + "...");
+
+MongoRunner.stopMongod( st.shard0, killWith );
+
+jsTest.log("Restart shard...");
+
+st.shard0 = MongoRunner.runMongod({ restart : st.shard0 });
+
+jsTest.log("Waiting for socket timeout time...");
+
+// Need to wait longer than the socket polling time.
+sleep(2 * 5000);
+
+jsTest.log("Run queries using new connections.");
+
+var numErrors = 0;
+for ( var i = 0; i < conns.length; i++) {
+ var newConn = new Mongo(mongos.host);
+ try {
+ assert.neq( null, newConn.getCollection("foo.bar").findOne() );
+ } catch (e) {
+ printjson(e);
+ numErrors++;
+ }
+}
+
+assert.eq(0, numErrors);
+
+st.stop();
+
+jsTest.log("DONE test " + test);
+
+} // End test loop
+
+jsTest.log("DONE!");
\ No newline at end of file
diff --git a/src/mongo/client/connpool.cpp b/src/mongo/client/connpool.cpp
index 740249d87bb..36cd5919e85 100644
--- a/src/mongo/client/connpool.cpp
+++ b/src/mongo/client/connpool.cpp
@@ -61,7 +61,8 @@ namespace mongo {
microSec > _minValidCreationTimeMicroSec) {
_minValidCreationTimeMicroSec = microSec;
log() << "Detected bad connection created at " << _minValidCreationTimeMicroSec
- << " microSec, clearing pool for " << _hostName << endl;
+ << " microSec, clearing pool for " << _hostName
+ << " of " << _pool.size() << " connections" << endl;
clear();
}
}
@@ -148,8 +149,8 @@ namespace mongo {
}
bool PoolForHost::StoredConnection::ok( time_t now ) {
- // if connection has been idle for 30 minutes, kill it
- return ( now - when ) < 1800;
+ // Poke the connection to see if we're still ok
+ return conn->isStillConnected();
}
void PoolForHost::createdOne( DBClientBase * base) {
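
The connpool.cpp hunk above swaps the 30-minute idle cutoff in PoolForHost::StoredConnection::ok
for an active probe of the connection itself. Below is a minimal standalone sketch of that shift
from an age heuristic to a liveness check; FakeConn and StoredConn are illustrative stand-ins,
not the pool classes from this commit.

    #include <ctime>

    // Stand-in for DBClientBase; only the liveness probe matters for this sketch.
    struct FakeConn {
        bool connected = true;
        bool isStillConnected() const { return connected; }
    };

    struct StoredConn {
        FakeConn* conn;
        time_t    when;    // when the connection was returned to the pool

        // Old policy: reusable while it has been idle for less than 30 minutes.
        bool okByAge( time_t now ) const { return ( now - when ) < 1800; }

        // New policy: poke the connection and ask whether its socket still looks alive.
        bool okByProbe() const { return conn->isStillConnected(); }
    };

The probe keeps sockets that died in the background from being handed back out of the pool, at
the cost of an occasional non-blocking poll per checkout.
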
diff --git a/src/mongo/client/dbclient_rs.cpp b/src/mongo/client/dbclient_rs.cpp
index 2008ed59c3e..ddc20496df4 100644
--- a/src/mongo/client/dbclient_rs.cpp
+++ b/src/mongo/client/dbclient_rs.cpp
@@ -260,19 +260,30 @@ namespace mongo {
/**
* @return the connection associated with the monitor node. Will also attempt
- * to establish connection if NULL. Can still return NULL if reconnect failed.
+ * to establish connection if NULL or broken in background.
+ * Can still return NULL if reconnect failed.
*/
- shared_ptr<DBClientConnection> _getConnWithRefresh(ReplicaSetMonitor::Node& node) {
- if (node.conn.get() == NULL) {
- ConnectionString connStr(node.addr);
+ shared_ptr<DBClientConnection> _getConnWithRefresh( ReplicaSetMonitor::Node& node ) {
+ if ( node.conn.get() == NULL || !node.conn->isStillConnected() ) {
+
+ // Note: This constructor only works with MASTER connections
+ ConnectionString connStr( node.addr );
string errmsg;
try {
- node.conn.reset(dynamic_cast<DBClientConnection*>(
- connStr.connect(errmsg, ReplicaSetMonitor::SOCKET_TIMEOUT_SECS)));
+ DBClientBase* conn = connStr.connect( errmsg,
+ ReplicaSetMonitor::SOCKET_TIMEOUT_SECS );
+ if ( conn == NULL ) {
+ node.ok = false;
+ node.conn.reset();
+ }
+ else {
+ node.conn.reset( dynamic_cast<DBClientConnection*>( conn ) );
+ }
}
- catch (const AssertionException&) {
+ catch ( const AssertionException& ) {
node.ok = false;
+ node.conn.reset();
}
}
@@ -1318,6 +1329,30 @@ namespace mongo {
return rsm->getServerAddress();
}
+ // A replica set connection is never disconnected, since it controls its own reconnection
+ // logic.
+ //
+ // Has the side effect of proactively clearing any cached connections which have been
+ // disconnected in the background.
+ bool DBClientReplicaSet::isStillConnected() {
+
+ if ( _master && !_master->isStillConnected() ) {
+ _master.reset();
+ _masterHost = HostAndPort();
+ // Don't notify monitor of bg failure, since it's not clear how long ago it happened
+ }
+
+ if ( _lastSlaveOkConn && !_lastSlaveOkConn->isStillConnected() ) {
+ _lastSlaveOkConn.reset();
+ _lastSlaveOkHost = HostAndPort();
+ // Reset read pref too, since we're re-selecting the slaveOk host anyway
+ _lastReadPref.reset();
+ // Don't notify monitor of bg failure, since it's not clear how long ago it happened
+ }
+
+ return true;
+ }
+
DBClientConnection * DBClientReplicaSet::checkMaster() {
ReplicaSetMonitorPtr monitor = _getMonitor();
HostAndPort h = monitor->getMaster();
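
DBClientReplicaSet::isStillConnected above never reports the replica-set connection as dead;
instead it sheds any cached master or slaveOk connection that broke in the background, so the
next operation re-selects a host and reconnects. A rough sketch of that "always usable, repair
lazily" shape, with invented Conn/RsClientHandle names rather than the real classes:

    #include <memory>

    // Stand-in for DBClientConnection; only the liveness probe matters for this sketch.
    struct Conn {
        bool connected = true;
        bool isStillConnected() const { return connected; }
    };

    struct RsClientHandle {
        std::shared_ptr<Conn> master;
        std::shared_ptr<Conn> lastSlaveOk;

        // The handle itself is never "dead": it only drops broken cached connections so
        // that later calls re-resolve the primary/slaveOk host and reconnect on demand.
        bool isStillConnected() {
            if ( master && !master->isStillConnected() ) master.reset();
            if ( lastSlaveOk && !lastSlaveOk->isStillConnected() ) lastSlaveOk.reset();
            return true;
        }
    };
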
diff --git a/src/mongo/client/dbclient_rs.h b/src/mongo/client/dbclient_rs.h
index 2b91eb945b3..3d3ce0093fa 100644
--- a/src/mongo/client/dbclient_rs.h
+++ b/src/mongo/client/dbclient_rs.h
@@ -489,6 +489,7 @@ namespace mongo {
// ----- status ------
virtual bool isFailed() const { return ! _master || _master->isFailed(); }
+ bool isStillConnected();
// ----- informational ----
diff --git a/src/mongo/client/dbclientinterface.h b/src/mongo/client/dbclientinterface.h
index 8bb715083ae..c3a93c2a9ad 100644
--- a/src/mongo/client/dbclientinterface.h
+++ b/src/mongo/client/dbclientinterface.h
@@ -1082,6 +1082,11 @@ namespace mongo {
virtual bool isFailed() const = 0;
+ /**
+ * if not checked recently, checks whether the underlying socket/sockets are still valid
+ */
+ virtual bool isStillConnected() = 0;
+
virtual void killCursor( long long cursorID ) = 0;
virtual bool callRead( Message& toSend , Message& response ) = 0;
@@ -1192,6 +1197,8 @@ namespace mongo {
*/
bool isFailed() const { return _failed; }
+ bool isStillConnected() { return p ? p->isStillConnected() : true; }
+
MessagingPort& port() { verify(p); return *p; }
string toStringLong() const {
diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp
index 19d1e124bab..ec0bcf627a6 100644
--- a/src/mongo/client/syncclusterconnection.cpp
+++ b/src/mongo/client/syncclusterconnection.cpp
@@ -491,6 +491,18 @@ namespace mongo {
verify(0);
}
+ // An SCC should be reused only if none of its existing connections have been broken in
+ // the background.
+ // Note: an SCC may have missing connections if a config server is temporarily offline,
+ // but reading from the others is still allowed.
+ bool SyncClusterConnection::isStillConnected() {
+ for ( size_t i = 0; i < _conns.size(); i++ ) {
+ if ( _conns[i] && !_conns[i]->isStillConnected() ) return false;
+
+ }
+ return true;
+ }
+
void SyncClusterConnection::setAllSoTimeouts( double socketTimeout ){
_socketTimeout = socketTimeout;
for ( size_t i=0; i<_conns.size(); i++ )
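
SyncClusterConnection::isStillConnected above tolerates entries that were never established (a
config server may be temporarily offline) but refuses reuse if any established connection has
since broken. The same skip-nulls, all-live check, sketched over a plain vector of illustrative
connection pointers:

    #include <vector>

    struct Conn {
        bool connected = true;
        bool isStillConnected() const { return connected; }
    };

    // Reusable only if every connection that exists is still healthy; missing (null)
    // entries are allowed, matching the "config server temporarily offline" case.
    bool allExistingStillConnected( const std::vector<Conn*>& conns ) {
        for ( size_t i = 0; i < conns.size(); i++ ) {
            if ( conns[i] && !conns[i]->isStillConnected() ) return false;
        }
        return true;
    }
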
diff --git a/src/mongo/client/syncclusterconnection.h b/src/mongo/client/syncclusterconnection.h
index 3511285117c..cd81be44c6e 100644
--- a/src/mongo/client/syncclusterconnection.h
+++ b/src/mongo/client/syncclusterconnection.h
@@ -92,6 +92,7 @@ namespace mongo {
virtual string getServerAddress() const { return _address; }
virtual bool isFailed() const { return false; }
+ virtual bool isStillConnected();
virtual string toString() { return _toString(); }
virtual BSONObj getLastErrorDetailed(const std::string& db,
diff --git a/src/mongo/db/instance.h b/src/mongo/db/instance.h
index 09c1969c880..6d84a589797 100644
--- a/src/mongo/db/instance.h
+++ b/src/mongo/db/instance.h
@@ -90,6 +90,11 @@ namespace mongo {
virtual bool isFailed() const {
return false;
}
+
+ virtual bool isStillConnected() {
+ return true;
+ }
+
virtual string toString() {
return "DBDirectClient";
}
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index 05b8be4af29..fe0702b7435 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -327,7 +327,7 @@ ReplSetTest.prototype.callIsMaster = function() {
return master || false;
}
-ReplSetTest.awaitRSClientHosts = function( conn, host, hostOk, rs ) {
+ReplSetTest.awaitRSClientHosts = function( conn, host, hostOk, rs, timeout ) {
var hostCount = host.length;
if( hostCount ){
for( var i = 0; i < hostCount; i++ ) {
@@ -336,6 +336,8 @@ ReplSetTest.awaitRSClientHosts = function( conn, host, hostOk, rs ) {
return;
}
+ timeout = timeout || 60 * 1000;
+
if( hostOk == undefined ) hostOk = { ok : true }
if( host.host ) host = host.host
if( rs && rs.getMaster ) rs = rs.name
@@ -374,8 +376,7 @@ ReplSetTest.awaitRSClientHosts = function( conn, host, hostOk, rs ) {
}
}
return false;
- }, "timed out waiting for replica set client to recognize hosts",
- 3 * 20 * 1000 /* ReplicaSetMonitorWatcher updates every 20s */ )
+ }, "timed out waiting for replica set client to recognize hosts", timeout )
}
diff --git a/src/mongo/util/net/message_port.h b/src/mongo/util/net/message_port.h
index 9988f15a58b..fcbde98338e 100644
--- a/src/mongo/util/net/message_port.h
+++ b/src/mongo/util/net/message_port.h
@@ -115,6 +115,10 @@ namespace mongo {
}
#endif
+ bool isStillConnected() {
+ return psock->isStillConnected();
+ }
+
uint64_t getSockCreationMicroSec() const {
return psock->getSockCreationMicroSec();
}
diff --git a/src/mongo/util/net/sock.cpp b/src/mongo/util/net/sock.cpp
index f9df16c81be..c40fa3dc605 100644
--- a/src/mongo/util/net/sock.cpp
+++ b/src/mongo/util/net/sock.cpp
@@ -20,9 +20,9 @@
#include "mongo/util/net/sock.h"
#if !defined(_WIN32)
+# include <sys/poll.h>
# include <sys/socket.h>
# include <sys/types.h>
-# include <sys/socket.h>
# include <sys/un.h>
# include <netinet/in.h>
# include <netinet/tcp.h>
@@ -392,7 +392,7 @@ namespace mongo {
// ------------ Socket -----------------
Socket::Socket(int fd , const SockAddr& remote) :
- _fd(fd), _remote(remote), _timeout(0) {
+ _fd(fd), _remote(remote), _timeout(0), _lastValidityCheckAtSecs(time(0)) {
_logLevel = 0;
_init();
}
@@ -401,6 +401,7 @@ namespace mongo {
_logLevel = ll;
_fd = -1;
_timeout = timeout;
+ _lastValidityCheckAtSecs = time(0);
_init();
}
@@ -426,11 +427,17 @@ namespace mongo {
void Socket::close() {
if ( _fd >= 0 ) {
+ // Stop any blocking reads/writes, and prevent new reads/writes
+#if defined(_WIN32)
+ shutdown( _fd, SD_BOTH );
+#else
+ shutdown( _fd, SHUT_RDWR );
+#endif
closesocket( _fd );
_fd = -1;
}
}
-
+
#ifdef MONGO_SSL
void Socket::secure(SSLManagerInterface* mgr) {
fassert(16503, mgr);
@@ -757,6 +764,148 @@ namespace mongo {
setSockTimeouts( _fd, secs );
}
+ // TODO: allow modification?
+ //
+ // <positive value> : secs to wait between stillConnected checks
+ // 0 : always check
+ // -1 : never check
+ const int Socket::errorPollIntervalSecs( 5 );
+
+#if defined(NTDDI_VERSION) && ( !defined(NTDDI_VISTA) || ( NTDDI_VERSION < NTDDI_VISTA ) )
+ // Windows XP
+
+ // pre-Vista windows doesn't have WSAPoll, so don't test connections
+ bool Socket::isStillConnected() {
+ return true;
+ }
+
+#else // Not Windows XP
+
+ // Patch to allow better tolerance of flaky network connections that get broken
+ // while we aren't looking.
+ // TODO: Remove when better async changes come.
+ //
+ // isStillConnected() polls the socket at most once every Socket::errorPollIntervalSecs to determine
+ // if any disconnection-type events have happened on the socket.
+ bool Socket::isStillConnected() {
+
+ if ( errorPollIntervalSecs < 0 ) return true;
+
+ time_t now = time( 0 );
+ time_t idleTimeSecs = now - _lastValidityCheckAtSecs;
+
+ // Only check once every 5 secs
+ if ( idleTimeSecs < errorPollIntervalSecs ) return true;
+ // Reset our timer, we're checking the connection
+ _lastValidityCheckAtSecs = now;
+
+ // It's been long enough, poll to see if our socket is still connected
+
+ pollfd pollInfo;
+ pollInfo.fd = _fd;
+ // We only care about reading the EOF message on clean close (and errors)
+ pollInfo.events = POLLIN;
+
+ // Poll( info[], size, timeout ) - timeout == 0 => nonblocking
+#if defined(_WIN32)
+ int nEvents = WSAPoll( &pollInfo, 1, 0 );
+#else
+ int nEvents = ::poll( &pollInfo, 1, 0 );
+#endif
+
+ LOG( 2 ) << "polling for status of connection to " << remoteString()
+ << ", " << ( nEvents == 0 ? "no events" :
+ nEvents == -1 ? "error detected" :
+ "event detected" ) << endl;
+
+ if ( nEvents == 0 ) {
+ // No events incoming, return still connected as far as we know
+ return true;
+ }
+ else if ( nEvents < 0 ) {
+ // Poll itself failed, this is weird, warn and log errno
+ warning() << "Socket poll() failed during connectivity check"
+ << " (idle " << idleTimeSecs << " secs,"
+ << " remote host " << remoteString() << ")"
+ << causedBy(errnoWithDescription()) << endl;
+
+ // Return true since it's not clear that we're disconnected.
+ return true;
+ }
+
+ dassert( nEvents == 1 );
+ dassert( pollInfo.revents > 0 );
+
+ // Return false at this point, some event happened on the socket, but log what the
+ // actual event was.
+
+ if ( pollInfo.revents & POLLIN ) {
+
+ // There shouldn't really be any data to recv here, so make sure this
+ // is a clean hangup.
+
+ // Used concurrently, but we never actually read this data
+ static char testBuf[1];
+
+ int recvd = ::recv( _fd, testBuf, 1, portRecvFlags );
+
+ if ( recvd < 0 ) {
+ // An error occurred during recv, warn and log errno
+ warning() << "Socket recv() failed during connectivity check"
+ << " (idle " << idleTimeSecs << " secs,"
+ << " remote host " << remoteString() << ")"
+ << causedBy(errnoWithDescription()) << endl;
+ }
+ else if ( recvd > 0 ) {
+ // We got nonzero data from this socket, very weird?
+ // Log and warn at runtime, log and abort at devtime
+ // TODO: Dump the data to the log somehow?
+ error() << "Socket found pending data during connectivity check"
+ << " (idle " << idleTimeSecs << " secs,"
+ << " remote host " << remoteString() << ")" << endl;
+ dassert( false );
+ }
+ else {
+ // recvd == 0, socket closed remotely, just return false
+ LOG( 0 ) << "Socket closed remotely, no longer connected"
+ << " (idle " << idleTimeSecs << " secs,"
+ << " remote host " << remoteString() << ")" << endl;
+ }
+ }
+ else if ( pollInfo.revents & POLLHUP ) {
+ // A hangup has occurred on this socket
+ LOG( _logLevel ) << "Socket hangup detected, no longer connected"
+ << " (idle " << idleTimeSecs << " secs,"
+ << " remote host " << remoteString() << ")" << endl;
+ }
+ else if ( pollInfo.revents & POLLERR ) {
+ // An error has occurred on this socket
+ LOG( _logLevel ) << "Socket error detected, no longer connected"
+ << " (idle " << idleTimeSecs << " secs,"
+ << " remote host " << remoteString() << ")" << endl;
+ }
+ else if ( pollInfo.revents & POLLNVAL ) {
+ // Socket descriptor itself is weird
+ // Log and warn at runtime, log and abort at devtime
+ error() << "Socket descriptor detected as invalid"
+ << " (idle " << idleTimeSecs << " secs,"
+ << " remote host " << remoteString() << ")" << endl;
+ dassert( false );
+ }
+ else {
+ // Don't know what poll is saying here
+ // Log and warn at runtime, log and abort at devtime
+ error() << "Socket had unknown event (" << static_cast<int>(pollInfo.revents) << ")"
+ << " (idle " << idleTimeSecs << " secs,"
+ << " remote host " << remoteString() << ")" << endl;
+ dassert( false );
+ }
+
+ return false;
+ }
+
+#endif // End Not Windows XP
+
#if defined(_WIN32)
struct WinsockInit {
WinsockInit() {
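
The heart of the sock.cpp change is a non-blocking poll() for POLLIN on an idle socket: no events
means "still connected as far as we know", a readable socket that returns zero bytes means an
orderly remote close, and POLLHUP/POLLERR/POLLNVAL mean the connection is gone. A POSIX-only,
self-contained sketch of that check (illustrative function name; the commit's version also
handles Windows via WSAPoll and reads into a throwaway buffer instead of using MSG_PEEK):

    #include <poll.h>
    #include <sys/socket.h>

    // Returns false only when poll() reports a definite disconnect-type event on fd.
    bool peerLooksConnected( int fd ) {
        pollfd info;
        info.fd = fd;
        info.events = POLLIN;    // readable => either pending data or EOF/close

        int nEvents = ::poll( &info, 1, 0 );    // timeout 0 => non-blocking
        if ( nEvents == 0 ) return true;        // nothing pending, assume still connected
        if ( nEvents < 0 ) return true;         // poll itself failed; don't declare it dead

        if ( info.revents & ( POLLHUP | POLLERR | POLLNVAL ) ) return false;

        if ( info.revents & POLLIN ) {
            char byte;
            ssize_t n = ::recv( fd, &byte, 1, MSG_PEEK );    // won't block: POLLIN was set
            return n > 0;    // 0 => orderly remote close, < 0 => socket error
        }
        return true;
    }
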
diff --git a/src/mongo/util/net/sock.h b/src/mongo/util/net/sock.h
index f867478da68..c1c610a8d01 100644
--- a/src/mongo/util/net/sock.h
+++ b/src/mongo/util/net/sock.h
@@ -177,6 +177,9 @@ namespace mongo {
*/
class Socket : boost::noncopyable {
public:
+
+ static const int errorPollIntervalSecs;
+
Socket(int sock, const SockAddr& farEnd);
/** In some cases the timeout will actually be 2x this value - eg we do a partial send,
@@ -211,6 +214,7 @@ namespace mongo {
long long getBytesOut() const { return _bytesOut; }
void setTimeout( double secs );
+ bool isStillConnected();
#ifdef MONGO_SSL
/** secures inline */
@@ -256,6 +260,7 @@ namespace mongo {
long long _bytesIn;
long long _bytesOut;
+ time_t _lastValidityCheckAtSecs;
#ifdef MONGO_SSL
SSL* _ssl;
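
The sock.h additions pair the new isStillConnected() with the static errorPollIntervalSecs and a
per-socket _lastValidityCheckAtSecs timestamp, so the poll sketched above runs at most once per
interval rather than on every reuse of the connection. A compact sketch of that throttling wrapper
around any liveness probe (ThrottledCheck and its members are illustrative names, not part of the
commit):

    #include <ctime>
    #include <functional>

    class ThrottledCheck {
    public:
        // intervalSecs < 0 disables checking, 0 checks every time, and a positive value
        // rate-limits the probe (the commit uses 5 seconds between real checks).
        ThrottledCheck( int intervalSecs, std::function<bool()> probe )
            : _intervalSecs( intervalSecs ), _probe( probe ), _lastCheckAtSecs( time( 0 ) ) {}

        bool stillConnected() {
            if ( _intervalSecs < 0 ) return true;
            time_t now = time( 0 );
            if ( now - _lastCheckAtSecs < _intervalSecs ) return true;    // checked recently
            _lastCheckAtSecs = now;    // reset the timer before running the real probe
            return _probe();
        }

    private:
        int _intervalSecs;
        std::function<bool()> _probe;
        time_t _lastCheckAtSecs;
    };
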