diff options
Diffstat (limited to 'src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp')
-rw-r--r-- | src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp | 382 |
1 files changed, 382 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp new file mode 100644 index 00000000000..938006f7f8a --- /dev/null +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -0,0 +1,382 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/pipeline/process_interface/mongos_process_interface.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/curop.h" +#include "mongo/db/index/index_descriptor.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/db/query/collation/collation_spec.h" +#include "mongo/db/query/collation/collator_factory_interface.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/s/scoped_collection_metadata.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/s/grid.h" +#include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/s/query/establish_cursors.h" +#include "mongo/s/query/router_exec_stage.h" +#include "mongo/s/transaction_router.h" +#include "mongo/util/fail_point.h" + +namespace mongo { + +using boost::intrusive_ptr; +using std::shared_ptr; +using std::string; +using std::unique_ptr; + +namespace { + +/** + * Returns the routing information for the namespace set on the passed ExpressionContext. Also + * verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry. + */ +StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfo( + const intrusive_ptr<ExpressionContext>& expCtx) { + auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); + auto swRoutingInfo = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns); + // Additionally check that the ExpressionContext's UUID matches the collection routing info. + if (swRoutingInfo.isOK() && expCtx->uuid && swRoutingInfo.getValue().cm()) { + if (!swRoutingInfo.getValue().cm()->uuidMatches(*expCtx->uuid)) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "The UUID of collection " << expCtx->ns.ns() + << " changed; it may have been dropped and re-created."}; + } + } + return swRoutingInfo; +} + +bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const BSONObj& index, + const std::set<FieldPath>& uniqueKeyPaths) { + // Retrieve the collation from the index, or default to the simple collation. + const auto collation = uassertStatusOK( + CollatorFactoryInterface::get(expCtx->opCtx->getServiceContext()) + ->makeFromBSON(index.hasField(IndexDescriptor::kCollationFieldName) + ? index.getObjectField(IndexDescriptor::kCollationFieldName) + : CollationSpec::kSimpleSpec)); + + // SERVER-5335: The _id index does not report to be unique, but in fact is unique. + auto isIdIndex = index[IndexDescriptor::kIndexNameFieldName].String() == "_id_"; + return (isIdIndex || index.getBoolField(IndexDescriptor::kUniqueFieldName)) && + !index.hasField(IndexDescriptor::kPartialFilterExprFieldName) && + CommonProcessInterface::keyPatternNamesExactPaths( + index.getObjectField(IndexDescriptor::kKeyPatternFieldName), uniqueKeyPaths) && + CollatorInterface::collatorsMatch(collation.get(), expCtx->getCollator()); +} + +} // namespace + +std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::makePipeline( + const std::vector<BSONObj>& rawPipeline, + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const MakePipelineOptions pipelineOptions) { + // Explain is not supported for auxiliary lookups. + invariant(!expCtx->explain); + + auto pipeline = uassertStatusOK(Pipeline::parse(rawPipeline, expCtx)); + if (pipelineOptions.optimize) { + pipeline->optimizePipeline(); + } + if (pipelineOptions.attachCursorSource) { + // 'attachCursorSourceToPipeline' handles any complexity related to sharding. + pipeline = attachCursorSourceToPipeline(expCtx, pipeline.release(), false); + } + + return pipeline; +} + +std::unique_ptr<Pipeline, PipelineDeleter> MongosProcessInterface::attachCursorSourceToPipeline( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + Pipeline* ownedPipeline, + bool allowTargetingShards) { + // On mongos we can't have local cursors. + return sharded_agg_helpers::targetShardsAndAddMergeCursors(expCtx, ownedPipeline); +} + +boost::optional<Document> MongosProcessInterface::lookupSingleDocument( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + UUID collectionUUID, + const Document& filter, + boost::optional<BSONObj> readConcern, + bool allowSpeculativeMajorityRead) { + auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID); + + // Create the find command to be dispatched to the shard(s) in order to return the post-image. + auto filterObj = filter.toBson(); + BSONObjBuilder cmdBuilder; + bool findCmdIsByUuid(foreignExpCtx->uuid); + if (findCmdIsByUuid) { + foreignExpCtx->uuid->appendToBuilder(&cmdBuilder, "find"); + } else { + cmdBuilder.append("find", nss.coll()); + } + cmdBuilder.append("filter", filterObj); + if (readConcern) { + cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern); + } + if (allowSpeculativeMajorityRead) { + cmdBuilder.append("allowSpeculativeMajorityRead", true); + } + + auto shardResults = std::vector<RemoteCursor>(); + auto findCmd = cmdBuilder.obj(); + for (size_t numRetries = 0; numRetries <= kMaxNumStaleVersionRetries; ++numRetries) { + // Obtain the catalog cache. If we are retrying after a stale shard error, mark this + // operation as needing to block on the next catalog cache refresh. + auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); + catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(expCtx->opCtx, numRetries); + + // Verify that the collection exists, with the correct UUID. + auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx); + if (swRoutingInfo == ErrorCodes::NamespaceNotFound) { + return boost::none; + } + auto routingInfo = uassertStatusOK(std::move(swRoutingInfo)); + + // Finalize the 'find' command object based on the routing table information. + if (findCmdIsByUuid && routingInfo.cm()) { + // Find by UUID and shard versioning do not work together (SERVER-31946). In the + // sharded case we've already checked the UUID, so find by namespace is safe. In the + // unlikely case that the collection has been deleted and a new collection with the same + // name created through a different mongos, the shard version will be detected as stale, + // as shard versions contain an 'epoch' field unique to the collection. + findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement()); + findCmdIsByUuid = false; + } + + try { + // Build the versioned requests to be dispatched to the shards. Typically, only a single + // shard will be targeted here; however, in certain cases where only the _id is present, + // we may need to scatter-gather the query to all shards in order to find the document. + auto requests = getVersionedRequestsForTargetedShards( + expCtx->opCtx, nss, routingInfo, findCmd, filterObj, CollationSpec::kSimpleSpec); + + // Dispatch the requests. The 'establishCursors' method conveniently prepares the result + // into a vector of cursor responses for us. + shardResults = establishCursors( + expCtx->opCtx, + Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), + nss, + ReadPreferenceSetting::get(expCtx->opCtx), + std::move(requests), + false); + } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { + // If it's an unsharded collection which has been deleted and re-created, we may get a + // NamespaceNotFound error when looking up by UUID. + return boost::none; + } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>&) { + // If the database version is stale, refresh its entry in the catalog cache. + catalogCache->onStaleDatabaseVersion(nss.db(), routingInfo.db().databaseVersion()); + continue; // Try again if allowed. + } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& e) { + // If the exception provides a shardId, add it to the set of shards requiring a refresh. + // If the cache currently considers the collection to be unsharded, this will trigger an + // epoch refresh. If no shard is provided, then the epoch is stale and we must refresh. + auto staleInfo = e.extraInfo<StaleConfigInfo>(); + if (auto staleShardId = (staleInfo ? staleInfo->getShardId() : boost::none)) { + catalogCache->onStaleShardVersion(std::move(routingInfo), *staleShardId); + } else { + catalogCache->onEpochChange(nss); + } + continue; // Try again if allowed. + } + break; // Success! + } + + // Iterate all shard results and build a single composite batch. We also enforce the requirement + // that only a single document should have been returned from across the cluster. + std::vector<BSONObj> finalBatch; + for (auto&& shardResult : shardResults) { + auto& shardCursor = shardResult.getCursorResponse(); + finalBatch.insert( + finalBatch.end(), shardCursor.getBatch().begin(), shardCursor.getBatch().end()); + // We should have at most 1 result, and the cursor should be exhausted. + uassert(ErrorCodes::ChangeStreamFatalError, + str::stream() << "Shard cursor was unexpectedly open after lookup: " + << shardResult.getHostAndPort() + << ", id: " << shardCursor.getCursorId(), + shardCursor.getCursorId() == 0); + uassert(ErrorCodes::ChangeStreamFatalError, + str::stream() << "found more than one document matching " << filter.toString() + << " [" << finalBatch.begin()->toString() << ", " + << std::next(finalBatch.begin())->toString() << "]", + finalBatch.size() <= 1u); + } + + return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{}); +} + +BSONObj MongosProcessInterface::_reportCurrentOpForClient( + OperationContext* opCtx, + Client* client, + CurrentOpTruncateMode truncateOps, + CurrentOpBacktraceMode backtraceMode) const { + BSONObjBuilder builder; + + CurOp::reportCurrentOpForClient(opCtx, + client, + (truncateOps == CurrentOpTruncateMode::kTruncateOps), + (backtraceMode == CurrentOpBacktraceMode::kIncludeBacktrace), + &builder); + + OperationContext* clientOpCtx = client->getOperationContext(); + + if (clientOpCtx) { + if (auto txnRouter = TransactionRouter::get(clientOpCtx)) { + txnRouter.reportState(clientOpCtx, &builder, true /* sessionIsActive */); + } + } + + return builder.obj(); +} + +void MongosProcessInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, + CurrentOpUserMode userMode, + std::vector<BSONObj>* ops) const { + auto sessionCatalog = SessionCatalog::get(opCtx); + + const bool authEnabled = + AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled(); + + // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to + // create a pattern that will match against all authenticated usernames for the current client. + // If the user is listing ops for all users, we create an empty pattern; constructing an + // instance of SessionKiller::Matcher with this empty pattern will return all sessions. + auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers + ? makeSessionFilterForAuthenticatedUsers(opCtx) + : KillAllSessionsByPatternSet{{}}); + + sessionCatalog->scanSessions({std::move(sessionFilter)}, [&](const ObservableSession& session) { + if (!session.currentOperation()) { + auto op = + TransactionRouter::get(session).reportState(opCtx, false /* sessionIsActive */); + if (!op.isEmpty()) { + ops->emplace_back(op); + } + } + }); +} + +void MongosProcessInterface::_reportCurrentOpsForTransactionCoordinators( + OperationContext* opCtx, bool includeIdle, std::vector<BSONObj>* ops) const {}; + +std::vector<GenericCursor> MongosProcessInterface::getIdleCursors( + const intrusive_ptr<ExpressionContext>& expCtx, CurrentOpUserMode userMode) const { + invariant(hasGlobalServiceContext()); + auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager(); + invariant(cursorManager); + return cursorManager->getIdleCursors(expCtx->opCtx, userMode); +} + +bool MongosProcessInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { + auto routingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); + return routingInfo.isOK() && routingInfo.getValue().cm(); +} + +bool MongosProcessInterface::fieldsHaveSupportingUniqueIndex( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + const NamespaceString& nss, + const std::set<FieldPath>& fieldPaths) const { + const auto opCtx = expCtx->opCtx; + const auto routingInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + + // Run an exhaustive listIndexes against the primary shard only. + auto response = routingInfo.db().primary()->runExhaustiveCursorCommand( + opCtx, + ReadPreferenceSetting::get(opCtx), + nss.db().toString(), + BSON("listIndexes" << nss.coll()), + opCtx->hasDeadline() ? opCtx->getRemainingMaxTimeMillis() : Milliseconds(-1)); + + // If the namespace does not exist, then the field paths *must* be _id only. + if (response.getStatus() == ErrorCodes::NamespaceNotFound) { + return fieldPaths == std::set<FieldPath>{"_id"}; + } + uassertStatusOK(response); + + const auto& indexes = response.getValue().docs; + return std::any_of(indexes.begin(), indexes.end(), [&expCtx, &fieldPaths](const auto& index) { + return supportsUniqueKey(expCtx, index, fieldPaths); + }); +} + +std::pair<std::set<FieldPath>, boost::optional<ChunkVersion>> +MongosProcessInterface::ensureFieldsUniqueOrResolveDocumentKey( + const boost::intrusive_ptr<ExpressionContext>& expCtx, + boost::optional<std::vector<std::string>> fields, + boost::optional<ChunkVersion> targetCollectionVersion, + const NamespaceString& outputNs) const { + invariant(expCtx->inMongos); + uassert( + 51179, "Received unexpected 'targetCollectionVersion' on mongos", !targetCollectionVersion); + + if (fields) { + // Convert 'fields' array to a set of FieldPaths. + auto fieldPaths = _convertToFieldPaths(*fields); + uassert(51190, + "Cannot find index to verify that join fields will be unique", + fieldsHaveSupportingUniqueIndex(expCtx, outputNs, fieldPaths)); + + // If the user supplies the 'fields' array, we don't need to attach a ChunkVersion for the + // shards since we are not at risk of 'guessing' the wrong shard key. + return {fieldPaths, boost::none}; + } + + // In case there are multiple shards which will perform this stage in parallel, we need to + // figure out and attach the collection's shard version to ensure each shard is talking about + // the same version of the collection. This mongos will coordinate that. We force a catalog + // refresh to do so because there is no shard versioning protocol on this namespace and so we + // otherwise could not be sure this node is (or will become) at all recent. We will also + // figure out and attach the 'joinFields' to send to the shards. + + // There are edge cases when the collection could be dropped or re-created during or near the + // time of the operation (for example, during aggregation). This is okay - we are mostly + // paranoid that this mongos is very stale and want to prevent returning an error if the + // collection was dropped a long time ago. Because of this, we are okay with piggy-backing off + // another thread's request to refresh the cache, simply waiting for that request to return + // instead of forcing another refresh. + targetCollectionVersion = refreshAndGetCollectionVersion(expCtx, outputNs); + + auto docKeyPaths = collectDocumentKeyFieldsActingAsRouter(expCtx->opCtx, outputNs); + return {std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()), + std::make_move_iterator(docKeyPaths.end())), + targetCollectionVersion}; +} + +} // namespace mongo |