diff options
author | jannaerin <golden.janna@gmail.com> | 2019-03-29 16:30:28 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2019-04-09 11:21:38 -0400 |
commit | 3e7d0821ad83fb29b25ced2b6cf9db313b19ce9b (patch) | |
tree | 1da9096d188cdac0eb1621a9b141fc8c945367d3 /src/mongo/s/commands | |
parent | 386dc94aad875d8e43bec3db0d3e970286c94494 (diff) | |
download | mongo-3e7d0821ad83fb29b25ced2b6cf9db313b19ce9b.tar.gz |
SERVER-39842 Allow update to shard key value if retryable write
Diffstat (limited to 'src/mongo/s/commands')
-rw-r--r-- | src/mongo/s/commands/cluster_write_cmd.cpp | 181 | ||||
-rw-r--r-- | src/mongo/s/commands/document_shard_key_update_util.cpp | 35 | ||||
-rw-r--r-- | src/mongo/s/commands/document_shard_key_update_util.h | 4 |
3 files changed, 143 insertions, 77 deletions
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 579c8cb6749..b544ddd28ed 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -47,6 +47,7 @@ #include "mongo/s/commands/document_shard_key_update_util.h" #include "mongo/s/grid.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" +#include "mongo/s/session_catalog_router.h" #include "mongo/s/transaction_router.h" #include "mongo/s/would_change_owning_shard_exception.h" #include "mongo/s/write_ops/batched_command_request.h" @@ -123,79 +124,149 @@ void batchErrorToLastError(const BatchedCommandRequest& request, } } -bool updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx, - BatchedCommandRequest& request, - BatchedCommandResponse& response) { - if (!response.getOk() || !response.isErrDetailsSet()) { - return false; +/** + * Checks if the response contains a WouldChangeOwningShard error. If it does, asserts that the + * batch size is 1 and returns the extra info attached to the exception. + */ +boost::optional<WouldChangeOwningShardInfo> getWouldChangeOwningShardErrorInfo( + OperationContext* opCtx, + const BatchedCommandRequest& request, + BatchedCommandResponse* response, + bool originalCmdInTxn) { + + if (!response->getOk() || !response->isErrDetailsSet()) { + return boost::none; } - for (const auto& err : response.getErrDetails()) { - if (err->toStatus() != ErrorCodes::WouldChangeOwningShard) { - continue; - } - - auto txnRouter = TransactionRouter::get(opCtx); - bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; + // Updating the shard key when batch size > 1 is disallowed when the document would move + // shards. If the update is in a transaction uassert. If the write is not in a transaction, + // change any WouldChangeOwningShard errors in this batch to InvalidOptions to be reported + // to the user. + if (request.sizeWriteOps() != 1U) { + for (auto it = response->getErrDetails().begin(); it != response->getErrDetails().end(); + ++it) { + if ((*it)->toStatus() != ErrorCodes::WouldChangeOwningShard) { + continue; + } - // Updating the shard key when batch size > 1 is disallowed when the document would move - // shards. If the update is in a transaction uassert. If the write is not in a transaction, - // change any WouldChangeOwningShard errors in this batch to InvalidOptions to be reported - // to the user. - if (request.sizeWriteOps() != 1U) { - if (!isRetryableWrite) + if (originalCmdInTxn) uasserted(ErrorCodes::InvalidOptions, "Document shard key value updates that cause the doc to move shards " "must be sent with write batch of size 1"); - err->setStatus({ErrorCodes::InvalidOptions, - "Document shard key value updates that cause the doc to move shards " - "must be sent with write batch of size 1"}); - continue; + (*it)->setStatus({ErrorCodes::InvalidOptions, + "Document shard key value updates that cause the doc to move shards " + "must be sent with write batch of size 1"}); } - BSONObjBuilder extraInfoBuilder; - err->toStatus().extraInfo()->serialize(&extraInfoBuilder); - auto extraInfo = extraInfoBuilder.obj(); - auto wouldChangeOwningShardExtraInfo = - WouldChangeOwningShardInfo::parseFromCommandError(extraInfo); - - if (isRetryableWrite) { - // TODO: SERVER-39842 Start txn and resend update - uasserted( - ErrorCodes::ImmutableField, - "After applying the update, an immutable field was found to have been altered."); + return boost::none; + } else { + for (const auto& err : response->getErrDetails()) { + if (err->toStatus() != ErrorCodes::WouldChangeOwningShard) { + continue; + } + + BSONObjBuilder extraInfoBuilder; + err->toStatus().extraInfo()->serialize(&extraInfoBuilder); + auto extraInfo = extraInfoBuilder.obj(); + return WouldChangeOwningShardInfo::parseFromCommandError(extraInfo); } + } + + return boost::none; +} +/** + * Called when the response contains a WouldChangeOwningShard error. Deletes the original document + * and inserts the new one in a transaction. Returns whether or not we match and delete the original + * document. If the delete and insert succeed, modifies the response object to reflect that we + * successfully updated one document. + */ +bool updateShardKeyValue(OperationContext* opCtx, + const BatchedCommandRequest& request, + BatchedCommandResponse* response, + const WouldChangeOwningShardInfo& wouldChangeOwningShardErrorInfo) { + auto matchedDoc = documentShardKeyUpdateUtil::updateShardKeyForDocument( + opCtx, + request.getNS(), + wouldChangeOwningShardErrorInfo, + request.getWriteCommandBase().getStmtId() ? request.getWriteCommandBase().getStmtId().get() + : 0); + + // If we get here, the batch size is 1 and we have successfully deleted the old doc + // and inserted the new one, so it is safe to unset the error details. + response->unsetErrDetails(); + if (!matchedDoc) + return false; + + response->setN(response->getN() + 1); + response->setNModified(response->getNModified() + 1); + + return true; +} + +/** + * Changes the shard key for the document if the response object contains a WouldChangeOwningShard + * error. If the original command was sent as a retryable write, starts a transaction on the same + * session and txnNum, deletes the original document, inserts the new one, and commits the + * transaction. If the original command is part of a transaction, deletes the original document and + * inserts the new one. Returns whether or not we actually complete the delete and insert. + */ +bool handleWouldChangeOwningShardError(OperationContext* opCtx, + const BatchedCommandRequest& request, + BatchedCommandResponse* response, + BatchWriteExecStats stats) { + auto txnRouter = TransactionRouter::get(opCtx); + bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; + + auto wouldChangeOwningShardErrorInfo = + getWouldChangeOwningShardErrorInfo(opCtx, request, response, !isRetryableWrite); + if (!wouldChangeOwningShardErrorInfo) + return false; + + if (isRetryableWrite) { + RouterOperationContextSession routerSession(opCtx); try { - auto matchedDoc = documentShardKeyUpdateUtil::updateShardKeyForDocument( - opCtx, - request.getNS(), - wouldChangeOwningShardExtraInfo, - request.getWriteCommandBase().getStmtId().get()); - - // If we get here, the batch size is 1 and we have successfully deleted the old doc and - // inserted the new one, so it is safe to unset the error details. - response.unsetErrDetails(); - if (!matchedDoc) - return false; - - response.setN(response.getN() + 1); - response.setNModified(response.getNModified() + 1); - - return true; + // Start transaction and re-run the original update command + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + auto txnRouterForShardKeyChange = + documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); + + ClusterWriter::write(opCtx, request, &stats, response); + wouldChangeOwningShardErrorInfo = + getWouldChangeOwningShardErrorInfo(opCtx, request, response, !isRetryableWrite); + + // If we do not get WouldChangeOwningShard when re-running the update, the document has + // been modified or deleted and we do not need to delete it and insert a new one. + auto updatedShardKey = + wouldChangeOwningShardErrorInfo && + updateShardKeyValue( + opCtx, request, response, wouldChangeOwningShardErrorInfo.get()); + + // Commit the transaction + documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx, + txnRouterForShardKeyChange); + + return updatedShardKey; } catch (const DBException& e) { + // Set the error status to the status of the failed command and abort the transaction. auto status = e.toStatus(); + response->getErrDetails().back()->setStatus(status); - if (!isRetryableWrite) - uasserted(status.code(), status.reason()); + auto txnRouterForAbort = TransactionRouter::get(opCtx); + if (txnRouterForAbort) + txnRouterForAbort->implicitlyAbortTransaction(opCtx, status); - err->setStatus(status); - txnRouter->implicitlyAbortTransaction(opCtx, status); + return false; } + } else { + // Delete the original document and insert the new one + return updateShardKeyValue(opCtx, request, response, wouldChangeOwningShardErrorInfo.get()); } - return false; + MONGO_UNREACHABLE } /** @@ -333,7 +404,7 @@ private: bool updatedShardKey = false; if (_batchedRequest.getBatchType() == BatchedCommandRequest::BatchType_Update) { updatedShardKey = - updateShardKeyValueOnWouldChangeOwningShardError(opCtx, batchedRequest, response); + handleWouldChangeOwningShardError(opCtx, batchedRequest, &response, stats); } // Populate the lastError object based on the write response diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp index 3390d6a6703..f27c73b810e 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.cpp +++ b/src/mongo/s/commands/document_shard_key_update_util.cpp @@ -77,24 +77,6 @@ bool executeOperationsAsPartOfShardKeyUpdate(OperationContext* opCtx, return true; } -TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) { - auto txnRouter = TransactionRouter::get(opCtx); - invariant(txnRouter); - - auto txnNumber = opCtx->getTxnNumber(); - invariant(txnNumber); - - txnRouter->beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart); - - return txnRouter; -} - -void commitShardKeyUpdateTransaction(OperationContext* opCtx, - TransactionRouter* txnRouter, - TxnNumber txnNumber) { - auto commitResponse = txnRouter->commitTransaction(opCtx, boost::none); -} - /** * Creates the delete op that will be used to delete the pre-image document. Will also attach the * original document _id retrieved from 'updatePreImage'. @@ -135,12 +117,27 @@ bool updateShardKeyForDocument(OperationContext* opCtx, auto updatePostImage = documentKeyChangeInfo.getPostImage()->getOwned(); auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage, stmtId); - auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage, stmtId); return executeOperationsAsPartOfShardKeyUpdate(opCtx, deleteCmdObj, insertCmdObj, nss.db()); } +TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) { + auto txnRouter = TransactionRouter::get(opCtx); + invariant(txnRouter); + + auto txnNumber = opCtx->getTxnNumber(); + invariant(txnNumber); + + txnRouter->beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart); + + return txnRouter; +} + +void commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter) { + auto commitResponse = txnRouter->commitTransaction(opCtx, boost::none); +} + BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, const BSONObj& updatePreImage, int stmtId) { diff --git a/src/mongo/s/commands/document_shard_key_update_util.h b/src/mongo/s/commands/document_shard_key_update_util.h index 07dae4f6874..d29eda4c848 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.h +++ b/src/mongo/s/commands/document_shard_key_update_util.h @@ -75,9 +75,7 @@ TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx); * Commits the transaction on this session. This method is called to commit the transaction started * when WouldChangeOwningShard is thrown for a write that is not in a transaction already. */ -void commitShardKeyUpdateTransaction(OperationContext* opCtx, - TransactionRouter* txnRouter, - TxnNumber txnNumber); +void commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter); /** * Creates the BSONObj that will be used to delete the pre-image document. Will also attach |