1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
|
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <functional>
#include <memory>
#include "mongo/base/status.h"
#include "mongo/base/status_with.h"
#include "mongo/client/fetcher.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/executor/task_executor.h"
#include "mongo/platform/mutex.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/time_support.h"
namespace mongo {
// Forward declarations. Each type is declared once; OperationContext was previously
// forward-declared twice in this namespace.
class OperationContext;
class Status;
namespace repl {
// This header only handles SyncSourceSelector through pointers, so an incomplete type is enough.
class SyncSourceSelector;
/**
 * SyncSourceResolverResponse contains the result from running SyncSourceResolver. This result will
 * indicate one of the following:
 *     1. A new sync source was selected. isOK() will return true and getSyncSource() will
 *        return the HostAndPort of the new sync source.
 *     2. No sync source was selected. isOK() will return true and getSyncSource() will return
 *        an empty HostAndPort.
 *     3. All potential sync sources are too fresh. isOK() will return false and
 *        syncSourceStatus will be ErrorCodes::OplogStartMissing and earliestOpTimeSeen will
 *        contain a new MinValid boundary. getSyncSource() is not valid to call in this state.
 *
 * The response reaches the caller as a const reference (SyncSourceResolver::OnCompletionFn takes
 * 'const SyncSourceResolverResponse&'), so the accessors are const-qualified to be callable there.
 */
struct SyncSourceResolverResponse {
    // Contains the new syncSource if syncSourceStatus is OK and the HostAndPort is not empty.
    // Defaults to BadValue so an unpopulated response is never mistaken for success.
    StatusWith<HostAndPort> syncSourceStatus = {ErrorCodes::BadValue, "status not populated"};

    // Contains the new MinValid boundary if syncSourceStatus is ErrorCodes::OplogStartMissing.
    OpTime earliestOpTimeSeen;

    // Rollback ID of the selected sync source.
    // The rbid is fetched before the required optime so callers can be sure that as long as the
    // rbid is the same, the required optime is still present. The rbid will remain set to
    // 'kUninitializedRollbackId' if _requiredOpTime is null.
    int rbid = ReplicationProcess::kUninitializedRollbackId;

    /**
     * Returns true when syncSourceStatus is OK, i.e. outcomes 1 and 2 described above.
     */
    bool isOK() const {
        return syncSourceStatus.isOK();
    }

    /**
     * Returns the selected sync source. The result is empty when no sync source was selected.
     * Only valid when isOK() is true; otherwise the invariant below fails.
     */
    HostAndPort getSyncSource() const {
        invariant(syncSourceStatus.isOK());
        return syncSourceStatus.getValue();
    }
};
/**
 * Supplies a sync source to Fetcher, Rollback and Reporter.
 * Obtains sync source candidates to probe from SyncSourceSelector.
 * Each instance is created as needed whenever a new sync source is required and
 * is meant to be discarded after the sync source resolution is finished - 'onCompletion'
 * callback is invoked with the results contained in SyncSourceResolverResponse.
 *
 * Usage: construct, call startup() to begin probing candidates, optionally call shutdown() to
 * cancel outstanding remote commands, and call join() to block until the resolver is inactive.
 */
class SyncSourceResolver {
public:
    static const NamespaceString kLocalOplogNss;
    static const Seconds kFetcherTimeout;
    static const Seconds kFetcherErrorDenylistDuration;
    static const Seconds kOplogEmptyDenylistDuration;
    static const Seconds kFirstOplogEntryEmptyDenylistDuration;
    static const Seconds kFirstOplogEntryNullTimestampDenylistDuration;
    static const Minutes kTooStaleDenylistDuration;
    static const Seconds kNoRequiredOpTimeDenylistDuration;

    /**
     * Callback function to report final status of resolving sync source.
     */
    using OnCompletionFn = std::function<void(const SyncSourceResolverResponse&)>;

    /**
     * @param taskExecutor        Executor used to send remote commands to sync source candidates.
     * @param syncSourceSelector  Provides candidates and is used to denylist non-viable ones.
     * @param lastOpTimeFetched   A viable sync source must contain a starting oplog entry with a
     *                            timestamp equal to or earlier than this optime's timestamp.
     * @param requiredOpTime      If not null, a viable sync source must contain an oplog entry
     *                            with exactly this optime.
     * @param onCompletion        Invoked exactly once after startup with the resolution result.
     */
    SyncSourceResolver(executor::TaskExecutor* taskExecutor,
                       SyncSourceSelector* syncSourceSelector,
                       const OpTime& lastOpTimeFetched,
                       const OpTime& requiredOpTime,
                       const OnCompletionFn& onCompletion);

    virtual ~SyncSourceResolver();

    /**
     * Returns true if we are currently probing sync source candidates.
     */
    bool isActive() const;

    /**
     * Starts probing sync source candidates returned by the sync source selector.
     */
    Status startup();

    /**
     * Cancels all remote commands.
     */
    void shutdown();

