summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog.h
blob: de9d7fb35938d29790cdcd0721f1b3c64edc1005 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <functional>
#include <string>
#include <vector>

#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/repl/oplog_constraint_violation_logger.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/oplog_entry_or_grouped_inserts.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/session/logical_session_id.h"

namespace mongo {
// Forward declarations. Where this header references these types, it does so only through
// pointers or references in function declarations.
// NOTE(review): Database, OperationSessionInfo and Session are not referenced by any declaration
// in this header -- confirm whether these forward declarations are still needed.
class Collection;
class CollectionPtr;
class Database;
class NamespaceString;
class OperationContext;
class OperationSessionInfo;
class ScopedCollectionAcquisition;
class Session;

// An oplog slot is represented by the OpTime allocated for a new oplog entry (see
// repl::getNextOpTimes(), which returns a vector of OplogSlots).
using OplogSlot = repl::OpTime;

/**
 * A document to insert, bundled with optional metadata about the insert: the statement ids
 * associated with it, an oplog slot and a record id.
 *
 * Each constructor sets only the members named by its arguments; every other member keeps its
 * default value. All by-value arguments are sinks and are moved into the corresponding member.
 */
struct InsertStatement {
    InsertStatement() = default;
    explicit InsertStatement(BSONObj toInsert) : doc(std::move(toInsert)) {}

    // 'statementIds' is taken by value and moved into place so that callers passing a temporary
    // (including the single-StmtId overloads below) do not pay for an extra vector copy.
    InsertStatement(std::vector<StmtId> statementIds, BSONObj toInsert)
        : stmtIds(std::move(statementIds)), doc(std::move(toInsert)) {}
    InsertStatement(StmtId stmtId, BSONObj toInsert)
        : InsertStatement(std::vector<StmtId>{stmtId}, std::move(toInsert)) {}

    InsertStatement(std::vector<StmtId> statementIds, BSONObj toInsert, OplogSlot os)
        : stmtIds(std::move(statementIds)), oplogSlot(std::move(os)), doc(std::move(toInsert)) {}
    InsertStatement(StmtId stmtId, BSONObj toInsert, OplogSlot os)
        : InsertStatement(std::vector<StmtId>{stmtId}, std::move(toInsert), std::move(os)) {}

    // Builds the oplog slot from a timestamp and a term.
    InsertStatement(BSONObj toInsert, Timestamp ts, long long term)
        : oplogSlot(repl::OpTime(ts, term)), doc(std::move(toInsert)) {}

    InsertStatement(BSONObj toInsert, RecordId rid)
        : recordId(std::move(rid)), doc(std::move(toInsert)) {}

    // Statement ids for this insert. Defaults to a single kUninitializedStmtId;
    // repl::appendOplogEntryChainInfo() documents stmtIds.front() != kUninitializedStmtId as the
    // marker of a retryable write.
    std::vector<StmtId> stmtIds = {kUninitializedStmtId};
    // Default-constructed unless a constructor taking an OplogSlot or (Timestamp, term) is used.
    OplogSlot oplogSlot;
    // Default-constructed unless the (BSONObj, RecordId) constructor is used.
    RecordId recordId;
    // The document to insert.
    BSONObj doc;
};

namespace repl {
// Forward declaration.
// NOTE(review): ReplSettings is not referenced by any declaration in this header -- confirm
// whether it is still needed.
class ReplSettings;

/**
 * Holds the optime that links a new oplog entry to the previous one in its chain. Passed to
 * appendOplogEntryChainInfo(), which reads it and may update it.
 */
struct OplogLink {
    OplogLink() = default;

