summaryrefslogtreecommitdiff
path: root/src/mongo/client/dbclient_rs.h
blob: 2b91eb945b3fa2900472f53219c5d70b9648554b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
/** @file dbclient_rs.h Connect to a Replica Set, from C++ */

/*    Copyright 2009 10gen Inc.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */

#pragma once

#include "mongo/pch.h"

#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <set>
#include <utility>

#include "mongo/client/dbclientinterface.h"
#include "mongo/util/net/hostandport.h"

namespace mongo {

    class ReplicaSetMonitor;
    class TagSet;
    struct ReadPreferenceSetting;
    typedef shared_ptr<ReplicaSetMonitor> ReplicaSetMonitorPtr;
    typedef pair<set<string>,set<int> > NodeDiff;

    /**
     * manages state about a replica set for client
     * keeps tabs on whose master and what slaves are up
     * can hand a slave to someone for SLAVE_OK
     * one instance per process per replica set
     * TODO: we might be able to use a regular Node * to avoid _lock
     */
    class ReplicaSetMonitor {
    public:
        typedef boost::function1<void,const ReplicaSetMonitor*> ConfigChangeHook;

        /**
         * Data structure for keeping track of the states of individual replica
         * members. This class is not thread-safe so proper measures should be taken
         * when sharing this object across multiple threads.
         *
         * Note: these get copied around in the nodes vector so be sure to maintain
         * copyable semantics here
         */
        struct Node {
            Node( const HostAndPort& a , DBClientConnection* c ) :
                addr( a ),
                conn(c),
                ok( c != NULL ),
                ismaster(false),
                secondary( false ),
                hidden( false ),
                pingTimeMillis( 0 ) {
            }

            bool okForSecondaryQueries() const {
                return ok && secondary && ! hidden;
            }

            /**
             * Checks if the given tag matches the tag attached to this node.
             *
             * Example:
             *
             * Tag of this node: { "dc": "nyc", "region": "na", "rack": "4" }
             *
             * match: {}
             * match: { "dc": "nyc", "rack": 4 }
             * match: { "region": "na", "dc": "nyc" }
             * not match: { "dc: "nyc", "rack": 2 }
             * not match: { "dc": "sf" }
             *
             * @param tag the tag to use to compare. Should not contain any
             *     embedded documents.
             *
             * @return true if the given tag matches the this node's tag
             *     specification
             */
            bool matchesTag(const BSONObj& tag) const;

            /**
             * @param  threshold  max ping time (in ms) to be considered local
             * @return true if node is a local secondary, and can handle queries
             **/
            bool isLocalSecondary( const int threshold ) const {
                return pingTimeMillis < threshold;
            }

            /**
             * Checks whether this nodes is compatible with the given readPreference and
             * tag. Compatibility check is strict in the sense that secondary preferred
             * is treated like secondary only and primary preferred is treated like
             * primary only.
             *
             * @return true if this node is compatible with the read preference and tags.
             */
            bool isCompatible(ReadPreference readPreference, const TagSet* tag) const;

            BSONObj toBSON() const;

            string toString() const {
                return toBSON().toString();
            }

            HostAndPort addr;
            boost::shared_ptr<DBClientConnection> conn;

            // if this node is in a failure state
            // used for slave routing
            // this is too simple, should make it better
            bool ok;

            // as reported by ismaster
            BSONObj lastIsMaster;

            bool ismaster;
            bool secondary;
            bool hidden;

            int pingTimeMillis;

        };

        static const double SOCKET_TIMEOUT_SECS;

        /**
         * Selects the right node given the nodes to pick from and the preference.
         *
         * @param nodes the nodes to select from
         * @param preference the read mode to use
         * @param tags the tags used for filtering nodes
         * @param localThresholdMillis the exclusive upper bound of ping time to be
         *     considered as a local node. Local nodes are favored over non-local
         *     nodes if multiple nodes matches the other criteria.
         * @param lastHost the host used in the last successful request. This is used for
         *     selecting a different node as much as possible, by doing a simple round
         *     robin, starting from the node next to this lastHost. This will be overwritten
         *     with the newly chosen host if not empty, not primary and when preference
         *     is not Nearest.
         * @param isPrimarySelected out parameter that is set to true if the returned host
         *     is a primary. Cannot be NULL and valid only if returned host is not empty.
         *
         * @return the host object of the node selected. If none of the nodes are
         *     eligible, returns an empty host.
         */
        static HostAndPort selectNode(const std::vector<Node>& nodes,
                                      ReadPreference preference,
                                      TagSet* tags,
                                      int localThresholdMillis,
                                      HostAndPort* lastHost,
                                      bool* isPrimarySelected);

