diff options
author | Nick Mathewson <nickm@torproject.org> | 2009-11-17 20:31:09 +0000 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2009-11-17 20:31:09 +0000 |
commit | d7d1f1da09f32a14ff4c08dc0f1f0e0673ed5afd (patch) | |
tree | d02c3c5d7a4a50299aeaae5c3b7ea41d7cce6945 /bufferevent_async.c | |
parent | 201d8d0bafeb2ba1388746ed745cd5d8defb3689 (diff) | |
download | libevent-d7d1f1da09f32a14ff4c08dc0f1f0e0673ed5afd.tar.gz |
Move responsibility for IOCP callback into bufferevent_async.
This patch from Chris Davis saves some callback depth, and adds proper
ref-counting to bufferevents when there's a deferred evbuffer callback
inflight. It could use a couple more comments to really nail down what
its invariants are.
svn:r1543
Diffstat (limited to 'bufferevent_async.c')
-rw-r--r-- | bufferevent_async.c | 178 |
1 files changed, 135 insertions, 43 deletions
diff --git a/bufferevent_async.c b/bufferevent_async.c index d34051ce..a8e92b70 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -80,8 +80,11 @@ const struct bufferevent_ops bufferevent_ops_async = { struct bufferevent_async { struct bufferevent_private bev; struct event_overlapped connect_overlapped; + struct event_overlapped read_overlapped; + struct event_overlapped write_overlapped; unsigned read_in_progress : 1; unsigned write_in_progress : 1; + unsigned ok : 1; }; static inline struct bufferevent_async * @@ -91,16 +94,33 @@ upcast(struct bufferevent *bev) if (bev->be_ops != &bufferevent_ops_async) return NULL; bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); - EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async); return bev_a; } static inline struct bufferevent_async * -upcast_overlapped(struct event_overlapped *eo) +upcast_connect(struct event_overlapped *eo) { struct bufferevent_async *bev_a; bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); - EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async); + EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); + return bev_a; +} + +static inline struct bufferevent_async * +upcast_read(struct event_overlapped *eo) +{ + struct bufferevent_async *bev_a; + bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); + EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); + return bev_a; +} + +static inline struct bufferevent_async * +upcast_write(struct event_overlapped *eo) +{ + struct bufferevent_async *bev_a; + bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); + EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); return bev_a; } @@ -109,14 +129,15 @@ bev_async_consider_writing(struct bufferevent_async *b) { /* Don't write if there's a write in progress, or we do not * want to write. */ - if (b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE)) + if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE)) return; /* Don't write if there's nothing to write */ if (!evbuffer_get_length(b->bev.bev.output)) return; /* XXXX doesn't respect low-water mark very well. */ - if (evbuffer_launch_write(b->bev.bev.output, -1)) { + if (evbuffer_launch_write(b->bev.bev.output, -1, + &b->write_overlapped)) { EVUTIL_ASSERT(0);/* XXX act sensibly. */ } else { b->write_in_progress = 1; @@ -131,7 +152,7 @@ bev_async_consider_reading(struct bufferevent_async *b) size_t at_most; /* Don't read if there is a read in progress, or we do not * want to read. */ - if (b->read_in_progress || !(b->bev.bev.enabled&EV_READ)) + if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ)) return; /* Don't read if we're full */ @@ -145,7 +166,8 @@ bev_async_consider_reading(struct bufferevent_async *b) at_most = 16384; /* FIXME totally magic. */ } - if (evbuffer_launch_read(b->bev.bev.input, at_most)) { + if (evbuffer_launch_read(b->bev.bev.input, at_most, + &b->read_overlapped)) { EVUTIL_ASSERT(0); } else { b->read_in_progress = 1; @@ -159,26 +181,15 @@ be_async_outbuf_callback(struct evbuffer *buf, { struct bufferevent *bev = arg; struct bufferevent_async *bev_async = upcast(bev); - /* If we successfully wrote from the outbuf, or we added data to the - * outbuf and were not writing before, we may want to write now. */ + + /* If we added data to the outbuf and were not writing before, + * we may want to write now. */ _bufferevent_incref_and_lock(bev); - if (cbinfo->n_deleted) { - /* XXXX can't detect 0-length write completion */ - bev_async->write_in_progress = 0; - } - if (cbinfo->n_added || cbinfo->n_deleted) + if (cbinfo->n_added) bev_async_consider_writing(bev_async); - if (cbinfo->n_deleted) { - BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - - if (bev->writecb != NULL && - evbuffer_get_length(bev->output) <= bev->wm_write.low) - _bufferevent_run_writecb(bev); - } - _bufferevent_decref_and_unlock(bev); } @@ -190,26 +201,14 @@ be_async_inbuf_callback(struct evbuffer *buf, struct bufferevent *bev = arg; struct bufferevent_async *bev_async = upcast(bev); - /* If we successfully read into the inbuf, or we drained data from - * the inbuf and were not reading before, we may want to read now */ + /* If we drained data from the inbuf and were not reading before, + * we may want to read now */ _bufferevent_incref_and_lock(bev); - if (cbinfo->n_added) { - /* XXXX can't detect 0-length read completion */ - bev_async->read_in_progress = 0; - } - if (cbinfo->n_added || cbinfo->n_deleted) + if (cbinfo->n_deleted) bev_async_consider_reading(bev_async); - if (cbinfo->n_added) { - BEV_RESET_GENERIC_READ_TIMEOUT(bev); - - if (evbuffer_get_length(bev->input) >= bev->wm_read.low && - bev->readcb != NULL) - _bufferevent_run_readcb(bev); - } - _bufferevent_decref_and_unlock(bev); } @@ -218,6 +217,10 @@ be_async_enable(struct bufferevent *buf, short what) { struct bufferevent_async *bev_async = upcast(buf); + if (!bev_async->ok) + return -1; + + /* NOTE: This interferes with non-blocking connect */ _bufferevent_generic_adj_timeouts(buf); /* If we newly enable reading or writing, and we aren't reading or @@ -245,6 +248,17 @@ be_async_disable(struct bufferevent *bev, short what) static void be_async_destruct(struct bufferevent *bev) { + struct bufferevent_private *bev_p = BEV_UPCAST(bev); + evutil_socket_t fd; + + EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress); + + /* XXX cancel any outstanding I/O operations */ + fd = _evbuffer_overlapped_get_fd(bev->input); + /* delete this in case non-blocking connect was used */ + event_del(&bev->ev_write); + if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) + EVUTIL_CLOSESOCKET(fd); _bufferevent_del_generic_timeout_cbs(bev); } @@ -259,20 +273,87 @@ static void connect_complete(struct event_overlapped *eo, uintptr_t key, ev_ssize_t nbytes, int ok) { - struct bufferevent_async *bev_a = upcast_overlapped(eo); - struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */ + struct bufferevent_async *bev_a = upcast_connect(eo); + struct bufferevent *bev = &bev_a->bev.bev; _bufferevent_incref_and_lock(bev); EVUTIL_ASSERT(bev_a->bev.connecting); bev_a->bev.connecting = 0; + bufferevent_async_set_connected(bev); _bufferevent_run_eventcb(bev, ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR); _bufferevent_decref_and_unlock(bev); } +static void +read_complete(struct event_overlapped *eo, uintptr_t key, + ev_ssize_t nbytes, int ok) +{ + struct bufferevent_async *bev_a = upcast_read(eo); + struct bufferevent *bev = &bev_a->bev.bev; + short what = BEV_EVENT_READING; + + _bufferevent_incref_and_lock(bev); + EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress); + + evbuffer_commit_read(bev->input, nbytes); + bev_a->read_in_progress = 0; + + if (ok && nbytes) { + BEV_RESET_GENERIC_READ_TIMEOUT(bev); + if (bev->readcb != NULL && + evbuffer_get_length(bev->input) >= bev->wm_read.low) + _bufferevent_run_readcb(bev); + bev_async_consider_reading(bev_a); + } else if (!ok) { + what |= BEV_EVENT_ERROR; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } else if (!nbytes) { + what |= BEV_EVENT_EOF; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } + + _bufferevent_decref_and_unlock(bev); +} + +static void +write_complete(struct event_overlapped *eo, uintptr_t key, + ev_ssize_t nbytes, int ok) +{ + struct bufferevent_async *bev_a = upcast_write(eo); + struct bufferevent *bev = &bev_a->bev.bev; + short what = BEV_EVENT_WRITING; + + _bufferevent_incref_and_lock(bev); + EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress); + + evbuffer_commit_write(bev->output, nbytes); + bev_a->write_in_progress = 0; + + if (ok && nbytes) { + BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); + if (bev->writecb != NULL && + evbuffer_get_length(bev->output) <= bev->wm_write.low) + _bufferevent_run_writecb(bev); + bev_async_consider_writing(bev_a); + } else if (!ok) { + what |= BEV_EVENT_ERROR; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } else if (!nbytes) { + what |= BEV_EVENT_EOF; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } + + _bufferevent_decref_and_unlock(bev); +} + struct bufferevent * bufferevent_async_new(struct event_base *base, evutil_socket_t fd, int options) @@ -318,10 +399,11 @@ bufferevent_async_new(struct event_base *base, evbuffer_defer_callbacks(bev->input, base); evbuffer_defer_callbacks(bev->output, base); - evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); - _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev); - event_overlapped_init(&bev_a->connect_overlapped, connect_complete); + event_overlapped_init(&bev_a->read_overlapped, read_complete); + event_overlapped_init(&bev_a->write_overlapped, write_complete); + + bev_a->ok = fd >= 0; return bev; err: @@ -329,6 +411,16 @@ err: return NULL; } +void +bufferevent_async_set_connected(struct bufferevent *bev) +{ + struct bufferevent_async *bev_async = upcast(bev); + bev_async->ok = 1; + _bufferevent_init_generic_timeout_cbs(bev); + /* Now's a good time to consider reading/writing */ + be_async_enable(bev, bev->enabled); +} + int bufferevent_async_can_connect(struct bufferevent *bev) { @@ -369,7 +461,7 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd, sin6->sin6_family = AF_INET6; sin6->sin6_addr = in6addr_any; } else { - /* XXX: what to do? */ + /* Well, the user will have to bind() */ return -1; } if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && |