summaryrefslogtreecommitdiff
path: root/deps
diff options
context:
space:
mode:
authorRyan <ry@tinyclouds.org>2009-08-21 14:53:58 +0200
committerRyan <ry@tinyclouds.org>2009-08-21 14:53:58 +0200
commited3602dddcd0e7ddaedd8962252479ded992d95b (patch)
treefbb573d269fe86ca08716bdc440b5855a8ac7d04 /deps
parentf7f11352b7fc22d69eae22ed424cf1122245b77a (diff)
downloadnode-new-ed3602dddcd0e7ddaedd8962252479ded992d95b.tar.gz
Sync evcom
Diffstat (limited to 'deps')
-rw-r--r--deps/evcom/evcom.c60
-rw-r--r--deps/evcom/evcom.h2
-rw-r--r--deps/evcom/test/test.c44
3 files changed, 78 insertions, 28 deletions
diff --git a/deps/evcom/evcom.c b/deps/evcom/evcom.c
index febed43a38..629864d58f 100644
--- a/deps/evcom/evcom.c
+++ b/deps/evcom/evcom.c
@@ -226,7 +226,10 @@ stream_send__close_one (evcom_stream *stream)
/* TODO recover from EINTR */
stream__set_send_closed(stream);
- if (DUPLEX(stream)) stream__set_recv_closed(stream);
+
+ if (DUPLEX(stream) || stream->recv_action == stream_recv__wait_for_close) {
+ stream__set_recv_closed(stream);
+ }
return OKAY;
}
@@ -253,8 +256,11 @@ stream__close_both (evcom_stream *stream)
static int
stream_send__close (evcom_stream *stream)
{
- stream->send_action = DUPLEX(stream) ?
- stream_send__close_one : stream__close_both;
+ if (DUPLEX(stream) || stream->recvfd < 0) {
+ stream->send_action = stream_send__close_one;
+ } else {
+ stream->send_action = stream__close_both;
+ }
return OKAY;
}
@@ -268,7 +274,10 @@ stream_recv__close_one (evcom_stream *stream)
/* TODO recover from EINTR */
stream__set_recv_closed(stream);
- if (DUPLEX(stream)) stream__set_send_closed(stream);
+
+ if (DUPLEX(stream)) {
+ stream__set_send_closed(stream);
+ }
return OKAY;
}
@@ -276,8 +285,11 @@ stream_recv__close_one (evcom_stream *stream)
static int
stream_recv__close (evcom_stream *stream)
{
- stream->recv_action = DUPLEX(stream) ?
- stream_recv__close_one : stream__close_both;
+ if (DUPLEX(stream) || stream->sendfd < 0) {
+ stream->recv_action = stream_recv__close_one;
+ } else {
+ stream->recv_action = stream__close_both;
+ }
return OKAY;
}
@@ -526,13 +538,13 @@ stream_recv__data (evcom_stream *stream)
if (recved == 0) {
stream->flags &= ~EVCOM_READABLE;
ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
+ stream->recv_action = stream_recv__wait_for_close;
}
/* NOTE: EOF is signaled with recved == 0 on callback */
if (stream->on_read) stream->on_read(stream, buf, recved);
if (recved == 0) {
- stream->recv_action = stream_recv__wait_for_close;
return OKAY;
}
}
@@ -682,7 +694,7 @@ stream_send__wait_for_connection (evcom_stream *stream)
return OKAY;
}
-static void
+void
evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd)
{
assert(recvfd >= 0);
@@ -690,6 +702,14 @@ evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd)
if (recvfd == sendfd) stream->flags |= EVCOM_DUPLEX;
+ if (set_nonblock(recvfd) != 0) {
+ evcom_perror("set_nonblock(recvfd)", errno);
+ }
+
+ if (set_nonblock(sendfd) != 0) {
+ evcom_perror("set_nonblock(sendfd)", errno);
+ }
+
#ifdef SO_NOSIGPIPE
if (DUPLEX(stream)) {
int flags = 1;
@@ -736,7 +756,9 @@ accept_connection (evcom_server *server)
if (fd < 0) {
switch (errno) {
case EMFILE:
+ case ENFILE:
too_many_connections = 1;
+ server->flags |= EVCOM_TOO_MANY_CONN;
evcom_server_detach(server);
return NULL;
@@ -992,10 +1014,11 @@ stream_event (EV_P_ ev_io *w, int revents)
if (stream->sendfd < 0 && stream->recvfd < 0) {
ev_timer_stop(EV_A_ &stream->timeout_watcher);
- if (too_many_connections && stream->server) {
+ if (stream->server && (stream->server->flags & EVCOM_TOO_MANY_CONN)) {
#if EV_MULTIPLICITY
struct ev_loop *loop = stream->server->loop;
#endif
+ stream->server->flags &= ~EVCOM_TOO_MANY_CONN;
evcom_server_attach(EV_A_ stream->server);
}
too_many_connections = 0;
@@ -1049,19 +1072,24 @@ void
evcom_stream_close (evcom_stream *stream)
{
stream->flags |= EVCOM_GOT_CLOSE;
- if (WRITABLE(stream)) {
- ev_io_start(D_LOOP_(stream) &stream->write_watcher);
+ if (ATTACHED(stream)) {
+ // start the watchers if attached.
+ evcom_stream_attach(D_LOOP_(stream) stream);
}
}
void evcom_stream_force_close (evcom_stream *stream)
{
- close(stream->recvfd);
- /* XXX What to do on EINTR? */
- stream__set_recv_closed(stream);
+ if (stream->recvfd >= 0) {
+ close(stream->recvfd);
+ /* XXX What to do on EINTR? */
+ stream__set_recv_closed(stream);
+ }
- if (!DUPLEX(stream)) close(stream->sendfd);
- stream__set_send_closed(stream);
+ if (!DUPLEX(stream) && stream->sendfd >= 0) {
+ close(stream->sendfd);
+ stream__set_send_closed(stream);
+ }
evcom_stream_detach(stream);
}
diff --git a/deps/evcom/evcom.h b/deps/evcom/evcom.h
index 1bbfd03f15..fae9244747 100644
--- a/deps/evcom/evcom.h
+++ b/deps/evcom/evcom.h
@@ -57,6 +57,7 @@ extern "C" {
#define EVCOM_PAUSED 0x0040
#define EVCOM_READABLE 0x0080
#define EVCOM_WRITABLE 0x0100
+#define EVCOM_TOO_MANY_CONN 0x0200
enum evcom_stream_state { EVCOM_INITIALIZED
, EVCOM_CONNECTING
@@ -181,6 +182,7 @@ void evcom_stream_init (evcom_stream *, float timeout);
int evcom_stream_pair (evcom_stream *a, evcom_stream *b);
int evcom_stream_connect (evcom_stream *, struct sockaddr *address);
+void evcom_stream_assign_fds (evcom_stream *, int recvfd, int sendfd);
void evcom_stream_attach (EV_P_ evcom_stream *);
void evcom_stream_detach (evcom_stream *);
diff --git a/deps/evcom/test/test.c b/deps/evcom/test/test.c
index 6b4b765fa0..7aadebeebc 100644
--- a/deps/evcom/test/test.c
+++ b/deps/evcom/test/test.c
@@ -218,9 +218,6 @@ pingpong (struct sockaddr *address)
nconnections = 0;
got_server_close = 0;
- printf("sizeof(evcom_server): %d\n", (int)sizeof(evcom_server));
- printf("sizeof(evcom_stream): %d\n", (int)sizeof(evcom_stream));
-
evcom_server_init(&server);
server.on_connection = pingpong_on_server_connection;
server.on_close = common_on_server_close;
@@ -549,7 +546,7 @@ void b_read (evcom_stream *stream, const void *buf, size_t len)
}
int
-pair_pingpong ()
+pair_pingpong (int use_pipe)
{
a_got_close = 0;
a_got_connect = 0;
@@ -573,8 +570,18 @@ pair_pingpong ()
if (use_tls) anon_tls_server(&b);
#endif
- int r = evcom_stream_pair(&a, &b);
- assert(r == 0);
+ if (use_pipe) {
+ int pipeA[2], pipeB[2];
+ assert(0 == pipe(pipeA));
+ assert(0 == pipe(pipeB));
+
+ evcom_stream_assign_fds(&a, pipeA[0], pipeB[1]);
+ evcom_stream_assign_fds(&b, pipeB[0], pipeA[1]);
+
+ } else {
+ int r = evcom_stream_pair(&a, &b);
+ assert(r == 0);
+ }
evcom_stream_attach(EV_DEFAULT_ &a);
evcom_stream_attach(EV_DEFAULT_ &b);
@@ -764,6 +771,11 @@ free_unix_address (struct sockaddr *address)
int
main (void)
{
+ fprintf(stderr, "sizeof(evcom_server): %d\n", (int)sizeof(evcom_server));
+ fprintf(stderr, "sizeof(evcom_stream): %d\n", (int)sizeof(evcom_stream));
+ fprintf(stderr, "sizeof(evcom_reader): %d\n", (int)sizeof(evcom_reader));
+ fprintf(stderr, "sizeof(evcom_writer): %d\n", (int)sizeof(evcom_writer));
+
#if EVCOM_HAVE_GNUTLS
gnutls_global_init();
@@ -785,6 +797,14 @@ main (void)
use_tls = 0;
+ fprintf(stderr, "pair_pingpong use_pipe=1: ");
+ assert(pair_pingpong(1) == 0);
+ fprintf(stderr, "\n");
+
+ fprintf(stderr, "pair_pingpong use_pipe=0: ");
+ assert(pair_pingpong(0) == 0);
+ fprintf(stderr, "\n");
+
fprintf(stderr, "zero_stream tcp: ");
assert(zero_stream((struct sockaddr*)&tcp_address, 5*1024*1024) == 0);
fprintf(stderr, "\n");
@@ -793,10 +813,6 @@ main (void)
assert(pipe_stream() == 0);
fprintf(stderr, "\n");
- fprintf(stderr, "pair_pingpong: ");
- assert(pair_pingpong() == 0);
- fprintf(stderr, "\n");
-
fprintf(stderr, "pingpong tcp: ");
assert(pingpong((struct sockaddr*)&tcp_address) == 0);
fprintf(stderr, "\n");
@@ -812,8 +828,12 @@ main (void)
assert(zero_stream((struct sockaddr*)&tcp_address, 50*1024) == 0);
fprintf(stderr, "\n");
- fprintf(stderr, "pair_pingpong ssl: ");
- assert(pair_pingpong() == 0);
+ fprintf(stderr, "pair_pingpong ssl use_pipe=1: ");
+ assert(pair_pingpong(1) == 0);
+ fprintf(stderr, "\n");
+
+ fprintf(stderr, "pair_pingpong ssl use_pipe=0: ");
+ assert(pair_pingpong(0) == 0);
fprintf(stderr, "\n");
fprintf(stderr, "pingpong ssl: ");