summary refs log tree commit diff
diff options
context:
space:
mode:
author Benety Goh <benety@mongodb.com> 2019-04-06 09:17:32 -0400
committer Benety Goh <benety@mongodb.com> 2019-04-06 09:17:49 -0400
commit bb12cc699844e8231389a9a878d0b35253cecfa1 (patch)
tree 0aefd7ab9bac6a6eb499b33c475625ca9a54bcf6
parent 7a196636445fea6318b22c962b90b2a901a28edd (diff)
download mongo-bb12cc699844e8231389a9a878d0b35253cecfa1.tar.gz
SERVER-40169 OplogApplier::getNextApplierBatch() groups unprepared commitTransaction oplog entries with CRUD ops
This applies changes from commit 3328515f7d80c8cedcaf8c0df83c6effc60330d0.
-rw-r--r-- src/mongo/db/repl/SConscript 1
-rw-r--r-- src/mongo/db/repl/oplog_applier.cpp 81
-rw-r--r-- src/mongo/db/repl/oplog_applier_test.cpp 52
3 files changed, 119 insertions, 15 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 41b23991fb3..211a8bcb354 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -691,6 +691,7 @@ env.CppUnitTest(
'oplog_applier_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/commands/txn_cmd_request',
'$BUILD_DIR/mongo/unittest/unittest',
'oplog_application_interface',
'oplog_buffer_blocking_queue',
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 6da2f60940f..0bdcaaaa018 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/sync_tail.h"
@@ -128,6 +129,69 @@ void OplogApplier::enqueue(OperationContext* opCtx,
_oplogBuffer->pushAllNonBlocking(opCtx, begin, end);
}
+namespace {
+
+/**
+ * Returns true only when 'entry' is a commitTransaction command whose object holds a boolean
+ * "prepared" field (CommitTransactionOplogObject::kPreparedFieldName) with the value false.
+ * Every other entry, including a commitTransaction whose field is not a boolean, yields false.
+ */
+bool isUnpreparedCommit(const OplogEntry& entry) {
+ if (entry.getCommandType() != OplogEntry::CommandType::kCommitTransaction) {
+ return false;  // Not a commitTransaction at all.
+ }
+
+ auto preparedElement = entry.getObject()[CommitTransactionOplogObject::kPreparedFieldName];
+ if (!preparedElement.isBoolean()) {
+ return false;  // No boolean value to inspect, so do not treat the commit as unprepared.
+ }
+
+ auto isPrepared = preparedElement.boolean();
+ return !isPrepared;
+}
+
+/**
+ * Returns true when 'entry' is an applyOps command for which shouldPrepare() is false, i.e. a
+ * self-contained atomic operation as opposed to part of a prepared transaction.
+ */
+bool isUnpreparedApplyOps(const OplogEntry& entry) {
+ return entry.getCommandType() == OplogEntry::CommandType::kApplyOps && !entry.shouldPrepare();
+}
+
+/**
+ * Returns true if this oplog entry must be processed in its own batch and cannot be grouped with
+ * other entries.
+ *
+ * Commands must be processed one at a time. The exceptions to this are unprepared applyOps, because
+ * applyOps oplog entries are effectively containers for CRUD operations, and unprepared
+ * commitTransaction, because that also expands to CRUD operations. Therefore, it is safe to batch
+ * these two kinds of commands with CRUD operations when reading from the oplog buffer.
+ *
+ * Oplog entries on 'system.views' should also be processed one at a time. The view catalog
+ * immediately reflects the change made by each oplog entry, so it could be observed in an
+ * inconsistent state if several 'system.views' entries were applied out of their original order.
+ *
+ * Process updates to 'admin.system.version' individually as well so the secondary's FCV when
+ * processing each operation matches the primary's when committing that operation.
+ */
+bool mustProcessStandalone(const OplogEntry& entry) {
+ if (entry.isCommand()) {
+ if (isUnpreparedCommit(entry)) {
+ return false;
+ } else if (isUnpreparedApplyOps(entry)) {
+ return false;
+ }
+ return true;  // Every other command gets a batch of its own.
+ } else if (entry.getNss().isSystemDotViews()) {
+ return true;
+ } else if (entry.getNss().isServerConfigurationCollection()) {
+ return true;
+ }
+ return false;  // Non-command entries on any other namespace may be batched.
+}
+
+} // namespace
+
StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
OperationContext* opCtx, const BatchLimits& batchLimits) {
if (batchLimits.ops == 0) {
@@ -149,28 +213,15 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
return {ErrorCodes::BadValue, message};
}
- // Commands must be processed one at a time. The only exception to this is applyOps because
- // applyOps oplog entries are effectively containers for CRUD operations. Therefore, it is
- // safe to batch applyOps commands with CRUD operations when reading from the oplog buffer.
- //
- // Oplog entries on 'system.views' should also be processed one at a time. View catalog
- // immediately reflects changes for each oplog entry so we can see inconsistent view catalog
- // if multiple oplog entries on 'system.views' are being applied out of the original order.
- //
- // Process updates to 'admin.system.version' individually as well so the secondary's FCV
- // when processing each operation matches the primary's when committing that operation.
- if ((entry.isCommand() && (entry.getCommandType() != OplogEntry::CommandType::kApplyOps ||
- entry.shouldPrepare())) ||
- entry.getNss().isSystemDotViews() || entry.getNss().isServerConfigurationCollection()) {
+ if (mustProcessStandalone(entry)) {
if (ops.empty()) {
- // Apply commands one-at-a-time.
ops.push_back(std::move(entry));
BSONObj opToPopAndDiscard;
invariant(_oplogBuffer->tryPop(opCtx, &opToPopAndDiscard));
dassert(ops.back() == OplogEntry(opToPopAndDiscard));
}
- // Otherwise, apply what we have so far and come back for the command.
+ // Otherwise, apply what we have so far and come back for this entry.
return std::move(ops);
}
diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp
index 2f5feb6ce27..898a73a46de 100644
--- a/src/mongo/db/repl/oplog_applier_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_test.cpp
@@ -33,6 +33,7 @@
#include <limits>
#include <memory>
+#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/oplog_applier.h"
#include "mongo/db/repl/oplog_buffer_blocking_queue.h"
@@ -150,6 +151,34 @@ OplogEntry makeApplyOpsOplogEntry(int t, bool prepare) {
}
/**
+ * Generates a commitTransaction oplog entry on the command namespace of 'dbName'. 't' supplies
+ * the timestamp and wall clock time; 'prepared' and 'count' populate the entry's "o" object.
+ */
+OplogEntry makeCommitTransactionOplogEntry(int t, StringData dbName, bool prepared, int count) {
+ auto nss = NamespaceString(dbName).getCommandNS();
+ CommitTransactionOplogObject cmdObj;
+ cmdObj.setPrepared(prepared);
+ cmdObj.setCount(count);
+ BSONObj oField = cmdObj.toBSON();  // Serialized command body, passed below as the "o" field.
+ return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime
+ boost::none, // hash
+ OpTypeEnum::kCommand, // op type
+ nss, // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ OplogEntry::kOplogVersion, // version
+ oField, // o
+ boost::none, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ Date_t::min() + Seconds(t), // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none, // post-image optime
+ boost::none); // prepare
+}
+
+/**
* Returns string representation of OplogApplier::Operations.
*/
std::string toString(const OplogApplier::Operations& ops) {
@@ -222,6 +251,29 @@ TEST_F(OplogApplierTest, GetNextApplierBatchReturnsServerConfigurationOpInOwnBat
ASSERT_EQUALS(srcOps[0], batch[0]);
}
+TEST_F(OplogApplierTest, GetNextApplierBatchReturnsPreparedCommitTransactionOpInOwnBatch) {
+ OplogApplier::Operations srcOps;
+ srcOps.push_back(makeCommitTransactionOplogEntry(1, dbName, true, 3));  // prepared == true
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ ASSERT_EQUALS(1U, batch.size()) << toString(batch);  // The insert must not join the commit.
+ ASSERT_EQUALS(srcOps[0], batch[0]);
+}
+
+TEST_F(OplogApplierTest, GetNextApplierBatchGroupsUnpreparedCommitTransactionOpWithOtherOps) {
+ OplogApplier::Operations srcOps;
+ srcOps.push_back(makeCommitTransactionOplogEntry(1, dbName, false, 3));  // prepared == false
+ srcOps.push_back(makeInsertOplogEntry(2, NamespaceString(dbName, "bar")));
+ _applier->enqueue(_opCtx.get(), srcOps.cbegin(), srcOps.cend());
+
+ auto batch = unittest::assertGet(_applier->getNextApplierBatch(_opCtx.get(), _limits));
+ ASSERT_EQUALS(2U, batch.size()) << toString(batch);  // Commit and insert share one batch.
+ ASSERT_EQUALS(srcOps[0], batch[0]);  // Original order is preserved within the batch.
+ ASSERT_EQUALS(srcOps[1], batch[1]);
+}
+
} // namespace
} // namespace repl
} // namespace mongo