summaryrefslogtreecommitdiff
path: root/ext
diff options
context:
space:
mode:
authorKeith Bostic <keith@wiredtiger.com>2015-05-03 11:51:21 -0400
committerKeith Bostic <keith@wiredtiger.com>2015-05-03 11:51:21 -0400
commit1025b9fa76089d5d622feabbae339171f56f3702 (patch)
treef1c34827b01337027306dcf6ad0d95e2cf854c83 /ext
parent4119c22f29e3b8d9e73ca114f1ace52f48c77c58 (diff)
downloadmongo-1025b9fa76089d5d622feabbae339171f56f3702.tar.gz
Add LZ4 raw support based on the new LZ4_compress_destSize API, making
it the default (LZ4 block mode is now lz4-noraw, to match zlib). This implementation mostly works, but... It's not backward compatible because we're storing more information in the destination buffer than before (two 4B values instead of a one 8B value). The 2.5.3 release is the only release that had LZ4 support, so I don't expect this to be a problem. We could be backward compatible, but not without a fair amount of pain. This code sizes decompression buffers incorrectly. The WiredTiger btree code sizes the decompression buffer based on the final in-memory size of the data, which isn't correct for data compressed with LZ4's raw compression function which may have compressed more data than the final in-memory size.
Diffstat (limited to 'ext')
-rw-r--r--ext/compressors/lz4/lz4_compress.c257
1 files changed, 173 insertions, 84 deletions
diff --git a/ext/compressors/lz4/lz4_compress.c b/ext/compressors/lz4/lz4_compress.c
index 9939a4f8a04..7cdd1602128 100644
--- a/ext/compressors/lz4/lz4_compress.c
+++ b/ext/compressors/lz4/lz4_compress.c
@@ -48,19 +48,36 @@ typedef struct {
} LZ4_COMPRESSOR;
/*
+ * LZ4 decompression requires the exact compressed byte count returned by the
+ * LZ4_compress and LZ4_compress_destSize functions. WiredTiger doesn't track
+ * that value, store it in the destination buffer.
+ *
+ * Additionally, LZ4_compress_destSize may compress into the middle of a record,
+ * and after decompression we return the length to the last record successfully
+ * decompressed, not the number of bytes decompressed; store that value in the
+ * destination buffer as well.
+ *
+ * Use fixed-size, 4B values (WiredTiger never writes buffers larger than 4GB).
+ */
+typedef struct {
+ uint32_t compressed_byte_count;
+ uint32_t decompressed_return_count;
+} LZ4_PREFIX;
+
+/*
* lz4_error --
* Output an error message, and return a standard error code.
*/
static int
lz4_error(
- WT_COMPRESSOR *compressor, WT_SESSION *session, const char *call, int zret)
+ WT_COMPRESSOR *compressor, WT_SESSION *session, const char *call, int error)
{
WT_EXTENSION_API *wt_api;
wt_api = ((LZ4_COMPRESSOR *)compressor)->wt_api;
(void)wt_api->err_printf(wt_api,
- session, "lz4 error: %s: %d", call, zret);
+ session, "lz4 error: %s: %d", call, error);
return (WT_ERROR);
}
@@ -74,39 +91,33 @@ lz4_compress(WT_COMPRESSOR *compressor, WT_SESSION *session,
uint8_t *dst, size_t dst_len,
size_t *result_lenp, int *compression_failed)
{
- char *lz4buf;
- size_t lz4_len;
+ LZ4_PREFIX prefix;
+ int lz4_len;
- /*
- * The buffer should always be large enough due to the lz4_pre_size
- * call, but be paranoid and error if it isn't.
- */
- if (dst_len < src_len + sizeof(size_t))
- return (lz4_error(compressor, session,
- "LZ4 compress buffer too small", 0));
+ (void)compressor; /* Unused parameters */
+ (void)session;
+ (void)src_len;
+ (void)dst_len;
- /* Store the length of the compressed block in the first 8 bytes. */
- lz4buf = (char *)dst + sizeof(size_t);
- lz4_len = (size_t)LZ4_compress((const char *)src, lz4buf, (int)src_len);
+ /* Compress, starting after the prefix bytes. */
+ lz4_len = LZ4_compress(
+ (const char *)src, (char *)dst + sizeof(LZ4_PREFIX), (int)src_len);
/*
- * Flag no-compression if the result was larger than the original
- * size or compression failed.
+ * If compression succeeded and the compressed length is smaller than
+ * the original size, return success.
*/
- if (lz4_len == 0 || lz4_len + sizeof(size_t) >= src_len)
- *compression_failed = 1;
- else {
- /*
- * On decompression, lz4 requires the exact compressed byte
- * count (the current value of lz4_len). WiredTiger does not
- * preserve that value, so save lz4_len at the beginning of the
- * destination buffer.
- */
- *(size_t *)dst = lz4_len;
- *result_lenp = lz4_len + sizeof(size_t);
+ if (lz4_len != 0 && (size_t)lz4_len + sizeof(LZ4_PREFIX) < src_len) {
+ prefix.compressed_byte_count = (uint32_t)lz4_len;
+ prefix.decompressed_return_count = (uint32_t)src_len;
+ memcpy(dst, &prefix, sizeof(LZ4_PREFIX));
+
+ *result_lenp = (size_t)lz4_len + sizeof(LZ4_PREFIX);
*compression_failed = 0;
+ return (0);
}
+ *compression_failed = 1;
return (0);
}
@@ -120,41 +131,111 @@ lz4_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session,
uint8_t *dst, size_t dst_len,
size_t *result_lenp)
{
- WT_EXTENSION_API *wt_api;
- char *compressed_data;
+ LZ4_PREFIX prefix;
int decoded;
- size_t src_data_len;
-
- wt_api = ((LZ4_COMPRESSOR *)compressor)->wt_api;
- /* Retrieve compressed length from start of the data buffer. */
- src_data_len = *(size_t *)src;
- if (src_data_len + sizeof(size_t) > src_len) {
- (void)wt_api->err_printf(wt_api,
- session,
- "lz4_decompress: stored size exceeds buffer size");
- return (WT_ERROR);
- }
+ (void)src_len; /* Unused parameters */
- /* Skip over the data size to the start of compressed data. */
- compressed_data = (char *)src + sizeof(size_t);
+ /*
+ * Retrieve the length of the compressed block and the decompressed
+ * bytes to return from the start of the source buffer.
+ */
+ memcpy(&prefix, src, sizeof(LZ4_PREFIX));
/*
- * The destination buffer length should always be sufficient because
- * wiredtiger keeps track of the byte count before compression. Use
- * safe decompression: we may be relying on decompression to detect
- * corruption.
+ * Decompress, starting after the prefix bytes. Use safe decompression:
+ * we rely on decompression to detect corruption.
*/
- decoded = LZ4_decompress_safe(
- compressed_data, (char *)dst, (int)src_data_len, (int)dst_len);
+ decoded = LZ4_decompress_safe((const char *)src + sizeof(LZ4_PREFIX),
+ (char *)dst, (int)prefix.compressed_byte_count, (int)dst_len);
+
+ /* If decompression succeeded, return the uncompressed data length. */
+ if (decoded >= 0) {
+ *result_lenp = prefix.decompressed_return_count;
+ return (0);
+ }
+
+ return (
+ lz4_error(compressor, session, "LZ4 decompress error", decoded));
+}
+
+/*
+ * lz4_find_slot --
+ * Find the slot containing the target offset (binary search).
+ */
+static inline uint32_t
+lz4_find_slot(int target_arg, uint32_t *offsets, uint32_t slots)
+{
+ uint32_t base, indx, limit, target;
+
+ indx = 1;
+ target = (uint32_t)target_arg; /* Type conversion */
+
+ /* Figure out which slot we got to: binary search */
+ if (target >= offsets[slots])
+ indx = slots;
+ else if (target > offsets[1])
+ for (base = 2, limit = slots - base; limit != 0; limit >>= 1) {
+ indx = base + (limit >> 1);
+ if (target < offsets[indx])
+ continue;
+ base = indx + 1;
+ --limit;
+ }
+
+ return (indx);
+}
+
+/*
+ * lz4_compress_raw --
+ * Pack records into a specified on-disk page size.
+ */
+static int
+lz4_compress_raw(WT_COMPRESSOR *compressor, WT_SESSION *session,
+ size_t page_max, int split_pct, size_t extra,
+ uint8_t *src, uint32_t *offsets, uint32_t slots,
+ uint8_t *dst, size_t dst_len, int final,
+ size_t *result_lenp, uint32_t *result_slotsp)
+{
+ LZ4_PREFIX prefix;
+ int lz4_len;
+ uint32_t slot;
+ int sourceSize, targetDestSize;
+
+ (void)compressor; /* Unused parameters */
+ (void)session;
+ (void)split_pct;
+ (void)extra;
+ (void)dst_len;
+ (void)final;
- if (decoded < 0)
- return (lz4_error(compressor, session,
- "LZ4 decompress error", decoded));
+ sourceSize = (int)offsets[slots]; /* Type conversion */
+ targetDestSize = (int)page_max;
- /* return the uncompressed data length */
- *result_lenp = dst_len;
+ /* Compress, starting after the prefix bytes. */
+ lz4_len = LZ4_compress_destSize((const char *)src,
+ (char *)dst + sizeof(LZ4_PREFIX), &sourceSize, targetDestSize);
+ /*
+ * If compression succeeded and the compressed length is smaller than
+ * the original size, return success.
+ */
+ if (lz4_len != 0) {
+ /* Find the first slot we didn't compress. */
+ slot = lz4_find_slot(sourceSize, offsets, slots);
+
+ if ((size_t)lz4_len + sizeof(LZ4_PREFIX) < offsets[slot]) {
+ prefix.compressed_byte_count = (uint32_t)lz4_len;
+ prefix.decompressed_return_count = offsets[slot];
+ memcpy(dst, &prefix, sizeof(LZ4_PREFIX));
+
+ *result_slotsp = slot;
+ *result_lenp = (size_t)lz4_len + sizeof(LZ4_PREFIX);
+ }
+ }
+
+ *result_slotsp = 0;
+ *result_lenp = 1;
return (0);
}
@@ -164,18 +245,18 @@ lz4_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session,
*/
static int
lz4_pre_size(WT_COMPRESSOR *compressor, WT_SESSION *session,
- uint8_t *src, size_t src_len,
- size_t *result_lenp)
+ uint8_t *src, size_t src_len, size_t *result_lenp)
{
- (void)compressor;
+ (void)compressor; /* Unused parameters */
(void)session;
(void)src;
/*
- * LZ4 can use more space than the input data size, use the library
- * calculation of that overhead (plus our overhead) to be safe.
+ * In block mode, LZ4 can use more space than the input data size, use
+ * the library calculation of that overhead (plus our overhead) to be
+ * safe.
*/
- *result_lenp = LZ4_COMPRESSBOUND(src_len) + sizeof(size_t);
+ *result_lenp = LZ4_COMPRESSBOUND(src_len) + sizeof(LZ4_PREFIX);
return (0);
}
@@ -186,44 +267,30 @@ lz4_pre_size(WT_COMPRESSOR *compressor, WT_SESSION *session,
static int
lz4_terminate(WT_COMPRESSOR *compressor, WT_SESSION *session)
{
- (void)session;
+ (void)session; /* Unused parameters */
- /* Free the allocated memory. */
free(compressor);
-
return (0);
}
-int lz4_extension_init(WT_CONNECTION *, WT_CONFIG_ARG *);
-
/*
- * lz4_extension_init --
- * A simple shared library compression example.
+ * lz4_add_compressor --
+ * Add a LZ4 compressor.
*/
-int
-lz4_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+static int
+lz_add_compressor(WT_CONNECTION *connection, int raw, const char *name)
{
LZ4_COMPRESSOR *lz4_compressor;
- (void)config; /* Unused parameters */
-
+ /*
+ * There are two almost identical LZ4 compressors: one using raw
+ * compression to target a specific block size, and one without.
+ */
if ((lz4_compressor = calloc(1, sizeof(LZ4_COMPRESSOR))) == NULL)
return (errno);
- /*
- * Allocate a local compressor structure, with a WT_COMPRESSOR structure
- * as the first field, allowing us to treat references to either type of
- * structure as a reference to the other type.
- *
- * This could be simplified if only a single database is opened in the
- * application, we could use a static WT_COMPRESSOR structure, and a
- * static reference to the WT_EXTENSION_API methods, then we don't need
- * to allocate memory when the compressor is initialized or free it when
- * the compressor is terminated. However, this approach is more general
- * purpose and supports multiple databases per application.
- */
lz4_compressor->compressor.compress = lz4_compress;
- lz4_compressor->compressor.compress_raw = NULL;
+ lz4_compressor->compressor.compress_raw = raw ? lz4_compress_raw : NULL;
lz4_compressor->compressor.decompress = lz4_decompress;
lz4_compressor->compressor.pre_size = lz4_pre_size;
lz4_compressor->compressor.terminate = lz4_terminate;
@@ -232,7 +299,29 @@ lz4_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
/* Load the compressor */
return (connection->add_compressor(
- connection, "lz4", (WT_COMPRESSOR *)lz4_compressor, NULL));
+ connection, name, (WT_COMPRESSOR *)lz4_compressor, NULL));
+}
+
+int lz4_extension_init(WT_CONNECTION *, WT_CONFIG_ARG *);
+
+/*
+ * lz4_extension_init --
+ * WiredTiger LZ4 compression extension - called directly when LZ4 support
+ * is built in, or via wiredtiger_extension_init when LZ4 support is included
+ * via extension loading.
+ */
+int
+lz4_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config)
+{
+ int ret;
+
+ (void)config; /* Unused parameters */
+
+ if ((ret = lz_add_compressor(connection, 1, "lz4")) != 0)
+ return (ret);
+ if ((ret = lz_add_compressor(connection, 0, "lz4-noraw")) != 0)
+ return (ret);
+ return (0);
}
/*