summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_check_resume_token.cpp
blob: fab7fcf2e5f1cea181d4a2016cb8be1ecfc6b893 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314

/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/platform/basic.h"

#include "mongo/db/curop.h"
#include "mongo/db/pipeline/document_source_check_resume_token.h"

using boost::intrusive_ptr;
namespace mongo {
namespace {

using ResumeStatus = DocumentSourceEnsureResumeTokenPresent::ResumeStatus;

// Returns ResumeStatus::kFoundToken if the document retrieved from the resumed pipeline satisfies
// the client's resume token, ResumeStatus::kCheckNextDoc if it is older than the client's token,
// and ResumeToken::kCannotResume if it is more recent than the client's resume token (indicating
// that we will never see the token). If the resume token's documentKey contains only the _id field
// while the pipeline documentKey contains additional fields, then the collection has become
// sharded since the resume token was generated. In that case, we relax the requirements such that
// only the timestamp, version, applyOpsIndex, UUID and documentKey._id need match. This remains
// correct, since the only circumstances under which the resume token omits the shard key is if it
// was generated either (1) before the collection was sharded, (2) after the collection was sharded
// but before the primary shard became aware of that fact, implying that it was before the first
// chunk moved off the shard, or (3) by a malicious client who has constructed their own resume
// token. In the first two cases, we can be guaranteed that the _id is unique and the stream can
// therefore be resumed seamlessly; in the third case, the worst that can happen is that some
// entries are missed or duplicated. Note that the simple collation is used to compare the resume
// tokens, and that we purposefully avoid the user's requested collation if present.
ResumeStatus compareAgainstClientResumeToken(const intrusive_ptr<ExpressionContext>& expCtx,
                                             const Document& documentFromResumedStream,
                                             const ResumeTokenData& tokenDataFromClient) {
    // Parse the stream doc into comprehensible ResumeTokenData.
    auto tokenDataFromResumedStream =
        ResumeToken::parse(documentFromResumedStream["_id"].getDocument()).getData();

    // We start the resume with a $gte query on the timestamp, so we never expect it to be lower
    // than our resume token's timestamp.
    invariant(tokenDataFromResumedStream.clusterTime >= tokenDataFromClient.clusterTime);

    // If the clusterTime differs from the client's token, this stream cannot be resumed.
    if (tokenDataFromResumedStream.clusterTime != tokenDataFromClient.clusterTime) {
        return ResumeStatus::kSurpassedToken;
    }

    // If the tokenType exceeds the client token's type, then we have passed the resume token point.
    // This can happen if the client resumes from a synthetic 'high water mark' token from another
    // shard which happens to have the same clusterTime as an actual change on this shard.
    if (tokenDataFromResumedStream.tokenType != tokenDataFromClient.tokenType) {
        return tokenDataFromResumedStream.tokenType > tokenDataFromClient.tokenType
            ? ResumeStatus::kSurpassedToken
            : ResumeStatus::kCheckNextDoc;
    }

    if (tokenDataFromResumedStream.applyOpsIndex < tokenDataFromClient.applyOpsIndex) {
        return ResumeStatus::kCheckNextDoc;
    } else if (tokenDataFromResumedStream.applyOpsIndex > tokenDataFromClient.applyOpsIndex) {
        // This could happen if the client provided an applyOpsIndex of 0, yet the 0th document in
        // the applyOps was irrelevant (meaning it was an operation on a collection or DB not being
        // watched). If we are looking for the resume token on a shard then this simply means that
        // the resume token may be on a different shard; otherwise, it indicates a corrupt token.
        uassert(50792, "Invalid resumeToken: applyOpsIndex was skipped", expCtx->needsMerge);
        // We are running on a merging shard. Signal that we have read beyond the resume token.
        return ResumeStatus::kSurpassedToken;
    }

    // It is acceptable for the stream UUID to differ from the client's, if this is a whole-database
    // or cluster-wide stream and we are comparing operations from different shards at the same
    // clusterTime. If the stream UUID sorts after the client's, however, then the stream is not
    // resumable; we are past the point in the stream where the token should have appeared.
    if (tokenDataFromResumedStream.uuid != tokenDataFromClient.uuid) {
        // If we are running on a replica set deployment, we don't ever expect to see identical time
        // stamps and applyOpsIndex but differing UUIDs, and we reject the resume attempt at once.
        if (!expCtx->inMongos && !expCtx->needsMerge) {
            return ResumeStatus::kSurpassedToken;
        }
        // Otherwise, return a ResumeStatus based on the sort-order of the client and stream UUIDs.
        return tokenDataFromResumedStream.uuid > tokenDataFromClient.uuid
            ? ResumeStatus::kSurpassedToken
            : ResumeStatus::kCheckNextDoc;
    }

    // If all the fields match exactly, then we have found the token.
    if (ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey ==
                                            tokenDataFromClient.documentKey)) {
        return ResumeStatus::kFoundToken;
    }

