path: root/src/mongo/db/repl/consensus.cpp
/**
*    Copyright (C) 2010 10gen Inc.
*
*    This program is free software: you can redistribute it and/or  modify
*    it under the terms of the GNU Affero General Public License, version 3,
*    as published by the Free Software Foundation.
*
*    This program is distributed in the hope that it will be useful,
*    but WITHOUT ANY WARRANTY; without even the implied warranty of
*    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
*    GNU Affero General Public License for more details.
*
*    You should have received a copy of the GNU Affero General Public License
*    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "pch.h"
#include "../commands.h"
#include "rs.h"
#include "multicmd.h"

namespace mongo {

    /** replSetFresh is the first command called by a node seeking election; it is
        a basic sanity test: do any of the nodes it can reach know a reason it
        can't become primary?
        */
    class CmdReplSetFresh : public ReplSetCommand {
    public:
        CmdReplSetFresh() : ReplSetCommand("replSetFresh") { }
    private:

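        /* return true if we should veto the hopeful member's candidacy, filling
           errmsg with the reason.  older nodes omit "id" and do not look at the
           veto field, so we never veto them. */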
        bool shouldVeto(const BSONObj& cmdObj, string& errmsg) {
            // don't veto older versions
            if (cmdObj["id"].eoo()) {
                // they won't be looking for the veto field
                return false;
            }

            unsigned id = cmdObj["id"].Int();
            const Member* primary = theReplSet->box.getPrimary();
            const Member* hopeful = theReplSet->findById(id);
            const Member *highestPriority = theReplSet->getMostElectable();

            if( !hopeful ) {
                errmsg = str::stream() << "replSet couldn't find member with id " << id;
                return true;
            }
            else if( theReplSet->isPrimary() && theReplSet->lastOpTimeWritten >= hopeful->hbinfo().opTime ) {
                // hbinfo is not updated, so we have to check the primary's last optime separately
                errmsg = str::stream() << "I am already primary, " << hopeful->fullName() <<
                    " can try again once I've stepped down";
                return true;
            }
            else if( primary && primary->hbinfo().opTime >= hopeful->hbinfo().opTime ) {
                // other members might be aware of more up-to-date nodes
                errmsg = str::stream() << hopeful->fullName() << " is trying to elect itself but " <<
                    primary->fullName() << " is already primary and more up-to-date";
                return true;
            }
            else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
                errmsg = str::stream() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName();
                return true;
            }

            // priority was already compared above, so only electability
            // remains to be checked here
            if ( !theReplSet->isElectable(id) ) {
                return true;
            }

            return false;
        }

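        /* expected command shape (built by Consensus::weAreFreshest() below):
             { replSetFresh: 1, set: <name>, opTime: <Date>, who: <host:port>,
               cfgver: <int>, id: <memberId> }
           the reply carries "opTime", "fresher", and "veto" (and sometimes
           "errmsg"), which weAreFreshest() inspects. */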
        virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
            if( !check(errmsg, result) )
                return false;

            if( cmdObj["set"].String() != theReplSet->name() ) {
                errmsg = "wrong repl set name";
                return false;
            }
            string who = cmdObj["who"].String();
            int cfgver = cmdObj["cfgver"].Int();
            OpTime opTime(cmdObj["opTime"].Date());

            bool weAreFresher = false;
            if( theReplSet->config().version > cfgver ) {
                log() << "replSet member " << who << " is not yet aware its cfg version " << cfgver << " is stale" << rsLog;
                result.append("info", "config version stale");
                weAreFresher = true;
            }
            // check not only our own optime, but any other member we can reach
            else if( opTime < theReplSet->lastOpTimeWritten ||
                     opTime < theReplSet->lastOtherOpTime())  {
                weAreFresher = true;
            }
            result.appendDate("opTime", theReplSet->lastOpTimeWritten.asDate());
            result.append("fresher", weAreFresher);
            result.append("veto", shouldVeto(cmdObj, errmsg));

            return true;
        }
    } cmdReplSetFresh;

    class CmdReplSetElect : public ReplSetCommand {
    public:
        CmdReplSetElect() : ReplSetCommand("replSetElect") { }
    private:
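        /* expected command shape (built by Consensus::_electSelf() below):
             { replSetElect: 1, set: <name>, who: <host:port>, whoid: <id>,
               cfgver: <int>, round: <OID> }
           the reply carries "vote" and "round"; see Consensus::electCmdReceived(). */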
        virtual bool run(const string& , BSONObj& cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
            if( !check(errmsg, result) )
                return false;
            theReplSet->elect.electCmdReceived(cmdObj, &result);
            return true;
        }
    } cmdReplSetElect;

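    /* election arithmetic used throughout this file: a candidate needs a strict
       majority of the configured votes, checked as tally*2 > totalVotes().  for
       example, three members with one vote each give totalVotes() == 3, so a
       candidate needs at least two yeas (2*2 > 3).  with an even vote total a
       partition can leave no side holding a majority, hence the warning below. */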
    int Consensus::totalVotes() const {
        static int complain = 0;
        int vTot = rs._self->config().votes;
        for( Member *m = rs.head(); m; m=m->next() )
            vTot += m->config().votes;
        if( vTot % 2 == 0 && vTot && complain++ == 0 )
            log() << "replSet " /*buildbot! warning */ "total number of votes is even - add arbiter or give one member an extra vote" << rsLog;
        return vTot;
    }

    bool Consensus::aMajoritySeemsToBeUp() const {
        int vUp = rs._self->config().votes;
        for( Member *m = rs.head(); m; m=m->next() )
            vUp += m->hbinfo().up() ? m->config().votes : 0;
        return vUp * 2 > totalVotes();
    }

    bool Consensus::shouldRelinquish() const {
        int vUp = rs._self->config().votes;
        const long long T = rs.config().ho.heartbeatTimeoutMillis * rs.config().ho.heartbeatConnRetries;
        for( Member *m = rs.head(); m; m=m->next() ) {
            long long dt = m->hbinfo().timeDown();
            if( dt < T )
                vUp += m->config().votes;
        }

        // the manager will handle calling stepdown if another node should be
        // primary due to priority

        return !( vUp * 2 > totalVotes() );
    }

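    /* a veto is a huge negative vote: a single VETO drives a candidate's tally
       far below zero, so no combination of yea votes can reach a majority in
       that election round. */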
    static const int VETO = -10000;

    const time_t LeaseTime = 30;   // seconds; how long a yea vote binds us to one candidate

    SimpleMutex Consensus::lyMutex("ly");

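    /* cast our votes for the given member and start a LeaseTime lease.  while
       the lease is active we refuse to vote for any other member, so we cannot
       hand out our votes twice in overlapping elections. */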
    unsigned Consensus::yea(unsigned memberId) { /* throws VoteException */
        SimpleMutex::scoped_lock lk(lyMutex);
        LastYea &L = this->ly.ref(lk);
        time_t now = time(0);
        if( L.when + LeaseTime >= now && L.who != memberId ) {
            LOG(1) << "replSet not voting yea for " << memberId <<
                   " voted for " << L.who << ' ' << now-L.when << " secs ago" << rsLog;
            throw VoteException();
        }
        L.when = now;
        L.who = memberId;
        return rs._self->config().votes;
    }

    /* we vote for ourself at start of election.  once it fails, we can cancel the lease we had in
       place instead of leaving it for a long time.
       */
    void Consensus::electionFailed(unsigned meid) {
        SimpleMutex::scoped_lock lk(lyMutex);
        LastYea &L = ly.ref(lk);
        DEV verify( L.who == meid ); // this may not always hold, so be aware; adding for now as a quick sanity test
        if( L.who == meid )
            L.when = 0;
    }

    /* todo: threading **************** !!!!!!!!!!!!!!!! */
    void Consensus::electCmdReceived(BSONObj cmd, BSONObjBuilder* _b) {
        BSONObjBuilder& b = *_b;
        DEV log() << "replSet received elect msg " << cmd.toString() << rsLog;
        else LOG(2) << "replSet received elect msg " << cmd.toString() << rsLog;
        string set = cmd["set"].String();
        unsigned whoid = cmd["whoid"].Int();
        int cfgver = cmd["cfgver"].Int();
        OID round = cmd["round"].OID();
        int myver = rs.config().version;

        const Member* primary = rs.box.getPrimary();
        const Member* hopeful = rs.findById(whoid);
        const Member* highestPriority = rs.getMostElectable();

        int vote = 0;
        if( set != rs.name() ) {
            log() << "replSet error received an elect request for '" << set << "' but our set name is '" << rs.name() << "'" << rsLog;
        }
        else if( myver < cfgver ) {
            // we are stale.  don't vote
        }
        else if( myver > cfgver ) {
            // they are stale!
            log() << "replSet electCmdReceived info got stale version # during election" << rsLog;
            vote = VETO;
        }
        else if( !hopeful ) {
            log() << "replSet electCmdReceived couldn't find member with id " << whoid << rsLog;
            vote = VETO;
        }
        else if( primary && primary == rs._self) {
            log() << "I am already primary, " << hopeful->fullName()
                  << " can try again once I've stepped down" << rsLog;
            vote = VETO;
        }
        else if (primary) {
            log() << hopeful->fullName() << " is trying to elect itself but " <<
                  primary->fullName() << " is already primary" << rsLog;
            vote = VETO;
        }
        else if( highestPriority && highestPriority->config().priority > hopeful->config().priority) {
            log() << hopeful->fullName() << " has lower priority than " << highestPriority->fullName() << rsLog;
            vote = VETO;
        }
        else {
            try {
                vote = yea(whoid);
                dassert( hopeful->id() == whoid );
                rs.relinquish();
                log() << "replSet info voting yea for " <<  hopeful->fullName() << " (" << whoid << ')' << rsLog;
            }
            catch(VoteException&) {
                log() << "replSet voting no for " << hopeful->fullName() << " already voted for another" << rsLog;
            }
        }

        b.append("vote", vote);
        b.append("round", round);
    }

    void ReplSetImpl::_getTargets(list<Target>& L, int& configVersion) {
        configVersion = config().version;
        for( Member *m = head(); m; m=m->next() )
            if( m->hbinfo().maybeUp() )
                L.push_back( Target(m->fullName()) );
    }

    /* the config version is returned because it is ok to call this unlocked; BUT if
       you do, you must check later that the config didn't change. */
    void ReplSetImpl::getTargets(list<Target>& L, int& configVersion) {
        if( lockedByMe() ) {
            _getTargets(L, configVersion);
            return;
        }
        lock lk(this);
        _getTargets(L, configVersion);
    }

    /* Do we have the newest data of them all?
       @param allUp - set to true if all members are up; only meaningful when true is returned.
       @return true if we are freshest.  Note we may tie.
    */
    bool Consensus::weAreFreshest(bool& allUp, int& nTies) {
        const OpTime ord = theReplSet->lastOpTimeWritten;
        nTies = 0;
        verify( !ord.isNull() );
        BSONObj cmd = BSON(
                          "replSetFresh" << 1 <<
                          "set" << rs.name() <<
                          "opTime" << Date_t(ord.asDate()) <<
                          "who" << rs._self->fullName() <<
                          "cfgver" << rs._cfg->version <<
                          "id" << rs._self->id());
        list<Target> L;
        int ver;
        /* the following queries arbiters even though they are never fresh, which is
           questionable.  it could make sense if arbiters one day "knew" about freshness,
           so consider removing arbiters from getTargets() here.  getTargets() is also
           used for the election itself, where arbiters certainly are targets - so an
           "includeArbs" bool would be needed if we wanted to skip fetching them here.
           */
        rs.getTargets(L, ver);
        multiCommand(cmd, L);
        int nok = 0;
        allUp = true;
        for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
            if( i->ok ) {
                nok++;
                if( i->result["fresher"].trueValue() ) {
                    log() << "not electing self, we are not freshest" << rsLog;
                    return false;
                }
                OpTime remoteOrd( i->result["opTime"].Date() );
                if( remoteOrd == ord )
                    nTies++;
                verify( remoteOrd <= ord );

                if( i->result["veto"].trueValue() ) {
                    BSONElement msg = i->result["errmsg"];
                    if (!msg.eoo()) {
                        log() << "not electing self, " << i->toHost << " would veto with '" <<
                            msg.String() << "'" << rsLog;
                    }
                    else {
                        log() << "not electing self, " << i->toHost << " would veto" << rsLog;
                    }
                    return false;
                }
            }
            else {
                DEV log() << "replSet freshest returns " << i->result.toString() << rsLog;
                allUp = false;
            }
        }
        LOG(1) << "replSet dev we are freshest of up nodes, nok:" << nok << " nTies:" << nTies << rsLog;
        verify( ord <= theReplSet->lastOpTimeWritten ); // <= as this may change while we are working...
        return true;
    }

    extern time_t started;

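    /* send the command to every target (see multicmd.h).  we must not hold the
       replset lock while making remote calls, hence the assertion. */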
    void Consensus::multiCommand(BSONObj cmd, list<Target>& L) {
        verify( !rs.lockedByMe() );
        mongo::multiCommand(cmd, L);
    }

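    /* one election attempt, in outline:
       1) bail out if we recently stepped down or have no data yet;
       2) ask every reachable member via replSetFresh (weAreFreshest);
       3) on an optime tie, randomly sleep and retry so votes don't collide;
       4) vote yea for ourselves, broadcast replSetElect, and tally the replies;
       5) assume primary only on a strict majority, within 30 seconds, and with
          an unchanged config version. */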
    void Consensus::_electSelf() {
        if( time(0) < steppedDown )
            return;

        {
            const OpTime ord = theReplSet->lastOpTimeWritten;
            if( ord == 0 ) {
                log() << "replSet info not trying to elect self, do not yet have a complete set of data from any point in time" << rsLog;
                return;
            }
        }

        bool allUp;
        int nTies;
        if( !weAreFreshest(allUp, nTies) ) {
            return;
        }

        rs.sethbmsg("",9);

        if( !allUp && time(0) - started < 60 * 5 ) {
            /* the idea here is that if a bunch of nodes bounce all at once, we don't want to drop data
               if we don't have to -- we'd rather be offline and wait a little longer instead
               todo: make this configurable.
               */
            rs.sethbmsg("not electing self, not all members up and we have been up less than 5 minutes");
            return;
        }

        Member& me = *rs._self;

        if( nTies ) {
            /* tie?  we then randomly sleep to try to not collide on our voting. */
            /* todo: smarter. */
            if( me.id() == 0 || sleptLast ) {
                // would be fine for one node not to sleep
                // todo: biggest / highest priority nodes should be the ones that get to not sleep
            }
            else {
                verify( !rs.lockedByMe() ); // bad to go to sleep locked
                unsigned ms = ((unsigned) rand()) % 1000 + 50;
                DEV log() << "replSet tie " << nTies << " sleeping a little " << ms << "ms" << rsLog;
                sleptLast = true;
                sleepmillis(ms);
                throw RetryAfterSleepException();
            }
        }
        sleptLast = false;

        time_t start = time(0);
        unsigned meid = me.id();
        int tally = yea( meid );
        bool success = false;
        try {
            log() << "replSet info electSelf " << meid << rsLog;

            BSONObj electCmd = BSON(
                                   "replSetElect" << 1 <<
                                   "set" << rs.name() <<
                                   "who" << me.fullName() <<
                                   "whoid" << me.hbinfo().id() <<
                                   "cfgver" << rs._cfg->version <<
                                   "round" << OID::gen() /* this is just for diagnostics */
                               );

            int configVersion;
            list<Target> L;
            rs.getTargets(L, configVersion);
            multiCommand(electCmd, L);

            {
                for( list<Target>::iterator i = L.begin(); i != L.end(); i++ ) {
                    DEV log() << "replSet elect res: " << i->result.toString() << rsLog;
                    if( i->ok ) {
                        int v = i->result["vote"].Int();
                        tally += v;
                    }
                }
                if( tally*2 <= totalVotes() ) {
                    log() << "replSet couldn't elect self, only received " << tally << " votes" << rsLog;
                }
                else if( time(0) - start > 30 ) {
                    // defensive; should never happen as we have timeouts on connection and operation for our conn
                    log() << "replSet too much time passed during our election, ignoring result" << rsLog;
                }
                else if( configVersion != rs.config().version ) {
                    log() << "replSet config version changed during our election, ignoring result" << rsLog;
                }
                else {
                    /* succeeded. */
                    LOG(1) << "replSet election succeeded, assuming primary role" << rsLog;
                    success = true;
                    rs.assumePrimary();
                }
            }
        }
        catch( std::exception& ) {
            if( !success ) electionFailed(meid);
            throw;
        }
        if( !success ) electionFailed(meid);
    }

    void Consensus::electSelf() {
        verify( !rs.lockedByMe() );
        verify( !rs.myConfig().arbiterOnly );
        verify( rs.myConfig().slaveDelay == 0 );
        try {
            _electSelf();
        }
        catch(RetryAfterSleepException&) {
            // rethrow so the caller can retry the election after the sleep
            throw;
        }
        catch(VoteException& ) {
            log() << "replSet not trying to elect self as responded yea to someone else recently" << rsLog;
        }
        catch(DBException& e) {
            log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() << rsLog;
        }
        catch(...) {
            log() << "replSet warning caught unexpected exception in electSelf()" << rsLog;
        }
    }

} // namespace mongo