diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-10-27 14:13:12 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-10-30 12:59:47 -0400 |
commit | 350ee88b33f32b179b636f33b7db5b0c03932d24 (patch) | |
tree | 173c37cae94565947d25632051e900724592d1e3 | |
parent | 23ddb651c92d0310e8eddb12e89116463fa4ca8b (diff) | |
download | mongo-350ee88b33f32b179b636f33b7db5b0c03932d24.tar.gz |
SERVER-31566 Handle truncated oplog at session load time
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 89 | ||||
-rw-r--r-- | src/mongo/db/session.h | 18 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 61 |
4 files changed, 156 insertions, 45 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 125c7dee256..53b6f9e4b9a 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -248,15 +248,17 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); scopedSession->beginTxn(opCtx, result.txnNum); - if (stmtId != kIncompleteHistoryStmtId) { - try { - if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { - return lastResult; - } - } catch (const DBException& excep) { - if (excep.code() != ErrorCodes::IncompleteTransactionHistory) { - throw; - } + try { + if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { + return lastResult; + } + } catch (const DBException& ex) { + if (ex.code() != ErrorCodes::IncompleteTransactionHistory) { + throw; + } + + if (stmtId == kIncompleteHistoryStmtId) { + return lastResult; } } @@ -303,7 +305,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because the // next oplog will contain the real operation if (!result.isPrePostImage) { - scopedSession->onWriteOpCompletedOnPrimary( + scopedSession->onMigrateCompletedOnPrimary( opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime()); } @@ -440,15 +442,16 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service } catch (const DBException& excep) { if (excep.code() == ErrorCodes::ConflictingOperationInProgress || excep.code() == ErrorCodes::TransactionTooOld) { - // This means that the server has a newer txnNumber than the oplog being - // migrated, so just skip it. + // This means that the server has a newer txnNumber than the oplog being migrated, + // so just skip it. continue; } if (excep.code() == ErrorCodes::CommandNotFound) { - // TODO: remove this after v3.7. - // This means that the donor shard is running at an older version so it is safe - // to just end this because there is no session information to transfer. + // TODO: remove this after v3.7 + // + // This means that the donor shard is running at an older version so it is safe to + // just end this because there is no session information to transfer. break; } diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index bf3d2a08afa..e5a76b208ec 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -93,27 +93,36 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx, auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime()); while (it.hasNext()) { - const auto entry = it.next(opCtx); - invariant(entry.getStatementId()); + try { + const auto entry = it.next(opCtx); + invariant(entry.getStatementId()); + + if (*entry.getStatementId() == kIncompleteHistoryStmtId) { + // Only the dead end sentinel can have this id for oplog write history + invariant(entry.getObject2()); + invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0); + result.hasIncompleteHistory = true; + continue; + } - if (*entry.getStatementId() == kIncompleteHistoryStmtId) { - // Only the dead end sentinel can have this id for oplog write history - invariant(entry.getObject2()); - invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0); - result.hasIncompleteHistory = true; - continue; - } + const auto insertRes = + result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime()); + if (!insertRes.second) { + const auto& existingOpTime = insertRes.first->second; + fassertOnRepeatedExecution(opCtx, + lsid, + result.lastTxnRecord->getTxnNum(), + *entry.getStatementId(), + existingOpTime, + entry.getOpTime()); + } + } catch (const DBException& ex) { + if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { + result.hasIncompleteHistory = true; + break; + } - const auto insertRes = - result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime()); - if (!insertRes.second) { - const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(opCtx, - lsid, - result.lastTxnRecord->getTxnNum(), - *entry.getStatementId(), - existingOpTime, - entry.getOpTime()); + throw; } } @@ -231,6 +240,30 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); } +void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx, + TxnNumber txnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate) { + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + + stdx::unique_lock<stdx::mutex> ul(_mutex); + + _checkValid(ul); + _checkIsActiveTransaction(ul, txnNumber); + + const auto updateRequest = + _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate); + + ul.unlock(); + + repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); + + updateSessionEntry(opCtx, updateRequest); + _registerUpdateCacheOnCommit( + opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); +} + void Session::updateSessionRecordOnSecondary(OperationContext* opCtx, const SessionTxnRecord& sessionTxnRecord) { invariant(!opCtx->lockState()->isLocked()); @@ -280,17 +313,7 @@ boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationConte StmtId stmtId) const { const auto stmtTimestamp = [&] { stdx::lock_guard<stdx::mutex> lg(_mutex); - auto result = _checkStatementExecuted(lg, txnNumber, stmtId); - - if (!result) { - uassert(ErrorCodes::IncompleteTransactionHistory, - str::stream() << "incomplete history detected for lsid: " << _sessionId.toBSON() - << ", txnNum: " - << txnNumber, - !_hasIncompleteHistory); - } - - return result; + return _checkStatementExecuted(lg, txnNumber, stmtId); }(); if (!stmtTimestamp) @@ -358,6 +381,12 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl, const auto it = _activeTxnCommittedStatements.find(stmtId); if (it == _activeTxnCommittedStatements.end()) { + uassert(ErrorCodes::IncompleteTransactionHistory, + str::stream() << "Incomplete history detected for transaction " << txnNumber + << " on session " + << _sessionId.toBSON(), + !_hasIncompleteHistory); + return boost::none; } diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index fe5dd7a6a2c..409fc2caafd 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -104,6 +104,24 @@ public: Date_t lastStmtIdWriteDate); /** + * Called after an entry for the specified session and transaction has been written to the oplog + * during chunk migration, while the node is still primary. Must be called while the caller is + * still in the oplog write's WUOW. Updates the on-disk state of the session to match the + * specified transaction/opTime and keeps the cached state in sync. + * + * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary + * and doesn't require the session to be checked-out. + * + * Throws if the session has been invalidated or the active transaction number is newer than the + * one specified. + */ + void onMigrateCompletedOnPrimary(OperationContext* opCtx, + TxnNumber txnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate); + + /** * Called after a replication batch has been applied on a secondary node. Keeps the session * transaction entry in sync with the oplog chain which has been written. * diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index daaa09286e5..4f60db84aff 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -356,6 +356,67 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 0)); } +TEST_F(SessionTest, IncompleteHistoryDueToOpLogTruncation) { + const auto sessionId = makeLogicalSessionIdForTest(); + const TxnNumber txnNum = 2; + + { + OperationSessionInfo osi; + osi.setSessionId(sessionId); + osi.setTxnNumber(txnNum); + + repl::OplogEntry entry0( + repl::OpTime(Timestamp(100, 0), 0), 0, repl::OpTypeEnum::kInsert, kNss, BSON("x" << 0)); + entry0.setOperationSessionInfo(osi); + entry0.setStatementId(0); + entry0.setWallClockTime(Date_t::now()); + + // Intentionally skip writing the oplog entry for statement 0, so that it appears as if the + // chain of log entries is broken because of oplog truncation + + repl::OplogEntry entry1( + repl::OpTime(Timestamp(100, 1), 0), 0, repl::OpTypeEnum::kInsert, kNss, BSON("x" << 1)); + entry1.setOperationSessionInfo(osi); + entry1.setPrevWriteOpTimeInTransaction(entry0.getOpTime()); + entry1.setStatementId(1); + entry1.setWallClockTime(Date_t::now()); + insertOplogEntry(entry1); + + repl::OplogEntry entry2( + repl::OpTime(Timestamp(100, 2), 0), 0, repl::OpTypeEnum::kInsert, kNss, BSON("x" << 2)); + entry1.setOperationSessionInfo(osi); + entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime()); + entry2.setStatementId(2); + entry2.setWallClockTime(Date_t::now()); + insertOplogEntry(entry2); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), [&] { + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(sessionId); + sessionRecord.setTxnNum(txnNum); + sessionRecord.setLastWriteOpTime(entry2.getOpTime()); + sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); + return sessionRecord.toBSON(); + }()); + } + + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum, 0), + AssertionException, + ErrorCodes::IncompleteTransactionHistory); + ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1)); + ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2)); + + ASSERT_THROWS_CODE(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 0), + AssertionException, + ErrorCodes::IncompleteTransactionHistory); + ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1)); + ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2)); +} + TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 2; |