summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Hows <howsdav@gmail.com>2016-01-19 17:43:44 +1100
committerDavid Hows <howsdav@gmail.com>2016-01-20 09:24:13 +1100
commit1743180fcf473f811ea9b76221ef3f1d91c8ac3a (patch)
tree6d1c180dd5551f839797df8cd790f8a98f9811cb
parent3223f52ef1da3f1029d2b6c7858cc4db4b6d7a48 (diff)
downloadmongo-1743180fcf473f811ea9b76221ef3f1d91c8ac3a.tar.gz
WT-2267 - WTPERF Throttle implementation
-rw-r--r--SConstruct1
-rw-r--r--bench/wtperf/Makefile.am3
-rw-r--r--bench/wtperf/config.c3
-rw-r--r--bench/wtperf/wtperf.c55
-rw-r--r--bench/wtperf/wtperf.h25
-rw-r--r--bench/wtperf/wtperf_throttle.c119
-rw-r--r--bench/wtperf/wtperf_truncate.c2
7 files changed, 150 insertions, 58 deletions
diff --git a/SConstruct b/SConstruct
index 6a2b0497d15..3d15c96a225 100644
--- a/SConstruct
+++ b/SConstruct
@@ -458,6 +458,7 @@ t = env.Program("wtperf", [
"bench/wtperf/misc.c",
"bench/wtperf/track.c",
"bench/wtperf/wtperf.c",
+ "bench/wtperf/wtperf_throttle.c",
"bench/wtperf/wtperf_truncate.c",
],
LIBS=[wtlib, shim] + wtlibs)
diff --git a/bench/wtperf/Makefile.am b/bench/wtperf/Makefile.am
index 15f151d84b2..c8b98a3c37b 100644
--- a/bench/wtperf/Makefile.am
+++ b/bench/wtperf/Makefile.am
@@ -5,7 +5,8 @@ LDADD = $(top_builddir)/libwiredtiger.la -lm
noinst_PROGRAMS = wtperf
wtperf_LDFLAGS = -static
wtperf_SOURCES =\
- config.c misc.c track.c wtperf.c wtperf_truncate.c wtperf.h wtperf_opt.i
+ config.c misc.c track.c wtperf.c wtperf.h \
+ wtperf_opt.i wtperf_throttle.c wtperf_truncate.c
TESTS = smoke.sh
AM_TESTS_ENVIRONMENT = rm -rf WT_TEST ; mkdir WT_TEST ;
diff --git a/bench/wtperf/config.c b/bench/wtperf/config.c
index a4c3ebe441c..8b2b1a6cad8 100644
--- a/bench/wtperf/config.c
+++ b/bench/wtperf/config.c
@@ -237,8 +237,7 @@ config_threads(CONFIG *cfg, const char *config, size_t len)
continue;
}
if (STRING_MATCH("throttle", k.str, k.len)) {
- if ((workp->throttle = v.val) < 0)
- goto err;
+ workp->throttle = v.val;
continue;
}
if (STRING_MATCH("insert", k.str, k.len) ||
diff --git a/bench/wtperf/wtperf.c b/bench/wtperf/wtperf.c
index 9beef0509df..7adf462309e 100644
--- a/bench/wtperf/wtperf.c
+++ b/bench/wtperf/wtperf.c
@@ -86,7 +86,7 @@ static int start_threads(CONFIG *,
static int stop_threads(CONFIG *, u_int, CONFIG_THREAD *);
static void *thread_run_wtperf(void *);
static void *worker(void *);
-static void worker_throttle(int64_t, int64_t *, struct timespec *);
+
static uint64_t wtperf_rand(CONFIG_THREAD *);
static uint64_t wtperf_value_range(CONFIG *);
@@ -421,7 +421,7 @@ do_range_reads(CONFIG *cfg, WT_CURSOR *cursor)
static void *
worker(void *arg)
{
- struct timespec start, stop, interval;
+ struct timespec start, stop;
CONFIG *cfg;
CONFIG_THREAD *thread;
TRACK *trk;
@@ -429,7 +429,7 @@ worker(void *arg)
WT_CURSOR **cursors, *cursor, *tmp_cursor;
WT_SESSION *session;
size_t i;
- int64_t ops, ops_per_txn, throttle_ops;
+ int64_t ops, ops_per_txn;
uint64_t next_val, usecs;
uint8_t *op, *op_end;
int measure_latency, ret, truncated;
@@ -444,7 +444,6 @@ worker(void *arg)
ops_per_txn = thread->workload->ops_per_txn;
session = NULL;
trk = NULL;
- throttle_ops = 0;
if ((ret = conn->open_session(
conn, NULL, cfg->sess_config, &session)) != 0) {
@@ -477,10 +476,8 @@ worker(void *arg)
}
/* Setup the timer for throttling. */
if (thread->workload->throttle != 0 &&
- (ret = __wt_epoch(NULL, &interval)) != 0) {
- lprintf(cfg, ret, 0, "Get time call failed");
+ (ret = setup_throttle(thread)) != 0)
goto err;
- }
/* Setup for truncate */
if (thread->workload->truncate != 0)
@@ -730,13 +727,11 @@ op_err: if (ret == WT_ROLLBACK && ops_per_txn != 0) {
op = thread->workload->ops;
/*
- * Check throttling periodically to avoid taking too
- * many time samples.
+ * Decrement throttle tickets and check if needed check if we
+ * should sleep and then get more tickets to perform more work.
*/
- if (thread->workload->throttle != 0 &&
- throttle_ops++ % THROTTLE_OPS == 0)
- worker_throttle(thread->workload->throttle,
- &throttle_ops, &interval);
+ if (--thread->throttle_cfg.ops_count == 0)
+ worker_throttle(thread);
}
if ((ret = session->close(session, NULL)) != 0) {
@@ -2402,40 +2397,6 @@ stop_threads(CONFIG *cfg, u_int num, CONFIG_THREAD *threads)
return (0);
}
-/*
- * TODO: Spread the stalls out, so we don't flood at the start of each
- * second and then pause. Doing this every 10th of a second is probably enough
- */
-static void
-worker_throttle(int64_t throttle, int64_t *ops, struct timespec *interval)
-{
- struct timespec now;
- uint64_t usecs_to_complete;
- if (*ops < throttle)
- return;
-
- /* Ignore errors, we don't really care. */
- if (__wt_epoch(NULL, &now) != 0)
- return;
-
- /*
- * If we've completed enough operations, reset the counters.
- * If we did enough operations in less than a second, sleep for
- * the rest of the second.
- */
- usecs_to_complete = WT_TIMEDIFF_US(now, *interval);
- if (usecs_to_complete < USEC_PER_SEC)
- (void)usleep((useconds_t)(USEC_PER_SEC - usecs_to_complete));
-
- /*
- * After sleeping, set the interval to the current time.
- */
- if (__wt_epoch(NULL, &now) != 0)
- return;
- *ops = 0;
- *interval = now;
-}
-
static int
drop_all_tables(CONFIG *cfg)
{
diff --git a/bench/wtperf/wtperf.h b/bench/wtperf/wtperf.h
index 99450a99961..989fd84135d 100644
--- a/bench/wtperf/wtperf.h
+++ b/bench/wtperf/wtperf.h
@@ -66,6 +66,7 @@
typedef struct __config CONFIG;
typedef struct __config_thread CONFIG_THREAD;
+typedef struct __truncate_queue_entry TRUNCATE_QUEUE_ENTRY;
#define EXT_PFX ",extensions=("
#define EXT_SFX ")"
@@ -90,10 +91,10 @@ typedef struct {
int64_t insert; /* Insert ratio */
int64_t read; /* Read ratio */
int64_t update; /* Update ratio */
- int64_t throttle; /* Maximum operations/second */
/* Number of operations per transaction. Zero for autocommit */
int64_t ops_per_txn;
int64_t truncate; /* Truncate ratio */
+ uint64_t throttle; /* Maximum operations/second */
uint64_t truncate_pct; /* Truncate Percent */
uint64_t truncate_count; /* Truncate Count */
@@ -106,8 +107,7 @@ typedef struct {
} WORKLOAD;
/* Steering items for the truncate workload */
-typedef struct __truncate_struct TRUNCATE_CONFIG;
-struct __truncate_struct {
+typedef struct {
uint64_t stone_gap;
uint64_t needed_stones;
uint64_t final_stone_gap;
@@ -117,7 +117,7 @@ struct __truncate_struct {
uint64_t num_stones;
uint64_t last_key;
uint64_t catchup_multiplier;
-};
+} TRUNCATE_CONFIG;
/* Queue entry for use with the Truncate Logic */
struct __truncate_queue_entry {
@@ -125,7 +125,6 @@ struct __truncate_queue_entry {
uint64_t diff; /* Number of items to be truncated*/
TAILQ_ENTRY(__truncate_queue_entry) q;
};
-typedef struct __truncate_queue_entry TRUNCATE_QUEUE_ENTRY;
struct __config_queue_entry {
char *string;
@@ -133,6 +132,14 @@ struct __config_queue_entry {
};
typedef struct __config_queue_entry CONFIG_QUEUE_ENTRY;
+/* Steering for the throttle configuration */
+typedef struct {
+ struct timespec last_increment; /* Time that we last added more ops */
+ uint64_t ops_count; /* The number of ops this increment */
+ uint64_t ops_per_increment; /* Ops to add per increment */
+ uint64_t usecs_increment; /* Time interval of each increment */
+} THROTTLE_CONFIG;
+
#define LOG_PARTIAL_CONFIG ",log=(enabled=false)"
/*
* NOTE: If you add any fields to this structure here, you must also add
@@ -264,14 +271,16 @@ struct __config_thread { /* Per-thread structure */
WORKLOAD *workload; /* Workload */
+ THROTTLE_CONFIG throttle_cfg; /* Throttle configuration */
+
+ TRUNCATE_CONFIG trunc_cfg; /* Truncate configuration */
+
TRACK ckpt; /* Checkpoint operations */
TRACK insert; /* Insert operations */
TRACK read; /* Read operations */
TRACK update; /* Update operations */
TRACK truncate; /* Truncate operations */
TRACK truncate_sleep; /* Truncate sleep operations */
- TRUNCATE_CONFIG trunc_cfg; /* Truncate configuration */
-
};
void cleanup_truncate_config(CONFIG *);
@@ -293,6 +302,7 @@ int run_truncate(
CONFIG *, CONFIG_THREAD *, WT_CURSOR *, WT_SESSION *, int *);
int setup_log_file(CONFIG *);
int setup_truncate(CONFIG *, CONFIG_THREAD *, WT_SESSION *);
+int setup_throttle(CONFIG_THREAD*);
uint64_t sum_ckpt_ops(CONFIG *);
uint64_t sum_insert_ops(CONFIG *);
uint64_t sum_pop_ops(CONFIG *);
@@ -300,6 +310,7 @@ uint64_t sum_read_ops(CONFIG *);
uint64_t sum_truncate_ops(CONFIG *);
uint64_t sum_update_ops(CONFIG *);
void usage(void);
+int worker_throttle(CONFIG_THREAD*);
void lprintf(const CONFIG *, int err, uint32_t, const char *, ...)
#if defined(__GNUC__)
diff --git a/bench/wtperf/wtperf_throttle.c b/bench/wtperf/wtperf_throttle.c
new file mode 100644
index 00000000000..6c11b214521
--- /dev/null
+++ b/bench/wtperf/wtperf_throttle.c
@@ -0,0 +1,119 @@
+/*-
+ * Public Domain 2014-2015 MongoDB, Inc.
+ * Public Domain 2008-2014 WiredTiger, Inc.
+ *
+ * This is free and unencumbered software released into the public domain.
+ *
+ * Anyone is free to copy, modify, publish, use, compile, sell, or
+ * distribute this software, either in source code form or as a compiled
+ * binary, for any purpose, commercial or non-commercial, and by any
+ * means.
+ *
+ * In jurisdictions that recognize copyright laws, the author or authors
+ * of this software dedicate any and all copyright interest in the
+ * software to the public domain. We make this dedication for the benefit
+ * of the public at large and to the detriment of our heirs and
+ * successors. We intend this dedication to be an overt act of
+ * relinquishment in perpetuity of all present and future rights to this
+ * software under copyright law.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
+ * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
+ * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
+ * OTHER DEALINGS IN THE SOFTWARE.
+ */
+
+#include "wtperf.h"
+
+/*
+ * Put the initial config together for running a throttled workload.
+ */
+int
+setup_throttle(CONFIG_THREAD *thread)
+{
+ THROTTLE_CONFIG *throttle_cfg;
+
+ throttle_cfg = &thread->throttle_cfg;
+
+ /*
+ * Setup how the number of operations to run each interval in order to
+ * meet our desired max throughput.
+ * - If we have a very small number of then we can look to do one op
+ * on a larger increment. Given there is overhead in throttle logic
+ * we want to avoid running the throttle check regularly.
+ * - For most workloads, we aim to do 100 ops per interval and adjust
+ * the sleep period accordingly.
+ * - For high throughput workloads, we aim to do many ops in 100us
+ * increments.
+ */
+
+ if (thread->workload->throttle < THROTTLE_OPS) {
+ /* If the interval is very small, we do laps of 1 */
+ throttle_cfg->usecs_increment =
+ USEC_PER_SEC / thread->workload->throttle;
+ throttle_cfg->ops_per_increment = 1;
+ } else if (thread->workload->throttle < USEC_PER_SEC / THROTTLE_OPS) {
+ throttle_cfg->usecs_increment =
+ USEC_PER_SEC / thread->workload->throttle * THROTTLE_OPS;
+ throttle_cfg->ops_per_increment = THROTTLE_OPS;
+ } else {
+ /* If the interval is large, we do more ops per interval */
+ throttle_cfg->usecs_increment = USEC_PER_SEC / THROTTLE_OPS;
+ throttle_cfg->ops_per_increment =
+ thread->workload->throttle / THROTTLE_OPS;
+ }
+
+ /* Give the queue some initial tickets to work with */
+ throttle_cfg->ops_count = throttle_cfg->ops_per_increment;
+
+ /* Set the first timestamp of when we incremented */
+ WT_RET(__wt_epoch(NULL, &throttle_cfg->last_increment));
+ return (0);
+}
+
+/*
+ * Run the throttle function. Will sleep if needed and then reload the counter
+ * to perform more operations.
+ */
+int
+worker_throttle(CONFIG_THREAD *thread)
+{
+ THROTTLE_CONFIG *throttle_cfg;
+ struct timespec now;
+ uint64_t usecs_delta;
+
+ throttle_cfg = &thread->throttle_cfg;
+
+ WT_RET(__wt_epoch(NULL, &now));
+
+ /*
+ * If we did enough operations in the current interval, sleep for
+ * the rest of the interval. Then add more tickets to the queue.
+ */
+ usecs_delta = WT_TIMEDIFF_US(now, throttle_cfg->last_increment);
+ if (usecs_delta < throttle_cfg->usecs_increment) {
+ (void)usleep(
+ (useconds_t)(throttle_cfg->usecs_increment - usecs_delta));
+ throttle_cfg->ops_count =
+ throttle_cfg->ops_per_increment;
+ /*
+ * After sleeping, set the interval to the current time.
+ */
+ WT_RET(__wt_epoch(NULL, &throttle_cfg->last_increment));
+ } else {
+ throttle_cfg->ops_count =
+ (uint64_t) (float)(usecs_delta /
+ throttle_cfg->usecs_increment) *
+ (float)throttle_cfg->ops_per_increment;
+ throttle_cfg->last_increment = now;
+ }
+
+ /* Don't over-fill the queue */
+ throttle_cfg->ops_count =
+ WT_MIN(throttle_cfg->ops_count, thread->workload->throttle);
+
+ return (0);
+}
diff --git a/bench/wtperf/wtperf_truncate.c b/bench/wtperf/wtperf_truncate.c
index 0ff80749830..11b09c60d5d 100644
--- a/bench/wtperf/wtperf_truncate.c
+++ b/bench/wtperf/wtperf_truncate.c
@@ -109,7 +109,7 @@ setup_truncate(CONFIG *cfg, CONFIG_THREAD *thread, WT_SESSION *session) {
cfg, truncate_item->key, trunc_cfg->stone_gap * i);
truncate_item->diff =
(trunc_cfg->stone_gap * i) - trunc_cfg->last_key;
- TAILQ_INSERT_TAIL( &cfg->stone_head, truncate_item, q);
+ TAILQ_INSERT_TAIL(&cfg->stone_head, truncate_item, q);
trunc_cfg->last_key = trunc_cfg->stone_gap * i;
trunc_cfg->num_stones++;
}