summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_change_stream.h
blob: 05e6d623dcfb80bdab3bbea4000a5696fc048aa6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

#include <type_traits>

#include "mongo/db/pipeline/change_stream_constants.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/pipeline/document_source_change_stream_gen.h"
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_single_document_transformation.h"
#include "mongo/db/pipeline/field_path.h"
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/query/query_feature_flags_gen.h"
#include "mongo/util/intrusive_counter.h"

namespace mongo {

/**
 * The $changeStream stage is an alias for a cursor on oplog followed by a $match stage and a
 * transform stage on mongod.
 */
class DocumentSourceChangeStream final {
public:
    class LiteParsed : public LiteParsedDocumentSource {
    public:
        static std::unique_ptr<LiteParsed> parse(const NamespaceString& nss,
                                                 const BSONElement& spec) {
            uassert(6188500,
                    str::stream() << "$changeStream must take a nested object but found: " << spec,
                    spec.type() == BSONType::Object);
            return std::make_unique<LiteParsed>(spec.fieldName(), nss, spec);
        }

        explicit LiteParsed(std::string parseTimeName, NamespaceString nss, const BSONElement& spec)
            : LiteParsedDocumentSource(std::move(parseTimeName)),
              _nss(std::move(nss)),
              _spec(spec) {}

        bool isChangeStream() const final {
            return true;
        }

        bool allowedToPassthroughFromMongos() const final {
            return false;
        }

        stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
            return stdx::unordered_set<NamespaceString>();
        }

        ActionSet actions{ActionType::changeStream, ActionType::find};
        PrivilegeVector requiredPrivileges(bool isMongos,
                                           bool bypassDocumentValidation) const override {
            if (_nss.isAdminDB() && _nss.isCollectionlessAggregateNS()) {
                // Watching a whole cluster.
                return {Privilege(ResourcePattern::forAnyNormalResource(), actions)};
            } else if (_nss.isCollectionlessAggregateNS()) {
                // Watching a whole database.
                return {Privilege(ResourcePattern::forDatabaseName(_nss.db()), actions)};
            } else {
                // Watching a single collection. Note if this is in the admin database it will fail
                // at parse time.
                return {Privilege(ResourcePattern::forExactNamespace(_nss), actions)};
            }
        }

        ReadConcernSupportResult supportsReadConcern(repl::ReadConcernLevel level,
                                                     bool isImplicitDefault) const {
            // Change streams require "majority" readConcern. If the client did not specify an
            // explicit readConcern, change streams will internally upconvert the readConcern to
            // majority (so clients can always send aggregations without readConcern). We therefore
            // do not permit the cluster-wide default to be applied.
            return onlySingleReadConcernSupported(
                kStageName, repl::ReadConcernLevel::kMajorityReadConcern, level, isImplicitDefault);
        }

        void assertSupportsMultiDocumentTransaction() const {
            transactionNotSupported(kStageName);
        }

        void assertPermittedInAPIVersion(const APIParameters& apiParameters) const final {
            if (apiParameters.getAPIVersion() && *apiParameters.getAPIVersion() == "1" &&
                apiParameters.getAPIStrict().value_or(false)) {
                uassert(ErrorCodes::APIStrictError,
                        "The 'showExpandedEvents' parameter to $changeStream is not supported in "
                        "API Version 1",
                        _spec.Obj()[DocumentSourceChangeStreamSpec::kShowExpandedEventsFieldName]
                            .eoo());

                uassert(
                    ErrorCodes::APIStrictError,
                    "The 'showRawUpdateDescription' parameter to $changeStream is not supported in "
                    "API Version 1",
                    _spec.Obj()[DocumentSourceChangeStreamSpec::kShowRawUpdateDescriptionFieldName]
                        .eoo());

                uassert(
                    ErrorCodes::APIStrictError,
                    "The 'showSystemEvents' parameter to $changeStream is not supported in API "
                    "Version 1",
                    _spec.Obj()[DocumentSourceChangeStreamSpec::kShowSystemEventsFieldName].eoo());
            }
        }

    private:
        const NamespaceString _nss;
        BSONElement _spec;
    };

    // The name of the field where the document key (_id and shard key, if present) will be found
    // after the transformation.
    static constexpr StringData kDocumentKeyField = "documentKey"_sd;

    // The name of the field where the operation description of the non-CRUD operations will be
    // located. This is complementary to the 'documentKey' for CRUD operations.
    static constexpr StringData kOperationDescriptionField = "operationDescription"_sd;

    // The name of the field where the pre-image document will be found, if requested and available.
    static constexpr StringData kFullDocumentBeforeChangeField = "fullDocumentBeforeChange"_sd;

