/** * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand #include "mongo/platform/basic.h" #include "mongo/db/service_entry_point_common.h" #include "mongo/base/checked_cast.h" #include "mongo/bson/mutable/document.h" #include "mongo/db/audit.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/impersonation_session.h" #include "mongo/db/client.h" #include "mongo/db/command_can_run_here.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/global_lock_acquisition_tracker.h" #include "mongo/db/curop.h" #include "mongo/db/curop_metrics.h" #include "mongo/db/cursor_manager.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/initialize_operation_session_info.h" #include "mongo/db/introspect.h" #include "mongo/db/jsobj.h" #include "mongo/db/lasterror.h" #include "mongo/db/logical_clock.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/ops/write_ops_exec.h" #include "mongo/db/query/find.h" #include "mongo/db/read_concern.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_entry_point_common.h" #include "mongo/db/session_catalog.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/top.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/logical_time_metadata.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" 
#include "mongo/rpc/metadata/sharding_metadata.h"
#include "mongo/rpc/metadata/tracking_metadata.h"
#include "mongo/rpc/reply_builder_interface.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/net/message.h"
#include "mongo/util/net/op_msg.h"
#include "mongo/util/scopeguard.h"

namespace mongo {

// Test-only fail points: pause getMore servicing, and force specific
// not-primary / not-master behavior in command dispatch.
MONGO_FP_DECLARE(rsStopGetMore);
MONGO_FP_DECLARE(respondWithNotPrimaryInCommandDispatch);
MONGO_FP_DECLARE(skipCheckingForNotMasterInCommandDispatch);

namespace {

using logger::LogComponent;

// The command names for which to check out a session. These are commands that support retryable
// writes, readConcern snapshot, or multi-statement transactions. We additionally check out the
// session for commands that can take a lock and then run another whitelisted command in
// DBDirectClient. Otherwise, the nested command would try to check out a session under a lock,
// which is not allowed.
// NOTE(review): the container's template argument appears to have been stripped in this copy
// (upstream declares StringMap<int>) — confirm against the canonical source.
const StringMap sessionCheckoutWhitelist = {{"applyOps", 1},
                                            {"count", 1},
                                            {"delete", 1},
                                            {"eval", 1},
                                            {"$eval", 1},
                                            {"explain", 1},
                                            {"find", 1},
                                            {"findandmodify", 1},
                                            {"findAndModify", 1},
                                            {"geoSearch", 1},
                                            {"getMore", 1},
                                            {"group", 1},
                                            {"insert", 1},
                                            {"mapReduce", 1},
                                            {"parallelCollectionScan", 1},
                                            {"refreshLogicalSessionCacheNow", 1},
                                            {"update", 1}};

// The command names for which readConcern level snapshot is allowed. The getMore command is
// implicitly allowed to operate on a cursor which was opened under readConcern level snapshot.
const StringMap readConcernSnapshotWhitelist = {
    {"find", 1}, {"count", 1}, {"geoSearch", 1}, {"parallelCollectionScan", 1}, {"update", 1}};

// Builds a legacy OP_REPLY error message for a failed OP_QUERY, recording the error in
// CurOp and flagging stale-shard-version errors so mongos can refresh its routing info.
void generateLegacyQueryErrorResponse(const AssertionException* exception,
                                      const QueryMessage& queryMessage,
                                      CurOp* curop,
                                      Message* response) {
    curop->debug().errInfo = exception->toStatus();

    log(LogComponent::kQuery) << "assertion " << exception->toString()
                              << " ns:" << queryMessage.ns << " query:"
                              << (queryMessage.query.valid(BSONVersion::kLatest)
                                      ? redact(queryMessage.query)
                                      : "query object is corrupt");
    if (queryMessage.ntoskip || queryMessage.ntoreturn) {
        log(LogComponent::kQuery) << " ntoskip:" << queryMessage.ntoskip
                                  << " ntoreturn:" << queryMessage.ntoreturn;
    }

    // NOTE(review): the extraInfo template argument (upstream: StaleConfigInfo) appears to
    // have been stripped in this copy — confirm against the canonical source.
    auto scex = exception->extraInfo();

    BSONObjBuilder err;
    err.append("$err", exception->reason());
    err.append("code", exception->code());
    if (scex) {
        // Stale shard version: attach the namespace and the received/wanted versions so the
        // router can react.
        err.append("ok", 0.0);
        err.append("ns", scex->getns());
        scex->getVersionReceived().addToBSON(err, "vReceived");
        scex->getVersionWanted().addToBSON(err, "vWanted");
    }
    BSONObj errObj = err.done();

    if (scex) {
        log(LogComponent::kQuery) << "stale version detected during query over "
                                  << queryMessage.ns << " : " << errObj;
    }

    BufBuilder bb;
    bb.skip(sizeof(QueryResult::Value));
    bb.appendBuf((void*)errObj.objdata(), errObj.objsize());

    // TODO: call replyToQuery() from here instead of this!!! see dbmessage.h
    QueryResult::View msgdata = bb.buf();
    QueryResult::View qr = msgdata;
    qr.setResultFlags(ResultFlag_ErrSet);
    if (scex)
        qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale);
    qr.msgdata().setLen(bb.len());
    qr.msgdata().setOperation(opReply);
    qr.setCursorId(0);
    qr.setStartingFrom(0);
    qr.setNReturned(1);
    response->setData(bb.release());
}

// Records the failure in the client's LastError (for getLastError) and in CurOp's debug info.
void registerError(OperationContext* opCtx, const DBException& exception) {
    LastError::get(opCtx->getClient()).setLastError(exception.code(), exception.reason());
    CurOp::get(opCtx)->debug().errInfo = exception.toStatus();
}

// Turns 'exception' into an error command reply (no operationTime variant).
void generateErrorResponse(OperationContext* opCtx,
                           rpc::ReplyBuilderInterface* replyBuilder,
                           const DBException& exception,
                           const BSONObj& replyMetadata) {
    registerError(opCtx, exception);

    // We could have thrown an exception after setting fields in the builder,
    // so we need to reset it to a clean state just to be sure.
    replyBuilder->reset();
    replyBuilder->setCommandReply(exception.toStatus());
    replyBuilder->setMetadata(replyMetadata);
}

// Overload that additionally attaches the given operationTime to the error reply body.
void generateErrorResponse(OperationContext* opCtx,
                           rpc::ReplyBuilderInterface* replyBuilder,
                           const DBException& exception,
                           const BSONObj& replyMetadata,
                           LogicalTime operationTime) {
    registerError(opCtx, exception);

    // We could have thrown an exception after setting fields in the builder,
    // so we need to reset it to a clean state just to be sure.
    replyBuilder->reset();
    replyBuilder->setCommandReply(exception.toStatus(),
                                  BSON("operationTime" << operationTime.asTimestamp()));
    replyBuilder->setMetadata(replyMetadata);
}

/**
 * Guard object for making a good-faith effort to enter maintenance mode and leave it when it
 * goes out of scope.
 *
 * Sometimes we cannot set maintenance mode, in which case the call to setMaintenanceMode will
 * return a non-OK status. This class does not treat that case as an error which means that
 * anybody using it is assuming it is ok to continue execution without maintenance mode.
 *
 * TODO: This assumption needs to be audited and documented, or this behavior should be moved
 * elsewhere.
 */
