summaryrefslogtreecommitdiff
path: root/scripts/memcached-automove-extstore
blob: 4578d17da1bd92c40ac35c98d381d2763c257c78 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
#!/usr/bin/python3
# Copyright 2017 Facebook.
# Licensed under the same terms as memcached itself.

import argparse
import socket
import sys
import re
import traceback
from time import sleep

# Command-line interface. Option names and defaults are unchanged.
parser = argparse.ArgumentParser(description="daemon for rebalancing slabs")
parser.add_argument("--host", help="host to connect to",
        default="localhost:11211", metavar="HOST:PORT")
parser.add_argument("-s", "--sleep", help="seconds between runs",
                    type=int, default=1)
parser.add_argument("-v", "--verbose", action="store_true")
parser.add_argument("-a", "--automove", action="store_true", default=False,
                    help="enable automatic page rebalancing")
parser.add_argument("-w", "--window", type=int, default=30,
                    help="rolling window size for decision history")
# poolmin/poolhigh bound the global free page pool; determine_move() lowers
# the extstore item_age target below poolmin and raises it above poolhigh.
parser.add_argument("-l", "--poolmin", type=int, default=1000,
                    help="global page pool size below which item_age is lowered")
parser.add_argument("-p", "--poolhigh", type=int, default=2000,
                    help="global page pool size above which item_age is raised")

args = parser.parse_args()

# Split on the LAST colon only, so a bracketed IPv6 literal such as
# "[::1]:11211" keeps its internal colons in the host part.
host, sep, port = args.host.rpartition(':')
if not sep or port.endswith(']'):
    # No port was supplied ("localhost" or "[::1]"): use memcached's default,
    # the same port the --host default uses.
    host, port = args.host, "11211"
if host.startswith('[') and host.endswith(']'):
    # The resolver wants the bare address, without the URL-style brackets.
    host = host[1:-1]

def window_check(history, sid, key):
    """ Sum `key` for slab class `sid` over every window in history['w'].

    Windows that have no entry for `sid`, or whose entry has a missing or
    falsy value for `key`, contribute nothing to the sum.
    """
    per_window = (window.get(sid) for window in history['w'])
    return sum(entry[key] for entry in per_window
               if entry and entry.get(key))

def window_key_check(history, key):
    """ Sum the top-level value stored under `key` in each history window.

    Windows where the key is absent (or its value is falsy) are skipped.
    """
    values = (window.get(key) for window in history['w'])
    return sum(value for value in values if value)


def determine_move(history, stats, diffs, totals):
    """ Figure out if a page move is in order.

    - we should use as much memory as possible to buffer items, reducing
      the load on flash.
    - tries to keep the global free page pool in between poolmin/poolhigh.
    - avoids flapping as much as possible:
      - only pull pages off of a class if it hasn't recently evicted or allocated pages.
      - only pull pages off if a sufficient number of chunks are available.
      - if global pool is below minimum, slowly reduce extstore item_age, flushing
        more items to storage.
      - if global pool is above maximum, slowly increase extstore item_age, which
        will slowly free memory.
    - shifts in traffic load patterns should slowly move between increasing or decreasing
      age.

    Arguments:
    - history: dict whose 'w' list holds one dict per polling window. This
      call appends a new window and drops the oldest one once more than
      args.window are held, so the caller's history is mutated.
    - stats: general stats dict; only 'slab_global_page_pool' is read here.
    - diffs: per-slab-class dicts as built by diff_stats().
    - totals: accepted but not used by this function.

    Returns a (target, value) tuple:
    - (-1, -1): nothing to do (always returned until history holds at least
      args.window windows).
    - (-2, age): retarget the extstore item_age to `age`.
    - (sid, 0): move a page from slab class `sid` to the global page pool.
    """
    # rotate windows
    history['w'].append({})
    if (len(history['w']) > args.window):
        history['w'].pop(0)
    # w is the window for this call; everything recorded below lands in it.
    w = history['w'][-1]
    # (slab id, averaged age) of the class with the highest windowed age.
    oldest = (-1, 0)
    # Most bytes free
    freest = (-1, 0)
    decision = (-1, -1)
    # Flag this window when the global pool is outside the configured band.
    if int(stats['slab_global_page_pool']) < args.poolmin:
        w['slab_pool_low'] = 1
    if int(stats['slab_global_page_pool']) > args.poolhigh:
        w['slab_pool_high'] = 1
    if args.verbose:
        print("global pool: [{}]".format(stats['slab_global_page_pool']))

    # Number of windows in the history (including this one) in which the pool
    # was flagged low / high; any nonzero count is treated as true below.
    pool_low = window_key_check(history, 'slab_pool_low')
    pool_high = window_key_check(history, 'slab_pool_high')
    for sid, slab in diffs.items():
        # Only balance larger slab classes
        if slab['chunk_size'] < 250:
            continue

        w[sid] = {}
        # Without both deltas there is nothing to judge this class by.
        if 'evicted_d' not in slab or 'total_pages_d' not in slab:
            continue
        # mark this window as dirty if total pages increases or evictions
        # happened
        if slab['total_pages_d'] > 0:
            w[sid]['dirty'] = 1
        if slab['evicted_d'] > 0:
            w[sid]['dirty'] = 1
            w[sid]['ev'] = 1
        w[sid]['age'] = slab['age']
        # Age summed over the history and divided by the number of windows
        # held; windows with no age recorded for this class add nothing.
        age = window_check(history, sid, 'age') / len(history['w'])

        # if > 3.5 pages free, and not dirty, reassign to global page pool and
        # break.
        if slab['free_chunks'] > slab['chunks_per_page'] * 3.5:
            if window_check(history, sid, 'dirty') == 0:
                cbytes = slab['chunk_size'] * slab['free_chunks']
                if cbytes > freest[1]:
                    freest = (sid, cbytes)

        # are we the oldest slab class? (and a valid target)
        if age > oldest[1] and slab['total_pages'] > 2:
            oldest = (sid, age)

    # Pick one action, in priority order: pool too large, then a class with
    # spare free chunks, then pool too small.
    if pool_high:
        # Pool is too large, target item age higher.
        decision = (-2, int(oldest[1] * 1.02))
    elif freest[0] != -1:
        if args.verbose:
            print("freest:  [class: {}] [bytes: {}]".format(int(freest[0]), freest[1]))
        # Destination 0 is the global page pool.
        decision = (freest[0], 0)
    elif oldest[0] != -1 and pool_low:
        if args.verbose:
            print("old:   [class: {}] [age: {:.2f}]".format(
                int(oldest[0]), oldest[1]))
        # Re-target item age.
        decision = (-2, int(oldest[1] * 0.99))

    # Only act once a full window of history has been collected.
    if (len(history['w']) >= args.window):
        return decision
    return (-1, -1)


