1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
|
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include <boost/optional.hpp>
#include "mongo/base/string_data.h"
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/pipeline/document.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/util/uuid.h"
namespace mongo {
struct ResumeTokenData {
    /**
     * Records whether the resume token originates from an invalidate notification.
     */
    enum FromInvalidate : bool {
        kFromInvalidate = true,
        kNotFromInvalidate = false,
    };

    /**
     * Records which kind of resume token is being generated.
     */
    enum TokenType : int {
        kHighWaterMarkToken = 0,  // The token identifies a point in time rather than an event.
        kEventToken = 128,        // The token identifies an actual event in the stream.
    };

    /**
     * Default construction: every field keeps its in-class default.
     */
    ResumeTokenData() {}

    /**
     * Builds token data from the supplied pieces. 'tokenType' and 'fromInvalidate' are not
     * parameters of this constructor, so they keep their in-class defaults.
     */
    ResumeTokenData(Timestamp clusterTimeIn,
                    int versionIn,
                    size_t txnOpIndexIn,
                    const boost::optional<UUID>& uuidIn,
                    Value documentKeyIn)
        : clusterTime(clusterTimeIn),
          version(versionIn),
          txnOpIndex(txnOpIndexIn),
          uuid(uuidIn),
          documentKey(std::move(documentKeyIn)) {}

    // Equality is defined out of line; inequality is simply its negation.
    bool operator==(const ResumeTokenData& other) const;
    bool operator!=(const ResumeTokenData& other) const {
        const bool isEqual = (*this == other);
        return !isEqual;
    }

    Timestamp clusterTime;
    int version = 1;
    TokenType tokenType = TokenType::kEventToken;

    // For a token that refers to an operation inside a transaction, 'clusterTime' holds the
    // transaction's commit time and 'txnOpIndex' holds the position of the operation within that
    // transaction. For operations outside of a transaction this field is always 0.
    size_t txnOpIndex = 0;

    // Set only when the token comes from the "invalidate" notification itself. It is not set on a
    // token from a command that merely *would* invalidate a change stream.
    FromInvalidate fromInvalidate = FromInvalidate::kNotFromInvalidate;

    boost::optional<UUID> uuid;
    Value documentKey;
};
/**
 * Stream-insertion operator that allows a ResumeTokenData to be written to a std::ostream. Only
 * the declaration lives in this header; the output format is determined by the out-of-line
 * definition.
 */
std::ostream& operator<<(std::ostream& out, const ResumeTokenData& tokenData);
/**
 * A token passed in by the user to indicate where in the oplog we should start for $changeStream.
 * This token has the following format:
 *   {
 *     _data: String - A hex encoding of the binary generated by keystring encoding the
 *            clusterTime, version, txnOpIndex, UUID, then documentKey, in that order.
 *     _typeBits: BinData - The keystring type bits used for deserialization.
 *   }
 * The _data field is encoded such that string comparisons provide the correct ordering of
 * tokens. Unlike BinData, this can be sorted correctly using a MongoDB sort; BinData
 * unfortunately orders by the length of the data first, then by the contents.
 *
 * As an optimization, the _typeBits field may be missing, and it should not affect token
 * comparison.
 */
class ResumeToken {
public:
    // Field names used in the document form of a resume token.
    constexpr static StringData kDataFieldName = "_data"_sd;
    constexpr static StringData kTypeBitsFieldName = "_typeBits"_sd;

    /**
     * Entry point used by the IDL parser: wraps the BSON object in a Document and forwards to the
     * Document overload of parse().
     */
    static ResumeToken parse(const BSONObj& resumeBson) {
        return ResumeToken::parse(Document(resumeBson));
    }

    static ResumeToken parse(const Document& document);

    /**
     * Creates a high-water-mark token for 'clusterTime'; such a token carries no UUID and no
     * documentKey.
     */
    static ResumeToken makeHighWaterMarkToken(Timestamp clusterTime);

    /**
     * Reports whether 'tokenData' describes a valid high-water-mark resume token, i.e. one which
     * names a clusterTime after which the stream should resume rather than a specific operation.
     */
    static bool isHighWaterMarkToken(const ResumeTokenData& tokenData);

    /**
     * The IDL requires a no-argument constructor for any type used as a non-optional field.
     */
    ResumeToken() = default;

    /**
     * Converts 'resumeValue' into a ResumeToken using the hex-encoded string format.
     */
    explicit ResumeToken(const ResumeTokenData& resumeValue);

    Document toDocument() const;

    /**
     * Present only because the IDL demands a serializer. The serialization format depends on the
     * feature compatibility version, so an argument-less serializer is not meaningful. This must
     * never be called.
     */
    BSONObj toBSON_do_not_use() const {
        MONGO_UNREACHABLE;
    }

    ResumeTokenData getData() const;

    /**
     * Convenience accessor: obtains the token's data via getData() and returns its clusterTime.
     */
    Timestamp getClusterTime() const {
        const ResumeTokenData tokenData = getData();
        return tokenData.clusterTime;
    }

    // Equality is defined out of line; inequality is simply its negation.
    bool operator==(const ResumeToken&) const;
    bool operator!=(const ResumeToken& other) const {
        const bool isEqual = (*this == other);
        return !isEqual;
    }

    /**
     * Streams the token by streaming the ResumeTokenData obtained from getData().
     */
    friend std::ostream& operator<<(std::ostream& out, const ResumeToken& token) {
        return out << token.getData();
    }

private:
    explicit ResumeToken(const Document& resumeData);

    // Hex-encoded string which encodes every component of the resume token.
    std::string _hexKeyString;

    // A KeyString encoding can discard information about the original types of the serialized
    // values; for instance, the integer 2 and the double 2.0 produce the same KeyString. The type
    // bits are retained so that deserialization can recover that information.
    Value _typeBits;
};
} // namespace mongo
|