summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline
diff options
context:
space:
mode:
authorMindaugas Malinauskas <mindaugas.malinauskas@mongodb.com>2021-12-09 17:36:50 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-07 23:04:09 +0000
commit54c977ae2b278136a87f4dd46e81bed3d5224d8e (patch)
tree48cb83751603101af1473adcc665a050d931da11 /src/mongo/db/pipeline
parent1e6eb502e1754cfda0b39bf13605b73471641c70 (diff)
downloadmongo-54c977ae2b278136a87f4dd46e81bed3d5224d8e.tar.gz
SERVER-58694 Implement writing of pre-images for transactional update/replace/delete operations
Diffstat (limited to 'src/mongo/db/pipeline')
-rw-r--r--src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp8
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h1
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_transform.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp3
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h12
5 files changed, 30 insertions, 6 deletions
diff --git a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp
index 68e2e6949f8..3bdbf3954c5 100644
--- a/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp
+++ b/src/mongo/db/pipeline/change_stream_pre_image_helpers.cpp
@@ -44,6 +44,10 @@ namespace mongo {
void writeToChangeStreamPreImagesCollection(OperationContext* opCtx,
const ChangeStreamPreImage& preImage) {
const auto collectionNamespace = NamespaceString::kChangeStreamPreImagesNamespace;
+ tassert(5869404,
+ str::stream() << "Invalid pre-image document applyOpsIndex: "
+ << preImage.getId().getApplyOpsIndex(),
+ preImage.getId().getApplyOpsIndex() >= 0);
// This lock acquisition can block on a stronger lock held by another operation modifying the
// pre-images collection. There are no known cases where an operation holding an exclusive lock
@@ -52,7 +56,9 @@ void writeToChangeStreamPreImagesCollection(OperationContext* opCtx,
AutoGetCollection preimagesCollectionRaii(opCtx, collectionNamespace, LockMode::MODE_IX);
UpdateResult res = Helpers::upsert(opCtx, collectionNamespace.toString(), preImage.toBSON());
tassert(5868601,
- "Failed to insert a new document into pre-images collection",
+ str::stream() << "Failed to insert a new document into the pre-images collection: ts: "
+ << preImage.getId().getTs().toString()
+ << ", applyOpsIndex: " << preImage.getId().getApplyOpsIndex(),
!res.existing && !res.upsertedId.isEmpty());
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 773b13c1050..efe83289131 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -174,6 +174,7 @@ public:
static constexpr StringData kLsidField = "lsid"_sd;
static constexpr StringData kTxnOpIndexField = "txnOpIndex"_sd;
static constexpr StringData kApplyOpsIndexField = "applyOpsIndex"_sd;
+ static constexpr StringData kApplyOpsTsField = "applyOpsTs"_sd;
static constexpr StringData kRawOplogUpdateSpecField = "rawOplogUpdateSpec"_sd;
// The target namespace of a rename operation.
diff --git a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
index 148e0c9a385..b59348fc4e5 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_transform.cpp
@@ -350,6 +350,7 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
// unwinding a transaction.
auto txnOpIndex = input[DocumentSourceChangeStream::kTxnOpIndexField];
auto applyOpsIndex = input[DocumentSourceChangeStream::kApplyOpsIndexField];
+ auto applyOpsEntryTs = input[DocumentSourceChangeStream::kApplyOpsTsField];
// Add some additional fields only relevant to transactions.
if (!txnOpIndex.missing()) {
@@ -405,10 +406,10 @@ Document DocumentSourceChangeStreamTransform::applyTransformation(const Document
} else {
// Set 'kPreImageIdField' to the 'ChangeStreamPreImageId'. The DSCSAddPreImage stage
// will use the id in order to fetch the pre-image from the pre-images collection.
- const auto preImageId =
- ChangeStreamPreImageId(uuid.getUuid(),
- ts.getTimestamp(),
- applyOpsIndex.missing() ? 0 : applyOpsIndex.getLong());
+ const auto preImageId = ChangeStreamPreImageId(
+ uuid.getUuid(),
+ applyOpsEntryTs.missing() ? ts.getTimestamp() : applyOpsEntryTs.getTimestamp(),
+ applyOpsIndex.missing() ? 0 : applyOpsIndex.getLong());
doc.addField(DocumentSourceChangeStream::kPreImageIdField, Value(preImageId.toBSON()));
}
}
@@ -447,9 +448,10 @@ DepsTracker::State DocumentSourceChangeStreamTransform::getDependencies(DepsTrac
deps->fields.insert(repl::OplogEntry::kTxnNumberFieldName.toString());
deps->fields.insert(DocumentSourceChangeStream::kTxnOpIndexField.toString());
- if (_preImageRequested) {
+ if (_preImageRequested || _postImageRequested) {
deps->fields.insert(repl::OplogEntry::kPreImageOpTimeFieldName.toString());
deps->fields.insert(DocumentSourceChangeStream::kApplyOpsIndexField.toString());
+ deps->fields.insert(DocumentSourceChangeStream::kApplyOpsTsField.toString());
}
return DepsTracker::State::EXHAUSTIVE_ALL;
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
index 8e51e29e7b9..cdd07aa79b9 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.cpp
@@ -268,6 +268,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::TransactionO
// Initialize iterators at the beginning of the transaction.
_currentApplyOpsIt = _currentApplyOps.getArray().begin();
+ _currentApplyOpsTs = firstTimestamp.getTimestamp();
_currentApplyOpsIndex = 0;
_txnOpIndex = 0;
}
@@ -304,6 +305,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::getNextTrans
BSONType::Array == bsonOp["applyOps"].type());
_currentApplyOps = Value(bsonOp["applyOps"]);
+ _currentApplyOpsTs = applyOpsEntry.getTimestamp();
_currentApplyOpsIt = _currentApplyOps.getArray().begin();
_currentApplyOpsIndex = 0;
}
@@ -338,6 +340,7 @@ DocumentSourceChangeStreamUnwindTransaction::TransactionOpIterator::_addRequired
// the current entry.
newDoc.addField(DocumentSourceChangeStream::kApplyOpsIndexField,
Value(static_cast<long long>(applyOpsIndex())));
+ newDoc.addField(DocumentSourceChangeStream::kApplyOpsTsField, Value(applyOpsTs()));
newDoc.addField(repl::OplogEntry::kTimestampFieldName, Value(_clusterTime));
newDoc.addField(repl::OplogEntry::kSessionIdFieldName, Value(_lsid));
diff --git a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
index a2659178d6b..f2b17259980 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
+++ b/src/mongo/db/pipeline/document_source_change_stream_unwind_transaction.h
@@ -133,6 +133,15 @@ private:
return _currentApplyOpsIndex - 1;
}
+ /**
+ * Returns the timestamp of the "applyOps" entry containing the last operation returned by
+ * 'getNextTransactionOp()'. If 'getNextTransactionOp()' has not been called, returns the
+ * timestamp of the first "applyOps" entry in the transaction.
+ */
+ Timestamp applyOpsTs() const {
+ return _currentApplyOpsTs;
+ }
+
Timestamp clusterTime() const {
return _clusterTime;
}
@@ -194,6 +203,9 @@ private:
// The index of the next entry within the current 'applyOps' array.
size_t _currentApplyOpsIndex;
+ // The timestamp of the current 'applyOps' entry.
+ Timestamp _currentApplyOpsTs;
+
// Our current place within the entire transaction, which may consist of multiple 'applyOps'
// arrays.
size_t _txnOpIndex;