summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2009-11-04 05:19:26 +0000
committerNick Mathewson <nickm@torproject.org>2009-11-04 05:19:26 +0000
commit86db1c851be804b13aadebcc0b1a21a4493192dd (patch)
treebd6a597a4448dde5a1c2c5f953cfcb7dd0fd0a5e
parent6ca32df11ac92a4c3ab417d656253353e28a7dc3 (diff)
downloadlibevent-86db1c851be804b13aadebcc0b1a21a4493192dd.tar.gz
Commit ConnectEx code to get connect working with async bufferevents.
This is code by Chris Davis, with changes to get the unit tests failing less aggressively. The unit tests for this code do not completely pass yet; Chris is looking into that. If they aren't passing by the next release, I'll turn off this code. svn:r1499
-rw-r--r--buffer_iocp.c11
-rw-r--r--bufferevent-internal.h11
-rw-r--r--bufferevent_async.c111
-rw-r--r--bufferevent_sock.c41
-rw-r--r--iocp-internal.h6
-rw-r--r--test/regress.h3
-rw-r--r--test/regress_bufferevent.c33
-rw-r--r--test/regress_iocp.c5
-rw-r--r--test/regress_main.c4
9 files changed, 204 insertions, 21 deletions
diff --git a/buffer_iocp.c b/buffer_iocp.c
index b52f84d6..cab782a3 100644
--- a/buffer_iocp.c
+++ b/buffer_iocp.c
@@ -316,3 +316,14 @@ _evbuffer_overlapped_get_fd(struct evbuffer *buf)
struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
return buf_o ? buf_o->fd : -1;
}
+
+void
+_evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd)
+{
+ struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
+ EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+ /* XXX is this right?, should it cancel current I/O operations? */
+ if (buf_o)
+ buf_o->fd = fd;
+ EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+}
diff --git a/bufferevent-internal.h b/bufferevent-internal.h
index eec60e1a..4ee6d908 100644
--- a/bufferevent-internal.h
+++ b/bufferevent-internal.h
@@ -141,6 +141,17 @@ extern const struct bufferevent_ops bufferevent_ops_socket;
extern const struct bufferevent_ops bufferevent_ops_filter;
extern const struct bufferevent_ops bufferevent_ops_pair;
+#define BEV_IS_SOCKET(bevp) ((bevp)->be_ops == &bufferevent_ops_socket)
+#define BEV_IS_FILTER(bevp) ((bevp)->be_ops == &bufferevent_ops_filter)
+#define BEV_IS_PAIR(bevp) ((bevp)->be_ops == &bufferevent_ops_pair)
+
+#ifdef WIN32
+extern const struct bufferevent_ops bufferevent_ops_async;
+#define BEV_IS_ASYNC(bevp) ((bevp)->be_ops == &bufferevent_ops_async)
+#else
+#define BEV_IS_ASYNC(bevp) 0
+#endif
+
/** Initialize the shared parts of a bufferevent. */
int bufferevent_init_common(struct bufferevent_private *, struct event_base *, const struct bufferevent_ops *, enum bufferevent_options options);
diff --git a/bufferevent_async.c b/bufferevent_async.c
index 89c674a6..d34051ce 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -45,6 +45,7 @@
#ifdef WIN32
#include <winsock2.h>
+#include <ws2tcpip.h>
#endif
#include "event2/util.h"
@@ -78,6 +79,7 @@ const struct bufferevent_ops bufferevent_ops_async = {
struct bufferevent_async {
struct bufferevent_private bev;
+ struct event_overlapped connect_overlapped;
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
};
@@ -93,6 +95,15 @@ upcast(struct bufferevent *bev)
return bev_a;
}
+static inline struct bufferevent_async *
+upcast_overlapped(struct event_overlapped *eo)
+{
+ struct bufferevent_async *bev_a;
+ bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
+ EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
+ return bev_a;
+}
+
static void
bev_async_consider_writing(struct bufferevent_async *b)
{
@@ -244,6 +255,24 @@ be_async_flush(struct bufferevent *bev, short what,
return 0;
}
+static void
+connect_complete(struct event_overlapped *eo, uintptr_t key,
+ ev_ssize_t nbytes, int ok)
+{
+ struct bufferevent_async *bev_a = upcast_overlapped(eo);
+ struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */
+
+ _bufferevent_incref_and_lock(bev);
+
+ EVUTIL_ASSERT(bev_a->bev.connecting);
+ bev_a->bev.connecting = 0;
+
+ _bufferevent_run_eventcb(bev,
+ ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
+
+ _bufferevent_decref_and_unlock(bev);
+}
+
struct bufferevent *
bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options)
@@ -257,8 +286,14 @@ bufferevent_async_new(struct event_base *base,
if (!(iocp = event_base_get_iocp(base)))
return NULL;
- if (event_iocp_port_associate(iocp, fd, 1)<0)
- return NULL;
+ if (fd >= 0 && event_iocp_port_associate(iocp, fd, 1)<0) {
+ int err = GetLastError();
+ /* We may have alrady associated this fd with a port.
+ * Let's hope it's this port, and that the error code
+ * for doing this neer changes. */
+ if (err != ERROR_INVALID_PARAMETER)
+ return NULL;
+ }
if (!(bev_a = mm_calloc(1, sizeof(struct bufferevent_async))))
return NULL;
@@ -283,14 +318,72 @@ bufferevent_async_new(struct event_base *base,
evbuffer_defer_callbacks(bev->input, base);
evbuffer_defer_callbacks(bev->output, base);
+ evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
_bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev);
+ event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
+
return bev;
err:
bufferevent_free(&bev_a->bev.bev);
return NULL;
}
+int
+bufferevent_async_can_connect(struct bufferevent *bev)
+{
+ const struct win32_extension_fns *ext =
+ event_get_win32_extension_fns();
+
+ if (BEV_IS_ASYNC(bev) &&
+ event_base_get_iocp(bev->ev_base) &&
+ ext && ext->ConnectEx)
+ return 1;
+
+ return 0;
+}
+
+int
+bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
+ const struct sockaddr *sa, int socklen)
+{
+ BOOL rc;
+ struct bufferevent_async *bev_async = upcast(bev);
+ struct sockaddr_storage ss;
+ const struct win32_extension_fns *ext =
+ event_get_win32_extension_fns();
+
+ EVUTIL_ASSERT(ext && ext->ConnectEx && fd >= 0 && sa != NULL);
+
+ /* ConnectEx() requires that the socket be bound to an address
+ * with bind() before using, otherwise it will fail. We attempt
+ * to issue a bind() here, taking into account that the error
+ * code is set to WSAEINVAL when the socket is already bound. */
+ memset(&ss, 0, sizeof(ss));
+ if (sa->sa_family == AF_INET) {
+ struct sockaddr_in *sin = (struct sockaddr_in *)&ss;
+ sin->sin_family = AF_INET;
+ sin->sin_addr.s_addr = INADDR_ANY;
+ } else if (sa->sa_family == AF_INET6) {
+ struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&ss;
+ sin6->sin6_family = AF_INET6;
+ sin6->sin6_addr = in6addr_any;
+ } else {
+ /* XXX: what to do? */
+ return -1;
+ }
+ if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
+ WSAGetLastError() != WSAEINVAL)
+ return -1;
+
+ rc = ext->ConnectEx(fd, sa, socklen, NULL, 0, NULL,
+ &bev_async->connect_overlapped.overlapped);
+ if (rc || WSAGetLastError() == ERROR_IO_PENDING)
+ return 0;
+
+ return -1;
+}
+
static int
be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
union bufferevent_ctrl_data *data)
@@ -299,7 +392,19 @@ be_async_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
case BEV_CTRL_GET_FD:
data->fd = _evbuffer_overlapped_get_fd(bev->input);
return 0;
- case BEV_CTRL_SET_FD:
+ case BEV_CTRL_SET_FD: {
+ struct event_iocp_port *iocp;
+
+ if (data->fd == _evbuffer_overlapped_get_fd(bev->input))
+ return 0;
+ if (!(iocp = event_base_get_iocp(bev->ev_base)))
+ return -1;
+ if (event_iocp_port_associate(iocp, data->fd, 1) < 0)
+ return -1;
+ _evbuffer_overlapped_set_fd(bev->input, data->fd);
+ _evbuffer_overlapped_set_fd(bev->output, data->fd);
+ return 0;
+ }
case BEV_CTRL_GET_UNDERLYING:
default:
return -1;
diff --git a/bufferevent_sock.c b/bufferevent_sock.c
index c8fd1c7b..0b15b6b1 100644
--- a/bufferevent_sock.c
+++ b/bufferevent_sock.c
@@ -215,7 +215,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
} else {
connected = 1;
_bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED);
- if (!(bufev->enabled & EV_WRITE)) {
+ if (!(bufev->enabled & EV_WRITE) || BEV_IS_ASYNC(bufev)) {
event_del(&bufev->ev_write);
goto done;
}
@@ -332,18 +332,33 @@ bufferevent_socket_connect(struct bufferevent *bev,
ownfd = 1;
}
if (sa) {
- r = evutil_socket_connect(&fd, sa, socklen);
- if (r < 0) {
- _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
- if (ownfd)
- EVUTIL_CLOSESOCKET(fd);
- /* do something about the error? */
+#ifdef WIN32
+ if (bufferevent_async_can_connect(bev)) {
+ bufferevent_setfd(bev, fd);
+ r = bufferevent_async_connect(bev, fd, sa, socklen);
+ if (r < 0)
+ goto freesock;
+ bufev_p->connecting = 1;
+ result = 0;
goto done;
- }
+ } else
+#endif
+ r = evutil_socket_connect(&fd, sa, socklen);
+ if (r < 0)
+ goto freesock;
}
+#ifdef WIN32
+ /* ConnectEx() isn't always around, even when IOCP is enabled.
+ * Here, we borrow the socket object's write handler to fall back
+ * on a non-blocking connect() when ConnectEx() is unavailable. */
+ if (BEV_IS_ASYNC(bev)) {
+ event_assign(&bev->ev_write, bev->ev_base, fd,
+ EV_WRITE|EV_PERSIST, bufferevent_writecb, bev);
+ }
+#endif
bufferevent_setfd(bev, fd);
if (r == 0) {
- if (! bufferevent_enable(bev, EV_WRITE)) {
+ if (! be_socket_enable(bev, EV_WRITE)) {
bufev_p->connecting = 1;
result = 0;
goto done;
@@ -354,6 +369,14 @@ bufferevent_socket_connect(struct bufferevent *bev,
_bufferevent_run_eventcb(bev, BEV_EVENT_CONNECTED);
}
+ goto done;
+
+freesock:
+ _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR);
+ if (ownfd)
+ EVUTIL_CLOSESOCKET(fd);
+ /* do something about the error? */
+
done:
_bufferevent_decref_and_unlock(bev);
return result;
diff --git a/iocp-internal.h b/iocp-internal.h
index 18b89d25..c0500ace 100644
--- a/iocp-internal.h
+++ b/iocp-internal.h
@@ -117,6 +117,8 @@ struct evbuffer *evbuffer_overlapped_new(evutil_socket_t fd);
/** XXXX Document (nickm) */
evutil_socket_t _evbuffer_overlapped_get_fd(struct evbuffer *buf);
+void _evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd);
+
/** Start reading data onto the end of an overlapped evbuffer.
An evbuffer can only have one read pending at a time. While the read
@@ -176,6 +178,10 @@ int event_base_start_iocp(struct event_base *base);
struct bufferevent *bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options);
+/* FIXME document. */
+int bufferevent_async_can_connect(struct bufferevent *bev);
+int bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
+ const struct sockaddr *sa, int socklen);
#ifdef __cplusplus
}
diff --git a/test/regress.h b/test/regress.h
index a586c4f7..0593a69b 100644
--- a/test/regress.h
+++ b/test/regress.h
@@ -84,7 +84,8 @@ void run_legacy_test_fn(void *ptr);
#define TT_LEGACY (TT_FIRST_USER_FLAG<<3)
#define TT_NEED_THREADS (TT_FIRST_USER_FLAG<<4)
#define TT_NO_LOGS (TT_FIRST_USER_FLAG<<5)
-#define TT_ENABLE_IOCP (TT_FIRST_USER_FLAG<<6)
+#define TT_ENABLE_IOCP_FLAG (TT_FIRST_USER_FLAG<<6)
+#define TT_ENABLE_IOCP (TT_ENABLE_IOCP_FLAG|TT_NEED_THREADS)
/* All the flags that a legacy test needs. */
#define TT_ISOLATED TT_FORK|TT_NEED_SOCKETPAIR|TT_NEED_BASE
diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c
index 7658741c..257abe56 100644
--- a/test/regress_bufferevent.c
+++ b/test/regress_bufferevent.c
@@ -73,6 +73,9 @@
#include "event2/util.h"
#include "bufferevent-internal.h"
+#ifdef WIN32
+#include "iocp-internal.h"
+#endif
#include "regress.h"
@@ -412,10 +415,14 @@ listen_cb(struct evconnlistener *listener, evutil_socket_t fd,
struct event_base *base = arg;
struct bufferevent *bev;
const char s[] = TEST_STR;
+ TT_BLATHER(("Got a request on socket %d", (int)fd ));
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
+ tt_assert(bev);
bufferevent_write(bev, s, sizeof(s));
bufferevent_setcb(bev, NULL, sender_writecb, sender_errorcb, NULL);
bufferevent_enable(bev, EV_WRITE);
+end:
+ ;
}
static void
@@ -459,6 +466,14 @@ test_bufferevent_connect(void *arg)
if (strstr((char*)data->setup_data, "lock")) {
be_flags |= BEV_OPT_THREADSAFE;
}
+#ifdef WIN32
+ if (!strcmp((char*)data->setup_data, "unset_connectex")) {
+ struct win32_extension_fns *ext =
+ (struct win32_extension_fns *)
+ event_get_win32_extension_fns();
+ ext->ConnectEx = NULL;
+ }
+#endif
memset(&localhost, 0, sizeof(localhost));
@@ -616,17 +631,23 @@ struct testcase_t bufferevent_iocp_testcases[] = {
LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP),
LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP),
#if 0
- { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE,
- &basic_setup, (void*)"" },
+ { "bufferevent_connect", test_bufferevent_connect,
+ TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" },
{ "bufferevent_connect_defer", test_bufferevent_connect,
- TT_FORK|TT_NEED_BASE, &basic_setup, (void*)"defer" },
+ TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"defer" },
{ "bufferevent_connect_lock", test_bufferevent_connect,
- TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup, (void*)"lock" },
+ TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
+ (void*)"lock" },
{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
- TT_FORK|TT_NEED_BASE|TT_NEED_THREADS, &basic_setup,
+ TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
(void*)"defer lock" },
+#endif
{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
- TT_FORK|TT_NEED_BASE, &basic_setup, NULL },
+ TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
+#if 0
+ { "bufferevent_connect_nonblocking", test_bufferevent_connect,
+ TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup,
+ (void*)"unset_connectex" },
#endif
END_OF_TESTCASES,
diff --git a/test/regress_iocp.c b/test/regress_iocp.c
index 2841f4a3..f1e9af6e 100644
--- a/test/regress_iocp.c
+++ b/test/regress_iocp.c
@@ -38,6 +38,11 @@
#include "tinytest.h"
#include "tinytest_macros.h"
+#define WIN32_LEAN_AND_MEAN
+#include <windows.h>
+#include <winsock2.h>
+#undef WIN32_LEAN_AND_MEAN
+
#include "iocp-internal.h"
#include "evthread-internal.h"
diff --git a/test/regress_main.c b/test/regress_main.c
index 2e383971..f2528025 100644
--- a/test/regress_main.c
+++ b/test/regress_main.c
@@ -135,7 +135,7 @@ basic_test_setup(const struct testcase_t *testcase)
struct basic_test_data *data = NULL;
#ifndef WIN32
- if (testcase->flags & TT_ENABLE_IOCP)
+ if (testcase->flags & TT_ENABLE_IOCP_FLAG)
return (void*)TT_SKIP;
#endif
@@ -177,7 +177,7 @@ basic_test_setup(const struct testcase_t *testcase)
if (!base)
exit(1);
}
- if (testcase->flags & TT_ENABLE_IOCP) {
+ if (testcase->flags & TT_ENABLE_IOCP_FLAG) {
if (event_base_start_iocp(base)<0) {
event_base_free(base);
return (void*)TT_SKIP;