summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDwight <dmerriman@gmail.com>2008-09-29 18:32:39 -0400
committerDwight <dmerriman@gmail.com>2008-09-29 18:32:39 -0400
commit7123dd1454b6d0a93ae334ecdee7e8e5a250b69f (patch)
tree97bcd4f1cd3f1ccd704d614daa1e0f6db7a0c42b
parentefe9d8be2059edbb8b45c91f1560cd9545ac0814 (diff)
downloadmongo-7123dd1454b6d0a93ae334ecdee7e8e5a250b69f.tar.gz
connpool: handle socket failures properly
-rw-r--r--db/dbclient.cpp17
-rw-r--r--db/dbclient.h16
-rw-r--r--dbgrid/connpool.cpp4
-rw-r--r--dbgrid/connpool.h22
-rw-r--r--dbgrid/dbgrid.cpp23
5 files changed, 69 insertions, 13 deletions
diff --git a/db/dbclient.cpp b/db/dbclient.cpp
index ee0856a1a59..b0f3f0358e4 100644
--- a/db/dbclient.cpp
+++ b/db/dbclient.cpp
@@ -59,7 +59,8 @@ bool DBClientConnection::connect(const char *serverAddress, string& errmsg) {
//cout << "port:" << port << endl;
server = auto_ptr<SockAddr>(new SockAddr(ip.c_str(), port));
if( !p.connect(*server) ) {
- errmsg = string("couldn't connect to server ") + serverAddress + ' ' + ip;
+ errmsg = string("couldn't connect to server ") + serverAddress + ' ' + ip;
+ failed = true;
return false;
}
return true;
@@ -80,14 +81,14 @@ auto_ptr<DBClientCursor> DBClientConnection::query(const char *ns, JSObj query,
Message toSend;
toSend.setData(dbQuery, b.buf(), b.len());
auto_ptr<Message> response(new Message());
- bool ok = p.call(toSend, *response);
- if( !ok )
+ if( !p.call(toSend, *response) ) {
+ failed = true;
return auto_ptr<DBClientCursor>(0);
+ }
- auto_ptr<DBClientCursor> c(new DBClientCursor(p, response, opts));
+ auto_ptr<DBClientCursor> c(new DBClientCursor(this, p, response, opts));
c->ns = ns;
c->nToReturn = nToReturn;
-
return c;
}
@@ -105,8 +106,10 @@ void DBClientCursor::requestMore() {
Message toSend;
toSend.setData(dbGetMore, b.buf(), b.len());
auto_ptr<Message> response(new Message());
- bool ok = p.call(toSend, *response);
- massert("dbclient error communicating with server", ok);
+ if( !p.call(toSend, *response) ) {
+ conn->failed = true;
+ massert("dbclient error communicating with server", false);
+ }
m = response;
dataReceived();
diff --git a/db/dbclient.h b/db/dbclient.h
index 3af4f544452..4a14aa266d1 100644
--- a/db/dbclient.h
+++ b/db/dbclient.h
@@ -54,6 +54,7 @@ struct QueryResult : public MsgData {
class DBClientCursor : boost::noncopyable {
friend class DBClientConnection;
+ DBClientConnection *conn;
MessagingPort& p;
long long cursorId;
int nReturned;
@@ -65,13 +66,15 @@ class DBClientCursor : boost::noncopyable {
int nToReturn;
void dataReceived();
void requestMore();
-public:
- DBClientCursor(MessagingPort& _p, auto_ptr<Message> _m, int _opts) :
- p(_p), m(_m), opts(_opts) {
+
+ DBClientCursor(DBClientConnection *_conn, MessagingPort& _p, auto_ptr<Message> _m, int _opts) :
+ conn(_conn), p(_p), m(_m), opts(_opts) {
cursorId = 0;
dataReceived();
}
-
+
+public:
+
bool more(); // if true, safe to call next()
/* returns next object in the result cursor.
@@ -97,10 +100,13 @@ public:
};
class DBClientConnection : boost::noncopyable {
+ friend class DBClientCursor;
MessagingPort p;
auto_ptr<SockAddr> server;
+ bool failed; // true if some sort of fatal error has ever happened
public:
- DBClientConnection() { }
+ bool isFailed() const { return failed; }
+ DBClientConnection() : failed(false) { }
bool connect(const char *serverHostname, string& errmsg);
/* send a query to the database.
diff --git a/dbgrid/connpool.cpp b/dbgrid/connpool.cpp
index 6d78514f6a3..eed74bfe52e 100644
--- a/dbgrid/connpool.cpp
+++ b/dbgrid/connpool.cpp
@@ -21,7 +21,11 @@
#include "stdafx.h"
#include "connpool.h"
+DBConnectionPool pool;
+
DBClientConnection* DBConnectionPool::get(const string& host) {
+ boostlock L(poolMutex);
+
PoolForHost *&p = pools[host];
if( p == 0 )
p = new PoolForHost();
diff --git a/dbgrid/connpool.h b/dbgrid/connpool.h
index 1318140a345..d28a5fffee8 100644
--- a/dbgrid/connpool.h
+++ b/dbgrid/connpool.h
@@ -26,10 +26,12 @@ struct PoolForHost {
};
class DBConnectionPool {
+ boost::mutex poolMutex;
map<string,PoolForHost*> pools;
public:
DBClientConnection *get(const string& host);
void release(const string& host, DBClientConnection *c) {
+ boostlock L(poolMutex);
pools[host]->pool.push(c);
}
};
@@ -46,5 +48,23 @@ public:
ScopedDbConnection(const string& _host) :
host(_host), _conn( pool.get(_host) ) { }
- ~ScopedDbConnection() { pool.release(host, _conn); }
+ void done() {
+ if( _conn->isFailed() )
+ delete _conn;
+ else
+ pool.release(host, _conn);
+ _conn = 0;
+ }
+
+ ~ScopedDbConnection() {
+ if( _conn ) {
+ /* you are supposed to call done(). if you did that, correctly, we
+ only get here if an exception was thrown. in such a scenario, we can't
+ be sure we fully read all expected data of a reply on the socket. so
+ we don't try to reuse the connection.
+ */
+ cout << "~ScopedDBConnection: _conn != null\n";
+ delete _conn;
+ }
+ }
};
diff --git a/dbgrid/dbgrid.cpp b/dbgrid/dbgrid.cpp
index e84a054625f..1369a934dc9 100644
--- a/dbgrid/dbgrid.cpp
+++ b/dbgrid/dbgrid.cpp
@@ -20,6 +20,7 @@
#include "../grid/message.h"
#include "../util/unittest.h"
#include "database.h"
+#include "connpool.h"
const char *curNs = "";
Client *client = 0;
@@ -96,6 +97,28 @@ public:
void start() {
Database::load();
+
+/*
+ try {
+cout << "TEMP" << endl;
+{
+ ScopedDbConnection c("localhost");
+ cout << c.conn().findOne("dwight.bar", emptyObj).toString() << endl;
+ c.done();
+ cout << "OK1" << endl;
+}
+{
+ ScopedDbConnection c("localhost");
+ c.conn().findOne("dwight.bar", emptyObj);
+ c.done();
+ cout << "OK1" << endl;
+}
+cout << "OK2" << endl;
+ } catch(...) {
+cout << "exception" << endl;
+ }
+*/
+
log() << "waiting for connections on port " << port << "..." << endl;
DbGridListener l(port);
l.listen();