summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CHANGES4
-rw-r--r--CMakeLists.txt2
-rw-r--r--modules/http2/NWGNUmod_http21
-rw-r--r--modules/http2/config2.m41
-rw-r--r--modules/http2/h2_conn.c11
-rw-r--r--modules/http2/h2_mplx.c52
-rw-r--r--modules/http2/h2_mplx.h6
-rw-r--r--modules/http2/h2_task.c1
-rw-r--r--modules/http2/h2_util.c290
-rw-r--r--modules/http2/h2_util.h51
-rw-r--r--modules/http2/h2_version.h4
-rw-r--r--modules/http2/h2_worker.c103
-rw-r--r--modules/http2/h2_worker.h135
-rw-r--r--modules/http2/h2_workers.c464
-rw-r--r--modules/http2/h2_workers.h51
15 files changed, 605 insertions, 571 deletions
diff --git a/CHANGES b/CHANGES
index 1a5e67658a..f01124afd8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,10 @@
-*- coding: utf-8 -*-
Changes with Apache 2.5.0
+ *) mod_http2: h2 workers with improved scalability for better scheduling
+ performance. There are H2MaxWorkers threads created at start and the
+ number is kept constant. [Stefan Eissing]
+
*) mod_http2: obsoleted option H2SessionExtraFiles, will be ignored and
just log a warning. [Stefan Eissing]
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 37982f1bb6..7e30909cf1 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -439,7 +439,7 @@ SET(mod_http2_extra_sources
modules/http2/h2_session.c modules/http2/h2_stream.c
modules/http2/h2_switch.c modules/http2/h2_ngn_shed.c
modules/http2/h2_task.c modules/http2/h2_util.c
- modules/http2/h2_worker.c modules/http2/h2_workers.c
+ modules/http2/h2_workers.c
)
SET(mod_ldap_extra_defines LDAP_DECLARE_EXPORT)
SET(mod_ldap_extra_libs wldap32)
diff --git a/modules/http2/NWGNUmod_http2 b/modules/http2/NWGNUmod_http2
index 74a7a97874..f6d4a38561 100644
--- a/modules/http2/NWGNUmod_http2
+++ b/modules/http2/NWGNUmod_http2
@@ -204,7 +204,6 @@ FILES_nlm_objs = \
$(OBJDIR)/h2_switch.o \
$(OBJDIR)/h2_task.o \
$(OBJDIR)/h2_util.o \
- $(OBJDIR)/h2_worker.o \
$(OBJDIR)/h2_workers.o \
$(OBJDIR)/mod_http2.o \
$(EOLIST)
diff --git a/modules/http2/config2.m4 b/modules/http2/config2.m4
index a33b0df3f1..26f71632a0 100644
--- a/modules/http2/config2.m4
+++ b/modules/http2/config2.m4
@@ -39,7 +39,6 @@ h2_stream.lo dnl
h2_switch.lo dnl
h2_task.lo dnl
h2_util.lo dnl
-h2_worker.lo dnl
h2_workers.lo dnl
"
diff --git a/modules/http2/h2_conn.c b/modules/http2/h2_conn.c
index 4576a2c548..7cc39ecd93 100644
--- a/modules/http2/h2_conn.c
+++ b/modules/http2/h2_conn.c
@@ -38,7 +38,6 @@
#include "h2_stream.h"
#include "h2_h2.h"
#include "h2_task.h"
-#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_conn.h"
#include "h2_version.h"
@@ -129,13 +128,11 @@ apr_status_t h2_conn_child_init(apr_pool_t *pool, server_rec *s)
maxw = minw;
}
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
- "h2_workers: min=%d max=%d, mthrpchild=%d",
- minw, maxw, max_threads_per_child);
- workers = h2_workers_create(s, pool, minw, maxw);
-
idle_secs = h2_config_geti(config, H2_CONF_MAX_WORKER_IDLE_SECS);
- h2_workers_set_max_idle_secs(workers, idle_secs);
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
+ "h2_workers: min=%d max=%d, mthrpchild=%d, idle_secs=%d",
+ minw, maxw, max_threads_per_child, idle_secs);
+ workers = h2_workers_create(s, pool, minw, maxw, idle_secs);
ap_register_input_filter("H2_IN", h2_filter_core_input,
NULL, AP_FTYPE_CONNECTION);
diff --git a/modules/http2/h2_mplx.c b/modules/http2/h2_mplx.c
index d0a6940372..14f42d4d28 100644
--- a/modules/http2/h2_mplx.c
+++ b/modules/http2/h2_mplx.c
@@ -44,7 +44,6 @@
#include "h2_stream.h"
#include "h2_session.h"
#include "h2_task.h"
-#include "h2_worker.h"
#include "h2_workers.h"
#include "h2_util.h"
@@ -203,7 +202,6 @@ h2_mplx *h2_mplx_create(conn_rec *c, apr_pool_t *parent,
m = apr_pcalloc(parent, sizeof(h2_mplx));
if (m) {
m->id = c->id;
- APR_RING_ELEM_INIT(m, link);
m->c = c;
/* We create a pool with its own allocator to be used for
@@ -445,14 +443,14 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
int acquired;
/* How to shut down a h2 connection:
- * 0. tell the workers that no more tasks will come from us */
+ * 0. abort and tell the workers that no more tasks will come from us */
+ m->aborted = 1;
h2_workers_unregister(m->workers, m);
enter_mutex(m, &acquired);
/* How to shut down a h2 connection:
- * 1. set aborted flag and cancel all streams still active */
- m->aborted = 1;
+ * 1. cancel all streams still active */
while (!h2_ihash_iter(m->streams, stream_cancel_iter, m)) {
/* until empty */
}
@@ -496,6 +494,9 @@ void h2_mplx_release_and_join(h2_mplx *m, apr_thread_cond_t *wait)
leave_mutex(m, acquired);
+ /* 5. unregister again, now that our workers are done */
+ h2_workers_unregister(m->workers, m);
+
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): released", m->id);
}
@@ -679,7 +680,6 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
h2_stream_pri_cmp *cmp, void *ctx)
{
apr_status_t status;
- int do_registration = 0;
int acquired;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
@@ -695,23 +695,18 @@ apr_status_t h2_mplx_process(h2_mplx *m, struct h2_stream *stream,
H2_STRM_MSG(stream, "process, add to readyq"));
}
else {
- if (!m->need_registration) {
- m->need_registration = h2_iq_empty(m->q);
- }
- if (m->workers_busy < m->workers_max) {
- do_registration = m->need_registration;
- }
h2_iq_add(m->q, stream->id, cmp, ctx);
+ if (!m->is_registered) {
+ if (h2_workers_register(m->workers, m) == APR_SUCCESS) {
+ m->is_registered = 1;
+ }
+ }
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, status, m->c,
- H2_STRM_MSG(stream, "process, add to q"));
+ H2_STRM_MSG(stream, "process, added to q"));
}
}
leave_mutex(m, acquired);
}
- if (do_registration) {
- m->need_registration = 0;
- h2_workers_register(m->workers, m);
- }
return status;
}
@@ -771,17 +766,16 @@ h2_task *h2_mplx_pop_task(h2_mplx *m, int *has_more)
apr_status_t status;
int acquired;
+ *has_more = 0;
if ((status = enter_mutex(m, &acquired)) == APR_SUCCESS) {
- if (m->aborted) {
- *has_more = 0;
- }
- else {
+ if (!m->aborted) {
task = next_stream_task(m);
- *has_more = !h2_iq_empty(m->q);
- }
-
- if (has_more && !task) {
- m->need_registration = 1;
+ if (task != NULL && !h2_iq_empty(m->q)) {
+ *has_more = 1;
+ }
+ else {
+ m->is_registered = 0; /* h2_workers will discard this mplx */
+ }
}
leave_mutex(m, acquired);
}
@@ -845,7 +839,6 @@ static void task_done(h2_mplx *m, h2_task *task, h2_req_engine *ngn)
m->workers_limit = H2MIN(m->workers_limit * 2,
m->workers_max);
m->last_limit_change = task->done_at;
- m->need_registration = 1;
ap_log_cerror(APLOG_MARK, APLOG_TRACE1, 0, m->c,
"h2_mplx(%ld): increase worker limit to %d",
m->id, m->workers_limit);
@@ -911,6 +904,11 @@ void h2_mplx_task_done(h2_mplx *m, h2_task *task, h2_task **ptask)
/* caller wants another task */
*ptask = next_stream_task(m);
}
+ if (!m->is_registered && !h2_iq_empty(m->q)) {
+ if (h2_workers_register(m->workers, m) == APR_SUCCESS) {
+ m->is_registered = 1;
+ }
+ }
leave_mutex(m, acquired);
}
}
diff --git a/modules/http2/h2_mplx.h b/modules/http2/h2_mplx.h
index db40037aee..e4437d846e 100644
--- a/modules/http2/h2_mplx.h
+++ b/modules/http2/h2_mplx.h
@@ -64,11 +64,9 @@ struct h2_mplx {
conn_rec *c;
apr_pool_t *pool;
- APR_RING_ENTRY(h2_mplx) link;
-
unsigned int event_pending;
- unsigned int aborted : 1;
- unsigned int need_registration : 1;
+ unsigned int aborted;
+ unsigned int is_registered; /* is registered at h2_workers */
struct h2_ihash_t *streams; /* all streams currently processing */
struct h2_ihash_t *sredo; /* all streams that need to be re-started */
diff --git a/modules/http2/h2_task.c b/modules/http2/h2_task.c
index 3c2810a294..8e668b8c4f 100644
--- a/modules/http2/h2_task.c
+++ b/modules/http2/h2_task.c
@@ -45,7 +45,6 @@
#include "h2_session.h"
#include "h2_stream.h"
#include "h2_task.h"
-#include "h2_worker.h"
#include "h2_util.h"
static void H2_TASK_OUT_LOG(int lvl, h2_task *task, apr_bucket_brigade *bb,
diff --git a/modules/http2/h2_util.c b/modules/http2/h2_util.c
index 171c195ca1..0bd74dc28e 100644
--- a/modules/http2/h2_util.c
+++ b/modules/http2/h2_util.c
@@ -15,6 +15,8 @@
#include <assert.h>
#include <apr_strings.h>
+#include <apr_thread_mutex.h>
+#include <apr_thread_cond.h>
#include <httpd.h>
#include <http_core.h>
@@ -604,6 +606,294 @@ int h2_iq_contains(h2_iqueue *q, int sid)
}
/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+struct h2_fifo {
+ void **elems;
+ int nelems;
+ int head;
+ int count;
+ int aborted;
+ apr_thread_mutex_t *lock;
+ apr_thread_cond_t *not_empty;
+ apr_thread_cond_t *not_full;
+};
+
+static int nth_index(h2_fifo *fifo, int n)
+{
+ return (fifo->head + n) % fifo->nelems;
+}
+
+static apr_status_t fifo_destroy(void *data)
+{
+ h2_fifo *fifo = data;
+
+ apr_thread_cond_destroy(fifo->not_empty);
+ apr_thread_cond_destroy(fifo->not_full);
+ apr_thread_mutex_destroy(fifo->lock);
+
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity)
+{
+ apr_status_t rv;
+ h2_fifo *fifo;
+
+ fifo = apr_pcalloc(pool, sizeof(*fifo));
+ if (fifo == NULL) {
+ return APR_ENOMEM;
+ }
+
+ rv = apr_thread_mutex_create(&fifo->lock,
+ APR_THREAD_MUTEX_UNNESTED, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_empty, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ rv = apr_thread_cond_create(&fifo->not_full, pool);
+ if (rv != APR_SUCCESS) {
+ return rv;
+ }
+
+ fifo->elems = apr_pcalloc(pool, capacity * sizeof(void*));
+ if (fifo->elems == NULL) {
+ return APR_ENOMEM;
+ }
+ fifo->nelems = capacity;
+
+ *pfifo = fifo;
+ apr_pool_cleanup_register(pool, fifo, fifo_destroy, apr_pool_cleanup_null);
+
+ return APR_SUCCESS;
+}
+
+apr_status_t h2_fifo_term(h2_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ fifo->aborted = 1;
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_fifo_interrupt(h2_fifo *fifo)
+{
+ apr_status_t rv;
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ apr_thread_cond_broadcast(fifo->not_full);
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+int h2_fifo_count(h2_fifo *fifo)
+{
+ return fifo->count;
+}
+
+static apr_status_t check_not_empty(h2_fifo *fifo, int block)
+{
+ if (fifo->count == 0) {
+ if (!block) {
+ return APR_EAGAIN;
+ }
+ while (fifo->count == 0) {
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_empty, fifo->lock);
+ }
+ }
+ return APR_SUCCESS;
+}
+
+static apr_status_t fifo_push(h2_fifo *fifo, void *elem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if (fifo->count == fifo->nelems) {
+ if (block) {
+ while (fifo->count == fifo->nelems) {
+ if (fifo->aborted) {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EOF;
+ }
+ apr_thread_cond_wait(fifo->not_full, fifo->lock);
+ }
+ }
+ else {
+ apr_thread_mutex_unlock(fifo->lock);
+ return APR_EAGAIN;
+ }
+ }
+
+ ap_assert(fifo->count < fifo->nelems);
+ fifo->elems[nth_index(fifo, fifo->count)] = elem;
+ ++fifo->count;
+ if (fifo->count == 1) {
+ apr_thread_cond_broadcast(fifo->not_empty);
+ }
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 1);
+}
+
+apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem)
+{
+ return fifo_push(fifo, elem, 0);
+}
+
+static apr_status_t fifo_pull(h2_fifo *fifo, void **pelem, int block)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) {
+ apr_thread_mutex_unlock(fifo->lock);
+ *pelem = NULL;
+ return rv;
+ }
+
+ ap_assert(fifo->count > 0);
+ *pelem = fifo->elems[fifo->head];
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_fifo_pull(h2_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 1);
+}
+
+apr_status_t h2_fifo_try_pull(h2_fifo *fifo, void **pelem)
+{
+ return fifo_pull(fifo, pelem, 0);
+}
+
+static apr_status_t fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx, int block)
+{
+ apr_status_t rv;
+ void *elem;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ if ((rv = check_not_empty(fifo, block)) != APR_SUCCESS) {
+ apr_thread_mutex_unlock(fifo->lock);
+ return rv;
+ }
+
+ ap_assert(fifo->count > 0);
+ elem = fifo->elems[fifo->head];
+
+ switch (fn(elem, ctx)) {
+ case H2_FIFO_OP_PULL:
+ --fifo->count;
+ if (fifo->count > 0) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count+1 == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ }
+ break;
+ case H2_FIFO_OP_REPUSH:
+ if (fifo->count > 1) {
+ fifo->head = nth_index(fifo, 1);
+ if (fifo->count < fifo->nelems) {
+ fifo->elems[nth_index(fifo, fifo->count-1)] = elem;
+ }
+ }
+ break;
+ }
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+apr_status_t h2_fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx)
+{
+ return fifo_peek(fifo, fn, ctx, 0);
+}
+
+apr_status_t h2_fifo_try_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx)
+{
+ return fifo_peek(fifo, fn, ctx, 0);
+}
+
+apr_status_t h2_fifo_remove(h2_fifo *fifo, void *elem)
+{
+ apr_status_t rv;
+
+ if (fifo->aborted) {
+ return APR_EOF;
+ }
+
+ if ((rv = apr_thread_mutex_lock(fifo->lock)) == APR_SUCCESS) {
+ int i, rc;
+ void *e;
+
+ rc = 0;
+ for (i = 0; i < fifo->count; ++i) {
+ e = fifo->elems[nth_index(fifo, i)];
+ if (e == elem) {
+ ++rc;
+ }
+ else if (rc) {
+ fifo->elems[nth_index(fifo, i-rc)] = e;
+ }
+ }
+ if (rc) {
+ fifo->count -= rc;
+ if (fifo->count + rc == fifo->nelems) {
+ apr_thread_cond_broadcast(fifo->not_full);
+ }
+ rv = APR_SUCCESS;
+ }
+ else {
+ rv = APR_EAGAIN;
+ }
+
+ apr_thread_mutex_unlock(fifo->lock);
+ }
+ return rv;
+}
+
+
+/*******************************************************************************
* h2_util for apt_table_t
******************************************************************************/
diff --git a/modules/http2/h2_util.h b/modules/http2/h2_util.h
index 7b92553445..503a8329bf 100644
--- a/modules/http2/h2_util.h
+++ b/modules/http2/h2_util.h
@@ -184,6 +184,57 @@ size_t h2_iq_mshift(h2_iqueue *q, int *pint, size_t max);
int h2_iq_contains(h2_iqueue *q, int sid);
/*******************************************************************************
+ * FIFO queue
+ ******************************************************************************/
+
+/**
+ * A thread-safe FIFO queue with some extra bells and whistles, if you
+ * do not need anything special, better use 'apr_queue'.
+ */
+typedef struct h2_fifo h2_fifo;
+
+apr_status_t h2_fifo_create(h2_fifo **pfifo, apr_pool_t *pool, int capacity);
+apr_status_t h2_fifo_term(h2_fifo *fifo);
+apr_status_t h2_fifo_interrupt(h2_fifo *fifo);
+
+int h2_fifo_count(h2_fifo *fifo);
+
+apr_status_t h2_fifo_push(h2_fifo *fifo, void *elem);
+apr_status_t h2_fifo_try_push(h2_fifo *fifo, void *elem);
+
+apr_status_t h2_fifo_pull(h2_fifo *fifo, void **pelem);
+apr_status_t h2_fifo_try_pull(h2_fifo *fifo, void **pelem);
+
+typedef enum {
+ H2_FIFO_OP_PULL, /* pull the element from the queue, ie discard it */
+ H2_FIFO_OP_REPUSH, /* pull and immediatley re-push it */
+} h2_fifo_op_t;
+
+typedef h2_fifo_op_t h2_fifo_peek_fn(void *head, void *ctx);
+
+/**
+ * Call given function on the head of the queue, once it exists, and
+ * perform the returned operation on it. The queue will hold its lock during
+ * this time, so no other operations on the queue are possible.
+ * @param fifo the queue to peek at
+ * @param fn the function to call on the head, once available
+ * @param ctx context to pass in call to function
+ */
+apr_status_t h2_fifo_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx);
+
+/**
+ * Non-blocking version of h2_fifo_peek.
+ */
+apr_status_t h2_fifo_try_peek(h2_fifo *fifo, h2_fifo_peek_fn *fn, void *ctx);
+
+/**
+ * Remove the elem from the queue, will remove multiple appearances.
+ * @param elem the element to remove
+ * @return APR_SUCCESS iff > 0 elems were removed, APR_EAGAIN otherwise.
+ */
+apr_status_t h2_fifo_remove(h2_fifo *fifo, void *elem);
+
+/*******************************************************************************
* common helpers
******************************************************************************/
/* h2_log2(n) iff n is a power of 2 */
diff --git a/modules/http2/h2_version.h b/modules/http2/h2_version.h
index fddfbccbac..fc38d8c1c0 100644
--- a/modules/http2/h2_version.h
+++ b/modules/http2/h2_version.h
@@ -26,7 +26,7 @@
* @macro
* Version number of the http2 module as c string
*/
-#define MOD_HTTP2_VERSION "1.9.4-DEV"
+#define MOD_HTTP2_VERSION "1.10.0-DEV"
/**
* @macro
@@ -34,7 +34,7 @@
* release. This is a 24 bit number with 8 bits for major number, 8 bits
* for minor and 8 bits for patch. Version 1.2.3 becomes 0x010203.
*/
-#define MOD_HTTP2_VERSION_NUM 0x010904
+#define MOD_HTTP2_VERSION_NUM 0x010a00
#endif /* mod_h2_h2_version_h */
diff --git a/modules/http2/h2_worker.c b/modules/http2/h2_worker.c
deleted file mode 100644
index 84e8f989eb..0000000000
--- a/modules/http2/h2_worker.c
+++ /dev/null
@@ -1,103 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <assert.h>
-
-#include <apr_thread_cond.h>
-
-#include <mpm_common.h>
-#include <httpd.h>
-#include <http_core.h>
-#include <http_log.h>
-
-#include "h2.h"
-#include "h2_private.h"
-#include "h2_conn.h"
-#include "h2_ctx.h"
-#include "h2_h2.h"
-#include "h2_mplx.h"
-#include "h2_task.h"
-#include "h2_worker.h"
-
-static void* APR_THREAD_FUNC execute(apr_thread_t *thread, void *wctx)
-{
- h2_worker *worker = (h2_worker *)wctx;
- int sticky;
-
- while (!worker->aborted) {
- h2_task *task;
-
- /* Get a h2_task from the main workers queue. */
- worker->get_next(worker, worker->ctx, &task, &sticky);
- while (task) {
-
- h2_task_do(task, thread, worker->id);
- /* report the task done and maybe get another one from the same
- * mplx (= master connection), if we can be sticky.
- */
- if (sticky && !worker->aborted) {
- h2_mplx_task_done(task->mplx, task, &task);
- }
- else {
- h2_mplx_task_done(task->mplx, task, NULL);
- task = NULL;
- }
- }
- }
-
- worker->worker_done(worker, worker->ctx);
- return NULL;
-}
-
-h2_worker *h2_worker_create(int id,
- apr_pool_t *pool,
- apr_threadattr_t *attr,
- h2_worker_mplx_next_fn *get_next,
- h2_worker_done_fn *worker_done,
- void *ctx)
-{
- h2_worker *w = apr_pcalloc(pool, sizeof(h2_worker));
- if (w) {
- w->id = id;
- APR_RING_ELEM_INIT(w, link);
- w->get_next = get_next;
- w->worker_done = worker_done;
- w->ctx = ctx;
- apr_thread_create(&w->thread, attr, execute, w, pool);
- }
- return w;
-}
-
-apr_status_t h2_worker_destroy(h2_worker *worker)
-{
- if (worker->thread) {
- apr_status_t status;
- apr_thread_join(&status, worker->thread);
- worker->thread = NULL;
- }
- return APR_SUCCESS;
-}
-
-void h2_worker_abort(h2_worker *worker)
-{
- worker->aborted = 1;
-}
-
-int h2_worker_is_aborted(h2_worker *worker)
-{
- return worker->aborted;
-}
-
-
diff --git a/modules/http2/h2_worker.h b/modules/http2/h2_worker.h
deleted file mode 100644
index 04ff570361..0000000000
--- a/modules/http2/h2_worker.h
+++ /dev/null
@@ -1,135 +0,0 @@
-/* Copyright 2015 greenbytes GmbH (https://www.greenbytes.de)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#ifndef __mod_h2__h2_worker__
-#define __mod_h2__h2_worker__
-
-struct h2_mplx;
-struct h2_request;
-struct h2_task;
-
-/* h2_worker is a basically a apr_thread_t that reads fromt he h2_workers
- * task queue and runs h2_tasks it is given.
- */
-typedef struct h2_worker h2_worker;
-
-/* Invoked when the worker wants a new task to process. Will block
- * until a h2_mplx becomes available or the worker itself
- * gets aborted (idle timeout, for example). */
-typedef apr_status_t h2_worker_mplx_next_fn(h2_worker *worker,
- void *ctx,
- struct h2_task **ptask,
- int *psticky);
-
-/* Invoked just before the worker thread exits. */
-typedef void h2_worker_done_fn(h2_worker *worker, void *ctx);
-
-
-struct h2_worker {
- int id;
- /** Links to the rest of the workers */
- APR_RING_ENTRY(h2_worker) link;
- apr_thread_t *thread;
- h2_worker_mplx_next_fn *get_next;
- h2_worker_done_fn *worker_done;
- void *ctx;
- int aborted;
-};
-
-/**
- * The magic pointer value that indicates the head of a h2_worker list
- * @param b The worker list
- * @return The magic pointer value
- */
-#define H2_WORKER_LIST_SENTINEL(b) APR_RING_SENTINEL((b), h2_worker, link)
-
-/**
- * Determine if the worker list is empty
- * @param b The list to check
- * @return true or false
- */
-#define H2_WORKER_LIST_EMPTY(b) APR_RING_EMPTY((b), h2_worker, link)
-
-/**
- * Return the first worker in a list
- * @param b The list to query
- * @return The first worker in the list
- */
-#define H2_WORKER_LIST_FIRST(b) APR_RING_FIRST(b)
-
-/**
- * Return the last worker in a list
- * @param b The list to query
- * @return The last worker int he list
- */
-#define H2_WORKER_LIST_LAST(b) APR_RING_LAST(b)
-
-/**
- * Insert a single worker at the front of a list
- * @param b The list to add to
- * @param e The worker to insert
- */
-#define H2_WORKER_LIST_INSERT_HEAD(b, e) do { \
- h2_worker *ap__b = (e); \
- APR_RING_INSERT_HEAD((b), ap__b, h2_worker, link); \
- } while (0)
-
-/**
- * Insert a single worker at the end of a list
- * @param b The list to add to
- * @param e The worker to insert
- */
-#define H2_WORKER_LIST_INSERT_TAIL(b, e) do { \
- h2_worker *ap__b = (e); \
- APR_RING_INSERT_TAIL((b), ap__b, h2_worker, link); \
- } while (0)
-
-/**
- * Get the next worker in the list
- * @param e The current worker
- * @return The next worker
- */
-#define H2_WORKER_NEXT(e) APR_RING_NEXT((e), link)
-/**
- * Get the previous worker in the list
- * @param e The current worker
- * @return The previous worker
- */
-#define H2_WORKER_PREV(e) APR_RING_PREV((e), link)
-
-/**
- * Remove a worker from its list
- * @param e The worker to remove
- */
-#define H2_WORKER_REMOVE(e) APR_RING_REMOVE((e), link)
-
-
-/* Create a new worker with given id, pool and attributes, callbacks
- * callback parameter.
- */
-h2_worker *h2_worker_create(int id,
- apr_pool_t *pool,
- apr_threadattr_t *attr,
- h2_worker_mplx_next_fn *get_next,
- h2_worker_done_fn *worker_done,
- void *ctx);
-
-apr_status_t h2_worker_destroy(h2_worker *worker);
-
-void h2_worker_abort(h2_worker *worker);
-
-int h2_worker_is_aborted(h2_worker *worker);
-
-#endif /* defined(__mod_h2__h2_worker__) */
diff --git a/modules/http2/h2_workers.c b/modules/http2/h2_workers.c
index e0f4308816..02e7d76336 100644
--- a/modules/http2/h2_workers.c
+++ b/modules/http2/h2_workers.c
@@ -27,242 +27,232 @@
#include "h2_private.h"
#include "h2_mplx.h"
#include "h2_task.h"
-#include "h2_worker.h"
#include "h2_workers.h"
+#include "h2_util.h"
+typedef struct h2_slot h2_slot;
+struct h2_slot {
+ int id;
+ h2_slot *next;
+ h2_workers *workers;
+ int aborted;
+ int sticks;
+ h2_task *task;
+ apr_thread_t *thread;
+ apr_thread_cond_t *not_idle;
+};
-static int in_list(h2_workers *workers, h2_mplx *m)
+static h2_slot *pop_slot(h2_slot **phead)
{
- h2_mplx *e;
- for (e = H2_MPLX_LIST_FIRST(&workers->mplxs);
- e != H2_MPLX_LIST_SENTINEL(&workers->mplxs);
- e = H2_MPLX_NEXT(e)) {
- if (e == m) {
- return 1;
+ /* Atomically pop a slot from the list */
+ for (;;) {
+ h2_slot *first = *phead;
+ if (first == NULL) {
+ return NULL;
+ }
+ if (apr_atomic_casptr((void*)phead, first->next, first) == first) {
+ first->next = NULL;
+ return first;
}
}
- return 0;
}
-static void cleanup_zombies(h2_workers *workers, int lock)
+static void push_slot(h2_slot **phead, h2_slot *slot)
{
- if (lock) {
- apr_thread_mutex_lock(workers->lock);
- }
- while (!H2_WORKER_LIST_EMPTY(&workers->zombies)) {
- h2_worker *zombie = H2_WORKER_LIST_FIRST(&workers->zombies);
- H2_WORKER_REMOVE(zombie);
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: cleanup zombie %d", zombie->id);
- h2_worker_destroy(zombie);
+ /* Atomically push a slot to the list */
+ ap_assert(!slot->next);
+ for (;;) {
+ h2_slot *next = slot->next = *phead;
+ if (apr_atomic_casptr((void*)phead, slot, next) == next) {
+ return;
+ }
}
- if (lock) {
+}
+
+static void wake_idle_worker(h2_workers *workers)
+{
+ h2_slot *slot = pop_slot(&workers->idle);
+ if (slot) {
+ apr_thread_mutex_lock(workers->lock);
+ apr_thread_cond_signal(slot->not_idle);
apr_thread_mutex_unlock(workers->lock);
}
}
-static h2_task *next_task(h2_workers *workers)
+static void cleanup_zombies(h2_workers *workers)
{
- h2_task *task = NULL;
- h2_mplx *last = NULL;
- int has_more;
-
- /* Get the next h2_mplx to process that has a task to hand out.
- * If it does, place it at the end of the queu and return the
- * task to the worker.
- * If it (currently) has no tasks, remove it so that it needs
- * to register again for scheduling.
- * If we run out of h2_mplx in the queue, we need to wait for
- * new mplx to arrive. Depending on how many workers do exist,
- * we do a timed wait or block indefinitely.
- */
- while (!task && !H2_MPLX_LIST_EMPTY(&workers->mplxs)) {
- h2_mplx *m = H2_MPLX_LIST_FIRST(&workers->mplxs);
-
- if (last == m) {
- break;
- }
- H2_MPLX_REMOVE(m);
- --workers->mplx_count;
-
- task = h2_mplx_pop_task(m, &has_more);
- if (has_more) {
- H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
- ++workers->mplx_count;
- if (!last) {
- last = m;
- }
+ h2_slot *slot;
+ while ((slot = pop_slot(&workers->zombies))) {
+ if (slot->thread) {
+ apr_status_t status;
+ apr_thread_join(&status, slot->thread);
+ slot->thread = NULL;
}
+ --workers->worker_count;
+ push_slot(&workers->free, slot);
}
- return task;
+}
+
+static apr_status_t slot_pull_task(h2_slot *slot, h2_mplx *m)
+{
+ int has_more;
+ slot->task = h2_mplx_pop_task(m, &has_more);
+ if (slot->task) {
+ /* Ok, we got something to give back to the worker for execution.
+ * If we still have idle workers, we let the worker be sticky,
+ * e.g. making it poll the task's h2_mplx instance for more work
+ * before asking back here. */
+ slot->sticks = slot->workers->max_workers;
+ return has_more? APR_EAGAIN : APR_SUCCESS;
+ }
+ slot->sticks = 0;
+ return APR_EOF;
+}
+
+static h2_fifo_op_t mplx_peek(void *head, void *ctx)
+{
+ h2_mplx *m = head;
+ h2_slot *slot = ctx;
+
+ if (slot_pull_task(slot, m) == APR_EAGAIN) {
+ wake_idle_worker(slot->workers);
+ return H2_FIFO_OP_REPUSH;
+ }
+ return H2_FIFO_OP_PULL;
}
/**
* Get the next task for the given worker. Will block until a task arrives
* or the max_wait timer expires and more than min workers exist.
*/
-static apr_status_t get_mplx_next(h2_worker *worker, void *ctx,
- h2_task **ptask, int *psticky)
+static apr_status_t get_next(h2_slot *slot)
{
+ h2_workers *workers = slot->workers;
apr_status_t status;
- apr_time_t wait_until = 0, now;
- h2_workers *workers = ctx;
- h2_task *task = NULL;
-
- *ptask = NULL;
- *psticky = 0;
- status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ++workers->idle_workers;
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): looking for work", worker->id);
-
- while (!h2_worker_is_aborted(worker) && !workers->aborted
- && !(task = next_task(workers))) {
-
- /* Need to wait for a new tasks to arrive. If we are above
- * minimum workers, we do a timed wait. When timeout occurs
- * and we have still more workers, we shut down one after
- * the other. */
- cleanup_zombies(workers, 0);
- if (workers->worker_count > workers->min_workers) {
- now = apr_time_now();
- if (now >= wait_until) {
- wait_until = now + apr_time_from_sec(workers->max_idle_secs);
- }
-
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): waiting signal, "
- "workers=%d, idle=%d", worker->id,
- (int)workers->worker_count,
- workers->idle_workers);
- status = apr_thread_cond_timedwait(workers->mplx_added,
- workers->lock,
- wait_until - now);
- if (status == APR_TIMEUP
- && workers->worker_count > workers->min_workers) {
- /* waited long enough without getting a task and
- * we are above min workers, abort this one. */
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0,
- workers->s,
- "h2_workers: aborting idle worker");
- h2_worker_abort(worker);
- break;
- }
- }
- else {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): waiting signal (eternal), "
- "worker_count=%d, idle=%d", worker->id,
- (int)workers->worker_count,
- workers->idle_workers);
- apr_thread_cond_wait(workers->mplx_added, workers->lock);
- }
+ slot->task = NULL;
+ while (!slot->aborted) {
+ if (!slot->task) {
+ status = h2_fifo_try_peek(workers->mplxs, mplx_peek, slot);
}
- /* Here, we either have gotten task or decided to shut down
- * the calling worker.
- */
- if (task) {
- /* Ok, we got something to give back to the worker for execution.
- * If we have more idle workers than h2_mplx in our queue, then
- * we let the worker be sticky, e.g. making it poll the task's
- * h2_mplx instance for more work before asking back here.
- * This avoids entering our global lock as long as enough idle
- * workers remain. Stickiness of a worker ends when the connection
- * has no new tasks to process, so the worker will get back here
- * eventually.
- */
- *ptask = task;
- *psticky = (workers->max_workers >= workers->mplx_count);
-
- if (workers->mplx_count && workers->idle_workers > 1) {
- apr_thread_cond_signal(workers->mplx_added);
- }
+ if (slot->task) {
+ return APR_SUCCESS;
}
- --workers->idle_workers;
+ apr_thread_mutex_lock(workers->lock);
+ ++workers->idle_workers;
+ cleanup_zombies(workers);
+ if (slot->next == NULL) {
+ push_slot(&workers->idle, slot);
+ }
+ apr_thread_cond_wait(slot->not_idle, workers->lock);
apr_thread_mutex_unlock(workers->lock);
}
-
- return *ptask? APR_SUCCESS : APR_EOF;
+ return APR_EOF;
}
-static void worker_done(h2_worker *worker, void *ctx)
+static void slot_done(h2_slot *slot)
{
- h2_workers *workers = ctx;
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_worker(%d): done", worker->id);
- H2_WORKER_REMOVE(worker);
- --workers->worker_count;
- H2_WORKER_LIST_INSERT_TAIL(&workers->zombies, worker);
-
- apr_thread_mutex_unlock(workers->lock);
- }
+ push_slot(&(slot->workers->zombies), slot);
}
-static apr_status_t add_worker(h2_workers *workers)
+
+static void* APR_THREAD_FUNC slot_run(apr_thread_t *thread, void *wctx)
{
- h2_worker *w = h2_worker_create(workers->next_worker_id++,
- workers->pool, workers->thread_attr,
- get_mplx_next, worker_done, workers);
- if (!w) {
- return APR_ENOMEM;
+ h2_slot *slot = wctx;
+
+ while (!slot->aborted) {
+
+ /* Get a h2_task from the mplxs queue. */
+ get_next(slot);
+ while (slot->task) {
+
+ h2_task_do(slot->task, thread, slot->id);
+
+ /* Report the task as done. If stickyness is left, offer the
+ * mplx the opportunity to give us back a new task right away.
+ */
+ if (!slot->aborted && (--slot->sticks > 0)) {
+ h2_mplx_task_done(slot->task->mplx, slot->task, &slot->task);
+ }
+ else {
+ h2_mplx_task_done(slot->task->mplx, slot->task, NULL);
+ slot->task = NULL;
+ }
+ }
}
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: adding worker(%d)", w->id);
- ++workers->worker_count;
- H2_WORKER_LIST_INSERT_TAIL(&workers->workers, w);
- return APR_SUCCESS;
+
+ slot_done(slot);
+ return NULL;
}
-static apr_status_t h2_workers_start(h2_workers *workers)
+static apr_status_t activate_slot(h2_workers *workers)
{
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: starting");
-
- while (workers->worker_count < workers->min_workers
- && status == APR_SUCCESS) {
- status = add_worker(workers);
+ h2_slot *slot = pop_slot(&workers->free);
+ if (slot) {
+ apr_status_t status;
+
+ slot->workers = workers;
+ slot->aborted = 0;
+ slot->task = NULL;
+ if (!slot->not_idle) {
+ status = apr_thread_cond_create(&slot->not_idle, workers->pool);
+ if (status != APR_SUCCESS) {
+ push_slot(&workers->free, slot);
+ return status;
+ }
}
- apr_thread_mutex_unlock(workers->lock);
+
+ apr_thread_create(&slot->thread, workers->thread_attr, slot_run, slot,
+ workers->pool);
+ if (!slot->thread) {
+ push_slot(&workers->free, slot);
+ return APR_ENOMEM;
+ }
+
+ ++workers->worker_count;
+ return APR_SUCCESS;
}
- return status;
+ return APR_EAGAIN;
}
static apr_status_t workers_pool_cleanup(void *data)
{
h2_workers *workers = data;
- h2_worker *w;
+ h2_slot *slot;
if (!workers->aborted) {
+ apr_thread_mutex_lock(workers->lock);
workers->aborted = 1;
-
/* before we go, cleanup any zombies and abort the rest */
- cleanup_zombies(workers, 1);
- w = H2_WORKER_LIST_FIRST(&workers->workers);
- while (w != H2_WORKER_LIST_SENTINEL(&workers->workers)) {
- h2_worker_abort(w);
- w = H2_WORKER_NEXT(w);
+ cleanup_zombies(workers);
+ for (;;) {
+ slot = pop_slot(&workers->idle);
+ if (slot) {
+ slot->aborted = 1;
+ apr_thread_cond_signal(slot->not_idle);
+ }
+ else {
+ break;
+ }
}
- apr_thread_mutex_lock(workers->lock);
- apr_thread_cond_broadcast(workers->mplx_added);
apr_thread_mutex_unlock(workers->lock);
+
+ h2_fifo_term(workers->mplxs);
+ h2_fifo_interrupt(workers->mplxs);
}
return APR_SUCCESS;
}
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
- int min_workers, int max_workers)
+ int min_workers, int max_workers,
+ int idle_secs)
{
apr_status_t status;
h2_workers *workers;
apr_pool_t *pool;
+ int i;
ap_assert(s);
ap_assert(server_pool);
@@ -275,98 +265,74 @@ h2_workers *h2_workers_create(server_rec *s, apr_pool_t *server_pool,
apr_pool_create(&pool, server_pool);
apr_pool_tag(pool, "h2_workers");
workers = apr_pcalloc(pool, sizeof(h2_workers));
- if (workers) {
- workers->s = s;
- workers->pool = pool;
- workers->min_workers = min_workers;
- workers->max_workers = max_workers;
- workers->max_idle_secs = 10;
-
- apr_threadattr_create(&workers->thread_attr, workers->pool);
- if (ap_thread_stacksize != 0) {
- apr_threadattr_stacksize_set(workers->thread_attr,
- ap_thread_stacksize);
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
- "h2_workers: using stacksize=%ld",
- (long)ap_thread_stacksize);
- }
-
- APR_RING_INIT(&workers->workers, h2_worker, link);
- APR_RING_INIT(&workers->zombies, h2_worker, link);
- APR_RING_INIT(&workers->mplxs, h2_mplx, link);
-
- status = apr_thread_mutex_create(&workers->lock,
- APR_THREAD_MUTEX_DEFAULT,
- workers->pool);
- if (status == APR_SUCCESS) {
- status = apr_thread_cond_create(&workers->mplx_added, workers->pool);
- }
- if (status == APR_SUCCESS) {
- status = h2_workers_start(workers);
- }
- if (status == APR_SUCCESS) {
- apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
- return workers;
- }
+ if (!workers) {
+ return NULL;
}
- return NULL;
-}
+
+ workers->s = s;
+ workers->pool = pool;
+ workers->min_workers = min_workers;
+ workers->max_workers = max_workers;
+ workers->max_idle_secs = (idle_secs > 0)? idle_secs : 10;
-apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
-{
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
+ status = h2_fifo_create(&workers->mplxs, pool, workers->max_workers);
if (status != APR_SUCCESS) {
- return status;
+ return NULL;
}
- ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
- "h2_workers: register mplx(%ld), idle=%d",
- m->id, workers->idle_workers);
- if (in_list(workers, m)) {
- status = APR_EAGAIN;
+ status = apr_threadattr_create(&workers->thread_attr, workers->pool);
+ if (status != APR_SUCCESS) {
+ return NULL;
}
- else {
- H2_MPLX_LIST_INSERT_TAIL(&workers->mplxs, m);
- ++workers->mplx_count;
- status = APR_SUCCESS;
+
+ if (ap_thread_stacksize != 0) {
+ apr_threadattr_stacksize_set(workers->thread_attr,
+ ap_thread_stacksize);
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, s,
+ "h2_workers: using stacksize=%ld",
+ (long)ap_thread_stacksize);
}
- if (workers->idle_workers > 0) {
- apr_thread_cond_signal(workers->mplx_added);
+ status = apr_thread_mutex_create(&workers->lock,
+ APR_THREAD_MUTEX_DEFAULT,
+ workers->pool);
+ if (status == APR_SUCCESS) {
+ int n = workers->nslots = workers->max_workers;
+ workers->slots = apr_pcalloc(workers->pool, n * sizeof(h2_slot));
+ if (workers->slots == NULL) {
+ status = APR_ENOMEM;
+ }
}
- else if (status == APR_SUCCESS
- && workers->worker_count < workers->max_workers) {
- ap_log_error(APLOG_MARK, APLOG_TRACE3, 0, workers->s,
- "h2_workers: got %d worker, adding 1",
- workers->worker_count);
- add_worker(workers);
+ if (status == APR_SUCCESS) {
+ workers->free = &workers->slots[0];
+ for (i = 0; i < workers->nslots-1; ++i) {
+ workers->slots[i].next = &workers->slots[i+1];
+ workers->slots[i].id = i;
+ }
+ while (workers->worker_count < workers->max_workers
+ && status == APR_SUCCESS) {
+ status = activate_slot(workers);
+ }
}
- apr_thread_mutex_unlock(workers->lock);
- return status;
+ if (status == APR_SUCCESS) {
+ apr_pool_pre_cleanup_register(pool, workers, workers_pool_cleanup);
+ return workers;
+ }
+ return NULL;
}
-apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
+apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m)
{
- apr_status_t status = apr_thread_mutex_lock(workers->lock);
- if (status == APR_SUCCESS) {
- status = APR_EAGAIN;
- if (in_list(workers, m)) {
- H2_MPLX_REMOVE(m);
- status = APR_SUCCESS;
- }
- apr_thread_mutex_unlock(workers->lock);
- }
+ apr_status_t status;
+ if ((status = h2_fifo_try_push(workers->mplxs, m)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_TRACE3, status, workers->s,
+ "h2_workers: unable to push mplx(%ld)", m->id);
+ }
+ wake_idle_worker(workers);
return status;
}
-void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs)
+apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m)
{
- if (idle_secs <= 0) {
- ap_log_error(APLOG_MARK, APLOG_WARNING, 0, workers->s,
- APLOGNO(02962) "h2_workers: max_worker_idle_sec value of %d"
- " is not valid, ignored.", idle_secs);
- return;
- }
- workers->max_idle_secs = idle_secs;
+ return h2_fifo_remove(workers->mplxs, m);
}
-
diff --git a/modules/http2/h2_workers.h b/modules/http2/h2_workers.h
index dee733baeb..3035ab3a81 100644
--- a/modules/http2/h2_workers.h
+++ b/modules/http2/h2_workers.h
@@ -27,6 +27,9 @@ struct apr_thread_cond_t;
struct h2_mplx;
struct h2_request;
struct h2_task;
+struct h2_fifo;
+
+struct h2_slot;
typedef struct h2_workers h2_workers;
@@ -44,14 +47,16 @@ struct h2_workers {
unsigned int aborted : 1;
apr_threadattr_t *thread_attr;
+ int nslots;
+ struct h2_slot *slots;
+
+ struct h2_slot *free;
+ struct h2_slot *idle;
+ struct h2_slot *zombies;
- APR_RING_HEAD(h2_worker_list, h2_worker) workers;
- APR_RING_HEAD(h2_worker_zombies, h2_worker) zombies;
- APR_RING_HEAD(h2_mplx_list, h2_mplx) mplxs;
- int mplx_count;
+ struct h2_fifo *mplxs;
struct apr_thread_mutex_t *lock;
- struct apr_thread_cond_t *mplx_added;
};
@@ -59,7 +64,7 @@ struct h2_workers {
* threads.
*/
h2_workers *h2_workers_create(server_rec *s, apr_pool_t *pool,
- int min_size, int max_size);
+ int min_size, int max_size, int idle_secs);
/**
* Registers a h2_mplx for task scheduling. If this h2_mplx runs
@@ -73,38 +78,4 @@ apr_status_t h2_workers_register(h2_workers *workers, struct h2_mplx *m);
*/
apr_status_t h2_workers_unregister(h2_workers *workers, struct h2_mplx *m);
-/**
- * Set the amount of seconds a h2_worker should wait for new tasks
- * before shutting down (if there are more than the minimum number of
- * workers).
- */
-void h2_workers_set_max_idle_secs(h2_workers *workers, int idle_secs);
-
-/**
- * Reservation of file handles available for transfer between workers
- * and master connections.
- *
- * When handling output from request processing, file handles are often
- * encountered when static files are served. The most efficient way is then
- * to forward the handle itself to the master connection where it can be
- * read or sendfile'd to the client. But file handles are a scarce resource,
- * so there needs to be a limit on how many handles are transferred this way.
- *
- * h2_workers keeps track of the number of reserved handles and observes a
- * configurable maximum value.
- *
- * @param workers the workers instance
- * @param count how many handles the caller wishes to reserve
- * @return the number of reserved handles, may be 0.
- */
-apr_size_t h2_workers_tx_reserve(h2_workers *workers, apr_size_t count);
-
-/**
- * Return a number of reserved file handles back to the pool. The number
- * overall may not exceed the numbers reserved.
- * @param workers the workers instance
- * @param count how many handles are returned to the pool
- */
-void h2_workers_tx_free(h2_workers *workers, apr_size_t count);
-
#endif /* defined(__mod_h2__h2_workers__) */