summaryrefslogtreecommitdiff
path: root/client/parallel.h
blob: a2189c6c74512ee2796ced8e2888058cd429e1f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// parallel.h

/*    Copyright 2009 10gen Inc.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

/**
   tools for wokring in parallel/sharded/clustered environment
 */

#include "stdafx.h"
#include "dbclient.h"
#include "../db/dbmessage.h"

namespace mongo {

    class ClusteredCursor {
    public:
        ClusteredCursor( QueryMessage& q );
        ClusteredCursor( const string& ns , const BSONObj& q , int options=0 , const BSONObj& fields=BSONObj() );
        virtual ~ClusteredCursor();

        virtual bool more() = 0;
        virtual BSONObj next() = 0;
        
        static BSONObj concatQuery( const BSONObj& query , const BSONObj& extraFilter );
        
    protected:
        auto_ptr<DBClientCursor> query( const string& server , int num = 0 , BSONObj extraFilter = BSONObj() );

        static BSONObj _concatFilter( const BSONObj& filter , const BSONObj& extraFilter );
        
        string _ns;
        BSONObj _query;
        int _options;
        BSONObj _fields;

        bool _done;
    };


    class ServerAndQuery {
    public:
        ServerAndQuery( const string& server , BSONObj extra = BSONObj() , BSONObj orderObject = BSONObj() ) : 
            _server( server ) , _extra( extra.getOwned() ) , _orderObject( orderObject.getOwned() ){
        }

        bool operator<( const ServerAndQuery& other ) const{
            if ( ! _orderObject.isEmpty() )
                return _orderObject.woCompare( other._orderObject ) < 0;
            
            if ( _server < other._server )
                return true;
            if ( other._server > _server )
                return false;
            return _extra.woCompare( other._extra ) < 0;
        }

        string _server;
        BSONObj _extra;
        BSONObj _orderObject;
    };

    class SerialServerClusteredCursor : public ClusteredCursor {
    public:
        SerialServerClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , int sortOrder=0);
        virtual bool more();
        virtual BSONObj next();
    private:
        vector<ServerAndQuery> _servers;
        unsigned _serverIndex;
        
        auto_ptr<DBClientCursor> _current;
    };
        
    class ParallelSortClusteredCursor : public ClusteredCursor {
    public:
        ParallelSortClusteredCursor( set<ServerAndQuery> servers , QueryMessage& q , const BSONObj& sortKey );
        ParallelSortClusteredCursor( set<ServerAndQuery> servers , const string& ns , 
                                     const Query& q , int options=0, const BSONObj& fields=BSONObj() );
        virtual ~ParallelSortClusteredCursor();
        virtual bool more();
        virtual BSONObj next();
    private:
        void _init();
        
        void advance();

        int _numServers;
        set<ServerAndQuery> _servers;
        BSONObj _sortKey;

        auto_ptr<DBClientCursor> * _cursors;
        BSONObj * _nexts;
    };

}