diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2018-07-05 18:17:17 -0400 |
---|---|---|
committer | Gregory Noma <gregory.noma@gmail.com> | 2018-07-10 09:56:55 -0400 |
commit | 1868c56cf7466793b1bbee553d9325d5d68f5ba2 (patch) | |
tree | 88684b8d5cd558bd6d3bc025a6cb36b827abd584 /src | |
parent | e86d684515abe1c4dbf79dbb71741b5db0317039 (diff) | |
download | mongo-1868c56cf7466793b1bbee553d9325d5d68f5ba2.tar.gz |
SERVER-35910 Upgrade FindCmd and ClusterFindCmd to use TypedCommand
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 540 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_find_cmd.cpp | 293 |
2 files changed, 411 insertions, 422 deletions
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 617cfdca8e0..e16f2473092 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -61,15 +61,17 @@ const auto kTermField = "term"_sd; /** * A command for running .find() queries. */ -class FindCmd : public BasicCommand { +class FindCmd final : public Command { public: - FindCmd() : BasicCommand("find") {} + FindCmd() : Command("find") {} - bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; + std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, + const OpMsgRequest& opMsgRequest) override { + // TODO: Parse into a QueryRequest here. + return std::make_unique<Invocation>(this, opMsgRequest, opMsgRequest.getDatabase()); } - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + AllowedOnSecondary secondaryAllowed(ServiceContext* context) const override { return AllowedOnSecondary::kOptIn; } @@ -81,12 +83,6 @@ public: return false; } - bool supportsReadConcern(const std::string& dbName, - const BSONObj& cmdObj, - repl::ReadConcernLevel level) const final { - return true; - } - std::string help() const override { return "query for documents"; } @@ -111,299 +107,297 @@ public: return false; } - Status checkAuthForOperation(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj) const override { - AuthorizationSession* authSession = AuthorizationSession::get(opCtx->getClient()); + class Invocation final : public CommandInvocation { + public: + Invocation(const FindCmd* definition, const OpMsgRequest& request, StringData dbName) + : CommandInvocation(definition), _request(request), _dbName(dbName) {} - if (!authSession->isAuthorizedToParseNamespaceElement(cmdObj.firstElement())) { - return Status(ErrorCodes::Unauthorized, "Unauthorized"); + private: + bool supportsWriteConcern() const override { + return false; } - const auto hasTerm = cmdObj.hasField(kTermField); - return authSession->checkAuthForFind( - AutoGetCollection::resolveNamespaceStringOrUUID( - opCtx, CommandHelpers::parseNsOrUUID(dbname, cmdObj)), - hasTerm); - } + bool supportsReadConcern(repl::ReadConcernLevel level) const final { + return true; + } - Status explain(OperationContext* opCtx, - const OpMsgRequest& request, - ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const override { - std::string dbname = request.getDatabase().toString(); - const BSONObj& cmdObj = request.body; - // Acquire locks and resolve possible UUID. The RAII object is optional, because in the case - // of a view, the locks need to be released. - boost::optional<AutoGetCollectionForReadCommand> ctx; - ctx.emplace(opCtx, - CommandHelpers::parseNsOrUUID(dbname, cmdObj), - AutoGetCollection::ViewMode::kViewsPermitted); - const auto nss = ctx->getNss(); - - // Parse the command BSON to a QueryRequest. - const bool isExplain = true; - auto qrStatus = QueryRequest::makeFromFindCommand(nss, cmdObj, isExplain); - if (!qrStatus.isOK()) { - return qrStatus.getStatus(); + NamespaceString ns() const override { + // TODO get the ns from the parsed QueryRequest. + return NamespaceString(CommandHelpers::parseNsFromCommand(_dbName, _request.body)); } - // Finish the parsing step by using the QueryRequest to create a CanonicalQuery. - const ExtensionsCallbackReal extensionsCallback(opCtx, &nss); - const boost::intrusive_ptr<ExpressionContext> expCtx; - auto statusWithCQ = - CanonicalQuery::canonicalize(opCtx, - std::move(qrStatus.getValue()), - expCtx, - extensionsCallback, - MatchExpressionParser::kAllowAllSpecialFeatures); - if (!statusWithCQ.isOK()) { - return statusWithCQ.getStatus(); + void doCheckAuthorization(OperationContext* opCtx) const final { + AuthorizationSession* authSession = AuthorizationSession::get(opCtx->getClient()); + + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + authSession->isAuthorizedToParseNamespaceElement(_request.body.firstElement())); + + const auto hasTerm = _request.body.hasField(kTermField); + uassertStatusOK(authSession->checkAuthForFind( + AutoGetCollection::resolveNamespaceStringOrUUID( + opCtx, CommandHelpers::parseNsOrUUID(_dbName, _request.body)), + hasTerm)); } - std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - - if (ctx->getView()) { - // Relinquish locks. The aggregation command will re-acquire them. - ctx.reset(); - - // Convert the find command into an aggregation using $match (and other stages, as - // necessary), if possible. - const auto& qr = cq->getQueryRequest(); - auto viewAggregationCommand = qr.asAggregationCommand(); - if (!viewAggregationCommand.isOK()) - return viewAggregationCommand.getStatus(); - - // Create the agg request equivalent of the find operation, with the explain verbosity - // included. - auto aggRequest = AggregationRequest::parseFromBSON( - nss, viewAggregationCommand.getValue(), verbosity); - if (!aggRequest.isOK()) { - return aggRequest.getStatus(); - } - try { - return runAggregate( - opCtx, nss, aggRequest.getValue(), viewAggregationCommand.getValue(), *out); - } catch (DBException& error) { - if (error.code() == ErrorCodes::InvalidPipelineOperator) { - return {ErrorCodes::InvalidPipelineOperator, - str::stream() << "Unsupported in view pipeline: " << error.what()}; + void explain(OperationContext* opCtx, + ExplainOptions::Verbosity verbosity, + BSONObjBuilder* result) override { + // Acquire locks and resolve possible UUID. The RAII object is optional, because in the + // case of a view, the locks need to be released. + boost::optional<AutoGetCollectionForReadCommand> ctx; + ctx.emplace(opCtx, + CommandHelpers::parseNsOrUUID(_dbName, _request.body), + AutoGetCollection::ViewMode::kViewsPermitted); + const auto nss = ctx->getNss(); + + // Parse the command BSON to a QueryRequest. + const bool isExplain = true; + auto qr = + uassertStatusOK(QueryRequest::makeFromFindCommand(nss, _request.body, isExplain)); + + // Finish the parsing step by using the QueryRequest to create a CanonicalQuery. + const ExtensionsCallbackReal extensionsCallback(opCtx, &nss); + const boost::intrusive_ptr<ExpressionContext> expCtx; + auto cq = uassertStatusOK( + CanonicalQuery::canonicalize(opCtx, + std::move(qr), + expCtx, + extensionsCallback, + MatchExpressionParser::kAllowAllSpecialFeatures)); + + if (ctx->getView()) { + // Relinquish locks. The aggregation command will re-acquire them. + ctx.reset(); + + // Convert the find command into an aggregation using $match (and other stages, as + // necessary), if possible. + const auto& qr = cq->getQueryRequest(); + auto viewAggregationCommand = uassertStatusOK(qr.asAggregationCommand()); + + // Create the agg request equivalent of the find operation, with the explain + // verbosity included. + auto aggRequest = uassertStatusOK( + AggregationRequest::parseFromBSON(nss, viewAggregationCommand, verbosity)); + + try { + uassertStatusOK( + runAggregate(opCtx, nss, aggRequest, viewAggregationCommand, *result)); + } catch (DBException& error) { + if (error.code() == ErrorCodes::InvalidPipelineOperator) { + uasserted(ErrorCodes::InvalidPipelineOperator, + str::stream() << "Unsupported in view pipeline: " + << error.what()); + } + throw; } - return error.toStatus(); + return; } - } - // The collection may be NULL. If so, getExecutor() should handle it by returning an - // execution tree with an EOFStage. - Collection* const collection = ctx->getCollection(); + // The collection may be NULL. If so, getExecutor() should handle it by returning an + // execution tree with an EOFStage. + Collection* const collection = ctx->getCollection(); - // We have a parsed query. Time to get the execution plan for it. - auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq)); - if (!statusWithPlanExecutor.isOK()) { - return statusWithPlanExecutor.getStatus(); - } - auto exec = std::move(statusWithPlanExecutor.getValue()); + // We have a parsed query. Time to get the execution plan for it. + auto exec = uassertStatusOK(getExecutorFind(opCtx, collection, nss, std::move(cq))); - // Got the execution tree. Explain it. - Explain::explainStages(exec.get(), collection, verbosity, out); - return Status::OK(); - } - - /** - * Runs a query using the following steps: - * --Parsing. - * --Acquire locks. - * --Plan query, obtaining an executor that can run it. - * --Generate the first batch. - * --Save state for getMore, transferring ownership of the executor to a ClientCursor. - * --Generate response to send to the client. - */ - bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { - // Although it is a command, a find command gets counted as a query. - globalOpCounters.gotQuery(); - - // Parse the command BSON to a QueryRequest. - const bool isExplain = false; - // Pass parseNs to makeFromFindCommand in case cmdObj does not have a UUID. - auto qrStatus = QueryRequest::makeFromFindCommand( - NamespaceString(parseNs(dbname, cmdObj)), cmdObj, isExplain); - uassertStatusOK(qrStatus.getStatus()); - - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - auto& qr = qrStatus.getValue(); - const auto session = OperationContextSession::get(opCtx); - uassert(ErrorCodes::InvalidOptions, - "It is illegal to open a tailable cursor in a transaction", - session == nullptr || !(session->inMultiDocumentTransaction() && qr->isTailable())); - - // Validate term before acquiring locks, if provided. - if (auto term = qr->getReplicationTerm()) { - Status status = replCoord->updateTerm(opCtx, *term); - // Note: updateTerm returns ok if term stayed the same. - uassertStatusOK(status); + // Got the execution tree. Explain it. + Explain::explainStages(exec.get(), collection, verbosity, result); } - // Acquire locks. If the query is on a view, we release our locks and convert the query - // request into an aggregation command. - boost::optional<AutoGetCollectionForReadCommand> ctx; - ctx.emplace(opCtx, - CommandHelpers::parseNsOrUUID(dbname, cmdObj), - AutoGetCollection::ViewMode::kViewsPermitted); - const auto& nss = ctx->getNss(); - - qr->refreshNSS(opCtx); - - // Check whether we are allowed to read from this node after acquiring our locks. - uassertStatusOK(replCoord->checkCanServeReadsFor( - opCtx, nss, ReadPreferenceSetting::get(opCtx).canRunOnSecondary())); - - // Fill out curop information. - // - // We pass negative values for 'ntoreturn' and 'ntoskip' to indicate that these values - // should be omitted from the log line. Limit and skip information is already present in the - // find command parameters, so these fields are redundant. - const int ntoreturn = -1; - const int ntoskip = -1; - beginQueryOp(opCtx, nss, cmdObj, ntoreturn, ntoskip); - - // Finish the parsing step by using the QueryRequest to create a CanonicalQuery. - const ExtensionsCallbackReal extensionsCallback(opCtx, &nss); - const boost::intrusive_ptr<ExpressionContext> expCtx; - auto statusWithCQ = - CanonicalQuery::canonicalize(opCtx, - std::move(qr), - expCtx, - extensionsCallback, - MatchExpressionParser::kAllowAllSpecialFeatures); - uassertStatusOK(statusWithCQ.getStatus()); - std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); - - if (ctx->getView()) { - // Relinquish locks. The aggregation command will re-acquire them. - ctx.reset(); - - // Convert the find command into an aggregation using $match (and other stages, as - // necessary), if possible. - const auto& qr = cq->getQueryRequest(); - auto viewAggregationCommand = qr.asAggregationCommand(); - uassertStatusOK(viewAggregationCommand.getStatus()); - - BSONObj aggResult = CommandHelpers::runCommandDirectly( - opCtx, - OpMsgRequest::fromDBAndBody(dbname, std::move(viewAggregationCommand.getValue()))); - auto status = getStatusFromCommandResult(aggResult); - if (status.code() == ErrorCodes::InvalidPipelineOperator) { - uasserted(ErrorCodes::InvalidPipelineOperator, - str::stream() << "Unsupported in view pipeline: " << status.reason()); + /** + * Runs a query using the following steps: + * --Parsing. + * --Acquire locks. + * --Plan query, obtaining an executor that can run it. + * --Generate the first batch. + * --Save state for getMore, transferring ownership of the executor to a ClientCursor. + * --Generate response to send to the client. + */ + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) { + // Although it is a command, a find command gets counted as a query. + globalOpCounters.gotQuery(); + + // Parse the command BSON to a QueryRequest. + const bool isExplain = false; + // Pass parseNs to makeFromFindCommand in case _request.body does not have a UUID. + auto qr = uassertStatusOK(QueryRequest::makeFromFindCommand( + NamespaceString(CommandHelpers::parseNsFromCommand(_dbName, _request.body)), + _request.body, + isExplain)); + + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + const auto session = OperationContextSession::get(opCtx); + uassert(ErrorCodes::InvalidOptions, + "It is illegal to open a tailable cursor in a transaction", + session == nullptr || + !(session->inMultiDocumentTransaction() && qr->isTailable())); + + // Validate term before acquiring locks, if provided. + if (auto term = qr->getReplicationTerm()) { + // Note: updateTerm returns ok if term stayed the same. + uassertStatusOK(replCoord->updateTerm(opCtx, *term)); } - result.resetToEmpty(); - result.appendElements(aggResult); - return status.isOK(); - } - Collection* const collection = ctx->getCollection(); + // Acquire locks. If the query is on a view, we release our locks and convert the query + // request into an aggregation command. + boost::optional<AutoGetCollectionForReadCommand> ctx; + ctx.emplace(opCtx, + CommandHelpers::parseNsOrUUID(_dbName, _request.body), + AutoGetCollection::ViewMode::kViewsPermitted); + const auto& nss = ctx->getNss(); + + qr->refreshNSS(opCtx); + + // Check whether we are allowed to read from this node after acquiring our locks. + uassertStatusOK(replCoord->checkCanServeReadsFor( + opCtx, nss, ReadPreferenceSetting::get(opCtx).canRunOnSecondary())); + + // Fill out curop information. + // + // We pass negative values for 'ntoreturn' and 'ntoskip' to indicate that these values + // should be omitted from the log line. Limit and skip information is already present in + // the find command parameters, so these fields are redundant. + const int ntoreturn = -1; + const int ntoskip = -1; + beginQueryOp(opCtx, nss, _request.body, ntoreturn, ntoskip); + + // Finish the parsing step by using the QueryRequest to create a CanonicalQuery. + const ExtensionsCallbackReal extensionsCallback(opCtx, &nss); + const boost::intrusive_ptr<ExpressionContext> expCtx; + auto cq = uassertStatusOK( + CanonicalQuery::canonicalize(opCtx, + std::move(qr), + expCtx, + extensionsCallback, + MatchExpressionParser::kAllowAllSpecialFeatures)); + + if (ctx->getView()) { + // Relinquish locks. The aggregation command will re-acquire them. + ctx.reset(); + + // Convert the find command into an aggregation using $match (and other stages, as + // necessary), if possible. + const auto& qr = cq->getQueryRequest(); + auto viewAggregationCommand = uassertStatusOK(qr.asAggregationCommand()); + + BSONObj aggResult = CommandHelpers::runCommandDirectly( + opCtx, OpMsgRequest::fromDBAndBody(_dbName, std::move(viewAggregationCommand))); + auto status = getStatusFromCommandResult(aggResult); + if (status.code() == ErrorCodes::InvalidPipelineOperator) { + uasserted(ErrorCodes::InvalidPipelineOperator, + str::stream() << "Unsupported in view pipeline: " << status.reason()); + } + uassertStatusOK(status); + result->getBodyBuilder().appendElements(aggResult); + return; + } - // Get the execution plan for the query. - auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq)); - uassertStatusOK(statusWithPlanExecutor.getStatus()); + Collection* const collection = ctx->getCollection(); - auto exec = std::move(statusWithPlanExecutor.getValue()); + // Get the execution plan for the query. + auto exec = uassertStatusOK(getExecutorFind(opCtx, collection, nss, std::move(cq))); - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); - } + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get())); + } - if (!collection) { - // No collection. Just fill out curop indicating that there were zero results and - // there is no ClientCursor id, and then return. - const long long numResults = 0; - const CursorId cursorId = 0; - endQueryOp(opCtx, collection, *exec, numResults, cursorId); - appendCursorResponseObject(cursorId, nss.ns(), BSONArray(), &result); - return true; - } + if (!collection) { + // No collection. Just fill out curop indicating that there were zero results and + // there is no ClientCursor id, and then return. + const long long numResults = 0; + const CursorId cursorId = 0; + endQueryOp(opCtx, collection, *exec, numResults, cursorId); + auto bodyBuilder = result->getBodyBuilder(); + appendCursorResponseObject(cursorId, nss.ns(), BSONArray(), &bodyBuilder); + return; + } - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &waitInFindBeforeMakingBatch, opCtx, "waitInFindBeforeMakingBatch"); - - const QueryRequest& originalQR = exec->getCanonicalQuery()->getQueryRequest(); - - // Stream query results, adding them to a BSONArray as we go. - CursorResponseBuilder firstBatch(/*isInitialResponse*/ true, &result); - BSONObj obj; - PlanExecutor::ExecState state = PlanExecutor::ADVANCED; - long long numResults = 0; - while (!FindCommon::enoughForFirstBatch(originalQR, numResults) && - PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) { - // If we can't fit this result inside the current batch, then we stash it for later. - if (!FindCommon::haveSpaceForNext(obj, numResults, firstBatch.bytesUsed())) { - exec->enqueue(obj); - break; + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &waitInFindBeforeMakingBatch, opCtx, "waitInFindBeforeMakingBatch"); + + const QueryRequest& originalQR = exec->getCanonicalQuery()->getQueryRequest(); + + // Stream query results, adding them to a BSONArray as we go. + auto bodyBuilder = result->getBodyBuilder(); + CursorResponseBuilder firstBatch(/*isInitialResponse*/ true, &bodyBuilder); + BSONObj obj; + PlanExecutor::ExecState state = PlanExecutor::ADVANCED; + long long numResults = 0; + while (!FindCommon::enoughForFirstBatch(originalQR, numResults) && + PlanExecutor::ADVANCED == (state = exec->getNext(&obj, nullptr))) { + // If we can't fit this result inside the current batch, then we stash it for later. + if (!FindCommon::haveSpaceForNext(obj, numResults, firstBatch.bytesUsed())) { + exec->enqueue(obj); + break; + } + + // Add result to output buffer. + firstBatch.append(obj); + numResults++; } - // Add result to output buffer. - firstBatch.append(obj); - numResults++; - } + // Throw an assertion if query execution fails for any reason. + if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { + firstBatch.abandon(); + error() << "Plan executor error during find command: " + << PlanExecutor::statestr(state) + << ", stats: " << redact(Explain::getWinningPlanStats(exec.get())); - // Throw an assertion if query execution fails for any reason. - if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { - firstBatch.abandon(); - error() << "Plan executor error during find command: " << PlanExecutor::statestr(state) - << ", stats: " << redact(Explain::getWinningPlanStats(exec.get())); + uassertStatusOK(WorkingSetCommon::getMemberObjectStatus(obj).withContext( + "Executor error during find command")); + } - uassertStatusOK(WorkingSetCommon::getMemberObjectStatus(obj).withContext( - "Executor error during find command")); - } + // Before saving the cursor, ensure that whatever plan we established happened with the + // expected collection version + auto css = CollectionShardingState::get(opCtx, nss); + css->checkShardVersionOrThrow(opCtx); + + // Set up the cursor for getMore. + CursorId cursorId = 0; + if (shouldSaveCursor(opCtx, collection, state, exec.get())) { + // Create a ClientCursor containing this plan executor and register it with the + // cursor manager. + ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor( + opCtx, + {std::move(exec), + nss, + AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), + repl::ReadConcernArgs::get(opCtx).getLevel(), + _request.body}); + cursorId = pinnedCursor.getCursor()->cursorid(); + + invariant(!exec); + PlanExecutor* cursorExec = pinnedCursor.getCursor()->getExecutor(); + + // State will be restored on getMore. + cursorExec->saveState(); + cursorExec->detachFromOperationContext(); + + // We assume that cursors created through a DBDirectClient are always used from + // their original OperationContext, so we do not need to move time to and from the + // cursor. + if (!opCtx->getClient()->isInDirectClient()) { + pinnedCursor.getCursor()->setLeftoverMaxTimeMicros( + opCtx->getRemainingMaxTimeMicros()); + } + pinnedCursor.getCursor()->setPos(numResults); - // Before saving the cursor, ensure that whatever plan we established happened with the - // expected collection version - auto css = CollectionShardingState::get(opCtx, nss); - css->checkShardVersionOrThrow(opCtx); - - // Set up the cursor for getMore. - CursorId cursorId = 0; - if (shouldSaveCursor(opCtx, collection, state, exec.get())) { - // Create a ClientCursor containing this plan executor and register it with the cursor - // manager. - ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor( - opCtx, - {std::move(exec), - nss, - AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), - repl::ReadConcernArgs::get(opCtx).getLevel(), - cmdObj}); - cursorId = pinnedCursor.getCursor()->cursorid(); - - invariant(!exec); - PlanExecutor* cursorExec = pinnedCursor.getCursor()->getExecutor(); - - // State will be restored on getMore. - cursorExec->saveState(); - cursorExec->detachFromOperationContext(); - - // We assume that cursors created through a DBDirectClient are always used from their - // original OperationContext, so we do not need to move time to and from the cursor. - if (!opCtx->getClient()->isInDirectClient()) { - pinnedCursor.getCursor()->setLeftoverMaxTimeMicros( - opCtx->getRemainingMaxTimeMicros()); + // Fill out curop based on the results. + endQueryOp(opCtx, collection, *cursorExec, numResults, cursorId); + } else { + endQueryOp(opCtx, collection, *exec, numResults, cursorId); } - pinnedCursor.getCursor()->setPos(numResults); - // Fill out curop based on the results. - endQueryOp(opCtx, collection, *cursorExec, numResults, cursorId); - } else { - endQueryOp(opCtx, collection, *exec, numResults, cursorId); + // Generate the response object to send to the client. + firstBatch.done(cursorId, nss.ns()); } - // Generate the response object to send to the client. - firstBatch.done(cursorId, nss.ns()); - return true; - } + private: + const OpMsgRequest& _request; + const StringData _dbName; + }; } findCmd; diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp index c5fbc7e684f..b0dba6a4b2d 100644 --- a/src/mongo/s/commands/cluster_find_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_cmd.cpp @@ -57,17 +57,17 @@ const char kTermField[] = "term"; /** * Implements the find command on mongos. */ -class ClusterFindCmd : public BasicCommand { - MONGO_DISALLOW_COPYING(ClusterFindCmd); - +class ClusterFindCmd final : public Command { public: - ClusterFindCmd() : BasicCommand("find") {} + ClusterFindCmd() : Command("find") {} - virtual bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; + std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx, + const OpMsgRequest& opMsgRequest) override { + // TODO: Parse into a QueryRequest here. + return std::make_unique<Invocation>(this, opMsgRequest, opMsgRequest.getDatabase()); } - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + AllowedOnSecondary secondaryAllowed(ServiceContext* context) const override { return AllowedOnSecondary::kOptIn; } @@ -79,12 +79,6 @@ public: return false; } - bool supportsReadConcern(const std::string& dbName, - const BSONObj& cmdObj, - repl::ReadConcernLevel level) const final { - return true; - } - bool shouldAffectCommandCounter() const final { return false; } @@ -93,152 +87,153 @@ public: return "query for documents"; } - /** - * In order to run the find command, you must be authorized for the "find" action - * type on the collection. - */ - Status checkAuthForCommand(Client* client, - const std::string& dbname, - const BSONObj& cmdObj) const final { - const NamespaceString nss(parseNs(dbname, cmdObj)); - auto hasTerm = cmdObj.hasField(kTermField); - return AuthorizationSession::get(client)->checkAuthForFind(nss, hasTerm); - } + class Invocation final : public CommandInvocation { + public: + Invocation(const ClusterFindCmd* definition, const OpMsgRequest& request, StringData dbName) + : CommandInvocation(definition), _request(request), _dbName(dbName) {} - Status explain(OperationContext* opCtx, - const OpMsgRequest& request, - ExplainOptions::Verbosity verbosity, - BSONObjBuilder* out) const final { - std::string dbname = request.getDatabase().toString(); - const BSONObj& cmdObj = request.body; - const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbname, cmdObj)); - // Parse the command BSON to a QueryRequest. - bool isExplain = true; - auto swQr = QueryRequest::makeFromFindCommand(std::move(nss), cmdObj, isExplain); - if (!swQr.isOK()) { - return swQr.getStatus(); + private: + bool supportsWriteConcern() const override { + return false; } - auto& qr = *swQr.getValue(); - - try { - const auto explainCmd = ClusterExplain::wrapAsExplain(cmdObj, verbosity); - - long long millisElapsed; - std::vector<AsyncRequestsSender::Response> shardResponses; - - // We will time how long it takes to run the commands on the shards. - Timer timer; - const auto routingInfo = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, qr.nss())); - shardResponses = - scatterGatherVersionedTargetByRoutingTable(opCtx, - qr.nss().db(), - qr.nss(), - routingInfo, - explainCmd, - ReadPreferenceSetting::get(opCtx), - Shard::RetryPolicy::kIdempotent, - qr.getFilter(), - qr.getCollation()); - millisElapsed = timer.millis(); - - const char* mongosStageName = - ClusterExplain::getStageNameForReadOp(shardResponses.size(), cmdObj); - - uassertStatusOK(ClusterExplain::buildExplainResult( - opCtx, - ClusterExplain::downconvert(opCtx, shardResponses), - mongosStageName, - millisElapsed, - out)); - - return Status::OK(); - } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { - out->resetToEmpty(); - - auto aggCmdOnView = qr.asAggregationCommand(); - if (!aggCmdOnView.isOK()) { - return aggCmdOnView.getStatus(); - } - auto aggRequestOnView = - AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue(), verbosity); - if (!aggRequestOnView.isOK()) { - return aggRequestOnView.getStatus(); - } + bool supportsReadConcern(repl::ReadConcernLevel level) const final { + return true; + } - auto resolvedAggRequest = ex->asExpandedViewAggregation(aggRequestOnView.getValue()); - auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); + NamespaceString ns() const override { + // TODO get the ns from the parsed QueryRequest. + return NamespaceString(CommandHelpers::parseNsFromCommand(_dbName, _request.body)); + } - ClusterAggregate::Namespaces nsStruct; - nsStruct.requestedNss = std::move(nss); - nsStruct.executionNss = std::move(ex->getNamespace()); + /** + * In order to run the find command, you must be authorized for the "find" action + * type on the collection. + */ + void doCheckAuthorization(OperationContext* opCtx) const final { + auto hasTerm = _request.body.hasField(kTermField); + uassertStatusOK( + AuthorizationSession::get(opCtx->getClient())->checkAuthForFind(ns(), hasTerm)); + } - auto status = ClusterAggregate::runAggregate( - opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, out); - uassertStatusOK(status); - return status; + void explain(OperationContext* opCtx, + ExplainOptions::Verbosity verbosity, + BSONObjBuilder* result) override { + // Parse the command BSON to a QueryRequest. + bool isExplain = true; + auto qr = + uassertStatusOK(QueryRequest::makeFromFindCommand(ns(), _request.body, isExplain)); + + try { + const auto explainCmd = ClusterExplain::wrapAsExplain(_request.body, verbosity); + + long long millisElapsed; + std::vector<AsyncRequestsSender::Response> shardResponses; + + // We will time how long it takes to run the commands on the shards. + Timer timer; + const auto routingInfo = uassertStatusOK( + Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, qr->nss())); + shardResponses = + scatterGatherVersionedTargetByRoutingTable(opCtx, + qr->nss().db(), + qr->nss(), + routingInfo, + explainCmd, + ReadPreferenceSetting::get(opCtx), + Shard::RetryPolicy::kIdempotent, + qr->getFilter(), + qr->getCollation()); + millisElapsed = timer.millis(); + + const char* mongosStageName = + ClusterExplain::getStageNameForReadOp(shardResponses.size(), _request.body); + + uassertStatusOK(ClusterExplain::buildExplainResult( + opCtx, + ClusterExplain::downconvert(opCtx, shardResponses), + mongosStageName, + millisElapsed, + result)); + + } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { + result->resetToEmpty(); + + auto aggCmdOnView = uassertStatusOK(qr->asAggregationCommand()); + + auto aggRequestOnView = uassertStatusOK( + AggregationRequest::parseFromBSON(ns(), aggCmdOnView, verbosity)); + + auto resolvedAggRequest = ex->asExpandedViewAggregation(aggRequestOnView); + auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); + + ClusterAggregate::Namespaces nsStruct; + nsStruct.requestedNss = ns(); + nsStruct.executionNss = std::move(ex->getNamespace()); + + uassertStatusOK(ClusterAggregate::runAggregate( + opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, result)); + } } - } - bool run(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj, - BSONObjBuilder& result) final { - // We count find command as a query op. - globalOpCounters.gotQuery(); - - const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbname, cmdObj)); - - const bool isExplain = false; - auto qr = QueryRequest::makeFromFindCommand(nss, cmdObj, isExplain); - uassertStatusOK(qr.getStatus()); - - const boost::intrusive_ptr<ExpressionContext> expCtx; - auto cq = CanonicalQuery::canonicalize(opCtx, - std::move(qr.getValue()), - expCtx, - ExtensionsCallbackNoop(), - MatchExpressionParser::kAllowAllSpecialFeatures); - uassertStatusOK(cq.getStatus()); - - try { - // Do the work to generate the first batch of results. This blocks waiting to get - // responses from the shard(s). - std::vector<BSONObj> batch; - auto cursorId = ClusterFind::runQuery( - opCtx, *cq.getValue(), ReadPreferenceSetting::get(opCtx), &batch); - // Build the response document. - CursorResponseBuilder firstBatch(/*firstBatch*/ true, &result); - for (const auto& obj : batch) { - firstBatch.append(obj); + void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) { + // We count find command as a query op. + globalOpCounters.gotQuery(); + + const bool isExplain = false; + auto qr = + uassertStatusOK(QueryRequest::makeFromFindCommand(ns(), _request.body, isExplain)); + + const boost::intrusive_ptr<ExpressionContext> expCtx; + auto cq = uassertStatusOK( + CanonicalQuery::canonicalize(opCtx, + std::move(qr), + expCtx, + ExtensionsCallbackNoop(), + MatchExpressionParser::kAllowAllSpecialFeatures)); + + try { + // Do the work to generate the first batch of results. This blocks waiting to get + // responses from the shard(s). + std::vector<BSONObj> batch; + auto cursorId = + ClusterFind::runQuery(opCtx, *cq, ReadPreferenceSetting::get(opCtx), &batch); + auto bodyBuilder = result->getBodyBuilder(); + // Build the response document. + CursorResponseBuilder firstBatch(/*firstBatch*/ true, &bodyBuilder); + for (const auto& obj : batch) { + firstBatch.append(obj); + } + firstBatch.done(cursorId, cq->ns()); + } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { + result->reset(); + + auto aggCmdOnView = uassertStatusOK(cq->getQueryRequest().asAggregationCommand()); + + auto aggRequestOnView = + uassertStatusOK(AggregationRequest::parseFromBSON(ns(), aggCmdOnView)); + + auto resolvedAggRequest = ex->asExpandedViewAggregation(aggRequestOnView); + auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); + + // We pass both the underlying collection namespace and the view namespace here. The + // underlying collection namespace is used to execute the aggregation on mongoD. Any + // cursor returned will be registered under the view namespace so that subsequent + // getMore and killCursors calls against the view have access. + ClusterAggregate::Namespaces nsStruct; + nsStruct.requestedNss = ns(); + nsStruct.executionNss = std::move(ex->getNamespace()); + + auto bodyBuilder = result->getBodyBuilder(); + uassertStatusOK(ClusterAggregate::runAggregate( + opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &bodyBuilder)); } - firstBatch.done(cursorId, nss.ns()); - return true; - } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) { - auto aggCmdOnView = cq.getValue()->getQueryRequest().asAggregationCommand(); - uassertStatusOK(aggCmdOnView.getStatus()); - - auto aggRequestOnView = AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue()); - uassertStatusOK(aggRequestOnView.getStatus()); - - auto resolvedAggRequest = ex->asExpandedViewAggregation(aggRequestOnView.getValue()); - auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); - - // We pass both the underlying collection namespace and the view namespace here. The - // underlying collection namespace is used to execute the aggregation on mongoD. Any - // cursor returned will be registered under the view namespace so that subsequent - // getMore and killCursors calls against the view have access. - ClusterAggregate::Namespaces nsStruct; - nsStruct.requestedNss = std::move(nss); - nsStruct.executionNss = std::move(ex->getNamespace()); - - auto status = ClusterAggregate::runAggregate( - opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &result); - uassertStatusOK(status); - return true; } - } + + private: + const OpMsgRequest& _request; + const StringData _dbName; + }; } cmdFindCluster; |