// wiredtiger_recovery_unit.cpp /** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage #include "mongo/platform/basic.h" #include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/server_options.h" #include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" #include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/util/hex.h" #include "mongo/util/log.h" namespace mongo { namespace { // SnapshotIds need to be globally unique, as they are used in a WorkingSetMember to // determine if documents changed, but a different recovery unit may be used across a getMore, // so there is a chance the snapshot ID will be reused. AtomicUInt64 nextSnapshotId{1}; logger::LogSeverity kSlowTransactionSeverity = logger::LogSeverity::Debug(1); } // namespace WiredTigerRecoveryUnit::WiredTigerRecoveryUnit(WiredTigerSessionCache* sc) : WiredTigerRecoveryUnit(sc, sc->getKVEngine()->getOplogManager()) {} WiredTigerRecoveryUnit::WiredTigerRecoveryUnit(WiredTigerSessionCache* sc, WiredTigerOplogManager* oplogManager) : _sessionCache(sc), _oplogManager(oplogManager), _inUnitOfWork(false), _active(false), _mySnapshotId(nextSnapshotId.fetchAndAdd(1)) {} WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() { invariant(!_inUnitOfWork); _abort(); } void WiredTigerRecoveryUnit::_commit() { try { if (_session && _active) { _txnClose(true); } for (Changes::const_iterator it = _changes.begin(), end = _changes.end(); it != end; ++it) { (*it)->commit(); } _changes.clear(); invariant(!_active); } catch (...) { std::terminate(); } } void WiredTigerRecoveryUnit::_abort() { try { if (_session && _active) { _txnClose(false); } for (Changes::const_reverse_iterator it = _changes.rbegin(), end = _changes.rend(); it != end; ++it) { Change* change = it->get(); LOG(2) << "CUSTOM ROLLBACK " << redact(demangleName(typeid(*change))); change->rollback(); } _changes.clear(); invariant(!_active); } catch (...) { std::terminate(); } } void WiredTigerRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) { invariant(!_areWriteUnitOfWorksBanned); invariant(!_inUnitOfWork); _inUnitOfWork = true; } void WiredTigerRecoveryUnit::commitUnitOfWork() { invariant(_inUnitOfWork); _inUnitOfWork = false; _commit(); } void WiredTigerRecoveryUnit::abortUnitOfWork() { invariant(_inUnitOfWork); _inUnitOfWork = false; _abort(); } void WiredTigerRecoveryUnit::_ensureSession() { if (!_session) { _session = _sessionCache->getSession(); } } bool WiredTigerRecoveryUnit::waitUntilDurable() { invariant(!_inUnitOfWork); // _session may be nullptr. We cannot _ensureSession() here as that needs shutdown protection. const bool forceCheckpoint = false; const bool stableCheckpoint = false; _sessionCache->waitUntilDurable(forceCheckpoint, stableCheckpoint); return true; } void WiredTigerRecoveryUnit::registerChange(Change* change) { invariant(_inUnitOfWork); _changes.push_back(std::unique_ptr{change}); } void WiredTigerRecoveryUnit::assertInActiveTxn() const { fassert(28575, _active); } WiredTigerSession* WiredTigerRecoveryUnit::getSession() { if (!_active) { _txnOpen(); } return _session.get(); } WiredTigerSession* WiredTigerRecoveryUnit::getSessionNoTxn() { _ensureSession(); return _session.get(); } void WiredTigerRecoveryUnit::abandonSnapshot() { invariant(!_inUnitOfWork); if (_active) { // Can't be in a WriteUnitOfWork, so safe to rollback _txnClose(false); } _areWriteUnitOfWorksBanned = false; } void WiredTigerRecoveryUnit::preallocateSnapshot() { // Begin a new transaction, if one is not already started. getSession(); } void* WiredTigerRecoveryUnit::writingPtr(void* data, size_t len) { // This API should not be used for anything other than the MMAP V1 storage engine MONGO_UNREACHABLE; } void WiredTigerRecoveryUnit::_txnClose(bool commit) { invariant(_active); WT_SESSION* s = _session->getSession(); if (_timer) { const int transactionTime = _timer->millis(); if (transactionTime >= serverGlobalParams.slowMS) { LOG(kSlowTransactionSeverity) << "Slow WT transaction. Lifetime of SnapshotId " << _mySnapshotId << " was " << transactionTime << "ms"; } } int wtRet; if (commit) { if (!_commitTimestamp.isNull()) { const std::string conf = "commit_timestamp=" + integerToHex(_commitTimestamp.asULL()); invariantWTOK(s->timestamp_transaction(s, conf.c_str())); _isTimestamped = true; } wtRet = s->commit_transaction(s, NULL); LOG(3) << "WT commit_transaction for snapshot id " << _mySnapshotId; } else { wtRet = s->rollback_transaction(s, NULL); invariant(!wtRet); LOG(3) << "WT rollback_transaction for snapshot id " << _mySnapshotId; } if (_isTimestamped) { _oplogManager->triggerJournalFlush(); _isTimestamped = false; } invariantWTOK(wtRet); _active = false; _mySnapshotId = nextSnapshotId.fetchAndAdd(1); _isOplogReader = false; } SnapshotId WiredTigerRecoveryUnit::getSnapshotId() const { // TODO: use actual wiredtiger txn id return SnapshotId(_mySnapshotId); } Status WiredTigerRecoveryUnit::obtainMajorityCommittedSnapshot() { invariant(_isReadingFromPointInTime()); auto snapshotName = _sessionCache->snapshotManager().getMinSnapshotForNextCommittedRead(); if (!snapshotName) { return {ErrorCodes::ReadConcernMajorityNotAvailableYet, "Read concern majority reads are currently not possible."}; } _majorityCommittedSnapshot = *snapshotName; return Status::OK(); } boost::optional WiredTigerRecoveryUnit::getPointInTimeReadTimestamp() const { if (!_isReadingFromPointInTime()) return boost::none; if (getReadConcernLevel() == repl::ReadConcernLevel::kSnapshotReadConcern && !_readAtTimestamp.isNull()) { return _readAtTimestamp; } invariant(!_majorityCommittedSnapshot.isNull()); return _majorityCommittedSnapshot; } void WiredTigerRecoveryUnit::_txnOpen() { invariant(!_active); _ensureSession(); // Only start a timer for transaction's lifetime if we're going to log it. if (shouldLog(kSlowTransactionSeverity)) { _timer.reset(new Timer()); } WT_SESSION* session = _session->getSession(); // '_readAtTimestamp' is available outside of a check for readConcern level 'snapshot' to // accommodate unit testing. if (_readAtTimestamp != Timestamp::min()) { auto status = _sessionCache->snapshotManager().beginTransactionAtTimestamp(_readAtTimestamp, session); if (!status.isOK() && status.code() == ErrorCodes::BadValue) { uasserted(ErrorCodes::SnapshotTooOld, str::stream() << "Read timestamp " << _readAtTimestamp.toString() << " is older than the oldest available timestamp."); } uassertStatusOK(status); } else if (_isReadingFromPointInTime()) { // We reset _majorityCommittedSnapshot to the actual read timestamp used when the // transaction was started. _majorityCommittedSnapshot = _sessionCache->snapshotManager().beginTransactionOnCommittedSnapshot(session); } else if (_isOplogReader) { _sessionCache->snapshotManager().beginTransactionOnOplog( _sessionCache->getKVEngine()->getOplogManager(), session); } else { invariantWTOK(session->begin_transaction( session, _readConcernLevel == repl::ReadConcernLevel::kAvailableReadConcern ? "ignore_prepare=true" : nullptr)); } LOG(3) << "WT begin_transaction for snapshot id " << _mySnapshotId; _active = true; } Status WiredTigerRecoveryUnit::setTimestamp(Timestamp timestamp) { _ensureSession(); LOG(3) << "WT set timestamp of future write operations to " << timestamp; WT_SESSION* session = _session->getSession(); invariant(_inUnitOfWork); invariant(_commitTimestamp.isNull(), str::stream() << "Commit timestamp set to " << _commitTimestamp.toString() << " and trying to set WUOW timestamp to " << timestamp.toString()); // Starts the WT transaction associated with this session. getSession(); const std::string conf = "commit_timestamp=" + integerToHex(timestamp.asULL()); auto rc = session->timestamp_transaction(session, conf.c_str()); if (rc == 0) { _isTimestamped = true; } return wtRCToStatus(rc, "timestamp_transaction"); } void WiredTigerRecoveryUnit::setCommitTimestamp(Timestamp timestamp) { invariant(!_inUnitOfWork); invariant(_commitTimestamp.isNull(), str::stream() << "Commit timestamp set to " << _commitTimestamp.toString() << " and trying to set it to " << timestamp.toString()); _commitTimestamp = timestamp; } Timestamp WiredTigerRecoveryUnit::getCommitTimestamp() { return _commitTimestamp; } void WiredTigerRecoveryUnit::clearCommitTimestamp() { invariant(!_inUnitOfWork); invariant(!_commitTimestamp.isNull()); _commitTimestamp = Timestamp(); } Status WiredTigerRecoveryUnit::setPointInTimeReadTimestamp(Timestamp timestamp) { _readAtTimestamp = timestamp; return Status::OK(); } void WiredTigerRecoveryUnit::setIsOplogReader() { // Note: it would be nice to assert !active here, but OplogStones currently opens a cursor on // the oplog while the recovery unit is already active. _isOplogReader = true; } void WiredTigerRecoveryUnit::beginIdle() { // Close all cursors, we don't want to keep any old cached cursors around. if (_session) { _session->closeAllCursors(""); } } // --------------------- WiredTigerCursor::WiredTigerCursor(const std::string& uri, uint64_t tableId, bool forRecordStore, OperationContext* opCtx) { _tableID = tableId; _ru = WiredTigerRecoveryUnit::get(opCtx); _session = _ru->getSession(); _cursor = _session->getCursor(uri, tableId, forRecordStore); if (!_cursor) { error() << "no cursor for uri: " << uri; } } WiredTigerCursor::~WiredTigerCursor() { _session->releaseCursor(_tableID, _cursor); _cursor = NULL; } void WiredTigerCursor::reset() { invariantWTOK(_cursor->reset(_cursor)); } }