summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/pipeline/SConscript1
-rw-r--r--src/mongo/db/pipeline/aggregation_request.cpp9
-rw-r--r--src/mongo/db/pipeline/aggregation_request.h14
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp8
-rw-r--r--src/mongo/db/pipeline/expression_context.h3
-rw-r--r--src/mongo/db/pipeline/expression_context_for_test.h2
-rw-r--r--src/mongo/db/pipeline/expression_test.cpp38
-rw-r--r--src/mongo/db/pipeline/runtime_constants.idl50
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.cpp27
-rw-r--r--src/mongo/db/pipeline/sharded_agg_helpers.h5
-rw-r--r--src/mongo/db/pipeline/variables.cpp55
-rw-r--r--src/mongo/db/pipeline/variables.h29
-rw-r--r--src/mongo/s/query/cluster_aggregate.cpp19
13 files changed, 243 insertions, 17 deletions
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index d8cd96c02bc..e9183efb0eb 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -595,6 +595,7 @@ env.Library(
env.Idlc('document_source_list_sessions.idl')[0],
env.Idlc('document_source_out.idl')[0],
env.Idlc('exchange_spec.idl')[0],
+ env.Idlc('runtime_constants.idl')[0],
'resume_token.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/pipeline/aggregation_request.cpp b/src/mongo/db/pipeline/aggregation_request.cpp
index ce2dc95da88..2357693d4f0 100644
--- a/src/mongo/db/pipeline/aggregation_request.cpp
+++ b/src/mongo/db/pipeline/aggregation_request.cpp
@@ -239,6 +239,13 @@ StatusWith<AggregationRequest> AggregationRequest::parseFromBSON(
WriteConcernOptions writeConcern;
uassertStatusOK(writeConcern.parse(elem.embeddedObject()));
request.setWriteConcern(writeConcern);
+ } else if (kRuntimeConstants == fieldName) {
+ try {
+ IDLParserErrorContext ctx("internalRuntimeConstants");
+ request.setRuntimeConstants(RuntimeConstants::parse(ctx, elem.Obj()));
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
} else if (!isGenericArgument(fieldName)) {
return {ErrorCodes::FailedToParse,
str::stream() << "unrecognized field '" << elem.fieldName() << "'"};
@@ -342,6 +349,8 @@ Document AggregationRequest::serializeToCommandObj() const {
{kExchangeName, _exchangeSpec ? Value(_exchangeSpec->toBSON()) : Value()},
{WriteConcernOptions::kWriteConcernField,
_writeConcern ? Value(_writeConcern->toBSON()) : Value()},
+ // Only serialize runtime constants if any were specified.
+ {kRuntimeConstants, _runtimeConstants ? Value(_runtimeConstants->toBSON()) : Value()},
};
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/aggregation_request.h b/src/mongo/db/pipeline/aggregation_request.h
index d58bf8d2b09..fc146ef4324 100644
--- a/src/mongo/db/pipeline/aggregation_request.h
+++ b/src/mongo/db/pipeline/aggregation_request.h
@@ -36,6 +36,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/exchange_spec_gen.h"
+#include "mongo/db/pipeline/runtime_constants_gen.h"
#include "mongo/db/query/explain_options.h"
#include "mongo/db/write_concern_options.h"
@@ -63,6 +64,7 @@ public:
static constexpr StringData kHintName = "hint"_sd;
static constexpr StringData kCommentName = "comment"_sd;
static constexpr StringData kExchangeName = "exchange"_sd;
+ static constexpr StringData kRuntimeConstants = "runtimeConstants"_sd;
static constexpr long long kDefaultBatchSize = 101;
@@ -216,6 +218,10 @@ public:
return _writeConcern;
}
+ const auto& getRuntimeConstants() const {
+ return _runtimeConstants;
+ }
+
//
// Setters for optional fields.
//
@@ -284,6 +290,10 @@ public:
_writeConcern = writeConcern;
}
+ void setRuntimeConstants(RuntimeConstants runtimeConstants) {
+ _runtimeConstants = std::move(runtimeConstants);
+ }
+
private:
// Required fields.
const NamespaceString _nss;
@@ -333,5 +343,9 @@ private:
// The explicit writeConcern for the operation or boost::none if the user did not specifiy one.
boost::optional<WriteConcernOptions> _writeConcern;
+
+ // A document containing runtime constants; i.e. values that do not change once computed (e.g.
+ // $$NOW).
+ boost::optional<RuntimeConstants> _runtimeConstants;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 508f51788c2..21651f6f2b8 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -62,6 +62,11 @@ ExpressionContext::ExpressionContext(OperationContext* opCtx,
_ownedCollator = std::move(collator);
_resolvedNamespaces = std::move(resolvedNamespaces);
uuid = std::move(collUUID);
+ if (request.getRuntimeConstants()) {
+ variables.setRuntimeConstants(request.getRuntimeConstants().get());
+ } else {
+ variables.generateRuntimeConstants(opCtx);
+ }
}
ExpressionContext::ExpressionContext(OperationContext* opCtx, const CollatorInterface* collator)
@@ -170,6 +175,9 @@ intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(
expCtx->_resolvedNamespaces = _resolvedNamespaces;
+ expCtx->variables = variables;
+ expCtx->variablesParseState = variablesParseState.copyWith(expCtx->variables.useIdGenerator());
+
// Note that we intentionally skip copying the value of '_interruptCounter' because 'expCtx' is
// intended to be used for executing a separate aggregation pipeline.
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 156d58854d1..e3e9e76b2f3 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -196,6 +196,9 @@ public:
_resolvedNamespaces[nss.coll()] = std::move(resolvedNamespace);
}
+ auto getRuntimeConstants() const {
+ return variables.getRuntimeConstants();
+ }
// The explain verbosity requested by the user, or boost::none if no explain was requested.
boost::optional<ExplainOptions::Verbosity> explain;
diff --git a/src/mongo/db/pipeline/expression_context_for_test.h b/src/mongo/db/pipeline/expression_context_for_test.h
index 713e6cfe512..5438937133e 100644
--- a/src/mongo/db/pipeline/expression_context_for_test.h
+++ b/src/mongo/db/pipeline/expression_context_for_test.h
@@ -60,6 +60,8 @@ public:
// initialize with a nullptr and set post-construction.
timeZoneDatabase = TimeZoneDatabase::get(_serviceContext.getServiceContext());
opCtx = _testOpCtx.get();
+ RuntimeConstants constants(Date_t::now(), Timestamp());
+ variables.setRuntimeConstants(constants);
}
ExpressionContextForTest(OperationContext* opCtx, const AggregationRequest& request)
diff --git a/src/mongo/db/pipeline/expression_test.cpp b/src/mongo/db/pipeline/expression_test.cpp
index a5077b0b952..d0ee16fc2fa 100644
--- a/src/mongo/db/pipeline/expression_test.cpp
+++ b/src/mongo/db/pipeline/expression_test.cpp
@@ -6268,4 +6268,42 @@ public:
SuiteInstance<All> myall;
+namespace NowAndClusterTime {
+TEST(NowAndClusterTime, BasicTest) {
+ intrusive_ptr<ExpressionContextForTest> expCtx(new ExpressionContextForTest());
+
+ // $$NOW is the Date type.
+ {
+ auto expression = ExpressionFieldPath::parse(expCtx, "$$NOW", expCtx->variablesParseState);
+ Value result = expression->evaluate(Document());
+ ASSERT_EQ(result.getType(), Date);
+ }
+ // $$CLUSTER_TIME is the timestamp type.
+ {
+ auto expression =
+ ExpressionFieldPath::parse(expCtx, "$$CLUSTER_TIME", expCtx->variablesParseState);
+ Value result = expression->evaluate(Document());
+ ASSERT_EQ(result.getType(), bsonTimestamp);
+ }
+
+ // Multiple references to $$NOW must return the same value.
+ {
+ auto expression = Expression::parseExpression(
+ expCtx, fromjson("{$eq: [\"$$NOW\", \"$$NOW\"]}"), expCtx->variablesParseState);
+ Value result = expression->evaluate(Document());
+
+ ASSERT_VALUE_EQ(result, Value{true});
+ }
+ // Same is true for the $$CLUSTER_TIME.
+ {
+ auto expression =
+ Expression::parseExpression(expCtx,
+ fromjson("{$eq: [\"$$CLUSTER_TIME\", \"$$CLUSTER_TIME\"]}"),
+ expCtx->variablesParseState);
+ Value result = expression->evaluate(Document());
+
+ ASSERT_VALUE_EQ(result, Value{true});
+ }
+}
+}
} // namespace ExpressionTests
diff --git a/src/mongo/db/pipeline/runtime_constants.idl b/src/mongo/db/pipeline/runtime_constants.idl
new file mode 100644
index 00000000000..4a68db64c75
--- /dev/null
+++ b/src/mongo/db/pipeline/runtime_constants.idl
@@ -0,0 +1,50 @@
+# Copyright (C) 2018-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# Runtime constants IDL file
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+ RuntimeConstants:
+ description: A collection of values that do not change once computed. It is shipped as part
+ of an aggregation request so all shards participating in a query see the same
+ values.
+ fields:
+ localNow:
+ cpp_name: localNow
+ type: date
+ description: A value of the $$NOW variable.
+ clusterTime:
+ cpp_name: clusterTime
+ type: timestamp
+ description: A value of the $$CLUSTER_TIME variable.
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 3f87f138e78..9c84317c8d9 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -75,7 +75,7 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v
BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
const AggregationRequest& request,
- const boost::optional<ShardId>& shardId,
+ const boost::optional<RuntimeConstants>& constants,
Pipeline* pipeline,
BSONObj collationObj) {
// Create the command for the shards.
@@ -84,14 +84,19 @@ BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
targetedCmd[AggregationRequest::kPipelineName] = Value(pipeline->serialize());
}
- return genericTransformForShards(std::move(targetedCmd), opCtx, shardId, request, collationObj);
+ return genericTransformForShards(
+ std::move(targetedCmd), opCtx, request, constants, collationObj);
}
BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
OperationContext* opCtx,
- const boost::optional<ShardId>& shardId,
const AggregationRequest& request,
+ const boost::optional<RuntimeConstants>& constants,
BSONObj collationObj) {
+ if (constants) {
+ cmdForShards[AggregationRequest::kRuntimeConstants] = Value(constants.get().toBSON());
+ }
+
cmdForShards[AggregationRequest::kFromMongosName] = Value(true);
// If this is a request for an aggregation explain, then we must wrap the aggregate inside an
// explain command.
@@ -158,8 +163,8 @@ BSONObj createCommandForTargetedShards(
const cluster_aggregation_planner::SplitPipeline& splitPipeline,
const BSONObj collationObj,
const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ const boost::optional<RuntimeConstants>& constants,
bool needsMerge) {
-
// Create the command for the shards.
MutableDocument targetedCmd(request.serializeToCommandObj());
// If we've parsed a pipeline on mongos, always override the pipeline, in case parsing it
@@ -196,7 +201,7 @@ BSONObj createCommandForTargetedShards(
exchangeSpec ? Value(exchangeSpec->exchangeSpec.toBSON()) : Value();
return genericTransformForShards(
- std::move(targetedCmd), opCtx, boost::none, request, collationObj);
+ std::move(targetedCmd), opCtx, request, constants, collationObj);
}
/**
@@ -270,10 +275,16 @@ DispatchShardPipelineResults dispatchShardPipeline(
// Generate the command object for the targeted shards.
BSONObj targetedCommand = splitPipeline
- ? createCommandForTargetedShards(
- opCtx, aggRequest, litePipe, *splitPipeline, collationObj, exchangeSpec, true)
+ ? createCommandForTargetedShards(opCtx,
+ aggRequest,
+ litePipe,
+ *splitPipeline,
+ collationObj,
+ exchangeSpec,
+ expCtx->getRuntimeConstants(),
+ true)
: createPassthroughCommandForShard(
- opCtx, aggRequest, boost::none, pipeline.get(), collationObj);
+ opCtx, aggRequest, expCtx->getRuntimeConstants(), pipeline.get(), collationObj);
// Refresh the shard registry if we're targeting all shards. We need the shard registry
// to be at least as current as the logical time used when creating the command for
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index f1dc2c2d91e..861e1acb84c 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -110,18 +110,19 @@ BSONObj createCommandForTargetedShards(
const cluster_aggregation_planner::SplitPipeline& splitPipeline,
const BSONObj collationObj,
const boost::optional<cluster_aggregation_planner::ShardedExchangePolicy> exchangeSpec,
+ const boost::optional<RuntimeConstants>& constants,
bool needsMerge);
BSONObj createPassthroughCommandForShard(OperationContext* opCtx,
const AggregationRequest& request,
- const boost::optional<ShardId>& shardId,
+ const boost::optional<RuntimeConstants>& constants,
Pipeline* pipeline,
BSONObj collationObj);
BSONObj genericTransformForShards(MutableDocument&& cmdForShards,
OperationContext* opCtx,
- const boost::optional<ShardId>& shardId,
const AggregationRequest& request,
+ const boost::optional<RuntimeConstants>& constants,
BSONObj collationObj);
/**
diff --git a/src/mongo/db/pipeline/variables.cpp b/src/mongo/db/pipeline/variables.cpp
index 13f2344e311..82a62224784 100644
--- a/src/mongo/db/pipeline/variables.cpp
+++ b/src/mongo/db/pipeline/variables.cpp
@@ -27,18 +27,21 @@
* it in the license file.
*/
-#include "mongo/platform/basic.h"
-
#include "mongo/db/pipeline/variables.h"
+#include "mongo/db/client.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/platform/basic.h"
+#include "mongo/platform/random.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/time_support.h"
namespace mongo {
constexpr Variables::Id Variables::kRootId;
constexpr Variables::Id Variables::kRemoveId;
-const StringMap<Variables::Id> Variables::kBuiltinVarNameToId = {{"ROOT", kRootId},
- {"REMOVE", kRemoveId}};
+const StringMap<Variables::Id> Variables::kBuiltinVarNameToId = {
+ {"ROOT", kRootId}, {"REMOVE", kRemoveId}, {"NOW", kNowId}, {"CLUSTER_TIME", kClusterTimeId}};
void Variables::uassertValidNameForUserWrite(StringData varName) {
// System variables users allowed to write to (currently just one)
@@ -141,6 +144,16 @@ Value Variables::getValue(Id id, const Document& root) const {
return Value(root);
case Variables::kRemoveId:
return Value();
+ case Variables::kNowId:
+ case Variables::kClusterTimeId:
+ if (auto it = _runtimeConstants.find(id); it != _runtimeConstants.end()) {
+ return it->second;
+ }
+
+ uasserted(51144,
+ str::stream() << "Buildin variable $$" << getBuiltinVariableName(id)
+ << " is not available.");
+ MONGO_UNREACHABLE;
default:
MONGO_UNREACHABLE;
}
@@ -162,6 +175,40 @@ Document Variables::getDocument(Id id, const Document& root) const {
return Document();
}
+RuntimeConstants Variables::getRuntimeConstants() const {
+ RuntimeConstants constants;
+
+ if (auto it = _runtimeConstants.find(kNowId); it != _runtimeConstants.end()) {
+ constants.setLocalNow(it->second.getDate());
+ }
+ if (auto it = _runtimeConstants.find(kClusterTimeId); it != _runtimeConstants.end()) {
+ constants.setClusterTime(it->second.getTimestamp());
+ }
+
+ return constants;
+}
+
+void Variables::setRuntimeConstants(const RuntimeConstants& constants) {
+ _runtimeConstants[kNowId] = Value(constants.getLocalNow());
+ _runtimeConstants[kClusterTimeId] = Value(constants.getClusterTime());
+}
+
+void Variables::generateRuntimeConstants(OperationContext* opCtx) {
+ _runtimeConstants[kNowId] = Value(Date_t::now());
+
+ if (opCtx->getClient()) {
+ if (auto logicalClock = LogicalClock::get(opCtx); logicalClock) {
+ auto clusterTime = logicalClock->getClusterTime();
+
+ // On a standalone mongod the logical clock may not be running and $$CLUSTER_TIME is not
+ // available.
+ if (clusterTime != LogicalTime::kUninitialized) {
+ _runtimeConstants[kClusterTimeId] = Value(clusterTime.asTimestamp());
+ }
+ }
+ }
+}
+
Variables::Id VariablesParseState::defineVariable(StringData name) {
// Caller should have validated before hand by using Variables::uassertValidNameForUserWrite.
massert(17275,
diff --git a/src/mongo/db/pipeline/variables.h b/src/mongo/db/pipeline/variables.h
index c681e9ace14..22a0e88526f 100644
--- a/src/mongo/db/pipeline/variables.h
+++ b/src/mongo/db/pipeline/variables.h
@@ -29,7 +29,9 @@
#pragma once
+#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/document.h"
+#include "mongo/db/pipeline/runtime_constants_gen.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/unordered_map.h"
#include "mongo/util/string_map.h"
@@ -73,6 +75,8 @@ public:
// Ids for builtin variables.
static constexpr Variables::Id kRootId = Id(-1);
static constexpr Variables::Id kRemoveId = Id(-2);
+ static constexpr Variables::Id kNowId = Id(-3);
+ static constexpr Variables::Id kClusterTimeId = Id(-4);
// Map from builtin var name to reserved id number.
static const StringMap<Id> kBuiltinVarNameToId;
@@ -119,6 +123,21 @@ public:
return &_idGenerator;
}
+ /**
+ * Serializes runtime constants. This is used to send the constants to shards.
+ */
+ RuntimeConstants getRuntimeConstants() const;
+
+ /**
+ * Deserialize runtime constants.
+ */
+ void setRuntimeConstants(const RuntimeConstants& constants);
+
+ /**
+ * Generate values that must be constant during the execution.
+ */
+ void generateRuntimeConstants(OperationContext* opCtx);
+
private:
struct ValueAndState {
ValueAndState() = default;
@@ -131,8 +150,18 @@ private:
void setValue(Id id, const Value& value, bool isConstant);
+ static auto getBuiltinVariableName(Variables::Id variable) {
+ for (auto & [ name, id ] : kBuiltinVarNameToId) {
+ if (variable == id) {
+ return name;
+ }
+ }
+ return std::string();
+ }
+
IdGenerator _idGenerator;
std::vector<ValueAndState> _valueList;
+ stdx::unordered_map<Id, Value> _runtimeConstants;
};
/**
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index e683ae162de..9149ae33530 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -113,6 +113,9 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
mergeCmd[AggregationRequest::kFromMongosName] = Value(true);
+ mergeCmd[AggregationRequest::kRuntimeConstants] =
+ Value(mergeCtx->getRuntimeConstants().toBSON());
+
// If the user didn't specify a collation already, make sure there's a collation attached to
// the merge command, since the merging shard may not have the collection metadata.
if (mergeCmd.peek()["collation"].missing()) {
@@ -178,8 +181,15 @@ sharded_agg_helpers::DispatchShardPipelineResults dispatchExchangeConsumerPipeli
consumerPipelines.emplace_back(std::move(consumerPipeline), nullptr, boost::none);
- auto consumerCmdObj = sharded_agg_helpers::createCommandForTargetedShards(
- opCtx, request, litePipe, consumerPipelines.back(), collationObj, boost::none, false);
+ auto consumerCmdObj =
+ sharded_agg_helpers::createCommandForTargetedShards(opCtx,
+ request,
+ litePipe,
+ consumerPipelines.back(),
+ collationObj,
+ boost::none,
+ expCtx->getRuntimeConstants(),
+ false);
requests.emplace_back(shardDispatchResults->exchangeSpec->consumerShards[idx],
consumerCmdObj);
@@ -704,6 +714,9 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
const PrivilegeVector& privileges,
BSONObjBuilder* result) {
uassert(51028, "Cannot specify exchange option to a mongos", !request.getExchangeSpec());
+ uassert(51143,
+ "Cannot specify runtime constants option to a mongos",
+ !request.getRuntimeConstants());
uassert(51089,
str::stream() << "Internal parameter(s) [" << AggregationRequest::kNeedsMergeName
<< ", "
@@ -870,7 +883,7 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// explain if necessary, and rewrites the result into a format safe to forward to shards.
BSONObj cmdObj = CommandHelpers::filterCommandRequestForPassthrough(
sharded_agg_helpers::createPassthroughCommandForShard(
- opCtx, aggRequest, shardId, nullptr, BSONObj()));
+ opCtx, aggRequest, boost::none, nullptr, BSONObj()));
MultiStatementTransactionRequestsSender ars(
opCtx,