diff options
author | Eliot Horowitz <eliot@10gen.com> | 2009-11-03 10:35:48 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2009-11-03 10:35:48 -0500 |
commit | 3a1c9831c135fbef4bf02ac3458772196bd88038 (patch) | |
tree | 54bc31a20aa6d3238612fd045142fdb1e1aeb4d6 | |
parent | f874b55f1ac769066e6ad1144a633eb0ff8d52e8 (diff) | |
download | mongo-3a1c9831c135fbef4bf02ac3458772196bd88038.tar.gz |
some sharded query re-factoring
-rw-r--r-- | SConstruct | 2 | ||||
-rw-r--r-- | client/parallel.cpp | 196 | ||||
-rw-r--r-- | client/parallel.h | 104 | ||||
-rw-r--r-- | db/mr.cpp | 36 | ||||
-rw-r--r-- | jstests/sharding/features2.js | 45 | ||||
-rw-r--r-- | s/chunk.cpp | 2 | ||||
-rw-r--r-- | s/commands_public.cpp | 60 | ||||
-rw-r--r-- | s/cursors.cpp | 194 | ||||
-rw-r--r-- | s/cursors.h | 84 | ||||
-rw-r--r-- | s/d_logic.cpp | 4 | ||||
-rw-r--r-- | s/request.h | 18 | ||||
-rw-r--r-- | s/strategy.h | 2 | ||||
-rw-r--r-- | s/strategy_shard.cpp | 18 | ||||
-rw-r--r-- | s/util.h | 48 |
14 files changed, 530 insertions, 283 deletions
diff --git a/SConstruct b/SConstruct index df01b3b4b78..965b71b3f58 100644 --- a/SConstruct +++ b/SConstruct @@ -278,7 +278,7 @@ commonFiles = Split( "stdafx.cpp buildinfo.cpp db/jsobj.cpp db/json.cpp db/comma commonFiles += [ "util/background.cpp" , "util/mmap.cpp" , "util/sock.cpp" , "util/util.cpp" , "util/message.cpp" , "util/assert_util.cpp" , "util/httpclient.cpp" , "util/md5main.cpp" , "util/base64.cpp" ] commonFiles += Glob( "util/*.c" ) -commonFiles += Split( "client/connpool.cpp client/dbclient.cpp client/model.cpp" ) +commonFiles += Split( "client/connpool.cpp client/dbclient.cpp client/model.cpp client/parallel.cpp" ) commonFiles += [ "scripting/engine.cpp" ] #mmap stuff diff --git a/client/parallel.cpp b/client/parallel.cpp new file mode 100644 index 00000000000..e77acfccdb5 --- /dev/null +++ b/client/parallel.cpp @@ -0,0 +1,196 @@ +// parallel.cpp + +#include "stdafx.h" +#include "parallel.h" +#include "connpool.h" +#include "../db/queryutil.h" +#include "../db/dbmessage.h" +#include "../s/util.h" + +namespace mongo { + + // -------- ClusteredCursor ----------- + + ClusteredCursor::ClusteredCursor( QueryMessage& q ){ + _ns = q.ns; + _query = q.query.copy(); + _options = q.queryOptions; + if ( q.fields.get() ) + _fields = q.fields->getSpec(); + _done = false; + } + + ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields ){ + _ns = ns; + _query = q.getOwned(); + _options = options; + _fields = fields.getOwned(); + _done = false; + } + + ClusteredCursor::~ClusteredCursor(){ + _done = true; // just in case + } + + auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra ){ + uassert( "cursor already done" , ! _done ); + + BSONObj q = _query; + if ( ! extra.isEmpty() ){ + q = concatQuery( q , extra ); + } + + ScopedDbConnection conn( server ); + checkShardVersion( conn.conn() , _ns ); + + log(5) << "ClusteredCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl; + auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options ); + if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) ) + throw StaleConfigException( _ns , "ClusteredCursor::query" ); + + conn.done(); + return cursor; + } + + BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){ + if ( ! query.hasField( "query" ) ) + return _concatFilter( query , extraFilter ); + + BSONObjBuilder b; + BSONObjIterator i( query ); + while ( i.more() ){ + BSONElement e = i.next(); + + if ( strcmp( e.fieldName() , "query" ) ){ + b.append( e ); + continue; + } + + b.append( "query" , _concatFilter( e.embeddedObjectUserCheck() , extraFilter ) ); + } + return b.obj(); + } + + BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){ + BSONObjBuilder b; + b.appendElements( filter ); + b.appendElements( extra ); + return b.obj(); + // TODO: should do some simplification here if possibl ideally + } + + + // -------- SerialServerClusteredCursor ----------- + + SerialServerClusteredCursor::SerialServerClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ){ + for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ) + _servers.push_back( *i ); + + if ( sortOrder > 0 ) + sort( _servers.begin() , _servers.end() ); + else if ( sortOrder < 0 ) + sort( _servers.rbegin() , _servers.rend() ); + + _serverIndex = 0; + } + + bool SerialServerClusteredCursor::more(){ + if ( _current.get() && _current->more() ) + return true; + + if ( _serverIndex >= _servers.size() ) + return false; + + ServerAndQuery& sq = _servers[_serverIndex++]; + _current = query( sq._server , 0 , sq._extra ); + return _current->more(); + } + + BSONObj SerialServerClusteredCursor::next(){ + uassert( "no more items" , more() ); + return _current->next(); + } + + // -------- ParallelSortClusteredCursor ----------- + + ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ) : ClusteredCursor( q ) , _servers( servers ){ + _numServers = servers.size(); + _sortKey = sortKey.getOwned(); + + _cursors = new auto_ptr<DBClientCursor>[_numServers]; + _nexts = new BSONObj[_numServers]; + + // TODO: parellize + int num = 0; + for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ){ + const ServerAndQuery& sq = *i; + _cursors[num++] = query( sq._server , 0 , sq._extra ); + } + + } + + ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){ + delete [] _cursors; + delete [] _nexts; + } + + bool ParallelSortClusteredCursor::more(){ + for ( int i=0; i<_numServers; i++ ){ + if ( ! _nexts[i].isEmpty() ) + return true; + + if ( _cursors[i].get() && _cursors[i]->more() ) + return true; + } + return false; + } + + BSONObj ParallelSortClusteredCursor::next(){ + advance(); + + BSONObj best = BSONObj(); + int bestFrom = -1; + + for ( int i=0; i<_numServers; i++){ + if ( _nexts[i].isEmpty() ) + continue; + + if ( best.isEmpty() ){ + best = _nexts[i]; + bestFrom = i; + continue; + } + + int comp = best.woSortOrder( _nexts[i] , _sortKey ); + if ( comp < 0 ) + continue; + + best = _nexts[i]; + bestFrom = i; + } + + uassert( "no more elements" , ! best.isEmpty() ); + _nexts[bestFrom] = BSONObj(); + + return best; + } + + void ParallelSortClusteredCursor::advance(){ + for ( int i=0; i<_numServers; i++ ){ + + if ( ! _nexts[i].isEmpty() ){ + // already have a good object there + continue; + } + + if ( ! _cursors[i]->more() ){ + // cursor is dead, oh well + continue; + } + + _nexts[i] = _cursors[i]->next(); + } + + } + +} diff --git a/client/parallel.h b/client/parallel.h new file mode 100644 index 00000000000..e7d39a89ba9 --- /dev/null +++ b/client/parallel.h @@ -0,0 +1,104 @@ +// parallel.h + +/* Copyright 2009 10gen Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + tools for wokring in parallel/sharded/clustered environment + */ + +#include "stdafx.h" +#include "dbclient.h" +#include "../db/dbmessage.h" + +namespace mongo { + + class ClusteredCursor { + public: + ClusteredCursor( QueryMessage& q ); + ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() ); + virtual ~ClusteredCursor(); + + virtual bool more() = 0; + virtual BSONObj next() = 0; + + static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter ); + + protected: + auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() ); + + static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); + + string _ns; + BSONObj _query; + int _options; + BSONObj _fields; + + bool _done; + }; + + + class ServerAndQuery { + public: + ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : + _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ + } + + bool operator<( const ServerAndQuery& other ) const{ + if ( ! _orderObject.isEmpty() ) + return _orderObject.woCompare( other._orderObject ) < 0; + + if ( _server < other._server ) + return true; + if ( other._server > _server ) + return false; + return _extra.woCompare( other._extra ) < 0; + } + + string _server; + BSONObj _extra; + BSONObj _orderObject; + }; + + class SerialServerClusteredCursor : public ClusteredCursor { + public: + SerialServerClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder=0); + virtual bool more(); + virtual BSONObj next(); + private: + vector<ServerAndQuery> _servers; + unsigned _serverIndex; + + auto_ptr<DBClientCursor> _current; + }; + + class ParallelSortClusteredCursor : public ClusteredCursor { + public: + ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ); + virtual ~ParallelSortClusteredCursor(); + virtual bool more(); + virtual BSONObj next(); + private: + void advance(); + + int _numServers; + set<ServerAndQuery> _servers; + BSONObj _sortKey; + + auto_ptr<DBClientCursor> * _cursors; + BSONObj * _nexts; + }; + +} diff --git a/db/mr.cpp b/db/mr.cpp index 70320f26200..a3aa178ddec 100644 --- a/db/mr.cpp +++ b/db/mr.cpp @@ -20,6 +20,8 @@ #include "instance.h" #include "commands.h" #include "../scripting/engine.h" +#include "../client/dbclient.h" +#include "../client/connpool.h" namespace mongo { @@ -427,6 +429,40 @@ namespace mongo { DBDirectClient db; } mapReduceCommand; + + class MapReduceFinishCommand : public Command { + public: + MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ){} + virtual bool slaveOk() { return true; } + + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + string dbname = cc().database()->name; + + BSONObj origCmd = cmdObj.firstElement().embeddedObjectUserCheck(); + cout << origCmd << endl; + errmsg = "eliot was here"; + + BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck(); + vector< auto_ptr<DBClientCursor> > shardCursors; + BSONObjIterator i( shards ); + while ( i.more() ){ + BSONElement e = i.next(); + string shard = e.fieldName(); + BSONObj res = e.embeddedObjectUserCheck(); + cout << "\t" << shard << "\t" << res << endl; + + ScopedDbConnection conn( shard ); + //shardCursors.push_back( conn->query( dbname + "." + res["result"].valuestrsafe() , Query().sort( BSON( "_id" << 1 ) ) ) ); + } + + //while ( true ){ + + //} + + return 0; + } + } mapReduceFinishCommand; } diff --git a/jstests/sharding/features2.js b/jstests/sharding/features2.js index 559228c15a7..46a398e2596 100644 --- a/jstests/sharding/features2.js +++ b/jstests/sharding/features2.js @@ -51,6 +51,49 @@ assert.eq( 0 , db.foo.count() , "D7" ); // --- map/reduce - +db.mr.save( { x : 1 , tags : [ "a" , "b" ] } ); +db.mr.save( { x : 2 , tags : [ "b" , "c" ] } ); +db.mr.save( { x : 3 , tags : [ "c" , "a" ] } ); +db.mr.save( { x : 4 , tags : [ "b" , "c" ] } ); + +m = function(){ + this.tags.forEach( + function(z){ + emit( z , { count : 1 } ); + } + ); +}; + +r = function( key , values ){ + var total = 0; + for ( var i=0; i<values.length; i++ ){ + total += values[i].count; + } + return { count : total }; +}; + +doMR = function( n ){ + var res = db.mr.mapReduce( m , r ); + printjson( res ); + var x = db[res.result]; + assert.eq( 3 , x.find().count() , "MR T1 " + n ); + + var z = {}; + x.find().forEach( function(a){ z[a._id] = a.value.count; } ); + assert.eq( 3 , z.keySet().length , "MR T2 " + n ); + assert.eq( 2 , z.a , "MR T2 " + n ); + assert.eq( 3 , z.b , "MR T2 " + n ); + assert.eq( 3 , z.c , "MR T2 " + n ); + + x.drop(); +} + +doMR( "before" ); + +assert.eq( 1 , s.onNumShards( "mr" ) , "E1" ); +//s.shardGo( "mr" , { x : 1 } , { x : 2 } , { x : 3 } ); +//assert.eq( 2 , s.onNumShards( "mr" ) , "E1" ); + +doMR( "after" ); s.stop(); diff --git a/s/chunk.cpp b/s/chunk.cpp index 987d77232bb..4aa02303df6 100644 --- a/s/chunk.cpp +++ b/s/chunk.cpp @@ -283,7 +283,7 @@ namespace mongo { BSONObj f = getFilter(); if ( ! filter.isEmpty() ) - f = ShardedCursor::concatQuery( f , filter ); + f = ClusteredCursor::concatQuery( f , filter ); BSONObj result; unsigned long long n = conn->count( _ns , f ); diff --git a/s/commands_public.cpp b/s/commands_public.cpp index 01ab747edb1..a9f512cf35d 100644 --- a/s/commands_public.cpp +++ b/s/commands_public.cpp @@ -146,7 +146,7 @@ namespace mongo { help << "{ distinct : 'collection name' , key : 'a.b' }"; } bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ - + string dbName = getDBName( ns ); string collection = cmdObj.firstElement().valuestrsafe(); string fullns = dbName + "." + collection; @@ -200,5 +200,63 @@ namespace mongo { return true; } } disinctCmd; + + class MRCmd : public PublicGridCommand { + public: + MRCmd() : PublicGridCommand( "mapreduce" ){} + bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){ + + string dbName = getDBName( ns ); + string collection = cmdObj.firstElement().valuestrsafe(); + string fullns = dbName + "." + collection; + + DBConfig * conf = grid.getDBConfig( dbName , false ); + + if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){ + return passthrough( conf , cmdObj , result ); + } + + ChunkManager * cm = conf->getChunkManager( fullns ); + + BSONObj q; + if ( cmdObj["query"].type() == Object ){ + q = cmdObj["query"].embeddedObjectUserCheck(); + } + + vector<Chunk*> chunks; + cm->getChunksForQuery( chunks , q ); + + BSONObjBuilder finalB; + finalB.append( "mapreduce.shardedfinish" , cmdObj ); + + BSONObjBuilder shardresults; + for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; i++ ){ + Chunk * c = *i; + ScopedDbConnection conn( c->getShard() ); + + BSONObj myres; + if ( ! conn->runCommand( dbName , cmdObj , myres ) ){ + errmsg = "mongod mr failed: "; + errmsg += myres.toString(); + return 0; + } + shardresults.append( c->getShard() , myres ); + } + + finalB.append( "shards" , shardresults.obj() ); + + BSONObj final = finalB.obj(); + + ScopedDbConnection conn( conf->getPrimary() ); + BSONObj finalResult; + if ( ! conn->runCommand( dbName , final , finalResult ) ){ + errmsg = "final reduce failed: "; + errmsg += finalResult.toString(); + return 0; + } + result.appendElements( finalResult ); + return 1; + } + } mrCmd; } } diff --git a/s/cursors.cpp b/s/cursors.cpp index 2c491abef63..9835680174b 100644 --- a/s/cursors.cpp +++ b/s/cursors.cpp @@ -9,81 +9,29 @@ namespace mongo { // -------- ShardedCursor ----------- - ShardedCursor::ShardedCursor( QueryMessage& q ){ - _ns = q.ns; - _query = q.query.copy(); - _options = q.queryOptions; + ShardedClientCursor::ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ){ + assert( cursor ); + _cursor = cursor; + _skip = q.ntoskip; _ntoreturn = q.ntoreturn; _totalSent = 0; _done = false; - if ( q.fields.get() ){ - _fields = q.fields->getSpec(); - } - else { - _fields = BSONObj(); - } - do { _id = security.getNonce(); } while ( _id == 0 ); } - ShardedCursor::~ShardedCursor(){ - _done = true; // just in case + ShardedClientCursor::~ShardedClientCursor(){ + assert( _cursor ); + delete _cursor; + _cursor = 0; } - - auto_ptr<DBClientCursor> ShardedCursor::query( const string& server , int num , BSONObj extra ){ - uassert( "cursor already done" , ! _done ); - - BSONObj q = _query; - if ( ! extra.isEmpty() ){ - q = concatQuery( q , extra ); - } - - ScopedDbConnection conn( server ); - checkShardVersion( conn.conn() , _ns ); - log(5) << "ShardedCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl; - auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options ); - if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) ) - throw StaleConfigException( _ns , "ShardedCursor::query" ); - - conn.done(); - return cursor; - } - - BSONObj ShardedCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){ - if ( ! query.hasField( "query" ) ) - return _concatFilter( query , extraFilter ); - - BSONObjBuilder b; - BSONObjIterator i( query ); - while ( i.more() ){ - BSONElement e = i.next(); - - if ( strcmp( e.fieldName() , "query" ) ){ - b.append( e ); - continue; - } - - b.append( "query" , _concatFilter( e.embeddedObjectUserCheck() , extraFilter ) ); - } - return b.obj(); - } - - BSONObj ShardedCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){ - BSONObjBuilder b; - b.appendElements( filter ); - b.appendElements( extra ); - return b.obj(); - // TODO: should do some simplification here if possibl ideally - } - - bool ShardedCursor::sendNextBatch( Request& r , int ntoreturn ){ + bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn ){ uassert( "cursor already done" , ! _done ); int maxSize = 1024 * 1024; @@ -95,8 +43,8 @@ namespace mongo { int num = 0; bool sendMore = true; - while ( more() ){ - BSONObj o = next(); + while ( _cursor->more() ){ + BSONObj o = _cursor->next(); b.append( (void*)o.objdata() , o.objsize() ); num++; @@ -116,7 +64,7 @@ namespace mongo { } } - bool hasMore = sendMore && more(); + bool hasMore = sendMore && _cursor->more(); log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << _id << " totalSent: " << _totalSent << endl; replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? _id : 0 ); @@ -126,118 +74,6 @@ namespace mongo { return hasMore; } - // -------- SerialServerShardedCursor ----------- - - SerialServerShardedCursor::SerialServerShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder) : ShardedCursor( q ){ - for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ) - _servers.push_back( *i ); - - if ( sortOrder > 0 ) - sort( _servers.begin() , _servers.end() ); - else if ( sortOrder < 0 ) - sort( _servers.rbegin() , _servers.rend() ); - - _serverIndex = 0; - } - - bool SerialServerShardedCursor::more(){ - if ( _current.get() && _current->more() ) - return true; - - if ( _serverIndex >= _servers.size() ) - return false; - - ServerAndQuery& sq = _servers[_serverIndex++]; - _current = query( sq._server , 0 , sq._extra ); - return _current->more(); - } - - BSONObj SerialServerShardedCursor::next(){ - uassert( "no more items" , more() ); - return _current->next(); - } - - // -------- ParallelSortShardedCursor ----------- - - ParallelSortShardedCursor::ParallelSortShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ) : ShardedCursor( q ) , _servers( servers ){ - _numServers = servers.size(); - _sortKey = sortKey.getOwned(); - - _cursors = new auto_ptr<DBClientCursor>[_numServers]; - _nexts = new BSONObj[_numServers]; - - // TODO: parellize - int num = 0; - for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ){ - const ServerAndQuery& sq = *i; - _cursors[num++] = query( sq._server , 0 , sq._extra ); - } - - } - - ParallelSortShardedCursor::~ParallelSortShardedCursor(){ - delete [] _cursors; - delete [] _nexts; - } - - bool ParallelSortShardedCursor::more(){ - for ( int i=0; i<_numServers; i++ ){ - if ( ! _nexts[i].isEmpty() ) - return true; - - if ( _cursors[i].get() && _cursors[i]->more() ) - return true; - } - return false; - } - - BSONObj ParallelSortShardedCursor::next(){ - advance(); - - BSONObj best = BSONObj(); - int bestFrom = -1; - - for ( int i=0; i<_numServers; i++){ - if ( _nexts[i].isEmpty() ) - continue; - - if ( best.isEmpty() ){ - best = _nexts[i]; - bestFrom = i; - continue; - } - - int comp = best.woSortOrder( _nexts[i] , _sortKey ); - if ( comp < 0 ) - continue; - - best = _nexts[i]; - bestFrom = i; - } - - uassert( "no more elements" , ! best.isEmpty() ); - _nexts[bestFrom] = BSONObj(); - - return best; - } - - void ParallelSortShardedCursor::advance(){ - for ( int i=0; i<_numServers; i++ ){ - - if ( ! _nexts[i].isEmpty() ){ - // already have a good object there - continue; - } - - if ( ! _cursors[i]->more() ){ - // cursor is dead, oh well - continue; - } - - _nexts[i] = _cursors[i]->next(); - } - - } CursorCache::CursorCache(){ } @@ -246,8 +82,8 @@ namespace mongo { // TODO: delete old cursors? } - ShardedCursor* CursorCache::get( long long id ){ - map<long long,ShardedCursor*>::iterator i = _cursors.find( id ); + ShardedClientCursor* CursorCache::get( long long id ){ + map<long long,ShardedClientCursor*>::iterator i = _cursors.find( id ); if ( i == _cursors.end() ){ OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl; return 0; @@ -255,7 +91,7 @@ namespace mongo { return i->second; } - void CursorCache::store( ShardedCursor * cursor ){ + void CursorCache::store( ShardedClientCursor * cursor ){ _cursors[cursor->getId()] = cursor; } void CursorCache::remove( long long id ){ diff --git a/s/cursors.h b/s/cursors.h index 218d9025d0e..b1ed4b05c86 100644 --- a/s/cursors.h +++ b/s/cursors.h @@ -7,18 +7,16 @@ #include "../db/jsobj.h" #include "../db/dbmessage.h" #include "../client/dbclient.h" +#include "../client/parallel.h" #include "request.h" namespace mongo { - class ShardedCursor { + class ShardedClientCursor { public: - ShardedCursor( QueryMessage& q ); - virtual ~ShardedCursor(); - - virtual bool more() = 0; - virtual BSONObj next() = 0; + ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ); + virtual ~ShardedClientCursor(); long long getId(){ return _id; } @@ -27,78 +25,18 @@ namespace mongo { */ bool sendNextBatch( Request& r ){ return sendNextBatch( r , _ntoreturn ); } bool sendNextBatch( Request& r , int ntoreturn ); - - static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter ); protected: - auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() ); - - static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter ); - - string _ns; - int _options; + + ClusteredCursor * _cursor; + int _skip; int _ntoreturn; - - BSONObj _query; - BSONObj _fields; - - long long _id; int _totalSent; bool _done; - }; - - - class ServerAndQuery { - public: - ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : - _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){ - } - bool operator<( const ServerAndQuery& other ) const{ - if ( ! _orderObject.isEmpty() ) - return _orderObject.woCompare( other._orderObject ) < 0; - - if ( _server < other._server ) - return true; - if ( other._server > _server ) - return false; - return _extra.woCompare( other._extra ) < 0; - } - - string _server; - BSONObj _extra; - BSONObj _orderObject; - }; - - class SerialServerShardedCursor : public ShardedCursor { - public: - SerialServerShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder=0); - virtual bool more(); - virtual BSONObj next(); - private: - vector<ServerAndQuery> _servers; - unsigned _serverIndex; - - auto_ptr<DBClientCursor> _current; - }; - - class ParallelSortShardedCursor : public ShardedCursor { - public: - ParallelSortShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ); - virtual ~ParallelSortShardedCursor(); - virtual bool more(); - virtual BSONObj next(); - private: - void advance(); - - int _numServers; - set<ServerAndQuery> _servers; - BSONObj _sortKey; - - auto_ptr<DBClientCursor> * _cursors; - BSONObj * _nexts; + long long _id; }; class CursorCache { @@ -106,12 +44,12 @@ namespace mongo { CursorCache(); ~CursorCache(); - ShardedCursor* get( long long id ); - void store( ShardedCursor* cursor ); + ShardedClientCursor * get( long long id ); + void store( ShardedClientCursor* cursor ); void remove( long long id ); private: - map<long long,ShardedCursor*> _cursors; + map<long long,ShardedClientCursor*> _cursors; }; extern CursorCache cursorCache; diff --git a/s/d_logic.cpp b/s/d_logic.cpp index ded4b1b85d6..87bcb5db293 100644 --- a/s/d_logic.cpp +++ b/s/d_logic.cpp @@ -496,6 +496,10 @@ namespace mongo { return true; } + + void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false ){ + // no-op in mongod + } } diff --git a/s/request.h b/s/request.h index cdc75332486..689216c61c2 100644 --- a/s/request.h +++ b/s/request.h @@ -6,6 +6,7 @@ #include "../util/message.h" #include "../db/dbmessage.h" #include "config.h" +#include "util.h" namespace mongo { @@ -86,23 +87,6 @@ namespace mongo { int _clientId; ClientInfo * _clientInfo; }; - - class StaleConfigException : public std::exception { - public: - StaleConfigException( const string& ns , const string& msg){ - stringstream s; - s << "StaleConfigException ns: " << ns << " " << msg; - _msg = s.str(); - } - - virtual ~StaleConfigException() throw(){} - - virtual const char* what() const throw(){ - return _msg.c_str(); - } - private: - string _msg; - }; typedef map<int,ClientInfo*> ClientCache; diff --git a/s/strategy.h b/s/strategy.h index bdeb4fa4f7c..e4b93b5a155 100644 --- a/s/strategy.h +++ b/s/strategy.h @@ -27,8 +27,6 @@ namespace mongo { extern Strategy * SINGLE; extern Strategy * SHARDED; - void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false ); - bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result ); bool lockNamespaceOnServer( const string& server , const string& ns ); diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp index a34324d9ca2..25925180d25 100644 --- a/s/strategy_shard.cpp +++ b/s/strategy_shard.cpp @@ -35,13 +35,13 @@ namespace mongo { num++; } - ShardedCursor * cursor = 0; + ClusteredCursor * cursor = 0; BSONObj sort = query.getSort(); if ( sort.isEmpty() ){ // 1. no sort, can just hit them in serial - cursor = new SerialServerShardedCursor( servers , q ); + cursor = new SerialServerClusteredCursor( servers , q ); } else { int shardKeyOrder = info->getShardKey().canOrder( sort ); @@ -59,21 +59,23 @@ namespace mongo { } buckets.insert( ServerAndQuery( s->getShard() , extra , s->getMin() ) ); } - cursor = new SerialServerShardedCursor( buckets , q , shardKeyOrder ); + cursor = new SerialServerClusteredCursor( buckets , q , shardKeyOrder ); } else { // 3. sort on non-sharded key, pull back a portion from each server and iterate slowly - cursor = new ParallelSortShardedCursor( servers , q , sort ); + cursor = new ParallelSortClusteredCursor( servers , q , sort ); } } assert( cursor ); - if ( ! cursor->sendNextBatch( r ) ){ + + ShardedClientCursor * cc = new ShardedClientCursor( q , cursor ); + if ( ! cc->sendNextBatch( r ) ){ delete( cursor ); return; } - log(6) << "storing cursor : " << cursor->getId() << endl; - cursorCache.store( cursor ); + log(6) << "storing cursor : " << cc->getId() << endl; + cursorCache.store( cc ); } virtual void getMore( Request& r ){ @@ -82,7 +84,7 @@ namespace mongo { log(6) << "want cursor : " << id << endl; - ShardedCursor * cursor = cursorCache.get( id ); + ShardedClientCursor * cursor = cursorCache.get( id ); if ( ! cursor ){ log(6) << "\t invalid cursor :(" << endl; replyToQuery( QueryResult::ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 ); diff --git a/s/util.h b/s/util.h new file mode 100644 index 00000000000..fb307fbc597 --- /dev/null +++ b/s/util.h @@ -0,0 +1,48 @@ +// util.h + +/** +* Copyright (C) 2008 10gen Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "stdafx.h" + +/** + some generic sharding utils that can be used in mongod or mongos + */ + +namespace mongo { + + /** + your config info for a given shard/chunk is out of date */ + class StaleConfigException : public std::exception { + public: + StaleConfigException( const string& ns , const string& msg){ + stringstream s; + s << "StaleConfigException ns: " << ns << " " << msg; + _msg = s.str(); + } + + virtual ~StaleConfigException() throw(){} + + virtual const char* what() const throw(){ + return _msg.c_str(); + } + private: + string _msg; + }; + + void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false ); + +} |