/** * Copyright (C) 2017 MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/s/session_catalog_migration_source.h" #include "mongo/db/catalog/catalog_raii.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/write_concern.h" #include "mongo/platform/random.h" #include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace { PseudoRandom hashGenerator(std::unique_ptr(SecureRandom::create())->nextInt64()); boost::optional fetchPrePostImageOplog(OperationContext* opCtx, const repl::OplogEntry& oplog) { auto opTimeToFetch = oplog.getPreImageOpTime(); if (!opTimeToFetch) { opTimeToFetch = oplog.getPostImageOpTime(); } if (!opTimeToFetch) { return boost::none; } auto opTime = opTimeToFetch.value(); DBDirectClient client(opCtx); auto oplogBSON = client.findOne(NamespaceString::kRsOplogNamespace.ns(), opTime.asQuery()); return uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); } /** * Creates an OplogEntry using the given field values */ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, long long hash, repl::OpTypeEnum opType, const BSONObj& oField, const boost::optional& o2Field, const OperationSessionInfo& sessionInfo, const boost::optional& statementId) { return repl::OplogEntry(opTime, // optime hash, // hash opType, // op type {}, // namespace boost::none, // uuid boost::none, // fromMigrate repl::OplogEntry::kOplogVersion, // version oField, // o o2Field, // o2 sessionInfo, // session info boost::none, // wall clock time statementId, // statement id boost::none, // optime of previous write within same transaction boost::none, // pre-image optime boost::none); // post-image optime } /** * Creates a special "write history lost" sentinel oplog entry. */ repl::OplogEntry makeSentinelOplogEntry(OperationSessionInfo sessionInfo) { return makeOplogEntry({}, // optime hashGenerator.nextInt64(), // hash repl::OpTypeEnum::kNoop, // op type {}, // o Session::kDeadEndSentinel, // o2 sessionInfo, // session info kIncompleteHistoryStmtId); // statement id } } // namespace SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* opCtx, NamespaceString ns) : _ns(std::move(ns)), _rollbackIdAtInit(repl::ReplicationProcess::get(opCtx)->getRollbackID()) { // Sort is not needed for correctness. This is just for making it easier to write deterministic // tests. Query query; query.sort(BSON("_id" << 1)); DBDirectClient client(opCtx); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query); while (cursor->more()) { auto nextSession = SessionTxnRecord::parse( IDLParserErrorContext("Session migration cloning"), cursor->next()); if (!nextSession.getLastWriteOpTime().isNull()) { _sessionOplogIterators.push_back( stdx::make_unique(std::move(nextSession), _rollbackIdAtInit)); } } { AutoGetCollection autoColl(opCtx, NamespaceString::kRsOplogNamespace, MODE_IX); writeConflictRetry( opCtx, "session migration initialization majority commit barrier", NamespaceString::kRsOplogNamespace.ns(), [&] { const auto message = BSON("sessionMigrateCloneStart" << _ns.ns()); WriteUnitOfWork wuow(opCtx); opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( opCtx, _ns, {}, {}, message); wuow.commit(); }); } auto opTimeToWait = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); WriteConcernResult result; WriteConcernOptions majority( WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0); uassertStatusOK(waitForWriteConcern(opCtx, opTimeToWait, majority, &result)); } bool SessionCatalogMigrationSource::hasMoreOplog() { return _hasMoreOplogFromSessionCatalog() || _hasNewWrites(); } SessionCatalogMigrationSource::OplogResult SessionCatalogMigrationSource::getLastFetchedOplog() { { stdx::lock_guard _lk(_sessionCloneMutex); if (_lastFetchedOplog) { return OplogResult(_lastFetchedOplog, false); } } { stdx::lock_guard _lk(_newOplogMutex); return OplogResult(_lastFetchedNewWriteOplog, true); } } bool SessionCatalogMigrationSource::fetchNextOplog(OperationContext* opCtx) { if (_fetchNextOplogFromSessionCatalog(opCtx)) { return true; } return _fetchNextNewWriteOplog(opCtx); } bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) { if (_currentOplogIterator) { if (_currentOplogIterator->hasNext()) { auto nextOplog = _currentOplogIterator->getNext(opCtx); auto nextStmtId = nextOplog.getStatementId(); // Note: This is an optimization based on the assumption that it is not possible to be // touching different namespaces in the same transaction. if (!nextStmtId || (nextStmtId && *nextStmtId != kIncompleteHistoryStmtId && nextOplog.getNamespace() != _ns)) { _currentOplogIterator.reset(); return false; } auto doc = fetchPrePostImageOplog(opCtx, nextOplog); if (doc) { _lastFetchedOplogBuffer.push_back(nextOplog); _lastFetchedOplog = *doc; } else { _lastFetchedOplog = nextOplog; } return true; } else { _currentOplogIterator.reset(); } } return false; } bool SessionCatalogMigrationSource::_hasMoreOplogFromSessionCatalog() { stdx::lock_guard _lk(_sessionCloneMutex); return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty() || !_sessionOplogIterators.empty() || _currentOplogIterator; } // Important: The no-op oplog entry for findAndModify should always be returned first before the // actual operation. repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedOplogFromSessionCatalog() { stdx::lock_guard lk(_sessionCloneMutex); return _lastFetchedOplogBuffer.back(); } bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationContext* opCtx) { stdx::unique_lock lk(_sessionCloneMutex); if (!_lastFetchedOplogBuffer.empty()) { _lastFetchedOplog = _lastFetchedOplogBuffer.back(); _lastFetchedOplogBuffer.pop_back(); return true; } _lastFetchedOplog.reset(); if (_handleWriteHistory(lk, opCtx)) { return true; } while (!_sessionOplogIterators.empty()) { _currentOplogIterator = std::move(_sessionOplogIterators.back()); _sessionOplogIterators.pop_back(); if (_handleWriteHistory(lk, opCtx)) { return true; } } return false; } bool SessionCatalogMigrationSource::_hasNewWrites() { stdx::lock_guard lk(_newOplogMutex); return _lastFetchedNewWriteOplog || !_newWriteOpTimeList.empty(); } repl::OplogEntry SessionCatalogMigrationSource::_getLastFetchedNewWriteOplog() { stdx::lock_guard lk(_newOplogMutex); invariant(_lastFetchedNewWriteOplog); return *_lastFetchedNewWriteOplog; } bool SessionCatalogMigrationSource::_fetchNextNewWriteOplog(OperationContext* opCtx) { repl::OpTime nextOpTimeToFetch; { stdx::lock_guard lk(_newOplogMutex); if (_newWriteOpTimeList.empty()) { _lastFetchedNewWriteOplog.reset(); return false; } nextOpTimeToFetch = _newWriteOpTimeList.front(); } DBDirectClient client(opCtx); auto newWriteOplog = client.findOne(NamespaceString::kRsOplogNamespace.ns(), nextOpTimeToFetch.asQuery()); uassert(40620, str::stream() << "Unable to fetch oplog entry with opTime: " << nextOpTimeToFetch.toBSON(), !newWriteOplog.isEmpty()); { stdx::lock_guard lk(_newOplogMutex); _lastFetchedNewWriteOplog = uassertStatusOK(repl::OplogEntry::parse(newWriteOplog)); _newWriteOpTimeList.pop_front(); } return true; } void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) { stdx::lock_guard lk(_newOplogMutex); _newWriteOpTimeList.push_back(opTime); } SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator( SessionTxnRecord txnRecord, int expectedRollbackId) : _record(std::move(txnRecord)), _initialRollbackId(expectedRollbackId) { _writeHistoryIterator = stdx::make_unique(_record.getLastWriteOpTime()); } bool SessionCatalogMigrationSource::SessionOplogIterator::hasNext() const { return _writeHistoryIterator && _writeHistoryIterator->hasNext(); } repl::OplogEntry SessionCatalogMigrationSource::SessionOplogIterator::getNext( OperationContext* opCtx) { try { // Note: during SessionCatalogMigrationSource::init, we inserted a document and wait for it // to committed to the majority. In addition, the TransactionHistoryIterator uses OpTime // to query for the oplog. This means that if we can successfully fetch the oplog, we are // guaranteed that they are majority committed. If we can't fetch the oplog, it can either // mean that the oplog has been rolled over or was rolled back. return _writeHistoryIterator->next(opCtx); } catch (const AssertionException& excep) { if (excep.code() == ErrorCodes::IncompleteTransactionHistory) { // Note: no need to check if in replicaSet mode because having an iterator implies // oplog exists. auto rollbackId = repl::ReplicationProcess::get(opCtx)->getRollbackID(); uassert(40656, str::stream() << "rollback detected, rollbackId was " << _initialRollbackId << " but is now " << rollbackId, rollbackId == _initialRollbackId); // If the rollbackId hasn't changed, this means that the oplog has been truncated. // So, we return the special "write history lost" sentinel. OperationSessionInfo sessionInfo; sessionInfo.setSessionId(_record.getSessionId()); sessionInfo.setTxnNumber(_record.getTxnNum()); auto oplog = makeSentinelOplogEntry(sessionInfo); _writeHistoryIterator.reset(); return oplog; } throw; } } } // namespace mongo