diff options
author | Mathias Stearn <mathias@10gen.com> | 2017-05-17 14:45:50 -0400 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2017-05-17 19:29:36 -0400 |
commit | 4ec8e7a515ceb3e87b64ddde624404d1c4498066 (patch) | |
tree | db55a4472b434f63743062357899cd071330cec0 | |
parent | 83c1cdd93fe082c15b252440b85101dfb7ff3c98 (diff) | |
download | mongo-4ec8e7a515ceb3e87b64ddde624404d1c4498066.tar.gz |
SERVER-29249 Remove mongod-specific run() method from Command
-rw-r--r-- | src/mongo/db/commands.h | 10 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/run_commands.cpp | 252 |
3 files changed, 133 insertions, 146 deletions
diff --git a/src/mongo/db/commands.h b/src/mongo/db/commands.h index d60e47a17ea..d0f4d2bba25 100644 --- a/src/mongo/db/commands.h +++ b/src/mongo/db/commands.h @@ -129,16 +129,6 @@ public: BSONObjBuilder& result) = 0; /** - * Translation point between the new request/response types and the legacy types. - * - * Then we won't need to mutate the command object. At that point we can also make - * this method virtual so commands can override it directly. - */ - bool run(OperationContext* opCtx, - const rpc::RequestInterface& request, - rpc::ReplyBuilderInterface* replyBuilder); - - /** * supportsWriteConcern returns true if this command should be parsed for a writeConcern * field and wait for that write concern to be satisfied after the command runs. * diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 3af7b265c08..d6205e93dd3 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -253,15 +253,14 @@ static WriteResult::SingleResult createIndex(OperationContext* opCtx, BSONObjBuilder cmdBuilder; cmdBuilder << "createIndexes" << ns.coll(); cmdBuilder << "indexes" << BSON_ARRAY(spec); - cmdBuilder << "$db" << systemIndexes.db(); - - OpMsg request; - request.body = cmdBuilder.done(); - rpc::OpMsgRequest cmdRequest(request); - rpc::OpMsgReplyBuilder cmdReplyBuilder; - Command::findCommand("createIndexes")->run(opCtx, cmdRequest, &cmdReplyBuilder); - auto cmdReplyMsg = cmdReplyBuilder.done(); - auto cmdResult = OpMsg::parse(cmdReplyMsg).body; + auto cmd = cmdBuilder.obj(); + + BSONObjBuilder cmdReplyBuilder; + std::string errMsg; + bool ok = Command::findCommand("createIndexes") + ->run(opCtx, systemIndexes.db().toString(), cmd, errMsg, cmdReplyBuilder); + Command::appendCommandStatus(cmdReplyBuilder, ok, errMsg); + auto cmdResult = cmdReplyBuilder.obj(); uassertStatusOK(getStatusFromCommandResult(cmdResult)); // Unlike normal inserts, it is not an error to "insert" a duplicate index. diff --git a/src/mongo/db/run_commands.cpp b/src/mongo/db/run_commands.cpp index 2fde2dbb661..6e2f0b26388 100644 --- a/src/mongo/db/run_commands.cpp +++ b/src/mongo/db/run_commands.cpp @@ -341,6 +341,130 @@ LogicalTime computeOperationTime(OperationContext* opCtx, return operationTime; } +bool runCommandImpl(OperationContext* opCtx, + Command* command, + const rpc::RequestInterface& request, + rpc::ReplyBuilderInterface* replyBuilder) { + auto bytesToReserve = command->reserveBytesForReply(); + +// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the +// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency +// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds. +#ifdef _WIN32 + if (kDebugBuild) + bytesToReserve = 0; +#endif + + // run expects non-const bsonobj + BSONObj cmd = request.getCommandArgs(); + + // run expects const db std::string (can't bind to temporary) + const std::string db = request.getDatabase().toString(); + + BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve); + auto readConcernArgsStatus = _extractReadConcern(cmd, command->supportsReadConcern()); + + if (!readConcernArgsStatus.isOK()) { + auto result = + Command::appendCommandStatus(inPlaceReplyBob, readConcernArgsStatus.getStatus()); + inPlaceReplyBob.doneFast(); + replyBuilder->setMetadata(rpc::makeEmptyMetadata()); + return result; + } + + Status rcStatus = waitForReadConcern(opCtx, readConcernArgsStatus.getValue()); + if (!rcStatus.isOK()) { + if (rcStatus == ErrorCodes::ExceededTimeLimit) { + const int debugLevel = + serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2; + LOG(debugLevel) << "Command on database " << db + << " timed out waiting for read concern to be satisfied. Command: " + << redact(command->getRedactedCopyForLogging(request.getCommandArgs())); + } + + auto result = Command::appendCommandStatus(inPlaceReplyBob, rcStatus); + inPlaceReplyBob.doneFast(); + replyBuilder->setMetadata(rpc::makeEmptyMetadata()); + return result; + } + + std::string errmsg; + bool result; + auto startOperationTime = getClientOperationTime(opCtx); + if (!command->supportsWriteConcern(cmd)) { + if (commandSpecifiesWriteConcern(cmd)) { + auto result = Command::appendCommandStatus( + inPlaceReplyBob, + {ErrorCodes::InvalidOptions, "Command does not support writeConcern"}); + inPlaceReplyBob.doneFast(); + replyBuilder->setMetadata(rpc::makeEmptyMetadata()); + return result; + } + + // TODO: remove queryOptions parameter from command's run method. + result = command->run(opCtx, db, cmd, errmsg, inPlaceReplyBob); + } else { + auto wcResult = extractWriteConcern(opCtx, cmd, db); + if (!wcResult.isOK()) { + auto result = Command::appendCommandStatus(inPlaceReplyBob, wcResult.getStatus()); + inPlaceReplyBob.doneFast(); + replyBuilder->setMetadata(rpc::makeEmptyMetadata()); + return result; + } + + // Change the write concern while running the command. + const auto oldWC = opCtx->getWriteConcern(); + ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); + opCtx->setWriteConcern(wcResult.getValue()); + ON_BLOCK_EXIT([&] { + _waitForWriteConcernAndAddToCommandResponse( + opCtx, command->getName(), &inPlaceReplyBob); + }); + + result = command->run(opCtx, db, cmd, errmsg, inPlaceReplyBob); + + // Nothing in run() should change the writeConcern. + dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() == + wcResult.getValue().toBSON())); + } + + // When a linearizable read command is passed in, check to make sure we're reading + // from the primary. + if (command->supportsReadConcern() && (readConcernArgsStatus.getValue().getLevel() == + repl::ReadConcernLevel::kLinearizableReadConcern) && + (request.getCommandName() != "getMore")) { + + auto linearizableReadStatus = waitForLinearizableReadConcern(opCtx); + + if (!linearizableReadStatus.isOK()) { + inPlaceReplyBob.resetToEmpty(); + auto result = Command::appendCommandStatus(inPlaceReplyBob, linearizableReadStatus); + inPlaceReplyBob.doneFast(); + replyBuilder->setMetadata(rpc::makeEmptyMetadata()); + return result; + } + } + + Command::appendCommandStatus(inPlaceReplyBob, result, errmsg); + + auto operationTime = computeOperationTime( + opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel()); + + // An uninitialized operation time means the cluster time is not propagated, so the operation + // time should not be attached to the response. + if (operationTime != LogicalTime::kUninitialized) { + Command::appendOperationTime(inPlaceReplyBob, operationTime); + } + + inPlaceReplyBob.doneFast(); + + BSONObjBuilder metadataBob; + appendReplyMetadata(opCtx, request, &metadataBob); + replyBuilder->setMetadata(metadataBob.done()); + + return result; +} + /** * Executes a command after stripping metadata, performing authorization checks, * handling audit impersonation, and (potentially) setting maintenance mode. This method @@ -507,7 +631,7 @@ void execCommandDatabase(OperationContext* opCtx, << rpc::TrackingMetadata::get(opCtx).toString(); rpc::TrackingMetadata::get(opCtx).setIsLogged(true); } - retval = command->run(opCtx, request, replyBuilder); + retval = runCommandImpl(opCtx, command, request, replyBuilder); if (!retval) { command->incrementCommandsFailed(); @@ -547,134 +671,8 @@ void execCommandDatabase(OperationContext* opCtx, } } } - } // namespace -// This really belongs in commands.cpp, but we need to move it here so we can -// use shardingState and the repl coordinator without changing our entire library -// structure. -// It will be moved back as part of SERVER-18236. -bool Command::run(OperationContext* opCtx, - const rpc::RequestInterface& request, - rpc::ReplyBuilderInterface* replyBuilder) { - auto bytesToReserve = reserveBytesForReply(); - -// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the -// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency -// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds. -#ifdef _WIN32 - if (kDebugBuild) - bytesToReserve = 0; -#endif - - // run expects non-const bsonobj - BSONObj cmd = request.getCommandArgs(); - - // run expects const db std::string (can't bind to temporary) - const std::string db = request.getDatabase().toString(); - - BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve); - auto readConcernArgsStatus = _extractReadConcern(cmd, supportsReadConcern()); - - if (!readConcernArgsStatus.isOK()) { - auto result = appendCommandStatus(inPlaceReplyBob, readConcernArgsStatus.getStatus()); - inPlaceReplyBob.doneFast(); - replyBuilder->setMetadata(rpc::makeEmptyMetadata()); - return result; - } - - Status rcStatus = waitForReadConcern(opCtx, readConcernArgsStatus.getValue()); - if (!rcStatus.isOK()) { - if (rcStatus == ErrorCodes::ExceededTimeLimit) { - const int debugLevel = - serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2; - LOG(debugLevel) << "Command on database " << db - << " timed out waiting for read concern to be satisfied. Command: " - << redact(getRedactedCopyForLogging(request.getCommandArgs())); - } - - auto result = appendCommandStatus(inPlaceReplyBob, rcStatus); - inPlaceReplyBob.doneFast(); - replyBuilder->setMetadata(rpc::makeEmptyMetadata()); - return result; - } - - std::string errmsg; - bool result; - auto startOperationTime = getClientOperationTime(opCtx); - if (!supportsWriteConcern(cmd)) { - if (commandSpecifiesWriteConcern(cmd)) { - auto result = appendCommandStatus( - inPlaceReplyBob, - {ErrorCodes::InvalidOptions, "Command does not support writeConcern"}); - inPlaceReplyBob.doneFast(); - replyBuilder->setMetadata(rpc::makeEmptyMetadata()); - return result; - } - - // TODO: remove queryOptions parameter from command's run method. - result = run(opCtx, db, cmd, errmsg, inPlaceReplyBob); - } else { - auto wcResult = extractWriteConcern(opCtx, cmd, db); - if (!wcResult.isOK()) { - auto result = appendCommandStatus(inPlaceReplyBob, wcResult.getStatus()); - inPlaceReplyBob.doneFast(); - replyBuilder->setMetadata(rpc::makeEmptyMetadata()); - return result; - } - - // Change the write concern while running the command. - const auto oldWC = opCtx->getWriteConcern(); - ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); - opCtx->setWriteConcern(wcResult.getValue()); - ON_BLOCK_EXIT([&] { - _waitForWriteConcernAndAddToCommandResponse(opCtx, getName(), &inPlaceReplyBob); - }); - - result = run(opCtx, db, cmd, errmsg, inPlaceReplyBob); - - // Nothing in run() should change the writeConcern. - dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() == - wcResult.getValue().toBSON())); - } - - // When a linearizable read command is passed in, check to make sure we're reading - // from the primary. - if (supportsReadConcern() && (readConcernArgsStatus.getValue().getLevel() == - repl::ReadConcernLevel::kLinearizableReadConcern) && - (request.getCommandName() != "getMore")) { - - auto linearizableReadStatus = waitForLinearizableReadConcern(opCtx); - - if (!linearizableReadStatus.isOK()) { - inPlaceReplyBob.resetToEmpty(); - auto result = appendCommandStatus(inPlaceReplyBob, linearizableReadStatus); - inPlaceReplyBob.doneFast(); - replyBuilder->setMetadata(rpc::makeEmptyMetadata()); - return result; - } - } - - appendCommandStatus(inPlaceReplyBob, result, errmsg); - - auto operationTime = computeOperationTime( - opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel()); - - // An uninitialized operation time means the cluster time is not propagated, so the operation - // time should not be attached to the response. - if (operationTime != LogicalTime::kUninitialized) { - appendOperationTime(inPlaceReplyBob, operationTime); - } - - inPlaceReplyBob.doneFast(); - - BSONObjBuilder metadataBob; - appendReplyMetadata(opCtx, request, &metadataBob); - replyBuilder->setMetadata(metadataBob.done()); - - return result; -} - void generateErrorResponse(OperationContext* opCtx, rpc::ReplyBuilderInterface* replyBuilder, const DBException& exception) { |