summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-12-19 14:31:36 -0500
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-12-19 17:29:29 -0500
commitb5a2cc0fec6ac30b1a0196da5feb41d85a8b76c3 (patch)
tree4147e3b4ffbfea86eec107e7d6a6d777bded033d /src/mongo/s/commands
parentbd9c109958c1721767f5432683706c62ec90fe30 (diff)
downloadmongo-b5a2cc0fec6ac30b1a0196da5feb41d85a8b76c3.tar.gz
SERVER-32190 Make MongoProcessInterface always available
Diffstat (limited to 'src/mongo/s/commands')
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp40
-rw-r--r--src/mongo/s/commands/pipeline_s.cpp300
-rw-r--r--src/mongo/s/commands/pipeline_s.h104
3 files changed, 222 insertions, 222 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index 2654463431a..7e7577dbdb3 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -102,8 +102,8 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v
Status appendExplainResults(
const std::vector<AsyncRequestsSender::Response>& shardResults,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging,
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards,
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging,
BSONObjBuilder* result) {
if (pipelineForTargetedShards->isSplitForShards()) {
*result << "mergeType"
@@ -202,7 +202,7 @@ std::set<ShardId> getTargetedShards(OperationContext* opCtx,
BSONObj createCommandForTargetedShards(
const AggregationRequest& request,
const BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards) {
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) {
// Create the command for the shards.
MutableDocument targetedCmd(request.serializeToCommandObj());
targetedCmd[AggregationRequest::kFromMongosName] = Value(true);
@@ -237,7 +237,7 @@ BSONObj createCommandForMergingShard(
const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const BSONObj originalCmdObj,
- const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging) {
+ const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
mergeCmd["pipeline"] = Value(pipelineForMerging->serialize());
@@ -338,10 +338,10 @@ struct DispatchShardPipelineResults {
// The half of the pipeline that was sent to each shard, or the entire pipeline if there was
// only one shard targeted.
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards;
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards;
// The merging half of the pipeline if more than one shard was targeted, otherwise nullptr.
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
// The command object to send to the targeted shards.
BSONObj commandForTargetedShards;
@@ -358,7 +358,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
BSONObj originalCmdObj,
const AggregationRequest& aggRequest,
const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline) {
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
// The process is as follows:
// - First, determine whether we need to target more than one shard. If so, we split the
// pipeline; if not, we retain the existing pipeline.
@@ -381,7 +381,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline(
const auto shardQuery = pipeline->getInitialQuery();
auto pipelineForTargetedShards = std::move(pipeline);
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging;
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging;
BSONObj targetedCommand;
int numAttempts = 0;
@@ -520,17 +520,13 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCurs
return {{std::move(mergingShardId), std::move(shardCmdResponse)}};
}
-BSONObj establishMergingMongosCursor(
- OperationContext* opCtx,
- const AggregationRequest& request,
- const NamespaceString& requestedNss,
- BSONObj cmdToRunOnNewShards,
- const LiteParsedPipeline& liteParsedPipeline,
- std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging,
- std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
-
- // Inject the MongosProcessInterface for sources which need it.
- PipelineS::injectMongosInterface(pipelineForMerging.get());
+BSONObj establishMergingMongosCursor(OperationContext* opCtx,
+ const AggregationRequest& request,
+ const NamespaceString& requestedNss,
+ BSONObj cmdToRunOnNewShards,
+ const LiteParsedPipeline& liteParsedPipeline,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging,
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors) {
ClusterClientCursorParams params(
requestedNss,
@@ -732,7 +728,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
}
boost::intrusive_ptr<ExpressionContext> mergeCtx =
- new ExpressionContext(opCtx, request, std::move(collation), std::move(resolvedNamespaces));
+ new ExpressionContext(opCtx,
+ request,
+ std::move(collation),
+ std::make_shared<PipelineS::MongoSProcessInterface>(),
+ std::move(resolvedNamespaces));
mergeCtx->inMongos = true;
// explicitly *not* setting mergeCtx->tempDir
diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp
index ef75292be37..8c198771e2b 100644
--- a/src/mongo/s/commands/pipeline_s.cpp
+++ b/src/mongo/s/commands/pipeline_s.cpp
@@ -39,6 +39,7 @@
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/establish_cursors.h"
+#include "mongo/s/query/router_exec_stage.h"
namespace mongo {
@@ -86,210 +87,109 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo(
return swRoutingInfo;
}
-/**
- * Class to provide access to mongos-specific implementations of methods required by some document
- * sources.
- */
-class MongosProcessInterface final
- : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface {
-public:
- MongosProcessInterface(const intrusive_ptr<ExpressionContext>& expCtx)
- : _expCtx(expCtx), _opCtx(_expCtx->opCtx) {}
-
- virtual ~MongosProcessInterface() = default;
-
- void setOperationContext(OperationContext* opCtx) final {
- invariant(_expCtx->opCtx == opCtx);
- _opCtx = opCtx;
- }
-
- DBClientBase* directClient() final {
- MONGO_UNREACHABLE;
- }
-
- bool isSharded(const NamespaceString& ns) final {
- MONGO_UNREACHABLE;
- }
-
- BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final {
- MONGO_UNREACHABLE;
- }
-
- CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) final {
- MONGO_UNREACHABLE;
- }
-
- void appendLatencyStats(const NamespaceString& nss,
- bool includeHistograms,
- BSONObjBuilder* builder) const final {
- MONGO_UNREACHABLE;
- }
-
- Status appendStorageStats(const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const final {
- MONGO_UNREACHABLE;
- }
-
- Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const final {
- MONGO_UNREACHABLE;
- }
-
- BSONObj getCollectionOptions(const NamespaceString& nss) final {
- MONGO_UNREACHABLE;
- }
-
- Status renameIfOptionsAndIndexesHaveNotChanged(
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) final {
- MONGO_UNREACHABLE;
- }
-
- StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) final {
- MONGO_UNREACHABLE;
- }
-
- Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- Pipeline* pipeline) final {
- MONGO_UNREACHABLE;
- }
-
- std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode,
- CurrentOpUserMode userMode,
- CurrentOpTruncateMode truncateMode) const final {
- MONGO_UNREACHABLE;
- }
-
- std::string getShardName(OperationContext* opCtx) const final {
- MONGO_UNREACHABLE;
- }
-
- std::vector<FieldPath> collectDocumentKeyFields(UUID) const final {
- MONGO_UNREACHABLE;
- }
-
- boost::optional<Document> lookupSingleDocument(const NamespaceString& nss,
- UUID collectionUUID,
- const Document& filter,
- boost::optional<BSONObj> readConcern) final {
- auto foreignExpCtx = _expCtx->copyWith(nss, collectionUUID);
+} // namespace
- // Create the find command to be dispatched to the shard in order to return the post-change
- // document.
- auto filterObj = filter.toBson();
- BSONObjBuilder cmdBuilder;
- bool findCmdIsByUuid(foreignExpCtx->uuid);
- if (findCmdIsByUuid) {
- foreignExpCtx->uuid->appendToBuilder(&cmdBuilder, "find");
- } else {
- cmdBuilder.append("find", nss.coll());
+boost::optional<Document> PipelineS::MongoSProcessInterface::lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& filter,
+ boost::optional<BSONObj> readConcern) {
+ auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID);
+
+ // Create the find command to be dispatched to the shard in order to return the post-change
+ // document.
+ auto filterObj = filter.toBson();
+ BSONObjBuilder cmdBuilder;
+ bool findCmdIsByUuid(foreignExpCtx->uuid);
+ if (findCmdIsByUuid) {
+ foreignExpCtx->uuid->appendToBuilder(&cmdBuilder, "find");
+ } else {
+ cmdBuilder.append("find", nss.coll());
+ }
+ cmdBuilder.append("filter", filterObj);
+ cmdBuilder.append("comment", expCtx->comment);
+ if (readConcern) {
+ cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern);
+ }
+
+ auto swShardResult = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
+ auto findCmd = cmdBuilder.obj();
+ size_t numAttempts = 0;
+ do {
+ // Verify that the collection exists, with the correct UUID.
+ auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
+ auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx);
+ if (swRoutingInfo == ErrorCodes::NamespaceNotFound) {
+ return boost::none;
}
- cmdBuilder.append("filter", filterObj);
- cmdBuilder.append("comment", _expCtx->comment);
- if (readConcern) {
- cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern);
+ auto routingInfo = uassertStatusOK(std::move(swRoutingInfo));
+ if (findCmdIsByUuid && routingInfo.cm()) {
+ // Find by UUID and shard versioning do not work together (SERVER-31946). In the
+ // sharded case we've already checked the UUID, so find by namespace is safe. In the
+ // unlikely case that the collection has been deleted and a new collection with the same
+ // name created through a different mongos, the shard version will be detected as stale,
+ // as shard versions contain an 'epoch' field unique to the collection.
+ findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement());
+ findCmdIsByUuid = false;
}
- auto swShardResult = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>();
- auto findCmd = cmdBuilder.obj();
- size_t numAttempts = 0;
- do {
- // Verify that the collection exists, with the correct UUID.
- auto catalogCache = Grid::get(_expCtx->opCtx)->catalogCache();
- auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx);
- if (swRoutingInfo == ErrorCodes::NamespaceNotFound) {
- return boost::none;
- }
- auto routingInfo = uassertStatusOK(std::move(swRoutingInfo));
- if (findCmdIsByUuid && routingInfo.cm()) {
- // Find by UUID and shard versioning do not work together (SERVER-31946). In the
- // sharded case we've already checked the UUID, so find by namespace is safe. In
- // the unlikely case that the collection has been deleted and a new collection with
- // the same name created through a different mongos, the shard version will be
- // detected as stale, as shard versions contain an 'epoch' field unique to the
- // collection.
- findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement());
- findCmdIsByUuid = false;
- }
-
- // Get the ID and version of the single shard to which this query will be sent.
- auto shardInfo = getSingleTargetedShardForQuery(_expCtx->opCtx, routingInfo, filterObj);
-
- // Dispatch the request. This will only be sent to a single shard and only a single
- // result will be returned. The 'establishCursors' method conveniently prepares the
- // result into a cursor response for us.
- swShardResult = establishCursors(
- _expCtx->opCtx,
- Grid::get(_expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
- nss,
- ReadPreferenceSetting::get(_expCtx->opCtx),
- {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}},
- false,
- nullptr);
-
- // If it's an unsharded collection which has been deleted and re-created, we may get a
- // NamespaceNotFound error when looking up by UUID.
- if (swShardResult.getStatus().code() == ErrorCodes::NamespaceNotFound) {
- return boost::none;
- }
- // If we hit a stale shardVersion exception, invalidate the routing table cache.
- if (ErrorCodes::isStaleShardingError(swShardResult.getStatus().code())) {
- catalogCache->onStaleConfigError(std::move(routingInfo));
- }
- } while (!swShardResult.isOK() && ++numAttempts < kMaxNumStaleVersionRetries);
-
- auto shardResult = uassertStatusOK(std::move(swShardResult));
- invariant(shardResult.size() == 1u);
-
- auto& cursor = shardResult.front().cursorResponse;
- auto& batch = cursor.getBatch();
-
- // We should have at most 1 result, and the cursor should be exhausted.
- uassert(ErrorCodes::InternalError,
- str::stream() << "Shard cursor was unexpectedly open after lookup: "
- << shardResult.front().hostAndPort
- << ", id: "
- << cursor.getCursorId(),
- cursor.getCursorId() == 0);
- uassert(ErrorCodes::TooManyMatchingDocuments,
- str::stream() << "found more than one document matching " << filter.toString()
- << " ["
- << batch.begin()->toString()
- << ", "
- << std::next(batch.begin())->toString()
- << "]",
- batch.size() <= 1u);
-
- return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{});
- }
-
- std::vector<GenericCursor> getCursors(const intrusive_ptr<ExpressionContext>& expCtx) const {
- invariant(hasGlobalServiceContext());
- auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager();
- invariant(cursorManager);
- return cursorManager->getAllCursors();
- }
-
-private:
- intrusive_ptr<ExpressionContext> _expCtx;
- OperationContext* _opCtx;
-};
-} // namespace
-
-void PipelineS::injectMongosInterface(Pipeline* pipeline) {
- const auto& sources = pipeline->getSources();
- for (auto&& source : sources) {
- if (auto needsMongos =
- dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get())) {
- needsMongos->injectMongoProcessInterface(
- std::make_shared<MongosProcessInterface>(pipeline->getContext()));
+ // Get the ID and version of the single shard to which this query will be sent.
+ auto shardInfo = getSingleTargetedShardForQuery(expCtx->opCtx, routingInfo, filterObj);
+
+ // Dispatch the request. This will only be sent to a single shard and only a single result
+ // will be returned. The 'establishCursors' method conveniently prepares the result into a
+ // cursor response for us.
+ swShardResult =
+ establishCursors(expCtx->opCtx,
+ Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ nss,
+ ReadPreferenceSetting::get(expCtx->opCtx),
+ {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}},
+ false,
+ nullptr);
+
+ // If it's an unsharded collection which has been deleted and re-created, we may get a
+ // NamespaceNotFound error when looking up by UUID.
+ if (swShardResult.getStatus().code() == ErrorCodes::NamespaceNotFound) {
+ return boost::none;
}
- }
+ // If we hit a stale shardVersion exception, invalidate the routing table cache.
+ if (ErrorCodes::isStaleShardingError(swShardResult.getStatus().code())) {
+ catalogCache->onStaleConfigError(std::move(routingInfo));
+ }
+ } while (!swShardResult.isOK() && ++numAttempts < kMaxNumStaleVersionRetries);
+
+ auto shardResult = uassertStatusOK(std::move(swShardResult));
+ invariant(shardResult.size() == 1u);
+
+ auto& cursor = shardResult.front().cursorResponse;
+ auto& batch = cursor.getBatch();
+
+ // We should have at most 1 result, and the cursor should be exhausted.
+ uassert(ErrorCodes::InternalError,
+ str::stream() << "Shard cursor was unexpectedly open after lookup: "
+ << shardResult.front().hostAndPort
+ << ", id: "
+ << cursor.getCursorId(),
+ cursor.getCursorId() == 0);
+ uassert(ErrorCodes::TooManyMatchingDocuments,
+ str::stream() << "found more than one document matching " << filter.toString() << " ["
+ << batch.begin()->toString()
+ << ", "
+ << std::next(batch.begin())->toString()
+ << "]",
+ batch.size() <= 1u);
+
+ return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{});
}
+
+std::vector<GenericCursor> PipelineS::MongoSProcessInterface::getCursors(
+ const intrusive_ptr<ExpressionContext>& expCtx) const {
+ invariant(hasGlobalServiceContext());
+ auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager();
+ invariant(cursorManager);
+ return cursorManager->getAllCursors();
+}
+
} // namespace mongo
diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h
index ca4bdc58270..173242e7653 100644
--- a/src/mongo/s/commands/pipeline_s.h
+++ b/src/mongo/s/commands/pipeline_s.h
@@ -28,6 +28,7 @@
#pragma once
+#include "mongo/db/pipeline/mongo_process_interface.h"
#include "mongo/db/pipeline/pipeline.h"
namespace mongo {
@@ -39,9 +40,108 @@ namespace mongo {
class PipelineS {
public:
/**
- * Injects a MongosProcessInterface into stages which require access to its functionality.
+ * Class to provide access to mongos-specific implementations of methods required by some
+ * document sources.
*/
- static void injectMongosInterface(Pipeline* pipeline);
+ class MongoSProcessInterface final : public MongoProcessInterface {
+ public:
+ MongoSProcessInterface() = default;
+
+ virtual ~MongoSProcessInterface() = default;
+
+ void setOperationContext(OperationContext* opCtx) final {}
+
+ DBClientBase* directClient() final {
+ MONGO_UNREACHABLE;
+ }
+
+ bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& ns,
+ const std::vector<BSONObj>& objs) final {
+ MONGO_UNREACHABLE;
+ }
+
+ CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
+ const NamespaceString& ns) final {
+ MONGO_UNREACHABLE;
+ }
+
+ void appendLatencyStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool includeHistograms,
+ BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status appendStorageStats(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const BSONObj& param,
+ BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status appendRecordCount(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BSONObjBuilder* builder) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ BSONObj getCollectionOptions(const NamespaceString& nss) final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status renameIfOptionsAndIndexesHaveNotChanged(
+ OperationContext* opCtx,
+ const BSONObj& renameCommandObj,
+ const NamespaceString& targetNs,
+ const BSONObj& originalCollectionOptions,
+ const std::list<BSONObj>& originalIndexes) final {
+ MONGO_UNREACHABLE;
+ }
+
+ Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ Pipeline* pipeline) final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::vector<BSONObj> getCurrentOps(OperationContext* opCtx,
+ CurrentOpConnectionsMode connMode,
+ CurrentOpUserMode userMode,
+ CurrentOpTruncateMode truncateMode) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::string getShardName(OperationContext* opCtx) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ std::vector<FieldPath> collectDocumentKeyFields(OperationContext*,
+ const NamespaceString&,
+ UUID) const final {
+ MONGO_UNREACHABLE;
+ }
+
+ StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline(
+ const std::vector<BSONObj>& rawPipeline,
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const MakePipelineOptions pipelineOptions) final {
+ MONGO_UNREACHABLE;
+ }
+
+ boost::optional<Document> lookupSingleDocument(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss,
+ UUID collectionUUID,
+ const Document& documentKey,
+ boost::optional<BSONObj> readConcern) final;
+
+ std::vector<GenericCursor> getCursors(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx) const final;
+ };
private:
PipelineS() = delete; // Should never be instantiated.