diff options
Diffstat (limited to 'src/server.c')
-rw-r--r-- | src/server.c | 153 |
1 files changed, 80 insertions, 73 deletions
diff --git a/src/server.c b/src/server.c index 00c279837..84f21fed3 100644 --- a/src/server.c +++ b/src/server.c @@ -526,6 +526,30 @@ dictType stringSetDictType = { NULL /* allow to expand */ }; +/* Dict for for case-insensitive search using null terminated C strings. + * The key and value do not have a destructor. */ +dictType externalStringType = { + distCStrCaseHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + distCStrKeyCaseCompare, /* key compare */ + NULL, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + +/* Dict for case-insensitive search using sds objects with a zmalloc + * allocated object as the value. */ +dictType sdsHashDictType = { + dictSdsCaseHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + dictVanillaFree, /* val destructor */ + NULL /* allow to expand */ +}; + int htNeedsResize(dict *dict) { long long size, used; @@ -686,23 +710,6 @@ int clientsCronResizeQueryBuffer(client *c) { * which ever is bigger. */ if (c->bulklen != -1 && (size_t)c->bulklen > c->querybuf_peak) c->querybuf_peak = c->bulklen; - - /* Clients representing masters also use a "pending query buffer" that - * is the yet not applied part of the stream we are reading. Such buffer - * also needs resizing from time to time, otherwise after a very large - * transfer (a huge value or a big MIGRATE operation) it will keep using - * a lot of memory. */ - if (c->flags & CLIENT_MASTER) { - /* There are two conditions to resize the pending query buffer: - * 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF. - * 2) Used length is smaller than pending_querybuf_size/2 */ - size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf); - if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF && - sdslen(c->pending_querybuf) < (pending_querybuf_size/2)) - { - c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf); - } - } return 0; } @@ -720,6 +727,10 @@ int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) { const size_t buffer_target_shrink_size = c->buf_usable_size/2; const size_t buffer_target_expand_size = c->buf_usable_size*2; + /* in case the resizing is disabled return immediately */ + if(!server.reply_buffer_resizing_enabled) + return 0; + if (buffer_target_shrink_size >= PROTO_REPLY_MIN_BYTES && c->buf_peak < buffer_target_shrink_size ) { @@ -786,7 +797,7 @@ int clientsCronTrackExpansiveClients(client *c, int time_idx) { * client's memory usage doubles it's moved up to the next bucket, if it's * halved we move it down a bucket. * For more details see CLIENT_MEM_USAGE_BUCKETS documentation in server.h. */ -clientMemUsageBucket *getMemUsageBucket(size_t mem) { +static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) { int size_in_bits = 8*(int)sizeof(mem); int clz = mem > 0 ? __builtin_clzl(mem) : size_in_bits; int bucket_idx = size_in_bits - clz; @@ -803,46 +814,34 @@ clientMemUsageBucket *getMemUsageBucket(size_t mem) { * and also from the clientsCron. We call it from the cron so we have updated * stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration * change requires us to evict a non-active client. + * + * This also adds the client to the correct memory usage bucket. Each bucket contains + * all clients with roughly the same amount of memory. This way we group + * together clients consuming about the same amount of memory and can quickly + * free them in case we reach maxmemory-clients (client eviction). */ int updateClientMemUsage(client *c) { + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); size_t mem = getClientMemoryUsage(c, NULL); int type = getClientType(c); /* Remove the old value of the memory used by the client from the old * category, and add it back. */ - atomicDecr(server.stat_clients_type_memory[c->last_memory_type], c->last_memory_usage); - atomicIncr(server.stat_clients_type_memory[type], mem); - - /* Remember what we added and where, to remove it next time. */ - c->last_memory_usage = mem; - c->last_memory_type = type; - - /* Update client mem usage bucket only when we're not in the context of an - * IO thread. See updateClientMemUsageBucket() for details. */ - if (io_threads_op == IO_THREADS_OP_IDLE) - updateClientMemUsageBucket(c); - - return 0; -} + if (type != c->last_memory_type) { + server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage; + server.stat_clients_type_memory[type] += mem; + c->last_memory_type = type; + } else { + server.stat_clients_type_memory[type] += mem - c->last_memory_usage; + } -/* Adds the client to the correct memory usage bucket. Each bucket contains - * all clients with roughly the same amount of memory. This way we group - * together clients consuming about the same amount of memory and can quickly - * free them in case we reach maxmemory-clients (client eviction). - * Note that in case of io-threads enabled we have to call this function only - * after the fan-in phase (when no io-threads are working) because the bucket - * lists are global. The io-threads themselves track per-client memory usage in - * updateClientMemUsage(). Here we update the clients to each bucket when all - * io-threads are done (both for read and write io-threading). */ -void updateClientMemUsageBucket(client *c) { - serverAssert(io_threads_op == IO_THREADS_OP_IDLE); int allow_eviction = - (c->last_memory_type == CLIENT_TYPE_NORMAL || c->last_memory_type == CLIENT_TYPE_PUBSUB) && + (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB) && !(c->flags & CLIENT_NO_EVICT); /* Update the client in the mem usage buckets */ if (c->mem_usage_bucket) { - c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage_on_bucket_update; + c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; /* If this client can't be evicted then remove it from the mem usage * buckets */ if (!allow_eviction) { @@ -852,8 +851,8 @@ void updateClientMemUsageBucket(client *c) { } } if (allow_eviction) { - clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage); - bucket->mem_usage_sum += c->last_memory_usage; + clientMemUsageBucket *bucket = getMemUsageBucket(mem); + bucket->mem_usage_sum += mem; if (bucket != c->mem_usage_bucket) { if (c->mem_usage_bucket) listDelNode(c->mem_usage_bucket->clients, @@ -864,7 +863,10 @@ void updateClientMemUsageBucket(client *c) { } } - c->last_memory_usage_on_bucket_update = c->last_memory_usage; + /* Remember what we added, to remove it next time. */ + c->last_memory_usage = mem; + + return 0; } /* Return the max samples in the memory usage of clients tracked by @@ -1410,7 +1412,7 @@ void blockingOperationEnds() { } } -/* This function fill in the role of serverCron during RDB or AOF loading, and +/* This function fills in the role of serverCron during RDB or AOF loading, and * also during blocked scripts. * It attempts to do its duties at a similar rate as the configured server.hz, * and updates cronloops variable so that similarly to serverCron, the @@ -2405,6 +2407,7 @@ void initServer(void) { server.thp_enabled = 0; server.cluster_drop_packet_filter = -1; server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME; + server.reply_buffer_resizing_enabled = 1; resetReplicationBuffer(); if ((server.tls_port || server.tls_replication || server.tls_cluster) @@ -3406,6 +3409,16 @@ void rejectCommand(client *c, robj *reply) { } } +void rejectCommandSds(client *c, sds s) { + if (c->cmd && c->cmd->proc == execCommand) { + execCommandAbort(c, s); + sdsfree(s); + } else { + /* The following frees 's'. */ + addReplyErrorSds(c, s); + } +} + void rejectCommandFormat(client *c, const char *fmt, ...) { if (c->cmd) c->cmd->rejected_calls++; flagTransaction(c); @@ -3416,13 +3429,7 @@ void rejectCommandFormat(client *c, const char *fmt, ...) { /* Make sure there are no newlines in the string, otherwise invalid protocol * is emitted (The args come from the user, they may contain any character). */ sdsmapchars(s, "\r\n", " ", 2); - if (c->cmd && c->cmd->proc == execCommand) { - execCommandAbort(c, s); - sdsfree(s); - } else { - /* The following frees 's'. */ - addReplyErrorSds(c, s); - } + rejectCommandSds(c, s); } /* This is called after a command in call, we can do some maintenance job in it. */ @@ -3705,23 +3712,14 @@ int processCommand(client *c) { server.masterhost == NULL && (is_write_command ||c->cmd->proc == pingCommand)) { - if (deny_write_type == DISK_ERROR_TYPE_RDB) - rejectCommand(c, shared.bgsaveerr); - else - rejectCommandFormat(c, - "-MISCONF Errors writing to the AOF file: %s", - strerror(server.aof_last_write_errno)); + sds err = writeCommandsGetDiskErrorMessage(deny_write_type); + rejectCommandSds(c, err); return C_OK; } /* Don't accept write commands if there are not enough good slaves and * user configured the min-slaves-to-write option. */ - if (server.masterhost == NULL && - server.repl_min_slaves_to_write && - server.repl_min_slaves_max_lag && - is_write_command && - server.repl_good_slaves_count < server.repl_min_slaves_to_write) - { + if (is_write_command && !checkGoodReplicasStatus()) { rejectCommand(c, shared.noreplicaserr); return C_OK; } @@ -4146,6 +4144,18 @@ int writeCommandsDeniedByDiskError(void) { return DISK_ERROR_TYPE_NONE; } +sds writeCommandsGetDiskErrorMessage(int error_code) { + sds ret = NULL; + if (error_code == DISK_ERROR_TYPE_RDB) { + ret = sdsdup(shared.bgsaveerr->ptr); + } else { + ret = sdscatfmt(sdsempty(), + "-MISCONF Errors writing to the AOF file: %s", + strerror(server.aof_last_write_errno)); + } + return ret; +} + /* The PING command. It works in a different way if the client is in * in Pub/Sub mode. */ void pingCommand(client *c) { @@ -6388,9 +6398,9 @@ void dismissMemory(void* ptr, size_t size_hint) { /* Dismiss big chunks of memory inside a client structure, see dismissMemory() */ void dismissClientMemory(client *c) { - /* Dismiss client query buffer. */ + /* Dismiss client query buffer and static reply buffer. */ + dismissMemory(c->buf, c->buf_usable_size); dismissSds(c->querybuf); - dismissSds(c->pending_querybuf); /* Dismiss argv array only if we estimate it contains a big buffer. */ if (c->argc && c->argv_len_sum/c->argc >= server.page_size) { for (int i = 0; i < c->argc; i++) { @@ -6414,9 +6424,6 @@ void dismissClientMemory(client *c) { if (bulk) dismissMemory(bulk, bulk->size); } } - - /* The client struct has a big static reply buffer in it. */ - dismissMemory(c, 0); } /* In the child process, we don't need some buffers anymore, and these are |