summaryrefslogtreecommitdiff
path: root/memcached.c
diff options
context:
space:
mode:
authordormando <dormando@rydia.net>2020-10-20 17:16:05 -0700
committerdormando <dormando@rydia.net>2020-10-30 15:50:12 -0700
commit5d4785936e3e8937047daafd874c792668dc8528 (patch)
treeb39b15e5369faea0c2cf9b13e2613a2ed13ecf7e /memcached.c
parent1ca5cdce9d403b4130e35dc1c99c6a1ea15f946f (diff)
downloadmemcached-5d4785936e3e8937047daafd874c792668dc8528.tar.gz
queue: replace c->io_pending to avoid a mutex
since multiple queues can be sent to different sidethreads, we need a new mechanism for knowing when to return everything. In the common case only one queue will be active, so adding a mutex would be excessive.
Diffstat (limited to 'memcached.c')
-rw-r--r--memcached.c50
1 files changed, 26 insertions, 24 deletions
diff --git a/memcached.c b/memcached.c
index 94597ea..1c5fce0 100644
--- a/memcached.c
+++ b/memcached.c
@@ -517,6 +517,14 @@ void conn_close_idle(conn *c) {
/* bring conn back from a sidethread. could have had its event base moved. */
void conn_worker_readd(conn *c) {
+ if (c->state == conn_io_queue) {
+ c->io_queues_submitted--;
+ // If we're still waiting for other queues to return, don't re-add the
+ // connection yet.
+ if (c->io_queues_submitted != 0) {
+ return;
+ }
+ }
c->ev_flags = EV_READ | EV_PERSIST;
event_set(&c->event, c->sfd, c->ev_flags, event_handler, (void *)c);
event_base_set(c->thread->base, &c->event);
@@ -530,13 +538,8 @@ void conn_worker_readd(conn *c) {
if (c->state == conn_closing) {
drive_machine(c);
return;
- }
-
- // If we had IO objects, process
- if (c->io_queued) {
- c->io_queued = false;
- // state should be conn_io_queue already, so it will know how to
- // dequeue and finalize the async work.
+ } else if (c->state == conn_io_queue) {
+ // machine will know how to return based on secondary state.
drive_machine(c);
} else {
conn_set_state(c, conn_new_cmd);
@@ -692,7 +695,7 @@ conn *conn_new(const int sfd, enum conn_states init_state,
c->last_cmd_time = current_time; /* initialize for idle kicker */
// wipe all queues.
memset(c->io_queues, 0, sizeof(c->io_queues));
- c->io_pending = 0;
+ c->io_queues_submitted = 0;
c->item = 0;
@@ -3171,26 +3174,25 @@ static void drive_machine(conn *c) {
* remove the connection from the worker thread and dispatch the
* IO queue
*/
- if (c->io_pending) {
- assert(c->io_queued == false);
- conn_set_state(c, conn_io_queue);
- event_del(&c->event);
- c->io_queued = true;
- // TODO: write as for loop?
- /*io_queue_t *q = c->io_queues;
- while (q->type != IO_QUEUE_NONE) {
- if (q->stack_ctx != NULL) {
- q->submit_cb(q->ctx, q->stack_ctx);
- }
- q++;
- }*/
+ if (c->io_queues[0].type != IO_QUEUE_NONE) {
+ assert(c->io_queues_submitted == 0);
+ bool hit = false;
+
for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
- if (q->stack_ctx != NULL) {
+ if (q->count != 0) {
+ assert(q->stack_ctx != NULL);
+ hit = true;
q->submit_cb(q->ctx, q->stack_ctx);
+ c->io_queues_submitted++;
}
}
- stop = true;
- break;
+ if (hit) {
+ conn_set_state(c, conn_io_queue);
+ event_del(&c->event);
+
+ stop = true;
+ break;
+ }
}
switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {