summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-10-27 14:13:12 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-10-30 12:59:47 -0400
commit350ee88b33f32b179b636f33b7db5b0c03932d24 (patch)
tree173c37cae94565947d25632051e900724592d1e3
parent23ddb651c92d0310e8eddb12e89116463fa4ca8b (diff)
downloadmongo-350ee88b33f32b179b636f33b7db5b0c03932d24.tar.gz
SERVER-31566 Handle truncated oplog at session load time
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp33
-rw-r--r--src/mongo/db/session.cpp89
-rw-r--r--src/mongo/db/session.h18
-rw-r--r--src/mongo/db/session_test.cpp61
4 files changed, 156 insertions, 45 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 125c7dee256..53b6f9e4b9a 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -248,15 +248,17 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
scopedSession->beginTxn(opCtx, result.txnNum);
- if (stmtId != kIncompleteHistoryStmtId) {
- try {
- if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
- return lastResult;
- }
- } catch (const DBException& excep) {
- if (excep.code() != ErrorCodes::IncompleteTransactionHistory) {
- throw;
- }
+ try {
+ if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
+ return lastResult;
+ }
+ } catch (const DBException& ex) {
+ if (ex.code() != ErrorCodes::IncompleteTransactionHistory) {
+ throw;
+ }
+
+ if (stmtId == kIncompleteHistoryStmtId) {
+ return lastResult;
}
}
@@ -303,7 +305,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
// Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because the
// next oplog will contain the real operation
if (!result.isPrePostImage) {
- scopedSession->onWriteOpCompletedOnPrimary(
+ scopedSession->onMigrateCompletedOnPrimary(
opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime());
}
@@ -440,15 +442,16 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service
} catch (const DBException& excep) {
if (excep.code() == ErrorCodes::ConflictingOperationInProgress ||
excep.code() == ErrorCodes::TransactionTooOld) {
- // This means that the server has a newer txnNumber than the oplog being
- // migrated, so just skip it.
+ // This means that the server has a newer txnNumber than the oplog being migrated,
+ // so just skip it.
continue;
}
if (excep.code() == ErrorCodes::CommandNotFound) {
- // TODO: remove this after v3.7.
- // This means that the donor shard is running at an older version so it is safe
- // to just end this because there is no session information to transfer.
+ // TODO: remove this after v3.7
+ //
+ // This means that the donor shard is running at an older version so it is safe to
+ // just end this because there is no session information to transfer.
break;
}
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index bf3d2a08afa..e5a76b208ec 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -93,27 +93,36 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime());
while (it.hasNext()) {
- const auto entry = it.next(opCtx);
- invariant(entry.getStatementId());
+ try {
+ const auto entry = it.next(opCtx);
+ invariant(entry.getStatementId());
+
+ if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
+ // Only the dead end sentinel can have this id for oplog write history
+ invariant(entry.getObject2());
+ invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0);
+ result.hasIncompleteHistory = true;
+ continue;
+ }
- if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
- // Only the dead end sentinel can have this id for oplog write history
- invariant(entry.getObject2());
- invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0);
- result.hasIncompleteHistory = true;
- continue;
- }
+ const auto insertRes =
+ result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
+ if (!insertRes.second) {
+ const auto& existingOpTime = insertRes.first->second;
+ fassertOnRepeatedExecution(opCtx,
+ lsid,
+ result.lastTxnRecord->getTxnNum(),
+ *entry.getStatementId(),
+ existingOpTime,
+ entry.getOpTime());
+ }
+ } catch (const DBException& ex) {
+ if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
+ result.hasIncompleteHistory = true;
+ break;
+ }
- const auto insertRes =
- result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
- if (!insertRes.second) {
- const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(opCtx,
- lsid,
- result.lastTxnRecord->getTxnNum(),
- *entry.getStatementId(),
- existingOpTime,
- entry.getOpTime());
+ throw;
}
}
@@ -231,6 +240,30 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
+void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate) {
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+
+ _checkValid(ul);
+ _checkIsActiveTransaction(ul, txnNumber);
+
+ const auto updateRequest =
+ _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate);
+
+ ul.unlock();
+
+ repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
+
+ updateSessionEntry(opCtx, updateRequest);
+ _registerUpdateCacheOnCommit(
+ opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
+}
+
void Session::updateSessionRecordOnSecondary(OperationContext* opCtx,
const SessionTxnRecord& sessionTxnRecord) {
invariant(!opCtx->lockState()->isLocked());
@@ -280,17 +313,7 @@ boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationConte
StmtId stmtId) const {
const auto stmtTimestamp = [&] {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- auto result = _checkStatementExecuted(lg, txnNumber, stmtId);
-
- if (!result) {
- uassert(ErrorCodes::IncompleteTransactionHistory,
- str::stream() << "incomplete history detected for lsid: " << _sessionId.toBSON()
- << ", txnNum: "
- << txnNumber,
- !_hasIncompleteHistory);
- }
-
- return result;
+ return _checkStatementExecuted(lg, txnNumber, stmtId);
}();
if (!stmtTimestamp)
@@ -358,6 +381,12 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
const auto it = _activeTxnCommittedStatements.find(stmtId);
if (it == _activeTxnCommittedStatements.end()) {
+ uassert(ErrorCodes::IncompleteTransactionHistory,
+ str::stream() << "Incomplete history detected for transaction " << txnNumber
+ << " on session "
+ << _sessionId.toBSON(),
+ !_hasIncompleteHistory);
+
return boost::none;
}
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index fe5dd7a6a2c..409fc2caafd 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -104,6 +104,24 @@ public:
Date_t lastStmtIdWriteDate);
/**
+ * Called after an entry for the specified session and transaction has been written to the oplog
+ * during chunk migration, while the node is still primary. Must be called while the caller is
+ * still in the oplog write's WUOW. Updates the on-disk state of the session to match the
+ * specified transaction/opTime and keeps the cached state in sync.
+ *
+ * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary
+ * and doesn't require the session to be checked-out.
+ *
+ * Throws if the session has been invalidated or the active transaction number is newer than the
+ * one specified.
+ */
+ void onMigrateCompletedOnPrimary(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate);
+
+ /**
* Called after a replication batch has been applied on a secondary node. Keeps the session
* transaction entry in sync with the oplog chain which has been written.
*
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index daaa09286e5..4f60db84aff 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -356,6 +356,67 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 0));
}
+TEST_F(SessionTest, IncompleteHistoryDueToOpLogTruncation) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ const TxnNumber txnNum = 2;
+
+ {
+ OperationSessionInfo osi;
+ osi.setSessionId(sessionId);
+ osi.setTxnNumber(txnNum);
+
+ repl::OplogEntry entry0(
+ repl::OpTime(Timestamp(100, 0), 0), 0, repl::OpTypeEnum::kInsert, kNss, BSON("x" << 0));
+ entry0.setOperationSessionInfo(osi);
+ entry0.setStatementId(0);
+ entry0.setWallClockTime(Date_t::now());
+
+ // Intentionally skip writing the oplog entry for statement 0, so that it appears as if the
+ // chain of log entries is broken because of oplog truncation
+
+ repl::OplogEntry entry1(
+ repl::OpTime(Timestamp(100, 1), 0), 0, repl::OpTypeEnum::kInsert, kNss, BSON("x" << 1));
+ entry1.setOperationSessionInfo(osi);
+ entry1.setPrevWriteOpTimeInTransaction(entry0.getOpTime());
+ entry1.setStatementId(1);
+ entry1.setWallClockTime(Date_t::now());
+ insertOplogEntry(entry1);
+
+ repl::OplogEntry entry2(
+ repl::OpTime(Timestamp(100, 2), 0), 0, repl::OpTypeEnum::kInsert, kNss, BSON("x" << 2));
+ entry1.setOperationSessionInfo(osi);
+ entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime());
+ entry2.setStatementId(2);
+ entry2.setWallClockTime(Date_t::now());
+ insertOplogEntry(entry2);
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), [&] {
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(sessionId);
+ sessionRecord.setTxnNum(txnNum);
+ sessionRecord.setLastWriteOpTime(entry2.getOpTime());
+ sessionRecord.setLastWriteDate(*entry2.getWallClockTime());
+ return sessionRecord.toBSON();
+ }());
+ }
+
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum, 0),
+ AssertionException,
+ ErrorCodes::IncompleteTransactionHistory);
+ ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1));
+ ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2));
+
+ ASSERT_THROWS_CODE(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 0),
+ AssertionException,
+ ErrorCodes::IncompleteTransactionHistory);
+ ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1));
+ ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2));
+}
+
TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
const auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 2;