summaryrefslogtreecommitdiff
path: root/src/networking.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/networking.c')
-rw-r--r--src/networking.c589
1 files changed, 589 insertions, 0 deletions
diff --git a/src/networking.c b/src/networking.c
new file mode 100644
index 000000000..31844a09f
--- /dev/null
+++ b/src/networking.c
@@ -0,0 +1,589 @@
+#include "redis.h"
+
+#include <sys/uio.h>
+
+void *dupClientReplyValue(void *o) {
+ incrRefCount((robj*)o);
+ return o;
+}
+
+int listMatchObjects(void *a, void *b) {
+ return equalStringObjects(a,b);
+}
+
+redisClient *createClient(int fd) {
+ redisClient *c = zmalloc(sizeof(*c));
+
+ anetNonBlock(NULL,fd);
+ anetTcpNoDelay(NULL,fd);
+ if (!c) return NULL;
+ selectDb(c,0);
+ c->fd = fd;
+ c->querybuf = sdsempty();
+ c->argc = 0;
+ c->argv = NULL;
+ c->bulklen = -1;
+ c->multibulk = 0;
+ c->mbargc = 0;
+ c->mbargv = NULL;
+ c->sentlen = 0;
+ c->flags = 0;
+ c->lastinteraction = time(NULL);
+ c->authenticated = 0;
+ c->replstate = REDIS_REPL_NONE;
+ c->reply = listCreate();
+ listSetFreeMethod(c->reply,decrRefCount);
+ listSetDupMethod(c->reply,dupClientReplyValue);
+ c->blocking_keys = NULL;
+ c->blocking_keys_num = 0;
+ c->io_keys = listCreate();
+ c->watched_keys = listCreate();
+ listSetFreeMethod(c->io_keys,decrRefCount);
+ c->pubsub_channels = dictCreate(&setDictType,NULL);
+ c->pubsub_patterns = listCreate();
+ listSetFreeMethod(c->pubsub_patterns,decrRefCount);
+ listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
+ if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
+ readQueryFromClient, c) == AE_ERR) {
+ freeClient(c);
+ return NULL;
+ }
+ listAddNodeTail(server.clients,c);
+ initClientMultiState(c);
+ return c;
+}
+
+void addReply(redisClient *c, robj *obj) {
+ if (listLength(c->reply) == 0 &&
+ (c->replstate == REDIS_REPL_NONE ||
+ c->replstate == REDIS_REPL_ONLINE) &&
+ aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
+ sendReplyToClient, c) == AE_ERR) return;
+
+ if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
+ obj = dupStringObject(obj);
+ obj->refcount = 0; /* getDecodedObject() will increment the refcount */
+ }
+ listAddNodeTail(c->reply,getDecodedObject(obj));
+}
+
+void addReplySds(redisClient *c, sds s) {
+ robj *o = createObject(REDIS_STRING,s);
+ addReply(c,o);
+ decrRefCount(o);
+}
+
+void addReplyDouble(redisClient *c, double d) {
+ char buf[128];
+
+ snprintf(buf,sizeof(buf),"%.17g",d);
+ addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
+ (unsigned long) strlen(buf),buf));
+}
+
+void addReplyLongLong(redisClient *c, long long ll) {
+ char buf[128];
+ size_t len;
+
+ if (ll == 0) {
+ addReply(c,shared.czero);
+ return;
+ } else if (ll == 1) {
+ addReply(c,shared.cone);
+ return;
+ }
+ buf[0] = ':';
+ len = ll2string(buf+1,sizeof(buf)-1,ll);
+ buf[len+1] = '\r';
+ buf[len+2] = '\n';
+ addReplySds(c,sdsnewlen(buf,len+3));
+}
+
+void addReplyUlong(redisClient *c, unsigned long ul) {
+ char buf[128];
+ size_t len;
+
+ if (ul == 0) {
+ addReply(c,shared.czero);
+ return;
+ } else if (ul == 1) {
+ addReply(c,shared.cone);
+ return;
+ }
+ len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
+ addReplySds(c,sdsnewlen(buf,len));
+}
+
+void addReplyBulkLen(redisClient *c, robj *obj) {
+ size_t len, intlen;
+ char buf[128];
+
+ if (obj->encoding == REDIS_ENCODING_RAW) {
+ len = sdslen(obj->ptr);
+ } else {
+ long n = (long)obj->ptr;
+
+ /* Compute how many bytes will take this integer as a radix 10 string */
+ len = 1;
+ if (n < 0) {
+ len++;
+ n = -n;
+ }
+ while((n = n/10) != 0) {
+ len++;
+ }
+ }
+ buf[0] = '$';
+ intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
+ buf[intlen+1] = '\r';
+ buf[intlen+2] = '\n';
+ addReplySds(c,sdsnewlen(buf,intlen+3));
+}
+
+void addReplyBulk(redisClient *c, robj *obj) {
+ addReplyBulkLen(c,obj);
+ addReply(c,obj);
+ addReply(c,shared.crlf);
+}
+
+/* In the CONFIG command we need to add vanilla C string as bulk replies */
+void addReplyBulkCString(redisClient *c, char *s) {
+ if (s == NULL) {
+ addReply(c,shared.nullbulk);
+ } else {
+ robj *o = createStringObject(s,strlen(s));
+ addReplyBulk(c,o);
+ decrRefCount(o);
+ }
+}
+
+void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
+ int cport, cfd;
+ char cip[128];
+ redisClient *c;
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+ REDIS_NOTUSED(privdata);
+
+ cfd = anetAccept(server.neterr, fd, cip, &cport);
+ if (cfd == AE_ERR) {
+ redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
+ return;
+ }
+ redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
+ if ((c = createClient(cfd)) == NULL) {
+ redisLog(REDIS_WARNING,"Error allocating resoures for the client");
+ close(cfd); /* May be already closed, just ingore errors */
+ return;
+ }
+ /* If maxclient directive is set and this is one client more... close the
+ * connection. Note that we create the client instead to check before
+ * for this condition, since now the socket is already set in nonblocking
+ * mode and we can send an error for free using the Kernel I/O */
+ if (server.maxclients && listLength(server.clients) > server.maxclients) {
+ char *err = "-ERR max number of clients reached\r\n";
+
+ /* That's a best effort error message, don't check write errors */
+ if (write(c->fd,err,strlen(err)) == -1) {
+ /* Nothing to do, Just to avoid the warning... */
+ }
+ freeClient(c);
+ return;
+ }
+ server.stat_numconnections++;
+}
+
+static void freeClientArgv(redisClient *c) {
+ int j;
+
+ for (j = 0; j < c->argc; j++)
+ decrRefCount(c->argv[j]);
+ for (j = 0; j < c->mbargc; j++)
+ decrRefCount(c->mbargv[j]);
+ c->argc = 0;
+ c->mbargc = 0;
+}
+
+void freeClient(redisClient *c) {
+ listNode *ln;
+
+ /* Note that if the client we are freeing is blocked into a blocking
+ * call, we have to set querybuf to NULL *before* to call
+ * unblockClientWaitingData() to avoid processInputBuffer() will get
+ * called. Also it is important to remove the file events after
+ * this, because this call adds the READABLE event. */
+ sdsfree(c->querybuf);
+ c->querybuf = NULL;
+ if (c->flags & REDIS_BLOCKED)
+ unblockClientWaitingData(c);
+
+ /* UNWATCH all the keys */
+ unwatchAllKeys(c);
+ listRelease(c->watched_keys);
+ /* Unsubscribe from all the pubsub channels */
+ pubsubUnsubscribeAllChannels(c,0);
+ pubsubUnsubscribeAllPatterns(c,0);
+ dictRelease(c->pubsub_channels);
+ listRelease(c->pubsub_patterns);
+ /* Obvious cleanup */
+ aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
+ aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+ listRelease(c->reply);
+ freeClientArgv(c);
+ close(c->fd);
+ /* Remove from the list of clients */
+ ln = listSearchKey(server.clients,c);
+ redisAssert(ln != NULL);
+ listDelNode(server.clients,ln);
+ /* Remove from the list of clients that are now ready to be restarted
+ * after waiting for swapped keys */
+ if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) {
+ ln = listSearchKey(server.io_ready_clients,c);
+ if (ln) {
+ listDelNode(server.io_ready_clients,ln);
+ server.vm_blocked_clients--;
+ }
+ }
+ /* Remove from the list of clients waiting for swapped keys */
+ while (server.vm_enabled && listLength(c->io_keys)) {
+ ln = listFirst(c->io_keys);
+ dontWaitForSwappedKey(c,ln->value);
+ }
+ listRelease(c->io_keys);
+ /* Master/slave cleanup */
+ if (c->flags & REDIS_SLAVE) {
+ if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
+ close(c->repldbfd);
+ list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
+ ln = listSearchKey(l,c);
+ redisAssert(ln != NULL);
+ listDelNode(l,ln);
+ }
+ if (c->flags & REDIS_MASTER) {
+ server.master = NULL;
+ server.replstate = REDIS_REPL_CONNECT;
+ }
+ /* Release memory */
+ zfree(c->argv);
+ zfree(c->mbargv);
+ freeClientMultiState(c);
+ zfree(c);
+}
+
+#define GLUEREPLY_UP_TO (1024)
+static void glueReplyBuffersIfNeeded(redisClient *c) {
+ int copylen = 0;
+ char buf[GLUEREPLY_UP_TO];
+ listNode *ln;
+ listIter li;
+ robj *o;
+
+ listRewind(c->reply,&li);
+ while((ln = listNext(&li))) {
+ int objlen;
+
+ o = ln->value;
+ objlen = sdslen(o->ptr);
+ if (copylen + objlen <= GLUEREPLY_UP_TO) {
+ memcpy(buf+copylen,o->ptr,objlen);
+ copylen += objlen;
+ listDelNode(c->reply,ln);
+ } else {
+ if (copylen == 0) return;
+ break;
+ }
+ }
+ /* Now the output buffer is empty, add the new single element */
+ o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
+ listAddNodeHead(c->reply,o);
+}
+
+void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
+ redisClient *c = privdata;
+ int nwritten = 0, totwritten = 0, objlen;
+ robj *o;
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+
+ /* Use writev() if we have enough buffers to send */
+ if (!server.glueoutputbuf &&
+ listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
+ !(c->flags & REDIS_MASTER))
+ {
+ sendReplyToClientWritev(el, fd, privdata, mask);
+ return;
+ }
+
+ while(listLength(c->reply)) {
+ if (server.glueoutputbuf && listLength(c->reply) > 1)
+ glueReplyBuffersIfNeeded(c);
+
+ o = listNodeValue(listFirst(c->reply));
+ objlen = sdslen(o->ptr);
+
+ if (objlen == 0) {
+ listDelNode(c->reply,listFirst(c->reply));
+ continue;
+ }
+
+ if (c->flags & REDIS_MASTER) {
+ /* Don't reply to a master */
+ nwritten = objlen - c->sentlen;
+ } else {
+ nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
+ if (nwritten <= 0) break;
+ }
+ c->sentlen += nwritten;
+ totwritten += nwritten;
+ /* If we fully sent the object on head go to the next one */
+ if (c->sentlen == objlen) {
+ listDelNode(c->reply,listFirst(c->reply));
+ c->sentlen = 0;
+ }
+ /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
+ * bytes, in a single threaded server it's a good idea to serve
+ * other clients as well, even if a very large request comes from
+ * super fast link that is always able to accept data (in real world
+ * scenario think about 'KEYS *' against the loopback interfae) */
+ if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
+ }
+ if (nwritten == -1) {
+ if (errno == EAGAIN) {
+ nwritten = 0;
+ } else {
+ redisLog(REDIS_VERBOSE,
+ "Error writing to client: %s", strerror(errno));
+ freeClient(c);
+ return;
+ }
+ }
+ if (totwritten > 0) c->lastinteraction = time(NULL);
+ if (listLength(c->reply) == 0) {
+ c->sentlen = 0;
+ aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+ }
+}
+
+void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
+{
+ redisClient *c = privdata;
+ int nwritten = 0, totwritten = 0, objlen, willwrite;
+ robj *o;
+ struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
+ int offset, ion = 0;
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+
+ listNode *node;
+ while (listLength(c->reply)) {
+ offset = c->sentlen;
+ ion = 0;
+ willwrite = 0;
+
+ /* fill-in the iov[] array */
+ for(node = listFirst(c->reply); node; node = listNextNode(node)) {
+ o = listNodeValue(node);
+ objlen = sdslen(o->ptr);
+
+ if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
+ break;
+
+ if(ion == REDIS_WRITEV_IOVEC_COUNT)
+ break; /* no more iovecs */
+
+ iov[ion].iov_base = ((char*)o->ptr) + offset;
+ iov[ion].iov_len = objlen - offset;
+ willwrite += objlen - offset;
+ offset = 0; /* just for the first item */
+ ion++;
+ }
+
+ if(willwrite == 0)
+ break;
+
+ /* write all collected blocks at once */
+ if((nwritten = writev(fd, iov, ion)) < 0) {
+ if (errno != EAGAIN) {
+ redisLog(REDIS_VERBOSE,
+ "Error writing to client: %s", strerror(errno));
+ freeClient(c);
+ return;
+ }
+ break;
+ }
+
+ totwritten += nwritten;
+ offset = c->sentlen;
+
+ /* remove written robjs from c->reply */
+ while (nwritten && listLength(c->reply)) {
+ o = listNodeValue(listFirst(c->reply));
+ objlen = sdslen(o->ptr);
+
+ if(nwritten >= objlen - offset) {
+ listDelNode(c->reply, listFirst(c->reply));
+ nwritten -= objlen - offset;
+ c->sentlen = 0;
+ } else {
+ /* partial write */
+ c->sentlen += nwritten;
+ break;
+ }
+ offset = 0;
+ }
+ }
+
+ if (totwritten > 0)
+ c->lastinteraction = time(NULL);
+
+ if (listLength(c->reply) == 0) {
+ c->sentlen = 0;
+ aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
+ }
+}
+
+/* resetClient prepare the client to process the next command */
+void resetClient(redisClient *c) {
+ freeClientArgv(c);
+ c->bulklen = -1;
+ c->multibulk = 0;
+}
+
+void closeTimedoutClients(void) {
+ redisClient *c;
+ listNode *ln;
+ time_t now = time(NULL);
+ listIter li;
+
+ listRewind(server.clients,&li);
+ while ((ln = listNext(&li)) != NULL) {
+ c = listNodeValue(ln);
+ if (server.maxidletime &&
+ !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
+ !(c->flags & REDIS_MASTER) && /* no timeout for masters */
+ dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
+ listLength(c->pubsub_patterns) == 0 &&
+ (now - c->lastinteraction > server.maxidletime))
+ {
+ redisLog(REDIS_VERBOSE,"Closing idle client");
+ freeClient(c);
+ } else if (c->flags & REDIS_BLOCKED) {
+ if (c->blockingto != 0 && c->blockingto < now) {
+ addReply(c,shared.nullmultibulk);
+ unblockClientWaitingData(c);
+ }
+ }
+ }
+}
+
+void processInputBuffer(redisClient *c) {
+again:
+ /* Before to process the input buffer, make sure the client is not
+ * waitig for a blocking operation such as BLPOP. Note that the first
+ * iteration the client is never blocked, otherwise the processInputBuffer
+ * would not be called at all, but after the execution of the first commands
+ * in the input buffer the client may be blocked, and the "goto again"
+ * will try to reiterate. The following line will make it return asap. */
+ if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
+ if (c->bulklen == -1) {
+ /* Read the first line of the query */
+ char *p = strchr(c->querybuf,'\n');
+ size_t querylen;
+
+ if (p) {
+ sds query, *argv;
+ int argc, j;
+
+ query = c->querybuf;
+ c->querybuf = sdsempty();
+ querylen = 1+(p-(query));
+ if (sdslen(query) > querylen) {
+ /* leave data after the first line of the query in the buffer */
+ c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
+ }
+ *p = '\0'; /* remove "\n" */
+ if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
+ sdsupdatelen(query);
+
+ /* Now we can split the query in arguments */
+ argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
+ sdsfree(query);
+
+ if (c->argv) zfree(c->argv);
+ c->argv = zmalloc(sizeof(robj*)*argc);
+
+ for (j = 0; j < argc; j++) {
+ if (sdslen(argv[j])) {
+ c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
+ c->argc++;
+ } else {
+ sdsfree(argv[j]);
+ }
+ }
+ zfree(argv);
+ if (c->argc) {
+ /* Execute the command. If the client is still valid
+ * after processCommand() return and there is something
+ * on the query buffer try to process the next command. */
+ if (processCommand(c) && sdslen(c->querybuf)) goto again;
+ } else {
+ /* Nothing to process, argc == 0. Just process the query
+ * buffer if it's not empty or return to the caller */
+ if (sdslen(c->querybuf)) goto again;
+ }
+ return;
+ } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
+ redisLog(REDIS_VERBOSE, "Client protocol error");
+ freeClient(c);
+ return;
+ }
+ } else {
+ /* Bulk read handling. Note that if we are at this point
+ the client already sent a command terminated with a newline,
+ we are reading the bulk data that is actually the last
+ argument of the command. */
+ int qbl = sdslen(c->querybuf);
+
+ if (c->bulklen <= qbl) {
+ /* Copy everything but the final CRLF as final argument */
+ c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
+ c->argc++;
+ c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
+ /* Process the command. If the client is still valid after
+ * the processing and there is more data in the buffer
+ * try to parse it. */
+ if (processCommand(c) && sdslen(c->querybuf)) goto again;
+ return;
+ }
+ }
+}
+
+void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
+ redisClient *c = (redisClient*) privdata;
+ char buf[REDIS_IOBUF_LEN];
+ int nread;
+ REDIS_NOTUSED(el);
+ REDIS_NOTUSED(mask);
+
+ nread = read(fd, buf, REDIS_IOBUF_LEN);
+ if (nread == -1) {
+ if (errno == EAGAIN) {
+ nread = 0;
+ } else {
+ redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
+ freeClient(c);
+ return;
+ }
+ } else if (nread == 0) {
+ redisLog(REDIS_VERBOSE, "Client closed connection");
+ freeClient(c);
+ return;
+ }
+ if (nread) {
+ c->querybuf = sdscatlen(c->querybuf, buf, nread);
+ c->lastinteraction = time(NULL);
+ } else {
+ return;
+ }
+ processInputBuffer(c);
+}