    // OpTime used for the "prevOpTime" field of the oplog entry. appendOplogEntryChainInfo()
    // documents what happens when this is a null OpTime.
    OpTime prevOpTime;
};

/**
 * Set the "lsid", "txnNumber", "stmtId", "prevOpTime" fields of the oplogEntry based on the given
 * oplogLink for retryable writes (i.e. when stmtIds.front() != kUninitializedStmtId).
 *
 * If the given oplogLink.prevOpTime is a null OpTime, both the oplogLink.prevOpTime and the
 * "prevOpTime" field of the oplogEntry will be set to the TransactionParticipant's lastWriteOpTime.
 *
 * @param opCtx the operation context; presumably used to reach the session / transaction state
 * mentioned above -- confirm against the definition.
 * @param oplogEntry the entry whose fields are set (modified in place).
 * @param oplogLink in/out: supplies prevOpTime and may itself be updated as described above.
 * @param stmtIds statement ids of the write. NOTE(review): the description relies on
 * stmtIds.front(), which suggests a non-empty vector is expected -- confirm.
 */
void appendOplogEntryChainInfo(OperationContext* opCtx,
                               MutableOplogEntry* oplogEntry,
                               OplogLink* oplogLink,
                               const std::vector<StmtId>& stmtIds);

/**
 * Create a new capped collection for the oplog if it doesn't yet exist.
 * If the collection already exists (and isReplSet is false),
 * set the 'last' Timestamp from the last entry of the oplog collection (side effect!)
 *
 * @param opCtx the operation context.
 * @param oplogCollectionName namespace of the oplog collection.
 * @param isReplSet affects the already-exists case described above. NOTE(review): presumably
 * indicates whether the node runs as a replica set member -- confirm against the definition.
 */
void createOplog(OperationContext* opCtx,
                 const NamespaceString& oplogCollectionName,
                 bool isReplSet);

/**
 * Shortcut for the above function using oplogCollectionName = _oplogCollectionName.
 *
 * NOTE(review): the previous wording ended on a dangling comma, so it may also have been meant to
 * say which value of isReplSet is used -- confirm against the definition.
 */
void createOplog(OperationContext* opCtx);

/**
 * Writes the given entry to the oplog.
 *
 * Returns the optime of the oplog entry written to the oplog.
 * Returns a null optime if oplog was not modified.
 *
 * @param opCtx the operation context.
 * @param oplogEntry the entry to write. NOTE(review): passed by non-const pointer, so the callee
 * may modify it -- confirm against the definition.
 */
OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry);

/**
 * Low level oplog function used by logOp() and similar functions to append
 * storage engine records to the oplog collection.
 *
 * This function has to be called within the scope of a WriteUnitOfWork with
 * a valid CollectionPtr reference to the oplog.
 *
 * @param opCtx the operation context.
 * @param nss NOTE(review): not described by the original comment; presumably the namespace
 * affected by the logged operation(s) rather than the oplog namespace itself -- confirm.
 * @param records a vector of oplog records to be written. Records hold references
 * to unowned BSONObj data.
 * @param timestamps a vector of respective Timestamp objects for each oplog record.
 * @param oplogCollection collection to be written to.
 * @param finalOpTime the OpTime of the last oplog record.
 * @param wallTime the wall clock time of the last oplog record.
 * @param isAbortIndexBuild for tenant migration use only.
 */
void logOplogRecords(OperationContext* opCtx,
                     const NamespaceString& nss,
                     std::vector<Record>* records,
                     const std::vector<Timestamp>& timestamps,
                     const CollectionPtr& oplogCollection,
                     OpTime finalOpTime,
                     Date_t wallTime,
                     bool isAbortIndexBuild);

/**
 * Flush out the cached pointer to the oplog.
 *
 * @param service the ServiceContext; presumably the one the cached pointer is associated with --
 * confirm against the definition.
 */
void clearLocalOplogPtr(ServiceContext* service);

/**
 * Establish the cached pointer to the local oplog.
 *
 * See clearLocalOplogPtr() for flushing the cached pointer and
 * establishOplogCollectionForLogging() for installing a specific Collection as the cached pointer.
 */
void acquireOplogCollectionForLogging(OperationContext* opCtx);

/**
 * Use 'oplog' as the new cached pointer to the local oplog.
 *
 * Called by catalog::openCatalog() to re-establish the oplog collection pointer while holding onto
 * the global lock in exclusive mode.
 *
 * @param opCtx the operation context.
 * @param oplog the collection to cache as the local oplog.
 */
void establishOplogCollectionForLogging(OperationContext* opCtx, const Collection* oplog);

// Callback that takes no arguments and returns nothing. applyOperation_inlock() accepts one as
// 'incrementOpsAppliedStats' and documents it as being called whenever an op is applied.
using IncrementOpsAppliedStatsFn = std::function<void()>;

/**
 * Enumerates the modes of oplog application used within the replication system, and provides
 * helpers to convert a mode to and from its string form. How an oplog entry is applied may vary
 * with the mode.
 */
class OplogApplication {
public:
    static constexpr StringData kInitialSyncOplogApplicationMode = "InitialSync"_sd;
    // Only used by the 'applyOps' command when it is sent by a client.
    static constexpr StringData kRecoveringOplogApplicationMode = "Recovering"_sd;
    static constexpr StringData kStableRecoveringOplogApplicationMode = "StableRecovering"_sd;
    static constexpr StringData kUnstableRecoveringOplogApplicationMode = "UnstableRecovering"_sd;
    static constexpr StringData kSecondaryOplogApplicationMode = "Secondary"_sd;
    static constexpr StringData kApplyOpsCmdOplogApplicationMode = "ApplyOps"_sd;

