summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Morrow <acm@mongodb.com>2017-06-11 14:22:04 -0400
committerAndrew Morrow <acm@mongodb.com>2017-06-15 15:54:04 -0400
commit3dc746d9f61f8b9bb18c44d64f5e425532ac4304 (patch)
tree67717dc6fd1fc2b486491b34038f6644a071c0fb
parentf48f1ca3506de6f376e1b6a6172201bcec5dd04b (diff)
downloadmongo-3dc746d9f61f8b9bb18c44d64f5e425532ac4304.tar.gz
SERVER-29552 Fold runCommands into ServiceEntryPointMongod
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/run_commands.cpp704
-rw-r--r--src/mongo/db/run_commands.h38
-rw-r--r--src/mongo/db/service_entry_point_mongod.cpp659
4 files changed, 658 insertions, 744 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index a7a6f5aa57e..504e4172eb8 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -566,7 +566,6 @@ env.Library(
env.Library(
target="service_context_d",
source=[
- "run_commands.cpp",
"service_context_d.cpp",
"service_entry_point_mongod.cpp",
],
diff --git a/src/mongo/db/run_commands.cpp b/src/mongo/db/run_commands.cpp
deleted file mode 100644
index f1f8b652ab6..00000000000
--- a/src/mongo/db/run_commands.cpp
+++ /dev/null
@@ -1,704 +0,0 @@
-/* Copyright 2016 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/run_commands.h"
-
-#include "mongo/db/auth/impersonation_session.h"
-#include "mongo/db/client.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/curop.h"
-#include "mongo/db/lasterror.h"
-#include "mongo/db/logical_clock.h"
-#include "mongo/db/logical_time_validator.h"
-#include "mongo/db/read_concern.h"
-#include "mongo/db/repl/repl_client_info.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/stats/counters.h"
-#include "mongo/rpc/factory.h"
-#include "mongo/rpc/metadata.h"
-#include "mongo/rpc/metadata/config_server_metadata.h"
-#include "mongo/rpc/metadata/logical_time_metadata.h"
-#include "mongo/rpc/metadata/oplog_query_metadata.h"
-#include "mongo/rpc/metadata/repl_set_metadata.h"
-#include "mongo/rpc/metadata/sharding_metadata.h"
-#include "mongo/rpc/metadata/tracking_metadata.h"
-#include "mongo/rpc/reply_builder_interface.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/stale_exception.h"
-#include "mongo/util/log.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-
-namespace {
-
-void registerError(OperationContext* opCtx, const DBException& exception) {
- CurOp::get(opCtx)->debug().exceptionInfo = exception.getInfo();
-}
-
-void _generateErrorResponse(OperationContext* opCtx,
- rpc::ReplyBuilderInterface* replyBuilder,
- const DBException& exception,
- const BSONObj& replyMetadata) {
- registerError(opCtx, exception);
-
- // We could have thrown an exception after setting fields in the builder,
- // so we need to reset it to a clean state just to be sure.
- replyBuilder->reset();
-
- // We need to include some extra information for SendStaleConfig.
- if (exception.getCode() == ErrorCodes::SendStaleConfig) {
- const SendStaleConfigException& scex =
- static_cast<const SendStaleConfigException&>(exception);
- replyBuilder->setCommandReply(scex.toStatus(),
- BSON("ns" << scex.getns() << "vReceived"
- << BSONArray(scex.getVersionReceived().toBSON())
- << "vWanted"
- << BSONArray(scex.getVersionWanted().toBSON())));
- } else {
- replyBuilder->setCommandReply(exception.toStatus());
- }
-
- replyBuilder->setMetadata(replyMetadata);
-}
-
-void _generateErrorResponse(OperationContext* opCtx,
- rpc::ReplyBuilderInterface* replyBuilder,
- const DBException& exception,
- const BSONObj& replyMetadata,
- LogicalTime operationTime) {
- registerError(opCtx, exception);
-
- // We could have thrown an exception after setting fields in the builder,
- // so we need to reset it to a clean state just to be sure.
- replyBuilder->reset();
-
- // We need to include some extra information for SendStaleConfig.
- if (exception.getCode() == ErrorCodes::SendStaleConfig) {
- const SendStaleConfigException& scex =
- static_cast<const SendStaleConfigException&>(exception);
- replyBuilder->setCommandReply(scex.toStatus(),
- BSON("ns" << scex.getns() << "vReceived"
- << BSONArray(scex.getVersionReceived().toBSON())
- << "vWanted"
- << BSONArray(scex.getVersionWanted().toBSON())
- << "operationTime"
- << operationTime.asTimestamp()));
- } else {
- replyBuilder->setCommandReply(exception.toStatus(),
- BSON("operationTime" << operationTime.asTimestamp()));
- }
-
- replyBuilder->setMetadata(replyMetadata);
-}
-
-/**
- * Guard object for making a good-faith effort to enter maintenance mode and leave it when it
- * goes out of scope.
- *
- * Sometimes we cannot set maintenance mode, in which case the call to setMaintenanceMode will
- * return a non-OK status. This class does not treat that case as an error which means that
- * anybody using it is assuming it is ok to continue execution without maintenance mode.
- *
- * TODO: This assumption needs to be audited and documented, or this behavior should be moved
- * elsewhere.
- */
-class MaintenanceModeSetter {
-public:
- MaintenanceModeSetter()
- : maintenanceModeSet(
- repl::getGlobalReplicationCoordinator()->setMaintenanceMode(true).isOK()) {}
- ~MaintenanceModeSetter() {
- if (maintenanceModeSet)
- repl::getGlobalReplicationCoordinator()->setMaintenanceMode(false);
- }
-
-private:
- bool maintenanceModeSet;
-};
-
-void appendReplyMetadata(OperationContext* opCtx,
- const OpMsgRequest& request,
- BSONObjBuilder* metadataBob) {
- const bool isShardingAware = ShardingState::get(opCtx)->enabled();
- const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer;
- repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
- const bool isReplSet =
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
-
- if (isReplSet) {
- // Attach our own last opTime.
- repl::OpTime lastOpTimeFromClient =
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- replCoord->prepareReplMetadata(opCtx, request.body, lastOpTimeFromClient, metadataBob);
- // For commands from mongos, append some info to help getLastError(w) work.
- // TODO: refactor out of here as part of SERVER-18236
- if (isShardingAware || isConfig) {
- rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId())
- .writeToMetadata(metadataBob);
- if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) {
- // No need to sign logical times for internal clients.
- SignedLogicalTime currentTime(
- LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0);
- rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime);
- logicalTimeMetadata.writeToMetadata(metadataBob);
- } else if (auto validator = LogicalTimeValidator::get(opCtx)) {
- auto currentTime =
- validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime());
- rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime);
- logicalTimeMetadata.writeToMetadata(metadataBob);
- }
- }
- }
-
- // If we're a shard other than the config shard, attach the last configOpTime we know about.
- if (isShardingAware && !isConfig) {
- auto opTime = grid.configOpTime();
- rpc::ConfigServerMetadata(opTime).writeToMetadata(metadataBob);
- }
-}
-
-/**
- * Given the specified command and whether it supports read concern, returns an effective read
- * concern which should be used.
- */
-StatusWith<repl::ReadConcernArgs> _extractReadConcern(const BSONObj& cmdObj,
- bool supportsNonLocalReadConcern) {
- repl::ReadConcernArgs readConcernArgs;
-
- auto readConcernParseStatus = readConcernArgs.initialize(cmdObj);
- if (!readConcernParseStatus.isOK()) {
- return readConcernParseStatus;
- }
-
- if (!supportsNonLocalReadConcern &&
- readConcernArgs.getLevel() != repl::ReadConcernLevel::kLocalReadConcern) {
- return {ErrorCodes::InvalidOptions,
- str::stream() << "Command does not support non local read concern"};
- }
-
- return readConcernArgs;
-}
-
-void _waitForWriteConcernAndAddToCommandResponse(OperationContext* opCtx,
- const std::string& commandName,
- BSONObjBuilder* commandResponseBuilder) {
- WriteConcernResult res;
- auto waitForWCStatus =
- waitForWriteConcern(opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- opCtx->getWriteConcern(),
- &res);
- Command::appendCommandWCStatus(*commandResponseBuilder, waitForWCStatus, res);
-
- // SERVER-22421: This code is to ensure error response backwards compatibility with the
- // user management commands. This can be removed in 3.6.
- if (!waitForWCStatus.isOK() && Command::isUserManagementCommand(commandName)) {
- BSONObj temp = commandResponseBuilder->asTempObj().copy();
- commandResponseBuilder->resetToEmpty();
- Command::appendCommandStatus(*commandResponseBuilder, waitForWCStatus);
- commandResponseBuilder->appendElementsUnique(temp);
- }
-}
-
-/**
- * For replica set members it returns the last known op time from opCtx. Otherwise will return
- * uninitialized logical time.
- */
-LogicalTime getClientOperationTime(OperationContext* opCtx) {
- repl::ReplicationCoordinator* replCoord =
- repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
- const bool isReplSet =
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
- LogicalTime operationTime;
- if (isReplSet) {
- operationTime = LogicalTime(
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp());
- }
- return operationTime;
-}
-
-/**
- * Returns the proper operationTime for a command. To construct the operationTime for replica set
- * members, it uses the last optime in the oplog for writes, last committed optime for majority
- * reads, and the last applied optime for every other read. An uninitialized logical time is
- * returned for non replica set members.
- */
-LogicalTime computeOperationTime(OperationContext* opCtx,
- LogicalTime startOperationTime,
- repl::ReadConcernLevel level) {
- repl::ReplicationCoordinator* replCoord =
- repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
- const bool isReplSet =
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
-
- if (!isReplSet) {
- return LogicalTime();
- }
-
- auto operationTime = getClientOperationTime(opCtx);
- invariant(operationTime >= startOperationTime);
-
- // If the last operationTime has not changed, consider this command a read, and, for replica set
- // members, construct the operationTime with the proper optime for its read concern level.
- if (operationTime == startOperationTime) {
- if (level == repl::ReadConcernLevel::kMajorityReadConcern) {
- operationTime = LogicalTime(replCoord->getLastCommittedOpTime().getTimestamp());
- } else {
- operationTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp());
- }
- }
-
- return operationTime;
-}
-
-bool runCommandImpl(OperationContext* opCtx,
- Command* command,
- const OpMsgRequest& request,
- rpc::ReplyBuilderInterface* replyBuilder,
- LogicalTime startOperationTime) {
- auto bytesToReserve = command->reserveBytesForReply();
-
-// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the
-// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency
-// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds.
-#ifdef _WIN32
- if (kDebugBuild)
- bytesToReserve = 0;
-#endif
-
- // run expects non-const bsonobj
- BSONObj cmd = request.body;
-
- // run expects const db std::string (can't bind to temporary)
- const std::string db = request.getDatabase().toString();
-
- BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve);
- auto readConcernArgsStatus = _extractReadConcern(cmd, command->supportsReadConcern(db, cmd));
-
- if (!readConcernArgsStatus.isOK()) {
- auto result =
- Command::appendCommandStatus(inPlaceReplyBob, readConcernArgsStatus.getStatus());
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
-
- Status rcStatus = waitForReadConcern(opCtx, readConcernArgsStatus.getValue());
- if (!rcStatus.isOK()) {
- if (rcStatus == ErrorCodes::ExceededTimeLimit) {
- const int debugLevel =
- serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2;
- LOG(debugLevel) << "Command on database " << db
- << " timed out waiting for read concern to be satisfied. Command: "
- << redact(command->getRedactedCopyForLogging(request.body));
- }
-
- auto result = Command::appendCommandStatus(inPlaceReplyBob, rcStatus);
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
-
- std::string errmsg;
- bool result;
- if (!command->supportsWriteConcern(cmd)) {
- if (commandSpecifiesWriteConcern(cmd)) {
- auto result = Command::appendCommandStatus(
- inPlaceReplyBob,
- {ErrorCodes::InvalidOptions, "Command does not support writeConcern"});
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
-
- result = command->enhancedRun(opCtx, request, errmsg, inPlaceReplyBob);
- } else {
- auto wcResult = extractWriteConcern(opCtx, cmd, db);
- if (!wcResult.isOK()) {
- auto result = Command::appendCommandStatus(inPlaceReplyBob, wcResult.getStatus());
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
-
- // Change the write concern while running the command.
- const auto oldWC = opCtx->getWriteConcern();
- ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
- opCtx->setWriteConcern(wcResult.getValue());
- ON_BLOCK_EXIT([&] {
- _waitForWriteConcernAndAddToCommandResponse(
- opCtx, command->getName(), &inPlaceReplyBob);
- });
-
- result = command->enhancedRun(opCtx, request, errmsg, inPlaceReplyBob);
-
- // Nothing in run() should change the writeConcern.
- dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() ==
- wcResult.getValue().toBSON()));
- }
-
- // When a linearizable read command is passed in, check to make sure we're reading
- // from the primary.
- if (command->supportsReadConcern(db, cmd) &&
- (readConcernArgsStatus.getValue().getLevel() ==
- repl::ReadConcernLevel::kLinearizableReadConcern) &&
- (request.getCommandName() != "getMore")) {
-
- auto linearizableReadStatus = waitForLinearizableReadConcern(opCtx);
-
- if (!linearizableReadStatus.isOK()) {
- inPlaceReplyBob.resetToEmpty();
- auto result = Command::appendCommandStatus(inPlaceReplyBob, linearizableReadStatus);
- inPlaceReplyBob.doneFast();
- replyBuilder->setMetadata(rpc::makeEmptyMetadata());
- return result;
- }
- }
-
- Command::appendCommandStatus(inPlaceReplyBob, result, errmsg);
-
- auto operationTime = computeOperationTime(
- opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel());
-
- // An uninitialized operation time means the cluster time is not propagated, so the operation
- // time should not be attached to the response.
- if (operationTime != LogicalTime::kUninitialized) {
- Command::appendOperationTime(inPlaceReplyBob, operationTime);
- }
-
- inPlaceReplyBob.doneFast();
-
- BSONObjBuilder metadataBob;
- appendReplyMetadata(opCtx, request, &metadataBob);
- replyBuilder->setMetadata(metadataBob.done());
-
- return result;
-}
-
-/**
- * Executes a command after stripping metadata, performing authorization checks,
- * handling audit impersonation, and (potentially) setting maintenance mode. This method
- * also checks that the command is permissible to run on the node given its current
- * replication state. All the logic here is independent of any particular command; any
- * functionality relevant to a specific command should be confined to its run() method.
- */
-void execCommandDatabase(OperationContext* opCtx,
- Command* command,
- const OpMsgRequest& request,
- rpc::ReplyBuilderInterface* replyBuilder) {
-
- auto startOperationTime = getClientOperationTime(opCtx);
- try {
- {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- CurOp::get(opCtx)->setCommand_inlock(command);
- }
-
- // TODO: move this back to runCommands when mongos supports OperationContext
- // see SERVER-18515 for details.
- rpc::readRequestMetadata(opCtx, request.body);
- rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName());
-
- std::string dbname = request.getDatabase().toString();
- uassert(
- ErrorCodes::InvalidNamespace,
- str::stream() << "Invalid database name: '" << dbname << "'",
- NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
-
- std::unique_ptr<MaintenanceModeSetter> mmSetter;
-
- BSONElement cmdOptionMaxTimeMSField;
- BSONElement helpField;
- BSONElement shardVersionFieldIdx;
- BSONElement queryOptionMaxTimeMSField;
-
- StringMap<int> topLevelFields;
- for (auto&& element : request.body) {
- StringData fieldName = element.fieldNameStringData();
- if (fieldName == QueryRequest::cmdOptionMaxTimeMS) {
- cmdOptionMaxTimeMSField = element;
- } else if (fieldName == Command::kHelpFieldName) {
- helpField = element;
- } else if (fieldName == ChunkVersion::kShardVersionField) {
- shardVersionFieldIdx = element;
- } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) {
- queryOptionMaxTimeMSField = element;
- }
-
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Parsed command object contains duplicate top level key: "
- << fieldName,
- topLevelFields[fieldName]++ == 0);
- }
-
- if (Command::isHelpRequest(helpField)) {
- CurOp::get(opCtx)->ensureStarted();
- // We disable last-error for help requests due to SERVER-11492, because config servers
- // use help requests to determine which commands are database writes, and so must be
- // forwarded to all config servers.
- LastError::get(opCtx->getClient()).disable();
- Command::generateHelpResponse(opCtx, replyBuilder, *command);
- return;
- }
-
- ImpersonationSessionGuard guard(opCtx);
- uassertStatusOK(Command::checkAuthorization(command, opCtx, dbname, request.body));
-
- repl::ReplicationCoordinator* replCoord =
- repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
- const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname);
-
- {
- bool commandCanRunOnSecondary = command->slaveOk();
-
- bool commandIsOverriddenToRunOnSecondary =
- command->slaveOverrideOk() && ReadPreferenceSetting::get(opCtx).canRunOnSecondary();
-
- bool iAmStandalone = !opCtx->writesAreReplicated();
- bool canRunHere = iAmPrimary || commandCanRunOnSecondary ||
- commandIsOverriddenToRunOnSecondary || iAmStandalone;
-
- // This logic is clearer if we don't have to invert it.
- if (!canRunHere && command->slaveOverrideOk()) {
- uasserted(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false");
- }
-
- uassert(ErrorCodes::NotMaster, "not master", canRunHere);
-
- if (!command->maintenanceOk() &&
- replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
- !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) &&
- !replCoord->getMemberState().secondary()) {
-
- uassert(ErrorCodes::NotMasterOrSecondary,
- "node is recovering",
- !replCoord->getMemberState().recovering());
- uassert(ErrorCodes::NotMasterOrSecondary,
- "node is not in primary or recovering state",
- replCoord->getMemberState().primary());
- // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode
- uassert(ErrorCodes::NotMasterOrSecondary,
- "node is in drain mode",
- commandIsOverriddenToRunOnSecondary || commandCanRunOnSecondary);
- }
- }
-
- if (command->adminOnly()) {
- LOG(2) << "command: " << request.getCommandName();
- }
-
- if (command->maintenanceMode()) {
- mmSetter.reset(new MaintenanceModeSetter);
- }
-
- if (command->shouldAffectCommandCounter()) {
- OpCounters* opCounters = &globalOpCounters;
- opCounters->gotCommand();
- }
-
- // Handle command option maxTimeMS.
- int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField));
-
- uassert(ErrorCodes::InvalidOptions,
- "no such command option $maxTimeMs; use maxTimeMS instead",
- queryOptionMaxTimeMSField.eoo());
-
- if (maxTimeMS > 0) {
- uassert(40119,
- "Illegal attempt to set operation deadline within DBDirectClient",
- !opCtx->getClient()->isInDirectClient());
- opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS});
- }
-
- // Operations are only versioned against the primary. We also make sure not to redo shard
- // version handling if this command was issued via the direct client.
- if (iAmPrimary && !opCtx->getClient()->isInDirectClient()) {
- // Handle a shard version that may have been sent along with the command.
- auto commandNS = NamespaceString(command->parseNs(dbname, request.body));
- auto& oss = OperationShardingState::get(opCtx);
- oss.initializeShardVersion(commandNS, shardVersionFieldIdx);
- auto shardingState = ShardingState::get(opCtx);
- if (oss.hasShardVersion()) {
- uassertStatusOK(shardingState->canAcceptShardedCommands());
- }
-
- // Handle config optime information that may have been sent along with the command.
- uassertStatusOK(shardingState->updateConfigServerOpTimeFromMetadata(opCtx));
- }
-
- // Can throw
- opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
-
- bool retval = false;
-
- CurOp::get(opCtx)->ensureStarted();
-
- command->incrementCommandsExecuted();
-
- if (logger::globalLogDomain()->shouldLog(logger::LogComponent::kTracking,
- logger::LogSeverity::Debug(1)) &&
- rpc::TrackingMetadata::get(opCtx).getParentOperId()) {
- MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking)
- << rpc::TrackingMetadata::get(opCtx).toString();
- rpc::TrackingMetadata::get(opCtx).setIsLogged(true);
- }
- retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);
-
- if (!retval) {
- command->incrementCommandsFailed();
- }
- } catch (const DBException& e) {
- // If we got a stale config, wait in case the operation is stuck in a critical section
- if (e.getCode() == ErrorCodes::SendStaleConfig) {
- auto sce = dynamic_cast<const StaleConfigException*>(&e);
- invariant(sce); // do not upcasts from DBException created by uassert variants.
-
- if (!opCtx->getClient()->isInDirectClient()) {
- ShardingState::get(opCtx)->onStaleShardVersion(
- opCtx, NamespaceString(sce->getns()), sce->getVersionReceived());
- }
- }
-
- BSONObjBuilder metadataBob;
- appendReplyMetadata(opCtx, request, &metadataBob);
-
- const std::string db = request.getDatabase().toString();
- auto readConcernArgsStatus =
- _extractReadConcern(request.body, command->supportsReadConcern(db, request.body));
- auto operationTime = readConcernArgsStatus.isOK()
- ? computeOperationTime(
- opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel())
- : LogicalClock::get(opCtx)->getClusterTime();
-
- // An uninitialized operation time means the cluster time is not propagated, so the
- // operation time should not be attached to the error response.
- if (operationTime != LogicalTime::kUninitialized) {
- LOG(1) << "assertion while executing command '" << request.getCommandName() << "' "
- << "on database '" << request.getDatabase() << "' "
- << "with arguments '" << command->getRedactedCopyForLogging(request.body)
- << "' and operationTime '" << operationTime.toString() << "': " << e.toString();
-
- _generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), operationTime);
- } else {
- LOG(1) << "assertion while executing command '" << request.getCommandName() << "' "
- << "on database '" << request.getDatabase() << "' "
- << "with arguments '" << command->getRedactedCopyForLogging(request.body)
- << "': " << e.toString();
-
- _generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj());
- }
- }
-}
-
-/**
- * Fills out CurOp / OpDebug with basic command info.
- */
-void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) {
- auto curop = CurOp::get(opCtx);
- curop->debug().iscommand = true;
-
- // We construct a legacy $cmd namespace so we can fill in curOp using
- // the existing logic that existed for OP_QUERY commands
- NamespaceString nss(request.getDatabase(), "$cmd");
-
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- curop->setOpDescription_inlock(request.body);
- curop->markCommand_inlock();
- curop->setNS_inlock(nss.ns());
-}
-} // namespace
-
-DbResponse runCommands(OperationContext* opCtx, const Message& message) {
- auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));
-
- // TODO SERVER-28964 If this parsing the request fails we reply to an invalid request which
- // isn't always safe. Unfortunately tests currently rely on this. Figure out what to do
- // (probably throw a special exception type like ConnectionFatalMessageParseError).
- bool canReply = true;
- auto curOp = CurOp::get(opCtx);
- boost::optional<OpMsgRequest> request;
- try {
- request.emplace(rpc::opMsgRequestFromAnyProtocol(message)); // Request is validated here.
- canReply = !request->isFlagSet(OpMsg::kMoreToCome);
-
- curOpCommandSetup(opCtx, *request);
-
- Command* c = nullptr;
- // In the absence of a Command object, no redaction is possible. Therefore
- // to avoid displaying potentially sensitive information in the logs,
- // we restrict the log message to the name of the unrecognized command.
- // However, the complete command object will still be echoed to the client.
- if (!(c = Command::findCommand(request->getCommandName()))) {
- Command::unknownCommands.increment();
- std::string msg = str::stream() << "no such command: '" << request->getCommandName()
- << "'";
- LOG(2) << msg;
- uasserted(ErrorCodes::CommandNotFound,
- str::stream() << msg << ", bad cmd: '" << redact(request->body) << "'");
- }
-
- LOG(2) << "run command " << request->getDatabase() << ".$cmd" << ' '
- << c->getRedactedCopyForLogging(request->body);
-
- {
- // Try to set this as early as possible, as soon as we have figured out the command.
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- curOp->setLogicalOp_inlock(c->getLogicalOp());
- }
-
- execCommandDatabase(opCtx, c, *request, replyBuilder.get());
- } catch (const DBException& ex) {
- if (request) {
- LOG(1) << "assertion while executing command '" << request->getCommandName() << "' "
- << "on database '" << request->getDatabase() << "': " << ex.toString();
- } else {
- // We failed during parsing so we can't log anything about the command.
- LOG(1) << "assertion while executing command: " << ex.toString();
- }
-
- _generateErrorResponse(opCtx, replyBuilder.get(), ex, rpc::makeEmptyMetadata());
- }
-
- if (!canReply) {
- return {};
- }
-
- auto response = replyBuilder->done();
- curOp->debug().responseLength = response.header().dataLen();
-
- // TODO exhaust
- return DbResponse{std::move(response)};
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/run_commands.h b/src/mongo/db/run_commands.h
deleted file mode 100644
index 187958269ee..00000000000
--- a/src/mongo/db/run_commands.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/* Copyright 2016 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/dbmessage.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/util/net/message.h"
-
-namespace mongo {
-
-DbResponse runCommands(OperationContext* opCtx, const Message& message);
-
-} // namespace mongo
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index 48df1288986..76234bc3b4b 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -34,6 +34,9 @@
#include "mongo/db/audit.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/auth/impersonation_session.h"
+#include "mongo/db/client.h"
+#include "mongo/db/commands.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/curop.h"
#include "mongo/db/curop_metrics.h"
@@ -43,19 +46,35 @@
#include "mongo/db/introspect.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/lasterror.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/db/logical_time_validator.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/query/find.h"
-#include "mongo/db/run_commands.h"
+#include "mongo/db/read_concern.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/top.h"
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/metadata.h"
+#include "mongo/rpc/metadata/config_server_metadata.h"
+#include "mongo/rpc/metadata/logical_time_metadata.h"
+#include "mongo/rpc/metadata/oplog_query_metadata.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/rpc/metadata/sharding_metadata.h"
+#include "mongo/rpc/metadata/tracking_metadata.h"
+#include "mongo/rpc/reply_builder_interface.h"
+#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/net/message.h"
#include "mongo/util/net/op_msg.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -128,6 +147,644 @@ void generateLegacyQueryErrorResponse(const AssertionException* exception,
response->setData(bb.release());
}
+void registerError(OperationContext* opCtx, const DBException& exception) {
+ CurOp::get(opCtx)->debug().exceptionInfo = exception.getInfo();
+}
+
+void _generateErrorResponse(OperationContext* opCtx,
+ rpc::ReplyBuilderInterface* replyBuilder,
+ const DBException& exception,
+ const BSONObj& replyMetadata) {
+ registerError(opCtx, exception);
+
+ // We could have thrown an exception after setting fields in the builder,
+ // so we need to reset it to a clean state just to be sure.
+ replyBuilder->reset();
+
+ // We need to include some extra information for SendStaleConfig.
+ if (exception.getCode() == ErrorCodes::SendStaleConfig) {
+ const SendStaleConfigException& scex =
+ static_cast<const SendStaleConfigException&>(exception);
+ replyBuilder->setCommandReply(scex.toStatus(),
+ BSON("ns" << scex.getns() << "vReceived"
+ << BSONArray(scex.getVersionReceived().toBSON())
+ << "vWanted"
+ << BSONArray(scex.getVersionWanted().toBSON())));
+ } else {
+ replyBuilder->setCommandReply(exception.toStatus());
+ }
+
+ replyBuilder->setMetadata(replyMetadata);
+}
+
+void _generateErrorResponse(OperationContext* opCtx,
+ rpc::ReplyBuilderInterface* replyBuilder,
+ const DBException& exception,
+ const BSONObj& replyMetadata,
+ LogicalTime operationTime) {
+ registerError(opCtx, exception);
+
+ // We could have thrown an exception after setting fields in the builder,
+ // so we need to reset it to a clean state just to be sure.
+ replyBuilder->reset();
+
+ // We need to include some extra information for SendStaleConfig.
+ if (exception.getCode() == ErrorCodes::SendStaleConfig) {
+ const SendStaleConfigException& scex =
+ static_cast<const SendStaleConfigException&>(exception);
+ replyBuilder->setCommandReply(scex.toStatus(),
+ BSON("ns" << scex.getns() << "vReceived"
+ << BSONArray(scex.getVersionReceived().toBSON())
+ << "vWanted"
+ << BSONArray(scex.getVersionWanted().toBSON())
+ << "operationTime"
+ << operationTime.asTimestamp()));
+ } else {
+ replyBuilder->setCommandReply(exception.toStatus(),
+ BSON("operationTime" << operationTime.asTimestamp()));
+ }
+
+ replyBuilder->setMetadata(replyMetadata);
+}
+
+/**
+ * Guard object for making a good-faith effort to enter maintenance mode and leave it when it
+ * goes out of scope.
+ *
+ * Sometimes we cannot set maintenance mode, in which case the call to setMaintenanceMode will
+ * return a non-OK status. This class does not treat that case as an error which means that
+ * anybody using it is assuming it is ok to continue execution without maintenance mode.
+ *
+ * TODO: This assumption needs to be audited and documented, or this behavior should be moved
+ * elsewhere.
+ */
+class MaintenanceModeSetter {
+public:
+ MaintenanceModeSetter()
+ : maintenanceModeSet(
+ repl::getGlobalReplicationCoordinator()->setMaintenanceMode(true).isOK()) {}
+ ~MaintenanceModeSetter() {
+ if (maintenanceModeSet)
+ repl::getGlobalReplicationCoordinator()->setMaintenanceMode(false);
+ }
+
+private:
+ bool maintenanceModeSet;
+};
+
+void appendReplyMetadata(OperationContext* opCtx,
+ const OpMsgRequest& request,
+ BSONObjBuilder* metadataBob) {
+ const bool isShardingAware = ShardingState::get(opCtx)->enabled();
+ const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer;
+ repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();
+ const bool isReplSet =
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+
+ if (isReplSet) {
+ // Attach our own last opTime.
+ repl::OpTime lastOpTimeFromClient =
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ replCoord->prepareReplMetadata(opCtx, request.body, lastOpTimeFromClient, metadataBob);
+ // For commands from mongos, append some info to help getLastError(w) work.
+ // TODO: refactor out of here as part of SERVER-18236
+ if (isShardingAware || isConfig) {
+ rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId())
+ .writeToMetadata(metadataBob);
+ if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) {
+ // No need to sign logical times for internal clients.
+ SignedLogicalTime currentTime(
+ LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0);
+ rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime);
+ logicalTimeMetadata.writeToMetadata(metadataBob);
+ } else if (auto validator = LogicalTimeValidator::get(opCtx)) {
+ auto currentTime =
+ validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime());
+ rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime);
+ logicalTimeMetadata.writeToMetadata(metadataBob);
+ }
+ }
+ }
+
+ // If we're a shard other than the config shard, attach the last configOpTime we know about.
+ if (isShardingAware && !isConfig) {
+ auto opTime = grid.configOpTime();
+ rpc::ConfigServerMetadata(opTime).writeToMetadata(metadataBob);
+ }
+}
+
+/**
+ * Given the specified command and whether it supports read concern, returns an effective read
+ * concern which should be used.
+ */
+StatusWith<repl::ReadConcernArgs> _extractReadConcern(const BSONObj& cmdObj,
+ bool supportsNonLocalReadConcern) {
+ repl::ReadConcernArgs readConcernArgs;
+
+ auto readConcernParseStatus = readConcernArgs.initialize(cmdObj);
+ if (!readConcernParseStatus.isOK()) {
+ return readConcernParseStatus;
+ }
+
+ if (!supportsNonLocalReadConcern &&
+ readConcernArgs.getLevel() != repl::ReadConcernLevel::kLocalReadConcern) {
+ return {ErrorCodes::InvalidOptions,
+ str::stream() << "Command does not support non local read concern"};
+ }
+
+ return readConcernArgs;
+}
+
+void _waitForWriteConcernAndAddToCommandResponse(OperationContext* opCtx,
+ const std::string& commandName,
+ BSONObjBuilder* commandResponseBuilder) {
+ WriteConcernResult res;
+ auto waitForWCStatus =
+ waitForWriteConcern(opCtx,
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
+ opCtx->getWriteConcern(),
+ &res);
+ Command::appendCommandWCStatus(*commandResponseBuilder, waitForWCStatus, res);
+
+ // SERVER-22421: This code is to ensure error response backwards compatibility with the
+ // user management commands. This can be removed in 3.6.
+ if (!waitForWCStatus.isOK() && Command::isUserManagementCommand(commandName)) {
+ BSONObj temp = commandResponseBuilder->asTempObj().copy();
+ commandResponseBuilder->resetToEmpty();
+ Command::appendCommandStatus(*commandResponseBuilder, waitForWCStatus);
+ commandResponseBuilder->appendElementsUnique(temp);
+ }
+}
+
+/**
+ * For replica set members it returns the last known op time from opCtx. Otherwise will return
+ * uninitialized logical time.
+ */
+LogicalTime getClientOperationTime(OperationContext* opCtx) {
+ repl::ReplicationCoordinator* replCoord =
+ repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
+ const bool isReplSet =
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+ LogicalTime operationTime;
+ if (isReplSet) {
+ operationTime = LogicalTime(
+ repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp());
+ }
+ return operationTime;
+}
+
+/**
+ * Returns the proper operationTime for a command. To construct the operationTime for replica set
+ * members, it uses the last optime in the oplog for writes, last committed optime for majority
+ * reads, and the last applied optime for every other read. An uninitialized logical time is
+ * returned for non replica set members.
+ */
+LogicalTime computeOperationTime(OperationContext* opCtx,
+ LogicalTime startOperationTime,
+ repl::ReadConcernLevel level) {
+ repl::ReplicationCoordinator* replCoord =
+ repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
+ const bool isReplSet =
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+
+ if (!isReplSet) {
+ return LogicalTime();
+ }
+
+ auto operationTime = getClientOperationTime(opCtx);
+ invariant(operationTime >= startOperationTime);
+
+ // If the last operationTime has not changed, consider this command a read, and, for replica set
+ // members, construct the operationTime with the proper optime for its read concern level.
+ if (operationTime == startOperationTime) {
+ if (level == repl::ReadConcernLevel::kMajorityReadConcern) {
+ operationTime = LogicalTime(replCoord->getLastCommittedOpTime().getTimestamp());
+ } else {
+ operationTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp());
+ }
+ }
+
+ return operationTime;
+}
+
+bool runCommandImpl(OperationContext* opCtx,
+ Command* command,
+ const OpMsgRequest& request,
+ rpc::ReplyBuilderInterface* replyBuilder,
+ LogicalTime startOperationTime) {
+ auto bytesToReserve = command->reserveBytesForReply();
+
+// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the
+// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency
+// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds.
+#ifdef _WIN32
+ if (kDebugBuild)
+ bytesToReserve = 0;
+#endif
+
+ // run expects non-const bsonobj
+ BSONObj cmd = request.body;
+
+ // run expects const db std::string (can't bind to temporary)
+ const std::string db = request.getDatabase().toString();
+
+ BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve);
+ auto readConcernArgsStatus = _extractReadConcern(cmd, command->supportsReadConcern(db, cmd));
+
+ if (!readConcernArgsStatus.isOK()) {
+ auto result =
+ Command::appendCommandStatus(inPlaceReplyBob, readConcernArgsStatus.getStatus());
+ inPlaceReplyBob.doneFast();
+ replyBuilder->setMetadata(rpc::makeEmptyMetadata());
+ return result;
+ }
+
+ Status rcStatus = waitForReadConcern(opCtx, readConcernArgsStatus.getValue());
+ if (!rcStatus.isOK()) {
+ if (rcStatus == ErrorCodes::ExceededTimeLimit) {
+ const int debugLevel =
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2;
+ LOG(debugLevel) << "Command on database " << db
+ << " timed out waiting for read concern to be satisfied. Command: "
+ << redact(command->getRedactedCopyForLogging(request.body));
+ }
+
+ auto result = Command::appendCommandStatus(inPlaceReplyBob, rcStatus);
+ inPlaceReplyBob.doneFast();
+ replyBuilder->setMetadata(rpc::makeEmptyMetadata());
+ return result;
+ }
+
+ std::string errmsg;
+ bool result;
+ if (!command->supportsWriteConcern(cmd)) {
+ if (commandSpecifiesWriteConcern(cmd)) {
+ auto result = Command::appendCommandStatus(
+ inPlaceReplyBob,
+ {ErrorCodes::InvalidOptions, "Command does not support writeConcern"});
+ inPlaceReplyBob.doneFast();
+ replyBuilder->setMetadata(rpc::makeEmptyMetadata());
+ return result;
+ }
+
+ result = command->enhancedRun(opCtx, request, errmsg, inPlaceReplyBob);
+ } else {
+ auto wcResult = extractWriteConcern(opCtx, cmd, db);
+ if (!wcResult.isOK()) {
+ auto result = Command::appendCommandStatus(inPlaceReplyBob, wcResult.getStatus());
+ inPlaceReplyBob.doneFast();
+ replyBuilder->setMetadata(rpc::makeEmptyMetadata());
+ return result;
+ }
+
+ // Change the write concern while running the command.
+ const auto oldWC = opCtx->getWriteConcern();
+ ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
+ opCtx->setWriteConcern(wcResult.getValue());
+ ON_BLOCK_EXIT([&] {
+ _waitForWriteConcernAndAddToCommandResponse(
+ opCtx, command->getName(), &inPlaceReplyBob);
+ });
+
+ result = command->enhancedRun(opCtx, request, errmsg, inPlaceReplyBob);
+
+ // Nothing in run() should change the writeConcern.
+ dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() ==
+ wcResult.getValue().toBSON()));
+ }
+
+ // When a linearizable read command is passed in, check to make sure we're reading
+ // from the primary.
+ if (command->supportsReadConcern(db, cmd) &&
+ (readConcernArgsStatus.getValue().getLevel() ==
+ repl::ReadConcernLevel::kLinearizableReadConcern) &&
+ (request.getCommandName() != "getMore")) {
+
+ auto linearizableReadStatus = waitForLinearizableReadConcern(opCtx);
+
+ if (!linearizableReadStatus.isOK()) {
+ inPlaceReplyBob.resetToEmpty();
+ auto result = Command::appendCommandStatus(inPlaceReplyBob, linearizableReadStatus);
+ inPlaceReplyBob.doneFast();
+ replyBuilder->setMetadata(rpc::makeEmptyMetadata());
+ return result;
+ }
+ }
+
+ Command::appendCommandStatus(inPlaceReplyBob, result, errmsg);
+
+ auto operationTime = computeOperationTime(
+ opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel());
+
+ // An uninitialized operation time means the cluster time is not propagated, so the operation
+ // time should not be attached to the response.
+ if (operationTime != LogicalTime::kUninitialized) {
+ Command::appendOperationTime(inPlaceReplyBob, operationTime);
+ }
+
+ inPlaceReplyBob.doneFast();
+
+ BSONObjBuilder metadataBob;
+ appendReplyMetadata(opCtx, request, &metadataBob);
+ replyBuilder->setMetadata(metadataBob.done());
+
+ return result;
+}
+
+/**
+ * Executes a command after stripping metadata, performing authorization checks,
+ * handling audit impersonation, and (potentially) setting maintenance mode. This method
+ * also checks that the command is permissible to run on the node given its current
+ * replication state. All the logic here is independent of any particular command; any
+ * functionality relevant to a specific command should be confined to its run() method.
+ */
+void execCommandDatabase(OperationContext* opCtx,
+ Command* command,
+ const OpMsgRequest& request,
+ rpc::ReplyBuilderInterface* replyBuilder) {
+
+ auto startOperationTime = getClientOperationTime(opCtx);
+ try {
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ CurOp::get(opCtx)->setCommand_inlock(command);
+ }
+
+ // TODO: move this back to runCommands when mongos supports OperationContext
+ // see SERVER-18515 for details.
+ rpc::readRequestMetadata(opCtx, request.body);
+ rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName());
+
+ std::string dbname = request.getDatabase().toString();
+ uassert(
+ ErrorCodes::InvalidNamespace,
+ str::stream() << "Invalid database name: '" << dbname << "'",
+ NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));
+
+ std::unique_ptr<MaintenanceModeSetter> mmSetter;
+
+ BSONElement cmdOptionMaxTimeMSField;
+ BSONElement helpField;
+ BSONElement shardVersionFieldIdx;
+ BSONElement queryOptionMaxTimeMSField;
+
+ StringMap<int> topLevelFields;
+ for (auto&& element : request.body) {
+ StringData fieldName = element.fieldNameStringData();
+ if (fieldName == QueryRequest::cmdOptionMaxTimeMS) {
+ cmdOptionMaxTimeMSField = element;
+ } else if (fieldName == Command::kHelpFieldName) {
+ helpField = element;
+ } else if (fieldName == ChunkVersion::kShardVersionField) {
+ shardVersionFieldIdx = element;
+ } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) {
+ queryOptionMaxTimeMSField = element;
+ }
+
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Parsed command object contains duplicate top level key: "
+ << fieldName,
+ topLevelFields[fieldName]++ == 0);
+ }
+
+ if (Command::isHelpRequest(helpField)) {
+ CurOp::get(opCtx)->ensureStarted();
+ // We disable last-error for help requests due to SERVER-11492, because config servers
+ // use help requests to determine which commands are database writes, and so must be
+ // forwarded to all config servers.
+ LastError::get(opCtx->getClient()).disable();
+ Command::generateHelpResponse(opCtx, replyBuilder, *command);
+ return;
+ }
+
+ ImpersonationSessionGuard guard(opCtx);
+ uassertStatusOK(Command::checkAuthorization(command, opCtx, dbname, request.body));
+
+ repl::ReplicationCoordinator* replCoord =
+ repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
+ const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname);
+
+ {
+ bool commandCanRunOnSecondary = command->slaveOk();
+
+ bool commandIsOverriddenToRunOnSecondary =
+ command->slaveOverrideOk() && ReadPreferenceSetting::get(opCtx).canRunOnSecondary();
+
+ bool iAmStandalone = !opCtx->writesAreReplicated();
+ bool canRunHere = iAmPrimary || commandCanRunOnSecondary ||
+ commandIsOverriddenToRunOnSecondary || iAmStandalone;
+
+ // This logic is clearer if we don't have to invert it.
+ if (!canRunHere && command->slaveOverrideOk()) {
+ uasserted(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false");
+ }
+
+ uassert(ErrorCodes::NotMaster, "not master", canRunHere);
+
+ if (!command->maintenanceOk() &&
+ replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
+ !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) &&
+ !replCoord->getMemberState().secondary()) {
+
+ uassert(ErrorCodes::NotMasterOrSecondary,
+ "node is recovering",
+ !replCoord->getMemberState().recovering());
+ uassert(ErrorCodes::NotMasterOrSecondary,
+ "node is not in primary or recovering state",
+ replCoord->getMemberState().primary());
+ // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode
+ uassert(ErrorCodes::NotMasterOrSecondary,
+ "node is in drain mode",
+ commandIsOverriddenToRunOnSecondary || commandCanRunOnSecondary);
+ }
+ }
+
+ if (command->adminOnly()) {
+ LOG(2) << "command: " << request.getCommandName();
+ }
+
+ if (command->maintenanceMode()) {
+ mmSetter.reset(new MaintenanceModeSetter);
+ }
+
+ if (command->shouldAffectCommandCounter()) {
+ OpCounters* opCounters = &globalOpCounters;
+ opCounters->gotCommand();
+ }
+
+ // Handle command option maxTimeMS.
+ int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField));
+
+ uassert(ErrorCodes::InvalidOptions,
+ "no such command option $maxTimeMs; use maxTimeMS instead",
+ queryOptionMaxTimeMSField.eoo());
+
+ if (maxTimeMS > 0) {
+ uassert(40119,
+ "Illegal attempt to set operation deadline within DBDirectClient",
+ !opCtx->getClient()->isInDirectClient());
+ opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS});
+ }
+
+ // Operations are only versioned against the primary. We also make sure not to redo shard
+ // version handling if this command was issued via the direct client.
+ if (iAmPrimary && !opCtx->getClient()->isInDirectClient()) {
+ // Handle a shard version that may have been sent along with the command.
+ auto commandNS = NamespaceString(command->parseNs(dbname, request.body));
+ auto& oss = OperationShardingState::get(opCtx);
+ oss.initializeShardVersion(commandNS, shardVersionFieldIdx);
+ auto shardingState = ShardingState::get(opCtx);
+ if (oss.hasShardVersion()) {
+ uassertStatusOK(shardingState->canAcceptShardedCommands());
+ }
+
+ // Handle config optime information that may have been sent along with the command.
+ uassertStatusOK(shardingState->updateConfigServerOpTimeFromMetadata(opCtx));
+ }
+
+ // Can throw
+ opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point.
+
+ bool retval = false;
+
+ CurOp::get(opCtx)->ensureStarted();
+
+ command->incrementCommandsExecuted();
+
+ if (logger::globalLogDomain()->shouldLog(logger::LogComponent::kTracking,
+ logger::LogSeverity::Debug(1)) &&
+ rpc::TrackingMetadata::get(opCtx).getParentOperId()) {
+ MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking)
+ << rpc::TrackingMetadata::get(opCtx).toString();
+ rpc::TrackingMetadata::get(opCtx).setIsLogged(true);
+ }
+ retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);
+
+ if (!retval) {
+ command->incrementCommandsFailed();
+ }
+ } catch (const DBException& e) {
+ // If we got a stale config, wait in case the operation is stuck in a critical section
+ if (e.getCode() == ErrorCodes::SendStaleConfig) {
+ auto sce = dynamic_cast<const StaleConfigException*>(&e);
+ invariant(sce); // do not upcasts from DBException created by uassert variants.
+
+ if (!opCtx->getClient()->isInDirectClient()) {
+ ShardingState::get(opCtx)->onStaleShardVersion(
+ opCtx, NamespaceString(sce->getns()), sce->getVersionReceived());
+ }
+ }
+
+ BSONObjBuilder metadataBob;
+ appendReplyMetadata(opCtx, request, &metadataBob);
+
+ const std::string db = request.getDatabase().toString();
+ auto readConcernArgsStatus =
+ _extractReadConcern(request.body, command->supportsReadConcern(db, request.body));
+ auto operationTime = readConcernArgsStatus.isOK()
+ ? computeOperationTime(
+ opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel())
+ : LogicalClock::get(opCtx)->getClusterTime();
+
+ // An uninitialized operation time means the cluster time is not propagated, so the
+ // operation time should not be attached to the error response.
+ if (operationTime != LogicalTime::kUninitialized) {
+ LOG(1) << "assertion while executing command '" << request.getCommandName() << "' "
+ << "on database '" << request.getDatabase() << "' "
+ << "with arguments '" << command->getRedactedCopyForLogging(request.body)
+ << "' and operationTime '" << operationTime.toString() << "': " << e.toString();
+
+ _generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), operationTime);
+ } else {
+ LOG(1) << "assertion while executing command '" << request.getCommandName() << "' "
+ << "on database '" << request.getDatabase() << "' "
+ << "with arguments '" << command->getRedactedCopyForLogging(request.body)
+ << "': " << e.toString();
+
+ _generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj());
+ }
+ }
+}
+
+/**
+ * Fills out CurOp / OpDebug with basic command info.
+ */
+void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) {
+ auto curop = CurOp::get(opCtx);
+ curop->debug().iscommand = true;
+
+ // We construct a legacy $cmd namespace so we can fill in curOp using
+ // the existing logic that existed for OP_QUERY commands
+ NamespaceString nss(request.getDatabase(), "$cmd");
+
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ curop->setOpDescription_inlock(request.body);
+ curop->markCommand_inlock();
+ curop->setNS_inlock(nss.ns());
+}
+
+DbResponse runCommands(OperationContext* opCtx, const Message& message) {
+ auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));
+
+ // TODO SERVER-28964 If this parsing the request fails we reply to an invalid request which
+ // isn't always safe. Unfortunately tests currently rely on this. Figure out what to do
+ // (probably throw a special exception type like ConnectionFatalMessageParseError).
+ bool canReply = true;
+ auto curOp = CurOp::get(opCtx);
+ boost::optional<OpMsgRequest> request;
+ try {
+ request.emplace(rpc::opMsgRequestFromAnyProtocol(message)); // Request is validated here.
+ canReply = !request->isFlagSet(OpMsg::kMoreToCome);
+
+ curOpCommandSetup(opCtx, *request);
+
+ Command* c = nullptr;
+ // In the absence of a Command object, no redaction is possible. Therefore
+ // to avoid displaying potentially sensitive information in the logs,
+ // we restrict the log message to the name of the unrecognized command.
+ // However, the complete command object will still be echoed to the client.
+ if (!(c = Command::findCommand(request->getCommandName()))) {
+ Command::unknownCommands.increment();
+ std::string msg = str::stream() << "no such command: '" << request->getCommandName()
+ << "'";
+ LOG(2) << msg;
+ uasserted(ErrorCodes::CommandNotFound,
+ str::stream() << msg << ", bad cmd: '" << redact(request->body) << "'");
+ }
+
+ LOG(2) << "run command " << request->getDatabase() << ".$cmd" << ' '
+ << c->getRedactedCopyForLogging(request->body);
+
+ {
+ // Try to set this as early as possible, as soon as we have figured out the command.
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ curOp->setLogicalOp_inlock(c->getLogicalOp());
+ }
+
+ execCommandDatabase(opCtx, c, *request, replyBuilder.get());
+ } catch (const DBException& ex) {
+ if (request) {
+ LOG(1) << "assertion while executing command '" << request->getCommandName() << "' "
+ << "on database '" << request->getDatabase() << "': " << ex.toString();
+ } else {
+ // We failed during parsing so we can't log anything about the command.
+ LOG(1) << "assertion while executing command: " << ex.toString();
+ }
+
+ _generateErrorResponse(opCtx, replyBuilder.get(), ex, rpc::makeEmptyMetadata());
+ }
+
+ if (!canReply) {
+ return {};
+ }
+
+ auto response = replyBuilder->done();
+ curOp->debug().responseLength = response.header().dataLen();
+
+ // TODO exhaust
+ return DbResponse{std::move(response)};
+}
+
// In SERVER-7775 we reimplemented the pseudo-commands fsyncUnlock, inProg, and killOp
// as ordinary commands. To support old clients for another release, this helper serves
// to execute the real command from the legacy pseudo-command codepath.