    /**
     * Block until inactive.
     */
    void join();

private:
    /**
     * Same as isActive(). The '_inlock' suffix indicates the caller is expected to already hold
     * '_mutex' (NOTE(review): confirm against the .cpp).
     */
    bool _isActive_inlock() const;

    /**
     * Returns whether the resolver is in the process of shutting down.
     */
    bool _isShuttingDown() const;

    /**
     * Returns new sync source from selector.
     */
    StatusWith<HostAndPort> _chooseNewSyncSource();

    /**
     * Creates fetcher to read the first oplog entry on sync source.
     */
    std::unique_ptr<Fetcher> _makeFirstOplogEntryFetcher(HostAndPort candidate,
                                                         OpTime earliestOpTimeSeen);

    /**
     * Creates fetcher to check the remote oplog for '_requiredOpTime'.
     */
    std::unique_ptr<Fetcher> _makeRequiredOpTimeFetcher(HostAndPort candidate,
                                                        OpTime earliestOpTimeSeen,
                                                        int rbid);

    /**
     * Schedules fetcher to read oplog on sync source.
     * Saves fetcher in '_fetcher' on success.
     */
    Status _scheduleFetcher(std::unique_ptr<Fetcher> fetcher);

    /**
     * Returns optime of first oplog entry from fetcher response.
     * Returns null optime on error.
     */
    OpTime _parseRemoteEarliestOpTime(const HostAndPort& candidate,
                                      const Fetcher::QueryResponse& queryResponse);

    /**
     * Callback for fetching first oplog entry on sync source.
     */
    void _firstOplogEntryFetcherCallback(const StatusWith<Fetcher::QueryResponse>& queryResult,
                                         HostAndPort candidate,
                                         OpTime earliestOpTimeSeen);

    /**
     * Schedules a replSetGetRBID command against the candidate to fetch its current rollback id.
     */
    Status _scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen);

    /**
     * Callback for the replSetGetRBID command scheduled by _scheduleRBIDRequest().
     */
    void _rbidRequestCallback(HostAndPort candidate,
                              OpTime earliestOpTimeSeen,
                              const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply);

    /**
     * Checks query response for required optime.
     */
    Status _compareRequiredOpTimeWithQueryResponse(const Fetcher::QueryResponse& queryResponse);

    /**
     * Callback for checking if the remote oplog contains '_requiredOpTime'.
     */
    void _requiredOpTimeFetcherCallback(const StatusWith<Fetcher::QueryResponse>& queryResult,
                                        HostAndPort candidate,
                                        OpTime earliestOpTimeSeen,
                                        int rbid);

    /**
     * Obtains new sync source candidate and schedules remote command to fetch the first oplog
     * entry.
     * May transition state to Complete.
     * Returns status that could be used as result for startup().
     */
    Status _chooseAndProbeNextSyncSource(OpTime earliestOpTimeSeen);

    /**
     * Invokes completion callback and transitions state to State::kComplete.
     * Returns the status of the result handed to the completion callback.
     */
    Status _finishCallback(HostAndPort hostAndPort, int rbid);
    Status _finishCallback(Status status);
    Status _finishCallback(const SyncSourceResolverResponse& response);

    // Executor used to send remote commands to sync source candidates.
    executor::TaskExecutor* const _taskExecutor;

    // Sync source selector used to obtain sync source candidates and for us to denylist non-viable
    // candidates.
    SyncSourceSelector* const _syncSourceSelector;

    // A viable sync source must contain a starting oplog entry with a timestamp equal or earlier
    // than the timestamp in '_lastOpTimeFetched'.
    const OpTime _lastOpTimeFetched;

    // If '_requiredOpTime' is not null, a viable sync source must contain an oplog entry with an
    // optime equal to this value.
    const OpTime _requiredOpTime;

    // This is invoked exactly once after startup. The caller gets the results of the sync source
    // resolver via this callback in a SyncSourceResolverResponse struct when the resolver finishes.
    const OnCompletionFn _onCompletion;

    // Protects members of this sync source resolver defined below.
    // The latch is named after the owning class (SyncSourceResolver), not the response struct, so
    // latch diagnostics identify the correct type.
    mutable Mutex _mutex = MONGO_MAKE_LATCH("SyncSourceResolver::_mutex");
    mutable stdx::condition_variable _condition;

    // State transitions:
    // PreStart --> Running --> ShuttingDown --> Complete
    // It is possible to skip intermediate states. For example, calling shutdown() when the
    // resolver has not started will transition from PreStart directly to Complete.
    enum class State { kPreStart, kRunning, kShuttingDown, kComplete };
    State _state = State::kPreStart;

    // Fetcher scheduled by _scheduleFetcher() against the current sync source candidate.
    std::unique_ptr<Fetcher> _fetcher;

    // Holds reference to fetcher in the process of shutting down.
    std::unique_ptr<Fetcher> _shuttingDownFetcher;

    // Callback handle for the outstanding replSetGetRBID command (presumably set by
    // _scheduleRBIDRequest(); NOTE(review): verify in the .cpp).
    executor::TaskExecutor::CallbackHandle _rbidCommandHandle;
};
} // namespace repl
} // namespace mongo
|