diff options
author | jannaerin <golden.janna@gmail.com> | 2019-04-08 16:09:26 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2019-04-22 11:46:46 -0400 |
commit | dc392f3d00692327f488f07dea1bba247d56a580 (patch) | |
tree | 42a9d8e2c4e7ac0b735d524088286bcc916dc38b | |
parent | 3ccf6f6e9ed261a390574fa8a3d590951e1f04ac (diff) | |
download | mongo-dc392f3d00692327f488f07dea1bba247d56a580.tar.gz |
SERVER-39843 Allow findAndModify to change the shard key value when sent as retryable write
6 files changed, 148 insertions, 49 deletions
diff --git a/jstests/multiVersion/update_shard_key_disallowed_fcv40.js b/jstests/multiVersion/update_shard_key_disallowed_fcv40.js index 1949aac101f..86dc798c99e 100644 --- a/jstests/multiVersion/update_shard_key_disallowed_fcv40.js +++ b/jstests/multiVersion/update_shard_key_disallowed_fcv40.js @@ -22,7 +22,7 @@ function shardCollectionAndMoveChunks(docsToInsert, shardKey, splitDoc, moveDoc) { for (let i = 0; i < docsToInsert.length; i++) { - assert.writeOK(mongos.getDB(kDbName).foo.insert(docsToInsert[i])); + assert.commandWorked(mongos.getDB(kDbName).foo.insert(docsToInsert[i])); } assert.commandWorked(mongos.getDB(kDbName).foo.createIndex(shardKey)); @@ -38,7 +38,10 @@ st.rs1.getPrimary().adminCommand({_flushDatabaseCacheUpdates: kDbName}); } - function assertCannotUpdateShardKey() { + function assertCannotUpdateShardKey(isMixedCluster) { + // ------------------------------------------------ + // Test changes to shard key run as retryable write + // ------------------------------------------------ let session = st.s.startSession({retryWrites: true}); let sessionDB = session.getDatabase(kDbName); @@ -62,13 +65,13 @@ }); // Assert that updating the shard key when the doc would move shards fails for both modify - // and replacement updates + // and replacement updates. assert.writeError(sessionDB.foo.update({x: 80}, {$set: {x: 3}})); // TODO: SERVER-39158. Currently, this update will not fail but will not update the doc. // After SERVER-39158 is finished, this should fail. - assert.writeOK(sessionDB.foo.update({x: 80}, {x: 3})); - assert.eq(1, mongos.getDB(kDbName).foo.find({x: 80}).toArray().length); - assert.eq(0, mongos.getDB(kDbName).foo.find({x: 3}).toArray().length); + assert.commandWorked(sessionDB.foo.update({x: 80}, {x: 3})); + assert.eq(1, mongos.getDB(kDbName).foo.find({x: 80}).itcount()); + assert.eq(0, mongos.getDB(kDbName).foo.find({x: 3}).itcount()); assert.throws(function() { sessionDB.foo.findAndModify({query: {x: 80}, update: {$set: {x: 3}}}); @@ -116,7 +119,9 @@ mongos.getDB(kDbName).foo.drop(); - // Test that we fail when attempt to run in a transaction as well + // ----------------------------------------------- + // Test changes to shard key run in a transaction + // ----------------------------------------------- session = st.s.startSession(); sessionDB = session.getDatabase(kDbName); @@ -140,6 +145,70 @@ session.abortTransaction(); mongos.getDB(kDbName).foo.drop(); + + if (isMixedCluster) { + // Assert that updating the shard key when the doc would move shards fails on commit + // because one of the participants is not in FCV 4.2. If the original write is a + // retryable write, the write will fail when mongos attempts to run commitTransaction. + // If the original write is part of a transaction, the write itself will complete + // successfully, but the transaction will fail to commit. + + // Retryable write - updates to full shard key + session = st.s.startSession({retryWrites: true}); + sessionDB = session.getDatabase(kDbName); + + shardCollectionMoveChunks( + st, kDbName, ns, {x: 1}, [{x: 30}, {x: 50}, {x: 80}], {x: 50}, {x: 80}); + cleanupOrphanedDocs(st, ns); + + assert.writeError(sessionDB.foo.update({x: 30}, {$set: {x: 100}})); + assert.throws(function() { + sessionDB.foo.findAndModify({query: {x: 30}, update: {$set: {x: 100}}}); + }); + + mongos.getDB(kDbName).foo.drop(); + + // Retryable write - updates to partial shard key + shardCollectionMoveChunks(st, + kDbName, + ns, + {x: 1, y: 1}, + [{x: 30, y: 4}, {x: 50, y: 50}, {x: 80, y: 100}], + {x: 50, y: 50}, + {x: 80, y: 100}); + cleanupOrphanedDocs(st, ns); + + assert.writeError(sessionDB.foo.update({x: 30}, {$set: {x: 100}})); + assert.throws(function() { + sessionDB.foo.findAndModify({query: {x: 30}, update: {x: 100}}); + }); + + mongos.getDB(kDbName).foo.drop(); + + // Transactional writes + session = st.s.startSession(); + sessionDB = session.getDatabase(kDbName); + + shardCollectionMoveChunks( + st, kDbName, ns, {x: 1}, [{x: 30}, {x: 50}, {x: 80}], {x: 50}, {x: 80}); + cleanupOrphanedDocs(st, ns); + + session.startTransaction(); + assert.commandWorked(sessionDB.foo.update({x: 30}, {$set: {x: 100}})); + assert.commandFailed(session.commitTransaction_forTesting()); + + assert.eq(1, mongos.getDB(kDbName).foo.find({x: 30}).itcount()); + assert.eq(0, mongos.getDB(kDbName).foo.find({x: 100}).itcount()); + + session.startTransaction(); + sessionDB.foo.findAndModify({query: {x: 30}, update: {x: 100}}); + assert.commandFailed(session.commitTransaction_forTesting()); + + assert.eq(1, mongos.getDB(kDbName).foo.find({x: 30}).itcount()); + assert.eq(0, mongos.getDB(kDbName).foo.find({x: 100}).itcount()); + + mongos.getDB(kDbName).foo.drop(); + } } // Check that updating the shard key fails when all shards are in FCV 4.0 @@ -148,7 +217,7 @@ checkFCV(st.rs0.getPrimary().getDB("admin"), "4.0"); checkFCV(st.rs1.getPrimary().getDB("admin"), "4.0"); - assertCannotUpdateShardKey(); + assertCannotUpdateShardKey(false); // Check that updating the shard key fails when shard0 is in FCV 4.2 but shard 1 is in FCV 4.0 assert.commandWorked( @@ -157,7 +226,7 @@ checkFCV(st.rs0.getPrimary().getDB("admin"), "4.2"); checkFCV(st.rs1.getPrimary().getDB("admin"), "4.0"); - assertCannotUpdateShardKey(); + assertCannotUpdateShardKey(true); st.stop(); })(); diff --git a/jstests/sharding/update_shard_key_doc_moves_shards.js b/jstests/sharding/update_shard_key_doc_moves_shards.js index ed7c6b69a49..91bcf7b3541 100644 --- a/jstests/sharding/update_shard_key_doc_moves_shards.js +++ b/jstests/sharding/update_shard_key_doc_moves_shards.js @@ -107,10 +107,7 @@ // Test that changing the shard key works correctly when either the update or findAndModify // command is used and when the command is run either as a retryable write or in a transaction. // Pairs represent [shouldRunCommandInTxn, runUpdateAsFindAndModifyCmd] - // - // TODO: SERVER-39843 add [false, true] to run retryable write findAndModify commands that - // update the shard key - let changeShardKeyOptions = [[false, false], [true, false], [true, true]]; + let changeShardKeyOptions = [[false, false], [true, false], [false, true], [true, true]]; changeShardKeyOptions.forEach(function(updatePair) { let runInTxn = updatePair[0]; let isFindAndModify = updatePair[1]; diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 1cc1c01089e..0db0c46abbb 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -46,6 +46,7 @@ #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" +#include "mongo/s/session_catalog_router.h" #include "mongo/s/stale_exception.h" #include "mongo/s/transaction_router.h" #include "mongo/s/would_change_owning_shard_exception.h" @@ -83,21 +84,12 @@ void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx, Status responseStatus, const BSONObj& cmdObj, BSONObjBuilder* result) { - auto txnRouter = TransactionRouter::get(opCtx); - bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; - BSONObjBuilder extraInfoBuilder; responseStatus.extraInfo()->serialize(&extraInfoBuilder); auto extraInfo = extraInfoBuilder.obj(); auto wouldChangeOwningShardExtraInfo = WouldChangeOwningShardInfo::parseFromCommandError(extraInfo); - if (isRetryableWrite) { - // TODO: SERVER-39843 Start txn and resend command - uasserted(ErrorCodes::ImmutableField, - "After applying the update, an immutable field was found to have been altered."); - } - try { auto matchedDoc = documentShardKeyUpdateUtil::updateShardKeyForDocument( opCtx, nss, wouldChangeOwningShardExtraInfo, cmdObj.getIntField("stmtId")); @@ -118,8 +110,7 @@ void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx, result->append("ok", 1.0); } catch (const DBException& e) { auto status = e.toStatus(); - if (!isRetryableWrite) - uassertStatusOK(status.withContext("findAndModify")); + uassertStatusOK(status.withContext("findAndModify")); } } @@ -250,6 +241,7 @@ private: const NamespaceString& nss, const BSONObj& cmdObj, BSONObjBuilder* result) { + bool isRetryableWrite = opCtx->getTxnNumber() && !TransactionRouter::get(opCtx); const auto response = [&] { std::vector<AsyncRequestsSender::Request> requests; requests.emplace_back( @@ -257,8 +249,6 @@ private: appendShardVersion(CommandHelpers::filterCommandRequestForPassthrough(cmdObj), shardVersion)); - bool isRetryableWrite = opCtx->getTxnNumber() && !TransactionRouter::get(opCtx); - MultiStatementTransactionRequestsSender ars( opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), @@ -283,8 +273,43 @@ private: } if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) { - updateShardKeyValueOnWouldChangeOwningShardError( - opCtx, nss, responseStatus, cmdObj, result); + if (isRetryableWrite) { + RouterOperationContextSession routerSession(opCtx); + try { + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + readConcernArgs = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + // Re-run the findAndModify command that will change the shard key value in a + // transaction. We call _runCommand recursively, and this second time through + // since it will be run as a transaction it will take the other code path to + // updateShardKeyValueOnWouldChangeOwningShardError. + auto txnRouterForShardKeyChange = + documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); + _runCommand(opCtx, shardId, shardVersion, nss, cmdObj, result); + auto commitResponse = + documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction( + opCtx, txnRouterForShardKeyChange); + + // TODO SERVER-40666: Check the commit response returned success for all shards + if (commitResponse.hasField("ok") && !commitResponse["ok"].trueValue()) { + // TODO SERVER-40646: Change error reported to something more useful to user + uassertStatusOK( + Status{ErrorCodes::Error(commitResponse.getIntField("code")), + commitResponse.getStringField("errmsg")}); + } + } catch (const DBException& e) { + auto txnRouterForAbort = TransactionRouter::get(opCtx); + if (txnRouterForAbort) + txnRouterForAbort->implicitlyAbortTransaction(opCtx, e.toStatus()); + + throw; + } + } else { + updateShardKeyValueOnWouldChangeOwningShardError( + opCtx, nss, responseStatus, cmdObj, result); + } + return; } diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index cfc69ba09cb..be5a0a5e9c8 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -195,16 +195,9 @@ bool updateShardKeyValue(OperationContext* opCtx, wouldChangeOwningShardErrorInfo, request.getWriteCommandBase().getStmtId() ? request.getWriteCommandBase().getStmtId().get() : 0); - - // If we get here, the batch size is 1 and we have successfully deleted the old doc - // and inserted the new one, so it is safe to unset the error details. - response->unsetErrDetails(); if (!matchedDoc) return false; - response->setN(response->getN() + 1); - response->setNModified(response->getNModified() + 1); - return true; } @@ -227,6 +220,7 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, if (!wouldChangeOwningShardErrorInfo) return false; + bool updatedShardKey = false; if (isRetryableWrite) { if (MONGO_FAIL_POINT(hangAfterThrowWouldChangeOwningShardRetryableWrite)) { log() << "Hit hangAfterThrowWouldChangeOwningShardRetryableWrite failpoint"; @@ -248,17 +242,22 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, getWouldChangeOwningShardErrorInfo(opCtx, request, response, !isRetryableWrite); // If we do not get WouldChangeOwningShard when re-running the update, the document has - // been modified or deleted and we do not need to delete it and insert a new one. - auto updatedShardKey = - wouldChangeOwningShardErrorInfo && + // been modified or deleted concurrently and we do not need to delete it and insert a + // new one. + updatedShardKey = wouldChangeOwningShardErrorInfo && updateShardKeyValue( - opCtx, request, response, wouldChangeOwningShardErrorInfo.get()); + opCtx, request, response, wouldChangeOwningShardErrorInfo.get()); // Commit the transaction - documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx, - txnRouterForShardKeyChange); - - return updatedShardKey; + auto commitResponse = documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction( + opCtx, txnRouterForShardKeyChange); + + // TODO SERVER-40666: Check the commit response returned success for all shards + if (commitResponse.hasField("ok") && !commitResponse["ok"].trueValue()) { + // TODO SERVER-40646: Change the error we report to something more useful to a user + uassertStatusOK(Status{ErrorCodes::Error(commitResponse.getIntField("code")), + commitResponse.getStringField("errmsg")}); + } } catch (const DBException& e) { // Set the error status to the status of the failed command and abort the transaction. auto status = e.toStatus(); @@ -283,8 +282,8 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, } else { try { // Delete the original document and insert the new one - return updateShardKeyValue( - opCtx, request, response, wouldChangeOwningShardErrorInfo.get()); + updatedShardKey = + updateShardKeyValue(opCtx, request, response, wouldChangeOwningShardErrorInfo.get()); } catch (const ExceptionFor<ErrorCodes::DuplicateKey>& ex) { Status status = ex->getKeyPattern().hasField("_id") ? ex.toStatus().withContext( @@ -293,9 +292,18 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, : ex.toStatus(); uassertStatusOK(status); } + + } + + if (updatedShardKey) { + // If we get here, the batch size is 1 and we have successfully deleted the old doc + // and inserted the new one, so it is safe to unset the error details. + response->unsetErrDetails(); + response->setN(response->getN() + 1); + response->setNModified(response->getNModified() + 1); } - MONGO_UNREACHABLE + return updatedShardKey; } /** diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp index 706c54a1c18..cf7692b5557 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.cpp +++ b/src/mongo/s/commands/document_shard_key_update_util.cpp @@ -142,8 +142,8 @@ TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) { return txnRouter; } -void commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter) { - auto commitResponse = txnRouter->commitTransaction(opCtx, boost::none); +BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter) { + return txnRouter->commitTransaction(opCtx, boost::none); } BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, diff --git a/src/mongo/s/commands/document_shard_key_update_util.h b/src/mongo/s/commands/document_shard_key_update_util.h index d29eda4c848..50e22ad5cba 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.h +++ b/src/mongo/s/commands/document_shard_key_update_util.h @@ -75,7 +75,7 @@ TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx); * Commits the transaction on this session. This method is called to commit the transaction started * when WouldChangeOwningShard is thrown for a write that is not in a transaction already. */ -void commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter); +BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter); /** * Creates the BSONObj that will be used to delete the pre-image document. Will also attach |