From 81111ce63ba47253c9bc3b44c84a4f452fff9f90 Mon Sep 17 00:00:00 2001 From: Kaloian Manassiev Date: Mon, 11 Sep 2017 10:47:44 -0400 Subject: SERVER-30325 Simplify session transaction state maintenance This change exposes a single 'onWriteCompleted' method on the Session object, which hides all the concurrency control and the session cache maintenance. --- src/mongo/db/session.cpp | 338 +++++++++++++++++++++++++++++------------------ 1 file changed, 209 insertions(+), 129 deletions(-) (limited to 'src/mongo/db/session.cpp') diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 632125abde6..3f1554907f0 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -32,8 +32,6 @@ #include "mongo/db/session.h" -#include - #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" @@ -41,6 +39,8 @@ #include "mongo/db/operation_context.h" #include "mongo/db/ops/update.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/transaction_history_iterator.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -48,177 +48,155 @@ namespace mongo { namespace { -boost::optional loadSessionRecord(OperationContext* opCtx, - const LogicalSessionId& sessionId) { - DBDirectClient client(opCtx); - Query sessionQuery(BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON())); - auto result = - client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionQuery); - - if (result.isEmpty()) { - return boost::none; - } - - IDLParserErrorContext ctx("parse latest txn record for session"); - return SessionTxnRecord::parse(ctx, result); -} - -} // namespace - -Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} - -void Session::updateSessionRecord(OperationContext* opCtx, - const LogicalSessionId& sessionId, - const TxnNumber& txnNum, - const Timestamp& ts) { - repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); - +void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) { AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX); - uassert(40526, + + uassert(40527, str::stream() << "Unable to persist transaction state because the session transaction " "collection is missing. This indicates that the " << NamespaceString::kSessionTransactionsTableNamespace.ns() << " collection has been manually deleted.", - autoColl.getCollection() != nullptr); + autoColl.getCollection()); - UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); - updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON())); - updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName - << txnNum - << SessionTxnRecord::kLastWriteOpTimeTsFieldName - << ts))); - updateRequest.setUpsert(true); + const auto updateResult = update(opCtx, autoColl.getDb(), updateRequest); - auto updateResult = update(opCtx, autoColl.getDb(), updateRequest); - uassert(40527, - str::stream() << "Failed to update transaction progress for session " << sessionId, - updateResult.numDocsModified >= 1 || !updateResult.upserted.isEmpty()); + if (!updateResult.numDocsModified && updateResult.upserted.isEmpty()) { + throw WriteConflictException(); + } } -void Session::begin(OperationContext* opCtx, const TxnNumber& txnNumber) { +} // namespace + +Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} + +void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { invariant(!opCtx->lockState()->isLocked()); - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + invariant(repl::ReadConcernArgs::get(opCtx).getLevel() == + repl::ReadConcernLevel::kLocalReadConcern); - // Repeats if the generation of the session changes during I/O. - while (true) { - int startGeneration = 0; - boost::optional txnRecord; - { - stdx::lock_guard lg(_mutex); - startGeneration = _generation; - txnRecord = _lastWrittenTxnRecord; - } + stdx::unique_lock ul(_mutex); + + while (!_isValid) { + const int numInvalidations = _numInvalidations; - // Do I/O outside of the lock. - if (!txnRecord) { - txnRecord = loadSessionRecord(opCtx, _sessionId); + ul.unlock(); - // Previous read failed to retrieve the txn record, which means it does not exist yet, - // so create a new entry. - if (!txnRecord) { - updateSessionRecord(opCtx, _sessionId, txnNumber, Timestamp()); - txnRecord = makeSessionTxnRecord(_sessionId, txnNumber, Timestamp()); + const auto lastWrittenTxnRecord = [&]() -> boost::optional { + DBDirectClient client(opCtx); + auto result = client.findOne( + NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId.toBSON())}); + if (result.isEmpty()) { + return boost::none; } - stdx::lock_guard lg(_mutex); - _lastWrittenTxnRecord = txnRecord; - } + return SessionTxnRecord::parse( + IDLParserErrorContext("parse latest txn record for session"), result); + }(); - uassert(40528, - str::stream() << "cannot start transaction with id " << txnNumber << " on session " - << _sessionId - << " because transaction with id " - << txnRecord->getTxnNum() - << " already started", - txnRecord->getTxnNum() <= txnNumber); - - if (txnNumber > txnRecord->getTxnNum()) { - updateSessionRecord(opCtx, _sessionId, txnNumber, Timestamp()); - txnRecord->setTxnNum(txnNumber); - txnRecord->setLastWriteOpTimeTs(Timestamp()); - } + ul.lock(); - { - stdx::lock_guard lg(_mutex); + // Protect against concurrent refreshes or invalidations + if (!_isValid && _numInvalidations == numInvalidations) { + _isValid = true; + _lastWrittenSessionRecord = std::move(lastWrittenTxnRecord); - // Reload if the session was modified since the beginning of this loop, e.g. by - // rollback. - if (startGeneration != _generation) { - _lastWrittenTxnRecord.reset(); - continue; + if (_lastWrittenSessionRecord) { + _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum(); } - _lastWrittenTxnRecord = std::move(txnRecord); - return; + break; } } } -void Session::saveTxnProgress(OperationContext* opCtx, Timestamp opTimeTs) { - // Needs to be in the same write unit of work with the write for this result. +void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) { + invariant(!opCtx->lockState()->isLocked()); + + stdx::lock_guard lg(_mutex); + _beginTxn(lg, txnNumber); +} + +void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, + TxnNumber txnNumber, + std::vector stmtIdsWritten, + Timestamp newLastWriteTs) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); - repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); - AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX); - auto coll = autoColl.getCollection(); + stdx::unique_lock ul(_mutex); + _checkValid(ul); + _checkIsActiveTransaction(ul, txnNumber); - uassert(40529, - str::stream() << "Unable to persist transaction state because the session transaction " - "collection is missing. This indicates that the " - << NamespaceString::kSessionTransactionsTableNamespace.ns() - << " collection has been manually deleted.", - coll); + const auto updateRequest = _makeUpdateRequest(ul, txnNumber, newLastWriteTs); - UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); - updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName - << _sessionId.toBSON() - << SessionTxnRecord::kTxnNumFieldName - << getTxnNum())); - updateRequest.setUpdates( - BSON("$set" << BSON(SessionTxnRecord::kLastWriteOpTimeTsFieldName << opTimeTs))); - updateRequest.setUpsert(false); - - auto updateResult = update(opCtx, autoColl.getDb(), updateRequest); - uassert(40530, - str::stream() << "Failed to update transaction progress for session " << _sessionId, - updateResult.numDocsModified >= 1); + ul.unlock(); - stdx::lock_guard lg(_mutex); - _lastWrittenTxnRecord->setLastWriteOpTimeTs(opTimeTs); + repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); + + updateSessionEntry(opCtx, updateRequest); + _registerUpdateCacheOnCommit(opCtx, txnNumber, std::move(stmtIdsWritten), newLastWriteTs); } -TxnNumber Session::getTxnNum() const { - stdx::lock_guard lg(_mutex); - invariant(_lastWrittenTxnRecord); - return _lastWrittenTxnRecord->getTxnNum(); +void Session::updateSessionRecordOnSecondary(OperationContext* opCtx, + const SessionTxnRecord& sessionTxnRecord) { + invariant(!opCtx->lockState()->isLocked()); + + writeConflictRetry( + opCtx, "Update session txn", NamespaceString::kSessionTransactionsTableNamespace.ns(), [&] { + UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); + updateRequest.setUpsert(true); + updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName + << sessionTxnRecord.getSessionId().toBSON())); + updateRequest.setUpdates(sessionTxnRecord.toBSON()); + + repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); + + Lock::DBLock configDBLock(opCtx, NamespaceString::kConfigDb, MODE_IX); + WriteUnitOfWork wuow(opCtx); + updateSessionEntry(opCtx, updateRequest); + wuow.commit(); + }); } -Timestamp Session::getLastWriteOpTimeTs() const { +void Session::invalidate() { stdx::lock_guard lg(_mutex); - invariant(_lastWrittenTxnRecord); - return _lastWrittenTxnRecord->getLastWriteOpTimeTs(); -} + _isValid = false; + _numInvalidations++; + + _lastWrittenSessionRecord.reset(); -TransactionHistoryIterator Session::getWriteHistory(OperationContext* opCtx) const { - return TransactionHistoryIterator(getLastWriteOpTimeTs()); + _activeTxnNumber = kUninitializedTxnNumber; } -void Session::reset() { +Timestamp Session::getLastWriteOpTimeTs(TxnNumber txnNumber) const { stdx::lock_guard lg(_mutex); - _lastWrittenTxnRecord.reset(); - _generation += 1; + _checkValid(lg); + _checkIsActiveTransaction(lg, txnNumber); + + if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber) + return Timestamp(); + + return _lastWrittenSessionRecord->getLastWriteOpTimeTs(); } boost::optional Session::checkStatementExecuted(OperationContext* opCtx, - StmtId stmtId) { - if (!opCtx->getTxnNumber()) { + TxnNumber txnNumber, + StmtId stmtId) const { + stdx::unique_lock ul(_mutex); + _checkValid(ul); + _checkIsActiveTransaction(ul, txnNumber); + + if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber) return boost::none; - } - auto it = getWriteHistory(opCtx); + auto it = TransactionHistoryIterator(_lastWrittenSessionRecord->getLastWriteOpTimeTs()); + + ul.unlock(); + while (it.hasNext()) { - auto entry = it.next(opCtx); - if (entry.getStatementId() == stmtId) { + const auto entry = it.next(opCtx); + invariant(entry.getStatementId()); + if (*entry.getStatementId() == stmtId) { return entry; } } @@ -226,4 +204,106 @@ boost::optional Session::checkStatementExecuted(OperationConte return boost::none; } +void Session::_beginTxn(WithLock wl, TxnNumber txnNumber) { + _checkValid(wl); + + uassert(ErrorCodes::TransactionTooOld, + str::stream() << "Cannot start transaction " << txnNumber << " on session " + << getSessionId() + << " because a newer transaction " + << _activeTxnNumber + << " has already started.", + txnNumber >= _activeTxnNumber); + + // Check for continuing an existing transaction + if (txnNumber == _activeTxnNumber) + return; + + _activeTxnNumber = txnNumber; +} + +void Session::_checkValid(WithLock) const { + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Session " << getSessionId() + << " was concurrently modified and the operation must be retried.", + _isValid); +} + +void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const { + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Cannot perform retryability check for transaction " << txnNumber + << " on session " + << getSessionId() + << " because a different transaction " + << _activeTxnNumber + << " is now active.", + txnNumber == _activeTxnNumber); +} + +UpdateRequest Session::_makeUpdateRequest(WithLock, + TxnNumber newTxnNumber, + Timestamp newLastWriteTs) const { + UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); + updateRequest.setUpsert(true); + + if (_lastWrittenSessionRecord) { + updateRequest.setQuery(_lastWrittenSessionRecord->toBSON()); + updateRequest.setUpdates( + BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName + << newTxnNumber + << SessionTxnRecord::kLastWriteOpTimeTsFieldName + << newLastWriteTs))); + } else { + const auto updateBSON = [&] { + SessionTxnRecord newTxnRecord; + newTxnRecord.setSessionId(_sessionId); + newTxnRecord.setTxnNum(newTxnNumber); + newTxnRecord.setLastWriteOpTimeTs(newLastWriteTs); + return newTxnRecord.toBSON(); + }(); + + updateRequest.setQuery(updateBSON); + updateRequest.setUpdates(updateBSON); + } + + return updateRequest; +} + +void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, + TxnNumber newTxnNumber, + std::vector stmtIdsWritten, + Timestamp newLastWriteTs) { + opCtx->recoveryUnit()->onCommit( + [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), newLastWriteTs ] { + stdx::lock_guard lg(_mutex); + + if (!_isValid) + return; + + if (newTxnNumber < _activeTxnNumber) + return; + + // This call is necessary in order to advance the txn number and reset the cached state + // in the case where just before the storage transaction commits, the cache entry gets + // invalidated and immediately refreshed while there were no writes for newTxnNumber + // yet. In this case _activeTxnNumber will be less than newTxnNumber and we will fail to + // update the cache even though the write was successful. + _beginTxn(lg, newTxnNumber); + + if (!_lastWrittenSessionRecord) { + _lastWrittenSessionRecord.emplace(); + + _lastWrittenSessionRecord->setSessionId(_sessionId); + _lastWrittenSessionRecord->setTxnNum(newTxnNumber); + _lastWrittenSessionRecord->setLastWriteOpTimeTs(newLastWriteTs); + } else { + if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) + _lastWrittenSessionRecord->setTxnNum(newTxnNumber); + + if (newLastWriteTs > _lastWrittenSessionRecord->getLastWriteOpTimeTs()) + _lastWrittenSessionRecord->setLastWriteOpTimeTs(newLastWriteTs); + } + }); +} + } // namespace mongo -- cgit v1.2.1