1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
|
/* Copyright 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#pragma once
#include <atomic>
#include <memory>
#include <memory>
#include <set>
#include <string>
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/client/mongo_uri.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
namespace mongo {
// Forward declarations for types that are only named (not defined) at this point.
struct ReadPreferenceSetting;
class BSONObj;
class ReplicaSetMonitor;

// Shared-ownership handle to a ReplicaSetMonitor.
using ReplicaSetMonitorPtr = std::shared_ptr<ReplicaSetMonitor>;
/**
 * Holds state about a replica set and provides a means to refresh the local view.
 * All methods perform the required synchronization to allow callers from multiple threads.
 */
class ReplicaSetMonitor : public std::enable_shared_from_this<ReplicaSetMonitor> {
    MONGO_DISALLOW_COPYING(ReplicaSetMonitor);

public:
    class Refresher;

    /**
     * Signature of the hooks installed through setAsynchronousConfigChangeHook() and
     * setSynchronousConfigChangeHook(). Receives the name of the set whose config changed and
     * its new connection string.
     */
    using ConfigChangeHook =
        stdx::function<void(const std::string& setName, const std::string& newConnectionString)>;

    /**
     * Initializes local state.
     *
     * seeds must not be empty.
     */
    ReplicaSetMonitor(StringData name, const std::set<HostAndPort>& seeds);

    /**
     * Initializes local state from a MongoURI.
     *
     * NOTE(review): this single-argument constructor is not 'explicit', so a MongoURI converts
     * implicitly to a ReplicaSetMonitor. Left as-is to keep the interface unchanged.
     */
    ReplicaSetMonitor(const MongoURI& uri);

    /**
     * Schedules the initial refresh task into task executor.
     */
    void init();

    /**
     * Returns a host matching the given read preference or an error, if no host matches.
     *
     * @param readPref Read preference to match against
     * @param maxWait If no host is readily available, which matches the specified read preference,
     *   wait for one to become available for up to the specified time and periodically refresh
     *   the view of the set. The call may return with an error earlier than the specified value,
     *   if none of the known hosts for the set are reachable within some number of attempts.
     *   Note that if a maxWait of 0ms is specified, this method may still attempt to contact
     *   every host in the replica set up to one time.
     *
     * Known errors are:
     *  FailedToSatisfyReadPreference, if node cannot be found, which matches the read preference.
     */
    StatusWith<HostAndPort> getHostOrRefresh(const ReadPreferenceSetting& readPref,
                                             Milliseconds maxWait = kDefaultFindHostTimeout);

    /**
     * Returns the host we think is the current master or uasserts.
     *
     * This is a thin wrapper around getHostOrRefresh so this will also refresh our view if we
     * don't think there is a master at first. The main difference is that this will uassert
     * on failure rather than handing an error status back to the caller.
     */
    HostAndPort getMasterOrUassert();

    /**
     * Returns a refresher object that can be used to update our view of the set.
     * If a refresh is currently in-progress, the returned Refresher will participate in the
     * current refresh round.
     */
    Refresher startOrContinueRefresh();

    /**
     * Notifies this Monitor that a host has failed because of the specified error 'status' and
     * should be considered down.
     *
     * Call this when you get a connection error. If you get an error while trying to refresh our
     * view of a host, call Refresher::failedHost instead because it bypasses taking the monitor's
     * mutex.
     */
    void failedHost(const HostAndPort& host, const Status& status);

    /**
     * Returns true if this node is the master based ONLY on local data. Be careful, return may
     * be stale.
     */
    bool isPrimary(const HostAndPort& host) const;

    /**
     * Returns true if host is part of this set and is considered up (meaning it can accept
     * queries).
     */
    bool isHostUp(const HostAndPort& host) const;

    /**
     * Returns the minimum wire version supported across the replica set.
     */
    int getMinWireVersion() const;

    /**
     * Returns the maximum wire version supported across the replica set.
     */
    int getMaxWireVersion() const;

    /**
     * The name of the set.
     */
    std::string getName() const;

    /**
     * Returns a std::string with the format name/server1,server2.
     * If name is empty, returns just comma-separated list of servers.
     */
    std::string getServerAddress() const;

    /**
     * Is server part of this set? Uses only cached information.
     */
    bool contains(const HostAndPort& server) const;

    /**
     * Writes information about our cached view of the set to a BSONObjBuilder.
     */
    void appendInfo(BSONObjBuilder& b) const;

