summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2014-11-18 21:41:46 +0100
committerMartin Sustrik <sustrik@250bpm.com>2014-11-20 16:28:55 +0100
commitef741eeb01173f094fabf6fad91d6f3d9e3a4211 (patch)
treef06a9adf1350ca396fc98af2b6ca5afbc91b834c
parent7cc7a88c91530438e7538177f92c7da43d304372 (diff)
downloadnanomsg-ef741eeb01173f094fabf6fad91d6f3d9e3a4211.tar.gz
First version of tcpmux transport added
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r--Makefile.am15
-rw-r--r--src/core/global.c2
-rw-r--r--src/transports/tcpmux/atcpmux.c314
-rw-r--r--src/transports/tcpmux/atcpmux.h81
-rw-r--r--src/transports/tcpmux/btcpmux.c506
-rw-r--r--src/transports/tcpmux/btcpmux.h33
-rw-r--r--src/transports/tcpmux/ctcpmux.c632
-rw-r--r--src/transports/tcpmux/ctcpmux.h33
-rw-r--r--src/transports/tcpmux/stcpmux.c417
-rw-r--r--src/transports/tcpmux/stcpmux.h90
-rw-r--r--src/transports/tcpmux/tcpmux.c159
-rw-r--r--src/transports/tcpmux/tcpmux.h30
12 files changed, 2311 insertions, 1 deletions
diff --git a/Makefile.am b/Makefile.am
index 7ff82fe..efb2b76 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -305,12 +305,25 @@ TRANSPORTS_WS = \
src/transports/ws/sha1.h \
src/transports/ws/sha1.c
+TRANSPORTS_TCPMUX = \
+ src/transports/tcpmux/atcpmux.h \
+ src/transports/tcpmux/atcpmux.c \
+ src/transports/tcpmux/btcpmux.h \
+ src/transports/tcpmux/btcpmux.c \
+ src/transports/tcpmux/ctcpmux.h \
+ src/transports/tcpmux/ctcpmux.c \
+ src/transports/tcpmux/stcpmux.h \
+ src/transports/tcpmux/stcpmux.c \
+ src/transports/tcpmux/tcpmux.h \
+ src/transports/tcpmux/tcpmux.c
+
NANOMSG_TRANSPORTS = \
$(TRANSPORTS_UTILS) \
$(TRANSPORTS_INPROC) \
$(TRANSPORTS_IPC) \
$(TRANSPORTS_TCP) \
- $(TRANSPORTS_WS)
+ $(TRANSPORTS_WS) \
+ $(TRANSPORTS_TCPMUX)
libnanomsg_la_SOURCES = \
src/transport.h \
diff --git a/src/core/global.c b/src/core/global.c
index 67cc22f..672345d 100644
--- a/src/core/global.c
+++ b/src/core/global.c
@@ -47,6 +47,7 @@
#include "../transports/ipc/ipc.h"
#include "../transports/tcp/tcp.h"
#include "../transports/ws/ws.h"
+#include "../transports/tcpmux/tcpmux.h"
#include "../protocols/pair/pair.h"
#include "../protocols/pair/xpair.h"
@@ -256,6 +257,7 @@ static void nn_global_init (void)
nn_global_add_transport (nn_ipc);
nn_global_add_transport (nn_tcp);
nn_global_add_transport (nn_ws);
+ nn_global_add_transport (nn_tcpmux);
/* Plug in individual socktypes. */
nn_global_add_socktype (nn_pair_socktype);
diff --git a/src/transports/tcpmux/atcpmux.c b/src/transports/tcpmux/atcpmux.c
new file mode 100644
index 0000000..7b24ab0
--- /dev/null
+++ b/src/transports/tcpmux/atcpmux.c
@@ -0,0 +1,314 @@
+/*
+ Copyright (c) 2012-2014 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "atcpmux.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/attr.h"
+
+#define NN_ATCPMUX_STATE_IDLE 1
+#define NN_ATCPMUX_STATE_ACCEPTING 2
+#define NN_ATCPMUX_STATE_ACTIVE 3
+#define NN_ATCPMUX_STATE_STOPPING_STCPMUX 4
+#define NN_ATCPMUX_STATE_STOPPING_USOCK 5
+#define NN_ATCPMUX_STATE_DONE 6
+#define NN_ATCPMUX_STATE_STOPPING_STCPMUX_FINAL 7
+#define NN_ATCPMUX_STATE_STOPPING 8
+
+#define NN_ATCPMUX_SRC_USOCK 1
+#define NN_ATCPMUX_SRC_STCPMUX 2
+#define NN_ATCPMUX_SRC_LISTENER 3
+
+/* Private functions. */
+static void nn_atcpmux_handler (struct nn_fsm *self, int src, int type,
+ void *srcptr);
+static void nn_atcpmux_shutdown (struct nn_fsm *self, int src, int type,
+ void *srcptr);
+
+void nn_atcpmux_init (struct nn_atcpmux *self, int src,
+ struct nn_epbase *epbase, struct nn_fsm *owner)
+{
+ nn_fsm_init (&self->fsm, nn_atcpmux_handler, nn_atcpmux_shutdown,
+ src, self, owner);
+ self->state = NN_ATCPMUX_STATE_IDLE;
+ self->epbase = epbase;
+ nn_usock_init (&self->usock, NN_ATCPMUX_SRC_USOCK, &self->fsm);
+ self->listener = NULL;
+ self->listener_owner.src = -1;
+ self->listener_owner.fsm = NULL;
+ nn_stcpmux_init (&self->stcpmux, NN_ATCPMUX_SRC_STCPMUX,
+ epbase, &self->fsm);
+ nn_fsm_event_init (&self->accepted);
+ nn_fsm_event_init (&self->done);
+ nn_list_item_init (&self->item);
+}
+
+void nn_atcpmux_term (struct nn_atcpmux *self)
+{
+ nn_assert_state (self, NN_ATCPMUX_STATE_IDLE);
+
+ nn_list_item_term (&self->item);
+ nn_fsm_event_term (&self->done);
+ nn_fsm_event_term (&self->accepted);
+ nn_stcpmux_term (&self->stcpmux);
+ nn_usock_term (&self->usock);
+ nn_fsm_term (&self->fsm);
+}
+
+int nn_atcpmux_isidle (struct nn_atcpmux *self)
+{
+ return nn_fsm_isidle (&self->fsm);
+}
+
+void nn_atcpmux_start (struct nn_atcpmux *self, struct nn_usock *listener)
+{
+ nn_assert_state (self, NN_ATCPMUX_STATE_IDLE);
+
+ /* Take ownership of the listener socket. */
+ self->listener = listener;
+ self->listener_owner.src = NN_ATCPMUX_SRC_LISTENER;
+ self->listener_owner.fsm = &self->fsm;
+ nn_usock_swap_owner (listener, &self->listener_owner);
+
+ /* Start the state machine. */
+ nn_fsm_start (&self->fsm);
+}
+
+void nn_atcpmux_stop (struct nn_atcpmux *self)
+{
+ nn_fsm_stop (&self->fsm);
+}
+
+static void nn_atcpmux_shutdown (struct nn_fsm *self, int src, int type,
+ NN_UNUSED void *srcptr)
+{
+ struct nn_atcpmux *atcpmux;
+
+ atcpmux = nn_cont (self, struct nn_atcpmux, fsm);
+
+ if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
+ if (!nn_stcpmux_isidle (&atcpmux->stcpmux)) {
+ nn_epbase_stat_increment (atcpmux->epbase,
+ NN_STAT_DROPPED_CONNECTIONS, 1);
+ nn_stcpmux_stop (&atcpmux->stcpmux);
+ }
+ atcpmux->state = NN_ATCPMUX_STATE_STOPPING_STCPMUX_FINAL;
+ }
+ if (nn_slow (atcpmux->state == NN_ATCPMUX_STATE_STOPPING_STCPMUX_FINAL)) {
+ if (!nn_stcpmux_isidle (&atcpmux->stcpmux))
+ return;
+ nn_usock_stop (&atcpmux->usock);
+ atcpmux->state = NN_ATCPMUX_STATE_STOPPING;
+ }
+ if (nn_slow (atcpmux->state == NN_ATCPMUX_STATE_STOPPING)) {
+ if (!nn_usock_isidle (&atcpmux->usock))
+ return;
+ if (atcpmux->listener) {
+ nn_assert (atcpmux->listener_owner.fsm);
+ nn_usock_swap_owner (atcpmux->listener, &atcpmux->listener_owner);
+ atcpmux->listener = NULL;
+ atcpmux->listener_owner.src = -1;
+ atcpmux->listener_owner.fsm = NULL;
+ }
+ atcpmux->state = NN_ATCPMUX_STATE_IDLE;
+ nn_fsm_stopped (&atcpmux->fsm, NN_ATCPMUX_STOPPED);
+ return;
+ }
+
+ nn_fsm_bad_action(atcpmux->state, src, type);
+}
+
+static void nn_atcpmux_handler (struct nn_fsm *self, int src, int type,
+ NN_UNUSED void *srcptr)
+{
+ struct nn_atcpmux *atcpmux;
+ int val;
+ size_t sz;
+
+ atcpmux = nn_cont (self, struct nn_atcpmux, fsm);
+
+ switch (atcpmux->state) {
+
+/******************************************************************************/
+/* IDLE state. */
+/* The state machine wasn't yet started. */
+/******************************************************************************/
+ case NN_ATCPMUX_STATE_IDLE:
+ switch (src) {
+
+ case NN_FSM_ACTION:
+ switch (type) {
+ case NN_FSM_START:
+ nn_usock_accept (&atcpmux->usock, atcpmux->listener);
+ atcpmux->state = NN_ATCPMUX_STATE_ACCEPTING;
+ return;
+ default:
+ nn_fsm_bad_action (atcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (atcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* ACCEPTING state. */
+/* Waiting for incoming connection. */
+/******************************************************************************/
+ case NN_ATCPMUX_STATE_ACCEPTING:
+ switch (src) {
+
+ case NN_ATCPMUX_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_ACCEPTED:
+ nn_epbase_clear_error (atcpmux->epbase);
+
+ /* Set the relevant socket options. */
+ sz = sizeof (val);
+ nn_epbase_getopt (atcpmux->epbase, NN_SOL_SOCKET, NN_SNDBUF,
+ &val, &sz);
+ nn_assert (sz == sizeof (val));
+ nn_usock_setsockopt (&atcpmux->usock, SOL_SOCKET, SO_SNDBUF,
+ &val, sizeof (val));
+ sz = sizeof (val);
+ nn_epbase_getopt (atcpmux->epbase, NN_SOL_SOCKET, NN_RCVBUF,
+ &val, &sz);
+ nn_assert (sz == sizeof (val));
+ nn_usock_setsockopt (&atcpmux->usock, SOL_SOCKET, SO_RCVBUF,
+ &val, sizeof (val));
+
+ /* Return ownership of the listening socket to the parent. */
+ nn_usock_swap_owner (atcpmux->listener,
+ &atcpmux->listener_owner);
+ atcpmux->listener = NULL;
+ atcpmux->listener_owner.src = -1;
+ atcpmux->listener_owner.fsm = NULL;
+ nn_fsm_raise (&atcpmux->fsm, &atcpmux->accepted,
+ NN_ATCPMUX_ACCEPTED);
+
+ /* Start the stcpmux state machine. */
+ nn_usock_activate (&atcpmux->usock);
+ nn_stcpmux_start (&atcpmux->stcpmux, &atcpmux->usock);
+ atcpmux->state = NN_ATCPMUX_STATE_ACTIVE;
+
+ nn_epbase_stat_increment (atcpmux->epbase,
+ NN_STAT_ACCEPTED_CONNECTIONS, 1);
+
+ return;
+
+ default:
+ nn_fsm_bad_action (atcpmux->state, src, type);
+ }
+
+ case NN_ATCPMUX_SRC_LISTENER:
+ switch (type) {
+
+ case NN_USOCK_ACCEPT_ERROR:
+ nn_epbase_set_error (atcpmux->epbase,
+ nn_usock_geterrno(atcpmux->listener));
+ nn_epbase_stat_increment (atcpmux->epbase,
+ NN_STAT_ACCEPT_ERRORS, 1);
+ nn_usock_accept (&atcpmux->usock, atcpmux->listener);
+ return;
+
+ default:
+ nn_fsm_bad_action (atcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (atcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* ACTIVE state. */
+/******************************************************************************/
+ case NN_ATCPMUX_STATE_ACTIVE:
+ switch (src) {
+
+ case NN_ATCPMUX_SRC_STCPMUX:
+ switch (type) {
+ case NN_STCPMUX_ERROR:
+ nn_stcpmux_stop (&atcpmux->stcpmux);
+ atcpmux->state = NN_ATCPMUX_STATE_STOPPING_STCPMUX;
+ nn_epbase_stat_increment (atcpmux->epbase,
+ NN_STAT_BROKEN_CONNECTIONS, 1);
+ return;
+ default:
+ nn_fsm_bad_action (atcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (atcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* STOPPING_STCPMUX state. */
+/******************************************************************************/
+ case NN_ATCPMUX_STATE_STOPPING_STCPMUX:
+ switch (src) {
+
+ case NN_ATCPMUX_SRC_STCPMUX:
+ switch (type) {
+ case NN_USOCK_SHUTDOWN:
+ return;
+ case NN_STCPMUX_STOPPED:
+ nn_usock_stop (&atcpmux->usock);
+ atcpmux->state = NN_ATCPMUX_STATE_STOPPING_USOCK;
+ return;
+ default:
+ nn_fsm_bad_action (atcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (atcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* STOPPING_USOCK state. */
+/******************************************************************************/
+ case NN_ATCPMUX_STATE_STOPPING_USOCK:
+ switch (src) {
+
+ case NN_ATCPMUX_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_SHUTDOWN:
+ return;
+ case NN_USOCK_STOPPED:
+ nn_fsm_raise (&atcpmux->fsm, &atcpmux->done, NN_ATCPMUX_ERROR);
+ atcpmux->state = NN_ATCPMUX_STATE_DONE;
+ return;
+ default:
+ nn_fsm_bad_action (atcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (atcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* Invalid state. */
+/******************************************************************************/
+ default:
+ nn_fsm_bad_state (atcpmux->state, src, type);
+ }
+}
+
diff --git a/src/transports/tcpmux/atcpmux.h b/src/transports/tcpmux/atcpmux.h
new file mode 100644
index 0000000..fb6f9a8
--- /dev/null
+++ b/src/transports/tcpmux/atcpmux.h
@@ -0,0 +1,81 @@
+/*
+ Copyright (c) 2013-2014 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_ATCPMUX_INCLUDED
+#define NN_ATCPMUX_INCLUDED
+
+#include "stcpmux.h"
+
+#include "../../transport.h"
+
+#include "../../aio/fsm.h"
+#include "../../aio/usock.h"
+
+#include "../../utils/list.h"
+
+/* State machine handling accepted TCPMUX sockets. */
+
+/* In btcpmux, some events are just *assumed* to come from a child atcpmux
+ object. By using non-trivial event codes, we can do more reliable sanity
+ checking in such scenarios. */
+#define NN_ATCPMUX_ACCEPTED 34231
+#define NN_ATCPMUX_ERROR 34232
+#define NN_ATCPMUX_STOPPED 34233
+
+struct nn_atcpmux {
+
+ /* The state machine. */
+ struct nn_fsm fsm;
+ int state;
+
+ /* Pointer to the associated endpoint. */
+ struct nn_epbase *epbase;
+
+ /* Underlying socket. */
+ struct nn_usock usock;
+
+ /* Listening socket. Valid only while accepting new connection. */
+ struct nn_usock *listener;
+ struct nn_fsm_owner listener_owner;
+
+ /* State machine that takes care of the connection in the active state. */
+ struct nn_stcpmux stcpmux;
+
+ /* Events generated by atcpmux state machine. */
+ struct nn_fsm_event accepted;
+ struct nn_fsm_event done;
+
+ /* This member can be used by owner to keep individual atcpmuxes
+ in a list. */
+ struct nn_list_item item;
+};
+
+void nn_atcpmux_init (struct nn_atcpmux *self, int src,
+ struct nn_epbase *epbase, struct nn_fsm *owner);
+void nn_atcpmux_term (struct nn_atcpmux *self);
+
+int nn_atcpmux_isidle (struct nn_atcpmux *self);
+void nn_atcpmux_start (struct nn_atcpmux *self, struct nn_usock *listener);
+void nn_atcpmux_stop (struct nn_atcpmux *self);
+
+#endif
+
diff --git a/src/transports/tcpmux/btcpmux.c b/src/transports/tcpmux/btcpmux.c
new file mode 100644
index 0000000..74bc7fe
--- /dev/null
+++ b/src/transports/tcpmux/btcpmux.c
@@ -0,0 +1,506 @@
+/*
+ Copyright (c) 2012-2014 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "btcpmux.h"
+#include "atcpmux.h"
+
+#include "../utils/port.h"
+#include "../utils/iface.h"
+
+#include "../../aio/fsm.h"
+#include "../../aio/usock.h"
+
+#include "../utils/backoff.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/alloc.h"
+#include "../../utils/list.h"
+#include "../../utils/fast.h"
+#include "../../utils/int.h"
+
+#include <string.h>
+
+#if defined NN_HAVE_WINDOWS
+#include "../../utils/win.h"
+#else
+#include <unistd.h>
+#include <netinet/in.h>
+#endif
+
+/* The backlog is set relatively high so that there are not too many failed
+ connection attemps during re-connection storms. */
+#define NN_BTCPMUX_BACKLOG 100
+
+#define NN_BTCPMUX_STATE_IDLE 1
+#define NN_BTCPMUX_STATE_ACTIVE 2
+#define NN_BTCPMUX_STATE_STOPPING_ATCPMUX 3
+#define NN_BTCPMUX_STATE_STOPPING_USOCK 4
+#define NN_BTCPMUX_STATE_STOPPING_ATCPMUXES 5
+#define NN_BTCPMUX_STATE_LISTENING 6
+#define NN_BTCPMUX_STATE_WAITING 7
+#define NN_BTCPMUX_STATE_CLOSING 8
+#define NN_BTCPMUX_STATE_STOPPING_BACKOFF 9
+
+#define NN_BTCPMUX_SRC_USOCK 1
+#define NN_BTCPMUX_SRC_ATCPMUX 2
+#define NN_BTCPMUX_SRC_RECONNECT_TIMER 3
+
+struct nn_btcpmux {
+
+ /* The state machine. */
+ struct nn_fsm fsm;
+ int state;
+
+ /* This object is a specific type of endpoint.
+ Thus it is derived from epbase. */
+ struct nn_epbase epbase;
+
+ /* The underlying listening TCPMUX socket. */
+ struct nn_usock usock;
+
+ /* The connection being accepted at the moment. */
+ struct nn_atcpmux *atcpmux;
+
+ /* List of accepted connections. */
+ struct nn_list atcpmuxes;
+
+ /* Used to wait before retrying to connect. */
+ struct nn_backoff retry;
+};
+
+/* nn_epbase virtual interface implementation. */
+static void nn_btcpmux_stop (struct nn_epbase *self);
+static void nn_btcpmux_destroy (struct nn_epbase *self);
+const struct nn_epbase_vfptr nn_btcpmux_epbase_vfptr = {
+ nn_btcpmux_stop,
+ nn_btcpmux_destroy
+};
+
+/* Private functions. */
+static void nn_btcpmux_handler (struct nn_fsm *self, int src, int type,
+ void *srcptr);
+static void nn_btcpmux_shutdown (struct nn_fsm *self, int src, int type,
+ void *srcptr);
+static void nn_btcpmux_start_listening (struct nn_btcpmux *self);
+static void nn_btcpmux_start_accepting (struct nn_btcpmux *self);
+
+int nn_btcpmux_create (void *hint, struct nn_epbase **epbase)
+{
+ int rc;
+ struct nn_btcpmux *self;
+ const char *addr;
+ const char *end;
+ const char *pos;
+ struct sockaddr_storage ss;
+ size_t sslen;
+ int ipv4only;
+ size_t ipv4onlylen;
+ int reconnect_ivl;
+ int reconnect_ivl_max;
+ size_t sz;
+
+ /* Allocate the new endpoint object. */
+ self = nn_alloc (sizeof (struct nn_btcpmux), "btcpmux");
+ alloc_assert (self);
+
+ /* Initalise the epbase. */
+ nn_epbase_init (&self->epbase, &nn_btcpmux_epbase_vfptr, hint);
+ addr = nn_epbase_getaddr (&self->epbase);
+
+ /* Parse the port. */
+ end = addr + strlen (addr);
+ pos = strrchr (addr, ':');
+ if (nn_slow (!pos)) {
+ nn_epbase_term (&self->epbase);
+ return -EINVAL;
+ }
+ ++pos;
+ rc = nn_port_resolve (pos, end - pos);
+ if (nn_slow (rc < 0)) {
+ nn_epbase_term (&self->epbase);
+ return -EINVAL;
+ }
+
+ /* Check whether IPv6 is to be used. */
+ ipv4onlylen = sizeof (ipv4only);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY,
+ &ipv4only, &ipv4onlylen);
+ nn_assert (ipv4onlylen == sizeof (ipv4only));
+
+ /* Parse the address. */
+ rc = nn_iface_resolve (addr, pos - addr - 1, ipv4only, &ss, &sslen);
+ if (nn_slow (rc < 0)) {
+ nn_epbase_term (&self->epbase);
+ return -ENODEV;
+ }
+
+ /* Initialise the structure. */
+ nn_fsm_init_root (&self->fsm, nn_btcpmux_handler, nn_btcpmux_shutdown,
+ nn_epbase_getctx (&self->epbase));
+ self->state = NN_BTCPMUX_STATE_IDLE;
+ sz = sizeof (reconnect_ivl);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL,
+ &reconnect_ivl, &sz);
+ nn_assert (sz == sizeof (reconnect_ivl));
+ sz = sizeof (reconnect_ivl_max);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX,
+ &reconnect_ivl_max, &sz);
+ nn_assert (sz == sizeof (reconnect_ivl_max));
+ if (reconnect_ivl_max == 0)
+ reconnect_ivl_max = reconnect_ivl;
+ nn_backoff_init (&self->retry, NN_BTCPMUX_SRC_RECONNECT_TIMER,
+ reconnect_ivl, reconnect_ivl_max, &self->fsm);
+ nn_usock_init (&self->usock, NN_BTCPMUX_SRC_USOCK, &self->fsm);
+ self->atcpmux = NULL;
+ nn_list_init (&self->atcpmuxes);
+
+ /* Start the state machine. */
+ nn_fsm_start (&self->fsm);
+
+ /* Return the base class as an out parameter. */
+ *epbase = &self->epbase;
+
+ return 0;
+}
+
+static void nn_btcpmux_stop (struct nn_epbase *self)
+{
+ struct nn_btcpmux *btcpmux;
+
+ btcpmux = nn_cont (self, struct nn_btcpmux, epbase);
+
+ nn_fsm_stop (&btcpmux->fsm);
+}
+
+static void nn_btcpmux_destroy (struct nn_epbase *self)
+{
+ struct nn_btcpmux *btcpmux;
+
+ btcpmux = nn_cont (self, struct nn_btcpmux, epbase);
+
+ nn_assert_state (btcpmux, NN_BTCPMUX_STATE_IDLE);
+ nn_list_term (&btcpmux->atcpmuxes);
+ nn_assert (btcpmux->atcpmux == NULL);
+ nn_usock_term (&btcpmux->usock);
+ nn_backoff_term (&btcpmux->retry);
+ nn_epbase_term (&btcpmux->epbase);
+ nn_fsm_term (&btcpmux->fsm);
+
+ nn_free (btcpmux);
+}
+
+static void nn_btcpmux_shutdown (struct nn_fsm *self, int src, int type,
+ void *srcptr)
+{
+ struct nn_btcpmux *btcpmux;
+ struct nn_list_item *it;
+ struct nn_atcpmux *atcpmux;
+
+ btcpmux = nn_cont (self, struct nn_btcpmux, fsm);
+
+ if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
+ nn_backoff_stop (&btcpmux->retry);
+ if (btcpmux->atcpmux) {
+ nn_atcpmux_stop (btcpmux->atcpmux);
+ btcpmux->state = NN_BTCPMUX_STATE_STOPPING_ATCPMUX;
+ }
+ else {
+ btcpmux->state = NN_BTCPMUX_STATE_STOPPING_USOCK;
+ }
+ }
+ if (nn_slow (btcpmux->state == NN_BTCPMUX_STATE_STOPPING_ATCPMUX)) {
+ if (!nn_atcpmux_isidle (btcpmux->atcpmux))
+ return;
+ nn_atcpmux_term (btcpmux->atcpmux);
+ nn_free (btcpmux->atcpmux);
+ btcpmux->atcpmux = NULL;
+ nn_usock_stop (&btcpmux->usock);
+ btcpmux->state = NN_BTCPMUX_STATE_STOPPING_USOCK;
+ }
+ if (nn_slow (btcpmux->state == NN_BTCPMUX_STATE_STOPPING_USOCK)) {
+ if (!nn_usock_isidle (&btcpmux->usock))
+ return;
+ for (it = nn_list_begin (&btcpmux->atcpmuxes);
+ it != nn_list_end (&btcpmux->atcpmuxes);
+ it = nn_list_next (&btcpmux->atcpmuxes, it)) {
+ atcpmux = nn_cont (it, struct nn_atcpmux, item);
+ nn_atcpmux_stop (atcpmux);
+ }
+ btcpmux->state = NN_BTCPMUX_STATE_STOPPING_ATCPMUXES;
+ goto atcpmuxes_stopping;
+ }
+ if (nn_slow (btcpmux->state == NN_BTCPMUX_STATE_STOPPING_ATCPMUXES)) {
+ nn_assert (src == NN_BTCPMUX_SRC_ATCPMUX && type == NN_ATCPMUX_STOPPED);
+ atcpmux = (struct nn_atcpmux *) srcptr;
+ nn_list_erase (&btcpmux->atcpmuxes, &atcpmux->item);
+ nn_atcpmux_term (atcpmux);
+ nn_free (atcpmux);
+
+ /* If there are no more atcpmux state machines, we can stop the whole
+ btcpmux object. */
+atcpmuxes_stopping:
+ if (nn_list_empty (&btcpmux->atcpmuxes)) {
+ btcpmux->state = NN_BTCPMUX_STATE_IDLE;
+ nn_fsm_stopped_noevent (&btcpmux->fsm);
+ nn_epbase_stopped (&btcpmux->epbase);
+ return;
+ }
+
+ return;
+ }
+
+ nn_fsm_bad_action(btcpmux->state, src, type);
+}
+
+static void nn_btcpmux_handler (struct nn_fsm *self, int src, int type,
+ void *srcptr)
+{
+ struct nn_btcpmux *btcpmux;
+ struct nn_atcpmux *atcpmux;
+
+ btcpmux = nn_cont (self, struct nn_btcpmux, fsm);
+
+ switch (btcpmux->state) {
+
+/******************************************************************************/
+/* IDLE state. */
+/******************************************************************************/
+ case NN_BTCPMUX_STATE_IDLE:
+ switch (src) {
+
+ case NN_FSM_ACTION:
+ switch (type) {
+ case NN_FSM_START:
+ nn_btcpmux_start_listening (btcpmux);
+ return;
+ default:
+ nn_fsm_bad_action (btcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (btcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* ACTIVE state. */
+/* The execution is yielded to the atcpmux state machine in this state. */
+/******************************************************************************/
+ case NN_BTCPMUX_STATE_ACTIVE:
+ if (srcptr == btcpmux->atcpmux) {
+ switch (type) {
+ case NN_ATCPMUX_ACCEPTED:
+
+ /* Move the newly created connection to the list of existing
+ connections. */
+ nn_list_insert (&btcpmux->atcpmuxes, &btcpmux->atcpmux->item,
+ nn_list_end (&btcpmux->atcpmuxes));
+ btcpmux->atcpmux = NULL;
+
+ /* Start waiting for a new incoming connection. */
+ nn_btcpmux_start_accepting (btcpmux);
+
+ return;
+
+ default:
+ nn_fsm_bad_action (btcpmux->state, src, type);
+ }
+ }
+
+ /* For all remaining events we'll assume they are coming from one
+ of remaining child atcpmux objects. */
+ nn_assert (src == NN_BTCPMUX_SRC_ATCPMUX);
+ atcpmux = (struct nn_atcpmux*) srcptr;
+ switch (type) {
+ case NN_ATCPMUX_ERROR:
+ nn_atcpmux_stop (atcpmux);
+ return;
+ case NN_ATCPMUX_STOPPED:
+ nn_list_erase (&btcpmux->atcpmuxes, &atcpmux->item);
+ nn_atcpmux_term (atcpmux);
+ nn_free (atcpmux);
+ return;
+ default:
+ nn_fsm_bad_action (btcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* CLOSING_USOCK state. */
+/* usock object was asked to stop but it haven't stopped yet. */
+/******************************************************************************/
+ case NN_BTCPMUX_STATE_CLOSING:
+ switch (src) {
+
+ case NN_BTCPMUX_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_SHUTDOWN:
+ return;
+ case NN_USOCK_STOPPED:
+ nn_backoff_start (&btcpmux->retry);
+ btcpmux->state = NN_BTCPMUX_STATE_WAITING;
+ return;
+ default:
+ nn_fsm_bad_action (btcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (btcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* WAITING state. */
+/* Waiting before re-bind is attempted. This way we won't overload */
+/* the system by continuous re-bind attemps. */
+/******************************************************************************/
+ case NN_BTCPMUX_STATE_WAITING:
+ switch (src) {
+
+ case NN_BTCPMUX_SRC_RECONNECT_TIMER:
+ switch (type) {
+ case NN_BACKOFF_TIMEOUT:
+ nn_backoff_stop (&btcpmux->retry);
+ btcpmux->state = NN_BTCPMUX_STATE_STOPPING_BACKOFF;
+ return;
+ default:
+ nn_fsm_bad_action (btcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (btcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* STOPPING_BACKOFF state. */
+/* backoff object was asked to stop, but it haven't stopped yet. */
+/******************************************************************************/
+ case NN_BTCPMUX_STATE_STOPPING_BACKOFF:
+ switch (src) {
+
+ case NN_BTCPMUX_SRC_RECONNECT_TIMER:
+ switch (type) {
+ case NN_BACKOFF_STOPPED:
+ nn_btcpmux_start_listening (btcpmux);
+ return;
+ default:
+ nn_fsm_bad_action (btcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (btcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* Invalid state. */
+/******************************************************************************/
+ default:
+ nn_fsm_bad_state (btcpmux->state, src, type);
+ }
+}
+
+/******************************************************************************/
+/* State machine actions. */
+/******************************************************************************/
+
+static void nn_btcpmux_start_listening (struct nn_btcpmux *self)
+{
+ int rc;
+ struct sockaddr_storage ss;
+ size_t sslen;
+ int ipv4only;
+ size_t ipv4onlylen;
+ const char *addr;
+ const char *end;
+ const char *pos;
+ uint16_t port;
+
+ /* First, resolve the IP address. */
+ addr = nn_epbase_getaddr (&self->epbase);
+ memset (&ss, 0, sizeof (ss));
+
+ /* Parse the port. */
+ end = addr + strlen (addr);
+ pos = strrchr (addr, ':');
+ nn_assert (pos);
+ ++pos;
+ rc = nn_port_resolve (pos, end - pos);
+ nn_assert (rc >= 0);
+ port = rc;
+
+ /* Parse the address. */
+ ipv4onlylen = sizeof (ipv4only);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY,
+ &ipv4only, &ipv4onlylen);
+ nn_assert (ipv4onlylen == sizeof (ipv4only));
+ rc = nn_iface_resolve (addr, pos - addr - 1, ipv4only, &ss, &sslen);
+ errnum_assert (rc == 0, -rc);
+
+ /* Combine the port and the address. */
+ if (ss.ss_family == AF_INET) {
+ ((struct sockaddr_in*) &ss)->sin_port = htons (port);
+ sslen = sizeof (struct sockaddr_in);
+ }
+ else if (ss.ss_family == AF_INET6) {
+ ((struct sockaddr_in6*) &ss)->sin6_port = htons (port);
+ sslen = sizeof (struct sockaddr_in6);
+ }
+ else
+ nn_assert (0);
+
+ /* Start listening for incoming connections. */
+ rc = nn_usock_start (&self->usock, ss.ss_family, SOCK_STREAM, 0);
+ if (nn_slow (rc < 0)) {
+ nn_backoff_start (&self->retry);
+ self->state = NN_BTCPMUX_STATE_WAITING;
+ return;
+ }
+
+ rc = nn_usock_bind (&self->usock, (struct sockaddr*) &ss, (size_t) sslen);
+ if (nn_slow (rc < 0)) {
+ nn_usock_stop (&self->usock);
+ self->state = NN_BTCPMUX_STATE_CLOSING;
+ return;
+ }
+
+ rc = nn_usock_listen (&self->usock, NN_BTCPMUX_BACKLOG);
+ if (nn_slow (rc < 0)) {
+ nn_usock_stop (&self->usock);
+ self->state = NN_BTCPMUX_STATE_CLOSING;
+ return;
+ }
+ nn_btcpmux_start_accepting(self);
+ self->state = NN_BTCPMUX_STATE_ACTIVE;
+}
+
+static void nn_btcpmux_start_accepting (struct nn_btcpmux *self)
+{
+ nn_assert (self->atcpmux == NULL);
+
+ /* Allocate new atcpmux state machine. */
+ self->atcpmux = nn_alloc (sizeof (struct nn_atcpmux), "atcpmux");
+ alloc_assert (self->atcpmux);
+ nn_atcpmux_init (self->atcpmux, NN_BTCPMUX_SRC_ATCPMUX,
+ &self->epbase, &self->fsm);
+
+ /* Start waiting for a new incoming connection. */
+ nn_atcpmux_start (self->atcpmux, &self->usock);
+}
+
diff --git a/src/transports/tcpmux/btcpmux.h b/src/transports/tcpmux/btcpmux.h
new file mode 100644
index 0000000..48983a8
--- /dev/null
+++ b/src/transports/tcpmux/btcpmux.h
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2013-2014 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_BTCPMUX_INCLUDED
+#define NN_BTCPMUX_INCLUDED
+
+#include "../../transport.h"
+
+/* State machine managing bound TCPMUX socket. */
+
+int nn_btcpmux_create (void *hint, struct nn_epbase **epbase);
+
+#endif
+
diff --git a/src/transports/tcpmux/ctcpmux.c b/src/transports/tcpmux/ctcpmux.c
new file mode 100644
index 0000000..d9f8501
--- /dev/null
+++ b/src/transports/tcpmux/ctcpmux.c
@@ -0,0 +1,632 @@
+/*
+ Copyright (c) 2012-2014 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "ctcpmux.h"
+#include "stcpmux.h"
+
+#include "../../tcpmux.h"
+
+#include "../utils/dns.h"
+#include "../utils/port.h"
+#include "../utils/iface.h"
+#include "../utils/backoff.h"
+#include "../utils/literal.h"
+
+#include "../../aio/fsm.h"
+#include "../../aio/usock.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/alloc.h"
+#include "../../utils/fast.h"
+#include "../../utils/int.h"
+#include "../../utils/attr.h"
+
+#include <string.h>
+
+#if defined NN_HAVE_WINDOWS
+#include "../../utils/win.h"
+#else
+#include <unistd.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#endif
+
+#define NN_CTCPMUX_STATE_IDLE 1
+#define NN_CTCPMUX_STATE_RESOLVING 2
+#define NN_CTCPMUX_STATE_STOPPING_DNS 3
+#define NN_CTCPMUX_STATE_CONNECTING 4
+#define NN_CTCPMUX_STATE_ACTIVE 5
+#define NN_CTCPMUX_STATE_STOPPING_STCPMUX 6
+#define NN_CTCPMUX_STATE_STOPPING_USOCK 7
+#define NN_CTCPMUX_STATE_WAITING 8
+#define NN_CTCPMUX_STATE_STOPPING_BACKOFF 9
+#define NN_CTCPMUX_STATE_STOPPING_STCPMUX_FINAL 10
+#define NN_CTCPMUX_STATE_STOPPING 11
+
+#define NN_CTCPMUX_SRC_USOCK 1
+#define NN_CTCPMUX_SRC_RECONNECT_TIMER 2
+#define NN_CTCPMUX_SRC_DNS 3
+#define NN_CTCPMUX_SRC_STCPMUX 4
+
+struct nn_ctcpmux {
+
+ /* The state machine. */
+ struct nn_fsm fsm;
+ int state;
+
+ /* This object is a specific type of endpoint.
+ Thus it is derived from epbase. */
+ struct nn_epbase epbase;
+
+ /* The underlying TCPMUX socket. */
+ struct nn_usock usock;
+
+ /* Used to wait before retrying to connect. */
+ struct nn_backoff retry;
+
+ /* State machine that handles the active part of the connection
+ lifetime. */
+ struct nn_stcpmux stcpmux;
+
+ /* DNS resolver used to convert textual address into actual IP address
+ along with the variable to hold the result. */
+ struct nn_dns dns;
+ struct nn_dns_result dns_result;
+};
+
+/* nn_epbase virtual interface implementation. */
+static void nn_ctcpmux_stop (struct nn_epbase *self);
+static void nn_ctcpmux_destroy (struct nn_epbase *self);
+const struct nn_epbase_vfptr nn_ctcpmux_epbase_vfptr = {
+ nn_ctcpmux_stop,
+ nn_ctcpmux_destroy
+};
+
+/* Private functions. */
+static void nn_ctcpmux_handler (struct nn_fsm *self, int src, int type,
+ void *srcptr);
+static void nn_ctcpmux_shutdown (struct nn_fsm *self, int src, int type,
+ void *srcptr);
+static void nn_ctcpmux_start_resolving (struct nn_ctcpmux *self);
+static void nn_ctcpmux_start_connecting (struct nn_ctcpmux *self,
+ struct sockaddr_storage *ss, size_t sslen);
+
+int nn_ctcpmux_create (void *hint, struct nn_epbase **epbase)
+{
+ int rc;
+ const char *addr;
+ size_t addrlen;
+ const char *semicolon;
+ const char *hostname;
+ const char *colon;
+ const char *end;
+ struct sockaddr_storage ss;
+ size_t sslen;
+ int ipv4only;
+ size_t ipv4onlylen;
+ struct nn_ctcpmux *self;
+ int reconnect_ivl;
+ int reconnect_ivl_max;
+ size_t sz;
+
+ /* Allocate the new endpoint object. */
+ self = nn_alloc (sizeof (struct nn_ctcpmux), "ctcpmux");
+ alloc_assert (self);
+
+ /* Initalise the endpoint. */
+ nn_epbase_init (&self->epbase, &nn_ctcpmux_epbase_vfptr, hint);
+
+ /* Check whether IPv6 is to be used. */
+ ipv4onlylen = sizeof (ipv4only);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY,
+ &ipv4only, &ipv4onlylen);
+ nn_assert (ipv4onlylen == sizeof (ipv4only));
+
+ /* Start parsing the address. */
+ addr = nn_epbase_getaddr (&self->epbase);
+ addrlen = strlen (addr);
+ semicolon = strchr (addr, ';');
+ hostname = semicolon ? semicolon + 1 : addr;
+ colon = strrchr (addr, ':');
+ end = addr + addrlen;
+
+ /* Parse the port. */
+ if (nn_slow (!colon)) {
+ nn_epbase_term (&self->epbase);
+ return -EINVAL;
+ }
+ rc = nn_port_resolve (colon + 1, end - colon - 1);
+ if (nn_slow (rc < 0)) {
+ nn_epbase_term (&self->epbase);
+ return -EINVAL;
+ }
+
+ /* Check whether the host portion of the address is either a literal
+ or a valid hostname. */
+ if (nn_dns_check_hostname (hostname, colon - hostname) < 0 &&
+ nn_literal_resolve (hostname, colon - hostname, ipv4only,
+ &ss, &sslen) < 0) {
+ nn_epbase_term (&self->epbase);
+ return -EINVAL;
+ }
+
+ /* If local address is specified, check whether it is valid. */
+ if (semicolon) {
+ rc = nn_iface_resolve (addr, semicolon - addr, ipv4only, &ss, &sslen);
+ if (rc < 0) {
+ nn_epbase_term (&self->epbase);
+ return -ENODEV;
+ }
+ }
+
+ /* Initialise the structure. */
+ nn_fsm_init_root (&self->fsm, nn_ctcpmux_handler, nn_ctcpmux_shutdown,
+ nn_epbase_getctx (&self->epbase));
+ self->state = NN_CTCPMUX_STATE_IDLE;
+ nn_usock_init (&self->usock, NN_CTCPMUX_SRC_USOCK, &self->fsm);
+ sz = sizeof (reconnect_ivl);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL,
+ &reconnect_ivl, &sz);
+ nn_assert (sz == sizeof (reconnect_ivl));
+ sz = sizeof (reconnect_ivl_max);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RECONNECT_IVL_MAX,
+ &reconnect_ivl_max, &sz);
+ nn_assert (sz == sizeof (reconnect_ivl_max));
+ if (reconnect_ivl_max == 0)
+ reconnect_ivl_max = reconnect_ivl;
+ nn_backoff_init (&self->retry, NN_CTCPMUX_SRC_RECONNECT_TIMER,
+ reconnect_ivl, reconnect_ivl_max, &self->fsm);
+ nn_stcpmux_init (&self->stcpmux, NN_CTCPMUX_SRC_STCPMUX, &self->epbase, &self->fsm);
+ nn_dns_init (&self->dns, NN_CTCPMUX_SRC_DNS, &self->fsm);
+
+ /* Start the state machine. */
+ nn_fsm_start (&self->fsm);
+
+ /* Return the base class as an out parameter. */
+ *epbase = &self->epbase;
+
+ return 0;
+}
+
+static void nn_ctcpmux_stop (struct nn_epbase *self)
+{
+ struct nn_ctcpmux *ctcpmux;
+
+ ctcpmux = nn_cont (self, struct nn_ctcpmux, epbase);
+
+ nn_fsm_stop (&ctcpmux->fsm);
+}
+
+static void nn_ctcpmux_destroy (struct nn_epbase *self)
+{
+ struct nn_ctcpmux *ctcpmux;
+
+ ctcpmux = nn_cont (self, struct nn_ctcpmux, epbase);
+
+ nn_dns_term (&ctcpmux->dns);
+ nn_stcpmux_term (&ctcpmux->stcpmux);
+ nn_backoff_term (&ctcpmux->retry);
+ nn_usock_term (&ctcpmux->usock);
+ nn_fsm_term (&ctcpmux->fsm);
+ nn_epbase_term (&ctcpmux->epbase);
+
+ nn_free (ctcpmux);
+}
+
+static void nn_ctcpmux_shutdown (struct nn_fsm *self, int src, int type,
+ NN_UNUSED void *srcptr)
+{
+ struct nn_ctcpmux *ctcpmux;
+
+ ctcpmux = nn_cont (self, struct nn_ctcpmux, fsm);
+
+ if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
+ if (!nn_stcpmux_isidle (&ctcpmux->stcpmux)) {
+ nn_epbase_stat_increment (&ctcpmux->epbase,
+ NN_STAT_DROPPED_CONNECTIONS, 1);
+ nn_stcpmux_stop (&ctcpmux->stcpmux);
+ }
+ ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_STCPMUX_FINAL;
+ }
+ if (nn_slow (ctcpmux->state == NN_CTCPMUX_STATE_STOPPING_STCPMUX_FINAL)) {
+ if (!nn_stcpmux_isidle (&ctcpmux->stcpmux))
+ return;
+ nn_backoff_stop (&ctcpmux->retry);
+ nn_usock_stop (&ctcpmux->usock);
+ nn_dns_stop (&ctcpmux->dns);
+ ctcpmux->state = NN_CTCPMUX_STATE_STOPPING;
+ }
+ if (nn_slow (ctcpmux->state == NN_CTCPMUX_STATE_STOPPING)) {
+ if (!nn_backoff_isidle (&ctcpmux->retry) ||
+ !nn_usock_isidle (&ctcpmux->usock) ||
+ !nn_dns_isidle (&ctcpmux->dns))
+ return;
+ ctcpmux->state = NN_CTCPMUX_STATE_IDLE;
+ nn_fsm_stopped_noevent (&ctcpmux->fsm);
+ nn_epbase_stopped (&ctcpmux->epbase);
+ return;
+ }
+
+ nn_fsm_bad_state (ctcpmux->state, src, type);
+}
+
+static void nn_ctcpmux_handler (struct nn_fsm *self, int src, int type,
+ NN_UNUSED void *srcptr)
+{
+ struct nn_ctcpmux *ctcpmux;
+
+ ctcpmux = nn_cont (self, struct nn_ctcpmux, fsm);
+
+ switch (ctcpmux->state) {
+
+/******************************************************************************/
+/* IDLE state. */
+/* The state machine wasn't yet started. */
+/******************************************************************************/
+ case NN_CTCPMUX_STATE_IDLE:
+ switch (src) {
+
+ case NN_FSM_ACTION:
+ switch (type) {
+ case NN_FSM_START:
+ nn_ctcpmux_start_resolving (ctcpmux);
+ return;
+ default:
+ nn_fsm_bad_action (ctcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (ctcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* RESOLVING state. */
+/* Name of the host to connect to is being resolved to get an IP address. */
+/******************************************************************************/
+ case NN_CTCPMUX_STATE_RESOLVING:
+ switch (src) {
+
+ case NN_CTCPMUX_SRC_DNS:
+ switch (type) {
+ case NN_DNS_DONE:
+ nn_dns_stop (&ctcpmux->dns);
+ ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_DNS;
+ return;
+ default:
+ nn_fsm_bad_action (ctcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (ctcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* STOPPING_DNS state. */
+/* dns object was asked to stop but it haven't stopped yet. */
+/******************************************************************************/
+ case NN_CTCPMUX_STATE_STOPPING_DNS:
+ switch (src) {
+
+ case NN_CTCPMUX_SRC_DNS:
+ switch (type) {
+ case NN_DNS_STOPPED:
+ if (ctcpmux->dns_result.error == 0) {
+ nn_ctcpmux_start_connecting (ctcpmux,
+ &ctcpmux->dns_result.addr,
+ ctcpmux->dns_result.addrlen);
+ return;
+ }
+ nn_backoff_start (&ctcpmux->retry);
+ ctcpmux->state = NN_CTCPMUX_STATE_WAITING;
+ return;
+ default:
+ nn_fsm_bad_action (ctcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (ctcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* CONNECTING state. */
+/* Non-blocking connect is under way. */
+/******************************************************************************/
+ case NN_CTCPMUX_STATE_CONNECTING:
+ switch (src) {
+
+ case NN_CTCPMUX_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_CONNECTED:
+ nn_stcpmux_start (&ctcpmux->stcpmux, &ctcpmux->usock);
+ ctcpmux->state = NN_CTCPMUX_STATE_ACTIVE;
+ nn_epbase_stat_increment (&ctcpmux->epbase,
+ NN_STAT_INPROGRESS_CONNECTIONS, -1);
+ nn_epbase_stat_increment (&ctcpmux->epbase,
+ NN_STAT_ESTABLISHED_CONNECTIONS, 1);
+ nn_epbase_clear_error (&ctcpmux->epbase);
+ return;
+ case NN_USOCK_ERROR:
+ nn_epbase_set_error (&ctcpmux->epbase,
+ nn_usock_geterrno (&ctcpmux->usock));
+ nn_usock_stop (&ctcpmux->usock);
+ ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_USOCK;
+ nn_epbase_stat_increment (&ctcpmux->epbase,
+ NN_STAT_INPROGRESS_CONNECTIONS, -1);
+ nn_epbase_stat_increment (&ctcpmux->epbase,
+ NN_STAT_CONNECT_ERRORS, 1);
+ return;
+ default:
+ nn_fsm_bad_action (ctcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (ctcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* ACTIVE state. */
+/* Connection is established and handled by the stcpmux state machine. */
+/******************************************************************************/
+ case NN_CTCPMUX_STATE_ACTIVE:
+ switch (src) {
+
+ case NN_CTCPMUX_SRC_STCPMUX:
+ switch (type) {
+ case NN_STCPMUX_ERROR:
+ nn_stcpmux_stop (&ctcpmux->stcpmux);
+ ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_STCPMUX;
+ nn_epbase_stat_increment (&ctcpmux->epbase,
+ NN_STAT_BROKEN_CONNECTIONS, 1);
+ return;
+ default:
+ nn_fsm_bad_action (ctcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (ctcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* STOPPING_STCPMUX state. */
+/* stcpmux object was asked to stop but it haven't stopped yet. */
+/******************************************************************************/
+ case NN_CTCPMUX_STATE_STOPPING_STCPMUX:
+ switch (src) {
+
+ case NN_CTCPMUX_SRC_STCPMUX:
+ switch (type) {
+ case NN_USOCK_SHUTDOWN:
+ return;
+ case NN_STCPMUX_STOPPED:
+ nn_usock_stop (&ctcpmux->usock);
+ ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_USOCK;
+ return;
+ default:
+ nn_fsm_bad_action (ctcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (ctcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* STOPPING_USOCK state. */
+/* usock object was asked to stop but it haven't stopped yet. */
+/******************************************************************************/
+ case NN_CTCPMUX_STATE_STOPPING_USOCK:
+ switch (src) {
+
+ case NN_CTCPMUX_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_SHUTDOWN:
+ return;
+ case NN_USOCK_STOPPED:
+ nn_backoff_start (&ctcpmux->retry);
+ ctcpmux->state = NN_CTCPMUX_STATE_WAITING;
+ return;
+ default:
+ nn_fsm_bad_action (ctcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (ctcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* WAITING state. */
+/* Waiting before re-connection is attempted. This way we won't overload */
+/* the system by continuous re-connection attemps. */
+/******************************************************************************/
+ case NN_CTCPMUX_STATE_WAITING:
+ switch (src) {
+
+ case NN_CTCPMUX_SRC_RECONNECT_TIMER:
+ switch (type) {
+ case NN_BACKOFF_TIMEOUT:
+ nn_backoff_stop (&ctcpmux->retry);
+ ctcpmux->state = NN_CTCPMUX_STATE_STOPPING_BACKOFF;
+ return;
+ default:
+ nn_fsm_bad_action (ctcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (ctcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* STOPPING_BACKOFF state. */
+/* backoff object was asked to stop, but it haven't stopped yet. */
+/******************************************************************************/
+ case NN_CTCPMUX_STATE_STOPPING_BACKOFF:
+ switch (src) {
+
+ case NN_CTCPMUX_SRC_RECONNECT_TIMER:
+ switch (type) {
+ case NN_BACKOFF_STOPPED:
+ nn_ctcpmux_start_resolving (ctcpmux);
+ return;
+ default:
+ nn_fsm_bad_action (ctcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (ctcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* Invalid state. */
+/******************************************************************************/
+ default:
+ nn_fsm_bad_state (ctcpmux->state, src, type);
+ }
+}
+
+/******************************************************************************/
+/* State machine actions. */
+/******************************************************************************/
+
+static void nn_ctcpmux_start_resolving (struct nn_ctcpmux *self)
+{
+ const char *addr;
+ const char *begin;
+ const char *end;
+ int ipv4only;
+ size_t ipv4onlylen;
+
+ /* Extract the hostname part from address string. */
+ addr = nn_epbase_getaddr (&self->epbase);
+ begin = strchr (addr, ';');
+ if (!begin)
+ begin = addr;
+ else
+ ++begin;
+ end = strrchr (addr, ':');
+ nn_assert (end);
+
+ /* Check whether IPv6 is to be used. */
+ ipv4onlylen = sizeof (ipv4only);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY,
+ &ipv4only, &ipv4onlylen);
+ nn_assert (ipv4onlylen == sizeof (ipv4only));
+
+ /* TODO: Get the actual value of IPV4ONLY option. */
+ nn_dns_start (&self->dns, begin, end - begin, ipv4only, &self->dns_result);
+
+ self->state = NN_CTCPMUX_STATE_RESOLVING;
+}
+
+static void nn_ctcpmux_start_connecting (struct nn_ctcpmux *self,
+ struct sockaddr_storage *ss, size_t sslen)
+{
+ int rc;
+ struct sockaddr_storage remote;
+ size_t remotelen;
+ struct sockaddr_storage local;
+ size_t locallen;
+ const char *addr;
+ const char *end;
+ const char *colon;
+ const char *semicolon;
+ uint16_t port;
+ int ipv4only;
+ size_t ipv4onlylen;
+ int val;
+ size_t sz;
+
+ /* Create IP address from the address string. */
+ addr = nn_epbase_getaddr (&self->epbase);
+ memset (&remote, 0, sizeof (remote));
+
+ /* Parse the port. */
+ end = addr + strlen (addr);
+ colon = strrchr (addr, ':');
+ rc = nn_port_resolve (colon + 1, end - colon - 1);
+ errnum_assert (rc > 0, -rc);
+ port = rc;
+
+ /* Check whether IPv6 is to be used. */
+ ipv4onlylen = sizeof (ipv4only);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_IPV4ONLY,
+ &ipv4only, &ipv4onlylen);
+ nn_assert (ipv4onlylen == sizeof (ipv4only));
+
+ /* Parse the local address, if any. */
+ semicolon = strchr (addr, ';');
+ memset (&local, 0, sizeof (local));
+ if (semicolon)
+ rc = nn_iface_resolve (addr, semicolon - addr, ipv4only,
+ &local, &locallen);
+ else
+ rc = nn_iface_resolve ("*", 1, ipv4only, &local, &locallen);
+ if (nn_slow (rc < 0)) {
+ nn_backoff_start (&self->retry);
+ self->state = NN_CTCPMUX_STATE_WAITING;
+ return;
+ }
+
+ /* Combine the remote address and the port. */
+ remote = *ss;
+ remotelen = sslen;
+ if (remote.ss_family == AF_INET)
+ ((struct sockaddr_in*) &remote)->sin_port = htons (port);
+ else if (remote.ss_family == AF_INET6)
+ ((struct sockaddr_in6*) &remote)->sin6_port = htons (port);
+ else
+ nn_assert (0);
+
+ /* Try to start the underlying socket. */
+ rc = nn_usock_start (&self->usock, remote.ss_family, SOCK_STREAM, 0);
+ if (nn_slow (rc < 0)) {
+ nn_backoff_start (&self->retry);
+ self->state = NN_CTCPMUX_STATE_WAITING;
+ return;
+ }
+
+ /* Set the relevant socket options. */
+ sz = sizeof (val);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_SNDBUF, &val, &sz);
+ nn_assert (sz == sizeof (val));
+ nn_usock_setsockopt (&self->usock, SOL_SOCKET, SO_SNDBUF,
+ &val, sizeof (val));
+ sz = sizeof (val);
+ nn_epbase_getopt (&self->epbase, NN_SOL_SOCKET, NN_RCVBUF, &val, &sz);
+ nn_assert (sz == sizeof (val));
+ nn_usock_setsockopt (&self->usock, SOL_SOCKET, SO_RCVBUF,
+ &val, sizeof (val));
+
+ /* Bind the socket to the local network interface. */
+ rc = nn_usock_bind (&self->usock, (struct sockaddr*) &local, locallen);
+ if (nn_slow (rc != 0)) {
+ nn_backoff_start (&self->retry);
+ self->state = NN_CTCPMUX_STATE_WAITING;
+ return;
+ }
+
+ /* Start connecting. */
+ nn_usock_connect (&self->usock, (struct sockaddr*) &remote, remotelen);
+ self->state = NN_CTCPMUX_STATE_CONNECTING;
+ nn_epbase_stat_increment (&self->epbase,
+ NN_STAT_INPROGRESS_CONNECTIONS, 1);
+}
+
diff --git a/src/transports/tcpmux/ctcpmux.h b/src/transports/tcpmux/ctcpmux.h
new file mode 100644
index 0000000..294f4fa
--- /dev/null
+++ b/src/transports/tcpmux/ctcpmux.h
@@ -0,0 +1,33 @@
+/*
+ Copyright (c) 2013-2014 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_CTCPMUX_INCLUDED
+#define NN_CTCPMUX_INCLUDED
+
+#include "../../transport.h"
+
+/* State machine managing connected TCPMUX socket. */
+
+int nn_ctcpmux_create (void *hint, struct nn_epbase **epbase);
+
+#endif
+
diff --git a/src/transports/tcpmux/stcpmux.c b/src/transports/tcpmux/stcpmux.c
new file mode 100644
index 0000000..abbf180
--- /dev/null
+++ b/src/transports/tcpmux/stcpmux.c
@@ -0,0 +1,417 @@
+/*
+ Copyright (c) 2013-2014 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "stcpmux.h"
+
+#include "../../utils/err.h"
+#include "../../utils/cont.h"
+#include "../../utils/fast.h"
+#include "../../utils/wire.h"
+#include "../../utils/int.h"
+#include "../../utils/attr.h"
+
+/* States of the object as a whole. */
+#define NN_STCPMUX_STATE_IDLE 1
+#define NN_STCPMUX_STATE_PROTOHDR 2
+#define NN_STCPMUX_STATE_STOPPING_STREAMHDR 3
+#define NN_STCPMUX_STATE_ACTIVE 4
+#define NN_STCPMUX_STATE_SHUTTING_DOWN 5
+#define NN_STCPMUX_STATE_DONE 6
+#define NN_STCPMUX_STATE_STOPPING 7
+
+/* Possible states of the inbound part of the object. */
+#define NN_STCPMUX_INSTATE_HDR 1
+#define NN_STCPMUX_INSTATE_BODY 2
+#define NN_STCPMUX_INSTATE_HASMSG 3
+
+/* Possible states of the outbound part of the object. */
+#define NN_STCPMUX_OUTSTATE_IDLE 1
+#define NN_STCPMUX_OUTSTATE_SENDING 2
+
+/* Subordinate srcptr objects. */
+#define NN_STCPMUX_SRC_USOCK 1
+#define NN_STCPMUX_SRC_STREAMHDR 2
+
+/* Stream is a special type of pipe. Implementation of the virtual pipe API. */
+static int nn_stcpmux_send (struct nn_pipebase *self, struct nn_msg *msg);
+static int nn_stcpmux_recv (struct nn_pipebase *self, struct nn_msg *msg);
+const struct nn_pipebase_vfptr nn_stcpmux_pipebase_vfptr = {
+ nn_stcpmux_send,
+ nn_stcpmux_recv
+};
+
+/* Private functions. */
+static void nn_stcpmux_handler (struct nn_fsm *self, int src, int type,
+ void *srcptr);
+static void nn_stcpmux_shutdown (struct nn_fsm *self, int src, int type,
+ void *srcptr);
+
+void nn_stcpmux_init (struct nn_stcpmux *self, int src,
+ struct nn_epbase *epbase, struct nn_fsm *owner)
+{
+ nn_fsm_init (&self->fsm, nn_stcpmux_handler, nn_stcpmux_shutdown,
+ src, self, owner);
+ self->state = NN_STCPMUX_STATE_IDLE;
+ nn_streamhdr_init (&self->streamhdr, NN_STCPMUX_SRC_STREAMHDR, &self->fsm);
+ self->usock = NULL;
+ self->usock_owner.src = -1;
+ self->usock_owner.fsm = NULL;
+ nn_pipebase_init (&self->pipebase, &nn_stcpmux_pipebase_vfptr, epbase);
+ self->instate = -1;
+ nn_msg_init (&self->inmsg, 0);
+ self->outstate = -1;
+ nn_msg_init (&self->outmsg, 0);
+ nn_fsm_event_init (&self->done);
+}
+
+void nn_stcpmux_term (struct nn_stcpmux *self)
+{
+ nn_assert_state (self, NN_STCPMUX_STATE_IDLE);
+
+ nn_fsm_event_term (&self->done);
+ nn_msg_term (&self->outmsg);
+ nn_msg_term (&self->inmsg);
+ nn_pipebase_term (&self->pipebase);
+ nn_streamhdr_term (&self->streamhdr);
+ nn_fsm_term (&self->fsm);
+}
+
+int nn_stcpmux_isidle (struct nn_stcpmux *self)
+{
+ return nn_fsm_isidle (&self->fsm);
+}
+
+void nn_stcpmux_start (struct nn_stcpmux *self, struct nn_usock *usock)
+{
+ /* Take ownership of the underlying socket. */
+ nn_assert (self->usock == NULL && self->usock_owner.fsm == NULL);
+ self->usock_owner.src = NN_STCPMUX_SRC_USOCK;
+ self->usock_owner.fsm = &self->fsm;
+ nn_usock_swap_owner (usock, &self->usock_owner);
+ self->usock = usock;
+
+ /* Launch the state machine. */
+ nn_fsm_start (&self->fsm);
+}
+
+void nn_stcpmux_stop (struct nn_stcpmux *self)
+{
+ nn_fsm_stop (&self->fsm);
+}
+
+static int nn_stcpmux_send (struct nn_pipebase *self, struct nn_msg *msg)
+{
+ struct nn_stcpmux *stcpmux;
+ struct nn_iovec iov [3];
+
+ stcpmux = nn_cont (self, struct nn_stcpmux, pipebase);
+
+ nn_assert_state (stcpmux, NN_STCPMUX_STATE_ACTIVE);
+ nn_assert (stcpmux->outstate == NN_STCPMUX_OUTSTATE_IDLE);
+
+ /* Move the message to the local storage. */
+ nn_msg_term (&stcpmux->outmsg);
+ nn_msg_mv (&stcpmux->outmsg, msg);
+
+ /* Serialise the message header. */
+ nn_putll (stcpmux->outhdr, nn_chunkref_size (&stcpmux->outmsg.sphdr) +
+ nn_chunkref_size (&stcpmux->outmsg.body));
+
+ /* Start async sending. */
+ iov [0].iov_base = stcpmux->outhdr;
+ iov [0].iov_len = sizeof (stcpmux->outhdr);
+ iov [1].iov_base = nn_chunkref_data (&stcpmux->outmsg.sphdr);
+ iov [1].iov_len = nn_chunkref_size (&stcpmux->outmsg.sphdr);
+ iov [2].iov_base = nn_chunkref_data (&stcpmux->outmsg.body);
+ iov [2].iov_len = nn_chunkref_size (&stcpmux->outmsg.body);
+ nn_usock_send (stcpmux->usock, iov, 3);
+
+ stcpmux->outstate = NN_STCPMUX_OUTSTATE_SENDING;
+
+ return 0;
+}
+
+static int nn_stcpmux_recv (struct nn_pipebase *self, struct nn_msg *msg)
+{
+ struct nn_stcpmux *stcpmux;
+
+ stcpmux = nn_cont (self, struct nn_stcpmux, pipebase);
+
+ nn_assert_state (stcpmux, NN_STCPMUX_STATE_ACTIVE);
+ nn_assert (stcpmux->instate == NN_STCPMUX_INSTATE_HASMSG);
+
+ /* Move received message to the user. */
+ nn_msg_mv (msg, &stcpmux->inmsg);
+ nn_msg_init (&stcpmux->inmsg, 0);
+
+ /* Start receiving new message. */
+ stcpmux->instate = NN_STCPMUX_INSTATE_HDR;
+ nn_usock_recv (stcpmux->usock, stcpmux->inhdr, sizeof (stcpmux->inhdr));
+
+ return 0;
+}
+
+static void nn_stcpmux_shutdown (struct nn_fsm *self, int src, int type,
+ NN_UNUSED void *srcptr)
+{
+ struct nn_stcpmux *stcpmux;
+
+ stcpmux = nn_cont (self, struct nn_stcpmux, fsm);
+
+ if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
+ nn_pipebase_stop (&stcpmux->pipebase);
+ nn_streamhdr_stop (&stcpmux->streamhdr);
+ stcpmux->state = NN_STCPMUX_STATE_STOPPING;
+ }
+ if (nn_slow (stcpmux->state == NN_STCPMUX_STATE_STOPPING)) {
+ if (nn_streamhdr_isidle (&stcpmux->streamhdr)) {
+ nn_usock_swap_owner (stcpmux->usock, &stcpmux->usock_owner);
+ stcpmux->usock = NULL;
+ stcpmux->usock_owner.src = -1;
+ stcpmux->usock_owner.fsm = NULL;
+ stcpmux->state = NN_STCPMUX_STATE_IDLE;
+ nn_fsm_stopped (&stcpmux->fsm, NN_STCPMUX_STOPPED);
+ return;
+ }
+ return;
+ }
+
+ nn_fsm_bad_state(stcpmux->state, src, type);
+}
+
+static void nn_stcpmux_handler (struct nn_fsm *self, int src, int type,
+ NN_UNUSED void *srcptr)
+{
+ int rc;
+ struct nn_stcpmux *stcpmux;
+ uint64_t size;
+
+ stcpmux = nn_cont (self, struct nn_stcpmux, fsm);
+
+ switch (stcpmux->state) {
+
+/******************************************************************************/
+/* IDLE state. */
+/******************************************************************************/
+ case NN_STCPMUX_STATE_IDLE:
+ switch (src) {
+
+ case NN_FSM_ACTION:
+ switch (type) {
+ case NN_FSM_START:
+ nn_streamhdr_start (&stcpmux->streamhdr, stcpmux->usock,
+ &stcpmux->pipebase);
+ stcpmux->state = NN_STCPMUX_STATE_PROTOHDR;
+ return;
+ default:
+ nn_fsm_bad_action (stcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (stcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* PROTOHDR state. */
+/******************************************************************************/
+ case NN_STCPMUX_STATE_PROTOHDR:
+ switch (src) {
+
+ case NN_STCPMUX_SRC_STREAMHDR:
+ switch (type) {
+ case NN_STREAMHDR_OK:
+
+ /* Before moving to the active state stop the streamhdr
+ state machine. */
+ nn_streamhdr_stop (&stcpmux->streamhdr);
+ stcpmux->state = NN_STCPMUX_STATE_STOPPING_STREAMHDR;
+ return;
+
+ case NN_STREAMHDR_ERROR:
+
+ /* Raise the error and move directly to the DONE state.
+ streamhdr object will be stopped later on. */
+ stcpmux->state = NN_STCPMUX_STATE_DONE;
+ nn_fsm_raise (&stcpmux->fsm, &stcpmux->done, NN_STCPMUX_ERROR);
+ return;
+
+ default:
+ nn_fsm_bad_action (stcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (stcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* STOPPING_STREAMHDR state. */
+/******************************************************************************/
+ case NN_STCPMUX_STATE_STOPPING_STREAMHDR:
+ switch (src) {
+
+ case NN_STCPMUX_SRC_STREAMHDR:
+ switch (type) {
+ case NN_STREAMHDR_STOPPED:
+
+ /* Start the pipe. */
+ rc = nn_pipebase_start (&stcpmux->pipebase);
+ if (nn_slow (rc < 0)) {
+ stcpmux->state = NN_STCPMUX_STATE_DONE;
+ nn_fsm_raise (&stcpmux->fsm, &stcpmux->done,
+ NN_STCPMUX_ERROR);
+ return;
+ }
+
+ /* Start receiving a message in asynchronous manner. */
+ stcpmux->instate = NN_STCPMUX_INSTATE_HDR;
+ nn_usock_recv (stcpmux->usock, &stcpmux->inhdr,
+ sizeof (stcpmux->inhdr));
+
+ /* Mark the pipe as available for sending. */
+ stcpmux->outstate = NN_STCPMUX_OUTSTATE_IDLE;
+
+ stcpmux->state = NN_STCPMUX_STATE_ACTIVE;
+ return;
+
+ default:
+ nn_fsm_bad_action (stcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (stcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* ACTIVE state. */
+/******************************************************************************/
+ case NN_STCPMUX_STATE_ACTIVE:
+ switch (src) {
+
+ case NN_STCPMUX_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_SENT:
+
+ /* The message is now fully sent. */
+ nn_assert (stcpmux->outstate == NN_STCPMUX_OUTSTATE_SENDING);
+ stcpmux->outstate = NN_STCPMUX_OUTSTATE_IDLE;
+ nn_msg_term (&stcpmux->outmsg);
+ nn_msg_init (&stcpmux->outmsg, 0);
+ nn_pipebase_sent (&stcpmux->pipebase);
+ return;
+
+ case NN_USOCK_RECEIVED:
+
+ switch (stcpmux->instate) {
+ case NN_STCPMUX_INSTATE_HDR:
+
+ /* Message header was received. Allocate memory for the
+ message. */
+ size = nn_getll (stcpmux->inhdr);
+ nn_msg_term (&stcpmux->inmsg);
+ nn_msg_init (&stcpmux->inmsg, (size_t) size);
+
+ /* Special case when size of the message body is 0. */
+ if (!size) {
+ stcpmux->instate = NN_STCPMUX_INSTATE_HASMSG;
+ nn_pipebase_received (&stcpmux->pipebase);
+ return;
+ }
+
+ /* Start receiving the message body. */
+ stcpmux->instate = NN_STCPMUX_INSTATE_BODY;
+ nn_usock_recv (stcpmux->usock,
+ nn_chunkref_data (&stcpmux->inmsg.body), (size_t) size);
+
+ return;
+
+ case NN_STCPMUX_INSTATE_BODY:
+
+ /* Message body was received. Notify the owner that it
+ can receive it. */
+ stcpmux->instate = NN_STCPMUX_INSTATE_HASMSG;
+ nn_pipebase_received (&stcpmux->pipebase);
+
+ return;
+
+ default:
+ nn_fsm_error("Unexpected socket instate",
+ stcpmux->state, src, type);
+ }
+
+ case NN_USOCK_SHUTDOWN:
+ nn_pipebase_stop (&stcpmux->pipebase);
+ stcpmux->state = NN_STCPMUX_STATE_SHUTTING_DOWN;
+ return;
+
+ case NN_USOCK_ERROR:
+ nn_pipebase_stop (&stcpmux->pipebase);
+ stcpmux->state = NN_STCPMUX_STATE_DONE;
+ nn_fsm_raise (&stcpmux->fsm, &stcpmux->done, NN_STCPMUX_ERROR);
+ return;
+
+ default:
+ nn_fsm_bad_action (stcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (stcpmux->state, src, type);
+ }
+
+/******************************************************************************/
+/* SHUTTING_DOWN state. */
+/* The underlying connection is closed. We are just waiting that underlying */
+/* usock being closed */
+/******************************************************************************/
+ case NN_STCPMUX_STATE_SHUTTING_DOWN:
+ switch (src) {
+
+ case NN_STCPMUX_SRC_USOCK:
+ switch (type) {
+ case NN_USOCK_ERROR:
+ stcpmux->state = NN_STCPMUX_STATE_DONE;
+ nn_fsm_raise (&stcpmux->fsm, &stcpmux->done, NN_STCPMUX_ERROR);
+ return;
+ default:
+ nn_fsm_bad_action (stcpmux->state, src, type);
+ }
+
+ default:
+ nn_fsm_bad_source (stcpmux->state, src, type);
+ }
+
+
+/******************************************************************************/
+/* DONE state. */
+/* The underlying connection is closed. There's nothing that can be done in */
+/* this state except stopping the object. */
+/******************************************************************************/
+ case NN_STCPMUX_STATE_DONE:
+ nn_fsm_bad_source (stcpmux->state, src, type);
+
+/******************************************************************************/
+/* Invalid state. */
+/******************************************************************************/
+ default:
+ nn_fsm_bad_state (stcpmux->state, src, type);
+ }
+}
+
diff --git a/src/transports/tcpmux/stcpmux.h b/src/transports/tcpmux/stcpmux.h
new file mode 100644
index 0000000..d339f54
--- /dev/null
+++ b/src/transports/tcpmux/stcpmux.h
@@ -0,0 +1,90 @@
+/*
+ Copyright (c) 2013-2014 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_STCPMUX_INCLUDED
+#define NN_STCPMUX_INCLUDED
+
+#include "../../transport.h"
+
+#include "../../aio/fsm.h"
+#include "../../aio/usock.h"
+
+#include "../utils/streamhdr.h"
+
+#include "../../utils/msg.h"
+
+/* This state machine handles TCPMUX connection from the point where it is
+ established to the point when it is broken. */
+
+#define NN_STCPMUX_ERROR 1
+#define NN_STCPMUX_STOPPED 2
+
+struct nn_stcpmux {
+
+ /* The state machine. */
+ struct nn_fsm fsm;
+ int state;
+
+ /* The underlying socket. */
+ struct nn_usock *usock;
+
+ /* Child state machine to do protocol header exchange. */
+ struct nn_streamhdr streamhdr;
+
+ /* The original owner of the underlying socket. */
+ struct nn_fsm_owner usock_owner;
+
+ /* Pipe connecting this TCPMUX connection to the nanomsg core. */
+ struct nn_pipebase pipebase;
+
+ /* State of inbound state machine. */
+ int instate;
+
+ /* Buffer used to store the header of incoming message. */
+ uint8_t inhdr [8];
+
+ /* Message being received at the moment. */
+ struct nn_msg inmsg;
+
+ /* State of the outbound state machine. */
+ int outstate;
+
+ /* Buffer used to store the header of outgoing message. */
+ uint8_t outhdr [8];
+
+ /* Message being sent at the moment. */
+ struct nn_msg outmsg;
+
+ /* Event raised when the state machine ends. */
+ struct nn_fsm_event done;
+};
+
+void nn_stcpmux_init (struct nn_stcpmux *self, int src,
+ struct nn_epbase *epbase, struct nn_fsm *owner);
+void nn_stcpmux_term (struct nn_stcpmux *self);
+
+int nn_stcpmux_isidle (struct nn_stcpmux *self);
+void nn_stcpmux_start (struct nn_stcpmux *self, struct nn_usock *usock);
+void nn_stcpmux_stop (struct nn_stcpmux *self);
+
+#endif
+
diff --git a/src/transports/tcpmux/tcpmux.c b/src/transports/tcpmux/tcpmux.c
new file mode 100644
index 0000000..eba42f6
--- /dev/null
+++ b/src/transports/tcpmux/tcpmux.c
@@ -0,0 +1,159 @@
+/*
+ Copyright (c) 2012-2014 Martin Sustrik All rights reserved.
+ Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#include "tcpmux.h"
+#include "btcpmux.h"
+#include "ctcpmux.h"
+
+#include "../../tcpmux.h"
+
+#include "../utils/port.h"
+#include "../utils/iface.h"
+
+#include "../../utils/err.h"
+#include "../../utils/alloc.h"
+#include "../../utils/fast.h"
+#include "../../utils/list.h"
+#include "../../utils/cont.h"
+
+#include <string.h>
+
+#if defined NN_HAVE_WINDOWS
+#include "../../utils/win.h"
+#else
+#include <unistd.h>
+#endif
+
+/* TCPMUX-specific socket options. */
+
+struct nn_tcpmux_optset {
+ struct nn_optset base;
+ int nodelay;
+};
+
+static void nn_tcpmux_optset_destroy (struct nn_optset *self);
+static int nn_tcpmux_optset_setopt (struct nn_optset *self, int option,
+ const void *optval, size_t optvallen);
+static int nn_tcpmux_optset_getopt (struct nn_optset *self, int option,
+ void *optval, size_t *optvallen);
+static const struct nn_optset_vfptr nn_tcpmux_optset_vfptr = {
+ nn_tcpmux_optset_destroy,
+ nn_tcpmux_optset_setopt,
+ nn_tcpmux_optset_getopt
+};
+
+/* nn_transport interface. */
+static int nn_tcpmux_bind (void *hint, struct nn_epbase **epbase);
+static int nn_tcpmux_connect (void *hint, struct nn_epbase **epbase);
+static struct nn_optset *nn_tcpmux_optset (void);
+
+static struct nn_transport nn_tcpmux_vfptr = {
+ "tcpmux",
+ NN_TCPMUX,
+ NULL,
+ NULL,
+ nn_tcpmux_bind,
+ nn_tcpmux_connect,
+ nn_tcpmux_optset,
+ NN_LIST_ITEM_INITIALIZER
+};
+
+struct nn_transport *nn_tcpmux = &nn_tcpmux_vfptr;
+
+static int nn_tcpmux_bind (void *hint, struct nn_epbase **epbase)
+{
+ return nn_btcpmux_create (hint, epbase);
+}
+
+static int nn_tcpmux_connect (void *hint, struct nn_epbase **epbase)
+{
+ return nn_ctcpmux_create (hint, epbase);
+}
+
+static struct nn_optset *nn_tcpmux_optset ()
+{
+ struct nn_tcpmux_optset *optset;
+
+ optset = nn_alloc (sizeof (struct nn_tcpmux_optset), "optset (tcpmux)");
+ alloc_assert (optset);
+ optset->base.vfptr = &nn_tcpmux_optset_vfptr;
+
+ /* Default values for TCPMUX socket options. */
+ optset->nodelay = 0;
+
+ return &optset->base;
+}
+
+static void nn_tcpmux_optset_destroy (struct nn_optset *self)
+{
+ struct nn_tcpmux_optset *optset;
+
+ optset = nn_cont (self, struct nn_tcpmux_optset, base);
+ nn_free (optset);
+}
+
+static int nn_tcpmux_optset_setopt (struct nn_optset *self, int option,
+ const void *optval, size_t optvallen)
+{
+ struct nn_tcpmux_optset *optset;
+ int val;
+
+ optset = nn_cont (self, struct nn_tcpmux_optset, base);
+
+ /* At this point we assume that all options are of type int. */
+ if (optvallen != sizeof (int))
+ return -EINVAL;
+ val = *(int*) optval;
+
+ switch (option) {
+ case NN_TCPMUX_NODELAY:
+ if (nn_slow (val != 0 && val != 1))
+ return -EINVAL;
+ optset->nodelay = val;
+ return 0;
+ default:
+ return -ENOPROTOOPT;
+ }
+}
+
+static int nn_tcpmux_optset_getopt (struct nn_optset *self, int option,
+ void *optval, size_t *optvallen)
+{
+ struct nn_tcpmux_optset *optset;
+ int intval;
+
+ optset = nn_cont (self, struct nn_tcpmux_optset, base);
+
+ switch (option) {
+ case NN_TCPMUX_NODELAY:
+ intval = optset->nodelay;
+ break;
+ default:
+ return -ENOPROTOOPT;
+ }
+ memcpy (optval, &intval,
+ *optvallen < sizeof (int) ? *optvallen : sizeof (int));
+ *optvallen = sizeof (int);
+ return 0;
+}
+
diff --git a/src/transports/tcpmux/tcpmux.h b/src/transports/tcpmux/tcpmux.h
new file mode 100644
index 0000000..dc3a687
--- /dev/null
+++ b/src/transports/tcpmux/tcpmux.h
@@ -0,0 +1,30 @@
+/*
+ Copyright (c) 2012-2014 Martin Sustrik All rights reserved.
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"),
+ to deal in the Software without restriction, including without limitation
+ the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ and/or sell copies of the Software, and to permit persons to whom
+ the Software is furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included
+ in all copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ IN THE SOFTWARE.
+*/
+
+#ifndef NN_TCPMUX_INCLUDED
+#define NN_TCPMUX_INCLUDED
+
+#include "../../transport.h"
+
+extern struct nn_transport *nn_tcpmux;
+
+#endif