diff options
-rw-r--r-- | src/mongo/base/data_range.h | 6 | ||||
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/ftdc/SConscript | 36 | ||||
-rw-r--r-- | src/mongo/db/ftdc/block_compressor.cpp | 119 | ||||
-rw-r--r-- | src/mongo/db/ftdc/block_compressor.h | 72 | ||||
-rw-r--r-- | src/mongo/db/ftdc/compressor.cpp | 218 | ||||
-rw-r--r-- | src/mongo/db/ftdc/compressor.h | 177 | ||||
-rw-r--r-- | src/mongo/db/ftdc/compressor_test.cpp | 429 | ||||
-rw-r--r-- | src/mongo/db/ftdc/config.h | 98 | ||||
-rw-r--r-- | src/mongo/db/ftdc/constants.h | 45 | ||||
-rw-r--r-- | src/mongo/db/ftdc/decompressor.cpp | 169 | ||||
-rw-r--r-- | src/mongo/db/ftdc/decompressor.h | 63 | ||||
-rw-r--r-- | src/mongo/db/ftdc/util.cpp | 399 | ||||
-rw-r--r-- | src/mongo/db/ftdc/util.h | 184 | ||||
-rw-r--r-- | src/mongo/db/ftdc/util_test.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/ftdc/varint.cpp | 84 | ||||
-rw-r--r-- | src/mongo/db/ftdc/varint.h | 92 | ||||
-rw-r--r-- | src/mongo/db/ftdc/varint_test.cpp | 88 |
19 files changed, 2331 insertions, 0 deletions
diff --git a/src/mongo/base/data_range.h b/src/mongo/base/data_range.h index ee6da34ee7a..5a152bdddf8 100644 --- a/src/mongo/base/data_range.h +++ b/src/mongo/base/data_range.h @@ -54,6 +54,9 @@ public: invariant(end >= begin); } + ConstDataRange(const char* begin, std::size_t length, std::ptrdiff_t debug_offset = 0) + : ConstDataRange(begin, begin + length, debug_offset) {} + const char* data() const { return _begin; } @@ -112,6 +115,9 @@ public: DataRange(bytes_type begin, bytes_type end, std::ptrdiff_t debug_offset = 0) : ConstDataRange(begin, end, debug_offset) {} + DataRange(bytes_type begin, std::size_t length, std::ptrdiff_t debug_offset = 0) + : ConstDataRange(begin, length, debug_offset) {} + template <typename T> Status write(const T& value, std::size_t offset = 0) { if (offset > length()) { diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 2d0471fa7ed..2beae89d8f1 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -146,6 +146,7 @@ error_code("CursorInUse", 143) error_code("IncompatibleCatalogManager", 144) error_code("PooledConnectionsDropped", 145) error_code("ExceededMemoryLimit", 146) +error_code("ZLibError", 147) # Non-sequential error codes (for compatibility only) error_code("RecvStaleConfig", 9996) diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 3ca65f44c82..9f1c8bc530b 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -12,6 +12,7 @@ env.SConscript( 'concurrency', 'exec', 'fts', + 'ftdc', 'geo', 'index', 'matcher', @@ -689,6 +690,7 @@ serveronlyLibdeps = [ "curop", "exec/exec", "exec/working_set", + "ftdc/ftdc_mongod", "fts/ftsmongod", "global_timestamp", "index/index_descriptor", diff --git a/src/mongo/db/ftdc/SConscript b/src/mongo/db/ftdc/SConscript new file mode 100644 index 00000000000..b050d88f0bb --- /dev/null +++ b/src/mongo/db/ftdc/SConscript @@ -0,0 +1,36 @@ +# -*- mode: python -*- +Import("env") + +ftdcEnv = env.Clone() +ftdcEnv.InjectThirdPartyIncludePaths(libraries=['zlib']) + +ftdcEnv.Library( + target='ftdc', + source=[ + 'block_compressor.cpp', + 'compressor.cpp', + 'decompressor.cpp', + 'util.cpp', + 'varint.cpp' + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + '$BUILD_DIR/mongo/db/service_context', + '$BUILD_DIR/third_party/s2/s2', # For VarInt + '$BUILD_DIR/third_party/shim_zlib', + ], +) + +env.CppUnitTest( + target='ftdc_test', + source=[ + 'compressor_test.cpp', + 'ftdc_test.cpp', + 'util_test.cpp', + 'varint_test.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/bson/util/bson_extract', + 'ftdc', + ]) + diff --git a/src/mongo/db/ftdc/block_compressor.cpp b/src/mongo/db/ftdc/block_compressor.cpp new file mode 100644 index 00000000000..1077426fc4b --- /dev/null +++ b/src/mongo/db/ftdc/block_compressor.cpp @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/ftdc/block_compressor.h" + +#include <zlib.h> + +#include "mongo/util/mongoutils/str.h" + +namespace mongo { + +StatusWith<ConstDataRange> BlockCompressor::compress(ConstDataRange source) { + z_stream stream; + int level = Z_DEFAULT_COMPRESSION; + + stream.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(source.data())); + stream.avail_in = source.length(); + + // In compress.c in the zlib source, they recommend that the compression buffer be + // at least 0.1% larger + 12 bytes then the source length. + // We make the buffer 1% larger then the source length buffer to be on the safe side. If we are + // too small, deflate returns an error. + _buffer.resize(source.length() * 1.01 + 12); + + stream.next_out = _buffer.data(); + stream.avail_out = _buffer.size(); + + stream.zalloc = nullptr; + stream.zfree = nullptr; + stream.opaque = nullptr; + + int err = deflateInit(&stream, level); + if (err != Z_OK) { + return {ErrorCodes::ZLibError, str::stream() << "deflateInit failed with " << err}; + } + + err = deflate(&stream, Z_FINISH); + if (err != Z_STREAM_END) { + (void)deflateEnd(&stream); + + if (err != Z_OK) { + return {ErrorCodes::ZLibError, str::stream() << "deflate failed with " << err}; + } + } + + err = deflateEnd(&stream); + if (err != Z_OK) { + return {ErrorCodes::ZLibError, str::stream() << "deflateEnd failed with " << err}; + } + + return ConstDataRange(reinterpret_cast<char*>(_buffer.data()), stream.total_out); +} + +StatusWith<ConstDataRange> BlockCompressor::uncompress(ConstDataRange source, + size_t uncompressedLength) { + z_stream stream; + + stream.next_in = reinterpret_cast<unsigned char*>(const_cast<char*>(source.data())); + stream.avail_in = source.length(); + + _buffer.resize(uncompressedLength); + + stream.next_out = _buffer.data(); + stream.avail_out = _buffer.size(); + + stream.zalloc = nullptr; + stream.zfree = nullptr; + stream.opaque = nullptr; + + int err = inflateInit(&stream); + if (err != Z_OK) { + return {ErrorCodes::ZLibError, str::stream() << "inflateInit failed with " << err}; + } + + err = inflate(&stream, Z_FINISH); + if (err != Z_STREAM_END) { + (void)inflateEnd(&stream); + + if (err != Z_OK) { + return {ErrorCodes::ZLibError, str::stream() << "inflate failed with " << err}; + } + } + + err = inflateEnd(&stream); + if (err != Z_OK) { + return {ErrorCodes::ZLibError, str::stream() << "inflateEnd failed with " << err}; + } + + return ConstDataRange(reinterpret_cast<char*>(_buffer.data()), stream.total_out); +} + +} // namespace mongo diff --git a/src/mongo/db/ftdc/block_compressor.h b/src/mongo/db/ftdc/block_compressor.h new file mode 100644 index 00000000000..23ac8c811c0 --- /dev/null +++ b/src/mongo/db/ftdc/block_compressor.h @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <cstdint> +#include <vector> + +#include "mongo/base/data_range.h" +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" + +namespace mongo { + +/** + * Compesses and uncompresses a block of buffer using zlib. + */ +class BlockCompressor { + MONGO_DISALLOW_COPYING(BlockCompressor); + +public: + BlockCompressor() = default; + + /** + * Compress a buffer of data. + * + * Returns a pointer to a buffer that BlockCompressor owns. + * The returned buffer is valid until the next call to compress or uncompress. + */ + StatusWith<ConstDataRange> compress(ConstDataRange source); + + /** + * Uncompress a buffer of data. + * + * maxUncompressedLength is the upper bound on the size of the uncompressed data + * so that an internal buffer can be allocated to fit it. + * + * Returns a pointer to a buffer that BlockCompressor owns. + * The returned buffer is valid until the next call to compress or uncompress. + */ + StatusWith<ConstDataRange> uncompress(ConstDataRange source, size_t maxUncompressedLength); + +private: + std::vector<std::uint8_t> _buffer; +}; + +} // namespace mongo diff --git a/src/mongo/db/ftdc/compressor.cpp b/src/mongo/db/ftdc/compressor.cpp new file mode 100644 index 00000000000..1467075ed1c --- /dev/null +++ b/src/mongo/db/ftdc/compressor.cpp @@ -0,0 +1,218 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/ftdc/compressor.h" + +#include "mongo/base/data_builder.h" +#include "mongo/db/ftdc/config.h" +#include "mongo/db/ftdc/util.h" +#include "mongo/db/ftdc/varint.h" +#include "mongo/db/jsobj.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +using std::swap; + +StatusWith<boost::optional<std::tuple<ConstDataRange, FTDCCompressor::CompressorState>>> +FTDCCompressor::addSample(const BSONObj& sample) { + if (_referenceDoc.isEmpty()) { + FTDCBSONUtil::extractMetricsFromDocument(sample, sample, &_metrics); + _reset(sample); + return {boost::none_t()}; + } + + _metrics.resize(0); + + auto swMatches = FTDCBSONUtil::extractMetricsFromDocument(_referenceDoc, sample, &_metrics); + + if (!swMatches.isOK()) { + return swMatches.getStatus(); + } + + dassert((swMatches.getValue() == false || _metricsCount == _metrics.size()) && + _metrics.size() < std::numeric_limits<std::uint32_t>::max()); + + // We need to flush the current set of samples since the BSON schema has changed. + if (!swMatches.getValue()) { + auto swCompressedSamples = getCompressedSamples(); + + if (!swCompressedSamples.isOK()) { + return swCompressedSamples.getStatus(); + } + + // Set the new sample as the current reference document as we have to start all over + _reset(sample); + return {boost::optional<std::tuple<ConstDataRange, FTDCCompressor::CompressorState>>( + std::tuple<ConstDataRange, FTDCCompressor::CompressorState>( + swCompressedSamples.getValue(), CompressorState::kSchemaChanged))}; + } + + + // Add another sample + for (std::size_t i = 0; i < _metrics.size(); ++i) { + // NOTE: This touches a lot of cache lines so that compression code can be more effcient. + _deltas[getArrayOffset(_maxSamples, _sampleCount, i)] = _metrics[i] - _prevmetrics[i]; + } + + ++_sampleCount; + + _prevmetrics.clear(); + swap(_prevmetrics, _metrics); + + // If the count is full, flush + if (_sampleCount == _maxSamples) { + auto swCompressedSamples = getCompressedSamples(); + + if (!swCompressedSamples.isOK()) { + return swCompressedSamples.getStatus(); + } + + // Setup so that we treat the next sample as the reference sample + _referenceDoc = BSONObj(); + + return {boost::optional<std::tuple<ConstDataRange, FTDCCompressor::CompressorState>>( + std::tuple<ConstDataRange, FTDCCompressor::CompressorState>( + swCompressedSamples.getValue(), CompressorState::kCompressorFull))}; + } + + // The buffer is not full, inform the caller + return {boost::none_t()}; +} + +StatusWith<ConstDataRange> FTDCCompressor::getCompressedSamples() { + _chunkBuffer.setlen(0); + + // Append reference document - BSON Object + _chunkBuffer.appendBuf(_referenceDoc.objdata(), _referenceDoc.objsize()); + + // Append count of metrics - uint32 little endian + _chunkBuffer.appendNum(static_cast<std::uint32_t>(_metricsCount)); + + // Append count of samples - uint32 little endian + _chunkBuffer.appendNum(static_cast<std::uint32_t>(_sampleCount)); + + if (_metricsCount == 0 || _sampleCount == 0) { + return ConstDataRange(_chunkBuffer.buf(), static_cast<size_t>(_chunkBuffer.len())); + } + + // On average, we do not need all 10 bytes for every sample, worst case, we grow the buffer + DataBuilder db(_metricsCount * _sampleCount * FTDCVarInt::kMaxSizeBytes64 / 2); + + std::uint32_t zeroesCount = 0; + + // For each set of samples for a particular metric, + // we think of it is simple array of 64-bit integers we try to compress into a byte array. + // This is done in three steps for each metric + // 1. Delta Compression + // - i.e., we store the difference between pairs of samples, not their absolute values + // - this is done in addSamples + // 2. Run Length Encoding of zeros + // - We find consecutive sets of zeros and represent them as a tuple of (0, count - 1). + // - Each memeber is stored as VarInt packed integer + // 3. Finally, for non-zero members, we store these as VarInt packed + // + // These byte arrays are added to a buffer which is then concatenated with other chunks and + // compressed with ZLIB. + for (std::uint32_t i = 0; i < _metricsCount; i++) { + for (std::uint32_t j = 0; j < _sampleCount; j++) { + std::uint64_t delta = _deltas[getArrayOffset(_maxSamples, j, i)]; + + if (delta == 0) { + ++zeroesCount; + continue; + } + + // If we have a non-zero sample, then write out all the accumulated zero samples. + if (zeroesCount > 0) { + auto s1 = db.writeAndAdvance(FTDCVarInt(0)); + if (!s1.isOK()) { + return s1; + } + + auto s2 = db.writeAndAdvance(FTDCVarInt(zeroesCount - 1)); + if (!s2.isOK()) { + return s2; + } + + zeroesCount = 0; + } + + auto s3 = db.writeAndAdvance(FTDCVarInt(delta)); + if (!s3.isOK()) { + return s3; + } + } + + // If we are on the last metric, and the previous loop ended in a zero, write out the RLE + // pair of zero information. + if ((i == (_metricsCount - 1)) && zeroesCount) { + auto s1 = db.writeAndAdvance(FTDCVarInt(0)); + if (!s1.isOK()) { + return s1; + } + + auto s2 = db.writeAndAdvance(FTDCVarInt(zeroesCount - 1)); + if (!s2.isOK()) { + return s2; + } + } + } + + auto swDest = _compressor.compress(db.getCursor()); + + // The only way for compression to fail is if the buffer size calculations are wrong + if (!swDest.isOK()) { + return swDest.getStatus(); + } + + _chunkBuffer.appendBuf(swDest.getValue().data(), swDest.getValue().length()); + + return ConstDataRange(_chunkBuffer.buf(), static_cast<size_t>(_chunkBuffer.len())); +} + +void FTDCCompressor::reset() { + _metrics.clear(); + _reset(BSONObj()); +} + +void FTDCCompressor::_reset(const BSONObj& referenceDoc) { + _referenceDoc = referenceDoc; + + _metricsCount = _metrics.size(); + _sampleCount = 0; + _prevmetrics.clear(); + swap(_prevmetrics, _metrics); + + _maxSamples = _config->maxSamplesPerArchiveMetricChunk; + _deltas.resize(_metricsCount * _maxSamples); +} + +} // namespace mongo diff --git a/src/mongo/db/ftdc/compressor.h b/src/mongo/db/ftdc/compressor.h new file mode 100644 index 00000000000..e1a994ccf2d --- /dev/null +++ b/src/mongo/db/ftdc/compressor.h @@ -0,0 +1,177 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <cstddef> +#include <cstdint> +#include <tuple> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/util/builder.h" +#include "mongo/db/ftdc/block_compressor.h" +#include "mongo/db/ftdc/config.h" +#include "mongo/db/jsobj.h" + +namespace mongo { + +/** + * FTDCCompressor is responsible for taking a set of BSON documents containing metrics, and + * compressing them into a highly compressed buffer. Metrics are defined as BSON number or number + * like type (like dates, and timestamps). + * + * Compression Method + * 1. For each document after the first, it computes the delta between it and the preceding document + * for the number fields + * 2. It stores the deltas into an array of std::int64_t. + * 3. It compressed each std::int64_t using VarInt integer compression. See varint.h. + * 4. Encodes zeros in Run Length Encoded pairs of <Count, Zero> + * 5. ZLIB compresses the final processed array + * + * NOTE: This compression ignores non-number data, and assumes the non-number data is constant + * across all documents in the series of documents. + */ +class FTDCCompressor { + MONGO_DISALLOW_COPYING(FTDCCompressor); + +public: + /** + * The FTDCCompressor returns one of these values when a sample is added to indicate whether the + * caller should flush the buffer to disk or not. + */ + enum class CompressorState { + /** + * Needs to flush because the schemas has changed. Caller needs to flush. + */ + kSchemaChanged, + + /** + * Quota on the number of samples in a metric chunk has been reached. Caller needs to flush. + */ + kCompressorFull, + }; + + explicit FTDCCompressor(const FTDCConfig* config) : _config(config) {} + + /** + * Add a bson document containing metrics into the compressor. + * + * Returns flag indicating whether the caller should flush the compressor buffer to disk. + * 1. kCompressorFull if the compressor is considered full. + * 2. kSchemaChanged if there was a schema change, and buffer should be flushed. + * 3. kHasSpace if it has room for more metrics in the current buffer. + */ + StatusWith<boost::optional<std::tuple<ConstDataRange, CompressorState>>> addSample( + const BSONObj& sample); + + /** + * Returns the number of enqueued samples. + * + * The a buffer will decompress to (1 + getCountCount). The extra 1 comes + * from the reference document. + */ + std::size_t getSampleCount() const { + return _sampleCount; + } + + /** + * Has a document been added? + * + * If addSample has been called, then we have at least the reference document, but not + * necessarily any additional metric samples. When the buffer is filled to capacity, + * the reference document is reset. + */ + bool hasDataToFlush() const { + return !_referenceDoc.isEmpty() || _chunkBuffer.len() > 0; + } + + /** + * Gets buffer of compressed data contained in the FTDCCompressor. + * + * The returned buffer is valid until next call to addSample() or getCompressedSamples() with + * CompressBuffer::kGenerateNewCompressedBuffer. + */ + StatusWith<ConstDataRange> getCompressedSamples(); + + /** + * Reset the state of the compressor. + * + * Callers can use this to reset the compressor to a clean state instead of recreating it. + */ + void reset(); + + /** + * Compute the offset into an array for given (sample, metric) pair + */ + static size_t getArrayOffset(std::uint32_t sampleCount, + std::uint32_t sample, + std::uint32_t metric) { + return metric * sampleCount + sample; + } + +private: + /** + * Reset the state + */ + void _reset(const BSONObj& referenceDoc); + +private: + // Block Compressor + BlockCompressor _compressor; + + // Config + const FTDCConfig* const _config; + + // Reference schema document + BSONObj _referenceDoc; + + // Number of Metrics for the reference document + std::uint32_t _metricsCount{0}; + + // Number of samples recorded + std::uint32_t _sampleCount{0}; + + // Max samples for the current chunk + std::size_t _maxSamples{0}; + + // Array of deltas - M x S + // _deltas[Metrics][Samples] + std::vector<std::uint64_t> _deltas; + + // Buffer for metric chunk = header + zlib compressed array + BufBuilder _chunkBuffer; + + // Buffer to hold metrics + std::vector<std::uint64_t> _metrics; + std::vector<std::uint64_t> _prevmetrics; +}; + +} // namespace mongo diff --git a/src/mongo/db/ftdc/compressor_test.cpp b/src/mongo/db/ftdc/compressor_test.cpp new file mode 100644 index 00000000000..7b423679855 --- /dev/null +++ b/src/mongo/db/ftdc/compressor_test.cpp @@ -0,0 +1,429 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <limits> +#include <random> + +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/ftdc/compressor.h" +#include "mongo/db/ftdc/config.h" +#include "mongo/db/ftdc/decompressor.h" +#include "mongo/db/ftdc/ftdc_test.h" +#include "mongo/db/jsobj.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +#define ASSERT_HAS_SPACE(st) \ + ASSERT_TRUE(st.isOK()); \ + ASSERT_FALSE(st.getValue().is_initialized()); + +#define ASSERT_SCHEMA_CHANGED(st) \ + ASSERT_TRUE(st.isOK()); \ + ASSERT_TRUE(std::get<1>(st.getValue().get()) == \ + FTDCCompressor::CompressorState::kSchemaChanged); \ + ASSERT_TRUE(st.getValue().is_initialized()); + +#define ASSERT_FULL(st) \ + ASSERT_TRUE(st.isOK()); \ + ASSERT_TRUE(std::get<1>(st.getValue().get()) == \ + FTDCCompressor::CompressorState::kCompressorFull); \ + ASSERT_TRUE(st.getValue().is_initialized()); + +// Sanity check +TEST(FTDCCompressor, TestBasic) { + FTDCConfig config; + FTDCCompressor c(&config); + + auto st = c.addSample(BSON("name" + << "joe" + << "key1" << 33 << "key2" << 42)); + ASSERT_HAS_SPACE(st); + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key2" << 45)); + ASSERT_HAS_SPACE(st); + + + StatusWith<ConstDataRange> swBuf = c.getCompressedSamples(); + + ASSERT_TRUE(swBuf.isOK()); + ASSERT_TRUE(swBuf.getValue().length() > 0); + ASSERT_TRUE(swBuf.getValue().data() != nullptr); +} + +// Test strings only +TEST(FTDCCompressor, TestStrings) { + FTDCConfig config; + FTDCCompressor c(&config); + + auto st = c.addSample(BSON("name" + << "joe" + << "key1" + << "value1" + << "key2" + << "value2")); + ASSERT_HAS_SPACE(st); + st = c.addSample(BSON("name" + << "joe" + << "key1" + << "value3" + << "key2" + << "value6")); + ASSERT_HAS_SPACE(st); + + StatusWith<ConstDataRange> swBuf = c.getCompressedSamples(); + + ASSERT_TRUE(swBuf.isOK()); + ASSERT_TRUE(swBuf.getValue().length() > 0); + ASSERT_TRUE(swBuf.getValue().data() != nullptr); +} + +/** + * Test class that records a series of samples and ensures that compress + decompress round trips + * them correctly. + */ +class TestTie { +public: + TestTie() : _compressor(&_config) {} + + ~TestTie() { + validate(boost::none_t()); + } + + StatusWith<boost::optional<std::tuple<ConstDataRange, FTDCCompressor::CompressorState>>> + addSample(const BSONObj& sample) { + auto st = _compressor.addSample(sample); + + if (!st.getValue().is_initialized()) { + _docs.emplace_back(sample); + } else if (std::get<1>(st.getValue().get()) == + FTDCCompressor::CompressorState::kSchemaChanged) { + validate(std::get<0>(st.getValue().get())); + _docs.clear(); + _docs.emplace_back(sample); + } else if (std::get<1>(st.getValue().get()) == + FTDCCompressor::CompressorState::kCompressorFull) { + _docs.emplace_back(sample); + validate(std::get<0>(st.getValue().get())); + _docs.clear(); + } else { + MONGO_UNREACHABLE; + } + + return st; + } + + void validate(boost::optional<ConstDataRange> cdr) { + std::vector<BSONObj> list; + if (cdr.is_initialized()) { + auto sw = _decompressor.uncompress(cdr.get()); + ASSERT_TRUE(sw.isOK()); + list = sw.getValue(); + } else { + auto swBuf = _compressor.getCompressedSamples(); + ASSERT_TRUE(swBuf.isOK()); + auto sw = _decompressor.uncompress(swBuf.getValue()); + ASSERT_TRUE(sw.isOK()); + + list = sw.getValue(); + } + + ValidateDocumentList(list, _docs); + } + +private: + std::vector<BSONObj> _docs; + FTDCConfig _config; + FTDCCompressor _compressor; + FTDCDecompressor _decompressor; +}; + +// Test various schema changes +TEST(FTDCCompressor, TestSchemaChanges) { + TestTie c; + + auto st = c.addSample(BSON("name" + << "joe" + << "key1" << 33 << "key2" << 42)); + ASSERT_HAS_SPACE(st); + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key2" << 45)); + ASSERT_HAS_SPACE(st); + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key2" << 45)); + ASSERT_HAS_SPACE(st); + + // Add Field + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key2" << 45 << "key3" << 47)); + ASSERT_SCHEMA_CHANGED(st); + + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key2" << 45 << "key3" << 47)); + ASSERT_HAS_SPACE(st); + + // Rename field + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key5" << 45 << "key3" << 47)); + ASSERT_SCHEMA_CHANGED(st); + + // Change type + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key5" + << "45" + << "key3" << 47)); + ASSERT_SCHEMA_CHANGED(st); + + // Add Field + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key2" << 45 << "key3" << 47 << "key7" << 34 << "key9" + << 45 << "key13" << 47)); + ASSERT_SCHEMA_CHANGED(st); + + // Remove Field + st = c.addSample(BSON("name" + << "joe" + << "key7" << 34 << "key9" << 45 << "key13" << 47)); + ASSERT_SCHEMA_CHANGED(st); + + st = c.addSample(BSON("name" + << "joe" + << "key7" << 34 << "key9" << 45 << "key13" << 47)); + ASSERT_HAS_SPACE(st); + + // Start new batch + st = c.addSample(BSON("name" + << "joe" + << "key7" << 5)); + ASSERT_SCHEMA_CHANGED(st); + + // Change field to object + st = c.addSample(BSON("name" + << "joe" + << "key7" << BSON( // nested object + "a" << 1))); + ASSERT_SCHEMA_CHANGED(st); + + // Change field from object to number + st = c.addSample(BSON("name" + << "joe" + << "key7" << 7)); + ASSERT_SCHEMA_CHANGED(st); + + // Change field from number to array + st = c.addSample(BSON("name" + << "joe" + << "key7" << BSON_ARRAY(13 << 17))); + ASSERT_SCHEMA_CHANGED(st); + + // Change field from array to number + st = c.addSample(BSON("name" + << "joe" + << "key7" << 19)); + ASSERT_SCHEMA_CHANGED(st); + + + // New Schema + st = c.addSample(BSON("_id" << 1)); + ASSERT_SCHEMA_CHANGED(st); + + // Change field to oid + st = c.addSample(BSON(GENOID)); + ASSERT_SCHEMA_CHANGED(st); + + // Change field from oid to object + st = c.addSample(BSON("_id" << BSON("sub1" << 1))); + ASSERT_SCHEMA_CHANGED(st); + + // Change field from object to oid + st = c.addSample(BSON(GENOID)); + ASSERT_SCHEMA_CHANGED(st); +} + +// Ensure changing between the various number formats is considered compatible +TEST(FTDCCompressor, TestNumbersCompat) { + TestTie c; + + auto st = c.addSample(BSON("name" + << "joe" + << "key1" << 33 << "key2" << 42LL)); + ASSERT_HAS_SPACE(st); + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34LL << "key2" << 45.0f)); + ASSERT_HAS_SPACE(st); + st = c.addSample(BSON("name" + << "joe" + << "key1" << static_cast<char>(32) << "key2" << 45.0F)); + ASSERT_HAS_SPACE(st); +} + +// Test timestamp +TEST(FTDCCompressor, TestTimeStamp) { + TestTie c; + + BSONObjBuilder builder1; + BSONObj o = builder1.append("ts", Timestamp(0x55667788LL, 0x11223344LL)).obj(); + auto st = c.addSample(o); + ASSERT_HAS_SPACE(st); +} + +// Test all types +TEST(FTDCCompressor, Types) { + TestTie c; + + auto st = c.addSample(BSON("name" + << "joe" + << "key1" << 33 << "key2" << 42LL)); + ASSERT_HAS_SPACE(st); + + const char bytes[] = {0x1, 0x2, 0x3}; + BSONObj o = BSON("created" << DATENOW // date_t + << "null" << BSONNULL // { a : null } + << "undefined" << BSONUndefined // { a : undefined } + << "obj" << BSON( // nested object + "a" + << "abc" + << "b" << 123LL) << "foo" + << BSON_ARRAY("bar" + << "baz" + << "qux") // array of strings + << "foo2" << BSON_ARRAY(5 << 6 << 7) // array of ints + << "bindata" << BSONBinData(&bytes[0], 3, bdtCustom) // bindata + << "oid" << OID("010203040506070809101112") // oid + << "bool" << true // bool + << "regex" << BSONRegEx("mongodb") // regex + << "ref" << BSONDBRef("c", OID("010203040506070809101112")) // ref + << "code" << BSONCode("func f() { return 1; }") // code + << "codewscope" << BSONCodeWScope("func f() { return 1; }", + BSON("c" << true)) // codew + << "minkey" << MINKEY // minkey + << "maxkey" << MAXKEY // maxkey + ); + + st = c.addSample(o); + ASSERT_SCHEMA_CHANGED(st); + + st = c.addSample(o); + ASSERT_HAS_SPACE(st); + + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34LL << "key2" << 45.0f)); + ASSERT_SCHEMA_CHANGED(st); + st = c.addSample(BSON("name" + << "joe" + << "key1" << static_cast<char>(32) << "key2" << 45.0F)); + ASSERT_HAS_SPACE(st); +} + +// Test a full buffer +TEST(FTDCCompressor, TestFull) { + // Test a large numbers of zeros, and incremental numbers in a full buffer + for (int j = 0; j < 2; j++) { + TestTie c; + + auto st = c.addSample(BSON("name" + << "joe" + << "key1" << 33 << "key2" << 42)); + ASSERT_HAS_SPACE(st); + + for (size_t i = 0; i <= FTDCConfig::kMaxSamplesPerArchiveMetricChunkDefault - 2; i++) { + st = c.addSample(BSON("name" + << "joe" + << "key1" << static_cast<long long int>(i * j) << "key2" << 45)); + ASSERT_HAS_SPACE(st); + } + + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key2" << 45)); + ASSERT_FULL(st); + + // Add Value + st = c.addSample(BSON("name" + << "joe" + << "key1" << 34 << "key2" << 45)); + ASSERT_HAS_SPACE(st); + } +} + +template <typename T> +BSONObj generateSample(std::random_device& rd, T generator, size_t count) { + BSONObjBuilder builder; + + for (size_t i = 0; i < count; ++i) { + builder.append("key", generator(rd)); + } + + return builder.obj(); +} + +// Test many metrics +TEST(ZFTDCCompressor, TestManyMetrics) { + std::random_device rd; + std::mt19937 gen(rd()); + + std::uniform_int_distribution<long long> genValues(1, std::numeric_limits<long long>::max()); + const size_t metrics = 1000; + + // Test a large numbers of zeros, and incremental numbers in a full buffer + for (int j = 0; j < 2; j++) { + TestTie c; + + auto st = c.addSample(generateSample(rd, genValues, metrics)); + ASSERT_HAS_SPACE(st); + + for (size_t i = 0; i <= FTDCConfig::kMaxSamplesPerArchiveMetricChunkDefault - 2; i++) { + st = c.addSample(generateSample(rd, genValues, metrics)); + ASSERT_HAS_SPACE(st); + } + + st = c.addSample(generateSample(rd, genValues, metrics)); + ASSERT_FULL(st); + + // Add Value + st = c.addSample(generateSample(rd, genValues, metrics)); + ASSERT_HAS_SPACE(st); + } +} + +} // namespace mongo diff --git a/src/mongo/db/ftdc/config.h b/src/mongo/db/ftdc/config.h new file mode 100644 index 00000000000..ef7ee8b9143 --- /dev/null +++ b/src/mongo/db/ftdc/config.h @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <cstdint> + +#include "mongo/util/time_support.h" + +namespace mongo { + +/** + * Configuration settings for full-time diagnostic data capture (FTDC). + * + * These settings are configurable by the user at startup & runtime via setParameter. + */ +struct FTDCConfig { + FTDCConfig() + : enabled(kEnabledDefault), + maxDirectorySizeBytes(kMaxDirectorySizeBytesDefault), + maxFileSizeBytes(kMaxFileSizeBytesDefault), + period(kPeriodMillisDefault), + maxSamplesPerArchiveMetricChunk(kMaxSamplesPerArchiveMetricChunkDefault), + maxSamplesPerInterimMetricChunk(kMaxSamplesPerInterimMetricChunkDefault) {} + + /** + * True if FTDC is collecting data. False otherwise + */ + bool enabled; + + /** + * Max Size of all FTDC files. If the total file size is > maxDirectorySizeBytes by summing up + * all files in the FTDC directory, the extra files are removed. + */ + std::uint64_t maxDirectorySizeBytes; + + /** + * Max size of a file in bytes. + */ + std::uint64_t maxFileSizeBytes; + + /** + * Period at which to run FTDC. + * + * FTDC is always run at the beginning at the period, and skips periods if the collector runs + * for more then a single period. + */ + Milliseconds period; + + /** + * Maximum number of samples to collect in an archive metric chunk for long term storage. + */ + std::uint32_t maxSamplesPerArchiveMetricChunk; + + /** + * Maximum number of samples to collect in an interim metric chunk in case the process + * terminates. + */ + std::uint32_t maxSamplesPerInterimMetricChunk; + + static const bool kEnabledDefault = true; + + static const std::uint64_t kPeriodMillisDefault; + static const std::uint64_t kMaxDirectorySizeBytesDefault = 100 * 1024 * 1024; + static const std::uint64_t kMaxFileSizeBytesDefault = 10 * 1024 * 1024; + + static const std::uint64_t kMaxFileUniqifier = 65000; + + static const std::uint32_t kMaxSamplesPerArchiveMetricChunkDefault = 300; + static const std::uint32_t kMaxSamplesPerInterimMetricChunkDefault = 10; +}; + +} // namespace mongo diff --git a/src/mongo/db/ftdc/constants.h b/src/mongo/db/ftdc/constants.h new file mode 100644 index 00000000000..a989fd35b64 --- /dev/null +++ b/src/mongo/db/ftdc/constants.h @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +namespace mongo { + +extern const char kFTDCInterimFile[]; +extern const char kFTDCArchiveFile[]; + +extern const char kFTDCIdField[]; +extern const char kFTDCTypeField[]; + +extern const char kFTDCDataField[]; +extern const char kFTDCDocField[]; + +extern const char kFTDCDocsField[]; + +extern const char kFTDCCollectStartField[]; +extern const char kFTDCCollectEndField[]; + +} // namespace mongo diff --git a/src/mongo/db/ftdc/decompressor.cpp b/src/mongo/db/ftdc/decompressor.cpp new file mode 100644 index 00000000000..91a3d48f65c --- /dev/null +++ b/src/mongo/db/ftdc/decompressor.cpp @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/ftdc/decompressor.h" + +#include "mongo/base/data_range_cursor.h" +#include "mongo/base/data_type_validated.h" +#include "mongo/db/ftdc/compressor.h" +#include "mongo/db/ftdc/util.h" +#include "mongo/db/ftdc/varint.h" +#include "mongo/db/jsobj.h" +#include "mongo/util/assert_util.h" + +namespace mongo { + +StatusWith<std::vector<BSONObj>> FTDCDecompressor::uncompress(ConstDataRange buf) { + ConstDataRangeCursor cdc(buf); + + // The document is not part of any checksum so we must validate it is correct + auto swRef = cdc.readAndAdvance<Validated<BSONObj>>(); + if (!swRef.isOK()) { + return {swRef.getStatus()}; + } + + BSONObj ref = swRef.getValue(); + + // Read count of metrics + auto swMetricsCount = cdc.readAndAdvance<LittleEndian<std::uint32_t>>(); + if (!swMetricsCount.isOK()) { + return {swMetricsCount.getStatus()}; + } + + std::uint32_t metricsCount = swMetricsCount.getValue(); + + // Read count of samples + auto swSampleCount = cdc.readAndAdvance<LittleEndian<std::uint32_t>>(); + if (!swSampleCount.isOK()) { + return {swSampleCount.getStatus()}; + } + + std::uint32_t sampleCount = swSampleCount.getValue(); + + // Limit size of the buffer we need for metrics and samples + if (metricsCount * sampleCount > 1000000) { + return Status(ErrorCodes::InvalidLength, + "Metrics Count and Sample Count have exceeded the allowable range."); + } + + std::vector<std::uint64_t> metrics; + + metrics.reserve(metricsCount); + + // We pass the reference document as both the reference document and current document as we only + // want the array of metrics. + (void)FTDCBSONUtil::extractMetricsFromDocument(ref, ref, &metrics); + + if (metrics.size() != metricsCount) { + return {ErrorCodes::BadValue, + "The metrics in the reference document and metrics count do not match"}; + } + + std::vector<BSONObj> docs; + + // Allocate space for the reference document + samples + docs.reserve(1 + sampleCount); + + docs.emplace_back(ref); + + // We must always return the reference document + if (sampleCount == 0) { + return {docs}; + } + + // Decompress the zlib compressed buffer + size_t expectedDestLength = metricsCount * sampleCount * FTDCVarInt::kMaxSizeBytes64; + + auto statusUncompress = _compressor.uncompress(cdc, expectedDestLength); + + if (!statusUncompress.isOK()) { + return {statusUncompress.getStatus()}; + } + + // Read the samples + std::vector<std::uint64_t> deltas(metricsCount * sampleCount); + + // decompress the deltas + std::uint64_t zeroesCount = 0; + + auto cdrc = ConstDataRangeCursor(statusUncompress.getValue()); + + for (std::uint32_t i = 0; i < metricsCount; i++) { + for (std::uint32_t j = 0; j < sampleCount; j++) { + if (zeroesCount) { + deltas[FTDCCompressor::getArrayOffset(sampleCount, j, i)] = 0; + zeroesCount--; + continue; + } + + auto swDelta = cdrc.readAndAdvance<FTDCVarInt>(); + + if (!swDelta.isOK()) { + return swDelta.getStatus(); + } + + if (swDelta.getValue() == 0) { + auto swZero = cdrc.readAndAdvance<FTDCVarInt>(); + + if (!swZero.isOK()) { + return swDelta.getStatus(); + } + + zeroesCount = swZero.getValue(); + } + + deltas[FTDCCompressor::getArrayOffset(sampleCount, j, i)] = swDelta.getValue(); + } + } + + // Inflate the deltas + for (std::uint32_t i = 0; i < metricsCount; i++) { + deltas[FTDCCompressor::getArrayOffset(sampleCount, 0, i)] += metrics[i]; + } + + for (std::uint32_t i = 0; i < metricsCount; i++) { + for (std::uint32_t j = 1; j < sampleCount; j++) { + deltas[FTDCCompressor::getArrayOffset(sampleCount, j, i)] += + deltas[FTDCCompressor::getArrayOffset(sampleCount, j - 1, i)]; + } + } + + for (std::uint32_t i = 0; i < sampleCount; ++i) { + for (std::uint32_t j = 0; j < metricsCount; ++j) { + metrics[j] = deltas[j * sampleCount + i]; + } + + docs.emplace_back(FTDCBSONUtil::constructDocumentFromMetrics(ref, metrics).getValue()); + } + + return {docs}; +} + +} // namespace mongo diff --git a/src/mongo/db/ftdc/decompressor.h b/src/mongo/db/ftdc/decompressor.h new file mode 100644 index 00000000000..42af2985783 --- /dev/null +++ b/src/mongo/db/ftdc/decompressor.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/base/data_range.h" +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/db/ftdc/block_compressor.h" +#include "mongo/db/jsobj.h" + +namespace mongo { + +/** + * Inflates a compressed chunk of metrics into a list of BSON documents + */ +class FTDCDecompressor { + MONGO_DISALLOW_COPYING(FTDCDecompressor); + +public: + FTDCDecompressor() = default; + + /** + * Inflates a compressed chunk of metrics into a vector of owned BSON documents. + * + * Will fail if the chunk is corrupt or too short. + * + * Returns N samples where N = sample count + 1. The 1 is the reference document. + */ + StatusWith<std::vector<BSONObj>> uncompress(ConstDataRange buf); + +private: + BlockCompressor _compressor; +}; + +} // namespace mongo diff --git a/src/mongo/db/ftdc/util.cpp b/src/mongo/db/ftdc/util.cpp new file mode 100644 index 00000000000..a154fb92db5 --- /dev/null +++ b/src/mongo/db/ftdc/util.cpp @@ -0,0 +1,399 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kFTDC + +#include "mongo/platform/basic.h" + +#include "mongo/db/ftdc/util.h" + +#include <boost/filesystem.hpp> + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/bson/util/bson_extract.h" +#include "mongo/config.h" +#include "mongo/db/ftdc/config.h" +#include "mongo/db/ftdc/constants.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/service_context.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +const char kFTDCInterimFile[] = "metrics.interim"; +const char kFTDCArchiveFile[] = "metrics"; + +const char kFTDCIdField[] = "_id"; +const char kFTDCTypeField[] = "type"; + +const char kFTDCDataField[] = "data"; +const char kFTDCDocField[] = "doc"; + +const char kFTDCDocsField[] = "docs"; + +const char kFTDCCollectStartField[] = "start"; +const char kFTDCCollectEndField[] = "end"; + +const std::uint64_t FTDCConfig::kPeriodMillisDefault = 1000; + +const std::size_t kMaxRecursion = 10; + +namespace FTDCUtil { + +boost::filesystem::path getInterimFile(const boost::filesystem::path& file) { + if (boost::filesystem::is_directory(file)) { + return file / kFTDCInterimFile; + } + + auto p = file.parent_path(); + p /= kFTDCInterimFile; + + return p; +} + +Date_t roundTime(Date_t now, Milliseconds period) { + // Note: auto type deduction is explicitly avoided here to ensure rigid type correctness + long long clock_duration = now.toMillisSinceEpoch(); + + long long now_next_period = clock_duration + period.count(); + + long long excess_time(now_next_period % period.count()); + + long long next_time = now_next_period - excess_time; + + return Date_t::fromMillisSinceEpoch(next_time); +} + +} // namespace FTDCUtil + + +namespace FTDCBSONUtil { + +namespace { + +StatusWith<bool> extractMetricsFromDocument(const BSONObj& referenceDoc, + const BSONObj& currentDoc, + std::vector<std::uint64_t>* metrics, + size_t recursion) { + if (recursion > kMaxRecursion) { + return {ErrorCodes::BadValue, "Recursion limit reached."}; + } + + bool matches = true; + BSONObjIterator itCurrent(currentDoc); + BSONObjIterator itReference(referenceDoc); + + while (itCurrent.more()) { + // Schema mismatch if current document is longer than reference document + if (matches && !itReference.more()) { + LOG(4) << "full-time diagnostic data capture schema change: currrent document is " + "longer than " + "reference document"; + matches = false; + } + + BSONElement currentElement = itCurrent.next(); + BSONElement referenceElement = matches ? itReference.next() : BSONElement(); + + if (matches) { + // Check for matching field names + if (referenceElement.fieldNameStringData() != currentElement.fieldNameStringData()) { + LOG(4) + << "full-time diagnostic data capture schema change: field name change - from '" + << referenceElement.fieldNameStringData() << "' to '" + << currentElement.fieldNameStringData() << "'"; + matches = false; + } + + // Check that types match, allowing any numeric type to match any other numeric type. + // This looseness is necessary because some metrics use varying numeric types, + // and if that was considered a schema mismatch, it would increase the number of + // reference samples required. + if ((currentElement.type() != referenceElement.type()) && + !(referenceElement.isNumber() == true && + currentElement.isNumber() == referenceElement.isNumber())) { + LOG(4) << "full-time diagnostic data capture schema change: field type change for " + "field '" << referenceElement.fieldNameStringData() << "' from '" + << static_cast<int>(referenceElement.type()) << "' to '" + << static_cast<int>(currentElement.type()) << "'"; + matches = false; + } + } + + switch (currentElement.type()) { + // all numeric types are extracted as long (int64) + // this supports the loose schema matching mentioned above, + // but does create a range issue for doubles, and requires doubles to be integer + case NumberDouble: + case NumberInt: + case NumberLong: + case NumberDecimal: + metrics->emplace_back(currentElement.numberLong()); + break; + + case Bool: + metrics->emplace_back(currentElement.Bool()); + break; + + case Date: + metrics->emplace_back(currentElement.Date().toMillisSinceEpoch()); + break; + + case bsonTimestamp: + // very slightly more space efficient to treat these as two separate metrics + metrics->emplace_back(currentElement.timestampValue()); + metrics->emplace_back(currentElement.timestampInc()); + break; + + case Object: + case Array: { + // Maximum recursion is controlled by the documents we collect. Maximum is 5 in the + // current implementation. + auto sw = extractMetricsFromDocument(matches ? referenceElement.Obj() : BSONObj(), + currentElement.Obj(), + metrics, + recursion + 1); + if (!sw.isOK()) { + return sw; + } + matches = matches && sw.getValue(); + } break; + + default: + break; + } + } + + // schema mismatch if ref is longer than curr + if (matches && itReference.more()) { + LOG(4) << "full-time diagnostic data capture schema change: reference document is longer " + "then current"; + matches = false; + } + + return {matches}; +} + +} // namespace + +StatusWith<bool> extractMetricsFromDocument(const BSONObj& referenceDoc, + const BSONObj& currentDoc, + std::vector<std::uint64_t>* metrics) { + return extractMetricsFromDocument(referenceDoc, currentDoc, metrics, 0); +} + +namespace { +Status constructDocumentFromMetrics(const BSONObj& referenceDocument, + BSONObjBuilder& builder, + const std::vector<std::uint64_t>& metrics, + size_t* pos, + size_t recursion) { + if (recursion > kMaxRecursion) { + return {ErrorCodes::BadValue, "Recursion limit reached."}; + } + + BSONObjIterator iterator(referenceDocument); + while (iterator.more()) { + BSONElement currentElement = iterator.next(); + + switch (currentElement.type()) { + case NumberDouble: + case NumberInt: + case NumberLong: + case NumberDecimal: + if (*pos >= metrics.size()) { + return Status( + ErrorCodes::BadValue, + "There are more metrics in the reference document then expected."); + } + + builder.append(currentElement.fieldName(), + static_cast<long long int>(metrics[(*pos)++])); + break; + + case Bool: + if (*pos >= metrics.size()) { + return Status( + ErrorCodes::BadValue, + "There are more metrics in the reference document then expected."); + } + + builder.append(currentElement.fieldName(), static_cast<bool>(metrics[(*pos)++])); + break; + + case Date: + if (*pos >= metrics.size()) { + return Status( + ErrorCodes::BadValue, + "There are more metrics in the reference document then expected."); + } + + builder.append( + currentElement.fieldName(), + Date_t::fromMillisSinceEpoch(static_cast<std::uint64_t>(metrics[(*pos)++]))); + break; + + case bsonTimestamp: { + if (*pos + 1 >= metrics.size()) { + return Status( + ErrorCodes::BadValue, + "There are more metrics in the reference document then expected."); + } + + std::uint64_t seconds = metrics[(*pos)++]; + std::uint64_t increment = metrics[(*pos)++]; + builder.append(currentElement.fieldName(), Timestamp(seconds, increment)); + break; + } + + case Object: { + BSONObjBuilder sub(builder.subobjStart(currentElement.fieldName())); + auto s = constructDocumentFromMetrics( + currentElement.Obj(), sub, metrics, pos, recursion + 1); + if (!s.isOK()) { + return s; + } + break; + } + + case Array: { + BSONObjBuilder sub(builder.subarrayStart(currentElement.fieldName())); + auto s = constructDocumentFromMetrics( + currentElement.Obj(), sub, metrics, pos, recursion + 1); + if (!s.isOK()) { + return s; + } + break; + } + + default: + builder.append(currentElement); + break; + } + } + + return Status::OK(); +} + +} // namespace + +StatusWith<BSONObj> constructDocumentFromMetrics(const BSONObj& ref, + const std::vector<std::uint64_t>& metrics) { + size_t at = 0; + BSONObjBuilder b; + Status s = constructDocumentFromMetrics(ref, b, metrics, &at, 0); + if (!s.isOK()) { + return StatusWith<BSONObj>(s); + } + + return b.obj(); +} + +BSONObj createBSONMetadataDocument(const BSONObj& metadata) { + BSONObjBuilder builder; + builder.appendDate(kFTDCIdField, getGlobalServiceContext()->getClockSource()->now()); + builder.appendNumber(kFTDCTypeField, static_cast<int>(FTDCType::kMetadata)); + builder.appendObject(kFTDCDocField, metadata.objdata(), metadata.objsize()); + + return builder.obj(); +} + +BSONObj createBSONMetricChunkDocument(ConstDataRange buf) { + BSONObjBuilder builder; + + builder.appendDate(kFTDCIdField, getGlobalServiceContext()->getClockSource()->now()); + builder.appendNumber(kFTDCTypeField, static_cast<int>(FTDCType::kMetricChunk)); + builder.appendBinData(kFTDCDataField, buf.length(), BinDataType::BinDataGeneral, buf.data()); + + return builder.obj(); +} + +StatusWith<FTDCType> getBSONDocumentType(const BSONObj& obj) { + long long value; + + Status status = bsonExtractIntegerField(obj, kFTDCTypeField, &value); + if (!status.isOK()) { + return {status}; + } + + if (static_cast<FTDCType>(value) != FTDCType::kMetricChunk && + static_cast<FTDCType>(value) != FTDCType::kMetadata) { + return {ErrorCodes::BadValue, + str::stream() << "Field '" << std::string(kFTDCTypeField) + << "' is not an expected value, found '" << value << "'"}; + } + + return {static_cast<FTDCType>(value)}; +} + +StatusWith<BSONObj> getBSONDocumentFromMetadataDoc(const BSONObj& obj) { + if (kDebugBuild) { + auto swType = getBSONDocumentType(obj); + dassert(swType.isOK() && swType.getValue() == FTDCType::kMetadata); + } + + BSONElement element; + + Status status = bsonExtractTypedField(obj, kFTDCDocField, BSONType::Object, &element); + if (!status.isOK()) { + return {status}; + } + + return {element.Obj()}; +} + +StatusWith<std::vector<BSONObj>> getMetricsFromMetricDoc(const BSONObj& obj, + FTDCDecompressor* decompressor) { + if (kDebugBuild) { + auto swType = getBSONDocumentType(obj); + dassert(swType.isOK() && swType.getValue() == FTDCType::kMetricChunk); + } + + BSONElement element; + + Status status = bsonExtractTypedField(obj, kFTDCDataField, BSONType::BinData, &element); + if (!status.isOK()) { + return {status}; + } + + int length; + const char* buffer = element.binData(length); + if (length < 0) { + return {ErrorCodes::BadValue, + str::stream() << "Field " << std::string(kFTDCTypeField) << " is not a BinData."}; + } + + return decompressor->uncompress({buffer, static_cast<std::size_t>(length)}); +} + +} // namespace FTDCBSONUtil + +} // namespace mongo diff --git a/src/mongo/db/ftdc/util.h b/src/mongo/db/ftdc/util.h new file mode 100644 index 00000000000..f2b57b77a81 --- /dev/null +++ b/src/mongo/db/ftdc/util.h @@ -0,0 +1,184 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <boost/filesystem/path.hpp> +#include <vector> + +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/db/ftdc/decompressor.h" +#include "mongo/db/jsobj.h" + +namespace mongo { + +/** + * Utilities for inflating and deflating BSON documents and metric arrays + */ +namespace FTDCBSONUtil { + +/** +* Type of FTDC document. +* +* NOTE: Persisted to disk via BSON Objects. +*/ +enum class FTDCType : std::int32_t { + /** + * A metadata document is composed of a header + an array of bson documents + * + * See createBSONMetadataChunkDocument + */ + kMetadata = 0, + + /** + * A metrics chunk is composed of a header + a compressed metric chunk. + * + * See createBSONMetricChunkDocument + */ + kMetricChunk = 1, +}; + + +/** + * Extract an array of numbers from a pair of documents. Verifies the pair of documents have same + * structure. + * + * Types considered numbers for the purpose of metrics: + * - double - encoded as an integer, loses fractional components via truncation + * - 32-bit integer + * - 64-integer + * - bool + * - date + * - timestamp + * Note: Timestamp is encoded as two integers, the timestamp value followed by the increment. + * + * Two documents are considered the same if satisfy the following criteria: + * + * Criteria: During a Depth First traversal of the document: + * 1. Each element has the same name regardless of its type. + * 2. The same number of elements exist in each document. + * 3. The types of each element are the same. + * Note: Double, Int, and Long are treated as equivalent for this purpose. + * + * @param referenceDoc A reference document to use the as the definition of the correct schema. + * @param doc A second document to compare against the reference document and extract metrics + * from + * @param metrics A vector of metrics that were extracted from the doc + * + * \return false if the documents differ in terms of metrics + */ +StatusWith<bool> extractMetricsFromDocument(const BSONObj& referenceDoc, + const BSONObj& doc, + std::vector<std::uint64_t>* metrics); + +/** + * Construct a document from a reference document and array of metrics. + * + * @param referenceDoc A reference document to use the as the definition of the correct schema. + * @param builder A BSON builder to construct a single document into. Each document will be a + *copy + * of the reference document with the numerical fields replaced with values from metrics array. + * @param metrics A vector of metrics for all documents + * @param pos A position into the array of metrics to start iterating from. + * + * \return Status if the decompression of the buffer was successful or failed. Decompression may + * fail if the buffer is not valid. + */ +Status constructDocumentFromMetrics(const BSONObj& referenceDoc, + BSONObjBuilder& builder, + const std::vector<std::uint64_t>& metrics, + size_t* pos); + +/** + * Construct a document from a reference document and array of metrics. See documentation above. + */ +StatusWith<BSONObj> constructDocumentFromMetrics(const BSONObj& ref, + const std::vector<std::uint64_t>& metrics); + +/** + * Create BSON metadata document for storage. The passed in document is embedded as the doc + * field in the example above. + * + * Example: + * { + * "_id" : Date_t + * "type" : 0 + * "doc" : { ... } + * } + */ +BSONObj createBSONMetadataDocument(const BSONObj& metadata); + +/** + * Create a BSON metric chunk document for storage. The passed in document is embedded as the + * data field in the example above. + * + * Example: + * { + * "_id" : Date_t + * "type" : 1 + * "data" : BinData(...) + * } + */ +BSONObj createBSONMetricChunkDocument(ConstDataRange buf); + +/** + * Get the type of a BSON document + */ +StatusWith<FTDCType> getBSONDocumentType(const BSONObj& obj); + +/** + * Extract the metadata field from a BSON document + */ +StatusWith<BSONObj> getBSONDocumentFromMetadataDoc(const BSONObj& obj); + +/** + * Get the set of metric documents from the compressed chunk of a metric document + */ +StatusWith<std::vector<BSONObj>> getMetricsFromMetricDoc(const BSONObj& obj, + FTDCDecompressor* decompressor); +} // namespace FTDCBSONUtil + + +/** + * Miscellaneous utilties for FTDC. + */ +namespace FTDCUtil { +/** + * Construct the full path to the interim file + */ +boost::filesystem::path getInterimFile(const boost::filesystem::path& file); + +/** + * Round the specified time_point to the next multiple of period after the specified time_point + */ +Date_t roundTime(Date_t now, Milliseconds period); + +} // namespace FTDCUtil + +} // namespace mongo diff --git a/src/mongo/db/ftdc/util_test.cpp b/src/mongo/db/ftdc/util_test.cpp new file mode 100644 index 00000000000..aa99006c73a --- /dev/null +++ b/src/mongo/db/ftdc/util_test.cpp @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/ftdc/util.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +void checkTime(int expected, int now_time, int period) { + ASSERT_TRUE(Date_t::fromMillisSinceEpoch(expected) == + FTDCUtil::roundTime(Date_t::fromMillisSinceEpoch(now_time), Milliseconds(period))); +} + +// Validate time rounding +TEST(FTDCUtilTest, TestRoundTime) { + checkTime(4, 3, 1); + checkTime(7, 3, 7); + checkTime(14, 8, 7); + checkTime(14, 13, 7); +} + +} // namespace mongo diff --git a/src/mongo/db/ftdc/varint.cpp b/src/mongo/db/ftdc/varint.cpp new file mode 100644 index 00000000000..d4b0a8819c4 --- /dev/null +++ b/src/mongo/db/ftdc/varint.cpp @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/db/ftdc/varint.h" + +#include <third_party/s2/util/coding/varint.h> + +#include "mongo/util/assert_util.h" + +namespace mongo { + +Status DataType::Handler<FTDCVarInt>::load( + FTDCVarInt* t, const char* ptr, size_t length, size_t* advanced, std::ptrdiff_t debug_offset) { + std::uint64_t value; + + const char* newptr = + Varint::Parse64WithLimit(ptr, ptr + length, reinterpret_cast<uint64*>(&value)); + + if (!newptr) { + return DataType::makeTrivialLoadStatus(FTDCVarInt::kMaxSizeBytes64, length, debug_offset); + } + + if (t) { + *t = value; + } + + if (advanced) { + *advanced = newptr - ptr; + } + + return Status::OK(); +} + +Status DataType::Handler<FTDCVarInt>::store( + const FTDCVarInt& t, char* ptr, size_t length, size_t* advanced, std::ptrdiff_t debug_offset) { + // nullptr means it wants to know how much space we want + if (!ptr) { + *advanced = FTDCVarInt::kMaxSizeBytes64; + return Status::OK(); + } + + if (FTDCVarInt::kMaxSizeBytes64 > length) { + return DataType::makeTrivialStoreStatus(FTDCVarInt::kMaxSizeBytes64, length, debug_offset); + } + + // Use a dassert since static_assert does not work because the expression does not have a + // constant value + dassert(Varint::kMax64 == FTDCVarInt::kMaxSizeBytes64); + + const char* newptr = Varint::Encode64(ptr, t); + + if (advanced) { + *advanced = newptr - ptr; + } + + return Status::OK(); +} + +} // namespace mongo diff --git a/src/mongo/db/ftdc/varint.h b/src/mongo/db/ftdc/varint.h new file mode 100644 index 00000000000..0dd4c73fb1b --- /dev/null +++ b/src/mongo/db/ftdc/varint.h @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <cstddef> +#include <cstdint> + +#include "mongo/base/status.h" +#include "mongo/base/data_type.h" + +namespace mongo { +/** + * Methods to compress and decompress 64-bit integers into variable integers + * + * Uses a technique described here: + * S. Buttcher, C. L. A. Clarke, and G. V. Cormack. + * Information Retrieval: Implementing and Evaluating Search Engines. MIT Press, Cambridge, MA, + * 2010 + */ +struct FTDCVarInt { + /** + * Maximum number of bytes an integer can compress to + */ + static const std::size_t kMaxSizeBytes64 = 10; + + FTDCVarInt() = default; + FTDCVarInt(std::uint64_t t) : _value(t) {} + + operator std::uint64_t() const { + return _value; + } + +private: + std::uint64_t _value{0}; +}; + +template <> +struct DataType::Handler<FTDCVarInt> { + /** + * Compress a 64-bit integer and return the new buffer position. + * + * end should be the byte after the end of the buffer. + * + * Return nullptr for bad encoded data. + */ + static Status load(FTDCVarInt* t, + const char* ptr, + size_t length, + size_t* advanced, + std::ptrdiff_t debug_offset); + + /** + * Compress a 64-bit integer and return the new buffer position + */ + static Status store(const FTDCVarInt& t, + char* ptr, + size_t length, + size_t* advanced, + std::ptrdiff_t debug_offset); + + static FTDCVarInt defaultConstruct() { + return 0; + } +}; + +} // namespace mongo diff --git a/src/mongo/db/ftdc/varint_test.cpp b/src/mongo/db/ftdc/varint_test.cpp new file mode 100644 index 00000000000..cb5f303981c --- /dev/null +++ b/src/mongo/db/ftdc/varint_test.cpp @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/base/data_builder.h" +#include "mongo/base/data_type_validated.h" +#include "mongo/base/init.h" +#include "mongo/db/client.h" +#include "mongo/db/ftdc/collector.h" +#include "mongo/db/ftdc/config.h" +#include "mongo/db/ftdc/controller.h" +#include "mongo/db/ftdc/varint.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { + +// Test integer packing and unpacking +void TestInt(std::uint64_t i) { + char buf[11]; + + DataView dvWrite(&buf[0]); + + dvWrite.write(i); + + ConstDataView cdvRead(&buf[0]); + + std::uint64_t d = cdvRead.read<std::uint64_t>(); + + ASSERT_EQUALS(i, d); +} + +// Test various integer combinations compress and uncompress correctly +TEST(FTDCVarIntTest, TestIntCompression) { + // Check numbers with leading 1 + for (int i = 0; i < 63; i++) { + TestInt(i); + TestInt(i - 1); + } + + // Check numbers composed of repeating hex numbers + for (int i = 0; i < 15; i++) { + int v = 0; + for (int j = 0; j < 15; j++) { + v = v << 4 | i; + TestInt(v); + } + } +} + +// Test data builder can write a lot of zeros +TEST(FTDCVarIntTest, TestDataBuilder) { + DataBuilder db(1); + + // DataBuilder grows by 2x, and we reserve 10 bytes + // lcm(2**x, 10) == 16 + for (int i = 0; i < 16; i++) { + auto s1 = db.writeAndAdvance(FTDCVarInt(0)); + ASSERT_OK(s1); + }; +} + +} // namespace mongo |