diff options
author | dormando <dormando@rydia.net> | 2017-04-25 11:36:21 -0700 |
---|---|---|
committer | dormando <dormando@rydia.net> | 2017-05-21 14:24:34 -0700 |
commit | c053584ad1e84e0974e95b4a1a3bd40cd777baa0 (patch) | |
tree | 9a63ee4476d2a888d93f5d883056e9ee7b23f828 /thread.c | |
parent | ae84d771811394a9f6f2ff12022707f69bdc311f (diff) | |
download | memcached-c053584ad1e84e0974e95b4a1a3bd40cd777baa0.tar.gz |
fix ordering issue in conn dispatch
very old race condition: push to queue, write to signal pipe. with extstore,
hammering a secondary item type into the queue could occasionally swap the
item with the type it represented.
now differentiate on the type, and the character signifies either to pull an
item or do something else entirely.
Diffstat (limited to 'thread.c')
-rw-r--r-- | thread.c | 59 |
1 files changed, 34 insertions, 25 deletions
@@ -17,6 +17,10 @@ #define ITEMS_PER_ALLOC 64 /* An item in the connection queue. */ +enum conn_queue_item_modes { + queue_new_conn, /* brand new connection. */ + queue_redispatch, /* redispatching from side thread */ +}; typedef struct conn_queue_item CQ_ITEM; struct conn_queue_item { int sfd; @@ -24,6 +28,7 @@ struct conn_queue_item { int event_flags; int read_buffer_size; enum network_transport transport; + enum conn_queue_item_modes mode; conn *c; CQ_ITEM *next; }; @@ -361,6 +366,7 @@ static void thread_libevent_process(int fd, short which, void *arg) { LIBEVENT_THREAD *me = arg; CQ_ITEM *item; char buf[1]; + conn *c; unsigned int timeout_fd; if (read(fd, buf, 1) != 1) { @@ -373,34 +379,35 @@ static void thread_libevent_process(int fd, short which, void *arg) { case 'c': item = cq_pop(me->new_conn_queue); - if (NULL != item) { - conn *c = conn_new(item->sfd, item->init_state, item->event_flags, - item->read_buffer_size, item->transport, - me->base); - if (c == NULL) { - if (IS_UDP(item->transport)) { - fprintf(stderr, "Can't listen for events on UDP socket\n"); - exit(1); - } else { - if (settings.verbose > 0) { - fprintf(stderr, "Can't listen for events on fd %d\n", - item->sfd); + if (NULL == item) { + break; + } + switch (item->mode) { + case queue_new_conn: + c = conn_new(item->sfd, item->init_state, item->event_flags, + item->read_buffer_size, item->transport, + me->base); + if (c == NULL) { + if (IS_UDP(item->transport)) { + fprintf(stderr, "Can't listen for events on UDP socket\n"); + exit(1); + } else { + if (settings.verbose > 0) { + fprintf(stderr, "Can't listen for events on fd %d\n", + item->sfd); + } + close(item->sfd); } - close(item->sfd); + } else { + c->thread = me; } - } else { - c->thread = me; - } - cqi_free(item); - } - break; - case 'r': - item = cq_pop(me->new_conn_queue); + break; - if (NULL != item) { - conn_worker_readd(item->c); - cqi_free(item); + case queue_redispatch: + conn_worker_readd(item->c); + break; } + cqi_free(item); break; /* we were told to pause and report in */ case 'p': @@ -448,6 +455,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags, item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->transport = transport; + item->mode = queue_new_conn; cq_push(thread->new_conn_queue, item); @@ -475,10 +483,11 @@ void redispatch_conn(conn *c) { item->sfd = c->sfd; item->init_state = conn_new_cmd; item->c = c; + item->mode = queue_redispatch; cq_push(thread->new_conn_queue, item); - buf[0] = 'r'; + buf[0] = 'c'; if (write(thread->notify_send_fd, buf, 1) != 1) { perror("Writing to thread notify pipe"); } |