summaryrefslogtreecommitdiff
path: root/bufferevent_async.c
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2009-11-04 05:19:26 +0000
committerNick Mathewson <nickm@torproject.org>2009-11-04 05:19:26 +0000
commit86db1c851be804b13aadebcc0b1a21a4493192dd (patch)
treebd6a597a4448dde5a1c2c5f953cfcb7dd0fd0a5e /bufferevent_async.c
parent6ca32df11ac92a4c3ab417d656253353e28a7dc3 (diff)
downloadlibevent-86db1c851be804b13aadebcc0b1a21a4493192dd.tar.gz
Commit ConnectEx code to get connect working with async bufferevents.
This is code by Chris Davis, with changes to get the unit tests failing less aggressively. The unit tests for this code do not completely pass yet; Chris is looking into that. If they aren't passing by the next release, I'll turn off this code. svn:r1499
Diffstat (limited to 'bufferevent_async.c')
-rw-r--r--bufferevent_async.c111
1 files changed, 108 insertions, 3 deletions
diff --git a/bufferevent_async.c b/bufferevent_async.c
index 89c674a6..d34051ce 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -45,6 +45,7 @@
#ifdef WIN32
#include <winsock2.h>
+#include <ws2tcpip.h>
#endif
#include "event2/util.h"
@@ -78,6 +79,7 @@ const struct bufferevent_ops bufferevent_ops_async = {
struct bufferevent_async {
struct bufferevent_private bev;
+ struct event_overlapped connect_overlapped;
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
};
@@ -93,6 +95,15 @@ upcast(struct bufferevent *bev)
return bev_a;
}
+static inline struct bufferevent_async *
+upcast_overlapped(struct event_overlapped *eo)
+{
+ struct bufferevent_async *bev_a;
+ bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
+ EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
+ return bev_a;
+}
+
static void
bev_async_consider_writing(struct bufferevent_async *b)
{
@@ -244,6 +255,24 @@ be_async_flush(struct bufferevent *bev, short what,
return 0;
}
+static void
+connect_complete(struct event_overlapped *eo, uintptr_t key,
+ ev_ssize_t nbytes, int ok)
+{
+ struct bufferevent_async *bev_a = upcast_overlapped(eo);
+ struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */
+
+ _bufferevent_incref_and_lock(bev);
+
+ EVUTIL_ASSERT(bev_a->bev.connecting);
+ bev_a->bev.connecting = 0;
+
+ _bufferevent_run_eventcb(bev,
+ ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
+
+ _bufferevent_decref_and_unlock(bev);
+}
+
struct bufferevent *
bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options)
@@ -257,8 +286,14 @@ bufferevent_async_new(struct event_base *base,
if (!(iocp = event_base_get_iocp(base)))
return NULL;
- if (event_iocp_port_associate(iocp, fd, 1)<0)
- return NULL;
+ if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) {
+ int err = GetLastError();
+ /* We may have alrady associated this fd with a port.
+ * Let's hope it's this port, and that the error code
+ * for doing this neer changes. */
+ if (err != ERROR_INVALID_PARAMETER)
+ return NULL;
+ }
if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
return NULL;
@@ -283,14 +318,72 @@ bufferevent_async_new(struct event_base *base,
evbuffer_defer_callbacks(bev->input, base);
evbuffer_defer_callbacks(bev->output, base);
+ evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
_bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev);
+ event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
+
return bev;
err:
bufferevent_free(&bev_a->bev.bev);
return NULL;
}
+int
+bufferevent_async_can_connect(struct bufferevent *bev)
+{
+ const struct win32_extension_fns *ext =
+ event_get_win32_extension_fns();
+
+ if (BEV_IS_ASYNC(bev) &&
+ event_base_get_iocp(bev->ev_base) &&
+ ext && ext->ConnectEx)
+ return 1;
+
+ return 0;
+}
+
+int
+bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
+ const struct sockaddr *sa, int socklen)
+{
+ BOOL rc;
+ struct bufferevent_async *bev_async = upcast(bev);
+ struct sockaddr_storage ss;
+ const struct win32_extension_fns *ext =
+ event_get_win32_extension_fns();
+
+ EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
+
+ /* ConnectEx() requires that the socket be bound to an address
+ * with bind() before using, otherwise it will fail. We attempt
+ * to issue a bind() here, taking into account that the error
+ * code is set to WSAEINVAL when the socket is already bound. */
+ memset(&ss, 0, sizeof(ss));
+ if (sa->sa_family == AF_INET) {
+ struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
+ sin->sin_family = AF_INET;
+ sin->sin_addr.s_addr = INADDR_ANY;
+ } else if (sa->sa_family == AF_INET6) {
+ struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
+ sin6->sin6_family = AF_INET6;
+ sin6->sin6_addr = in6addr_any;
+ } else {
+ /* XXX: what to do? */
+ return -1;
+ }
+ if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
+ WSAGetLastError() != WSAEINVAL)
+ return -1;
+
+ rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
+ &bev_async->connect_overlapped.overlapped);
+ if (rc || WSAGetLastError() == ERROR_IO_PENDING)
+ return 0;
+
+ return -1;
+}
+
static int
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
union bufferevent_ctrl_data *data)
@@ -299,7 +392,19 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
case BEV_CTRL_GET_FD:
data->fd = _evbuffer_overlapped_get_fd(bev->input);
return 0;
- case BEV_CTRL_SET_FD:
+ case BEV_CTRL_SET_FD: {
+ struct event_iocp_port *iocp;
+
+ if (data->fd == _evbuffer_overlapped_get_fd(bev->input))
+ return 0;
+ if (!(iocp = event_base_get_iocp(bev->ev_base)))
+ return -1;
+ if (event_iocp_port_associate(iocp, data->fd, 1) < 0)
+ return -1;
+ _evbuffer_overlapped_set_fd(bev->input, data->fd);
+ _evbuffer_overlapped_set_fd(bev->output, data->fd);
+ return 0;
+ }
case BEV_CTRL_GET_UNDERLYING:
default:
return -1;