summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/bench/wtperf/wtperf.h
blob: ff3b937a6518adff3ac5563464d5d579ec45f8ed (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
/*-
 * Public Domain 2014-present MongoDB, Inc.
 * Public Domain 2008-2014 WiredTiger, Inc.
 *
 * This is free and unencumbered software released into the public domain.
 *
 * Anyone is free to copy, modify, publish, use, compile, sell, or
 * distribute this software, either in source code form or as a compiled
 * binary, for any purpose, commercial or non-commercial, and by any
 * means.
 *
 * In jurisdictions that recognize copyright laws, the author or authors
 * of this software dedicate any and all copyright interest in the
 * software to the public domain. We make this dedication for the benefit
 * of the public at large and to the detriment of our heirs and
 * successors. We intend this dedication to be an overt act of
 * relinquishment in perpetuity of all present and future rights to this
 * software under copyright law.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 * IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
 * OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
 * ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
 * OTHER DEALINGS IN THE SOFTWARE.
 */

#ifndef HAVE_WTPERF_H
#define HAVE_WTPERF_H

#include "test_util.h"

#include <math.h>

#include "config_opt.h"

typedef struct __wtperf WTPERF;
typedef struct __wtperf_thread WTPERF_THREAD;
typedef struct __truncate_queue_entry TRUNCATE_QUEUE_ENTRY;

#define EXT_PFX ",extensions=("
#define EXT_SFX ")"
#define EXTPATH "../../ext/" /* Extensions path */
#define BLKCMP_PFX "block_compressor="

/* Compressor Extensions */
#undef LZ4_PATH
#define LZ4_PATH "compressors/lz4/libwiredtiger_lz4.so"
#undef SNAPPY_PATH
#define SNAPPY_PATH "compressors/snappy/libwiredtiger_snappy.so"
#undef ZLIB_PATH
#define ZLIB_PATH "compressors/zlib/libwiredtiger_zlib.so"
#undef ZSTD_PATH
#define ZSTD_PATH "compressors/zstd/libwiredtiger_zstd.so"

#define LZ4_BLK BLKCMP_PFX "lz4"
#define LZ4_EXT EXT_PFX EXTPATH LZ4_PATH EXT_SFX
#define SNAPPY_BLK BLKCMP_PFX "snappy"
#define SNAPPY_EXT EXT_PFX EXTPATH SNAPPY_PATH EXT_SFX
#define ZLIB_BLK BLKCMP_PFX "zlib"
#define ZLIB_EXT EXT_PFX EXTPATH ZLIB_PATH EXT_SFX
#define ZSTD_BLK BLKCMP_PFX "zstd"
#define ZSTD_EXT EXT_PFX EXTPATH ZSTD_PATH EXT_SFX

/* Tiered Storage Extensions */
#ifndef DIR_STORE_PATH
#define DIR_STORE_PATH "storage_sources/dir_store/libwiredtiger_dir_store.so"
#endif
#ifndef S3_PATH
#define S3_PATH "storage_sources/s3_store/libwiredtiger_s3_store.so"
#endif

#define DIR_EXT EXT_PFX EXTPATH DIR_STORE_PATH EXT_SFX
#define S3_EXT EXT_PFX EXTPATH S3_PATH EXT_SFX

#define MAX_MODIFY_PCT 10
#define MAX_MODIFY_NUM 16

typedef struct {
    int64_t threads;   /* Thread count */
    int64_t insert;    /* Insert ratio */
    int64_t modify;    /* Modify ratio */
    int64_t read;      /* Read ratio */
    int64_t update;    /* Update ratio */
    uint64_t throttle; /* Maximum operations/second */
                       /* Number of operations per transaction. Zero for autocommit */

    int64_t modify_delta;   /* Value size change on modify */
    bool modify_distribute; /* Distribute the change of modifications across the whole new record */
    bool modify_force_update; /* Do force update instead of modify */
    int64_t ops_per_txn;
    int64_t pause;           /* Time between scans */
    int64_t read_range;      /* Range of reads */
    int32_t table_index;     /* Table to focus ops on */
    int64_t truncate;        /* Truncate ratio */
    uint64_t truncate_pct;   /* Truncate Percent */
    uint64_t truncate_count; /* Truncate Count */
    int64_t update_delta;    /* Value size change on update */

#define WORKER_INSERT 1     /* Insert */
#define WORKER_INSERT_RMW 2 /* Insert with read-modify-write */
#define WORKER_MODIFY 3     /* Modify */
#define WORKER_READ 4       /* Read */
#define WORKER_TRUNCATE 5   /* Truncate */
#define WORKER_UPDATE 6     /* Update */
    uint8_t ops[100];       /* Operation schedule */
} WORKLOAD;

/* Steering items for the truncate workload */
typedef struct {
    uint64_t stone_gap;
    uint64_t needed_stones;
    uint64_t expected_total;
    uint64_t total_inserts;
    uint64_t last_total_inserts;
    uint64_t num_stones;
    uint64_t last_key;
    uint64_t catchup_multiplier;
} TRUNCATE_CONFIG;

/* Queue entry for use with the Truncate Logic */
struct __truncate_queue_entry {
    char *key;     /* Truncation point */
    uint64_t diff; /* Number of items to be truncated*/
    TAILQ_ENTRY(__truncate_queue_entry) q;
};

/* Steering for the throttle configuration */
typedef struct {
    struct timespec last_increment; /* Time that we last added more ops */
    uint64_t ops_count;             /* The number of ops this increment */
    uint64_t ops_per_increment;     /* Ops to add per increment */
    uint64_t usecs_increment;       /* Time interval of each increment */
} THROTTLE_CONFIG;

#define LOG_PARTIAL_CONFIG ",log=(enabled=false)"
#define READONLY_CONFIG ",readonly=true"
struct __wtperf {         /* Per-database structure */
    char *home;           /* WiredTiger home */
    char *monitor_dir;    /* Monitor output dir */
    char *partial_config; /* Config string for partial logging */
    char *reopen_config;  /* Config string for conn reopen */
    char *log_table_uri;  /* URI for log table */
    char **uris;          /* URIs */

    WT_CONNECTION *conn; /* Database connection */

    FILE *logf; /* Logging handle */

    const char *compress_ext;   /* Compression extension for conn */
    const char *compress_table; /* Compression arg to table create */

    const char *tiered_ext;   /* Tiered extension for conn */
    const char *tiered_table; /* Tiered arg to table create */

    WTPERF_THREAD *backupthreads; /* Backup threads */
    WTPERF_THREAD *ckptthreads;   /* Checkpoint threads */
    WTPERF_THREAD *flushthreads;  /* Flush_tier threads */
    WTPERF_THREAD *popthreads;    /* Populate threads */
    WTPERF_THREAD *scanthreads;   /* Scan threads */

#define WORKLOAD_MAX 50
    WTPERF_THREAD *workers; /* Worker threads */
    u_int workers_cnt;

    WORKLOAD *workload; /* Workloads */
    u_int workload_cnt;

    /* State tracking variables. */
    uint64_t backup_ops;   /* backup operations */
    uint64_t ckpt_ops;     /* checkpoint operations */
    uint64_t flush_ops;    /* flush operations */
    uint64_t scan_ops;     /* scan operations */
    uint64_t insert_ops;   /* insert operations */
    uint64_t modify_ops;   /* modify operations */
    uint64_t read_ops;     /* read operations */
    uint64_t truncate_ops; /* truncate operations */
    uint64_t update_ops;   /* update operations */

    uint64_t insert_key;         /* insert key */
    uint64_t log_like_table_key; /* used to allocate IDs for log table */

    volatile bool backup;    /* backup in progress */
    volatile bool ckpt;      /* checkpoint in progress */
    volatile bool flush;     /* flush_tier in progress */
    volatile bool scan;      /* scan in progress */
    volatile bool error;     /* thread error */
    volatile bool ckpt_stop; /* notify checkpoint thread to stop */
    volatile bool stop;      /* notify threads to stop */
    volatile bool in_warmup; /* running warmup phase */

    volatile bool idle_cycle_run; /* Signal for idle cycle thread */

    volatile uint32_t totalsec; /* total seconds running */

#define CFG_GROW 0x0001     /* There is a grow workload */
#define CFG_SHRINK 0x0002   /* There is a shrink workload */
#define CFG_TRUNCATE 0x0004 /* There is a truncate workload */
    uint32_t flags;         /* flags */

    /* Queue head for use with the Truncate Logic */
    TAILQ_HEAD(__truncate_qh, __truncate_queue_entry) stone_head;

    CONFIG_OPTS *opts; /* Global configuration */
};

#define ELEMENTS(a) (sizeof(a) / sizeof(a[0]))

#define READ_RANGE_OPS 10
#define THROTTLE_OPS 100

#define THOUSAND (1000ULL)
#define MILLION (1000000ULL)
#define BILLION (1000000000ULL)

#define NSEC_PER_SEC BILLION
#define USEC_PER_SEC MILLION
#define MSEC_PER_SEC THOUSAND

#define ns_to_ms(v) ((v) / MILLION)
#define ns_to_sec(v) ((v) / BILLION)
#define ns_to_us(v) ((v) / THOUSAND)

#define us_to_ms(v) ((v) / THOUSAND)
#define us_to_ns(v) ((v)*THOUSAND)
#define us_to_sec(v) ((v) / MILLION)

#define ms_to_ns(v) ((v)*MILLION)
#define ms_to_us(v) ((v)*THOUSAND)
#define ms_to_sec(v) ((v) / THOUSAND)

#define sec_to_ns(v) ((v)*BILLION)
#define sec_to_us(v) ((v)*MILLION)
#define sec_to_ms(v) ((v)*THOUSAND)

typedef struct {
    /*
     * Threads maintain the total thread operation and total latency they've experienced; the
     * monitor thread periodically copies these values into the last_XXX fields.
     */
    uint64_t ops;         /* Total operations */
    uint64_t latency_ops; /* Total ops sampled for latency */
    uint64_t latency;     /* Total latency */

    uint64_t last_latency_ops; /* Last read by monitor thread */
    uint64_t last_latency;

    /*
     * Minimum/maximum latency, shared with the monitor thread, that is, the monitor thread clears
     * it so it's recalculated again for each period.
     */
    uint32_t min_latency; /* Minimum latency (uS) */
    uint32_t max_latency; /* Maximum latency (uS) */

    /*
     * Latency buckets.
     */
    uint32_t us[1000]; /* < 1us ... 1000us */
    uint32_t ms[1000]; /* < 1ms ... 1000ms */
    uint32_t sec[100]; /* < 1s 2s ... 100s */
} TRACK;

struct __wtperf_thread {    /* Per-thread structure */
    WTPERF *wtperf;         /* Enclosing configuration */
    WT_CURSOR *rand_cursor; /* Random key cursor */

    WT_RAND_STATE rnd; /* Random number generation state */

    wt_thread_t handle; /* Handle */

    char *key_buf, *value_buf; /* Key/value memory */

    WORKLOAD *workload; /* Workload */

    THROTTLE_CONFIG throttle_cfg; /* Throttle configuration */

    TRUNCATE_CONFIG trunc_cfg; /* Truncate configuration */

    TRACK backup;         /* Backup operations */
    TRACK ckpt;           /* Checkpoint operations */
    TRACK flush;          /* Flush_tier operations */
    TRACK insert;         /* Insert operations */
    TRACK modify;         /* Modify operations */
    TRACK read;           /* Read operations */
    TRACK scan;           /* Scan operations */
    TRACK truncate;       /* Truncate operations */
    TRACK truncate_sleep; /* Truncate sleep operations */
    TRACK update;         /* Update operations */
};

void backup_read(WTPERF *, const char *);
void cleanup_truncate_config(WTPERF *);
int config_opt_file(WTPERF *, const char *);
void config_opt_cleanup(CONFIG_OPTS *);
void config_opt_init(CONFIG_OPTS **);
void config_opt_log(CONFIG_OPTS *, const char *);
int config_opt_name_value(WTPERF *, const char *, const char *);
void config_opt_print(WTPERF *);
int config_opt_str(WTPERF *, const char *);
void config_opt_usage(void);
int config_sanity(WTPERF *);
void latency_insert(WTPERF *, uint32_t *, uint32_t *, uint32_t *);
void latency_modify(WTPERF *, uint32_t *, uint32_t *, uint32_t *);
void latency_print(WTPERF *);
void latency_read(WTPERF *, uint32_t *, uint32_t *, uint32_t *);
void latency_update(WTPERF *, uint32_t *, uint32_t *, uint32_t *);
int run_truncate(WTPERF *, WTPERF_THREAD *, WT_CURSOR *, WT_SESSION *, int *);
int setup_log_file(WTPERF *);
void setup_throttle(WTPERF_THREAD *);
void setup_truncate(WTPERF *, WTPERF_THREAD *, WT_SESSION *);
void start_idle_table_cycle(WTPERF *, wt_thread_t *);
void stop_idle_table_cycle(WTPERF *, wt_thread_t);
void worker_throttle(WTPERF_THREAD *);
uint64_t sum_backup_ops(WTPERF *);
uint64_t sum_ckpt_ops(WTPERF *);
uint64_t sum_flush_ops(WTPERF *);
uint64_t sum_scan_ops(WTPERF *);
uint64_t sum_insert_ops(WTPERF *);
uint64_t sum_modify_ops(WTPERF *);
uint64_t sum_pop_ops(WTPERF *);
uint64_t sum_read_ops(WTPERF *);
uint64_t sum_truncate_ops(WTPERF *);
uint64_t sum_update_ops(WTPERF *);

void lprintf(const WTPERF *, int err, uint32_t, const char *, ...)
#if defined(__GNUC__)
  __attribute__((format(printf, 4, 5)))
#endif
  ;

static inline void
generate_key(CONFIG_OPTS *opts, char *key_buf, uint64_t keyno)
{
    u64_to_string_zf(keyno, key_buf, opts->key_sz);
}

static inline void
extract_key(char *key_buf, uint64_t *keynop)
{
    (void)sscanf(key_buf, "%" SCNu64, keynop);
}

/*
 * die --
 *     Print message and exit on failure.
 */
static inline void die(int, const char *) WT_GCC_FUNC_DECL_ATTRIBUTE((noreturn));
static inline void
die(int e, const char *str)
{
    fprintf(stderr, "Call to %s failed: %s", str, wiredtiger_strerror(e));
    exit(EXIT_FAILURE);
}
#endif