diff options
author | Pavi Vetriselvan <pvselvan@umich.edu> | 2018-04-02 15:01:57 -0400 |
---|---|---|
committer | Pavi Vetriselvan <pvselvan@umich.edu> | 2018-04-02 15:05:10 -0400 |
commit | 94d23eaeacae39e59140b338ba7369a0f3572fc9 (patch) | |
tree | c96c98f0fd3893a7dba0fa08957b97a994b29a23 /src/mongo | |
parent | 1c2257457c1c566e84ea6ae6b0e42ac95336a709 (diff) | |
download | mongo-94d23eaeacae39e59140b338ba7369a0f3572fc9.tar.gz |
SERVER-33218 implement txn shell helpers for reads and writes
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/shell/bulk_api.js | 3 | ||||
-rw-r--r-- | src/mongo/shell/session.js | 180 |
2 files changed, 178 insertions, 5 deletions
diff --git a/src/mongo/shell/bulk_api.js b/src/mongo/shell/bulk_api.js index a872a1e1ca3..0e63b97d029 100644 --- a/src/mongo/shell/bulk_api.js +++ b/src/mongo/shell/bulk_api.js @@ -877,7 +877,8 @@ var _bulk_api_module = (function() { const session = collection.getDB().getSession(); if (serverSupportsRetryableWrites && session.getOptions().shouldRetryWrites() && - session._serverSession.canRetryWrites(cmd)) { + session._serverSession.canRetryWrites(cmd) && + !session._serverSession.isInActiveTransaction()) { cmd = session._serverSession.assignTransactionNumber(cmd); } } diff --git a/src/mongo/shell/session.js b/src/mongo/shell/session.js index 31c6431a26e..0a2701742c8 100644 --- a/src/mongo/shell/session.js +++ b/src/mongo/shell/session.js @@ -1,5 +1,8 @@ /** * Implements the sessions api for the shell. + * + * Roughly follows the driver sessions spec: + * https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst#abstract */ var { DriverSession, SessionOptions, _DummyDriverSession, _DelegatingDriverSession, @@ -259,6 +262,7 @@ var { } } + // Retryable writes code should execute only we are not in an active transaction. if (jsTest.options().alwaysInjectTransactionNumber && serverSupports(kWireVersionSupportingRetryableWrites) && driverSession.getOptions().shouldRetryWrites() && @@ -266,6 +270,12 @@ var { cmdObj = driverSession._serverSession.assignTransactionNumber(cmdObj); } + // If startTransaction was called on the session, attach txn number and readConcern. + // TODO: SERVER-34170 guard this code with a wire version check. + if (driverSession._serverSession.isInActiveTransaction()) { + cmdObj = driverSession._serverSession.assignTxnInfo(cmdObj); + } + return cmdObj; } @@ -337,7 +347,8 @@ var { const sessionOptions = driverSession.getOptions(); let numRetries = (sessionOptions.shouldRetryWrites() && cmdObj.hasOwnProperty("txnNumber") && - !jsTest.options().skipRetryOnNetworkError) + !jsTest.options().skipRetryOnNetworkError && + !driverSession._serverSession.isInActiveTransaction()) ? 1 : 0; @@ -432,17 +443,64 @@ var { }; } + function TransactionOptions(rawOptions = {}) { + if (!(this instanceof TransactionOptions)) { + return new TransactionOptions(rawOptions); + } + + let _readConcern = rawOptions.readConcern; + let _writeConcern = rawOptions.writeConcern; + + this.setTxnReadConcern = function setTxnReadConcern(value) { + _readConcern = value; + }; + + this.getTxnReadConcern = function getTxnReadConcern() { + return _readConcern; + }; + + this.setTxnWriteConcern = function setTxnWriteConcern(value) { + _writeConcern = value; + }; + + this.getTxnWriteConcern = function getTxnWriteConcern() { + return _writeConcern; + }; + } + + // The server session maintains the state of a transaction, a monotonically increasing txn + // number, and a transaction's read/write concerns. function ServerSession(client) { + // The default txnState is `inactive` until we call startTransaction. + let _txnState = ServerSession.TransactionStates.kInactive; + + let _txnOptions; + + // Keep track of the first statement of a transaction, which is the only statement + // in which a user can specify a readConcern and writeConcern. + // This will eventually turn into a stmtId field. + let _firstStatement = false; + + // _txnNumber starts at -1 because when we increment it, the first transaction + // and retryable write will both have a txnNumber of 0. + let _txnNumber = -1; let _lastUsed = new Date(); - let _nextTxnNum = 0; this.client = new SessionAwareClient(client); this.handle = client._startSession(); + this.isInActiveTransaction = function isInActiveTransaction() { + return _txnState === ServerSession.TransactionStates.kActive; + }; + this.getLastUsed = function getLastUsed() { return _lastUsed; }; + this.getTxnOptions = function getTxnOptions() { + return _txnOptions; + }; + function updateLastUsed() { _lastUsed = new Date(); } @@ -488,8 +546,8 @@ var { // Since there's no native support for adding NumberLong instances and getting back // another NumberLong instance, converting from a 64-bit floating-point value to a // 64-bit integer value will overflow at 2**53. - cmdObjUnwrapped.txnNumber = new NumberLong(_nextTxnNum); - ++_nextTxnNum; + _txnNumber++; + cmdObjUnwrapped.txnNumber = new NumberLong(_txnNumber); } return cmdObj; @@ -575,8 +633,83 @@ var { return false; }; + + this.assignTxnInfo = function assignTxnInfo(cmdObj) { + cmdObj = Object.assign({}, cmdObj); + + const cmdName = Object.keys(cmdObj)[0]; + + // If the command is in a wrapped form, then we look for the actual command object + // inside the query/$query object. + let cmdObjUnwrapped = cmdObj; + if (cmdName === "query" || cmdName === "$query") { + cmdObj[cmdName] = Object.assign({}, cmdObj[cmdName]); + cmdObjUnwrapped = cmdObj[cmdName]; + } + + if (!cmdObjUnwrapped.hasOwnProperty("txnNumber")) { + // Since there's no native support for adding NumberLong instances and getting back + // another NumberLong instance, converting from a 64-bit floating-point value to a + // 64-bit integer value will overflow at 2**53. + cmdObjUnwrapped.txnNumber = new NumberLong(_txnNumber); + } + + // readConcern and autocommit can only be specified on the first statement in a + // transaction. + if (_firstStatement) { + // TODO: As a part of SERVER-34052, we might also need to specify + // `cmdObjUnwrapped.startTransaction = 1` on the first statement of a multi + // statement txn. + cmdObjUnwrapped.autocommit = false; + if (_txnOptions.getTxnReadConcern() !== undefined) { + cmdObjUnwrapped.readConcern = _txnOptions.getTxnReadConcern(); + } + if (_txnOptions.getTxnWriteConcern() !== undefined) { + cmdObjUnwrapped.writeConcern = _txnOptions.getTxnWriteConcern(); + } + _firstStatement = false; + } + + return cmdObj; + }; + + this.startTransaction = function startTransaction(txnOptsObj) { + if (_txnState === ServerSession.TransactionStates.kActive) { + throw new Error("There is already an active transaction on this session."); + } + _txnOptions = new TransactionOptions(txnOptsObj); + _txnState = ServerSession.TransactionStates.kActive; + _firstStatement = true; + _txnNumber++; + }; + + this.commitTransaction = function commitTransaction(driverSession) { + // run commitTxn command + return endTransaction("commitTransaction", driverSession); + }; + + this.abortTransaction = function abortTransaction(driverSession) { + // run abortTxn command + return endTransaction("abortTransaction", driverSession); + }; + + const endTransaction = (commandName, driverSession) => { + const cmd = {[commandName]: 1, txnNumber: NumberLong(_txnNumber)}; + // run command against the admin database. + const res = this.client.runCommand(driverSession, "admin", cmd, 0); + _txnState = ServerSession.TransactionStates.kInactive; + return res; + }; } + // TransactionStates represents the state of the current transaction. The default state + // is `inactive` until startTransaction is called and changes the state to `active`. + // Calling a successful abort or commitTransaction will change the state to `inactive`. + ServerSession.TransactionStates = { + kActive: 'active', + kInactive: 'inactive', + }; + function makeDriverSessionConstructor(implMethods, defaultOptions = {}) { return function(client, options = defaultOptions) { let _options = options; @@ -675,6 +808,18 @@ var { } return "session " + tojson(sessionId); }; + + this.startTransaction = function startTransaction(txnOptsObj = {}) { + this._serverSession.startTransaction(txnOptsObj); + }; + + this.commitTransaction = function commitTransaction() { + assert.commandWorked(this._serverSession.commitTransaction(this)); + }; + + this.abortTransaction = function abortTransaction() { + assert.commandWorked(this._serverSession.abortTransaction(this)); + }; }; } @@ -743,6 +888,33 @@ var { canRetryWrites: function canRetryWrites(cmdObj) { return false; }, + + assignTxnInfo: function assignTxnInfo(cmdObj) { + return cmdObj; + }, + + isInActiveTransaction: function isInActiveTransaction() { + return false; + }, + + getTxnOptions: function getTxnOptions() { + return {}; + }, + + startTransaction: function startTransaction() { + throw new Error("Must call startSession() on the Mongo connection " + + "object before starting a transaction."); + }, + + commitTransaction: function commitTransaction() { + throw new Error("Must call startSession() on the Mongo connection " + + "object before committing a transaction."); + }, + + abortTransaction: function abortTransaction() { + throw new Error("Must call startSession() on the Mongo connection " + + "object before aborting a transaction."); + }, }; }, |