class MaintenanceModeSetter {
    MONGO_DISALLOW_COPYING(MaintenanceModeSetter);

public:
    MaintenanceModeSetter(OperationContext* opCtx)
        : _opCtx(opCtx),
          // Remember whether we actually entered maintenance mode so we only undo what we did.
          _maintenanceModeSet(
              repl::ReplicationCoordinator::get(_opCtx)->setMaintenanceMode(true).isOK()) {}

    ~MaintenanceModeSetter() {
        if (_maintenanceModeSet) {
            repl::ReplicationCoordinator::get(_opCtx)
                ->setMaintenanceMode(false)
                .transitional_ignore();
        }
    }

private:
    OperationContext* const _opCtx;
    const bool _maintenanceModeSet;
};

// Called from the error contexts where request may not be available.
// It only attaches clusterTime and operationTime.
void appendReplyMetadataOnError(OperationContext* opCtx, BSONObjBuilder* metadataBob) {
    auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
    const bool isReplSet =
        replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;

    if (isReplSet) {
        if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) {
            // No need to sign cluster times for internal clients.
// NOTE(review): reformatted continuation of appendReplyMetadataOnError from the preceding lines.
            SignedLogicalTime currentTime(
                LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0);
            rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime);
            logicalTimeMetadata.writeToMetadata(metadataBob);
        } else if (auto validator = LogicalTimeValidator::get(opCtx)) {
            auto currentTime =
                validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime());
            rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime);
            logicalTimeMetadata.writeToMetadata(metadataBob);
        }
    }
}

// Attaches reply metadata for a successfully parsed request: repl metadata, sharding
// metadata for mongos, signed/unsigned cluster time, and the config server opTime.
void appendReplyMetadata(OperationContext* opCtx,
                         const OpMsgRequest& request,
                         BSONObjBuilder* metadataBob) {
    const bool isShardingAware = ShardingState::get(opCtx)->enabled();
    const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer;
    auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
    const bool isReplSet =
        replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;

    if (isReplSet) {
        // Attach our own last opTime.
        repl::OpTime lastOpTimeFromClient =
            repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
        replCoord->prepareReplMetadata(request.body, lastOpTimeFromClient, metadataBob);
        // For commands from mongos, append some info to help getLastError(w) work.
        // TODO: refactor out of here as part of SERVER-18236
        if (isShardingAware || isConfig) {
            rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId())
                .writeToMetadata(metadataBob)
                .transitional_ignore();
        }

        if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) {
            // No need to sign cluster times for internal clients.
            SignedLogicalTime currentTime(
                LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0);
            rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime);
            logicalTimeMetadata.writeToMetadata(metadataBob);
        } else if (auto validator = LogicalTimeValidator::get(opCtx)) {
            auto currentTime =
                validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime());
            rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime);
            logicalTimeMetadata.writeToMetadata(metadataBob);
        }
    }

    // If we're a shard other than the config shard, attach the last configOpTime we know about.
    if (isShardingAware && !isConfig) {
        auto opTime = grid.configOpTime();
        rpc::ConfigServerMetadata(opTime).writeToMetadata(metadataBob);
    }
}

