summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2017-04-25 11:36:21 -0700
committerdormando <dormando@rydia.net>2017-05-21 14:24:34 -0700
commitc053584ad1e84e0974e95b4a1a3bd40cd777baa0 (patch)
tree9a63ee4476d2a888d93f5d883056e9ee7b23f828 /thread.c
parentae84d771811394a9f6f2ff12022707f69bdc311f (diff)
downloadmemcached-c053584ad1e84e0974e95b4a1a3bd40cd777baa0.tar.gz
fix ordering issue in conn dispatch
very old race condition: push to queue, write to signal pipe. with extstore, hammering a secondary item type into the queue could occasionally swap the item with the type it represented. now differentiate on the type, and the character signifies either to pull an item or do something else entirely.
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c59
1 files changed, 34 insertions, 25 deletions
diff --git a/thread.c b/thread.c
index 45b82ac..0656cb5 100644
--- a/thread.c
+++ b/thread.c
@@ -17,6 +17,10 @@
#define ITEMS_PER_ALLOC 64
/* An item in the connection queue. */
+enum conn_queue_item_modes {
+ queue_new_conn, /* brand new connection. */
+ queue_redispatch, /* redispatching from side thread */
+};
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int sfd;
@@ -24,6 +28,7 @@ struct conn_queue_item {
int event_flags;
int read_buffer_size;
enum network_transport transport;
+ enum conn_queue_item_modes mode;
conn *c;
CQ_ITEM *next;
};
@@ -361,6 +366,7 @@ static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
+ conn *c;
unsigned int timeout_fd;
if (read(fd, buf, 1) != 1) {
@@ -373,34 +379,35 @@ static void thread_libevent_process(int fd, short which, void *arg) {
case 'c':
item = cq_pop(me->new_conn_queue);
- if (NULL != item) {
- conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
- item->read_buffer_size, item->transport,
- me->base);
- if (c == NULL) {
- if (IS_UDP(item->transport)) {
- fprintf(stderr, "Can't listen for events on UDP socket\n");
- exit(1);
- } else {
- if (settings.verbose > 0) {
- fprintf(stderr, "Can't listen for events on fd %d\n",
- item->sfd);
+ if (NULL == item) {
+ break;
+ }
+ switch (item->mode) {
+ case queue_new_conn:
+ c = conn_new(item->sfd, item->init_state, item->event_flags,
+ item->read_buffer_size, item->transport,
+ me->base);
+ if (c == NULL) {
+ if (IS_UDP(item->transport)) {
+ fprintf(stderr, "Can't listen for events on UDP socket\n");
+ exit(1);
+ } else {
+ if (settings.verbose > 0) {
+ fprintf(stderr, "Can't listen for events on fd %d\n",
+ item->sfd);
+ }
+ close(item->sfd);
}
- close(item->sfd);
+ } else {
+ c->thread = me;
}
- } else {
- c->thread = me;
- }
- cqi_free(item);
- }
- break;
- case 'r':
- item = cq_pop(me->new_conn_queue);
+ break;
- if (NULL != item) {
- conn_worker_readd(item->c);
- cqi_free(item);
+ case queue_redispatch:
+ conn_worker_readd(item->c);
+ break;
}
+ cqi_free(item);
break;
/* we were told to pause and report in */
case 'p':
@@ -448,6 +455,7 @@ void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
item->event_flags = event_flags;
item->read_buffer_size = read_buffer_size;
item->transport = transport;
+ item->mode = queue_new_conn;
cq_push(thread->new_conn_queue, item);
@@ -475,10 +483,11 @@ void redispatch_conn(conn *c) {
item->sfd = c->sfd;
item->init_state = conn_new_cmd;
item->c = c;
+ item->mode = queue_redispatch;
cq_push(thread->new_conn_queue, item);
- buf[0] = 'r';
+ buf[0] = 'c';
if (write(thread->notify_send_fd, buf, 1) != 1) {
perror("Writing to thread notify pipe");
}