summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryoav-steinberg <yoav@monfort.co.il>2021-09-23 14:02:16 +0300
committerGitHub <noreply@github.com>2021-09-23 14:02:16 +0300
commit2753429c99425e3d0216cba79e0e61192975f252 (patch)
treea3a8af6eae8fd3121f17b3ba0308f1312777636c
parenta56d4533b72db8aa147be090c4c1d2bc548b9408 (diff)
downloadredis-2753429c99425e3d0216cba79e0e61192975f252.tar.gz
Client eviction (#8687)
### Description A mechanism for disconnecting clients when the sum of all connected clients is above a configured limit. This prevents eviction or OOM caused by accumulated used memory between all clients. It's a complimentary mechanism to the `client-output-buffer-limit` mechanism which takes into account not only a single client and not only output buffers but rather all memory used by all clients. #### Design The general design is as following: * We track memory usage of each client, taking into account all memory used by the client (query buffer, output buffer, parsed arguments, etc...). This is kept up to date after reading from the socket, after processing commands and after writing to the socket. * Based on the used memory we sort all clients into buckets. Each bucket contains all clients using up up to x2 memory of the clients in the bucket below it. For example up to 1m clients, up to 2m clients, up to 4m clients, ... * Before processing a command and before sleep we check if we're over the configured limit. If we are we start disconnecting clients from larger buckets downwards until we're under the limit. #### Config `maxmemory-clients` max memory all clients are allowed to consume, above this threshold we disconnect clients. This config can either be set to 0 (meaning no limit), a size in bytes (possibly with MB/GB suffix), or as a percentage of `maxmemory` by using the `%` suffix (e.g. setting it to `10%` would mean 10% of `maxmemory`). #### Important code changes * During the development I encountered yet more situations where our io-threads access global vars. And needed to fix them. I also had to handle keeps the clients sorted into the memory buckets (which are global) while their memory usage changes in the io-thread. To achieve this I decided to simplify how we check if we're in an io-thread and make it much more explicit. I removed the `CLIENT_PENDING_READ` flag used for checking if the client is in an io-thread (it wasn't used for anything else) and just used the global `io_threads_op` variable the same way to check during writes. * I optimized the cleanup of the client from the `clients_pending_read` list on client freeing. We now store a pointer in the `client` struct to this list so we don't need to search in it (`pending_read_list_node`). * Added `evicted_clients` stat to `INFO` command. * Added `CLIENT NO-EVICT ON|OFF` sub command to exclude a specific client from the client eviction mechanism. Added corrosponding 'e' flag in the client info string. * Added `multi-mem` field in the client info string to show how much memory is used up by buffered multi commands. * Client `tot-mem` now accounts for buffered multi-commands, pubsub patterns and channels (partially), tracking prefixes (partially). * CLIENT_CLOSE_ASAP flag is now handled in a new `beforeNextClient()` function so clients will be disconnected between processing different clients and not only before sleep. This new function can be used in the future for work we want to do outside the command processing loop but don't want to wait for all clients to be processed before we get to it. Specifically I wanted to handle output-buffer-limit related closing before we process client eviction in case the two race with each other. * Added a `DEBUG CLIENT-EVICTION` command to print out info about the client eviction buckets. * Each client now holds a pointer to the client eviction memory usage bucket it belongs to and listNode to itself in that bucket for quick removal. * Global `io_threads_op` variable now can contain a `IO_THREADS_OP_IDLE` value indicating no io-threading is currently being executed. * In order to track memory used by each clients in real-time we can't rely on updating these stats in `clientsCron()` alone anymore. So now I call `updateClientMemUsage()` (used to be `clientsCronTrackClientsMemUsage()`) after command processing, after writing data to pubsub clients, after writing the output buffer and after reading from the socket (and maybe other places too). The function is written to be fast. * Clients are evicted if needed (with appropriate log line) in `beforeSleep()` and before processing a command (before performing oom-checks and key-eviction). * All clients memory usage buckets are grouped as follows: * All clients using less than 64k. * 64K..128K * 128K..256K * ... * 2G..4G * All clients using 4g and up. * Added client-eviction.tcl with a bunch of tests for the new mechanism. * Extended maxmemory.tcl to test the interaction between maxmemory and maxmemory-clients settings. * Added an option to flag a numeric configuration variable as a "percent", this means that if we encounter a '%' after the number in the config file (or config set command) we consider it as valid. Such a number is store internally as a negative value. This way an integer value can be interpreted as either a percent (negative) or absolute value (positive). This is useful for example if some numeric configuration can optionally be set to a percentage of something else. Co-authored-by: Oran Agra <oran@redislabs.com>
-rw-r--r--redis.conf19
-rw-r--r--src/blocked.c12
-rw-r--r--src/config.c152
-rw-r--r--src/debug.c19
-rw-r--r--src/multi.c9
-rw-r--r--src/networking.c236
-rw-r--r--src/object.c2
-rw-r--r--src/pubsub.c2
-rw-r--r--src/redis-cli.c2
-rw-r--r--src/replication.c1
-rw-r--r--src/server.c130
-rw-r--r--src/server.h51
-rw-r--r--src/tracking.c1
-rw-r--r--src/util.c5
-rw-r--r--tests/support/util.tcl11
-rw-r--r--tests/test_helper.tcl1
-rw-r--r--tests/unit/client-eviction.tcl509
-rw-r--r--tests/unit/maxmemory.tcl137
18 files changed, 1168 insertions, 131 deletions
diff --git a/redis.conf b/redis.conf
index 3eb7374cc..bd7fa5271 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1841,6 +1841,25 @@ client-output-buffer-limit pubsub 32mb 8mb 60
#
# client-query-buffer-limit 1gb
+# In some scenarios client connections can hog up memory leading to OOM
+# errors or data eviction. To avoid this we can cap the accumulated memory
+# used by all client connections (all pubsub and normal clients). Once we
+# reach that limit connections will be dropped by the server freeing up
+# memory. The server will attempt to drop the connections using the most
+# memory first. We call this mechanism "client eviction".
+#
+# Client eviction is configured using the maxmemory-clients setting as follows:
+# 0 - client eviction is disabled (default)
+#
+# A memory value can be used for the client eviction threshold,
+# for example:
+# maxmemory-clients 1g
+#
+# A percentage value (between 1% and 100%) means the client eviction threshold
+# is based on a percentage of the maxmemory setting. For example to set client
+# eviction at 5% of maxmemory:
+# maxmemory-clients 5%
+
# In the Redis protocol, bulk requests, that are, elements representing single
# strings, are normally limited to 512 mb. However you can change this limit
# here, but must be 1mb or greater
diff --git a/src/blocked.c b/src/blocked.c
index 4898cdcbf..86aed2440 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -138,14 +138,14 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
/* If we have a queued command, execute it now. */
- if (processPendingCommandsAndResetClient(c) == C_ERR) {
- continue;
- }
- /* Then process client if it has more data in it's buffer. */
- if (c->querybuf && sdslen(c->querybuf) > 0) {
- processInputBuffer(c);
+ if (processPendingCommandsAndResetClient(c) == C_OK) {
+ /* Now process client if it has more data in it's buffer. */
+ if (c->querybuf && sdslen(c->querybuf) > 0) {
+ processInputBuffer(c);
+ }
}
}
+ beforeNextClient(c);
}
}
diff --git a/src/config.c b/src/config.c
index 349021f9e..2a8697e09 100644
--- a/src/config.c
+++ b/src/config.c
@@ -198,6 +198,10 @@ typedef enum numericType {
NUMERIC_TYPE_TIME_T,
} numericType;
+#define INTEGER_CONFIG 0 /* No flags means a simple integer configuration */
+#define MEMORY_CONFIG (1<<0) /* Indicates if this value can be loaded as a memory value */
+#define PERCENT_CONFIG (1<<1) /* Indicates if this value can be loaded as a percent (and stored as a negative int) */
+
typedef struct numericConfigData {
union {
int *i;
@@ -211,7 +215,7 @@ typedef struct numericConfigData {
off_t *ot;
time_t *tt;
} config; /* The pointer to the numeric config this value is stored in */
- int is_memory; /* Indicates if this value can be loaded as a memory value */
+ unsigned int flags;
numericType numeric_type; /* An enum indicating the type of this value */
long long lower_bound; /* The lower bound of this numeric value */
long long upper_bound; /* The upper bound of this numeric value */
@@ -1347,6 +1351,14 @@ void rewriteConfigBytesOption(struct rewriteConfigState *state, const char *opti
rewriteConfigRewriteLine(state,option,line,force);
}
+/* Rewrite a simple "option-name n%" configuration option. */
+void rewriteConfigPercentOption(struct rewriteConfigState *state, const char *option, long long value, long long defvalue) {
+ int force = value != defvalue;
+ sds line = sdscatprintf(sdsempty(),"%s %lld%%",option,value);
+
+ rewriteConfigRewriteLine(state,option,line,force);
+}
+
/* Rewrite a yes/no option. */
void rewriteConfigYesNoOption(struct rewriteConfigState *state, const char *option, int value, int defvalue) {
int force = value != defvalue;
@@ -2111,8 +2123,18 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) {
return 0;
}
} else {
+ /* Boundary check for percentages */
+ if (data.numeric.flags & PERCENT_CONFIG && ll < 0) {
+ if (ll < data.numeric.lower_bound) {
+ snprintf(loadbuf, LOADBUF_SIZE,
+ "percentage argument must be less or equal to %lld",
+ -data.numeric.lower_bound);
+ *err = loadbuf;
+ return 0;
+ }
+ }
/* Boundary check for signed types */
- if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) {
+ else if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) {
snprintf(loadbuf, LOADBUF_SIZE,
"argument must be between %lld and %lld inclusive",
data.numeric.lower_bound,
@@ -2124,22 +2146,46 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) {
return 1;
}
+static int numericParseString(typeData data, sds value, const char **err, long long *res) {
+ /* First try to parse as memory */
+ if (data.numeric.flags & MEMORY_CONFIG) {
+ int memerr;
+ *res = memtoull(value, &memerr);
+ if (!memerr)
+ return 1;
+ }
+
+ /* Attempt to parse as percent */
+ if (data.numeric.flags & PERCENT_CONFIG &&
+ sdslen(value) > 1 && value[sdslen(value)-1] == '%' &&
+ string2ll(value, sdslen(value)-1, res) &&
+ *res >= 0) {
+ /* We store percentage as negative value */
+ *res = -*res;
+ return 1;
+ }
+
+ /* Attempt a simple number (no special flags set) */
+ if (!data.numeric.flags && string2ll(value, sdslen(value), res))
+ return 1;
+
+ /* Select appropriate error string */
+ if (data.numeric.flags & MEMORY_CONFIG &&
+ data.numeric.flags & PERCENT_CONFIG)
+ *err = "argument must be a memory or percent value" ;
+ else if (data.numeric.flags & MEMORY_CONFIG)
+ *err = "argument must be a memory value";
+ else
+ *err = "argument couldn't be parsed into an integer";
+ return 0;
+}
static int numericConfigSet(typeData data, sds value, int update, const char **err) {
long long ll, prev = 0;
- if (data.numeric.is_memory) {
- int memerr;
- ll = memtoull(value, &memerr);
- if (memerr) {
- *err = "argument must be a memory value";
- return 0;
- }
- } else {
- if (!string2ll(value, sdslen(value), &ll)) {
- *err = "argument couldn't be parsed into an integer" ;
- return 0;
- }
- }
+
+ if (!numericParseString(data, value, err, &ll))
+ return 0;
+
if (!numericBoundaryCheck(data, ll, err))
return 0;
@@ -2158,21 +2204,21 @@ static int numericConfigSet(typeData data, sds value, int update, const char **e
static void numericConfigGet(client *c, typeData data) {
char buf[128];
- if (data.numeric.is_memory) {
- unsigned long long value = 0;
-
- GET_NUMERIC_TYPE(value)
- ull2string(buf, sizeof(buf), value);
- addReplyBulkCString(c, buf);
- } else{
- long long value = 0;
-
- GET_NUMERIC_TYPE(value)
+ long long value = 0;
+ GET_NUMERIC_TYPE(value)
+ if (data.numeric.flags & PERCENT_CONFIG && value < 0) {
+ int len = ll2string(buf, sizeof(buf), -value);
+ buf[len] = '%';
+ buf[len+1] = '\0';
+ }
+ else if (data.numeric.flags & MEMORY_CONFIG) {
+ ull2string(buf, sizeof(buf), value);
+ } else {
ll2string(buf, sizeof(buf), value);
- addReplyBulkCString(c, buf);
}
+ addReplyBulkCString(c, buf);
}
static void numericConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) {
@@ -2180,18 +2226,17 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite
GET_NUMERIC_TYPE(value)
- if (data.numeric.is_memory) {
+ if (data.numeric.flags & PERCENT_CONFIG && value < 0) {
+ rewriteConfigPercentOption(state, name, -value, data.numeric.default_value);
+ } else if (data.numeric.flags & MEMORY_CONFIG) {
rewriteConfigBytesOption(state, name, value, data.numeric.default_value);
} else {
rewriteConfigNumericalOption(state, name, value, data.numeric.default_value);
}
}
-#define INTEGER_CONFIG 0
-#define MEMORY_CONFIG 1
-
-#define embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) { \
- embedCommonConfig(name, alias, flags) \
+#define embedCommonNumericalConfig(name, alias, _flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) { \
+ embedCommonConfig(name, alias, _flags) \
embedConfigInterface(numericConfigInit, numericConfigSet, numericConfigGet, numericConfigRewrite) \
.data.numeric = { \
.lower_bound = (lower), \
@@ -2199,73 +2244,73 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite
.default_value = (default), \
.is_valid_fn = (is_valid), \
.update_fn = (update), \
- .is_memory = (memory),
+ .flags = (num_conf_flags),
-#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_INT, \
.config.i = &(config_addr) \
} \
}
-#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_UINT, \
.config.ui = &(config_addr) \
} \
}
-#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_LONG, \
.config.l = &(config_addr) \
} \
}
-#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_ULONG, \
.config.ul = &(config_addr) \
} \
}
-#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_LONG_LONG, \
.config.ll = &(config_addr) \
} \
}
-#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_ULONG_LONG, \
.config.ull = &(config_addr) \
} \
}
-#define createSizeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_SIZE_T, \
.config.st = &(config_addr) \
} \
}
-#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_SSIZE_T, \
.config.sst = &(config_addr) \
} \
}
-#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_TIME_T, \
.config.tt = &(config_addr) \
} \
}
-#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_OFF_T, \
.config.ot = &(config_addr) \
} \
@@ -2653,6 +2698,7 @@ standardConfig configs[] = {
createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL),
createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */
createSizeTConfig("client-query-buffer-limit", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.client_max_querybuf_len, 1024*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Default: 1GB max query buffer. */
+ createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, NULL),
/* Other configs */
createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */
diff --git a/src/debug.c b/src/debug.c
index d29d48673..4e95e2dfd 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -469,6 +469,8 @@ void debugCommand(client *c) {
" Return the size of different Redis core C structures.",
"ZIPLIST <key>",
" Show low level info about the ziplist encoding of <key>.",
+"CLIENT-EVICTION",
+" Show low level client eviction pools info (maxmemory-clients).",
NULL
};
addReplyHelp(c, help);
@@ -883,6 +885,23 @@ NULL
addReplyError(c, "CONFIG-REWRITE-FORCE-ALL failed");
else
addReply(c, shared.ok);
+ } else if(!strcasecmp(c->argv[1]->ptr,"client-eviction") && c->argc == 2) {
+ sds bucket_info = sdsempty();
+ for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
+ if (j == 0)
+ bucket_info = sdscatprintf(bucket_info, "bucket 0");
+ else
+ bucket_info = sdscatprintf(bucket_info, "bucket %10zu", (size_t)1<<(j-1+CLIENT_MEM_USAGE_BUCKET_MIN_LOG));
+ if (j == CLIENT_MEM_USAGE_BUCKETS-1)
+ bucket_info = sdscatprintf(bucket_info, "+ : ");
+ else
+ bucket_info = sdscatprintf(bucket_info, " - %10zu: ", ((size_t)1<<(j+CLIENT_MEM_USAGE_BUCKET_MIN_LOG))-1);
+ bucket_info = sdscatprintf(bucket_info, "tot-mem: %10zu, clients: %lu\n",
+ server.client_mem_usage_buckets[j].mem_usage_sum,
+ server.client_mem_usage_buckets[j].clients->len);
+ }
+ addReplyVerbatim(c,bucket_info,sdslen(bucket_info),"txt");
+ sdsfree(bucket_info);
#ifdef USE_JEMALLOC
} else if(!strcasecmp(c->argv[1]->ptr,"mallctl") && c->argc >= 3) {
mallctl_int(c, c->argv+2, c->argc-2);
diff --git a/src/multi.c b/src/multi.c
index e40d2a447..b02457bb9 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -37,6 +37,7 @@ void initClientMultiState(client *c) {
c->mstate.count = 0;
c->mstate.cmd_flags = 0;
c->mstate.cmd_inv_flags = 0;
+ c->mstate.argv_len_sums = 0;
}
/* Release all the resources associated with MULTI/EXEC state */
@@ -78,6 +79,7 @@ void queueMultiCommand(client *c) {
c->mstate.count++;
c->mstate.cmd_flags |= c->cmd->flags;
c->mstate.cmd_inv_flags |= ~c->cmd->flags;
+ c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;
}
void discardTransaction(client *c) {
@@ -435,3 +437,10 @@ void unwatchCommand(client *c) {
c->flags &= (~CLIENT_DIRTY_CAS);
addReply(c,shared.ok);
}
+
+size_t multiStateMemOverhead(client *c) {
+ size_t mem = c->mstate.argv_len_sums;
+ /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */
+ mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
+ return mem;
+}
diff --git a/src/networking.c b/src/networking.c
index 3b522fb35..21278d783 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -180,15 +180,18 @@ client *createClient(connection *conn) {
c->sockname = NULL;
c->client_list_node = NULL;
c->paused_list_node = NULL;
+ c->pending_read_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
- c->client_cron_last_memory_usage = 0;
- c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL;
+ c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0;
+ c->last_memory_type = CLIENT_TYPE_NORMAL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
+ c->mem_usage_bucket = NULL;
+ c->mem_usage_bucket_node = NULL;
if (conn) linkClient(c);
initClientMultiState(c);
return c;
@@ -267,7 +270,7 @@ int prepareClientToWrite(client *c) {
* not install a write handler. Instead, it will be done by
* handleClientsWithPendingReadsUsingThreads() upon return.
*/
- if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
+ if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
clientInstallWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
@@ -1288,13 +1291,13 @@ void unlinkClient(client *c) {
}
/* Remove from the list of pending reads if needed. */
- if (c->flags & CLIENT_PENDING_READ) {
- ln = listSearchKey(server.clients_pending_read,c);
- serverAssert(ln != NULL);
- listDelNode(server.clients_pending_read,ln);
- c->flags &= ~CLIENT_PENDING_READ;
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+ if (c->pending_read_list_node != NULL) {
+ listDelNode(server.clients_pending_read,c->pending_read_list_node);
+ c->pending_read_list_node = NULL;
}
+
/* When client was just unblocked because of a blocking operation,
* remove it from the list of unblocked clients. */
if (c->flags & CLIENT_UNBLOCKED) {
@@ -1430,10 +1433,15 @@ void freeClient(client *c) {
* we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
- /* Remove the contribution that this client gave to our
+ /* Remove the contribution that this client gave to our
* incrementally computed memory usage. */
- server.stat_clients_type_memory[c->client_cron_last_memory_type] -=
- c->client_cron_last_memory_usage;
+ server.stat_clients_type_memory[c->last_memory_type] -=
+ c->last_memory_usage;
+ /* Remove client from memory usage buckets */
+ if (c->mem_usage_bucket) {
+ c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
+ listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node);
+ }
/* Release other dynamically allocated client structure fields,
* and finally release the client structure itself. */
@@ -1470,6 +1478,27 @@ void freeClientAsync(client *c) {
pthread_mutex_unlock(&async_free_queue_mutex);
}
+/* Perform processing of the client before moving on to processing the next client
+ * this is useful for performing operations that affect the global state but can't
+ * wait until we're done with all clients. In other words can't wait until beforeSleep()
+ * return C_ERR in case client is no longer valid after call. */
+int beforeNextClient(client *c) {
+ /* Skip the client processing if we're in an IO thread, in that case we'll perform
+ this operation later (this function is called again) in the fan-in stage of the threading mechanism */
+ if (io_threads_op != IO_THREADS_OP_IDLE)
+ return C_OK;
+ /* Handle async frees */
+ /* Note: this doesn't make the server.clients_to_close list redundant because of
+ * cases where we want an async free of a client other than myself. For example
+ * in ACL modifications we disconnect clients authenticated to non-existent
+ * users (see ACL LOAD). */
+ if (c->flags & CLIENT_CLOSE_ASAP) {
+ freeClient(c);
+ return C_ERR;
+ }
+ return C_OK;
+}
+
/* Free the clients marked as CLOSE_ASAP, return the number of clients
* freed. */
int freeClientsInAsyncFreeQueue(void) {
@@ -1594,7 +1623,10 @@ int writeToClient(client *c, int handler_installed) {
* adDeleteFileEvent() is not thread safe: however writeToClient()
* is always called with handler_installed set to 0 from threads
* so we are fine. */
- if (handler_installed) connSetWriteHandler(c->conn, NULL);
+ if (handler_installed) {
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+ connSetWriteHandler(c->conn, NULL);
+ }
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
@@ -1602,6 +1634,7 @@ int writeToClient(client *c, int handler_installed) {
return C_ERR;
}
}
+ updateClientMemUsage(c);
return C_OK;
}
@@ -2036,7 +2069,11 @@ int processCommandAndResetClient(client *c) {
server.current_client = c;
if (processCommand(c) == C_OK) {
commandProcessed(c);
+ /* Update the client's memory to include output buffer growth following the
+ * processed command. */
+ updateClientMemUsage(c);
}
+
if (server.current_client == NULL) deadclient = 1;
/*
* Restore the old client, this is needed because when a script
@@ -2117,7 +2154,8 @@ void processInputBuffer(client *c) {
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
- if (c->flags & CLIENT_PENDING_READ) {
+ if (io_threads_op != IO_THREADS_OP_IDLE) {
+ serverAssert(io_threads_op == IO_THREADS_OP_READ);
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
@@ -2137,6 +2175,11 @@ void processInputBuffer(client *c) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
+
+ /* Update client memory usage after processing the query buffer, this is
+ * important in case the query buffer is big and wasn't drained during
+ * the above loop (because of partially sent big commands). */
+ updateClientMemUsage(c);
}
void readQueryFromClient(connection *conn) {
@@ -2190,7 +2233,7 @@ void readQueryFromClient(connection *conn) {
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
- return;
+ goto done;
}
} else if (nread == 0) {
if (server.verbosity <= LL_VERBOSE) {
@@ -2199,7 +2242,7 @@ void readQueryFromClient(connection *conn) {
sdsfree(info);
}
freeClientAsync(c);
- return;
+ goto done;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
@@ -2223,12 +2266,15 @@ void readQueryFromClient(connection *conn) {
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
- return;
+ goto done;
}
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
processInputBuffer(c);
+
+done:
+ beforeNextClient(c);
}
/* A Redis "Address String" is a colon separated ip:port pair.
@@ -2306,6 +2352,7 @@ sds catClientInfoString(sds s, client *client) {
if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
if (client->flags & CLIENT_READONLY) *p++ = 'r';
+ if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
if (p == flags) *p++ = 'N';
*p++ = '\0';
@@ -2317,19 +2364,10 @@ sds catClientInfoString(sds s, client *client) {
*p = '\0';
/* Compute the total memory consumed by this client. */
- size_t obufmem = getClientOutputBufferMemoryUsage(client);
- size_t total_mem = obufmem;
- total_mem += zmalloc_size(client); /* includes client->buf */
- total_mem += sdsZmallocSize(client->querybuf);
- /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
- * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
- * spot problematic clients. */
- total_mem += client->argv_len_sum;
- if (client->argv)
- total_mem += zmalloc_size(client->argv);
+ size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
return sdscatfmt(s,
- "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
+ "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
(unsigned long long) client->id,
getClientPeerId(client),
getClientSockname(client),
@@ -2345,6 +2383,7 @@ sds catClientInfoString(sds s, client *client) {
(unsigned long long) sdslen(client->querybuf),
(unsigned long long) sdsavail(client->querybuf),
(unsigned long long) client->argv_len_sum,
+ (unsigned long long) client->mstate.argv_len_sums,
(unsigned long long) client->bufpos,
(unsigned long long) listLength(client->reply),
(unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
@@ -2565,6 +2604,18 @@ NULL
addReplyErrorObject(c,shared.syntaxerr);
return;
}
+ } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) {
+ /* CLIENT PROTECT ON|OFF */
+ if (!strcasecmp(c->argv[2]->ptr,"on")) {
+ c->flags |= CLIENT_NO_EVICT;
+ addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
+ c->flags &= ~CLIENT_NO_EVICT;
+ addReply(c,shared.ok);
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
} else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
/* CLIENT KILL <ip:port>
* CLIENT KILL <option> [value] ... <option> [value] */
@@ -3154,11 +3205,39 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* Note: this function is very fast so can be called as many time as
* the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */
-unsigned long getClientOutputBufferMemoryUsage(client *c) {
- unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
+size_t getClientOutputBufferMemoryUsage(client *c) {
+ size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
return c->reply_bytes + (list_item_size*listLength(c->reply));
}
+/* Returns the total client's memory usage.
+ * Optionally, if output_buffer_mem_usage is not NULL, it fills it with
+ * the client output buffer memory usage portion of the total. */
+size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
+ size_t mem = getClientOutputBufferMemoryUsage(c);
+ if (output_buffer_mem_usage != NULL)
+ *output_buffer_mem_usage = mem;
+ mem += sdsZmallocSize(c->querybuf);
+ mem += zmalloc_size(c);
+ /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
+ * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
+ * spot problematic clients. */
+ mem += c->argv_len_sum + sizeof(robj*)*c->argc;
+ mem += multiStateMemOverhead(c);
+
+ /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers
+ * to the strings themselves because they aren't stored per client. */
+ mem += listLength(c->pubsub_patterns) * sizeof(listNode);
+ mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) +
+ dictSlots(c->pubsub_channels) * sizeof(dictEntry*);
+
+ /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */
+ if (c->client_tracking_prefixes)
+ mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*));
+
+ return mem;
+}
+
/* Get the class of a client, used in order to enforce limits to different
* classes of clients.
*
@@ -3425,13 +3504,11 @@ void processEventsWhileBlocked(void) {
* ========================================================================== */
#define IO_THREADS_MAX_NUM 128
-#define IO_THREADS_OP_READ 0
-#define IO_THREADS_OP_WRITE 1
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
-int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
+int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE. */ // TODO: should access to this be atomic??!
/* This is the list of clients each thread will serve when threaded I/O is
* used. We spawn io_threads_num-1 threads, since one is the main thread
@@ -3498,6 +3575,9 @@ void *IOThreadMain(void *myid) {
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
+ /* Indicate that io-threads are currently idle */
+ io_threads_op = IO_THREADS_OP_IDLE;
+
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
@@ -3584,6 +3664,12 @@ int stopThreadedIOIfNeeded(void) {
}
}
+/* This function achieves thread safety using a fan-out -> fan-in paradigm:
+ * Fan out: The main thread fans out work to the io-threads which block until
+ * setIOPendingCount() is called with a value larger than 0 by the main thread.
+ * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
+ * it can safely perform post-processing and return to normal synchronous
+ * work. */
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
@@ -3642,12 +3728,17 @@ int handleClientsWithPendingWritesUsingThreads(void) {
if (pending == 0) break;
}
+ io_threads_op = IO_THREADS_OP_IDLE;
+
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
+ /* Update the client in the mem usage buckets after we're done processing it in the io-threads */
+ updateClientMemUsageBucket(c);
+
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
@@ -3672,10 +3763,11 @@ int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
- !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
+ !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
+ io_threads_op == IO_THREADS_OP_IDLE)
{
- c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
+ c->pending_read_list_node = listFirst(server.clients_pending_read);
return 1;
} else {
return 0;
@@ -3687,7 +3779,13 @@ int postponeClientRead(client *c) {
* process (instead of serving them synchronously). This function runs
* the queue using the I/O threads, and process them in order to accumulate
* the reads in the buffers, and also parse the first command available
- * rendering it in the client structures. */
+ * rendering it in the client structures.
+ * This function achieves thread safety using a fan-out -> fan-in paradigm:
+ * Fan out: The main thread fans out work to the io-threads which block until
+ * setIOPendingCount() is called with a value larger than 0 by the main thread.
+ * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
+ * it can safely perform post-processing and return to normal synchronous
+ * work. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
@@ -3729,14 +3827,27 @@ int handleClientsWithPendingReadsUsingThreads(void) {
if (pending == 0) break;
}
+ io_threads_op = IO_THREADS_OP_IDLE;
+
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
- c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
+ c->pending_read_list_node = NULL;
serverAssert(!(c->flags & CLIENT_BLOCKED));
+
+ if (beforeNextClient(c) == C_ERR) {
+ /* If the client is no longer valid, we avoid
+ * processing the client later. So we just go
+ * to the next. */
+ continue;
+ }
+
+ /* Once io-threads are idle we can update the client in the mem usage buckets */
+ updateClientMemUsageBucket(c);
+
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
@@ -3758,3 +3869,56 @@ int handleClientsWithPendingReadsUsingThreads(void) {
return processed;
}
+
+/* Returns the actual client eviction limit based on current configuration or
+ * 0 if no limit. */
+size_t getClientEvictionLimit(void) {
+ size_t maxmemory_clients_actual = SIZE_MAX;
+
+ /* Handle percentage of maxmemory*/
+ if (server.maxmemory_clients < 0 && server.maxmemory > 0) {
+ unsigned long long maxmemory_clients_bytes = (unsigned long long)((double)server.maxmemory * -(double) server.maxmemory_clients / 100);
+ if (maxmemory_clients_bytes <= SIZE_MAX)
+ maxmemory_clients_actual = maxmemory_clients_bytes;
+ }
+ else if (server.maxmemory_clients > 0)
+ maxmemory_clients_actual = server.maxmemory_clients;
+ else
+ return 0;
+
+ /* Don't allow a too small maxmemory-clients to avoid cases where we can't communicate
+ * at all with the server because of bad configuration */
+ if (maxmemory_clients_actual < 1024*128)
+ maxmemory_clients_actual = 1024*128;
+
+ return maxmemory_clients_actual;
+}
+
+void evictClients(void) {
+ /* Start eviction from topmost bucket (largest clients) */
+ int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1;
+ listIter bucket_iter;
+ listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
+ size_t client_eviction_limit = getClientEvictionLimit();
+ if (client_eviction_limit == 0)
+ return;
+ while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
+ server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >= client_eviction_limit) {
+ listNode *ln = listNext(&bucket_iter);
+ if (ln) {
+ client *c = ln->value;
+ sds ci = catClientInfoString(sdsempty(),c);
+ serverLog(LL_NOTICE, "Evicting client: %s", ci);
+ freeClient(c);
+ sdsfree(ci);
+ server.stat_evictedclients++;
+ } else {
+ curr_bucket--;
+ if (curr_bucket < 0) {
+ serverLog(LL_WARNING, "Over client maxmemory after evicting all evictable clients");
+ break;
+ }
+ listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
+ }
+ }
+}
diff --git a/src/object.c b/src/object.c
index 0f869ea7e..edbd56acb 100644
--- a/src/object.c
+++ b/src/object.c
@@ -1180,7 +1180,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
/* Computing the memory used by the clients would be O(N) if done
* here online. We use our values computed incrementally by
- * clientsCronTrackClientsMemUsage(). */
+ * updateClientMemUsage(). */
mh->clients_slaves = server.stat_clients_type_memory[CLIENT_TYPE_SLAVE];
mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+
diff --git a/src/pubsub.c b/src/pubsub.c
index 0169b3604..e0bbc6d94 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -304,6 +304,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message);
+ updateClientMemUsage(c);
receivers++;
}
}
@@ -323,6 +324,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
addReplyPubsubPatMessage(c,pattern,channel,message);
+ updateClientMemUsage(c);
receivers++;
}
}
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 7144d2bc2..f3e6f6082 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -1286,6 +1286,8 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
!strcasecmp(argv[1],"htstats")) ||
(argc >= 2 && !strcasecmp(command,"debug") &&
!strcasecmp(argv[1],"htstats-key")) ||
+ (argc >= 2 && !strcasecmp(command,"debug") &&
+ !strcasecmp(argv[1],"client-eviction")) ||
(argc >= 2 && !strcasecmp(command,"memory") &&
(!strcasecmp(argv[1],"malloc-stats") ||
!strcasecmp(argv[1],"doctor"))) ||
diff --git a/src/replication.c b/src/replication.c
index e1d39582c..8c692df8d 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -414,6 +414,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
while((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor,cmdobj);
+ updateClientMemUsage(c);
}
decrRefCount(cmdobj);
}
diff --git a/src/server.c b/src/server.c
index d8555619c..9b8fd48ef 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2169,30 +2169,94 @@ int clientsCronTrackExpansiveClients(client *c, int time_idx) {
return 0; /* This function never terminates the client. */
}
-/* Iterating all the clients in getMemoryOverheadData() is too slow and
- * in turn would make the INFO command too slow. So we perform this
- * computation incrementally and track the (not instantaneous but updated
- * to the second) total memory used by clients using clientsCron() in
- * a more incremental way (depending on server.hz). */
-int clientsCronTrackClientsMemUsage(client *c) {
- size_t mem = 0;
+/* All normal clients are placed in one of the "mem usage buckets" according
+ * to how much memory they currently use. We use this function to find the
+ * appropriate bucket based on a given memory usage value. The algorithm simply
+ * does a log2(mem) to ge the bucket. This means, for examples, that if a
+ * client's memory usage doubles it's moved up to the next bucket, if it's
+ * halved we move it down a bucket.
+ * For more details see CLIENT_MEM_USAGE_BUCKETS documentation in server.h. */
+clientMemUsageBucket *getMemUsageBucket(size_t mem) {
+ int size_in_bits = 8*(int)sizeof(mem);
+ int clz = mem > 0 ? __builtin_clzl(mem) : size_in_bits;
+ int bucket_idx = size_in_bits - clz;
+ if (bucket_idx > CLIENT_MEM_USAGE_BUCKET_MAX_LOG)
+ bucket_idx = CLIENT_MEM_USAGE_BUCKET_MAX_LOG;
+ else if (bucket_idx < CLIENT_MEM_USAGE_BUCKET_MIN_LOG)
+ bucket_idx = CLIENT_MEM_USAGE_BUCKET_MIN_LOG;
+ bucket_idx -= CLIENT_MEM_USAGE_BUCKET_MIN_LOG;
+ return &server.client_mem_usage_buckets[bucket_idx];
+}
+
+/* This is called both on explicit clients when something changed their buffers,
+ * so we can track clients' memory and enforce clients' maxmemory in real time,
+ * and also from the clientsCron. We call it from the cron so we have updated
+ * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration
+ * change requires us to evict a non-active client.
+ */
+int updateClientMemUsage(client *c) {
+ size_t mem = getClientMemoryUsage(c, NULL);
int type = getClientType(c);
- mem += getClientOutputBufferMemoryUsage(c);
- mem += sdsZmallocSize(c->querybuf);
- mem += zmalloc_size(c);
- mem += c->argv_len_sum;
- if (c->argv) mem += zmalloc_size(c->argv);
- /* Now that we have the memory used by the client, remove the old
- * value from the old category, and add it back. */
- server.stat_clients_type_memory[c->client_cron_last_memory_type] -=
- c->client_cron_last_memory_usage;
- server.stat_clients_type_memory[type] += mem;
+
+ /* Remove the old value of the memory used by the client from the old
+ * category, and add it back. */
+ atomicDecr(server.stat_clients_type_memory[c->last_memory_type], c->last_memory_usage);
+ atomicIncr(server.stat_clients_type_memory[type], mem);
+
/* Remember what we added and where, to remove it next time. */
- c->client_cron_last_memory_usage = mem;
- c->client_cron_last_memory_type = type;
+ c->last_memory_usage = mem;
+ c->last_memory_type = type;
+
+ /* Update client mem usage bucket only when we're not in the context of an
+ * IO thread. See updateClientMemUsageBucket() for details. */
+ if (io_threads_op == IO_THREADS_OP_IDLE)
+ updateClientMemUsageBucket(c);
+
return 0;
}
+/* Adds the client to the correct memory usage bucket. Each bucket contains
+ * all clients with roughly the same amount of memory. This way we group
+ * together clients consuming about the same amount of memory and can quickly
+ * free them in case we reach maxmemory-clients (client eviction).
+ * Note that in case of io-threads enabled we have to call this function only
+ * after the fan-in phase (when no io-threads are working) because the bucket
+ * lists are global. The io-threads themselves track per-client memory usage in
+ * updateClientMemUsage(). Here we update the clients to each bucket when all
+ * io-threads are done (both for read and write io-threading). */
+void updateClientMemUsageBucket(client *c) {
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+ int allow_eviction =
+ (c->last_memory_type == CLIENT_TYPE_NORMAL || c->last_memory_type == CLIENT_TYPE_PUBSUB) &&
+ !(c->flags & CLIENT_NO_EVICT);
+
+ /* Update the client in the mem usage buckets */
+ if (c->mem_usage_bucket) {
+ c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage_on_bucket_update;
+ /* If this client can't be evicted then remove it from the mem usage
+ * buckets */
+ if (!allow_eviction) {
+ listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node);
+ c->mem_usage_bucket = NULL;
+ c->mem_usage_bucket_node = NULL;
+ }
+ }
+ if (allow_eviction) {
+ clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage);
+ bucket->mem_usage_sum += c->last_memory_usage;
+ if (bucket != c->mem_usage_bucket) {
+ if (c->mem_usage_bucket)
+ listDelNode(c->mem_usage_bucket->clients,
+ c->mem_usage_bucket_node);
+ c->mem_usage_bucket = bucket;
+ listAddNodeTail(bucket->clients, c);
+ c->mem_usage_bucket_node = listLast(bucket->clients);
+ }
+ }
+
+ c->last_memory_usage_on_bucket_update = c->last_memory_usage;
+}
+
/* Return the max samples in the memory usage of clients tracked by
* the function clientsCronTrackExpansiveClients(). */
void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
@@ -2271,7 +2335,13 @@ void clientsCron(void) {
if (clientsCronHandleTimeout(c,now)) continue;
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
- if (clientsCronTrackClientsMemUsage(c)) continue;
+
+ /* Iterating all the clients in getMemoryOverheadData() is too slow and
+ * in turn would make the INFO command too slow. So we perform this
+ * computation incrementally and track the (not instantaneous but updated
+ * to the second) total memory used by clients using clientsCron() in
+ * a more incremental way (depending on server.hz). */
+ if (updateClientMemUsage(c)) continue;
if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
}
}
@@ -2879,6 +2949,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* visit processCommand() at all). */
handleClientsBlockedOnKeys();
+ /* Disconnect some clients if they are consuming too much memory. */
+ evictClients();
+
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
* time. */
@@ -3509,6 +3582,7 @@ void resetServerStats(void) {
server.stat_expired_time_cap_reached_count = 0;
server.stat_expire_cycle_time_used = 0;
server.stat_evictedkeys = 0;
+ server.stat_evictedclients = 0;
server.stat_total_eviction_exceeded_time = 0;
server.stat_last_eviction_exceeded_time = 0;
server.stat_keyspace_misses = 0;
@@ -3606,6 +3680,11 @@ void initServer(void) {
exit(1);
}
+ for (j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
+ server.client_mem_usage_buckets[j].mem_usage_sum = 0;
+ server.client_mem_usage_buckets[j].clients = listCreate();
+ }
+
createSharedObjects();
adjustOpenFilesLimit();
const char *clk_msg = monotonicInit();
@@ -4610,6 +4689,15 @@ int processCommand(client *c) {
}
}
+ /* Disconnect some clients if total clients memory is too high. We do this
+ * before key eviction, after the last command was executed and consumed
+ * some client output buffer memory. */
+ evictClients();
+ if (server.current_client == NULL) {
+ /* If we evicted ourself then abort processing the command */
+ return C_ERR;
+ }
+
/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
@@ -5673,6 +5761,7 @@ sds genRedisInfoString(const char *section) {
"expired_time_cap_reached_count:%lld\r\n"
"expire_cycle_cpu_milliseconds:%lld\r\n"
"evicted_keys:%lld\r\n"
+ "evicted_clients:%lld\r\n"
"total_eviction_exceeded_time:%lld\r\n"
"current_eviction_exceeded_time:%lld\r\n"
"keyspace_hits:%lld\r\n"
@@ -5715,6 +5804,7 @@ sds genRedisInfoString(const char *section) {
server.stat_expired_time_cap_reached_count,
server.stat_expire_cycle_time_used/1000,
server.stat_evictedkeys,
+ server.stat_evictedclients,
(server.stat_total_eviction_exceeded_time + current_eviction_exceeded_time) / 1000,
current_eviction_exceeded_time / 1000,
server.stat_keyspace_hits,
diff --git a/src/server.h b/src/server.h
index 39184591d..3a94effe9 100644
--- a/src/server.h
+++ b/src/server.h
@@ -125,6 +125,12 @@ typedef long long ustime_t; /* microsecond time type. */
#define CONFIG_MIN_RESERVED_FDS 32
#define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}"
+/* Bucket sizes for client eviction pools. Each bucket stores clients with
+ * memory usage of up to twice the size of the bucket below it. */
+#define CLIENT_MEM_USAGE_BUCKET_MIN_LOG 15 /* Bucket sizes start at up to 32KB (2^15) */
+#define CLIENT_MEM_USAGE_BUCKET_MAX_LOG 33 /* Bucket for largest clients: sizes above 4GB (2^32) */
+#define CLIENT_MEM_USAGE_BUCKETS (1+CLIENT_MEM_USAGE_BUCKET_MAX_LOG-CLIENT_MEM_USAGE_BUCKET_MIN_LOG)
+
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
#define ACTIVE_EXPIRE_CYCLE_FAST 1
@@ -275,9 +281,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
-#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
- in the list of clients we can read
- from. */
+/* #define CLIENT_... (1<<29) currently unused, feel free to use in the future */
#define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully
* parsed command ready for execution. */
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
@@ -299,6 +303,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
and AOF client */
#define CLIENT_REPL_RDBONLY (1ULL<<42) /* This client is a replica that only wants
RDB without replication buffer. */
+#define CLIENT_NO_EVICT (1ULL<<43) /* This client is protected against client
+ memory eviction. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@@ -797,6 +803,7 @@ typedef struct multiState {
int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it
is possible to know if all the commands have a
certain flag. */
+ size_t argv_len_sums; /* mem used by all commands arguments */
} multiState;
/* This structure holds the blocking operation state for a client.
@@ -912,6 +919,11 @@ typedef struct {
need more reserved IDs use UINT64_MAX-1,
-2, ... and so forth. */
+typedef struct {
+ list *clients;
+ size_t mem_usage_sum;
+} clientMemUsageBucket;
+
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn;
@@ -976,6 +988,7 @@ typedef struct client {
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
listNode *paused_list_node; /* list node within the pause list */
+ listNode *pending_read_list_node; /* list node in clients pending read list */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
@@ -993,13 +1006,18 @@ typedef struct client {
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
- /* In clientsCronTrackClientsMemUsage() we track the memory usage of
+ /* In updateClientMemUsage() we track the memory usage of
* each client and add it to the sum of all the clients of a given type,
* however we need to remember what was the old contribution of each
* client, and in which category the client was, in order to remove it
* before adding it the new value. */
- uint64_t client_cron_last_memory_usage;
- int client_cron_last_memory_type;
+ size_t last_memory_usage;
+ int last_memory_type;
+
+ size_t last_memory_usage_on_bucket_update;
+ listNode *mem_usage_bucket_node;
+ clientMemUsageBucket *mem_usage_bucket;
+
/* Response buffer */
int bufpos;
size_t buf_usable_size; /* Usable size of buffer. */
@@ -1288,6 +1306,10 @@ struct redisServer {
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* Current client executing the command. */
+
+ /* Stuff for client mem eviction */
+ clientMemUsageBucket client_mem_usage_buckets[CLIENT_MEM_USAGE_BUCKETS];
+
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
rax *clients_index; /* Active clients dictionary by client ID. */
@@ -1319,6 +1341,7 @@ struct redisServer {
long long stat_expired_time_cap_reached_count; /* Early expire cycle stops.*/
long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */
long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */
+ long long stat_evictedclients; /* Number of evicted clients */
long long stat_total_eviction_exceeded_time; /* Total time over the memory limit, unit us */
monotime stat_last_eviction_exceeded_time; /* Timestamp of current eviction start, unit us */
long long stat_keyspace_hits; /* Number of successful lookups of keys */
@@ -1354,7 +1377,7 @@ struct redisServer {
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
double stat_module_progress; /* Module save progress. */
- uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
+ redisAtomic size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
@@ -1553,6 +1576,7 @@ struct redisServer {
/* Limits */
unsigned int maxclients; /* Max number of simultaneous clients */
unsigned long long maxmemory; /* Max number of memory bytes to use */
+ ssize_t maxmemory_clients; /* Memory limit for total client buffers */
int maxmemory_policy; /* Policy for key eviction */
int maxmemory_samples; /* Precision of random sampling */
int maxmemory_eviction_tenacity;/* Aggressiveness of eviction processing */
@@ -1882,6 +1906,11 @@ typedef struct {
#define OBJ_HASH_KEY 1
#define OBJ_HASH_VALUE 2
+#define IO_THREADS_OP_IDLE 0
+#define IO_THREADS_OP_READ 1
+#define IO_THREADS_OP_WRITE 2
+extern int io_threads_op;
+
/*-----------------------------------------------------------------------------
* Extern declarations
*----------------------------------------------------------------------------*/
@@ -1966,6 +1995,7 @@ void redisSetCpuAffinity(const char *cpulist);
client *createClient(connection *conn);
void freeClient(client *c);
void freeClientAsync(client *c);
+int beforeNextClient(client *c);
void resetClient(client *c);
void freeClientOriginalArgv(client *c);
void sendReplyToClient(connection *conn);
@@ -2026,7 +2056,8 @@ void rewriteClientCommandVector(client *c, int argc, ...);
void rewriteClientCommandArgument(client *c, int i, robj *newval);
void replaceClientCommandVector(client *c, int argc, robj **argv);
void redactClientCommandArgument(client *c, int argc);
-unsigned long getClientOutputBufferMemoryUsage(client *c);
+size_t getClientOutputBufferMemoryUsage(client *c);
+size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage);
int freeClientsInAsyncFreeQueue(void);
int closeClientOnOutputBufferLimitReached(client *c, int async);
int getClientType(client *c);
@@ -2034,6 +2065,7 @@ int getClientTypeByName(char *name);
char *getClientTypeName(int class);
void flushSlavesOutputBuffers(void);
void disconnectSlaves(void);
+void evictClients(void);
int listenToPort(int port, socketFds *fds);
void pauseClients(mstime_t duration, pause_type type);
void unpauseClients(void);
@@ -2048,6 +2080,8 @@ int handleClientsWithPendingWritesUsingThreads(void);
int handleClientsWithPendingReadsUsingThreads(void);
int stopThreadedIOIfNeeded(void);
int clientHasPendingReplies(client *c);
+int updateClientMemUsage(client *c);
+void updateClientMemUsageBucket(client *c);
void unlinkClient(client *c);
int writeToClient(client *c, int handler_installed);
void linkClient(client *c);
@@ -2106,6 +2140,7 @@ void unwatchAllKeys(client *c);
void initClientMultiState(client *c);
void freeClientMultiState(client *c);
void queueMultiCommand(client *c);
+size_t multiStateMemOverhead(client *c);
void touchWatchedKey(redisDb *db, robj *key);
int isWatchedKeyExpired(client *c);
void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with);
diff --git a/src/tracking.c b/src/tracking.c
index d6f2bf149..1e84cc3c1 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -305,6 +305,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyArrayLen(c,1);
addReplyBulkCBuffer(c,keyname,keylen);
}
+ updateClientMemUsage(c);
}
/* This function is called when a key is modified in Redis and in the case
diff --git a/src/util.c b/src/util.c
index d57420007..61b0ed4f0 100644
--- a/src/util.c
+++ b/src/util.c
@@ -204,7 +204,10 @@ unsigned long long memtoull(const char *p, int *err) {
/* Search the first non digit character. */
u = p;
- if (*u == '-') u++;
+ if (*u == '-') {
+ if (err) *err = 1;
+ return 0;
+ }
while(*u && isdigit(*u)) u++;
if (*u == '\0' || !strcasecmp(u,"b")) {
mul = 1;
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index a834d4abd..67551b041 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -4,7 +4,7 @@ proc randstring {min max {type binary}} {
if {$type eq {binary}} {
set minval 0
set maxval 255
- } elseif {$type eq {alpha}} {
+ } elseif {$type eq {alpha} || $type eq {simplealpha}} {
set minval 48
set maxval 122
} elseif {$type eq {compr}} {
@@ -12,11 +12,10 @@ proc randstring {min max {type binary}} {
set maxval 52
}
while {$len} {
- set rr [expr {$minval+int(rand()*($maxval-$minval+1))}]
- if {$type eq {alpha} && $rr eq 92} {
- set rr 90; # avoid putting '\' char in the string, it can mess up TCL processing
- }
- append output [format "%c" $rr]
+ set rr [format "%c" [expr {$minval+int(rand()*($maxval-$minval+1))}]]
+ if {$type eq {simplealpha} && ![string is alnum $rr]} {continue}
+ if {$type eq {alpha} && $rr eq 92} {continue} ;# avoid putting '\' char in the string, it can mess up TCL processing
+ append output $rr
incr len -1
}
return $output
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 5150790db..a5cc2d4ac 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -82,6 +82,7 @@ set ::all_tests {
unit/oom-score-adj
unit/shutdown
unit/networking
+ unit/client-eviction
}
# Index to the next test to run in the ::all_tests list.
set ::next_test 0
diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl
new file mode 100644
index 000000000..9421d8b61
--- /dev/null
+++ b/tests/unit/client-eviction.tcl
@@ -0,0 +1,509 @@
+tags {"external:skip"} {
+
+# Get info about a redis client connection:
+# name - name of client we want to query
+# f - field name from "CLIENT LIST" we want to get
+proc client_field {name f} {
+ set clients [split [string trim [r client list]] "\r\n"]
+ set c [lsearch -inline $clients *name=$name*]
+ if {![regexp $f=(\[a-zA-Z0-9-\]+) $c - res]} {
+ error "no client named $name found with field $f"
+ }
+ return $res
+}
+
+proc client_exists {name} {
+ if {[catch { client_field $name tot-mem } e]} {
+ return false
+ }
+ return true
+}
+
+proc gen_client {} {
+ set rr [redis_client]
+ set name "tst_[randstring 4 4 simplealpha]"
+ $rr client setname $name
+ assert {[client_exists $name]}
+ return [list $rr $name]
+}
+
+# Sum a value across all redis client connections:
+# f - the field name from "CLIENT LIST" we want to sum
+proc clients_sum {f} {
+ set sum 0
+ set clients [split [string trim [r client list]] "\r\n"]
+ foreach c $clients {
+ if {![regexp $f=(\[a-zA-Z0-9-\]+) $c - res]} {
+ error "field $f not found in $c"
+ }
+ incr sum $res
+ }
+ return $sum
+}
+
+proc mb {v} {
+ return [expr $v * 1024 * 1024]
+}
+
+start_server {} {
+ set maxmemory_clients 3000000
+ r config set maxmemory-clients $maxmemory_clients
+
+ test "client evicted due to large argv" {
+ r flushdb
+ lassign [gen_client] rr cname
+ # Attempt a large multi-bulk command under eviction limit
+ $rr mset k v k2 [string repeat v 1000000]
+ assert_equal [$rr get k] v
+ # Attempt another command, now causing client eviction
+ catch { $rr mset k v k2 [string repeat v $maxmemory_clients] } e
+ assert {![client_exists $cname]}
+ $rr close
+ }
+
+ test "client evicted due to large query buf" {
+ r flushdb
+ lassign [gen_client] rr cname
+ # Attempt to fill the query buff without completing the argument above the limit, causing client eviction
+ catch {
+ $rr write [join [list "*1\r\n\$$maxmemory_clients\r\n" [string repeat v $maxmemory_clients]] ""]
+ $rr flush
+ $rr read
+ } e
+ assert {![client_exists $cname]}
+ $rr close
+ }
+
+ test "client evicted due to percentage of maxmemory" {
+ set maxmemory [mb 6]
+ r config set maxmemory $maxmemory
+ # Set client eviction threshold to 7% of maxmemory
+ set maxmemory_clients_p 7
+ r config set maxmemory-clients $maxmemory_clients_p%
+ r flushdb
+
+ set maxmemory_clients_actual [expr $maxmemory * $maxmemory_clients_p / 100]
+
+ lassign [gen_client] rr cname
+ # Attempt to fill the query buff with only half the percentage threshold verify we're not disconnected
+ set n [expr $maxmemory_clients_actual / 2]
+ $rr write [join [list "*1\r\n\$$n\r\n" [string repeat v $n]] ""]
+ $rr flush
+ set tot_mem [client_field $cname tot-mem]
+ assert {$tot_mem >= $n && $tot_mem < $maxmemory_clients_actual}
+
+ # Attempt to fill the query buff with the percentage threshold of maxmemory and verify we're evicted
+ $rr close
+ lassign [gen_client] rr cname
+ catch {
+ $rr write [join [list "*1\r\n\$$maxmemory_clients_actual\r\n" [string repeat v $maxmemory_clients_actual]] ""]
+ $rr flush
+ } e
+ assert {![client_exists $cname]}
+ $rr close
+
+ # Restore settings
+ r config set maxmemory 0
+ r config set maxmemory-clients $maxmemory_clients
+ }
+
+ test "client evicted due to large multi buf" {
+ r flushdb
+ lassign [gen_client] rr cname
+
+ # Attempt a multi-exec where sum of commands is less than maxmemory_clients
+ $rr multi
+ $rr set k [string repeat v [expr $maxmemory_clients / 4]]
+ $rr set k [string repeat v [expr $maxmemory_clients / 4]]
+ assert_equal [$rr exec] {OK OK}
+
+ # Attempt a multi-exec where sum of commands is more than maxmemory_clients, causing client eviction
+ $rr multi
+ catch {
+ for {set j 0} {$j < 5} {incr j} {
+ $rr set k [string repeat v [expr $maxmemory_clients / 4]]
+ }
+ } e
+ assert {![client_exists $cname]}
+ $rr close
+ }
+
+ test "client evicted due to watched key list" {
+ r flushdb
+ set rr [redis_client]
+
+ # Since watched key list is a small overheatd this test uses a minimal maxmemory-clients config
+ set temp_maxmemory_clients 200000
+ r config set maxmemory-clients $temp_maxmemory_clients
+
+ # Append watched keys until list maxes out maxmemroy clients and causes client eviction
+ catch {
+ for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
+ $rr watch $j
+ }
+ } e
+ assert_match {I/O error reading reply} $e
+ $rr close
+
+ # Restore config for next tests
+ r config set maxmemory-clients $maxmemory_clients
+ }
+
+ test "client evicted due to pubsub subscriptions" {
+ r flushdb
+
+ # Since pubsub subscriptions cause a small overheatd this test uses a minimal maxmemory-clients config
+ set temp_maxmemory_clients 200000
+ r config set maxmemory-clients $temp_maxmemory_clients
+
+ # Test eviction due to pubsub patterns
+ set rr [redis_client]
+ # Add patterns until list maxes out maxmemroy clients and causes client eviction
+ catch {
+ for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
+ $rr psubscribe $j
+ }
+ } e
+ assert_match {I/O error reading reply} $e
+ $rr close
+
+ # Test eviction due to pubsub channels
+ set rr [redis_client]
+ # Add patterns until list maxes out maxmemroy clients and causes client eviction
+ catch {
+ for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
+ $rr subscribe $j
+ }
+ } e
+ assert_match {I/O error reading reply} $e
+ $rr close
+
+ # Restore config for next tests
+ r config set maxmemory-clients $maxmemory_clients
+ }
+
+ test "client evicted due to tracking redirection" {
+ r flushdb
+ # Use slow hz to avoid clientsCron from updating memory usage frequently since
+ # we're testing the update logic when writing tracking redirection output
+ set backup_hz [lindex [r config get hz] 1]
+ r config set hz 1
+
+ set rr [redis_client]
+ set redirected_c [redis_client]
+ $redirected_c client setname redirected_client
+ set redir_id [$redirected_c client id]
+ $redirected_c SUBSCRIBE __redis__:invalidate
+ $rr client tracking on redirect $redir_id bcast
+ # Use a big key name to fill the redirected tracking client's buffer quickly
+ set key_length [expr 1024*10]
+ set long_key [string repeat k $key_length]
+ # Use a script so we won't need to pass the long key name when dirtying it in the loop
+ set script_sha [$rr script load "redis.call('incr', '$long_key')"]
+ # Read and write to same (long) key until redirected_client's buffers cause it to be evicted
+ set t [clock milliseconds]
+ catch {
+ while true {
+ set mem [client_field redirected_client tot-mem]
+ assert {$mem < $maxmemory_clients}
+ $rr evalsha $script_sha 0
+ }
+ } e
+ assert_match {no client named redirected_client found*} $e
+
+ # Make sure eviction happened in less than 1sec, this means, statistically eviction
+ # wasn't caused by clientCron accounting
+ set t [expr [clock milliseconds] - $t]
+ assert {$t < 1000}
+
+ r config set hz $backup_hz
+ $rr close
+ $redirected_c close
+ }
+
+ test "client evicted due to client tracking prefixes" {
+ r flushdb
+ set rr [redis_client]
+
+ # Since tracking prefixes list is a small overheatd this test uses a minimal maxmemory-clients config
+ set temp_maxmemory_clients 200000
+ r config set maxmemory-clients $temp_maxmemory_clients
+
+ # Append tracking prefixes until list maxes out maxmemroy clients and causes client eviction
+ catch {
+ for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
+ $rr client tracking on prefix [format %012s $j] bcast
+ }
+ } e
+ assert_match {I/O error reading reply} $e
+ $rr close
+
+ # Restore config for next tests
+ r config set maxmemory-clients $maxmemory_clients
+ }
+
+ test "client evicted due to output buf" {
+ r flushdb
+ r setrange k 200000 v
+ set rr [redis_deferring_client]
+ $rr client setname test_client
+ $rr flush
+ assert {[$rr read] == "OK"}
+ # Attempt a large response under eviction limit
+ $rr get k
+ $rr flush
+ assert {[string length [$rr read]] == 200001}
+ set mem [client_field test_client tot-mem]
+ assert {$mem < $maxmemory_clients}
+
+ # Fill output buff in loop without reading it and make sure
+ # we're eventually disconnected, but before reaching maxmemory_clients
+ while true {
+ if { [catch {
+ set mem [client_field test_client tot-mem]
+ assert {$mem < $maxmemory_clients}
+ $rr get k
+ $rr flush
+ } e]} {
+ assert {![client_exists test_client]}
+ break
+ }
+ }
+ $rr close
+ }
+
+ foreach {no_evict} {on off} {
+ test "client no-evict $no_evict" {
+ r flushdb
+ r client setname control
+ r client no-evict on ;# Avoid evicting the main connection
+ lassign [gen_client] rr cname
+ $rr client no-evict $no_evict
+
+ # Overflow maxmemory-clients
+ set qbsize [expr {$maxmemory_clients + 1}]
+ if {[catch {
+ $rr write [join [list "*1\r\n\$$qbsize\r\n" [string repeat v $qbsize]] ""]
+ $rr flush
+ wait_for_condition 200 10 {
+ [client_field $cname qbuf] == $qbsize
+ } else {
+ fail "Failed to fill qbuf for test"
+ }
+ } e] && $no_evict == off} {
+ assert {![client_exists $cname]}
+ } elseif {$no_evict == on} {
+ assert {[client_field $cname tot-mem] > $maxmemory_clients}
+ }
+ $rr close
+ }
+ }
+}
+
+start_server {} {
+ set server_pid [s process_id]
+ set maxmemory_clients [mb 10]
+ set obuf_limit [mb 3]
+ r config set maxmemory-clients $maxmemory_clients
+ r config set client-output-buffer-limit "normal $obuf_limit 0 0"
+
+ test "avoid client eviction when client is freed by output buffer limit" {
+ r flushdb
+ set obuf_size [expr {$obuf_limit + [mb 1]}]
+ r setrange k $obuf_size v
+ set rr1 [redis_client]
+ $rr1 client setname "qbuf-client"
+ set rr2 [redis_deferring_client]
+ $rr2 client setname "obuf-client1"
+ assert_equal [$rr2 read] OK
+ set rr3 [redis_deferring_client]
+ $rr3 client setname "obuf-client2"
+ assert_equal [$rr3 read] OK
+
+ # Occupy client's query buff with less than output buffer limit left to exceed maxmemory-clients
+ set qbsize [expr {$maxmemory_clients - $obuf_size}]
+ $rr1 write [join [list "*1\r\n\$$qbsize\r\n" [string repeat v $qbsize]] ""]
+ $rr1 flush
+ # Wait for qbuff to be as expected
+ wait_for_condition 200 10 {
+ [client_field qbuf-client qbuf] == $qbsize
+ } else {
+ fail "Failed to fill qbuf for test"
+ }
+
+ # Make the other two obuf-clients pass obuf limit and also pass maxmemory-clients
+ # We use two obuf-clients to make sure that even if client eviction is attempted
+ # between two command processing (with no sleep) we don't perform any client eviction
+ # because the obuf limit is enforced with precedence.
+ exec kill -SIGSTOP $server_pid
+ $rr2 get k
+ $rr2 flush
+ $rr3 get k
+ $rr3 flush
+ exec kill -SIGCONT $server_pid
+
+ # Validate obuf-clients were disconnected (because of obuf limit)
+ catch {client_field obuf-client1 name} e
+ assert_match {no client named obuf-client1 found*} $e
+ catch {client_field obuf-client2 name} e
+ assert_match {no client named obuf-client2 found*} $e
+
+ # Validate qbuf-client is still connected and wasn't evicted
+ assert_equal [client_field qbuf-client name] {qbuf-client}
+
+ $rr1 close
+ $rr2 close
+ $rr3 close
+ }
+}
+
+start_server {} {
+ test "decrease maxmemory-clients causes client eviction" {
+ set maxmemory_clients [mb 4]
+ set client_count 10
+ set qbsize [expr ($maxmemory_clients - [mb 1]) / $client_count]
+ r config set maxmemory-clients $maxmemory_clients
+
+
+ # Make multiple clients consume together roughly 1mb less than maxmemory_clients
+ set rrs {}
+ for {set j 0} {$j < $client_count} {incr j} {
+ set rr [redis_client]
+ lappend rrs $rr
+ $rr client setname client$j
+ $rr write [join [list "*2\r\n\$$qbsize\r\n" [string repeat v $qbsize]] ""]
+ $rr flush
+ wait_for_condition 200 10 {
+ [client_field client$j qbuf] >= $qbsize
+ } else {
+ fail "Failed to fill qbuf for test"
+ }
+ }
+
+ # Make sure all clients are still connected
+ set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]]
+ assert {$connected_clients == $client_count}
+
+ # Decrease maxmemory_clients and expect client eviction
+ r config set maxmemory-clients [expr $maxmemory_clients / 2]
+ set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]]
+ assert {$connected_clients > 0 && $connected_clients < $client_count}
+
+ foreach rr $rrs {$rr close}
+ }
+}
+
+start_server {} {
+ test "evict clients only until below limit" {
+ set client_count 10
+ set client_mem [mb 1]
+ r config set maxmemory-clients 0
+ r client setname control
+ r client no-evict on
+
+ # Make multiple clients consume together roughly 1mb less than maxmemory_clients
+ set total_client_mem 0
+ set rrs {}
+ for {set j 0} {$j < $client_count} {incr j} {
+ set rr [redis_client]
+ lappend rrs $rr
+ $rr client setname client$j
+ $rr write [join [list "*2\r\n\$$client_mem\r\n" [string repeat v $client_mem]] ""]
+ $rr flush
+ wait_for_condition 200 10 {
+ [client_field client$j tot-mem] >= $client_mem
+ } else {
+ fail "Failed to fill qbuf for test"
+ }
+ incr total_client_mem [client_field client$j tot-mem]
+ }
+
+ set client_actual_mem [expr $total_client_mem / $client_count]
+
+ # Make sure client_acutal_mem is more or equal to what we intended
+ assert {$client_actual_mem >= $client_mem}
+
+ # Make sure all clients are still connected
+ set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]]
+ assert {$connected_clients == $client_count}
+
+ # Set maxmemory-clients to accommodate half our clients (taking into account the control client)
+ set maxmemory_clients [expr ($client_actual_mem * $client_count) / 2 + [client_field control tot-mem]]
+ r config set maxmemory-clients $maxmemory_clients
+
+ # Make sure total used memory is below maxmemory_clients
+ set total_client_mem [clients_sum tot-mem]
+ assert {$total_client_mem <= $maxmemory_clients}
+
+ # Make sure we have only half of our clients now
+ set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]]
+ assert {$connected_clients == [expr $client_count / 2]}
+
+ foreach rr $rrs {$rr close}
+ }
+}
+
+start_server {} {
+ test "evict clients in right order (large to small)" {
+ # Note that each size step needs to be at least x2 larger than previous step
+ # because of how the client-eviction size bucktting works
+ set sizes [list 100000 [mb 1] [mb 3]]
+ set clients_per_size 3
+ r client setname control
+ r client no-evict on
+ r config set maxmemory-clients 0
+
+ # Run over all sizes and create some clients using up that size
+ set total_client_mem 0
+ set rrs {}
+ for {set i 0} {$i < [llength $sizes]} {incr i} {
+ set size [lindex $sizes $i]
+
+ for {set j 0} {$j < $clients_per_size} {incr j} {
+ set rr [redis_client]
+ lappend rrs $rr
+ $rr client setname client-$i
+ $rr write [join [list "*2\r\n\$$size\r\n" [string repeat v $size]] ""]
+ $rr flush
+ }
+ set client_mem [client_field client-$i tot-mem]
+
+ # Update our size list based on actual used up size (this is usually
+ # slightly more than expected because of allocator bins
+ assert {$client_mem >= $size}
+ set sizes [lreplace $sizes $i $i $client_mem]
+
+ # Account total client memory usage
+ incr total_mem [expr $clients_per_size * $client_mem]
+ }
+ incr total_mem [client_field control tot-mem]
+
+ # Make sure all clients are connected
+ set clients [split [string trim [r client list]] "\r\n"]
+ for {set i 0} {$i < [llength $sizes]} {incr i} {
+ assert_equal [llength [lsearch -all $clients "*name=client-$i *"]] $clients_per_size
+ }
+
+ # For each size reduce maxmemory-clients so relevant clients should be evicted
+ # do this from largest to smallest
+ foreach size [lreverse $sizes] {
+ set total_mem [expr $total_mem - $clients_per_size * $size]
+ r config set maxmemory-clients $total_mem
+ set clients [split [string trim [r client list]] "\r\n"]
+ # Verify only relevant clients were evicted
+ for {set i 0} {$i < [llength $sizes]} {incr i} {
+ set verify_size [lindex $sizes $i]
+ set count [llength [lsearch -all $clients "*name=client-$i *"]]
+ if {$verify_size < $size} {
+ assert_equal $count $clients_per_size
+ } else {
+ assert_equal $count 0
+ }
+ }
+ }
+ foreach rr $rrs {$rr close}
+ }
+}
+
+}
+
diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl
index dd986dd72..0bc0e4b6d 100644
--- a/tests/unit/maxmemory.tcl
+++ b/tests/unit/maxmemory.tcl
@@ -1,3 +1,140 @@
+start_server {tags {"maxmemory" "external:skip"}} {
+ r config set maxmemory 11mb
+ r config set maxmemory-policy allkeys-lru
+ set server_pid [s process_id]
+
+ proc init_test {client_eviction} {
+ r flushdb
+
+ set prev_maxmemory_clients [r config get maxmemory-clients]
+ if $client_eviction {
+ r config set maxmemory-clients 3mb
+ } else {
+ r config set maxmemory-clients 0
+ }
+
+ r config resetstat
+ # fill 5mb using 50 keys of 100kb
+ for {set j 0} {$j < 50} {incr j} {
+ r setrange $j 100000 x
+ }
+ assert_equal [r dbsize] 50
+ }
+
+ proc verify_test {client_eviction} {
+ set evicted_keys [s evicted_keys]
+ set evicted_clients [s evicted_clients]
+ set dbsize [r dbsize]
+
+ if $::verbose {
+ puts "evicted keys: $evicted_keys"
+ puts "evicted clients: $evicted_clients"
+ puts "dbsize: $dbsize"
+ }
+
+ if $client_eviction {
+ assert_morethan $evicted_clients 0
+ assert_equal $evicted_keys 0
+ assert_equal $dbsize 50
+ } else {
+ assert_equal $evicted_clients 0
+ assert_morethan $evicted_keys 0
+ assert_lessthan $dbsize 50
+ }
+ }
+
+ foreach {client_eviction} {false true} {
+ set clients {}
+ test "eviction due to output buffers of many MGET clients, client eviction: $client_eviction" {
+ init_test $client_eviction
+
+ for {set j 0} {$j < 20} {incr j} {
+ set rr [redis_deferring_client]
+ lappend clients $rr
+ }
+
+ # Freeze the server so output buffers will be filled in one event loop when we un-freeze after sending mgets
+ exec kill -SIGSTOP $server_pid
+ for {set j 0} {$j < 5} {incr j} {
+ foreach rr $clients {
+ $rr mget 1
+ $rr flush
+ }
+ }
+ # Unfreeze server
+ exec kill -SIGCONT $server_pid
+
+
+ for {set j 0} {$j < 5} {incr j} {
+ foreach rr $clients {
+ if {[catch { $rr read } err]} {
+ lremove clients $rr
+ }
+ }
+ }
+
+ verify_test $client_eviction
+ }
+ foreach rr $clients {
+ $rr close
+ }
+
+ set clients {}
+ test "eviction due to input buffer of a dead client, client eviction: $client_eviction" {
+ init_test $client_eviction
+
+ for {set j 0} {$j < 30} {incr j} {
+ set rr [redis_deferring_client]
+ lappend clients $rr
+ }
+
+ foreach rr $clients {
+ if {[catch {
+ $rr write "*250\r\n"
+ for {set j 0} {$j < 249} {incr j} {
+ $rr write "\$1000\r\n"
+ $rr write [string repeat x 1000]
+ $rr write "\r\n"
+ $rr flush
+ }
+ }]} {
+ lremove clients $rr
+ }
+ }
+
+ verify_test $client_eviction
+ }
+ foreach rr $clients {
+ $rr close
+ }
+
+ set clients {}
+ test "eviction due to output buffers of pubsub, client eviction: $client_eviction" {
+ init_test $client_eviction
+
+ for {set j 0} {$j < 10} {incr j} {
+ set rr [redis_deferring_client]
+ lappend clients $rr
+ }
+
+ foreach rr $clients {
+ $rr subscribe bla
+ $rr flush
+ }
+
+ for {set j 0} {$j < 40} {incr j} {
+ catch {r publish bla [string repeat x 100000]} err
+ }
+
+ verify_test $client_eviction
+ }
+ foreach rr $clients {
+ $rr close
+ }
+ }
+
+}
+
start_server {tags {"maxmemory external:skip"}} {
test "Without maxmemory small integers are shared" {
r config set maxmemory 0