summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGarrett D'Amore <garrett@damore.org>2018-06-07 14:38:20 -0700
committerGarrett D'Amore <garrett@damore.org>2018-06-07 16:25:04 -0700
commitd4151607fa0ac79e6298db446278bdadc74c3d2c (patch)
tree48a1e5ff6c61c41e6f0d008d00291cf252b45702
parenteac025a27122a1ad8681fd85e7b1daf5dad13bf6 (diff)
downloadnanomsg-d4151607fa0ac79e6298db446278bdadc74c3d2c.tar.gz
fixes #978 nanomsg IPC on Windows is very fragile
This is critical for hardening use cases with other IPC clients such as nng or mangos. The old code made some very incorrect assumptions about the atomicity of named pipes and ReadFile. We've changed the code so that if ReadFile (or WSARecv incidentally) ever returns a partial read, we keep going. This solves a critial assertion error, and greatly improves the crash resistance of nanomsg when using IPC on Windows.
-rw-r--r--src/aio/usock_win.inc111
-rw-r--r--src/aio/worker_win.h15
-rw-r--r--src/aio/worker_win.inc42
3 files changed, 114 insertions, 54 deletions
diff --git a/src/aio/usock_win.inc b/src/aio/usock_win.inc
index ccbd607..5087f47 100644
--- a/src/aio/usock_win.inc
+++ b/src/aio/usock_win.inc
@@ -1,6 +1,8 @@
/*
Copyright (c) 2013 Martin Sustrik All rights reserved.
Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+ Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+ Copyright 2018 Capitar IT Group BV <info@capitar.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"),
@@ -311,7 +313,7 @@ void nn_usock_accept (struct nn_usock *self, struct nn_usock *listener)
listener->asock = self;
/* Asynchronous accept. */
- nn_worker_op_start (&listener->in, 0);
+ nn_worker_op_start (&listener->in);
}
void nn_usock_activate (struct nn_usock *self)
@@ -371,7 +373,7 @@ void nn_usock_connect (struct nn_usock *self, const struct sockaddr *addr,
}
/* Asynchronous connect. */
- nn_worker_op_start (&self->out, 0);
+ nn_worker_op_start (&self->out);
}
void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
@@ -418,7 +420,7 @@ void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
}
brc = WriteFile (self->p, self->pipesendbuf, (DWORD) len, NULL, &self->out.olpd);
if (nn_fast (brc || GetLastError() == ERROR_IO_PENDING)) {
- nn_worker_op_start (&self->out, 0);
+ nn_worker_op_start (&self->out);
return;
}
error = GetLastError();
@@ -430,12 +432,12 @@ void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
rc = WSASend (self->s, wbuf, iovcnt, NULL, 0, &self->out.olpd, NULL);
if (nn_fast (rc == 0)) {
- nn_worker_op_start (&self->out, 0);
+ nn_worker_op_start (&self->out);
return;
}
error = WSAGetLastError();
if (nn_fast (error == WSA_IO_PENDING)) {
- nn_worker_op_start (&self->out, 0);
+ nn_worker_op_start (&self->out);
return;
}
wsa_assert (error == WSAECONNABORTED || error == WSAECONNRESET ||
@@ -445,59 +447,86 @@ void nn_usock_send (struct nn_usock *self, const struct nn_iovec *iov,
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
}
-void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
+void nn_usock_recv_start_wsock (void *arg)
{
- int rc;
- BOOL brc;
+ struct nn_usock *self = arg;
WSABUF wbuf;
- DWORD wflags;
+ DWORD flags;
DWORD error;
- /* Passing file descriptors is not implemented on Windows platform. */
- if (fd)
- *fd = -1;
-
- /* Make sure that the socket is actually alive. */
- nn_assert_state (self, NN_USOCK_STATE_ACTIVE);
-
/* Start the receive operation. */
- wbuf.len = (ULONG) len;
- wbuf.buf = (char FAR*) buf;
- wflags = MSG_WAITALL;
+ wbuf.len = (ULONG) self->in.resid;
+ wbuf.buf = (char FAR*) self->in.buf;
+ flags = MSG_WAITALL;
memset (&self->in.olpd, 0, sizeof (self->in.olpd));
- if (self->domain == AF_UNIX) {
-
- /* Ensure the total buffer size does not exceed size limitation
- of WriteFile. */
- nn_assert (len <= MAXDWORD);
- brc = ReadFile (self->p, buf, (DWORD) len, NULL, &self->in.olpd);
- error = brc ? ERROR_SUCCESS : GetLastError ();
- }
- else {
- rc = WSARecv (self->s, &wbuf, 1, NULL, &wflags, &self->in.olpd, NULL);
- error = (rc == 0) ? ERROR_SUCCESS : WSAGetLastError ();
+ if (WSARecv (self->s, &wbuf, 1, NULL, &flags, &self->in.olpd, NULL) == 0) {
+ error = ERROR_SUCCESS;
+ } else {
+ error = WSAGetLastError ();
}
- if (nn_fast (error == ERROR_SUCCESS)) {
- nn_worker_op_start (&self->in, 1);
+ switch (error) {
+ case ERROR_SUCCESS:
+ case WSA_IO_PENDING:
+ nn_worker_op_start (&self->in);
return;
- }
- if (nn_fast (error == WSA_IO_PENDING)) {
- nn_worker_op_start (&self->in, 1);
+ default:
+ nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
return;
}
+}
+
+void nn_usock_recv_start_pipe (void *arg)
+{
+ struct nn_usock *self = arg;
+ void *buf = self->in.buf;
+ DWORD len = (DWORD) self->in.resid;
+ DWORD error;
+
+ /* Start the receive operation. */
+ memset (&self->in.olpd, 0, sizeof (self->in.olpd));
+
+ if (ReadFile (self->p, buf, len, NULL, &self->in.olpd)) {
+ error = ERROR_SUCCESS;
+ } else {
+ error = GetLastError ();
+ }
- if (error == WSAECONNABORTED || error == WSAECONNRESET ||
- error == WSAENETDOWN || error == WSAENETRESET ||
- error == WSAETIMEDOUT || error == WSAEWOULDBLOCK ||
- error == ERROR_PIPE_NOT_CONNECTED || error == ERROR_BROKEN_PIPE) {
+ switch (error) {
+ case ERROR_SUCCESS:
+ case ERROR_IO_PENDING:
+ nn_worker_op_start (&self->in);
+ return;
+
+ default:
nn_fsm_action (&self->fsm, NN_USOCK_ACTION_ERROR);
return;
}
+}
+
+void nn_usock_recv (struct nn_usock *self, void *buf, size_t len, int *fd)
+{
+ /* Passing file descriptors is not implemented on Windows platform. */
+ if (fd)
+ *fd = -1;
+
+ /* Make sure that the socket is actually alive. */
+ nn_assert_state (self, NN_USOCK_STATE_ACTIVE);
+
+ self->in.resid = len;
+ self->in.buf = buf;
+ self->in.arg = self;
+ self->in.zero_is_error = 1;
+ if (self->domain == AF_UNIX) {
+ self->in.start = nn_usock_recv_start_pipe;
+ }
+ else {
+ self->in.start = nn_usock_recv_start_wsock;
+ }
- wsa_assert (0);
+ self->in.start (self->in.arg);
}
static void nn_usock_create_io_completion (struct nn_usock *self)
@@ -628,7 +657,7 @@ void nn_usock_accept_pipe (struct nn_usock *self, struct nn_usock *listener)
listener->asock = self;
/* Asynchronous accept. */
- nn_worker_op_start (&listener->in, 0);
+ nn_worker_op_start (&listener->in);
}
static void nn_usock_close (struct nn_usock *self)
diff --git a/src/aio/worker_win.h b/src/aio/worker_win.h
index 1b1dac0..200f4dd 100644
--- a/src/aio/worker_win.h
+++ b/src/aio/worker_win.h
@@ -1,5 +1,7 @@
/*
Copyright (c) 2013 Martin Sustrik All rights reserved.
+ Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+ Copyright 2018 Capitar IT Group BV <info@capitar.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"),
@@ -42,16 +44,21 @@ struct nn_worker_op {
/* This structure is to be used by the user, not nn_worker_op itself.
Actual usage is specific to the asynchronous operation in question. */
OVERLAPPED olpd;
+
+ /* We might have transferred less than requested. This keeps track. */
+ size_t resid;
+ char *buf;
+ void *arg;
+ void (*start)(struct nn_usock *);
+ int zero_is_error;
};
void nn_worker_op_init (struct nn_worker_op *self, int src,
struct nn_fsm *owner);
void nn_worker_op_term (struct nn_worker_op *self);
-/* Call this function when asynchronous operation is started.
- If 'zeroiserror' is set to 1, zero bytes transferred will be treated
- as an error. */
-void nn_worker_op_start (struct nn_worker_op *self, int zeroiserror);
+/* Call this function when asynchronous operation is started. */
+void nn_worker_op_start (struct nn_worker_op *self);
int nn_worker_op_isidle (struct nn_worker_op *self);
diff --git a/src/aio/worker_win.inc b/src/aio/worker_win.inc
index d62028a..4e4b95e 100644
--- a/src/aio/worker_win.inc
+++ b/src/aio/worker_win.inc
@@ -1,6 +1,8 @@
/*
Copyright (c) 2013 Martin Sustrik All rights reserved.
Copyright (c) 2013 GoPivotal, Inc. All rights reserved.
+ Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
+ Copyright 2018 Capitar IT Group BV <info@capitar.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"),
@@ -32,7 +34,6 @@
#define NN_WORKER_OP_STATE_IDLE 1
#define NN_WORKER_OP_STATE_ACTIVE 2
-#define NN_WORKER_OP_STATE_ACTIVE_ZEROISERROR 3
/* The value of this variable is irrelevant. It's used only as a placeholder
for the address that is used as the 'stop' event ID. */
@@ -58,6 +59,10 @@ void nn_worker_op_init (struct nn_worker_op *self, int src,
self->src = src;
self->owner = owner;
self->state = NN_WORKER_OP_STATE_IDLE;
+ self->start = NULL;
+ self->buf = NULL;
+ self->resid = 0;
+ self->zero_is_error = 0;
}
void nn_worker_op_term (struct nn_worker_op *self)
@@ -65,11 +70,9 @@ void nn_worker_op_term (struct nn_worker_op *self)
nn_assert_state (self, NN_WORKER_OP_STATE_IDLE);
}
-void nn_worker_op_start (struct nn_worker_op *self, int zeroiserror)
+void nn_worker_op_start (struct nn_worker_op *self)
{
- nn_assert_state (self, NN_WORKER_OP_STATE_IDLE);
- self->state = zeroiserror ? NN_WORKER_OP_STATE_ACTIVE_ZEROISERROR :
- NN_WORKER_OP_STATE_ACTIVE;
+ self->state = NN_WORKER_OP_STATE_ACTIVE;
}
int nn_worker_op_isidle (struct nn_worker_op *self)
@@ -174,6 +177,7 @@ static void nn_worker_routine (void *arg)
/* Process I/O completion events. */
if (nn_fast (entries [i].lpOverlapped != NULL)) {
+ DWORD nxfer;
op = nn_cont (entries [i].lpOverlapped,
struct nn_worker_op, olpd);
@@ -183,9 +187,31 @@ static void nn_worker_routine (void *arg)
rc = entries [i].Internal & 0xc0000000;
switch (rc) {
case 0x00000000:
+ nxfer = entries[i].dwNumberOfBytesTransferred;
+
+ if ((nxfer == 0) && (op->zero_is_error != 0)) {
+ rc = NN_WORKER_OP_ERROR;
+ break;
+ }
+ if (op->start != NULL) {
+ if (nxfer > op->resid) {
+ rc = NN_WORKER_OP_ERROR;
+ break;
+ }
+ op->resid -= nxfer;
+ op->buf += nxfer;
+
+ /* If we still have more to transfer, keep going. */
+ if (op->resid != 0) {
+ op->start (op->arg);
+ continue;
+ }
+ }
rc = NN_WORKER_OP_DONE;
break;
+
case 0xc0000000:
+ nxfer = 0;
rc = NN_WORKER_OP_ERROR;
break;
default:
@@ -195,11 +221,9 @@ static void nn_worker_routine (void *arg)
/* Raise the completion event. */
nn_ctx_enter (op->owner->ctx);
nn_assert (op->state != NN_WORKER_OP_STATE_IDLE);
- if (rc != NN_WORKER_OP_ERROR &&
- op->state == NN_WORKER_OP_STATE_ACTIVE_ZEROISERROR &&
- entries [i].dwNumberOfBytesTransferred == 0)
- rc = NN_WORKER_OP_ERROR;
+
op->state = NN_WORKER_OP_STATE_IDLE;
+
nn_fsm_feed (op->owner, op->src, rc, op);
nn_ctx_leave (op->owner->ctx);