summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-06-26 11:27:28 -0400
committerRandolph Tan <randolph@10gen.com>2017-07-03 10:53:14 -0400
commitdaf92c6fa479f51b1a1d18cacf56b0a1bbeba9fb (patch)
tree6f80c26295ec5fe763275882c72797dc395cbe13 /src
parent5a933be9235b7d02bb5c790c7ff7eba3a867c43a (diff)
downloadmongo-daf92c6fa479f51b1a1d18cacf56b0a1bbeba9fb.tar.gz
SERVER-28903 Make Transaction state store last write optime instead of write results
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/session_transaction_state_test.cpp12
-rw-r--r--src/mongo/db/session_transaction_table.cpp2
-rw-r--r--src/mongo/db/session_txn_record.idl36
-rw-r--r--src/mongo/db/session_txn_state.cpp41
-rw-r--r--src/mongo/db/session_txn_state.h34
-rw-r--r--src/mongo/db/session_txn_state_holder.cpp11
-rw-r--r--src/mongo/db/session_txn_state_holder.h2
-rw-r--r--src/mongo/db/session_txn_write_history_iterator.cpp49
-rw-r--r--src/mongo/db/session_txn_write_history_iterator.h64
10 files changed, 161 insertions, 91 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index cbb721fe868..2baa48e18d0 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1313,6 +1313,7 @@ env.Library(
'session_transaction_table.cpp',
'session_txn_state.cpp',
'session_txn_state_holder.cpp',
+ 'session_txn_write_history_iterator.cpp',
env.Idlc('session_txn_record.idl')[0],
],
LIBDEPS=[
diff --git a/src/mongo/db/session_transaction_state_test.cpp b/src/mongo/db/session_transaction_state_test.cpp
index 54e4ab2a7b5..7a8d4d2a5f1 100644
--- a/src/mongo/db/session_transaction_state_test.cpp
+++ b/src/mongo/db/session_transaction_state_test.cpp
@@ -49,21 +49,19 @@ TEST(SessionTxnStateHolder, Demo) {
{
// Caller has now control of txn state and can read/write from it.
- auto txnStateToken = txnStateHolder->getTransactionState(&opCtx, 1);
+ auto txnStateToken = txnStateHolder->getTransactionState(&opCtx);
- auto partialResults = txnStateToken.get()->getPartialResults();
+ auto writeHistory = txnStateToken.get()->getWriteHistory(&opCtx);
- // Go over request object, and mark all statements that has results in partialResults as
- // done.
+ // Go over request object, and mark all statements that is in writeHistory as done.
+ // In addition, convert oplog entries into results appropriate for command.
// For every statement that is not yet 'done':
// Perform write op, and then store result:
// Commented out since OperationContextNoop uses LockerNoop
- // SingleWriteResult result;
// repl::OpTime opTime;
- // StmtId stmtId = 0;
- // txnStateToken.get()->storePartialResult(&opCtx, stmtId, result, opTime);
+ // txnStateToken.get()->saveTxnProgress(&opCtx, opTime);
// Consolidate partial results into final results for command response
}
diff --git a/src/mongo/db/session_transaction_table.cpp b/src/mongo/db/session_transaction_table.cpp
index 9cde9802287..02f69439e5c 100644
--- a/src/mongo/db/session_transaction_table.cpp
+++ b/src/mongo/db/session_transaction_table.cpp
@@ -110,7 +110,7 @@ std::shared_ptr<SessionTxnStateHolder> SessionTransactionTable::getSessionTxnSta
// TODO: consult sessions table (without network I/O). The fact that we reached this point
// means that the session was previously active.
- auto txnState = stdx::make_unique<SessionTxnState>(sessionId, kUninitializedTxnNumber);
+ auto txnState = stdx::make_unique<SessionTxnState>(sessionId);
auto newEntry = std::make_shared<SessionTxnStateHolder>(std::move(txnState));
_txnTable.insert(std::make_pair(sessionId, newEntry));
return newEntry;
diff --git a/src/mongo/db/session_txn_record.idl b/src/mongo/db/session_txn_record.idl
index e942f6c1167..cf9d2d30c16 100644
--- a/src/mongo/db/session_txn_record.idl
+++ b/src/mongo/db/session_txn_record.idl
@@ -30,11 +30,11 @@ global:
cpp_namespace: "mongo"
cpp_includes:
- "mongo/db/logical_session_id.h"
+ - "mongo/db/repl/optime.h"
imports:
- "mongo/idl/basic_types.idl"
- "mongo/db/logical_session_id.idl"
- - "mongo/db/ops/single_write_result.idl"
types:
LogicalSessionId:
@@ -44,31 +44,25 @@ types:
deserializer: "mongo::LogicalSessionId::parse"
serializer: "mongo::LogicalSessionId::toBSON"
+ OpTime:
+ description: "repl::OpTime"
+ bson_serialization_type: object
+ cpp_type: "mongo::repl::OpTime"
+ deserializer: "mongo::repl::OpTime::parse"
+ serializer: "mongo::repl::OpTime::toBSON"
+
structs:
- txnStmtTupleId:
- description: "Unique tuple that identifies a single statment within a logical transaction."
+ sessionTxnRecord:
+ description: "A document used for storing session transaction states."
fields:
- sessionId:
+ _id:
+ cpp_name: sessionId
type: LogicalSessionId
description: "The id of the session this transaction belongs to."
txnNum:
type: TxnNumber
description: "The id representing this transaction."
- stmtId:
- type: StmtId
- description: "The id of the last known statement that was executed successfully
- with side effects in this transaction."
-
- sessionTxnRecord:
- description: "A document used for storing session transaction states."
- fields:
- _id:
- cpp_name: "Id"
- type: txnStmtTupleId
- result:
- type: singleWriteResult
- description: "This is the structure for storing the partial results of this
- session transaction."
-
- # TODO: add opTime
+ lastWriteOpTime:
+ type: OpTime
+ description: "The optime of the last write on this transaction."
diff --git a/src/mongo/db/session_txn_state.cpp b/src/mongo/db/session_txn_state.cpp
index 91087795130..5d92167b586 100644
--- a/src/mongo/db/session_txn_state.cpp
+++ b/src/mongo/db/session_txn_state.cpp
@@ -40,7 +40,7 @@ auto getSessionTxnState = OperationContext::declareDecoration<SessionTxnState*>(
} // unnamed namespace
-const NamespaceString SessionTxnState::kConfigNS("admin.system.transactions");
+const NamespaceString SessionTxnState::kConfigNS("config.system.transactions");
SessionTxnState* SessionTxnState::get(OperationContext* opCtx) {
return getSessionTxnState(opCtx);
@@ -51,51 +51,38 @@ void SessionTxnState::set(OperationContext* opCtx, SessionTxnState* txnState) {
sessionTxnState = txnState;
}
-SessionTxnState::SessionTxnState(LogicalSessionId sessionId, TxnNumber txnNumber)
- : _sessionId(std::move(sessionId)), _txnNumber(std::move(txnNumber)) {}
+SessionTxnState::SessionTxnState(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
-void SessionTxnState::begin(OperationContext* opCtx) {
- if (!_isInitialized) {
+void SessionTxnState::begin(OperationContext* opCtx, const TxnNumber& txnNumber) {
+ if (!_txnRecord) {
// load txn table state from storage
}
- _isInitialized = true;
+ // TODO: assert if txnNumber < myTxnNumber
+ // TODO: if txnNumber > myTxnNumber, update record in storage then update _txnRecord.
}
-void SessionTxnState::addResultFromStorage(const SessionTxnRecord& txnRecord) {
- const auto& id = txnRecord.getId();
- invariant(id.getSessionId() == _sessionId);
- invariant(id.getTxnNum() == _txnNumber);
-
- _partialResults.insert(std::make_pair(id.getStmtId(), txnRecord.getResult()));
-}
-
-void SessionTxnState::storePartialResult(OperationContext* opCtx,
- StmtId stmtId,
- SingleWriteResult result,
- repl::OpTime opTime) {
+void SessionTxnState::saveTxnProgress(OperationContext* opCtx, repl::OpTime opTime) {
// Needs to be in the same write unit of work with the write for this result.
invariant(opCtx->lockState()->inAWriteUnitOfWork());
- _partialResults.insert(std::make_pair(stmtId, std::move(result)));
- // TODO: update collection, then update _writeResults & _lastWriteOpTime
- // assert if cannot store -> could mean that a new txn started
+ // TODO: update collection, then update _lastWriteOpTime
}
const LogicalSessionId& SessionTxnState::getSessionId() const {
return _sessionId;
}
-const TxnNumber& SessionTxnState::getTxnNum() const {
- return _txnNumber;
+TxnNumber SessionTxnState::getTxnNum() const {
+ return _txnRecord.value().getTxnNum();
}
-const SessionTxnState::PartialResults& SessionTxnState::getPartialResults() const {
- return _partialResults;
+const repl::OpTime& SessionTxnState::getLastWriteOpTime() const {
+ return _txnRecord.value().getLastWriteOpTime();
}
-void SessionTxnState::cleanUpOlderTransactions(OperationContext* opCtx) {
- // TODO: Remove all entries with txnNum < _txnNum && sessionId == _sessionId
+SessionTxnWriteHistoryIterator SessionTxnState::getWriteHistory(OperationContext* opCtx) const {
+ return SessionTxnWriteHistoryIterator(getLastWriteOpTime());
}
} // namespace mongo
diff --git a/src/mongo/db/session_txn_state.h b/src/mongo/db/session_txn_state.h
index 297081db050..c939c97506f 100644
--- a/src/mongo/db/session_txn_state.h
+++ b/src/mongo/db/session_txn_state.h
@@ -28,12 +28,13 @@
#pragma once
+#include <boost/optional.hpp>
#include <map>
#include "mongo/db/logical_session_id.h"
-#include "mongo/db/ops/single_write_result_gen.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/session_txn_write_history_iterator.h"
namespace mongo {
@@ -45,38 +46,27 @@ class OperationContext;
class SessionTxnState {
public:
static const NamespaceString kConfigNS;
- using PartialResults = std::map<StmtId, SingleWriteResult>;
- SessionTxnState(LogicalSessionId sessionId, TxnNumber txnNumber);
+ explicit SessionTxnState(LogicalSessionId sessionId);
/**
* Load transaction state from storage if it hasn't.
*/
- void begin(OperationContext* opCtx);
+ void begin(OperationContext* opCtx, const TxnNumber& txnNumber);
/**
- * Returns the partial results for this transaction.
+ * Returns the history of writes that has happened on this transaction.
*/
- const PartialResults& getPartialResults() const;
+ SessionTxnWriteHistoryIterator getWriteHistory(OperationContext* opCtx) const;
/**
* Stores the result of a single write operation within this transaction.
*/
- void storePartialResult(OperationContext* opCtx,
- StmtId stmtId,
- SingleWriteResult result,
- repl::OpTime opTime);
-
- void addResultFromStorage(const SessionTxnRecord& txnRecord);
+ void saveTxnProgress(OperationContext* opCtx, repl::OpTime opTime);
const LogicalSessionId& getSessionId() const;
- const TxnNumber& getTxnNum() const;
-
- /**
- * Removes all previous transaction states in this logical session with transaction id smaller
- * than this one.
- */
- void cleanUpOlderTransactions(OperationContext* opCtx);
+ TxnNumber getTxnNum() const;
+ const repl::OpTime& getLastWriteOpTime() const;
/**
* Returns a SessionTxnState stored in the operation context.
@@ -90,11 +80,7 @@ public:
private:
const LogicalSessionId _sessionId;
- const TxnNumber _txnNumber;
-
- bool _isInitialized = false;
- PartialResults _partialResults;
- repl::OpTime _lastWriteOpTime;
+ boost::optional<SessionTxnRecord> _txnRecord;
};
} // namespace mongo
diff --git a/src/mongo/db/session_txn_state_holder.cpp b/src/mongo/db/session_txn_state_holder.cpp
index fac69027556..8027c2afbf1 100644
--- a/src/mongo/db/session_txn_state_holder.cpp
+++ b/src/mongo/db/session_txn_state_holder.cpp
@@ -39,22 +39,13 @@ namespace mongo {
SessionTxnStateHolder::SessionTxnStateHolder(std::unique_ptr<SessionTxnState> txnState)
: _sessionId(txnState->getSessionId()), _txnState(std::move(txnState)) {}
-TxnStateAccessToken SessionTxnStateHolder::getTransactionState(OperationContext* opCtx,
- TxnNumber txnNum) {
+TxnStateAccessToken SessionTxnStateHolder::getTransactionState(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
while (!_txnState) {
opCtx->waitForConditionOrInterrupt(_txnStateAvailableCV, lk);
}
- if (txnNum < _txnState->getTxnNum()) {
- // uassert
- }
-
- if (txnNum > _txnState->getTxnNum()) {
- _txnState = stdx::make_unique<SessionTxnState>(_sessionId, txnNum);
- }
-
return TxnStateAccessToken(opCtx, this, std::move(_txnState));
}
diff --git a/src/mongo/db/session_txn_state_holder.h b/src/mongo/db/session_txn_state_holder.h
index 9b78a7409e7..4b1c784a0dc 100644
--- a/src/mongo/db/session_txn_state_holder.h
+++ b/src/mongo/db/session_txn_state_holder.h
@@ -54,7 +54,7 @@ public:
* Returns the active transaction to the caller. Blocks if the transaction state is currently
* in use and waits for it to be available.
*/
- TxnStateAccessToken getTransactionState(OperationContext* opCtx, TxnNumber txnNum);
+ TxnStateAccessToken getTransactionState(OperationContext* opCtx);
/**
* Returns the activeTxn back to this SessionTransactionState.
diff --git a/src/mongo/db/session_txn_write_history_iterator.cpp b/src/mongo/db/session_txn_write_history_iterator.cpp
new file mode 100644
index 00000000000..55564550d28
--- /dev/null
+++ b/src/mongo/db/session_txn_write_history_iterator.cpp
@@ -0,0 +1,49 @@
+/**
+ * Copyright (C) 2017 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/session_txn_write_history_iterator.h"
+
+namespace mongo {
+
+SessionTxnWriteHistoryIterator::SessionTxnWriteHistoryIterator(repl::OpTime startingOpTime)
+ : _nextOpTime(std::move(startingOpTime)) {}
+
+bool SessionTxnWriteHistoryIterator::hasNext() const {
+ return !_nextOpTime.isNull();
+}
+
+repl::OplogEntry next(OperationContext* opCtx) {
+ // TODO: use DBDirectClient to fetch the oplog entry.
+ // assert if !hasNext().
+ return repl::OplogEntry(BSONObj());
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/session_txn_write_history_iterator.h b/src/mongo/db/session_txn_write_history_iterator.h
new file mode 100644
index 00000000000..265e41c0336
--- /dev/null
+++ b/src/mongo/db/session_txn_write_history_iterator.h
@@ -0,0 +1,64 @@
+/**
+ * Copyright (C) 2017 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/optime.h"
+
+namespace mongo {
+
+class OperationContext;
+
+/**
+ * An iterator class that can traverse through the oplog entries that are linked via the prevTs
+ * field.
+ */
+class SessionTxnWriteHistoryIterator {
+public:
+ /**
+ * Creates a new iterator starting with an oplog entry with the given start opTime.
+ */
+ SessionTxnWriteHistoryIterator(repl::OpTime startingOpTime);
+
+ /**
+ * Returns false if there are no more entries to iterate.
+ */
+ bool hasNext() const;
+
+ /**
+ * Returns the next oplog entry.
+ * Throws if hasNext is false.
+ */
+ repl::OplogEntry next(OperationContext* opCtx);
+
+private:
+ repl::OpTime _nextOpTime;
+};
+
+} // namespace mongo