diff options
author | David Hows <howsdav@gmail.com> | 2016-01-19 17:43:44 +1100 |
---|---|---|
committer | David Hows <howsdav@gmail.com> | 2016-01-20 09:24:13 +1100 |
commit | 1743180fcf473f811ea9b76221ef3f1d91c8ac3a (patch) | |
tree | 6d1c180dd5551f839797df8cd790f8a98f9811cb | |
parent | 3223f52ef1da3f1029d2b6c7858cc4db4b6d7a48 (diff) | |
download | mongo-1743180fcf473f811ea9b76221ef3f1d91c8ac3a.tar.gz |
WT-2267 - WTPERF Throttle implementation
-rw-r--r-- | SConstruct | 1 | ||||
-rw-r--r-- | bench/wtperf/Makefile.am | 3 | ||||
-rw-r--r-- | bench/wtperf/config.c | 3 | ||||
-rw-r--r-- | bench/wtperf/wtperf.c | 55 | ||||
-rw-r--r-- | bench/wtperf/wtperf.h | 25 | ||||
-rw-r--r-- | bench/wtperf/wtperf_throttle.c | 119 | ||||
-rw-r--r-- | bench/wtperf/wtperf_truncate.c | 2 |
7 files changed, 150 insertions, 58 deletions
diff --git a/SConstruct b/SConstruct index 6a2b0497d15..3d15c96a225 100644 --- a/SConstruct +++ b/SConstruct @@ -458,6 +458,7 @@ t = env.Program("wtperf", [ "bench/wtperf/misc.c", "bench/wtperf/track.c", "bench/wtperf/wtperf.c", + "bench/wtperf/wtperf_throttle.c", "bench/wtperf/wtperf_truncate.c", ], LIBS=[wtlib, shim] + wtlibs) diff --git a/bench/wtperf/Makefile.am b/bench/wtperf/Makefile.am index 15f151d84b2..c8b98a3c37b 100644 --- a/bench/wtperf/Makefile.am +++ b/bench/wtperf/Makefile.am @@ -5,7 +5,8 @@ LDADD = $(top_builddir)/libwiredtiger.la -lm noinst_PROGRAMS = wtperf wtperf_LDFLAGS = -static wtperf_SOURCES =\ - config.c misc.c track.c wtperf.c wtperf_truncate.c wtperf.h wtperf_opt.i + config.c misc.c track.c wtperf.c wtperf.h \ + wtperf_opt.i wtperf_throttle.c wtperf_truncate.c TESTS = smoke.sh AM_TESTS_ENVIRONMENT = rm -rf WT_TEST ; mkdir WT_TEST ; diff --git a/bench/wtperf/config.c b/bench/wtperf/config.c index a4c3ebe441c..8b2b1a6cad8 100644 --- a/bench/wtperf/config.c +++ b/bench/wtperf/config.c @@ -237,8 +237,7 @@ config_threads(CONFIG *cfg, const char *config, size_t len) continue; } if (STRING_MATCH("throttle", k.str, k.len)) { - if ((workp->throttle = v.val) < 0) - goto err; + workp->throttle = v.val; continue; } if (STRING_MATCH("insert", k.str, k.len) || diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c index 9beef0509df..7adf462309e 100644 --- a/bench/wtperf/wtperf.c +++ b/bench/wtperf/wtperf.c @@ -86,7 +86,7 @@ static int start_threads(CONFIG *, static int stop_threads(CONFIG *, u_int, CONFIG_THREAD *); static void *thread_run_wtperf(void *); static void *worker(void *); -static void worker_throttle(int64_t, int64_t *, struct timespec *); + static uint64_t wtperf_rand(CONFIG_THREAD *); static uint64_t wtperf_value_range(CONFIG *); @@ -421,7 +421,7 @@ do_range_reads(CONFIG *cfg, WT_CURSOR *cursor) static void * worker(void *arg) { - struct timespec start, stop, interval; + struct timespec start, stop; CONFIG *cfg; CONFIG_THREAD *thread; TRACK *trk; @@ -429,7 +429,7 @@ worker(void *arg) WT_CURSOR **cursors, *cursor, *tmp_cursor; WT_SESSION *session; size_t i; - int64_t ops, ops_per_txn, throttle_ops; + int64_t ops, ops_per_txn; uint64_t next_val, usecs; uint8_t *op, *op_end; int measure_latency, ret, truncated; @@ -444,7 +444,6 @@ worker(void *arg) ops_per_txn = thread->workload->ops_per_txn; session = NULL; trk = NULL; - throttle_ops = 0; if ((ret = conn->open_session( conn, NULL, cfg->sess_config, &session)) != 0) { @@ -477,10 +476,8 @@ worker(void *arg) } /* Setup the timer for throttling. */ if (thread->workload->throttle != 0 && - (ret = __wt_epoch(NULL, &interval)) != 0) { - lprintf(cfg, ret, 0, "Get time call failed"); + (ret = setup_throttle(thread)) != 0) goto err; - } /* Setup for truncate */ if (thread->workload->truncate != 0) @@ -730,13 +727,11 @@ op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) { op = thread->workload->ops; /* - * Check throttling periodically to avoid taking too - * many time samples. + * Decrement throttle tickets and check if needed check if we + * should sleep and then get more tickets to perform more work. */ - if (thread->workload->throttle != 0 && - throttle_ops++ % THROTTLE_OPS == 0) - worker_throttle(thread->workload->throttle, - &throttle_ops, &interval); + if (--thread->throttle_cfg.ops_count == 0) + worker_throttle(thread); } if ((ret = session->close(session, NULL)) != 0) { @@ -2402,40 +2397,6 @@ stop_threads(CONFIG *cfg, u_int num, CONFIG_THREAD *threads) return (0); } -/* - * TODO: Spread the stalls out, so we don't flood at the start of each - * second and then pause. Doing this every 10th of a second is probably enough - */ -static void -worker_throttle(int64_t throttle, int64_t *ops, struct timespec *interval) -{ - struct timespec now; - uint64_t usecs_to_complete; - if (*ops < throttle) - return; - - /* Ignore errors, we don't really care. */ - if (__wt_epoch(NULL, &now) != 0) - return; - - /* - * If we've completed enough operations, reset the counters. - * If we did enough operations in less than a second, sleep for - * the rest of the second. - */ - usecs_to_complete = WT_TIMEDIFF_US(now, *interval); - if (usecs_to_complete < USEC_PER_SEC) - (void)usleep((useconds_t)(USEC_PER_SEC - usecs_to_complete)); - - /* - * After sleeping, set the interval to the current time. - */ - if (__wt_epoch(NULL, &now) != 0) - return; - *ops = 0; - *interval = now; -} - static int drop_all_tables(CONFIG *cfg) { diff --git a/bench/wtperf/wtperf.h b/bench/wtperf/wtperf.h index 99450a99961..989fd84135d 100644 --- a/bench/wtperf/wtperf.h +++ b/bench/wtperf/wtperf.h @@ -66,6 +66,7 @@ typedef struct __config CONFIG; typedef struct __config_thread CONFIG_THREAD; +typedef struct __truncate_queue_entry TRUNCATE_QUEUE_ENTRY; #define EXT_PFX ",extensions=(" #define EXT_SFX ")" @@ -90,10 +91,10 @@ typedef struct { int64_t insert; /* Insert ratio */ int64_t read; /* Read ratio */ int64_t update; /* Update ratio */ - int64_t throttle; /* Maximum operations/second */ /* Number of operations per transaction. Zero for autocommit */ int64_t ops_per_txn; int64_t truncate; /* Truncate ratio */ + uint64_t throttle; /* Maximum operations/second */ uint64_t truncate_pct; /* Truncate Percent */ uint64_t truncate_count; /* Truncate Count */ @@ -106,8 +107,7 @@ typedef struct { } WORKLOAD; /* Steering items for the truncate workload */ -typedef struct __truncate_struct TRUNCATE_CONFIG; -struct __truncate_struct { +typedef struct { uint64_t stone_gap; uint64_t needed_stones; uint64_t final_stone_gap; @@ -117,7 +117,7 @@ struct __truncate_struct { uint64_t num_stones; uint64_t last_key; uint64_t catchup_multiplier; -}; +} TRUNCATE_CONFIG; /* Queue entry for use with the Truncate Logic */ struct __truncate_queue_entry { @@ -125,7 +125,6 @@ struct __truncate_queue_entry { uint64_t diff; /* Number of items to be truncated*/ TAILQ_ENTRY(__truncate_queue_entry) q; }; -typedef struct __truncate_queue_entry TRUNCATE_QUEUE_ENTRY; struct __config_queue_entry { char *string; @@ -133,6 +132,14 @@ struct __config_queue_entry { }; typedef struct __config_queue_entry CONFIG_QUEUE_ENTRY; +/* Steering for the throttle configuration */ +typedef struct { + struct timespec last_increment; /* Time that we last added more ops */ + uint64_t ops_count; /* The number of ops this increment */ + uint64_t ops_per_increment; /* Ops to add per increment */ + uint64_t usecs_increment; /* Time interval of each increment */ +} THROTTLE_CONFIG; + #define LOG_PARTIAL_CONFIG ",log=(enabled=false)" /* * NOTE: If you add any fields to this structure here, you must also add @@ -264,14 +271,16 @@ struct __config_thread { /* Per-thread structure */ WORKLOAD *workload; /* Workload */ + THROTTLE_CONFIG throttle_cfg; /* Throttle configuration */ + + TRUNCATE_CONFIG trunc_cfg; /* Truncate configuration */ + TRACK ckpt; /* Checkpoint operations */ TRACK insert; /* Insert operations */ TRACK read; /* Read operations */ TRACK update; /* Update operations */ TRACK truncate; /* Truncate operations */ TRACK truncate_sleep; /* Truncate sleep operations */ - TRUNCATE_CONFIG trunc_cfg; /* Truncate configuration */ - }; void cleanup_truncate_config(CONFIG *); @@ -293,6 +302,7 @@ int run_truncate( CONFIG *, CONFIG_THREAD *, WT_CURSOR *, WT_SESSION *, int *); int setup_log_file(CONFIG *); int setup_truncate(CONFIG *, CONFIG_THREAD *, WT_SESSION *); +int setup_throttle(CONFIG_THREAD*); uint64_t sum_ckpt_ops(CONFIG *); uint64_t sum_insert_ops(CONFIG *); uint64_t sum_pop_ops(CONFIG *); @@ -300,6 +310,7 @@ uint64_t sum_read_ops(CONFIG *); uint64_t sum_truncate_ops(CONFIG *); uint64_t sum_update_ops(CONFIG *); void usage(void); +int worker_throttle(CONFIG_THREAD*); void lprintf(const CONFIG *, int err, uint32_t, const char *, ...) #if defined(__GNUC__) diff --git a/bench/wtperf/wtperf_throttle.c b/bench/wtperf/wtperf_throttle.c new file mode 100644 index 00000000000..6c11b214521 --- /dev/null +++ b/bench/wtperf/wtperf_throttle.c @@ -0,0 +1,119 @@ +/*- + * Public Domain 2014-2015 MongoDB, Inc. + * Public Domain 2008-2014 WiredTiger, Inc. + * + * This is free and unencumbered software released into the public domain. + * + * Anyone is free to copy, modify, publish, use, compile, sell, or + * distribute this software, either in source code form or as a compiled + * binary, for any purpose, commercial or non-commercial, and by any + * means. + * + * In jurisdictions that recognize copyright laws, the author or authors + * of this software dedicate any and all copyright interest in the + * software to the public domain. We make this dedication for the benefit + * of the public at large and to the detriment of our heirs and + * successors. We intend this dedication to be an overt act of + * relinquishment in perpetuity of all present and future rights to this + * software under copyright law. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR + * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, + * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR + * OTHER DEALINGS IN THE SOFTWARE. + */ + +#include "wtperf.h" + +/* + * Put the initial config together for running a throttled workload. + */ +int +setup_throttle(CONFIG_THREAD *thread) +{ + THROTTLE_CONFIG *throttle_cfg; + + throttle_cfg = &thread->throttle_cfg; + + /* + * Setup how the number of operations to run each interval in order to + * meet our desired max throughput. + * - If we have a very small number of then we can look to do one op + * on a larger increment. Given there is overhead in throttle logic + * we want to avoid running the throttle check regularly. + * - For most workloads, we aim to do 100 ops per interval and adjust + * the sleep period accordingly. + * - For high throughput workloads, we aim to do many ops in 100us + * increments. + */ + + if (thread->workload->throttle < THROTTLE_OPS) { + /* If the interval is very small, we do laps of 1 */ + throttle_cfg->usecs_increment = + USEC_PER_SEC / thread->workload->throttle; + throttle_cfg->ops_per_increment = 1; + } else if (thread->workload->throttle < USEC_PER_SEC / THROTTLE_OPS) { + throttle_cfg->usecs_increment = + USEC_PER_SEC / thread->workload->throttle * THROTTLE_OPS; + throttle_cfg->ops_per_increment = THROTTLE_OPS; + } else { + /* If the interval is large, we do more ops per interval */ + throttle_cfg->usecs_increment = USEC_PER_SEC / THROTTLE_OPS; + throttle_cfg->ops_per_increment = + thread->workload->throttle / THROTTLE_OPS; + } + + /* Give the queue some initial tickets to work with */ + throttle_cfg->ops_count = throttle_cfg->ops_per_increment; + + /* Set the first timestamp of when we incremented */ + WT_RET(__wt_epoch(NULL, &throttle_cfg->last_increment)); + return (0); +} + +/* + * Run the throttle function. Will sleep if needed and then reload the counter + * to perform more operations. + */ +int +worker_throttle(CONFIG_THREAD *thread) +{ + THROTTLE_CONFIG *throttle_cfg; + struct timespec now; + uint64_t usecs_delta; + + throttle_cfg = &thread->throttle_cfg; + + WT_RET(__wt_epoch(NULL, &now)); + + /* + * If we did enough operations in the current interval, sleep for + * the rest of the interval. Then add more tickets to the queue. + */ + usecs_delta = WT_TIMEDIFF_US(now, throttle_cfg->last_increment); + if (usecs_delta < throttle_cfg->usecs_increment) { + (void)usleep( + (useconds_t)(throttle_cfg->usecs_increment - usecs_delta)); + throttle_cfg->ops_count = + throttle_cfg->ops_per_increment; + /* + * After sleeping, set the interval to the current time. + */ + WT_RET(__wt_epoch(NULL, &throttle_cfg->last_increment)); + } else { + throttle_cfg->ops_count = + (uint64_t) (float)(usecs_delta / + throttle_cfg->usecs_increment) * + (float)throttle_cfg->ops_per_increment; + throttle_cfg->last_increment = now; + } + + /* Don't over-fill the queue */ + throttle_cfg->ops_count = + WT_MIN(throttle_cfg->ops_count, thread->workload->throttle); + + return (0); +} diff --git a/bench/wtperf/wtperf_truncate.c b/bench/wtperf/wtperf_truncate.c index 0ff80749830..11b09c60d5d 100644 --- a/bench/wtperf/wtperf_truncate.c +++ b/bench/wtperf/wtperf_truncate.c @@ -109,7 +109,7 @@ setup_truncate(CONFIG *cfg, CONFIG_THREAD *thread, WT_SESSION *session) { cfg, truncate_item->key, trunc_cfg->stone_gap * i); truncate_item->diff = (trunc_cfg->stone_gap * i) - trunc_cfg->last_key; - TAILQ_INSERT_TAIL( &cfg->stone_head, truncate_item, q); + TAILQ_INSERT_TAIL(&cfg->stone_head, truncate_item, q); trunc_cfg->last_key = trunc_cfg->stone_gap * i; trunc_cfg->num_stones++; } |