// path: root/src/mongo/db/exec/change_stream_proxy.cpp
// blob: c16255a897be3a02664a1c79bc9d3a956b2d746c (plain)
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/platform/basic.h"

#include "mongo/db/exec/change_stream_proxy.h"

#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/db/pipeline/resume_token.h"
#include "mongo/db/repl/speculative_majority_read_info.h"

namespace mongo {

// Name of this stage. It is passed to the PipelineProxyStage base constructor and used to build
// the CommonStats returned by getStats().
const char* ChangeStreamProxyStage::kStageType = "CHANGE_STREAM_PROXY";

// Constructs the stage on top of PipelineProxyStage, taking ownership of 'pipeline', and seeds the
// resume-tracking members from the initial post-batch resume token (PBRT) held by the pipeline's
// expression context.
ChangeStreamProxyStage::ChangeStreamProxyStage(ExpressionContext* expCtx,
                                               std::unique_ptr<Pipeline, PipelineDeleter> pipeline,
                                               WorkingSet* ws)
    : PipelineProxyStage(expCtx, std::move(pipeline), ws, kStageType) {
    // The initial PBRT is added to the expression context while the pipeline is being built, so it
    // must be present by the time this stage is constructed.
    const auto& initialToken = _pipeline->getContext()->initialPostBatchResumeToken;
    invariant(!initialToken.isEmpty());

    // Keep an owned copy of the token, then derive the starting oplog timestamp from the
    // clusterTime encoded inside it.
    _postBatchResumeToken = initialToken.getOwned();
    _latestOplogTimestamp = ResumeToken::parse(_postBatchResumeToken).getData().clusterTime;
}

// Pulls the next event from the underlying pipeline, keeping _latestOplogTimestamp and
// _postBatchResumeToken up to date along the way. Returns the event if one is available, or
// boost::none once the pipeline has no further results to offer for now.
boost::optional<Document> ChangeStreamProxyStage::getNext() {
    auto event = _pipeline->getNext();

    if (!event) {
        // Nothing left to return. See whether the oplog cursor has advanced beyond the timestamp
        // we last recorded. Since _latestOplogTimestamp is advanced for every event handed back,
        // a strictly greater time means no event at that timestamp has been returned yet, so it
        // is safe to publish a high-water-mark token at the current clusterTime.
        auto currentOplogTime = PipelineD::getLatestOplogTimestamp(_pipeline.get());
        if (currentOplogTime > _latestOplogTimestamp) {
            auto highWaterMarkToken = ResumeToken::makeHighWaterMarkToken(currentOplogTime);
            _postBatchResumeToken = highWaterMarkToken.toDocument().toBson();
            _latestOplogTimestamp = currentOplogTime;
            _setSpeculativeReadTimestamp();
        }
        return boost::none;
    }

    // An event is available. Check that its resume token is intact, then record both the latest
    // oplog timestamp and the event's resume token, which is carried in its sort key metadata.
    _validateResumeToken(*event);
    _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp(_pipeline.get());
    _postBatchResumeToken = event->metadata().getSortKey().getDocument().toBson();
    _setSpeculativeReadTimestamp();
    return event;
}

// Verifies that the '_id' field of 'event' is still binary-identical to the resume token stored
// in the event's sort key metadata. Raises ChangeStreamFatalError via uassert when they differ,
// and trips an invariant if the sort key is missing altogether.
void ChangeStreamProxyStage::_validateResumeToken(const Document& event) const {
    // When the output is destined to be merged on mongoS, no stage can have altered the _id, so
    // there is nothing to verify.
    if (_includeMetaData) {
        return;
    }

    auto eventAsBson = event.toBson();
    auto sortKeyToken = event.metadata().getSortKey();
    auto eventId = eventAsBson.getObjectField("_id");
    invariant(!sortKeyToken.missing());

    // The event is valid only if the sort key is an object whose BSON form matches the _id
    // sub-object byte for byte.
    const bool idMatchesToken = (sortKeyToken.getType() == BSONType::Object) &&
        eventId.binaryEqual(sortKeyToken.getDocument().toBson());

    uassert(ErrorCodes::ChangeStreamFatalError,
            str::stream() << "Encountered an event whose _id field, which contains the resume "
                             "token, was modified by the pipeline. Modifying the _id field of an "
                             "event makes it impossible to resume the stream from that point. Only "
                             "transformations that retain the unmodified _id field are allowed. "
                             "Expected: "
                          << BSON("_id" << sortKeyToken) << " but found: "
                          << (eventAsBson["_id"] ? BSON("_id" << eventAsBson["_id"]) : BSONObj()),
            idMatchesToken);
}

// Pushes the speculative majority read timestamp of the current operation forward to
// _latestOplogTimestamp. Does nothing unless the operation is a speculative read and the
// recorded timestamp is non-null.
void ChangeStreamProxyStage::_setSpeculativeReadTimestamp() {
    auto& readInfo = repl::SpeculativeMajorityReadInfo::get(_pipeline->getContext()->opCtx);

    if (!readInfo.isSpeculativeRead() || _latestOplogTimestamp.isNull()) {
        return;
    }
    readInfo.setSpeculativeReadTimestampForward(_latestOplogTimestamp);
}

// Builds a fresh PlanStageStats node for this stage, labelled with kStageType and
// STAGE_CHANGE_STREAM_PROXY. The stage-specific payload is a default-constructed
// CollectionScanStats.
// NOTE(review): CollectionScanStats looks like a placeholder for the specific stats here; confirm.
std::unique_ptr<PlanStageStats> ChangeStreamProxyStage::getStats() {
    auto stats =
        std::make_unique<PlanStageStats>(CommonStats(kStageType), STAGE_CHANGE_STREAM_PROXY);
    stats->specific = std::make_unique<CollectionScanStats>();
    return stats;
}

}  // namespace mongo