    // The name of the field where the full document will be found after the transformation. The
    // full document is only present for certain types of operations, such as an insert.
    static constexpr StringData kFullDocumentField = "fullDocument"_sd;

    // The name of the field where the pre-image id will be found. Needed for fetching the pre-image
    // from the pre-images collection.
    static constexpr StringData kPreImageIdField = "preImageId"_sd;

    // The name of the field where the change identifier will be located after the transformation.
    static constexpr StringData kIdField = "_id"_sd;

    // The name of the field where the namespace of the change will be located after the
    // transformation.
    static constexpr StringData kNamespaceField = "ns"_sd;

    // Name of the field which stores information about updates. Only applies when OperationType
    // is "update". Note that this field will be omitted if the 'showRawUpdateDescription' option
    // is enabled in the change stream spec.
    static constexpr StringData kUpdateDescriptionField = "updateDescription"_sd;

    // Name of the field which stores the raw update description from the oplog about updates.
    // Only applies when OperationType is "update". Note that this field is only present when
    // the 'showRawUpdateDescription' option is enabled in the change stream spec.
    static constexpr StringData kRawUpdateDescriptionField = "rawUpdateDescription"_sd;

    // The name of the subfield of '_id' where the UUID of the namespace will be located after the
    // transformation.
    static constexpr StringData kUuidField = "uuid"_sd;

    // This UUID field represents all of:
    // 1. The UUID for a particular resharding operation.
    // 2. The UUID for the temporary collection that exists during a resharding operation.
    // 3. The UUID for a collection being resharded, once a resharding operation has completed.
    static constexpr StringData kReshardingUuidField = "reshardingUUID"_sd;

    // The name of the field where the type of the operation will be located after the
    // transformation.
    static constexpr StringData kOperationTypeField = "operationType"_sd;

    // The name of the field where the clusterTime of the change will be located after the
    // transformation. The cluster time will be located inside the change identifier, so the full
    // path to the cluster time will be kIdField + "." + kClusterTimeField.
    static constexpr StringData kClusterTimeField = "clusterTime"_sd;

    // The name of this stage.
    static constexpr StringData kStageName = "$changeStream"_sd;

    static constexpr StringData kTxnNumberField = "txnNumber"_sd;
    static constexpr StringData kLsidField = "lsid"_sd;
    static constexpr StringData kTxnOpIndexField = "txnOpIndex"_sd;
    static constexpr StringData kApplyOpsIndexField = "applyOpsIndex"_sd;
    static constexpr StringData kApplyOpsTsField = "applyOpsTs"_sd;
    static constexpr StringData kRawOplogUpdateSpecField = "rawOplogUpdateSpec"_sd;

    // The target namespace of a rename operation.
    static constexpr StringData kRenameTargetNssField = "to"_sd;

    // Wall time of the corresponding oplog entry.
    static constexpr StringData kWallTimeField = "wallTime"_sd;

    // UUID of a collection corresponding to the event (if applicable).
    static constexpr StringData kCollectionUuidField = "collectionUUID"_sd;

    //
    // The different types of operations we can use for the operation type.
    //

    // The classic change events.
    static constexpr StringData kUpdateOpType = "update"_sd;
    static constexpr StringData kDeleteOpType = "delete"_sd;
    static constexpr StringData kReplaceOpType = "replace"_sd;
    static constexpr StringData kInsertOpType = "insert"_sd;
    static constexpr StringData kDropCollectionOpType = "drop"_sd;
    static constexpr StringData kRenameCollectionOpType = "rename"_sd;
    static constexpr StringData kDropDatabaseOpType = "dropDatabase"_sd;
    static constexpr StringData kInvalidateOpType = "invalidate"_sd;

    // The internal change events that are not exposed to the users.
    static constexpr StringData kReshardBeginOpType = "reshardBegin"_sd;
    static constexpr StringData kReshardDoneCatchUpOpType = "reshardDoneCatchUp"_sd;
    // Internal op type to signal mongos to open cursors on new shards.
    static constexpr StringData kNewShardDetectedOpType = "kNewShardDetected"_sd;

    // These events are guarded behind the 'showExpandedEvents' flag.
    static constexpr StringData kCreateOpType = "create"_sd;
    static constexpr StringData kCreateIndexesOpType = "createIndexes"_sd;
    static constexpr StringData kDropIndexesOpType = "dropIndexes"_sd;
    static constexpr StringData kShardCollectionOpType = "shardCollection"_sd;
    static constexpr StringData kMigrateLastChunkFromShardOpType = "migrateLastChunkFromShard"_sd;

