-rw-r--r--  redis.conf                      |  19
-rw-r--r--  src/blocked.c                   |  12
-rw-r--r--  src/config.c                    | 152
-rw-r--r--  src/debug.c                     |  19
-rw-r--r--  src/multi.c                     |   9
-rw-r--r--  src/networking.c                | 236
-rw-r--r--  src/object.c                    |   2
-rw-r--r--  src/pubsub.c                    |   2
-rw-r--r--  src/redis-cli.c                 |   2
-rw-r--r--  src/replication.c               |   1
-rw-r--r--  src/server.c                    | 130
-rw-r--r--  src/server.h                    |  51
-rw-r--r--  src/tracking.c                  |   1
-rw-r--r--  src/util.c                      |   5
-rw-r--r--  tests/support/util.tcl          |  11
-rw-r--r--  tests/test_helper.tcl           |   1
-rw-r--r--  tests/unit/client-eviction.tcl  | 509
-rw-r--r--  tests/unit/maxmemory.tcl        | 137
18 files changed, 1168 insertions, 131 deletions
diff --git a/redis.conf b/redis.conf
index 3eb7374cc..bd7fa5271 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1841,6 +1841,25 @@ client-output-buffer-limit pubsub 32mb 8mb 60
#
# client-query-buffer-limit 1gb
+# In some scenarios client connections can hog up memory leading to OOM
+# errors or data eviction. To avoid this we can cap the accumulated memory
+# used by all client connections (all pubsub and normal clients). Once we
+# reach that limit connections will be dropped by the server freeing up
+# memory. The server will attempt to drop the connections using the most
+# memory first. We call this mechanism "client eviction".
+#
+# Client eviction is configured using the maxmemory-clients setting as follows:
+# 0 - client eviction is disabled (default)
+#
+# A memory value can be used for the client eviction threshold,
+# for example:
+# maxmemory-clients 1g
+#
+# A percentage value (between 1% and 100%) means the client eviction threshold
+# is based on a percentage of the maxmemory setting. For example to set client
+# eviction at 5% of maxmemory:
+# maxmemory-clients 5%
+
# In the Redis protocol, bulk requests, that are, elements representing single
# strings, are normally limited to 512 mb. However you can change this limit
# here, but must be 1mb or greater
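(Illustration only, not part of the patch: a combined configuration might look
like the following; with these hypothetical values the effective client
eviction threshold is 5% of the 4gb maxmemory, i.e. about 205mb shared by all
normal and pubsub connections.)

    maxmemory 4gb
    maxmemory-clients 5%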
diff --git a/src/blocked.c b/src/blocked.c
index 4898cdcbf..86aed2440 100644
--- a/src/blocked.c
+++ b/src/blocked.c
@@ -138,14 +138,14 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
/* If we have a queued command, execute it now. */
- if (processPendingCommandsAndResetClient(c) == C_ERR) {
- continue;
- }
- /* Then process client if it has more data in it's buffer. */
- if (c->querybuf && sdslen(c->querybuf) > 0) {
- processInputBuffer(c);
+ if (processPendingCommandsAndResetClient(c) == C_OK) {
+ /* Now process client if it has more data in its buffer. */
+ if (c->querybuf && sdslen(c->querybuf) > 0) {
+ processInputBuffer(c);
+ }
}
}
+ beforeNextClient(c);
}
}
diff --git a/src/config.c b/src/config.c
index 349021f9e..2a8697e09 100644
--- a/src/config.c
+++ b/src/config.c
@@ -198,6 +198,10 @@ typedef enum numericType {
NUMERIC_TYPE_TIME_T,
} numericType;
+#define INTEGER_CONFIG 0 /* No flags means a simple integer configuration */
+#define MEMORY_CONFIG (1<<0) /* Indicates if this value can be loaded as a memory value */
+#define PERCENT_CONFIG (1<<1) /* Indicates if this value can be loaded as a percent (and stored as a negative int) */
+
typedef struct numericConfigData {
union {
int *i;
@@ -211,7 +215,7 @@ typedef struct numericConfigData {
off_t *ot;
time_t *tt;
} config; /* The pointer to the numeric config this value is stored in */
- int is_memory; /* Indicates if this value can be loaded as a memory value */
+ unsigned int flags;
numericType numeric_type; /* An enum indicating the type of this value */
long long lower_bound; /* The lower bound of this numeric value */
long long upper_bound; /* The upper bound of this numeric value */
@@ -1347,6 +1351,14 @@ void rewriteConfigBytesOption(struct rewriteConfigState *state, const char *opti
rewriteConfigRewriteLine(state,option,line,force);
}
+/* Rewrite a simple "option-name n%" configuration option. */
+void rewriteConfigPercentOption(struct rewriteConfigState *state, const char *option, long long value, long long defvalue) {
+ int force = value != defvalue;
+ sds line = sdscatprintf(sdsempty(),"%s %lld%%",option,value);
+
+ rewriteConfigRewriteLine(state,option,line,force);
+}
+
/* Rewrite a yes/no option. */
void rewriteConfigYesNoOption(struct rewriteConfigState *state, const char *option, int value, int defvalue) {
int force = value != defvalue;
@@ -2111,8 +2123,18 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) {
return 0;
}
} else {
+ /* Boundary check for percentages */
+ if (data.numeric.flags & PERCENT_CONFIG && ll < 0) {
+ if (ll < data.numeric.lower_bound) {
+ snprintf(loadbuf, LOADBUF_SIZE,
+ "percentage argument must be less or equal to %lld",
+ -data.numeric.lower_bound);
+ *err = loadbuf;
+ return 0;
+ }
+ }
/* Boundary check for signed types */
- if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) {
+ else if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) {
snprintf(loadbuf, LOADBUF_SIZE,
"argument must be between %lld and %lld inclusive",
data.numeric.lower_bound,
@@ -2124,22 +2146,46 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) {
return 1;
}
+static int numericParseString(typeData data, sds value, const char **err, long long *res) {
+ /* First try to parse as memory */
+ if (data.numeric.flags & MEMORY_CONFIG) {
+ int memerr;
+ *res = memtoull(value, &memerr);
+ if (!memerr)
+ return 1;
+ }
+
+ /* Attempt to parse as percent */
+ if (data.numeric.flags & PERCENT_CONFIG &&
+ sdslen(value) > 1 && value[sdslen(value)-1] == '%' &&
+ string2ll(value, sdslen(value)-1, res) &&
+ *res >= 0) {
+ /* We store percentage as negative value */
+ *res = -*res;
+ return 1;
+ }
+
+ /* Attempt a simple number (no special flags set) */
+ if (!data.numeric.flags && string2ll(value, sdslen(value), res))
+ return 1;
+
+ /* Select appropriate error string */
+ if (data.numeric.flags & MEMORY_CONFIG &&
+ data.numeric.flags & PERCENT_CONFIG)
+ *err = "argument must be a memory or percent value" ;
+ else if (data.numeric.flags & MEMORY_CONFIG)
+ *err = "argument must be a memory value";
+ else
+ *err = "argument couldn't be parsed into an integer";
+ return 0;
+}
static int numericConfigSet(typeData data, sds value, int update, const char **err) {
long long ll, prev = 0;
- if (data.numeric.is_memory) {
- int memerr;
- ll = memtoull(value, &memerr);
- if (memerr) {
- *err = "argument must be a memory value";
- return 0;
- }
- } else {
- if (!string2ll(value, sdslen(value), &ll)) {
- *err = "argument couldn't be parsed into an integer" ;
- return 0;
- }
- }
+
+ if (!numericParseString(data, value, err, &ll))
+ return 0;
+
if (!numericBoundaryCheck(data, ll, err))
return 0;
@@ -2158,21 +2204,21 @@ static int numericConfigSet(typeData data, sds value, int update, const char **e
static void numericConfigGet(client *c, typeData data) {
char buf[128];
- if (data.numeric.is_memory) {
- unsigned long long value = 0;
-
- GET_NUMERIC_TYPE(value)
- ull2string(buf, sizeof(buf), value);
- addReplyBulkCString(c, buf);
- } else{
- long long value = 0;
-
- GET_NUMERIC_TYPE(value)
+ long long value = 0;
+ GET_NUMERIC_TYPE(value)
+ if (data.numeric.flags & PERCENT_CONFIG && value < 0) {
+ int len = ll2string(buf, sizeof(buf), -value);
+ buf[len] = '%';
+ buf[len+1] = '\0';
+ }
+ else if (data.numeric.flags & MEMORY_CONFIG) {
+ ull2string(buf, sizeof(buf), value);
+ } else {
ll2string(buf, sizeof(buf), value);
- addReplyBulkCString(c, buf);
}
+ addReplyBulkCString(c, buf);
}
static void numericConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) {
@@ -2180,18 +2226,17 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite
GET_NUMERIC_TYPE(value)
- if (data.numeric.is_memory) {
+ if (data.numeric.flags & PERCENT_CONFIG && value < 0) {
+ rewriteConfigPercentOption(state, name, -value, data.numeric.default_value);
+ } else if (data.numeric.flags & MEMORY_CONFIG) {
rewriteConfigBytesOption(state, name, value, data.numeric.default_value);
} else {
rewriteConfigNumericalOption(state, name, value, data.numeric.default_value);
}
}
-#define INTEGER_CONFIG 0
-#define MEMORY_CONFIG 1
-
-#define embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) { \
- embedCommonConfig(name, alias, flags) \
+#define embedCommonNumericalConfig(name, alias, _flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) { \
+ embedCommonConfig(name, alias, _flags) \
embedConfigInterface(numericConfigInit, numericConfigSet, numericConfigGet, numericConfigRewrite) \
.data.numeric = { \
.lower_bound = (lower), \
@@ -2199,73 +2244,73 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite
.default_value = (default), \
.is_valid_fn = (is_valid), \
.update_fn = (update), \
- .is_memory = (memory),
+ .flags = (num_conf_flags),
-#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_INT, \
.config.i = &(config_addr) \
} \
}
-#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_UINT, \
.config.ui = &(config_addr) \
} \
}
-#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_LONG, \
.config.l = &(config_addr) \
} \
}
-#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_ULONG, \
.config.ul = &(config_addr) \
} \
}
-#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_LONG_LONG, \
.config.ll = &(config_addr) \
} \
}
-#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_ULONG_LONG, \
.config.ull = &(config_addr) \
} \
}
-#define createSizeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_SIZE_T, \
.config.st = &(config_addr) \
} \
}
-#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_SSIZE_T, \
.config.sst = &(config_addr) \
} \
}
-#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_TIME_T, \
.config.tt = &(config_addr) \
} \
}
-#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
- embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \
+#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
+ embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \
.numeric_type = NUMERIC_TYPE_OFF_T, \
.config.ot = &(config_addr) \
} \
@@ -2653,6 +2698,7 @@ standardConfig configs[] = {
createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL),
createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */
createSizeTConfig("client-query-buffer-limit", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.client_max_querybuf_len, 1024*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Default: 1GB max query buffer. */
+ createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, NULL),
/* Other configs */
createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */
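The following is a minimal standalone sketch (illustrative only; the variable
names are invented here) of the convention the config code above uses for
maxmemory-clients: a percentage such as "5%" is stored negated, a memory value
such as "1gb" stays positive, and readers either render the negative value back
as "N%" or resolve it against maxmemory.

    #include <stdio.h>

    int main(void) {
        long long stored_percent = -5;                    /* result of parsing "5%" */
        long long stored_bytes = 1024LL * 1024 * 1024;    /* result of parsing "1gb" */
        unsigned long long maxmemory = 4ULL * 1024 * 1024 * 1024;

        /* CONFIG GET style rendering: a negative value means percent. */
        if (stored_percent < 0)
            printf("maxmemory-clients -> %lld%%\n", -stored_percent);

        /* Resolving a percent value against maxmemory, as getClientEvictionLimit()
         * in networking.c does further down in this patch. */
        unsigned long long limit =
            (unsigned long long)((double)maxmemory * -(double)stored_percent / 100);
        printf("effective limit -> %llu bytes\n", limit);

        printf("plain memory value stays positive -> %lld bytes\n", stored_bytes);
        return 0;
    }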
diff --git a/src/debug.c b/src/debug.c
index d29d48673..4e95e2dfd 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -469,6 +469,8 @@ void debugCommand(client *c) {
" Return the size of different Redis core C structures.",
"ZIPLIST <key>",
" Show low level info about the ziplist encoding of <key>.",
+"CLIENT-EVICTION",
+" Show low level client eviction pools info (maxmemory-clients).",
NULL
};
addReplyHelp(c, help);
@@ -883,6 +885,23 @@ NULL
addReplyError(c, "CONFIG-REWRITE-FORCE-ALL failed");
else
addReply(c, shared.ok);
+ } else if(!strcasecmp(c->argv[1]->ptr,"client-eviction") && c->argc == 2) {
+ sds bucket_info = sdsempty();
+ for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
+ if (j == 0)
+ bucket_info = sdscatprintf(bucket_info, "bucket 0");
+ else
+ bucket_info = sdscatprintf(bucket_info, "bucket %10zu", (size_t)1<<(j-1+CLIENT_MEM_USAGE_BUCKET_MIN_LOG));
+ if (j == CLIENT_MEM_USAGE_BUCKETS-1)
+ bucket_info = sdscatprintf(bucket_info, "+ : ");
+ else
+ bucket_info = sdscatprintf(bucket_info, " - %10zu: ", ((size_t)1<<(j+CLIENT_MEM_USAGE_BUCKET_MIN_LOG))-1);
+ bucket_info = sdscatprintf(bucket_info, "tot-mem: %10zu, clients: %lu\n",
+ server.client_mem_usage_buckets[j].mem_usage_sum,
+ server.client_mem_usage_buckets[j].clients->len);
+ }
+ addReplyVerbatim(c,bucket_info,sdslen(bucket_info),"txt");
+ sdsfree(bucket_info);
#ifdef USE_JEMALLOC
} else if(!strcasecmp(c->argv[1]->ptr,"mallctl") && c->argc >= 3) {
mallctl_int(c, c->argv+2, c->argc-2);
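Based on the format strings above, DEBUG CLIENT-EVICTION replies with one line
per bucket, along the following lines (the tot-mem/clients values here are
hypothetical, and the middle buckets are omitted):

    bucket 0 -      32767: tot-mem:          0, clients: 0
    bucket      32768 -      65535: tot-mem:          0, clients: 0
    bucket      65536 -     131071: tot-mem:     180624, clients: 2
    ...
    bucket 4294967296+ : tot-mem:          0, clients: 0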
diff --git a/src/multi.c b/src/multi.c
index e40d2a447..b02457bb9 100644
--- a/src/multi.c
+++ b/src/multi.c
@@ -37,6 +37,7 @@ void initClientMultiState(client *c) {
c->mstate.count = 0;
c->mstate.cmd_flags = 0;
c->mstate.cmd_inv_flags = 0;
+ c->mstate.argv_len_sums = 0;
}
/* Release all the resources associated with MULTI/EXEC state */
@@ -78,6 +79,7 @@ void queueMultiCommand(client *c) {
c->mstate.count++;
c->mstate.cmd_flags |= c->cmd->flags;
c->mstate.cmd_inv_flags |= ~c->cmd->flags;
+ c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc;
}
void discardTransaction(client *c) {
@@ -435,3 +437,10 @@ void unwatchCommand(client *c) {
c->flags &= (~CLIENT_DIRTY_CAS);
addReply(c,shared.ok);
}
+
+size_t multiStateMemOverhead(client *c) {
+ size_t mem = c->mstate.argv_len_sums;
+ /* Add watched keys overhead. Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */
+ mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey));
+ return mem;
+}
diff --git a/src/networking.c b/src/networking.c
index 3b522fb35..21278d783 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -180,15 +180,18 @@ client *createClient(connection *conn) {
c->sockname = NULL;
c->client_list_node = NULL;
c->paused_list_node = NULL;
+ c->pending_read_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
- c->client_cron_last_memory_usage = 0;
- c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL;
+ c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0;
+ c->last_memory_type = CLIENT_TYPE_NORMAL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
+ c->mem_usage_bucket = NULL;
+ c->mem_usage_bucket_node = NULL;
if (conn) linkClient(c);
initClientMultiState(c);
return c;
@@ -267,7 +270,7 @@ int prepareClientToWrite(client *c) {
* not install a write handler. Instead, it will be done by
* handleClientsWithPendingReadsUsingThreads() upon return.
*/
- if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
+ if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
clientInstallWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
@@ -1288,13 +1291,13 @@ void unlinkClient(client *c) {
}
/* Remove from the list of pending reads if needed. */
- if (c->flags & CLIENT_PENDING_READ) {
- ln = listSearchKey(server.clients_pending_read,c);
- serverAssert(ln != NULL);
- listDelNode(server.clients_pending_read,ln);
- c->flags &= ~CLIENT_PENDING_READ;
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+ if (c->pending_read_list_node != NULL) {
+ listDelNode(server.clients_pending_read,c->pending_read_list_node);
+ c->pending_read_list_node = NULL;
}
+
/* When client was just unblocked because of a blocking operation,
* remove it from the list of unblocked clients. */
if (c->flags & CLIENT_UNBLOCKED) {
@@ -1430,10 +1433,15 @@ void freeClient(client *c) {
* we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
- /* Remove the contribution that this client gave to our
+ /* Remove the contribution that this client gave to our
* incrementally computed memory usage. */
- server.stat_clients_type_memory[c->client_cron_last_memory_type] -=
- c->client_cron_last_memory_usage;
+ server.stat_clients_type_memory[c->last_memory_type] -=
+ c->last_memory_usage;
+ /* Remove client from memory usage buckets */
+ if (c->mem_usage_bucket) {
+ c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
+ listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node);
+ }
/* Release other dynamically allocated client structure fields,
* and finally release the client structure itself. */
@@ -1470,6 +1478,27 @@ void freeClientAsync(client *c) {
pthread_mutex_unlock(&async_free_queue_mutex);
}
+/* Perform processing of the client before moving on to processing the next client.
+ * This is useful for performing operations that affect the global state but can't
+ * wait until we're done with all clients, in other words can't wait until beforeSleep().
+ * Returns C_ERR in case the client is no longer valid after the call. */
+int beforeNextClient(client *c) {
+ /* Skip the client processing if we're in an IO thread, in that case we'll perform
+ this operation later (this function is called again) in the fan-in stage of the threading mechanism */
+ if (io_threads_op != IO_THREADS_OP_IDLE)
+ return C_OK;
+ /* Handle async frees */
+ /* Note: this doesn't make the server.clients_to_close list redundant because of
+ * cases where we want an async free of a client other than myself. For example
+ * in ACL modifications we disconnect clients authenticated to non-existent
+ * users (see ACL LOAD). */
+ if (c->flags & CLIENT_CLOSE_ASAP) {
+ freeClient(c);
+ return C_ERR;
+ }
+ return C_OK;
+}
+
/* Free the clients marked as CLOSE_ASAP, return the number of clients
* freed. */
int freeClientsInAsyncFreeQueue(void) {
@@ -1594,7 +1623,10 @@ int writeToClient(client *c, int handler_installed) {
* aeDeleteFileEvent() is not thread safe: however writeToClient()
* is always called with handler_installed set to 0 from threads
* so we are fine. */
- if (handler_installed) connSetWriteHandler(c->conn, NULL);
+ if (handler_installed) {
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+ connSetWriteHandler(c->conn, NULL);
+ }
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
@@ -1602,6 +1634,7 @@ int writeToClient(client *c, int handler_installed) {
return C_ERR;
}
}
+ updateClientMemUsage(c);
return C_OK;
}
@@ -2036,7 +2069,11 @@ int processCommandAndResetClient(client *c) {
server.current_client = c;
if (processCommand(c) == C_OK) {
commandProcessed(c);
+ /* Update the client's memory to include output buffer growth following the
+ * processed command. */
+ updateClientMemUsage(c);
}
+
if (server.current_client == NULL) deadclient = 1;
/*
* Restore the old client, this is needed because when a script
@@ -2117,7 +2154,8 @@ void processInputBuffer(client *c) {
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
- if (c->flags & CLIENT_PENDING_READ) {
+ if (io_threads_op != IO_THREADS_OP_IDLE) {
+ serverAssert(io_threads_op == IO_THREADS_OP_READ);
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
@@ -2137,6 +2175,11 @@ void processInputBuffer(client *c) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
+
+ /* Update client memory usage after processing the query buffer, this is
+ * important in case the query buffer is big and wasn't drained during
+ * the above loop (because of partially sent big commands). */
+ updateClientMemUsage(c);
}
void readQueryFromClient(connection *conn) {
@@ -2190,7 +2233,7 @@ void readQueryFromClient(connection *conn) {
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
- return;
+ goto done;
}
} else if (nread == 0) {
if (server.verbosity <= LL_VERBOSE) {
@@ -2199,7 +2242,7 @@ void readQueryFromClient(connection *conn) {
sdsfree(info);
}
freeClientAsync(c);
- return;
+ goto done;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
@@ -2223,12 +2266,15 @@ void readQueryFromClient(connection *conn) {
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
- return;
+ goto done;
}
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
processInputBuffer(c);
+
+done:
+ beforeNextClient(c);
}
/* A Redis "Address String" is a colon separated ip:port pair.
@@ -2306,6 +2352,7 @@ sds catClientInfoString(sds s, client *client) {
if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
if (client->flags & CLIENT_READONLY) *p++ = 'r';
+ if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
if (p == flags) *p++ = 'N';
*p++ = '\0';
@@ -2317,19 +2364,10 @@ sds catClientInfoString(sds s, client *client) {
*p = '\0';
/* Compute the total memory consumed by this client. */
- size_t obufmem = getClientOutputBufferMemoryUsage(client);
- size_t total_mem = obufmem;
- total_mem += zmalloc_size(client); /* includes client->buf */
- total_mem += sdsZmallocSize(client->querybuf);
- /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
- * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
- * spot problematic clients. */
- total_mem += client->argv_len_sum;
- if (client->argv)
- total_mem += zmalloc_size(client->argv);
+ size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
return sdscatfmt(s,
- "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
+ "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
(unsigned long long) client->id,
getClientPeerId(client),
getClientSockname(client),
@@ -2345,6 +2383,7 @@ sds catClientInfoString(sds s, client *client) {
(unsigned long long) sdslen(client->querybuf),
(unsigned long long) sdsavail(client->querybuf),
(unsigned long long) client->argv_len_sum,
+ (unsigned long long) client->mstate.argv_len_sums,
(unsigned long long) client->bufpos,
(unsigned long long) listLength(client->reply),
(unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
@@ -2565,6 +2604,18 @@ NULL
addReplyErrorObject(c,shared.syntaxerr);
return;
}
+ } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) {
+ /* CLIENT NO-EVICT ON|OFF */
+ if (!strcasecmp(c->argv[2]->ptr,"on")) {
+ c->flags |= CLIENT_NO_EVICT;
+ addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
+ c->flags &= ~CLIENT_NO_EVICT;
+ addReply(c,shared.ok);
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
} else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
/* CLIENT KILL <ip:port>
* CLIENT KILL <option> [value] ... <option> [value] */
@@ -3154,11 +3205,39 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* Note: this function is very fast so can be called as many time as
* the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */
-unsigned long getClientOutputBufferMemoryUsage(client *c) {
- unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
+size_t getClientOutputBufferMemoryUsage(client *c) {
+ size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
return c->reply_bytes + (list_item_size*listLength(c->reply));
}
+/* Returns the total client's memory usage.
+ * Optionally, if output_buffer_mem_usage is not NULL, it fills it with
+ * the client output buffer memory usage portion of the total. */
+size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
+ size_t mem = getClientOutputBufferMemoryUsage(c);
+ if (output_buffer_mem_usage != NULL)
+ *output_buffer_mem_usage = mem;
+ mem += sdsZmallocSize(c->querybuf);
+ mem += zmalloc_size(c);
+ /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
+ * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
+ * spot problematic clients. */
+ mem += c->argv_len_sum + sizeof(robj*)*c->argc;
+ mem += multiStateMemOverhead(c);
+
+ /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers
+ * to the strings themselves because they aren't stored per client. */
+ mem += listLength(c->pubsub_patterns) * sizeof(listNode);
+ mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) +
+ dictSlots(c->pubsub_channels) * sizeof(dictEntry*);
+
+ /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */
+ if (c->client_tracking_prefixes)
+ mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*));
+
+ return mem;
+}
+
/* Get the class of a client, used in order to enforce limits to different
* classes of clients.
*
@@ -3425,13 +3504,11 @@ void processEventsWhileBlocked(void) {
* ========================================================================== */
#define IO_THREADS_MAX_NUM 128
-#define IO_THREADS_OP_READ 0
-#define IO_THREADS_OP_WRITE 1
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
-int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
+int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE. */ // TODO: should access to this be atomic??!
/* This is the list of clients each thread will serve when threaded I/O is
* used. We spawn io_threads_num-1 threads, since one is the main thread
@@ -3498,6 +3575,9 @@ void *IOThreadMain(void *myid) {
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
+ /* Indicate that io-threads are currently idle */
+ io_threads_op = IO_THREADS_OP_IDLE;
+
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
@@ -3584,6 +3664,12 @@ int stopThreadedIOIfNeeded(void) {
}
}
+/* This function achieves thread safety using a fan-out -> fan-in paradigm:
+ * Fan out: The main thread fans out work to the io-threads which block until
+ * setIOPendingCount() is called with a value larger than 0 by the main thread.
+ * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
+ * it can safely perform post-processing and return to normal synchronous
+ * work. */
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
@@ -3642,12 +3728,17 @@ int handleClientsWithPendingWritesUsingThreads(void) {
if (pending == 0) break;
}
+ io_threads_op = IO_THREADS_OP_IDLE;
+
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
+ /* Update the client in the mem usage buckets after we're done processing it in the io-threads */
+ updateClientMemUsageBucket(c);
+
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
@@ -3672,10 +3763,11 @@ int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
- !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
+ !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
+ io_threads_op == IO_THREADS_OP_IDLE)
{
- c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
+ c->pending_read_list_node = listFirst(server.clients_pending_read);
return 1;
} else {
return 0;
@@ -3687,7 +3779,13 @@ int postponeClientRead(client *c) {
* process (instead of serving them synchronously). This function runs
* the queue using the I/O threads, and process them in order to accumulate
* the reads in the buffers, and also parse the first command available
- * rendering it in the client structures. */
+ * rendering it in the client structures.
+ * This function achieves thread safety using a fan-out -> fan-in paradigm:
+ * Fan out: The main thread fans out work to the io-threads which block until
+ * setIOPendingCount() is called with a value larger than 0 by the main thread.
+ * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
+ * it can safely perform post-processing and return to normal synchronous
+ * work. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
@@ -3729,14 +3827,27 @@ int handleClientsWithPendingReadsUsingThreads(void) {
if (pending == 0) break;
}
+ io_threads_op = IO_THREADS_OP_IDLE;
+
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
- c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
+ c->pending_read_list_node = NULL;
serverAssert(!(c->flags & CLIENT_BLOCKED));
+
+ if (beforeNextClient(c) == C_ERR) {
+ /* If the client is no longer valid, we avoid
+ * processing the client later. So we just go
+ * to the next. */
+ continue;
+ }
+
+ /* Once io-threads are idle we can update the client in the mem usage buckets */
+ updateClientMemUsageBucket(c);
+
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
@@ -3758,3 +3869,56 @@ int handleClientsWithPendingReadsUsingThreads(void) {
return processed;
}
+
+/* Returns the actual client eviction limit based on current configuration or
+ * 0 if no limit. */
+size_t getClientEvictionLimit(void) {
+ size_t maxmemory_clients_actual = SIZE_MAX;
+
+ /* Handle percentage of maxmemory*/
+ if (server.maxmemory_clients < 0 && server.maxmemory > 0) {
+ unsigned long long maxmemory_clients_bytes = (unsigned long long)((double)server.maxmemory * -(double) server.maxmemory_clients / 100);
+ if (maxmemory_clients_bytes <= SIZE_MAX)
+ maxmemory_clients_actual = maxmemory_clients_bytes;
+ }
+ else if (server.maxmemory_clients > 0)
+ maxmemory_clients_actual = server.maxmemory_clients;
+ else
+ return 0;
+
+ /* Don't allow a too small maxmemory-clients to avoid cases where we can't communicate
+ * at all with the server because of bad configuration */
+ if (maxmemory_clients_actual < 1024*128)
+ maxmemory_clients_actual = 1024*128;
+
+ return maxmemory_clients_actual;
+}
+
+void evictClients(void) {
+ /* Start eviction from topmost bucket (largest clients) */
+ int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1;
+ listIter bucket_iter;
+ listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
+ size_t client_eviction_limit = getClientEvictionLimit();
+ if (client_eviction_limit == 0)
+ return;
+ while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
+ server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >= client_eviction_limit) {
+ listNode *ln = listNext(&bucket_iter);
+ if (ln) {
+ client *c = ln->value;
+ sds ci = catClientInfoString(sdsempty(),c);
+ serverLog(LL_NOTICE, "Evicting client: %s", ci);
+ freeClient(c);
+ sdsfree(ci);
+ server.stat_evictedclients++;
+ } else {
+ curr_bucket--;
+ if (curr_bucket < 0) {
+ serverLog(LL_WARNING, "Over client maxmemory after evicting all evictable clients");
+ break;
+ }
+ listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
+ }
+ }
+}
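A hypothetical redis-cli session exercising the subcommand added above: CLIENT
NO-EVICT ON sets the CLIENT_NO_EVICT flag (shown as 'e' in CLIENT LIST flags),
which excludes the connection from the eviction buckets, and any argument other
than on/off gets the shared syntax error.

    127.0.0.1:6379> CLIENT NO-EVICT on
    OK
    127.0.0.1:6379> CLIENT NO-EVICT maybe
    (error) ERR syntax error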
diff --git a/src/object.c b/src/object.c
index 0f869ea7e..edbd56acb 100644
--- a/src/object.c
+++ b/src/object.c
@@ -1180,7 +1180,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
/* Computing the memory used by the clients would be O(N) if done
* here online. We use our values computed incrementally by
- * clientsCronTrackClientsMemUsage(). */
+ * updateClientMemUsage(). */
mh->clients_slaves = server.stat_clients_type_memory[CLIENT_TYPE_SLAVE];
mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+
diff --git a/src/pubsub.c b/src/pubsub.c
index 0169b3604..e0bbc6d94 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -304,6 +304,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message);
+ updateClientMemUsage(c);
receivers++;
}
}
@@ -323,6 +324,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
addReplyPubsubPatMessage(c,pattern,channel,message);
+ updateClientMemUsage(c);
receivers++;
}
}
diff --git a/src/redis-cli.c b/src/redis-cli.c
index 7144d2bc2..f3e6f6082 100644
--- a/src/redis-cli.c
+++ b/src/redis-cli.c
@@ -1286,6 +1286,8 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
!strcasecmp(argv[1],"htstats")) ||
(argc >= 2 && !strcasecmp(command,"debug") &&
!strcasecmp(argv[1],"htstats-key")) ||
+ (argc >= 2 && !strcasecmp(command,"debug") &&
+ !strcasecmp(argv[1],"client-eviction")) ||
(argc >= 2 && !strcasecmp(command,"memory") &&
(!strcasecmp(argv[1],"malloc-stats") ||
!strcasecmp(argv[1],"doctor"))) ||
diff --git a/src/replication.c b/src/replication.c
index e1d39582c..8c692df8d 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -414,6 +414,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
while((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor,cmdobj);
+ updateClientMemUsage(monitor);
}
decrRefCount(cmdobj);
}
diff --git a/src/server.c b/src/server.c
index d8555619c..9b8fd48ef 100644
--- a/src/server.c
+++ b/src/server.c
@@ -2169,30 +2169,94 @@ int clientsCronTrackExpansiveClients(client *c, int time_idx) {
return 0; /* This function never terminates the client. */
}
-/* Iterating all the clients in getMemoryOverheadData() is too slow and
- * in turn would make the INFO command too slow. So we perform this
- * computation incrementally and track the (not instantaneous but updated
- * to the second) total memory used by clients using clientsCron() in
- * a more incremental way (depending on server.hz). */
-int clientsCronTrackClientsMemUsage(client *c) {
- size_t mem = 0;
+/* All normal clients are placed in one of the "mem usage buckets" according
+ * to how much memory they currently use. We use this function to find the
+ * appropriate bucket based on a given memory usage value. The algorithm simply
+ * does a log2(mem) to get the bucket. This means, for example, that if a
+ * client's memory usage doubles it's moved up to the next bucket; if it's
+ * halved we move it down a bucket.
+ * For more details see CLIENT_MEM_USAGE_BUCKETS documentation in server.h. */
+clientMemUsageBucket *getMemUsageBucket(size_t mem) {
+ int size_in_bits = 8*(int)sizeof(mem);
+ int clz = mem > 0 ? __builtin_clzl(mem) : size_in_bits;
+ int bucket_idx = size_in_bits - clz;
+ if (bucket_idx > CLIENT_MEM_USAGE_BUCKET_MAX_LOG)
+ bucket_idx = CLIENT_MEM_USAGE_BUCKET_MAX_LOG;
+ else if (bucket_idx < CLIENT_MEM_USAGE_BUCKET_MIN_LOG)
+ bucket_idx = CLIENT_MEM_USAGE_BUCKET_MIN_LOG;
+ bucket_idx -= CLIENT_MEM_USAGE_BUCKET_MIN_LOG;
+ return &server.client_mem_usage_buckets[bucket_idx];
+}
+
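/* Worked example (editor's illustration, not part of the patch): on a 64-bit
 * build a client using 100000 bytes gives size_in_bits = 64 and
 * __builtin_clzl(100000) = 47, so bucket_idx = 17; after clamping to [15,33]
 * and subtracting CLIENT_MEM_USAGE_BUCKET_MIN_LOG it lands in bucket index 2
 * (the 64KB..128KB-1 range reported by DEBUG CLIENT-EVICTION). Doubling the
 * usage to 200000 bytes moves it to bucket index 3 (128KB..256KB-1). */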
+/* This is called both on explicit clients when something changed their buffers,
+ * so we can track clients' memory and enforce clients' maxmemory in real time,
+ * and also from the clientsCron. We call it from the cron so we have updated
+ * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration
+ * change requires us to evict a non-active client.
+ */
+int updateClientMemUsage(client *c) {
+ size_t mem = getClientMemoryUsage(c, NULL);
int type = getClientType(c);
- mem += getClientOutputBufferMemoryUsage(c);
- mem += sdsZmallocSize(c->querybuf);
- mem += zmalloc_size(c);
- mem += c->argv_len_sum;
- if (c->argv) mem += zmalloc_size(c->argv);
- /* Now that we have the memory used by the client, remove the old
- * value from the old category, and add it back. */
- server.stat_clients_type_memory[c->client_cron_last_memory_type] -=
- c->client_cron_last_memory_usage;
- server.stat_clients_type_memory[type] += mem;
+
+ /* Remove the old value of the memory used by the client from the old
+ * category, and add it back. */
+ atomicDecr(server.stat_clients_type_memory[c->last_memory_type], c->last_memory_usage);
+ atomicIncr(server.stat_clients_type_memory[type], mem);
+
/* Remember what we added and where, to remove it next time. */
- c->client_cron_last_memory_usage = mem;
- c->client_cron_last_memory_type = type;
+ c->last_memory_usage = mem;
+ c->last_memory_type = type;
+
+ /* Update client mem usage bucket only when we're not in the context of an
+ * IO thread. See updateClientMemUsageBucket() for details. */
+ if (io_threads_op == IO_THREADS_OP_IDLE)
+ updateClientMemUsageBucket(c);
+
return 0;
}
+/* Adds the client to the correct memory usage bucket. Each bucket contains
+ * all clients with roughly the same amount of memory. This way we group
+ * together clients consuming about the same amount of memory and can quickly
+ * free them in case we reach maxmemory-clients (client eviction).
+ * Note that in case of io-threads enabled we have to call this function only
+ * after the fan-in phase (when no io-threads are working) because the bucket
+ * lists are global. The io-threads themselves track per-client memory usage in
+ * updateClientMemUsage(). Here we update the clients to each bucket when all
+ * io-threads are done (both for read and write io-threading). */
+void updateClientMemUsageBucket(client *c) {
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+ int allow_eviction =
+ (c->last_memory_type == CLIENT_TYPE_NORMAL || c->last_memory_type == CLIENT_TYPE_PUBSUB) &&
+ !(c->flags & CLIENT_NO_EVICT);
+
+ /* Update the client in the mem usage buckets */
+ if (c->mem_usage_bucket) {
+ c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage_on_bucket_update;
+ /* If this client can't be evicted then remove it from the mem usage
+ * buckets */
+ if (!allow_eviction) {
+ listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node);
+ c->mem_usage_bucket = NULL;
+ c->mem_usage_bucket_node = NULL;
+ }
+ }
+ if (allow_eviction) {
+ clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage);
+ bucket->mem_usage_sum += c->last_memory_usage;
+ if (bucket != c->mem_usage_bucket) {
+ if (c->mem_usage_bucket)
+ listDelNode(c->mem_usage_bucket->clients,
+ c->mem_usage_bucket_node);
+ c->mem_usage_bucket = bucket;
+ listAddNodeTail(bucket->clients, c);
+ c->mem_usage_bucket_node = listLast(bucket->clients);
+ }
+ }
+
+ c->last_memory_usage_on_bucket_update = c->last_memory_usage;
+}
+
/* Return the max samples in the memory usage of clients tracked by
* the function clientsCronTrackExpansiveClients(). */
void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) {
@@ -2271,7 +2335,13 @@ void clientsCron(void) {
if (clientsCronHandleTimeout(c,now)) continue;
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
- if (clientsCronTrackClientsMemUsage(c)) continue;
+
+ /* Iterating all the clients in getMemoryOverheadData() is too slow and
+ * in turn would make the INFO command too slow. So we perform this
+ * computation incrementally and track the (not instantaneous but updated
+ * to the second) total memory used by clients using clientsCron() in
+ * a more incremental way (depending on server.hz). */
+ if (updateClientMemUsage(c)) continue;
if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
}
}
@@ -2879,6 +2949,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* visit processCommand() at all). */
handleClientsBlockedOnKeys();
+ /* Disconnect some clients if they are consuming too much memory. */
+ evictClients();
+
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
* time. */
@@ -3509,6 +3582,7 @@ void resetServerStats(void) {
server.stat_expired_time_cap_reached_count = 0;
server.stat_expire_cycle_time_used = 0;
server.stat_evictedkeys = 0;
+ server.stat_evictedclients = 0;
server.stat_total_eviction_exceeded_time = 0;
server.stat_last_eviction_exceeded_time = 0;
server.stat_keyspace_misses = 0;
@@ -3606,6 +3680,11 @@ void initServer(void) {
exit(1);
}
+ for (j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
+ server.client_mem_usage_buckets[j].mem_usage_sum = 0;
+ server.client_mem_usage_buckets[j].clients = listCreate();
+ }
+
createSharedObjects();
adjustOpenFilesLimit();
const char *clk_msg = monotonicInit();
@@ -4610,6 +4689,15 @@ int processCommand(client *c) {
}
}
+ /* Disconnect some clients if total clients memory is too high. We do this
+ * before key eviction, after the last command was executed and consumed
+ * some client output buffer memory. */
+ evictClients();
+ if (server.current_client == NULL) {
+ /* If we evicted ourself then abort processing the command */
+ return C_ERR;
+ }
+
/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
@@ -5673,6 +5761,7 @@ sds genRedisInfoString(const char *section) {
"expired_time_cap_reached_count:%lld\r\n"
"expire_cycle_cpu_milliseconds:%lld\r\n"
"evicted_keys:%lld\r\n"
+ "evicted_clients:%lld\r\n"
"total_eviction_exceeded_time:%lld\r\n"
"current_eviction_exceeded_time:%lld\r\n"
"keyspace_hits:%lld\r\n"
@@ -5715,6 +5804,7 @@ sds genRedisInfoString(const char *section) {
server.stat_expired_time_cap_reached_count,
server.stat_expire_cycle_time_used/1000,
server.stat_evictedkeys,
+ server.stat_evictedclients,
(server.stat_total_eviction_exceeded_time + current_eviction_exceeded_time) / 1000,
current_eviction_exceeded_time / 1000,
server.stat_keyspace_hits,
diff --git a/src/server.h b/src/server.h
index 39184591d..3a94effe9 100644
--- a/src/server.h
+++ b/src/server.h
@@ -125,6 +125,12 @@ typedef long long ustime_t; /* microsecond time type. */
#define CONFIG_MIN_RESERVED_FDS 32
#define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}"
+/* Bucket sizes for client eviction pools. Each bucket stores clients with
+ * memory usage of up to twice the size of the bucket below it. */
+#define CLIENT_MEM_USAGE_BUCKET_MIN_LOG 15 /* Bucket sizes start at up to 32KB (2^15) */
+#define CLIENT_MEM_USAGE_BUCKET_MAX_LOG 33 /* Bucket for largest clients: sizes above 4GB (2^32) */
+#define CLIENT_MEM_USAGE_BUCKETS (1+CLIENT_MEM_USAGE_BUCKET_MAX_LOG-CLIENT_MEM_USAGE_BUCKET_MIN_LOG)
+
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
#define ACTIVE_EXPIRE_CYCLE_FAST 1
@@ -275,9 +281,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
-#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put
- in the list of clients we can read
- from. */
+/* #define CLIENT_... (1<<29) currently unused, feel free to use in the future */
#define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully
* parsed command ready for execution. */
#define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to
@@ -299,6 +303,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
and AOF client */
#define CLIENT_REPL_RDBONLY (1ULL<<42) /* This client is a replica that only wants
RDB without replication buffer. */
+#define CLIENT_NO_EVICT (1ULL<<43) /* This client is protected against client
+ memory eviction. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@@ -797,6 +803,7 @@ typedef struct multiState {
int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it
is possible to know if all the commands have a
certain flag. */
+ size_t argv_len_sums; /* mem used by all commands arguments */
} multiState;
/* This structure holds the blocking operation state for a client.
@@ -912,6 +919,11 @@ typedef struct {
need more reserved IDs use UINT64_MAX-1,
-2, ... and so forth. */
+typedef struct {
+ list *clients;
+ size_t mem_usage_sum;
+} clientMemUsageBucket;
+
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn;
@@ -976,6 +988,7 @@ typedef struct client {
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
listNode *paused_list_node; /* list node within the pause list */
+ listNode *pending_read_list_node; /* list node in clients pending read list */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
@@ -993,13 +1006,18 @@ typedef struct client {
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
- /* In clientsCronTrackClientsMemUsage() we track the memory usage of
+ /* In updateClientMemUsage() we track the memory usage of
* each client and add it to the sum of all the clients of a given type,
* however we need to remember what was the old contribution of each
* client, and in which category the client was, in order to remove it
* before adding it the new value. */
- uint64_t client_cron_last_memory_usage;
- int client_cron_last_memory_type;
+ size_t last_memory_usage;
+ int last_memory_type;
+
+ size_t last_memory_usage_on_bucket_update;
+ listNode *mem_usage_bucket_node;
+ clientMemUsageBucket *mem_usage_bucket;
+
/* Response buffer */
int bufpos;
size_t buf_usable_size; /* Usable size of buffer. */
@@ -1288,6 +1306,10 @@ struct redisServer {
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* Current client executing the command. */
+
+ /* Stuff for client mem eviction */
+ clientMemUsageBucket client_mem_usage_buckets[CLIENT_MEM_USAGE_BUCKETS];
+
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
rax *clients_index; /* Active clients dictionary by client ID. */
@@ -1319,6 +1341,7 @@ struct redisServer {
long long stat_expired_time_cap_reached_count; /* Early expire cycle stops.*/
long long stat_expire_cycle_time_used; /* Cumulative microseconds used. */
long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */
+ long long stat_evictedclients; /* Number of evicted clients */
long long stat_total_eviction_exceeded_time; /* Total time over the memory limit, unit us */
monotime stat_last_eviction_exceeded_time; /* Timestamp of current eviction start, unit us */
long long stat_keyspace_hits; /* Number of successful lookups of keys */
@@ -1354,7 +1377,7 @@ struct redisServer {
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
double stat_module_progress; /* Module save progress. */
- uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
+ redisAtomic size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
@@ -1553,6 +1576,7 @@ struct redisServer {
/* Limits */
unsigned int maxclients; /* Max number of simultaneous clients */
unsigned long long maxmemory; /* Max number of memory bytes to use */
+ ssize_t maxmemory_clients; /* Memory limit for total client buffers */
int maxmemory_policy; /* Policy for key eviction */
int maxmemory_samples; /* Precision of random sampling */
int maxmemory_eviction_tenacity;/* Aggressiveness of eviction processing */
@@ -1882,6 +1906,11 @@ typedef struct {
#define OBJ_HASH_KEY 1
#define OBJ_HASH_VALUE 2
+#define IO_THREADS_OP_IDLE 0
+#define IO_THREADS_OP_READ 1
+#define IO_THREADS_OP_WRITE 2
+extern int io_threads_op;
+
/*-----------------------------------------------------------------------------
* Extern declarations
*----------------------------------------------------------------------------*/
@@ -1966,6 +1995,7 @@ void redisSetCpuAffinity(const char *cpulist);
client *createClient(connection *conn);
void freeClient(client *c);
void freeClientAsync(client *c);
+int beforeNextClient(client *c);
void resetClient(client *c);
void freeClientOriginalArgv(client *c);
void sendReplyToClient(connection *conn);
@@ -2026,7 +2056,8 @@ void rewriteClientCommandVector(client *c, int argc, ...);
void rewriteClientCommandArgument(client *c, int i, robj *newval);
void replaceClientCommandVector(client *c, int argc, robj **argv);
void redactClientCommandArgument(client *c, int argc);
-unsigned long getClientOutputBufferMemoryUsage(client *c);
+size_t getClientOutputBufferMemoryUsage(client *c);
+size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage);
int freeClientsInAsyncFreeQueue(void);
int closeClientOnOutputBufferLimitReached(client *c, int async);
int getClientType(client *c);
@@ -2034,6 +2065,7 @@ int getClientTypeByName(char *name);
char *getClientTypeName(int class);
void flushSlavesOutputBuffers(void);
void disconnectSlaves(void);
+void evictClients(void);
int listenToPort(int port, socketFds *fds);
void pauseClients(mstime_t duration, pause_type type);
void unpauseClients(void);
@@ -2048,6 +2080,8 @@ int handleClientsWithPendingWritesUsingThreads(void);
int handleClientsWithPendingReadsUsingThreads(void);
int stopThreadedIOIfNeeded(void);
int clientHasPendingReplies(client *c);
+int updateClientMemUsage(client *c);
+void updateClientMemUsageBucket(client *c);
void unlinkClient(client *c);
int writeToClient(client *c, int handler_installed);
void linkClient(client *c);
@@ -2106,6 +2140,7 @@ void unwatchAllKeys(client *c);
void initClientMultiState(client *c);
void freeClientMultiState(client *c);
void queueMultiCommand(client *c);
+size_t multiStateMemOverhead(client *c);
void touchWatchedKey(redisDb *db, robj *key);
int isWatchedKeyExpired(client *c);
void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with);
diff --git a/src/tracking.c b/src/tracking.c
index d6f2bf149..1e84cc3c1 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -305,6 +305,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyArrayLen(c,1);
addReplyBulkCBuffer(c,keyname,keylen);
}
+ updateClientMemUsage(c);
}
/* This function is called when a key is modified in Redis and in the case
diff --git a/src/util.c b/src/util.c
index d57420007..61b0ed4f0 100644
--- a/src/util.c
+++ b/src/util.c
@@ -204,7 +204,10 @@ unsigned long long memtoull(const char *p, int *err) {
/* Search the first non digit character. */
u = p;
- if (*u == '-') u++;
+ if (*u == '-') {
+ if (err) *err = 1;
+ return 0;
+ }
while(*u && isdigit(*u)) u++;
if (*u == '\0' || !strcasecmp(u,"b")) {
mul = 1;
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index a834d4abd..67551b041 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -4,7 +4,7 @@ proc randstring {min max {type binary}} {
if {$type eq {binary}} {
set minval 0
set maxval 255
- } elseif {$type eq {alpha}} {
+ } elseif {$type eq {alpha} || $type eq {simplealpha}} {
set minval 48
set maxval 122
} elseif {$type eq {compr}} {
@@ -12,11 +12,10 @@ proc randstring {min max {type binary}} {
set maxval 52
}
while {$len} {
- set rr [expr {$minval+int(rand()*($maxval-$minval+1))}]
- if {$type eq {alpha} && $rr eq 92} {
- set rr 90; # avoid putting '\' char in the string, it can mess up TCL processing
- }
- append output [format "%c" $rr]
+ set rr [format "%c" [expr {$minval+int(rand()*($maxval-$minval+1))}]]
+ if {$type eq {simplealpha} && ![string is alnum $rr]} {continue}
+ if {$type eq {alpha} && $rr eq 92} {continue} ;# avoid putting '\' char in the string, it can mess up TCL processing
+ append output $rr
incr len -1
}
return $output
diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl
index 5150790db..a5cc2d4ac 100644
--- a/tests/test_helper.tcl
+++ b/tests/test_helper.tcl
@@ -82,6 +82,7 @@ set ::all_tests {
unit/oom-score-adj
unit/shutdown
unit/networking
+ unit/client-eviction
}
# Index to the next test to run in the ::all_tests list.
set ::next_test 0
diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl
new file mode 100644
index 000000000..9421d8b61
--- /dev/null
+++ b/tests/unit/client-eviction.tcl
@@ -0,0 +1,509 @@
+tags {"external:skip"} {
+
+# Get info about a redis client connection:
+# name - name of client we want to query
+# f - field name from "CLIENT LIST" we want to get
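+# For example, [client_field some_client tot-mem] returns the tot-mem value that
+# CLIENT LIST reports for the connection named some_client.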
+proc client_field {name f} {
+ set clients [split [string trim [r client list]] "\r\n"]
+ set c [lsearch -inline $clients *name=$name*]
+ if {![regexp $f=(\[a-zA-Z0-9-\]+) $c - res]} {
+ error "no client named $name found with field $f"
+ }
+ return $res
+}
+
+proc client_exists {name} {
+ if {[catch { client_field $name tot-mem } e]} {
+ return false
+ }
+ return true
+}
+
+proc gen_client {} {
+ set rr [redis_client]
+ set name "tst_[randstring 4 4 simplealpha]"
+ $rr client setname $name
+ assert {[client_exists $name]}
+ return [list $rr $name]
+}
+
+# Sum a value across all redis client connections:
+# f - the field name from "CLIENT LIST" we want to sum
+proc clients_sum {f} {
+ set sum 0
+ set clients [split [string trim [r client list]] "\r\n"]
+ foreach c $clients {
+ if {![regexp $f=(\[a-zA-Z0-9-\]+) $c - res]} {
+ error "field $f not found in $c"
+ }
+ incr sum $res
+ }
+ return $sum
+}
+
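+# Convert a size in megabytes to bytes.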
+proc mb {v} {
+ return [expr $v * 1024 * 1024]
+}
+
+start_server {} {
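+ # Cap the accumulated memory of all client connections at ~3mb so the tests
+ # below can trigger client eviction with modestly sized buffers.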
+ set maxmemory_clients 3000000
+ r config set maxmemory-clients $maxmemory_clients
+
+ test "client evicted due to large argv" {
+ r flushdb
+ lassign [gen_client] rr cname
+ # Attempt a large multi-bulk command under eviction limit
+ $rr mset k v k2 [string repeat v 1000000]
+ assert_equal [$rr get k] v
+ # Attempt another command, now causing client eviction
+ catch { $rr mset k v k2 [string repeat v $maxmemory_clients] } e
+ assert {![client_exists $cname]}
+ $rr close
+ }
+
+ test "client evicted due to large query buf" {
+ r flushdb
+ lassign [gen_client] rr cname
+ # Fill the query buffer above the eviction limit without completing the argument, causing client eviction
+ catch {
+ $rr write [join [list "*1\r\n\$$maxmemory_clients\r\n" [string repeat v $maxmemory_clients]] ""]
+ $rr flush
+ $rr read
+ } e
+ assert {![client_exists $cname]}
+ $rr close
+ }
+
+ test "client evicted due to percentage of maxmemory" {
+ set maxmemory [mb 6]
+ r config set maxmemory $maxmemory
+ # Set client eviction threshold to 7% of maxmemory
+ set maxmemory_clients_p 7
+ r config set maxmemory-clients $maxmemory_clients_p%
+ r flushdb
+
+ set maxmemory_clients_actual [expr $maxmemory * $maxmemory_clients_p / 100]
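+ # With maxmemory at 6mb and a 7% threshold this comes out to roughly 430kb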
+
+ lassign [gen_client] rr cname
+ # Fill the query buffer to only half the percentage threshold and verify we're not disconnected
+ set n [expr $maxmemory_clients_actual / 2]
+ $rr write [join [list "*1\r\n\$$n\r\n" [string repeat v $n]] ""]
+ $rr flush
+ set tot_mem [client_field $cname tot-mem]
+ assert {$tot_mem >= $n && $tot_mem < $maxmemory_clients_actual}
+
+ # Fill the query buffer up to the percentage threshold of maxmemory and verify we're evicted
+ $rr close
+ lassign [gen_client] rr cname
+ catch {
+ $rr write [join [list "*1\r\n\$$maxmemory_clients_actual\r\n" [string repeat v $maxmemory_clients_actual]] ""]
+ $rr flush
+ } e
+ assert {![client_exists $cname]}
+ $rr close
+
+ # Restore settings
+ r config set maxmemory 0
+ r config set maxmemory-clients $maxmemory_clients
+ }
+
+ test "client evicted due to large multi buf" {
+ r flushdb
+ lassign [gen_client] rr cname
+
+ # Attempt a multi-exec where sum of commands is less than maxmemory_clients
+ $rr multi
+ $rr set k [string repeat v [expr $maxmemory_clients / 4]]
+ $rr set k [string repeat v [expr $maxmemory_clients / 4]]
+ assert_equal [$rr exec] {OK OK}
+
+ # Attempt a multi-exec where sum of commands is more than maxmemory_clients, causing client eviction
+ $rr multi
+ catch {
+ for {set j 0} {$j < 5} {incr j} {
+ $rr set k [string repeat v [expr $maxmemory_clients / 4]]
+ }
+ } e
+ assert {![client_exists $cname]}
+ $rr close
+ }
+
+ test "client evicted due to watched key list" {
+ r flushdb
+ set rr [redis_client]
+
+ # Since the watched key list adds only a small overhead, this test uses a minimal maxmemory-clients config
+ set temp_maxmemory_clients 200000
+ r config set maxmemory-clients $temp_maxmemory_clients
+
+ # Append watched keys until the list exceeds maxmemory-clients and causes client eviction
+ catch {
+ for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
+ $rr watch $j
+ }
+ } e
+ assert_match {I/O error reading reply} $e
+ $rr close
+
+ # Restore config for next tests
+ r config set maxmemory-clients $maxmemory_clients
+ }
+
+ test "client evicted due to pubsub subscriptions" {
+ r flushdb
+
+ # Since pubsub subscriptions add only a small overhead, this test uses a minimal maxmemory-clients config
+ set temp_maxmemory_clients 200000
+ r config set maxmemory-clients $temp_maxmemory_clients
+
+ # Test eviction due to pubsub patterns
+ set rr [redis_client]
+ # Add patterns until the list exceeds maxmemory-clients and causes client eviction
+ catch {
+ for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
+ $rr psubscribe $j
+ }
+ } e
+ assert_match {I/O error reading reply} $e
+ $rr close
+
+ # Test eviction due to pubsub channels
+ set rr [redis_client]
+ # Add channels until the list exceeds maxmemory-clients and causes client eviction
+ catch {
+ for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
+ $rr subscribe $j
+ }
+ } e
+ assert_match {I/O error reading reply} $e
+ $rr close
+
+ # Restore config for next tests
+ r config set maxmemory-clients $maxmemory_clients
+ }
+
+ test "client evicted due to tracking redirection" {
+ r flushdb
+ # Use a slow hz to keep clientsCron from updating memory usage frequently, since
+ # we're testing the update logic that runs when writing tracking redirection output
+ set backup_hz [lindex [r config get hz] 1]
+ r config set hz 1
+
+ set rr [redis_client]
+ set redirected_c [redis_client]
+ $redirected_c client setname redirected_client
+ set redir_id [$redirected_c client id]
+ $redirected_c SUBSCRIBE __redis__:invalidate
+ $rr client tracking on redirect $redir_id bcast
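+ # From now on invalidation messages for keys touched by $rr are pushed to
+ # redirected_client, growing its output buffer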
+ # Use a big key name to fill the redirected tracking client's buffer quickly
+ set key_length [expr 1024*10]
+ set long_key [string repeat k $key_length]
+ # Use a script so we won't need to pass the long key name when dirtying it in the loop
+ set script_sha [$rr script load "redis.call('incr', '$long_key')"]
+ # Read and write to the same (long) key until redirected_client's buffers cause it to be evicted
+ set t [clock milliseconds]
+ catch {
+ while true {
+ set mem [client_field redirected_client tot-mem]
+ assert {$mem < $maxmemory_clients}
+ $rr evalsha $script_sha 0
+ }
+ } e
+ assert_match {no client named redirected_client found*} $e
+
+ # Make sure eviction happened in less than 1 second, which means that, statistically,
+ # it wasn't caused by clientsCron accounting
+ set t [expr [clock milliseconds] - $t]
+ assert {$t < 1000}
+
+ r config set hz $backup_hz
+ $rr close
+ $redirected_c close
+ }
+
+ test "client evicted due to client tracking prefixes" {
+ r flushdb
+ set rr [redis_client]
+
+ # Since the tracking prefixes list adds only a small overhead, this test uses a minimal maxmemory-clients config
+ set temp_maxmemory_clients 200000
+ r config set maxmemory-clients $temp_maxmemory_clients
+
+ # Add tracking prefixes until the list exceeds maxmemory-clients and causes client eviction
+ catch {
+ for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
+ $rr client tracking on prefix [format %012s $j] bcast
+ }
+ } e
+ assert_match {I/O error reading reply} $e
+ $rr close
+
+ # Restore config for next tests
+ r config set maxmemory-clients $maxmemory_clients
+ }
+
+ test "client evicted due to output buf" {
+ r flushdb
+ r setrange k 200000 v
+ set rr [redis_deferring_client]
+ $rr client setname test_client
+ $rr flush
+ assert {[$rr read] == "OK"}
+ # Attempt a large response under eviction limit
+ $rr get k
+ $rr flush
+ assert {[string length [$rr read]] == 200001}
+ set mem [client_field test_client tot-mem]
+ assert {$mem < $maxmemory_clients}
+
+ # Fill the output buffer in a loop without reading it and make sure
+ # we're eventually disconnected, before the reported memory reaches maxmemory_clients
+ while true {
+ if { [catch {
+ set mem [client_field test_client tot-mem]
+ assert {$mem < $maxmemory_clients}
+ $rr get k
+ $rr flush
+ } e]} {
+ assert {![client_exists test_client]}
+ break
+ }
+ }
+ $rr close
+ }
+
+ foreach {no_evict} {on off} {
+ test "client no-evict $no_evict" {
+ r flushdb
+ r client setname control
+ r client no-evict on ;# Avoid evicting the main connection
+ lassign [gen_client] rr cname
+ $rr client no-evict $no_evict
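+ # With no-evict on this connection must survive exceeding maxmemory-clients,
+ # while with no-evict off it should be dropped once over the limit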
+
+ # Overflow maxmemory-clients
+ set qbsize [expr {$maxmemory_clients + 1}]
+ if {[catch {
+ $rr write [join [list "*1\r\n\$$qbsize\r\n" [string repeat v $qbsize]] ""]
+ $rr flush
+ wait_for_condition 200 10 {
+ [client_field $cname qbuf] == $qbsize
+ } else {
+ fail "Failed to fill qbuf for test"
+ }
+ } e] && $no_evict == off} {
+ assert {![client_exists $cname]}
+ } elseif {$no_evict == on} {
+ assert {[client_field $cname tot-mem] > $maxmemory_clients}
+ }
+ $rr close
+ }
+ }
+}
+
+start_server {} {
+ set server_pid [s process_id]
+ set maxmemory_clients [mb 10]
+ set obuf_limit [mb 3]
+ r config set maxmemory-clients $maxmemory_clients
+ r config set client-output-buffer-limit "normal $obuf_limit 0 0"
+
+ test "avoid client eviction when client is freed by output buffer limit" {
+ r flushdb
+ set obuf_size [expr {$obuf_limit + [mb 1]}]
+ r setrange k $obuf_size v
+ set rr1 [redis_client]
+ $rr1 client setname "qbuf-client"
+ set rr2 [redis_deferring_client]
+ $rr2 client setname "obuf-client1"
+ assert_equal [$rr2 read] OK
+ set rr3 [redis_deferring_client]
+ $rr3 client setname "obuf-client2"
+ assert_equal [$rr3 read] OK
+
+ # Occupy the client's query buffer so that less than the output buffer limit is left before exceeding maxmemory-clients
+ set qbsize [expr {$maxmemory_clients - $obuf_size}]
+ $rr1 write [join [list "*1\r\n\$$qbsize\r\n" [string repeat v $qbsize]] ""]
+ $rr1 flush
+ # Wait for the query buffer to reach the expected size
+ wait_for_condition 200 10 {
+ [client_field qbuf-client qbuf] == $qbsize
+ } else {
+ fail "Failed to fill qbuf for test"
+ }
+
+ # Make the other two obuf-clients exceed the obuf limit and also exceed maxmemory-clients.
+ # We use two obuf-clients to make sure that even if client eviction is attempted
+ # between processing the two commands (with no sleep in between), no client eviction
+ # is performed, because the obuf limit is enforced with precedence.
+ exec kill -SIGSTOP $server_pid
+ $rr2 get k
+ $rr2 flush
+ $rr3 get k
+ $rr3 flush
+ exec kill -SIGCONT $server_pid
+
+ # Validate obuf-clients were disconnected (because of obuf limit)
+ catch {client_field obuf-client1 name} e
+ assert_match {no client named obuf-client1 found*} $e
+ catch {client_field obuf-client2 name} e
+ assert_match {no client named obuf-client2 found*} $e
+
+ # Validate qbuf-client is still connected and wasn't evicted
+ assert_equal [client_field qbuf-client name] {qbuf-client}
+
+ $rr1 close
+ $rr2 close
+ $rr3 close
+ }
+}
+
+start_server {} {
+ test "decrease maxmemory-clients causes client eviction" {
+ set maxmemory_clients [mb 4]
+ set client_count 10
+ set qbsize [expr ($maxmemory_clients - [mb 1]) / $client_count]
+ r config set maxmemory-clients $maxmemory_clients
+
+
+ # Make multiple clients consume together roughly 1mb less than maxmemory_clients
+ set rrs {}
+ for {set j 0} {$j < $client_count} {incr j} {
+ set rr [redis_client]
+ lappend rrs $rr
+ $rr client setname client$j
+ $rr write [join [list "*2\r\n\$$qbsize\r\n" [string repeat v $qbsize]] ""]
+ $rr flush
+ wait_for_condition 200 10 {
+ [client_field client$j qbuf] >= $qbsize
+ } else {
+ fail "Failed to fill qbuf for test"
+ }
+ }
+
+ # Make sure all clients are still connected
+ set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]]
+ assert {$connected_clients == $client_count}
+
+ # Decrease maxmemory_clients and expect client eviction
+ r config set maxmemory-clients [expr $maxmemory_clients / 2]
+ set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]]
+ assert {$connected_clients > 0 && $connected_clients < $client_count}
+
+ foreach rr $rrs {$rr close}
+ }
+}
+
+start_server {} {
+ test "evict clients only until below limit" {
+ set client_count 10
+ set client_mem [mb 1]
+ r config set maxmemory-clients 0
+ r client setname control
+ r client no-evict on
+
+ # Make multiple clients each consume roughly 1mb of memory (client eviction is disabled for now)
+ set total_client_mem 0
+ set rrs {}
+ for {set j 0} {$j < $client_count} {incr j} {
+ set rr [redis_client]
+ lappend rrs $rr
+ $rr client setname client$j
+ $rr write [join [list "*2\r\n\$$client_mem\r\n" [string repeat v $client_mem]] ""]
+ $rr flush
+ wait_for_condition 200 10 {
+ [client_field client$j tot-mem] >= $client_mem
+ } else {
+ fail "Failed to fill qbuf for test"
+ }
+ incr total_client_mem [client_field client$j tot-mem]
+ }
+
+ set client_actual_mem [expr $total_client_mem / $client_count]
+
+ # Make sure client_actual_mem is greater than or equal to what we intended
+ assert {$client_actual_mem >= $client_mem}
+
+ # Make sure all clients are still connected
+ set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]]
+ assert {$connected_clients == $client_count}
+
+ # Set maxmemory-clients to accommodate half our clients (taking into account the control client)
+ set maxmemory_clients [expr ($client_actual_mem * $client_count) / 2 + [client_field control tot-mem]]
+ r config set maxmemory-clients $maxmemory_clients
+
+ # Make sure total used memory is below maxmemory_clients
+ set total_client_mem [clients_sum tot-mem]
+ assert {$total_client_mem <= $maxmemory_clients}
+
+ # Make sure we have only half of our clients now
+ set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]]
+ assert {$connected_clients == [expr $client_count / 2]}
+
+ foreach rr $rrs {$rr close}
+ }
+}
+
+start_server {} {
+ test "evict clients in right order (large to small)" {
+ # Note that each size step needs to be at least x2 larger than the previous step
+ # because of how the client-eviction size bucketing works
+ set sizes [list 100000 [mb 1] [mb 3]]
+ set clients_per_size 3
+ r client setname control
+ r client no-evict on
+ r config set maxmemory-clients 0
+
+ # Run over all sizes and create some clients using up that size
+ set total_mem 0
+ set rrs {}
+ for {set i 0} {$i < [llength $sizes]} {incr i} {
+ set size [lindex $sizes $i]
+
+ for {set j 0} {$j < $clients_per_size} {incr j} {
+ set rr [redis_client]
+ lappend rrs $rr
+ $rr client setname client-$i
+ $rr write [join [list "*2\r\n\$$size\r\n" [string repeat v $size]] ""]
+ $rr flush
+ }
+ set client_mem [client_field client-$i tot-mem]
+
+ # Update our size list based on the actual size used (this is usually
+ # slightly more than expected because of allocator bins)
+ assert {$client_mem >= $size}
+ set sizes [lreplace $sizes $i $i $client_mem]
+
+ # Account total client memory usage
+ incr total_mem [expr $clients_per_size * $client_mem]
+ }
+ incr total_mem [client_field control tot-mem]
+
+ # Make sure all clients are connected
+ set clients [split [string trim [r client list]] "\r\n"]
+ for {set i 0} {$i < [llength $sizes]} {incr i} {
+ assert_equal [llength [lsearch -all $clients "*name=client-$i *"]] $clients_per_size
+ }
+
+ # For each size, reduce maxmemory-clients so the relevant clients get evicted;
+ # do this from largest to smallest
+ foreach size [lreverse $sizes] {
+ set total_mem [expr $total_mem - $clients_per_size * $size]
+ r config set maxmemory-clients $total_mem
+ set clients [split [string trim [r client list]] "\r\n"]
+ # Verify only relevant clients were evicted
+ for {set i 0} {$i < [llength $sizes]} {incr i} {
+ set verify_size [lindex $sizes $i]
+ set count [llength [lsearch -all $clients "*name=client-$i *"]]
+ if {$verify_size < $size} {
+ assert_equal $count $clients_per_size
+ } else {
+ assert_equal $count 0
+ }
+ }
+ }
+ foreach rr $rrs {$rr close}
+ }
+}
+
+}
+
diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl
index dd986dd72..0bc0e4b6d 100644
--- a/tests/unit/maxmemory.tcl
+++ b/tests/unit/maxmemory.tcl
@@ -1,3 +1,140 @@
+start_server {tags {"maxmemory" "external:skip"}} {
+ r config set maxmemory 11mb
+ r config set maxmemory-policy allkeys-lru
+ set server_pid [s process_id]
+
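+ # Prepare the dataset for the tests below: fill ~5mb of keys under the 11mb
+ # maxmemory limit, with client eviction either enabled (3mb) or disabled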
+ proc init_test {client_eviction} {
+ r flushdb
+
+ set prev_maxmemory_clients [r config get maxmemory-clients]
+ if $client_eviction {
+ r config set maxmemory-clients 3mb
+ } else {
+ r config set maxmemory-clients 0
+ }
+
+ r config resetstat
+ # fill 5mb using 50 keys of 100kb
+ for {set j 0} {$j < 50} {incr j} {
+ r setrange $j 100000 x
+ }
+ assert_equal [r dbsize] 50
+ }
+
+ proc verify_test {client_eviction} {
+ set evicted_keys [s evicted_keys]
+ set evicted_clients [s evicted_clients]
+ set dbsize [r dbsize]
+
+ if $::verbose {
+ puts "evicted keys: $evicted_keys"
+ puts "evicted clients: $evicted_clients"
+ puts "dbsize: $dbsize"
+ }
+
+ if $client_eviction {
+ assert_morethan $evicted_clients 0
+ assert_equal $evicted_keys 0
+ assert_equal $dbsize 50
+ } else {
+ assert_equal $evicted_clients 0
+ assert_morethan $evicted_keys 0
+ assert_lessthan $dbsize 50
+ }
+ }
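+ # With client eviction enabled we expect connections, not keys, to be dropped;
+ # with it disabled, memory pressure should fall back to key eviction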
+
+ foreach {client_eviction} {false true} {
+ set clients {}
+ test "eviction due to output buffers of many MGET clients, client eviction: $client_eviction" {
+ init_test $client_eviction
+
+ for {set j 0} {$j < 20} {incr j} {
+ set rr [redis_deferring_client]
+ lappend clients $rr
+ }
+
+ # Freeze the server so output buffers will be filled in one event loop when we un-freeze after sending mgets
+ exec kill -SIGSTOP $server_pid
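+ # Each MGET of key 1 yields a ~100kb reply, so 5 rounds over 20 clients queue
+ # up roughly 10mb of pending output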
+ for {set j 0} {$j < 5} {incr j} {
+ foreach rr $clients {
+ $rr mget 1
+ $rr flush
+ }
+ }
+ # Unfreeze server
+ exec kill -SIGCONT $server_pid
+
+
+ for {set j 0} {$j < 5} {incr j} {
+ foreach rr $clients {
+ if {[catch { $rr read } err]} {
+ lremove clients $rr
+ }
+ }
+ }
+
+ verify_test $client_eviction
+ }
+ foreach rr $clients {
+ $rr close
+ }
+
+ set clients {}
+ test "eviction due to input buffer of a dead client, client eviction: $client_eviction" {
+ init_test $client_eviction
+
+ for {set j 0} {$j < 30} {incr j} {
+ set rr [redis_deferring_client]
+ lappend clients $rr
+ }
+
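+ # Each client declares a 250-argument command but sends only 249 ~1kb arguments,
+ # so roughly 250kb accumulates in its query buffer and the command never completes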
+ foreach rr $clients {
+ if {[catch {
+ $rr write "*250\r\n"
+ for {set j 0} {$j < 249} {incr j} {
+ $rr write "\$1000\r\n"
+ $rr write [string repeat x 1000]
+ $rr write "\r\n"
+ $rr flush
+ }
+ }]} {
+ lremove clients $rr
+ }
+ }
+
+ verify_test $client_eviction
+ }
+ foreach rr $clients {
+ $rr close
+ }
+
+ set clients {}
+ test "eviction due to output buffers of pubsub, client eviction: $client_eviction" {
+ init_test $client_eviction
+
+ for {set j 0} {$j < 10} {incr j} {
+ set rr [redis_deferring_client]
+ lappend clients $rr
+ }
+
+ foreach rr $clients {
+ $rr subscribe bla
+ $rr flush
+ }
+
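+ # Each publish pushes a ~100kb message into every subscriber's output buffer,
+ # which is never read by the deferring clients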
+ for {set j 0} {$j < 40} {incr j} {
+ catch {r publish bla [string repeat x 100000]} err
+ }
+
+ verify_test $client_eviction
+ }
+ foreach rr $clients {
+ $rr close
+ }
+ }
+
+}
+
start_server {tags {"maxmemory external:skip"}} {
test "Without maxmemory small integers are shared" {
r config set maxmemory 0