/**
 * Given the specified command, returns an effective read concern which should be used or an error
 * if the read concern is not valid for the command.
 */
// NOTE(review): the return type's template argument (upstream: StatusWith<repl::ReadConcernArgs>)
// appears to have been stripped in this copy — confirm against the canonical source.
StatusWith _extractReadConcern(const Command* command,
                               const std::string& dbName,
                               const BSONObj& cmdObj) {
    repl::ReadConcernArgs readConcernArgs;

    auto readConcernParseStatus = readConcernArgs.initialize(cmdObj);
    if (!readConcernParseStatus.isOK()) {
        return readConcernParseStatus;
    }

    if (!command->supportsReadConcern(dbName, cmdObj, readConcernArgs.getLevel())) {
        return {ErrorCodes::InvalidOptions,
                str::stream() << "Command does not support read concern "
                              << readConcernArgs.toString()};
    }

    return readConcernArgs;
}

/**
 * For replica set members it returns the last known op time from opCtx. Otherwise will return
 * uninitialized cluster time.
 */
LogicalTime getClientOperationTime(OperationContext* opCtx) {
    auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
    const bool isReplSet =
        replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
    LogicalTime operationTime;

    if (isReplSet) {
        operationTime = LogicalTime(
            repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp());
    }
    return operationTime;
}

/**
 * Returns the proper operationTime for a command. To construct the operationTime for replica set
 * members, it uses the last optime in the oplog for writes, last committed optime for majority
 * reads, and the last applied optime for every other read. An uninitialized cluster time is
 * returned for non replica set members.
 */
LogicalTime computeOperationTime(OperationContext* opCtx,
                                 LogicalTime startOperationTime,
                                 repl::ReadConcernLevel level) {
    auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
    const bool isReplSet =
        replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;

    if (!isReplSet) {
        return LogicalTime();
    }

    auto operationTime = getClientOperationTime(opCtx);
    invariant(operationTime >= startOperationTime);

    // If the last operationTime has not changed, consider this command a read, and, for replica set
    // members, construct the operationTime with the proper optime for its read concern level.
    if (operationTime == startOperationTime) {
        if (level == repl::ReadConcernLevel::kMajorityReadConcern) {
            operationTime = LogicalTime(replCoord->getLastCommittedOpTime().getTimestamp());
        } else {
            operationTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp());
        }
    }

    return operationTime;
}

// Runs the command body: enforces/waits on read and write concern around publicRun(),
// appends command status, operationTime and reply metadata. Returns publicRun()'s result.
bool runCommandImpl(OperationContext* opCtx,
                    Command* command,
                    const OpMsgRequest& request,
                    rpc::ReplyBuilderInterface* replyBuilder,
                    LogicalTime startOperationTime,
                    const ServiceEntryPointCommon::Hooks& behaviors) {
    auto bytesToReserve = command->reserveBytesForReply();

// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the
// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency
// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds.
#ifdef _WIN32
    if (kDebugBuild)
        bytesToReserve = 0;
#endif

    // run expects non-const bsonobj
    BSONObj cmd = request.body;

    // run expects const db std::string (can't bind to temporary)
    const std::string db = request.getDatabase().toString();

    BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve);

    behaviors.waitForReadConcern(opCtx, command, db, request, cmd);

    bool result;
    if (!command->supportsWriteConcern(cmd)) {
        behaviors.uassertCommandDoesNotSpecifyWriteConcern(cmd);
        result = command->publicRun(opCtx, request, inPlaceReplyBob);
    } else {
        auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, cmd, db));

        auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();

        // Change the write concern while running the command.
        const auto oldWC = opCtx->getWriteConcern();
        ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
        opCtx->setWriteConcern(wcResult);
        ON_BLOCK_EXIT([&] {
            behaviors.waitForWriteConcern(
                opCtx, command->getName(), lastOpBeforeRun, &inPlaceReplyBob);
        });

        result = command->publicRun(opCtx, request, inPlaceReplyBob);

        // Nothing in run() should change the writeConcern.
        dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() ==
                                                            wcResult.toBSON()));
    }

    behaviors.waitForLinearizableReadConcern(opCtx);

    CommandHelpers::appendCommandStatus(inPlaceReplyBob, result);

    behaviors.attachCurOpErrInfo(opCtx, inPlaceReplyBob);

    auto operationTime = computeOperationTime(
        opCtx, startOperationTime, repl::ReadConcernArgs::get(opCtx).getLevel());

    // An uninitialized operation time means the cluster time is not propagated, so the operation
    // time should not be attached to the response.
// NOTE(review): reformatted continuation of runCommandImpl from the preceding lines.
    if (operationTime != LogicalTime::kUninitialized) {
        operationTime.appendAsOperationTime(&inPlaceReplyBob);
    }

    inPlaceReplyBob.doneFast();

    BSONObjBuilder metadataBob;
    appendReplyMetadata(opCtx, request, &metadataBob);
    replyBuilder->setMetadata(metadataBob.done());

    return result;
}

/**
 * Executes a command after stripping metadata, performing authorization checks,
 * handling audit impersonation, and (potentially) setting maintenance mode. This method
 * also checks that the command is permissible to run on the node given its current
 * replication state. All the logic here is independent of any particular command; any
 * functionality relevant to a specific command should be confined to its run() method.
 */
