author    Nick Mathewson <nickm@torproject.org>  2009-11-17 20:31:09 +0000
committer Nick Mathewson <nickm@torproject.org>  2009-11-17 20:31:09 +0000
commit    d7d1f1da09f32a14ff4c08dc0f1f0e0673ed5afd (patch)
tree      d02c3c5d7a4a50299aeaae5c3b7ea41d7cce6945 /bufferevent_async.c
parent    201d8d0bafeb2ba1388746ed745cd5d8defb3689 (diff)
download  libevent-d7d1f1da09f32a14ff4c08dc0f1f0e0673ed5afd.tar.gz
Move responsibility for IOCP callback into bufferevent_async.

This patch from Chris Davis saves some callback depth, and adds proper
ref-counting to bufferevents when there's a deferred evbuffer callback in
flight. It could use a couple more comments to really nail down what its
invariants are.

svn:r1543
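For readers skimming the diff below: the ref-counting discipline the commit
message refers to is visible in the new completion callbacks. A minimal
sketch of the pattern follows (the name example_complete is illustrative;
the helpers are the ones the diff itself introduces or already uses):

    /* Each overlapped I/O completion holds a reference on the bufferevent
     * for the duration of the callback, so a deferred evbuffer callback
     * cannot outlive the bufferevent it points at. */
    static void
    example_complete(struct event_overlapped *eo, uintptr_t key,
        ev_ssize_t nbytes, int ok)
    {
        struct bufferevent_async *bev_a = upcast_read(eo);
        struct bufferevent *bev = &bev_a->bev.bev;

        _bufferevent_incref_and_lock(bev);   /* take a ref; lock the bev */
        /* ... commit the I/O result and run user callbacks here ... */
        _bufferevent_decref_and_unlock(bev); /* drop the ref; may free bev */
    }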
Diffstat (limited to 'bufferevent_async.c')
-rw-r--r--  bufferevent_async.c  178
1 file changed, 135 insertions(+), 43 deletions(-)
diff --git a/bufferevent_async.c b/bufferevent_async.c
index d34051ce..a8e92b70 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -80,8 +80,11 @@ const struct bufferevent_ops bufferevent_ops_async = {
struct bufferevent_async {
struct bufferevent_private bev;
struct event_overlapped connect_overlapped;
+ struct event_overlapped read_overlapped;
+ struct event_overlapped write_overlapped;
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
+ unsigned ok : 1;
};
static inline struct bufferevent_async *
@@ -91,16 +94,33 @@ upcast(struct bufferevent *bev)
if (bev->be_ops != &bufferevent_ops_async)
return NULL;
bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
- EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
return bev_a;
}
static inline struct bufferevent_async *
-upcast_overlapped(struct event_overlapped *eo)
+upcast_connect(struct event_overlapped *eo)
{
struct bufferevent_async *bev_a;
bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
- EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
+ EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
+ return bev_a;
+}
+
+static inline struct bufferevent_async *
+upcast_read(struct event_overlapped *eo)
+{
+ struct bufferevent_async *bev_a;
+ bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
+ EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
+ return bev_a;
+}
+
+static inline struct bufferevent_async *
+upcast_write(struct event_overlapped *eo)
+{
+ struct bufferevent_async *bev_a;
+ bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
+ EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
return bev_a;
}
@@ -109,14 +129,15 @@ bev_async_consider_writing(struct bufferevent_async *b)
{
/* Don't write if there's a write in progress, or we do not
* want to write. */
- if (b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
+ if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
return;
/* Don't write if there's nothing to write */
if (!evbuffer_get_length(b->bev.bev.output))
return;
/* XXXX doesn't respect low-water mark very well. */
- if (evbuffer_launch_write(b->bev.bev.output, -1)) {
+ if (evbuffer_launch_write(b->bev.bev.output, -1,
+ &b->write_overlapped)) {
EVUTIL_ASSERT(0);/* XXX act sensibly. */
} else {
b->write_in_progress = 1;
@@ -131,7 +152,7 @@ bev_async_consider_reading(struct bufferevent_async *b)
size_t at_most;
/* Don't read if there is a read in progress, or we do not
* want to read. */
- if (b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
+ if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
return;
/* Don't read if we're full */
@@ -145,7 +166,8 @@ bev_async_consider_reading(struct bufferevent_async *b)
at_most = 16384; /* FIXME totally magic. */
}
- if (evbuffer_launch_read(b->bev.bev.input, at_most)) {
+ if (evbuffer_launch_read(b->bev.bev.input, at_most,
+ &b->read_overlapped)) {
EVUTIL_ASSERT(0);
} else {
b->read_in_progress = 1;
@@ -159,26 +181,15 @@ be_async_outbuf_callback(struct evbuffer *buf,
{
struct bufferevent *bev = arg;
struct bufferevent_async *bev_async = upcast(bev);
- /* If we successfully wrote from the outbuf, or we added data to the
- * outbuf and were not writing before, we may want to write now. */
+
+ /* If we added data to the outbuf and were not writing before,
+ * we may want to write now. */
_bufferevent_incref_and_lock(bev);
- if (cbinfo->n_deleted) {
- /* XXXX can't detect 0-length write completion */
- bev_async->write_in_progress = 0;
- }
- if (cbinfo->n_added || cbinfo->n_deleted)
+ if (cbinfo->n_added)
bev_async_consider_writing(bev_async);
- if (cbinfo->n_deleted) {
- BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
-
- if (bev->writecb != NULL &&
- evbuffer_get_length(bev->output) <= bev->wm_write.low)
- _bufferevent_run_writecb(bev);
- }
-
_bufferevent_decref_and_unlock(bev);
}
@@ -190,26 +201,14 @@ be_async_inbuf_callback(struct evbuffer *buf,
struct bufferevent *bev = arg;
struct bufferevent_async *bev_async = upcast(bev);
- /* If we successfully read into the inbuf, or we drained data from
- * the inbuf and were not reading before, we may want to read now */
+ /* If we drained data from the inbuf and were not reading before,
+ * we may want to read now */
_bufferevent_incref_and_lock(bev);
- if (cbinfo->n_added) {
- /* XXXX can't detect 0-length read completion */
- bev_async->read_in_progress = 0;
- }
- if (cbinfo->n_added || cbinfo->n_deleted)
+ if (cbinfo->n_deleted)
bev_async_consider_reading(bev_async);
- if (cbinfo->n_added) {
- BEV_RESET_GENERIC_READ_TIMEOUT(bev);
-
- if (evbuffer_get_length(bev->input) >= bev->wm_read.low &&
- bev->readcb != NULL)
- _bufferevent_run_readcb(bev);
- }
-
_bufferevent_decref_and_unlock(bev);
}
@@ -218,6 +217,10 @@ be_async_enable(struct bufferevent *buf, short what)
{
struct bufferevent_async *bev_async = upcast(buf);
+ if (!bev_async->ok)
+ return -1;
+
+ /* NOTE: This interferes with non-blocking connect */
_bufferevent_generic_adj_timeouts(buf);
/* If we newly enable reading or writing, and we aren't reading or
@@ -245,6 +248,17 @@ be_async_disable(struct bufferevent *bev, short what)
static void
be_async_destruct(struct bufferevent *bev)
{
+ struct bufferevent_private *bev_p = BEV_UPCAST(bev);
+ evutil_socket_t fd;
+
+ EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress);
+
+ /* XXX cancel any outstanding I/O operations */
+ fd = _evbuffer_overlapped_get_fd(bev->input);
+ /* delete this in case non-blocking connect was used */
+ event_del(&bev->ev_write);
+ if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
+ EVUTIL_CLOSESOCKET(fd);
_bufferevent_del_generic_timeout_cbs(bev);
}
@@ -259,20 +273,87 @@ static void
connect_complete(struct event_overlapped *eo, uintptr_t key,
ev_ssize_t nbytes, int ok)
{
- struct bufferevent_async *bev_a = upcast_overlapped(eo);
- struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */
+ struct bufferevent_async *bev_a = upcast_connect(eo);
+ struct bufferevent *bev = &bev_a->bev.bev;
_bufferevent_incref_and_lock(bev);
EVUTIL_ASSERT(bev_a->bev.connecting);
bev_a->bev.connecting = 0;
+ bufferevent_async_set_connected(bev);
_bufferevent_run_eventcb(bev,
ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
_bufferevent_decref_and_unlock(bev);
}
+static void
+read_complete(struct event_overlapped *eo, uintptr_t key,
+ ev_ssize_t nbytes, int ok)
+{
+ struct bufferevent_async *bev_a = upcast_read(eo);
+ struct bufferevent *bev = &bev_a->bev.bev;
+ short what = BEV_EVENT_READING;
+
+ _bufferevent_incref_and_lock(bev);
+ EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress);
+
+ evbuffer_commit_read(bev->input, nbytes);
+ bev_a->read_in_progress = 0;
+
+ if (ok && nbytes) {
+ BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+ if (bev->readcb != NULL &&
+ evbuffer_get_length(bev->input) >= bev->wm_read.low)
+ _bufferevent_run_readcb(bev);
+ bev_async_consider_reading(bev_a);
+ } else if (!ok) {
+ what |= BEV_EVENT_ERROR;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ } else if (!nbytes) {
+ what |= BEV_EVENT_EOF;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ }
+
+ _bufferevent_decref_and_unlock(bev);
+}
+
+static void
+write_complete(struct event_overlapped *eo, uintptr_t key,
+ ev_ssize_t nbytes, int ok)
+{
+ struct bufferevent_async *bev_a = upcast_write(eo);
+ struct bufferevent *bev = &bev_a->bev.bev;
+ short what = BEV_EVENT_WRITING;
+
+ _bufferevent_incref_and_lock(bev);
+ EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress);
+
+ evbuffer_commit_write(bev->output, nbytes);
+ bev_a->write_in_progress = 0;
+
+ if (ok && nbytes) {
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
+ if (bev->writecb != NULL &&
+ evbuffer_get_length(bev->output) <= bev->wm_write.low)
+ _bufferevent_run_writecb(bev);
+ bev_async_consider_writing(bev_a);
+ } else if (!ok) {
+ what |= BEV_EVENT_ERROR;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ } else if (!nbytes) {
+ what |= BEV_EVENT_EOF;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ }
+
+ _bufferevent_decref_and_unlock(bev);
+}
+
struct bufferevent *
bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options)
@@ -318,10 +399,11 @@ bufferevent_async_new(struct event_base *base,
evbuffer_defer_callbacks(bev->input, base);
evbuffer_defer_callbacks(bev->output, base);
- evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
- _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev);
-
event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
+ event_overlapped_init(&bev_a->read_overlapped, read_complete);
+ event_overlapped_init(&bev_a->write_overlapped, write_complete);
+
+ bev_a->ok = fd >= 0;
return bev;
err:
@@ -329,6 +411,16 @@ err:
return NULL;
}
+void
+bufferevent_async_set_connected(struct bufferevent *bev)
+{
+ struct bufferevent_async *bev_async = upcast(bev);
+ bev_async->ok = 1;
+ _bufferevent_init_generic_timeout_cbs(bev);
+ /* Now's a good time to consider reading/writing */
+ be_async_enable(bev, bev->enabled);
+}
+
int
bufferevent_async_can_connect(struct bufferevent *bev)
{
@@ -369,7 +461,7 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
sin6->sin6_family = AF_INET6;
sin6->sin6_addr = in6addr_any;
} else {
- /* XXX: what to do? */
+ /* Well, the user will have to bind() */
return -1;
}
if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&