summaryrefslogtreecommitdiff
path: root/src/mongo/db/session.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-09-11 10:47:44 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-09-13 07:40:43 -0400
commit81111ce63ba47253c9bc3b44c84a4f452fff9f90 (patch)
tree6f9f7f88869a7de3b6013482d2f571c0b107e7a9 /src/mongo/db/session.cpp
parentbf8ae1b3edbf7470e92b7407f76a042cc2246d48 (diff)
downloadmongo-81111ce63ba47253c9bc3b44c84a4f452fff9f90.tar.gz
SERVER-30325 Simplify session transaction state maintenance
This change exposes a single 'onWriteCompleted' method on the Session object, which hides all the concurrency control and the session cache maintenance.
Diffstat (limited to 'src/mongo/db/session.cpp')
-rw-r--r--src/mongo/db/session.cpp338
1 files changed, 209 insertions, 129 deletions
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 632125abde6..3f1554907f0 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -32,8 +32,6 @@
#include "mongo/db/session.h"
-#include <vector>
-
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
@@ -41,6 +39,8 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/query/get_executor.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/transaction_history_iterator.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -48,177 +48,155 @@
namespace mongo {
namespace {
-boost::optional<SessionTxnRecord> loadSessionRecord(OperationContext* opCtx,
- const LogicalSessionId& sessionId) {
- DBDirectClient client(opCtx);
- Query sessionQuery(BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON()));
- auto result =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionQuery);
-
- if (result.isEmpty()) {
- return boost::none;
- }
-
- IDLParserErrorContext ctx("parse latest txn record for session");
- return SessionTxnRecord::parse(ctx, result);
-}
-
-} // namespace
-
-Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
-
-void Session::updateSessionRecord(OperationContext* opCtx,
- const LogicalSessionId& sessionId,
- const TxnNumber& txnNum,
- const Timestamp& ts) {
- repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
-
+void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) {
AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX);
- uassert(40526,
+
+ uassert(40527,
str::stream() << "Unable to persist transaction state because the session transaction "
"collection is missing. This indicates that the "
<< NamespaceString::kSessionTransactionsTableNamespace.ns()
<< " collection has been manually deleted.",
- autoColl.getCollection() != nullptr);
+ autoColl.getCollection());
- UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
- updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << sessionId.toBSON()));
- updateRequest.setUpdates(BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName
- << txnNum
- << SessionTxnRecord::kLastWriteOpTimeTsFieldName
- << ts)));
- updateRequest.setUpsert(true);
+ const auto updateResult = update(opCtx, autoColl.getDb(), updateRequest);
- auto updateResult = update(opCtx, autoColl.getDb(), updateRequest);
- uassert(40527,
- str::stream() << "Failed to update transaction progress for session " << sessionId,
- updateResult.numDocsModified >= 1 || !updateResult.upserted.isEmpty());
+ if (!updateResult.numDocsModified && updateResult.upserted.isEmpty()) {
+ throw WriteConflictException();
+ }
}
-void Session::begin(OperationContext* opCtx, const TxnNumber& txnNumber) {
+} // namespace
+
+Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
+
+void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
invariant(!opCtx->lockState()->isLocked());
- invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+ invariant(repl::ReadConcernArgs::get(opCtx).getLevel() ==
+ repl::ReadConcernLevel::kLocalReadConcern);
- // Repeats if the generation of the session changes during I/O.
- while (true) {
- int startGeneration = 0;
- boost::optional<SessionTxnRecord> txnRecord;
- {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- startGeneration = _generation;
- txnRecord = _lastWrittenTxnRecord;
- }
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+
+ while (!_isValid) {
+ const int numInvalidations = _numInvalidations;
- // Do I/O outside of the lock.
- if (!txnRecord) {
- txnRecord = loadSessionRecord(opCtx, _sessionId);
+ ul.unlock();
- // Previous read failed to retrieve the txn record, which means it does not exist yet,
- // so create a new entry.
- if (!txnRecord) {
- updateSessionRecord(opCtx, _sessionId, txnNumber, Timestamp());
- txnRecord = makeSessionTxnRecord(_sessionId, txnNumber, Timestamp());
+ const auto lastWrittenTxnRecord = [&]() -> boost::optional<SessionTxnRecord> {
+ DBDirectClient client(opCtx);
+ auto result = client.findOne(
+ NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId.toBSON())});
+ if (result.isEmpty()) {
+ return boost::none;
}
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- _lastWrittenTxnRecord = txnRecord;
- }
+ return SessionTxnRecord::parse(
+ IDLParserErrorContext("parse latest txn record for session"), result);
+ }();
- uassert(40528,
- str::stream() << "cannot start transaction with id " << txnNumber << " on session "
- << _sessionId
- << " because transaction with id "
- << txnRecord->getTxnNum()
- << " already started",
- txnRecord->getTxnNum() <= txnNumber);
-
- if (txnNumber > txnRecord->getTxnNum()) {
- updateSessionRecord(opCtx, _sessionId, txnNumber, Timestamp());
- txnRecord->setTxnNum(txnNumber);
- txnRecord->setLastWriteOpTimeTs(Timestamp());
- }
+ ul.lock();
- {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
+ // Protect against concurrent refreshes or invalidations
+ if (!_isValid && _numInvalidations == numInvalidations) {
+ _isValid = true;
+ _lastWrittenSessionRecord = std::move(lastWrittenTxnRecord);
- // Reload if the session was modified since the beginning of this loop, e.g. by
- // rollback.
- if (startGeneration != _generation) {
- _lastWrittenTxnRecord.reset();
- continue;
+ if (_lastWrittenSessionRecord) {
+ _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
}
- _lastWrittenTxnRecord = std::move(txnRecord);
- return;
+ break;
}
}
}
-void Session::saveTxnProgress(OperationContext* opCtx, Timestamp opTimeTs) {
- // Needs to be in the same write unit of work with the write for this result.
+void Session::beginTxn(OperationContext* opCtx, TxnNumber txnNumber) {
+ invariant(!opCtx->lockState()->isLocked());
+
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _beginTxn(lg, txnNumber);
+}
+
+void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ Timestamp newLastWriteTs) {
invariant(opCtx->lockState()->inAWriteUnitOfWork());
- repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
- AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX);
- auto coll = autoColl.getCollection();
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ _checkValid(ul);
+ _checkIsActiveTransaction(ul, txnNumber);
- uassert(40529,
- str::stream() << "Unable to persist transaction state because the session transaction "
- "collection is missing. This indicates that the "
- << NamespaceString::kSessionTransactionsTableNamespace.ns()
- << " collection has been manually deleted.",
- coll);
+ const auto updateRequest = _makeUpdateRequest(ul, txnNumber, newLastWriteTs);
- UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
- updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName
- << _sessionId.toBSON()
- << SessionTxnRecord::kTxnNumFieldName
- << getTxnNum()));
- updateRequest.setUpdates(
- BSON("$set" << BSON(SessionTxnRecord::kLastWriteOpTimeTsFieldName << opTimeTs)));
- updateRequest.setUpsert(false);
-
- auto updateResult = update(opCtx, autoColl.getDb(), updateRequest);
- uassert(40530,
- str::stream() << "Failed to update transaction progress for session " << _sessionId,
- updateResult.numDocsModified >= 1);
+ ul.unlock();
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- _lastWrittenTxnRecord->setLastWriteOpTimeTs(opTimeTs);
+ repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
+
+ updateSessionEntry(opCtx, updateRequest);
+ _registerUpdateCacheOnCommit(opCtx, txnNumber, std::move(stmtIdsWritten), newLastWriteTs);
}
-TxnNumber Session::getTxnNum() const {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- invariant(_lastWrittenTxnRecord);
- return _lastWrittenTxnRecord->getTxnNum();
+void Session::updateSessionRecordOnSecondary(OperationContext* opCtx,
+ const SessionTxnRecord& sessionTxnRecord) {
+ invariant(!opCtx->lockState()->isLocked());
+
+ writeConflictRetry(
+ opCtx, "Update session txn", NamespaceString::kSessionTransactionsTableNamespace.ns(), [&] {
+ UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
+ updateRequest.setUpsert(true);
+ updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName
+ << sessionTxnRecord.getSessionId().toBSON()));
+ updateRequest.setUpdates(sessionTxnRecord.toBSON());
+
+ repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
+
+ Lock::DBLock configDBLock(opCtx, NamespaceString::kConfigDb, MODE_IX);
+ WriteUnitOfWork wuow(opCtx);
+ updateSessionEntry(opCtx, updateRequest);
+ wuow.commit();
+ });
}
-Timestamp Session::getLastWriteOpTimeTs() const {
+void Session::invalidate() {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- invariant(_lastWrittenTxnRecord);
- return _lastWrittenTxnRecord->getLastWriteOpTimeTs();
-}
+ _isValid = false;
+ _numInvalidations++;
+
+ _lastWrittenSessionRecord.reset();
-TransactionHistoryIterator Session::getWriteHistory(OperationContext* opCtx) const {
- return TransactionHistoryIterator(getLastWriteOpTimeTs());
+ _activeTxnNumber = kUninitializedTxnNumber;
}
-void Session::reset() {
+Timestamp Session::getLastWriteOpTimeTs(TxnNumber txnNumber) const {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- _lastWrittenTxnRecord.reset();
- _generation += 1;
+ _checkValid(lg);
+ _checkIsActiveTransaction(lg, txnNumber);
+
+ if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
+ return Timestamp();
+
+ return _lastWrittenSessionRecord->getLastWriteOpTimeTs();
}
boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationContext* opCtx,
- StmtId stmtId) {
- if (!opCtx->getTxnNumber()) {
+ TxnNumber txnNumber,
+ StmtId stmtId) const {
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ _checkValid(ul);
+ _checkIsActiveTransaction(ul, txnNumber);
+
+ if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
return boost::none;
- }
- auto it = getWriteHistory(opCtx);
+ auto it = TransactionHistoryIterator(_lastWrittenSessionRecord->getLastWriteOpTimeTs());
+
+ ul.unlock();
+
while (it.hasNext()) {
- auto entry = it.next(opCtx);
- if (entry.getStatementId() == stmtId) {
+ const auto entry = it.next(opCtx);
+ invariant(entry.getStatementId());
+ if (*entry.getStatementId() == stmtId) {
return entry;
}
}
@@ -226,4 +204,106 @@ boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationConte
return boost::none;
}
+void Session::_beginTxn(WithLock wl, TxnNumber txnNumber) {
+ _checkValid(wl);
+
+ uassert(ErrorCodes::TransactionTooOld,
+ str::stream() << "Cannot start transaction " << txnNumber << " on session "
+ << getSessionId()
+ << " because a newer transaction "
+ << _activeTxnNumber
+ << " has already started.",
+ txnNumber >= _activeTxnNumber);
+
+ // Check for continuing an existing transaction
+ if (txnNumber == _activeTxnNumber)
+ return;
+
+ _activeTxnNumber = txnNumber;
+}
+
+void Session::_checkValid(WithLock) const {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Session " << getSessionId()
+ << " was concurrently modified and the operation must be retried.",
+ _isValid);
+}
+
+void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Cannot perform retryability check for transaction " << txnNumber
+ << " on session "
+ << getSessionId()
+ << " because a different transaction "
+ << _activeTxnNumber
+ << " is now active.",
+ txnNumber == _activeTxnNumber);
+}
+
+UpdateRequest Session::_makeUpdateRequest(WithLock,
+ TxnNumber newTxnNumber,
+ Timestamp newLastWriteTs) const {
+ UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
+ updateRequest.setUpsert(true);
+
+ if (_lastWrittenSessionRecord) {
+ updateRequest.setQuery(_lastWrittenSessionRecord->toBSON());
+ updateRequest.setUpdates(
+ BSON("$set" << BSON(SessionTxnRecord::kTxnNumFieldName
+ << newTxnNumber
+ << SessionTxnRecord::kLastWriteOpTimeTsFieldName
+ << newLastWriteTs)));
+ } else {
+ const auto updateBSON = [&] {
+ SessionTxnRecord newTxnRecord;
+ newTxnRecord.setSessionId(_sessionId);
+ newTxnRecord.setTxnNum(newTxnNumber);
+ newTxnRecord.setLastWriteOpTimeTs(newLastWriteTs);
+ return newTxnRecord.toBSON();
+ }();
+
+ updateRequest.setQuery(updateBSON);
+ updateRequest.setUpdates(updateBSON);
+ }
+
+ return updateRequest;
+}
+
+void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
+ TxnNumber newTxnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ Timestamp newLastWriteTs) {
+ opCtx->recoveryUnit()->onCommit(
+ [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), newLastWriteTs ] {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+ if (!_isValid)
+ return;
+
+ if (newTxnNumber < _activeTxnNumber)
+ return;
+
+ // This call is necessary in order to advance the txn number and reset the cached state
+ // in the case where just before the storage transaction commits, the cache entry gets
+ // invalidated and immediately refreshed while there were no writes for newTxnNumber
+ // yet. In this case _activeTxnNumber will be less than newTxnNumber and we will fail to
+ // update the cache even though the write was successful.
+ _beginTxn(lg, newTxnNumber);
+
+ if (!_lastWrittenSessionRecord) {
+ _lastWrittenSessionRecord.emplace();
+
+ _lastWrittenSessionRecord->setSessionId(_sessionId);
+ _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
+ _lastWrittenSessionRecord->setLastWriteOpTimeTs(newLastWriteTs);
+ } else {
+ if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
+ _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
+
+ if (newLastWriteTs > _lastWrittenSessionRecord->getLastWriteOpTimeTs())
+ _lastWrittenSessionRecord->setLastWriteOpTimeTs(newLastWriteTs);
+ }
+ });
+}
+
} // namespace mongo