summaryrefslogtreecommitdiff
path: root/src/aio
diff options
context:
space:
mode:
authorMartin Sustrik <sustrik@250bpm.com>2014-11-20 04:17:09 +0100
committerMartin Sustrik <sustrik@250bpm.com>2014-11-20 16:28:55 +0100
commit2b74a5dbe8612797c7ff3a8f743c0ef21e3b57b4 (patch)
tree1e485f470471a5dd93c14adccbef5e46de410e9e /src/aio
parentb50bd1ba9667eee92fece1c6b43891ea30c16398 (diff)
downloadnanomsg-2b74a5dbe8612797c7ff3a8f743c0ef21e3b57b4.tar.gz
Bound side of tcpmux transport implemented
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
Diffstat (limited to 'src/aio')
-rw-r--r--src/aio/usock.h6
-rw-r--r--src/aio/usock_posix.h3
-rw-r--r--src/aio/usock_posix.inc54
-rw-r--r--src/aio/usock_win.inc11
4 files changed, 66 insertions, 8 deletions
diff --git a/src/aio/usock.h b/src/aio/usock.h
index fad0591..6a92e58 100644
--- a/src/aio/usock.h
+++ b/src/aio/usock.h
@@ -55,7 +55,9 @@ void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner);
void nn_usock_term (struct nn_usock *self);
int nn_usock_isidle (struct nn_usock *self);
-int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol);
+int nn_usock_start (struct nn_usock *self,
+ int domain, int type, int protocol);
+void nn_usock_start_fd (struct nn_usock *self, int fd);
void nn_usock_stop (struct nn_usock *self);
void nn_usock_swap_owner (struct nn_usock *self, struct nn_fsm_owner *owner);
@@ -85,7 +87,7 @@ void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
int iovcnt);
-void nn_usock_recv (struct nn_usock *self, void *buf, size_t len);
+void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd);
int nn_usock_geterrno (struct nn_usock *self);
diff --git a/src/aio/usock_posix.h b/src/aio/usock_posix.h
index 30f7a41..db288e6 100644
--- a/src/aio/usock_posix.h
+++ b/src/aio/usock_posix.h
@@ -57,6 +57,9 @@ struct nn_usock {
position were already received by the user. The data that follow
will be received in the future. */
size_t batch_pos;
+
+ /* File descriptor received via SCM_RIGHTS, if any. */
+ int *pfd;
} in;
/* Members related to sending data. */
diff --git a/src/aio/usock_posix.inc b/src/aio/usock_posix.inc
index 0ca3a67..6af6fbf 100644
--- a/src/aio/usock_posix.inc
+++ b/src/aio/usock_posix.inc
@@ -56,6 +56,7 @@
#define NN_USOCK_ACTION_ACTIVATE 6
#define NN_USOCK_ACTION_DONE 7
#define NN_USOCK_ACTION_ERROR 8
+#define NN_USOCK_ACTION_STARTED 9
#define NN_USOCK_SRC_FD 1
#define NN_USOCK_SRC_TASK_CONNECTING 2
@@ -94,6 +95,7 @@ void nn_usock_init (struct nn_usock *self, int src, struct nn_fsm *owner)
self->in.batch = NULL;
self->in.batch_len = 0;
self->in.batch_pos = 0;
+ self->in.pfd = NULL;
memset (&self->out.hdr, 0, sizeof (struct msghdr));
@@ -172,6 +174,13 @@ int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol)
return 0;
}
+void nn_usock_start_fd (struct nn_usock *self, int fd)
+{
+ nn_usock_init_from_fd (self, fd);
+ nn_fsm_start (&self->fsm);
+ nn_fsm_action (&self->fsm, NN_USOCK_ACTION_STARTED);
+}
+
static void nn_usock_init_from_fd (struct nn_usock *self, int s)
{
int rc;
@@ -442,7 +451,7 @@ void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
nn_worker_execute (self->worker, &self->task_send);
}
-void nn_usock_recv (struct nn_usock *self, void *buf, size_t len)
+void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
{
int rc;
size_t nbytes;
@@ -452,6 +461,7 @@ void nn_usock_recv (struct nn_usock *self, void *buf, size_t len)
/* Try to receive the data immediately. */
nbytes = len;
+ self->in.pfd = fd;
rc = nn_usock_recv_raw (self, buf, &nbytes);
if (nn_slow (rc < 0)) {
errnum_assert (rc == -ECONNRESET, -rc);
@@ -629,6 +639,10 @@ static void nn_usock_handler (struct nn_fsm *self, int src, int type,
case NN_USOCK_ACTION_BEING_ACCEPTED:
usock->state = NN_USOCK_STATE_BEING_ACCEPTED;
return;
+ case NN_USOCK_ACTION_STARTED:
+ nn_worker_add_fd (usock->worker, usock->s, &usock->wfd);
+ usock->state = NN_USOCK_STATE_ACTIVE;
+ return;
default:
nn_fsm_bad_action (usock->state, src, type);
}
@@ -1039,6 +1053,10 @@ static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len)
size_t sz;
size_t length;
ssize_t nbytes;
+ struct iovec iov;
+ struct msghdr hdr;
+ unsigned char ctrl [256];
+ struct cmsghdr *cmsg;
/* If batch buffer doesn't exist, allocate it. The point of delayed
deallocation to allow non-receiving sockets, such as TCP listening
@@ -1064,10 +1082,20 @@ static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len)
/* If recv request is greater than the batch buffer, get the data directly
into the place. Otherwise, read data to the batch buffer. */
- if (length > NN_USOCK_BATCH_SIZE)
- nbytes = recv (self->s, buf, length, 0);
- else
- nbytes = recv (self->s, self->in.batch, NN_USOCK_BATCH_SIZE, 0);
+ if (length > NN_USOCK_BATCH_SIZE) {
+ iov.iov_base = buf;
+ iov.iov_len = length;
+ }
+ else {
+ iov.iov_base = self->in.batch;
+ iov.iov_len = NN_USOCK_BATCH_SIZE;
+ }
+ memset (&hdr, 0, sizeof (hdr));
+ hdr.msg_iov = &iov;
+ hdr.msg_iovlen = 1;
+ hdr.msg_control = ctrl;
+ hdr.msg_controllen = sizeof (ctrl);
+ nbytes = recvmsg (self->s, &hdr, 0);
/* Handle any possible errors. */
if (nn_slow (nbytes <= 0)) {
@@ -1088,6 +1116,22 @@ static int nn_usock_recv_raw (struct nn_usock *self, void *buf, size_t *len)
}
}
+ /* Extract the associated file descriptor, if any. */
+ cmsg = CMSG_FIRSTHDR (&hdr);
+ while (cmsg) {
+ if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
+ if (self->in.pfd) {
+ *self->in.pfd = *((int*) CMSG_DATA (cmsg));
+ self->in.pfd = NULL;
+ }
+ else {
+ nn_closefd (*((int*) CMSG_DATA (cmsg)));
+ }
+ break;
+ }
+ cmsg = CMSG_NXTHDR (&hdr, cmsg);
+ }
+
/* If the data were received directly into the place we can return
straight away. */
if (length > NN_USOCK_BATCH_SIZE) {
diff --git a/src/aio/usock_win.inc b/src/aio/usock_win.inc
index 4704f6d..5eedfb6 100644
--- a/src/aio/usock_win.inc
+++ b/src/aio/usock_win.inc
@@ -167,6 +167,11 @@ int nn_usock_start (struct nn_usock *self, int domain, int type, int protocol)
return 0;
}
+void nn_usock_start_fd (struct nn_usock *self, int fd)
+{
+ nn_assert (0);
+}
+
void nn_usock_stop (struct nn_usock *self)
{
nn_fsm_stop (&self->fsm);
@@ -428,7 +433,7 @@ void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
}
-void nn_usock_recv (struct nn_usock *self, void *buf, size_t len)
+void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
{
int rc;
BOOL brc;
@@ -436,6 +441,10 @@ void nn_usock_recv (struct nn_usock *self, void *buf, size_t len)
DWORD wflags;
DWORD error;
+ /* Passing file descriptors is not implemented on Windows platform. */
+ if (fd)
+ *fd = -1;
+
/* Make sure that the socket is actually alive. */
nn_assert_state (self, NN_USOCK_STATE_ACTIVE);