summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2019-04-08 16:09:26 -0400
committerjannaerin <golden.janna@gmail.com>2019-04-22 11:46:46 -0400
commitdc392f3d00692327f488f07dea1bba247d56a580 (patch)
tree42a9d8e2c4e7ac0b735d524088286bcc916dc38b
parent3ccf6f6e9ed261a390574fa8a3d590951e1f04ac (diff)
downloadmongo-dc392f3d00692327f488f07dea1bba247d56a580.tar.gz
SERVER-39843 Allow findAndModify to change the shard key value when sent as retryable write
-rw-r--r--jstests/multiVersion/update_shard_key_disallowed_fcv40.js87
-rw-r--r--jstests/sharding/update_shard_key_doc_moves_shards.js5
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp55
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp44
-rw-r--r--src/mongo/s/commands/document_shard_key_update_util.cpp4
-rw-r--r--src/mongo/s/commands/document_shard_key_update_util.h2
6 files changed, 148 insertions, 49 deletions
diff --git a/jstests/multiVersion/update_shard_key_disallowed_fcv40.js b/jstests/multiVersion/update_shard_key_disallowed_fcv40.js
index 1949aac101f..86dc798c99e 100644
--- a/jstests/multiVersion/update_shard_key_disallowed_fcv40.js
+++ b/jstests/multiVersion/update_shard_key_disallowed_fcv40.js
@@ -22,7 +22,7 @@
function shardCollectionAndMoveChunks(docsToInsert, shardKey, splitDoc, moveDoc) {
for (let i = 0; i < docsToInsert.length; i++) {
- assert.writeOK(mongos.getDB(kDbName).foo.insert(docsToInsert[i]));
+ assert.commandWorked(mongos.getDB(kDbName).foo.insert(docsToInsert[i]));
}
assert.commandWorked(mongos.getDB(kDbName).foo.createIndex(shardKey));
@@ -38,7 +38,10 @@
st.rs1.getPrimary().adminCommand({_flushDatabaseCacheUpdates: kDbName});
}
- function assertCannotUpdateShardKey() {
+ function assertCannotUpdateShardKey(isMixedCluster) {
+ // ------------------------------------------------
+ // Test changes to shard key run as retryable write
+ // ------------------------------------------------
let session = st.s.startSession({retryWrites: true});
let sessionDB = session.getDatabase(kDbName);
@@ -62,13 +65,13 @@
});
// Assert that updating the shard key when the doc would move shards fails for both modify
- // and replacement updates
+ // and replacement updates.
assert.writeError(sessionDB.foo.update({x: 80}, {$set: {x: 3}}));
// TODO: SERVER-39158. Currently, this update will not fail but will not update the doc.
// After SERVER-39158 is finished, this should fail.
- assert.writeOK(sessionDB.foo.update({x: 80}, {x: 3}));
- assert.eq(1, mongos.getDB(kDbName).foo.find({x: 80}).toArray().length);
- assert.eq(0, mongos.getDB(kDbName).foo.find({x: 3}).toArray().length);
+ assert.commandWorked(sessionDB.foo.update({x: 80}, {x: 3}));
+ assert.eq(1, mongos.getDB(kDbName).foo.find({x: 80}).itcount());
+ assert.eq(0, mongos.getDB(kDbName).foo.find({x: 3}).itcount());
assert.throws(function() {
sessionDB.foo.findAndModify({query: {x: 80}, update: {$set: {x: 3}}});
@@ -116,7 +119,9 @@
mongos.getDB(kDbName).foo.drop();
- // Test that we fail when attempt to run in a transaction as well
+ // -----------------------------------------------
+ // Test changes to shard key run in a transaction
+ // -----------------------------------------------
session = st.s.startSession();
sessionDB = session.getDatabase(kDbName);
@@ -140,6 +145,70 @@
session.abortTransaction();
mongos.getDB(kDbName).foo.drop();
+
+ if (isMixedCluster) {
+ // Assert that updating the shard key when the doc would move shards fails on commit
+ // because one of the participants is not in FCV 4.2. If the original write is a
+ // retryable write, the write will fail when mongos attempts to run commitTransaction.
+ // If the original write is part of a transaction, the write itself will complete
+ // successfully, but the transaction will fail to commit.
+
+ // Retryable write - updates to full shard key
+ session = st.s.startSession({retryWrites: true});
+ sessionDB = session.getDatabase(kDbName);
+
+ shardCollectionMoveChunks(
+ st, kDbName, ns, {x: 1}, [{x: 30}, {x: 50}, {x: 80}], {x: 50}, {x: 80});
+ cleanupOrphanedDocs(st, ns);
+
+ assert.writeError(sessionDB.foo.update({x: 30}, {$set: {x: 100}}));
+ assert.throws(function() {
+ sessionDB.foo.findAndModify({query: {x: 30}, update: {$set: {x: 100}}});
+ });
+
+ mongos.getDB(kDbName).foo.drop();
+
+ // Retryable write - updates to partial shard key
+ shardCollectionMoveChunks(st,
+ kDbName,
+ ns,
+ {x: 1, y: 1},
+ [{x: 30, y: 4}, {x: 50, y: 50}, {x: 80, y: 100}],
+ {x: 50, y: 50},
+ {x: 80, y: 100});
+ cleanupOrphanedDocs(st, ns);
+
+ assert.writeError(sessionDB.foo.update({x: 30}, {$set: {x: 100}}));
+ assert.throws(function() {
+ sessionDB.foo.findAndModify({query: {x: 30}, update: {x: 100}});
+ });
+
+ mongos.getDB(kDbName).foo.drop();
+
+ // Transactional writes
+ session = st.s.startSession();
+ sessionDB = session.getDatabase(kDbName);
+
+ shardCollectionMoveChunks(
+ st, kDbName, ns, {x: 1}, [{x: 30}, {x: 50}, {x: 80}], {x: 50}, {x: 80});
+ cleanupOrphanedDocs(st, ns);
+
+ session.startTransaction();
+ assert.commandWorked(sessionDB.foo.update({x: 30}, {$set: {x: 100}}));
+ assert.commandFailed(session.commitTransaction_forTesting());
+
+ assert.eq(1, mongos.getDB(kDbName).foo.find({x: 30}).itcount());
+ assert.eq(0, mongos.getDB(kDbName).foo.find({x: 100}).itcount());
+
+ session.startTransaction();
+ sessionDB.foo.findAndModify({query: {x: 30}, update: {x: 100}});
+ assert.commandFailed(session.commitTransaction_forTesting());
+
+ assert.eq(1, mongos.getDB(kDbName).foo.find({x: 30}).itcount());
+ assert.eq(0, mongos.getDB(kDbName).foo.find({x: 100}).itcount());
+
+ mongos.getDB(kDbName).foo.drop();
+ }
}
// Check that updating the shard key fails when all shards are in FCV 4.0
@@ -148,7 +217,7 @@
checkFCV(st.rs0.getPrimary().getDB("admin"), "4.0");
checkFCV(st.rs1.getPrimary().getDB("admin"), "4.0");
- assertCannotUpdateShardKey();
+ assertCannotUpdateShardKey(false);
// Check that updating the shard key fails when shard0 is in FCV 4.2 but shard 1 is in FCV 4.0
assert.commandWorked(
@@ -157,7 +226,7 @@
checkFCV(st.rs0.getPrimary().getDB("admin"), "4.2");
checkFCV(st.rs1.getPrimary().getDB("admin"), "4.0");
- assertCannotUpdateShardKey();
+ assertCannotUpdateShardKey(true);
st.stop();
})();
diff --git a/jstests/sharding/update_shard_key_doc_moves_shards.js b/jstests/sharding/update_shard_key_doc_moves_shards.js
index ed7c6b69a49..91bcf7b3541 100644
--- a/jstests/sharding/update_shard_key_doc_moves_shards.js
+++ b/jstests/sharding/update_shard_key_doc_moves_shards.js
@@ -107,10 +107,7 @@
// Test that changing the shard key works correctly when either the update or findAndModify
// command is used and when the command is run either as a retryable write or in a transaction.
// Pairs represent [shouldRunCommandInTxn, runUpdateAsFindAndModifyCmd]
- //
- // TODO: SERVER-39843 add [false, true] to run retryable write findAndModify commands that
- // update the shard key
- let changeShardKeyOptions = [[false, false], [true, false], [true, true]];
+ let changeShardKeyOptions = [[false, false], [true, false], [false, true], [true, true]];
changeShardKeyOptions.forEach(function(updatePair) {
let runInTxn = updatePair[0];
let isFindAndModify = updatePair[1];
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 1cc1c01089e..0db0c46abbb 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -46,6 +46,7 @@
#include "mongo/s/commands/strategy.h"
#include "mongo/s/grid.h"
#include "mongo/s/multi_statement_transaction_requests_sender.h"
+#include "mongo/s/session_catalog_router.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/transaction_router.h"
#include "mongo/s/would_change_owning_shard_exception.h"
@@ -83,21 +84,12 @@ void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx,
Status responseStatus,
const BSONObj& cmdObj,
BSONObjBuilder* result) {
- auto txnRouter = TransactionRouter::get(opCtx);
- bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter;
-
BSONObjBuilder extraInfoBuilder;
responseStatus.extraInfo()->serialize(&extraInfoBuilder);
auto extraInfo = extraInfoBuilder.obj();
auto wouldChangeOwningShardExtraInfo =
WouldChangeOwningShardInfo::parseFromCommandError(extraInfo);
- if (isRetryableWrite) {
- // TODO: SERVER-39843 Start txn and resend command
- uasserted(ErrorCodes::ImmutableField,
- "After applying the update, an immutable field was found to have been altered.");
- }
-
try {
auto matchedDoc = documentShardKeyUpdateUtil::updateShardKeyForDocument(
opCtx, nss, wouldChangeOwningShardExtraInfo, cmdObj.getIntField("stmtId"));
@@ -118,8 +110,7 @@ void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx,
result->append("ok", 1.0);
} catch (const DBException& e) {
auto status = e.toStatus();
- if (!isRetryableWrite)
- uassertStatusOK(status.withContext("findAndModify"));
+ uassertStatusOK(status.withContext("findAndModify"));
}
}
@@ -250,6 +241,7 @@ private:
const NamespaceString& nss,
const BSONObj& cmdObj,
BSONObjBuilder* result) {
+ bool isRetryableWrite = opCtx->getTxnNumber() && !TransactionRouter::get(opCtx);
const auto response = [&] {
std::vector<AsyncRequestsSender::Request> requests;
requests.emplace_back(
@@ -257,8 +249,6 @@ private:
appendShardVersion(CommandHelpers::filterCommandRequestForPassthrough(cmdObj),
shardVersion));
- bool isRetryableWrite = opCtx->getTxnNumber() && !TransactionRouter::get(opCtx);
-
MultiStatementTransactionRequestsSender ars(
opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
@@ -283,8 +273,43 @@ private:
}
if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) {
- updateShardKeyValueOnWouldChangeOwningShardError(
- opCtx, nss, responseStatus, cmdObj, result);
+ if (isRetryableWrite) {
+ RouterOperationContextSession routerSession(opCtx);
+ try {
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ readConcernArgs =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ // Re-run the findAndModify command that will change the shard key value in a
+ // transaction. We call _runCommand recursively, and this second time through
+ // since it will be run as a transaction it will take the other code path to
+ // updateShardKeyValueOnWouldChangeOwningShardError.
+ auto txnRouterForShardKeyChange =
+ documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
+ _runCommand(opCtx, shardId, shardVersion, nss, cmdObj, result);
+ auto commitResponse =
+ documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(
+ opCtx, txnRouterForShardKeyChange);
+
+ // TODO SERVER-40666: Check the commit response returned success for all shards
+ if (commitResponse.hasField("ok") && !commitResponse["ok"].trueValue()) {
+ // TODO SERVER-40646: Change error reported to something more useful to user
+ uassertStatusOK(
+ Status{ErrorCodes::Error(commitResponse.getIntField("code")),
+ commitResponse.getStringField("errmsg")});
+ }
+ } catch (const DBException& e) {
+ auto txnRouterForAbort = TransactionRouter::get(opCtx);
+ if (txnRouterForAbort)
+ txnRouterForAbort->implicitlyAbortTransaction(opCtx, e.toStatus());
+
+ throw;
+ }
+ } else {
+ updateShardKeyValueOnWouldChangeOwningShardError(
+ opCtx, nss, responseStatus, cmdObj, result);
+ }
+
return;
}
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index cfc69ba09cb..be5a0a5e9c8 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -195,16 +195,9 @@ bool updateShardKeyValue(OperationContext* opCtx,
wouldChangeOwningShardErrorInfo,
request.getWriteCommandBase().getStmtId() ? request.getWriteCommandBase().getStmtId().get()
: 0);
-
- // If we get here, the batch size is 1 and we have successfully deleted the old doc
- // and inserted the new one, so it is safe to unset the error details.
- response->unsetErrDetails();
if (!matchedDoc)
return false;
- response->setN(response->getN() + 1);
- response->setNModified(response->getNModified() + 1);
-
return true;
}
@@ -227,6 +220,7 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx,
if (!wouldChangeOwningShardErrorInfo)
return false;
+ bool updatedShardKey = false;
if (isRetryableWrite) {
if (MONGO_FAIL_POINT(hangAfterThrowWouldChangeOwningShardRetryableWrite)) {
log() << "Hit hangAfterThrowWouldChangeOwningShardRetryableWrite failpoint";
@@ -248,17 +242,22 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx,
getWouldChangeOwningShardErrorInfo(opCtx, request, response, !isRetryableWrite);
// If we do not get WouldChangeOwningShard when re-running the update, the document has
- // been modified or deleted and we do not need to delete it and insert a new one.
- auto updatedShardKey =
- wouldChangeOwningShardErrorInfo &&
+ // been modified or deleted concurrently and we do not need to delete it and insert a
+ // new one.
+ updatedShardKey = wouldChangeOwningShardErrorInfo &&
updateShardKeyValue(
- opCtx, request, response, wouldChangeOwningShardErrorInfo.get());
+ opCtx, request, response, wouldChangeOwningShardErrorInfo.get());
// Commit the transaction
- documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx,
- txnRouterForShardKeyChange);
-
- return updatedShardKey;
+ auto commitResponse = documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(
+ opCtx, txnRouterForShardKeyChange);
+
+ // TODO SERVER-40666: Check the commit response returned success for all shards
+ if (commitResponse.hasField("ok") && !commitResponse["ok"].trueValue()) {
+ // TODO SERVER-40646: Change the error we report to something more useful to a user
+ uassertStatusOK(Status{ErrorCodes::Error(commitResponse.getIntField("code")),
+ commitResponse.getStringField("errmsg")});
+ }
} catch (const DBException& e) {
// Set the error status to the status of the failed command and abort the transaction.
auto status = e.toStatus();
@@ -283,8 +282,8 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx,
} else {
try {
// Delete the original document and insert the new one
- return updateShardKeyValue(
- opCtx, request, response, wouldChangeOwningShardErrorInfo.get());
+ updatedShardKey =
+ updateShardKeyValue(opCtx, request, response, wouldChangeOwningShardErrorInfo.get());
} catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) {
Status status = ex->getKeyPattern().hasField("_id")
? ex.toStatus().withContext(
@@ -293,9 +292,18 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx,
: ex.toStatus();
uassertStatusOK(status);
}
+
+ }
+
+ if (updatedShardKey) {
+ // If we get here, the batch size is 1 and we have successfully deleted the old doc
+ // and inserted the new one, so it is safe to unset the error details.
+ response->unsetErrDetails();
+ response->setN(response->getN() + 1);
+ response->setNModified(response->getNModified() + 1);
}
- MONGO_UNREACHABLE
+ return updatedShardKey;
}
/**
diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp
index 706c54a1c18..cf7692b5557 100644
--- a/src/mongo/s/commands/document_shard_key_update_util.cpp
+++ b/src/mongo/s/commands/document_shard_key_update_util.cpp
@@ -142,8 +142,8 @@ TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) {
return txnRouter;
}
-void commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter) {
- auto commitResponse = txnRouter->commitTransaction(opCtx, boost::none);
+BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter) {
+ return txnRouter->commitTransaction(opCtx, boost::none);
}
BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss,
diff --git a/src/mongo/s/commands/document_shard_key_update_util.h b/src/mongo/s/commands/document_shard_key_update_util.h
index d29eda4c848..50e22ad5cba 100644
--- a/src/mongo/s/commands/document_shard_key_update_util.h
+++ b/src/mongo/s/commands/document_shard_key_update_util.h
@@ -75,7 +75,7 @@ TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx);
* Commits the transaction on this session. This method is called to commit the transaction started
* when WouldChangeOwningShard is thrown for a write that is not in a transaction already.
*/
-void commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter);
+BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter);
/**
* Creates the BSONObj that will be used to delete the pre-image document. Will also attach