summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2009-11-03 10:35:48 -0500
committerEliot Horowitz <eliot@10gen.com>2009-11-03 10:35:48 -0500
commit3a1c9831c135fbef4bf02ac3458772196bd88038 (patch)
tree54bc31a20aa6d3238612fd045142fdb1e1aeb4d6
parentf874b55f1ac769066e6ad1144a633eb0ff8d52e8 (diff)
downloadmongo-3a1c9831c135fbef4bf02ac3458772196bd88038.tar.gz
some sharded query re-factoring
-rw-r--r--SConstruct2
-rw-r--r--client/parallel.cpp196
-rw-r--r--client/parallel.h104
-rw-r--r--db/mr.cpp36
-rw-r--r--jstests/sharding/features2.js45
-rw-r--r--s/chunk.cpp2
-rw-r--r--s/commands_public.cpp60
-rw-r--r--s/cursors.cpp194
-rw-r--r--s/cursors.h84
-rw-r--r--s/d_logic.cpp4
-rw-r--r--s/request.h18
-rw-r--r--s/strategy.h2
-rw-r--r--s/strategy_shard.cpp18
-rw-r--r--s/util.h48
14 files changed, 530 insertions, 283 deletions
diff --git a/SConstruct b/SConstruct
index df01b3b4b78..965b71b3f58 100644
--- a/SConstruct
+++ b/SConstruct
@@ -278,7 +278,7 @@ commonFiles = Split( "stdafx.cpp buildinfo.cpp db/jsobj.cpp db/json.cpp db/comma
commonFiles += [ "util/background.cpp" , "util/mmap.cpp" , "util/sock.cpp" , "util/util.cpp" , "util/message.cpp" ,
"util/assert_util.cpp" , "util/httpclient.cpp" , "util/md5main.cpp" , "util/base64.cpp" ]
commonFiles += Glob( "util/*.c" )
-commonFiles += Split( "client/connpool.cpp client/dbclient.cpp client/model.cpp" )
+commonFiles += Split( "client/connpool.cpp client/dbclient.cpp client/model.cpp client/parallel.cpp" )
commonFiles += [ "scripting/engine.cpp" ]
#mmap stuff
diff --git a/client/parallel.cpp b/client/parallel.cpp
new file mode 100644
index 00000000000..e77acfccdb5
--- /dev/null
+++ b/client/parallel.cpp
@@ -0,0 +1,196 @@
+// parallel.cpp
+
+#include "stdafx.h"
+#include "parallel.h"
+#include "connpool.h"
+#include "../db/queryutil.h"
+#include "../db/dbmessage.h"
+#include "../s/util.h"
+
+namespace mongo {
+
+ // -------- ClusteredCursor -----------
+
+ /**
+  * Captures namespace, query, options and (if present) the field spec
+  * from an incoming query message. The query is copied so the cursor
+  * does not depend on the message buffer.
+  */
+ ClusteredCursor::ClusteredCursor( QueryMessage& q )
+     : _ns( q.ns ) ,
+       _query( q.query.copy() ) ,
+       _options( q.queryOptions ) ,
+       _done( false ){
+
+     // _fields stays an empty object when the client asked for all fields
+     if ( q.fields.get() )
+         _fields = q.fields->getSpec();
+ }
+
+ /**
+  * Builds a cursor from explicit pieces instead of a wire message.
+  * Both BSON arguments are stored as owned copies.
+  */
+ ClusteredCursor::ClusteredCursor( const string& ns , const BSONObj& q , int options , const BSONObj& fields )
+     : _ns( ns ) ,
+       _query( q.getOwned() ) ,
+       _options( options ) ,
+       _fields( fields.getOwned() ) ,
+       _done( false ){
+ }
+
+ /** Flags the cursor as finished on the way out. */
+ ClusteredCursor::~ClusteredCursor(){
+     // defensive only: query() refuses to run once _done is set
+     _done = true;
+ }
+
+ /**
+  * Runs this cursor's query against a single server.
+  *
+  * @param server  address handed to ScopedDbConnection
+  * @param num     number of documents to ask for (passed through to the driver)
+  * @param extra   optional additional filter merged into the stored query
+  * @return the driver cursor for that server
+  * @throws StaleConfigException when the reply carries ResultFlag_ShardConfigStale
+  */
+ auto_ptr<DBClientCursor> ClusteredCursor::query( const string& server , int num , BSONObj extra ){
+     uassert( "cursor already done" , ! _done );
+
+     // only rebuild the query when there is actually something to add
+     BSONObj q = extra.isEmpty() ? _query : concatQuery( _query , extra );
+
+     ScopedDbConnection conn( server );
+     checkShardVersion( conn.conn() , _ns );
+
+     log(5) << "ClusteredCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl;
+
+     // a null field spec means "return whole documents"
+     const BSONObj * fieldSpec = 0;
+     if ( ! _fields.isEmpty() )
+         fieldSpec = &_fields;
+
+     auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , fieldSpec , _options );
+     if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) )
+         throw StaleConfigException( _ns , "ClusteredCursor::query" );
+
+     conn.done();
+     return cursor;
+ }
+
+ /**
+  * Merges extraFilter into query.
+  *
+  * If query has no "query" field it is treated as a bare filter and the
+  * two are simply concatenated. Otherwise every field is copied through
+  * unchanged except "query", whose embedded object gets the extra
+  * filter appended.
+  */
+ BSONObj ClusteredCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){
+     if ( ! query.hasField( "query" ) )
+         return _concatFilter( query , extraFilter );
+
+     BSONObjBuilder out;
+     for ( BSONObjIterator it( query ); it.more(); ){
+         BSONElement elem = it.next();
+
+         if ( strcmp( elem.fieldName() , "query" ) == 0 )
+             out.append( "query" , _concatFilter( elem.embeddedObjectUserCheck() , extraFilter ) );
+         else
+             out.append( elem );
+     }
+     return out.obj();
+ }
+
+ /**
+  * Returns a new object holding all elements of filter followed by all
+  * elements of extra. No de-duplication or simplification is done.
+  */
+ BSONObj ClusteredCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){
+     // TODO: should do some simplification here if possible
+     BSONObjBuilder merged;
+     merged.appendElements( filter );
+     merged.appendElements( extra );
+     return merged.obj();
+ }
+
+
+ // -------- SerialServerClusteredCursor -----------
+
+ /**
+  * Copies the server set into a vector and orders it:
+  * sortOrder > 0 ascending, sortOrder < 0 descending, 0 leaves the
+  * set's own iteration order untouched.
+  */
+ SerialServerClusteredCursor::SerialServerClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder) : ClusteredCursor( q ){
+     _servers.assign( servers.begin() , servers.end() );
+
+     if ( sortOrder > 0 ){
+         sort( _servers.begin() , _servers.end() );
+     }
+     else if ( sortOrder < 0 ){
+         // sorting through reverse iterators yields descending order
+         sort( _servers.rbegin() , _servers.rend() );
+     }
+
+     _serverIndex = 0;
+ }
+
+ /**
+  * True when another document is available from the current server or
+  * from any server not yet visited.
+  *
+  * Servers are opened lazily, one after another. A server whose cursor
+  * comes back empty is skipped rather than ending the iteration, so a
+  * single empty shard no longer hides results on the shards behind it.
+  */
+ bool SerialServerClusteredCursor::more(){
+     if ( _current.get() && _current->more() )
+         return true;
+
+     // walk forward until some server has data or we run out of servers
+     while ( _serverIndex < _servers.size() ){
+         ServerAndQuery& sq = _servers[_serverIndex++];
+         _current = query( sq._server , 0 , sq._extra );
+         if ( _current.get() && _current->more() )
+             return true;
+     }
+
+     return false;
+ }
+
+ /**
+  * Returns the next document. Calls more() first, which may move on to
+  * the next server; asserts if nothing is left.
+  */
+ BSONObj SerialServerClusteredCursor::next(){
+     const bool available = more();
+     uassert( "no more items" , available );
+     return _current->next();
+ }
+
+ // -------- ParallelSortClusteredCursor -----------
+
+ /**
+  * Opens one cursor per server (currently one after another) and
+  * allocates a one-document look-ahead slot for each.
+  *
+  * query() can throw (e.g. StaleConfigException). Because the destructor
+  * does not run for a partially constructed object, the arrays are
+  * released here before the exception is passed on.
+  */
+ ParallelSortClusteredCursor::ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ) : ClusteredCursor( q ) , _servers( servers ){
+     _numServers = servers.size();
+     _sortKey = sortKey.getOwned();
+
+     _cursors = 0;
+     _nexts = 0;
+
+     try {
+         _cursors = new auto_ptr<DBClientCursor>[_numServers];
+         _nexts = new BSONObj[_numServers];
+
+         // TODO: parallelize
+         int num = 0;
+         for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); ++i ){
+             const ServerAndQuery& sq = *i;
+             _cursors[num++] = query( sq._server , 0 , sq._extra );
+         }
+     }
+     catch ( ... ){
+         // delete [] on a null pointer is a no-op, so this is safe at any stage
+         delete [] _cursors;
+         delete [] _nexts;
+         _cursors = 0;
+         _nexts = 0;
+         throw;
+     }
+ }
+
+ /** Releases the per-server cursor array and the look-ahead slots. */
+ ParallelSortClusteredCursor::~ParallelSortClusteredCursor(){
+     // both arrays were allocated with new[] in the constructor
+     delete [] _cursors;
+     delete [] _nexts;
+ }
+
+ /**
+  * True if any server still has something to give: either a document
+  * already buffered in its look-ahead slot, or a live cursor that
+  * reports more.
+  */
+ bool ParallelSortClusteredCursor::more(){
+     for ( int idx = 0; idx < _numServers; ++idx ){
+         const bool buffered = ! _nexts[idx].isEmpty();
+         if ( buffered || ( _cursors[idx].get() && _cursors[idx]->more() ) )
+             return true;
+     }
+     return false;
+ }
+
+ /**
+  * Returns the buffered document that sorts first under _sortKey and
+  * clears its slot so advance() refills it on the next call.
+  * On a tie the later server wins. Asserts if every slot is empty.
+  */
+ BSONObj ParallelSortClusteredCursor::next(){
+     advance();
+
+     int winner = -1;
+     for ( int idx = 0; idx < _numServers; ++idx ){
+         if ( _nexts[idx].isEmpty() )
+             continue;
+
+         // first candidate, or one that does not sort after the current winner
+         if ( winner < 0 || _nexts[winner].woSortOrder( _nexts[idx] , _sortKey ) >= 0 )
+             winner = idx;
+     }
+
+     uassert( "no more elements" , winner >= 0 );
+
+     BSONObj best = _nexts[winner];
+     _nexts[winner] = BSONObj();
+     return best;
+ }
+
+ /**
+  * Refills every empty look-ahead slot from its server's cursor.
+  * Slots that already hold a document are left alone; servers with no
+  * cursor or an exhausted cursor are skipped.
+  */
+ void ParallelSortClusteredCursor::advance(){
+     for ( int i=0; i<_numServers; i++ ){
+
+         if ( ! _nexts[i].isEmpty() ){
+             // already have a good object there
+             continue;
+         }
+
+         // same null guard as more(), so the two methods agree on what a usable cursor is
+         if ( ! _cursors[i].get() || ! _cursors[i]->more() ){
+             // cursor is missing or dead, oh well
+             continue;
+         }
+
+         _nexts[i] = _cursors[i]->next();
+     }
+ }
+
+}
diff --git a/client/parallel.h b/client/parallel.h
new file mode 100644
index 00000000000..e7d39a89ba9
--- /dev/null
+++ b/client/parallel.h
@@ -0,0 +1,104 @@
+// parallel.h
+
+/* Copyright 2009 10gen Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ tools for working in a parallel/sharded/clustered environment
+ */
+
+#include "stdafx.h"
+#include "dbclient.h"
+#include "../db/dbmessage.h"
+
+namespace mongo {
+
+ /**
+  * Base class for cursors that gather the results of one logical query
+  * from several servers. Subclasses decide how the per-server cursors
+  * are combined by implementing more() and next().
+  */
+ class ClusteredCursor {
+ public:
+     /** take namespace, query, options and field spec from a query message */
+     ClusteredCursor( QueryMessage& q );
+
+     /** build from explicit parts; the BSON arguments are copied */
+     ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() );
+
+     virtual ~ClusteredCursor();
+
+     /** @return true if next() can deliver another document */
+     virtual bool more() = 0;
+
+     /** @return the next document of the combined result */
+     virtual BSONObj next() = 0;
+
+     /**
+      * merge extraFilter into query; understands both a bare filter and
+      * an object that wraps the filter in a "query" field
+      */
+     static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter );
+
+ protected:
+     /**
+      * run the stored query (plus extraFilter, if non-empty) on one server
+      * @throws StaleConfigException if the server flags the shard config as stale
+      */
+     auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() );
+
+     /** plain concatenation of the elements of both objects */
+     static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
+
+     string _ns;       // full namespace queried
+     BSONObj _query;   // owned copy of the query
+     int _options;     // query option flags passed to the driver
+     BSONObj _fields;  // field spec; empty means all fields
+
+     bool _done;       // once set, query() asserts
+ };
+
+
+ /**
+  * A server address together with an optional extra filter to apply on
+  * that server and an optional object used only for ordering.
+  * All BSON arguments are stored as owned copies.
+  */
+ class ServerAndQuery {
+ public:
+     ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
+         _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){
+     }
+
+     /**
+      * Ordering used by set<ServerAndQuery> and sort().
+      * If this object has an order object, that alone decides.
+      * Otherwise order by server name, then by extra filter.
+      */
+     bool operator<( const ServerAndQuery& other ) const{
+         if ( ! _orderObject.isEmpty() )
+             return _orderObject.woCompare( other._orderObject ) < 0;
+
+         // Compare server names in both directions. The second test used
+         // to repeat the first, so a greater name fell through to the
+         // _extra comparison and broke the ordering.
+         const int byServer = _server.compare( other._server );
+         if ( byServer != 0 )
+             return byServer < 0;
+
+         return _extra.woCompare( other._extra ) < 0;
+     }
+
+     string _server;
+     BSONObj _extra;
+     BSONObj _orderObject;
+ };
+
+ /**
+  * Visits the servers one at a time, draining each server's cursor
+  * before opening the next one.
+  */
+ class SerialServerClusteredCursor : public ClusteredCursor {
+ public:
+     /**
+      * @param sortOrder  > 0 visit servers in ascending order, < 0 descending,
+      *                   0 keep the order of the set
+      */
+     SerialServerClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder=0);
+
+     virtual bool more();
+     virtual BSONObj next();
+
+ private:
+     vector<ServerAndQuery> _servers;    // servers in visiting order
+     unsigned _serverIndex;              // index of the next server to open
+
+     auto_ptr<DBClientCursor> _current;  // cursor on the server being drained
+ };
+
+ /**
+  * Keeps a cursor open on every server and merges their results by
+  * sort key, buffering one look-ahead document per server.
+  */
+ class ParallelSortClusteredCursor : public ClusteredCursor {
+ public:
+     ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey );
+     virtual ~ParallelSortClusteredCursor();
+     virtual bool more();
+     virtual BSONObj next();
+ private:
+     // Not copyable: the object owns _cursors and _nexts and frees them
+     // with delete [] in the destructor, so a copy would free them twice.
+     // Declared but intentionally never defined.
+     ParallelSortClusteredCursor( const ParallelSortClusteredCursor& );
+     ParallelSortClusteredCursor& operator=( const ParallelSortClusteredCursor& );
+
+     /** refill every empty look-ahead slot from its server's cursor */
+     void advance();
+
+     int _numServers;
+     set<ServerAndQuery> _servers;
+     BSONObj _sortKey;
+
+     auto_ptr<DBClientCursor> * _cursors;  // array of _numServers cursors
+     BSONObj * _nexts;                     // array of _numServers look-ahead documents
+ };
+
+}
diff --git a/db/mr.cpp b/db/mr.cpp
index 70320f26200..a3aa178ddec 100644
--- a/db/mr.cpp
+++ b/db/mr.cpp
@@ -20,6 +20,8 @@
#include "instance.h"
#include "commands.h"
#include "../scripting/engine.h"
+#include "../client/dbclient.h"
+#include "../client/connpool.h"
namespace mongo {
@@ -427,6 +429,40 @@ namespace mongo {
DBDirectClient db;
} mapReduceCommand;
+
+ /**
+  * "mapreduce.shardedfinish": intended final step of a sharded
+  * map/reduce. Receives the original command as its first element and a
+  * "shards" object mapping each shard to that shard's result.
+  *
+  * Work in progress: the per-shard results are only printed, nothing is
+  * merged yet, and the command always reports failure.
+  */
+ class MapReduceFinishCommand : public Command {
+ public:
+     MapReduceFinishCommand() : Command( "mapreduce.shardedfinish" ){}
+     virtual bool slaveOk() { return true; }
+
+     bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+         string dbname = cc().database()->name;
+
+         BSONObj origCmd = cmdObj.firstElement().embeddedObjectUserCheck();
+         cout << origCmd << endl;
+
+         // tell the caller why this fails instead of returning placeholder text
+         errmsg = "mapreduce.shardedfinish is not implemented yet";
+
+         BSONObj shards = cmdObj["shards"].embeddedObjectUserCheck();
+         vector< auto_ptr<DBClientCursor> > shardCursors;
+         BSONObjIterator i( shards );
+         while ( i.more() ){
+             BSONElement e = i.next();
+             string shard = e.fieldName();
+             BSONObj res = e.embeddedObjectUserCheck();
+             cout << "\t" << shard << "\t" << res << endl;
+
+             ScopedDbConnection conn( shard );
+             //shardCursors.push_back( conn->query( dbname + "." + res["result"].valuestrsafe() , Query().sort( BSON( "_id" << 1 ) ) ) );
+
+             // release the connection explicitly, as the other users of ScopedDbConnection do
+             conn.done();
+         }
+
+         //while ( true ){
+
+         //}
+
+         return false;
+     }
+ } mapReduceFinishCommand;
}
diff --git a/jstests/sharding/features2.js b/jstests/sharding/features2.js
index 559228c15a7..46a398e2596 100644
--- a/jstests/sharding/features2.js
+++ b/jstests/sharding/features2.js
@@ -51,6 +51,49 @@ assert.eq( 0 , db.foo.count() , "D7" );
// --- map/reduce
-
+// fixture: four documents; tag "a" appears twice, "b" and "c" three times each
+[
+    { x : 1 , tags : [ "a" , "b" ] } ,
+    { x : 2 , tags : [ "b" , "c" ] } ,
+    { x : 3 , tags : [ "c" , "a" ] } ,
+    { x : 4 , tags : [ "b" , "c" ] }
+].forEach( function( doc ){ db.mr.save( doc ); } );
+
+// map: emit one { count : 1 } per tag of the current document
+m = function(){
+    var emitTag = function( tag ){
+        emit( tag , { count : 1 } );
+    };
+    this.tags.forEach( emitTag );
+};
+
+// reduce: add up the partial counts emitted for one key
+r = function( key , values ){
+    var sum = 0;
+    values.forEach( function( v ){
+        sum += v.count;
+    } );
+    return { count : sum };
+};
+
+// Runs the map/reduce job on db.mr and checks the per-tag counts.
+// n is appended to every assertion label so a failure shows which run
+// ("before" / "after") it came from. Each assertion has its own label.
+doMR = function( n ){
+    var res = db.mr.mapReduce( m , r );
+    printjson( res );
+
+    var x = db[res.result];
+    assert.eq( 3 , x.find().count() , "MR T1 " + n );
+
+    // collect tag -> count from the output collection
+    var z = {};
+    x.find().forEach( function(a){ z[a._id] = a.value.count; } );
+    assert.eq( 3 , z.keySet().length , "MR T2 " + n );
+    assert.eq( 2 , z.a , "MR T3 " + n );
+    assert.eq( 3 , z.b , "MR T4 " + n );
+    assert.eq( 3 , z.c , "MR T5 " + n );
+
+    x.drop();
+};
+
+// first run, before any attempt to spread "mr" over shards
+doMR( "before" );
+
+// the collection is expected to sit on exactly one shard here
+assert.eq( 1 , s.onNumShards( "mr" ) , "E1" );
+//s.shardGo( "mr" , { x : 1 } , { x : 2 } , { x : 3 } );
+//assert.eq( 2 , s.onNumShards( "mr" ) , "E1" );
+
+// second run; identical to the first while the sharding step above stays disabled
+doMR( "after" );
s.stop();
diff --git a/s/chunk.cpp b/s/chunk.cpp
index 987d77232bb..4aa02303df6 100644
--- a/s/chunk.cpp
+++ b/s/chunk.cpp
@@ -283,7 +283,7 @@ namespace mongo {
BSONObj f = getFilter();
if ( ! filter.isEmpty() )
- f = ShardedCursor::concatQuery( f , filter );
+ f = ClusteredCursor::concatQuery( f , filter );
BSONObj result;
unsigned long long n = conn->count( _ns , f );
diff --git a/s/commands_public.cpp b/s/commands_public.cpp
index 01ab747edb1..a9f512cf35d 100644
--- a/s/commands_public.cpp
+++ b/s/commands_public.cpp
@@ -146,7 +146,7 @@ namespace mongo {
help << "{ distinct : 'collection name' , key : 'a.b' }";
}
bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
-
+
string dbName = getDBName( ns );
string collection = cmdObj.firstElement().valuestrsafe();
string fullns = dbName + "." + collection;
@@ -200,5 +200,63 @@ namespace mongo {
return true;
}
} disinctCmd;
+
+ /**
+  * mongos side of "mapreduce".
+  *
+  * Unsharded collections are passed straight through. For a sharded
+  * collection the command is run once on every shard that holds a chunk
+  * matching the optional "query", and the per-shard replies are then
+  * sent to the primary in a "mapreduce.shardedfinish" command whose
+  * reply becomes the result.
+  */
+ class MRCmd : public PublicGridCommand {
+ public:
+     MRCmd() : PublicGridCommand( "mapreduce" ){}
+     bool run(const char *ns, BSONObj& cmdObj, string& errmsg, BSONObjBuilder& result, bool){
+
+         string dbName = getDBName( ns );
+         string collection = cmdObj.firstElement().valuestrsafe();
+         string fullns = dbName + "." + collection;
+
+         DBConfig * conf = grid.getDBConfig( dbName , false );
+
+         if ( ! conf || ! conf->isShardingEnabled() || ! conf->isSharded( fullns ) ){
+             return passthrough( conf , cmdObj , result );
+         }
+
+         ChunkManager * cm = conf->getChunkManager( fullns );
+
+         BSONObj q;
+         if ( cmdObj["query"].type() == Object ){
+             q = cmdObj["query"].embeddedObjectUserCheck();
+         }
+
+         vector<Chunk*> chunks;
+         cm->getChunksForQuery( chunks , q );
+
+         // Several chunks can live on the same shard. Collapse them so the
+         // job runs once per shard and "shards" has one entry per shard.
+         set<string> shards;
+         for ( vector<Chunk*>::iterator i = chunks.begin() ; i != chunks.end() ; ++i ){
+             shards.insert( (*i)->getShard() );
+         }
+
+         BSONObjBuilder finalB;
+         finalB.append( "mapreduce.shardedfinish" , cmdObj );
+
+         BSONObjBuilder shardresults;
+         for ( set<string>::iterator i = shards.begin() ; i != shards.end() ; ++i ){
+             ScopedDbConnection conn( *i );
+
+             BSONObj myres;
+             const bool ok = conn->runCommand( dbName , cmdObj , myres );
+             // the command has completed either way, so release the connection
+             conn.done();
+
+             if ( ! ok ){
+                 errmsg = "mongod mr failed: ";
+                 errmsg += myres.toString();
+                 return false;
+             }
+             shardresults.append( *i , myres );
+         }
+
+         finalB.append( "shards" , shardresults.obj() );
+
+         BSONObj final = finalB.obj();
+
+         ScopedDbConnection conn( conf->getPrimary() );
+         BSONObj finalResult;
+         const bool finalOk = conn->runCommand( dbName , final , finalResult );
+         conn.done();
+
+         if ( ! finalOk ){
+             errmsg = "final reduce failed: ";
+             errmsg += finalResult.toString();
+             return false;
+         }
+         result.appendElements( finalResult );
+         return true;
+     }
+ } mrCmd;
}
}
diff --git a/s/cursors.cpp b/s/cursors.cpp
index 2c491abef63..9835680174b 100644
--- a/s/cursors.cpp
+++ b/s/cursors.cpp
@@ -9,81 +9,29 @@ namespace mongo {
// -------- ShardedCursor -----------
- ShardedCursor::ShardedCursor( QueryMessage& q ){
- _ns = q.ns;
- _query = q.query.copy();
- _options = q.queryOptions;
+ ShardedClientCursor::ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor ){
+ assert( cursor );
+ _cursor = cursor;
+
_skip = q.ntoskip;
_ntoreturn = q.ntoreturn;
_totalSent = 0;
_done = false;
- if ( q.fields.get() ){
- _fields = q.fields->getSpec();
- }
- else {
- _fields = BSONObj();
- }
-
do {
_id = security.getNonce();
} while ( _id == 0 );
}
- ShardedCursor::~ShardedCursor(){
- _done = true; // just in case
+ ShardedClientCursor::~ShardedClientCursor(){
+ assert( _cursor );
+ delete _cursor;
+ _cursor = 0;
}
-
- auto_ptr<DBClientCursor> ShardedCursor::query( const string& server , int num , BSONObj extra ){
- uassert( "cursor already done" , ! _done );
-
- BSONObj q = _query;
- if ( ! extra.isEmpty() ){
- q = concatQuery( q , extra );
- }
-
- ScopedDbConnection conn( server );
- checkShardVersion( conn.conn() , _ns );
- log(5) << "ShardedCursor::query server:" << server << " ns:" << _ns << " query:" << q << " num:" << num << " _fields:" << _fields << " options: " << _options << endl;
- auto_ptr<DBClientCursor> cursor = conn->query( _ns.c_str() , q , num , 0 , ( _fields.isEmpty() ? 0 : &_fields ) , _options );
- if ( cursor->hasResultFlag( QueryResult::ResultFlag_ShardConfigStale ) )
- throw StaleConfigException( _ns , "ShardedCursor::query" );
-
- conn.done();
- return cursor;
- }
-
- BSONObj ShardedCursor::concatQuery( const BSONObj& query , const BSONObj& extraFilter ){
- if ( ! query.hasField( "query" ) )
- return _concatFilter( query , extraFilter );
-
- BSONObjBuilder b;
- BSONObjIterator i( query );
- while ( i.more() ){
- BSONElement e = i.next();
-
- if ( strcmp( e.fieldName() , "query" ) ){
- b.append( e );
- continue;
- }
-
- b.append( "query" , _concatFilter( e.embeddedObjectUserCheck() , extraFilter ) );
- }
- return b.obj();
- }
-
- BSONObj ShardedCursor::_concatFilter( const BSONObj& filter , const BSONObj& extra ){
- BSONObjBuilder b;
- b.appendElements( filter );
- b.appendElements( extra );
- return b.obj();
- // TODO: should do some simplification here if possibl ideally
- }
-
- bool ShardedCursor::sendNextBatch( Request& r , int ntoreturn ){
+ bool ShardedClientCursor::sendNextBatch( Request& r , int ntoreturn ){
uassert( "cursor already done" , ! _done );
int maxSize = 1024 * 1024;
@@ -95,8 +43,8 @@ namespace mongo {
int num = 0;
bool sendMore = true;
- while ( more() ){
- BSONObj o = next();
+ while ( _cursor->more() ){
+ BSONObj o = _cursor->next();
b.append( (void*)o.objdata() , o.objsize() );
num++;
@@ -116,7 +64,7 @@ namespace mongo {
}
}
- bool hasMore = sendMore && more();
+ bool hasMore = sendMore && _cursor->more();
log(6) << "\t hasMore:" << hasMore << " wouldSendMoreIfHad: " << sendMore << " id:" << _id << " totalSent: " << _totalSent << endl;
replyToQuery( 0 , r.p() , r.m() , b.buf() , b.len() , num , _totalSent , hasMore ? _id : 0 );
@@ -126,118 +74,6 @@ namespace mongo {
return hasMore;
}
- // -------- SerialServerShardedCursor -----------
-
- SerialServerShardedCursor::SerialServerShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder) : ShardedCursor( q ){
- for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ )
- _servers.push_back( *i );
-
- if ( sortOrder > 0 )
- sort( _servers.begin() , _servers.end() );
- else if ( sortOrder < 0 )
- sort( _servers.rbegin() , _servers.rend() );
-
- _serverIndex = 0;
- }
-
- bool SerialServerShardedCursor::more(){
- if ( _current.get() && _current->more() )
- return true;
-
- if ( _serverIndex >= _servers.size() )
- return false;
-
- ServerAndQuery& sq = _servers[_serverIndex++];
- _current = query( sq._server , 0 , sq._extra );
- return _current->more();
- }
-
- BSONObj SerialServerShardedCursor::next(){
- uassert( "no more items" , more() );
- return _current->next();
- }
-
- // -------- ParallelSortShardedCursor -----------
-
- ParallelSortShardedCursor::ParallelSortShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey ) : ShardedCursor( q ) , _servers( servers ){
- _numServers = servers.size();
- _sortKey = sortKey.getOwned();
-
- _cursors = new auto_ptr<DBClientCursor>[_numServers];
- _nexts = new BSONObj[_numServers];
-
- // TODO: parellize
- int num = 0;
- for ( set<ServerAndQuery>::iterator i = servers.begin(); i!=servers.end(); i++ ){
- const ServerAndQuery& sq = *i;
- _cursors[num++] = query( sq._server , 0 , sq._extra );
- }
-
- }
-
- ParallelSortShardedCursor::~ParallelSortShardedCursor(){
- delete [] _cursors;
- delete [] _nexts;
- }
-
- bool ParallelSortShardedCursor::more(){
- for ( int i=0; i<_numServers; i++ ){
- if ( ! _nexts[i].isEmpty() )
- return true;
-
- if ( _cursors[i].get() && _cursors[i]->more() )
- return true;
- }
- return false;
- }
-
- BSONObj ParallelSortShardedCursor::next(){
- advance();
-
- BSONObj best = BSONObj();
- int bestFrom = -1;
-
- for ( int i=0; i<_numServers; i++){
- if ( _nexts[i].isEmpty() )
- continue;
-
- if ( best.isEmpty() ){
- best = _nexts[i];
- bestFrom = i;
- continue;
- }
-
- int comp = best.woSortOrder( _nexts[i] , _sortKey );
- if ( comp < 0 )
- continue;
-
- best = _nexts[i];
- bestFrom = i;
- }
-
- uassert( "no more elements" , ! best.isEmpty() );
- _nexts[bestFrom] = BSONObj();
-
- return best;
- }
-
- void ParallelSortShardedCursor::advance(){
- for ( int i=0; i<_numServers; i++ ){
-
- if ( ! _nexts[i].isEmpty() ){
- // already have a good object there
- continue;
- }
-
- if ( ! _cursors[i]->more() ){
- // cursor is dead, oh well
- continue;
- }
-
- _nexts[i] = _cursors[i]->next();
- }
-
- }
CursorCache::CursorCache(){
}
@@ -246,8 +82,8 @@ namespace mongo {
// TODO: delete old cursors?
}
- ShardedCursor* CursorCache::get( long long id ){
- map<long long,ShardedCursor*>::iterator i = _cursors.find( id );
+ ShardedClientCursor* CursorCache::get( long long id ){
+ map<long long,ShardedClientCursor*>::iterator i = _cursors.find( id );
if ( i == _cursors.end() ){
OCCASIONALLY log() << "Sharded CursorCache missing cursor id: " << id << endl;
return 0;
@@ -255,7 +91,7 @@ namespace mongo {
return i->second;
}
- void CursorCache::store( ShardedCursor * cursor ){
+ void CursorCache::store( ShardedClientCursor * cursor ){
_cursors[cursor->getId()] = cursor;
}
void CursorCache::remove( long long id ){
diff --git a/s/cursors.h b/s/cursors.h
index 218d9025d0e..b1ed4b05c86 100644
--- a/s/cursors.h
+++ b/s/cursors.h
@@ -7,18 +7,16 @@
#include "../db/jsobj.h"
#include "../db/dbmessage.h"
#include "../client/dbclient.h"
+#include "../client/parallel.h"
#include "request.h"
namespace mongo {
- class ShardedCursor {
+ class ShardedClientCursor {
public:
- ShardedCursor( QueryMessage& q );
- virtual ~ShardedCursor();
-
- virtual bool more() = 0;
- virtual BSONObj next() = 0;
+ ShardedClientCursor( QueryMessage& q , ClusteredCursor * cursor );
+ virtual ~ShardedClientCursor();
long long getId(){ return _id; }
@@ -27,78 +25,18 @@ namespace mongo {
*/
bool sendNextBatch( Request& r ){ return sendNextBatch( r , _ntoreturn ); }
bool sendNextBatch( Request& r , int ntoreturn );
-
- static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter );
protected:
- auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() );
-
- static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
-
- string _ns;
- int _options;
+
+ ClusteredCursor * _cursor;
+
int _skip;
int _ntoreturn;
-
- BSONObj _query;
- BSONObj _fields;
-
- long long _id;
int _totalSent;
bool _done;
- };
-
-
- class ServerAndQuery {
- public:
- ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) :
- _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){
- }
- bool operator<( const ServerAndQuery& other ) const{
- if ( ! _orderObject.isEmpty() )
- return _orderObject.woCompare( other._orderObject ) < 0;
-
- if ( _server < other._server )
- return true;
- if ( other._server > _server )
- return false;
- return _extra.woCompare( other._extra ) < 0;
- }
-
- string _server;
- BSONObj _extra;
- BSONObj _orderObject;
- };
-
- class SerialServerShardedCursor : public ShardedCursor {
- public:
- SerialServerShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder=0);
- virtual bool more();
- virtual BSONObj next();
- private:
- vector<ServerAndQuery> _servers;
- unsigned _serverIndex;
-
- auto_ptr<DBClientCursor> _current;
- };
-
- class ParallelSortShardedCursor : public ShardedCursor {
- public:
- ParallelSortShardedCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey );
- virtual ~ParallelSortShardedCursor();
- virtual bool more();
- virtual BSONObj next();
- private:
- void advance();
-
- int _numServers;
- set<ServerAndQuery> _servers;
- BSONObj _sortKey;
-
- auto_ptr<DBClientCursor> * _cursors;
- BSONObj * _nexts;
+ long long _id;
};
class CursorCache {
@@ -106,12 +44,12 @@ namespace mongo {
CursorCache();
~CursorCache();
- ShardedCursor* get( long long id );
- void store( ShardedCursor* cursor );
+ ShardedClientCursor * get( long long id );
+ void store( ShardedClientCursor* cursor );
void remove( long long id );
private:
- map<long long,ShardedCursor*> _cursors;
+ map<long long,ShardedClientCursor*> _cursors;
};
extern CursorCache cursorCache;
diff --git a/s/d_logic.cpp b/s/d_logic.cpp
index ded4b1b85d6..87bcb5db293 100644
--- a/s/d_logic.cpp
+++ b/s/d_logic.cpp
@@ -496,6 +496,10 @@ namespace mongo {
return true;
}
+
+ /**
+  * mongod's definition of checkShardVersion: shared client code calls
+  * it before querying, and inside mongod there is nothing to do.
+  */
+ void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false ){
+     // no-op in mongod
+ }
}
diff --git a/s/request.h b/s/request.h
index cdc75332486..689216c61c2 100644
--- a/s/request.h
+++ b/s/request.h
@@ -6,6 +6,7 @@
#include "../util/message.h"
#include "../db/dbmessage.h"
#include "config.h"
+#include "util.h"
namespace mongo {
@@ -86,23 +87,6 @@ namespace mongo {
int _clientId;
ClientInfo * _clientInfo;
};
-
- class StaleConfigException : public std::exception {
- public:
- StaleConfigException( const string& ns , const string& msg){
- stringstream s;
- s << "StaleConfigException ns: " << ns << " " << msg;
- _msg = s.str();
- }
-
- virtual ~StaleConfigException() throw(){}
-
- virtual const char* what() const throw(){
- return _msg.c_str();
- }
- private:
- string _msg;
- };
typedef map<int,ClientInfo*> ClientCache;
diff --git a/s/strategy.h b/s/strategy.h
index bdeb4fa4f7c..e4b93b5a155 100644
--- a/s/strategy.h
+++ b/s/strategy.h
@@ -27,8 +27,6 @@ namespace mongo {
extern Strategy * SINGLE;
extern Strategy * SHARDED;
- void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false );
-
bool setShardVersion( DBClientBase & conn , const string& ns , ShardChunkVersion version , bool authoritative , BSONObj& result );
bool lockNamespaceOnServer( const string& server , const string& ns );
diff --git a/s/strategy_shard.cpp b/s/strategy_shard.cpp
index a34324d9ca2..25925180d25 100644
--- a/s/strategy_shard.cpp
+++ b/s/strategy_shard.cpp
@@ -35,13 +35,13 @@ namespace mongo {
num++;
}
- ShardedCursor * cursor = 0;
+ ClusteredCursor * cursor = 0;
BSONObj sort = query.getSort();
if ( sort.isEmpty() ){
// 1. no sort, can just hit them in serial
- cursor = new SerialServerShardedCursor( servers , q );
+ cursor = new SerialServerClusteredCursor( servers , q );
}
else {
int shardKeyOrder = info->getShardKey().canOrder( sort );
@@ -59,21 +59,23 @@ namespace mongo {
}
buckets.insert( ServerAndQuery( s->getShard() , extra , s->getMin() ) );
}
- cursor = new SerialServerShardedCursor( buckets , q , shardKeyOrder );
+ cursor = new SerialServerClusteredCursor( buckets , q , shardKeyOrder );
}
else {
// 3. sort on non-sharded key, pull back a portion from each server and iterate slowly
- cursor = new ParallelSortShardedCursor( servers , q , sort );
+ cursor = new ParallelSortClusteredCursor( servers , q , sort );
}
}
assert( cursor );
- if ( ! cursor->sendNextBatch( r ) ){
+
+ ShardedClientCursor * cc = new ShardedClientCursor( q , cursor );
+ if ( ! cc->sendNextBatch( r ) ){
delete( cursor );
return;
}
- log(6) << "storing cursor : " << cursor->getId() << endl;
- cursorCache.store( cursor );
+ log(6) << "storing cursor : " << cc->getId() << endl;
+ cursorCache.store( cc );
}
virtual void getMore( Request& r ){
@@ -82,7 +84,7 @@ namespace mongo {
log(6) << "want cursor : " << id << endl;
- ShardedCursor * cursor = cursorCache.get( id );
+ ShardedClientCursor * cursor = cursorCache.get( id );
if ( ! cursor ){
log(6) << "\t invalid cursor :(" << endl;
replyToQuery( QueryResult::ResultFlag_CursorNotFound , r.p() , r.m() , 0 , 0 , 0 );
diff --git a/s/util.h b/s/util.h
new file mode 100644
index 00000000000..fb307fbc597
--- /dev/null
+++ b/s/util.h
@@ -0,0 +1,48 @@
+// util.h
+
+/**
+* Copyright (C) 2008 10gen Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "stdafx.h"
+
+/**
+ some generic sharding utils that can be used in mongod or mongos
+ */
+
+namespace mongo {
+
+ /**
+ your config info for a given shard/chunk is out of date */
+ class StaleConfigException : public std::exception {
+ public:
+     /** the message is assembled once, here, and handed out by what() */
+     StaleConfigException( const string& ns , const string& msg)
+         : _msg( "StaleConfigException ns: " + ns + " " + msg ){
+     }
+
+     virtual ~StaleConfigException() throw(){}
+
+     /** @return "StaleConfigException ns: <ns> <msg>"; valid while this object lives */
+     virtual const char* what() const throw(){
+         return _msg.c_str();
+     }
+ private:
+     string _msg;
+ };
+
+ void checkShardVersion( DBClientBase & conn , const string& ns , bool authoritative = false );
+
+}