diff options
author | Mladen Turk <mturk@apache.org> | 2008-04-13 08:31:03 +0000 |
---|---|---|
committer | Mladen Turk <mturk@apache.org> | 2008-04-13 08:31:03 +0000 |
commit | fc18a5fd06cc94fdf12e6136d67480e500c166fa (patch) | |
tree | 4002c9a170be10f9dd02301511de408a02d26f27 /poll | |
parent | 8744772ba11042a1d24ac5d1899062d2c209bdbf (diff) | |
download | apr-fc18a5fd06cc94fdf12e6136d67480e500c166fa.tar.gz |
Introduce apr_pollset_wakeup()
git-svn-id: https://svn.apache.org/repos/asf/apr/apr/trunk@647540 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'poll')
-rw-r--r-- | poll/unix/epoll.c | 154 | ||||
-rw-r--r-- | poll/unix/kqueue.c | 15 | ||||
-rw-r--r-- | poll/unix/poll.c | 151 | ||||
-rw-r--r-- | poll/unix/port.c | 16 | ||||
-rw-r--r-- | poll/unix/select.c | 18 |
5 files changed, 325 insertions, 29 deletions
diff --git a/poll/unix/epoll.c b/poll/unix/epoll.c index 7a3831bee..5028e94d7 100644 --- a/poll/unix/epoll.c +++ b/poll/unix/epoll.c @@ -68,6 +68,8 @@ struct apr_pollset_t #if APR_HAS_THREADS /* A thread mutex to protect operations on the rings */ apr_thread_mutex_t *ring_lock; + /* Pipe descriptors used for wakeup */ + apr_file_t *wakeup_pipe[2]; #endif /* A ring containing all of the pollfd_t that are active */ APR_RING_HEAD(pfd_query_ring_t, pfd_elem_t) query_ring; @@ -80,11 +82,61 @@ struct apr_pollset_t static apr_status_t backend_cleanup(void *p_) { + apr_status_t rv = APR_SUCCESS; apr_pollset_t *pollset = (apr_pollset_t *) p_; + close(pollset->epoll_fd); - return APR_SUCCESS; +#if APR_HAS_THREADS + if (pollset->flags & APR_POLLSET_WAKEABLE) { + /* Close both sides of the wakeup pipe */ + rv |= apr_file_close(pollset->wakeup_pipe[0]); + rv |= apr_file_close(pollset->wakeup_pipe[1]); + } +#endif + return rv; } +#if APR_HAS_THREADS + +/* Create a dummy wakeup pipe for interrupting the poller + */ +static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset) +{ + apr_status_t rv; + apr_pollfd_t fd; + + if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0], + &pollset->wakeup_pipe[1], + pollset->pool)) != APR_SUCCESS) + return rv; + fd.reqevents = APR_POLLIN; + fd.desc_type = APR_POLL_FILE; + fd.desc.f = pollset->wakeup_pipe[0]; + /* Add the pipe to the pollset + */ + return apr_pollset_add(pollset, &fd); +} + +/* Read and discard what's ever in the wakeup pipe. + */ +static void drain_wakeup_pipe(apr_pollset_t *pollset) +{ + char rb[512]; + apr_size_t nr = sizeof(rb); + + while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) { + /* Although we write just one byte to the other end of the pipe + * during wakeup, multiple treads could call the wakeup. + * So simply drain out from the input side of the pipe all + * the data. + */ + if (nr != sizeof(rb)) + break; + } +} + +#endif + APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_uint32_t size, apr_pool_t *p, @@ -93,6 +145,13 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_status_t rv; int fd; +#if APR_HAS_THREADS + if (flags & APR_POLLSET_WAKEABLE) { + /* Add room for wakeup descriptor */ + size++; + } +#endif + fd = epoll_create(size); if (fd < 0) { *pollset = NULL; @@ -110,7 +169,8 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, return rv; } #else - if (flags & APR_POLLSET_THREADSAFE) { + if (flags & APR_POLLSET_THREADSAFE || + flags & APR_POLLSET_WAKEABLE) { *pollset = NULL; return APR_ENOTIMPL; } @@ -121,7 +181,6 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, (*pollset)->pool = p; (*pollset)->epoll_fd = fd; (*pollset)->pollset = apr_palloc(p, size * sizeof(struct epoll_event)); - apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup); (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); if (!(flags & APR_POLLSET_NOCOPY)) { @@ -129,6 +188,18 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link); APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link); } +#if APR_HAS_THREADS + if (flags & APR_POLLSET_WAKEABLE) { + /* Create wakeup pipe */ + if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) { + close(fd); + *pollset = NULL; + return rv; + } + } +#endif + apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup); + return APR_SUCCESS; } @@ -244,8 +315,9 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, apr_int32_t *num, const apr_pollfd_t **descriptors) { - int ret, i; + int ret, i, j; apr_status_t rv = APR_SUCCESS; + apr_pollfd_t fd; if (timeout > 0) { timeout /= 1000; @@ -263,23 +335,59 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, } else { if (pollset->flags & APR_POLLSET_NOCOPY) { - for (i = 0; i < ret; i++) { - pollset->result_set[i] = - *((apr_pollfd_t *) (pollset->pollset[i].data.ptr)); - pollset->result_set[i].rtnevents = - get_epoll_revent(pollset->pollset[i].events); + for (i = 0, j = 0; i < ret; i++) { + fd = *((apr_pollfd_t *) (pollset->pollset[i].data.ptr)); +#if APR_HAS_THREADS + /* Check if the polled descriptor is our + * wakeup pipe. In that case do not put it result set. + */ + if ((pollset->flags & APR_POLLSET_WAKEABLE) && + fd.desc_type == APR_POLL_FILE && + fd.desc.f == pollset->wakeup_pipe[0]) { + drain_wakeup_pipe(pollset); + /* XXX: Is this a correct return value ? + * We might simply return APR_SUCEESS. + */ + rv = APR_EINTR; + } + else +#endif + { + pollset->result_set[j] = fd; + pollset->result_set[j].rtnevents = + get_epoll_revent(pollset->pollset[i].events); + + j++; + } } + (*num) = j; } else { - for (i = 0; i < ret; i++) { - pollset->result_set[i] = - (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd); - pollset->result_set[i].rtnevents = - get_epoll_revent(pollset->pollset[i].events); + for (i = 0, j = 0; i < ret; i++) { + fd = (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd); +#if APR_HAS_THREADS + if ((pollset->flags & APR_POLLSET_WAKEABLE) && + fd.desc_type == APR_POLL_FILE && + fd.desc.f == pollset->wakeup_pipe[0]) { + drain_wakeup_pipe(pollset); + /* XXX: Is this a correct return value ? + * We might simply return APR_SUCEESS. + */ + rv = APR_EINTR; + } + else +#endif + { + pollset->result_set[j] = fd; + pollset->result_set[j].rtnevents = + get_epoll_revent(pollset->pollset[i].events); + j++; + } } + (*num) = j; } - if (descriptors) { + if (descriptors && (*num)) { *descriptors = pollset->result_set; } } @@ -296,6 +404,22 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, return rv; } +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ +#if APR_HAS_THREADS + if (pollset->flags & APR_POLLSET_WAKEABLE) + return apr_file_putc(1, pollset->wakeup_pipe[1]); + else + return APR_EINIT; +#else + /* In case APR was compiled without thread support + * makes no sense to have wakeup operation usable + * only in multithreading environment. + */ + return APR_ENOTIMPL; +#endif +} + struct apr_pollcb_t { apr_pool_t *pool; apr_uint32_t nalloc; diff --git a/poll/unix/kqueue.c b/poll/unix/kqueue.c index 501953dc4..6547bf620 100644 --- a/poll/unix/kqueue.c +++ b/poll/unix/kqueue.c @@ -281,6 +281,21 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, return rv; } +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ +#if APR_HAS_THREADS + if (pollset->flags & APR_POLLSET_WAKEABLE) + return APR_ENOTIMPL; + else + return APR_EINIT; +#else + /* In case APR was compiled without thread support + * makes no sense to have wakeup operation usable + * only in multithreading environment. + */ + return APR_ENOTIMPL; +#endif +} struct apr_pollcb_t { apr_pool_t *pool; diff --git a/poll/unix/poll.c b/poll/unix/poll.c index cca8bfe8a..6f43c41ba 100644 --- a/poll/unix/poll.c +++ b/poll/unix/poll.c @@ -156,11 +156,70 @@ struct apr_pollset_t apr_pool_t *pool; apr_uint32_t nelts; apr_uint32_t nalloc; + apr_uint32_t flags; +#if APR_HAS_THREADS + /* Pipe descriptors used for wakeup */ + apr_file_t *wakeup_pipe[2]; +#endif struct pollfd *pollset; apr_pollfd_t *query_set; apr_pollfd_t *result_set; }; +#if APR_HAS_THREADS + +static apr_status_t wakeup_pipe_cleanup(void *p) +{ + apr_status_t rv = APR_SUCCESS; + apr_pollset_t *pollset = (apr_pollset_t *)p; + + if (pollset->flags & APR_POLLSET_WAKEABLE) { + /* Close both sides of the wakeup pipe */ + rv |= apr_file_close(pollset->wakeup_pipe[0]); + rv |= apr_file_close(pollset->wakeup_pipe[1]); + } + return rv; +} + +/* Create a dummy wakeup pipe for interrupting the poller + */ +static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset) +{ + apr_status_t rv; + apr_pollfd_t fd; + + if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0], + &pollset->wakeup_pipe[1], + pollset->pool)) != APR_SUCCESS) + return rv; + fd.reqevents = APR_POLLIN; + fd.desc_type = APR_POLL_FILE; + fd.desc.f = pollset->wakeup_pipe[0]; + /* Add the pipe to the pollset + */ + return apr_pollset_add(pollset, &fd); +} + +/* Read and discard what's ever in the wakeup pipe. + */ +static void drain_wakeup_pipe(apr_pollset_t *pollset) +{ + char rb[512]; + apr_size_t nr = sizeof(rb); + + while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) { + /* Although we write just one byte to the other end of the pipe + * during wakeup, multiple treads could call the wakeup. + * So simply drain out from the input side of the pipe all + * the data. + */ + if (nr != sizeof(rb)) + break; + } +} + +#endif + APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_uint32_t size, apr_pool_t *p, @@ -170,19 +229,46 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, *pollset = NULL; return APR_ENOTIMPL; } + if (flags & APR_POLLSET_WAKEABLE) { +#if APR_HAS_THREADS + /* Add room for wakeup descriptor */ + size++; +#else + *pollset = NULL; + return APR_ENOTIMPL; +#endif + } *pollset = apr_palloc(p, sizeof(**pollset)); (*pollset)->nelts = 0; (*pollset)->nalloc = size; (*pollset)->pool = p; + (*pollset)->flags = flags; (*pollset)->pollset = apr_palloc(p, size * sizeof(struct pollfd)); (*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); +#if APR_HAS_THREADS + if (flags & APR_POLLSET_WAKEABLE) { + apr_status_t rv; + /* Create wakeup pipe */ + if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) { + *pollset = NULL; + return rv; + } + apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup, + wakeup_pipe_cleanup); + } +#endif + return APR_SUCCESS; } APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t *pollset) { +#if APR_HAS_THREADS + if (pollset->flags & APR_POLLSET_WAKEABLE) + return apr_pool_cleanup_run(pollset->pool, pollset, backend_cleanup); +#endif return APR_SUCCESS; } @@ -242,32 +328,69 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, apr_int32_t *num, const apr_pollfd_t **descriptors) { - int rv; + int ret; + apr_status_t rv = APR_SUCCESS; apr_uint32_t i, j; if (timeout > 0) { timeout /= 1000; } - rv = poll(pollset->pollset, pollset->nelts, timeout); - (*num) = rv; - if (rv < 0) { + ret = poll(pollset->pollset, pollset->nelts, timeout); + (*num) = ret; + if (ret < 0) { return apr_get_netos_error(); } - if (rv == 0) { + else if (res == 0) { return APR_TIMEUP; } - j = 0; - for (i = 0; i < pollset->nelts; i++) { - if (pollset->pollset[i].revents != 0) { - pollset->result_set[j] = pollset->query_set[i]; - pollset->result_set[j].rtnevents = - get_revent(pollset->pollset[i].revents); - j++; + else { + for (i = 0, j = 0; i < pollset->nelts; i++) { + if (pollset->pollset[i].revents != 0) { +#if APR_HAS_THREADS + /* Check if the polled descriptor is our + * wakeup pipe. In that case do not put it result set. + */ + if ((pollset->flags & APR_POLLSET_WAKEABLE) && + pollset->pollset[i].desc_type == APR_POLL_FILE && + pollset->pollset[i].desc.f == pollset->wakeup_pipe[0]) { + drain_wakeup_pipe(pollset); + /* XXX: Is this a correct return value ? + * We might simply return APR_SUCEESS. + */ + rv = APR_EINTR; + } + else +#endif + { + pollset->result_set[j] = pollset->query_set[i]; + pollset->result_set[j].rtnevents = + get_revent(pollset->pollset[i].revents); + j++; + } + } } + (*num) = j; } - if (descriptors) + if (descriptors && (*num)) *descriptors = pollset->result_set; - return APR_SUCCESS; + return rv; +} + +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ +#if APR_HAS_THREADS + if (pollset->flags & APR_POLLSET_WAKEABLE) { + return apr_file_putc(1, pollset->wakeup_pipe[1]); + } + else + return APR_EINIT; +#else + /* In case APR was compiled without thread support + * makes no sense to have wakeup operation usable + * only in multithreading environment. + */ + return APR_ENOTIMPL; +#endif } APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb, diff --git a/poll/unix/port.c b/poll/unix/port.c index 05848d618..2842e2781 100644 --- a/poll/unix/port.c +++ b/poll/unix/port.c @@ -335,6 +335,22 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, return rv; } +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ +#if APR_HAS_THREADS + if (pollset->flags & APR_POLLSET_WAKEABLE) + return APR_ENOTIMPL; + else + return APR_EINIT; +#else + /* In case APR was compiled without thread support + * makes no sense to have wakeup operation usable + * only in multithreading environment. + */ + return APR_ENOTIMPL; +#endif +} + struct apr_pollcb_t { apr_pool_t *pool; apr_uint32_t nalloc; diff --git a/poll/unix/select.c b/poll/unix/select.c index 42e7a3f68..64b0f050a 100644 --- a/poll/unix/select.c +++ b/poll/unix/select.c @@ -175,6 +175,7 @@ struct apr_pollset_t apr_uint32_t nelts; apr_uint32_t nalloc; + apr_uint32_t flags; fd_set readset, writeset, exceptset; int maxfd; apr_pollfd_t *query_set; @@ -203,6 +204,7 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, (*pollset)->nelts = 0; (*pollset)->nalloc = size; (*pollset)->pool = p; + (*pollset)->flags = flags; FD_ZERO(&((*pollset)->readset)); FD_ZERO(&((*pollset)->writeset)); FD_ZERO(&((*pollset)->exceptset)); @@ -406,6 +408,22 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, return APR_SUCCESS; } +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ +#if APR_HAS_THREADS + if (pollset->flags & APR_POLLSET_WAKEABLE) + return APR_ENOTIMPL; + else + return APR_EINIT; +#else + /* In case APR was compiled without thread support + * makes no sense to have wakeup operation usable + * only in multithreading environment. + */ + return APR_ENOTIMPL; +#endif +} + APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb, apr_uint32_t size, apr_pool_t *p, |