diff options
author | Mindaugas Malinauskas <mindaugas.malinauskas@mongodb.com> | 2021-12-09 17:36:50 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-07 23:04:09 +0000 |
commit | 54c977ae2b278136a87f4dd46e81bed3d5224d8e (patch) | |
tree | 48cb83751603101af1473adcc665a050d931da11 /src/mongo/db/pipeline | |
parent | 1e6eb502e1754cfda0b39bf13605b73471641c70 (diff) | |
download | mongo-54c977ae2b278136a87f4dd46e81bed3d5224d8e.tar.gz |
SERVER-58694 Implement writing of pre-images for transactional update/replace/delete operations
Diffstat (limited to 'src/mongo/db/pipeline')
5 files changed, 30 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp index 68e2e6949f8..3bdbf3954c5 100644 --- a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp +++ b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp @@ -44,6 +44,10 @@ namespace mongo { void writeToChangeStreamPreImagesCollection(OperationContext* opCtx, const ChangeStreamPreImage& preImage) { const auto collectionNamespace = NamespaceString::kChangeStreamPreImagesNamespace; + tassert(5869404, + str::stream() << "Invalid pre-image document applyOpsIndex: " + << preImage.getId().getApplyOpsIndex(), + preImage.getId().getApplyOpsIndex() >= 0); // This lock acquisition can block on a stronger lock held by another operation modifying the // pre-images collection. There are no known cases where an operation holding an exclusive lock @@ -52,7 +56,9 @@ void writeToChangeStreamPreImagesCollection(OperationContext* opCtx, AutoGetCollection preimagesCollectionRaii(opCtx, collectionNamespace, LockMode::MODE_IX); UpdateResult res = Helpers::upsert(opCtx, collectionNamespace.toString(), preImage.toBSON()); tassert(5868601, - "Failed to insert a new document into pre-images collection", + str::stream() << "Failed to insert a new document into the pre-images collection: ts: " + << preImage.getId().getTs().toString() + << ", applyOpsIndex: " << preImage.getId().getApplyOpsIndex(), !res.existing && !res.upsertedId.isEmpty()); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 773b13c1050..efe83289131 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -174,6 +174,7 @@ public: static constexpr StringData kLsidField = "lsid"_sd; static constexpr StringData kTxnOpIndexField = "txnOpIndex"_sd; static constexpr StringData kApplyOpsIndexField = "applyOpsIndex"_sd; + static constexpr StringData kApplyOpsTsField = "applyOpsTs"_sd; static constexpr StringData kRawOplogUpdateSpecField = "rawOplogUpdateSpec"_sd; // The target namespace of a rename operation. diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp index 148e0c9a385..b59348fc4e5 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp @@ -350,6 +350,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document // unwinding a transaction. auto txnOpIndex = input[DocumentSourceChangeStream::kTxnOpIndexField]; auto applyOpsIndex = input[DocumentSourceChangeStream::kApplyOpsIndexField]; + auto applyOpsEntryTs = input[DocumentSourceChangeStream::kApplyOpsTsField]; // Add some additional fields only relevant to transactions. if (!txnOpIndex.missing()) { @@ -405,10 +406,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document } else { // Set 'kPreImageIdField' to the 'ChangeStreamPreImageId'. The DSCSAddPreImage stage // will use the id in order to fetch the pre-image from the pre-images collection. - const auto preImageId = - ChangeStreamPreImageId(uuid.getUuid(), - ts.getTimestamp(), - applyOpsIndex.missing() ? 0 : applyOpsIndex.getLong()); + const auto preImageId = ChangeStreamPreImageId( + uuid.getUuid(), + applyOpsEntryTs.missing() ? ts.getTimestamp() : applyOpsEntryTs.getTimestamp(), + applyOpsIndex.missing() ? 0 : applyOpsIndex.getLong()); doc.addField(DocumentSourceChangeStream::kPreImageIdField, Value(preImageId.toBSON())); } } @@ -447,9 +448,10 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString()); deps->fields.insert(DocumentSourceChangeStream::kTxnOpIndexField.toString()); - if (_preImageRequested) { + if (_preImageRequested || _postImageRequested) { deps->fields.insert(repl::OplogEntry::kPreImageOpTimeFieldName.toString()); deps->fields.insert(DocumentSourceChangeStream::kApplyOpsIndexField.toString()); + deps->fields.insert(DocumentSourceChangeStream::kApplyOpsTsField.toString()); } return DepsTracker::State::EXHAUSTIVE_ALL; } diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp index 8e51e29e7b9..cdd07aa79b9 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp @@ -268,6 +268,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::TransactionO // Initialize iterators at the beginning of the transaction. _currentApplyOpsIt = _currentApplyOps.getArray().begin(); + _currentApplyOpsTs = firstTimestamp.getTimestamp(); _currentApplyOpsIndex = 0; _txnOpIndex = 0; } @@ -304,6 +305,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::getNextTrans BSONType::Array == bsonOp["applyOps"].type()); _currentApplyOps = Value(bsonOp["applyOps"]); + _currentApplyOpsTs = applyOpsEntry.getTimestamp(); _currentApplyOpsIt = _currentApplyOps.getArray().begin(); _currentApplyOpsIndex = 0; } @@ -338,6 +340,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::_addRequired // the current entry. newDoc.addField(DocumentSourceChangeStream::kApplyOpsIndexField, Value(static_cast<long long>(applyOpsIndex()))); + newDoc.addField(DocumentSourceChangeStream::kApplyOpsTsField, Value(applyOpsTs())); newDoc.addField(repl::OplogEntry::kTimestampFieldName, Value(_clusterTime)); newDoc.addField(repl::OplogEntry::kSessionIdFieldName, Value(_lsid)); diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h index a2659178d6b..f2b17259980 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h +++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h @@ -133,6 +133,15 @@ private: return _currentApplyOpsIndex - 1; } + /** + * Returns the timestamp of the "applyOps" entry containing the last operation returned by + * 'getNextTransactionOp()'. If 'getNextTransactionOp()' has not been called, returns the + * timestamp of the first "applyOps" entry in the transaction. + */ + Timestamp applyOpsTs() const { + return _currentApplyOpsTs; + } + Timestamp clusterTime() const { return _clusterTime; } @@ -194,6 +203,9 @@ private: // The index of the next entry within the current 'applyOps' array. size_t _currentApplyOpsIndex; + // The timestamp of the current 'applyOps' entry. + Timestamp _currentApplyOpsTs; + // Our current place within the entire transaction, which may consist of multiple 'applyOps' // arrays. size_t _txnOpIndex; |