diff options
Diffstat (limited to 'src/server.c')
-rw-r--r-- | src/server.c | 128 |
1 files changed, 1 insertions, 127 deletions
diff --git a/src/server.c b/src/server.c index cfed2f91b..c1f7436a1 100644 --- a/src/server.c +++ b/src/server.c @@ -1473,132 +1473,6 @@ int allPersistenceDisabled(void) { return server.saveparamslen == 0 && server.aof_state == AOF_OFF; } -/* ========================== Clients timeouts ============================= */ - -/* Check if this blocked client timedout (does nothing if the client is - * not blocked right now). If so send a reply, unblock it, and return 1. - * Otherwise 0 is returned and no operation is performed. */ -int checkBlockedClientTimeout(client *c, mstime_t now) { - if (c->flags & CLIENT_BLOCKED && - c->bpop.timeout != 0 - && c->bpop.timeout < now) - { - /* Handle blocking operation specific timeout. */ - replyToBlockedClientTimedOut(c); - unblockClient(c); - return 1; - } else { - return 0; - } -} - -/* Check for timeouts. Returns non-zero if the client was terminated. - * The function gets the current time in milliseconds as argument since - * it gets called multiple times in a loop, so calling gettimeofday() for - * each iteration would be costly without any actual gain. */ -int clientsCronHandleTimeout(client *c, mstime_t now_ms) { - time_t now = now_ms/1000; - - if (server.maxidletime && - /* This handles the idle clients connection timeout if set. */ - !(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */ - !(c->flags & CLIENT_MASTER) && /* No timeout for masters */ - !(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */ - !(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */ - (now - c->lastinteraction > server.maxidletime)) - { - serverLog(LL_VERBOSE,"Closing idle client"); - freeClient(c); - return 1; - } else if (c->flags & CLIENT_BLOCKED) { - /* Cluster: handle unblock & redirect of clients blocked - * into keys no longer served by this server. */ - if (server.cluster_enabled) { - if (clusterRedirectBlockedClientIfNeeded(c)) - unblockClient(c); - } - } - return 0; -} - -/* For blocked clients timeouts we populate a radix tree of 128 bit keys - * composed as such: - * - * [8 byte big endian expire time]+[8 byte client ID] - * - * We don't do any cleanup in the Radix tree: when we run the clients that - * reached the timeout already, if they are no longer existing or no longer - * blocked with such timeout, we just go forward. - * - * Every time a client blocks with a timeout, we add the client in - * the tree. In beforeSleep() we call clientsHandleTimeout() to run - * the tree and unblock the clients. */ - -#define CLIENT_ST_KEYLEN 16 /* 8 bytes mstime + 8 bytes client ID. */ - -/* Given client ID and timeout, write the resulting radix tree key in buf. */ -void encodeTimeoutKey(unsigned char *buf, uint64_t timeout, uint64_t id) { - timeout = htonu64(timeout); - memcpy(buf,&timeout,sizeof(timeout)); - memcpy(buf+8,&id,sizeof(id)); -} - -/* Given a key encoded with encodeTimeoutKey(), resolve the fields and write - * the timeout into *toptr and the client ID into *idptr. */ -void decodeTimeoutKey(unsigned char *buf, uint64_t *toptr, uint64_t *idptr) { - memcpy(toptr,buf,sizeof(*toptr)); - *toptr = ntohu64(*toptr); - memcpy(idptr,buf+8,sizeof(*idptr)); -} - -/* Add the specified client id / timeout as a key in the radix tree we use - * to handle blocked clients timeouts. The client is not added to the list - * if its timeout is zero (block forever). */ -void addClientToTimeoutTable(client *c) { - if (c->bpop.timeout == 0) return; - uint64_t timeout = c->bpop.timeout; - uint64_t id = c->id; - unsigned char buf[CLIENT_ST_KEYLEN]; - encodeTimeoutKey(buf,timeout,id); - if (raxTryInsert(server.clients_timeout_table,buf,sizeof(buf),NULL,NULL)) - c->flags |= CLIENT_IN_TO_TABLE; -} - -/* Remove the client from the table when it is unblocked for reasons - * different than timing out. */ -void removeClientFromTimeoutTable(client *c) { - if (!(c->flags & CLIENT_IN_TO_TABLE)) return; - c->flags &= ~CLIENT_IN_TO_TABLE; - uint64_t timeout = c->bpop.timeout; - uint64_t id = c->id; - unsigned char buf[CLIENT_ST_KEYLEN]; - encodeTimeoutKey(buf,timeout,id); - raxRemove(server.clients_timeout_table,buf,sizeof(buf),NULL); -} - -/* This function is called in beforeSleep() in order to unblock clients - * that are waiting in blocking operations with a timeout set. */ -void clientsHandleTimeout(void) { - if (raxSize(server.clients_timeout_table) == 0) return; - uint64_t now = mstime(); - raxIterator ri; - raxStart(&ri,server.clients_timeout_table); - raxSeek(&ri,"^",NULL,0); - - while(raxNext(&ri)) { - uint64_t id, timeout; - decodeTimeoutKey(ri.key,&timeout,&id); - if (timeout >= now) break; /* All the timeouts are in the future. */ - client *c = lookupClientByID(id); - if (c) { - c->flags &= ~CLIENT_IN_TO_TABLE; - checkBlockedClientTimeout(c,now); - } - raxRemove(server.clients_timeout_table,ri.key,ri.key_len,NULL); - raxSeek(&ri,"^",NULL,0); - } -} - /* ======================= Cron: called every 100 ms ======================== */ /* Add a sample to the operations per second array of samples. */ @@ -2183,7 +2057,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); /* Handle precise timeouts of blocked clients. */ - clientsHandleTimeout(); + handleBlockedClientsTimeout(); /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); |