/** * Copyright (C) 2017 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/sharding_task_executor.h" #include "mongo/base/disallow_copying.h" #include "mongo/base/status_with.h" #include "mongo/bson/timestamp.h" #include "mongo/db/logical_time.h" #include "mongo/db/operation_time_tracker.h" #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/sharding_metadata.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" namespace mongo { namespace executor { namespace { const std::string kOperationTimeField = "operationTime"; } ShardingTaskExecutor::ShardingTaskExecutor(std::unique_ptr executor) : _executor(std::move(executor)) {} void ShardingTaskExecutor::startup() { _executor->startup(); } void ShardingTaskExecutor::shutdown() { _executor->shutdown(); } void ShardingTaskExecutor::join() { _executor->join(); } void ShardingTaskExecutor::appendDiagnosticBSON(mongo::BSONObjBuilder* builder) const { _executor->appendDiagnosticBSON(builder); } Date_t ShardingTaskExecutor::now() { return _executor->now(); } StatusWith ShardingTaskExecutor::makeEvent() { return _executor->makeEvent(); } void ShardingTaskExecutor::signalEvent(const EventHandle& event) { return _executor->signalEvent(event); } StatusWith ShardingTaskExecutor::onEvent(const EventHandle& event, const CallbackFn& work) { return _executor->onEvent(event, work); } void ShardingTaskExecutor::waitForEvent(const EventHandle& event) { _executor->waitForEvent(event); } StatusWith ShardingTaskExecutor::waitForEvent(OperationContext* opCtx, const EventHandle& event, Date_t deadline) { return _executor->waitForEvent(opCtx, event, deadline); } StatusWith ShardingTaskExecutor::scheduleWork( const CallbackFn& work) { return _executor->scheduleWork(work); } StatusWith ShardingTaskExecutor::scheduleWorkAt( Date_t when, const CallbackFn& work) { return _executor->scheduleWorkAt(when, work); } StatusWith ShardingTaskExecutor::scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, const transport::BatonHandle& baton) { // schedule the user's callback if there is not opCtx if (!request.opCtx) { return _executor->scheduleRemoteCommand(request, cb, baton); } boost::optional newRequest; if (request.opCtx->getLogicalSessionId() && !request.cmdObj.hasField("lsid")) { newRequest.emplace(request); BSONObjBuilder bob(std::move(newRequest->cmdObj)); { // TODO SERVER-33702. BSONObjBuilder subbob(bob.subobjStart("lsid")); request.opCtx->getLogicalSessionId()->serialize(&subbob); } newRequest->cmdObj = bob.obj(); } std::shared_ptr timeTracker = OperationTimeTracker::get(request.opCtx); auto clusterGLE = ClusterLastErrorInfo::get(request.opCtx->getClient()); auto shardingCb = [ timeTracker, clusterGLE, cb, grid = Grid::get(request.opCtx) ]( const TaskExecutor::RemoteCommandCallbackArgs& args) { ON_BLOCK_EXIT([&cb, &args]() { cb(args); }); // Update replica set monitor info. auto shard = grid->shardRegistry()->getShardForHostNoReload(args.request.target); if (!shard) { LOG(1) << "Could not find shard containing host: " << args.request.target.toString(); } if (!args.response.isOK()) { if (isMongos() && args.response.status == ErrorCodes::IncompatibleWithUpgradedServer) { severe() << "This mongos server must be upgraded. It is attempting to communicate with " "an upgraded cluster with which it is incompatible. Error: '" << args.response.status.toString() << "' Crashing in order to bring attention to the incompatibility, rather " "than erroring endlessly."; fassertNoTrace(50710, false); } if (shard) { shard->updateReplSetMonitor(args.request.target, args.response.status); } LOG(1) << "Error processing the remote request, not updating operationTime or gLE"; return; } if (shard) { shard->updateReplSetMonitor(args.request.target, getStatusFromCommandResult(args.response.data)); } // Update the logical clock. invariant(timeTracker); auto operationTime = args.response.data[kOperationTimeField]; if (!operationTime.eoo()) { invariant(operationTime.type() == BSONType::bsonTimestamp); timeTracker->updateOperationTime(LogicalTime(operationTime.timestamp())); } // Update getLastError info for the client if we're tracking it. if (clusterGLE) { auto swShardingMetadata = rpc::ShardingMetadata::readFromMetadata(args.response.metadata); if (swShardingMetadata.isOK()) { auto shardingMetadata = std::move(swShardingMetadata.getValue()); auto shardConn = ConnectionString::parse(args.request.target.toString()); if (!shardConn.isOK()) { severe() << "got bad host string in saveGLEStats: " << args.request.target; } clusterGLE->addHostOpTime(shardConn.getValue(), HostOpTime(shardingMetadata.getLastOpTime(), shardingMetadata.getLastElectionId())); } else if (swShardingMetadata.getStatus() != ErrorCodes::NoSuchKey) { warning() << "Got invalid sharding metadata " << redact(swShardingMetadata.getStatus()) << " metadata object was '" << redact(args.response.metadata) << "'"; } } }; return _executor->scheduleRemoteCommand(newRequest ? *newRequest : request, shardingCb, baton); } void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) { _executor->cancel(cbHandle); } void ShardingTaskExecutor::wait(const CallbackHandle& cbHandle) { _executor->wait(cbHandle); } void ShardingTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) const { _executor->appendConnectionStats(stats); } } // namespace executor } // namespace mongo