diff options
-rw-r--r-- | src/third_party/wiredtiger/dist/filelist | 1 | ||||
-rw-r--r-- | src/third_party/wiredtiger/ext/compressors/zstd/zstd_compress.c | 217 | ||||
-rw-r--r-- | src/third_party/wiredtiger/import.data | 2 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/conn/conn_api.c | 4 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/include/extern.h | 7 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/include/wiredtiger_ext.h | 52 | ||||
-rw-r--r-- | src/third_party/wiredtiger/src/support/lock_ext.c | 84 |
7 files changed, 361 insertions, 6 deletions
diff --git a/src/third_party/wiredtiger/dist/filelist b/src/third_party/wiredtiger/dist/filelist index b1fe227cdfc..d2a2d93cff3 100644 --- a/src/third_party/wiredtiger/dist/filelist +++ b/src/third_party/wiredtiger/dist/filelist @@ -201,6 +201,7 @@ src/support/hash_fnv.c src/support/hazard.c src/support/hex.c src/support/huffman.c +src/support/lock_ext.c src/support/modify.c src/support/mtx_rw.c src/support/pow.c diff --git a/src/third_party/wiredtiger/ext/compressors/zstd/zstd_compress.c b/src/third_party/wiredtiger/ext/compressors/zstd/zstd_compress.c index 6eb55332258..e1725142982 100644 --- a/src/third_party/wiredtiger/ext/compressors/zstd/zstd_compress.c +++ b/src/third_party/wiredtiger/ext/compressors/zstd/zstd_compress.c @@ -44,6 +44,26 @@ #define inline __inline #endif +/* Default context pool size. */ +#define CONTEXT_POOL_SIZE 50 + +struct ZSTD_Context; +typedef struct ZSTD_Context ZSTD_CONTEXT; +struct ZSTD_Context { + void *ctx; /* Either a compression context or a decompression context. */ + ZSTD_CONTEXT *next; +}; + +struct ZSTD_Context_Pool; +typedef struct ZSTD_Context_Pool ZSTD_CONTEXT_POOL; +struct ZSTD_Context_Pool { + int count; /* Pool size */ + WT_EXTENSION_SPINLOCK list_lock; /* Spinlock */ + ZSTD_CONTEXT *free_ctx_list; +}; + +typedef enum { CONTEXT_TYPE_COMPRESS, CONTEXT_TYPE_DECOMPRESS } CONTEXT_TYPE; + /* Local compressor structure. */ typedef struct { WT_COMPRESSOR compressor; /* Must come first */ @@ -51,6 +71,9 @@ typedef struct { WT_EXTENSION_API *wt_api; /* Extension API */ int compression_level; /* compression level */ + + ZSTD_CONTEXT_POOL *cctx_pool; /* Compression context pool. */ + ZSTD_CONTEXT_POOL *dctx_pool; /* Decompression context pool. */ } ZSTD_COMPRESSOR; /* @@ -90,6 +113,68 @@ zstd_error(WT_COMPRESSOR *compressor, WT_SESSION *session, const char *call, siz } /* + * zstd_get_context -- + * WiredTiger Zstd get a context from context pool. + */ +static void +zstd_get_context( + ZSTD_COMPRESSOR *zcompressor, WT_SESSION *session, CONTEXT_TYPE ctx_type, ZSTD_CONTEXT **contextp) +{ + WT_EXTENSION_API *wt_api; + ZSTD_CONTEXT_POOL *ctx_pool; + + wt_api = zcompressor->wt_api; + + /* Based on the type, decide the context pool from which the context to be allocated. */ + if (ctx_type == CONTEXT_TYPE_COMPRESS) + ctx_pool = zcompressor->cctx_pool; + else + ctx_pool = zcompressor->dctx_pool; + + *contextp = NULL; + if (ctx_pool->free_ctx_list == NULL) + return; + + wt_api->spin_lock(wt_api, session, &(ctx_pool->list_lock)); + *contextp = ctx_pool->free_ctx_list; + ctx_pool->free_ctx_list = (*contextp)->next; + wt_api->spin_unlock(wt_api, session, &(ctx_pool->list_lock)); + (*contextp)->next = NULL; + + return; +} + +/* + * zstd_release_context -- + * WiredTiger Zstd release a context back to context pool. + */ +static void +zstd_release_context( + ZSTD_COMPRESSOR *zcompressor, WT_SESSION *session, CONTEXT_TYPE ctx_type, ZSTD_CONTEXT *context) +{ + WT_EXTENSION_API *wt_api; + ZSTD_CONTEXT_POOL *ctx_pool; + + if (context == NULL) + return; + + wt_api = zcompressor->wt_api; + + /* Based on the type, decide the context pool to which the context to be released back. */ + if (ctx_type == CONTEXT_TYPE_COMPRESS) + ctx_pool = zcompressor->cctx_pool; + else + ctx_pool = zcompressor->dctx_pool; + + wt_api->spin_lock(wt_api, session, &(ctx_pool->list_lock)); + context->next = ctx_pool->free_ctx_list; + ctx_pool->free_ctx_list = context; + wt_api->spin_unlock(wt_api, session, &(ctx_pool->list_lock)); + + return; +} + +/* * zstd_compress -- * WiredTiger Zstd compression. */ @@ -98,15 +183,24 @@ zstd_compress(WT_COMPRESSOR *compressor, WT_SESSION *session, uint8_t *src, size uint8_t *dst, size_t dst_len, size_t *result_lenp, int *compression_failed) { ZSTD_COMPRESSOR *zcompressor; + ZSTD_CONTEXT *context = NULL; size_t zstd_ret; uint64_t zstd_len; zcompressor = (ZSTD_COMPRESSOR *)compressor; + zstd_get_context(zcompressor, session, CONTEXT_TYPE_COMPRESS, &context); + /* Compress, starting past the prefix bytes. */ - zstd_ret = ZSTD_compress( - dst + ZSTD_PREFIX, dst_len - ZSTD_PREFIX, src, src_len, zcompressor->compression_level); + if (context != NULL) { + zstd_ret = ZSTD_compressCCtx((ZSTD_CCtx *)context->ctx, dst + ZSTD_PREFIX, + dst_len - ZSTD_PREFIX, src, src_len, zcompressor->compression_level); + } else { + zstd_ret = ZSTD_compress( + dst + ZSTD_PREFIX, dst_len - ZSTD_PREFIX, src, src_len, zcompressor->compression_level); + } + zstd_release_context(zcompressor, session, CONTEXT_TYPE_COMPRESS, context); /* * If compression succeeded and the compressed length is smaller than the original size, return * success. @@ -144,10 +238,13 @@ zstd_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session, uint8_t *src, si uint8_t *dst, size_t dst_len, size_t *result_lenp) { WT_EXTENSION_API *wt_api; + ZSTD_COMPRESSOR *zcompressor; + ZSTD_CONTEXT *context = NULL; size_t zstd_ret; uint64_t zstd_len; wt_api = ((ZSTD_COMPRESSOR *)compressor)->wt_api; + zcompressor = (ZSTD_COMPRESSOR *)compressor; /* * Retrieve the saved length, handling little- to big-endian conversion as necessary. @@ -162,8 +259,20 @@ zstd_decompress(WT_COMPRESSOR *compressor, WT_SESSION *session, uint8_t *src, si return (WT_ERROR); } - zstd_ret = ZSTD_decompress(dst, dst_len, src + ZSTD_PREFIX, (size_t)zstd_len); + /* + * This type of context management is useful to avoid repeated context allocation overhead. This + * is typically for block compression, for streaming compression, context could be reused over + * and over again for performance gains. + */ + zstd_get_context(zcompressor, session, CONTEXT_TYPE_DECOMPRESS, &context); + if (context != NULL) { + zstd_ret = ZSTD_decompressDCtx( + (ZSTD_DCtx *)context->ctx, dst, dst_len, src + ZSTD_PREFIX, (size_t)zstd_len); + } else { + zstd_ret = ZSTD_decompress(dst, dst_len, src + ZSTD_PREFIX, (size_t)zstd_len); + } + zstd_release_context(zcompressor, session, CONTEXT_TYPE_DECOMPRESS, context); if (!ZSTD_isError(zstd_ret)) { *result_lenp = zstd_ret; return (0); @@ -193,14 +302,108 @@ zstd_pre_size( } /* + * zstd_init_context_pool -- + * Initialize a given type of context pool. + */ +static int +zstd_init_context_pool( + ZSTD_COMPRESSOR *zcompressor, CONTEXT_TYPE ctx_type, int count, ZSTD_CONTEXT_POOL **context_poolp) +{ + WT_EXTENSION_API *wt_api; + ZSTD_CONTEXT *context; + ZSTD_CONTEXT_POOL *context_pool; + int i, ret; + + wt_api = zcompressor->wt_api; + + /* Allocate and initialize both the context pools. */ + if ((context_pool = calloc(1, sizeof(ZSTD_CONTEXT_POOL))) == NULL) + return (errno); + + if ((ret = wt_api->spin_init(wt_api, &(context_pool->list_lock), "zstd context")) != 0) { + (void)wt_api->err_printf( + wt_api, NULL, "zstd_init_context_pool: %s", wt_api->strerror(wt_api, NULL, ret)); + return (ret); + } + context_pool->count = 0; + context_pool->free_ctx_list = NULL; + + for (i = 0; i < count; i++) { + context = NULL; + if ((context = calloc(1, sizeof(ZSTD_CONTEXT))) == NULL) { + (void)wt_api->err_printf( + wt_api, NULL, "zstd_init_context_pool: context calloc failure"); + return (errno); + } + + if (ctx_type == CONTEXT_TYPE_COMPRESS) + context->ctx = (void *)ZSTD_createCCtx(); + else + context->ctx = (void *)ZSTD_createDCtx(); + + if (context->ctx == NULL) { + (void)wt_api->err_printf( + wt_api, NULL, "zstd_init_context_pool: context create failure"); + return (errno); + } + context->next = context_pool->free_ctx_list; + context_pool->free_ctx_list = context; + context_pool->count++; + } + + *context_poolp = context_pool; + return (0); +} + +/* + * zstd_terminate_context_pool -- + * Terminate the given context pool. + */ +static void +zstd_terminate_context_pool( + WT_COMPRESSOR *compressor, CONTEXT_TYPE context_type, ZSTD_CONTEXT_POOL **context_poolp) +{ + WT_EXTENSION_API *wt_api; + ZSTD_CONTEXT *context; + ZSTD_CONTEXT_POOL *context_pool; + int i; + + wt_api = ((ZSTD_COMPRESSOR *)compressor)->wt_api; + context_pool = *context_poolp; + + for (i = 0; i < context_pool->count; i++) { + context = context_pool->free_ctx_list; + context_pool->free_ctx_list = context->next; + if (context_type == CONTEXT_TYPE_COMPRESS) + ZSTD_freeCCtx((ZSTD_CCtx *)context->ctx); + else + ZSTD_freeDCtx((ZSTD_DCtx *)context->ctx); + free(context); + context = NULL; + } + + wt_api->spin_destroy(wt_api, &(context_pool->list_lock)); + context_pool->count = 0; + free(context_pool); + *context_poolp = NULL; + return; +} + +/* * zstd_terminate -- * WiredTiger Zstd compression termination. */ static int zstd_terminate(WT_COMPRESSOR *compressor, WT_SESSION *session) { - (void)session; /* Unused parameters */ + ZSTD_COMPRESSOR *zcompressor; + + zcompressor = (ZSTD_COMPRESSOR *)compressor; + + (void)session; /* Unused parameters. */ + zstd_terminate_context_pool(compressor, CONTEXT_TYPE_COMPRESS, &(zcompressor->cctx_pool)); + zstd_terminate_context_pool(compressor, CONTEXT_TYPE_DECOMPRESS, &(zcompressor->dctx_pool)); free(compressor); return (0); } @@ -219,7 +422,6 @@ zstd_init_config(WT_CONNECTION *connection, WT_CONFIG_ARG *config, int *compress /* If configured as a built-in, there's no configuration argument. */ if (config == NULL) return (0); - /* * Zstd compression engine allows applications to specify a compression level; review the * configuration. @@ -289,6 +491,11 @@ zstd_extension_init(WT_CONNECTION *connection, WT_CONFIG_ARG *config) zstd_compressor->compression_level = compression_level; + zstd_init_context_pool( + zstd_compressor, CONTEXT_TYPE_COMPRESS, CONTEXT_POOL_SIZE, &(zstd_compressor->cctx_pool)); + zstd_init_context_pool( + zstd_compressor, CONTEXT_TYPE_DECOMPRESS, CONTEXT_POOL_SIZE, &(zstd_compressor->dctx_pool)); + /* Load the compressor */ if ((ret = connection->add_compressor( connection, "zstd", (WT_COMPRESSOR *)zstd_compressor, NULL)) == 0) diff --git a/src/third_party/wiredtiger/import.data b/src/third_party/wiredtiger/import.data index 6948d09039f..6223b378883 100644 --- a/src/third_party/wiredtiger/import.data +++ b/src/third_party/wiredtiger/import.data @@ -2,5 +2,5 @@ "vendor": "wiredtiger", "github": "wiredtiger/wiredtiger.git", "branch": "mongodb-5.0", - "commit": "eb5afc287da81efd473f7db128935959060108f3" + "commit": "4fb3f7a835d51b47436bbc77694f51fb69280e5c" } diff --git a/src/third_party/wiredtiger/src/conn/conn_api.c b/src/third_party/wiredtiger/src/conn/conn_api.c index b35d198c3aa..531d217d1bb 100644 --- a/src/third_party/wiredtiger/src/conn/conn_api.c +++ b/src/third_party/wiredtiger/src/conn/conn_api.c @@ -824,6 +824,10 @@ __conn_get_extension_api(WT_CONNECTION *wt_conn) conn->extension_api.struct_pack = __wt_ext_struct_pack; conn->extension_api.struct_size = __wt_ext_struct_size; conn->extension_api.struct_unpack = __wt_ext_struct_unpack; + conn->extension_api.spin_init = __wt_ext_spin_init; + conn->extension_api.spin_lock = __wt_ext_spin_lock; + conn->extension_api.spin_unlock = __wt_ext_spin_unlock; + conn->extension_api.spin_destroy = __wt_ext_spin_destroy; conn->extension_api.transaction_id = __wt_ext_transaction_id; conn->extension_api.transaction_isolation_level = __wt_ext_transaction_isolation_level; conn->extension_api.transaction_notify = __wt_ext_transaction_notify; diff --git a/src/third_party/wiredtiger/src/include/extern.h b/src/third_party/wiredtiger/src/include/extern.h index fafb3d58f24..c640770ffc2 100644 --- a/src/third_party/wiredtiger/src/include/extern.h +++ b/src/third_party/wiredtiger/src/include/extern.h @@ -709,6 +709,8 @@ extern int __wt_ext_pack_str(WT_EXTENSION_API *wt_api, WT_PACK_STREAM *ps, const WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_ext_pack_uint(WT_EXTENSION_API *wt_api, WT_PACK_STREAM *ps, uint64_t u) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); +extern int __wt_ext_spin_init(WT_EXTENSION_API *wt_api, WT_EXTENSION_SPINLOCK *ext_spinlock, + const char *name) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_ext_struct_pack(WT_EXTENSION_API *wt_api, WT_SESSION *wt_session, void *buffer, size_t len, const char *fmt, ...) WT_GCC_FUNC_DECL_ATTRIBUTE((warn_unused_result)); extern int __wt_ext_struct_size(WT_EXTENSION_API *wt_api, WT_SESSION *wt_session, size_t *lenp, @@ -1729,6 +1731,11 @@ extern void __wt_evict_priority_clear(WT_SESSION_IMPL *session); extern void __wt_evict_priority_set(WT_SESSION_IMPL *session, uint64_t v); extern void __wt_evict_server_wake(WT_SESSION_IMPL *session); extern void __wt_ext_scr_free(WT_EXTENSION_API *wt_api, WT_SESSION *wt_session, void *p); +extern void __wt_ext_spin_destroy(WT_EXTENSION_API *wt_api, WT_EXTENSION_SPINLOCK *ext_spinlock); +extern void __wt_ext_spin_lock( + WT_EXTENSION_API *wt_api, WT_SESSION *session, WT_EXTENSION_SPINLOCK *ext_spinlock); +extern void __wt_ext_spin_unlock( + WT_EXTENSION_API *wt_api, WT_SESSION *session, WT_EXTENSION_SPINLOCK *ext_spinlock); extern void __wt_fill_hex( const uint8_t *src, size_t src_max, uint8_t *dest, size_t dest_max, size_t *lenp); extern void __wt_free_int(WT_SESSION_IMPL *session, const void *p_arg) diff --git a/src/third_party/wiredtiger/src/include/wiredtiger_ext.h b/src/third_party/wiredtiger/src/include/wiredtiger_ext.h index 0efdc3bfefc..22d798647d6 100644 --- a/src/third_party/wiredtiger/src/include/wiredtiger_ext.h +++ b/src/third_party/wiredtiger/src/include/wiredtiger_ext.h @@ -57,6 +57,15 @@ struct __wt_txn_notify { int (*notify)(WT_TXN_NOTIFY *notify, WT_SESSION *session, uint64_t txnid, int committed); }; +typedef struct __wt_extension_spinlock WT_EXTENSION_SPINLOCK; +/*! + * A placeholder data structure that allows for using the WiredTiger + * spinlock implementation from within extensions. + */ +struct __wt_extension_spinlock { + void *spinlock; /* Represents actual WiredTiger spinlock. */ +}; + /*! * Table of WiredTiger extension methods. * @@ -536,6 +545,49 @@ struct __wt_extension_api { * @copydoc wiredtiger_version */ const char *(*version)(int *majorp, int *minorp, int *patchp); + + /*! + * Initialize a spinlock + * + * @param wt_api the extension handle + * @param session the session handle + * @param spinlock the extension spinlock + * @param name the name for the spinlock + * + */ + int (*spin_init)(WT_EXTENSION_API *wt_api, WT_EXTENSION_SPINLOCK *spinlock, const char *name); + + /*! + * Destroy a spinlock + * + * @param wt_api the extension handle + * @param session the session handle + * @param spinlock the extension spinlock + * + */ + void (*spin_destroy)(WT_EXTENSION_API *wt_api, WT_EXTENSION_SPINLOCK *spinlock); + + /*! + * Spin until the lock is acquired. + * + * @param wt_api the extension handle + * @param session the session handle + * @param spinlock the extension spinlock + * + */ + void (*spin_lock)( + WT_EXTENSION_API *wt_api, WT_SESSION *session, WT_EXTENSION_SPINLOCK *spinlock); + + /*! + * Release the spinlock. + * + * @param wt_api the extension handle + * @param session the session handle + * @param spinlock the extension spinlock + * + */ + void (*spin_unlock)( + WT_EXTENSION_API *wt_api, WT_SESSION *session, WT_EXTENSION_SPINLOCK *spinlock); }; /*! diff --git a/src/third_party/wiredtiger/src/support/lock_ext.c b/src/third_party/wiredtiger/src/support/lock_ext.c new file mode 100644 index 00000000000..38d338892c1 --- /dev/null +++ b/src/third_party/wiredtiger/src/support/lock_ext.c @@ -0,0 +1,84 @@ +/*- + * Copyright (c) 2014-present MongoDB, Inc. + * Copyright (c) 2008-2014 WiredTiger, Inc. + * All rights reserved. + * + * See the file LICENSE for redistribution information. + */ + +#include "wt_internal.h" + +/* + * __wt_ext_spin_init -- + * Allocate and initialize a spinlock. + */ +int +__wt_ext_spin_init(WT_EXTENSION_API *wt_api, WT_EXTENSION_SPINLOCK *ext_spinlock, const char *name) +{ + WT_DECL_RET; + WT_SESSION_IMPL *default_session; + WT_SPINLOCK *lock; + + ext_spinlock->spinlock = NULL; + default_session = ((WT_CONNECTION_IMPL *)wt_api->conn)->default_session; + if ((ret = __wt_calloc_one(default_session, &lock)) != 0) + return ret; + if ((ret = __wt_spin_init(default_session, lock, name)) != 0) { + __wt_free(default_session, lock); + return ret; + } + ext_spinlock->spinlock = lock; + return (0); +} + +/* + * __wt_ext_spin_lock -- + * Lock the spinlock. + */ +void +__wt_ext_spin_lock( + WT_EXTENSION_API *wt_api, WT_SESSION *session, WT_EXTENSION_SPINLOCK *ext_spinlock) +{ + WT_SPINLOCK *lock; + + WT_UNUSED(wt_api); /* Unused parameters */ + lock = ((WT_SPINLOCK *)ext_spinlock->spinlock); + __wt_spin_lock((WT_SESSION_IMPL *)session, lock); + return; +} + +/* + * __wt_ext_spin_unlock -- + * Unlock the spinlock. + */ +void +__wt_ext_spin_unlock( + WT_EXTENSION_API *wt_api, WT_SESSION *session, WT_EXTENSION_SPINLOCK *ext_spinlock) +{ + WT_SPINLOCK *lock; + + WT_UNUSED(wt_api); /* Unused parameters */ + lock = ((WT_SPINLOCK *)ext_spinlock->spinlock); + __wt_spin_unlock((WT_SESSION_IMPL *)session, lock); + return; +} + +/* + * __wt_ext_spin_destroy -- + * Destroy the spinlock. + */ +void +__wt_ext_spin_destroy(WT_EXTENSION_API *wt_api, WT_EXTENSION_SPINLOCK *ext_spinlock) +{ + WT_SESSION_IMPL *default_session; + WT_SPINLOCK *lock; + + lock = ((WT_SPINLOCK *)ext_spinlock->spinlock); + + /* Default session is used to comply with the lock initialization. */ + default_session = ((WT_CONNECTION_IMPL *)wt_api->conn)->default_session; + __wt_spin_destroy(default_session, lock); + __wt_free(default_session, lock); + ext_spinlock->spinlock = NULL; + return; +} |