summaryrefslogtreecommitdiff
path: root/s/strategy.cpp
diff options
context:
space:
mode:
authorAlberto Lerner <alerner@10gen.com>2010-10-01 12:45:36 -0400
committerAlberto Lerner <alerner@10gen.com>2010-10-01 12:45:36 -0400
commit9076c182ecfeea0985ba0fa3ae736092a2e81b6d (patch)
treed3b4a13c62fb8397bbd665da7cfaefe1da9276dc /s/strategy.cpp
parent75006ce5389594d847d94fcf3d1cb3affa577c7a (diff)
downloadmongo-9076c182ecfeea0985ba0fa3ae736092a2e81b6d.tar.gz
SERVER-1822 pull out WriteBackListener, next dedup will use it
Diffstat (limited to 's/strategy.cpp')
-rw-r--r--s/strategy.cpp166
1 files changed, 8 insertions, 158 deletions
diff --git a/s/strategy.cpp b/s/strategy.cpp
index e67dbc2cec5..738797666db 100644
--- a/s/strategy.cpp
+++ b/s/strategy.cpp
@@ -1,3 +1,5 @@
+// @file strategy.cpp
+
/*
* Copyright (C) 2010 10gen Inc.
*
@@ -14,16 +16,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-// stragegy.cpp
-
#include "pch.h"
-#include "request.h"
-#include "../util/background.h"
+
#include "../client/connpool.h"
#include "../db/commands.h"
-#include "server.h"
#include "grid.h"
+#include "request.h"
+#include "server.h"
+#include "writeback_listener.h"
+
+#include "strategy.h"
namespace mongo {
@@ -79,159 +82,6 @@ namespace mongo {
dbcon.done();
}
- class WriteBackListener : public BackgroundJob {
- protected:
- string name() { return "WriteBackListener"; }
- WriteBackListener( const string& addr ) : _addr( addr ){
- log() << "creating WriteBackListener for: " << addr << endl;
- }
-
- void run(){
- OID lastID;
- lastID.clear();
- int secsToSleep = 0;
- while ( ! inShutdown() && Shard::isMember( _addr ) ){
-
- if ( lastID.isSet() ){
- scoped_lock lk( _seenWritebacksLock );
- _seenWritebacks.insert( lastID );
- lastID.clear();
- }
-
- try {
- ScopedDbConnection conn( _addr );
-
- BSONObj result;
-
- {
- BSONObjBuilder cmd;
- cmd.appendOID( "writebacklisten" , &serverID ); // Command will block for data
- if ( ! conn->runCommand( "admin" , cmd.obj() , result ) ){
- log() << "writebacklisten command failed! " << result << endl;
- conn.done();
- continue;
- }
-
- }
-
- log(1) << "writebacklisten result: " << result << endl;
-
- BSONObj data = result.getObjectField( "data" );
- if ( data.getBoolField( "writeBack" ) ){
- string ns = data["ns"].valuestrsafe();
- {
- BSONElement e = data["id"];
- if ( e.type() == jstOID )
- lastID = e.OID();
- }
- int len;
-
- Message m( (void*)data["msg"].binData( len ) , false );
- massert( 10427 , "invalid writeback message" , m.header()->valid() );
-
- DBConfigPtr db = grid.getDBConfig( ns );
- ShardChunkVersion needVersion( data["version"] );
-
- log(1) << "writeback id: " << lastID << " needVersion : " << needVersion.toString()
- << " mine : " << db->getChunkManager( ns )->getVersion().toString() << endl;// TODO change to log(3)
-
- if ( logLevel ) log(1) << debugString( m ) << endl;
-
- if ( needVersion.isSet() && needVersion <= db->getChunkManager( ns )->getVersion() ){
- // this means when the write went originally, the version was old
- // if we're here, it means we've already updated the config, so don't need to do again
- //db->getChunkManager( ns , true ); // SERVER-1349
- }
- else {
- db->getChunkManager( ns , true );
- }
-
- Request r( m , 0 );
- r.init();
- r.process();
- }
- else if ( result["noop"].trueValue() ){
- // no-op
- }
- else {
- log() << "unknown writeBack result: " << result << endl;
- }
-
- conn.done();
- secsToSleep = 0;
- continue;
- }
- catch ( std::exception e ){
-
- if ( inShutdown() ){
- // we're shutting down, so just clean up
- return;
- }
-
- log() << "WriteBackListener exception : " << e.what() << endl;
-
- // It's possible this shard was removed
- Shard::reloadShardInfo();
- }
- catch ( ... ){
- log() << "WriteBackListener uncaught exception!" << endl;
- }
- secsToSleep++;
- sleepsecs(secsToSleep);
- if ( secsToSleep > 10 )
- secsToSleep = 0;
- }
-
- log() << "WriteBackListener exiting : address no longer in cluster " << _addr;
-
- }
-
- private:
- string _addr;
-
- static map<string,WriteBackListener*> _cache;
- static mongo::mutex _cacheLock;
-
- static set<OID> _seenWritebacks; // TODO: this can grow unbounded
- static mongo::mutex _seenWritebacksLock;
-
- public:
- static void init( DBClientBase& conn ){
- scoped_lock lk( _cacheLock );
- WriteBackListener*& l = _cache[conn.getServerAddress()];
- if ( l )
- return;
- l = new WriteBackListener( conn.getServerAddress() );
- l->go();
- }
-
-
- static void waitFor( const OID& oid ){
- Timer t;
- for ( int i=0; i<5000; i++ ){
- {
- scoped_lock lk( _seenWritebacksLock );
- if ( _seenWritebacks.count( oid ) )
- return;
- }
- sleepmillis( 10 );
- }
- stringstream ss;
- ss << "didn't get writeback for: " << oid << " after: " << t.millis() << " ms";
- uasserted( 13403 , ss.str() );
- }
- };
-
- void waitForWriteback( const OID& oid ){
- WriteBackListener::waitFor( oid );
- }
-
- map<string,WriteBackListener*> WriteBackListener::_cache;
- mongo::mutex WriteBackListener::_cacheLock("WriteBackListener");
-
- set<OID> WriteBackListener::_seenWritebacks;
- mongo::mutex WriteBackListener::_seenWritebacksLock( "WriteBackListener::seen" );
-
struct ConnectionShardStatus {
typedef unsigned long long S;