summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorantirez <antirez@gmail.com>2015-09-28 18:25:57 +0200
committerantirez <antirez@gmail.com>2015-09-30 16:29:41 +0200
commit1c7d87df0cd64fd786bebd5cb9636912504359d5 (patch)
tree31bce750170bc1ac32d6b879984396c63b8174bc
parentd1b6a17d1ed64c919d55a27ea780e973196a5e98 (diff)
downloadredis-1c7d87df0cd64fd786bebd5cb9636912504359d5.tar.gz
Avoid installing the client write handler when possible.
-rw-r--r--src/networking.c22
-rw-r--r--src/replication.c6
-rw-r--r--src/server.c32
-rw-r--r--src/server.h3
4 files changed, 56 insertions, 7 deletions
diff --git a/src/networking.c b/src/networking.c
index 336561e10..4f6441de0 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -166,16 +166,18 @@ int prepareClientToWrite(client *c) {
/* Only install the handler if not already installed and, in case of
* slaves, if the client can actually receive writes. */
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
+ !(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
- /* Try to install the write handler. */
- if (aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
- sendReplyToClient, c) == AE_ERR)
- {
- freeClientAsync(c);
- return C_ERR;
- }
+ /* Here instead of installing the write handler, we just flag the
+ * client and put it into a list of clients that have something
+ * to write to the socket. This way before re-entering the event
+ * loop, we can try to directly write to the client sockets avoiding
+ * a system call. We'll only really install the write handler if
+ * we'll not be able to write the whole reply at once. */
+ c->flags |= CLIENT_PENDING_WRITE;
+ listAddNodeTail(server.clients_pending_write,c);
}
/* Authorize the caller to queue in the output buffer of this client. */
@@ -739,6 +741,12 @@ void freeClient(client *c) {
listDelNode(server.clients,ln);
}
+ /* Remove from the list of pending writes if needed. */
+ if (c->flags & CLIENT_PENDING_WRITE) {
+ ln = listSearchKey(server.clients_pending_write,c);
+ listDelNode(server.clients_pending_write,ln);
+ }
+
/* When client was just unblocked because of a blocking operation,
* remove it from the list of unblocked clients. */
if (c->flags & CLIENT_UNBLOCKED) {
diff --git a/src/replication.c b/src/replication.c
index 24157eb61..ec33c57da 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -1857,6 +1857,12 @@ void replicationCacheMaster(client *c) {
serverAssert(ln != NULL);
listDelNode(server.clients,ln);
+ /* Remove from the list of clients with pending writes as well. */
+ if (c->flags & CLIENT_PENDING_WRITE) {
+ ln = listSearchKey(server.clients_pending_write,c);
+ if (ln) listDelNode(server.clients_pending_write,ln);
+ }
+
/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
server.cached_master = server.master;
diff --git a/src/server.c b/src/server.c
index 5ceef9aef..316eba496 100644
--- a/src/server.c
+++ b/src/server.c
@@ -1274,6 +1274,34 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
return 1000/server.hz;
}
+/* This function is called just before entering the event loop, in the hope
+ * we can just write the replies to the client output buffer without any
+ * need to use a syscall in order to install the writable event handler,
+ * get it called, and so forth. */
+void handleClientsWithPendingWrites(void) {
+ listIter li;
+ listNode *ln;
+
+ listRewind(server.clients_pending_write,&li);
+ while((ln = listNext(&li))) {
+ client *c = listNodeValue(ln);
+ c->flags &= ~CLIENT_PENDING_WRITE;
+ listDelNode(server.clients_pending_write,ln);
+
+ /* Try to write buffers to the client socket. */
+ sendReplyToClient(server.el,c->fd,c,0);
+
+ /* If there is nothing left, do nothing. Otherwise install
+ * the write handler. */
+ if ((c->bufpos || listLength(c->reply)) &&
+ aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
+ sendReplyToClient, c) == AE_ERR)
+ {
+ freeClientAsync(c);
+ }
+ }
+}
+
/* This function gets called every time Redis is entering the
* main loop of the event driven library, that is, before to sleep
* for ready file descriptors. */
@@ -1317,6 +1345,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
+
+ /* Handle writes with pending output buffers. */
+ handleClientsWithPendingWrites();
}
/* =========================== Server initialization ======================== */
@@ -1781,6 +1812,7 @@ void initServer(void) {
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
+ server.clients_pending_write = listCreate();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
diff --git a/src/server.h b/src/server.h
index 4e99179e3..998112893 100644
--- a/src/server.h
+++ b/src/server.h
@@ -260,6 +260,8 @@ typedef long long mstime_t; /* millisecond time type. */
#define CLIENT_READONLY (1<<17) /* Cluster client is in read-only state. */
#define CLIENT_PUBSUB (1<<18) /* Client is in Pub/Sub mode. */
#define CLIENT_PREVENT_PROP (1<<19) /* Don't propagate to AOF / Slaves. */
+#define CLIENT_PENDING_WRITE (1<<20) /* Client has output to send but a write
+ handler is yet not installed. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@@ -714,6 +716,7 @@ struct redisServer {
int cfd_count; /* Used slots in cfd[] */
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
+ list *clients_pending_write; /* There is to write or install handler. */
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* Current client, only used on crash report */
int clients_paused; /* True if clients are currently paused */