summaryrefslogtreecommitdiff
path: root/src/mongo/db/session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/session.cpp')
-rw-r--r--src/mongo/db/session.cpp36
1 files changed, 34 insertions, 2 deletions
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 7c66713b13c..8acfb4cb2d0 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -102,6 +102,8 @@ MONGO_FP_DECLARE(onPrimaryTransactionalWrite);
} // namespace
+const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
+
Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
@@ -131,11 +133,21 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
CommittedStatementTimestampMap activeTxnCommittedStatements;
+ bool hasIncompleteHistory = false;
if (lastWrittenTxnRecord) {
auto it = TransactionHistoryIterator(lastWrittenTxnRecord->getLastWriteOpTime());
while (it.hasNext()) {
const auto entry = it.next(opCtx);
invariant(entry.getStatementId());
+
+ if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
+ // Only the dead end sentinel can have this id for oplog write history.
+ invariant(entry.getObject2());
+ invariant(entry.getObject2()->woCompare(kDeadEndSentinel) == 0);
+ hasIncompleteHistory = true;
+ continue;
+ }
+
const auto insertRes = activeTxnCommittedStatements.emplace(*entry.getStatementId(),
entry.getOpTime());
if (!insertRes.second) {
@@ -152,6 +164,8 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
ul.lock();
+ _hasIncompleteHistory = hasIncompleteHistory;
+
// Protect against concurrent refreshes or invalidations
if (!_isValid && _numInvalidations == numInvalidations) {
_isValid = true;
@@ -232,6 +246,7 @@ void Session::invalidate() {
_activeTxnNumber = kUninitializedTxnNumber;
_activeTxnCommittedStatements.clear();
+ _hasIncompleteHistory = false;
}
repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const {
@@ -250,7 +265,17 @@ boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationConte
StmtId stmtId) const {
const auto stmtTimestamp = [&] {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- return _checkStatementExecuted(lg, txnNumber, stmtId);
+ auto result = _checkStatementExecuted(lg, txnNumber, stmtId);
+
+ if (!result) {
+ uassert(ErrorCodes::IncompleteTransactionHistory,
+ str::stream() << "incomplete history detected for lsid: " << _sessionId.toBSON()
+ << ", txnNum: "
+ << txnNumber,
+ !_hasIncompleteHistory);
+ }
+
+ return result;
}();
if (!stmtTimestamp)
@@ -289,6 +314,7 @@ void Session::_beginTxn(WithLock wl, TxnNumber txnNumber) {
_activeTxnNumber = txnNumber;
_activeTxnCommittedStatements.clear();
+ _hasIncompleteHistory = false;
}
void Session::_checkValid(WithLock) const {
@@ -316,8 +342,9 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
_checkIsActiveTransaction(wl, txnNumber);
const auto it = _activeTxnCommittedStatements.find(stmtId);
- if (it == _activeTxnCommittedStatements.end())
+ if (it == _activeTxnCommittedStatements.end()) {
return boost::none;
+ }
invariant(_lastWrittenSessionRecord);
invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber);
@@ -396,6 +423,11 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
if (newTxnNumber == _activeTxnNumber) {
for (const auto stmtId : stmtIdsWritten) {
+ if (stmtId == kIncompleteHistoryStmtId) {
+ _hasIncompleteHistory = true;
+ continue;
+ }
+
const auto insertRes =
_activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
if (!insertRes.second) {