// path: root/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
// blob: 1019454c133dec21d2a1c3be0cb7e7a330294d12
/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/platform/basic.h"

#include "mongo/db/pipeline/document_source_lookup_change_post_image.h"

#include "mongo/bson/simple_bsonelement_comparator.h"

namespace mongo {

// Namespace-scope definitions (no initializer) of the class's static constexpr StringData members.
// NOTE(review): presumably the initializers live with the in-class declarations in the header, and
// these definitions exist so the constants can be odr-used when building as pre-C++17 - confirm.
constexpr StringData DocumentSourceLookupChangePostImage::kStageName;
constexpr StringData DocumentSourceLookupChangePostImage::kFullDocumentFieldName;

namespace {
/**
 * Reads the field 'fieldName' out of 'fullDoc' and returns its value. Raises a user assertion
 * (code 40578) describing the expected type, the type actually found, the offending value and
 * the whole document unless the value's type is exactly 'expectedType'.
 */
Value assertFieldHasType(const Document& fullDoc, StringData fieldName, BSONType expectedType) {
    auto fieldValue = fullDoc[fieldName];
    const bool hasExpectedType = (fieldValue.getType() == expectedType);

    uassert(40578,
            str::stream() << "failed to look up post image after change: expected \"" << fieldName
                          << "\" field to have type " << typeName(expectedType)
                          << ", instead found type " << typeName(fieldValue.getType()) << ": "
                          << fieldValue.toString() << ", full object: " << fullDoc.toString(),
            hasExpectedType);

    return fieldValue;
}
}  // namespace

/**
 * Pulls the next result from the source stage. A result that is not "advanced", or whose
 * operation type is anything other than an update, is handed back untouched. For update events,
 * the result of lookupPostImage() is stored under 'kFullDocumentFieldName' before the event is
 * returned.
 *
 * Raises a user assertion if an advanced result has no string-typed operation type field.
 */
DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::doGetNext() {
    auto nextResult = pSource->getNext();

    if (nextResult.isAdvanced()) {
        const auto operationType = assertFieldHasType(nextResult.getDocument(),
                                                      DocumentSourceChangeStream::kOperationTypeField,
                                                      BSONType::String);

        if (operationType.getString() == DocumentSourceChangeStream::kUpdateOpType) {
            // Take ownership of the event so the post image can be attached to it in place.
            MutableDocument updateEvent(nextResult.releaseDocument());
            updateEvent[kFullDocumentFieldName] = lookupPostImage(updateEvent.peek());
            return updateEvent.freeze();
        }
    }

    // Nothing to augment: pass the upstream result through as-is.
    return nextResult;
}

/**
 * Builds the namespace named by the "db" and "coll" string fields of the namespace sub-document
 * of 'inputDoc' and returns it.
 *
 * Raises a user assertion (code 40578) if the namespace field is not an object or either of its
 * components is not a string, and (code 40579) if the resulting namespace is not one this stage
 * is permitted to look up into.
 */
NamespaceString DocumentSourceLookupChangePostImage::assertValidNamespace(
    const Document& inputDoc) const {
    const auto nsDoc =
        assertFieldHasType(inputDoc, DocumentSourceChangeStream::kNamespaceField, BSONType::Object)
            .getDocument();
    const auto dbValue = assertFieldHasType(nsDoc, "db"_sd, BSONType::String);
    const auto collValue = assertFieldHasType(nsDoc, "coll"_sd, BSONType::String);

    NamespaceString nss(dbValue.getString(), collValue.getString());

    // An exact namespace match is always acceptable. Beyond that, a change stream on an entire
    // database only needs the database names to match, and a cluster-wide $changeStream (opened
    // against 'admin') is permitted to look up into any namespace.
    const bool lookupPermitted = nss == pExpCtx->ns || pExpCtx->isClusterAggregation() ||
        pExpCtx->isDBAggregation(nss.db());

    uassert(40579,
            str::stream() << "unexpected namespace during post image lookup: " << nss.ns()
                          << ", expected " << pExpCtx->ns.ns(),
            lookupPermitted);

    return nss;
}

/**
 * Looks up the current version of the document referred to by the update event 'updateOp'.
 *
 * The event must contain a valid namespace, an object-typed document key and an object-typed
 * resume token in its id field; a user assertion is raised otherwise. The resume token must carry
 * a collection UUID (enforced with an invariant), which is used to perform the lookup.
 *
 * Returns the looked-up document as a Value, or a null Value if the lookup produced no document.
 */
Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updateOp) const {
    // Make sure we have a well-formed input.
    auto nss = assertValidNamespace(updateOp);

    auto documentKey = assertFieldHasType(updateOp,
                                          DocumentSourceChangeStream::kDocumentKeyField,
                                          BSONType::Object)
                           .getDocument();

    // Type-check the resume token field the same way as every other field read from the event, so
    // that a malformed event produces the same descriptive user assertion.
    auto resumeTokenDoc =
        assertFieldHasType(updateOp, DocumentSourceChangeStream::kIdField, BSONType::Object)
            .getDocument();

    // Extract the resume token data once and reuse it below, rather than calling getData()
    // separately for each field. The UUID it carries is used to do change stream lookups by UUID.
    const auto resumeTokenData = ResumeToken::parse(resumeTokenDoc).getData();

    // When running on mongoS, read with majority read concern after the event's cluster time;
    // otherwise no explicit read concern is passed to the lookup.
    const auto readConcern = pExpCtx->inMongos
        ? boost::optional<BSONObj>(BSON("level"
                                        << "majority"
                                        << "afterClusterTime" << resumeTokenData.clusterTime))
        : boost::none;

    // Update lookup queries sent from mongoS to shards are allowed to use speculative majority
    // reads.
    const auto allowSpeculativeMajorityRead = pExpCtx->inMongos;

    invariant(resumeTokenData.uuid);
    auto lookedUpDoc =
        pExpCtx->mongoProcessInterface->lookupSingleDocument(pExpCtx,
                                                             nss,
                                                             *resumeTokenData.uuid,
                                                             documentKey,
                                                             readConcern,
                                                             allowSpeculativeMajorityRead);

    // Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may
    // not have returned any results if the document was deleted in the time since the update op.
    return (lookedUpDoc ? Value(*lookedUpDoc) : Value(BSONNULL));
}

}  // namespace mongo