summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
authorJason Chan <jason.chan@10gen.com>2019-08-02 10:14:00 -0400
committerJason Chan <jason.chan@10gen.com>2019-08-02 10:14:12 -0400
commitebfc0dc2e6f80e61d6b94742a79d1194997a7959 (patch)
treedf49cd31af10c5629c1fdc7239bf4aed09dc3583 /src/mongo/db/op_observer_impl.cpp
parentd2b16396526edd39aa0dcbaf609ffe2b0caa5d5e (diff)
downloadmongo-ebfc0dc2e6f80e61d6b94742a79d1194997a7959.tar.gz
SERVER-41470 Remove FCV checks for multiple-entry transaction format
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp104
1 files changed, 15 insertions, 89 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 23266878f34..ad9c5c2b2b1 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -745,15 +745,11 @@ namespace {
// field. Appends as many operations as possible until either the constructed object exceeds the
// 16MB limit or the maximum number of transaction statements allowed in one entry.
//
-// If 'limitSize' is false, then it attempts to include all given operations, regardless of whether
-// or not they fit. If the ops don't fit, TransactionTooLarge will be thrown in that case.
-//
// Returns an iterator to the first statement that wasn't packed into the applyOps object.
std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApplyOps(
BSONObjBuilder* applyOpsBuilder,
std::vector<repl::ReplOperation>::const_iterator stmtBegin,
- std::vector<repl::ReplOperation>::const_iterator stmtEnd,
- bool limitSize) {
+ std::vector<repl::ReplOperation>::const_iterator stmtEnd) {
std::vector<repl::ReplOperation>::const_iterator stmtIter;
BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd));
@@ -765,11 +761,9 @@ std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApp
// BSON overhead and the other applyOps fields. But if the array with a single operation
// exceeds BSONObjMaxUserSize, we still log it, as a single max-length operation
// should be able to be applied.
- if (limitSize &&
- (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry ||
- (opsArray.arrSize() > 0 &&
- (opsArray.len() + OplogEntry::getDurableReplOperationSize(stmt) >
- BSONObjMaxUserSize))))
+ if (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry ||
+ (opsArray.arrSize() > 0 &&
+ (opsArray.len() + OplogEntry::getDurableReplOperationSize(stmt) > BSONObjMaxUserSize)))
break;
opsArray.append(stmt.toBSON());
}
@@ -880,8 +874,8 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
while (stmtsIter != stmts.end()) {
BSONObjBuilder applyOpsBuilder;
- auto nextStmt = packTransactionStatementsForApplyOps(
- &applyOpsBuilder, stmtsIter, stmts.end(), true /* limitSize */);
+ auto nextStmt =
+ packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts.end());
// If we packed the last op, then the next oplog entry we log should be the implicit
// commit or implicit prepare, i.e. we omit the 'partialTxn' field.
@@ -999,41 +993,14 @@ void OpObserverImpl::onUnpreparedTransactionCommit(
return;
repl::OpTime commitOpTime;
- // As FCV downgrade/upgrade is racey, we want to avoid performing a FCV check multiple times in
- // a single call into the OpObserver. Therefore, we store the result here and pass it as an
- // argument.
- const auto fcv = serverGlobalParams.featureCompatibility.getVersion();
- if (fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) {
- auto txnParticipant = TransactionParticipant::get(opCtx);
- const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime();
- invariant(lastWriteOpTime.isNull());
- MutableOplogEntry oplogEntry;
- oplogEntry.setPrevWriteOpTimeInTransaction(lastWriteOpTime);
- oplogEntry.setStatementId(StmtId(0));
-
- BSONObjBuilder applyOpsBuilder;
- // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no longer
- // needed.
- packTransactionStatementsForApplyOps(
- &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */);
- oplogEntry.setObject(applyOpsBuilder.done());
-
- auto txnState = boost::make_optional(
- fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42,
- DurableTxnStateEnum::kCommitted);
- commitOpTime = logApplyOpsForTransaction(
- opCtx, &oplogEntry, txnState, boost::none, true /* updateTxnTable */)
- .writeOpTime;
- } else {
- // Reserve all the optimes in advance, so we only need to get the optime mutex once. We
- // reserve enough entries for all statements in the transaction.
- auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size());
- invariant(oplogSlots.size() == statements.size());
-
- // Log in-progress entries for the transaction along with the implicit commit.
- int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false);
- commitOpTime = oplogSlots[numOplogEntries - 1];
- }
+ // Reserve all the optimes in advance, so we only need to get the optime mutex once. We
+ // reserve enough entries for all statements in the transaction.
+ auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size());
+ invariant(oplogSlots.size() == statements.size());
+
+ // Log in-progress entries for the transaction along with the implicit commit.
+ int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false);
+ commitOpTime = oplogSlots[numOplogEntries - 1];
invariant(!commitOpTime.isNull());
shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, commitOpTime);
}
@@ -1074,48 +1041,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
return;
}
- // As FCV downgrade/upgrade is racey, we want to avoid performing a FCV check multiple times in
- // a single call into the OpObserver. Therefore, we store the result here and pass it as an
- // argument.
- const auto fcv = serverGlobalParams.featureCompatibility.getVersion();
- if (fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) {
- // We write the oplog entry in a side transaction so that we do not commit the now-prepared
- // transaction.
- // We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp
- // and allow this transaction to be continued on failover.
- TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
-
- writeConflictRetry(
- opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
- // Writes to the oplog only require a Global intent lock. Guaranteed by
- // OplogSlotReserver.
- invariant(opCtx->lockState()->isWriteLocked());
-
- WriteUnitOfWork wuow(opCtx);
- auto txnParticipant = TransactionParticipant::get(opCtx);
- const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime();
- invariant(lastWriteOpTime.isNull());
-
- MutableOplogEntry oplogEntry;
- oplogEntry.setOpTime(prepareOpTime);
- oplogEntry.setPrevWriteOpTimeInTransaction(lastWriteOpTime);
-
- BSONObjBuilder applyOpsBuilder;
- // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no
- // longer needed.
- packTransactionStatementsForApplyOps(
- &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */);
- applyOpsBuilder.append("prepare", true);
- oplogEntry.setObject(applyOpsBuilder.done());
-
- auto txnState = boost::make_optional(
- fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42,
- DurableTxnStateEnum::kPrepared);
- logApplyOpsForTransaction(
- opCtx, &oplogEntry, txnState, prepareOpTime, true /* updateTxnTable */);
- wuow.commit();
- });
- } else {
+ {
// We should have reserved enough slots.
invariant(reservedSlots.size() >= statements.size());
TransactionParticipant::SideTransactionBlock sideTxn(opCtx);