diff options
author | Kevin Pulo <kevin.pulo@mongodb.com> | 2019-11-06 06:39:27 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-06 06:39:27 +0000 |
commit | 81238fa87afbe52a9658547f63c79fac126862f1 (patch) | |
tree | b5cecb5cff754360369ff7c2bbb3c23fea10dc38 | |
parent | a0f929889db6566a92200ac3f1f430f1d042862a (diff) | |
download | mongo-81238fa87afbe52a9658547f63c79fac126862f1.tar.gz |
SERVER-43712 mongos use ReadWriteConcernDefaults for WC
28 files changed, 258 insertions, 99 deletions
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp index faabf3f766c..3401ceea111 100644 --- a/src/mongo/db/transaction_validation.cpp +++ b/src/mongo/db/transaction_validation.cpp @@ -57,12 +57,15 @@ bool isRetryableWriteCommand(StringData cmdName) { } // namespace +bool commandSupportsWriteConcernInTransaction(StringData cmdName) { + return cmdName == "commitTransaction" || cmdName == "coordinateCommitTransaction" || + cmdName == "abortTransaction" || cmdName == "prepareTransaction"; +} + void validateWriteConcernForTransaction(const WriteConcernOptions& wcResult, StringData cmdName) { uassert(ErrorCodes::InvalidOptions, "writeConcern is not allowed within a multi-statement transaction", - wcResult.usedDefault || cmdName == "commitTransaction" || - cmdName == "coordinateCommitTransaction" || cmdName == "abortTransaction" || - cmdName == "prepareTransaction"); + wcResult.usedDefault || commandSupportsWriteConcernInTransaction(cmdName)); } bool shouldCommandSkipSessionCheckout(StringData cmdName) { diff --git a/src/mongo/db/transaction_validation.h b/src/mongo/db/transaction_validation.h index ec22c7f304e..2e57866eeb1 100644 --- a/src/mongo/db/transaction_validation.h +++ b/src/mongo/db/transaction_validation.h @@ -35,6 +35,11 @@ namespace mongo { /** + * Returns true if the given cmd name is allowed to specify write concern in a transaction. + */ +bool commandSupportsWriteConcernInTransaction(StringData cmdName); + +/** * Throws if the given write concern is not allowed in a transaction. */ void validateWriteConcernForTransaction(const WriteConcernOptions& wcResult, StringData cmdName); diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index a2fd9e350b2..6036dcc31c8 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -267,6 +267,57 @@ BSONObj appendAllowImplicitCreate(BSONObj cmdObj, bool allow) { return newCmdBuilder.obj(); } +BSONObj applyReadWriteConcern(OperationContext* opCtx, bool appendWC, const BSONObj& cmdObj) { + // Never apply write concern to ordinary operations inside transactions. Applying writeConcern + // to terminal operations such as abortTransaction and commitTransaction is done directly by the + // TransactionRouter. + if (TransactionRouter::get(opCtx)) { + return cmdObj; + } + + // Append all original fields except the readConcern/writeConcern field to the new command. + BSONObjBuilder output; + bool seenWriteConcern = false; + for (const auto& elem : cmdObj) { + const auto name = elem.fieldNameStringData(); + if (appendWC && name == WriteConcernOptions::kWriteConcernField) { + seenWriteConcern = true; + } + if (!output.hasField(name)) { + output.append(elem); + } + } + + // Finally, add the new read/write concern. + if (appendWC && !seenWriteConcern) { + output.append(WriteConcernOptions::kWriteConcernField, opCtx->getWriteConcern().toBSON()); + } + + return output.obj(); +} + +BSONObj applyReadWriteConcern(OperationContext* opCtx, + CommandInvocation* invocation, + const BSONObj& cmdObj) { + return applyReadWriteConcern(opCtx, invocation->supportsWriteConcern(), cmdObj); +} + +BSONObj applyReadWriteConcern(OperationContext* opCtx, BasicCommand* cmd, const BSONObj& cmdObj) { + return applyReadWriteConcern(opCtx, cmd->supportsWriteConcern(cmdObj), cmdObj); +} + +BSONObj stripWriteConcern(const BSONObj& cmdObj) { + BSONObjBuilder output; + for (const auto& elem : cmdObj) { + const auto name = elem.fieldNameStringData(); + if (name == WriteConcernOptions::kWriteConcernField) { + continue; + } + output.append(elem); + } + return output.obj(); +} + std::vector<AsyncRequestsSender::Response> scatterGatherUnversionedTargetAllShards( OperationContext* opCtx, StringData dbName, diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h index 2a50e01d2a3..f65f39f9607 100644 --- a/src/mongo/s/cluster_commands_helpers.h +++ b/src/mongo/s/cluster_commands_helpers.h @@ -36,6 +36,7 @@ #include "mongo/base/status.h" #include "mongo/base/string_data.h" #include "mongo/bson/bsonobj.h" +#include "mongo/db/commands.h" #include "mongo/db/jsobj.h" #include "mongo/rpc/write_concern_error_detail.h" #include "mongo/s/async_requests_sender.h" @@ -91,6 +92,26 @@ BSONObj appendShardVersion(BSONObj cmdObj, ChunkVersion version); BSONObj appendAllowImplicitCreate(BSONObj cmdObj, bool allow); /** + * Returns a copy of 'cmdObj' with the read/writeConcern from the OpCtx appended, unless the + * cmdObj explicitly specifies read/writeConcern. + */ +BSONObj applyReadWriteConcern(OperationContext* opCtx, bool appendWC, const BSONObj& cmdObj); + +/** + * Convenience versions of applyReadWriteConcern() for calling from within + * CommandInvocation or BasicCommand. + */ +BSONObj applyReadWriteConcern(OperationContext* opCtx, + CommandInvocation* invocation, + const BSONObj& cmdObj); +BSONObj applyReadWriteConcern(OperationContext* opCtx, BasicCommand* cmd, const BSONObj& cmdObj); + +/** + * Returns a copy of 'cmdObj' with the writeConcern removed. + */ +BSONObj stripWriteConcern(const BSONObj& cmdObj); + +/** * Utility for dispatching unversioned commands to all shards in a cluster. * * Returns a non-OK status if a failure occurs on *this* node during execution. Otherwise, returns diff --git a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp index 8389409d6ba..3d4667879f3 100644 --- a/src/mongo/s/commands/cluster_coll_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_coll_stats_cmd.cpp @@ -88,7 +88,8 @@ public: nss.db(), nss, routingInfo, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, {}, diff --git a/src/mongo/s/commands/cluster_collection_mod_cmd.cpp b/src/mongo/s/commands/cluster_collection_mod_cmd.cpp index f4091d155c4..247aa8db8aa 100644 --- a/src/mongo/s/commands/cluster_collection_mod_cmd.cpp +++ b/src/mongo/s/commands/cluster_collection_mod_cmd.cpp @@ -73,7 +73,8 @@ public: auto shardResponses = scatterGatherOnlyVersionIfUnsharded( opCtx, nss, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kNoRetry); return appendRawResponses( diff --git a/src/mongo/s/commands/cluster_create_indexes_cmd.cpp b/src/mongo/s/commands/cluster_create_indexes_cmd.cpp index 7ec58e26000..3169b6fe485 100644 --- a/src/mongo/s/commands/cluster_create_indexes_cmd.cpp +++ b/src/mongo/s/commands/cluster_create_indexes_cmd.cpp @@ -73,8 +73,8 @@ public: createShardDatabase(opCtx, dbName); - auto shardResponses = - dispatchCommandAssertCollectionExistsOnAtLeastOneShard(opCtx, nss, cmdObj); + auto shardResponses = dispatchCommandAssertCollectionExistsOnAtLeastOneShard( + opCtx, nss, applyReadWriteConcern(opCtx, this, cmdObj)); return appendRawResponses(opCtx, &errmsg, diff --git a/src/mongo/s/commands/cluster_data_size_cmd.cpp b/src/mongo/s/commands/cluster_data_size_cmd.cpp index c8d410e1634..3b172589ffb 100644 --- a/src/mongo/s/commands/cluster_data_size_cmd.cpp +++ b/src/mongo/s/commands/cluster_data_size_cmd.cpp @@ -107,7 +107,8 @@ public: nss.db(), nss, routingInfo, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, {}, diff --git a/src/mongo/s/commands/cluster_db_stats_cmd.cpp b/src/mongo/s/commands/cluster_db_stats_cmd.cpp index d7d31e5fd55..3408419de95 100644 --- a/src/mongo/s/commands/cluster_db_stats_cmd.cpp +++ b/src/mongo/s/commands/cluster_db_stats_cmd.cpp @@ -124,7 +124,8 @@ public: auto shardResponses = scatterGatherUnversionedTargetAllShards( opCtx, dbName, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); if (!appendRawResponses(opCtx, &errmsg, &output, shardResponses)) { diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp index 3c9c667678d..0009fc6f89a 100644 --- a/src/mongo/s/commands/cluster_distinct_cmd.cpp +++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp @@ -186,7 +186,8 @@ public: nss.db(), nss, routingInfo, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, query, diff --git a/src/mongo/s/commands/cluster_drop_indexes_cmd.cpp b/src/mongo/s/commands/cluster_drop_indexes_cmd.cpp index 9dca911b224..90db76876da 100644 --- a/src/mongo/s/commands/cluster_drop_indexes_cmd.cpp +++ b/src/mongo/s/commands/cluster_drop_indexes_cmd.cpp @@ -80,7 +80,8 @@ public: auto shardResponses = scatterGatherOnlyVersionIfUnsharded( opCtx, nss, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kNotIdempotent); return appendRawResponses(opCtx, diff --git a/src/mongo/s/commands/cluster_filemd5_cmd.cpp b/src/mongo/s/commands/cluster_filemd5_cmd.cpp index 8b3d1385864..e17534b8618 100644 --- a/src/mongo/s/commands/cluster_filemd5_cmd.cpp +++ b/src/mongo/s/commands/cluster_filemd5_cmd.cpp @@ -118,8 +118,10 @@ public: SimpleBSONObjComparator::kInstance.evaluate( routingInfo.cm()->getShardKeyPattern().toBSON() == BSON("files_id" << 1))) { CommandHelpers::filterCommandReplyForPassthrough( - callShardFn(CommandHelpers::filterCommandRequestForPassthrough(cmdObj), - BSON("files_id" << cmdObj.firstElement())), + callShardFn( + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), + BSON("files_id" << cmdObj.firstElement())), &result); return true; } @@ -149,7 +151,8 @@ public: while (true) { const auto res = callShardFn( [&] { - BSONObjBuilder bb(CommandHelpers::filterCommandRequestForPassthrough(cmdObj)); + BSONObjBuilder bb(applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj))); bb.append("partialOk", true); bb.append("startAt", numGridFSChunksProcessed); if (!lastResult.isEmpty()) { diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 90442a2b4c2..7169f2be79a 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -207,7 +207,7 @@ public: chunkMgr->getVersion(shard->getId()), boost::none, nss, - explainCmd, + applyReadWriteConcern(opCtx, false, explainCmd), &bob); } else { _runCommand(opCtx, @@ -215,7 +215,7 @@ public: ChunkVersion::UNSHARDED(), routingInfo.db().databaseVersion(), nss, - explainCmd, + applyReadWriteConcern(opCtx, false, explainCmd), &bob); } @@ -254,7 +254,7 @@ public: ChunkVersion::UNSHARDED(), routingInfo.db().databaseVersion(), nss, - cmdObjForShard, + applyReadWriteConcern(opCtx, this, cmdObjForShard), &result); return true; } @@ -271,7 +271,7 @@ public: chunkMgr->getVersion(chunk.getShardId()), boost::none, nss, - cmdObjForShard, + applyReadWriteConcern(opCtx, this, cmdObjForShard), &result); return true; @@ -331,9 +331,18 @@ private: // Re-run the findAndModify command that will change the shard key value in a // transaction. We call _runCommand recursively, and this second time through // since it will be run as a transaction it will take the other code path to - // updateShardKeyValueOnWouldChangeOwningShardError. + // updateShardKeyValueOnWouldChangeOwningShardError. We ensure the retried + // operation does not include WC inside the transaction by stripping it from the + // cmdObj. The transaction commit will still use the WC, because it uses the WC + // from the opCtx (which has been set previously in Strategy). documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); - _runCommand(opCtx, shardId, shardVersion, dbVersion, nss, cmdObj, result); + _runCommand(opCtx, + shardId, + shardVersion, + dbVersion, + nss, + stripWriteConcern(cmdObj), + result); uassertStatusOK(getStatusFromCommandResult(result->asTempObj())); auto commitResponse = documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx); diff --git a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp index aabd696f845..3d941e1ad3b 100644 --- a/src/mongo/s/commands/cluster_get_last_error_cmd.cpp +++ b/src/mongo/s/commands/cluster_get_last_error_cmd.cpp @@ -39,6 +39,7 @@ #include "mongo/db/lasterror.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/grid.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" @@ -251,12 +252,13 @@ public: const HostOpTimeMap hostOpTimes(ClusterLastErrorInfo::get(cc())->getPrevHostOpTimes()); std::vector<LegacyWCResponse> wcResponses; - auto status = - enforceLegacyWriteConcern(opCtx, - dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), - hostOpTimes, - &wcResponses); + auto status = enforceLegacyWriteConcern( + opCtx, + dbname, + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), + hostOpTimes, + &wcResponses); // Don't forget about our last hosts, reset the client info ClusterLastErrorInfo::get(cc())->disableForCommand(); diff --git a/src/mongo/s/commands/cluster_index_filter_cmd.cpp b/src/mongo/s/commands/cluster_index_filter_cmd.cpp index dc517eccd4e..f6c5bd37777 100644 --- a/src/mongo/s/commands/cluster_index_filter_cmd.cpp +++ b/src/mongo/s/commands/cluster_index_filter_cmd.cpp @@ -99,7 +99,8 @@ public: nss.db(), nss, routingInfo, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, query, diff --git a/src/mongo/s/commands/cluster_list_databases_cmd.cpp b/src/mongo/s/commands/cluster_list_databases_cmd.cpp index 9ba0e23bd34..fb5e9ab261a 100644 --- a/src/mongo/s/commands/cluster_list_databases_cmd.cpp +++ b/src/mongo/s/commands/cluster_list_databases_cmd.cpp @@ -40,6 +40,7 @@ #include "mongo/db/commands/list_databases_gen.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" +#include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" @@ -118,7 +119,8 @@ public: shardIds.emplace_back(ShardRegistry::kConfigServerShardId); // { filter: matchExpression }. - auto filteredCmd = CommandHelpers::filterCommandRequestForPassthrough(cmdObj); + auto filteredCmd = applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)); for (const ShardId& shardId : shardIds) { const auto shardStatus = shardRegistry->getShard(opCtx, shardId); diff --git a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp index c0abf8efd87..6656c0d8f25 100644 --- a/src/mongo/s/commands/cluster_map_reduce_cmd.cpp +++ b/src/mongo/s/commands/cluster_map_reduce_cmd.cpp @@ -35,6 +35,7 @@ #include "mongo/db/commands/map_reduce_command_base.h" #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/query/query_knobs_gen.h" +#include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/commands/cluster_map_reduce.h" #include "mongo/s/commands/cluster_map_reduce_agg.h" @@ -57,7 +58,7 @@ public: if (getTestCommandsEnabled() && internalQueryUseAggMapReduce.load()) { return runAggregationMapReduce(opCtx, dbname, cmd, errmsg, result); } - return runMapReduce(opCtx, dbname, cmd, errmsg, result); + return runMapReduce(opCtx, dbname, applyReadWriteConcern(opCtx, this, cmd), errmsg, result); } } clusterMapReduceCommand; diff --git a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp index b0854622b20..192c5f9300a 100644 --- a/src/mongo/s/commands/cluster_plan_cache_cmd.cpp +++ b/src/mongo/s/commands/cluster_plan_cache_cmd.cpp @@ -122,7 +122,8 @@ bool ClusterPlanCacheCmd::run(OperationContext* opCtx, nss.db(), nss, routingInfo, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, query, diff --git a/src/mongo/s/commands/cluster_reset_error_cmd.cpp b/src/mongo/s/commands/cluster_reset_error_cmd.cpp index 1a37a591b05..21dca8edbf3 100644 --- a/src/mongo/s/commands/cluster_reset_error_cmd.cpp +++ b/src/mongo/s/commands/cluster_reset_error_cmd.cpp @@ -36,6 +36,7 @@ #include "mongo/db/commands.h" #include "mongo/db/lasterror.h" #include "mongo/s/client/shard_connection.h" +#include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/cluster_last_error_info.h" namespace mongo { @@ -77,7 +78,10 @@ public: // Don't care about result from shards. conn->runCommand( - dbname, CommandHelpers::filterCommandRequestForPassthrough(cmdObj), res); + dbname, + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), + res); conn.done(); } diff --git a/src/mongo/s/commands/cluster_restart_catalog_command.cpp b/src/mongo/s/commands/cluster_restart_catalog_command.cpp index 5bcc6686ca5..4f3af29908c 100644 --- a/src/mongo/s/commands/cluster_restart_catalog_command.cpp +++ b/src/mongo/s/commands/cluster_restart_catalog_command.cpp @@ -79,7 +79,8 @@ public: auto shardResponses = scatterGatherUnversionedTargetAllShards( opCtx, db, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent); diff --git a/src/mongo/s/commands/cluster_set_index_commit_quorum_cmd.cpp b/src/mongo/s/commands/cluster_set_index_commit_quorum_cmd.cpp index a19fb187524..5074b7da454 100644 --- a/src/mongo/s/commands/cluster_set_index_commit_quorum_cmd.cpp +++ b/src/mongo/s/commands/cluster_set_index_commit_quorum_cmd.cpp @@ -91,7 +91,8 @@ public: scatterGatherOnlyVersionIfUnsharded( opCtx, request().getNamespace(), - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kNotIdempotent); } diff --git a/src/mongo/s/commands/cluster_user_management_commands.cpp b/src/mongo/s/commands/cluster_user_management_commands.cpp index 607114dda40..58384e84755 100644 --- a/src/mongo/s/commands/cluster_user_management_commands.cpp +++ b/src/mongo/s/commands/cluster_user_management_commands.cpp @@ -90,7 +90,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); } @@ -134,7 +135,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -183,7 +185,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -223,7 +226,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -270,7 +274,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -317,7 +322,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -354,7 +360,11 @@ public: const BSONObj& cmdObj, BSONObjBuilder& result) { return Grid::get(opCtx)->catalogClient()->runUserManagementReadCommand( - opCtx, dbname, CommandHelpers::filterCommandRequestForPassthrough(cmdObj), &result); + opCtx, + dbname, + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), + &result); } } cmdUsersInfo; @@ -389,7 +399,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); } @@ -425,7 +436,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -466,7 +478,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -506,7 +519,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -546,7 +560,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -586,7 +601,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -629,7 +645,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -674,7 +691,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); const auto authzManager = AuthorizationManager::get(opCtx->getServiceContext()); @@ -711,7 +729,11 @@ public: const BSONObj& cmdObj, BSONObjBuilder& result) { return Grid::get(opCtx)->catalogClient()->runUserManagementReadCommand( - opCtx, dbname, CommandHelpers::filterCommandRequestForPassthrough(cmdObj), &result); + opCtx, + dbname, + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), + &result); } } cmdRolesInfo; @@ -798,7 +820,8 @@ public: opCtx, getName(), dbname, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), &result); } diff --git a/src/mongo/s/commands/cluster_validate_cmd.cpp b/src/mongo/s/commands/cluster_validate_cmd.cpp index 510b299ac35..f56261282e4 100644 --- a/src/mongo/s/commands/cluster_validate_cmd.cpp +++ b/src/mongo/s/commands/cluster_validate_cmd.cpp @@ -80,7 +80,8 @@ public: nss.db(), nss, routingInfo, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), ReadPreferenceSetting::get(opCtx), Shard::RetryPolicy::kIdempotent, {}, diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index e1b8a7a41e2..5732d1243d3 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -190,14 +190,14 @@ boost::optional<WouldChangeOwningShardInfo> getWouldChangeOwningShardErrorInfo( * inserts the new one. Returns whether or not we actually complete the delete and insert. */ bool handleWouldChangeOwningShardError(OperationContext* opCtx, - const BatchedCommandRequest& request, + BatchedCommandRequest* request, BatchedCommandResponse* response, BatchWriteExecStats stats) { auto txnRouter = TransactionRouter::get(opCtx); bool isRetryableWrite = opCtx->getTxnNumber() && !txnRouter; auto wouldChangeOwningShardErrorInfo = - getWouldChangeOwningShardErrorInfo(opCtx, request, response, !isRetryableWrite); + getWouldChangeOwningShardErrorInfo(opCtx, *request, response, !isRetryableWrite); if (!wouldChangeOwningShardErrorInfo) return false; @@ -214,12 +214,17 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + // Ensure the retried operation does not include WC inside the transaction. The + // transaction commit will still use the WC, because it uses the WC from the opCtx + // (which has been set previously in Strategy). + request->unsetWriteConcern(); + documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); // Clear the error details from the response object before sending the write again response->unsetErrDetails(); - ClusterWriter::write(opCtx, request, &stats, response); + ClusterWriter::write(opCtx, *request, &stats, response); wouldChangeOwningShardErrorInfo = - getWouldChangeOwningShardErrorInfo(opCtx, request, response, !isRetryableWrite); + getWouldChangeOwningShardErrorInfo(opCtx, *request, response, !isRetryableWrite); if (!wouldChangeOwningShardErrorInfo) uassertStatusOK(response->toStatus()); @@ -228,7 +233,7 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, // new one. updatedShardKey = wouldChangeOwningShardErrorInfo && documentShardKeyUpdateUtil::updateShardKeyForDocument( - opCtx, request.getNS(), wouldChangeOwningShardErrorInfo.get()); + opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get()); // If the operation was an upsert, record the _id of the new document. if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) { @@ -272,7 +277,7 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, try { // Delete the original document and insert the new one updatedShardKey = documentShardKeyUpdateUtil::updateShardKeyForDocument( - opCtx, request.getNS(), wouldChangeOwningShardErrorInfo.get()); + opCtx, request->getNS(), wouldChangeOwningShardErrorInfo.get()); // If the operation was an upsert, record the _id of the new document. if (updatedShardKey && wouldChangeOwningShardErrorInfo->getShouldUpsert()) { @@ -441,7 +446,7 @@ private: bool updatedShardKey = false; if (_batchedRequest.getBatchType() == BatchedCommandRequest::BatchType_Update) { updatedShardKey = - handleWouldChangeOwningShardError(opCtx, batchedRequest, &response, stats); + handleWouldChangeOwningShardError(opCtx, &batchedRequest, &response, stats); } // Populate the lastError object based on the write response diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 1c20246854e..ad038b48066 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -144,7 +144,8 @@ protected: // Here, we first filter the command before appending an UNSHARDED shardVersion, because // "shardVersion" is one of the fields that gets filtered out. - BSONObj filteredCmdObj(CommandHelpers::filterCommandRequestForPassthrough(cmdObj)); + BSONObj filteredCmdObj(applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj))); BSONObj filteredCmdObjWithVersion( appendShardVersion(filteredCmdObj, ChunkVersion::UNSHARDED())); @@ -235,8 +236,8 @@ public: opCtx, ReadPreferenceSetting(ReadPreference::PrimaryOnly), "admin", - configsvrRenameCollectionRequest.toBSON( - CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), + configsvrRenameCollectionRequest.toBSON(applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj))), Shard::RetryPolicy::kIdempotent)); uassertStatusOK(cmdResponse.commandStatus); @@ -260,8 +261,10 @@ public: NamespaceString::kAdminDb, fromNss, fromRoutingInfo, - appendAllowImplicitCreate(CommandHelpers::filterCommandRequestForPassthrough(cmdObj), - true), + appendAllowImplicitCreate( + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), + true), Shard::RetryPolicy::kNoRetry, &result); } @@ -310,7 +313,8 @@ public: dbName, nss, routingInfo, - CommandHelpers::filterCommandRequestForPassthrough(cmdObj), + applyReadWriteConcern( + opCtx, this, CommandHelpers::filterCommandRequestForPassthrough(cmdObj)), Shard::RetryPolicy::kIdempotent, &result); } @@ -494,7 +498,7 @@ public: opCtx, dbName, dbInfoStatus.getValue(), - newCmd, + applyReadWriteConcern(opCtx, this, newCmd), nss, &result, uassertStatusOK(AuthorizationSession::get(opCtx->getClient()) @@ -554,7 +558,7 @@ public: opCtx, nss.db(), routingInfo.db(), - cmdObj, + applyReadWriteConcern(opCtx, this, cmdObj), nss, &result, {Privilege(ResourcePattern::forExactNamespace(nss), ActionType::listIndexes)}); diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 3bcb77f840c..631438a95bf 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -57,6 +57,7 @@ #include "mongo/db/query/find_common.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/query_request.h" +#include "mongo/db/read_write_concern_defaults.h" #include "mongo/db/stats/counters.h" #include "mongo/db/transaction_validation.h" #include "mongo/db/views/resolved_view.h" @@ -243,23 +244,6 @@ void execCommandClient(OperationContext* opCtx, globalOpCounters.gotCommand(); } - auto wcResult = uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body)); - - bool supportsWriteConcern = invocation->supportsWriteConcern(); - if (!supportsWriteConcern && !wcResult.usedDefault) { - // This command doesn't do writes so it should not be passed a writeConcern. - // If we did not use the default writeConcern, one was provided when it shouldn't have - // been by the user. - auto body = result->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow( - body, Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern")); - return; - } - - if (TransactionRouter::get(opCtx)) { - validateWriteConcernForTransaction(wcResult, c->getName()); - } - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { uassert(ErrorCodes::InvalidOptions, @@ -280,24 +264,13 @@ void execCommandClient(OperationContext* opCtx, } auto txnRouter = TransactionRouter::get(opCtx); - if (!supportsWriteConcern) { - if (txnRouter) { - invokeInTransactionRouter(opCtx, invocation, result); - } else { - invocation->run(opCtx, result); - } + if (txnRouter) { + invokeInTransactionRouter(opCtx, invocation, result); } else { - // Change the write concern while running the command. - const auto oldWC = opCtx->getWriteConcern(); - ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); - opCtx->setWriteConcern(wcResult); - - if (txnRouter) { - invokeInTransactionRouter(opCtx, invocation, result); - } else { - invocation->run(opCtx, result); - } + invocation->run(opCtx, result); + } + if (invocation->supportsWriteConcern()) { failCommand.executeIf( [&](const BSONObj& data) { result->getBodyBuilder().append(data["writeConcernError"]); @@ -393,6 +366,8 @@ void runCommand(OperationContext* opCtx, auto allowTransactionsOnConfigDatabase = false; validateSessionOptions(osi, command->getName(), nss, allowTransactionsOnConfigDatabase); + auto wc = uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body)); + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); auto readConcernParseStatus = [&]() { // We must obtain the client lock to set the ReadConcernArgs on the operation @@ -440,6 +415,39 @@ void runCommand(OperationContext* opCtx, txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction); } + bool supportsWriteConcern = invocation->supportsWriteConcern(); + if (!supportsWriteConcern && !wc.usedDefault) { + // This command doesn't do writes so it should not be passed a writeConcern. + // If we did not use the default writeConcern, one was provided when it shouldn't have + // been by the user. + auto responseBuilder = replyBuilder->getBodyBuilder(); + CommandHelpers::appendCommandStatusNoThrow( + responseBuilder, + Status(ErrorCodes::InvalidOptions, "Command does not support writeConcern")); + return; + } + + if (supportsWriteConcern && wc.usedDefault && + (!TransactionRouter::get(opCtx) || + commandSupportsWriteConcernInTransaction(commandName))) { + // This command supports WC, but wasn't given one - so apply the default, if there is + // one. + if (const auto wcDefault = ReadWriteConcernDefaults::get(opCtx->getServiceContext()) + .getDefaultWriteConcern()) { + wc = *wcDefault; + LOG(2) << "Applying default writeConcern on " << request.getCommandName() << " of " + << wcDefault->toBSON(); + } + } + + if (TransactionRouter::get(opCtx)) { + validateWriteConcernForTransaction(wc, commandName); + } + + if (supportsWriteConcern) { + opCtx->setWriteConcern(wc); + } + for (int tries = 0;; ++tries) { // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown. bool canRetry = tries < kMaxNumStaleVersionRetries - 1; diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index bba83f7ad6e..309d3f4f83e 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -542,6 +542,9 @@ BatchedCommandRequest BatchWriteOp::buildBatchRequest( // writeErrors request.setWriteConcern(upgradeWriteConcern(_clientRequest.getWriteConcern())); } + } else if (!TransactionRouter::get(_opCtx)) { + // Apply the WC from the opCtx (except if in a transaction). + request.setWriteConcern(_opCtx->getWriteConcern().toBSON()); } request.setAllowImplicitCreate(_clientRequest.isImplicitCreateAllowed()); diff --git a/src/mongo/s/write_ops/batched_command_request.h b/src/mongo/s/write_ops/batched_command_request.h index eb0d3439e9e..540f2c5f34d 100644 --- a/src/mongo/s/write_ops/batched_command_request.h +++ b/src/mongo/s/write_ops/batched_command_request.h @@ -92,6 +92,10 @@ public: _writeConcern = writeConcern.getOwned(); } + void unsetWriteConcern() { + _writeConcern = boost::none; + } + bool hasWriteConcern() const { return _writeConcern.is_initialized(); } |