summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt4
-rw-r--r--README.md2
-rw-r--r--benchmarks/db_bench.cc109
-rw-r--r--include/leveldb/options.h3
-rw-r--r--port/port_config.h.in7
-rw-r--r--port/port_example.h21
-rw-r--r--port/port_stdcxx.h67
-rw-r--r--table/format.cc23
-rw-r--r--table/table_builder.cc14
-rw-r--r--table/table_test.cc27
10 files changed, 224 insertions, 53 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b829c94..fda9e01 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -40,6 +40,7 @@ check_include_file("unistd.h" HAVE_UNISTD_H)
include(CheckLibraryExists)
check_library_exists(crc32c crc32c_value "" HAVE_CRC32C)
check_library_exists(snappy snappy_compress "" HAVE_SNAPPY)
+# zstd exports its API with an upper-case prefix (ZSTD_compress); the probe
+# links against the exact symbol name, so it must be spelled that way.
+check_library_exists(zstd ZSTD_compress "" HAVE_ZSTD)
check_library_exists(tcmalloc malloc "" HAVE_TCMALLOC)
include(CheckCXXSymbolExists)
@@ -273,6 +274,9 @@ endif(HAVE_CRC32C)
if(HAVE_SNAPPY)
target_link_libraries(leveldb snappy)
endif(HAVE_SNAPPY)
+if(HAVE_ZSTD)
+ target_link_libraries(leveldb zstd)
+endif(HAVE_ZSTD)
if(HAVE_TCMALLOC)
target_link_libraries(leveldb tcmalloc)
endif(HAVE_TCMALLOC)
diff --git a/README.md b/README.md
index 3088c55..a5e5416 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,7 @@ Authors: Sanjay Ghemawat (sanjay@google.com) and Jeff Dean (jeff@google.com)
* Multiple changes can be made in one atomic batch.
* Users can create a transient snapshot to get a consistent view of data.
* Forward and backward iteration is supported over the data.
- * Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/).
+ * Data is automatically compressed using the [Snappy compression library](https://google.github.io/snappy/), but [Zstd compression](https://facebook.github.io/zstd/) is also supported.
* External activity (file system operations etc.) is relayed through a virtual interface so users can customize the operating system interactions.
# Documentation
diff --git a/benchmarks/db_bench.cc b/benchmarks/db_bench.cc
index 43ff43f..db866f5 100644
--- a/benchmarks/db_bench.cc
+++ b/benchmarks/db_bench.cc
@@ -60,7 +60,9 @@ static const char* FLAGS_benchmarks =
"fill100K,"
"crc32c,"
"snappycomp,"
- "snappyuncomp,";
+ "snappyuncomp,"
+ "zstdcomp,"
+ "zstduncomp,";
// Number of key/values to place in database
static int FLAGS_num = 1000000;
@@ -367,6 +369,57 @@ struct ThreadState {
ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {}
};
+// Benchmark driver shared by the per-codec compression benchmarks.
+//
+// Obtains one block from gen.Generate(Options().block_size) and passes it to
+// compress_func repeatedly until at least 1G of input has been consumed or
+// compress_func returns false. On success the output/input size ratio and
+// the number of input bytes are recorded in thread->stats; on failure a
+// "(<name> failure)" message is recorded instead.
+void Compress(
+    ThreadState* thread, std::string name,
+    std::function<bool(const char*, size_t, std::string*)> compress_func) {
+  const int64_t kInputBudget = 1024 * 1048576;  // Compress 1G
+  RandomGenerator gen;
+  Slice input = gen.Generate(Options().block_size);
+  std::string compressed;
+  int64_t consumed = 0;
+  int64_t emitted = 0;
+  bool succeeded = true;
+  for (; succeeded && consumed < kInputBudget; consumed += input.size()) {
+    succeeded = compress_func(input.data(), input.size(), &compressed);
+    emitted += compressed.size();
+    thread->stats.FinishedSingleOp();
+  }
+
+  if (!succeeded) {
+    thread->stats.AddMessage("(" + name + " failure)");
+    return;
+  }
+
+  // Report the compressed size as a percentage of the input size.
+  char ratio[100];
+  std::snprintf(ratio, sizeof(ratio), "(output: %.1f%%)",
+                (emitted * 100.0) / consumed);
+  thread->stats.AddMessage(ratio);
+  thread->stats.AddBytes(consumed);
+}
+
+// Benchmark driver shared by the per-codec decompression benchmarks.
+//
+// Compresses one generated block once with compress_func, then decodes that
+// compressed block with uncompress_func repeatedly until at least 1G of
+// uncompressed data has been accounted for or a call returns false. On
+// success the byte count is recorded in thread->stats; on failure a
+// "(<name> failure)" message is recorded instead.
+void Uncompress(
+    ThreadState* thread, std::string name,
+    std::function<bool(const char*, size_t, std::string*)> compress_func,
+    std::function<bool(const char*, size_t, char*)> uncompress_func) {
+  RandomGenerator gen;
+  Slice input = gen.Generate(Options().block_size);
+  std::string compressed;
+  bool ok = compress_func(input.data(), input.size(), &compressed);
+  int64_t bytes = 0;
+  // Scratch output buffer sized to the original input. The std::string owns
+  // the memory, so it is released on every exit path (including an exception
+  // escaping uncompress_func) without a manual delete[].
+  std::string uncompressed(input.size(), '\0');
+  while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
+    ok = uncompress_func(compressed.data(), compressed.size(),
+                         &uncompressed[0]);
+    bytes += input.size();
+    thread->stats.FinishedSingleOp();
+  }
+
+  if (!ok) {
+    thread->stats.AddMessage("(" + name + " failure)");
+  } else {
+    thread->stats.AddBytes(bytes);
+  }
+}
+
} // namespace
class Benchmark {
@@ -579,6 +632,10 @@ class Benchmark {
method = &Benchmark::SnappyCompress;
} else if (name == Slice("snappyuncomp")) {
method = &Benchmark::SnappyUncompress;
+ } else if (name == Slice("zstdcomp")) {
+ method = &Benchmark::ZstdCompress;
+ } else if (name == Slice("zstduncomp")) {
+ method = &Benchmark::ZstdUncompress;
} else if (name == Slice("heapprofile")) {
HeapProfile();
} else if (name == Slice("stats")) {
@@ -713,50 +770,20 @@ class Benchmark {
}
void SnappyCompress(ThreadState* thread) {
- RandomGenerator gen;
- Slice input = gen.Generate(Options().block_size);
- int64_t bytes = 0;
- int64_t produced = 0;
- bool ok = true;
- std::string compressed;
- while (ok && bytes < 1024 * 1048576) { // Compress 1G
- ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
- produced += compressed.size();
- bytes += input.size();
- thread->stats.FinishedSingleOp();
- }
-
- if (!ok) {
- thread->stats.AddMessage("(snappy failure)");
- } else {
- char buf[100];
- std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
- (produced * 100.0) / bytes);
- thread->stats.AddMessage(buf);
- thread->stats.AddBytes(bytes);
- }
+ Compress(thread, "snappy", &port::Snappy_Compress);
}
void SnappyUncompress(ThreadState* thread) {
- RandomGenerator gen;
- Slice input = gen.Generate(Options().block_size);
- std::string compressed;
- bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
- int64_t bytes = 0;
- char* uncompressed = new char[input.size()];
- while (ok && bytes < 1024 * 1048576) { // Compress 1G
- ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
- uncompressed);
- bytes += input.size();
- thread->stats.FinishedSingleOp();
- }
- delete[] uncompressed;
+ Uncompress(thread, "snappy", &port::Snappy_Compress,
+ &port::Snappy_Uncompress);
+ }
- if (!ok) {
- thread->stats.AddMessage("(snappy failure)");
- } else {
- thread->stats.AddBytes(bytes);
- }
+ void ZstdCompress(ThreadState* thread) {
+ Compress(thread, "zstd", &port::Zstd_Compress);
+ }
+
+ void ZstdUncompress(ThreadState* thread) {
+ Uncompress(thread, "zstd", &port::Zstd_Compress, &port::Zstd_Uncompress);
}
void Open() {
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index 97c6b0b..79bcdbb 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -26,7 +26,8 @@ enum CompressionType {
// NOTE: do not change the values of existing entries, as these are
// part of the persistent format on disk.
kNoCompression = 0x0,
- kSnappyCompression = 0x1
+ kSnappyCompression = 0x1,
+ kZstdCompression = 0x2,
};
// Options to control the behavior of a database (passed to DB::Open)
diff --git a/port/port_config.h.in b/port/port_config.h.in
index 272671d..34bf66a 100644
--- a/port/port_config.h.in
+++ b/port/port_config.h.in
@@ -30,4 +30,9 @@
#cmakedefine01 HAVE_SNAPPY
#endif // !defined(HAVE_SNAPPY)
-#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_ \ No newline at end of file
+// Define to 1 if you have Zstd.
+// Skipped when HAVE_ZSTD was already defined (e.g. on the compiler command
+// line), so the build-system value does not redefine it.
+#if !defined(HAVE_ZSTD)
+#cmakedefine01 HAVE_ZSTD
+#endif // !defined(HAVE_ZSTD)
+
+#endif // STORAGE_LEVELDB_PORT_PORT_CONFIG_H_
diff --git a/port/port_example.h b/port/port_example.h
index 704aa24..b1a1c32 100644
--- a/port/port_example.h
+++ b/port/port_example.h
@@ -72,7 +72,7 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
size_t* result);
// Attempt to snappy uncompress input[0,input_length-1] into *output.
-// Returns true if successful, false if the input is invalid lightweight
+// Returns true if successful, false if the input is invalid snappy
// compressed data.
//
// REQUIRES: at least the first "n" bytes of output[] must be writable
@@ -81,6 +81,25 @@ bool Snappy_GetUncompressedLength(const char* input, size_t length,
bool Snappy_Uncompress(const char* input_data, size_t input_length,
char* output);
+// Store the zstd compression of "input[0,input_length-1]" in *output.
+// Returns false if zstd is not supported by this port.
+bool Zstd_Compress(const char* input, size_t input_length, std::string* output);
+
+// If input[0,input_length-1] looks like a valid zstd compressed
+// buffer, store the size of the uncompressed data in *result and
+// return true. Else return false.
+bool Zstd_GetUncompressedLength(const char* input, size_t length,
+ size_t* result);
+
+// Attempt to zstd uncompress input[0,input_length-1] into *output.
+// Returns true if successful, false if the input is invalid zstd
+// compressed data.
+//
+// REQUIRES: at least the first "n" bytes of output[] must be writable
+// where "n" is the result of a successful call to
+// Zstd_GetUncompressedLength.
+bool Zstd_Uncompress(const char* input_data, size_t input_length, char* output);
+
// ------------------ Miscellaneous -------------------
// If heap profiling is not supported, returns false.
diff --git a/port/port_stdcxx.h b/port/port_stdcxx.h
index 2bda48d..ca961e6 100644
--- a/port/port_stdcxx.h
+++ b/port/port_stdcxx.h
@@ -28,6 +28,9 @@
#if HAVE_SNAPPY
#include <snappy.h>
#endif // HAVE_SNAPPY
+#if HAVE_ZSTD
+#include <zstd.h>
+#endif // HAVE_ZSTD
#include <cassert>
#include <condition_variable> // NOLINT
@@ -126,6 +129,70 @@ inline bool Snappy_Uncompress(const char* input, size_t length, char* output) {
#endif // HAVE_SNAPPY
}
+// Stores the zstd compression of input[0,length-1] in *output.
+//
+// Returns true on success, with *output resized to exactly the compressed
+// size. Returns false if zstd support is not compiled in (HAVE_ZSTD is 0),
+// if a compression context cannot be created, or if zstd reports an error;
+// the contents of *output are unspecified in that case.
+inline bool Zstd_Compress(const char* input, size_t length,
+                          std::string* output) {
+#if HAVE_ZSTD
+  // Worst-case compressed size for `length` input bytes.
+  const size_t bound = ZSTD_compressBound(length);
+  if (ZSTD_isError(bound)) {
+    return false;
+  }
+  // Context creation allocates and can fail; compressing with a null
+  // context would dereference it.
+  ZSTD_CCtx* ctx = ZSTD_createCCtx();
+  if (ctx == nullptr) {
+    return false;
+  }
+  output->resize(bound);
+  const size_t outlen =
+      ZSTD_compress2(ctx, &(*output)[0], output->size(), input, length);
+  ZSTD_freeCCtx(ctx);
+  if (ZSTD_isError(outlen)) {
+    return false;
+  }
+  // Trim the buffer from the worst-case bound down to the bytes written.
+  output->resize(outlen);
+  return true;
+#else
+  // Silence compiler warnings about unused arguments.
+  (void)input;
+  (void)length;
+  (void)output;
+  return false;
+#endif  // HAVE_ZSTD
+}
+
+// If input[0,length-1] starts with a zstd frame whose header records a
+// non-zero uncompressed size, stores that size in *result and returns true.
+//
+// Returns false (leaving *result untouched) if zstd support is not compiled
+// in, if the header is invalid, if the frame does not record its content
+// size, if the recorded size is zero, or if it does not fit in size_t.
+inline bool Zstd_GetUncompressedLength(const char* input, size_t length,
+                                       size_t* result) {
+#if HAVE_ZSTD
+  // ZSTD_getFrameContentSize signals failure with the sentinel values
+  // ZSTD_CONTENTSIZE_UNKNOWN and ZSTD_CONTENTSIZE_ERROR, not with 0. They
+  // must be rejected here: callers allocate a buffer of *result bytes.
+  const unsigned long long size = ZSTD_getFrameContentSize(input, length);
+  if (size == 0 || size == ZSTD_CONTENTSIZE_UNKNOWN ||
+      size == ZSTD_CONTENTSIZE_ERROR) {
+    return false;
+  }
+  // Reject sizes that would be truncated where size_t is narrower than
+  // unsigned long long.
+  const size_t narrowed = static_cast<size_t>(size);
+  if (static_cast<unsigned long long>(narrowed) != size) {
+    return false;
+  }
+  *result = narrowed;
+  return true;
+#else
+  // Silence compiler warnings about unused arguments.
+  (void)input;
+  (void)length;
+  (void)result;
+  return false;
+#endif  // HAVE_ZSTD
+}
+
+// Attempts to zstd uncompress input[0,length-1] into output.
+//
+// Returns true only if decompression succeeds and produces exactly the
+// number of bytes reported by Zstd_GetUncompressedLength. Returns false if
+// zstd support is not compiled in, if that length cannot be determined, if a
+// decompression context cannot be created, or if zstd reports an error.
+//
+// REQUIRES: at least the first "n" bytes of output[] must be writable
+// where "n" is the result of a successful call to
+// Zstd_GetUncompressedLength.
+inline bool Zstd_Uncompress(const char* input, size_t length, char* output) {
+#if HAVE_ZSTD
+  size_t expected = 0;
+  if (!Zstd_GetUncompressedLength(input, length, &expected)) {
+    return false;
+  }
+  // Context creation allocates and can fail; decompressing with a null
+  // context would dereference it.
+  ZSTD_DCtx* ctx = ZSTD_createDCtx();
+  if (ctx == nullptr) {
+    return false;
+  }
+  const size_t written =
+      ZSTD_decompressDCtx(ctx, output, expected, input, length);
+  ZSTD_freeDCtx(ctx);
+  if (ZSTD_isError(written)) {
+    return false;
+  }
+  // Callers treat the whole buffer as valid data, so a short result is a
+  // failure too.
+  return written == expected;
+#else
+  // Silence compiler warnings about unused arguments.
+  (void)input;
+  (void)length;
+  (void)output;
+  return false;
+#endif  // HAVE_ZSTD
+}
+
inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) {
// Silence compiler warnings about unused arguments.
(void)func;
diff --git a/table/format.cc b/table/format.cc
index e183977..7647372 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -5,6 +5,7 @@
#include "table/format.h"
#include "leveldb/env.h"
+#include "leveldb/options.h"
#include "port/port.h"
#include "table/block.h"
#include "util/coding.h"
@@ -116,13 +117,31 @@ Status ReadBlock(RandomAccessFile* file, const ReadOptions& options,
size_t ulength = 0;
if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) {
delete[] buf;
- return Status::Corruption("corrupted compressed block contents");
+ return Status::Corruption("corrupted snappy compressed block length");
}
char* ubuf = new char[ulength];
if (!port::Snappy_Uncompress(data, n, ubuf)) {
delete[] buf;
delete[] ubuf;
- return Status::Corruption("corrupted compressed block contents");
+ return Status::Corruption("corrupted snappy compressed block contents");
+ }
+ delete[] buf;
+ result->data = Slice(ubuf, ulength);
+ result->heap_allocated = true;
+ result->cachable = true;
+ break;
+ }
+ case kZstdCompression: {
+ size_t ulength = 0;
+ if (!port::Zstd_GetUncompressedLength(data, n, &ulength)) {
+ delete[] buf;
+ return Status::Corruption("corrupted zstd compressed block length");
+ }
+ char* ubuf = new char[ulength];
+ if (!port::Zstd_Uncompress(data, n, ubuf)) {
+ delete[] buf;
+ delete[] ubuf;
+ return Status::Corruption("corrupted zstd compressed block contents");
}
delete[] buf;
result->data = Slice(ubuf, ulength);
diff --git a/table/table_builder.cc b/table/table_builder.cc
index 29a619d..ba3df9e 100644
--- a/table/table_builder.cc
+++ b/table/table_builder.cc
@@ -168,6 +168,20 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
}
break;
}
+
+ case kZstdCompression: {
+ std::string* compressed = &r->compressed_output;
+ if (port::Zstd_Compress(raw.data(), raw.size(), compressed) &&
+ compressed->size() < raw.size() - (raw.size() / 8u)) {
+ block_contents = *compressed;
+ } else {
+ // Zstd not supported, or compressed less than 12.5%, so just
+ // store uncompressed form
+ block_contents = raw;
+ type = kNoCompression;
+ }
+ break;
+ }
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
diff --git a/table/table_test.cc b/table/table_test.cc
index a405586..b3baf95 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -14,6 +14,7 @@
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
+#include "leveldb/options.h"
#include "leveldb/table_builder.h"
#include "table/block.h"
#include "table/block_builder.h"
@@ -784,15 +785,29 @@ TEST(TableTest, ApproximateOffsetOfPlain) {
ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000));
}
-static bool SnappyCompressionSupported() {
+static bool CompressionSupported(CompressionType type) {
std::string out;
Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
- return port::Snappy_Compress(in.data(), in.size(), &out);
+ if (type == kSnappyCompression) {
+ return port::Snappy_Compress(in.data(), in.size(), &out);
+ } else if (type == kZstdCompression) {
+ return port::Zstd_Compress(in.data(), in.size(), &out);
+ }
+ return false;
}
-TEST(TableTest, ApproximateOffsetOfCompressed) {
- if (!SnappyCompressionSupported())
- GTEST_SKIP() << "skipping compression tests";
+class CompressionTableTest
+ : public ::testing::TestWithParam<std::tuple<CompressionType>> {};
+
+INSTANTIATE_TEST_SUITE_P(CompressionTests, CompressionTableTest,
+ ::testing::Values(kSnappyCompression,
+ kZstdCompression));
+
+TEST_P(CompressionTableTest, ApproximateOffsetOfCompressed) {
+ CompressionType type = ::testing::get<0>(GetParam());
+ if (!CompressionSupported(type)) {
+ GTEST_SKIP() << "skipping compression test: " << type;
+ }
Random rnd(301);
TableConstructor c(BytewiseComparator());
@@ -805,7 +820,7 @@ TEST(TableTest, ApproximateOffsetOfCompressed) {
KVMap kvmap;
Options options;
options.block_size = 1024;
- options.compression = kSnappyCompression;
+ options.compression = type;
c.Finish(options, &keys, &kvmap);
// Expected upper and lower bounds of space used by compressible strings.