summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPavi Vetriselvan <pvselvan@umich.edu>2018-04-02 15:01:57 -0400
committerPavi Vetriselvan <pvselvan@umich.edu>2018-04-02 15:05:10 -0400
commit94d23eaeacae39e59140b338ba7369a0f3572fc9 (patch)
treec96c98f0fd3893a7dba0fa08957b97a994b29a23 /src
parent1c2257457c1c566e84ea6ae6b0e42ac95336a709 (diff)
downloadmongo-94d23eaeacae39e59140b338ba7369a0f3572fc9.tar.gz
SERVER-33218 implement txn shell helpers for reads and writes
Diffstat (limited to 'src')
-rw-r--r--src/mongo/shell/bulk_api.js3
-rw-r--r--src/mongo/shell/session.js180
2 files changed, 178 insertions, 5 deletions
diff --git a/src/mongo/shell/bulk_api.js b/src/mongo/shell/bulk_api.js
index a872a1e1ca3..0e63b97d029 100644
--- a/src/mongo/shell/bulk_api.js
+++ b/src/mongo/shell/bulk_api.js
@@ -877,7 +877,8 @@ var _bulk_api_module = (function() {
const session = collection.getDB().getSession();
if (serverSupportsRetryableWrites && session.getOptions().shouldRetryWrites() &&
- session._serverSession.canRetryWrites(cmd)) {
+ session._serverSession.canRetryWrites(cmd) &&
+ !session._serverSession.isInActiveTransaction()) {
cmd = session._serverSession.assignTransactionNumber(cmd);
}
}
diff --git a/src/mongo/shell/session.js b/src/mongo/shell/session.js
index 31c6431a26e..0a2701742c8 100644
--- a/src/mongo/shell/session.js
+++ b/src/mongo/shell/session.js
@@ -1,5 +1,8 @@
/**
* Implements the sessions api for the shell.
+ *
+ * Roughly follows the driver sessions spec:
+ * https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#abstract
*/
var {
DriverSession, SessionOptions, _DummyDriverSession, _DelegatingDriverSession,
@@ -259,6 +262,7 @@ var {
}
}
+ // Retryable writes code should execute only we are not in an active transaction.
if (jsTest.options().alwaysInjectTransactionNumber &&
serverSupports(kWireVersionSupportingRetryableWrites) &&
driverSession.getOptions().shouldRetryWrites() &&
@@ -266,6 +270,12 @@ var {
cmdObj = driverSession._serverSession.assignTransactionNumber(cmdObj);
}
+ // If startTransaction was called on the session, attach txn number and readConcern.
+ // TODO: SERVER-34170 guard this code with a wire version check.
+ if (driverSession._serverSession.isInActiveTransaction()) {
+ cmdObj = driverSession._serverSession.assignTxnInfo(cmdObj);
+ }
+
return cmdObj;
}
@@ -337,7 +347,8 @@ var {
const sessionOptions = driverSession.getOptions();
let numRetries =
(sessionOptions.shouldRetryWrites() && cmdObj.hasOwnProperty("txnNumber") &&
- !jsTest.options().skipRetryOnNetworkError)
+ !jsTest.options().skipRetryOnNetworkError &&
+ !driverSession._serverSession.isInActiveTransaction())
? 1
: 0;
@@ -432,17 +443,64 @@ var {
};
}
+ function TransactionOptions(rawOptions = {}) {
+ if (!(this instanceof TransactionOptions)) {
+ return new TransactionOptions(rawOptions);
+ }
+
+ let _readConcern = rawOptions.readConcern;
+ let _writeConcern = rawOptions.writeConcern;
+
+ this.setTxnReadConcern = function setTxnReadConcern(value) {
+ _readConcern = value;
+ };
+
+ this.getTxnReadConcern = function getTxnReadConcern() {
+ return _readConcern;
+ };
+
+ this.setTxnWriteConcern = function setTxnWriteConcern(value) {
+ _writeConcern = value;
+ };
+
+ this.getTxnWriteConcern = function getTxnWriteConcern() {
+ return _writeConcern;
+ };
+ }
+
+ // The server session maintains the state of a transaction, a monotonically increasing txn
+ // number, and a transaction's read/write concerns.
function ServerSession(client) {
+ // The default txnState is `inactive` until we call startTransaction.
+ let _txnState = ServerSession.TransactionStates.kInactive;
+
+ let _txnOptions;
+
+ // Keep track of the first statement of a transaction, which is the only statement
+ // in which a user can specify a readConcern and writeConcern.
+ // This will eventually turn into a stmtId field.
+ let _firstStatement = false;
+
+ // _txnNumber starts at -1 because when we increment it, the first transaction
+ // and retryable write will both have a txnNumber of 0.
+ let _txnNumber = -1;
let _lastUsed = new Date();
- let _nextTxnNum = 0;
this.client = new SessionAwareClient(client);
this.handle = client._startSession();
+ this.isInActiveTransaction = function isInActiveTransaction() {
+ return _txnState === ServerSession.TransactionStates.kActive;
+ };
+
this.getLastUsed = function getLastUsed() {
return _lastUsed;
};
+ this.getTxnOptions = function getTxnOptions() {
+ return _txnOptions;
+ };
+
function updateLastUsed() {
_lastUsed = new Date();
}
@@ -488,8 +546,8 @@ var {
// Since there's no native support for adding NumberLong instances and getting back
// another NumberLong instance, converting from a 64-bit floating-point value to a
// 64-bit integer value will overflow at 2**53.
- cmdObjUnwrapped.txnNumber = new NumberLong(_nextTxnNum);
- ++_nextTxnNum;
+ _txnNumber++;
+ cmdObjUnwrapped.txnNumber = new NumberLong(_txnNumber);
}
return cmdObj;
@@ -575,8 +633,83 @@ var {
return false;
};
+
+ this.assignTxnInfo = function assignTxnInfo(cmdObj) {
+ cmdObj = Object.assign({}, cmdObj);
+
+ const cmdName = Object.keys(cmdObj)[0];
+
+ // If the command is in a wrapped form, then we look for the actual command object
+ // inside the query/$query object.
+ let cmdObjUnwrapped = cmdObj;
+ if (cmdName === "query" || cmdName === "$query") {
+ cmdObj[cmdName] = Object.assign({}, cmdObj[cmdName]);
+ cmdObjUnwrapped = cmdObj[cmdName];
+ }
+
+ if (!cmdObjUnwrapped.hasOwnProperty("txnNumber")) {
+ // Since there's no native support for adding NumberLong instances and getting back
+ // another NumberLong instance, converting from a 64-bit floating-point value to a
+ // 64-bit integer value will overflow at 2**53.
+ cmdObjUnwrapped.txnNumber = new NumberLong(_txnNumber);
+ }
+
+ // readConcern and autocommit can only be specified on the first statement in a
+ // transaction.
+ if (_firstStatement) {
+ // TODO: As a part of SERVER-34052, we might also need to specify
+ // `cmdObjUnwrapped.startTransaction = 1` on the first statement of a multi
+ // statement txn.
+ cmdObjUnwrapped.autocommit = false;
+ if (_txnOptions.getTxnReadConcern() !== undefined) {
+ cmdObjUnwrapped.readConcern = _txnOptions.getTxnReadConcern();
+ }
+ if (_txnOptions.getTxnWriteConcern() !== undefined) {
+ cmdObjUnwrapped.writeConcern = _txnOptions.getTxnWriteConcern();
+ }
+ _firstStatement = false;
+ }
+
+ return cmdObj;
+ };
+
+ this.startTransaction = function startTransaction(txnOptsObj) {
+ if (_txnState === ServerSession.TransactionStates.kActive) {
+ throw new Error("There is already an active transaction on this session.");
+ }
+ _txnOptions = new TransactionOptions(txnOptsObj);
+ _txnState = ServerSession.TransactionStates.kActive;
+ _firstStatement = true;
+ _txnNumber++;
+ };
+
+ this.commitTransaction = function commitTransaction(driverSession) {
+ // run commitTxn command
+ return endTransaction("commitTransaction", driverSession);
+ };
+
+ this.abortTransaction = function abortTransaction(driverSession) {
+ // run abortTxn command
+ return endTransaction("abortTransaction", driverSession);
+ };
+
+ const endTransaction = (commandName, driverSession) => {
+ const cmd = {[commandName]: 1, txnNumber: NumberLong(_txnNumber)};
+ // run command against the admin database.
+ const res = this.client.runCommand(driverSession, "admin", cmd, 0);
+ _txnState = ServerSession.TransactionStates.kInactive;
+ return res;
+ };
}
+ // TransactionStates represents the state of the current transaction. The default state
+ // is `inactive` until startTransaction is called and changes the state to `active`.
+ // Calling a successful abort or commitTransaction will change the state to `inactive`.
+ ServerSession.TransactionStates = {
+ kActive: 'active',
+ kInactive: 'inactive',
+ };
+
function makeDriverSessionConstructor(implMethods, defaultOptions = {}) {
return function(client, options = defaultOptions) {
let _options = options;
@@ -675,6 +808,18 @@ var {
}
return "session " + tojson(sessionId);
};
+
+ this.startTransaction = function startTransaction(txnOptsObj = {}) {
+ this._serverSession.startTransaction(txnOptsObj);
+ };
+
+ this.commitTransaction = function commitTransaction() {
+ assert.commandWorked(this._serverSession.commitTransaction(this));
+ };
+
+ this.abortTransaction = function abortTransaction() {
+ assert.commandWorked(this._serverSession.abortTransaction(this));
+ };
};
}
@@ -743,6 +888,33 @@ var {
canRetryWrites: function canRetryWrites(cmdObj) {
return false;
},
+
+ assignTxnInfo: function assignTxnInfo(cmdObj) {
+ return cmdObj;
+ },
+
+ isInActiveTransaction: function isInActiveTransaction() {
+ return false;
+ },
+
+ getTxnOptions: function getTxnOptions() {
+ return {};
+ },
+
+ startTransaction: function startTransaction() {
+ throw new Error("Must call startSession() on the Mongo connection " +
+ "object before starting a transaction.");
+ },
+
+ commitTransaction: function commitTransaction() {
+ throw new Error("Must call startSession() on the Mongo connection " +
+ "object before committing a transaction.");
+ },
+
+ abortTransaction: function abortTransaction() {
+ throw new Error("Must call startSession() on the Mongo connection " +
+ "object before aborting a transaction.");
+ },
};
},