summaryrefslogtreecommitdiff
path: root/src/third_party/wiredtiger/src/conn/conn_capacity.c
blob: 2213f696e4f8762a3c9b07fcc7837505daba15b6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
/*
 * Copyright (c) 2014-2020 MongoDB, Inc.
 * Copyright (c) 2008-2014 WiredTiger, Inc.
 *	All rights reserved.
 *
 * See the file LICENSE for redistribution information.
 */

#include "wt_internal.h"

/*
 * Compute the time in nanoseconds that must be reserved to represent a number of bytes in a
 * subsystem with a particular capacity per second.
 */
#define WT_RESERVATION_NS(bytes, capacity) (((bytes)*WT_BILLION) / (capacity))

/*
 * The fraction of a second's worth of capacity that will be stolen at a time. The number of bytes
 * this represents may be different for different subsystems, since each subsystem has its own
 * capacity per second.
 */
#define WT_STEAL_FRACTION(x) ((x) / 16)

/*
 * __capacity_config --
 *     Set I/O capacity configuration.
 */
static int
__capacity_config(WT_SESSION_IMPL *session, const char *cfg[])
{
    WT_CAPACITY *cap;
    WT_CONFIG_ITEM cval;
    WT_CONNECTION_IMPL *conn;
    uint64_t total;

    conn = S2C(session);

    WT_RET(__wt_config_gets(session, cfg, "io_capacity.total", &cval));
    if (cval.val != 0 && cval.val < WT_THROTTLE_MIN)
        WT_RET_MSG(session, EINVAL, "total I/O capacity value %" PRId64 " below minimum %d",
          cval.val, WT_THROTTLE_MIN);

    cap = &conn->capacity;
    cap->total = total = (uint64_t)cval.val;
    if (cval.val != 0) {
        /*
         * We've been given a total capacity, set the capacity of all the subsystems.
         */
        cap->ckpt = WT_CAPACITY_SYS(total, WT_CAP_CKPT);
        cap->evict = WT_CAPACITY_SYS(total, WT_CAP_EVICT);
        cap->log = WT_CAPACITY_SYS(total, WT_CAP_LOG);
        cap->read = WT_CAPACITY_SYS(total, WT_CAP_READ);

        /*
         * Set the threshold to the percent of our capacity to periodically asynchronously flush
         * what we've written.
         */
        cap->threshold = ((cap->ckpt + cap->evict + cap->log) / 100) * WT_CAPACITY_PCT;
        if (cap->threshold < WT_CAPACITY_MIN_THRESHOLD)
            cap->threshold = WT_CAPACITY_MIN_THRESHOLD;
        WT_STAT_CONN_SET(session, capacity_threshold, cap->threshold);
    } else
        WT_STAT_CONN_SET(session, capacity_threshold, 0);

    return (0);
}

/*
 * __capacity_server_run_chk --
 *     Check to decide if the capacity server should continue running.
 */
static bool
__capacity_server_run_chk(WT_SESSION_IMPL *session)
{
    return (F_ISSET(S2C(session), WT_CONN_SERVER_CAPACITY));
}

/*
 * __capacity_server --
 *     The capacity server thread.
 */
static WT_THREAD_RET
__capacity_server(void *arg)
{
    WT_CAPACITY *cap;
    WT_CONNECTION_IMPL *conn;
    WT_DECL_RET;
    WT_SESSION_IMPL *session;
    uint64_t start, stop, time_ms;

    session = arg;
    conn = S2C(session);
    cap = &conn->capacity;
    for (;;) {
        /*
         * Wait until signalled but check once per second in case the signal was missed.
         */
        __wt_cond_wait(session, conn->capacity_cond, WT_MILLION, __capacity_server_run_chk);

        /* Check if we're quitting or being reconfigured. */
        if (!__capacity_server_run_chk(session))
            break;

        cap->signalled = false;
        if (cap->written < cap->threshold)
            continue;

        start = __wt_clock(session);
        WT_ERR(__wt_fsync_background(session));
        stop = __wt_clock(session);
        time_ms = WT_CLOCKDIFF_MS(stop, start);
        WT_STAT_CONN_SET(session, fsync_all_time, time_ms);
        cap->written = 0;
    }

    if (0) {
err:
        WT_IGNORE_RET(__wt_panic(session, ret, "capacity server error"));
    }
    return (WT_THREAD_RET_VALUE);
}

