diff options
Diffstat (limited to 'src/third_party/wiredtiger/src/conn/conn_capacity.c')
-rw-r--r-- | src/third_party/wiredtiger/src/conn/conn_capacity.c | 474 |
1 file changed, 474 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/src/conn/conn_capacity.c b/src/third_party/wiredtiger/src/conn/conn_capacity.c new file mode 100644 index 00000000000..0dd6a8c3c6d --- /dev/null +++ b/src/third_party/wiredtiger/src/conn/conn_capacity.c @@ -0,0 +1,474 @@ +/* + * Copyright (c) 2014-2019 MongoDB, Inc. + * Copyright (c) 2008-2014 WiredTiger, Inc. + * All rights reserved. + * + * See the file LICENSE for redistribution information. + */ + +#include "wt_internal.h" + +/* + * Compute the time in nanoseconds that must be reserved to represent + * a number of bytes in a subsystem with a particular capacity per second. + */ +#define WT_RESERVATION_NS(bytes, capacity) \ + (((bytes) * WT_BILLION) / (capacity)) + +/* + * The fraction of a second's worth of capacity that will be stolen at a + * time. The number of bytes this represents may be different for different + * subsystems, since each subsystem has its own capacity per second. + */ +#define WT_STEAL_FRACTION(x) ((x) / 16) + +/* + * __capacity_config -- + * Set I/O capacity configuration. + */ +static int +__capacity_config(WT_SESSION_IMPL *session, const char *cfg[]) +{ + WT_CAPACITY *cap; + WT_CONFIG_ITEM cval; + WT_CONNECTION_IMPL *conn; + uint64_t total; + + conn = S2C(session); + + WT_RET(__wt_config_gets(session, cfg, "io_capacity.total", &cval)); + if (cval.val != 0 && cval.val < WT_THROTTLE_MIN) + WT_RET_MSG(session, EINVAL, + "total I/O capacity value %" PRId64 " below minimum %d", + cval.val, WT_THROTTLE_MIN); + + cap = &conn->capacity; + cap->total = total = (uint64_t)cval.val; + if (cval.val != 0) { + /* + * We've been given a total capacity, set the + * capacity of all the subsystems. 
+ */ + cap->ckpt = WT_CAPACITY_SYS(total, WT_CAP_CKPT); + cap->evict = WT_CAPACITY_SYS(total, WT_CAP_EVICT); + cap->log = WT_CAPACITY_SYS(total, WT_CAP_LOG); + cap->read = WT_CAPACITY_SYS(total, WT_CAP_READ); + + /* + * Set the threshold to the percent of our capacity to + * periodically asynchronously flush what we've written. + */ + cap->threshold = ((cap->ckpt + cap->evict + cap->log) / + 100) * WT_CAPACITY_PCT; + if (cap->threshold < WT_CAPACITY_MIN_THRESHOLD) + cap->threshold = WT_CAPACITY_MIN_THRESHOLD; + WT_STAT_CONN_SET(session, capacity_threshold, cap->threshold); + } else + WT_STAT_CONN_SET(session, capacity_threshold, 0); + + return (0); +} + +/* + * __capacity_server_run_chk -- + * Check to decide if the capacity server should continue running. + */ +static bool +__capacity_server_run_chk(WT_SESSION_IMPL *session) +{ + return (F_ISSET(S2C(session), WT_CONN_SERVER_CAPACITY)); +} + +/* + * __capacity_server -- + * The capacity server thread. + */ +static WT_THREAD_RET +__capacity_server(void *arg) +{ + WT_CAPACITY *cap; + WT_CONNECTION_IMPL *conn; + WT_DECL_RET; + WT_SESSION_IMPL *session; + uint64_t start, stop, time_ms; + + session = arg; + conn = S2C(session); + cap = &conn->capacity; + for (;;) { + /* + * Wait until signalled but check once per second in case + * the signal was missed. + */ + __wt_cond_wait(session, + conn->capacity_cond, WT_MILLION, __capacity_server_run_chk); + + /* Check if we're quitting or being reconfigured. 
*/ + if (!__capacity_server_run_chk(session)) + break; + + cap->signalled = false; + if (cap->written < cap->threshold) + continue; + + start = __wt_clock(session); + WT_ERR(__wt_fsync_background(session)); + stop = __wt_clock(session); + time_ms = WT_CLOCKDIFF_MS(stop, start); + WT_STAT_CONN_SET(session, fsync_all_time, time_ms); + cap->written = 0; + } + + if (0) { +err: WT_PANIC_MSG(session, ret, "capacity server error"); + } + return (WT_THREAD_RET_VALUE); +} + +/* + * __capacity_server_start -- + * Start the capacity server thread. + */ +static int +__capacity_server_start(WT_CONNECTION_IMPL *conn) +{ + WT_SESSION_IMPL *session; + + F_SET(conn, WT_CONN_SERVER_CAPACITY); + + /* + * The capacity server gets its own session. + */ + WT_RET(__wt_open_internal_session(conn, + "capacity-server", false, 0, &conn->capacity_session)); + session = conn->capacity_session; + + WT_RET(__wt_cond_alloc(session, + "capacity server", &conn->capacity_cond)); + + /* + * Start the thread. + */ + WT_RET(__wt_thread_create( + session, &conn->capacity_tid, __capacity_server, session)); + conn->capacity_tid_set = true; + + return (0); +} + +/* + * __wt_capacity_server_create -- + * Configure and start the capacity server. + */ +int +__wt_capacity_server_create(WT_SESSION_IMPL *session, const char *cfg[]) +{ + WT_CONNECTION_IMPL *conn; + + conn = S2C(session); + + /* + * Stop any server that is already running. This means that each time + * reconfigure is called we'll bounce the server even if there are no + * configuration changes. This makes our life easier as the underlying + * configuration routine doesn't have to worry about freeing objects + * in the connection structure (it's guaranteed to always start with a + * blank slate), and we don't have to worry about races where a running + * server is reading configuration information that we're updating, and + * it's not expected that reconfiguration will happen a lot. 
+ */ + if (conn->capacity_session != NULL) + WT_RET(__wt_capacity_server_destroy(session)); + WT_RET(__capacity_config(session, cfg)); + + /* + * If it is a read only connection or if background fsync is not + * supported, then there is nothing to do. + */ + if (F_ISSET(conn, WT_CONN_IN_MEMORY | WT_CONN_READONLY) || + !__wt_fsync_background_chk(session)) + return (0); + + if (conn->capacity.total != 0) + WT_RET(__capacity_server_start(conn)); + + return (0); +} + +/* + * __wt_capacity_server_destroy -- + * Destroy the capacity server thread. + */ +int +__wt_capacity_server_destroy(WT_SESSION_IMPL *session) +{ + WT_CONNECTION_IMPL *conn; + WT_DECL_RET; + WT_SESSION *wt_session; + + conn = S2C(session); + + F_CLR(conn, WT_CONN_SERVER_CAPACITY); + if (conn->capacity_tid_set) { + __wt_cond_signal(session, conn->capacity_cond); + WT_TRET(__wt_thread_join(session, &conn->capacity_tid)); + conn->capacity_tid_set = false; + } + __wt_cond_destroy(session, &conn->capacity_cond); + + /* Close the server thread's session. */ + if (conn->capacity_session != NULL) { + wt_session = &conn->capacity_session->iface; + WT_TRET(wt_session->close(wt_session, NULL)); + } + + /* + * Ensure capacity settings are cleared - so that reconfigure doesn't + * get confused. + */ + conn->capacity_session = NULL; + conn->capacity_tid_set = false; + conn->capacity_cond = NULL; + + return (ret); +} + +/* + * __capacity_signal -- + * Signal the capacity thread if sufficient data has been written. + */ +static void +__capacity_signal(WT_SESSION_IMPL *session) +{ + WT_CAPACITY *cap; + WT_CONNECTION_IMPL *conn; + + conn = S2C(session); + cap = &conn->capacity; + if (cap->written >= cap->threshold && !cap->signalled) { + __wt_cond_signal(session, conn->capacity_cond); + cap->signalled = true; + } +} + +/* + * __capacity_reserve -- + * Make a reservation for the given number of bytes against + * the capacity of the subsystem. 
+ */ +static void +__capacity_reserve(uint64_t *reservation, uint64_t bytes, uint64_t capacity, + uint64_t now_ns, uint64_t *result) +{ + uint64_t res_len, res_value; + + if (capacity != 0) { + res_len = WT_RESERVATION_NS(bytes, capacity); + res_value = __wt_atomic_add64(reservation, res_len); + if (now_ns > res_value && now_ns - res_value > WT_BILLION) + /* + * If the reservation clock is out of date, bring it + * to within a second of a current time. + */ + (void)__wt_atomic_store64(reservation, + (now_ns - WT_BILLION) + res_len); + } else + res_value = now_ns; + + *result = res_value; +} + +/* + * __wt_capacity_throttle -- + * Reserve a time to perform a write operation for the subsystem, + * and wait until that time. + * + * The concept is that each write to a subsystem reserves a time slot + * to do its write, and atomically adjusts the reservation marker to + * point past the reserved slot. The size of the adjustment (i.e. the + * length of time represented by the slot in nanoseconds) is chosen to + * be proportional to the number of bytes to be written, and the + * proportion is a simple calculation so that we can fit reservations for + * exactly the configured capacity in a second. Reservation times are + * in nanoseconds since the epoch. + */ +void +__wt_capacity_throttle(WT_SESSION_IMPL *session, uint64_t bytes, + WT_THROTTLE_TYPE type) +{ + struct timespec now; + WT_CAPACITY *cap; + WT_CONNECTION_IMPL *conn; + uint64_t best_res, capacity, new_res, now_ns, sleep_us, res_total_value; + uint64_t res_value, steal_capacity, stolen_bytes, this_res; + uint64_t *reservation, *steal; + uint64_t total_capacity; + + conn = S2C(session); + cap = &conn->capacity; + /* If not using capacity there's nothing to do. 
*/ + if (cap->total == 0) + return; + + capacity = steal_capacity = 0; + reservation = steal = NULL; + switch (type) { + case WT_THROTTLE_CKPT: + capacity = cap->ckpt; + reservation = &cap->reservation_ckpt; + WT_STAT_CONN_INCRV(session, capacity_bytes_ckpt, bytes); + break; + case WT_THROTTLE_EVICT: + capacity = cap->evict; + reservation = &cap->reservation_evict; + WT_STAT_CONN_INCRV(session, capacity_bytes_evict, bytes); + break; + case WT_THROTTLE_LOG: + capacity = cap->log; + reservation = &cap->reservation_log; + WT_STAT_CONN_INCRV(session, capacity_bytes_log, bytes); + break; + case WT_THROTTLE_READ: + capacity = cap->read; + reservation = &cap->reservation_read; + WT_STAT_CONN_INCRV(session, capacity_bytes_read, bytes); + break; + } + total_capacity = cap->total; + + /* + * Right now no subsystem can be individually turned off, but it is + * certainly a possibility to consider one subsystem may be turned off + * at some point in the future. If this subsystem is not throttled + * there's nothing to do. + */ + if (capacity == 0 || F_ISSET(conn, WT_CONN_RECOVERING)) + return; + + /* + * There may in fact be some reads done under the umbrella of log + * I/O, but they are mostly done under recovery. And if we are + * recovering, we don't reach this code. + */ + if (type != WT_THROTTLE_READ) { + (void)__wt_atomic_addv64(&cap->written, bytes); + WT_STAT_CONN_INCRV(session, capacity_bytes_written, bytes); + __capacity_signal(session); + } + + /* If we get sizes larger than this, later calculations may overflow. */ + WT_ASSERT(session, bytes < 16 * (uint64_t)WT_GIGABYTE); + WT_ASSERT(session, capacity != 0); + + /* Get the current time in nanoseconds since the epoch. 
*/ + __wt_epoch(session, &now); + now_ns = (uint64_t)now.tv_sec * WT_BILLION + (uint64_t)now.tv_nsec; + +again: + /* Take a reservation for the subsystem, and for the total */ + __capacity_reserve(reservation, bytes, capacity, now_ns, &res_value); + __capacity_reserve(&cap->reservation_total, bytes, total_capacity, + now_ns, &res_total_value); + + /* + * If we ended up with a future reservation, and we aren't constricted + * by the total capacity, then we may be able to reallocate some + * unused reservation time from another subsystem. + */ + if (res_value > now_ns && res_total_value < now_ns && steal == NULL && + total_capacity != 0) { + best_res = now_ns - WT_BILLION / 2; + if (type != WT_THROTTLE_CKPT && + (this_res = cap->reservation_ckpt) < best_res) { + steal = &cap->reservation_ckpt; + steal_capacity = cap->ckpt; + best_res = this_res; + } + if (type != WT_THROTTLE_EVICT && + (this_res = cap->reservation_evict) < best_res) { + steal = &cap->reservation_evict; + steal_capacity = cap->evict; + best_res = this_res; + } + if (type != WT_THROTTLE_LOG && + (this_res = cap->reservation_log) < best_res) { + steal = &cap->reservation_log; + steal_capacity = cap->log; + best_res = this_res; + } + if (type != WT_THROTTLE_READ && + (this_res = cap->reservation_read) < best_res) { + steal = &cap->reservation_read; + steal_capacity = cap->read; + best_res = this_res; + } + + if (steal != NULL) { + /* + * We have a subsystem that has enough spare capacity + * to steal. We'll take a small slice (a fraction + * of a second worth) and add it to our own subsystem. + */ + if (best_res < now_ns - WT_BILLION && + now_ns > WT_BILLION) + new_res = now_ns - WT_BILLION; + else + new_res = best_res; + WT_ASSERT(session, steal_capacity != 0); + new_res += WT_STEAL_FRACTION(WT_BILLION) + + WT_RESERVATION_NS(bytes, steal_capacity); + if (!__wt_atomic_casv64(steal, best_res, new_res)) { + /* + * Give up our reservations and try again. + * We won't try to steal the next time. 
+ */ + (void)__wt_atomic_sub64(reservation, + WT_RESERVATION_NS(bytes, capacity)); + (void)__wt_atomic_sub64(&cap->reservation_total, + WT_RESERVATION_NS(bytes, total_capacity)); + goto again; + } + + /* + * We've stolen a fraction of a second of capacity. + * Figure out how many bytes that is, before adding + * that many bytes to the acquiring subsystem's + * capacity. + */ + stolen_bytes = WT_STEAL_FRACTION(steal_capacity); + res_value = __wt_atomic_sub64(reservation, + WT_RESERVATION_NS(stolen_bytes, capacity)); + } + } + if (res_value < res_total_value) + res_value = res_total_value; + + if (res_value > now_ns) { + sleep_us = (res_value - now_ns) / WT_THOUSAND; + if (res_value == res_total_value) + WT_STAT_CONN_INCRV(session, + capacity_time_total, sleep_us); + else + switch (type) { + case WT_THROTTLE_CKPT: + WT_STAT_CONN_INCRV(session, + capacity_time_ckpt, sleep_us); + break; + case WT_THROTTLE_EVICT: + WT_STAT_CONN_INCRV(session, + capacity_time_evict, sleep_us); + break; + case WT_THROTTLE_LOG: + WT_STAT_CONN_INCRV(session, + capacity_time_log, sleep_us); + break; + case WT_THROTTLE_READ: + WT_STAT_CONN_INCRV(session, + capacity_time_read, sleep_us); + break; + } + if (sleep_us > WT_CAPACITY_SLEEP_CUTOFF_US) + /* Sleep handles large usec values. */ + __wt_sleep(0, sleep_us); + } +} |