summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2019-03-29 16:30:28 -0400
committerjannaerin <golden.janna@gmail.com>2019-04-09 11:21:38 -0400
commit3e7d0821ad83fb29b25ced2b6cf9db313b19ce9b (patch)
tree1da9096d188cdac0eb1621a9b141fc8c945367d3 /src/mongo/s/commands
parent386dc94aad875d8e43bec3db0d3e970286c94494 (diff)
downloadmongo-3e7d0821ad83fb29b25ced2b6cf9db313b19ce9b.tar.gz
SERVER-39842 Allow update to shard key value if retryable write
Diffstat (limited to 'src/mongo/s/commands')
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp181
-rw-r--r--src/mongo/s/commands/document_shard_key_update_util.cpp35
-rw-r--r--src/mongo/s/commands/document_shard_key_update_util.h4
3 files changed, 143 insertions, 77 deletions
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 579c8cb6749..b544ddd28ed 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -47,6 +47,7 @@
#include "mongo/s/commands/document_shard_key_update_util.h"
#include "mongo/s/grid.h"
#include "mongo/s/multi_statement_transaction_requests_sender.h"
+#include "mongo/s/session_catalog_router.h"
#include "mongo/s/transaction_router.h"
#include "mongo/s/would_change_owning_shard_exception.h"
#include "mongo/s/write_ops/batched_command_request.h"
@@ -123,79 +124,149 @@ void batchErrorToLastError(const BatchedCommandRequest& request,
}
}
-bool updateShardKeyValueOnWouldChangeOwningShardError(OperationContext* opCtx,
- BatchedCommandRequest& request,
- BatchedCommandResponse& response) {
- if (!response.getOk() || !response.isErrDetailsSet()) {
- return false;
+/**
+ * Checks if the response contains a WouldChangeOwningShard error. If it does, asserts that the
+ * batch size is 1 and returns the extra info attached to the exception.
+ */
+boost::optional<WouldChangeOwningShardInfo> getWouldChangeOwningShardErrorInfo(
+ OperationContext* opCtx,
+ const BatchedCommandRequest& request,
+ BatchedCommandResponse* response,
+ bool originalCmdInTxn) {
+
+ if (!response->getOk() || !response->isErrDetailsSet()) {
+ return boost::none;
}
- for (const auto& err : response.getErrDetails()) {
- if (err->toStatus() != ErrorCodes::WouldChangeOwningShard) {
- continue;
- }
-
- auto txnRouter = TransactionRouter::get(opCtx);
- bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter;
+ // Updating the shard key when batch size > 1 is disallowed when the document would move
+ // shards. If the update is in a transaction uassert. If the write is not in a transaction,
+ // change any WouldChangeOwningShard errors in this batch to InvalidOptions to be reported
+ // to the user.
+ if (request.sizeWriteOps() != 1U) {
+ for (auto it = response->getErrDetails().begin(); it != response->getErrDetails().end();
+ ++it) {
+ if ((*it)->toStatus() != ErrorCodes::WouldChangeOwningShard) {
+ continue;
+ }
- // Updating the shard key when batch size > 1 is disallowed when the document would move
- // shards. If the update is in a transaction uassert. If the write is not in a transaction,
- // change any WouldChangeOwningShard errors in this batch to InvalidOptions to be reported
- // to the user.
- if (request.sizeWriteOps() != 1U) {
- if (!isRetryableWrite)
+ if (originalCmdInTxn)
uasserted(ErrorCodes::InvalidOptions,
"Document shard key value updates that cause the doc to move shards "
"must be sent with write batch of size 1");
- err->setStatus({ErrorCodes::InvalidOptions,
- "Document shard key value updates that cause the doc to move shards "
- "must be sent with write batch of size 1"});
- continue;
+ (*it)->setStatus({ErrorCodes::InvalidOptions,
+ "Document shard key value updates that cause the doc to move shards "
+ "must be sent with write batch of size 1"});
}
- BSONObjBuilder extraInfoBuilder;
- err->toStatus().extraInfo()->serialize(&extraInfoBuilder);
- auto extraInfo = extraInfoBuilder.obj();
- auto wouldChangeOwningShardExtraInfo =
- WouldChangeOwningShardInfo::parseFromCommandError(extraInfo);
-
- if (isRetryableWrite) {
- // TODO: SERVER-39842 Start txn and resend update
- uasserted(
- ErrorCodes::ImmutableField,
- "After applying the update, an immutable field was found to have been altered.");
+ return boost::none;
+ } else {
+ for (const auto& err : response->getErrDetails()) {
+ if (err->toStatus() != ErrorCodes::WouldChangeOwningShard) {
+ continue;
+ }
+
+ BSONObjBuilder extraInfoBuilder;
+ err->toStatus().extraInfo()->serialize(&extraInfoBuilder);
+ auto extraInfo = extraInfoBuilder.obj();
+ return WouldChangeOwningShardInfo::parseFromCommandError(extraInfo);
}
+ }
+
+ return boost::none;
+}
+/**
+ * Called when the response contains a WouldChangeOwningShard error. Deletes the original document
+ * and inserts the new one in a transaction. Returns whether or not we match and delete the original
+ * document. If the delete and insert succeed, modifies the response object to reflect that we
+ * successfully updated one document.
+ */
+bool updateShardKeyValue(OperationContext* opCtx,
+ const BatchedCommandRequest& request,
+ BatchedCommandResponse* response,
+ const WouldChangeOwningShardInfo& wouldChangeOwningShardErrorInfo) {
+ auto matchedDoc = documentShardKeyUpdateUtil::updateShardKeyForDocument(
+ opCtx,
+ request.getNS(),
+ wouldChangeOwningShardErrorInfo,
+ request.getWriteCommandBase().getStmtId() ? request.getWriteCommandBase().getStmtId().get()
+ : 0);
+
+ // If we get here, the batch size is 1 and we have successfully deleted the old doc
+ // and inserted the new one, so it is safe to unset the error details.
+ response->unsetErrDetails();
+ if (!matchedDoc)
+ return false;
+
+ response->setN(response->getN() + 1);
+ response->setNModified(response->getNModified() + 1);
+
+ return true;
+}
+
+/**
+ * Changes the shard key for the document if the response object contains a WouldChangeOwningShard
+ * error. If the original command was sent as a retryable write, starts a transaction on the same
+ * session and txnNum, deletes the original document, inserts the new one, and commits the
+ * transaction. If the original command is part of a transaction, deletes the original document and
+ * inserts the new one. Returns whether or not we actually complete the delete and insert.
+ */
+bool handleWouldChangeOwningShardError(OperationContext* opCtx,
+ const BatchedCommandRequest& request,
+ BatchedCommandResponse* response,
+ BatchWriteExecStats stats) {
+ auto txnRouter = TransactionRouter::get(opCtx);
+ bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter;
+
+ auto wouldChangeOwningShardErrorInfo =
+ getWouldChangeOwningShardErrorInfo(opCtx, request, response, !isRetryableWrite);
+ if (!wouldChangeOwningShardErrorInfo)
+ return false;
+
+ if (isRetryableWrite) {
+ RouterOperationContextSession routerSession(opCtx);
try {
- auto matchedDoc = documentShardKeyUpdateUtil::updateShardKeyForDocument(
- opCtx,
- request.getNS(),
- wouldChangeOwningShardExtraInfo,
- request.getWriteCommandBase().getStmtId().get());
-
- // If we get here, the batch size is 1 and we have successfully deleted the old doc and
- // inserted the new one, so it is safe to unset the error details.
- response.unsetErrDetails();
- if (!matchedDoc)
- return false;
-
- response.setN(response.getN() + 1);
- response.setNModified(response.getNModified() + 1);
-
- return true;
+ // Start transaction and re-run the original update command
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ auto txnRouterForShardKeyChange =
+ documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
+
+ ClusterWriter::write(opCtx, request, &stats, response);
+ wouldChangeOwningShardErrorInfo =
+ getWouldChangeOwningShardErrorInfo(opCtx, request, response, !isRetryableWrite);
+
+ // If we do not get WouldChangeOwningShard when re-running the update, the document has
+ // been modified or deleted and we do not need to delete it and insert a new one.
+ auto updatedShardKey =
+ wouldChangeOwningShardErrorInfo &&
+ updateShardKeyValue(
+ opCtx, request, response, wouldChangeOwningShardErrorInfo.get());
+
+ // Commit the transaction
+ documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx,
+ txnRouterForShardKeyChange);
+
+ return updatedShardKey;
} catch (const DBException& e) {
+ // Set the error status to the status of the failed command and abort the transaction.
auto status = e.toStatus();
+ response->getErrDetails().back()->setStatus(status);
- if (!isRetryableWrite)
- uasserted(status.code(), status.reason());
+ auto txnRouterForAbort = TransactionRouter::get(opCtx);
+ if (txnRouterForAbort)
+ txnRouterForAbort->implicitlyAbortTransaction(opCtx, status);
- err->setStatus(status);
- txnRouter->implicitlyAbortTransaction(opCtx, status);
+ return false;
}
+ } else {
+ // Delete the original document and insert the new one
+ return updateShardKeyValue(opCtx, request, response, wouldChangeOwningShardErrorInfo.get());
}
- return false;
+ MONGO_UNREACHABLE
}
/**
@@ -333,7 +404,7 @@ private:
bool updatedShardKey = false;
if (_batchedRequest.getBatchType() == BatchedCommandRequest::BatchType_Update) {
updatedShardKey =
- updateShardKeyValueOnWouldChangeOwningShardError(opCtx, batchedRequest, response);
+ handleWouldChangeOwningShardError(opCtx, batchedRequest, &response, stats);
}
// Populate the lastError object based on the write response
diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp
index 3390d6a6703..f27c73b810e 100644
--- a/src/mongo/s/commands/document_shard_key_update_util.cpp
+++ b/src/mongo/s/commands/document_shard_key_update_util.cpp
@@ -77,24 +77,6 @@ bool executeOperationsAsPartOfShardKeyUpdate(OperationContext* opCtx,
return true;
}
-TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) {
- auto txnRouter = TransactionRouter::get(opCtx);
- invariant(txnRouter);
-
- auto txnNumber = opCtx->getTxnNumber();
- invariant(txnNumber);
-
- txnRouter->beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart);
-
- return txnRouter;
-}
-
-void commitShardKeyUpdateTransaction(OperationContext* opCtx,
- TransactionRouter* txnRouter,
- TxnNumber txnNumber) {
- auto commitResponse = txnRouter->commitTransaction(opCtx, boost::none);
-}
-
/**
* Creates the delete op that will be used to delete the pre-image document. Will also attach the
* original document _id retrieved from 'updatePreImage'.
@@ -135,12 +117,27 @@ bool updateShardKeyForDocument(OperationContext* opCtx,
auto updatePostImage = documentKeyChangeInfo.getPostImage()->getOwned();
auto deleteCmdObj = constructShardKeyDeleteCmdObj(nss, updatePreImage, stmtId);
-
auto insertCmdObj = constructShardKeyInsertCmdObj(nss, updatePostImage, stmtId);
return executeOperationsAsPartOfShardKeyUpdate(opCtx, deleteCmdObj, insertCmdObj, nss.db());
}
+TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) {
+ auto txnRouter = TransactionRouter::get(opCtx);
+ invariant(txnRouter);
+
+ auto txnNumber = opCtx->getTxnNumber();
+ invariant(txnNumber);
+
+ txnRouter->beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart);
+
+ return txnRouter;
+}
+
+void commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter) {
+ auto commitResponse = txnRouter->commitTransaction(opCtx, boost::none);
+}
+
BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss,
const BSONObj& updatePreImage,
int stmtId) {
diff --git a/src/mongo/s/commands/document_shard_key_update_util.h b/src/mongo/s/commands/document_shard_key_update_util.h
index 07dae4f6874..d29eda4c848 100644
--- a/src/mongo/s/commands/document_shard_key_update_util.h
+++ b/src/mongo/s/commands/document_shard_key_update_util.h
@@ -75,9 +75,7 @@ TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx);
* Commits the transaction on this session. This method is called to commit the transaction started
* when WouldChangeOwningShard is thrown for a write that is not in a transaction already.
*/
-void commitShardKeyUpdateTransaction(OperationContext* opCtx,
- TransactionRouter* txnRouter,
- TxnNumber txnNumber);
+void commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter);
/**
* Creates the BSONObj that will be used to delete the pre-image document. Will also attach