diff options
author | Eliot Horowitz <eliot@10gen.com> | 2011-01-03 14:12:32 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2011-01-03 14:13:07 -0500 |
commit | b99c10320e1fcb28e36167ed0022fcbf55e02cbf (patch) | |
tree | 797a7fbda356fa8994ef2fe1ca28d5a55d9b1cfb /s | |
parent | 17621a52d3cb1bce3042011f538535b34bfde2f5 (diff) | |
download | mongo-b99c10320e1fcb28e36167ed0022fcbf55e02cbf.tar.gz |
new writeback hookup with per connection info SERVER-1467 SERVER-1855
Diffstat (limited to 's')
-rw-r--r-- | s/client.cpp | 24 | ||||
-rw-r--r-- | s/client.h | 10 | ||||
-rw-r--r-- | s/d_logic.cpp | 1 | ||||
-rw-r--r-- | s/writeback_listener.cpp | 63 | ||||
-rw-r--r-- | s/writeback_listener.h | 12 |
5 files changed, 78 insertions, 32 deletions
diff --git a/s/client.cpp b/s/client.cpp index 0ce5cc8f1ca..2e803afbbe0 100644 --- a/s/client.cpp +++ b/s/client.cpp @@ -123,19 +123,29 @@ namespace mongo { _clients.erase( i ); } - void ClientInfo::_addWriteBack( vector<OID>& all , const BSONObj& o ){ - BSONElement e = o["writeback"]; + void ClientInfo::_addWriteBack( vector<WBInfo>& all , const BSONObj& o ){ + BSONElement w = o["writeback"]; - if ( e.type() == jstOID ) - all.push_back( e.OID() ); + if ( w.type() != jstOID ) + return; + + BSONElement cid = o["connectionId"]; + cout << "ELIOT : " << cid << endl; + + if ( cid.eoo() ){ + error() << "getLastError writeback can't work because of version mis-match" << endl; + return; + } + + all.push_back( WBInfo( cid.numberLong() , w.OID() ) ); } - void ClientInfo::_handleWriteBacks( vector<OID>& all ){ + void ClientInfo::_handleWriteBacks( vector<WBInfo>& all ){ if ( all.size() == 0 ) return; for ( unsigned i=0; i<all.size(); i++ ){ - WriteBackListener::waitFor( all[i] ); + WriteBackListener::waitFor( all[i].connectionId , all[i].id ); } } @@ -149,7 +159,7 @@ namespace mongo { return true; } - vector<OID> writebacks; + vector<WBInfo> writebacks; // handle single server if ( shards->size() == 1 ){ diff --git a/s/client.h b/s/client.h index 60a05d92aad..050661b1876 100644 --- a/s/client.h +++ b/s/client.h @@ -77,9 +77,15 @@ namespace mongo { private: + struct WBInfo { + WBInfo( ConnectionId c , OID o ) : connectionId( c ) , id( o ){} + ConnectionId connectionId; + OID id; + }; + // for getLastError - void _addWriteBack( vector<OID>& all , const BSONObj& o ); - void _handleWriteBacks( vector<OID>& all ); + void _addWriteBack( vector<WBInfo>& all , const BSONObj& o ); + void _handleWriteBacks( vector<WBInfo>& all ); int _id; // unique client id diff --git a/s/d_logic.cpp b/s/d_logic.cpp index 40a8c91f3c5..7de7a1ef7e6 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -103,6 +103,7 @@ namespace mongo { b.appendBool( "writeBack" , true ); b.append( "ns" , ns ); b.append( "id" , writebackID ); + b.append( "connectionId" , cc().getConnectionId() ); b.appendTimestamp( "version" , shardingState.getVersion( ns ) ); b.appendTimestamp( "yourVersion" , ShardedConnectionInfo::get( true )->getVersion( ns ) ); b.appendBinData( "msg" , m.header()->len , bdtCustom , (char*)(m.singleData()) ); diff --git a/s/writeback_listener.cpp b/s/writeback_listener.cpp index ebef9c7c83f..2e1bc10af25 100644 --- a/s/writeback_listener.cpp +++ b/s/writeback_listener.cpp @@ -34,7 +34,7 @@ namespace mongo { map<string,WriteBackListener*> WriteBackListener::_cache; mongo::mutex WriteBackListener::_cacheLock("WriteBackListener"); - set<OID> WriteBackListener::_seenWritebacks; + map<ConnectionId,WriteBackListener::WBStatus> WriteBackListener::_seenWritebacks; mongo::mutex WriteBackListener::_seenWritebacksLock("WriteBackListener::seen"); WriteBackListener::WriteBackListener( const string& addr ) : _addr( addr ){ @@ -52,13 +52,16 @@ namespace mongo { } /* static */ - void WriteBackListener::waitFor( const OID& oid ){ + void WriteBackListener::waitFor( ConnectionId connectionId, const OID& oid ){ Timer t; for ( int i=0; i<5000; i++ ){ { scoped_lock lk( _seenWritebacksLock ); - if ( _seenWritebacks.count( oid ) ) + WBStatus s = _seenWritebacks[connectionId]; + if ( oid <= s.id ){ + // TODO return gle return; + } } sleepmillis( 10 ); } @@ -68,16 +71,9 @@ namespace mongo { } void WriteBackListener::run(){ - OID lastID; - lastID.clear(); int secsToSleep = 0; while ( ! inShutdown() && Shard::isMember( _addr ) ){ - if ( lastID.isSet() ){ - scoped_lock lk( _seenWritebacksLock ); - _seenWritebacks.insert( lastID ); - lastID.clear(); - } try { ScopedDbConnection conn( _addr ); @@ -100,20 +96,25 @@ namespace mongo { BSONObj data = result.getObjectField( "data" ); if ( data.getBoolField( "writeBack" ) ){ string ns = data["ns"].valuestrsafe(); - { - BSONElement e = data["id"]; - if ( e.type() == jstOID ) - lastID = e.OID(); + + ConnectionId cid = 0; + OID wid; + if ( data["connectionId"].isNumber() && data["id"].type() == jstOID ){ + cid = data["connectionId"].numberLong(); + wid = data["id"].OID(); + } + else { + warning() << "mongos/mongod version mismatch (1.7.5 is the split)" << endl; } - int len; + int len; // not used, but needed for next call Message m( (void*)data["msg"].binData( len ) , false ); massert( 10427 , "invalid writeback message" , m.header()->valid() ); DBConfigPtr db = grid.getDBConfig( ns ); ShardChunkVersion needVersion( data["version"] ); - log(1) << "writeback id: " << lastID << " needVersion : " << needVersion.toString() + log(1) << "connectionId: " << cid << " writebackId: " << wid << " needVersion : " << needVersion.toString() << " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3) if ( logLevel ) log(1) << debugString( m ) << endl; @@ -129,10 +130,32 @@ namespace mongo { // we need to reload the chunk manager and get the new shard versions db->getChunkManager( ns , true ); } - - Request r( m , 0 ); - r.init(); - r.process(); + + // do reequest and then call getLastError + // we have to call getLastError so we can return the right fields to the user if they decide to call getLastError + + BSONObj gle; + try { + Request r( m , 0 ); + r.init(); + r.process(); + + gle = BSONObj(); // TODO + } + catch ( DBException& e ){ + error() << "error processing writeback: " << e << endl; + BSONObjBuilder b; + b.append( "err" , e.toString() ); + e.getInfo().append( b ); + gle = b.obj(); + } + + { + scoped_lock lk( _seenWritebacksLock ); + WBStatus& s = _seenWritebacks[cid]; + s.id = wid; + s.gle = gle; + } } else if ( result["noop"].trueValue() ){ // no-op diff --git a/s/writeback_listener.h b/s/writeback_listener.h index f567a073e2f..bd2664d9332 100644 --- a/s/writeback_listener.h +++ b/s/writeback_listener.h @@ -22,6 +22,7 @@ #include "../client/connpool.h" #include "../util/background.h" +#include "../db/client.h" namespace mongo { @@ -36,7 +37,7 @@ namespace mongo { public: static void init( DBClientBase& conn ); - static void waitFor( const OID& oid ); + static void waitFor( ConnectionId connectionId, const OID& oid ); protected: WriteBackListener( const string& addr ); @@ -48,10 +49,15 @@ namespace mongo { string _addr; static mongo::mutex _cacheLock; // protects _cache - static map<string,WriteBackListener*> _cache; + static map<string,WriteBackListener*> _cache; // server to listener + struct WBStatus { + OID id; + BSONObj gle; + }; + static mongo::mutex _seenWritebacksLock; // protects _seenWritbacks - static set<OID> _seenWritebacks; // TODO: this can grow unbounded + static map<ConnectionId,WBStatus> _seenWritebacks; // connectionId -> last write back GLE }; void waitForWriteback( const OID& oid ); |