diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2014-11-18 11:52:07 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2014-11-20 16:28:55 +0100 |
commit | cdadc4f152f36c0a497c10633ace3bcd9d688880 (patch) | |
tree | 410e402e20604d81e41c3dbe362d6f19bcba9352 | |
parent | 64ae6d918377ba96ac4fdf1f7a968ce0cd659d82 (diff) | |
download | nanomsg-cdadc4f152f36c0a497c10633ace3bcd9d688880.tar.gz |
nn_tcpmuxd() function added
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
-rw-r--r-- | Makefile.am | 3 | ||||
-rw-r--r-- | src/devices/tcpmuxd.c | 248 | ||||
-rw-r--r-- | src/nn.h | 6 | ||||
-rw-r--r-- | tools/tcpmuxd.c | 229 |
4 files changed, 271 insertions, 215 deletions
diff --git a/Makefile.am b/Makefile.am index 25084e0..5aa2f50 100644 --- a/Makefile.am +++ b/Makefile.am @@ -59,7 +59,8 @@ NANOMSG_CORE = \ NANOMSG_DEVICES =\ src/devices/device.c \ - src/devices/device.h + src/devices/device.h \ + src/devices/tcpmuxd.c NANOMSG_AIO = \ src/aio/ctx.h \ diff --git a/src/devices/tcpmuxd.c b/src/devices/tcpmuxd.c new file mode 100644 index 0000000..eaadb84 --- /dev/null +++ b/src/devices/tcpmuxd.c @@ -0,0 +1,248 @@ +/* + Copyright (c) 2014 Martin Sustrik All rights reserved. + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "../nn.h" + +#include "../utils/thread.h" +#include "../utils/attr.h" +#include "../utils/err.h" +#include "../utils/int.h" +#include "../utils/cont.h" +#include "../utils/wire.h" +#include "../utils/alloc.h" +#include "../utils/list.h" +#include "../utils/mutex.h" + +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <netinet/in.h> +#include <netinet/tcp.h> +#include <sys/un.h> +#include <stddef.h> +#include <ctype.h> + +struct ipc_connection { + int fd; + char *service; + struct nn_list_item item; +}; + +struct ipc_connections { + struct nn_mutex sync; + struct nn_list connections; +}; + +struct ipc_connections ipcs; + +static int send_fd (int s, int fd) +{ + int rc; + struct iovec iov; + char c = 0; + struct msghdr msg; + char control [sizeof (struct cmsghdr) + 10]; + struct cmsghdr *cmsg; + + /* Compose the message. We'll send one byte long dummy message + accompanied with the fd.*/ + iov.iov_base = &c; + iov.iov_len = 1; + memset (&msg, 0, sizeof (msg)); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = control; + msg.msg_controllen = sizeof (control); + + /* Attach the file descriptor to the message. */ + cmsg = CMSG_FIRSTHDR (&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN (sizeof (fd)); + int *data = (int*) CMSG_DATA (cmsg); + *data = fd; + + /* Adjust the size of the control to match the data. */ + msg.msg_controllen = cmsg->cmsg_len; + + /* Pass the file descriptor to the registered process. */ + rc = sendmsg (s, &msg, 0); + if (rc < 0) + return -1; + nn_assert (rc == 1); + + return 0; +} + +static void ipc_listener_routine (void *arg) +{ + int rc; + char ipcaddr [32]; + struct sockaddr_un unaddr; + int listener; + int conn; + unsigned char buf [2]; + ssize_t ssz; + uint16_t sz; + struct ipc_connection *ipcc; + int i; + + /* Start listening for AF_UNIX connections. */ + snprintf (ipcaddr, sizeof (ipcaddr), "/tmp/tcpmux-%d.ipc", *((int*) arg)); + unlink (ipcaddr); + listener = socket (AF_UNIX, SOCK_STREAM, 0); + errno_assert (listener >= 0); + nn_assert (strlen (ipcaddr) < sizeof (unaddr.sun_path)); + unaddr.sun_family = AF_UNIX; + strcpy (unaddr.sun_path, ipcaddr); + rc = bind (listener, (struct sockaddr*) &unaddr, sizeof (unaddr)); + errno_assert (rc == 0); + rc = listen (listener, 100); + errno_assert (rc == 0); + + while (1) { + + /* Accept new IPC connection. */ + conn = accept (listener, NULL, NULL); + if (conn < 0 && errno == ECONNABORTED) + continue; + errno_assert (conn >= 0); + + /* Create new connection entry. */ + ipcc = nn_alloc (sizeof (struct ipc_connection), "ipc_connection"); + nn_assert (ipcc); + ipcc->fd = conn; + nn_list_item_init (&ipcc->item); + + /* Read the connection header. */ + ssz = recv (conn, buf, 2, 0); + errno_assert (ssz >= 0); + nn_assert (ssz == 2); + sz = nn_gets (buf); + ipcc->service = nn_alloc (sz + 1, "service"); + nn_assert (ipcc->service); + ssz = recv (conn, ipcc->service, sz, 0); + errno_assert (ssz >= 0); + nn_assert (ssz == sz); + for (i = 0; i != sz; ++i) + ipcc->service [sz] = tolower (ipcc->service [sz]); + ipcc->service [sz] = 0; + + /* Add the entry to the global IPC connections list. */ + nn_mutex_lock (&ipcs.sync); + nn_list_insert (&ipcs.connections, &ipcc->item, + nn_list_end (&ipcs.connections)); + nn_mutex_unlock (&ipcs.sync); + } +} + +int nn_tcpmuxd (int port) +{ + int rc; + struct nn_thread ipc_listener; + int listener; + struct sockaddr_in addr; + int opt; + struct nn_list_item *it; + struct ipc_connection *ipcc; + int conn; + char service [256]; + int pos; + ssize_t ssz; + + /* Initialise the global structures. */ + nn_mutex_init (&ipcs.sync); + nn_list_init (&ipcs.connections); + + /* Start listening for incoming IPC connections. */ + nn_thread_init (&ipc_listener, ipc_listener_routine, &port); + + /* Start listening for incoming TCP connections. */ + listener = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + errno_assert (listener >= 0); + opt = 1; + rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, &opt, + sizeof (opt)); + errno_assert (rc == 0); + memset (&addr, 0, sizeof (addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons (port); + addr.sin_addr.s_addr = INADDR_ANY; + rc = bind (listener, (struct sockaddr*) &addr, sizeof (addr)); + errno_assert (rc == 0); + rc = listen (listener, 100); + errno_assert (rc == 0); + + while (1) { + + /* Accept new TCP connection. */ + conn = accept (listener, NULL, NULL); + if (conn < 0 && errno == ECONNABORTED) + continue; + errno_assert (conn >= 0); + + /* Read TCPMUX header. */ + pos = 0; + while (1) { + nn_assert (pos < sizeof (service)); + ssz = recv (conn, &service [pos], 1, 0); + errno_assert (ssz >= 0); + nn_assert (ssz == 1); + service [pos] = tolower (service [pos]); + if (pos > 0 && service [pos - 1] == 0x0d && service [pos] == 0x0a) + break; + ++pos; + } + service [pos - 1] = 0; + + /* Check whether specified service is listening. */ + nn_mutex_lock (&ipcs.sync); + for (it = nn_list_begin (&ipcs.connections); + it != nn_list_end (&ipcs.connections); + it = nn_list_next (&ipcs.connections, it)) { + ipcc = nn_cont (it, struct ipc_connection, item); + if (strcmp (service, ipcc->service) == 0) + break; + } + + /* If no one is listening, tear down the connection. */ + if (it == nn_list_end (&ipcs.connections)) { + nn_mutex_unlock (&ipcs.sync); + ssz = send (conn, "-Service not available.\x0d\x0a", 25, 0); + errno_assert (ssz >= 0); + nn_assert (ssz == 25); + close (conn); + continue; + } + nn_mutex_unlock (&ipcs.sync); + + /* Send TCPMUX reply. */ + ssz = send (conn, "+\x0d\x0a", 3, 0); + errno_assert (ssz >= 0); + nn_assert (ssz == 3); + + /* Pass the file descriptor to the listening process. */ + rc = send_fd (ipcc->fd, conn); + errno_assert (rc == 0); + } +} + @@ -378,6 +378,12 @@ NN_EXPORT int nn_poll (struct nn_pollfd *fds, int nfds, int timeout); NN_EXPORT int nn_device (int s1, int s2); +/******************************************************************************/ +/* Built-in support for multiplexers. */ +/******************************************************************************/ + +NN_EXPORT int nn_tcpmuxd (int port); + #ifdef __cplusplus } #endif diff --git a/tools/tcpmuxd.c b/tools/tcpmuxd.c index 7806561..568fc3a 100644 --- a/tools/tcpmuxd.c +++ b/tools/tcpmuxd.c @@ -20,232 +20,33 @@ IN THE SOFTWARE. */ -#include "../src/utils/thread.h" -#include "../src/utils/attr.h" +#include "../src/nn.h" + #include "../src/utils/err.h" -#include "../src/utils/int.h" -#include "../src/utils/cont.h" -#include "../src/utils/wire.h" -#include "../src/utils/alloc.h" -#include "../src/utils/list.h" -#include "../src/utils/mutex.h" -#include <string.h> #include <stdlib.h> -#include <unistd.h> -#include <netinet/in.h> -#include <netinet/tcp.h> -#include <sys/un.h> -#include <stddef.h> -#include <ctype.h> - -#define TCPMUX_DEFAULT_PORT 9506 - -struct ipc_connection { - int fd; - char *service; - struct nn_list_item item; -}; - -struct ipc_connections { - struct nn_mutex sync; - struct nn_list connections; -}; - -struct ipc_connections ipcs; - -static int send_fd (int s, int fd) -{ - int rc; - struct iovec iov; - char c = 0; - struct msghdr msg; - char control [sizeof (struct cmsghdr) + 10]; - struct cmsghdr *cmsg; - - /* Compose the message. We'll send one byte long dummy message - accompanied with the fd.*/ - iov.iov_base = &c; - iov.iov_len = 1; - memset (&msg, 0, sizeof (msg)); - msg.msg_iov = &iov; - msg.msg_iovlen = 1; - msg.msg_control = control; - msg.msg_controllen = sizeof (control); +#include <stdio.h> - /* Attach the file descriptor to the message. */ - cmsg = CMSG_FIRSTHDR (&msg); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = CMSG_LEN (sizeof (fd)); - int *data = (int*) CMSG_DATA (cmsg); - *data = fd; +#define TCPMUX_DEFAULT_PORT 1 - /* Adjust the size of the control to match the data. */ - msg.msg_controllen = cmsg->cmsg_len; - - /* Pass the file descriptor to the registered process. */ - rc = sendmsg (s, &msg, 0); - if (rc < 0) - return -1; - nn_assert (rc == 1); - - return 0; -} - -static void ipc_listener_routine (NN_UNUSED void *arg) +int main (int argc, const char *argv []) { int rc; - char ipcaddr [32]; - struct sockaddr_un unaddr; - int listener; - int conn; - unsigned char buf [2]; - ssize_t ssz; - uint16_t sz; - struct ipc_connection *ipcc; - int i; - - /* Start listening for AF_UNIX connections. */ - snprintf (ipcaddr, sizeof (ipcaddr), "/tmp/tcpmux-%d.ipc", - (int) TCPMUX_DEFAULT_PORT); - unlink (ipcaddr); - listener = socket (AF_UNIX, SOCK_STREAM, 0); - errno_assert (listener >= 0); - nn_assert (strlen (ipcaddr) < sizeof (unaddr.sun_path)); - unaddr.sun_family = AF_UNIX; - strcpy (unaddr.sun_path, ipcaddr); - rc = bind (listener, (struct sockaddr*) &unaddr, sizeof (unaddr)); - errno_assert (rc == 0); - rc = listen (listener, 100); - errno_assert (rc == 0); - - while (1) { - - /* Accept new IPC connection. */ - conn = accept (listener, NULL, NULL); - if (conn < 0 && errno == ECONNABORTED) - continue; - errno_assert (conn >= 0); + int port = TCPMUX_DEFAULT_PORT; - /* Create new connection entry. */ - ipcc = nn_alloc (sizeof (struct ipc_connection), "ipc_connection"); - nn_assert (ipcc); - ipcc->fd = conn; - nn_list_item_init (&ipcc->item); - - /* Read the connection header. */ - ssz = recv (conn, buf, 2, 0); - errno_assert (ssz >= 0); - nn_assert (ssz == 2); - sz = nn_gets (buf); - ipcc->service = nn_alloc (sz + 1, "service"); - nn_assert (ipcc->service); - ssz = recv (conn, ipcc->service, sz, 0); - errno_assert (ssz >= 0); - nn_assert (ssz == sz); - for (i = 0; i != sz; ++i) - ipcc->service [sz] = tolower (ipcc->service [sz]); - ipcc->service [sz] = 0; - - /* Add the entry to the global IPC connections list. */ - nn_mutex_lock (&ipcs.sync); - nn_list_insert (&ipcs.connections, &ipcc->item, - nn_list_end (&ipcs.connections)); - nn_mutex_unlock (&ipcs.sync); + if (argc < 1 || argc > 2) { + fprintf (stderr, "usage: tcpmuxd [port]\n"); + return 1; } -} - -int main (int argvc, char *argv []) -{ - int rc; - struct nn_thread ipc_listener; - int listener; - struct sockaddr_in addr; - int opt; - struct nn_list_item *it; - struct ipc_connection *ipcc; - int conn; - char service [256]; - int pos; - ssize_t ssz; - /* Initialise the global structures. */ - nn_mutex_init (&ipcs.sync); - nn_list_init (&ipcs.connections); - - // TODO: Allow for different port numbers via command line argument. - - /* Start listening for incoming IPC connections. */ - nn_thread_init (&ipc_listener, ipc_listener_routine, NULL); + if (argc == 2) { + /* TODO: Use strtol here to detect malformed port numbers. */ + port = atoi (argv [2]); + } - /* Start listening for incoming TCP connections. */ - listener = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - errno_assert (listener >= 0); - opt = 1; - rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, &opt, - sizeof (opt)); - errno_assert (rc == 0); - memset (&addr, 0, sizeof (addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons (TCPMUX_DEFAULT_PORT); - addr.sin_addr.s_addr = INADDR_ANY; - rc = bind (listener, (struct sockaddr*) &addr, sizeof (addr)); + rc = nn_tcpmuxd (port); errno_assert (rc == 0); - rc = listen (listener, 100); - errno_assert (rc == 0); - - while (1) { - - /* Accept new TCP connection. */ - conn = accept (listener, NULL, NULL); - if (conn < 0 && errno == ECONNABORTED) - continue; - errno_assert (conn >= 0); - - /* Read TCPMUX header. */ - pos = 0; - while (1) { - nn_assert (pos < sizeof (service)); - ssz = recv (conn, &service [pos], 1, 0); - errno_assert (ssz >= 0); - nn_assert (ssz == 1); - service [pos] = tolower (service [pos]); - if (pos > 0 && service [pos - 1] == 0x0d && service [pos] == 0x0a) - break; - ++pos; - } - service [pos - 1] = 0; - - /* Check whether specified service is listening. */ - nn_mutex_lock (&ipcs.sync); - for (it = nn_list_begin (&ipcs.connections); - it != nn_list_end (&ipcs.connections); - it = nn_list_next (&ipcs.connections, it)) { - ipcc = nn_cont (it, struct ipc_connection, item); - if (strcmp (service, ipcc->service) == 0) - break; - } - /* If no one is listening, tear down the connection. */ - if (it == nn_list_end (&ipcs.connections)) { - nn_mutex_unlock (&ipcs.sync); - ssz = send (conn, "-Service not available.\x0d\x0a", 25, 0); - errno_assert (ssz >= 0); - nn_assert (ssz == 25); - close (conn); - continue; - } - nn_mutex_unlock (&ipcs.sync); - - /* Send TCPMUX reply. */ - ssz = send (conn, "+\x0d\x0a", 3, 0); - errno_assert (ssz >= 0); - nn_assert (ssz == 3); - - /* Pass the file descriptor to the listening process. */ - rc = send_fd (ipcc->fd, conn); - errno_assert (rc == 0); - } + return 0; } |