summaryrefslogtreecommitdiff
path: root/poll
diff options
context:
space:
mode:
authorMladen Turk <mturk@apache.org>2008-04-13 08:31:03 +0000
committerMladen Turk <mturk@apache.org>2008-04-13 08:31:03 +0000
commitfc18a5fd06cc94fdf12e6136d67480e500c166fa (patch)
tree4002c9a170be10f9dd02301511de408a02d26f27 /poll
parent8744772ba11042a1d24ac5d1899062d2c209bdbf (diff)
downloadapr-fc18a5fd06cc94fdf12e6136d67480e500c166fa.tar.gz
Introduce apr_pollset_wakeup()
git-svn-id: https://svn.apache.org/repos/asf/apr/apr/trunk@647540 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'poll')
-rw-r--r--poll/unix/epoll.c154
-rw-r--r--poll/unix/kqueue.c15
-rw-r--r--poll/unix/poll.c151
-rw-r--r--poll/unix/port.c16
-rw-r--r--poll/unix/select.c18
5 files changed, 325 insertions, 29 deletions
diff --git a/poll/unix/epoll.c b/poll/unix/epoll.c
index 7a3831bee..5028e94d7 100644
--- a/poll/unix/epoll.c
+++ b/poll/unix/epoll.c
@@ -68,6 +68,8 @@ struct apr_pollset_t
#if APR_HAS_THREADS
/* A thread mutex to protect operations on the rings */
apr_thread_mutex_t *ring_lock;
+ /* Pipe descriptors used for wakeup */
+ apr_file_t *wakeup_pipe[2];
#endif
/* A ring containing all of the pollfd_t that are active */
APR_RING_HEAD(pfd_query_ring_t, pfd_elem_t) query_ring;
@@ -80,11 +82,61 @@ struct apr_pollset_t
static apr_status_t backend_cleanup(void *p_)
{
+ apr_status_t rv = APR_SUCCESS;
apr_pollset_t *pollset = (apr_pollset_t *) p_;
+
close(pollset->epoll_fd);
- return APR_SUCCESS;
+#if APR_HAS_THREADS
+ if (pollset->flags & APR_POLLSET_WAKEABLE) {
+ /* Close both sides of the wakeup pipe */
+ rv |= apr_file_close(pollset->wakeup_pipe[0]);
+ rv |= apr_file_close(pollset->wakeup_pipe[1]);
+ }
+#endif
+ return rv;
}
+#if APR_HAS_THREADS
+
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+ apr_status_t rv;
+ apr_pollfd_t fd;
+
+ if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+ &pollset->wakeup_pipe[1],
+ pollset->pool)) != APR_SUCCESS)
+ return rv;
+ fd.reqevents = APR_POLLIN;
+ fd.desc_type = APR_POLL_FILE;
+ fd.desc.f = pollset->wakeup_pipe[0];
+ /* Add the pipe to the pollset
+ */
+ return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard what's ever in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+ char rb[512];
+ apr_size_t nr = sizeof(rb);
+
+ while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+ /* Although we write just one byte to the other end of the pipe
+ * during wakeup, multiple treads could call the wakeup.
+ * So simply drain out from the input side of the pipe all
+ * the data.
+ */
+ if (nr != sizeof(rb))
+ break;
+ }
+}
+
+#endif
+
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
@@ -93,6 +145,13 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_status_t rv;
int fd;
+#if APR_HAS_THREADS
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Add room for wakeup descriptor */
+ size++;
+ }
+#endif
+
fd = epoll_create(size);
if (fd < 0) {
*pollset = NULL;
@@ -110,7 +169,8 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
return rv;
}
#else
- if (flags & APR_POLLSET_THREADSAFE) {
+ if (flags & APR_POLLSET_THREADSAFE ||
+ flags & APR_POLLSET_WAKEABLE) {
*pollset = NULL;
return APR_ENOTIMPL;
}
@@ -121,7 +181,6 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
(*pollset)->pool = p;
(*pollset)->epoll_fd = fd;
(*pollset)->pollset = apr_palloc(p, size * sizeof(struct epoll_event));
- apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
if (!(flags & APR_POLLSET_NOCOPY)) {
@@ -129,6 +188,18 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
}
+#if APR_HAS_THREADS
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Create wakeup pipe */
+ if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+ close(fd);
+ *pollset = NULL;
+ return rv;
+ }
+ }
+#endif
+ apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
+
return APR_SUCCESS;
}
@@ -244,8 +315,9 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int ret, i;
+ int ret, i, j;
apr_status_t rv = APR_SUCCESS;
+ apr_pollfd_t fd;
if (timeout > 0) {
timeout /= 1000;
@@ -263,23 +335,59 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
}
else {
if (pollset->flags & APR_POLLSET_NOCOPY) {
- for (i = 0; i < ret; i++) {
- pollset->result_set[i] =
- *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
- pollset->result_set[i].rtnevents =
- get_epoll_revent(pollset->pollset[i].events);
+ for (i = 0, j = 0; i < ret; i++) {
+ fd = *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
+#if APR_HAS_THREADS
+ /* Check if the polled descriptor is our
+ * wakeup pipe. In that case do not put it result set.
+ */
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ fd.desc_type == APR_POLL_FILE &&
+ fd.desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCEESS.
+ */
+ rv = APR_EINTR;
+ }
+ else
+#endif
+ {
+ pollset->result_set[j] = fd;
+ pollset->result_set[j].rtnevents =
+ get_epoll_revent(pollset->pollset[i].events);
+
+ j++;
+ }
}
+ (*num) = j;
}
else {
- for (i = 0; i < ret; i++) {
- pollset->result_set[i] =
- (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
- pollset->result_set[i].rtnevents =
- get_epoll_revent(pollset->pollset[i].events);
+ for (i = 0, j = 0; i < ret; i++) {
+ fd = (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
+#if APR_HAS_THREADS
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ fd.desc_type == APR_POLL_FILE &&
+ fd.desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCEESS.
+ */
+ rv = APR_EINTR;
+ }
+ else
+#endif
+ {
+ pollset->result_set[j] = fd;
+ pollset->result_set[j].rtnevents =
+ get_epoll_revent(pollset->pollset[i].events);
+ j++;
+ }
}
+ (*num) = j;
}
- if (descriptors) {
+ if (descriptors && (*num)) {
*descriptors = pollset->result_set;
}
}
@@ -296,6 +404,22 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
return rv;
}
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_file_putc(1, pollset->wakeup_pipe[1]);
+ else
+ return APR_EINIT;
+#else
+ /* In case APR was compiled without thread support
+ * makes no sense to have wakeup operation usable
+ * only in multithreading environment.
+ */
+ return APR_ENOTIMPL;
+#endif
+}
+
struct apr_pollcb_t {
apr_pool_t *pool;
apr_uint32_t nalloc;
diff --git a/poll/unix/kqueue.c b/poll/unix/kqueue.c
index 501953dc4..6547bf620 100644
--- a/poll/unix/kqueue.c
+++ b/poll/unix/kqueue.c
@@ -281,6 +281,21 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
return rv;
}
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return APR_ENOTIMPL;
+ else
+ return APR_EINIT;
+#else
+ /* In case APR was compiled without thread support
+ * makes no sense to have wakeup operation usable
+ * only in multithreading environment.
+ */
+ return APR_ENOTIMPL;
+#endif
+}
struct apr_pollcb_t {
apr_pool_t *pool;
diff --git a/poll/unix/poll.c b/poll/unix/poll.c
index cca8bfe8a..6f43c41ba 100644
--- a/poll/unix/poll.c
+++ b/poll/unix/poll.c
@@ -156,11 +156,70 @@ struct apr_pollset_t
apr_pool_t *pool;
apr_uint32_t nelts;
apr_uint32_t nalloc;
+ apr_uint32_t flags;
+#if APR_HAS_THREADS
+ /* Pipe descriptors used for wakeup */
+ apr_file_t *wakeup_pipe[2];
+#endif
struct pollfd *pollset;
apr_pollfd_t *query_set;
apr_pollfd_t *result_set;
};
+#if APR_HAS_THREADS
+
+static apr_status_t wakeup_pipe_cleanup(void *p)
+{
+ apr_status_t rv = APR_SUCCESS;
+ apr_pollset_t *pollset = (apr_pollset_t *)p;
+
+ if (pollset->flags & APR_POLLSET_WAKEABLE) {
+ /* Close both sides of the wakeup pipe */
+ rv |= apr_file_close(pollset->wakeup_pipe[0]);
+ rv |= apr_file_close(pollset->wakeup_pipe[1]);
+ }
+ return rv;
+}
+
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+ apr_status_t rv;
+ apr_pollfd_t fd;
+
+ if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+ &pollset->wakeup_pipe[1],
+ pollset->pool)) != APR_SUCCESS)
+ return rv;
+ fd.reqevents = APR_POLLIN;
+ fd.desc_type = APR_POLL_FILE;
+ fd.desc.f = pollset->wakeup_pipe[0];
+ /* Add the pipe to the pollset
+ */
+ return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard what's ever in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+ char rb[512];
+ apr_size_t nr = sizeof(rb);
+
+ while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+ /* Although we write just one byte to the other end of the pipe
+ * during wakeup, multiple treads could call the wakeup.
+ * So simply drain out from the input side of the pipe all
+ * the data.
+ */
+ if (nr != sizeof(rb))
+ break;
+ }
+}
+
+#endif
+
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
@@ -170,19 +229,46 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
*pollset = NULL;
return APR_ENOTIMPL;
}
+ if (flags & APR_POLLSET_WAKEABLE) {
+#if APR_HAS_THREADS
+ /* Add room for wakeup descriptor */
+ size++;
+#else
+ *pollset = NULL;
+ return APR_ENOTIMPL;
+#endif
+ }
*pollset = apr_palloc(p, sizeof(**pollset));
(*pollset)->nelts = 0;
(*pollset)->nalloc = size;
(*pollset)->pool = p;
+ (*pollset)->flags = flags;
(*pollset)->pollset = apr_palloc(p, size * sizeof(struct pollfd));
(*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+#if APR_HAS_THREADS
+ if (flags & APR_POLLSET_WAKEABLE) {
+ apr_status_t rv;
+ /* Create wakeup pipe */
+ if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+ *pollset = NULL;
+ return rv;
+ }
+ apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup,
+ wakeup_pipe_cleanup);
+ }
+#endif
+
return APR_SUCCESS;
}
APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t *pollset)
{
+#if APR_HAS_THREADS
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_pool_cleanup_run(pollset->pool, pollset, backend_cleanup);
+#endif
return APR_SUCCESS;
}
@@ -242,32 +328,69 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int rv;
+ int ret;
+ apr_status_t rv = APR_SUCCESS;
apr_uint32_t i, j;
if (timeout > 0) {
timeout /= 1000;
}
- rv = poll(pollset->pollset, pollset->nelts, timeout);
- (*num) = rv;
- if (rv < 0) {
+ ret = poll(pollset->pollset, pollset->nelts, timeout);
+ (*num) = ret;
+ if (ret < 0) {
return apr_get_netos_error();
}
- if (rv == 0) {
+ else if (res == 0) {
return APR_TIMEUP;
}
- j = 0;
- for (i = 0; i < pollset->nelts; i++) {
- if (pollset->pollset[i].revents != 0) {
- pollset->result_set[j] = pollset->query_set[i];
- pollset->result_set[j].rtnevents =
- get_revent(pollset->pollset[i].revents);
- j++;
+ else {
+ for (i = 0, j = 0; i < pollset->nelts; i++) {
+ if (pollset->pollset[i].revents != 0) {
+#if APR_HAS_THREADS
+ /* Check if the polled descriptor is our
+ * wakeup pipe. In that case do not put it result set.
+ */
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ pollset->pollset[i].desc_type == APR_POLL_FILE &&
+ pollset->pollset[i].desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCEESS.
+ */
+ rv = APR_EINTR;
+ }
+ else
+#endif
+ {
+ pollset->result_set[j] = pollset->query_set[i];
+ pollset->result_set[j].rtnevents =
+ get_revent(pollset->pollset[i].revents);
+ j++;
+ }
+ }
}
+ (*num) = j;
}
- if (descriptors)
+ if (descriptors && (*num))
*descriptors = pollset->result_set;
- return APR_SUCCESS;
+ return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+ if (pollset->flags & APR_POLLSET_WAKEABLE) {
+ return apr_file_putc(1, pollset->wakeup_pipe[1]);
+ }
+ else
+ return APR_EINIT;
+#else
+ /* In case APR was compiled without thread support
+ * makes no sense to have wakeup operation usable
+ * only in multithreading environment.
+ */
+ return APR_ENOTIMPL;
+#endif
}
APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,
diff --git a/poll/unix/port.c b/poll/unix/port.c
index 05848d618..2842e2781 100644
--- a/poll/unix/port.c
+++ b/poll/unix/port.c
@@ -335,6 +335,22 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
return rv;
}
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return APR_ENOTIMPL;
+ else
+ return APR_EINIT;
+#else
+ /* In case APR was compiled without thread support
+ * makes no sense to have wakeup operation usable
+ * only in multithreading environment.
+ */
+ return APR_ENOTIMPL;
+#endif
+}
+
struct apr_pollcb_t {
apr_pool_t *pool;
apr_uint32_t nalloc;
diff --git a/poll/unix/select.c b/poll/unix/select.c
index 42e7a3f68..64b0f050a 100644
--- a/poll/unix/select.c
+++ b/poll/unix/select.c
@@ -175,6 +175,7 @@ struct apr_pollset_t
apr_uint32_t nelts;
apr_uint32_t nalloc;
+ apr_uint32_t flags;
fd_set readset, writeset, exceptset;
int maxfd;
apr_pollfd_t *query_set;
@@ -203,6 +204,7 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
(*pollset)->nelts = 0;
(*pollset)->nalloc = size;
(*pollset)->pool = p;
+ (*pollset)->flags = flags;
FD_ZERO(&((*pollset)->readset));
FD_ZERO(&((*pollset)->writeset));
FD_ZERO(&((*pollset)->exceptset));
@@ -406,6 +408,22 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
return APR_SUCCESS;
}
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+#if APR_HAS_THREADS
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return APR_ENOTIMPL;
+ else
+ return APR_EINIT;
+#else
+ /* In case APR was compiled without thread support
+ * makes no sense to have wakeup operation usable
+ * only in multithreading environment.
+ */
+ return APR_ENOTIMPL;
+#endif
+}
+
APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,
apr_uint32_t size,
apr_pool_t *p,