/**
 * Copyright (C) 2015 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
 */

#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery

#include "mongo/platform/basic.h"

#include "mongo/s/query/cluster_find.h"

#include <set>
#include <vector>

#include "mongo/base/status_with.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connpool.h"
#include "mongo/client/read_preference.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/query/canonical_query.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_planner_common.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/platform/overflow_arithmetic.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_client_cursor_impl.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/store_possible_cursor.h"
#include "mongo/s/stale_exception.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"

namespace mongo {
namespace {

static const BSONObj kSortKeyMetaProjection = BSON("$meta"
                                                   << "sortKey");
static const BSONObj kGeoNearDistanceMetaProjection = BSON("$meta"
                                                           << "geoNearDistance");

// We must allow some amount of overhead per result document, since when we make a cursor response
// the documents are elements of a BSONArray. The overhead is 1 byte/doc for the type + 1 byte/doc
// for the field name's null terminator + 1 byte per digit in the array index. The index can be no
// more than 8 decimal digits since the response is at most 16MB, and 16 * 1024 * 1024 < 1 * 10^8.
static const int kPerDocumentOverheadBytesUpperBound = 10;

/**
 * Given the QueryRequest 'qr' being executed by mongos, returns a copy of the query which is
 * suitable for forwarding to the targeted hosts.
 */
StatusWith<std::unique_ptr<QueryRequest>> transformQueryForShards(
    const QueryRequest& qr, bool appendGeoNearDistanceProjection) {
    // If there is a limit, we forward the sum of the limit and the skip.
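    // For example, a find with {limit: 10, skip: 5} is forwarded to each shard with a limit of 15
    // and no skip; mongos applies the skip itself while merging the shard results (the skip is
    // carried in the ClusterClientCursorParams built by runQueryWithoutRetrying below).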
    boost::optional<long long> newLimit;
    if (qr.getLimit()) {
        long long newLimitValue;
        if (mongoSignedAddOverflow64(*qr.getLimit(), qr.getSkip().value_or(0), &newLimitValue)) {
            return Status(
                ErrorCodes::Overflow,
                str::stream()
                    << "sum of limit and skip cannot be represented as a 64-bit integer, limit: "
                    << *qr.getLimit()
                    << ", skip: "
                    << qr.getSkip().value_or(0));
        }
        newLimit = newLimitValue;
    }

    // Similarly, if nToReturn is set, we forward the sum of nToReturn and the skip.
    boost::optional<long long> newNToReturn;
    if (qr.getNToReturn()) {
        // !wantMore and ntoreturn mean the same as !wantMore and limit, so perform the conversion.
        if (!qr.wantMore()) {
            long long newLimitValue;
            if (mongoSignedAddOverflow64(
                    *qr.getNToReturn(), qr.getSkip().value_or(0), &newLimitValue)) {
                return Status(ErrorCodes::Overflow,
                              str::stream()
                                  << "sum of ntoreturn and skip cannot be represented as a 64-bit "
                                     "integer, ntoreturn: "
                                  << *qr.getNToReturn()
                                  << ", skip: "
                                  << qr.getSkip().value_or(0));
            }
            newLimit = newLimitValue;
        } else {
            long long newNToReturnValue;
            if (mongoSignedAddOverflow64(
                    *qr.getNToReturn(), qr.getSkip().value_or(0), &newNToReturnValue)) {
                return Status(ErrorCodes::Overflow,
                              str::stream()
                                  << "sum of ntoreturn and skip cannot be represented as a 64-bit "
                                     "integer, ntoreturn: "
                                  << *qr.getNToReturn()
                                  << ", skip: "
                                  << qr.getSkip().value_or(0));
            }
            newNToReturn = newNToReturnValue;
        }
    }

    // If there is a sort other than $natural, we send a sortKey meta-projection to the remote node.
    BSONObj newProjection = qr.getProj();
    if (!qr.getSort().isEmpty() && !qr.getSort()["$natural"]) {
        BSONObjBuilder projectionBuilder;
        projectionBuilder.appendElements(qr.getProj());
        projectionBuilder.append(ClusterClientCursorParams::kSortKeyField, kSortKeyMetaProjection);
        newProjection = projectionBuilder.obj();
    }

    if (appendGeoNearDistanceProjection) {
        invariant(qr.getSort().isEmpty());
        BSONObjBuilder projectionBuilder;
        projectionBuilder.appendElements(qr.getProj());
        projectionBuilder.append(ClusterClientCursorParams::kSortKeyField,
                                 kGeoNearDistanceMetaProjection);
        newProjection = projectionBuilder.obj();
    }

    auto newQR = stdx::make_unique<QueryRequest>(qr);
    newQR->setProj(newProjection);
    newQR->setSkip(boost::none);
    newQR->setLimit(newLimit);
    newQR->setNToReturn(newNToReturn);

    // Even if the client sends us singleBatch=true (wantMore=false), we may need to retrieve
    // multiple batches from a shard in order to return the single requested batch to the client.
    // Therefore, we must always send singleBatch=false (wantMore=true) to the shards.
    newQR->setWantMore(true);

    invariantOK(newQR->validate());
    return std::move(newQR);
}

CursorId runQueryWithoutRetrying(OperationContext* opCtx,
                                 const CanonicalQuery& query,
                                 const ReadPreferenceSetting& readPref,
                                 ChunkManager* chunkManager,
                                 std::shared_ptr<Shard> primary,
                                 std::vector<BSONObj>* results) {
    auto shardRegistry = Grid::get(opCtx)->shardRegistry();

    // Get the set of shards on which we will run the query.
    std::vector<std::shared_ptr<Shard>> shards;
    if (chunkManager) {
        std::set<ShardId> shardIds;
        chunkManager->getShardIdsForQuery(opCtx,
                                          query.getQueryRequest().getFilter(),
                                          query.getQueryRequest().getCollation(),
                                          &shardIds);

        for (auto id : shardIds) {
            shards.emplace_back(uassertStatusOK(shardRegistry->getShard(opCtx, id)));
        }
    } else {
        shards.emplace_back(std::move(primary));
    }

    // Construct the query and parameters.
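    // The ClusterClientCursorParams describe the merge that mongos performs over the shard
    // streams: the client's limit, skip, and sort are applied here, to the merged results, rather
    // than being delegated entirely to the individual shards.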
    ClusterClientCursorParams params(query.nss(), readPref);
    params.limit = query.getQueryRequest().getLimit();
    params.batchSize = query.getQueryRequest().getEffectiveBatchSize();
    params.skip = query.getQueryRequest().getSkip();
    params.tailableMode = query.getQueryRequest().getTailableMode();
    params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults();

    // This is the batchSize passed to each subsequent getMore command issued by the cursor. We
    // usually use the batchSize associated with the initial find, but as it is illegal to send a
    // getMore with a batchSize of 0, we set it to use the default batchSize logic.
    if (params.batchSize && *params.batchSize == 0) {
        params.batchSize = boost::none;
    }

    // $natural sort is actually a hint to use a collection scan, and shouldn't be treated like a
    // sort on mongos. Including a $natural anywhere in the sort spec results in the whole sort
    // being considered a hint to use a collection scan.
    if (!query.getQueryRequest().getSort().hasField("$natural")) {
        params.sort = FindCommon::transformSortSpec(query.getQueryRequest().getSort());
    }

    bool appendGeoNearDistanceProjection = false;
    if (query.getQueryRequest().getSort().isEmpty() &&
        QueryPlannerCommon::hasNode(query.root(), MatchExpression::GEO_NEAR)) {
        // There is no specified sort, and there is a GEO_NEAR node. This means we should merge
        // sort by the geoNearDistance. Request the projection {$sortKey: <geoNearDistance>} from
        // the shards. Indicate to the AsyncResultsMerger that it should extract the sort key
        // {"$sortKey": <geoNearDistance>} and sort by the order {"$sortKey": 1}.
        params.sort = ClusterClientCursorParams::kWholeSortKeySortPattern;
        params.compareWholeSortKey = true;
        appendGeoNearDistanceProjection = true;
    }

    // Tailable cursors can't have a sort, which should have already been validated.
    invariant(params.sort.isEmpty() || !query.getQueryRequest().isTailable());

    const auto qrToForward = uassertStatusOK(
        transformQueryForShards(query.getQueryRequest(), appendGeoNearDistanceProjection));

    // Construct the find command that we will use to establish cursors, attaching the shardVersion.
    std::vector<std::pair<ShardId, BSONObj>> requests;
    for (const auto& shard : shards) {
        invariant(!shard->isConfig() || shard->getConnString().type() != ConnectionString::INVALID);

        BSONObjBuilder cmdBuilder;
        qrToForward->asFindCommand(&cmdBuilder);

        if (chunkManager) {
            ChunkVersion version(chunkManager->getVersion(shard->getId()));
            version.appendForCommands(&cmdBuilder);
        } else if (!query.nss().isOnInternalDb()) {
            ChunkVersion version(ChunkVersion::UNSHARDED());
            version.appendForCommands(&cmdBuilder);
        }

        requests.emplace_back(shard->getId(), cmdBuilder.obj());
    }

    // Establish the cursors with a consistent shardVersion across shards.
    params.remotes = establishCursors(opCtx,
                                      Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
                                      query.nss(),
                                      readPref,
                                      requests,
                                      query.getQueryRequest().isAllowPartialResults());

    // Determine whether the cursor we may eventually register will be single- or multi-target.
    const auto cursorType = params.remotes.size() > 1
        ? ClusterCursorManager::CursorType::MultiTarget
        : ClusterCursorManager::CursorType::SingleTarget;

    // Transfer the established cursors to a ClusterClientCursor.
    auto ccc = ClusterClientCursorImpl::make(
        opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));

    // Retrieve enough data from the ClusterClientCursor for the first batch of results.
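    // Documents are buffered until the first batch is full (FindCommon::enoughForFirstBatch) or
    // until adding another document would exceed the maximum response message size
    // (FindCommon::haveSpaceForNext); in the latter case the extra document is stashed on the
    // cursor so that a subsequent getMore can return it.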
    auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
    int bytesBuffered = 0;
    while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) {
        auto next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind));

        if (next.isEOF()) {
            // We reached end-of-stream. If the cursor is not tailable, then we mark it as
            // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even
            // when we reach end-of-stream. However, if all the remote cursors are exhausted, there
            // is no hope of returning data and thus we need to close the mongos cursor as well.
            if (!ccc->isTailable() || ccc->remotesExhausted()) {
                cursorState = ClusterCursorManager::CursorState::Exhausted;
            }
            break;
        }

        auto nextObj = *next.getResult();

        // If adding this object will cause us to exceed the message size limit, then we stash it
        // for later.
        if (!FindCommon::haveSpaceForNext(nextObj, results->size(), bytesBuffered)) {
            ccc->queueResult(nextObj);
            break;
        }

        // Add doc to the batch. Account for the space overhead associated with returning this doc
        // inside a BSON array.
        bytesBuffered += (nextObj.objsize() + kPerDocumentOverheadBytesUpperBound);
        results->push_back(std::move(nextObj));
    }

    ccc->detachFromOperationContext();

    if (!query.getQueryRequest().wantMore() && !ccc->isTailable()) {
        cursorState = ClusterCursorManager::CursorState::Exhausted;
    }

    // If the cursor is exhausted, then there are no more results to return and we don't need to
    // allocate a cursor id.
    if (cursorState == ClusterCursorManager::CursorState::Exhausted) {
        return CursorId(0);
    }

    // Register the cursor with the cursor manager for subsequent getMore's.
    auto cursorManager = Grid::get(opCtx)->getCursorManager();
    const auto cursorLifetime = query.getQueryRequest().isNoCursorTimeout()
        ? ClusterCursorManager::CursorLifetime::Immortal
        : ClusterCursorManager::CursorLifetime::Mortal;
    auto authUsers = AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames();
    return uassertStatusOK(cursorManager->registerCursor(
        opCtx, ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime, authUsers));
}

}  // namespace

const size_t ClusterFind::kMaxStaleConfigRetries = 10;

CursorId ClusterFind::runQuery(OperationContext* opCtx,
                               const CanonicalQuery& query,
                               const ReadPreferenceSetting& readPref,
                               std::vector<BSONObj>* results) {
    invariant(results);

    // Projection on the reserved sort key field is illegal in mongos.
    if (query.getQueryRequest().getProj().hasField(ClusterClientCursorParams::kSortKeyField)) {
        uasserted(ErrorCodes::BadValue,
                  str::stream() << "Projection contains illegal field '"
                                << ClusterClientCursorParams::kSortKeyField
                                << "': "
                                << query.getQueryRequest().getProj());
    }

    auto const catalogCache = Grid::get(opCtx)->catalogCache();

    // Re-target and re-send the initial find command to the shards until we have established the
    // shard version.
    for (size_t retries = 1; retries <= kMaxStaleConfigRetries; ++retries) {
        auto routingInfoStatus = catalogCache->getCollectionRoutingInfo(opCtx, query.nss());
        if (routingInfoStatus == ErrorCodes::NamespaceNotFound) {
            // If the database doesn't exist, we successfully return an empty result set without
            // creating a cursor.
            return CursorId(0);
        }

        auto routingInfo = uassertStatusOK(routingInfoStatus);

        try {
            return runQueryWithoutRetrying(
                opCtx, query, readPref, routingInfo.cm().get(), routingInfo.primary(), results);
        } catch (const DBException& ex) {
            if (!ErrorCodes::isStaleShardingError(ex.code()) &&
                ex.code() != ErrorCodes::ShardNotFound) {
                // Errors other than trying to reach a non existent shard or receiving a stale
                // metadata message from MongoD are fatal to the operation. Network errors and
                // replication retries happen at the level of the AsyncResultsMerger.
                throw;
            }

            LOG(1) << "Received error status for query " << redact(query.toStringShort())
                   << " on attempt " << retries << " of " << kMaxStaleConfigRetries << ": "
                   << redact(ex);

            catalogCache->onStaleConfigError(std::move(routingInfo));
        }
    }

    uasserted(ErrorCodes::StaleShardVersion,
              str::stream() << "Retried " << kMaxStaleConfigRetries
                            << " times without successfully establishing shard version.");
}

StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,
                                                   const GetMoreRequest& request) {
    auto cursorManager = Grid::get(opCtx)->getCursorManager();

    auto authzSession = AuthorizationSession::get(opCtx->getClient());
    auto authChecker = [&authzSession](UserNameIterator userNames) -> Status {
        return authzSession->isCoauthorizedWith(userNames)
            ? Status::OK()
            : Status(ErrorCodes::Unauthorized, "User not authorized to access cursor");
    };

    auto pinnedCursor =
        cursorManager->checkOutCursor(request.nss, request.cursorid, opCtx, authChecker);
    if (!pinnedCursor.isOK()) {
        return pinnedCursor.getStatus();
    }
    invariant(request.cursorid == pinnedCursor.getValue().getCursorId());

    // If the fail point is enabled, busy wait until it is disabled.
    while (MONGO_FAIL_POINT(waitAfterPinningCursorBeforeGetMoreBatch)) {
    }

    if (auto readPref = pinnedCursor.getValue().getReadPreference()) {
        ReadPreferenceSetting::get(opCtx) = *readPref;
    }

    if (pinnedCursor.getValue().isTailableAndAwaitData()) {
        // A maxTimeMS specified on a tailable, awaitData cursor is special. Instead of imposing a
        // deadline on the operation, it is used to communicate how long the server should wait for
        // new results. Here we clear any deadline set during command processing and track the
        // deadline instead via the 'waitForInsertsDeadline' decoration. This deadline defaults to
        // 1 second if the user didn't specify a maxTimeMS.
        opCtx->clearDeadline();
        auto timeout = request.awaitDataTimeout.value_or(Milliseconds{1000});
        awaitDataState(opCtx).waitForInsertsDeadline =
            opCtx->getServiceContext()->getPreciseClockSource()->now() + timeout;

        invariant(pinnedCursor.getValue().setAwaitDataTimeout(timeout).isOK());
    } else if (request.awaitDataTimeout) {
        return {ErrorCodes::BadValue,
                "maxTimeMS can only be used with getMore for tailable, awaitData cursors"};
    }

    std::vector<BSONObj> batch;
    int bytesBuffered = 0;
    long long batchSize = request.batchSize.value_or(0);
    long long startingFrom = pinnedCursor.getValue().getNumReturnedSoFar();
    auto cursorState = ClusterCursorManager::CursorState::NotExhausted;
    while (!FindCommon::enoughForGetMore(batchSize, batch.size())) {
        auto context = batch.empty()
            ? RouterExecStage::ExecContext::kGetMoreNoResultsYet
            : RouterExecStage::ExecContext::kGetMoreWithAtLeastOneResultInBatch;

        StatusWith<ClusterQueryResult> next =
            Status{ErrorCodes::InternalError, "uninitialized cluster query result"};
        try {
            next = pinnedCursor.getValue().next(context);
        } catch (const ExceptionFor<ErrorCodes::CloseChangeStream>&) {
            // This exception is thrown when a $changeStream stage encounters an event that
            // invalidates the cursor.
            // We should close the cursor and return without error.
            cursorState = ClusterCursorManager::CursorState::Exhausted;
            break;
        }

        if (!next.isOK()) {
            return next.getStatus();
        }

        if (next.getValue().isEOF()) {
            // We reached end-of-stream. If the cursor is not tailable, then we mark it as
            // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even when
            // we reach end-of-stream. However, if all the remote cursors are exhausted, there is no
            // hope of returning data and thus we need to close the mongos cursor as well.
            if (!pinnedCursor.getValue().isTailable() ||
                pinnedCursor.getValue().remotesExhausted()) {
                cursorState = ClusterCursorManager::CursorState::Exhausted;
            }
            break;
        }

        if (!FindCommon::haveSpaceForNext(
                *next.getValue().getResult(), batch.size(), bytesBuffered)) {
            pinnedCursor.getValue().queueResult(*next.getValue().getResult());
            break;
        }

        // Add doc to the batch. Account for the space overhead associated with returning this doc
        // inside a BSON array.
        bytesBuffered +=
            (next.getValue().getResult()->objsize() + kPerDocumentOverheadBytesUpperBound);
        batch.push_back(std::move(*next.getValue().getResult()));
    }

    // Upon successful completion, transfer ownership of the cursor back to the cursor manager. If
    // the cursor has been exhausted, the cursor manager will clean it up for us.
    pinnedCursor.getValue().returnCursor(cursorState);

    CursorId idToReturn = (cursorState == ClusterCursorManager::CursorState::Exhausted)
        ? CursorId(0)
        : request.cursorid;
    return CursorResponse(request.nss, idToReturn, std::move(batch), startingFrom);
}

}  // namespace mongo