summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/commands/SConscript4
-rw-r--r--src/mongo/s/commands/cluster_multicast.cpp158
-rw-r--r--src/mongo/s/commands/cluster_multicast.idl37
-rw-r--r--src/mongo/s/commands/kill_sessions_remote.cpp137
-rw-r--r--src/mongo/s/commands/kill_sessions_remote.h44
-rw-r--r--src/mongo/s/commands/strategy.cpp4
-rw-r--r--src/mongo/s/query/SConscript2
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp12
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h5
-rw-r--r--src/mongo/s/server.cpp7
10 files changed, 408 insertions, 2 deletions
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 75db97fa257..75fb56c876b 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -61,6 +61,7 @@ env.Library(
'cluster_merge_chunks_cmd.cpp',
'cluster_move_chunk_cmd.cpp',
'cluster_move_primary_cmd.cpp',
+ 'cluster_multicast.cpp',
'cluster_netstat_cmd.cpp',
'cluster_pipeline_cmd.cpp',
'cluster_plan_cache_cmd.cpp',
@@ -79,7 +80,9 @@ env.Library(
'cluster_write.cpp',
'cluster_write_cmd.cpp',
'commands_public.cpp',
+ 'kill_sessions_remote.cpp',
'strategy.cpp',
+ env.Idlc('cluster_multicast.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/authmongos',
@@ -88,6 +91,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/write_commands_common',
'$BUILD_DIR/mongo/db/pipeline/aggregation',
'$BUILD_DIR/mongo/db/views/views',
+ '$BUILD_DIR/mongo/executor/async_multicaster',
'$BUILD_DIR/mongo/rpc/client_metadata',
'$BUILD_DIR/mongo/s/async_requests_sender',
'$BUILD_DIR/mongo/s/client/parallel',
diff --git a/src/mongo/s/commands/cluster_multicast.cpp b/src/mongo/s/commands/cluster_multicast.cpp
new file mode 100644
index 00000000000..56af56e255a
--- /dev/null
+++ b/src/mongo/s/commands/cluster_multicast.cpp
@@ -0,0 +1,158 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <vector>
+
+#include "mongo/base/init.h"
+#include "mongo/db/commands.h"
+#include "mongo/executor/async_multicaster.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/commands/cluster_multicast_gen.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+namespace {
+
+std::vector<HostAndPort> getAllClusterHosts(OperationContext* opCtx) {
+ auto registry = Grid::get(opCtx)->shardRegistry();
+
+ std::vector<ShardId> shardIds;
+ registry->getAllShardIds(&shardIds);
+
+ std::vector<HostAndPort> servers;
+ for (const auto& shardId : shardIds) {
+ auto shard = uassertStatusOK(registry->getShard(opCtx, shardId));
+
+ auto cs = shard->getConnString();
+ for (auto&& host : cs.getServers()) {
+ servers.emplace_back(host);
+ }
+ }
+
+ return servers;
+}
+
+class MulticastCmd : public BasicCommand {
+public:
+ MulticastCmd() : BasicCommand("multicast") {}
+
+ bool slaveOk() const override {
+ return true;
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return false;
+ }
+
+ void help(std::stringstream& help) const override {
+ help << "multicasts a command to hosts in a system";
+ }
+
+ // no privs because it's a test command
+ void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) override {}
+
+ bool run(OperationContext* opCtx,
+ const std::string& dbname,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
+ IDLParserErrorContext ctx("ClusterMulticastArgs");
+ auto args = ClusterMulticastArgs::parse(ctx, cmdObj);
+
+ // Grab an arbitrary executor.
+ auto executor = Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor();
+
+ // Grab all hosts in the cluster.
+ auto servers = getAllClusterHosts(opCtx);
+
+ executor::AsyncMulticaster::Options options;
+ if (args.getConcurrency()) {
+ options.maxConcurrency = *args.getConcurrency();
+ }
+
+ auto results =
+ executor::AsyncMulticaster(executor, options)
+ .multicast(servers,
+ args.getDb().toString(),
+ args.getMulticast(),
+ opCtx,
+ (args.getTimeout() ? Milliseconds(*args.getTimeout())
+ : executor::RemoteCommandRequest::kNoTimeout));
+
+ bool success = true;
+
+ {
+ BSONObjBuilder bob(result.subobjStart("hosts"));
+
+ for (const auto& r : results) {
+ HostAndPort host;
+ executor::RemoteCommandResponse response;
+ std::tie(host, response) = r;
+
+ if (!response.isOK() || !response.data["ok"].trueValue()) {
+ success = false;
+ }
+
+ {
+ BSONObjBuilder subbob(bob.subobjStart(host.toString()));
+
+ if (appendCommandStatus(subbob, response.status)) {
+ subbob.append("data", response.data);
+ subbob.append("metadata", response.metadata);
+ if (response.elapsedMillis) {
+ subbob.append("elapsedMillis", response.elapsedMillis->count());
+ }
+ }
+ }
+ }
+ }
+
+ return success;
+ }
+};
+
+MONGO_INITIALIZER(RegisterMulticast)(InitializerContext* context) {
+ if (Command::testCommandsEnabled) {
+ new MulticastCmd();
+ }
+ return Status::OK();
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_multicast.idl b/src/mongo/s/commands/cluster_multicast.idl
new file mode 100644
index 00000000000..cbed46fd512
--- /dev/null
+++ b/src/mongo/s/commands/cluster_multicast.idl
@@ -0,0 +1,37 @@
+# Copyright (C) 2017 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+
+ ClusterMulticastArgs:
+ description: "A struct representing cluster multicast args"
+ strict: false
+ fields:
+ multicast: object
+ $db:
+ type: string
+ cpp_name: db
+ concurrency:
+ type: int
+ optional: true
+ timeout:
+ type: int
+ optional: true
diff --git a/src/mongo/s/commands/kill_sessions_remote.cpp b/src/mongo/s/commands/kill_sessions_remote.cpp
new file mode 100644
index 00000000000..a89e407eb45
--- /dev/null
+++ b/src/mongo/s/commands/kill_sessions_remote.cpp
@@ -0,0 +1,137 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/kill_sessions_remote.h"
+
+#include "mongo/db/client.h"
+#include "mongo/db/kill_sessions_common.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/server_parameters.h"
+#include "mongo/db/service_context.h"
+#include "mongo/executor/async_multicaster.h"
+#include "mongo/executor/task_executor_pool.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/query/cluster_cursor_manager.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+
+namespace {
+
+constexpr size_t kMaxConcurrency = 100;
+constexpr Milliseconds kTimeout = Milliseconds(60000);
+
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(KillSessionsMaxConcurrency, int, kMaxConcurrency);
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(KillSessionsPerHostTimeoutMS, int, kTimeout.count());
+
+/**
+ * Get all hosts in the cluster.
+ */
+std::vector<HostAndPort> getAllClusterHosts(OperationContext* opCtx) {
+ auto registry = Grid::get(opCtx)->shardRegistry();
+
+ std::vector<ShardId> shardIds;
+ registry->getAllShardIds(&shardIds);
+
+ std::vector<HostAndPort> servers;
+ for (const auto& shardId : shardIds) {
+ auto shard = uassertStatusOK(registry->getShard(opCtx, shardId));
+
+ auto cs = shard->getConnString();
+ for (auto&& host : cs.getServers()) {
+ servers.emplace_back(host);
+ }
+ }
+
+ return servers;
+}
+
+/**
+ * A function for running an arbitrary command on all shards. Only returns which hosts failed.
+ */
+SessionKiller::Result parallelExec(OperationContext* opCtx,
+ const BSONObj& cmd,
+ SessionKiller::UniformRandomBitGenerator* urbg) {
+ // Grab an arbitrary executor.
+ auto executor = Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor();
+
+ // Grab all hosts in the cluster.
+ auto servers = getAllClusterHosts(opCtx);
+ std::shuffle(servers.begin(), servers.end(), *urbg);
+
+ // To indicate which hosts fail.
+ std::vector<HostAndPort> failed;
+
+ executor::AsyncMulticaster::Options options;
+ options.maxConcurrency = KillSessionsMaxConcurrency;
+ auto results =
+ executor::AsyncMulticaster(executor, options)
+ .multicast(servers, "admin", cmd, opCtx, Milliseconds(KillSessionsPerHostTimeoutMS));
+
+ for (const auto& result : results) {
+ if (!std::get<1>(result).isOK()) {
+ failed.push_back(std::get<0>(result));
+ }
+ }
+
+ return failed;
+}
+
+Status killSessionsRemoteKillCursor(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher) {
+ return Grid::get(opCtx)->getCursorManager()->killCursorsWithMatchingSessions(opCtx, matcher);
+}
+
+} // namespace
+
+/**
+ * This kill function (meant for mongos), kills matching local ops first, then fans out to all other
+ * nodes in the cluster to kill them as well.
+ */
+SessionKiller::Result killSessionsRemote(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher,
+ SessionKiller::UniformRandomBitGenerator* urbg) {
+ // First kill local sessions.
+ uassertStatusOK(killSessionsRemoteKillCursor(opCtx, matcher));
+ uassertStatusOK(killSessionsLocalKillOps(opCtx, matcher));
+
+ // Generate the kill command.
+ KillAllSessionsByPatternCmd cmd;
+ cmd.setKillAllSessionsByPattern(std::vector<KillAllSessionsByPattern>{
+ matcher.getPatterns().begin(), matcher.getPatterns().end()});
+
+ return parallelExec(opCtx, cmd.toBSON(), urbg);
+}
+
+} // namespace mongo
diff --git a/src/mongo/s/commands/kill_sessions_remote.h b/src/mongo/s/commands/kill_sessions_remote.h
new file mode 100644
index 00000000000..0be3cc7189e
--- /dev/null
+++ b/src/mongo/s/commands/kill_sessions_remote.h
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/kill_sessions.h"
+
+#include "mongo/db/session_killer.h"
+
+namespace mongo {
+
+/**
+ * The killSessions killer for running on mongos
+ */
+SessionKiller::Result killSessionsRemote(OperationContext* opCtx,
+ const SessionKiller::Matcher& patterns,
+ SessionKiller::UniformRandomBitGenerator* urbg);
+
+} // namespace mongo
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index a17e73199d4..9d6aa9e72f5 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -217,14 +217,14 @@ void execCommandClient(OperationContext* opCtx,
try {
bool ok = false;
if (!supportsWriteConcern) {
- ok = c->enhancedRun(opCtx, request, result);
+ ok = c->publicRun(opCtx, request, result);
} else {
// Change the write concern while running the command.
const auto oldWC = opCtx->getWriteConcern();
ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
opCtx->setWriteConcern(wcResult.getValue());
- ok = c->enhancedRun(opCtx, request, result);
+ ok = c->publicRun(opCtx, request, result);
}
if (!ok) {
c->incrementCommandsFailed();
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 2977050be19..259e2d28244 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -133,6 +133,8 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/db/auth/authcore',
+ '$BUILD_DIR/mongo/db/kill_sessions',
'$BUILD_DIR/mongo/db/logical_session_id',
],
)
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 99e7b2afa82..25e63a50d54 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -34,6 +34,7 @@
#include <set>
+#include "mongo/db/kill_sessions_common.h"
#include "mongo/util/clock_source.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -471,6 +472,17 @@ void ClusterCursorManager::appendActiveSessions(LogicalSessionIdSet* lsids) cons
}
}
+Status ClusterCursorManager::killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
+ auto eraser = [&](ClusterCursorManager& mgr, CursorId id) {
+ uassertStatusOK(mgr.killCursor(getNamespaceForCursorId(id).get(), id));
+ };
+
+ auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser));
+ visitor(*this);
+ return visitor.getStatus();
+}
+
stdx::unordered_set<CursorId> ClusterCursorManager::getCursorsForSession(
LogicalSessionId lsid) const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index f6da640bb1e..d6eb8f47abf 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -32,7 +32,9 @@
#include <vector>
#include "mongo/db/cursor_id.h"
+#include "mongo/db/kill_sessions.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/session_killer.h"
#include "mongo/platform/random.h"
#include "mongo/s/query/cluster_client_cursor.h"
#include "mongo/stdx/mutex.h"
@@ -353,6 +355,9 @@ public:
*/
void appendActiveSessions(LogicalSessionIdSet* lsids) const;
+ Status killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher);
+
/**
* Returns a list of all open cursors for the given session.
*/
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 521702e56b8..27de3a7994f 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -52,6 +52,7 @@
#include "mongo/db/client.h"
#include "mongo/db/ftdc/ftdc_mongos.h"
#include "mongo/db/initialize_server_global_state.h"
+#include "mongo/db/kill_sessions.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/log_process_details.h"
#include "mongo/db/logical_clock.h"
@@ -62,6 +63,7 @@
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_noop.h"
+#include "mongo/db/session_killer.h"
#include "mongo/db/startup_warnings_common.h"
#include "mongo/db/wire_version.h"
#include "mongo/executor/task_executor_pool.h"
@@ -75,6 +77,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/client/shard_remote.h"
#include "mongo/s/client/sharding_connection_hook.h"
+#include "mongo/s/commands/kill_sessions_remote.h"
#include "mongo/s/config_server_catalog_cache_loader.h"
#include "mongo/s/grid.h"
#include "mongo/s/is_mongos.h"
@@ -395,6 +398,10 @@ static ExitCode runMongosServer() {
runner->startup().transitional_ignore();
getGlobalServiceContext()->setPeriodicRunner(std::move(runner));
+ SessionKiller::set(
+ getGlobalServiceContext(),
+ std::make_shared<SessionKiller>(getGlobalServiceContext(), killSessionsRemote));
+
// Set up the logical session cache
LogicalSessionCache::set(getGlobalServiceContext(), makeLogicalSessionCacheS());