void execCommandDatabase(OperationContext* opCtx,
                         Command* command,
                         const OpMsgRequest& request,
                         rpc::ReplyBuilderInterface* replyBuilder,
                         const ServiceEntryPointCommon::Hooks& behaviors) {
    auto startOperationTime = getClientOperationTime(opCtx);
    try {
        {
            // NOTE(review): lock_guard's template argument (upstream: stdx::lock_guard<Client>)
            // appears to have been stripped in this copy — confirm against the canonical source.
            stdx::lock_guard lk(*opCtx->getClient());
            CurOp::get(opCtx)->setCommand_inlock(command);
        }

        // TODO: move this back to runCommands when mongos supports OperationContext
        // see SERVER-18515 for details.
        rpc::readRequestMetadata(opCtx, request.body);
        rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName());

        auto const replCoord = repl::ReplicationCoordinator::get(opCtx);

        auto sessionOptions = initializeOperationSessionInfo(
            opCtx,
            request.body,
            command->requiresAuth(),
            replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet,
            opCtx->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking());

        // Session ids are forwarded in requests, so commands that require roundtrips between
        // servers may result in a deadlock when a server tries to check out a session it is already
        // using to service an earlier operation in the command's chain. To avoid this, only check
        // out sessions for commands that require them.
        const bool shouldCheckoutSession = sessionCheckoutWhitelist.find(command->getName()) !=
            sessionCheckoutWhitelist.cend();
        boost::optional autocommitVal = boost::none;
        if (sessionOptions && sessionOptions->getAutocommit()) {
            autocommitVal = *sessionOptions->getAutocommit();
        }
        OperationContextSession sessionTxnState(opCtx, shouldCheckoutSession, autocommitVal);

        const auto dbname = request.getDatabase().toString();
        uassert(
            ErrorCodes::InvalidNamespace,
            str::stream() << "Invalid database name: '" << dbname << "'",
            NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow));

        std::unique_ptr mmSetter;

        // Fields of interest pulled out of the command body in a single pass below.
        BSONElement cmdOptionMaxTimeMSField;
        BSONElement allowImplicitCollectionCreationField;
        BSONElement helpField;
        BSONElement shardVersionFieldIdx;
        BSONElement queryOptionMaxTimeMSField;

        StringMap topLevelFields;
        for (auto&& element : request.body) {
            StringData fieldName = element.fieldNameStringData();
            if (fieldName == QueryRequest::cmdOptionMaxTimeMS) {
                cmdOptionMaxTimeMSField = element;
            } else if (fieldName == "allowImplicitCollectionCreation") {
                allowImplicitCollectionCreationField = element;
            } else if (fieldName == CommandHelpers::kHelpFieldName) {
                helpField = element;
            } else if (fieldName == ChunkVersion::kShardVersionField) {
                shardVersionFieldIdx = element;
            } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) {
                queryOptionMaxTimeMSField = element;
            }

            uassert(ErrorCodes::FailedToParse,
                    str::stream() << "Parsed command object contains duplicate top level key: "
                                  << fieldName,
                    topLevelFields[fieldName]++ == 0);
        }

        if (CommandHelpers::isHelpRequest(helpField)) {
            CurOp::get(opCtx)->ensureStarted();
            // We disable last-error for help requests due to SERVER-11492, because config servers
            // use help requests to determine which commands are database writes, and so must be
            // forwarded to all config servers.
            LastError::get(opCtx->getClient()).disable();
            Command::generateHelpResponse(opCtx, replyBuilder, *command);
            return;
        }

        ImpersonationSessionGuard guard(opCtx);
        uassertStatusOK(Command::checkAuthorization(command, opCtx, request));

        const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname);
        if (!opCtx->getClient()->isInDirectClient() &&
            !MONGO_FAIL_POINT(skipCheckingForNotMasterInCommandDispatch)) {
            auto allowed = command->secondaryAllowed(opCtx->getServiceContext());
            bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways;
            bool couldHaveOptedIn = allowed == Command::AllowedOnSecondary::kOptIn;
            bool optedIn =
                couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary();
            bool canRunHere = commandCanRunHere(opCtx, dbname, command);
            if (!canRunHere && couldHaveOptedIn) {
                uasserted(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false");
            }

            if (MONGO_FAIL_POINT(respondWithNotPrimaryInCommandDispatch)) {
                uassert(ErrorCodes::NotMaster, "not primary", canRunHere);
            } else {
                uassert(ErrorCodes::NotMaster, "not master", canRunHere);
            }

            if (!command->maintenanceOk() &&
                replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet &&
                !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) &&
                !replCoord->getMemberState().secondary()) {

                uassert(ErrorCodes::NotMasterOrSecondary,
                        "node is recovering",
                        !replCoord->getMemberState().recovering());
                uassert(ErrorCodes::NotMasterOrSecondary,
                        "node is not in primary or recovering state",
                        replCoord->getMemberState().primary());
                // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode
                uassert(ErrorCodes::NotMasterOrSecondary,
                        "node is in drain mode",
                        optedIn || alwaysAllowed);
            }
        }

        if (command->adminOnly()) {
            LOG(2) << "command: " << request.getCommandName();
        }

        if (command->maintenanceMode()) {
            mmSetter.reset(new MaintenanceModeSetter(opCtx));
        }

        if (command->shouldAffectCommandCounter()) {
            OpCounters* opCounters = &globalOpCounters;
            opCounters->gotCommand();
        }

        // Handle command option maxTimeMS.
        int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField));

        uassert(ErrorCodes::InvalidOptions,
                "no such command option $maxTimeMs; use maxTimeMS instead",
                queryOptionMaxTimeMSField.eoo());

        if (maxTimeMS > 0) {
            uassert(40119,
                    "Illegal attempt to set operation deadline within DBDirectClient",
                    !opCtx->getClient()->isInDirectClient());
            opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS});
        }

        auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
        readConcernArgs = uassertStatusOK(_extractReadConcern(command, dbname, request.body));

        // TODO SERVER-33354: Remove whitelist.
        if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
            const bool snapshotAllowedForCommand =
                readConcernSnapshotWhitelist.find(command->getName()) !=
                readConcernSnapshotWhitelist.cend();
            uassert(ErrorCodes::InvalidOptions,
                    str::stream() << "readConcern level snapshot may not be used with the "
                                  << command->getName()
                                  << " command",
                    snapshotAllowedForCommand);
            uassert(ErrorCodes::InvalidOptions,
                    "readConcernLevel snapshot requires a session ID",
                    opCtx->getLogicalSessionId());
            uassert(ErrorCodes::InvalidOptions,
                    "readConcernLevel snapshot requires a txnNumber",
                    opCtx->getTxnNumber());
            // TODO SERVER-33355: Remove once readConcern level snapshot is supported on
            // secondaries.
            uassert(ErrorCodes::InvalidOptions,
                    "readConcern level snapshot only supported on primaries",
                    iAmPrimary);
            opCtx->lockState()->setSharedLocksShouldTwoPhaseLock(true);
        }

        auto& oss = OperationShardingState::get(opCtx);

        if (!opCtx->getClient()->isInDirectClient() &&
            readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern &&
            (iAmPrimary ||
             (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) {
            oss.initializeShardVersion(NamespaceString(command->parseNs(dbname, request.body)),
                                       shardVersionFieldIdx);

            auto const shardingState = ShardingState::get(opCtx);
            if (oss.hasShardVersion()) {
                uassertStatusOK(shardingState->canAcceptShardedCommands());
            }

            // Handle config optime information that may have been sent along with the command.
            uassertStatusOK(shardingState->updateConfigServerOpTimeFromMetadata(opCtx));
        }

        oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField);

        // Can throw
        opCtx->checkForInterrupt();  // May trigger maxTimeAlwaysTimeOut fail point.

        bool retval = false;

        CurOp::get(opCtx)->ensureStarted();

        command->incrementCommandsExecuted();

        if (logger::globalLogDomain()->shouldLog(logger::LogComponent::kTracking,
                                                 logger::LogSeverity::Debug(1)) &&
            rpc::TrackingMetadata::get(opCtx).getParentOperId()) {
            MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking)
                << rpc::TrackingMetadata::get(opCtx).toString();
            rpc::TrackingMetadata::get(opCtx).setIsLogged(true);
        }

        sessionTxnState.unstashTransactionResources();

        retval =
            runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime, behaviors);

        if (retval) {
            if (opCtx->getWriteUnitOfWork()) {
                if (!opCtx->hasStashedCursor()) {
                    // If we are in an autocommit=true transaction and have no stashed cursor,
                    // commit the transaction.
                    opCtx->getWriteUnitOfWork()->commit();
                } else {
                    sessionTxnState.stashTransactionResources();
                }
            }
        } else {
            command->incrementCommandsFailed();
        }
    } catch (const DBException& e) {
        // If we got a stale config, wait in case the operation is stuck in a critical section
        if (auto sce = e.extraInfo()) {
            if (!opCtx->getClient()->isInDirectClient()) {
                // We already have the StaleConfig exception, so just swallow any errors due to
                // refresh
                onShardVersionMismatch(
                    opCtx, NamespaceString(sce->getns()), sce->getVersionReceived())
                    .ignore();
            }
        }

        BSONObjBuilder metadataBob;
        appendReplyMetadata(opCtx, request, &metadataBob);

        // Note: the read concern may not have been successfully or yet placed on the opCtx, so
        // parsing it separately here.
        const std::string db = request.getDatabase().toString();
        auto readConcernArgsStatus = _extractReadConcern(command, db, request.body);
        auto operationTime = readConcernArgsStatus.isOK()
            ? computeOperationTime(
                  opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel())
            : LogicalClock::get(opCtx)->getClusterTime();

        // An uninitialized operation time means the cluster time is not propagated, so the
        // operation time should not be attached to the error response.
// NOTE(review): reformatted continuation of execCommandDatabase's catch block from the
// preceding lines.
        if (operationTime != LogicalTime::kUninitialized) {
            LOG(1) << "assertion while executing command '" << request.getCommandName() << "' "
                   << "on database '" << request.getDatabase() << "' "
                   << "with arguments '"
                   << ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)
                   << "' and operationTime '" << operationTime.toString() << "': " << e.toString();

            generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), operationTime);
        } else {
            LOG(1) << "assertion while executing command '" << request.getCommandName() << "' "
                   << "on database '" << request.getDatabase() << "' "
                   << "with arguments '"
                   << ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body)
                   << "': " << e.toString();

            generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj());
        }
    }
}