    // At this point, we know that the tokens differ only by documentKey. The status we return will
    // depend on whether the stream token is logically before or after the client token. If the
    // latter, then we will never see the resume token and the stream cannot be resumed. However,
    // before we can return this value, we need to check the possibility that the resumed stream is
    // on a sharded collection and the client token is from before the collection was sharded.
    const auto defaultResumeStatus =
        ValueComparator::kInstance.evaluate(tokenDataFromResumedStream.documentKey >
                                            tokenDataFromClient.documentKey)
        ? ResumeStatus::kSurpassedToken
        : ResumeStatus::kCheckNextDoc;

    // If we're not running in a sharded context, we don't need to proceed any further.
    if (!expCtx->needsMerge && !expCtx->inMongos) {
        return defaultResumeStatus;
    }

    // If we reach here, we still need to check the possibility that the collection has become
    // sharded in the time since the client's resume token was generated. If so, then the client
    // token will only have an _id field, while the token from the new pipeline may have additional
    // shard key fields.

    // We expect the documentKey to be an object in both the client and stream tokens. If either is
    // not, then we cannot compare the embedded _id values in each, and so the stream token does not
    // satisfy the client token.
    if (tokenDataFromClient.documentKey.getType() != BSONType::Object ||
        tokenDataFromResumedStream.documentKey.getType() != BSONType::Object) {
        return defaultResumeStatus;
    }

    auto documentKeyFromResumedStream = tokenDataFromResumedStream.documentKey.getDocument();
    auto documentKeyFromClient = tokenDataFromClient.documentKey.getDocument();

    // In order for the relaxed comparison to be applicable, the client token must have a single _id
    // field, and the resumed stream token must have additional fields beyond _id.
    if (!(documentKeyFromClient.size() == 1 && documentKeyFromResumedStream.size() > 1)) {
        return defaultResumeStatus;
    }

    // If the resume token's documentKey only contains the _id field while the pipeline's
    // documentKey contains additional fields, we require only that the _ids match.
    return (!documentKeyFromClient["_id"].missing() &&
                    ValueComparator::kInstance.evaluate(documentKeyFromResumedStream["_id"] ==
                                                        documentKeyFromClient["_id"])
                ? ResumeStatus::kFoundToken
                : defaultResumeStatus);
}
}  // namespace

const char* DocumentSourceEnsureResumeTokenPresent::getSourceName() const {
    return "$_ensureResumeTokenPresent";
}

Value DocumentSourceEnsureResumeTokenPresent::serialize(
    boost::optional<ExplainOptions::Verbosity> explain) const {
    // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
    // would result in it being created twice.
    return Value();
}

intrusive_ptr<DocumentSourceEnsureResumeTokenPresent>
DocumentSourceEnsureResumeTokenPresent::create(const intrusive_ptr<ExpressionContext>& expCtx,
                                               ResumeTokenData token) {
    return new DocumentSourceEnsureResumeTokenPresent(expCtx, std::move(token));
}

DocumentSourceEnsureResumeTokenPresent::DocumentSourceEnsureResumeTokenPresent(
    const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
    : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}

DocumentSource::GetNextResult DocumentSourceEnsureResumeTokenPresent::getNext() {
    pExpCtx->checkForInterrupt();

    if (_resumeStatus == ResumeStatus::kFoundToken) {
        // We've already verified the resume token is present.
        return pSource->getNext();
    }

    Document documentFromResumedStream;

    // Keep iterating the stream until we see either the resume token we're looking for,
    // or a change with a higher timestamp than our resume token.
    while (_resumeStatus == ResumeStatus::kCheckNextDoc) {
        auto nextInput = pSource->getNext();

        if (!nextInput.isAdvanced())
            return nextInput;

        // The incoming documents are sorted on clusterTime, uuid, documentKey. We examine a range
        // of documents that have the same prefix (i.e. clusterTime and uuid). If the user provided
        // token would sort before this received document we cannot resume the change stream.
        _resumeStatus = compareAgainstClientResumeToken(
            pExpCtx, (documentFromResumedStream = nextInput.getDocument()), _tokenFromClient);
    }

    uassert(40585,
            str::stream()
                << "resume of change stream was not possible, as the resume token was not found. "
                << documentFromResumedStream["_id"].getDocument().toString(),
            _resumeStatus != ResumeStatus::kSurpassedToken);

    // If we reach this point, then we've seen the resume token.
    invariant(_resumeStatus == ResumeStatus::kFoundToken);

    // Don't return the document which has the token; the user has already seen it.
    return pSource->getNext();
}

