diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-26 00:42:14 +0100 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-10-26 04:28:23 +0100 |
commit | 81c5724d9d11a1ff2b16937dc01e6439d9e0f266 (patch) | |
tree | 4935dcc0cd22ec06a810d3e28a48b6d2cdf8a53d /src/mongo | |
parent | b5c44441b27e5e4cda03d1ab5ca1f0af07b00835 (diff) | |
download | mongo-81c5724d9d11a1ff2b16937dc01e6439d9e0f266.tar.gz |
SERVER-31597 Refactor $changeStream post-update lookup
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/pipeline/document_source.h | 9 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/pipeline/stub_mongo_process_interface.h | 5 | ||||
-rw-r--r-- | src/mongo/s/commands/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 212 | ||||
-rw-r--r-- | src/mongo/s/commands/pipeline_s.cpp | 260 | ||||
-rw-r--r-- | src/mongo/s/commands/pipeline_s.h | 50 |
9 files changed, 386 insertions, 238 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 6b3754f5b8e..8e4278f040b 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -839,6 +839,15 @@ public: */ virtual std::vector<FieldPath> collectDocumentKeyFields(UUID) const = 0; + /** + * Returns zero or one documents matching the input filter, or throws if more than one match + * was found. The passed ExpressionContext may use a different namespace than the + * ExpressionContext used to construct the MongoProcessInterface. Returns boost::none if no + * matching documents were found, including cases where the given namespace does not exist. + */ + virtual boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, const Document& filter) = 0; + // Add new methods as needed. }; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index 9535ad95bb2..058a959f588 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -68,6 +68,9 @@ DocumentSource::GetNextResult DocumentSourceLookupChangePostImage::getNext() { return input; } + // Temporarily remove any deadline from this operation to avoid timeout during lookup. + OperationContext::DeadlineStash deadlineStash(pExpCtx->opCtx); + MutableDocument output(input.releaseDocument()); output[kFullDocumentFieldName] = lookupPostImage(output.peek()); return output.freeze(); @@ -93,9 +96,10 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat // Make sure we have a well-formed input. auto nss = assertNamespaceMatches(updateOp); - auto documentKey = assertFieldHasType( - updateOp, DocumentSourceChangeStream::kDocumentKeyField, BSONType::Object); - auto matchSpec = BSON("$match" << documentKey); + auto documentKey = assertFieldHasType(updateOp, + DocumentSourceChangeStream::kDocumentKeyField, + BSONType::Object) + .getDocument(); // Extract the UUID from resume token and do change stream lookups by UUID. auto resumeToken = @@ -104,29 +108,11 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat // TODO SERVER-29134 we need to extract the namespace from the document and set them on the new // ExpressionContext if we're getting notifications from an entire database. auto foreignExpCtx = pExpCtx->copyWith(nss, resumeToken.getData().uuid); - auto pipelineStatus = _mongoProcessInterface->makePipeline({matchSpec}, foreignExpCtx); - if (pipelineStatus.getStatus() == ErrorCodes::NamespaceNotFound) { - // We couldn't find the collection with UUID, it may have been dropped. - return Value(BSONNULL); - } - auto pipeline = uassertStatusOK(std::move(pipelineStatus)); - - if (auto first = pipeline->getNext()) { - auto lookedUpDocument = Value(*first); - if (auto next = pipeline->getNext()) { - uasserted(40580, - str::stream() << "found more than document with documentKey " - << documentKey.toString() - << " while looking up post image after change: [" - << lookedUpDocument.toString() - << ", " - << next->toString() - << "]"); - } - return lookedUpDocument; - } - // We couldn't find it with the documentKey, it may have been deleted. - return Value(BSONNULL); + auto lookedUpDoc = _mongoProcessInterface->lookupSingleDocument(foreignExpCtx, documentKey); + + // Check whether the lookup returned any documents. Even if the lookup itself succeeded, it may + // not have returned any results if the document was deleted in the time since the update op. + return (lookedUpDoc ? Value(*lookedUpDoc) : Value(BSONNULL)); } } // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp index d9db1db38ce..05b92297359 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp @@ -90,7 +90,7 @@ public: StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) final { + const MakePipelineOptions opts = MakePipelineOptions{}) final { auto pipeline = Pipeline::parse(rawPipeline, expCtx); if (!pipeline.isOK()) { return pipeline.getStatus(); @@ -113,6 +113,27 @@ public: return Status::OK(); } + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, const Document& filter) { + auto swPipeline = makePipeline({BSON("$match" << filter)}, expCtx); + if (swPipeline == ErrorCodes::NamespaceNotFound) { + return boost::none; + } + auto pipeline = uassertStatusOK(std::move(swPipeline)); + + auto lookedUpDocument = pipeline->getNext(); + if (auto next = pipeline->getNext()) { + uasserted(ErrorCodes::TooManyMatchingDocuments, + str::stream() << "found more than one document matching " << filter.toString() + << " [" + << (*lookedUpDocument).toString() + << ", " + << (*next).toString() + << "]"); + } + return lookedUpDocument; + } + private: deque<DocumentSource::GetNextResult> _mockResults; }; @@ -248,7 +269,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni lookupChangeStage->injectMongoProcessInterface( std::make_shared<MockMongoProcessInterface>(std::move(foreignCollection))); - ASSERT_THROWS_CODE(lookupChangeStage->getNext(), AssertionException, 40580); + ASSERT_THROWS_CODE( + lookupChangeStage->getNext(), AssertionException, ErrorCodes::TooManyMatchingDocuments); } TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index d36b047af68..932b3e4f3e4 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -200,7 +200,7 @@ public: StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( const std::vector<BSONObj>& rawPipeline, const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) final { + const MakePipelineOptions opts = MakePipelineOptions{}) final { // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace // than the DocumentSource this MongodProcessInterface is injected into, but both // ExpressionContext instances should still have the same OperationContext. @@ -381,6 +381,27 @@ public: return result; } + boost::optional<Document> lookupSingleDocument(const intrusive_ptr<ExpressionContext>& expCtx, + const Document& filter) final { + auto swPipeline = makePipeline({BSON("$match" << filter)}, expCtx); + if (swPipeline == ErrorCodes::NamespaceNotFound) { + return boost::none; + } + auto pipeline = uassertStatusOK(std::move(swPipeline)); + + auto lookedUpDocument = pipeline->getNext(); + if (auto next = pipeline->getNext()) { + uasserted(ErrorCodes::TooManyMatchingDocuments, + str::stream() << "found more than one document matching " << filter.toString() + << " [" + << lookedUpDocument->toString() + << ", " + << next->toString() + << "]"); + } + return lookedUpDocument; + } + private: intrusive_ptr<ExpressionContext> _ctx; DBDirectClient _client; diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index 1a1a72f1224..06e22b24bdd 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -117,5 +117,10 @@ public: std::vector<FieldPath> collectDocumentKeyFields(UUID) const override { MONGO_UNREACHABLE; } + + boost::optional<Document> lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, const Document& filter) { + MONGO_UNREACHABLE; + } }; } // namespace mongo diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 68660c0b42f..23fe7a7a7d6 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -79,6 +79,7 @@ env.Library( 'cluster_write_cmd.cpp', 'commands_public.cpp', 'kill_sessions_remote.cpp', + 'pipeline_s.cpp', 'strategy.cpp', env.Idlc('cluster_multicast.idl')[0], ], diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index 602e3dac6b8..d4aac818133 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -54,16 +54,13 @@ #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/commands/cluster_commands_helpers.h" +#include "mongo/s/commands/pipeline_s.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_client_cursor_impl.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/query/cluster_query_knobs.h" -#include "mongo/s/query/document_source_router_adapter.h" #include "mongo/s/query/establish_cursors.h" -#include "mongo/s/query/router_exec_stage.h" -#include "mongo/s/query/router_stage_merge.h" -#include "mongo/s/query/router_stage_pipeline.h" #include "mongo/s/query/store_possible_cursor.h" #include "mongo/s/stale_exception.h" #include "mongo/util/log.h" @@ -71,104 +68,6 @@ namespace mongo { namespace { - -/** - * Class to provide access to mongos-specific implementations of methods required by some document - * sources. - */ -class MongosProcessInterface final - : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface { -public: - MongosProcessInterface(OperationContext* opCtx) : _opCtx(opCtx) {} - - virtual ~MongosProcessInterface() = default; - - void setOperationContext(OperationContext* opCtx) override { - _opCtx = opCtx; - } - - DBClientBase* directClient() override { - MONGO_UNREACHABLE; - } - - bool isSharded(const NamespaceString& ns) override { - MONGO_UNREACHABLE; - } - - BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) override { - MONGO_UNREACHABLE; - } - - CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) override { - MONGO_UNREACHABLE; - } - - void appendLatencyStats(const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const override { - MONGO_UNREACHABLE; - } - - Status appendStorageStats(const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const override { - MONGO_UNREACHABLE; - } - - Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const override { - MONGO_UNREACHABLE; - } - - BSONObj getCollectionOptions(const NamespaceString& nss) override { - MONGO_UNREACHABLE; - } - - Status renameIfOptionsAndIndexesHaveNotChanged( - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) override { - MONGO_UNREACHABLE; - } - - /** - * Constructs an executable pipeline targeted to a remote shard. Returns - * ErrorCodes::InternalError if 'rawPipeline' specifies a pipeline that does not target a single - * shard. - */ - StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions = {}) override; - - Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, - Pipeline* pipeline) override { - MONGO_UNREACHABLE; - } - - std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, - CurrentOpUserMode userMode, - CurrentOpTruncateMode truncateMode) const override { - MONGO_UNREACHABLE; - } - - std::string getShardName(OperationContext* opCtx) const override { - MONGO_UNREACHABLE; - } - - std::vector<FieldPath> collectDocumentKeyFields(UUID) const override { - MONGO_UNREACHABLE; - } - -private: - StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipelineWithOneRemote( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx); - - OperationContext* _opCtx; -}; - // Given a document representing an aggregation command such as // // {aggregate: "myCollection", pipeline: [], ...}, @@ -594,15 +493,8 @@ BSONObj establishMergingMongosCursor( std::vector<ClusterClientCursorParams::RemoteCursor> cursors) { // Inject the MongosProcessInterface for sources which need it. - const auto& sources = pipelineForMerging->getSources(); - for (auto&& source : sources) { - DocumentSourceNeedsMongoProcessInterface* needsMongoProcessInterface = - dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get()); - if (needsMongoProcessInterface) { - needsMongoProcessInterface->injectMongoProcessInterface( - std::make_shared<MongosProcessInterface>(opCtx)); - } - } + PipelineS::injectMongosInterface(pipelineForMerging.get()); + ClusterClientCursorParams params( requestedNss, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), @@ -683,104 +575,6 @@ BSONObj establishMergingMongosCursor( return cursorResponse.obj(); } -/** - * This is a special type of RouterExecStage that is used to iterate remote cursors that were - * created internally and do not represent a client cursor, such as those used in $changeStream's - * updateLookup functionality. - * - * The purpose of this class is to provide ownership over a ClusterClientCursorParams struct without - * creating a ClusterClientCursor, which would show up in the server stats for this mongos. - */ -class RouterStageInternalCursor final : public RouterExecStage { -public: - RouterStageInternalCursor(OperationContext* opCtx, - std::unique_ptr<ClusterClientCursorParams>&& params, - std::unique_ptr<RouterExecStage> child) - : RouterExecStage(opCtx, std::move(child)), _params(std::move(params)) {} - - StatusWith<ClusterQueryResult> next(ExecContext execContext) { - return getChildStage()->next(execContext); - } - -private: - std::unique_ptr<ClusterClientCursorParams> _params; -}; - -StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> MongosProcessInterface::makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions pipelineOptions) { - // For the time being we don't expect any callers with options other than these. - invariant(pipelineOptions.optimize); - invariant(pipelineOptions.attachCursorSource); - invariant(!pipelineOptions.forceInjectMongoProcessInterface); - - // 'expCtx' may represent the settings for an aggregation pipeline on a different namespace - // than the DocumentSource this MongodImplementation is injected into, but both - // ExpressionContext instances should still have the same OperationContext. - invariant(_opCtx == expCtx->opCtx); - - // Explain is not supported for auxiliary lookups. - invariant(!expCtx->explain); - return makePipelineWithOneRemote(rawPipeline, expCtx); -} - -StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> -MongosProcessInterface::makePipelineWithOneRemote( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx) { - - // Temporarily remove any deadline from this operation, we don't want to timeout while doing - // this lookup. - OperationContext::DeadlineStash deadlineStash(expCtx->opCtx); - - // Generate the command object for the targeted shards. - AggregationRequest aggRequest(expCtx->ns, rawPipeline); - LiteParsedPipeline liteParsedPipeline(aggRequest); - auto parsedPipeline = Pipeline::parse(rawPipeline, expCtx); - if (!parsedPipeline.isOK()) { - return parsedPipeline.getStatus(); - } - parsedPipeline.getValue()->optimizePipeline(); - - auto targetStatus = establishShardCursors(expCtx, - expCtx->ns, - aggRequest.serializeToCommandObj().toBson(), - aggRequest, - liteParsedPipeline, - std::move(parsedPipeline.getValue())); - - if (!targetStatus.isOK()) { - return targetStatus.getStatus(); - } - auto targetingResults = std::move(targetStatus.getValue()); - if (targetingResults.remoteCursors.size() != 1) { - return {ErrorCodes::InternalError, - str::stream() << "Unable to target pipeline to single shard: " - << Value(rawPipeline).toString()}; - } - invariant(!targetingResults.pipelineForMerging); - - auto params = stdx::make_unique<ClusterClientCursorParams>( - expCtx->ns, - AuthorizationSession::get(expCtx->opCtx->getClient())->getAuthenticatedUserNames(), - ReadPreferenceSetting::get(expCtx->opCtx)); - params->remotes = std::move(targetingResults.remoteCursors); - - // We will transfer ownership of the params to the RouterStageInternalCursor, but need a - // reference to them to construct the RouterStageMerge. - auto* unownedParams = params.get(); - auto mergeStage = stdx::make_unique<RouterStageMerge>( - expCtx->opCtx, - Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), - unownedParams); - auto routerExecutionTree = stdx::make_unique<RouterStageInternalCursor>( - expCtx->opCtx, std::move(params), std::move(mergeStage)); - - return Pipeline::create( - {DocumentSourceRouterAdapter::create(expCtx, std::move(routerExecutionTree))}, expCtx); -} - } // namespace Status ClusterAggregate::runAggregate(OperationContext* opCtx, diff --git a/src/mongo/s/commands/pipeline_s.cpp b/src/mongo/s/commands/pipeline_s.cpp new file mode 100644 index 00000000000..67068ed12ff --- /dev/null +++ b/src/mongo/s/commands/pipeline_s.cpp @@ -0,0 +1,260 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/s/commands/pipeline_s.h" + +#include "mongo/db/pipeline/document_source.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/commands/cluster_commands_helpers.h" +#include "mongo/s/grid.h" +#include "mongo/s/query/establish_cursors.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::shared_ptr; +using std::string; +using std::unique_ptr; + +namespace { +/** + * Determines the single shard to which the given query will be targeted, and its associated + * shardVersion. Throws if the query targets more than one shard. + */ +std::pair<ShardId, ChunkVersion> getSingleTargetedShardForQuery( + OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, BSONObj query) { + if (auto chunkMgr = routingInfo.cm()) { + std::set<ShardId> shardIds; + chunkMgr->getShardIdsForQuery(opCtx, query, BSONObj(), &shardIds); + uassert(ErrorCodes::InternalError, + str::stream() << "Unable to target lookup query to a single shard: " + << query.toString(), + shardIds.size() == 1u); + return {*shardIds.begin(), chunkMgr->getVersion(*shardIds.begin())}; + } + + return {routingInfo.primaryId(), ChunkVersion::UNSHARDED()}; +} + +/** + * Returns the routing information for the namespace set on the passed ExpressionContext. Also + * verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry. + */ +StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo( + const intrusive_ptr<ExpressionContext>& expCtx) { + auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); + auto swRoutingInfo = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns); + // Additionally check that the ExpressionContext's UUID matches the collection routing info. + if (swRoutingInfo.isOK() && expCtx->uuid && swRoutingInfo.getValue().cm()) { + if (!swRoutingInfo.getValue().cm()->uuidMatches(*expCtx->uuid)) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "The UUID of collection " << expCtx->ns.ns() + << " changed; it may have been dropped and re-created."}; + } + } + return swRoutingInfo; +} + +/** + * Class to provide access to mongos-specific implementations of methods required by some document + * sources. + */ +class MongosProcessInterface final + : public DocumentSourceNeedsMongoProcessInterface::MongoProcessInterface { +public: + MongosProcessInterface(const intrusive_ptr<ExpressionContext>& expCtx) + : _expCtx(expCtx), _opCtx(_expCtx->opCtx) {} + + virtual ~MongosProcessInterface() = default; + + void setOperationContext(OperationContext* opCtx) final { + invariant(_expCtx->opCtx == opCtx); + _opCtx = opCtx; + } + + DBClientBase* directClient() final { + MONGO_UNREACHABLE; + } + + bool isSharded(const NamespaceString& ns) final { + MONGO_UNREACHABLE; + } + + BSONObj insert(const NamespaceString& ns, const std::vector<BSONObj>& objs) final { + MONGO_UNREACHABLE; + } + + CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, + const NamespaceString& ns) final { + MONGO_UNREACHABLE; + } + + void appendLatencyStats(const NamespaceString& nss, + bool includeHistograms, + BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + Status appendStorageStats(const NamespaceString& nss, + const BSONObj& param, + BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + Status appendRecordCount(const NamespaceString& nss, BSONObjBuilder* builder) const final { + MONGO_UNREACHABLE; + } + + BSONObj getCollectionOptions(const NamespaceString& nss) final { + MONGO_UNREACHABLE; + } + + Status renameIfOptionsAndIndexesHaveNotChanged( + const BSONObj& renameCommandObj, + const NamespaceString& targetNs, + const BSONObj& originalCollectionOptions, + const std::list<BSONObj>& originalIndexes) final { + MONGO_UNREACHABLE; + } + + StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions opts) final { + MONGO_UNREACHABLE; + } + + Status attachCursorSourceToPipeline(const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* pipeline) final { + MONGO_UNREACHABLE; + } + + std::vector<BSONObj> getCurrentOps(CurrentOpConnectionsMode connMode, + CurrentOpUserMode userMode, + CurrentOpTruncateMode truncateMode) const final { + MONGO_UNREACHABLE; + } + + std::string getShardName(OperationContext* opCtx) const final { + MONGO_UNREACHABLE; + } + + std::vector<FieldPath> collectDocumentKeyFields(UUID) const final { + MONGO_UNREACHABLE; + } + + boost::optional<Document> lookupSingleDocument(const intrusive_ptr<ExpressionContext>& expCtx, + const Document& filter) final { + // The passed ExpressionContext may use a different namespace than the _expCtx used to + // construct the MongosProcessInterface, but both should be using the same opCtx. + invariant(expCtx->opCtx == _expCtx->opCtx); + + // Create a QueryRequest from the given filter. This will be used to generate the find + // command to be dispatched to the shard in order to return the post-change document. + auto filterObj = filter.toBson(); + QueryRequest qr(expCtx->ns); + qr.setFilter(filterObj); + + auto swShardResult = makeStatusWith<std::vector<ClusterClientCursorParams::RemoteCursor>>(); + auto findCmd = qr.asFindCommand(); + size_t numAttempts = 0; + do { + // Verify that the collection exists, with the UUID passed in the expCtx. + auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); + auto swRoutingInfo = getCollectionRoutingInfo(expCtx); + if (swRoutingInfo == ErrorCodes::NamespaceNotFound) { + return boost::none; + } + auto routingInfo = uassertStatusOK(std::move(swRoutingInfo)); + + // Get the ID and version of the single shard to which this query will be sent. + auto shardInfo = getSingleTargetedShardForQuery(expCtx->opCtx, routingInfo, filterObj); + + // Dispatch the request. This will only be sent to a single shard and only a single + // result will be returned. The 'establishCursors' method conveniently prepares the + // result into a cursor response for us. + swShardResult = establishCursors( + expCtx->opCtx, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + expCtx->ns, + ReadPreferenceSetting::get(expCtx->opCtx), + {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}}, + false, + nullptr); + + // If we hit a stale shardVersion exception, invalidate the routing table cache. + if (ErrorCodes::isStaleShardingError(swShardResult.getStatus().code())) { + catalogCache->onStaleConfigError(std::move(routingInfo)); + } + } while (!swShardResult.isOK() && ++numAttempts < kMaxNumStaleVersionRetries); + + auto shardResult = uassertStatusOK(std::move(swShardResult)); + invariant(shardResult.size() == 1u); + + auto& cursor = shardResult.front().cursorResponse; + auto& batch = cursor.getBatch(); + + // We should have at most 1 result, and the cursor should be exhausted. + uassert(ErrorCodes::InternalError, + str::stream() << "Shard cursor was unexpectedly open after lookup: " + << shardResult.front().hostAndPort + << ", id: " + << cursor.getCursorId(), + cursor.getCursorId() == 0); + uassert(ErrorCodes::TooManyMatchingDocuments, + str::stream() << "found more than one document matching " << filter.toString() + << " [" + << batch.begin()->toString() + << ", " + << std::next(batch.begin())->toString() + << "]", + batch.size() <= 1u); + + return boost::make_optional(!batch.empty(), Document(batch.front())); + } + +private: + intrusive_ptr<ExpressionContext> _expCtx; + OperationContext* _opCtx; +}; +} // namespace + +void PipelineS::injectMongosInterface(Pipeline* pipeline) { + const auto& sources = pipeline->getSources(); + for (auto&& source : sources) { + if (auto needsMongos = + dynamic_cast<DocumentSourceNeedsMongoProcessInterface*>(source.get())) { + needsMongos->injectMongoProcessInterface( + std::make_shared<MongosProcessInterface>(pipeline->getContext())); + } + } +} +} // namespace mongo diff --git a/src/mongo/s/commands/pipeline_s.h b/src/mongo/s/commands/pipeline_s.h new file mode 100644 index 00000000000..ca4bdc58270 --- /dev/null +++ b/src/mongo/s/commands/pipeline_s.h @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/pipeline/pipeline.h" + +namespace mongo { +/** + * PipelineS is an extension of the Pipeline class to provide additional utility functions on + * mongoS. For example, it can inject the pipeline with an implementation of MongoProcessInterface + * which provides mongos-specific versions of methods required by some document sources. + */ +class PipelineS { +public: + /** + * Injects a MongosProcessInterface into stages which require access to its functionality. + */ + static void injectMongosInterface(Pipeline* pipeline); + +private: + PipelineS() = delete; // Should never be instantiated. +}; + +} // namespace mongo |