diff options
author | Justin Seyster <justin.seyster@mongodb.com> | 2019-05-29 19:35:34 -0400 |
---|---|---|
committer | Justin Seyster <justin.seyster@mongodb.com> | 2019-05-29 19:35:34 -0400 |
commit | 938fea1a5f5129f6acf404e6938ba0d56a54ac93 (patch) | |
tree | 5311ed6cfee55396ceea74d23cac14015e400ab1 /src/mongo/db/pipeline/document_source_change_stream_transform.cpp | |
parent | 97d1373931ac47dd2bc659dfd82a0086d58cf1dd (diff) | |
download | mongo-938fea1a5f5129f6acf404e6938ba0d56a54ac93.tar.gz |
SERVER-41182 Change streams support for transactions larger than 16MB
Diffstat (limited to 'src/mongo/db/pipeline/document_source_change_stream_transform.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream_transform.cpp | 237 |
1 files changed, 161 insertions, 76 deletions
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 1220286da9e..0773a2fb347 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -141,19 +141,15 @@ ResumeTokenData DocumentSourceChangeStreamTransform::getResumeToken(Value ts, Value uuid, Value documentKey) { ResumeTokenData resumeTokenData; - if (_txnContext) { + if (_txnIterator) { // We're in the middle of unwinding an 'applyOps'. // Use the clusterTime from the higher level applyOps - resumeTokenData.clusterTime = _txnContext->clusterTime; - - // 'pos' points to the _next_ applyOps index, so we must subtract one to get the index of - // the entry being examined right now. - invariant(_txnContext->pos >= 1); - resumeTokenData.applyOpsIndex = _txnContext->pos - 1; + resumeTokenData.clusterTime = _txnIterator->clusterTime(); + resumeTokenData.txnOpIndex = _txnIterator->txnOpIndex(); } else { resumeTokenData.clusterTime = ts.getTimestamp(); - resumeTokenData.applyOpsIndex = 0; + resumeTokenData.txnOpIndex = 0; } resumeTokenData.documentKey = documentKey; @@ -328,10 +324,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document auto resumeToken = ResumeToken(resumeTokenData).toDocument(); // Add some additional fields only relevant to transactions. - if (_txnContext) { + if (_txnIterator) { doc.addField(DocumentSourceChangeStream::kTxnNumberField, - Value(static_cast<long long>(_txnContext->txnNumber))); - doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnContext->lsid)); + Value(static_cast<long long>(_txnIterator->txnNumber()))); + doc.addField(DocumentSourceChangeStream::kLsidField, Value(_txnIterator->lsid())); } doc.addField(DocumentSourceChangeStream::kIdField, Value(resumeToken)); @@ -409,55 +405,6 @@ DocumentSource::GetModPathsReturn DocumentSourceChangeStreamTransform::getModifi return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::set<string>{}, {}}; } -void DocumentSourceChangeStreamTransform::initializeTransactionContext(const Document& input) { - // The only two commands we will see here are an applyOps or a commit, which both mean we - // need to open a "transaction context" representing a group of updates that all occurred at - // once as part of a transaction. If we already have a transaction context open, that would - // mean we are looking at an applyOps or commit nested within an applyOps, which is not - // allowed in the oplog. - invariant(!_txnContext); - - Value lsid = input["lsid"]; - checkValueType(lsid, "lsid", BSONType::Object); - - Value txnNumber = input["txnNumber"]; - checkValueType(txnNumber, "txnNumber", BSONType::NumberLong); - - Value ts = input[repl::OplogEntry::kTimestampFieldName]; - Timestamp txnApplyTime = ts.getTimestamp(); - - auto commandObj = input["o"].getDocument(); - Value applyOps = commandObj["applyOps"]; - if (!applyOps.missing()) { - // An "applyOps" command represents an immediately-committed transaction. We place the - // operations within the "applyOps" array directly into the transaction context. - applyOps = input.getNestedField("o.applyOps"); - } else { - invariant(!commandObj["commitTransaction"].missing()); - - // A "commit" command is the second part of a transaction that has been split up into - // two oplog entries. The lsid, txnNumber, and timestamp are in this entry, but the - // "applyOps" array is in a previous entry, which we must look up. - repl::OpTime opTime; - uassertStatusOK(bsonExtractOpTimeField(input.toBson(), "prevOpTime", &opTime)); - - auto applyOpsEntry = - pExpCtx->mongoProcessInterface->lookUpOplogEntryByOpTime(pExpCtx->opCtx, opTime); - invariant(applyOpsEntry.isCommand() && - (repl::OplogEntry::CommandType::kApplyOps == applyOpsEntry.getCommandType())); - invariant(applyOpsEntry.shouldPrepare()); - - auto bsonOp = applyOpsEntry.getOperationToApply(); - invariant(BSONType::Array == bsonOp["applyOps"].type()); - applyOps = Value(bsonOp["applyOps"]); - } - - checkValueType(applyOps, "applyOps", BSONType::Array); - invariant(applyOps.getArrayLength() > 0); - - _txnContext.emplace(applyOps, txnApplyTime, lsid.getDocument(), txnNumber.getLong()); -} - DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { pExpCtx->checkForInterrupt(); @@ -469,10 +416,11 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { while (1) { // If we're unwinding an 'applyOps' from a transaction, check if there are any documents we // have stored that can be returned. - if (_txnContext) { - if (auto next = extractNextApplyOpsEntry()) { + if (_txnIterator) { + if (auto next = _txnIterator->getNextTransactionOp(pExpCtx->opCtx)) { return applyTransformation(*next); } + _txnIterator = boost::none; } // Get the next input document. @@ -498,16 +446,103 @@ DocumentSource::GetNextResult DocumentSourceChangeStreamTransform::getNext() { return applyTransformation(doc); } - initializeTransactionContext(doc); + // The only two commands we will see here are an applyOps or a commit, which both mean we + // need to open a "transaction context" representing a group of updates that all occurred at + // once as part of a transaction. If we already have a transaction context open, that would + // mean we are looking at an applyOps or commit nested within an applyOps, which is not + // allowed in the oplog. + invariant(!_txnIterator); + _txnIterator.emplace(pExpCtx->opCtx, pExpCtx->mongoProcessInterface, doc, *_nsRegex); + + // Once we initialize the transaction iterator, we can loop back to the top in order to call + // 'getNextTransactionOp' on it. Note that is possible for the transaction iterator + // to be empty of any relevant operations, meaning that this loop may need to execute + // multiple times before it encounters a relevant change to return. + } +} + +DocumentSourceChangeStreamTransform::TransactionOpIterator::TransactionOpIterator( + OperationContext* opCtx, + std::shared_ptr<MongoProcessInterface> mongoProcessInterface, + const Document& input, + const pcrecpp::RE& nsRegex) + : _mongoProcessInterface(mongoProcessInterface), _nsRegex(nsRegex) { + Value lsidValue = input["lsid"]; + checkValueType(lsidValue, "lsid", BSONType::Object); + _lsid = lsidValue.getDocument(); + + Value txnNumberValue = input["txnNumber"]; + checkValueType(txnNumberValue, "txnNumber", BSONType::NumberLong); + _txnNumber = txnNumberValue.getLong(); + + // We want to parse the OpTime out of this document using the BSON OpTime parser. Instead of + // converting the entire Document back to BSON, we convert only the fields we need. + repl::OpTime txnOpTime = repl::OpTime::parse(BSON(repl::OpTime::kTimestampFieldName + << input[repl::OpTime::kTimestampFieldName] + << repl::OpTime::kTermFieldName + << input[repl::OpTime::kTermFieldName])); + _clusterTime = txnOpTime.getTimestamp(); + + auto commandObj = input["o"].getDocument(); + Value applyOps = commandObj["applyOps"]; + + if (!applyOps.missing()) { + // We found an applyOps that implicitly commits a transaction. We include it in the + // '_txnOplogEntries' stack of applyOps entries that the change stream should process as + // part of this transaction. There may be additional applyOps entries linked through the + // 'prevOpTime' field, which will also get added to '_txnOplogEntries' later in this + // function. Note that this style of transaction does not have a 'commitTransaction' + // command. + _txnOplogEntries.push(txnOpTime); + } else { + // This must be a "commitTransaction" command, which commits a prepared transaction. This + // style of transaction does not have an applyOps entry that implicitly commits it, as in + // the previous case. We're going to iterate through the other oplog entries in the + // transaction, but this entry does not have any updates in it, so we do not include it in + // the '_txnOplogEntries' stack. + invariant(!commandObj["commitTransaction"].missing()); + } - // Once we initialize the transaction context, we can loop back to the top in order to call - // 'extractNextApplyOpsEntry' on it. Note that is possible for the transaction context to be - // empty of any relevant operations, meaning that this loop may need to execute multiple - // times before it encounters a relevant change to return. + if (BSONType::Object == + input[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName].getType()) { + // As with the 'txnOpTime' parsing above, we convert a portion of 'input' back to BSON in + // order to parse an OpTime, this time from the "prevOpTime" field. + repl::OpTime prevOpTime = repl::OpTime::parse( + input[repl::OplogEntry::kPrevWriteOpTimeInTransactionFieldName].getDocument().toBson()); + _collectAllOpTimesFromTransaction(opCtx, prevOpTime); } + + // Pop the first OpTime off the stack and use it to load the first oplog entry into the + // '_currentApplyOps' field. + invariant(_txnOplogEntries.size() > 0); + const auto firstTimestamp = _txnOplogEntries.top(); + _txnOplogEntries.pop(); + + if (firstTimestamp == txnOpTime) { + // This transaction consists of only one oplog entry, from which we have already extracted + // the "applyOps" array, so there is no need to do any more work. + invariant(_txnOplogEntries.size() == 0); + _currentApplyOps = std::move(applyOps); + } else { + // This transaction consists of multiple oplog entries; grab the chronologically first entry + // and extract its "applyOps" array. + auto firstApplyOpsEntry = _lookUpOplogEntryByOpTime(opCtx, firstTimestamp); + + auto bsonOp = firstApplyOpsEntry.getOperationToApply(); + invariant(BSONType::Array == bsonOp["applyOps"].type()); + _currentApplyOps = Value(bsonOp["applyOps"]); + } + + checkValueType(_currentApplyOps, "applyOps", BSONType::Array); + invariant(_currentApplyOps.getArrayLength() > 0); + + // Initialize iterators at the beginning of the transaction. + _currentApplyOpsIt = _currentApplyOps.getArray().begin(); + _txnOpIndex = 0; } -bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) { +bool DocumentSourceChangeStreamTransform::TransactionOpIterator::_isDocumentRelevant( + const Document& d) const { invariant( d["op"].getType() == BSONType::String, str::stream() @@ -519,21 +554,71 @@ bool DocumentSourceChangeStreamTransform::isDocumentRelevant(const Document& d) Value nsField = d["ns"]; invariant(!nsField.missing()); - return _nsRegex->PartialMatch(nsField.getString()); + return _nsRegex.PartialMatch(nsField.getString()); } -boost::optional<Document> DocumentSourceChangeStreamTransform::extractNextApplyOpsEntry() { +boost::optional<Document> +DocumentSourceChangeStreamTransform::TransactionOpIterator::getNextTransactionOp( + OperationContext* opCtx) { + while (true) { + while (_currentApplyOpsIt != _currentApplyOps.getArray().end()) { + Document d = (_currentApplyOpsIt++)->getDocument(); + ++_txnOpIndex; + if (_isDocumentRelevant(d)) { + return d; + } + } - while (_txnContext && _txnContext->pos < _txnContext->arr.size()) { - Document d = _txnContext->arr[_txnContext->pos++].getDocument(); - if (isDocumentRelevant(d)) { - return d; + if (_txnOplogEntries.empty()) { + // There are no more operations in this transaction. + return boost::none; } + + // We've processed all the operations in the previous applyOps entry, but we have a new one + // to process. + auto applyOpsEntry = _lookUpOplogEntryByOpTime(opCtx, _txnOplogEntries.top()); + _txnOplogEntries.pop(); + + auto bsonOp = applyOpsEntry.getOperationToApply(); + invariant(BSONType::Array == bsonOp["applyOps"].type()); + + _currentApplyOps = Value(bsonOp["applyOps"]); + _currentApplyOpsIt = _currentApplyOps.getArray().begin(); } +} + +repl::OplogEntry +DocumentSourceChangeStreamTransform::TransactionOpIterator::_lookUpOplogEntryByOpTime( + OperationContext* opCtx, repl::OpTime lookupTime) const { + invariant(!lookupTime.isNull()); + + std::unique_ptr<TransactionHistoryIteratorBase> iterator( + _mongoProcessInterface->createTransactionHistoryIterator(lookupTime)); + try { + return iterator->next(opCtx); + } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) { + ex.addContext( + "Oplog no longer has history necessary for $changeStream to observe operations from a " + "committed transaction."); + uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason()); + } +} - _txnContext = boost::none; +void DocumentSourceChangeStreamTransform::TransactionOpIterator::_collectAllOpTimesFromTransaction( + OperationContext* opCtx, repl::OpTime firstOpTime) { + std::unique_ptr<TransactionHistoryIteratorBase> iterator( + _mongoProcessInterface->createTransactionHistoryIterator(firstOpTime)); - return boost::none; + try { + while (iterator->hasNext()) { + _txnOplogEntries.push(iterator->nextOpTime(opCtx)); + } + } catch (ExceptionFor<ErrorCodes::IncompleteTransactionHistory>& ex) { + ex.addContext( + "Oplog no longer has history necessary for $changeStream to observe operations from a " + "committed transaction."); + uasserted(ErrorCodes::ChangeStreamHistoryLost, ex.reason()); + } } } // namespace mongo |