summaryrefslogtreecommitdiff
path: root/util-misc
diff options
context:
space:
mode:
authorYann Ylavic <ylavic@apache.org>2015-03-16 17:18:11 +0000
committerYann Ylavic <ylavic@apache.org>2015-03-16 17:18:11 +0000
commite192797fca9eb584c88906cce94ba83a8b9afefb (patch)
treeffab65917c9d8e6cbc8a996e7636c419305f627d /util-misc
parenta474b2251e05e4d05bfb331ed53dc68441410865 (diff)
downloadapr-e192797fca9eb584c88906cce94ba83a8b9afefb.tar.gz
apr_queue: Add apr_queue_timedpush() and apr_queue_timedpop() to
support timedout operations. PR 56951. Signed-off-by: Anthony Minessale <anthm freeswitch.org> Signed-off-by: Travis Cross <tc traviscross.com> Reviewed/Modified: ylavic git-svn-id: https://svn.apache.org/repos/asf/apr/apr/trunk@1667073 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'util-misc')
-rw-r--r--util-misc/apr_queue.c135
1 files changed, 56 insertions, 79 deletions
diff --git a/util-misc/apr_queue.c b/util-misc/apr_queue.c
index 097249329..dbc879b35 100644
--- a/util-misc/apr_queue.c
+++ b/util-misc/apr_queue.c
@@ -145,7 +145,8 @@ APR_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q,
* the push operation has completed, it signals other threads waiting
* in apr_queue_pop() that they may continue consuming sockets.
*/
-APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
+static apr_status_t queue_push(apr_queue_t *queue, void *data,
+ apr_interval_time_t timeout)
{
apr_status_t rv;
@@ -159,9 +160,21 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
}
if (apr_queue_full(queue)) {
+ if (!timeout) {
+ apr_thread_mutex_unlock(queue->one_big_mutex);
+ return APR_EAGAIN;
+ }
if (!queue->terminated) {
queue->full_waiters++;
- rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
+ if (timeout > 0) {
+ rv = apr_thread_cond_timedwait(queue->not_full,
+ queue->one_big_mutex,
+ timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(queue->not_full,
+ queue->one_big_mutex);
+ }
queue->full_waiters--;
if (rv != APR_SUCCESS) {
apr_thread_mutex_unlock(queue->one_big_mutex);
@@ -203,6 +216,11 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
return rv;
}
+APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
+{
+ return queue_push(queue, data, -1);
+}
+
/**
* Push new data onto the queue. If the queue is full, return APR_EAGAIN. If
* the push operation completes successfully, it signals other threads
@@ -210,42 +228,13 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
*/
APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
{
- apr_status_t rv;
-
- if (queue->terminated) {
- return APR_EOF; /* no more elements ever again */
- }
-
- rv = apr_thread_mutex_lock(queue->one_big_mutex);
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- if (apr_queue_full(queue)) {
- rv = apr_thread_mutex_unlock(queue->one_big_mutex);
- if (rv != APR_SUCCESS) {
- return rv;
- }
- return APR_EAGAIN;
- }
-
- queue->data[queue->in] = data;
- queue->in++;
- if (queue->in >= queue->bounds)
- queue->in -= queue->bounds;
- queue->nelts++;
-
- if (queue->empty_waiters) {
- Q_DBG("sig !empty", queue);
- rv = apr_thread_cond_signal(queue->not_empty);
- if (rv != APR_SUCCESS) {
- apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
- }
- }
+ return queue_push(queue, data, 0);
+}
- rv = apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
+APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data,
+ apr_interval_time_t timeout)
+{
+ return queue_push(queue, data, timeout);
}
/**
@@ -257,11 +246,13 @@ APR_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
/**
* Retrieves the next item from the queue. If there are no
- * items available, it will block until one becomes available.
- * Once retrieved, the item is placed into the address specified by
- * 'data'.
+ * items available, it will either return APR_EAGAIN (timeout = 0),
+ * or block until one becomes available (infinitely with timeout < 0,
+ * otherwise until the given timeout expires). Once retrieved, the
+ * item is placed into the address specified by 'data'.
*/
-APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
+static apr_status_t queue_pop(apr_queue_t *queue, void **data,
+ apr_interval_time_t timeout)
{
apr_status_t rv;
@@ -276,9 +267,21 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
/* Keep waiting until we wake up and find that the queue is not empty. */
if (apr_queue_empty(queue)) {
+ if (!timeout) {
+ apr_thread_mutex_unlock(queue->one_big_mutex);
+ return APR_EAGAIN;
+ }
if (!queue->terminated) {
queue->empty_waiters++;
- rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
+ if (timeout > 0) {
+ rv = apr_thread_cond_timedwait(queue->not_empty,
+ queue->one_big_mutex,
+ timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(queue->not_empty,
+ queue->one_big_mutex);
+ }
queue->empty_waiters--;
if (rv != APR_SUCCESS) {
apr_thread_mutex_unlock(queue->one_big_mutex);
@@ -320,46 +323,20 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
return rv;
}
-/**
- * Retrieves the next item from the queue. If there are no
- * items available, return APR_EAGAIN. Once retrieved,
- * the item is placed into the address specified by 'data'.
- */
-APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
+APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
{
- apr_status_t rv;
-
- if (queue->terminated) {
- return APR_EOF; /* no more elements ever again */
- }
-
- rv = apr_thread_mutex_lock(queue->one_big_mutex);
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- if (apr_queue_empty(queue)) {
- (void)apr_thread_mutex_unlock(queue->one_big_mutex);
- return APR_EAGAIN;
- }
-
- *data = queue->data[queue->out];
- queue->nelts--;
+ return queue_pop(queue, data, -1);
+}
- queue->out++;
- if (queue->out >= queue->bounds)
- queue->out -= queue->bounds;
- if (queue->full_waiters) {
- Q_DBG("signal !full", queue);
- rv = apr_thread_cond_signal(queue->not_full);
- if (rv != APR_SUCCESS) {
- apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
- }
- }
+APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
+{
+ return queue_pop(queue, data, 0);
+}
- rv = apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
+APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data,
+ apr_interval_time_t timeout)
+{
+ return queue_pop(queue, data, timeout);
}
APR_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)