summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2010-03-22 11:47:37 -0400
committerEliot Horowitz <eliot@10gen.com>2010-03-22 11:47:37 -0400
commit1e6780f8444e2477ad6317a4edc62b43b6ad35cc (patch)
tree42907a4cfb483695ed7d8142be24eeb9bb95efb5
parentced7756639f0ed3171cea587534ef2ead75004a1 (diff)
downloadmongo-1e6780f8444e2477ad6317a4edc62b43b6ad35cc.tar.gz
can use sharding with SyncClusterConnection for 2 phase commit across 3 servers SHARDING-39
-rw-r--r--client/connpool.cpp24
-rw-r--r--client/connpool.h9
-rw-r--r--client/dbclient.h1
-rw-r--r--client/syncclusterconnection.cpp55
-rw-r--r--client/syncclusterconnection.h9
-rw-r--r--db/dbcommands.cpp3
-rw-r--r--jstests/sharding/sync2.js48
-rw-r--r--s/config.cpp11
-rw-r--r--s/strategy.cpp8
-rw-r--r--s/util.h1
-rw-r--r--scripting/sm_db.cpp1
-rw-r--r--shell/servers.js23
-rw-r--r--util/message.h6
13 files changed, 158 insertions, 41 deletions
diff --git a/client/connpool.cpp b/client/connpool.cpp
index eaaf4193d18..d69c78741f3 100644
--- a/client/connpool.cpp
+++ b/client/connpool.cpp
@@ -21,6 +21,7 @@
#include "stdafx.h"
#include "connpool.h"
#include "../db/commands.h"
+#include "syncclusterconnection.h"
namespace mongo {
@@ -33,11 +34,13 @@ namespace mongo {
if ( p == 0 )
p = new PoolForHost();
if ( p->pool.empty() ) {
- string errmsg;
+ int numCommas = DBClientBase::countCommas( host );
DBClientBase *c;
- if( host.find(',') == string::npos ) {
+
+ if( numCommas == 0 ) {
DBClientConnection *cc = new DBClientConnection(true);
log(2) << "creating new connection for pool to:" << host << endl;
+ string errmsg;
if ( !cc->connect(host.c_str(), errmsg) ) {
delete cc;
uassert( 11002 , (string)"dbconnectionpool: connect failed " + host , false);
@@ -46,7 +49,7 @@ namespace mongo {
c = cc;
onCreate( c );
}
- else {
+ else if ( numCommas == 1 ) {
DBClientPaired *p = new DBClientPaired();
if( !p->connect(host) ) {
delete p;
@@ -55,6 +58,12 @@ namespace mongo {
}
c = p;
}
+ else if ( numCommas == 2 ) {
+ c = new SyncClusterConnection( host );
+ }
+ else {
+ uassert( 13071 , (string)"invalid hostname [" + host + "]" , 0 );
+ }
return c;
}
DBClientBase *c = p->pool.top();
@@ -105,6 +114,15 @@ namespace mongo {
}
}
+ ScopedDbConnection::~ScopedDbConnection() {
+ if ( _conn && ! _conn->isFailed() ) {
+ /* see done() comments above for why we log this line */
+ log() << "~ScopedDBConnection: _conn != null" << endl;
+ kill();
+ }
+ }
+
+
class PoolFlushCmd : public Command {
public:
PoolFlushCmd() : Command( "connpoolsync" ){}
diff --git a/client/connpool.h b/client/connpool.h
index 408799ab69c..5a47b01b798 100644
--- a/client/connpool.h
+++ b/client/connpool.h
@@ -122,14 +122,9 @@ namespace mongo {
pool.release(host, _conn);
_conn = 0;
}
+
+ ~ScopedDbConnection();
- ~ScopedDbConnection() {
- if ( _conn && ! _conn->isFailed() ) {
- /* see done() comments above for why we log this line */
- log() << "~ScopedDBConnection: _conn != null" << endl;
- kill();
- }
- }
};
} // namespace mongo
diff --git a/client/dbclient.h b/client/dbclient.h
index 3141d4588b6..ebd3b7379de 100644
--- a/client/dbclient.h
+++ b/client/dbclient.h
@@ -824,7 +824,6 @@ namespace mongo {
return serverAddress;
}
- protected:
virtual bool call( Message &toSend, Message &response, bool assertOk = true );
virtual void say( Message &toSend );
virtual void sayPiggyBack( Message &toSend );
diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp
index 3a952b2ca75..0a8fc797a7c 100644
--- a/client/syncclusterconnection.cpp
+++ b/client/syncclusterconnection.cpp
@@ -18,12 +18,14 @@
#include "stdafx.h"
#include "syncclusterconnection.h"
+#include "../db/dbmessage.h"
// error codes 8000-8009
namespace mongo {
SyncClusterConnection::SyncClusterConnection( string commaSeperated ){
+ _address = commaSeperated;
string::size_type idx;
while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){
string h = commaSeperated.substr( 0 , idx );
@@ -35,12 +37,17 @@ namespace mongo {
}
SyncClusterConnection::SyncClusterConnection( string a , string b , string c ){
+ _address = a + "," + b + "," + c;
// connect to all even if not working
_connect( a );
_connect( b );
_connect( c );
}
+ SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ){
+ assert(0);
+ }
+
SyncClusterConnection::~SyncClusterConnection(){
for ( size_t i=0; i<_conns.size(); i++ )
delete _conns[i];
@@ -111,7 +118,7 @@ namespace mongo {
}
void SyncClusterConnection::_connect( string host ){
- log() << "SyncClusterConnection connecting to: " << host << endl;
+ log() << "SyncClusterConnection connecting to [" << host << "]" << endl;
DBClientConnection * c = new DBClientConnection( true );
string errmsg;
if ( ! c->connect( host , errmsg ) )
@@ -138,7 +145,7 @@ namespace mongo {
lockType = i->second;
}
- uassert( 13054 , "write $cmd not supported in SyncClusterConnection" , lockType <= 0 );
+ uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection: " + cmdName , lockType <= 0 );
}
return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize );
@@ -193,25 +200,47 @@ namespace mongo {
uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0);
}
- void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){ assert(0); }
+ void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){
+ assert(0);
+ }
- void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ assert(0); }
+ void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){
+ string errmsg;
+ if ( ! prepare( errmsg ) )
+ throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg );
- string SyncClusterConnection::_toString() const {
- stringstream ss;
- ss << "SyncClusterConnection [";
for ( size_t i=0; i<_conns.size(); i++ ){
- if ( i > 0 )
- ss << ",";
- ss << _conns[i]->toString();
+ _conns[i]->update( ns , query , obj , upsert , multi );
}
- ss << "]";
+
+ _checkLast();
+ }
+
+ string SyncClusterConnection::_toString() const {
+ stringstream ss;
+ ss << "SyncClusterConnection [" << _address << "]";
return ss.str();
}
bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk ){
- assert(0);
- return false;
+ uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" ,
+ toSend.operation() == dbQuery );
+
+ DbMessage d( toSend );
+ uassert( 8007 , "SyncClusterConnection::call can't handle $cmd" , strstr( d.getns(), "$cmd" ) == 0 );
+
+ for ( size_t i=0; i<_conns.size(); i++ ){
+ try {
+ bool ok = _conns[i]->call( toSend , response , assertOk );
+ if ( ok )
+ return ok;
+ log() << "call failed to: " << _conns[i]->toString() << " no data" << endl;
+ }
+ catch ( ... ){
+ log() << "call failed to: " << _conns[i]->toString() << " exception" << endl;
+ }
+ }
+ throw UserException( 8008 , "all servers down!" );
}
void SyncClusterConnection::say( Message &toSend ){
diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h
index 1a06249b3aa..e3411e1ce4b 100644
--- a/client/syncclusterconnection.h
+++ b/client/syncclusterconnection.h
@@ -68,9 +68,7 @@ namespace mongo {
virtual void say( Message &toSend );
virtual void sayPiggyBack( Message &toSend );
- virtual string getServerAddress() const {
- return _toString();
- }
+ virtual string getServerAddress() const { return _address; }
virtual bool isFailed() const {
return false;
@@ -78,6 +76,8 @@ namespace mongo {
private:
+ SyncClusterConnection( SyncClusterConnection& prev );
+
string _toString() const;
bool _commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0);
@@ -90,8 +90,9 @@ namespace mongo {
void _checkLast();
void _connect( string host );
+
+ string _address;
vector<DBClientConnection*> _conns;
-
map<string,int> _lockTypes;
};
diff --git a/db/dbcommands.cpp b/db/dbcommands.cpp
index 264ceff745c..a8d826dc682 100644
--- a/db/dbcommands.cpp
+++ b/db/dbcommands.cpp
@@ -437,9 +437,10 @@ namespace mongo {
virtual bool slaveOk() {
return true;
}
- virtual LockType locktype(){ return WRITE; }
+ virtual LockType locktype(){ return NONE; }
CmdGetOpTime() : Command("getoptime") { }
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
+ writelock l( "" );
result.appendDate("optime", OpTime::now().asDate());
return true;
}
diff --git a/jstests/sharding/sync2.js b/jstests/sharding/sync2.js
new file mode 100644
index 00000000000..b0bbcb6cf12
--- /dev/null
+++ b/jstests/sharding/sync2.js
@@ -0,0 +1,48 @@
+// sync2.js
+
+s = new ShardingTest( "sync2" , 3 , 50 , 2 , { sync : true } );
+
+s2 = s._mongos[1];
+
+s.adminCommand( { enablesharding : "test" } );
+s.adminCommand( { shardcollection : "test.foo" , key : { num : 1 } } );
+
+s.getDB( "test" ).foo.save( { num : 1 } );
+s.getDB( "test" ).foo.save( { num : 2 } );
+s.getDB( "test" ).foo.save( { num : 3 } );
+s.getDB( "test" ).foo.save( { num : 4 } );
+s.getDB( "test" ).foo.save( { num : 5 } );
+s.getDB( "test" ).foo.save( { num : 6 } );
+s.getDB( "test" ).foo.save( { num : 7 } );
+
+assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal A" );
+assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other A" );
+
+s.adminCommand( { split : "test.foo" , middle : { num : 4 } } );
+s.adminCommand( { movechunk : "test.foo" , find : { num : 3 } , to : s.getFirstOther( s.getServer( "test" ) ).name } );
+
+assert( s._connections[0].getDB( "test" ).foo.find().toArray().length > 0 , "blah 1" );
+assert( s._connections[1].getDB( "test" ).foo.find().toArray().length > 0 , "blah 2" );
+assert.eq( 7 , s._connections[0].getDB( "test" ).foo.find().toArray().length +
+ s._connections[1].getDB( "test" ).foo.find().toArray().length , "blah 3" );
+
+assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B" );
+assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B" );
+
+s.adminCommand( { split : "test.foo" , middle : { num : 2 } } );
+s.printChunks();
+
+print( "* A" );
+
+assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B 1" );
+assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B 2" );
+print( "* B" );
+assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B 3" );
+assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B 4" );
+
+for ( var i=0; i<10; i++ ){
+ print( "* C " + i );
+ assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B " + i );
+}
+
+s.stop();
diff --git a/s/config.cpp b/s/config.cpp
index 467b5d8e597..c3c3668c5ab 100644
--- a/s/config.cpp
+++ b/s/config.cpp
@@ -371,13 +371,18 @@ namespace mongo {
}
ourHostname = hn;
+ stringstream fullString;
+
set<string> hosts;
for ( size_t i=0; i<configHosts.size(); i++ ){
string host = configHosts[i];
hosts.insert( getHost( host , false ) );
configHosts[i] = getHost( host , true );
+ if ( i > 0 )
+ fullString << ",";
+ fullString << configHosts[i];
}
-
+
for ( set<string>::iterator i=hosts.begin(); i!=hosts.end(); i++ ){
string host = *i;
bool ok = false;
@@ -393,8 +398,8 @@ namespace mongo {
return false;
}
- uassert( 10188 , "can only hand 1 config db right now" , configHosts.size() == 1 );
- _primary = configHosts[0];
+ _primary = fullString.str();
+ log(1) << " config string : " << fullString.str() << endl;
return true;
}
diff --git a/s/strategy.cpp b/s/strategy.cpp
index 862956f1e2f..b7277e3190c 100644
--- a/s/strategy.cpp
+++ b/s/strategy.cpp
@@ -41,14 +41,12 @@ namespace mongo {
void Strategy::doQuery( Request& r , string server ){
try{
ScopedDbConnection dbcon( server );
- DBClientBase &_c = dbcon.conn();
+ DBClientBase &c = dbcon.conn();
- checkShardVersion( _c , r.getns() );
+ checkShardVersion( c , r.getns() );
- // TODO: This will not work with Paired connections. Fix.
- DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
Message response;
- bool ok = c.port().call( r.m(), response);
+ bool ok = c.call( r.m(), response);
{
QueryResult *qr = (QueryResult *) response.data;
diff --git a/s/util.h b/s/util.h
index ba40a29bebf..2c62642fcf4 100644
--- a/s/util.h
+++ b/s/util.h
@@ -35,6 +35,7 @@ namespace mongo {
stringstream s;
s << "StaleConfigException ns: " << ns << " " << msg;
_msg = s.str();
+ log(1) << _msg << endl;
}
virtual ~StaleConfigException() throw(){}
diff --git a/scripting/sm_db.cpp b/scripting/sm_db.cpp
index c74baf96ba1..eddf56dc5e7 100644
--- a/scripting/sm_db.cpp
+++ b/scripting/sm_db.cpp
@@ -226,6 +226,7 @@ namespace mongo {
auto_ptr<DBClientCursor> cursor = conn->query( ns , q , nToReturn , nToSkip , f.nFields() ? &f : 0 , slaveOk ? QueryOption_SlaveOk : 0 , batchSize );
if ( ! cursor.get() ){
+ log() << "query failed : " << ns << " " << q << " to: " << conn->toString() << endl;
JS_ReportError( cx , "error doing query: failed" );
return JS_FALSE;
}
diff --git a/shell/servers.js b/shell/servers.js
index f753b5f17f2..79d01a4aaaf 100644
--- a/shell/servers.js
+++ b/shell/servers.js
@@ -136,15 +136,24 @@ ShardingTest = function( testName , numServers , verboseLevel , numMongos , othe
if ( ! otherParams )
otherParams = {}
this._connections = [];
+
+ if ( otherParams.sync && numServers < 3 )
+ throw "if you want sync, you need at least 3 servers";
for ( var i=0; i<numServers; i++){
var conn = startMongodTest( 30000 + i , testName + i );
this._connections.push( conn );
}
- this._configDB = "localhost:30000";
- this._connections[0].getDB( "config" ).settings.insert( { _id : "chunksize" , value : otherParams.chunksize || 50 } );
-
+ if ( otherParams.sync ){
+ this._configDB = "localhost:30000,localhost:30001,localhost:30002";
+ this._configConnection = new Mongo( this._configDB );
+ this._configConnection.getDB( "config" ).settings.insert( { _id : "chunksize" , value : otherParams.chunksize || 50 } );
+ }
+ else {
+ this._configDB = "localhost:30000";
+ this._connections[0].getDB( "config" ).settings.insert( { _id : "chunksize" , value : otherParams.chunksize || 50 } );
+ }
this._mongos = [];
var startMongosPort = 31000;
@@ -195,6 +204,14 @@ ShardingTest.prototype.getOther = function( one ){
return this._connections[0];
}
+ShardingTest.prototype.getFirstOther = function( one ){
+ for ( var i=0; i<this._connections.length; i++ ){
+ if ( this._connections[i] != one )
+ return this._connections[i];
+ }
+ throw "impossible";
+}
+
ShardingTest.prototype.stop = function(){
for ( var i=0; i<this._mongos.length; i++ ){
stopMongoProgram( 31000 - i );
diff --git a/util/message.h b/util/message.h
index 5891563c205..5dccaef3e40 100644
--- a/util/message.h
+++ b/util/message.h
@@ -168,10 +168,14 @@ namespace mongo {
~Message() {
reset();
}
-
+
SockAddr from;
MsgData *data;
+ int operation() const {
+ return data->operation();
+ }
+
Message& operator=(Message& r) {
assert( data == 0 );
data = r.data;