diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/SConscript | 19 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 30 | ||||
-rw-r--r-- | src/mongo/db/auth/action_types.txt | 1 | ||||
-rw-r--r-- | src/mongo/db/auth/role_graph_builtin_roles.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/traffic_recording_cmds.cpp | 120 | ||||
-rw-r--r-- | src/mongo/db/traffic_reader.cpp | 252 | ||||
-rw-r--r-- | src/mongo/db/traffic_reader.h | 42 | ||||
-rw-r--r-- | src/mongo/db/traffic_reader_main.cpp | 133 | ||||
-rw-r--r-- | src/mongo/db/traffic_recorder.cpp | 416 | ||||
-rw-r--r-- | src/mongo/db/traffic_recorder.h | 79 | ||||
-rw-r--r-- | src/mongo/db/traffic_recorder.idl | 77 | ||||
-rw-r--r-- | src/mongo/shell/shell_utils_launcher.cpp | 10 | ||||
-rw-r--r-- | src/mongo/transport/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 8 |
15 files changed, 1192 insertions, 1 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index ea3da91549a..ea7cb46d148 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -349,6 +349,7 @@ mongod = env.Program( 'db/introspect', 'db/keys_collection_client_direct', 'db/kill_sessions_local', + 'db/traffic_recorder', 'db/logical_session_cache_factory_mongod', 'db/logical_time_metadata_hook', 'db/matcher/expressions_mongod_only', @@ -451,6 +452,23 @@ if env.TargetOSIs('windows'): env.Alias('generated-sources', generatedServerManifest) env.Depends("s/server.res", generatedServerManifest) + +mongotrafficreader = env.Program( + target="mongotrafficreader", + source=[ + "db/traffic_reader_main.cpp" + ], + LIBDEPS=[ + 'base', + 'db/traffic_reader', + 'rpc/protocol', + 'util/signal_handlers' + ], +) + +if not hygienic: + env.Install('#/', mongotrafficreader) + # mongos mongos = env.Program( target='mongos', @@ -533,6 +551,7 @@ if not has_option('noshell') and usemozjs: 'db/query/command_request_response', 'db/query/query_request', 'db/server_options_core', + 'db/traffic_reader', 'linenoise_utf8', 'rpc/protocol', 'scripting/scripting', diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 5fd17fe7ee3..ed2e69d5a79 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2067,3 +2067,33 @@ env.CppIntegrationTest( '$BUILD_DIR/mongo/util/version_impl', ], ) + +env.Library( + target='traffic_recorder', + source=[ + 'traffic_recorder.cpp', + env.Idlc('traffic_recorder.idl')[0], + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/service_context', + "$BUILD_DIR/mongo/rpc/rpc", + "$BUILD_DIR/mongo/db/commands/server_status", + ], +) + +env.Library( + target='traffic_reader', + source=[ + "traffic_reader.cpp", + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/rpc/protocol', + "$BUILD_DIR/mongo/rpc/rpc", + ], +) diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt index 303022bf3a9..8ae39298cff 100644 --- a/src/mongo/db/auth/action_types.txt +++ b/src/mongo/db/auth/action_types.txt @@ -115,6 +115,7 @@ "storageDetails", "top", "touch", +"trafficRecord", "unlock", "useUUID", "update", diff --git a/src/mongo/db/auth/role_graph_builtin_roles.cpp b/src/mongo/db/auth/role_graph_builtin_roles.cpp index e9506a5f07c..7a6865f3308 100644 --- a/src/mongo/db/auth/role_graph_builtin_roles.cpp +++ b/src/mongo/db/auth/role_graph_builtin_roles.cpp @@ -222,7 +222,8 @@ MONGO_INITIALIZER(AuthorizationBuiltinRoles)(InitializerContext* context) { << ActionType::killAnySession << ActionType::killop << ActionType::replSetResizeOplog - << ActionType::resync; // clusterManager gets this also + << ActionType::resync // clusterManager gets this also + << ActionType::trafficRecord; // hostManager role actions that target the database resource hostManagerRoleDatabaseActions diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 58aa4e3da35..be6a0b674ef 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -141,6 +141,7 @@ env.Library( "mr_common.cpp", "reap_logical_session_cache_now.cpp", "refresh_sessions_command_internal.cpp", + "traffic_recording_cmds.cpp", "user_management_commands_common.cpp", ], LIBDEPS_PRIVATE=[ @@ -159,6 +160,7 @@ env.Library( '$BUILD_DIR/mongo/db/repl/isself', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', '$BUILD_DIR/mongo/executor/egress_tag_closer_manager', + '$BUILD_DIR/mongo/db/traffic_recorder', '$BUILD_DIR/mongo/executor/task_executor_pool', '$BUILD_DIR/mongo/rpc/client_metadata', '$BUILD_DIR/mongo/s/sharding_legacy_api', diff --git a/src/mongo/db/commands/traffic_recording_cmds.cpp b/src/mongo/db/commands/traffic_recording_cmds.cpp new file mode 100644 index 00000000000..7fd2894c9f6 --- /dev/null +++ b/src/mongo/db/commands/traffic_recording_cmds.cpp @@ -0,0 +1,120 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/action_set.h" +#include "mongo/db/auth/action_type.h" +#include "mongo/db/auth/authorization_manager.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/traffic_recorder.h" +#include "mongo/db/traffic_recorder_gen.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +class StartRecordingCommand final : public TypedCommand<StartRecordingCommand> { +public: + using Request = StartRecordingTraffic; + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + TrafficRecorder::get(opCtx->getServiceContext()).start(request()); + log() << "** Warning: The recording file contains unencrypted user traffic." + << " We recommend that you limit retention of this file and " + << "store it on an encrypted filesystem volume."; + } + + private: + bool supportsWriteConcern() const override { + return false; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), + ActionType::trafficRecord})); + } + + NamespaceString ns() const override { + return NamespaceString(request().getDbName(), ""); + } + }; + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kAlways; + } +} startRecordingTrafficCommand; + +class StopRecordingCommand final : public TypedCommand<StopRecordingCommand> { +public: + using Request = StopRecordingTraffic; + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + TrafficRecorder::get(opCtx->getServiceContext()).stop(); + } + + private: + bool supportsWriteConcern() const override { + return false; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForPrivilege(Privilege{ResourcePattern::forClusterResource(), + ActionType::trafficRecord})); + } + + NamespaceString ns() const override { + return NamespaceString(request().getDbName(), ""); + } + }; + + AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { + return AllowedOnSecondary::kAlways; + } +} stopRecordingTrafficCommand; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/traffic_reader.cpp b/src/mongo/db/traffic_reader.cpp new file mode 100644 index 00000000000..1198f6c11b3 --- /dev/null +++ b/src/mongo/db/traffic_reader.cpp @@ -0,0 +1,252 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <fcntl.h> +#include <iostream> +#include <string> +#include <sys/types.h> +#include <vector> + +#ifdef _WIN32 +#include <io.h> +#else +#include <unistd.h> +#endif + +#include "mongo/base/data_cursor.h" +#include "mongo/base/data_range_cursor.h" +#include "mongo/base/data_type_endian.h" +#include "mongo/base/data_type_validated.h" +#include "mongo/base/data_view.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/traffic_reader.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/message.h" +#include "mongo/rpc/op_msg.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/errno_util.h" +#include "mongo/util/scopeguard.h" +#include "mongo/util/time_support.h" + +namespace { +// Taken from src/mongo/gotools/mongoreplay/util.go +// Time.Unix() returns the number of seconds from the unix epoch but time's +// underlying struct stores 'sec' as the number of seconds elapsed since +// January 1, year 1 00:00:00 UTC (In the Proleptic Gregorian Calendar) +// This calculation allows for conversion between the internal representation +// and the UTC representation. +const long long unixToInternal = + static_cast<long long>(1969 * 365 + 1969 / 4 - 1969 / 100 + 1969 / 400) * 86400; +} // namespace + +namespace mongo { + +namespace { + +// Packet struct +struct TrafficReaderPacket { + uint64_t id; + StringData local; + StringData remote; + Date_t date; + uint64_t order; + MsgData::ConstView message; +}; + +bool readBytes(size_t toRead, char* buf, int fd) { + while (toRead) { +#ifdef _WIN32 + auto r = _read(fd, buf, toRead); +#else + auto r = ::read(fd, buf, toRead); +#endif + + if (r == -1) { + auto pair = errnoAndDescription(); + + uassert(ErrorCodes::FileStreamFailed, + str::stream() << "failed to read bytes: errno(" << pair.first << ") : " + << pair.second, + pair.first == EINTR); + + continue; + } else if (r == 0) { + return false; + } + + buf += r; + toRead -= r; + } + + return true; +} + +boost::optional<TrafficReaderPacket> readPacket(char* buf, int fd) { + if (!readBytes(4, buf, fd)) { + return boost::none; + } + auto len = ConstDataView(buf).read<LittleEndian<uint32_t>>(); + + uassert(ErrorCodes::FailedToParse, "packet too large", len < MaxMessageSizeBytes); + uassert( + ErrorCodes::FailedToParse, "could not read full packet", readBytes(len - 4, buf + 4, fd)); + + ConstDataRangeCursor cdr(buf, buf + len); + + // Read the packet + uassertStatusOK(cdr.skip<LittleEndian<uint32_t>>()); + uint64_t id = uassertStatusOK(cdr.readAndAdvance<LittleEndian<uint64_t>>()); + StringData local = uassertStatusOK(cdr.readAndAdvance<Terminated<'\0', StringData>>()); + StringData remote = uassertStatusOK(cdr.readAndAdvance<Terminated<'\0', StringData>>()); + uint64_t date = uassertStatusOK(cdr.readAndAdvance<LittleEndian<uint64_t>>()); + uint64_t order = uassertStatusOK(cdr.readAndAdvance<LittleEndian<uint64_t>>()); + MsgData::ConstView message(cdr.data()); + + return TrafficReaderPacket{ + id, local, remote, Date_t::fromMillisSinceEpoch(date), order, message}; +} + +void getBSONObjFromPacket(TrafficReaderPacket& packet, BSONObjBuilder* builder) { + { + // RawOp Field + BSONObjBuilder rawop(builder->subobjStart("rawop")); + + // Add the header fields to rawOp + { + BSONObjBuilder header(rawop.subobjStart("header")); + header.append("messagelength", static_cast<int32_t>(packet.message.getLen())); + header.append("requestid", static_cast<int32_t>(packet.message.getId())); + header.append("responseto", static_cast<int32_t>(packet.message.getResponseToMsgId())); + header.append("opcode", static_cast<int32_t>(packet.message.getNetworkOp())); + } + + // Add the binary reprentation of the entire message for rawop.body + // auto buf = SharedBuffer::allocate(packet.message.getLen()); + // std::memcpy(buf.get(), packet.message.view2ptr(), packet.message.getLen()); + // rawop.appendBinData("body", packet.message.getLen(), BinDataGeneral, buf.get()); + rawop.appendBinData( + "body", packet.message.getLen(), BinDataGeneral, packet.message.view2ptr()); + } + + // The seen field represents the time that the operation took place + // Trying to re-create the way mongoreplay does this + { + BSONObjBuilder seen(builder->subobjStart("seen")); + seen.append( + "sec", + static_cast<int64_t>((packet.date.toMillisSinceEpoch() / 1000) + unixToInternal)); + seen.append("nsec", static_cast<int32_t>(packet.order)); + } + + // Figure out which is the src endpoint as opposed to the dest endpoint + auto localInd = packet.local.rfind(':'); + auto remoteInd = packet.remote.rfind(':'); + if (localInd != std::string::npos && remoteInd != std::string::npos) { + auto local = packet.local.substr(localInd + 1); + auto remote = packet.remote.substr(remoteInd + 1); + if (packet.message.getResponseToMsgId()) { + builder->append("srcendpoint", local); + builder->append("destendpoint", remote); + } else { + builder->append("srcendpoint", remote); + builder->append("destendpoint", local); + } + } + + // Fill out the remaining fields + builder->append("order", static_cast<int64_t>(packet.order)); + builder->append("seenconnectionnum", static_cast<int64_t>(packet.id)); + builder->append("playedconnectionnum", static_cast<int64_t>(0)); + builder->append("generation", static_cast<int32_t>(0)); +} + +void addOpType(TrafficReaderPacket& packet, BSONObjBuilder* builder) { + if (packet.message.getNetworkOp() == dbMsg) { + Message message; + message.setData(dbMsg, packet.message.data(), packet.message.dataLen()); + + auto opMsg = rpc::opMsgRequestFromAnyProtocol(message); + builder->append("opType", opMsg.getCommandName()); + } else { + builder->append("opType", "legacy"); + } +} + +} // namespace + +BSONArray trafficRecordingFileToBSONArr(const std::string& inputFile) { + BSONArrayBuilder builder{}; + +// Open the connection to the input file +#ifdef _WIN32 + auto inputFd = ::open(inputFile.c_str(), O_RDONLY | O_BINARY); +#else + auto inputFd = ::open(inputFile.c_str(), O_RDONLY); +#endif + + const auto guard = makeGuard([&] { ::close(inputFd); }); + + uassert(ErrorCodes::FileNotOpen, + str::stream() << "Specified file does not exist (" << inputFile << ")", + inputFd > 0); + + auto buf = SharedBuffer::allocate(MaxMessageSizeBytes); + while (auto packet = readPacket(buf.get(), inputFd)) { + BSONObjBuilder bob(builder.subobjStart()); + getBSONObjFromPacket(*packet, &bob); + addOpType(*packet, &bob); + } + + return builder.arr(); +} + +void trafficRecordingFileToMongoReplayFile(int inputFd, std::ostream& outputStream) { + // Document expected by mongoreplay + BSONObjBuilder opts{}; + opts.append("playbackfileversion", 1); + opts.append("driveropsfiltered", false); + auto optsObj = opts.obj(); + outputStream.write(optsObj.objdata(), optsObj.objsize()); + + BSONObjBuilder bob; + auto buf = SharedBuffer::allocate(MaxMessageSizeBytes); + + while (auto packet = readPacket(buf.get(), inputFd)) { + getBSONObjFromPacket(*packet, &bob); + + auto obj = bob.asTempObj(); + outputStream.write(obj.objdata(), obj.objsize()); + + bob.resetToEmpty(); + } +} + +} // namespace mongo diff --git a/src/mongo/db/traffic_reader.h b/src/mongo/db/traffic_reader.h new file mode 100644 index 00000000000..c22464fa92a --- /dev/null +++ b/src/mongo/db/traffic_reader.h @@ -0,0 +1,42 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/rpc/op_msg.h" + +#pragma once + +namespace mongo { + +// Method for testing, takes the recorded traffic and returns a BSONArray +BSONArray trafficRecordingFileToBSONArr(const std::string& inputFile); + +// This is the function that traffic_reader_main.cpp calls +void trafficRecordingFileToMongoReplayFile(int inFile, std::ostream& outFile); +} // namespace mongo diff --git a/src/mongo/db/traffic_reader_main.cpp b/src/mongo/db/traffic_reader_main.cpp new file mode 100644 index 00000000000..c907b74c979 --- /dev/null +++ b/src/mongo/db/traffic_reader_main.cpp @@ -0,0 +1,133 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <fcntl.h> +#include <fstream> +#include <iostream> +#include <string> + +#ifdef _WIN32 +#include <io.h> +#endif + +#include "mongo/base/initializer.h" +#include "mongo/db/traffic_reader.h" +#include "mongo/util/signal_handlers.h" +#include "mongo/util/text.h" + +#include <boost/filesystem.hpp> +#include <boost/program_options.hpp> + +using namespace mongo; + +int main(int argc, char* argv[], char** envp) { + + setupSignalHandlers(); + + Status status = mongo::runGlobalInitializers(argc, argv, envp); + if (!status.isOK()) { + std::cerr << "Failed global initialization: " << status << std::endl; + return EXIT_FAILURE; + } + + startSignalProcessingThread(); + + // Handle program options + boost::program_options::variables_map vm; + + // input / output files for the reader (input defaults to stdin) + int inputFd = 0; + std::ofstream outputStream; + + try { + // Define the program options + auto inputStr = "Path to file input file (defaults to stdin)"; + auto outputStr = + "Path to file that mongotrafficreader will place its output (defaults to stdout)"; + boost::program_options::options_description desc{"Options"}; + desc.add_options()("help,h", "help")( + "input,i", boost::program_options::value<std::string>(), inputStr)( + "output,o", boost::program_options::value<std::string>(), outputStr); + + // Parse the program options + store(parse_command_line(argc, argv, desc), vm); + notify(vm); + + // Handle the help option + if (vm.count("help")) { + std::cout << "Mongo Traffic Reader Help: \n\n\t./mongotrafficreader " + "-i trafficinput.txt -o mongotrafficreader_dump.bson \n\n" + << desc << std::endl; + return EXIT_SUCCESS; + } + + // User can specify a --input param and it must point to a valid file + if (vm.count("input")) { + auto inputFile = vm["input"].as<std::string>(); + if (!boost::filesystem::exists(inputFile.c_str())) { + std::cout << "Error: Specified file does not exist (" << inputFile.c_str() << ")" + << std::endl; + return EXIT_FAILURE; + } + +// Open the connection to the input file +#ifdef _WIN32 + inputFd = open(inputFile.c_str(), O_RDONLY | O_BINARY); +#else + inputFd = open(inputFile.c_str(), O_RDONLY); +#endif + } + + // User must specify a --output param and it does not need to point to a valid file + if (vm.count("output")) { + auto outputFile = vm["output"].as<std::string>(); + + // Open the connection to the output file + outputStream.open(outputFile, std::ios::out | std::ios::trunc | std::ios::binary); + if (!outputStream.is_open()) { + std::cerr << "Error writing to file: " << outputFile << std::endl; + return EXIT_FAILURE; + } + } else { + // output to std::cout + outputStream.copyfmt(std::cout); + outputStream.clear(std::cout.rdstate()); + outputStream.basic_ios<char>::rdbuf(std::cout.rdbuf()); + } + } catch (const boost::program_options::error& ex) { + std::cerr << ex.what() << '\n'; + return EXIT_FAILURE; + } + + mongo::trafficRecordingFileToMongoReplayFile(inputFd, outputStream); + + return 0; +} diff --git a/src/mongo/db/traffic_recorder.cpp b/src/mongo/db/traffic_recorder.cpp new file mode 100644 index 00000000000..6ba88708b78 --- /dev/null +++ b/src/mongo/db/traffic_recorder.cpp @@ -0,0 +1,416 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/traffic_recorder.h" + +#include <boost/filesystem/operations.hpp> +#include <fstream> + +#include "mongo/base/data_builder.h" +#include "mongo/base/data_type_terminated.h" +#include "mongo/base/init.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/commands/server_status.h" +#include "mongo/db/commands/test_commands_enabled.h" +#include "mongo/db/server_parameters.h" +#include "mongo/db/service_context.h" +#include "mongo/rpc/factory.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/producer_consumer_queue.h" + +namespace mongo { + +namespace { + +constexpr auto kDefaultTrafficRecordingDirectory = ""_sd; + +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(trafficRecordingDirectory, + std::string, + kDefaultTrafficRecordingDirectory.toString()) + ->withValidator([](const std::string& newValue) { + if (!boost::filesystem::is_directory(newValue)) { + return Status(ErrorCodes::FileNotOpen, + str::stream() << "traffic recording directory \"" << newValue + << "\" is not a directory."); + } + + return Status::OK(); + }); + +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(AlwaysRecordTraffic, std::string, ""); + +bool shouldAlwaysRecordTraffic = false; + +MONGO_INITIALIZER(ShouldAlwaysRecordTraffic)(InitializerContext*) { + if (!AlwaysRecordTraffic.size()) { + return Status::OK(); + } + + if (!getTestCommandsEnabled()) { + return Status(ErrorCodes::BadValue, + "invalid to set AlwaysRecordTraffic if test commands are not enabled"); + } + + if (trafficRecordingDirectory.empty()) { + if (serverGlobalParams.logpath.empty()) { + return Status(ErrorCodes::BadValue, + "invalid to set AlwaysRecordTraffic without a logpath or " + "trafficRecordingDirectory"); + } else { + trafficRecordingDirectory = serverGlobalParams.logpath; + } + } + + shouldAlwaysRecordTraffic = true; + + return Status::OK(); +} + +} // namespace + +/** + * The Recording class represents a single recording that the recorder is exposing. It's made up of + * a background thread which flushes records to disk, and helper methods to push to that thread, + * expose stats, and stop the recording. + */ +class TrafficRecorder::Recording { +public: + Recording(const StartRecordingTraffic& options) + : _path(_getPath(options.getFilename().toString())), _maxLogSize(options.getMaxFileSize()) { + + MultiProducerSingleConsumerQueue<TrafficRecordingPacket, CostFunction>::Options + queueOptions; + queueOptions.maxQueueDepth = options.getBufferSize(); + if (!shouldAlwaysRecordTraffic) { + queueOptions.maxProducerQueueDepth = 0; + } + _pcqPipe = MultiProducerSingleConsumerQueue<TrafficRecordingPacket, CostFunction>::Pipe( + queueOptions); + + _trafficStats.setRunning(true); + _trafficStats.setBufferSize(options.getBufferSize()); + _trafficStats.setRecordingFile(_path); + _trafficStats.setMaxFileSize(_maxLogSize); + } + + void run() { + _thread = stdx::thread([ consumer = std::move(_pcqPipe.consumer), this ] { + try { + DataBuilder db; + std::fstream out(_path, + std::ios_base::binary | std::ios_base::trunc | std::ios_base::out); + + while (true) { + std::deque<TrafficRecordingPacket> storage; + size_t bytes; + + std::tie(storage, bytes) = consumer.popManyUpTo(MaxMessageSizeBytes); + + // if this fired... somehow we got a message bigger than a message + invariant(bytes); + + for (const auto& packet : storage) { + db.clear(); + Message toWrite = packet.message; + + uassertStatusOK(db.writeAndAdvance<LittleEndian<uint32_t>>(0)); + uassertStatusOK(db.writeAndAdvance<LittleEndian<uint64_t>>(packet.id)); + uassertStatusOK(db.writeAndAdvance<Terminated<'\0', StringData>>( + StringData(packet.local))); + uassertStatusOK(db.writeAndAdvance<Terminated<'\0', StringData>>( + StringData(packet.remote))); + uassertStatusOK(db.writeAndAdvance<LittleEndian<uint64_t>>( + packet.now.toMillisSinceEpoch())); + uassertStatusOK(db.writeAndAdvance<LittleEndian<uint64_t>>(packet.order)); + + auto size = db.size() + toWrite.size(); + uassertStatusOK(db.getCursor().write<LittleEndian<uint32_t>>(size)); + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _written += size; + } + + uassert(ErrorCodes::LogWriteFailed, + "hit maximum log size", + _written < _maxLogSize); + + out.write(db.getCursor().data(), db.size()); + out.write(toWrite.buf(), toWrite.size()); + } + } + } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueConsumed>&) { + // Close naturally + } catch (...) { + auto status = exceptionToStatus(); + + stdx::lock_guard<stdx::mutex> lk(_mutex); + _result = status; + } + }); + } + + /** + * pushRecord returns false if the queue was full. This is ultimately fatal to the recording + */ + bool pushRecord(const transport::SessionHandle& ts, + Date_t now, + const uint64_t order, + const Message& message) { + try { + _pcqPipe.producer.push( + {ts->id(), ts->local().toString(), ts->remote().toString(), now, order, message}); + return true; + } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueProducerQueueDepthExceeded>&) { + invariant(!shouldAlwaysRecordTraffic); + + // If we couldn't push our packet begin the process of failing the recording + _pcqPipe.producer.close(); + + stdx::lock_guard<stdx::mutex> lk(_mutex); + + // If the result was otherwise okay, mark it as failed due to the queue blocking. If + // it failed for another reason, don't overwrite that. + if (_result.isOK()) { + _result = Status(ErrorCodes::Error(51061), "queue was blocked in traffic recorder"); + } + } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>&) { + } + + return false; + } + + Status shutdown() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + if (!_inShutdown) { + _inShutdown = true; + lk.unlock(); + + _pcqPipe.producer.close(); + _thread.join(); + + lk.lock(); + } + + return _result; + } + + BSONObj getStats() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _trafficStats.setBufferedBytes(_pcqPipe.controller.getStats().queueDepth); + _trafficStats.setCurrentFileSize(_written); + return _trafficStats.toBSON(); + } + + AtomicWord<uint64_t> order{0}; + +private: + struct TrafficRecordingPacket { + const uint64_t id; + const std::string local; + const std::string remote; + const Date_t now; + const uint64_t order; + const Message message; + }; + + struct CostFunction { + size_t operator()(const TrafficRecordingPacket& packet) const { + return packet.message.size(); + } + }; + + static std::string _getPath(const std::string& filename) { + uassert(ErrorCodes::BadValue, + "Traffic recording filename must not be empty", + !filename.empty()); + + if (trafficRecordingDirectory.back() == '/') { + trafficRecordingDirectory.pop_back(); + } + auto parentPath = boost::filesystem::path(trafficRecordingDirectory); + auto path = parentPath / filename; + + uassert(ErrorCodes::BadValue, + "Traffic recording filename must be a simple filename", + path.parent_path() == parentPath); + + return path.string(); + } + + const std::string _path; + const size_t _maxLogSize; + + MultiProducerSingleConsumerQueue<TrafficRecordingPacket, CostFunction>::Pipe _pcqPipe; + stdx::thread _thread; + + stdx::mutex _mutex; + bool _inShutdown = false; + TrafficRecorderStats _trafficStats; + size_t _written = 0; + Status _result = Status::OK(); +}; + +namespace { +static const auto getTrafficRecorder = ServiceContext::declareDecoration<TrafficRecorder>(); +} // namespace + +TrafficRecorder& TrafficRecorder::get(ServiceContext* svc) { + return getTrafficRecorder(svc); +} + +TrafficRecorder::TrafficRecorder() : _shouldRecord(shouldAlwaysRecordTraffic) {} + +TrafficRecorder::~TrafficRecorder() { + if (shouldAlwaysRecordTraffic) { + _recording->shutdown().ignore(); + } +} + +void TrafficRecorder::start(const StartRecordingTraffic& options) { + invariant(!shouldAlwaysRecordTraffic); + + uassert(ErrorCodes::BadValue, + "Traffic recording directory not set", + !trafficRecordingDirectory.empty()); + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + uassert(ErrorCodes::BadValue, "Traffic recording already active", !_recording); + + _recording = std::make_shared<Recording>(options); + _recording->run(); + } + + _shouldRecord.store(true); +} + +void TrafficRecorder::stop() { + invariant(!shouldAlwaysRecordTraffic); + + _shouldRecord.store(false); + + auto recording = [&] { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + uassert(ErrorCodes::BadValue, "Traffic recording not active", _recording); + + return std::move(_recording); + }(); + + uassertStatusOK(recording->shutdown()); +} + +void TrafficRecorder::observe(const transport::SessionHandle& ts, + Date_t now, + const Message& message) { + if (shouldAlwaysRecordTraffic) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (!_recording) { + StartRecordingTraffic options; + options.setFilename(AlwaysRecordTraffic); + options.setMaxFileSize(std::numeric_limits<int64_t>::max()); + + _recording = std::make_shared<Recording>(options); + _recording->run(); + } + } + + invariant(_recording->pushRecord(ts, now, _recording->order.addAndFetch(1), message)); + return; + } + + if (!_shouldRecord.load()) { + return; + } + + auto recording = _getCurrentRecording(); + + // If we don't have an active recording, bail + if (!recording) { + return; + } + + // Try to record the message + if (recording->pushRecord(ts, now, recording->order.addAndFetch(1), message)) { + return; + } + + // We couldn't queue + stdx::lock_guard<stdx::mutex> lk(_mutex); + + // If the recording isn't the one we have in hand bail (its been ended, or a new one has + // been created + if (_recording != recording) { + return; + } + + // We couldn't queue and it's still our recording. No one else should try to queue + _shouldRecord.store(false); +} + +std::shared_ptr<TrafficRecorder::Recording> TrafficRecorder::_getCurrentRecording() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _recording; +} + +class TrafficRecorder::TrafficRecorderSSS : public ServerStatusSection { +public: + TrafficRecorderSSS() : ServerStatusSection("trafficRecording") {} + + bool includeByDefault() const override { + return true; + } + + BSONObj generateSection(OperationContext* opCtx, + const BSONElement& configElement) const override { + auto& recorder = TrafficRecorder::get(opCtx->getServiceContext()); + + if (!recorder._shouldRecord.load()) { + return BSON("running" << false); + } + + auto recording = recorder._getCurrentRecording(); + + if (!recording) { + return BSON("running" << false); + } + + return recording->getStats(); + } +} trafficRecorderStats; + +} // namespace mongo diff --git a/src/mongo/db/traffic_recorder.h b/src/mongo/db/traffic_recorder.h new file mode 100644 index 00000000000..8bd261cbfb4 --- /dev/null +++ b/src/mongo/db/traffic_recorder.h @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> + +#include "mongo/db/service_context.h" +#include "mongo/db/traffic_recorder_gen.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/rpc/message.h" +#include "mongo/stdx/mutex.h" +#include "mongo/transport/session.h" + +namespace mongo { + +/** + * A service context level global which captures packet capture through the transport layer if it is + * enabled. The service is intended to be turned on and off via startRecordingTrafficTraffic and + * stopRecordingTrafficTraffic. + * + * The recording can have one recording running at a time and the intention is that observe() blocks + * callers for the least amount of time possible. + */ +class TrafficRecorder { +public: + static TrafficRecorder& get(ServiceContext* svc); + + TrafficRecorder(); + ~TrafficRecorder(); + + // Start and stop block until the associate operation has succeeded or failed + // + // On failure these methods throw + void start(const StartRecordingTraffic& options); + void stop(); + + void observe(const transport::SessionHandle& ts, Date_t now, const Message& message); + +private: + class TrafficRecorderSSS; + class Recording; + + std::shared_ptr<Recording> _getCurrentRecording() const; + + AtomicWord<bool> _shouldRecord; + + // The mutex only protects the last recording shared_ptr + mutable stdx::mutex _mutex; + std::shared_ptr<Recording> _recording; +}; + +} // namespace mongo diff --git a/src/mongo/db/traffic_recorder.idl b/src/mongo/db/traffic_recorder.idl new file mode 100644 index 00000000000..fa9cd232c14 --- /dev/null +++ b/src/mongo/db/traffic_recorder.idl @@ -0,0 +1,77 @@ +# Copyright (C) 2018-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. + +# This IDL file describes the BSON format for a LogicalSessionId, and +# handles the serialization to and deserialization from its BSON representation +# for that class. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + + +structs: + + TrafficRecorderStats: + description: "A struct representing the trafficRecording server status section" + strict: true + fields: + running: + type: bool + bufferSize: + type: long + bufferedBytes: + type: long + recordingFile: + type: string + maxFileSize: + type: long + currentFileSize: + type: long + +commands: + startRecordingTraffic: + description: "start recording Command" + namespace: ignored + fields: + filename: + description: "output file name" + type: string + bufferSize: + description: "size of buffer" + default: 134217728 + type: long + maxFileSize: + description: "size of log file" + default: 6294967296 + type: long + + stopRecordingTraffic: + description: "stop recording Command" + namespace: ignored diff --git a/src/mongo/shell/shell_utils_launcher.cpp b/src/mongo/shell/shell_utils_launcher.cpp index 58b33fe83aa..5db817fff68 100644 --- a/src/mongo/shell/shell_utils_launcher.cpp +++ b/src/mongo/shell/shell_utils_launcher.cpp @@ -61,6 +61,7 @@ #endif #include "mongo/client/dbclient_connection.h" +#include "mongo/db/traffic_reader.h" #include "mongo/scripting/engine.h" #include "mongo/shell/shell_options.h" #include "mongo/shell/shell_utils.h" @@ -1062,6 +1063,14 @@ BSONObj StopMongoProgramByPid(const BSONObj& a, void* data) { return BSON("" << (double)code); } +BSONObj ConvertTrafficRecordingToBSON(const BSONObj& a, void* data) { + int nFields = a.nFields(); + uassert(ErrorCodes::FailedToParse, "wrong number of arguments", nFields == 1); + + auto arr = trafficRecordingFileToBSONArr(a.firstElement().String()); + return BSON("" << arr); +} + int KillMongoProgramInstances() { vector<ProcessId> pids; registry.getRegisteredPids(pids); @@ -1113,6 +1122,7 @@ void installShellUtilsLauncher(Scope& scope) { scope.injectNative("resetDbpath", ResetDbpath); scope.injectNative("pathExists", PathExists); scope.injectNative("copyDbpath", CopyDbpath); + scope.injectNative("convertTrafficRecordingToBSON", ConvertTrafficRecordingToBSON); } } // namespace shell_utils } // namespace mongo diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 6ca36ed7916..43032ccb1b7 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -174,6 +174,7 @@ env.Library( 'transport_layer_common', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/traffic_recorder', '$BUILD_DIR/mongo/transport/message_compressor', ], ) diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 6ecf18384d9..09d3a8b00f0 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -38,6 +38,7 @@ #include "mongo/db/client.h" #include "mongo/db/dbmessage.h" #include "mongo/db/stats/counters.h" +#include "mongo/db/traffic_recorder.h" #include "mongo/rpc/message.h" #include "mongo/rpc/op_msg.h" #include "mongo/stdx/memory.h" @@ -423,6 +424,9 @@ void ServiceStateMachine::_sinkCallback(Status status) { void ServiceStateMachine::_processMessage(ThreadGuard guard) { invariant(!_inMessage.empty()); + TrafficRecorder::get(_serviceContext) + .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), _inMessage); + auto& compressorMgr = MessageCompressorManager::forSession(_session()); _compressorId = boost::none; @@ -472,6 +476,10 @@ void ServiceStateMachine::_processMessage(ThreadGuard guard) { uassertStatusOK(swm.getStatus()); toSink = swm.getValue(); } + + TrafficRecorder::get(_serviceContext) + .observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink); + _sinkMessage(std::move(guard), std::move(toSink)); } else { |