    enum class Mode {
        // Oplog application phase of the initial sync process.
        kInitialSync,

        // The two recovering modes apply oplog operations to recover the database state after a
        // clean or unclean shutdown, or to recover from the oplog after rolling back to a
        // checkpoint.
        // Recovering from an unstable checkpoint.
        kUnstableRecovering,
        // Recovering from a stable checkpoint.
        kStableRecovering,

        // A secondary node applying oplog operations from the primary during steady state
        // replication.
        kSecondary,

        // Operations applied on behalf of a direct client invocation of the 'applyOps' command.
        kApplyOpsCmd
    };

    // Returns true when 'mode' is either of the two recovering modes.
    static bool inRecovering(Mode mode) {
        switch (mode) {
            case Mode::kUnstableRecovering:
            case Mode::kStableRecovering:
                return true;
            default:
                return false;
        }
    }

    // Returns the string form of 'mode'.
    static StringData modeToString(Mode mode);

    // Parses a Mode from its string form; the StatusWith carries either the Mode or a Status.
    static StatusWith<Mode> parseMode(const std::string& mode);

    // In the test environment, the server crashes when oplog application fails during recovery
    // from a stable checkpoint.
    static void checkOnOplogFailureForRecovery(OperationContext* opCtx,
                                               const mongo::NamespaceString& nss,
                                               const mongo::BSONObj& oplogEntry,
                                               const std::string& errorMsg);
};

// Streams an oplog application mode as its string form (see OplogApplication::modeToString()).
inline std::ostream& operator<<(std::ostream& s, OplogApplication::Mode mode) {
    s << OplogApplication::modeToString(mode);
    return s;
}

/**
 * Logs an oplog constraint violation and writes an entry into the health log.
 *
 * @param opCtx the operation context.
 * @param nss the namespace associated with the violation.
 * @param type the kind of constraint violation being reported.
 * @param operation a string naming the operation -- NOTE(review): exact expected contents are not
 * specified here; confirm against callers.
 * @param opObj BSON object associated with the violation; presumably the offending oplog entry or
 * its payload -- confirm.
 * @param status optional Status to report alongside the violation; may be left unset.
 */
void logOplogConstraintViolation(OperationContext* opCtx,
                                 const NamespaceString& nss,
                                 OplogConstraintViolationEnum type,
                                 const std::string& operation,
                                 const BSONObj& opObj,
                                 boost::optional<Status> status);

/**
 * Used for applying from an oplog entry or grouped inserts.
 * @param opCtx the operation context.
 * @param collectionAcquisition NOTE(review): not described by the original comment; presumably
 * the acquisition of the collection the operation targets -- confirm against the definition.
 * @param opOrGroupedInserts a single oplog entry or grouped inserts to be applied.
 * @param alwaysUpsert convert some updates to upserts for idempotency reasons
 * @param mode specifies what oplog application mode we are in
 * @param isDataConsistent NOTE(review): not described by the original comment; presumably whether
 * the data is known to be consistent while this op is applied -- confirm.
 * @param incrementOpsAppliedStats is called whenever an op is applied. Defaults to an empty
 * function object.
 * Returns failure status if the op was an update that could not be applied.
 */
Status applyOperation_inlock(OperationContext* opCtx,
                             ScopedCollectionAcquisition& collectionAcquisition,
                             const OplogEntryOrGroupedInserts& opOrGroupedInserts,
                             bool alwaysUpsert,
                             OplogApplication::Mode mode,
                             bool isDataConsistent,
                             IncrementOpsAppliedStatsFn incrementOpsAppliedStats = {});

/**
 * Take a command op and apply it locally.
 * Used for applying from an oplog and for the applyOps command.
 * Returns failure status if the op could not be applied.
 *
 * @param opCtx the operation context.
 * @param op the command operation to apply.
 * @param mode specifies what oplog application mode we are in.
 */
Status applyCommand_inlock(OperationContext* opCtx,
                           const ApplierOperation& op,
                           OplogApplication::Mode mode);

/**
 * Initializes the global Timestamp with the value from the timestamp of the last oplog entry.
 *
 * @param opCtx the operation context.
 * @param oplogNS namespace of the oplog collection whose last entry is consulted.
 */
void initTimestampFromOplog(OperationContext* opCtx, const NamespaceString& oplogNS);

/**
 * Sets the global Timestamp to be 'newTime'.
 *
 * NOTE(review): the first parameter is a ServiceContext* although it is named 'opCtx', the name
 * this header otherwise uses for OperationContext* parameters; clearLocalOplogPtr() names its
 * ServiceContext* parameter 'service'. Consider renaming for consistency.
 */
void setNewTimestamp(ServiceContext* opCtx, const Timestamp& newTime);

/**
 * Signal any waiting AwaitData queries on the oplog that there is new data or metadata available.
 *
 * Takes no arguments. NOTE(review): which oplog collection is signalled is not visible from this
 * declaration; presumably the cached local oplog -- confirm against the definition.
 */
void signalOplogWaiters();

/**
 * Creates a new index in the given namespace.
 *
 * @param opCtx the operation context.
 * @param indexSpec BSON specification of the index to create.
 * @param indexNss the namespace in which the index is created.
 * @param mode specifies what oplog application mode we are in.
 */
void createIndexForApplyOps(OperationContext* opCtx,
                            const BSONObj& indexSpec,
                            const NamespaceString& indexNss,
                            OplogApplication::Mode mode);

/**
 * Allocates optimes for new entries in the oplog.  Returns a vector of OplogSlots, which
 * contain the new optimes along with their terms and newly calculated hash fields.
 *
 * NOTE(review): OplogSlot is an alias for repl::OpTime in this header, and the only OpTime
 * construction visible here takes a timestamp and a term; the mention of "hash fields" may be
 * stale -- confirm against the OpTime definition.
 *
 * @param opCtx the operation context.
 * @param count the number of optimes to allocate; getNextOpTime() relies on the returned vector
 * holding exactly one slot when count is 1.
 */
std::vector<OplogSlot> getNextOpTimes(OperationContext* opCtx, std::size_t count);

// Convenience wrapper around getNextOpTimes() that allocates and returns exactly one slot.
inline OplogSlot getNextOpTime(OperationContext* opCtx) {
    const auto allocated = getNextOpTimes(opCtx, 1);
    // A request for one slot must yield exactly one slot.
    invariant(allocated.size() == 1);
    return allocated.front();
}

// Signature of the callback accepted by registerApplyImportCollectionFn().
// NOTE(review): the parameters are unnamed here, so the meaning of the two 'long long' values, the
// two BSONObj values and the bool cannot be determined from this header -- see the function that
// gets registered for their meaning.
using ApplyImportCollectionFn = std::function<void(OperationContext*,
                                                   const UUID&,
                                                   const NamespaceString&,
                                                   long long,
                                                   long long,
                                                   const BSONObj&,
                                                   const BSONObj&,
                                                   bool,
                                                   OplogApplication::Mode)>;

// Registers 'func' as the ApplyImportCollectionFn callback.
// NOTE(review): whether a later registration replaces an earlier one, and when the callback is
// invoked, are not visible from this declaration -- confirm against the definition.
void registerApplyImportCollectionFn(ApplyImportCollectionFn func);

}  // namespace repl
}  // namespace mongo