diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2014-11-18 21:41:46 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2014-11-20 16:28:55 +0100 |
commit | ef741eeb01173f094fabf6fad91d6f3d9e3a4211 (patch) | |
tree | f06a9adf1350ca396fc98af2b6ca5afbc91b834c | |
parent | 7cc7a88c91530438e7538177f92c7da43d304372 (diff) | |
download | nanomsg-ef741eeb01173f094fabf6fad91d6f3d9e3a4211.tar.gz |
First version of tcpmux transport added
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | Makefile.am | 15 | ||||
-rw-r--r-- | src/core/global.c | 2 | ||||
-rw-r--r-- | src/transports/tcpmux/atcpmux.c | 314 | ||||
-rw-r--r-- | src/transports/tcpmux/atcpmux.h | 81 | ||||
-rw-r--r-- | src/transports/tcpmux/btcpmux.c | 506 | ||||
-rw-r--r-- | src/transports/tcpmux/btcpmux.h | 33 | ||||
-rw-r--r-- | src/transports/tcpmux/ctcpmux.c | 632 | ||||
-rw-r--r-- | src/transports/tcpmux/ctcpmux.h | 33 | ||||
-rw-r--r-- | src/transports/tcpmux/stcpmux.c | 417 | ||||
-rw-r--r-- | src/transports/tcpmux/stcpmux.h | 90 | ||||
-rw-r--r-- | src/transports/tcpmux/tcpmux.c | 159 | ||||
-rw-r--r-- | src/transports/tcpmux/tcpmux.h | 30 |
12 files changed, 2311 insertions, 1 deletions
diff --git a/Makefile.am b/Makefile.am index 7ff82fe..efb2b76 100644 --- a/Makefile.am +++ b/Makefile.am @@ -305,12 +305,25 @@ TRANSPORTS_WS = \ src/transports/ws/sha1.h \ src/transports/ws/sha1.c +TRANSPORTS_TCPMUX = \ + src/transports/tcpmux/atcpmux.h \ + src/transports/tcpmux/atcpmux.c \ + src/transports/tcpmux/btcpmux.h \ + src/transports/tcpmux/btcpmux.c \ + src/transports/tcpmux/ctcpmux.h \ + src/transports/tcpmux/ctcpmux.c \ + src/transports/tcpmux/stcpmux.h \ + src/transports/tcpmux/stcpmux.c \ + src/transports/tcpmux/tcpmux.h \ + src/transports/tcpmux/tcpmux.c + NANOMSG_TRANSPORTS = \ $(TRANSPORTS_UTILS) \ $(TRANSPORTS_INPROC) \ $(TRANSPORTS_IPC) \ $(TRANSPORTS_TCP) \ - $(TRANSPORTS_WS) + $(TRANSPORTS_WS) \ + $(TRANSPORTS_TCPMUX) libnanomsg_la_SOURCES = \ src/transport.h \ diff --git a/src/core/global.c b/src/core/global.c index 67cc22f..672345d 100644 --- a/src/core/global.c +++ b/src/core/global.c @@ -47,6 +47,7 @@ #include "../transports/ipc/ipc.h" #include "../transports/tcp/tcp.h" #include "../transports/ws/ws.h" +#include "../transports/tcpmux/tcpmux.h" #include "../protocols/pair/pair.h" #include "../protocols/pair/xpair.h" @@ -256,6 +257,7 @@ static void nn_global_init (void) nn_global_add_transport (nn_ipc); nn_global_add_transport (nn_tcp); nn_global_add_transport (nn_ws); + nn_global_add_transport (nn_tcpmux); /* Plug in individual socktypes. */ nn_global_add_socktype (nn_pair_socktype); diff --git a/src/transports/tcpmux/atcpmux.c b/src/transports/tcpmux/atcpmux.c new file mode 100644 index 0000000..7b24ab0 --- /dev/null +++ b/src/transports/tcpmux/atcpmux.c @@ -0,0 +1,314 @@ +/* + Copyright (c) 2012-2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "atcpmux.h" + +#include "../../utils/err.h" +#include "../../utils/cont.h" +#include "../../utils/attr.h" + +#define NN_ATCPMUX_STATE_IDLE 1 +#define NN_ATCPMUX_STATE_ACCEPTING 2 +#define NN_ATCPMUX_STATE_ACTIVE 3 +#define NN_ATCPMUX_STATE_STOPPING_STCPMUX 4 +#define NN_ATCPMUX_STATE_STOPPING_USOCK 5 +#define NN_ATCPMUX_STATE_DONE 6 +#define NN_ATCPMUX_STATE_STOPPING_STCPMUX_FINAL 7 +#define NN_ATCPMUX_STATE_STOPPING 8 + +#define NN_ATCPMUX_SRC_USOCK 1 +#define NN_ATCPMUX_SRC_STCPMUX 2 +#define NN_ATCPMUX_SRC_LISTENER 3 + +/* Private functions. */ +static void nn_atcpmux_handler (struct nn_fsm *self, int src, int type, + void *srcptr); +static void nn_atcpmux_shutdown (struct nn_fsm *self, int src, int type, + void *srcptr); + +void nn_atcpmux_init (struct nn_atcpmux *self, int src, + struct nn_epbase *epbase, struct nn_fsm *owner) +{ + nn_fsm_init (&self->fsm, nn_atcpmux_handler, nn_atcpmux_shutdown, + src, self, owner); + self->state = NN_ATCPMUX_STATE_IDLE; + self->epbase = epbase; + nn_usock_init (&self->usock, NN_ATCPMUX_SRC_USOCK, &self->fsm); + self->listener = NULL; + self->listener_owner.src = -1; + self->listener_owner.fsm = NULL; + nn_stcpmux_init (&self->stcpmux, NN_ATCPMUX_SRC_STCPMUX, + epbase, &self->fsm); + nn_fsm_event_init (&self->accepted); + nn_fsm_event_init (&self->done); + nn_list_item_init (&self->item); +} + +void nn_atcpmux_term (struct nn_atcpmux *self) +{ + nn_assert_state (self, NN_ATCPMUX_STATE_IDLE); + + nn_list_item_term (&self->item); + nn_fsm_event_term (&self->done); + nn_fsm_event_term (&self->accepted); + nn_stcpmux_term (&self->stcpmux); + nn_usock_term (&self->usock); + nn_fsm_term (&self->fsm); +} + +int nn_atcpmux_isidle (struct nn_atcpmux *self) +{ + return nn_fsm_isidle (&self->fsm); +} + +void nn_atcpmux_start (struct nn_atcpmux *self, struct nn_usock *listener) +{ + nn_assert_state (self, NN_ATCPMUX_STATE_IDLE); + + /* Take ownership of the listener socket. */ + self->listener = listener; + self->listener_owner.src = NN_ATCPMUX_SRC_LISTENER; + self->listener_owner.fsm = &self->fsm; + nn_usock_swap_owner (listener, &self->listener_owner); + + /* Start the state machine. */ + nn_fsm_start (&self->fsm); +} + +void nn_atcpmux_stop (struct nn_atcpmux *self) +{ + nn_fsm_stop (&self->fsm); +} + +static void nn_atcpmux_shutdown (struct nn_fsm *self, int src, int type, + NN_UNUSED void *srcptr) +{ + struct nn_atcpmux *atcpmux; + + atcpmux = nn_cont (self, struct nn_atcpmux, fsm); + + if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { + if (!nn_stcpmux_isidle (&atcpmux->stcpmux)) { + nn_epbase_stat_increment (atcpmux->epbase, + NN_STAT_DROPPED_CONNECTIONS, 1); + nn_stcpmux_stop (&atcpmux->stcpmux); + } + atcpmux->state = NN_ATCPMUX_STATE_STOPPING_STCPMUX_FINAL; + } + if (nn_slow (atcpmux->state == NN_ATCPMUX_STATE_STOPPING_STCPMUX_FINAL)) { + if (!nn_stcpmux_isidle (&atcpmux->stcpmux)) + return; + nn_usock_stop (&atcpmux->usock); + atcpmux->state = NN_ATCPMUX_STATE_STOPPING; + } + if (nn_slow (atcpmux->state == NN_ATCPMUX_STATE_STOPPING)) { + if (!nn_usock_isidle (&atcpmux->usock)) + return; + if (atcpmux->listener) { + nn_assert (atcpmux->listener_owner.fsm); + nn_usock_swap_owner (atcpmux->listener, &atcpmux->listener_owner); + atcpmux->listener = NULL; + atcpmux->listener_owner.src = -1; + atcpmux->listener_owner.fsm = NULL; + } + atcpmux->state = NN_ATCPMUX_STATE_IDLE; + nn_fsm_stopped (&atcpmux->fsm, NN_ATCPMUX_STOPPED); + return; + } + + nn_fsm_bad_action(atcpmux->state, src, type); +} + +static void nn_atcpmux_handler (struct nn_fsm *self, int src, int type, + NN_UNUSED void *srcptr) +{ + struct nn_atcpmux *atcpmux; + int val; + size_t sz; + + atcpmux = nn_cont (self, struct nn_atcpmux, fsm); + + switch (atcpmux->state) { + +/******************************************************************************/ +/* IDLE state. */ +/* The state machine wasn't yet started. */ +/******************************************************************************/ + case NN_ATCPMUX_STATE_IDLE: + switch (src) { + + case NN_FSM_ACTION: + switch (type) { + case NN_FSM_START: + nn_usock_accept (&atcpmux->usock, atcpmux->listener); + atcpmux->state = NN_ATCPMUX_STATE_ACCEPTING; + return; + default: + nn_fsm_bad_action (atcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (atcpmux->state, src, type); + } + +/******************************************************************************/ +/* ACCEPTING state. */ +/* Waiting for incoming connection. */ +/******************************************************************************/ + case NN_ATCPMUX_STATE_ACCEPTING: + switch (src) { + + case NN_ATCPMUX_SRC_USOCK: + switch (type) { + case NN_USOCK_ACCEPTED: + nn_epbase_clear_error (atcpmux->epbase); + + /* Set the relevant socket options. */ + sz = sizeof (val); + nn_epbase_getopt (atcpmux->epbase, NN_SOL_SOCKET, NN_SNDBUF, + &val, &sz); + nn_assert (sz == sizeof (val)); + nn_usock_setsockopt (&atcpmux->usock, SOL_SOCKET, SO_SNDBUF, + &val, sizeof (val)); + sz = sizeof (val); + nn_epbase_getopt (atcpmux->epbase, NN_SOL_SOCKET, NN_RCVBUF, + &val, &sz); + nn_assert (sz == sizeof (val)); + nn_usock_setsockopt (&atcpmux->usock, SOL_SOCKET, SO_RCVBUF, + &val, sizeof (val)); + + /* Return ownership of the listening socket to the parent. */ + nn_usock_swap_owner (atcpmux->listener, + &atcpmux->listener_owner); + atcpmux->listener = NULL; + atcpmux->listener_owner.src = -1; + atcpmux->listener_owner.fsm = NULL; + nn_fsm_raise (&atcpmux->fsm, &atcpmux->accepted, + NN_ATCPMUX_ACCEPTED); + + /* Start the stcpmux state machine. */ + nn_usock_activate (&atcpmux->usock); + nn_stcpmux_start (&atcpmux->stcpmux, &atcpmux->usock); + atcpmux->state = NN_ATCPMUX_STATE_ACTIVE; + + nn_epbase_stat_increment (atcpmux->epbase, + NN_STAT_ACCEPTED_CONNECTIONS, 1); + + return; + + default: + nn_fsm_bad_action (atcpmux->state, src, type); + } + + case NN_ATCPMUX_SRC_LISTENER: + switch (type) { + + case NN_USOCK_ACCEPT_ERROR: + nn_epbase_set_error (atcpmux->epbase, + nn_usock_geterrno(atcpmux->listener)); + nn_epbase_stat_increment (atcpmux->epbase, + NN_STAT_ACCEPT_ERRORS, 1); + nn_usock_accept (&atcpmux->usock, atcpmux->listener); + return; + + default: + nn_fsm_bad_action (atcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (atcpmux->state, src, type); + } + +/******************************************************************************/ +/* ACTIVE state. */ +/******************************************************************************/ + case NN_ATCPMUX_STATE_ACTIVE: + switch (src) { + + case NN_ATCPMUX_SRC_STCPMUX: + switch (type) { + case NN_STCPMUX_ERROR: + nn_stcpmux_stop (&atcpmux->stcpmux); + atcpmux->state = NN_ATCPMUX_STATE_STOPPING_STCPMUX; + nn_epbase_stat_increment (atcpmux->epbase, + NN_STAT_BROKEN_CONNECTIONS, 1); + return; + default: + nn_fsm_bad_action (atcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (atcpmux->state, src, type); + } + +/******************************************************************************/ +/* STOPPING_STCPMUX state. */ +/******************************************************************************/ + case NN_ATCPMUX_STATE_STOPPING_STCPMUX: + switch (src) { + + case NN_ATCPMUX_SRC_STCPMUX: + switch (type) { + case NN_USOCK_SHUTDOWN: + return; + case NN_STCPMUX_STOPPED: + nn_usock_stop (&atcpmux->usock); + atcpmux->state = NN_ATCPMUX_STATE_STOPPING_USOCK; + return; + default: + nn_fsm_bad_action (atcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (atcpmux->state, src, type); + } + +/******************************************************************************/ +/* STOPPING_USOCK state. */ +/******************************************************************************/ + case NN_ATCPMUX_STATE_STOPPING_USOCK: + switch (src) { + + case NN_ATCPMUX_SRC_USOCK: + switch (type) { + case NN_USOCK_SHUTDOWN: + return; + case NN_USOCK_STOPPED: + nn_fsm_raise (&atcpmux->fsm, &atcpmux->done, NN_ATCPMUX_ERROR); + atcpmux->state = NN_ATCPMUX_STATE_DONE; + return; + default: + nn_fsm_bad_action (atcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (atcpmux->state, src, type); + } + +/******************************************************************************/ +/* Invalid state. */ +/******************************************************************************/ + default: + nn_fsm_bad_state (atcpmux->state, src, type); + } +} + diff --git a/src/transports/tcpmux/atcpmux.h b/src/transports/tcpmux/atcpmux.h new file mode 100644 index 0000000..fb6f9a8 --- /dev/null +++ b/src/transports/tcpmux/atcpmux.h @@ -0,0 +1,81 @@ +/* + Copyright (c) 2013-2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#ifndef NN_ATCPMUX_INCLUDED +#define NN_ATCPMUX_INCLUDED + +#include "stcpmux.h" + +#include "../../transport.h" + +#include "../../aio/fsm.h" +#include "../../aio/usock.h" + +#include "../../utils/list.h" + +/* State machine handling accepted TCPMUX sockets. */ + +/* In btcpmux, some events are just *assumed* to come from a child atcpmux + object. By using non-trivial event codes, we can do more reliable sanity + checking in such scenarios. */ +#define NN_ATCPMUX_ACCEPTED 34231 +#define NN_ATCPMUX_ERROR 34232 +#define NN_ATCPMUX_STOPPED 34233 + +struct nn_atcpmux { + + /* The state machine. */ + struct nn_fsm fsm; + int state; + + /* Pointer to the associated endpoint. */ + struct nn_epbase *epbase; + + /* Underlying socket. */ + struct nn_usock usock; + + /* Listening socket. Valid only while accepting new connection. */ + struct nn_usock *listener; + struct nn_fsm_owner listener_owner; + + /* State machine that takes care of the connection in the active state. */ + struct nn_stcpmux stcpmux; + + /* Events generated by atcpmux state machine. */ + struct nn_fsm_event accepted; + struct nn_fsm_event done; + + /* This member can be used by owner to keep individual atcpmuxes + in a list. */ + struct nn_list_item item; +}; + +void nn_atcpmux_init (struct nn_atcpmux *self, int src, + struct nn_epbase *epbase, struct nn_fsm *owner); +void nn_atcpmux_term (struct nn_atcpmux *self); + +int nn_atcpmux_isidle (struct nn_atcpmux *self); +void nn_atcpmux_start (struct nn_atcpmux *self, struct nn_usock *listener); +void nn_atcpmux_stop (struct nn_atcpmux *self); + +#endif + diff --git a/src/transports/tcpmux/btcpmux.c b/src/transports/tcpmux/btcpmux.c new file mode 100644 index 0000000..74bc7fe --- /dev/null +++ b/src/transports/tcpmux/btcpmux.c @@ -0,0 +1,506 @@ +/* + Copyright (c) 2012-2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "btcpmux.h" +#include "atcpmux.h" + +#include "../utils/port.h" +#include "../utils/iface.h" + +#include "../../aio/fsm.h" +#include "../../aio/usock.h" + +#include "../utils/backoff.h" + +#include "../../utils/err.h" +#include "../../utils/cont.h" +#include "../../utils/alloc.h" +#include "../../utils/list.h" +#include "../../utils/fast.h" +#include "../../utils/int.h" + +#include <string.h> + +#if defined NN_HAVE_WINDOWS +#include "../../utils/win.h" +#else +#include <unistd.h> +#include <netinet/in.h> +#endif + +/* The backlog is set relatively high so that there are not too many failed + connection attemps during re-connection storms. */ +#define NN_BTCPMUX_BACKLOG 100 + +#define NN_BTCPMUX_STATE_IDLE 1 +#define NN_BTCPMUX_STATE_ACTIVE 2 +#define NN_BTCPMUX_STATE_STOPPING_ATCPMUX 3 +#define NN_BTCPMUX_STATE_STOPPING_USOCK 4 +#define NN_BTCPMUX_STATE_STOPPING_ATCPMUXES 5 +#define NN_BTCPMUX_STATE_LISTENING 6 +#define NN_BTCPMUX_STATE_WAITING 7 +#define NN_BTCPMUX_STATE_CLOSING 8 +#define NN_BTCPMUX_STATE_STOPPING_BACKOFF 9 + +#define NN_BTCPMUX_SRC_USOCK 1 +#define NN_BTCPMUX_SRC_ATCPMUX 2 +#define NN_BTCPMUX_SRC_RECONNECT_TIMER 3 + +struct nn_btcpmux { + + /* The state machine. */ + struct nn_fsm fsm; + int state; + + /* This object is a specific type of endpoint. + Thus it is derived from epbase. */ + struct nn_epbase epbase; + + /* The underlying listening TCPMUX socket. */ + struct nn_usock usock; + + /* The connection being accepted at the moment. */ + struct nn_atcpmux *atcpmux; + + /* List of accepted connections. */ + struct nn_list atcpmuxes; + + /* Used to wait before retrying to connect. */ + struct nn_backoff retry; +}; + +/* nn_epbase virtual interface implementation. */ +static void nn_btcpmux_stop (struct nn_epbase *self); +static void nn_btcpmux_destroy (struct nn_epbase *self); +const struct nn_epbase_vfptr nn_btcpmux_epbase_vfptr = { + nn_btcpmux_stop, + nn_btcpmux_destroy +}; + +/* Private functions. */ +static void nn_btcpmux_handler (struct nn_fsm *self, int src, int type, + void *srcptr); +static void nn_btcpmux_shutdown (struct nn_fsm *self, int src, int type, + void *srcptr); +static void nn_btcpmux_start_listening (struct nn_btcpmux *self); +static void nn_btcpmux_start_accepting (struct nn_btcpmux *self); + +int nn_btcpmux_create (void *hint, struct nn_epbase **epbase) +{ + int rc; + struct nn_btcpmux *self; + const char *addr; + const char *end; + const char *pos; + struct sockaddr_storage ss; + size_t sslen; + int ipv4only; + size_t ipv4onlylen; + int reconnect_ivl; + int reconnect_ivl_max; + size_t sz; + + /* Allocate the new endpoint object. */ + self = nn_alloc (sizeof (struct nn_btcpmux), "btcpmux"); + alloc_assert (self); + + /* Initalise the epbase. */ + nn_epbase_init (&self->epbase, &nn_btcpmux_epbase_vfptr, hint); + addr = nn_epbase_getaddr (&self->epbase); + + /* Parse the port. */ + end = addr + strlen (addr); + pos = strrchr (addr, ':'); + if (nn_slow (!pos)) { + nn_epbase_term (&self->epbase); + return -EINVAL; + } + ++pos; + rc = nn_port_resolve (pos, end - pos); + if (nn_slow (rc < 0)) { + nn_epbase_term (&self->epbase); + return -EINVAL; + } + + /* Check whether IPv6 is to be used. */ + ipv4onlylen = sizeof (ipv4only); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY, + &ipv4only, &ipv4onlylen); + nn_assert (ipv4onlylen == sizeof (ipv4only)); + + /* Parse the address. */ + rc = nn_iface_resolve (addr, pos - addr - 1, ipv4only, &ss, &sslen); + if (nn_slow (rc < 0)) { + nn_epbase_term (&self->epbase); + return -ENODEV; + } + + /* Initialise the structure. */ + nn_fsm_init_root (&self->fsm, nn_btcpmux_handler, nn_btcpmux_shutdown, + nn_epbase_getctx (&self->epbase)); + self->state = NN_BTCPMUX_STATE_IDLE; + sz = sizeof (reconnect_ivl); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL, + &reconnect_ivl, &sz); + nn_assert (sz == sizeof (reconnect_ivl)); + sz = sizeof (reconnect_ivl_max); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX, + &reconnect_ivl_max, &sz); + nn_assert (sz == sizeof (reconnect_ivl_max)); + if (reconnect_ivl_max == 0) + reconnect_ivl_max = reconnect_ivl; + nn_backoff_init (&self->retry, NN_BTCPMUX_SRC_RECONNECT_TIMER, + reconnect_ivl, reconnect_ivl_max, &self->fsm); + nn_usock_init (&self->usock, NN_BTCPMUX_SRC_USOCK, &self->fsm); + self->atcpmux = NULL; + nn_list_init (&self->atcpmuxes); + + /* Start the state machine. */ + nn_fsm_start (&self->fsm); + + /* Return the base class as an out parameter. */ + *epbase = &self->epbase; + + return 0; +} + +static void nn_btcpmux_stop (struct nn_epbase *self) +{ + struct nn_btcpmux *btcpmux; + + btcpmux = nn_cont (self, struct nn_btcpmux, epbase); + + nn_fsm_stop (&btcpmux->fsm); +} + +static void nn_btcpmux_destroy (struct nn_epbase *self) +{ + struct nn_btcpmux *btcpmux; + + btcpmux = nn_cont (self, struct nn_btcpmux, epbase); + + nn_assert_state (btcpmux, NN_BTCPMUX_STATE_IDLE); + nn_list_term (&btcpmux->atcpmuxes); + nn_assert (btcpmux->atcpmux == NULL); + nn_usock_term (&btcpmux->usock); + nn_backoff_term (&btcpmux->retry); + nn_epbase_term (&btcpmux->epbase); + nn_fsm_term (&btcpmux->fsm); + + nn_free (btcpmux); +} + +static void nn_btcpmux_shutdown (struct nn_fsm *self, int src, int type, + void *srcptr) +{ + struct nn_btcpmux *btcpmux; + struct nn_list_item *it; + struct nn_atcpmux *atcpmux; + + btcpmux = nn_cont (self, struct nn_btcpmux, fsm); + + if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { + nn_backoff_stop (&btcpmux->retry); + if (btcpmux->atcpmux) { + nn_atcpmux_stop (btcpmux->atcpmux); + btcpmux->state = NN_BTCPMUX_STATE_STOPPING_ATCPMUX; + } + else { + btcpmux->state = NN_BTCPMUX_STATE_STOPPING_USOCK; + } + } + if (nn_slow (btcpmux->state == NN_BTCPMUX_STATE_STOPPING_ATCPMUX)) { + if (!nn_atcpmux_isidle (btcpmux->atcpmux)) + return; + nn_atcpmux_term (btcpmux->atcpmux); + nn_free (btcpmux->atcpmux); + btcpmux->atcpmux = NULL; + nn_usock_stop (&btcpmux->usock); + btcpmux->state = NN_BTCPMUX_STATE_STOPPING_USOCK; + } + if (nn_slow (btcpmux->state == NN_BTCPMUX_STATE_STOPPING_USOCK)) { + if (!nn_usock_isidle (&btcpmux->usock)) + return; + for (it = nn_list_begin (&btcpmux->atcpmuxes); + it != nn_list_end (&btcpmux->atcpmuxes); + it = nn_list_next (&btcpmux->atcpmuxes, it)) { + atcpmux = nn_cont (it, struct nn_atcpmux, item); + nn_atcpmux_stop (atcpmux); + } + btcpmux->state = NN_BTCPMUX_STATE_STOPPING_ATCPMUXES; + goto atcpmuxes_stopping; + } + if (nn_slow (btcpmux->state == NN_BTCPMUX_STATE_STOPPING_ATCPMUXES)) { + nn_assert (src == NN_BTCPMUX_SRC_ATCPMUX && type == NN_ATCPMUX_STOPPED); + atcpmux = (struct nn_atcpmux *) srcptr; + nn_list_erase (&btcpmux->atcpmuxes, &atcpmux->item); + nn_atcpmux_term (atcpmux); + nn_free (atcpmux); + + /* If there are no more atcpmux state machines, we can stop the whole + btcpmux object. */ +atcpmuxes_stopping: + if (nn_list_empty (&btcpmux->atcpmuxes)) { + btcpmux->state = NN_BTCPMUX_STATE_IDLE; + nn_fsm_stopped_noevent (&btcpmux->fsm); + nn_epbase_stopped (&btcpmux->epbase); + return; + } + + return; + } + + nn_fsm_bad_action(btcpmux->state, src, type); +} + +static void nn_btcpmux_handler (struct nn_fsm *self, int src, int type, + void *srcptr) +{ + struct nn_btcpmux *btcpmux; + struct nn_atcpmux *atcpmux; + + btcpmux = nn_cont (self, struct nn_btcpmux, fsm); + + switch (btcpmux->state) { + +/******************************************************************************/ +/* IDLE state. */ +/******************************************************************************/ + case NN_BTCPMUX_STATE_IDLE: + switch (src) { + + case NN_FSM_ACTION: + switch (type) { + case NN_FSM_START: + nn_btcpmux_start_listening (btcpmux); + return; + default: + nn_fsm_bad_action (btcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (btcpmux->state, src, type); + } + +/******************************************************************************/ +/* ACTIVE state. */ +/* The execution is yielded to the atcpmux state machine in this state. */ +/******************************************************************************/ + case NN_BTCPMUX_STATE_ACTIVE: + if (srcptr == btcpmux->atcpmux) { + switch (type) { + case NN_ATCPMUX_ACCEPTED: + + /* Move the newly created connection to the list of existing + connections. */ + nn_list_insert (&btcpmux->atcpmuxes, &btcpmux->atcpmux->item, + nn_list_end (&btcpmux->atcpmuxes)); + btcpmux->atcpmux = NULL; + + /* Start waiting for a new incoming connection. */ + nn_btcpmux_start_accepting (btcpmux); + + return; + + default: + nn_fsm_bad_action (btcpmux->state, src, type); + } + } + + /* For all remaining events we'll assume they are coming from one + of remaining child atcpmux objects. */ + nn_assert (src == NN_BTCPMUX_SRC_ATCPMUX); + atcpmux = (struct nn_atcpmux*) srcptr; + switch (type) { + case NN_ATCPMUX_ERROR: + nn_atcpmux_stop (atcpmux); + return; + case NN_ATCPMUX_STOPPED: + nn_list_erase (&btcpmux->atcpmuxes, &atcpmux->item); + nn_atcpmux_term (atcpmux); + nn_free (atcpmux); + return; + default: + nn_fsm_bad_action (btcpmux->state, src, type); + } + +/******************************************************************************/ +/* CLOSING_USOCK state. */ +/* usock object was asked to stop but it haven't stopped yet. */ +/******************************************************************************/ + case NN_BTCPMUX_STATE_CLOSING: + switch (src) { + + case NN_BTCPMUX_SRC_USOCK: + switch (type) { + case NN_USOCK_SHUTDOWN: + return; + case NN_USOCK_STOPPED: + nn_backoff_start (&btcpmux->retry); + btcpmux->state = NN_BTCPMUX_STATE_WAITING; + return; + default: + nn_fsm_bad_action (btcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (btcpmux->state, src, type); + } + +/******************************************************************************/ +/* WAITING state. */ +/* Waiting before re-bind is attempted. This way we won't overload */ +/* the system by continuous re-bind attemps. */ +/******************************************************************************/ + case NN_BTCPMUX_STATE_WAITING: + switch (src) { + + case NN_BTCPMUX_SRC_RECONNECT_TIMER: + switch (type) { + case NN_BACKOFF_TIMEOUT: + nn_backoff_stop (&btcpmux->retry); + btcpmux->state = NN_BTCPMUX_STATE_STOPPING_BACKOFF; + return; + default: + nn_fsm_bad_action (btcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (btcpmux->state, src, type); + } + +/******************************************************************************/ +/* STOPPING_BACKOFF state. */ +/* backoff object was asked to stop, but it haven't stopped yet. */ +/******************************************************************************/ + case NN_BTCPMUX_STATE_STOPPING_BACKOFF: + switch (src) { + + case NN_BTCPMUX_SRC_RECONNECT_TIMER: + switch (type) { + case NN_BACKOFF_STOPPED: + nn_btcpmux_start_listening (btcpmux); + return; + default: + nn_fsm_bad_action (btcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (btcpmux->state, src, type); + } + +/******************************************************************************/ +/* Invalid state. */ +/******************************************************************************/ + default: + nn_fsm_bad_state (btcpmux->state, src, type); + } +} + +/******************************************************************************/ +/* State machine actions. */ +/******************************************************************************/ + +static void nn_btcpmux_start_listening (struct nn_btcpmux *self) +{ + int rc; + struct sockaddr_storage ss; + size_t sslen; + int ipv4only; + size_t ipv4onlylen; + const char *addr; + const char *end; + const char *pos; + uint16_t port; + + /* First, resolve the IP address. */ + addr = nn_epbase_getaddr (&self->epbase); + memset (&ss, 0, sizeof (ss)); + + /* Parse the port. */ + end = addr + strlen (addr); + pos = strrchr (addr, ':'); + nn_assert (pos); + ++pos; + rc = nn_port_resolve (pos, end - pos); + nn_assert (rc >= 0); + port = rc; + + /* Parse the address. */ + ipv4onlylen = sizeof (ipv4only); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY, + &ipv4only, &ipv4onlylen); + nn_assert (ipv4onlylen == sizeof (ipv4only)); + rc = nn_iface_resolve (addr, pos - addr - 1, ipv4only, &ss, &sslen); + errnum_assert (rc == 0, -rc); + + /* Combine the port and the address. */ + if (ss.ss_family == AF_INET) { + ((struct sockaddr_in*) &ss)->sin_port = htons (port); + sslen = sizeof (struct sockaddr_in); + } + else if (ss.ss_family == AF_INET6) { + ((struct sockaddr_in6*) &ss)->sin6_port = htons (port); + sslen = sizeof (struct sockaddr_in6); + } + else + nn_assert (0); + + /* Start listening for incoming connections. */ + rc = nn_usock_start (&self->usock, ss.ss_family, SOCK_STREAM, 0); + if (nn_slow (rc < 0)) { + nn_backoff_start (&self->retry); + self->state = NN_BTCPMUX_STATE_WAITING; + return; + } + + rc = nn_usock_bind (&self->usock, (struct sockaddr*) &ss, (size_t) sslen); + if (nn_slow (rc < 0)) { + nn_usock_stop (&self->usock); + self->state = NN_BTCPMUX_STATE_CLOSING; + return; + } + + rc = nn_usock_listen (&self->usock, NN_BTCPMUX_BACKLOG); + if (nn_slow (rc < 0)) { + nn_usock_stop (&self->usock); + self->state = NN_BTCPMUX_STATE_CLOSING; + return; + } + nn_btcpmux_start_accepting(self); + self->state = NN_BTCPMUX_STATE_ACTIVE; +} + +static void nn_btcpmux_start_accepting (struct nn_btcpmux *self) +{ + nn_assert (self->atcpmux == NULL); + + /* Allocate new atcpmux state machine. */ + self->atcpmux = nn_alloc (sizeof (struct nn_atcpmux), "atcpmux"); + alloc_assert (self->atcpmux); + nn_atcpmux_init (self->atcpmux, NN_BTCPMUX_SRC_ATCPMUX, + &self->epbase, &self->fsm); + + /* Start waiting for a new incoming connection. */ + nn_atcpmux_start (self->atcpmux, &self->usock); +} + diff --git a/src/transports/tcpmux/btcpmux.h b/src/transports/tcpmux/btcpmux.h new file mode 100644 index 0000000..48983a8 --- /dev/null +++ b/src/transports/tcpmux/btcpmux.h @@ -0,0 +1,33 @@ +/* + Copyright (c) 2013-2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#ifndef NN_BTCPMUX_INCLUDED +#define NN_BTCPMUX_INCLUDED + +#include "../../transport.h" + +/* State machine managing bound TCPMUX socket. */ + +int nn_btcpmux_create (void *hint, struct nn_epbase **epbase); + +#endif + diff --git a/src/transports/tcpmux/ctcpmux.c b/src/transports/tcpmux/ctcpmux.c new file mode 100644 index 0000000..d9f8501 --- /dev/null +++ b/src/transports/tcpmux/ctcpmux.c @@ -0,0 +1,632 @@ +/* + Copyright (c) 2012-2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "ctcpmux.h" +#include "stcpmux.h" + +#include "../../tcpmux.h" + +#include "../utils/dns.h" +#include "../utils/port.h" +#include "../utils/iface.h" +#include "../utils/backoff.h" +#include "../utils/literal.h" + +#include "../../aio/fsm.h" +#include "../../aio/usock.h" + +#include "../../utils/err.h" +#include "../../utils/cont.h" +#include "../../utils/alloc.h" +#include "../../utils/fast.h" +#include "../../utils/int.h" +#include "../../utils/attr.h" + +#include <string.h> + +#if defined NN_HAVE_WINDOWS +#include "../../utils/win.h" +#else +#include <unistd.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#endif + +#define NN_CTCPMUX_STATE_IDLE 1 +#define NN_CTCPMUX_STATE_RESOLVING 2 +#define NN_CTCPMUX_STATE_STOPPING_DNS 3 +#define NN_CTCPMUX_STATE_CONNECTING 4 +#define NN_CTCPMUX_STATE_ACTIVE 5 +#define NN_CTCPMUX_STATE_STOPPING_STCPMUX 6 +#define NN_CTCPMUX_STATE_STOPPING_USOCK 7 +#define NN_CTCPMUX_STATE_WAITING 8 +#define NN_CTCPMUX_STATE_STOPPING_BACKOFF 9 +#define NN_CTCPMUX_STATE_STOPPING_STCPMUX_FINAL 10 +#define NN_CTCPMUX_STATE_STOPPING 11 + +#define NN_CTCPMUX_SRC_USOCK 1 +#define NN_CTCPMUX_SRC_RECONNECT_TIMER 2 +#define NN_CTCPMUX_SRC_DNS 3 +#define NN_CTCPMUX_SRC_STCPMUX 4 + +struct nn_ctcpmux { + + /* The state machine. */ + struct nn_fsm fsm; + int state; + + /* This object is a specific type of endpoint. + Thus it is derived from epbase. */ + struct nn_epbase epbase; + + /* The underlying TCPMUX socket. */ + struct nn_usock usock; + + /* Used to wait before retrying to connect. */ + struct nn_backoff retry; + + /* State machine that handles the active part of the connection + lifetime. */ + struct nn_stcpmux stcpmux; + + /* DNS resolver used to convert textual address into actual IP address + along with the variable to hold the result. */ + struct nn_dns dns; + struct nn_dns_result dns_result; +}; + +/* nn_epbase virtual interface implementation. */ +static void nn_ctcpmux_stop (struct nn_epbase *self); +static void nn_ctcpmux_destroy (struct nn_epbase *self); +const struct nn_epbase_vfptr nn_ctcpmux_epbase_vfptr = { + nn_ctcpmux_stop, + nn_ctcpmux_destroy +}; + +/* Private functions. */ +static void nn_ctcpmux_handler (struct nn_fsm *self, int src, int type, + void *srcptr); +static void nn_ctcpmux_shutdown (struct nn_fsm *self, int src, int type, + void *srcptr); +static void nn_ctcpmux_start_resolving (struct nn_ctcpmux *self); +static void nn_ctcpmux_start_connecting (struct nn_ctcpmux *self, + struct sockaddr_storage *ss, size_t sslen); + +int nn_ctcpmux_create (void *hint, struct nn_epbase **epbase) +{ + int rc; + const char *addr; + size_t addrlen; + const char *semicolon; + const char *hostname; + const char *colon; + const char *end; + struct sockaddr_storage ss; + size_t sslen; + int ipv4only; + size_t ipv4onlylen; + struct nn_ctcpmux *self; + int reconnect_ivl; + int reconnect_ivl_max; + size_t sz; + + /* Allocate the new endpoint object. */ + self = nn_alloc (sizeof (struct nn_ctcpmux), "ctcpmux"); + alloc_assert (self); + + /* Initalise the endpoint. */ + nn_epbase_init (&self->epbase, &nn_ctcpmux_epbase_vfptr, hint); + + /* Check whether IPv6 is to be used. */ + ipv4onlylen = sizeof (ipv4only); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY, + &ipv4only, &ipv4onlylen); + nn_assert (ipv4onlylen == sizeof (ipv4only)); + + /* Start parsing the address. */ + addr = nn_epbase_getaddr (&self->epbase); + addrlen = strlen (addr); + semicolon = strchr (addr, ';'); + hostname = semicolon ? semicolon + 1 : addr; + colon = strrchr (addr, ':'); + end = addr + addrlen; + + /* Parse the port. */ + if (nn_slow (!colon)) { + nn_epbase_term (&self->epbase); + return -EINVAL; + } + rc = nn_port_resolve (colon + 1, end - colon - 1); + if (nn_slow (rc < 0)) { + nn_epbase_term (&self->epbase); + return -EINVAL; + } + + /* Check whether the host portion of the address is either a literal + or a valid hostname. */ + if (nn_dns_check_hostname (hostname, colon - hostname) < 0 && + nn_literal_resolve (hostname, colon - hostname, ipv4only, + &ss, &sslen) < 0) { + nn_epbase_term (&self->epbase); + return -EINVAL; + } + + /* If local address is specified, check whether it is valid. */ + if (semicolon) { + rc = nn_iface_resolve (addr, semicolon - addr, ipv4only, &ss, &sslen); + if (rc < 0) { + nn_epbase_term (&self->epbase); + return -ENODEV; + } + } + + /* Initialise the structure. */ + nn_fsm_init_root (&self->fsm, nn_ctcpmux_handler, nn_ctcpmux_shutdown, + nn_epbase_getctx (&self->epbase)); + self->state = NN_CTCPMUX_STATE_IDLE; + nn_usock_init (&self->usock, NN_CTCPMUX_SRC_USOCK, &self->fsm); + sz = sizeof (reconnect_ivl); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL, + &reconnect_ivl, &sz); + nn_assert (sz == sizeof (reconnect_ivl)); + sz = sizeof (reconnect_ivl_max); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX, + &reconnect_ivl_max, &sz); + nn_assert (sz == sizeof (reconnect_ivl_max)); + if (reconnect_ivl_max == 0) + reconnect_ivl_max = reconnect_ivl; + nn_backoff_init (&self->retry, NN_CTCPMUX_SRC_RECONNECT_TIMER, + reconnect_ivl, reconnect_ivl_max, &self->fsm); + nn_stcpmux_init (&self->stcpmux, NN_CTCPMUX_SRC_STCPMUX, &self->epbase, &self->fsm); + nn_dns_init (&self->dns, NN_CTCPMUX_SRC_DNS, &self->fsm); + + /* Start the state machine. */ + nn_fsm_start (&self->fsm); + + /* Return the base class as an out parameter. */ + *epbase = &self->epbase; + + return 0; +} + +static void nn_ctcpmux_stop (struct nn_epbase *self) +{ + struct nn_ctcpmux *ctcpmux; + + ctcpmux = nn_cont (self, struct nn_ctcpmux, epbase); + + nn_fsm_stop (&ctcpmux->fsm); +} + +static void nn_ctcpmux_destroy (struct nn_epbase *self) +{ + struct nn_ctcpmux *ctcpmux; + + ctcpmux = nn_cont (self, struct nn_ctcpmux, epbase); + + nn_dns_term (&ctcpmux->dns); + nn_stcpmux_term (&ctcpmux->stcpmux); + nn_backoff_term (&ctcpmux->retry); + nn_usock_term (&ctcpmux->usock); + nn_fsm_term (&ctcpmux->fsm); + nn_epbase_term (&ctcpmux->epbase); + + nn_free (ctcpmux); +} + +static void nn_ctcpmux_shutdown (struct nn_fsm *self, int src, int type, + NN_UNUSED void *srcptr) +{ + struct nn_ctcpmux *ctcpmux; + + ctcpmux = nn_cont (self, struct nn_ctcpmux, fsm); + + if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { + if (!nn_stcpmux_isidle (&ctcpmux->stcpmux)) { + nn_epbase_stat_increment (&ctcpmux->epbase, + NN_STAT_DROPPED_CONNECTIONS, 1); + nn_stcpmux_stop (&ctcpmux->stcpmux); + } + ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_STCPMUX_FINAL; + } + if (nn_slow (ctcpmux->state == NN_CTCPMUX_STATE_STOPPING_STCPMUX_FINAL)) { + if (!nn_stcpmux_isidle (&ctcpmux->stcpmux)) + return; + nn_backoff_stop (&ctcpmux->retry); + nn_usock_stop (&ctcpmux->usock); + nn_dns_stop (&ctcpmux->dns); + ctcpmux->state = NN_CTCPMUX_STATE_STOPPING; + } + if (nn_slow (ctcpmux->state == NN_CTCPMUX_STATE_STOPPING)) { + if (!nn_backoff_isidle (&ctcpmux->retry) || + !nn_usock_isidle (&ctcpmux->usock) || + !nn_dns_isidle (&ctcpmux->dns)) + return; + ctcpmux->state = NN_CTCPMUX_STATE_IDLE; + nn_fsm_stopped_noevent (&ctcpmux->fsm); + nn_epbase_stopped (&ctcpmux->epbase); + return; + } + + nn_fsm_bad_state (ctcpmux->state, src, type); +} + +static void nn_ctcpmux_handler (struct nn_fsm *self, int src, int type, + NN_UNUSED void *srcptr) +{ + struct nn_ctcpmux *ctcpmux; + + ctcpmux = nn_cont (self, struct nn_ctcpmux, fsm); + + switch (ctcpmux->state) { + +/******************************************************************************/ +/* IDLE state. */ +/* The state machine wasn't yet started. */ +/******************************************************************************/ + case NN_CTCPMUX_STATE_IDLE: + switch (src) { + + case NN_FSM_ACTION: + switch (type) { + case NN_FSM_START: + nn_ctcpmux_start_resolving (ctcpmux); + return; + default: + nn_fsm_bad_action (ctcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (ctcpmux->state, src, type); + } + +/******************************************************************************/ +/* RESOLVING state. */ +/* Name of the host to connect to is being resolved to get an IP address. */ +/******************************************************************************/ + case NN_CTCPMUX_STATE_RESOLVING: + switch (src) { + + case NN_CTCPMUX_SRC_DNS: + switch (type) { + case NN_DNS_DONE: + nn_dns_stop (&ctcpmux->dns); + ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_DNS; + return; + default: + nn_fsm_bad_action (ctcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (ctcpmux->state, src, type); + } + +/******************************************************************************/ +/* STOPPING_DNS state. */ +/* dns object was asked to stop but it haven't stopped yet. */ +/******************************************************************************/ + case NN_CTCPMUX_STATE_STOPPING_DNS: + switch (src) { + + case NN_CTCPMUX_SRC_DNS: + switch (type) { + case NN_DNS_STOPPED: + if (ctcpmux->dns_result.error == 0) { + nn_ctcpmux_start_connecting (ctcpmux, + &ctcpmux->dns_result.addr, + ctcpmux->dns_result.addrlen); + return; + } + nn_backoff_start (&ctcpmux->retry); + ctcpmux->state = NN_CTCPMUX_STATE_WAITING; + return; + default: + nn_fsm_bad_action (ctcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (ctcpmux->state, src, type); + } + +/******************************************************************************/ +/* CONNECTING state. */ +/* Non-blocking connect is under way. */ +/******************************************************************************/ + case NN_CTCPMUX_STATE_CONNECTING: + switch (src) { + + case NN_CTCPMUX_SRC_USOCK: + switch (type) { + case NN_USOCK_CONNECTED: + nn_stcpmux_start (&ctcpmux->stcpmux, &ctcpmux->usock); + ctcpmux->state = NN_CTCPMUX_STATE_ACTIVE; + nn_epbase_stat_increment (&ctcpmux->epbase, + NN_STAT_INPROGRESS_CONNECTIONS, -1); + nn_epbase_stat_increment (&ctcpmux->epbase, + NN_STAT_ESTABLISHED_CONNECTIONS, 1); + nn_epbase_clear_error (&ctcpmux->epbase); + return; + case NN_USOCK_ERROR: + nn_epbase_set_error (&ctcpmux->epbase, + nn_usock_geterrno (&ctcpmux->usock)); + nn_usock_stop (&ctcpmux->usock); + ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_USOCK; + nn_epbase_stat_increment (&ctcpmux->epbase, + NN_STAT_INPROGRESS_CONNECTIONS, -1); + nn_epbase_stat_increment (&ctcpmux->epbase, + NN_STAT_CONNECT_ERRORS, 1); + return; + default: + nn_fsm_bad_action (ctcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (ctcpmux->state, src, type); + } + +/******************************************************************************/ +/* ACTIVE state. */ +/* Connection is established and handled by the stcpmux state machine. */ +/******************************************************************************/ + case NN_CTCPMUX_STATE_ACTIVE: + switch (src) { + + case NN_CTCPMUX_SRC_STCPMUX: + switch (type) { + case NN_STCPMUX_ERROR: + nn_stcpmux_stop (&ctcpmux->stcpmux); + ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_STCPMUX; + nn_epbase_stat_increment (&ctcpmux->epbase, + NN_STAT_BROKEN_CONNECTIONS, 1); + return; + default: + nn_fsm_bad_action (ctcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (ctcpmux->state, src, type); + } + +/******************************************************************************/ +/* STOPPING_STCPMUX state. */ +/* stcpmux object was asked to stop but it haven't stopped yet. */ +/******************************************************************************/ + case NN_CTCPMUX_STATE_STOPPING_STCPMUX: + switch (src) { + + case NN_CTCPMUX_SRC_STCPMUX: + switch (type) { + case NN_USOCK_SHUTDOWN: + return; + case NN_STCPMUX_STOPPED: + nn_usock_stop (&ctcpmux->usock); + ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_USOCK; + return; + default: + nn_fsm_bad_action (ctcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (ctcpmux->state, src, type); + } + +/******************************************************************************/ +/* STOPPING_USOCK state. */ +/* usock object was asked to stop but it haven't stopped yet. */ +/******************************************************************************/ + case NN_CTCPMUX_STATE_STOPPING_USOCK: + switch (src) { + + case NN_CTCPMUX_SRC_USOCK: + switch (type) { + case NN_USOCK_SHUTDOWN: + return; + case NN_USOCK_STOPPED: + nn_backoff_start (&ctcpmux->retry); + ctcpmux->state = NN_CTCPMUX_STATE_WAITING; + return; + default: + nn_fsm_bad_action (ctcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (ctcpmux->state, src, type); + } + +/******************************************************************************/ +/* WAITING state. */ +/* Waiting before re-connection is attempted. This way we won't overload */ +/* the system by continuous re-connection attemps. */ +/******************************************************************************/ + case NN_CTCPMUX_STATE_WAITING: + switch (src) { + + case NN_CTCPMUX_SRC_RECONNECT_TIMER: + switch (type) { + case NN_BACKOFF_TIMEOUT: + nn_backoff_stop (&ctcpmux->retry); + ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_BACKOFF; + return; + default: + nn_fsm_bad_action (ctcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (ctcpmux->state, src, type); + } + +/******************************************************************************/ +/* STOPPING_BACKOFF state. */ +/* backoff object was asked to stop, but it haven't stopped yet. */ +/******************************************************************************/ + case NN_CTCPMUX_STATE_STOPPING_BACKOFF: + switch (src) { + + case NN_CTCPMUX_SRC_RECONNECT_TIMER: + switch (type) { + case NN_BACKOFF_STOPPED: + nn_ctcpmux_start_resolving (ctcpmux); + return; + default: + nn_fsm_bad_action (ctcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (ctcpmux->state, src, type); + } + +/******************************************************************************/ +/* Invalid state. */ +/******************************************************************************/ + default: + nn_fsm_bad_state (ctcpmux->state, src, type); + } +} + +/******************************************************************************/ +/* State machine actions. */ +/******************************************************************************/ + +static void nn_ctcpmux_start_resolving (struct nn_ctcpmux *self) +{ + const char *addr; + const char *begin; + const char *end; + int ipv4only; + size_t ipv4onlylen; + + /* Extract the hostname part from address string. */ + addr = nn_epbase_getaddr (&self->epbase); + begin = strchr (addr, ';'); + if (!begin) + begin = addr; + else + ++begin; + end = strrchr (addr, ':'); + nn_assert (end); + + /* Check whether IPv6 is to be used. */ + ipv4onlylen = sizeof (ipv4only); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY, + &ipv4only, &ipv4onlylen); + nn_assert (ipv4onlylen == sizeof (ipv4only)); + + /* TODO: Get the actual value of IPV4ONLY option. */ + nn_dns_start (&self->dns, begin, end - begin, ipv4only, &self->dns_result); + + self->state = NN_CTCPMUX_STATE_RESOLVING; +} + +static void nn_ctcpmux_start_connecting (struct nn_ctcpmux *self, + struct sockaddr_storage *ss, size_t sslen) +{ + int rc; + struct sockaddr_storage remote; + size_t remotelen; + struct sockaddr_storage local; + size_t locallen; + const char *addr; + const char *end; + const char *colon; + const char *semicolon; + uint16_t port; + int ipv4only; + size_t ipv4onlylen; + int val; + size_t sz; + + /* Create IP address from the address string. */ + addr = nn_epbase_getaddr (&self->epbase); + memset (&remote, 0, sizeof (remote)); + + /* Parse the port. */ + end = addr + strlen (addr); + colon = strrchr (addr, ':'); + rc = nn_port_resolve (colon + 1, end - colon - 1); + errnum_assert (rc > 0, -rc); + port = rc; + + /* Check whether IPv6 is to be used. */ + ipv4onlylen = sizeof (ipv4only); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY, + &ipv4only, &ipv4onlylen); + nn_assert (ipv4onlylen == sizeof (ipv4only)); + + /* Parse the local address, if any. */ + semicolon = strchr (addr, ';'); + memset (&local, 0, sizeof (local)); + if (semicolon) + rc = nn_iface_resolve (addr, semicolon - addr, ipv4only, + &local, &locallen); + else + rc = nn_iface_resolve ("*", 1, ipv4only, &local, &locallen); + if (nn_slow (rc < 0)) { + nn_backoff_start (&self->retry); + self->state = NN_CTCPMUX_STATE_WAITING; + return; + } + + /* Combine the remote address and the port. */ + remote = *ss; + remotelen = sslen; + if (remote.ss_family == AF_INET) + ((struct sockaddr_in*) &remote)->sin_port = htons (port); + else if (remote.ss_family == AF_INET6) + ((struct sockaddr_in6*) &remote)->sin6_port = htons (port); + else + nn_assert (0); + + /* Try to start the underlying socket. */ + rc = nn_usock_start (&self->usock, remote.ss_family, SOCK_STREAM, 0); + if (nn_slow (rc < 0)) { + nn_backoff_start (&self->retry); + self->state = NN_CTCPMUX_STATE_WAITING; + return; + } + + /* Set the relevant socket options. */ + sz = sizeof (val); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_SNDBUF, &val, &sz); + nn_assert (sz == sizeof (val)); + nn_usock_setsockopt (&self->usock, SOL_SOCKET, SO_SNDBUF, + &val, sizeof (val)); + sz = sizeof (val); + nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RCVBUF, &val, &sz); + nn_assert (sz == sizeof (val)); + nn_usock_setsockopt (&self->usock, SOL_SOCKET, SO_RCVBUF, + &val, sizeof (val)); + + /* Bind the socket to the local network interface. */ + rc = nn_usock_bind (&self->usock, (struct sockaddr*) &local, locallen); + if (nn_slow (rc != 0)) { + nn_backoff_start (&self->retry); + self->state = NN_CTCPMUX_STATE_WAITING; + return; + } + + /* Start connecting. */ + nn_usock_connect (&self->usock, (struct sockaddr*) &remote, remotelen); + self->state = NN_CTCPMUX_STATE_CONNECTING; + nn_epbase_stat_increment (&self->epbase, + NN_STAT_INPROGRESS_CONNECTIONS, 1); +} + diff --git a/src/transports/tcpmux/ctcpmux.h b/src/transports/tcpmux/ctcpmux.h new file mode 100644 index 0000000..294f4fa --- /dev/null +++ b/src/transports/tcpmux/ctcpmux.h @@ -0,0 +1,33 @@ +/* + Copyright (c) 2013-2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#ifndef NN_CTCPMUX_INCLUDED +#define NN_CTCPMUX_INCLUDED + +#include "../../transport.h" + +/* State machine managing connected TCPMUX socket. */ + +int nn_ctcpmux_create (void *hint, struct nn_epbase **epbase); + +#endif + diff --git a/src/transports/tcpmux/stcpmux.c b/src/transports/tcpmux/stcpmux.c new file mode 100644 index 0000000..abbf180 --- /dev/null +++ b/src/transports/tcpmux/stcpmux.c @@ -0,0 +1,417 @@ +/* + Copyright (c) 2013-2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "stcpmux.h" + +#include "../../utils/err.h" +#include "../../utils/cont.h" +#include "../../utils/fast.h" +#include "../../utils/wire.h" +#include "../../utils/int.h" +#include "../../utils/attr.h" + +/* States of the object as a whole. */ +#define NN_STCPMUX_STATE_IDLE 1 +#define NN_STCPMUX_STATE_PROTOHDR 2 +#define NN_STCPMUX_STATE_STOPPING_STREAMHDR 3 +#define NN_STCPMUX_STATE_ACTIVE 4 +#define NN_STCPMUX_STATE_SHUTTING_DOWN 5 +#define NN_STCPMUX_STATE_DONE 6 +#define NN_STCPMUX_STATE_STOPPING 7 + +/* Possible states of the inbound part of the object. */ +#define NN_STCPMUX_INSTATE_HDR 1 +#define NN_STCPMUX_INSTATE_BODY 2 +#define NN_STCPMUX_INSTATE_HASMSG 3 + +/* Possible states of the outbound part of the object. */ +#define NN_STCPMUX_OUTSTATE_IDLE 1 +#define NN_STCPMUX_OUTSTATE_SENDING 2 + +/* Subordinate srcptr objects. */ +#define NN_STCPMUX_SRC_USOCK 1 +#define NN_STCPMUX_SRC_STREAMHDR 2 + +/* Stream is a special type of pipe. Implementation of the virtual pipe API. */ +static int nn_stcpmux_send (struct nn_pipebase *self, struct nn_msg *msg); +static int nn_stcpmux_recv (struct nn_pipebase *self, struct nn_msg *msg); +const struct nn_pipebase_vfptr nn_stcpmux_pipebase_vfptr = { + nn_stcpmux_send, + nn_stcpmux_recv +}; + +/* Private functions. */ +static void nn_stcpmux_handler (struct nn_fsm *self, int src, int type, + void *srcptr); +static void nn_stcpmux_shutdown (struct nn_fsm *self, int src, int type, + void *srcptr); + +void nn_stcpmux_init (struct nn_stcpmux *self, int src, + struct nn_epbase *epbase, struct nn_fsm *owner) +{ + nn_fsm_init (&self->fsm, nn_stcpmux_handler, nn_stcpmux_shutdown, + src, self, owner); + self->state = NN_STCPMUX_STATE_IDLE; + nn_streamhdr_init (&self->streamhdr, NN_STCPMUX_SRC_STREAMHDR, &self->fsm); + self->usock = NULL; + self->usock_owner.src = -1; + self->usock_owner.fsm = NULL; + nn_pipebase_init (&self->pipebase, &nn_stcpmux_pipebase_vfptr, epbase); + self->instate = -1; + nn_msg_init (&self->inmsg, 0); + self->outstate = -1; + nn_msg_init (&self->outmsg, 0); + nn_fsm_event_init (&self->done); +} + +void nn_stcpmux_term (struct nn_stcpmux *self) +{ + nn_assert_state (self, NN_STCPMUX_STATE_IDLE); + + nn_fsm_event_term (&self->done); + nn_msg_term (&self->outmsg); + nn_msg_term (&self->inmsg); + nn_pipebase_term (&self->pipebase); + nn_streamhdr_term (&self->streamhdr); + nn_fsm_term (&self->fsm); +} + +int nn_stcpmux_isidle (struct nn_stcpmux *self) +{ + return nn_fsm_isidle (&self->fsm); +} + +void nn_stcpmux_start (struct nn_stcpmux *self, struct nn_usock *usock) +{ + /* Take ownership of the underlying socket. */ + nn_assert (self->usock == NULL && self->usock_owner.fsm == NULL); + self->usock_owner.src = NN_STCPMUX_SRC_USOCK; + self->usock_owner.fsm = &self->fsm; + nn_usock_swap_owner (usock, &self->usock_owner); + self->usock = usock; + + /* Launch the state machine. */ + nn_fsm_start (&self->fsm); +} + +void nn_stcpmux_stop (struct nn_stcpmux *self) +{ + nn_fsm_stop (&self->fsm); +} + +static int nn_stcpmux_send (struct nn_pipebase *self, struct nn_msg *msg) +{ + struct nn_stcpmux *stcpmux; + struct nn_iovec iov [3]; + + stcpmux = nn_cont (self, struct nn_stcpmux, pipebase); + + nn_assert_state (stcpmux, NN_STCPMUX_STATE_ACTIVE); + nn_assert (stcpmux->outstate == NN_STCPMUX_OUTSTATE_IDLE); + + /* Move the message to the local storage. */ + nn_msg_term (&stcpmux->outmsg); + nn_msg_mv (&stcpmux->outmsg, msg); + + /* Serialise the message header. */ + nn_putll (stcpmux->outhdr, nn_chunkref_size (&stcpmux->outmsg.sphdr) + + nn_chunkref_size (&stcpmux->outmsg.body)); + + /* Start async sending. */ + iov [0].iov_base = stcpmux->outhdr; + iov [0].iov_len = sizeof (stcpmux->outhdr); + iov [1].iov_base = nn_chunkref_data (&stcpmux->outmsg.sphdr); + iov [1].iov_len = nn_chunkref_size (&stcpmux->outmsg.sphdr); + iov [2].iov_base = nn_chunkref_data (&stcpmux->outmsg.body); + iov [2].iov_len = nn_chunkref_size (&stcpmux->outmsg.body); + nn_usock_send (stcpmux->usock, iov, 3); + + stcpmux->outstate = NN_STCPMUX_OUTSTATE_SENDING; + + return 0; +} + +static int nn_stcpmux_recv (struct nn_pipebase *self, struct nn_msg *msg) +{ + struct nn_stcpmux *stcpmux; + + stcpmux = nn_cont (self, struct nn_stcpmux, pipebase); + + nn_assert_state (stcpmux, NN_STCPMUX_STATE_ACTIVE); + nn_assert (stcpmux->instate == NN_STCPMUX_INSTATE_HASMSG); + + /* Move received message to the user. */ + nn_msg_mv (msg, &stcpmux->inmsg); + nn_msg_init (&stcpmux->inmsg, 0); + + /* Start receiving new message. */ + stcpmux->instate = NN_STCPMUX_INSTATE_HDR; + nn_usock_recv (stcpmux->usock, stcpmux->inhdr, sizeof (stcpmux->inhdr)); + + return 0; +} + +static void nn_stcpmux_shutdown (struct nn_fsm *self, int src, int type, + NN_UNUSED void *srcptr) +{ + struct nn_stcpmux *stcpmux; + + stcpmux = nn_cont (self, struct nn_stcpmux, fsm); + + if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { + nn_pipebase_stop (&stcpmux->pipebase); + nn_streamhdr_stop (&stcpmux->streamhdr); + stcpmux->state = NN_STCPMUX_STATE_STOPPING; + } + if (nn_slow (stcpmux->state == NN_STCPMUX_STATE_STOPPING)) { + if (nn_streamhdr_isidle (&stcpmux->streamhdr)) { + nn_usock_swap_owner (stcpmux->usock, &stcpmux->usock_owner); + stcpmux->usock = NULL; + stcpmux->usock_owner.src = -1; + stcpmux->usock_owner.fsm = NULL; + stcpmux->state = NN_STCPMUX_STATE_IDLE; + nn_fsm_stopped (&stcpmux->fsm, NN_STCPMUX_STOPPED); + return; + } + return; + } + + nn_fsm_bad_state(stcpmux->state, src, type); +} + +static void nn_stcpmux_handler (struct nn_fsm *self, int src, int type, + NN_UNUSED void *srcptr) +{ + int rc; + struct nn_stcpmux *stcpmux; + uint64_t size; + + stcpmux = nn_cont (self, struct nn_stcpmux, fsm); + + switch (stcpmux->state) { + +/******************************************************************************/ +/* IDLE state. */ +/******************************************************************************/ + case NN_STCPMUX_STATE_IDLE: + switch (src) { + + case NN_FSM_ACTION: + switch (type) { + case NN_FSM_START: + nn_streamhdr_start (&stcpmux->streamhdr, stcpmux->usock, + &stcpmux->pipebase); + stcpmux->state = NN_STCPMUX_STATE_PROTOHDR; + return; + default: + nn_fsm_bad_action (stcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (stcpmux->state, src, type); + } + +/******************************************************************************/ +/* PROTOHDR state. */ +/******************************************************************************/ + case NN_STCPMUX_STATE_PROTOHDR: + switch (src) { + + case NN_STCPMUX_SRC_STREAMHDR: + switch (type) { + case NN_STREAMHDR_OK: + + /* Before moving to the active state stop the streamhdr + state machine. */ + nn_streamhdr_stop (&stcpmux->streamhdr); + stcpmux->state = NN_STCPMUX_STATE_STOPPING_STREAMHDR; + return; + + case NN_STREAMHDR_ERROR: + + /* Raise the error and move directly to the DONE state. + streamhdr object will be stopped later on. */ + stcpmux->state = NN_STCPMUX_STATE_DONE; + nn_fsm_raise (&stcpmux->fsm, &stcpmux->done, NN_STCPMUX_ERROR); + return; + + default: + nn_fsm_bad_action (stcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (stcpmux->state, src, type); + } + +/******************************************************************************/ +/* STOPPING_STREAMHDR state. */ +/******************************************************************************/ + case NN_STCPMUX_STATE_STOPPING_STREAMHDR: + switch (src) { + + case NN_STCPMUX_SRC_STREAMHDR: + switch (type) { + case NN_STREAMHDR_STOPPED: + + /* Start the pipe. */ + rc = nn_pipebase_start (&stcpmux->pipebase); + if (nn_slow (rc < 0)) { + stcpmux->state = NN_STCPMUX_STATE_DONE; + nn_fsm_raise (&stcpmux->fsm, &stcpmux->done, + NN_STCPMUX_ERROR); + return; + } + + /* Start receiving a message in asynchronous manner. */ + stcpmux->instate = NN_STCPMUX_INSTATE_HDR; + nn_usock_recv (stcpmux->usock, &stcpmux->inhdr, + sizeof (stcpmux->inhdr)); + + /* Mark the pipe as available for sending. */ + stcpmux->outstate = NN_STCPMUX_OUTSTATE_IDLE; + + stcpmux->state = NN_STCPMUX_STATE_ACTIVE; + return; + + default: + nn_fsm_bad_action (stcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (stcpmux->state, src, type); + } + +/******************************************************************************/ +/* ACTIVE state. */ +/******************************************************************************/ + case NN_STCPMUX_STATE_ACTIVE: + switch (src) { + + case NN_STCPMUX_SRC_USOCK: + switch (type) { + case NN_USOCK_SENT: + + /* The message is now fully sent. */ + nn_assert (stcpmux->outstate == NN_STCPMUX_OUTSTATE_SENDING); + stcpmux->outstate = NN_STCPMUX_OUTSTATE_IDLE; + nn_msg_term (&stcpmux->outmsg); + nn_msg_init (&stcpmux->outmsg, 0); + nn_pipebase_sent (&stcpmux->pipebase); + return; + + case NN_USOCK_RECEIVED: + + switch (stcpmux->instate) { + case NN_STCPMUX_INSTATE_HDR: + + /* Message header was received. Allocate memory for the + message. */ + size = nn_getll (stcpmux->inhdr); + nn_msg_term (&stcpmux->inmsg); + nn_msg_init (&stcpmux->inmsg, (size_t) size); + + /* Special case when size of the message body is 0. */ + if (!size) { + stcpmux->instate = NN_STCPMUX_INSTATE_HASMSG; + nn_pipebase_received (&stcpmux->pipebase); + return; + } + + /* Start receiving the message body. */ + stcpmux->instate = NN_STCPMUX_INSTATE_BODY; + nn_usock_recv (stcpmux->usock, + nn_chunkref_data (&stcpmux->inmsg.body), (size_t) size); + + return; + + case NN_STCPMUX_INSTATE_BODY: + + /* Message body was received. Notify the owner that it + can receive it. */ + stcpmux->instate = NN_STCPMUX_INSTATE_HASMSG; + nn_pipebase_received (&stcpmux->pipebase); + + return; + + default: + nn_fsm_error("Unexpected socket instate", + stcpmux->state, src, type); + } + + case NN_USOCK_SHUTDOWN: + nn_pipebase_stop (&stcpmux->pipebase); + stcpmux->state = NN_STCPMUX_STATE_SHUTTING_DOWN; + return; + + case NN_USOCK_ERROR: + nn_pipebase_stop (&stcpmux->pipebase); + stcpmux->state = NN_STCPMUX_STATE_DONE; + nn_fsm_raise (&stcpmux->fsm, &stcpmux->done, NN_STCPMUX_ERROR); + return; + + default: + nn_fsm_bad_action (stcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (stcpmux->state, src, type); + } + +/******************************************************************************/ +/* SHUTTING_DOWN state. */ +/* The underlying connection is closed. We are just waiting that underlying */ +/* usock being closed */ +/******************************************************************************/ + case NN_STCPMUX_STATE_SHUTTING_DOWN: + switch (src) { + + case NN_STCPMUX_SRC_USOCK: + switch (type) { + case NN_USOCK_ERROR: + stcpmux->state = NN_STCPMUX_STATE_DONE; + nn_fsm_raise (&stcpmux->fsm, &stcpmux->done, NN_STCPMUX_ERROR); + return; + default: + nn_fsm_bad_action (stcpmux->state, src, type); + } + + default: + nn_fsm_bad_source (stcpmux->state, src, type); + } + + +/******************************************************************************/ +/* DONE state. */ +/* The underlying connection is closed. There's nothing that can be done in */ +/* this state except stopping the object. */ +/******************************************************************************/ + case NN_STCPMUX_STATE_DONE: + nn_fsm_bad_source (stcpmux->state, src, type); + +/******************************************************************************/ +/* Invalid state. */ +/******************************************************************************/ + default: + nn_fsm_bad_state (stcpmux->state, src, type); + } +} + diff --git a/src/transports/tcpmux/stcpmux.h b/src/transports/tcpmux/stcpmux.h new file mode 100644 index 0000000..d339f54 --- /dev/null +++ b/src/transports/tcpmux/stcpmux.h @@ -0,0 +1,90 @@ +/* + Copyright (c) 2013-2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#ifndef NN_STCPMUX_INCLUDED +#define NN_STCPMUX_INCLUDED + +#include "../../transport.h" + +#include "../../aio/fsm.h" +#include "../../aio/usock.h" + +#include "../utils/streamhdr.h" + +#include "../../utils/msg.h" + +/* This state machine handles TCPMUX connection from the point where it is + established to the point when it is broken. */ + +#define NN_STCPMUX_ERROR 1 +#define NN_STCPMUX_STOPPED 2 + +struct nn_stcpmux { + + /* The state machine. */ + struct nn_fsm fsm; + int state; + + /* The underlying socket. */ + struct nn_usock *usock; + + /* Child state machine to do protocol header exchange. */ + struct nn_streamhdr streamhdr; + + /* The original owner of the underlying socket. */ + struct nn_fsm_owner usock_owner; + + /* Pipe connecting this TCPMUX connection to the nanomsg core. */ + struct nn_pipebase pipebase; + + /* State of inbound state machine. */ + int instate; + + /* Buffer used to store the header of incoming message. */ + uint8_t inhdr [8]; + + /* Message being received at the moment. */ + struct nn_msg inmsg; + + /* State of the outbound state machine. */ + int outstate; + + /* Buffer used to store the header of outgoing message. */ + uint8_t outhdr [8]; + + /* Message being sent at the moment. */ + struct nn_msg outmsg; + + /* Event raised when the state machine ends. */ + struct nn_fsm_event done; +}; + +void nn_stcpmux_init (struct nn_stcpmux *self, int src, + struct nn_epbase *epbase, struct nn_fsm *owner); +void nn_stcpmux_term (struct nn_stcpmux *self); + +int nn_stcpmux_isidle (struct nn_stcpmux *self); +void nn_stcpmux_start (struct nn_stcpmux *self, struct nn_usock *usock); +void nn_stcpmux_stop (struct nn_stcpmux *self); + +#endif + diff --git a/src/transports/tcpmux/tcpmux.c b/src/transports/tcpmux/tcpmux.c new file mode 100644 index 0000000..eba42f6 --- /dev/null +++ b/src/transports/tcpmux/tcpmux.c @@ -0,0 +1,159 @@ +/* + Copyright (c) 2012-2014 Martin Sustrik All rights reserved. + Copyright (c) 2013 GoPivotal, Inc. All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "tcpmux.h" +#include "btcpmux.h" +#include "ctcpmux.h" + +#include "../../tcpmux.h" + +#include "../utils/port.h" +#include "../utils/iface.h" + +#include "../../utils/err.h" +#include "../../utils/alloc.h" +#include "../../utils/fast.h" +#include "../../utils/list.h" +#include "../../utils/cont.h" + +#include <string.h> + +#if defined NN_HAVE_WINDOWS +#include "../../utils/win.h" +#else +#include <unistd.h> +#endif + +/* TCPMUX-specific socket options. */ + +struct nn_tcpmux_optset { + struct nn_optset base; + int nodelay; +}; + +static void nn_tcpmux_optset_destroy (struct nn_optset *self); +static int nn_tcpmux_optset_setopt (struct nn_optset *self, int option, + const void *optval, size_t optvallen); +static int nn_tcpmux_optset_getopt (struct nn_optset *self, int option, + void *optval, size_t *optvallen); +static const struct nn_optset_vfptr nn_tcpmux_optset_vfptr = { + nn_tcpmux_optset_destroy, + nn_tcpmux_optset_setopt, + nn_tcpmux_optset_getopt +}; + +/* nn_transport interface. */ +static int nn_tcpmux_bind (void *hint, struct nn_epbase **epbase); +static int nn_tcpmux_connect (void *hint, struct nn_epbase **epbase); +static struct nn_optset *nn_tcpmux_optset (void); + +static struct nn_transport nn_tcpmux_vfptr = { + "tcpmux", + NN_TCPMUX, + NULL, + NULL, + nn_tcpmux_bind, + nn_tcpmux_connect, + nn_tcpmux_optset, + NN_LIST_ITEM_INITIALIZER +}; + +struct nn_transport *nn_tcpmux = &nn_tcpmux_vfptr; + +static int nn_tcpmux_bind (void *hint, struct nn_epbase **epbase) +{ + return nn_btcpmux_create (hint, epbase); +} + +static int nn_tcpmux_connect (void *hint, struct nn_epbase **epbase) +{ + return nn_ctcpmux_create (hint, epbase); +} + +static struct nn_optset *nn_tcpmux_optset () +{ + struct nn_tcpmux_optset *optset; + + optset = nn_alloc (sizeof (struct nn_tcpmux_optset), "optset (tcpmux)"); + alloc_assert (optset); + optset->base.vfptr = &nn_tcpmux_optset_vfptr; + + /* Default values for TCPMUX socket options. */ + optset->nodelay = 0; + + return &optset->base; +} + +static void nn_tcpmux_optset_destroy (struct nn_optset *self) +{ + struct nn_tcpmux_optset *optset; + + optset = nn_cont (self, struct nn_tcpmux_optset, base); + nn_free (optset); +} + +static int nn_tcpmux_optset_setopt (struct nn_optset *self, int option, + const void *optval, size_t optvallen) +{ + struct nn_tcpmux_optset *optset; + int val; + + optset = nn_cont (self, struct nn_tcpmux_optset, base); + + /* At this point we assume that all options are of type int. */ + if (optvallen != sizeof (int)) + return -EINVAL; + val = *(int*) optval; + + switch (option) { + case NN_TCPMUX_NODELAY: + if (nn_slow (val != 0 && val != 1)) + return -EINVAL; + optset->nodelay = val; + return 0; + default: + return -ENOPROTOOPT; + } +} + +static int nn_tcpmux_optset_getopt (struct nn_optset *self, int option, + void *optval, size_t *optvallen) +{ + struct nn_tcpmux_optset *optset; + int intval; + + optset = nn_cont (self, struct nn_tcpmux_optset, base); + + switch (option) { + case NN_TCPMUX_NODELAY: + intval = optset->nodelay; + break; + default: + return -ENOPROTOOPT; + } + memcpy (optval, &intval, + *optvallen < sizeof (int) ? *optvallen : sizeof (int)); + *optvallen = sizeof (int); + return 0; +} + diff --git a/src/transports/tcpmux/tcpmux.h b/src/transports/tcpmux/tcpmux.h new file mode 100644 index 0000000..dc3a687 --- /dev/null +++ b/src/transports/tcpmux/tcpmux.h @@ -0,0 +1,30 @@ +/* + Copyright (c) 2012-2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#ifndef NN_TCPMUX_INCLUDED +#define NN_TCPMUX_INCLUDED + +#include "../../transport.h" + +extern struct nn_transport *nn_tcpmux; + +#endif |