summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/adlist.c35
-rw-r--r--src/adlist.h3
-rw-r--r--src/module.c2
-rw-r--r--src/networking.c17
-rw-r--r--src/server.h2
5 files changed, 47 insertions, 12 deletions
diff --git a/src/adlist.c b/src/adlist.c
index 2e811e02e..6bde46132 100644
--- a/src/adlist.c
+++ b/src/adlist.c
@@ -93,6 +93,14 @@ list *listAddNodeHead(list *list, void *value)
if ((node = zmalloc(sizeof(*node))) == NULL)
return NULL;
node->value = value;
+ listLinkNodeHead(list, node);
+ return list;
+}
+
+/*
+ * Add a node that has already been allocated to the head of list
+ */
+void listLinkNodeHead(list* list, listNode *node) {
if (list->len == 0) {
list->head = list->tail = node;
node->prev = node->next = NULL;
@@ -103,7 +111,6 @@ list *listAddNodeHead(list *list, void *value)
list->head = node;
}
list->len++;
- return list;
}
/* Add a new node to the list, to tail, containing the specified 'value'
@@ -162,11 +169,20 @@ list *listInsertNode(list *list, listNode *old_node, void *value, int after) {
}
/* Remove the specified node from the specified list.
- * It's up to the caller to free the private value of the node.
+ * The node is freed. If free callback is provided the value is freed as well.
*
* This function can't fail. */
void listDelNode(list *list, listNode *node)
{
+ listUnlinkNode(list, node);
+ if (list->free) list->free(node->value);
+ zfree(node);
+}
+
+/*
+ * Remove the specified node from the list without freeing it.
+ */
+void listUnlinkNode(list *list, listNode *node) {
if (node->prev)
node->prev->next = node->next;
else
@@ -175,8 +191,10 @@ void listDelNode(list *list, listNode *node)
node->next->prev = node->prev;
else
list->tail = node->prev;
- if (list->free) list->free(node->value);
- zfree(node);
+
+ node->next = NULL;
+ node->prev = NULL;
+
list->len--;
}
@@ -381,3 +399,12 @@ void listJoin(list *l, list *o) {
o->head = o->tail = NULL;
o->len = 0;
}
+
+/* Initializes the node's value and sets its pointers
+ * so that it is initially not a member of any list.
+ */
+void listInitNode(listNode *node, void *value) {
+ node->prev = NULL;
+ node->next = NULL;
+ node->value = value;
+}
diff --git a/src/adlist.h b/src/adlist.h
index dd8a8d693..8f319f7cc 100644
--- a/src/adlist.h
+++ b/src/adlist.h
@@ -88,6 +88,9 @@ void listRewindTail(list *list, listIter *li);
void listRotateTailToHead(list *list);
void listRotateHeadToTail(list *list);
void listJoin(list *l, list *o);
+void listInitNode(listNode *node, void *value);
+void listLinkNodeHead(list *list, listNode *node);
+void listUnlinkNode(list *list, listNode *node);
/* Directions for iterators */
#define AL_START_HEAD 0
diff --git a/src/module.c b/src/module.c
index ade541497..308d5716f 100644
--- a/src/module.c
+++ b/src/module.c
@@ -7542,7 +7542,7 @@ void moduleHandleBlockedClients(void) {
!(c->flags & CLIENT_PENDING_WRITE))
{
c->flags |= CLIENT_PENDING_WRITE;
- listAddNodeHead(server.clients_pending_write,c);
+ listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
}
}
diff --git a/src/networking.c b/src/networking.c
index 5dcbdd0f6..932413816 100644
--- a/src/networking.c
+++ b/src/networking.c
@@ -207,6 +207,7 @@ client *createClient(connection *conn) {
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
+ listInitNode(&c->clients_pending_write_node, c);
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
c->mem_usage_bucket = NULL;
@@ -255,7 +256,7 @@ void putClientInPendingWriteQueue(client *c) {
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
- listAddNodeHead(server.clients_pending_write,c);
+ listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
}
}
@@ -1439,9 +1440,9 @@ void unlinkClient(client *c) {
/* Remove from the list of pending writes if needed. */
if (c->flags & CLIENT_PENDING_WRITE) {
- ln = listSearchKey(server.clients_pending_write,c);
- serverAssert(ln != NULL);
- listDelNode(server.clients_pending_write,ln);
+ serverAssert(&c->clients_pending_write_node.next != NULL ||
+ &c->clients_pending_write_node.prev != NULL);
+ listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
c->flags &= ~CLIENT_PENDING_WRITE;
}
@@ -1975,7 +1976,7 @@ int handleClientsWithPendingWrites(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
- listDelNode(server.clients_pending_write,ln);
+ listUnlinkNode(server.clients_pending_write,ln);
/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
@@ -4156,7 +4157,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
- listDelNode(server.clients_pending_write, ln);
+ listUnlinkNode(server.clients_pending_write, ln);
continue;
}
@@ -4215,7 +4216,9 @@ int handleClientsWithPendingWritesUsingThreads(void) {
installClientWriteHandler(c);
}
}
- listEmpty(server.clients_pending_write);
+ while(listLength(server.clients_pending_write) > 0) {
+ listUnlinkNode(server.clients_pending_write, server.clients_pending_write->head);
+ }
/* Update processed count on server */
server.stat_io_writes_processed += processed;
diff --git a/src/server.h b/src/server.h
index f6e3b2009..7280cd7c3 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1176,6 +1176,8 @@ typedef struct client {
size_t ref_block_pos; /* Access position of referenced buffer block,
* i.e. the next offset to send. */
+ /* list node in clients_pending_write list */
+ listNode clients_pending_write_node;
/* Response buffer */
size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */
mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */