diff options
author | Yann Ylavic <ylavic@apache.org> | 2015-03-16 17:18:11 +0000 |
---|---|---|
committer | Yann Ylavic <ylavic@apache.org> | 2015-03-16 17:18:11 +0000 |
commit | e192797fca9eb584c88906cce94ba83a8b9afefb (patch) | |
tree | ffab65917c9d8e6cbc8a996e7636c419305f627d /util-misc | |
parent | a474b2251e05e4d05bfb331ed53dc68441410865 (diff) | |
download | apr-e192797fca9eb584c88906cce94ba83a8b9afefb.tar.gz |
apr_queue: Add apr_queue_timedpush() and apr_queue_timedpop() to
support timedout operations. PR 56951.
Signed-off-by: Anthony Minessale <anthm freeswitch.org>
Signed-off-by: Travis Cross <tc traviscross.com>
Reviewed/Modified: ylavic
git-svn-id: https://svn.apache.org/repos/asf/apr/apr/trunk@1667073 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'util-misc')
-rw-r--r-- | util-misc/apr_queue.c | 135 |
1 files changed, 56 insertions, 79 deletions
diff --git a/util-misc/apr_queue.c b/util-misc/apr_queue.c index 097249329..dbc879b35 100644 --- a/util-misc/apr_queue.c +++ b/util-misc/apr_queue.c @@ -145,7 +145,8 @@ APR_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q, * the push operation has completed, it signals other threads waiting * in apr_queue_pop() that they may continue consuming sockets. */ -APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) +static apr_status_t queue_push(apr_queue_t *queue, void *data, + apr_interval_time_t timeout) { apr_status_t rv; @@ -159,9 +160,21 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) } if (apr_queue_full(queue)) { + if (!timeout) { + apr_thread_mutex_unlock(queue->one_big_mutex); + return APR_EAGAIN; + } if (!queue->terminated) { queue->full_waiters++; - rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex); + if (timeout > 0) { + rv = apr_thread_cond_timedwait(queue->not_full, + queue->one_big_mutex, + timeout); + } + else { + rv = apr_thread_cond_wait(queue->not_full, + queue->one_big_mutex); + } queue->full_waiters--; if (rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); @@ -203,6 +216,11 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) return rv; } +APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) +{ + return queue_push(queue, data, -1); +} + /** * Push new data onto the queue. If the queue is full, return APR_EAGAIN. If * the push operation completes successfully, it signals other threads @@ -210,42 +228,13 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data) */ APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data) { - apr_status_t rv; - - if (queue->terminated) { - return APR_EOF; /* no more elements ever again */ - } - - rv = apr_thread_mutex_lock(queue->one_big_mutex); - if (rv != APR_SUCCESS) { - return rv; - } - - if (apr_queue_full(queue)) { - rv = apr_thread_mutex_unlock(queue->one_big_mutex); - if (rv != APR_SUCCESS) { - return rv; - } - return APR_EAGAIN; - } - - queue->data[queue->in] = data; - queue->in++; - if (queue->in >= queue->bounds) - queue->in -= queue->bounds; - queue->nelts++; - - if (queue->empty_waiters) { - Q_DBG("sig !empty", queue); - rv = apr_thread_cond_signal(queue->not_empty); - if (rv != APR_SUCCESS) { - apr_thread_mutex_unlock(queue->one_big_mutex); - return rv; - } - } + return queue_push(queue, data, 0); +} - rv = apr_thread_mutex_unlock(queue->one_big_mutex); - return rv; +APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data, + apr_interval_time_t timeout) +{ + return queue_push(queue, data, timeout); } /** @@ -257,11 +246,13 @@ APR_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) { /** * Retrieves the next item from the queue. If there are no - * items available, it will block until one becomes available. - * Once retrieved, the item is placed into the address specified by - * 'data'. + * items available, it will either return APR_EAGAIN (timeout = 0), + * or block until one becomes available (infinitely with timeout < 0, + * otherwise until the given timeout expires). Once retrieved, the + * item is placed into the address specified by 'data'. */ -APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) +static apr_status_t queue_pop(apr_queue_t *queue, void **data, + apr_interval_time_t timeout) { apr_status_t rv; @@ -276,9 +267,21 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) /* Keep waiting until we wake up and find that the queue is not empty. */ if (apr_queue_empty(queue)) { + if (!timeout) { + apr_thread_mutex_unlock(queue->one_big_mutex); + return APR_EAGAIN; + } if (!queue->terminated) { queue->empty_waiters++; - rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex); + if (timeout > 0) { + rv = apr_thread_cond_timedwait(queue->not_empty, + queue->one_big_mutex, + timeout); + } + else { + rv = apr_thread_cond_wait(queue->not_empty, + queue->one_big_mutex); + } queue->empty_waiters--; if (rv != APR_SUCCESS) { apr_thread_mutex_unlock(queue->one_big_mutex); @@ -320,46 +323,20 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) return rv; } -/** - * Retrieves the next item from the queue. If there are no - * items available, return APR_EAGAIN. Once retrieved, - * the item is placed into the address specified by 'data'. - */ -APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data) +APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data) { - apr_status_t rv; - - if (queue->terminated) { - return APR_EOF; /* no more elements ever again */ - } - - rv = apr_thread_mutex_lock(queue->one_big_mutex); - if (rv != APR_SUCCESS) { - return rv; - } - - if (apr_queue_empty(queue)) { - (void)apr_thread_mutex_unlock(queue->one_big_mutex); - return APR_EAGAIN; - } - - *data = queue->data[queue->out]; - queue->nelts--; + return queue_pop(queue, data, -1); +} - queue->out++; - if (queue->out >= queue->bounds) - queue->out -= queue->bounds; - if (queue->full_waiters) { - Q_DBG("signal !full", queue); - rv = apr_thread_cond_signal(queue->not_full); - if (rv != APR_SUCCESS) { - apr_thread_mutex_unlock(queue->one_big_mutex); - return rv; - } - } +APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data) +{ + return queue_pop(queue, data, 0); +} - rv = apr_thread_mutex_unlock(queue->one_big_mutex); - return rv; +APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data, + apr_interval_time_t timeout) +{ + return queue_pop(queue, data, timeout); } APR_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue) |