        /**
         * Selects the right node given the nodes to pick from and the preference. This
         * will also attempt to refresh the local view of the replica set configuration
         * if the primary node needs to be returned but is not currently available (except
         * for ReadPrefrence_Nearest).
         *
         * @param preference the read mode to use.
         * @param tags the tags used for filtering nodes.
         * @param isPrimarySelected out parameter that is set to true if the returned host
         *     is a primary. Cannot be NULL and valid only if returned host is not empty.
         *
         * @return the host object of the node selected. If none of the nodes are
         *     eligible, returns an empty host.
         */
        HostAndPort selectAndCheckNode(ReadPreference preference,
                                       TagSet* tags,
                                       bool* isPrimarySelected);

        /**
         * Creates a new ReplicaSetMonitor, if it doesn't already exist.
         */
        static void createIfNeeded( const string& name , const vector<HostAndPort>& servers );

        /**
         * gets a cached Monitor per name. If the monitor is not found and createFromSeed is false,
         * it will return none. If createFromSeed is true, it will try to look up the last known
         * servers list for this set and will create a new monitor using that as the seed list.
         */
        static ReplicaSetMonitorPtr get( const string& name, const bool createFromSeed = false );

        /**
         * Populates activeSets with all the currently tracked replica set names.
         */
        static void getAllTrackedSets(set<string>* activeSets);

        /**
         * checks all sets for current master and new secondaries
         * usually only called from a BackgroundJob
         */
        static void checkAll();

        /**
         * Removes the ReplicaSetMonitor for the given set name from _sets, which will delete it.
         * If clearSeedCache is true, then the cached seed string for this Replica Set will be removed
         * from _setServers.
         */
        static void remove( const string& name, bool clearSeedCache = false );

        static int getMaxFailedChecks() { return _maxFailedChecks; };
        static void setMaxFailedChecks(int numChecks) { _maxFailedChecks = numChecks; };

        /**
         * this is called whenever the config of any replica set changes
         * currently only 1 globally
         * asserts if one already exists
         * ownership passes to ReplicaSetMonitor and the hook will actually never be deleted
         */
        static void setConfigChangeHook( ConfigChangeHook hook );

        ~ReplicaSetMonitor();

        /** @return HostAndPort or throws an exception */
        HostAndPort getMaster();

        /**
         * notify the monitor that server has faild
         */
        void notifyFailure( const HostAndPort& server );

        /**
         * @deprecated use #getCandidateNode instead
         * @return prev if its still ok, and if not returns a random slave that is ok for reads
         */
        HostAndPort getSlave( const HostAndPort& prev );

        /**
         * @param  preferLocal  Prefer a local secondary, otherwise pick any
         *                      secondary, or fall back to primary
         * @return a random slave that is ok for reads
         */
        HostAndPort getSlave( bool preferLocal = true );

        /**
         * notify the monitor that server has faild
         */
        void notifySlaveFailure( const HostAndPort& server );

        /**
         * checks for current master and new secondaries
         */
        void check();

        string getName() const { return _name; }

        string getServerAddress() const;

        bool contains( const string& server ) const;
        
        void appendInfo( BSONObjBuilder& b ) const;

        /**
         * Set the threshold value (in ms) for a node to be considered local.
         * NOTE:  This function acquires the _lock mutex.
         **/
        void setLocalThresholdMillis( const int millis );

        /**
         * @return true if the host is compatible with the given readPreference and tag set.
         */
        bool isHostCompatible(const HostAndPort& host, ReadPreference readPreference,
                const TagSet* tagSet) const;

        /**
         * Performs a quick check if at least one node is up based on the cached
         * view of the set.
         *
         * @return true if any node is ok
         */
        bool isAnyNodeOk() const;

    private:
        /**
         * This populates a list of hosts from the list of seeds (discarding the
         * seed list). Should only be called from within _setsLock.
         * @param name set name
         * @param servers seeds
         */
        ReplicaSetMonitor( const string& name , const vector<HostAndPort>& servers );

