diff options
author | Billy Donahue <billy.donahue@mongodb.com> | 2018-02-08 15:34:13 -0500 |
---|---|---|
committer | Billy Donahue <billy.donahue@mongodb.com> | 2018-02-14 12:49:59 -0500 |
commit | 87a217c70c86a9cd1a3ff1622caefc147b110144 (patch) | |
tree | 591c1ed3f69c1d7557e2493febc9d6f8cf4d50cb | |
parent | 8865f48da84e0864ddd21f64f067bce5868345c2 (diff) | |
download | mongo-87a217c70c86a9cd1a3ff1622caefc147b110144.tar.gz |
SERVER-33214 ServiceEntryPointCommon
common base for service_entry_point_embedded and
service_entry_point_mongod.
Use the deps of service_entry_point_embedded as
a starting point for the deps of service_entry_point_common.
-rw-r--r-- | src/mongo/client/embedded/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/client/embedded/service_entry_point_embedded.cpp | 1059 | ||||
-rw-r--r-- | src/mongo/client/embedded/service_entry_point_embedded.h | 3 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 21 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 1125 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.h | 89 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_mongod.cpp | 1152 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_mongod.h | 3 |
8 files changed, 1318 insertions, 2135 deletions
diff --git a/src/mongo/client/embedded/SConscript b/src/mongo/client/embedded/SConscript index 516fea02977..8c862f1ed2e 100644 --- a/src/mongo/client/embedded/SConscript +++ b/src/mongo/client/embedded/SConscript @@ -19,6 +19,7 @@ env.Library( '$BUILD_DIR/mongo/db/command_can_run_here', '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/rw_concern_d', + '$BUILD_DIR/mongo/db/service_entry_point_common', '$BUILD_DIR/mongo/db/s/sharding', '$BUILD_DIR/mongo/db/storage/storage_engine_lock_file', '$BUILD_DIR/mongo/db/storage/storage_engine_metadata', diff --git a/src/mongo/client/embedded/service_entry_point_embedded.cpp b/src/mongo/client/embedded/service_entry_point_embedded.cpp index 817ed5a3008..1cbbd48b080 100644 --- a/src/mongo/client/embedded/service_entry_point_embedded.cpp +++ b/src/mongo/client/embedded/service_entry_point_embedded.cpp @@ -40,7 +40,6 @@ #include "mongo/db/client.h" #include "mongo/db/command_can_run_here.h" #include "mongo/db/commands.h" -#include "mongo/db/commands/fsync.h" #include "mongo/db/concurrency/global_lock_acquisition_tracker.h" #include "mongo/db/curop.h" #include "mongo/db/curop_metrics.h" @@ -65,10 +64,12 @@ #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" +#include "mongo/db/service_entry_point_common.h" #include "mongo/db/session_catalog.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/top.h" #include "mongo/rpc/factory.h" +#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/config_server_metadata.h" #include "mongo/rpc/metadata/logical_time_metadata.h" @@ -87,1054 +88,32 @@ namespace mongo { -MONGO_FP_DECLARE(rsStopGetMore); -MONGO_FP_DECLARE(respondWithNotPrimaryInCommandDispatch); - -namespace { -using logger::LogComponent; - -// The command names for which to check out a session. -// -// Note: Eval should check out a session because it defaults to running under a global write lock, -// so if it didn't, and the function it was given contains any of these whitelisted commands, they -// would try to check out a session under a lock, which is not allowed. Similarly, -// refreshLogicalSessionCacheNow triggers a bulk update under a lock on the sessions collection. -const StringMap<int> cmdWhitelist = {{"delete", 1}, - {"eval", 1}, - {"$eval", 1}, - {"findandmodify", 1}, - {"findAndModify", 1}, - {"insert", 1}, - {"refreshLogicalSessionCacheNow", 1}, - {"update", 1}}; - -BSONObj getRedactedCopyForLogging(const Command* command, const BSONObj& cmdObj) { - mutablebson::Document cmdToLog(cmdObj, mutablebson::Document::kInPlaceDisabled); - command->redactForLogging(&cmdToLog); - BSONObjBuilder bob; - cmdToLog.writeTo(&bob); - return bob.obj(); -} - -void generateLegacyQueryErrorResponse(const AssertionException* exception, - const QueryMessage& queryMessage, - CurOp* curop, - Message* response) { - curop->debug().errInfo = exception->toStatus(); - - log(LogComponent::kQuery) << "assertion " << exception->toString() << " ns:" << queryMessage.ns - << " query:" << (queryMessage.query.valid(BSONVersion::kLatest) - ? queryMessage.query.toString() - : "query object is corrupt"); - if (queryMessage.ntoskip || queryMessage.ntoreturn) { - log(LogComponent::kQuery) << " ntoskip:" << queryMessage.ntoskip - << " ntoreturn:" << queryMessage.ntoreturn; - } - - auto scex = exception->extraInfo<StaleConfigInfo>(); - - BSONObjBuilder err; - err.append("$err", exception->reason()); - err.append("code", exception->code()); - if (scex) { - err.append("ok", 0.0); - err.append("ns", scex->getns()); - scex->getVersionReceived().addToBSON(err, "vReceived"); - scex->getVersionWanted().addToBSON(err, "vWanted"); - } - BSONObj errObj = err.done(); - - if (scex) { - log(LogComponent::kQuery) << "stale version detected during query over " << queryMessage.ns - << " : " << errObj; - } - - BufBuilder bb; - bb.skip(sizeof(QueryResult::Value)); - bb.appendBuf((void*)errObj.objdata(), errObj.objsize()); - - // TODO: call replyToQuery() from here instead of this!!! see dbmessage.h - QueryResult::View msgdata = bb.buf(); - QueryResult::View qr = msgdata; - qr.setResultFlags(ResultFlag_ErrSet); - if (scex) - qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale); - qr.msgdata().setLen(bb.len()); - qr.msgdata().setOperation(opReply); - qr.setCursorId(0); - qr.setStartingFrom(0); - qr.setNReturned(1); - response->setData(bb.release()); -} - -void registerError(OperationContext* opCtx, const DBException& exception) { - LastError::get(opCtx->getClient()).setLastError(exception.code(), exception.reason()); - CurOp::get(opCtx)->debug().errInfo = exception.toStatus(); -} - -void generateErrorResponse(OperationContext* opCtx, - rpc::ReplyBuilderInterface* replyBuilder, - const DBException& exception, - const BSONObj& replyMetadata) { - registerError(opCtx, exception); - - // We could have thrown an exception after setting fields in the builder, - // so we need to reset it to a clean state just to be sure. - replyBuilder->reset(); - replyBuilder->setCommandReply(exception.toStatus()); - replyBuilder->setMetadata(replyMetadata); -} - -void generateErrorResponse(OperationContext* opCtx, - rpc::ReplyBuilderInterface* replyBuilder, - const DBException& exception, - const BSONObj& replyMetadata, - LogicalTime operationTime) { - registerError(opCtx, exception); - - // We could have thrown an exception after setting fields in the builder, - // so we need to reset it to a clean state just to be sure. - replyBuilder->reset(); - replyBuilder->setCommandReply(exception.toStatus(), - BSON("operationTime" << operationTime.asTimestamp())); - replyBuilder->setMetadata(replyMetadata); -} - -/** - * Guard object for making a good-faith effort to enter maintenance mode and leave it when it - * goes out of scope. - * - * Sometimes we cannot set maintenance mode, in which case the call to setMaintenanceMode will - * return a non-OK status. This class does not treat that case as an error which means that - * anybody using it is assuming it is ok to continue execution without maintenance mode. - * - * TODO: This assumption needs to be audited and documented, or this behavior should be moved - * elsewhere. - */ -class MaintenanceModeSetter { - MONGO_DISALLOW_COPYING(MaintenanceModeSetter); - +class ServiceEntryPointEmbedded::Hooks : public ServiceEntryPointCommon::Hooks { public: - MaintenanceModeSetter(OperationContext* opCtx) - : _opCtx(opCtx), - _maintenanceModeSet( - repl::ReplicationCoordinator::get(_opCtx)->setMaintenanceMode(true).isOK()) {} - - ~MaintenanceModeSetter() { - if (_maintenanceModeSet) { - repl::ReplicationCoordinator::get(_opCtx) - ->setMaintenanceMode(false) - .transitional_ignore(); - } - } - -private: - OperationContext* const _opCtx; - const bool _maintenanceModeSet; -}; - -// Called from the error contexts where request may not be available. -// It only attaches clusterTime and operationTime. -void appendReplyMetadataOnError(OperationContext* opCtx, BSONObjBuilder* metadataBob) { - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - const bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - - if (isReplSet) { - if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { - // No need to sign cluster times for internal clients. - SignedLogicalTime currentTime( - LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); - } else if (auto validator = LogicalTimeValidator::get(opCtx)) { - auto currentTime = - validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); - } - } -} - -void appendReplyMetadata(OperationContext* opCtx, - const OpMsgRequest& request, - BSONObjBuilder* metadataBob) { - const bool isShardingAware = ShardingState::get(opCtx)->enabled(); - const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer; - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - const bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - - if (isReplSet) { - // Attach our own last opTime. - repl::OpTime lastOpTimeFromClient = - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - replCoord->prepareReplMetadata(request.body, lastOpTimeFromClient, metadataBob); - // For commands from mongos, append some info to help getLastError(w) work. - // TODO: refactor out of here as part of SERVER-18236 - if (isShardingAware || isConfig) { - rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId()) - .writeToMetadata(metadataBob) - .transitional_ignore(); - } - if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { - // No need to sign cluster times for internal clients. - SignedLogicalTime currentTime( - LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); - } else if (auto validator = LogicalTimeValidator::get(opCtx)) { - auto currentTime = - validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); - } - } - - // If we're a shard other than the config shard, attach the last configOpTime we know about. - if (isShardingAware && !isConfig) { - auto opTime = grid.configOpTime(); - rpc::ConfigServerMetadata(opTime).writeToMetadata(metadataBob); - } -} - -/** - * Given the specified command, returns an effective read concern which should be used or an error - * if the read concern is not valid for the command. - */ -StatusWith<repl::ReadConcernArgs> _extractReadConcern(const Command* command, - const std::string& dbName, - const BSONObj& cmdObj) { - repl::ReadConcernArgs readConcernArgs; - - auto readConcernParseStatus = readConcernArgs.initialize(cmdObj); - if (!readConcernParseStatus.isOK()) { - return readConcernParseStatus; - } - - if (!command->supportsReadConcern(dbName, cmdObj, readConcernArgs.getLevel())) { - return {ErrorCodes::InvalidOptions, - str::stream() << "Command does not support read concern " - << readConcernArgs.toString()}; - } - - return readConcernArgs; -} - -void _waitForWriteConcernAndAddToCommandResponse(OperationContext* opCtx, - const std::string& commandName, - const repl::OpTime& lastOpBeforeRun, - BSONObjBuilder* commandResponseBuilder) { - auto lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - - // Ensures that if we tried to do a write, we wait for write concern, even if that write was - // a noop. - if ((lastOpAfterRun == lastOpBeforeRun) && - GlobalLockAcquisitionTracker::get(opCtx).getGlobalExclusiveLockTaken()) { - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); - lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - } - - WriteConcernResult res; - auto waitForWCStatus = - waitForWriteConcern(opCtx, lastOpAfterRun, opCtx->getWriteConcern(), &res); - CommandHelpers::appendCommandWCStatus(*commandResponseBuilder, waitForWCStatus, res); - - // SERVER-22421: This code is to ensure error response backwards compatibility with the - // user management commands. This can be removed in 3.6. - if (!waitForWCStatus.isOK() && CommandHelpers::isUserManagementCommand(commandName)) { - BSONObj temp = commandResponseBuilder->asTempObj().copy(); - commandResponseBuilder->resetToEmpty(); - CommandHelpers::appendCommandStatus(*commandResponseBuilder, waitForWCStatus); - commandResponseBuilder->appendElementsUnique(temp); - } -} - -/** - * For replica set members it returns the last known op time from opCtx. Otherwise will return - * uninitialized cluster time. - */ -LogicalTime getClientOperationTime(OperationContext* opCtx) { - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - const bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - LogicalTime operationTime; - if (isReplSet) { - operationTime = LogicalTime( - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp()); - } - return operationTime; -} - -/** - * Returns the proper operationTime for a command. To construct the operationTime for replica set - * members, it uses the last optime in the oplog for writes, last committed optime for majority - * reads, and the last applied optime for every other read. An uninitialized cluster time is - * returned for non replica set members. - */ -LogicalTime computeOperationTime(OperationContext* opCtx, - LogicalTime startOperationTime, - repl::ReadConcernLevel level) { - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - const bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - - if (!isReplSet) { - return LogicalTime(); - } - - auto operationTime = getClientOperationTime(opCtx); - invariant(operationTime >= startOperationTime); - - // If the last operationTime has not changed, consider this command a read, and, for replica set - // members, construct the operationTime with the proper optime for its read concern level. - if (operationTime == startOperationTime) { - if (level == repl::ReadConcernLevel::kMajorityReadConcern) { - operationTime = LogicalTime(replCoord->getLastCommittedOpTime().getTimestamp()); - } else { - operationTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp()); - } - } - - return operationTime; -} - -bool runCommandImpl(OperationContext* opCtx, - Command* command, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* replyBuilder, - LogicalTime startOperationTime) { - auto bytesToReserve = command->reserveBytesForReply(); - -// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the -// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency -// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds. -#ifdef _WIN32 - if (kDebugBuild) - bytesToReserve = 0; -#endif - - // run expects non-const bsonobj - BSONObj cmd = request.body; - - // run expects const db std::string (can't bind to temporary) - const std::string db = request.getDatabase().toString(); - - BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve); - - bool result; - if (!command->supportsWriteConcern(cmd)) { - result = command->publicRun(opCtx, request, inPlaceReplyBob); - } else { - auto wcResult = extractWriteConcern(opCtx, cmd, db); - if (!wcResult.isOK()) { - auto result = - CommandHelpers::appendCommandStatus(inPlaceReplyBob, wcResult.getStatus()); - inPlaceReplyBob.doneFast(); - BSONObjBuilder metadataBob; - appendReplyMetadataOnError(opCtx, &metadataBob); - replyBuilder->setMetadata(metadataBob.done()); - return result; - } - - auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - - // Change the write concern while running the command. - const auto oldWC = opCtx->getWriteConcern(); - ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); - opCtx->setWriteConcern(wcResult.getValue()); - ON_BLOCK_EXIT([&] { - _waitForWriteConcernAndAddToCommandResponse( - opCtx, command->getName(), lastOpBeforeRun, &inPlaceReplyBob); - }); - - result = command->publicRun(opCtx, request, inPlaceReplyBob); - - // Nothing in run() should change the writeConcern. - dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() == - wcResult.getValue().toBSON())); - } - - CommandHelpers::appendCommandStatus(inPlaceReplyBob, result); - - auto operationTime = computeOperationTime( - opCtx, startOperationTime, repl::ReadConcernArgs::get(opCtx).getLevel()); - - // An uninitialized operation time means the cluster time is not propagated, so the operation - // time should not be attached to the response. - if (operationTime != LogicalTime::kUninitialized) { - operationTime.appendAsOperationTime(&inPlaceReplyBob); - } - - inPlaceReplyBob.doneFast(); - - BSONObjBuilder metadataBob; - appendReplyMetadata(opCtx, request, &metadataBob); - replyBuilder->setMetadata(metadataBob.done()); - - return result; -} - -// When active, we won't check if we are master in command dispatch. Activate this if you want to -// test failing during command execution. -MONGO_FP_DECLARE(skipCheckingForNotMasterInCommandDispatch); - -/** - * Executes a command after stripping metadata, performing authorization checks, - * handling audit impersonation, and (potentially) setting maintenance mode. This method - * also checks that the command is permissible to run on the node given its current - * replication state. All the logic here is independent of any particular command; any - * functionality relevant to a specific command should be confined to its run() method. - */ -void execCommandDatabase(OperationContext* opCtx, - Command* command, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* replyBuilder) { - - auto startOperationTime = getClientOperationTime(opCtx); - try { - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setCommand_inlock(command); - } - - // TODO: move this back to runCommands when mongos supports OperationContext - // see SERVER-18515 for details. - rpc::readRequestMetadata(opCtx, request.body); - rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName()); - - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - initializeOperationSessionInfo( - opCtx, - request.body, - command->requiresAuth(), - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet, - opCtx->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()); - - const auto dbname = request.getDatabase().toString(); - uassert( - ErrorCodes::InvalidNamespace, - str::stream() << "Invalid database name: '" << dbname << "'", - NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); - - std::unique_ptr<MaintenanceModeSetter> mmSetter; - - BSONElement cmdOptionMaxTimeMSField; - BSONElement allowImplicitCollectionCreationField; - BSONElement helpField; - BSONElement shardVersionFieldIdx; - BSONElement queryOptionMaxTimeMSField; - - StringMap<int> topLevelFields; - for (auto&& element : request.body) { - StringData fieldName = element.fieldNameStringData(); - if (fieldName == QueryRequest::cmdOptionMaxTimeMS) { - cmdOptionMaxTimeMSField = element; - } else if (fieldName == "allowImplicitCollectionCreation") { - allowImplicitCollectionCreationField = element; - } else if (fieldName == CommandHelpers::kHelpFieldName) { - helpField = element; - } else if (fieldName == ChunkVersion::kShardVersionField) { - shardVersionFieldIdx = element; - } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) { - queryOptionMaxTimeMSField = element; - } - - uassert(ErrorCodes::FailedToParse, - str::stream() << "Parsed command object contains duplicate top level key: " - << fieldName, - topLevelFields[fieldName]++ == 0); - } - - if (CommandHelpers::isHelpRequest(helpField)) { - CurOp::get(opCtx)->ensureStarted(); - // We disable last-error for help requests due to SERVER-11492, because config servers - // use help requests to determine which commands are database writes, and so must be - // forwarded to all config servers. - LastError::get(opCtx->getClient()).disable(); - Command::generateHelpResponse(opCtx, replyBuilder, *command); - return; - } - - // Session ids are forwarded in requests, so commands that require roundtrips between - // servers may result in a deadlock when a server tries to check out a session it is already - // using to service an earlier operation in the command's chain. To avoid this, only check - // out sessions for commands that require them (i.e. write commands). - OperationContextSession sessionTxnState( - opCtx, cmdWhitelist.find(command->getName()) != cmdWhitelist.cend()); - - ImpersonationSessionGuard guard(opCtx); - uassertStatusOK(Command::checkAuthorization(command, opCtx, request)); - - const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); - - if (!opCtx->getClient()->isInDirectClient() && - !MONGO_FAIL_POINT(skipCheckingForNotMasterInCommandDispatch)) { - auto allowed = command->secondaryAllowed(opCtx->getServiceContext()); - bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways; - bool couldHaveOptedIn = allowed == Command::AllowedOnSecondary::kOptIn; - bool optedIn = - couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary(); - bool canRunHere = commandCanRunHere(opCtx, dbname, command); - if (!canRunHere && couldHaveOptedIn) { - uasserted(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false"); - } - - if (MONGO_FAIL_POINT(respondWithNotPrimaryInCommandDispatch)) { - uassert(ErrorCodes::NotMaster, "not primary", canRunHere); - } else { - uassert(ErrorCodes::NotMaster, "not master", canRunHere); - } - - if (!command->maintenanceOk() && - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) && - !replCoord->getMemberState().secondary()) { - - uassert(ErrorCodes::NotMasterOrSecondary, - "node is recovering", - !replCoord->getMemberState().recovering()); - uassert(ErrorCodes::NotMasterOrSecondary, - "node is not in primary or recovering state", - replCoord->getMemberState().primary()); - // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode - uassert(ErrorCodes::NotMasterOrSecondary, - "node is in drain mode", - optedIn || alwaysAllowed); - } - } - - if (command->adminOnly()) { - LOG(2) << "command: " << request.getCommandName(); - } - - if (command->maintenanceMode()) { - mmSetter.reset(new MaintenanceModeSetter(opCtx)); - } - - if (command->shouldAffectCommandCounter()) { - OpCounters* opCounters = &globalOpCounters; - opCounters->gotCommand(); - } - - // Handle command option maxTimeMS. - int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField)); - - uassert(ErrorCodes::InvalidOptions, - "no such command option $maxTimeMs; use maxTimeMS instead", - queryOptionMaxTimeMSField.eoo()); - - if (maxTimeMS > 0) { - uassert(50673, - "Illegal attempt to set operation deadline within DBDirectClient", - !opCtx->getClient()->isInDirectClient()); - opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}); - } - - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - readConcernArgs = uassertStatusOK(_extractReadConcern(command, dbname, request.body)); - - auto& oss = OperationShardingState::get(opCtx); - - if (!opCtx->getClient()->isInDirectClient() && - readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && - (iAmPrimary || - (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { - oss.initializeShardVersion(NamespaceString(command->parseNs(dbname, request.body)), - shardVersionFieldIdx); - - auto const shardingState = ShardingState::get(opCtx); - if (oss.hasShardVersion()) { - uassertStatusOK(shardingState->canAcceptShardedCommands()); - } - - // Handle config optime information that may have been sent along with the command. - uassertStatusOK(shardingState->updateConfigServerOpTimeFromMetadata(opCtx)); - } - - oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField); - - // Can throw - opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. - - bool retval = false; - - CurOp::get(opCtx)->ensureStarted(); - - command->incrementCommandsExecuted(); - - if (logger::globalLogDomain()->shouldLog(logger::LogComponent::kTracking, - logger::LogSeverity::Debug(1)) && - rpc::TrackingMetadata::get(opCtx).getParentOperId()) { - MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking) - << rpc::TrackingMetadata::get(opCtx).toString(); - rpc::TrackingMetadata::get(opCtx).setIsLogged(true); - } - - retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime); - - if (!retval) { - command->incrementCommandsFailed(); - } - } catch (const DBException& e) { - // If we got a stale config, wait in case the operation is stuck in a critical section - if (auto sce = e.extraInfo<StaleConfigInfo>()) { - if (!opCtx->getClient()->isInDirectClient()) { - ShardingState::get(opCtx) - ->onStaleShardVersion( - opCtx, NamespaceString(sce->getns()), sce->getVersionReceived()) - .transitional_ignore(); - } - } - - BSONObjBuilder metadataBob; - appendReplyMetadata(opCtx, request, &metadataBob); - - // Note: the read concern may not have been successfully or yet placed on the opCtx, so - // parsing it separately here. - const std::string db = request.getDatabase().toString(); - auto readConcernArgsStatus = _extractReadConcern(command, db, request.body); - auto operationTime = readConcernArgsStatus.isOK() - ? computeOperationTime( - opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel()) - : LogicalClock::get(opCtx)->getClusterTime(); - - // An uninitialized operation time means the cluster time is not propagated, so the - // operation time should not be attached to the error response. - if (operationTime != LogicalTime::kUninitialized) { - LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " - << "on database '" << request.getDatabase() << "' " - << "with arguments '" << getRedactedCopyForLogging(command, request.body) - << "' and operationTime '" << operationTime.toString() << "': " << e.toString(); - - generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), operationTime); - } else { - LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " - << "on database '" << request.getDatabase() << "' " - << "with arguments '" << getRedactedCopyForLogging(command, request.body) - << "': " << e.toString(); - - generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj()); - } + bool lockedForWriting() const override { + return false; } -} -/** - * Fills out CurOp / OpDebug with basic command info. - */ -void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) { - auto curop = CurOp::get(opCtx); - curop->debug().iscommand = true; - - // We construct a legacy $cmd namespace so we can fill in curOp using - // the existing logic that existed for OP_QUERY commands - NamespaceString nss(request.getDatabase(), "$cmd"); + void waitForReadConcern(OperationContext*, + const Command*, + const std::string&, + const OpMsgRequest&, + const BSONObj&) const override {} - stdx::lock_guard<Client> lk(*opCtx->getClient()); - curop->setOpDescription_inlock(request.body); - curop->markCommand_inlock(); - curop->setNS_inlock(nss.ns()); -} - -DbResponse runCommands(OperationContext* opCtx, const Message& message) { - auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message)); - [&] { - OpMsgRequest request; - try { // Parse. - request = rpc::opMsgRequestFromAnyProtocol(message); - } catch (const DBException& ex) { - // If this error needs to fail the connection, propagate it out. - if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) - throw; - - auto operationTime = LogicalClock::get(opCtx)->getClusterTime(); - BSONObjBuilder metadataBob; - appendReplyMetadataOnError(opCtx, &metadataBob); - // Otherwise, reply with the parse error. This is useful for cases where parsing fails - // due to user-supplied input, such as the document too deep error. Since we failed - // during parsing, we can't log anything about the command. - LOG(1) << "assertion while parsing command: " << ex.toString(); - generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime); - - return; // From lambda. Don't try executing if parsing failed. - } - - try { // Execute. - curOpCommandSetup(opCtx, request); - - Command* c = nullptr; - // In the absence of a Command object, no redaction is possible. Therefore - // to avoid displaying potentially sensitive information in the logs, - // we restrict the log message to the name of the unrecognized command. - // However, the complete command object will still be echoed to the client. - if (!(c = CommandHelpers::findCommand(request.getCommandName()))) { - globalCommandRegistry()->incrementUnknownCommands(); - std::string msg = str::stream() << "no such command: '" << request.getCommandName() - << "'"; - LOG(2) << msg; - uasserted(ErrorCodes::CommandNotFound, - str::stream() << msg << ", bad cmd: '" << redact(request.body) << "'"); - } - - LOG(2) << "run command " << request.getDatabase() << ".$cmd" << ' ' - << getRedactedCopyForLogging(c, request.body); - - { - // Try to set this as early as possible, as soon as we have figured out the command. - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp()); - } - - execCommandDatabase(opCtx, c, request, replyBuilder.get()); - } catch (const DBException& ex) { - BSONObjBuilder metadataBob; - appendReplyMetadataOnError(opCtx, &metadataBob); - auto operationTime = LogicalClock::get(opCtx)->getClusterTime(); - LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " - << "on database '" << request.getDatabase() << "': " << ex.toString(); - - generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime); - } - }(); - - if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) { - // Close the connection to get client to go through server selection again. - uassert(ErrorCodes::NotMaster, - "Not-master error during fire-and-forget command processing", - !LastError::get(opCtx->getClient()).hadNotMasterError()); - - return {}; // Don't reply. - } - - auto response = replyBuilder->done(); - CurOp::get(opCtx)->debug().responseLength = response.header().dataLen(); - - // TODO exhaust - return DbResponse{std::move(response)}; -} - -DbResponse receivedQuery(OperationContext* opCtx, - const NamespaceString& nss, - Client& c, - const Message& m) { - invariant(!nss.isCommand()); - globalOpCounters.gotQuery(); - - DbMessage d(m); - QueryMessage q(d); - - CurOp& op = *CurOp::get(opCtx); - DbResponse dbResponse; - - try { - Client* client = opCtx->getClient(); - Status status = AuthorizationSession::get(client)->checkAuthForFind(nss, false); - audit::logQueryAuthzCheck(client, nss, q.query, status.code()); - uassertStatusOK(status); - - dbResponse.exhaustNS = runQuery(opCtx, q, nss, dbResponse.response); - } catch (const AssertionException& e) { - // If we got a stale config, wait in case the operation is stuck in a critical section - if (auto sce = e.extraInfo<StaleConfigInfo>()) { - if (!opCtx->getClient()->isInDirectClient()) { - ShardingState::get(opCtx) - ->onStaleShardVersion( - opCtx, NamespaceString(sce->getns()), sce->getVersionReceived()) - .transitional_ignore(); - } - } - - dbResponse.response.reset(); - generateLegacyQueryErrorResponse(&e, q, &op, &dbResponse.response); - } - - op.debug().responseLength = dbResponse.response.header().dataLen(); - return dbResponse; -} - -void receivedKillCursors(OperationContext* opCtx, const Message& m) { - LastError::get(opCtx->getClient()).disable(); - DbMessage dbmessage(m); - int n = dbmessage.pullInt(); - - uassert(50675, "sent 0 cursors to kill", n != 0); - massert(50674, - str::stream() << "bad kill cursors size: " << m.dataSize(), - m.dataSize() == 8 + (8 * n)); - uassert(50671, str::stream() << "sent negative cursors to kill: " << n, n >= 1); - - if (n > 2000) { - (n < 30000 ? warning() : error()) << "_receivedKillCursors, n=" << n; - verify(n < 30000); - } + void waitForWriteConcern(OperationContext* opCtx, + const std::string& commandName, + const repl::OpTime& lastOpBeforeRun, + BSONObjBuilder* commandResponseBuilder) const override {} - const char* cursorArray = dbmessage.getArray(n); + void waitForLinearizableReadConcern(OperationContext*) const override {} - int found = CursorManager::killCursorGlobalIfAuthorized(opCtx, n, cursorArray); + void uassertCommandDoesNotSpecifyWriteConcern(const BSONObj&) const override {} - if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) { - LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n; - } -} - -void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { - auto insertOp = InsertOp::parseLegacy(m); - invariant(insertOp.getNamespace() == nsString); - - for (const auto& obj : insertOp.getDocuments()) { - Status status = - AuthorizationSession::get(opCtx->getClient())->checkAuthForInsert(opCtx, nsString, obj); - audit::logInsertAuthzCheck(opCtx->getClient(), nsString, obj, status.code()); - uassertStatusOK(status); - } - performInserts(opCtx, insertOp); -} - -void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { - auto updateOp = UpdateOp::parseLegacy(m); - auto& singleUpdate = updateOp.getUpdates()[0]; - invariant(updateOp.getNamespace() == nsString); - - Status status = AuthorizationSession::get(opCtx->getClient()) - ->checkAuthForUpdate(opCtx, - nsString, - singleUpdate.getQ(), - singleUpdate.getU(), - singleUpdate.getUpsert()); - audit::logUpdateAuthzCheck(opCtx->getClient(), - nsString, - singleUpdate.getQ(), - singleUpdate.getU(), - singleUpdate.getUpsert(), - singleUpdate.getMulti(), - status.code()); - uassertStatusOK(status); - - performUpdates(opCtx, updateOp); -} - -void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { - auto deleteOp = DeleteOp::parseLegacy(m); - auto& singleDelete = deleteOp.getDeletes()[0]; - invariant(deleteOp.getNamespace() == nsString); - - Status status = AuthorizationSession::get(opCtx->getClient()) - ->checkAuthForDelete(opCtx, nsString, singleDelete.getQ()); - audit::logDeleteAuthzCheck(opCtx->getClient(), nsString, singleDelete.getQ(), status.code()); - uassertStatusOK(status); - - performDeletes(opCtx, deleteOp); -} - -DbResponse receivedGetMore(OperationContext* opCtx, - const Message& m, - CurOp& curop, - bool* shouldLogOpDebug) { - globalOpCounters.gotGetMore(); - DbMessage d(m); - - const char* ns = d.getns(); - int ntoreturn = d.pullInt(); - uassert( - 50676, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); - long long cursorid = d.pullInt64(); - - curop.debug().ntoreturn = ntoreturn; - curop.debug().cursorid = cursorid; - - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setNS_inlock(ns); - } - - bool exhaust = false; - bool isCursorAuthorized = false; - - DbResponse dbresponse; - try { - const NamespaceString nsString(ns); - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid ns [" << ns << "]", - nsString.isValid()); - - Status status = AuthorizationSession::get(opCtx->getClient()) - ->checkAuthForGetMore(nsString, cursorid, false); - audit::logGetMoreAuthzCheck(opCtx->getClient(), nsString, cursorid, status.code()); - uassertStatusOK(status); - - while (MONGO_FAIL_POINT(rsStopGetMore)) { - sleepmillis(0); - } - - dbresponse.response = - getMore(opCtx, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); - } catch (AssertionException& e) { - if (isCursorAuthorized) { - // If a cursor with id 'cursorid' was authorized, it may have been advanced - // before an exception terminated processGetMore. Erase the ClientCursor - // because it may now be out of sync with the client's iteration state. - // SERVER-7952 - // TODO Temporary code, see SERVER-4563 for a cleanup overview. - CursorManager::killCursorGlobal(opCtx, cursorid); - } - - BSONObjBuilder err; - err.append("$err", e.reason()); - err.append("code", e.code()); - BSONObj errObj = err.obj(); - - curop.debug().errInfo = e.toStatus(); - - dbresponse = replyToQuery(errObj, ResultFlag_ErrSet); - curop.debug().responseLength = dbresponse.response.header().dataLen(); - curop.debug().nreturned = 1; - *shouldLogOpDebug = true; - return dbresponse; - } - - curop.debug().responseLength = dbresponse.response.header().dataLen(); - auto queryResult = QueryResult::ConstView(dbresponse.response.buf()); - curop.debug().nreturned = queryResult.getNReturned(); - - if (exhaust) { - curop.debug().exhaust = true; - dbresponse.exhaustNS = ns; - } - - return dbresponse; -} - -} // namespace + void attachCurOpErrInfo(OperationContext*, BSONObjBuilder&) const override {} +}; DbResponse ServiceEntryPointEmbedded::handleRequest(OperationContext* opCtx, const Message& m) { - // before we lock... - NetworkOp op = m.operation(); - bool isCommand = false; - - DbMessage dbmsg(m); - - Client& c = *opCtx->getClient(); - if (c.isInDirectClient()) { - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - } else { - LastError::get(c).startRequest(); - AuthorizationSession::get(c)->startRequest(opCtx); - - // We should not be holding any locks at this point - invariant(!opCtx->lockState()->isLocked()); - } - - const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL; - const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString(); - - if (op == dbQuery) { - if (nsString.isCommand()) { - isCommand = true; - } - } else if (op == dbCommand || op == dbMsg) { - isCommand = true; - } - - CurOp& currentOp = *CurOp::get(opCtx); - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - // Commands handling code will reset this if the operation is a command - // which is logically a basic CRUD operation like query, insert, etc. - currentOp.setNetworkOp_inlock(op); - currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op)); - } - - OpDebug& debug = currentOp.debug(); - - long long logThresholdMs = serverGlobalParams.slowMS; - bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); - - DbResponse dbresponse; - if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) { - dbresponse = runCommands(opCtx, m); - } else if (op == dbQuery) { - invariant(!isCommand); - dbresponse = receivedQuery(opCtx, nsString, c, m); - } else if (op == dbGetMore) { - dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); - } else { - // The remaining operations do not return any response. They are fire-and-forget. - try { - if (op == dbKillCursors) { - currentOp.ensureStarted(); - logThresholdMs = 10; - receivedKillCursors(opCtx, m); - } else if (op != dbInsert && op != dbUpdate && op != dbDelete) { - log() << " operation isn't supported: " << static_cast<int>(op); - currentOp.done(); - shouldLogOpDebug = true; - } else { - if (!opCtx->getClient()->isInDirectClient()) { - uassert(50670, - str::stream() << "legacy writeOps not longer supported for " - << "versioned connections, ns: " - << nsString.ns() - << ", op: " - << networkOpToString(op), - !ShardedConnectionInfo::get(&c, false)); - } - - if (!nsString.isValid()) { - uassert(50672, str::stream() << "Invalid ns [" << ns << "]", false); - } else if (op == dbInsert) { - receivedInsert(opCtx, nsString, m); - } else if (op == dbUpdate) { - receivedUpdate(opCtx, nsString, m); - } else if (op == dbDelete) { - receivedDelete(opCtx, nsString, m); - } else { - invariant(false); - } - } - } catch (const AssertionException& ue) { - LastError::get(c).setLastError(ue.code(), ue.reason()); - LOG(3) << " Caught Assertion in " << networkOpToString(op) << ", continuing " - << redact(ue); - debug.errInfo = ue.toStatus(); - } - } - currentOp.ensureStarted(); - currentOp.done(); - debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()); - - Top::get(opCtx->getServiceContext()) - .incrementGlobalLatencyStats( - opCtx, - durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()), - currentOp.getReadWriteType()); - - const bool shouldSample = serverGlobalParams.sampleRate == 1.0 - ? true - : c.getPrng().nextCanonicalDouble() < serverGlobalParams.sampleRate; - - if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) { - Locker::LockerInfo lockerInfo; - opCtx->lockState()->getLockerInfo(&lockerInfo); - log() << debug.report(&c, currentOp, lockerInfo.stats); - } - - if (currentOp.shouldDBProfile(shouldSample)) { - // Performance profiling is on - if (opCtx->lockState()->isReadLocked()) { - LOG(1) << "note: not profiling because recursive read lock"; - } /*else if (lockedForWriting()) { - // TODO SERVER-26825: Fix race condition where fsyncLock is acquired post - // lockedForWriting() call but prior to profile collection lock acquisition. - LOG(1) << "note: not profiling because doing fsync+lock"; - }*/ else if (storageGlobalParams.readOnly) { - LOG(1) << "note: not profiling because server is read-only"; - } else { - profile(opCtx, op); - } - } - - recordCurOpMetrics(opCtx); - return dbresponse; + return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{}); } } // namespace mongo diff --git a/src/mongo/client/embedded/service_entry_point_embedded.h b/src/mongo/client/embedded/service_entry_point_embedded.h index d44847a5c2d..1f0c7af475f 100644 --- a/src/mongo/client/embedded/service_entry_point_embedded.h +++ b/src/mongo/client/embedded/service_entry_point_embedded.h @@ -42,6 +42,9 @@ class ServiceEntryPointEmbedded final : public ServiceEntryPointImpl { public: using ServiceEntryPointImpl::ServiceEntryPointImpl; DbResponse handleRequest(OperationContext* opCtx, const Message& request) override; + +private: + class Hooks; }; } // namespace mongo diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index add6bdc1be1..80e0345c93a 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -640,6 +640,7 @@ env.Library( LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/commands/fsync_locked', + '$BUILD_DIR/mongo/db/service_entry_point_common', ], ) @@ -658,6 +659,26 @@ env.Library( ) env.Library( + target="service_entry_point_common", + source=[ + "service_entry_point_common.cpp", + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/auth/authcore', + '$BUILD_DIR/mongo/db/auth/authmongod', + '$BUILD_DIR/mongo/db/command_can_run_here', + '$BUILD_DIR/mongo/db/ops/write_ops_exec', + '$BUILD_DIR/mongo/db/rw_concern_d', + '$BUILD_DIR/mongo/db/s/sharding', + '$BUILD_DIR/mongo/db/storage/storage_engine_lock_file', + '$BUILD_DIR/mongo/db/storage/storage_engine_metadata', + ], +) + +env.Library( target="background", source=[ "background.cpp", diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp new file mode 100644 index 00000000000..ae6652e9e63 --- /dev/null +++ b/src/mongo/db/service_entry_point_common.cpp @@ -0,0 +1,1125 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/service_entry_point_common.h" + +#include "mongo/base/checked_cast.h" +#include "mongo/bson/mutable/document.h" +#include "mongo/db/audit.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/auth/impersonation_session.h" +#include "mongo/db/client.h" +#include "mongo/db/command_can_run_here.h" +#include "mongo/db/commands.h" +#include "mongo/db/concurrency/global_lock_acquisition_tracker.h" +#include "mongo/db/curop.h" +#include "mongo/db/curop_metrics.h" +#include "mongo/db/cursor_manager.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/initialize_operation_session_info.h" +#include "mongo/db/introspect.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/lasterror.h" +#include "mongo/db/logical_clock.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/logical_time_validator.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/ops/write_ops_exec.h" +#include "mongo/db/query/find.h" +#include "mongo/db/read_concern.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/db/s/sharding_state.h" +#include "mongo/db/service_entry_point_common.h" +#include "mongo/db/session_catalog.h" +#include "mongo/db/stats/counters.h" +#include "mongo/db/stats/top.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata.h" +#include "mongo/rpc/metadata/config_server_metadata.h" +#include "mongo/rpc/metadata/logical_time_metadata.h" +#include "mongo/rpc/metadata/oplog_query_metadata.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/rpc/metadata/sharding_metadata.h" +#include "mongo/rpc/metadata/tracking_metadata.h" +#include "mongo/rpc/reply_builder_interface.h" +#include "mongo/s/grid.h" +#include "mongo/s/stale_exception.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" +#include "mongo/util/net/message.h" +#include "mongo/util/net/op_msg.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +MONGO_FP_DECLARE(rsStopGetMore); +MONGO_FP_DECLARE(respondWithNotPrimaryInCommandDispatch); +MONGO_FP_DECLARE(skipCheckingForNotMasterInCommandDispatch); + +namespace { +using logger::LogComponent; + +// The command names for which to check out a session. +// +// Note: Eval should check out a session because it defaults to running under a global write lock, +// so if it didn't, and the function it was given contains any of these whitelisted commands, they +// would try to check out a session under a lock, which is not allowed. Similarly, +// refreshLogicalSessionCacheNow triggers a bulk update under a lock on the sessions collection. +const StringMap<int> cmdWhitelist = {{"delete", 1}, + {"eval", 1}, + {"$eval", 1}, + {"findandmodify", 1}, + {"findAndModify", 1}, + {"insert", 1}, + {"refreshLogicalSessionCacheNow", 1}, + {"update", 1}, + {"find", 1}, + {"getMore", 1}}; + +void generateLegacyQueryErrorResponse(const AssertionException* exception, + const QueryMessage& queryMessage, + CurOp* curop, + Message* response) { + curop->debug().errInfo = exception->toStatus(); + + log(LogComponent::kQuery) << "assertion " << exception->toString() << " ns:" << queryMessage.ns + << " query:" << (queryMessage.query.valid(BSONVersion::kLatest) + ? queryMessage.query.toString() + : "query object is corrupt"); + if (queryMessage.ntoskip || queryMessage.ntoreturn) { + log(LogComponent::kQuery) << " ntoskip:" << queryMessage.ntoskip + << " ntoreturn:" << queryMessage.ntoreturn; + } + + auto scex = exception->extraInfo<StaleConfigInfo>(); + + BSONObjBuilder err; + err.append("$err", exception->reason()); + err.append("code", exception->code()); + if (scex) { + err.append("ok", 0.0); + err.append("ns", scex->getns()); + scex->getVersionReceived().addToBSON(err, "vReceived"); + scex->getVersionWanted().addToBSON(err, "vWanted"); + } + BSONObj errObj = err.done(); + + if (scex) { + log(LogComponent::kQuery) << "stale version detected during query over " << queryMessage.ns + << " : " << errObj; + } + + BufBuilder bb; + bb.skip(sizeof(QueryResult::Value)); + bb.appendBuf((void*)errObj.objdata(), errObj.objsize()); + + // TODO: call replyToQuery() from here instead of this!!! see dbmessage.h + QueryResult::View msgdata = bb.buf(); + QueryResult::View qr = msgdata; + qr.setResultFlags(ResultFlag_ErrSet); + if (scex) + qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale); + qr.msgdata().setLen(bb.len()); + qr.msgdata().setOperation(opReply); + qr.setCursorId(0); + qr.setStartingFrom(0); + qr.setNReturned(1); + response->setData(bb.release()); +} + +void registerError(OperationContext* opCtx, const DBException& exception) { + LastError::get(opCtx->getClient()).setLastError(exception.code(), exception.reason()); + CurOp::get(opCtx)->debug().errInfo = exception.toStatus(); +} + +void generateErrorResponse(OperationContext* opCtx, + rpc::ReplyBuilderInterface* replyBuilder, + const DBException& exception, + const BSONObj& replyMetadata) { + registerError(opCtx, exception); + + // We could have thrown an exception after setting fields in the builder, + // so we need to reset it to a clean state just to be sure. + replyBuilder->reset(); + replyBuilder->setCommandReply(exception.toStatus()); + replyBuilder->setMetadata(replyMetadata); +} + +void generateErrorResponse(OperationContext* opCtx, + rpc::ReplyBuilderInterface* replyBuilder, + const DBException& exception, + const BSONObj& replyMetadata, + LogicalTime operationTime) { + registerError(opCtx, exception); + + // We could have thrown an exception after setting fields in the builder, + // so we need to reset it to a clean state just to be sure. + replyBuilder->reset(); + replyBuilder->setCommandReply(exception.toStatus(), + BSON("operationTime" << operationTime.asTimestamp())); + replyBuilder->setMetadata(replyMetadata); +} + +/** + * Guard object for making a good-faith effort to enter maintenance mode and leave it when it + * goes out of scope. + * + * Sometimes we cannot set maintenance mode, in which case the call to setMaintenanceMode will + * return a non-OK status. This class does not treat that case as an error which means that + * anybody using it is assuming it is ok to continue execution without maintenance mode. + * + * TODO: This assumption needs to be audited and documented, or this behavior should be moved + * elsewhere. + */ +class MaintenanceModeSetter { + MONGO_DISALLOW_COPYING(MaintenanceModeSetter); + +public: + MaintenanceModeSetter(OperationContext* opCtx) + : _opCtx(opCtx), + _maintenanceModeSet( + repl::ReplicationCoordinator::get(_opCtx)->setMaintenanceMode(true).isOK()) {} + + ~MaintenanceModeSetter() { + if (_maintenanceModeSet) { + repl::ReplicationCoordinator::get(_opCtx) + ->setMaintenanceMode(false) + .transitional_ignore(); + } + } + +private: + OperationContext* const _opCtx; + const bool _maintenanceModeSet; +}; + +// Called from the error contexts where request may not be available. +// It only attaches clusterTime and operationTime. +void appendReplyMetadataOnError(OperationContext* opCtx, BSONObjBuilder* metadataBob) { + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + + if (isReplSet) { + if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { + // No need to sign cluster times for internal clients. + SignedLogicalTime currentTime( + LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); + } else if (auto validator = LogicalTimeValidator::get(opCtx)) { + auto currentTime = + validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); + } + } +} + +void appendReplyMetadata(OperationContext* opCtx, + const OpMsgRequest& request, + BSONObjBuilder* metadataBob) { + const bool isShardingAware = ShardingState::get(opCtx)->enabled(); + const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer; + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + + if (isReplSet) { + // Attach our own last opTime. + repl::OpTime lastOpTimeFromClient = + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + replCoord->prepareReplMetadata(request.body, lastOpTimeFromClient, metadataBob); + // For commands from mongos, append some info to help getLastError(w) work. + // TODO: refactor out of here as part of SERVER-18236 + if (isShardingAware || isConfig) { + rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId()) + .writeToMetadata(metadataBob) + .transitional_ignore(); + } + if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { + // No need to sign cluster times for internal clients. + SignedLogicalTime currentTime( + LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); + } else if (auto validator = LogicalTimeValidator::get(opCtx)) { + auto currentTime = + validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); + rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); + logicalTimeMetadata.writeToMetadata(metadataBob); + } + } + + // If we're a shard other than the config shard, attach the last configOpTime we know about. + if (isShardingAware && !isConfig) { + auto opTime = grid.configOpTime(); + rpc::ConfigServerMetadata(opTime).writeToMetadata(metadataBob); + } +} + +/** + * Given the specified command, returns an effective read concern which should be used or an error + * if the read concern is not valid for the command. + */ +StatusWith<repl::ReadConcernArgs> _extractReadConcern(const Command* command, + const std::string& dbName, + const BSONObj& cmdObj) { + repl::ReadConcernArgs readConcernArgs; + + auto readConcernParseStatus = readConcernArgs.initialize(cmdObj); + if (!readConcernParseStatus.isOK()) { + return readConcernParseStatus; + } + + if (!command->supportsReadConcern(dbName, cmdObj, readConcernArgs.getLevel())) { + return {ErrorCodes::InvalidOptions, + str::stream() << "Command does not support read concern " + << readConcernArgs.toString()}; + } + + return readConcernArgs; +} + +/** + * For replica set members it returns the last known op time from opCtx. Otherwise will return + * uninitialized cluster time. + */ +LogicalTime getClientOperationTime(OperationContext* opCtx) { + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + LogicalTime operationTime; + if (isReplSet) { + operationTime = LogicalTime( + repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp()); + } + return operationTime; +} + +/** + * Returns the proper operationTime for a command. To construct the operationTime for replica set + * members, it uses the last optime in the oplog for writes, last committed optime for majority + * reads, and the last applied optime for every other read. An uninitialized cluster time is + * returned for non replica set members. + */ +LogicalTime computeOperationTime(OperationContext* opCtx, + LogicalTime startOperationTime, + repl::ReadConcernLevel level) { + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + const bool isReplSet = + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + + if (!isReplSet) { + return LogicalTime(); + } + + auto operationTime = getClientOperationTime(opCtx); + invariant(operationTime >= startOperationTime); + + // If the last operationTime has not changed, consider this command a read, and, for replica set + // members, construct the operationTime with the proper optime for its read concern level. + if (operationTime == startOperationTime) { + if (level == repl::ReadConcernLevel::kMajorityReadConcern) { + operationTime = LogicalTime(replCoord->getLastCommittedOpTime().getTimestamp()); + } else { + operationTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp()); + } + } + + return operationTime; +} + +bool runCommandImpl(OperationContext* opCtx, + Command* command, + const OpMsgRequest& request, + rpc::ReplyBuilderInterface* replyBuilder, + LogicalTime startOperationTime, + const ServiceEntryPointCommon::Hooks& behaviors) { + auto bytesToReserve = command->reserveBytesForReply(); + +// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the +// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency +// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds. +#ifdef _WIN32 + if (kDebugBuild) + bytesToReserve = 0; +#endif + + // run expects non-const bsonobj + BSONObj cmd = request.body; + + // run expects const db std::string (can't bind to temporary) + const std::string db = request.getDatabase().toString(); + + BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve); + + behaviors.waitForReadConcern(opCtx, command, db, request, cmd); + + bool result; + if (!command->supportsWriteConcern(cmd)) { + behaviors.uassertCommandDoesNotSpecifyWriteConcern(cmd); + result = command->publicRun(opCtx, request, inPlaceReplyBob); + } else { + auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, cmd, db)); + + auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + + // Change the write concern while running the command. + const auto oldWC = opCtx->getWriteConcern(); + ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); + opCtx->setWriteConcern(wcResult); + ON_BLOCK_EXIT([&] { + behaviors.waitForWriteConcern( + opCtx, command->getName(), lastOpBeforeRun, &inPlaceReplyBob); + }); + + result = command->publicRun(opCtx, request, inPlaceReplyBob); + + // Nothing in run() should change the writeConcern. + dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() == + wcResult.toBSON())); + } + + behaviors.waitForLinearizableReadConcern(opCtx); + + CommandHelpers::appendCommandStatus(inPlaceReplyBob, result); + + behaviors.attachCurOpErrInfo(opCtx, inPlaceReplyBob); + + auto operationTime = computeOperationTime( + opCtx, startOperationTime, repl::ReadConcernArgs::get(opCtx).getLevel()); + + // An uninitialized operation time means the cluster time is not propagated, so the operation + // time should not be attached to the response. + if (operationTime != LogicalTime::kUninitialized) { + operationTime.appendAsOperationTime(&inPlaceReplyBob); + } + + inPlaceReplyBob.doneFast(); + + BSONObjBuilder metadataBob; + appendReplyMetadata(opCtx, request, &metadataBob); + replyBuilder->setMetadata(metadataBob.done()); + + return result; +} + +/** + * Executes a command after stripping metadata, performing authorization checks, + * handling audit impersonation, and (potentially) setting maintenance mode. This method + * also checks that the command is permissible to run on the node given its current + * replication state. All the logic here is independent of any particular command; any + * functionality relevant to a specific command should be confined to its run() method. + */ +void execCommandDatabase(OperationContext* opCtx, + Command* command, + const OpMsgRequest& request, + rpc::ReplyBuilderInterface* replyBuilder, + const ServiceEntryPointCommon::Hooks& behaviors) { + + auto startOperationTime = getClientOperationTime(opCtx); + try { + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setCommand_inlock(command); + } + + // TODO: move this back to runCommands when mongos supports OperationContext + // see SERVER-18515 for details. + rpc::readRequestMetadata(opCtx, request.body); + rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName()); + + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + initializeOperationSessionInfo( + opCtx, + request.body, + command->requiresAuth(), + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet, + opCtx->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()); + + const auto dbname = request.getDatabase().toString(); + uassert( + ErrorCodes::InvalidNamespace, + str::stream() << "Invalid database name: '" << dbname << "'", + NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); + + std::unique_ptr<MaintenanceModeSetter> mmSetter; + + BSONElement cmdOptionMaxTimeMSField; + BSONElement allowImplicitCollectionCreationField; + BSONElement helpField; + BSONElement shardVersionFieldIdx; + BSONElement queryOptionMaxTimeMSField; + + StringMap<int> topLevelFields; + for (auto&& element : request.body) { + StringData fieldName = element.fieldNameStringData(); + if (fieldName == QueryRequest::cmdOptionMaxTimeMS) { + cmdOptionMaxTimeMSField = element; + } else if (fieldName == "allowImplicitCollectionCreation") { + allowImplicitCollectionCreationField = element; + } else if (fieldName == CommandHelpers::kHelpFieldName) { + helpField = element; + } else if (fieldName == ChunkVersion::kShardVersionField) { + shardVersionFieldIdx = element; + } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) { + queryOptionMaxTimeMSField = element; + } + + uassert(ErrorCodes::FailedToParse, + str::stream() << "Parsed command object contains duplicate top level key: " + << fieldName, + topLevelFields[fieldName]++ == 0); + } + + if (CommandHelpers::isHelpRequest(helpField)) { + CurOp::get(opCtx)->ensureStarted(); + // We disable last-error for help requests due to SERVER-11492, because config servers + // use help requests to determine which commands are database writes, and so must be + // forwarded to all config servers. + LastError::get(opCtx->getClient()).disable(); + Command::generateHelpResponse(opCtx, replyBuilder, *command); + return; + } + + // Session ids are forwarded in requests, so commands that require roundtrips between + // servers may result in a deadlock when a server tries to check out a session it is already + // using to service an earlier operation in the command's chain. To avoid this, only check + // out sessions for commands that require them (i.e. write commands). + // Session checkout is also prevented for commands run within DBDirectClient. If checkout is + // required, it is expected to be handled by the outermost command. + const bool shouldCheckoutSession = + cmdWhitelist.find(command->getName()) != cmdWhitelist.cend() && + !opCtx->getClient()->isInDirectClient(); + OperationContextSession sessionTxnState(opCtx, shouldCheckoutSession); + + ImpersonationSessionGuard guard(opCtx); + uassertStatusOK(Command::checkAuthorization(command, opCtx, request)); + + const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); + + if (!opCtx->getClient()->isInDirectClient() && + !MONGO_FAIL_POINT(skipCheckingForNotMasterInCommandDispatch)) { + auto allowed = command->secondaryAllowed(opCtx->getServiceContext()); + bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways; + bool couldHaveOptedIn = allowed == Command::AllowedOnSecondary::kOptIn; + bool optedIn = + couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary(); + bool canRunHere = commandCanRunHere(opCtx, dbname, command); + if (!canRunHere && couldHaveOptedIn) { + uasserted(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false"); + } + + if (MONGO_FAIL_POINT(respondWithNotPrimaryInCommandDispatch)) { + uassert(ErrorCodes::NotMaster, "not primary", canRunHere); + } else { + uassert(ErrorCodes::NotMaster, "not master", canRunHere); + } + + if (!command->maintenanceOk() && + replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && + !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) && + !replCoord->getMemberState().secondary()) { + + uassert(ErrorCodes::NotMasterOrSecondary, + "node is recovering", + !replCoord->getMemberState().recovering()); + uassert(ErrorCodes::NotMasterOrSecondary, + "node is not in primary or recovering state", + replCoord->getMemberState().primary()); + // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode + uassert(ErrorCodes::NotMasterOrSecondary, + "node is in drain mode", + optedIn || alwaysAllowed); + } + } + + if (command->adminOnly()) { + LOG(2) << "command: " << request.getCommandName(); + } + + if (command->maintenanceMode()) { + mmSetter.reset(new MaintenanceModeSetter(opCtx)); + } + + if (command->shouldAffectCommandCounter()) { + OpCounters* opCounters = &globalOpCounters; + opCounters->gotCommand(); + } + + // Handle command option maxTimeMS. + int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField)); + + uassert(ErrorCodes::InvalidOptions, + "no such command option $maxTimeMs; use maxTimeMS instead", + queryOptionMaxTimeMSField.eoo()); + + if (maxTimeMS > 0) { + uassert(40119, + "Illegal attempt to set operation deadline within DBDirectClient", + !opCtx->getClient()->isInDirectClient()); + opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}); + } + + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + readConcernArgs = uassertStatusOK(_extractReadConcern(command, dbname, request.body)); + + auto& oss = OperationShardingState::get(opCtx); + + if (!opCtx->getClient()->isInDirectClient() && + readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && + (iAmPrimary || + (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { + oss.initializeShardVersion(NamespaceString(command->parseNs(dbname, request.body)), + shardVersionFieldIdx); + + auto const shardingState = ShardingState::get(opCtx); + if (oss.hasShardVersion()) { + uassertStatusOK(shardingState->canAcceptShardedCommands()); + } + + // Handle config optime information that may have been sent along with the command. + uassertStatusOK(shardingState->updateConfigServerOpTimeFromMetadata(opCtx)); + } + + oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField); + + // Can throw + opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. + + bool retval = false; + + CurOp::get(opCtx)->ensureStarted(); + + command->incrementCommandsExecuted(); + + if (logger::globalLogDomain()->shouldLog(logger::LogComponent::kTracking, + logger::LogSeverity::Debug(1)) && + rpc::TrackingMetadata::get(opCtx).getParentOperId()) { + MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking) + << rpc::TrackingMetadata::get(opCtx).toString(); + rpc::TrackingMetadata::get(opCtx).setIsLogged(true); + } + + retval = + runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime, behaviors); + + if (!retval) { + command->incrementCommandsFailed(); + } + } catch (const DBException& e) { + // If we got a stale config, wait in case the operation is stuck in a critical section + if (auto sce = e.extraInfo<StaleConfigInfo>()) { + if (!opCtx->getClient()->isInDirectClient()) { + ShardingState::get(opCtx) + ->onStaleShardVersion( + opCtx, NamespaceString(sce->getns()), sce->getVersionReceived()) + .transitional_ignore(); + } + } + + BSONObjBuilder metadataBob; + appendReplyMetadata(opCtx, request, &metadataBob); + + // Note: the read concern may not have been successfully or yet placed on the opCtx, so + // parsing it separately here. + const std::string db = request.getDatabase().toString(); + auto readConcernArgsStatus = _extractReadConcern(command, db, request.body); + auto operationTime = readConcernArgsStatus.isOK() + ? computeOperationTime( + opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel()) + : LogicalClock::get(opCtx)->getClusterTime(); + + // An uninitialized operation time means the cluster time is not propagated, so the + // operation time should not be attached to the error response. + if (operationTime != LogicalTime::kUninitialized) { + LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " + << "on database '" << request.getDatabase() << "' " + << "with arguments '" + << ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body) + << "' and operationTime '" << operationTime.toString() << "': " << e.toString(); + + generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), operationTime); + } else { + LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " + << "on database '" << request.getDatabase() << "' " + << "with arguments '" + << ServiceEntryPointCommon::getRedactedCopyForLogging(command, request.body) + << "': " << e.toString(); + + generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj()); + } + } +} + +/** + * Fills out CurOp / OpDebug with basic command info. + */ +void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) { + auto curop = CurOp::get(opCtx); + curop->debug().iscommand = true; + + // We construct a legacy $cmd namespace so we can fill in curOp using + // the existing logic that existed for OP_QUERY commands + NamespaceString nss(request.getDatabase(), "$cmd"); + + stdx::lock_guard<Client> lk(*opCtx->getClient()); + curop->setOpDescription_inlock(request.body); + curop->markCommand_inlock(); + curop->setNS_inlock(nss.ns()); +} + +DbResponse runCommands(OperationContext* opCtx, + const Message& message, + const ServiceEntryPointCommon::Hooks& behaviors) { + auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message)); + [&] { + OpMsgRequest request; + try { // Parse. + request = rpc::opMsgRequestFromAnyProtocol(message); + } catch (const DBException& ex) { + // If this error needs to fail the connection, propagate it out. + if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) + throw; + + auto operationTime = LogicalClock::get(opCtx)->getClusterTime(); + BSONObjBuilder metadataBob; + appendReplyMetadataOnError(opCtx, &metadataBob); + // Otherwise, reply with the parse error. This is useful for cases where parsing fails + // due to user-supplied input, such as the document too deep error. Since we failed + // during parsing, we can't log anything about the command. + LOG(1) << "assertion while parsing command: " << ex.toString(); + generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime); + + return; // From lambda. Don't try executing if parsing failed. + } + + try { // Execute. + curOpCommandSetup(opCtx, request); + + Command* c = nullptr; + // In the absence of a Command object, no redaction is possible. Therefore + // to avoid displaying potentially sensitive information in the logs, + // we restrict the log message to the name of the unrecognized command. + // However, the complete command object will still be echoed to the client. + if (!(c = CommandHelpers::findCommand(request.getCommandName()))) { + globalCommandRegistry()->incrementUnknownCommands(); + std::string msg = str::stream() << "no such command: '" << request.getCommandName() + << "'"; + LOG(2) << msg; + uasserted(ErrorCodes::CommandNotFound, + str::stream() << msg << ", bad cmd: '" << redact(request.body) << "'"); + } + + LOG(2) << "run command " << request.getDatabase() << ".$cmd" << ' ' + << ServiceEntryPointCommon::getRedactedCopyForLogging(c, request.body); + + { + // Try to set this as early as possible, as soon as we have figured out the command. + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp()); + } + + execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors); + } catch (const DBException& ex) { + BSONObjBuilder metadataBob; + appendReplyMetadataOnError(opCtx, &metadataBob); + auto operationTime = LogicalClock::get(opCtx)->getClusterTime(); + LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " + << "on database '" << request.getDatabase() << "': " << ex.toString(); + + generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime); + } + }(); + + if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) { + // Close the connection to get client to go through server selection again. + uassert(ErrorCodes::NotMaster, + "Not-master error during fire-and-forget command processing", + !LastError::get(opCtx->getClient()).hadNotMasterError()); + + return {}; // Don't reply. + } + + auto response = replyBuilder->done(); + CurOp::get(opCtx)->debug().responseLength = response.header().dataLen(); + + // TODO exhaust + return DbResponse{std::move(response)}; +} + +DbResponse receivedQuery(OperationContext* opCtx, + const NamespaceString& nss, + Client& c, + const Message& m) { + invariant(!nss.isCommand()); + globalOpCounters.gotQuery(); + + DbMessage d(m); + QueryMessage q(d); + + CurOp& op = *CurOp::get(opCtx); + DbResponse dbResponse; + + try { + Client* client = opCtx->getClient(); + Status status = AuthorizationSession::get(client)->checkAuthForFind(nss, false); + audit::logQueryAuthzCheck(client, nss, q.query, status.code()); + uassertStatusOK(status); + + dbResponse.exhaustNS = runQuery(opCtx, q, nss, dbResponse.response); + } catch (const AssertionException& e) { + // If we got a stale config, wait in case the operation is stuck in a critical section + if (auto sce = e.extraInfo<StaleConfigInfo>()) { + if (!opCtx->getClient()->isInDirectClient()) { + ShardingState::get(opCtx) + ->onStaleShardVersion( + opCtx, NamespaceString(sce->getns()), sce->getVersionReceived()) + .transitional_ignore(); + } + } + + dbResponse.response.reset(); + generateLegacyQueryErrorResponse(&e, q, &op, &dbResponse.response); + } + + op.debug().responseLength = dbResponse.response.header().dataLen(); + return dbResponse; +} + +void receivedKillCursors(OperationContext* opCtx, const Message& m) { + LastError::get(opCtx->getClient()).disable(); + DbMessage dbmessage(m); + int n = dbmessage.pullInt(); + + uassert(13659, "sent 0 cursors to kill", n != 0); + massert(13658, + str::stream() << "bad kill cursors size: " << m.dataSize(), + m.dataSize() == 8 + (8 * n)); + uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1); + + if (n > 2000) { + (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n; + verify(n < 30000); + } + + const char* cursorArray = dbmessage.getArray(n); + + int found = CursorManager::killCursorGlobalIfAuthorized(opCtx, n, cursorArray); + + if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) { + LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n; + } +} + +void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { + auto insertOp = InsertOp::parseLegacy(m); + invariant(insertOp.getNamespace() == nsString); + + for (const auto& obj : insertOp.getDocuments()) { + Status status = + AuthorizationSession::get(opCtx->getClient())->checkAuthForInsert(opCtx, nsString, obj); + audit::logInsertAuthzCheck(opCtx->getClient(), nsString, obj, status.code()); + uassertStatusOK(status); + } + performInserts(opCtx, insertOp); +} + +void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { + auto updateOp = UpdateOp::parseLegacy(m); + auto& singleUpdate = updateOp.getUpdates()[0]; + invariant(updateOp.getNamespace() == nsString); + + Status status = AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForUpdate(opCtx, + nsString, + singleUpdate.getQ(), + singleUpdate.getU(), + singleUpdate.getUpsert()); + audit::logUpdateAuthzCheck(opCtx->getClient(), + nsString, + singleUpdate.getQ(), + singleUpdate.getU(), + singleUpdate.getUpsert(), + singleUpdate.getMulti(), + status.code()); + uassertStatusOK(status); + + performUpdates(opCtx, updateOp); +} + +void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { + auto deleteOp = DeleteOp::parseLegacy(m); + auto& singleDelete = deleteOp.getDeletes()[0]; + invariant(deleteOp.getNamespace() == nsString); + + Status status = AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForDelete(opCtx, nsString, singleDelete.getQ()); + audit::logDeleteAuthzCheck(opCtx->getClient(), nsString, singleDelete.getQ(), status.code()); + uassertStatusOK(status); + + performDeletes(opCtx, deleteOp); +} + +DbResponse receivedGetMore(OperationContext* opCtx, + const Message& m, + CurOp& curop, + bool* shouldLogOpDebug) { + globalOpCounters.gotGetMore(); + DbMessage d(m); + + const char* ns = d.getns(); + int ntoreturn = d.pullInt(); + uassert( + 34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); + long long cursorid = d.pullInt64(); + + curop.debug().ntoreturn = ntoreturn; + curop.debug().cursorid = cursorid; + + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + CurOp::get(opCtx)->setNS_inlock(ns); + } + + bool exhaust = false; + bool isCursorAuthorized = false; + + DbResponse dbresponse; + try { + const NamespaceString nsString(ns); + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "Invalid ns [" << ns << "]", + nsString.isValid()); + + Status status = AuthorizationSession::get(opCtx->getClient()) + ->checkAuthForGetMore(nsString, cursorid, false); + audit::logGetMoreAuthzCheck(opCtx->getClient(), nsString, cursorid, status.code()); + uassertStatusOK(status); + + while (MONGO_FAIL_POINT(rsStopGetMore)) { + sleepmillis(0); + } + + dbresponse.response = + getMore(opCtx, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); + } catch (AssertionException& e) { + if (isCursorAuthorized) { + // If a cursor with id 'cursorid' was authorized, it may have been advanced + // before an exception terminated processGetMore. Erase the ClientCursor + // because it may now be out of sync with the client's iteration state. + // SERVER-7952 + // TODO Temporary code, see SERVER-4563 for a cleanup overview. + CursorManager::killCursorGlobal(opCtx, cursorid); + } + + BSONObjBuilder err; + err.append("$err", e.reason()); + err.append("code", e.code()); + BSONObj errObj = err.obj(); + + curop.debug().errInfo = e.toStatus(); + + dbresponse = replyToQuery(errObj, ResultFlag_ErrSet); + curop.debug().responseLength = dbresponse.response.header().dataLen(); + curop.debug().nreturned = 1; + *shouldLogOpDebug = true; + return dbresponse; + } + + curop.debug().responseLength = dbresponse.response.header().dataLen(); + auto queryResult = QueryResult::ConstView(dbresponse.response.buf()); + curop.debug().nreturned = queryResult.getNReturned(); + + if (exhaust) { + curop.debug().exhaust = true; + dbresponse.exhaustNS = ns; + } + + return dbresponse; +} + +} // namespace + +BSONObj ServiceEntryPointCommon::getRedactedCopyForLogging(const Command* command, + const BSONObj& cmdObj) { + mutablebson::Document cmdToLog(cmdObj, mutablebson::Document::kInPlaceDisabled); + command->redactForLogging(&cmdToLog); + BSONObjBuilder bob; + cmdToLog.writeTo(&bob); + return bob.obj(); +} + +DbResponse ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, + const Message& m, + const Hooks& behaviors) { + // before we lock... + NetworkOp op = m.operation(); + bool isCommand = false; + + DbMessage dbmsg(m); + + Client& c = *opCtx->getClient(); + if (c.isInDirectClient()) { + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + } else { + LastError::get(c).startRequest(); + AuthorizationSession::get(c)->startRequest(opCtx); + + // We should not be holding any locks at this point + invariant(!opCtx->lockState()->isLocked()); + } + + const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL; + const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString(); + + if (op == dbQuery) { + if (nsString.isCommand()) { + isCommand = true; + } + } else if (op == dbCommand || op == dbMsg) { + isCommand = true; + } + + CurOp& currentOp = *CurOp::get(opCtx); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + // Commands handling code will reset this if the operation is a command + // which is logically a basic CRUD operation like query, insert, etc. + currentOp.setNetworkOp_inlock(op); + currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op)); + } + + OpDebug& debug = currentOp.debug(); + + long long logThresholdMs = serverGlobalParams.slowMS; + bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); + + DbResponse dbresponse; + if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) { + dbresponse = runCommands(opCtx, m, behaviors); + } else if (op == dbQuery) { + invariant(!isCommand); + dbresponse = receivedQuery(opCtx, nsString, c, m); + } else if (op == dbGetMore) { + dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); + } else { + // The remaining operations do not return any response. They are fire-and-forget. + try { + if (op == dbKillCursors) { + currentOp.ensureStarted(); + logThresholdMs = 10; + receivedKillCursors(opCtx, m); + } else if (op != dbInsert && op != dbUpdate && op != dbDelete) { + log() << " operation isn't supported: " << static_cast<int>(op); + currentOp.done(); + shouldLogOpDebug = true; + } else { + if (!opCtx->getClient()->isInDirectClient()) { + uassert(18663, + str::stream() << "legacy writeOps not longer supported for " + << "versioned connections, ns: " + << nsString.ns() + << ", op: " + << networkOpToString(op), + !ShardedConnectionInfo::get(&c, false)); + } + + if (!nsString.isValid()) { + uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false); + } else if (op == dbInsert) { + receivedInsert(opCtx, nsString, m); + } else if (op == dbUpdate) { + receivedUpdate(opCtx, nsString, m); + } else if (op == dbDelete) { + receivedDelete(opCtx, nsString, m); + } else { + invariant(false); + } + } + } catch (const AssertionException& ue) { + LastError::get(c).setLastError(ue.code(), ue.reason()); + LOG(3) << " Caught Assertion in " << networkOpToString(op) << ", continuing " + << redact(ue); + debug.errInfo = ue.toStatus(); + } + } + currentOp.ensureStarted(); + currentOp.done(); + debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()); + + Top::get(opCtx->getServiceContext()) + .incrementGlobalLatencyStats( + opCtx, + durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()), + currentOp.getReadWriteType()); + + const bool shouldSample = serverGlobalParams.sampleRate == 1.0 + ? true + : c.getPrng().nextCanonicalDouble() < serverGlobalParams.sampleRate; + + if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) { + Locker::LockerInfo lockerInfo; + opCtx->lockState()->getLockerInfo(&lockerInfo); + log() << debug.report(&c, currentOp, lockerInfo.stats); + } + + if (currentOp.shouldDBProfile(shouldSample)) { + // Performance profiling is on + if (opCtx->lockState()->isReadLocked()) { + LOG(1) << "note: not profiling because recursive read lock"; + } else if (behaviors.lockedForWriting()) { + // TODO SERVER-26825: Fix race condition where fsyncLock is acquired post + // lockedForWriting() call but prior to profile collection lock acquisition. + LOG(1) << "note: not profiling because doing fsync+lock"; + } else if (storageGlobalParams.readOnly) { + LOG(1) << "note: not profiling because server is read-only"; + } else { + profile(opCtx, op); + } + } + + recordCurOpMetrics(opCtx); + return dbresponse; +} + +ServiceEntryPointCommon::Hooks::~Hooks() = default; + +} // namespace mongo diff --git a/src/mongo/db/service_entry_point_common.h b/src/mongo/db/service_entry_point_common.h new file mode 100644 index 00000000000..0ec781f8c0f --- /dev/null +++ b/src/mongo/db/service_entry_point_common.h @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/transport/service_entry_point_impl.h" + +#include "mongo/base/status.h" +#include "mongo/db/commands.h" +#include "mongo/db/dbmessage.h" +#include "mongo/db/operation_context.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/net/message.h" + +namespace mongo { + +MONGO_FP_FORWARD_DECLARE(rsStopGetMore); +MONGO_FP_FORWARD_DECLARE(respondWithNotPrimaryInCommandDispatch); + +// When active, we won't check if we are master in command dispatch. Activate this if you want to +// test failing during command execution. +MONGO_FP_FORWARD_DECLARE(skipCheckingForNotMasterInCommandDispatch); + +/** + * Helpers for writing ServiceEntryPointImpl implementations from a reusable core. + * Implementations are ServiceEntryPointMongo and ServiceEntryPointEmbedded, which share + * most of their code, but vary in small details captured by the Hooks customization + * interface. + */ +struct ServiceEntryPointCommon { + /** + * Interface for customizing ServiceEntryPointImpl behavior. + */ + class Hooks { + public: + virtual ~Hooks(); + virtual bool lockedForWriting() const = 0; + virtual void waitForReadConcern(OperationContext* opCtx, + const Command* command, + const std::string& db, + const OpMsgRequest& request, + const BSONObj& cmdObj) const = 0; + virtual void waitForWriteConcern(OperationContext* opCtx, + const std::string& commandName, + const repl::OpTime& lastOpBeforeRun, + BSONObjBuilder* commandResponseBuilder) const = 0; + + virtual void waitForLinearizableReadConcern(OperationContext* opCtx) const = 0; + virtual void uassertCommandDoesNotSpecifyWriteConcern(const BSONObj& cmdObj) const = 0; + + virtual void attachCurOpErrInfo(OperationContext* opCtx, + BSONObjBuilder& replyObj) const = 0; + }; + + static DbResponse handleRequest(OperationContext* opCtx, const Message& m, const Hooks& hooks); + + /** + * Produce a new object based on cmdObj, but with redactions applied as specified by + * `command->redactForLogging`. + */ + static BSONObj getRedactedCopyForLogging(const Command* command, const BSONObj& cmdObj); +}; + +} // namespace mongo diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index a4134a16d42..3b403f52ecb 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -32,1134 +32,96 @@ #include "mongo/db/service_entry_point_mongod.h" -#include "mongo/base/checked_cast.h" -#include "mongo/bson/mutable/document.h" -#include "mongo/db/audit.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/auth/impersonation_session.h" -#include "mongo/db/client.h" -#include "mongo/db/command_can_run_here.h" -#include "mongo/db/commands.h" #include "mongo/db/commands/fsync_locked.h" #include "mongo/db/concurrency/global_lock_acquisition_tracker.h" #include "mongo/db/curop.h" -#include "mongo/db/curop_metrics.h" -#include "mongo/db/cursor_manager.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/initialize_operation_session_info.h" -#include "mongo/db/introspect.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/lasterror.h" -#include "mongo/db/logical_clock.h" -#include "mongo/db/logical_session_id.h" -#include "mongo/db/logical_session_id_helpers.h" -#include "mongo/db/logical_time_validator.h" -#include "mongo/db/ops/write_ops.h" -#include "mongo/db/ops/write_ops_exec.h" -#include "mongo/db/query/find.h" #include "mongo/db/read_concern.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/repl_client_info.h" -#include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/sharded_connection_info.h" -#include "mongo/db/s/sharding_state.h" -#include "mongo/db/session_catalog.h" -#include "mongo/db/stats/counters.h" -#include "mongo/db/stats/top.h" -#include "mongo/rpc/factory.h" +#include "mongo/db/service_entry_point_common.h" +#include "mongo/logger/redaction.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/rpc/metadata.h" -#include "mongo/rpc/metadata/config_server_metadata.h" -#include "mongo/rpc/metadata/logical_time_metadata.h" -#include "mongo/rpc/metadata/oplog_query_metadata.h" -#include "mongo/rpc/metadata/repl_set_metadata.h" -#include "mongo/rpc/metadata/sharding_metadata.h" -#include "mongo/rpc/metadata/tracking_metadata.h" -#include "mongo/rpc/reply_builder_interface.h" -#include "mongo/s/grid.h" -#include "mongo/s/stale_exception.h" -#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" -#include "mongo/util/net/message.h" -#include "mongo/util/net/op_msg.h" -#include "mongo/util/scopeguard.h" namespace mongo { -MONGO_FP_DECLARE(rsStopGetMore); -MONGO_FP_DECLARE(respondWithNotPrimaryInCommandDispatch); - -namespace { -using logger::LogComponent; - -// The command names for which to check out a session. -// -// Note: Eval should check out a session because it defaults to running under a global write lock, -// so if it didn't, and the function it was given contains any of these whitelisted commands, they -// would try to check out a session under a lock, which is not allowed. Similarly, -// refreshLogicalSessionCacheNow triggers a bulk update under a lock on the sessions collection. -const StringMap<int> cmdWhitelist = {{"delete", 1}, - {"eval", 1}, - {"$eval", 1}, - {"findandmodify", 1}, - {"findAndModify", 1}, - {"insert", 1}, - {"refreshLogicalSessionCacheNow", 1}, - {"update", 1}, - {"find", 1}, - {"getMore", 1}}; - -BSONObj getRedactedCopyForLogging(const Command* command, const BSONObj& cmdObj) { - mutablebson::Document cmdToLog(cmdObj, mutablebson::Document::kInPlaceDisabled); - command->redactForLogging(&cmdToLog); - BSONObjBuilder bob; - cmdToLog.writeTo(&bob); - return bob.obj(); -} - -void generateLegacyQueryErrorResponse(const AssertionException* exception, - const QueryMessage& queryMessage, - CurOp* curop, - Message* response) { - curop->debug().errInfo = exception->toStatus(); - - log(LogComponent::kQuery) << "assertion " << exception->toString() << " ns:" << queryMessage.ns - << " query:" << (queryMessage.query.valid(BSONVersion::kLatest) - ? queryMessage.query.toString() - : "query object is corrupt"); - if (queryMessage.ntoskip || queryMessage.ntoreturn) { - log(LogComponent::kQuery) << " ntoskip:" << queryMessage.ntoskip - << " ntoreturn:" << queryMessage.ntoreturn; - } - - auto scex = exception->extraInfo<StaleConfigInfo>(); - - BSONObjBuilder err; - err.append("$err", exception->reason()); - err.append("code", exception->code()); - if (scex) { - err.append("ok", 0.0); - err.append("ns", scex->getns()); - scex->getVersionReceived().addToBSON(err, "vReceived"); - scex->getVersionWanted().addToBSON(err, "vWanted"); - } - BSONObj errObj = err.done(); - - if (scex) { - log(LogComponent::kQuery) << "stale version detected during query over " << queryMessage.ns - << " : " << errObj; - } - - BufBuilder bb; - bb.skip(sizeof(QueryResult::Value)); - bb.appendBuf((void*)errObj.objdata(), errObj.objsize()); - - // TODO: call replyToQuery() from here instead of this!!! see dbmessage.h - QueryResult::View msgdata = bb.buf(); - QueryResult::View qr = msgdata; - qr.setResultFlags(ResultFlag_ErrSet); - if (scex) - qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale); - qr.msgdata().setLen(bb.len()); - qr.msgdata().setOperation(opReply); - qr.setCursorId(0); - qr.setStartingFrom(0); - qr.setNReturned(1); - response->setData(bb.release()); -} - -void registerError(OperationContext* opCtx, const DBException& exception) { - LastError::get(opCtx->getClient()).setLastError(exception.code(), exception.reason()); - CurOp::get(opCtx)->debug().errInfo = exception.toStatus(); -} - -void _generateErrorResponse(OperationContext* opCtx, - rpc::ReplyBuilderInterface* replyBuilder, - const DBException& exception, - const BSONObj& replyMetadata) { - registerError(opCtx, exception); - - // We could have thrown an exception after setting fields in the builder, - // so we need to reset it to a clean state just to be sure. - replyBuilder->reset(); - replyBuilder->setCommandReply(exception.toStatus()); - replyBuilder->setMetadata(replyMetadata); -} - -void _generateErrorResponse(OperationContext* opCtx, - rpc::ReplyBuilderInterface* replyBuilder, - const DBException& exception, - const BSONObj& replyMetadata, - LogicalTime operationTime) { - registerError(opCtx, exception); - - // We could have thrown an exception after setting fields in the builder, - // so we need to reset it to a clean state just to be sure. - replyBuilder->reset(); - replyBuilder->setCommandReply(exception.toStatus(), - BSON("operationTime" << operationTime.asTimestamp())); - replyBuilder->setMetadata(replyMetadata); -} - -/** - * Guard object for making a good-faith effort to enter maintenance mode and leave it when it - * goes out of scope. - * - * Sometimes we cannot set maintenance mode, in which case the call to setMaintenanceMode will - * return a non-OK status. This class does not treat that case as an error which means that - * anybody using it is assuming it is ok to continue execution without maintenance mode. - * - * TODO: This assumption needs to be audited and documented, or this behavior should be moved - * elsewhere. - */ -class MaintenanceModeSetter { - MONGO_DISALLOW_COPYING(MaintenanceModeSetter); - +class ServiceEntryPointMongod::Hooks final : public ServiceEntryPointCommon::Hooks { public: - MaintenanceModeSetter(OperationContext* opCtx) - : _opCtx(opCtx), - _maintenanceModeSet( - repl::ReplicationCoordinator::get(_opCtx)->setMaintenanceMode(true).isOK()) {} - - ~MaintenanceModeSetter() { - if (_maintenanceModeSet) { - repl::ReplicationCoordinator::get(_opCtx) - ->setMaintenanceMode(false) - .transitional_ignore(); - } - } - -private: - OperationContext* const _opCtx; - const bool _maintenanceModeSet; -}; - -// Called from the error contexts where request may not be available. -// It only attaches clusterTime and operationTime. -void appendReplyMetadataOnError(OperationContext* opCtx, BSONObjBuilder* metadataBob) { - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - const bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + bool lockedForWriting() const override { + return mongo::lockedForWriting(); + } + + void waitForReadConcern(OperationContext* opCtx, + const Command* command, + const std::string& db, + const OpMsgRequest& request, + const BSONObj& cmdObj) const override { + Status rcStatus = mongo::waitForReadConcern( + opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmdObj)); + if (!rcStatus.isOK()) { + if (rcStatus == ErrorCodes::ExceededTimeLimit) { + const int debugLevel = + serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2; + LOG(debugLevel) << "Command on database " << db + << " timed out waiting for read concern to be satisfied. Command: " + << redact(ServiceEntryPointCommon::getRedactedCopyForLogging( + command, request.body)); + } - if (isReplSet) { - if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { - // No need to sign cluster times for internal clients. - SignedLogicalTime currentTime( - LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); - } else if (auto validator = LogicalTimeValidator::get(opCtx)) { - auto currentTime = - validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); + uassertStatusOK(rcStatus); } } -} -void appendReplyMetadata(OperationContext* opCtx, - const OpMsgRequest& request, - BSONObjBuilder* metadataBob) { - const bool isShardingAware = ShardingState::get(opCtx)->enabled(); - const bool isConfig = serverGlobalParams.clusterRole == ClusterRole::ConfigServer; - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - const bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - - if (isReplSet) { - // Attach our own last opTime. - repl::OpTime lastOpTimeFromClient = - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - replCoord->prepareReplMetadata(request.body, lastOpTimeFromClient, metadataBob); - // For commands from mongos, append some info to help getLastError(w) work. - // TODO: refactor out of here as part of SERVER-18236 - if (isShardingAware || isConfig) { - rpc::ShardingMetadata(lastOpTimeFromClient, replCoord->getElectionId()) - .writeToMetadata(metadataBob) - .transitional_ignore(); - } - if (LogicalTimeValidator::isAuthorizedToAdvanceClock(opCtx)) { - // No need to sign cluster times for internal clients. - SignedLogicalTime currentTime( - LogicalClock::get(opCtx)->getClusterTime(), TimeProofService::TimeProof(), 0); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); - } else if (auto validator = LogicalTimeValidator::get(opCtx)) { - auto currentTime = - validator->trySignLogicalTime(LogicalClock::get(opCtx)->getClusterTime()); - rpc::LogicalTimeMetadata logicalTimeMetadata(currentTime); - logicalTimeMetadata.writeToMetadata(metadataBob); + void waitForWriteConcern(OperationContext* opCtx, + const std::string& commandName, + const repl::OpTime& lastOpBeforeRun, + BSONObjBuilder* commandResponseBuilder) const override { + auto lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + // Ensures that if we tried to do a write, we wait for write concern, even if that write was + // a noop. + if ((lastOpAfterRun == lastOpBeforeRun) && + GlobalLockAcquisitionTracker::get(opCtx).getGlobalExclusiveLockTaken()) { + repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); } - } - - // If we're a shard other than the config shard, attach the last configOpTime we know about. - if (isShardingAware && !isConfig) { - auto opTime = grid.configOpTime(); - rpc::ConfigServerMetadata(opTime).writeToMetadata(metadataBob); - } -} - -/** - * Given the specified command, returns an effective read concern which should be used or an error - * if the read concern is not valid for the command. - */ -StatusWith<repl::ReadConcernArgs> _extractReadConcern(const Command* command, - const std::string& dbName, - const BSONObj& cmdObj) { - repl::ReadConcernArgs readConcernArgs; - - auto readConcernParseStatus = readConcernArgs.initialize(cmdObj); - if (!readConcernParseStatus.isOK()) { - return readConcernParseStatus; - } - - if (!command->supportsReadConcern(dbName, cmdObj, readConcernArgs.getLevel())) { - return {ErrorCodes::InvalidOptions, - str::stream() << "Command does not support read concern " - << readConcernArgs.toString()}; - } - - return readConcernArgs; -} - -void _waitForWriteConcernAndAddToCommandResponse(OperationContext* opCtx, - const std::string& commandName, - const repl::OpTime& lastOpBeforeRun, - BSONObjBuilder* commandResponseBuilder) { - auto lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - - // Ensures that if we tried to do a write, we wait for write concern, even if that write was - // a noop. - if ((lastOpAfterRun == lastOpBeforeRun) && - GlobalLockAcquisitionTracker::get(opCtx).getGlobalExclusiveLockTaken()) { - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); - lastOpAfterRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - } - - WriteConcernResult res; - auto waitForWCStatus = - waitForWriteConcern(opCtx, lastOpAfterRun, opCtx->getWriteConcern(), &res); - CommandHelpers::appendCommandWCStatus(*commandResponseBuilder, waitForWCStatus, res); - - // SERVER-22421: This code is to ensure error response backwards compatibility with the - // user management commands. This can be removed in 3.6. - if (!waitForWCStatus.isOK() && CommandHelpers::isUserManagementCommand(commandName)) { - BSONObj temp = commandResponseBuilder->asTempObj().copy(); - commandResponseBuilder->resetToEmpty(); - CommandHelpers::appendCommandStatus(*commandResponseBuilder, waitForWCStatus); - commandResponseBuilder->appendElementsUnique(temp); - } -} - -/** - * For replica set members it returns the last known op time from opCtx. Otherwise will return - * uninitialized cluster time. - */ -LogicalTime getClientOperationTime(OperationContext* opCtx) { - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - const bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - LogicalTime operationTime; - if (isReplSet) { - operationTime = LogicalTime( - repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp().getTimestamp()); - } - return operationTime; -} - -/** - * Returns the proper operationTime for a command. To construct the operationTime for replica set - * members, it uses the last optime in the oplog for writes, last committed optime for majority - * reads, and the last applied optime for every other read. An uninitialized cluster time is - * returned for non replica set members. - */ -LogicalTime computeOperationTime(OperationContext* opCtx, - LogicalTime startOperationTime, - repl::ReadConcernLevel level) { - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - const bool isReplSet = - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - if (!isReplSet) { - return LogicalTime(); - } + WriteConcernResult res; + auto waitForWCStatus = + mongo::waitForWriteConcern(opCtx, lastOpAfterRun, opCtx->getWriteConcern(), &res); - auto operationTime = getClientOperationTime(opCtx); - invariant(operationTime >= startOperationTime); + CommandHelpers::appendCommandWCStatus(*commandResponseBuilder, waitForWCStatus, res); - // If the last operationTime has not changed, consider this command a read, and, for replica set - // members, construct the operationTime with the proper optime for its read concern level. - if (operationTime == startOperationTime) { - if (level == repl::ReadConcernLevel::kMajorityReadConcern) { - operationTime = LogicalTime(replCoord->getLastCommittedOpTime().getTimestamp()); - } else { - operationTime = LogicalTime(replCoord->getMyLastAppliedOpTime().getTimestamp()); + // SERVER-22421: This code is to ensure error response backwards compatibility with the + // user management commands. This can be removed in 3.6. + if (!waitForWCStatus.isOK() && CommandHelpers::isUserManagementCommand(commandName)) { + BSONObj temp = commandResponseBuilder->asTempObj().copy(); + commandResponseBuilder->resetToEmpty(); + CommandHelpers::appendCommandStatus(*commandResponseBuilder, waitForWCStatus); + commandResponseBuilder->appendElementsUnique(temp); } } - return operationTime; -} - -bool runCommandImpl(OperationContext* opCtx, - Command* command, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* replyBuilder, - LogicalTime startOperationTime) { - auto bytesToReserve = command->reserveBytesForReply(); - -// SERVER-22100: In Windows DEBUG builds, the CRT heap debugging overhead, in conjunction with the -// additional memory pressure introduced by reply buffer pre-allocation, causes the concurrency -// suite to run extremely slowly. As a workaround we do not pre-allocate in Windows DEBUG builds. -#ifdef _WIN32 - if (kDebugBuild) - bytesToReserve = 0; -#endif - - // run expects non-const bsonobj - BSONObj cmd = request.body; - - // run expects const db std::string (can't bind to temporary) - const std::string db = request.getDatabase().toString(); - - BSONObjBuilder inPlaceReplyBob = replyBuilder->getInPlaceReplyBuilder(bytesToReserve); - - Status rcStatus = waitForReadConcern( - opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd)); - if (!rcStatus.isOK()) { - if (rcStatus == ErrorCodes::ExceededTimeLimit) { - const int debugLevel = - serverGlobalParams.clusterRole == ClusterRole::ConfigServer ? 0 : 2; - LOG(debugLevel) << "Command on database " << db - << " timed out waiting for read concern to be satisfied. Command: " - << redact(getRedactedCopyForLogging(command, request.body)); + void waitForLinearizableReadConcern(OperationContext* opCtx) const override { + // When a linearizable read command is passed in, check to make sure we're reading + // from the primary. + if (repl::ReadConcernArgs::get(opCtx).getLevel() == + repl::ReadConcernLevel::kLinearizableReadConcern) { + uassertStatusOK(mongo::waitForLinearizableReadConcern(opCtx)); } - - uassertStatusOK(rcStatus); } - bool result; - if (!command->supportsWriteConcern(cmd)) { + void uassertCommandDoesNotSpecifyWriteConcern(const BSONObj& cmd) const override { if (commandSpecifiesWriteConcern(cmd)) { uassertStatusOK({ErrorCodes::InvalidOptions, "Command does not support writeConcern"}); } - - result = command->publicRun(opCtx, request, inPlaceReplyBob); - } else { - auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, cmd, db)); - - auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - - // Change the write concern while running the command. - const auto oldWC = opCtx->getWriteConcern(); - ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); - opCtx->setWriteConcern(wcResult); - ON_BLOCK_EXIT([&] { - _waitForWriteConcernAndAddToCommandResponse( - opCtx, command->getName(), lastOpBeforeRun, &inPlaceReplyBob); - }); - - result = command->publicRun(opCtx, request, inPlaceReplyBob); - - // Nothing in run() should change the writeConcern. - dassert(SimpleBSONObjComparator::kInstance.evaluate(opCtx->getWriteConcern().toBSON() == - wcResult.toBSON())); - } - - // When a linearizable read command is passed in, check to make sure we're reading - // from the primary. - if (repl::ReadConcernArgs::get(opCtx).getLevel() == - repl::ReadConcernLevel::kLinearizableReadConcern) { - - uassertStatusOK(waitForLinearizableReadConcern(opCtx)); - } - - CommandHelpers::appendCommandStatus(inPlaceReplyBob, result); - CurOp::get(opCtx)->debug().errInfo = getStatusFromCommandResult(inPlaceReplyBob.asTempObj()); - - auto operationTime = computeOperationTime( - opCtx, startOperationTime, repl::ReadConcernArgs::get(opCtx).getLevel()); - - // An uninitialized operation time means the cluster time is not propagated, so the operation - // time should not be attached to the response. - if (operationTime != LogicalTime::kUninitialized) { - operationTime.appendAsOperationTime(&inPlaceReplyBob); - } - - inPlaceReplyBob.doneFast(); - - BSONObjBuilder metadataBob; - appendReplyMetadata(opCtx, request, &metadataBob); - replyBuilder->setMetadata(metadataBob.done()); - - return result; -} - -// When active, we won't check if we are master in command dispatch. Activate this if you want to -// test failing during command execution. -MONGO_FP_DECLARE(skipCheckingForNotMasterInCommandDispatch); - -/** - * Executes a command after stripping metadata, performing authorization checks, - * handling audit impersonation, and (potentially) setting maintenance mode. This method - * also checks that the command is permissible to run on the node given its current - * replication state. All the logic here is independent of any particular command; any - * functionality relevant to a specific command should be confined to its run() method. - */ -void execCommandDatabase(OperationContext* opCtx, - Command* command, - const OpMsgRequest& request, - rpc::ReplyBuilderInterface* replyBuilder) { - - auto startOperationTime = getClientOperationTime(opCtx); - try { - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setCommand_inlock(command); - } - - // TODO: move this back to runCommands when mongos supports OperationContext - // see SERVER-18515 for details. - rpc::readRequestMetadata(opCtx, request.body); - rpc::TrackingMetadata::get(opCtx).initWithOperName(command->getName()); - - auto const replCoord = repl::ReplicationCoordinator::get(opCtx); - initializeOperationSessionInfo( - opCtx, - request.body, - command->requiresAuth(), - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet, - opCtx->getServiceContext()->getGlobalStorageEngine()->supportsDocLocking()); - - const auto dbname = request.getDatabase().toString(); - uassert( - ErrorCodes::InvalidNamespace, - str::stream() << "Invalid database name: '" << dbname << "'", - NamespaceString::validDBName(dbname, NamespaceString::DollarInDbNameBehavior::Allow)); - - std::unique_ptr<MaintenanceModeSetter> mmSetter; - - BSONElement cmdOptionMaxTimeMSField; - BSONElement allowImplicitCollectionCreationField; - BSONElement helpField; - BSONElement shardVersionFieldIdx; - BSONElement queryOptionMaxTimeMSField; - - StringMap<int> topLevelFields; - for (auto&& element : request.body) { - StringData fieldName = element.fieldNameStringData(); - if (fieldName == QueryRequest::cmdOptionMaxTimeMS) { - cmdOptionMaxTimeMSField = element; - } else if (fieldName == "allowImplicitCollectionCreation") { - allowImplicitCollectionCreationField = element; - } else if (fieldName == CommandHelpers::kHelpFieldName) { - helpField = element; - } else if (fieldName == ChunkVersion::kShardVersionField) { - shardVersionFieldIdx = element; - } else if (fieldName == QueryRequest::queryOptionMaxTimeMS) { - queryOptionMaxTimeMSField = element; - } - - uassert(ErrorCodes::FailedToParse, - str::stream() << "Parsed command object contains duplicate top level key: " - << fieldName, - topLevelFields[fieldName]++ == 0); - } - - if (CommandHelpers::isHelpRequest(helpField)) { - CurOp::get(opCtx)->ensureStarted(); - // We disable last-error for help requests due to SERVER-11492, because config servers - // use help requests to determine which commands are database writes, and so must be - // forwarded to all config servers. - LastError::get(opCtx->getClient()).disable(); - Command::generateHelpResponse(opCtx, replyBuilder, *command); - return; - } - - // Session ids are forwarded in requests, so commands that require roundtrips between - // servers may result in a deadlock when a server tries to check out a session it is already - // using to service an earlier operation in the command's chain. To avoid this, only check - // out sessions for commands that require them (i.e. write commands). - // Session checkout is also prevented for commands run within DBDirectClient. If checkout is - // required, it is expected to be handled by the outermost command. - const bool shouldCheckoutSession = - cmdWhitelist.find(command->getName()) != cmdWhitelist.cend() && - !opCtx->getClient()->isInDirectClient(); - OperationContextSession sessionTxnState(opCtx, shouldCheckoutSession); - - ImpersonationSessionGuard guard(opCtx); - uassertStatusOK(Command::checkAuthorization(command, opCtx, request)); - - const bool iAmPrimary = replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname); - - if (!opCtx->getClient()->isInDirectClient() && - !MONGO_FAIL_POINT(skipCheckingForNotMasterInCommandDispatch)) { - auto allowed = command->secondaryAllowed(opCtx->getServiceContext()); - bool alwaysAllowed = allowed == Command::AllowedOnSecondary::kAlways; - bool couldHaveOptedIn = allowed == Command::AllowedOnSecondary::kOptIn; - bool optedIn = - couldHaveOptedIn && ReadPreferenceSetting::get(opCtx).canRunOnSecondary(); - bool canRunHere = commandCanRunHere(opCtx, dbname, command); - if (!canRunHere && couldHaveOptedIn) { - uasserted(ErrorCodes::NotMasterNoSlaveOk, "not master and slaveOk=false"); - } - - if (MONGO_FAIL_POINT(respondWithNotPrimaryInCommandDispatch)) { - uassert(ErrorCodes::NotMaster, "not primary", canRunHere); - } else { - uassert(ErrorCodes::NotMaster, "not master", canRunHere); - } - - if (!command->maintenanceOk() && - replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet && - !replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, dbname) && - !replCoord->getMemberState().secondary()) { - - uassert(ErrorCodes::NotMasterOrSecondary, - "node is recovering", - !replCoord->getMemberState().recovering()); - uassert(ErrorCodes::NotMasterOrSecondary, - "node is not in primary or recovering state", - replCoord->getMemberState().primary()); - // Check ticket SERVER-21432, slaveOk commands are allowed in drain mode - uassert(ErrorCodes::NotMasterOrSecondary, - "node is in drain mode", - optedIn || alwaysAllowed); - } - } - - if (command->adminOnly()) { - LOG(2) << "command: " << request.getCommandName(); - } - - if (command->maintenanceMode()) { - mmSetter.reset(new MaintenanceModeSetter(opCtx)); - } - - if (command->shouldAffectCommandCounter()) { - OpCounters* opCounters = &globalOpCounters; - opCounters->gotCommand(); - } - - // Handle command option maxTimeMS. - int maxTimeMS = uassertStatusOK(QueryRequest::parseMaxTimeMS(cmdOptionMaxTimeMSField)); - - uassert(ErrorCodes::InvalidOptions, - "no such command option $maxTimeMs; use maxTimeMS instead", - queryOptionMaxTimeMSField.eoo()); - - if (maxTimeMS > 0) { - uassert(40119, - "Illegal attempt to set operation deadline within DBDirectClient", - !opCtx->getClient()->isInDirectClient()); - opCtx->setDeadlineAfterNowBy(Milliseconds{maxTimeMS}); - } - - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - readConcernArgs = uassertStatusOK(_extractReadConcern(command, dbname, request.body)); - - auto& oss = OperationShardingState::get(opCtx); - - if (!opCtx->getClient()->isInDirectClient() && - readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && - (iAmPrimary || - (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { - oss.initializeShardVersion(NamespaceString(command->parseNs(dbname, request.body)), - shardVersionFieldIdx); - - auto const shardingState = ShardingState::get(opCtx); - if (oss.hasShardVersion()) { - uassertStatusOK(shardingState->canAcceptShardedCommands()); - } - - // Handle config optime information that may have been sent along with the command. - uassertStatusOK(shardingState->updateConfigServerOpTimeFromMetadata(opCtx)); - } - - oss.setAllowImplicitCollectionCreation(allowImplicitCollectionCreationField); - - // Can throw - opCtx->checkForInterrupt(); // May trigger maxTimeAlwaysTimeOut fail point. - - bool retval = false; - - CurOp::get(opCtx)->ensureStarted(); - - command->incrementCommandsExecuted(); - - if (logger::globalLogDomain()->shouldLog(logger::LogComponent::kTracking, - logger::LogSeverity::Debug(1)) && - rpc::TrackingMetadata::get(opCtx).getParentOperId()) { - MONGO_LOG_COMPONENT(1, logger::LogComponent::kTracking) - << rpc::TrackingMetadata::get(opCtx).toString(); - rpc::TrackingMetadata::get(opCtx).setIsLogged(true); - } - - retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime); - - if (!retval) { - command->incrementCommandsFailed(); - } - } catch (const DBException& e) { - // If we got a stale config, wait in case the operation is stuck in a critical section - if (auto sce = e.extraInfo<StaleConfigInfo>()) { - if (!opCtx->getClient()->isInDirectClient()) { - ShardingState::get(opCtx) - ->onStaleShardVersion( - opCtx, NamespaceString(sce->getns()), sce->getVersionReceived()) - .transitional_ignore(); - } - } - - BSONObjBuilder metadataBob; - appendReplyMetadata(opCtx, request, &metadataBob); - - // Note: the read concern may not have been successfully or yet placed on the opCtx, so - // parsing it separately here. - const std::string db = request.getDatabase().toString(); - auto readConcernArgsStatus = _extractReadConcern(command, db, request.body); - auto operationTime = readConcernArgsStatus.isOK() - ? computeOperationTime( - opCtx, startOperationTime, readConcernArgsStatus.getValue().getLevel()) - : LogicalClock::get(opCtx)->getClusterTime(); - - // An uninitialized operation time means the cluster time is not propagated, so the - // operation time should not be attached to the error response. - if (operationTime != LogicalTime::kUninitialized) { - LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " - << "on database '" << request.getDatabase() << "' " - << "with arguments '" << getRedactedCopyForLogging(command, request.body) - << "' and operationTime '" << operationTime.toString() << "': " << e.toString(); - - _generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj(), operationTime); - } else { - LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " - << "on database '" << request.getDatabase() << "' " - << "with arguments '" << getRedactedCopyForLogging(command, request.body) - << "': " << e.toString(); - - _generateErrorResponse(opCtx, replyBuilder, e, metadataBob.obj()); - } - } -} - -/** - * Fills out CurOp / OpDebug with basic command info. - */ -void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) { - auto curop = CurOp::get(opCtx); - curop->debug().iscommand = true; - - // We construct a legacy $cmd namespace so we can fill in curOp using - // the existing logic that existed for OP_QUERY commands - NamespaceString nss(request.getDatabase(), "$cmd"); - - stdx::lock_guard<Client> lk(*opCtx->getClient()); - curop->setOpDescription_inlock(request.body); - curop->markCommand_inlock(); - curop->setNS_inlock(nss.ns()); -} - -DbResponse runCommands(OperationContext* opCtx, const Message& message) { - auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message)); - [&] { - OpMsgRequest request; - try { // Parse. - request = rpc::opMsgRequestFromAnyProtocol(message); - } catch (const DBException& ex) { - // If this error needs to fail the connection, propagate it out. - if (ErrorCodes::isConnectionFatalMessageParseError(ex.code())) - throw; - - auto operationTime = LogicalClock::get(opCtx)->getClusterTime(); - BSONObjBuilder metadataBob; - appendReplyMetadataOnError(opCtx, &metadataBob); - // Otherwise, reply with the parse error. This is useful for cases where parsing fails - // due to user-supplied input, such as the document too deep error. Since we failed - // during parsing, we can't log anything about the command. - LOG(1) << "assertion while parsing command: " << ex.toString(); - _generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime); - - return; // From lambda. Don't try executing if parsing failed. - } - - try { // Execute. - curOpCommandSetup(opCtx, request); - - Command* c = nullptr; - // In the absence of a Command object, no redaction is possible. Therefore - // to avoid displaying potentially sensitive information in the logs, - // we restrict the log message to the name of the unrecognized command. - // However, the complete command object will still be echoed to the client. - if (!(c = CommandHelpers::findCommand(request.getCommandName()))) { - globalCommandRegistry()->incrementUnknownCommands(); - std::string msg = str::stream() << "no such command: '" << request.getCommandName() - << "'"; - LOG(2) << msg; - uasserted(ErrorCodes::CommandNotFound, - str::stream() << msg << ", bad cmd: '" << redact(request.body) << "'"); - } - - LOG(2) << "run command " << request.getDatabase() << ".$cmd" << ' ' - << getRedactedCopyForLogging(c, request.body); - - { - // Try to set this as early as possible, as soon as we have figured out the command. - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setLogicalOp_inlock(c->getLogicalOp()); - } - - execCommandDatabase(opCtx, c, request, replyBuilder.get()); - } catch (const DBException& ex) { - BSONObjBuilder metadataBob; - appendReplyMetadataOnError(opCtx, &metadataBob); - auto operationTime = LogicalClock::get(opCtx)->getClusterTime(); - LOG(1) << "assertion while executing command '" << request.getCommandName() << "' " - << "on database '" << request.getDatabase() << "': " << ex.toString(); - - _generateErrorResponse(opCtx, replyBuilder.get(), ex, metadataBob.obj(), operationTime); - } - }(); - - if (OpMsg::isFlagSet(message, OpMsg::kMoreToCome)) { - // Close the connection to get client to go through server selection again. - uassert(ErrorCodes::NotMaster, - "Not-master error during fire-and-forget command processing", - !LastError::get(opCtx->getClient()).hadNotMasterError()); - - return {}; // Don't reply. - } - - auto response = replyBuilder->done(); - CurOp::get(opCtx)->debug().responseLength = response.header().dataLen(); - - // TODO exhaust - return DbResponse{std::move(response)}; -} - -DbResponse receivedQuery(OperationContext* opCtx, - const NamespaceString& nss, - Client& c, - const Message& m) { - invariant(!nss.isCommand()); - globalOpCounters.gotQuery(); - - DbMessage d(m); - QueryMessage q(d); - - CurOp& op = *CurOp::get(opCtx); - DbResponse dbResponse; - - try { - Client* client = opCtx->getClient(); - Status status = AuthorizationSession::get(client)->checkAuthForFind(nss, false); - audit::logQueryAuthzCheck(client, nss, q.query, status.code()); - uassertStatusOK(status); - - dbResponse.exhaustNS = runQuery(opCtx, q, nss, dbResponse.response); - } catch (const AssertionException& e) { - // If we got a stale config, wait in case the operation is stuck in a critical section - if (auto sce = e.extraInfo<StaleConfigInfo>()) { - if (!opCtx->getClient()->isInDirectClient()) { - ShardingState::get(opCtx) - ->onStaleShardVersion( - opCtx, NamespaceString(sce->getns()), sce->getVersionReceived()) - .transitional_ignore(); - } - } - - dbResponse.response.reset(); - generateLegacyQueryErrorResponse(&e, q, &op, &dbResponse.response); - } - - op.debug().responseLength = dbResponse.response.header().dataLen(); - return dbResponse; -} - -void receivedKillCursors(OperationContext* opCtx, const Message& m) { - LastError::get(opCtx->getClient()).disable(); - DbMessage dbmessage(m); - int n = dbmessage.pullInt(); - - uassert(13659, "sent 0 cursors to kill", n != 0); - massert(13658, - str::stream() << "bad kill cursors size: " << m.dataSize(), - m.dataSize() == 8 + (8 * n)); - uassert(13004, str::stream() << "sent negative cursors to kill: " << n, n >= 1); - - if (n > 2000) { - (n < 30000 ? warning() : error()) << "receivedKillCursors, n=" << n; - verify(n < 30000); - } - - const char* cursorArray = dbmessage.getArray(n); - - int found = CursorManager::killCursorGlobalIfAuthorized(opCtx, n, cursorArray); - - if (shouldLog(logger::LogSeverity::Debug(1)) || found != n) { - LOG(found == n ? 1 : 0) << "killcursors: found " << found << " of " << n; - } -} - -void receivedInsert(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { - auto insertOp = InsertOp::parseLegacy(m); - invariant(insertOp.getNamespace() == nsString); - - for (const auto& obj : insertOp.getDocuments()) { - Status status = - AuthorizationSession::get(opCtx->getClient())->checkAuthForInsert(opCtx, nsString, obj); - audit::logInsertAuthzCheck(opCtx->getClient(), nsString, obj, status.code()); - uassertStatusOK(status); - } - performInserts(opCtx, insertOp); -} - -void receivedUpdate(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { - auto updateOp = UpdateOp::parseLegacy(m); - auto& singleUpdate = updateOp.getUpdates()[0]; - invariant(updateOp.getNamespace() == nsString); - - Status status = AuthorizationSession::get(opCtx->getClient()) - ->checkAuthForUpdate(opCtx, - nsString, - singleUpdate.getQ(), - singleUpdate.getU(), - singleUpdate.getUpsert()); - audit::logUpdateAuthzCheck(opCtx->getClient(), - nsString, - singleUpdate.getQ(), - singleUpdate.getU(), - singleUpdate.getUpsert(), - singleUpdate.getMulti(), - status.code()); - uassertStatusOK(status); - - performUpdates(opCtx, updateOp); -} - -void receivedDelete(OperationContext* opCtx, const NamespaceString& nsString, const Message& m) { - auto deleteOp = DeleteOp::parseLegacy(m); - auto& singleDelete = deleteOp.getDeletes()[0]; - invariant(deleteOp.getNamespace() == nsString); - - Status status = AuthorizationSession::get(opCtx->getClient()) - ->checkAuthForDelete(opCtx, nsString, singleDelete.getQ()); - audit::logDeleteAuthzCheck(opCtx->getClient(), nsString, singleDelete.getQ(), status.code()); - uassertStatusOK(status); - - performDeletes(opCtx, deleteOp); -} - -DbResponse receivedGetMore(OperationContext* opCtx, - const Message& m, - CurOp& curop, - bool* shouldLogOpDebug) { - globalOpCounters.gotGetMore(); - DbMessage d(m); - - const char* ns = d.getns(); - int ntoreturn = d.pullInt(); - uassert( - 34419, str::stream() << "Invalid ntoreturn for OP_GET_MORE: " << ntoreturn, ntoreturn >= 0); - long long cursorid = d.pullInt64(); - - curop.debug().ntoreturn = ntoreturn; - curop.debug().cursorid = cursorid; - - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - CurOp::get(opCtx)->setNS_inlock(ns); - } - - bool exhaust = false; - bool isCursorAuthorized = false; - - DbResponse dbresponse; - try { - const NamespaceString nsString(ns); - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "Invalid ns [" << ns << "]", - nsString.isValid()); - - Status status = AuthorizationSession::get(opCtx->getClient()) - ->checkAuthForGetMore(nsString, cursorid, false); - audit::logGetMoreAuthzCheck(opCtx->getClient(), nsString, cursorid, status.code()); - uassertStatusOK(status); - - while (MONGO_FAIL_POINT(rsStopGetMore)) { - sleepmillis(0); - } - - dbresponse.response = - getMore(opCtx, ns, ntoreturn, cursorid, &exhaust, &isCursorAuthorized); - } catch (AssertionException& e) { - if (isCursorAuthorized) { - // If a cursor with id 'cursorid' was authorized, it may have been advanced - // before an exception terminated processGetMore. Erase the ClientCursor - // because it may now be out of sync with the client's iteration state. - // SERVER-7952 - // TODO Temporary code, see SERVER-4563 for a cleanup overview. - CursorManager::killCursorGlobal(opCtx, cursorid); - } - - BSONObjBuilder err; - err.append("$err", e.reason()); - err.append("code", e.code()); - BSONObj errObj = err.obj(); - - curop.debug().errInfo = e.toStatus(); - - dbresponse = replyToQuery(errObj, ResultFlag_ErrSet); - curop.debug().responseLength = dbresponse.response.header().dataLen(); - curop.debug().nreturned = 1; - *shouldLogOpDebug = true; - return dbresponse; } - curop.debug().responseLength = dbresponse.response.header().dataLen(); - auto queryResult = QueryResult::ConstView(dbresponse.response.buf()); - curop.debug().nreturned = queryResult.getNReturned(); - - if (exhaust) { - curop.debug().exhaust = true; - dbresponse.exhaustNS = ns; + void attachCurOpErrInfo(OperationContext* opCtx, BSONObjBuilder& replyObj) const override { + CurOp::get(opCtx)->debug().errInfo = getStatusFromCommandResult(replyObj.asTempObj()); } - - return dbresponse; -} - -} // namespace +}; DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) { - // before we lock... - NetworkOp op = m.operation(); - bool isCommand = false; - - DbMessage dbmsg(m); - - Client& c = *opCtx->getClient(); - if (c.isInDirectClient()) { - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - } else { - LastError::get(c).startRequest(); - AuthorizationSession::get(c)->startRequest(opCtx); - - // We should not be holding any locks at this point - invariant(!opCtx->lockState()->isLocked()); - } - - const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL; - const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString(); - - if (op == dbQuery) { - if (nsString.isCommand()) { - isCommand = true; - } - } else if (op == dbCommand || op == dbMsg) { - isCommand = true; - } - - CurOp& currentOp = *CurOp::get(opCtx); - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - // Commands handling code will reset this if the operation is a command - // which is logically a basic CRUD operation like query, insert, etc. - currentOp.setNetworkOp_inlock(op); - currentOp.setLogicalOp_inlock(networkOpToLogicalOp(op)); - } - - OpDebug& debug = currentOp.debug(); - - long long logThresholdMs = serverGlobalParams.slowMS; - bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1)); - - DbResponse dbresponse; - if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) { - dbresponse = runCommands(opCtx, m); - } else if (op == dbQuery) { - invariant(!isCommand); - dbresponse = receivedQuery(opCtx, nsString, c, m); - } else if (op == dbGetMore) { - dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug); - } else { - // The remaining operations do not return any response. They are fire-and-forget. - try { - if (op == dbKillCursors) { - currentOp.ensureStarted(); - logThresholdMs = 10; - receivedKillCursors(opCtx, m); - } else if (op != dbInsert && op != dbUpdate && op != dbDelete) { - log() << " operation isn't supported: " << static_cast<int>(op); - currentOp.done(); - shouldLogOpDebug = true; - } else { - if (!opCtx->getClient()->isInDirectClient()) { - uassert(18663, - str::stream() << "legacy writeOps not longer supported for " - << "versioned connections, ns: " - << nsString.ns() - << ", op: " - << networkOpToString(op), - !ShardedConnectionInfo::get(&c, false)); - } - - if (!nsString.isValid()) { - uassert(16257, str::stream() << "Invalid ns [" << ns << "]", false); - } else if (op == dbInsert) { - receivedInsert(opCtx, nsString, m); - } else if (op == dbUpdate) { - receivedUpdate(opCtx, nsString, m); - } else if (op == dbDelete) { - receivedDelete(opCtx, nsString, m); - } else { - invariant(false); - } - } - } catch (const AssertionException& ue) { - LastError::get(c).setLastError(ue.code(), ue.reason()); - LOG(3) << " Caught Assertion in " << networkOpToString(op) << ", continuing " - << redact(ue); - debug.errInfo = ue.toStatus(); - } - } - currentOp.ensureStarted(); - currentOp.done(); - debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()); - - Top::get(opCtx->getServiceContext()) - .incrementGlobalLatencyStats( - opCtx, - durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses()), - currentOp.getReadWriteType()); - - const bool shouldSample = serverGlobalParams.sampleRate == 1.0 - ? true - : c.getPrng().nextCanonicalDouble() < serverGlobalParams.sampleRate; - - if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) { - Locker::LockerInfo lockerInfo; - opCtx->lockState()->getLockerInfo(&lockerInfo); - log() << debug.report(&c, currentOp, lockerInfo.stats); - } - - if (currentOp.shouldDBProfile(shouldSample)) { - // Performance profiling is on - if (opCtx->lockState()->isReadLocked()) { - LOG(1) << "note: not profiling because recursive read lock"; - } else if (lockedForWriting()) { - // TODO SERVER-26825: Fix race condition where fsyncLock is acquired post - // lockedForWriting() call but prior to profile collection lock acquisition. - LOG(1) << "note: not profiling because doing fsync+lock"; - } else if (storageGlobalParams.readOnly) { - LOG(1) << "note: not profiling because server is read-only"; - } else { - profile(opCtx, op); - } - } - - recordCurOpMetrics(opCtx); - return dbresponse; + return ServiceEntryPointCommon::handleRequest(opCtx, m, Hooks{}); } } // namespace mongo diff --git a/src/mongo/db/service_entry_point_mongod.h b/src/mongo/db/service_entry_point_mongod.h index 1c41dfe6540..49922e40c0f 100644 --- a/src/mongo/db/service_entry_point_mongod.h +++ b/src/mongo/db/service_entry_point_mongod.h @@ -42,6 +42,9 @@ class ServiceEntryPointMongod final : public ServiceEntryPointImpl { public: using ServiceEntryPointImpl::ServiceEntryPointImpl; DbResponse handleRequest(OperationContext* opCtx, const Message& request) override; + +private: + class Hooks; }; } // namespace mongo |