-rw-r--r--   src/config.c                      33
-rw-r--r--   src/debug.c                        4
-rw-r--r--   src/networking.c                  14
-rw-r--r--   src/object.c                       2
-rw-r--r--   src/pubsub.c                       4
-rw-r--r--   src/replication.c                  2
-rw-r--r--   src/server.c                     135
-rw-r--r--   src/server.h                      10
-rw-r--r--   src/tracking.c                     2
-rw-r--r--   tests/unit/client-eviction.tcl    51
10 files changed, 196 insertions(+), 61 deletions(-)
diff --git a/src/config.c b/src/config.c
index e57946429..78553b758 100644
--- a/src/config.c
+++ b/src/config.c
@@ -2976,6 +2976,37 @@ void rewriteConfigLatencyTrackingInfoPercentilesOutputOption(standardConfig *con
rewriteConfigRewriteLine(state,name,line,1);
}
+static int applyClientMaxMemoryUsage(const char **err) {
+ UNUSED(err);
+ listIter li;
+ listNode *ln;
+
+ /* A non-NULL server.client_mem_usage_buckets means the previous config was
+ * already non-zero, so the buckets exist and there is nothing to apply. */
+ if (server.maxmemory_clients != 0 && server.client_mem_usage_buckets)
+ return 1;
+ if (server.maxmemory_clients != 0)
+ initServerClientMemUsageBuckets();
+
+ /* When client eviction is enabled, update the memory buckets for all clients.
+ * When disabled, clear that data structure. */
+ listRewind(server.clients, &li);
+ while ((ln = listNext(&li)) != NULL) {
+ client *c = listNodeValue(ln);
+ if (server.maxmemory_clients == 0) {
+ /* Remove client from memory usage bucket. */
+ removeClientFromMemUsageBucket(c, 0);
+ } else {
+ /* Update each client's memory usage and add it to the appropriate bucket. */
+ updateClientMemUsageAndBucket(c);
+ }
+ }
+
+ if (server.maxmemory_clients == 0)
+ freeServerClientMemUsageBuckets();
+ return 1;
+}
+
standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
@@ -3146,7 +3177,7 @@ standardConfig static_configs[] = {
createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL),
createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */
createSizeTConfig("client-query-buffer-limit", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.client_max_querybuf_len, 1024*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Default: 1GB max query buffer. */
- createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, NULL),
+ createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, applyClientMaxMemoryUsage),
/* Other configs */
createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */
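The apply callback added above is what makes maxmemory-clients fully dynamic: switching the value between zero and non-zero at runtime now allocates or frees the bucket array on the fly and re-buckets (or un-buckets) every connected client. A minimal standalone sketch of just the lazy allocate/free part of that logic, with hypothetical names standing in for the real server fields (this is not the Redis code):

    #include <stdlib.h>
    #include <stddef.h>

    #define NUM_BUCKETS 19                    /* assumed count, stand-in for CLIENT_MEM_USAGE_BUCKETS */

    typedef struct { size_t mem_usage_sum; } bucket_t;

    static bucket_t *buckets = NULL;          /* stand-in for server.client_mem_usage_buckets */

    static void apply_maxmemory_clients(long long limit) {
        if (limit != 0 && buckets != NULL)
            return;                           /* was already enabled, nothing to apply */
        if (limit != 0) {
            buckets = calloc(NUM_BUCKETS, sizeof(*buckets));  /* enable: allocate lazily */
        } else if (buckets != NULL) {
            free(buckets);                    /* disable: drop the tracking structure */
            buckets = NULL;
        }
    }

    int main(void) {
        apply_maxmemory_clients(1024 * 1024); /* enable: buckets get allocated */
        apply_maxmemory_clients(0);           /* disable: buckets get freed */
        return 0;
    }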
diff --git a/src/debug.c b/src/debug.c
index b642e4eac..43de7d2db 100644
--- a/src/debug.c
+++ b/src/debug.c
@@ -947,6 +947,10 @@ NULL
else
addReply(c, shared.ok);
} else if(!strcasecmp(c->argv[1]->ptr,"client-eviction") && c->argc == 2) {
+ if (!server.client_mem_usage_buckets) {
+ addReplyError(c,"maxmemory-clients is disabled.");
+ return;
+ }
sds bucket_info = sdsempty();
for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
if (j == 0)
diff --git a/src/networking.c b/src/networking.c
index 76566ddd7..8e7c5ace1 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -1970,7 +1970,7 @@ int writeToClient(client *c, int handler_installed) {
* Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in
* handleClientsWithPendingWritesUsingThreads(). */
if (io_threads_op == IO_THREADS_OP_IDLE)
- updateClientMemUsage(c);
+ updateClientMemUsageAndBucket(c);
return C_OK;
}
@@ -2420,7 +2420,7 @@ int processCommandAndResetClient(client *c) {
commandProcessed(c);
/* Update the client's memory to include output buffer growth following the
* processed command. */
- updateClientMemUsage(c);
+ updateClientMemUsageAndBucket(c);
}
if (server.current_client == NULL) deadclient = 1;
@@ -2557,7 +2557,7 @@ int processInputBuffer(client *c) {
* important in case the query buffer is big and wasn't drained during
* the above loop (because of partially sent big commands). */
if (io_threads_op == IO_THREADS_OP_IDLE)
- updateClientMemUsage(c);
+ updateClientMemUsageAndBucket(c);
return C_OK;
}
@@ -2995,9 +2995,11 @@ NULL
/* CLIENT NO-EVICT ON|OFF */
if (!strcasecmp(c->argv[2]->ptr,"on")) {
c->flags |= CLIENT_NO_EVICT;
+ removeClientFromMemUsageBucket(c, 0);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[2]->ptr,"off")) {
c->flags &= ~CLIENT_NO_EVICT;
+ updateClientMemUsageAndBucket(c);
addReply(c,shared.ok);
} else {
addReplyErrorObject(c,shared.syntaxerr);
@@ -4228,7 +4230,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
client *c = listNodeValue(ln);
/* Update the client in the mem usage after we're done processing it in the io-threads */
- updateClientMemUsage(c);
+ updateClientMemUsageAndBucket(c);
/* Install the write handler if there are pending writes in some
* of the clients. */
@@ -4337,7 +4339,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
}
/* Once io-threads are idle we can update the client in the mem usage */
- updateClientMemUsage(c);
+ updateClientMemUsageAndBucket(c);
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
/* If the client is no longer valid, we avoid
@@ -4384,6 +4386,8 @@ size_t getClientEvictionLimit(void) {
}
void evictClients(void) {
+ if (!server.client_mem_usage_buckets)
+ return;
/* Start eviction from topmost bucket (largest clients) */
int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1;
listIter bucket_iter;
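For context on the early return added to evictClients(): when maxmemory-clients is 0 there is no bucket array to walk, so the function now bails out immediately. The eviction pass itself scans buckets from the largest clients down to the smallest, freeing clients until total client memory drops back under the limit. A self-contained, simplified sketch of that top-down scan, using fixed-size arrays instead of the real lists and hypothetical names throughout:

    #include <stdio.h>
    #include <stddef.h>

    #define NUM_BUCKETS    4
    #define MAX_PER_BUCKET 8

    typedef struct { size_t mem[MAX_PER_BUCKET]; int count; } bucket_t;

    /* Evict from the topmost bucket (largest clients) downwards until the
     * total client memory is under the limit; returns the remaining total. */
    static size_t evict_until_under_limit(bucket_t *buckets, size_t total, size_t limit) {
        for (int b = NUM_BUCKETS - 1; b >= 0 && total > limit; b--) {
            while (buckets[b].count > 0 && total > limit) {
                /* "Free" the last client in the bucket and reclaim its memory. */
                total -= buckets[b].mem[--buckets[b].count];
            }
        }
        return total;
    }

    int main(void) {
        bucket_t buckets[NUM_BUCKETS] = {
            { { 100, 200 },   2 },    /* smallest clients, evicted last  */
            { { 1000 },       1 },
            { { 5000, 7000 }, 2 },
            { { 20000 },      1 },    /* largest clients, evicted first */
        };
        size_t total = 100 + 200 + 1000 + 5000 + 7000 + 20000;
        printf("client memory left after eviction: %zu\n",
               evict_until_under_limit(buckets, total, 8000));
        return 0;
    }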
diff --git a/src/object.c b/src/object.c
index 7d71f59c4..f2ea7dafe 100644
--- a/src/object.c
+++ b/src/object.c
@@ -1208,7 +1208,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
/* Computing the memory used by the clients would be O(N) if done
* here online. We use our values computed incrementally by
- * updateClientMemUsage(). */
+ * updateClientMemoryUsage(). */
mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+
server.stat_clients_type_memory[CLIENT_TYPE_NORMAL];
diff --git a/src/pubsub.c b/src/pubsub.c
index 0b909bddd..2e2522c57 100644
--- a/src/pubsub.c
+++ b/src/pubsub.c
@@ -465,7 +465,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message,*type.messageBulk);
- updateClientMemUsage(c);
+ updateClientMemUsageAndBucket(c);
receivers++;
}
}
@@ -491,7 +491,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
addReplyPubsubPatMessage(c,pattern,channel,message);
- updateClientMemUsage(c);
+ updateClientMemUsageAndBucket(c);
receivers++;
}
}
diff --git a/src/replication.c b/src/replication.c
index a57cd8528..797f6518d 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -593,7 +593,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
while((ln = listNext(&li))) {
client *monitor = ln->value;
addReply(monitor,cmdobj);
- updateClientMemUsage(c);
+ updateClientMemUsageAndBucket(c);
}
decrRefCount(cmdobj);
}
diff --git a/src/server.c b/src/server.c
index 7bc320795..b4ea6f9e3 100644
--- a/src/server.c
+++ b/src/server.c
@@ -839,37 +839,44 @@ static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) {
return &server.client_mem_usage_buckets[bucket_idx];
}
-/* This is called both on explicit clients when something changed their buffers,
- * so we can track clients' memory and enforce clients' maxmemory in real time,
- * and also from the clientsCron. We call it from the cron so we have updated
- * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration
- * change requires us to evict a non-active client.
+/*
+ * This method updates the client's memory usage and updates the
+ * per-client-type server stats accordingly.
*
- * This also adds the client to the correct memory usage bucket. Each bucket contains
- * all clients with roughly the same amount of memory. This way we group
- * together clients consuming about the same amount of memory and can quickly
- * free them in case we reach maxmemory-clients (client eviction).
+ * It is called from clientsCron so that the stats for non
+ * CLIENT_TYPE_NORMAL/PUBSUB clients stay up to date and client memory
+ * usage can be reported accurately.
+ *
+ * It is also called from updateClientMemUsageAndBucket, which uses the
+ * refreshed value to place the client into the appropriate client memory
+ * usage bucket.
*/
-int updateClientMemUsage(client *c) {
- serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+void updateClientMemoryUsage(client *c) {
size_t mem = getClientMemoryUsage(c, NULL);
int type = getClientType(c);
+ /* Now that we have the memory used by the client, remove the old
+ * value from the old category, and add it back. */
+ server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage;
+ server.stat_clients_type_memory[type] += mem;
+ /* Remember what we added and where, to remove it next time. */
+ c->last_memory_type = type;
+ c->last_memory_usage = mem;
+}
- /* Remove the old value of the memory used by the client from the old
- * category, and add it back. */
- if (type != c->last_memory_type) {
- server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage;
- server.stat_clients_type_memory[type] += mem;
- c->last_memory_type = type;
- } else {
- server.stat_clients_type_memory[type] += mem - c->last_memory_usage;
+int clientEvictionAllowed(client *c) {
+ if (server.maxmemory_clients == 0 || c->flags & CLIENT_NO_EVICT) {
+ return 0;
}
+ int type = getClientType(c);
+ return (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB);
+}
- int allow_eviction =
- (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB) &&
- !(c->flags & CLIENT_NO_EVICT);
- /* Update the client in the mem usage buckets */
+/* Remove the client's previously tracked memory usage from its bucket.
+ * This is called as part of incremental client memory usage tracking, and
+ * also to reset a client's bucket assignment when one is no longer needed,
+ * e.g. when client eviction is disabled. */
+void removeClientFromMemUsageBucket(client *c, int allow_eviction) {
if (c->mem_usage_bucket) {
c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
/* If this client can't be evicted then remove it from the mem usage
@@ -880,23 +887,42 @@ int updateClientMemUsage(client *c) {
c->mem_usage_bucket_node = NULL;
}
}
- if (allow_eviction) {
- clientMemUsageBucket *bucket = getMemUsageBucket(mem);
- bucket->mem_usage_sum += mem;
- if (bucket != c->mem_usage_bucket) {
- if (c->mem_usage_bucket)
- listDelNode(c->mem_usage_bucket->clients,
- c->mem_usage_bucket_node);
- c->mem_usage_bucket = bucket;
- listAddNodeTail(bucket->clients, c);
- c->mem_usage_bucket_node = listLast(bucket->clients);
- }
+}
+
+/* This is called only for explicit clients when something changed their buffers,
+ * so we can track clients' memory and enforce clients' maxmemory in real time.
+ *
+ * This also adds the client to the correct memory usage bucket. Each bucket contains
+ * all clients with roughly the same amount of memory. This way we group
+ * together clients consuming about the same amount of memory and can quickly
+ * free them in case we reach maxmemory-clients (client eviction).
+ *
+ * Returns 1 if client eviction is allowed for this client, 0 otherwise.
+ */
+int updateClientMemUsageAndBucket(client *c) {
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+ int allow_eviction = clientEvictionAllowed(c);
+ removeClientFromMemUsageBucket(c, allow_eviction);
+
+ if (!allow_eviction) {
+ return 0;
}
- /* Remember what we added, to remove it next time. */
- c->last_memory_usage = mem;
+ /* Update client memory usage. */
+ updateClientMemoryUsage(c);
- return 0;
+ /* Update the client in the mem usage buckets */
+ clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage);
+ bucket->mem_usage_sum += c->last_memory_usage;
+ if (bucket != c->mem_usage_bucket) {
+ if (c->mem_usage_bucket)
+ listDelNode(c->mem_usage_bucket->clients,
+ c->mem_usage_bucket_node);
+ c->mem_usage_bucket = bucket;
+ listAddNodeTail(bucket->clients, c);
+ c->mem_usage_bucket_node = listLast(bucket->clients);
+ }
+ return 1;
}
/* Return the max samples in the memory usage of clients tracked by
@@ -984,8 +1010,11 @@ void clientsCron(void) {
* in turn would make the INFO command too slow. So we perform this
* computation incrementally and track the (not instantaneous but updated
* to the second) total memory used by clients using clientsCron() in
- * a more incremental way (depending on server.hz). */
- if (updateClientMemUsage(c)) continue;
+ * a more incremental way (depending on server.hz).
+ * If client eviction is enabled, update the bucket as well. */
+ if (!updateClientMemUsageAndBucket(c))
+ updateClientMemoryUsage(c);
+
if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
}
}
@@ -1865,6 +1894,25 @@ void createSharedObjects(void) {
shared.maxstring = sdsnew("maxstring");
}
+void initServerClientMemUsageBuckets() {
+ if (server.client_mem_usage_buckets)
+ return;
+ server.client_mem_usage_buckets = zmalloc(sizeof(clientMemUsageBucket)*CLIENT_MEM_USAGE_BUCKETS);
+ for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
+ server.client_mem_usage_buckets[j].mem_usage_sum = 0;
+ server.client_mem_usage_buckets[j].clients = listCreate();
+ }
+}
+
+void freeServerClientMemUsageBuckets() {
+ if (!server.client_mem_usage_buckets)
+ return;
+ for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++)
+ listRelease(server.client_mem_usage_buckets[j].clients);
+ zfree(server.client_mem_usage_buckets);
+ server.client_mem_usage_buckets = NULL;
+}
+
void initServerConfig(void) {
int j;
char *default_bindaddr[CONFIG_DEFAULT_BINDADDR_COUNT] = CONFIG_DEFAULT_BINDADDR;
@@ -2461,6 +2509,7 @@ void initServer(void) {
server.cluster_drop_packet_filter = -1;
server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
server.reply_buffer_resizing_enabled = 1;
+ server.client_mem_usage_buckets = NULL;
resetReplicationBuffer();
/* Make sure the locale is set on startup based on the config file. */
@@ -2469,11 +2518,6 @@ void initServer(void) {
exit(1);
}
- for (j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) {
- server.client_mem_usage_buckets[j].mem_usage_sum = 0;
- server.client_mem_usage_buckets[j].clients = listCreate();
- }
-
createSharedObjects();
adjustOpenFilesLimit();
const char *clk_msg = monotonicInit();
@@ -2606,6 +2650,9 @@ void initServer(void) {
ACLUpdateDefaultUserPassword(server.requirepass);
applyWatchdogPeriod();
+
+ if (server.maxmemory_clients != 0)
+ initServerClientMemUsageBuckets();
}
void initListeners() {
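Only the tail of getMemUsageBucket() is visible in the hunk above; the function maps a client's memory footprint to one of CLIENT_MEM_USAGE_BUCKETS slots so that clients of roughly the same size share a bucket. The sketch below shows a log2-style mapping of that kind, one bucket per doubling of memory; the minimum bucket size and bucket count here are illustrative choices, not the actual Redis constants:

    #include <stdio.h>
    #include <stddef.h>

    #define BUCKETS         16
    #define MIN_BUCKET_SIZE (32 * 1024)   /* assumption: everything below ~32KB shares bucket 0 */

    static int mem_usage_bucket_idx(size_t mem) {
        int idx = 0;
        while (mem > MIN_BUCKET_SIZE && idx < BUCKETS - 1) {
            mem >>= 1;                    /* one bucket per doubling of memory */
            idx++;
        }
        return idx;
    }

    int main(void) {
        size_t sizes[] = { 1024, 40 * 1024, 300 * 1024, 5 * 1024 * 1024 };
        for (size_t i = 0; i < sizeof(sizes) / sizeof(sizes[0]); i++)
            printf("%zu bytes -> bucket %d\n", sizes[i], mem_usage_bucket_idx(sizes[i]));
        return 0;
    }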
diff --git a/src/server.h b/src/server.h
index 1ce5ff781..7e383c59b 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1197,7 +1197,7 @@ typedef struct client {
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
- /* In updateClientMemUsage() we track the memory usage of
+ /* In updateClientMemoryUsage() we track the memory usage of
* each client and add it to the sum of all the clients of a given type,
* however we need to remember what was the old contribution of each
* client, and in which category the client was, in order to remove it
@@ -1551,7 +1551,7 @@ struct redisServer {
client *current_client; /* Current client executing the command. */
/* Stuff for client mem eviction */
- clientMemUsageBucket client_mem_usage_buckets[CLIENT_MEM_USAGE_BUCKETS];
+ clientMemUsageBucket* client_mem_usage_buckets;
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
int in_nested_call; /* If > 0, in a nested call of a call */
@@ -2577,8 +2577,8 @@ int handleClientsWithPendingReadsUsingThreads(void);
int stopThreadedIOIfNeeded(void);
int clientHasPendingReplies(client *c);
int islocalClient(client *c);
-int updateClientMemUsage(client *c);
-void updateClientMemUsageBucket(client *c);
+int updateClientMemUsageAndBucket(client *c);
+void removeClientFromMemUsageBucket(client *c, int allow_eviction);
void unlinkClient(client *c);
int writeToClient(client *c, int handler_installed);
void linkClient(client *c);
@@ -3117,6 +3117,8 @@ void initConfigValues();
void removeConfig(sds name);
sds getConfigDebugInfo();
int allowProtectedAction(int config, client *c);
+void initServerClientMemUsageBuckets();
+void freeServerClientMemUsageBuckets();
/* Module Configuration */
typedef struct ModuleConfig ModuleConfig;
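The server.h change above turns client_mem_usage_buckets from an embedded array into a pointer so the buckets can be allocated only when client eviction is enabled. For readers of this diff, the bucket struct implied by the fields used in server.c (a clients list plus a running memory sum) looks roughly like the sketch below; the actual definition lives elsewhere in server.h and may differ in detail:

    #include <stddef.h>

    typedef struct list list;   /* Redis's generic doubly linked list (adlist.h) */

    typedef struct clientMemUsageBucket {
        list *clients;          /* clients currently assigned to this bucket */
        size_t mem_usage_sum;   /* sum of the tracked memory usage of those clients */
    } clientMemUsageBucket;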
diff --git a/src/tracking.c b/src/tracking.c
index 1b04c0626..d62eb736b 100644
--- a/src/tracking.c
+++ b/src/tracking.c
@@ -311,7 +311,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyArrayLen(c,1);
addReplyBulkCBuffer(c,keyname,keylen);
}
- updateClientMemUsage(c);
+ updateClientMemUsageAndBucket(c);
}
/* This function is called when a key is modified in Redis and in the case
diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl
index cd6ee1a6e..ca6d9020f 100644
--- a/tests/unit/client-eviction.tcl
+++ b/tests/unit/client-eviction.tcl
@@ -140,7 +140,7 @@ start_server {} {
set temp_maxmemory_clients 200000
r config set maxmemory-clients $temp_maxmemory_clients
- # Append watched keys until list maxes out maxmemroy clients and causes client eviction
+ # Append watched keys until list maxes out maxmemory clients and causes client eviction
catch {
for {set j 0} {$j < $temp_maxmemory_clients} {incr j} {
$rr watch $j
@@ -467,7 +467,7 @@ start_server {} {
start_server {} {
test "evict clients in right order (large to small)" {
# Note that each size step needs to be at least x2 larger than previous step
- # because of how the client-eviction size bucktting works
+ # because of how the client-eviction size bucketing works
set sizes [list [kb 128] [mb 1] [mb 3]]
set clients_per_size 3
r client setname control
@@ -531,5 +531,52 @@ start_server {} {
} {} {needs:debug}
}
+start_server {} {
+ foreach type {"client no-evict" "maxmemory-clients disabled"} {
+ r flushall
+ r client no-evict on
+ r config set maxmemory-clients 0
+
+ test "client total memory grows during $type" {
+ r setrange k [mb 1] v
+ set rr [redis_client]
+ $rr client setname test_client
+ if {$type eq "client no-evict"} {
+ $rr client no-evict on
+ r config set maxmemory-clients 1
+ }
+ $rr deferred 1
+
+ # Fill the output buffer in a loop without reading it and make sure
+ # the tot-mem of the client has increased (OS buffers didn't swallow it)
+ # and that no eviction has occurred.
+ while {true} {
+ $rr get k
+ $rr flush
+ after 10
+ if {[client_field test_client tot-mem] > [mb 10]} {
+ break
+ }
+ }
+
+ # Trigger client eviction, either by flipping the no-evict flag to off
+ # or by enabling maxmemory-clients.
+ if {$type eq "client no-evict"} {
+ $rr client no-evict off
+ } else {
+ r config set maxmemory-clients 1
+ }
+
+ # wait for the client to be disconnected
+ wait_for_condition 5000 50 {
+ ![client_exists test_client]
+ } else {
+ puts [r client list]
+ fail "client was not disconnected"
+ }
+ $rr close
+ }
+ }
}
+} ;# tags
+