author     Christopher Davis <chrisd@torproject.org>  2010-08-17 05:02:00 -0700
committer  Christopher Davis <chrisd@torproject.org>  2010-09-08 01:22:22 -0700
commit     76f7e7ae745ad730795f0a4a3bc1299a00137cc2 (patch)
tree       5327da1a718cc561e989fee28af0bbafc2e2ee92
parent     d844242f9b138be4896942ded25d74a91bf29901 (diff)
download   libevent-76f7e7ae745ad730795f0a4a3bc1299a00137cc2.tar.gz
Some IOCP bufferevent tweaks.
- Increment reference count of bufferevents before initiating overlapped
  operations to prevent the destructor from being called while operations
  are pending. The only portable way of canceling overlapped ops is to
  close the socket.
- Translate error codes to WSA* codes.
- Better handling of errors.
- Add an interface to add and del "virtual" events. Because IOCP
  bufferevents don't register any events with the base, the event loop has
  no way of knowing they exist. This causes the loop to terminate
  prematurely. event_base_{add,del}_virtual increment/decrement base's
  event count so the loop runs while there are any enabled IOCP
  bufferevents.
-rw-r--r--  bufferevent_async.c  254
-rw-r--r--  event-internal.h       6
-rw-r--r--  event.c               18
-rw-r--r--  event_iocp.c           2
4 files changed, 211 insertions, 69 deletions
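
The pattern the patch relies on, pulled out of the diff below for clarity:
every overlapped operation is bracketed by a reference-count increment on the
bufferevent (so be_async_destruct() cannot run while the OS still owns the
overlapped structure) and by a virtual-event increment on the base (so
event_base_loop() does not see an empty base and exit early). The following is
a minimal sketch of that usage, not code from the patch; launch_overlapped_read()
is a hypothetical stand-in for evbuffer_launch_read() plus the surrounding
bookkeeping.

static int
start_async_read(struct bufferevent *bev)
{
	/* Keep the bufferevent alive until the completion callback runs;
	 * closing the socket is the only portable way to cancel the op. */
	bufferevent_incref(bev);
	/* Keep event_base_loop() running even though no struct event is
	 * registered for this bufferevent. */
	event_base_add_virtual(bev->ev_base);

	if (launch_overlapped_read(bev) < 0) {
		/* Synchronous failure: no completion packet will arrive,
		 * so undo both counts here. */
		event_base_del_virtual(bev->ev_base);
		bufferevent_decref(bev);
		return -1;
	}
	/* On success, the completion handler later drops the reference;
	 * the virtual event is removed once reading is disabled or no
	 * longer possible. */
	return 0;
}
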
diff --git a/bufferevent_async.c b/bufferevent_async.c
index 23b636d2..fa1bbc92 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -48,11 +48,15 @@
#include <ws2tcpip.h>
#endif
+#include <sys/queue.h>
+
#include "event2/util.h"
#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/bufferevent_struct.h"
#include "event2/event.h"
+#include "event2/util.h"
+#include "event-internal.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
@@ -74,6 +78,8 @@ struct bufferevent_async {
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
unsigned ok : 1;
+ unsigned read_added : 1;
+ unsigned write_added : 1;
};
const struct bufferevent_ops bufferevent_ops_async = {
@@ -125,74 +131,143 @@ upcast_write(struct event_overlapped *eo)
}
static void
-bev_async_consider_writing(struct bufferevent_async *b)
+bev_async_del_write(struct bufferevent_async *beva)
+{
+ struct bufferevent *bev = &beva->bev.bev;
+
+ if (beva->write_added) {
+ beva->write_added = 0;
+ event_base_del_virtual(bev->ev_base);
+ }
+}
+
+static void
+bev_async_del_read(struct bufferevent_async *beva)
+{
+ struct bufferevent *bev = &beva->bev.bev;
+
+ if (beva->read_added) {
+ beva->read_added = 0;
+ event_base_del_virtual(bev->ev_base);
+ }
+}
+
+static void
+bev_async_add_write(struct bufferevent_async *beva)
+{
+ struct bufferevent *bev = &beva->bev.bev;
+
+ if (!beva->write_added) {
+ beva->write_added = 1;
+ event_base_add_virtual(bev->ev_base);
+ }
+}
+
+static void
+bev_async_add_read(struct bufferevent_async *beva)
+{
+ struct bufferevent *bev = &beva->bev.bev;
+
+ if (!beva->read_added) {
+ beva->read_added = 1;
+ event_base_add_virtual(bev->ev_base);
+ }
+}
+
+static void
+bev_async_consider_writing(struct bufferevent_async *beva)
{
size_t at_most;
int limit;
+ struct bufferevent *bev = &beva->bev.bev;
+
/* Don't write if there's a write in progress, or we do not
- * want to write. */
- if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
+ * want to write, or when there's nothing left to write. */
+ if (beva->write_in_progress)
return;
- /* Don't write if there's nothing to write */
- if (!evbuffer_get_length(b->bev.bev.output))
+ if (!beva->ok || !(bev->enabled&EV_WRITE) ||
+ !evbuffer_get_length(bev->output)) {
+ bev_async_del_write(beva);
return;
+ }
- at_most = evbuffer_get_length(b->bev.bev.output);
+ at_most = evbuffer_get_length(bev->output);
/* XXXX This over-commits. */
- limit = _bufferevent_get_write_max(&b->bev);
+ limit = _bufferevent_get_write_max(&beva->bev);
if (at_most >= limit)
at_most = limit;
- if (b->bev.write_suspended)
+ if (beva->bev.write_suspended) {
+ bev_async_del_write(beva);
return;
+ }
/* XXXX doesn't respect low-water mark very well. */
- if (evbuffer_launch_write(b->bev.bev.output, at_most,
- &b->write_overlapped)) {
- EVUTIL_ASSERT(0);/* XXX act sensibly. */
+ bufferevent_incref(bev);
+ if (evbuffer_launch_write(bev->output, at_most,
+ &beva->write_overlapped)) {
+ bufferevent_decref(bev);
+ beva->ok = 0;
+ _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
- b->write_in_progress = 1;
+ beva->write_in_progress = 1;
+ bev_async_add_write(beva);
}
}
static void
-bev_async_consider_reading(struct bufferevent_async *b)
+bev_async_consider_reading(struct bufferevent_async *beva)
{
size_t cur_size;
size_t read_high;
size_t at_most;
int limit;
+ struct bufferevent *bev = &beva->bev.bev;
+
/* Don't read if there is a read in progress, or we do not
* want to read. */
- if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
+ if (beva->read_in_progress)
+ return;
+ if (!beva->ok || !(bev->enabled&EV_READ)) {
+ bev_async_del_read(beva);
return;
+ }
/* Don't read if we're full */
- cur_size = evbuffer_get_length(b->bev.bev.input);
- read_high = b->bev.bev.wm_read.high;
+ cur_size = evbuffer_get_length(bev->input);
+ read_high = bev->wm_read.high;
if (read_high) {
- if (cur_size >= read_high)
+ if (cur_size >= read_high) {
+ bev_async_del_read(beva);
return;
+ }
at_most = read_high - cur_size;
} else {
at_most = 16384; /* FIXME totally magic. */
}
/* XXXX This over-commits. */
- limit = _bufferevent_get_read_max(&b->bev);
+ limit = _bufferevent_get_read_max(&beva->bev);
if (at_most >= limit)
at_most = limit;
- if (b->bev.read_suspended)
+ if (beva->bev.read_suspended) {
+ bev_async_del_read(beva);
return;
+ }
- if (evbuffer_launch_read(b->bev.bev.input, at_most,
- &b->read_overlapped)) {
- EVUTIL_ASSERT(0);
+ bufferevent_incref(bev);
+ if (evbuffer_launch_read(bev->input, at_most, &beva->read_overlapped)) {
+ beva->ok = 0;
+ bufferevent_decref(bev);
+ _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
} else {
- b->read_in_progress = 1;
+ beva->read_in_progress = 1;
+ bev_async_add_read(beva);
}
+
+ return;
}
static void
@@ -260,14 +335,19 @@ be_async_enable(struct bufferevent *buf, short what)
static int
be_async_disable(struct bufferevent *bev, short what)
{
+ struct bufferevent_async *bev_async = upcast(bev);
/* XXXX If we disable reading or writing, we may want to consider
* canceling any in-progress read or write operation, though it might
* not work. */
- if (what & EV_READ)
+ if (what & EV_READ) {
BEV_DEL_GENERIC_READ_TIMEOUT(bev);
- if (what & EV_WRITE)
+ bev_async_del_read(bev_async);
+ }
+ if (what & EV_WRITE) {
BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
+ bev_async_del_write(bev_async);
+ }
return 0;
}
@@ -275,18 +355,36 @@ be_async_disable(struct bufferevent *bev, short what)
static void
be_async_destruct(struct bufferevent *bev)
{
+ struct bufferevent_async *bev_async = upcast(bev);
struct bufferevent_private *bev_p = BEV_UPCAST(bev);
evutil_socket_t fd;
- EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress);
+ EVUTIL_ASSERT(!upcast(bev)->write_in_progress &&
+ !upcast(bev)->read_in_progress);
+
+ bev_async_del_read(bev_async);
+ bev_async_del_write(bev_async);
- /* XXX cancel any outstanding I/O operations */
fd = _evbuffer_overlapped_get_fd(bev->input);
- /* delete this in case non-blocking connect was used */
- event_del(&bev->ev_write);
if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
evutil_closesocket(fd);
- _bufferevent_del_generic_timeout_cbs(bev);
+ /* delete this in case non-blocking connect was used */
+ if (event_initialized(&bev->ev_write)) {
+ event_del(&bev->ev_write);
+ _bufferevent_del_generic_timeout_cbs(bev);
+ }
+}
+
+/* GetQueuedCompletionStatus doesn't reliably yield WSA error codes, so
+ * we use WSAGetOverlappedResult to translate. */
+static void
+bev_async_set_wsa_error(struct bufferevent *bev, struct event_overlapped *eo)
+{
+ DWORD bytes, flags;
+ evutil_socket_t fd;
+
+ fd = _evbuffer_overlapped_get_fd(bev->input);
+ WSAGetOverlappedResult(fd, &eo->overlapped, &bytes, FALSE, &flags);
}
static int
@@ -303,15 +401,22 @@ connect_complete(struct event_overlapped *eo, ev_uintptr_t key,
struct bufferevent_async *bev_a = upcast_connect(eo);
struct bufferevent *bev = &bev_a->bev.bev;
- _bufferevent_incref_and_lock(bev);
+ BEV_LOCK(bev);
EVUTIL_ASSERT(bev_a->bev.connecting);
bev_a->bev.connecting = 0;
+ event_base_del_virtual(bev->ev_base);
+
+ if (ok)
+ bufferevent_async_set_connected(bev);
+ else
+ bev_async_set_wsa_error(bev, eo);
- bufferevent_async_set_connected(bev);
_bufferevent_run_eventcb(bev,
ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
+ event_base_del_virtual(bev->ev_base);
+
_bufferevent_decref_and_unlock(bev);
}
@@ -323,26 +428,32 @@ read_complete(struct event_overlapped *eo, ev_uintptr_t key,
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_READING;
- _bufferevent_incref_and_lock(bev);
- EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress);
+ BEV_LOCK(bev);
+ EVUTIL_ASSERT(bev_a->read_in_progress);
evbuffer_commit_read(bev->input, nbytes);
bev_a->read_in_progress = 0;
- if (ok && nbytes) {
- BEV_RESET_GENERIC_READ_TIMEOUT(bev);
- _bufferevent_decrement_read_buckets(&bev_a->bev, nbytes);
- if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
- _bufferevent_run_readcb(bev);
- bev_async_consider_reading(bev_a);
- } else if (!ok) {
- what |= BEV_EVENT_ERROR;
- bev_a->ok = 0;
- _bufferevent_run_eventcb(bev, what);
- } else if (!nbytes) {
- what |= BEV_EVENT_EOF;
- bev_a->ok = 0;
- _bufferevent_run_eventcb(bev, what);
+ if (!ok)
+ bev_async_set_wsa_error(bev, eo);
+
+ if (bev_a->ok) {
+ if (ok && nbytes) {
+ BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+ _bufferevent_decrement_read_buckets(&bev_a->bev,
+ nbytes);
+ if (evbuffer_get_length(bev->input) >= bev->wm_read.low)
+ _bufferevent_run_readcb(bev);
+ bev_async_consider_reading(bev_a);
+ } else if (!ok) {
+ what |= BEV_EVENT_ERROR;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ } else if (!nbytes) {
+ what |= BEV_EVENT_EOF;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ }
}
_bufferevent_decref_and_unlock(bev);
@@ -356,26 +467,32 @@ write_complete(struct event_overlapped *eo, ev_uintptr_t key,
struct bufferevent *bev = &bev_a->bev.bev;
short what = BEV_EVENT_WRITING;
- _bufferevent_incref_and_lock(bev);
- EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress);
-
+ BEV_LOCK(bev);
+ EVUTIL_ASSERT(bev_a->write_in_progress);
evbuffer_commit_write(bev->output, nbytes);
bev_a->write_in_progress = 0;
- if (ok && nbytes) {
- BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
- _bufferevent_decrement_write_buckets(&bev_a->bev, nbytes);
- if (evbuffer_get_length(bev->output) <= bev->wm_write.low)
- _bufferevent_run_writecb(bev);
- bev_async_consider_writing(bev_a);
- } else if (!ok) {
- what |= BEV_EVENT_ERROR;
- bev_a->ok = 0;
- _bufferevent_run_eventcb(bev, what);
- } else if (!nbytes) {
- what |= BEV_EVENT_EOF;
- bev_a->ok = 0;
- _bufferevent_run_eventcb(bev, what);
+ if (!ok)
+ bev_async_set_wsa_error(bev, eo);
+
+ if (bev_a->ok) {
+ if (ok && nbytes) {
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
+ _bufferevent_decrement_write_buckets(&bev_a->bev,
+ nbytes);
+ if (evbuffer_get_length(bev->output) <=
+ bev->wm_write.low)
+ _bufferevent_run_writecb(bev);
+ bev_async_consider_writing(bev_a);
+ } else if (!ok) {
+ what |= BEV_EVENT_ERROR;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ } else if (!nbytes) {
+ what |= BEV_EVENT_EOF;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ }
}
_bufferevent_decref_and_unlock(bev);
@@ -423,8 +540,6 @@ bufferevent_async_new(struct event_base *base,
evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
evbuffer_add_cb(bev->output, be_async_outbuf_callback, bev);
- evbuffer_defer_callbacks(bev->input, base);
- evbuffer_defer_callbacks(bev->output, base);
event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
event_overlapped_init(&bev_a->read_overlapped, read_complete);
@@ -497,11 +612,16 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
WSAGetLastError() != WSAEINVAL)
return -1;
+ event_base_add_virtual(bev->ev_base);
+ bufferevent_incref(bev);
rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
&bev_async->connect_overlapped.overlapped);
if (rc || WSAGetLastError() == ERROR_IO_PENDING)
return 0;
+ event_base_del_virtual(bev->ev_base);
+ bufferevent_decref(bev);
+
return -1;
}
diff --git a/event-internal.h b/event-internal.h
index e52c129b..7d97d98e 100644
--- a/event-internal.h
+++ b/event-internal.h
@@ -182,6 +182,8 @@ struct event_base {
/** Data to implement the common signal handler code. */
struct evsig_info sig;
+ /** Number of virtual events */
+ int virtual_event_count;
/** Number of total events added to this event_base */
int event_count;
/** Number of total events active in this event_base */
@@ -313,6 +315,10 @@ int _evsig_restore_handler(struct event_base *base, int evsignal);
void event_active_nolock(struct event *ev, int res, short count);
+/* FIXME document. */
+void event_base_add_virtual(struct event_base *base);
+void event_base_del_virtual(struct event_base *base);
+
#ifdef __cplusplus
}
#endif
diff --git a/event.c b/event.c
index 5f4656fb..aaf8f28b 100644
--- a/event.c
+++ b/event.c
@@ -962,7 +962,7 @@ static int
event_haveevents(struct event_base *base)
{
/* Caller must hold th_base_lock */
- return (base->event_count > 0);
+ return (base->virtual_event_count > 0 || base->event_count > 0);
}
/* "closure" function called when processing active signal events */
@@ -2707,3 +2707,19 @@ event_base_dump_events(struct event_base *base, FILE *output)
}
}
}
+
+void
+event_base_add_virtual(struct event_base *base)
+{
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ base->virtual_event_count++;
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
+}
+
+void
+event_base_del_virtual(struct event_base *base)
+{
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ base->virtual_event_count--;
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
+}
diff --git a/event_iocp.c b/event_iocp.c
index 82fa9aee..19c7bffc 100644
--- a/event_iocp.c
+++ b/event_iocp.c
@@ -36,6 +36,7 @@
#include "log-internal.h"
#include "mm-internal.h"
#include "event-internal.h"
+#include "evthread-internal.h"
#define NOTIFICATION_KEY ((ULONG_PTR)-1)
@@ -277,4 +278,3 @@ event_base_get_iocp(struct event_base *base)
return NULL;
#endif
}
-
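
A note on the error-translation half of the change: GetQueuedCompletionStatus()
only reports a generic failure for a completed overlapped socket operation, so
bev_async_set_wsa_error() re-queries the result with WSAGetOverlappedResult(),
which on failure leaves the real WSA* code in the thread's last-error slot. A
minimal illustration of that Winsock idiom, independent of the patch (the
helper name is made up):

static int
wsa_error_for_overlapped(SOCKET s, WSAOVERLAPPED *o)
{
	DWORD bytes = 0, flags = 0;

	/* fWait == FALSE: the completion packet has already been dequeued,
	 * so the result is available without blocking. */
	if (!WSAGetOverlappedResult(s, o, &bytes, FALSE, &flags))
		return WSAGetLastError(); /* e.g. WSAECONNRESET */
	return 0; /* the operation actually succeeded */
}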