    /**
     * Returns true if the monitor knows a usable primary from its internal view.
     */
    bool isKnownToHaveGoodPrimary() const;

    /**
     * Marks the instance as removed to exit refresh sooner.
     */
    void markAsRemoved();

    /**
     * Creates a new ReplicaSetMonitor, if it doesn't already exist.
     */
    static std::shared_ptr<ReplicaSetMonitor> createIfNeeded(const std::string& name,
                                                             const std::set<HostAndPort>& servers);
    static std::shared_ptr<ReplicaSetMonitor> createIfNeeded(const MongoURI& uri);

    /**
     * Gets the cached Monitor for the given set name.
     *
     * NOTE(review): earlier documentation described a 'createFromSeed' flag that this signature
     * does not have. The result when no monitor is cached (presumably an empty pointer) should
     * be confirmed against the implementation.
     */
    static std::shared_ptr<ReplicaSetMonitor> get(const std::string& name);

    /**
     * Removes the ReplicaSetMonitor for the given set name from the cache of monitors.
     *
     * NOTE(review): earlier documentation described a 'clearSeedCache' flag that this signature
     * does not have; whether any cached seed list is also dropped should be confirmed against
     * the implementation.
     */
    static void remove(const std::string& name);

    /**
     * Sets the hook to be called whenever the config of any replica set changes.
     * Currently only 1 globally, so this asserts if one already exists.
     *
     * The hook will be called from a fresh thread. It is responsible for initializing any
     * thread-local state and ensuring that no exceptions escape.
     *
     * The hook must not be changed while the program has multiple threads.
     */
    static void setAsynchronousConfigChangeHook(ConfigChangeHook hook);

    /**
     * Sets the hook to be called whenever the config of any replica set changes.
     * Currently only 1 globally, so this asserts if one already exists.
     *
     * The hook will be called inline while refreshing the ReplicaSetMonitor's view of the set
     * membership. It is important that the hook not block for long as it will be running under
     * the ReplicaSetMonitor's mutex.
     *
     * The hook must not be changed while the program has multiple threads.
     */
    static void setSynchronousConfigChangeHook(ConfigChangeHook hook);

    /**
     * Permanently stops all monitoring on replica sets and clears all cached information
     * as well. As a consequence, NEVER call this if you have other threads that have a
     * DBClientReplicaSet instance. This method should be used for unit test only.
     */
    static void cleanup();

    /**
     * Use these to speed up tests by disabling the sleep-and-retry loops and cause errors to be
     * reported immediately.
     */
    static void disableRefreshRetries_forTest();

    /**
     * Permanently stops all monitoring on replica sets.
     */
    static void shutdown();

    /**
     * Returns the refresh period that is given to all new SetStates.
     */
    static Seconds getDefaultRefreshPeriod();

    //
    // internal types (defined in replica_set_monitor_internal.h)
    //

    struct IsMasterReply;
    struct ScanState;
    struct SetState;
    using ScanStatePtr = std::shared_ptr<ScanState>;
    using SetStatePtr = std::shared_ptr<SetState>;

    /**
     * Allows tests to set initial conditions and introspect the current state.
     *
     * Only '_state' is set here; '_executor' keeps its nullptr default member initializer, so a
     * monitor built this way has no task executor attached.
     */
    explicit ReplicaSetMonitor(const SetStatePtr& initialState) : _state(initialState) {}

    ~ReplicaSetMonitor();

    /**
     * The default timeout, which will be used for finding a replica set host if the caller does
     * not explicitly specify it.
     */
    static const Seconds kDefaultFindHostTimeout;

    /**
     * Defaults to false, meaning that if multiple hosts meet a criteria we pick one at random.
     * This is required by the replica set driver spec. Set this to true in tests that need host
     * selection to be deterministic.
     *
     * NOTE: Used by unit-tests only.
     */
    static bool useDeterministicHostSelection;

private:
    /**
     * Schedules a refresh via the task executor. (Task is automatically canceled in the d-tor.)
     */
    void _scheduleRefresh(Date_t when);

    /**
     * This function refreshes the replica set and calls _scheduleRefresh() again.
     */
    void _doScheduledRefresh(const executor::TaskExecutor::CallbackHandle& currentHandle);

    // Serializes refresh and protects _refresherHandle
    stdx::mutex _mutex;
    executor::TaskExecutor::CallbackHandle _refresherHandle;

