diff options
-rw-r--r-- | CHANGES | 4 | ||||
-rw-r--r-- | include/apr_queue.h | 35 | ||||
-rw-r--r-- | test/testqueue.c | 50 | ||||
-rw-r--r-- | util-misc/apr_queue.c | 135 |
4 files changed, 144 insertions, 80 deletions
@@ -1,6 +1,10 @@ -*- coding: utf-8 -*- Changes for APR 2.0.0 + *) apr_queue: Add apr_queue_timedpush() and apr_queue_timedpop() to + support timedout operations. PR 56951. [Travis Cross + <tc+asf travislists com>, Yann Ylavic]. + *) apr_pollset: On z/OS, threadsafe apr_pollset_poll() may return "EDC8102I Operation would block" under load. [Pat Odonnell <patod us.ibm.com>] diff --git a/include/apr_queue.h b/include/apr_queue.h index 1d09e9de3..e345da5d4 100644 --- a/include/apr_queue.h +++ b/include/apr_queue.h @@ -28,6 +28,7 @@ #include "apu.h" #include "apr_errno.h" #include "apr_pools.h" +#include "apr_time.h" #if APR_HAS_THREADS @@ -91,7 +92,7 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data); APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data); /** - * pop/get an object to the queue, returning immediately if the queue is empty + * pop/get an object from the queue, returning immediately if the queue is empty * * @param queue the queue * @param data the data @@ -103,6 +104,38 @@ APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data); APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data); /** + * push/add an object to the queue, waiting a maximum of timeout microseconds + * before returning if the queue is empty + * + * @param queue the queue + * @param data the data + * @param timeout the timeout + * @returns APR_EINTR the blocking operation was interrupted (try again) + * @returns APR_EAGAIN the queue is empty and timeout is 0 + * @returns APR_TIMEUP the queue is empty and the timeout expired + * @returns APR_EOF the queue has been terminated + * @returns APR_SUCCESS on a successful pop + */ +APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data, + apr_interval_time_t timeout); + +/** + * pop/get an object from the queue, waiting a maximum of timeout microseconds + * before returning if the queue is empty + * + * @param queue the queue + * @param data the data + * @param timeout the timeout + * @returns APR_EINTR the blocking operation was interrupted (try again) + * @returns APR_EAGAIN the queue is empty and timeout is 0 + * @returns APR_TIMEUP the queue is empty and the timeout expired + * @returns APR_EOF the queue has been terminated + * @returns APR_SUCCESS on a successful pop + */ +APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data, + apr_interval_time_t timeout); + +/** * returns the size of the queue. * * @warning this is not threadsafe, and is intended for reporting/monitoring diff --git a/test/testqueue.c b/test/testqueue.c index 8f71775fd..264e4d3a1 100644 --- a/test/testqueue.c +++ b/test/testqueue.c @@ -121,6 +121,55 @@ static void test_queue_producer_consumer(abts_case *tc, void *data) ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); } +static void test_queue_timeout(abts_case *tc, void *data) +{ + apr_queue_t *q; + apr_status_t rv; + apr_time_t start; + unsigned int i; + void *value; + + rv = apr_queue_create(&q, 5, p); + ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); + + for (i = 0; i < 2; ++i) { + rv = apr_queue_timedpush(q, NULL, apr_time_from_msec(1)); + ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); + } + for (i = 0; i < 3; ++i) { + rv = apr_queue_trypush(q, NULL); + ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); + } + + start = apr_time_now(); + rv = apr_queue_timedpush(q, NULL, apr_time_from_msec(1)); + ABTS_TRUE(tc, APR_STATUS_IS_TIMEUP(rv)); + ABTS_TRUE(tc, apr_time_now() - start >= apr_time_from_msec(1)); + + rv = apr_queue_trypush(q, NULL); + ABTS_TRUE(tc, APR_STATUS_IS_EAGAIN(rv)); + + for (i = 0; i < 2; ++i) { + rv = apr_queue_timedpop(q, &value, apr_time_from_msec(1)); + ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); + } + for (i = 0; i < 3; ++i) { + rv = apr_queue_trypop(q, &value); + ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); + } + + start = apr_time_now(); + rv = apr_queue_timedpop(q, &value, apr_time_from_sec(1)); + ABTS_TRUE(tc, APR_STATUS_IS_TIMEUP(rv)); + ABTS_TRUE(tc, apr_time_now() - start >= apr_time_from_msec(1)); + + rv = apr_queue_trypop(q, &value); + ABTS_TRUE(tc, APR_STATUS_IS_EAGAIN(rv)); + + rv = apr_queue_term(q); + ABTS_INT_EQUAL(tc, APR_SUCCESS, rv); +} + #endif /* APR_HAS_THREADS */ abts_suite *testqueue(abts_suite *suite) @@ -129,6 +178,7 @@ abts_suite *testqueue(abts_suite *suite) #if APR_HAS_THREADS abts_run_test(suite, test_queue_producer_consumer, NULL); + abts_run_test(suite, test_queue_timeout, NULL); #endif /* APR_HAS_THREADS */ return suite; diff --git a/util-misc/apr_queue.c b/util-misc/apr_queue.c index 097249329..dbc879b35 100644 --- a/util-misc/apr_queue.c +++ b/util-misc/apr_queue.c @@ -145,7 +145,8 @@ APR_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q, * the push operation has completed, it signals other threads waiting * in apr_queue_pop() that they may continue consuming sockets. */ -APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) +static apr_status_t queue_push(apr_queue_t *queue, void *data, + apr_interval_time_t timeout) { apr_status_t rv; @@ -159,9 +160,21 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) } if (apr_queue_full(queue)) { + if (!timeout) { + apr_thread_mutex_unlock(queue->one_big_mutex); + return APR_EAGAIN; + } if (!queue->terminated) { queue->full_waiters++; - rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex); + if (timeout > 0) { + rv = apr_thread_cond_timedwait(queue->not_full, + queue->one_big_mutex, + timeout); + } + else { + rv = apr_thread_cond_wait(queue->not_full, + queue->one_big_mutex); + } queue->full_waiters--; if (rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); @@ -203,6 +216,11 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) return rv; } +APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) +{ + return queue_push(queue, data, -1); +} + /** * Push new data onto the queue. If the queue is full, return APR_EAGAIN. If * the push operation completes successfully, it signals other threads @@ -210,42 +228,13 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) */ APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data) { - apr_status_t rv; - - if (queue->terminated) { - return APR_EOF; /* no more elements ever again */ - } - - rv = apr_thread_mutex_lock(queue->one_big_mutex); - if (rv != APR_SUCCESS) { - return rv; - } - - if (apr_queue_full(queue)) { - rv = apr_thread_mutex_unlock(queue->one_big_mutex); - if (rv != APR_SUCCESS) { - return rv; - } - return APR_EAGAIN; - } - - queue->data[queue->in] = data; - queue->in++; - if (queue->in >= queue->bounds) - queue->in -= queue->bounds; - queue->nelts++; - - if (queue->empty_waiters) { - Q_DBG("sig !empty", queue); - rv = apr_thread_cond_signal(queue->not_empty); - if (rv != APR_SUCCESS) { - apr_thread_mutex_unlock(queue->one_big_mutex); - return rv; - } - } + return queue_push(queue, data, 0); +} - rv = apr_thread_mutex_unlock(queue->one_big_mutex); - return rv; +APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data, + apr_interval_time_t timeout) +{ + return queue_push(queue, data, timeout); } /** @@ -257,11 +246,13 @@ APR_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) { /** * Retrieves the next item from the queue. If there are no - * items available, it will block until one becomes available. - * Once retrieved, the item is placed into the address specified by - * 'data'. + * items available, it will either return APR_EAGAIN (timeout = 0), + * or block until one becomes available (infinitely with timeout < 0, + * otherwise until the given timeout expires). Once retrieved, the + * item is placed into the address specified by 'data'. */ -APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) +static apr_status_t queue_pop(apr_queue_t *queue, void **data, + apr_interval_time_t timeout) { apr_status_t rv; @@ -276,9 +267,21 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) /* Keep waiting until we wake up and find that the queue is not empty. */ if (apr_queue_empty(queue)) { + if (!timeout) { + apr_thread_mutex_unlock(queue->one_big_mutex); + return APR_EAGAIN; + } if (!queue->terminated) { queue->empty_waiters++; - rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); + if (timeout > 0) { + rv = apr_thread_cond_timedwait(queue->not_empty, + queue->one_big_mutex, + timeout); + } + else { + rv = apr_thread_cond_wait(queue->not_empty, + queue->one_big_mutex); + } queue->empty_waiters--; if (rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); @@ -320,46 +323,20 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) return rv; } -/** - * Retrieves the next item from the queue. If there are no - * items available, return APR_EAGAIN. Once retrieved, - * the item is placed into the address specified by 'data'. - */ -APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data) +APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) { - apr_status_t rv; - - if (queue->terminated) { - return APR_EOF; /* no more elements ever again */ - } - - rv = apr_thread_mutex_lock(queue->one_big_mutex); - if (rv != APR_SUCCESS) { - return rv; - } - - if (apr_queue_empty(queue)) { - (void)apr_thread_mutex_unlock(queue->one_big_mutex); - return APR_EAGAIN; - } - - *data = queue->data[queue->out]; - queue->nelts--; + return queue_pop(queue, data, -1); +} - queue->out++; - if (queue->out >= queue->bounds) - queue->out -= queue->bounds; - if (queue->full_waiters) { - Q_DBG("signal !full", queue); - rv = apr_thread_cond_signal(queue->not_full); - if (rv != APR_SUCCESS) { - apr_thread_mutex_unlock(queue->one_big_mutex); - return rv; - } - } +APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data) +{ + return queue_pop(queue, data, 0); +} - rv = apr_thread_mutex_unlock(queue->one_big_mutex); - return rv; +APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data, + apr_interval_time_t timeout) +{ + return queue_pop(queue, data, timeout); } APR_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue) |