diff options
Diffstat (limited to 'src/networking.c')
-rw-r--r-- | src/networking.c | 589 |
1 files changed, 589 insertions, 0 deletions
diff --git a/src/networking.c b/src/networking.c new file mode 100644 index 000000000..31844a09f --- /dev/null +++ b/src/networking.c @@ -0,0 +1,589 @@ +#include "redis.h" + +#include <sys/uio.h> + +void *dupClientReplyValue(void *o) { + incrRefCount((robj*)o); + return o; +} + +int listMatchObjects(void *a, void *b) { + return equalStringObjects(a,b); +} + +redisClient *createClient(int fd) { + redisClient *c = zmalloc(sizeof(*c)); + + anetNonBlock(NULL,fd); + anetTcpNoDelay(NULL,fd); + if (!c) return NULL; + selectDb(c,0); + c->fd = fd; + c->querybuf = sdsempty(); + c->argc = 0; + c->argv = NULL; + c->bulklen = -1; + c->multibulk = 0; + c->mbargc = 0; + c->mbargv = NULL; + c->sentlen = 0; + c->flags = 0; + c->lastinteraction = time(NULL); + c->authenticated = 0; + c->replstate = REDIS_REPL_NONE; + c->reply = listCreate(); + listSetFreeMethod(c->reply,decrRefCount); + listSetDupMethod(c->reply,dupClientReplyValue); + c->blocking_keys = NULL; + c->blocking_keys_num = 0; + c->io_keys = listCreate(); + c->watched_keys = listCreate(); + listSetFreeMethod(c->io_keys,decrRefCount); + c->pubsub_channels = dictCreate(&setDictType,NULL); + c->pubsub_patterns = listCreate(); + listSetFreeMethod(c->pubsub_patterns,decrRefCount); + listSetMatchMethod(c->pubsub_patterns,listMatchObjects); + if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, + readQueryFromClient, c) == AE_ERR) { + freeClient(c); + return NULL; + } + listAddNodeTail(server.clients,c); + initClientMultiState(c); + return c; +} + +void addReply(redisClient *c, robj *obj) { + if (listLength(c->reply) == 0 && + (c->replstate == REDIS_REPL_NONE || + c->replstate == REDIS_REPL_ONLINE) && + aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, + sendReplyToClient, c) == AE_ERR) return; + + if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) { + obj = dupStringObject(obj); + obj->refcount = 0; /* getDecodedObject() will increment the refcount */ + } + listAddNodeTail(c->reply,getDecodedObject(obj)); +} + +void addReplySds(redisClient *c, sds s) { + robj *o = createObject(REDIS_STRING,s); + addReply(c,o); + decrRefCount(o); +} + +void addReplyDouble(redisClient *c, double d) { + char buf[128]; + + snprintf(buf,sizeof(buf),"%.17g",d); + addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n", + (unsigned long) strlen(buf),buf)); +} + +void addReplyLongLong(redisClient *c, long long ll) { + char buf[128]; + size_t len; + + if (ll == 0) { + addReply(c,shared.czero); + return; + } else if (ll == 1) { + addReply(c,shared.cone); + return; + } + buf[0] = ':'; + len = ll2string(buf+1,sizeof(buf)-1,ll); + buf[len+1] = '\r'; + buf[len+2] = '\n'; + addReplySds(c,sdsnewlen(buf,len+3)); +} + +void addReplyUlong(redisClient *c, unsigned long ul) { + char buf[128]; + size_t len; + + if (ul == 0) { + addReply(c,shared.czero); + return; + } else if (ul == 1) { + addReply(c,shared.cone); + return; + } + len = snprintf(buf,sizeof(buf),":%lu\r\n",ul); + addReplySds(c,sdsnewlen(buf,len)); +} + +void addReplyBulkLen(redisClient *c, robj *obj) { + size_t len, intlen; + char buf[128]; + + if (obj->encoding == REDIS_ENCODING_RAW) { + len = sdslen(obj->ptr); + } else { + long n = (long)obj->ptr; + + /* Compute how many bytes will take this integer as a radix 10 string */ + len = 1; + if (n < 0) { + len++; + n = -n; + } + while((n = n/10) != 0) { + len++; + } + } + buf[0] = '$'; + intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len); + buf[intlen+1] = '\r'; + buf[intlen+2] = '\n'; + addReplySds(c,sdsnewlen(buf,intlen+3)); +} + +void addReplyBulk(redisClient *c, robj *obj) { + addReplyBulkLen(c,obj); + addReply(c,obj); + addReply(c,shared.crlf); +} + +/* In the CONFIG command we need to add vanilla C string as bulk replies */ +void addReplyBulkCString(redisClient *c, char *s) { + if (s == NULL) { + addReply(c,shared.nullbulk); + } else { + robj *o = createStringObject(s,strlen(s)); + addReplyBulk(c,o); + decrRefCount(o); + } +} + +void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { + int cport, cfd; + char cip[128]; + redisClient *c; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + REDIS_NOTUSED(privdata); + + cfd = anetAccept(server.neterr, fd, cip, &cport); + if (cfd == AE_ERR) { + redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr); + return; + } + redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport); + if ((c = createClient(cfd)) == NULL) { + redisLog(REDIS_WARNING,"Error allocating resoures for the client"); + close(cfd); /* May be already closed, just ingore errors */ + return; + } + /* If maxclient directive is set and this is one client more... close the + * connection. Note that we create the client instead to check before + * for this condition, since now the socket is already set in nonblocking + * mode and we can send an error for free using the Kernel I/O */ + if (server.maxclients && listLength(server.clients) > server.maxclients) { + char *err = "-ERR max number of clients reached\r\n"; + + /* That's a best effort error message, don't check write errors */ + if (write(c->fd,err,strlen(err)) == -1) { + /* Nothing to do, Just to avoid the warning... */ + } + freeClient(c); + return; + } + server.stat_numconnections++; +} + +static void freeClientArgv(redisClient *c) { + int j; + + for (j = 0; j < c->argc; j++) + decrRefCount(c->argv[j]); + for (j = 0; j < c->mbargc; j++) + decrRefCount(c->mbargv[j]); + c->argc = 0; + c->mbargc = 0; +} + +void freeClient(redisClient *c) { + listNode *ln; + + /* Note that if the client we are freeing is blocked into a blocking + * call, we have to set querybuf to NULL *before* to call + * unblockClientWaitingData() to avoid processInputBuffer() will get + * called. Also it is important to remove the file events after + * this, because this call adds the READABLE event. */ + sdsfree(c->querybuf); + c->querybuf = NULL; + if (c->flags & REDIS_BLOCKED) + unblockClientWaitingData(c); + + /* UNWATCH all the keys */ + unwatchAllKeys(c); + listRelease(c->watched_keys); + /* Unsubscribe from all the pubsub channels */ + pubsubUnsubscribeAllChannels(c,0); + pubsubUnsubscribeAllPatterns(c,0); + dictRelease(c->pubsub_channels); + listRelease(c->pubsub_patterns); + /* Obvious cleanup */ + aeDeleteFileEvent(server.el,c->fd,AE_READABLE); + aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + listRelease(c->reply); + freeClientArgv(c); + close(c->fd); + /* Remove from the list of clients */ + ln = listSearchKey(server.clients,c); + redisAssert(ln != NULL); + listDelNode(server.clients,ln); + /* Remove from the list of clients that are now ready to be restarted + * after waiting for swapped keys */ + if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) { + ln = listSearchKey(server.io_ready_clients,c); + if (ln) { + listDelNode(server.io_ready_clients,ln); + server.vm_blocked_clients--; + } + } + /* Remove from the list of clients waiting for swapped keys */ + while (server.vm_enabled && listLength(c->io_keys)) { + ln = listFirst(c->io_keys); + dontWaitForSwappedKey(c,ln->value); + } + listRelease(c->io_keys); + /* Master/slave cleanup */ + if (c->flags & REDIS_SLAVE) { + if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1) + close(c->repldbfd); + list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves; + ln = listSearchKey(l,c); + redisAssert(ln != NULL); + listDelNode(l,ln); + } + if (c->flags & REDIS_MASTER) { + server.master = NULL; + server.replstate = REDIS_REPL_CONNECT; + } + /* Release memory */ + zfree(c->argv); + zfree(c->mbargv); + freeClientMultiState(c); + zfree(c); +} + +#define GLUEREPLY_UP_TO (1024) +static void glueReplyBuffersIfNeeded(redisClient *c) { + int copylen = 0; + char buf[GLUEREPLY_UP_TO]; + listNode *ln; + listIter li; + robj *o; + + listRewind(c->reply,&li); + while((ln = listNext(&li))) { + int objlen; + + o = ln->value; + objlen = sdslen(o->ptr); + if (copylen + objlen <= GLUEREPLY_UP_TO) { + memcpy(buf+copylen,o->ptr,objlen); + copylen += objlen; + listDelNode(c->reply,ln); + } else { + if (copylen == 0) return; + break; + } + } + /* Now the output buffer is empty, add the new single element */ + o = createObject(REDIS_STRING,sdsnewlen(buf,copylen)); + listAddNodeHead(c->reply,o); +} + +void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { + redisClient *c = privdata; + int nwritten = 0, totwritten = 0, objlen; + robj *o; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + + /* Use writev() if we have enough buffers to send */ + if (!server.glueoutputbuf && + listLength(c->reply) > REDIS_WRITEV_THRESHOLD && + !(c->flags & REDIS_MASTER)) + { + sendReplyToClientWritev(el, fd, privdata, mask); + return; + } + + while(listLength(c->reply)) { + if (server.glueoutputbuf && listLength(c->reply) > 1) + glueReplyBuffersIfNeeded(c); + + o = listNodeValue(listFirst(c->reply)); + objlen = sdslen(o->ptr); + + if (objlen == 0) { + listDelNode(c->reply,listFirst(c->reply)); + continue; + } + + if (c->flags & REDIS_MASTER) { + /* Don't reply to a master */ + nwritten = objlen - c->sentlen; + } else { + nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen); + if (nwritten <= 0) break; + } + c->sentlen += nwritten; + totwritten += nwritten; + /* If we fully sent the object on head go to the next one */ + if (c->sentlen == objlen) { + listDelNode(c->reply,listFirst(c->reply)); + c->sentlen = 0; + } + /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT + * bytes, in a single threaded server it's a good idea to serve + * other clients as well, even if a very large request comes from + * super fast link that is always able to accept data (in real world + * scenario think about 'KEYS *' against the loopback interfae) */ + if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break; + } + if (nwritten == -1) { + if (errno == EAGAIN) { + nwritten = 0; + } else { + redisLog(REDIS_VERBOSE, + "Error writing to client: %s", strerror(errno)); + freeClient(c); + return; + } + } + if (totwritten > 0) c->lastinteraction = time(NULL); + if (listLength(c->reply) == 0) { + c->sentlen = 0; + aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + } +} + +void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask) +{ + redisClient *c = privdata; + int nwritten = 0, totwritten = 0, objlen, willwrite; + robj *o; + struct iovec iov[REDIS_WRITEV_IOVEC_COUNT]; + int offset, ion = 0; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + + listNode *node; + while (listLength(c->reply)) { + offset = c->sentlen; + ion = 0; + willwrite = 0; + + /* fill-in the iov[] array */ + for(node = listFirst(c->reply); node; node = listNextNode(node)) { + o = listNodeValue(node); + objlen = sdslen(o->ptr); + + if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT) + break; + + if(ion == REDIS_WRITEV_IOVEC_COUNT) + break; /* no more iovecs */ + + iov[ion].iov_base = ((char*)o->ptr) + offset; + iov[ion].iov_len = objlen - offset; + willwrite += objlen - offset; + offset = 0; /* just for the first item */ + ion++; + } + + if(willwrite == 0) + break; + + /* write all collected blocks at once */ + if((nwritten = writev(fd, iov, ion)) < 0) { + if (errno != EAGAIN) { + redisLog(REDIS_VERBOSE, + "Error writing to client: %s", strerror(errno)); + freeClient(c); + return; + } + break; + } + + totwritten += nwritten; + offset = c->sentlen; + + /* remove written robjs from c->reply */ + while (nwritten && listLength(c->reply)) { + o = listNodeValue(listFirst(c->reply)); + objlen = sdslen(o->ptr); + + if(nwritten >= objlen - offset) { + listDelNode(c->reply, listFirst(c->reply)); + nwritten -= objlen - offset; + c->sentlen = 0; + } else { + /* partial write */ + c->sentlen += nwritten; + break; + } + offset = 0; + } + } + + if (totwritten > 0) + c->lastinteraction = time(NULL); + + if (listLength(c->reply) == 0) { + c->sentlen = 0; + aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + } +} + +/* resetClient prepare the client to process the next command */ +void resetClient(redisClient *c) { + freeClientArgv(c); + c->bulklen = -1; + c->multibulk = 0; +} + +void closeTimedoutClients(void) { + redisClient *c; + listNode *ln; + time_t now = time(NULL); + listIter li; + + listRewind(server.clients,&li); + while ((ln = listNext(&li)) != NULL) { + c = listNodeValue(ln); + if (server.maxidletime && + !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */ + !(c->flags & REDIS_MASTER) && /* no timeout for masters */ + dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */ + listLength(c->pubsub_patterns) == 0 && + (now - c->lastinteraction > server.maxidletime)) + { + redisLog(REDIS_VERBOSE,"Closing idle client"); + freeClient(c); + } else if (c->flags & REDIS_BLOCKED) { + if (c->blockingto != 0 && c->blockingto < now) { + addReply(c,shared.nullmultibulk); + unblockClientWaitingData(c); + } + } + } +} + +void processInputBuffer(redisClient *c) { +again: + /* Before to process the input buffer, make sure the client is not + * waitig for a blocking operation such as BLPOP. Note that the first + * iteration the client is never blocked, otherwise the processInputBuffer + * would not be called at all, but after the execution of the first commands + * in the input buffer the client may be blocked, and the "goto again" + * will try to reiterate. The following line will make it return asap. */ + if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return; + if (c->bulklen == -1) { + /* Read the first line of the query */ + char *p = strchr(c->querybuf,'\n'); + size_t querylen; + + if (p) { + sds query, *argv; + int argc, j; + + query = c->querybuf; + c->querybuf = sdsempty(); + querylen = 1+(p-(query)); + if (sdslen(query) > querylen) { + /* leave data after the first line of the query in the buffer */ + c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen); + } + *p = '\0'; /* remove "\n" */ + if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */ + sdsupdatelen(query); + + /* Now we can split the query in arguments */ + argv = sdssplitlen(query,sdslen(query)," ",1,&argc); + sdsfree(query); + + if (c->argv) zfree(c->argv); + c->argv = zmalloc(sizeof(robj*)*argc); + + for (j = 0; j < argc; j++) { + if (sdslen(argv[j])) { + c->argv[c->argc] = createObject(REDIS_STRING,argv[j]); + c->argc++; + } else { + sdsfree(argv[j]); + } + } + zfree(argv); + if (c->argc) { + /* Execute the command. If the client is still valid + * after processCommand() return and there is something + * on the query buffer try to process the next command. */ + if (processCommand(c) && sdslen(c->querybuf)) goto again; + } else { + /* Nothing to process, argc == 0. Just process the query + * buffer if it's not empty or return to the caller */ + if (sdslen(c->querybuf)) goto again; + } + return; + } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) { + redisLog(REDIS_VERBOSE, "Client protocol error"); + freeClient(c); + return; + } + } else { + /* Bulk read handling. Note that if we are at this point + the client already sent a command terminated with a newline, + we are reading the bulk data that is actually the last + argument of the command. */ + int qbl = sdslen(c->querybuf); + + if (c->bulklen <= qbl) { + /* Copy everything but the final CRLF as final argument */ + c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2); + c->argc++; + c->querybuf = sdsrange(c->querybuf,c->bulklen,-1); + /* Process the command. If the client is still valid after + * the processing and there is more data in the buffer + * try to parse it. */ + if (processCommand(c) && sdslen(c->querybuf)) goto again; + return; + } + } +} + +void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { + redisClient *c = (redisClient*) privdata; + char buf[REDIS_IOBUF_LEN]; + int nread; + REDIS_NOTUSED(el); + REDIS_NOTUSED(mask); + + nread = read(fd, buf, REDIS_IOBUF_LEN); + if (nread == -1) { + if (errno == EAGAIN) { + nread = 0; + } else { + redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno)); + freeClient(c); + return; + } + } else if (nread == 0) { + redisLog(REDIS_VERBOSE, "Client closed connection"); + freeClient(c); + return; + } + if (nread) { + c->querybuf = sdscatlen(c->querybuf, buf, nread); + c->lastinteraction = time(NULL); + } else { + return; + } + processInputBuffer(c); +} |