summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/data_replicator.h
blob: 8f1717db09a88939cad014ec8e2720fb864bc511 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
/**
 *    Copyright (C) 2015 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or  modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */


#pragma once

#include <vector>

#include "mongo/platform/basic.h"

#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/applier.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/reporter.h"
#include "mongo/stdx/thread.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/queue.h"

namespace mongo {

class QueryFetcher;

namespace repl {

// NOTE(review): these short, generic aliases (Handle, Event, Request, Response, LockGuard, ...)
// live at mongo::repl namespace scope in a header, so every includer sees them.

// Executor callback and handle types.
using CallbackArgs = ReplicationExecutor::CallbackArgs;
using CommandCallbackArgs = ReplicationExecutor::RemoteCommandCallbackArgs;
using Event = ReplicationExecutor::EventHandle;
using Handle = ReplicationExecutor::CallbackHandle;
using CBHStatus = StatusWith<Handle>;

// Fetcher and remote-command types.
using NextAction = Fetcher::NextAction;
using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>;
using Request = RemoteCommandRequest;
using Response = RemoteCommandResponse;

// Applier batches and timestamp results.
using Operations = Applier::Operations;
using TimestampStatus = StatusWith<Timestamp>;

// Lock helpers over stdx::mutex.
using LockGuard = stdx::lock_guard<stdx::mutex>;
using UniqueLock = stdx::unique_lock<stdx::mutex>;

// Only held through std::unique_ptr members below, so a declaration suffices here.
class OplogFetcher;
struct InitialSyncState;

/**
 * State for the DataReplicator decision tree.
 *
 * The enumerator order fixes the underlying values (Steady == 0 ... Uninitialized == 3);
 * keep it stable. The original author marked Steady as the default state.
 */
enum class DataReplicatorState {
    Steady,  // Default
    InitialSync,
    Rollback,
    Uninitialized
};

/** Converts a DataReplicatorState to a string; the definition lives outside this header. */
std::string toString(DataReplicatorState s);

/**
 * Presumably selects how much data is replicated: everything, a single database, or a
 * single collection (see DataReplicatorOptions::scope / scopeNS).
 * TBD -- ignore for now.
 */
enum class DataReplicatorScope { ReplicateAll, ReplicateDB, ReplicateCollection };

/**
 * Configuration for a DataReplicator instance.
 */
struct DataReplicatorOptions {
    // Error and retry values.
    Milliseconds syncSourceRetryWait{1000};
    Milliseconds initialSyncRetryWait{1000};
    Seconds blacklistSyncSourcePenaltyForNetworkConnectionError{10};
    Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10};

    // Replication settings.
    Timestamp startOptime;
    NamespaceString localOplogNS = NamespaceString("local.oplog.rs");
    NamespaceString remoteOplogNS = NamespaceString("local.oplog.rs");

    // TBD -- ignore below for now.
    DataReplicatorScope scope = DataReplicatorScope::ReplicateAll;
    std::string scopeNS;
    BSONObj filterCriteria;
    HostAndPort syncSource; // for use without replCoord -- maybe some kind of rsMonitor/interface

    // TODO: replace with real applier function.
    // Placeholder: ignores its arguments and always returns Status::OK().
    Applier::ApplyOperationFn applierFn = [] (OperationContext*, const BSONObj&) -> Status {
        return Status::OK();
    };

    /**
     * Returns a human-readable, single-line summary of the oplog namespaces, sync source
     * and start optime. The retry/blacklist values and the TBD scope fields are not included.
     *
     * Each field label matches its member name, and exactly one space separates the
     * header from the first field.
     */
    std::string toString() const {
        return str::stream() << "DataReplicatorOptions --"
                             << " localOplogNS: " << localOplogNS.toString()
                             << " remoteOplogNS: " << remoteOplogNS.toString()
                             << " syncSource: " << syncSource.toString()
                             << " startOptime: " << startOptime.toString();
    }
};

/**
 * The data replicator provides services to keep collections in sync by replicating
 * changes via an oplog source to the local system storage.
 *
 * This class will use existing machinery like the Executor to schedule work and
 * network tasks, as well as provide serial access and synchronization of state.
 */
class DataReplicator {
public:
    /** Function to call when a batch is applied. */
    using OnBatchCompleteFn = stdx::function<void (const Timestamp&)>;

    DataReplicator(DataReplicatorOptions opts,
                   ReplicationExecutor* exec,
                   ReplicationCoordinator* replCoord);
    /**
     * Used by non-replication coordinator processes, like sharding.
     */
    DataReplicator(DataReplicatorOptions opts,
                   ReplicationExecutor* exec);

    /**
     * Used for testing.
     */
    DataReplicator(DataReplicatorOptions opts,
                   ReplicationExecutor* exec,
                   ReplicationCoordinator* replCoord,
                   OnBatchCompleteFn batchCompletedFn);

    virtual ~DataReplicator();

    Status start();
    Status shutdown();

    /**
     * Cancels outstanding work and begins shutting down.
     */
    Status scheduleShutdown();

    /**
     * Waits for the data replicator to finish shutting down.
     * The data replicator will go into the uninitialized state.
     */
    void waitForShutdown();

    // Resumes applying replication events from the oplog.
    Status resume(bool wait=false);

    // Pauses replication and application.
    Status pause();

    // Pauses replication and waits to return until all un-applied ops have been applied.
    TimestampStatus flushAndPause();