/*
 * __capacity_server_start --
 *     Start the capacity server thread.
 */
static int
__capacity_server_start(WT_CONNECTION_IMPL *conn)
{
    WT_SESSION_IMPL *session;

    F_SET(conn, WT_CONN_SERVER_CAPACITY);

    /*
     * The capacity server gets its own session.
     */
    WT_RET(__wt_open_internal_session(conn, "capacity-server", false, 0, &conn->capacity_session));
    session = conn->capacity_session;

    WT_RET(__wt_cond_alloc(session, "capacity server", &conn->capacity_cond));

    /*
     * Start the thread.
     */
    WT_RET(__wt_thread_create(session, &conn->capacity_tid, __capacity_server, session));
    conn->capacity_tid_set = true;

    return (0);
}

/*
 * __wt_capacity_server_create --
 *     Configure and start the capacity server.
 */
int
__wt_capacity_server_create(WT_SESSION_IMPL *session, const char *cfg[])
{
    WT_CONNECTION_IMPL *conn;

    conn = S2C(session);

    /*
     * Stop any server that is already running. This means that each time reconfigure is called
     * we'll bounce the server even if there are no configuration changes. This makes our life
     * easier as the underlying configuration routine doesn't have to worry about freeing objects in
     * the connection structure (it's guaranteed to always start with a blank slate), and we don't
     * have to worry about races where a running server is reading configuration information that
     * we're updating, and it's not expected that reconfiguration will happen a lot.
     */
    if (conn->capacity_session != NULL)
        WT_RET(__wt_capacity_server_destroy(session));
    WT_RET(__capacity_config(session, cfg));

    /*
     * If it is a read only connection or if background fsync is not supported, then there is
     * nothing to do.
     */
    if (F_ISSET(conn, WT_CONN_IN_MEMORY | WT_CONN_READONLY) || !__wt_fsync_background_chk(session))
        return (0);

    if (conn->capacity.total != 0)
        WT_RET(__capacity_server_start(conn));

    return (0);
}

/*
 * __wt_capacity_server_destroy --
 *     Destroy the capacity server thread.
 */
int
__wt_capacity_server_destroy(WT_SESSION_IMPL *session)
{
    WT_CONNECTION_IMPL *conn;
    WT_DECL_RET;

    conn = S2C(session);

    F_CLR(conn, WT_CONN_SERVER_CAPACITY);
    if (conn->capacity_tid_set) {
        __wt_cond_signal(session, conn->capacity_cond);
        WT_TRET(__wt_thread_join(session, &conn->capacity_tid));
        conn->capacity_tid_set = false;
    }
    __wt_cond_destroy(session, &conn->capacity_cond);

    /* Close the server thread's session. */
    if (conn->capacity_session != NULL)
        WT_TRET(__wt_session_close_internal(conn->capacity_session));

    /*
     * Ensure capacity settings are cleared - so that reconfigure doesn't get confused.
     */
    conn->capacity_session = NULL;
    conn->capacity_tid_set = false;
    conn->capacity_cond = NULL;

    return (ret);
}

/*
 * __capacity_signal --
 *     Signal the capacity thread if sufficient data has been written.
 */
static void
__capacity_signal(WT_SESSION_IMPL *session)
{
    WT_CAPACITY *cap;
    WT_CONNECTION_IMPL *conn;

    conn = S2C(session);
    cap = &conn->capacity;
    if (cap->written >= cap->threshold && !cap->signalled) {
        __wt_cond_signal(session, conn->capacity_cond);
        cap->signalled = true;
    }
}

/*
 * __capacity_reserve --
 *     Make a reservation for the given number of bytes against the capacity of the subsystem.
 */
