summaryrefslogtreecommitdiff
path: root/s/strategy.cpp
blob: dc644cb86e76fa759332833e64d052bd6b171bdb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// strategy.cpp

#include "stdafx.h"
#include "request.h"
#include "../client/connpool.h"
#include "../db/commands.h"
#include "shard.h"

namespace mongo {

    // ----- Strategy ------

    void Strategy::doWrite( int op , Request& r , string server ){
        ScopedDbConnection dbcon( server );
        DBClientBase &_c = dbcon.conn();
        
        /* TODO FIX - do not case and call DBClientBase::say() */
        DBClientConnection&c = dynamic_cast<DBClientConnection&>(_c);
        c.port().say( r.m() );
        
        dbcon.done();
    }

    // Forwards the query in request r to `server` and hands the server's
    // response back through r.reply().
    //
    // Before the call, checkShardVersion() is run for the request's namespace
    // on the connection. Any AssertionException raised along the way is turned
    // into an error reply ( "$err" field, ResultFlag_ErrSet ) instead of
    // propagating to the caller.
    void Strategy::doQuery( Request& r , string server ){
        try{
            ScopedDbConnection scoped( server );
            DBClientBase& base = scoped.conn();

            checkShardVersion( base , r.getns() );

            // TODO: This will not work with Paired connections.  Fix.
            DBClientConnection& direct = dynamic_cast<DBClientConnection&>( base );

            Message dbResponse;
            bool callSucceeded = direct.port().call( r.m() , dbResponse );
            uassert( "mongos: error calling db" , callSucceeded );

            r.reply( dbResponse );
            scoped.done();
        }
        catch ( AssertionException& e ) {
            // Prefix the exception text; fall back to a generic message when
            // the exception carries none.
            string errText( "mongos: " );
            if ( e.msg.empty() )
                errText += "assertion during query";
            else
                errText += e.msg;

            BSONObjBuilder errBuilder;
            errBuilder.append( "$err" , errText );
            BSONObj errObj = errBuilder.done();
            replyToQuery( QueryResult::ResultFlag_ErrSet , r.p() , r.m() , errObj );
        }
    }
    
    // Inserts `obj` into namespace `ns` on `server`, using a scoped
    // connection that is marked done() once the insert call returns.
    void Strategy::insert( string server , const char * ns , const BSONObj& obj ){
        ScopedDbConnection scoped( server );

        scoped->insert( ns , obj );

        scoped.done();
    }

    // Per-connection bookkeeping for checkShardVersion(): maps a connection
    // (by address) to the shard manager sequence number recorded after the last
    // successful setShardVersion() on that connection. A connection seen for
    // the first time gets a value-initialized entry (0) via operator[].
    // NOTE(review): no entries are erased and no lock is taken in the code
    // visible here -- confirm how connection reuse/destruction and concurrent
    // callers are handled.
    map<DBClientBase*,unsigned long long> checkShardVersionLastSequence;

    // Makes sure the server behind `conn` has been sent the shard version for
    // namespace `ns`, issuing setShardVersion() when needed.
    //
    // Nothing is done when there is no DBConfig for `ns`, when `ns` is not
    // sharded, or when the sequence number recorded for this connection
    // already equals the shard manager's current one.
    //
    // On failure in non-authoritative mode the whole check is retried once in
    // authoritative mode; a failure in authoritative mode ends in massert().
    //
    //   conn           - connection to check; its address keys the cache
    //   ns             - namespace being accessed
    //   authoritative  - passed to getShardManager() and setShardVersion()
    void checkShardVersion( DBClientBase& conn , const string& ns , bool authoritative ){
        // TODO: cache, optimize, etc...

        DBConfig * dbConfig = grid.getDBConfig( ns );
        if ( ! dbConfig || ! dbConfig->sharded( ns ) )
            return;

        ShardManager * shardManager = dbConfig->getShardManager( ns , authoritative );

        // Reference into the global map, so the assignment below updates the
        // cached entry in place.
        unsigned long long & lastSent = checkShardVersionLastSequence[ &conn ];
        if ( lastSent == shardManager->getSequenceNumber() )
            return;

        ServerShardVersion wanted = shardManager->getVersion( conn.getServerAddress() );

        BSONObj reply;
        bool accepted = setShardVersion( conn , ns , wanted , authoritative , reply );
        if ( accepted ){
            // success!
            lastSent = shardManager->getSequenceNumber();
            return;
        }

        log(1) << "       setShardVersion failed!\n" << reply << endl;

        // The server asking for authoritative mode while we are already in it
        // is treated as a fatal inconsistency.
        if ( reply.getBoolField( "need_authoritative" ) )
            massert( "need_authoritative set but in authoritative mode already" , ! authoritative );

        if ( authoritative ){
            log(1) << "     setShardVersion failed: " << reply << endl;
            massert( "setShardVersion failed!" , 0 );
            return;
        }

        // First attempt was non-authoritative: retry authoritatively.
        checkShardVersion( conn , ns , true );
    }
    
    // Builds a "setShardVersion" command for `ns` and runs it against the
    // "admin" database on `conn`.
    //
    // The command carries, in order: the namespace, the config server's
    // modelServer() value as "configdb", `version` as a timestamp, and an
    // "authoritative" flag only when `authoritative` is set.
    //
    //   result  - receives the command's reply object
    //   returns - whatever conn.runCommand() returns for the command
    bool setShardVersion( DBClientBase & conn , const string& ns , ServerShardVersion version , bool authoritative , BSONObj& result ){
        BSONObjBuilder b;
        b.append( "setShardVersion" , ns.c_str() );
        b.append( "configdb" , configServer.modelServer() );
        b.appendTimestamp( "version" , version );
        if ( authoritative )
            b.appendBool( "authoritative" , 1 );
        BSONObj command = b.obj();

        log(1) << "    setShardVersion  " << conn.getServerAddress()
               << "  " << ns
               << "  " << command
               << " " << &conn << endl;

        bool commandOk = conn.runCommand( "admin" , command , result );
        return commandOk;
    }
    
}