    // Called when a slave has progressed to a new oplog position.
    void slavesHaveProgressed();

    // Just like initialSync, but can be called any time.
    TimestampStatus resync();

    // Don't use the methods above before calling this.
    TimestampStatus initialSync();

    DataReplicatorState getState() const;
    Timestamp getLastTimestampFetched() const;
    std::string getDiagnosticString() const;

    // For testing only

    // NOTE(review): the "_inlock" suffix suggests the caller must already hold _mutex;
    // confirm against the definition.
    void _resetState_inlock(Timestamp lastAppliedOptime);

    /**
     * Overrides the sync source.
     *
     * _syncSource is annotated (M) below, meaning it is guarded by _mutex. This setter
     * therefore takes the lock instead of writing the member unsynchronized. Do not call
     * it while already holding _mutex, because stdx::mutex is not recursive.
     *
     * NOTE(review): names containing a double underscore are reserved for the C++
     * implementation. The name is kept here only so existing test callers still compile.
     */
    void __setSourceForTesting(HostAndPort src) {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        _syncSource = src;
    }
    void _setInitialSyncStorageInterface(CollectionCloner::StorageInterface* si);

private:

    // Returns OK when there is a good syncSource at _syncSource.
    Status _ensureGoodSyncSource_inlock();

    // Only executed via executor
    void _resumeFinish(CallbackArgs cbData);
    void _onOplogFetchFinish(const QueryResponseStatus& fetchResult,
                             Fetcher::NextAction* nextAction);
    void _doNextActions();
    void _doNextActions_InitialSync_inlock();
    void _doNextActions_Rollback_inlock();
    void _doNextActions_Steady_inlock();

    // Applies up to the specified Timestamp and pauses automatic application.
    Timestamp _applyUntilAndPause(Timestamp);
    Timestamp _applyUntil(Timestamp);
    void _pauseApplier();

    Operations _getNextApplierBatch_inlock();
    void _onApplyBatchFinish(const CallbackArgs&,
                             const TimestampStatus&,
                             const Operations&,
                             const size_t numApplied);
    void _handleFailedApplyBatch(const TimestampStatus&, const Operations&);
    // Fetches the last doc from the first operation, and reschedules the apply for the ops.
    void _scheduleApplyAfterFetch(const Operations&);
    void _onMissingFetched(const QueryResponseStatus& fetchResult,
                           Fetcher::NextAction* nextAction,
                           const Operations& ops,
                           const NamespaceString nss);

    void _onDataClonerFinish(const Status& status);
    // Called after _onDataClonerFinish when the new Timestamp is available, to use for minvalid.
    void _onApplierReadyStart(const QueryResponseStatus& fetchResult,
                              Fetcher::NextAction* nextAction);

    Status _scheduleApplyBatch();
    Status _scheduleApplyBatch_inlock();
    Status _scheduleApplyBatch_inlock(const Operations& ops);
    Status _scheduleFetch();
    Status _scheduleFetch_inlock();
    Status _scheduleReport();

    void _cancelAllHandles_inlock();
    void _waitOnAll_inlock();
    bool _anyActiveHandles_inlock() const;

    Status _shutdown();
    void _changeStateIfNeeded();

    // Set during construction
    const DataReplicatorOptions _opts;
    ReplicationExecutor* _exec;
    ReplicationCoordinator* _replCoord;

    //
    // All member variables are labeled with one of the following codes indicating the
    // synchronization rules for accessing them.
    //
    // (R)  Read-only in concurrent operation; no synchronization required.
    // (S)  Self-synchronizing; access in any way from any context.
    // (PS) Pointer is read-only in concurrent operation, item pointed to is self-synchronizing;
    //      Access in any context.
    // (M)  Reads and writes guarded by _mutex
    // (X)  Reads and writes must be performed in a callback in _exec
    // (MX) Must hold _mutex and be in a callback in _exec to write; must either hold
    //      _mutex or be in a callback in _exec to read.
    // (I)  Independently synchronized, see member variable comment.

    // Protects member data of this DataReplicator.
    mutable stdx::mutex _mutex;                                                         // (S)
    DataReplicatorState _state;                                                        // (MX)

    // initial sync state
    std::unique_ptr<InitialSyncState> _initialSyncState;                                // (M)
    CollectionCloner::StorageInterface* _storage;                                       // (M)

    // set during scheduling and onFinish
    bool _fetcherPaused;                                                                // (X)
    std::unique_ptr<OplogFetcher> _fetcher;                                             // (S)
    std::unique_ptr<QueryFetcher> _tmpFetcher;                                          // (S)

    bool _reporterPaused;                                                               // (M)
    Handle  _reporterHandle;                                                            // (M)
    std::unique_ptr<Reporter> _reporter;                                                // (M)

    bool _applierActive;                                                                // (M)
    bool _applierPaused;                                                                // (X)
    std::unique_ptr<Applier> _applier;                                                  // (M)
    OnBatchCompleteFn _batchCompletedFn;                                                // (M)


    HostAndPort _syncSource;                                                            // (M)
    Timestamp _lastTimestampFetched;                                                    // (MX)
    Timestamp _lastTimestampApplied;                                                    // (MX)
    BlockingQueue<BSONObj> _oplogBuffer;                                                // (M)

    // Shutdown
    Event _onShutdown;                                                                  // (M)

    // Rollback stuff
    Timestamp _rollbackCommonOptime;                                                    // (MX)
};

} // namespace repl
} // namespace mongo