/**
 * Fills out CurOp / OpDebug with basic command info.
 */
void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) {
    auto curop = CurOp::get(opCtx);
    curop->debug().iscommand = true;

    // We construct a legacy $cmd namespace so we can fill in curOp using
    // the existing logic that existed for OP_QUERY commands
    NamespaceString nss(request.getDatabase(), "$cmd");

    stdx::lock_guard lk(*opCtx->getClient());
    curop->setOpDescription_inlock(request.body);
    curop->markCommand_inlock();
    curop->setNS_inlock(nss.ns());
}

// Top-level driver for a single command message: parse, look up the Command, execute it,
// and convert any DBException into an error reply (or no reply for fire-and-forget).
DbResponse runCommands(OperationContext* opCtx,
                       const Message& message,
                       const ServiceEntryPointCommon::Hooks& behaviors) {
    auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));

    [&] {
        OpMsgRequest request;
        try {  // Parse.
            request = rpc::opMsgRequestFromAnyProtocol(message);
        } catch (const DBException& ex) {
            // If this error needs to fail the connection, propagate it out.
            if (ErrorCodes::isConnectionFatalMessageParseError(ex.code()))
                throw;

            auto operationTime = LogicalClock::get(opCtx)->getClusterTime();
            BSONObjBuilder metadataBob;
            appendReplyMetadataOnError(opCtx, &metadataBob);
            // Otherwise, reply with the parse error. This is useful for cases where parsing fails
            // due to user-supplied input, such as the document too deep error. Since we failed
            // during parsing, we can't log anything about the command.
            LOG(1) << "assertion while parsing command: " << ex.toString();
            generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime);

            return;  // From lambda. Don't try executing if parsing failed.
        }

        try {  // Execute.
            curOpCommandSetup(opCtx, request);

            Command* c = nullptr;
            // In the absence of a Command object, no redaction is possible. Therefore
            // to avoid displaying potentially sensitive information in the logs,
            // we restrict the log message to the name of the unrecognized command.
            // However, the complete command object will still be echoed to the client.
            if (!(c = CommandHelpers::findCommand(request.getCommandName()))) {
                globalCommandRegistry()->incrementUnknownCommands();
                std::string msg = str::stream() << "no such command: '"
                                                << request.getCommandName() << "'";
                LOG(2) << msg;
                uasserted(ErrorCodes::CommandNotFound,
                          str::stream() << msg << ", bad cmd: '" << redact(request.body) << "'");
            }

            LOG(2) << "run command " << request.getDatabase() << ".$cmd" << ' '
                   << ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body);

            {
                // Try to set this as early as possible, as soon as we have figured out the command.
                stdx::lock_guard lk(*opCtx->getClient());
                CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp());
            }

            execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors);
        } catch (const DBException& ex) {
            BSONObjBuilder metadataBob;
            appendReplyMetadataOnError(opCtx, &metadataBob);
            auto operationTime = LogicalClock::get(opCtx)->getClusterTime();
            LOG(1) << "assertion while executing command '" << request.getCommandName() << "' "
                   << "on database '" << request.getDatabase() << "': " << ex.toString();

            generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime);
        }
    }();

    if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) {
        // Close the connection to get client to go through server selection again.
        uassert(ErrorCodes::NotMaster,
                "Not-master error during fire-and-forget command processing",
                !LastError::get(opCtx->getClient()).hadNotMasterError());

        return {};  // Don't reply.
    }

    auto response = replyBuilder->done();
    CurOp::get(opCtx)->debug().responseLength = response.header().dataLen();

    // TODO exhaust
    return DbResponse{std::move(response)};
}