static void
__capacity_reserve(
  uint64_t *reservation, uint64_t bytes, uint64_t capacity, uint64_t now_ns, uint64_t *result)
{
    uint64_t res_len, res_value;

    if (capacity != 0) {
        res_len = WT_RESERVATION_NS(bytes, capacity);
        res_value = __wt_atomic_add64(reservation, res_len);
        if (now_ns > res_value && now_ns - res_value > WT_BILLION)
            /*
             * If the reservation clock is out of date, bring it to within a second of a current
             * time.
             */
            *reservation = (now_ns - WT_BILLION) + res_len;
    } else
        res_value = now_ns;

    *result = res_value;
}

/*
 * __wt_capacity_throttle --
 *     Reserve a time to perform a write operation for the subsystem, and wait until that time. The
 *     concept is that each write to a subsystem reserves a time slot to do its write, and
 *     atomically adjusts the reservation marker to point past the reserved slot. The size of the
 *     adjustment (i.e. the length of time represented by the slot in nanoseconds) is chosen to be
 *     proportional to the number of bytes to be written, and the proportion is a simple calculation
 *     so that we can fit reservations for exactly the configured capacity in a second. Reservation
 *     times are in nanoseconds since the epoch.
 */
void
__wt_capacity_throttle(WT_SESSION_IMPL *session, uint64_t bytes, WT_THROTTLE_TYPE type)
{
    struct timespec now;
    WT_CAPACITY *cap;
    WT_CONNECTION_IMPL *conn;
    uint64_t best_res, capacity, new_res, now_ns, sleep_us, res_total_value;
    uint64_t res_value, steal_capacity, stolen_bytes, this_res;
    uint64_t *reservation, *steal;
    uint64_t total_capacity;

    conn = S2C(session);
    cap = &conn->capacity;
    /* NOLINTNEXTLINE(clang-analyzer-deadcode.DeadStores) */
    capacity = steal_capacity = 0;
    reservation = steal = NULL;
    switch (type) {
    case WT_THROTTLE_CKPT:
        capacity = cap->ckpt;
        reservation = &cap->reservation_ckpt;
        WT_STAT_CONN_INCRV(session, capacity_bytes_ckpt, bytes);
        WT_STAT_CONN_INCRV(session, capacity_bytes_written, bytes);
        break;
    case WT_THROTTLE_EVICT:
        capacity = cap->evict;
        reservation = &cap->reservation_evict;
        WT_STAT_CONN_INCRV(session, capacity_bytes_evict, bytes);
        WT_STAT_CONN_INCRV(session, capacity_bytes_written, bytes);
        break;
    case WT_THROTTLE_LOG:
        capacity = cap->log;
        reservation = &cap->reservation_log;
        WT_STAT_CONN_INCRV(session, capacity_bytes_log, bytes);
        WT_STAT_CONN_INCRV(session, capacity_bytes_written, bytes);
        break;
    case WT_THROTTLE_READ:
        capacity = cap->read;
        reservation = &cap->reservation_read;
        WT_STAT_CONN_INCRV(session, capacity_bytes_read, bytes);
        break;
    }
    total_capacity = cap->total;

    /*
     * Right now no subsystem can be individually turned off, but it is certainly a possibility to
     * consider one subsystem may be turned off at some point in the future. If this subsystem is
     * not throttled there's nothing to do.
     */
    if (cap->total == 0 || capacity == 0 || F_ISSET(conn, WT_CONN_RECOVERING))
        return;

    /*
     * There may in fact be some reads done under the umbrella of log I/O, but they are mostly done
     * under recovery. And if we are recovering, we don't reach this code.
     */
    if (type != WT_THROTTLE_READ) {
        (void)__wt_atomic_addv64(&cap->written, bytes);
        __capacity_signal(session);
    }

    /* If we get sizes larger than this, later calculations may overflow. */
    WT_ASSERT(session, bytes < 16 * (uint64_t)WT_GIGABYTE);
    WT_ASSERT(session, capacity != 0);

    /* Get the current time in nanoseconds since the epoch. */
    __wt_epoch(session, &now);
    now_ns = (uint64_t)now.tv_sec * WT_BILLION + (uint64_t)now.tv_nsec;

again:
    /* Take a reservation for the subsystem, and for the total */
    __capacity_reserve(reservation, bytes, capacity, now_ns, &res_value);
    __capacity_reserve(&cap->reservation_total, bytes, total_capacity, now_ns, &res_total_value);

    /*
     * If we ended up with a future reservation, and we aren't constricted by the total capacity,
     * then we may be able to reallocate some unused reservation time from another subsystem.
     */
    if (res_value > now_ns && res_total_value < now_ns && steal == NULL && total_capacity != 0) {
        best_res = now_ns - WT_BILLION / 2;
        if (type != WT_THROTTLE_CKPT && (this_res = cap->reservation_ckpt) < best_res) {
            steal = &cap->reservation_ckpt;
            steal_capacity = cap->ckpt;
            best_res = this_res;
        }
        if (type != WT_THROTTLE_EVICT && (this_res = cap->reservation_evict) < best_res) {
            steal = &cap->reservation_evict;
            steal_capacity = cap->evict;
            best_res = this_res;
        }
        if (type != WT_THROTTLE_LOG && (this_res = cap->reservation_log) < best_res) {
            steal = &cap->reservation_log;
            steal_capacity = cap->log;
            best_res = this_res;
        }
        if (type != WT_THROTTLE_READ && (this_res = cap->reservation_read) < best_res) {
            steal = &cap->reservation_read;
            steal_capacity = cap->read;
            best_res = this_res;
        }

        if (steal != NULL) {
            /*
             * We have a subsystem that has enough spare capacity to steal. We'll take a small slice
             * (a fraction of a second worth) and add it to our own subsystem.
             */
            if (best_res < now_ns - WT_BILLION && now_ns > WT_BILLION)
                new_res = now_ns - WT_BILLION;
            else
                new_res = best_res;
            WT_ASSERT(session, steal_capacity != 0);
            new_res += WT_STEAL_FRACTION(WT_BILLION) + WT_RESERVATION_NS(bytes, steal_capacity);
            if (!__wt_atomic_casv64(steal, best_res, new_res)) {
                /*
                 * Give up our reservations and try again. We won't try to steal the next time.
                 */
                (void)__wt_atomic_sub64(reservation, WT_RESERVATION_NS(bytes, capacity));
                (void)__wt_atomic_sub64(
                  &cap->reservation_total, WT_RESERVATION_NS(bytes, total_capacity));
                goto again;
            }

            /*
             * We've stolen a fraction of a second of capacity. Figure out how many bytes that is,
             * before adding that many bytes to the acquiring subsystem's capacity.
             */
            stolen_bytes = WT_STEAL_FRACTION(steal_capacity);
            res_value = __wt_atomic_sub64(reservation, WT_RESERVATION_NS(stolen_bytes, capacity));
        }
    }
    if (res_value < res_total_value)
        res_value = res_total_value;

    if (res_value > now_ns) {
        sleep_us = (res_value - now_ns) / WT_THOUSAND;
        if (res_value == res_total_value)
            WT_STAT_CONN_INCRV(session, capacity_time_total, sleep_us);
        else
            switch (type) {
            case WT_THROTTLE_CKPT:
                WT_STAT_CONN_INCRV(session, capacity_time_ckpt, sleep_us);
                break;
            case WT_THROTTLE_EVICT:
                WT_STAT_CONN_INCRV(session, capacity_time_evict, sleep_us);
                break;
            case WT_THROTTLE_LOG:
                WT_STAT_CONN_INCRV(session, capacity_time_log, sleep_us);
                break;
            case WT_THROTTLE_READ:
                WT_STAT_CONN_INCRV(session, capacity_time_read, sleep_us);
                break;
            }
        if (sleep_us > WT_CAPACITY_SLEEP_CUTOFF_US)
            /* Sleep handles large usec values. */
            __wt_sleep(0, sleep_us);
    }
}