diff options
author | jannaerin <golden.janna@gmail.com> | 2019-03-20 12:41:09 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2019-03-20 15:48:42 -0400 |
commit | 869b713681f5832e687fe213084e6170ebef60c4 (patch) | |
tree | 965a2266fa34b8835f32f3e9163a2c21e208103c /src | |
parent | 1d8d992f2fef6db349a11893da4f2bf52c39dc86 (diff) | |
download | mongo-869b713681f5832e687fe213084e6170ebef60c4.tar.gz |
SERVER-39837 Allow findAndModify to update the shard key value when run in transaction
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/exec/update_stage.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_and_modify_cmd.cpp | 53 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write_cmd.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/commands/document_shard_key_update_util.cpp | 73 | ||||
-rw-r--r-- | src/mongo/s/commands/document_shard_key_update_util.h | 11 | ||||
-rw-r--r-- | src/mongo/s/would_change_owning_shard_exception.cpp | 12 | ||||
-rw-r--r-- | src/mongo/s/would_change_owning_shard_exception.h | 25 |
7 files changed, 112 insertions, 72 deletions
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index 54119affd1d..8f5704deebe 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -924,11 +924,10 @@ void UpdateStage::assertUpdateToShardKeyFieldsIsValidAndDocStillBelongsToNode( txnParticipant); if (!metadata->keyBelongsToMe(newShardKey)) { - boost::optional<BSONObj> originalQuery{txnParticipant.inMultiDocumentTransaction(), - _params.request->getQuery()}; + // If this update is in a multi-stmt txn, attach the post image to the error. boost::optional<BSONObj> postImg{txnParticipant.inMultiDocumentTransaction(), newObj}; - uasserted(WouldChangeOwningShardInfo(originalQuery, postImg), + uasserted(WouldChangeOwningShardInfo(oldObj.value(), postImg), str::stream() << "This update would cause the doc to change owning shards"); } } diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 283a460a83b..1b11eb0d0aa 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -42,11 +42,13 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/commands/cluster_explain.h" +#include "mongo/s/commands/document_shard_key_update_util.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" #include "mongo/s/stale_exception.h" #include "mongo/s/transaction_router.h" +#include "mongo/s/would_change_owning_shard_exception.h" #include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/timer.h" @@ -76,6 +78,51 @@ BSONObj getShardKey(OperationContext* opCtx, const ChunkManager& chunkMgr, const return shardKey; } +void updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx, + const NamespaceString nss, + Status responseStatus, + const BSONObj& cmdObj, + BSONObjBuilder* result) { + auto txnRouter = TransactionRouter::get(opCtx); + bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; + + BSONObjBuilder extraInfoBuilder; + responseStatus.extraInfo()->serialize(&extraInfoBuilder); + auto extraInfo = extraInfoBuilder.obj(); + auto wouldChangeOwningShardExtraInfo = + WouldChangeOwningShardInfo::parseFromCommandError(extraInfo); + + if (isRetryableWrite) { + // TODO: SERVER-39843 Start txn and resend command + uasserted(ErrorCodes::ImmutableField, + "After applying the update, an immutable field was found to have been altered."); + } + + try { + auto matchedDoc = documentShardKeyUpdateUtil::updateShardKeyForDocument( + opCtx, nss, wouldChangeOwningShardExtraInfo, cmdObj.getIntField("stmtId")); + + BSONObjBuilder lastErrorObjBuilder(result->subobjStart("lastErrorObject")); + lastErrorObjBuilder.appendNumber("n", matchedDoc ? 1 : 0); + lastErrorObjBuilder.appendBool("updatedExisting", matchedDoc ? true : false); + lastErrorObjBuilder.doneFast(); + + if (matchedDoc) { + result->append("value", + cmdObj.getBoolField("new") + ? wouldChangeOwningShardExtraInfo.getPostImage().get() + : wouldChangeOwningShardExtraInfo.getPreImage()); + } else { + result->appendNull("value"); + } + result->append("ok", 1.0); + } catch (const DBException& e) { + auto status = e.toStatus(); + if (!isRetryableWrite) + uassertStatusOK(status.withContext("findAndModify")); + } +} + class FindAndModifyCmd : public BasicCommand { public: FindAndModifyCmd() : BasicCommand("findAndModify", "findandmodify") {} @@ -235,6 +282,12 @@ private: uassertStatusOK(responseStatus.withContext("findAndModify")); } + if (responseStatus.code() == ErrorCodes::WouldChangeOwningShard) { + updateShardKeyValueOnWouldChangeOwningShardError( + opCtx, nss, responseStatus, cmdObj, result); + return; + } + // First append the properly constructed writeConcernError. It will then be skipped in // appendElementsUnique. if (auto wcErrorElem = response.data["writeConcernError"]) { diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index ede88f0bc1f..8e410ed515c 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -168,7 +168,7 @@ bool updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx, } try { - documentShardKeyUpdateUtil::updateShardKeyForDocument( + auto matchedDoc = documentShardKeyUpdateUtil::updateShardKeyForDocument( opCtx, request.getNS(), wouldChangeOwningShardExtraInfo, @@ -177,6 +177,9 @@ bool updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx, // If we get here, the batch size is 1 and we have successfully deleted the old doc and // inserted the new one, so it is safe to unset the error details. response.unsetErrDetails(); + if (!matchedDoc) + return false; + response.setN(response.getN() + 1); response.setNModified(response.getNModified() + 1); diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp index d4378009bf2..3390d6a6703 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.cpp +++ b/src/mongo/s/commands/document_shard_key_update_util.cpp @@ -26,7 +26,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ - +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/commands/document_shard_key_update_util.h" @@ -37,16 +37,17 @@ #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/s/write_ops/cluster_write.h" +#include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" namespace mongo { namespace { /** - * Calls into the command execution stack to run the given commands. Will blindly uassert on any + * Calls into the command execution stack to run the given command. Will blindly uassert on any * error returned by a command. */ -void executeOperationsAsPartOfShardKeyUpdate(OperationContext* opCtx, +bool executeOperationsAsPartOfShardKeyUpdate(OperationContext* opCtx, const BSONObj& deleteCmdObj, const BSONObj& insertCmdObj, const StringData db) { @@ -59,7 +60,7 @@ void executeOperationsAsPartOfShardKeyUpdate(OperationContext* opCtx, uassertStatusOK(deleteResponse.toStatus()); // If we do not delete any document, this is essentially equivalent to not matching a doc. if (deleteResponse.getN() != 1) - return; + return false; auto insertOpMsg = OpMsgRequest::fromDBAndBody(db, insertCmdObj); auto insertRequest = BatchedCommandRequest::parseInsert(insertOpMsg); @@ -72,22 +73,38 @@ void executeOperationsAsPartOfShardKeyUpdate(OperationContext* opCtx, "Document not successfully inserted while changing shard key for namespace " + insertRequest.getNS().toString(), insertResponse.getN() == 1); + + return true; +} + +TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) { + auto txnRouter = TransactionRouter::get(opCtx); + invariant(txnRouter); + + auto txnNumber = opCtx->getTxnNumber(); + invariant(txnNumber); + + txnRouter->beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart); + + return txnRouter; +} + +void commitShardKeyUpdateTransaction(OperationContext* opCtx, + TransactionRouter* txnRouter, + TxnNumber txnNumber) { + auto commitResponse = txnRouter->commitTransaction(opCtx, boost::none); } /** * Creates the delete op that will be used to delete the pre-image document. Will also attach the - * original document _id retrieved from the 'updatePostImage'. + * original document _id retrieved from 'updatePreImage'. */ write_ops::Delete createShardKeyDeleteOp(const NamespaceString& nss, - const BSONObj& originalQueryPredicate, - const BSONObj& updatePostImage) { - BSONObjBuilder fullPredicateBuilder(originalQueryPredicate); - fullPredicateBuilder.append(updatePostImage["_id"]); - + const BSONObj& updatePreImage) { write_ops::Delete deleteOp(nss); deleteOp.setDeletes({[&] { write_ops::DeleteOpEntry entry; - entry.setQ(fullPredicateBuilder.obj()); + entry.setQ(updatePreImage); entry.setMulti(false); return entry; }()}); @@ -109,47 +126,25 @@ write_ops::Insert createShardKeyInsertOp(const NamespaceString& nss, namespace documentShardKeyUpdateUtil { -void updateShardKeyForDocument(OperationContext* opCtx, +bool updateShardKeyForDocument(OperationContext* opCtx, const NamespaceString& nss, const WouldChangeOwningShardInfo& documentKeyChangeInfo, int stmtId) { - auto originalQueryPredicate = documentKeyChangeInfo.getOriginalQueryPredicate()->getOwned(); - + auto updatePreImage = documentKeyChangeInfo.getPreImage().getOwned(); invariant(documentKeyChangeInfo.getPostImage()); auto updatePostImage = documentKeyChangeInfo.getPostImage()->getOwned(); - auto deleteCmdObj = - constructShardKeyDeleteCmdObj(nss, originalQueryPredicate, updatePostImage, stmtId); + auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage, stmtId); auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage, stmtId); - executeOperationsAsPartOfShardKeyUpdate(opCtx, deleteCmdObj, insertCmdObj, nss.db()); -} - - -TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) { - auto txnRouter = TransactionRouter::get(opCtx); - invariant(txnRouter); - - auto txnNumber = opCtx->getTxnNumber(); - invariant(txnNumber); - - txnRouter->beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart); - - return txnRouter; -} - -void commitShardKeyUpdateTransaction(OperationContext* opCtx, - TransactionRouter* txnRouter, - TxnNumber txnNumber) { - auto commitResponse = txnRouter->commitTransaction(opCtx, boost::none); + return executeOperationsAsPartOfShardKeyUpdate(opCtx, deleteCmdObj, insertCmdObj, nss.db()); } BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, - const BSONObj& originalQueryPredicate, - const BSONObj& updatePostImage, + const BSONObj& updatePreImage, int stmtId) { - auto deleteOp = createShardKeyDeleteOp(nss, originalQueryPredicate, updatePostImage); + auto deleteOp = createShardKeyDeleteOp(nss, updatePreImage); // TODO SERVER-40181: Do not set the stmtId once we remove stmtIds from txn oplog entries deleteOp.getWriteCommandBase().setStmtId(stmtId); return deleteOp.toBSON({}); diff --git a/src/mongo/s/commands/document_shard_key_update_util.h b/src/mongo/s/commands/document_shard_key_update_util.h index baebfc5202c..07dae4f6874 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.h +++ b/src/mongo/s/commands/document_shard_key_update_util.h @@ -56,12 +56,11 @@ namespace documentShardKeyUpdateUtil { /** * Coordinating method and external point of entry for updating a document's shard key. This method - * creates the necessary delete and insert operations. It will then run each operation using the - * ClusterWriter. + * creates the necessary extra operations. It will then run each operation using the ClusterWriter. * If any statement throws, an exception will leave this method, and must be handled by external * callers. */ -void updateShardKeyForDocument(OperationContext* opCtx, +bool updateShardKeyForDocument(OperationContext* opCtx, const NamespaceString& nss, const WouldChangeOwningShardInfo& documentKeyChangeInfo, int stmtId); @@ -74,8 +73,7 @@ TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx); /** * Commits the transaction on this session. This method is called to commit the transaction started - * when - * WouldChangeOwningShard is thrown for a write that is not in a transaction already. + * when WouldChangeOwningShard is thrown for a write that is not in a transaction already. */ void commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter, @@ -89,8 +87,7 @@ void commitShardKeyUpdateTransaction(OperationContext* opCtx, * intermediary test coverage. */ BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, - const BSONObj& originalQueryPredicate, - const BSONObj& updatePostImage, + const BSONObj& updatePreImage, int stmtId); /* diff --git a/src/mongo/s/would_change_owning_shard_exception.cpp b/src/mongo/s/would_change_owning_shard_exception.cpp index 9a69daf9bcf..9563c586431 100644 --- a/src/mongo/s/would_change_owning_shard_exception.cpp +++ b/src/mongo/s/would_change_owning_shard_exception.cpp @@ -38,14 +38,14 @@ namespace mongo { namespace { MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(WouldChangeOwningShardInfo); -constexpr StringData kOriginalQueryPredicate = "originalQueryPredicate"_sd; + +constexpr StringData kPreImage = "preImage"_sd; constexpr StringData kPostImage = "postImage"_sd; } // namespace void WouldChangeOwningShardInfo::serialize(BSONObjBuilder* bob) const { - if (_originalQueryPredicate) - bob->append(kOriginalQueryPredicate, _originalQueryPredicate.get()); + bob->append(kPreImage, _preImage); if (_postImage) bob->append(kPostImage, _postImage.get()); } @@ -55,14 +55,12 @@ std::shared_ptr<const ErrorExtraInfo> WouldChangeOwningShardInfo::parse(const BS } WouldChangeOwningShardInfo WouldChangeOwningShardInfo::parseFromCommandError(const BSONObj& obj) { - boost::optional<BSONObj> originalQueryPredicate = boost::none; + boost::optional<BSONObj> originalUpdate = boost::none; boost::optional<BSONObj> postImage = boost::none; - if (obj[kOriginalQueryPredicate]) - originalQueryPredicate = obj[kOriginalQueryPredicate].Obj().getOwned(); if (obj[kPostImage]) postImage = obj[kPostImage].Obj().getOwned(); - return WouldChangeOwningShardInfo(originalQueryPredicate, postImage); + return WouldChangeOwningShardInfo(obj[kPreImage].Obj().getOwned(), postImage); } } // namespace mongo diff --git a/src/mongo/s/would_change_owning_shard_exception.h b/src/mongo/s/would_change_owning_shard_exception.h index 0ead364fa23..db48f95792e 100644 --- a/src/mongo/s/would_change_owning_shard_exception.h +++ b/src/mongo/s/would_change_owning_shard_exception.h @@ -38,28 +38,23 @@ namespace mongo { /** * This error is thrown when an update would cause a document to be owned by a different * shard. If the update is part of a multi statement transaction, we will attach the - * query from the original update and the post image returned by the update stage. MongoS - * will use these to delete the original doc and insert the new doc. If the update is a - * retryable write, we will not attach any extra info and MongoS will start an internal - * transaction and re-send the original update command upon catching this error. + * pre image and the post image returned by the update stage. MongoS will use these to delete + * the original doc and insert the new doc. If the update is a retryable write, we will attach + * only the pre image. */ class WouldChangeOwningShardInfo final : public ErrorExtraInfo { public: static constexpr auto code = ErrorCodes::WouldChangeOwningShard; - explicit WouldChangeOwningShardInfo(const boost::optional<BSONObj>& originalQueryPredicate, - const boost::optional<BSONObj>& postImage) { - // Either both originalQueryPredicate and postImage should be set or neither should. - invariant((originalQueryPredicate && postImage) != (!originalQueryPredicate && !postImage)); - - if (originalQueryPredicate) - _originalQueryPredicate = originalQueryPredicate->getOwned(); + explicit WouldChangeOwningShardInfo(const BSONObj& preImage, + const boost::optional<BSONObj>& postImage) + : _preImage(preImage.getOwned()) { if (postImage) _postImage = postImage->getOwned(); } - const auto& getOriginalQueryPredicate() const { - return _originalQueryPredicate; + const auto& getPreImage() const { + return _preImage; } const auto& getPostImage() const { @@ -77,8 +72,8 @@ public: static WouldChangeOwningShardInfo parseFromCommandError(const BSONObj& commandError); private: - // the 'q' portion of the original update comamand - boost::optional<BSONObj> _originalQueryPredicate; + // The pre image of the document + BSONObj _preImage; // The post image returned by the update stage boost::optional<BSONObj> _postImage; |