diff options
author | Eliot Horowitz <eliot@10gen.com> | 2010-03-22 11:47:37 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2010-03-22 11:47:37 -0400 |
commit | 1e6780f8444e2477ad6317a4edc62b43b6ad35cc (patch) | |
tree | 42907a4cfb483695ed7d8142be24eeb9bb95efb5 | |
parent | ced7756639f0ed3171cea587534ef2ead75004a1 (diff) | |
download | mongo-1e6780f8444e2477ad6317a4edc62b43b6ad35cc.tar.gz |
can use sharding with SyncClusterConnection for 2 phase commit across 3 servers SHARDING-39
-rw-r--r-- | client/connpool.cpp | 24 | ||||
-rw-r--r-- | client/connpool.h | 9 | ||||
-rw-r--r-- | client/dbclient.h | 1 | ||||
-rw-r--r-- | client/syncclusterconnection.cpp | 55 | ||||
-rw-r--r-- | client/syncclusterconnection.h | 9 | ||||
-rw-r--r-- | db/dbcommands.cpp | 3 | ||||
-rw-r--r-- | jstests/sharding/sync2.js | 48 | ||||
-rw-r--r-- | s/config.cpp | 11 | ||||
-rw-r--r-- | s/strategy.cpp | 8 | ||||
-rw-r--r-- | s/util.h | 1 | ||||
-rw-r--r-- | scripting/sm_db.cpp | 1 | ||||
-rw-r--r-- | shell/servers.js | 23 | ||||
-rw-r--r-- | util/message.h | 6 |
13 files changed, 158 insertions, 41 deletions
diff --git a/client/connpool.cpp b/client/connpool.cpp index eaaf4193d18..d69c78741f3 100644 --- a/client/connpool.cpp +++ b/client/connpool.cpp @@ -21,6 +21,7 @@ #include "stdafx.h" #include "connpool.h" #include "../db/commands.h" +#include "syncclusterconnection.h" namespace mongo { @@ -33,11 +34,13 @@ namespace mongo { if ( p == 0 ) p = new PoolForHost(); if ( p->pool.empty() ) { - string errmsg; + int numCommas = DBClientBase::countCommas( host ); DBClientBase *c; - if( host.find(',') == string::npos ) { + + if( numCommas == 0 ) { DBClientConnection *cc = new DBClientConnection(true); log(2) << "creating new connection for pool to:" << host << endl; + string errmsg; if ( !cc->connect(host.c_str(), errmsg) ) { delete cc; uassert( 11002 , (string)"dbconnectionpool: connect failed " + host , false); @@ -46,7 +49,7 @@ namespace mongo { c = cc; onCreate( c ); } - else { + else if ( numCommas == 1 ) { DBClientPaired *p = new DBClientPaired(); if( !p->connect(host) ) { delete p; @@ -55,6 +58,12 @@ namespace mongo { } c = p; } + else if ( numCommas == 2 ) { + c = new SyncClusterConnection( host ); + } + else { + uassert( 13071 , (string)"invalid hostname [" + host + "]" , 0 ); + } return c; } DBClientBase *c = p->pool.top(); @@ -105,6 +114,15 @@ namespace mongo { } } + ScopedDbConnection::~ScopedDbConnection() { + if ( _conn && ! _conn->isFailed() ) { + /* see done() comments above for why we log this line */ + log() << "~ScopedDBConnection: _conn != null" << endl; + kill(); + } + } + + class PoolFlushCmd : public Command { public: PoolFlushCmd() : Command( "connpoolsync" ){} diff --git a/client/connpool.h b/client/connpool.h index 408799ab69c..5a47b01b798 100644 --- a/client/connpool.h +++ b/client/connpool.h @@ -122,14 +122,9 @@ namespace mongo { pool.release(host, _conn); _conn = 0; } + + ~ScopedDbConnection(); - ~ScopedDbConnection() { - if ( _conn && ! _conn->isFailed() ) { - /* see done() comments above for why we log this line */ - log() << "~ScopedDBConnection: _conn != null" << endl; - kill(); - } - } }; } // namespace mongo diff --git a/client/dbclient.h b/client/dbclient.h index 3141d4588b6..ebd3b7379de 100644 --- a/client/dbclient.h +++ b/client/dbclient.h @@ -824,7 +824,6 @@ namespace mongo { return serverAddress; } - protected: virtual bool call( Message &toSend, Message &response, bool assertOk = true ); virtual void say( Message &toSend ); virtual void sayPiggyBack( Message &toSend ); diff --git a/client/syncclusterconnection.cpp b/client/syncclusterconnection.cpp index 3a952b2ca75..0a8fc797a7c 100644 --- a/client/syncclusterconnection.cpp +++ b/client/syncclusterconnection.cpp @@ -18,12 +18,14 @@ #include "stdafx.h" #include "syncclusterconnection.h" +#include "../db/dbmessage.h" // error codes 8000-8009 namespace mongo { SyncClusterConnection::SyncClusterConnection( string commaSeperated ){ + _address = commaSeperated; string::size_type idx; while ( ( idx = commaSeperated.find( ',' ) ) != string::npos ){ string h = commaSeperated.substr( 0 , idx ); @@ -35,12 +37,17 @@ namespace mongo { } SyncClusterConnection::SyncClusterConnection( string a , string b , string c ){ + _address = a + "," + b + "," + c; // connect to all even if not working _connect( a ); _connect( b ); _connect( c ); } + SyncClusterConnection::SyncClusterConnection( SyncClusterConnection& prev ){ + assert(0); + } + SyncClusterConnection::~SyncClusterConnection(){ for ( size_t i=0; i<_conns.size(); i++ ) delete _conns[i]; @@ -111,7 +118,7 @@ namespace mongo { } void SyncClusterConnection::_connect( string host ){ - log() << "SyncClusterConnection connecting to: " << host << endl; + log() << "SyncClusterConnection connecting to [" << host << "]" << endl; DBClientConnection * c = new DBClientConnection( true ); string errmsg; if ( ! c->connect( host , errmsg ) ) @@ -138,7 +145,7 @@ namespace mongo { lockType = i->second; } - uassert( 13054 , "write $cmd not supported in SyncClusterConnection" , lockType <= 0 ); + uassert( 13054 , (string)"write $cmd not supported in SyncClusterConnection: " + cmdName , lockType <= 0 ); } return _queryOnActive( ns , query , nToReturn , nToSkip , fieldsToReturn , queryOptions , batchSize ); @@ -193,25 +200,47 @@ namespace mongo { uassert( 10023 , "SyncClusterConnection bulk insert not implemented" , 0); } - void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){ assert(0); } + void SyncClusterConnection::remove( const string &ns , Query query, bool justOne ){ + assert(0); + } - void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ assert(0); } + void SyncClusterConnection::update( const string &ns , Query query , BSONObj obj , bool upsert , bool multi ){ + string errmsg; + if ( ! prepare( errmsg ) ) + throw UserException( 8005 , (string)"SyncClusterConnection::udpate prepare failed: " + errmsg ); - string SyncClusterConnection::_toString() const { - stringstream ss; - ss << "SyncClusterConnection ["; for ( size_t i=0; i<_conns.size(); i++ ){ - if ( i > 0 ) - ss << ","; - ss << _conns[i]->toString(); + _conns[i]->update( ns , query , obj , upsert , multi ); } - ss << "]"; + + _checkLast(); + } + + string SyncClusterConnection::_toString() const { + stringstream ss; + ss << "SyncClusterConnection [" << _address << "]"; return ss.str(); } bool SyncClusterConnection::call( Message &toSend, Message &response, bool assertOk ){ - assert(0); - return false; + uassert( 8006 , "SyncClusterConnection::call can only be used directly for dbQuery" , + toSend.operation() == dbQuery ); + + DbMessage d( toSend ); + uassert( 8007 , "SyncClusterConnection::call can't handle $cmd" , strstr( d.getns(), "$cmd" ) == 0 ); + + for ( size_t i=0; i<_conns.size(); i++ ){ + try { + bool ok = _conns[i]->call( toSend , response , assertOk ); + if ( ok ) + return ok; + log() << "call failed to: " << _conns[i]->toString() << " no data" << endl; + } + catch ( ... ){ + log() << "call failed to: " << _conns[i]->toString() << " exception" << endl; + } + } + throw UserException( 8008 , "all servers down!" ); } void SyncClusterConnection::say( Message &toSend ){ diff --git a/client/syncclusterconnection.h b/client/syncclusterconnection.h index 1a06249b3aa..e3411e1ce4b 100644 --- a/client/syncclusterconnection.h +++ b/client/syncclusterconnection.h @@ -68,9 +68,7 @@ namespace mongo { virtual void say( Message &toSend ); virtual void sayPiggyBack( Message &toSend ); - virtual string getServerAddress() const { - return _toString(); - } + virtual string getServerAddress() const { return _address; } virtual bool isFailed() const { return false; @@ -78,6 +76,8 @@ namespace mongo { private: + SyncClusterConnection( SyncClusterConnection& prev ); + string _toString() const; bool _commandOnActive(const string &dbname, const BSONObj& cmd, BSONObj &info, int options=0); @@ -90,8 +90,9 @@ namespace mongo { void _checkLast(); void _connect( string host ); + + string _address; vector<DBClientConnection*> _conns; - map<string,int> _lockTypes; }; diff --git a/db/dbcommands.cpp b/db/dbcommands.cpp index 264ceff745c..a8d826dc682 100644 --- a/db/dbcommands.cpp +++ b/db/dbcommands.cpp @@ -437,9 +437,10 @@ namespace mongo { virtual bool slaveOk() { return true; } - virtual LockType locktype(){ return WRITE; } + virtual LockType locktype(){ return NONE; } CmdGetOpTime() : Command("getoptime") { } bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool fromRepl) { + writelock l( "" ); result.appendDate("optime", OpTime::now().asDate()); return true; } diff --git a/jstests/sharding/sync2.js b/jstests/sharding/sync2.js new file mode 100644 index 00000000000..b0bbcb6cf12 --- /dev/null +++ b/jstests/sharding/sync2.js @@ -0,0 +1,48 @@ +// sync2.js + +s = new ShardingTest( "sync2" , 3 , 50 , 2 , { sync : true } ); + +s2 = s._mongos[1]; + +s.adminCommand( { enablesharding : "test" } ); +s.adminCommand( { shardcollection : "test.foo" , key : { num : 1 } } ); + +s.getDB( "test" ).foo.save( { num : 1 } ); +s.getDB( "test" ).foo.save( { num : 2 } ); +s.getDB( "test" ).foo.save( { num : 3 } ); +s.getDB( "test" ).foo.save( { num : 4 } ); +s.getDB( "test" ).foo.save( { num : 5 } ); +s.getDB( "test" ).foo.save( { num : 6 } ); +s.getDB( "test" ).foo.save( { num : 7 } ); + +assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal A" ); +assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other A" ); + +s.adminCommand( { split : "test.foo" , middle : { num : 4 } } ); +s.adminCommand( { movechunk : "test.foo" , find : { num : 3 } , to : s.getFirstOther( s.getServer( "test" ) ).name } ); + +assert( s._connections[0].getDB( "test" ).foo.find().toArray().length > 0 , "blah 1" ); +assert( s._connections[1].getDB( "test" ).foo.find().toArray().length > 0 , "blah 2" ); +assert.eq( 7 , s._connections[0].getDB( "test" ).foo.find().toArray().length + + s._connections[1].getDB( "test" ).foo.find().toArray().length , "blah 3" ); + +assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B" ); +assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B" ); + +s.adminCommand( { split : "test.foo" , middle : { num : 2 } } ); +s.printChunks(); + +print( "* A" ); + +assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B 1" ); +assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B 2" ); +print( "* B" ); +assert.eq( 7 , s.getDB( "test" ).foo.find().toArray().length , "normal B 3" ); +assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B 4" ); + +for ( var i=0; i<10; i++ ){ + print( "* C " + i ); + assert.eq( 7 , s2.getDB( "test" ).foo.find().toArray().length , "other B " + i ); +} + +s.stop(); diff --git a/s/config.cpp b/s/config.cpp index 467b5d8e597..c3c3668c5ab 100644 --- a/s/config.cpp +++ b/s/config.cpp @@ -371,13 +371,18 @@ namespace mongo { } ourHostname = hn; + stringstream fullString; + set<string> hosts; for ( size_t i=0; i<configHosts.size(); i++ ){ string host = configHosts[i]; hosts.insert( getHost( host , false ) ); configHosts[i] = getHost( host , true ); + if ( i > 0 ) + fullString << ","; + fullString << configHosts[i]; } - + for ( set<string>::iterator i=hosts.begin(); i!=hosts.end(); i++ ){ string host = *i; bool ok = false; @@ -393,8 +398,8 @@ namespace mongo { return false; } - uassert( 10188 , "can only hand 1 config db right now" , configHosts.size() == 1 ); - _primary = configHosts[0]; + _primary = fullString.str(); + log(1) << " config string : " << fullString.str() << endl; return true; } diff --git a/s/strategy.cpp b/s/strategy.cpp index 862956f1e2f..b7277e3190c 100644 --- a/s/strategy.cpp +++ b/s/strategy.cpp @@ -41,14 +41,12 @@ namespace mongo { void Strategy::doQuery( Request& r , string server ){ try{ ScopedDbConnection dbcon( server ); - DBClientBase &_c = dbcon.conn(); + DBClientBase &c = dbcon.conn(); - checkShardVersion( _c , r.getns() ); + checkShardVersion( c , r.getns() ); - // TODO: This will not work with Paired connections. Fix. - DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c); Message response; - bool ok = c.port().call( r.m(), response); + bool ok = c.call( r.m(), response); { QueryResult *qr = (QueryResult *) response.data; @@ -35,6 +35,7 @@ namespace mongo { stringstream s; s << "StaleConfigException ns: " << ns << " " << msg; _msg = s.str(); + log(1) << _msg << endl; } virtual ~StaleConfigException() throw(){} diff --git a/scripting/sm_db.cpp b/scripting/sm_db.cpp index c74baf96ba1..eddf56dc5e7 100644 --- a/scripting/sm_db.cpp +++ b/scripting/sm_db.cpp @@ -226,6 +226,7 @@ namespace mongo { auto_ptr<DBClientCursor> cursor = conn->query( ns , q , nToReturn , nToSkip , f.nFields() ? &f : 0 , slaveOk ? QueryOption_SlaveOk : 0 , batchSize ); if ( ! cursor.get() ){ + log() << "query failed : " << ns << " " << q << " to: " << conn->toString() << endl; JS_ReportError( cx , "error doing query: failed" ); return JS_FALSE; } diff --git a/shell/servers.js b/shell/servers.js index f753b5f17f2..79d01a4aaaf 100644 --- a/shell/servers.js +++ b/shell/servers.js @@ -136,15 +136,24 @@ ShardingTest = function( testName , numServers , verboseLevel , numMongos , othe if ( ! otherParams ) otherParams = {} this._connections = []; + + if ( otherParams.sync && numServers < 3 ) + throw "if you want sync, you need at least 3 servers"; for ( var i=0; i<numServers; i++){ var conn = startMongodTest( 30000 + i , testName + i ); this._connections.push( conn ); } - this._configDB = "localhost:30000"; - this._connections[0].getDB( "config" ).settings.insert( { _id : "chunksize" , value : otherParams.chunksize || 50 } ); - + if ( otherParams.sync ){ + this._configDB = "localhost:30000,localhost:30001,localhost:30002"; + this._configConnection = new Mongo( this._configDB ); + this._configConnection.getDB( "config" ).settings.insert( { _id : "chunksize" , value : otherParams.chunksize || 50 } ); + } + else { + this._configDB = "localhost:30000"; + this._connections[0].getDB( "config" ).settings.insert( { _id : "chunksize" , value : otherParams.chunksize || 50 } ); + } this._mongos = []; var startMongosPort = 31000; @@ -195,6 +204,14 @@ ShardingTest.prototype.getOther = function( one ){ return this._connections[0]; } +ShardingTest.prototype.getFirstOther = function( one ){ + for ( var i=0; i<this._connections.length; i++ ){ + if ( this._connections[i] != one ) + return this._connections[i]; + } + throw "impossible"; +} + ShardingTest.prototype.stop = function(){ for ( var i=0; i<this._mongos.length; i++ ){ stopMongoProgram( 31000 - i ); diff --git a/util/message.h b/util/message.h index 5891563c205..5dccaef3e40 100644 --- a/util/message.h +++ b/util/message.h @@ -168,10 +168,14 @@ namespace mongo { ~Message() { reset(); } - + SockAddr from; MsgData *data; + int operation() const { + return data->operation(); + } + Message& operator=(Message& r) { assert( data == 0 ); data = r.data; |