diff options
Diffstat (limited to 'src/mongo/s')
11 files changed, 82 insertions, 37 deletions
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index ff5c2d8ef4a..4e2bdcbac76 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -69,7 +69,7 @@ std::unique_ptr<QueryRequest> parseCmdObjectToQueryRequest(OperationContext* opC // operation in a transaction, or not running in a transaction, then use the readConcern // from the opCtx (which may be a cluster-wide default). const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - qr->setReadConcern(readConcernArgs.toBSON()["readConcern"].Obj()); + qr->setReadConcern(readConcernArgs.toBSONInner()); } } uassert( diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 823360ac898..d7fc3224347 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -467,6 +467,30 @@ private: BSONObjBuilder& result) const { BatchWriteExecStats stats; BatchedCommandResponse response; + + // The batched request will only have WC if it was supplied by the client. Otherwise, the + // batched request should use the WC from the opCtx. + if (!batchedRequest.hasWriteConcern()) { + if (opCtx->getWriteConcern().usedDefault) { + // Pass writeConcern: {}, rather than {w: 1, wtimeout: 0}, so as to not override the + // configsvr w:majority upconvert. + batchedRequest.setWriteConcern(BSONObj()); + } else { + batchedRequest.setWriteConcern(opCtx->getWriteConcern().toBSON()); + } + } + + // Write ops are never allowed to have writeConcern inside transactions. Normally + // disallowing WC on non-terminal commands in a transaction is handled earlier, during + // command dispatch. However, if this is a regular write operation being automatically + // retried inside a transaction (such as changing a document's shard key across shards), + // then batchedRequest will have a writeConcern (added by the if() above) from when it was + // initially run outside a transaction. Thus it's necessary to unconditionally clear the + // writeConcern when in a transaction. + if (TransactionRouter::get(opCtx)) { + batchedRequest.unsetWriteConcern(); + } + ClusterWriter::write(opCtx, batchedRequest, &stats, &response); bool updatedShardKey = false; diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index 0988e6fff7f..aeeeb965b33 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -146,7 +146,10 @@ BSONObj createCommandForMergingShard(Document serializedCommand, mergeCmd.remove("readConcern"); } - return mergeCmd.freeze().toBson(); + return applyReadWriteConcern(mergeCtx->opCtx, + !(txnRouter && mergingShardContributesData), /* appendRC */ + !mergeCtx->explain, /* appendWC */ + mergeCmd.freeze().toBson()); } Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, @@ -389,7 +392,10 @@ DispatchShardPipelineResults dispatchExchangeConsumerPipeline( expCtx, serializedCommand, consumerPipelines.back(), boost::none, false); requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx], - consumerCmdObj); + applyReadWriteConcern(opCtx, + true, /* appendRC */ + !expCtx->explain, /* appendWC */ + consumerCmdObj)); } auto cursors = establishCursors(opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), diff --git a/src/mongo/s/request_types/merge_chunk_request_test.cpp b/src/mongo/s/request_types/merge_chunk_request_test.cpp index 94a7bf68511..88d5d165148 100644 --- a/src/mongo/s/request_types/merge_chunk_request_test.cpp +++ b/src/mongo/s/request_types/merge_chunk_request_test.cpp @@ -61,13 +61,13 @@ TEST(MergeChunkRequest, ConfigCommandtoBSON) { << BSON_ARRAY(BSON("a" << 1) << BSON("a" << 5) << BSON("a" << 10)) << "shard" << "shard0000" << "validAfter" << Timestamp{100}); - BSONObj writeConcernObj = BSON("writeConcern" << BSON("w" - << "majority")); + BSONObj writeConcernObj = BSON("w" + << "majority"); BSONObjBuilder cmdBuilder; { cmdBuilder.appendElements(serializedRequest); - cmdBuilder.appendElements(writeConcernObj); + cmdBuilder.append("writeConcern", writeConcernObj); } auto request = assertGet(MergeChunkRequest::parseFromConfigCommand(serializedRequest)); diff --git a/src/mongo/s/request_types/merge_chunk_request_type.cpp b/src/mongo/s/request_types/merge_chunk_request_type.cpp index 4bd9844578b..387bacc16b0 100644 --- a/src/mongo/s/request_types/merge_chunk_request_type.cpp +++ b/src/mongo/s/request_types/merge_chunk_request_type.cpp @@ -33,6 +33,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/db/write_concern_options.h" namespace mongo { namespace { @@ -131,7 +132,7 @@ BSONObj MergeChunkRequest::toConfigCommandBSON(const BSONObj& writeConcern) { appendAsConfigCommand(&cmdBuilder); // Tack on passed-in writeConcern - cmdBuilder.appendElements(writeConcern); + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, writeConcern); return cmdBuilder.obj(); } diff --git a/src/mongo/s/request_types/migration_secondary_throttle_options.cpp b/src/mongo/s/request_types/migration_secondary_throttle_options.cpp index d739671b1e3..c143fc9ea60 100644 --- a/src/mongo/s/request_types/migration_secondary_throttle_options.cpp +++ b/src/mongo/s/request_types/migration_secondary_throttle_options.cpp @@ -98,8 +98,12 @@ StatusWith<MigrationSecondaryThrottleOptions> MigrationSecondaryThrottleOptions: } if (secondaryThrottle != kOn) { - return Status(ErrorCodes::UnsupportedFormat, - "Cannot specify write concern when secondaryThrottle is not set"); + // Ignore the specified writeConcern, since it won't be used. This is necessary + // to normalize the otherwise non-standard way that moveChunk uses writeConcern (ie. + // only using it when secondaryThrottle: true), so that shardsvrs can enforce always + // receiving writeConcern on internalClient connections (at the ServiceEntryPoint + // layer). + return MigrationSecondaryThrottleOptions(secondaryThrottle, boost::none); } writeConcernBSON = writeConcernElem.Obj().getOwned(); diff --git a/src/mongo/s/request_types/migration_secondary_throttle_options_test.cpp b/src/mongo/s/request_types/migration_secondary_throttle_options_test.cpp index b295e3f0b3d..59c3fb130bd 100644 --- a/src/mongo/s/request_types/migration_secondary_throttle_options_test.cpp +++ b/src/mongo/s/request_types/migration_secondary_throttle_options_test.cpp @@ -168,20 +168,24 @@ TEST(MigrationSecondaryThrottleOptions, DisabledInBalancerConfig) { ASSERT_EQ(MigrationSecondaryThrottleOptions::kOff, options.getSecondaryThrottle()); } -TEST(MigrationSecondaryThrottleOptions, ParseFailsDisabledInCommandBSONWriteConcernSpecified) { - auto status = MigrationSecondaryThrottleOptions::createFromCommand( - BSON("someOtherField" << 1 << "secondaryThrottle" << false << "writeConcern" - << BSON("w" - << "majority"))); - ASSERT_EQ(ErrorCodes::UnsupportedFormat, status.getStatus().code()); -} - -TEST(MigrationSecondaryThrottleOptions, ParseFailsNotSpecifiedInCommandBSONWriteConcernSpecified) { - auto status = MigrationSecondaryThrottleOptions::createFromCommand( - BSON("someOtherField" << 1 << "writeConcern" - << BSON("w" - << "majority"))); - ASSERT_EQ(ErrorCodes::UnsupportedFormat, status.getStatus().code()); +TEST(MigrationSecondaryThrottleOptions, IgnoreWriteConcernWhenSecondaryThrottleOff) { + MigrationSecondaryThrottleOptions options = + assertGet(MigrationSecondaryThrottleOptions::createFromCommand( + BSON("someOtherField" << 1 << "_secondaryThrottle" << false << "writeConcern" + << BSON("w" + << "majority")))); + ASSERT_EQ(MigrationSecondaryThrottleOptions::kOff, options.getSecondaryThrottle()); + ASSERT(!options.isWriteConcernSpecified()); +} + +TEST(MigrationSecondaryThrottleOptions, IgnoreWriteConcernWhenSecondaryThrottleAbsent) { + MigrationSecondaryThrottleOptions options = + assertGet(MigrationSecondaryThrottleOptions::createFromCommand( + BSON("someOtherField" << 1 << "writeConcern" + << BSON("w" + << "majority")))); + ASSERT_EQ(MigrationSecondaryThrottleOptions::kDefault, options.getSecondaryThrottle()); + ASSERT(!options.isWriteConcernSpecified()); } TEST(MigrationSecondaryThrottleOptions, EqualityOperatorSameValue) { diff --git a/src/mongo/s/request_types/split_chunk_request_test.cpp b/src/mongo/s/request_types/split_chunk_request_test.cpp index 1727c3aa792..5759519a2b4 100644 --- a/src/mongo/s/request_types/split_chunk_request_test.cpp +++ b/src/mongo/s/request_types/split_chunk_request_test.cpp @@ -77,13 +77,13 @@ TEST(SplitChunkRequest, ConfigCommandtoBSON) { << "collEpoch" << OID("7fffffff0000000000000001") << "min" << BSON("a" << 1) << "max" << BSON("a" << 10) << "splitPoints" << BSON_ARRAY(BSON("a" << 5)) << "shard" << "shard0000"); - BSONObj writeConcernObj = BSON("writeConcern" << BSON("w" - << "majority")); + BSONObj writeConcernObj = BSON("w" + << "majority"); BSONObjBuilder cmdBuilder; { cmdBuilder.appendElements(serializedRequest); - cmdBuilder.appendElements(writeConcernObj); + cmdBuilder.append("writeConcern", writeConcernObj); } auto request = assertGet(SplitChunkRequest::parseFromConfigCommand(serializedRequest)); diff --git a/src/mongo/s/request_types/split_chunk_request_type.cpp b/src/mongo/s/request_types/split_chunk_request_type.cpp index 6773e413197..20e826c1400 100644 --- a/src/mongo/s/request_types/split_chunk_request_type.cpp +++ b/src/mongo/s/request_types/split_chunk_request_type.cpp @@ -33,6 +33,7 @@ #include "mongo/bson/bsonobjbuilder.h" #include "mongo/bson/util/bson_extract.h" +#include "mongo/db/write_concern_options.h" namespace mongo { @@ -120,7 +121,7 @@ BSONObj SplitChunkRequest::toConfigCommandBSON(const BSONObj& writeConcern) { appendAsConfigCommand(&cmdBuilder); // Tack on passed-in writeConcern - cmdBuilder.appendElements(writeConcern); + cmdBuilder.append(WriteConcernOptions::kWriteConcernField, writeConcern); return cmdBuilder.obj(); } diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 5d5b2ee0417..a37ccd808b8 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -129,9 +129,9 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd, repl::ReadConcernArgs readConcernArgs, boost::optional<LogicalTime> atClusterTime, bool doAppendStartTransaction) { - auto cmdWithReadConcern = !readConcernArgs.isEmpty() - ? appendReadConcernForTxn(std::move(cmd), readConcernArgs, atClusterTime) - : std::move(cmd); + // startTransaction: true always requires readConcern, even if it's empty. + auto cmdWithReadConcern = + appendReadConcernForTxn(std::move(cmd), readConcernArgs, atClusterTime); BSONObjBuilder bob(std::move(cmdWithReadConcern)); @@ -668,8 +668,8 @@ void TransactionRouter::Router::_assertAbortStatusIsOkOrNoSuchTransaction( << " from shard: " << response.shardId, status.isOK() || status.code() == ErrorCodes::NoSuchTransaction); - // abortTransaction is sent with no write concern, so there's no need to check for a write - // concern error. + // abortTransaction is sent with "local" write concern (w: 1), so there's no need to check for a + // write concern error. } std::vector<ShardId> TransactionRouter::Router::_getPendingParticipants() const { @@ -689,7 +689,10 @@ void TransactionRouter::Router::_clearPendingParticipants(OperationContext* opCt // transactions will be left open if the retry does not re-target any of these shards. std::vector<AsyncRequestsSender::Request> abortRequests; for (const auto& participant : pendingParticipants) { - abortRequests.emplace_back(participant, BSON("abortTransaction" << 1)); + abortRequests.emplace_back(participant, + BSON("abortTransaction" + << 1 << WriteConcernOptions::kWriteConcernField + << WriteConcernOptions().toBSON())); } auto responses = gatherResponses(opCtx, NamespaceString::kAdminDb, @@ -1224,7 +1227,8 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC p().terminationInitiated = true; - auto abortCmd = BSON("abortTransaction" << 1); + auto abortCmd = BSON("abortTransaction" << 1 << WriteConcernOptions::kWriteConcernField + << WriteConcernOptions().toBSON()); std::vector<AsyncRequestsSender::Request> abortRequests; for (const auto& participantEntry : o().participants) { abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd); diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 18bc6833570..b5c313c4299 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -733,7 +733,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFir ErrorCodes::InvalidOptions); } -TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughNoReadConcernToParticipants) { +TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughEmptyReadConcernToParticipants) { repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); TxnNumber txnNum{3}; @@ -745,8 +745,9 @@ TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughNoReadConcernToPart BSONObj expectedNewObj = BSON("insert" << "test" - << "startTransaction" << true << "coordinator" << true - << "autocommit" << false << "txnNumber" << txnNum); + << "readConcern" << BSONObj() << "startTransaction" << true + << "coordinator" << true << "autocommit" << false << "txnNumber" + << txnNum); auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, |