        static void _remove_inlock( const string& name, bool clearSeedCache = false );

        /**
         * Checks all connections from the host list and sets the current
         * master.
         */
        void _check();

        /**
         * Add array of hosts to host list. Doesn't do anything if hosts are
         * already in host list.
         * @param hostList the list of hosts to add
         * @param changed if new hosts were added
         */
        void _checkHosts(const BSONObj& hostList, bool& changed);

        /**
         * Updates host list.
         * Invariant: if nodesOffset is >= 0, _nodes[nodesOffset].conn should be
         *  equal to conn.
         *
         * @param conn the connection to check
         * @param maybePrimary OUT
         * @param verbose
         * @param nodesOffset - offset into _nodes array, -1 for not in it
         *
         * @return true if the connection is good or false if invariant
         *   is broken
         */
        bool _checkConnection( DBClientConnection* conn, string& maybePrimary,
                bool verbose, int nodesOffset );

        /**
         * Save the seed list for the current set into the _setServers map
         * Should only be called if you're already holding _setsLock and this
         * monitor's _lock.
         */
        void _cacheServerAddresses_inlock();

        string _getServerAddress_inlock() const;

        NodeDiff _getHostDiff_inlock( const BSONObj& hostList );
        bool _shouldChangeHosts( const BSONObj& hostList, bool inlock );

        /**
         * @return the index to _nodes corresponding to the server address.
         */
        int _find( const string& server ) const ;
        int _find_inlock( const string& server ) const ;

        /**
         * Checks whether the given connection matches the connection stored in _nodes.
         * Mainly used for sanity checking to confirm that nodeOffset still
         * refers to the right connection after releasing and reacquiring
         * a mutex.
         */
        bool _checkConnMatch_inlock( DBClientConnection* conn, size_t nodeOffset ) const;

        /**
         * Populates the local view of the set using the list of servers.
         *
         * Invariants:
         * 1. Should be called while holding _setsLock and while not holding _lock since
         *    this calls #_checkConnection, which locks _checkConnectionLock
         * 2. _nodes should be empty before this is called
         */
        void _populateHosts_inSetsLock(const std::vector<HostAndPort>& seedList);

        // protects _localThresholdMillis, _nodes and refs to _nodes
        // (eg. _master & _lastReadPrefHost)
        mutable mongo::mutex _lock;

        /**
         * "Synchronizes" the _checkConnection method. Should ideally be one mutex per
         * connection object being used. The purpose of this lock is to make sure that
         * the reply from the connection the lock holder got is the actual response
         * to what it sent.
         *
         * Deadlock WARNING: never acquire this while holding _lock
         */
        mutable mongo::mutex  _checkConnectionLock;

        string _name;

        /**
         * Host list.
         */
        std::vector<Node> _nodes;
        int _master; // which node is the current master.  -1 means no master is known
        int _nextSlave; // which node is the current slave, only used by the deprecated getSlave

        // last host returned by _selectNode, used for round robin selection
        HostAndPort _lastReadPrefHost;

        // The number of consecutive times the set has been checked and every member in the set was down.
        int _failedChecks;

        static mongo::mutex _setsLock; // protects _sets and _setServers
        static map<string,ReplicaSetMonitorPtr> _sets; // set name to Monitor
        static map<string,vector<HostAndPort> > _seedServers; // set name to seed list. Used to rebuild the monitor if it is cleaned up but then the set is accessed again.

        static ConfigChangeHook _hook;
        int _localThresholdMillis; // local ping latency threshold (protected by _lock)

        static int _maxFailedChecks;
    };

    /** Use this class to connect to a replica set of servers.  The class will manage
       checking for which server in a replica set is master, and do failover automatically.

       This can also be used to connect to replica pairs since pairs are a subset of sets

       On a failover situation, expect at least one operation to return an error (throw
       an exception) before the failover is complete.  Operations are not retried.
    */
    class DBClientReplicaSet : public DBClientBase {
    public:
        using DBClientBase::query;
        using DBClientBase::update;
        using DBClientBase::remove;

        /** Call connect() after constructing. autoReconnect is always on for DBClientReplicaSet connections. */
        DBClientReplicaSet( const string& name , const vector<HostAndPort>& servers, double so_timeout=0 );
        virtual ~DBClientReplicaSet();

