diff options
author | Randolph Tan <randolph@10gen.com> | 2020-11-02 21:16:20 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-20 16:55:53 +0000 |
commit | a16b6119d7361395040d40dafdee2c3448081c8c (patch) | |
tree | 20e45de44164767676eb19d2ea3ddb2956af2276 | |
parent | 74fe293b16ae2d84983c32cf7ce6cbe70fa55f7a (diff) | |
download | mongo-a16b6119d7361395040d40dafdee2c3448081c8c.tar.gz |
SERVER-49904 Handle retryable write oplog entries in the resharding oplog applier
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_applier.cpp | 194 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_applier.h | 16 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp | 499 | ||||
-rw-r--r-- | src/mongo/db/s/resharding_txn_cloner.cpp | 2 |
4 files changed, 695 insertions, 16 deletions
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index 7b93d3c643f..3eae9fd93e0 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -35,10 +35,15 @@ #include <fmt/format.h> +#include "mongo/base/simple_string_data_comparator.h" #include "mongo/db/catalog/create_collection.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/oplog_applier_utils.h" #include "mongo/db/s/resharding/resharding_donor_oplog_iterator_interface.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/transaction_participant.h" #include "mongo/logv2/log.h" #include "mongo/logv2/redaction.h" #include "mongo/stdx/mutex.h" @@ -50,6 +55,115 @@ namespace mongo { using namespace fmt::literals; +namespace { + +// Used for marking intermediate oplog entries created by the resharding applier that will require +// special handling in the repl writer thread. These intermediate oplog entries serve as a message +// container and will never be written to an actual collection. +const BSONObj kReshardingOplogTag(BSON("$resharding" << 1)); + +/** + * Writes the oplog entries and updates to config.transactions for enabling retrying the write + * described in the oplog entry. + */ +Status insertOplogAndUpdateConfigForRetryable(OperationContext* opCtx, + const repl::OplogEntry& oplog) { + auto txnNumber = *oplog.getTxnNumber(); + + opCtx->setLogicalSessionId(*oplog.getSessionId()); + opCtx->setTxnNumber(txnNumber); + + MongoDOperationContextSession ocs(opCtx); + auto txnParticipant = TransactionParticipant::get(opCtx); + uassert(4990400, "Failed to get transaction Participant", txnParticipant); + const auto stmtId = *oplog.getStatementId(); + + try { + txnParticipant.beginOrContinue(opCtx, txnNumber, boost::none, boost::none); + + if (txnParticipant.checkStatementExecuted(opCtx, stmtId)) { + // Skip the incoming statement because it has already been logged locally. + return Status::OK(); + } + } catch (const DBException& ex) { + if (ex.code() == ErrorCodes::TransactionTooOld) { + return Status::OK(); + } else if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { + // If the transaction chain is incomplete because oplog was truncated, just ignore the + // incoming oplog and don't attempt to 'patch up' the missing pieces. + return Status::OK(); + } + + throw; + } + + // TODO: handle pre/post image + + auto rawOplogBSON = oplog.toBSON(); + auto noOpOplog = uassertStatusOK(repl::MutableOplogEntry::parse(rawOplogBSON)); + noOpOplog.setObject2(rawOplogBSON); + noOpOplog.setNss({}); + noOpOplog.setObject(BSON("$reshardingOplogApply" << 1)); + // TODO: link pre/post image + noOpOplog.setPrevWriteOpTimeInTransaction(txnParticipant.getLastWriteOpTime()); + noOpOplog.setOpType(repl::OpTypeEnum::kNoop); + // Reset OpTime so logOp() can assign a new one. + noOpOplog.setOpTime(OplogSlot()); + noOpOplog.setWallClockTime(Date_t::now()); + + writeConflictRetry( + opCtx, + "ReshardingUpdateConfigTransaction", + NamespaceString::kSessionTransactionsTableNamespace.ns(), + [&] { + // Need to take global lock here so repl::logOp will not unlock it and trigger the + // invariant that disallows unlocking global lock while inside a WUOW. Take the + // transaction table db lock to ensure the same lock ordering with normal replicated + // updates to the table. + Lock::DBLock lk( + opCtx, NamespaceString::kSessionTransactionsTableNamespace.db(), MODE_IX); + WriteUnitOfWork wunit(opCtx); + + const auto& oplogOpTime = repl::logOp(opCtx, &noOpOplog); + + uassert(4990402, + str::stream() << "Failed to create new oplog entry for oplog with opTime: " + << noOpOplog.getOpTime().toString() << ": " + << redact(noOpOplog.toBSON()), + !oplogOpTime.isNull()); + + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(*oplog.getSessionId()); + sessionTxnRecord.setTxnNum(txnNumber); + sessionTxnRecord.setLastWriteOpTime(oplogOpTime); + sessionTxnRecord.setLastWriteDate(noOpOplog.getWallClockTime()); + txnParticipant.onRetryableWriteCloningCompleted(opCtx, {stmtId}, sessionTxnRecord); + + wunit.commit(); + }); + + return Status::OK(); +} + +/** + * Returns true if the given oplog is a special no-op oplog entry that contains the information for + * retryable writes. + */ +bool isRetryableNoOp(const repl::OplogEntryOrGroupedInserts& oplogOrGroupedInserts) { + if (oplogOrGroupedInserts.isGroupedInserts()) { + return false; + } + + const auto& op = oplogOrGroupedInserts.getOp(); + if (op.getOpType() != repl::OpTypeEnum::kNoop) { + return false; + } + + return op.getObject().woCompare(kReshardingOplogTag) == 0; +} + +} // anonymous namespace + ReshardingOplogApplier::ReshardingOplogApplier( ServiceContext* service, ReshardingSourceId sourceId, @@ -218,8 +332,7 @@ void ReshardingOplogApplier::_scheduleNextBatch() { Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx) { // TODO: handle config.transaction updates with derivedOps - std::vector<std::vector<repl::OplogEntry>> derivedOps; - auto writerVectors = _fillWriterVectors(opCtx, &_currentBatchToApply, &derivedOps); + auto writerVectors = _fillWriterVectors(opCtx, &_currentBatchToApply, &_currentDerivedOps); _currentWriterVectors.swap(writerVectors); auto pf = makePromiseFuture<void>(); @@ -249,14 +362,51 @@ Future<void> ReshardingOplogApplier::_applyBatch(OperationContext* opCtx) { return std::move(pf.future); } +repl::OplogEntry convertToNoOpWithReshardingTag(const repl::OplogEntry& oplog) { + return repl::OplogEntry(oplog.getOpTime(), + oplog.getHash(), + repl::OpTypeEnum::kNoop, + oplog.getNss(), + boost::none /* uuid */, + oplog.getFromMigrate(), + oplog.getVersion(), + kReshardingOplogTag, + // Set the o2 field with the original oplog. + oplog.toBSON(), + oplog.getOperationSessionInfo(), + oplog.getUpsert(), + oplog.getWallClockTime(), + oplog.getStatementId(), + oplog.getPrevWriteOpTimeInTransaction(), + oplog.getPreImageOpTime(), + oplog.getPostImageOpTime(), + oplog.getDestinedRecipient(), + oplog.get_id()); +} + +void addDerivedOpsToWriterVector(std::vector<std::vector<const repl::OplogEntry*>>* writerVectors, + const std::vector<repl::OplogEntry>& derivedOps) { + for (auto&& op : derivedOps) { + invariant(op.getObject().woCompare(kReshardingOplogTag) == 0); + uassert(4990403, + "expected resharding derived oplog to have session id: {}"_format( + op.toBSON().toString()), + op.getSessionId()); + + LogicalSessionIdHash hasher; + auto writerId = hasher(*op.getSessionId()) % writerVectors->size(); + (*writerVectors)[writerId].push_back(&op); + } +} + std::vector<std::vector<const repl::OplogEntry*>> ReshardingOplogApplier::_fillWriterVectors( - OperationContext* opCtx, - OplogBatch* batch, - std::vector<std::vector<repl::OplogEntry>>* derivedOps) { + OperationContext* opCtx, OplogBatch* batch, OplogBatch* derivedOps) { std::vector<std::vector<const repl::OplogEntry*>> writerVectors( _writerPool->getStats().numThreads); repl::CachedCollectionProperties collPropertiesCache; + LogicalSessionIdMap<RetryableOpsList> sessionTracker; + for (auto&& op : *batch) { uassert(5012000, "Resharding oplog application does not support prepared transactions.", @@ -268,12 +418,37 @@ std::vector<std::vector<const repl::OplogEntry*>> ReshardingOplogApplier::_fillW if (op.getOpType() == repl::OpTypeEnum::kNoop) continue; - // TODO: handle prePostImageOps. - repl::OplogApplierUtils::addToWriterVector( opCtx, &op, &writerVectors, &collPropertiesCache); + + if (auto sessionId = op.getSessionId()) { + auto& retryableOpList = sessionTracker[*sessionId]; + auto txnNumber = *op.getTxnNumber(); + + if (retryableOpList.txnNum == txnNumber) { + retryableOpList.ops.push_back(&op); + } else if (retryableOpList.txnNum < txnNumber) { + retryableOpList.ops.clear(); + retryableOpList.ops.push_back(&op); + retryableOpList.txnNum = txnNumber; + } else { + uasserted(4990401, + str::stream() << "retryable oplog applier for " << _sourceId.toBSON() + << " encountered out of order txnNum, saw " << op.toBSON() + << " after " << retryableOpList.ops.front()->toBSON()); + } + } } + for (const auto& sessionsToUpdate : sessionTracker) { + for (const auto& op : sessionsToUpdate.second.ops) { + auto noOpWithPrePost = convertToNoOpWithReshardingTag(*op); + derivedOps->push_back(std::move(noOpWithPrePost)); + } + } + + addDerivedOpsToWriterVector(&writerVectors, _currentDerivedOps); + return writerVectors; } @@ -308,6 +483,10 @@ Status ReshardingOplogApplier::_applyOplogEntryOrGroupedInserts( // We don't care about applied stats in resharding. auto incrementOpsAppliedStats = [] {}; + if (isRetryableNoOp(entryOrGroupedInserts)) { + return insertOplogAndUpdateConfigForRetryable(opCtx, entryOrGroupedInserts.getOp()); + } + // We always use oplog application mode 'kInitialSync', because we're applying oplog entries to // a cloned database the way initial sync does. return repl::OplogApplierUtils::applyOplogEntryOrGroupedInsertsCommon(opCtx, @@ -428,6 +607,7 @@ Timestamp ReshardingOplogApplier::_clearAppliedOpsAndStoreProgress(OperationCont << oplogId.toBSON()))); _currentBatchToApply.clear(); + _currentDerivedOps.clear(); return lastAppliedTs; } diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.h b/src/mongo/db/s/resharding/resharding_oplog_applier.h index cbb8fb41d8f..a95bb3e7029 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.h +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.h @@ -85,6 +85,12 @@ private: enum class Stage { kStarted, kErrorOccurred, kReachedCloningTS, kFinished }; + struct RetryableOpsList { + public: + TxnNumber txnNum{kUninitializedTxnNumber}; + std::vector<repl::OplogEntry*> ops; + }; + /** * Schedule to collect and apply the next batch of oplog entries. */ @@ -99,10 +105,9 @@ private: /** * Partition the currently buffered oplog entries so they can be applied in parallel. */ - std::vector<std::vector<const repl::OplogEntry*>> _fillWriterVectors( - OperationContext* opCtx, - OplogBatch* batch, - std::vector<std::vector<repl::OplogEntry>>* derivedOps); + std::vector<std::vector<const repl::OplogEntry*>> _fillWriterVectors(OperationContext* opCtx, + OplogBatch* batch, + OplogBatch* derivedOps); /** * Apply a slice of oplog entries from the current batch for a worker thread. @@ -190,6 +195,9 @@ private: // (R) Buffer for the current batch of oplog entries to apply. OplogBatch _currentBatchToApply; + // (R) Buffer for internally generated oplog entries that needs to be processed for this batch. + OplogBatch _currentDerivedOps; + // (R) A temporary scratch pad that contains pointers to oplog entries in _currentBatchToApply // that is used by the writer vector when applying oplog in parallel. std::vector<std::vector<const repl::OplogEntry*>> _currentWriterVectors; diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index e2a038450c0..d6dd459d982 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -34,13 +34,17 @@ #include <fmt/format.h> #include "mongo/db/catalog/create_collection.h" +#include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/s/resharding/resharding_donor_oplog_iterator_interface.h" #include "mongo/db/s/resharding/resharding_oplog_applier.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/s/sharding_mongod_test_fixture.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/transaction_participant.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/logv2/log.h" #include "mongo/unittest/unittest.h" @@ -113,6 +117,15 @@ public: repl::OpTypeEnum opType, const BSONObj& obj1, const boost::optional<BSONObj> obj2) { + return makeOplog(opTime, opType, obj1, obj2, {}, boost::none); + } + + repl::OplogEntry makeOplog(const repl::OpTime& opTime, + repl::OpTypeEnum opType, + const BSONObj& obj1, + const boost::optional<BSONObj> obj2, + const OperationSessionInfo& sessionInfo, + const boost::optional<StmtId>& statementId) { ReshardingDonorOplogId id(opTime.getTimestamp(), opTime.getTimestamp()); return repl::OplogEntry(opTime, boost::none /* hash */, @@ -123,10 +136,10 @@ public: 0 /* version */, obj1, obj2, - {} /* sessionInfo */, + sessionInfo, boost::none /* upsert */, {} /* date */, - boost::none /* statementId */, + statementId, boost::none /* prevWrite */, boost::none /* preImage */, boost::none /* postImage */, @@ -158,7 +171,7 @@ public: return _sourceId; } -private: +protected: static constexpr int kWriterPoolSize = 4; const NamespaceString kOplogNs{"config.localReshardingOplogBuffer.xxx.yyy"}; const NamespaceString kCrudNs{"foo.bar"}; @@ -810,7 +823,6 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) { getExecutor(), writerPool()); - auto future = applier.applyUntilCloneFinishedTs(); future.get(); @@ -829,5 +841,484 @@ TEST_F(ReshardingOplogApplierTest, WriterPoolIsShutDownCatchUpPhase) { ASSERT_EQ(Timestamp(6, 3), progressDoc->getProgress().getTs()); } +class ReshardingOplogApplierRetryableTest : public ReshardingOplogApplierTest { +public: + void setUp() override { + ReshardingOplogApplierTest::setUp(); + + repl::StorageInterface::set(operationContext()->getServiceContext(), + std::make_unique<repl::StorageInterfaceImpl>()); + MongoDSessionCatalog::onStepUp(operationContext()); + } + + static repl::OpTime insertRetryableOplog(OperationContext* opCtx, + const NamespaceString& nss, + UUID uuid, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + StmtId stmtId, + repl::OpTime prevOpTime) { + repl::MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kNoop); + oplogEntry.setNss(nss); + oplogEntry.setUuid(uuid); + oplogEntry.setObject(BSON("TestValue" << 0)); + oplogEntry.setWallClockTime(Date_t::now()); + if (stmtId != kUninitializedStmtId) { + oplogEntry.setSessionId(lsid); + oplogEntry.setTxnNumber(txnNumber); + oplogEntry.setStatementId(stmtId); + oplogEntry.setPrevWriteOpTimeInTransaction(prevOpTime); + } + return repl::logOp(opCtx, &oplogEntry); + } + + void writeTxnRecord(const LogicalSessionId& lsid, + const TxnNumber& txnNum, + StmtId stmtId, + repl::OpTime prevOpTime, + boost::optional<DurableTxnStateEnum> txnState) { + auto newClient = operationContext()->getServiceContext()->makeClient("testWriteTxnRecord"); + AlternativeClientRegion acr(newClient); + auto scopedOpCtx = cc().makeOperationContext(); + auto opCtx = scopedOpCtx.get(); + + opCtx->setLogicalSessionId(lsid); + opCtx->setTxnNumber(txnNum); + OperationContextSession scopedSession(opCtx); + + const auto session = OperationContextSession::get(opCtx); + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.refreshFromStorageIfNeeded(opCtx); + txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none); + + AutoGetCollection autoColl(opCtx, kCrudNs, MODE_IX); + WriteUnitOfWork wuow(opCtx); + const auto opTime = insertRetryableOplog( + opCtx, kCrudNs, kCrudUUID, session->getSessionId(), txnNum, stmtId, prevOpTime); + + SessionTxnRecord sessionTxnRecord; + sessionTxnRecord.setSessionId(session->getSessionId()); + sessionTxnRecord.setTxnNum(txnNum); + sessionTxnRecord.setLastWriteOpTime(opTime); + sessionTxnRecord.setLastWriteDate(Date_t::now()); + sessionTxnRecord.setState(txnState); + txnParticipant.onWriteOpCompletedOnPrimary(opCtx, {stmtId}, sessionTxnRecord); + wuow.commit(); + } + + bool isWriteAlreadyExecuted(const OperationSessionInfo& session, StmtId stmtId) { + auto newClient = + operationContext()->getServiceContext()->makeClient("testCheckStmtExecuted"); + AlternativeClientRegion acr(newClient); + auto scopedOpCtx = cc().makeOperationContext(); + auto opCtx = scopedOpCtx.get(); + + opCtx->setLogicalSessionId(*session.getSessionId()); + OperationContextSession scopedSession(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.refreshFromStorageIfNeeded(opCtx); + txnParticipant.beginOrContinue(opCtx, *session.getTxnNumber(), boost::none, boost::none); + + return txnParticipant.checkStatementExecuted(opCtx, stmtId).is_initialized(); + } +}; + +TEST_F(ReshardingOplogApplierRetryableTest, CrudWithEmptyConfigTransactions) { + std::queue<repl::OplogEntry> crudOps; + + OperationSessionInfo session1; + session1.setSessionId(makeLogicalSessionIdForTest()); + session1.setTxnNumber(1); + + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + session1, + 1)); + crudOps.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 2), + boost::none, + session1, + 2)); + + OperationSessionInfo session2; + session2.setSessionId(makeLogicalSessionIdForTest()); + session2.setTxnNumber(1); + + crudOps.push(makeOplog(repl::OpTime(Timestamp(7, 3), 1), + repl::OpTypeEnum::kUpdate, + BSON("$set" << BSON("x" << 1)), + BSON("_id" << 2), + session2, + 1)); + + OperationSessionInfo session3; + session3.setSessionId(makeLogicalSessionIdForTest()); + session3.setTxnNumber(1); + + crudOps.push(makeOplog(repl::OpTime(Timestamp(8, 3), 1), + repl::OpTypeEnum::kDelete, + BSON("_id" << 1), + boost::none, + session3, + 1)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(6, 3), + std::move(iterator), + 2 /* batchSize */, + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + future = applier.applyUntilDone(); + future.get(); + + DBDirectClient client(operationContext()); + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSONObj(), doc); + + doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 2 << "x" << 1), doc); + + auto progressDoc = ReshardingOplogApplier::checkStoredProgress(operationContext(), sourceId()); + ASSERT_TRUE(progressDoc); + ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getClusterTime()); + ASSERT_EQ(Timestamp(8, 3), progressDoc->getProgress().getTs()); + + ASSERT_TRUE(isWriteAlreadyExecuted(session1, 1)); + ASSERT_TRUE(isWriteAlreadyExecuted(session1, 2)); + ASSERT_TRUE(isWriteAlreadyExecuted(session2, 1)); + ASSERT_TRUE(isWriteAlreadyExecuted(session3, 1)); + + ASSERT_FALSE(isWriteAlreadyExecuted(session2, 2)); + ASSERT_FALSE(isWriteAlreadyExecuted(session3, 2)); +} + +TEST_F(ReshardingOplogApplierRetryableTest, MultipleTxnSameLsidInOneBatch) { + std::queue<repl::OplogEntry> crudOps; + + OperationSessionInfo session1; + session1.setSessionId(makeLogicalSessionIdForTest()); + session1.setTxnNumber(1); + + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + session1, + 1)); + crudOps.push(makeOplog(repl::OpTime(Timestamp(6, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 2), + boost::none, + session1, + 2)); + + OperationSessionInfo session2; + session2.setSessionId(makeLogicalSessionIdForTest()); + session2.setTxnNumber(1); + + crudOps.push(makeOplog(repl::OpTime(Timestamp(7, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 3), + boost::none, + session2, + 1)); + + session1.setTxnNumber(2); + + crudOps.push(makeOplog(repl::OpTime(Timestamp(8, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 4), + boost::none, + session1, + 21)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(6, 3), + std::move(iterator), + 2 /* batchSize */, + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + future = applier.applyUntilDone(); + future.get(); + + DBDirectClient client(operationContext()); + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc); + + doc = client.findOne(appliedToNs().ns(), BSON("_id" << 2)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 2), doc); + + doc = client.findOne(appliedToNs().ns(), BSON("_id" << 3)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 3), doc); + + doc = client.findOne(appliedToNs().ns(), BSON("_id" << 4)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 4), doc); + + ASSERT_TRUE(isWriteAlreadyExecuted(session1, 21)); + ASSERT_TRUE(isWriteAlreadyExecuted(session2, 1)); +} + +TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxn) { + auto lsid = makeLogicalSessionIdForTest(); + + writeTxnRecord(lsid, 2, 1, {}, boost::none); + + std::queue<repl::OplogEntry> crudOps; + + OperationSessionInfo session; + session.setSessionId(lsid); + session.setTxnNumber(5); + + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + session, + 21)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(6, 3), + std::move(iterator), + 2 /* batchSize */, + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + future = applier.applyUntilDone(); + future.get(); + + DBDirectClient client(operationContext()); + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc); + + ASSERT_TRUE(isWriteAlreadyExecuted(session, 21)); +} + +TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithHigherExistingTxnNum) { + auto lsid = makeLogicalSessionIdForTest(); + const TxnNumber existingTxnNum = 20; + const StmtId existingStmtId = 1; + writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none); + + OperationSessionInfo session; + const TxnNumber incomingTxnNum = 15; + const StmtId incomingStmtId = 21; + session.setSessionId(lsid); + session.setTxnNumber(incomingTxnNum); + + std::queue<repl::OplogEntry> crudOps; + + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + session, + incomingStmtId)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(6, 3), + std::move(iterator), + 2 /* batchSize */, + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + future = applier.applyUntilDone(); + future.get(); + + // Op should always be applied, even if session info was not compatible. + DBDirectClient client(operationContext()); + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc); + + ASSERT_THROWS_CODE(isWriteAlreadyExecuted(session, incomingStmtId), + DBException, + ErrorCodes::TransactionTooOld); + + // Check that original txn info is intact. + OperationSessionInfo origSession; + origSession.setSessionId(lsid); + origSession.setTxnNumber(existingTxnNum); + + ASSERT_TRUE(isWriteAlreadyExecuted(origSession, existingStmtId)); +} + +TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithLowerExistingTxnNum) { + auto lsid = makeLogicalSessionIdForTest(); + const TxnNumber existingTxnNum = 20; + const StmtId existingStmtId = 1; + writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none); + + OperationSessionInfo session; + const TxnNumber incomingTxnNum = 25; + const StmtId incomingStmtId = 21; + session.setSessionId(lsid); + session.setTxnNumber(incomingTxnNum); + + std::queue<repl::OplogEntry> crudOps; + + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + session, + incomingStmtId)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(6, 3), + std::move(iterator), + 2 /* batchSize */, + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + future = applier.applyUntilDone(); + future.get(); + + // Op should always be applied, even if session info was not compatible. + DBDirectClient client(operationContext()); + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc); + + ASSERT_TRUE(isWriteAlreadyExecuted(session, incomingStmtId)); +} + +TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithEqualExistingTxnNum) { + auto lsid = makeLogicalSessionIdForTest(); + const TxnNumber existingTxnNum = 20; + const StmtId existingStmtId = 1; + writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none); + + OperationSessionInfo session; + const TxnNumber incomingTxnNum = existingTxnNum; + const StmtId incomingStmtId = 21; + session.setSessionId(lsid); + session.setTxnNumber(incomingTxnNum); + + std::queue<repl::OplogEntry> crudOps; + + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + session, + incomingStmtId)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(6, 3), + std::move(iterator), + 2 /* batchSize */, + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + future = applier.applyUntilDone(); + future.get(); + + DBDirectClient client(operationContext()); + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc); + + ASSERT_TRUE(isWriteAlreadyExecuted(session, incomingStmtId)); + ASSERT_TRUE(isWriteAlreadyExecuted(session, existingStmtId)); +} + +TEST_F(ReshardingOplogApplierRetryableTest, RetryableWithStmtIdAlreadyExecuted) { + auto lsid = makeLogicalSessionIdForTest(); + const TxnNumber existingTxnNum = 20; + const StmtId existingStmtId = 1; + writeTxnRecord(lsid, existingTxnNum, existingStmtId, {}, boost::none); + + OperationSessionInfo session; + const TxnNumber incomingTxnNum = existingTxnNum; + const StmtId incomingStmtId = existingStmtId; + session.setSessionId(lsid); + session.setTxnNumber(incomingTxnNum); + + std::queue<repl::OplogEntry> crudOps; + + crudOps.push(makeOplog(repl::OpTime(Timestamp(5, 3), 1), + repl::OpTypeEnum::kInsert, + BSON("_id" << 1), + boost::none, + session, + incomingStmtId)); + + auto iterator = std::make_unique<OplogIteratorMock>(std::move(crudOps)); + ReshardingOplogApplier applier(getServiceContext(), + sourceId(), + oplogNs(), + crudNs(), + crudUUID(), + Timestamp(6, 3), + std::move(iterator), + 2 /* batchSize */, + getExecutor(), + writerPool()); + + auto future = applier.applyUntilCloneFinishedTs(); + future.get(); + + future = applier.applyUntilDone(); + future.get(); + + DBDirectClient client(operationContext()); + auto doc = client.findOne(appliedToNs().ns(), BSON("_id" << 1)); + ASSERT_BSONOBJ_EQ(BSON("_id" << 1), doc); + + ASSERT_TRUE(isWriteAlreadyExecuted(session, incomingStmtId)); +} + } // unnamed namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding_txn_cloner.cpp b/src/mongo/db/s/resharding_txn_cloner.cpp index 8fb88ac3437..893d6d7d252 100644 --- a/src/mongo/db/s/resharding_txn_cloner.cpp +++ b/src/mongo/db/s/resharding_txn_cloner.cpp @@ -179,8 +179,8 @@ void configTxnsMergerForResharding(OperationContext* opCtx, BSONObj donorBsonTra auto ocs = std::make_unique<MongoDOperationContextSession>(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - // Which error code should this be? what message? uassert(4989900, "Failed to get transaction Participant", txnParticipant); + try { txnParticipant.beginOrContinue( opCtx, donorTransaction.getTxnNum(), boost::none, boost::none); |