const char* DocumentSourceShardCheckResumability::getSourceName() const {
    return "$_checkShardResumability";
}

Value DocumentSourceShardCheckResumability::serialize(
    boost::optional<ExplainOptions::Verbosity> explain) const {
    // This stage is created by the DocumentSourceChangeStream stage, so serializing it here
    // would result in it being created twice.
    return Value();
}

intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
    const intrusive_ptr<ExpressionContext>& expCtx, Timestamp ts) {
    // We are resuming from a point in time, not an event. Seed the stage with a high water mark.
    return create(expCtx, ResumeToken::makeHighWaterMarkToken(ts, expCtx->uuid).getData());
}

intrusive_ptr<DocumentSourceShardCheckResumability> DocumentSourceShardCheckResumability::create(
    const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token) {
    return new DocumentSourceShardCheckResumability(expCtx, std::move(token));
}

DocumentSourceShardCheckResumability::DocumentSourceShardCheckResumability(
    const intrusive_ptr<ExpressionContext>& expCtx, ResumeTokenData token)
    : DocumentSource(expCtx), _tokenFromClient(std::move(token)) {}

DocumentSource::GetNextResult DocumentSourceShardCheckResumability::getNext() {
    pExpCtx->checkForInterrupt();

    if (_surpassedResumeToken)
        return pSource->getNext();

    while (!_surpassedResumeToken) {
        auto nextInput = pSource->getNext();

        // If we hit EOF, check the oplog to make sure that we are able to resume. This prevents us
        // from continually returning EOF in cases where the resume point has fallen off the oplog.
        if (!nextInput.isAdvanced()) {
            _assertOplogHasEnoughHistory(nextInput);
            return nextInput;
        }
        // Determine whether the current event sorts before, equal to or after the resume token.
        auto resumeStatus =
            compareAgainstClientResumeToken(pExpCtx, nextInput.getDocument(), _tokenFromClient);
        switch (resumeStatus) {
            case ResumeStatus::kCheckNextDoc:
                // If the result was kCheckNextDoc, we are resumable but must swallow this event.
                _verifiedOplogHasEnoughHistory = true;
                continue;
            case ResumeStatus::kSurpassedToken:
                // In this case the resume token wasn't found; it must be on another shard. We must
                // examine the oplog to ensure that its history reaches back to before the resume
                // token, otherwise we may have missed events that fell off the oplog. If we can
                // resume, fall through into the following case and set _surpassedResumeToken.
                _assertOplogHasEnoughHistory(nextInput);
            case ResumeStatus::kFoundToken:
                // We found the actual token! Set _surpassedResumeToken and return the result.
                _surpassedResumeToken = true;
                return nextInput;
        }
    }
    MONGO_UNREACHABLE;
}

void DocumentSourceShardCheckResumability::_assertOplogHasEnoughHistory(
    const GetNextResult& nextInput) {
    // If we have already verified that this stream is resumable, return immediately.
    if (_verifiedOplogHasEnoughHistory) {
        return;
    }
    // Look up the first document in the oplog and compare it with the resume token's clusterTime.
    auto firstEntryExpCtx = pExpCtx->copyWith(NamespaceString::kRsOplogNamespace);
    auto matchSpec = BSON("$match" << BSONObj());
    auto pipeline = uassertStatusOK(
        pExpCtx->mongoProcessInterface->makePipeline({matchSpec}, firstEntryExpCtx));
    if (auto first = pipeline->getNext()) {
        auto firstOplogEntry = Value(*first);
        uassert(40576,
                "Resume of change stream was not possible, as the resume point may no longer "
                "be in the oplog. ",
                firstOplogEntry["ts"].getTimestamp() < _tokenFromClient.clusterTime);
    } else {
        // Very unusual case: the oplog is empty.  We can always resume. However, it should never be
        // possible to have obtained a document that matched the filter if the oplog is empty.
        uassert(51087,
                "Oplog was empty but found an event in the change stream pipeline. It should not "
                "be possible for this to happen",
                nextInput.isEOF());
    }
    _verifiedOplogHasEnoughHistory = true;
}
}  // namespace mongo