def run_move(s, decision):
    """ Send a "slabs reassign" command and read back the one-line reply.

    `s` is the text stream connected to the server; `decision` is a
    (source, destination) pair. The reply is printed only in verbose mode.
    """
    source, destination = decision[0], decision[1]
    s.write("slabs reassign {} {}\r\n".format(source, destination))
    reply = s.readline().rstrip()
    if args.verbose:
        print("move result:", reply)


def run_item_age_adjust(s, age):
    """ Send an "extstore item_age" command and read back the one-line reply.

    `s` is the text stream connected to the server; `age` is the new target
    value. The age and the reply are printed only in verbose mode.
    """
    command = "extstore item_age " + str(age) + "\r\n"
    s.write(command)
    reply = s.readline().rstrip()
    if args.verbose:
        print("extstore item_age result:", age, reply)

def diff_stats(before, after):
    """ Compare two per-slab stat snapshots.

    Returns (diffs, totals):
    - diffs maps each slab id present (and non-empty) in both snapshots to a
      copy of its 'after' stats, where all-digit values are converted to int
      and a "<key>_d" entry holds after minus before. Keys that are missing
      from 'before' are copied through untouched. Each entry also carries
      its own id under 'slab'.
    - totals holds, per key, the sum of the 'after' values and (under
      "<key>_d") the sum of the deltas across all compared slab classes.
    """
    all_digits = re.compile(r"^\d+$")
    diffs, totals = {}, {}
    for slabid, current in after.items():
        previous = before.get(slabid)
        if not previous or not current:
            continue
        slab = current.copy()
        for name, raw in current.items():
            if name not in previous:
                continue
            delta_name = name + '_d'
            if name not in totals:
                totals[name] = 0
                totals[delta_name] = 0
            if delta_name not in slab:
                slab[delta_name] = 0
            if not all_digits.search(raw):
                # Non-numeric values keep their string form and a zero delta.
                continue
            value = int(raw)
            delta = value - int(previous[name])
            totals[name] += value
            slab[name] = value
            slab[delta_name] = delta
            totals[delta_name] += delta
        slab['slab'] = slabid
        diffs[slabid] = slab
    return (diffs, totals)


def read_slab_stats(s):
    """ Collect per-slab-class stats from "stats items" and "stats slabs".

    `s` is the text stream connected to the server. Returns a dict mapping
    slab id (string) to a dict of stat name -> value (string); both commands
    feed the same per-class dict. Lines that do not carry a slab id are
    ignored.

    Raises ConnectionError if the stream hits EOF before the "END" line.
    """
    slabs = {}
    for statcmd in ['items', 'slabs']:
        s.write("stats " + statcmd + "\r\n")
        while True:
            line = s.readline()
            if not line:
                # readline() returns "" only at EOF. Without this check the
                # loop would spin forever once the server closes the socket,
                # because "END" never arrives.
                raise ConnectionError(
                    "connection closed while reading 'stats " + statcmd + "'")
            line = line.rstrip()
            if line.startswith("END"):
                break

            m = re.match(r"^STAT (?:items:)?(\d+):(\S+) (\S+)", line)
            if m:
                (slab, var, val) = m.groups()
                slabs.setdefault(slab, {})[var] = val
    return slabs


