summaryrefslogtreecommitdiff
path: root/src/server.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.c')
-rw-r--r--src/server.c153
1 files changed, 80 insertions, 73 deletions
diff --git a/src/server.c b/src/server.c
index 00c279837..84f21fed3 100644
--- a/src/server.c
+++ b/src/server.c
@@ -526,6 +526,30 @@ dictType stringSetDictType = {
NULL /* allow to expand */
};
+/* Dict for for case-insensitive search using null terminated C strings.
+ * The key and value do not have a destructor. */
+dictType externalStringType = {
+ distCStrCaseHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ distCStrKeyCaseCompare, /* key compare */
+ NULL, /* key destructor */
+ NULL, /* val destructor */
+ NULL /* allow to expand */
+};
+
+/* Dict for case-insensitive search using sds objects with a zmalloc
+ * allocated object as the value. */
+dictType sdsHashDictType = {
+ dictSdsCaseHash, /* hash function */
+ NULL, /* key dup */
+ NULL, /* val dup */
+ dictSdsKeyCaseCompare, /* key compare */
+ dictSdsDestructor, /* key destructor */
+ dictVanillaFree, /* val destructor */
+ NULL /* allow to expand */
+};
+
int htNeedsResize(dict *dict) {
long long size, used;
@@ -686,23 +710,6 @@ int clientsCronResizeQueryBuffer(client *c) {
* which ever is bigger. */
if (c->bulklen != -1 && (size_t)c->bulklen > c->querybuf_peak)
c->querybuf_peak = c->bulklen;
-
- /* Clients representing masters also use a "pending query buffer" that
- * is the yet not applied part of the stream we are reading. Such buffer
- * also needs resizing from time to time, otherwise after a very large
- * transfer (a huge value or a big MIGRATE operation) it will keep using
- * a lot of memory. */
- if (c->flags & CLIENT_MASTER) {
- /* There are two conditions to resize the pending query buffer:
- * 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF.
- * 2) Used length is smaller than pending_querybuf_size/2 */
- size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf);
- if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF &&
- sdslen(c->pending_querybuf) < (pending_querybuf_size/2))
- {
- c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf);
- }
- }
return 0;
}
@@ -720,6 +727,10 @@ int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) {
const size_t buffer_target_shrink_size = c->buf_usable_size/2;
const size_t buffer_target_expand_size = c->buf_usable_size*2;
+ /* in case the resizing is disabled return immediately */
+ if(!server.reply_buffer_resizing_enabled)
+ return 0;
+
if (buffer_target_shrink_size >= PROTO_REPLY_MIN_BYTES &&
c->buf_peak < buffer_target_shrink_size )
{
@@ -786,7 +797,7 @@ int clientsCronTrackExpansiveClients(client *c, int time_idx) {
* client's memory usage doubles it's moved up to the next bucket, if it's
* halved we move it down a bucket.
* For more details see CLIENT_MEM_USAGE_BUCKETS documentation in server.h. */
-clientMemUsageBucket *getMemUsageBucket(size_t mem) {
+static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) {
int size_in_bits = 8*(int)sizeof(mem);
int clz = mem > 0 ? __builtin_clzl(mem) : size_in_bits;
int bucket_idx = size_in_bits - clz;
@@ -803,46 +814,34 @@ clientMemUsageBucket *getMemUsageBucket(size_t mem) {
* and also from the clientsCron. We call it from the cron so we have updated
* stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration
* change requires us to evict a non-active client.
+ *
+ * This also adds the client to the correct memory usage bucket. Each bucket contains
+ * all clients with roughly the same amount of memory. This way we group
+ * together clients consuming about the same amount of memory and can quickly
+ * free them in case we reach maxmemory-clients (client eviction).
*/
int updateClientMemUsage(client *c) {
+ serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
size_t mem = getClientMemoryUsage(c, NULL);
int type = getClientType(c);
/* Remove the old value of the memory used by the client from the old
* category, and add it back. */
- atomicDecr(server.stat_clients_type_memory[c->last_memory_type], c->last_memory_usage);
- atomicIncr(server.stat_clients_type_memory[type], mem);
-
- /* Remember what we added and where, to remove it next time. */
- c->last_memory_usage = mem;
- c->last_memory_type = type;
-
- /* Update client mem usage bucket only when we're not in the context of an
- * IO thread. See updateClientMemUsageBucket() for details. */
- if (io_threads_op == IO_THREADS_OP_IDLE)
- updateClientMemUsageBucket(c);
-
- return 0;
-}
+ if (type != c->last_memory_type) {
+ server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage;
+ server.stat_clients_type_memory[type] += mem;
+ c->last_memory_type = type;
+ } else {
+ server.stat_clients_type_memory[type] += mem - c->last_memory_usage;
+ }
-/* Adds the client to the correct memory usage bucket. Each bucket contains
- * all clients with roughly the same amount of memory. This way we group
- * together clients consuming about the same amount of memory and can quickly
- * free them in case we reach maxmemory-clients (client eviction).
- * Note that in case of io-threads enabled we have to call this function only
- * after the fan-in phase (when no io-threads are working) because the bucket
- * lists are global. The io-threads themselves track per-client memory usage in
- * updateClientMemUsage(). Here we update the clients to each bucket when all
- * io-threads are done (both for read and write io-threading). */
-void updateClientMemUsageBucket(client *c) {
- serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
int allow_eviction =
- (c->last_memory_type == CLIENT_TYPE_NORMAL || c->last_memory_type == CLIENT_TYPE_PUBSUB) &&
+ (type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB) &&
!(c->flags & CLIENT_NO_EVICT);
/* Update the client in the mem usage buckets */
if (c->mem_usage_bucket) {
- c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage_on_bucket_update;
+ c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
/* If this client can't be evicted then remove it from the mem usage
* buckets */
if (!allow_eviction) {
@@ -852,8 +851,8 @@ void updateClientMemUsageBucket(client *c) {
}
}
if (allow_eviction) {
- clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage);
- bucket->mem_usage_sum += c->last_memory_usage;
+ clientMemUsageBucket *bucket = getMemUsageBucket(mem);
+ bucket->mem_usage_sum += mem;
if (bucket != c->mem_usage_bucket) {
if (c->mem_usage_bucket)
listDelNode(c->mem_usage_bucket->clients,
@@ -864,7 +863,10 @@ void updateClientMemUsageBucket(client *c) {
}
}
- c->last_memory_usage_on_bucket_update = c->last_memory_usage;
+ /* Remember what we added, to remove it next time. */
+ c->last_memory_usage = mem;
+
+ return 0;
}
/* Return the max samples in the memory usage of clients tracked by
@@ -1410,7 +1412,7 @@ void blockingOperationEnds() {
}
}
-/* This function fill in the role of serverCron during RDB or AOF loading, and
+/* This function fills in the role of serverCron during RDB or AOF loading, and
* also during blocked scripts.
* It attempts to do its duties at a similar rate as the configured server.hz,
* and updates cronloops variable so that similarly to serverCron, the
@@ -2405,6 +2407,7 @@ void initServer(void) {
server.thp_enabled = 0;
server.cluster_drop_packet_filter = -1;
server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
+ server.reply_buffer_resizing_enabled = 1;
resetReplicationBuffer();
if ((server.tls_port || server.tls_replication || server.tls_cluster)
@@ -3406,6 +3409,16 @@ void rejectCommand(client *c, robj *reply) {
}
}
+void rejectCommandSds(client *c, sds s) {
+ if (c->cmd && c->cmd->proc == execCommand) {
+ execCommandAbort(c, s);
+ sdsfree(s);
+ } else {
+ /* The following frees 's'. */
+ addReplyErrorSds(c, s);
+ }
+}
+
void rejectCommandFormat(client *c, const char *fmt, ...) {
if (c->cmd) c->cmd->rejected_calls++;
flagTransaction(c);
@@ -3416,13 +3429,7 @@ void rejectCommandFormat(client *c, const char *fmt, ...) {
/* Make sure there are no newlines in the string, otherwise invalid protocol
* is emitted (The args come from the user, they may contain any character). */
sdsmapchars(s, "\r\n", " ", 2);
- if (c->cmd && c->cmd->proc == execCommand) {
- execCommandAbort(c, s);
- sdsfree(s);
- } else {
- /* The following frees 's'. */
- addReplyErrorSds(c, s);
- }
+ rejectCommandSds(c, s);
}
/* This is called after a command in call, we can do some maintenance job in it. */
@@ -3705,23 +3712,14 @@ int processCommand(client *c) {
server.masterhost == NULL &&
(is_write_command ||c->cmd->proc == pingCommand))
{
- if (deny_write_type == DISK_ERROR_TYPE_RDB)
- rejectCommand(c, shared.bgsaveerr);
- else
- rejectCommandFormat(c,
- "-MISCONF Errors writing to the AOF file: %s",
- strerror(server.aof_last_write_errno));
+ sds err = writeCommandsGetDiskErrorMessage(deny_write_type);
+ rejectCommandSds(c, err);
return C_OK;
}
/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option. */
- if (server.masterhost == NULL &&
- server.repl_min_slaves_to_write &&
- server.repl_min_slaves_max_lag &&
- is_write_command &&
- server.repl_good_slaves_count < server.repl_min_slaves_to_write)
- {
+ if (is_write_command && !checkGoodReplicasStatus()) {
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}
@@ -4146,6 +4144,18 @@ int writeCommandsDeniedByDiskError(void) {
return DISK_ERROR_TYPE_NONE;
}
+sds writeCommandsGetDiskErrorMessage(int error_code) {
+ sds ret = NULL;
+ if (error_code == DISK_ERROR_TYPE_RDB) {
+ ret = sdsdup(shared.bgsaveerr->ptr);
+ } else {
+ ret = sdscatfmt(sdsempty(),
+ "-MISCONF Errors writing to the AOF file: %s",
+ strerror(server.aof_last_write_errno));
+ }
+ return ret;
+}
+
/* The PING command. It works in a different way if the client is in
* in Pub/Sub mode. */
void pingCommand(client *c) {
@@ -6388,9 +6398,9 @@ void dismissMemory(void* ptr, size_t size_hint) {
/* Dismiss big chunks of memory inside a client structure, see dismissMemory() */
void dismissClientMemory(client *c) {
- /* Dismiss client query buffer. */
+ /* Dismiss client query buffer and static reply buffer. */
+ dismissMemory(c->buf, c->buf_usable_size);
dismissSds(c->querybuf);
- dismissSds(c->pending_querybuf);
/* Dismiss argv array only if we estimate it contains a big buffer. */
if (c->argc && c->argv_len_sum/c->argc >= server.page_size) {
for (int i = 0; i < c->argc; i++) {
@@ -6414,9 +6424,6 @@ void dismissClientMemory(client *c) {
if (bulk) dismissMemory(bulk, bulk->size);
}
}
-
- /* The client struct has a big static reply buffer in it. */
- dismissMemory(c, 0);
}
/* In the child process, we don't need some buffers anymore, and these are