// Services a legacy OP_QUERY message: authorization check, runQuery(), and legacy
// error-reply generation on failure.
DbResponse receivedQuery(OperationContext* opCtx,
                         const NamespaceString& nss,
                         Client& c,
                         const Message& m) {
    invariant(!nss.isCommand());
    globalOpCounters.gotQuery();

    DbMessage d(m);
    QueryMessage q(d);

    CurOp& op = *CurOp::get(opCtx);
    DbResponse dbResponse;

    try {
        Client* client = opCtx->getClient();
        Status status = AuthorizationSession::get(client)->checkAuthForFind(nss, false);
        audit::logQueryAuthzCheck(client, nss, q.query, status.code());
        uassertStatusOK(status);

        dbResponse.exhaustNS = runQuery(opCtx, q, nss, dbResponse.response);
    } catch (const AssertionException& e) {
        // If we got a stale config, wait in case the operation is stuck in a critical section
        if (auto sce = e.extraInfo()) {
            if (!opCtx->getClient()->isInDirectClient()) {
                // We already have the StaleConfig exception, so just swallow any errors due to
                // refresh
                onShardVersionMismatch(
                    opCtx,
                    NamespaceString(sce->getns()),
// NOTE(review): reformatted continuation of receivedQuery's catch block from the preceding lines.
                    sce->getVersionReceived())
                    .ignore();
            }
        }

        dbResponse.response.reset();
        generateLegacyQueryErrorResponse(&e, q, &op, &dbResponse.response);
    }

    op.debug().responseLength = dbResponse.response.header().dataLen();
    return dbResponse;
}

