diff options
author | Randolph Tan <randolph@10gen.com> | 2017-06-26 11:27:28 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2017-07-03 10:53:14 -0400 |
commit | daf92c6fa479f51b1a1d18cacf56b0a1bbeba9fb (patch) | |
tree | 6f80c26295ec5fe763275882c72797dc395cbe13 /src | |
parent | 5a933be9235b7d02bb5c790c7ff7eba3a867c43a (diff) | |
download | mongo-daf92c6fa479f51b1a1d18cacf56b0a1bbeba9fb.tar.gz |
SERVER-28903 Make Transaction state store last write optime instead of write results
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/session_transaction_state_test.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/session_transaction_table.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/session_txn_record.idl | 36 | ||||
-rw-r--r-- | src/mongo/db/session_txn_state.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/session_txn_state.h | 34 | ||||
-rw-r--r-- | src/mongo/db/session_txn_state_holder.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/session_txn_state_holder.h | 2 | ||||
-rw-r--r-- | src/mongo/db/session_txn_write_history_iterator.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/session_txn_write_history_iterator.h | 64 |
10 files changed, 161 insertions, 91 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index cbb721fe868..2baa48e18d0 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1313,6 +1313,7 @@ env.Library( 'session_transaction_table.cpp', 'session_txn_state.cpp', 'session_txn_state_holder.cpp', + 'session_txn_write_history_iterator.cpp', env.Idlc('session_txn_record.idl')[0], ], LIBDEPS=[ diff --git a/src/mongo/db/session_transaction_state_test.cpp b/src/mongo/db/session_transaction_state_test.cpp index 54e4ab2a7b5..7a8d4d2a5f1 100644 --- a/src/mongo/db/session_transaction_state_test.cpp +++ b/src/mongo/db/session_transaction_state_test.cpp @@ -49,21 +49,19 @@ TEST(SessionTxnStateHolder, Demo) { { // Caller has now control of txn state and can read/write from it. - auto txnStateToken = txnStateHolder->getTransactionState(&opCtx, 1); + auto txnStateToken = txnStateHolder->getTransactionState(&opCtx); - auto partialResults = txnStateToken.get()->getPartialResults(); + auto writeHistory = txnStateToken.get()->getWriteHistory(&opCtx); - // Go over request object, and mark all statements that has results in partialResults as - // done. + // Go over request object, and mark all statements that is in writeHistory as done. + // In addition, convert oplog entries into results appropriate for command. // For every statement that is not yet 'done': // Perform write op, and then store result: // Commented out since OperationContextNoop uses LockerNoop - // SingleWriteResult result; // repl::OpTime opTime; - // StmtId stmtId = 0; - // txnStateToken.get()->storePartialResult(&opCtx, stmtId, result, opTime); + // txnStateToken.get()->saveTxnProgress(&opCtx, opTime); // Consolidate partial results into final results for command response } diff --git a/src/mongo/db/session_transaction_table.cpp b/src/mongo/db/session_transaction_table.cpp index 9cde9802287..02f69439e5c 100644 --- a/src/mongo/db/session_transaction_table.cpp +++ b/src/mongo/db/session_transaction_table.cpp @@ -110,7 +110,7 @@ std::shared_ptr<SessionTxnStateHolder> SessionTransactionTable::getSessionTxnSta // TODO: consult sessions table (without network I/O). The fact that we reached this point // means that the session was previously active. - auto txnState = stdx::make_unique<SessionTxnState>(sessionId, kUninitializedTxnNumber); + auto txnState = stdx::make_unique<SessionTxnState>(sessionId); auto newEntry = std::make_shared<SessionTxnStateHolder>(std::move(txnState)); _txnTable.insert(std::make_pair(sessionId, newEntry)); return newEntry; diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl index e942f6c1167..cf9d2d30c16 100644 --- a/src/mongo/db/session_txn_record.idl +++ b/src/mongo/db/session_txn_record.idl @@ -30,11 +30,11 @@ global: cpp_namespace: "mongo" cpp_includes: - "mongo/db/logical_session_id.h" + - "mongo/db/repl/optime.h" imports: - "mongo/idl/basic_types.idl" - "mongo/db/logical_session_id.idl" - - "mongo/db/ops/single_write_result.idl" types: LogicalSessionId: @@ -44,31 +44,25 @@ types: deserializer: "mongo::LogicalSessionId::parse" serializer: "mongo::LogicalSessionId::toBSON" + OpTime: + description: "repl::OpTime" + bson_serialization_type: object + cpp_type: "mongo::repl::OpTime" + deserializer: "mongo::repl::OpTime::parse" + serializer: "mongo::repl::OpTime::toBSON" + structs: - txnStmtTupleId: - description: "Unique tuple that identifies a single statment within a logical transaction." + sessionTxnRecord: + description: "A document used for storing session transaction states." fields: - sessionId: + _id: + cpp_name: sessionId type: LogicalSessionId description: "The id of the session this transaction belongs to." txnNum: type: TxnNumber description: "The id representing this transaction." - stmtId: - type: StmtId - description: "The id of the last known statement that was executed successfully - with side effects in this transaction." - - sessionTxnRecord: - description: "A document used for storing session transaction states." - fields: - _id: - cpp_name: "Id" - type: txnStmtTupleId - result: - type: singleWriteResult - description: "This is the structure for storing the partial results of this - session transaction." - - # TODO: add opTime + lastWriteOpTime: + type: OpTime + description: "The optime of the last write on this transaction." diff --git a/src/mongo/db/session_txn_state.cpp b/src/mongo/db/session_txn_state.cpp index 91087795130..5d92167b586 100644 --- a/src/mongo/db/session_txn_state.cpp +++ b/src/mongo/db/session_txn_state.cpp @@ -40,7 +40,7 @@ auto getSessionTxnState = OperationContext::declareDecoration<SessionTxnState*>( } // unnamed namespace -const NamespaceString SessionTxnState::kConfigNS("admin.system.transactions"); +const NamespaceString SessionTxnState::kConfigNS("config.system.transactions"); SessionTxnState* SessionTxnState::get(OperationContext* opCtx) { return getSessionTxnState(opCtx); @@ -51,51 +51,38 @@ void SessionTxnState::set(OperationContext* opCtx, SessionTxnState* txnState) { sessionTxnState = txnState; } -SessionTxnState::SessionTxnState(LogicalSessionId sessionId, TxnNumber txnNumber) - : _sessionId(std::move(sessionId)), _txnNumber(std::move(txnNumber)) {} +SessionTxnState::SessionTxnState(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} -void SessionTxnState::begin(OperationContext* opCtx) { - if (!_isInitialized) { +void SessionTxnState::begin(OperationContext* opCtx, const TxnNumber& txnNumber) { + if (!_txnRecord) { // load txn table state from storage } - _isInitialized = true; + // TODO: assert if txnNumber < myTxnNumber + // TODO: if txnNumber > myTxnNumber, update record in storage then update _txnRecord. } -void SessionTxnState::addResultFromStorage(const SessionTxnRecord& txnRecord) { - const auto& id = txnRecord.getId(); - invariant(id.getSessionId() == _sessionId); - invariant(id.getTxnNum() == _txnNumber); - - _partialResults.insert(std::make_pair(id.getStmtId(), txnRecord.getResult())); -} - -void SessionTxnState::storePartialResult(OperationContext* opCtx, - StmtId stmtId, - SingleWriteResult result, - repl::OpTime opTime) { +void SessionTxnState::saveTxnProgress(OperationContext* opCtx, repl::OpTime opTime) { // Needs to be in the same write unit of work with the write for this result. invariant(opCtx->lockState()->inAWriteUnitOfWork()); - _partialResults.insert(std::make_pair(stmtId, std::move(result))); - // TODO: update collection, then update _writeResults & _lastWriteOpTime - // assert if cannot store -> could mean that a new txn started + // TODO: update collection, then update _lastWriteOpTime } const LogicalSessionId& SessionTxnState::getSessionId() const { return _sessionId; } -const TxnNumber& SessionTxnState::getTxnNum() const { - return _txnNumber; +TxnNumber SessionTxnState::getTxnNum() const { + return _txnRecord.value().getTxnNum(); } -const SessionTxnState::PartialResults& SessionTxnState::getPartialResults() const { - return _partialResults; +const repl::OpTime& SessionTxnState::getLastWriteOpTime() const { + return _txnRecord.value().getLastWriteOpTime(); } -void SessionTxnState::cleanUpOlderTransactions(OperationContext* opCtx) { - // TODO: Remove all entries with txnNum < _txnNum && sessionId == _sessionId +SessionTxnWriteHistoryIterator SessionTxnState::getWriteHistory(OperationContext* opCtx) const { + return SessionTxnWriteHistoryIterator(getLastWriteOpTime()); } } // namespace mongo diff --git a/src/mongo/db/session_txn_state.h b/src/mongo/db/session_txn_state.h index 297081db050..c939c97506f 100644 --- a/src/mongo/db/session_txn_state.h +++ b/src/mongo/db/session_txn_state.h @@ -28,12 +28,13 @@ #pragma once +#include <boost/optional.hpp> #include <map> #include "mongo/db/logical_session_id.h" -#include "mongo/db/ops/single_write_result_gen.h" #include "mongo/db/repl/optime.h" #include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/session_txn_write_history_iterator.h" namespace mongo { @@ -45,38 +46,27 @@ class OperationContext; class SessionTxnState { public: static const NamespaceString kConfigNS; - using PartialResults = std::map<StmtId, SingleWriteResult>; - SessionTxnState(LogicalSessionId sessionId, TxnNumber txnNumber); + explicit SessionTxnState(LogicalSessionId sessionId); /** * Load transaction state from storage if it hasn't. */ - void begin(OperationContext* opCtx); + void begin(OperationContext* opCtx, const TxnNumber& txnNumber); /** - * Returns the partial results for this transaction. + * Returns the history of writes that has happened on this transaction. */ - const PartialResults& getPartialResults() const; + SessionTxnWriteHistoryIterator getWriteHistory(OperationContext* opCtx) const; /** * Stores the result of a single write operation within this transaction. */ - void storePartialResult(OperationContext* opCtx, - StmtId stmtId, - SingleWriteResult result, - repl::OpTime opTime); - - void addResultFromStorage(const SessionTxnRecord& txnRecord); + void saveTxnProgress(OperationContext* opCtx, repl::OpTime opTime); const LogicalSessionId& getSessionId() const; - const TxnNumber& getTxnNum() const; - - /** - * Removes all previous transaction states in this logical session with transaction id smaller - * than this one. - */ - void cleanUpOlderTransactions(OperationContext* opCtx); + TxnNumber getTxnNum() const; + const repl::OpTime& getLastWriteOpTime() const; /** * Returns a SessionTxnState stored in the operation context. @@ -90,11 +80,7 @@ public: private: const LogicalSessionId _sessionId; - const TxnNumber _txnNumber; - - bool _isInitialized = false; - PartialResults _partialResults; - repl::OpTime _lastWriteOpTime; + boost::optional<SessionTxnRecord> _txnRecord; }; } // namespace mongo diff --git a/src/mongo/db/session_txn_state_holder.cpp b/src/mongo/db/session_txn_state_holder.cpp index fac69027556..8027c2afbf1 100644 --- a/src/mongo/db/session_txn_state_holder.cpp +++ b/src/mongo/db/session_txn_state_holder.cpp @@ -39,22 +39,13 @@ namespace mongo { SessionTxnStateHolder::SessionTxnStateHolder(std::unique_ptr<SessionTxnState> txnState) : _sessionId(txnState->getSessionId()), _txnState(std::move(txnState)) {} -TxnStateAccessToken SessionTxnStateHolder::getTransactionState(OperationContext* opCtx, - TxnNumber txnNum) { +TxnStateAccessToken SessionTxnStateHolder::getTransactionState(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lk(_mutex); while (!_txnState) { opCtx->waitForConditionOrInterrupt(_txnStateAvailableCV, lk); } - if (txnNum < _txnState->getTxnNum()) { - // uassert - } - - if (txnNum > _txnState->getTxnNum()) { - _txnState = stdx::make_unique<SessionTxnState>(_sessionId, txnNum); - } - return TxnStateAccessToken(opCtx, this, std::move(_txnState)); } diff --git a/src/mongo/db/session_txn_state_holder.h b/src/mongo/db/session_txn_state_holder.h index 9b78a7409e7..4b1c784a0dc 100644 --- a/src/mongo/db/session_txn_state_holder.h +++ b/src/mongo/db/session_txn_state_holder.h @@ -54,7 +54,7 @@ public: * Returns the active transaction to the caller. Blocks if the transaction state is currently * in use and waits for it to be available. */ - TxnStateAccessToken getTransactionState(OperationContext* opCtx, TxnNumber txnNum); + TxnStateAccessToken getTransactionState(OperationContext* opCtx); /** * Returns the activeTxn back to this SessionTransactionState. diff --git a/src/mongo/db/session_txn_write_history_iterator.cpp b/src/mongo/db/session_txn_write_history_iterator.cpp new file mode 100644 index 00000000000..55564550d28 --- /dev/null +++ b/src/mongo/db/session_txn_write_history_iterator.cpp @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/operation_context.h" +#include "mongo/db/session_txn_write_history_iterator.h" + +namespace mongo { + +SessionTxnWriteHistoryIterator::SessionTxnWriteHistoryIterator(repl::OpTime startingOpTime) + : _nextOpTime(std::move(startingOpTime)) {} + +bool SessionTxnWriteHistoryIterator::hasNext() const { + return !_nextOpTime.isNull(); +} + +repl::OplogEntry next(OperationContext* opCtx) { + // TODO: use DBDirectClient to fetch the oplog entry. + // assert if !hasNext(). + return repl::OplogEntry(BSONObj()); +} + +} // namespace mongo diff --git a/src/mongo/db/session_txn_write_history_iterator.h b/src/mongo/db/session_txn_write_history_iterator.h new file mode 100644 index 00000000000..265e41c0336 --- /dev/null +++ b/src/mongo/db/session_txn_write_history_iterator.h @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/optime.h" + +namespace mongo { + +class OperationContext; + +/** + * An iterator class that can traverse through the oplog entries that are linked via the prevTs + * field. + */ +class SessionTxnWriteHistoryIterator { +public: + /** + * Creates a new iterator starting with an oplog entry with the given start opTime. + */ + SessionTxnWriteHistoryIterator(repl::OpTime startingOpTime); + + /** + * Returns false if there are no more entries to iterate. + */ + bool hasNext() const; + + /** + * Returns the next oplog entry. + * Throws if hasNext is false. + */ + repl::OplogEntry next(OperationContext* opCtx); + +private: + repl::OpTime _nextOpTime; +}; + +} // namespace mongo |