    const SetStatePtr _state;

    // Non-owning. Defaults to nullptr so that constructors which do not set it (such as the
    // test-only one above) never leave an indeterminate pointer behind.
    executor::TaskExecutor* _executor{nullptr};

    // Starts out false. NOTE(review): presumably flipped by markAsRemoved() - confirm.
    AtomicBool _isRemovedFromManager{false};
};
/**
 * Drives a refresh of the locally cached view of a replica set.
 *
 * Obtain an instance through ReplicaSetMonitor::startOrContinueRefresh().
 *
 * Several threads may refresh the same set concurrently without extra synchronization, provided
 * that every thread works with a Refresher object of its own.
 *
 * The logic that decides which hosts to contact, and how replies are folded into the SetState,
 * is all owned by this class.
 */
class ReplicaSetMonitor::Refresher {
public:
    /**
     * Contacts hosts in the set to refresh the view, stopping as soon as one host satisfies
     * 'criteria'. Returns that host, or an empty HostAndPort when nothing matches after the
     * refresh.
     *
     * NOTE(review): the previous comment named ReplicaSetMonitor::getHostWithRefresh() as the
     * caller, which the monitor does not declare; the caller is presumably getHostOrRefresh() -
     * confirm.
     */
    HostAndPort refreshUntilMatches(const ReadPreferenceSetting& criteria);

    /**
     * Refreshes every host; the same as refreshUntilMatches with a criteria that can never be
     * satisfied.
     *
     * Meant to be invoked periodically, possibly from a background thread.
     */
    void refreshAll();

    //
    // Everything below is for testing and internal use only.
    // The caller must hold SetState::mutex before invoking any of these methods.
    //

    /**
     * Pointers passed in are shared with the caller.
     *
     * When no scan is in progress, this constructor is responsible for setting up a new one.
     */
    explicit Refresher(const SetStatePtr& setState);

    /**
     * Describes what the caller of getNextStep() should do next.
     */
    struct NextStep {
        enum StepKind {
            CONTACT_HOST,  // Contact the returned host.
            WAIT,          // Wait on the condition variable, then try again.
            DONE,          // Nothing left to contact in this refresh round.
        };

        StepKind step;
        HostAndPort host;

        explicit NextStep(StepKind step, const HostAndPort& host = HostAndPort{})
            : step(step), host(host) {}
    };

    /**
     * Returns the next step to take.
     *
     * Calling this is a promise to follow up with receivedIsMaster or failedHost whenever the
     * returned NextStep is CONTACT_HOST.
     */
    NextStep getNextStep();

    /**
     * Report that a host handed out by getNextStep answered an isMaster call successfully.
     * A negative latencyMicros is ignored.
     */
    void receivedIsMaster(const HostAndPort& from, int64_t latencyMicros, const BSONObj& reply);

    /**
     * Report that a host handed out by getNextStep failed to answer an isMaster call.
     */
    void failedHost(const HostAndPort& host, const Status& status);

    /**
     * Begins a fresh scan over the hosts in 'set'.
     */
    static ScanStatePtr startNewScan(const SetState* set);

private:
    /**
     * Starts by verifying that "reply" does not come from a stale primary: its electionId is
     * compared against the maxElectionId recorded by the SetState. An OK status means "reply"
     * belongs to a non-stale primary; anything else is returned as a failed status.
     *
     * 'from' identifies the node that sent the response.
     *
     * Updates _set and _scan using the set-membership information supplied by a master.
     * Applies _scan->unconfirmedReplies to confirmed nodes.
     * Leaves this host's own node in _set->nodes untouched.
     */
    Status receivedIsMasterFromMaster(const HostAndPort& from, const IsMasterReply& reply);

    /**
     * Adjusts the _scan work queue using what this host reported.
     * Only replies from non-masters may be passed in.
     * _set is not modified at all.
     */
    void receivedIsMasterBeforeFoundMaster(const IsMasterReply& reply);

    /**
     * Implementation shared by refreshUntilMatches and refreshAll.
     * A NULL criteria means every host is refreshed.
     * Takes care of its own locking.
     */
    HostAndPort _refreshUntilMatches(const ReadPreferenceSetting* criteria);

    // Neither pointer is ever NULL.
    SetStatePtr _set;
    ScanStatePtr _scan;  // Can differ from _set->currentScan once a newer scan has begun.
};
} // namespace mongo
|