diff options
author | Jason Chan <jason.chan@10gen.com> | 2019-08-02 10:14:00 -0400 |
---|---|---|
committer | Jason Chan <jason.chan@10gen.com> | 2019-08-02 10:14:12 -0400 |
commit | ebfc0dc2e6f80e61d6b94742a79d1194997a7959 (patch) | |
tree | df49cd31af10c5629c1fdc7239bf4aed09dc3583 /src/mongo/db/op_observer_impl.cpp | |
parent | d2b16396526edd39aa0dcbaf609ffe2b0caa5d5e (diff) | |
download | mongo-ebfc0dc2e6f80e61d6b94742a79d1194997a7959.tar.gz |
SERVER-41470 Remove FCV checks for multiple-entry transaction format
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 104 |
1 files changed, 15 insertions, 89 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 23266878f34..ad9c5c2b2b1 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -745,15 +745,11 @@ namespace { // field. Appends as many operations as possible until either the constructed object exceeds the // 16MB limit or the maximum number of transaction statements allowed in one entry. // -// If 'limitSize' is false, then it attempts to include all given operations, regardless of whether -// or not they fit. If the ops don't fit, TransactionTooLarge will be thrown in that case. -// // Returns an iterator to the first statement that wasn't packed into the applyOps object. std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApplyOps( BSONObjBuilder* applyOpsBuilder, std::vector<repl::ReplOperation>::const_iterator stmtBegin, - std::vector<repl::ReplOperation>::const_iterator stmtEnd, - bool limitSize) { + std::vector<repl::ReplOperation>::const_iterator stmtEnd) { std::vector<repl::ReplOperation>::const_iterator stmtIter; BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd)); @@ -765,11 +761,9 @@ std::vector<repl::ReplOperation>::const_iterator packTransactionStatementsForApp // BSON overhead and the other applyOps fields. But if the array with a single operation // exceeds BSONObjMaxUserSize, we still log it, as a single max-length operation // should be able to be applied. - if (limitSize && - (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry || - (opsArray.arrSize() > 0 && - (opsArray.len() + OplogEntry::getDurableReplOperationSize(stmt) > - BSONObjMaxUserSize)))) + if (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry || + (opsArray.arrSize() > 0 && + (opsArray.len() + OplogEntry::getDurableReplOperationSize(stmt) > BSONObjMaxUserSize))) break; opsArray.append(stmt.toBSON()); } @@ -880,8 +874,8 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, while (stmtsIter != stmts.end()) { BSONObjBuilder applyOpsBuilder; - auto nextStmt = packTransactionStatementsForApplyOps( - &applyOpsBuilder, stmtsIter, stmts.end(), true /* limitSize */); + auto nextStmt = + packTransactionStatementsForApplyOps(&applyOpsBuilder, stmtsIter, stmts.end()); // If we packed the last op, then the next oplog entry we log should be the implicit // commit or implicit prepare, i.e. we omit the 'partialTxn' field. @@ -999,41 +993,14 @@ void OpObserverImpl::onUnpreparedTransactionCommit( return; repl::OpTime commitOpTime; - // As FCV downgrade/upgrade is racey, we want to avoid performing a FCV check multiple times in - // a single call into the OpObserver. Therefore, we store the result here and pass it as an - // argument. - const auto fcv = serverGlobalParams.featureCompatibility.getVersion(); - if (fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) { - auto txnParticipant = TransactionParticipant::get(opCtx); - const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime(); - invariant(lastWriteOpTime.isNull()); - MutableOplogEntry oplogEntry; - oplogEntry.setPrevWriteOpTimeInTransaction(lastWriteOpTime); - oplogEntry.setStatementId(StmtId(0)); - - BSONObjBuilder applyOpsBuilder; - // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no longer - // needed. - packTransactionStatementsForApplyOps( - &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */); - oplogEntry.setObject(applyOpsBuilder.done()); - - auto txnState = boost::make_optional( - fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42, - DurableTxnStateEnum::kCommitted); - commitOpTime = logApplyOpsForTransaction( - opCtx, &oplogEntry, txnState, boost::none, true /* updateTxnTable */) - .writeOpTime; - } else { - // Reserve all the optimes in advance, so we only need to get the optime mutex once. We - // reserve enough entries for all statements in the transaction. - auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size()); - invariant(oplogSlots.size() == statements.size()); - - // Log in-progress entries for the transaction along with the implicit commit. - int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false); - commitOpTime = oplogSlots[numOplogEntries - 1]; - } + // Reserve all the optimes in advance, so we only need to get the optime mutex once. We + // reserve enough entries for all statements in the transaction. + auto oplogSlots = repl::getNextOpTimes(opCtx, statements.size()); + invariant(oplogSlots.size() == statements.size()); + + // Log in-progress entries for the transaction along with the implicit commit. + int numOplogEntries = logOplogEntriesForTransaction(opCtx, statements, oplogSlots, false); + commitOpTime = oplogSlots[numOplogEntries - 1]; invariant(!commitOpTime.isNull()); shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, commitOpTime); } @@ -1074,48 +1041,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, return; } - // As FCV downgrade/upgrade is racey, we want to avoid performing a FCV check multiple times in - // a single call into the OpObserver. Therefore, we store the result here and pass it as an - // argument. - const auto fcv = serverGlobalParams.featureCompatibility.getVersion(); - if (fcv < ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42) { - // We write the oplog entry in a side transaction so that we do not commit the now-prepared - // transaction. - // We write an empty 'applyOps' entry if there were no writes to choose a prepare timestamp - // and allow this transaction to be continued on failover. - TransactionParticipant::SideTransactionBlock sideTxn(opCtx); - - writeConflictRetry( - opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { - // Writes to the oplog only require a Global intent lock. Guaranteed by - // OplogSlotReserver. - invariant(opCtx->lockState()->isWriteLocked()); - - WriteUnitOfWork wuow(opCtx); - auto txnParticipant = TransactionParticipant::get(opCtx); - const auto lastWriteOpTime = txnParticipant.getLastWriteOpTime(); - invariant(lastWriteOpTime.isNull()); - - MutableOplogEntry oplogEntry; - oplogEntry.setOpTime(prepareOpTime); - oplogEntry.setPrevWriteOpTimeInTransaction(lastWriteOpTime); - - BSONObjBuilder applyOpsBuilder; - // TODO(SERVER-41470): Remove limitSize==false once old transaction format is no - // longer needed. - packTransactionStatementsForApplyOps( - &applyOpsBuilder, statements.begin(), statements.end(), false /* limitSize */); - applyOpsBuilder.append("prepare", true); - oplogEntry.setObject(applyOpsBuilder.done()); - - auto txnState = boost::make_optional( - fcv >= ServerGlobalParams::FeatureCompatibility::Version::kUpgradingTo42, - DurableTxnStateEnum::kPrepared); - logApplyOpsForTransaction( - opCtx, &oplogEntry, txnState, prepareOpTime, true /* updateTxnTable */); - wuow.commit(); - }); - } else { + { // We should have reserved enough slots. invariant(reservedSlots.size() >= statements.size()); TransactionParticipant::SideTransactionBlock sideTxn(opCtx); |