summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/src/conn/conn_capacity.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/third_party/wiredtiger/src/conn/conn_capacity.c')
-rw-r--r--src/third_party/wiredtiger/src/conn/conn_capacity.c474
1 files changed, 474 insertions, 0 deletions
diff --git a/src/third_party/wiredtiger/src/conn/conn_capacity.c b/src/third_party/wiredtiger/src/conn/conn_capacity.c
new file mode 100644
index 00000000000..0dd6a8c3c6d
--- /dev/null
+++ b/src/third_party/wiredtiger/src/conn/conn_capacity.c
@@ -0,0 +1,474 @@
+/*
+ * Copyright (c) 2014-2019 MongoDB, Inc.
+ * Copyright (c) 2008-2014 WiredTiger, Inc.
+ * All rights reserved.
+ *
+ * See the file LICENSE for redistribution information.
+ */
+
+#include "wt_internal.h"
+
+/*
+ * Compute the time in nanoseconds that must be reserved to represent
+ * a number of bytes in a subsystem with a particular capacity per second.
+ */
+#define WT_RESERVATION_NS(bytes, capacity) \
+ (((bytes) * WT_BILLION) / (capacity))
+
+/*
+ * The fraction of a second's worth of capacity that will be stolen at a
+ * time. The number of bytes this represents may be different for different
+ * subsystems, since each subsystem has its own capacity per second.
+ */
+#define WT_STEAL_FRACTION(x) ((x) / 16)
+
+/*
+ * __capacity_config --
+ * Set I/O capacity configuration.
+ */
+static int
+__capacity_config(WT_SESSION_IMPL *session, const char *cfg[])
+{
+ WT_CAPACITY *cap;
+ WT_CONFIG_ITEM cval;
+ WT_CONNECTION_IMPL *conn;
+ uint64_t total;
+
+ conn = S2C(session);
+
+ WT_RET(__wt_config_gets(session, cfg, "io_capacity.total", &cval));
+ if (cval.val != 0 && cval.val < WT_THROTTLE_MIN)
+ WT_RET_MSG(session, EINVAL,
+ "total I/O capacity value %" PRId64 " below minimum %d",
+ cval.val, WT_THROTTLE_MIN);
+
+ cap = &conn->capacity;
+ cap->total = total = (uint64_t)cval.val;
+ if (cval.val != 0) {
+ /*
+ * We've been given a total capacity, set the
+ * capacity of all the subsystems.
+ */
+ cap->ckpt = WT_CAPACITY_SYS(total, WT_CAP_CKPT);
+ cap->evict = WT_CAPACITY_SYS(total, WT_CAP_EVICT);
+ cap->log = WT_CAPACITY_SYS(total, WT_CAP_LOG);
+ cap->read = WT_CAPACITY_SYS(total, WT_CAP_READ);
+
+ /*
+ * Set the threshold to the percent of our capacity to
+ * periodically asynchronously flush what we've written.
+ */
+ cap->threshold = ((cap->ckpt + cap->evict + cap->log) /
+ 100) * WT_CAPACITY_PCT;
+ if (cap->threshold < WT_CAPACITY_MIN_THRESHOLD)
+ cap->threshold = WT_CAPACITY_MIN_THRESHOLD;
+ WT_STAT_CONN_SET(session, capacity_threshold, cap->threshold);
+ } else
+ WT_STAT_CONN_SET(session, capacity_threshold, 0);
+
+ return (0);
+}
+
+/*
+ * __capacity_server_run_chk --
+ * Check to decide if the capacity server should continue running.
+ */
+static bool
+__capacity_server_run_chk(WT_SESSION_IMPL *session)
+{
+ return (F_ISSET(S2C(session), WT_CONN_SERVER_CAPACITY));
+}
+
+/*
+ * __capacity_server --
+ * The capacity server thread.
+ */
+static WT_THREAD_RET
+__capacity_server(void *arg)
+{
+ WT_CAPACITY *cap;
+ WT_CONNECTION_IMPL *conn;
+ WT_DECL_RET;
+ WT_SESSION_IMPL *session;
+ uint64_t start, stop, time_ms;
+
+ session = arg;
+ conn = S2C(session);
+ cap = &conn->capacity;
+ for (;;) {
+ /*
+ * Wait until signalled but check once per second in case
+ * the signal was missed.
+ */
+ __wt_cond_wait(session,
+ conn->capacity_cond, WT_MILLION, __capacity_server_run_chk);
+
+ /* Check if we're quitting or being reconfigured. */
+ if (!__capacity_server_run_chk(session))
+ break;
+
+ cap->signalled = false;
+ if (cap->written < cap->threshold)
+ continue;
+
+ start = __wt_clock(session);
+ WT_ERR(__wt_fsync_background(session));
+ stop = __wt_clock(session);
+ time_ms = WT_CLOCKDIFF_MS(stop, start);
+ WT_STAT_CONN_SET(session, fsync_all_time, time_ms);
+ cap->written = 0;
+ }
+
+ if (0) {
+err: WT_PANIC_MSG(session, ret, "capacity server error");
+ }
+ return (WT_THREAD_RET_VALUE);
+}
+
+/*
+ * __capacity_server_start --
+ * Start the capacity server thread.
+ */
+static int
+__capacity_server_start(WT_CONNECTION_IMPL *conn)
+{
+ WT_SESSION_IMPL *session;
+
+ F_SET(conn, WT_CONN_SERVER_CAPACITY);
+
+ /*
+ * The capacity server gets its own session.
+ */
+ WT_RET(__wt_open_internal_session(conn,
+ "capacity-server", false, 0, &conn->capacity_session));
+ session = conn->capacity_session;
+
+ WT_RET(__wt_cond_alloc(session,
+ "capacity server", &conn->capacity_cond));
+
+ /*
+ * Start the thread.
+ */
+ WT_RET(__wt_thread_create(
+ session, &conn->capacity_tid, __capacity_server, session));
+ conn->capacity_tid_set = true;
+
+ return (0);
+}
+
+/*
+ * __wt_capacity_server_create --
+ * Configure and start the capacity server.
+ */
+int
+__wt_capacity_server_create(WT_SESSION_IMPL *session, const char *cfg[])
+{
+ WT_CONNECTION_IMPL *conn;
+
+ conn = S2C(session);
+
+ /*
+ * Stop any server that is already running. This means that each time
+ * reconfigure is called we'll bounce the server even if there are no
+ * configuration changes. This makes our life easier as the underlying
+ * configuration routine doesn't have to worry about freeing objects
+ * in the connection structure (it's guaranteed to always start with a
+ * blank slate), and we don't have to worry about races where a running
+ * server is reading configuration information that we're updating, and
+ * it's not expected that reconfiguration will happen a lot.
+ */
+ if (conn->capacity_session != NULL)
+ WT_RET(__wt_capacity_server_destroy(session));
+ WT_RET(__capacity_config(session, cfg));
+
+ /*
+ * If it is a read only connection or if background fsync is not
+ * supported, then there is nothing to do.
+ */
+ if (F_ISSET(conn, WT_CONN_IN_MEMORY | WT_CONN_READONLY) ||
+ !__wt_fsync_background_chk(session))
+ return (0);
+
+ if (conn->capacity.total != 0)
+ WT_RET(__capacity_server_start(conn));
+
+ return (0);
+}
+
+/*
+ * __wt_capacity_server_destroy --
+ * Destroy the capacity server thread.
+ */
+int
+__wt_capacity_server_destroy(WT_SESSION_IMPL *session)
+{
+ WT_CONNECTION_IMPL *conn;
+ WT_DECL_RET;
+ WT_SESSION *wt_session;
+
+ conn = S2C(session);
+
+ F_CLR(conn, WT_CONN_SERVER_CAPACITY);
+ if (conn->capacity_tid_set) {
+ __wt_cond_signal(session, conn->capacity_cond);
+ WT_TRET(__wt_thread_join(session, &conn->capacity_tid));
+ conn->capacity_tid_set = false;
+ }
+ __wt_cond_destroy(session, &conn->capacity_cond);
+
+ /* Close the server thread's session. */
+ if (conn->capacity_session != NULL) {
+ wt_session = &conn->capacity_session->iface;
+ WT_TRET(wt_session->close(wt_session, NULL));
+ }
+
+ /*
+ * Ensure capacity settings are cleared - so that reconfigure doesn't
+ * get confused.
+ */
+ conn->capacity_session = NULL;
+ conn->capacity_tid_set = false;
+ conn->capacity_cond = NULL;
+
+ return (ret);
+}
+
+/*
+ * __capacity_signal --
+ * Signal the capacity thread if sufficient data has been written.
+ */
+static void
+__capacity_signal(WT_SESSION_IMPL *session)
+{
+ WT_CAPACITY *cap;
+ WT_CONNECTION_IMPL *conn;
+
+ conn = S2C(session);
+ cap = &conn->capacity;
+ if (cap->written >= cap->threshold && !cap->signalled) {
+ __wt_cond_signal(session, conn->capacity_cond);
+ cap->signalled = true;
+ }
+}
+
+/*
+ * __capacity_reserve --
+ * Make a reservation for the given number of bytes against
+ * the capacity of the subsystem.
+ */
+static void
+__capacity_reserve(uint64_t *reservation, uint64_t bytes, uint64_t capacity,
+ uint64_t now_ns, uint64_t *result)
+{
+ uint64_t res_len, res_value;
+
+ if (capacity != 0) {
+ res_len = WT_RESERVATION_NS(bytes, capacity);
+ res_value = __wt_atomic_add64(reservation, res_len);
+ if (now_ns > res_value && now_ns - res_value > WT_BILLION)
+ /*
+ * If the reservation clock is out of date, bring it
+ * to within a second of a current time.
+ */
+ (void)__wt_atomic_store64(reservation,
+ (now_ns - WT_BILLION) + res_len);
+ } else
+ res_value = now_ns;
+
+ *result = res_value;
+}
+
+/*
+ * __wt_capacity_throttle --
+ * Reserve a time to perform a write operation for the subsystem,
+ * and wait until that time.
+ *
+ * The concept is that each write to a subsystem reserves a time slot
+ * to do its write, and atomically adjusts the reservation marker to
+ * point past the reserved slot. The size of the adjustment (i.e. the
+ * length of time represented by the slot in nanoseconds) is chosen to
+ * be proportional to the number of bytes to be written, and the
+ * proportion is a simple calculation so that we can fit reservations for
+ * exactly the configured capacity in a second. Reservation times are
+ * in nanoseconds since the epoch.
+ */
+void
+__wt_capacity_throttle(WT_SESSION_IMPL *session, uint64_t bytes,
+ WT_THROTTLE_TYPE type)
+{
+ struct timespec now;
+ WT_CAPACITY *cap;
+ WT_CONNECTION_IMPL *conn;
+ uint64_t best_res, capacity, new_res, now_ns, sleep_us, res_total_value;
+ uint64_t res_value, steal_capacity, stolen_bytes, this_res;
+ uint64_t *reservation, *steal;
+ uint64_t total_capacity;
+
+ conn = S2C(session);
+ cap = &conn->capacity;
+ /* If not using capacity there's nothing to do. */
+ if (cap->total == 0)
+ return;
+
+ capacity = steal_capacity = 0;
+ reservation = steal = NULL;
+ switch (type) {
+ case WT_THROTTLE_CKPT:
+ capacity = cap->ckpt;
+ reservation = &cap->reservation_ckpt;
+ WT_STAT_CONN_INCRV(session, capacity_bytes_ckpt, bytes);
+ break;
+ case WT_THROTTLE_EVICT:
+ capacity = cap->evict;
+ reservation = &cap->reservation_evict;
+ WT_STAT_CONN_INCRV(session, capacity_bytes_evict, bytes);
+ break;
+ case WT_THROTTLE_LOG:
+ capacity = cap->log;
+ reservation = &cap->reservation_log;
+ WT_STAT_CONN_INCRV(session, capacity_bytes_log, bytes);
+ break;
+ case WT_THROTTLE_READ:
+ capacity = cap->read;
+ reservation = &cap->reservation_read;
+ WT_STAT_CONN_INCRV(session, capacity_bytes_read, bytes);
+ break;
+ }
+ total_capacity = cap->total;
+
+ /*
+ * Right now no subsystem can be individually turned off, but it is
+ * certainly a possibility to consider one subsystem may be turned off
+ * at some point in the future. If this subsystem is not throttled
+ * there's nothing to do.
+ */
+ if (capacity == 0 || F_ISSET(conn, WT_CONN_RECOVERING))
+ return;
+
+ /*
+ * There may in fact be some reads done under the umbrella of log
+ * I/O, but they are mostly done under recovery. And if we are
+ * recovering, we don't reach this code.
+ */
+ if (type != WT_THROTTLE_READ) {
+ (void)__wt_atomic_addv64(&cap->written, bytes);
+ WT_STAT_CONN_INCRV(session, capacity_bytes_written, bytes);
+ __capacity_signal(session);
+ }
+
+ /* If we get sizes larger than this, later calculations may overflow. */
+ WT_ASSERT(session, bytes < 16 * (uint64_t)WT_GIGABYTE);
+ WT_ASSERT(session, capacity != 0);
+
+ /* Get the current time in nanoseconds since the epoch. */
+ __wt_epoch(session, &now);
+ now_ns = (uint64_t)now.tv_sec * WT_BILLION + (uint64_t)now.tv_nsec;
+
+again:
+ /* Take a reservation for the subsystem, and for the total */
+ __capacity_reserve(reservation, bytes, capacity, now_ns, &res_value);
+ __capacity_reserve(&cap->reservation_total, bytes, total_capacity,
+ now_ns, &res_total_value);
+
+ /*
+ * If we ended up with a future reservation, and we aren't constricted
+ * by the total capacity, then we may be able to reallocate some
+ * unused reservation time from another subsystem.
+ */
+ if (res_value > now_ns && res_total_value < now_ns && steal == NULL &&
+ total_capacity != 0) {
+ best_res = now_ns - WT_BILLION / 2;
+ if (type != WT_THROTTLE_CKPT &&
+ (this_res = cap->reservation_ckpt) < best_res) {
+ steal = &cap->reservation_ckpt;
+ steal_capacity = cap->ckpt;
+ best_res = this_res;
+ }
+ if (type != WT_THROTTLE_EVICT &&
+ (this_res = cap->reservation_evict) < best_res) {
+ steal = &cap->reservation_evict;
+ steal_capacity = cap->evict;
+ best_res = this_res;
+ }
+ if (type != WT_THROTTLE_LOG &&
+ (this_res = cap->reservation_log) < best_res) {
+ steal = &cap->reservation_log;
+ steal_capacity = cap->log;
+ best_res = this_res;
+ }
+ if (type != WT_THROTTLE_READ &&
+ (this_res = cap->reservation_read) < best_res) {
+ steal = &cap->reservation_read;
+ steal_capacity = cap->read;
+ best_res = this_res;
+ }
+
+ if (steal != NULL) {
+ /*
+ * We have a subsystem that has enough spare capacity
+ * to steal. We'll take a small slice (a fraction
+ * of a second worth) and add it to our own subsystem.
+ */
+ if (best_res < now_ns - WT_BILLION &&
+ now_ns > WT_BILLION)
+ new_res = now_ns - WT_BILLION;
+ else
+ new_res = best_res;
+ WT_ASSERT(session, steal_capacity != 0);
+ new_res += WT_STEAL_FRACTION(WT_BILLION) +
+ WT_RESERVATION_NS(bytes, steal_capacity);
+ if (!__wt_atomic_casv64(steal, best_res, new_res)) {
+ /*
+ * Give up our reservations and try again.
+ * We won't try to steal the next time.
+ */
+ (void)__wt_atomic_sub64(reservation,
+ WT_RESERVATION_NS(bytes, capacity));
+ (void)__wt_atomic_sub64(&cap->reservation_total,
+ WT_RESERVATION_NS(bytes, total_capacity));
+ goto again;
+ }
+
+ /*
+ * We've stolen a fraction of a second of capacity.
+ * Figure out how many bytes that is, before adding
+ * that many bytes to the acquiring subsystem's
+ * capacity.
+ */
+ stolen_bytes = WT_STEAL_FRACTION(steal_capacity);
+ res_value = __wt_atomic_sub64(reservation,
+ WT_RESERVATION_NS(stolen_bytes, capacity));
+ }
+ }
+ if (res_value < res_total_value)
+ res_value = res_total_value;
+
+ if (res_value > now_ns) {
+ sleep_us = (res_value - now_ns) / WT_THOUSAND;
+ if (res_value == res_total_value)
+ WT_STAT_CONN_INCRV(session,
+ capacity_time_total, sleep_us);
+ else
+ switch (type) {
+ case WT_THROTTLE_CKPT:
+ WT_STAT_CONN_INCRV(session,
+ capacity_time_ckpt, sleep_us);
+ break;
+ case WT_THROTTLE_EVICT:
+ WT_STAT_CONN_INCRV(session,
+ capacity_time_evict, sleep_us);
+ break;
+ case WT_THROTTLE_LOG:
+ WT_STAT_CONN_INCRV(session,
+ capacity_time_log, sleep_us);
+ break;
+ case WT_THROTTLE_READ:
+ WT_STAT_CONN_INCRV(session,
+ capacity_time_read, sleep_us);
+ break;
+ }
+ if (sleep_us > WT_CAPACITY_SLEEP_CUTOFF_US)
+ /* Sleep handles large usec values. */
+ __wt_sleep(0, sleep_us);
+ }
+}