// Services a legacy OP_KILL_CURSORS message: validates the cursor count and kills
// the cursors the client is authorized for.
void receivedKillCursors(OperationContext* opCtx, const Message& m) {
    LastError::get(opCtx->getClient()).disable();
    DbMessage dbmessage(m);
    int n = dbmessage.pullInt();

    uassert(13659, "sent 0 cursors to kill", n != 0);
    massert(13658,
            str::stream() << "bad kill cursors size: " << m.dataSize(),
            m.dataSize() == 8 + (8 * n));
    uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1);

    if (n > 2000) {
        (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n;
        verify(n < 30000);
    }

    const char* cursorArray = dbmessage.getArray(n);

    int found = CursorManager::killCursorGlobalIfAuthorized(opCtx, n, cursorArray);

    if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) {
        LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n;
    }
}

// Services a legacy OP_INSERT message: per-document auth check, then performInserts().
void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) {
    auto insertOp = InsertOp::parseLegacy(m);
    invariant(insertOp.getNamespace() == nsString);

    for (const auto& obj : insertOp.getDocuments()) {
        Status status =
            AuthorizationSession::get(opCtx->getClient())
                ->checkAuthForInsert(opCtx, nsString, obj);
        audit::logInsertAuthzCheck(opCtx->getClient(), nsString, obj, status.code());
        uassertStatusOK(status);
    }
    performInserts(opCtx, insertOp);
}

// Services a legacy OP_UPDATE message (exactly one update per message): auth check,
// then performUpdates().
void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) {
    auto updateOp = UpdateOp::parseLegacy(m);
    auto& singleUpdate = updateOp.getUpdates()[0];
    invariant(updateOp.getNamespace() == nsString);

    Status status = AuthorizationSession::get(opCtx->getClient())
                        ->checkAuthForUpdate(opCtx,
                                             nsString,
                                             singleUpdate.getQ(),
                                             singleUpdate.getU(),
                                             singleUpdate.getUpsert());
    audit::logUpdateAuthzCheck(opCtx->getClient(),
                               nsString,
                               singleUpdate.getQ(),
                               singleUpdate.getU(),
                               singleUpdate.getUpsert(),
                               singleUpdate.getMulti(),
                               status.code());
    uassertStatusOK(status);

    performUpdates(opCtx, updateOp);
}

// Services a legacy OP_DELETE message (exactly one delete per message): auth check,
// then performDeletes().
void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) {
    auto deleteOp = DeleteOp::parseLegacy(m);
    auto& singleDelete = deleteOp.getDeletes()[0];
    invariant(deleteOp.getNamespace() == nsString);

    Status status = AuthorizationSession::get(opCtx->getClient())
                        ->checkAuthForDelete(opCtx, nsString, singleDelete.getQ());
    audit::logDeleteAuthzCheck(opCtx->getClient(), nsString, singleDelete.getQ(), status.code());
    uassertStatusOK(status);

    performDeletes(opCtx, deleteOp);
}

// Services a legacy OP_GET_MORE message: validates the namespace, checks cursor auth,
// and fetches the next batch; on error it kills the (possibly advanced) cursor and
// builds a legacy error reply.
DbResponse receivedGetMore(OperationContext* opCtx,
                           const Message& m,
                           CurOp& curop,
                           bool* shouldLogOpDebug) {
    globalOpCounters.gotGetMore();
    DbMessage d(m);

    const char* ns = d.getns();
    int ntoreturn = d.pullInt();
    uassert(34419,
            str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn,
            ntoreturn >= 0);
    long long cursorid = d.pullInt64();

    curop.debug().ntoreturn = ntoreturn;
    curop.debug().cursorid = cursorid;

    {
        stdx::lock_guard lk(*opCtx->getClient());
        CurOp::get(opCtx)->setNS_inlock(ns);
    }

    bool exhaust = false;
    bool isCursorAuthorized = false;

    DbResponse dbresponse;
    try {
        const NamespaceString nsString(ns);
        uassert(ErrorCodes::InvalidNamespace,
                str::stream() << "Invalid ns [" << ns << "]",
                nsString.isValid());

        Status status = AuthorizationSession::get(opCtx->getClient())
                            ->checkAuthForGetMore(nsString, cursorid, false);
        audit::logGetMoreAuthzCheck(opCtx->getClient(), nsString, cursorid, status.code());
        uassertStatusOK(status);

        while (MONGO_FAIL_POINT(rsStopGetMore)) {
            sleepmillis(0);
        }

        dbresponse.response =
            getMore(opCtx, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized);
    } catch (AssertionException& e) {
        if (isCursorAuthorized) {
            // If a cursor with id 'cursorid' was authorized, it may have been advanced
            // before an exception terminated processGetMore. Erase the ClientCursor
            // because it may now be out of sync with the client's iteration state.
            // SERVER-7952
            // TODO Temporary code, see SERVER-4563 for a cleanup overview.
            CursorManager::killCursorGlobal(opCtx, cursorid);
        }

        BSONObjBuilder err;
        err.append("$err", e.reason());
        err.append("code", e.code());
        BSONObj errObj = err.obj();

        curop.debug().errInfo = e.toStatus();

        dbresponse = replyToQuery(errObj, ResultFlag_ErrSet);
        curop.debug().responseLength = dbresponse.response.header().dataLen();
        curop.debug().nreturned = 1;
        *shouldLogOpDebug = true;
        return dbresponse;
    }

    curop.debug().responseLength = dbresponse.response.header().dataLen();
    auto queryResult = QueryResult::ConstView(dbresponse.response.buf());
    curop.debug().nreturned = queryResult.getNReturned();

    if (exhaust) {
        curop.debug().exhaust = true;
        dbresponse.exhaustNS = ns;
    }

    return dbresponse;
}

}  // namespace

