summaryrefslogtreecommitdiff
path: root/src/mongo/s/query
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2018-09-06 15:56:56 -0400
committerRandolph Tan <randolph@10gen.com>2018-09-21 11:48:16 -0400
commit31fb995848293b1a47506bef7ca2f1fe090662b5 (patch)
treef8605fd5c0a94f04f436f02288e2b308b876f598 /src/mongo/s/query
parent73e7f1883f492dbefe25badd1ebdfed34859d844 (diff)
downloadmongo-31fb995848293b1a47506bef7ca2f1fe090662b5.tar.gz
SERVER-37016 Don't stash TransactionRouter inside ShardingTaskExecutor
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r--src/mongo/s/query/SConscript2
-rw-r--r--src/mongo/s/query/async_results_merger.cpp5
-rw-r--r--src/mongo/s/query/async_results_merger.h3
-rw-r--r--src/mongo/s/query/async_results_merger_params.idl2
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp2
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp49
-rw-r--r--src/mongo/s/query/cluster_aggregation_planner.cpp18
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h15
-rw-r--r--src/mongo/s/query/cluster_find.cpp4
-rw-r--r--src/mongo/s/query/establish_cursors.cpp14
-rw-r--r--src/mongo/s/query/results_merger_test_fixture.h11
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp5
12 files changed, 104 insertions, 26 deletions
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index e5d2111c955..abd8335a085 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -101,7 +101,7 @@ env.Library(
"$BUILD_DIR/mongo/db/query/command_request_response",
"$BUILD_DIR/mongo/db/query/query_common",
"$BUILD_DIR/mongo/executor/task_executor_interface",
- "$BUILD_DIR/mongo/s/async_requests_sender",
+ "$BUILD_DIR/mongo/s/multi_statement_transaction_requests_sender",
"$BUILD_DIR/mongo/s/client/sharding_client",
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
],
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 3cc6756c843..1723358b06c 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -367,6 +367,11 @@ Status AsyncResultsMerger::_askForNextBatch(WithLock, size_t remoteIndex) {
newCmdBob.append(OperationSessionInfo::kTxnNumberFieldName, *_params.getTxnNumber());
}
+ if (_params.getAutocommit()) {
+ newCmdBob.append(OperationSessionInfoFromClient::kAutocommitFieldName,
+ *_params.getAutocommit());
+ }
+
cmdObj = newCmdBob.obj();
}
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index 488e03d2ee5..b1c3cd0406d 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -277,6 +277,9 @@ private:
// The exact host in the shard on which the cursor resides.
HostAndPort shardHostAndPort;
+ // The identity of the shard which the cursor belongs to.
+ ShardId shardId;
+
// The buffer of results that have been retrieved but not yet returned to the caller.
std::queue<ClusterQueryResult> docBuffer;
diff --git a/src/mongo/s/query/async_results_merger_params.idl b/src/mongo/s/query/async_results_merger_params.idl
index 2d85faf730f..db2a11fd925 100644
--- a/src/mongo/s/query/async_results_merger_params.idl
+++ b/src/mongo/s/query/async_results_merger_params.idl
@@ -63,7 +63,7 @@ structs:
AsyncResultsMergerParams:
description: The parameters needed to establish an AsyncResultsMerger.
chained_structs:
- OperationSessionInfo : OperationSessionInfo
+ OperationSessionInfoFromClient : OperationSessionInfo
fields:
sort:
type: object
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index b852cf33f79..d4a088d5e27 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -1786,7 +1786,7 @@ DEATH_TEST_F(AsyncResultsMergerTest,
"Invariant failure params.getSessionId()") {
AsyncResultsMergerParams params;
- OperationSessionInfo sessionInfo;
+ OperationSessionInfoFromClient sessionInfo;
sessionInfo.setTxnNumber(5);
params.setOperationSessionInfo(sessionInfo);
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 44d3e3bb0f0..426445bb3bc 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -169,6 +169,7 @@ std::set<ShardId> getTargetedShards(OperationContext* opCtx,
*/
BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
OperationContext* opCtx,
+ const boost::optional<ShardId>& shardId,
const AggregationRequest& request,
BSONObj collationObj) {
cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
@@ -192,12 +193,22 @@ BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
Value(static_cast<long long>(*opCtx->getTxnNumber()));
}
+ auto aggCmd = cmdForShards.freeze().toBson();
+
+ if (shardId) {
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ auto& participant = txnRouter->getOrCreateParticipant(*shardId);
+ aggCmd = participant.attachTxnFieldsIfNeeded(aggCmd);
+ }
+ }
+
// agg creates temp collection and should handle implicit create separately.
- return appendAllowImplicitCreate(cmdForShards.freeze().toBson(), true);
+ return appendAllowImplicitCreate(aggCmd, true);
}
BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
const AggregationRequest& request,
+ const boost::optional<ShardId>& shardId,
Pipeline* pipeline,
const BSONObj& originalCmdObj,
BSONObj collationObj) {
@@ -209,7 +220,7 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
// This pipeline is not split, ensure that the write concern is propagated if present.
targetedCmd["writeConcern"] = Value(originalCmdObj["writeConcern"]);
- return genericTransformForShards(std::move(targetedCmd), opCtx, request, collationObj);
+ return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj);
}
BSONObj createCommandForTargetedShards(
@@ -238,12 +249,14 @@ BSONObj createCommandForTargetedShards(
targetedCmd[AggregationRequest::kExchangeName] =
exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
- return genericTransformForShards(std::move(targetedCmd), opCtx, request, collationObj);
+ return genericTransformForShards(
+ std::move(targetedCmd), opCtx, boost::none, request, collationObj);
}
BSONObj createCommandForMergingShard(const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const BSONObj originalCmdObj,
+ const ShardId& shardId,
const Pipeline* pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
@@ -259,8 +272,15 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
: Value(Document{CollationSpec::kSimpleSpec});
}
+ auto aggCmd = mergeCmd.freeze().toBson();
+
+ if (auto txnRouter = TransactionRouter::get(mergeCtx->opCtx)) {
+ auto& participant = txnRouter->getOrCreateParticipant(shardId);
+ aggCmd = participant.attachTxnFieldsIfNeeded(aggCmd);
+ }
+
// agg creates temp collection and should handle implicit create separately.
- return appendAllowImplicitCreate(mergeCmd.freeze().toBson(), true);
+ return appendAllowImplicitCreate(aggCmd, true);
}
std::vector<RemoteCursor> establishShardCursors(
@@ -426,7 +446,7 @@ DispatchShardPipelineResults dispatchShardPipeline(
? createCommandForTargetedShards(
opCtx, aggRequest, *splitPipeline, collationObj, exchangeSpec, true)
: createPassthroughCommandForShard(
- opCtx, aggRequest, pipeline.get(), originalCmdObj, collationObj);
+ opCtx, aggRequest, boost::none, pipeline.get(), originalCmdObj, collationObj);
// Refresh the shard registry if we're targeting all shards. We need the shard registry
// to be at least as current as the logical time used when creating the command for
@@ -628,6 +648,10 @@ BSONObj establishMergingMongosCursor(
params.lsid = opCtx->getLogicalSessionId();
params.txnNumber = opCtx->getTxnNumber();
+ if (TransactionRouter::get(opCtx)) {
+ params.isAutoCommit = false;
+ }
+
auto ccc = cluster_aggregation_planner::buildClusterCursor(
opCtx, std::move(pipelineForMerging), std::move(params));
@@ -966,7 +990,8 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
targetedShards,
routingInfo->db().primaryId());
- auto mergeCmdObj = createCommandForMergingShard(request, expCtx, cmdObj, mergePipeline);
+ auto mergeCmdObj =
+ createCommandForMergingShard(request, expCtx, cmdObj, mergingShardId, mergePipeline);
// Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
auto mergeResponse =
@@ -1144,16 +1169,15 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
}
auto shard = std::move(swShard.getValue());
- // aggPassthrough is for unsharded collections since changing primary shardId will cause SSV
- // error and hence shardId history does not need to be verified.
- if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ auto txnRouter = TransactionRouter::get(opCtx);
+ if (txnRouter) {
txnRouter->computeAtClusterTimeForOneShard(opCtx, shardId);
}
// Format the command for the shard. This adds the 'fromMongos' field, wraps the command as an
// explain if necessary, and rewrites the result into a format safe to forward to shards.
cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
- createPassthroughCommandForShard(opCtx, aggRequest, nullptr, cmdObj, BSONObj()));
+ createPassthroughCommandForShard(opCtx, aggRequest, shardId, nullptr, cmdObj, BSONObj()));
auto cmdResponse = uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
opCtx,
@@ -1163,6 +1187,11 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
: std::move(cmdObj),
Shard::RetryPolicy::kIdempotent));
+ if (txnRouter) {
+ auto& participant = txnRouter->getOrCreateParticipant(shardId);
+ participant.markAsCommandSent();
+ }
+
if (ErrorCodes::isStaleShardVersionError(cmdResponse.commandStatus.code())) {
uassertStatusOK(
cmdResponse.commandStatus.withContext("command failed because of stale config"));
diff --git a/src/mongo/s/query/cluster_aggregation_planner.cpp b/src/mongo/s/query/cluster_aggregation_planner.cpp
index ba9dc325461..b799560db35 100644
--- a/src/mongo/s/query/cluster_aggregation_planner.cpp
+++ b/src/mongo/s/query/cluster_aggregation_planner.cpp
@@ -50,6 +50,7 @@
#include "mongo/s/query/router_stage_skip.h"
#include "mongo/s/shard_id.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/s/transaction/transaction_router.h"
namespace mongo {
namespace cluster_aggregation_planner {
@@ -383,9 +384,22 @@ void addMergeCursorsSource(Pipeline* mergePipeline,
armParams.setTailableMode(mergePipeline->getContext()->tailableMode);
armParams.setNss(mergePipeline->getContext()->ns);
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(opCtx->getLogicalSessionId());
+ OperationSessionInfoFromClient sessionInfo;
+ boost::optional<LogicalSessionFromClient> lsidFromClient;
+
+ auto lsid = opCtx->getLogicalSessionId();
+ if (lsid) {
+ lsidFromClient.emplace(lsid->getId());
+ lsidFromClient->setUid(lsid->getUid());
+ }
+
+ sessionInfo.setSessionId(lsidFromClient);
sessionInfo.setTxnNumber(opCtx->getTxnNumber());
+
+ if (TransactionRouter::get(opCtx)) {
+ sessionInfo.setAutocommit(false);
+ }
+
armParams.setOperationSessionInfo(sessionInfo);
// For change streams, we need to set up a custom stage to establish cursors on new shards when
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index a853d26a99f..bd873ebb4d0 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -86,9 +86,17 @@ struct ClusterClientCursorParams {
armParams.setNss(nsString);
armParams.setAllowPartialResults(isAllowPartialResults);
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(lsid);
+ OperationSessionInfoFromClient sessionInfo;
+ boost::optional<LogicalSessionFromClient> lsidFromClient;
+
+ if (lsid) {
+ lsidFromClient.emplace(lsid->getId());
+ lsidFromClient->setUid(lsid->getUid());
+ }
+
+ sessionInfo.setSessionId(lsidFromClient);
sessionInfo.setTxnNumber(txnNumber);
+ sessionInfo.setAutocommit(isAutoCommit);
armParams.setOperationSessionInfo(sessionInfo);
return armParams;
@@ -139,6 +147,9 @@ struct ClusterClientCursorParams {
// The transaction number of the command that created the cursor.
boost::optional<TxnNumber> txnNumber;
+
+ // Set to false for multi statement transactions.
+ boost::optional<bool> isAutoCommit;
};
} // mongo
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 0f49c9a9f57..19ae450ff3e 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -241,6 +241,10 @@ CursorId runQueryWithoutRetrying(OperationContext* opCtx,
params.lsid = opCtx->getLogicalSessionId();
params.txnNumber = opCtx->getTxnNumber();
+ if (TransactionRouter::get(opCtx)) {
+ params.isAutoCommit = false;
+ }
+
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
// usually use the batchSize associated with the initial find, but as it is illegal to send a
// getMore with a batchSize of 0, we set it to use the default batchSize logic.
diff --git a/src/mongo/s/query/establish_cursors.cpp b/src/mongo/s/query/establish_cursors.cpp
index 8cdb30d1c0b..a4198e0726c 100644
--- a/src/mongo/s/query/establish_cursors.cpp
+++ b/src/mongo/s/query/establish_cursors.cpp
@@ -39,8 +39,8 @@
#include "mongo/db/query/killcursors_request.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/executor/remote_command_response.h"
-#include "mongo/s/async_requests_sender.h"
#include "mongo/s/grid.h"
+#include "mongo/s/multi_statement_transaction_requests_sender.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -60,12 +60,12 @@ std::vector<RemoteCursor> establishCursors(OperationContext* opCtx,
}
// Send the requests
- AsyncRequestsSender ars(opCtx,
- executor,
- nss.db().toString(),
- std::move(requests),
- readPref,
- Shard::RetryPolicy::kIdempotent);
+ MultiStatementTransactionRequestsSender ars(opCtx,
+ executor,
+ nss.db().toString(),
+ std::move(requests),
+ readPref,
+ Shard::RetryPolicy::kIdempotent);
std::vector<RemoteCursor> remoteCursors;
try {
diff --git a/src/mongo/s/query/results_merger_test_fixture.h b/src/mongo/s/query/results_merger_test_fixture.h
index 641d934a2cd..8119d8e70de 100644
--- a/src/mongo/s/query/results_merger_test_fixture.h
+++ b/src/mongo/s/query/results_merger_test_fixture.h
@@ -86,8 +86,15 @@ protected:
params.setAllowPartialResults(qr->isAllowPartialResults());
}
- OperationSessionInfo sessionInfo;
- sessionInfo.setSessionId(operationContext()->getLogicalSessionId());
+ OperationSessionInfoFromClient sessionInfo;
+ boost::optional<LogicalSessionFromClient> lsidFromClient;
+
+ if (auto lsid = operationContext()->getLogicalSessionId()) {
+ lsidFromClient.emplace(lsid->getId());
+ lsidFromClient->setUid(lsid->getUid());
+ }
+
+ sessionInfo.setSessionId(lsidFromClient);
sessionInfo.setTxnNumber(operationContext()->getTxnNumber());
params.setOperationSessionInfo(sessionInfo);
return params;
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 28db3452a29..b50177ba246 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -41,6 +41,7 @@
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/shard_id.h"
+#include "mongo/s/transaction/transaction_router.h"
namespace mongo {
@@ -119,6 +120,10 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
params.lsid = opCtx->getLogicalSessionId();
params.txnNumber = opCtx->getTxnNumber();
+ if (TransactionRouter::get(opCtx)) {
+ params.isAutoCommit = false;
+ }
+
auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));
// We don't expect to use this cursor until a subsequent getMore, so detach from the current