From 45d4ddb4aaaf5b0bbe36442659c76be494a92af0 Mon Sep 17 00:00:00 2001 From: Max Hirschhorn Date: Wed, 30 Aug 2017 11:15:13 -0400 Subject: SERVER-30686 Add support for retryWrites=true in the mongo shell. --- .../suites/retryable_writes_jscore_passthrough.yml | 41 ++++++ jstests/noPassthrough/shell_can_retry_writes.js | 152 +++++++++++++++++++ src/mongo/shell/bulk_api.js | 13 ++ src/mongo/shell/collection.js | 13 ++ src/mongo/shell/session.js | 163 ++++++++++++++++++++- 5 files changed, 378 insertions(+), 4 deletions(-) create mode 100644 buildscripts/resmokeconfig/suites/retryable_writes_jscore_passthrough.yml create mode 100644 jstests/noPassthrough/shell_can_retry_writes.js diff --git a/buildscripts/resmokeconfig/suites/retryable_writes_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/retryable_writes_jscore_passthrough.yml new file mode 100644 index 00000000000..4637156aae2 --- /dev/null +++ b/buildscripts/resmokeconfig/suites/retryable_writes_jscore_passthrough.yml @@ -0,0 +1,41 @@ +test_kind: js_test + +selector: + roots: + - jstests/core/**/*.js + exclude_files: + # These tests are not expected to pass with replica-sets: + - jstests/core/capped_update.js + - jstests/core/dbadmin.js + - jstests/core/opcounters_write_cmd.js + - jstests/core/read_after_optime.js + +executor: + config: + shell_options: + eval: >- + testingReplication = true; + load("jstests/libs/override_methods/enable_sessions.js"); + global_vars: + TestData: + sessionOptions: + retryWrites: true + readMode: commands + hooks: + # The CheckReplDBHash hook waits until all operations have replicated to and have been applied + # on the secondaries, so we run the ValidateCollections hook after it to ensure we're + # validating the entire contents of the collection. + - class: CheckReplOplogs + - class: CheckReplDBHash + - class: ValidateCollections + - class: CleanEveryN + n: 20 + fixture: + class: ReplicaSetFixture + mongod_options: + bind_ip_all: '' + oplogSize: 511 + set_parameters: + enableTestCommands: 1 + numInitialSyncAttempts: 1 + num_nodes: 2 diff --git a/jstests/noPassthrough/shell_can_retry_writes.js b/jstests/noPassthrough/shell_can_retry_writes.js new file mode 100644 index 00000000000..362e6a955c2 --- /dev/null +++ b/jstests/noPassthrough/shell_can_retry_writes.js @@ -0,0 +1,152 @@ +/** + * Tests that write operations executed through the mongo shell's CRUD API are assigned a + * transaction number so that they can be retried. + */ +(function() { + "use strict"; + + const rst = new ReplSetTest({nodes: 1}); + rst.startSet(); + rst.initiate(); + + const primary = rst.getPrimary(); + const db = primary.startSession({retryWrites: true}).getDatabase("test"); + const coll = db.shell_can_retry_writes; + + function testCommandCanBeRetried(func, expected = true) { + const mongoRunCommandOriginal = Mongo.prototype.runCommand; + + const sentinel = {}; + let cmdObjSeen = sentinel; + + Mongo.prototype.runCommand = function runComandSpy(dbName, cmdObj, options) { + cmdObjSeen = cmdObj; + return mongoRunCommandOriginal.apply(this, arguments); + }; + + try { + assert.doesNotThrow(func); + } finally { + Mongo.prototype.runCommand = mongoRunCommandOriginal; + } + + if (cmdObjSeen === sentinel) { + throw new Error("Mongo.prototype.runCommand() was never called: " + func.toString()); + } + + let cmdName = Object.keys(cmdObjSeen)[0]; + + // If the command is in a wrapped form, then we look for the actual command object inside + // the query/$query object. + if (cmdName === "query" || cmdName === "$query") { + cmdObjSeen = cmdObjSeen[cmdName]; + cmdName = Object.keys(cmdObjSeen)[0]; + } + + assert(cmdObjSeen.hasOwnProperty("lsid"), + "Expected operation " + tojson(cmdObjSeen) + " to have a logical session id: " + + func.toString()); + + if (expected) { + assert(cmdObjSeen.hasOwnProperty("txnNumber"), + "Expected operation " + tojson(cmdObjSeen) + + " to be assigned a transaction number since it can be retried: " + + func.toString()); + } else { + assert(!cmdObjSeen.hasOwnProperty("txnNumber"), + "Expected operation " + tojson(cmdObjSeen) + + " to not be assigned a transaction number since it cannot be retried: " + + func.toString()); + } + } + + testCommandCanBeRetried(function() { + coll.insertOne({_id: 0}); + }); + + testCommandCanBeRetried(function() { + coll.updateOne({_id: 0}, {$set: {a: 1}}); + }); + + testCommandCanBeRetried(function() { + coll.updateOne({_id: 1}, {$set: {a: 2}}, {upsert: true}); + }); + + testCommandCanBeRetried(function() { + coll.deleteOne({_id: 1}); + }); + + testCommandCanBeRetried(function() { + coll.insertMany([{_id: 2, b: 3}, {_id: 3, b: 4}], {ordered: true}); + }); + + testCommandCanBeRetried(function() { + coll.insertMany([{_id: 4}, {_id: 5}], {ordered: false}); + }, false); + + testCommandCanBeRetried(function() { + coll.updateMany({a: {$gt: 0}}, {$set: {c: 7}}); + }, false); + + testCommandCanBeRetried(function() { + coll.deleteMany({b: {$lt: 5}}); + }, false); + + // + // Tests for bulkWrite(). + // + + testCommandCanBeRetried(function() { + coll.bulkWrite([{insertOne: {document: {_id: 10}}}]); + }); + + testCommandCanBeRetried(function() { + coll.bulkWrite([{updateOne: {filter: {_id: 10}, update: {$set: {a: 1}}}}]); + }); + + testCommandCanBeRetried(function() { + coll.bulkWrite([{updateOne: {filter: {_id: 10}, update: {$set: {a: 2}}, upsert: true}}]); + }); + + testCommandCanBeRetried(function() { + coll.bulkWrite([{deleteOne: {filter: {_id: 10}}}]); + }); + + testCommandCanBeRetried(function() { + coll.bulkWrite( + [{insertOne: {document: {_id: 20, b: 3}}}, {insertOne: {document: {_id: 30, b: 4}}}], + {ordered: true}); + }); + + testCommandCanBeRetried(function() { + coll.bulkWrite([{insertOne: {document: {_id: 40}}}, {insertOne: {document: {_id: 50}}}], + {ordered: false}); + }, false); + + testCommandCanBeRetried(function() { + coll.bulkWrite([{updateMany: {filter: {a: {$gt: 0}}, update: {$set: {c: 7}}}}]); + }, false); + + testCommandCanBeRetried(function() { + coll.bulkWrite([{deleteMany: {filter: {b: {$lt: 5}}}}]); + }, false); + + // + // Tests for wrappers around "findAndModify" command. + // + + testCommandCanBeRetried(function() { + coll.findOneAndUpdate({_id: 100}, {$set: {d: 9}}, {upsert: true}); + }); + + testCommandCanBeRetried(function() { + coll.findOneAndReplace({_id: 100}, {e: 11}); + }); + + testCommandCanBeRetried(function() { + coll.findOneAndDelete({e: {$exists: true}}); + }); + + db.getSession().endSession(); + rst.stopSet(); +})(); diff --git a/src/mongo/shell/bulk_api.js b/src/mongo/shell/bulk_api.js index e741411516f..ef284e1500d 100644 --- a/src/mongo/shell/bulk_api.js +++ b/src/mongo/shell/bulk_api.js @@ -892,6 +892,19 @@ var _bulk_api_module = (function() { cmd.writeConcern = writeConcern; } + { + const kWireVersionSupportingRetryableWrites = 6; + const serverSupportsRetryableWrites = + coll.getMongo().getMinWireVersion() <= kWireVersionSupportingRetryableWrites && + kWireVersionSupportingRetryableWrites <= coll.getMongo().getMaxWireVersion(); + + const session = collection.getDB().getSession(); + if (serverSupportsRetryableWrites && session.getOptions().shouldRetryWrites() && + session._serverSession.canRetryWrites(cmd)) { + cmd = session._serverSession.assignTransactionNumber(cmd); + } + } + return cmd; }; diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js index f888ed67c4e..48f39a06c1d 100644 --- a/src/mongo/shell/collection.js +++ b/src/mongo/shell/collection.js @@ -776,6 +776,19 @@ DBCollection.prototype.findAndModify = function(args) { cmd[key] = args[key]; } + { + const kWireVersionSupportingRetryableWrites = 6; + const serverSupportsRetryableWrites = + this.getMongo().getMinWireVersion() <= kWireVersionSupportingRetryableWrites && + kWireVersionSupportingRetryableWrites <= this.getMongo().getMaxWireVersion(); + + const session = this.getDB().getSession(); + if (serverSupportsRetryableWrites && session.getOptions().shouldRetryWrites() && + session._serverSession.canRetryWrites(cmd)) { + cmd = session._serverSession.assignTransactionNumber(cmd); + } + } + var ret = this._db.runCommand(cmd); if (!ret.ok) { if (ret.errmsg == "No matching object found") { diff --git a/src/mongo/shell/session.js b/src/mongo/shell/session.js index 31cbdfba57b..90460663284 100644 --- a/src/mongo/shell/session.js +++ b/src/mongo/shell/session.js @@ -10,6 +10,7 @@ var { let _readPreference = rawOptions.readPreference; let _readConcern = rawOptions.readConcern; let _writeConcern = rawOptions.writeConcern; + let _retryWrites = rawOptions.retryWrites; this.getReadPreference = function getReadPreference() { return _readPreference; @@ -37,6 +38,14 @@ var { } _writeConcern = writeConcern; }; + + this.shouldRetryWrites = function shouldRetryWrites() { + return _retryWrites; + }; + + this.setRetryWrites = function setRetryWrites(retryWrites = true) { + _retryWrites = retryWrites; + }; } function SessionAwareClient(client) { @@ -93,12 +102,41 @@ var { } } + function runClientFunctionWithRetries( + driverSession, cmdObj, clientFunction, clientFunctionArguments) { + let cmdName = Object.keys(cmdObj)[0]; + + // If the command is in a wrapped form, then we look for the actual command object + // inside the query/$query object. + if (cmdName === "query" || cmdName === "$query") { + cmdObj = cmdObj[cmdName]; + cmdName = Object.keys(cmdObj)[0]; + } + + const numRetries = cmdObj.hasOwnProperty("txnNumber") ? 1 : 0; + + do { + try { + return clientFunction.apply(client, clientFunctionArguments); + } catch (e) { + // TODO: Should we run an explicit "isMaster" command in order to compare the + // wire version of the server after we reconnect to it? + if (!isNetworkError(e) || numRetries === 0) { + throw e; + } + } + + --numRetries; + } while (numRetries >= 0); + } + this.runCommand = function runCommand(driverSession, dbName, cmdObj, options) { cmdObj = prepareCommandRequest(driverSession, cmdObj); - const res = client.runCommand(dbName, cmdObj, options); - processCommandResponse(driverSession, res); + const res = runClientFunctionWithRetries( + driverSession, cmdObj, client.runCommand, [dbName, cmdObj, options]); + processCommandResponse(driverSession, res); return res; }; @@ -106,15 +144,17 @@ var { driverSession, dbName, metadata, cmdObj) { cmdObj = prepareCommandRequest(driverSession, cmdObj); - const res = client.runCommandWithMetadata(dbName, metadata, cmdObj); - processCommandResponse(driverSession, res); + const res = runClientFunctionWithRetries( + driverSession, cmdObj, client.runCommandWithMetadata, [dbName, metadata, cmdObj]); + processCommandResponse(driverSession, res); return res; }; } function ServerSession(client) { let _lastUsed = new Date(); + let _nextTxnNum = 0; this.client = new SessionAwareClient(client); this.handle = client._startSession(); @@ -150,6 +190,113 @@ var { return cmdObj; }; + + this.assignTransactionNumber = function assignTransactionNumber(cmdObj) { + cmdObj = Object.assign({}, cmdObj); + + const cmdName = Object.keys(cmdObj)[0]; + + // If the command is in a wrapped form, then we look for the actual command object + // inside the query/$query object. + let cmdObjUnwrapped = cmdObj; + if (cmdName === "query" || cmdName === "$query") { + cmdObj[cmdName] = Object.assign({}, cmdObj[cmdName]); + cmdObjUnwrapped = cmdObj[cmdName]; + } + + if (!cmdObjUnwrapped.hasOwnProperty("txnNumber")) { + // Since there's no native support for adding NumberLong instances and getting back + // another NumberLong instance, converting from a 64-bit floating-point value to a + // 64-bit integer value will overflow at 2**53. + cmdObjUnwrapped.txnNumber = new NumberLong(_nextTxnNum); + ++_nextTxnNum; + } + + return cmdObj; + }; + + this.canRetryWrites = function canRetryWrites(cmdObj) { + let cmdName = Object.keys(cmdObj)[0]; + + // If the command is in a wrapped form, then we look for the actual command name inside + // the query/$query object. + if (cmdName === "query" || cmdName === "$query") { + cmdObj = cmdObj[cmdName]; + cmdName = Object.keys(cmdObj)[0]; + } + + if (cmdName === "insert") { + if (!Array.isArray(cmdObj.documents)) { + // The command object is malformed, so we'll just leave it as-is and let the + // server reject it. + return false; + } + + if (cmdObj.documents.length === 1) { + // Single-statement operations (e.g. insertOne()) can be retried. + return true; + } + + // Multi-statement operations (e.g. insertMany()) can be retried if they are + // executed in order by the server. + return cmdObj.ordered ? true : false; + } else if (cmdName === "update") { + if (!Array.isArray(cmdObj.updates)) { + // The command object is malformed, so we'll just leave it as-is and let the + // server reject it. + return false; + } + + const hasMultiUpdate = cmdObj.updates.some(updateOp => updateOp.multi); + if (hasMultiUpdate) { + // Operations that modify multiple documents (e.g. updateMany()) cannot be + // retried. + return false; + } + + if (cmdObj.updates.length === 1) { + // Single-statement operations that modify a single document (e.g. updateOne()) + // can be retried. + return true; + } + + // Multi-statement operations that each modify a single document (e.g. bulkWrite()) + // can be retried if they are executed in order by the server. + return cmdObj.ordered ? true : false; + } else if (cmdName === "delete") { + if (!Array.isArray(cmdObj.deletes)) { + // The command object is malformed, so we'll just leave it as-is and let the + // server reject it. + return false; + } + + // We use bsonWoCompare() in order to handle cases where the limit is specified as a + // NumberInt() or NumberLong() instance. + const hasMultiDelete = cmdObj.deletes.some( + deleteOp => bsonWoCompare({_: deleteOp.limit}, {_: 0}) === 0); + if (hasMultiDelete) { + // Operations that modify multiple documents (e.g. deleteMany()) cannot be + // retried. + return false; + } + + if (cmdObj.deletes.length === 1) { + // Single-statement operations that modify a single document (e.g. deleteOne()) + // can be retried. + return true; + } + + // Multi-statement operations that each modify a single document (e.g. bulkWrite()) + // can be retried if they are executed in order by the server. + return cmdObj.ordered ? true : false; + } else if (cmdName === "findAndModify" || cmdName === "findandmodify") { + // Operations that modify a single document (e.g. findOneAndUpdate()) can be + // retried. + return true; + } + + return false; + }; } function makeDriverSessionConstructor(implMethods) { @@ -211,6 +358,14 @@ var { injectSessionId: function injectSessionId(cmdObj) { return cmdObj; }, + + assignTransactionNumber: function assignTransactionNumber(cmdObj) { + return cmdObj; + }, + + canRetryWrites: function canRetryWrites(cmdObj) { + return false; + }, }; }, -- cgit v1.2.1