diff options
author | Keith Bostic <keith@wiredtiger.com> | 2015-05-03 11:51:21 -0400 |
---|---|---|
committer | Keith Bostic <keith@wiredtiger.com> | 2015-05-03 11:51:21 -0400 |
commit | 1025b9fa76089d5d622feabbae339171f56f3702 (patch) | |
tree | f1c34827b01337027306dcf6ad0d95e2cf854c83 /ext | |
parent | 4119c22f29e3b8d9e73ca114f1ace52f48c77c58 (diff) | |
download | mongo-1025b9fa76089d5d622feabbae339171f56f3702.tar.gz |
Add LZ4 raw support based on the new LZ4_compress_destSize API, making
it the default (LZ4 block mode is now lz4-noraw, to match zlib).
This implementation mostly works, but...
It's not backward compatible because we're storing more information in
the destination buffer than before (two 4B values instead of a one 8B
value). The 2.5.3 release is the only release that had LZ4 support, so
I don't expect this to be a problem. We could be backward compatible,
but not without a fair amount of pain.
This code sizes decompression buffers incorrectly. The WiredTiger btree
code sizes the decompression buffer based on the final in-memory size
of the data, which isn't correct for data compressed with LZ4's raw
compression function which may have compressed more data than the final
in-memory size.
Diffstat (limited to 'ext')
-rw-r--r-- | ext/compressors/lz4/lz4_compress.c | 257 |
1 files changed, 173 insertions, 84 deletions
diff --git a/ext/compressors/lz4/lz4_compress.c b/ext/compressors/lz4/lz4_compress.c index 9939a4f8a04..7cdd1602128 100644 --- a/ext/compressors/lz4/lz4_compress.c +++ b/ext/compressors/lz4/lz4_compress.c @@ -48,19 +48,36 @@ typedef struct { } LZ4_COMPRESSOR; /* + * LZ4 decompression requires the exact compressed byte count returned by the + * LZ4_compress and LZ4_compress_destSize functions. WiredTiger doesn't track + * that value, store it in the destination buffer. + * + * Additionally, LZ4_compress_destSize may compress into the middle of a record, + * and after decompression we return the length to the last record successfully + * decompressed, not the number of bytes decompressed; store that value in the + * destination buffer as well. + * + * Use fixed-size, 4B values (WiredTiger never writes buffers larger than 4GB). + */ +typedef struct { + uint32_t compressed_byte_count; + uint32_t decompressed_return_count; +} LZ4_PREFIX; + +/* * lz4_error -- * Output an error message, and return a standard error code. */ static int lz4_error( - WT_COMPRESSOR *compressor, WT_SESSION *session, const char *call, int zret) + WT_COMPRESSOR *compressor, WT_SESSION *session, const char *call, int error) { WT_EXTENSION_API *wt_api; wt_api = ((LZ4_COMPRESSOR *)compressor)->wt_api; (void)wt_api->err_printf(wt_api, - session, "lz4 error: %s: %d", call, zret); + session, "lz4 error: %s: %d", call, error); return (WT_ERROR); } @@ -74,39 +91,33 @@ lz4_compress(WT_COMPRESSOR *compressor, WT_SESSION *session, uint8_t *dst, size_t dst_len, size_t *result_lenp, int *compression_failed) { - char *lz4buf; - size_t lz4_len; + LZ4_PREFIX prefix; + int lz4_len; - /* - * The buffer should always be large enough due to the lz4_pre_size - * call, but be paranoid and error if it isn't. - */ - if (dst_len < src_len + sizeof(size_t)) - return (lz4_error(compressor, session, - "LZ4 compress buffer too small", 0)); + (void)compressor; /* Unused parameters */ + (void)session; + (void)src_len; + (void)dst_len; - /* Store the length of the compressed block in the first 8 bytes. */ - lz4buf = (char *)dst + sizeof(size_t); - lz4_len = (size_t)LZ4_compress((const char *)src, lz4buf, (int)src_len); + /* Compress, starting after the prefix bytes. */ + lz4_len = LZ4_compress( + (const char *)src, (char *)dst + sizeof(LZ4_PREFIX), (int)src_len); /* - * Flag no-compression if the result was larger than the original - * size or compression failed. + * If compression succeeded and the compressed length is smaller than + * the original size, return success. */ - if (lz4_len == 0 || lz4_len + sizeof(size_t) >= src_len) - *compression_failed = 1; - else { - /* - * On decompression, lz4 requires the exact compressed byte - * count (the current value of lz4_len). WiredTiger does not - * preserve that value, so save lz4_len at the beginning of the - * destination buffer. - */ - *(size_t *)dst = lz4_len; - *result_lenp = lz4_len + sizeof(size_t); + if (lz4_len != 0 && (size_t)lz4_len + sizeof(LZ4_PREFIX) < src_len) { + prefix.compressed_byte_count = (uint32_t)lz4_len; + prefix.decompressed_return_count = (uint32_t)src_len; + memcpy(dst, &prefix, sizeof(LZ4_PREFIX)); + + *result_lenp = (size_t)lz4_len + sizeof(LZ4_PREFIX); *compression_failed = 0; + return (0); } + *compression_failed = 1; return (0); } @@ -120,41 +131,111 @@ lz4_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session, uint8_t *dst, size_t dst_len, size_t *result_lenp) { - WT_EXTENSION_API *wt_api; - char *compressed_data; + LZ4_PREFIX prefix; int decoded; - size_t src_data_len; - - wt_api = ((LZ4_COMPRESSOR *)compressor)->wt_api; - /* Retrieve compressed length from start of the data buffer. */ - src_data_len = *(size_t *)src; - if (src_data_len + sizeof(size_t) > src_len) { - (void)wt_api->err_printf(wt_api, - session, - "lz4_decompress: stored size exceeds buffer size"); - return (WT_ERROR); - } + (void)src_len; /* Unused parameters */ - /* Skip over the data size to the start of compressed data. */ - compressed_data = (char *)src + sizeof(size_t); + /* + * Retrieve the length of the compressed block and the decompressed + * bytes to return from the start of the source buffer. + */ + memcpy(&prefix, src, sizeof(LZ4_PREFIX)); /* - * The destination buffer length should always be sufficient because - * wiredtiger keeps track of the byte count before compression. Use - * safe decompression: we may be relying on decompression to detect - * corruption. + * Decompress, starting after the prefix bytes. Use safe decompression: + * we rely on decompression to detect corruption. */ - decoded = LZ4_decompress_safe( - compressed_data, (char *)dst, (int)src_data_len, (int)dst_len); + decoded = LZ4_decompress_safe((const char *)src + sizeof(LZ4_PREFIX), + (char *)dst, (int)prefix.compressed_byte_count, (int)dst_len); + + /* If decompression succeeded, return the uncompressed data length. */ + if (decoded >= 0) { + *result_lenp = prefix.decompressed_return_count; + return (0); + } + + return ( + lz4_error(compressor, session, "LZ4 decompress error", decoded)); +} + +/* + * lz4_find_slot -- + * Find the slot containing the target offset (binary search). + */ +static inline uint32_t +lz4_find_slot(int target_arg, uint32_t *offsets, uint32_t slots) +{ + uint32_t base, indx, limit, target; + + indx = 1; + target = (uint32_t)target_arg; /* Type conversion */ + + /* Figure out which slot we got to: binary search */ + if (target >= offsets[slots]) + indx = slots; + else if (target > offsets[1]) + for (base = 2, limit = slots - base; limit != 0; limit >>= 1) { + indx = base + (limit >> 1); + if (target < offsets[indx]) + continue; + base = indx + 1; + --limit; + } + + return (indx); +} + +/* + * lz4_compress_raw -- + * Pack records into a specified on-disk page size. + */ +static int +lz4_compress_raw(WT_COMPRESSOR *compressor, WT_SESSION *session, + size_t page_max, int split_pct, size_t extra, + uint8_t *src, uint32_t *offsets, uint32_t slots, + uint8_t *dst, size_t dst_len, int final, + size_t *result_lenp, uint32_t *result_slotsp) +{ + LZ4_PREFIX prefix; + int lz4_len; + uint32_t slot; + int sourceSize, targetDestSize; + + (void)compressor; /* Unused parameters */ + (void)session; + (void)split_pct; + (void)extra; + (void)dst_len; + (void)final; - if (decoded < 0) - return (lz4_error(compressor, session, - "LZ4 decompress error", decoded)); + sourceSize = (int)offsets[slots]; /* Type conversion */ + targetDestSize = (int)page_max; - /* return the uncompressed data length */ - *result_lenp = dst_len; + /* Compress, starting after the prefix bytes. */ + lz4_len = LZ4_compress_destSize((const char *)src, + (char *)dst + sizeof(LZ4_PREFIX), &sourceSize, targetDestSize); + /* + * If compression succeeded and the compressed length is smaller than + * the original size, return success. + */ + if (lz4_len != 0) { + /* Find the first slot we didn't compress. */ + slot = lz4_find_slot(sourceSize, offsets, slots); + + if ((size_t)lz4_len + sizeof(LZ4_PREFIX) < offsets[slot]) { + prefix.compressed_byte_count = (uint32_t)lz4_len; + prefix.decompressed_return_count = offsets[slot]; + memcpy(dst, &prefix, sizeof(LZ4_PREFIX)); + + *result_slotsp = slot; + *result_lenp = (size_t)lz4_len + sizeof(LZ4_PREFIX); + } + } + + *result_slotsp = 0; + *result_lenp = 1; return (0); } @@ -164,18 +245,18 @@ lz4_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session, */ static int lz4_pre_size(WT_COMPRESSOR *compressor, WT_SESSION *session, - uint8_t *src, size_t src_len, - size_t *result_lenp) + uint8_t *src, size_t src_len, size_t *result_lenp) { - (void)compressor; + (void)compressor; /* Unused parameters */ (void)session; (void)src; /* - * LZ4 can use more space than the input data size, use the library - * calculation of that overhead (plus our overhead) to be safe. + * In block mode, LZ4 can use more space than the input data size, use + * the library calculation of that overhead (plus our overhead) to be + * safe. */ - *result_lenp = LZ4_COMPRESSBOUND(src_len) + sizeof(size_t); + *result_lenp = LZ4_COMPRESSBOUND(src_len) + sizeof(LZ4_PREFIX); return (0); } @@ -186,44 +267,30 @@ lz4_pre_size(WT_COMPRESSOR *compressor, WT_SESSION *session, static int lz4_terminate(WT_COMPRESSOR *compressor, WT_SESSION *session) { - (void)session; + (void)session; /* Unused parameters */ - /* Free the allocated memory. */ free(compressor); - return (0); } -int lz4_extension_init(WT_CONNECTION *, WT_CONFIG_ARG *); - /* - * lz4_extension_init -- - * A simple shared library compression example. + * lz4_add_compressor -- + * Add a LZ4 compressor. */ -int -lz4_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) +static int +lz_add_compressor(WT_CONNECTION *connection, int raw, const char *name) { LZ4_COMPRESSOR *lz4_compressor; - (void)config; /* Unused parameters */ - + /* + * There are two almost identical LZ4 compressors: one using raw + * compression to target a specific block size, and one without. + */ if ((lz4_compressor = calloc(1, sizeof(LZ4_COMPRESSOR))) == NULL) return (errno); - /* - * Allocate a local compressor structure, with a WT_COMPRESSOR structure - * as the first field, allowing us to treat references to either type of - * structure as a reference to the other type. - * - * This could be simplified if only a single database is opened in the - * application, we could use a static WT_COMPRESSOR structure, and a - * static reference to the WT_EXTENSION_API methods, then we don't need - * to allocate memory when the compressor is initialized or free it when - * the compressor is terminated. However, this approach is more general - * purpose and supports multiple databases per application. - */ lz4_compressor->compressor.compress = lz4_compress; - lz4_compressor->compressor.compress_raw = NULL; + lz4_compressor->compressor.compress_raw = raw ? lz4_compress_raw : NULL; lz4_compressor->compressor.decompress = lz4_decompress; lz4_compressor->compressor.pre_size = lz4_pre_size; lz4_compressor->compressor.terminate = lz4_terminate; @@ -232,7 +299,29 @@ lz4_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) /* Load the compressor */ return (connection->add_compressor( - connection, "lz4", (WT_COMPRESSOR *)lz4_compressor, NULL)); + connection, name, (WT_COMPRESSOR *)lz4_compressor, NULL)); +} + +int lz4_extension_init(WT_CONNECTION *, WT_CONFIG_ARG *); + +/* + * lz4_extension_init -- + * WiredTiger LZ4 compression extension - called directly when LZ4 support + * is built in, or via wiredtiger_extension_init when LZ4 support is included + * via extension loading. + */ +int +lz4_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) +{ + int ret; + + (void)config; /* Unused parameters */ + + if ((ret = lz_add_compressor(connection, 1, "lz4")) != 0) + return (ret); + if ((ret = lz_add_compressor(connection, 0, "lz4-noraw")) != 0) + return (ret); + return (0); } /* |