diff options
author | Randolph Tan <randolph@10gen.com> | 2018-09-06 15:56:56 -0400 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2018-09-21 11:48:16 -0400 |
commit | 31fb995848293b1a47506bef7ca2f1fe090662b5 (patch) | |
tree | f8605fd5c0a94f04f436f02288e2b308b876f598 /src/mongo/s/query | |
parent | 73e7f1883f492dbefe25badd1ebdfed34859d844 (diff) | |
download | mongo-31fb995848293b1a47506bef7ca2f1fe090662b5.tar.gz |
SERVER-37016 Don't stash TransactionRouter inside ShardingTaskExecutor
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r-- | src/mongo/s/query/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger.h | 3 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_params.idl | 2 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregate.cpp | 49 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_aggregation_planner.cpp | 18 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_params.h | 15 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/establish_cursors.cpp | 14 | ||||
-rw-r--r-- | src/mongo/s/query/results_merger_test_fixture.h | 11 | ||||
-rw-r--r-- | src/mongo/s/query/store_possible_cursor.cpp | 5 |
12 files changed, 104 insertions, 26 deletions
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index e5d2111c955..abd8335a085 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -101,7 +101,7 @@ env.Library( "$BUILD_DIR/mongo/db/query/command_request_response", "$BUILD_DIR/mongo/db/query/query_common", "$BUILD_DIR/mongo/executor/task_executor_interface", - "$BUILD_DIR/mongo/s/async_requests_sender", + "$BUILD_DIR/mongo/s/multi_statement_transaction_requests_sender", "$BUILD_DIR/mongo/s/client/sharding_client", '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', ], diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 3cc6756c843..1723358b06c 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -367,6 +367,11 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) { newCmdBob.append(OperationSessionInfo::kTxnNumberFieldName, *_params.getTxnNumber()); } + if (_params.getAutocommit()) { + newCmdBob.append(OperationSessionInfoFromClient::kAutocommitFieldName, + *_params.getAutocommit()); + } + cmdObj = newCmdBob.obj(); } diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index 488e03d2ee5..b1c3cd0406d 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -277,6 +277,9 @@ private: // The exact host in the shard on which the cursor resides. HostAndPort shardHostAndPort; + // The identity of the shard which the cursor belongs to. + ShardId shardId; + // The buffer of results that have been retrieved but not yet returned to the caller. std::queue<ClusterQueryResult> docBuffer; diff --git a/src/mongo/s/query/async_results_merger_params.idl b/src/mongo/s/query/async_results_merger_params.idl index 2d85faf730f..db2a11fd925 100644 --- a/src/mongo/s/query/async_results_merger_params.idl +++ b/src/mongo/s/query/async_results_merger_params.idl @@ -63,7 +63,7 @@ structs: AsyncResultsMergerParams: description: The parameters needed to establish an AsyncResultsMerger. chained_structs: - OperationSessionInfo : OperationSessionInfo + OperationSessionInfoFromClient : OperationSessionInfo fields: sort: type: object diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index b852cf33f79..d4a088d5e27 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -1786,7 +1786,7 @@ DEATH_TEST_F(AsyncResultsMergerTest, "Invariant failure params.getSessionId()") { AsyncResultsMergerParams params; - OperationSessionInfo sessionInfo; + OperationSessionInfoFromClient sessionInfo; sessionInfo.setTxnNumber(5); params.setOperationSessionInfo(sessionInfo); diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 44d3e3bb0f0..426445bb3bc 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -169,6 +169,7 @@ std::set<ShardId> getTargetedShards(OperationContext* opCtx, */ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, OperationContext* opCtx, + const boost::optional<ShardId>& shardId, const AggregationRequest& request, BSONObj collationObj) { cmdForShards[AggregationRequest::kFromMongosName] = Value(true); @@ -192,12 +193,22 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards, Value(static_cast<long long>(*opCtx->getTxnNumber())); } + auto aggCmd = cmdForShards.freeze().toBson(); + + if (shardId) { + if (auto txnRouter = TransactionRouter::get(opCtx)) { + auto& participant = txnRouter->getOrCreateParticipant(*shardId); + aggCmd = participant.attachTxnFieldsIfNeeded(aggCmd); + } + } + // agg creates temp collection and should handle implicit create separately. - return appendAllowImplicitCreate(cmdForShards.freeze().toBson(), true); + return appendAllowImplicitCreate(aggCmd, true); } BSONObj createPassthroughCommandForShard(OperationContext* opCtx, const AggregationRequest& request, + const boost::optional<ShardId>& shardId, Pipeline* pipeline, const BSONObj& originalCmdObj, BSONObj collationObj) { @@ -209,7 +220,7 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx, // This pipeline is not split, ensure that the write concern is propagated if present. targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]); - return genericTransformForShards(std::move(targetedCmd), opCtx, request, collationObj); + return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj); } BSONObj createCommandForTargetedShards( @@ -238,12 +249,14 @@ BSONObj createCommandForTargetedShards( targetedCmd[AggregationRequest::kExchangeName] = exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value(); - return genericTransformForShards(std::move(targetedCmd), opCtx, request, collationObj); + return genericTransformForShards( + std::move(targetedCmd), opCtx, boost::none, request, collationObj); } BSONObj createCommandForMergingShard(const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const BSONObj originalCmdObj, + const ShardId& shardId, const Pipeline* pipelineForMerging) { MutableDocument mergeCmd(request.serializeToCommandObj()); @@ -259,8 +272,15 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request, : Value(Document{CollationSpec::kSimpleSpec}); } + auto aggCmd = mergeCmd.freeze().toBson(); + + if (auto txnRouter = TransactionRouter::get(mergeCtx->opCtx)) { + auto& participant = txnRouter->getOrCreateParticipant(shardId); + aggCmd = participant.attachTxnFieldsIfNeeded(aggCmd); + } + // agg creates temp collection and should handle implicit create separately. - return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), true); + return appendAllowImplicitCreate(aggCmd, true); } std::vector<RemoteCursor> establishShardCursors( @@ -426,7 +446,7 @@ DispatchShardPipelineResults dispatchShardPipeline( ? createCommandForTargetedShards( opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true) : createPassthroughCommandForShard( - opCtx, aggRequest, pipeline.get(), originalCmdObj, collationObj); + opCtx, aggRequest, boost::none, pipeline.get(), originalCmdObj, collationObj); // Refresh the shard registry if we're targeting all shards. We need the shard registry // to be at least as current as the logical time used when creating the command for @@ -628,6 +648,10 @@ BSONObj establishMergingMongosCursor( params.lsid = opCtx->getLogicalSessionId(); params.txnNumber = opCtx->getTxnNumber(); + if (TransactionRouter::get(opCtx)) { + params.isAutoCommit = false; + } + auto ccc = cluster_aggregation_planner::buildClusterCursor( opCtx, std::move(pipelineForMerging), std::move(params)); @@ -966,7 +990,8 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex targetedShards, routingInfo->db().primaryId()); - auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergePipeline); + auto mergeCmdObj = + createCommandForMergingShard(request, expCtx, cmdObj, mergingShardId, mergePipeline); // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return. auto mergeResponse = @@ -1144,16 +1169,15 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, } auto shard = std::move(swShard.getValue()); - // aggPassthrough is for unsharded collections since changing primary shardId will cause SSV - // error and hence shardId history does not need to be verified. - if (auto txnRouter = TransactionRouter::get(opCtx)) { + auto txnRouter = TransactionRouter::get(opCtx); + if (txnRouter) { txnRouter->computeAtClusterTimeForOneShard(opCtx, shardId); } // Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an // explain if necessary, and rewrites the result into a format safe to forward to shards. cmdObj = CommandHelpers::filterCommandRequestForPassthrough( - createPassthroughCommandForShard(opCtx, aggRequest, nullptr, cmdObj, BSONObj())); + createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, cmdObj, BSONObj())); auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts( opCtx, @@ -1163,6 +1187,11 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, : std::move(cmdObj), Shard::RetryPolicy::kIdempotent)); + if (txnRouter) { + auto& participant = txnRouter->getOrCreateParticipant(shardId); + participant.markAsCommandSent(); + } + if (ErrorCodes::isStaleShardVersionError(cmdResponse.commandStatus.code())) { uassertStatusOK( cmdResponse.commandStatus.withContext("command failed because of stale config")); diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp index ba9dc325461..b799560db35 100644 --- a/src/mongo/s/query/cluster_aggregation_planner.cpp +++ b/src/mongo/s/query/cluster_aggregation_planner.cpp @@ -50,6 +50,7 @@ #include "mongo/s/query/router_stage_skip.h" #include "mongo/s/shard_id.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/s/transaction/transaction_router.h" namespace mongo { namespace cluster_aggregation_planner { @@ -383,9 +384,22 @@ void addMergeCursorsSource(Pipeline* mergePipeline, armParams.setTailableMode(mergePipeline->getContext()->tailableMode); armParams.setNss(mergePipeline->getContext()->ns); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(opCtx->getLogicalSessionId()); + OperationSessionInfoFromClient sessionInfo; + boost::optional<LogicalSessionFromClient> lsidFromClient; + + auto lsid = opCtx->getLogicalSessionId(); + if (lsid) { + lsidFromClient.emplace(lsid->getId()); + lsidFromClient->setUid(lsid->getUid()); + } + + sessionInfo.setSessionId(lsidFromClient); sessionInfo.setTxnNumber(opCtx->getTxnNumber()); + + if (TransactionRouter::get(opCtx)) { + sessionInfo.setAutocommit(false); + } + armParams.setOperationSessionInfo(sessionInfo); // For change streams, we need to set up a custom stage to establish cursors on new shards when diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index a853d26a99f..bd873ebb4d0 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -86,9 +86,17 @@ struct ClusterClientCursorParams { armParams.setNss(nsString); armParams.setAllowPartialResults(isAllowPartialResults); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(lsid); + OperationSessionInfoFromClient sessionInfo; + boost::optional<LogicalSessionFromClient> lsidFromClient; + + if (lsid) { + lsidFromClient.emplace(lsid->getId()); + lsidFromClient->setUid(lsid->getUid()); + } + + sessionInfo.setSessionId(lsidFromClient); sessionInfo.setTxnNumber(txnNumber); + sessionInfo.setAutocommit(isAutoCommit); armParams.setOperationSessionInfo(sessionInfo); return armParams; @@ -139,6 +147,9 @@ struct ClusterClientCursorParams { // The transaction number of the command that created the cursor. boost::optional<TxnNumber> txnNumber; + + // Set to false for multi statement transactions. + boost::optional<bool> isAutoCommit; }; } // mongo diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 0f49c9a9f57..19ae450ff3e 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -241,6 +241,10 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx, params.lsid = opCtx->getLogicalSessionId(); params.txnNumber = opCtx->getTxnNumber(); + if (TransactionRouter::get(opCtx)) { + params.isAutoCommit = false; + } + // This is the batchSize passed to each subsequent getMore command issued by the cursor. We // usually use the batchSize associated with the initial find, but as it is illegal to send a // getMore with a batchSize of 0, we set it to use the default batchSize logic. diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp index 8cdb30d1c0b..a4198e0726c 100644 --- a/src/mongo/s/query/establish_cursors.cpp +++ b/src/mongo/s/query/establish_cursors.cpp @@ -39,8 +39,8 @@ #include "mongo/db/query/killcursors_request.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" -#include "mongo/s/async_requests_sender.h" #include "mongo/s/grid.h" +#include "mongo/s/multi_statement_transaction_requests_sender.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -60,12 +60,12 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx, } // Send the requests - AsyncRequestsSender ars(opCtx, - executor, - nss.db().toString(), - std::move(requests), - readPref, - Shard::RetryPolicy::kIdempotent); + MultiStatementTransactionRequestsSender ars(opCtx, + executor, + nss.db().toString(), + std::move(requests), + readPref, + Shard::RetryPolicy::kIdempotent); std::vector<RemoteCursor> remoteCursors; try { diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h index 641d934a2cd..8119d8e70de 100644 --- a/src/mongo/s/query/results_merger_test_fixture.h +++ b/src/mongo/s/query/results_merger_test_fixture.h @@ -86,8 +86,15 @@ protected: params.setAllowPartialResults(qr->isAllowPartialResults()); } - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(operationContext()->getLogicalSessionId()); + OperationSessionInfoFromClient sessionInfo; + boost::optional<LogicalSessionFromClient> lsidFromClient; + + if (auto lsid = operationContext()->getLogicalSessionId()) { + lsidFromClient.emplace(lsid->getId()); + lsidFromClient->setUid(lsid->getUid()); + } + + sessionInfo.setSessionId(lsidFromClient); sessionInfo.setTxnNumber(operationContext()->getTxnNumber()); params.setOperationSessionInfo(sessionInfo); return params; diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 28db3452a29..b50177ba246 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -41,6 +41,7 @@ #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/shard_id.h" +#include "mongo/s/transaction/transaction_router.h" namespace mongo { @@ -119,6 +120,10 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, params.lsid = opCtx->getLogicalSessionId(); params.txnNumber = opCtx->getTxnNumber(); + if (TransactionRouter::get(opCtx)) { + params.isAutoCommit = false; + } + auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params)); // We don't expect to use this cursor until a subsequent getMore, so detach from the current |