diff options
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/commands/SConscript | 4 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_multicast.cpp | 158 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_multicast.idl | 37 | ||||
-rw-r--r-- | src/mongo/s/commands/kill_sessions_remote.cpp | 137 | ||||
-rw-r--r-- | src/mongo/s/commands/kill_sessions_remote.h | 44 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager.cpp | 12 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_manager.h | 5 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 7 |
10 files changed, 408 insertions, 2 deletions
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 75db97fa257..75fb56c876b 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -61,6 +61,7 @@ env.Library( 'cluster_merge_chunks_cmd.cpp', 'cluster_move_chunk_cmd.cpp', 'cluster_move_primary_cmd.cpp', + 'cluster_multicast.cpp', 'cluster_netstat_cmd.cpp', 'cluster_pipeline_cmd.cpp', 'cluster_plan_cache_cmd.cpp', @@ -79,7 +80,9 @@ env.Library( 'cluster_write.cpp', 'cluster_write_cmd.cpp', 'commands_public.cpp', + 'kill_sessions_remote.cpp', 'strategy.cpp', + env.Idlc('cluster_multicast.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/db/auth/authmongos', @@ -88,6 +91,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/write_commands_common', '$BUILD_DIR/mongo/db/pipeline/aggregation', '$BUILD_DIR/mongo/db/views/views', + '$BUILD_DIR/mongo/executor/async_multicaster', '$BUILD_DIR/mongo/rpc/client_metadata', '$BUILD_DIR/mongo/s/async_requests_sender', '$BUILD_DIR/mongo/s/client/parallel', diff --git a/src/mongo/s/commands/cluster_multicast.cpp b/src/mongo/s/commands/cluster_multicast.cpp new file mode 100644 index 00000000000..56af56e255a --- /dev/null +++ b/src/mongo/s/commands/cluster_multicast.cpp @@ -0,0 +1,158 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <vector> + +#include "mongo/base/init.h" +#include "mongo/db/commands.h" +#include "mongo/executor/async_multicaster.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/catalog/sharding_catalog_client.h" +#include "mongo/s/catalog/type_shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/commands/cluster_multicast_gen.h" +#include "mongo/s/grid.h" + +namespace mongo { +namespace { + +std::vector<HostAndPort> getAllClusterHosts(OperationContext* opCtx) { + auto registry = Grid::get(opCtx)->shardRegistry(); + + std::vector<ShardId> shardIds; + registry->getAllShardIds(&shardIds); + + std::vector<HostAndPort> servers; + for (const auto& shardId : shardIds) { + auto shard = uassertStatusOK(registry->getShard(opCtx, shardId)); + + auto cs = shard->getConnString(); + for (auto&& host : cs.getServers()) { + servers.emplace_back(host); + } + } + + return servers; +} + +class MulticastCmd : public BasicCommand { +public: + MulticastCmd() : BasicCommand("multicast") {} + + bool slaveOk() const override { + return true; + } + + bool adminOnly() const override { + return true; + } + + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return false; + } + + void help(std::stringstream& help) const override { + help << "multicasts a command to hosts in a system"; + } + + // no privs because it's a test command + void addRequiredPrivileges(const std::string& dbname, + const BSONObj& cmdObj, + std::vector<Privilege>* out) override {} + + bool run(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { + IDLParserErrorContext ctx("ClusterMulticastArgs"); + auto args = ClusterMulticastArgs::parse(ctx, cmdObj); + + // Grab an arbitrary executor. + auto executor = Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(); + + // Grab all hosts in the cluster. + auto servers = getAllClusterHosts(opCtx); + + executor::AsyncMulticaster::Options options; + if (args.getConcurrency()) { + options.maxConcurrency = *args.getConcurrency(); + } + + auto results = + executor::AsyncMulticaster(executor, options) + .multicast(servers, + args.getDb().toString(), + args.getMulticast(), + opCtx, + (args.getTimeout() ? Milliseconds(*args.getTimeout()) + : executor::RemoteCommandRequest::kNoTimeout)); + + bool success = true; + + { + BSONObjBuilder bob(result.subobjStart("hosts")); + + for (const auto& r : results) { + HostAndPort host; + executor::RemoteCommandResponse response; + std::tie(host, response) = r; + + if (!response.isOK() || !response.data["ok"].trueValue()) { + success = false; + } + + { + BSONObjBuilder subbob(bob.subobjStart(host.toString())); + + if (appendCommandStatus(subbob, response.status)) { + subbob.append("data", response.data); + subbob.append("metadata", response.metadata); + if (response.elapsedMillis) { + subbob.append("elapsedMillis", response.elapsedMillis->count()); + } + } + } + } + } + + return success; + } +}; + +MONGO_INITIALIZER(RegisterMulticast)(InitializerContext* context) { + if (Command::testCommandsEnabled) { + new MulticastCmd(); + } + return Status::OK(); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/commands/cluster_multicast.idl b/src/mongo/s/commands/cluster_multicast.idl new file mode 100644 index 00000000000..cbed46fd512 --- /dev/null +++ b/src/mongo/s/commands/cluster_multicast.idl @@ -0,0 +1,37 @@ +# Copyright (C) 2017 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + + ClusterMulticastArgs: + description: "A struct representing cluster multicast args" + strict: false + fields: + multicast: object + $db: + type: string + cpp_name: db + concurrency: + type: int + optional: true + timeout: + type: int + optional: true diff --git a/src/mongo/s/commands/kill_sessions_remote.cpp b/src/mongo/s/commands/kill_sessions_remote.cpp new file mode 100644 index 00000000000..a89e407eb45 --- /dev/null +++ b/src/mongo/s/commands/kill_sessions_remote.cpp @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/s/commands/kill_sessions_remote.h" + +#include "mongo/db/client.h" +#include "mongo/db/kill_sessions_common.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/server_parameters.h" +#include "mongo/db/service_context.h" +#include "mongo/executor/async_multicaster.h" +#include "mongo/executor/task_executor_pool.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/s/query/cluster_cursor_manager.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { + +constexpr size_t kMaxConcurrency = 100; +constexpr Milliseconds kTimeout = Milliseconds(60000); + +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(KillSessionsMaxConcurrency, int, kMaxConcurrency); +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(KillSessionsPerHostTimeoutMS, int, kTimeout.count()); + +/** + * Get all hosts in the cluster. + */ +std::vector<HostAndPort> getAllClusterHosts(OperationContext* opCtx) { + auto registry = Grid::get(opCtx)->shardRegistry(); + + std::vector<ShardId> shardIds; + registry->getAllShardIds(&shardIds); + + std::vector<HostAndPort> servers; + for (const auto& shardId : shardIds) { + auto shard = uassertStatusOK(registry->getShard(opCtx, shardId)); + + auto cs = shard->getConnString(); + for (auto&& host : cs.getServers()) { + servers.emplace_back(host); + } + } + + return servers; +} + +/** + * A function for running an arbitrary command on all shards. Only returns which hosts failed. + */ +SessionKiller::Result parallelExec(OperationContext* opCtx, + const BSONObj& cmd, + SessionKiller::UniformRandomBitGenerator* urbg) { + // Grab an arbitrary executor. + auto executor = Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(); + + // Grab all hosts in the cluster. + auto servers = getAllClusterHosts(opCtx); + std::shuffle(servers.begin(), servers.end(), *urbg); + + // To indicate which hosts fail. + std::vector<HostAndPort> failed; + + executor::AsyncMulticaster::Options options; + options.maxConcurrency = KillSessionsMaxConcurrency; + auto results = + executor::AsyncMulticaster(executor, options) + .multicast(servers, "admin", cmd, opCtx, Milliseconds(KillSessionsPerHostTimeoutMS)); + + for (const auto& result : results) { + if (!std::get<1>(result).isOK()) { + failed.push_back(std::get<0>(result)); + } + } + + return failed; +} + +Status killSessionsRemoteKillCursor(OperationContext* opCtx, + const SessionKiller::Matcher& matcher) { + return Grid::get(opCtx)->getCursorManager()->killCursorsWithMatchingSessions(opCtx, matcher); +} + +} // namespace + +/** + * This kill function (meant for mongos), kills matching local ops first, then fans out to all other + * nodes in the cluster to kill them as well. + */ +SessionKiller::Result killSessionsRemote(OperationContext* opCtx, + const SessionKiller::Matcher& matcher, + SessionKiller::UniformRandomBitGenerator* urbg) { + // First kill local sessions. + uassertStatusOK(killSessionsRemoteKillCursor(opCtx, matcher)); + uassertStatusOK(killSessionsLocalKillOps(opCtx, matcher)); + + // Generate the kill command. + KillAllSessionsByPatternCmd cmd; + cmd.setKillAllSessionsByPattern(std::vector<KillAllSessionsByPattern>{ + matcher.getPatterns().begin(), matcher.getPatterns().end()}); + + return parallelExec(opCtx, cmd.toBSON(), urbg); +} + +} // namespace mongo diff --git a/src/mongo/s/commands/kill_sessions_remote.h b/src/mongo/s/commands/kill_sessions_remote.h new file mode 100644 index 00000000000..0be3cc7189e --- /dev/null +++ b/src/mongo/s/commands/kill_sessions_remote.h @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/kill_sessions.h" + +#include "mongo/db/session_killer.h" + +namespace mongo { + +/** + * The killSessions killer for running on mongos + */ +SessionKiller::Result killSessionsRemote(OperationContext* opCtx, + const SessionKiller::Matcher& patterns, + SessionKiller::UniformRandomBitGenerator* urbg); + +} // namespace mongo diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index a17e73199d4..9d6aa9e72f5 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -217,14 +217,14 @@ void execCommandClient(OperationContext* opCtx, try { bool ok = false; if (!supportsWriteConcern) { - ok = c->enhancedRun(opCtx, request, result); + ok = c->publicRun(opCtx, request, result); } else { // Change the write concern while running the command. const auto oldWC = opCtx->getWriteConcern(); ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); opCtx->setWriteConcern(wcResult.getValue()); - ok = c->enhancedRun(opCtx, request, result); + ok = c->publicRun(opCtx, request, result); } if (!ok) { c->incrementCommandsFailed(); diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 2977050be19..259e2d28244 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -133,6 +133,8 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/auth/authcore', + '$BUILD_DIR/mongo/db/kill_sessions', '$BUILD_DIR/mongo/db/logical_session_id', ], ) diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 99e7b2afa82..25e63a50d54 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -34,6 +34,7 @@ #include <set> +#include "mongo/db/kill_sessions_common.h" #include "mongo/util/clock_source.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -471,6 +472,17 @@ void ClusterCursorManager::appendActiveSessions(LogicalSessionIdSet* lsids) cons } } +Status ClusterCursorManager::killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher) { + auto eraser = [&](ClusterCursorManager& mgr, CursorId id) { + uassertStatusOK(mgr.killCursor(getNamespaceForCursorId(id).get(), id)); + }; + + auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser)); + visitor(*this); + return visitor.getStatus(); +} + stdx::unordered_set<CursorId> ClusterCursorManager::getCursorsForSession( LogicalSessionId lsid) const { stdx::lock_guard<stdx::mutex> lk(_mutex); diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index f6da640bb1e..d6eb8f47abf 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -32,7 +32,9 @@ #include <vector> #include "mongo/db/cursor_id.h" +#include "mongo/db/kill_sessions.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/session_killer.h" #include "mongo/platform/random.h" #include "mongo/s/query/cluster_client_cursor.h" #include "mongo/stdx/mutex.h" @@ -353,6 +355,9 @@ public: */ void appendActiveSessions(LogicalSessionIdSet* lsids) const; + Status killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher); + /** * Returns a list of all open cursors for the given session. */ diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 521702e56b8..27de3a7994f 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -52,6 +52,7 @@ #include "mongo/db/client.h" #include "mongo/db/ftdc/ftdc_mongos.h" #include "mongo/db/initialize_server_global_state.h" +#include "mongo/db/kill_sessions.h" #include "mongo/db/lasterror.h" #include "mongo/db/log_process_details.h" #include "mongo/db/logical_clock.h" @@ -62,6 +63,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_noop.h" +#include "mongo/db/session_killer.h" #include "mongo/db/startup_warnings_common.h" #include "mongo/db/wire_version.h" #include "mongo/executor/task_executor_pool.h" @@ -75,6 +77,7 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/client/shard_remote.h" #include "mongo/s/client/sharding_connection_hook.h" +#include "mongo/s/commands/kill_sessions_remote.h" #include "mongo/s/config_server_catalog_cache_loader.h" #include "mongo/s/grid.h" #include "mongo/s/is_mongos.h" @@ -395,6 +398,10 @@ static ExitCode runMongosServer() { runner->startup().transitional_ignore(); getGlobalServiceContext()->setPeriodicRunner(std::move(runner)); + SessionKiller::set( + getGlobalServiceContext(), + std::make_shared<SessionKiller>(getGlobalServiceContext(), killSessionsRemote)); + // Set up the logical session cache LogicalSessionCache::set(getGlobalServiceContext(), makeLogicalSessionCacheS()); |