diff options
Diffstat (limited to 'src/mongo/shell/session.js')
-rw-r--r-- | src/mongo/shell/session.js | 163 |
1 files changed, 159 insertions, 4 deletions
diff --git a/src/mongo/shell/session.js b/src/mongo/shell/session.js index 31cbdfba57b..90460663284 100644 --- a/src/mongo/shell/session.js +++ b/src/mongo/shell/session.js @@ -10,6 +10,7 @@ var { let _readPreference = rawOptions.readPreference; let _readConcern = rawOptions.readConcern; let _writeConcern = rawOptions.writeConcern; + let _retryWrites = rawOptions.retryWrites; this.getReadPreference = function getReadPreference() { return _readPreference; @@ -37,6 +38,14 @@ var { } _writeConcern = writeConcern; }; + + this.shouldRetryWrites = function shouldRetryWrites() { + return _retryWrites; + }; + + this.setRetryWrites = function setRetryWrites(retryWrites = true) { + _retryWrites = retryWrites; + }; } function SessionAwareClient(client) { @@ -93,12 +102,41 @@ var { } } + function runClientFunctionWithRetries( + driverSession, cmdObj, clientFunction, clientFunctionArguments) { + let cmdName = Object.keys(cmdObj)[0]; + + // If the command is in a wrapped form, then we look for the actual command object + // inside the query/$query object. + if (cmdName === "query" || cmdName === "$query") { + cmdObj = cmdObj[cmdName]; + cmdName = Object.keys(cmdObj)[0]; + } + + const numRetries = cmdObj.hasOwnProperty("txnNumber") ? 1 : 0; + + do { + try { + return clientFunction.apply(client, clientFunctionArguments); + } catch (e) { + // TODO: Should we run an explicit "isMaster" command in order to compare the + // wire version of the server after we reconnect to it? + if (!isNetworkError(e) || numRetries === 0) { + throw e; + } + } + + --numRetries; + } while (numRetries >= 0); + } + this.runCommand = function runCommand(driverSession, dbName, cmdObj, options) { cmdObj = prepareCommandRequest(driverSession, cmdObj); - const res = client.runCommand(dbName, cmdObj, options); - processCommandResponse(driverSession, res); + const res = runClientFunctionWithRetries( + driverSession, cmdObj, client.runCommand, [dbName, cmdObj, options]); + processCommandResponse(driverSession, res); return res; }; @@ -106,15 +144,17 @@ var { driverSession, dbName, metadata, cmdObj) { cmdObj = prepareCommandRequest(driverSession, cmdObj); - const res = client.runCommandWithMetadata(dbName, metadata, cmdObj); - processCommandResponse(driverSession, res); + const res = runClientFunctionWithRetries( + driverSession, cmdObj, client.runCommandWithMetadata, [dbName, metadata, cmdObj]); + processCommandResponse(driverSession, res); return res; }; } function ServerSession(client) { let _lastUsed = new Date(); + let _nextTxnNum = 0; this.client = new SessionAwareClient(client); this.handle = client._startSession(); @@ -150,6 +190,113 @@ var { return cmdObj; }; + + this.assignTransactionNumber = function assignTransactionNumber(cmdObj) { + cmdObj = Object.assign({}, cmdObj); + + const cmdName = Object.keys(cmdObj)[0]; + + // If the command is in a wrapped form, then we look for the actual command object + // inside the query/$query object. + let cmdObjUnwrapped = cmdObj; + if (cmdName === "query" || cmdName === "$query") { + cmdObj[cmdName] = Object.assign({}, cmdObj[cmdName]); + cmdObjUnwrapped = cmdObj[cmdName]; + } + + if (!cmdObjUnwrapped.hasOwnProperty("txnNumber")) { + // Since there's no native support for adding NumberLong instances and getting back + // another NumberLong instance, converting from a 64-bit floating-point value to a + // 64-bit integer value will overflow at 2**53. + cmdObjUnwrapped.txnNumber = new NumberLong(_nextTxnNum); + ++_nextTxnNum; + } + + return cmdObj; + }; + + this.canRetryWrites = function canRetryWrites(cmdObj) { + let cmdName = Object.keys(cmdObj)[0]; + + // If the command is in a wrapped form, then we look for the actual command name inside + // the query/$query object. + if (cmdName === "query" || cmdName === "$query") { + cmdObj = cmdObj[cmdName]; + cmdName = Object.keys(cmdObj)[0]; + } + + if (cmdName === "insert") { + if (!Array.isArray(cmdObj.documents)) { + // The command object is malformed, so we'll just leave it as-is and let the + // server reject it. + return false; + } + + if (cmdObj.documents.length === 1) { + // Single-statement operations (e.g. insertOne()) can be retried. + return true; + } + + // Multi-statement operations (e.g. insertMany()) can be retried if they are + // executed in order by the server. + return cmdObj.ordered ? true : false; + } else if (cmdName === "update") { + if (!Array.isArray(cmdObj.updates)) { + // The command object is malformed, so we'll just leave it as-is and let the + // server reject it. + return false; + } + + const hasMultiUpdate = cmdObj.updates.some(updateOp => updateOp.multi); + if (hasMultiUpdate) { + // Operations that modify multiple documents (e.g. updateMany()) cannot be + // retried. + return false; + } + + if (cmdObj.updates.length === 1) { + // Single-statement operations that modify a single document (e.g. updateOne()) + // can be retried. + return true; + } + + // Multi-statement operations that each modify a single document (e.g. bulkWrite()) + // can be retried if they are executed in order by the server. + return cmdObj.ordered ? true : false; + } else if (cmdName === "delete") { + if (!Array.isArray(cmdObj.deletes)) { + // The command object is malformed, so we'll just leave it as-is and let the + // server reject it. + return false; + } + + // We use bsonWoCompare() in order to handle cases where the limit is specified as a + // NumberInt() or NumberLong() instance. + const hasMultiDelete = cmdObj.deletes.some( + deleteOp => bsonWoCompare({_: deleteOp.limit}, {_: 0}) === 0); + if (hasMultiDelete) { + // Operations that modify multiple documents (e.g. deleteMany()) cannot be + // retried. + return false; + } + + if (cmdObj.deletes.length === 1) { + // Single-statement operations that modify a single document (e.g. deleteOne()) + // can be retried. + return true; + } + + // Multi-statement operations that each modify a single document (e.g. bulkWrite()) + // can be retried if they are executed in order by the server. + return cmdObj.ordered ? true : false; + } else if (cmdName === "findAndModify" || cmdName === "findandmodify") { + // Operations that modify a single document (e.g. findOneAndUpdate()) can be + // retried. + return true; + } + + return false; + }; } function makeDriverSessionConstructor(implMethods) { @@ -211,6 +358,14 @@ var { injectSessionId: function injectSessionId(cmdObj) { return cmdObj; }, + + assignTransactionNumber: function assignTransactionNumber(cmdObj) { + return cmdObj; + }, + + canRetryWrites: function canRetryWrites(cmdObj) { + return false; + }, }; }, |