summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Eliot Horowitz <eliot@10gen.com> 2010-06-15 17:42:49 -0400
committer Eliot Horowitz <eliot@10gen.com> 2010-06-15 17:42:49 -0400
commit eefddcd5a9948fa4ac29e3c93618fa6e18070fc0 (patch)
tree 96ac724568cdb8d06a7af8bdb11cf5efb1abcd8e
parent e25c1febc1ec2c09d6a0f73b93db3e4e2a6de772 (diff)
download mongo-eefddcd5a9948fa4ac29e3c93618fa6e18070fc0.tar.gz
checkpoint for moving chunk filtering to mongod SERVER-943
-rw-r--r-- db/query.cpp 18
-rw-r--r-- jstests/sharding/auto2.js 1
-rw-r--r-- jstests/sharding/findandmodify1.js 3
-rw-r--r-- jstests/sharding/shard3.js 4
-rw-r--r-- s/commands_public.cpp 4
-rw-r--r-- s/config.cpp 1
-rw-r--r-- s/d_logic.cpp 8
-rw-r--r-- s/d_logic.h 33
-rw-r--r-- s/d_state.cpp 115
-rw-r--r-- s/strategy_shard.cpp 2
10 files changed, 173 insertions, 16 deletions
diff --git a/db/query.cpp b/db/query.cpp
index db0fe466152..0fb08624d34 100644
--- a/db/query.cpp
+++ b/db/query.cpp
@@ -34,6 +34,7 @@
#include "commands.h"
#include "queryoptimizer.h"
#include "lasterror.h"
+#include "../s/d_logic.h"
namespace mongo {
@@ -322,6 +323,12 @@ namespace mongo {
// in some cases (clone collection) there won't be a matcher
if ( c->matcher() && !c->matcher()->matches(c->currKey(), c->currLoc() ) ) {
}
+ /*
+ TODO
+ else if ( _chunkMatcher && ! _chunkMatcher->belongsToMe( c->currKey(), c->currLoc() ) ){
+ cout << "TEMP skipping un-owned chunk: " << c->current() << endl;
+ }
+ */
else {
if( c->getsetdup(c->currLoc()) ) {
//out() << " but it's a dup \n";
@@ -561,6 +568,7 @@ namespace mongo {
_nscanned(0), _oldNscanned(0), _nscannedObjects(0), _oldNscannedObjects(0),
_n(0),
_oldN(0),
+ _chunkMatcher(shardingState.getChunkMatcher(pq.ns())),
_inMemSort(false),
_saveClientCursor(false),
_wouldSaveClientCursor(false),
@@ -628,7 +636,13 @@ namespace mongo {
else {
_nscannedObjects++;
DiskLoc cl = _c->currLoc();
- if( !_c->getsetdup(cl) ) {
+ if ( _chunkMatcher && ! _chunkMatcher->belongsToMe( _c->currKey(), _c->currLoc() ) ){
+ cout << "TEMP skipping un-owned chunk: " << _c->current() << endl;
+ }
+ else if( _c->getsetdup(cl) ) {
+ // dup
+ }
+ else {
// got a match.
if ( _inMemSort ) {
@@ -758,6 +772,8 @@ namespace mongo {
MatchDetails _details;
+ ChunkMatcherPtr _chunkMatcher;
+
bool _inMemSort;
auto_ptr< ScanAndOrder > _so;
diff --git a/jstests/sharding/auto2.js b/jstests/sharding/auto2.js
index 357adde80fa..12b3c0b0bee 100644
--- a/jstests/sharding/auto2.js
+++ b/jstests/sharding/auto2.js
@@ -44,6 +44,7 @@ print( "checkpoint B" )
assert.eq( j * 100 , counta + countb , "from each a:" + counta + " b:" + countb + " i:" + i );
print( "checkpoint B.a" )
+s.printChunks();
assert.eq( j * 100 , coll.find().limit(100000000).itcount() , "itcount A" );
print( "checkpoint C" )
diff --git a/jstests/sharding/findandmodify1.js b/jstests/sharding/findandmodify1.js
index 774701f38eb..aa87a799c3e 100644
--- a/jstests/sharding/findandmodify1.js
+++ b/jstests/sharding/findandmodify1.js
@@ -26,9 +26,10 @@ for (var i=0; i < numObjs; i++){
assert.eq(db.stuff.count({a:1}), i, "1 A");
var out = db.stuff.findAndModify({query: {a:null}, update: {$set: {a:1}}, sort: {_id:1}});
+ printjson( out )
assert.eq(db.stuff.count({a:1}), i+1, "1 B");
- assert.eq(db.stuff.findOne({_id:i}).a, 1, "1 C");
+ assert.eq(db.stuff.findOne({_id:i}).a, 1, "1 C : " + tojson( db.stuff.findOne({_id:i}) ) );
assert.eq(out._id, i, "1 D");
}
diff --git a/jstests/sharding/shard3.js b/jstests/sharding/shard3.js
index 8c5b184e798..df82907d338 100644
--- a/jstests/sharding/shard3.js
+++ b/jstests/sharding/shard3.js
@@ -1,6 +1,6 @@
// shard3.js
-s = new ShardingTest( "shard3" , 2 , 50 , 2 );
+s = new ShardingTest( "shard3" , 2 , 1 , 2 );
s2 = s._mongos[1];
@@ -35,6 +35,8 @@ assert.eq( 3 , primary.find().itcount() + secondary.find().itcount() , "blah 3"
assert.eq( 3 , a.find().toArray().length , "normal B" );
assert.eq( 3 , b.find().toArray().length , "other B" );
+printjson( a._db._adminCommand( "shardingState" ) );
+
// --- filtering ---
function doCounts( name , total ){
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index 78da12de947..b4dbe2f476f 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -314,9 +314,11 @@ namespace mongo {
for ( vector<shared_ptr<ChunkRange> >::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){
shared_ptr<ChunkRange> c = *i;
+ BSONObj myCommand = fixCmdObj( cmdObj , c );
+ cout << "myCommand: " << myCommand << endl;
ShardConnection conn( c->getShard() , fullns );
BSONObj res;
- bool ok = conn->runCommand( conf->getName() , fixCmdObj(cmdObj, c) , res );
+ bool ok = conn->runCommand( conf->getName() , myCommand , res );
conn.done();
if (ok || (strcmp(res["errmsg"].valuestrsafe(), "No matching object found") != 0)){
diff --git a/s/config.cpp b/s/config.cpp
index 7e2268b86ea..41fb4cec6c7 100644
--- a/s/config.cpp
+++ b/s/config.cpp
@@ -481,6 +481,7 @@ namespace mongo {
// indexes
conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "min" << 1 ) , true );
+ conn->ensureIndex( ShardNS::chunk , BSON( "ns" << 1 << "shard" << 1 << "min" << 1 ) , true );
conn->ensureIndex( ShardNS::shard , BSON( "host" << 1 ) , true );
conn.done();
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
index b2912bb696f..fc42a8e7adf 100644
--- a/s/d_logic.cpp
+++ b/s/d_logic.cpp
@@ -42,14 +42,6 @@ using namespace std;
namespace mongo {
- bool objectBelongsToMe( const string& ns , const DiskLoc& loc ){
- if ( ! shardingState.enabled() )
- return true;
-
- return true;
- }
-
-
bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse ){
if ( ! shardingState.enabled() )
return false;
diff --git a/s/d_logic.h b/s/d_logic.h
index abcdb9e99fc..3726e9b9972 100644
--- a/s/d_logic.h
+++ b/s/d_logic.h
@@ -23,9 +23,36 @@
namespace mongo {
+ class ShardingState;
+
typedef unsigned long long ConfigVersion;
typedef map<string,ConfigVersion> NSVersionMap;
+ // -----------
+
+ /**
+ * Holds a set of key ranges for one namespace and answers whether a given
+ * document falls inside one of them (see belongsToMe).
+ *
+ * Instances can only be created and filled by ShardingState: the constructor
+ * and gotRange() are private and ShardingState is a friend.
+ *
+ * TODO: this only works with single fields at the moment
+ */
+ class ChunkMatcher {
+ // range lower bound -> (lower bound, upper bound), ordered by BSONObjCmp
+ typedef map<BSONObj,pair<BSONObj,BSONObj>,BSONObjCmp> MyMap;
+ public:
+
+ /**
+ * @param key index key of the candidate document
+ * @param loc on-disk location of the candidate document
+ * @return whether the document is considered to belong to one of the stored ranges
+ */
+ bool belongsToMe( const BSONObj& key , const DiskLoc& loc ) const;
+
+ private:
+ /** @param version config version this matcher is created for */
+ ChunkMatcher( ConfigVersion version );
+
+ /** records one range; min must have exactly one field (asserted in the definition) */
+ void gotRange( const BSONObj& min , const BSONObj& max );
+
+ ConfigVersion _version; // version passed at construction
+ string _field; // name of the single key field, taken from the range bounds
+ MyMap _map; // stored ranges, keyed by lower bound
+
+ friend class ShardingState;
+ };
+
+ typedef shared_ptr<ChunkMatcher> ChunkMatcherPtr;
+
// --------------
// --- global state ---
// --------------
@@ -48,6 +75,8 @@ namespace mongo {
void appendInfo( BSONObjBuilder& b );
+ ChunkMatcherPtr getChunkMatcher( const string& ns , bool load=false , ConfigVersion version=0 );
+
private:
bool _enabled;
@@ -59,6 +88,7 @@ namespace mongo {
mongo::mutex _mutex;
NSVersionMap _versions;
+ map<string,ChunkMatcherPtr> _chunks;
};
extern ShardingState shardingState;
@@ -110,9 +140,6 @@ namespace mongo {
*/
bool handlePossibleShardedMessage( Message &m, DbResponse &dbresponse );
-
- bool objectBelongsToMe( const string& ns , const DiskLoc& loc );
-
// -----------------
// --- writeback ---
// -----------------
diff --git a/s/d_state.cpp b/s/d_state.cpp
index fc88ea3ae96..77bd1b18e63 100644
--- a/s/d_state.cpp
+++ b/s/d_state.cpp
@@ -138,6 +138,90 @@ namespace mongo {
}
+ /**
+  * Returns the ChunkMatcher holding the chunk ranges recorded for this shard in
+  * namespace `ns`, or a null pointer when no matcher applies.
+  *
+  * @param ns       namespace to look up
+  * @param load     when false only the cached matcher is consulted, and a missing
+  *                 cache entry raises msgasserted( 13300 ); when true the ranges are
+  *                 (re)read from config.chunks unless the cached matcher is already
+  *                 at or above `version`
+  * @param version  version the matcher must satisfy; 0 means "use _versions[ns]"
+  * @return the cached or freshly loaded matcher; null if sharding is not enabled,
+  *         there is no ShardedConnectionInfo, no version is known for `ns`, or
+  *         config.chunks has no chunks for this shard
+  *
+  * NOTE(review): the unconditional return on the first line makes everything
+  * below it unreachable. It looks like a temporary switch for this checkpoint
+  * commit and is kept as-is.
+  */
+ ChunkMatcherPtr ShardingState::getChunkMatcher( const string& ns , bool load , ConfigVersion version ){
+     return ChunkMatcherPtr();// ERH ERH ERH
+
+     if ( ! _enabled )
+         return ChunkMatcherPtr();
+
+     if ( ! ShardedConnectionInfo::get( false ) )
+         return ChunkMatcherPtr();
+
+     {
+         // cache lookup; the lock is released before talking to the config server
+         scoped_lock lk( _mutex );
+         if ( ! version )
+             version = _versions[ns];
+         if ( ! version )
+             return ChunkMatcherPtr();
+
+         ChunkMatcherPtr p = _chunks[ns];
+         if ( p ){
+             if ( ! load )
+                 return p;
+
+             // cached matcher is recent enough, no reload needed
+             if ( p->_version >= version )
+                 return p;
+         }
+         else {
+             if ( ! load ){
+                 stringstream ss;
+                 ss << "getChunkMatcher called with load false for ns: " << ns;
+                 msgasserted( 13300 , ss.str() );
+             }
+         }
+
+     }
+
+     // all chunks of this namespace whose shard field names us by host or by name
+     BSONObj q;
+     {
+         BSONObjBuilder b;
+         b.append( "ns" , ns.c_str() );
+         b.append( "shard" , BSON( "$in" << BSON_ARRAY( _shardHost << _shardName ) ) );
+         q = b.obj();
+     }
+
+     ScopedDbConnection conn( _configServer );
+
+     auto_ptr<DBClientCursor> cursor = conn->query( "config.chunks" , Query(q).sort( "min" ) );
+     if ( ! cursor->more() ){
+         conn.done();
+         return ChunkMatcherPtr();
+     }
+
+     ChunkMatcherPtr p( new ChunkMatcher( version ) );
+
+     // Chunks arrive sorted by min; adjacent chunks (max of one == min of the next)
+     // are merged into a single range before being handed to the matcher.
+     //
+     // min/max live across calls to cursor->more(), which may fetch a new batch.
+     // Objects obtained through cursor->next() point into the current batch, so
+     // owned copies are taken here rather than only at the gotRange() call.
+     BSONObj min,max;
+     while ( cursor->more() ){
+         BSONObj d = cursor->next();
+
+         if ( min.isEmpty() ){
+             min = d["min"].Obj().getOwned();
+             max = d["max"].Obj().getOwned();
+             continue;
+         }
+
+         if ( max == d["min"].Obj() ){
+             // contiguous with the range being built: just extend it
+             max = d["max"].Obj().getOwned();
+             continue;
+         }
+
+         // gap found: flush the finished range and start a new one
+         p->gotRange( min , max );
+         min = d["min"].Obj().getOwned();
+         max = d["max"].Obj().getOwned();
+     }
+     assert( ! min.isEmpty() );
+     p->gotRange( min , max );
+
+     conn.done();
+
+     {
+         scoped_lock lk( _mutex );
+         _chunks[ns] = p;
+     }
+
+     return p;
+ }
+
ShardingState shardingState;
// -----ShardingState END ----
@@ -334,6 +418,11 @@ namespace mongo {
return false;
}
+ {
+ dbtemprelease unlock;
+ shardingState.getChunkMatcher( ns , true , version );
+ }
+
result.appendTimestamp( "oldVersion" , oldVersion );
oldVersion = version;
globalVersion = version;
@@ -431,5 +520,31 @@ namespace mongo {
return false;
}
+ // --- ChunkMatcher ---
+ /** Creates a matcher with no ranges for the given version; ranges are added through gotRange(). */
+ ChunkMatcher::ChunkMatcher( ConfigVersion version ) : _version( version ) {
+ }
+
+ /**
+  * Records one range, keyed by its lower bound.
+  *
+  * @param min lower bound of the range; must contain exactly one field
+  * @param max upper bound of the range
+  */
+ void ChunkMatcher::gotRange( const BSONObj& min , const BSONObj& max ){
+     // only single-field bounds are supported
+     assert( min.nFields() == 1 );
+
+     // remember which field the ranges are expressed on
+     _field = min.firstElement().fieldName();
+
+     pair<BSONObj,BSONObj> range( min , max );
+     _map[min] = range;
+ }
+
+ /**
+  * Tests whether the document at `loc` falls inside one of the ranges recorded
+  * through gotRange(). A range matches when min <= value < max.
+  *
+  * @param key index key of the document (currently unused; the value is read
+  *            from the document itself)
+  * @param loc location of the document whose _field value is tested
+  * @return true if the value lies in a stored range, false otherwise
+  *
+  * NOTE(review): the unconditional `return true` on the first line makes the
+  * range check below unreachable. It looks like a temporary switch for this
+  * checkpoint commit and is kept as-is.
+  */
+ bool ChunkMatcher::belongsToMe( const BSONObj& key , const DiskLoc& loc ) const {
+     return true; // ERH ERH ERH
+     if ( _map.empty() )
+         return false;
+
+     BSONObj x = loc.obj().getFieldDotted( _field.c_str() ).wrap(); // TODO: this is slow
+
+     // upper_bound yields the first range whose lower bound is strictly greater
+     // than x, so the only range that can contain x is the one just before it.
+     // If there is no such range, x is below every lower bound.
+     MyMap::const_iterator a = _map.upper_bound( x );
+     if ( a == _map.begin() )
+         return false;
+     --a;
+
+     return x.woCompare( a->second.first ) >= 0 && x.woCompare( a->second.second ) < 0;
+ }
+
}
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp
index 70af9cfacc1..168ccd756f7 100644
--- a/s/strategy_shard.cpp
+++ b/s/strategy_shard.cpp
@@ -50,7 +50,7 @@ namespace mongo {
set<ServerAndQuery> servers;
for ( vector<shared_ptr<ChunkRange> >::iterator i = shards.begin(); i != shards.end(); i++ ){
shared_ptr<ChunkRange> c = *i;
- //servers.insert( ServerAndQuery( c->getShard().getConnString() , BSONObj() ) );
+ //servers.insert( ServerAndQuery( c->getShard().getConnString() , BSONObj() ) ); // ERH ERH ERH
servers.insert( ServerAndQuery( c->getShard().getConnString() , c->getFilter() ) );
}