diff options
author | Martin Sustrik <sustrik@250bpm.com> | 2014-11-20 04:17:09 +0100 |
---|---|---|
committer | Martin Sustrik <sustrik@250bpm.com> | 2014-11-20 16:28:55 +0100 |
commit | 2b74a5dbe8612797c7ff3a8f743c0ef21e3b57b4 (patch) | |
tree | 1e485f470471a5dd93c14adccbef5e46de410e9e /src/aio | |
parent | b50bd1ba9667eee92fece1c6b43891ea30c16398 (diff) | |
download | nanomsg-2b74a5dbe8612797c7ff3a8f743c0ef21e3b57b4.tar.gz |
Bound side of tcpmux transport implemented
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/aio')
-rw-r--r-- | src/aio/usock.h | 6 | ||||
-rw-r--r-- | src/aio/usock_posix.h | 3 | ||||
-rw-r--r-- | src/aio/usock_posix.inc | 54 | ||||
-rw-r--r-- | src/aio/usock_win.inc | 11 |
4 files changed, 66 insertions, 8 deletions
diff --git a/src/aio/usock.h b/src/aio/usock.h index fad0591..6a92e58 100644 --- a/src/aio/usock.h +++ b/src/aio/usock.h @@ -55,7 +55,9 @@ void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner); void nn_usock_term (struct nn_usock *self); int nn_usock_isidle (struct nn_usock *self); -int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol); +int nn_usock_start (struct nn_usock *self, + int domain, int type, int protocol); +void nn_usock_start_fd (struct nn_usock *self, int fd); void nn_usock_stop (struct nn_usock *self); void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner); @@ -85,7 +87,7 @@ void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr, void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov, int iovcnt); -void nn_usock_recv (struct nn_usock *self, void *buf, size_t len); +void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd); int nn_usock_geterrno (struct nn_usock *self); diff --git a/src/aio/usock_posix.h b/src/aio/usock_posix.h index 30f7a41..db288e6 100644 --- a/src/aio/usock_posix.h +++ b/src/aio/usock_posix.h @@ -57,6 +57,9 @@ struct nn_usock { position were already received by the user. The data that follow will be received in the future. */ size_t batch_pos; + + /* File descriptor received via SCM_RIGHTS, if any. */ + int *pfd; } in; /* Members related to sending data. */ diff --git a/src/aio/usock_posix.inc b/src/aio/usock_posix.inc index 0ca3a67..6af6fbf 100644 --- a/src/aio/usock_posix.inc +++ b/src/aio/usock_posix.inc @@ -56,6 +56,7 @@ #define NN_USOCK_ACTION_ACTIVATE 6 #define NN_USOCK_ACTION_DONE 7 #define NN_USOCK_ACTION_ERROR 8 +#define NN_USOCK_ACTION_STARTED 9 #define NN_USOCK_SRC_FD 1 #define NN_USOCK_SRC_TASK_CONNECTING 2 @@ -94,6 +95,7 @@ void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner) self->in.batch = NULL; self->in.batch_len = 0; self->in.batch_pos = 0; + self->in.pfd = NULL; memset (&self->out.hdr, 0, sizeof (struct msghdr)); @@ -172,6 +174,13 @@ int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol) return 0; } +void nn_usock_start_fd (struct nn_usock *self, int fd) +{ + nn_usock_init_from_fd (self, fd); + nn_fsm_start (&self->fsm); + nn_fsm_action (&self->fsm, NN_USOCK_ACTION_STARTED); +} + static void nn_usock_init_from_fd (struct nn_usock *self, int s) { int rc; @@ -442,7 +451,7 @@ void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov, nn_worker_execute (self->worker, &self->task_send); } -void nn_usock_recv (struct nn_usock *self, void *buf, size_t len) +void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd) { int rc; size_t nbytes; @@ -452,6 +461,7 @@ void nn_usock_recv (struct nn_usock *self, void *buf, size_t len) /* Try to receive the data immediately. */ nbytes = len; + self->in.pfd = fd; rc = nn_usock_recv_raw (self, buf, &nbytes); if (nn_slow (rc < 0)) { errnum_assert (rc == -ECONNRESET, -rc); @@ -629,6 +639,10 @@ static void nn_usock_handler (struct nn_fsm *self, int src, int type, case NN_USOCK_ACTION_BEING_ACCEPTED: usock->state = NN_USOCK_STATE_BEING_ACCEPTED; return; + case NN_USOCK_ACTION_STARTED: + nn_worker_add_fd (usock->worker, usock->s, &usock->wfd); + usock->state = NN_USOCK_STATE_ACTIVE; + return; default: nn_fsm_bad_action (usock->state, src, type); } @@ -1039,6 +1053,10 @@ static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len) size_t sz; size_t length; ssize_t nbytes; + struct iovec iov; + struct msghdr hdr; + unsigned char ctrl [256]; + struct cmsghdr *cmsg; /* If batch buffer doesn't exist, allocate it. The point of delayed deallocation to allow non-receiving sockets, such as TCP listening @@ -1064,10 +1082,20 @@ static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len) /* If recv request is greater than the batch buffer, get the data directly into the place. Otherwise, read data to the batch buffer. */ - if (length > NN_USOCK_BATCH_SIZE) - nbytes = recv (self->s, buf, length, 0); - else - nbytes = recv (self->s, self->in.batch, NN_USOCK_BATCH_SIZE, 0); + if (length > NN_USOCK_BATCH_SIZE) { + iov.iov_base = buf; + iov.iov_len = length; + } + else { + iov.iov_base = self->in.batch; + iov.iov_len = NN_USOCK_BATCH_SIZE; + } + memset (&hdr, 0, sizeof (hdr)); + hdr.msg_iov = &iov; + hdr.msg_iovlen = 1; + hdr.msg_control = ctrl; + hdr.msg_controllen = sizeof (ctrl); + nbytes = recvmsg (self->s, &hdr, 0); /* Handle any possible errors. */ if (nn_slow (nbytes <= 0)) { @@ -1088,6 +1116,22 @@ static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len) } } + /* Extract the associated file descriptor, if any. */ + cmsg = CMSG_FIRSTHDR (&hdr); + while (cmsg) { + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { + if (self->in.pfd) { + *self->in.pfd = *((int*) CMSG_DATA (cmsg)); + self->in.pfd = NULL; + } + else { + nn_closefd (*((int*) CMSG_DATA (cmsg))); + } + break; + } + cmsg = CMSG_NXTHDR (&hdr, cmsg); + } + /* If the data were received directly into the place we can return straight away. */ if (length > NN_USOCK_BATCH_SIZE) { diff --git a/src/aio/usock_win.inc b/src/aio/usock_win.inc index 4704f6d..5eedfb6 100644 --- a/src/aio/usock_win.inc +++ b/src/aio/usock_win.inc @@ -167,6 +167,11 @@ int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol) return 0; } +void nn_usock_start_fd (struct nn_usock *self, int fd) +{ + nn_assert (0); +} + void nn_usock_stop (struct nn_usock *self) { nn_fsm_stop (&self->fsm); @@ -428,7 +433,7 @@ void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov, nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR); } -void nn_usock_recv (struct nn_usock *self, void *buf, size_t len) +void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd) { int rc; BOOL brc; @@ -436,6 +441,10 @@ void nn_usock_recv (struct nn_usock *self, void *buf, size_t len) DWORD wflags; DWORD error; + /* Passing file descriptors is not implemented on Windows platform. */ + if (fd) + *fd = -1; + /* Make sure that the socket is actually alive. */ nn_assert_state (self, NN_USOCK_STATE_ACTIVE); |