summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYann Ylavic <ylavic@apache.org>2021-12-29 16:53:51 +0000
committerYann Ylavic <ylavic@apache.org>2021-12-29 16:53:51 +0000
commitc9633b67197d6881571125b2f67afbc7f90597d3 (patch)
tree6385a5146b10a01465677b924e7b196c9c865048
parent0826d52bcf6ace26c2e44d0cb0958800b942ba89 (diff)
downloadapr-c9633b67197d6881571125b2f67afbc7f90597d3.tar.gz
Merge r1895106, r1895111, r1895175, r1895181, r1895465 from trunk:
Fix drain wakeup pipe issue when multiple threads call apr_pollset_wakeup/apr_pollcb_wakeup for the same pollset filling up drain pipe. Use atomics so that wakeup call is noop if some other thread allready done this Follow up to r1895106: now we want blocking reads on unix too so revert r1894914. Follow up to r1895106: Use less expensive atomics for wakeup. If pipe writers (wakeup) put a single byte until it's consumed by the reader (drain), it's enough to use an atomic cas for the writers (still) and an atomic (re)set for the reader (no more cas here). This requires that the reader never blocks on read though (e.g. spurious return from poll), so make the read side on the pipe non-blocking again/finally. Since synchronous non-blocking read is not a thing for Windows' Readfile(), add a ->socket flag to this arch's apr_file_t (like the existing ->pipe one) which file_socket_pipe_create() will set to make apr_file_read/write() handle non-blocking (nor overlapped) socket pipes with WSARecv/Send(). Use enum instead multiple booleans Fix remaining change when apr_filetype_e was added Submitted by: mturk, ylavic, ylavic, mturk, mturk git-svn-id: https://svn.apache.org/repos/asf/apr/apr/branches/1.7.x@1896510 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--file_io/win32/pipe.c64
-rw-r--r--file_io/win32/readwrite.c63
-rw-r--r--include/arch/unix/apr_arch_poll_private.h4
-rw-r--r--include/arch/win32/apr_arch_file_io.h8
-rw-r--r--network_io/os2/sockopt.c16
-rw-r--r--poll/os2/pollset.c26
-rw-r--r--poll/unix/epoll.c4
-rw-r--r--poll/unix/kqueue.c4
-rw-r--r--poll/unix/poll.c4
-rw-r--r--poll/unix/pollcb.c11
-rw-r--r--poll/unix/pollset.c11
-rw-r--r--poll/unix/port.c4
-rw-r--r--poll/unix/select.c2
-rw-r--r--poll/unix/wakeup.c26
14 files changed, 151 insertions, 96 deletions
diff --git a/file_io/win32/pipe.c b/file_io/win32/pipe.c
index dd8ef1352..7dcbfb00e 100644
--- a/file_io/win32/pipe.c
+++ b/file_io/win32/pipe.c
@@ -43,7 +43,7 @@ APR_DECLARE(apr_status_t) apr_file_pipe_timeout_set(apr_file_t *thepipe,
thepipe->timeout = timeout;
return APR_SUCCESS;
}
- if (!thepipe->pipe) {
+ if (thepipe->ftype != APR_FILETYPE_PIPE) {
return APR_ENOTIMPL;
}
if (timeout && !(thepipe->pOverlapped)) {
@@ -108,7 +108,7 @@ APR_DECLARE(apr_status_t) apr_file_pipe_create_pools(apr_file_t **in,
(*in) = (apr_file_t *)apr_pcalloc(pool_in, sizeof(apr_file_t));
(*in)->pool = pool_in;
(*in)->fname = NULL;
- (*in)->pipe = 1;
+ (*in)->ftype = APR_FILETYPE_PIPE;
(*in)->timeout = -1;
(*in)->ungetchar = -1;
(*in)->eof_hit = 0;
@@ -123,7 +123,7 @@ APR_DECLARE(apr_status_t) apr_file_pipe_create_pools(apr_file_t **in,
(*out) = (apr_file_t *)apr_pcalloc(pool_out, sizeof(apr_file_t));
(*out)->pool = pool_out;
(*out)->fname = NULL;
- (*out)->pipe = 1;
+ (*out)->ftype = APR_FILETYPE_PIPE;
(*out)->timeout = -1;
(*out)->ungetchar = -1;
(*out)->eof_hit = 0;
@@ -250,7 +250,7 @@ APR_DECLARE(apr_status_t) apr_os_pipe_put_ex(apr_file_t **file,
{
(*file) = apr_pcalloc(pool, sizeof(apr_file_t));
(*file)->pool = pool;
- (*file)->pipe = 1;
+ (*file)->ftype = APR_FILETYPE_PIPE;
(*file)->timeout = -1;
(*file)->ungetchar = -1;
(*file)->filehand = *thefile;
@@ -364,32 +364,28 @@ static apr_status_t create_socket_pipe(SOCKET *rd, SOCKET *wr)
rv = apr_get_netos_error();
goto cleanup;
}
- /* Verify the connection by reading the send identification.
- */
- do {
- if (nc++)
- Sleep(1);
- nrd = recv(*rd, (char *)iid, sizeof(iid), 0);
- rv = nrd == SOCKET_ERROR ? apr_get_netos_error() : APR_SUCCESS;
- } while (APR_STATUS_IS_EAGAIN(rv));
-
- if (nrd == sizeof(iid)) {
- if (memcmp(uid, iid, sizeof(uid)) == 0) {
- /* Wow, we recived what we send.
- * Put read side of the pipe to the blocking
- * mode and return.
- */
- bm = 0;
- if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) {
- rv = apr_get_netos_error();
- goto cleanup;
- }
- break;
- }
+ /* Verify the connection by reading/waiting for the identification */
+ bm = 0;
+ if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) {
+ rv = apr_get_netos_error();
+ goto cleanup;
}
- else if (nrd == SOCKET_ERROR) {
+ nrd = recv(*rd, (char *)iid, sizeof(iid), 0);
+ if (nrd == SOCKET_ERROR) {
+ rv = apr_get_netos_error();
goto cleanup;
}
+ if (nrd == (int)sizeof(uid) && memcmp(iid, uid, sizeof(uid)) == 0) {
+ /* Got the right identifier, put the poll()able read side of
+ * the pipe in nonblocking mode and return.
+ */
+ bm = 1;
+ if (ioctlsocket(*rd, FIONBIO, &bm) == SOCKET_ERROR) {
+ rv = apr_get_netos_error();
+ goto cleanup;
+ }
+ break;
+ }
closesocket(*rd);
}
/* We don't need the listening socket any more */
@@ -398,6 +394,7 @@ static apr_status_t create_socket_pipe(SOCKET *rd, SOCKET *wr)
cleanup:
/* Don't leak resources */
+ closesocket(ls);
if (*rd != INVALID_SOCKET)
closesocket(*rd);
if (*wr != INVALID_SOCKET)
@@ -405,7 +402,6 @@ cleanup:
*rd = INVALID_SOCKET;
*wr = INVALID_SOCKET;
- closesocket(ls);
return rv;
}
@@ -434,21 +430,21 @@ apr_status_t apr_file_socket_pipe_create(apr_file_t **in,
(*in) = (apr_file_t *)apr_pcalloc(p, sizeof(apr_file_t));
(*in)->pool = p;
(*in)->fname = NULL;
- (*in)->pipe = 1;
- (*in)->timeout = -1;
+ (*in)->ftype = APR_FILETYPE_SOCKET;
+ (*in)->timeout = 0; /* read end of the pipe is non-blocking */
(*in)->ungetchar = -1;
(*in)->eof_hit = 0;
(*in)->filePtr = 0;
(*in)->bufpos = 0;
(*in)->dataRead = 0;
(*in)->direction = 0;
- (*in)->pOverlapped = (OVERLAPPED*)apr_pcalloc(p, sizeof(OVERLAPPED));
+ (*in)->pOverlapped = NULL;
(*in)->filehand = (HANDLE)rd;
(*out) = (apr_file_t *)apr_pcalloc(p, sizeof(apr_file_t));
(*out)->pool = p;
(*out)->fname = NULL;
- (*out)->pipe = 1;
+ (*out)->ftype = APR_FILETYPE_SOCKET;
(*out)->timeout = -1;
(*out)->ungetchar = -1;
(*out)->eof_hit = 0;
@@ -456,7 +452,7 @@ apr_status_t apr_file_socket_pipe_create(apr_file_t **in,
(*out)->bufpos = 0;
(*out)->dataRead = 0;
(*out)->direction = 0;
- (*out)->pOverlapped = (OVERLAPPED*)apr_pcalloc(p, sizeof(OVERLAPPED));
+ (*out)->pOverlapped = NULL;
(*out)->filehand = (HANDLE)wr;
apr_pool_cleanup_register(p, (void *)(*in), socket_pipe_cleanup,
@@ -470,7 +466,7 @@ apr_status_t apr_file_socket_pipe_create(apr_file_t **in,
apr_status_t apr_file_socket_pipe_close(apr_file_t *file)
{
apr_status_t stat;
- if (!file->pipe)
+ if (file->ftype != APR_FILETYPE_SOCKET)
return apr_file_close(file);
if ((stat = socket_pipe_cleanup(file)) == APR_SUCCESS) {
apr_pool_cleanup_kill(file->pool, file, socket_pipe_cleanup);
diff --git a/file_io/win32/readwrite.c b/file_io/win32/readwrite.c
index 701bec75b..fa4a23fbe 100644
--- a/file_io/win32/readwrite.c
+++ b/file_io/win32/readwrite.c
@@ -20,10 +20,12 @@
#include "apr_strings.h"
#include "apr_lib.h"
#include "apr_errno.h"
-#include <malloc.h>
+#include "apr_arch_networkio.h"
#include "apr_arch_atime.h"
#include "apr_arch_misc.h"
+#include <malloc.h>
+
/*
* read_with_timeout()
* Uses async i/o to emulate unix non-blocking i/o with timeouts.
@@ -40,7 +42,7 @@ static apr_status_t read_with_timeout(apr_file_t *file, void *buf, apr_size_t le
/* Peek at the pipe. If there is no data available, return APR_EAGAIN.
* If data is available, go ahead and read it.
*/
- if (file->pipe) {
+ if (file->ftype == APR_FILETYPE_PIPE) {
DWORD bytes;
if (!PeekNamedPipe(file->filehand, NULL, 0, NULL, &bytes, NULL)) {
rv = apr_get_os_error();
@@ -68,13 +70,28 @@ static apr_status_t read_with_timeout(apr_file_t *file, void *buf, apr_size_t le
}
}
- if (file->pOverlapped && !file->pipe) {
+ if (file->pOverlapped && file->ftype == APR_FILETYPE_FILE) {
file->pOverlapped->Offset = (DWORD)file->filePtr;
file->pOverlapped->OffsetHigh = (DWORD)(file->filePtr >> 32);
}
- if (ReadFile(file->filehand, buf, len,
- &bytesread, file->pOverlapped)) {
+ if (file->ftype == APR_FILETYPE_SOCKET) {
+ WSABUF wsaData;
+ DWORD flags = 0;
+
+ wsaData.buf = (char*) buf;
+ wsaData.len = (u_long)len;
+ if (WSARecv((SOCKET)file->filehand, &wsaData, 1, &bytesread,
+ &flags, NULL, NULL) == SOCKET_ERROR) {
+ rv = apr_get_netos_error();
+ bytesread = 0;
+ }
+ else {
+ rv = APR_SUCCESS;
+ }
+ }
+ else if (ReadFile(file->filehand, buf, len,
+ &bytesread, file->pOverlapped)) {
rv = APR_SUCCESS;
}
else {
@@ -133,7 +150,7 @@ static apr_status_t read_with_timeout(apr_file_t *file, void *buf, apr_size_t le
if (rv == APR_SUCCESS && bytesread == 0)
rv = APR_EOF;
- if (rv == APR_SUCCESS && file->pOverlapped && !file->pipe) {
+ if (rv == APR_SUCCESS && file->pOverlapped && file->ftype == APR_FILETYPE_FILE) {
file->filePtr += bytesread;
}
*nbytes = bytesread;
@@ -246,7 +263,7 @@ APR_DECLARE(apr_status_t) apr_file_read(apr_file_t *thefile, void *buf, apr_size
APR_DECLARE(apr_status_t) apr_file_write(apr_file_t *thefile, const void *buf, apr_size_t *nbytes)
{
apr_status_t rv;
- DWORD bwrote;
+ DWORD bwrote = 0;
/* If the file is open for xthread support, allocate and
* initialize the overlapped and io completion event (hEvent).
@@ -298,9 +315,29 @@ APR_DECLARE(apr_status_t) apr_file_write(apr_file_t *thefile, const void *buf, a
if (thefile->flags & APR_FOPEN_XTHREAD) {
apr_thread_mutex_unlock(thefile->mutex);
}
- return rv;
- } else {
- if (!thefile->pipe) {
+ }
+ else if (thefile->ftype == APR_FILETYPE_SOCKET) {
+ WSABUF wsaData;
+ DWORD flags = 0;
+
+ wsaData.buf = (char*) buf;
+ wsaData.len = (u_long)*nbytes;
+ if (WSASend((SOCKET)file->filehand, &wsaData, 1, &bwrote,
+ flags, NULL, NULL) == SOCKET_ERROR) {
+ rv = apr_get_netos_error();
+ bwrote = 0;
+ }
+ else {
+ rv = APR_SUCCESS;
+ }
+ *nbytes = bwrote;
+ }
+ else {
+ if (thefile->ftype != APR_FILETYPE_FILE) {
+ rv = WriteFile(thefile->filehand, buf, (DWORD)*nbytes, &bwrote,
+ thefile->pOverlapped);
+ }
+ else {
apr_off_t offset = 0;
apr_status_t rc;
if (thefile->append) {
@@ -332,10 +369,6 @@ APR_DECLARE(apr_status_t) apr_file_write(apr_file_t *thefile, const void *buf, a
apr_thread_mutex_unlock(thefile->mutex);
}
}
- else {
- rv = WriteFile(thefile->filehand, buf, (DWORD)*nbytes, &bwrote,
- thefile->pOverlapped);
- }
if (rv) {
*nbytes = bwrote;
rv = APR_SUCCESS;
@@ -382,7 +415,7 @@ APR_DECLARE(apr_status_t) apr_file_write(apr_file_t *thefile, const void *buf, a
}
}
}
- if (rv == APR_SUCCESS && thefile->pOverlapped && !thefile->pipe) {
+ if (rv == APR_SUCCESS && thefile->pOverlapped && thefile->ftype == APR_FILETYPE_FILE) {
thefile->filePtr += *nbytes;
}
}
diff --git a/include/arch/unix/apr_arch_poll_private.h b/include/arch/unix/apr_arch_poll_private.h
index 4c2aaa704..8dd97d1ad 100644
--- a/include/arch/unix/apr_arch_poll_private.h
+++ b/include/arch/unix/apr_arch_poll_private.h
@@ -123,6 +123,7 @@ struct apr_pollset_t
/* Pipe descriptors used for wakeup */
apr_file_t *wakeup_pipe[2];
apr_pollfd_t wakeup_pfd;
+ volatile apr_uint32_t wakeup_set;
apr_pollset_private_t *p;
const apr_pollset_provider_t *provider;
};
@@ -151,6 +152,7 @@ struct apr_pollcb_t {
/* Pipe descriptors used for wakeup */
apr_file_t *wakeup_pipe[2];
apr_pollfd_t wakeup_pfd;
+ volatile apr_uint32_t wakeup_set;
int fd;
apr_pollcb_pset pollset;
apr_pollfd_t **copyset;
@@ -182,6 +184,6 @@ struct apr_pollcb_provider_t {
apr_status_t apr_poll_create_wakeup_pipe(apr_pool_t *pool, apr_pollfd_t *pfd,
apr_file_t **wakeup_pipe);
apr_status_t apr_poll_close_wakeup_pipe(apr_file_t **wakeup_pipe);
-void apr_poll_drain_wakeup_pipe(apr_file_t **wakeup_pipe);
+void apr_poll_drain_wakeup_pipe(volatile apr_uint32_t *wakeup_set, apr_file_t **wakeup_pipe);
#endif /* APR_ARCH_POLL_PRIVATE_H */
diff --git a/include/arch/win32/apr_arch_file_io.h b/include/arch/win32/apr_arch_file_io.h
index 9fb8df19a..bef536450 100644
--- a/include/arch/win32/apr_arch_file_io.h
+++ b/include/arch/win32/apr_arch_file_io.h
@@ -159,10 +159,16 @@ apr_status_t more_finfo(apr_finfo_t *finfo, const void *ufile,
/* for apr_poll.c */
#define filedes filehand
+typedef enum {
+ APR_FILETYPE_FILE = 0,
+ APR_FILETYPE_PIPE,
+ APR_FILETYPE_SOCKET
+} apr_filetype_e;
+
struct apr_file_t {
apr_pool_t *pool;
HANDLE filehand;
- BOOLEAN pipe; /* Is this a pipe of a file? */
+ apr_filetype_e ftype; /* Is this a pipe, a socket or a file? */
OVERLAPPED *pOverlapped;
apr_interval_time_t timeout;
apr_int32_t flags;
diff --git a/network_io/os2/sockopt.c b/network_io/os2/sockopt.c
index 094cd24ea..6e97b49a6 100644
--- a/network_io/os2/sockopt.c
+++ b/network_io/os2/sockopt.c
@@ -32,8 +32,22 @@
APR_DECLARE(apr_status_t) apr_socket_timeout_set(apr_socket_t *sock,
apr_interval_time_t t)
{
+ apr_status_t rv = APR_SUCCESS;
+
+ /* If our new timeout is non-negative and our old timeout was
+ * negative, then we need to ensure that we are non-blocking.
+ * Conversely, if our new timeout is negative and we had
+ * non-negative timeout, we must make sure our socket is blocking.
+ */
+ if (t == 0 && sock->timeout != 0) {
+ rv = apr_socket_opt_set(sock, APR_SO_NONBLOCK, 1);
+ }
+ else if (t != 0 && sock->timeout == 0) {
+ rv = apr_socket_opt_set(sock, APR_SO_NONBLOCK, 0);
+ }
+
sock->timeout = t;
- return APR_SUCCESS;
+ return rv;
}
diff --git a/poll/os2/pollset.c b/poll/os2/pollset.c
index 2ec848105..c407f5d98 100644
--- a/poll/os2/pollset.c
+++ b/poll/os2/pollset.c
@@ -67,7 +67,6 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
if (rc == APR_SUCCESS) {
apr_sockaddr_t *listen_address;
- apr_socket_timeout_set((*pollset)->wake_listen, 0);
apr_sockaddr_info_get(&listen_address, "", APR_UNIX, 0, 0, p);
rc = apr_socket_bind((*pollset)->wake_listen, listen_address);
@@ -80,6 +79,7 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
wake_poll_fd.client_data = NULL;
apr_pollset_add(*pollset, &wake_poll_fd);
apr_socket_addr_get(&(*pollset)->wake_address, APR_LOCAL, (*pollset)->wake_listen);
+ apr_socket_timeout_set((*pollset)->wake_listen, 0);
rc = apr_socket_create(&(*pollset)->wake_sender, APR_UNIX, SOCK_DGRAM, 0, p);
}
@@ -263,17 +263,14 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
if (rtnevents) {
if (i == 0 && pollset->wake_listen != NULL) {
+ char ch;
+ apr_size_t len = 1;
struct apr_sockaddr_t from_addr;
- char buffer[16];
- apr_size_t buflen;
- for (;;) {
- buflen = sizeof(buffer);
- rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen,
- MSG_DONTWAIT, buffer, &buflen);
- if (rv != APR_SUCCESS) {
- break;
- }
- /* Woken up, drain the pipe still. */
+ rv = apr_socket_recvfrom(&from_addr, pollset->wake_listen,
+ MSG_DONTWAIT, &ch, &len);
+ if (rv == APR_SUCCESS) {
+ /* Woken up, senders can fill the pipe again */
+ apr_atomic_set32(&pollset->wakeup_set, 0);
rc = APR_EINTR;
}
}
@@ -298,12 +295,15 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
{
- if (pollset->wake_sender) {
+ if (!pollset->wake_sender)
+ return APR_EINIT;
+
+ if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0) {
apr_size_t len = 1;
return apr_socket_sendto(pollset->wake_sender, pollset->wake_address, 0, "", &len);
}
- return APR_EINIT;
+ return APR_SUCCESS;
}
diff --git a/poll/unix/epoll.c b/poll/unix/epoll.c
index 4ab03f67c..c00a1094d 100644
--- a/poll/unix/epoll.c
+++ b/poll/unix/epoll.c
@@ -289,7 +289,7 @@ static apr_status_t impl_pollset_poll(apr_pollset_t *pollset,
if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
fdptr->desc_type == APR_POLL_FILE &&
fdptr->desc.f == pollset->wakeup_pipe[0]) {
- apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+ apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, pollset->wakeup_pipe);
rv = APR_EINTR;
}
else {
@@ -460,7 +460,7 @@ static apr_status_t impl_pollcb_poll(apr_pollcb_t *pollcb,
if ((pollcb->flags & APR_POLLSET_WAKEABLE) &&
pollfd->desc_type == APR_POLL_FILE &&
pollfd->desc.f == pollcb->wakeup_pipe[0]) {
- apr_poll_drain_wakeup_pipe(pollcb->wakeup_pipe);
+ apr_poll_drain_wakeup_pipe(&pollcb->wakeup_set, pollcb->wakeup_pipe);
return APR_EINTR;
}
diff --git a/poll/unix/kqueue.c b/poll/unix/kqueue.c
index 548464db1..e8a1ef95b 100644
--- a/poll/unix/kqueue.c
+++ b/poll/unix/kqueue.c
@@ -286,7 +286,7 @@ static apr_status_t impl_pollset_poll(apr_pollset_t *pollset,
if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
fd->desc_type == APR_POLL_FILE &&
fd->desc.f == pollset->wakeup_pipe[0]) {
- apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+ apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, pollset->wakeup_pipe);
rv = APR_EINTR;
}
else {
@@ -473,7 +473,7 @@ static apr_status_t impl_pollcb_poll(apr_pollcb_t *pollcb,
if ((pollcb->flags & APR_POLLSET_WAKEABLE) &&
pollfd->desc_type == APR_POLL_FILE &&
pollfd->desc.f == pollcb->wakeup_pipe[0]) {
- apr_poll_drain_wakeup_pipe(pollcb->wakeup_pipe);
+ apr_poll_drain_wakeup_pipe(&pollcb->wakeup_set, pollcb->wakeup_pipe);
return APR_EINTR;
}
diff --git a/poll/unix/poll.c b/poll/unix/poll.c
index f148f5e50..51c94c7d1 100644
--- a/poll/unix/poll.c
+++ b/poll/unix/poll.c
@@ -279,7 +279,7 @@ static apr_status_t impl_pollset_poll(apr_pollset_t *pollset,
if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
pollset->p->query_set[i].desc_type == APR_POLL_FILE &&
pollset->p->query_set[i].desc.f == pollset->wakeup_pipe[0]) {
- apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+ apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, pollset->wakeup_pipe);
rv = APR_EINTR;
}
else {
@@ -431,7 +431,7 @@ static apr_status_t impl_pollcb_poll(apr_pollcb_t *pollcb,
if ((pollcb->flags & APR_POLLSET_WAKEABLE) &&
pollfd->desc_type == APR_POLL_FILE &&
pollfd->desc.f == pollcb->wakeup_pipe[0]) {
- apr_poll_drain_wakeup_pipe(pollcb->wakeup_pipe);
+ apr_poll_drain_wakeup_pipe(&pollcb->wakeup_set, pollcb->wakeup_pipe);
return APR_EINTR;
}
diff --git a/poll/unix/pollcb.c b/poll/unix/pollcb.c
index a63ad5c9c..103ab9fa6 100644
--- a/poll/unix/pollcb.c
+++ b/poll/unix/pollcb.c
@@ -23,6 +23,7 @@
#include "apr_poll.h"
#include "apr_time.h"
#include "apr_portable.h"
+#include "apr_atomic.h"
#include "apr_arch_file_io.h"
#include "apr_arch_networkio.h"
#include "apr_arch_poll_private.h"
@@ -134,6 +135,7 @@ APR_DECLARE(apr_status_t) apr_pollcb_create_ex(apr_pollcb_t **ret_pollcb,
pollcb->flags = flags;
pollcb->pool = p;
pollcb->provider = provider;
+ pollcb->wakeup_set = 0;
rv = (*provider->create)(pollcb, size, p, flags);
if (rv == APR_ENOTIMPL) {
@@ -212,10 +214,13 @@ APR_DECLARE(apr_status_t) apr_pollcb_poll(apr_pollcb_t *pollcb,
APR_DECLARE(apr_status_t) apr_pollcb_wakeup(apr_pollcb_t *pollcb)
{
- if (pollcb->flags & APR_POLLSET_WAKEABLE)
- return apr_file_putc(1, pollcb->wakeup_pipe[1]);
- else
+ if (!(pollcb->flags & APR_POLLSET_WAKEABLE))
return APR_EINIT;
+
+ if (apr_atomic_cas32(&pollcb->wakeup_set, 1, 0) == 0)
+ return apr_file_putc(1, pollcb->wakeup_pipe[1]);
+
+ return APR_SUCCESS;
}
APR_DECLARE(const char *) apr_pollcb_method_name(apr_pollcb_t *pollcb)
diff --git a/poll/unix/pollset.c b/poll/unix/pollset.c
index 8fa817330..57374a5e7 100644
--- a/poll/unix/pollset.c
+++ b/poll/unix/pollset.c
@@ -23,6 +23,7 @@
#include "apr_poll.h"
#include "apr_time.h"
#include "apr_portable.h"
+#include "apr_atomic.h"
#include "apr_arch_file_io.h"
#include "apr_arch_networkio.h"
#include "apr_arch_poll_private.h"
@@ -144,6 +145,7 @@ APR_DECLARE(apr_status_t) apr_pollset_create_ex(apr_pollset_t **ret_pollset,
pollset->pool = p;
pollset->flags = flags;
pollset->provider = provider;
+ pollset->wakeup_set = 0;
rv = (*provider->create)(pollset, size, p, flags);
if (rv == APR_ENOTIMPL) {
@@ -220,10 +222,13 @@ APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t * pollset)
APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
{
- if (pollset->flags & APR_POLLSET_WAKEABLE)
- return apr_file_putc(1, pollset->wakeup_pipe[1]);
- else
+ if (!(pollset->flags & APR_POLLSET_WAKEABLE))
return APR_EINIT;
+
+ if (apr_atomic_cas32(&pollset->wakeup_set, 1, 0) == 0)
+ return apr_file_putc(1, pollset->wakeup_pipe[1]);
+
+ return APR_SUCCESS;
}
APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,
diff --git a/poll/unix/port.c b/poll/unix/port.c
index c1e599412..638a8cba7 100644
--- a/poll/unix/port.c
+++ b/poll/unix/port.c
@@ -411,7 +411,7 @@ static apr_status_t impl_pollset_poll(apr_pollset_t *pollset,
if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
ep->pfd.desc_type == APR_POLL_FILE &&
ep->pfd.desc.f == pollset->wakeup_pipe[0]) {
- apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+ apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, pollset->wakeup_pipe);
rv = APR_EINTR;
}
else {
@@ -563,7 +563,7 @@ static apr_status_t impl_pollcb_poll(apr_pollcb_t *pollcb,
if ((pollcb->flags & APR_POLLSET_WAKEABLE) &&
pollfd->desc_type == APR_POLL_FILE &&
pollfd->desc.f == pollcb->wakeup_pipe[0]) {
- apr_poll_drain_wakeup_pipe(pollcb->wakeup_pipe);
+ apr_poll_drain_wakeup_pipe(&pollcb->wakeup_set, pollcb->wakeup_pipe);
return APR_EINTR;
}
diff --git a/poll/unix/select.c b/poll/unix/select.c
index 51be3c1cd..f2c1a1328 100644
--- a/poll/unix/select.c
+++ b/poll/unix/select.c
@@ -401,7 +401,7 @@ static apr_status_t impl_pollset_poll(apr_pollset_t *pollset,
else {
if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
pollset->p->query_set[i].desc.f == pollset->wakeup_pipe[0]) {
- apr_poll_drain_wakeup_pipe(pollset->wakeup_pipe);
+ apr_poll_drain_wakeup_pipe(&pollset->wakeup_set, pollset->wakeup_pipe);
rv = APR_EINTR;
continue;
}
diff --git a/poll/unix/wakeup.c b/poll/unix/wakeup.c
index 6c3dcd6fb..b7e9efd45 100644
--- a/poll/unix/wakeup.c
+++ b/poll/unix/wakeup.c
@@ -18,6 +18,7 @@
#include "apr_poll.h"
#include "apr_time.h"
#include "apr_portable.h"
+#include "apr_atomic.h"
#include "apr_arch_file_io.h"
#include "apr_arch_networkio.h"
#include "apr_arch_poll_private.h"
@@ -33,7 +34,7 @@ apr_status_t apr_poll_create_wakeup_pipe(apr_pool_t *pool, apr_pollfd_t *pfd,
apr_status_t rv;
if ((rv = apr_file_socket_pipe_create(&wakeup_pipe[0], &wakeup_pipe[1],
- pool)) != APR_SUCCESS)
+ pool)) != APR_SUCCESS)
return rv;
pfd->reqevents = APR_POLLIN;
@@ -80,9 +81,9 @@ apr_status_t apr_poll_create_wakeup_pipe(apr_pool_t *pool, apr_pollfd_t *pfd,
{
apr_status_t rv;
+ /* Read end of the pipe is non-blocking */
if ((rv = apr_file_pipe_create_ex(&wakeup_pipe[0], &wakeup_pipe[1],
- APR_WRITE_BLOCK,
- pool)) != APR_SUCCESS)
+ APR_WRITE_BLOCK, pool)))
return rv;
pfd->p = pool;
@@ -135,18 +136,11 @@ apr_status_t apr_poll_close_wakeup_pipe(apr_file_t **wakeup_pipe)
/* Read and discard whatever is in the wakeup pipe.
*/
-void apr_poll_drain_wakeup_pipe(apr_file_t **wakeup_pipe)
+void apr_poll_drain_wakeup_pipe(volatile apr_uint32_t *wakeup_set, apr_file_t **wakeup_pipe)
{
- char rb[512];
- apr_size_t nr = sizeof(rb);
-
- while (apr_file_read(wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
- /* Although we write just one byte to the other end of the pipe
- * during wakeup, multiple threads could call the wakeup.
- * So simply drain out from the input side of the pipe all
- * the data.
- */
- if (nr != sizeof(rb))
- break;
- }
+ char ch;
+
+ (void)apr_file_getc(&ch, wakeup_pipe[0]);
+ apr_atomic_set32(wakeup_set, 0);
}
+