diff options
Diffstat (limited to 'src/networking.c')
-rw-r--r-- | src/networking.c | 236 |
1 files changed, 200 insertions, 36 deletions
diff --git a/src/networking.c b/src/networking.c index 3b522fb35..21278d783 100644 --- a/src/networking.c +++ b/src/networking.c @@ -180,15 +180,18 @@ client *createClient(connection *conn) { c->sockname = NULL; c->client_list_node = NULL; c->paused_list_node = NULL; + c->pending_read_list_node = NULL; c->client_tracking_redirection = 0; c->client_tracking_prefixes = NULL; - c->client_cron_last_memory_usage = 0; - c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL; + c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0; + c->last_memory_type = CLIENT_TYPE_NORMAL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); + c->mem_usage_bucket = NULL; + c->mem_usage_bucket_node = NULL; if (conn) linkClient(c); initClientMultiState(c); return c; @@ -267,7 +270,7 @@ int prepareClientToWrite(client *c) { * not install a write handler. Instead, it will be done by * handleClientsWithPendingReadsUsingThreads() upon return. */ - if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ)) + if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE) clientInstallWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -1288,13 +1291,13 @@ void unlinkClient(client *c) { } /* Remove from the list of pending reads if needed. */ - if (c->flags & CLIENT_PENDING_READ) { - ln = listSearchKey(server.clients_pending_read,c); - serverAssert(ln != NULL); - listDelNode(server.clients_pending_read,ln); - c->flags &= ~CLIENT_PENDING_READ; + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + if (c->pending_read_list_node != NULL) { + listDelNode(server.clients_pending_read,c->pending_read_list_node); + c->pending_read_list_node = NULL; } + /* When client was just unblocked because of a blocking operation, * remove it from the list of unblocked clients. */ if (c->flags & CLIENT_UNBLOCKED) { @@ -1430,10 +1433,15 @@ void freeClient(client *c) { * we lost the connection with the master. */ if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); - /* Remove the contribution that this client gave to our + /* Remove the contribution that this client gave to our * incrementally computed memory usage. */ - server.stat_clients_type_memory[c->client_cron_last_memory_type] -= - c->client_cron_last_memory_usage; + server.stat_clients_type_memory[c->last_memory_type] -= + c->last_memory_usage; + /* Remove client from memory usage buckets */ + if (c->mem_usage_bucket) { + c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; + listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node); + } /* Release other dynamically allocated client structure fields, * and finally release the client structure itself. */ @@ -1470,6 +1478,27 @@ void freeClientAsync(client *c) { pthread_mutex_unlock(&async_free_queue_mutex); } +/* Perform processing of the client before moving on to processing the next client + * this is useful for performing operations that affect the global state but can't + * wait until we're done with all clients. In other words can't wait until beforeSleep() + * return C_ERR in case client is no longer valid after call. */ +int beforeNextClient(client *c) { + /* Skip the client processing if we're in an IO thread, in that case we'll perform + this operation later (this function is called again) in the fan-in stage of the threading mechanism */ + if (io_threads_op != IO_THREADS_OP_IDLE) + return C_OK; + /* Handle async frees */ + /* Note: this doesn't make the server.clients_to_close list redundant because of + * cases where we want an async free of a client other than myself. For example + * in ACL modifications we disconnect clients authenticated to non-existent + * users (see ACL LOAD). */ + if (c->flags & CLIENT_CLOSE_ASAP) { + freeClient(c); + return C_ERR; + } + return C_OK; +} + /* Free the clients marked as CLOSE_ASAP, return the number of clients * freed. */ int freeClientsInAsyncFreeQueue(void) { @@ -1594,7 +1623,10 @@ int writeToClient(client *c, int handler_installed) { * adDeleteFileEvent() is not thread safe: however writeToClient() * is always called with handler_installed set to 0 from threads * so we are fine. */ - if (handler_installed) connSetWriteHandler(c->conn, NULL); + if (handler_installed) { + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + connSetWriteHandler(c->conn, NULL); + } /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { @@ -1602,6 +1634,7 @@ int writeToClient(client *c, int handler_installed) { return C_ERR; } } + updateClientMemUsage(c); return C_OK; } @@ -2036,7 +2069,11 @@ int processCommandAndResetClient(client *c) { server.current_client = c; if (processCommand(c) == C_OK) { commandProcessed(c); + /* Update the client's memory to include output buffer growth following the + * processed command. */ + updateClientMemUsage(c); } + if (server.current_client == NULL) deadclient = 1; /* * Restore the old client, this is needed because when a script @@ -2117,7 +2154,8 @@ void processInputBuffer(client *c) { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ - if (c->flags & CLIENT_PENDING_READ) { + if (io_threads_op != IO_THREADS_OP_IDLE) { + serverAssert(io_threads_op == IO_THREADS_OP_READ); c->flags |= CLIENT_PENDING_COMMAND; break; } @@ -2137,6 +2175,11 @@ void processInputBuffer(client *c) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } + + /* Update client memory usage after processing the query buffer, this is + * important in case the query buffer is big and wasn't drained during + * the above loop (because of partially sent big commands). */ + updateClientMemUsage(c); } void readQueryFromClient(connection *conn) { @@ -2190,7 +2233,7 @@ void readQueryFromClient(connection *conn) { } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); - return; + goto done; } } else if (nread == 0) { if (server.verbosity <= LL_VERBOSE) { @@ -2199,7 +2242,7 @@ void readQueryFromClient(connection *conn) { sdsfree(info); } freeClientAsync(c); - return; + goto done; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer * of the master. We'll use this buffer later in order to have a @@ -2223,12 +2266,15 @@ void readQueryFromClient(connection *conn) { sdsfree(ci); sdsfree(bytes); freeClientAsync(c); - return; + goto done; } /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ processInputBuffer(c); + +done: + beforeNextClient(c); } /* A Redis "Address String" is a colon separated ip:port pair. @@ -2306,6 +2352,7 @@ sds catClientInfoString(sds s, client *client) { if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A'; if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U'; if (client->flags & CLIENT_READONLY) *p++ = 'r'; + if (client->flags & CLIENT_NO_EVICT) *p++ = 'e'; if (p == flags) *p++ = 'N'; *p++ = '\0'; @@ -2317,19 +2364,10 @@ sds catClientInfoString(sds s, client *client) { *p = '\0'; /* Compute the total memory consumed by this client. */ - size_t obufmem = getClientOutputBufferMemoryUsage(client); - size_t total_mem = obufmem; - total_mem += zmalloc_size(client); /* includes client->buf */ - total_mem += sdsZmallocSize(client->querybuf); - /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory - * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to - * spot problematic clients. */ - total_mem += client->argv_len_sum; - if (client->argv) - total_mem += zmalloc_size(client->argv); + size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem); return sdscatfmt(s, - "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", + "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", (unsigned long long) client->id, getClientPeerId(client), getClientSockname(client), @@ -2345,6 +2383,7 @@ sds catClientInfoString(sds s, client *client) { (unsigned long long) sdslen(client->querybuf), (unsigned long long) sdsavail(client->querybuf), (unsigned long long) client->argv_len_sum, + (unsigned long long) client->mstate.argv_len_sums, (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply), (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ @@ -2565,6 +2604,18 @@ NULL addReplyErrorObject(c,shared.syntaxerr); return; } + } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) { + /* CLIENT PROTECT ON|OFF */ + if (!strcasecmp(c->argv[2]->ptr,"on")) { + c->flags |= CLIENT_NO_EVICT; + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[2]->ptr,"off")) { + c->flags &= ~CLIENT_NO_EVICT; + addReply(c,shared.ok); + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } } else if (!strcasecmp(c->argv[1]->ptr,"kill")) { /* CLIENT KILL <ip:port> * CLIENT KILL <option> [value] ... <option> [value] */ @@ -3154,11 +3205,39 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * Note: this function is very fast so can be called as many time as * the caller wishes. The main usage of this function currently is * enforcing the client output length limits. */ -unsigned long getClientOutputBufferMemoryUsage(client *c) { - unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); +size_t getClientOutputBufferMemoryUsage(client *c) { + size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); return c->reply_bytes + (list_item_size*listLength(c->reply)); } +/* Returns the total client's memory usage. + * Optionally, if output_buffer_mem_usage is not NULL, it fills it with + * the client output buffer memory usage portion of the total. */ +size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { + size_t mem = getClientOutputBufferMemoryUsage(c); + if (output_buffer_mem_usage != NULL) + *output_buffer_mem_usage = mem; + mem += sdsZmallocSize(c->querybuf); + mem += zmalloc_size(c); + /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory + * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to + * spot problematic clients. */ + mem += c->argv_len_sum + sizeof(robj*)*c->argc; + mem += multiStateMemOverhead(c); + + /* Add memory overhead of pubsub channels and patterns. Note: this is just the overhead of the robj pointers + * to the strings themselves because they aren't stored per client. */ + mem += listLength(c->pubsub_patterns) * sizeof(listNode); + mem += dictSize(c->pubsub_channels) * sizeof(dictEntry) + + dictSlots(c->pubsub_channels) * sizeof(dictEntry*); + + /* Add memory overhead of the tracking prefixes, this is an underestimation so we don't need to traverse the entire rax */ + if (c->client_tracking_prefixes) + mem += c->client_tracking_prefixes->numnodes * (sizeof(raxNode) * sizeof(raxNode*)); + + return mem; +} + /* Get the class of a client, used in order to enforce limits to different * classes of clients. * @@ -3425,13 +3504,11 @@ void processEventsWhileBlocked(void) { * ========================================================================== */ #define IO_THREADS_MAX_NUM 128 -#define IO_THREADS_OP_READ 0 -#define IO_THREADS_OP_WRITE 1 pthread_t io_threads[IO_THREADS_MAX_NUM]; pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; redisAtomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; -int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ +int io_threads_op; /* IO_THREADS_OP_IDLE, IO_THREADS_OP_READ or IO_THREADS_OP_WRITE. */ // TODO: should access to this be atomic??! /* This is the list of clients each thread will serve when threaded I/O is * used. We spawn io_threads_num-1 threads, since one is the main thread @@ -3498,6 +3575,9 @@ void *IOThreadMain(void *myid) { void initThreadedIO(void) { server.io_threads_active = 0; /* We start with threads not active. */ + /* Indicate that io-threads are currently idle */ + io_threads_op = IO_THREADS_OP_IDLE; + /* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return; @@ -3584,6 +3664,12 @@ int stopThreadedIOIfNeeded(void) { } } +/* This function achieves thread safety using a fan-out -> fan-in paradigm: + * Fan out: The main thread fans out work to the io-threads which block until + * setIOPendingCount() is called with a value larger than 0 by the main thread. + * Fan in: The main thread waits until getIOPendingCount() returns 0. Then + * it can safely perform post-processing and return to normal synchronous + * work. */ int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write); if (processed == 0) return 0; /* Return ASAP if there are no clients. */ @@ -3642,12 +3728,17 @@ int handleClientsWithPendingWritesUsingThreads(void) { if (pending == 0) break; } + io_threads_op = IO_THREADS_OP_IDLE; + /* Run the list of clients again to install the write handler where * needed. */ listRewind(server.clients_pending_write,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); + /* Update the client in the mem usage buckets after we're done processing it in the io-threads */ + updateClientMemUsageBucket(c); + /* Install the write handler if there are pending writes in some * of the clients. */ if (clientHasPendingReplies(c) && @@ -3672,10 +3763,11 @@ int postponeClientRead(client *c) { if (server.io_threads_active && server.io_threads_do_reads && !ProcessingEventsWhileBlocked && - !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED))) + !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) && + io_threads_op == IO_THREADS_OP_IDLE) { - c->flags |= CLIENT_PENDING_READ; listAddNodeHead(server.clients_pending_read,c); + c->pending_read_list_node = listFirst(server.clients_pending_read); return 1; } else { return 0; @@ -3687,7 +3779,13 @@ int postponeClientRead(client *c) { * process (instead of serving them synchronously). This function runs * the queue using the I/O threads, and process them in order to accumulate * the reads in the buffers, and also parse the first command available - * rendering it in the client structures. */ + * rendering it in the client structures. + * This function achieves thread safety using a fan-out -> fan-in paradigm: + * Fan out: The main thread fans out work to the io-threads which block until + * setIOPendingCount() is called with a value larger than 0 by the main thread. + * Fan in: The main thread waits until getIOPendingCount() returns 0. Then + * it can safely perform post-processing and return to normal synchronous + * work. */ int handleClientsWithPendingReadsUsingThreads(void) { if (!server.io_threads_active || !server.io_threads_do_reads) return 0; int processed = listLength(server.clients_pending_read); @@ -3729,14 +3827,27 @@ int handleClientsWithPendingReadsUsingThreads(void) { if (pending == 0) break; } + io_threads_op = IO_THREADS_OP_IDLE; + /* Run the list of clients again to process the new buffers. */ while(listLength(server.clients_pending_read)) { ln = listFirst(server.clients_pending_read); client *c = listNodeValue(ln); - c->flags &= ~CLIENT_PENDING_READ; listDelNode(server.clients_pending_read,ln); + c->pending_read_list_node = NULL; serverAssert(!(c->flags & CLIENT_BLOCKED)); + + if (beforeNextClient(c) == C_ERR) { + /* If the client is no longer valid, we avoid + * processing the client later. So we just go + * to the next. */ + continue; + } + + /* Once io-threads are idle we can update the client in the mem usage buckets */ + updateClientMemUsageBucket(c); + if (processPendingCommandsAndResetClient(c) == C_ERR) { /* If the client is no longer valid, we avoid * processing the client later. So we just go @@ -3758,3 +3869,56 @@ int handleClientsWithPendingReadsUsingThreads(void) { return processed; } + +/* Returns the actual client eviction limit based on current configuration or + * 0 if no limit. */ +size_t getClientEvictionLimit(void) { + size_t maxmemory_clients_actual = SIZE_MAX; + + /* Handle percentage of maxmemory*/ + if (server.maxmemory_clients < 0 && server.maxmemory > 0) { + unsigned long long maxmemory_clients_bytes = (unsigned long long)((double)server.maxmemory * -(double) server.maxmemory_clients / 100); + if (maxmemory_clients_bytes <= SIZE_MAX) + maxmemory_clients_actual = maxmemory_clients_bytes; + } + else if (server.maxmemory_clients > 0) + maxmemory_clients_actual = server.maxmemory_clients; + else + return 0; + + /* Don't allow a too small maxmemory-clients to avoid cases where we can't communicate + * at all with the server because of bad configuration */ + if (maxmemory_clients_actual < 1024*128) + maxmemory_clients_actual = 1024*128; + + return maxmemory_clients_actual; +} + +void evictClients(void) { + /* Start eviction from topmost bucket (largest clients) */ + int curr_bucket = CLIENT_MEM_USAGE_BUCKETS-1; + listIter bucket_iter; + listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter); + size_t client_eviction_limit = getClientEvictionLimit(); + if (client_eviction_limit == 0) + return; + while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] + + server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >= client_eviction_limit) { + listNode *ln = listNext(&bucket_iter); + if (ln) { + client *c = ln->value; + sds ci = catClientInfoString(sdsempty(),c); + serverLog(LL_NOTICE, "Evicting client: %s", ci); + freeClient(c); + sdsfree(ci); + server.stat_evictedclients++; + } else { + curr_bucket--; + if (curr_bucket < 0) { + serverLog(LL_WARNING, "Over client maxmemory after evicting all evictable clients"); + break; + } + listRewind(server.client_mem_usage_buckets[curr_bucket].clients, &bucket_iter); + } + } +} |