summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2017-08-30 11:15:13 -0400
committerMax Hirschhorn <max.hirschhorn@mongodb.com>2017-08-30 11:15:13 -0400
commit45d4ddb4aaaf5b0bbe36442659c76be494a92af0 (patch)
tree0e780ee7770318bbe09fb7eaeaefb48ae4eda476
parente9f25df1ee004319caa493fe4d4b5c34948df9f3 (diff)
downloadmongo-45d4ddb4aaaf5b0bbe36442659c76be494a92af0.tar.gz
SERVER-30686 Add support for retryWrites=true in the mongo shell.
-rw-r--r--buildscripts/resmokeconfig/suites/retryable_writes_jscore_passthrough.yml41
-rw-r--r--jstests/noPassthrough/shell_can_retry_writes.js152
-rw-r--r--src/mongo/shell/bulk_api.js13
-rw-r--r--src/mongo/shell/collection.js13
-rw-r--r--src/mongo/shell/session.js163
5 files changed, 378 insertions, 4 deletions
diff --git a/buildscripts/resmokeconfig/suites/retryable_writes_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/retryable_writes_jscore_passthrough.yml
new file mode 100644
index 00000000000..4637156aae2
--- /dev/null
+++ b/buildscripts/resmokeconfig/suites/retryable_writes_jscore_passthrough.yml
@@ -0,0 +1,41 @@
+test_kind: js_test
+
+selector:
+ roots:
+ - jstests/core/**/*.js
+ exclude_files:
+ # These tests are not expected to pass with replica-sets:
+ - jstests/core/capped_update.js
+ - jstests/core/dbadmin.js
+ - jstests/core/opcounters_write_cmd.js
+ - jstests/core/read_after_optime.js
+
+executor:
+ config:
+ shell_options:
+ eval: >-
+ testingReplication = true;
+ load("jstests/libs/override_methods/enable_sessions.js");
+ global_vars:
+ TestData:
+ sessionOptions:
+ retryWrites: true
+ readMode: commands
+ hooks:
+ # The CheckReplDBHash hook waits until all operations have replicated to and have been applied
+ # on the secondaries, so we run the ValidateCollections hook after it to ensure we're
+ # validating the entire contents of the collection.
+ - class: CheckReplOplogs
+ - class: CheckReplDBHash
+ - class: ValidateCollections
+ - class: CleanEveryN
+ n: 20
+ fixture:
+ class: ReplicaSetFixture
+ mongod_options:
+ bind_ip_all: ''
+ oplogSize: 511
+ set_parameters:
+ enableTestCommands: 1
+ numInitialSyncAttempts: 1
+ num_nodes: 2
diff --git a/jstests/noPassthrough/shell_can_retry_writes.js b/jstests/noPassthrough/shell_can_retry_writes.js
new file mode 100644
index 00000000000..362e6a955c2
--- /dev/null
+++ b/jstests/noPassthrough/shell_can_retry_writes.js
@@ -0,0 +1,152 @@
+/**
+ * Tests that write operations executed through the mongo shell's CRUD API are assigned a
+ * transaction number so that they can be retried.
+ */
+(function() {
+ "use strict";
+
+ const rst = new ReplSetTest({nodes: 1});
+ rst.startSet();
+ rst.initiate();
+
+ const primary = rst.getPrimary();
+ const db = primary.startSession({retryWrites: true}).getDatabase("test");
+ const coll = db.shell_can_retry_writes;
+
+ function testCommandCanBeRetried(func, expected = true) {
+ const mongoRunCommandOriginal = Mongo.prototype.runCommand;
+
+ const sentinel = {};
+ let cmdObjSeen = sentinel;
+
+ Mongo.prototype.runCommand = function runComandSpy(dbName, cmdObj, options) {
+ cmdObjSeen = cmdObj;
+ return mongoRunCommandOriginal.apply(this, arguments);
+ };
+
+ try {
+ assert.doesNotThrow(func);
+ } finally {
+ Mongo.prototype.runCommand = mongoRunCommandOriginal;
+ }
+
+ if (cmdObjSeen === sentinel) {
+ throw new Error("Mongo.prototype.runCommand() was never called: " + func.toString());
+ }
+
+ let cmdName = Object.keys(cmdObjSeen)[0];
+
+ // If the command is in a wrapped form, then we look for the actual command object inside
+ // the query/$query object.
+ if (cmdName === "query" || cmdName === "$query") {
+ cmdObjSeen = cmdObjSeen[cmdName];
+ cmdName = Object.keys(cmdObjSeen)[0];
+ }
+
+ assert(cmdObjSeen.hasOwnProperty("lsid"),
+ "Expected operation " + tojson(cmdObjSeen) + " to have a logical session id: " +
+ func.toString());
+
+ if (expected) {
+ assert(cmdObjSeen.hasOwnProperty("txnNumber"),
+ "Expected operation " + tojson(cmdObjSeen) +
+ " to be assigned a transaction number since it can be retried: " +
+ func.toString());
+ } else {
+ assert(!cmdObjSeen.hasOwnProperty("txnNumber"),
+ "Expected operation " + tojson(cmdObjSeen) +
+ " to not be assigned a transaction number since it cannot be retried: " +
+ func.toString());
+ }
+ }
+
+ testCommandCanBeRetried(function() {
+ coll.insertOne({_id: 0});
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.updateOne({_id: 0}, {$set: {a: 1}});
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.updateOne({_id: 1}, {$set: {a: 2}}, {upsert: true});
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.deleteOne({_id: 1});
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.insertMany([{_id: 2, b: 3}, {_id: 3, b: 4}], {ordered: true});
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.insertMany([{_id: 4}, {_id: 5}], {ordered: false});
+ }, false);
+
+ testCommandCanBeRetried(function() {
+ coll.updateMany({a: {$gt: 0}}, {$set: {c: 7}});
+ }, false);
+
+ testCommandCanBeRetried(function() {
+ coll.deleteMany({b: {$lt: 5}});
+ }, false);
+
+ //
+ // Tests for bulkWrite().
+ //
+
+ testCommandCanBeRetried(function() {
+ coll.bulkWrite([{insertOne: {document: {_id: 10}}}]);
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.bulkWrite([{updateOne: {filter: {_id: 10}, update: {$set: {a: 1}}}}]);
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.bulkWrite([{updateOne: {filter: {_id: 10}, update: {$set: {a: 2}}, upsert: true}}]);
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.bulkWrite([{deleteOne: {filter: {_id: 10}}}]);
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.bulkWrite(
+ [{insertOne: {document: {_id: 20, b: 3}}}, {insertOne: {document: {_id: 30, b: 4}}}],
+ {ordered: true});
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.bulkWrite([{insertOne: {document: {_id: 40}}}, {insertOne: {document: {_id: 50}}}],
+ {ordered: false});
+ }, false);
+
+ testCommandCanBeRetried(function() {
+ coll.bulkWrite([{updateMany: {filter: {a: {$gt: 0}}, update: {$set: {c: 7}}}}]);
+ }, false);
+
+ testCommandCanBeRetried(function() {
+ coll.bulkWrite([{deleteMany: {filter: {b: {$lt: 5}}}}]);
+ }, false);
+
+ //
+ // Tests for wrappers around "findAndModify" command.
+ //
+
+ testCommandCanBeRetried(function() {
+ coll.findOneAndUpdate({_id: 100}, {$set: {d: 9}}, {upsert: true});
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.findOneAndReplace({_id: 100}, {e: 11});
+ });
+
+ testCommandCanBeRetried(function() {
+ coll.findOneAndDelete({e: {$exists: true}});
+ });
+
+ db.getSession().endSession();
+ rst.stopSet();
+})();
diff --git a/src/mongo/shell/bulk_api.js b/src/mongo/shell/bulk_api.js
index e741411516f..ef284e1500d 100644
--- a/src/mongo/shell/bulk_api.js
+++ b/src/mongo/shell/bulk_api.js
@@ -892,6 +892,19 @@ var _bulk_api_module = (function() {
cmd.writeConcern = writeConcern;
}
+ {
+ const kWireVersionSupportingRetryableWrites = 6;
+ const serverSupportsRetryableWrites =
+ coll.getMongo().getMinWireVersion() <= kWireVersionSupportingRetryableWrites &&
+ kWireVersionSupportingRetryableWrites <= coll.getMongo().getMaxWireVersion();
+
+ const session = collection.getDB().getSession();
+ if (serverSupportsRetryableWrites && session.getOptions().shouldRetryWrites() &&
+ session._serverSession.canRetryWrites(cmd)) {
+ cmd = session._serverSession.assignTransactionNumber(cmd);
+ }
+ }
+
return cmd;
};
diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js
index f888ed67c4e..48f39a06c1d 100644
--- a/src/mongo/shell/collection.js
+++ b/src/mongo/shell/collection.js
@@ -776,6 +776,19 @@ DBCollection.prototype.findAndModify = function(args) {
cmd[key] = args[key];
}
+ {
+ const kWireVersionSupportingRetryableWrites = 6;
+ const serverSupportsRetryableWrites =
+ this.getMongo().getMinWireVersion() <= kWireVersionSupportingRetryableWrites &&
+ kWireVersionSupportingRetryableWrites <= this.getMongo().getMaxWireVersion();
+
+ const session = this.getDB().getSession();
+ if (serverSupportsRetryableWrites && session.getOptions().shouldRetryWrites() &&
+ session._serverSession.canRetryWrites(cmd)) {
+ cmd = session._serverSession.assignTransactionNumber(cmd);
+ }
+ }
+
var ret = this._db.runCommand(cmd);
if (!ret.ok) {
if (ret.errmsg == "No matching object found") {
diff --git a/src/mongo/shell/session.js b/src/mongo/shell/session.js
index 31cbdfba57b..90460663284 100644
--- a/src/mongo/shell/session.js
+++ b/src/mongo/shell/session.js
@@ -10,6 +10,7 @@ var {
let _readPreference = rawOptions.readPreference;
let _readConcern = rawOptions.readConcern;
let _writeConcern = rawOptions.writeConcern;
+ let _retryWrites = rawOptions.retryWrites;
this.getReadPreference = function getReadPreference() {
return _readPreference;
@@ -37,6 +38,14 @@ var {
}
_writeConcern = writeConcern;
};
+
+ this.shouldRetryWrites = function shouldRetryWrites() {
+ return _retryWrites;
+ };
+
+ this.setRetryWrites = function setRetryWrites(retryWrites = true) {
+ _retryWrites = retryWrites;
+ };
}
function SessionAwareClient(client) {
@@ -93,12 +102,41 @@ var {
}
}
+ function runClientFunctionWithRetries(
+ driverSession, cmdObj, clientFunction, clientFunctionArguments) {
+ let cmdName = Object.keys(cmdObj)[0];
+
+ // If the command is in a wrapped form, then we look for the actual command object
+ // inside the query/$query object.
+ if (cmdName === "query" || cmdName === "$query") {
+ cmdObj = cmdObj[cmdName];
+ cmdName = Object.keys(cmdObj)[0];
+ }
+
+ const numRetries = cmdObj.hasOwnProperty("txnNumber") ? 1 : 0;
+
+ do {
+ try {
+ return clientFunction.apply(client, clientFunctionArguments);
+ } catch (e) {
+ // TODO: Should we run an explicit "isMaster" command in order to compare the
+ // wire version of the server after we reconnect to it?
+ if (!isNetworkError(e) || numRetries === 0) {
+ throw e;
+ }
+ }
+
+ --numRetries;
+ } while (numRetries >= 0);
+ }
+
this.runCommand = function runCommand(driverSession, dbName, cmdObj, options) {
cmdObj = prepareCommandRequest(driverSession, cmdObj);
- const res = client.runCommand(dbName, cmdObj, options);
- processCommandResponse(driverSession, res);
+ const res = runClientFunctionWithRetries(
+ driverSession, cmdObj, client.runCommand, [dbName, cmdObj, options]);
+ processCommandResponse(driverSession, res);
return res;
};
@@ -106,15 +144,17 @@ var {
driverSession, dbName, metadata, cmdObj) {
cmdObj = prepareCommandRequest(driverSession, cmdObj);
- const res = client.runCommandWithMetadata(dbName, metadata, cmdObj);
- processCommandResponse(driverSession, res);
+ const res = runClientFunctionWithRetries(
+ driverSession, cmdObj, client.runCommandWithMetadata, [dbName, metadata, cmdObj]);
+ processCommandResponse(driverSession, res);
return res;
};
}
function ServerSession(client) {
let _lastUsed = new Date();
+ let _nextTxnNum = 0;
this.client = new SessionAwareClient(client);
this.handle = client._startSession();
@@ -150,6 +190,113 @@ var {
return cmdObj;
};
+
+ this.assignTransactionNumber = function assignTransactionNumber(cmdObj) {
+ cmdObj = Object.assign({}, cmdObj);
+
+ const cmdName = Object.keys(cmdObj)[0];
+
+ // If the command is in a wrapped form, then we look for the actual command object
+ // inside the query/$query object.
+ let cmdObjUnwrapped = cmdObj;
+ if (cmdName === "query" || cmdName === "$query") {
+ cmdObj[cmdName] = Object.assign({}, cmdObj[cmdName]);
+ cmdObjUnwrapped = cmdObj[cmdName];
+ }
+
+ if (!cmdObjUnwrapped.hasOwnProperty("txnNumber")) {
+ // Since there's no native support for adding NumberLong instances and getting back
+ // another NumberLong instance, converting from a 64-bit floating-point value to a
+ // 64-bit integer value will overflow at 2**53.
+ cmdObjUnwrapped.txnNumber = new NumberLong(_nextTxnNum);
+ ++_nextTxnNum;
+ }
+
+ return cmdObj;
+ };
+
+ this.canRetryWrites = function canRetryWrites(cmdObj) {
+ let cmdName = Object.keys(cmdObj)[0];
+
+ // If the command is in a wrapped form, then we look for the actual command name inside
+ // the query/$query object.
+ if (cmdName === "query" || cmdName === "$query") {
+ cmdObj = cmdObj[cmdName];
+ cmdName = Object.keys(cmdObj)[0];
+ }
+
+ if (cmdName === "insert") {
+ if (!Array.isArray(cmdObj.documents)) {
+ // The command object is malformed, so we'll just leave it as-is and let the
+ // server reject it.
+ return false;
+ }
+
+ if (cmdObj.documents.length === 1) {
+ // Single-statement operations (e.g. insertOne()) can be retried.
+ return true;
+ }
+
+ // Multi-statement operations (e.g. insertMany()) can be retried if they are
+ // executed in order by the server.
+ return cmdObj.ordered ? true : false;
+ } else if (cmdName === "update") {
+ if (!Array.isArray(cmdObj.updates)) {
+ // The command object is malformed, so we'll just leave it as-is and let the
+ // server reject it.
+ return false;
+ }
+
+ const hasMultiUpdate = cmdObj.updates.some(updateOp => updateOp.multi);
+ if (hasMultiUpdate) {
+ // Operations that modify multiple documents (e.g. updateMany()) cannot be
+ // retried.
+ return false;
+ }
+
+ if (cmdObj.updates.length === 1) {
+ // Single-statement operations that modify a single document (e.g. updateOne())
+ // can be retried.
+ return true;
+ }
+
+ // Multi-statement operations that each modify a single document (e.g. bulkWrite())
+ // can be retried if they are executed in order by the server.
+ return cmdObj.ordered ? true : false;
+ } else if (cmdName === "delete") {
+ if (!Array.isArray(cmdObj.deletes)) {
+ // The command object is malformed, so we'll just leave it as-is and let the
+ // server reject it.
+ return false;
+ }
+
+ // We use bsonWoCompare() in order to handle cases where the limit is specified as a
+ // NumberInt() or NumberLong() instance.
+ const hasMultiDelete = cmdObj.deletes.some(
+ deleteOp => bsonWoCompare({_: deleteOp.limit}, {_: 0}) === 0);
+ if (hasMultiDelete) {
+ // Operations that modify multiple documents (e.g. deleteMany()) cannot be
+ // retried.
+ return false;
+ }
+
+ if (cmdObj.deletes.length === 1) {
+ // Single-statement operations that modify a single document (e.g. deleteOne())
+ // can be retried.
+ return true;
+ }
+
+ // Multi-statement operations that each modify a single document (e.g. bulkWrite())
+ // can be retried if they are executed in order by the server.
+ return cmdObj.ordered ? true : false;
+ } else if (cmdName === "findAndModify" || cmdName === "findandmodify") {
+ // Operations that modify a single document (e.g. findOneAndUpdate()) can be
+ // retried.
+ return true;
+ }
+
+ return false;
+ };
}
function makeDriverSessionConstructor(implMethods) {
@@ -211,6 +358,14 @@ var {
injectSessionId: function injectSessionId(cmdObj) {
return cmdObj;
},
+
+ assignTransactionNumber: function assignTransactionNumber(cmdObj) {
+ return cmdObj;
+ },
+
+ canRetryWrites: function canRetryWrites(cmdObj) {
+ return false;
+ },
};
},