Diffstat (limited to 'src/networking.c')
-rw-r--r--  src/networking.c | 236
1 file changed, 200 insertions(+), 36 deletions(-)
diff --git a/src/networking.c b/src/networking.c
index 3b522fb35..21278d783 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -180,15 +180,18 @@ client *createClient(connection *conn) {
c->sockname = NULL;
c->client_list_node = NULL;
c->paused_list_node = NULL;
+ c->pending_read_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
- c->client_cron_last_memory_usage = 0;
- c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL;
+ c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0;
+ c->last_memory_type = CLIENT_TYPE_NORMAL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
+ c->mem_usage_bucket = NULL;
+ c->mem_usage_bucket_node = NULL;
if (conn) linkClient(c);
initClientMultiState(c);
return c;
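
The two new bucket fields tie each client into the memory-usage buckets introduced
by this change for client eviction. A minimal sketch of the assumed per-bucket
structure (the field names follow the usage in freeClient() below; the real
definition lives in server.h):

    typedef struct clientMemUsageBucket {
        list *clients;        /* clients whose last sampled usage falls in this bucket */
        size_t mem_usage_sum; /* sum of last_memory_usage over those clients */
    } clientMemUsageBucket;

c->mem_usage_bucket points at the client's current bucket and
c->mem_usage_bucket_node at its own node inside bucket->clients, making removal O(1).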
@@ -267,7 +270,7 @@ int prepareClientToWrite(client *c) {
* not install a write handler. Instead, it will be done by
* handleClientsWithPendingReadsUsingThreads() upon return.
*/
- if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
+ if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE)
clientInstallWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
@@ -1288,13 +1291,13 @@ void unlinkClient(client *c) {
}
/* Remove from the list of pending reads if needed. */
- if (c->flags & CLIENT_PENDING_READ) {
- ln = listSearchKey(server.clients_pending_read,c);
- serverAssert(ln != NULL);
- listDelNode(server.clients_pending_read,ln);
- c->flags &= ~CLIENT_PENDING_READ;
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+ if (c->pending_read_list_node != NULL) {
+ listDelNode(server.clients_pending_read,c->pending_read_list_node);
+ c->pending_read_list_node = NULL;
}
+
/* When client was just unblocked because of a blocking operation,
* remove it from the list of unblocked clients. */
if (c->flags & CLIENT_UNBLOCKED) {
@@ -1430,10 +1433,15 @@ void freeClient(client *c) {
* we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();
- /* Remove the contribution that this client gave to our
+ /* Remove the contribution that this client gave to our
* incrementally computed memory usage. */
- server.stat_clients_type_memory[c->client_cron_last_memory_type] -=
- c->client_cron_last_memory_usage;
+ server.stat_clients_type_memory[c->last_memory_type] -=
+ c->last_memory_usage;
+ /* Remove client from memory usage buckets */
+ if (c->mem_usage_bucket) {
+ c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
+ listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node);
+ }
/* Release other dynamically allocated client structure fields,
* and finally release the client structure itself. */
@@ -1470,6 +1478,27 @@ void freeClientAsync(client *c) {
pthread_mutex_unlock(&async_free_queue_mutex);
}
+/* Perform processing of the client before moving on to processing the next client.
+ * This is useful for performing operations that affect the global state but can't
+ * wait until we're done with all clients, in other words that can't wait until
+ * beforeSleep(). Returns C_ERR in case the client is no longer valid after the call. */
+int beforeNextClient(client *c) {
+ /* Skip the client processing if we're in an IO thread; in that case we'll perform
+ this operation later (this function is called again) in the fan-in stage of the
+ threading mechanism. */
+ if (io_threads_op != IO_THREADS_OP_IDLE)
+ return C_OK;
+ /* Handle async frees */
+ /* Note: this doesn't make the server.clients_to_close list redundant because of
+ * cases where we want an async free of a client other than myself. For example
+ * in ACL modifications we disconnect clients authenticated to non-existent
+ * users (see ACL LOAD). */
+ if (c->flags & CLIENT_CLOSE_ASAP) {
+ freeClient(c);
+ return C_ERR;
+ }
+ return C_OK;
+}
+
/* Free the clients marked as CLOSE_ASAP, return the number of clients
* freed. */
int freeClientsInAsyncFreeQueue(void) {
@@ -1594,7 +1623,10 @@ int writeToClient(client *c, int handler_installed) {
* aeDeleteFileEvent() is not thread safe: however writeToClient()
* is always called with handler_installed set to 0 from threads
* so we are fine. */
- if (handler_installed) connSetWriteHandler(c->conn, NULL);
+ if (handler_installed) {
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
+ connSetWriteHandler(c->conn, NULL);
+ }
/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
@@ -1602,6 +1634,7 @@ int writeToClient(client *c, int handler_installed) {
return C_ERR;
}
}
+ updateClientMemUsage(c);
return C_OK;
}
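
updateClientMemUsage() is added elsewhere in this change (in server.c). A hedged
sketch of the idea, assuming it simply re-samples the client and keeps the
incrementally computed per-type totals in sync:

    /* Sketch only; the real helper has more cases to handle. */
    void updateClientMemUsageSketch(client *c) {
        size_t mem = getClientMemoryUsage(c, NULL);
        int type = getClientType(c);
        /* Move this client's contribution from its old type/size to the new one. */
        server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage;
        server.stat_clients_type_memory[type] += mem;
        c->last_memory_type = type;
        c->last_memory_usage = mem;
        /* The buckets are main-thread-only state, so the bucket update is deferred
         * while io-threads are running (see the fan-in stages below). */
        if (io_threads_op == IO_THREADS_OP_IDLE)
            updateClientMemUsageBucket(c);
    }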
@@ -2036,7 +2069,11 @@ int processCommandAndResetClient(client *c) {
server.current_client = c;
if (processCommand(c) == C_OK) {
commandProcessed(c);
+ /* Update the client's memory usage to include output buffer growth following the
+ * processed command. */
+ updateClientMemUsage(c);
}
+
if (server.current_client == NULL) deadclient = 1;
/*
* Restore the old client, this is needed because when a script
@@ -2117,7 +2154,8 @@ void processInputBuffer(client *c) {
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
- if (c->flags & CLIENT_PENDING_READ) {
+ if (io_threads_op != IO_THREADS_OP_IDLE) {
+ serverAssert(io_threads_op == IO_THREADS_OP_READ);
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
@@ -2137,6 +2175,11 @@ void processInputBuffer(client *c) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
+
+ /* Update client memory usage after processing the query buffer, this is
+ * important in case the query buffer is big and wasn't drained during
+ * the above loop (because of partially sent big commands). */
+ updateClientMemUsage(c);
}
void readQueryFromClient(connection *conn) {
@@ -2190,7 +2233,7 @@ void readQueryFromClient(connection *conn) {
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
- return;
+ goto done;
}
} else if (nread == 0) {
if (server.verbosity <= LL_VERBOSE) {
@@ -2199,7 +2242,7 @@ void readQueryFromClient(connection *conn) {
sdsfree(info);
}
freeClientAsync(c);
- return;
+ goto done;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
@@ -2223,12 +2266,15 @@ void readQueryFromClient(connection *conn) {
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
- return;
+ goto done;
}
/* There is more data in the client input buffer, continue parsing it
* to check if there is a full command to execute. */
processInputBuffer(c);
+
+done:
+ beforeNextClient(c);
}
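
Note the control-flow change above: the early returns on the error paths become
goto done, so beforeNextClient() now runs on every exit from readQueryFromClient()
and a client flagged CLIENT_CLOSE_ASAP is freed before the next client is served.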
/* A Redis "Address String" is a colon separated ip:port pair.
@@ -2306,6 +2352,7 @@ sds catClientInfoString(sds s, client *client) {
if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A';
if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U';
if (client->flags & CLIENT_READONLY) *p++ = 'r';
+ if (client->flags & CLIENT_NO_EVICT) *p++ = 'e';
if (p == flags) *p++ = 'N';
*p++ = '\0';
@@ -2317,19 +2364,10 @@ sds catClientInfoString(sds s, client *client) {
*p = '\0';
/* Compute the total memory consumed by this client. */
- size_t obufmem = getClientOutputBufferMemoryUsage(client);
- size_t total_mem = obufmem;
- total_mem += zmalloc_size(client); /* includes client->buf */
- total_mem += sdsZmallocSize(client->querybuf);
- /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
- * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
- * spot problematic clients. */
- total_mem += client->argv_len_sum;
- if (client->argv)
- total_mem += zmalloc_size(client->argv);
+ size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
return sdscatfmt(s,
- "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
+ "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
(unsigned long long) client->id,
getClientPeerId(client),
getClientSockname(client),
@@ -2345,6 +2383,7 @@ sds catClientInfoString(sds s, client *client) {
(unsigned long long) sdslen(client->querybuf),
(unsigned long long) sdsavail(client->querybuf),
(unsigned long long) client->argv_len_sum,
+ (unsigned long long) client->mstate.argv_len_sums,
(unsigned long long) client->bufpos,
(unsigned long long) listLength(client->reply),
(unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
@@ -2565,6 +2604,18 @@ NULL
addReplyErrorObject(c,shared.syntaxerr);
return;
}
+ } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) {
+ /* CLIENT NO-EVICT ON|OFF */
+ if (!strcasecmp(c->argv[2]->ptr,"on")) {
+ c->flags |= CLIENT_NO_EVICT;
+ addReply(c,shared.ok);
+ } else if (!strcasecmp(c->argv[2]->ptr,"off")) {
+ c->flags &= ~CLIENT_NO_EVICT;
+ addReply(c,shared.ok);
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
} else if (!strcasecmp(c->argv[1]->ptr,"kill")) {
/* CLIENT KILL <ip:port>
* CLIENT KILL <option> [value] ... <option> [value] */
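
The new subcommand gives operators a manual override: CLIENT NO-EVICT ON exempts the
current connection from client eviction (useful for admin or monitoring connections)
and CLIENT NO-EVICT OFF makes it evictable again. The 'e' flag added to
catClientInfoString() above exposes the state in CLIENT LIST and CLIENT INFO output.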
@@ -3154,11 +3205,39 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* Note: this function is very fast so it can be called as many times as
* the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */
-unsigned long getClientOutputBufferMemoryUsage(client *c) {
- unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
+size_t getClientOutputBufferMemoryUsage(client *c) {
+ size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
return c->reply_bytes + (list_item_size*listLength(c->reply));
}
+/* Returns the client's total memory usage.
+ * Optionally, if output_buffer_mem_usage is not NULL, it fills it with
+ * the client output buffer memory usage portion of the total. */
+size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
+ size_t mem = getClientOutputBufferMemoryUsage(c);
+ if (output_buffer_mem_usage != NULL)
+ *output_buffer_mem_usage = mem;
+ mem += sdsZmallocSize(c->querybuf);
+ mem += zmalloc_size(c);
+ /* For efficiency (less work keeping track of the argv memory), this doesn't include
+ * the full allocated memory, i.e. unused sds space and internal fragmentation, just
+ * the string lengths. But this is enough to spot problematic clients. */
+ mem += c->argv_len_sum + sizeof(robj*)*c->argc;
+ mem += multiStateMemOverhead(c);
+
+ /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers
+ * to the strings themselves because they aren't stored per client. */
+ mem += listLength(c->pubsub_patterns) * sizeof(listNode);
+ mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) +
+ dictSlots(c->pubsub_channels) * sizeof(dictEntry*);
+
+ /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */
+ if (c->client_tracking_prefixes)
+ mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) + sizeof(raxNode*));
+
+ return mem;
+}
+
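
getClientMemoryUsage() centralizes the per-client accounting that
catClientInfoString() previously did inline (see the hunk above), so CLIENT INFO,
the incrementally computed per-type totals and the eviction buckets all measure
memory the same way.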
/* Get the class of a client, used in order to enforce limits to different
* classes of clients.
*
@@ -3425,13 +3504,11 @@ void processEventsWhileBlocked(void) {
* ========================================================================== */
#define IO_THREADS_MAX_NUM 128
-#define IO_THREADS_OP_READ 0
-#define IO_THREADS_OP_WRITE 1
pthread_t io_threads[IO_THREADS_MAX_NUM];
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
-int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
+int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE. TODO: should access to this be atomic? */
/* This is the list of clients each thread will serve when threaded I/O is
* used. We spawn io_threads_num-1 threads, since one is the main thread
@@ -3498,6 +3575,9 @@ void *IOThreadMain(void *myid) {
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
+ /* Indicate that io-threads are currently idle */
+ io_threads_op = IO_THREADS_OP_IDLE;
+
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
@@ -3584,6 +3664,12 @@ int stopThreadedIOIfNeeded(void) {
}
}
+/* This function achieves thread safety using a fan-out -> fan-in paradigm:
+ * Fan out: The main thread fans out work to the io-threads which block until
+ * setIOPendingCount() is called with a value larger than 0 by the main thread.
+ * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
+ * it can safely perform post-processing and return to normal synchronous
+ * work. */
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
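
The fan-out -> fan-in contract described above is the heart of the threading model.
A minimal standalone sketch of the handshake, assuming the per-thread io_threads_list
lists and the setIOPendingCount()/getIOPendingCount() atomic wrappers defined earlier
in this file:

    /* Sketch of the fan-out -> fan-in handshake shared by the read and write paths. */
    void fanOutFanInSketch(void) {
        io_threads_op = IO_THREADS_OP_WRITE; /* or IO_THREADS_OP_READ */

        /* Fan out: publish a non-zero pending count to wake each io-thread. */
        for (int j = 1; j < server.io_threads_num; j++)
            setIOPendingCount(j, listLength(io_threads_list[j]));

        /* The main thread (thread 0) processes its own share in the meantime. */

        /* Fan in: wait until every io-thread has drained its list. */
        while (1) {
            unsigned long pending = 0;
            for (int j = 1; j < server.io_threads_num; j++)
                pending += getIOPendingCount(j);
            if (pending == 0) break;
        }

        /* Only now may the main thread safely touch shared per-client state
         * again (write handlers, memory buckets, frees). */
        io_threads_op = IO_THREADS_OP_IDLE;
    }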
@@ -3642,12 +3728,17 @@ int handleClientsWithPendingWritesUsingThreads(void) {
if (pending == 0) break;
}
+ io_threads_op = IO_THREADS_OP_IDLE;
+
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
+ /* Update the client in the mem usage buckets after we're done processing it in the io-threads */
+ updateClientMemUsageBucket(c);
+
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
@@ -3672,10 +3763,11 @@ int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
- !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
+ !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
+ io_threads_op == IO_THREADS_OP_IDLE)
{
- c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
+ c->pending_read_list_node = listFirst(server.clients_pending_read);
return 1;
} else {
return 0;
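
Caching listFirst(server.clients_pending_read) in c->pending_read_list_node is what
lets unlinkClient() above drop its O(N) listSearchKey() scan: the client now holds a
direct pointer to its own list node, so removal is a single listDelNode(). The same
pattern is already used for client_list_node and paused_list_node in createClient().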
@@ -3687,7 +3779,13 @@ int postponeClientRead(client *c) {
* process (instead of serving them synchronously). This function runs
* the queue using the I/O threads, and process them in order to accumulate
* the reads in the buffers, and also parse the first command available
- * rendering it in the client structures. */
+ * rendering it in the client structures.
+ * This function achieves thread safety using a fan-out -> fan-in paradigm:
+ * Fan out: The main thread fans out work to the io-threads which block until
+ * setIOPendingCount() is called with a value larger than 0 by the main thread.
+ * Fan in: The main thread waits until getIOPendingCount() returns 0. Then
+ * it can safely perform post-processing and return to normal synchronous
+ * work. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
@@ -3729,14 +3827,27 @@ int handleClientsWithPendingReadsUsingThreads(void) {
if (pending == 0) break;
}
+ io_threads_op = IO_THREADS_OP_IDLE;
+
/* Run the list of clients again to process the new buffers. */
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
- c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
+ c->pending_read_list_node = NULL;
serverAssert(!(c->flags & CLIENT_BLOCKED));
+
+ if (beforeNextClient(c) == C_ERR) {
+ /* If the client is no longer valid, we avoid
+ * processing the client later. So we just go
+ * to the next. */
+ continue;
+ }
+
+ /* Once io-threads are idle we can update the client in the mem usage buckets */
+ updateClientMemUsageBucket(c);
+
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
@@ -3758,3 +3869,56 @@ int handleClientsWithPendingReadsUsingThreads(void) {
return processed;
}
+
+/* Returns the actual client eviction limit based on current configuration or
+ * 0 if no limit. */
+size_t getClientEvictionLimit(void) {
+ size_t maxmemory_clients_actual = SIZE_MAX;
+
+ /* Handle percentage of maxmemory */
+ if (server.maxmemory_clients < 0 && server.maxmemory > 0) {
+ unsigned long long maxmemory_clients_bytes = (unsigned long long)((double)server.maxmemory * -(double) server.maxmemory_clients / 100);
+ if (maxmemory_clients_bytes <= SIZE_MAX)
+ maxmemory_clients_actual = maxmemory_clients_bytes;
+ }
+ else if (server.maxmemory_clients > 0)
+ maxmemory_clients_actual = server.maxmemory_clients;
+ else
+ return 0;
+
+ /* Don't allow a maxmemory-clients value that is too small, to avoid cases where we
+ * can't communicate with the server at all because of a bad configuration. */
+ if (maxmemory_clients_actual < 1024*128)
+ maxmemory_clients_actual = 1024*128;
+
+ return maxmemory_clients_actual;
+}
+
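
As a worked example of the sign convention (a negative maxmemory-clients is read as
a percentage of maxmemory; the values below are illustrative):

    /* maxmemory         = 1073741824  (1 GiB)
     * maxmemory-clients = -5          (i.e. 5% of maxmemory)
     * limit = 1073741824 * 5 / 100 = 53687091 bytes (~51 MiB)
     * Any limit below 128 KiB is clamped up to 1024*128 = 131072 bytes so the
     * server stays reachable even under a bad configuration. */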
+void evictClients(void) {
+ /* Start eviction from topmost bucket (largest clients) */
+ int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1;
+ listIter bucket_iter;
+ listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
+ size_t client_eviction_limit = getClientEvictionLimit();
+ if (client_eviction_limit == 0)
+ return;
+ while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
+ server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >= client_eviction_limit) {
+ listNode *ln = listNext(&bucket_iter);
+ if (ln) {
+ client *c = ln->value;
+ sds ci = catClientInfoString(sdsempty(),c);
+ serverLog(LL_NOTICE, "Evicting client: %s", ci);
+ freeClient(c);
+ sdsfree(ci);
+ server.stat_evictedclients++;
+ } else {
+ curr_bucket--;
+ if (curr_bucket < 0) {
+ serverLog(LL_WARNING, "Over client maxmemory after evicting all evictable clients");
+ break;
+ }
+ listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter);
+ }
+ }
+}
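
evictClients() walks the buckets from largest to smallest, which assumes clients are
pre-binned by their last sampled memory usage into logarithmically sized buckets. A
hedged sketch of the bucket-index computation (the real helper lives in server.c;
min_log here is an illustrative assumption):

    /* Sketch: map a client's memory usage to one of CLIENT_MEM_USAGE_BUCKETS
     * log2-sized buckets, clamping outliers into the edge buckets. */
    static inline int memUsageBucketIdxSketch(size_t mem) {
        int size_in_bits = 8 * (int)sizeof(mem);
        /* Position of the highest set bit, i.e. floor(log2(mem)) + 1. */
        int bucket_idx = mem > 0 ? size_in_bits - __builtin_clzl(mem) : 0;
        int min_log = 15; /* assumption: smallest bucket covers ~32 KiB clients */
        int max_log = min_log + CLIENT_MEM_USAGE_BUCKETS - 1;
        if (bucket_idx < min_log) bucket_idx = min_log;
        if (bucket_idx > max_log) bucket_idx = max_log;
        return bucket_idx - min_log; /* index into server.client_mem_usage_buckets */
    }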