// Returns a copy of 'cmdObj' with sensitive fields redacted per the command's own policy,
// suitable for logging.
BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* command,
                                                           const BSONObj& cmdObj) {
    mutablebson::Document cmdToLog(cmdObj, mutablebson::Document::kInPlaceDisabled);
    command->redactForLogging(&cmdToLog);
    BSONObjBuilder bob;
    cmdToLog.writeTo(&bob);
    return bob.obj();
}

// NOTE(review): handleRequest continues beyond the end of this chunk; only the visible
// prefix is reproduced here.
DbResponse ServiceEntryPointCommon::handleRequest(OperationContext* opCtx,
                                                  const Message& m,
                                                  const Hooks& behaviors) {
    // before we lock...
    NetworkOp op = m.operation();
    bool isCommand = false;

    DbMessage dbmsg(m);

    Client& c = *opCtx->getClient();
    if (c.isInDirectClient()) {
        invariant(!opCtx->lockState()->inAWriteUnitOfWork());
    } else {
        LastError::get(c).startRequest();
        AuthorizationSession::get(c)->startRequest(opCtx);

        // We should not be holding any locks at this point
        invariant(!opCtx->lockState()->isLocked());
    }

    const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;
    const NamespaceString nsString = ns ?
NamespaceString(ns) : NamespaceString(); if (op == dbQuery) { if (nsString.isCommand()) { isCommand = true; } } else if (op == dbCommand || op == dbMsg) { isCommand = true; } CurOp& currentOp = *CurOp::get(opCtx); { stdx::lock_guard lk(*opCtx->getClient()); // Commands handling code will reset this if the operation is a command // which is logically a basic CRUD operation like query, insert, etc. currentOp.setNetworkOp_inlock(op); currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op)); } OpDebug& debug = currentOp.debug(); long long logThresholdMs = serverGlobalParams.slowMS; bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); DbResponse dbresponse; if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) { dbresponse = runCommands(opCtx, m, behaviors); } else if (op == dbQuery) { invariant(!isCommand); dbresponse = receivedQuery(opCtx, nsString, c, m); } else if (op == dbGetMore) { dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); } else { // The remaining operations do not return any response. They are fire-and-forget. 
try { if (op == dbKillCursors) { currentOp.ensureStarted(); logThresholdMs = 10; receivedKillCursors(opCtx, m); } else if (op != dbInsert && op != dbUpdate && op != dbDelete) { log() << " operation isn't supported: " << static_cast(op); currentOp.done(); shouldLogOpDebug = true; } else { if (!opCtx->getClient()->isInDirectClient()) { uassert(18663, str::stream() << "legacy writeOps not longer supported for " << "versioned connections, ns: " << nsString.ns() << ", op: " << networkOpToString(op), !ShardedConnectionInfo::get(&c, false)); } if (!nsString.isValid()) { uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false); } else if (op == dbInsert) { receivedInsert(opCtx, nsString, m); } else if (op == dbUpdate) { receivedUpdate(opCtx, nsString, m); } else if (op == dbDelete) { receivedDelete(opCtx, nsString, m); } else { invariant(false); } } } catch (const AssertionException& ue) { LastError::get(c).setLastError(ue.code(), ue.reason()); LOG(3) << " Caught Assertion in " << networkOpToString(op) << ", continuing " << redact(ue); debug.errInfo = ue.toStatus(); } } currentOp.ensureStarted(); currentOp.done(); debug.executionTimeMicros = durationCount(currentOp.elapsedTimeExcludingPauses()); Top::get(opCtx->getServiceContext()) .incrementGlobalLatencyStats( opCtx, durationCount(currentOp.elapsedTimeExcludingPauses()), currentOp.getReadWriteType()); const bool shouldSample = serverGlobalParams.sampleRate == 1.0 ? 
true : c.getPrng().nextCanonicalDouble() < serverGlobalParams.sampleRate; if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) { Locker::LockerInfo lockerInfo; opCtx->lockState()->getLockerInfo(&lockerInfo); log() << debug.report(&c, currentOp, lockerInfo.stats); } if (currentOp.shouldDBProfile(shouldSample)) { // Performance profiling is on if (opCtx->lockState()->isReadLocked()) { LOG(1) << "note: not profiling because recursive read lock"; } else if (behaviors.lockedForWriting()) { // TODO SERVER-26825: Fix race condition where fsyncLock is acquired post // lockedForWriting() call but prior to profile collection lock acquisition. LOG(1) << "note: not profiling because doing fsync+lock"; } else if (storageGlobalParams.readOnly) { LOG(1) << "note: not profiling because server is read-only"; } else { profile(opCtx, op); } } recordCurOpMetrics(opCtx); return dbresponse; } ServiceEntryPointCommon::Hooks::~Hooks() = default; } // namespace mongo