diff options
author | Garrett D'Amore <garrett@damore.org> | 2016-05-31 13:21:07 -0700 |
---|---|---|
committer | Garrett D'Amore <garrett@damore.org> | 2016-05-31 19:35:28 -0700 |
commit | 60720570c610635ed8ec0228e294e9580ba0c7c6 (patch) | |
tree | a3a4e80f2f82697109a09aaf66aa6c398d433267 | |
parent | 6dd98f856abbb9357cb4a82de5498a5c56c4eedd (diff) | |
download | nanomsg-60720570c610635ed8ec0228e294e9580ba0c7c6.tar.gz |
fixes #617 nn_bind does not fail when binding to an already bound TCP port
-rw-r--r-- | src/transports/ipc/bipc.c | 186 | ||||
-rw-r--r-- | src/transports/tcp/btcp.c | 212 | ||||
-rw-r--r-- | src/transports/ws/bws.c | 195 | ||||
-rw-r--r-- | tests/bug328.c | 10 | ||||
-rw-r--r-- | tests/ipc.c | 27 | ||||
-rw-r--r-- | tests/tcp.c | 21 | ||||
-rw-r--r-- | tests/ws.c | 24 |
7 files changed, 181 insertions, 494 deletions
diff --git a/src/transports/ipc/bipc.c b/src/transports/ipc/bipc.c index e796f95..acf994e 100644 --- a/src/transports/ipc/bipc.c +++ b/src/transports/ipc/bipc.c @@ -1,6 +1,7 @@ /* Copyright (c) 2012-2013 Martin Sustrik All rights reserved. Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com> + Copyright 2016 Garrett D'Amore <garrett@damore.org> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), @@ -27,8 +28,6 @@ #include "../../aio/fsm.h" #include "../../aio/usock.h" -#include "../utils/backoff.h" - #include "../../utils/err.h" #include "../../utils/cont.h" #include "../../utils/alloc.h" @@ -51,14 +50,9 @@ #define NN_BIPC_STATE_STOPPING_AIPC 3 #define NN_BIPC_STATE_STOPPING_USOCK 4 #define NN_BIPC_STATE_STOPPING_AIPCS 5 -#define NN_BIPC_STATE_LISTENING 6 -#define NN_BIPC_STATE_WAITING 7 -#define NN_BIPC_STATE_CLOSING 8 -#define NN_BIPC_STATE_STOPPING_BACKOFF 9 #define NN_BIPC_SRC_USOCK 1 #define NN_BIPC_SRC_AIPC 2 -#define NN_BIPC_SRC_RECONNECT_TIMER 3 struct nn_bipc { @@ -78,9 +72,6 @@ struct nn_bipc { /* List of accepted connections. */ struct nn_list aipcs; - - /* Used to wait before retrying to connect. */ - struct nn_backoff retry; }; /* nn_epbase virtual interface implementation. */ @@ -96,44 +87,39 @@ static void nn_bipc_handler (struct nn_fsm *self, int src, int type, void *srcptr); static void nn_bipc_shutdown (struct nn_fsm *self, int src, int type, void *srcptr); -static void nn_bipc_start_listening (struct nn_bipc *self); +static int nn_bipc_listen (struct nn_bipc *self); static void nn_bipc_start_accepting (struct nn_bipc *self); int nn_bipc_create (void *hint, struct nn_epbase **epbase) { struct nn_bipc *self; - int reconnect_ivl; - int reconnect_ivl_max; size_t sz; + int rc; /* Allocate the new endpoint object. */ self = nn_alloc (sizeof (struct nn_bipc), "bipc"); alloc_assert (self); + /* Initialise the structure. */ nn_epbase_init (&self->epbase, &nn_bipc_epbase_vfptr, hint); nn_fsm_init_root (&self->fsm, nn_bipc_handler, nn_bipc_shutdown, nn_epbase_getctx (&self->epbase)); self->state = NN_BIPC_STATE_IDLE; - sz = sizeof (reconnect_ivl); - nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL, - &reconnect_ivl, &sz); - nn_assert (sz == sizeof (reconnect_ivl)); - sz = sizeof (reconnect_ivl_max); - nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX, - &reconnect_ivl_max, &sz); - nn_assert (sz == sizeof (reconnect_ivl_max)); - if (reconnect_ivl_max == 0) - reconnect_ivl_max = reconnect_ivl; - nn_backoff_init (&self->retry, NN_BIPC_SRC_RECONNECT_TIMER, - reconnect_ivl, reconnect_ivl_max, &self->fsm); - nn_usock_init (&self->usock, NN_BIPC_SRC_USOCK, &self->fsm); self->aipc = NULL; nn_list_init (&self->aipcs); /* Start the state machine. */ nn_fsm_start (&self->fsm); + nn_usock_init (&self->usock, NN_BIPC_SRC_USOCK, &self->fsm); + + rc = nn_bipc_listen (self); + if (rc != 0) { + nn_epbase_term (&self->epbase); + return rc; + } + /* Return the base class as an out parameter. */ *epbase = &self->epbase; @@ -159,7 +145,6 @@ static void nn_bipc_destroy (struct nn_epbase *self) nn_list_term (&bipc->aipcs); nn_assert (bipc->aipc == NULL); nn_usock_term (&bipc->usock); - nn_backoff_term (&bipc->retry); nn_epbase_term (&bipc->epbase); nn_fsm_term (&bipc->fsm); @@ -181,7 +166,6 @@ static void nn_bipc_shutdown (struct nn_fsm *self, int src, int type, bipc = nn_cont (self, struct nn_bipc, fsm); if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { - nn_backoff_stop (&bipc->retry); if (bipc->aipc) { nn_aipc_stop (bipc->aipc); bipc->state = NN_BIPC_STATE_STOPPING_AIPC; @@ -208,8 +192,7 @@ static void nn_bipc_shutdown (struct nn_fsm *self, int src, int type, bipc->state = NN_BIPC_STATE_STOPPING_USOCK; } if (nn_slow (bipc->state == NN_BIPC_STATE_STOPPING_USOCK)) { - if (!nn_usock_isidle (&bipc->usock) || - !nn_backoff_isidle (&bipc->retry)) + if (!nn_usock_isidle (&bipc->usock)) return; for (it = nn_list_begin (&bipc->aipcs); it != nn_list_end (&bipc->aipcs); @@ -257,51 +240,32 @@ static void nn_bipc_handler (struct nn_fsm *self, int src, int type, /* IDLE state. */ /******************************************************************************/ case NN_BIPC_STATE_IDLE: - switch (src) { - - case NN_FSM_ACTION: - switch (type) { - case NN_FSM_START: - nn_bipc_start_listening (bipc); - return; - default: - nn_fsm_bad_action (bipc->state, src, type); - } - - default: - nn_fsm_bad_source (bipc->state, src, type); - } + nn_assert (src == NN_FSM_ACTION); + nn_assert (type == NN_FSM_START); + bipc->state = NN_BIPC_STATE_ACTIVE; + return; /******************************************************************************/ /* ACTIVE state. */ /* The execution is yielded to the aipc state machine in this state. */ /******************************************************************************/ case NN_BIPC_STATE_ACTIVE: - if (srcptr == bipc->aipc) { - switch (type) { - case NN_AIPC_ACCEPTED: - - /* Move the newly created connection to the list of existing - connections. */ - nn_list_insert (&bipc->aipcs, &bipc->aipc->item, - nn_list_end (&bipc->aipcs)); - bipc->aipc = NULL; - - /* Start waiting for a new incoming connection. */ - nn_bipc_start_accepting (bipc); - - return; - - default: - nn_fsm_bad_action (bipc->state, src, type); - } + if (src == NN_BIPC_SRC_USOCK) { + nn_assert (type == NN_USOCK_SHUTDOWN || type == NN_USOCK_STOPPED); + return; } - /* For all remaining events we'll assume they are coming from one - of remaining child aipc objects. */ + /* All other events come from child aipc objects. */ nn_assert (src == NN_BIPC_SRC_AIPC); aipc = (struct nn_aipc*) srcptr; switch (type) { + case NN_AIPC_ACCEPTED: + + nn_list_insert (&bipc->aipcs, &aipc->item, + nn_list_end (&bipc->aipcs)); + bipc->aipc = NULL; + nn_bipc_start_accepting (bipc); + return; case NN_AIPC_ERROR: nn_aipc_stop (aipc); return; @@ -315,71 +279,6 @@ static void nn_bipc_handler (struct nn_fsm *self, int src, int type, } /******************************************************************************/ -/* CLOSING_USOCK state. */ -/* usock object was asked to stop but it haven't stopped yet. */ -/******************************************************************************/ - case NN_BIPC_STATE_CLOSING: - switch (src) { - - case NN_BIPC_SRC_USOCK: - switch (type) { - case NN_USOCK_SHUTDOWN: - return; - case NN_USOCK_STOPPED: - nn_backoff_start (&bipc->retry); - bipc->state = NN_BIPC_STATE_WAITING; - return; - default: - nn_fsm_bad_action (bipc->state, src, type); - } - - default: - nn_fsm_bad_source (bipc->state, src, type); - } - -/******************************************************************************/ -/* WAITING state. */ -/* Waiting before re-bind is attempted. This way we won't overload */ -/* the system by continuous re-bind attemps. */ -/******************************************************************************/ - case NN_BIPC_STATE_WAITING: - switch (src) { - - case NN_BIPC_SRC_RECONNECT_TIMER: - switch (type) { - case NN_BACKOFF_TIMEOUT: - nn_backoff_stop (&bipc->retry); - bipc->state = NN_BIPC_STATE_STOPPING_BACKOFF; - return; - default: - nn_fsm_bad_action (bipc->state, src, type); - } - - default: - nn_fsm_bad_source (bipc->state, src, type); - } - -/******************************************************************************/ -/* STOPPING_BACKOFF state. */ -/* backoff object was asked to stop, but it haven't stopped yet. */ -/******************************************************************************/ - case NN_BIPC_STATE_STOPPING_BACKOFF: - switch (src) { - - case NN_BIPC_SRC_RECONNECT_TIMER: - switch (type) { - case NN_BACKOFF_STOPPED: - nn_bipc_start_listening (bipc); - return; - default: - nn_fsm_bad_action (bipc->state, src, type); - } - - default: - nn_fsm_bad_source (bipc->state, src, type); - } - -/******************************************************************************/ /* Invalid state. */ /******************************************************************************/ default: @@ -387,11 +286,7 @@ static void nn_bipc_handler (struct nn_fsm *self, int src, int type, } } -/******************************************************************************/ -/* State machine actions. */ -/******************************************************************************/ - -static void nn_bipc_start_listening (struct nn_bipc *self) +static int nn_bipc_listen (struct nn_bipc *self) { int rc; struct sockaddr_storage ss; @@ -431,30 +326,31 @@ static void nn_bipc_start_listening (struct nn_bipc *self) /* Start listening for incoming connections. */ rc = nn_usock_start (&self->usock, AF_UNIX, SOCK_STREAM, 0); - if (nn_slow (rc < 0)) { - nn_backoff_start (&self->retry); - self->state = NN_BIPC_STATE_WAITING; - return; + if (rc < 0) { + return rc; } rc = nn_usock_bind (&self->usock, (struct sockaddr*) &ss, sizeof (struct sockaddr_un)); - if (nn_slow (rc < 0)) { + if (rc < 0) { nn_usock_stop (&self->usock); - self->state = NN_BIPC_STATE_CLOSING; - return; + return rc; } rc = nn_usock_listen (&self->usock, NN_BIPC_BACKLOG); - if (nn_slow (rc < 0)) { + if (rc < 0) { nn_usock_stop (&self->usock); - self->state = NN_BIPC_STATE_CLOSING; - return; + return rc; } nn_bipc_start_accepting (self); - self->state = NN_BIPC_STATE_ACTIVE; + + return 0; } +/******************************************************************************/ +/* State machine actions. */ +/******************************************************************************/ + static void nn_bipc_start_accepting (struct nn_bipc *self) { nn_assert (self->aipc == NULL); diff --git a/src/transports/tcp/btcp.c b/src/transports/tcp/btcp.c index 2c7f75e..1b012c8 100644 --- a/src/transports/tcp/btcp.c +++ b/src/transports/tcp/btcp.c @@ -1,5 +1,6 @@ /* Copyright (c) 2012-2013 Martin Sustrik All rights reserved. + Copyright 2016 Garrett D'Amore <garrett@damore.org> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), @@ -55,14 +56,9 @@ #define NN_BTCP_STATE_STOPPING_ATCP 3 #define NN_BTCP_STATE_STOPPING_USOCK 4 #define NN_BTCP_STATE_STOPPING_ATCPS 5 -#define NN_BTCP_STATE_LISTENING 6 -#define NN_BTCP_STATE_WAITING 7 -#define NN_BTCP_STATE_CLOSING 8 -#define NN_BTCP_STATE_STOPPING_BACKOFF 9 #define NN_BTCP_SRC_USOCK 1 #define NN_BTCP_SRC_ATCP 2 -#define NN_BTCP_SRC_RECONNECT_TIMER 3 struct nn_btcp { @@ -82,9 +78,6 @@ struct nn_btcp { /* List of accepted connections. */ struct nn_list atcps; - - /* Used to wait before retrying to connect. */ - struct nn_backoff retry; }; /* nn_epbase virtual interface implementation. */ @@ -100,7 +93,7 @@ static void nn_btcp_handler (struct nn_fsm *self, int src, int type, void *srcptr); static void nn_btcp_shutdown (struct nn_fsm *self, int src, int type, void *srcptr); -static void nn_btcp_start_listening (struct nn_btcp *self); +static int nn_btcp_listen (struct nn_btcp *self); static void nn_btcp_start_accepting (struct nn_btcp *self); int nn_btcp_create (void *hint, struct nn_epbase **epbase) @@ -114,9 +107,6 @@ int nn_btcp_create (void *hint, struct nn_epbase **epbase) size_t sslen; int ipv4only; size_t ipv4onlylen; - int reconnect_ivl; - int reconnect_ivl_max; - size_t sz; /* Allocate the new endpoint object. */ self = nn_alloc (sizeof (struct nn_btcp), "btcp"); @@ -157,25 +147,20 @@ int nn_btcp_create (void *hint, struct nn_epbase **epbase) nn_fsm_init_root (&self->fsm, nn_btcp_handler, nn_btcp_shutdown, nn_epbase_getctx (&self->epbase)); self->state = NN_BTCP_STATE_IDLE; - sz = sizeof (reconnect_ivl); - nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL, - &reconnect_ivl, &sz); - nn_assert (sz == sizeof (reconnect_ivl)); - sz = sizeof (reconnect_ivl_max); - nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX, - &reconnect_ivl_max, &sz); - nn_assert (sz == sizeof (reconnect_ivl_max)); - if (reconnect_ivl_max == 0) - reconnect_ivl_max = reconnect_ivl; - nn_backoff_init (&self->retry, NN_BTCP_SRC_RECONNECT_TIMER, - reconnect_ivl, reconnect_ivl_max, &self->fsm); - nn_usock_init (&self->usock, NN_BTCP_SRC_USOCK, &self->fsm); self->atcp = NULL; nn_list_init (&self->atcps); /* Start the state machine. */ nn_fsm_start (&self->fsm); + nn_usock_init (&self->usock, NN_BTCP_SRC_USOCK, &self->fsm); + + rc = nn_btcp_listen (self); + if (rc != 0) { + nn_epbase_term (&self->epbase); + return rc; + } + /* Return the base class as an out parameter. */ *epbase = &self->epbase; @@ -201,7 +186,6 @@ static void nn_btcp_destroy (struct nn_epbase *self) nn_list_term (&btcp->atcps); nn_assert (btcp->atcp == NULL); nn_usock_term (&btcp->usock); - nn_backoff_term (&btcp->retry); nn_epbase_term (&btcp->epbase); nn_fsm_term (&btcp->fsm); @@ -218,7 +202,6 @@ static void nn_btcp_shutdown (struct nn_fsm *self, int src, int type, btcp = nn_cont (self, struct nn_btcp, fsm); if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { - nn_backoff_stop (&btcp->retry); if (btcp->atcp) { nn_atcp_stop (btcp->atcp); btcp->state = NN_BTCP_STATE_STOPPING_ATCP; @@ -237,8 +220,7 @@ static void nn_btcp_shutdown (struct nn_fsm *self, int src, int type, btcp->state = NN_BTCP_STATE_STOPPING_USOCK; } if (nn_slow (btcp->state == NN_BTCP_STATE_STOPPING_USOCK)) { - if (!nn_usock_isidle (&btcp->usock) || - !nn_backoff_isidle (&btcp->retry)) + if (!nn_usock_isidle (&btcp->usock)) return; for (it = nn_list_begin (&btcp->atcps); it != nn_list_end (&btcp->atcps); @@ -286,51 +268,33 @@ static void nn_btcp_handler (struct nn_fsm *self, int src, int type, /* IDLE state. */ /******************************************************************************/ case NN_BTCP_STATE_IDLE: - switch (src) { - - case NN_FSM_ACTION: - switch (type) { - case NN_FSM_START: - nn_btcp_start_listening (btcp); - return; - default: - nn_fsm_bad_action (btcp->state, src, type); - } - - default: - nn_fsm_bad_source (btcp->state, src, type); - } + nn_assert (src == NN_FSM_ACTION); + nn_assert (type == NN_FSM_START); + btcp->state = NN_BTCP_STATE_ACTIVE; + return; /******************************************************************************/ /* ACTIVE state. */ /* The execution is yielded to the atcp state machine in this state. */ /******************************************************************************/ case NN_BTCP_STATE_ACTIVE: - if (srcptr == btcp->atcp) { - switch (type) { - case NN_ATCP_ACCEPTED: - - /* Move the newly created connection to the list of existing - connections. */ - nn_list_insert (&btcp->atcps, &btcp->atcp->item, - nn_list_end (&btcp->atcps)); - btcp->atcp = NULL; - - /* Start waiting for a new incoming connection. */ - nn_btcp_start_accepting (btcp); - - return; - - default: - nn_fsm_bad_action (btcp->state, src, type); - } + if (src == NN_BTCP_SRC_USOCK) { + /* usock object cleaning up */ + nn_assert (type == NN_USOCK_SHUTDOWN || type == NN_USOCK_STOPPED); + return; } - /* For all remaining events we'll assume they are coming from one - of remaining child atcp objects. */ + /* All other events come from child atcp objects. */ nn_assert (src == NN_BTCP_SRC_ATCP); atcp = (struct nn_atcp*) srcptr; switch (type) { + case NN_ATCP_ACCEPTED: + nn_assert (btcp->atcp == atcp) ; + nn_list_insert (&btcp->atcps, &atcp->item, + nn_list_end (&btcp->atcps)); + btcp->atcp = NULL; + nn_btcp_start_accepting (btcp); + return; case NN_ATCP_ERROR: nn_atcp_stop (atcp); return; @@ -344,71 +308,6 @@ static void nn_btcp_handler (struct nn_fsm *self, int src, int type, } /******************************************************************************/ -/* CLOSING_USOCK state. */ -/* usock object was asked to stop but it haven't stopped yet. */ -/******************************************************************************/ - case NN_BTCP_STATE_CLOSING: - switch (src) { - - case NN_BTCP_SRC_USOCK: - switch (type) { - case NN_USOCK_SHUTDOWN: - return; - case NN_USOCK_STOPPED: - nn_backoff_start (&btcp->retry); - btcp->state = NN_BTCP_STATE_WAITING; - return; - default: - nn_fsm_bad_action (btcp->state, src, type); - } - - default: - nn_fsm_bad_source (btcp->state, src, type); - } - -/******************************************************************************/ -/* WAITING state. */ -/* Waiting before re-bind is attempted. This way we won't overload */ -/* the system by continuous re-bind attemps. */ -/******************************************************************************/ - case NN_BTCP_STATE_WAITING: - switch (src) { - - case NN_BTCP_SRC_RECONNECT_TIMER: - switch (type) { - case NN_BACKOFF_TIMEOUT: - nn_backoff_stop (&btcp->retry); - btcp->state = NN_BTCP_STATE_STOPPING_BACKOFF; - return; - default: - nn_fsm_bad_action (btcp->state, src, type); - } - - default: - nn_fsm_bad_source (btcp->state, src, type); - } - -/******************************************************************************/ -/* STOPPING_BACKOFF state. */ -/* backoff object was asked to stop, but it haven't stopped yet. */ -/******************************************************************************/ - case NN_BTCP_STATE_STOPPING_BACKOFF: - switch (src) { - - case NN_BTCP_SRC_RECONNECT_TIMER: - switch (type) { - case NN_BACKOFF_STOPPED: - nn_btcp_start_listening (btcp); - return; - default: - nn_fsm_bad_action (btcp->state, src, type); - } - - default: - nn_fsm_bad_source (btcp->state, src, type); - } - -/******************************************************************************/ /* Invalid state. */ /******************************************************************************/ default: @@ -416,11 +315,7 @@ static void nn_btcp_handler (struct nn_fsm *self, int src, int type, } } -/******************************************************************************/ -/* State machine actions. */ -/******************************************************************************/ - -static void nn_btcp_start_listening (struct nn_btcp *self) +static int nn_btcp_listen (struct nn_btcp *self) { int rc; struct sockaddr_storage ss; @@ -439,11 +334,14 @@ static void nn_btcp_start_listening (struct nn_btcp *self) /* Parse the port. */ end = addr + strlen (addr); pos = strrchr (addr, ':'); - nn_assert (pos); + if (pos == NULL) { + return -EINVAL; + } ++pos; rc = nn_port_resolve (pos, end - pos); - nn_assert (rc >= 0); - port = rc; + if (rc <= 0) + return rc; + port = (uint16_t) rc; /* Parse the address. */ ipv4onlylen = sizeof (ipv4only); @@ -451,45 +349,50 @@ static void nn_btcp_start_listening (struct nn_btcp *self) &ipv4only, &ipv4onlylen); nn_assert (ipv4onlylen == sizeof (ipv4only)); rc = nn_iface_resolve (addr, pos - addr - 1, ipv4only, &ss, &sslen); - errnum_assert (rc == 0, -rc); + if (rc < 0) { + return rc; + } /* Combine the port and the address. */ - if (ss.ss_family == AF_INET) { + switch (ss.ss_family) { + case AF_INET: ((struct sockaddr_in*) &ss)->sin_port = htons (port); sslen = sizeof (struct sockaddr_in); - } - else if (ss.ss_family == AF_INET6) { + break; + case AF_INET6: ((struct sockaddr_in6*) &ss)->sin6_port = htons (port); sslen = sizeof (struct sockaddr_in6); - } - else + break; + default: nn_assert (0); + } /* Start listening for incoming connections. */ rc = nn_usock_start (&self->usock, ss.ss_family, SOCK_STREAM, 0); - if (nn_slow (rc < 0)) { - nn_backoff_start (&self->retry); - self->state = NN_BTCP_STATE_WAITING; - return; + if (rc < 0) { + return rc; } rc = nn_usock_bind (&self->usock, (struct sockaddr*) &ss, (size_t) sslen); - if (nn_slow (rc < 0)) { - nn_usock_stop (&self->usock); - self->state = NN_BTCP_STATE_CLOSING; - return; + if (rc < 0) { + nn_usock_stop (&self->usock); + return rc; } rc = nn_usock_listen (&self->usock, NN_BTCP_BACKLOG); - if (nn_slow (rc < 0)) { + if (rc < 0) { nn_usock_stop (&self->usock); - self->state = NN_BTCP_STATE_CLOSING; - return; + return rc; } nn_btcp_start_accepting(self); - self->state = NN_BTCP_STATE_ACTIVE; + + return 0; } +/******************************************************************************/ +/* State machine actions. */ +/******************************************************************************/ + static void nn_btcp_start_accepting (struct nn_btcp *self) { nn_assert (self->atcp == NULL); @@ -502,4 +405,3 @@ static void nn_btcp_start_accepting (struct nn_btcp *self) /* Start waiting for a new incoming connection. */ nn_atcp_start (self->atcp, &self->usock); } - diff --git a/src/transports/ws/bws.c b/src/transports/ws/bws.c index cc7321c..06b7759 100644 --- a/src/transports/ws/bws.c +++ b/src/transports/ws/bws.c @@ -1,6 +1,7 @@ /* Copyright (c) 2012-2013 250bpm s.r.o. All rights reserved. Copyright (c) 2014-2016 Jack R. Dunaway. All rights reserved. + Copyright 2016 Garrett D'Amore <garrett@damore.org> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), @@ -30,8 +31,6 @@ #include "../../aio/fsm.h" #include "../../aio/usock.h" -#include "../utils/backoff.h" - #include "../../utils/err.h" #include "../../utils/cont.h" #include "../../utils/alloc.h" @@ -56,14 +55,9 @@ #define NN_BWS_STATE_STOPPING_AWS 3 #define NN_BWS_STATE_STOPPING_USOCK 4 #define NN_BWS_STATE_STOPPING_AWSS 5 -#define NN_BWS_STATE_LISTENING 6 -#define NN_BWS_STATE_WAITING 7 -#define NN_BWS_STATE_CLOSING 8 -#define NN_BWS_STATE_STOPPING_BACKOFF 9 #define NN_BWS_SRC_USOCK 1 #define NN_BWS_SRC_AWS 2 -#define NN_BWS_SRC_RECONNECT_TIMER 3 struct nn_bws { @@ -83,9 +77,6 @@ struct nn_bws { /* List of accepted connections. */ struct nn_list awss; - - /* Timer used to throttle reconnection attempts. */ - struct nn_backoff retry; }; /* nn_epbase virtual interface implementation. */ @@ -101,7 +92,7 @@ static void nn_bws_handler (struct nn_fsm *self, int src, int type, void *srcptr); static void nn_bws_shutdown (struct nn_fsm *self, int src, int type, void *srcptr); -static void nn_bws_start_listening (struct nn_bws *self); +static int nn_bws_listen (struct nn_bws *self); static void nn_bws_start_accepting (struct nn_bws *self); int nn_bws_create (void *hint, struct nn_epbase **epbase) @@ -115,8 +106,6 @@ int nn_bws_create (void *hint, struct nn_epbase **epbase) size_t sslen; int ipv4only; size_t ipv4onlylen; - int reconnect_ivl; - int reconnect_ivl_max; size_t sz; /* Allocate the new endpoint object. */ @@ -158,25 +147,20 @@ int nn_bws_create (void *hint, struct nn_epbase **epbase) nn_fsm_init_root (&self->fsm, nn_bws_handler, nn_bws_shutdown, nn_epbase_getctx (&self->epbase)); self->state = NN_BWS_STATE_IDLE; - sz = sizeof (reconnect_ivl); - nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL, - &reconnect_ivl, &sz); - nn_assert (sz == sizeof (reconnect_ivl)); - sz = sizeof (reconnect_ivl_max); - nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX, - &reconnect_ivl_max, &sz); - nn_assert (sz == sizeof (reconnect_ivl_max)); - if (reconnect_ivl_max == 0) - reconnect_ivl_max = reconnect_ivl; - nn_backoff_init (&self->retry, NN_BWS_SRC_RECONNECT_TIMER, - reconnect_ivl, reconnect_ivl_max, &self->fsm); - nn_usock_init (&self->usock, NN_BWS_SRC_USOCK, &self->fsm); self->aws = NULL; nn_list_init (&self->awss); /* Start the state machine. */ nn_fsm_start (&self->fsm); + nn_usock_init (&self->usock, NN_BWS_SRC_USOCK, &self->fsm); + + rc = nn_bws_listen (self); + if (rc != 0) { + nn_epbase_term (&self->epbase); + return rc; + } + /* Return the base class as an out parameter. */ *epbase = &self->epbase; @@ -202,7 +186,6 @@ static void nn_bws_destroy (struct nn_epbase *self) nn_list_term (&bws->awss); nn_assert (bws->aws == NULL); nn_usock_term (&bws->usock); - nn_backoff_term (&bws->retry); nn_epbase_term (&bws->epbase); nn_fsm_term (&bws->fsm); @@ -219,7 +202,6 @@ static void nn_bws_shutdown (struct nn_fsm *self, int src, int type, bws = nn_cont (self, struct nn_bws, fsm); if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { - nn_backoff_stop (&bws->retry); if (bws->aws) { nn_aws_stop (bws->aws); bws->state = NN_BWS_STATE_STOPPING_AWS; @@ -238,7 +220,7 @@ static void nn_bws_shutdown (struct nn_fsm *self, int src, int type, bws->state = NN_BWS_STATE_STOPPING_USOCK; } if (nn_slow (bws->state == NN_BWS_STATE_STOPPING_USOCK)) { - if (!nn_usock_isidle (&bws->usock) || !nn_backoff_isidle (&bws->retry)) + if (!nn_usock_isidle (&bws->usock)) return; for (it = nn_list_begin (&bws->awss); it != nn_list_end (&bws->awss); @@ -286,44 +268,19 @@ static void nn_bws_handler (struct nn_fsm *self, int src, int type, /* IDLE state. */ /******************************************************************************/ case NN_BWS_STATE_IDLE: - switch (src) { - - case NN_FSM_ACTION: - switch (type) { - case NN_FSM_START: - nn_bws_start_listening (bws); - return; - default: - nn_fsm_bad_action (bws->state, src, type); - } - - default: - nn_fsm_bad_source (bws->state, src, type); - } + nn_assert (src == NN_FSM_ACTION); + nn_assert (type == NN_FSM_START); + bws->state = NN_BWS_STATE_ACTIVE; + return; /******************************************************************************/ /* ACTIVE state. */ /* The execution is yielded to the aws state machine in this state. */ /******************************************************************************/ case NN_BWS_STATE_ACTIVE: - if (srcptr == bws->aws) { - switch (type) { - case NN_AWS_ACCEPTED: - - /* Move the newly created connection to the list of existing - connections. */ - nn_list_insert (&bws->awss, &bws->aws->item, - nn_list_end (&bws->awss)); - bws->aws = NULL; - - /* Start waiting for a new incoming connection. */ - nn_bws_start_accepting (bws); - - return; - - default: - nn_fsm_bad_action (bws->state, src, type); - } + if (src == NN_BWS_SRC_USOCK) { + nn_assert (type == NN_USOCK_SHUTDOWN || type == NN_USOCK_STOPPED); + return; } /* For all remaining events we'll assume they are coming from one @@ -331,6 +288,18 @@ static void nn_bws_handler (struct nn_fsm *self, int src, int type, nn_assert (src == NN_BWS_SRC_AWS); aws = (struct nn_aws*) srcptr; switch (type) { + case NN_AWS_ACCEPTED: + + /* Move the newly created connection to the list of existing + connections. */ + nn_list_insert (&bws->awss, &bws->aws->item, + nn_list_end (&bws->awss)); + bws->aws = NULL; + + /* Start waiting for a new incoming connection. */ + nn_bws_start_accepting (bws); + return; + case NN_AWS_ERROR: nn_aws_stop (aws); return; @@ -344,71 +313,6 @@ static void nn_bws_handler (struct nn_fsm *self, int src, int type, } /******************************************************************************/ -/* CLOSING_USOCK state. */ -/* usock object was asked to stop but it hasn't stopped yet. */ -/******************************************************************************/ - case NN_BWS_STATE_CLOSING: - switch (src) { - - case NN_BWS_SRC_USOCK: - switch (type) { - case NN_USOCK_SHUTDOWN: - return; - case NN_USOCK_STOPPED: - nn_backoff_start (&bws->retry); - bws->state = NN_BWS_STATE_WAITING; - return; - default: - nn_fsm_bad_action (bws->state, src, type); - } - - default: - nn_fsm_bad_source (bws->state, src, type); - } - -/******************************************************************************/ -/* WAITING state. */ -/* Waiting before re-bind is attempted. This way we won't overload */ -/* the system by continuous re-bind attempts. */ -/******************************************************************************/ - case NN_BWS_STATE_WAITING: - switch (src) { - - case NN_BWS_SRC_RECONNECT_TIMER: - switch (type) { - case NN_BACKOFF_TIMEOUT: - nn_backoff_stop (&bws->retry); - bws->state = NN_BWS_STATE_STOPPING_BACKOFF; - return; - default: - nn_fsm_bad_action (bws->state, src, type); - } - - default: - nn_fsm_bad_source (bws->state, src, type); - } - -/******************************************************************************/ -/* STOPPING_BACKOFF state. */ -/* backoff object was asked to stop, but it haven't stopped yet. */ -/******************************************************************************/ - case NN_BWS_STATE_STOPPING_BACKOFF: - switch (src) { - - case NN_BWS_SRC_RECONNECT_TIMER: - switch (type) { - case NN_BACKOFF_STOPPED: - nn_bws_start_listening (bws); - return; - default: - nn_fsm_bad_action (bws->state, src, type); - } - - default: - nn_fsm_bad_source (bws->state, src, type); - } - -/******************************************************************************/ /* Invalid state. */ /******************************************************************************/ default: @@ -416,11 +320,7 @@ static void nn_bws_handler (struct nn_fsm *self, int src, int type, } } -/******************************************************************************/ -/* State machine actions. */ -/******************************************************************************/ - -static void nn_bws_start_listening (struct nn_bws *self) +static int nn_bws_listen (struct nn_bws *self) { int rc; struct sockaddr_storage ss; @@ -442,8 +342,10 @@ static void nn_bws_start_listening (struct nn_bws *self) nn_assert (pos); ++pos; rc = nn_port_resolve (pos, end - pos); - nn_assert (rc >= 0); - port = rc; + if (rc < 0) { + return rc; + } + port = (uint16_t) rc; /* Parse the address. */ ipv4onlylen = sizeof (ipv4only); @@ -451,7 +353,9 @@ static void nn_bws_start_listening (struct nn_bws *self) &ipv4only, &ipv4onlylen); nn_assert (ipv4onlylen == sizeof (ipv4only)); rc = nn_iface_resolve (addr, pos - addr - 1, ipv4only, &ss, &sslen); - errnum_assert (rc == 0, -rc); + if (rc < 0) { + return rc; + } /* Combine the port and the address. */ if (ss.ss_family == AF_INET) { @@ -467,29 +371,30 @@ static void nn_bws_start_listening (struct nn_bws *self) /* Start listening for incoming connections. */ rc = nn_usock_start (&self->usock, ss.ss_family, SOCK_STREAM, 0); - if (nn_slow (rc < 0)) { - nn_backoff_start (&self->retry); - self->state = NN_BWS_STATE_WAITING; - return; + if (rc < 0) { + return rc; } rc = nn_usock_bind (&self->usock, (struct sockaddr*) &ss, (size_t) sslen); - if (nn_slow (rc < 0)) { + if (rc < 0) { nn_usock_stop (&self->usock); - self->state = NN_BWS_STATE_CLOSING; - return; + return rc; } rc = nn_usock_listen (&self->usock, NN_BWS_BACKLOG); - if (nn_slow (rc < 0)) { + if (rc < 0) { nn_usock_stop (&self->usock); - self->state = NN_BWS_STATE_CLOSING; - return; + return rc; } nn_bws_start_accepting(self); - self->state = NN_BWS_STATE_ACTIVE; + + return 0; } +/******************************************************************************/ +/* State machine actions. */ +/******************************************************************************/ + static void nn_bws_start_accepting (struct nn_bws *self) { nn_assert (self->aws == NULL); diff --git a/tests/bug328.c b/tests/bug328.c index 76d0725..dfb8d1e 100644 --- a/tests/bug328.c +++ b/tests/bug328.c @@ -1,6 +1,7 @@ /* Copyright (c) 2012 Martin Sustrik All rights reserved. Copyright 2016 Franklin "Snaipe" Mathieu <franklinmathieu@gmail.com> + Copyright 2016 Garrett D'Amore <garrett@damore.org> Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), @@ -30,7 +31,6 @@ int main (int argc, const char *argv[]) { int sb; int sc; - int s1; char socket_address[128]; test_addr_from(socket_address, "tcp", "127.0.0.1", @@ -38,21 +38,17 @@ int main (int argc, const char *argv[]) sb = test_socket (AF_SP, NN_PAIR); test_bind (sb, socket_address); - s1 = test_socket (AF_SP, NN_PAIR); - test_bind (s1, socket_address); sc = test_socket (AF_SP, NN_PAIR); test_connect (sc, socket_address); nn_sleep(100); test_send (sc, "ABC"); test_recv (sb, "ABC"); + nn_assert (nn_get_statistic (sc, NN_STAT_CURRENT_CONNECTIONS) == 1); test_close (sb); nn_sleep(300); - test_send (s1, "ABC"); - test_recv (sc, "ABC"); - + nn_assert (nn_get_statistic (sc, NN_STAT_CURRENT_CONNECTIONS) == 0); test_close (sc); - test_close (s1); return 0; } diff --git a/tests/ipc.c b/tests/ipc.c index 24b5390..7569376 100644 --- a/tests/ipc.c +++ b/tests/ipc.c @@ -37,6 +37,7 @@ int main () int sc; int i; int s1, s2; + int rc; int size; char * buf; @@ -98,39 +99,33 @@ int main () test_close (s1); test_close (sb); +/* On Windows, CreateNamedPipeA does not run exclusively. + We should look at fixing this, but it will require + changing the usock code for Windows. In the meantime just + disable this test on Windows. */ +#if !defined(NN_HAVE_WINDOWS) /* Test two sockets binding to the same address. */ sb = test_socket (AF_SP, NN_PAIR); test_bind (sb, SOCKET_ADDRESS); s1 = test_socket (AF_SP, NN_PAIR); - test_bind (s1, SOCKET_ADDRESS); + rc = nn_bind (s1, SOCKET_ADDRESS); + nn_assert (rc < 0); + errno_assert (nn_errno () == EADDRINUSE); sc = test_socket (AF_SP, NN_PAIR); test_connect (sc, SOCKET_ADDRESS); nn_sleep (100); test_send (sb, "ABC"); test_recv (sc, "ABC"); test_close (sb); - test_send (s1, "ABC"); - test_recv (sc, "ABC"); test_close (sc); test_close (s1); +#endif - /* Test closing a socket that is waiting to bind. */ - sb = test_socket (AF_SP, NN_PAIR); - test_bind (sb, SOCKET_ADDRESS); - nn_sleep (100); - s1 = test_socket (AF_SP, NN_PAIR); - test_bind (s1, SOCKET_ADDRESS); + /* Test closing a socket that is waiting to connect. */ sc = test_socket (AF_SP, NN_PAIR); test_connect (sc, SOCKET_ADDRESS); nn_sleep (100); - test_send (sb, "ABC"); - test_recv (sc, "ABC"); - test_close (s1); - test_send (sb, "ABC"); - test_recv (sc, "ABC"); - test_close (sb); test_close (sc); return 0; } - diff --git a/tests/tcp.c b/tests/tcp.c index b4c8130..a5063a2 100644 --- a/tests/tcp.c +++ b/tests/tcp.c @@ -174,15 +174,17 @@ int main (int argc, const char *argv[]) sb = test_socket (AF_SP, NN_PAIR); test_bind (sb, socket_address); s1 = test_socket (AF_SP, NN_PAIR); - test_bind (s1, socket_address); + + rc = nn_bind (s1, socket_address); + nn_assert (rc < 0); + errno_assert (nn_errno () == EADDRINUSE); + sc = test_socket (AF_SP, NN_PAIR); test_connect (sc, socket_address); nn_sleep (100); test_send (sb, "ABC"); test_recv (sc, "ABC"); test_close (sb); - test_send (s1, "ABC"); - test_recv (sc, "ABC"); test_close (sc); test_close (s1); @@ -215,21 +217,10 @@ int main (int argc, const char *argv[]) errno_assert (nn_errno () == EINVAL); test_close (sb); - /* Test closing a socket that is waiting to bind. */ - sb = test_socket (AF_SP, NN_PAIR); - test_bind (sb, socket_address); - nn_sleep (100); - s1 = test_socket (AF_SP, NN_PAIR); - test_bind (s1, socket_address); + /* Test closing a socket that is waiting to connect. */ sc = test_socket (AF_SP, NN_PAIR); test_connect (sc, socket_address); nn_sleep (100); - test_send (sb, "ABC"); - test_recv (sc, "ABC"); - test_close (s1); - test_send (sb, "ABC"); - test_recv (sc, "ABC"); - test_close (sb); test_close (sc); return 0; @@ -188,6 +188,17 @@ int main (int argc, const char *argv[]) test_close (sc); test_close (sb); + /* Test two sockets binding to the same address. */ + sb = test_socket (AF_SP, NN_PAIR); + test_bind (sb, socket_address); + sb2 = test_socket (AF_SP, NN_PAIR); + + rc = nn_bind (sb2, socket_address); + nn_assert (rc < 0); + errno_assert (nn_errno () == EADDRINUSE); + test_close(sb); + test_close(sb2); + /* Test that NN_RCVMAXSIZE can be -1, but not lower */ sb = test_socket (AF_SP, NN_PAIR); opt = -1; @@ -232,19 +243,10 @@ int main (int argc, const char *argv[]) test_text (); - /* Test closing a socket that is waiting to bind. */ - sb = test_socket (AF_SP, NN_PAIR); - test_bind (sb, socket_address); - sb2 = test_socket (AF_SP, NN_PAIR); - test_bind (sb2, socket_address); + /* Test closing a socket that is waiting to connect. */ sc = test_socket (AF_SP, NN_PAIR); test_connect (sc, socket_address); - test_send (sb, "ABC"); - test_recv (sc, "ABC"); - test_close (sb2); - test_send (sb, "ABC"); - test_recv (sc, "ABC"); - test_close (sb); + nn_sleep (100); test_close (sc); return 0; |