# HACK: let's look at 'evictions' being nonzero to indicate memory filled at some point.
def read_stats(s):
    """ Run the plain "stats" command and return its output as a dict.

    `s` is the text stream connected to the server. Keys and values are the
    strings from each "STAT <key> <value>" line; values are NOT converted to
    numbers.

    Raises ConnectionError if the stream hits EOF before the "END" line.
    """
    stats = {}
    s.write("stats\r\n")
    while True:
        line = s.readline()
        if not line:
            # readline() returns "" only at EOF. Without this check the loop
            # would spin forever once the server closes the socket, because
            # "END" never arrives.
            raise ConnectionError("connection closed while reading 'stats'")
        line = line.rstrip()
        if line.startswith("END"):
            break

        m = re.match(r"^STAT (\S+) (\S+)", line)
        if m:
            (key, val) = m.groups()
            stats[key] = val
    return stats


def pct(num, divisor):
    """ Return `num` as a percentage of `divisor` (25.0 for 1 out of 4).

    Returns 0 when `divisor` is zero or otherwise falsy, instead of raising.
    """
    if not divisor:
        return 0
    # Scale to 0-100: the result is printed with a trailing '%' sign, so a
    # bare ratio would be displayed 100x too small.
    return 100.0 * num / divisor


def show_detail(diffs, totals):
    """ Print a per-slab-class table of evictions, item counts, pages and age.

    Classes without an 'evicted_d' entry are left out. Each "(..%)" column
    shows what pct() returns for the class value against the matching entry
    in `totals`.
    """
    header = "\n  {:2s}: {:8s} (pct  ) {:10s} (pct    ) {:6s} (pct)   {:6s}"
    row = "  {:2d}: {:8d} ({:.2f}%) {:10d} ({:.4f}%) {:6d} ({:.2f}%) {:6d}"
    print(header.format('sb', 'evicted', 'items', 'pages', 'age'))

    for sid, slab in diffs.items():
        if 'evicted_d' in slab:
            class_id = int(sid)
            evicted = slab['evicted_d']
            evicted_share = pct(evicted, totals['evicted_d'])
            items = slab['number']
            items_share = pct(items, totals['number'])
            pages = slab['total_pages']
            pages_share = pct(pages, totals['total_pages'])
            print(row.format(class_id, evicted, evicted_share,
                             items, items_share,
                             pages, pages_share,
                             slab['age']))


# Main daemon loop: connect, poll stats every args.sleep seconds, and act on
# determine_move()'s decisions. Any error drops the connection, resets the
# collected state and reconnects after a pause.
stats_pre = {}
history = { 'w': [{}] }
# Last item_age value actually sent to the server on this connection; used to
# avoid re-sending an unchanged target on every poll.
last_item_age = 1
while True:
    try:
        with socket.create_connection((host, port), 5) as c:
            with c.makefile(mode="rw", buffering=1) as s:
                # This script makes the page-move decisions itself, so the
                # server's "slabs automove" setting is switched to 0 first.
                s.write("slabs automove 0\r\n")
                print(s.readline().rstrip())
                while True:
                    stats_post = read_slab_stats(s)
                    stats = read_stats(s)
                    (diffs, totals) = diff_stats(stats_pre, stats_post)
                    #if args.verbose:
                    #    show_detail(diffs, totals)
                    decision = (-1, -1)
                    moved_page = False
                    # NOTE(review): read_stats() stores values as strings, so
                    # this string-vs-int comparison is always true. Left as-is
                    # because gating on evictions would change when the tool
                    # acts at all; confirm the intended behavior.
                    if stats['evictions'] != 0:
                        decision = determine_move(history, stats, diffs, totals)
                        if int(decision[0]) > 0 and decision[1] != -1:
                            print("moving page from, to:", decision)
                            if args.automove:
                                run_move(s, decision)
                                moved_page = True
                        elif decision[0] == -2 and last_item_age != decision[1]:
                            # Adjusting the item age for bg flusher
                            if args.automove:
                                run_item_age_adjust(s, decision[1])
                                # Remember what was sent so the same target
                                # is not re-sent on every poll.
                                last_item_age = decision[1]

                    # Minimize sleeping if we just moved a page to global pool.
                    # Improves responsiveness during flushes/quick changes.
                    # Only an actual move skips the sleep: an item_age target
                    # of 0, or a dry run (no --automove), must not turn this
                    # into a busy loop against the server.
                    if moved_page:
                        continue
                    sleep(args.sleep)
                    stats_pre = stats_post
    except Exception as err:
        # `Exception` rather than a bare except, so Ctrl-C (KeyboardInterrupt)
        # and SystemExit can still stop the daemon.
        print("disconnected:", type(err), err)
        traceback.print_exc()
        stats_pre = {}
        history = { 'w': [{}] }
        # The server may have been restarted with a different item_age, so
        # forget what was sent on the old connection.
        last_item_age = 1
        sleep(args.sleep)