diff options
-rw-r--r-- | redis.conf | 19 | ||||
-rw-r--r-- | src/blocked.c | 12 | ||||
-rw-r--r-- | src/config.c | 152 | ||||
-rw-r--r-- | src/debug.c | 19 | ||||
-rw-r--r-- | src/multi.c | 9 | ||||
-rw-r--r-- | src/networking.c | 236 | ||||
-rw-r--r-- | src/object.c | 2 | ||||
-rw-r--r-- | src/pubsub.c | 2 | ||||
-rw-r--r-- | src/redis-cli.c | 2 | ||||
-rw-r--r-- | src/replication.c | 1 | ||||
-rw-r--r-- | src/server.c | 130 | ||||
-rw-r--r-- | src/server.h | 51 | ||||
-rw-r--r-- | src/tracking.c | 1 | ||||
-rw-r--r-- | src/util.c | 5 | ||||
-rw-r--r-- | tests/support/util.tcl | 11 | ||||
-rw-r--r-- | tests/test_helper.tcl | 1 | ||||
-rw-r--r-- | tests/unit/client-eviction.tcl | 509 | ||||
-rw-r--r-- | tests/unit/maxmemory.tcl | 137 |
18 files changed, 1168 insertions, 131 deletions
diff --git a/redis.conf b/redis.conf index 3eb7374cc..bd7fa5271 100644 --- a/redis.conf +++ b/redis.conf @@ -1841,6 +1841,25 @@ client-output-buffer-limit pubsub 32mb 8mb 60 # # client-query-buffer-limit 1gb +# In some scenarios client connections can hog up memory leading to OOM +# errors or data eviction. To avoid this we can cap the accumulated memory +# used by all client connections (all pubsub and normal clients). Once we +# reach that limit connections will be dropped by the server freeing up +# memory. The server will attempt to drop the connections using the most +# memory first. We call this mechanism "client eviction". +# +# Client eviction is configured using the maxmemory-clients setting as follows: +# 0 - client eviction is disabled (default) +# +# A memory value can be used for the client eviction threshold, +# for example: +# maxmemory-clients 1g +# +# A percentage value (between 1% and 100%) means the client eviction threshold +# is based on a percentage of the maxmemory setting. For example to set client +# eviction at 5% of maxmemory: +# maxmemory-clients 5% + # In the Redis protocol, bulk requests, that are, elements representing single # strings, are normally limited to 512 mb. However you can change this limit # here, but must be 1mb or greater diff --git a/src/blocked.c b/src/blocked.c index 4898cdcbf..86aed2440 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -138,14 +138,14 @@ void processUnblockedClients(void) { * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { /* If we have a queued command, execute it now. */ - if (processPendingCommandsAndResetClient(c) == C_ERR) { - continue; - } - /* Then process client if it has more data in it's buffer. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { - processInputBuffer(c); + if (processPendingCommandsAndResetClient(c) == C_OK) { + /* Now process client if it has more data in it's buffer. 
*/ + if (c->querybuf && sdslen(c->querybuf) > 0) { + processInputBuffer(c); + } } } + beforeNextClient(c); } } diff --git a/src/config.c b/src/config.c index 349021f9e..2a8697e09 100644 --- a/src/config.c +++ b/src/config.c @@ -198,6 +198,10 @@ typedef enum numericType { NUMERIC_TYPE_TIME_T, } numericType; +#define INTEGER_CONFIG 0 /* No flags means a simple integer configuration */ +#define MEMORY_CONFIG (1<<0) /* Indicates if this value can be loaded as a memory value */ +#define PERCENT_CONFIG (1<<1) /* Indicates if this value can be loaded as a percent (and stored as a negative int) */ + typedef struct numericConfigData { union { int *i; @@ -211,7 +215,7 @@ typedef struct numericConfigData { off_t *ot; time_t *tt; } config; /* The pointer to the numeric config this value is stored in */ - int is_memory; /* Indicates if this value can be loaded as a memory value */ + unsigned int flags; numericType numeric_type; /* An enum indicating the type of this value */ long long lower_bound; /* The lower bound of this numeric value */ long long upper_bound; /* The upper bound of this numeric value */ @@ -1347,6 +1351,14 @@ void rewriteConfigBytesOption(struct rewriteConfigState *state, const char *opti rewriteConfigRewriteLine(state,option,line,force); } +/* Rewrite a simple "option-name n%" configuration option. */ +void rewriteConfigPercentOption(struct rewriteConfigState *state, const char *option, long long value, long long defvalue) { + int force = value != defvalue; + sds line = sdscatprintf(sdsempty(),"%s %lld%%",option,value); + + rewriteConfigRewriteLine(state,option,line,force); +} + /* Rewrite a yes/no option. 
*/ void rewriteConfigYesNoOption(struct rewriteConfigState *state, const char *option, int value, int defvalue) { int force = value != defvalue; @@ -2111,8 +2123,18 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) { return 0; } } else { + /* Boundary check for percentages */ + if (data.numeric.flags & PERCENT_CONFIG && ll < 0) { + if (ll < data.numeric.lower_bound) { + snprintf(loadbuf, LOADBUF_SIZE, + "percentage argument must be less or equal to %lld", + -data.numeric.lower_bound); + *err = loadbuf; + return 0; + } + } /* Boundary check for signed types */ - if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) { + else if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) { snprintf(loadbuf, LOADBUF_SIZE, "argument must be between %lld and %lld inclusive", data.numeric.lower_bound, @@ -2124,22 +2146,46 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) { return 1; } +static int numericParseString(typeData data, sds value, const char **err, long long *res) { + /* First try to parse as memory */ + if (data.numeric.flags & MEMORY_CONFIG) { + int memerr; + *res = memtoull(value, &memerr); + if (!memerr) + return 1; + } + + /* Attempt to parse as percent */ + if (data.numeric.flags & PERCENT_CONFIG && + sdslen(value) > 1 && value[sdslen(value)-1] == '%' && + string2ll(value, sdslen(value)-1, res) && + *res >= 0) { + /* We store percentage as negative value */ + *res = -*res; + return 1; + } + + /* Attempt a simple number (no special flags set) */ + if (!data.numeric.flags && string2ll(value, sdslen(value), res)) + return 1; + + /* Select appropriate error string */ + if (data.numeric.flags & MEMORY_CONFIG && + data.numeric.flags & PERCENT_CONFIG) + *err = "argument must be a memory or percent value" ; + else if (data.numeric.flags & MEMORY_CONFIG) + *err = "argument must be a memory value"; + else + *err = "argument couldn't be parsed into an integer"; + return 0; +} static 
int numericConfigSet(typeData data, sds value, int update, const char **err) { long long ll, prev = 0; - if (data.numeric.is_memory) { - int memerr; - ll = memtoull(value, &memerr); - if (memerr) { - *err = "argument must be a memory value"; - return 0; - } - } else { - if (!string2ll(value, sdslen(value), &ll)) { - *err = "argument couldn't be parsed into an integer" ; - return 0; - } - } + + if (!numericParseString(data, value, err, &ll)) + return 0; + if (!numericBoundaryCheck(data, ll, err)) return 0; @@ -2158,21 +2204,21 @@ static int numericConfigSet(typeData data, sds value, int update, const char **e static void numericConfigGet(client *c, typeData data) { char buf[128]; - if (data.numeric.is_memory) { - unsigned long long value = 0; - - GET_NUMERIC_TYPE(value) - ull2string(buf, sizeof(buf), value); - addReplyBulkCString(c, buf); - } else{ - long long value = 0; - - GET_NUMERIC_TYPE(value) + long long value = 0; + GET_NUMERIC_TYPE(value) + if (data.numeric.flags & PERCENT_CONFIG && value < 0) { + int len = ll2string(buf, sizeof(buf), -value); + buf[len] = '%'; + buf[len+1] = '\0'; + } + else if (data.numeric.flags & MEMORY_CONFIG) { + ull2string(buf, sizeof(buf), value); + } else { ll2string(buf, sizeof(buf), value); - addReplyBulkCString(c, buf); } + addReplyBulkCString(c, buf); } static void numericConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) { @@ -2180,18 +2226,17 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite GET_NUMERIC_TYPE(value) - if (data.numeric.is_memory) { + if (data.numeric.flags & PERCENT_CONFIG && value < 0) { + rewriteConfigPercentOption(state, name, -value, data.numeric.default_value); + } else if (data.numeric.flags & MEMORY_CONFIG) { rewriteConfigBytesOption(state, name, value, data.numeric.default_value); } else { rewriteConfigNumericalOption(state, name, value, data.numeric.default_value); } } -#define INTEGER_CONFIG 0 -#define MEMORY_CONFIG 1 - -#define 
embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) { \ - embedCommonConfig(name, alias, flags) \ +#define embedCommonNumericalConfig(name, alias, _flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) { \ + embedCommonConfig(name, alias, _flags) \ embedConfigInterface(numericConfigInit, numericConfigSet, numericConfigGet, numericConfigRewrite) \ .data.numeric = { \ .lower_bound = (lower), \ @@ -2199,73 +2244,73 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite .default_value = (default), \ .is_valid_fn = (is_valid), \ .update_fn = (update), \ - .is_memory = (memory), + .flags = (num_conf_flags), -#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_INT, \ .config.i = &(config_addr) \ } \ } -#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_UINT, \ .config.ui = &(config_addr) \ } \ } -#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, 
update) \ +#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_LONG, \ .config.l = &(config_addr) \ } \ } -#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_ULONG, \ .config.ul = &(config_addr) \ } \ } -#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_LONG_LONG, \ .config.ll = &(config_addr) \ } \ } -#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_ULONG_LONG, \ .config.ull = &(config_addr) \ } \ } -#define createSizeTConfig(name, alias, 
flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_SIZE_T, \ .config.st = &(config_addr) \ } \ } -#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_SSIZE_T, \ .config.sst = &(config_addr) \ } \ } -#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_TIME_T, \ .config.tt = &(config_addr) \ } \ } -#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, 
default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_OFF_T, \ .config.ot = &(config_addr) \ } \ @@ -2653,6 +2698,7 @@ standardConfig configs[] = { createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */ createSizeTConfig("client-query-buffer-limit", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.client_max_querybuf_len, 1024*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Default: 1GB max query buffer. */ + createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, NULL), /* Other configs */ createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */ diff --git a/src/debug.c b/src/debug.c index d29d48673..4e95e2dfd 100644 --- a/src/debug.c +++ b/src/debug.c @@ -469,6 +469,8 @@ void debugCommand(client *c) { " Return the size of different Redis core C structures.", "ZIPLIST <key>", " Show low level info about the ziplist encoding of <key>.", +"CLIENT-EVICTION", +" Show low level client eviction pools info (maxmemory-clients).", NULL }; addReplyHelp(c, help); @@ -883,6 +885,23 @@ NULL addReplyError(c, "CONFIG-REWRITE-FORCE-ALL failed"); else addReply(c, shared.ok); + } else if(!strcasecmp(c->argv[1]->ptr,"client-eviction") && c->argc == 2) { + sds bucket_info = sdsempty(); + for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) { + if (j == 0) + bucket_info = sdscatprintf(bucket_info, "bucket 0"); + else + bucket_info = sdscatprintf(bucket_info, "bucket %10zu", (size_t)1<<(j-1+CLIENT_MEM_USAGE_BUCKET_MIN_LOG)); + if (j == CLIENT_MEM_USAGE_BUCKETS-1) + bucket_info = 
sdscatprintf(bucket_info, "+ : "); + else + bucket_info = sdscatprintf(bucket_info, " - %10zu: ", ((size_t)1<<(j+CLIENT_MEM_USAGE_BUCKET_MIN_LOG))-1); + bucket_info = sdscatprintf(bucket_info, "tot-mem: %10zu, clients: %lu\n", + server.client_mem_usage_buckets[j].mem_usage_sum, + server.client_mem_usage_buckets[j].clients->len); + } + addReplyVerbatim(c,bucket_info,sdslen(bucket_info),"txt"); + sdsfree(bucket_info); #ifdef USE_JEMALLOC } else if(!strcasecmp(c->argv[1]->ptr,"mallctl") && c->argc >= 3) { mallctl_int(c, c->argv+2, c->argc-2); diff --git a/src/multi.c b/src/multi.c index e40d2a447..b02457bb9 100644 --- a/src/multi.c +++ b/src/multi.c @@ -37,6 +37,7 @@ void initClientMultiState(client *c) { c->mstate.count = 0; c->mstate.cmd_flags = 0; c->mstate.cmd_inv_flags = 0; + c->mstate.argv_len_sums = 0; } /* Release all the resources associated with MULTI/EXEC state */ @@ -78,6 +79,7 @@ void queueMultiCommand(client *c) { c->mstate.count++; c->mstate.cmd_flags |= c->cmd->flags; c->mstate.cmd_inv_flags |= ~c->cmd->flags; + c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc; } void discardTransaction(client *c) { @@ -435,3 +437,10 @@ void unwatchCommand(client *c) { c->flags &= (~CLIENT_DIRTY_CAS); addReply(c,shared.ok); } + +size_t multiStateMemOverhead(client *c) { + size_t mem = c->mstate.argv_len_sums; + /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. 
*/ + mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); + return mem; +} diff --git a/src/networking.c b/src/networking.c index 3b522fb35..21278d783 100644 --- a/src/networking.c +++ b/src/networking.c @@ -180,15 +180,18 @@ client *createClient(connection *conn) { c->sockname = NULL; c->client_list_node = NULL; c->paused_list_node = NULL; + c->pending_read_list_node = NULL; c->client_tracking_redirection = 0; c->client_tracking_prefixes = NULL; - c->client_cron_last_memory_usage = 0; - c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL; + c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0; + c->last_memory_type = CLIENT_TYPE_NORMAL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); + c->mem_usage_bucket = NULL; + c->mem_usage_bucket_node = NULL; if (conn) linkClient(c); initClientMultiState(c); return c; @@ -267,7 +270,7 @@ int prepareClientToWrite(client *c) { * not install a write handler. Instead, it will be done by * handleClientsWithPendingReadsUsingThreads() upon return. */ - if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ)) + if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE) clientInstallWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -1288,13 +1291,13 @@ void unlinkClient(client *c) { } /* Remove from the list of pending reads if needed. 
*/ - if (c->flags & CLIENT_PENDING_READ) { - ln = listSearchKey(server.clients_pending_read,c); - serverAssert(ln != NULL); - listDelNode(server.clients_pending_read,ln); - c->flags &= ~CLIENT_PENDING_READ; + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + if (c->pending_read_list_node != NULL) { + listDelNode(server.clients_pending_read,c->pending_read_list_node); + c->pending_read_list_node = NULL; } + /* When client was just unblocked because of a blocking operation, * remove it from the list of unblocked clients. */ if (c->flags & CLIENT_UNBLOCKED) { @@ -1430,10 +1433,15 @@ void freeClient(client *c) { * we lost the connection with the master. */ if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); - /* Remove the contribution that this client gave to our + /* Remove the contribution that this client gave to our * incrementally computed memory usage. */ - server.stat_clients_type_memory[c->client_cron_last_memory_type] -= - c->client_cron_last_memory_usage; + server.stat_clients_type_memory[c->last_memory_type] -= + c->last_memory_usage; + /* Remove client from memory usage buckets */ + if (c->mem_usage_bucket) { + c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; + listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node); + } /* Release other dynamically allocated client structure fields, * and finally release the client structure itself. */ @@ -1470,6 +1478,27 @@ void freeClientAsync(client *c) { pthread_mutex_unlock(&async_free_queue_mutex); } +/* Perform processing of the client before moving on to processing the next client + * this is useful for performing operations that affect the global state but can't + * wait until we're done with all clients. In other words can't wait until beforeSleep() + * return C_ERR in case client is no longer valid after call. 
*/ +int beforeNextClient(client *c) { + /* Skip the client processing if we're in an IO thread, in that case we'll perform + this operation later (this function is called again) in the fan-in stage of the threading mechanism */ + if (io_threads_op != IO_THREADS_OP_IDLE) + return C_OK; + /* Handle async frees */ + /* Note: this doesn't make the server.clients_to_close list redundant because of + * cases where we want an async free of a client other than myself. For example + * in ACL modifications we disconnect clients authenticated to non-existent + * users (see ACL LOAD). */ + if (c->flags & CLIENT_CLOSE_ASAP) { + freeClient(c); + return C_ERR; + } + return C_OK; +} + /* Free the clients marked as CLOSE_ASAP, return the number of clients * freed. */ int freeClientsInAsyncFreeQueue(void) { @@ -1594,7 +1623,10 @@ int writeToClient(client *c, int handler_installed) { * adDeleteFileEvent() is not thread safe: however writeToClient() * is always called with handler_installed set to 0 from threads * so we are fine. */ - if (handler_installed) connSetWriteHandler(c->conn, NULL); + if (handler_installed) { + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + connSetWriteHandler(c->conn, NULL); + } /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { @@ -1602,6 +1634,7 @@ int writeToClient(client *c, int handler_installed) { return C_ERR; } } + updateClientMemUsage(c); return C_OK; } @@ -2036,7 +2069,11 @@ int processCommandAndResetClient(client *c) { server.current_client = c; if (processCommand(c) == C_OK) { commandProcessed(c); + /* Update the client's memory to include output buffer growth following the + * processed command. */ + updateClientMemUsage(c); } + if (server.current_client == NULL) deadclient = 1; /* * Restore the old client, this is needed because when a script @@ -2117,7 +2154,8 @@ void processInputBuffer(client *c) { /* If we are in the context of an I/O thread, we can't really * execute the command here. 
All we can do is to flag the client * as one that needs to process the command. */ - if (c->flags & CLIENT_PENDING_READ) { + if (io_threads_op != IO_THREADS_OP_IDLE) { + serverAssert(io_threads_op == IO_THREADS_OP_READ); c->flags |= CLIENT_PENDING_COMMAND; break; } @@ -2137,6 +2175,11 @@ void processInputBuffer(client *c) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } + + /* Update client memory usage after processing the query buffer, this is + * important in case the query buffer is big and wasn't drained during + * the above loop (because of partially sent big commands). */ + updateClientMemUsage(c); } void readQueryFromClient(connection *conn) { @@ -2190,7 +2233,7 @@ void readQueryFromClient(connection *conn) { } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); - return; + goto done; } } else if (nread == 0) { if (server.verbosity <= LL_VERBOSE) { @@ -2199,7 +2242,7 @@ void readQueryFromClient(connection *conn) { sdsfree(info); } freeClientAsync(c); - return; + goto done; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer * of the master. We'll use this buffer later in order to have a @@ -2223,12 +2266,15 @@ void readQueryFromClient(connection *conn) { sdsfree(ci); sdsfree(bytes); freeClientAsync(c); - return; + goto done; } /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ processInputBuffer(c); + +done: + beforeNextClient(c); } /* A Redis "Address String" is a colon separated ip:port pair. 
@@ -2306,6 +2352,7 @@ sds catClientInfoString(sds s, client *client) { if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A'; if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U'; if (client->flags & CLIENT_READONLY) *p++ = 'r'; + if (client->flags & CLIENT_NO_EVICT) *p++ = 'e'; if (p == flags) *p++ = 'N'; *p++ = '\0'; @@ -2317,19 +2364,10 @@ sds catClientInfoString(sds s, client *client) { *p = '\0'; /* Compute the total memory consumed by this client. */ - size_t obufmem = getClientOutputBufferMemoryUsage(client); - size_t total_mem = obufmem; - total_mem += zmalloc_size(client); /* includes client->buf */ - total_mem += sdsZmallocSize(client->querybuf); - /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory - * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to - * spot problematic clients. */ - total_mem += client->argv_len_sum; - if (client->argv) - total_mem += zmalloc_size(client->argv); + size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem); return sdscatfmt(s, - "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", + "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", (unsigned long long) client->id, getClientPeerId(client), getClientSockname(client), @@ -2345,6 +2383,7 @@ sds catClientInfoString(sds s, client *client) { (unsigned long long) sdslen(client->querybuf), (unsigned long long) sdsavail(client->querybuf), (unsigned long long) client->argv_len_sum, + (unsigned long long) client->mstate.argv_len_sums, (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply), (unsigned long long) obufmem, /* should not include client->buf 
since we want to see 0 for static clients. */ @@ -2565,6 +2604,18 @@ NULL addReplyErrorObject(c,shared.syntaxerr); return; } + } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) { + /* CLIENT NO-EVICT ON|OFF */ + if (!strcasecmp(c->argv[2]->ptr,"on")) { + c->flags |= CLIENT_NO_EVICT; + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[2]->ptr,"off")) { + c->flags &= ~CLIENT_NO_EVICT; + addReply(c,shared.ok); + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } } else if (!strcasecmp(c->argv[1]->ptr,"kill")) { /* CLIENT KILL <ip:port> * CLIENT KILL <option> [value] ... <option> [value] */ @@ -3154,11 +3205,39 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * Note: this function is very fast so can be called as many time as * the caller wishes. The main usage of this function currently is * enforcing the client output length limits. */ -unsigned long getClientOutputBufferMemoryUsage(client *c) { - unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); +size_t getClientOutputBufferMemoryUsage(client *c) { + size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); return c->reply_bytes + (list_item_size*listLength(c->reply)); } +/* Returns the total client's memory usage. + * Optionally, if output_buffer_mem_usage is not NULL, it fills it with + * the client output buffer memory usage portion of the total. */ +size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { + size_t mem = getClientOutputBufferMemoryUsage(c); + if (output_buffer_mem_usage != NULL) + *output_buffer_mem_usage = mem; + mem += sdsZmallocSize(c->querybuf); + mem += zmalloc_size(c); + /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory + * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to + * spot problematic clients. 
*/ + mem += c->argv_len_sum + sizeof(robj*)*c->argc; + mem += multiStateMemOverhead(c); + + /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers + * to the strings themselves because they aren't stored per client. */ + mem += listLength(c->pubsub_patterns) * sizeof(listNode); + mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) + + dictSlots(c->pubsub_channels) * sizeof(dictEntry*); + + /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */ + if (c->client_tracking_prefixes) + mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*)); + + return mem; +} + /* Get the class of a client, used in order to enforce limits to different * classes of clients. * @@ -3425,13 +3504,11 @@ void processEventsWhileBlocked(void) { * ========================================================================== */ #define IO_THREADS_MAX_NUM 128 -#define IO_THREADS_OP_READ 0 -#define IO_THREADS_OP_WRITE 1 pthread_t io_threads[IO_THREADS_MAX_NUM]; pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; -int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ +int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE. */ // TODO: should access to this be atomic??! /* This is the list of clients each thread will serve when threaded I/O is * used. We spawn io_threads_num-1 threads, since one is the main thread @@ -3498,6 +3575,9 @@ void *IOThreadMain(void *myid) { void initThreadedIO(void) { server.io_threads_active = 0; /* We start with threads not active. */ + /* Indicate that io-threads are currently idle */ + io_threads_op = IO_THREADS_OP_IDLE; + /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. 
*/ if (server.io_threads_num == 1) return; @@ -3584,6 +3664,12 @@ int stopThreadedIOIfNeeded(void) { } } +/* This function achieves thread safety using a fan-out -> fan-in paradigm: + * Fan out: The main thread fans out work to the io-threads which block until + * setIOPendingCount() is called with a value larger than 0 by the main thread. + * Fan in: The main thread waits until getIOPendingCount() returns 0. Then + * it can safely perform post-processing and return to normal synchronous + * work. */ int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write); if (processed == 0) return 0; /* Return ASAP if there are no clients. */ @@ -3642,12 +3728,17 @@ int handleClientsWithPendingWritesUsingThreads(void) { if (pending == 0) break; } + io_threads_op = IO_THREADS_OP_IDLE; + /* Run the list of clients again to install the write handler where * needed. */ listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); + /* Update the client in the mem usage buckets after we're done processing it in the io-threads */ + updateClientMemUsageBucket(c); + /* Install the write handler if there are pending writes in some * of the clients. */ if (clientHasPendingReplies(c) && @@ -3672,10 +3763,11 @@ int postponeClientRead(client *c) { if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && - !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) + !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) && + io_threads_op == IO_THREADS_OP_IDLE) { - c->flags |= CLIENT_PENDING_READ; listAddNodeHead(server.clients_pending_read,c); + c->pending_read_list_node = listFirst(server.clients_pending_read); return 1; } else { return 0; @@ -3687,7 +3779,13 @@ int postponeClientRead(client *c) { * process (instead of serving them synchronously). 
This function runs * the queue using the I/O threads, and process them in order to accumulate * the reads in the buffers, and also parse the first command available - * rendering it in the client structures. */ + * rendering it in the client structures. + * This function achieves thread safety using a fan-out -> fan-in paradigm: + * Fan out: The main thread fans out work to the io-threads which block until + * setIOPendingCount() is called with a value larger than 0 by the main thread. + * Fan in: The main thread waits until getIOPendingCount() returns 0. Then + * it can safely perform post-processing and return to normal synchronous + * work. */ int handleClientsWithPendingReadsUsingThreads(void) { if (!server.io_threads_active || !server.io_threads_do_reads) return 0; int processed = listLength(server.clients_pending_read); @@ -3729,14 +3827,27 @@ int handleClientsWithPendingReadsUsingThreads(void) { if (pending == 0) break; } + io_threads_op = IO_THREADS_OP_IDLE; + /* Run the list of clients again to process the new buffers. */ while(listLength(server.clients_pending_read)) { ln = listFirst(server.clients_pending_read); client *c = listNodeValue(ln); - c->flags &= ~CLIENT_PENDING_READ; listDelNode(server.clients_pending_read,ln); + c->pending_read_list_node = NULL; serverAssert(!(c->flags & CLIENT_BLOCKED)); + + if (beforeNextClient(c) == C_ERR) { + /* If the client is no longer valid, we avoid + * processing the client later. So we just go + * to the next. */ + continue; + } + + /* Once io-threads are idle we can update the client in the mem usage buckets */ + updateClientMemUsageBucket(c); + if (processPendingCommandsAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid * processing the client later. So we just go @@ -3758,3 +3869,56 @@ int handleClientsWithPendingReadsUsingThreads(void) { return processed; } + +/* Returns the actual client eviction limit based on current configuration or + * 0 if no limit. 
*/ +size_t getClientEvictionLimit(void) { + size_t maxmemory_clients_actual = SIZE_MAX; + + /* Handle percentage of maxmemory*/ + if (server.maxmemory_clients < 0 && server.maxmemory > 0) { + unsigned long long maxmemory_clients_bytes = (unsigned long long)((double)server.maxmemory * -(double) server.maxmemory_clients / 100); + if (maxmemory_clients_bytes <= SIZE_MAX) + maxmemory_clients_actual = maxmemory_clients_bytes; + } + else if (server.maxmemory_clients > 0) + maxmemory_clients_actual = server.maxmemory_clients; + else + return 0; + + /* Don't allow a too small maxmemory-clients to avoid cases where we can't communicate + * at all with the server because of bad configuration */ + if (maxmemory_clients_actual < 1024*128) + maxmemory_clients_actual = 1024*128; + + return maxmemory_clients_actual; +} + +void evictClients(void) { + /* Start eviction from topmost bucket (largest clients) */ + int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1; + listIter bucket_iter; + listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter); + size_t client_eviction_limit = getClientEvictionLimit(); + if (client_eviction_limit == 0) + return; + while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] + + server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >= client_eviction_limit) { + listNode *ln = listNext(&bucket_iter); + if (ln) { + client *c = ln->value; + sds ci = catClientInfoString(sdsempty(),c); + serverLog(LL_NOTICE, "Evicting client: %s", ci); + freeClient(c); + sdsfree(ci); + server.stat_evictedclients++; + } else { + curr_bucket--; + if (curr_bucket < 0) { + serverLog(LL_WARNING, "Over client maxmemory after evicting all evictable clients"); + break; + } + listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter); + } + } +} diff --git a/src/object.c b/src/object.c index 0f869ea7e..edbd56acb 100644 --- a/src/object.c +++ b/src/object.c @@ -1180,7 +1180,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { /* Computing the 
memory used by the clients would be O(N) if done * here online. We use our values computed incrementally by - * clientsCronTrackClientsMemUsage(). */ + * updateClientMemUsage(). */ mh->clients_slaves = server.stat_clients_type_memory[CLIENT_TYPE_SLAVE]; mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+ server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+ diff --git a/src/pubsub.c b/src/pubsub.c index 0169b3604..e0bbc6d94 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -304,6 +304,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReplyPubsubMessage(c,channel,message); + updateClientMemUsage(c); receivers++; } } @@ -323,6 +324,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); addReplyPubsubPatMessage(c,pattern,channel,message); + updateClientMemUsage(c); receivers++; } } diff --git a/src/redis-cli.c b/src/redis-cli.c index 7144d2bc2..f3e6f6082 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1286,6 +1286,8 @@ static int cliSendCommand(int argc, char **argv, long repeat) { !strcasecmp(argv[1],"htstats")) || (argc >= 2 && !strcasecmp(command,"debug") && !strcasecmp(argv[1],"htstats-key")) || + (argc >= 2 && !strcasecmp(command,"debug") && + !strcasecmp(argv[1],"client-eviction")) || (argc >= 2 && !strcasecmp(command,"memory") && (!strcasecmp(argv[1],"malloc-stats") || !strcasecmp(argv[1],"doctor"))) || diff --git a/src/replication.c b/src/replication.c index e1d39582c..8c692df8d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -414,6 +414,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, while((ln = listNext(&li))) { client *monitor = ln->value; addReply(monitor,cmdobj); + updateClientMemUsage(c); } decrRefCount(cmdobj); } diff --git a/src/server.c b/src/server.c index d8555619c..9b8fd48ef 100644 --- a/src/server.c +++ b/src/server.c @@ 
-2169,30 +2169,94 @@ int clientsCronTrackExpansiveClients(client *c, int time_idx) { return 0; /* This function never terminates the client. */ } -/* Iterating all the clients in getMemoryOverheadData() is too slow and - * in turn would make the INFO command too slow. So we perform this - * computation incrementally and track the (not instantaneous but updated - * to the second) total memory used by clients using clientsCron() in - * a more incremental way (depending on server.hz). */ -int clientsCronTrackClientsMemUsage(client *c) { - size_t mem = 0; +/* All normal clients are placed in one of the "mem usage buckets" according + * to how much memory they currently use. We use this function to find the + * appropriate bucket based on a given memory usage value. The algorithm simply + * does a log2(mem) to get the bucket. This means, for example, that if a + * client's memory usage doubles it's moved up to the next bucket, if it's + * halved we move it down a bucket. + * For more details see CLIENT_MEM_USAGE_BUCKETS documentation in server.h. */ +clientMemUsageBucket *getMemUsageBucket(size_t mem) { + int size_in_bits = 8*(int)sizeof(mem); + int clz = mem > 0 ? __builtin_clzl(mem) : size_in_bits; + int bucket_idx = size_in_bits - clz; + if (bucket_idx > CLIENT_MEM_USAGE_BUCKET_MAX_LOG) + bucket_idx = CLIENT_MEM_USAGE_BUCKET_MAX_LOG; + else if (bucket_idx < CLIENT_MEM_USAGE_BUCKET_MIN_LOG) + bucket_idx = CLIENT_MEM_USAGE_BUCKET_MIN_LOG; + bucket_idx -= CLIENT_MEM_USAGE_BUCKET_MIN_LOG; + return &server.client_mem_usage_buckets[bucket_idx]; +} + +/* This is called both on explicit clients when something changed their buffers, + * so we can track clients' memory and enforce clients' maxmemory in real time, + * and also from the clientsCron. We call it from the cron so we have updated + * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration + * change requires us to evict a non-active client. 
+ */ +int updateClientMemUsage(client *c) { + size_t mem = getClientMemoryUsage(c, NULL); int type = getClientType(c); - mem += getClientOutputBufferMemoryUsage(c); - mem += sdsZmallocSize(c->querybuf); - mem += zmalloc_size(c); - mem += c->argv_len_sum; - if (c->argv) mem += zmalloc_size(c->argv); - /* Now that we have the memory used by the client, remove the old - * value from the old category, and add it back. */ - server.stat_clients_type_memory[c->client_cron_last_memory_type] -= - c->client_cron_last_memory_usage; - server.stat_clients_type_memory[type] += mem; + + /* Remove the old value of the memory used by the client from the old + * category, and add it back. */ + atomicDecr(server.stat_clients_type_memory[c->last_memory_type], c->last_memory_usage); + atomicIncr(server.stat_clients_type_memory[type], mem); + /* Remember what we added and where, to remove it next time. */ - c->client_cron_last_memory_usage = mem; - c->client_cron_last_memory_type = type; + c->last_memory_usage = mem; + c->last_memory_type = type; + + /* Update client mem usage bucket only when we're not in the context of an + * IO thread. See updateClientMemUsageBucket() for details. */ + if (io_threads_op == IO_THREADS_OP_IDLE) + updateClientMemUsageBucket(c); + return 0; } +/* Adds the client to the correct memory usage bucket. Each bucket contains + * all clients with roughly the same amount of memory. This way we group + * together clients consuming about the same amount of memory and can quickly + * free them in case we reach maxmemory-clients (client eviction). + * Note that in case of io-threads enabled we have to call this function only + * after the fan-in phase (when no io-threads are working) because the bucket + * lists are global. The io-threads themselves track per-client memory usage in + * updateClientMemUsage(). Here we update the clients to each bucket when all + * io-threads are done (both for read and write io-threading). 
*/ +void updateClientMemUsageBucket(client *c) { + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + int allow_eviction = + (c->last_memory_type == CLIENT_TYPE_NORMAL || c->last_memory_type == CLIENT_TYPE_PUBSUB) && + !(c->flags & CLIENT_NO_EVICT); + + /* Update the client in the mem usage buckets */ + if (c->mem_usage_bucket) { + c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage_on_bucket_update; + /* If this client can't be evicted then remove it from the mem usage + * buckets */ + if (!allow_eviction) { + listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node); + c->mem_usage_bucket = NULL; + c->mem_usage_bucket_node = NULL; + } + } + if (allow_eviction) { + clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage); + bucket->mem_usage_sum += c->last_memory_usage; + if (bucket != c->mem_usage_bucket) { + if (c->mem_usage_bucket) + listDelNode(c->mem_usage_bucket->clients, + c->mem_usage_bucket_node); + c->mem_usage_bucket = bucket; + listAddNodeTail(bucket->clients, c); + c->mem_usage_bucket_node = listLast(bucket->clients); + } + } + + c->last_memory_usage_on_bucket_update = c->last_memory_usage; +} + /* Return the max samples in the memory usage of clients tracked by * the function clientsCronTrackExpansiveClients(). */ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { @@ -2271,7 +2335,13 @@ void clientsCron(void) { if (clientsCronHandleTimeout(c,now)) continue; if (clientsCronResizeQueryBuffer(c)) continue; if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue; - if (clientsCronTrackClientsMemUsage(c)) continue; + + /* Iterating all the clients in getMemoryOverheadData() is too slow and + * in turn would make the INFO command too slow. So we perform this + * computation incrementally and track the (not instantaneous but updated + * to the second) total memory used by clients using clientsCron() in + * a more incremental way (depending on server.hz). 
*/ + if (updateClientMemUsage(c)) continue; if (closeClientOnOutputBufferLimitReached(c, 0)) continue; } } @@ -2879,6 +2949,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * visit processCommand() at all). */ handleClientsBlockedOnKeys(); + /* Disconnect some clients if they are consuming too much memory. */ + evictClients(); + /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this * time. */ @@ -3509,6 +3582,7 @@ void resetServerStats(void) { server.stat_expired_time_cap_reached_count = 0; server.stat_expire_cycle_time_used = 0; server.stat_evictedkeys = 0; + server.stat_evictedclients = 0; server.stat_total_eviction_exceeded_time = 0; server.stat_last_eviction_exceeded_time = 0; server.stat_keyspace_misses = 0; @@ -3606,6 +3680,11 @@ void initServer(void) { exit(1); } + for (j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) { + server.client_mem_usage_buckets[j].mem_usage_sum = 0; + server.client_mem_usage_buckets[j].clients = listCreate(); + } + createSharedObjects(); adjustOpenFilesLimit(); const char *clk_msg = monotonicInit(); @@ -4610,6 +4689,15 @@ int processCommand(client *c) { } } + /* Disconnect some clients if total clients memory is too high. We do this + * before key eviction, after the last command was executed and consumed + * some client output buffer memory. */ + evictClients(); + if (server.current_client == NULL) { + /* If we evicted ourself then abort processing the command */ + return C_ERR; + } + /* Handle the maxmemory directive. 
* * Note that we do not want to reclaim memory if we are here re-entering @@ -5673,6 +5761,7 @@ sds genRedisInfoString(const char *section) { "expired_time_cap_reached_count:%lld\r\n" "expire_cycle_cpu_milliseconds:%lld\r\n" "evicted_keys:%lld\r\n" + "evicted_clients:%lld\r\n" "total_eviction_exceeded_time:%lld\r\n" "current_eviction_exceeded_time:%lld\r\n" "keyspace_hits:%lld\r\n" @@ -5715,6 +5804,7 @@ sds genRedisInfoString(const char *section) { server.stat_expired_time_cap_reached_count, server.stat_expire_cycle_time_used/1000, server.stat_evictedkeys, + server.stat_evictedclients, (server.stat_total_eviction_exceeded_time + current_eviction_exceeded_time) / 1000, current_eviction_exceeded_time / 1000, server.stat_keyspace_hits, diff --git a/src/server.h b/src/server.h index 39184591d..3a94effe9 100644 --- a/src/server.h +++ b/src/server.h @@ -125,6 +125,12 @@ typedef long long ustime_t; /* microsecond time type. */ #define CONFIG_MIN_RESERVED_FDS 32 #define CONFIG_DEFAULT_PROC_TITLE_TEMPLATE "{title} {listen-addr} {server-mode}" +/* Bucket sizes for client eviction pools. Each bucket stores clients with + * memory usage of up to twice the size of the bucket below it. */ +#define CLIENT_MEM_USAGE_BUCKET_MIN_LOG 15 /* Bucket sizes start at up to 32KB (2^15) */ +#define CLIENT_MEM_USAGE_BUCKET_MAX_LOG 33 /* Bucket for largest clients: sizes above 4GB (2^32) */ +#define CLIENT_MEM_USAGE_BUCKETS (1+CLIENT_MEM_USAGE_BUCKET_MAX_LOG-CLIENT_MEM_USAGE_BUCKET_MIN_LOG) + #define ACTIVE_EXPIRE_CYCLE_SLOW 0 #define ACTIVE_EXPIRE_CYCLE_FAST 1 @@ -275,9 +281,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ -#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put - in the list of clients we can read - from. 
*/ +/* #define CLIENT_... (1<<29) currently unused, feel free to use in the future */ #define CLIENT_PENDING_COMMAND (1<<30) /* Indicates the client has a fully * parsed command ready for execution. */ #define CLIENT_TRACKING (1ULL<<31) /* Client enabled keys tracking in order to @@ -299,6 +303,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; and AOF client */ #define CLIENT_REPL_RDBONLY (1ULL<<42) /* This client is a replica that only wants RDB without replication buffer. */ +#define CLIENT_NO_EVICT (1ULL<<43) /* This client is protected against client + memory eviction. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -797,6 +803,7 @@ typedef struct multiState { int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it is possible to know if all the commands have a certain flag. */ + size_t argv_len_sums; /* mem used by all commands arguments */ } multiState; /* This structure holds the blocking operation state for a client. @@ -912,6 +919,11 @@ typedef struct { need more reserved IDs use UINT64_MAX-1, -2, ... and so forth. */ +typedef struct { + list *clients; + size_t mem_usage_sum; +} clientMemUsageBucket; + typedef struct client { uint64_t id; /* Client incremental unique ID. */ connection *conn; @@ -976,6 +988,7 @@ typedef struct client { sds sockname; /* Cached connection target address. */ listNode *client_list_node; /* list node in client list */ listNode *paused_list_node; /* list node within the pause list */ + listNode *pending_read_list_node; /* list node in clients pending read list */ RedisModuleUserChangedFunc auth_callback; /* Module callback to execute * when the authenticated user * changes. */ @@ -993,13 +1006,18 @@ typedef struct client { rax *client_tracking_prefixes; /* A dictionary of prefixes we are already subscribed to in BCAST mode, in the context of client side caching. 
*/ - /* In clientsCronTrackClientsMemUsage() we track the memory usage of + /* In updateClientMemUsage() we track the memory usage of * each client and add it to the sum of all the clients of a given type, * however we need to remember what was the old contribution of each * client, and in which category the client was, in order to remove it * before adding it the new value. */ - uint64_t client_cron_last_memory_usage; - int client_cron_last_memory_type; + size_t last_memory_usage; + int last_memory_type; + + size_t last_memory_usage_on_bucket_update; + listNode *mem_usage_bucket_node; + clientMemUsageBucket *mem_usage_bucket; + /* Response buffer */ int bufpos; size_t buf_usable_size; /* Usable size of buffer. */ @@ -1288,6 +1306,10 @@ struct redisServer { list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client executing the command. */ + + /* Stuff for client mem eviction */ + clientMemUsageBucket client_mem_usage_buckets[CLIENT_MEM_USAGE_BUCKETS]; + rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */ long fixed_time_expire; /* If > 0, expire keys against server.mstime. */ rax *clients_index; /* Active clients dictionary by client ID. */ @@ -1319,6 +1341,7 @@ struct redisServer { long long stat_expired_time_cap_reached_count; /* Early expire cycle stops.*/ long long stat_expire_cycle_time_used; /* Cumulative microseconds used. 
*/ long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */ + long long stat_evictedclients; /* Number of evicted clients */ long long stat_total_eviction_exceeded_time; /* Total time over the memory limit, unit us */ monotime stat_last_eviction_exceeded_time; /* Timestamp of current eviction start, unit us */ long long stat_keyspace_hits; /* Number of successful lookups of keys */ @@ -1354,7 +1377,7 @@ struct redisServer { size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ double stat_module_progress; /* Module save progress. */ - uint64_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ + redisAtomic size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. 
*/ @@ -1553,6 +1576,7 @@ struct redisServer { /* Limits */ unsigned int maxclients; /* Max number of simultaneous clients */ unsigned long long maxmemory; /* Max number of memory bytes to use */ + ssize_t maxmemory_clients; /* Memory limit for total client buffers */ int maxmemory_policy; /* Policy for key eviction */ int maxmemory_samples; /* Precision of random sampling */ int maxmemory_eviction_tenacity;/* Aggressiveness of eviction processing */ @@ -1882,6 +1906,11 @@ typedef struct { #define OBJ_HASH_KEY 1 #define OBJ_HASH_VALUE 2 +#define IO_THREADS_OP_IDLE 0 +#define IO_THREADS_OP_READ 1 +#define IO_THREADS_OP_WRITE 2 +extern int io_threads_op; + /*----------------------------------------------------------------------------- * Extern declarations *----------------------------------------------------------------------------*/ @@ -1966,6 +1995,7 @@ void redisSetCpuAffinity(const char *cpulist); client *createClient(connection *conn); void freeClient(client *c); void freeClientAsync(client *c); +int beforeNextClient(client *c); void resetClient(client *c); void freeClientOriginalArgv(client *c); void sendReplyToClient(connection *conn); @@ -2026,7 +2056,8 @@ void rewriteClientCommandVector(client *c, int argc, ...); void rewriteClientCommandArgument(client *c, int i, robj *newval); void replaceClientCommandVector(client *c, int argc, robj **argv); void redactClientCommandArgument(client *c, int argc); -unsigned long getClientOutputBufferMemoryUsage(client *c); +size_t getClientOutputBufferMemoryUsage(client *c); +size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage); int freeClientsInAsyncFreeQueue(void); int closeClientOnOutputBufferLimitReached(client *c, int async); int getClientType(client *c); @@ -2034,6 +2065,7 @@ int getClientTypeByName(char *name); char *getClientTypeName(int class); void flushSlavesOutputBuffers(void); void disconnectSlaves(void); +void evictClients(void); int listenToPort(int port, socketFds *fds); void 
pauseClients(mstime_t duration, pause_type type); void unpauseClients(void); @@ -2048,6 +2080,8 @@ int handleClientsWithPendingWritesUsingThreads(void); int handleClientsWithPendingReadsUsingThreads(void); int stopThreadedIOIfNeeded(void); int clientHasPendingReplies(client *c); +int updateClientMemUsage(client *c); +void updateClientMemUsageBucket(client *c); void unlinkClient(client *c); int writeToClient(client *c, int handler_installed); void linkClient(client *c); @@ -2106,6 +2140,7 @@ void unwatchAllKeys(client *c); void initClientMultiState(client *c); void freeClientMultiState(client *c); void queueMultiCommand(client *c); +size_t multiStateMemOverhead(client *c); void touchWatchedKey(redisDb *db, robj *key); int isWatchedKeyExpired(client *c); void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with); diff --git a/src/tracking.c b/src/tracking.c index d6f2bf149..1e84cc3c1 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -305,6 +305,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { addReplyArrayLen(c,1); addReplyBulkCBuffer(c,keyname,keylen); } + updateClientMemUsage(c); } /* This function is called when a key is modified in Redis and in the case diff --git a/src/util.c b/src/util.c index d57420007..61b0ed4f0 100644 --- a/src/util.c +++ b/src/util.c @@ -204,7 +204,10 @@ unsigned long long memtoull(const char *p, int *err) { /* Search the first non digit character. 
*/ u = p; - if (*u == '-') u++; + if (*u == '-') { + if (err) *err = 1; + return 0; + } while(*u && isdigit(*u)) u++; if (*u == '\0' || !strcasecmp(u,"b")) { mul = 1; diff --git a/tests/support/util.tcl b/tests/support/util.tcl index a834d4abd..67551b041 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -4,7 +4,7 @@ proc randstring {min max {type binary}} { if {$type eq {binary}} { set minval 0 set maxval 255 - } elseif {$type eq {alpha}} { + } elseif {$type eq {alpha} || $type eq {simplealpha}} { set minval 48 set maxval 122 } elseif {$type eq {compr}} { @@ -12,11 +12,10 @@ proc randstring {min max {type binary}} { set maxval 52 } while {$len} { - set rr [expr {$minval+int(rand()*($maxval-$minval+1))}] - if {$type eq {alpha} && $rr eq 92} { - set rr 90; # avoid putting '\' char in the string, it can mess up TCL processing - } - append output [format "%c" $rr] + set rr [format "%c" [expr {$minval+int(rand()*($maxval-$minval+1))}]] + if {$type eq {simplealpha} && ![string is alnum $rr]} {continue} + if {$type eq {alpha} && $rr eq 92} {continue} ;# avoid putting '\' char in the string, it can mess up TCL processing + append output $rr incr len -1 } return $output diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 5150790db..a5cc2d4ac 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -82,6 +82,7 @@ set ::all_tests { unit/oom-score-adj unit/shutdown unit/networking + unit/client-eviction } # Index to the next test to run in the ::all_tests list. 
set ::next_test 0 diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl new file mode 100644 index 000000000..9421d8b61 --- /dev/null +++ b/tests/unit/client-eviction.tcl @@ -0,0 +1,509 @@ +tags {"external:skip"} { + +# Get info about a redis client connection: +# name - name of client we want to query +# f - field name from "CLIENT LIST" we want to get +proc client_field {name f} { + set clients [split [string trim [r client list]] "\r\n"] + set c [lsearch -inline $clients *name=$name*] + if {![regexp $f=(\[a-zA-Z0-9-\]+) $c - res]} { + error "no client named $name found with field $f" + } + return $res +} + +proc client_exists {name} { + if {[catch { client_field $name tot-mem } e]} { + return false + } + return true +} + +proc gen_client {} { + set rr [redis_client] + set name "tst_[randstring 4 4 simplealpha]" + $rr client setname $name + assert {[client_exists $name]} + return [list $rr $name] +} + +# Sum a value across all redis client connections: +# f - the field name from "CLIENT LIST" we want to sum +proc clients_sum {f} { + set sum 0 + set clients [split [string trim [r client list]] "\r\n"] + foreach c $clients { + if {![regexp $f=(\[a-zA-Z0-9-\]+) $c - res]} { + error "field $f not found in $c" + } + incr sum $res + } + return $sum +} + +proc mb {v} { + return [expr $v * 1024 * 1024] +} + +start_server {} { + set maxmemory_clients 3000000 + r config set maxmemory-clients $maxmemory_clients + + test "client evicted due to large argv" { + r flushdb + lassign [gen_client] rr cname + # Attempt a large multi-bulk command under eviction limit + $rr mset k v k2 [string repeat v 1000000] + assert_equal [$rr get k] v + # Attempt another command, now causing client eviction + catch { $rr mset k v k2 [string repeat v $maxmemory_clients] } e + assert {![client_exists $cname]} + $rr close + } + + test "client evicted due to large query buf" { + r flushdb + lassign [gen_client] rr cname + # Attempt to fill the query buff without completing the 
argument above the limit, causing client eviction + catch { + $rr write [join [list "*1\r\n\$$maxmemory_clients\r\n" [string repeat v $maxmemory_clients]] ""] + $rr flush + $rr read + } e + assert {![client_exists $cname]} + $rr close + } + + test "client evicted due to percentage of maxmemory" { + set maxmemory [mb 6] + r config set maxmemory $maxmemory + # Set client eviction threshold to 7% of maxmemory + set maxmemory_clients_p 7 + r config set maxmemory-clients $maxmemory_clients_p% + r flushdb + + set maxmemory_clients_actual [expr $maxmemory * $maxmemory_clients_p / 100] + + lassign [gen_client] rr cname + # Attempt to fill the query buff with only half the percentage threshold verify we're not disconnected + set n [expr $maxmemory_clients_actual / 2] + $rr write [join [list "*1\r\n\$$n\r\n" [string repeat v $n]] ""] + $rr flush + set tot_mem [client_field $cname tot-mem] + assert {$tot_mem >= $n && $tot_mem < $maxmemory_clients_actual} + + # Attempt to fill the query buff with the percentage threshold of maxmemory and verify we're evicted + $rr close + lassign [gen_client] rr cname + catch { + $rr write [join [list "*1\r\n\$$maxmemory_clients_actual\r\n" [string repeat v $maxmemory_clients_actual]] ""] + $rr flush + } e + assert {![client_exists $cname]} + $rr close + + # Restore settings + r config set maxmemory 0 + r config set maxmemory-clients $maxmemory_clients + } + + test "client evicted due to large multi buf" { + r flushdb + lassign [gen_client] rr cname + + # Attempt a multi-exec where sum of commands is less than maxmemory_clients + $rr multi + $rr set k [string repeat v [expr $maxmemory_clients / 4]] + $rr set k [string repeat v [expr $maxmemory_clients / 4]] + assert_equal [$rr exec] {OK OK} + + # Attempt a multi-exec where sum of commands is more than maxmemory_clients, causing client eviction + $rr multi + catch { + for {set j 0} {$j < 5} {incr j} { + $rr set k [string repeat v [expr $maxmemory_clients / 4]] + } + } e + assert 
{![client_exists $cname]} + $rr close + } + + test "client evicted due to watched key list" { + r flushdb + set rr [redis_client] + + # Since watched key list is a small overhead this test uses a minimal maxmemory-clients config + set temp_maxmemory_clients 200000 + r config set maxmemory-clients $temp_maxmemory_clients + + # Append watched keys until list maxes out maxmemory clients and causes client eviction + catch { + for {set j 0} {$j < $temp_maxmemory_clients} {incr j} { + $rr watch $j + } + } e + assert_match {I/O error reading reply} $e + $rr close + + # Restore config for next tests + r config set maxmemory-clients $maxmemory_clients + } + + test "client evicted due to pubsub subscriptions" { + r flushdb + + # Since pubsub subscriptions cause a small overhead this test uses a minimal maxmemory-clients config + set temp_maxmemory_clients 200000 + r config set maxmemory-clients $temp_maxmemory_clients + + # Test eviction due to pubsub patterns + set rr [redis_client] + # Add patterns until list maxes out maxmemory clients and causes client eviction + catch { + for {set j 0} {$j < $temp_maxmemory_clients} {incr j} { + $rr psubscribe $j + } + } e + assert_match {I/O error reading reply} $e + $rr close + + # Test eviction due to pubsub channels + set rr [redis_client] + # Add channels until list maxes out maxmemory clients and causes client eviction + catch { + for {set j 0} {$j < $temp_maxmemory_clients} {incr j} { + $rr subscribe $j + } + } e + assert_match {I/O error reading reply} $e + $rr close + + # Restore config for next tests + r config set maxmemory-clients $maxmemory_clients + } + + test "client evicted due to tracking redirection" { + r flushdb + # Use slow hz to avoid clientsCron from updating memory usage frequently since + # we're testing the update logic when writing tracking redirection output + set backup_hz [lindex [r config get hz] 1] + r config set hz 1 + + set rr [redis_client] + set redirected_c [redis_client] + $redirected_c client 
setname redirected_client + set redir_id [$redirected_c client id] + $redirected_c SUBSCRIBE __redis__:invalidate + $rr client tracking on redirect $redir_id bcast + # Use a big key name to fill the redirected tracking client's buffer quickly + set key_length [expr 1024*10] + set long_key [string repeat k $key_length] + # Use a script so we won't need to pass the long key name when dirtying it in the loop + set script_sha [$rr script load "redis.call('incr', '$long_key')"] + # Read and write to same (long) key until redirected_client's buffers cause it to be evicted + set t [clock milliseconds] + catch { + while true { + set mem [client_field redirected_client tot-mem] + assert {$mem < $maxmemory_clients} + $rr evalsha $script_sha 0 + } + } e + assert_match {no client named redirected_client found*} $e + + # Make sure eviction happened in less than 1sec, this means, statistically eviction + # wasn't caused by clientCron accounting + set t [expr [clock milliseconds] - $t] + assert {$t < 1000} + + r config set hz $backup_hz + $rr close + $redirected_c close + } + + test "client evicted due to client tracking prefixes" { + r flushdb + set rr [redis_client] + + # Since tracking prefixes list is a small overhead this test uses a minimal maxmemory-clients config + set temp_maxmemory_clients 200000 + r config set maxmemory-clients $temp_maxmemory_clients + + # Append tracking prefixes until list maxes out maxmemory clients and causes client eviction + catch { + for {set j 0} {$j < $temp_maxmemory_clients} {incr j} { + $rr client tracking on prefix [format %012s $j] bcast + } + } e + assert_match {I/O error reading reply} $e + $rr close + + # Restore config for next tests + r config set maxmemory-clients $maxmemory_clients + } + + test "client evicted due to output buf" { + r flushdb + r setrange k 200000 v + set rr [redis_deferring_client] + $rr client setname test_client + $rr flush + assert {[$rr read] == "OK"} + # Attempt a large response under eviction limit + $rr 
get k + $rr flush + assert {[string length [$rr read]] == 200001} + set mem [client_field test_client tot-mem] + assert {$mem < $maxmemory_clients} + + # Fill output buff in loop without reading it and make sure + # we're eventually disconnected, but before reaching maxmemory_clients + while true { + if { [catch { + set mem [client_field test_client tot-mem] + assert {$mem < $maxmemory_clients} + $rr get k + $rr flush + } e]} { + assert {![client_exists test_client]} + break + } + } + $rr close + } + + foreach {no_evict} {on off} { + test "client no-evict $no_evict" { + r flushdb + r client setname control + r client no-evict on ;# Avoid evicting the main connection + lassign [gen_client] rr cname + $rr client no-evict $no_evict + + # Overflow maxmemory-clients + set qbsize [expr {$maxmemory_clients + 1}] + if {[catch { + $rr write [join [list "*1\r\n\$$qbsize\r\n" [string repeat v $qbsize]] ""] + $rr flush + wait_for_condition 200 10 { + [client_field $cname qbuf] == $qbsize + } else { + fail "Failed to fill qbuf for test" + } + } e] && $no_evict == off} { + assert {![client_exists $cname]} + } elseif {$no_evict == on} { + assert {[client_field $cname tot-mem] > $maxmemory_clients} + } + $rr close + } + } +} + +start_server {} { + set server_pid [s process_id] + set maxmemory_clients [mb 10] + set obuf_limit [mb 3] + r config set maxmemory-clients $maxmemory_clients + r config set client-output-buffer-limit "normal $obuf_limit 0 0" + + test "avoid client eviction when client is freed by output buffer limit" { + r flushdb + set obuf_size [expr {$obuf_limit + [mb 1]}] + r setrange k $obuf_size v + set rr1 [redis_client] + $rr1 client setname "qbuf-client" + set rr2 [redis_deferring_client] + $rr2 client setname "obuf-client1" + assert_equal [$rr2 read] OK + set rr3 [redis_deferring_client] + $rr3 client setname "obuf-client2" + assert_equal [$rr3 read] OK + + # Occupy client's query buff with less than output buffer limit left to exceed maxmemory-clients + set 
qbsize [expr {$maxmemory_clients - $obuf_size}] + $rr1 write [join [list "*1\r\n\$$qbsize\r\n" [string repeat v $qbsize]] ""] + $rr1 flush + # Wait for qbuff to be as expected + wait_for_condition 200 10 { + [client_field qbuf-client qbuf] == $qbsize + } else { + fail "Failed to fill qbuf for test" + } + + # Make the other two obuf-clients pass obuf limit and also pass maxmemory-clients + # We use two obuf-clients to make sure that even if client eviction is attempted + # between two command processing (with no sleep) we don't perform any client eviction + # because the obuf limit is enforced with precedence. + exec kill -SIGSTOP $server_pid + $rr2 get k + $rr2 flush + $rr3 get k + $rr3 flush + exec kill -SIGCONT $server_pid + + # Validate obuf-clients were disconnected (because of obuf limit) + catch {client_field obuf-client1 name} e + assert_match {no client named obuf-client1 found*} $e + catch {client_field obuf-client2 name} e + assert_match {no client named obuf-client2 found*} $e + + # Validate qbuf-client is still connected and wasn't evicted + assert_equal [client_field qbuf-client name] {qbuf-client} + + $rr1 close + $rr2 close + $rr3 close + } +} + +start_server {} { + test "decrease maxmemory-clients causes client eviction" { + set maxmemory_clients [mb 4] + set client_count 10 + set qbsize [expr ($maxmemory_clients - [mb 1]) / $client_count] + r config set maxmemory-clients $maxmemory_clients + + + # Make multiple clients consume together roughly 1mb less than maxmemory_clients + set rrs {} + for {set j 0} {$j < $client_count} {incr j} { + set rr [redis_client] + lappend rrs $rr + $rr client setname client$j + $rr write [join [list "*2\r\n\$$qbsize\r\n" [string repeat v $qbsize]] ""] + $rr flush + wait_for_condition 200 10 { + [client_field client$j qbuf] >= $qbsize + } else { + fail "Failed to fill qbuf for test" + } + } + + # Make sure all clients are still connected + set connected_clients [llength [lsearch -all [split [string trim [r client list]] 
"\r\n"] *name=client*]] + assert {$connected_clients == $client_count} + + # Decrease maxmemory_clients and expect client eviction + r config set maxmemory-clients [expr $maxmemory_clients / 2] + set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]] + assert {$connected_clients > 0 && $connected_clients < $client_count} + + foreach rr $rrs {$rr close} + } +} + +start_server {} { + test "evict clients only until below limit" { + set client_count 10 + set client_mem [mb 1] + r config set maxmemory-clients 0 + r client setname control + r client no-evict on + + # Make multiple clients each consume roughly 1mb of memory + set total_client_mem 0 + set rrs {} + for {set j 0} {$j < $client_count} {incr j} { + set rr [redis_client] + lappend rrs $rr + $rr client setname client$j + $rr write [join [list "*2\r\n\$$client_mem\r\n" [string repeat v $client_mem]] ""] + $rr flush + wait_for_condition 200 10 { + [client_field client$j tot-mem] >= $client_mem + } else { + fail "Failed to fill qbuf for test" + } + incr total_client_mem [client_field client$j tot-mem] + } + + set client_actual_mem [expr $total_client_mem / $client_count] + + # Make sure client_actual_mem is more or equal to what we intended + assert {$client_actual_mem >= $client_mem} + + # Make sure all clients are still connected + set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]] + assert {$connected_clients == $client_count} + + # Set maxmemory-clients to accommodate half our clients (taking into account the control client) + set maxmemory_clients [expr ($client_actual_mem * $client_count) / 2 + [client_field control tot-mem]] + r config set maxmemory-clients $maxmemory_clients + + # Make sure total used memory is below maxmemory_clients + set total_client_mem [clients_sum tot-mem] + assert {$total_client_mem <= $maxmemory_clients} + + # Make sure we have only half of our clients now 
+ set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]] + assert {$connected_clients == [expr $client_count / 2]} + + foreach rr $rrs {$rr close} + } +} + +start_server {} { + test "evict clients in right order (large to small)" { + # Note that each size step needs to be at least x2 larger than previous step + # because of how the client-eviction size bucketing works + set sizes [list 100000 [mb 1] [mb 3]] + set clients_per_size 3 + r client setname control + r client no-evict on + r config set maxmemory-clients 0 + + # Run over all sizes and create some clients using up that size + set total_client_mem 0 + set rrs {} + for {set i 0} {$i < [llength $sizes]} {incr i} { + set size [lindex $sizes $i] + + for {set j 0} {$j < $clients_per_size} {incr j} { + set rr [redis_client] + lappend rrs $rr + $rr client setname client-$i + $rr write [join [list "*2\r\n\$$size\r\n" [string repeat v $size]] ""] + $rr flush + } + set client_mem [client_field client-$i tot-mem] + + # Update our size list based on actual used up size (this is usually + # slightly more than expected because of allocator bins) + assert {$client_mem >= $size} + set sizes [lreplace $sizes $i $i $client_mem] + + # Account total client memory usage + incr total_mem [expr $clients_per_size * $client_mem] + } + incr total_mem [client_field control tot-mem] + + # Make sure all clients are connected + set clients [split [string trim [r client list]] "\r\n"] + for {set i 0} {$i < [llength $sizes]} {incr i} { + assert_equal [llength [lsearch -all $clients "*name=client-$i *"]] $clients_per_size + } + + # For each size reduce maxmemory-clients so relevant clients should be evicted + # do this from largest to smallest + foreach size [lreverse $sizes] { + set total_mem [expr $total_mem - $clients_per_size * $size] + r config set maxmemory-clients $total_mem + set clients [split [string trim [r client list]] "\r\n"] + # Verify only relevant clients were evicted + for 
{set i 0} {$i < [llength $sizes]} {incr i} { + set verify_size [lindex $sizes $i] + set count [llength [lsearch -all $clients "*name=client-$i *"]] + if {$verify_size < $size} { + assert_equal $count $clients_per_size + } else { + assert_equal $count 0 + } + } + } + foreach rr $rrs {$rr close} + } +} + +} + diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index dd986dd72..0bc0e4b6d 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -1,3 +1,140 @@ +start_server {tags {"maxmemory" "external:skip"}} { + r config set maxmemory 11mb + r config set maxmemory-policy allkeys-lru + set server_pid [s process_id] + + proc init_test {client_eviction} { + r flushdb + + set prev_maxmemory_clients [r config get maxmemory-clients] + if $client_eviction { + r config set maxmemory-clients 3mb + } else { + r config set maxmemory-clients 0 + } + + r config resetstat + # fill 5mb using 50 keys of 100kb + for {set j 0} {$j < 50} {incr j} { + r setrange $j 100000 x + } + assert_equal [r dbsize] 50 + } + + proc verify_test {client_eviction} { + set evicted_keys [s evicted_keys] + set evicted_clients [s evicted_clients] + set dbsize [r dbsize] + + if $::verbose { + puts "evicted keys: $evicted_keys" + puts "evicted clients: $evicted_clients" + puts "dbsize: $dbsize" + } + + if $client_eviction { + assert_morethan $evicted_clients 0 + assert_equal $evicted_keys 0 + assert_equal $dbsize 50 + } else { + assert_equal $evicted_clients 0 + assert_morethan $evicted_keys 0 + assert_lessthan $dbsize 50 + } + } + + foreach {client_eviction} {false true} { + set clients {} + test "eviction due to output buffers of many MGET clients, client eviction: $client_eviction" { + init_test $client_eviction + + for {set j 0} {$j < 20} {incr j} { + set rr [redis_deferring_client] + lappend clients $rr + } + + # Freeze the server so output buffers will be filled in one event loop when we un-freeze after sending mgets + exec kill -SIGSTOP $server_pid + for {set j 0} {$j < 5} 
{incr j} { + foreach rr $clients { + $rr mget 1 + $rr flush + } + } + # Unfreeze server + exec kill -SIGCONT $server_pid + + + for {set j 0} {$j < 5} {incr j} { + foreach rr $clients { + if {[catch { $rr read } err]} { + lremove clients $rr + } + } + } + + verify_test $client_eviction + } + foreach rr $clients { + $rr close + } + + set clients {} + test "eviction due to input buffer of a dead client, client eviction: $client_eviction" { + init_test $client_eviction + + for {set j 0} {$j < 30} {incr j} { + set rr [redis_deferring_client] + lappend clients $rr + } + + foreach rr $clients { + if {[catch { + $rr write "*250\r\n" + for {set j 0} {$j < 249} {incr j} { + $rr write "\$1000\r\n" + $rr write [string repeat x 1000] + $rr write "\r\n" + $rr flush + } + }]} { + lremove clients $rr + } + } + + verify_test $client_eviction + } + foreach rr $clients { + $rr close + } + + set clients {} + test "eviction due to output buffers of pubsub, client eviction: $client_eviction" { + init_test $client_eviction + + for {set j 0} {$j < 10} {incr j} { + set rr [redis_deferring_client] + lappend clients $rr + } + + foreach rr $clients { + $rr subscribe bla + $rr flush + } + + for {set j 0} {$j < 40} {incr j} { + catch {r publish bla [string repeat x 100000]} err + } + + verify_test $client_eviction + } + foreach rr $clients { + $rr close + } + } + +} + start_server {tags {"maxmemory external:skip"}} { test "Without maxmemory small integers are shared" { r config set maxmemory 0 |