From bb12cc699844e8231389a9a878d0b35253cecfa1 Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Sat, 6 Apr 2019 09:17:32 -0400 Subject: SERVER-40169 OplogApplier::getNextApplierBatch() groups unprepared commitTransaction oplog entries with CRUD ops This applies changes from commit 3328515f7d80c8cedcaf8c0df83c6effc60330d0. --- src/mongo/db/repl/SConscript | 1 + src/mongo/db/repl/oplog_applier.cpp | 81 ++++++++++++++++++++++++++------ src/mongo/db/repl/oplog_applier_test.cpp | 52 ++++++++++++++++++++ 3 files changed, 119 insertions(+), 15 deletions(-) diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 41b23991fb3..211a8bcb354 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -691,6 +691,7 @@ env.CppUnitTest( 'oplog_applier_test.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/db/commands/txn_cmd_request', '$BUILD_DIR/mongo/unittest/unittest', 'oplog_application_interface', 'oplog_buffer_blocking_queue', diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 6da2f60940f..0bdcaaaa018 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/sync_tail.h" @@ -128,6 +129,69 @@ void OplogApplier::enqueue(OperationContext* opCtx, _oplogBuffer->pushAllNonBlocking(opCtx, begin, end); } +namespace { + +/** + * Returns whether an oplog entry represents a commitTransaction for a transaction which has not + * been prepared. An entry is an unprepared commit if it has a boolean "prepared" field set to + * false. + */ +bool isUnpreparedCommit(const OplogEntry& entry) { + if (entry.getCommandType() != OplogEntry::CommandType::kCommitTransaction) { + return false; + } + + auto preparedElement = entry.getObject()[CommitTransactionOplogObject::kPreparedFieldName]; + if (!preparedElement.isBoolean()) { + return false; + } + + auto isPrepared = preparedElement.boolean(); + return !isPrepared; +} + +/** + * Returns whether an oplog entry represents an applyOps which is a self-contained atomic operation, + * as opposed to part of a prepared transaction. + */ +bool isUnpreparedApplyOps(const OplogEntry& entry) { + return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare(); +} + +/** + * Returns true if this oplog entry must be processed in its own batch and cannot be grouped with + * other entries. + * + * Commands must be processed one at a time. The exceptions to this are unprepared applyOps, because + * applyOps oplog entries are effectively containers for CRUD operations, and unprepared + * commitTransaction, because that also expands to CRUD operations. Therefore, it is safe to batch + * applyOps commands with CRUD operations when reading from the oplog buffer. + * + * Oplog entries on 'system.views' should also be processed one at a time. View catalog immediately + * reflects changes for each oplog entry so we can see inconsistent view catalog if multiple oplog + * entries on 'system.views' are being applied out of the original order. + * + * Process updates to 'admin.system.version' individually as well so the secondary's FCV when + * processing each operation matches the primary's when committing that operation. + */ +bool mustProcessStandalone(const OplogEntry& entry) { + if (entry.isCommand()) { + if (isUnpreparedCommit(entry)) { + return false; + } else if (isUnpreparedApplyOps(entry)) { + return false; + } + return true; + } else if (entry.getNss().isSystemDotViews()) { + return true; + } else if (entry.getNss().isServerConfigurationCollection()) { + return true; + } + return false; +} + +} // namespace + StatusWith OplogApplier::getNextApplierBatch( OperationContext* opCtx, const BatchLimits& batchLimits) { if (batchLimits.ops == 0) { @@ -149,28 +213,15 @@ StatusWith OplogApplier::getNextApplierBatch( return {ErrorCodes::BadValue, message}; } - // Commands must be processed one at a time. The only exception to this is applyOps because - // applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is - // safe to batch applyOps commands with CRUD operations when reading from the oplog buffer. - // - // Oplog entries on 'system.views' should also be processed one at a time. View catalog - // immediately reflects changes for each oplog entry so we can see inconsistent view catalog - // if multiple oplog entries on 'system.views' are being applied out of the original order. - // - // Process updates to 'admin.system.version' individually as well so the secondary's FCV - // when processing each operation matches the primary's when committing that operation. - if ((entry.isCommand() && (entry.getCommandType() != OplogEntry::CommandType::kApplyOps || - entry.shouldPrepare())) || - entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) { + if (mustProcessStandalone(entry)) { if (ops.empty()) { - // Apply commands one-at-a-time. ops.push_back(std::move(entry)); BSONObj opToPopAndDiscard; invariant(_oplogBuffer->tryPop(opCtx, &opToPopAndDiscard)); dassert(ops.back() == OplogEntry(opToPopAndDiscard)); } - // Otherwise, apply what we have so far and come back for the command. + // Otherwise, apply what we have so far and come back for this entry. return std::move(ops); } diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp index 2f5feb6ce27..898a73a46de 100644 --- a/src/mongo/db/repl/oplog_applier_test.cpp +++ b/src/mongo/db/repl/oplog_applier_test.cpp @@ -33,6 +33,7 @@ #include #include +#include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/oplog_applier.h" #include "mongo/db/repl/oplog_buffer_blocking_queue.h" @@ -149,6 +150,34 @@ OplogEntry makeApplyOpsOplogEntry(int t, bool prepare) { prepare); // prepare } +/** + * Generates a commitTransaction oplog entry with the given number used for the timestamp. + */ +OplogEntry makeCommitTransactionOplogEntry(int t, StringData dbName, bool prepared, int count) { + auto nss = NamespaceString(dbName).getCommandNS(); + CommitTransactionOplogObject cmdObj; + cmdObj.setPrepared(prepared); + cmdObj.setCount(count); + BSONObj oField = cmdObj.toBSON(); + return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime + boost::none, // hash + OpTypeEnum::kCommand, // op type + nss, // namespace + boost::none, // uuid + boost::none, // fromMigrate + OplogEntry::kOplogVersion, // version + oField, // o + boost::none, // o2 + {}, // sessionInfo + boost::none, // upsert + Date_t::min() + Seconds(t), // wall clock time + boost::none, // statement id + boost::none, // optime of previous write within same transaction + boost::none, // pre-image optime + boost::none, // post-image optime + boost::none); // prepare +} + /** * Returns string representation of OplogApplier::Operations. */ @@ -222,6 +251,29 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsServerConfigurationOpInOwnBat ASSERT_EQUALS(srcOps[0], batch[0]); } +TEST_F(OplogApplierTest, GetNextApplierBatchReturnsPreparedCommitTransactionOpInOwnBatch) { + OplogApplier::Operations srcOps; + srcOps.push_back(makeCommitTransactionOplogEntry(1, dbName, true, 3)); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + + auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + ASSERT_EQUALS(1U, batch.size()) << toString(batch); + ASSERT_EQUALS(srcOps[0], batch[0]); +} + +TEST_F(OplogApplierTest, GetNextApplierBatchGroupsUnpreparedCommitTransactionOpWithOtherOps) { + OplogApplier::Operations srcOps; + srcOps.push_back(makeCommitTransactionOplogEntry(1, dbName, false, 3)); + srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar"))); + _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend()); + + auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits)); + ASSERT_EQUALS(2U, batch.size()) << toString(batch); + ASSERT_EQUALS(srcOps[0], batch[0]); + ASSERT_EQUALS(srcOps[1], batch[1]); +} + } // namespace } // namespace repl } // namespace mongo -- cgit v1.2.1