        /**
         * Returns false if no member of the set were reachable. This object
         * can still be used even when false was returned as it will try to
         * reconnect when you use it later.
         */
        bool connect();

        /**
         * Logs out the connection for the given database.
         *
         * @param dbname the database to logout from.
         * @param info the result object for the logout command (provided for backwards
         *     compatibility with mongo shell)
         */
        virtual void logout(const string& dbname, BSONObj& info);

        // ----------- simple functions --------------

        /** throws userassertion "no master found" */
        virtual auto_ptr<DBClientCursor> query(const string &ns, Query query, int nToReturn = 0, int nToSkip = 0,
                                               const BSONObj *fieldsToReturn = 0, int queryOptions = 0 , int batchSize = 0 );

        /** throws userassertion "no master found" */
        virtual BSONObj findOne(const string &ns, const Query& query, const BSONObj *fieldsToReturn = 0, int queryOptions = 0);

        virtual void insert( const string &ns , BSONObj obj , int flags=0);

        /** insert multiple objects.  Note that single object insert is asynchronous, so this version
            is only nominally faster and not worth a special effort to try to use.  */
        virtual void insert( const string &ns, const vector< BSONObj >& v , int flags=0);

        virtual void remove( const string &ns , Query obj , int flags );

        virtual void update( const string &ns , Query query , BSONObj obj , int flags );

        virtual void killCursor( long long cursorID );

        // ---- access raw connections ----

        /**
         * WARNING: this method is very dangerous - this object can decide to free the
         *     returned master connection any time.
         *
         * @return the reference to the address that points to the master connection.
         */
        DBClientConnection& masterConn();

        /**
         * WARNING: this method is very dangerous - this object can decide to free the
         *     returned master connection any time. This can also unpin the cached
         *     slaveOk/read preference connection.
         *
         * @return the reference to the address that points to a secondary connection.
         */
        DBClientConnection& slaveConn();

        // ---- callback pieces -------

        virtual void say( Message &toSend, bool isRetry = false , string* actualServer = 0);
        virtual bool recv( Message &toRecv );
        virtual void checkResponse( const char* data, int nReturned, bool* retry = NULL, string* targetHost = NULL );

        /* this is the callback from our underlying connections to notify us that we got a "not master" error.
         */
        void isntMaster();

        /* this is used to indicate we got a "not master or secondary" error from a secondary.
         */
        void isntSecondary();

        // ----- status ------

        virtual bool isFailed() const { return ! _master || _master->isFailed(); }

        // ----- informational ----

        double getSoTimeout() const { return _so_timeout; }

        string toString() { return getServerAddress(); }

        string getServerAddress() const;

        virtual ConnectionString::ConnectionType type() const { return ConnectionString::SET; }
        virtual bool lazySupported() const { return true; }

        // ---- low level ------

        virtual bool call( Message &toSend, Message &response, bool assertOk=true , string * actualServer = 0 );
        virtual bool callRead( Message& toSend , Message& response ) { return checkMaster()->callRead( toSend , response ); }


    protected:
        /** Authorize.  Authorizes all nodes as needed
        */
        virtual void _auth(const BSONObj& params);

        virtual void sayPiggyBack( Message &toSend ) { checkMaster()->say( toSend ); }

    private:
        /**
         * Used to simplify slave-handling logic on errors
         *
         * @return back the passed cursor
         * @throws DBException if the directed node cannot accept the query because it
         *     is not a master
         */
        auto_ptr<DBClientCursor> checkSlaveQueryResult( auto_ptr<DBClientCursor> result );

        DBClientConnection * checkMaster();

        /**
         * Helper method for selecting a node based on the read preference. Will advance
         * the tag tags object if it cannot find a node that matches the current tag.
         *
         * @param readPref the preference to use for selecting a node.
         *
         * @return a pointer to the new connection object if it can find a good connection.
         *     Otherwise it returns NULL.
         *
         * @throws DBException when an error occurred either when trying to connect to
         *     a node that was thought to be ok or when an assertion happened.
         */
        DBClientConnection* selectNodeUsingTags(shared_ptr<ReadPreferenceSetting> readPref);

        /**
         * @return true if the last host used in the last slaveOk query is still in the
         * set and can be used for the given read preference.
         */
        bool checkLastHost(const ReadPreferenceSetting* readPref);

