diff options
-rw-r--r-- | src/config.c | 33 | ||||
-rw-r--r-- | src/debug.c | 4 | ||||
-rw-r--r-- | src/networking.c | 14 | ||||
-rw-r--r-- | src/object.c | 2 | ||||
-rw-r--r-- | src/pubsub.c | 4 | ||||
-rw-r--r-- | src/replication.c | 2 | ||||
-rw-r--r-- | src/server.c | 135 | ||||
-rw-r--r-- | src/server.h | 10 | ||||
-rw-r--r-- | src/tracking.c | 2 | ||||
-rw-r--r-- | tests/unit/client-eviction.tcl | 51 |
10 files changed, 196 insertions, 61 deletions
diff --git a/src/config.c b/src/config.c index e57946429..78553b758 100644 --- a/src/config.c +++ b/src/config.c @@ -2976,6 +2976,37 @@ void rewriteConfigLatencyTrackingInfoPercentilesOutputOption(standardConfig *con rewriteConfigRewriteLine(state,name,line,1); } +static int applyClientMaxMemoryUsage(const char **err) { + UNUSED(err); + listIter li; + listNode *ln; + + /* server.client_mem_usage_buckets is an indication that the previous config + * was non-zero, in which case we can exit and no apply is needed. */ + if(server.maxmemory_clients !=0 && server.client_mem_usage_buckets) + return 1; + if (server.maxmemory_clients != 0) + initServerClientMemUsageBuckets(); + + /* When client eviction is enabled update memory buckets for all clients. + * When disabled, clear that data structure. */ + listRewind(server.clients, &li); + while ((ln = listNext(&li)) != NULL) { + client *c = listNodeValue(ln); + if (server.maxmemory_clients == 0) { + /* Remove client from memory usage bucket. */ + removeClientFromMemUsageBucket(c, 0); + } else { + /* Update each client(s) memory usage and add to appropriate bucket. */ + updateClientMemUsageAndBucket(c); + } + } + + if (server.maxmemory_clients == 0) + freeServerClientMemUsageBuckets(); + return 1; +} + standardConfig static_configs[] = { /* Bool configs */ createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL), @@ -3146,7 +3177,7 @@ standardConfig static_configs[] = { createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */ createSizeTConfig("client-query-buffer-limit", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.client_max_querybuf_len, 1024*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Default: 1GB max query buffer. */ - createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, NULL), + createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, applyClientMaxMemoryUsage), /* Other configs */ createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */ diff --git a/src/debug.c b/src/debug.c index b642e4eac..43de7d2db 100644 --- a/src/debug.c +++ b/src/debug.c @@ -947,6 +947,10 @@ NULL else addReply(c, shared.ok); } else if(!strcasecmp(c->argv[1]->ptr,"client-eviction") && c->argc == 2) { + if (!server.client_mem_usage_buckets) { + addReplyError(c,"maxmemory-clients is disabled."); + return; + } sds bucket_info = sdsempty(); for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) { if (j == 0) diff --git a/src/networking.c b/src/networking.c index 76566ddd7..8e7c5ace1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1970,7 +1970,7 @@ int writeToClient(client *c, int handler_installed) { * Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in * handleClientsWithPendingWritesUsingThreads(). */ if (io_threads_op == IO_THREADS_OP_IDLE) - updateClientMemUsage(c); + updateClientMemUsageAndBucket(c); return C_OK; } @@ -2420,7 +2420,7 @@ int processCommandAndResetClient(client *c) { commandProcessed(c); /* Update the client's memory to include output buffer growth following the * processed command. */ - updateClientMemUsage(c); + updateClientMemUsageAndBucket(c); } if (server.current_client == NULL) deadclient = 1; @@ -2557,7 +2557,7 @@ int processInputBuffer(client *c) { * important in case the query buffer is big and wasn't drained during * the above loop (because of partially sent big commands). */ if (io_threads_op == IO_THREADS_OP_IDLE) - updateClientMemUsage(c); + updateClientMemUsageAndBucket(c); return C_OK; } @@ -2995,9 +2995,11 @@ NULL /* CLIENT NO-EVICT ON|OFF */ if (!strcasecmp(c->argv[2]->ptr,"on")) { c->flags |= CLIENT_NO_EVICT; + removeClientFromMemUsageBucket(c, 0); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[2]->ptr,"off")) { c->flags &= ~CLIENT_NO_EVICT; + updateClientMemUsageAndBucket(c); addReply(c,shared.ok); } else { addReplyErrorObject(c,shared.syntaxerr); @@ -4228,7 +4230,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { client *c = listNodeValue(ln); /* Update the client in the mem usage after we're done processing it in the io-threads */ - updateClientMemUsage(c); + updateClientMemUsageAndBucket(c); /* Install the write handler if there are pending writes in some * of the clients. */ @@ -4337,7 +4339,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { } /* Once io-threads are idle we can update the client in the mem usage */ - updateClientMemUsage(c); + updateClientMemUsageAndBucket(c); if (processPendingCommandAndInputBuffer(c) == C_ERR) { /* If the client is no longer valid, we avoid @@ -4384,6 +4386,8 @@ size_t getClientEvictionLimit(void) { } void evictClients(void) { + if (!server.client_mem_usage_buckets) + return; /* Start eviction from topmost bucket (largest clients) */ int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1; listIter bucket_iter; diff --git a/src/object.c b/src/object.c index 7d71f59c4..f2ea7dafe 100644 --- a/src/object.c +++ b/src/object.c @@ -1208,7 +1208,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { /* Computing the memory used by the clients would be O(N) if done * here online. We use our values computed incrementally by - * updateClientMemUsage(). */ + * updateClientMemoryUsage(). */ mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+ server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+ server.stat_clients_type_memory[CLIENT_TYPE_NORMAL]; diff --git a/src/pubsub.c b/src/pubsub.c index 0b909bddd..2e2522c57 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -465,7 +465,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReplyPubsubMessage(c,channel,message,*type.messageBulk); - updateClientMemUsage(c); + updateClientMemUsageAndBucket(c); receivers++; } } @@ -491,7 +491,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); addReplyPubsubPatMessage(c,pattern,channel,message); - updateClientMemUsage(c); + updateClientMemUsageAndBucket(c); receivers++; } } diff --git a/src/replication.c b/src/replication.c index a57cd8528..797f6518d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -593,7 +593,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, while((ln = listNext(&li))) { client *monitor = ln->value; addReply(monitor,cmdobj); - updateClientMemUsage(c); + updateClientMemUsageAndBucket(c); } decrRefCount(cmdobj); } diff --git a/src/server.c b/src/server.c index 7bc320795..b4ea6f9e3 100644 --- a/src/server.c +++ b/src/server.c @@ -839,37 +839,44 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) { return &server.client_mem_usage_buckets[bucket_idx]; } -/* This is called both on explicit clients when something changed their buffers, - * so we can track clients' memory and enforce clients' maxmemory in real time, - * and also from the clientsCron. We call it from the cron so we have updated - * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration - * change requires us to evict a non-active client. +/* + * This method updates the client memory usage and update the + * server stats for client type. * - * This also adds the client to the correct memory usage bucket. Each bucket contains - * all clients with roughly the same amount of memory. This way we group - * together clients consuming about the same amount of memory and can quickly - * free them in case we reach maxmemory-clients (client eviction). + * This method is called from the clientsCron to have updated + * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients to accurately + * provide information around clients memory usage. + * + * It is also used in updateClientMemUsageAndBucket to have latest + * client memory usage information to place it into appropriate client memory + * usage bucket. */ -int updateClientMemUsage(client *c) { - serverAssert(io_threads_op == IO_THREADS_OP_IDLE); +void updateClientMemoryUsage(client *c) { size_t mem = getClientMemoryUsage(c, NULL); int type = getClientType(c); + /* Now that we have the memory used by the client, remove the old + * value from the old category, and add it back. */ + server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage; + server.stat_clients_type_memory[type] += mem; + /* Remember what we added and where, to remove it next time. */ + c->last_memory_type = type; + c->last_memory_usage = mem; +} - /* Remove the old value of the memory used by the client from the old - * category, and add it back. */ - if (type != c->last_memory_type) { - server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage; - server.stat_clients_type_memory[type] += mem; - c->last_memory_type = type; - } else { - server.stat_clients_type_memory[type] += mem - c->last_memory_usage; +int clientEvictionAllowed(client *c) { + if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) { + return 0; } + int type = getClientType(c); + return (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB); +} - int allow_eviction = - (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB) && - !(c->flags & CLIENT_NO_EVICT); - /* Update the client in the mem usage buckets */ +/* This function is used to cleanup the client's previously tracked memory usage. + * This is called during incremental client memory usage tracking as well as + * used to reset when client to bucket allocation is not required when + * client eviction is disabled. */ +void removeClientFromMemUsageBucket(client *c, int allow_eviction) { if (c->mem_usage_bucket) { c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; /* If this client can't be evicted then remove it from the mem usage @@ -880,23 +887,42 @@ int updateClientMemUsage(client *c) { c->mem_usage_bucket_node = NULL; } } - if (allow_eviction) { - clientMemUsageBucket *bucket = getMemUsageBucket(mem); - bucket->mem_usage_sum += mem; - if (bucket != c->mem_usage_bucket) { - if (c->mem_usage_bucket) - listDelNode(c->mem_usage_bucket->clients, - c->mem_usage_bucket_node); - c->mem_usage_bucket = bucket; - listAddNodeTail(bucket->clients, c); - c->mem_usage_bucket_node = listLast(bucket->clients); - } +} + +/* This is called only if explicit clients when something changed their buffers, + * so we can track clients' memory and enforce clients' maxmemory in real time. + * + * This also adds the client to the correct memory usage bucket. Each bucket contains + * all clients with roughly the same amount of memory. This way we group + * together clients consuming about the same amount of memory and can quickly + * free them in case we reach maxmemory-clients (client eviction). + * + * returns 1 if client eviction for this client is allowed, 0 otherwise. + */ +int updateClientMemUsageAndBucket(client *c) { + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + int allow_eviction = clientEvictionAllowed(c); + removeClientFromMemUsageBucket(c, allow_eviction); + + if (!allow_eviction) { + return 0; } - /* Remember what we added, to remove it next time. */ - c->last_memory_usage = mem; + /* Update client memory usage. */ + updateClientMemoryUsage(c); - return 0; + /* Update the client in the mem usage buckets */ + clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage); + bucket->mem_usage_sum += c->last_memory_usage; + if (bucket != c->mem_usage_bucket) { + if (c->mem_usage_bucket) + listDelNode(c->mem_usage_bucket->clients, + c->mem_usage_bucket_node); + c->mem_usage_bucket = bucket; + listAddNodeTail(bucket->clients, c); + c->mem_usage_bucket_node = listLast(bucket->clients); + } + return 1; } /* Return the max samples in the memory usage of clients tracked by @@ -984,8 +1010,11 @@ void clientsCron(void) { * in turn would make the INFO command too slow. So we perform this * computation incrementally and track the (not instantaneous but updated * to the second) total memory used by clients using clientsCron() in - * a more incremental way (depending on server.hz). */ - if (updateClientMemUsage(c)) continue; + * a more incremental way (depending on server.hz). + * If client eviction is enabled, update the bucket as well. */ + if (!updateClientMemUsageAndBucket(c)) + updateClientMemoryUsage(c); + if (closeClientOnOutputBufferLimitReached(c, 0)) continue; } } @@ -1865,6 +1894,25 @@ void createSharedObjects(void) { shared.maxstring = sdsnew("maxstring"); } +void initServerClientMemUsageBuckets() { + if (server.client_mem_usage_buckets) + return; + server.client_mem_usage_buckets = zmalloc(sizeof(clientMemUsageBucket)*CLIENT_MEM_USAGE_BUCKETS); + for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) { + server.client_mem_usage_buckets[j].mem_usage_sum = 0; + server.client_mem_usage_buckets[j].clients = listCreate(); + } +} + +void freeServerClientMemUsageBuckets() { + if (!server.client_mem_usage_buckets) + return; + for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) + listRelease(server.client_mem_usage_buckets[j].clients); + zfree(server.client_mem_usage_buckets); + server.client_mem_usage_buckets = NULL; +} + void initServerConfig(void) { int j; char *default_bindaddr[CONFIG_DEFAULT_BINDADDR_COUNT] = CONFIG_DEFAULT_BINDADDR; @@ -2461,6 +2509,7 @@ void initServer(void) { server.cluster_drop_packet_filter = -1; server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME; server.reply_buffer_resizing_enabled = 1; + server.client_mem_usage_buckets = NULL; resetReplicationBuffer(); /* Make sure the locale is set on startup based on the config file. */ @@ -2469,11 +2518,6 @@ void initServer(void) { exit(1); } - for (j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) { - server.client_mem_usage_buckets[j].mem_usage_sum = 0; - server.client_mem_usage_buckets[j].clients = listCreate(); - } - createSharedObjects(); adjustOpenFilesLimit(); const char *clk_msg = monotonicInit(); @@ -2606,6 +2650,9 @@ void initServer(void) { ACLUpdateDefaultUserPassword(server.requirepass); applyWatchdogPeriod(); + + if (server.maxmemory_clients != 0) + initServerClientMemUsageBuckets(); } void initListeners() { diff --git a/src/server.h b/src/server.h index 1ce5ff781..7e383c59b 100644 --- a/src/server.h +++ b/src/server.h @@ -1197,7 +1197,7 @@ typedef struct client { rax *client_tracking_prefixes; /* A dictionary of prefixes we are already subscribed to in BCAST mode, in the context of client side caching. */ - /* In updateClientMemUsage() we track the memory usage of + /* In updateClientMemoryUsage() we track the memory usage of * each client and add it to the sum of all the clients of a given type, * however we need to remember what was the old contribution of each * client, and in which category the client was, in order to remove it @@ -1551,7 +1551,7 @@ struct redisServer { client *current_client; /* Current client executing the command. */ /* Stuff for client mem eviction */ - clientMemUsageBucket client_mem_usage_buckets[CLIENT_MEM_USAGE_BUCKETS]; + clientMemUsageBucket* client_mem_usage_buckets; rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */ int in_nested_call; /* If > 0, in a nested call of a call */ @@ -2577,8 +2577,8 @@ int handleClientsWithPendingReadsUsingThreads(void); int stopThreadedIOIfNeeded(void); int clientHasPendingReplies(client *c); int islocalClient(client *c); -int updateClientMemUsage(client *c); -void updateClientMemUsageBucket(client *c); +int updateClientMemUsageAndBucket(client *c); +void removeClientFromMemUsageBucket(client *c, int allow_eviction); void unlinkClient(client *c); int writeToClient(client *c, int handler_installed); void linkClient(client *c); @@ -3117,6 +3117,8 @@ void initConfigValues(); void removeConfig(sds name); sds getConfigDebugInfo(); int allowProtectedAction(int config, client *c); +void initServerClientMemUsageBuckets(); +void freeServerClientMemUsageBuckets(); /* Module Configuration */ typedef struct ModuleConfig ModuleConfig; diff --git a/src/tracking.c b/src/tracking.c index 1b04c0626..d62eb736b 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -311,7 +311,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) { addReplyArrayLen(c,1); addReplyBulkCBuffer(c,keyname,keylen); } - updateClientMemUsage(c); + updateClientMemUsageAndBucket(c); } /* This function is called when a key is modified in Redis and in the case diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl index cd6ee1a6e..ca6d9020f 100644 --- a/tests/unit/client-eviction.tcl +++ b/tests/unit/client-eviction.tcl @@ -140,7 +140,7 @@ start_server {} { set temp_maxmemory_clients 200000 r config set maxmemory-clients $temp_maxmemory_clients - # Append watched keys until list maxes out maxmemroy clients and causes client eviction + # Append watched keys until list maxes out maxmemory clients and causes client eviction catch { for {set j 0} {$j < $temp_maxmemory_clients} {incr j} { $rr watch $j @@ -467,7 +467,7 @@ start_server {} { start_server {} { test "evict clients in right order (large to small)" { # Note that each size step needs to be at least x2 larger than previous step - # because of how the client-eviction size bucktting works + # because of how the client-eviction size bucketing works set sizes [list [kb 128] [mb 1] [mb 3]] set clients_per_size 3 r client setname control @@ -531,5 +531,52 @@ start_server {} { } {} {needs:debug} } +start_server {} { + foreach type {"client no-evict" "maxmemory-clients disabled"} { + r flushall + r client no-evict on + r config set maxmemory-clients 0 + + test "client total memory grows during $type" { + r setrange k [mb 1] v + set rr [redis_client] + $rr client setname test_client + if {$type eq "client no-evict"} { + $rr client no-evict on + r config set maxmemory-clients 1 + } + $rr deferred 1 + + # Fill output buffer in loop without reading it and make sure + # the tot-mem of client has increased (OS buffers didn't swallow it) + # and eviction not occurring. + while {true} { + $rr get k + $rr flush + after 10 + if {[client_field test_client tot-mem] > [mb 10]} { + break + } + } + + # Trigger the client eviction, by flipping the no-evict flag to off + if {$type eq "client no-evict"} { + $rr client no-evict off + } else { + r config set maxmemory-clients 1 + } + + # wait for the client to be disconnected + wait_for_condition 5000 50 { + ![client_exists test_client] + } else { + puts [r client list] + fail "client was not disconnected" + } + $rr close + } + } } +} ;# tags + |