diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-12-19 14:31:36 -0500 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-12-19 17:29:29 -0500 |
commit | b5a2cc0fec6ac30b1a0196da5feb41d85a8b76c3 (patch) | |
tree | 4147e3b4ffbfea86eec107e7d6a6d777bded033d /src/mongo/s/commands | |
parent | bd9c109958c1721767f5432683706c62ec90fe30 (diff) | |
download | mongo-b5a2cc0fec6ac30b1a0196da5feb41d85a8b76c3.tar.gz |
SERVER-32190 Make MongoProcessInterface always available
Diffstat (limited to 'src/mongo/s/commands')
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 40 | ||||
-rw-r--r-- | src/mongo/s/commands/pipeline_s.cpp | 300 | ||||
-rw-r--r-- | src/mongo/s/commands/pipeline_s.h | 104 |
3 files changed, 222 insertions, 222 deletions
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 2654463431a..7e7577dbdb3 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -102,8 +102,8 @@ Document wrapAggAsExplain(Document aggregateCommand, ExplainOptions::Verbosity v Status appendExplainResults( const std::vector<AsyncRequestsSender::Response>& shardResults, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging, + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards, + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging, BSONObjBuilder* result) { if (pipelineForTargetedShards->isSplitForShards()) { *result << "mergeType" @@ -202,7 +202,7 @@ std::set<ShardId> getTargetedShards(OperationContext* opCtx, BSONObj createCommandForTargetedShards( const AggregationRequest& request, const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForTargetedShards) { + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForTargetedShards) { // Create the command for the shards. MutableDocument targetedCmd(request.serializeToCommandObj()); targetedCmd[AggregationRequest::kFromMongosName] = Value(true); @@ -237,7 +237,7 @@ BSONObj createCommandForMergingShard( const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const BSONObj originalCmdObj, - const std::unique_ptr<Pipeline, Pipeline::Deleter>& pipelineForMerging) { + const std::unique_ptr<Pipeline, PipelineDeleter>& pipelineForMerging) { MutableDocument mergeCmd(request.serializeToCommandObj()); mergeCmd["pipeline"] = Value(pipelineForMerging->serialize()); @@ -338,10 +338,10 @@ struct DispatchShardPipelineResults { // The half of the pipeline that was sent to each shard, or the entire pipeline if there was // only one shard targeted. - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForTargetedShards; + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForTargetedShards; // The merging half of the pipeline if more than one shard was targeted, otherwise nullptr. - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging; + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; // The command object to send to the targeted shards. BSONObj commandForTargetedShards; @@ -358,7 +358,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline( BSONObj originalCmdObj, const AggregationRequest& aggRequest, const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, Pipeline::Deleter> pipeline) { + std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { // The process is as follows: // - First, determine whether we need to target more than one shard. If so, we split the // pipeline; if not, we retain the existing pipeline. @@ -381,7 +381,7 @@ StatusWith<DispatchShardPipelineResults> dispatchShardPipeline( const auto shardQuery = pipeline->getInitialQuery(); auto pipelineForTargetedShards = std::move(pipeline); - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging; + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging; BSONObj targetedCommand; int numAttempts = 0; @@ -520,17 +520,13 @@ StatusWith<std::pair<ShardId, Shard::CommandResponse>> establishMergingShardCurs return {{std::move(mergingShardId), std::move(shardCmdResponse)}}; } -BSONObj establishMergingMongosCursor( - OperationContext* opCtx, - const AggregationRequest& request, - const NamespaceString& requestedNss, - BSONObj cmdToRunOnNewShards, - const LiteParsedPipeline& liteParsedPipeline, - std::unique_ptr<Pipeline, Pipeline::Deleter> pipelineForMerging, - std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { - - // Inject the MongosProcessInterface for sources which need it. - PipelineS::injectMongosInterface(pipelineForMerging.get()); +BSONObj establishMergingMongosCursor(OperationContext* opCtx, + const AggregationRequest& request, + const NamespaceString& requestedNss, + BSONObj cmdToRunOnNewShards, + const LiteParsedPipeline& liteParsedPipeline, + std::unique_ptr<Pipeline, PipelineDeleter> pipelineForMerging, + std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { ClusterClientCursorParams params( requestedNss, @@ -732,7 +728,11 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, } boost::intrusive_ptr<ExpressionContext> mergeCtx = - new ExpressionContext(opCtx, request, std::move(collation), std::move(resolvedNamespaces)); + new ExpressionContext(opCtx, + request, + std::move(collation), + std::make_shared<PipelineS::MongoSProcessInterface>(), + std::move(resolvedNamespaces)); mergeCtx->inMongos = true; // explicitly *not* setting mergeCtx->tempDir diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp index ef75292be37..8c198771e2b 100644 --- a/src/mongo/s/commands/pipeline_s.cpp +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -39,6 +39,7 @@ #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/establish_cursors.h" +#include "mongo/s/query/router_exec_stage.h" namespace mongo { @@ -86,210 +87,109 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo( return swRoutingInfo; } -/** - * Class to provide access to mongos-specific implementations of methods required by some document - * sources. - */ -class MongosProcessInterface final - : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface { -public: - MongosProcessInterface(const intrusive_ptr<ExpressionContext>& expCtx) - : _expCtx(expCtx), _opCtx(_expCtx->opCtx) {} - - virtual ~MongosProcessInterface() = default; - - void setOperationContext(OperationContext* opCtx) final { - invariant(_expCtx->opCtx == opCtx); - _opCtx = opCtx; - } - - DBClientBase* directClient() final { - MONGO_UNREACHABLE; - } - - bool isSharded(const NamespaceString& ns) final { - MONGO_UNREACHABLE; - } - - BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final { - MONGO_UNREACHABLE; - } - - CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) final { - MONGO_UNREACHABLE; - } - - void appendLatencyStats(const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const final { - MONGO_UNREACHABLE; - } - - Status appendStorageStats(const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const final { - MONGO_UNREACHABLE; - } - - Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const final { - MONGO_UNREACHABLE; - } - - BSONObj getCollectionOptions(const NamespaceString& nss) final { - MONGO_UNREACHABLE; - } - - Status renameIfOptionsAndIndexesHaveNotChanged( - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) final { - MONGO_UNREACHABLE; - } - - StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) final { - MONGO_UNREACHABLE; - } - - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) final { - MONGO_UNREACHABLE; - } - - std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, - CurrentOpUserMode userMode, - CurrentOpTruncateMode truncateMode) const final { - MONGO_UNREACHABLE; - } - - std::string getShardName(OperationContext* opCtx) const final { - MONGO_UNREACHABLE; - } - - std::vector<FieldPath> collectDocumentKeyFields(UUID) const final { - MONGO_UNREACHABLE; - } - - boost::optional<Document> lookupSingleDocument(const NamespaceString& nss, - UUID collectionUUID, - const Document& filter, - boost::optional<BSONObj> readConcern) final { - auto foreignExpCtx = _expCtx->copyWith(nss, collectionUUID); +} // namespace - // Create the find command to be dispatched to the shard in order to return the post-change - // document. - auto filterObj = filter.toBson(); - BSONObjBuilder cmdBuilder; - bool findCmdIsByUuid(foreignExpCtx->uuid); - if (findCmdIsByUuid) { - foreignExpCtx->uuid->appendToBuilder(&cmdBuilder, "find"); - } else { - cmdBuilder.append("find", nss.coll()); +boost::optional<Document> PipelineS::MongoSProcessInterface::lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& filter, + boost::optional<BSONObj> readConcern) { + auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID); + + // Create the find command to be dispatched to the shard in order to return the post-change + // document. + auto filterObj = filter.toBson(); + BSONObjBuilder cmdBuilder; + bool findCmdIsByUuid(foreignExpCtx->uuid); + if (findCmdIsByUuid) { + foreignExpCtx->uuid->appendToBuilder(&cmdBuilder, "find"); + } else { + cmdBuilder.append("find", nss.coll()); + } + cmdBuilder.append("filter", filterObj); + cmdBuilder.append("comment", expCtx->comment); + if (readConcern) { + cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern); + } + + auto swShardResult = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); + auto findCmd = cmdBuilder.obj(); + size_t numAttempts = 0; + do { + // Verify that the collection exists, with the correct UUID. + auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); + auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx); + if (swRoutingInfo == ErrorCodes::NamespaceNotFound) { + return boost::none; } - cmdBuilder.append("filter", filterObj); - cmdBuilder.append("comment", _expCtx->comment); - if (readConcern) { - cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern); + auto routingInfo = uassertStatusOK(std::move(swRoutingInfo)); + if (findCmdIsByUuid && routingInfo.cm()) { + // Find by UUID and shard versioning do not work together (SERVER-31946). In the + // sharded case we've already checked the UUID, so find by namespace is safe. In the + // unlikely case that the collection has been deleted and a new collection with the same + // name created through a different mongos, the shard version will be detected as stale, + // as shard versions contain an 'epoch' field unique to the collection. + findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement()); + findCmdIsByUuid = false; } - auto swShardResult = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); - auto findCmd = cmdBuilder.obj(); - size_t numAttempts = 0; - do { - // Verify that the collection exists, with the correct UUID. - auto catalogCache = Grid::get(_expCtx->opCtx)->catalogCache(); - auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx); - if (swRoutingInfo == ErrorCodes::NamespaceNotFound) { - return boost::none; - } - auto routingInfo = uassertStatusOK(std::move(swRoutingInfo)); - if (findCmdIsByUuid && routingInfo.cm()) { - // Find by UUID and shard versioning do not work together (SERVER-31946). In the - // sharded case we've already checked the UUID, so find by namespace is safe. In - // the unlikely case that the collection has been deleted and a new collection with - // the same name created through a different mongos, the shard version will be - // detected as stale, as shard versions contain an 'epoch' field unique to the - // collection. - findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement()); - findCmdIsByUuid = false; - } - - // Get the ID and version of the single shard to which this query will be sent. - auto shardInfo = getSingleTargetedShardForQuery(_expCtx->opCtx, routingInfo, filterObj); - - // Dispatch the request. This will only be sent to a single shard and only a single - // result will be returned. The 'establishCursors' method conveniently prepares the - // result into a cursor response for us. - swShardResult = establishCursors( - _expCtx->opCtx, - Grid::get(_expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), - nss, - ReadPreferenceSetting::get(_expCtx->opCtx), - {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}}, - false, - nullptr); - - // If it's an unsharded collection which has been deleted and re-created, we may get a - // NamespaceNotFound error when looking up by UUID. - if (swShardResult.getStatus().code() == ErrorCodes::NamespaceNotFound) { - return boost::none; - } - // If we hit a stale shardVersion exception, invalidate the routing table cache. - if (ErrorCodes::isStaleShardingError(swShardResult.getStatus().code())) { - catalogCache->onStaleConfigError(std::move(routingInfo)); - } - } while (!swShardResult.isOK() && ++numAttempts < kMaxNumStaleVersionRetries); - - auto shardResult = uassertStatusOK(std::move(swShardResult)); - invariant(shardResult.size() == 1u); - - auto& cursor = shardResult.front().cursorResponse; - auto& batch = cursor.getBatch(); - - // We should have at most 1 result, and the cursor should be exhausted. - uassert(ErrorCodes::InternalError, - str::stream() << "Shard cursor was unexpectedly open after lookup: " - << shardResult.front().hostAndPort - << ", id: " - << cursor.getCursorId(), - cursor.getCursorId() == 0); - uassert(ErrorCodes::TooManyMatchingDocuments, - str::stream() << "found more than one document matching " << filter.toString() - << " [" - << batch.begin()->toString() - << ", " - << std::next(batch.begin())->toString() - << "]", - batch.size() <= 1u); - - return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{}); - } - - std::vector<GenericCursor> getCursors(const intrusive_ptr<ExpressionContext>& expCtx) const { - invariant(hasGlobalServiceContext()); - auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager(); - invariant(cursorManager); - return cursorManager->getAllCursors(); - } - -private: - intrusive_ptr<ExpressionContext> _expCtx; - OperationContext* _opCtx; -}; -} // namespace - -void PipelineS::injectMongosInterface(Pipeline* pipeline) { - const auto& sources = pipeline->getSources(); - for (auto&& source : sources) { - if (auto needsMongos = - dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get())) { - needsMongos->injectMongoProcessInterface( - std::make_shared<MongosProcessInterface>(pipeline->getContext())); + // Get the ID and version of the single shard to which this query will be sent. + auto shardInfo = getSingleTargetedShardForQuery(expCtx->opCtx, routingInfo, filterObj); + + // Dispatch the request. This will only be sent to a single shard and only a single result + // will be returned. The 'establishCursors' method conveniently prepares the result into a + // cursor response for us. + swShardResult = + establishCursors(expCtx->opCtx, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + ReadPreferenceSetting::get(expCtx->opCtx), + {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}}, + false, + nullptr); + + // If it's an unsharded collection which has been deleted and re-created, we may get a + // NamespaceNotFound error when looking up by UUID. + if (swShardResult.getStatus().code() == ErrorCodes::NamespaceNotFound) { + return boost::none; } - } + // If we hit a stale shardVersion exception, invalidate the routing table cache. + if (ErrorCodes::isStaleShardingError(swShardResult.getStatus().code())) { + catalogCache->onStaleConfigError(std::move(routingInfo)); + } + } while (!swShardResult.isOK() && ++numAttempts < kMaxNumStaleVersionRetries); + + auto shardResult = uassertStatusOK(std::move(swShardResult)); + invariant(shardResult.size() == 1u); + + auto& cursor = shardResult.front().cursorResponse; + auto& batch = cursor.getBatch(); + + // We should have at most 1 result, and the cursor should be exhausted. + uassert(ErrorCodes::InternalError, + str::stream() << "Shard cursor was unexpectedly open after lookup: " + << shardResult.front().hostAndPort + << ", id: " + << cursor.getCursorId(), + cursor.getCursorId() == 0); + uassert(ErrorCodes::TooManyMatchingDocuments, + str::stream() << "found more than one document matching " << filter.toString() << " [" + << batch.begin()->toString() + << ", " + << std::next(batch.begin())->toString() + << "]", + batch.size() <= 1u); + + return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{}); } + +std::vector<GenericCursor> PipelineS::MongoSProcessInterface::getCursors( + const intrusive_ptr<ExpressionContext>& expCtx) const { + invariant(hasGlobalServiceContext()); + auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager(); + invariant(cursorManager); + return cursorManager->getAllCursors(); +} + } // namespace mongo diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h index ca4bdc58270..173242e7653 100644 --- a/src/mongo/s/commands/pipeline_s.h +++ b/src/mongo/s/commands/pipeline_s.h @@ -28,6 +28,7 @@ #pragma once +#include "mongo/db/pipeline/mongo_process_interface.h" #include "mongo/db/pipeline/pipeline.h" namespace mongo { @@ -39,9 +40,108 @@ namespace mongo { class PipelineS { public: /** - * Injects a MongosProcessInterface into stages which require access to its functionality. + * Class to provide access to mongos-specific implementations of methods required by some + * document sources. */ - static void injectMongosInterface(Pipeline* pipeline); + class MongoSProcessInterface final : public MongoProcessInterface { + public: + MongoSProcessInterface() = default; + + virtual ~MongoSProcessInterface() = default; + + void setOperationContext(OperationContext* opCtx) final {} + + DBClientBase* directClient() final { + MONGO_UNREACHABLE; + } + + bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final { + MONGO_UNREACHABLE; + } + + BSONObj insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& ns, + const std::vector<BSONObj>& objs) final { + MONGO_UNREACHABLE; + } + + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) final { + MONGO_UNREACHABLE; + } + + void appendLatencyStats(OperationContext* opCtx, + const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + Status appendStorageStats(OperationContext* opCtx, + const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + Status appendRecordCount(OperationContext* opCtx, + const NamespaceString& nss, + BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + BSONObj getCollectionOptions(const NamespaceString& nss) final { + MONGO_UNREACHABLE; + } + + Status renameIfOptionsAndIndexesHaveNotChanged( + OperationContext* opCtx, + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) final { + MONGO_UNREACHABLE; + } + + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { + MONGO_UNREACHABLE; + } + + std::vector<BSONObj> getCurrentOps(OperationContext* opCtx, + CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode, + CurrentOpTruncateMode truncateMode) const final { + MONGO_UNREACHABLE; + } + + std::string getShardName(OperationContext* opCtx) const final { + MONGO_UNREACHABLE; + } + + std::vector<FieldPath> collectDocumentKeyFields(OperationContext*, + const NamespaceString&, + UUID) const final { + MONGO_UNREACHABLE; + } + + StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions pipelineOptions) final { + MONGO_UNREACHABLE; + } + + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& documentKey, + boost::optional<BSONObj> readConcern) final; + + std::vector<GenericCursor> getCursors( + const boost::intrusive_ptr<ExpressionContext>& expCtx) const final; + }; private: PipelineS() = delete; // Should never be instantiated. |