summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2021-03-16 15:26:22 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-16 20:02:29 +0000
commit133d07b3e9e8dfb2aacb52dc2174bee38d791aec (patch)
treeb9534e2e7722ae853d1bac90ad82b2c11104e0c1 /src/mongo
parentf7a7901fc4b02e843cb23285ed28d08f14097ca7 (diff)
downloadmongo-133d07b3e9e8dfb2aacb52dc2174bee38d791aec.tar.gz
SERVER-55194 Allow multiple statement ids per oplog entry
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/catalog/collection.h2
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp2
-rw-r--r--src/mongo/db/exec/update_stage.cpp2
-rw-r--r--src/mongo/db/exec/upsert_stage.cpp10
-rw-r--r--src/mongo/db/op_observer_impl.cpp17
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp18
-rw-r--r--src/mongo/db/ops/update_request.h44
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp35
-rw-r--r--src/mongo/db/ops/write_ops_exec.h22
-rw-r--r--src/mongo/db/ops/write_ops_retryability_test.cpp2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp8
-rw-r--r--src/mongo/db/repl/apply_ops_test.cpp2
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp16
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp2
-rw-r--r--src/mongo/db/repl/multiapplier_test.cpp2
-rw-r--r--src/mongo/db/repl/oplog.cpp18
-rw-r--r--src/mongo/db/repl/oplog.h25
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp182
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp2
-rw-r--r--src/mongo/db/repl/oplog_batcher_test_fixture.cpp10
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp16
-rw-r--r--src/mongo/db/repl/oplog_entry.h31
-rw-r--r--src/mongo/db/repl/oplog_entry.idl7
-rw-r--r--src/mongo/db/repl/oplog_entry_test_helpers.cpp28
-rw-r--r--src/mongo/db/repl/oplog_entry_test_helpers.h10
-rw-r--r--src/mongo/db/repl/oplog_fetcher_test.cpp2
-rw-r--r--src/mongo/db/repl/replication_recovery_test.cpp2
-rw-r--r--src/mongo/db/repl/rollback_impl_test.cpp4
-rw-r--r--src/mongo/db/repl/session_update_tracker.cpp2
-rw-r--r--src/mongo/db/repl/sync_source_resolver_test.cpp2
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service_test.cpp2
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.cpp19
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp3
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.cpp7
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp92
-rw-r--r--src/mongo/db/s/resharding/resharding_txn_cloner.cpp2
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp11
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp143
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp13
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp82
-rw-r--r--src/mongo/db/transaction_history_iterator_test.cpp2
-rw-r--r--src/mongo/db/transaction_participant.cpp31
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp57
-rw-r--r--src/mongo/db/transaction_participant_test.cpp4
-rw-r--r--src/mongo/dbtests/repltests.cpp2
45 files changed, 538 insertions, 457 deletions
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index c6559d0ea4f..509d7cbb615 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -74,7 +74,7 @@ class RecordCursor;
struct CollectionUpdateArgs {
enum class StoreDocOption { None, PreImage, PostImage };
- StmtId stmtId = kUninitializedStmtId;
+ std::vector<StmtId> stmtIds = {kUninitializedStmtId};
// The document before modifiers were applied.
boost::optional<BSONObj> preImageDoc;
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 6ebec13d59c..aec89581a17 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -669,7 +669,7 @@ write_ops::FindAndModifyReply CmdFindAndModify::Invocation::typedRun(OperationCo
makeUpdateRequest(opCtx, req, verbosity, &updateRequest);
if (opCtx->getTxnNumber()) {
- updateRequest.setStmtId(stmtId);
+ updateRequest.setStmtIds({stmtId});
}
const ExtensionsCallbackReal extensionsCallback(
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index 02a68336ce0..689a00bcdb1 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -246,7 +246,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
CollectionUpdateArgs args;
if (!request->explain()) {
- args.stmtId = request->getStmtId();
+ args.stmtIds = request->getStmtIds();
args.update = logObj;
if (_isUserInitiatedWrite) {
args.criteria = CollectionShardingState::get(opCtx(), collection()->ns())
diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp
index fe9a18b67c0..4f7403d28d5 100644
--- a/src/mongo/db/exec/upsert_stage.cpp
+++ b/src/mongo/db/exec/upsert_stage.cpp
@@ -148,11 +148,11 @@ void UpsertStage::_performInsert(BSONObj newDocument) {
writeConflictRetry(opCtx(), "upsert", collection()->ns().ns(), [&] {
WriteUnitOfWork wunit(opCtx());
- uassertStatusOK(
- collection()->insertDocument(opCtx(),
- InsertStatement(_params.request->getStmtId(), newDocument),
- _params.opDebug,
- _params.request->isFromMigration()));
+ uassertStatusOK(collection()->insertDocument(
+ opCtx(),
+ InsertStatement(_params.request->getStmtIds(), newDocument),
+ _params.opDebug,
+ _params.request->isFromMigration()));
// Technically, we should save/restore state here, but since we are going to return
// immediately after, it would just be wasted work.
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 118821ab2e5..f065a25fcde 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -165,7 +165,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
oplogEntry.setUuid(args.uuid);
repl::OplogLink oplogLink;
- repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId);
+ repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtIds);
OpTimeBundle opTimes;
// We never want to store pre- or post- images when we're migrating oplog entries from another
@@ -202,7 +202,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
oplogEntry.setObject2(args.updateArgs.criteria);
oplogEntry.setFromMigrateIfTrue(args.updateArgs.fromMigrate);
// oplogLink could have been changed to include pre/postImageOpTime by the previous no-op write.
- repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtId);
+ repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, args.updateArgs.stmtIds);
if (args.updateArgs.oplogSlot) {
oplogEntry.setOpTime(*args.updateArgs.oplogSlot);
}
@@ -226,7 +226,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
oplogEntry.setDestinedRecipient(destinedRecipientDecoration(opCtx));
repl::OplogLink oplogLink;
- repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, stmtId);
+ repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, {stmtId});
OpTimeBundle opTimes;
// We never want to store pre-images when we're migrating oplog entries from another
@@ -245,7 +245,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
oplogEntry.setObject(documentKeyDecoration(opCtx).get().getShardKeyAndId());
oplogEntry.setFromMigrateIfTrue(fromMigrate);
// oplogLink could have been changed to include preImageOpTime by the previous no-op write.
- repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, stmtId);
+ repl::appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, {stmtId});
opTimes.writeOpTime = logOperation(opCtx, &oplogEntry);
opTimes.wallClockTime = oplogEntry.getWallClockTime();
return opTimes;
@@ -484,10 +484,9 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
times.insert(end(times), begin(opTimeList), end(opTimeList));
std::vector<StmtId> stmtIdsWritten;
- std::transform(first,
- last,
- std::back_inserter(stmtIdsWritten),
- [](const InsertStatement& stmt) { return stmt.stmtId; });
+ std::for_each(first, last, [&](const InsertStatement& stmt) {
+ stmtIdsWritten.insert(stmtIdsWritten.end(), stmt.stmtIds.begin(), stmt.stmtIds.end());
+ });
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(lastOpTime);
@@ -575,7 +574,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setLastWriteOpTime(opTime.writeOpTime);
sessionTxnRecord.setLastWriteDate(opTime.wallClockTime);
- onWriteOpCompleted(opCtx, std::vector<StmtId>{args.updateArgs.stmtId}, sessionTxnRecord);
+ onWriteOpCompleted(opCtx, args.updateArgs.stmtIds, sessionTxnRecord);
}
if (args.nss != NamespaceString::kSessionTransactionsTableNamespace) {
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index f0baa841457..f12906e475c 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -777,7 +777,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
opObserver().onInserts(opCtx(), nss1, uuid1, inserts1.begin(), inserts1.end(), false);
CollectionUpdateArgs updateArgs2;
- updateArgs2.stmtId = 1;
+ updateArgs2.stmtIds = {1};
updateArgs2.updatedDoc = BSON("_id" << 0 << "data"
<< "y");
updateArgs2.update = BSON("$set" << BSON("data"
@@ -1236,7 +1236,7 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
txnParticipant.unstashTransactionResources(opCtx(), "update");
CollectionUpdateArgs updateArgs1;
- updateArgs1.stmtId = 0;
+ updateArgs1.stmtIds = {0};
updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
<< "x");
updateArgs1.update = BSON("$set" << BSON("data"
@@ -1245,7 +1245,7 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
OplogUpdateEntryArgs update1(std::move(updateArgs1), nss1, uuid1);
CollectionUpdateArgs updateArgs2;
- updateArgs2.stmtId = 1;
+ updateArgs2.stmtIds = {1};
updateArgs2.updatedDoc = BSON("_id" << 1 << "data"
<< "y");
updateArgs2.update = BSON("$set" << BSON("data"
@@ -1438,7 +1438,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
txnParticipant.unstashTransactionResources(opCtx(), "update");
CollectionUpdateArgs updateArgs1;
- updateArgs1.stmtId = 0;
+ updateArgs1.stmtIds = {0};
updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
<< "x");
updateArgs1.update = BSON("$set" << BSON("data"
@@ -1447,7 +1447,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
OplogUpdateEntryArgs update1(std::move(updateArgs1), nss1, uuid1);
CollectionUpdateArgs updateArgs2;
- updateArgs2.stmtId = 1;
+ updateArgs2.stmtIds = {1};
updateArgs2.updatedDoc = BSON("_id" << 1 << "data"
<< "y");
updateArgs2.update = BSON("$set" << BSON("data"
@@ -1514,7 +1514,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionPreImageTest) {
<< "x");
const auto updateFilter = BSON("_id" << 0);
- updateArgs1.stmtId = 0;
+ updateArgs1.stmtIds = {0};
updateArgs1.updatedDoc = updatePostImage;
updateArgs1.update = updateSpec;
updateArgs1.preImageDoc = updatePreImage;
@@ -1588,7 +1588,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPreImageTest) {
<< "x");
const auto updateFilter = BSON("_id" << 0);
- updateArgs1.stmtId = 0;
+ updateArgs1.stmtIds = {0};
updateArgs1.updatedDoc = updatePostImage;
updateArgs1.update = updateSpec;
updateArgs1.preImageDoc = updatePreImage;
@@ -1798,7 +1798,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
txnParticipant.unstashTransactionResources(opCtx(), "update");
CollectionUpdateArgs updateArgs1;
- updateArgs1.stmtId = 0;
+ updateArgs1.stmtIds = {0};
updateArgs1.updatedDoc = BSON("_id" << 0 << "data"
<< "x");
updateArgs1.update = BSON("$set" << BSON("data"
@@ -1807,7 +1807,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
OplogUpdateEntryArgs update1(std::move(updateArgs1), nss1, uuid1);
CollectionUpdateArgs updateArgs2;
- updateArgs2.stmtId = 1;
+ updateArgs2.stmtIds = {1};
updateArgs2.updatedDoc = BSON("_id" << 1 << "data"
<< "y");
updateArgs2.update = BSON("$set" << BSON("data"
diff --git a/src/mongo/db/ops/update_request.h b/src/mongo/db/ops/update_request.h
index a77945e601a..43060673b82 100644
--- a/src/mongo/db/ops/update_request.h
+++ b/src/mongo/db/ops/update_request.h
@@ -43,6 +43,20 @@ namespace mongo {
namespace {
const std::vector<BSONObj> emptyArrayFilters{};
const BSONObj emptyCollation{};
+
+template <typename T>
+void appendArrayToString(const T& arr, StringBuilder* builder) {
+ bool first = true;
+ *builder << "[";
+ for (const auto& elem : arr) {
+ if (!first) {
+ *builder << ", ";
+ }
+ first = false;
+ *builder << elem;
+ }
+ *builder << "]";
+}
} // namespace
class FieldRef;
@@ -235,12 +249,12 @@ public:
return _yieldPolicy;
}
- void setStmtId(StmtId stmtId) {
- _stmtId = std::move(stmtId);
+ void setStmtIds(std::vector<StmtId> stmtIds) {
+ _stmtIds = std::move(stmtIds);
}
- StmtId getStmtId() const {
- return _stmtId;
+ const std::vector<StmtId>& getStmtIds() const {
+ return _stmtIds;
}
const std::string toString() const {
@@ -250,18 +264,12 @@ public:
builder << " sort: " << _sort;
builder << " collation: " << getCollation();
builder << " updateModification: " << getUpdateModification().toString();
- builder << " stmtId: " << _stmtId;
-
- builder << " arrayFilters: [";
- bool first = true;
- for (auto arrayFilter : getArrayFilters()) {
- if (!first) {
- builder << ", ";
- }
- first = false;
- builder << arrayFilter;
- }
- builder << "]";
+
+ builder << " stmtIds: ";
+ appendArrayToString(getStmtIds(), &builder);
+
+ builder << " arrayFilters: ";
+ appendArrayToString(getArrayFilters(), &builder);
if (getUpdateConstants()) {
builder << " updateConstants: " << *getUpdateConstants();
@@ -302,8 +310,8 @@ private:
// by the user for each individual element of the 'updates' array in the 'update' command.
boost::optional<BSONObj> _letParameters;
- // The statement id of this request.
- StmtId _stmtId = kUninitializedStmtId;
+ // The statement ids of this request.
+ std::vector<StmtId> _stmtIds = {kUninitializedStmtId};
// Flags controlling the update.
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index 7da347238a0..e6738ae32ce 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -540,7 +540,7 @@ SingleWriteResult makeWriteResultForInsertOrDeleteRetry() {
WriteResult performInserts(OperationContext* opCtx,
const write_ops::Insert& wholeOp,
- bool fromMigrate) {
+ const InsertType& type) {
// Insert performs its own retries, so we should only be within a WriteUnitOfWork when run in a
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
@@ -609,15 +609,22 @@ WriteResult performInserts(OperationContext* opCtx,
// current batch to preserve the error results order.
} else {
BSONObj toInsert = fixedDoc.getValue().isEmpty() ? doc : std::move(fixedDoc.getValue());
- batch.emplace_back(stmtId, toInsert);
+
+ // A time-series insert can combine multiple writes into a single operation, and thus
+ // can have multiple statement ids associated with it if it is retryable.
+ batch.emplace_back(type == InsertType::kTimeseries && wholeOp.getStmtIds()
+ ? *wholeOp.getStmtIds()
+ : std::vector<StmtId>{stmtId},
+ toInsert);
+
bytesInBatch += batch.back().doc.objsize();
if (!isLastDoc && batch.size() < maxBatchSize && bytesInBatch < maxBatchBytes)
continue; // Add more to batch before inserting.
}
- bool canContinue =
- insertBatchAndHandleErrors(opCtx, wholeOp, batch, &lastOpFixer, &out, fromMigrate);
+ bool canContinue = insertBatchAndHandleErrors(
+ opCtx, wholeOp, batch, &lastOpFixer, &out, type == InsertType::kFromMigrate);
batch.clear(); // We won't need the current batch any more.
bytesInBatch = 0;
@@ -660,7 +667,6 @@ WriteResult performInserts(OperationContext* opCtx,
static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
const NamespaceString& ns,
- StmtId stmtId,
const UpdateRequest& updateRequest) {
const ExtensionsCallbackReal extensionsCallback(opCtx, &updateRequest.getNamespaceString());
ParsedUpdate parsedUpdate(opCtx, &updateRequest, extensionsCallback);
@@ -760,7 +766,7 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
OperationContext* opCtx,
const NamespaceString& ns,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
const write_ops::UpdateOpEntry& op,
LegacyRuntimeConstants runtimeConstants,
const boost::optional<BSONObj>& letParams) {
@@ -786,7 +792,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
if (letParams) {
request.setLetParameters(std::move(letParams));
}
- request.setStmtId(stmtId);
+ request.setStmtIds(stmtIds);
request.setYieldPolicy(opCtx->inMultiDocumentTransaction()
? PlanYieldPolicy::YieldPolicy::INTERRUPT_ONLY
: PlanYieldPolicy::YieldPolicy::YIELD_AUTO);
@@ -796,7 +802,7 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
++numAttempts;
try {
- return performSingleUpdateOp(opCtx, ns, stmtId, request);
+ return performSingleUpdateOp(opCtx, ns, request);
} catch (ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
const ExtensionsCallbackReal extensionsCallback(opCtx, &request.getNamespaceString());
ParsedUpdate parsedUpdate(opCtx, &request, extensionsCallback);
@@ -823,7 +829,9 @@ static SingleWriteResult performSingleUpdateOpWithDupKeyRetry(
MONGO_UNREACHABLE;
}
-WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& wholeOp) {
+WriteResult performUpdates(OperationContext* opCtx,
+ const write_ops::Update& wholeOp,
+ const UpdateType& type) {
// Update performs its own retries, so we should not be in a WriteUnitOfWork unless run in a
// transaction.
auto txnParticipant = TransactionParticipant::get(opCtx);
@@ -872,9 +880,16 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
ON_BLOCK_EXIT([&] { finishCurOp(opCtx, &curOp); });
try {
lastOpFixer.startingOp();
+
+ // A time-series insert can combine multiple writes into a single operation, and thus
+ // can have multiple statement ids associated with it if it is retryable.
+ auto stmtIds = type == UpdateType::kTimeseries && wholeOp.getStmtIds()
+ ? *wholeOp.getStmtIds()
+ : std::vector<StmtId>{stmtId};
+
out.results.emplace_back(performSingleUpdateOpWithDupKeyRetry(opCtx,
wholeOp.getNamespace(),
- stmtId,
+ stmtIds,
singleOp,
runtimeConstants,
wholeOp.getLet()));
diff --git a/src/mongo/db/ops/write_ops_exec.h b/src/mongo/db/ops/write_ops_exec.h
index b50c45dd168..0b2809251cf 100644
--- a/src/mongo/db/ops/write_ops_exec.h
+++ b/src/mongo/db/ops/write_ops_exec.h
@@ -56,6 +56,19 @@ struct WriteResult {
std::vector<StatusWith<SingleWriteResult>> results;
};
+/**
+ * Enums used to differentiate between types of insert/update operations based on how they were
+ * issued.
+ */
+enum class InsertType {
+ kStandard,
+ kFromMigrate, // From a chunk migration.
+ kTimeseries,
+};
+enum class UpdateType {
+ kStandard,
+ kTimeseries,
+};
/**
* Performs a batch of inserts, updates, or deletes.
@@ -68,15 +81,18 @@ struct WriteResult {
* exception being thrown from these functions. Callers are responsible for managing LastError in
* that case. This should generally be combined with LastError handling from parse failures.
*
- * 'fromMigrate' indicates whether the operation was induced by a chunk migration
+ * 'type' indicates whether the operation was induced by a standard write, a chunk migration, or a
+ * time-series insert.
*
* Note: performInserts() gets called for both user and internal (like tenant collection cloner,
* and initial sync/tenant migration oplog buffer) inserts.
*/
WriteResult performInserts(OperationContext* opCtx,
const write_ops::Insert& op,
- bool fromMigrate = false);
-WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& op);
+ const InsertType& type = InsertType::kStandard);
+WriteResult performUpdates(OperationContext* opCtx,
+ const write_ops::Update& op,
+ const UpdateType& type = UpdateType::kStandard);
WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& op);
/**
diff --git a/src/mongo/db/ops/write_ops_retryability_test.cpp b/src/mongo/db/ops/write_ops_retryability_test.cpp
index 0f0b2240272..4aa7f3a4748 100644
--- a/src/mongo/db/ops/write_ops_retryability_test.cpp
+++ b/src/mongo/db/ops/write_ops_retryability_test.cpp
@@ -79,7 +79,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
preImageOpTime, // pre-image optime
postImageOpTime, // post-image optime
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index f84e4279257..134e0ce4543 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -384,7 +384,7 @@ public:
sessionInfo, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
prevOpTime, // optime of previous write within same transaction
preImageOpTime, // pre-image optime
boost::none, // post-image optime
@@ -1226,7 +1226,7 @@ TEST_F(ChangeStreamStageTest, CommitCommandReturnsOperationsFromPreparedTransact
sessionInfo, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
applyOpsOpTime, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -1635,7 +1635,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionWithMultipleOplogEntries) {
sessionInfo, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
applyOpsOpTime2, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -1770,7 +1770,7 @@ TEST_F(ChangeStreamStageTest, PreparedTransactionEndingWithEmptyApplyOps) {
sessionInfo, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
applyOpsOpTime2, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp
index faf8be5e827..6999de3ca98 100644
--- a/src/mongo/db/repl/apply_ops_test.cpp
+++ b/src/mongo/db/repl/apply_ops_test.cpp
@@ -349,7 +349,7 @@ OplogEntry makeOplogEntry(OpTypeEnum opType, const BSONObj& oField) {
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 223861d4d15..f265fecf08f 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -265,7 +265,7 @@ OplogEntry IdempotencyTest::prepare(LogicalSessionId lsid,
boost::none /* o2 */,
info /* sessionInfo */,
Date_t::min() /* wallClockTime -- required but not checked */,
- stmtId,
+ {stmtId},
boost::none /* uuid */,
prevOpTime);
}
@@ -278,21 +278,21 @@ OplogEntry IdempotencyTest::commitUnprepared(LogicalSessionId lsid,
OperationSessionInfo info;
info.setSessionId(lsid);
info.setTxnNumber(txnNum);
- return makeCommandOplogEntryWithSessionInfoAndStmtId(
- nextOpTime(), nss, BSON("applyOps" << ops), lsid, txnNum, stmtId, prevOpTime);
+ return makeCommandOplogEntryWithSessionInfoAndStmtIds(
+ nextOpTime(), nss, BSON("applyOps" << ops), lsid, txnNum, {stmtId}, prevOpTime);
}
OplogEntry IdempotencyTest::commitPrepared(LogicalSessionId lsid,
TxnNumber txnNum,
StmtId stmtId,
OpTime prepareOpTime) {
- return makeCommandOplogEntryWithSessionInfoAndStmtId(
+ return makeCommandOplogEntryWithSessionInfoAndStmtIds(
nextOpTime(),
nss,
BSON("commitTransaction" << 1 << "commitTimestamp" << prepareOpTime.getTimestamp()),
lsid,
txnNum,
- stmtId,
+ {stmtId},
prepareOpTime);
}
@@ -300,8 +300,8 @@ OplogEntry IdempotencyTest::abortPrepared(LogicalSessionId lsid,
TxnNumber txnNum,
StmtId stmtId,
OpTime prepareOpTime) {
- return makeCommandOplogEntryWithSessionInfoAndStmtId(
- nextOpTime(), nss, BSON("abortTransaction" << 1), lsid, txnNum, stmtId, prepareOpTime);
+ return makeCommandOplogEntryWithSessionInfoAndStmtIds(
+ nextOpTime(), nss, BSON("abortTransaction" << 1), lsid, txnNum, {stmtId}, prepareOpTime);
}
OplogEntry IdempotencyTest::partialTxn(LogicalSessionId lsid,
@@ -319,7 +319,7 @@ OplogEntry IdempotencyTest::partialTxn(LogicalSessionId lsid,
boost::none /* o2 */,
info /* sessionInfo */,
Date_t::min() /* wallClockTime -- required but not checked */,
- stmtId,
+ {stmtId},
boost::none /* uuid */,
prevOpTime);
}
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 2441177164c..c318f16dd1f 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -635,7 +635,7 @@ OplogEntry makeOplogEntry(int t,
{}, // sessionInfo
boost::none, // upsert
Date_t() + Seconds(t), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp
index c1f24145358..228cd56433b 100644
--- a/src/mongo/db/repl/multiapplier_test.cpp
+++ b/src/mongo/db/repl/multiapplier_test.cpp
@@ -78,7 +78,7 @@ OplogEntry makeOplogEntry(int ts) {
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index fae4922f49d..6fa8771aa5f 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -364,7 +364,7 @@ OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
uassert(ErrorCodes::IllegalOperation,
str::stream() << "retryable writes is not supported for unreplicated ns: "
<< oplogEntry->getNss().ns(),
- !oplogEntry->getStatementId());
+ oplogEntry->getStatementIds().empty());
return {};
}
// If this oplog entry is from a tenant migration, include the tenant migration
@@ -433,10 +433,11 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
auto nss = oplogEntryTemplate->getNss();
auto replCoord = ReplicationCoordinator::get(opCtx);
if (replCoord->isOplogDisabledFor(opCtx, nss)) {
+ invariant(!begin->stmtIds.empty());
uassert(ErrorCodes::IllegalOperation,
str::stream() << "retryable writes is not supported for unreplicated ns: "
<< nss.ns(),
- begin->stmtId == kUninitializedStmtId);
+ begin->stmtIds.front() == kUninitializedStmtId);
return {};
}
@@ -472,7 +473,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
OplogLink oplogLink;
if (i > 0)
oplogLink.prevOpTime = opTimes[i - 1];
- appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtId);
+ appendOplogEntryChainInfo(opCtx, &oplogEntry, &oplogLink, begin[i].stmtIds);
opTimes[i] = insertStatementOplogSlot;
timestamps[i] = insertStatementOplogSlot.getTimestamp();
@@ -510,7 +511,9 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
void appendOplogEntryChainInfo(OperationContext* opCtx,
MutableOplogEntry* oplogEntry,
OplogLink* oplogLink,
- StmtId stmtId) {
+ const std::vector<StmtId>& stmtIds) {
+ invariant(!stmtIds.empty());
+
// We sometimes have a pre-image no-op entry even for normal non-retryable writes
// if recordPreImages is enabled on the collection.
if (!oplogLink->preImageOpTime.isNull()) {
@@ -518,7 +521,10 @@ void appendOplogEntryChainInfo(OperationContext* opCtx,
}
// Not a retryable write.
- if (stmtId == kUninitializedStmtId) {
+ if (stmtIds.front() == kUninitializedStmtId) {
+ // If the statement id is uninitialized, it must be the only one. There cannot also be
+ // initialized statement ids.
+ invariant(stmtIds.size() == 1);
return;
}
@@ -526,7 +532,7 @@ void appendOplogEntryChainInfo(OperationContext* opCtx,
invariant(txnParticipant);
oplogEntry->setSessionId(opCtx->getLogicalSessionId());
oplogEntry->setTxnNumber(opCtx->getTxnNumber());
- oplogEntry->setStatementId(stmtId);
+ oplogEntry->setStatementIds(stmtIds);
if (oplogLink->prevOpTime.isNull()) {
oplogLink->prevOpTime = txnParticipant.getLastWriteOpTime();
}
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index e1fe8ed904d..346b1072188 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -57,15 +57,22 @@ using OplogSlot = repl::OpTime;
struct InsertStatement {
public:
InsertStatement() = default;
- explicit InsertStatement(BSONObj toInsert) : doc(toInsert) {}
+ explicit InsertStatement(BSONObj toInsert) : doc(std::move(toInsert)) {}
+
+ InsertStatement(std::vector<StmtId> statementIds, BSONObj toInsert)
+ : stmtIds(statementIds), doc(std::move(toInsert)) {}
+ InsertStatement(StmtId stmtId, BSONObj toInsert)
+ : InsertStatement(std::vector<StmtId>{stmtId}, std::move(toInsert)) {}
+
+ InsertStatement(std::vector<StmtId> statementIds, BSONObj toInsert, OplogSlot os)
+ : stmtIds(statementIds), oplogSlot(std::move(os)), doc(std::move(toInsert)) {}
+ InsertStatement(StmtId stmtId, BSONObj toInsert, OplogSlot os)
+ : InsertStatement(std::vector<StmtId>{stmtId}, std::move(toInsert), std::move(os)) {}
- InsertStatement(StmtId statementId, BSONObj toInsert) : stmtId(statementId), doc(toInsert) {}
- InsertStatement(StmtId statementId, BSONObj toInsert, OplogSlot os)
- : stmtId(statementId), oplogSlot(os), doc(toInsert) {}
InsertStatement(BSONObj toInsert, Timestamp ts, long long term)
- : oplogSlot(repl::OpTime(ts, term)), doc(toInsert) {}
+ : oplogSlot(repl::OpTime(ts, term)), doc(std::move(toInsert)) {}
- StmtId stmtId = kUninitializedStmtId;
+ std::vector<StmtId> stmtIds = {kUninitializedStmtId};
OplogSlot oplogSlot;
BSONObj doc;
};
@@ -83,8 +90,8 @@ struct OplogLink {
/**
* Set the "lsid", "txnNumber", "stmtId", "prevOpTime", "preImageOpTime" and "postImageOpTime"
- * fields of the oplogEntry based on the given oplogLink for retryable writes (i.e. when stmtId !=
- * kUninitializedStmtId).
+ * fields of the oplogEntry based on the given oplogLink for retryable writes (i.e. when
+ * stmtIds.front() != kUninitializedStmtId).
*
* If the given oplogLink.prevOpTime is a null OpTime, both the oplogLink.prevOpTime and the
* "prevOpTime" field of the oplogEntry will be set to the TransactionParticipant's lastWriteOpTime.
@@ -95,7 +102,7 @@ struct OplogLink {
void appendOplogEntryChainInfo(OperationContext* opCtx,
MutableOplogEntry* oplogEntry,
OplogLink* oplogLink,
- StmtId stmtId);
+ const std::vector<StmtId>& stmtIds);
/**
* Create a new capped collection for the oplog if it doesn't yet exist.
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index d96bab46e68..f81d3ec3fe8 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -489,7 +489,7 @@ protected:
_lsid = makeLogicalSessionId(_opCtx.get());
- _insertOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ _insertOp1 = makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 1), 1LL},
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -499,9 +499,9 @@ protected:
<< "partialTxn" << true),
_lsid,
_txnNum,
- StmtId(0),
+ {StmtId(0)},
OpTime());
- _insertOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ _insertOp2 = makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 2), 1LL},
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -511,9 +511,9 @@ protected:
<< "partialTxn" << true),
_lsid,
_txnNum,
- StmtId(1),
+ {StmtId(1)},
_insertOp1->getOpTime());
- _commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ _commitOp = makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 3), 1LL},
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -522,7 +522,7 @@ protected:
<< BSON("_id" << 3)))),
_lsid,
_txnNum,
- StmtId(2),
+ {StmtId(2)},
_insertOp2->getOpTime());
_opObserver->onInsertsFn =
[&](OperationContext*, const NamespaceString& nss, const std::vector<BSONObj>& docs) {
@@ -680,7 +680,7 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionTwoBa
const NamespaceString cmdNss{"admin", "$cmd"};
for (int i = 0; i < 4; i++) {
insertDocs.push_back(BSON("_id" << i));
- insertOps.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ insertOps.push_back(makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), i + 1), 1LL},
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -691,16 +691,16 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyUnpreparedTransactionTwoBa
<< "partialTxn" << true),
_lsid,
_txnNum,
- StmtId(i),
+ {StmtId(i)},
i == 0 ? OpTime() : insertOps.back().getOpTime()));
}
- auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 5), 1LL},
- cmdNss,
- BSON("applyOps" << BSONArray()),
- _lsid,
- _txnNum,
- StmtId(4),
- insertOps.back().getOpTime());
+ auto commitOp = makeCommandOplogEntryWithSessionInfoAndStmtIds({Timestamp(Seconds(1), 5), 1LL},
+ cmdNss,
+ BSON("applyOps" << BSONArray()),
+ _lsid,
+ _txnNum,
+ {StmtId(4)},
+ insertOps.back().getOpTime());
NoopOplogApplierObserver observer;
OplogApplierImpl oplogApplier(
@@ -758,7 +758,7 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) {
std::vector<OplogEntry> insertOps1, insertOps2;
const NamespaceString cmdNss{"admin", "$cmd"};
- insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 1), 1LL},
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -768,9 +768,9 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) {
<< "partialTxn" << true),
_lsid,
txnNum1,
- StmtId(0),
+ {StmtId(0)},
OpTime()));
- insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ insertOps1.push_back(makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 2), 1LL},
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -781,9 +781,9 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) {
_lsid,
txnNum1,
- StmtId(1),
+ {StmtId(1)},
insertOps1.back().getOpTime()));
- insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(2), 1), 1LL},
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -793,9 +793,9 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) {
<< "partialTxn" << true),
_lsid,
txnNum2,
- StmtId(0),
+ {StmtId(0)},
OpTime()));
- insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtId(
+ insertOps2.push_back(makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(2), 2), 1LL},
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -805,22 +805,22 @@ TEST_F(MultiOplogEntryOplogApplierImplTest, MultiApplyTwoTransactionsOneBatch) {
<< "partialTxn" << true),
_lsid,
txnNum2,
- StmtId(1),
+ {StmtId(1)},
insertOps2.back().getOpTime()));
- auto commitOp1 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 3), 1LL},
- _nss1,
- BSON("applyOps" << BSONArray()),
- _lsid,
- txnNum1,
- StmtId(2),
- insertOps1.back().getOpTime());
- auto commitOp2 = makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(2), 3), 1LL},
- _nss1,
- BSON("applyOps" << BSONArray()),
- _lsid,
- txnNum2,
- StmtId(2),
- insertOps2.back().getOpTime());
+ auto commitOp1 = makeCommandOplogEntryWithSessionInfoAndStmtIds({Timestamp(Seconds(1), 3), 1LL},
+ _nss1,
+ BSON("applyOps" << BSONArray()),
+ _lsid,
+ txnNum1,
+ {StmtId(2)},
+ insertOps1.back().getOpTime());
+ auto commitOp2 = makeCommandOplogEntryWithSessionInfoAndStmtIds({Timestamp(Seconds(2), 3), 1LL},
+ _nss1,
+ BSON("applyOps" << BSONArray()),
+ _lsid,
+ txnNum2,
+ {StmtId(2)},
+ insertOps2.back().getOpTime());
NoopOplogApplierObserver observer;
OplogApplierImpl oplogApplier(
@@ -877,7 +877,7 @@ protected:
_stashedEnableMajorityReadConcern =
std::exchange(serverGlobalParams.enableMajorityReadConcern, true);
- _prepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ _prepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 3), 1LL},
_nss1,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -887,9 +887,9 @@ protected:
<< "prepare" << true),
_lsid,
_txnNum,
- StmtId(2),
+ {StmtId(2)},
_insertOp2->getOpTime());
- _singlePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ _singlePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 3), 1LL},
_nss1,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -899,40 +899,40 @@ protected:
<< "prepare" << true),
_lsid,
_txnNum,
- StmtId(0),
+ {StmtId(0)},
OpTime());
- _commitPrepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ _commitPrepareWithPrevOp = makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 4), 1LL},
_nss1,
BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)),
_lsid,
_txnNum,
- StmtId(3),
+ {StmtId(3)},
_prepareWithPrevOp->getOpTime());
- _commitSinglePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ _commitSinglePrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 4), 1LL},
_nss1,
BSON("commitTransaction" << 1 << "commitTimestamp" << Timestamp(Seconds(1), 4)),
_lsid,
_txnNum,
- StmtId(1),
+ {StmtId(1)},
_prepareWithPrevOp->getOpTime());
_abortPrepareWithPrevOp =
- makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL},
- _nss1,
- BSON("abortTransaction" << 1),
- _lsid,
- _txnNum,
- StmtId(3),
- _prepareWithPrevOp->getOpTime());
+ makeCommandOplogEntryWithSessionInfoAndStmtIds({Timestamp(Seconds(1), 4), 1LL},
+ _nss1,
+ BSON("abortTransaction" << 1),
+ _lsid,
+ _txnNum,
+ {StmtId(3)},
+ _prepareWithPrevOp->getOpTime());
_abortSinglePrepareApplyOp = _abortPrepareWithPrevOp =
- makeCommandOplogEntryWithSessionInfoAndStmtId({Timestamp(Seconds(1), 4), 1LL},
- _nss1,
- BSON("abortTransaction" << 1),
- _lsid,
- _txnNum,
- StmtId(1),
- _singlePrepareApplyOp->getOpTime());
+ makeCommandOplogEntryWithSessionInfoAndStmtIds({Timestamp(Seconds(1), 4), 1LL},
+ _nss1,
+ BSON("abortTransaction" << 1),
+ _lsid,
+ _txnNum,
+ {StmtId(1)},
+ _singlePrepareApplyOp->getOpTime());
}
void tearDown() override {
@@ -1231,13 +1231,13 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary),
_writerPool.get());
- auto emptyPrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ auto emptyPrepareApplyOp = makeCommandOplogEntryWithSessionInfoAndStmtIds(
{Timestamp(Seconds(1), 3), 1LL},
_nss1,
BSON("applyOps" << BSONArray() << "prepare" << true),
_lsid,
_txnNum,
- StmtId(0),
+ {StmtId(0)},
OpTime());
const auto expectedStartOpTime = emptyPrepareApplyOp.getOpTime();
@@ -2436,7 +2436,7 @@ public:
sessionInfo, // sessionInfo
boost::none, // false
wallClockTime, // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -2467,7 +2467,7 @@ public:
sessionInfo, // sessionInfo
boost::none, // false
wallClockTime, // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -2692,7 +2692,7 @@ TEST_F(OplogApplierImplTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnS
repl::OpTime txnInsertOpTime(Timestamp(2, 0), 1);
sessionInfo.setTxnNumber(4);
- auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtIds(
txnInsertOpTime,
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -2702,18 +2702,18 @@ TEST_F(OplogApplierImplTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnS
<< "partialTxn" << true),
sessionId,
*sessionInfo.getTxnNumber(),
- StmtId(0),
+ {StmtId(0)},
OpTime());
repl::OpTime txnCommitOpTime(Timestamp(3, 0), 1);
auto txnCommitOp =
- makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime,
- cmdNss,
- BSON("applyOps" << BSONArray()),
- sessionId,
- *sessionInfo.getTxnNumber(),
- StmtId(1),
- txnInsertOpTime);
+ makeCommandOplogEntryWithSessionInfoAndStmtIds(txnCommitOpTime,
+ cmdNss,
+ BSON("applyOps" << BSONArray()),
+ sessionId,
+ *sessionInfo.getTxnNumber(),
+ {StmtId(1)},
+ txnInsertOpTime);
auto writerPool = makeReplWriterPool();
NoopOplogApplierObserver observer;
@@ -2751,7 +2751,7 @@ TEST_F(OplogApplierImplTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnS
}();
repl::OpTime txnInsertOpTime(Timestamp(1, 0), 1);
- auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtId(
+ auto txnInsertOp = makeCommandOplogEntryWithSessionInfoAndStmtIds(
txnInsertOpTime,
cmdNss,
BSON("applyOps" << BSON_ARRAY(BSON("op"
@@ -2761,18 +2761,18 @@ TEST_F(OplogApplierImplTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnS
<< "partialTxn" << true),
sessionId,
*sessionInfo.getTxnNumber(),
- StmtId(0),
+ {StmtId(0)},
OpTime());
repl::OpTime txnCommitOpTime(Timestamp(2, 0), 1);
auto txnCommitOp =
- makeCommandOplogEntryWithSessionInfoAndStmtId(txnCommitOpTime,
- cmdNss,
- BSON("applyOps" << BSONArray()),
- sessionId,
- *sessionInfo.getTxnNumber(),
- StmtId(1),
- txnInsertOpTime);
+ makeCommandOplogEntryWithSessionInfoAndStmtIds(txnCommitOpTime,
+ cmdNss,
+ BSON("applyOps" << BSONArray()),
+ sessionId,
+ *sessionInfo.getTxnNumber(),
+ {StmtId(1)},
+ txnInsertOpTime);
repl::OpTime retryableInsertOpTime(Timestamp(3, 0), 1);
sessionInfo.setTxnNumber(4);
@@ -2834,22 +2834,22 @@ TEST_F(OplogApplierImplTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
// Entries with a session id and a txnNumber update the transaction table.
auto lsidSingle = makeLogicalSessionIdForTest();
- auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(1), 0), 1LL}, ns0, uuid0, BSON("_id" << 0), lsidSingle, 5LL, 0);
+ auto opSingle = makeInsertDocumentOplogEntryWithSessionInfoAndStmtIds(
+ {Timestamp(Seconds(1), 0), 1LL}, ns0, uuid0, BSON("_id" << 0), lsidSingle, 5LL, {0});
// For entries with the same session, the entry with a larger txnNumber is saved.
auto lsidDiffTxn = makeLogicalSessionIdForTest();
- auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(2), 0), 1LL}, ns1, uuid1, BSON("_id" << 0), lsidDiffTxn, 10LL, 1);
- auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(3), 0), 1LL}, ns1, uuid1, BSON("_id" << 1), lsidDiffTxn, 20LL, 1);
+ auto opDiffTxnSmaller = makeInsertDocumentOplogEntryWithSessionInfoAndStmtIds(
+ {Timestamp(Seconds(2), 0), 1LL}, ns1, uuid1, BSON("_id" << 0), lsidDiffTxn, 10LL, {1});
+ auto opDiffTxnLarger = makeInsertDocumentOplogEntryWithSessionInfoAndStmtIds(
+ {Timestamp(Seconds(3), 0), 1LL}, ns1, uuid1, BSON("_id" << 1), lsidDiffTxn, 20LL, {1});
// For entries with the same session and txnNumber, the later optime is saved.
auto lsidSameTxn = makeLogicalSessionIdForTest();
- auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(6), 0), 1LL}, ns2, uuid2, BSON("_id" << 0), lsidSameTxn, 30LL, 0);
- auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
- {Timestamp(Seconds(5), 0), 1LL}, ns2, uuid2, BSON("_id" << 1), lsidSameTxn, 30LL, 1);
+ auto opSameTxnLater = makeInsertDocumentOplogEntryWithSessionInfoAndStmtIds(
+ {Timestamp(Seconds(6), 0), 1LL}, ns2, uuid2, BSON("_id" << 0), lsidSameTxn, 30LL, {0});
+ auto opSameTxnSooner = makeInsertDocumentOplogEntryWithSessionInfoAndStmtIds(
+ {Timestamp(Seconds(5), 0), 1LL}, ns2, uuid2, BSON("_id" << 1), lsidSameTxn, 30LL, {1});
// Entries with a session id but no txnNumber do not lead to updates.
auto lsidNoTxn = makeLogicalSessionIdForTest();
diff --git a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
index 13771e8fa40..b7a745c70b4 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test_fixture.cpp
@@ -379,7 +379,7 @@ OplogEntry makeOplogEntry(OpTypeEnum opType,
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
index b747ee1c369..21adc94cc72 100644
--- a/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
+++ b/src/mongo/db/repl/oplog_batcher_test_fixture.cpp
@@ -182,7 +182,7 @@ OplogEntry makeInsertOplogEntry(int t, const NamespaceString& nss, boost::option
{}, // sessionInfo
boost::none, // upsert
Date_t() + Seconds(t), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -204,7 +204,7 @@ OplogEntry makeNoopOplogEntry(int t, const StringData& msg) {
{}, // sessionInfo
boost::none, // upsert
Date_t() + Seconds(t), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -238,7 +238,7 @@ OplogEntry makeApplyOpsOplogEntry(int t, bool prepare, const std::vector<OplogEn
{}, // sessionInfo
boost::none, // upsert
Date_t() + Seconds(t), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -272,7 +272,7 @@ OplogEntry makeCommitTransactionOplogEntry(int t, StringData dbName, bool prepar
{}, // sessionInfo
boost::none, // upsert
Date_t() + Seconds(t), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -333,7 +333,7 @@ OplogEntry makeLargeTransactionOplogEntries(int t,
{}, // sessionInfo
boost::none, // upsert
Date_t() + Seconds(t), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
prevWriteOpTime, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index cfc2e635030..2790ade9860 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -102,7 +102,7 @@ BSONObj makeOplogEntryDoc(OpTime opTime,
const OperationSessionInfo& sessionInfo,
const boost::optional<bool>& isUpsert,
const mongo::Date_t& wallClockTime,
- const boost::optional<StmtId>& statementId,
+ const std::vector<StmtId>& statementIds,
const boost::optional<OpTime>& prevWriteOpTimeInTransaction,
const boost::optional<OpTime>& preImageOpTime,
const boost::optional<OpTime>& postImageOpTime,
@@ -136,8 +136,10 @@ BSONObj makeOplogEntryDoc(OpTime opTime,
invariant(o2Field);
builder.append(OplogEntryBase::kUpsertFieldName, isUpsert.get());
}
- if (statementId) {
- builder.append(OplogEntryBase::kStatementIdFieldName, statementId.get());
+ if (statementIds.size() == 1) {
+ builder.append(OplogEntryBase::kStatementIdsFieldName, statementIds.front());
+ } else if (!statementIds.empty()) {
+ builder.append(OplogEntryBase::kStatementIdsFieldName, statementIds);
}
if (prevWriteOpTimeInTransaction) {
const BSONObj localObject = prevWriteOpTimeInTransaction.get().toBSON();
@@ -312,7 +314,7 @@ DurableOplogEntry::DurableOplogEntry(OpTime opTime,
const OperationSessionInfo& sessionInfo,
const boost::optional<bool>& isUpsert,
const mongo::Date_t& wallClockTime,
- const boost::optional<StmtId>& statementId,
+ const std::vector<StmtId>& statementIds,
const boost::optional<OpTime>& prevWriteOpTimeInTransaction,
const boost::optional<OpTime>& preImageOpTime,
const boost::optional<OpTime>& postImageOpTime,
@@ -330,7 +332,7 @@ DurableOplogEntry::DurableOplogEntry(OpTime opTime,
sessionInfo,
isUpsert,
wallClockTime,
- statementId,
+ statementIds,
prevWriteOpTimeInTransaction,
preImageOpTime,
postImageOpTime,
@@ -587,8 +589,8 @@ const boost::optional<mongo::Value>& OplogEntry::get_id() const& {
return _entry.get_id();
}
-const boost::optional<std::int32_t> OplogEntry::getStatementId() const& {
- return _entry.getStatementId();
+std::vector<StmtId> OplogEntry::getStatementIds() const& {
+ return _entry.getStatementIds();
}
const OperationSessionInfo& OplogEntry::getOperationSessionInfo() const {
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index e093b526a3c..7bbea0505bd 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -36,6 +36,7 @@
#include "mongo/db/repl/apply_ops_gen.h"
#include "mongo/db/repl/oplog_entry_gen.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/util/visit_helper.h"
namespace mongo {
namespace repl {
@@ -120,6 +121,26 @@ public:
getOperationSessionInfo().setSessionId(std::move(value));
}
+ void setStatementIds(const std::vector<StmtId>& stmtIds) & {
+ if (stmtIds.empty()) {
+ OplogEntryBase::setStatementIds(boost::none);
+ } else if (stmtIds.size() == 1) {
+ OplogEntryBase::setStatementIds({{stmtIds.front()}});
+ } else {
+ OplogEntryBase::setStatementIds({{stmtIds}});
+ }
+ }
+
+ std::vector<StmtId> getStatementIds() const {
+ if (!OplogEntryBase::getStatementIds()) {
+ return {};
+ }
+ return stdx::visit(
+ visit_helper::Overloaded{[](StmtId stmtId) { return std::vector<StmtId>{stmtId}; },
+ [](const std::vector<StmtId>& stmtIds) { return stmtIds; }},
+ *OplogEntryBase::getStatementIds());
+ }
+
void setTxnNumber(boost::optional<std::int64_t> value) & {
getOperationSessionInfo().setTxnNumber(std::move(value));
}
@@ -218,7 +239,7 @@ public:
using MutableOplogEntry::kPreImageOpTimeFieldName;
using MutableOplogEntry::kPrevWriteOpTimeInTransactionFieldName;
using MutableOplogEntry::kSessionIdFieldName;
- using MutableOplogEntry::kStatementIdFieldName;
+ using MutableOplogEntry::kStatementIdsFieldName;
using MutableOplogEntry::kTermFieldName;
using MutableOplogEntry::kTimestampFieldName;
using MutableOplogEntry::kTxnNumberFieldName;
@@ -243,7 +264,7 @@ public:
using MutableOplogEntry::getPreImageOpTime;
using MutableOplogEntry::getPrevWriteOpTimeInTransaction;
using MutableOplogEntry::getSessionId;
- using MutableOplogEntry::getStatementId;
+ using MutableOplogEntry::getStatementIds;
using MutableOplogEntry::getTerm;
using MutableOplogEntry::getTimestamp;
using MutableOplogEntry::getTxnNumber;
@@ -298,7 +319,7 @@ public:
const OperationSessionInfo& sessionInfo,
const boost::optional<bool>& isUpsert,
const mongo::Date_t& wallClockTime,
- const boost::optional<StmtId>& statementId,
+ const std::vector<StmtId>& statementIds,
const boost::optional<OpTime>& prevWriteOpTimeInTransaction,
const boost::optional<OpTime>& preImageOpTime,
const boost::optional<OpTime>& postImageOpTime,
@@ -457,7 +478,7 @@ public:
static constexpr auto kPrevWriteOpTimeInTransactionFieldName =
DurableOplogEntry::kPrevWriteOpTimeInTransactionFieldName;
static constexpr auto kSessionIdFieldName = DurableOplogEntry::kSessionIdFieldName;
- static constexpr auto kStatementIdFieldName = DurableOplogEntry::kStatementIdFieldName;
+ static constexpr auto kStatementIdFieldName = DurableOplogEntry::kStatementIdsFieldName;
static constexpr auto kTermFieldName = DurableOplogEntry::kTermFieldName;
static constexpr auto kTimestampFieldName = DurableOplogEntry::kTimestampFieldName;
static constexpr auto kTxnNumberFieldName = DurableOplogEntry::kTxnNumberFieldName;
@@ -504,7 +525,7 @@ public:
// Wrapper methods for DurableOplogEntry
const boost::optional<mongo::Value>& get_id() const&;
- const boost::optional<std::int32_t> getStatementId() const&;
+ std::vector<StmtId> getStatementIds() const&;
const OperationSessionInfo& getOperationSessionInfo() const;
const boost::optional<mongo::LogicalSessionId>& getSessionId() const;
const boost::optional<std::int64_t> getTxnNumber() const;
diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl
index 86fa9f039d3..2f9d256b59f 100644
--- a/src/mongo/db/repl/oplog_entry.idl
+++ b/src/mongo/db/repl/oplog_entry.idl
@@ -132,10 +132,11 @@ structs:
description: "Used by tests in replication and also by production resharding code to
store timestamps."
stmtId:
- cpp_name: statementId
- type: StmtId
+ cpp_name: statementIds
+ type:
+ variant: [StmtId, array<StmtId>]
optional: true
- description: "Identifier of the transaction statement which generated this oplog
+ description: "Identifier of the transaction statement(s) which generated this oplog
entry"
prevOpTime:
cpp_name: prevWriteOpTimeInTransaction
diff --git a/src/mongo/db/repl/oplog_entry_test_helpers.cpp b/src/mongo/db/repl/oplog_entry_test_helpers.cpp
index a68d4824f9b..915a9b333d4 100644
--- a/src/mongo/db/repl/oplog_entry_test_helpers.cpp
+++ b/src/mongo/db/repl/oplog_entry_test_helpers.cpp
@@ -57,7 +57,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::optional<BSONObj> object2,
OperationSessionInfo sessionInfo,
Date_t wallClockTime,
- boost::optional<StmtId> stmtId,
+ const std::vector<StmtId>& stmtIds,
boost::optional<UUID> uuid,
boost::optional<OpTime> prevOpTime) {
return {repl::DurableOplogEntry(opTime, // optime
@@ -72,7 +72,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
sessionInfo, // sessionInfo
boost::none, // upsert
wallClockTime, // wall clock time
- stmtId, // statement id
+ stmtIds, // statement ids
prevOpTime, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -91,17 +91,17 @@ OplogEntry makeCommandOplogEntry(OpTime opTime,
boost::none /* o2 */,
{} /* sessionInfo */,
Date_t() /* wallClockTime*/,
- boost::none /* stmtId */,
+ {} /* stmtIds */,
uuid);
}
-OplogEntry makeCommandOplogEntryWithSessionInfoAndStmtId(OpTime opTime,
- const NamespaceString& nss,
- const BSONObj& command,
- LogicalSessionId lsid,
- TxnNumber txnNum,
- StmtId stmtId,
- boost::optional<OpTime> prevOpTime) {
+OplogEntry makeCommandOplogEntryWithSessionInfoAndStmtIds(OpTime opTime,
+ const NamespaceString& nss,
+ const BSONObj& command,
+ LogicalSessionId lsid,
+ TxnNumber txnNum,
+ std::vector<StmtId> stmtIds,
+ boost::optional<OpTime> prevOpTime) {
OperationSessionInfo info;
info.setSessionId(lsid);
info.setTxnNumber(txnNum);
@@ -112,7 +112,7 @@ OplogEntry makeCommandOplogEntryWithSessionInfoAndStmtId(OpTime opTime,
boost::none /* o2 */,
info /* sessionInfo */,
Date_t::min() /* wallClockTime -- required but not checked */,
- stmtId,
+ stmtIds,
boost::none /* uuid */,
prevOpTime);
}
@@ -220,14 +220,14 @@ OplogEntry makeInsertDocumentOplogEntryWithSessionInfo(OpTime opTime,
Date_t::now()); // wall clock time
}
-OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtIds(
OpTime opTime,
const NamespaceString& nss,
boost::optional<UUID> uuid,
const BSONObj& documentToInsert,
LogicalSessionId lsid,
TxnNumber txnNum,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
boost::optional<OpTime> prevOpTime) {
OperationSessionInfo info;
info.setSessionId(lsid);
@@ -239,7 +239,7 @@ OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
boost::none, // o2
info, // session info
Date_t::now(), // wall clock time
- stmtId,
+ stmtIds,
uuid,
prevOpTime); // previous optime in same session
}
diff --git a/src/mongo/db/repl/oplog_entry_test_helpers.h b/src/mongo/db/repl/oplog_entry_test_helpers.h
index 5497e386616..0375ed8a30f 100644
--- a/src/mongo/db/repl/oplog_entry_test_helpers.h
+++ b/src/mongo/db/repl/oplog_entry_test_helpers.h
@@ -48,7 +48,7 @@ OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::optional<BSONObj> object2 = boost::none,
OperationSessionInfo sessionInfo = {},
Date_t wallClockTime = Date_t(),
- boost::optional<StmtId> stmtId = boost::none,
+ const std::vector<StmtId>& stmtIds = {},
boost::optional<UUID> uuid = boost::none,
boost::optional<OpTime> prevOpTime = boost::none);
@@ -118,13 +118,13 @@ OplogEntry makeCommandOplogEntry(OpTime opTime,
/**
* Creates an oplog entry for 'command' with the given 'optime', 'namespace' and session information
*/
-OplogEntry makeCommandOplogEntryWithSessionInfoAndStmtId(
+OplogEntry makeCommandOplogEntryWithSessionInfoAndStmtIds(
OpTime opTime,
const NamespaceString& nss,
const BSONObj& command,
LogicalSessionId lsid,
TxnNumber txnNum,
- StmtId stmtId,
+ std::vector<StmtId> stmtIds,
boost::optional<OpTime> prevOpTime = boost::none);
/**
@@ -135,14 +135,14 @@ OplogEntry makeInsertDocumentOplogEntryWithSessionInfo(OpTime opTime,
const BSONObj& documentToInsert,
OperationSessionInfo info);
-OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtId(
+OplogEntry makeInsertDocumentOplogEntryWithSessionInfoAndStmtIds(
OpTime opTime,
const NamespaceString& nss,
boost::optional<UUID> uuid,
const BSONObj& documentToInsert,
LogicalSessionId lsid,
TxnNumber txnNum,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
boost::optional<OpTime> prevOpTime = boost::none);
BSONObj makeInsertApplyOpsEntry(const NamespaceString& nss, const UUID& uuid, const BSONObj& doc);
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index d8a6f78e237..5eabd460b8c 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -96,7 +96,7 @@ BSONObj makeNoopOplogEntry(OpTime opTime) {
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp
index 3b13fae1c6d..7c75e9378fa 100644
--- a/src/mongo/db/repl/replication_recovery_test.cpp
+++ b/src/mongo/db/repl/replication_recovery_test.cpp
@@ -262,7 +262,7 @@ repl::OplogEntry _makeOplogEntry(repl::OpTime opTime,
sessionInfo, // sessionInfo
boost::none, // isUpsert
wallTime, // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp
index f034f7d399c..8a684d343bf 100644
--- a/src/mongo/db/repl/rollback_impl_test.cpp
+++ b/src/mongo/db/repl/rollback_impl_test.cpp
@@ -1444,7 +1444,7 @@ RollbackImplTest::_setUpUnpreparedTransactionForCountTest(UUID collId) {
sessionInfo, // sessionInfo
boost::none, // isUpsert
Date_t(), // wallClockTime
- boost::none, // statementId
+ {}, // statementIds
OpTime(), // prevWriteOpTimeInTransaction
boost::none, // preImageOpTime
boost::none, // postImageOpTime
@@ -1478,7 +1478,7 @@ RollbackImplTest::_setUpUnpreparedTransactionForCountTest(UUID collId) {
sessionInfo, // sessionInfo
boost::none, // isUpsert
Date_t(), // wallClockTime
- boost::none, // statementId
+ {}, // statementIds
partialApplyOpsOpTime, // prevWriteOpTimeInTransaction
boost::none, // preImageOpTime
boost::none, // postImageOpTime
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp
index 99f1779c8c2..f53818d2b76 100644
--- a/src/mongo/db/repl/session_update_tracker.cpp
+++ b/src/mongo/db/repl/session_update_tracker.cpp
@@ -65,7 +65,7 @@ OplogEntry createOplogEntryForTransactionTableUpdate(repl::OpTime opTime,
{}, // sessionInfo
true, // upsert
wallClockTime,
- boost::none, // statementId
+ {}, // statementIds
boost::none, // prevWriteOpTime
boost::none, // preImageOpTime
boost::none, // postImageOpTime
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index 1382b8b73c7..e2d7c2460cc 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -323,7 +323,7 @@ BSONObj _makeOplogEntry(Timestamp ts, long long term) {
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
index c2ed2a01dcc..7f0eb279406 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service_test.cpp
@@ -96,7 +96,7 @@ OplogEntry makeOplogEntry(OpTime opTime,
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 497d9d5a477..639313e66e1 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -593,11 +593,12 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
// Use the same wallclock time as the noop entry.
sessionTxnRecord.emplace(sessionId, txnNumber, OpTime(), noopEntry.getWallClockTime());
sessionTxnRecord->setState(DurableTxnStateEnum::kCommitted);
- } else if (entry.getStatementId() && !SessionUpdateTracker::isTransactionEntry(entry)) {
+ } else if (!entry.getStatementIds().empty() &&
+ !SessionUpdateTracker::isTransactionEntry(entry)) {
// If it has a statement id but isn't a transaction, it's a retryable write.
auto sessionId = *entry.getSessionId();
auto txnNumber = *entry.getTxnNumber();
- auto stmtId = *entry.getStatementId();
+ auto entryStmtIds = entry.getStatementIds();
if (entry.getOpType() == repl::OpTypeEnum::kNoop) {
// TODO(SERVER-53510): handle pre and post image no-ops
LOGV2_DEBUG(5350903,
@@ -606,19 +607,19 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
"entry"_attr = entry.getEntry(),
"sessionId"_attr = sessionId,
"txnNumber"_attr = txnNumber,
- "statementId"_attr = stmtId,
+ "statementIds"_attr = entryStmtIds,
"tenant"_attr = _tenantId,
"migrationUuid"_attr = _migrationUuid);
continue;
}
- stmtIds.push_back(stmtId);
+ stmtIds.insert(stmtIds.end(), entryStmtIds.begin(), entryStmtIds.end());
LOGV2_DEBUG(5350901,
2,
"Tenant Oplog Applier processing retryable write",
"sessionId"_attr = sessionId,
"txnNumber"_attr = txnNumber,
- "statementId"_attr = stmtId,
+ "statementIds"_attr = entryStmtIds,
"tenant"_attr = _tenantId,
"migrationUuid"_attr = _migrationUuid);
opCtx->setLogicalSessionId(sessionId);
@@ -643,15 +644,15 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
uassert(5350902,
str::stream() << "Tenant oplog application processed same retryable write "
"twice for transaction "
- << txnNumber << " statement " << stmtId << " on session "
- << sessionId,
- !txnParticipant.checkStatementExecutedNoOplogEntryFetch(stmtId));
+ << txnNumber << " statement " << entryStmtIds.front()
+ << " on session " << sessionId,
+ !txnParticipant.checkStatementExecutedNoOplogEntryFetch(entryStmtIds.front()));
prevWriteOpTime = txnParticipant.getLastWriteOpTime();
// Set sessionId, txnNumber, and statementId for all ops in a retryable write.
noopEntry.setSessionId(sessionId);
noopEntry.setTxnNumber(txnNumber);
- noopEntry.setStatementId(stmtId);
+ noopEntry.setStatementIds(entryStmtIds);
// set fromMigrate on the no-op so the session update tracker recognizes it.
noopEntry.setFromMigrate(true);
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 140a1dde83c..dfd0092f282 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -1066,7 +1066,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* outerOpCtx) {
return toInsert;
}());
- const auto reply = write_ops_exec::performInserts(opCtx, insertOp, true);
+ const auto reply = write_ops_exec::performInserts(
+ opCtx, insertOp, write_ops_exec::InsertType::kFromMigrate);
for (unsigned long i = 0; i < reply.results.size(); ++i) {
uassertStatusOKWithContext(
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 2b069fe72b2..73503728507 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -126,12 +126,13 @@ Status insertOplogAndUpdateConfigForRetryable(OperationContext* opCtx,
// If it's not a CRUD type, it's an oplog related to transaction, so convert it to
// incompleteHistory stmtId.
- const auto stmtId = oplog.isCrudOpType() ? *oplog.getStatementId() : kIncompleteHistoryStmtId;
+ const auto stmtIds = oplog.isCrudOpType() ? oplog.getStatementIds()
+ : std::vector<StmtId>{kIncompleteHistoryStmtId};
try {
txnParticipant.beginOrContinue(opCtx, txnNumber, boost::none, boost::none);
- if (txnParticipant.checkStatementExecuted(opCtx, stmtId)) {
+ if (txnParticipant.checkStatementExecuted(opCtx, stmtIds.front())) {
// Skip the incoming statement because it has already been logged locally.
return Status::OK();
}
@@ -221,7 +222,7 @@ Status insertOplogAndUpdateConfigForRetryable(OperationContext* opCtx,
// Use the same wallTime as oplog since SessionUpdateTracker looks at the oplog entry
// wallTime when replicating.
sessionTxnRecord.setLastWriteDate(noOpOplog.getWallClockTime());
- txnParticipant.onRetryableWriteCloningCompleted(opCtx, {stmtId}, sessionTxnRecord);
+ txnParticipant.onRetryableWriteCloningCompleted(opCtx, stmtIds, sessionTxnRecord);
wunit.commit();
});
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index fb0f3ae2b94..0e666473a7e 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -213,7 +213,7 @@ public:
repl::OpTypeEnum opType,
const BSONObj& obj1,
const boost::optional<BSONObj> obj2) {
- return makeOplog(opTime, opType, obj1, obj2, {}, boost::none);
+ return makeOplog(opTime, opType, obj1, obj2, {}, {});
}
repl::OplogEntry makeOplog(const repl::OpTime& opTime,
@@ -221,7 +221,7 @@ public:
const BSONObj& obj1,
const boost::optional<BSONObj> obj2,
const OperationSessionInfo& sessionInfo,
- const boost::optional<StmtId>& statementId) {
+ const std::vector<StmtId>& statementIds) {
ReshardingDonorOplogId id(opTime.getTimestamp(), opTime.getTimestamp());
return {repl::DurableOplogEntry(opTime,
boost::none /* hash */,
@@ -235,7 +235,7 @@ public:
sessionInfo,
boost::none /* upsert */,
{} /* date */,
- statementId,
+ statementIds,
boost::none /* prevWrite */,
boost::none /* preImage */,
boost::none /* postImage */,
@@ -1937,7 +1937,7 @@ public:
UUID uuid,
const LogicalSessionId& lsid,
TxnNumber txnNumber,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
repl::OpTime prevOpTime) {
repl::MutableOplogEntry oplogEntry;
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
@@ -1945,10 +1945,10 @@ public:
oplogEntry.setUuid(uuid);
oplogEntry.setObject(BSON("TestValue" << 0));
oplogEntry.setWallClockTime(Date_t::now());
- if (stmtId != kUninitializedStmtId) {
+ if (stmtIds.front() != kUninitializedStmtId) {
oplogEntry.setSessionId(lsid);
oplogEntry.setTxnNumber(txnNumber);
- oplogEntry.setStatementId(stmtId);
+ oplogEntry.setStatementIds(stmtIds);
oplogEntry.setPrevWriteOpTimeInTransaction(prevOpTime);
}
return repl::logOp(opCtx, &oplogEntry);
@@ -1956,7 +1956,7 @@ public:
void writeTxnRecord(const LogicalSessionId& lsid,
const TxnNumber& txnNum,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
repl::OpTime prevOpTime,
boost::optional<DurableTxnStateEnum> txnState) {
auto newClient = operationContext()->getServiceContext()->makeClient("testWriteTxnRecord");
@@ -1976,7 +1976,7 @@ public:
AutoGetCollection autoColl(opCtx, kCrudNs, MODE_IX);
WriteUnitOfWork wuow(opCtx);
const auto opTime = insertRetryableOplog(
- opCtx, kCrudNs, kCrudUUID, session->getSessionId(), txnNum, stmtId, prevOpTime);
+ opCtx, kCrudNs, kCrudUUID, session->getSessionId(), txnNum, stmtIds, prevOpTime);
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setSessionId(session->getSessionId());
@@ -1984,7 +1984,7 @@ public:
sessionTxnRecord.setLastWriteOpTime(opTime);
sessionTxnRecord.setLastWriteDate(Date_t::now());
sessionTxnRecord.setState(txnState);
- txnParticipant.onWriteOpCompletedOnPrimary(opCtx, {stmtId}, sessionTxnRecord);
+ txnParticipant.onWriteOpCompletedOnPrimary(opCtx, stmtIds, sessionTxnRecord);
wuow.commit();
}
@@ -2186,12 +2186,8 @@ public:
<< kCrudUUID << "o" << doc))
<< "prepare" << true);
- crudOps->push_back(makeOplog(prepareOptime,
- repl::OpTypeEnum::kCommand,
- applyOpsCmd,
- boost::none,
- session,
- boost::none));
+ crudOps->push_back(makeOplog(
+ prepareOptime, repl::OpTypeEnum::kCommand, applyOpsCmd, boost::none, session, {}));
repl::OpTime commitOptime(prepareOptime.getTimestamp() + 1, prepareOptime.getTerm());
crudOps->push_back(makeOplog(commitOptime,
@@ -2199,7 +2195,7 @@ public:
BSON("commitTransaction" << 1),
boost::none,
session,
- boost::none));
+ {}));
}
/**
@@ -2326,19 +2322,19 @@ TEST_F(ReshardingOplogApplierRetryableTest, GroupInserts) {
BSON("_id" << 1),
boost::none,
session,
- 1));
+ {1}));
crudOps.push_back(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
repl::OpTypeEnum::kInsert,
BSON("_id" << 2),
boost::none,
session,
- 2));
+ {2}));
crudOps.push_back(makeOplog(repl::OpTime(Timestamp(7, 3), 1),
repl::OpTypeEnum::kInsert,
BSON("_id" << 3),
boost::none,
session,
- 3));
+ {3}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 5 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2402,13 +2398,13 @@ TEST_F(ReshardingOplogApplierRetryableTest, CrudWithEmptyConfigTransactions) {
BSON("_id" << 1),
boost::none,
session1,
- 1));
+ {1}));
crudOps.push_back(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
repl::OpTypeEnum::kInsert,
BSON("_id" << 2),
boost::none,
session1,
- 2));
+ {2}));
OperationSessionInfo session2;
session2.setSessionId(makeLogicalSessionIdForTest());
@@ -2419,7 +2415,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, CrudWithEmptyConfigTransactions) {
BSON("$set" << BSON("x" << 1)),
BSON("_id" << 2),
session2,
- 1));
+ {1}));
OperationSessionInfo session3;
session3.setSessionId(makeLogicalSessionIdForTest());
@@ -2430,7 +2426,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, CrudWithEmptyConfigTransactions) {
BSON("_id" << 1),
boost::none,
session3,
- 1));
+ {1}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2491,13 +2487,13 @@ TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) {
BSON("_id" << 1),
boost::none,
session1,
- 1));
+ {1}));
crudOps.push_back(makeOplog(repl::OpTime(Timestamp(6, 3), 1),
repl::OpTypeEnum::kInsert,
BSON("_id" << 2),
boost::none,
session1,
- 2));
+ {2}));
OperationSessionInfo session2;
session2.setSessionId(makeLogicalSessionIdForTest());
@@ -2508,7 +2504,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) {
BSON("_id" << 3),
boost::none,
session2,
- 1));
+ {1}));
session1.setTxnNumber(2);
@@ -2517,7 +2513,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) {
BSON("_id" << 4),
boost::none,
session1,
- 21));
+ {21}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2565,7 +2561,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) {
TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxn) {
auto lsid = makeLogicalSessionIdForTest();
- writeTxnRecord(lsid, 2, 1, {}, boost::none);
+ writeTxnRecord(lsid, 2, {1}, {}, boost::none);
std::deque<repl::OplogEntry> crudOps;
@@ -2578,7 +2574,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxn) {
BSON("_id" << 1),
boost::none,
session,
- 21));
+ {21}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2617,7 +2613,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithHigherExistingTxnNum) {
auto lsid = makeLogicalSessionIdForTest();
const TxnNumber existingTxnNum = 20;
const StmtId existingStmtId = 1;
- writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none);
+ writeTxnRecord(lsid, existingTxnNum, {existingStmtId}, {}, boost::none);
OperationSessionInfo session;
const TxnNumber incomingTxnNum = 15;
@@ -2632,7 +2628,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithHigherExistingTxnNum) {
BSON("_id" << 1),
boost::none,
session,
- incomingStmtId));
+ {incomingStmtId}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2682,7 +2678,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithEqualExistingTxnNum) {
auto lsid = makeLogicalSessionIdForTest();
const TxnNumber existingTxnNum = 20;
const StmtId existingStmtId = 1;
- writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none);
+ writeTxnRecord(lsid, existingTxnNum, {existingStmtId}, {}, boost::none);
OperationSessionInfo session;
const TxnNumber incomingTxnNum = existingTxnNum;
@@ -2697,7 +2693,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithEqualExistingTxnNum) {
BSON("_id" << 1),
boost::none,
session,
- incomingStmtId));
+ {incomingStmtId}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2737,7 +2733,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithStmtIdAlreadyExecuted)
auto lsid = makeLogicalSessionIdForTest();
const TxnNumber existingTxnNum = 20;
const StmtId existingStmtId = 1;
- writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none);
+ writeTxnRecord(lsid, existingTxnNum, {existingStmtId}, {}, boost::none);
OperationSessionInfo session;
const TxnNumber incomingTxnNum = existingTxnNum;
@@ -2752,7 +2748,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithStmtIdAlreadyExecuted)
BSON("_id" << 1),
boost::none,
session,
- incomingStmtId));
+ {incomingStmtId}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2809,7 +2805,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithActiveUnpreparedTxnSame
BSON("_id" << 1),
boost::none,
session,
- 1));
+ {1}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2867,7 +2863,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithActiveUnpreparedTxnWith
BSON("_id" << 1),
boost::none,
session,
- 1));
+ {1}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2924,7 +2920,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillComm
BSON("_id" << 1),
boost::none,
session,
- 1));
+ {1}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -2989,7 +2985,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithPreparedTxnThatWillAbor
BSON("_id" << 1),
boost::none,
session,
- 1));
+ {1}));
auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps), 2 /* batchSize */);
boost::optional<ReshardingOplogApplier> applier;
@@ -3045,21 +3041,21 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWriteWithPreImage) {
BSON("_id" << 1),
boost::none,
{},
- boost::none));
+ {}));
auto preImageOplog = makeOplog(repl::OpTime(Timestamp(6, 3), 1),
repl::OpTypeEnum::kNoop,
BSON("_id" << 1),
boost::none,
session1,
- boost::none);
+ {});
auto updateOplog = makeOplog(repl::OpTime(Timestamp(8, 3), 1),
repl::OpTypeEnum::kUpdate,
BSON("$set" << BSON("x" << 1)),
BSON("_id" << 1),
session1,
- 1);
+ {1});
updateOplog.setPreImageOp(std::make_shared<repl::DurableOplogEntry>(preImageOplog.getEntry()));
crudOps.push_back(updateOplog);
@@ -3111,21 +3107,21 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWriteWithPostImage) {
BSON("_id" << 1),
boost::none,
{},
- boost::none));
+ {}));
auto postImageOplog = makeOplog(repl::OpTime(Timestamp(6, 3), 1),
repl::OpTypeEnum::kNoop,
BSON("_id" << 1 << "x" << 1),
boost::none,
session1,
- boost::none);
+ {});
auto updateOplog = makeOplog(repl::OpTime(Timestamp(8, 3), 1),
repl::OpTypeEnum::kUpdate,
BSON("$set" << BSON("x" << 1)),
BSON("_id" << 1),
session1,
- 1);
+ {1});
updateOplog.setPostImageOp(
std::make_shared<repl::DurableOplogEntry>(postImageOplog.getEntry()));
crudOps.push_back(updateOplog);
@@ -3169,7 +3165,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, RetryableWriteWithPostImage) {
TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithLowerExistingTxn) {
auto lsid = makeLogicalSessionIdForTest();
- writeTxnRecord(lsid, 2, 1, {}, boost::none);
+ writeTxnRecord(lsid, 2, {1}, {}, boost::none);
std::deque<repl::OplogEntry> crudOps;
@@ -3218,7 +3214,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithHigherExistingTxnNum) {
auto lsid = makeLogicalSessionIdForTest();
const TxnNumber existingTxnNum = 20;
const StmtId existingStmtId = 1;
- writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none);
+ writeTxnRecord(lsid, existingTxnNum, {existingStmtId}, {}, boost::none);
OperationSessionInfo session;
const TxnNumber incomingTxnNum = 15;
@@ -3279,7 +3275,7 @@ TEST_F(ReshardingOplogApplierRetryableTest, ApplyTxnWithEqualExistingTxnNum) {
auto lsid = makeLogicalSessionIdForTest();
const TxnNumber existingTxnNum = 20;
const StmtId existingStmtId = 1;
- writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none);
+ writeTxnRecord(lsid, existingTxnNum, {existingStmtId}, {}, boost::none);
OperationSessionInfo session;
const TxnNumber incomingTxnNum = existingTxnNum;
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
index 052dc8c1396..210d3216d09 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner.cpp
@@ -178,7 +178,7 @@ void ReshardingTxnCloner::_updateSessionRecord(OperationContext* opCtx) {
oplogEntry.setNss({});
oplogEntry.setSessionId(opCtx->getLogicalSessionId());
oplogEntry.setTxnNumber(opCtx->getTxnNumber());
- oplogEntry.setStatementId(kIncompleteHistoryStmtId);
+ oplogEntry.setStatementIds({kIncompleteHistoryStmtId});
oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
oplogEntry.setWallClockTime(Date_t::now());
oplogEntry.setFromMigrate(true);
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index d2ab326959b..1a3f06a7e42 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -141,7 +141,7 @@ repl::MutableOplogEntry parseOplog(const BSONObj& oplogBSON) {
uassert(ErrorCodes::UnsupportedFormat,
str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString()
<< " does not have stmtId: " << redact(oplogBSON),
- oplogEntry.getStatementId());
+ !oplogEntry.getStatementIds().empty());
return oplogEntry;
}
@@ -220,7 +220,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
oplogEntry.setObject2(oplogBSON); // TODO: strip redundant info?
}
- const auto stmtId = *oplogEntry.getStatementId();
+ const auto stmtIds = oplogEntry.getStatementIds();
auto uniqueOpCtx = cc().makeOperationContext();
auto opCtx = uniqueOpCtx.get();
@@ -231,7 +231,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
try {
txnParticipant.beginOrContinue(opCtx, result.txnNum, boost::none, boost::none);
- if (txnParticipant.checkStatementExecuted(opCtx, stmtId)) {
+ if (txnParticipant.checkStatementExecuted(opCtx, stmtIds.front())) {
// Skip the incoming statement because it has already been logged locally
return lastResult;
}
@@ -242,8 +242,9 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
return lastResult;
}
- if (stmtId == kIncompleteHistoryStmtId) {
+ if (stmtIds.front() == kIncompleteHistoryStmtId) {
// No need to log entries for transactions whose history has been truncated
+ invariant(stmtIds.size() == 1);
return lastResult;
}
@@ -295,7 +296,7 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
sessionTxnRecord.setLastWriteDate(oplogEntry.getWallClockTime());
// We do not migrate transaction oplog entries so don't set the txn state.
- txnParticipant.onRetryableWriteCloningCompleted(opCtx, {stmtId}, sessionTxnRecord);
+ txnParticipant.onRetryableWriteCloningCompleted(opCtx, stmtIds, sessionTxnRecord);
}
wunit.commit();
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 27fce4da3eb..70116c968d9 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -88,7 +88,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
boost::optional<BSONObj> object2,
OperationSessionInfo sessionInfo,
Date_t wallClockTime,
- boost::optional<StmtId> stmtId,
+ const std::vector<StmtId>& stmtIds,
boost::optional<repl::OpTime> preImageOpTime = boost::none,
boost::optional<repl::OpTime> postImageOpTime = boost::none) {
return {
@@ -104,7 +104,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
sessionInfo, // sessionInfo
boost::none, // isUpsert
wallClockTime, // wall clock time
- stmtId, // statement id
+ stmtIds, // statement ids
boost::none, // optime of previous write within same transaction
preImageOpTime, // pre-image optime
postImageOpTime, // post-image optime
@@ -301,8 +301,9 @@ private:
const repl::OplogEntry& oplogToCheck) {
ASSERT_TRUE(oplogToCheck.getOpType() == OpTypeEnum::kNoop);
- ASSERT_TRUE(oplogToCheck.getStatementId());
- ASSERT_EQ(*originalOplog.getStatementId(), *oplogToCheck.getStatementId());
+ ASSERT_EQ(originalOplog.getStatementIds().size(), 1);
+ ASSERT_EQ(oplogToCheck.getStatementIds().size(), 1);
+ ASSERT_EQ(originalOplog.getStatementIds().front(), oplogToCheck.getStatementIds().front());
const auto origSessionInfo = originalOplog.getOperationSessionInfo();
const auto sessionInfoToCheck = oplogToCheck.getOperationSessionInfo();
@@ -342,7 +343,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -350,7 +351,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
auto oplog3 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -358,7 +359,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
boost::none, // o2
sessionInfo, // sessionInfo
Date_t::now(), // wall clock time
- 5); // statement id
+ {5}); // statement ids
returnOplog({oplog1, oplog2, oplog3});
@@ -403,7 +404,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
sessionInfo.setTxnNumber(txnNum++);
auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
@@ -412,7 +413,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
sessionInfo.setTxnNumber(txnNum);
auto oplog3 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime
@@ -421,7 +422,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 5); // statement id
+ {5}); // statement ids
returnOplog({oplog1, oplog2, oplog3});
@@ -457,7 +458,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -465,7 +466,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
auto oplog3 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -473,7 +474,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 5); // statement id
+ {5}); // statement ids
// Return in 2 batches
returnOplog({oplog1, oplog2});
@@ -525,7 +526,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
boost::none, // o2
sessionInfo1, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -533,7 +534,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
boost::none, // o2
sessionInfo2, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
auto oplog3 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -541,7 +542,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
boost::none, // o2
sessionInfo2, // session info
Date_t::now(), // wall clock time
- 5); // statement id
+ {5}); // statement ids
returnOplog({oplog1, oplog2, oplog3});
@@ -604,7 +605,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
boost::none, // o2
sessionInfo, // session info
Date_t(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
auto origInnerOplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -612,7 +613,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
boost::none, // o2
sessionInfo, // session info
Date_t(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
auto oplog1 = makeOplogEntry(OpTime(Timestamp(1100, 2), 1), // optime
OpTypeEnum::kNoop, // op type
@@ -620,7 +621,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
origInnerOplog1.getEntry().toBSON(), // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
auto oplog2 = makeOplogEntry(OpTime(Timestamp(1080, 2), 1), // optime
OpTypeEnum::kNoop, // op type
@@ -628,7 +629,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
origInnerOplog2.getEntry().toBSON(), // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
returnOplog({oplog1, oplog2});
@@ -670,7 +671,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
auto updateOplog = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kUpdate, // op type
@@ -678,7 +679,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
BSON("$set" << BSON("x" << 101)), // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45, // statement id
+ {45}, // statement ids
repl::OpTime(Timestamp(100, 2), 1), // pre-image optime
boost::none); // post-image optime
@@ -698,8 +699,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
auto nextOplog = historyIter.next(opCtx);
ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop);
- ASSERT_TRUE(nextOplog.getStatementId());
- ASSERT_EQ(45, nextOplog.getStatementId().value());
+ ASSERT_EQ(1, nextOplog.getStatementIds().size());
+ ASSERT_EQ(45, nextOplog.getStatementIds().front());
auto nextSessionInfo = nextOplog.getOperationSessionInfo();
@@ -727,8 +728,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
auto preImageOpTime = nextOplog.getPreImageOpTime().value();
auto newPreImageOplog = getOplog(opCtx, preImageOpTime);
- ASSERT_TRUE(newPreImageOplog.getStatementId());
- ASSERT_EQ(45, newPreImageOplog.getStatementId().value());
+ ASSERT_EQ(1, newPreImageOplog.getStatementIds().size());
+ ASSERT_EQ(45, newPreImageOplog.getStatementIds().front());
auto preImageSessionInfo = newPreImageOplog.getOperationSessionInfo();
@@ -761,7 +762,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
auto updateOplog = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kUpdate, // op type
@@ -769,7 +770,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
BSON("$set" << BSON("x" << 101)),
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45, // statement id
+ {45}, // statement ids
boost::none, // pre-image optime
repl::OpTime(Timestamp(100, 2), 1)); // post-image optime
@@ -788,8 +789,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
auto nextOplog = historyIter.next(opCtx);
ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop);
- ASSERT_TRUE(nextOplog.getStatementId());
- ASSERT_EQ(45, nextOplog.getStatementId().value());
+ ASSERT_EQ(1, nextOplog.getStatementIds().size());
+ ASSERT_EQ(45, nextOplog.getStatementIds().front());
auto nextSessionInfo = nextOplog.getOperationSessionInfo();
@@ -817,8 +818,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
auto postImageOpTime = nextOplog.getPostImageOpTime().value();
auto newPostImageOplog = getOplog(opCtx, postImageOpTime);
- ASSERT_TRUE(newPostImageOplog.getStatementId());
- ASSERT_EQ(45, newPostImageOplog.getStatementId().value());
+ ASSERT_EQ(1, newPostImageOplog.getStatementIds().size());
+ ASSERT_EQ(45, newPostImageOplog.getStatementIds().front());
auto preImageSessionInfo = newPostImageOplog.getOperationSessionInfo();
@@ -851,7 +852,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
auto updateOplog = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kUpdate, // op type
@@ -859,7 +860,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
BSON("$set" << BSON("x" << 101)), // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45, // statement id
+ {45}, // statement ids
repl::OpTime(Timestamp(100, 2), 1), // pre-image optime
boost::none);
@@ -882,8 +883,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
auto nextOplog = historyIter.next(opCtx);
ASSERT_TRUE(nextOplog.getOpType() == OpTypeEnum::kNoop);
- ASSERT_TRUE(nextOplog.getStatementId());
- ASSERT_EQ(45, nextOplog.getStatementId().value());
+ ASSERT_EQ(1, nextOplog.getStatementIds().size());
+ ASSERT_EQ(45, nextOplog.getStatementIds().front());
auto nextSessionInfo = nextOplog.getOperationSessionInfo();
@@ -911,8 +912,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
auto preImageOpTime = nextOplog.getPreImageOpTime().value();
auto newPreImageOplog = getOplog(opCtx, preImageOpTime);
- ASSERT_TRUE(newPreImageOplog.getStatementId());
- ASSERT_EQ(45, newPreImageOplog.getStatementId().value());
+ ASSERT_EQ(1, nextOplog.getStatementIds().size());
+ ASSERT_EQ(45, newPreImageOplog.getStatementIds().front());
auto preImageSessionInfo = newPreImageOplog.getOperationSessionInfo();
@@ -957,7 +958,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
boost::none, // o2
oldSessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -965,7 +966,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
boost::none, // o2
oldSessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
returnOplog({oplog1, oplog2});
@@ -1008,7 +1009,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
boost::none, // o2
oldSessionInfo,
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
returnOplog({oplog1});
@@ -1031,7 +1032,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
boost::none, // o2
oldSessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
returnOplog({oplog2});
@@ -1115,7 +1116,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
returnOplog({oplog});
@@ -1139,7 +1140,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
returnOplog({oplog});
@@ -1165,7 +1166,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyForResponseWith
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- boost::none); // statement id
+ {}); // statement ids
returnOplog({oplog});
@@ -1193,7 +1194,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
returnOplog({oplog1});
@@ -1212,7 +1213,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
returnOplog({oplog2});
@@ -1261,7 +1262,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImage
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
auto preImageOplog2 = makeOplogEntry(OpTime(Timestamp(100, 2), 1), // optime
OpTypeEnum::kNoop, // op type
@@ -1269,7 +1270,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImage
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
returnOplog({preImageOplog1, preImageOplog2});
@@ -1299,7 +1300,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
sessionInfo.setSessionId(makeLogicalSessionIdForTest());
auto updateOplog = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
@@ -1308,7 +1309,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
BSON("$set" << BSON("x" << 101)),
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45, // statement id
+ {45}, // statement ids
repl::OpTime(Timestamp(100, 2), 1), // pre-image optime
boost::none); // post-image optime
@@ -1340,7 +1341,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNo
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
sessionInfo.setTxnNumber(56);
auto updateOplog = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
@@ -1349,7 +1350,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForPreImageOplogWithNo
BSON("$set" << BSON("x" << 101)), // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45, // statement id
+ {45}, // statement ids
repl::OpTime(Timestamp(100, 2), 1), // pre-image optime
boost::none); // post-image optime
@@ -1382,7 +1383,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
auto updateOplog = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kUpdate, // op type
@@ -1390,7 +1391,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
BSON("$set" << BSON("x" << 101)), // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
returnOplog({preImageOplog, updateOplog});
@@ -1421,7 +1422,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
auto updateOplog = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kUpdate, // op type
@@ -1429,7 +1430,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
BSON("$set" << BSON("x" << 101)), // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45, // statement id
+ {45}, // statement ids
repl::OpTime(Timestamp(100, 2), 1), // pre-image optime
boost::none); // post-image optime
@@ -1462,7 +1463,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
auto updateOplog = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kUpdate, // op type
@@ -1470,7 +1471,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
BSON("$set" << BSON("x" << 101)), // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45, // statement id
+ {45}, // statement ids
boost::none, // pre-image optime
repl::OpTime(Timestamp(100, 2), 1)); // post-image optime
@@ -1502,7 +1503,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23); // statement id
+ {23}); // statement ids
auto oplog2 = makeOplogEntry(OpTime(Timestamp(70, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -1510,7 +1511,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 30); // statement id
+ {30}); // statement ids
auto oplog3 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -1518,7 +1519,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 45); // statement id
+ {45}); // statement ids
returnOplog({oplog1, oplog2, oplog3});
@@ -1540,8 +1541,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
ASSERT_TRUE(firstInsertOplog.getOpType() == OpTypeEnum::kInsert);
ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject());
- ASSERT_TRUE(firstInsertOplog.getStatementId());
- ASSERT_EQ(30, *firstInsertOplog.getStatementId());
+ ASSERT_EQ(1, firstInsertOplog.getStatementIds().size());
+ ASSERT_EQ(30, firstInsertOplog.getStatementIds().front());
checkStatementExecuted(opCtx, 19, 23, oplog1);
checkStatementExecuted(opCtx, 19, 30);
@@ -1559,14 +1560,14 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 23), // statement id
+ {23}), // statement ids
makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
OpTypeEnum::kNoop, // op type
{}, // o
TransactionParticipant::kDeadEndSentinel, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- kIncompleteHistoryStmtId), // statement id
+ {kIncompleteHistoryStmtId}), // statement ids
// This will get ignored since previous entry will make the history 'incomplete'.
makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -1574,7 +1575,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- 5)}; // statement id
+ {5})}; // statement ids
SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
sessionMigration.start(getServiceContext());
@@ -1637,7 +1638,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
boost::none, // o2
sessionInfo1, // session info
Date_t::now(), // wall clock time
- 23), // statement id
+ {23}), // statement ids
// Session 2 entries
makeOplogEntry(OpTime(Timestamp(50, 2), 1), // optime
@@ -1646,14 +1647,14 @@ TEST_F(SessionCatalogMigrationDestinationTest,
boost::none, // o2
sessionInfo2, // session info
Date_t::now(), // wall clock time
- 56), // statement id
+ {56}), // statement ids
makeOplogEntry(OpTime(Timestamp(20, 2), 1), // optime
OpTypeEnum::kInsert, // op type
BSON("x" << 20), // o
boost::none, // o2
sessionInfo2, // session info
Date_t::now(), // wall clock time
- 55)}; // statement id
+ {55})}; // statement ids
SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
sessionMigration.start(getServiceContext());
@@ -1747,7 +1748,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, MigratingKnownStmtWhileOplogTrunc
boost::none, // o2
sessionInfo, // session info
Date_t::now(), // wall clock time
- kStmtId); // statement id
+ {kStmtId}); // statement ids
SessionCatalogMigrationDestination sessionMigration(kNs, kFromShard, migrationId());
sessionMigration.start(getServiceContext());
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 7708ad1c963..5090df2b309 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -84,7 +84,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
const boost::optional<BSONObj>& o2Field,
const OperationSessionInfo& sessionInfo,
Date_t wallClockTime,
- const boost::optional<StmtId>& statementId) {
+ const std::vector<StmtId>& statementIds) {
return {
repl::DurableOplogEntry(opTime, // optime
hash, // hash
@@ -98,7 +98,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
sessionInfo, // session info
boost::none, // upsert
wallClockTime, // wall clock time
- statementId, // statement id
+ statementIds, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -123,7 +123,7 @@ repl::OplogEntry makeSentinelOplogEntry(const LogicalSessionId& lsid,
TransactionParticipant::kDeadEndSentinel, // o2
sessionInfo, // session info
wallClockTime, // wall clock time
- kIncompleteHistoryStmtId); // statement id
+ {kIncompleteHistoryStmtId}); // statement id
}
} // namespace
@@ -263,14 +263,13 @@ std::shared_ptr<Notification<bool>> SessionCatalogMigrationSource::getNotificati
bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) {
while (_currentOplogIterator) {
if (auto nextOplog = _currentOplogIterator->getNext(opCtx)) {
- auto nextStmtId = nextOplog->getStatementId();
+ auto nextStmtIds = nextOplog->getStatementIds();
// Skip the rest of the chain for this session since the ns is unrelated with the
// current one being migrated. It is ok to not check the rest of the chain because
// retryable writes doesn't allow touching different namespaces.
- if (!nextStmtId ||
- (nextStmtId && *nextStmtId != kIncompleteHistoryStmtId &&
- nextOplog->getNss() != _ns)) {
+ if (nextStmtIds.empty() ||
+ (nextStmtIds.front() != kIncompleteHistoryStmtId && nextOplog->getNss() != _ns)) {
_currentOplogIterator.reset();
return false;
}
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 349dfac2015..46881036634 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -60,7 +60,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
BSONObj object,
boost::optional<BSONObj> object2,
Date_t wallClockTime,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
repl::OpTime prevWriteOpTimeInTransaction,
boost::optional<repl::OpTime> preImageOpTime,
boost::optional<repl::OpTime> postImageOpTime) {
@@ -77,7 +77,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
{}, // sessionInfo
boost::none, // upsert
wallClockTime, // wall clock time
- stmtId, // statement id
+ stmtIds, // statement ids
prevWriteOpTimeInTransaction, // optime of previous write within same transaction
preImageOpTime, // pre-image optime
postImageOpTime, // post-image optime
@@ -90,7 +90,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
BSONObj object,
boost::optional<BSONObj> object2,
Date_t wallClockTime,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
repl::OpTime prevWriteOpTimeInTransaction,
boost::optional<repl::OpTime> preImageOpTime = boost::none,
boost::optional<repl::OpTime> postImageOpTime = boost::none) {
@@ -100,7 +100,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
object,
object2,
wallClockTime,
- stmtId,
+ stmtIds,
prevWriteOpTimeInTransaction,
preImageOpTime,
postImageOpTime);
@@ -119,7 +119,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -129,7 +129,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
entry1.getOpTime()); // optime of previous write within same transaction
insertOplogEntry(entry2);
@@ -173,7 +173,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
auto entry1b =
@@ -182,7 +182,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
entry1a.getOpTime()); // optime of previous write within same transaction
SessionTxnRecord sessionRecord1;
@@ -201,7 +201,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 3, // statement id
+ {3}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
auto entry2b =
@@ -210,7 +210,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 4, // statement id
+ {4}, // statement ids
entry2a.getOpTime()); // optime of previous write within same transaction
SessionTxnRecord sessionRecord2;
@@ -279,7 +279,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -289,7 +289,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
entry1.getOpTime()); // pre-image optime
insertOplogEntry(entry2);
@@ -300,7 +300,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
BSON("x" << 20), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 2, // statement id
+ {2}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry3);
@@ -310,7 +310,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
BSON("$inc" << BSON("x" << 1)), // o
BSON("x" << 19), // o2
Date_t::now(), // wall clock time
- 3, // statement id
+ {3}, // statement ids
entry2.getOpTime(), // optime of previous write within same transaction
boost::none, // pre-image optime
entry3.getOpTime()); // post-image optime
@@ -349,7 +349,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -371,7 +371,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none); // post-image optime
@@ -406,7 +406,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -427,7 +427,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry2);
@@ -437,7 +437,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
BSON("x" << 40), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 2, // statement id
+ {2}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry3);
@@ -499,7 +499,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry);
@@ -524,7 +524,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry);
@@ -548,7 +548,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
BSON("x" << 40), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 2, // statement id
+ {2}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry);
@@ -573,7 +573,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(40, 1), 2)); // optime of previous write within same transaction
insertOplogEntry(entry);
@@ -608,8 +608,8 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis
auto oplog = *nextOplogResult.oplog;
ASSERT_TRUE(oplog.getObject2());
ASSERT_BSONOBJ_EQ(TransactionParticipant::kDeadEndSentinel, *oplog.getObject2());
- ASSERT_TRUE(oplog.getStatementId());
- ASSERT_EQ(kIncompleteHistoryStmtId, *oplog.getStatementId());
+ ASSERT_EQ(1, oplog.getStatementIds().size());
+ ASSERT_EQ(kIncompleteHistoryStmtId, oplog.getStatementIds().front());
ASSERT_NE(Date_t{}, oplog.getWallClockTime());
auto sessionInfo = oplog.getOperationSessionInfo();
@@ -630,7 +630,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(40, 1), 2)); // optime of previous write within same transaction
insertOplogEntry(entry);
@@ -757,7 +757,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
// Create a config.transaction entry pointing to the insert oplog entry.
@@ -832,7 +832,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyDeleteNotTouchingChunkIsI
BSON("x" << -50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -842,7 +842,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyDeleteNotTouchingChunkIsI
BSON("x" << -50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
entry1.getOpTime()); // pre-image optime
insertOplogEntry(entry2);
@@ -867,7 +867,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdatePrePostNotTouchingC
BSON("x" << -5), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -877,7 +877,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdatePrePostNotTouchingC
BSON("$set" << BSON("y" << 1)), // o
BSON("x" << -5), // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
entry1.getOpTime()); // pre-image optime
insertOplogEntry(entry2);
@@ -903,7 +903,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
BSON("x" << -50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -913,7 +913,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
BSON("$set" << BSON("x" << -50)), // o
BSON("x" << 10), // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
boost::none, // pre-image optime
entry1.getOpTime()); // post-image optime
@@ -953,7 +953,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -963,7 +963,7 @@ TEST_F(SessionCatalogMigrationSourceTest,
BSON("$set" << BSON("x" << 50)), // o
BSON("x" << -10), // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
boost::none, // pre-image optime
entry1.getOpTime()); // post-image optime
@@ -989,7 +989,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdateNotTouchingChunkSho
BSON("x" << -10 << "y" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -999,7 +999,7 @@ TEST_F(SessionCatalogMigrationSourceTest, FindAndModifyUpdateNotTouchingChunkSho
BSON("$set" << BSON("y" << 50)), // o
BSON("x" << -10), // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
repl::OpTime(Timestamp(0, 0), 0), // optime of previous write within same transaction
boost::none, // pre-image optime
entry1.getOpTime()); // post-image optime
@@ -1032,7 +1032,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
auto entry1b =
@@ -1041,7 +1041,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
BSON("x" << -50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
entry1a.getOpTime()); // optime of previous write within same transaction
SessionTxnRecord sessionRecord1;
@@ -1060,7 +1060,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
BSON("x" << 30), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 3, // statement id
+ {3}, // statement ids
repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
auto entry2b =
@@ -1069,7 +1069,7 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
BSON("x" << 50), // o
boost::none, // o2
Date_t::now(), // wall clock time
- 4, // statement id
+ {4}, // statement ids
entry2a.getOpTime()); // optime of previous write within same transaction
SessionTxnRecord sessionRecord2;
diff --git a/src/mongo/db/transaction_history_iterator_test.cpp b/src/mongo/db/transaction_history_iterator_test.cpp
index f6d97a50229..95798ed8663 100644
--- a/src/mongo/db/transaction_history_iterator_test.cpp
+++ b/src/mongo/db/transaction_history_iterator_test.cpp
@@ -73,7 +73,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
prevWriteOpTimeInTransaction, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 17bbde43b7e..0af6c1e6782 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -171,9 +171,11 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
// Each entry should correspond to a retryable write or a FCV4.0 format transaction.
// These oplog entries must have statementIds.
- invariant(entry.getStatementId());
- if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
+ auto stmtIds = entry.getStatementIds();
+ invariant(!stmtIds.empty());
+ if (stmtIds.front() == kIncompleteHistoryStmtId) {
// Only the dead end sentinel can have this id for oplog write history
+ invariant(stmtIds.size() == 1);
invariant(entry.getObject2());
invariant(entry.getObject2()->woCompare(TransactionParticipant::kDeadEndSentinel) ==
0);
@@ -187,15 +189,17 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
return result;
}
- const auto insertRes =
- result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
- if (!insertRes.second) {
- const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(lsid,
- result.lastTxnRecord->getTxnNum(),
- *entry.getStatementId(),
- existingOpTime,
- entry.getOpTime());
+ for (auto stmtId : entry.getStatementIds()) {
+ const auto insertRes =
+ result.committedStatements.emplace(stmtId, entry.getOpTime());
+ if (!insertRes.second) {
+ const auto& existingOpTime = insertRes.first->second;
+ fassertOnRepeatedExecution(lsid,
+ result.lastTxnRecord->getTxnNum(),
+ stmtId,
+ existingOpTime,
+ entry.getOpTime());
+ }
}
} catch (const DBException& ex) {
if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
@@ -2348,8 +2352,9 @@ boost::optional<repl::OplogEntry> TransactionParticipant::Participant::checkStat
TransactionHistoryIterator txnIter(*stmtTimestamp);
while (txnIter.hasNext()) {
const auto entry = txnIter.next(opCtx);
- invariant(entry.getStatementId());
- if (*entry.getStatementId() == stmtId)
+ auto stmtIds = entry.getStatementIds();
+ invariant(!stmtIds.empty());
+ if (std::find(stmtIds.begin(), stmtIds.end(), stmtId) != stmtIds.end())
return entry;
}
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index 28335181dcf..af6e6959d43 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -65,7 +65,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
BSONObj object,
OperationSessionInfo sessionInfo,
Date_t wallClockTime,
- boost::optional<StmtId> stmtId,
+ const std::vector<StmtId>& stmtIds,
boost::optional<repl::OpTime> prevWriteOpTimeInTransaction) {
return {repl::DurableOplogEntry(
opTime, // optime
@@ -80,7 +80,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
sessionInfo, // sessionInfo
boost::none, // upsert
wallClockTime, // wall clock time
- stmtId, // statement id
+ stmtIds, // statement ids
prevWriteOpTimeInTransaction, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
@@ -195,8 +195,8 @@ protected:
UUID uuid,
const LogicalSessionId& lsid,
TxnNumber txnNumber,
- StmtId stmtId) {
- return logOp(opCtx, nss, uuid, lsid, txnNumber, stmtId, {});
+ const std::vector<StmtId>& stmtIds) {
+ return logOp(opCtx, nss, uuid, lsid, txnNumber, stmtIds, {});
}
static repl::OpTime logOp(OperationContext* opCtx,
@@ -204,7 +204,7 @@ protected:
UUID uuid,
const LogicalSessionId& lsid,
TxnNumber txnNumber,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
repl::OpTime prevOpTime) {
repl::MutableOplogEntry oplogEntry;
oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
@@ -212,17 +212,17 @@ protected:
oplogEntry.setUuid(uuid);
oplogEntry.setObject(BSON("TestValue" << 0));
oplogEntry.setWallClockTime(Date_t::now());
- if (stmtId != kUninitializedStmtId) {
+ if (stmtIds.front() != kUninitializedStmtId) {
oplogEntry.setSessionId(lsid);
oplogEntry.setTxnNumber(txnNumber);
- oplogEntry.setStatementId(stmtId);
+ oplogEntry.setStatementIds(stmtIds);
oplogEntry.setPrevWriteOpTimeInTransaction(prevOpTime);
}
return repl::logOp(opCtx, &oplogEntry);
}
repl::OpTime writeTxnRecord(TxnNumber txnNum,
- StmtId stmtId,
+ const std::vector<StmtId>& stmtIds,
repl::OpTime prevOpTime,
boost::optional<DurableTxnStateEnum> txnState) {
const auto session = OperationContextSession::get(opCtx());
@@ -234,7 +234,7 @@ protected:
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime =
- logOp(opCtx(), kNss, uuid, session->getSessionId(), txnNum, stmtId, prevOpTime);
+ logOp(opCtx(), kNss, uuid, session->getSessionId(), txnNum, stmtIds, prevOpTime);
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setSessionId(session->getSessionId());
@@ -242,7 +242,7 @@ protected:
sessionTxnRecord.setLastWriteOpTime(opTime);
sessionTxnRecord.setLastWriteDate(Date_t::now());
sessionTxnRecord.setState(txnState);
- txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), {stmtId}, sessionTxnRecord);
+ txnParticipant.onWriteOpCompletedOnPrimary(opCtx(), stmtIds, sessionTxnRecord);
wuow.commit();
return opTime;
@@ -307,7 +307,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit
const TxnNumber txnNum = 21;
txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
- const auto opTime = writeTxnRecord(txnNum, 0, {}, boost::none);
+ const auto opTime = writeTxnRecord(txnNum, {0}, {}, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
@@ -332,8 +332,8 @@ TEST_F(TransactionParticipantRetryableWritesTest,
const auto& sessionId = *opCtx()->getLogicalSessionId();
- const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none);
- const auto secondOpTime = writeTxnRecord(200, 1, firstOpTime, boost::none);
+ const auto firstOpTime = writeTxnRecord(100, {0}, {}, boost::none);
+ const auto secondOpTime = writeTxnRecord(200, {1}, firstOpTime, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
@@ -359,11 +359,12 @@ TEST_F(TransactionParticipantRetryableWritesTest, TransactionTableUpdatesReplace
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.refreshFromStorageIfNeeded(opCtx());
- const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none);
+ const auto firstOpTime = writeTxnRecord(100, {0}, {}, boost::none);
assertTxnRecord(100, 0, firstOpTime, boost::none);
- const auto secondOpTime = writeTxnRecord(300, 2, firstOpTime, DurableTxnStateEnum::kCommitted);
+ const auto secondOpTime =
+ writeTxnRecord(300, {2}, firstOpTime, DurableTxnStateEnum::kCommitted);
assertTxnRecord(300, 2, secondOpTime, DurableTxnStateEnum::kCommitted);
- const auto thirdOpTime = writeTxnRecord(400, 3, secondOpTime, boost::none);
+ const auto thirdOpTime = writeTxnRecord(400, {3}, secondOpTime, boost::none);
assertTxnRecord(400, 3, thirdOpTime, boost::none);
}
@@ -436,7 +437,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN
WriteUnitOfWork wuow(opCtx());
const auto uuid = UUID::gen();
- const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
+ const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, {0});
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setSessionId(sessionId);
sessionTxnRecord.setTxnNum(txnNum);
@@ -455,13 +456,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) {
ASSERT(!txnParticipant.checkStatementExecuted(opCtx(), 1000));
ASSERT(!txnParticipant.checkStatementExecutedNoOplogEntryFetch(1000));
- const auto firstOpTime = writeTxnRecord(txnNum, 1000, {}, boost::none);
+ const auto firstOpTime = writeTxnRecord(txnNum, {1000}, {}, boost::none);
ASSERT(txnParticipant.checkStatementExecuted(opCtx(), 1000));
ASSERT(txnParticipant.checkStatementExecutedNoOplogEntryFetch(1000));
ASSERT(!txnParticipant.checkStatementExecuted(opCtx(), 2000));
ASSERT(!txnParticipant.checkStatementExecutedNoOplogEntryFetch(2000));
- writeTxnRecord(txnNum, 2000, firstOpTime, boost::none);
+ writeTxnRecord(txnNum, {2000}, firstOpTime, boost::none);
ASSERT(txnParticipant.checkStatementExecuted(opCtx(), 2000));
ASSERT(txnParticipant.checkStatementExecutedNoOplogEntryFetch(2000));
@@ -500,7 +501,7 @@ DEATH_TEST_REGEX_F(
{
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
+ const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, {0});
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setSessionId(sessionId);
@@ -514,7 +515,7 @@ DEATH_TEST_REGEX_F(
{
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum - 1, 0);
+ const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum - 1, {0});
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setSessionId(sessionId);
@@ -538,7 +539,7 @@ DEATH_TEST_REGEX_F(
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto uuid = UUID::gen();
- const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0);
+ const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, {0});
txnParticipant.invalidate(opCtx());
@@ -565,7 +566,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru
BSON("x" << 0), // o
osi, // session info
Date_t::now(), // wall clock time
- 0, // statement id
+ {0}, // statement ids
boost::none); // optime of previous write within same transaction
// Intentionally skip writing the oplog entry for statement 0, so that it appears as if the
@@ -577,7 +578,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru
BSON("x" << 1), // o
osi, // session info
Date_t::now(), // wall clock time
- 1, // statement id
+ {1}, // statement ids
entry0.getOpTime()); // optime of previous write within same transaction
insertOplogEntry(entry1);
@@ -587,7 +588,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru
BSON("x" << 2), // o
osi, // session info
Date_t::now(), // wall clock time
- 2, // statement id
+ {2}, // statement ids
entry1.getOpTime()); // optime of previous write within same transaction
insertOplogEntry(entry2);
@@ -638,7 +639,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke
oplogEntry.setObject(BSON("x" << 1));
oplogEntry.setObject2(TransactionParticipant::kDeadEndSentinel);
oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
- oplogEntry.setStatementId(1);
+ oplogEntry.setStatementIds({1});
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
@@ -664,7 +665,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke
oplogEntry.setObject({});
oplogEntry.setObject2(TransactionParticipant::kDeadEndSentinel);
oplogEntry.setPrevWriteOpTimeInTransaction(firstOpTime);
- oplogEntry.setStatementId(kIncompleteHistoryStmtId);
+ oplogEntry.setStatementIds({kIncompleteHistoryStmtId});
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
@@ -736,7 +737,7 @@ TEST_F(ShardTxnParticipantRetryableWritesTest,
{
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
+ const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, {0});
SessionTxnRecord sessionTxnRecord;
sessionTxnRecord.setSessionId(sessionId);
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 7545024ca5f..5880f41783b 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -74,7 +74,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
BSONObj object,
OperationSessionInfo sessionInfo,
Date_t wallClockTime,
- boost::optional<StmtId> stmtId,
+ const std::vector<StmtId>& stmtIds,
boost::optional<repl::OpTime> prevWriteOpTimeInTransaction) {
return repl::DurableOplogEntry(
opTime, // optime
@@ -89,7 +89,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
sessionInfo, // sessionInfo
boost::none, // upsert
wallClockTime, // wall clock time
- stmtId, // statement id
+ stmtIds, // statement ids
prevWriteOpTimeInTransaction, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index 6d74a23e8d0..2fb3df60f21 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -78,7 +78,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
{}, // sessionInfo
boost::none, // upsert
Date_t(), // wall clock time
- boost::none, // statement id
+ {}, // statement ids
boost::none, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none, // post-image optime