diff options
author | Alberto Lerner <alerner@10gen.com> | 2010-10-01 12:45:36 -0400 |
---|---|---|
committer | Alberto Lerner <alerner@10gen.com> | 2010-10-01 12:45:36 -0400 |
commit | 9076c182ecfeea0985ba0fa3ae736092a2e81b6d (patch) | |
tree | d3b4a13c62fb8397bbd665da7cfaefe1da9276dc /s/strategy.cpp | |
parent | 75006ce5389594d847d94fcf3d1cb3affa577c7a (diff) | |
download | mongo-9076c182ecfeea0985ba0fa3ae736092a2e81b6d.tar.gz |
SERVER-1822 pull out WriteBackListener, next dedup will use it
Diffstat (limited to 's/strategy.cpp')
-rw-r--r-- | s/strategy.cpp | 166 |
1 files changed, 8 insertions, 158 deletions
diff --git a/s/strategy.cpp b/s/strategy.cpp index e67dbc2cec5..738797666db 100644 --- a/s/strategy.cpp +++ b/s/strategy.cpp @@ -1,3 +1,5 @@ +// @file strategy.cpp + /* * Copyright (C) 2010 10gen Inc. * @@ -14,16 +16,17 @@ * along with this program. If not, see <http://www.gnu.org/licenses/>. */ -// stragegy.cpp - #include "pch.h" -#include "request.h" -#include "../util/background.h" + #include "../client/connpool.h" #include "../db/commands.h" -#include "server.h" #include "grid.h" +#include "request.h" +#include "server.h" +#include "writeback_listener.h" + +#include "strategy.h" namespace mongo { @@ -79,159 +82,6 @@ namespace mongo { dbcon.done(); } - class WriteBackListener : public BackgroundJob { - protected: - string name() { return "WriteBackListener"; } - WriteBackListener( const string& addr ) : _addr( addr ){ - log() << "creating WriteBackListener for: " << addr << endl; - } - - void run(){ - OID lastID; - lastID.clear(); - int secsToSleep = 0; - while ( ! inShutdown() && Shard::isMember( _addr ) ){ - - if ( lastID.isSet() ){ - scoped_lock lk( _seenWritebacksLock ); - _seenWritebacks.insert( lastID ); - lastID.clear(); - } - - try { - ScopedDbConnection conn( _addr ); - - BSONObj result; - - { - BSONObjBuilder cmd; - cmd.appendOID( "writebacklisten" , &serverID ); // Command will block for data - if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ){ - log() << "writebacklisten command failed! " << result << endl; - conn.done(); - continue; - } - - } - - log(1) << "writebacklisten result: " << result << endl; - - BSONObj data = result.getObjectField( "data" ); - if ( data.getBoolField( "writeBack" ) ){ - string ns = data["ns"].valuestrsafe(); - { - BSONElement e = data["id"]; - if ( e.type() == jstOID ) - lastID = e.OID(); - } - int len; - - Message m( (void*)data["msg"].binData( len ) , false ); - massert( 10427 , "invalid writeback message" , m.header()->valid() ); - - DBConfigPtr db = grid.getDBConfig( ns ); - ShardChunkVersion needVersion( data["version"] ); - - log(1) << "writeback id: " << lastID << " needVersion : " << needVersion.toString() - << " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3) - - if ( logLevel ) log(1) << debugString( m ) << endl; - - if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ){ - // this means when the write went originally, the version was old - // if we're here, it means we've already updated the config, so don't need to do again - //db->getChunkManager( ns , true ); // SERVER-1349 - } - else { - db->getChunkManager( ns , true ); - } - - Request r( m , 0 ); - r.init(); - r.process(); - } - else if ( result["noop"].trueValue() ){ - // no-op - } - else { - log() << "unknown writeBack result: " << result << endl; - } - - conn.done(); - secsToSleep = 0; - continue; - } - catch ( std::exception e ){ - - if ( inShutdown() ){ - // we're shutting down, so just clean up - return; - } - - log() << "WriteBackListener exception : " << e.what() << endl; - - // It's possible this shard was removed - Shard::reloadShardInfo(); - } - catch ( ... ){ - log() << "WriteBackListener uncaught exception!" << endl; - } - secsToSleep++; - sleepsecs(secsToSleep); - if ( secsToSleep > 10 ) - secsToSleep = 0; - } - - log() << "WriteBackListener exiting : address no longer in cluster " << _addr; - - } - - private: - string _addr; - - static map<string,WriteBackListener*> _cache; - static mongo::mutex _cacheLock; - - static set<OID> _seenWritebacks; // TODO: this can grow unbounded - static mongo::mutex _seenWritebacksLock; - - public: - static void init( DBClientBase& conn ){ - scoped_lock lk( _cacheLock ); - WriteBackListener*& l = _cache[conn.getServerAddress()]; - if ( l ) - return; - l = new WriteBackListener( conn.getServerAddress() ); - l->go(); - } - - - static void waitFor( const OID& oid ){ - Timer t; - for ( int i=0; i<5000; i++ ){ - { - scoped_lock lk( _seenWritebacksLock ); - if ( _seenWritebacks.count( oid ) ) - return; - } - sleepmillis( 10 ); - } - stringstream ss; - ss << "didn't get writeback for: " << oid << " after: " << t.millis() << " ms"; - uasserted( 13403 , ss.str() ); - } - }; - - void waitForWriteback( const OID& oid ){ - WriteBackListener::waitFor( oid ); - } - - map<string,WriteBackListener*> WriteBackListener::_cache; - mongo::mutex WriteBackListener::_cacheLock("WriteBackListener"); - - set<OID> WriteBackListener::_seenWritebacks; - mongo::mutex WriteBackListener::_seenWritebacksLock( "WriteBackListener::seen" ); - struct ConnectionShardStatus { typedef unsigned long long S; |