summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGregory Noma <gregory.noma@gmail.com>2018-07-05 18:17:17 -0400
committerGregory Noma <gregory.noma@gmail.com>2018-07-10 09:56:55 -0400
commit1868c56cf7466793b1bbee553d9325d5d68f5ba2 (patch)
tree88684b8d5cd558bd6d3bc025a6cb36b827abd584 /src
parente86d684515abe1c4dbf79dbb71741b5db0317039 (diff)
downloadmongo-1868c56cf7466793b1bbee553d9325d5d68f5ba2.tar.gz
SERVER-35910 Upgrade FindCmd and ClusterFindCmd to use TypedCommand
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/find_cmd.cpp540
-rw-r--r--src/mongo/s/commands/cluster_find_cmd.cpp293
2 files changed, 411 insertions, 422 deletions
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 617cfdca8e0..e16f2473092 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -61,15 +61,17 @@ const auto kTermField = "term"_sd;
/**
* A command for running .find() queries.
*/
-class FindCmd : public BasicCommand {
+class FindCmd final : public Command {
public:
- FindCmd() : BasicCommand("find") {}
+ FindCmd() : Command("find") {}
- bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
+ std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
+ const OpMsgRequest& opMsgRequest) override {
+ // TODO: Parse into a QueryRequest here.
+ return std::make_unique<Invocation>(this, opMsgRequest, opMsgRequest.getDatabase());
}
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ AllowedOnSecondary secondaryAllowed(ServiceContext* context) const override {
return AllowedOnSecondary::kOptIn;
}
@@ -81,12 +83,6 @@ public:
return false;
}
- bool supportsReadConcern(const std::string& dbName,
- const BSONObj& cmdObj,
- repl::ReadConcernLevel level) const final {
- return true;
- }
-
std::string help() const override {
return "query for documents";
}
@@ -111,299 +107,297 @@ public:
return false;
}
- Status checkAuthForOperation(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj) const override {
- AuthorizationSession* authSession = AuthorizationSession::get(opCtx->getClient());
+ class Invocation final : public CommandInvocation {
+ public:
+ Invocation(const FindCmd* definition, const OpMsgRequest& request, StringData dbName)
+ : CommandInvocation(definition), _request(request), _dbName(dbName) {}
- if (!authSession->isAuthorizedToParseNamespaceElement(cmdObj.firstElement())) {
- return Status(ErrorCodes::Unauthorized, "Unauthorized");
+ private:
+ bool supportsWriteConcern() const override {
+ return false;
}
- const auto hasTerm = cmdObj.hasField(kTermField);
- return authSession->checkAuthForFind(
- AutoGetCollection::resolveNamespaceStringOrUUID(
- opCtx, CommandHelpers::parseNsOrUUID(dbname, cmdObj)),
- hasTerm);
- }
+ bool supportsReadConcern(repl::ReadConcernLevel level) const final {
+ return true;
+ }
- Status explain(OperationContext* opCtx,
- const OpMsgRequest& request,
- ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const override {
- std::string dbname = request.getDatabase().toString();
- const BSONObj& cmdObj = request.body;
- // Acquire locks and resolve possible UUID. The RAII object is optional, because in the case
- // of a view, the locks need to be released.
- boost::optional<AutoGetCollectionForReadCommand> ctx;
- ctx.emplace(opCtx,
- CommandHelpers::parseNsOrUUID(dbname, cmdObj),
- AutoGetCollection::ViewMode::kViewsPermitted);
- const auto nss = ctx->getNss();
-
- // Parse the command BSON to a QueryRequest.
- const bool isExplain = true;
- auto qrStatus = QueryRequest::makeFromFindCommand(nss, cmdObj, isExplain);
- if (!qrStatus.isOK()) {
- return qrStatus.getStatus();
+ NamespaceString ns() const override {
+ // TODO get the ns from the parsed QueryRequest.
+ return NamespaceString(CommandHelpers::parseNsFromCommand(_dbName, _request.body));
}
- // Finish the parsing step by using the QueryRequest to create a CanonicalQuery.
- const ExtensionsCallbackReal extensionsCallback(opCtx, &nss);
- const boost::intrusive_ptr<ExpressionContext> expCtx;
- auto statusWithCQ =
- CanonicalQuery::canonicalize(opCtx,
- std::move(qrStatus.getValue()),
- expCtx,
- extensionsCallback,
- MatchExpressionParser::kAllowAllSpecialFeatures);
- if (!statusWithCQ.isOK()) {
- return statusWithCQ.getStatus();
+ void doCheckAuthorization(OperationContext* opCtx) const final {
+ AuthorizationSession* authSession = AuthorizationSession::get(opCtx->getClient());
+
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ authSession->isAuthorizedToParseNamespaceElement(_request.body.firstElement()));
+
+ const auto hasTerm = _request.body.hasField(kTermField);
+ uassertStatusOK(authSession->checkAuthForFind(
+ AutoGetCollection::resolveNamespaceStringOrUUID(
+ opCtx, CommandHelpers::parseNsOrUUID(_dbName, _request.body)),
+ hasTerm));
}
- std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
-
- if (ctx->getView()) {
- // Relinquish locks. The aggregation command will re-acquire them.
- ctx.reset();
-
- // Convert the find command into an aggregation using $match (and other stages, as
- // necessary), if possible.
- const auto& qr = cq->getQueryRequest();
- auto viewAggregationCommand = qr.asAggregationCommand();
- if (!viewAggregationCommand.isOK())
- return viewAggregationCommand.getStatus();
-
- // Create the agg request equivalent of the find operation, with the explain verbosity
- // included.
- auto aggRequest = AggregationRequest::parseFromBSON(
- nss, viewAggregationCommand.getValue(), verbosity);
- if (!aggRequest.isOK()) {
- return aggRequest.getStatus();
- }
- try {
- return runAggregate(
- opCtx, nss, aggRequest.getValue(), viewAggregationCommand.getValue(), *out);
- } catch (DBException& error) {
- if (error.code() == ErrorCodes::InvalidPipelineOperator) {
- return {ErrorCodes::InvalidPipelineOperator,
- str::stream() << "Unsupported in view pipeline: " << error.what()};
+ void explain(OperationContext* opCtx,
+ ExplainOptions::Verbosity verbosity,
+ BSONObjBuilder* result) override {
+ // Acquire locks and resolve possible UUID. The RAII object is optional, because in the
+ // case of a view, the locks need to be released.
+ boost::optional<AutoGetCollectionForReadCommand> ctx;
+ ctx.emplace(opCtx,
+ CommandHelpers::parseNsOrUUID(_dbName, _request.body),
+ AutoGetCollection::ViewMode::kViewsPermitted);
+ const auto nss = ctx->getNss();
+
+ // Parse the command BSON to a QueryRequest.
+ const bool isExplain = true;
+ auto qr =
+ uassertStatusOK(QueryRequest::makeFromFindCommand(nss, _request.body, isExplain));
+
+ // Finish the parsing step by using the QueryRequest to create a CanonicalQuery.
+ const ExtensionsCallbackReal extensionsCallback(opCtx, &nss);
+ const boost::intrusive_ptr<ExpressionContext> expCtx;
+ auto cq = uassertStatusOK(
+ CanonicalQuery::canonicalize(opCtx,
+ std::move(qr),
+ expCtx,
+ extensionsCallback,
+ MatchExpressionParser::kAllowAllSpecialFeatures));
+
+ if (ctx->getView()) {
+ // Relinquish locks. The aggregation command will re-acquire them.
+ ctx.reset();
+
+ // Convert the find command into an aggregation using $match (and other stages, as
+ // necessary), if possible.
+ const auto& qr = cq->getQueryRequest();
+ auto viewAggregationCommand = uassertStatusOK(qr.asAggregationCommand());
+
+ // Create the agg request equivalent of the find operation, with the explain
+ // verbosity included.
+ auto aggRequest = uassertStatusOK(
+ AggregationRequest::parseFromBSON(nss, viewAggregationCommand, verbosity));
+
+ try {
+ uassertStatusOK(
+ runAggregate(opCtx, nss, aggRequest, viewAggregationCommand, *result));
+ } catch (DBException& error) {
+ if (error.code() == ErrorCodes::InvalidPipelineOperator) {
+ uasserted(ErrorCodes::InvalidPipelineOperator,
+ str::stream() << "Unsupported in view pipeline: "
+ << error.what());
+ }
+ throw;
}
- return error.toStatus();
+ return;
}
- }
- // The collection may be NULL. If so, getExecutor() should handle it by returning an
- // execution tree with an EOFStage.
- Collection* const collection = ctx->getCollection();
+ // The collection may be NULL. If so, getExecutor() should handle it by returning an
+ // execution tree with an EOFStage.
+ Collection* const collection = ctx->getCollection();
- // We have a parsed query. Time to get the execution plan for it.
- auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq));
- if (!statusWithPlanExecutor.isOK()) {
- return statusWithPlanExecutor.getStatus();
- }
- auto exec = std::move(statusWithPlanExecutor.getValue());
+ // We have a parsed query. Time to get the execution plan for it.
+ auto exec = uassertStatusOK(getExecutorFind(opCtx, collection, nss, std::move(cq)));
- // Got the execution tree. Explain it.
- Explain::explainStages(exec.get(), collection, verbosity, out);
- return Status::OK();
- }
-
- /**
- * Runs a query using the following steps:
- * --Parsing.
- * --Acquire locks.
- * --Plan query, obtaining an executor that can run it.
- * --Generate the first batch.
- * --Save state for getMore, transferring ownership of the executor to a ClientCursor.
- * --Generate response to send to the client.
- */
- bool run(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
- // Although it is a command, a find command gets counted as a query.
- globalOpCounters.gotQuery();
-
- // Parse the command BSON to a QueryRequest.
- const bool isExplain = false;
- // Pass parseNs to makeFromFindCommand in case cmdObj does not have a UUID.
- auto qrStatus = QueryRequest::makeFromFindCommand(
- NamespaceString(parseNs(dbname, cmdObj)), cmdObj, isExplain);
- uassertStatusOK(qrStatus.getStatus());
-
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- auto& qr = qrStatus.getValue();
- const auto session = OperationContextSession::get(opCtx);
- uassert(ErrorCodes::InvalidOptions,
- "It is illegal to open a tailable cursor in a transaction",
- session == nullptr || !(session->inMultiDocumentTransaction() && qr->isTailable()));
-
- // Validate term before acquiring locks, if provided.
- if (auto term = qr->getReplicationTerm()) {
- Status status = replCoord->updateTerm(opCtx, *term);
- // Note: updateTerm returns ok if term stayed the same.
- uassertStatusOK(status);
+ // Got the execution tree. Explain it.
+ Explain::explainStages(exec.get(), collection, verbosity, result);
}
- // Acquire locks. If the query is on a view, we release our locks and convert the query
- // request into an aggregation command.
- boost::optional<AutoGetCollectionForReadCommand> ctx;
- ctx.emplace(opCtx,
- CommandHelpers::parseNsOrUUID(dbname, cmdObj),
- AutoGetCollection::ViewMode::kViewsPermitted);
- const auto& nss = ctx->getNss();
-
- qr->refreshNSS(opCtx);
-
- // Check whether we are allowed to read from this node after acquiring our locks.
- uassertStatusOK(replCoord->checkCanServeReadsFor(
- opCtx, nss, ReadPreferenceSetting::get(opCtx).canRunOnSecondary()));
-
- // Fill out curop information.
- //
- // We pass negative values for 'ntoreturn' and 'ntoskip' to indicate that these values
- // should be omitted from the log line. Limit and skip information is already present in the
- // find command parameters, so these fields are redundant.
- const int ntoreturn = -1;
- const int ntoskip = -1;
- beginQueryOp(opCtx, nss, cmdObj, ntoreturn, ntoskip);
-
- // Finish the parsing step by using the QueryRequest to create a CanonicalQuery.
- const ExtensionsCallbackReal extensionsCallback(opCtx, &nss);
- const boost::intrusive_ptr<ExpressionContext> expCtx;
- auto statusWithCQ =
- CanonicalQuery::canonicalize(opCtx,
- std::move(qr),
- expCtx,
- extensionsCallback,
- MatchExpressionParser::kAllowAllSpecialFeatures);
- uassertStatusOK(statusWithCQ.getStatus());
- std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
-
- if (ctx->getView()) {
- // Relinquish locks. The aggregation command will re-acquire them.
- ctx.reset();
-
- // Convert the find command into an aggregation using $match (and other stages, as
- // necessary), if possible.
- const auto& qr = cq->getQueryRequest();
- auto viewAggregationCommand = qr.asAggregationCommand();
- uassertStatusOK(viewAggregationCommand.getStatus());
-
- BSONObj aggResult = CommandHelpers::runCommandDirectly(
- opCtx,
- OpMsgRequest::fromDBAndBody(dbname, std::move(viewAggregationCommand.getValue())));
- auto status = getStatusFromCommandResult(aggResult);
- if (status.code() == ErrorCodes::InvalidPipelineOperator) {
- uasserted(ErrorCodes::InvalidPipelineOperator,
- str::stream() << "Unsupported in view pipeline: " << status.reason());
+ /**
+ * Runs a query using the following steps:
+ * --Parsing.
+ * --Acquire locks.
+ * --Plan query, obtaining an executor that can run it.
+ * --Generate the first batch.
+ * --Save state for getMore, transferring ownership of the executor to a ClientCursor.
+ * --Generate response to send to the client.
+ */
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) {
+ // Although it is a command, a find command gets counted as a query.
+ globalOpCounters.gotQuery();
+
+ // Parse the command BSON to a QueryRequest.
+ const bool isExplain = false;
+ // Pass parseNs to makeFromFindCommand in case _request.body does not have a UUID.
+ auto qr = uassertStatusOK(QueryRequest::makeFromFindCommand(
+ NamespaceString(CommandHelpers::parseNsFromCommand(_dbName, _request.body)),
+ _request.body,
+ isExplain));
+
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ const auto session = OperationContextSession::get(opCtx);
+ uassert(ErrorCodes::InvalidOptions,
+ "It is illegal to open a tailable cursor in a transaction",
+ session == nullptr ||
+ !(session->inMultiDocumentTransaction() && qr->isTailable()));
+
+ // Validate term before acquiring locks, if provided.
+ if (auto term = qr->getReplicationTerm()) {
+ // Note: updateTerm returns ok if term stayed the same.
+ uassertStatusOK(replCoord->updateTerm(opCtx, *term));
}
- result.resetToEmpty();
- result.appendElements(aggResult);
- return status.isOK();
- }
- Collection* const collection = ctx->getCollection();
+ // Acquire locks. If the query is on a view, we release our locks and convert the query
+ // request into an aggregation command.
+ boost::optional<AutoGetCollectionForReadCommand> ctx;
+ ctx.emplace(opCtx,
+ CommandHelpers::parseNsOrUUID(_dbName, _request.body),
+ AutoGetCollection::ViewMode::kViewsPermitted);
+ const auto& nss = ctx->getNss();
+
+ qr->refreshNSS(opCtx);
+
+ // Check whether we are allowed to read from this node after acquiring our locks.
+ uassertStatusOK(replCoord->checkCanServeReadsFor(
+ opCtx, nss, ReadPreferenceSetting::get(opCtx).canRunOnSecondary()));
+
+ // Fill out curop information.
+ //
+ // We pass negative values for 'ntoreturn' and 'ntoskip' to indicate that these values
+ // should be omitted from the log line. Limit and skip information is already present in
+ // the find command parameters, so these fields are redundant.
+ const int ntoreturn = -1;
+ const int ntoskip = -1;
+ beginQueryOp(opCtx, nss, _request.body, ntoreturn, ntoskip);
+
+ // Finish the parsing step by using the QueryRequest to create a CanonicalQuery.
+ const ExtensionsCallbackReal extensionsCallback(opCtx, &nss);
+ const boost::intrusive_ptr<ExpressionContext> expCtx;
+ auto cq = uassertStatusOK(
+ CanonicalQuery::canonicalize(opCtx,
+ std::move(qr),
+ expCtx,
+ extensionsCallback,
+ MatchExpressionParser::kAllowAllSpecialFeatures));
+
+ if (ctx->getView()) {
+ // Relinquish locks. The aggregation command will re-acquire them.
+ ctx.reset();
+
+ // Convert the find command into an aggregation using $match (and other stages, as
+ // necessary), if possible.
+ const auto& qr = cq->getQueryRequest();
+ auto viewAggregationCommand = uassertStatusOK(qr.asAggregationCommand());
+
+ BSONObj aggResult = CommandHelpers::runCommandDirectly(
+ opCtx, OpMsgRequest::fromDBAndBody(_dbName, std::move(viewAggregationCommand)));
+ auto status = getStatusFromCommandResult(aggResult);
+ if (status.code() == ErrorCodes::InvalidPipelineOperator) {
+ uasserted(ErrorCodes::InvalidPipelineOperator,
+ str::stream() << "Unsupported in view pipeline: " << status.reason());
+ }
+ uassertStatusOK(status);
+ result->getBodyBuilder().appendElements(aggResult);
+ return;
+ }
- // Get the execution plan for the query.
- auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq));
- uassertStatusOK(statusWithPlanExecutor.getStatus());
+ Collection* const collection = ctx->getCollection();
- auto exec = std::move(statusWithPlanExecutor.getValue());
+ // Get the execution plan for the query.
+ auto exec = uassertStatusOK(getExecutorFind(opCtx, collection, nss, std::move(cq)));
- {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
- }
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setPlanSummary_inlock(Explain::getPlanSummary(exec.get()));
+ }
- if (!collection) {
- // No collection. Just fill out curop indicating that there were zero results and
- // there is no ClientCursor id, and then return.
- const long long numResults = 0;
- const CursorId cursorId = 0;
- endQueryOp(opCtx, collection, *exec, numResults, cursorId);
- appendCursorResponseObject(cursorId, nss.ns(), BSONArray(), &result);
- return true;
- }
+ if (!collection) {
+ // No collection. Just fill out curop indicating that there were zero results and
+ // there is no ClientCursor id, and then return.
+ const long long numResults = 0;
+ const CursorId cursorId = 0;
+ endQueryOp(opCtx, collection, *exec, numResults, cursorId);
+ auto bodyBuilder = result->getBodyBuilder();
+ appendCursorResponseObject(cursorId, nss.ns(), BSONArray(), &bodyBuilder);
+ return;
+ }
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &waitInFindBeforeMakingBatch, opCtx, "waitInFindBeforeMakingBatch");
-
- const QueryRequest& originalQR = exec->getCanonicalQuery()->getQueryRequest();
-
- // Stream query results, adding them to a BSONArray as we go.
- CursorResponseBuilder firstBatch(/*isInitialResponse*/ true, &result);
- BSONObj obj;
- PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
- long long numResults = 0;
- while (!FindCommon::enoughForFirstBatch(originalQR, numResults) &&
- PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
- // If we can't fit this result inside the current batch, then we stash it for later.
- if (!FindCommon::haveSpaceForNext(obj, numResults, firstBatch.bytesUsed())) {
- exec->enqueue(obj);
- break;
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &waitInFindBeforeMakingBatch, opCtx, "waitInFindBeforeMakingBatch");
+
+ const QueryRequest& originalQR = exec->getCanonicalQuery()->getQueryRequest();
+
+ // Stream query results, adding them to a BSONArray as we go.
+ auto bodyBuilder = result->getBodyBuilder();
+ CursorResponseBuilder firstBatch(/*isInitialResponse*/ true, &bodyBuilder);
+ BSONObj obj;
+ PlanExecutor::ExecState state = PlanExecutor::ADVANCED;
+ long long numResults = 0;
+ while (!FindCommon::enoughForFirstBatch(originalQR, numResults) &&
+ PlanExecutor::ADVANCED == (state = exec->getNext(&obj, nullptr))) {
+ // If we can't fit this result inside the current batch, then we stash it for later.
+ if (!FindCommon::haveSpaceForNext(obj, numResults, firstBatch.bytesUsed())) {
+ exec->enqueue(obj);
+ break;
+ }
+
+ // Add result to output buffer.
+ firstBatch.append(obj);
+ numResults++;
}
- // Add result to output buffer.
- firstBatch.append(obj);
- numResults++;
- }
+ // Throw an assertion if query execution fails for any reason.
+ if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
+ firstBatch.abandon();
+ error() << "Plan executor error during find command: "
+ << PlanExecutor::statestr(state)
+ << ", stats: " << redact(Explain::getWinningPlanStats(exec.get()));
- // Throw an assertion if query execution fails for any reason.
- if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
- firstBatch.abandon();
- error() << "Plan executor error during find command: " << PlanExecutor::statestr(state)
- << ", stats: " << redact(Explain::getWinningPlanStats(exec.get()));
+ uassertStatusOK(WorkingSetCommon::getMemberObjectStatus(obj).withContext(
+ "Executor error during find command"));
+ }
- uassertStatusOK(WorkingSetCommon::getMemberObjectStatus(obj).withContext(
- "Executor error during find command"));
- }
+ // Before saving the cursor, ensure that whatever plan we established happened with the
+ // expected collection version
+ auto css = CollectionShardingState::get(opCtx, nss);
+ css->checkShardVersionOrThrow(opCtx);
+
+ // Set up the cursor for getMore.
+ CursorId cursorId = 0;
+ if (shouldSaveCursor(opCtx, collection, state, exec.get())) {
+ // Create a ClientCursor containing this plan executor and register it with the
+ // cursor manager.
+ ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor(
+ opCtx,
+ {std::move(exec),
+ nss,
+ AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
+ repl::ReadConcernArgs::get(opCtx).getLevel(),
+ _request.body});
+ cursorId = pinnedCursor.getCursor()->cursorid();
+
+ invariant(!exec);
+ PlanExecutor* cursorExec = pinnedCursor.getCursor()->getExecutor();
+
+ // State will be restored on getMore.
+ cursorExec->saveState();
+ cursorExec->detachFromOperationContext();
+
+ // We assume that cursors created through a DBDirectClient are always used from
+ // their original OperationContext, so we do not need to move time to and from the
+ // cursor.
+ if (!opCtx->getClient()->isInDirectClient()) {
+ pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(
+ opCtx->getRemainingMaxTimeMicros());
+ }
+ pinnedCursor.getCursor()->setPos(numResults);
- // Before saving the cursor, ensure that whatever plan we established happened with the
- // expected collection version
- auto css = CollectionShardingState::get(opCtx, nss);
- css->checkShardVersionOrThrow(opCtx);
-
- // Set up the cursor for getMore.
- CursorId cursorId = 0;
- if (shouldSaveCursor(opCtx, collection, state, exec.get())) {
- // Create a ClientCursor containing this plan executor and register it with the cursor
- // manager.
- ClientCursorPin pinnedCursor = collection->getCursorManager()->registerCursor(
- opCtx,
- {std::move(exec),
- nss,
- AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
- repl::ReadConcernArgs::get(opCtx).getLevel(),
- cmdObj});
- cursorId = pinnedCursor.getCursor()->cursorid();
-
- invariant(!exec);
- PlanExecutor* cursorExec = pinnedCursor.getCursor()->getExecutor();
-
- // State will be restored on getMore.
- cursorExec->saveState();
- cursorExec->detachFromOperationContext();
-
- // We assume that cursors created through a DBDirectClient are always used from their
- // original OperationContext, so we do not need to move time to and from the cursor.
- if (!opCtx->getClient()->isInDirectClient()) {
- pinnedCursor.getCursor()->setLeftoverMaxTimeMicros(
- opCtx->getRemainingMaxTimeMicros());
+ // Fill out curop based on the results.
+ endQueryOp(opCtx, collection, *cursorExec, numResults, cursorId);
+ } else {
+ endQueryOp(opCtx, collection, *exec, numResults, cursorId);
}
- pinnedCursor.getCursor()->setPos(numResults);
- // Fill out curop based on the results.
- endQueryOp(opCtx, collection, *cursorExec, numResults, cursorId);
- } else {
- endQueryOp(opCtx, collection, *exec, numResults, cursorId);
+ // Generate the response object to send to the client.
+ firstBatch.done(cursorId, nss.ns());
}
- // Generate the response object to send to the client.
- firstBatch.done(cursorId, nss.ns());
- return true;
- }
+ private:
+ const OpMsgRequest& _request;
+ const StringData _dbName;
+ };
} findCmd;
diff --git a/src/mongo/s/commands/cluster_find_cmd.cpp b/src/mongo/s/commands/cluster_find_cmd.cpp
index c5fbc7e684f..b0dba6a4b2d 100644
--- a/src/mongo/s/commands/cluster_find_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_cmd.cpp
@@ -57,17 +57,17 @@ const char kTermField[] = "term";
/**
* Implements the find command on mongos.
*/
-class ClusterFindCmd : public BasicCommand {
- MONGO_DISALLOW_COPYING(ClusterFindCmd);
-
+class ClusterFindCmd final : public Command {
public:
- ClusterFindCmd() : BasicCommand("find") {}
+ ClusterFindCmd() : Command("find") {}
- virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
+ std::unique_ptr<CommandInvocation> parse(OperationContext* opCtx,
+ const OpMsgRequest& opMsgRequest) override {
+ // TODO: Parse into a QueryRequest here.
+ return std::make_unique<Invocation>(this, opMsgRequest, opMsgRequest.getDatabase());
}
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ AllowedOnSecondary secondaryAllowed(ServiceContext* context) const override {
return AllowedOnSecondary::kOptIn;
}
@@ -79,12 +79,6 @@ public:
return false;
}
- bool supportsReadConcern(const std::string& dbName,
- const BSONObj& cmdObj,
- repl::ReadConcernLevel level) const final {
- return true;
- }
-
bool shouldAffectCommandCounter() const final {
return false;
}
@@ -93,152 +87,153 @@ public:
return "query for documents";
}
- /**
- * In order to run the find command, you must be authorized for the "find" action
- * type on the collection.
- */
- Status checkAuthForCommand(Client* client,
- const std::string& dbname,
- const BSONObj& cmdObj) const final {
- const NamespaceString nss(parseNs(dbname, cmdObj));
- auto hasTerm = cmdObj.hasField(kTermField);
- return AuthorizationSession::get(client)->checkAuthForFind(nss, hasTerm);
- }
+ class Invocation final : public CommandInvocation {
+ public:
+ Invocation(const ClusterFindCmd* definition, const OpMsgRequest& request, StringData dbName)
+ : CommandInvocation(definition), _request(request), _dbName(dbName) {}
- Status explain(OperationContext* opCtx,
- const OpMsgRequest& request,
- ExplainOptions::Verbosity verbosity,
- BSONObjBuilder* out) const final {
- std::string dbname = request.getDatabase().toString();
- const BSONObj& cmdObj = request.body;
- const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbname, cmdObj));
- // Parse the command BSON to a QueryRequest.
- bool isExplain = true;
- auto swQr = QueryRequest::makeFromFindCommand(std::move(nss), cmdObj, isExplain);
- if (!swQr.isOK()) {
- return swQr.getStatus();
+ private:
+ bool supportsWriteConcern() const override {
+ return false;
}
- auto& qr = *swQr.getValue();
-
- try {
- const auto explainCmd = ClusterExplain::wrapAsExplain(cmdObj, verbosity);
-
- long long millisElapsed;
- std::vector<AsyncRequestsSender::Response> shardResponses;
-
- // We will time how long it takes to run the commands on the shards.
- Timer timer;
- const auto routingInfo = uassertStatusOK(
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, qr.nss()));
- shardResponses =
- scatterGatherVersionedTargetByRoutingTable(opCtx,
- qr.nss().db(),
- qr.nss(),
- routingInfo,
- explainCmd,
- ReadPreferenceSetting::get(opCtx),
- Shard::RetryPolicy::kIdempotent,
- qr.getFilter(),
- qr.getCollation());
- millisElapsed = timer.millis();
-
- const char* mongosStageName =
- ClusterExplain::getStageNameForReadOp(shardResponses.size(), cmdObj);
-
- uassertStatusOK(ClusterExplain::buildExplainResult(
- opCtx,
- ClusterExplain::downconvert(opCtx, shardResponses),
- mongosStageName,
- millisElapsed,
- out));
-
- return Status::OK();
- } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
- out->resetToEmpty();
-
- auto aggCmdOnView = qr.asAggregationCommand();
- if (!aggCmdOnView.isOK()) {
- return aggCmdOnView.getStatus();
- }
- auto aggRequestOnView =
- AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue(), verbosity);
- if (!aggRequestOnView.isOK()) {
- return aggRequestOnView.getStatus();
- }
+ bool supportsReadConcern(repl::ReadConcernLevel level) const final {
+ return true;
+ }
- auto resolvedAggRequest = ex->asExpandedViewAggregation(aggRequestOnView.getValue());
- auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
+ NamespaceString ns() const override {
+ // TODO get the ns from the parsed QueryRequest.
+ return NamespaceString(CommandHelpers::parseNsFromCommand(_dbName, _request.body));
+ }
- ClusterAggregate::Namespaces nsStruct;
- nsStruct.requestedNss = std::move(nss);
- nsStruct.executionNss = std::move(ex->getNamespace());
+ /**
+ * In order to run the find command, you must be authorized for the "find" action
+ * type on the collection.
+ */
+ void doCheckAuthorization(OperationContext* opCtx) const final {
+ auto hasTerm = _request.body.hasField(kTermField);
+ uassertStatusOK(
+ AuthorizationSession::get(opCtx->getClient())->checkAuthForFind(ns(), hasTerm));
+ }
- auto status = ClusterAggregate::runAggregate(
- opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, out);
- uassertStatusOK(status);
- return status;
+ void explain(OperationContext* opCtx,
+ ExplainOptions::Verbosity verbosity,
+ BSONObjBuilder* result) override {
+ // Parse the command BSON to a QueryRequest.
+ bool isExplain = true;
+ auto qr =
+ uassertStatusOK(QueryRequest::makeFromFindCommand(ns(), _request.body, isExplain));
+
+ try {
+ const auto explainCmd = ClusterExplain::wrapAsExplain(_request.body, verbosity);
+
+ long long millisElapsed;
+ std::vector<AsyncRequestsSender::Response> shardResponses;
+
+ // We will time how long it takes to run the commands on the shards.
+ Timer timer;
+ const auto routingInfo = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, qr->nss()));
+ shardResponses =
+ scatterGatherVersionedTargetByRoutingTable(opCtx,
+ qr->nss().db(),
+ qr->nss(),
+ routingInfo,
+ explainCmd,
+ ReadPreferenceSetting::get(opCtx),
+ Shard::RetryPolicy::kIdempotent,
+ qr->getFilter(),
+ qr->getCollation());
+ millisElapsed = timer.millis();
+
+ const char* mongosStageName =
+ ClusterExplain::getStageNameForReadOp(shardResponses.size(), _request.body);
+
+ uassertStatusOK(ClusterExplain::buildExplainResult(
+ opCtx,
+ ClusterExplain::downconvert(opCtx, shardResponses),
+ mongosStageName,
+ millisElapsed,
+ result));
+
+ } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
+ result->resetToEmpty();
+
+ auto aggCmdOnView = uassertStatusOK(qr->asAggregationCommand());
+
+ auto aggRequestOnView = uassertStatusOK(
+ AggregationRequest::parseFromBSON(ns(), aggCmdOnView, verbosity));
+
+ auto resolvedAggRequest = ex->asExpandedViewAggregation(aggRequestOnView);
+ auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
+
+ ClusterAggregate::Namespaces nsStruct;
+ nsStruct.requestedNss = ns();
+ nsStruct.executionNss = std::move(ex->getNamespace());
+
+ uassertStatusOK(ClusterAggregate::runAggregate(
+ opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, result));
+ }
}
- }
- bool run(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) final {
- // We count find command as a query op.
- globalOpCounters.gotQuery();
-
- const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbname, cmdObj));
-
- const bool isExplain = false;
- auto qr = QueryRequest::makeFromFindCommand(nss, cmdObj, isExplain);
- uassertStatusOK(qr.getStatus());
-
- const boost::intrusive_ptr<ExpressionContext> expCtx;
- auto cq = CanonicalQuery::canonicalize(opCtx,
- std::move(qr.getValue()),
- expCtx,
- ExtensionsCallbackNoop(),
- MatchExpressionParser::kAllowAllSpecialFeatures);
- uassertStatusOK(cq.getStatus());
-
- try {
- // Do the work to generate the first batch of results. This blocks waiting to get
- // responses from the shard(s).
- std::vector<BSONObj> batch;
- auto cursorId = ClusterFind::runQuery(
- opCtx, *cq.getValue(), ReadPreferenceSetting::get(opCtx), &batch);
- // Build the response document.
- CursorResponseBuilder firstBatch(/*firstBatch*/ true, &result);
- for (const auto& obj : batch) {
- firstBatch.append(obj);
+ void run(OperationContext* opCtx, rpc::ReplyBuilderInterface* result) {
+ // We count find command as a query op.
+ globalOpCounters.gotQuery();
+
+ const bool isExplain = false;
+ auto qr =
+ uassertStatusOK(QueryRequest::makeFromFindCommand(ns(), _request.body, isExplain));
+
+ const boost::intrusive_ptr<ExpressionContext> expCtx;
+ auto cq = uassertStatusOK(
+ CanonicalQuery::canonicalize(opCtx,
+ std::move(qr),
+ expCtx,
+ ExtensionsCallbackNoop(),
+ MatchExpressionParser::kAllowAllSpecialFeatures));
+
+ try {
+ // Do the work to generate the first batch of results. This blocks waiting to get
+ // responses from the shard(s).
+ std::vector<BSONObj> batch;
+ auto cursorId =
+ ClusterFind::runQuery(opCtx, *cq, ReadPreferenceSetting::get(opCtx), &batch);
+ auto bodyBuilder = result->getBodyBuilder();
+ // Build the response document.
+ CursorResponseBuilder firstBatch(/*firstBatch*/ true, &bodyBuilder);
+ for (const auto& obj : batch) {
+ firstBatch.append(obj);
+ }
+ firstBatch.done(cursorId, cq->ns());
+ } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
+ result->reset();
+
+ auto aggCmdOnView = uassertStatusOK(cq->getQueryRequest().asAggregationCommand());
+
+ auto aggRequestOnView =
+ uassertStatusOK(AggregationRequest::parseFromBSON(ns(), aggCmdOnView));
+
+ auto resolvedAggRequest = ex->asExpandedViewAggregation(aggRequestOnView);
+ auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
+
+ // We pass both the underlying collection namespace and the view namespace here. The
+ // underlying collection namespace is used to execute the aggregation on mongoD. Any
+ // cursor returned will be registered under the view namespace so that subsequent
+ // getMore and killCursors calls against the view have access.
+ ClusterAggregate::Namespaces nsStruct;
+ nsStruct.requestedNss = ns();
+ nsStruct.executionNss = std::move(ex->getNamespace());
+
+ auto bodyBuilder = result->getBodyBuilder();
+ uassertStatusOK(ClusterAggregate::runAggregate(
+ opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &bodyBuilder));
}
- firstBatch.done(cursorId, nss.ns());
- return true;
- } catch (const ExceptionFor<ErrorCodes::CommandOnShardedViewNotSupportedOnMongod>& ex) {
- auto aggCmdOnView = cq.getValue()->getQueryRequest().asAggregationCommand();
- uassertStatusOK(aggCmdOnView.getStatus());
-
- auto aggRequestOnView = AggregationRequest::parseFromBSON(nss, aggCmdOnView.getValue());
- uassertStatusOK(aggRequestOnView.getStatus());
-
- auto resolvedAggRequest = ex->asExpandedViewAggregation(aggRequestOnView.getValue());
- auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
-
- // We pass both the underlying collection namespace and the view namespace here. The
- // underlying collection namespace is used to execute the aggregation on mongoD. Any
- // cursor returned will be registered under the view namespace so that subsequent
- // getMore and killCursors calls against the view have access.
- ClusterAggregate::Namespaces nsStruct;
- nsStruct.requestedNss = std::move(nss);
- nsStruct.executionNss = std::move(ex->getNamespace());
-
- auto status = ClusterAggregate::runAggregate(
- opCtx, nsStruct, resolvedAggRequest, resolvedAggCmd, &result);
- uassertStatusOK(status);
- return true;
}
- }
+
+ private:
+ const OpMsgRequest& _request;
+ const StringData _dbName;
+ };
} cmdFindCluster;