diff options
author | Eliot Horowitz <eliot@10gen.com> | 2010-06-15 17:42:49 -0400 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2010-06-15 17:42:49 -0400 |
commit | eefddcd5a9948fa4ac29e3c93618fa6e18070fc0 (patch) | |
tree | 96ac724568cdb8d06a7af8bdb11cf5efb1abcd8e | |
parent | e25c1febc1ec2c09d6a0f73b93db3e4e2a6de772 (diff) | |
download | mongo-eefddcd5a9948fa4ac29e3c93618fa6e18070fc0.tar.gz |
checkpoint for moving chunk filtering to mongod SERVER-943
-rw-r--r-- | db/query.cpp | 18 | ||||
-rw-r--r-- | jstests/sharding/auto2.js | 1 | ||||
-rw-r--r-- | jstests/sharding/findandmodify1.js | 3 | ||||
-rw-r--r-- | jstests/sharding/shard3.js | 4 | ||||
-rw-r--r-- | s/commands_public.cpp | 4 | ||||
-rw-r--r-- | s/config.cpp | 1 | ||||
-rw-r--r-- | s/d_logic.cpp | 8 | ||||
-rw-r--r-- | s/d_logic.h | 33 | ||||
-rw-r--r-- | s/d_state.cpp | 115 | ||||
-rw-r--r-- | s/strategy_shard.cpp | 2 |
10 files changed, 173 insertions, 16 deletions
diff --git a/db/query.cpp b/db/query.cpp index db0fe466152..0fb08624d34 100644 --- a/db/query.cpp +++ b/db/query.cpp @@ -34,6 +34,7 @@ #include "commands.h" #include "queryoptimizer.h" #include "lasterror.h" +#include "../s/d_logic.h" namespace mongo { @@ -322,6 +323,12 @@ namespace mongo { // in some cases (clone collection) there won't be a matcher if ( c->matcher() && !c->matcher()->matches(c->currKey(), c->currLoc() ) ) { } + /* + TODO + else if ( _chunkMatcher && ! _chunkMatcher->belongsToMe( c->currKey(), c->currLoc() ) ){ + cout << "TEMP skipping un-owned chunk: " << c->current() << endl; + } + */ else { if( c->getsetdup(c->currLoc()) ) { //out() << " but it's a dup \n"; @@ -561,6 +568,7 @@ namespace mongo { _nscanned(0), _oldNscanned(0), _nscannedObjects(0), _oldNscannedObjects(0), _n(0), _oldN(0), + _chunkMatcher(shardingState.getChunkMatcher(pq.ns())), _inMemSort(false), _saveClientCursor(false), _wouldSaveClientCursor(false), @@ -628,7 +636,13 @@ namespace mongo { else { _nscannedObjects++; DiskLoc cl = _c->currLoc(); - if( !_c->getsetdup(cl) ) { + if ( _chunkMatcher && ! _chunkMatcher->belongsToMe( _c->currKey(), _c->currLoc() ) ){ + cout << "TEMP skipping un-owned chunk: " << _c->current() << endl; + } + else if( _c->getsetdup(cl) ) { + // dup + } + else { // got a match. if ( _inMemSort ) { @@ -758,6 +772,8 @@ namespace mongo { MatchDetails _details; + ChunkMatcherPtr _chunkMatcher; + bool _inMemSort; auto_ptr< ScanAndOrder > _so; diff --git a/jstests/sharding/auto2.js b/jstests/sharding/auto2.js index 357adde80fa..12b3c0b0bee 100644 --- a/jstests/sharding/auto2.js +++ b/jstests/sharding/auto2.js @@ -44,6 +44,7 @@ print( "checkpoint B" ) assert.eq( j * 100 , counta + countb , "from each a:" + counta + " b:" + countb + " i:" + i ); print( "checkpoint B.a" ) +s.printChunks(); assert.eq( j * 100 , coll.find().limit(100000000).itcount() , "itcount A" ); print( "checkpoint C" ) diff --git a/jstests/sharding/findandmodify1.js b/jstests/sharding/findandmodify1.js index 774701f38eb..aa87a799c3e 100644 --- a/jstests/sharding/findandmodify1.js +++ b/jstests/sharding/findandmodify1.js @@ -26,9 +26,10 @@ for (var i=0; i < numObjs; i++){ assert.eq(db.stuff.count({a:1}), i, "1 A"); var out = db.stuff.findAndModify({query: {a:null}, update: {$set: {a:1}}, sort: {_id:1}}); + printjson( out ) assert.eq(db.stuff.count({a:1}), i+1, "1 B"); - assert.eq(db.stuff.findOne({_id:i}).a, 1, "1 C"); + assert.eq(db.stuff.findOne({_id:i}).a, 1, "1 C : " + tojson( db.stuff.findOne({_id:i}) ) ); assert.eq(out._id, i, "1 D"); } diff --git a/jstests/sharding/shard3.js b/jstests/sharding/shard3.js index 8c5b184e798..df82907d338 100644 --- a/jstests/sharding/shard3.js +++ b/jstests/sharding/shard3.js @@ -1,6 +1,6 @@ // shard3.js -s = new ShardingTest( "shard3" , 2 , 50 , 2 ); +s = new ShardingTest( "shard3" , 2 , 1 , 2 ); s2 = s._mongos[1]; @@ -35,6 +35,8 @@ assert.eq( 3 , primary.find().itcount() + secondary.find().itcount() , "blah 3" assert.eq( 3 , a.find().toArray().length , "normal B" ); assert.eq( 3 , b.find().toArray().length , "other B" ); +printjson( a._db._adminCommand( "shardingState" ) ); + // --- filtering --- function doCounts( name , total ){ diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 78da12de947..b4dbe2f476f 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -314,9 +314,11 @@ namespace mongo { for ( vector<shared_ptr<ChunkRange> >::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ shared_ptr<ChunkRange> c = *i; + BSONObj myCommand = fixCmdObj( cmdObj , c ); + cout << "myCommand: " << myCommand << endl; ShardConnection conn( c->getShard() , fullns ); BSONObj res; - bool ok = conn->runCommand( conf->getName() , fixCmdObj(cmdObj, c) , res ); + bool ok = conn->runCommand( conf->getName() , myCommand , res ); conn.done(); if (ok || (strcmp(res["errmsg"].valuestrsafe(), "No matching object found") != 0)){ diff --git a/s/config.cpp b/s/config.cpp index 7e2268b86ea..41fb4cec6c7 100644 --- a/s/config.cpp +++ b/s/config.cpp @@ -481,6 +481,7 @@ namespace mongo { // indexes conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "min" << 1 ) , true ); + conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "shard" << 1 << "min" << 1 ) , true ); conn->ensureIndex( ShardNS::shard , BSON( "host" << 1 ) , true ); conn.done(); diff --git a/s/d_logic.cpp b/s/d_logic.cpp index b2912bb696f..fc42a8e7adf 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -42,14 +42,6 @@ using namespace std; namespace mongo { - bool objectBelongsToMe( const string& ns , const DiskLoc& loc ){ - if ( ! shardingState.enabled() ) - return true; - - return true; - } - - bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ){ if ( ! shardingState.enabled() ) return false; diff --git a/s/d_logic.h b/s/d_logic.h index abcdb9e99fc..3726e9b9972 100644 --- a/s/d_logic.h +++ b/s/d_logic.h @@ -23,9 +23,36 @@ namespace mongo { + class ShardingState; + typedef unsigned long long ConfigVersion; typedef map<string,ConfigVersion> NSVersionMap; + // ----------- + + /** + * TODO: this only works with single fields at the moment + */ + class ChunkMatcher { + typedef map<BSONObj,pair<BSONObj,BSONObj>,BSONObjCmp> MyMap; + public: + + bool belongsToMe( const BSONObj& key , const DiskLoc& loc ) const; + + private: + ChunkMatcher( ConfigVersion version ); + + void gotRange( const BSONObj& min , const BSONObj& max ); + + ConfigVersion _version; + string _field; + MyMap _map; + + friend class ShardingState; + }; + + typedef shared_ptr<ChunkMatcher> ChunkMatcherPtr; + // -------------- // --- global state --- // -------------- @@ -48,6 +75,8 @@ namespace mongo { void appendInfo( BSONObjBuilder& b ); + ChunkMatcherPtr getChunkMatcher( const string& ns , bool load=false , ConfigVersion version=0 ); + private: bool _enabled; @@ -59,6 +88,7 @@ namespace mongo { mongo::mutex _mutex; NSVersionMap _versions; + map<string,ChunkMatcherPtr> _chunks; }; extern ShardingState shardingState; @@ -110,9 +140,6 @@ namespace mongo { */ bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ); - - bool objectBelongsToMe( const string& ns , const DiskLoc& loc ); - // ----------------- // --- writeback --- // ----------------- diff --git a/s/d_state.cpp b/s/d_state.cpp index fc88ea3ae96..77bd1b18e63 100644 --- a/s/d_state.cpp +++ b/s/d_state.cpp @@ -138,6 +138,90 @@ namespace mongo { } + ChunkMatcherPtr ShardingState::getChunkMatcher( const string& ns , bool load , ConfigVersion version ){ + return ChunkMatcherPtr();// ERH ERH ERH + + if ( ! _enabled ) + return ChunkMatcherPtr(); + + if ( ! ShardedConnectionInfo::get( false ) ) + return ChunkMatcherPtr(); + + { + scoped_lock lk( _mutex ); + if ( ! version ) + version = _versions[ns]; + if ( ! version ) + return ChunkMatcherPtr(); + + ChunkMatcherPtr p = _chunks[ns]; + if ( p ){ + if ( ! load ) + return p; + + if ( p->_version >= version ) + return p; + } + else { + if ( ! load ){ + stringstream ss; + ss << "getChunkMatcher called with load false for ns: " << ns; + msgasserted( 13300 , ss.str() ); + } + } + + } + + BSONObj q; + { + BSONObjBuilder b; + b.append( "ns" , ns.c_str() ); + b.append( "shard" , BSON( "$in" << BSON_ARRAY( _shardHost << _shardName ) ) ); + q = b.obj(); + } + + ScopedDbConnection conn( _configServer ); + + auto_ptr<DBClientCursor> cursor = conn->query( "config.chunks" , Query(q).sort( "min" ) ); + if ( ! cursor->more() ){ + conn.done(); + return ChunkMatcherPtr(); + } + + ChunkMatcherPtr p( new ChunkMatcher( version ) ); + + BSONObj min,max; + while ( cursor->more() ){ + BSONObj d = cursor->next(); + + if ( min.isEmpty() ){ + min = d["min"].Obj(); + max = d["max"].Obj(); + continue; + } + + if ( max == d["min"].Obj() ){ + max = d["max"].Obj(); + continue; + } + + p->gotRange( min.getOwned() , max.getOwned() ); + min = d["min"].Obj(); + max = d["max"].Obj(); + } + assert( ! min.isEmpty() ); + p->gotRange( min.getOwned() , max.getOwned() ); + + conn.done(); + + { + scoped_lock lk( _mutex ); + _chunks[ns] = p; + } + + return p; + } + ShardingState shardingState; // -----ShardingState END ---- @@ -334,6 +418,11 @@ namespace mongo { return false; } + { + dbtemprelease unlock; + shardingState.getChunkMatcher( ns , true , version ); + } + result.appendTimestamp( "oldVersion" , oldVersion ); oldVersion = version; globalVersion = version; @@ -431,5 +520,31 @@ namespace mongo { return false; } + // --- ChunkMatcher --- + ChunkMatcher::ChunkMatcher( ConfigVersion version ) + : _version( version ){ + + } + + void ChunkMatcher::gotRange( const BSONObj& min , const BSONObj& max ){ + assert( min.nFields() == 1 ); + _field = min.firstElement().fieldName(); + _map[min] = make_pair(min,max); + } + + bool ChunkMatcher::belongsToMe( const BSONObj& key , const DiskLoc& loc ) const { + return true; // ERH ERH ERH + if ( _map.size() == 0 ) + return false; + + BSONObj x = loc.obj().getFieldDotted( _field.c_str() ).wrap(); // TODO: this is slow + + MyMap::const_iterator a = _map.upper_bound( x ); + if ( a == _map.end() ) + a--; + + return x.woCompare( a->second.first ) >= 0 && x.woCompare( a->second.second ) < 0; + } + } diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp index 70af9cfacc1..168ccd756f7 100644 --- a/s/strategy_shard.cpp +++ b/s/strategy_shard.cpp @@ -50,7 +50,7 @@ namespace mongo { set<ServerAndQuery> servers; for ( vector<shared_ptr<ChunkRange> >::iterator i = shards.begin(); i != shards.end(); i++ ){ shared_ptr<ChunkRange> c = *i; - //servers.insert( ServerAndQuery( c->getShard().getConnString() , BSONObj() ) ); + //servers.insert( ServerAndQuery( c->getShard().getConnString() , BSONObj() ) ); // ERH ERH ERH servers.insert( ServerAndQuery( c->getShard().getConnString() , c->getFilter() ) ); } |