diff options
author | Dwight <dmerriman@gmail.com> | 2008-09-29 18:32:39 -0400 |
---|---|---|
committer | Dwight <dmerriman@gmail.com> | 2008-09-29 18:32:39 -0400 |
commit | 7123dd1454b6d0a93ae334ecdee7e8e5a250b69f (patch) | |
tree | 97bcd4f1cd3f1ccd704d614daa1e0f6db7a0c42b | |
parent | efe9d8be2059edbb8b45c91f1560cd9545ac0814 (diff) | |
download | mongo-7123dd1454b6d0a93ae334ecdee7e8e5a250b69f.tar.gz |
connpool: handle socket failures properly
-rw-r--r-- | db/dbclient.cpp | 17 | ||||
-rw-r--r-- | db/dbclient.h | 16 | ||||
-rw-r--r-- | dbgrid/connpool.cpp | 4 | ||||
-rw-r--r-- | dbgrid/connpool.h | 22 | ||||
-rw-r--r-- | dbgrid/dbgrid.cpp | 23 |
5 files changed, 69 insertions, 13 deletions
diff --git a/db/dbclient.cpp b/db/dbclient.cpp index ee0856a1a59..b0f3f0358e4 100644 --- a/db/dbclient.cpp +++ b/db/dbclient.cpp @@ -59,7 +59,8 @@ bool DBClientConnection::connect(const char *serverAddress, string& errmsg) { //cout << "port:" << port << endl; server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
if( !p.connect(*server) ) { - errmsg = string("couldn't connect to server ") + serverAddress + ' ' + ip; + errmsg = string("couldn't connect to server ") + serverAddress + ' ' + ip; + failed = true; return false; } return true;
@@ -80,14 +81,14 @@ auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, JSObj query, Message toSend;
toSend.setData(dbQuery, b.buf(), b.len());
auto_ptr<Message> response(new Message());
- bool ok = p.call(toSend, *response);
- if( !ok )
+ if( !p.call(toSend, *response) ) {
+ failed = true;
return auto_ptr<DBClientCursor>(0);
+ }
- auto_ptr<DBClientCursor> c(new DBClientCursor(p, response, opts));
+ auto_ptr<DBClientCursor> c(new DBClientCursor(this, p, response, opts));
c->ns = ns;
c->nToReturn = nToReturn;
-
return c;
}
@@ -105,8 +106,10 @@ void DBClientCursor::requestMore() { Message toSend;
toSend.setData(dbGetMore, b.buf(), b.len());
auto_ptr<Message> response(new Message());
- bool ok = p.call(toSend, *response);
- massert("dbclient error communicating with server", ok);
+ if( !p.call(toSend, *response) ) {
+ conn->failed = true;
+ massert("dbclient error communicating with server", false);
+ }
m = response;
dataReceived();
diff --git a/db/dbclient.h b/db/dbclient.h index 3af4f544452..4a14aa266d1 100644 --- a/db/dbclient.h +++ b/db/dbclient.h @@ -54,6 +54,7 @@ struct QueryResult : public MsgData { class DBClientCursor : boost::noncopyable { friend class DBClientConnection; + DBClientConnection *conn; MessagingPort& p; long long cursorId; int nReturned; @@ -65,13 +66,15 @@ class DBClientCursor : boost::noncopyable { int nToReturn; void dataReceived(); void requestMore(); -public: - DBClientCursor(MessagingPort& _p, auto_ptr<Message> _m, int _opts) : - p(_p), m(_m), opts(_opts) { + + DBClientCursor(DBClientConnection *_conn, MessagingPort& _p, auto_ptr<Message> _m, int _opts) : + conn(_conn), p(_p), m(_m), opts(_opts) { cursorId = 0; dataReceived(); } - + +public: + bool more(); // if true, safe to call next() /* returns next object in the result cursor. @@ -97,10 +100,13 @@ public: }; class DBClientConnection : boost::noncopyable { + friend class DBClientCursor; MessagingPort p; auto_ptr<SockAddr> server; + bool failed; // true if some sort of fatal error has ever happened public: - DBClientConnection() { } + bool isFailed() const { return failed; } + DBClientConnection() : failed(false) { } bool connect(const char *serverHostname, string& errmsg); /* send a query to the database. diff --git a/dbgrid/connpool.cpp b/dbgrid/connpool.cpp index 6d78514f6a3..eed74bfe52e 100644 --- a/dbgrid/connpool.cpp +++ b/dbgrid/connpool.cpp @@ -21,7 +21,11 @@ #include "stdafx.h"
#include "connpool.h"
+DBConnectionPool pool;
+
DBClientConnection* DBConnectionPool::get(const string& host) {
+ boostlock L(poolMutex);
+
PoolForHost *&p = pools[host];
if( p == 0 )
p = new PoolForHost();
diff --git a/dbgrid/connpool.h b/dbgrid/connpool.h index 1318140a345..d28a5fffee8 100644 --- a/dbgrid/connpool.h +++ b/dbgrid/connpool.h @@ -26,10 +26,12 @@ struct PoolForHost { }; class DBConnectionPool { + boost::mutex poolMutex; map<string,PoolForHost*> pools; public: DBClientConnection *get(const string& host); void release(const string& host, DBClientConnection *c) { + boostlock L(poolMutex); pools[host]->pool.push(c); } }; @@ -46,5 +48,23 @@ public: ScopedDbConnection(const string& _host) : host(_host), _conn( pool.get(_host) ) { } - ~ScopedDbConnection() { pool.release(host, _conn); } + void done() { + if( _conn->isFailed() ) + delete _conn; + else + pool.release(host, _conn); + _conn = 0; + } + + ~ScopedDbConnection() { + if( _conn ) { + /* you are supposed to call done(). if you did that, correctly, we + only get here if an exception was thrown. in such a scenario, we can't + be sure we fully read all expected data of a reply on the socket. so + we don't try to reuse the connection. + */ + cout << "~ScopedDBConnection: _conn != null\n"; + delete _conn; + } + } }; diff --git a/dbgrid/dbgrid.cpp b/dbgrid/dbgrid.cpp index e84a054625f..1369a934dc9 100644 --- a/dbgrid/dbgrid.cpp +++ b/dbgrid/dbgrid.cpp @@ -20,6 +20,7 @@ #include "../grid/message.h" #include "../util/unittest.h" #include "database.h" +#include "connpool.h" const char *curNs = ""; Client *client = 0; @@ -96,6 +97,28 @@ public: void start() { Database::load(); + +/* + try { +cout << "TEMP" << endl; +{ + ScopedDbConnection c("localhost"); + cout << c.conn().findOne("dwight.bar", emptyObj).toString() << endl; + c.done(); + cout << "OK1" << endl; +} +{ + ScopedDbConnection c("localhost"); + c.conn().findOne("dwight.bar", emptyObj); + c.done(); + cout << "OK1" << endl; +} +cout << "OK2" << endl; + } catch(...) { +cout << "exception" << endl; + } +*/ + log() << "waiting for connections on port " << port << "..." << endl; DbGridListener l(port); l.listen(); |