    // Default regex for collections match which prohibits system collections.
    static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd;
    // Regex matching all regular collections plus certain system collections.
    static constexpr StringData kRegexAllCollectionsShowSystemEvents =
        R"((?!(\$|system\.(?!(js$)))))"_sd;

    static constexpr StringData kRegexAllDBs = R"(^(?!(admin|config|local)\.)[^.]+)"_sd;
    static constexpr StringData kRegexCmdColl = R"(\$cmd$)"_sd;

    enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster };

    /**
     * Helpers for Determining which regex to match a change stream against.
     */
    static ChangeStreamType getChangeStreamType(const NamespaceString& nss);
    static std::string getNsRegexForChangeStream(
        const boost::intrusive_ptr<ExpressionContext>& expCtx);
    static std::string getCollRegexForChangeStream(
        const boost::intrusive_ptr<ExpressionContext>& expCtx);
    static std::string getCmdNsRegexForChangeStream(
        const boost::intrusive_ptr<ExpressionContext>& expCtx);
    static StringData resolveAllCollectionsRegex(
        const boost::intrusive_ptr<ExpressionContext>& expCtx);

    static std::string regexEscapeNsForChangeStream(StringData source);

    /**
     * Parses a $changeStream stage from 'elem' and produces the $match and transformation
     * stages required.
     */
    static std::list<boost::intrusive_ptr<DocumentSource>> createFromBson(
        BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);

    /**
     * Helper used by various change stream stages. Used for asserting that a certain Value of a
     * field has a certain type. Will uassert() if the field does not have the expected type.
     */
    static void checkValueType(Value v, StringData fieldName, BSONType expectedType);

    /**
     * Extracts the resume token from the given spec. If a 'startAtOperationTime' is specified,
     * returns the equivalent high-watermark token. This method should only ever be called on a spec
     * where one of 'resumeAfter', 'startAfter', or 'startAtOperationTime' is populated.
     */
    static ResumeTokenData resolveResumeTokenFromSpec(const DocumentSourceChangeStreamSpec& spec);

    /**
     * For a change stream with no resume information supplied by the user, returns the clusterTime
     * at which the new stream should begin scanning the oplog.
     */
    static Timestamp getStartTimeForNewStream(
        const boost::intrusive_ptr<ExpressionContext>& expCtx);

private:
    // Constructs and returns a series of stages representing the full change stream pipeline.
    static std::list<boost::intrusive_ptr<DocumentSource>> _buildPipeline(
        const boost::intrusive_ptr<ExpressionContext>& expCtx, DocumentSourceChangeStreamSpec spec);

    // Helper function which throws if the $changeStream fails any of a series of semantic checks.
    // For instance, whether it is permitted to run given the current FCV, whether the namespace is
    // valid for the options specified in the spec, etc.
    static void assertIsLegalSpecification(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                           const DocumentSourceChangeStreamSpec& spec);

    // It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson()
    // instead.
    DocumentSourceChangeStream() = default;
};

/**
 * A LiteParsed class used to register all internal change stream stages. It inherits the
 * behavior of DocumentSourceChangeStream::LiteParsed (read concern, transaction and API-version
 * checks). It overrides requiredPrivileges() so that these stages demand the cluster-wide
 * 'internal' action rather than the changeStream/find actions required of user streams.
 *
 * NOTE(review): unlike DocumentSourceChangeStream::LiteParsed::parse(), parse() here does not
 * check that 'spec' is an object; presumably acceptable because these stages are generated
 * internally - confirm.
 */
class LiteParsedDocumentSourceChangeStreamInternal final
    : public DocumentSourceChangeStream::LiteParsed {
public:
    static std::unique_ptr<LiteParsedDocumentSourceChangeStreamInternal> parse(
        const NamespaceString& nss, const BSONElement& spec) {
        return std::make_unique<LiteParsedDocumentSourceChangeStreamInternal>(
            spec.fieldName(), nss, spec);
    }

    LiteParsedDocumentSourceChangeStreamInternal(std::string parseTimeName,
                                                 NamespaceString nss,
                                                 const BSONElement& spec)
        : DocumentSourceChangeStream::LiteParsed(std::move(parseTimeName), std::move(nss), spec) {}

    // Internal stages require the 'internal' action on the cluster resource, regardless of the
    // namespace being watched. 'final' already implies overriding, so no 'override' is needed.
    PrivilegeVector requiredPrivileges(bool isMongos,
                                       bool bypassDocumentValidation) const final {
        return {Privilege(ResourcePattern::forClusterResource(), ActionType::internal)};
    }
};

}  // namespace mongo