From 85d4a3a085c67f2258b60b07259db73e2f29ea50 Mon Sep 17 00:00:00 2001 From: Hannes Magnusson Date: Wed, 7 Dec 2016 15:00:49 -0800 Subject: SERVER-27310 Add support for zlib wire protocol compression Closes #1152 Signed-off-by: Jonathan Reams --- src/mongo/transport/SConscript | 7 +- src/mongo/transport/message_compressor_base.h | 3 +- .../transport/message_compressor_manager_test.cpp | 9 ++- .../transport/message_compressor_registry.cpp | 3 + src/mongo/transport/message_compressor_zlib.cpp | 89 ++++++++++++++++++++++ src/mongo/transport/message_compressor_zlib.h | 44 +++++++++++ src/third_party/zlib-1.2.8/SConscript | 2 + src/third_party/zlib-1.2.8/compress.c | 80 +++++++++++++++++++ src/third_party/zlib-1.2.8/uncompr.c | 59 ++++++++++++++ 9 files changed, 293 insertions(+), 3 deletions(-) create mode 100644 src/mongo/transport/message_compressor_zlib.cpp create mode 100644 src/mongo/transport/message_compressor_zlib.h create mode 100644 src/third_party/zlib-1.2.8/compress.c create mode 100644 src/third_party/zlib-1.2.8/uncompr.c diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index b365613660d..4d66ab981e1 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -103,19 +103,24 @@ env.CppUnitTest( ], ) -env.Library( + +zlibEnv = env.Clone() +zlibEnv.InjectThirdPartyIncludePaths(libraries=['zlib']) +zlibEnv.Library( target='message_compressor', source=[ 'message_compressor_manager.cpp', 'message_compressor_metrics.cpp', 'message_compressor_registry.cpp', 'message_compressor_snappy.cpp', + 'message_compressor_zlib.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/util/decorable', '$BUILD_DIR/mongo/util/options_parser/options_parser', '$BUILD_DIR/third_party/shim_snappy', + '$BUILD_DIR/third_party/shim_zlib', ] ) diff --git a/src/mongo/transport/message_compressor_base.h b/src/mongo/transport/message_compressor_base.h index a2c05a6069e..0d1a6cf631e 100644 --- a/src/mongo/transport/message_compressor_base.h +++ b/src/mongo/transport/message_compressor_base.h @@ -39,6 +39,7 @@ namespace mongo { enum class MessageCompressor : uint8_t { kNoop = 0, kSnappy = 1, + kZlib = 2, kExtended = 255, }; @@ -52,7 +53,7 @@ public: virtual ~MessageCompressorBase() = default; /* - * Returns the name for subclass compressors (e.g. "snappy" or "noop") + * Returns the name for subclass compressors (e.g. "snappy", "zlib", or "noop") */ const std::string& getName() const { return _name; diff --git a/src/mongo/transport/message_compressor_manager_test.cpp b/src/mongo/transport/message_compressor_manager_test.cpp index 383bb1e3260..50da507a2c6 100644 --- a/src/mongo/transport/message_compressor_manager_test.cpp +++ b/src/mongo/transport/message_compressor_manager_test.cpp @@ -33,6 +33,8 @@ #include "mongo/transport/message_compressor_manager.h" #include "mongo/transport/message_compressor_noop.h" #include "mongo/transport/message_compressor_registry.h" +#include "mongo/transport/message_compressor_snappy.h" +#include "mongo/transport/message_compressor_zlib.h" #include "mongo/unittest/unittest.h" #include "mongo/util/net/message.h" @@ -179,7 +181,12 @@ TEST(NoopMessageCompressor, Fidelity) { TEST(SnappyMessageCompressor, Fidelity) { auto testMessage = buildMessage(); - checkFidelity(testMessage, stdx::make_unique()); + checkFidelity(testMessage, stdx::make_unique()); +} + +TEST(ZlibMessageCompressor, Fidelity) { + auto testMessage = buildMessage(); + checkFidelity(testMessage, stdx::make_unique()); } } // namespace mongo diff --git a/src/mongo/transport/message_compressor_registry.cpp b/src/mongo/transport/message_compressor_registry.cpp index 46b464b3f07..2cadfa6f1cb 100644 --- a/src/mongo/transport/message_compressor_registry.cpp +++ b/src/mongo/transport/message_compressor_registry.cpp @@ -34,6 +34,7 @@ #include "mongo/stdx/memory.h" #include "mongo/transport/message_compressor_noop.h" #include "mongo/transport/message_compressor_snappy.h" +#include "mongo/transport/message_compressor_zlib.h" #include "mongo/util/options_parser/option_section.h" #include @@ -51,6 +52,8 @@ StringData getMessageCompressorName(MessageCompressor id) { return "noop"_sd; case MessageCompressor::kSnappy: return "snappy"_sd; + case MessageCompressor::kZlib: + return "zlib"_sd; default: fassert(40269, "Invalid message compressor ID"); } diff --git a/src/mongo/transport/message_compressor_zlib.cpp b/src/mongo/transport/message_compressor_zlib.cpp new file mode 100644 index 00000000000..193e98fac85 --- /dev/null +++ b/src/mongo/transport/message_compressor_zlib.cpp @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/base/init.h" +#include "mongo/stdx/memory.h" +#include "mongo/transport/message_compressor_registry.h" +#include "mongo/transport/message_compressor_zlib.h" + +#include + +namespace mongo { + +ZlibMessageCompressor::ZlibMessageCompressor() : MessageCompressorBase(MessageCompressor::kZlib) {} + +std::size_t ZlibMessageCompressor::getMaxCompressedSize(size_t inputSize) { + return ::compressBound(inputSize); +} + +StatusWith ZlibMessageCompressor::compressData(ConstDataRange input, + DataRange output) { + size_t outLength = output.length(); + int ret = ::compress2(const_cast(reinterpret_cast(output.data())), + reinterpret_cast(&outLength), + reinterpret_cast(input.data()), + input.length(), + Z_DEFAULT_COMPRESSION); + + if (ret != Z_OK) { + return Status{ErrorCodes::BadValue, "Could not compress input"}; + } + counterHitCompress(input.length(), outLength); + return {outLength}; +} + +StatusWith ZlibMessageCompressor::decompressData(ConstDataRange input, + DataRange output) { + uLongf length = output.length(); + int ret = ::uncompress(const_cast(reinterpret_cast(output.data())), + &length, + reinterpret_cast(input.data()), + input.length()); + + if (ret != Z_OK) { + return Status{ErrorCodes::BadValue, "Compressed message was invalid or corrupted"}; + } + + counterHitDecompress(input.length(), output.length()); + return {output.length()}; +} + + +MONGO_INITIALIZER_GENERAL(ZlibMessageCompressorInit, + ("EndStartupOptionHandling"), + ("AllCompressorsRegistered")) +(InitializerContext* context) { + auto& compressorRegistry = MessageCompressorRegistry::get(); + compressorRegistry.registerImplementation(stdx::make_unique()); + return Status::OK(); +} +} // namespace mongo diff --git a/src/mongo/transport/message_compressor_zlib.h b/src/mongo/transport/message_compressor_zlib.h new file mode 100644 index 00000000000..09466a9e1d9 --- /dev/null +++ b/src/mongo/transport/message_compressor_zlib.h @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/transport/message_compressor_base.h" + +namespace mongo { +class ZlibMessageCompressor final : public MessageCompressorBase { +public: + ZlibMessageCompressor(); + + std::size_t getMaxCompressedSize(size_t inputSize) override; + + StatusWith compressData(ConstDataRange input, DataRange output) override; + + StatusWith decompressData(ConstDataRange input, DataRange output) override; +}; + + +} // namespace mongo diff --git a/src/third_party/zlib-1.2.8/SConscript b/src/third_party/zlib-1.2.8/SConscript index f0072456a25..9052821164d 100644 --- a/src/third_party/zlib-1.2.8/SConscript +++ b/src/third_party/zlib-1.2.8/SConscript @@ -12,12 +12,14 @@ env.Library( source=[ 'adler32.c', 'crc32.c', + 'compress.c', 'deflate.c', 'infback.c', 'inffast.c', 'inflate.c', 'inftrees.c', 'trees.c', + 'uncompr.c', 'zutil.c', ], LIBDEPS_TAGS=[ diff --git a/src/third_party/zlib-1.2.8/compress.c b/src/third_party/zlib-1.2.8/compress.c new file mode 100644 index 00000000000..6e9762676a0 --- /dev/null +++ b/src/third_party/zlib-1.2.8/compress.c @@ -0,0 +1,80 @@ +/* compress.c -- compress a memory buffer + * Copyright (C) 1995-2005 Jean-loup Gailly. + * For conditions of distribution and use, see copyright notice in zlib.h + */ + +/* @(#) $Id$ */ + +#define ZLIB_INTERNAL +#include "zlib.h" + +/* =========================================================================== + Compresses the source buffer into the destination buffer. The level + parameter has the same meaning as in deflateInit. sourceLen is the byte + length of the source buffer. Upon entry, destLen is the total size of the + destination buffer, which must be at least 0.1% larger than sourceLen plus + 12 bytes. Upon exit, destLen is the actual size of the compressed buffer. + + compress2 returns Z_OK if success, Z_MEM_ERROR if there was not enough + memory, Z_BUF_ERROR if there was not enough room in the output buffer, + Z_STREAM_ERROR if the level parameter is invalid. +*/ +int ZEXPORT compress2 (dest, destLen, source, sourceLen, level) + Bytef *dest; + uLongf *destLen; + const Bytef *source; + uLong sourceLen; + int level; +{ + z_stream stream; + int err; + + stream.next_in = (z_const Bytef *)source; + stream.avail_in = (uInt)sourceLen; +#ifdef MAXSEG_64K + /* Check for source > 64K on 16-bit machine: */ + if ((uLong)stream.avail_in != sourceLen) return Z_BUF_ERROR; +#endif + stream.next_out = dest; + stream.avail_out = (uInt)*destLen; + if ((uLong)stream.avail_out != *destLen) return Z_BUF_ERROR; + + stream.zalloc = (alloc_func)0; + stream.zfree = (free_func)0; + stream.opaque = (voidpf)0; + + err = deflateInit(&stream, level); + if (err != Z_OK) return err; + + err = deflate(&stream, Z_FINISH); + if (err != Z_STREAM_END) { + deflateEnd(&stream); + return err == Z_OK ? Z_BUF_ERROR : err; + } + *destLen = stream.total_out; + + err = deflateEnd(&stream); + return err; +} + +/* =========================================================================== + */ +int ZEXPORT compress (dest, destLen, source, sourceLen) + Bytef *dest; + uLongf *destLen; + const Bytef *source; + uLong sourceLen; +{ + return compress2(dest, destLen, source, sourceLen, Z_DEFAULT_COMPRESSION); +} + +/* =========================================================================== + If the default memLevel or windowBits for deflateInit() is changed, then + this function needs to be updated. + */ +uLong ZEXPORT compressBound (sourceLen) + uLong sourceLen; +{ + return sourceLen + (sourceLen >> 12) + (sourceLen >> 14) + + (sourceLen >> 25) + 13; +} diff --git a/src/third_party/zlib-1.2.8/uncompr.c b/src/third_party/zlib-1.2.8/uncompr.c new file mode 100644 index 00000000000..242e9493dff --- /dev/null +++ b/src/third_party/zlib-1.2.8/uncompr.c @@ -0,0 +1,59 @@ +/* uncompr.c -- decompress a memory buffer + * Copyright (C) 1995-2003, 2010 Jean-loup Gailly. + * For conditions of distribution and use, see copyright notice in zlib.h + */ + +/* @(#) $Id$ */ + +#define ZLIB_INTERNAL +#include "zlib.h" + +/* =========================================================================== + Decompresses the source buffer into the destination buffer. sourceLen is + the byte length of the source buffer. Upon entry, destLen is the total + size of the destination buffer, which must be large enough to hold the + entire uncompressed data. (The size of the uncompressed data must have + been saved previously by the compressor and transmitted to the decompressor + by some mechanism outside the scope of this compression library.) + Upon exit, destLen is the actual size of the compressed buffer. + + uncompress returns Z_OK if success, Z_MEM_ERROR if there was not + enough memory, Z_BUF_ERROR if there was not enough room in the output + buffer, or Z_DATA_ERROR if the input data was corrupted. +*/ +int ZEXPORT uncompress (dest, destLen, source, sourceLen) + Bytef *dest; + uLongf *destLen; + const Bytef *source; + uLong sourceLen; +{ + z_stream stream; + int err; + + stream.next_in = (z_const Bytef *)source; + stream.avail_in = (uInt)sourceLen; + /* Check for source > 64K on 16-bit machine: */ + if ((uLong)stream.avail_in != sourceLen) return Z_BUF_ERROR; + + stream.next_out = dest; + stream.avail_out = (uInt)*destLen; + if ((uLong)stream.avail_out != *destLen) return Z_BUF_ERROR; + + stream.zalloc = (alloc_func)0; + stream.zfree = (free_func)0; + + err = inflateInit(&stream); + if (err != Z_OK) return err; + + err = inflate(&stream, Z_FINISH); + if (err != Z_STREAM_END) { + inflateEnd(&stream); + if (err == Z_NEED_DICT || (err == Z_BUF_ERROR && stream.avail_in == 0)) + return Z_DATA_ERROR; + return err; + } + *destLen = stream.total_out; + + err = inflateEnd(&stream); + return err; +} -- cgit v1.2.1