diff options
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/server_options_helpers.cpp | 11 | ||||
-rw-r--r-- | src/mongo/shell/shell_options.cpp | 9 | ||||
-rw-r--r-- | src/mongo/transport/SConscript | 20 | ||||
-rw-r--r-- | src/mongo/transport/message_compressor_base.h | 141 | ||||
-rw-r--r-- | src/mongo/transport/message_compressor_noop.h | 55 | ||||
-rw-r--r-- | src/mongo/transport/message_compressor_registry.cpp | 137 | ||||
-rw-r--r-- | src/mongo/transport/message_compressor_registry.h | 119 | ||||
-rw-r--r-- | src/mongo/transport/message_compressor_registry_test.cpp | 87 |
10 files changed, 581 insertions, 0 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 1b5cfebb024..624ca0eb062 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -372,6 +372,7 @@ if not has_option('noshell') and usemozjs: 'rpc/protocol', 'scripting/scripting', 'shell/mongojs', + 'transport/message_compressor_registry', 'util/net/network', 'util/options_parser/options_parser_init', 'util/processinfo', diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 14eadcbecba..fea4ff19638 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -342,6 +342,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/util/cmdline_utils/cmdline_utils', + '$BUILD_DIR/mongo/transport/message_compressor_registry', # The dependency on network is a temporary crutch that should go away once the # networking library has separate options '$BUILD_DIR/mongo/util/net/network', diff --git a/src/mongo/db/server_options_helpers.cpp b/src/mongo/db/server_options_helpers.cpp index c1729324606..6956c8454f0 100644 --- a/src/mongo/db/server_options_helpers.cpp +++ b/src/mongo/db/server_options_helpers.cpp @@ -48,6 +48,7 @@ #include "mongo/db/server_parameters.h" #include "mongo/logger/log_component.h" #include "mongo/logger/message_event_utf8_encoder.h" +#include "mongo/transport/message_compressor_registry.h" #include "mongo/util/cmdline_utils/censor_cmdline.h" #include "mongo/util/log.h" #include "mongo/util/map_util.h" @@ -389,6 +390,11 @@ Status addGeneralServerOptions(moe::OptionSection* options) { .hidden() .setSources(moe::SourceAllLegacy); + auto ret = addMessageCompressionOptions(options, false); + if (!ret.isOK()) { + return ret; + } + return Status::OK(); } @@ -1038,6 +1044,11 @@ Status storeServerOptions(const moe::Environment& params, const std::vector<std: } #endif + ret = storeMessageCompressionOptions(params); + if (!ret.isOK()) { + return ret; + } + return Status::OK(); } diff --git a/src/mongo/shell/shell_options.cpp b/src/mongo/shell/shell_options.cpp index 2b8b1ba513b..12abad3f60a 100644 --- a/src/mongo/shell/shell_options.cpp +++ b/src/mongo/shell/shell_options.cpp @@ -44,6 +44,7 @@ #include "mongo/db/server_options.h" #include "mongo/rpc/protocol.h" #include "mongo/shell/shell_utils.h" +#include "mongo/transport/message_compressor_registry.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/net/ssl_options.h" @@ -195,6 +196,10 @@ Status addMongoShellOptions(moe::OptionSection* options) { "rpcProtocols", "rpcProtocols", moe::String, " none, opQueryOnly, opCommandOnly, all") .hidden(); + ret = addMessageCompressionOptions(options, true); + if (!ret.isOK()) + return ret; + return Status::OK(); } @@ -401,6 +406,10 @@ Status storeMongoShellOptions(const moe::Environment& params, return Status(ErrorCodes::InvalidOptions, sb.str()); } + ret = storeMessageCompressionOptions(params); + if (!ret.isOK()) + return ret; + return Status::OK(); } } diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 38ef3410619..72a46fb93fb 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -99,4 +99,24 @@ env.CppUnitTest( LIBDEPS=[ 'transport_layer_mock', ], + +env.Library( + target='message_compressor_registry', + source=[ + 'message_compressor_registry.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/util/options_parser/options_parser', + ] +) + +env.CppUnitTest( + target='message_compressor_test', + source=[ + 'message_compressor_registry_test.cpp', + ], + LIBDEPS=[ + 'message_compressor_registry', + ] ) diff --git a/src/mongo/transport/message_compressor_base.h b/src/mongo/transport/message_compressor_base.h new file mode 100644 index 00000000000..c0a2c14b72d --- /dev/null +++ b/src/mongo/transport/message_compressor_base.h @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/data_range.h" +#include "mongo/base/status_with.h" +#include "mongo/base/string_data.h" +#include "mongo/platform/atomic_word.h" + +namespace mongo { +using MessageCompressorId = uint8_t; + +class MessageCompressorBase { + MONGO_DISALLOW_COPYING(MessageCompressorBase); + +public: + virtual ~MessageCompressorBase() = default; + + /* + * Returns the name for subclass compressors (e.g. "snappy" or "noop") + */ + const std::string& getName() const { + return _name; + } + + /* + * Returns the numeric ID for subclass compressors (e.g. 1 or 0) + */ + MessageCompressorId getId() const { + return _id; + } + + /* + * This returns the maximum output size of a call to compressData. It is used + * by the MessageCompressorManager to determine how big a buffer to allocate. + */ + virtual std::size_t getMaxCompressedSize(size_t inputSize) = 0; + + /* + * This method compresses the data in the input ConstDataRange into the output DataRange. + * It returns the number of bytes actually compressed into the output range, or an error + * status. + */ + virtual StatusWith<std::size_t> compressData(ConstDataRange input, DataRange output) = 0; + + /* + * This method decompresses the data in the input ConstDataRange into the output DataRange. + * It returns the number of bytes actually decompressed into the output range, or an error + * status. + */ + virtual StatusWith<std::size_t> decompressData(ConstDataRange input, DataRange output) = 0; + + /* + * This returns the number of bytes passed in the input for compressData + */ + int64_t getCompressedBytesIn() const { + return _compressBytesIn.loadRelaxed(); + } + + /* + * This returns the number of bytes written to output for compressData + */ + int64_t getCompressedBytesOut() const { + return _compressBytesOut.loadRelaxed(); + } + + /* + * This returns the number of bytes passed in the input for decompressData + */ + int64_t getDecompressedBytesIn() const { + return _decompressBytesIn.loadRelaxed(); + } + + /* + * This returns the number of bytes written to output for decompressData + */ + int64_t getDecompressedBytesOut() const { + return _decompressBytesOut.loadRelaxed(); + } + + +protected: + /* + * This is called by sub-classes to intialize their ID/name fields. + */ + MessageCompressorBase(MessageCompressorId id, StringData name) + : _id{id}, _name{name.toString()} {} + + /* + * Called by sub-classes to bump their bytesIn/bytesOut counters for compression + */ + void counterHitCompress(int64_t bytesIn, int64_t bytesOut) { + _compressBytesIn.addAndFetch(bytesIn); + _compressBytesOut.addAndFetch(bytesOut); + } + + /* + * Called by sub-classes to bump their bytesIn/bytesOut counters for decompression + */ + void counterHitDecompress(int64_t bytesIn, int64_t bytesOut) { + _decompressBytesIn.addAndFetch(bytesIn); + _decompressBytesOut.addAndFetch(bytesOut); + } + +private: + const MessageCompressorId _id; + const std::string _name; + + AtomicInt64 _compressBytesIn; + AtomicInt64 _compressBytesOut; + + AtomicInt64 _decompressBytesIn; + AtomicInt64 _decompressBytesOut; +}; +} // namespace mongo diff --git a/src/mongo/transport/message_compressor_noop.h b/src/mongo/transport/message_compressor_noop.h new file mode 100644 index 00000000000..54a2c795f9d --- /dev/null +++ b/src/mongo/transport/message_compressor_noop.h @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/transport/message_compressor_base.h" + +namespace mongo { + +class NoopMessageCompressor final : public MessageCompressorBase { +public: + NoopMessageCompressor() : MessageCompressorBase(0, "noop") {} + + std::size_t getMaxCompressedSize(size_t inputSize) override { + return inputSize; + } + + StatusWith<std::size_t> compressData(ConstDataRange input, DataRange output) override { + memcpy(const_cast<char*>(output.data()), input.data(), input.length()); + counterHitCompress(input.length(), input.length()); + return {input.length()}; + } + + StatusWith<std::size_t> decompressData(ConstDataRange input, DataRange output) override { + memcpy(const_cast<char*>(output.data()), input.data(), input.length()); + counterHitDecompress(input.length(), input.length()); + return {input.length()}; + } +}; +} // namespace mongo diff --git a/src/mongo/transport/message_compressor_registry.cpp b/src/mongo/transport/message_compressor_registry.cpp new file mode 100644 index 00000000000..e64d73b063e --- /dev/null +++ b/src/mongo/transport/message_compressor_registry.cpp @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/transport/message_compressor_registry.h" + +#include "mongo/base/init.h" +#include "mongo/stdx/memory.h" +#include "mongo/transport/message_compressor_noop.h" +#include "mongo/util/options_parser/option_section.h" + +#include <boost/algorithm/string/classification.hpp> +#include <boost/algorithm/string/split.hpp> + +namespace mongo { + +MessageCompressorRegistry& MessageCompressorRegistry::get() { + static MessageCompressorRegistry globalRegistry; + return globalRegistry; +} + +void MessageCompressorRegistry::registerImplementation( + std::unique_ptr<MessageCompressorBase> impl) { + // It's an error to register a compressor that's already been registered + fassert(40254, + _compressorsByName.find(impl->getName()) == _compressorsByName.end() && + _compressorsByIds[impl->getId()] == nullptr); + + // Check to see if this compressor is allowed by configuration + auto it = std::find(_compressorNames.begin(), _compressorNames.end(), impl->getName()); + if (it == _compressorNames.end()) + return; + + _compressorsByName[impl->getName()] = impl.get(); + _compressorsByIds[impl->getId()] = std::move(impl); +} + +void MessageCompressorRegistry::finalizeSupportedCompressors() { + // Remove compressor names from the compressorNames list if they were never registered. + // This prevents _compressorNames from having totally bogus names specified by users. + std::remove_if( + _compressorNames.begin(), _compressorNames.end(), [this](const std::string& name) { + return _compressorsByName.find(name) == _compressorsByName.end(); + }); +} + +const std::vector<std::string>& MessageCompressorRegistry::getCompressorNames() const { + return _compressorNames; +} + +MessageCompressorBase* MessageCompressorRegistry::getCompressor(MessageCompressorId id) const { + return _compressorsByIds.at(id).get(); +} + +MessageCompressorBase* MessageCompressorRegistry::getCompressor(StringData name) const { + auto it = _compressorsByName.find(name.toString()); + if (it == _compressorsByName.end()) + return nullptr; + return it->second; +} + +void MessageCompressorRegistry::setSupportedCompressors(std::vector<std::string>&& names) { + _compressorNames = std::move(names); +} + +Status addMessageCompressionOptions(moe::OptionSection* options, bool forShell) { + auto ret = + options + ->addOptionChaining("net.compression.compressors", + "networkMessageCompressors", + moe::String, + "Comma-separated list of compressors to use for network messages") + .setImplicit(moe::Value(std::string(""))); + if (forShell) + ret.hidden(); + + return Status::OK(); +} + +Status storeMessageCompressionOptions(const moe::Environment& params) { + std::vector<std::string> restrict; + if (params.count("net.compression.compressors")) { + auto compressorListStr = params["net.compression.compressors"].as<std::string>(); + boost::algorithm::split(restrict, compressorListStr, boost::is_any_of(", ")); + } + + auto& compressorFactory = MessageCompressorRegistry::get(); + compressorFactory.setSupportedCompressors(std::move(restrict)); + + return Status::OK(); +} + +// This instantiates and registers the "noop" compressor. It must happen after option storage +// because that's when the configuration of the compressors gets set. +MONGO_INITIALIZER_GENERAL(NoopMessageCompressorInit, + ("EndStartupOptionHandling"), + ("AllCompressorsRegistered")) +(InitializerContext* context) { + auto& compressorRegistry = MessageCompressorRegistry::get(); + compressorRegistry.registerImplementation(stdx::make_unique<NoopMessageCompressor>()); + return Status::OK(); +} + +// This cleans up any compressors that were requested by the user, but weren't registered by +// any compressor. It must be run after all the compressors have registered themselves with +// the global registry. +MONGO_INITIALIZER(AllCompressorsRegistered)(InitializerContext* context) { + MessageCompressorRegistry::get().finalizeSupportedCompressors(); + return Status::OK(); +} +} // namespace mongo diff --git a/src/mongo/transport/message_compressor_registry.h b/src/mongo/transport/message_compressor_registry.h new file mode 100644 index 00000000000..18c0e060aa5 --- /dev/null +++ b/src/mongo/transport/message_compressor_registry.h @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/transport/message_compressor_base.h" +#include "mongo/util/string_map.h" + +#include <array> +#include <limits> +#include <map> +#include <memory> +#include <string> +#include <vector> + +namespace mongo { + +namespace optionenvironment { +class OptionSection; +class Environment; +} // namespace option environment + +namespace moe = mongo::optionenvironment; + +/* + * The MessageCompressorRegistry holds the global registrations of compressors for a process. + */ +class MessageCompressorRegistry { + MONGO_DISALLOW_COPYING(MessageCompressorRegistry); + +public: + MessageCompressorRegistry() = default; + + /* + * Returns the global MessageCompressorRegistry + */ + static MessageCompressorRegistry& get(); + + /* + * Registers a new implementation of a MessageCompressor with the registry. This only gets + * called during startup. It is an error to call this twice with compressors with the same name + * or ID numbers. + * + * This method is not thread-safe and should only be called from a single-threaded context + * (a MONGO_INITIALIZER). + */ + void registerImplementation(std::unique_ptr<MessageCompressorBase> impl); + + /* + * Returns the list of compressor names that have been registered and configured. + * + * Iterators and value in this vector may be invalidated by calls to: + * setSupportedCompressors + * finalizeSupportedCompressors + */ + const std::vector<std::string>& getCompressorNames() const; + + /* + * Returns a compressor given an ID number. If no compressor exists with the ID number, it + * returns nullptr + */ + MessageCompressorBase* getCompressor(MessageCompressorId id) const; + + /* Returns a compressor given a name. If no compressor with that name exists, it returns + * nullptr + */ + MessageCompressorBase* getCompressor(StringData name) const; + + /* + * Sets the list of supported compressors for this registry. Should be called during + * option parsing and before calling registerImplementation for any compressors. + */ + void setSupportedCompressors(std::vector<std::string>&& compressorNames); + + /* + * Finalizes the list of supported compressors for this registry. Should be called after all + * calls to registerImplementation. It will remove any compressor names that aren't keys in + * the _compressors map. + */ + void finalizeSupportedCompressors(); + +private: + StringMap<MessageCompressorBase*> _compressorsByName; + std::array<std::unique_ptr<MessageCompressorBase>, + std::numeric_limits<MessageCompressorId>::max() + 1> + _compressorsByIds; + std::vector<std::string> _compressorNames; +}; + +Status addMessageCompressionOptions(moe::OptionSection* options, bool forShell); +Status storeMessageCompressionOptions(const moe::Environment& params); +} // namespace mongo diff --git a/src/mongo/transport/message_compressor_registry_test.cpp b/src/mongo/transport/message_compressor_registry_test.cpp new file mode 100644 index 00000000000..b3a766f2f55 --- /dev/null +++ b/src/mongo/transport/message_compressor_registry_test.cpp @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/stdx/memory.h" +#include "mongo/transport/message_compressor_noop.h" +#include "mongo/transport/message_compressor_registry.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" + +/** + * Asserts that a value is null + * + * TODO: Move this into the unittest's standard ASSERT_ macros. + */ +#define ASSERT_NULL(a) ASSERT_EQ(a, static_cast<decltype(a)>(nullptr)) + +namespace mongo { +namespace { +TEST(MessageCompressorRegistry, RegularTest) { + MessageCompressorRegistry registry; + auto compressor = stdx::make_unique<NoopMessageCompressor>(); + auto compressorPtr = compressor.get(); + + std::vector<std::string> compressorList = {compressorPtr->getName()}; + auto compressorListCheck = compressorList; + registry.setSupportedCompressors(std::move(compressorList)); + registry.registerImplementation(std::move(compressor)); + registry.finalizeSupportedCompressors(); + + ASSERT_TRUE(compressorListCheck == registry.getCompressorNames()); + + ASSERT_EQ(registry.getCompressor(compressorPtr->getName()), compressorPtr); + ASSERT_EQ(registry.getCompressor(compressorPtr->getId()), compressorPtr); + + ASSERT_NULL(registry.getCompressor("fakecompressor")); + ASSERT_NULL(registry.getCompressor(255)); +} + +TEST(MessageCompressorRegistry, NothingRegistered) { + MessageCompressorRegistry registry; + + ASSERT_NULL(registry.getCompressor("noop")); + ASSERT_NULL(registry.getCompressor(0)); +} + +TEST(MessageCompressorRegistry, SetSupported) { + MessageCompressorRegistry registry; + auto compressor = stdx::make_unique<NoopMessageCompressor>(); + auto compressorPtr = compressor.get(); + + std::vector<std::string> compressorList = {"foobar"}; + registry.setSupportedCompressors(std::move(compressorList)); + registry.registerImplementation(std::move(compressor)); + registry.finalizeSupportedCompressors(); + + ASSERT_NULL(registry.getCompressor(compressorPtr->getName())); + ASSERT_NULL(registry.getCompressor(compressorPtr->getId())); +} +} // namespace +} // namespace mongo |