summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
diff options
context:
space:
mode:
authorJustin Seyster <justin.seyster@mongodb.com>2019-05-29 19:35:34 -0400
committerJustin Seyster <justin.seyster@mongodb.com>2019-05-29 19:35:34 -0400
commit938fea1a5f5129f6acf404e6938ba0d56a54ac93 (patch)
tree5311ed6cfee55396ceea74d23cac14015e400ab1 /src/mongo/db/pipeline/document_source_change_stream_transform.cpp
parent97d1373931ac47dd2bc659dfd82a0086d58cf1dd (diff)
downloadmongo-938fea1a5f5129f6acf404e6938ba0d56a54ac93.tar.gz
SERVER-41182 Change streams support for transactions larger than 16MB
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream_transform.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp237
1 files changed, 161 insertions, 76 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 1220286da9e..0773a2fb347 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -141,19 +141,15 @@ ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts,
Value uuid,
Value documentKey) {
ResumeTokenData resumeTokenData;
- if (_txnContext) {
+ if (_txnIterator) {
// We're in the middle of unwinding an 'applyOps'.
// Use the clusterTime from the higher level applyOps
- resumeTokenData.clusterTime = _txnContext->clusterTime;
-
- // 'pos' points to the _next_ applyOps index, so we must subtract one to get the index of
- // the entry being examined right now.
- invariant(_txnContext->pos >= 1);
- resumeTokenData.applyOpsIndex = _txnContext->pos - 1;
+ resumeTokenData.clusterTime = _txnIterator->clusterTime();
+ resumeTokenData.txnOpIndex = _txnIterator->txnOpIndex();
} else {
resumeTokenData.clusterTime = ts.getTimestamp();
- resumeTokenData.applyOpsIndex = 0;
+ resumeTokenData.txnOpIndex = 0;
}
resumeTokenData.documentKey = documentKey;
@@ -328,10 +324,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
auto resumeToken = ResumeToken(resumeTokenData).toDocument();
// Add some additional fields only relevant to transactions.
- if (_txnContext) {
+ if (_txnIterator) {
doc.addField(DocumentSourceChangeStream::kTxnNumberField,
- Value(static_cast<long long>(_txnContext->txnNumber)));
- doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnContext->lsid));
+ Value(static_cast<long long>(_txnIterator->txnNumber())));
+ doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnIterator->lsid()));
}
doc.addField(DocumentSourceChangeStream::kIdField, Value(resumeToken));
@@ -409,55 +405,6 @@ DocumentSource::GetModPathsReturn DocumentSourceChangeStreamTransform::getModifi
return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}};
}
-void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Document& input) {
- // The only two commands we will see here are an applyOps or a commit, which both mean we
- // need to open a "transaction context" representing a group of updates that all occurred at
- // once as part of a transaction. If we already have a transaction context open, that would
- // mean we are looking at an applyOps or commit nested within an applyOps, which is not
- // allowed in the oplog.
- invariant(!_txnContext);
-
- Value lsid = input["lsid"];
- checkValueType(lsid, "lsid", BSONType::Object);
-
- Value txnNumber = input["txnNumber"];
- checkValueType(txnNumber, "txnNumber", BSONType::NumberLong);
-
- Value ts = input[repl::OplogEntry::kTimestampFieldName];
- Timestamp txnApplyTime = ts.getTimestamp();
-
- auto commandObj = input["o"].getDocument();
- Value applyOps = commandObj["applyOps"];
- if (!applyOps.missing()) {
- // An "applyOps" command represents an immediately-committed transaction. We place the
- // operations within the "applyOps" array directly into the transaction context.
- applyOps = input.getNestedField("o.applyOps");
- } else {
- invariant(!commandObj["commitTransaction"].missing());
-
- // A "commit" command is the second part of a transaction that has been split up into
- // two oplog entries. The lsid, txnNumber, and timestamp are in this entry, but the
- // "applyOps" array is in a previous entry, which we must look up.
- repl::OpTime opTime;
- uassertStatusOK(bsonExtractOpTimeField(input.toBson(), "prevOpTime", &opTime));
-
- auto applyOpsEntry =
- pExpCtx->mongoProcessInterface->lookUpOplogEntryByOpTime(pExpCtx->opCtx, opTime);
- invariant(applyOpsEntry.isCommand() &&
- (repl::OplogEntry::CommandType::kApplyOps == applyOpsEntry.getCommandType()));
- invariant(applyOpsEntry.shouldPrepare());
-
- auto bsonOp = applyOpsEntry.getOperationToApply();
- invariant(BSONType::Array == bsonOp["applyOps"].type());
- applyOps = Value(bsonOp["applyOps"]);
- }
-
- checkValueType(applyOps, "applyOps", BSONType::Array);
- invariant(applyOps.getArrayLength() > 0);
-
- _txnContext.emplace(applyOps, txnApplyTime, lsid.getDocument(), txnNumber.getLong());
-}
-
DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
pExpCtx->checkForInterrupt();
@@ -469,10 +416,11 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
while (1) {
// If we're unwinding an 'applyOps' from a transaction, check if there are any documents we
// have stored that can be returned.
- if (_txnContext) {
- if (auto next = extractNextApplyOpsEntry()) {
+ if (_txnIterator) {
+ if (auto next = _txnIterator->getNextTransactionOp(pExpCtx->opCtx)) {
return applyTransformation(*next);
}
+ _txnIterator = boost::none;
}
// Get the next input document.
@@ -498,16 +446,103 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() {
return applyTransformation(doc);
}
- initializeTransactionContext(doc);
+ // The only two commands we will see here are an applyOps or a commit, which both mean we
+ // need to open a "transaction context" representing a group of updates that all occurred at
+ // once as part of a transaction. If we already have a transaction context open, that would
+ // mean we are looking at an applyOps or commit nested within an applyOps, which is not
+ // allowed in the oplog.
+ invariant(!_txnIterator);
+ _txnIterator.emplace(pExpCtx->opCtx, pExpCtx->mongoProcessInterface, doc, *_nsRegex);
+
+ // Once we initialize the transaction iterator, we can loop back to the top in order to call
+ // 'getNextTransactionOp' on it. Note that is possible for the transaction iterator
+ // to be empty of any relevant operations, meaning that this loop may need to execute
+ // multiple times before it encounters a relevant change to return.
+ }
+}
+
+DocumentSourceChangeStreamTransform::TransactionOpIterator::TransactionOpIterator(
+ OperationContext* opCtx,
+ std::shared_ptr<MongoProcessInterface> mongoProcessInterface,
+ const Document& input,
+ const pcrecpp::RE& nsRegex)
+ : _mongoProcessInterface(mongoProcessInterface), _nsRegex(nsRegex) {
+ Value lsidValue = input["lsid"];
+ checkValueType(lsidValue, "lsid", BSONType::Object);
+ _lsid = lsidValue.getDocument();
+
+ Value txnNumberValue = input["txnNumber"];
+ checkValueType(txnNumberValue, "txnNumber", BSONType::NumberLong);
+ _txnNumber = txnNumberValue.getLong();
+
+ // We want to parse the OpTime out of this document using the BSON OpTime parser. Instead of
+ // converting the entire Document back to BSON, we convert only the fields we need.
+ repl::OpTime txnOpTime = repl::OpTime::parse(BSON(repl::OpTime::kTimestampFieldName
+ << input[repl::OpTime::kTimestampFieldName]
+ << repl::OpTime::kTermFieldName
+ << input[repl::OpTime::kTermFieldName]));
+ _clusterTime = txnOpTime.getTimestamp();
+
+ auto commandObj = input["o"].getDocument();
+ Value applyOps = commandObj["applyOps"];
+
+ if (!applyOps.missing()) {
+ // We found an applyOps that implicitly commits a transaction. We include it in the
+ // '_txnOplogEntries' stack of applyOps entries that the change stream should process as
+ // part of this transaction. There may be additional applyOps entries linked through the
+ // 'prevOpTime' field, which will also get added to '_txnOplogEntries' later in this
+ // function. Note that this style of transaction does not have a 'commitTransaction'
+ // command.
+ _txnOplogEntries.push(txnOpTime);
+ } else {
+ // This must be a "commitTransaction" command, which commits a prepared transaction. This
+ // style of transaction does not have an applyOps entry that implicitly commits it, as in
+ // the previous case. We're going to iterate through the other oplog entries in the
+ // transaction, but this entry does not have any updates in it, so we do not include it in
+ // the '_txnOplogEntries' stack.
+ invariant(!commandObj["commitTransaction"].missing());
+ }
- // Once we initialize the transaction context, we can loop back to the top in order to call
- // 'extractNextApplyOpsEntry' on it. Note that is possible for the transaction context to be
- // empty of any relevant operations, meaning that this loop may need to execute multiple
- // times before it encounters a relevant change to return.
+ if (BSONType::Object ==
+ input[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName].getType()) {
+ // As with the 'txnOpTime' parsing above, we convert a portion of 'input' back to BSON in
+ // order to parse an OpTime, this time from the "prevOpTime" field.
+ repl::OpTime prevOpTime = repl::OpTime::parse(
+ input[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName].getDocument().toBson());
+ _collectAllOpTimesFromTransaction(opCtx, prevOpTime);
}
+
+ // Pop the first OpTime off the stack and use it to load the first oplog entry into the
+ // '_currentApplyOps' field.
+ invariant(_txnOplogEntries.size() > 0);
+ const auto firstTimestamp = _txnOplogEntries.top();
+ _txnOplogEntries.pop();
+
+ if (firstTimestamp == txnOpTime) {
+ // This transaction consists of only one oplog entry, from which we have already extracted
+ // the "applyOps" array, so there is no need to do any more work.
+ invariant(_txnOplogEntries.size() == 0);
+ _currentApplyOps = std::move(applyOps);
+ } else {
+ // This transaction consists of multiple oplog entries; grab the chronologically first entry
+ // and extract its "applyOps" array.
+ auto firstApplyOpsEntry = _lookUpOplogEntryByOpTime(opCtx, firstTimestamp);
+
+ auto bsonOp = firstApplyOpsEntry.getOperationToApply();
+ invariant(BSONType::Array == bsonOp["applyOps"].type());
+ _currentApplyOps = Value(bsonOp["applyOps"]);
+ }
+
+ checkValueType(_currentApplyOps, "applyOps", BSONType::Array);
+ invariant(_currentApplyOps.getArrayLength() > 0);
+
+ // Initialize iterators at the beginning of the transaction.
+ _currentApplyOpsIt = _currentApplyOps.getArray().begin();
+ _txnOpIndex = 0;
}
-bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) {
+bool DocumentSourceChangeStreamTransform::TransactionOpIterator::_isDocumentRelevant(
+ const Document& d) const {
invariant(
d["op"].getType() == BSONType::String,
str::stream()
@@ -519,21 +554,71 @@ bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d)
Value nsField = d["ns"];
invariant(!nsField.missing());
- return _nsRegex->PartialMatch(nsField.getString());
+ return _nsRegex.PartialMatch(nsField.getString());
}
-boost::optional<Document> DocumentSourceChangeStreamTransform::extractNextApplyOpsEntry() {
+boost::optional<Document>
+DocumentSourceChangeStreamTransform::TransactionOpIterator::getNextTransactionOp(
+ OperationContext* opCtx) {
+ while (true) {
+ while (_currentApplyOpsIt != _currentApplyOps.getArray().end()) {
+ Document d = (_currentApplyOpsIt++)->getDocument();
+ ++_txnOpIndex;
+ if (_isDocumentRelevant(d)) {
+ return d;
+ }
+ }
- while (_txnContext && _txnContext->pos < _txnContext->arr.size()) {
- Document d = _txnContext->arr[_txnContext->pos++].getDocument();
- if (isDocumentRelevant(d)) {
- return d;
+ if (_txnOplogEntries.empty()) {
+ // There are no more operations in this transaction.
+ return boost::none;
}
+
+ // We've processed all the operations in the previous applyOps entry, but we have a new one
+ // to process.
+ auto applyOpsEntry = _lookUpOplogEntryByOpTime(opCtx, _txnOplogEntries.top());
+ _txnOplogEntries.pop();
+
+ auto bsonOp = applyOpsEntry.getOperationToApply();
+ invariant(BSONType::Array == bsonOp["applyOps"].type());
+
+ _currentApplyOps = Value(bsonOp["applyOps"]);
+ _currentApplyOpsIt = _currentApplyOps.getArray().begin();
}
+}
+
+repl::OplogEntry
+DocumentSourceChangeStreamTransform::TransactionOpIterator::_lookUpOplogEntryByOpTime(
+ OperationContext* opCtx, repl::OpTime lookupTime) const {
+ invariant(!lookupTime.isNull());
+
+ std::unique_ptr<TransactionHistoryIteratorBase> iterator(
+ _mongoProcessInterface->createTransactionHistoryIterator(lookupTime));
+ try {
+ return iterator->next(opCtx);
+ } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) {
+ ex.addContext(
+ "Oplog no longer has history necessary for $changeStream to observe operations from a "
+ "committed transaction.");
+ uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason());
+ }
+}
- _txnContext = boost::none;
+void DocumentSourceChangeStreamTransform::TransactionOpIterator::_collectAllOpTimesFromTransaction(
+ OperationContext* opCtx, repl::OpTime firstOpTime) {
+ std::unique_ptr<TransactionHistoryIteratorBase> iterator(
+ _mongoProcessInterface->createTransactionHistoryIterator(firstOpTime));
- return boost::none;
+ try {
+ while (iterator->hasNext()) {
+ _txnOplogEntries.push(iterator->nextOpTime(opCtx));
+ }
+ } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) {
+ ex.addContext(
+ "Oplog no longer has history necessary for $changeStream to observe operations from a "
+ "committed transaction.");
+ uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason());
+ }
}
} // namespace mongo