summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES4
-rw-r--r--include/apr_queue.h35
-rw-r--r--test/testqueue.c50
-rw-r--r--util-misc/apr_queue.c135
4 files changed, 144 insertions, 80 deletions
diff --git a/CHANGES b/CHANGES
index 4f37655c6..c778fac67 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,10 @@
-*- coding: utf-8 -*-
Changes for APR 2.0.0
+ *) apr_queue: Add apr_queue_timedpush() and apr_queue_timedpop() to
+ support timedout operations. PR 56951. [Travis Cross
+ <tc+asf travislists com>, Yann Ylavic].
+
*) apr_pollset: On z/OS, threadsafe apr_pollset_poll() may return
"EDC8102I Operation would block" under load.
[Pat Odonnell <patod us.ibm.com>]
diff --git a/include/apr_queue.h b/include/apr_queue.h
index 1d09e9de3..e345da5d4 100644
--- a/include/apr_queue.h
+++ b/include/apr_queue.h
@@ -28,6 +28,7 @@
#include "apu.h"
#include "apr_errno.h"
#include "apr_pools.h"
+#include "apr_time.h"
#if APR_HAS_THREADS
@@ -91,7 +92,7 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data);
APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data);
/**
- * pop/get an object to the queue, returning immediately if the queue is empty
+ * pop/get an object from the queue, returning immediately if the queue is empty
*
* @param queue the queue
* @param data the data
@@ -103,6 +104,38 @@ APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data);
APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data);
/**
+ * push/add an object to the queue, waiting a maximum of timeout microseconds
+ * before returning if the queue is empty
+ *
+ * @param queue the queue
+ * @param data the data
+ * @param timeout the timeout
+ * @returns APR_EINTR the blocking operation was interrupted (try again)
+ * @returns APR_EAGAIN the queue is empty and timeout is 0
+ * @returns APR_TIMEUP the queue is empty and the timeout expired
+ * @returns APR_EOF the queue has been terminated
+ * @returns APR_SUCCESS on a successful pop
+ */
+APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data,
+ apr_interval_time_t timeout);
+
+/**
+ * pop/get an object from the queue, waiting a maximum of timeout microseconds
+ * before returning if the queue is empty
+ *
+ * @param queue the queue
+ * @param data the data
+ * @param timeout the timeout
+ * @returns APR_EINTR the blocking operation was interrupted (try again)
+ * @returns APR_EAGAIN the queue is empty and timeout is 0
+ * @returns APR_TIMEUP the queue is empty and the timeout expired
+ * @returns APR_EOF the queue has been terminated
+ * @returns APR_SUCCESS on a successful pop
+ */
+APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data,
+ apr_interval_time_t timeout);
+
+/**
* returns the size of the queue.
*
* @warning this is not threadsafe, and is intended for reporting/monitoring
diff --git a/test/testqueue.c b/test/testqueue.c
index 8f71775fd..264e4d3a1 100644
--- a/test/testqueue.c
+++ b/test/testqueue.c
@@ -121,6 +121,55 @@ static void test_queue_producer_consumer(abts_case *tc, void *data)
ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
}
+static void test_queue_timeout(abts_case *tc, void *data)
+{
+ apr_queue_t *q;
+ apr_status_t rv;
+ apr_time_t start;
+ unsigned int i;
+ void *value;
+
+ rv = apr_queue_create(&q, 5, p);
+ ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+
+ for (i = 0; i < 2; ++i) {
+ rv = apr_queue_timedpush(q, NULL, apr_time_from_msec(1));
+ ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+ }
+ for (i = 0; i < 3; ++i) {
+ rv = apr_queue_trypush(q, NULL);
+ ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+ }
+
+ start = apr_time_now();
+ rv = apr_queue_timedpush(q, NULL, apr_time_from_msec(1));
+ ABTS_TRUE(tc, APR_STATUS_IS_TIMEUP(rv));
+ ABTS_TRUE(tc, apr_time_now() - start >= apr_time_from_msec(1));
+
+ rv = apr_queue_trypush(q, NULL);
+ ABTS_TRUE(tc, APR_STATUS_IS_EAGAIN(rv));
+
+ for (i = 0; i < 2; ++i) {
+ rv = apr_queue_timedpop(q, &value, apr_time_from_msec(1));
+ ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+ }
+ for (i = 0; i < 3; ++i) {
+ rv = apr_queue_trypop(q, &value);
+ ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+ }
+
+ start = apr_time_now();
+ rv = apr_queue_timedpop(q, &value, apr_time_from_sec(1));
+ ABTS_TRUE(tc, APR_STATUS_IS_TIMEUP(rv));
+ ABTS_TRUE(tc, apr_time_now() - start >= apr_time_from_msec(1));
+
+ rv = apr_queue_trypop(q, &value);
+ ABTS_TRUE(tc, APR_STATUS_IS_EAGAIN(rv));
+
+ rv = apr_queue_term(q);
+ ABTS_INT_EQUAL(tc, APR_SUCCESS, rv);
+}
+
#endif /* APR_HAS_THREADS */
abts_suite *testqueue(abts_suite *suite)
@@ -129,6 +178,7 @@ abts_suite *testqueue(abts_suite *suite)
#if APR_HAS_THREADS
abts_run_test(suite, test_queue_producer_consumer, NULL);
+ abts_run_test(suite, test_queue_timeout, NULL);
#endif /* APR_HAS_THREADS */
return suite;
diff --git a/util-misc/apr_queue.c b/util-misc/apr_queue.c
index 097249329..dbc879b35 100644
--- a/util-misc/apr_queue.c
+++ b/util-misc/apr_queue.c
@@ -145,7 +145,8 @@ APR_DECLARE(apr_status_t) apr_queue_create(apr_queue_t **q,
* the push operation has completed, it signals other threads waiting
* in apr_queue_pop() that they may continue consuming sockets.
*/
-APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
+static apr_status_t queue_push(apr_queue_t *queue, void *data,
+ apr_interval_time_t timeout)
{
apr_status_t rv;
@@ -159,9 +160,21 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
}
if (apr_queue_full(queue)) {
+ if (!timeout) {
+ apr_thread_mutex_unlock(queue->one_big_mutex);
+ return APR_EAGAIN;
+ }
if (!queue->terminated) {
queue->full_waiters++;
- rv = apr_thread_cond_wait(queue->not_full, queue->one_big_mutex);
+ if (timeout > 0) {
+ rv = apr_thread_cond_timedwait(queue->not_full,
+ queue->one_big_mutex,
+ timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(queue->not_full,
+ queue->one_big_mutex);
+ }
queue->full_waiters--;
if (rv != APR_SUCCESS) {
apr_thread_mutex_unlock(queue->one_big_mutex);
@@ -203,6 +216,11 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
return rv;
}
+APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
+{
+ return queue_push(queue, data, -1);
+}
+
/**
* Push new data onto the queue. If the queue is full, return APR_EAGAIN. If
* the push operation completes successfully, it signals other threads
@@ -210,42 +228,13 @@ APR_DECLARE(apr_status_t) apr_queue_push(apr_queue_t *queue, void *data)
*/
APR_DECLARE(apr_status_t) apr_queue_trypush(apr_queue_t *queue, void *data)
{
- apr_status_t rv;
-
- if (queue->terminated) {
- return APR_EOF; /* no more elements ever again */
- }
-
- rv = apr_thread_mutex_lock(queue->one_big_mutex);
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- if (apr_queue_full(queue)) {
- rv = apr_thread_mutex_unlock(queue->one_big_mutex);
- if (rv != APR_SUCCESS) {
- return rv;
- }
- return APR_EAGAIN;
- }
-
- queue->data[queue->in] = data;
- queue->in++;
- if (queue->in >= queue->bounds)
- queue->in -= queue->bounds;
- queue->nelts++;
-
- if (queue->empty_waiters) {
- Q_DBG("sig !empty", queue);
- rv = apr_thread_cond_signal(queue->not_empty);
- if (rv != APR_SUCCESS) {
- apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
- }
- }
+ return queue_push(queue, data, 0);
+}
- rv = apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
+APR_DECLARE(apr_status_t) apr_queue_timedpush(apr_queue_t *queue, void *data,
+ apr_interval_time_t timeout)
+{
+ return queue_push(queue, data, timeout);
}
/**
@@ -257,11 +246,13 @@ APR_DECLARE(unsigned int) apr_queue_size(apr_queue_t *queue) {
/**
* Retrieves the next item from the queue. If there are no
- * items available, it will block until one becomes available.
- * Once retrieved, the item is placed into the address specified by
- * 'data'.
+ * items available, it will either return APR_EAGAIN (timeout = 0),
+ * or block until one becomes available (infinitely with timeout < 0,
+ * otherwise until the given timeout expires). Once retrieved, the
+ * item is placed into the address specified by 'data'.
*/
-APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
+static apr_status_t queue_pop(apr_queue_t *queue, void **data,
+ apr_interval_time_t timeout)
{
apr_status_t rv;
@@ -276,9 +267,21 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
/* Keep waiting until we wake up and find that the queue is not empty. */
if (apr_queue_empty(queue)) {
+ if (!timeout) {
+ apr_thread_mutex_unlock(queue->one_big_mutex);
+ return APR_EAGAIN;
+ }
if (!queue->terminated) {
queue->empty_waiters++;
- rv = apr_thread_cond_wait(queue->not_empty, queue->one_big_mutex);
+ if (timeout > 0) {
+ rv = apr_thread_cond_timedwait(queue->not_empty,
+ queue->one_big_mutex,
+ timeout);
+ }
+ else {
+ rv = apr_thread_cond_wait(queue->not_empty,
+ queue->one_big_mutex);
+ }
queue->empty_waiters--;
if (rv != APR_SUCCESS) {
apr_thread_mutex_unlock(queue->one_big_mutex);
@@ -320,46 +323,20 @@ APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
return rv;
}
-/**
- * Retrieves the next item from the queue. If there are no
- * items available, return APR_EAGAIN. Once retrieved,
- * the item is placed into the address specified by 'data'.
- */
-APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
+APR_DECLARE(apr_status_t) apr_queue_pop(apr_queue_t *queue, void **data)
{
- apr_status_t rv;
-
- if (queue->terminated) {
- return APR_EOF; /* no more elements ever again */
- }
-
- rv = apr_thread_mutex_lock(queue->one_big_mutex);
- if (rv != APR_SUCCESS) {
- return rv;
- }
-
- if (apr_queue_empty(queue)) {
- (void)apr_thread_mutex_unlock(queue->one_big_mutex);
- return APR_EAGAIN;
- }
-
- *data = queue->data[queue->out];
- queue->nelts--;
+ return queue_pop(queue, data, -1);
+}
- queue->out++;
- if (queue->out >= queue->bounds)
- queue->out -= queue->bounds;
- if (queue->full_waiters) {
- Q_DBG("signal !full", queue);
- rv = apr_thread_cond_signal(queue->not_full);
- if (rv != APR_SUCCESS) {
- apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
- }
- }
+APR_DECLARE(apr_status_t) apr_queue_trypop(apr_queue_t *queue, void **data)
+{
+ return queue_pop(queue, data, 0);
+}
- rv = apr_thread_mutex_unlock(queue->one_big_mutex);
- return rv;
+APR_DECLARE(apr_status_t) apr_queue_timedpop(apr_queue_t *queue, void **data,
+ apr_interval_time_t timeout)
+{
+ return queue_pop(queue, data, timeout);
}
APR_DECLARE(apr_status_t) apr_queue_interrupt_all(apr_queue_t *queue)