        /**
         * Destroys all cached information about the last slaveOk operation.
         */
        void invalidateLastSlaveOkCache();

        void _auth( DBClientConnection * conn );

        /**
         * Maximum number of retries to make for auto-retry logic when performing a slave ok
         * operation.
         */
        static const size_t MAX_RETRY;

        // Throws a DBException if the monitor doesn't exist and there isn't a cached seed to use.
        ReplicaSetMonitorPtr _getMonitor() const;

        string _setName;

        HostAndPort _masterHost;
        // Note: reason why this is a shared_ptr is because we want _lastSlaveOkConn to
        // keep a reference of the _master connection when it selected a primary node.
        // This is because the primary connection is special in mongos - it is the only
        // connection that is versioned.
        // WARNING: do not assign this variable (which will increment the internal ref
        // counter) to any other variable other than _lastSlaveOkConn.
        boost::shared_ptr<DBClientConnection> _master;

        // Last used host in a slaveOk query (can be a primary)
        HostAndPort _lastSlaveOkHost;
        // Last used connection in a slaveOk query (can be a primary)
        boost::shared_ptr<DBClientConnection> _lastSlaveOkConn;
        boost::shared_ptr<ReadPreferenceSetting> _lastReadPref;
        
        double _so_timeout;

        // we need to store so that when we connect to a new node on failure
        // we can re-auth
        // this could be a security issue, as the password is stored in memory
        // not sure if/how we should handle
        std::map<string, BSONObj> _auths; // dbName -> auth parameters

    protected:

        /**
         * for storing (non-threadsafe) information between lazy calls
         */
        class LazyState {
        public:
            LazyState() : _lastClient( NULL ), _lastOp( -1 ), _slaveOk( false ), _retries( 0 ) {}
            DBClientConnection* _lastClient;
            int _lastOp;
            bool _slaveOk;
            int _retries;

        } _lazyState;

    };

    /**
     * A simple object for representing the list of tags. The initial state will
     * have a valid current tag as long as the list is not empty.
     */
    class TagSet {
    public:
        /**
         * Creates an empty tag list that is initially exhausted.
         */
        TagSet();

        /**
         * Creates a copy of the given TagSet. The new copy will have the
         * iterator pointing at the initial position.
         */
        explicit TagSet(const TagSet& other);

        /**
         * Creates a tag set object that lazily iterates over the tag list.
         *
         * @param tags the list of tags associated with this option. This object
         *     will get a shared copy of the list. Therefore, it is important
         *     for the the given tag to live longer than the created tag set.
         */
        explicit TagSet(const BSONArray& tags);

        /**
         * Advance to the next tag.
         *
         * @throws AssertionException if iterator is exhausted before this is called.
         */
        void next();

        //
        // Getters
        //

        /**
         * @return the current tag. Returned tag is invalid if isExhausted is true.
         */
        const BSONObj& getCurrentTag() const;

        /**
         * @return true if the iterator has been exhausted.
         */
        bool isExhausted() const;

        /**
         * @return an unordered iterator to the tag list. The caller is responsible for
         *     destroying the returned iterator.
         */
        BSONObjIterator* getIterator() const;

        /**
         * @returns true if the other TagSet has the same tag set specification with
         *     this tag set, disregarding where the current iterator is pointing to.
         */
        bool equals(const TagSet& other) const;

    private:
        /**
         * This is purposely undefined as the semantics for assignment can be
         * confusing. This is because BSONArrayIteratorSorted shouldn't be
         * copied (because of how it manages internal buffer).
         */
        TagSet& operator=(const TagSet& other);
        BSONObj _currentTag;
        bool _isExhausted;

        // Important: do not re-order _tags & _tagIterator
        BSONArray _tags;
        BSONArrayIteratorSorted _tagIterator;
    };

    struct ReadPreferenceSetting {
        /**
         * @parm pref the read preference mode.
         * @param tag the tag set. Note that this object will have the
         *     tag set will have this in a reset state (meaning, this
         *     object's copy of tag will have the iterator in the initial
         *     position).
         */
        ReadPreferenceSetting(ReadPreference pref, const TagSet& tag):
            pref(pref), tags(tag) {
        }

        inline bool equals(const ReadPreferenceSetting& other) const {
            return pref == other.pref && tags.equals(other.tags);
        }

        const ReadPreference pref;
        TagSet tags;
    };
}