summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthrough/large_txn_correctness.js55
-rw-r--r--src/mongo/db/repl/oplog_applier_test.cpp108
-rw-r--r--src/mongo/db/repl/oplog_batcher.cpp17
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp40
-rw-r--r--src/mongo/db/repl/oplog_entry.h16
5 files changed, 223 insertions, 13 deletions
diff --git a/jstests/noPassthrough/large_txn_correctness.js b/jstests/noPassthrough/large_txn_correctness.js
new file mode 100644
index 00000000000..5f240dd3cca
--- /dev/null
+++ b/jstests/noPassthrough/large_txn_correctness.js
@@ -0,0 +1,55 @@
+/**
+ * This test serves to ensure that the oplog batcher behavior correctly processes large transactions
+ * so that it does not cause any correctness problems.
+ *
+ * @tags: [requires_journaling]
+ */
+(function() {
+"use strict";
+load("jstests/core/txns/libs/prepare_helpers.js");
+
+// Declare constants.
+const DB_NAME = "db_large_txn_correctness";
+const COLL_NAME = "db_large_txn_correctness";
+
+// Make a large document of size 'numMB' so that it can easily fill up an oplog entry.
+const makeLargeDoc = numMB => new Array(numMB * 1024 * 1024).join('a');
+
+// Spin up a replica set.
+const replSet = new ReplSetTest({nodes: 1});
+replSet.startSet();
+replSet.initiate();
+const primary = replSet.getPrimary();
+
+const session = primary.startSession();
+
+// Creating a collection so the first test can just test if regular CRUD operations work.
+session.getDatabase(DB_NAME).createCollection(COLL_NAME);
+
+let commitRes;
+
+try {
+ // Perform a large transaction (>16MB) with only CRUD operations to ensure that nothing
+ // fundamental is broken.
+ session.startTransaction();
+ session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
+ session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
+ commitRes = session.commitTransaction_forTesting();
+ assert.eq(1, commitRes.ok);
+
+ // Ensure that the collection has been dropped so that collection creation can be tested
+ // in a txn.
+ session.getDatabase(DB_NAME)[COLL_NAME].drop();
+
+ // Create a large transaction (>16MB) with a command and ensure that it works.
+ session.startTransaction();
+ session.getDatabase(DB_NAME).createCollection(COLL_NAME);
+ session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
+ session.getDatabase(DB_NAME)[COLL_NAME].insert({doc: makeLargeDoc(10)});
+ commitRes = session.commitTransaction_forTesting();
+ assert.eq(1, commitRes.ok);
+} finally {
+ session.endSession();
+ replSet.stopSet();
+}
+})();
diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp
index efd8a75a421..4e8e915db35 100644
--- a/src/mongo/db/repl/oplog_applier_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_test.cpp
@@ -186,6 +186,73 @@ OplogEntry makeCommitTransactionOplogEntry(int t, StringData dbName, bool prepar
}
/**
+ * Creates oplog entries that are meant to be all parts of a mocked large transaction. This function
+ * does the following:
+ *
+ * 1. If we intend to make the first oplog entry of the transaction, we add a Null prevOptime to
+ * denote that there is no entry that comes before this one. This entry will just be a applyOps.
+ * 2. If we intend to make the last oplog entry of the transaction, then we make a commit oplog
+ * entry.
+ * 3. Otherwise, we create applyOps oplog entries that denote all of the intermediate oplog entries.
+ */
+OplogEntry makeLargeTransactionOplogEntries(
+ int t, bool prepared, bool isFirst, bool isLast, int curr, int count) {
+ auto nss = NamespaceString(NamespaceString::kAdminDb).getCommandNS();
+ OpTime prevWriteOpTime = isFirst ? OpTime() : OpTime(Timestamp(t - 1, 1), 1);
+ BSONObj oField;
+ if (isLast) {
+ // Makes a commit oplog entry if this is the last oplog entry we wish to create.
+ if (prepared) {
+ CommitTransactionOplogObject cmdObj;
+ cmdObj.setCount(count);
+ oField = cmdObj.toBSON();
+ } else {
+ oField = BSON("applyOps" << BSONArray() << "count" << count);
+ }
+ } else {
+ BSONObjBuilder oFieldBuilder;
+ oFieldBuilder.append("applyOps", BSONArray());
+ if (prepared && curr == count - 1) {
+ oFieldBuilder.append("prepare", true);
+ }
+ oFieldBuilder.append("partialTxn", true);
+ oField = oFieldBuilder.obj();
+ }
+ return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime
+ boost::none, // hash
+ OpTypeEnum::kCommand, // op type
+ nss, // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ OplogEntry::kOplogVersion, // version
+ oField, // o
+ boost::none, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ Date_t() + Seconds(t), // wall clock time
+ boost::none, // statement id
+ prevWriteOpTime, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+}
+
+/**
+ * Generates a mock large-transaction which has more than one oplog entry.
+ */
+std::vector<OplogEntry> makeMultiEntryTransactionOplogEntries(int t,
+ StringData dbName,
+ bool prepared,
+ int count) {
+ ASSERT_GTE(count, 2);
+ std::vector<OplogEntry> vec;
+ for (int i = 0; i < count; i++) {
+ vec.push_back(makeLargeTransactionOplogEntries(
+ t + i, prepared, i == 0, i == count - 1, i + 1, count));
+ }
+ return vec;
+}
+
+/**
* Returns string representation of std::vector<OplogEntry>.
*/
std::string toString(const std::vector<OplogEntry>& ops) {
@@ -401,6 +468,47 @@ TEST_F(OplogApplierTest,
ASSERT_EQUALS(srcOps[1], batch[0]);
}
+TEST_F(OplogApplierTest, LastOpInLargeTransactionIsProcessedIndividually) {
+ std::vector<OplogEntry> srcOps;
+ srcOps.push_back(makeInsertOplogEntry(1, NamespaceString(dbName, "bar")));
+
+ // Makes entries with ts from range [2, 5).
+ std::vector<OplogEntry> multiEntryTransaction =
+ makeMultiEntryTransactionOplogEntries(2, dbName, /* prepared */ false, /* num entries*/ 3);
+ for (auto entry : multiEntryTransaction) {
+ srcOps.push_back(entry);
+ }
+
+ // Push one extra operation to ensure that the last oplog entry of a large transaction
+ // is processed by itself.
+ srcOps.push_back(makeInsertOplogEntry(5, NamespaceString(dbName, "bar")));
+
+ _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+
+ // Set large enough batch limit to ensure that batcher is not batching because of limit, but
+ // rather because it encountered the final oplog entry of a large transaction.
+ _limits.ops = 10U;
+
+ // First batch: [insert, applyOps, applyOps]
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ ASSERT_EQUALS(3U, batch.size()) << toString(batch);
+ ASSERT_EQUALS(srcOps[0], batch[0]);
+ ASSERT_EQUALS(srcOps[1], batch[1]);
+ ASSERT_EQUALS(srcOps[2], batch[2]);
+
+ // Second batch: [applyOps]. The last oplog entry of a large transaction must be processed by
+ // itself.
+ batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ ASSERT_EQUALS(1U, batch.size()) << toString(batch);
+ ASSERT_EQUALS(srcOps[3], batch[0]);
+
+ // Third batch: [insert]. The this confirms that the last oplog entry of a large txn will be
+ // batched individually.
+ batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ ASSERT_EQUALS(1U, batch.size()) << toString(batch);
+ ASSERT_EQUALS(srcOps[4], batch[0]);
+}
+
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_batcher.cpp b/src/mongo/db/repl/oplog_batcher.cpp
index 21d154f2e39..83809ce0b81 100644
--- a/src/mongo/db/repl/oplog_batcher.cpp
+++ b/src/mongo/db/repl/oplog_batcher.cpp
@@ -109,23 +109,32 @@ bool isUnpreparedCommit(const OplogEntry& entry) {
*
* Commands, in most cases, must be processed one at a time. The exceptions to this rule are
* unprepared applyOps and unprepared commitTransaction for transactions that only contain CRUD
- * operations. These two cases expand to CRUD operations, which can be safely batched with other
- * CRUD operations. All other command oplog entries, including unprepared applyOps/commitTransaction
- * for transactions that contain commands, must be processed in their own batch.
+ * operations and commands found within large transactions (>16MB). The prior two cases expand to
+ * CRUD operations, which can be safely batched with other CRUD operations. All other command oplog
+ * entries, including unprepared applyOps/commitTransaction for transactions that contain commands,
+ * must be processed in their own batch.
* Note that 'unprepared applyOps' could mean a partial transaction oplog entry, an implicit commit
* applyOps oplog entry, or an atomic applyOps oplog entry outside of a transaction.
*
+ * Command operations inside large transactions do not need to be processed individually as long as
+ * the final oplog entry in the transaction is processed individually, since the operations are not
+ * actually run until the commit operation is reached.
+ *
* Oplog entries on 'system.views' should also be processed one at a time. View catalog immediately
* reflects changes for each oplog entry so we can see inconsistent view catalog if multiple oplog
* entries on 'system.views' are being applied out of the original order.
*
* Process updates to 'admin.system.version' individually as well so the secondary's FCV when
* processing each operation matches the primary's when committing that operation.
+ *
+ * The ends of large transactions (> 16MB) should also be processed immediately on its own in order
+ * to avoid scenarios where parts of the transaction is batched with other operations not in the
+ * transaction.
*/
bool mustProcessIndividually(const OplogEntry& entry) {
if (entry.isCommand()) {
if (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || entry.shouldPrepare() ||
- entry.isTransactionWithCommand()) {
+ entry.isSingleOplogEntryTransactionWithCommand() || entry.isEndOfLargeTransaction()) {
return true;
} else {
// This branch covers unprepared CRUD applyOps and unprepared CRUD commits.
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 0412f6b2362..980630a3579 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -353,16 +353,44 @@ bool OplogEntry::shouldPrepare() const {
getObject()[ApplyOpsCommandInfoBase::kPrepareFieldName].booleanSafe();
}
-bool OplogEntry::isTransactionWithCommand() const {
- auto applyOps = getObject().getField("applyOps");
- if (applyOps.eoo()) {
+bool OplogEntry::isSingleOplogEntryTransaction() const {
+ if (getCommandType() != CommandType::kApplyOps || !getTxnNumber() || !getSessionId() ||
+ getObject()[ApplyOpsCommandInfoBase::kPartialTxnFieldName].booleanSafe()) {
+ return false;
+ }
+ auto prevOptimeOpt = getPrevWriteOpTimeInTransaction();
+ if (!prevOptimeOpt) {
+ // If there is no prevWriteOptime, then this oplog entry is not a part of a transaction.
+ return false;
+ }
+ return prevOptimeOpt->isNull();
+}
+
+bool OplogEntry::isEndOfLargeTransaction() const {
+ if (getCommandType() != CommandType::kApplyOps) {
+ // If the oplog entry is neither commit nor abort, then it must be an applyOps. Otherwise,
+ // it cannot be a termainal oplog entry of a large transaction.
+ return false;
+ }
+ auto prevOptimeOpt = getPrevWriteOpTimeInTransaction();
+ if (!prevOptimeOpt) {
+ // If the oplog entry is neither commit nor abort, then it must be an applyOps. Otherwise,
+ // it cannot be a terminal oplog entry of a large transaction.
return false;
}
- if (!getTxnNumber() || !getSessionId()) {
- // Only transactions can produce applyOps oplog entries with transaction numbers and
- // session IDs.
+ // There should be a previous oplog entry in a multiple oplog entry transaction if this is
+ // supposed to be the last one. The first oplog entry in a large transaction will have a null
+ // ts.
+ return !prevOptimeOpt->isNull() && !isPartialTransaction();
+}
+
+bool OplogEntry::isSingleOplogEntryTransactionWithCommand() const {
+ if (!isSingleOplogEntryTransaction()) {
return false;
}
+ // Since we know that this oplog entry at this point is part of a transaction, we can safely
+ // assume that it has an applyOps field.
+ auto applyOps = getObject().getField("applyOps");
// Iterating through the entire applyOps array is not optimal for performance. A potential
// optimization, if necessary, could be to ensure the primary always constructs applyOps oplog
// entries with commands at the beginning.
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index d17c0a4c512..9f4d6c5ba0c 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -317,6 +317,11 @@ public:
}
/**
+ * Returns whether if the oplog entry is the last applyOps in a multiple-entry transaction.
+ */
+ bool isEndOfLargeTransaction() const;
+
+ /**
* Returns if this is a prepared 'commitTransaction' oplog entry.
*/
bool isPreparedCommit() const {
@@ -334,10 +339,15 @@ public:
}
/**
- * Returns whether the oplog entry represents an applyOps with a commnd inside. This will occur
- * if a multi-document transaction performs a command.
+ * Returns whether the oplog entry represents a single oplog entry transaction.
+ */
+ bool isSingleOplogEntryTransaction() const;
+
+ /**
+ * Returns whether the oplog entry represents an applyOps with a command inside. This is only
+ * for transactions with only one oplog entry.
*/
- bool isTransactionWithCommand() const;
+ bool isSingleOplogEntryTransactionWithCommand() const;
/**
* Returns if the oplog entry is for a CRUD operation.