summaryrefslogtreecommitdiff
path: root/src/journal
diff options
context:
space:
mode:
Diffstat (limited to 'src/journal')
-rw-r--r--src/journal/compress.c239
-rw-r--r--src/journal/compress.h7
-rw-r--r--src/journal/test-compress.c92
3 files changed, 294 insertions, 44 deletions
diff --git a/src/journal/compress.c b/src/journal/compress.c
index 4e00e4fc5e..2bbfc7644a 100644
--- a/src/journal/compress.c
+++ b/src/journal/compress.c
@@ -16,6 +16,11 @@
#include <lz4frame.h>
#endif
+#if HAVE_ZSTD
+#include <zstd.h>
+#include <zstd_errors.h>
+#endif
+
#include "alloc-util.h"
#include "compress.h"
#include "fd-util.h"
@@ -33,6 +38,22 @@ DEFINE_TRIVIAL_CLEANUP_FUNC(LZ4F_compressionContext_t, LZ4F_freeCompressionConte
DEFINE_TRIVIAL_CLEANUP_FUNC(LZ4F_decompressionContext_t, LZ4F_freeDecompressionContext);
#endif
+#if HAVE_ZSTD
+DEFINE_TRIVIAL_CLEANUP_FUNC(ZSTD_CCtx *, ZSTD_freeCCtx);
+DEFINE_TRIVIAL_CLEANUP_FUNC(ZSTD_DCtx *, ZSTD_freeDCtx);
+
+static int zstd_ret_to_errno(size_t ret) {
+ switch (ZSTD_getErrorCode(ret)) {
+ case ZSTD_error_dstSize_tooSmall:
+ return -ENOBUFS;
+ case ZSTD_error_memory_allocation:
+ return -ENOMEM;
+ default:
+ return -EBADMSG;
+ }
+}
+#endif
+
#define ALIGN_8(l) ALIGN_TO(l, sizeof(size_t))
static const char* const object_compressed_table[_OBJECT_COMPRESSED_MAX] = {
@@ -668,12 +689,230 @@ int decompress_stream_lz4(int in, int out, uint64_t max_bytes) {
#endif
}
+int compress_stream_zstd(int fdf, int fdt, uint64_t max_bytes) {
+#if HAVE_ZSTD
+ _cleanup_(ZSTD_freeCCtxp) ZSTD_CCtx *cctx = NULL;
+ _cleanup_free_ void *in_buff = NULL, *out_buff = NULL;
+ size_t in_allocsize, out_allocsize;
+ size_t z;
+ uint64_t left = max_bytes, in_bytes = 0;
+ /* This can be used in the future to add uncompressed size to the header */
+ uint64_t in_totalsize = 0;
+
+ assert(fdf >= 0);
+ assert(fdt >= 0);
+
+ /* Create the context and buffers */
+ in_allocsize = ZSTD_CStreamInSize();
+ out_allocsize = ZSTD_CStreamOutSize();
+ in_buff = malloc(in_allocsize);
+ out_buff = malloc(out_allocsize);
+ cctx = ZSTD_createCCtx();
+ if (!cctx || !out_buff || !in_buff)
+ return -ENOMEM;
+
+ if (in_totalsize) {
+ z = ZSTD_CCtx_setPledgedSrcSize(cctx, in_totalsize);
+ if (z)
+ log_debug("Failed to enable ZSTD input size, ignoring: %s", ZSTD_getErrorName(z));
+ }
+ z = ZSTD_CCtx_setParameter(cctx, ZSTD_c_checksumFlag, 1);
+ if (ZSTD_isError(z))
+ log_debug("Failed to enable ZSTD checksum, ignoring: %s", ZSTD_getErrorName(z));
+
+ /* This loop read from the input file, compresses that entire chunk,
+ * and writes all output produced to the output file.
+ */
+ for (;;) {
+ bool is_last_chunk;
+ ZSTD_inBuffer input = {
+ .src = in_buff,
+ .size = 0,
+ .pos = 0
+ };
+ ssize_t red;
+
+ red = loop_read(fdf, in_buff, in_allocsize, true);
+ if (red < 0)
+ return red;
+ is_last_chunk = red == 0;
+
+ in_bytes += (size_t) red;
+ input.size = (size_t) red;
+
+ for (bool finished = false; !finished;) {
+ ZSTD_outBuffer output = {
+ .dst = out_buff,
+ .size = out_allocsize,
+ .pos = 0
+ };
+ size_t remaining;
+ ssize_t wrote;
+
+ /* Compress into the output buffer and write all of the
+ * output to the file so we can reuse the buffer next
+ * iteration.
+ */
+ remaining = ZSTD_compressStream2(
+ cctx, &output, &input,
+ is_last_chunk ? ZSTD_e_end : ZSTD_e_continue);
+
+ if (ZSTD_isError(remaining)) {
+ log_debug("ZSTD encoder failed: %s", ZSTD_getErrorName(remaining));
+ return zstd_ret_to_errno(remaining);
+ }
+
+ if (left < output.pos)
+ return -EFBIG;
+
+ wrote = loop_write(fdt, output.dst, output.pos, 1);
+ if (wrote < 0)
+ return wrote;
+
+ left -= output.pos;
+
+ /* If we're on the last chunk we're finished when zstd
+ * returns 0, which means its consumed all the input AND
+ * finished the frame. Otherwise, we're finished when
+ * we've consumed all the input.
+ */
+ finished = is_last_chunk ? (remaining == 0) : (input.pos == input.size);
+ }
+
+ /* zstd only returns 0 when the input is completely consumed */
+ assert(input.pos == input.size);
+ if (is_last_chunk)
+ break;
+ }
+
+ log_debug(
+ "ZSTD compression finished (%" PRIu64 " -> %" PRIu64 " bytes, %.1f%%)",
+ in_bytes,
+ max_bytes - left,
+ (double) (max_bytes - left) / in_bytes * 100);
+
+ return 0;
+#else
+ return -EPROTONOSUPPORT;
+#endif
+}
+
+int decompress_stream_zstd(int fdf, int fdt, uint64_t max_bytes) {
+#if HAVE_ZSTD
+ _cleanup_(ZSTD_freeDCtxp) ZSTD_DCtx *dctx = NULL;
+ _cleanup_free_ void *in_buff = NULL, *out_buff = NULL;
+ size_t in_allocsize, out_allocsize;
+ size_t last_result = 0;
+ uint64_t left = max_bytes, in_bytes = 0;
+
+ assert(fdf >= 0);
+ assert(fdt >= 0);
+
+ /* Create the context and buffers */
+ in_allocsize = ZSTD_DStreamInSize();
+ out_allocsize = ZSTD_DStreamOutSize();
+ in_buff = malloc(in_allocsize);
+ out_buff = malloc(out_allocsize);
+ dctx = ZSTD_createDCtx();
+ if (!dctx || !out_buff || !in_buff)
+ return -ENOMEM;
+
+ /* This loop assumes that the input file is one or more concatenated
+ * zstd streams. This example won't work if there is trailing non-zstd
+ * data at the end, but streaming decompression in general handles this
+ * case. ZSTD_decompressStream() returns 0 exactly when the frame is
+ * completed, and doesn't consume input after the frame.
+ */
+ for (;;) {
+ bool has_error = false;
+ ZSTD_inBuffer input = {
+ .src = in_buff,
+ .size = 0,
+ .pos = 0
+ };
+ ssize_t red;
+
+ red = loop_read(fdf, in_buff, in_allocsize, true);
+ if (red < 0)
+ return red;
+ if (red == 0)
+ break;
+
+ in_bytes += (size_t) red;
+ input.size = (size_t) red;
+ input.pos = 0;
+
+ /* Given a valid frame, zstd won't consume the last byte of the
+ * frame until it has flushed all of the decompressed data of
+ * the frame. So input.pos < input.size means frame is not done
+ * or there is still output available.
+ */
+ while (input.pos < input.size) {
+ ZSTD_outBuffer output = {
+ .dst = out_buff,
+ .size = out_allocsize,
+ .pos = 0
+ };
+ ssize_t wrote;
+ /* The return code is zero if the frame is complete, but
+ * there may be multiple frames concatenated together.
+ * Zstd will automatically reset the context when a
+ * frame is complete. Still, calling ZSTD_DCtx_reset()
+ * can be useful to reset the context to a clean state,
+ * for instance if the last decompression call returned
+ * an error.
+ */
+ last_result = ZSTD_decompressStream(dctx, &output, &input);
+ if (ZSTD_isError(last_result)) {
+ has_error = true;
+ break;
+ }
+
+ if (left < output.pos)
+ return -EFBIG;
+
+ wrote = loop_write(fdt, output.dst, output.pos, 1);
+ if (wrote < 0)
+ return wrote;
+
+ left -= output.pos;
+ }
+ if (has_error)
+ break;
+ }
+
+ if (in_bytes == 0)
+ return log_debug_errno(SYNTHETIC_ERRNO(EBADMSG), "ZSTD decoder failed: no data read");
+
+ if (last_result != 0) {
+ /* The last return value from ZSTD_decompressStream did not end
+ * on a frame, but we reached the end of the file! We assume
+ * this is an error, and the input was truncated.
+ */
+ log_debug("ZSTD decoder failed: %s", ZSTD_getErrorName(last_result));
+ return zstd_ret_to_errno(last_result);
+ }
+
+ log_debug(
+ "ZSTD decompression finished (%" PRIu64 " -> %" PRIu64 " bytes, %.1f%%)",
+ in_bytes,
+ max_bytes - left,
+ (double) (max_bytes - left) / in_bytes * 100);
+ return 0;
+#else
+ log_debug("Cannot decompress file. Compiled without ZSTD support.");
+ return -EPROTONOSUPPORT;
+#endif
+}
+
int decompress_stream(const char *filename, int fdf, int fdt, uint64_t max_bytes) {
if (endswith(filename, ".lz4"))
return decompress_stream_lz4(fdf, fdt, max_bytes);
else if (endswith(filename, ".xz"))
return decompress_stream_xz(fdf, fdt, max_bytes);
+ else if (endswith(filename, ".zst"))
+ return decompress_stream_zstd(fdf, fdt, max_bytes);
else
return -EPROTONOSUPPORT;
}
diff --git a/src/journal/compress.h b/src/journal/compress.h
index 56411484ce..74ef592f43 100644
--- a/src/journal/compress.h
+++ b/src/journal/compress.h
@@ -52,11 +52,16 @@ int decompress_startswith(int compression,
int compress_stream_xz(int fdf, int fdt, uint64_t max_bytes);
int compress_stream_lz4(int fdf, int fdt, uint64_t max_bytes);
+int compress_stream_zstd(int fdf, int fdt, uint64_t max_bytes);
int decompress_stream_xz(int fdf, int fdt, uint64_t max_size);
int decompress_stream_lz4(int fdf, int fdt, uint64_t max_size);
+int decompress_stream_zstd(int fdf, int fdt, uint64_t max_size);
-#if HAVE_LZ4
+#if HAVE_ZSTD
+# define compress_stream compress_stream_zstd
+# define COMPRESSED_EXT ".zst"
+#elif HAVE_LZ4
# define compress_stream compress_stream_lz4
# define COMPRESSED_EXT ".lz4"
#else
diff --git a/src/journal/test-compress.c b/src/journal/test-compress.c
index 0b2898e109..c9d295b3c1 100644
--- a/src/journal/test-compress.c
+++ b/src/journal/test-compress.c
@@ -44,20 +44,20 @@ typedef int (decompress_sw_t)(const void *src, uint64_t src_size,
typedef int (compress_stream_t)(int fdf, int fdt, uint64_t max_bytes);
typedef int (decompress_stream_t)(int fdf, int fdt, uint64_t max_size);
-#if HAVE_XZ || HAVE_LZ4
-static void test_compress_decompress(int compression,
- compress_blob_t compress,
- decompress_blob_t decompress,
- const char *data,
- size_t data_len,
- bool may_fail) {
+#if HAVE_XZ || HAVE_LZ4 || HAVE_ZSTD
+_unused_ static void test_compress_decompress(const char *compression,
+ compress_blob_t compress,
+ decompress_blob_t decompress,
+ const char *data,
+ size_t data_len,
+ bool may_fail) {
char compressed[512];
size_t csize, usize = 0;
_cleanup_free_ char *decompressed = NULL;
int r;
log_info("/* testing %s %s blob compression/decompression */",
- object_compressed_to_string(compression), data);
+ compression, data);
r = compress(data, data_len, compressed, sizeof(compressed), &csize);
if (r == -ENOBUFS) {
@@ -88,12 +88,12 @@ static void test_compress_decompress(int compression,
memzero(decompressed, usize);
}
-static void test_decompress_startswith(int compression,
- compress_blob_t compress,
- decompress_sw_t decompress_sw,
- const char *data,
- size_t data_len,
- bool may_fail) {
+_unused_ static void test_decompress_startswith(const char *compression,
+ compress_blob_t compress,
+ decompress_sw_t decompress_sw,
+ const char *data,
+ size_t data_len,
+ bool may_fail) {
char *compressed;
_cleanup_free_ char *compressed1 = NULL, *compressed2 = NULL, *decompressed = NULL;
@@ -101,7 +101,7 @@ static void test_decompress_startswith(int compression,
int r;
log_info("/* testing decompress_startswith with %s on %.20s text */",
- object_compressed_to_string(compression), data);
+ compression, data);
#define BUFSIZE_1 512
#define BUFSIZE_2 20000
@@ -136,9 +136,9 @@ static void test_decompress_startswith(int compression,
assert_se(r > 0);
}
-static void test_decompress_startswith_short(int compression,
- compress_blob_t compress,
- decompress_sw_t decompress_sw) {
+_unused_ static void test_decompress_startswith_short(const char *compression,
+ compress_blob_t compress,
+ decompress_sw_t decompress_sw) {
#define TEXT "HUGE=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
@@ -146,7 +146,7 @@ static void test_decompress_startswith_short(int compression,
size_t i, csize;
int r;
- log_info("/* %s with %s */", __func__, object_compressed_to_string(compression));
+ log_info("/* %s with %s */", __func__, compression);
r = compress(TEXT, sizeof TEXT, buf, sizeof buf, &csize);
assert_se(r == 0);
@@ -162,11 +162,11 @@ static void test_decompress_startswith_short(int compression,
}
}
-static void test_compress_stream(int compression,
- const char* cat,
- compress_stream_t compress,
- decompress_stream_t decompress,
- const char *srcfile) {
+_unused_ static void test_compress_stream(const char *compression,
+ const char *cat,
+ compress_stream_t compress,
+ decompress_stream_t decompress,
+ const char *srcfile) {
_cleanup_close_ int src = -1, dst = -1, dst2 = -1;
_cleanup_(unlink_tempfilep) char
@@ -182,8 +182,7 @@ static void test_compress_stream(int compression,
return;
}
- log_debug("/* testing %s compression */",
- object_compressed_to_string(compression));
+ log_debug("/* testing %s compression */", compression);
log_debug("/* create source from %s */", srcfile);
@@ -266,8 +265,8 @@ static void test_lz4_decompress_partial(void) {
#endif
int main(int argc, char *argv[]) {
-#if HAVE_XZ || HAVE_LZ4
- const char text[] =
+#if HAVE_XZ || HAVE_LZ4 || HAVE_ZSTD
+ _unused_ const char text[] =
"text\0foofoofoofoo AAAA aaaaaaaaa ghost busters barbarbar FFF"
"foofoofoofoo AAAA aaaaaaaaa ghost busters barbarbar FFF";
@@ -288,60 +287,67 @@ int main(int argc, char *argv[]) {
random_bytes(data + 7, sizeof(data) - 7);
#if HAVE_XZ
- test_compress_decompress(OBJECT_COMPRESSED_XZ, compress_blob_xz, decompress_blob_xz,
+ test_compress_decompress("XZ", compress_blob_xz, decompress_blob_xz,
text, sizeof(text), false);
- test_compress_decompress(OBJECT_COMPRESSED_XZ, compress_blob_xz, decompress_blob_xz,
+ test_compress_decompress("XZ", compress_blob_xz, decompress_blob_xz,
data, sizeof(data), true);
- test_decompress_startswith(OBJECT_COMPRESSED_XZ,
+ test_decompress_startswith("XZ",
compress_blob_xz, decompress_startswith_xz,
text, sizeof(text), false);
- test_decompress_startswith(OBJECT_COMPRESSED_XZ,
+ test_decompress_startswith("XZ",
compress_blob_xz, decompress_startswith_xz,
data, sizeof(data), true);
- test_decompress_startswith(OBJECT_COMPRESSED_XZ,
+ test_decompress_startswith("XZ",
compress_blob_xz, decompress_startswith_xz,
huge, HUGE_SIZE, true);
- test_compress_stream(OBJECT_COMPRESSED_XZ, "xzcat",
+ test_compress_stream("XZ", "xzcat",
compress_stream_xz, decompress_stream_xz, srcfile);
- test_decompress_startswith_short(OBJECT_COMPRESSED_XZ, compress_blob_xz, decompress_startswith_xz);
+ test_decompress_startswith_short("XZ", compress_blob_xz, decompress_startswith_xz);
#else
log_info("/* XZ test skipped */");
#endif
#if HAVE_LZ4
- test_compress_decompress(OBJECT_COMPRESSED_LZ4, compress_blob_lz4, decompress_blob_lz4,
+ test_compress_decompress("LZ4", compress_blob_lz4, decompress_blob_lz4,
text, sizeof(text), false);
- test_compress_decompress(OBJECT_COMPRESSED_LZ4, compress_blob_lz4, decompress_blob_lz4,
+ test_compress_decompress("LZ4", compress_blob_lz4, decompress_blob_lz4,
data, sizeof(data), true);
- test_decompress_startswith(OBJECT_COMPRESSED_LZ4,
+ test_decompress_startswith("LZ4",
compress_blob_lz4, decompress_startswith_lz4,
text, sizeof(text), false);
- test_decompress_startswith(OBJECT_COMPRESSED_LZ4,
+ test_decompress_startswith("LZ4",
compress_blob_lz4, decompress_startswith_lz4,
data, sizeof(data), true);
- test_decompress_startswith(OBJECT_COMPRESSED_LZ4,
+ test_decompress_startswith("LZ4",
compress_blob_lz4, decompress_startswith_lz4,
huge, HUGE_SIZE, true);
- test_compress_stream(OBJECT_COMPRESSED_LZ4, "lz4cat",
+ test_compress_stream("LZ4", "lz4cat",
compress_stream_lz4, decompress_stream_lz4, srcfile);
test_lz4_decompress_partial();
- test_decompress_startswith_short(OBJECT_COMPRESSED_LZ4, compress_blob_lz4, decompress_startswith_lz4);
+ test_decompress_startswith_short("LZ4", compress_blob_lz4, decompress_startswith_lz4);
#else
log_info("/* LZ4 test skipped */");
#endif
+#if HAVE_ZSTD
+ test_compress_stream("ZSTD", "zstdcat",
+ compress_stream_zstd, decompress_stream_zstd, srcfile);
+#else
+ log_info("/* ZSTD test skipped */");
+#endif
+
return 0;
#else
- log_info("/* XZ and LZ4 tests skipped */");
+ log_info("/* XZ, LZ4 and ZSTD tests skipped */");
return EXIT_TEST_SKIP;
#endif
}